Skip to main content

filthy_rich/
runner.rs

1use std::time::Duration;
2use tokio::{sync::mpsc, task::JoinHandle, time::sleep};
3
4use anyhow::{Result, anyhow, bail};
5
6use crate::{
7    PresenceClient,
8    socket::DiscordSock,
9    types::{
10        Activity, ActivityResponseData, DynamicRPCFrame, IPCCommand, ReadyData, ReadyRPCFrame,
11    },
12    utils::get_current_timestamp,
13};
14
15const MULTIPLE_RUN_CALL_ERR: &str = "PresenceRunner::run() called more than once";
16
17/// A runner that manages the Discord RPC background task.
18/// Create a runner, configure it, run it to get a client handle, then clone the handle for sharing.
19pub struct PresenceRunner {
20    rx: Option<tokio::sync::mpsc::Receiver<IPCCommand>>,
21    client: PresenceClient,
22    join_handle: Option<JoinHandle<Result<()>>>,
23    on_ready: Option<Box<dyn Fn(ReadyData) + Send + Sync + 'static>>,
24    on_activity_send: Option<Box<dyn Fn(ActivityResponseData) + Send + Sync + 'static>>,
25    show_errors: bool,
26}
27
28impl PresenceRunner {
29    #[must_use]
30    /// Create a new [`PresenceRunner`] instance. Requires the client ID of your chosen app from the
31    /// [Discord Developer Portal](https://discord.com/developers/applications).
32    pub fn new(client_id: &str) -> Self {
33        let (tx, rx) = mpsc::channel(32);
34        let client = PresenceClient {
35            tx,
36            client_id: client_id.to_string(),
37        };
38
39        Self {
40            rx: Some(rx),
41            client,
42            join_handle: None,
43            on_ready: None,
44            on_activity_send: None,
45            show_errors: false,
46        }
47    }
48
49    /// Run a particular closure after receiving the READY event from the Discord RPC server.
50    ///
51    /// This event can fire multiple times depending on how many times the client needs to reconnect with Discord RPC.
52    pub fn on_ready<F: Fn(ReadyData) + Send + Sync + 'static>(mut self, f: F) -> Self {
53        self.on_ready = Some(Box::new(f));
54        self
55    }
56
57    /// Run a particular closure after ensuring that a [`PresenceClient::set_activity`] has successfully managed to
58    /// pass its data through the IPC channel.
59    ///
60    /// This event can fire multiple times based on how many activities you set.
61    pub fn on_activity_send<F: Fn(ActivityResponseData) + Send + Sync + 'static>(
62        mut self,
63        f: F,
64    ) -> Self {
65        self.on_activity_send = Some(Box::new(f));
66        self
67    }
68
69    /// Enable verbose error logging for RPC and code events.
70    #[must_use]
71    pub fn show_errors(mut self) -> Self {
72        self.show_errors = true;
73        self
74    }
75
76    /// Run the runner.
77    /// Must be called before any client handle operations.
78    pub async fn run(&mut self, wait_for_ready: bool) -> Result<&PresenceClient> {
79        if self.join_handle.is_some() {
80            bail!(MULTIPLE_RUN_CALL_ERR)
81        }
82
83        let client_id = self.client.client_id.clone();
84        let show_errors = self.show_errors;
85
86        let mut rx = self
87            .rx
88            .take()
89            .ok_or_else(|| anyhow!(MULTIPLE_RUN_CALL_ERR))?;
90
91        // oneshot channel to signal when READY is received the first time
92        let (ready_tx, ready_rx) = tokio::sync::oneshot::channel::<()>();
93
94        // executable closers (executed within the loop)
95        let on_ready = self.on_ready.take();
96        let on_activity_send = self.on_activity_send.take();
97
98        let join_handle = tokio::spawn(async move {
99            let mut backoff = 1;
100            let mut last_activity: Option<Activity> = None;
101            let mut ready_tx = Some(ready_tx);
102
103            let mut session_start: Option<u64> = None;
104
105            'outer: loop {
106                // initial connect
107                let mut socket = match DiscordSock::new().await {
108                    Ok(s) => s,
109                    Err(_) => {
110                        sleep(Duration::from_secs(backoff)).await;
111                        continue;
112                    }
113                };
114
115                // initial handshake
116                if socket.do_handshake(&client_id).await.is_err() {
117                    sleep(Duration::from_secs(backoff)).await;
118                    continue;
119                }
120
121                // ready loop
122                loop {
123                    let frame = match socket.read_frame().await {
124                        Ok(f) => f,
125                        Err(_) => {
126                            break;
127                        }
128                    };
129
130                    if frame.opcode != 1 {
131                        continue;
132                    }
133
134                    if let Ok(json) = serde_json::from_slice::<ReadyRPCFrame>(&frame.body) {
135                        if json.cmd.as_deref() == Some("DISPATCH")
136                            && json.evt.as_deref() == Some("READY")
137                        {
138                            if let Some(tx) = ready_tx.take() {
139                                let _ = tx.send(());
140                            }
141                            if let Some(f) = &on_ready {
142                                if let Some(data) = json.data {
143                                    f(data);
144                                }
145                            }
146                            break;
147                        }
148
149                        if json.evt.as_deref() == Some("ERROR") && show_errors {
150                            eprintln!("Discord RPC ready event receiver error: {:?}", json.data);
151                        }
152                    }
153                }
154
155                // restore last activity (if any)
156                if let Some(activity) = &last_activity {
157                    if let Some(t) = session_start {
158                        if let Err(e) = socket.send_activity(activity.clone(), t).await {
159                            if show_errors {
160                                eprintln!("Discord RPC last activity restore error: {e}")
161                            }
162                        }
163                    }
164                }
165
166                backoff = 1;
167
168                // generic loop for receiving commands and responding to pings from Discord itself
169                loop {
170                    tokio::select! {
171                        biased;
172
173                        cmd = rx.recv() => {
174                            match cmd {
175                                Some(cmd) => {
176                                    match cmd {
177                                        IPCCommand::SetActivity { activity } => {
178                                            let session_start_unpacked = if let Some(s) = session_start {
179                                                s
180                                            } else {
181                                                let t = get_current_timestamp()?;
182                                                session_start = Some(t);
183                                                t
184                                            };
185
186                                            let activity = *activity;
187                                            last_activity = Some(activity.clone());
188
189                                            if let Err(e) = socket.send_activity(activity, session_start_unpacked).await {
190                                                if show_errors {
191                                                    eprintln!("Discord RPC send_activity error: {e}");
192                                                }
193                                                break;
194                                            }
195                                        },
196                                        IPCCommand::ClearActivity => {
197                                            last_activity = None;
198                                            session_start = None;
199
200                                            if let Err(e) = socket.clear_activity().await {
201                                                if show_errors {
202                                                    eprintln!("Discord RPC clear_activity error: {e}");
203                                                }
204                                                break;
205                                            }
206                                        },
207                                        IPCCommand::Close { done }=> {
208                                            let _ = socket.close().await;
209                                            let _ = done.send(());
210                                            break 'outer;
211                                        }
212                                    }
213                                },
214                                None => break,
215                            }
216                        }
217
218                        frame = socket.read_frame() => {
219                            match frame {
220                                Ok(frame) => {
221                                    match frame.opcode {
222                                    1 => {
223                                        if let Ok(json) = serde_json::from_slice::<DynamicRPCFrame>(&frame.body) {
224                                            if json.evt.as_deref() == Some("ERROR") && show_errors {
225                                                eprintln!("Discord RPC DynamicRPCFrame error: {:?}", json.data);
226                                            } else if json.cmd.as_deref() == Some("SET_ACTIVITY") {
227                                                if let Some(f) = &on_activity_send {
228                                                    if let Some(data) = json.data {
229                                                        let data: ActivityResponseData = serde_json::from_value(data)?;
230                                                        f(data)
231                                                    }
232                                                }
233                                            }
234                                        }
235                                    }
236                                    2 => break,
237                                    3 => {
238                                        if let Err(e) = socket.send_frame(3, frame.body).await {
239                                            if show_errors {
240                                                eprintln!("Discord RPC send_frame error: {e}");
241                                            }
242                                            break;
243                                        }
244                                    }
245                                    _ => {}
246                                }
247                                },
248                                Err(e) => {
249                                    if show_errors {
250                                        eprintln!("Discord RPC generic frame read error: {e}")
251                                    }
252                                    break;
253                                },
254                            }
255                        }
256                    }
257                }
258
259                sleep(Duration::from_secs(backoff)).await;
260                backoff = (backoff * 2).min(4);
261            }
262
263            Ok(())
264        });
265
266        self.join_handle = Some(join_handle);
267
268        if wait_for_ready {
269            match ready_rx.await {
270                Ok(()) => (),
271                Err(_) => bail!("Background task exited before READY."),
272            }
273        }
274
275        Ok(&self.client)
276    }
277
278    /// Returns a clone of the client handle for sharing.
279    #[must_use]
280    pub fn clone_handle(&self) -> PresenceClient {
281        self.client.clone()
282    }
283
284    /// Waits for the IPC task to finish.
285    pub async fn wait(&mut self) -> Result<()> {
286        if let Some(handle) = self.join_handle.take() {
287            handle.await??;
288        }
289
290        Ok(())
291    }
292}