Skip to main content

filthy_rich/
ipc.rs

1// SPDX-License-Identifier: MIT
2
3use std::{
4    sync::{
5        Arc,
6        atomic::{AtomicBool, Ordering},
7    },
8    time::{Duration, SystemTime, UNIX_EPOCH},
9};
10
11use anyhow::{Result, bail};
12use serde::Deserialize;
13use serde_json::json;
14use tokio::{
15    runtime::{Builder, Runtime},
16    sync::mpsc::{self, Sender},
17    task::JoinHandle,
18    time::{sleep, timeout},
19};
20use uuid::Uuid;
21
22use crate::socket::DiscordIPCSocket;
23
24/*
25 *
26 * Helper funcs
27 *
28 */
29
30fn get_current_timestamp() -> Result<u64> {
31    Ok(SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs())
32}
33
34fn pack(opcode: u32, data_len: u32) -> Vec<u8> {
35    let mut bytes = Vec::with_capacity(8);
36    bytes.extend_from_slice(&opcode.to_le_bytes());
37    bytes.extend_from_slice(&data_len.to_le_bytes());
38    bytes
39}
40
41/*
42 *
43 * General functionality
44 *
45 */
46
47#[derive(Debug)]
48enum IPCCommand {
49    SetActivity { details: String, state: String },
50    ClearActivity,
51    Close,
52}
53
54#[derive(Debug, Deserialize)]
55struct RpcFrame {
56    cmd: Option<String>,
57    evt: Option<String>,
58    data: Option<serde_json::Value>,
59}
60
61/// Primary struct for you to set and update Discord Rich Presences with.
62#[derive(Debug)]
63pub struct DiscordIPC {
64    tx: Sender<IPCCommand>,
65    client_id: String,
66    handle: Option<JoinHandle<Result<()>>>,
67    running: Arc<AtomicBool>,
68}
69
70impl DiscordIPC {
71    /// Creates a new asynchronous Discord IPC client with the given client ID.
72    #[must_use]
73    pub fn new(client_id: &str) -> Self {
74        let (tx, _rx) = mpsc::channel(32);
75
76        Self {
77            tx,
78            client_id: client_id.to_string(),
79            handle: None,
80            running: Arc::new(AtomicBool::new(false)),
81        }
82    }
83
84    /// Returns the Discord client ID that has been used to initialize this IPC client.
85    #[must_use]
86    pub fn client_id(&self) -> String {
87        self.client_id.clone()
88    }
89
90    /// Checks internally whether or not an existing IPC client instance
91    /// is running and currently attached with Discord.
92    pub fn is_running(&self) -> bool {
93        self.running.load(Ordering::SeqCst) && self.handle.is_some()
94    }
95
96    /// Runs the Discord IPC client.
97    pub async fn run(&mut self) -> Result<()> {
98        if self.handle.is_some() && self.running.swap(true, Ordering::SeqCst) {
99            bail!("Cannot run multiple instances of .run().");
100        }
101
102        let running = self.running.clone();
103
104        let (tx, mut rx) = mpsc::channel::<IPCCommand>(32);
105        self.tx = tx;
106        let client_id = self.client_id.clone();
107
108        let handle = tokio::spawn(async move {
109            let mut backoff = 1;
110            let mut last_activity: Option<(String, String)> = None;
111            let timestamp = get_current_timestamp()?;
112
113            while running.load(Ordering::SeqCst) {
114                // initial connect
115                let mut socket = match DiscordIPCSocket::new().await {
116                    Ok(s) => s,
117                    Err(_) => {
118                        sleep(Duration::from_secs(backoff)).await;
119                        continue;
120                    }
121                };
122
123                // initial handshake
124                let handshake = json!({ "v": 1, "client_id": client_id }).to_string();
125                let packed = pack(0, handshake.len() as u32);
126
127                if socket.write(&packed).await.is_err()
128                    || socket.write(handshake.as_bytes()).await.is_err()
129                {
130                    sleep(Duration::from_secs(backoff)).await;
131                    continue;
132                }
133
134                // wait for ready
135                loop {
136                    let frame = match socket.read_frame().await {
137                        Ok(f) => f,
138                        Err(_) => break,
139                    };
140
141                    if frame.opcode != 1 {
142                        continue;
143                    }
144
145                    if let Ok(json) = serde_json::from_slice::<RpcFrame>(&frame.body) {
146                        if json.cmd.as_deref() == Some("DISPATCH")
147                            && json.evt.as_deref() == Some("READY")
148                        {
149                            break;
150                        }
151                        if json.evt.as_deref() == Some("ERROR") {
152                            eprintln!("Discord RPC error: {:?}", json.data);
153                        }
154                    }
155                }
156
157                // reset activity if previous instance failed and this instance is basically reconnecting
158                if let Some((details, state)) = &last_activity {
159                    let _ = send_activity(&mut socket, details, state, timestamp).await;
160                }
161
162                backoff = 1;
163
164                loop {
165                    tokio::select! {
166                        Some(cmd) = rx.recv() => {
167                            match cmd {
168                                IPCCommand::SetActivity { details, state } => {
169                                    last_activity = Some((details.clone(), state.clone()));
170
171                                    if send_activity(&mut socket, &details, &state, timestamp).await.is_err() {
172                                        break;
173                                    }
174                                },
175                                IPCCommand::ClearActivity => {
176                                    if clear_activity(&mut socket).await.is_err() {
177                                        break;
178                                    }
179                                }
180                                IPCCommand::Close => {
181                                    clear_activity(&mut socket).await?;
182                                    socket.close().await?;
183                                    running.store(false, Ordering::SeqCst);
184                                    return Ok(());
185                                }
186                            }
187                        }
188
189                        frame = socket.read_frame() => {
190                            match frame {
191                                Ok(frame) => match frame.opcode {
192                                    1 => {
193                                        if let Ok(json) = serde_json::from_slice::<RpcFrame>(&frame.body) {
194                                            if json.evt.as_deref() == Some("ERROR") {
195                                                eprintln!("Discord RPC error: {:?}", json.data);
196                                            }
197                                        }
198                                    }
199                                    2 => break,
200                                    3 => {
201                                        let packed = pack(3, frame.body.len() as u32);
202                                        if socket.write(&packed).await.is_err() { break; }
203                                        if socket.write(&frame.body).await.is_err() { break; }
204                                    }
205                                    _ => {}
206                                },
207                                Err(_) => break,
208                            }
209                        }
210                    }
211                }
212
213                sleep(Duration::from_secs(backoff)).await;
214                backoff = (backoff * 2).min(4);
215            }
216
217            Ok(())
218        });
219
220        self.handle = Some(handle);
221
222        Ok(())
223    }
224
225    /// Waits for the primary IPC client task to finish.
226    /// Can also be used to keep the IPC client running forever.
227    pub async fn wait(&mut self) -> Result<()> {
228        if let Some(handle) = self.handle.take() {
229            handle.await??;
230        }
231        Ok(())
232    }
233
234    /// Sets/updates a Discord Rich Presence activity.
235    pub async fn set_activity(&self, details: &str, state: &str) -> Result<()> {
236        if !self.is_running() {
237            bail!("Call .run() before .set_activity().");
238        }
239
240        self.tx
241            .send(IPCCommand::SetActivity {
242                details: details.to_string(),
243                state: state.to_string(),
244            })
245            .await?;
246
247        Ok(())
248    }
249
250    /// Clears a previously-set Discord Rich Presence activity. Prefer this function over
251    /// close() if the current process is not being exited with the presence toggle.
252    pub async fn clear_activity(&self) -> Result<()> {
253        if self.is_running() {
254            self.tx.send(IPCCommand::ClearActivity).await?;
255        }
256
257        Ok(())
258    }
259
260    const CLOSE_COOLDOWN_MS: u64 = 200;
261
262    /// Closes the current session of Rich Presence activity.
263    ///
264    /// NOTE: Always try to use one activity per session as Discord might sometimes behave
265    /// weirdly with session closes; this was only found noticeable if run() and close() were
266    /// repeatedly called.
267    pub async fn close(&mut self) -> Result<()> {
268        if !self.running.swap(false, Ordering::SeqCst) {
269            return Ok(());
270        }
271
272        self.tx.send(IPCCommand::Close).await?;
273
274        if let Some(handle) = self.handle.take() {
275            match timeout(Duration::from_secs(2), handle).await {
276                Ok(result) => result??,
277                Err(_) => eprintln!("DiscordIPC close() timed out."),
278            }
279        }
280
281        sleep(Duration::from_millis(Self::CLOSE_COOLDOWN_MS)).await;
282
283        Ok(())
284    }
285}
286
287/*
288 *
289 * Command-specific functions
290 *
291 */
292
293async fn send_activity(
294    socket: &mut DiscordIPCSocket,
295    details: &str,
296    state: &str,
297    timestamp: u64,
298) -> Result<()> {
299    let json = json!({
300        "cmd": "SET_ACTIVITY",
301        "args": {
302            "pid": std::process::id(),
303            "activity": {
304                "details": details,
305                "state": state,
306                "timestamps": { "start": timestamp }
307            }
308        },
309        "nonce": Uuid::new_v4().to_string()
310    })
311    .to_string();
312
313    let packed = pack(1, json.len() as u32);
314    socket.write(&packed).await?;
315    socket.write(json.as_bytes()).await?;
316    Ok(())
317}
318
319async fn clear_activity(socket: &mut DiscordIPCSocket) -> Result<()> {
320    let json = json!({
321        "cmd": "SET_ACTIVITY",
322        "args": {
323            "pid": std::process::id(),
324            "activity": null
325        },
326        "nonce": Uuid::new_v4().to_string()
327    })
328    .to_string();
329
330    let packed = pack(1, json.len() as u32);
331    socket.write(&packed).await?;
332    socket.write(json.as_bytes()).await?;
333    Ok(())
334}
335
336/*
337 *
338 * Blocking wrapper
339 *
340 */
341
342/// Synchronous wrapper around [`DiscordIPC`] for use in non-async contexts.
343/// All operations block on an internal Tokio runtime.
344#[derive(Debug)]
345pub struct DiscordIPCSync {
346    inner: DiscordIPC,
347    rt: Runtime,
348}
349
350impl DiscordIPCSync {
351    /// Creates a new synchronous Discord IPC client with the given client ID.
352    pub fn new(client_id: &str) -> Result<Self> {
353        let rt = Builder::new_multi_thread().enable_all().build()?;
354        let inner = DiscordIPC::new(client_id);
355        Ok(Self { inner, rt })
356    }
357
358    /// Returns the Discord client ID that has been used to initialize this IPC client.
359    pub fn client_id(&self) -> String {
360        self.inner.client_id()
361    }
362
363    /// Run the IPC client. Must be called before any `set_activity()` calls.
364    pub fn run(&mut self) -> Result<()> {
365        self.rt.block_on(self.inner.run())
366    }
367
368    /// Waits for the primary IPC task to finish.
369    pub fn wait(&mut self) -> Result<()> {
370        self.rt.block_on(self.inner.wait())
371    }
372
373    /// Sets/updates the Discord Rich Presence activity.
374    /// `run()` must be called prior to calling this.
375    pub fn set_activity(&self, details: &str, state: &str) -> Result<()> {
376        self.rt.block_on(self.inner.set_activity(details, state))
377    }
378
379    /// Clears the current Discord Rich Presence activity without closing the session.
380    pub fn clear_activity(&self) -> Result<()> {
381        self.rt.block_on(self.inner.clear_activity())
382    }
383
384    /// Closes the current session of Rich Presence activity.
385    /// After this, you can safely call `run()` again.
386    pub fn close(&mut self) -> Result<()> {
387        self.rt.block_on(self.inner.close())
388    }
389}