Skip to main content

filthy_rich/
ipc.rs

1// SPDX-License-Identifier: MIT
2
3use std::{
4    sync::{
5        Arc,
6        atomic::{AtomicBool, Ordering},
7    },
8    time::{Duration, SystemTime, UNIX_EPOCH},
9};
10
11use anyhow::{Result, bail};
12use serde::Deserialize;
13use serde_json::json;
14use tokio::{
15    runtime::{Builder, Runtime},
16    sync::{
17        mpsc::{self, Sender},
18        oneshot,
19    },
20    task::JoinHandle,
21    time::sleep,
22};
23
24use crate::{
25    socket::DiscordIPCSocket,
26    types::{ActivityPayload, IPCActivityCmd, TimestampPayload},
27};
28
29/*
30 *
31 * Helper funcs
32 *
33 */
34
35fn get_current_timestamp() -> Result<u64> {
36    Ok(SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs())
37}
38
39/*
40 *
41 * Frame/cmd structs
42 *
43 */
44
/// Commands sent from the public [`DiscordIPC`] handle to its background task.
#[derive(Debug)]
enum IPCCommand {
    /// Publish the given activity; the task also remembers it so it can be
    /// re-sent after a reconnect.
    SetActivity { activity: Activity },
    /// Clear the current activity and forget the remembered one.
    ClearActivity,
    /// Close the socket and stop the background task.
    Close,
}
51
/// JSON body of an RPC frame received from Discord.
///
/// Note that `data` is typed as [`ReadyData`]: a frame whose `data` is present
/// but does not contain a `user` object fails to deserialize into this struct
/// as a whole.
#[derive(Debug, Deserialize)]
struct RpcFrame {
    /// Command name, e.g. `"DISPATCH"`.
    cmd: Option<String>,
    /// Event name, e.g. `"READY"` or `"ERROR"`.
    evt: Option<String>,
    /// Event payload, decoded as the READY payload.
    data: Option<ReadyData>,
}
58
/// Payload of the READY event received from the local Discord IPC server.
///
/// This is the value handed to the callback registered through `on_ready`.
#[derive(Debug, Clone, Deserialize)]
pub struct ReadyData {
    /// The user carried in the READY payload.
    pub user: DiscordUser,
}
64
65/// Represents a Discord user.
66#[derive(Debug, Clone, Deserialize)]
67pub struct DiscordUser {
68    pub id: String,
69    pub username: String,
70    pub global_name: Option<String>,
71    pub discriminator: Option<String>,
72    pub avatar: Option<String>,
73    pub avatar_decoration_data: Option<serde_json::Value>,
74    pub bot: bool,
75    pub flags: Option<u64>,
76    pub premium_type: Option<u64>,
77}
78
/// A Discord Rich Presence activity, ready to be sent to Discord.
///
/// Obtain one through [`Activity::new`] (builder) or [`Activity::build_empty`].
#[derive(Debug, Clone)]
pub struct Activity {
    details: Option<String>,
    state: Option<String>,
    duration: Option<Duration>,
}

/// Entry point of the activity builder chain; see [`Activity::new`].
pub struct ActivityBuilder;

/// An in-progress activity that already has its top text set.
///
/// Optional attributes can still be attached; call
/// [`ActivityWithDetails::build`] to obtain the final [`Activity`].
pub struct ActivityWithDetails {
    details: String,
    state: Option<String>,
    duration: Option<Duration>,
}

impl Activity {
    /// Starts building an activity; continue with [`ActivityBuilder::details`].
    pub fn new() -> ActivityBuilder {
        ActivityBuilder {}
    }

    /// Creates an activity with no text and no duration; handy for small apps.
    pub fn build_empty() -> Self {
        let (details, state, duration) = (None, None, None);
        Self {
            details,
            state,
            duration,
        }
    }
}

impl ActivityBuilder {
    /// Sets the top text of the activity.
    pub fn details(self, details: impl Into<String>) -> ActivityWithDetails {
        let details = details.into();
        ActivityWithDetails {
            details,
            state: None,
            duration: None,
        }
    }
}

impl ActivityWithDetails {
    /// Sets the bottom text of the activity.
    pub fn state(self, state: impl Into<String>) -> Self {
        Self {
            state: Some(state.into()),
            ..self
        }
    }

    /// Sets the countdown duration of the activity.
    pub fn duration(self, duration: Duration) -> Self {
        Self {
            duration: Some(duration),
            ..self
        }
    }

    /// Finishes the builder, producing an [`Activity`] that can be passed to
    /// either [`DiscordIPC::set_activity`] or [`DiscordIPCSync::set_activity`].
    pub fn build(self) -> Activity {
        let Self {
            details,
            state,
            duration,
        } = self;

        Activity {
            details: Some(details),
            state,
            duration,
        }
    }
}
147
148/*
149 *
150 * Async implementation
151 *
152 */
153
/// Primary struct for you to set and update Discord Rich Presences with.
pub struct DiscordIPC {
    // Sender half of the command channel; replaced with a fresh one on every `run`.
    tx: Sender<IPCCommand>,
    // Client ID used for the handshake.
    client_id: String,
    // Flag shared with the background task; `true` from `run` until the task stops on `Close`.
    running: Arc<AtomicBool>,
    // Join handle of the background task, present between `run` and `wait`/`close`.
    handle: Option<JoinHandle<Result<()>>>,
    // Optional READY callback; moved into the background task when `run` is called.
    on_ready: Option<Box<dyn Fn(ReadyData) + Send + Sync + 'static>>,
}
162
163impl DiscordIPC {
164    /// Creates a new Discord IPC client instance.
165    pub fn new(client_id: &str) -> Self {
166        let (tx, _rx) = mpsc::channel(32);
167
168        Self {
169            tx,
170            client_id: client_id.to_string(),
171            running: Arc::new(AtomicBool::new(false)),
172            handle: None,
173            on_ready: None,
174        }
175    }
176
177    /// Run a particular closure after receiving the READY event from the local Discord IPC server.
178    pub fn on_ready<F: Fn(ReadyData) + Send + Sync + 'static>(mut self, f: F) -> Self {
179        self.on_ready = Some(Box::new(f));
180        self
181    }
182
183    /// The Discord client ID that has been used to initialize this IPC client instance.
184    pub fn client_id(&self) -> String {
185        self.client_id.clone()
186    }
187
188    /// Run the client.
189    /// Must be called before any [`DiscordIPC::set_activity`] calls.
190    pub async fn run(&mut self, wait_for_ready: bool) -> Result<()> {
191        if self.running.swap(true, Ordering::SeqCst) {
192            bail!(
193                "Cannot run multiple instances of .run() for DiscordIPC, or when a session is still closing."
194            )
195        }
196
197        let (tx, mut rx) = mpsc::channel::<IPCCommand>(32);
198        self.tx = tx;
199        let client_id = self.client_id.clone();
200        let running = self.running.clone();
201
202        // oneshot channel to signal when READY is received the first time
203        let (ready_tx, ready_rx) = oneshot::channel::<()>();
204
205        let on_ready = self.on_ready.take();
206
207        let handle = tokio::spawn(async move {
208            let mut backoff = 1;
209            let mut last_activity: Option<Activity> = None;
210            let mut ready_tx = Some(ready_tx);
211
212            'outer: while running.load(Ordering::SeqCst) {
213                // initial connect
214                let mut socket = match DiscordIPCSocket::new().await {
215                    Ok(s) => s,
216                    Err(_) => {
217                        sleep(Duration::from_secs(backoff)).await;
218                        continue;
219                    }
220                };
221
222                // initial handshake
223                if socket.do_handshake(&client_id).await.is_err() {
224                    sleep(Duration::from_secs(backoff)).await;
225                    continue;
226                }
227
228                // wait for READY (blocks until first READY or socket fails)
229                loop {
230                    let frame = match socket.read_frame().await {
231                        Ok(f) => f,
232                        Err(_) => continue 'outer,
233                    };
234
235                    if frame.opcode != 1 {
236                        continue;
237                    }
238
239                    if let Ok(json) = serde_json::from_slice::<RpcFrame>(&frame.body) {
240                        if json.cmd.as_deref() == Some("DISPATCH")
241                            && json.evt.as_deref() == Some("READY")
242                        {
243                            if let Some(tx) = ready_tx.take() {
244                                let _ = tx.send(());
245                            }
246                            if let (Some(f), Some(data)) = (&on_ready, json.data) {
247                                f(data);
248                            }
249                            break;
250                        }
251
252                        if json.evt.as_deref() == Some("ERROR") {
253                            eprintln!("Discord RPC error: {:?}", json.data);
254                        }
255                    }
256                }
257
258                // should reset per new run() call
259                let session_start = get_current_timestamp()?;
260
261                // reset activity if previous instance failed and this instance is basically reconnecting
262                if let Some(activity) = &last_activity {
263                    let _ = socket.send_activity(activity.clone(), session_start).await;
264                }
265
266                backoff = 1;
267
268                loop {
269                    tokio::select! {
270                        Some(cmd) = rx.recv() => {
271                            match cmd {
272                                IPCCommand::SetActivity { activity } => {
273                                    last_activity = Some(activity.clone());
274
275                                    if socket.send_activity(activity, session_start).await.is_err() {
276                                        break;
277                                    }
278                                },
279                                IPCCommand::ClearActivity => {
280                                    last_activity = None;
281
282                                    if socket.clear_activity().await.is_err() { break; }
283                                },
284                                IPCCommand::Close => {
285                                    let _ = socket.close().await;
286                                    running.store(false, Ordering::SeqCst);
287                                    break 'outer;
288                                }
289                            }
290                        }
291
292                        frame = socket.read_frame() => {
293                            match frame {
294                                Ok(frame) => match frame.opcode {
295                                    1 => {
296                                        if let Ok(json) = serde_json::from_slice::<RpcFrame>(&frame.body) {
297                                            if json.evt.as_deref() == Some("ERROR") {
298                                                eprintln!("Discord RPC error: {:?}", json.data);
299                                            }
300                                        }
301                                    }
302                                    2 => break,
303                                    3 => {
304                                        if socket.send_frame(3, frame.body).await.is_err() { break; }
305                                    }
306                                    _ => {}
307                                },
308                                Err(_) => break,
309                            }
310                        }
311                    }
312                }
313
314                sleep(Duration::from_secs(backoff)).await;
315                backoff = (backoff * 2).min(4);
316            }
317
318            Ok(())
319        });
320
321        self.handle = Some(handle);
322
323        if wait_for_ready {
324            match ready_rx.await {
325                Ok(()) => Ok(()),
326                Err(_) => bail!("Background task exited before READY."),
327            }
328        } else {
329            Ok(())
330        }
331    }
332
333    /// Waits for the IPC task to finish.
334    pub async fn wait(&mut self) -> Result<()> {
335        if let Some(handle) = self.handle.take() {
336            handle.await??;
337        }
338
339        Ok(())
340    }
341
342    /// Checks whether the task is running through the internal atomic indicator flag.
343    pub fn is_running(&self) -> bool {
344        self.running.load(Ordering::SeqCst)
345    }
346
347    /// Sets/updates the Discord Rich presence activity.
348    /// [`DiscordIPC::run`] must be executed prior to calling this.
349    pub async fn set_activity(&self, activity: Activity) -> Result<()> {
350        if !self.is_running() {
351            bail!("Call .run() before .set_activity() execution.");
352        }
353
354        self.tx.send(IPCCommand::SetActivity { activity }).await?;
355        Ok(())
356    }
357
358    /// Clears a previously set Discord Rich Presence activity.
359    pub async fn clear_activity(&self) -> Result<()> {
360        if self.is_running() {
361            self.tx.send(IPCCommand::ClearActivity).await?;
362        }
363
364        Ok(())
365    }
366
367    /// Closes the current connection if any.
368    pub async fn close(&mut self) -> Result<()> {
369        if self.is_running() {
370            let _ = self.tx.send(IPCCommand::Close).await;
371            if let Some(handle) = self.handle.take() {
372                if let Err(e) = handle.await {
373                    eprintln!("DiscordIPC background task failed on close: {e:?}");
374                }
375            }
376        }
377
378        Ok(())
379    }
380}
381
382/*
383 *
384 * Extension of DiscordIPCSocket
385 * (for convenience)
386 *
387 */
388
389impl DiscordIPCSocket {
390    async fn send_activity(&mut self, activity: Activity, session_start: u64) -> Result<()> {
391        let current_t = get_current_timestamp()?;
392        let end_timestamp = activity.duration.map(|d| current_t + d.as_secs());
393
394        let cmd = IPCActivityCmd::new_with(Some(ActivityPayload {
395            details: activity.details,
396            state: activity.state,
397            timestamps: TimestampPayload {
398                start: session_start,
399                end: end_timestamp,
400            },
401        }));
402
403        self.send_cmd(cmd).await
404    }
405
406    async fn clear_activity(&mut self) -> Result<()> {
407        let cmd = IPCActivityCmd::new_with(None);
408        self.send_cmd(cmd).await?;
409        Ok(())
410    }
411
412    async fn do_handshake(&mut self, client_id: &str) -> Result<()> {
413        let handshake = json!({ "v": 1, "client_id": client_id }).to_string();
414        self.send_frame(0, handshake).await?;
415        Ok(())
416    }
417
418    async fn send_cmd(&mut self, cmd: IPCActivityCmd) -> Result<()> {
419        let cmd = cmd.to_json()?;
420        self.send_frame(1, cmd).await?;
421        Ok(())
422    }
423}
424
425/*
426 *
427 * Blocking implementation
428 *
429 */
430
/// Blocking implementation of [`DiscordIPC`].
///
/// Wraps the async client together with a Tokio runtime of its own and drives
/// every call to completion on that runtime.
pub struct DiscordIPCSync {
    // The async client that does the actual work.
    inner: DiscordIPC,
    // Runtime used to block on the async client's futures.
    rt: Runtime,
}
436
437impl DiscordIPCSync {
438    /// Creates a new Discord IPC client instance.
439    pub fn new(client_id: &str) -> Result<Self> {
440        let rt = Builder::new_multi_thread().enable_all().build()?;
441        let inner = DiscordIPC::new(client_id);
442
443        Ok(Self { inner, rt })
444    }
445
446    /// Run a particular closure after receiving the READY event from the local Discord IPC server.
447    pub fn on_ready<F: Fn(ReadyData) + Send + Sync + 'static>(mut self, f: F) -> Self {
448        self.inner.on_ready = Some(Box::new(f));
449        self
450    }
451
452    /// The Discord client ID that has been used to initialize this IPC client instance.
453    pub fn client_id(&self) -> String {
454        self.inner.client_id()
455    }
456
457    /// Run the client.
458    /// Must be called before any [`DiscordIPCSync::set_activity`] calls.
459    pub fn run(&mut self, wait_for_ready: bool) -> Result<()> {
460        self.rt.block_on(self.inner.run(wait_for_ready))
461    }
462
463    /// Waits for the IPC task to finish.
464    /// [`DiscordIPCSync::run`] must be called to spawn it in the first place.
465    pub fn wait(&mut self) -> Result<()> {
466        self.rt.block_on(self.inner.wait())
467    }
468
469    /// Checks whether the task is running through the internal atomic indicator flag.
470    pub fn is_running(&self) -> bool {
471        self.inner.is_running()
472    }
473
474    /// Sets/updates the Discord Rich presence activity.
475    /// [`DiscordIPCSync::run`] must be executed prior to calling this.
476    pub fn set_activity(&self, activity: Activity) -> Result<()> {
477        self.rt.block_on(self.inner.set_activity(activity))
478    }
479
480    /// Clears a previously set Discord Rich Presence activity.
481    pub fn clear_activity(&self) -> Result<()> {
482        self.rt.block_on(self.inner.clear_activity())
483    }
484
485    /// Closes the current connection if any.
486    pub fn close(&mut self) -> Result<()> {
487        self.rt.block_on(self.inner.close())
488    }
489}
490
491impl Drop for DiscordIPCSync {
492    fn drop(&mut self) {
493        let _ = self.rt.block_on(self.inner.close());
494    }
495}