Skip to main content

filthy_rich/
ipc.rs

1// SPDX-License-Identifier: MIT
2
3use std::{
4    sync::{
5        Arc,
6        atomic::{AtomicBool, Ordering},
7    },
8    time::{Duration, SystemTime, UNIX_EPOCH},
9};
10
11use anyhow::{Context, Result, bail};
12use serde::Deserialize;
13use serde_json::json;
14use tokio::{
15    runtime::{Builder, Runtime},
16    sync::mpsc::{self, Sender},
17    task::JoinHandle,
18    time::sleep,
19};
20use uuid::Uuid;
21
22use crate::socket::DiscordIPCSocket;
23
24/*
25 *
26 * Helper funcs
27 *
28 */
29
30fn get_current_timestamp() -> Result<u64> {
31    Ok(SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs())
32}
33
/// Builds the 8-byte frame header: `opcode` followed by `data_len`, each
/// encoded as a little-endian `u32`.
fn pack(opcode: u32, data_len: u32) -> Vec<u8> {
    [opcode.to_le_bytes(), data_len.to_le_bytes()].concat()
}
40
41/*
42 *
43 * General functionality
44 *
45 */
46
/// Commands passed from the public client API to the background connection
/// task over the internal channel.
#[derive(Debug)]
enum IPCCommand {
    /// Publish (or replace) the rich-presence activity.
    SetActivity {
        /// Text sent as the activity's `details` field.
        details: String,
        /// Text sent as the activity's `state` field.
        state: String,
    },
}
51
52#[derive(Debug, Deserialize)]
53struct RpcFrame {
54    cmd: Option<String>,
55    evt: Option<String>,
56    data: Option<serde_json::Value>,
57}
58
59/// Primary struct for you to set and update Discord Rich Presences with.
60#[derive(Debug, Clone)]
61pub struct DiscordIPC {
62    tx: Sender<IPCCommand>,
63    client_id: String,
64    start_timestamp: u64,
65    running: Arc<AtomicBool>,
66}
67
68impl DiscordIPC {
69    pub async fn new(client_id: &str) -> Result<Self> {
70        let (tx, _rx) = mpsc::channel(32);
71
72        Ok(Self {
73            tx,
74            client_id: client_id.to_string(),
75            start_timestamp: get_current_timestamp()?,
76            running: Arc::new(AtomicBool::new(false)),
77        })
78    }
79
80    /// The Discord client ID that has been used to initialize this IPC client instance.
81    pub fn client_id(&self) -> String {
82        self.client_id.clone()
83    }
84
85    /// Duration since the IPC client was fired up.
86    pub fn duration_since(&self) -> Result<Duration> {
87        let then = UNIX_EPOCH + Duration::from_secs(self.start_timestamp);
88        let now = SystemTime::now();
89
90        let duration_since = now
91            .duration_since(then)
92            .context("Time went backwards for the IPC client.")?;
93
94        Ok(duration_since)
95    }
96
97    /// Run the client.
98    /// NOTE: Must be called before any .set_activity() calls.
99    pub async fn run(&mut self) -> Result<JoinHandle<Result<()>>> {
100        if self.running.swap(true, Ordering::SeqCst) {
101            bail!("Cannot run multiple instances of .run() for DiscordIPC.")
102        }
103
104        let (tx, mut rx) = mpsc::channel::<IPCCommand>(32);
105        self.tx = tx;
106        let client_id = self.client_id.clone();
107        let timestamp = self.start_timestamp;
108
109        let handle = tokio::spawn(async move {
110            let mut backoff = 1;
111            let mut last_activity: Option<(String, String)> = None;
112
113            loop {
114                // initial connect
115                let mut socket = match DiscordIPCSocket::new().await {
116                    Ok(s) => s,
117                    Err(_) => {
118                        sleep(Duration::from_secs(backoff)).await;
119                        continue;
120                    }
121                };
122
123                // initial handshake
124                let handshake = json!({ "v": 1, "client_id": client_id }).to_string();
125                let packed = pack(0, handshake.len() as u32);
126
127                if socket.write(&packed).await.is_err()
128                    || socket.write(handshake.as_bytes()).await.is_err()
129                {
130                    sleep(Duration::from_secs(backoff)).await;
131                    continue;
132                }
133
134                // wait for ready
135                loop {
136                    let frame = match socket.read_frame().await {
137                        Ok(f) => f,
138                        Err(_) => break,
139                    };
140
141                    if frame.opcode != 1 {
142                        continue;
143                    }
144
145                    if let Ok(json) = serde_json::from_slice::<RpcFrame>(&frame.body) {
146                        if json.cmd.as_deref() == Some("DISPATCH")
147                            && json.evt.as_deref() == Some("READY")
148                        {
149                            break;
150                        }
151                        if json.evt.as_deref() == Some("ERROR") {
152                            eprintln!("Discord RPC error: {:?}", json.data);
153                        }
154                    }
155                }
156
157                if let Some((details, state)) = &last_activity {
158                    let _ = send_activity(&mut socket, details, state, timestamp).await;
159                }
160
161                backoff = 1;
162
163                loop {
164                    tokio::select! {
165                        Some(cmd) = rx.recv() => {
166                            match cmd {
167                                IPCCommand::SetActivity { details, state } => {
168                                    last_activity = Some((details.clone(), state.clone()));
169
170                                    if send_activity(&mut socket, &details, &state, timestamp).await.is_err() {
171                                        break;
172                                    }
173                                }
174                            }
175                        }
176
177                        frame = socket.read_frame() => {
178                            match frame {
179                                Ok(frame) => match frame.opcode {
180                                    1 => {
181                                        if let Ok(json) = serde_json::from_slice::<RpcFrame>(&frame.body) {
182                                            if json.evt.as_deref() == Some("ERROR") {
183                                                eprintln!("Discord RPC error: {:?}", json.data);
184                                            }
185                                        }
186                                    }
187                                    2 => break,
188                                    3 => {
189                                        let packed = pack(3, frame.body.len() as u32);
190                                        socket.write(&packed).await?;
191                                        socket.write(&frame.body).await?;
192                                    }
193                                    _ => {}
194                                },
195                                Err(_) => break,
196                            }
197                        }
198                    }
199                }
200
201                sleep(Duration::from_secs(backoff)).await;
202                backoff = (backoff * 2).min(4);
203            }
204        });
205
206        Ok(handle)
207    }
208
209    /// Sets/updates the Discord Rich presence activity.
210    /// NOTE: .run() must be executed prior to calling this.
211    pub async fn set_activity(&self, details: &str, state: &str) -> Result<()> {
212        self.tx
213            .send(IPCCommand::SetActivity {
214                details: details.to_string(),
215                state: state.to_string(),
216            })
217            .await?;
218        Ok(())
219    }
220}
221
222async fn send_activity(
223    socket: &mut DiscordIPCSocket,
224    details: &str,
225    state: &str,
226    timestamp: u64,
227) -> Result<()> {
228    let json = json!({
229        "cmd": "SET_ACTIVITY",
230        "args": {
231            "pid": std::process::id(),
232            "activity": {
233                "details": details,
234                "state": state,
235                "timestamps": { "start": timestamp }
236            }
237        },
238        "nonce": Uuid::new_v4().to_string()
239    })
240    .to_string();
241
242    let packed = pack(1, json.len() as u32);
243    socket.write(&packed).await?;
244    socket.write(json.as_bytes()).await?;
245    Ok(())
246}
247
248/// Blocking wrapper
249#[derive(Debug)]
250pub struct DiscordIPCSync {
251    inner: DiscordIPC,
252    rt: Runtime,
253    ipc_task: Option<JoinHandle<Result<()>>>,
254}
255
256impl DiscordIPCSync {
257    pub fn new(client_id: &str) -> Result<Self> {
258        let rt = Builder::new_multi_thread().enable_all().build()?;
259        let inner = rt.block_on(DiscordIPC::new(client_id))?;
260        Ok(Self {
261            inner,
262            rt,
263            ipc_task: None,
264        })
265    }
266
267    pub fn client_id(&self) -> String {
268        self.inner.client_id()
269    }
270
271    pub fn duration_since(&self) -> Result<Duration> {
272        self.inner.duration_since()
273    }
274
275    pub fn run(&mut self) -> Result<()> {
276        let handle = self.rt.block_on(self.inner.run())?;
277        self.ipc_task = Some(handle);
278        Ok(())
279    }
280
281    pub fn wait(&mut self) -> Result<()> {
282        if let Some(handle) = self.ipc_task.take() {
283            self.rt.block_on(handle)??;
284        }
285        Ok(())
286    }
287
288    pub fn set_activity(&self, details: &str, state: &str) -> Result<()> {
289        self.rt.block_on(self.inner.set_activity(details, state))
290    }
291}