accord/
forward.rs

1use async_channel::{Receiver, Sender};
2use async_std::{prelude::StreamExt, task::spawn};
3use futures::io::{AsyncBufReadExt, AsyncRead, BufReader};
4use isahc::{http::Response, ResponseExt};
5use std::{error::Error, fmt::Debug, io::Read, str::FromStr, sync::Arc};
6use tracing::{debug, error, info, trace, warn};
7use twilight_cache_inmemory::{EventType, InMemoryCache};
8use twilight_gateway::{cluster::Cluster, Event};
9use twilight_http::Client as HttpClient;
10use twilight_model::{
11	gateway::{
12		payload::update_status::UpdateStatusInfo,
13		presence::{Activity, ActivityType, Status},
14		Intents,
15	},
16	id::{ChannelId, GuildId, UserId},
17};
18
19use crate::{
20	act::{Act, Stage},
21	raccord,
22};
23
24pub struct Forward {
25	pub cache: InMemoryCache,
26	pub cluster: Cluster,
27	pub http: HttpClient,
28}
29
30impl Forward {
31	pub async fn init(
32		token: String,
33		target: Arc<raccord::Client>,
34	) -> Result<Self, Box<dyn Error + Send + Sync>> {
35		let mut update_status = None;
36		if let Ok(mut connecting_res) = target.get(raccord::Connecting)?.await {
37			if connecting_res.status().is_success() {
38				let content_type = connecting_res
39					.headers()
40					.get("content-type")
41					.and_then(|s| s.to_str().ok())
42					.and_then(|s| mime::Mime::from_str(s).ok())
43					.unwrap_or(mime::APPLICATION_OCTET_STREAM);
44
45				if content_type == mime::APPLICATION_JSON {
46					let presence: raccord::Presence = connecting_res.json()?;
47
48					update_status = Some(UpdateStatusInfo {
49						afk: presence.afk.unwrap_or(true),
50						since: presence.since,
51						status: presence.status.unwrap_or(Status::Online),
52						activities: presence.activity.map(|activity| {
53							let (kind, name) = match activity {
54								raccord::Activity::Playing { name } => {
55									(ActivityType::Playing, name)
56								}
57								raccord::Activity::Streaming { name } => {
58									(ActivityType::Streaming, name)
59								}
60								raccord::Activity::Listening { name } => {
61									(ActivityType::Listening, name)
62								}
63								raccord::Activity::Watching { name } => {
64									(ActivityType::Watching, name)
65								}
66								raccord::Activity::Custom { name } => (ActivityType::Custom, name),
67							};
68
69							vec![Activity {
70								application_id: None,
71								assets: None,
72								created_at: None,
73								details: None,
74								emoji: None,
75								flags: None,
76								id: None,
77								instance: None,
78								party: None,
79								secrets: None,
80								state: None,
81								timestamps: None,
82								url: None,
83								kind,
84								name,
85							}]
86						}),
87					});
88				}
89			}
90		}
91
92		// TODO: env var control for intents (notably for privileged intents)
93		let mut config = Cluster::builder(
94			&token,
95			Intents::DIRECT_MESSAGES | Intents::GUILD_MESSAGES | Intents::GUILD_MEMBERS,
96		);
97
98		if let Some(presence) = update_status {
99			config = config.presence(presence);
100		}
101
102		let cluster = config.build().await?;
103
104		let cluster_spawn = cluster.clone();
105		spawn(async move {
106			cluster_spawn.up().await;
107		});
108
109		let http = HttpClient::new(&token);
110
111		let cache = InMemoryCache::builder()
112			.event_types(
113				EventType::MESSAGE_CREATE
114					| EventType::MESSAGE_DELETE
115					| EventType::MESSAGE_DELETE_BULK
116					| EventType::MESSAGE_UPDATE
117					| EventType::MEMBER_ADD
118					| EventType::MEMBER_CHUNK
119					| EventType::MEMBER_UPDATE
120					| EventType::MEMBER_REMOVE,
121			)
122			.build();
123
124		Ok(Self {
125			cache,
126			cluster,
127			http,
128		})
129	}
130
131	pub async fn worker(
132		self,
133		target: Arc<raccord::Client>,
134		ghosts: Receiver<(u64, Event)>,
135		player: Sender<Stage>,
136	) -> Result<(), Box<dyn Error + Send + Sync>> {
137		let solids = self.cluster.events();
138		let mut events = solids.merge(ghosts);
139
140		while let Some((shard_id, event)) = events.next().await {
141			spawn(handle_event(
142				self.cache.clone(),
143				target.clone(),
144				shard_id,
145				event,
146				player.clone(),
147			));
148		}
149
150		Ok(())
151	}
152}
153
154pub async fn handle_event(
155	cache: InMemoryCache,
156	target: Arc<raccord::Client>,
157	shard_id: u64,
158	event: Event,
159	player: Sender<Stage>,
160) {
161	if let Err(err) = try_event(cache, target, shard_id, event, player).await {
162		error!("got error while handling event:\n{}", err);
163	}
164}
165
166pub async fn try_event(
167	cache: InMemoryCache,
168	target: Arc<raccord::Client>,
169	shard_id: u64,
170	event: Event,
171	player: Sender<Stage>,
172) -> Result<(), Box<dyn Error + Send + Sync>> {
173	trace!("updating twilight cache");
174	cache.update(&event);
175
176	match event {
177		Event::MessageCreate(message) if message.guild_id.is_some() => {
178			debug!("received guild message create");
179			let msg = raccord::ServerMessage::from(&**message);
180			let res = if let Some(command) = target.parse_command(&msg.content) {
181				trace!("submitting act: {:?}", command);
182				target.post(raccord::Command {
183					command,
184					message: msg,
185				})
186			} else {
187				trace!("submitting act: {:?}", msg);
188				target.post(msg)
189			}?
190			.await?;
191			trace!("handing off response: {:?}", res);
192			handle_response(
193				res,
194				player,
195				Some(message.guild_id.unwrap()),
196				Some(message.channel_id),
197				None,
198			)
199			.await?;
200		}
201		Event::MessageCreate(message) => {
202			debug!("received direct message create");
203			let msg = raccord::DirectMessage::from(&**message);
204			let res = if let Some(command) = target.parse_command(&msg.content) {
205				trace!("submitting act: {:?}", command);
206				target.post(raccord::Command {
207					command,
208					message: msg,
209				})
210			} else {
211				trace!("submitting act: {:?}", msg);
212				target.post(msg)
213			}?
214			.await?;
215			trace!("handing off response: {:?}", res);
216			handle_response(res, player, None, Some(message.channel_id), None).await?;
217		}
218		Event::MemberAdd(mem) => {
219			debug!("received guild member join");
220			let member = raccord::Member::from(&**mem);
221			trace!("submitting act: {:?}", member);
222			let res = target.post(raccord::ServerJoin(member))?.await?;
223			trace!("handing off response: {:?}", res);
224			handle_response(res, player, Some(mem.guild_id), None, None).await?;
225		}
226		Event::ShardConnected(_) => {
227			info!("connected on shard {}", shard_id);
228			let res = target.post(raccord::Connected { shard: shard_id })?.await?;
229			trace!("handing off response: {:?}", res);
230			handle_response(res, player, None, None, None).await?;
231		}
232		_ => {}
233	}
234
235	Ok(())
236}
237
238async fn handle_response<T: Debug + Read + AsyncRead + Unpin>(
239	mut res: Response<T>,
240	player: Sender<Stage>,
241	from_server: Option<GuildId>,
242	from_channel: Option<ChannelId>,
243	_from_user: Option<UserId>,
244) -> Result<(), Box<dyn Error + Send + Sync>> {
245	let status = res.status();
246	if status.is_informational() {
247		warn!("unhandled information code {:?}", status);
248	}
249
250	if status == 204 || status == 404 {
251		// no content, no action
252		trace!("no action response: {:?}", status);
253		return Ok(());
254	}
255
256	if status.is_redirection() {
257		match status.into() {
258			300 => warn!("TODO: multiple choice (http 300) is not designed yet"),
259			301 | 302 | 303 | 307 | 308 => unreachable!("redirects should be handled by curl"),
260			304 => error!("http 304 response caching not implemented"),
261			305 | 306 => error!("proxy redirections (http 305 and 306) unsupported"),
262			_ => error!("invalid 3xx code"),
263		}
264
265		return Ok(());
266	}
267
268	if status.is_client_error() || status.is_server_error() {
269		error!(
270			"error {:?} from target, TODO: more error handling here",
271			status
272		);
273		return Ok(());
274	}
275
276	if !status.is_success() {
277		error!("invalid response status: {:?}", status);
278		return Ok(());
279	}
280
281	let content_type = res
282		.headers()
283		.get("content-type")
284		.and_then(|s| s.to_str().ok())
285		.and_then(|s| mime::Mime::from_str(s).ok())
286		.unwrap_or(mime::APPLICATION_OCTET_STREAM);
287	trace!("receiving content of type: {:?}", content_type);
288
289	match (content_type.type_(), content_type.subtype()) {
290		(mime::APPLICATION, mime::JSON) => {
291			let default_server_id = res
292				.headers()
293				.get("accord-server-id")
294				.and_then(|h| h.to_str().ok())
295				.and_then(|s| u64::from_str(s).ok())
296				.map(GuildId)
297				.or(from_server);
298			let default_channel_id = res
299				.headers()
300				.get("accord-channel-id")
301				.and_then(|h| h.to_str().ok())
302				.and_then(|s| u64::from_str(s).ok())
303				.map(ChannelId)
304				.or(from_channel);
305			let has_content_length = res
306				.headers()
307				.get("content-length")
308				.and_then(|s| s.to_str().ok())
309				.and_then(|s| usize::from_str(s).ok())
310				.unwrap_or(0) > 0;
311
312			if has_content_length {
313				info!("response has content-length, parsing single act");
314				let act: Act = res.json()?;
315				trace!("parsed act: {:?}", &act);
316				player
317					.send(Stage {
318						act,
319						default_server_id,
320						default_channel_id,
321					})
322					.await?;
323			} else {
324				info!("response has no content-length, streaming multiple acts");
325				let mut lines = BufReader::new(res.into_body()).lines();
326				loop {
327					if let Some(line) = lines.next().await {
328						let line = line?;
329						trace!("got line: {:?}", line);
330						let act: Act = serde_json::from_str(line.trim())?;
331						trace!("parsed act: {:?}", &act);
332						player
333							.send(Stage {
334								act,
335								default_server_id,
336								default_channel_id,
337							})
338							.await?;
339					} else {
340						break;
341					}
342				}
343				info!("done streaming");
344			}
345		}
346		(mime::TEXT, mime::PLAIN) => {
347			let content = res.text()?;
348			let header_channel = res
349				.headers()
350				.get("accord-channel-id")
351				.and_then(|h| h.to_str().ok())
352				.and_then(|s| u64::from_str(s).ok());
353
354			player
355				.send(Stage {
356					act: Act::CreateMessage {
357						content,
358						channel_id: header_channel,
359					},
360					default_server_id: from_server,
361					default_channel_id: from_channel,
362				})
363				.await?;
364		}
365		(t, s) => warn!("unhandled content-type {}/{}", t, s),
366	}
367
368	Ok(())
369}