1use std::{sync::Arc, time::Duration};
2use tokio::{
3 sync::mpsc::{self},
4 task::JoinHandle,
5 time::sleep,
6};
7
8use crate::{
9 PresenceClient,
10 errors::{DisconnectReason, DiscordSockError, PresenceRunnerError},
11 socket::{self, DiscordSock, Opcode},
12 types::{
13 ActivitySpec, DynamicRPCFrame, IPCCommand, ReadyRPCFrame,
14 data::{ActivityResponseData, ReadyData},
15 },
16 utils::get_current_timestamp,
17};
18
19type Callback<T> = Option<Arc<dyn Fn(T) + Send + Sync + 'static>>;
20
21pub type RPCRunner = PresenceRunner;
23
24macro_rules! impl_callback {
25 ($name:ident, $arg:ty, $doc:expr) => {
26 #[doc = $doc]
27 pub fn $name<F: Fn($arg) + Send + Sync + 'static>(mut self, f: F) -> Self {
28 self.$name = Some(Arc::new(f));
29 self
30 }
31 };
32}
33
34macro_rules! retry {
35 ($retries:ident, $on_retry:expr) => {
36 sleep(RETRY_DELAY).await;
37 $retries += 1;
38 if let Some(f) = &$on_retry {
39 let f = f.clone();
40 tokio::spawn(async move { f($retries) });
41 }
42 };
43}
44
45pub struct PresenceRunner {
48 rx: Option<tokio::sync::mpsc::Receiver<IPCCommand>>,
49 client: PresenceClient,
50 join_handle: Option<JoinHandle<()>>,
51 on_ready: Callback<ReadyData>,
52 on_activity_send: Callback<ActivityResponseData>,
53 on_disconnect: Callback<DisconnectReason>,
54 on_retry: Callback<usize>,
55 show_errors: bool,
56 max_retries: usize,
57}
58
59impl PresenceRunner {
60 #[must_use]
63 pub fn can_connect() -> bool {
64 socket::get_pipe_path().is_some()
65 }
66
67 #[must_use]
68 pub fn new(client_id: impl Into<String>) -> Self {
71 let (tx, rx) = mpsc::channel(32);
72 let client = PresenceClient {
73 tx,
74 client_id: client_id.into(),
75 };
76
77 Self {
78 rx: Some(rx),
79 client,
80 join_handle: None,
81 on_ready: None,
82 on_activity_send: None,
83 on_disconnect: None,
84 on_retry: None,
85 show_errors: false,
86 max_retries: 0,
87 }
88 }
89
90 impl_callback!(
91 on_ready,
92 ReadyData,
93 "Runs a particular closure after receiving a READY event.
94
95This can fire multiple times depending on how many times the client
96needs to disconnect and reconnect."
97 );
98
99 impl_callback!(
100 on_activity_send,
101 ActivityResponseData,
102 "Run a particular closure after ensuring that a [`PresenceClient::set_activity`] has successfully managed to pass its data through the IPC channel.
103
104This can fire multiple times."
105 );
106
107 impl_callback!(
108 on_disconnect,
109 DisconnectReason,
110 "Runs a particular closure after the RPC connection is lost.
111
112Unlike `on_retry`, this fires only when a previously initialized connection drops, which means that you should prefer
113`on_retry` instead if you want more accurate responses to the initial connect or handshake failures, and `on_disconnect`
114for reacting to post-connection failures.
115
116This can fire multiple times depending on how many times the client disconnects and reconnects again."
117 );
118
119 impl_callback!(
120 on_retry,
121 usize,
122 "Runs a particular closure when retrying for socket creation or handshake.
123
124This can fire multiple times, or for a limited amount depending on the amount of maximum
125retries that has been set (through [`PresenceRunner::set_max_retries`]).
126
127The closure parameter is the count of retries done at the time of its execution."
128 );
129
130 #[must_use]
132 pub fn show_errors(mut self) -> Self {
133 self.show_errors = true;
134 self
135 }
136
137 #[must_use]
141 pub fn set_max_retries(mut self, count: usize) -> Self {
142 self.max_retries = count;
143 self
144 }
145
146 pub async fn run(
149 &mut self,
150 wait_for_ready: bool,
151 ) -> Result<&PresenceClient, PresenceRunnerError> {
152 if self.join_handle.is_some() {
153 return Err(PresenceRunnerError::MultipleRun);
154 }
155
156 let client_id = self.client.client_id.clone();
157 let show_errors = self.show_errors;
158 let max_retries = self.max_retries;
159
160 let mut rx = self
161 .rx
162 .take()
163 .ok_or_else(|| PresenceRunnerError::ReceiverError)?;
164
165 let (ready_tx, ready_rx) = tokio::sync::oneshot::channel::<()>();
167
168 let on_ready = self.on_ready.take();
170 let on_activity_send = self.on_activity_send.take();
171 let on_disconnect = self.on_disconnect.take();
172 let on_retry = self.on_retry.take();
173
174 let join_handle = tokio::spawn(async move {
175 const RETRY_DELAY: Duration = Duration::from_secs(1);
176 let mut last_activity: Option<ActivitySpec> = None;
177 let mut ready_tx = Some(ready_tx);
178 let mut retries = 0;
179
180 let mut session_start: Option<u64> = None;
181
182 'outer: loop {
183 if max_retries != 0 && retries >= max_retries {
184 break;
185 }
186
187 let mut socket = match DiscordSock::new().await {
189 Ok(s) => s,
190 Err(_) => {
191 retry!(retries, on_retry);
192 continue;
193 }
194 };
195
196 if socket.do_handshake(&client_id).await.is_err() {
198 retry!(retries, on_retry);
199 continue;
200 }
201
202 loop {
204 let frame = match socket.read_frame().await {
205 Ok(f) => f,
206 Err(_) => {
207 retry!(retries, on_retry);
208 continue 'outer;
209 }
210 };
211
212 if Opcode::Frame != frame.opcode {
213 continue;
214 }
215
216 if let Ok(json) = serde_json::from_slice::<ReadyRPCFrame>(&frame.body) {
217 if json.cmd.as_deref() == Some("DISPATCH")
218 && json.evt.as_deref() == Some("READY")
219 {
220 if let Some(tx) = ready_tx.take() {
221 let _ = tx.send(());
222 }
223 if let Some(f) = &on_ready {
224 if let Some(data) = json.data {
225 let f = f.clone();
226 tokio::spawn(async move { f(data) });
227 }
228 }
229
230 retries = 0;
232 break;
233 }
234
235 if json.evt.as_deref() == Some("ERROR") && show_errors {
236 eprintln!("Discord RPC ready event receiver error: {:?}", json.data);
237 }
238 }
239 }
240
241 if let Some(activity) = &last_activity {
243 if let Some(t) = session_start {
244 if let Err(e) = socket.send_activity(activity.clone(), t).await {
245 if show_errors {
246 eprintln!("Discord RPC last activity restore error: {e}")
247 }
248 }
249 }
250 }
251
252 let disconnect_reason = loop {
254 tokio::select! {
255 cmd = rx.recv() => {
256 match cmd {
257 Some(cmd) => {
258 match cmd {
259 IPCCommand::SetActivity { activity } => {
260 let session_start_unpacked = if let Some(s) = session_start {
261 s
262 } else {
263 match get_current_timestamp() {
264 Ok(t) => {
265 session_start = Some(t);
266 t
267 },
268 Err(e) => {
269 if show_errors {
270 eprintln!("Discord RPC pre-send_activity time parsing error: {e}")
271 }
272 break Some(DisconnectReason::OldRelicComputer(e.to_string()));
273 }
274 }
275 };
276
277 let activity = *activity;
278 last_activity = Some(activity.clone());
279
280 if let Err(e) = socket.send_activity(activity, session_start_unpacked).await {
281 if show_errors {
282 eprintln!("Discord RPC send_activity error: {e}");
283 }
284 break Some(DisconnectReason::SendActivityError(e.to_string()));
285 }
286 },
287 IPCCommand::ClearActivity => {
288 last_activity = None;
289 session_start = None;
290
291 if let Err(e) = socket.clear_activity().await {
292 if show_errors {
293 eprintln!("Discord RPC clear_activity error: {e}");
294 }
295 break Some(DisconnectReason::ClearActivityError(e.to_string()));
296 }
297 },
298 IPCCommand::Close { done_tx }=> {
299 let _ = socket.close().await;
300 let _ = done_tx.send(());
301 break 'outer;
302 }
303 }
304 },
305 None => break Some(DisconnectReason::ClientChannelClosed),
306 }
307 }
308
309 frame = socket.read_frame() => {
310 match frame {
311 Ok(frame) => {
312 match frame.opcode {
313 Opcode::Frame => {
314 if let Ok(json) = serde_json::from_slice::<DynamicRPCFrame>(&frame.body) {
315 if json.evt.as_deref() == Some("ERROR") && show_errors {
316 eprintln!("Discord RPC DynamicRPCFrame error: {:?}", json.data);
317 } else if json.cmd.as_deref() == Some("SET_ACTIVITY") {
318 if let Some(f) = &on_activity_send {
319 if let Some(data) = json.data {
320 let data = serde_json::from_value::<ActivityResponseData>(data);
321
322 if let Ok(d) = data {
323 let f = f.clone();
324 tokio::spawn(async move { f(d)});
325 } else if let Err(e) = data{
326 println!("{e}")
327 }
328 }
329 }
330 }
331 }
332 },
333 Opcode::Close => break Some(DisconnectReason::ServerClosed),
334 Opcode::Ping => {
335 if let Err(e) = socket.send_frame(Opcode::Pong, frame.body).await {
336 if show_errors {
337 eprintln!("Discord RPC send_frame error: {e}");
338 }
339 break Some(DisconnectReason::SendFrameError(e.to_string()));
340 }
341 },
342 _ => {}
343 }
344 },
345 Err(e) => {
346 if show_errors {
347 eprintln!("Discord RPC generic frame read error: {e}")
348 }
349 if let DiscordSockError::IoError(error) = &e {
350 if error.kind() == std::io::ErrorKind::UnexpectedEof {
351 break Some(DisconnectReason::PeerClosed);
352 }
353 }
354 break Some(DisconnectReason::ReadFrameError(e.to_string()));
355 },
356 }
357 }
358 }
359 };
360
361 if let Some(f) = &on_disconnect {
362 f(disconnect_reason.unwrap_or(DisconnectReason::Unknown));
363 }
364
365 sleep(RETRY_DELAY).await;
366 }
367 });
368
369 self.join_handle = Some(join_handle);
370
371 if wait_for_ready {
372 match ready_rx.await {
373 Ok(()) => (),
374 Err(_) => return Err(PresenceRunnerError::ExitBeforeReady),
375 }
376 }
377
378 Ok(&self.client)
379 }
380
381 #[must_use]
383 pub fn clone_handle(&self) -> PresenceClient {
384 self.client.clone()
385 }
386
387 pub async fn wait(&mut self) -> Result<(), PresenceRunnerError> {
392 if let Some(handle) = self.join_handle.take() {
393 handle.await?;
394 }
395
396 Ok(())
397 }
398}