1use std::{
4 sync::{
5 Arc,
6 atomic::{AtomicBool, Ordering},
7 },
8 time::{Duration, SystemTime, UNIX_EPOCH},
9};
10
11use anyhow::{Result, bail};
12use serde::Deserialize;
13use serde_json::json;
14use tokio::{
15 runtime::{Builder, Runtime},
16 sync::mpsc::{self, Sender},
17 task::JoinHandle,
18 time::{sleep, timeout},
19};
20use uuid::Uuid;
21
22use crate::socket::DiscordIPCSocket;
23
24fn get_current_timestamp() -> Result<u64> {
31 Ok(SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs())
32}
33
/// Builds the 8-byte frame header: `opcode` followed by `data_len`, each encoded as a
/// little-endian `u32`.
fn pack(opcode: u32, data_len: u32) -> Vec<u8> {
    [opcode.to_le_bytes(), data_len.to_le_bytes()].concat()
}
40
/// Commands sent over the mpsc channel from the public `DiscordIPC` methods to the
/// background connection task.
#[derive(Debug)]
enum IPCCommand {
    /// Publish an activity with the given `details` and `state` strings.
    SetActivity { details: String, state: String },
    /// Remove the currently published activity.
    ClearActivity,
    /// Clear the activity, close the socket and stop the background task.
    Close,
}
53
/// The fields of an opcode-1 frame body that this module inspects.
///
/// Every field is optional, so a payload that lacks any of them still deserializes.
#[derive(Debug, Deserialize)]
struct RpcFrame {
    /// Command name; compared against `"DISPATCH"` while waiting for the handshake to finish.
    cmd: Option<String>,
    /// Event name; compared against `"READY"` and `"ERROR"`.
    evt: Option<String>,
    /// Event payload; only printed when an `ERROR` event is received.
    data: Option<serde_json::Value>,
}
60
/// Asynchronous Discord IPC client that drives the connection from a spawned Tokio task.
#[derive(Debug)]
pub struct DiscordIPC {
    /// Sender half of the command channel; replaced with a fresh channel on every `run()`.
    tx: Sender<IPCCommand>,
    /// Application client id sent in the handshake payload.
    client_id: String,
    /// Join handle of the background task; `None` before `run()` and after `wait()` or
    /// `close()` has taken it.
    handle: Option<JoinHandle<Result<()>>>,
    /// Flag shared with the background task: set by `run()`, cleared by `close()` and by the
    /// task when it handles a `Close` command.
    running: Arc<AtomicBool>,
}
69
70impl DiscordIPC {
71 #[must_use]
73 pub fn new(client_id: &str) -> Self {
74 let (tx, _rx) = mpsc::channel(32);
75
76 Self {
77 tx,
78 client_id: client_id.to_string(),
79 handle: None,
80 running: Arc::new(AtomicBool::new(false)),
81 }
82 }
83
84 #[must_use]
86 pub fn client_id(&self) -> String {
87 self.client_id.clone()
88 }
89
90 pub fn is_running(&self) -> bool {
93 self.running.load(Ordering::SeqCst) && self.handle.is_some()
94 }
95
96 pub async fn run(&mut self) -> Result<()> {
98 if self.is_running() {
99 bail!("Cannot run multiple instances of .run().")
100 }
101
102 self.running.store(true, Ordering::SeqCst);
103 let running = self.running.clone();
104
105 let (tx, mut rx) = mpsc::channel::<IPCCommand>(32);
106 self.tx = tx;
107 let client_id = self.client_id.clone();
108
109 let handle = tokio::spawn(async move {
110 let mut backoff = 1;
111 let mut last_activity: Option<(String, String)> = None;
112 let timestamp = get_current_timestamp()?;
113
114 while running.load(Ordering::SeqCst) {
115 let mut socket = match DiscordIPCSocket::new().await {
117 Ok(s) => s,
118 Err(_) => {
119 sleep(Duration::from_secs(backoff)).await;
120 continue;
121 }
122 };
123
124 let handshake = json!({ "v": 1, "client_id": client_id }).to_string();
126 let packed = pack(0, handshake.len() as u32);
127
128 if socket.write(&packed).await.is_err()
129 || socket.write(handshake.as_bytes()).await.is_err()
130 {
131 sleep(Duration::from_secs(backoff)).await;
132 continue;
133 }
134
135 loop {
137 let frame = match socket.read_frame().await {
138 Ok(f) => f,
139 Err(_) => break,
140 };
141
142 if frame.opcode != 1 {
143 continue;
144 }
145
146 if let Ok(json) = serde_json::from_slice::<RpcFrame>(&frame.body) {
147 if json.cmd.as_deref() == Some("DISPATCH")
148 && json.evt.as_deref() == Some("READY")
149 {
150 break;
151 }
152 if json.evt.as_deref() == Some("ERROR") {
153 eprintln!("Discord RPC error: {:?}", json.data);
154 }
155 }
156 }
157
158 if let Some((details, state)) = &last_activity {
160 send_activity(&mut socket, details, state, timestamp).await?;
161 }
162
163 backoff = 1;
164
165 loop {
166 tokio::select! {
167 Some(cmd) = rx.recv() => {
168 match cmd {
169 IPCCommand::SetActivity { details, state } => {
170 last_activity = Some((details.clone(), state.clone()));
171
172 if send_activity(&mut socket, &details, &state, timestamp).await.is_err() {
173 break;
174 }
175 },
176 IPCCommand::ClearActivity => {
177 if clear_activity(&mut socket).await.is_err() {
178 break;
179 }
180 }
181 IPCCommand::Close => {
182 clear_activity(&mut socket).await?;
183 socket.close().await?;
184 running.store(false, Ordering::SeqCst);
185 return Ok(());
186 }
187 }
188 }
189
190 frame = socket.read_frame() => {
191 match frame {
192 Ok(frame) => match frame.opcode {
193 1 => {
194 if let Ok(json) = serde_json::from_slice::<RpcFrame>(&frame.body) {
195 if json.evt.as_deref() == Some("ERROR") {
196 eprintln!("Discord RPC error: {:?}", json.data);
197 }
198 }
199 }
200 2 => break,
201 3 => {
202 let packed = pack(3, frame.body.len() as u32);
203 socket.write(&packed).await?;
204 socket.write(&frame.body).await?;
205 }
206 _ => {}
207 },
208 Err(_) => break,
209 }
210 }
211 }
212 }
213
214 sleep(Duration::from_secs(backoff)).await;
215 backoff = (backoff * 2).min(4);
216 }
217
218 Ok(())
219 });
220
221 self.handle = Some(handle);
222
223 Ok(())
224 }
225
226 pub async fn wait(&mut self) -> Result<()> {
229 if let Some(handle) = self.handle.take() {
230 handle.await??;
231 }
232 Ok(())
233 }
234
235 pub async fn set_activity(&self, details: &str, state: &str) -> Result<()> {
237 if !self.is_running() {
238 bail!("Call .run() before .set_activity().");
239 }
240
241 self.tx
242 .send(IPCCommand::SetActivity {
243 details: details.to_string(),
244 state: state.to_string(),
245 })
246 .await?;
247
248 Ok(())
249 }
250
251 pub async fn clear_activity(&self) -> Result<()> {
254 if self.is_running() {
255 self.tx.send(IPCCommand::ClearActivity).await?;
256 }
257
258 Ok(())
259 }
260
261 const CLOSE_COOLDOWN_MS: u64 = 200;
262
263 pub async fn close(&mut self) -> Result<()> {
269 if !self.running.swap(false, Ordering::SeqCst) {
270 return Ok(());
271 }
272
273 self.tx.send(IPCCommand::Close).await?;
274
275 if let Some(handle) = self.handle.take() {
276 match timeout(Duration::from_secs(2), handle).await {
277 Ok(result) => result??,
278 Err(_) => eprintln!("DiscordIPC close() timed out."),
279 }
280 }
281
282 sleep(Duration::from_millis(Self::CLOSE_COOLDOWN_MS)).await;
283
284 Ok(())
285 }
286}
287
288async fn send_activity(
295 socket: &mut DiscordIPCSocket,
296 details: &str,
297 state: &str,
298 timestamp: u64,
299) -> Result<()> {
300 let json = json!({
301 "cmd": "SET_ACTIVITY",
302 "args": {
303 "pid": std::process::id(),
304 "activity": {
305 "details": details,
306 "state": state,
307 "timestamps": { "start": timestamp }
308 }
309 },
310 "nonce": Uuid::new_v4().to_string()
311 })
312 .to_string();
313
314 let packed = pack(1, json.len() as u32);
315 socket.write(&packed).await?;
316 socket.write(json.as_bytes()).await?;
317 Ok(())
318}
319
320async fn clear_activity(socket: &mut DiscordIPCSocket) -> Result<()> {
321 let json = json!({
322 "cmd": "SET_ACTIVITY",
323 "args": {
324 "pid": std::process::id(),
325 "activity": null
326 },
327 "nonce": Uuid::new_v4().to_string()
328 })
329 .to_string();
330
331 let packed = pack(1, json.len() as u32);
332 socket.write(&packed).await?;
333 socket.write(json.as_bytes()).await?;
334 Ok(())
335}
336
/// Blocking wrapper around [`DiscordIPC`] that owns the Tokio runtime used to drive it.
#[derive(Debug)]
pub struct DiscordIPCSync {
    /// The asynchronous client every call is forwarded to.
    inner: DiscordIPC,
    /// Runtime on which the forwarded futures are executed via `block_on`.
    rt: Runtime,
}
350
351impl DiscordIPCSync {
352 pub fn new(client_id: &str) -> Result<Self> {
354 let rt = Builder::new_multi_thread().enable_all().build()?;
355 let inner = DiscordIPC::new(client_id);
356 Ok(Self { inner, rt })
357 }
358
359 pub fn client_id(&self) -> String {
361 self.inner.client_id()
362 }
363
364 pub fn run(&mut self) -> Result<()> {
366 self.rt.block_on(self.inner.run())
367 }
368
369 pub fn wait(&mut self) -> Result<()> {
371 self.rt.block_on(self.inner.wait())
372 }
373
374 pub fn set_activity(&self, details: &str, state: &str) -> Result<()> {
377 self.rt.block_on(self.inner.set_activity(details, state))
378 }
379
380 pub fn clear_activity(&self) -> Result<()> {
382 self.rt.block_on(self.inner.clear_activity())
383 }
384
385 pub fn close(&mut self) -> Result<()> {
388 self.rt.block_on(self.inner.close())
389 }
390}