1use std::{
4 sync::{
5 Arc,
6 atomic::{AtomicBool, Ordering},
7 },
8 time::{Duration, SystemTime, UNIX_EPOCH},
9};
10
11use anyhow::{Result, bail};
12use serde::Deserialize;
13use serde_json::json;
14use tokio::{
15 runtime::{Builder, Runtime},
16 sync::mpsc::{self, Sender},
17 task::JoinHandle,
18 time::{sleep, timeout},
19};
20use uuid::Uuid;
21
22use crate::socket::DiscordIPCSocket;
23
24fn get_current_timestamp() -> Result<u64> {
31 Ok(SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs())
32}
33
/// Builds the 8-byte frame header: `opcode` followed by `data_len`, each
/// encoded as a little-endian `u32`.
fn pack(opcode: u32, data_len: u32) -> Vec<u8> {
    [opcode.to_le_bytes(), data_len.to_le_bytes()].concat()
}
40
/// Commands sent from the public handle to the background connection task.
#[derive(Debug)]
enum IPCCommand {
    /// Publish an activity with the given `details` and `state` lines.
    SetActivity {
        details: String,
        state: String,
    },
    /// Remove the currently published activity.
    ClearActivity,
    /// Clear the activity, close the socket and stop the background task.
    Close,
}
53
54#[derive(Debug, Deserialize)]
55struct RpcFrame {
56 cmd: Option<String>,
57 evt: Option<String>,
58 data: Option<serde_json::Value>,
59}
60
61#[derive(Debug)]
63pub struct DiscordIPC {
64 tx: Sender<IPCCommand>,
65 client_id: String,
66 handle: Option<JoinHandle<Result<()>>>,
67 running: Arc<AtomicBool>,
68}
69
70impl DiscordIPC {
71 #[must_use]
73 pub fn new(client_id: &str) -> Self {
74 let (tx, _rx) = mpsc::channel(32);
75
76 Self {
77 tx,
78 client_id: client_id.to_string(),
79 handle: None,
80 running: Arc::new(AtomicBool::new(false)),
81 }
82 }
83
84 #[must_use]
86 pub fn client_id(&self) -> String {
87 self.client_id.clone()
88 }
89
90 pub fn is_running(&self) -> bool {
93 self.running.load(Ordering::SeqCst) && self.handle.is_some()
94 }
95
96 pub async fn run(&mut self) -> Result<()> {
98 if self.handle.is_some() && self.running.swap(true, Ordering::SeqCst) {
99 bail!("Cannot run multiple instances of .run().");
100 }
101
102 let running = self.running.clone();
103
104 let (tx, mut rx) = mpsc::channel::<IPCCommand>(32);
105 self.tx = tx;
106 let client_id = self.client_id.clone();
107
108 let handle = tokio::spawn(async move {
109 let mut backoff = 1;
110 let mut last_activity: Option<(String, String)> = None;
111 let timestamp = get_current_timestamp()?;
112
113 while running.load(Ordering::SeqCst) {
114 let mut socket = match DiscordIPCSocket::new().await {
116 Ok(s) => s,
117 Err(_) => {
118 sleep(Duration::from_secs(backoff)).await;
119 continue;
120 }
121 };
122
123 let handshake = json!({ "v": 1, "client_id": client_id }).to_string();
125 let packed = pack(0, handshake.len() as u32);
126
127 if socket.write(&packed).await.is_err()
128 || socket.write(handshake.as_bytes()).await.is_err()
129 {
130 sleep(Duration::from_secs(backoff)).await;
131 continue;
132 }
133
134 loop {
136 let frame = match socket.read_frame().await {
137 Ok(f) => f,
138 Err(_) => break,
139 };
140
141 if frame.opcode != 1 {
142 continue;
143 }
144
145 if let Ok(json) = serde_json::from_slice::<RpcFrame>(&frame.body) {
146 if json.cmd.as_deref() == Some("DISPATCH")
147 && json.evt.as_deref() == Some("READY")
148 {
149 break;
150 }
151 if json.evt.as_deref() == Some("ERROR") {
152 eprintln!("Discord RPC error: {:?}", json.data);
153 }
154 }
155 }
156
157 if let Some((details, state)) = &last_activity {
159 let _ = send_activity(&mut socket, details, state, timestamp).await;
160 }
161
162 backoff = 1;
163
164 loop {
165 tokio::select! {
166 Some(cmd) = rx.recv() => {
167 match cmd {
168 IPCCommand::SetActivity { details, state } => {
169 last_activity = Some((details.clone(), state.clone()));
170
171 if send_activity(&mut socket, &details, &state, timestamp).await.is_err() {
172 break;
173 }
174 },
175 IPCCommand::ClearActivity => {
176 if clear_activity(&mut socket).await.is_err() {
177 break;
178 }
179 }
180 IPCCommand::Close => {
181 clear_activity(&mut socket).await?;
182 socket.close().await?;
183 running.store(false, Ordering::SeqCst);
184 return Ok(());
185 }
186 }
187 }
188
189 frame = socket.read_frame() => {
190 match frame {
191 Ok(frame) => match frame.opcode {
192 1 => {
193 if let Ok(json) = serde_json::from_slice::<RpcFrame>(&frame.body) {
194 if json.evt.as_deref() == Some("ERROR") {
195 eprintln!("Discord RPC error: {:?}", json.data);
196 }
197 }
198 }
199 2 => break,
200 3 => {
201 let packed = pack(3, frame.body.len() as u32);
202 if socket.write(&packed).await.is_err() { break; }
203 if socket.write(&frame.body).await.is_err() { break; }
204 }
205 _ => {}
206 },
207 Err(_) => break,
208 }
209 }
210 }
211 }
212
213 sleep(Duration::from_secs(backoff)).await;
214 backoff = (backoff * 2).min(4);
215 }
216
217 Ok(())
218 });
219
220 self.handle = Some(handle);
221
222 Ok(())
223 }
224
225 pub async fn wait(&mut self) -> Result<()> {
228 if let Some(handle) = self.handle.take() {
229 handle.await??;
230 }
231 Ok(())
232 }
233
234 pub async fn set_activity(&self, details: &str, state: &str) -> Result<()> {
236 if !self.is_running() {
237 bail!("Call .run() before .set_activity().");
238 }
239
240 self.tx
241 .send(IPCCommand::SetActivity {
242 details: details.to_string(),
243 state: state.to_string(),
244 })
245 .await?;
246
247 Ok(())
248 }
249
250 pub async fn clear_activity(&self) -> Result<()> {
253 if self.is_running() {
254 self.tx.send(IPCCommand::ClearActivity).await?;
255 }
256
257 Ok(())
258 }
259
260 const CLOSE_COOLDOWN_MS: u64 = 200;
261
262 pub async fn close(&mut self) -> Result<()> {
268 if !self.running.swap(false, Ordering::SeqCst) {
269 return Ok(());
270 }
271
272 self.tx.send(IPCCommand::Close).await?;
273
274 if let Some(handle) = self.handle.take() {
275 match timeout(Duration::from_secs(2), handle).await {
276 Ok(result) => result??,
277 Err(_) => eprintln!("DiscordIPC close() timed out."),
278 }
279 }
280
281 sleep(Duration::from_millis(Self::CLOSE_COOLDOWN_MS)).await;
282
283 Ok(())
284 }
285}
286
287async fn send_activity(
294 socket: &mut DiscordIPCSocket,
295 details: &str,
296 state: &str,
297 timestamp: u64,
298) -> Result<()> {
299 let json = json!({
300 "cmd": "SET_ACTIVITY",
301 "args": {
302 "pid": std::process::id(),
303 "activity": {
304 "details": details,
305 "state": state,
306 "timestamps": { "start": timestamp }
307 }
308 },
309 "nonce": Uuid::new_v4().to_string()
310 })
311 .to_string();
312
313 let packed = pack(1, json.len() as u32);
314 socket.write(&packed).await?;
315 socket.write(json.as_bytes()).await?;
316 Ok(())
317}
318
319async fn clear_activity(socket: &mut DiscordIPCSocket) -> Result<()> {
320 let json = json!({
321 "cmd": "SET_ACTIVITY",
322 "args": {
323 "pid": std::process::id(),
324 "activity": null
325 },
326 "nonce": Uuid::new_v4().to_string()
327 })
328 .to_string();
329
330 let packed = pack(1, json.len() as u32);
331 socket.write(&packed).await?;
332 socket.write(json.as_bytes()).await?;
333 Ok(())
334}
335
336#[derive(Debug)]
345pub struct DiscordIPCSync {
346 inner: DiscordIPC,
347 rt: Runtime,
348}
349
350impl DiscordIPCSync {
351 pub fn new(client_id: &str) -> Result<Self> {
353 let rt = Builder::new_multi_thread().enable_all().build()?;
354 let inner = DiscordIPC::new(client_id);
355 Ok(Self { inner, rt })
356 }
357
358 pub fn client_id(&self) -> String {
360 self.inner.client_id()
361 }
362
363 pub fn run(&mut self) -> Result<()> {
365 self.rt.block_on(self.inner.run())
366 }
367
368 pub fn wait(&mut self) -> Result<()> {
370 self.rt.block_on(self.inner.wait())
371 }
372
373 pub fn set_activity(&self, details: &str, state: &str) -> Result<()> {
376 self.rt.block_on(self.inner.set_activity(details, state))
377 }
378
379 pub fn clear_activity(&self) -> Result<()> {
381 self.rt.block_on(self.inner.clear_activity())
382 }
383
384 pub fn close(&mut self) -> Result<()> {
387 self.rt.block_on(self.inner.close())
388 }
389}