1pub mod handlers;
2pub mod wheel;
3
4use crate::{
5 io,
6 proto::{self, CommandKind, Event, EventKind, Rpc},
7 types::ErrorPayloadStack,
8 Error,
9};
10use crossbeam_channel as cc;
11
12#[derive(Debug)]
14pub enum DiscordMsg {
15 Event(Event),
16 Error(Error),
17}
18
19#[async_trait::async_trait]
20pub trait DiscordHandler: Send + Sync {
21 async fn on_message(&self, msg: DiscordMsg);
23}
24
25pub(crate) fn handler_task(
29 handler: Box<dyn DiscordHandler>,
30 subscriptions: crate::Subscriptions,
31 stx: cc::Sender<Option<Vec<u8>>>,
32 mut rrx: tokio::sync::mpsc::Receiver<io::IoMsg>,
33 state: crate::State,
34) -> tokio::task::JoinHandle<()> {
35 tokio::task::spawn(async move {
36 tracing::debug!("starting handler loop");
37
38 let pop_nonce = |nonce: usize| -> Option<crate::NotifyItem> {
39 let mut lock = state.notify_queue.lock();
40
41 lock.iter()
42 .position(|item| item.nonce == nonce)
43 .map(|position| lock.swap_remove(position))
44 };
45
46 let (user_tx, mut user_rx) = tokio::sync::mpsc::unbounded_channel();
49 let user_task = tokio::task::spawn(async move {
50 while let Some(msg) = user_rx.recv().await {
51 handler.on_message(msg).await;
52 }
53 });
54
55 macro_rules! user_send {
56 ($msg:expr) => {
57 if user_tx.send($msg).is_err() {
58 tracing::warn!("user handler task has been dropped");
59 }
60 };
61 }
62
63 while let Some(io_msg) = rrx.recv().await {
64 let msg = match io_msg {
65 io::IoMsg::Disconnected(err) => {
66 user_send!(DiscordMsg::Event(Event::Disconnected { reason: err }));
67 continue;
68 }
69 io::IoMsg::Frame(frame) => process_frame(frame),
70 };
71
72 match msg {
73 Msg::Event(event) => {
74 if let Event::Ready { .. } = &event {
75 subscribe_task(subscriptions, stx.clone());
79 }
80
81 user_send!(DiscordMsg::Event(event));
82 }
83 Msg::Command { command, kind } => {
84 if let crate::proto::Command::Subscribe { evt } = &command.inner {
86 tracing::debug!(event = ?evt, "subscription succeeded");
87 continue;
88 }
89
90 match pop_nonce(command.nonce) {
91 Some(ni) => {
92 if ni
93 .tx
94 .send(if ni.cmd == kind {
95 Ok(command.inner)
96 } else {
97 Err(Error::Discord(crate::DiscordErr::MismatchedResponse {
98 expected: ni.cmd,
99 actual: kind,
100 nonce: command.nonce,
101 }))
102 })
103 .is_err()
104 {
105 tracing::warn!(
106 cmd = ?kind,
107 nonce = command.nonce,
108 "command response dropped as receiver was closed",
109 );
110 }
111 }
112 None => {
113 tracing::warn!(
114 cmd = ?command.inner,
115 nonce = command.nonce,
116 "received a command response with an unknown nonce",
117 );
118 }
119 }
120 }
121 Msg::Error { nonce, error, .. } => match nonce {
122 Some(nonce) => match pop_nonce(nonce) {
123 Some(ni) => {
124 if let Err(err) = ni.tx.send(Err(error)) {
125 tracing::warn!(
126 error = ?err,
127 nonce = nonce,
128 "error result dropped as receiver was closed",
129 );
130 }
131 }
132 None => {
133 user_send!(DiscordMsg::Error(error));
134 }
135 },
136 None => {
137 user_send!(DiscordMsg::Error(error));
138 }
139 },
140 }
141 }
142
143 drop(user_tx);
144 let _ = user_task.await;
145 })
146}
147
148#[derive(Debug)]
149pub(crate) enum Msg {
150 Command {
151 command: proto::command::CommandFrame,
152 kind: CommandKind,
153 },
154 Event(Event),
155 Error {
156 nonce: Option<usize>,
157 error: Error,
158 },
159}
160
161fn process_frame(data_buf: Vec<u8>) -> Msg {
162 #[derive(serde::Deserialize)]
170 struct RawMsg {
171 cmd: Option<CommandKind>,
172 evt: Option<EventKind>,
173 #[serde(deserialize_with = "crate::util::string::deserialize_opt")]
174 nonce: Option<usize>,
175 }
176
177 let rm: RawMsg = match serde_json::from_slice(&data_buf) {
178 Ok(f) => f,
179 Err(e) => {
180 tracing::warn!(
181 "Failed to deserialize message: {} {}",
182 e,
183 std::str::from_utf8(&data_buf).unwrap(),
184 );
185
186 return Msg::Error {
187 nonce: None,
188 error: Error::Json(e),
189 };
190 }
191 };
192
193 match rm.evt {
194 Some(EventKind::Error) => {
195 #[derive(serde::Deserialize)]
196 struct ErrorMsg<'stack> {
197 #[serde(borrow)]
198 data: Option<ErrorPayloadStack<'stack>>,
199 }
200
201 match serde_json::from_slice::<ErrorMsg<'_>>(&data_buf) {
202 Ok(em) => Msg::Error {
203 nonce: rm.nonce,
204 error: Error::Discord(crate::DiscordErr::Api(em.data.into())),
205 },
206 Err(e) => Msg::Error {
207 nonce: rm.nonce,
208 error: Error::Discord(crate::DiscordErr::Api(crate::DiscordApiErr::Generic {
209 code: None,
210 message: Some(format!("failed to deserialize error: {}", e)),
211 })),
212 },
213 }
214 }
215 Some(_) => match serde_json::from_slice::<proto::event::EventFrame>(&data_buf) {
216 Ok(event_frame) => Msg::Event(event_frame.inner),
217 Err(e) => {
218 tracing::warn!(
219 "failed to deserialize event: {:?}",
220 std::str::from_utf8(&data_buf)
221 );
222 Msg::Error {
223 nonce: rm.nonce,
224 error: Error::Json(e),
225 }
226 }
227 },
228 None => match serde_json::from_slice(&data_buf) {
229 Ok(cmd_frame) => Msg::Command {
230 command: cmd_frame,
231 kind: rm
232 .cmd
233 .expect("successfully deserialized command with 'cmd' field"),
234 },
235 Err(e) => {
236 tracing::warn!(
237 "failed to deserialize command: {:?}",
238 std::str::from_utf8(&data_buf)
239 );
240
241 Msg::Error {
242 nonce: rm.nonce,
243 error: Error::Json(e),
244 }
245 }
246 },
247 }
248}
249
250fn subscribe_task(subs: crate::Subscriptions, stx: cc::Sender<Option<Vec<u8>>>) {
251 tokio::task::spawn(async move {
252 let mut buffer = Vec::with_capacity(1024);
255 let mut nonce = 1usize;
256
257 let mut push = |kind: EventKind| {
258 #[cfg(target_pointer_width = "32")]
259 let nunce = 0x10000000 | nonce;
260 #[cfg(target_pointer_width = "64")]
261 let nunce = 0x1000000000000000 | nonce;
262
263 let _ = io::serialize_message(
264 io::OpCode::Frame,
265 &Rpc::<()> {
266 cmd: CommandKind::Subscribe,
267 evt: Some(kind),
268 nonce: nunce.to_string(),
269 args: None,
270 },
271 &mut buffer,
272 );
273
274 nonce += 1;
275 };
276
277 let activity = if subs.contains(crate::Subscriptions::ACTIVITY) {
278 [
279 EventKind::ActivityInvite,
280 EventKind::ActivityJoin,
281 EventKind::ActivityJoinRequest,
282 EventKind::ActivitySpectate,
283 ]
284 .iter()
285 } else {
286 [].iter()
287 };
288
289 let user = if subs.contains(crate::Subscriptions::USER) {
290 [EventKind::CurrentUserUpdate].iter()
291 } else {
292 [].iter()
293 };
294
295 let relations = if subs.contains(crate::Subscriptions::RELATIONSHIPS) {
296 [EventKind::RelationshipUpdate].iter()
297 } else {
298 [].iter()
299 };
300
301 activity.chain(user).chain(relations).for_each(|kind| {
302 push(*kind);
303 });
304
305 if subs.contains(crate::Subscriptions::OVERLAY) {
308 #[cfg(target_pointer_width = "32")]
309 let nunce = 0x10000000 | nonce;
310 #[cfg(target_pointer_width = "64")]
311 let nunce = 0x1000000000000000 | nonce;
312
313 let _ = io::serialize_message(
314 io::OpCode::Frame,
315 &Rpc {
316 cmd: CommandKind::Subscribe,
317 evt: Some(EventKind::OverlayUpdate),
318 nonce: nunce.to_string(),
319 args: Some(crate::overlay::OverlayPidArgs::new()),
320 },
321 &mut buffer,
322 );
323
324 }
326
327 if stx.send(Some(buffer)).is_err() {
328 tracing::warn!("unable to send subscription RPCs to I/O task");
329 }
330 });
331}