1use std::{
4 sync::{
5 Arc,
6 atomic::{AtomicBool, Ordering},
7 },
8 time::{Duration, SystemTime, UNIX_EPOCH},
9};
10
11use anyhow::{Result, bail};
12use serde::Deserialize;
13use serde_json::json;
14use tokio::{
15 runtime::{Builder, Runtime},
16 sync::{
17 mpsc::{self, Sender},
18 oneshot,
19 },
20 task::JoinHandle,
21 time::sleep,
22};
23
24use crate::{
25 socket::DiscordIPCSocket,
26 types::{ActivityPayload, IPCActivityCmd, TimestampPayload},
27};
28
29fn get_current_timestamp() -> Result<u64> {
36 Ok(SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs())
37}
38
39#[derive(Debug)]
46enum IPCCommand {
47 SetActivity { activity: Activity },
48 ClearActivity,
49 Close,
50}
51
52#[derive(Debug, Deserialize)]
53struct RpcFrame {
54 cmd: Option<String>,
55 evt: Option<String>,
56 data: Option<ReadyData>,
57}
58
59#[derive(Debug, Clone, Deserialize)]
61pub struct ReadyData {
62 pub user: DiscordUser,
63}
64
65#[derive(Debug, Clone, Deserialize)]
67pub struct DiscordUser {
68 pub id: String,
69 pub username: String,
70 pub global_name: Option<String>,
71 pub discriminator: Option<String>,
72 pub avatar: Option<String>,
73 pub avatar_decoration_data: Option<serde_json::Value>,
74 pub bot: bool,
75 pub flags: Option<u64>,
76 pub premium_type: Option<u64>,
77}
78
79#[derive(Debug, Clone)]
81pub struct Activity {
82 details: Option<String>,
83 state: Option<String>,
84 duration: Option<Duration>,
85}
86
87pub struct ActivityBuilder;
88
89pub struct ActivityWithDetails {
93 details: String,
94 state: Option<String>,
95 duration: Option<Duration>,
96}
97
98impl Activity {
99 pub fn new() -> ActivityBuilder {
100 ActivityBuilder
101 }
102
103 pub fn build_empty() -> Self {
105 Self {
106 details: None,
107 state: None,
108 duration: None,
109 }
110 }
111}
112
113impl ActivityBuilder {
114 pub fn details(self, details: impl Into<String>) -> ActivityWithDetails {
116 ActivityWithDetails {
117 details: details.into(),
118 state: None,
119 duration: None,
120 }
121 }
122}
123
124impl ActivityWithDetails {
125 pub fn state(mut self, state: impl Into<String>) -> Self {
127 self.state = Some(state.into());
128 self
129 }
130
131 pub fn duration(mut self, duration: Duration) -> Self {
133 self.duration = Some(duration);
134 self
135 }
136
137 pub fn build(self) -> Activity {
140 Activity {
141 details: Some(self.details),
142 state: self.state,
143 duration: self.duration,
144 }
145 }
146}
147
148pub struct DiscordIPC {
156 tx: Sender<IPCCommand>,
157 client_id: String,
158 running: Arc<AtomicBool>,
159 handle: Option<JoinHandle<Result<()>>>,
160 on_ready: Option<Box<dyn Fn(ReadyData) + Send + Sync + 'static>>,
161}
162
163impl DiscordIPC {
164 pub fn new(client_id: &str) -> Self {
166 let (tx, _rx) = mpsc::channel(32);
167
168 Self {
169 tx,
170 client_id: client_id.to_string(),
171 running: Arc::new(AtomicBool::new(false)),
172 handle: None,
173 on_ready: None,
174 }
175 }
176
177 pub fn on_ready<F: Fn(ReadyData) + Send + Sync + 'static>(mut self, f: F) -> Self {
179 self.on_ready = Some(Box::new(f));
180 self
181 }
182
183 pub fn client_id(&self) -> String {
185 self.client_id.clone()
186 }
187
188 pub async fn run(&mut self, wait_for_ready: bool) -> Result<()> {
191 if self.running.swap(true, Ordering::SeqCst) {
192 bail!(
193 "Cannot run multiple instances of .run() for DiscordIPC, or when a session is still closing."
194 )
195 }
196
197 let (tx, mut rx) = mpsc::channel::<IPCCommand>(32);
198 self.tx = tx;
199 let client_id = self.client_id.clone();
200 let running = self.running.clone();
201
202 let (ready_tx, ready_rx) = oneshot::channel::<()>();
204
205 let on_ready = self.on_ready.take();
206
207 let handle = tokio::spawn(async move {
208 let mut backoff = 1;
209 let mut last_activity: Option<Activity> = None;
210 let mut ready_tx = Some(ready_tx);
211
212 'outer: while running.load(Ordering::SeqCst) {
213 let mut socket = match DiscordIPCSocket::new().await {
215 Ok(s) => s,
216 Err(_) => {
217 sleep(Duration::from_secs(backoff)).await;
218 continue;
219 }
220 };
221
222 if socket.do_handshake(&client_id).await.is_err() {
224 sleep(Duration::from_secs(backoff)).await;
225 continue;
226 }
227
228 loop {
230 let frame = match socket.read_frame().await {
231 Ok(f) => f,
232 Err(_) => continue 'outer,
233 };
234
235 if frame.opcode != 1 {
236 continue;
237 }
238
239 if let Ok(json) = serde_json::from_slice::<RpcFrame>(&frame.body) {
240 if json.cmd.as_deref() == Some("DISPATCH")
241 && json.evt.as_deref() == Some("READY")
242 {
243 if let Some(tx) = ready_tx.take() {
244 let _ = tx.send(());
245 }
246 if let (Some(f), Some(data)) = (&on_ready, json.data) {
247 f(data);
248 }
249 break;
250 }
251
252 if json.evt.as_deref() == Some("ERROR") {
253 eprintln!("Discord RPC error: {:?}", json.data);
254 }
255 }
256 }
257
258 let session_start = get_current_timestamp()?;
260
261 if let Some(activity) = &last_activity {
263 let _ = socket.send_activity(activity.clone(), session_start).await;
264 }
265
266 backoff = 1;
267
268 loop {
269 tokio::select! {
270 Some(cmd) = rx.recv() => {
271 match cmd {
272 IPCCommand::SetActivity { activity } => {
273 last_activity = Some(activity.clone());
274
275 if socket.send_activity(activity, session_start).await.is_err() {
276 break;
277 }
278 },
279 IPCCommand::ClearActivity => {
280 last_activity = None;
281
282 if socket.clear_activity().await.is_err() { break; }
283 },
284 IPCCommand::Close => {
285 let _ = socket.close().await;
286 running.store(false, Ordering::SeqCst);
287 break 'outer;
288 }
289 }
290 }
291
292 frame = socket.read_frame() => {
293 match frame {
294 Ok(frame) => match frame.opcode {
295 1 => {
296 if let Ok(json) = serde_json::from_slice::<RpcFrame>(&frame.body) {
297 if json.evt.as_deref() == Some("ERROR") {
298 eprintln!("Discord RPC error: {:?}", json.data);
299 }
300 }
301 }
302 2 => break,
303 3 => {
304 if socket.send_frame(3, frame.body).await.is_err() { break; }
305 }
306 _ => {}
307 },
308 Err(_) => break,
309 }
310 }
311 }
312 }
313
314 sleep(Duration::from_secs(backoff)).await;
315 backoff = (backoff * 2).min(4);
316 }
317
318 Ok(())
319 });
320
321 self.handle = Some(handle);
322
323 if wait_for_ready {
324 match ready_rx.await {
325 Ok(()) => Ok(()),
326 Err(_) => bail!("Background task exited before READY."),
327 }
328 } else {
329 Ok(())
330 }
331 }
332
333 pub async fn wait(&mut self) -> Result<()> {
335 if let Some(handle) = self.handle.take() {
336 handle.await??;
337 }
338
339 Ok(())
340 }
341
342 pub fn is_running(&self) -> bool {
344 self.running.load(Ordering::SeqCst)
345 }
346
347 pub async fn set_activity(&self, activity: Activity) -> Result<()> {
350 if !self.is_running() {
351 bail!("Call .run() before .set_activity() execution.");
352 }
353
354 self.tx.send(IPCCommand::SetActivity { activity }).await?;
355 Ok(())
356 }
357
358 pub async fn clear_activity(&self) -> Result<()> {
360 if self.is_running() {
361 self.tx.send(IPCCommand::ClearActivity).await?;
362 }
363
364 Ok(())
365 }
366
367 pub async fn close(&mut self) -> Result<()> {
369 if self.is_running() {
370 let _ = self.tx.send(IPCCommand::Close).await;
371 if let Some(handle) = self.handle.take() {
372 if let Err(e) = handle.await {
373 eprintln!("DiscordIPC background task failed on close: {e:?}");
374 }
375 }
376 }
377
378 Ok(())
379 }
380}
381
382impl DiscordIPCSocket {
390 async fn send_activity(&mut self, activity: Activity, session_start: u64) -> Result<()> {
391 let current_t = get_current_timestamp()?;
392 let end_timestamp = activity.duration.map(|d| current_t + d.as_secs());
393
394 let cmd = IPCActivityCmd::new_with(Some(ActivityPayload {
395 details: activity.details,
396 state: activity.state,
397 timestamps: TimestampPayload {
398 start: session_start,
399 end: end_timestamp,
400 },
401 }));
402
403 self.send_cmd(cmd).await
404 }
405
406 async fn clear_activity(&mut self) -> Result<()> {
407 let cmd = IPCActivityCmd::new_with(None);
408 self.send_cmd(cmd).await?;
409 Ok(())
410 }
411
412 async fn do_handshake(&mut self, client_id: &str) -> Result<()> {
413 let handshake = json!({ "v": 1, "client_id": client_id }).to_string();
414 self.send_frame(0, handshake).await?;
415 Ok(())
416 }
417
418 async fn send_cmd(&mut self, cmd: IPCActivityCmd) -> Result<()> {
419 let cmd = cmd.to_json()?;
420 self.send_frame(1, cmd).await?;
421 Ok(())
422 }
423}
424
425pub struct DiscordIPCSync {
433 inner: DiscordIPC,
434 rt: Runtime,
435}
436
437impl DiscordIPCSync {
438 pub fn new(client_id: &str) -> Result<Self> {
440 let rt = Builder::new_multi_thread().enable_all().build()?;
441 let inner = DiscordIPC::new(client_id);
442
443 Ok(Self { inner, rt })
444 }
445
446 pub fn on_ready<F: Fn(ReadyData) + Send + Sync + 'static>(mut self, f: F) -> Self {
448 self.inner.on_ready = Some(Box::new(f));
449 self
450 }
451
452 pub fn client_id(&self) -> String {
454 self.inner.client_id()
455 }
456
457 pub fn run(&mut self, wait_for_ready: bool) -> Result<()> {
460 self.rt.block_on(self.inner.run(wait_for_ready))
461 }
462
463 pub fn wait(&mut self) -> Result<()> {
466 self.rt.block_on(self.inner.wait())
467 }
468
469 pub fn is_running(&self) -> bool {
471 self.inner.is_running()
472 }
473
474 pub fn set_activity(&self, activity: Activity) -> Result<()> {
477 self.rt.block_on(self.inner.set_activity(activity))
478 }
479
480 pub fn clear_activity(&self) -> Result<()> {
482 self.rt.block_on(self.inner.clear_activity())
483 }
484
485 pub fn close(&mut self) -> Result<()> {
487 self.rt.block_on(self.inner.close())
488 }
489}
490
491impl Drop for DiscordIPCSync {
492 fn drop(&mut self) {
493 let _ = self.rt.block_on(self.inner.close());
494 }
495}