filthy_rich/runner.rs
1use std::time::Duration;
2use tokio::{sync::mpsc, task::JoinHandle, time::sleep};
3
4use anyhow::{Result, anyhow, bail};
5
6use crate::{
7 PresenceClient,
8 socket::DiscordSock,
9 types::{
10 Activity, ActivityResponseData, DynamicRPCFrame, IPCCommand, ReadyData, ReadyRPCFrame,
11 },
12 utils::get_current_timestamp,
13};
14
/// Error message used by `PresenceRunner::run` when it is invoked on a runner that has already
/// been started (either a task handle is stored or the command receiver was already taken).
const MULTIPLE_RUN_CALL_ERR: &str = "PresenceRunner::run() called more than once";
16
17/// A runner that manages the Discord RPC background task.
18/// Create a runner, configure it, run it to get a client handle, then clone the handle for sharing.
19pub struct PresenceRunner {
20 rx: Option<tokio::sync::mpsc::Receiver<IPCCommand>>,
21 client: PresenceClient,
22 join_handle: Option<JoinHandle<Result<()>>>,
23 on_ready: Option<Box<dyn Fn(ReadyData) + Send + Sync + 'static>>,
24 on_activity_send: Option<Box<dyn Fn(ActivityResponseData) + Send + Sync + 'static>>,
25 show_errors: bool,
26}
27
28impl PresenceRunner {
29 #[must_use]
30 /// Create a new [`PresenceRunner`] instance. Requires the client ID of your chosen app from the
31 /// [Discord Developer Portal](https://discord.com/developers/applications).
32 pub fn new(client_id: &str) -> Self {
33 let (tx, rx) = mpsc::channel(32);
34 let client = PresenceClient {
35 tx,
36 client_id: client_id.to_string(),
37 };
38
39 Self {
40 rx: Some(rx),
41 client,
42 join_handle: None,
43 on_ready: None,
44 on_activity_send: None,
45 show_errors: false,
46 }
47 }
48
49 /// Run a particular closure after receiving the READY event from the Discord RPC server.
50 ///
51 /// This event can fire multiple times depending on how many times the client needs to reconnect with Discord RPC.
52 pub fn on_ready<F: Fn(ReadyData) + Send + Sync + 'static>(mut self, f: F) -> Self {
53 self.on_ready = Some(Box::new(f));
54 self
55 }
56
57 /// Run a particular closure after ensuring that a [`PresenceClient::set_activity`] has successfully managed to
58 /// pass its data through the IPC channel.
59 ///
60 /// This event can fire multiple times based on how many activities you set.
61 pub fn on_activity_send<F: Fn(ActivityResponseData) + Send + Sync + 'static>(
62 mut self,
63 f: F,
64 ) -> Self {
65 self.on_activity_send = Some(Box::new(f));
66 self
67 }
68
69 /// Enable verbose error logging for RPC and code events.
70 #[must_use]
71 pub fn show_errors(mut self) -> Self {
72 self.show_errors = true;
73 self
74 }
75
76 /// Run the runner.
77 /// Must be called before any client handle operations.
78 pub async fn run(&mut self, wait_for_ready: bool) -> Result<&PresenceClient> {
79 if self.join_handle.is_some() {
80 bail!(MULTIPLE_RUN_CALL_ERR)
81 }
82
83 let client_id = self.client.client_id.clone();
84 let show_errors = self.show_errors;
85
86 let mut rx = self
87 .rx
88 .take()
89 .ok_or_else(|| anyhow!(MULTIPLE_RUN_CALL_ERR))?;
90
91 // oneshot channel to signal when READY is received the first time
92 let (ready_tx, ready_rx) = tokio::sync::oneshot::channel::<()>();
93
94 // executable closers (executed within the loop)
95 let on_ready = self.on_ready.take();
96 let on_activity_send = self.on_activity_send.take();
97
98 let join_handle = tokio::spawn(async move {
99 let mut backoff = 1;
100 let mut last_activity: Option<Activity> = None;
101 let mut ready_tx = Some(ready_tx);
102
103 let mut session_start: Option<u64> = None;
104
105 'outer: loop {
106 // initial connect
107 let mut socket = match DiscordSock::new().await {
108 Ok(s) => s,
109 Err(_) => {
110 sleep(Duration::from_secs(backoff)).await;
111 continue;
112 }
113 };
114
115 // initial handshake
116 if socket.do_handshake(&client_id).await.is_err() {
117 sleep(Duration::from_secs(backoff)).await;
118 continue;
119 }
120
121 // ready loop
122 loop {
123 let frame = match socket.read_frame().await {
124 Ok(f) => f,
125 Err(_) => {
126 break;
127 }
128 };
129
130 if frame.opcode != 1 {
131 continue;
132 }
133
134 if let Ok(json) = serde_json::from_slice::<ReadyRPCFrame>(&frame.body) {
135 if json.cmd.as_deref() == Some("DISPATCH")
136 && json.evt.as_deref() == Some("READY")
137 {
138 if let Some(tx) = ready_tx.take() {
139 let _ = tx.send(());
140 }
141 if let Some(f) = &on_ready {
142 if let Some(data) = json.data {
143 f(data);
144 }
145 }
146 break;
147 }
148
149 if json.evt.as_deref() == Some("ERROR") && show_errors {
150 eprintln!("Discord RPC ready event receiver error: {:?}", json.data);
151 }
152 }
153 }
154
155 // restore last activity (if any)
156 if let Some(activity) = &last_activity {
157 if let Some(t) = session_start {
158 if let Err(e) = socket.send_activity(activity.clone(), t).await {
159 if show_errors {
160 eprintln!("Discord RPC last activity restore error: {e}")
161 }
162 }
163 }
164 }
165
166 backoff = 1;
167
168 // generic loop for receiving commands and responding to pings from Discord itself
169 loop {
170 tokio::select! {
171 biased;
172
173 cmd = rx.recv() => {
174 match cmd {
175 Some(cmd) => {
176 match cmd {
177 IPCCommand::SetActivity { activity } => {
178 let session_start_unpacked = if let Some(s) = session_start {
179 s
180 } else {
181 let t = get_current_timestamp()?;
182 session_start = Some(t);
183 t
184 };
185
186 let activity = *activity;
187 last_activity = Some(activity.clone());
188
189 if let Err(e) = socket.send_activity(activity, session_start_unpacked).await {
190 if show_errors {
191 eprintln!("Discord RPC send_activity error: {e}");
192 }
193 break;
194 }
195 },
196 IPCCommand::ClearActivity => {
197 last_activity = None;
198 session_start = None;
199
200 if let Err(e) = socket.clear_activity().await {
201 if show_errors {
202 eprintln!("Discord RPC clear_activity error: {e}");
203 }
204 break;
205 }
206 },
207 IPCCommand::Close { done }=> {
208 let _ = socket.close().await;
209 let _ = done.send(());
210 break 'outer;
211 }
212 }
213 },
214 None => break,
215 }
216 }
217
218 frame = socket.read_frame() => {
219 match frame {
220 Ok(frame) => {
221 match frame.opcode {
222 1 => {
223 if let Ok(json) = serde_json::from_slice::<DynamicRPCFrame>(&frame.body) {
224 if json.evt.as_deref() == Some("ERROR") && show_errors {
225 eprintln!("Discord RPC DynamicRPCFrame error: {:?}", json.data);
226 } else if json.cmd.as_deref() == Some("SET_ACTIVITY") {
227 if let Some(f) = &on_activity_send {
228 if let Some(data) = json.data {
229 let data: ActivityResponseData = serde_json::from_value(data)?;
230 f(data)
231 }
232 }
233 }
234 }
235 }
236 2 => break,
237 3 => {
238 if let Err(e) = socket.send_frame(3, frame.body).await {
239 if show_errors {
240 eprintln!("Discord RPC send_frame error: {e}");
241 }
242 break;
243 }
244 }
245 _ => {}
246 }
247 },
248 Err(e) => {
249 if show_errors {
250 eprintln!("Discord RPC generic frame read error: {e}")
251 }
252 break;
253 },
254 }
255 }
256 }
257 }
258
259 sleep(Duration::from_secs(backoff)).await;
260 backoff = (backoff * 2).min(4);
261 }
262
263 Ok(())
264 });
265
266 self.join_handle = Some(join_handle);
267
268 if wait_for_ready {
269 match ready_rx.await {
270 Ok(()) => (),
271 Err(_) => bail!("Background task exited before READY."),
272 }
273 }
274
275 Ok(&self.client)
276 }
277
278 /// Returns a clone of the client handle for sharing.
279 #[must_use]
280 pub fn clone_handle(&self) -> PresenceClient {
281 self.client.clone()
282 }
283
284 /// Waits for the IPC task to finish.
285 pub async fn wait(&mut self) -> Result<()> {
286 if let Some(handle) = self.join_handle.take() {
287 handle.await??;
288 }
289
290 Ok(())
291 }
292}