use super::source::{EventSource, HandlerEntry, ReconnectState};
use super::{ClientConfig, DispatchFlow, Event, EventContext, Message};
/// Polls an [`EventSource`] and forwards each event it yields to the
/// registered handlers.
pub struct Dispatcher<S>
where
    S: EventSource,
{
    /// Source that is polled for `(Event, Message)` pairs and that provides
    /// the client handed to handlers.
    source: S,
    /// Registered handler entries, kept in registration order.
    handlers: Vec<HandlerEntry>,
    /// Poll timeout (milliseconds) used by `run`; copied from the config.
    poll_timeout_ms: i32,
    /// Reconnect bookkeeping; present only when the config supplied a
    /// reconnect section. It is notified of every processed event.
    reconnect: Option<ReconnectState>,
    /// When `true`, the run loop exits before polling again.
    stop: bool,
}
impl<S> Dispatcher<S>
where
    S: EventSource,
{
    /// Creates a dispatcher around `source` configured with
    /// `ClientConfig::default()`.
    pub fn new(source: S) -> Self {
        let config = ClientConfig::default();
        Self::with_config(source, config)
    }

    /// Creates a dispatcher around `source` using an explicit `config`.
    ///
    /// The poll timeout is copied from `config.poll_timeout_ms`. If
    /// `config.reconnect` is `Some`, it is turned into a `ReconnectState`;
    /// otherwise no reconnect state is kept. The dispatcher starts with no
    /// handlers and with the stop flag cleared.
    pub fn with_config(source: S, config: ClientConfig) -> Self {
        Self {
            source,
            handlers: Vec::new(),
            poll_timeout_ms: config.poll_timeout_ms,
            reconnect: config.reconnect.map(ReconnectState::new),
            stop: false,
        }
    }

    /// Shared access to the wrapped event source.
    pub fn source(&self) -> &S {
        &self.source
    }

    /// Exclusive access to the wrapped event source.
    pub fn source_mut(&mut self) -> &mut S {
        &mut self.source
    }

    /// Registers `handler` for the given `event`.
    ///
    /// Handlers are appended, so they are invoked in registration order.
    pub fn add_handler<F>(&mut self, event: Event, handler: F)
    where
        F: for<'a> FnMut(EventContext<'a>) -> DispatchFlow + Send + 'static,
    {
        let entry = HandlerEntry {
            event: Some(event),
            handler: Box::new(handler),
        };
        self.handlers.push(entry);
    }

    /// Registers `handler` without an event filter.
    ///
    /// The entry is stored with `event: None`; presumably
    /// `HandlerEntry::matches` treats that as "match every event" — confirm
    /// against its definition.
    pub fn add_handler_any<F>(&mut self, handler: F)
    where
        F: for<'a> FnMut(EventContext<'a>) -> DispatchFlow + Send + 'static,
    {
        let entry = HandlerEntry {
            event: None,
            handler: Box::new(handler),
        };
        self.handlers.push(entry);
    }

    /// Builder-style form of [`Self::add_handler`]: registers `handler` for
    /// `event` and returns the dispatcher.
    pub fn on_event<F>(self, event: Event, handler: F) -> Self
    where
        F: for<'a> FnMut(EventContext<'a>) -> DispatchFlow + Send + 'static,
    {
        let mut dispatcher = self;
        dispatcher.add_handler(event, handler);
        dispatcher
    }

    /// Builder-style form of [`Self::add_handler_any`]: registers `handler`
    /// without an event filter and returns the dispatcher.
    pub fn on_any<F>(self, handler: F) -> Self
    where
        F: for<'a> FnMut(EventContext<'a>) -> DispatchFlow + Send + 'static,
    {
        let mut dispatcher = self;
        dispatcher.add_handler_any(handler);
        dispatcher
    }

    /// Registers `handler` for `Event::UserJoined`.
    pub fn on_user_joined<F>(self, handler: F) -> Self
    where
        F: for<'a> FnMut(EventContext<'a>) -> DispatchFlow + Send + 'static,
    {
        self.on_event(Event::UserJoined, handler)
    }

    /// Registers `handler` for `Event::UserLeft`.
    pub fn on_user_left<F>(self, handler: F) -> Self
    where
        F: for<'a> FnMut(EventContext<'a>) -> DispatchFlow + Send + 'static,
    {
        self.on_event(Event::UserLeft, handler)
    }

    /// Registers `handler` for `Event::TextMessage`.
    pub fn on_text_message<F>(self, handler: F) -> Self
    where
        F: for<'a> FnMut(EventContext<'a>) -> DispatchFlow + Send + 'static,
    {
        self.on_event(Event::TextMessage, handler)
    }

    /// Registers `handler` for `Event::ConnectSuccess`.
    pub fn on_connect_success<F>(self, handler: F) -> Self
    where
        F: for<'a> FnMut(EventContext<'a>) -> DispatchFlow + Send + 'static,
    {
        self.on_event(Event::ConnectSuccess, handler)
    }

    /// Registers `handler` for `Event::ConnectionLost`.
    pub fn on_connection_lost<F>(self, handler: F) -> Self
    where
        F: for<'a> FnMut(EventContext<'a>) -> DispatchFlow + Send + 'static,
    {
        self.on_event(Event::ConnectionLost, handler)
    }

    /// Registers `handler` for `Event::ConnectFailed`.
    pub fn on_connect_failed<F>(self, handler: F) -> Self
    where
        F: for<'a> FnMut(EventContext<'a>) -> DispatchFlow + Send + 'static,
    {
        self.on_event(Event::ConnectFailed, handler)
    }

    /// Registers `handler` for `Event::CmdError`.
    pub fn on_command_error<F>(self, handler: F) -> Self
    where
        F: for<'a> FnMut(EventContext<'a>) -> DispatchFlow + Send + 'static,
    {
        self.on_event(Event::CmdError, handler)
    }

    /// Sets the stop flag so that the run loop exits before its next poll.
    pub fn stop(&mut self) {
        self.stop = true;
    }

    /// Runs the dispatch loop using the configured poll timeout.
    ///
    /// See [`Self::run_with_timeout`] for the loop's behavior.
    pub fn run(&mut self) -> DispatchFlow {
        let timeout_ms = self.poll_timeout_ms;
        self.run_with_timeout(timeout_ms)
    }

    /// Calls [`Self::step`] with `timeout_ms` until the stop flag is set.
    ///
    /// The flag becomes set either through [`Self::stop`] or when a step
    /// reports `DispatchFlow::Stop`. The return value is always
    /// `DispatchFlow::Stop`. Nothing in this impl clears the flag again, so
    /// once stopped, later calls return without polling.
    pub fn run_with_timeout(&mut self, timeout_ms: i32) -> DispatchFlow {
        loop {
            if self.stop {
                return DispatchFlow::Stop;
            }
            if self.step(timeout_ms) == DispatchFlow::Stop {
                self.stop = true;
            }
        }
    }

    /// Polls the source once, passing `timeout_ms` through to it.
    ///
    /// If the poll yields an event, it is processed and the combined handler
    /// result is returned; if it yields nothing, `DispatchFlow::Continue` is
    /// returned. This method does not touch the stop flag itself.
    pub fn step(&mut self, timeout_ms: i32) -> DispatchFlow {
        if let Some((event, message)) = self.source.poll(timeout_ms) {
            self.process_event(event, message)
        } else {
            DispatchFlow::Continue
        }
    }

    /// Delivers one event to the reconnect state, the optional logger and
    /// the handlers.
    ///
    /// Returns `DispatchFlow::Stop` if at least one invoked handler returned
    /// `Stop`, otherwise `Continue`. A `Stop` does not cut the loop short:
    /// every matching handler is still invoked for this event.
    fn process_event(&mut self, event: Event, message: Message) -> DispatchFlow {
        let client = self.source.client();

        // The reconnect state observes the event before any handler runs.
        if let Some(state) = &mut self.reconnect {
            state.on_event(client, &event);
        }

        #[cfg(feature = "logging")]
        crate::logging::event(&event, &message);

        let ctx = EventContext {
            event,
            message: &message,
            client,
        };

        let mut outcome = DispatchFlow::Continue;
        for entry in &mut self.handlers {
            // Entries that do not match are skipped without being called.
            if !entry.matches(&event) {
                continue;
            }
            if (entry.handler)(ctx) == DispatchFlow::Stop {
                outcome = DispatchFlow::Stop;
            }
        }
        outcome
    }
}