Skip to main content

teamtalk/
dispatch.rs

1//! Event dispatcher built on top of `Client::poll`.
2use crate::client::{Client, ConnectParams, Message, ReconnectConfig, ReconnectHandler};
3use crate::events::Event;
4use std::mem;
5
6/// Owned connection parameters for reconnect workflows.
7#[derive(Clone)]
8pub struct ConnectParamsOwned {
9    pub host: String,
10    pub tcp: i32,
11    pub udp: i32,
12    pub encrypted: bool,
13}
14
15impl ConnectParamsOwned {
16    /// Creates owned connection parameters.
17    pub fn new(host: impl Into<String>, tcp: i32, udp: i32, encrypted: bool) -> Self {
18        Self {
19            host: host.into(),
20            tcp,
21            udp,
22            encrypted,
23        }
24    }
25
26    /// Returns a borrowed `ConnectParams` view.
27    pub fn as_params(&self) -> ConnectParams<'_> {
28        ConnectParams {
29            host: &self.host,
30            tcp: self.tcp,
31            udp: self.udp,
32            encrypted: self.encrypted,
33        }
34    }
35}
36
37impl From<ConnectParams<'_>> for ConnectParamsOwned {
38    fn from(params: ConnectParams<'_>) -> Self {
39        Self::new(params.host, params.tcp, params.udp, params.encrypted)
40    }
41}
42
43#[derive(Clone)]
44/// Reconnect settings for dispatch flows.
45pub struct ReconnectSettings {
46    pub params: ConnectParamsOwned,
47    pub config: ReconnectConfig,
48    pub extra_events: Vec<Event>,
49}
50
51impl ReconnectSettings {
52    /// Creates reconnect settings with default event set.
53    pub fn new(params: ConnectParamsOwned, config: ReconnectConfig) -> Self {
54        Self {
55            params,
56            config,
57            extra_events: Vec::new(),
58        }
59    }
60
61    /// Adds extra events which should trigger reconnection.
62    pub fn with_extra_events(mut self, extra_events: Vec<Event>) -> Self {
63        self.extra_events = extra_events;
64        self
65    }
66}
67
68#[derive(Clone)]
69/// Configuration for the dispatcher runtime.
70pub struct ClientConfig {
71    pub poll_timeout_ms: i32,
72    pub reconnect: Option<ReconnectSettings>,
73}
74
75impl Default for ClientConfig {
76    fn default() -> Self {
77        Self {
78            poll_timeout_ms: 100,
79            reconnect: None,
80        }
81    }
82}
83
84impl ClientConfig {
85    /// Creates a configuration with defaults.
86    pub fn new() -> Self {
87        Self::default()
88    }
89
90    /// Sets the poll timeout in milliseconds.
91    pub fn poll_timeout_ms(mut self, timeout_ms: i32) -> Self {
92        self.poll_timeout_ms = timeout_ms;
93        self
94    }
95
96    /// Enables reconnect using provided connection parameters.
97    pub fn reconnect(mut self, params: ConnectParamsOwned, config: ReconnectConfig) -> Self {
98        self.reconnect = Some(ReconnectSettings::new(params, config));
99        self
100    }
101
102    /// Enables reconnect and adds extra events which trigger reconnect.
103    pub fn reconnect_with_events(
104        mut self,
105        params: ConnectParamsOwned,
106        config: ReconnectConfig,
107        extra_events: Vec<Event>,
108    ) -> Self {
109        self.reconnect =
110            Some(ReconnectSettings::new(params, config).with_extra_events(extra_events));
111        self
112    }
113
114    pub fn without_reconnect(mut self) -> Self {
115        self.reconnect = None;
116        self
117    }
118}
119
120/// Controls dispatcher loop flow.
121#[derive(Clone, Copy, PartialEq, Eq)]
122pub enum DispatchFlow {
123    Continue,
124    Stop,
125}
126
127/// Event context passed into dispatcher handlers.
128#[derive(Clone, Copy)]
129pub struct EventContext<'a> {
130    event: Event,
131    message: &'a Message,
132    client: Option<&'a Client>,
133}
134
135impl<'a> EventContext<'a> {
136    /// Returns the event.
137    pub fn event(&self) -> Event {
138        self.event
139    }
140
141    /// Returns the raw message.
142    pub fn message(&self) -> &Message {
143        self.message
144    }
145
146    /// Returns the client if the source provides one.
147    pub fn client(&self) -> Option<&Client> {
148        self.client
149    }
150}
151
152/// Event source abstraction for the dispatcher.
153pub trait EventSource {
154    /// Polls for the next event.
155    fn poll(&mut self, timeout_ms: i32) -> Option<(Event, Message)>;
156    /// Returns the underlying client if available.
157    fn client(&self) -> Option<&Client>;
158}
159
160impl EventSource for Client {
161    fn poll(&mut self, timeout_ms: i32) -> Option<(Event, Message)> {
162        Client::poll(self, timeout_ms)
163    }
164
165    fn client(&self) -> Option<&Client> {
166        Some(self)
167    }
168}
169
170impl EventSource for &Client {
171    fn poll(&mut self, timeout_ms: i32) -> Option<(Event, Message)> {
172        (*self).poll(timeout_ms)
173    }
174
175    fn client(&self) -> Option<&Client> {
176        Some(*self)
177    }
178}
179
180type HandlerFn = Box<dyn for<'a> FnMut(EventContext<'a>) -> DispatchFlow + Send>;
181
182struct HandlerEntry {
183    event: Option<Event>,
184    handler: HandlerFn,
185}
186
187impl HandlerEntry {
188    fn matches(&self, event: &Event) -> bool {
189        match &self.event {
190            Some(e) => mem::discriminant(e) == mem::discriminant(event),
191            None => true,
192        }
193    }
194}
195
196struct ReconnectState {
197    params: ConnectParamsOwned,
198    handler: ReconnectHandler,
199    extra_events: Vec<Event>,
200}
201
202impl ReconnectState {
203    fn new(settings: ReconnectSettings) -> Self {
204        Self {
205            handler: ReconnectHandler::new(settings.config),
206            params: settings.params,
207            extra_events: settings.extra_events,
208        }
209    }
210
211    fn on_event(&mut self, client: Option<&Client>, event: &Event) {
212        if matches!(event, Event::ConnectSuccess) {
213            self.handler.mark_connected();
214        }
215        if event.is_reconnect_needed_with(&self.extra_events) {
216            self.handler.mark_disconnected();
217            if let Some(client) = client {
218                let params = self.params.as_params();
219                client.handle_reconnect(&params, &mut self.handler);
220            }
221        }
222    }
223}
224
225pub struct Dispatcher<S: EventSource> {
226    source: S,
227    handlers: Vec<HandlerEntry>,
228    poll_timeout_ms: i32,
229    reconnect: Option<ReconnectState>,
230    stop: bool,
231}
232
233impl<S: EventSource> Dispatcher<S> {
234    /// Creates a dispatcher with default configuration.
235    pub fn new(source: S) -> Self {
236        Self::with_config(source, ClientConfig::default())
237    }
238
239    /// Creates a dispatcher with a custom configuration.
240    pub fn with_config(source: S, config: ClientConfig) -> Self {
241        let reconnect = config.reconnect.map(ReconnectState::new);
242        Self {
243            source,
244            handlers: Vec::new(),
245            poll_timeout_ms: config.poll_timeout_ms,
246            reconnect,
247            stop: false,
248        }
249    }
250
251    /// Returns the underlying event source.
252    pub fn source(&self) -> &S {
253        &self.source
254    }
255
256    /// Returns a mutable reference to the event source.
257    pub fn source_mut(&mut self) -> &mut S {
258        &mut self.source
259    }
260
261    /// Adds a handler for a specific event.
262    pub fn add_handler<F>(&mut self, event: Event, handler: F)
263    where
264        F: for<'a> FnMut(EventContext<'a>) -> DispatchFlow + Send + 'static,
265    {
266        self.handlers.push(HandlerEntry {
267            event: Some(event),
268            handler: Box::new(handler),
269        });
270    }
271
272    /// Adds a handler which receives all events.
273    pub fn add_handler_any<F>(&mut self, handler: F)
274    where
275        F: for<'a> FnMut(EventContext<'a>) -> DispatchFlow + Send + 'static,
276    {
277        self.handlers.push(HandlerEntry {
278            event: None,
279            handler: Box::new(handler),
280        });
281    }
282
283    /// Adds a handler and returns the dispatcher for chaining.
284    pub fn on_event<F>(mut self, event: Event, handler: F) -> Self
285    where
286        F: for<'a> FnMut(EventContext<'a>) -> DispatchFlow + Send + 'static,
287    {
288        self.add_handler(event, handler);
289        self
290    }
291
292    /// Adds a handler for all events and returns the dispatcher for chaining.
293    pub fn on_any<F>(mut self, handler: F) -> Self
294    where
295        F: for<'a> FnMut(EventContext<'a>) -> DispatchFlow + Send + 'static,
296    {
297        self.add_handler_any(handler);
298        self
299    }
300
301    /// Adds a handler for user join events.
302    pub fn on_user_joined<F>(self, handler: F) -> Self
303    where
304        F: for<'a> FnMut(EventContext<'a>) -> DispatchFlow + Send + 'static,
305    {
306        self.on_event(Event::UserJoined, handler)
307    }
308
309    /// Adds a handler for user left events.
310    pub fn on_user_left<F>(self, handler: F) -> Self
311    where
312        F: for<'a> FnMut(EventContext<'a>) -> DispatchFlow + Send + 'static,
313    {
314        self.on_event(Event::UserLeft, handler)
315    }
316
317    /// Adds a handler for text messages.
318    pub fn on_text_message<F>(self, handler: F) -> Self
319    where
320        F: for<'a> FnMut(EventContext<'a>) -> DispatchFlow + Send + 'static,
321    {
322        self.on_event(Event::TextMessage, handler)
323    }
324
325    /// Adds a handler for connection success events.
326    pub fn on_connect_success<F>(self, handler: F) -> Self
327    where
328        F: for<'a> FnMut(EventContext<'a>) -> DispatchFlow + Send + 'static,
329    {
330        self.on_event(Event::ConnectSuccess, handler)
331    }
332
333    /// Adds a handler for connection lost events.
334    pub fn on_connection_lost<F>(self, handler: F) -> Self
335    where
336        F: for<'a> FnMut(EventContext<'a>) -> DispatchFlow + Send + 'static,
337    {
338        self.on_event(Event::ConnectionLost, handler)
339    }
340
341    /// Adds a handler for connection failure events.
342    pub fn on_connect_failed<F>(self, handler: F) -> Self
343    where
344        F: for<'a> FnMut(EventContext<'a>) -> DispatchFlow + Send + 'static,
345    {
346        self.on_event(Event::ConnectFailed, handler)
347    }
348
349    /// Adds a handler for command error events.
350    pub fn on_command_error<F>(self, handler: F) -> Self
351    where
352        F: for<'a> FnMut(EventContext<'a>) -> DispatchFlow + Send + 'static,
353    {
354        self.on_event(Event::CmdError, handler)
355    }
356
357    /// Requests the dispatcher loop to stop.
358    pub fn stop(&mut self) {
359        self.stop = true;
360    }
361
362    /// Runs the dispatcher loop with the configured timeout.
363    pub fn run(&mut self) -> DispatchFlow {
364        self.run_with_timeout(self.poll_timeout_ms)
365    }
366
367    /// Runs the dispatcher loop with an explicit timeout.
368    pub fn run_with_timeout(&mut self, timeout_ms: i32) -> DispatchFlow {
369        while !self.stop {
370            let flow = self.step(timeout_ms);
371            if flow == DispatchFlow::Stop {
372                self.stop = true;
373            }
374        }
375        DispatchFlow::Stop
376    }
377
378    /// Performs one poll/dispatch step.
379    pub fn step(&mut self, timeout_ms: i32) -> DispatchFlow {
380        match self.source.poll(timeout_ms) {
381            Some((event, message)) => self.process_event(event, message),
382            None => DispatchFlow::Continue,
383        }
384    }
385
386    fn process_event(&mut self, event: Event, message: Message) -> DispatchFlow {
387        let client = self.source.client();
388        if let Some(reconnect) = self.reconnect.as_mut() {
389            reconnect.on_event(client, &event);
390        }
391        #[cfg(feature = "logging")]
392        crate::logging::event(&event, &message);
393        let ctx = EventContext {
394            event,
395            message: &message,
396            client,
397        };
398        let mut flow = DispatchFlow::Continue;
399        for handler in self.handlers.iter_mut() {
400            if handler.matches(&event) && (handler.handler)(ctx) == DispatchFlow::Stop {
401                flow = DispatchFlow::Stop;
402            }
403        }
404        flow
405    }
406}