1use std::time::{Duration, SystemTime, UNIX_EPOCH};
4
5use anyhow::{Result, bail};
6use serde::Deserialize;
7use serde_json::json;
8use tokio::{
9 runtime::{Builder, Runtime},
10 sync::mpsc::{self, Sender},
11 task::JoinHandle,
12 time::sleep,
13};
14use uuid::Uuid;
15
16use crate::socket::DiscordIPCSocket;
17
18fn get_current_timestamp() -> Result<u64> {
25 Ok(SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs())
26}
27
/// Builds the 8-byte frame header that precedes every IPC payload.
///
/// The header is `opcode` followed by `data_len`, each encoded as a
/// little-endian `u32`.
fn pack(opcode: u32, data_len: u32) -> Vec<u8> {
    [opcode.to_le_bytes(), data_len.to_le_bytes()].concat()
}
34
/// Commands sent from the public handle to the background connection task.
#[derive(Debug)]
enum IPCCommand {
    /// Publish a new activity.
    SetActivity {
        /// Text forwarded as the activity's `details` field.
        details: String,
        /// Text forwarded as the activity's `state` field.
        state: String,
    },
}
45
46#[derive(Debug, Deserialize)]
47struct RpcFrame {
48 cmd: Option<String>,
49 evt: Option<String>,
50 data: Option<serde_json::Value>,
51}
52
53#[derive(Debug)]
55pub struct DiscordIPC {
56 tx: Sender<IPCCommand>,
57 client_id: String,
58 start_timestamp: u64,
59 handle: Option<JoinHandle<Result<()>>>,
60}
61
62impl DiscordIPC {
63 pub async fn new(client_id: &str) -> Result<Self> {
64 let (tx, _rx) = mpsc::channel(32);
65
66 Ok(Self {
67 tx,
68 client_id: client_id.to_string(),
69 start_timestamp: get_current_timestamp()?,
70 handle: None,
71 })
72 }
73
74 pub fn client_id(&self) -> String {
76 self.client_id.clone()
77 }
78
79 pub async fn run(&mut self) -> Result<()> {
82 if self.handle.is_some() {
83 bail!("Cannot run multiple instances of .run() for DiscordIPC.")
84 }
85
86 let (tx, mut rx) = mpsc::channel::<IPCCommand>(32);
87 self.tx = tx;
88 let client_id = self.client_id.clone();
89 let timestamp = self.start_timestamp;
90
91 let handle = tokio::spawn(async move {
92 let mut backoff = 1;
93 let mut last_activity: Option<(String, String)> = None;
94
95 loop {
96 let mut socket = match DiscordIPCSocket::new().await {
98 Ok(s) => s,
99 Err(_) => {
100 sleep(Duration::from_secs(backoff)).await;
101 continue;
102 }
103 };
104
105 let handshake = json!({ "v": 1, "client_id": client_id }).to_string();
107 let packed = pack(0, handshake.len() as u32);
108
109 if socket.write(&packed).await.is_err()
110 || socket.write(handshake.as_bytes()).await.is_err()
111 {
112 sleep(Duration::from_secs(backoff)).await;
113 continue;
114 }
115
116 loop {
118 let frame = match socket.read_frame().await {
119 Ok(f) => f,
120 Err(_) => break,
121 };
122
123 if frame.opcode != 1 {
124 continue;
125 }
126
127 if let Ok(json) = serde_json::from_slice::<RpcFrame>(&frame.body) {
128 if json.cmd.as_deref() == Some("DISPATCH")
129 && json.evt.as_deref() == Some("READY")
130 {
131 break;
132 }
133 if json.evt.as_deref() == Some("ERROR") {
134 eprintln!("Discord RPC error: {:?}", json.data);
135 }
136 }
137 }
138
139 if let Some((details, state)) = &last_activity {
140 let _ = send_activity(&mut socket, details, state, timestamp).await;
141 }
142
143 backoff = 1;
144
145 loop {
146 tokio::select! {
147 Some(cmd) = rx.recv() => {
148 match cmd {
149 IPCCommand::SetActivity { details, state } => {
150 last_activity = Some((details.clone(), state.clone()));
151
152 if send_activity(&mut socket, &details, &state, timestamp).await.is_err() {
153 break;
154 }
155 }
156 }
157 }
158
159 frame = socket.read_frame() => {
160 match frame {
161 Ok(frame) => match frame.opcode {
162 1 => {
163 if let Ok(json) = serde_json::from_slice::<RpcFrame>(&frame.body) {
164 if json.evt.as_deref() == Some("ERROR") {
165 eprintln!("Discord RPC error: {:?}", json.data);
166 }
167 }
168 }
169 2 => break,
170 3 => {
171 let packed = pack(3, frame.body.len() as u32);
172 socket.write(&packed).await?;
173 socket.write(&frame.body).await?;
174 }
175 _ => {}
176 },
177 Err(_) => break,
178 }
179 }
180 }
181 }
182
183 sleep(Duration::from_secs(backoff)).await;
184 backoff = (backoff * 2).min(4);
185 }
186 });
187
188 self.handle = Some(handle);
189 Ok(())
190 }
191
192 pub async fn wait(&mut self) -> Result<()> {
194 if let Some(handle) = self.handle.take() {
195 handle.await??;
196 }
197 Ok(())
198 }
199
200 pub async fn set_activity(&self, details: &str, state: &str) -> Result<()> {
203 self.tx
204 .send(IPCCommand::SetActivity {
205 details: details.to_string(),
206 state: state.to_string(),
207 })
208 .await?;
209 Ok(())
210 }
211}
212
213async fn send_activity(
214 socket: &mut DiscordIPCSocket,
215 details: &str,
216 state: &str,
217 timestamp: u64,
218) -> Result<()> {
219 let json = json!({
220 "cmd": "SET_ACTIVITY",
221 "args": {
222 "pid": std::process::id(),
223 "activity": {
224 "details": details,
225 "state": state,
226 "timestamps": { "start": timestamp }
227 }
228 },
229 "nonce": Uuid::new_v4().to_string()
230 })
231 .to_string();
232
233 let packed = pack(1, json.len() as u32);
234 socket.write(&packed).await?;
235 socket.write(json.as_bytes()).await?;
236 Ok(())
237}
238
239#[derive(Debug)]
241pub struct DiscordIPCSync {
242 inner: DiscordIPC,
243 rt: Runtime,
244}
245
246impl DiscordIPCSync {
247 pub fn new(client_id: &str) -> Result<Self> {
248 let rt = Builder::new_multi_thread().enable_all().build()?;
249 let inner = rt.block_on(DiscordIPC::new(client_id))?;
250 Ok(Self { inner, rt })
251 }
252
253 pub fn client_id(&self) -> String {
254 self.inner.client_id()
255 }
256
257 pub fn run(&mut self) -> Result<()> {
258 self.rt.block_on(self.inner.run())?;
259 Ok(())
260 }
261
262 pub fn wait(&mut self) -> Result<()> {
263 self.rt.block_on(async { self.inner.wait().await })?;
264 Ok(())
265 }
266
267 pub fn set_activity(&self, details: &str, state: &str) -> Result<()> {
268 self.rt.block_on(self.inner.set_activity(details, state))
269 }
270}