Skip to main content

filthy_rich/
runner.rs

1use std::{sync::Arc, time::Duration};
2use tokio::{sync::mpsc, task::JoinHandle, time::sleep};
3
4use crate::{
5    PresenceClient,
6    errors::{DisconnectReason, DiscordSockError, PresenceRunnerError},
7    socket::{DiscordSock, Opcode},
8    types::{
9        ActivitySpec, DynamicRPCFrame, IPCCommand, ReadyRPCFrame,
10        data::{ActivityResponseData, ReadyData},
11    },
12    utils::get_current_timestamp,
13};
14
15type Callback<T> = Option<Arc<dyn Fn(T) + Send + Sync + 'static>>;
16
17macro_rules! impl_callback {
18    ($name:ident, $arg:ty, $doc:expr) => {
19        #[doc = $doc]
20        pub fn $name<F: Fn($arg) + Send + Sync + 'static>(mut self, f: F) -> Self {
21            self.$name = Some(Arc::new(f));
22            self
23        }
24    };
25}
26
27/// A runner that manages the Discord RPC background task.
28/// Create a runner, configure it, run it to get a client handle, then clone the handle for sharing.
29pub struct PresenceRunner {
30    rx: Option<tokio::sync::mpsc::Receiver<IPCCommand>>,
31    client: PresenceClient,
32    join_handle: Option<JoinHandle<()>>,
33    on_ready: Callback<ReadyData>,
34    on_activity_send: Callback<ActivityResponseData>,
35    on_disconnect: Callback<DisconnectReason>,
36    on_retry: Callback<usize>,
37    show_errors: bool,
38    max_retries: usize,
39}
40
41impl PresenceRunner {
42    #[must_use]
43    /// Create a new [`PresenceRunner`] instance. Requires the client ID of your chosen app from the
44    /// [Discord Developer Portal](https://discord.com/developers/applications).
45    pub fn new(client_id: impl Into<String>) -> Self {
46        let (tx, rx) = mpsc::channel(32);
47        let client = PresenceClient {
48            tx,
49            client_id: client_id.into(),
50        };
51
52        Self {
53            rx: Some(rx),
54            client,
55            join_handle: None,
56            on_ready: None,
57            on_activity_send: None,
58            on_disconnect: None,
59            on_retry: None,
60            show_errors: false,
61            max_retries: 0,
62        }
63    }
64
65    impl_callback!(
66        on_ready,
67        ReadyData,
68        "Runs a particular closure after receiving a READY event.
69
70This can fire multiple times depending on how many times the client
71needs to disconnect and reconnect."
72    );
73
74    impl_callback!(
75        on_activity_send,
76        ActivityResponseData,
77        "Run a particular closure after ensuring that a [`PresenceClient::set_activity`] has successfully managed to pass its data through the IPC channel.
78
79This can fire multiple times."
80    );
81
82    impl_callback!(
83        on_disconnect,
84        DisconnectReason,
85        "Runs a particular closure after the RPC connection is lost.
86
87Unlike `on_retry`, this fires only when a previously initialized connection drops, which means that you should prefer
88`on_retry` instead if you want more accurate responses to the initial connect or handshake failures, and `on_disconnect`
89for reacting to post-connection failures.
90
91This can fire multiple times depending on how many times the client disconnects and reconnects again."
92    );
93
94    impl_callback!(
95        on_retry,
96        usize,
97        "Runs a particular closure when retrying for socket creation or handshake.
98
99This can fire multiple times, or for a limited amount depending on the amount of maximum
100retries that has been set (through [`PresenceRunner::set_max_retries`]).
101
102The closure parameter is the count of retries done at the time of its execution."
103    );
104
105    /// Enable verbose error logging over [`std::io::stderr`] writes for RPC and code events.
106    #[must_use]
107    pub fn show_errors(mut self) -> Self {
108        self.show_errors = true;
109        self
110    }
111
112    /// Sets the amount of maximum retries to do on socket creation and handshakes before the runner should give up.
113    ///
114    /// By default this is set to `0` internally, which means the inner loop would retry indefinitely.
115    #[must_use]
116    pub fn set_max_retries(mut self, count: usize) -> Self {
117        self.max_retries = count;
118        self
119    }
120
121    /// Run the runner.
122    /// Must be called before any client handle operations.
123    pub async fn run(
124        &mut self,
125        wait_for_ready: bool,
126    ) -> Result<&PresenceClient, PresenceRunnerError> {
127        if self.join_handle.is_some() {
128            return Err(PresenceRunnerError::MultipleRun);
129        }
130
131        let client_id = self.client.client_id.clone();
132        let show_errors = self.show_errors;
133        let max_retries = self.max_retries;
134
135        let mut rx = self
136            .rx
137            .take()
138            .ok_or_else(|| PresenceRunnerError::ReceiverError)?;
139
140        // oneshot channel to signal when READY is received the first time
141        let (ready_tx, ready_rx) = tokio::sync::oneshot::channel::<()>();
142
143        // executable closers (executed within the loop)
144        let on_ready = self.on_ready.take();
145        let on_activity_send = self.on_activity_send.take();
146        let on_disconnect = self.on_disconnect.take();
147        let on_retry = self.on_retry.take();
148
149        let join_handle = tokio::spawn(async move {
150            const RETRY_DELAY: Duration = Duration::from_secs(1);
151            let mut last_activity: Option<ActivitySpec> = None;
152            let mut ready_tx = Some(ready_tx);
153            let mut retries = 0;
154
155            let mut session_start: Option<u64> = None;
156
157            'outer: loop {
158                if max_retries != 0 && retries >= max_retries {
159                    break;
160                }
161
162                // initial connect
163                let mut socket = match DiscordSock::new().await {
164                    Ok(s) => s,
165                    Err(_) => {
166                        sleep(RETRY_DELAY).await;
167
168                        retries += 1;
169                        if let Some(f) = &on_retry {
170                            let f = f.clone();
171                            tokio::spawn(async move { f(retries) });
172                        }
173
174                        continue;
175                    }
176                };
177
178                // initial handshake
179                if socket.do_handshake(&client_id).await.is_err() {
180                    sleep(RETRY_DELAY).await;
181
182                    retries += 1;
183                    if let Some(f) = &on_retry {
184                        let f = f.clone();
185                        tokio::spawn(async move { f(retries) });
186                    }
187
188                    continue;
189                }
190
191                // ready loop
192                loop {
193                    let frame = match socket.read_frame().await {
194                        Ok(f) => f,
195                        Err(_) => {
196                            sleep(RETRY_DELAY).await;
197                            retries += 1;
198                            if let Some(f) = &on_retry {
199                                let f = f.clone();
200                                tokio::spawn(async move { f(retries) });
201                            }
202                            continue 'outer;
203                        }
204                    };
205
206                    if Opcode::Frame != frame.opcode {
207                        continue;
208                    }
209
210                    if let Ok(json) = serde_json::from_slice::<ReadyRPCFrame>(&frame.body) {
211                        if json.cmd.as_deref() == Some("DISPATCH")
212                            && json.evt.as_deref() == Some("READY")
213                        {
214                            if let Some(tx) = ready_tx.take() {
215                                let _ = tx.send(());
216                            }
217                            if let Some(f) = &on_ready {
218                                if let Some(data) = json.data {
219                                    let f = f.clone();
220                                    tokio::spawn(async move { f(data) });
221                                }
222                            }
223
224                            // reset retires here
225                            retries = 0;
226                            break;
227                        }
228
229                        if json.evt.as_deref() == Some("ERROR") && show_errors {
230                            eprintln!("Discord RPC ready event receiver error: {:?}", json.data);
231                        }
232                    }
233                }
234
235                // restore last activity (if any)
236                if let Some(activity) = &last_activity {
237                    if let Some(t) = session_start {
238                        if let Err(e) = socket.send_activity(activity.clone(), t).await {
239                            if show_errors {
240                                eprintln!("Discord RPC last activity restore error: {e}")
241                            }
242                        }
243                    }
244                }
245
246                // generic loop for receiving commands and responding to pings from Discord itself
247                let disconnect_reason = loop {
248                    tokio::select! {
249                        cmd = rx.recv() => {
250                            match cmd {
251                                Some(cmd) => {
252                                    match cmd {
253                                        IPCCommand::SetActivity { activity } => {
254                                            let session_start_unpacked = if let Some(s) = session_start {
255                                                s
256                                            } else {
257                                                match get_current_timestamp() {
258                                                    Ok(t) => {
259                                                        session_start = Some(t);
260                                                        t
261                                                    },
262                                                    Err(e) => {
263                                                        if show_errors {
264                                                            eprintln!("Discord RPC pre-send_activity time parsing error: {e}")
265                                                        }
266                                                        break Some(DisconnectReason::OldRelicComputer(e.to_string()));
267                                                    }
268                                                }
269                                            };
270
271                                            let activity = *activity;
272                                            last_activity = Some(activity.clone());
273
274                                            if let Err(e) = socket.send_activity(activity, session_start_unpacked).await {
275                                                if show_errors {
276                                                    eprintln!("Discord RPC send_activity error: {e}");
277                                                }
278                                                break Some(DisconnectReason::SendActivityError(e.to_string()));
279                                            }
280                                        },
281                                        IPCCommand::ClearActivity => {
282                                            last_activity = None;
283                                            session_start = None;
284
285                                            if let Err(e) = socket.clear_activity().await {
286                                                if show_errors {
287                                                    eprintln!("Discord RPC clear_activity error: {e}");
288                                                }
289                                                break Some(DisconnectReason::ClearActivityError(e.to_string()));
290                                            }
291                                        },
292                                        IPCCommand::Close { done_tx }=> {
293                                            let _ = socket.close().await;
294                                            let _ = done_tx.send(());
295                                            break 'outer;
296                                        }
297                                    }
298                                },
299                                None => break Some(DisconnectReason::ClientChannelClosed),
300                            }
301                        }
302
303                        frame = socket.read_frame() => {
304                            match frame {
305                                Ok(frame) => {
306                                    match frame.opcode {
307                                        Opcode::Frame => {
308                                            if let Ok(json) = serde_json::from_slice::<DynamicRPCFrame>(&frame.body) {
309                                                if json.evt.as_deref() == Some("ERROR") && show_errors {
310                                                    eprintln!("Discord RPC DynamicRPCFrame error: {:?}", json.data);
311                                                } else if json.cmd.as_deref() == Some("SET_ACTIVITY") {
312                                                    if let Some(f) = &on_activity_send {
313                                                        if let Some(data) = json.data {
314                                                            let data = serde_json::from_value::<ActivityResponseData>(data);
315
316                                                            if let Ok(d) = data {
317                                                                let f = f.clone();
318                                                                tokio::spawn(async move { f(d)});
319                                                            } else if let Err(e) = data{
320                                                                println!("{e}")
321                                                            }
322                                                        }
323                                                    }
324                                                }
325                                            }
326                                        },
327                                        Opcode::Close => break Some(DisconnectReason::ServerClosed),
328                                        Opcode::Ping => {
329                                            if let Err(e) = socket.send_frame(Opcode::Pong, frame.body).await {
330                                                if show_errors {
331                                                    eprintln!("Discord RPC send_frame error: {e}");
332                                                }
333                                                break Some(DisconnectReason::SendFrameError(e.to_string()));
334                                            }
335                                        },
336                                        _ => {}
337                                    }
338                                },
339                                Err(e) => {
340                                    if show_errors {
341                                        eprintln!("Discord RPC generic frame read error: {e}")
342                                    }
343                                    if let DiscordSockError::IoError(error) = &e {
344                                        if error.kind() == std::io::ErrorKind::UnexpectedEof {
345                                            break Some(DisconnectReason::PeerClosed);
346                                        }
347                                    }
348                                    break Some(DisconnectReason::ReadFrameError(e.to_string()));
349                                },
350                            }
351                        }
352                    }
353                };
354
355                if let Some(f) = &on_disconnect {
356                    f(disconnect_reason.unwrap_or(DisconnectReason::Unknown));
357                }
358
359                sleep(RETRY_DELAY).await;
360            }
361        });
362
363        self.join_handle = Some(join_handle);
364
365        if wait_for_ready {
366            match ready_rx.await {
367                Ok(()) => (),
368                Err(_) => return Err(PresenceRunnerError::ExitBeforeReady),
369            }
370        }
371
372        Ok(&self.client)
373    }
374
375    /// Returns a clone of the client handle for sharing.
376    #[must_use]
377    pub fn clone_handle(&self) -> PresenceClient {
378        self.client.clone()
379    }
380
381    /// Waits for the IPC task to finish.
382    ///
383    /// NOTE: If there's no `join_handle` present, the function will do nothing and
384    /// just return blank.
385    pub async fn wait(&mut self) -> Result<(), PresenceRunnerError> {
386        if let Some(handle) = self.join_handle.take() {
387            handle.await?;
388        }
389
390        Ok(())
391    }
392}