1use std::{
4 sync::{
5 Arc,
6 atomic::{AtomicBool, Ordering},
7 },
8 time::{Duration, SystemTime, UNIX_EPOCH},
9};
10
11use anyhow::{Result, bail};
12use serde::Deserialize;
13use serde_json::json;
14use tokio::{
15 runtime::{Builder, Runtime},
16 sync::{
17 mpsc::{self, Sender},
18 oneshot,
19 },
20 task::JoinHandle,
21 time::sleep,
22};
23
24use crate::{
25 socket::DiscordIPCSocket,
26 types::{ActivityPayload, IPCActivityCmd, TimestampPayload},
27};
28
29fn get_current_timestamp() -> Result<u64> {
36 Ok(SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs())
37}
38
39fn pack(opcode: u32, data_len: u32) -> Vec<u8> {
40 let mut bytes = Vec::with_capacity(8);
41 bytes.extend_from_slice(&opcode.to_le_bytes());
42 bytes.extend_from_slice(&data_len.to_le_bytes());
43 bytes
44}
45
46#[derive(Debug)]
53enum IPCCommand {
54 SetActivity { activity: Activity },
55 ClearActivity,
56 Close,
57}
58
59#[derive(Debug, Deserialize)]
60struct RpcFrame {
61 cmd: Option<String>,
62 evt: Option<String>,
63 data: Option<ReadyData>,
64}
65
66#[derive(Debug, Clone, Deserialize)]
68pub struct ReadyData {
69 pub user: DiscordUser,
70}
71
72#[derive(Debug, Clone, Deserialize)]
74pub struct DiscordUser {
75 pub id: String,
76 pub username: String,
77 pub global_name: Option<String>,
78 pub discriminator: Option<String>,
79 pub avatar: Option<String>,
80 pub avatar_decoration_data: Option<serde_json::Value>,
81 pub bot: bool,
82 pub flags: Option<u64>,
83 pub premium_type: Option<u64>,
84}
85
86#[derive(Debug, Clone)]
88pub struct Activity {
89 pub(crate) details: String,
90 pub(crate) state: Option<String>,
91 pub(crate) duration: Option<Duration>,
92}
93
94impl Activity {
95 pub fn new(details: impl Into<String>) -> Self {
97 Self {
98 details: details.into(),
99 state: None,
100 duration: None,
101 }
102 }
103
104 pub fn state(mut self, state: impl Into<String>) -> Self {
106 self.state = Some(state.into());
107 self
108 }
109
110 pub fn duration(mut self, duration: Duration) -> Self {
112 self.duration = Some(duration);
113 self
114 }
115}
116
117pub struct DiscordIPC {
125 tx: Sender<IPCCommand>,
126 client_id: String,
127 running: Arc<AtomicBool>,
128 handle: Option<JoinHandle<Result<()>>>,
129 on_ready: Option<Box<dyn Fn(ReadyData) + Send + Sync + 'static>>,
130}
131
132impl DiscordIPC {
133 pub fn new(client_id: &str) -> Self {
135 let (tx, _rx) = mpsc::channel(32);
136
137 Self {
138 tx,
139 client_id: client_id.to_string(),
140 running: Arc::new(AtomicBool::new(false)),
141 handle: None,
142 on_ready: None,
143 }
144 }
145
146 pub fn on_ready<F: Fn(ReadyData) + Send + Sync + 'static>(mut self, f: F) -> Self {
148 self.on_ready = Some(Box::new(f));
149 self
150 }
151
152 pub fn client_id(&self) -> String {
154 self.client_id.clone()
155 }
156
157 pub async fn run(&mut self, wait_for_ready: bool) -> Result<()> {
160 if self.running.swap(true, Ordering::SeqCst) {
161 bail!(
162 "Cannot run multiple instances of .run() for DiscordIPC, or when a session is still closing."
163 )
164 }
165
166 let (tx, mut rx) = mpsc::channel::<IPCCommand>(32);
167 self.tx = tx;
168 let client_id = self.client_id.clone();
169 let running = self.running.clone();
170
171 let (ready_tx, ready_rx) = oneshot::channel::<()>();
173
174 let on_ready = self.on_ready.take();
175
176 let handle = tokio::spawn(async move {
177 let mut backoff = 1;
178 let mut last_activity: Option<Activity> = None;
179 let mut ready_tx = Some(ready_tx);
180
181 'outer: while running.load(Ordering::SeqCst) {
182 let mut socket = match DiscordIPCSocket::new().await {
184 Ok(s) => s,
185 Err(_) => {
186 sleep(Duration::from_secs(backoff)).await;
187 continue;
188 }
189 };
190
191 if socket.do_handshake(&client_id).await.is_err() {
193 sleep(Duration::from_secs(backoff)).await;
194 continue;
195 }
196
197 loop {
199 let frame = match socket.read_frame().await {
200 Ok(f) => f,
201 Err(_) => continue 'outer,
202 };
203
204 if frame.opcode != 1 {
205 continue;
206 }
207
208 if let Ok(json) = serde_json::from_slice::<RpcFrame>(&frame.body) {
209 if json.cmd.as_deref() == Some("DISPATCH")
210 && json.evt.as_deref() == Some("READY")
211 {
212 if let Some(tx) = ready_tx.take() {
213 let _ = tx.send(());
214 }
215 if let (Some(f), Some(data)) = (&on_ready, json.data) {
216 f(data);
217 }
218 break;
219 }
220
221 if json.evt.as_deref() == Some("ERROR") {
222 eprintln!("Discord RPC error: {:?}", json.data);
223 }
224 }
225 }
226
227 let session_start = get_current_timestamp()?;
229
230 if let Some(activity) = &last_activity {
232 let _ = socket.send_activity(activity.clone(), session_start).await;
233 }
234
235 backoff = 1;
236
237 loop {
238 tokio::select! {
239 Some(cmd) = rx.recv() => {
240 match cmd {
241 IPCCommand::SetActivity { activity } => {
242 last_activity = Some(activity.clone());
243
244 if socket.send_activity(activity, session_start).await.is_err() {
245 break;
246 }
247 },
248 IPCCommand::ClearActivity => {
249 last_activity = None;
250
251 if socket.clear_activity().await.is_err() { break; }
252 },
253 IPCCommand::Close => {
254 let _ = socket.close().await;
255 running.store(false, Ordering::SeqCst);
256 break 'outer;
257 }
258 }
259 }
260
261 frame = socket.read_frame() => {
262 match frame {
263 Ok(frame) => match frame.opcode {
264 1 => {
265 if let Ok(json) = serde_json::from_slice::<RpcFrame>(&frame.body) {
266 if json.evt.as_deref() == Some("ERROR") {
267 eprintln!("Discord RPC error: {:?}", json.data);
268 }
269 }
270 }
271 2 => break,
272 3 => {
273 let packed = pack(3, frame.body.len() as u32);
274 if socket.write(&packed).await.is_err() { break; }
275 if socket.write(&frame.body).await.is_err() { break; }
276 }
277 _ => {}
278 },
279 Err(_) => break,
280 }
281 }
282 }
283 }
284
285 sleep(Duration::from_secs(backoff)).await;
286 backoff = (backoff * 2).min(4);
287 }
288
289 Ok(())
290 });
291
292 self.handle = Some(handle);
293
294 if wait_for_ready {
295 match ready_rx.await {
296 Ok(()) => Ok(()),
297 Err(_) => bail!("Background task exited before READY."),
298 }
299 } else {
300 Ok(())
301 }
302 }
303
304 pub async fn wait(&mut self) -> Result<()> {
306 if let Some(handle) = self.handle.take() {
307 handle.await??;
308 }
309
310 Ok(())
311 }
312
313 pub fn is_running(&self) -> bool {
315 self.running.load(Ordering::SeqCst)
316 }
317
318 pub async fn set_activity(&self, activity: Activity) -> Result<()> {
321 if !self.is_running() {
322 bail!("Call .run() before .set_activity() execution.");
323 }
324
325 self.tx.send(IPCCommand::SetActivity { activity }).await?;
326 Ok(())
327 }
328
329 pub async fn clear_activity(&self) -> Result<()> {
331 if self.is_running() {
332 self.tx.send(IPCCommand::ClearActivity).await?;
333 }
334
335 Ok(())
336 }
337
338 pub async fn close(&mut self) -> Result<()> {
340 if self.is_running() {
341 let _ = self.tx.send(IPCCommand::Close).await;
342 if let Some(handle) = self.handle.take() {
343 if let Err(e) = handle.await {
344 eprintln!("DiscordIPC background task failed on close: {e:?}");
345 }
346 }
347 }
348
349 Ok(())
350 }
351}
352
353impl DiscordIPCSocket {
361 async fn send_activity(&mut self, activity: Activity, session_start: u64) -> Result<()> {
362 let current_t = get_current_timestamp()?;
363 let end_timestamp = activity.duration.map(|d| current_t + d.as_secs());
364
365 let cmd = IPCActivityCmd::new_with(Some(ActivityPayload {
366 details: activity.details,
367 state: activity.state,
368 timestamps: TimestampPayload {
369 start: session_start,
370 end: end_timestamp,
371 },
372 }));
373
374 self.send_cmd(cmd).await
375 }
376
377 async fn clear_activity(&mut self) -> Result<()> {
378 let cmd = IPCActivityCmd::new_with(None);
379 self.send_cmd(cmd).await?;
380 Ok(())
381 }
382
383 async fn do_handshake(&mut self, client_id: &str) -> Result<()> {
384 let handshake = json!({ "v": 1, "client_id": client_id }).to_string();
385 let packed = pack(0, handshake.len() as u32);
386
387 self.write(&packed).await?;
388 self.write(handshake.as_bytes()).await?;
389
390 Ok(())
391 }
392
393 async fn send_cmd(&mut self, cmd: IPCActivityCmd) -> Result<()> {
394 let cmd = cmd.to_json()?;
395
396 let packed = pack(1, cmd.len() as u32);
397 self.write(&packed).await?;
398 self.write(cmd.as_bytes()).await?;
399 Ok(())
400 }
401}
402
403pub struct DiscordIPCSync {
411 inner: DiscordIPC,
412 rt: Runtime,
413}
414
415impl DiscordIPCSync {
416 pub fn new(client_id: &str) -> Result<Self> {
418 let rt = Builder::new_multi_thread().enable_all().build()?;
419 let inner = DiscordIPC::new(client_id);
420
421 Ok(Self { inner, rt })
422 }
423
424 pub fn on_ready<F: Fn(ReadyData) + Send + Sync + 'static>(mut self, f: F) -> Self {
426 self.inner.on_ready = Some(Box::new(f));
427 self
428 }
429
430 pub fn client_id(&self) -> String {
432 self.inner.client_id()
433 }
434
435 pub fn run(&mut self, wait_for_ready: bool) -> Result<()> {
438 self.rt.block_on(self.inner.run(wait_for_ready))
439 }
440
441 pub fn wait(&mut self) -> Result<()> {
444 self.rt.block_on(self.inner.wait())
445 }
446
447 pub fn is_running(&self) -> bool {
449 self.inner.is_running()
450 }
451
452 pub fn set_activity(&self, activity: Activity) -> Result<()> {
455 self.rt.block_on(self.inner.set_activity(activity))
456 }
457
458 pub fn clear_activity(&self) -> Result<()> {
460 self.rt.block_on(self.inner.clear_activity())
461 }
462
463 pub fn close(&mut self) -> Result<()> {
465 self.rt.block_on(self.inner.close())
466 }
467}
468
469impl Drop for DiscordIPCSync {
470 fn drop(&mut self) {
471 let _ = self.rt.block_on(self.inner.close());
472 }
473}