discord_presence/
client.rs

1use std::{
2    sync::{atomic::Ordering, Arc},
3    thread::{JoinHandle, Thread},
4    time::Duration,
5};
6
7use crate::{
8    connection::Manager as ConnectionManager,
9    event_handler::{Context as EventContext, EventCallbackHandle, HandlerRegistry},
10    models::{
11        commands::{Subscription, SubscriptionArgs},
12        message::Message,
13        payload::Payload,
14        rich_presence::{
15            Activity, CloseActivityRequestArgs, SendActivityJoinInviteArgs, SetActivityArgs,
16        },
17        Command, Event, OpCode,
18    },
19    DiscordError, Result,
20};
21use crossbeam_channel::Sender;
22use serde::{de::DeserializeOwned, Serialize};
23use serde_json::Value;
24
/// Generates thin `on_*` convenience methods that forward to `on_event`.
///
/// Invoke inside an `impl` block whose type provides an
/// `on_event(&self, event, handler) -> EventCallbackHandle` method, passing one
/// or more comma-separated `method_name, event_expression` pairs. A trailing
/// comma is accepted.
macro_rules! event_handler_function {
    // Public entry point: wrap every pair in brackets and hand the
    // comma-separated list to the generator arm below. The separator emitted
    // here must match the one the `@gen` arm expects, otherwise invocations
    // with more than one pair fail to expand.
    ( $( $name:ident, $event:expr ),* $(,)? ) => {
        event_handler_function!{@gen $([ $name, $event ]),*}
    };

    // Internal generator: emits one listener-registration method per pair.
    (@gen $( [ $name:ident, $event:expr ] ),*) => {
        $(
            #[doc = concat!("Listens for the `", stringify!($event), "` event")]
            pub fn $name<F>(&self, handler: F) -> EventCallbackHandle
                where F: Fn(EventContext) + 'static + Send + Sync
            {
                self.on_event($event, handler)
            }
        )*
    }
}
41
42/// Wrapper around the [`JoinHandle`] returned by [`Client::start`]
43#[allow(clippy::module_name_repetitions)]
44pub struct ClientThread(JoinHandle<()>, Sender<()>);
45
46impl ClientThread {
47    // Ignore missing error docs because it's an alias of `join`
48    #[allow(clippy::missing_errors_doc)]
49    /// Alias of [`JoinHandle::join()`]
50    pub fn join(self) -> std::thread::Result<()> {
51        self.0.join()
52    }
53
54    // Ignore missing error docs because it's an alias of `is_finished`
55    #[allow(clippy::missing_errors_doc)]
56    #[must_use]
57    /// Alias of [`JoinHandle::is_finished`]
58    pub fn is_finished(&self) -> bool {
59        self.0.is_finished()
60    }
61    // Ignore missing error docs because it's an alias of `thread`
62    #[allow(clippy::missing_errors_doc)]
63    #[must_use]
64    /// Alias of [`JoinHandle::thread`]
65    pub fn thread(&self) -> &Thread {
66        self.0.thread()
67    }
68
69    /// Attempt to stop the client's send and receive loop
70    ///
71    /// # Errors
72    /// - Failed to send stop message (maybe it has already stopped?)
73    /// - The event loop had its own error
74    pub fn stop(self) -> Result<()> {
75        // Attempt to send the message to stop the thread
76        self.1.send(())?;
77
78        self.join().map_err(|_| DiscordError::EventLoopError)?;
79
80        Ok(())
81    }
82
83    /// "Forgets" client thread, removing the variable, but keeping the client running indefinitely.
84    pub fn persist(self) {
85        std::mem::forget(self);
86    }
87}
88
89#[derive(Clone)]
90/// The Discord client
91pub struct Client {
92    connection_manager: ConnectionManager,
93    event_handler_registry: Arc<HandlerRegistry>,
94    thread: Option<Arc<ClientThread>>,
95}
96
97impl Client {
98    /// Creates a new `Client` with default error sleep duration of 5 seconds, and no limit on connection attempts
99    #[must_use]
100    pub fn new(client_id: u64) -> Self {
101        Self::with_error_config(client_id, Duration::from_secs(5), None)
102    }
103
104    /// Creates a new `Client` with a custom error sleep duration, and number of attempts
105    #[must_use]
106    pub fn with_error_config(
107        client_id: u64,
108        sleep_duration: Duration,
109        attempts: Option<usize>,
110    ) -> Self {
111        let event_handler_registry = Arc::new(HandlerRegistry::new());
112        let connection_manager = ConnectionManager::new(
113            client_id,
114            event_handler_registry.clone(),
115            sleep_duration,
116            attempts,
117        );
118
119        Self {
120            connection_manager,
121            event_handler_registry,
122            thread: None,
123        }
124    }
125
126    // TODO: Add examples
127    /// Start the connection manager
128    ///
129    /// Only join the thread if there is no other task keeping the program alive.
130    ///
131    /// This must be called before all and any actions such as `set_activity`
132    pub fn start(&mut self) {
133        // Shutdown notify channel
134        let (tx, rx) = crossbeam_channel::bounded::<()>(1);
135
136        let thread = self.connection_manager.start(rx);
137
138        self.thread = Some(Arc::new(ClientThread(thread, tx)));
139    }
140
141    /// Shutdown the client and its thread
142    ///
143    /// # Errors
144    /// - The internal connection thread ran into an error
145    /// - The client was not started, or has already been shutdown
146    pub fn shutdown(self) -> Result<()> {
147        if let Some(thread) = self.thread.as_ref() {
148            thread.1.send(())?;
149
150            crate::READY.store(false, Ordering::Relaxed);
151
152            self.block_on()
153        } else {
154            Err(DiscordError::NotStarted)
155        }
156    }
157
158    /// Block indefinitely until the client shuts down
159    ///
160    /// This is nearly the same as [`Client::shutdown()`],
161    /// except that it does not attempt to stop the internal thread,
162    /// and rather waits for it to finish, which could never happen.
163    ///
164    /// # Errors
165    /// - The internal connection thread ran into an error
166    /// - The client was not started, or has already been shutdown
167    pub fn block_on(mut self) -> Result<()> {
168        let thread = self.unwrap_thread()?;
169
170        // If into_inner succeeds, await the thread completing.
171        // Otherwise, the thread will be dropped and shut down anyway
172        thread.join().map_err(|_| DiscordError::ThreadError)?;
173
174        Ok(())
175    }
176
177    fn unwrap_thread(&mut self) -> Result<ClientThread> {
178        if let Some(thread) = self.thread.take() {
179            let thread = Arc::try_unwrap(thread).map_err(|_| DiscordError::ThreadInUse)?;
180
181            Ok(thread)
182        } else {
183            Err(DiscordError::NotStarted)
184        }
185    }
186
187    #[must_use]
188    /// Check if the client is ready
189    pub fn is_ready() -> bool {
190        crate::READY.load(Ordering::Relaxed)
191    }
192
193    fn execute<A, E>(&mut self, cmd: Command, args: A, evt: Option<Event>) -> Result<Payload<E>>
194    where
195        A: Serialize + Send + Sync,
196        E: Serialize + DeserializeOwned + Send + Sync,
197    {
198        if !crate::READY.load(Ordering::Relaxed) {
199            return Err(DiscordError::NotStarted);
200        }
201
202        trace!("Executing command: {cmd:?}");
203
204        let message = Message::new(
205            OpCode::Frame,
206            Payload::with_nonce(cmd, Some(args), None, evt),
207        );
208        self.connection_manager.send(message?)?;
209        let Message { payload, .. } = self.connection_manager.recv()?;
210        trace!("Received response payload: {payload}");
211        let response: Payload<E> = serde_json::from_str(&payload)?;
212        trace!("Parsed response payload.");
213
214        match response.evt {
215            Some(Event::Error) => Err(DiscordError::SubscriptionFailed),
216            _ => Ok(response),
217        }
218    }
219
220    /// Set the users current activity
221    ///
222    /// # Errors
223    /// - See [`DiscordError`] for more info
224    pub fn set_activity<F>(&mut self, f: F) -> Result<Payload<Activity>>
225    where
226        F: FnOnce(Activity) -> Activity,
227    {
228        self.execute(Command::SetActivity, SetActivityArgs::new(f), None)
229    }
230
231    /// Clear the users current activity
232    ///
233    /// # Errors
234    /// - See [`DiscordError`] for more info
235    pub fn clear_activity(&mut self) -> Result<Payload<Activity>> {
236        self.execute(Command::SetActivity, SetActivityArgs::default(), None)
237    }
238
239    // NOTE: Not sure what the actual response values of
240    //       SEND_ACTIVITY_JOIN_INVITE and CLOSE_ACTIVITY_REQUEST are,
241    //       they are not documented.
242    /// Send an invite to a user to join a game
243    ///
244    /// # Errors
245    /// - See [`DiscordError`] for more info
246    pub fn send_activity_join_invite(&mut self, user_id: u64) -> Result<Payload<Value>> {
247        self.execute(
248            Command::SendActivityJoinInvite,
249            SendActivityJoinInviteArgs::new(user_id),
250            None,
251        )
252    }
253
254    /// Close request to join a game
255    ///
256    /// # Errors
257    /// - See [`DiscordError`] for more info
258    pub fn close_activity_request(&mut self, user_id: u64) -> Result<Payload<Value>> {
259        self.execute(
260            Command::CloseActivityRequest,
261            CloseActivityRequestArgs::new(user_id),
262            None,
263        )
264    }
265
266    /// Subscribe to a given event
267    ///
268    /// # Errors
269    /// - See [`DiscordError`] for more info
270    pub fn subscribe<F>(&mut self, evt: Event, f: F) -> Result<Payload<Subscription>>
271    where
272        F: FnOnce(SubscriptionArgs) -> SubscriptionArgs,
273    {
274        self.execute(Command::Subscribe, f(SubscriptionArgs::new()), Some(evt))
275    }
276
277    /// Unsubscribe from a given event
278    ///
279    /// # Errors
280    /// - See [`DiscordError`] for more info
281    pub fn unsubscribe<F>(&mut self, evt: Event, f: F) -> Result<Payload<Subscription>>
282    where
283        F: FnOnce(SubscriptionArgs) -> SubscriptionArgs,
284    {
285        self.execute(Command::Unsubscribe, f(SubscriptionArgs::new()), Some(evt))
286    }
287
288    /// Listens for a given event, and returns a handle that unregisters the listener when it is dropped.
289    ///
290    /// # Examples
291    ///
292    /// ```no_run
293    /// # use std::{thread::sleep, time::Duration};
294    /// # use discord_presence::Client;
295    /// let mut drpc = Client::new(1003450375732482138);
296    /// let _ready = drpc.on_ready(|_ctx| {
297    ///     println!("READY!");
298    /// });
299    ///
300    /// drpc.start();
301    ///
302    /// {
303    ///     let _ready_first_3_seconds = drpc.on_ready(|_ctx| {
304    ///         println!("READY, IN THE FIRST 3 SECONDS!");
305    ///     });
306    ///     sleep(Duration::from_secs(3));
307    /// }
308    ///
309    /// // You can also manually remove the handler
310    ///
311    /// let never_ready = drpc.on_ready(|_ctx| {
312    ///     println!("I will never be ready!");
313    /// });
314    /// never_ready.remove();
315    ///
316    /// // Or via [`std::mem::drop`]
317    /// let never_ready = drpc.on_ready(|_ctx| {
318    ///     println!("I will never be ready!");
319    /// });
320    /// drop(never_ready);
321    ///
322    /// drpc.block_on().unwrap();
323    /// ```
324    ///
325    /// You can use `.persist` or [`std::mem::forget`] to disable the automatic unregister-on-drop:
326    ///
327    /// ```no_run
328    /// # use discord_presence::Client;
329    /// # let mut drpc = Client::new(1003450375732482138);
330    ///
331    /// {
332    ///     let ready = drpc.on_ready(|_ctx| {
333    ///         println!("READY!");
334    ///     }).persist();
335    /// }
336    /// // Or
337    /// {
338    ///     let ready = drpc.on_ready(|_ctx| {
339    ///         println!("READY!");
340    ///     });
341    ///     std::mem::forget(ready);
342    /// }
343    /// // the event listener is still registered
344    ///
345    /// # drpc.start();
346    /// # drpc.block_on().unwrap();
347    /// ```
348    pub fn on_event<F>(&self, event: Event, handler: F) -> EventCallbackHandle
349    where
350        F: Fn(EventContext) + 'static + Send + Sync,
351    {
352        self.event_handler_registry.register(event, handler)
353    }
354
355    /// Block the current thread until the event is fired
356    ///
357    /// Returns the context the event was fired in
358    ///
359    /// NOTE: Please only use this for the ready event, or if you know what you are doing.
360    ///
361    /// # Errors
362    /// - Channel disconnected
363    ///
364    /// # Panics
365    /// - Panics if the channel is disconnected for whatever reason.
366    pub fn block_until_event(&mut self, event: Event) -> Result<crate::event_handler::Context> {
367        // TODO: Use bounded channel
368        let (tx, rx) = crossbeam_channel::unbounded::<crate::event_handler::Context>();
369
370        let handler = move |info| {
371            // dbg!("Blocked until at ", std::time::SystemTime::now());
372            if let Err(e) = tx.send(info) {
373                error!("{e}");
374            }
375        };
376
377        // `handler` is automatically unregistered once this variable drops
378        let cb_handle = self.on_event(event, handler);
379
380        let response = rx.recv()?;
381
382        drop(cb_handle);
383
384        Ok(response)
385    }
386
387    event_handler_function!(on_ready, Event::Ready);
388
389    event_handler_function!(on_error, Event::Error);
390
391    event_handler_function!(on_activity_join, Event::ActivityJoin);
392
393    event_handler_function!(on_activity_join_request, Event::ActivityJoinRequest);
394
395    event_handler_function!(on_activity_spectate, Event::ActivitySpectate);
396
397    event_handler_function!(on_connected, Event::Connected);
398
399    event_handler_function!(on_disconnected, Event::Disconnected);
400}
401
#[cfg(test)]
mod tests {
    use super::*;

    /// `Client::is_ready` must reflect the current value of the crate-wide `READY` flag.
    #[test]
    fn test_is_ready() {
        assert!(!Client::is_ready());

        crate::READY.store(true, Ordering::Relaxed);

        assert!(Client::is_ready());

        // Put the flag back to the value observed at the start, so other tests
        // in the same process do not see a "ready" client that was never started.
        crate::READY.store(false, Ordering::Relaxed);
    }
}