Skip to main content

filthy_rich/
runner.rs

1use std::{sync::Arc, time::Duration};
2use tokio::{sync::mpsc, task::JoinHandle, time::sleep};
3
4use crate::{
5    PresenceClient,
6    errors::{DisconnectReason, DiscordSockError, PresenceRunnerError},
7    socket::{DiscordSock, Opcode},
8    types::{
9        ActivitySpec, DynamicRPCFrame, IPCCommand, ReadyRPCFrame,
10        data::{ActivityResponseData, ReadyData},
11    },
12    utils::get_current_timestamp,
13};
14
/// Optional, cheaply clonable event callback taking a single argument of type `T`.
///
/// The trait object is implicitly `'static` (the default object bound inside `Arc`), and must be
/// `Send + Sync` so that a clone can be moved into a spawned task.
type Callback<T> = Option<Arc<dyn Fn(T) + Send + Sync>>;
16
/// Generates a consuming builder method that stores a callback in the field of the same name.
///
/// * `$name` - both the method name and the `Callback` field it assigns.
/// * `$arg`  - the argument type the callback receives.
/// * `$doc`  - the documentation string attached to the generated method.
///
/// The generated method is `#[must_use]`, matching the other builder-style methods of the
/// runner: dropping the returned value would silently discard the configured runner.
macro_rules! impl_callback {
    ($name:ident, $arg:ty, $doc:expr) => {
        #[doc = $doc]
        #[must_use]
        pub fn $name<F>(mut self, f: F) -> Self
        where
            F: Fn($arg) + Send + Sync + 'static,
        {
            self.$name = Some(Arc::new(f));
            self
        }
    };
}
26
/// Performs one retry step: waits, bumps the counter, then notifies the retry callback.
///
/// Expects a `RETRY_DELAY` constant to be in scope at the call site. When a callback is set,
/// a clone of it is run on a freshly spawned task and receives the already-incremented counter.
macro_rules! retry {
    ($retries:ident, $on_retry:expr) => {
        sleep(RETRY_DELAY).await;
        $retries += 1;
        if let Some(callback) = $on_retry.as_ref().cloned() {
            tokio::spawn(async move { callback($retries) });
        }
    };
}
37
38/// A runner that manages the Discord RPC background task.
39/// Create a runner, configure it, run it to get a client handle, then clone the handle for sharing.
40pub struct PresenceRunner {
41    rx: Option<tokio::sync::mpsc::Receiver<IPCCommand>>,
42    client: PresenceClient,
43    join_handle: Option<JoinHandle<()>>,
44    on_ready: Callback<ReadyData>,
45    on_activity_send: Callback<ActivityResponseData>,
46    on_disconnect: Callback<DisconnectReason>,
47    on_retry: Callback<usize>,
48    show_errors: bool,
49    max_retries: usize,
50}
51
52impl PresenceRunner {
53    #[must_use]
54    /// Create a new [`PresenceRunner`] instance. Requires the client ID of your chosen app from the
55    /// [Discord Developer Portal](https://discord.com/developers/applications).
56    pub fn new(client_id: impl Into<String>) -> Self {
57        let (tx, rx) = mpsc::channel(32);
58        let client = PresenceClient {
59            tx,
60            client_id: client_id.into(),
61        };
62
63        Self {
64            rx: Some(rx),
65            client,
66            join_handle: None,
67            on_ready: None,
68            on_activity_send: None,
69            on_disconnect: None,
70            on_retry: None,
71            show_errors: false,
72            max_retries: 0,
73        }
74    }
75
76    impl_callback!(
77        on_ready,
78        ReadyData,
79        "Runs a particular closure after receiving a READY event.
80
81This can fire multiple times depending on how many times the client
82needs to disconnect and reconnect."
83    );
84
85    impl_callback!(
86        on_activity_send,
87        ActivityResponseData,
88        "Run a particular closure after ensuring that a [`PresenceClient::set_activity`] has successfully managed to pass its data through the IPC channel.
89
90This can fire multiple times."
91    );
92
93    impl_callback!(
94        on_disconnect,
95        DisconnectReason,
96        "Runs a particular closure after the RPC connection is lost.
97
98Unlike `on_retry`, this fires only when a previously initialized connection drops, which means that you should prefer
99`on_retry` instead if you want more accurate responses to the initial connect or handshake failures, and `on_disconnect`
100for reacting to post-connection failures.
101
102This can fire multiple times depending on how many times the client disconnects and reconnects again."
103    );
104
105    impl_callback!(
106        on_retry,
107        usize,
108        "Runs a particular closure when retrying for socket creation or handshake.
109
110This can fire multiple times, or for a limited amount depending on the amount of maximum
111retries that has been set (through [`PresenceRunner::set_max_retries`]).
112
113The closure parameter is the count of retries done at the time of its execution."
114    );
115
116    /// Enable verbose error logging over [`std::io::stderr`] writes for RPC and code events.
117    #[must_use]
118    pub fn show_errors(mut self) -> Self {
119        self.show_errors = true;
120        self
121    }
122
123    /// Sets the amount of maximum retries to do on socket creation and handshakes before the runner should give up.
124    ///
125    /// By default this is set to `0` internally, which means the inner loop would retry indefinitely.
126    #[must_use]
127    pub fn set_max_retries(mut self, count: usize) -> Self {
128        self.max_retries = count;
129        self
130    }
131
132    /// Run the runner.
133    /// Must be called before any client handle operations.
134    pub async fn run(
135        &mut self,
136        wait_for_ready: bool,
137    ) -> Result<&PresenceClient, PresenceRunnerError> {
138        if self.join_handle.is_some() {
139            return Err(PresenceRunnerError::MultipleRun);
140        }
141
142        let client_id = self.client.client_id.clone();
143        let show_errors = self.show_errors;
144        let max_retries = self.max_retries;
145
146        let mut rx = self
147            .rx
148            .take()
149            .ok_or_else(|| PresenceRunnerError::ReceiverError)?;
150
151        // oneshot channel to signal when READY is received the first time
152        let (ready_tx, ready_rx) = tokio::sync::oneshot::channel::<()>();
153
154        // executable closers (executed within the loop)
155        let on_ready = self.on_ready.take();
156        let on_activity_send = self.on_activity_send.take();
157        let on_disconnect = self.on_disconnect.take();
158        let on_retry = self.on_retry.take();
159
160        let join_handle = tokio::spawn(async move {
161            const RETRY_DELAY: Duration = Duration::from_secs(1);
162            let mut last_activity: Option<ActivitySpec> = None;
163            let mut ready_tx = Some(ready_tx);
164            let mut retries = 0;
165
166            let mut session_start: Option<u64> = None;
167
168            'outer: loop {
169                if max_retries != 0 && retries >= max_retries {
170                    break;
171                }
172
173                // initial connect
174                let mut socket = match DiscordSock::new().await {
175                    Ok(s) => s,
176                    Err(_) => {
177                        retry!(retries, on_retry);
178                        continue;
179                    }
180                };
181
182                // initial handshake
183                if socket.do_handshake(&client_id).await.is_err() {
184                    retry!(retries, on_retry);
185                    continue;
186                }
187
188                // ready loop
189                loop {
190                    let frame = match socket.read_frame().await {
191                        Ok(f) => f,
192                        Err(_) => {
193                            retry!(retries, on_retry);
194                            continue 'outer;
195                        }
196                    };
197
198                    if Opcode::Frame != frame.opcode {
199                        continue;
200                    }
201
202                    if let Ok(json) = serde_json::from_slice::<ReadyRPCFrame>(&frame.body) {
203                        if json.cmd.as_deref() == Some("DISPATCH")
204                            && json.evt.as_deref() == Some("READY")
205                        {
206                            if let Some(tx) = ready_tx.take() {
207                                let _ = tx.send(());
208                            }
209                            if let Some(f) = &on_ready {
210                                if let Some(data) = json.data {
211                                    let f = f.clone();
212                                    tokio::spawn(async move { f(data) });
213                                }
214                            }
215
216                            // reset retires here
217                            retries = 0;
218                            break;
219                        }
220
221                        if json.evt.as_deref() == Some("ERROR") && show_errors {
222                            eprintln!("Discord RPC ready event receiver error: {:?}", json.data);
223                        }
224                    }
225                }
226
227                // restore last activity (if any)
228                if let Some(activity) = &last_activity {
229                    if let Some(t) = session_start {
230                        if let Err(e) = socket.send_activity(activity.clone(), t).await {
231                            if show_errors {
232                                eprintln!("Discord RPC last activity restore error: {e}")
233                            }
234                        }
235                    }
236                }
237
238                // generic loop for receiving commands and responding to pings from Discord itself
239                let disconnect_reason = loop {
240                    tokio::select! {
241                        cmd = rx.recv() => {
242                            match cmd {
243                                Some(cmd) => {
244                                    match cmd {
245                                        IPCCommand::SetActivity { activity } => {
246                                            let session_start_unpacked = if let Some(s) = session_start {
247                                                s
248                                            } else {
249                                                match get_current_timestamp() {
250                                                    Ok(t) => {
251                                                        session_start = Some(t);
252                                                        t
253                                                    },
254                                                    Err(e) => {
255                                                        if show_errors {
256                                                            eprintln!("Discord RPC pre-send_activity time parsing error: {e}")
257                                                        }
258                                                        break Some(DisconnectReason::OldRelicComputer(e.to_string()));
259                                                    }
260                                                }
261                                            };
262
263                                            let activity = *activity;
264                                            last_activity = Some(activity.clone());
265
266                                            if let Err(e) = socket.send_activity(activity, session_start_unpacked).await {
267                                                if show_errors {
268                                                    eprintln!("Discord RPC send_activity error: {e}");
269                                                }
270                                                break Some(DisconnectReason::SendActivityError(e.to_string()));
271                                            }
272                                        },
273                                        IPCCommand::ClearActivity => {
274                                            last_activity = None;
275                                            session_start = None;
276
277                                            if let Err(e) = socket.clear_activity().await {
278                                                if show_errors {
279                                                    eprintln!("Discord RPC clear_activity error: {e}");
280                                                }
281                                                break Some(DisconnectReason::ClearActivityError(e.to_string()));
282                                            }
283                                        },
284                                        IPCCommand::Close { done_tx }=> {
285                                            let _ = socket.close().await;
286                                            let _ = done_tx.send(());
287                                            break 'outer;
288                                        }
289                                    }
290                                },
291                                None => break Some(DisconnectReason::ClientChannelClosed),
292                            }
293                        }
294
295                        frame = socket.read_frame() => {
296                            match frame {
297                                Ok(frame) => {
298                                    match frame.opcode {
299                                        Opcode::Frame => {
300                                            if let Ok(json) = serde_json::from_slice::<DynamicRPCFrame>(&frame.body) {
301                                                if json.evt.as_deref() == Some("ERROR") && show_errors {
302                                                    eprintln!("Discord RPC DynamicRPCFrame error: {:?}", json.data);
303                                                } else if json.cmd.as_deref() == Some("SET_ACTIVITY") {
304                                                    if let Some(f) = &on_activity_send {
305                                                        if let Some(data) = json.data {
306                                                            let data = serde_json::from_value::<ActivityResponseData>(data);
307
308                                                            if let Ok(d) = data {
309                                                                let f = f.clone();
310                                                                tokio::spawn(async move { f(d)});
311                                                            } else if let Err(e) = data{
312                                                                println!("{e}")
313                                                            }
314                                                        }
315                                                    }
316                                                }
317                                            }
318                                        },
319                                        Opcode::Close => break Some(DisconnectReason::ServerClosed),
320                                        Opcode::Ping => {
321                                            if let Err(e) = socket.send_frame(Opcode::Pong, frame.body).await {
322                                                if show_errors {
323                                                    eprintln!("Discord RPC send_frame error: {e}");
324                                                }
325                                                break Some(DisconnectReason::SendFrameError(e.to_string()));
326                                            }
327                                        },
328                                        _ => {}
329                                    }
330                                },
331                                Err(e) => {
332                                    if show_errors {
333                                        eprintln!("Discord RPC generic frame read error: {e}")
334                                    }
335                                    if let DiscordSockError::IoError(error) = &e {
336                                        if error.kind() == std::io::ErrorKind::UnexpectedEof {
337                                            break Some(DisconnectReason::PeerClosed);
338                                        }
339                                    }
340                                    break Some(DisconnectReason::ReadFrameError(e.to_string()));
341                                },
342                            }
343                        }
344                    }
345                };
346
347                if let Some(f) = &on_disconnect {
348                    f(disconnect_reason.unwrap_or(DisconnectReason::Unknown));
349                }
350
351                sleep(RETRY_DELAY).await;
352            }
353        });
354
355        self.join_handle = Some(join_handle);
356
357        if wait_for_ready {
358            match ready_rx.await {
359                Ok(()) => (),
360                Err(_) => return Err(PresenceRunnerError::ExitBeforeReady),
361            }
362        }
363
364        Ok(&self.client)
365    }
366
367    /// Returns a clone of the client handle for sharing.
368    #[must_use]
369    pub fn clone_handle(&self) -> PresenceClient {
370        self.client.clone()
371    }
372
373    /// Waits for the IPC task to finish.
374    ///
375    /// NOTE: If there's no `join_handle` present, the function will do nothing and
376    /// just return blank.
377    pub async fn wait(&mut self) -> Result<(), PresenceRunnerError> {
378        if let Some(handle) = self.join_handle.take() {
379            handle.await?;
380        }
381
382        Ok(())
383    }
384}