1use std::{
4 sync::{
5 Arc,
6 atomic::{AtomicBool, Ordering},
7 },
8 time::{Duration, SystemTime, UNIX_EPOCH},
9};
10
11use anyhow::{Result, bail};
12use serde::Deserialize;
13use serde_json::json;
14use tokio::{
15 runtime::{Builder, Runtime},
16 sync::{
17 mpsc::{self, Sender},
18 oneshot,
19 },
20 task::JoinHandle,
21 time::sleep,
22};
23use uuid::Uuid;
24
25use crate::socket::DiscordIPCSocket;
26
27fn get_current_timestamp() -> Result<u64> {
34 Ok(SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs())
35}
36
37fn pack(opcode: u32, data_len: u32) -> Vec<u8> {
38 let mut bytes = Vec::with_capacity(8);
39 bytes.extend_from_slice(&opcode.to_le_bytes());
40 bytes.extend_from_slice(&data_len.to_le_bytes());
41 bytes
42}
43
44#[derive(Debug)]
51enum IPCCommand {
52 SetActivity { details: String, state: String },
53 ClearActivity,
54 Close,
55}
56
57#[derive(Debug, Deserialize)]
58struct RpcFrame {
59 cmd: Option<String>,
60 evt: Option<String>,
61 data: Option<serde_json::Value>,
62}
63
64#[derive(Debug)]
72pub struct DiscordIPC {
73 tx: Sender<IPCCommand>,
74 client_id: String,
75 running: Arc<AtomicBool>,
76 handle: Option<JoinHandle<Result<()>>>,
77}
78
79impl DiscordIPC {
80 pub fn new(client_id: &str) -> Self {
82 let (tx, _rx) = mpsc::channel(32);
83
84 Self {
85 tx,
86 client_id: client_id.to_string(),
87 running: Arc::new(AtomicBool::new(false)),
88 handle: None,
89 }
90 }
91
92 pub fn client_id(&self) -> String {
94 self.client_id.clone()
95 }
96
97 pub async fn run(&mut self) -> Result<()> {
100 if self.running.swap(true, Ordering::SeqCst) {
101 bail!(
102 "Cannot run multiple instances of .run() for DiscordIPC, or when a session is still closing."
103 )
104 }
105
106 let (tx, mut rx) = mpsc::channel::<IPCCommand>(32);
107 self.tx = tx;
108 let client_id = self.client_id.clone();
109 let running = self.running.clone();
110
111 let (ready_tx, ready_rx) = oneshot::channel::<()>();
113
114 let handle = tokio::spawn(async move {
115 let mut backoff = 1;
116 let mut last_activity: Option<(String, String)> = None;
117 let mut ready_tx = Some(ready_tx);
118
119 'outer: while running.load(Ordering::SeqCst) {
120 let mut socket = match DiscordIPCSocket::new().await {
122 Ok(s) => s,
123 Err(_) => {
124 let _ = ready_tx.take(); sleep(Duration::from_secs(backoff)).await;
126 continue;
127 }
128 };
129
130 let handshake = json!({ "v": 1, "client_id": client_id }).to_string();
132 let packed = pack(0, handshake.len() as u32);
133
134 if socket.write(&packed).await.is_err()
135 || socket.write(handshake.as_bytes()).await.is_err()
136 {
137 let _ = ready_tx.take(); sleep(Duration::from_secs(backoff)).await;
139 continue;
140 }
141
142 loop {
144 let frame = match socket.read_frame().await {
145 Ok(f) => f,
146 Err(_) => continue 'outer,
147 };
148
149 if frame.opcode != 1 {
150 continue;
151 }
152
153 if let Ok(json) = serde_json::from_slice::<RpcFrame>(&frame.body) {
154 if json.cmd.as_deref() == Some("DISPATCH")
155 && json.evt.as_deref() == Some("READY")
156 {
157 if let Some(tx) = ready_tx.take() {
158 let _ = tx.send(()); }
160 break;
161 }
162 if json.evt.as_deref() == Some("ERROR") {
163 eprintln!("Discord RPC error: {:?}", json.data);
164 }
165 }
166 }
167
168 let timestamp = get_current_timestamp()?;
169
170 if let Some((details, state)) = &last_activity {
172 let _ = send_activity(&mut socket, details, state, timestamp).await;
173 }
174
175 backoff = 1;
176
177 loop {
178 tokio::select! {
179 Some(cmd) = rx.recv() => {
180 match cmd {
181 IPCCommand::SetActivity { details, state } => {
182 last_activity = Some((details.clone(), state.clone()));
183 if send_activity(&mut socket, &details, &state, timestamp).await.is_err() {
184 break;
185 }
186 },
187 IPCCommand::ClearActivity => {
188 last_activity = None;
189 if clear_activity(&mut socket).await.is_err() { break; }
190 },
191 IPCCommand::Close => {
192 let json = b"{}";
193 let packed = pack(2, json.len() as u32);
194 let _ = socket.write(&packed).await;
195 let _ = socket.close().await;
196 running.store(false, Ordering::SeqCst);
197 break 'outer;
198 }
199 }
200 }
201
202 frame = socket.read_frame() => {
203 match frame {
204 Ok(frame) => match frame.opcode {
205 1 => {
206 if let Ok(json) = serde_json::from_slice::<RpcFrame>(&frame.body) {
207 if json.evt.as_deref() == Some("ERROR") {
208 eprintln!("Discord RPC error: {:?}", json.data);
209 }
210 }
211 }
212 2 => break,
213 3 => {
214 let packed = pack(3, frame.body.len() as u32);
215 if socket.write(&packed).await.is_err() { break; }
216 if socket.write(&frame.body).await.is_err() { break; }
217 }
218 _ => {}
219 },
220 Err(_) => break,
221 }
222 }
223 }
224 }
225
226 sleep(Duration::from_secs(backoff)).await;
227 backoff = (backoff * 2).min(4);
228 }
229
230 Ok(())
231 });
232
233 self.handle = Some(handle);
234
235 match ready_rx.await {
237 Ok(()) => Ok(()),
238 Err(_) => bail!("Background task exited before READY."),
239 }
240 }
241
242 pub async fn wait(&mut self) -> Result<()> {
244 if let Some(handle) = self.handle.take() {
245 handle.await??;
246 }
247
248 Ok(())
249 }
250
251 pub async fn set_activity(&self, details: &str, state: &str) -> Result<()> {
254 if !self.running.load(Ordering::SeqCst) {
255 bail!("Call .run() before .set_activity() execution.");
256 }
257
258 self.tx
259 .send(IPCCommand::SetActivity {
260 details: details.to_string(),
261 state: state.to_string(),
262 })
263 .await?;
264 Ok(())
265 }
266
267 pub async fn clear_activity(&self) -> Result<()> {
269 if !self.running.load(Ordering::SeqCst) {
270 return Ok(());
271 }
272
273 self.tx.send(IPCCommand::ClearActivity).await?;
274 Ok(())
275 }
276
277 pub async fn close(&mut self) -> Result<()> {
279 if self.running.load(Ordering::SeqCst) {
280 let _ = self.tx.send(IPCCommand::Close).await;
281 if let Some(handle) = self.handle.take() {
282 if let Err(e) = handle.await {
283 eprintln!("DiscordIPC background task failed on close: {e:?}");
284 }
285 }
286 }
287
288 Ok(())
289 }
290}
291
292async fn send_activity(
293 socket: &mut DiscordIPCSocket,
294 details: &str,
295 state: &str,
296 timestamp: u64,
297) -> Result<()> {
298 let json = json!({
299 "cmd": "SET_ACTIVITY",
300 "args": {
301 "pid": std::process::id(),
302 "activity": {
303 "details": details,
304 "state": state,
305 "timestamps": { "start": timestamp }
306 }
307 },
308 "nonce": Uuid::new_v4().to_string()
309 })
310 .to_string();
311
312 let packed = pack(1, json.len() as u32);
313 socket.write(&packed).await?;
314 socket.write(json.as_bytes()).await?;
315 Ok(())
316}
317
318async fn clear_activity(socket: &mut DiscordIPCSocket) -> Result<()> {
319 let json = json!({
320 "cmd": "SET_ACTIVITY",
321 "args": {
322 "pid": std::process::id(),
323 "activity": null
324 },
325 "nonce": Uuid::new_v4().to_string()
326 })
327 .to_string();
328
329 let packed = pack(1, json.len() as u32);
330 socket.write(&packed).await?;
331 socket.write(json.as_bytes()).await?;
332 Ok(())
333}
334
335#[derive(Debug)]
342pub struct DiscordIPCSync {
343 inner: DiscordIPC,
344 rt: Runtime,
345}
346
347impl DiscordIPCSync {
348 pub fn new(client_id: &str) -> Result<Self> {
350 let rt = Builder::new_multi_thread().enable_all().build()?;
351 let inner = DiscordIPC::new(client_id);
352
353 Ok(Self { inner, rt })
354 }
355
356 pub fn client_id(&self) -> String {
358 self.inner.client_id()
359 }
360 pub fn run(&mut self) -> Result<()> {
363 self.rt.block_on(self.inner.run())
364 }
365
366 pub fn wait(&mut self) -> Result<()> {
369 self.rt.block_on(self.inner.wait())
370 }
371
372 pub fn set_activity(&self, details: &str, state: &str) -> Result<()> {
375 self.rt.block_on(self.inner.set_activity(details, state))
376 }
377
378 pub fn clear_activity(&self) -> Result<()> {
380 self.rt.block_on(self.inner.clear_activity())
381 }
382
383 pub fn close(&mut self) -> Result<()> {
385 self.rt.block_on(self.inner.close())
386 }
387}
388
389impl Drop for DiscordIPCSync {
390 fn drop(&mut self) {
391 let _ = self.rt.block_on(self.inner.close());
392 }
393}