Skip to main content

filthy_rich/
ipc.rs

1// SPDX-License-Identifier: MIT
2
3use std::{
4    sync::{
5        Arc,
6        atomic::{AtomicBool, Ordering},
7    },
8    time::{Duration, SystemTime, UNIX_EPOCH},
9};
10
11use anyhow::{Result, bail};
12use serde::Deserialize;
13use serde_json::json;
14use tokio::{
15    runtime::{Builder, Runtime},
16    sync::{
17        mpsc::{self, Sender},
18        oneshot,
19    },
20    task::JoinHandle,
21    time::sleep,
22};
23use uuid::Uuid;
24
25use crate::socket::DiscordIPCSocket;
26
27/*
28 *
29 * Helper funcs
30 *
31 */
32
33fn get_current_timestamp() -> Result<u64> {
34    Ok(SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs())
35}
36
37fn pack(opcode: u32, data_len: u32) -> Vec<u8> {
38    let mut bytes = Vec::with_capacity(8);
39    bytes.extend_from_slice(&opcode.to_le_bytes());
40    bytes.extend_from_slice(&data_len.to_le_bytes());
41    bytes
42}
43
44/*
45 *
46 * Frame/cmd structs
47 *
48 */
49
50#[derive(Debug)]
51enum IPCCommand {
52    SetActivity { details: String, state: String },
53    ClearActivity,
54    Close,
55}
56
57#[derive(Debug, Deserialize)]
58struct RpcFrame {
59    cmd: Option<String>,
60    evt: Option<String>,
61    data: Option<serde_json::Value>,
62}
63
64/*
65 *
66 * Async implementation
67 *
68 */
69
70/// Primary struct for you to set and update Discord Rich Presences with.
71#[derive(Debug)]
72pub struct DiscordIPC {
73    tx: Sender<IPCCommand>,
74    client_id: String,
75    running: Arc<AtomicBool>,
76    handle: Option<JoinHandle<Result<()>>>,
77}
78
79impl DiscordIPC {
80    pub fn new(client_id: &str) -> Self {
81        let (tx, _rx) = mpsc::channel(32);
82
83        Self {
84            tx,
85            client_id: client_id.to_string(),
86            running: Arc::new(AtomicBool::new(false)),
87            handle: None,
88        }
89    }
90
91    /// The Discord client ID that has been used to initialize this IPC client instance.
92    pub fn client_id(&self) -> String {
93        self.client_id.clone()
94    }
95
96    /// Run the client.
97    /// Returns a `JoinHandle<anyhow::Result<()>>` for management.
98    /// NOTE: Must be called before any .set_activity() calls.
99    pub async fn run(&mut self) -> Result<()> {
100        if self.running.swap(true, Ordering::SeqCst) {
101            bail!(
102                "Cannot run multiple instances of .run() for DiscordIPC, or when a session is still closing."
103            )
104        }
105
106        let (tx, mut rx) = mpsc::channel::<IPCCommand>(32);
107        self.tx = tx;
108        let client_id = self.client_id.clone();
109        let running = self.running.clone();
110
111        // oneshot channel to signal when READY is received the first time
112        let (ready_tx, ready_rx) = oneshot::channel::<()>();
113
114        let handle = tokio::spawn(async move {
115            let mut backoff = 1;
116            let mut last_activity: Option<(String, String)> = None;
117            let mut ready_tx = Some(ready_tx);
118
119            'outer: while running.load(Ordering::SeqCst) {
120                // initial connect
121                let mut socket = match DiscordIPCSocket::new().await {
122                    Ok(s) => s,
123                    Err(_) => {
124                        let _ = ready_tx.take(); // signal failure to run()
125                        sleep(Duration::from_secs(backoff)).await;
126                        continue;
127                    }
128                };
129
130                // initial handshake
131                let handshake = json!({ "v": 1, "client_id": client_id }).to_string();
132                let packed = pack(0, handshake.len() as u32);
133
134                if socket.write(&packed).await.is_err()
135                    || socket.write(handshake.as_bytes()).await.is_err()
136                {
137                    let _ = ready_tx.take(); // drop sender so ready_rx fails
138                    sleep(Duration::from_secs(backoff)).await;
139                    continue;
140                }
141
142                // wait for READY (blocks until first READY or socket fails)
143                loop {
144                    let frame = match socket.read_frame().await {
145                        Ok(f) => f,
146                        Err(_) => continue 'outer,
147                    };
148
149                    if frame.opcode != 1 {
150                        continue;
151                    }
152
153                    if let Ok(json) = serde_json::from_slice::<RpcFrame>(&frame.body) {
154                        if json.cmd.as_deref() == Some("DISPATCH")
155                            && json.evt.as_deref() == Some("READY")
156                        {
157                            if let Some(tx) = ready_tx.take() {
158                                let _ = tx.send(()); // moves the sender here exactly once
159                            }
160                            break;
161                        }
162                        if json.evt.as_deref() == Some("ERROR") {
163                            eprintln!("Discord RPC error: {:?}", json.data);
164                        }
165                    }
166                }
167
168                let timestamp = get_current_timestamp()?;
169
170                // reset activity if previous instance failed and this instance is basically reconnecting
171                if let Some((details, state)) = &last_activity {
172                    let _ = send_activity(&mut socket, details, state, timestamp).await;
173                }
174
175                backoff = 1;
176
177                loop {
178                    tokio::select! {
179                        Some(cmd) = rx.recv() => {
180                            match cmd {
181                                IPCCommand::SetActivity { details, state } => {
182                                    last_activity = Some((details.clone(), state.clone()));
183                                    if send_activity(&mut socket, &details, &state, timestamp).await.is_err() {
184                                        break;
185                                    }
186                                },
187                                IPCCommand::ClearActivity => {
188                                    last_activity = None;
189                                    if clear_activity(&mut socket).await.is_err() { break; }
190                                },
191                                IPCCommand::Close => {
192                                    let json = b"{}";
193                                    let packed = pack(2, json.len() as u32);
194                                    let _ = socket.write(&packed).await;
195                                    let _ = socket.close().await;
196                                    running.store(false, Ordering::SeqCst);
197                                    break 'outer;
198                                }
199                            }
200                        }
201
202                        frame = socket.read_frame() => {
203                            match frame {
204                                Ok(frame) => match frame.opcode {
205                                    1 => {
206                                        if let Ok(json) = serde_json::from_slice::<RpcFrame>(&frame.body) {
207                                            if json.evt.as_deref() == Some("ERROR") {
208                                                eprintln!("Discord RPC error: {:?}", json.data);
209                                            }
210                                        }
211                                    }
212                                    2 => break,
213                                    3 => {
214                                        let packed = pack(3, frame.body.len() as u32);
215                                        if socket.write(&packed).await.is_err() { break; }
216                                        if socket.write(&frame.body).await.is_err() { break; }
217                                    }
218                                    _ => {}
219                                },
220                                Err(_) => break,
221                            }
222                        }
223                    }
224                }
225
226                sleep(Duration::from_secs(backoff)).await;
227                backoff = (backoff * 2).min(4);
228            }
229
230            Ok(())
231        });
232
233        self.handle = Some(handle);
234
235        // block run() until first READY
236        match ready_rx.await {
237            Ok(()) => Ok(()),
238            Err(_) => bail!("Background task exited before READY."),
239        }
240    }
241
242    /// Waits for the IPC task to finish.
243    pub async fn wait(&mut self) -> Result<()> {
244        if let Some(handle) = self.handle.take() {
245            handle.await??;
246        }
247
248        Ok(())
249    }
250
251    /// Sets/updates the Discord Rich presence activity.
252    /// NOTE: .run() must be executed prior to calling this.
253    pub async fn set_activity(&self, details: &str, state: &str) -> Result<()> {
254        if !self.running.load(Ordering::SeqCst) {
255            bail!("Call .run() before .set_activity() execution.");
256        }
257
258        self.tx
259            .send(IPCCommand::SetActivity {
260                details: details.to_string(),
261                state: state.to_string(),
262            })
263            .await?;
264        Ok(())
265    }
266
267    /// Clears a previously set Discord Rich Presence activity.
268    pub async fn clear_activity(&self) -> Result<()> {
269        if !self.running.load(Ordering::SeqCst) {
270            return Ok(());
271        }
272
273        self.tx.send(IPCCommand::ClearActivity).await?;
274        Ok(())
275    }
276
277    /// Safe limit for closing connection.
278    const CLOSE_COOLDOWN_MILLIS: u64 = 200;
279
280    /// Closes the current session of Rich Presence activity.
281    pub async fn close(&mut self) -> Result<()> {
282        if self.running.swap(false, Ordering::SeqCst) {
283            let _ = self.tx.send(IPCCommand::Close).await;
284            if let Some(handle) = self.handle.take() {
285                if let Err(e) = handle.await {
286                    eprintln!("DiscordIPC background task failed on close: {e:?}");
287                }
288            }
289        }
290
291        tokio::time::sleep(Duration::from_millis(Self::CLOSE_COOLDOWN_MILLIS)).await;
292        Ok(())
293    }
294}
295
296async fn send_activity(
297    socket: &mut DiscordIPCSocket,
298    details: &str,
299    state: &str,
300    timestamp: u64,
301) -> Result<()> {
302    let json = json!({
303        "cmd": "SET_ACTIVITY",
304        "args": {
305            "pid": std::process::id(),
306            "activity": {
307                "details": details,
308                "state": state,
309                "timestamps": { "start": timestamp }
310            }
311        },
312        "nonce": Uuid::new_v4().to_string()
313    })
314    .to_string();
315
316    let packed = pack(1, json.len() as u32);
317    socket.write(&packed).await?;
318    socket.write(json.as_bytes()).await?;
319    Ok(())
320}
321
322async fn clear_activity(socket: &mut DiscordIPCSocket) -> Result<()> {
323    let json = json!({
324        "cmd": "SET_ACTIVITY",
325        "args": {
326            "pid": std::process::id(),
327            "activity": null
328        },
329        "nonce": Uuid::new_v4().to_string()
330    })
331    .to_string();
332
333    let packed = pack(1, json.len() as u32);
334    socket.write(&packed).await?;
335    socket.write(json.as_bytes()).await?;
336    Ok(())
337}
338
339/*
340 *
341 * Blocking implementation
342 *
343 */
344
345#[derive(Debug)]
346pub struct DiscordIPCSync {
347    inner: DiscordIPC,
348    rt: Runtime,
349}
350
351impl DiscordIPCSync {
352    pub fn new(client_id: &str) -> Result<Self> {
353        let rt = Builder::new_multi_thread().enable_all().build()?;
354        let inner = DiscordIPC::new(client_id);
355
356        Ok(Self { inner, rt })
357    }
358
359    pub fn client_id(&self) -> String {
360        self.inner.client_id()
361    }
362
363    pub fn run(&mut self) -> Result<()> {
364        self.rt.block_on(self.inner.run())
365    }
366
367    pub fn wait(&mut self) -> Result<()> {
368        self.rt.block_on(self.inner.wait())
369    }
370
371    pub fn set_activity(&self, details: &str, state: &str) -> Result<()> {
372        self.rt.block_on(self.inner.set_activity(details, state))
373    }
374
375    pub fn close(&mut self) -> Result<()> {
376        self.rt.block_on(self.inner.close())
377    }
378}
379
380impl Drop for DiscordIPCSync {
381    fn drop(&mut self) {
382        let _ = self.rt.block_on(self.inner.close());
383    }
384}