discord_presence/
client.rs

1use std::{
2    sync::{atomic::Ordering, Arc},
3    thread::{JoinHandle, Thread},
4    time::Duration,
5};
6
7use crate::{
8    connection::Manager as ConnectionManager,
9    event_handler::{Context as EventContext, EventCallbackHandle, HandlerRegistry},
10    models::{
11        commands::{Subscription, SubscriptionArgs},
12        message::Message,
13        payload::Payload,
14        rich_presence::{
15            Activity, CloseActivityRequestArgs, SendActivityJoinInviteArgs, SetActivityArgs,
16        },
17        Command, Event, OpCode,
18    },
19    DiscordError, Result,
20};
21use crossbeam_channel::Sender;
22use serde::{de::DeserializeOwned, Serialize};
23use serde_json::Value;
24
// Generates `on_<event>` convenience methods inside an `impl` block.
//
// Takes a comma separated list of `method_name, event_expression` pairs (a
// trailing comma is allowed) and, for every pair, emits a public method that
// forwards the given handler to `self.on_event` with that event.
//
// The pairs are expanded directly in a single rule. The previous two-step
// expansion forwarded the pairs without separators to an internal rule that
// required commas between them, so only single-pair invocations could expand.
macro_rules! event_handler_function {
    ( $( $name:ident, $event:expr ),* $(,)? ) => {
        $(
            #[doc = concat!("Listens for the `", stringify!($event), "` event")]
            pub fn $name<F>(&self, handler: F) -> EventCallbackHandle
            where
                F: Fn(EventContext) + 'static + Send + Sync,
            {
                self.on_event($event, handler)
            }
        )*
    };
}
41
/// Wrapper around the [`JoinHandle`] returned by [`Client::start`]
///
/// Field `0` is the join handle of the client thread; field `1` is the sending
/// half of the channel used to ask that thread to stop.
// The name intentionally repeats the module name, hence the lint exemption.
#[allow(clippy::module_name_repetitions)]
pub struct ClientThread(JoinHandle<()>, Sender<()>);
45
46impl ClientThread {
47    // Ignore missing error docs because it's an alias of `join`
48    #[allow(clippy::missing_errors_doc)]
49    /// Alias of [`JoinHandle::join()`]
50    pub fn join(self) -> std::thread::Result<()> {
51        self.0.join()
52    }
53
54    // Ignore missing error docs because it's an alias of `is_finished`
55    #[allow(clippy::missing_errors_doc)]
56    #[must_use]
57    /// Alias of [`JoinHandle::is_finished`]
58    pub fn is_finished(&self) -> bool {
59        self.0.is_finished()
60    }
61    // Ignore missing error docs because it's an alias of `thread`
62    #[allow(clippy::missing_errors_doc)]
63    #[must_use]
64    /// Alias of [`JoinHandle::thread`]
65    pub fn thread(&self) -> &Thread {
66        self.0.thread()
67    }
68
69    /// Attempt to stop the client's send and receive loop
70    ///
71    /// # Errors
72    /// - Failed to send stop message (maybe it has already stopped?)
73    /// - The event loop had its own error
74    pub fn stop(self) -> Result<()> {
75        // Attempt to send the message to stop the thread
76        self.1.send(())?;
77
78        self.join().map_err(|_| DiscordError::EventLoopError)?;
79
80        Ok(())
81    }
82
83    /// "Forgets" client thread, removing the variable, but keeping the client running indefinitely.
84    pub fn persist(self) {
85        std::mem::forget(self);
86    }
87}
88
/// The Discord client
// Cloning duplicates the handles; the `Arc` fields keep pointing at the same
// registry and the same thread wrapper.
#[derive(Clone)]
pub struct Client {
    /// Connection manager used to send and receive messages; also holds the rate limiter
    connection_manager: ConnectionManager,
    /// Registry of event callbacks; the connection manager is given another handle to it on construction
    event_handler_registry: Arc<HandlerRegistry>,
    /// Wrapper of the running client thread; `None` until [`Client::start`] is called
    thread: Option<Arc<ClientThread>>,
}
96
97impl Client {
98    /// Creates a new `Client` with default error sleep duration of 5 seconds, and no limit on connection attempts
99    #[must_use]
100    pub fn new(client_id: u64) -> Self {
101        Self::with_error_config(client_id, Duration::from_secs(5), None)
102    }
103
104    /// Creates a new `Client` with a custom error sleep duration, and number of attempts
105    #[must_use]
106    pub fn with_error_config(
107        client_id: u64,
108        sleep_duration: Duration,
109        attempts: Option<usize>,
110    ) -> Self {
111        let event_handler_registry = Arc::new(HandlerRegistry::new());
112
113        let connection_manager = ConnectionManager::new(
114            client_id,
115            event_handler_registry.clone(),
116            sleep_duration,
117            attempts,
118        );
119
120        Self {
121            connection_manager,
122            event_handler_registry,
123            thread: None,
124        }
125    }
126
127    // TODO: Add examples
128    /// Start the connection manager
129    ///
130    /// Only join the thread if there is no other task keeping the program alive.
131    ///
132    /// This must be called before all and any actions such as `set_activity`
133    pub fn start(&mut self) {
134        // Shutdown notify channel
135        let (tx, rx) = crossbeam_channel::bounded::<()>(1);
136
137        let thread = self.connection_manager.start(rx);
138
139        self.thread = Some(Arc::new(ClientThread(thread, tx)));
140    }
141
142    /// Shutdown the client and its thread
143    ///
144    /// # Errors
145    /// - The internal connection thread ran into an error
146    /// - The client was not started, or has already been shutdown
147    pub fn shutdown(self) -> Result<()> {
148        if let Some(thread) = self.thread.as_ref() {
149            thread.1.send(())?;
150
151            crate::READY.store(false, Ordering::Relaxed);
152
153            self.block_on()
154        } else {
155            Err(DiscordError::NotStarted)
156        }
157    }
158
159    /// Block indefinitely until the client shuts down
160    ///
161    /// This is nearly the same as [`Client::shutdown()`],
162    /// except that it does not attempt to stop the internal thread,
163    /// and rather waits for it to finish, which could never happen.
164    ///
165    /// # Errors
166    /// - The internal connection thread ran into an error
167    /// - The client was not started, or has already been shutdown
168    pub fn block_on(mut self) -> Result<()> {
169        let thread = self.unwrap_thread()?;
170
171        // If into_inner succeeds, await the thread completing.
172        // Otherwise, the thread will be dropped and shut down anyway
173        thread.join().map_err(|_| DiscordError::ThreadError)?;
174
175        Ok(())
176    }
177
178    fn unwrap_thread(&mut self) -> Result<ClientThread> {
179        if let Some(thread) = self.thread.take() {
180            let thread = Arc::try_unwrap(thread).map_err(|_| DiscordError::ThreadInUse)?;
181
182            Ok(thread)
183        } else {
184            Err(DiscordError::NotStarted)
185        }
186    }
187
    /// Check if the client is ready
    ///
    /// Reads the crate-wide `READY` flag; it is not tied to a particular `Client` value.
    #[must_use]
    pub fn is_ready() -> bool {
        crate::READY.load(Ordering::Relaxed)
    }
193
194    fn execute<A, E>(&mut self, cmd: Command, args: A, evt: Option<Event>) -> Result<Payload<E>>
195    where
196        A: Serialize + Send + Sync,
197        E: Serialize + DeserializeOwned + Send + Sync,
198    {
199        if !crate::READY.load(Ordering::Relaxed) {
200            return Err(DiscordError::NotStarted);
201        }
202
203        trace!("Executing command: {cmd:?}");
204
205        let message = Message::new(
206            OpCode::Frame,
207            Payload::with_nonce(cmd, Some(args), None, evt),
208        );
209        self.connection_manager.send(message?)?;
210        let Message { payload, .. } = self.connection_manager.recv()?;
211        trace!("Received response payload: {payload}");
212        let response: Payload<E> = serde_json::from_str(&payload)?;
213        trace!("Parsed response payload.");
214
215        match response.evt {
216            Some(Event::Error) => Err(DiscordError::SubscriptionFailed),
217            _ => Ok(response),
218        }
219    }
220
221    /// Set the users current activity immediately
222    ///
223    /// # Errors
224    /// - See [`DiscordError`] for more info
225    pub fn set_activity<F>(&mut self, f: F) -> Result<Payload<Activity>>
226    where
227        F: FnOnce(Activity) -> Activity,
228    {
229        let args = SetActivityArgs::new(f);
230        self.update_activity(args)
231    }
232
233    /// Clear the users current activity
234    ///
235    /// # Errors
236    /// - See [`DiscordError`] for more info
237    pub fn clear_activity(&mut self) -> Result<Payload<Activity>> {
238        self.update_activity(SetActivityArgs::default())
239    }
240
241    /// Update the current user's activity immediately
242    fn update_activity(&mut self, args: SetActivityArgs) -> Result<Payload<Activity>> {
243        let result = self.execute(Command::SetActivity, args, None);
244
245        // if the activity update was successful, mark it as sent in the rate limiter
246        if result.is_ok() {
247            self.connection_manager.rate_limiter.mark_sent();
248        }
249
250        result
251    }
252
253    /// Queue an activity update to be sent at the next rate limit interval.
254    ///
255    /// Unlike [`Client::set_activity`], this doesn't send the activity immediately.
256    /// Instead, it queues the update to be sent when the rate limiter allows.
257    /// If multiple activities are queued, only the most recent one will be sent.
258    pub fn queue_activity<F>(&mut self, f: F)
259    where
260        F: FnOnce(Activity) -> Activity,
261    {
262        self.connection_manager
263            .rate_limiter
264            .queue(SetActivityArgs::new(f));
265    }
266
267    // NOTE: Not sure what the actual response values of
268    //       SEND_ACTIVITY_JOIN_INVITE and CLOSE_ACTIVITY_REQUEST are,
269    //       they are not documented.
270    /// Send an invite to a user to join a game
271    ///
272    /// # Errors
273    /// - See [`DiscordError`] for more info
274    pub fn send_activity_join_invite(&mut self, user_id: u64) -> Result<Payload<Value>> {
275        self.execute(
276            Command::SendActivityJoinInvite,
277            SendActivityJoinInviteArgs::new(user_id),
278            None,
279        )
280    }
281
282    /// Close request to join a game
283    ///
284    /// # Errors
285    /// - See [`DiscordError`] for more info
286    pub fn close_activity_request(&mut self, user_id: u64) -> Result<Payload<Value>> {
287        self.execute(
288            Command::CloseActivityRequest,
289            CloseActivityRequestArgs::new(user_id),
290            None,
291        )
292    }
293
294    /// Subscribe to a given event
295    ///
296    /// # Errors
297    /// - See [`DiscordError`] for more info
298    pub fn subscribe<F>(&mut self, evt: Event, f: F) -> Result<Payload<Subscription>>
299    where
300        F: FnOnce(SubscriptionArgs) -> SubscriptionArgs,
301    {
302        self.execute(Command::Subscribe, f(SubscriptionArgs::new()), Some(evt))
303    }
304
305    /// Unsubscribe from a given event
306    ///
307    /// # Errors
308    /// - See [`DiscordError`] for more info
309    pub fn unsubscribe<F>(&mut self, evt: Event, f: F) -> Result<Payload<Subscription>>
310    where
311        F: FnOnce(SubscriptionArgs) -> SubscriptionArgs,
312    {
313        self.execute(Command::Unsubscribe, f(SubscriptionArgs::new()), Some(evt))
314    }
315
316    /// Listens for a given event, and returns a handle that unregisters the listener when it is dropped.
317    ///
318    /// # Examples
319    ///
320    /// ```no_run
321    /// # use std::{thread::sleep, time::Duration};
322    /// # use discord_presence::Client;
323    /// let mut drpc = Client::new(1003450375732482138);
324    /// let _ready = drpc.on_ready(|_ctx| {
325    ///     println!("READY!");
326    /// });
327    ///
328    /// drpc.start();
329    ///
330    /// {
331    ///     let _ready_first_3_seconds = drpc.on_ready(|_ctx| {
332    ///         println!("READY, IN THE FIRST 3 SECONDS!");
333    ///     });
334    ///     sleep(Duration::from_secs(3));
335    /// }
336    ///
337    /// // You can also manually remove the handler
338    ///
339    /// let never_ready = drpc.on_ready(|_ctx| {
340    ///     println!("I will never be ready!");
341    /// });
342    /// never_ready.remove();
343    ///
344    /// // Or via [`std::mem::drop`]
345    /// let never_ready = drpc.on_ready(|_ctx| {
346    ///     println!("I will never be ready!");
347    /// });
348    /// drop(never_ready);
349    ///
350    /// drpc.block_on().unwrap();
351    /// ```
352    ///
353    /// You can use `.persist` or [`std::mem::forget`] to disable the automatic unregister-on-drop:
354    ///
355    /// ```no_run
356    /// # use discord_presence::Client;
357    /// # let mut drpc = Client::new(1003450375732482138);
358    ///
359    /// {
360    ///     let ready = drpc.on_ready(|_ctx| {
361    ///         println!("READY!");
362    ///     }).persist();
363    /// }
364    /// // Or
365    /// {
366    ///     let ready = drpc.on_ready(|_ctx| {
367    ///         println!("READY!");
368    ///     });
369    ///     std::mem::forget(ready);
370    /// }
371    /// // the event listener is still registered
372    ///
373    /// # drpc.start();
374    /// # drpc.block_on().unwrap();
375    /// ```
376    pub fn on_event<F>(&self, event: Event, handler: F) -> EventCallbackHandle
377    where
378        F: Fn(EventContext) + 'static + Send + Sync,
379    {
380        self.event_handler_registry.register(event, handler)
381    }
382
383    /// Block the current thread until the event is fired
384    ///
385    /// Returns the context the event was fired in
386    ///
387    /// NOTE: Please only use this for the ready event, or if you know what you are doing.
388    ///
389    /// # Errors
390    /// - Channel disconnected
391    ///
392    /// # Panics
393    /// - Panics if the channel is disconnected for whatever reason.
394    pub fn block_until_event(&mut self, event: Event) -> Result<crate::event_handler::Context> {
395        // TODO: Use bounded channel
396        let (tx, rx) = crossbeam_channel::unbounded::<crate::event_handler::Context>();
397
398        let handler = move |info| {
399            // dbg!("Blocked until at ", std::time::SystemTime::now());
400            if let Err(e) = tx.send(info) {
401                error!("{e}");
402            }
403        };
404
405        // `handler` is automatically unregistered once this variable drops
406        let cb_handle = self.on_event(event, handler);
407
408        let response = rx.recv()?;
409
410        drop(cb_handle);
411
412        Ok(response)
413    }
414
415    event_handler_function!(on_ready, Event::Ready);
416
417    event_handler_function!(on_error, Event::Error);
418
419    event_handler_function!(on_activity_join, Event::ActivityJoin);
420
421    event_handler_function!(on_activity_join_request, Event::ActivityJoinRequest);
422
423    event_handler_function!(on_activity_spectate, Event::ActivitySpectate);
424
425    event_handler_function!(on_connected, Event::Connected);
426
427    event_handler_function!(on_disconnected, Event::Disconnected);
428}
429
#[cfg(test)]
mod tests {
    use super::*;

    // Compile-time check: the `impl` below is only accepted when `Client`
    // satisfies the `Send + Sync` supertrait bounds. Nothing is executed.
    #[test]
    fn test_client_send_sync() {
        #[allow(dead_code)]
        trait SendSyncReq: Send + Sync {}

        impl SendSyncReq for Client {}
    }

    // Checks that `Client::is_ready` reflects the crate-wide `READY` flag.
    #[test]
    fn test_is_ready() {
        // `READY` is shared by every test in this binary, so put it back to
        // `false` when this test ends, even if an assertion fails.
        struct ResetReady;

        impl Drop for ResetReady {
            fn drop(&mut self) {
                crate::READY.store(false, Ordering::Relaxed);
            }
        }

        let _reset = ResetReady;

        assert!(!Client::is_ready());

        crate::READY.store(true, Ordering::Relaxed);

        assert!(Client::is_ready());
    }
}