Skip to main content

filthy_rich/
runner.rs

1use std::time::Duration;
2use tokio::{sync::mpsc, task::JoinHandle, time::sleep};
3
4use crate::{
5    PresenceClient,
6    errors::{DisconnectReason, DiscordSockError, PresenceRunnerError},
7    socket::DiscordSock,
8    types::{
9        Activity, ActivityResponseData, DynamicRPCFrame, IPCCommand, ReadyData, ReadyRPCFrame,
10    },
11    utils::get_current_timestamp,
12};
13
14/// A runner that manages the Discord RPC background task.
15/// Create a runner, configure it, run it to get a client handle, then clone the handle for sharing.
16pub struct PresenceRunner {
17    rx: Option<tokio::sync::mpsc::Receiver<IPCCommand>>,
18    client: PresenceClient,
19    join_handle: Option<JoinHandle<()>>,
20    on_ready: Option<Box<dyn Fn(ReadyData) + Send + Sync + 'static>>,
21    on_activity_send: Option<Box<dyn Fn(ActivityResponseData) + Send + Sync + 'static>>,
22    on_disconnect: Option<Box<dyn Fn(DisconnectReason) + Send + Sync + 'static>>,
23    show_errors: bool,
24}
25
26impl PresenceRunner {
27    #[must_use]
28    /// Create a new [`PresenceRunner`] instance. Requires the client ID of your chosen app from the
29    /// [Discord Developer Portal](https://discord.com/developers/applications).
30    pub fn new(client_id: &str) -> Self {
31        let (tx, rx) = mpsc::channel(32);
32        let client = PresenceClient {
33            tx,
34            client_id: client_id.to_string(),
35        };
36
37        Self {
38            rx: Some(rx),
39            client,
40            join_handle: None,
41            on_ready: None,
42            on_activity_send: None,
43            on_disconnect: None,
44            show_errors: false,
45        }
46    }
47
48    /// Run a particular closure after receiving the READY event from the Discord RPC server.
49    ///
50    /// This event can fire multiple times depending on how many times the client needs to reconnect with Discord RPC.
51    pub fn on_ready<F: Fn(ReadyData) + Send + Sync + 'static>(mut self, f: F) -> Self {
52        self.on_ready = Some(Box::new(f));
53        self
54    }
55
56    /// Run a particular closure after ensuring that a [`PresenceClient::set_activity`] has successfully managed to
57    /// pass its data through the IPC channel.
58    ///
59    /// This event can fire multiple times based on how many activities you set.
60    pub fn on_activity_send<F: Fn(ActivityResponseData) + Send + Sync + 'static>(
61        mut self,
62        f: F,
63    ) -> Self {
64        self.on_activity_send = Some(Box::new(f));
65        self
66    }
67
68    /// Run a particular closure after the RPC connection is lost.
69    ///
70    /// This can fire multiple times if the client reconnects and disconnects again.
71    pub fn on_disconnect<F: Fn(DisconnectReason) + Send + Sync + 'static>(mut self, f: F) -> Self {
72        self.on_disconnect = Some(Box::new(f));
73        self
74    }
75
76    /// Enable verbose error logging for RPC and code events.
77    #[must_use]
78    pub fn show_errors(mut self) -> Self {
79        self.show_errors = true;
80        self
81    }
82
83    /// Run the runner.
84    /// Must be called before any client handle operations.
85    pub async fn run(
86        &mut self,
87        wait_for_ready: bool,
88    ) -> Result<&PresenceClient, PresenceRunnerError> {
89        if self.join_handle.is_some() {
90            return Err(PresenceRunnerError::MultipleRun);
91        }
92
93        let client_id = self.client.client_id.clone();
94        let show_errors = self.show_errors;
95
96        let mut rx = self
97            .rx
98            .take()
99            .ok_or_else(|| PresenceRunnerError::ReceiverError)?;
100
101        // oneshot channel to signal when READY is received the first time
102        let (ready_tx, ready_rx) = tokio::sync::oneshot::channel::<()>();
103
104        // executable closers (executed within the loop)
105        let on_ready = self.on_ready.take();
106        let on_activity_send = self.on_activity_send.take();
107        let on_disconnect = self.on_disconnect.take();
108
109        let join_handle = tokio::spawn(async move {
110            let mut backoff = 1;
111            let mut last_activity: Option<Activity> = None;
112            let mut ready_tx = Some(ready_tx);
113            let mut connected = false;
114
115            let mut session_start: Option<u64> = None;
116
117            'outer: loop {
118                // initial connect
119                let mut socket = match DiscordSock::new().await {
120                    Ok(s) => s,
121                    Err(_) => {
122                        sleep(Duration::from_secs(backoff)).await;
123                        continue;
124                    }
125                };
126
127                // initial handshake
128                if socket.do_handshake(&client_id).await.is_err() {
129                    sleep(Duration::from_secs(backoff)).await;
130                    continue;
131                }
132
133                // ready loop
134                loop {
135                    let frame = match socket.read_frame().await {
136                        Ok(f) => f,
137                        Err(_) => {
138                            break;
139                        }
140                    };
141
142                    if frame.opcode != 1 {
143                        continue;
144                    }
145
146                    if let Ok(json) = serde_json::from_slice::<ReadyRPCFrame>(&frame.body) {
147                        if json.cmd.as_deref() == Some("DISPATCH")
148                            && json.evt.as_deref() == Some("READY")
149                        {
150                            if let Some(tx) = ready_tx.take() {
151                                let _ = tx.send(());
152                            }
153                            if let Some(f) = &on_ready {
154                                if let Some(data) = json.data {
155                                    f(data);
156                                }
157                            }
158                            connected = true;
159                            break;
160                        }
161
162                        if json.evt.as_deref() == Some("ERROR") && show_errors {
163                            eprintln!("Discord RPC ready event receiver error: {:?}", json.data);
164                        }
165                    }
166                }
167
168                // restore last activity (if any)
169                if let Some(activity) = &last_activity {
170                    if let Some(t) = session_start {
171                        if let Err(e) = socket.send_activity(activity.clone(), t).await {
172                            if show_errors {
173                                eprintln!("Discord RPC last activity restore error: {e}")
174                            }
175                        }
176                    }
177                }
178
179                backoff = 1;
180
181                // generic loop for receiving commands and responding to pings from Discord itself
182                let disconnect_reason = loop {
183                    tokio::select! {
184                        biased;
185
186                        cmd = rx.recv() => {
187                            match cmd {
188                                Some(cmd) => {
189                                    match cmd {
190                                        IPCCommand::SetActivity { activity } => {
191                                            let session_start_unpacked = if let Some(s) = session_start {
192                                                s
193                                            } else {
194                                                let t = get_current_timestamp().unwrap_or_default();
195                                                session_start = Some(t);
196                                                t
197                                            };
198
199                                            let activity = *activity;
200                                            last_activity = Some(activity.clone());
201
202                                            if let Err(e) = socket.send_activity(activity, session_start_unpacked).await {
203                                                if show_errors {
204                                                    eprintln!("Discord RPC send_activity error: {e}");
205                                                }
206                                                break Some(DisconnectReason::SendActivityError(e.to_string()));
207                                            }
208                                        },
209                                        IPCCommand::ClearActivity => {
210                                            last_activity = None;
211                                            session_start = None;
212
213                                            if let Err(e) = socket.clear_activity().await {
214                                                if show_errors {
215                                                    eprintln!("Discord RPC clear_activity error: {e}");
216                                                }
217                                                break Some(DisconnectReason::ClearActivityError(e.to_string()));
218                                            }
219                                        },
220                                        IPCCommand::Close { done }=> {
221                                            let _ = socket.close().await;
222                                            let _ = done.send(());
223                                            break 'outer;
224                                        }
225                                    }
226                                },
227                                None => break Some(DisconnectReason::ClientChannelClosed),
228                            }
229                        }
230
231                        frame = socket.read_frame() => {
232                            match frame {
233                                Ok(frame) => {
234                                    match frame.opcode {
235                                    1 => {
236                                        if let Ok(json) = serde_json::from_slice::<DynamicRPCFrame>(&frame.body) {
237                                            if json.evt.as_deref() == Some("ERROR") && show_errors {
238                                                eprintln!("Discord RPC DynamicRPCFrame error: {:?}", json.data);
239                                            } else if json.cmd.as_deref() == Some("SET_ACTIVITY") {
240                                                if let Some(f) = &on_activity_send {
241                                                    if let Some(data) = json.data {
242                                                        let data = serde_json::from_value::<ActivityResponseData>(data);
243
244                                                        if let Ok(d) = data {
245                                                            f(d)
246                                                        }
247                                                    }
248                                                }
249                                            }
250                                        }
251                                    }
252                                    2 => break Some(DisconnectReason::ServerClosed),
253                                    3 => {
254                                        if let Err(e) = socket.send_frame(3, frame.body).await {
255                                            if show_errors {
256                                                eprintln!("Discord RPC send_frame error: {e}");
257                                            }
258                                            break Some(DisconnectReason::SendFrameError(e.to_string()));
259                                        }
260                                    }
261                                    _ => {}
262                                }
263                                },
264                                Err(e) => {
265                                    if show_errors {
266                                        eprintln!("Discord RPC generic frame read error: {e}")
267                                    }
268                                    // if let Some(io_err) = e.downcast_ref::<std::io::Error>() {
269                                    //     if io_err.kind() == std::io::ErrorKind::UnexpectedEof {
270                                    //         break Some(DisconnectReason::PeerClosed);
271                                    //     }
272                                    // }
273                                    if let DiscordSockError::IoError(error) = &e {
274                                        if error.kind() == std::io::ErrorKind::UnexpectedEof {
275                                            break Some(DisconnectReason::PeerClosed);
276                                        }
277                                    }
278                                    break Some(DisconnectReason::ReadFrameError(e.to_string()));
279                                },
280                            }
281                        }
282                    }
283                };
284
285                if connected {
286                    if let Some(f) = &on_disconnect {
287                        f(disconnect_reason.unwrap_or(DisconnectReason::Unknown));
288                    }
289                    connected = false;
290                }
291
292                sleep(Duration::from_secs(backoff)).await;
293                backoff = (backoff * 2).min(4);
294            }
295        });
296
297        self.join_handle = Some(join_handle);
298
299        if wait_for_ready {
300            match ready_rx.await {
301                Ok(()) => (),
302                Err(_) => return Err(PresenceRunnerError::ExitBeforeReady),
303            }
304        }
305
306        Ok(&self.client)
307    }
308
309    /// Returns a clone of the client handle for sharing.
310    #[must_use]
311    pub fn clone_handle(&self) -> PresenceClient {
312        self.client.clone()
313    }
314
315    /// Waits for the IPC task to finish.
316    ///
317    /// NOTE: If there's no `join_handle` present, the function will do nothing and
318    /// just return blank.
319    pub async fn wait(&mut self) -> Result<(), PresenceRunnerError> {
320        if let Some(handle) = self.join_handle.take() {
321            handle.await?;
322        }
323
324        Ok(())
325    }
326}