1use std::{
4 sync::{
5 Arc,
6 atomic::{AtomicBool, Ordering},
7 },
8 time::{Duration, SystemTime, UNIX_EPOCH},
9};
10
11use anyhow::{Result, bail};
12use serde::Deserialize;
13use serde_json::json;
14use tokio::{
15 runtime::{Builder, Runtime},
16 sync::mpsc::{self, Sender},
17 task::JoinHandle,
18 time::sleep,
19};
20use uuid::Uuid;
21
22use crate::socket::DiscordIPCSocket;
23
24fn get_current_timestamp() -> Result<u64> {
31 Ok(SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs())
32}
33
34fn pack(opcode: u32, data_len: u32) -> Vec<u8> {
35 let mut bytes = Vec::with_capacity(8);
36 bytes.extend_from_slice(&opcode.to_le_bytes());
37 bytes.extend_from_slice(&data_len.to_le_bytes());
38 bytes
39}
40
41#[derive(Debug)]
48enum IPCCommand {
49 SetActivity { details: String, state: String },
50 Close,
51}
52
53#[derive(Debug, Deserialize)]
54struct RpcFrame {
55 cmd: Option<String>,
56 evt: Option<String>,
57 data: Option<serde_json::Value>,
58}
59
60#[derive(Debug, Clone)]
62pub struct DiscordIPC {
63 tx: Sender<IPCCommand>,
64 client_id: String,
65 running: Arc<AtomicBool>,
66}
67
68impl DiscordIPC {
69 pub async fn new(client_id: &str) -> Result<Self> {
70 let (tx, _rx) = mpsc::channel(32);
71
72 Ok(Self {
73 tx,
74 client_id: client_id.to_string(),
75 running: Arc::new(AtomicBool::new(false)),
76 })
77 }
78
79 pub fn client_id(&self) -> String {
81 self.client_id.clone()
82 }
83
84 pub async fn run(&mut self) -> Result<JoinHandle<Result<()>>> {
88 if self.running.swap(true, Ordering::SeqCst) {
89 bail!(
90 "Cannot run multiple instances of .run() for DiscordIPC, or when a session is still closing."
91 )
92 }
93
94 let (tx, mut rx) = mpsc::channel::<IPCCommand>(32);
95 self.tx = tx;
96 let client_id = self.client_id.clone();
97
98 let running = self.running.clone();
99
100 let handle = tokio::spawn(async move {
101 let mut backoff = 1;
102 let mut last_activity: Option<(String, String)> = None;
103 let timestamp = get_current_timestamp()?;
104
105 while running.load(Ordering::SeqCst) {
106 let mut socket = match DiscordIPCSocket::new().await {
108 Ok(s) => s,
109 Err(_) => {
110 sleep(Duration::from_secs(backoff)).await;
111 continue;
112 }
113 };
114
115 let handshake = json!({ "v": 1, "client_id": client_id }).to_string();
117 let packed = pack(0, handshake.len() as u32);
118
119 if socket.write(&packed).await.is_err()
120 || socket.write(handshake.as_bytes()).await.is_err()
121 {
122 sleep(Duration::from_secs(backoff)).await;
123 continue;
124 }
125
126 loop {
128 let frame = match socket.read_frame().await {
129 Ok(f) => f,
130 Err(_) => break,
131 };
132
133 if frame.opcode != 1 {
134 continue;
135 }
136
137 if let Ok(json) = serde_json::from_slice::<RpcFrame>(&frame.body) {
138 if json.cmd.as_deref() == Some("DISPATCH")
139 && json.evt.as_deref() == Some("READY")
140 {
141 break;
142 }
143 if json.evt.as_deref() == Some("ERROR") {
144 eprintln!("Discord RPC error: {:?}", json.data);
145 }
146 }
147 }
148
149 if let Some((details, state)) = &last_activity {
151 let _ = send_activity(&mut socket, details, state, timestamp).await;
152 }
153
154 backoff = 1;
155
156 loop {
157 tokio::select! {
158 Some(cmd) = rx.recv() => {
159 match cmd {
160 IPCCommand::SetActivity { details, state } => {
161 last_activity = Some((details.clone(), state.clone()));
162
163 if send_activity(&mut socket, &details, &state, timestamp).await.is_err() {
164 break;
165 }
166 },
167 IPCCommand::Close => {
168 socket.close().await?;
169 running.store(false, Ordering::SeqCst);
170 return Ok(());
171 }
172 }
173 }
174
175 frame = socket.read_frame() => {
176 match frame {
177 Ok(frame) => match frame.opcode {
178 1 => {
179 if let Ok(json) = serde_json::from_slice::<RpcFrame>(&frame.body) {
180 if json.evt.as_deref() == Some("ERROR") {
181 eprintln!("Discord RPC error: {:?}", json.data);
182 }
183 }
184 }
185 2 => break,
186 3 => {
187 let packed = pack(3, frame.body.len() as u32);
188 socket.write(&packed).await?;
189 socket.write(&frame.body).await?;
190 }
191 _ => {}
192 },
193 Err(_) => break,
194 }
195 }
196 }
197 }
198
199 sleep(Duration::from_secs(backoff)).await;
200 backoff = (backoff * 2).min(4);
201 }
202
203 Ok(())
204 });
205
206 Ok(handle)
207 }
208
209 pub async fn set_activity(&self, details: &str, state: &str) -> Result<()> {
212 if !self.running.load(Ordering::SeqCst) {
213 bail!("Call .run() before .set_activity() execution.");
214 }
215
216 self.tx
217 .send(IPCCommand::SetActivity {
218 details: details.to_string(),
219 state: state.to_string(),
220 })
221 .await?;
222 Ok(())
223 }
224
225 pub async fn close(&self) -> Result<()> {
227 self.tx.send(IPCCommand::Close).await?;
228 Ok(())
229 }
230}
231
232async fn send_activity(
233 socket: &mut DiscordIPCSocket,
234 details: &str,
235 state: &str,
236 timestamp: u64,
237) -> Result<()> {
238 let json = json!({
239 "cmd": "SET_ACTIVITY",
240 "args": {
241 "pid": std::process::id(),
242 "activity": {
243 "details": details,
244 "state": state,
245 "timestamps": { "start": timestamp }
246 }
247 },
248 "nonce": Uuid::new_v4().to_string()
249 })
250 .to_string();
251
252 let packed = pack(1, json.len() as u32);
253 socket.write(&packed).await?;
254 socket.write(json.as_bytes()).await?;
255 Ok(())
256}
257
258#[derive(Debug)]
260pub struct DiscordIPCSync {
261 inner: DiscordIPC,
262 rt: Runtime,
263 ipc_task: Option<JoinHandle<Result<()>>>,
264}
265
266impl DiscordIPCSync {
267 pub fn new(client_id: &str) -> Result<Self> {
268 let rt = Builder::new_multi_thread().enable_all().build()?;
269 let inner = rt.block_on(DiscordIPC::new(client_id))?;
270 Ok(Self {
271 inner,
272 rt,
273 ipc_task: None,
274 })
275 }
276
277 pub fn client_id(&self) -> String {
278 self.inner.client_id()
279 }
280
281 pub fn run(&mut self) -> Result<()> {
282 let handle = self.rt.block_on(self.inner.run())?;
283 self.ipc_task = Some(handle);
284 Ok(())
285 }
286
287 pub fn wait(&mut self) -> Result<()> {
288 if let Some(handle) = self.ipc_task.take() {
289 self.rt.block_on(handle)??;
290 }
291 Ok(())
292 }
293
294 pub fn set_activity(&self, details: &str, state: &str) -> Result<()> {
295 self.rt.block_on(self.inner.set_activity(details, state))
296 }
297
298 pub fn close(&self) -> Result<()> {
299 self.rt.block_on(self.inner.close())
300 }
301}
302
303impl Drop for DiscordIPCSync {
304 fn drop(&mut self) {
305 if let Some(handle) = self.ipc_task.take() {
306 let _ = self.rt.block_on(self.inner.close());
307 let _ = self.rt.block_on(handle);
308 }
309 }
310}