Skip to main content

filthy_rich/
ipc.rs

1// SPDX-License-Identifier: MIT
2
3use std::{
4    sync::{
5        Arc,
6        atomic::{AtomicBool, Ordering},
7    },
8    time::{Duration, SystemTime, UNIX_EPOCH},
9};
10
11use anyhow::{Result, bail};
12use serde::Deserialize;
13use serde_json::json;
14use tokio::{
15    runtime::{Builder, Runtime},
16    sync::mpsc::{self, Sender},
17    task::JoinHandle,
18    time::sleep,
19};
20use uuid::Uuid;
21
22use crate::socket::DiscordIPCSocket;
23
24/*
25 *
26 * Helper funcs
27 *
28 */
29
30fn get_current_timestamp() -> Result<u64> {
31    Ok(SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs())
32}
33
/// Builds the 8-byte frame header: `opcode` followed by `data_len`, each encoded
/// as a little-endian `u32`.
fn pack(opcode: u32, data_len: u32) -> Vec<u8> {
    [opcode.to_le_bytes(), data_len.to_le_bytes()].concat()
}
40
41/*
42 *
43 * Frame/cmd structs
44 *
45 */
46
/// Commands sent over the internal channel from the public `DiscordIPC` methods
/// to the background task spawned by `DiscordIPC::run`.
#[derive(Debug)]
enum IPCCommand {
    /// Set or update the activity using the given `details` and `state` strings.
    SetActivity { details: String, state: String },
    /// Clear the currently displayed activity.
    ClearActivity,
    /// Close the socket and stop the background task.
    Close,
}
53
/// Subset of the JSON body of an opcode-1 frame received from Discord.
///
/// Every field is optional, so a frame that omits any of them still deserializes.
#[derive(Debug, Deserialize)]
struct RpcFrame {
    /// Command name; `"DISPATCH"` together with `evt == "READY"` marks a completed handshake.
    cmd: Option<String>,
    /// Event name; this file checks for `"READY"` and `"ERROR"`.
    evt: Option<String>,
    /// Event payload; only printed to stderr when `evt` is `"ERROR"`.
    data: Option<serde_json::Value>,
}
60
61/*
62 *
63 * Async implementation
64 *
65 */
66
/// Primary struct for you to set and update Discord Rich Presences with.
#[derive(Debug)]
pub struct DiscordIPC {
    /// Sending half of the command channel; replaced with a fresh channel on every `run()`.
    tx: Sender<IPCCommand>,
    /// Discord client ID sent in the handshake.
    client_id: String,
    /// Set to `true` by `run()` and to `false` by `close()`; shared with the background task.
    running: Arc<AtomicBool>,
    /// Handle of the task spawned by `run()`; `None` before `run()` or once `wait()`/`close()` took it.
    handle: Option<JoinHandle<Result<()>>>,
}
75
76impl DiscordIPC {
77    pub fn new(client_id: &str) -> Self {
78        let (tx, _rx) = mpsc::channel(32);
79
80        Self {
81            tx,
82            client_id: client_id.to_string(),
83            running: Arc::new(AtomicBool::new(false)),
84            handle: None,
85        }
86    }
87
88    /// The Discord client ID that has been used to initialize this IPC client instance.
89    pub fn client_id(&self) -> String {
90        self.client_id.clone()
91    }
92
93    /// Run the client.
94    /// Returns a `JoinHandle<anyhow::Result<()>>` for management.
95    /// NOTE: Must be called before any .set_activity() calls.
96    pub async fn run(&mut self) -> Result<()> {
97        if self.running.swap(true, Ordering::SeqCst) {
98            bail!(
99                "Cannot run multiple instances of .run() for DiscordIPC, or when a session is still closing."
100            )
101        }
102
103        let (tx, mut rx) = mpsc::channel::<IPCCommand>(32);
104        self.tx = tx;
105        let client_id = self.client_id.clone();
106
107        let running = self.running.clone();
108
109        let handle = tokio::spawn(async move {
110            let mut backoff = 1;
111            let mut last_activity: Option<(String, String)> = None;
112            let timestamp = get_current_timestamp()?;
113
114            while running.load(Ordering::SeqCst) {
115                // initial connect
116                let mut socket = match DiscordIPCSocket::new().await {
117                    Ok(s) => s,
118                    Err(_) => {
119                        sleep(Duration::from_secs(backoff)).await;
120                        continue;
121                    }
122                };
123
124                // initial handshake
125                let handshake = json!({ "v": 1, "client_id": client_id }).to_string();
126                let packed = pack(0, handshake.len() as u32);
127
128                if socket.write(&packed).await.is_err()
129                    || socket.write(handshake.as_bytes()).await.is_err()
130                {
131                    sleep(Duration::from_secs(backoff)).await;
132                    continue;
133                }
134
135                // wait for ready
136                loop {
137                    let frame = match socket.read_frame().await {
138                        Ok(f) => f,
139                        Err(_) => break,
140                    };
141
142                    if frame.opcode != 1 {
143                        continue;
144                    }
145
146                    if let Ok(json) = serde_json::from_slice::<RpcFrame>(&frame.body) {
147                        if json.cmd.as_deref() == Some("DISPATCH")
148                            && json.evt.as_deref() == Some("READY")
149                        {
150                            break;
151                        }
152                        if json.evt.as_deref() == Some("ERROR") {
153                            eprintln!("Discord RPC error: {:?}", json.data);
154                        }
155                    }
156                }
157
158                // reset activity if previous instance failed and this instance is basically reconnecting
159                if let Some((details, state)) = &last_activity {
160                    let _ = send_activity(&mut socket, details, state, timestamp).await;
161                }
162
163                backoff = 1;
164
165                loop {
166                    tokio::select! {
167                        Some(cmd) = rx.recv() => {
168                            match cmd {
169                                IPCCommand::SetActivity { details, state } => {
170                                    last_activity = Some((details.clone(), state.clone()));
171
172                                    if send_activity(&mut socket, &details, &state, timestamp).await.is_err() {
173                                        break;
174                                    }
175                                },
176                                IPCCommand::ClearActivity => {
177                                    if clear_activity(&mut socket).await.is_err() {
178                                        break;
179                                    }
180                                },
181                                IPCCommand::Close => {
182                                    socket.close().await?;
183                                    running.store(false, Ordering::SeqCst);
184                                    return Ok(());
185                                }
186                            }
187                        }
188
189                        frame = socket.read_frame() => {
190                            match frame {
191                                Ok(frame) => match frame.opcode {
192                                    1 => {
193                                        if let Ok(json) = serde_json::from_slice::<RpcFrame>(&frame.body) {
194                                            if json.evt.as_deref() == Some("ERROR") {
195                                                eprintln!("Discord RPC error: {:?}", json.data);
196                                            }
197                                        }
198                                    }
199                                    2 => break,
200                                    3 => {
201                                        let packed = pack(3, frame.body.len() as u32);
202                                        socket.write(&packed).await?;
203                                        socket.write(&frame.body).await?;
204                                    }
205                                    _ => {}
206                                },
207                                Err(_) => break,
208                            }
209                        }
210                    }
211                }
212
213                sleep(Duration::from_secs(backoff)).await;
214                backoff = (backoff * 2).min(4);
215            }
216
217            Ok(())
218        });
219
220        self.handle = Some(handle);
221        Ok(())
222    }
223
224    /// Waits for the IPC task to finish.
225    pub async fn wait(&mut self) -> Result<()> {
226        if let Some(handle) = self.handle.take() {
227            handle.await??;
228        }
229
230        Ok(())
231    }
232
233    /// Sets/updates the Discord Rich presence activity.
234    /// NOTE: .run() must be executed prior to calling this.
235    pub async fn set_activity(&self, details: &str, state: &str) -> Result<()> {
236        if !self.running.load(Ordering::SeqCst) {
237            bail!("Call .run() before .set_activity() execution.");
238        }
239
240        self.tx
241            .send(IPCCommand::SetActivity {
242                details: details.to_string(),
243                state: state.to_string(),
244            })
245            .await?;
246        Ok(())
247    }
248
249    /// Clears a previously set Discord Rich Presence activity.
250    pub async fn clear_activity(&self) -> Result<()> {
251        if !self.running.load(Ordering::SeqCst) {
252            return Ok(());
253        }
254
255        self.tx.send(IPCCommand::ClearActivity).await?;
256        Ok(())
257    }
258
259    /// Safe limit for closing connection.
260    const CLOSE_COOLDOWN_MILLIS: u64 = 200;
261
262    /// Closes the current session of Rich Presence activity.
263    pub async fn close(&mut self) -> Result<()> {
264        if self.running.swap(false, Ordering::SeqCst) {
265            let _ = self.tx.send(IPCCommand::Close).await;
266            if let Some(handle) = self.handle.take() {
267                if let Err(e) = handle.await {
268                    eprintln!("DiscordIPC background task failed on close: {e:?}");
269                }
270            }
271        }
272
273        tokio::time::sleep(Duration::from_millis(Self::CLOSE_COOLDOWN_MILLIS)).await;
274        Ok(())
275    }
276}
277
278async fn send_activity(
279    socket: &mut DiscordIPCSocket,
280    details: &str,
281    state: &str,
282    timestamp: u64,
283) -> Result<()> {
284    let json = json!({
285        "cmd": "SET_ACTIVITY",
286        "args": {
287            "pid": std::process::id(),
288            "activity": {
289                "details": details,
290                "state": state,
291                "timestamps": { "start": timestamp }
292            }
293        },
294        "nonce": Uuid::new_v4().to_string()
295    })
296    .to_string();
297
298    let packed = pack(1, json.len() as u32);
299    socket.write(&packed).await?;
300    socket.write(json.as_bytes()).await?;
301    Ok(())
302}
303
304async fn clear_activity(socket: &mut DiscordIPCSocket) -> Result<()> {
305    let json = json!({
306        "cmd": "SET_ACTIVITY",
307        "args": {
308            "pid": std::process::id(),
309            "activity": null
310        },
311        "nonce": Uuid::new_v4().to_string()
312    })
313    .to_string();
314
315    let packed = pack(1, json.len() as u32);
316    socket.write(&packed).await?;
317    socket.write(json.as_bytes()).await?;
318    Ok(())
319}
320
321/*
322 *
323 * Blocking implementation
324 *
325 */
326
/// Blocking wrapper around [`DiscordIPC`] that drives the async client on its
/// own Tokio runtime.
#[derive(Debug)]
pub struct DiscordIPCSync {
    /// The async client every method delegates to.
    inner: DiscordIPC,
    /// Runtime used to `block_on` the async client's futures.
    rt: Runtime,
}
332
333impl DiscordIPCSync {
334    pub fn new(client_id: &str) -> Result<Self> {
335        let rt = Builder::new_multi_thread().enable_all().build()?;
336        let inner = DiscordIPC::new(client_id);
337
338        Ok(Self { inner, rt })
339    }
340
341    pub fn client_id(&self) -> String {
342        self.inner.client_id()
343    }
344
345    pub fn run(&mut self) -> Result<()> {
346        self.rt.block_on(self.inner.run())
347    }
348
349    pub fn wait(&mut self) -> Result<()> {
350        self.rt.block_on(self.inner.wait())
351    }
352
353    pub fn set_activity(&self, details: &str, state: &str) -> Result<()> {
354        self.rt.block_on(self.inner.set_activity(details, state))
355    }
356
357    pub fn close(&mut self) -> Result<()> {
358        self.rt.block_on(self.inner.close())
359    }
360}
361
362impl Drop for DiscordIPCSync {
363    fn drop(&mut self) {
364        let _ = self.rt.block_on(self.inner.close());
365    }
366}