1use async_channel::{Receiver, Sender};
2use async_std::{prelude::StreamExt, task::spawn};
3use futures::io::{AsyncBufReadExt, AsyncRead, BufReader};
4use isahc::{http::Response, ResponseExt};
5use std::{error::Error, fmt::Debug, io::Read, str::FromStr, sync::Arc};
6use tracing::{debug, error, info, trace, warn};
7use twilight_cache_inmemory::{EventType, InMemoryCache};
8use twilight_gateway::{cluster::Cluster, Event};
9use twilight_http::Client as HttpClient;
10use twilight_model::{
11 gateway::{
12 payload::update_status::UpdateStatusInfo,
13 presence::{Activity, ActivityType, Status},
14 Intents,
15 },
16 id::{ChannelId, GuildId, UserId},
17};
18
19use crate::{
20 act::{Act, Stage},
21 raccord,
22};
23
24pub struct Forward {
25 pub cache: InMemoryCache,
26 pub cluster: Cluster,
27 pub http: HttpClient,
28}
29
30impl Forward {
31 pub async fn init(
32 token: String,
33 target: Arc<raccord::Client>,
34 ) -> Result<Self, Box<dyn Error + Send + Sync>> {
35 let mut update_status = None;
36 if let Ok(mut connecting_res) = target.get(raccord::Connecting)?.await {
37 if connecting_res.status().is_success() {
38 let content_type = connecting_res
39 .headers()
40 .get("content-type")
41 .and_then(|s| s.to_str().ok())
42 .and_then(|s| mime::Mime::from_str(s).ok())
43 .unwrap_or(mime::APPLICATION_OCTET_STREAM);
44
45 if content_type == mime::APPLICATION_JSON {
46 let presence: raccord::Presence = connecting_res.json()?;
47
48 update_status = Some(UpdateStatusInfo {
49 afk: presence.afk.unwrap_or(true),
50 since: presence.since,
51 status: presence.status.unwrap_or(Status::Online),
52 activities: presence.activity.map(|activity| {
53 let (kind, name) = match activity {
54 raccord::Activity::Playing { name } => {
55 (ActivityType::Playing, name)
56 }
57 raccord::Activity::Streaming { name } => {
58 (ActivityType::Streaming, name)
59 }
60 raccord::Activity::Listening { name } => {
61 (ActivityType::Listening, name)
62 }
63 raccord::Activity::Watching { name } => {
64 (ActivityType::Watching, name)
65 }
66 raccord::Activity::Custom { name } => (ActivityType::Custom, name),
67 };
68
69 vec![Activity {
70 application_id: None,
71 assets: None,
72 created_at: None,
73 details: None,
74 emoji: None,
75 flags: None,
76 id: None,
77 instance: None,
78 party: None,
79 secrets: None,
80 state: None,
81 timestamps: None,
82 url: None,
83 kind,
84 name,
85 }]
86 }),
87 });
88 }
89 }
90 }
91
92 let mut config = Cluster::builder(
94 &token,
95 Intents::DIRECT_MESSAGES | Intents::GUILD_MESSAGES | Intents::GUILD_MEMBERS,
96 );
97
98 if let Some(presence) = update_status {
99 config = config.presence(presence);
100 }
101
102 let cluster = config.build().await?;
103
104 let cluster_spawn = cluster.clone();
105 spawn(async move {
106 cluster_spawn.up().await;
107 });
108
109 let http = HttpClient::new(&token);
110
111 let cache = InMemoryCache::builder()
112 .event_types(
113 EventType::MESSAGE_CREATE
114 | EventType::MESSAGE_DELETE
115 | EventType::MESSAGE_DELETE_BULK
116 | EventType::MESSAGE_UPDATE
117 | EventType::MEMBER_ADD
118 | EventType::MEMBER_CHUNK
119 | EventType::MEMBER_UPDATE
120 | EventType::MEMBER_REMOVE,
121 )
122 .build();
123
124 Ok(Self {
125 cache,
126 cluster,
127 http,
128 })
129 }
130
131 pub async fn worker(
132 self,
133 target: Arc<raccord::Client>,
134 ghosts: Receiver<(u64, Event)>,
135 player: Sender<Stage>,
136 ) -> Result<(), Box<dyn Error + Send + Sync>> {
137 let solids = self.cluster.events();
138 let mut events = solids.merge(ghosts);
139
140 while let Some((shard_id, event)) = events.next().await {
141 spawn(handle_event(
142 self.cache.clone(),
143 target.clone(),
144 shard_id,
145 event,
146 player.clone(),
147 ));
148 }
149
150 Ok(())
151 }
152}
153
154pub async fn handle_event(
155 cache: InMemoryCache,
156 target: Arc<raccord::Client>,
157 shard_id: u64,
158 event: Event,
159 player: Sender<Stage>,
160) {
161 if let Err(err) = try_event(cache, target, shard_id, event, player).await {
162 error!("got error while handling event:\n{}", err);
163 }
164}
165
166pub async fn try_event(
167 cache: InMemoryCache,
168 target: Arc<raccord::Client>,
169 shard_id: u64,
170 event: Event,
171 player: Sender<Stage>,
172) -> Result<(), Box<dyn Error + Send + Sync>> {
173 trace!("updating twilight cache");
174 cache.update(&event);
175
176 match event {
177 Event::MessageCreate(message) if message.guild_id.is_some() => {
178 debug!("received guild message create");
179 let msg = raccord::ServerMessage::from(&**message);
180 let res = if let Some(command) = target.parse_command(&msg.content) {
181 trace!("submitting act: {:?}", command);
182 target.post(raccord::Command {
183 command,
184 message: msg,
185 })
186 } else {
187 trace!("submitting act: {:?}", msg);
188 target.post(msg)
189 }?
190 .await?;
191 trace!("handing off response: {:?}", res);
192 handle_response(
193 res,
194 player,
195 Some(message.guild_id.unwrap()),
196 Some(message.channel_id),
197 None,
198 )
199 .await?;
200 }
201 Event::MessageCreate(message) => {
202 debug!("received direct message create");
203 let msg = raccord::DirectMessage::from(&**message);
204 let res = if let Some(command) = target.parse_command(&msg.content) {
205 trace!("submitting act: {:?}", command);
206 target.post(raccord::Command {
207 command,
208 message: msg,
209 })
210 } else {
211 trace!("submitting act: {:?}", msg);
212 target.post(msg)
213 }?
214 .await?;
215 trace!("handing off response: {:?}", res);
216 handle_response(res, player, None, Some(message.channel_id), None).await?;
217 }
218 Event::MemberAdd(mem) => {
219 debug!("received guild member join");
220 let member = raccord::Member::from(&**mem);
221 trace!("submitting act: {:?}", member);
222 let res = target.post(raccord::ServerJoin(member))?.await?;
223 trace!("handing off response: {:?}", res);
224 handle_response(res, player, Some(mem.guild_id), None, None).await?;
225 }
226 Event::ShardConnected(_) => {
227 info!("connected on shard {}", shard_id);
228 let res = target.post(raccord::Connected { shard: shard_id })?.await?;
229 trace!("handing off response: {:?}", res);
230 handle_response(res, player, None, None, None).await?;
231 }
232 _ => {}
233 }
234
235 Ok(())
236}
237
238async fn handle_response<T: Debug + Read + AsyncRead + Unpin>(
239 mut res: Response<T>,
240 player: Sender<Stage>,
241 from_server: Option<GuildId>,
242 from_channel: Option<ChannelId>,
243 _from_user: Option<UserId>,
244) -> Result<(), Box<dyn Error + Send + Sync>> {
245 let status = res.status();
246 if status.is_informational() {
247 warn!("unhandled information code {:?}", status);
248 }
249
250 if status == 204 || status == 404 {
251 trace!("no action response: {:?}", status);
253 return Ok(());
254 }
255
256 if status.is_redirection() {
257 match status.into() {
258 300 => warn!("TODO: multiple choice (http 300) is not designed yet"),
259 301 | 302 | 303 | 307 | 308 => unreachable!("redirects should be handled by curl"),
260 304 => error!("http 304 response caching not implemented"),
261 305 | 306 => error!("proxy redirections (http 305 and 306) unsupported"),
262 _ => error!("invalid 3xx code"),
263 }
264
265 return Ok(());
266 }
267
268 if status.is_client_error() || status.is_server_error() {
269 error!(
270 "error {:?} from target, TODO: more error handling here",
271 status
272 );
273 return Ok(());
274 }
275
276 if !status.is_success() {
277 error!("invalid response status: {:?}", status);
278 return Ok(());
279 }
280
281 let content_type = res
282 .headers()
283 .get("content-type")
284 .and_then(|s| s.to_str().ok())
285 .and_then(|s| mime::Mime::from_str(s).ok())
286 .unwrap_or(mime::APPLICATION_OCTET_STREAM);
287 trace!("receiving content of type: {:?}", content_type);
288
289 match (content_type.type_(), content_type.subtype()) {
290 (mime::APPLICATION, mime::JSON) => {
291 let default_server_id = res
292 .headers()
293 .get("accord-server-id")
294 .and_then(|h| h.to_str().ok())
295 .and_then(|s| u64::from_str(s).ok())
296 .map(GuildId)
297 .or(from_server);
298 let default_channel_id = res
299 .headers()
300 .get("accord-channel-id")
301 .and_then(|h| h.to_str().ok())
302 .and_then(|s| u64::from_str(s).ok())
303 .map(ChannelId)
304 .or(from_channel);
305 let has_content_length = res
306 .headers()
307 .get("content-length")
308 .and_then(|s| s.to_str().ok())
309 .and_then(|s| usize::from_str(s).ok())
310 .unwrap_or(0) > 0;
311
312 if has_content_length {
313 info!("response has content-length, parsing single act");
314 let act: Act = res.json()?;
315 trace!("parsed act: {:?}", &act);
316 player
317 .send(Stage {
318 act,
319 default_server_id,
320 default_channel_id,
321 })
322 .await?;
323 } else {
324 info!("response has no content-length, streaming multiple acts");
325 let mut lines = BufReader::new(res.into_body()).lines();
326 loop {
327 if let Some(line) = lines.next().await {
328 let line = line?;
329 trace!("got line: {:?}", line);
330 let act: Act = serde_json::from_str(line.trim())?;
331 trace!("parsed act: {:?}", &act);
332 player
333 .send(Stage {
334 act,
335 default_server_id,
336 default_channel_id,
337 })
338 .await?;
339 } else {
340 break;
341 }
342 }
343 info!("done streaming");
344 }
345 }
346 (mime::TEXT, mime::PLAIN) => {
347 let content = res.text()?;
348 let header_channel = res
349 .headers()
350 .get("accord-channel-id")
351 .and_then(|h| h.to_str().ok())
352 .and_then(|s| u64::from_str(s).ok());
353
354 player
355 .send(Stage {
356 act: Act::CreateMessage {
357 content,
358 channel_id: header_channel,
359 },
360 default_server_id: from_server,
361 default_channel_id: from_channel,
362 })
363 .await?;
364 }
365 (t, s) => warn!("unhandled content-type {}/{}", t, s),
366 }
367
368 Ok(())
369}