1use std::{
4 sync::{
5 Arc,
6 atomic::{AtomicBool, Ordering},
7 },
8 time::{Duration, SystemTime, UNIX_EPOCH},
9};
10
11use anyhow::{Result, bail};
12use serde::Deserialize;
13use serde_json::json;
14use tokio::{
15 runtime::{Builder, Runtime},
16 sync::{
17 mpsc::{self, Sender},
18 oneshot,
19 },
20 task::JoinHandle,
21 time::sleep,
22};
23use uuid::Uuid;
24
25use crate::socket::DiscordIPCSocket;
26
27fn get_current_timestamp() -> Result<u64> {
34 Ok(SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs())
35}
36
37fn pack(opcode: u32, data_len: u32) -> Vec<u8> {
38 let mut bytes = Vec::with_capacity(8);
39 bytes.extend_from_slice(&opcode.to_le_bytes());
40 bytes.extend_from_slice(&data_len.to_le_bytes());
41 bytes
42}
43
44#[derive(Debug)]
51enum IPCCommand {
52 SetActivity { details: String, state: String },
53 ClearActivity,
54 Close,
55}
56
57#[derive(Debug, Deserialize)]
58struct RpcFrame {
59 cmd: Option<String>,
60 evt: Option<String>,
61 data: Option<serde_json::Value>,
62}
63
64#[derive(Debug)]
72pub struct DiscordIPC {
73 tx: Sender<IPCCommand>,
74 client_id: String,
75 running: Arc<AtomicBool>,
76 handle: Option<JoinHandle<Result<()>>>,
77}
78
79impl DiscordIPC {
80 pub fn new(client_id: &str) -> Self {
81 let (tx, _rx) = mpsc::channel(32);
82
83 Self {
84 tx,
85 client_id: client_id.to_string(),
86 running: Arc::new(AtomicBool::new(false)),
87 handle: None,
88 }
89 }
90
91 pub fn client_id(&self) -> String {
93 self.client_id.clone()
94 }
95
96 pub async fn run(&mut self) -> Result<()> {
100 if self.running.swap(true, Ordering::SeqCst) {
101 bail!(
102 "Cannot run multiple instances of .run() for DiscordIPC, or when a session is still closing."
103 )
104 }
105
106 let (tx, mut rx) = mpsc::channel::<IPCCommand>(32);
107 self.tx = tx;
108 let client_id = self.client_id.clone();
109 let running = self.running.clone();
110
111 let (ready_tx, ready_rx) = oneshot::channel::<()>();
113
114 let handle = tokio::spawn(async move {
115 let mut backoff = 1;
116 let mut last_activity: Option<(String, String)> = None;
117 let mut ready_tx = Some(ready_tx);
118
119 'outer: while running.load(Ordering::SeqCst) {
120 let mut socket = match DiscordIPCSocket::new().await {
122 Ok(s) => s,
123 Err(_) => {
124 let _ = ready_tx.take(); sleep(Duration::from_secs(backoff)).await;
126 continue;
127 }
128 };
129
130 let handshake = json!({ "v": 1, "client_id": client_id }).to_string();
132 let packed = pack(0, handshake.len() as u32);
133
134 if socket.write(&packed).await.is_err()
135 || socket.write(handshake.as_bytes()).await.is_err()
136 {
137 let _ = ready_tx.take(); sleep(Duration::from_secs(backoff)).await;
139 continue;
140 }
141
142 loop {
144 let frame = match socket.read_frame().await {
145 Ok(f) => f,
146 Err(_) => continue 'outer,
147 };
148
149 if frame.opcode != 1 {
150 continue;
151 }
152
153 if let Ok(json) = serde_json::from_slice::<RpcFrame>(&frame.body) {
154 if json.cmd.as_deref() == Some("DISPATCH")
155 && json.evt.as_deref() == Some("READY")
156 {
157 if let Some(tx) = ready_tx.take() {
158 let _ = tx.send(()); }
160 break;
161 }
162 if json.evt.as_deref() == Some("ERROR") {
163 eprintln!("Discord RPC error: {:?}", json.data);
164 }
165 }
166 }
167
168 let timestamp = get_current_timestamp()?;
169
170 if let Some((details, state)) = &last_activity {
172 let _ = send_activity(&mut socket, details, state, timestamp).await;
173 }
174
175 backoff = 1;
176
177 loop {
178 tokio::select! {
179 Some(cmd) = rx.recv() => {
180 match cmd {
181 IPCCommand::SetActivity { details, state } => {
182 last_activity = Some((details.clone(), state.clone()));
183 if send_activity(&mut socket, &details, &state, timestamp).await.is_err() {
184 break;
185 }
186 },
187 IPCCommand::ClearActivity => {
188 last_activity = None;
189 if clear_activity(&mut socket).await.is_err() { break; }
190 },
191 IPCCommand::Close => {
192 let json = b"{}";
193 let packed = pack(2, json.len() as u32);
194 let _ = socket.write(&packed).await;
195 let _ = socket.close().await;
196 running.store(false, Ordering::SeqCst);
197 break 'outer;
198 }
199 }
200 }
201
202 frame = socket.read_frame() => {
203 match frame {
204 Ok(frame) => match frame.opcode {
205 1 => {
206 if let Ok(json) = serde_json::from_slice::<RpcFrame>(&frame.body) {
207 if json.evt.as_deref() == Some("ERROR") {
208 eprintln!("Discord RPC error: {:?}", json.data);
209 }
210 }
211 }
212 2 => break,
213 3 => {
214 let packed = pack(3, frame.body.len() as u32);
215 if socket.write(&packed).await.is_err() { break; }
216 if socket.write(&frame.body).await.is_err() { break; }
217 }
218 _ => {}
219 },
220 Err(_) => break,
221 }
222 }
223 }
224 }
225
226 sleep(Duration::from_secs(backoff)).await;
227 backoff = (backoff * 2).min(4);
228 }
229
230 Ok(())
231 });
232
233 self.handle = Some(handle);
234
235 match ready_rx.await {
237 Ok(()) => Ok(()),
238 Err(_) => bail!("Background task exited before READY."),
239 }
240 }
241
242 pub async fn wait(&mut self) -> Result<()> {
244 if let Some(handle) = self.handle.take() {
245 handle.await??;
246 }
247
248 Ok(())
249 }
250
251 pub async fn set_activity(&self, details: &str, state: &str) -> Result<()> {
254 if !self.running.load(Ordering::SeqCst) {
255 bail!("Call .run() before .set_activity() execution.");
256 }
257
258 self.tx
259 .send(IPCCommand::SetActivity {
260 details: details.to_string(),
261 state: state.to_string(),
262 })
263 .await?;
264 Ok(())
265 }
266
267 pub async fn clear_activity(&self) -> Result<()> {
269 if !self.running.load(Ordering::SeqCst) {
270 return Ok(());
271 }
272
273 self.tx.send(IPCCommand::ClearActivity).await?;
274 Ok(())
275 }
276
277 const CLOSE_COOLDOWN_MILLIS: u64 = 200;
279
280 pub async fn close(&mut self) -> Result<()> {
282 if self.running.swap(false, Ordering::SeqCst) {
283 let _ = self.tx.send(IPCCommand::Close).await;
284 if let Some(handle) = self.handle.take() {
285 if let Err(e) = handle.await {
286 eprintln!("DiscordIPC background task failed on close: {e:?}");
287 }
288 }
289 }
290
291 tokio::time::sleep(Duration::from_millis(Self::CLOSE_COOLDOWN_MILLIS)).await;
292 Ok(())
293 }
294}
295
296async fn send_activity(
297 socket: &mut DiscordIPCSocket,
298 details: &str,
299 state: &str,
300 timestamp: u64,
301) -> Result<()> {
302 let json = json!({
303 "cmd": "SET_ACTIVITY",
304 "args": {
305 "pid": std::process::id(),
306 "activity": {
307 "details": details,
308 "state": state,
309 "timestamps": { "start": timestamp }
310 }
311 },
312 "nonce": Uuid::new_v4().to_string()
313 })
314 .to_string();
315
316 let packed = pack(1, json.len() as u32);
317 socket.write(&packed).await?;
318 socket.write(json.as_bytes()).await?;
319 Ok(())
320}
321
322async fn clear_activity(socket: &mut DiscordIPCSocket) -> Result<()> {
323 let json = json!({
324 "cmd": "SET_ACTIVITY",
325 "args": {
326 "pid": std::process::id(),
327 "activity": null
328 },
329 "nonce": Uuid::new_v4().to_string()
330 })
331 .to_string();
332
333 let packed = pack(1, json.len() as u32);
334 socket.write(&packed).await?;
335 socket.write(json.as_bytes()).await?;
336 Ok(())
337}
338
339#[derive(Debug)]
346pub struct DiscordIPCSync {
347 inner: DiscordIPC,
348 rt: Runtime,
349}
350
351impl DiscordIPCSync {
352 pub fn new(client_id: &str) -> Result<Self> {
353 let rt = Builder::new_multi_thread().enable_all().build()?;
354 let inner = DiscordIPC::new(client_id);
355
356 Ok(Self { inner, rt })
357 }
358
359 pub fn client_id(&self) -> String {
360 self.inner.client_id()
361 }
362
363 pub fn run(&mut self) -> Result<()> {
364 self.rt.block_on(self.inner.run())
365 }
366
367 pub fn wait(&mut self) -> Result<()> {
368 self.rt.block_on(self.inner.wait())
369 }
370
371 pub fn set_activity(&self, details: &str, state: &str) -> Result<()> {
372 self.rt.block_on(self.inner.set_activity(details, state))
373 }
374
375 pub fn close(&mut self) -> Result<()> {
376 self.rt.block_on(self.inner.close())
377 }
378}
379
380impl Drop for DiscordIPCSync {
381 fn drop(&mut self) {
382 let _ = self.rt.block_on(self.inner.close());
383 }
384}