Skip to main content

filthy_rich/
ipc.rs

1// SPDX-License-Identifier: MIT
2
3use std::time::{Duration, SystemTime, UNIX_EPOCH};
4
5use anyhow::{Result, bail};
6use serde::Deserialize;
7use serde_json::json;
8use tokio::{
9    runtime::{Builder, Runtime},
10    sync::mpsc::{self, Sender},
11    task::JoinHandle,
12    time::sleep,
13};
14use uuid::Uuid;
15
16use crate::socket::DiscordIPCSocket;
17
18/*
19 *
20 * Helper funcs
21 *
22 */
23
24fn get_current_timestamp() -> Result<u64> {
25    Ok(SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs())
26}
27
/// Builds the 8-byte frame header that precedes every payload.
///
/// The header is `opcode` followed by `data_len`, each encoded as a
/// little-endian `u32`.
fn pack(opcode: u32, data_len: u32) -> Vec<u8> {
    [opcode.to_le_bytes(), data_len.to_le_bytes()].concat()
}
34
35/*
36 *
37 * General functionality
38 *
39 */
40
/// Commands carried over the internal channel to the connection task.
#[derive(Debug)]
enum IPCCommand {
    /// Publish (or replace) the activity using the given `details` and `state` text.
    SetActivity {
        details: String,
        state: String,
    },
}
45
46#[derive(Debug, Deserialize)]
47struct RpcFrame {
48    cmd: Option<String>,
49    evt: Option<String>,
50    data: Option<serde_json::Value>,
51}
52
53/// Primary struct for you to set and update Discord Rich Presences with.
54#[derive(Debug)]
55pub struct DiscordIPC {
56    tx: Sender<IPCCommand>,
57    client_id: String,
58    start_timestamp: u64,
59    handle: Option<JoinHandle<Result<()>>>,
60}
61
62impl DiscordIPC {
63    pub async fn new(client_id: &str) -> Result<Self> {
64        let (tx, _rx) = mpsc::channel(32);
65
66        Ok(Self {
67            tx,
68            client_id: client_id.to_string(),
69            start_timestamp: get_current_timestamp()?,
70            handle: None,
71        })
72    }
73
74    /// The Discord client ID that has been used to initialize this IPC client instance.
75    pub fn client_id(&self) -> String {
76        self.client_id.clone()
77    }
78
79    /// Run the client.
80    /// NOTE: Must be called before any .set_activity() calls.
81    pub async fn run(&mut self) -> Result<()> {
82        if self.handle.is_some() {
83            bail!("Cannot run multiple instances of .run() for DiscordIPC.")
84        }
85
86        let (tx, mut rx) = mpsc::channel::<IPCCommand>(32);
87        self.tx = tx;
88        let client_id = self.client_id.clone();
89        let timestamp = self.start_timestamp;
90
91        let handle = tokio::spawn(async move {
92            let mut backoff = 1;
93            let mut last_activity: Option<(String, String)> = None;
94
95            loop {
96                // initial connect
97                let mut socket = match DiscordIPCSocket::new().await {
98                    Ok(s) => s,
99                    Err(_) => {
100                        sleep(Duration::from_secs(backoff)).await;
101                        continue;
102                    }
103                };
104
105                // initial handshake
106                let handshake = json!({ "v": 1, "client_id": client_id }).to_string();
107                let packed = pack(0, handshake.len() as u32);
108
109                if socket.write(&packed).await.is_err()
110                    || socket.write(handshake.as_bytes()).await.is_err()
111                {
112                    sleep(Duration::from_secs(backoff)).await;
113                    continue;
114                }
115
116                // wait for ready
117                loop {
118                    let frame = match socket.read_frame().await {
119                        Ok(f) => f,
120                        Err(_) => break,
121                    };
122
123                    if frame.opcode != 1 {
124                        continue;
125                    }
126
127                    if let Ok(json) = serde_json::from_slice::<RpcFrame>(&frame.body) {
128                        if json.cmd.as_deref() == Some("DISPATCH")
129                            && json.evt.as_deref() == Some("READY")
130                        {
131                            break;
132                        }
133                        if json.evt.as_deref() == Some("ERROR") {
134                            eprintln!("Discord RPC error: {:?}", json.data);
135                        }
136                    }
137                }
138
139                if let Some((details, state)) = &last_activity {
140                    let _ = send_activity(&mut socket, details, state, timestamp).await;
141                }
142
143                backoff = 1;
144
145                loop {
146                    tokio::select! {
147                        Some(cmd) = rx.recv() => {
148                            match cmd {
149                                IPCCommand::SetActivity { details, state } => {
150                                    last_activity = Some((details.clone(), state.clone()));
151
152                                    if send_activity(&mut socket, &details, &state, timestamp).await.is_err() {
153                                        break;
154                                    }
155                                }
156                            }
157                        }
158
159                        frame = socket.read_frame() => {
160                            match frame {
161                                Ok(frame) => match frame.opcode {
162                                    1 => {
163                                        if let Ok(json) = serde_json::from_slice::<RpcFrame>(&frame.body) {
164                                            if json.evt.as_deref() == Some("ERROR") {
165                                                eprintln!("Discord RPC error: {:?}", json.data);
166                                            }
167                                        }
168                                    }
169                                    2 => break,
170                                    3 => {
171                                        let packed = pack(3, frame.body.len() as u32);
172                                        socket.write(&packed).await?;
173                                        socket.write(&frame.body).await?;
174                                    }
175                                    _ => {}
176                                },
177                                Err(_) => break,
178                            }
179                        }
180                    }
181                }
182
183                sleep(Duration::from_secs(backoff)).await;
184                backoff = (backoff * 2).min(4);
185            }
186        });
187
188        self.handle = Some(handle);
189        Ok(())
190    }
191
192    /// Waits for existing IPC loop handle to finish; helps run it forever.
193    pub async fn wait(&mut self) -> Result<()> {
194        if let Some(handle) = self.handle.take() {
195            handle.await??;
196        }
197        Ok(())
198    }
199
200    /// Sets/updates the Discord Rich presence activity.
201    /// NOTE: .run() must be executed prior to calling this.
202    pub async fn set_activity(&self, details: &str, state: &str) -> Result<()> {
203        self.tx
204            .send(IPCCommand::SetActivity {
205                details: details.to_string(),
206                state: state.to_string(),
207            })
208            .await?;
209        Ok(())
210    }
211}
212
213async fn send_activity(
214    socket: &mut DiscordIPCSocket,
215    details: &str,
216    state: &str,
217    timestamp: u64,
218) -> Result<()> {
219    let json = json!({
220        "cmd": "SET_ACTIVITY",
221        "args": {
222            "pid": std::process::id(),
223            "activity": {
224                "details": details,
225                "state": state,
226                "timestamps": { "start": timestamp }
227            }
228        },
229        "nonce": Uuid::new_v4().to_string()
230    })
231    .to_string();
232
233    let packed = pack(1, json.len() as u32);
234    socket.write(&packed).await?;
235    socket.write(json.as_bytes()).await?;
236    Ok(())
237}
238
239/// Blocking wrapper
240#[derive(Debug)]
241pub struct DiscordIPCSync {
242    inner: DiscordIPC,
243    rt: Runtime,
244}
245
246impl DiscordIPCSync {
247    pub fn new(client_id: &str) -> Result<Self> {
248        let rt = Builder::new_multi_thread().enable_all().build()?;
249        let inner = rt.block_on(DiscordIPC::new(client_id))?;
250        Ok(Self { inner, rt })
251    }
252
253    pub fn client_id(&self) -> String {
254        self.inner.client_id()
255    }
256
257    pub fn run(&mut self) -> Result<()> {
258        self.rt.block_on(self.inner.run())?;
259        Ok(())
260    }
261
262    pub fn wait(&mut self) -> Result<()> {
263        self.rt.block_on(async { self.inner.wait().await })?;
264        Ok(())
265    }
266
267    pub fn set_activity(&self, details: &str, state: &str) -> Result<()> {
268        self.rt.block_on(self.inner.set_activity(details, state))
269    }
270}