1use std::{
4 sync::{
5 Arc,
6 atomic::{AtomicBool, Ordering},
7 },
8 time::{Duration, SystemTime, UNIX_EPOCH},
9};
10
11use anyhow::{Context, Result, bail};
12use serde::Deserialize;
13use serde_json::json;
14use tokio::{
15 runtime::{Builder, Runtime},
16 sync::mpsc::{self, Sender},
17 task::JoinHandle,
18 time::sleep,
19};
20use uuid::Uuid;
21
22use crate::socket::DiscordIPCSocket;
23
24fn get_current_timestamp() -> Result<u64> {
31 Ok(SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs())
32}
33
/// Builds the 8-byte frame header: `opcode` followed by `data_len`, each
/// encoded as a little-endian `u32`.
fn pack(opcode: u32, data_len: u32) -> Vec<u8> {
    [opcode.to_le_bytes(), data_len.to_le_bytes()].concat()
}
40
/// Commands sent from client handles to the background connection task.
#[derive(Debug)]
enum IPCCommand {
    /// Publish a new activity made of two free-form text lines.
    SetActivity {
        /// Value sent as the activity's `details` field.
        details: String,
        /// Value sent as the activity's `state` field.
        state: String,
    },
}
51
52#[derive(Debug, Deserialize)]
53struct RpcFrame {
54 cmd: Option<String>,
55 evt: Option<String>,
56 data: Option<serde_json::Value>,
57}
58
59#[derive(Debug, Clone)]
61pub struct DiscordIPC {
62 tx: Sender<IPCCommand>,
63 client_id: String,
64 start_timestamp: u64,
65 running: Arc<AtomicBool>,
66}
67
68impl DiscordIPC {
69 pub async fn new(client_id: &str) -> Result<Self> {
70 let (tx, _rx) = mpsc::channel(32);
71
72 Ok(Self {
73 tx,
74 client_id: client_id.to_string(),
75 start_timestamp: get_current_timestamp()?,
76 running: Arc::new(AtomicBool::new(false)),
77 })
78 }
79
80 pub fn client_id(&self) -> String {
82 self.client_id.clone()
83 }
84
85 pub fn duration_since(&self) -> Result<Duration> {
87 let then = UNIX_EPOCH + Duration::from_secs(self.start_timestamp);
88 let now = SystemTime::now();
89
90 let duration_since = now
91 .duration_since(then)
92 .context("Time went backwards for the IPC client.")?;
93
94 Ok(duration_since)
95 }
96
97 pub async fn run(&mut self) -> Result<JoinHandle<Result<()>>> {
100 if self.running.swap(true, Ordering::SeqCst) {
101 bail!("Cannot run multiple instances of .run() for DiscordIPC.")
102 }
103
104 let (tx, mut rx) = mpsc::channel::<IPCCommand>(32);
105 self.tx = tx;
106 let client_id = self.client_id.clone();
107 let timestamp = self.start_timestamp;
108
109 let handle = tokio::spawn(async move {
110 let mut backoff = 1;
111 let mut last_activity: Option<(String, String)> = None;
112
113 loop {
114 let mut socket = match DiscordIPCSocket::new().await {
116 Ok(s) => s,
117 Err(_) => {
118 sleep(Duration::from_secs(backoff)).await;
119 continue;
120 }
121 };
122
123 let handshake = json!({ "v": 1, "client_id": client_id }).to_string();
125 let packed = pack(0, handshake.len() as u32);
126
127 if socket.write(&packed).await.is_err()
128 || socket.write(handshake.as_bytes()).await.is_err()
129 {
130 sleep(Duration::from_secs(backoff)).await;
131 continue;
132 }
133
134 loop {
136 let frame = match socket.read_frame().await {
137 Ok(f) => f,
138 Err(_) => break,
139 };
140
141 if frame.opcode != 1 {
142 continue;
143 }
144
145 if let Ok(json) = serde_json::from_slice::<RpcFrame>(&frame.body) {
146 if json.cmd.as_deref() == Some("DISPATCH")
147 && json.evt.as_deref() == Some("READY")
148 {
149 break;
150 }
151 if json.evt.as_deref() == Some("ERROR") {
152 eprintln!("Discord RPC error: {:?}", json.data);
153 }
154 }
155 }
156
157 if let Some((details, state)) = &last_activity {
158 let _ = send_activity(&mut socket, details, state, timestamp).await;
159 }
160
161 backoff = 1;
162
163 loop {
164 tokio::select! {
165 Some(cmd) = rx.recv() => {
166 match cmd {
167 IPCCommand::SetActivity { details, state } => {
168 last_activity = Some((details.clone(), state.clone()));
169
170 if send_activity(&mut socket, &details, &state, timestamp).await.is_err() {
171 break;
172 }
173 }
174 }
175 }
176
177 frame = socket.read_frame() => {
178 match frame {
179 Ok(frame) => match frame.opcode {
180 1 => {
181 if let Ok(json) = serde_json::from_slice::<RpcFrame>(&frame.body) {
182 if json.evt.as_deref() == Some("ERROR") {
183 eprintln!("Discord RPC error: {:?}", json.data);
184 }
185 }
186 }
187 2 => break,
188 3 => {
189 let packed = pack(3, frame.body.len() as u32);
190 socket.write(&packed).await?;
191 socket.write(&frame.body).await?;
192 }
193 _ => {}
194 },
195 Err(_) => break,
196 }
197 }
198 }
199 }
200
201 sleep(Duration::from_secs(backoff)).await;
202 backoff = (backoff * 2).min(4);
203 }
204 });
205
206 Ok(handle)
207 }
208
209 pub async fn set_activity(&self, details: &str, state: &str) -> Result<()> {
212 self.tx
213 .send(IPCCommand::SetActivity {
214 details: details.to_string(),
215 state: state.to_string(),
216 })
217 .await?;
218 Ok(())
219 }
220}
221
222async fn send_activity(
223 socket: &mut DiscordIPCSocket,
224 details: &str,
225 state: &str,
226 timestamp: u64,
227) -> Result<()> {
228 let json = json!({
229 "cmd": "SET_ACTIVITY",
230 "args": {
231 "pid": std::process::id(),
232 "activity": {
233 "details": details,
234 "state": state,
235 "timestamps": { "start": timestamp }
236 }
237 },
238 "nonce": Uuid::new_v4().to_string()
239 })
240 .to_string();
241
242 let packed = pack(1, json.len() as u32);
243 socket.write(&packed).await?;
244 socket.write(json.as_bytes()).await?;
245 Ok(())
246}
247
248#[derive(Debug)]
250pub struct DiscordIPCSync {
251 inner: DiscordIPC,
252 rt: Runtime,
253 ipc_task: Option<JoinHandle<Result<()>>>,
254}
255
256impl DiscordIPCSync {
257 pub fn new(client_id: &str) -> Result<Self> {
258 let rt = Builder::new_multi_thread().enable_all().build()?;
259 let inner = rt.block_on(DiscordIPC::new(client_id))?;
260 Ok(Self {
261 inner,
262 rt,
263 ipc_task: None,
264 })
265 }
266
267 pub fn client_id(&self) -> String {
268 self.inner.client_id()
269 }
270
271 pub fn duration_since(&self) -> Result<Duration> {
272 self.inner.duration_since()
273 }
274
275 pub fn run(&mut self) -> Result<()> {
276 let handle = self.rt.block_on(self.inner.run())?;
277 self.ipc_task = Some(handle);
278 Ok(())
279 }
280
281 pub fn wait(&mut self) -> Result<()> {
282 if let Some(handle) = self.ipc_task.take() {
283 self.rt.block_on(handle)??;
284 }
285 Ok(())
286 }
287
288 pub fn set_activity(&self, details: &str, state: &str) -> Result<()> {
289 self.rt.block_on(self.inner.set_activity(details, state))
290 }
291}