1use std::{sync::Arc, time::Duration};
2use tokio::{sync::mpsc, task::JoinHandle, time::sleep};
3
4use crate::{
5 PresenceClient,
6 errors::{DisconnectReason, DiscordSockError, PresenceRunnerError},
7 socket::{DiscordSock, Opcode},
8 types::{
9 ActivitySpec, DynamicRPCFrame, IPCCommand, ReadyRPCFrame,
10 data::{ActivityResponseData, ReadyData},
11 },
12 utils::get_current_timestamp,
13};
14
15type Callback<T> = Option<Arc<dyn Fn(T) + Send + Sync + 'static>>;
16
17macro_rules! impl_callback {
18 ($name:ident, $arg:ty, $doc:expr) => {
19 #[doc = $doc]
20 pub fn $name<F: Fn($arg) + Send + Sync + 'static>(mut self, f: F) -> Self {
21 self.$name = Some(Arc::new(f));
22 self
23 }
24 };
25}
26
27macro_rules! retry {
28 ($retries:ident, $on_retry:expr) => {
29 sleep(RETRY_DELAY).await;
30 $retries += 1;
31 if let Some(f) = &$on_retry {
32 let f = f.clone();
33 tokio::spawn(async move { f($retries) });
34 }
35 };
36}
37
38pub struct PresenceRunner {
41 rx: Option<tokio::sync::mpsc::Receiver<IPCCommand>>,
42 client: PresenceClient,
43 join_handle: Option<JoinHandle<()>>,
44 on_ready: Callback<ReadyData>,
45 on_activity_send: Callback<ActivityResponseData>,
46 on_disconnect: Callback<DisconnectReason>,
47 on_retry: Callback<usize>,
48 show_errors: bool,
49 max_retries: usize,
50}
51
52impl PresenceRunner {
53 #[must_use]
54 pub fn new(client_id: impl Into<String>) -> Self {
57 let (tx, rx) = mpsc::channel(32);
58 let client = PresenceClient {
59 tx,
60 client_id: client_id.into(),
61 };
62
63 Self {
64 rx: Some(rx),
65 client,
66 join_handle: None,
67 on_ready: None,
68 on_activity_send: None,
69 on_disconnect: None,
70 on_retry: None,
71 show_errors: false,
72 max_retries: 0,
73 }
74 }
75
76 impl_callback!(
77 on_ready,
78 ReadyData,
79 "Runs a particular closure after receiving a READY event.
80
81This can fire multiple times depending on how many times the client
82needs to disconnect and reconnect."
83 );
84
85 impl_callback!(
86 on_activity_send,
87 ActivityResponseData,
88 "Run a particular closure after ensuring that a [`PresenceClient::set_activity`] has successfully managed to pass its data through the IPC channel.
89
90This can fire multiple times."
91 );
92
93 impl_callback!(
94 on_disconnect,
95 DisconnectReason,
96 "Runs a particular closure after the RPC connection is lost.
97
98Unlike `on_retry`, this fires only when a previously initialized connection drops, which means that you should prefer
99`on_retry` instead if you want more accurate responses to the initial connect or handshake failures, and `on_disconnect`
100for reacting to post-connection failures.
101
102This can fire multiple times depending on how many times the client disconnects and reconnects again."
103 );
104
105 impl_callback!(
106 on_retry,
107 usize,
108 "Runs a particular closure when retrying for socket creation or handshake.
109
110This can fire multiple times, or for a limited amount depending on the amount of maximum
111retries that has been set (through [`PresenceRunner::set_max_retries`]).
112
113The closure parameter is the count of retries done at the time of its execution."
114 );
115
116 #[must_use]
118 pub fn show_errors(mut self) -> Self {
119 self.show_errors = true;
120 self
121 }
122
123 #[must_use]
127 pub fn set_max_retries(mut self, count: usize) -> Self {
128 self.max_retries = count;
129 self
130 }
131
132 pub async fn run(
135 &mut self,
136 wait_for_ready: bool,
137 ) -> Result<&PresenceClient, PresenceRunnerError> {
138 if self.join_handle.is_some() {
139 return Err(PresenceRunnerError::MultipleRun);
140 }
141
142 let client_id = self.client.client_id.clone();
143 let show_errors = self.show_errors;
144 let max_retries = self.max_retries;
145
146 let mut rx = self
147 .rx
148 .take()
149 .ok_or_else(|| PresenceRunnerError::ReceiverError)?;
150
151 let (ready_tx, ready_rx) = tokio::sync::oneshot::channel::<()>();
153
154 let on_ready = self.on_ready.take();
156 let on_activity_send = self.on_activity_send.take();
157 let on_disconnect = self.on_disconnect.take();
158 let on_retry = self.on_retry.take();
159
160 let join_handle = tokio::spawn(async move {
161 const RETRY_DELAY: Duration = Duration::from_secs(1);
162 let mut last_activity: Option<ActivitySpec> = None;
163 let mut ready_tx = Some(ready_tx);
164 let mut retries = 0;
165
166 let mut session_start: Option<u64> = None;
167
168 'outer: loop {
169 if max_retries != 0 && retries >= max_retries {
170 break;
171 }
172
173 let mut socket = match DiscordSock::new().await {
175 Ok(s) => s,
176 Err(_) => {
177 retry!(retries, on_retry);
178 continue;
179 }
180 };
181
182 if socket.do_handshake(&client_id).await.is_err() {
184 retry!(retries, on_retry);
185 continue;
186 }
187
188 loop {
190 let frame = match socket.read_frame().await {
191 Ok(f) => f,
192 Err(_) => {
193 retry!(retries, on_retry);
194 continue 'outer;
195 }
196 };
197
198 if Opcode::Frame != frame.opcode {
199 continue;
200 }
201
202 if let Ok(json) = serde_json::from_slice::<ReadyRPCFrame>(&frame.body) {
203 if json.cmd.as_deref() == Some("DISPATCH")
204 && json.evt.as_deref() == Some("READY")
205 {
206 if let Some(tx) = ready_tx.take() {
207 let _ = tx.send(());
208 }
209 if let Some(f) = &on_ready {
210 if let Some(data) = json.data {
211 let f = f.clone();
212 tokio::spawn(async move { f(data) });
213 }
214 }
215
216 retries = 0;
218 break;
219 }
220
221 if json.evt.as_deref() == Some("ERROR") && show_errors {
222 eprintln!("Discord RPC ready event receiver error: {:?}", json.data);
223 }
224 }
225 }
226
227 if let Some(activity) = &last_activity {
229 if let Some(t) = session_start {
230 if let Err(e) = socket.send_activity(activity.clone(), t).await {
231 if show_errors {
232 eprintln!("Discord RPC last activity restore error: {e}")
233 }
234 }
235 }
236 }
237
238 let disconnect_reason = loop {
240 tokio::select! {
241 cmd = rx.recv() => {
242 match cmd {
243 Some(cmd) => {
244 match cmd {
245 IPCCommand::SetActivity { activity } => {
246 let session_start_unpacked = if let Some(s) = session_start {
247 s
248 } else {
249 match get_current_timestamp() {
250 Ok(t) => {
251 session_start = Some(t);
252 t
253 },
254 Err(e) => {
255 if show_errors {
256 eprintln!("Discord RPC pre-send_activity time parsing error: {e}")
257 }
258 break Some(DisconnectReason::OldRelicComputer(e.to_string()));
259 }
260 }
261 };
262
263 let activity = *activity;
264 last_activity = Some(activity.clone());
265
266 if let Err(e) = socket.send_activity(activity, session_start_unpacked).await {
267 if show_errors {
268 eprintln!("Discord RPC send_activity error: {e}");
269 }
270 break Some(DisconnectReason::SendActivityError(e.to_string()));
271 }
272 },
273 IPCCommand::ClearActivity => {
274 last_activity = None;
275 session_start = None;
276
277 if let Err(e) = socket.clear_activity().await {
278 if show_errors {
279 eprintln!("Discord RPC clear_activity error: {e}");
280 }
281 break Some(DisconnectReason::ClearActivityError(e.to_string()));
282 }
283 },
284 IPCCommand::Close { done_tx }=> {
285 let _ = socket.close().await;
286 let _ = done_tx.send(());
287 break 'outer;
288 }
289 }
290 },
291 None => break Some(DisconnectReason::ClientChannelClosed),
292 }
293 }
294
295 frame = socket.read_frame() => {
296 match frame {
297 Ok(frame) => {
298 match frame.opcode {
299 Opcode::Frame => {
300 if let Ok(json) = serde_json::from_slice::<DynamicRPCFrame>(&frame.body) {
301 if json.evt.as_deref() == Some("ERROR") && show_errors {
302 eprintln!("Discord RPC DynamicRPCFrame error: {:?}", json.data);
303 } else if json.cmd.as_deref() == Some("SET_ACTIVITY") {
304 if let Some(f) = &on_activity_send {
305 if let Some(data) = json.data {
306 let data = serde_json::from_value::<ActivityResponseData>(data);
307
308 if let Ok(d) = data {
309 let f = f.clone();
310 tokio::spawn(async move { f(d)});
311 } else if let Err(e) = data{
312 println!("{e}")
313 }
314 }
315 }
316 }
317 }
318 },
319 Opcode::Close => break Some(DisconnectReason::ServerClosed),
320 Opcode::Ping => {
321 if let Err(e) = socket.send_frame(Opcode::Pong, frame.body).await {
322 if show_errors {
323 eprintln!("Discord RPC send_frame error: {e}");
324 }
325 break Some(DisconnectReason::SendFrameError(e.to_string()));
326 }
327 },
328 _ => {}
329 }
330 },
331 Err(e) => {
332 if show_errors {
333 eprintln!("Discord RPC generic frame read error: {e}")
334 }
335 if let DiscordSockError::IoError(error) = &e {
336 if error.kind() == std::io::ErrorKind::UnexpectedEof {
337 break Some(DisconnectReason::PeerClosed);
338 }
339 }
340 break Some(DisconnectReason::ReadFrameError(e.to_string()));
341 },
342 }
343 }
344 }
345 };
346
347 if let Some(f) = &on_disconnect {
348 f(disconnect_reason.unwrap_or(DisconnectReason::Unknown));
349 }
350
351 sleep(RETRY_DELAY).await;
352 }
353 });
354
355 self.join_handle = Some(join_handle);
356
357 if wait_for_ready {
358 match ready_rx.await {
359 Ok(()) => (),
360 Err(_) => return Err(PresenceRunnerError::ExitBeforeReady),
361 }
362 }
363
364 Ok(&self.client)
365 }
366
367 #[must_use]
369 pub fn clone_handle(&self) -> PresenceClient {
370 self.client.clone()
371 }
372
373 pub async fn wait(&mut self) -> Result<(), PresenceRunnerError> {
378 if let Some(handle) = self.join_handle.take() {
379 handle.await?;
380 }
381
382 Ok(())
383 }
384}