discord_sdk/
handler.rs

1pub mod handlers;
2pub mod wheel;
3
4use crate::{
5    io,
6    proto::{self, CommandKind, Event, EventKind, Rpc},
7    types::ErrorPayloadStack,
8    Error,
9};
10use crossbeam_channel as cc;
11
/// An event or error sent from Discord
#[derive(Debug)]
pub enum DiscordMsg {
    /// An [`Event`] that is passed on to the user's [`DiscordHandler`]
    Event(Event),
    /// An [`Error`] that is passed on to the user's [`DiscordHandler`]
    /// instead of being delivered to a pending command
    Error(Error),
}
18
/// Implemented by the receiver of the messages coming from Discord.
///
/// The handler is moved into its own task, which awaits
/// [`DiscordHandler::on_message`] once for every message, one at a time.
#[async_trait::async_trait]
pub trait DiscordHandler: Send + Sync {
    /// Method called when an [`Event`] or [`Error`] is received from Discord
    async fn on_message(&self, msg: DiscordMsg);
}
24
25/// Creates a task which receives raw frame buffers and deserializes them, and either
26/// notifying the awaiting oneshot for a command response, or in the case of events,
27/// broadcasting the event to
28pub(crate) fn handler_task(
29    handler: Box<dyn DiscordHandler>,
30    subscriptions: crate::Subscriptions,
31    stx: cc::Sender<Option<Vec<u8>>>,
32    mut rrx: tokio::sync::mpsc::Receiver<io::IoMsg>,
33    state: crate::State,
34) -> tokio::task::JoinHandle<()> {
35    tokio::task::spawn(async move {
36        tracing::debug!("starting handler loop");
37
38        let pop_nonce = |nonce: usize| -> Option<crate::NotifyItem> {
39            let mut lock = state.notify_queue.lock();
40
41            lock.iter()
42                .position(|item| item.nonce == nonce)
43                .map(|position| lock.swap_remove(position))
44        };
45
46        // Shunt the user handler to a separate task so that we don't care about it blocking
47        // when handling events
48        let (user_tx, mut user_rx) = tokio::sync::mpsc::unbounded_channel();
49        let user_task = tokio::task::spawn(async move {
50            while let Some(msg) = user_rx.recv().await {
51                handler.on_message(msg).await;
52            }
53        });
54
55        macro_rules! user_send {
56            ($msg:expr) => {
57                if user_tx.send($msg).is_err() {
58                    tracing::warn!("user handler task has been dropped");
59                }
60            };
61        }
62
63        while let Some(io_msg) = rrx.recv().await {
64            let msg = match io_msg {
65                io::IoMsg::Disconnected(err) => {
66                    user_send!(DiscordMsg::Event(Event::Disconnected { reason: err }));
67                    continue;
68                }
69                io::IoMsg::Frame(frame) => process_frame(frame),
70            };
71
72            match msg {
73                Msg::Event(event) => {
74                    if let Event::Ready { .. } = &event {
75                        // Spawn a task that subscribes to all of the events
76                        // that the caller was interested in when we've finished
77                        // the handshake with Discord
78                        subscribe_task(subscriptions, stx.clone());
79                    }
80
81                    user_send!(DiscordMsg::Event(event));
82                }
83                Msg::Command { command, kind } => {
84                    // Some commands can also be turned into events for consistency
85                    if let crate::proto::Command::Subscribe { evt } = &command.inner {
86                        tracing::debug!(event = ?evt, "subscription succeeded");
87                        continue;
88                    }
89
90                    match pop_nonce(command.nonce) {
91                        Some(ni) => {
92                            if ni
93                                .tx
94                                .send(if ni.cmd == kind {
95                                    Ok(command.inner)
96                                } else {
97                                    Err(Error::Discord(crate::DiscordErr::MismatchedResponse {
98                                        expected: ni.cmd,
99                                        actual: kind,
100                                        nonce: command.nonce,
101                                    }))
102                                })
103                                .is_err()
104                            {
105                                tracing::warn!(
106                                    cmd = ?kind,
107                                    nonce = command.nonce,
108                                    "command response dropped as receiver was closed",
109                                );
110                            }
111                        }
112                        None => {
113                            tracing::warn!(
114                                cmd = ?command.inner,
115                                nonce = command.nonce,
116                                "received a command response with an unknown nonce",
117                            );
118                        }
119                    }
120                }
121                Msg::Error { nonce, error, .. } => match nonce {
122                    Some(nonce) => match pop_nonce(nonce) {
123                        Some(ni) => {
124                            if let Err(err) = ni.tx.send(Err(error)) {
125                                tracing::warn!(
126                                    error = ?err,
127                                    nonce = nonce,
128                                    "error result dropped as receiver was closed",
129                                );
130                            }
131                        }
132                        None => {
133                            user_send!(DiscordMsg::Error(error));
134                        }
135                    },
136                    None => {
137                        user_send!(DiscordMsg::Error(error));
138                    }
139                },
140            }
141        }
142
143        drop(user_tx);
144        let _ = user_task.await;
145    })
146}
147
/// The outcome of classifying a single raw frame, see [`process_frame`]
#[derive(Debug)]
pub(crate) enum Msg {
    /// A frame without an `evt` field that deserialized as a command frame
    Command {
        /// The deserialized command frame
        command: proto::command::CommandFrame,
        /// The kind read from the frame's `cmd` field
        kind: CommandKind,
    },
    /// A frame that deserialized as an event
    Event(Event),
    /// Either an error event sent by Discord, or a frame that failed to
    /// deserialize
    Error {
        /// The nonce carried by the frame, if one was present and readable
        nonce: Option<usize>,
        /// The error to deliver
        error: Error,
    },
}
160
161fn process_frame(data_buf: Vec<u8>) -> Msg {
162    // Discord echoes back our requests with the same nonce they were sent
163    // with, however for those echoes, the "evt" field is not set, other than
164    // for the "ERROR" RPC type, so we attempt to deserialize those two
165    // cases first so we can just ignore the echoes and move on to avoid
166    // further complicating the deserialization of the RPCs we actually
167    // care about
168
169    #[derive(serde::Deserialize)]
170    struct RawMsg {
171        cmd: Option<CommandKind>,
172        evt: Option<EventKind>,
173        #[serde(deserialize_with = "crate::util::string::deserialize_opt")]
174        nonce: Option<usize>,
175    }
176
177    let rm: RawMsg = match serde_json::from_slice(&data_buf) {
178        Ok(f) => f,
179        Err(e) => {
180            tracing::warn!(
181                "Failed to deserialize message: {} {}",
182                e,
183                std::str::from_utf8(&data_buf).unwrap(),
184            );
185
186            return Msg::Error {
187                nonce: None,
188                error: Error::Json(e),
189            };
190        }
191    };
192
193    match rm.evt {
194        Some(EventKind::Error) => {
195            #[derive(serde::Deserialize)]
196            struct ErrorMsg<'stack> {
197                #[serde(borrow)]
198                data: Option<ErrorPayloadStack<'stack>>,
199            }
200
201            match serde_json::from_slice::<ErrorMsg<'_>>(&data_buf) {
202                Ok(em) => Msg::Error {
203                    nonce: rm.nonce,
204                    error: Error::Discord(crate::DiscordErr::Api(em.data.into())),
205                },
206                Err(e) => Msg::Error {
207                    nonce: rm.nonce,
208                    error: Error::Discord(crate::DiscordErr::Api(crate::DiscordApiErr::Generic {
209                        code: None,
210                        message: Some(format!("failed to deserialize error: {}", e)),
211                    })),
212                },
213            }
214        }
215        Some(_) => match serde_json::from_slice::<proto::event::EventFrame>(&data_buf) {
216            Ok(event_frame) => Msg::Event(event_frame.inner),
217            Err(e) => {
218                tracing::warn!(
219                    "failed to deserialize event: {:?}",
220                    std::str::from_utf8(&data_buf)
221                );
222                Msg::Error {
223                    nonce: rm.nonce,
224                    error: Error::Json(e),
225                }
226            }
227        },
228        None => match serde_json::from_slice(&data_buf) {
229            Ok(cmd_frame) => Msg::Command {
230                command: cmd_frame,
231                kind: rm
232                    .cmd
233                    .expect("successfully deserialized command with 'cmd' field"),
234            },
235            Err(e) => {
236                tracing::warn!(
237                    "failed to deserialize command: {:?}",
238                    std::str::from_utf8(&data_buf)
239                );
240
241                Msg::Error {
242                    nonce: rm.nonce,
243                    error: Error::Json(e),
244                }
245            }
246        },
247    }
248}
249
250fn subscribe_task(subs: crate::Subscriptions, stx: cc::Sender<Option<Vec<u8>>>) {
251    tokio::task::spawn(async move {
252        // Assume a max of 64KiB write size and just write all of the
253        // subscriptions into a single buffer rather than n
254        let mut buffer = Vec::with_capacity(1024);
255        let mut nonce = 1usize;
256
257        let mut push = |kind: EventKind| {
258            #[cfg(target_pointer_width = "32")]
259            let nunce = 0x10000000 | nonce;
260            #[cfg(target_pointer_width = "64")]
261            let nunce = 0x1000000000000000 | nonce;
262
263            let _ = io::serialize_message(
264                io::OpCode::Frame,
265                &Rpc::<()> {
266                    cmd: CommandKind::Subscribe,
267                    evt: Some(kind),
268                    nonce: nunce.to_string(),
269                    args: None,
270                },
271                &mut buffer,
272            );
273
274            nonce += 1;
275        };
276
277        let activity = if subs.contains(crate::Subscriptions::ACTIVITY) {
278            [
279                EventKind::ActivityInvite,
280                EventKind::ActivityJoin,
281                EventKind::ActivityJoinRequest,
282                EventKind::ActivitySpectate,
283            ]
284            .iter()
285        } else {
286            [].iter()
287        };
288
289        let user = if subs.contains(crate::Subscriptions::USER) {
290            [EventKind::CurrentUserUpdate].iter()
291        } else {
292            [].iter()
293        };
294
295        let relations = if subs.contains(crate::Subscriptions::RELATIONSHIPS) {
296            [EventKind::RelationshipUpdate].iter()
297        } else {
298            [].iter()
299        };
300
301        activity.chain(user).chain(relations).for_each(|kind| {
302            push(*kind);
303        });
304
305        // Unlike EVERY other event, subscribing to OVERLAY_UPDATE requires
306        // an argument... :facepalm:
307        if subs.contains(crate::Subscriptions::OVERLAY) {
308            #[cfg(target_pointer_width = "32")]
309            let nunce = 0x10000000 | nonce;
310            #[cfg(target_pointer_width = "64")]
311            let nunce = 0x1000000000000000 | nonce;
312
313            let _ = io::serialize_message(
314                io::OpCode::Frame,
315                &Rpc {
316                    cmd: CommandKind::Subscribe,
317                    evt: Some(EventKind::OverlayUpdate),
318                    nonce: nunce.to_string(),
319                    args: Some(crate::overlay::OverlayPidArgs::new()),
320                },
321                &mut buffer,
322            );
323
324            //nonce += 1;
325        }
326
327        if stx.send(Some(buffer)).is_err() {
328            tracing::warn!("unable to send subscription RPCs to I/O task");
329        }
330    });
331}