1use crate::client::{Client, ConnectParams, Message, ReconnectConfig, ReconnectHandler};
3use crate::events::Event;
4use std::mem;
5
/// Owned connection parameters.
///
/// Carries the same four fields that [`ConnectParamsOwned::as_params`] copies
/// into a borrowed `ConnectParams`, but stores the host as an owned `String`
/// so a value can be kept without borrowing from the caller.
///
/// `Debug`, `PartialEq` and `Eq` are derived so values can be logged and
/// compared (for example with `assert_eq!`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ConnectParamsOwned {
    /// Host to connect to.
    pub host: String,
    /// Forwarded unchanged as `ConnectParams::tcp`
    /// (presumably the TCP port — confirm against `ConnectParams`).
    pub tcp: i32,
    /// Forwarded unchanged as `ConnectParams::udp`
    /// (presumably the UDP port — confirm against `ConnectParams`).
    pub udp: i32,
    /// Forwarded unchanged as `ConnectParams::encrypted`.
    pub encrypted: bool,
}
14
15impl ConnectParamsOwned {
16 pub fn new(host: impl Into<String>, tcp: i32, udp: i32, encrypted: bool) -> Self {
18 Self {
19 host: host.into(),
20 tcp,
21 udp,
22 encrypted,
23 }
24 }
25
26 pub fn as_params(&self) -> ConnectParams<'_> {
28 ConnectParams {
29 host: &self.host,
30 tcp: self.tcp,
31 udp: self.udp,
32 encrypted: self.encrypted,
33 }
34 }
35}
36
37impl From<ConnectParams<'_>> for ConnectParamsOwned {
38 fn from(params: ConnectParams<'_>) -> Self {
39 Self::new(params.host, params.tcp, params.udp, params.encrypted)
40 }
41}
42
43#[derive(Clone)]
44pub struct ReconnectSettings {
46 pub params: ConnectParamsOwned,
47 pub config: ReconnectConfig,
48 pub extra_events: Vec<Event>,
49}
50
51impl ReconnectSettings {
52 pub fn new(params: ConnectParamsOwned, config: ReconnectConfig) -> Self {
54 Self {
55 params,
56 config,
57 extra_events: Vec::new(),
58 }
59 }
60
61 pub fn with_extra_events(mut self, extra_events: Vec<Event>) -> Self {
63 self.extra_events = extra_events;
64 self
65 }
66}
67
68#[derive(Clone)]
69pub struct ClientConfig {
71 pub poll_timeout_ms: i32,
72 pub reconnect: Option<ReconnectSettings>,
73}
74
75impl Default for ClientConfig {
76 fn default() -> Self {
77 Self {
78 poll_timeout_ms: 100,
79 reconnect: None,
80 }
81 }
82}
83
84impl ClientConfig {
85 pub fn new() -> Self {
87 Self::default()
88 }
89
90 pub fn poll_timeout_ms(mut self, timeout_ms: i32) -> Self {
92 self.poll_timeout_ms = timeout_ms;
93 self
94 }
95
96 pub fn reconnect(mut self, params: ConnectParamsOwned, config: ReconnectConfig) -> Self {
98 self.reconnect = Some(ReconnectSettings::new(params, config));
99 self
100 }
101
102 pub fn reconnect_with_events(
104 mut self,
105 params: ConnectParamsOwned,
106 config: ReconnectConfig,
107 extra_events: Vec<Event>,
108 ) -> Self {
109 self.reconnect =
110 Some(ReconnectSettings::new(params, config).with_extra_events(extra_events));
111 self
112 }
113
114 pub fn without_reconnect(mut self) -> Self {
115 self.reconnect = None;
116 self
117 }
118}
119
/// Result of dispatching: whether the dispatch loop should keep going.
///
/// `Debug` is derived so values can be printed and used with `assert_eq!`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DispatchFlow {
    /// Keep polling and dispatching.
    Continue,
    /// Stop the dispatch loop.
    Stop,
}
126
127#[derive(Clone, Copy)]
129pub struct EventContext<'a> {
130 event: Event,
131 message: &'a Message,
132 client: Option<&'a Client>,
133}
134
135impl<'a> EventContext<'a> {
136 pub fn event(&self) -> Event {
138 self.event
139 }
140
141 pub fn message(&self) -> &Message {
143 self.message
144 }
145
146 pub fn client(&self) -> Option<&Client> {
148 self.client
149 }
150}
151
152pub trait EventSource {
154 fn poll(&mut self, timeout_ms: i32) -> Option<(Event, Message)>;
156 fn client(&self) -> Option<&Client>;
158}
159
160impl EventSource for Client {
161 fn poll(&mut self, timeout_ms: i32) -> Option<(Event, Message)> {
162 Client::poll(self, timeout_ms)
163 }
164
165 fn client(&self) -> Option<&Client> {
166 Some(self)
167 }
168}
169
170impl EventSource for &Client {
171 fn poll(&mut self, timeout_ms: i32) -> Option<(Event, Message)> {
172 (*self).poll(timeout_ms)
173 }
174
175 fn client(&self) -> Option<&Client> {
176 Some(*self)
177 }
178}
179
180type HandlerFn = Box<dyn for<'a> FnMut(EventContext<'a>) -> DispatchFlow + Send>;
181
182struct HandlerEntry {
183 event: Option<Event>,
184 handler: HandlerFn,
185}
186
187impl HandlerEntry {
188 fn matches(&self, event: &Event) -> bool {
189 match &self.event {
190 Some(e) => mem::discriminant(e) == mem::discriminant(event),
191 None => true,
192 }
193 }
194}
195
196struct ReconnectState {
197 params: ConnectParamsOwned,
198 handler: ReconnectHandler,
199 extra_events: Vec<Event>,
200}
201
202impl ReconnectState {
203 fn new(settings: ReconnectSettings) -> Self {
204 Self {
205 handler: ReconnectHandler::new(settings.config),
206 params: settings.params,
207 extra_events: settings.extra_events,
208 }
209 }
210
211 fn on_event(&mut self, client: Option<&Client>, event: &Event) {
212 if matches!(event, Event::ConnectSuccess) {
213 self.handler.mark_connected();
214 }
215 if event.is_reconnect_needed_with(&self.extra_events) {
216 self.handler.mark_disconnected();
217 if let Some(client) = client {
218 let params = self.params.as_params();
219 client.handle_reconnect(¶ms, &mut self.handler);
220 }
221 }
222 }
223}
224
225pub struct Dispatcher<S: EventSource> {
226 source: S,
227 handlers: Vec<HandlerEntry>,
228 poll_timeout_ms: i32,
229 reconnect: Option<ReconnectState>,
230 stop: bool,
231}
232
233impl<S: EventSource> Dispatcher<S> {
234 pub fn new(source: S) -> Self {
236 Self::with_config(source, ClientConfig::default())
237 }
238
239 pub fn with_config(source: S, config: ClientConfig) -> Self {
241 let reconnect = config.reconnect.map(ReconnectState::new);
242 Self {
243 source,
244 handlers: Vec::new(),
245 poll_timeout_ms: config.poll_timeout_ms,
246 reconnect,
247 stop: false,
248 }
249 }
250
251 pub fn source(&self) -> &S {
253 &self.source
254 }
255
256 pub fn source_mut(&mut self) -> &mut S {
258 &mut self.source
259 }
260
261 pub fn add_handler<F>(&mut self, event: Event, handler: F)
263 where
264 F: for<'a> FnMut(EventContext<'a>) -> DispatchFlow + Send + 'static,
265 {
266 self.handlers.push(HandlerEntry {
267 event: Some(event),
268 handler: Box::new(handler),
269 });
270 }
271
272 pub fn add_handler_any<F>(&mut self, handler: F)
274 where
275 F: for<'a> FnMut(EventContext<'a>) -> DispatchFlow + Send + 'static,
276 {
277 self.handlers.push(HandlerEntry {
278 event: None,
279 handler: Box::new(handler),
280 });
281 }
282
283 pub fn on_event<F>(mut self, event: Event, handler: F) -> Self
285 where
286 F: for<'a> FnMut(EventContext<'a>) -> DispatchFlow + Send + 'static,
287 {
288 self.add_handler(event, handler);
289 self
290 }
291
292 pub fn on_any<F>(mut self, handler: F) -> Self
294 where
295 F: for<'a> FnMut(EventContext<'a>) -> DispatchFlow + Send + 'static,
296 {
297 self.add_handler_any(handler);
298 self
299 }
300
301 pub fn on_user_joined<F>(self, handler: F) -> Self
303 where
304 F: for<'a> FnMut(EventContext<'a>) -> DispatchFlow + Send + 'static,
305 {
306 self.on_event(Event::UserJoined, handler)
307 }
308
309 pub fn on_user_left<F>(self, handler: F) -> Self
311 where
312 F: for<'a> FnMut(EventContext<'a>) -> DispatchFlow + Send + 'static,
313 {
314 self.on_event(Event::UserLeft, handler)
315 }
316
317 pub fn on_text_message<F>(self, handler: F) -> Self
319 where
320 F: for<'a> FnMut(EventContext<'a>) -> DispatchFlow + Send + 'static,
321 {
322 self.on_event(Event::TextMessage, handler)
323 }
324
325 pub fn on_connect_success<F>(self, handler: F) -> Self
327 where
328 F: for<'a> FnMut(EventContext<'a>) -> DispatchFlow + Send + 'static,
329 {
330 self.on_event(Event::ConnectSuccess, handler)
331 }
332
333 pub fn on_connection_lost<F>(self, handler: F) -> Self
335 where
336 F: for<'a> FnMut(EventContext<'a>) -> DispatchFlow + Send + 'static,
337 {
338 self.on_event(Event::ConnectionLost, handler)
339 }
340
341 pub fn on_connect_failed<F>(self, handler: F) -> Self
343 where
344 F: for<'a> FnMut(EventContext<'a>) -> DispatchFlow + Send + 'static,
345 {
346 self.on_event(Event::ConnectFailed, handler)
347 }
348
349 pub fn on_command_error<F>(self, handler: F) -> Self
351 where
352 F: for<'a> FnMut(EventContext<'a>) -> DispatchFlow + Send + 'static,
353 {
354 self.on_event(Event::CmdError, handler)
355 }
356
357 pub fn stop(&mut self) {
359 self.stop = true;
360 }
361
362 pub fn run(&mut self) -> DispatchFlow {
364 self.run_with_timeout(self.poll_timeout_ms)
365 }
366
367 pub fn run_with_timeout(&mut self, timeout_ms: i32) -> DispatchFlow {
369 while !self.stop {
370 let flow = self.step(timeout_ms);
371 if flow == DispatchFlow::Stop {
372 self.stop = true;
373 }
374 }
375 DispatchFlow::Stop
376 }
377
378 pub fn step(&mut self, timeout_ms: i32) -> DispatchFlow {
380 match self.source.poll(timeout_ms) {
381 Some((event, message)) => self.process_event(event, message),
382 None => DispatchFlow::Continue,
383 }
384 }
385
386 fn process_event(&mut self, event: Event, message: Message) -> DispatchFlow {
387 let client = self.source.client();
388 if let Some(reconnect) = self.reconnect.as_mut() {
389 reconnect.on_event(client, &event);
390 }
391 #[cfg(feature = "logging")]
392 crate::logging::event(&event, &message);
393 let ctx = EventContext {
394 event,
395 message: &message,
396 client,
397 };
398 let mut flow = DispatchFlow::Continue;
399 for handler in self.handlers.iter_mut() {
400 if handler.matches(&event) && (handler.handler)(ctx) == DispatchFlow::Stop {
401 flow = DispatchFlow::Stop;
402 }
403 }
404 flow
405 }
406}