Skip to main content

filthy_rich/
ipc.rs

1// SPDX-License-Identifier: MIT
2
3use std::{
4    sync::{
5        Arc,
6        atomic::{AtomicBool, Ordering},
7    },
8    time::{Duration, SystemTime, UNIX_EPOCH},
9};
10
11use anyhow::{Result, bail};
12use serde::Deserialize;
13use serde_json::json;
14use tokio::{
15    runtime::{Builder, Runtime},
16    sync::{
17        mpsc::{self, Sender},
18        oneshot,
19    },
20    task::JoinHandle,
21    time::sleep,
22};
23
24use crate::{
25    activitytypes::{ActivityPayload, IPCActivityCmd, TimestampPayload},
26    socket::DiscordIPCSocket,
27};
28
29/*
30 *
31 * Helper funcs
32 *
33 */
34
35fn get_current_timestamp() -> Result<u64> {
36    Ok(SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs())
37}
38
39fn pack(opcode: u32, data_len: u32) -> Vec<u8> {
40    let mut bytes = Vec::with_capacity(8);
41    bytes.extend_from_slice(&opcode.to_le_bytes());
42    bytes.extend_from_slice(&data_len.to_le_bytes());
43    bytes
44}
45
46/*
47 *
48 * Frame/cmd structs
49 *
50 */
51
52#[derive(Debug)]
53enum IPCCommand {
54    SetActivity {
55        details: String,
56        state: Option<String>,
57    },
58    ClearActivity,
59    Close,
60}
61
62#[derive(Debug, Deserialize)]
63struct RpcFrame {
64    cmd: Option<String>,
65    evt: Option<String>,
66    data: Option<serde_json::Value>,
67}
68
69/*
70 *
71 * Async implementation
72 *
73 */
74
75/// Primary struct for you to set and update Discord Rich Presences with.
76pub struct DiscordIPC {
77    tx: Sender<IPCCommand>,
78    client_id: String,
79    running: Arc<AtomicBool>,
80    handle: Option<JoinHandle<Result<()>>>,
81    on_ready: Option<Box<dyn Fn() + Send + 'static>>,
82}
83
84impl DiscordIPC {
85    /// Creates a new Discord IPC client instance.
86    pub fn new(client_id: &str) -> Self {
87        let (tx, _rx) = mpsc::channel(32);
88
89        Self {
90            tx,
91            client_id: client_id.to_string(),
92            running: Arc::new(AtomicBool::new(false)),
93            handle: None,
94            on_ready: None,
95        }
96    }
97
98    /// Run a particular closure after receiving the READY event from the local Discord IPC server.
99    pub fn on_ready<F: Fn() + Send + 'static>(mut self, f: F) -> Self {
100        self.on_ready = Some(Box::new(f));
101        self
102    }
103
104    /// The Discord client ID that has been used to initialize this IPC client instance.
105    pub fn client_id(&self) -> String {
106        self.client_id.clone()
107    }
108
109    /// Run the client.
110    /// Must be called before any [`DiscordIPC::set_activity`] calls.
111    pub async fn run(&mut self, wait_for_ready: bool) -> Result<()> {
112        if self.running.swap(true, Ordering::SeqCst) {
113            bail!(
114                "Cannot run multiple instances of .run() for DiscordIPC, or when a session is still closing."
115            )
116        }
117
118        let (tx, mut rx) = mpsc::channel::<IPCCommand>(32);
119        self.tx = tx;
120        let client_id = self.client_id.clone();
121        let running = self.running.clone();
122
123        // oneshot channel to signal when READY is received the first time
124        let (ready_tx, ready_rx) = oneshot::channel::<()>();
125
126        let on_ready = self.on_ready.take();
127
128        let handle = tokio::spawn(async move {
129            let mut backoff = 1;
130            let mut last_activity: Option<(String, Option<String>)> = None;
131            let mut ready_tx = Some(ready_tx);
132
133            'outer: while running.load(Ordering::SeqCst) {
134                // initial connect
135                let mut socket = match DiscordIPCSocket::new().await {
136                    Ok(s) => s,
137                    Err(_) => {
138                        sleep(Duration::from_secs(backoff)).await;
139                        continue;
140                    }
141                };
142
143                // initial handshake
144                if socket.do_handshake(&client_id).await.is_err() {
145                    sleep(Duration::from_secs(backoff)).await;
146                    continue;
147                }
148
149                // wait for READY (blocks until first READY or socket fails)
150                loop {
151                    let frame = match socket.read_frame().await {
152                        Ok(f) => f,
153                        Err(_) => continue 'outer,
154                    };
155
156                    if frame.opcode != 1 {
157                        continue;
158                    }
159
160                    if let Ok(json) = serde_json::from_slice::<RpcFrame>(&frame.body) {
161                        if json.cmd.as_deref() == Some("DISPATCH")
162                            && json.evt.as_deref() == Some("READY")
163                        {
164                            if let Some(tx) = ready_tx.take() {
165                                let _ = tx.send(()); // moves the sender here exactly once
166                            }
167                            if let Some(f) = &on_ready {
168                                f();
169                            }
170                            break;
171                        }
172                        if json.evt.as_deref() == Some("ERROR") {
173                            eprintln!("Discord RPC error: {:?}", json.data);
174                        }
175                    }
176                }
177
178                let timestamp = get_current_timestamp()?;
179
180                // reset activity if previous instance failed and this instance is basically reconnecting
181                if let Some((details, state)) = &last_activity {
182                    let _ = socket
183                        .send_activity(details.clone(), state.clone(), timestamp)
184                        .await;
185                }
186
187                backoff = 1;
188
189                loop {
190                    tokio::select! {
191                        Some(cmd) = rx.recv() => {
192                            match cmd {
193                                IPCCommand::SetActivity { details, state } => {
194                                    last_activity = Some((details.clone(), state.clone()));
195
196                                    if socket.send_activity(details, state, timestamp).await.is_err() {
197                                        break;
198                                    }
199                                },
200                                IPCCommand::ClearActivity => {
201                                    last_activity = None;
202
203                                    if socket.clear_activity().await.is_err() { break; }
204                                },
205                                IPCCommand::Close => {
206                                    let _ = socket.close().await;
207                                    running.store(false, Ordering::SeqCst);
208                                    break 'outer;
209                                }
210                            }
211                        }
212
213                        frame = socket.read_frame() => {
214                            match frame {
215                                Ok(frame) => match frame.opcode {
216                                    1 => {
217                                        if let Ok(json) = serde_json::from_slice::<RpcFrame>(&frame.body) {
218                                            if json.evt.as_deref() == Some("ERROR") {
219                                                eprintln!("Discord RPC error: {:?}", json.data);
220                                            }
221                                        }
222                                    }
223                                    2 => break,
224                                    3 => {
225                                        let packed = pack(3, frame.body.len() as u32);
226                                        if socket.write(&packed).await.is_err() { break; }
227                                        if socket.write(&frame.body).await.is_err() { break; }
228                                    }
229                                    _ => {}
230                                },
231                                Err(_) => break,
232                            }
233                        }
234                    }
235                }
236
237                sleep(Duration::from_secs(backoff)).await;
238                backoff = (backoff * 2).min(4);
239            }
240
241            Ok(())
242        });
243
244        self.handle = Some(handle);
245
246        if wait_for_ready {
247            match ready_rx.await {
248                Ok(()) => Ok(()),
249                Err(_) => bail!("Background task exited before READY."),
250            }
251        } else {
252            Ok(())
253        }
254    }
255
256    /// Waits for the IPC task to finish.
257    pub async fn wait(&mut self) -> Result<()> {
258        if let Some(handle) = self.handle.take() {
259            handle.await??;
260        }
261
262        Ok(())
263    }
264
265    /// Checks whether the task is running through the internal atomic indicator flag.
266    pub fn is_running(&self) -> bool {
267        self.running.load(Ordering::SeqCst)
268    }
269
270    /// Sets/updates the Discord Rich presence activity.
271    /// [`DiscordIPC::run`] must be executed prior to calling this.
272    pub async fn set_activity(&self, details: String, state: Option<String>) -> Result<()> {
273        if !self.is_running() {
274            bail!("Call .run() before .set_activity() execution.");
275        }
276
277        self.tx
278            .send(IPCCommand::SetActivity { details, state })
279            .await?;
280        Ok(())
281    }
282
283    /// Clears a previously set Discord Rich Presence activity.
284    pub async fn clear_activity(&self) -> Result<()> {
285        if self.is_running() {
286            self.tx.send(IPCCommand::ClearActivity).await?;
287        }
288
289        Ok(())
290    }
291
292    /// Closes the current connection if any.
293    pub async fn close(&mut self) -> Result<()> {
294        if self.is_running() {
295            let _ = self.tx.send(IPCCommand::Close).await;
296            if let Some(handle) = self.handle.take() {
297                if let Err(e) = handle.await {
298                    eprintln!("DiscordIPC background task failed on close: {e:?}");
299                }
300            }
301        }
302
303        Ok(())
304    }
305}
306
307/*
308 *
309 * Extension of DiscordIPCSocket
310 * (for convenience)
311 *
312 */
313
314impl DiscordIPCSocket {
315    async fn send_activity(
316        &mut self,
317        details: String,
318        state: Option<String>,
319        timestamp: u64,
320    ) -> Result<()> {
321        let cmd = IPCActivityCmd::new_with(Some(ActivityPayload {
322            details,
323            state,
324            timestamps: TimestampPayload { start: timestamp },
325        }));
326
327        self.send_cmd(cmd).await?;
328        Ok(())
329    }
330
331    async fn clear_activity(&mut self) -> Result<()> {
332        let cmd = IPCActivityCmd::new_with(None);
333        self.send_cmd(cmd).await?;
334        Ok(())
335    }
336
337    async fn do_handshake(&mut self, client_id: &str) -> Result<()> {
338        let handshake = json!({ "v": 1, "client_id": client_id }).to_string();
339        let packed = pack(0, handshake.len() as u32);
340
341        self.write(&packed).await?;
342        self.write(handshake.as_bytes()).await?;
343
344        Ok(())
345    }
346
347    async fn send_cmd(&mut self, cmd: IPCActivityCmd) -> Result<()> {
348        let cmd = cmd.to_json()?;
349
350        let packed = pack(1, cmd.len() as u32);
351        self.write(&packed).await?;
352        self.write(cmd.as_bytes()).await?;
353        Ok(())
354    }
355}
356
357/*
358 *
359 * Blocking implementation
360 *
361 */
362
363/// Blocking implementation of [`DiscordIPC`].
364pub struct DiscordIPCSync {
365    inner: DiscordIPC,
366    rt: Runtime,
367}
368
369impl DiscordIPCSync {
370    /// Creates a new Discord IPC client instance.
371    pub fn new(client_id: &str) -> Result<Self> {
372        let rt = Builder::new_multi_thread().enable_all().build()?;
373        let inner = DiscordIPC::new(client_id);
374
375        Ok(Self { inner, rt })
376    }
377
378    /// Run a particular closure after receiving the READY event from the local Discord IPC server.
379    pub fn on_ready<F: Fn() + Send + 'static>(mut self, f: F) -> Self {
380        self.inner.on_ready = Some(Box::new(f));
381        self
382    }
383
384    /// The Discord client ID that has been used to initialize this IPC client instance.
385    pub fn client_id(&self) -> String {
386        self.inner.client_id()
387    }
388
389    /// Run the client.
390    /// Must be called before any [`DiscordIPCSync::set_activity`] calls.
391    pub fn run(&mut self, wait_for_ready: bool) -> Result<()> {
392        self.rt.block_on(self.inner.run(wait_for_ready))
393    }
394
395    /// Waits for the IPC task to finish.
396    /// [`DiscordIPCSync::run`] must be called to spawn it in the first place.
397    pub fn wait(&mut self) -> Result<()> {
398        self.rt.block_on(self.inner.wait())
399    }
400
401    /// Checks whether the task is running through the internal atomic indicator flag.
402    pub fn is_running(&self) -> bool {
403        self.inner.is_running()
404    }
405
406    /// Sets/updates the Discord Rich presence activity.
407    /// [`DiscordIPCSync::run`] must be executed prior to calling this.
408    pub fn set_activity(&self, details: String, state: Option<String>) -> Result<()> {
409        self.rt.block_on(self.inner.set_activity(details, state))
410    }
411
412    /// Clears a previously set Discord Rich Presence activity.
413    pub fn clear_activity(&self) -> Result<()> {
414        self.rt.block_on(self.inner.clear_activity())
415    }
416
417    /// Closes the current connection if any.
418    pub fn close(&mut self) -> Result<()> {
419        self.rt.block_on(self.inner.close())
420    }
421}
422
423impl Drop for DiscordIPCSync {
424    fn drop(&mut self) {
425        let _ = self.rt.block_on(self.inner.close());
426    }
427}