Skip to main content

filthy_rich/
ipc.rs

1// SPDX-License-Identifier: MIT
2
3use std::{
4    sync::{
5        Arc,
6        atomic::{AtomicBool, Ordering},
7    },
8    time::{Duration, SystemTime, UNIX_EPOCH},
9};
10
11use anyhow::{Result, bail};
12use serde::Deserialize;
13use serde_json::json;
14use tokio::{
15    runtime::{Builder, Runtime},
16    sync::{
17        mpsc::{self, Sender},
18        oneshot,
19    },
20    task::JoinHandle,
21    time::sleep,
22};
23
24use crate::{
25    socket::DiscordIPCSocket,
26    types::{ActivityPayload, IPCActivityCmd, TimestampPayload},
27};
28
29/*
30 *
31 * Helper funcs
32 *
33 */
34
35fn get_current_timestamp() -> Result<u64> {
36    Ok(SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs())
37}
38
39fn pack(opcode: u32, data_len: u32) -> Vec<u8> {
40    let mut bytes = Vec::with_capacity(8);
41    bytes.extend_from_slice(&opcode.to_le_bytes());
42    bytes.extend_from_slice(&data_len.to_le_bytes());
43    bytes
44}
45
46/*
47 *
48 * Frame/cmd structs
49 *
50 */
51
52#[derive(Debug)]
53enum IPCCommand {
54    SetActivity { activity: Activity },
55    ClearActivity,
56    Close,
57}
58
59#[derive(Debug, Deserialize)]
60struct RpcFrame {
61    cmd: Option<String>,
62    evt: Option<String>,
63    data: Option<ReadyData>,
64}
65
66/// An object which is passed during READY capture from Discord IPC instance.
67#[derive(Debug, Clone, Deserialize)]
68pub struct ReadyData {
69    pub user: DiscordUser,
70}
71
72/// Represents a Discord user.
73#[derive(Debug, Clone, Deserialize)]
74pub struct DiscordUser {
75    pub id: String,
76    pub username: String,
77    pub global_name: Option<String>,
78    pub discriminator: Option<String>,
79    pub avatar: Option<String>,
80    pub avatar_decoration_data: Option<serde_json::Value>,
81    pub bot: bool,
82    pub flags: Option<u64>,
83    pub premium_type: Option<u64>,
84}
85
86/// Represents a Discord Rich Presence activity.
87#[derive(Debug, Clone)]
88pub struct Activity {
89    pub(crate) details: String,
90    pub(crate) state: Option<String>,
91    pub(crate) duration: Option<Duration>,
92}
93
94impl Activity {
95    /// Create a new Discord Rich Presence activity.
96    pub fn new(details: impl Into<String>) -> Self {
97        Self {
98            details: details.into(),
99            state: None,
100            duration: None,
101        }
102    }
103
104    /// Sets the state (bottom text) for the activity.
105    pub fn state(mut self, state: impl Into<String>) -> Self {
106        self.state = Some(state.into());
107        self
108    }
109
110    /// Sets the duration for the activity. This is used to create a countdown timer.
111    pub fn duration(mut self, duration: Duration) -> Self {
112        self.duration = Some(duration);
113        self
114    }
115}
116
117/*
118 *
119 * Async implementation
120 *
121 */
122
123/// Primary struct for you to set and update Discord Rich Presences with.
124pub struct DiscordIPC {
125    tx: Sender<IPCCommand>,
126    client_id: String,
127    running: Arc<AtomicBool>,
128    handle: Option<JoinHandle<Result<()>>>,
129    on_ready: Option<Box<dyn Fn(ReadyData) + Send + Sync + 'static>>,
130}
131
132impl DiscordIPC {
133    /// Creates a new Discord IPC client instance.
134    pub fn new(client_id: &str) -> Self {
135        let (tx, _rx) = mpsc::channel(32);
136
137        Self {
138            tx,
139            client_id: client_id.to_string(),
140            running: Arc::new(AtomicBool::new(false)),
141            handle: None,
142            on_ready: None,
143        }
144    }
145
146    /// Run a particular closure after receiving the READY event from the local Discord IPC server.
147    pub fn on_ready<F: Fn(ReadyData) + Send + Sync + 'static>(mut self, f: F) -> Self {
148        self.on_ready = Some(Box::new(f));
149        self
150    }
151
152    /// The Discord client ID that has been used to initialize this IPC client instance.
153    pub fn client_id(&self) -> String {
154        self.client_id.clone()
155    }
156
157    /// Run the client.
158    /// Must be called before any [`DiscordIPC::set_activity`] calls.
159    pub async fn run(&mut self, wait_for_ready: bool) -> Result<()> {
160        if self.running.swap(true, Ordering::SeqCst) {
161            bail!(
162                "Cannot run multiple instances of .run() for DiscordIPC, or when a session is still closing."
163            )
164        }
165
166        let (tx, mut rx) = mpsc::channel::<IPCCommand>(32);
167        self.tx = tx;
168        let client_id = self.client_id.clone();
169        let running = self.running.clone();
170
171        // oneshot channel to signal when READY is received the first time
172        let (ready_tx, ready_rx) = oneshot::channel::<()>();
173
174        let on_ready = self.on_ready.take();
175
176        let handle = tokio::spawn(async move {
177            let mut backoff = 1;
178            let mut last_activity: Option<Activity> = None;
179            let mut ready_tx = Some(ready_tx);
180
181            'outer: while running.load(Ordering::SeqCst) {
182                // initial connect
183                let mut socket = match DiscordIPCSocket::new().await {
184                    Ok(s) => s,
185                    Err(_) => {
186                        sleep(Duration::from_secs(backoff)).await;
187                        continue;
188                    }
189                };
190
191                // initial handshake
192                if socket.do_handshake(&client_id).await.is_err() {
193                    sleep(Duration::from_secs(backoff)).await;
194                    continue;
195                }
196
197                // wait for READY (blocks until first READY or socket fails)
198                loop {
199                    let frame = match socket.read_frame().await {
200                        Ok(f) => f,
201                        Err(_) => continue 'outer,
202                    };
203
204                    if frame.opcode != 1 {
205                        continue;
206                    }
207
208                    if let Ok(json) = serde_json::from_slice::<RpcFrame>(&frame.body) {
209                        if json.cmd.as_deref() == Some("DISPATCH")
210                            && json.evt.as_deref() == Some("READY")
211                        {
212                            if let Some(tx) = ready_tx.take() {
213                                let _ = tx.send(());
214                            }
215                            if let (Some(f), Some(data)) = (&on_ready, json.data) {
216                                f(data);
217                            }
218                            break;
219                        }
220
221                        if json.evt.as_deref() == Some("ERROR") {
222                            eprintln!("Discord RPC error: {:?}", json.data);
223                        }
224                    }
225                }
226
227                // should reset per new run() call
228                let session_start = get_current_timestamp()?;
229
230                // reset activity if previous instance failed and this instance is basically reconnecting
231                if let Some(activity) = &last_activity {
232                    let _ = socket.send_activity(activity.clone(), session_start).await;
233                }
234
235                backoff = 1;
236
237                loop {
238                    tokio::select! {
239                        Some(cmd) = rx.recv() => {
240                            match cmd {
241                                IPCCommand::SetActivity { activity } => {
242                                    last_activity = Some(activity.clone());
243
244                                    if socket.send_activity(activity, session_start).await.is_err() {
245                                        break;
246                                    }
247                                },
248                                IPCCommand::ClearActivity => {
249                                    last_activity = None;
250
251                                    if socket.clear_activity().await.is_err() { break; }
252                                },
253                                IPCCommand::Close => {
254                                    let _ = socket.close().await;
255                                    running.store(false, Ordering::SeqCst);
256                                    break 'outer;
257                                }
258                            }
259                        }
260
261                        frame = socket.read_frame() => {
262                            match frame {
263                                Ok(frame) => match frame.opcode {
264                                    1 => {
265                                        if let Ok(json) = serde_json::from_slice::<RpcFrame>(&frame.body) {
266                                            if json.evt.as_deref() == Some("ERROR") {
267                                                eprintln!("Discord RPC error: {:?}", json.data);
268                                            }
269                                        }
270                                    }
271                                    2 => break,
272                                    3 => {
273                                        let packed = pack(3, frame.body.len() as u32);
274                                        if socket.write(&packed).await.is_err() { break; }
275                                        if socket.write(&frame.body).await.is_err() { break; }
276                                    }
277                                    _ => {}
278                                },
279                                Err(_) => break,
280                            }
281                        }
282                    }
283                }
284
285                sleep(Duration::from_secs(backoff)).await;
286                backoff = (backoff * 2).min(4);
287            }
288
289            Ok(())
290        });
291
292        self.handle = Some(handle);
293
294        if wait_for_ready {
295            match ready_rx.await {
296                Ok(()) => Ok(()),
297                Err(_) => bail!("Background task exited before READY."),
298            }
299        } else {
300            Ok(())
301        }
302    }
303
304    /// Waits for the IPC task to finish.
305    pub async fn wait(&mut self) -> Result<()> {
306        if let Some(handle) = self.handle.take() {
307            handle.await??;
308        }
309
310        Ok(())
311    }
312
313    /// Checks whether the task is running through the internal atomic indicator flag.
314    pub fn is_running(&self) -> bool {
315        self.running.load(Ordering::SeqCst)
316    }
317
318    /// Sets/updates the Discord Rich presence activity.
319    /// [`DiscordIPC::run`] must be executed prior to calling this.
320    pub async fn set_activity(&self, activity: Activity) -> Result<()> {
321        if !self.is_running() {
322            bail!("Call .run() before .set_activity() execution.");
323        }
324
325        self.tx.send(IPCCommand::SetActivity { activity }).await?;
326        Ok(())
327    }
328
329    /// Clears a previously set Discord Rich Presence activity.
330    pub async fn clear_activity(&self) -> Result<()> {
331        if self.is_running() {
332            self.tx.send(IPCCommand::ClearActivity).await?;
333        }
334
335        Ok(())
336    }
337
338    /// Closes the current connection if any.
339    pub async fn close(&mut self) -> Result<()> {
340        if self.is_running() {
341            let _ = self.tx.send(IPCCommand::Close).await;
342            if let Some(handle) = self.handle.take() {
343                if let Err(e) = handle.await {
344                    eprintln!("DiscordIPC background task failed on close: {e:?}");
345                }
346            }
347        }
348
349        Ok(())
350    }
351}
352
353/*
354 *
355 * Extension of DiscordIPCSocket
356 * (for convenience)
357 *
358 */
359
360impl DiscordIPCSocket {
361    async fn send_activity(&mut self, activity: Activity, session_start: u64) -> Result<()> {
362        let current_t = get_current_timestamp()?;
363        let end_timestamp = activity.duration.map(|d| current_t + d.as_secs());
364
365        let cmd = IPCActivityCmd::new_with(Some(ActivityPayload {
366            details: activity.details,
367            state: activity.state,
368            timestamps: TimestampPayload {
369                start: session_start,
370                end: end_timestamp,
371            },
372        }));
373
374        self.send_cmd(cmd).await
375    }
376
377    async fn clear_activity(&mut self) -> Result<()> {
378        let cmd = IPCActivityCmd::new_with(None);
379        self.send_cmd(cmd).await?;
380        Ok(())
381    }
382
383    async fn do_handshake(&mut self, client_id: &str) -> Result<()> {
384        let handshake = json!({ "v": 1, "client_id": client_id }).to_string();
385        let packed = pack(0, handshake.len() as u32);
386
387        self.write(&packed).await?;
388        self.write(handshake.as_bytes()).await?;
389
390        Ok(())
391    }
392
393    async fn send_cmd(&mut self, cmd: IPCActivityCmd) -> Result<()> {
394        let cmd = cmd.to_json()?;
395
396        let packed = pack(1, cmd.len() as u32);
397        self.write(&packed).await?;
398        self.write(cmd.as_bytes()).await?;
399        Ok(())
400    }
401}
402
403/*
404 *
405 * Blocking implementation
406 *
407 */
408
409/// Blocking implementation of [`DiscordIPC`].
410pub struct DiscordIPCSync {
411    inner: DiscordIPC,
412    rt: Runtime,
413}
414
415impl DiscordIPCSync {
416    /// Creates a new Discord IPC client instance.
417    pub fn new(client_id: &str) -> Result<Self> {
418        let rt = Builder::new_multi_thread().enable_all().build()?;
419        let inner = DiscordIPC::new(client_id);
420
421        Ok(Self { inner, rt })
422    }
423
424    /// Run a particular closure after receiving the READY event from the local Discord IPC server.
425    pub fn on_ready<F: Fn(ReadyData) + Send + Sync + 'static>(mut self, f: F) -> Self {
426        self.inner.on_ready = Some(Box::new(f));
427        self
428    }
429
430    /// The Discord client ID that has been used to initialize this IPC client instance.
431    pub fn client_id(&self) -> String {
432        self.inner.client_id()
433    }
434
435    /// Run the client.
436    /// Must be called before any [`DiscordIPCSync::set_activity`] calls.
437    pub fn run(&mut self, wait_for_ready: bool) -> Result<()> {
438        self.rt.block_on(self.inner.run(wait_for_ready))
439    }
440
441    /// Waits for the IPC task to finish.
442    /// [`DiscordIPCSync::run`] must be called to spawn it in the first place.
443    pub fn wait(&mut self) -> Result<()> {
444        self.rt.block_on(self.inner.wait())
445    }
446
447    /// Checks whether the task is running through the internal atomic indicator flag.
448    pub fn is_running(&self) -> bool {
449        self.inner.is_running()
450    }
451
452    /// Sets/updates the Discord Rich presence activity.
453    /// [`DiscordIPCSync::run`] must be executed prior to calling this.
454    pub fn set_activity(&self, activity: Activity) -> Result<()> {
455        self.rt.block_on(self.inner.set_activity(activity))
456    }
457
458    /// Clears a previously set Discord Rich Presence activity.
459    pub fn clear_activity(&self) -> Result<()> {
460        self.rt.block_on(self.inner.clear_activity())
461    }
462
463    /// Closes the current connection if any.
464    pub fn close(&mut self) -> Result<()> {
465        self.rt.block_on(self.inner.close())
466    }
467}
468
469impl Drop for DiscordIPCSync {
470    fn drop(&mut self) {
471        let _ = self.rt.block_on(self.inner.close());
472    }
473}