1use std::{
4 sync::{
5 Arc,
6 atomic::{AtomicBool, Ordering},
7 },
8 time::{Duration, SystemTime, UNIX_EPOCH},
9};
10
11use anyhow::{Result, bail};
12use serde::Deserialize;
13use serde_json::json;
14use tokio::{
15 runtime::{Builder, Runtime},
16 sync::mpsc::{self, Sender},
17 task::JoinHandle,
18 time::sleep,
19};
20use uuid::Uuid;
21
22use crate::socket::DiscordIPCSocket;
23
24fn get_current_timestamp() -> Result<u64> {
31 Ok(SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs())
32}
33
/// Builds the 8-byte frame header that precedes every IPC payload.
///
/// The header is `opcode` followed by `data_len`, each encoded as a
/// little-endian `u32`.
fn pack(opcode: u32, data_len: u32) -> Vec<u8> {
    [opcode.to_le_bytes(), data_len.to_le_bytes()].concat()
}
40
/// Messages sent from the public client handle to the background IPC task.
#[derive(Debug)]
enum IPCCommand {
    /// Publish a presence made of the two given text lines.
    SetActivity {
        /// Value placed in the activity's `details` field.
        details: String,
        /// Value placed in the activity's `state` field.
        state: String,
    },
    /// Remove the currently published presence.
    ClearActivity,
    /// Close the socket and stop the background task.
    Close,
}
53
54#[derive(Debug, Deserialize)]
55struct RpcFrame {
56 cmd: Option<String>,
57 evt: Option<String>,
58 data: Option<serde_json::Value>,
59}
60
61#[derive(Debug)]
69pub struct DiscordIPC {
70 tx: Sender<IPCCommand>,
71 client_id: String,
72 running: Arc<AtomicBool>,
73 handle: Option<JoinHandle<Result<()>>>,
74}
75
76impl DiscordIPC {
77 pub fn new(client_id: &str) -> Self {
78 let (tx, _rx) = mpsc::channel(32);
79
80 Self {
81 tx,
82 client_id: client_id.to_string(),
83 running: Arc::new(AtomicBool::new(false)),
84 handle: None,
85 }
86 }
87
88 pub fn client_id(&self) -> String {
90 self.client_id.clone()
91 }
92
93 pub async fn run(&mut self) -> Result<()> {
97 if self.running.swap(true, Ordering::SeqCst) {
98 bail!(
99 "Cannot run multiple instances of .run() for DiscordIPC, or when a session is still closing."
100 )
101 }
102
103 let (tx, mut rx) = mpsc::channel::<IPCCommand>(32);
104 self.tx = tx;
105 let client_id = self.client_id.clone();
106
107 let running = self.running.clone();
108
109 let handle = tokio::spawn(async move {
110 let mut backoff = 1;
111 let mut last_activity: Option<(String, String)> = None;
112 let timestamp = get_current_timestamp()?;
113
114 while running.load(Ordering::SeqCst) {
115 let mut socket = match DiscordIPCSocket::new().await {
117 Ok(s) => s,
118 Err(_) => {
119 sleep(Duration::from_secs(backoff)).await;
120 continue;
121 }
122 };
123
124 let handshake = json!({ "v": 1, "client_id": client_id }).to_string();
126 let packed = pack(0, handshake.len() as u32);
127
128 if socket.write(&packed).await.is_err()
129 || socket.write(handshake.as_bytes()).await.is_err()
130 {
131 sleep(Duration::from_secs(backoff)).await;
132 continue;
133 }
134
135 loop {
137 let frame = match socket.read_frame().await {
138 Ok(f) => f,
139 Err(_) => break,
140 };
141
142 if frame.opcode != 1 {
143 continue;
144 }
145
146 if let Ok(json) = serde_json::from_slice::<RpcFrame>(&frame.body) {
147 if json.cmd.as_deref() == Some("DISPATCH")
148 && json.evt.as_deref() == Some("READY")
149 {
150 break;
151 }
152 if json.evt.as_deref() == Some("ERROR") {
153 eprintln!("Discord RPC error: {:?}", json.data);
154 }
155 }
156 }
157
158 if let Some((details, state)) = &last_activity {
160 let _ = send_activity(&mut socket, details, state, timestamp).await;
161 }
162
163 backoff = 1;
164
165 loop {
166 tokio::select! {
167 Some(cmd) = rx.recv() => {
168 match cmd {
169 IPCCommand::SetActivity { details, state } => {
170 last_activity = Some((details.clone(), state.clone()));
171
172 if send_activity(&mut socket, &details, &state, timestamp).await.is_err() {
173 break;
174 }
175 },
176 IPCCommand::ClearActivity => {
177 if clear_activity(&mut socket).await.is_err() {
178 break;
179 }
180 },
181 IPCCommand::Close => {
182 socket.close().await?;
183 running.store(false, Ordering::SeqCst);
184 return Ok(());
185 }
186 }
187 }
188
189 frame = socket.read_frame() => {
190 match frame {
191 Ok(frame) => match frame.opcode {
192 1 => {
193 if let Ok(json) = serde_json::from_slice::<RpcFrame>(&frame.body) {
194 if json.evt.as_deref() == Some("ERROR") {
195 eprintln!("Discord RPC error: {:?}", json.data);
196 }
197 }
198 }
199 2 => break,
200 3 => {
201 let packed = pack(3, frame.body.len() as u32);
202 socket.write(&packed).await?;
203 socket.write(&frame.body).await?;
204 }
205 _ => {}
206 },
207 Err(_) => break,
208 }
209 }
210 }
211 }
212
213 sleep(Duration::from_secs(backoff)).await;
214 backoff = (backoff * 2).min(4);
215 }
216
217 Ok(())
218 });
219
220 self.handle = Some(handle);
221 Ok(())
222 }
223
224 pub async fn wait(&mut self) -> Result<()> {
226 if let Some(handle) = self.handle.take() {
227 handle.await??;
228 }
229
230 Ok(())
231 }
232
233 pub async fn set_activity(&self, details: &str, state: &str) -> Result<()> {
236 if !self.running.load(Ordering::SeqCst) {
237 bail!("Call .run() before .set_activity() execution.");
238 }
239
240 self.tx
241 .send(IPCCommand::SetActivity {
242 details: details.to_string(),
243 state: state.to_string(),
244 })
245 .await?;
246 Ok(())
247 }
248
249 pub async fn clear_activity(&self) -> Result<()> {
251 if !self.running.load(Ordering::SeqCst) {
252 return Ok(());
253 }
254
255 self.tx.send(IPCCommand::ClearActivity).await?;
256 Ok(())
257 }
258
259 const CLOSE_COOLDOWN_MILLIS: u64 = 200;
261
262 pub async fn close(&mut self) -> Result<()> {
264 if self.running.swap(false, Ordering::SeqCst) {
265 let _ = self.tx.send(IPCCommand::Close).await;
266 if let Some(handle) = self.handle.take() {
267 if let Err(e) = handle.await {
268 eprintln!("DiscordIPC background task failed on close: {e:?}");
269 }
270 }
271 }
272
273 tokio::time::sleep(Duration::from_millis(Self::CLOSE_COOLDOWN_MILLIS)).await;
274 Ok(())
275 }
276}
277
278async fn send_activity(
279 socket: &mut DiscordIPCSocket,
280 details: &str,
281 state: &str,
282 timestamp: u64,
283) -> Result<()> {
284 let json = json!({
285 "cmd": "SET_ACTIVITY",
286 "args": {
287 "pid": std::process::id(),
288 "activity": {
289 "details": details,
290 "state": state,
291 "timestamps": { "start": timestamp }
292 }
293 },
294 "nonce": Uuid::new_v4().to_string()
295 })
296 .to_string();
297
298 let packed = pack(1, json.len() as u32);
299 socket.write(&packed).await?;
300 socket.write(json.as_bytes()).await?;
301 Ok(())
302}
303
304async fn clear_activity(socket: &mut DiscordIPCSocket) -> Result<()> {
305 let json = json!({
306 "cmd": "SET_ACTIVITY",
307 "args": {
308 "pid": std::process::id(),
309 "activity": null
310 },
311 "nonce": Uuid::new_v4().to_string()
312 })
313 .to_string();
314
315 let packed = pack(1, json.len() as u32);
316 socket.write(&packed).await?;
317 socket.write(json.as_bytes()).await?;
318 Ok(())
319}
320
321#[derive(Debug)]
328pub struct DiscordIPCSync {
329 inner: DiscordIPC,
330 rt: Runtime,
331}
332
333impl DiscordIPCSync {
334 pub fn new(client_id: &str) -> Result<Self> {
335 let rt = Builder::new_multi_thread().enable_all().build()?;
336 let inner = DiscordIPC::new(client_id);
337
338 Ok(Self { inner, rt })
339 }
340
341 pub fn client_id(&self) -> String {
342 self.inner.client_id()
343 }
344
345 pub fn run(&mut self) -> Result<()> {
346 self.rt.block_on(self.inner.run())
347 }
348
349 pub fn wait(&mut self) -> Result<()> {
350 self.rt.block_on(self.inner.wait())
351 }
352
353 pub fn set_activity(&self, details: &str, state: &str) -> Result<()> {
354 self.rt.block_on(self.inner.set_activity(details, state))
355 }
356
357 pub fn close(&mut self) -> Result<()> {
358 self.rt.block_on(self.inner.close())
359 }
360}
361
362impl Drop for DiscordIPCSync {
363 fn drop(&mut self) {
364 let _ = self.rt.block_on(self.inner.close());
365 }
366}