filthy_rich/runner.rs
1use std::time::Duration;
2use tokio::{sync::mpsc, task::JoinHandle, time::sleep};
3
4use crate::{
5 PresenceClient,
6 errors::{DisconnectReason, DiscordSockError, PresenceRunnerError},
7 socket::DiscordSock,
8 types::{
9 Activity, ActivityResponseData, DynamicRPCFrame, IPCCommand, ReadyData, ReadyRPCFrame,
10 },
11 utils::get_current_timestamp,
12};
13
14/// A runner that manages the Discord RPC background task.
15/// Create a runner, configure it, run it to get a client handle, then clone the handle for sharing.
16pub struct PresenceRunner {
17 rx: Option<tokio::sync::mpsc::Receiver<IPCCommand>>,
18 client: PresenceClient,
19 join_handle: Option<JoinHandle<()>>,
20 on_ready: Option<Box<dyn Fn(ReadyData) + Send + Sync + 'static>>,
21 on_activity_send: Option<Box<dyn Fn(ActivityResponseData) + Send + Sync + 'static>>,
22 on_disconnect: Option<Box<dyn Fn(DisconnectReason) + Send + Sync + 'static>>,
23 show_errors: bool,
24}
25
26impl PresenceRunner {
27 #[must_use]
28 /// Create a new [`PresenceRunner`] instance. Requires the client ID of your chosen app from the
29 /// [Discord Developer Portal](https://discord.com/developers/applications).
30 pub fn new(client_id: &str) -> Self {
31 let (tx, rx) = mpsc::channel(32);
32 let client = PresenceClient {
33 tx,
34 client_id: client_id.to_string(),
35 };
36
37 Self {
38 rx: Some(rx),
39 client,
40 join_handle: None,
41 on_ready: None,
42 on_activity_send: None,
43 on_disconnect: None,
44 show_errors: false,
45 }
46 }
47
48 /// Run a particular closure after receiving the READY event from the Discord RPC server.
49 ///
50 /// This event can fire multiple times depending on how many times the client needs to reconnect with Discord RPC.
51 pub fn on_ready<F: Fn(ReadyData) + Send + Sync + 'static>(mut self, f: F) -> Self {
52 self.on_ready = Some(Box::new(f));
53 self
54 }
55
56 /// Run a particular closure after ensuring that a [`PresenceClient::set_activity`] has successfully managed to
57 /// pass its data through the IPC channel.
58 ///
59 /// This event can fire multiple times based on how many activities you set.
60 pub fn on_activity_send<F: Fn(ActivityResponseData) + Send + Sync + 'static>(
61 mut self,
62 f: F,
63 ) -> Self {
64 self.on_activity_send = Some(Box::new(f));
65 self
66 }
67
68 /// Run a particular closure after the RPC connection is lost.
69 ///
70 /// This can fire multiple times if the client reconnects and disconnects again.
71 pub fn on_disconnect<F: Fn(DisconnectReason) + Send + Sync + 'static>(mut self, f: F) -> Self {
72 self.on_disconnect = Some(Box::new(f));
73 self
74 }
75
76 /// Enable verbose error logging for RPC and code events.
77 #[must_use]
78 pub fn show_errors(mut self) -> Self {
79 self.show_errors = true;
80 self
81 }
82
83 /// Run the runner.
84 /// Must be called before any client handle operations.
85 pub async fn run(
86 &mut self,
87 wait_for_ready: bool,
88 ) -> Result<&PresenceClient, PresenceRunnerError> {
89 if self.join_handle.is_some() {
90 return Err(PresenceRunnerError::MultipleRun);
91 }
92
93 let client_id = self.client.client_id.clone();
94 let show_errors = self.show_errors;
95
96 let mut rx = self
97 .rx
98 .take()
99 .ok_or_else(|| PresenceRunnerError::ReceiverError)?;
100
101 // oneshot channel to signal when READY is received the first time
102 let (ready_tx, ready_rx) = tokio::sync::oneshot::channel::<()>();
103
104 // executable closers (executed within the loop)
105 let on_ready = self.on_ready.take();
106 let on_activity_send = self.on_activity_send.take();
107 let on_disconnect = self.on_disconnect.take();
108
109 let join_handle = tokio::spawn(async move {
110 let mut backoff = 1;
111 let mut last_activity: Option<Activity> = None;
112 let mut ready_tx = Some(ready_tx);
113 let mut connected = false;
114
115 let mut session_start: Option<u64> = None;
116
117 'outer: loop {
118 // initial connect
119 let mut socket = match DiscordSock::new().await {
120 Ok(s) => s,
121 Err(_) => {
122 sleep(Duration::from_secs(backoff)).await;
123 continue;
124 }
125 };
126
127 // initial handshake
128 if socket.do_handshake(&client_id).await.is_err() {
129 sleep(Duration::from_secs(backoff)).await;
130 continue;
131 }
132
133 // ready loop
134 loop {
135 let frame = match socket.read_frame().await {
136 Ok(f) => f,
137 Err(_) => {
138 break;
139 }
140 };
141
142 if frame.opcode != 1 {
143 continue;
144 }
145
146 if let Ok(json) = serde_json::from_slice::<ReadyRPCFrame>(&frame.body) {
147 if json.cmd.as_deref() == Some("DISPATCH")
148 && json.evt.as_deref() == Some("READY")
149 {
150 if let Some(tx) = ready_tx.take() {
151 let _ = tx.send(());
152 }
153 if let Some(f) = &on_ready {
154 if let Some(data) = json.data {
155 f(data);
156 }
157 }
158 connected = true;
159 break;
160 }
161
162 if json.evt.as_deref() == Some("ERROR") && show_errors {
163 eprintln!("Discord RPC ready event receiver error: {:?}", json.data);
164 }
165 }
166 }
167
168 // restore last activity (if any)
169 if let Some(activity) = &last_activity {
170 if let Some(t) = session_start {
171 if let Err(e) = socket.send_activity(activity.clone(), t).await {
172 if show_errors {
173 eprintln!("Discord RPC last activity restore error: {e}")
174 }
175 }
176 }
177 }
178
179 backoff = 1;
180
181 // generic loop for receiving commands and responding to pings from Discord itself
182 let disconnect_reason = loop {
183 tokio::select! {
184 biased;
185
186 cmd = rx.recv() => {
187 match cmd {
188 Some(cmd) => {
189 match cmd {
190 IPCCommand::SetActivity { activity } => {
191 let session_start_unpacked = if let Some(s) = session_start {
192 s
193 } else {
194 let t = get_current_timestamp().unwrap_or_default();
195 session_start = Some(t);
196 t
197 };
198
199 let activity = *activity;
200 last_activity = Some(activity.clone());
201
202 if let Err(e) = socket.send_activity(activity, session_start_unpacked).await {
203 if show_errors {
204 eprintln!("Discord RPC send_activity error: {e}");
205 }
206 break Some(DisconnectReason::SendActivityError(e.to_string()));
207 }
208 },
209 IPCCommand::ClearActivity => {
210 last_activity = None;
211 session_start = None;
212
213 if let Err(e) = socket.clear_activity().await {
214 if show_errors {
215 eprintln!("Discord RPC clear_activity error: {e}");
216 }
217 break Some(DisconnectReason::ClearActivityError(e.to_string()));
218 }
219 },
220 IPCCommand::Close { done }=> {
221 let _ = socket.close().await;
222 let _ = done.send(());
223 break 'outer;
224 }
225 }
226 },
227 None => break Some(DisconnectReason::ClientChannelClosed),
228 }
229 }
230
231 frame = socket.read_frame() => {
232 match frame {
233 Ok(frame) => {
234 match frame.opcode {
235 1 => {
236 if let Ok(json) = serde_json::from_slice::<DynamicRPCFrame>(&frame.body) {
237 if json.evt.as_deref() == Some("ERROR") && show_errors {
238 eprintln!("Discord RPC DynamicRPCFrame error: {:?}", json.data);
239 } else if json.cmd.as_deref() == Some("SET_ACTIVITY") {
240 if let Some(f) = &on_activity_send {
241 if let Some(data) = json.data {
242 let data = serde_json::from_value::<ActivityResponseData>(data);
243
244 if let Ok(d) = data {
245 f(d)
246 }
247 }
248 }
249 }
250 }
251 }
252 2 => break Some(DisconnectReason::ServerClosed),
253 3 => {
254 if let Err(e) = socket.send_frame(3, frame.body).await {
255 if show_errors {
256 eprintln!("Discord RPC send_frame error: {e}");
257 }
258 break Some(DisconnectReason::SendFrameError(e.to_string()));
259 }
260 }
261 _ => {}
262 }
263 },
264 Err(e) => {
265 if show_errors {
266 eprintln!("Discord RPC generic frame read error: {e}")
267 }
268 // if let Some(io_err) = e.downcast_ref::<std::io::Error>() {
269 // if io_err.kind() == std::io::ErrorKind::UnexpectedEof {
270 // break Some(DisconnectReason::PeerClosed);
271 // }
272 // }
273 if let DiscordSockError::IoError(error) = &e {
274 if error.kind() == std::io::ErrorKind::UnexpectedEof {
275 break Some(DisconnectReason::PeerClosed);
276 }
277 }
278 break Some(DisconnectReason::ReadFrameError(e.to_string()));
279 },
280 }
281 }
282 }
283 };
284
285 if connected {
286 if let Some(f) = &on_disconnect {
287 f(disconnect_reason.unwrap_or(DisconnectReason::Unknown));
288 }
289 connected = false;
290 }
291
292 sleep(Duration::from_secs(backoff)).await;
293 backoff = (backoff * 2).min(4);
294 }
295 });
296
297 self.join_handle = Some(join_handle);
298
299 if wait_for_ready {
300 match ready_rx.await {
301 Ok(()) => (),
302 Err(_) => return Err(PresenceRunnerError::ExitBeforeReady),
303 }
304 }
305
306 Ok(&self.client)
307 }
308
309 /// Returns a clone of the client handle for sharing.
310 #[must_use]
311 pub fn clone_handle(&self) -> PresenceClient {
312 self.client.clone()
313 }
314
315 /// Waits for the IPC task to finish.
316 ///
317 /// NOTE: If there's no `join_handle` present, the function will do nothing and
318 /// just return blank.
319 pub async fn wait(&mut self) -> Result<(), PresenceRunnerError> {
320 if let Some(handle) = self.join_handle.take() {
321 handle.await?;
322 }
323
324 Ok(())
325 }
326}