1use std::{
4 sync::{
5 Arc,
6 atomic::{AtomicBool, Ordering},
7 },
8 time::{Duration, SystemTime, UNIX_EPOCH},
9};
10
11use anyhow::{Result, bail};
12use tokio::{
13 runtime::{Builder, Runtime},
14 sync::mpsc::{self, Sender},
15 task::JoinHandle,
16 time::sleep,
17};
18use uuid::Uuid;
19
20use crate::socket::DiscordIPCSocket;
21
/// Messages passed from a [`DiscordIPC`] handle to its background IPC task.
#[derive(Debug)]
enum IPCCommand {
    /// Publish a new rich-presence activity.
    SetActivity {
        /// Text placed in the activity's `details` field.
        details: String,
        /// Text placed in the activity's `state` field.
        state: String,
    },
}
27
28#[derive(Debug, Clone)]
30pub struct DiscordIPC {
31 tx: Sender<IPCCommand>,
32 client_id: String,
33 start_timestamp: u64,
34 running: Arc<AtomicBool>,
35}
36
37fn get_current_timestamp() -> Result<u64> {
38 let ts = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs();
39 Ok(ts)
40}
41
42fn pack(opcode: u32, data_len: u32) -> Result<Vec<u8>> {
43 let mut bytes = Vec::new();
44
45 for byte_array in &[opcode.to_le_bytes(), data_len.to_le_bytes()] {
46 bytes.extend_from_slice(byte_array);
47 }
48
49 Ok(bytes)
50}
51
52impl DiscordIPC {
53 pub async fn new(client_id: &str) -> Result<Self> {
56 let (tx, _rx) = mpsc::channel(32);
57
58 Ok(Self {
59 tx,
60 client_id: client_id.to_string(),
61 start_timestamp: get_current_timestamp()?,
62 running: Arc::new(AtomicBool::new(false)),
63 })
64 }
65
66 pub async fn run(&mut self) -> Result<JoinHandle<Result<()>>> {
68 if self.running.swap(true, Ordering::SeqCst) {
69 bail!("Cannot run mulitple instances of .run() for DiscordIPC.")
70 }
71
72 let (tx, mut rx) = mpsc::channel::<IPCCommand>(32);
73 self.tx = tx;
74
75 let client_id = self.client_id.clone();
76 let timestamp = self.start_timestamp;
77
78 let handle = tokio::spawn(async move {
79 let mut backoff = 1;
80 let mut last_activity: Option<(String, String)> = None;
81
82 loop {
83 let mut socket = match DiscordIPCSocket::new().await {
85 Ok(s) => s,
86 Err(_) => {
87 sleep(Duration::from_secs(backoff)).await;
88 continue;
89 }
90 };
91
92 let handshake = format!(r#"{{"v":1,"client_id":"{}"}}"#, client_id);
94
95 let packed = pack(0, handshake.len() as u32)?;
96 if socket.write(&packed).await.is_err()
97 || socket.write(handshake.as_bytes()).await.is_err()
98 {
99 sleep(Duration::from_secs(backoff)).await;
100 continue;
101 }
102
103 loop {
105 let frame = match socket.read_frame().await {
106 Ok(f) => f,
107 Err(_) => break,
108 };
109
110 if frame.opcode == 1 && frame.body.windows(5).any(|w| w == b"READY") {
111 break;
112 }
113 }
114
115 if let Some((details, state)) = &last_activity {
117 let _ = send_activity(&mut socket, details, state, timestamp).await;
118 }
119
120 backoff = 1;
121
122 loop {
124 tokio::select! {
125 Some(cmd) = rx.recv() => {
126 match cmd {
127 IPCCommand::SetActivity { details, state } => {
128 last_activity = Some((details.clone(), state.clone()));
129
130 if send_activity(&mut socket, &details, &state, timestamp).await.is_err() {
131 break;
132 }
133 }
134 }
135 }
136
137 frame = socket.read_frame() => {
138 match frame {
139 Ok(frame) => {
140 match frame.opcode {
141 3 => {
143 let packed = pack(frame.opcode, frame.body.len() as u32)?;
144 socket.write(&packed).await?;
145 socket.write(&frame.body).await?;
146 }
147 2 => break,
149 _ => {}
150 }
151 }
152 Err(_) => break,
153 }
154 }
155 }
156 }
157
158 sleep(Duration::from_secs(backoff)).await;
159 backoff = (backoff * 2).min(4);
160 }
161 });
162
163 Ok(handle)
164 }
165
166 pub async fn set_activity(&self, details: &str, state: &str) -> Result<()> {
168 self.tx
169 .send(IPCCommand::SetActivity {
170 details: details.to_string(),
171 state: state.to_string(),
172 })
173 .await?;
174
175 Ok(())
176 }
177}
178
179async fn send_activity(
181 socket: &mut DiscordIPCSocket,
182 details: &str,
183 state: &str,
184 timestamp: u64,
185) -> Result<()> {
186 let pid = std::process::id();
187 let uuid = Uuid::new_v4();
188
189 let json = format!(
190 r#"{{
191 "cmd":"SET_ACTIVITY",
192 "args": {{
193 "pid": {},
194 "activity": {{
195 "details":"{}",
196 "state":"{}",
197 "timestamps": {{
198 "start": {}
199 }}
200 }}
201 }},
202 "nonce":"{}"
203}}"#,
204 pid, details, state, timestamp, uuid
205 );
206
207 let packed = pack(1, json.len() as u32)?;
208 socket.write(&packed).await?;
209 socket.write(json.as_bytes()).await?;
210
211 Ok(())
212}
213
214#[derive(Debug)]
216pub struct DiscordIPCSync {
217 inner: DiscordIPC,
218 rt: Runtime,
219 ipc_task: Option<JoinHandle<Result<()>>>,
220}
221
222impl DiscordIPCSync {
223 pub fn new(client_id: &str) -> Result<Self> {
224 let rt = Builder::new_multi_thread().enable_all().build()?;
225 let inner = rt.block_on(DiscordIPC::new(client_id))?;
226
227 Ok(Self {
228 inner,
229 rt,
230 ipc_task: None,
231 })
232 }
233
234 pub fn run(&mut self) -> Result<()> {
235 if self.ipc_task.is_some() {
236 bail!("run() called multiple times");
237 }
238
239 let handle = self.rt.block_on(self.inner.run())?;
240 self.ipc_task = Some(handle);
241 Ok(())
242 }
243
244 pub fn wait(&mut self) -> Result<()> {
245 if let Some(handle) = self.ipc_task.take() {
246 self.rt.block_on(handle)??;
247 }
248 Ok(())
249 }
250
251 pub fn set_activity(&self, details: &str, state: &str) -> Result<()> {
252 self.rt.block_on(self.inner.set_activity(details, state))
253 }
254}