Skip to main content

filthy_rich/
runner.rs

1use std::{sync::Arc, time::Duration};
2use tokio::{
3    sync::mpsc::{self},
4    task::JoinHandle,
5    time::sleep,
6};
7
8use crate::{
9    PresenceClient,
10    errors::{DisconnectReason, DiscordSockError, PresenceRunnerError},
11    socket::{self, DiscordSock, Opcode},
12    types::{
13        ActivitySpec, DynamicRPCFrame, IPCCommand, ReadyRPCFrame,
14        data::{ActivityResponseData, ReadyData},
15    },
16    utils::get_current_timestamp,
17};
18
/// Optional, shareable callback slot: `None` until a closure is registered, otherwise an
/// [`Arc`]-wrapped closure that takes a single `T` and can be sent across threads.
type Callback<T> = Option<Arc<dyn Fn(T) + Send + Sync + 'static>>;
20
/// Type alias for [`PresenceRunner`], so the runner can also be referred to as `RPCRunner`.
pub type RPCRunner = PresenceRunner;
23
/// Generates a consuming builder method named `$name` inside the enclosing `impl`.
///
/// The generated method accepts any `Fn($arg) + Send + Sync + 'static` closure, stores it
/// (wrapped in an `Arc`) in the field that shares the method's name, and returns `self`.
/// `$doc` becomes the rustdoc text of the generated method.
///
/// The method is marked `#[must_use]`, matching the other builder methods: because it takes
/// `self` by value, discarding the return value throws away the whole runner.
macro_rules! impl_callback {
    ($name:ident, $arg:ty, $doc:expr) => {
        #[doc = $doc]
        #[must_use]
        pub fn $name<F>(mut self, f: F) -> Self
        where
            F: Fn($arg) + Send + Sync + 'static,
        {
            self.$name = Some(Arc::new(f));
            self
        }
    };
}
33
/// Performs one retry step: waits for `RETRY_DELAY`, increments the `$retries` counter, and,
/// if `$on_retry` holds a callback, runs it on a freshly spawned task with the new count.
///
/// `sleep`, `RETRY_DELAY` and `tokio` are resolved where the macro is invoked, and the
/// expansion contains an `.await`, so it can only be used inside an async context.
macro_rules! retry {
    ($retries:ident, $on_retry:expr) => {
        sleep(RETRY_DELAY).await;
        $retries += 1;
        if let Some(callback) = &$on_retry {
            let callback = Arc::clone(callback);
            // Snapshot the counter so the spawned task owns its own copy.
            let attempt = $retries;
            tokio::spawn(async move { callback(attempt) });
        }
    };
}
44
45/// A runner that manages the Discord RPC background task.
46/// Create a runner, configure it, run it to get a client handle, then clone the handle for sharing.
47pub struct PresenceRunner {
48    rx: Option<tokio::sync::mpsc::Receiver<IPCCommand>>,
49    client: PresenceClient,
50    join_handle: Option<JoinHandle<()>>,
51    on_ready: Callback<ReadyData>,
52    on_activity_send: Callback<ActivityResponseData>,
53    on_disconnect: Callback<DisconnectReason>,
54    on_retry: Callback<usize>,
55    show_errors: bool,
56    max_retries: usize,
57}
58
59impl PresenceRunner {
60    /// Whether or not a Discord client is available for connection. Returns `false` if there is no
61    /// "running" Discord client and `true` if at least one client is running.
62    #[must_use]
63    pub fn can_connect() -> bool {
64        socket::get_pipe_path().is_some()
65    }
66
67    #[must_use]
68    /// Create a new [`PresenceRunner`] instance. Requires the client ID of your chosen app from the
69    /// [Discord Developer Portal](https://discord.com/developers/applications).
70    pub fn new(client_id: impl Into<String>) -> Self {
71        let (tx, rx) = mpsc::channel(32);
72        let client = PresenceClient {
73            tx,
74            client_id: client_id.into(),
75        };
76
77        Self {
78            rx: Some(rx),
79            client,
80            join_handle: None,
81            on_ready: None,
82            on_activity_send: None,
83            on_disconnect: None,
84            on_retry: None,
85            show_errors: false,
86            max_retries: 0,
87        }
88    }
89
    // Expands to the `on_ready(self, f) -> Self` builder method, which stores `f` in the
    // `on_ready` field. The string literal below becomes the method's rustdoc.
    impl_callback!(
        on_ready,
        ReadyData,
        "Runs a particular closure after receiving a READY event.

This can fire multiple times depending on how many times the client
needs to disconnect and reconnect."
    );
98
    // Expands to the `on_activity_send(self, f) -> Self` builder method, which stores `f` in
    // the `on_activity_send` field. The string literal below becomes the method's rustdoc.
    impl_callback!(
        on_activity_send,
        ActivityResponseData,
        "Run a particular closure after ensuring that a [`PresenceClient::set_activity`] has successfully managed to pass its data through the IPC channel.

This can fire multiple times."
    );
106
    // Expands to the `on_disconnect(self, f) -> Self` builder method, which stores `f` in the
    // `on_disconnect` field. The string literal below becomes the method's rustdoc.
    impl_callback!(
        on_disconnect,
        DisconnectReason,
        "Runs a particular closure after the RPC connection is lost.

Unlike `on_retry`, this fires only when a previously initialized connection drops, which means that you should prefer
`on_retry` instead if you want more accurate responses to the initial connect or handshake failures, and `on_disconnect`
for reacting to post-connection failures.

This can fire multiple times depending on how many times the client disconnects and reconnects again."
    );
118
    // Expands to the `on_retry(self, f) -> Self` builder method, which stores `f` in the
    // `on_retry` field. The string literal below becomes the method's rustdoc.
    impl_callback!(
        on_retry,
        usize,
        "Runs a particular closure when retrying for socket creation or handshake.

This can fire multiple times, or for a limited amount depending on the amount of maximum
retries that has been set (through [`PresenceRunner::set_max_retries`]).

The closure parameter is the count of retries done at the time of its execution."
    );
129
130    /// Enable verbose error logging over [`std::io::stderr`] writes for RPC and code events.
131    #[must_use]
132    pub fn show_errors(mut self) -> Self {
133        self.show_errors = true;
134        self
135    }
136
137    /// Sets the amount of maximum retries to do on socket creation and handshakes before the runner should give up.
138    ///
139    /// By default this is set to `0` internally, which means the inner loop would retry indefinitely.
140    #[must_use]
141    pub fn set_max_retries(mut self, count: usize) -> Self {
142        self.max_retries = count;
143        self
144    }
145
146    /// Run the runner.
147    /// Must be called before any client handle operations.
148    pub async fn run(
149        &mut self,
150        wait_for_ready: bool,
151    ) -> Result<&PresenceClient, PresenceRunnerError> {
152        if self.join_handle.is_some() {
153            return Err(PresenceRunnerError::MultipleRun);
154        }
155
156        let client_id = self.client.client_id.clone();
157        let show_errors = self.show_errors;
158        let max_retries = self.max_retries;
159
160        let mut rx = self
161            .rx
162            .take()
163            .ok_or_else(|| PresenceRunnerError::ReceiverError)?;
164
165        // oneshot channel to signal when READY is received the first time
166        let (ready_tx, ready_rx) = tokio::sync::oneshot::channel::<()>();
167
168        // executable closers (executed within the loop)
169        let on_ready = self.on_ready.take();
170        let on_activity_send = self.on_activity_send.take();
171        let on_disconnect = self.on_disconnect.take();
172        let on_retry = self.on_retry.take();
173
174        let join_handle = tokio::spawn(async move {
175            const RETRY_DELAY: Duration = Duration::from_secs(1);
176            let mut last_activity: Option<ActivitySpec> = None;
177            let mut ready_tx = Some(ready_tx);
178            let mut retries = 0;
179
180            let mut session_start: Option<u64> = None;
181
182            'outer: loop {
183                if max_retries != 0 && retries >= max_retries {
184                    break;
185                }
186
187                // initial connect
188                let mut socket = match DiscordSock::new().await {
189                    Ok(s) => s,
190                    Err(_) => {
191                        retry!(retries, on_retry);
192                        continue;
193                    }
194                };
195
196                // initial handshake
197                if socket.do_handshake(&client_id).await.is_err() {
198                    retry!(retries, on_retry);
199                    continue;
200                }
201
202                // ready loop
203                loop {
204                    let frame = match socket.read_frame().await {
205                        Ok(f) => f,
206                        Err(_) => {
207                            retry!(retries, on_retry);
208                            continue 'outer;
209                        }
210                    };
211
212                    if Opcode::Frame != frame.opcode {
213                        continue;
214                    }
215
216                    if let Ok(json) = serde_json::from_slice::<ReadyRPCFrame>(&frame.body) {
217                        if json.cmd.as_deref() == Some("DISPATCH")
218                            && json.evt.as_deref() == Some("READY")
219                        {
220                            if let Some(tx) = ready_tx.take() {
221                                let _ = tx.send(());
222                            }
223                            if let Some(f) = &on_ready {
224                                if let Some(data) = json.data {
225                                    let f = f.clone();
226                                    tokio::spawn(async move { f(data) });
227                                }
228                            }
229
230                            // reset retires here
231                            retries = 0;
232                            break;
233                        }
234
235                        if json.evt.as_deref() == Some("ERROR") && show_errors {
236                            eprintln!("Discord RPC ready event receiver error: {:?}", json.data);
237                        }
238                    }
239                }
240
241                // restore last activity (if any)
242                if let Some(activity) = &last_activity {
243                    if let Some(t) = session_start {
244                        if let Err(e) = socket.send_activity(activity.clone(), t).await {
245                            if show_errors {
246                                eprintln!("Discord RPC last activity restore error: {e}")
247                            }
248                        }
249                    }
250                }
251
252                // generic loop for receiving commands and responding to pings from Discord itself
253                let disconnect_reason = loop {
254                    tokio::select! {
255                        cmd = rx.recv() => {
256                            match cmd {
257                                Some(cmd) => {
258                                    match cmd {
259                                        IPCCommand::SetActivity { activity } => {
260                                            let session_start_unpacked = if let Some(s) = session_start {
261                                                s
262                                            } else {
263                                                match get_current_timestamp() {
264                                                    Ok(t) => {
265                                                        session_start = Some(t);
266                                                        t
267                                                    },
268                                                    Err(e) => {
269                                                        if show_errors {
270                                                            eprintln!("Discord RPC pre-send_activity time parsing error: {e}")
271                                                        }
272                                                        break Some(DisconnectReason::OldRelicComputer(e.to_string()));
273                                                    }
274                                                }
275                                            };
276
277                                            let activity = *activity;
278                                            last_activity = Some(activity.clone());
279
280                                            if let Err(e) = socket.send_activity(activity, session_start_unpacked).await {
281                                                if show_errors {
282                                                    eprintln!("Discord RPC send_activity error: {e}");
283                                                }
284                                                break Some(DisconnectReason::SendActivityError(e.to_string()));
285                                            }
286                                        },
287                                        IPCCommand::ClearActivity => {
288                                            last_activity = None;
289                                            session_start = None;
290
291                                            if let Err(e) = socket.clear_activity().await {
292                                                if show_errors {
293                                                    eprintln!("Discord RPC clear_activity error: {e}");
294                                                }
295                                                break Some(DisconnectReason::ClearActivityError(e.to_string()));
296                                            }
297                                        },
298                                        IPCCommand::Close { done_tx }=> {
299                                            let _ = socket.close().await;
300                                            let _ = done_tx.send(());
301                                            break 'outer;
302                                        }
303                                    }
304                                },
305                                None => break Some(DisconnectReason::ClientChannelClosed),
306                            }
307                        }
308
309                        frame = socket.read_frame() => {
310                            match frame {
311                                Ok(frame) => {
312                                    match frame.opcode {
313                                        Opcode::Frame => {
314                                            if let Ok(json) = serde_json::from_slice::<DynamicRPCFrame>(&frame.body) {
315                                                if json.evt.as_deref() == Some("ERROR") && show_errors {
316                                                    eprintln!("Discord RPC DynamicRPCFrame error: {:?}", json.data);
317                                                } else if json.cmd.as_deref() == Some("SET_ACTIVITY") {
318                                                    if let Some(f) = &on_activity_send {
319                                                        if let Some(data) = json.data {
320                                                            let data = serde_json::from_value::<ActivityResponseData>(data);
321
322                                                            if let Ok(d) = data {
323                                                                let f = f.clone();
324                                                                tokio::spawn(async move { f(d)});
325                                                            } else if let Err(e) = data{
326                                                                println!("{e}")
327                                                            }
328                                                        }
329                                                    }
330                                                }
331                                            }
332                                        },
333                                        Opcode::Close => break Some(DisconnectReason::ServerClosed),
334                                        Opcode::Ping => {
335                                            if let Err(e) = socket.send_frame(Opcode::Pong, frame.body).await {
336                                                if show_errors {
337                                                    eprintln!("Discord RPC send_frame error: {e}");
338                                                }
339                                                break Some(DisconnectReason::SendFrameError(e.to_string()));
340                                            }
341                                        },
342                                        _ => {}
343                                    }
344                                },
345                                Err(e) => {
346                                    if show_errors {
347                                        eprintln!("Discord RPC generic frame read error: {e}")
348                                    }
349                                    if let DiscordSockError::IoError(error) = &e {
350                                        if error.kind() == std::io::ErrorKind::UnexpectedEof {
351                                            break Some(DisconnectReason::PeerClosed);
352                                        }
353                                    }
354                                    break Some(DisconnectReason::ReadFrameError(e.to_string()));
355                                },
356                            }
357                        }
358                    }
359                };
360
361                if let Some(f) = &on_disconnect {
362                    f(disconnect_reason.unwrap_or(DisconnectReason::Unknown));
363                }
364
365                sleep(RETRY_DELAY).await;
366            }
367        });
368
369        self.join_handle = Some(join_handle);
370
371        if wait_for_ready {
372            match ready_rx.await {
373                Ok(()) => (),
374                Err(_) => return Err(PresenceRunnerError::ExitBeforeReady),
375            }
376        }
377
378        Ok(&self.client)
379    }
380
381    /// Returns a clone of the client handle for sharing.
382    #[must_use]
383    pub fn clone_handle(&self) -> PresenceClient {
384        self.client.clone()
385    }
386
387    /// Waits for the IPC task to finish.
388    ///
389    /// NOTE: If there's no `join_handle` present, the function will do nothing and
390    /// just return blank.
391    pub async fn wait(&mut self) -> Result<(), PresenceRunnerError> {
392        if let Some(handle) = self.join_handle.take() {
393            handle.await?;
394        }
395
396        Ok(())
397    }
398}