1use std::{
4 sync::{
5 Arc,
6 atomic::{AtomicBool, Ordering},
7 },
8 time::{Duration, SystemTime, UNIX_EPOCH},
9};
10
11use anyhow::{Result, bail};
12use serde::Deserialize;
13use serde_json::json;
14use tokio::{
15 runtime::{Builder, Runtime},
16 sync::{
17 mpsc::{self, Sender},
18 oneshot,
19 },
20 task::JoinHandle,
21 time::sleep,
22};
23
24use crate::{
25 activitytypes::{ActivityPayload, IPCActivityCmd, TimestampPayload},
26 socket::DiscordIPCSocket,
27};
28
29fn get_current_timestamp() -> Result<u64> {
36 Ok(SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs())
37}
38
39fn pack(opcode: u32, data_len: u32) -> Vec<u8> {
40 let mut bytes = Vec::with_capacity(8);
41 bytes.extend_from_slice(&opcode.to_le_bytes());
42 bytes.extend_from_slice(&data_len.to_le_bytes());
43 bytes
44}
45
46#[derive(Debug)]
53enum IPCCommand {
54 SetActivity {
55 details: String,
56 state: Option<String>,
57 },
58 ClearActivity,
59 Close,
60}
61
62#[derive(Debug, Deserialize)]
63struct RpcFrame {
64 cmd: Option<String>,
65 evt: Option<String>,
66 data: Option<serde_json::Value>,
67}
68
69pub struct DiscordIPC {
77 tx: Sender<IPCCommand>,
78 client_id: String,
79 running: Arc<AtomicBool>,
80 handle: Option<JoinHandle<Result<()>>>,
81 on_ready: Option<Box<dyn Fn() + Send + 'static>>,
82}
83
84impl DiscordIPC {
85 pub fn new(client_id: &str) -> Self {
87 let (tx, _rx) = mpsc::channel(32);
88
89 Self {
90 tx,
91 client_id: client_id.to_string(),
92 running: Arc::new(AtomicBool::new(false)),
93 handle: None,
94 on_ready: None,
95 }
96 }
97
98 pub fn on_ready<F: Fn() + Send + 'static>(mut self, f: F) -> Self {
100 self.on_ready = Some(Box::new(f));
101 self
102 }
103
104 pub fn client_id(&self) -> String {
106 self.client_id.clone()
107 }
108
109 pub async fn run(&mut self, wait_for_ready: bool) -> Result<()> {
112 if self.running.swap(true, Ordering::SeqCst) {
113 bail!(
114 "Cannot run multiple instances of .run() for DiscordIPC, or when a session is still closing."
115 )
116 }
117
118 let (tx, mut rx) = mpsc::channel::<IPCCommand>(32);
119 self.tx = tx;
120 let client_id = self.client_id.clone();
121 let running = self.running.clone();
122
123 let (ready_tx, ready_rx) = oneshot::channel::<()>();
125
126 let on_ready = self.on_ready.take();
127
128 let handle = tokio::spawn(async move {
129 let mut backoff = 1;
130 let mut last_activity: Option<(String, Option<String>)> = None;
131 let mut ready_tx = Some(ready_tx);
132
133 'outer: while running.load(Ordering::SeqCst) {
134 let mut socket = match DiscordIPCSocket::new().await {
136 Ok(s) => s,
137 Err(_) => {
138 sleep(Duration::from_secs(backoff)).await;
139 continue;
140 }
141 };
142
143 if socket.do_handshake(&client_id).await.is_err() {
145 sleep(Duration::from_secs(backoff)).await;
146 continue;
147 }
148
149 loop {
151 let frame = match socket.read_frame().await {
152 Ok(f) => f,
153 Err(_) => continue 'outer,
154 };
155
156 if frame.opcode != 1 {
157 continue;
158 }
159
160 if let Ok(json) = serde_json::from_slice::<RpcFrame>(&frame.body) {
161 if json.cmd.as_deref() == Some("DISPATCH")
162 && json.evt.as_deref() == Some("READY")
163 {
164 if let Some(tx) = ready_tx.take() {
165 let _ = tx.send(()); }
167 if let Some(f) = &on_ready {
168 f();
169 }
170 break;
171 }
172 if json.evt.as_deref() == Some("ERROR") {
173 eprintln!("Discord RPC error: {:?}", json.data);
174 }
175 }
176 }
177
178 let timestamp = get_current_timestamp()?;
179
180 if let Some((details, state)) = &last_activity {
182 let _ = socket
183 .send_activity(details.clone(), state.clone(), timestamp)
184 .await;
185 }
186
187 backoff = 1;
188
189 loop {
190 tokio::select! {
191 Some(cmd) = rx.recv() => {
192 match cmd {
193 IPCCommand::SetActivity { details, state } => {
194 last_activity = Some((details.clone(), state.clone()));
195
196 if socket.send_activity(details, state, timestamp).await.is_err() {
197 break;
198 }
199 },
200 IPCCommand::ClearActivity => {
201 last_activity = None;
202
203 if socket.clear_activity().await.is_err() { break; }
204 },
205 IPCCommand::Close => {
206 let _ = socket.close().await;
207 running.store(false, Ordering::SeqCst);
208 break 'outer;
209 }
210 }
211 }
212
213 frame = socket.read_frame() => {
214 match frame {
215 Ok(frame) => match frame.opcode {
216 1 => {
217 if let Ok(json) = serde_json::from_slice::<RpcFrame>(&frame.body) {
218 if json.evt.as_deref() == Some("ERROR") {
219 eprintln!("Discord RPC error: {:?}", json.data);
220 }
221 }
222 }
223 2 => break,
224 3 => {
225 let packed = pack(3, frame.body.len() as u32);
226 if socket.write(&packed).await.is_err() { break; }
227 if socket.write(&frame.body).await.is_err() { break; }
228 }
229 _ => {}
230 },
231 Err(_) => break,
232 }
233 }
234 }
235 }
236
237 sleep(Duration::from_secs(backoff)).await;
238 backoff = (backoff * 2).min(4);
239 }
240
241 Ok(())
242 });
243
244 self.handle = Some(handle);
245
246 if wait_for_ready {
247 match ready_rx.await {
248 Ok(()) => Ok(()),
249 Err(_) => bail!("Background task exited before READY."),
250 }
251 } else {
252 Ok(())
253 }
254 }
255
256 pub async fn wait(&mut self) -> Result<()> {
258 if let Some(handle) = self.handle.take() {
259 handle.await??;
260 }
261
262 Ok(())
263 }
264
265 pub fn is_running(&self) -> bool {
267 self.running.load(Ordering::SeqCst)
268 }
269
270 pub async fn set_activity(&self, details: String, state: Option<String>) -> Result<()> {
273 if !self.is_running() {
274 bail!("Call .run() before .set_activity() execution.");
275 }
276
277 self.tx
278 .send(IPCCommand::SetActivity { details, state })
279 .await?;
280 Ok(())
281 }
282
283 pub async fn clear_activity(&self) -> Result<()> {
285 if self.is_running() {
286 self.tx.send(IPCCommand::ClearActivity).await?;
287 }
288
289 Ok(())
290 }
291
292 pub async fn close(&mut self) -> Result<()> {
294 if self.is_running() {
295 let _ = self.tx.send(IPCCommand::Close).await;
296 if let Some(handle) = self.handle.take() {
297 if let Err(e) = handle.await {
298 eprintln!("DiscordIPC background task failed on close: {e:?}");
299 }
300 }
301 }
302
303 Ok(())
304 }
305}
306
307impl DiscordIPCSocket {
315 async fn send_activity(
316 &mut self,
317 details: String,
318 state: Option<String>,
319 timestamp: u64,
320 ) -> Result<()> {
321 let cmd = IPCActivityCmd::new_with(Some(ActivityPayload {
322 details,
323 state,
324 timestamps: TimestampPayload { start: timestamp },
325 }));
326
327 self.send_cmd(cmd).await?;
328 Ok(())
329 }
330
331 async fn clear_activity(&mut self) -> Result<()> {
332 let cmd = IPCActivityCmd::new_with(None);
333 self.send_cmd(cmd).await?;
334 Ok(())
335 }
336
337 async fn do_handshake(&mut self, client_id: &str) -> Result<()> {
338 let handshake = json!({ "v": 1, "client_id": client_id }).to_string();
339 let packed = pack(0, handshake.len() as u32);
340
341 self.write(&packed).await?;
342 self.write(handshake.as_bytes()).await?;
343
344 Ok(())
345 }
346
347 async fn send_cmd(&mut self, cmd: IPCActivityCmd) -> Result<()> {
348 let cmd = cmd.to_json()?;
349
350 let packed = pack(1, cmd.len() as u32);
351 self.write(&packed).await?;
352 self.write(cmd.as_bytes()).await?;
353 Ok(())
354 }
355}
356
357pub struct DiscordIPCSync {
365 inner: DiscordIPC,
366 rt: Runtime,
367}
368
369impl DiscordIPCSync {
370 pub fn new(client_id: &str) -> Result<Self> {
372 let rt = Builder::new_multi_thread().enable_all().build()?;
373 let inner = DiscordIPC::new(client_id);
374
375 Ok(Self { inner, rt })
376 }
377
378 pub fn on_ready<F: Fn() + Send + 'static>(mut self, f: F) -> Self {
380 self.inner.on_ready = Some(Box::new(f));
381 self
382 }
383
384 pub fn client_id(&self) -> String {
386 self.inner.client_id()
387 }
388
389 pub fn run(&mut self, wait_for_ready: bool) -> Result<()> {
392 self.rt.block_on(self.inner.run(wait_for_ready))
393 }
394
395 pub fn wait(&mut self) -> Result<()> {
398 self.rt.block_on(self.inner.wait())
399 }
400
401 pub fn is_running(&self) -> bool {
403 self.inner.is_running()
404 }
405
406 pub fn set_activity(&self, details: String, state: Option<String>) -> Result<()> {
409 self.rt.block_on(self.inner.set_activity(details, state))
410 }
411
412 pub fn clear_activity(&self) -> Result<()> {
414 self.rt.block_on(self.inner.clear_activity())
415 }
416
417 pub fn close(&mut self) -> Result<()> {
419 self.rt.block_on(self.inner.close())
420 }
421}
422
423impl Drop for DiscordIPCSync {
424 fn drop(&mut self) {
425 let _ = self.rt.block_on(self.inner.close());
426 }
427}