Skip to main content

filthy_rich/
ipc.rs

1// SPDX-License-Identifier: MIT
2
3use std::{
4    sync::{
5        Arc,
6        atomic::{AtomicBool, Ordering},
7    },
8    time::{Duration, SystemTime, UNIX_EPOCH},
9};
10
11use anyhow::{Result, bail};
12use serde::Deserialize;
13use serde_json::json;
14use tokio::{
15    runtime::{Builder, Runtime},
16    sync::mpsc::{self, Sender},
17    task::JoinHandle,
18    time::{sleep, timeout},
19};
20use uuid::Uuid;
21
22use crate::socket::DiscordIPCSocket;
23
24/*
25 *
26 * Helper funcs
27 *
28 */
29
30fn get_current_timestamp() -> Result<u64> {
31    Ok(SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs())
32}
33
/// Builds the 8-byte frame header: `opcode` followed by `data_len`, each
/// encoded as a little-endian `u32`.
fn pack(opcode: u32, data_len: u32) -> Vec<u8> {
    [opcode.to_le_bytes(), data_len.to_le_bytes()].concat()
}
40
41/*
42 *
43 * General functionality
44 *
45 */
46
/// Commands sent over the channel from the public [`DiscordIPC`] methods to
/// the background task spawned by `run()`.
#[derive(Debug)]
enum IPCCommand {
    /// Send a `SET_ACTIVITY` request carrying these `details` and `state` strings.
    SetActivity { details: String, state: String },
    /// Send a `SET_ACTIVITY` request whose activity is `null`.
    ClearActivity,
    /// Clear the activity, close the socket and end the background task.
    Close,
}
53
/// The fields of an incoming JSON payload (body of an opcode-1 frame) that
/// this module looks at.
///
/// Every field is an `Option`, so a payload that lacks any of them still
/// deserializes.
#[derive(Debug, Deserialize)]
struct RpcFrame {
    /// Command name; compared against `"DISPATCH"` while waiting for the handshake to finish.
    cmd: Option<String>,
    /// Event name; `"READY"` and `"ERROR"` are the values checked in this file.
    evt: Option<String>,
    /// Event payload; only printed when an `"ERROR"` event is reported.
    data: Option<serde_json::Value>,
}
60
/// Primary struct for you to set and update Discord Rich Presences with.
#[derive(Debug)]
pub struct DiscordIPC {
    /// Sending half of the command channel; replaced with a fresh sender on every `run()`.
    tx: Sender<IPCCommand>,
    /// Discord application client ID sent in the handshake.
    client_id: String,
    /// Join handle of the background task; `None` before `run()` and after `wait()`/`close()` took it.
    handle: Option<JoinHandle<Result<()>>>,
    /// Flag shared with the background task; its reconnect loop only continues while this is `true`.
    running: Arc<AtomicBool>,
}
69
70impl DiscordIPC {
71    /// Creates a new asynchronous Discord IPC client with the given client ID.
72    #[must_use]
73    pub fn new(client_id: &str) -> Self {
74        let (tx, _rx) = mpsc::channel(32);
75
76        Self {
77            tx,
78            client_id: client_id.to_string(),
79            handle: None,
80            running: Arc::new(AtomicBool::new(false)),
81        }
82    }
83
84    /// Returns the Discord client ID that has been used to initialize this IPC client.
85    #[must_use]
86    pub fn client_id(&self) -> String {
87        self.client_id.clone()
88    }
89
90    /// Checks internally whether or not an existing IPC client instance
91    /// is running and currently attached with Discord.
92    pub fn is_running(&self) -> bool {
93        self.running.load(Ordering::SeqCst) && self.handle.is_some()
94    }
95
96    /// Runs the Discord IPC client.
97    pub async fn run(&mut self) -> Result<()> {
98        if self.is_running() {
99            bail!("Cannot run multiple instances of .run().")
100        }
101
102        self.running.store(true, Ordering::SeqCst);
103        let running = self.running.clone();
104
105        let (tx, mut rx) = mpsc::channel::<IPCCommand>(32);
106        self.tx = tx;
107        let client_id = self.client_id.clone();
108
109        let handle = tokio::spawn(async move {
110            let mut backoff = 1;
111            let mut last_activity: Option<(String, String)> = None;
112            let timestamp = get_current_timestamp()?;
113
114            while running.load(Ordering::SeqCst) {
115                // initial connect
116                let mut socket = match DiscordIPCSocket::new().await {
117                    Ok(s) => s,
118                    Err(_) => {
119                        sleep(Duration::from_secs(backoff)).await;
120                        continue;
121                    }
122                };
123
124                // initial handshake
125                let handshake = json!({ "v": 1, "client_id": client_id }).to_string();
126                let packed = pack(0, handshake.len() as u32);
127
128                if socket.write(&packed).await.is_err()
129                    || socket.write(handshake.as_bytes()).await.is_err()
130                {
131                    sleep(Duration::from_secs(backoff)).await;
132                    continue;
133                }
134
135                // wait for ready
136                loop {
137                    let frame = match socket.read_frame().await {
138                        Ok(f) => f,
139                        Err(_) => break,
140                    };
141
142                    if frame.opcode != 1 {
143                        continue;
144                    }
145
146                    if let Ok(json) = serde_json::from_slice::<RpcFrame>(&frame.body) {
147                        if json.cmd.as_deref() == Some("DISPATCH")
148                            && json.evt.as_deref() == Some("READY")
149                        {
150                            break;
151                        }
152                        if json.evt.as_deref() == Some("ERROR") {
153                            eprintln!("Discord RPC error: {:?}", json.data);
154                        }
155                    }
156                }
157
158                // reset activity if previous instance failed and this instance is basically reconnecting
159                if let Some((details, state)) = &last_activity {
160                    send_activity(&mut socket, details, state, timestamp).await?;
161                }
162
163                backoff = 1;
164
165                loop {
166                    tokio::select! {
167                        Some(cmd) = rx.recv() => {
168                            match cmd {
169                                IPCCommand::SetActivity { details, state } => {
170                                    last_activity = Some((details.clone(), state.clone()));
171
172                                    if send_activity(&mut socket, &details, &state, timestamp).await.is_err() {
173                                        break;
174                                    }
175                                },
176                                IPCCommand::ClearActivity => {
177                                    if clear_activity(&mut socket).await.is_err() {
178                                        break;
179                                    }
180                                }
181                                IPCCommand::Close => {
182                                    clear_activity(&mut socket).await?;
183                                    socket.close().await?;
184                                    running.store(false, Ordering::SeqCst);
185                                    return Ok(());
186                                }
187                            }
188                        }
189
190                        frame = socket.read_frame() => {
191                            match frame {
192                                Ok(frame) => match frame.opcode {
193                                    1 => {
194                                        if let Ok(json) = serde_json::from_slice::<RpcFrame>(&frame.body) {
195                                            if json.evt.as_deref() == Some("ERROR") {
196                                                eprintln!("Discord RPC error: {:?}", json.data);
197                                            }
198                                        }
199                                    }
200                                    2 => break,
201                                    3 => {
202                                        let packed = pack(3, frame.body.len() as u32);
203                                        socket.write(&packed).await?;
204                                        socket.write(&frame.body).await?;
205                                    }
206                                    _ => {}
207                                },
208                                Err(_) => break,
209                            }
210                        }
211                    }
212                }
213
214                sleep(Duration::from_secs(backoff)).await;
215                backoff = (backoff * 2).min(4);
216            }
217
218            Ok(())
219        });
220
221        self.handle = Some(handle);
222
223        Ok(())
224    }
225
226    /// Waits for the primary IPC client task to finish.
227    /// Can also be used to keep the IPC client running forever.
228    pub async fn wait(&mut self) -> Result<()> {
229        if let Some(handle) = self.handle.take() {
230            handle.await??;
231        }
232        Ok(())
233    }
234
235    /// Sets/updates a Discord Rich Presence activity.
236    pub async fn set_activity(&self, details: &str, state: &str) -> Result<()> {
237        if !self.is_running() {
238            bail!("Call .run() before .set_activity().");
239        }
240
241        self.tx
242            .send(IPCCommand::SetActivity {
243                details: details.to_string(),
244                state: state.to_string(),
245            })
246            .await?;
247
248        Ok(())
249    }
250
251    /// Clears a previously-set Discord Rich Presence activity. Prefer this function over
252    /// close() if the current process is not being exited with the presence toggle.
253    pub async fn clear_activity(&self) -> Result<()> {
254        if self.is_running() {
255            self.tx.send(IPCCommand::ClearActivity).await?;
256        }
257
258        Ok(())
259    }
260
261    const CLOSE_COOLDOWN_MS: u64 = 200;
262
263    /// Closes the current session of Rich Presence activity.
264    ///
265    /// NOTE: Always try to use one activity per session as Discord might sometimes behave
266    /// weirdly with session closes; this was only found noticeable if run() and close() were
267    /// repeatedly called.
268    pub async fn close(&mut self) -> Result<()> {
269        if !self.running.swap(false, Ordering::SeqCst) {
270            return Ok(());
271        }
272
273        self.tx.send(IPCCommand::Close).await?;
274
275        if let Some(handle) = self.handle.take() {
276            match timeout(Duration::from_secs(2), handle).await {
277                Ok(result) => result??,
278                Err(_) => eprintln!("DiscordIPC close() timed out."),
279            }
280        }
281
282        sleep(Duration::from_millis(Self::CLOSE_COOLDOWN_MS)).await;
283
284        Ok(())
285    }
286}
287
288/*
289 *
290 * Command-specific functions
291 *
292 */
293
294async fn send_activity(
295    socket: &mut DiscordIPCSocket,
296    details: &str,
297    state: &str,
298    timestamp: u64,
299) -> Result<()> {
300    let json = json!({
301        "cmd": "SET_ACTIVITY",
302        "args": {
303            "pid": std::process::id(),
304            "activity": {
305                "details": details,
306                "state": state,
307                "timestamps": { "start": timestamp }
308            }
309        },
310        "nonce": Uuid::new_v4().to_string()
311    })
312    .to_string();
313
314    let packed = pack(1, json.len() as u32);
315    socket.write(&packed).await?;
316    socket.write(json.as_bytes()).await?;
317    Ok(())
318}
319
320async fn clear_activity(socket: &mut DiscordIPCSocket) -> Result<()> {
321    let json = json!({
322        "cmd": "SET_ACTIVITY",
323        "args": {
324            "pid": std::process::id(),
325            "activity": null
326        },
327        "nonce": Uuid::new_v4().to_string()
328    })
329    .to_string();
330
331    let packed = pack(1, json.len() as u32);
332    socket.write(&packed).await?;
333    socket.write(json.as_bytes()).await?;
334    Ok(())
335}
336
337/*
338 *
339 * Blocking wrapper
340 *
341 */
342
/// Synchronous wrapper around [`DiscordIPC`] for use in non-async contexts.
/// All operations block on an internal Tokio runtime.
#[derive(Debug)]
pub struct DiscordIPCSync {
    /// The asynchronous client that every method delegates to.
    inner: DiscordIPC,
    /// Runtime owned by this wrapper; each call drives `inner` through `block_on` on it.
    rt: Runtime,
}
350
351impl DiscordIPCSync {
352    /// Creates a new synchronous Discord IPC client with the given client ID.
353    pub fn new(client_id: &str) -> Result<Self> {
354        let rt = Builder::new_multi_thread().enable_all().build()?;
355        let inner = DiscordIPC::new(client_id);
356        Ok(Self { inner, rt })
357    }
358
359    /// Returns the Discord client ID that has been used to initialize this IPC client.
360    pub fn client_id(&self) -> String {
361        self.inner.client_id()
362    }
363
364    /// Run the IPC client. Must be called before any `set_activity()` calls.
365    pub fn run(&mut self) -> Result<()> {
366        self.rt.block_on(self.inner.run())
367    }
368
369    /// Waits for the primary IPC task to finish.
370    pub fn wait(&mut self) -> Result<()> {
371        self.rt.block_on(self.inner.wait())
372    }
373
374    /// Sets/updates the Discord Rich Presence activity.
375    /// `run()` must be called prior to calling this.
376    pub fn set_activity(&self, details: &str, state: &str) -> Result<()> {
377        self.rt.block_on(self.inner.set_activity(details, state))
378    }
379
380    /// Clears the current Discord Rich Presence activity without closing the session.
381    pub fn clear_activity(&self) -> Result<()> {
382        self.rt.block_on(self.inner.clear_activity())
383    }
384
385    /// Closes the current session of Rich Presence activity.
386    /// After this, you can safely call `run()` again.
387    pub fn close(&mut self) -> Result<()> {
388        self.rt.block_on(self.inner.close())
389    }
390}