Skip to main content

filthy_rich/
ipc.rs

1// SPDX-License-Identifier: MIT
2
3use std::{
4    sync::{
5        Arc,
6        atomic::{AtomicBool, Ordering},
7    },
8    time::{Duration, SystemTime, UNIX_EPOCH},
9};
10
11use anyhow::{Result, bail};
12use serde::Deserialize;
13use serde_json::json;
14use tokio::{
15    runtime::{Builder, Runtime},
16    sync::mpsc::{self, Sender},
17    task::JoinHandle,
18    time::sleep,
19};
20use uuid::Uuid;
21
22use crate::socket::DiscordIPCSocket;
23
/*
 *
 * Helper functions
 *
 */
29
30fn get_current_timestamp() -> Result<u64> {
31    Ok(SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs())
32}
33
/// Builds the 8-byte frame header: `opcode` followed by `data_len`, each
/// encoded as a little-endian `u32`.
fn pack(opcode: u32, data_len: u32) -> Vec<u8> {
    [opcode.to_le_bytes(), data_len.to_le_bytes()].concat()
}
40
/*
 *
 * General functionality
 *
 */
46
/// Commands sent from the public handle to the background IPC task.
#[derive(Debug)]
enum IPCCommand {
    /// Publish (or replace) the rich-presence activity with these two text lines.
    SetActivity {
        details: String,
        state: String,
    },
    /// Close the socket and stop the background task.
    Close,
}
52
53#[derive(Debug, Deserialize)]
54struct RpcFrame {
55    cmd: Option<String>,
56    evt: Option<String>,
57    data: Option<serde_json::Value>,
58}
59
60/// Primary struct for you to set and update Discord Rich Presences with.
61#[derive(Debug, Clone)]
62pub struct DiscordIPC {
63    tx: Sender<IPCCommand>,
64    client_id: String,
65    running: Arc<AtomicBool>,
66}
67
68impl DiscordIPC {
69    pub async fn new(client_id: &str) -> Result<Self> {
70        let (tx, _rx) = mpsc::channel(32);
71
72        Ok(Self {
73            tx,
74            client_id: client_id.to_string(),
75            running: Arc::new(AtomicBool::new(false)),
76        })
77    }
78
79    /// The Discord client ID that has been used to initialize this IPC client instance.
80    pub fn client_id(&self) -> String {
81        self.client_id.clone()
82    }
83
84    /// Run the client.
85    /// Returns a `JoinHandle<anyhow::Result<()>>` for management.
86    /// NOTE: Must be called before any .set_activity() calls.
87    pub async fn run(&mut self) -> Result<JoinHandle<Result<()>>> {
88        if self.running.swap(true, Ordering::SeqCst) {
89            bail!(
90                "Cannot run multiple instances of .run() for DiscordIPC, or when a session is still closing."
91            )
92        }
93
94        let (tx, mut rx) = mpsc::channel::<IPCCommand>(32);
95        self.tx = tx;
96        let client_id = self.client_id.clone();
97
98        let running = self.running.clone();
99
100        let handle = tokio::spawn(async move {
101            let mut backoff = 1;
102            let mut last_activity: Option<(String, String)> = None;
103            let timestamp = get_current_timestamp()?;
104
105            while running.load(Ordering::SeqCst) {
106                // initial connect
107                let mut socket = match DiscordIPCSocket::new().await {
108                    Ok(s) => s,
109                    Err(_) => {
110                        sleep(Duration::from_secs(backoff)).await;
111                        continue;
112                    }
113                };
114
115                // initial handshake
116                let handshake = json!({ "v": 1, "client_id": client_id }).to_string();
117                let packed = pack(0, handshake.len() as u32);
118
119                if socket.write(&packed).await.is_err()
120                    || socket.write(handshake.as_bytes()).await.is_err()
121                {
122                    sleep(Duration::from_secs(backoff)).await;
123                    continue;
124                }
125
126                // wait for ready
127                loop {
128                    let frame = match socket.read_frame().await {
129                        Ok(f) => f,
130                        Err(_) => break,
131                    };
132
133                    if frame.opcode != 1 {
134                        continue;
135                    }
136
137                    if let Ok(json) = serde_json::from_slice::<RpcFrame>(&frame.body) {
138                        if json.cmd.as_deref() == Some("DISPATCH")
139                            && json.evt.as_deref() == Some("READY")
140                        {
141                            break;
142                        }
143                        if json.evt.as_deref() == Some("ERROR") {
144                            eprintln!("Discord RPC error: {:?}", json.data);
145                        }
146                    }
147                }
148
149                // reset activity if previous instance failed and this instance is basically reconnecting
150                if let Some((details, state)) = &last_activity {
151                    let _ = send_activity(&mut socket, details, state, timestamp).await;
152                }
153
154                backoff = 1;
155
156                loop {
157                    tokio::select! {
158                        Some(cmd) = rx.recv() => {
159                            match cmd {
160                                IPCCommand::SetActivity { details, state } => {
161                                    last_activity = Some((details.clone(), state.clone()));
162
163                                    if send_activity(&mut socket, &details, &state, timestamp).await.is_err() {
164                                        break;
165                                    }
166                                },
167                                IPCCommand::Close => {
168                                    socket.close().await?;
169                                    running.store(false, Ordering::SeqCst);
170                                    return Ok(());
171                                }
172                            }
173                        }
174
175                        frame = socket.read_frame() => {
176                            match frame {
177                                Ok(frame) => match frame.opcode {
178                                    1 => {
179                                        if let Ok(json) = serde_json::from_slice::<RpcFrame>(&frame.body) {
180                                            if json.evt.as_deref() == Some("ERROR") {
181                                                eprintln!("Discord RPC error: {:?}", json.data);
182                                            }
183                                        }
184                                    }
185                                    2 => break,
186                                    3 => {
187                                        let packed = pack(3, frame.body.len() as u32);
188                                        socket.write(&packed).await?;
189                                        socket.write(&frame.body).await?;
190                                    }
191                                    _ => {}
192                                },
193                                Err(_) => break,
194                            }
195                        }
196                    }
197                }
198
199                sleep(Duration::from_secs(backoff)).await;
200                backoff = (backoff * 2).min(4);
201            }
202
203            Ok(())
204        });
205
206        Ok(handle)
207    }
208
209    /// Sets/updates the Discord Rich presence activity.
210    /// NOTE: .run() must be executed prior to calling this.
211    pub async fn set_activity(&self, details: &str, state: &str) -> Result<()> {
212        if !self.running.load(Ordering::SeqCst) {
213            bail!("Call .run() before .set_activity() execution.");
214        }
215
216        self.tx
217            .send(IPCCommand::SetActivity {
218                details: details.to_string(),
219                state: state.to_string(),
220            })
221            .await?;
222        Ok(())
223    }
224
225    /// Closes the current session of Rich Presence activity.
226    pub async fn close(&self) -> Result<()> {
227        self.tx.send(IPCCommand::Close).await?;
228        Ok(())
229    }
230}
231
232async fn send_activity(
233    socket: &mut DiscordIPCSocket,
234    details: &str,
235    state: &str,
236    timestamp: u64,
237) -> Result<()> {
238    let json = json!({
239        "cmd": "SET_ACTIVITY",
240        "args": {
241            "pid": std::process::id(),
242            "activity": {
243                "details": details,
244                "state": state,
245                "timestamps": { "start": timestamp }
246            }
247        },
248        "nonce": Uuid::new_v4().to_string()
249    })
250    .to_string();
251
252    let packed = pack(1, json.len() as u32);
253    socket.write(&packed).await?;
254    socket.write(json.as_bytes()).await?;
255    Ok(())
256}
257
258/// Blocking wrapper
259#[derive(Debug)]
260pub struct DiscordIPCSync {
261    inner: DiscordIPC,
262    rt: Runtime,
263    ipc_task: Option<JoinHandle<Result<()>>>,
264}
265
266impl DiscordIPCSync {
267    pub fn new(client_id: &str) -> Result<Self> {
268        let rt = Builder::new_multi_thread().enable_all().build()?;
269        let inner = rt.block_on(DiscordIPC::new(client_id))?;
270        Ok(Self {
271            inner,
272            rt,
273            ipc_task: None,
274        })
275    }
276
277    pub fn client_id(&self) -> String {
278        self.inner.client_id()
279    }
280
281    pub fn run(&mut self) -> Result<()> {
282        let handle = self.rt.block_on(self.inner.run())?;
283        self.ipc_task = Some(handle);
284        Ok(())
285    }
286
287    pub fn wait(&mut self) -> Result<()> {
288        if let Some(handle) = self.ipc_task.take() {
289            self.rt.block_on(handle)??;
290        }
291        Ok(())
292    }
293
294    pub fn set_activity(&self, details: &str, state: &str) -> Result<()> {
295        self.rt.block_on(self.inner.set_activity(details, state))
296    }
297
298    pub fn close(&self) -> Result<()> {
299        self.rt.block_on(self.inner.close())
300    }
301}
302
303impl Drop for DiscordIPCSync {
304    fn drop(&mut self) {
305        if let Some(handle) = self.ipc_task.take() {
306            let _ = self.rt.block_on(self.inner.close());
307            let _ = self.rt.block_on(handle);
308        }
309    }
310}