1use std::{
2 sync::{atomic::Ordering, Arc},
3 thread::{JoinHandle, Thread},
4 time::Duration,
5};
6
7use crate::{
8 connection::Manager as ConnectionManager,
9 event_handler::{Context as EventContext, EventCallbackHandle, HandlerRegistry},
10 models::{
11 commands::{Subscription, SubscriptionArgs},
12 message::Message,
13 payload::Payload,
14 rich_presence::{
15 Activity, CloseActivityRequestArgs, SendActivityJoinInviteArgs, SetActivityArgs,
16 },
17 Command, Event, OpCode,
18 },
19 DiscordError, Result,
20};
21use crossbeam_channel::Sender;
22use serde::{de::DeserializeOwned, Serialize};
23use serde_json::Value;
24
25macro_rules! event_handler_function {
26 ( $( $name:ident, $event:expr ),* ) => {
27 event_handler_function!{@gen $([ $name, $event])*}
28 };
29
30 (@gen $( [ $name:ident, $event:expr ] ), *) => {
31 $(
32 #[doc = concat!("Listens for the `", stringify!($event), "` event")]
33 pub fn $name<F>(&self, handler: F) -> EventCallbackHandle
34 where F: Fn(EventContext) + 'static + Send + Sync
35 {
36 self.on_event($event, handler)
37 }
38 )*
39 }
40}
41
42#[allow(clippy::module_name_repetitions)]
44pub struct ClientThread(JoinHandle<()>, Sender<()>);
45
46impl ClientThread {
47 #[allow(clippy::missing_errors_doc)]
49 pub fn join(self) -> std::thread::Result<()> {
51 self.0.join()
52 }
53
54 #[allow(clippy::missing_errors_doc)]
56 #[must_use]
57 pub fn is_finished(&self) -> bool {
59 self.0.is_finished()
60 }
61 #[allow(clippy::missing_errors_doc)]
63 #[must_use]
64 pub fn thread(&self) -> &Thread {
66 self.0.thread()
67 }
68
69 pub fn stop(self) -> Result<()> {
75 self.1.send(())?;
77
78 self.join().map_err(|_| DiscordError::EventLoopError)?;
79
80 Ok(())
81 }
82
83 pub fn persist(self) {
85 std::mem::forget(self);
86 }
87}
88
89#[derive(Clone)]
90pub struct Client {
92 connection_manager: ConnectionManager,
93 event_handler_registry: Arc<HandlerRegistry>,
94 thread: Option<Arc<ClientThread>>,
95}
96
97impl Client {
98 #[must_use]
100 pub fn new(client_id: u64) -> Self {
101 Self::with_error_config(client_id, Duration::from_secs(5), None)
102 }
103
104 #[must_use]
106 pub fn with_error_config(
107 client_id: u64,
108 sleep_duration: Duration,
109 attempts: Option<usize>,
110 ) -> Self {
111 let event_handler_registry = Arc::new(HandlerRegistry::new());
112 let connection_manager = ConnectionManager::new(
113 client_id,
114 event_handler_registry.clone(),
115 sleep_duration,
116 attempts,
117 );
118
119 Self {
120 connection_manager,
121 event_handler_registry,
122 thread: None,
123 }
124 }
125
126 pub fn start(&mut self) {
133 let (tx, rx) = crossbeam_channel::bounded::<()>(1);
135
136 let thread = self.connection_manager.start(rx);
137
138 self.thread = Some(Arc::new(ClientThread(thread, tx)));
139 }
140
141 pub fn shutdown(self) -> Result<()> {
147 if let Some(thread) = self.thread.as_ref() {
148 thread.1.send(())?;
149
150 crate::READY.store(false, Ordering::Relaxed);
151
152 self.block_on()
153 } else {
154 Err(DiscordError::NotStarted)
155 }
156 }
157
158 pub fn block_on(mut self) -> Result<()> {
168 let thread = self.unwrap_thread()?;
169
170 thread.join().map_err(|_| DiscordError::ThreadError)?;
173
174 Ok(())
175 }
176
177 fn unwrap_thread(&mut self) -> Result<ClientThread> {
178 if let Some(thread) = self.thread.take() {
179 let thread = Arc::try_unwrap(thread).map_err(|_| DiscordError::ThreadInUse)?;
180
181 Ok(thread)
182 } else {
183 Err(DiscordError::NotStarted)
184 }
185 }
186
187 #[must_use]
188 pub fn is_ready() -> bool {
190 crate::READY.load(Ordering::Relaxed)
191 }
192
193 fn execute<A, E>(&mut self, cmd: Command, args: A, evt: Option<Event>) -> Result<Payload<E>>
194 where
195 A: Serialize + Send + Sync,
196 E: Serialize + DeserializeOwned + Send + Sync,
197 {
198 if !crate::READY.load(Ordering::Relaxed) {
199 return Err(DiscordError::NotStarted);
200 }
201
202 trace!("Executing command: {cmd:?}");
203
204 let message = Message::new(
205 OpCode::Frame,
206 Payload::with_nonce(cmd, Some(args), None, evt),
207 );
208 self.connection_manager.send(message?)?;
209 let Message { payload, .. } = self.connection_manager.recv()?;
210 trace!("Received response payload: {payload}");
211 let response: Payload<E> = serde_json::from_str(&payload)?;
212 trace!("Parsed response payload.");
213
214 match response.evt {
215 Some(Event::Error) => Err(DiscordError::SubscriptionFailed),
216 _ => Ok(response),
217 }
218 }
219
220 pub fn set_activity<F>(&mut self, f: F) -> Result<Payload<Activity>>
225 where
226 F: FnOnce(Activity) -> Activity,
227 {
228 self.execute(Command::SetActivity, SetActivityArgs::new(f), None)
229 }
230
231 pub fn clear_activity(&mut self) -> Result<Payload<Activity>> {
236 self.execute(Command::SetActivity, SetActivityArgs::default(), None)
237 }
238
239 pub fn send_activity_join_invite(&mut self, user_id: u64) -> Result<Payload<Value>> {
247 self.execute(
248 Command::SendActivityJoinInvite,
249 SendActivityJoinInviteArgs::new(user_id),
250 None,
251 )
252 }
253
254 pub fn close_activity_request(&mut self, user_id: u64) -> Result<Payload<Value>> {
259 self.execute(
260 Command::CloseActivityRequest,
261 CloseActivityRequestArgs::new(user_id),
262 None,
263 )
264 }
265
266 pub fn subscribe<F>(&mut self, evt: Event, f: F) -> Result<Payload<Subscription>>
271 where
272 F: FnOnce(SubscriptionArgs) -> SubscriptionArgs,
273 {
274 self.execute(Command::Subscribe, f(SubscriptionArgs::new()), Some(evt))
275 }
276
277 pub fn unsubscribe<F>(&mut self, evt: Event, f: F) -> Result<Payload<Subscription>>
282 where
283 F: FnOnce(SubscriptionArgs) -> SubscriptionArgs,
284 {
285 self.execute(Command::Unsubscribe, f(SubscriptionArgs::new()), Some(evt))
286 }
287
288 pub fn on_event<F>(&self, event: Event, handler: F) -> EventCallbackHandle
349 where
350 F: Fn(EventContext) + 'static + Send + Sync,
351 {
352 self.event_handler_registry.register(event, handler)
353 }
354
355 pub fn block_until_event(&mut self, event: Event) -> Result<crate::event_handler::Context> {
367 let (tx, rx) = crossbeam_channel::unbounded::<crate::event_handler::Context>();
369
370 let handler = move |info| {
371 if let Err(e) = tx.send(info) {
373 error!("{e}");
374 }
375 };
376
377 let cb_handle = self.on_event(event, handler);
379
380 let response = rx.recv()?;
381
382 drop(cb_handle);
383
384 Ok(response)
385 }
386
387 event_handler_function!(on_ready, Event::Ready);
388
389 event_handler_function!(on_error, Event::Error);
390
391 event_handler_function!(on_activity_join, Event::ActivityJoin);
392
393 event_handler_function!(on_activity_join_request, Event::ActivityJoinRequest);
394
395 event_handler_function!(on_activity_spectate, Event::ActivitySpectate);
396
397 event_handler_function!(on_connected, Event::Connected);
398
399 event_handler_function!(on_disconnected, Event::Disconnected);
400}
401
402#[cfg(test)]
403mod tests {
404 use super::*;
405
406 #[test]
407 fn test_is_ready() {
408 assert!(!Client::is_ready());
409
410 crate::READY.store(true, Ordering::Relaxed);
411
412 assert!(Client::is_ready());
413 }
414}