Skip to main content

filthy_rich/
ipc.rs

1// SPDX-License-Identifier: MIT
2
3use std::{
4    sync::{
5        Arc,
6        atomic::{AtomicBool, Ordering},
7    },
8    time::{Duration, SystemTime, UNIX_EPOCH},
9};
10
11use anyhow::{Result, bail};
12use tokio::{
13    runtime::{Builder, Runtime},
14    sync::mpsc::{self, Sender},
15    task::JoinHandle,
16    time::sleep,
17};
18use uuid::Uuid;
19
20use crate::socket::DiscordIPCSocket;
21
/// Messages delivered over the command channel to the background IPC task.
#[derive(Debug)]
enum IPCCommand {
    /// Request to publish a new rich-presence activity.
    SetActivity {
        /// Text placed in the activity's `details` field.
        details: String,
        /// Text placed in the activity's `state` field.
        state: String,
    },
}
27
28/// Async Discord IPC client.
29#[derive(Debug, Clone)]
30pub struct DiscordIPC {
31    tx: Sender<IPCCommand>,
32    client_id: String,
33    start_timestamp: u64,
34    running: Arc<AtomicBool>,
35}
36
/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// # Errors
///
/// Fails when the system clock reports a time earlier than the Unix epoch.
fn get_current_timestamp() -> Result<u64> {
    let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH)?;
    Ok(since_epoch.as_secs())
}
41
/// Builds the 8-byte frame header: `opcode` followed by `data_len`, each
/// encoded as a little-endian `u32`.
///
/// The payload itself is not included; callers write it after the header.
fn pack(opcode: u32, data_len: u32) -> Result<Vec<u8>> {
    let header = [opcode.to_le_bytes(), data_len.to_le_bytes()].concat();
    Ok(header)
}
51
52impl DiscordIPC {
53    /// Create a new IPC instance (does NOT start connection).
54    /// To start a connection and run the client, use `.run()`.
55    pub async fn new(client_id: &str) -> Result<Self> {
56        let (tx, _rx) = mpsc::channel(32);
57
58        Ok(Self {
59            tx,
60            client_id: client_id.to_string(),
61            start_timestamp: get_current_timestamp()?,
62            running: Arc::new(AtomicBool::new(false)),
63        })
64    }
65
66    /// Connect, handshake, wait for READY and  start the IPC client.
67    pub async fn run(&mut self) -> Result<JoinHandle<Result<()>>> {
68        if self.running.swap(true, Ordering::SeqCst) {
69            bail!("Cannot run mulitple instances of .run() for DiscordIPC.")
70        }
71
72        let (tx, mut rx) = mpsc::channel::<IPCCommand>(32);
73        self.tx = tx;
74
75        let client_id = self.client_id.clone();
76        let timestamp = self.start_timestamp;
77
78        let handle = tokio::spawn(async move {
79            let mut backoff = 1;
80            let mut last_activity: Option<(String, String)> = None;
81
82            loop {
83                // initial connect
84                let mut socket = match DiscordIPCSocket::new().await {
85                    Ok(s) => s,
86                    Err(_) => {
87                        sleep(Duration::from_secs(backoff)).await;
88                        continue;
89                    }
90                };
91
92                // handshake
93                let handshake = format!(r#"{{"v":1,"client_id":"{}"}}"#, client_id);
94
95                let packed = pack(0, handshake.len() as u32)?;
96                if socket.write(&packed).await.is_err()
97                    || socket.write(handshake.as_bytes()).await.is_err()
98                {
99                    sleep(Duration::from_secs(backoff)).await;
100                    continue;
101                }
102
103                // wait for ready
104                loop {
105                    let frame = match socket.read_frame().await {
106                        Ok(f) => f,
107                        Err(_) => break,
108                    };
109
110                    if frame.opcode == 1 && frame.body.windows(5).any(|w| w == b"READY") {
111                        break;
112                    }
113                }
114
115                // replay activity after socket
116                if let Some((details, state)) = &last_activity {
117                    let _ = send_activity(&mut socket, details, state, timestamp).await;
118                }
119
120                backoff = 1;
121
122                // main loop
123                loop {
124                    tokio::select! {
125                        Some(cmd) = rx.recv() => {
126                            match cmd {
127                                IPCCommand::SetActivity { details, state } => {
128                                    last_activity = Some((details.clone(), state.clone()));
129
130                                    if send_activity(&mut socket, &details, &state, timestamp).await.is_err() {
131                                        break;
132                                    }
133                                }
134                            }
135                        }
136
137                        frame = socket.read_frame() => {
138                            match frame {
139                                Ok(frame) => {
140                                    match frame.opcode {
141                                        // PING
142                                        3 => {
143                                            let packed = pack(frame.opcode, frame.body.len() as u32)?;
144                                            socket.write(&packed).await?;
145                                            socket.write(&frame.body).await?;
146                                        }
147                                        // CLOSE
148                                        2 => break,
149                                        _ => {}
150                                    }
151                                }
152                                Err(_) => break,
153                            }
154                        }
155                    }
156                }
157
158                sleep(Duration::from_secs(backoff)).await;
159                backoff = (backoff * 2).min(4);
160            }
161        });
162
163        Ok(handle)
164    }
165
166    /// Sets the Discord Rich presence activity.
167    pub async fn set_activity(&self, details: &str, state: &str) -> Result<()> {
168        self.tx
169            .send(IPCCommand::SetActivity {
170                details: details.to_string(),
171                state: state.to_string(),
172            })
173            .await?;
174
175        Ok(())
176    }
177}
178
179/// Helper to send activity JSON.
180async fn send_activity(
181    socket: &mut DiscordIPCSocket,
182    details: &str,
183    state: &str,
184    timestamp: u64,
185) -> Result<()> {
186    let pid = std::process::id();
187    let uuid = Uuid::new_v4();
188
189    let json = format!(
190        r#"{{
191    "cmd":"SET_ACTIVITY",
192    "args": {{
193        "pid": {},
194        "activity": {{
195            "details":"{}",
196            "state":"{}",
197            "timestamps": {{
198                "start": {}
199            }}
200        }}
201    }},
202    "nonce":"{}"
203}}"#,
204        pid, details, state, timestamp, uuid
205    );
206
207    let packed = pack(1, json.len() as u32)?;
208    socket.write(&packed).await?;
209    socket.write(json.as_bytes()).await?;
210
211    Ok(())
212}
213
214/// Blocking wrapper around DiscordIPC.
215#[derive(Debug)]
216pub struct DiscordIPCSync {
217    inner: DiscordIPC,
218    rt: Runtime,
219    ipc_task: Option<JoinHandle<Result<()>>>,
220}
221
222impl DiscordIPCSync {
223    pub fn new(client_id: &str) -> Result<Self> {
224        let rt = Builder::new_multi_thread().enable_all().build()?;
225        let inner = rt.block_on(DiscordIPC::new(client_id))?;
226
227        Ok(Self {
228            inner,
229            rt,
230            ipc_task: None,
231        })
232    }
233
234    pub fn run(&mut self) -> Result<()> {
235        if self.ipc_task.is_some() {
236            bail!("run() called multiple times");
237        }
238
239        let handle = self.rt.block_on(self.inner.run())?;
240        self.ipc_task = Some(handle);
241        Ok(())
242    }
243
244    pub fn wait(&mut self) -> Result<()> {
245        if let Some(handle) = self.ipc_task.take() {
246            self.rt.block_on(handle)??;
247        }
248        Ok(())
249    }
250
251    pub fn set_activity(&self, details: &str, state: &str) -> Result<()> {
252        self.rt.block_on(self.inner.set_activity(details, state))
253    }
254}