1use std::{sync::Arc, time::Duration};
2use tokio::{sync::mpsc, task::JoinHandle, time::sleep};
3
4use crate::{
5 PresenceClient,
6 errors::{DisconnectReason, DiscordSockError, PresenceRunnerError},
7 socket::{DiscordSock, Opcode},
8 types::{
9 ActivitySpec, DynamicRPCFrame, IPCCommand, ReadyRPCFrame,
10 data::{ActivityResponseData, ReadyData},
11 },
12 utils::get_current_timestamp,
13};
14
/// Optional, shareable callback taking a single argument of type `T`.
///
/// The closure lives behind an [`Arc`] so it can be cloned cheaply into spawned
/// tasks; `None` means that no callback has been registered.
type Callback<T> = Option<Arc<dyn Fn(T) + Send + Sync>>;
16
/// Generates a consuming, builder-style setter for one callback field.
///
/// * `$name` – both the generated method name and the struct field it fills.
/// * `$arg`  – the argument type the callback receives.
/// * `$doc`  – the rustdoc text attached to the generated method.
///
/// The closure is wrapped in an [`Arc`] and stored as `Some(..)` in `self.$name`.
macro_rules! impl_callback {
    ($name:ident, $arg:ty, $doc:expr) => {
        #[doc = $doc]
        pub fn $name<F>(mut self, f: F) -> Self
        where
            F: Fn($arg) + Send + Sync + 'static,
        {
            self.$name = Some(Arc::new(f));
            self
        }
    };
}
26
27pub struct PresenceRunner {
30 rx: Option<tokio::sync::mpsc::Receiver<IPCCommand>>,
31 client: PresenceClient,
32 join_handle: Option<JoinHandle<()>>,
33 on_ready: Callback<ReadyData>,
34 on_activity_send: Callback<ActivityResponseData>,
35 on_disconnect: Callback<DisconnectReason>,
36 on_retry: Callback<usize>,
37 show_errors: bool,
38 max_retries: usize,
39}
40
41impl PresenceRunner {
42 #[must_use]
43 pub fn new(client_id: impl Into<String>) -> Self {
46 let (tx, rx) = mpsc::channel(32);
47 let client = PresenceClient {
48 tx,
49 client_id: client_id.into(),
50 };
51
52 Self {
53 rx: Some(rx),
54 client,
55 join_handle: None,
56 on_ready: None,
57 on_activity_send: None,
58 on_disconnect: None,
59 on_retry: None,
60 show_errors: false,
61 max_retries: 0,
62 }
63 }
64
65 impl_callback!(
66 on_ready,
67 ReadyData,
68 "Runs a particular closure after receiving a READY event.
69
70This can fire multiple times depending on how many times the client
71needs to disconnect and reconnect."
72 );
73
74 impl_callback!(
75 on_activity_send,
76 ActivityResponseData,
77 "Run a particular closure after ensuring that a [`PresenceClient::set_activity`] has successfully managed to pass its data through the IPC channel.
78
79This can fire multiple times."
80 );
81
82 impl_callback!(
83 on_disconnect,
84 DisconnectReason,
85 "Runs a particular closure after the RPC connection is lost.
86
87Unlike `on_retry`, this fires only when a previously initialized connection drops, which means that you should prefer
88`on_retry` instead if you want more accurate responses to the initial connect or handshake failures, and `on_disconnect`
89for reacting to post-connection failures.
90
91This can fire multiple times depending on how many times the client disconnects and reconnects again."
92 );
93
94 impl_callback!(
95 on_retry,
96 usize,
97 "Runs a particular closure when retrying for socket creation or handshake.
98
99This can fire multiple times, or for a limited amount depending on the amount of maximum
100retries that has been set (through [`PresenceRunner::set_max_retries`]).
101
102The closure parameter is the count of retries done at the time of its execution."
103 );
104
105 #[must_use]
107 pub fn show_errors(mut self) -> Self {
108 self.show_errors = true;
109 self
110 }
111
112 #[must_use]
116 pub fn set_max_retries(mut self, count: usize) -> Self {
117 self.max_retries = count;
118 self
119 }
120
121 pub async fn run(
124 &mut self,
125 wait_for_ready: bool,
126 ) -> Result<&PresenceClient, PresenceRunnerError> {
127 if self.join_handle.is_some() {
128 return Err(PresenceRunnerError::MultipleRun);
129 }
130
131 let client_id = self.client.client_id.clone();
132 let show_errors = self.show_errors;
133 let max_retries = self.max_retries;
134
135 let mut rx = self
136 .rx
137 .take()
138 .ok_or_else(|| PresenceRunnerError::ReceiverError)?;
139
140 let (ready_tx, ready_rx) = tokio::sync::oneshot::channel::<()>();
142
143 let on_ready = self.on_ready.take();
145 let on_activity_send = self.on_activity_send.take();
146 let on_disconnect = self.on_disconnect.take();
147 let on_retry = self.on_retry.take();
148
149 let join_handle = tokio::spawn(async move {
150 const RETRY_DELAY: Duration = Duration::from_secs(1);
151 let mut last_activity: Option<ActivitySpec> = None;
152 let mut ready_tx = Some(ready_tx);
153 let mut retries = 0;
154
155 let mut session_start: Option<u64> = None;
156
157 'outer: loop {
158 if max_retries != 0 && retries >= max_retries {
159 break;
160 }
161
162 let mut socket = match DiscordSock::new().await {
164 Ok(s) => s,
165 Err(_) => {
166 sleep(RETRY_DELAY).await;
167
168 retries += 1;
169 if let Some(f) = &on_retry {
170 let f = f.clone();
171 tokio::spawn(async move { f(retries) });
172 }
173
174 continue;
175 }
176 };
177
178 if socket.do_handshake(&client_id).await.is_err() {
180 sleep(RETRY_DELAY).await;
181
182 retries += 1;
183 if let Some(f) = &on_retry {
184 let f = f.clone();
185 tokio::spawn(async move { f(retries) });
186 }
187
188 continue;
189 }
190
191 loop {
193 let frame = match socket.read_frame().await {
194 Ok(f) => f,
195 Err(_) => {
196 sleep(RETRY_DELAY).await;
197 retries += 1;
198 if let Some(f) = &on_retry {
199 let f = f.clone();
200 tokio::spawn(async move { f(retries) });
201 }
202 continue 'outer;
203 }
204 };
205
206 if Opcode::Frame != frame.opcode {
207 continue;
208 }
209
210 if let Ok(json) = serde_json::from_slice::<ReadyRPCFrame>(&frame.body) {
211 if json.cmd.as_deref() == Some("DISPATCH")
212 && json.evt.as_deref() == Some("READY")
213 {
214 if let Some(tx) = ready_tx.take() {
215 let _ = tx.send(());
216 }
217 if let Some(f) = &on_ready {
218 if let Some(data) = json.data {
219 let f = f.clone();
220 tokio::spawn(async move { f(data) });
221 }
222 }
223
224 retries = 0;
226 break;
227 }
228
229 if json.evt.as_deref() == Some("ERROR") && show_errors {
230 eprintln!("Discord RPC ready event receiver error: {:?}", json.data);
231 }
232 }
233 }
234
235 if let Some(activity) = &last_activity {
237 if let Some(t) = session_start {
238 if let Err(e) = socket.send_activity(activity.clone(), t).await {
239 if show_errors {
240 eprintln!("Discord RPC last activity restore error: {e}")
241 }
242 }
243 }
244 }
245
246 let disconnect_reason = loop {
248 tokio::select! {
249 cmd = rx.recv() => {
250 match cmd {
251 Some(cmd) => {
252 match cmd {
253 IPCCommand::SetActivity { activity } => {
254 let session_start_unpacked = if let Some(s) = session_start {
255 s
256 } else {
257 match get_current_timestamp() {
258 Ok(t) => {
259 session_start = Some(t);
260 t
261 },
262 Err(e) => {
263 if show_errors {
264 eprintln!("Discord RPC pre-send_activity time parsing error: {e}")
265 }
266 break Some(DisconnectReason::OldRelicComputer(e.to_string()));
267 }
268 }
269 };
270
271 let activity = *activity;
272 last_activity = Some(activity.clone());
273
274 if let Err(e) = socket.send_activity(activity, session_start_unpacked).await {
275 if show_errors {
276 eprintln!("Discord RPC send_activity error: {e}");
277 }
278 break Some(DisconnectReason::SendActivityError(e.to_string()));
279 }
280 },
281 IPCCommand::ClearActivity => {
282 last_activity = None;
283 session_start = None;
284
285 if let Err(e) = socket.clear_activity().await {
286 if show_errors {
287 eprintln!("Discord RPC clear_activity error: {e}");
288 }
289 break Some(DisconnectReason::ClearActivityError(e.to_string()));
290 }
291 },
292 IPCCommand::Close { done_tx }=> {
293 let _ = socket.close().await;
294 let _ = done_tx.send(());
295 break 'outer;
296 }
297 }
298 },
299 None => break Some(DisconnectReason::ClientChannelClosed),
300 }
301 }
302
303 frame = socket.read_frame() => {
304 match frame {
305 Ok(frame) => {
306 match frame.opcode {
307 Opcode::Frame => {
308 if let Ok(json) = serde_json::from_slice::<DynamicRPCFrame>(&frame.body) {
309 if json.evt.as_deref() == Some("ERROR") && show_errors {
310 eprintln!("Discord RPC DynamicRPCFrame error: {:?}", json.data);
311 } else if json.cmd.as_deref() == Some("SET_ACTIVITY") {
312 if let Some(f) = &on_activity_send {
313 if let Some(data) = json.data {
314 let data = serde_json::from_value::<ActivityResponseData>(data);
315
316 if let Ok(d) = data {
317 let f = f.clone();
318 tokio::spawn(async move { f(d)});
319 } else if let Err(e) = data{
320 println!("{e}")
321 }
322 }
323 }
324 }
325 }
326 },
327 Opcode::Close => break Some(DisconnectReason::ServerClosed),
328 Opcode::Ping => {
329 if let Err(e) = socket.send_frame(Opcode::Pong, frame.body).await {
330 if show_errors {
331 eprintln!("Discord RPC send_frame error: {e}");
332 }
333 break Some(DisconnectReason::SendFrameError(e.to_string()));
334 }
335 },
336 _ => {}
337 }
338 },
339 Err(e) => {
340 if show_errors {
341 eprintln!("Discord RPC generic frame read error: {e}")
342 }
343 if let DiscordSockError::IoError(error) = &e {
344 if error.kind() == std::io::ErrorKind::UnexpectedEof {
345 break Some(DisconnectReason::PeerClosed);
346 }
347 }
348 break Some(DisconnectReason::ReadFrameError(e.to_string()));
349 },
350 }
351 }
352 }
353 };
354
355 if let Some(f) = &on_disconnect {
356 f(disconnect_reason.unwrap_or(DisconnectReason::Unknown));
357 }
358
359 sleep(RETRY_DELAY).await;
360 }
361 });
362
363 self.join_handle = Some(join_handle);
364
365 if wait_for_ready {
366 match ready_rx.await {
367 Ok(()) => (),
368 Err(_) => return Err(PresenceRunnerError::ExitBeforeReady),
369 }
370 }
371
372 Ok(&self.client)
373 }
374
375 #[must_use]
377 pub fn clone_handle(&self) -> PresenceClient {
378 self.client.clone()
379 }
380
381 pub async fn wait(&mut self) -> Result<(), PresenceRunnerError> {
386 if let Some(handle) = self.join_handle.take() {
387 handle.await?;
388 }
389
390 Ok(())
391 }
392}