use futures::stream::{
FuturesUnordered,
StreamExt
};
use tracing::{Span, info_span};
use crate::control::DispatcherReq;
use super::{
AnyEvent,
DaemonEvent,
DynamicEvent,
PeerEvent
};
use super::handle::HandlerTask;
use crate::util::*;
/// A handler task to be registered with the [`Dispatcher`], tagged with the
/// kind of event it handles.
///
/// Values are received on the dispatcher's interest channel and stored in the
/// handler set that matches the variant.
#[derive(Debug)]
pub enum Interest {
/// Handler task for [`DynamicEvent`]s.
Dynamic(HandlerTask<DynamicEvent>),
/// Handler task for [`PeerEvent`]s.
Peer(HandlerTask<PeerEvent>),
/// Handler task for [`DaemonEvent`]s.
Daemon(HandlerTask<DaemonEvent>)
}
/// Result of [`Dispatcher::setup`]: the dispatcher itself plus the sending
/// halves of the three channels it reads from.
pub struct DispatcherSetup {
/// The dispatcher; it does nothing until [`Dispatcher::run`] is awaited.
pub dispatcher: Dispatcher,
/// Sender for [`DispatcherReq`] queries (listing handlers, dispatching an
/// event on request).
pub query_tx: Sender<DispatcherReq>,
/// Sender for events to dispatch. [`Dispatcher::run`] returns once the
/// receiving half of this channel yields `None`.
pub event_tx: Sender<AnyEvent>,
/// Sender for [`Interest`] values, i.e. handler registrations.
pub interest_tx: Sender<Interest>,
}
/// Fans incoming events out to the registered handler tasks and answers
/// queries about them.
///
/// Built with [`Dispatcher::setup`] and driven by [`Dispatcher::run`].
pub struct Dispatcher {
/// Events to dispatch; `run` stops when this yields `None`.
event_rx: Receiver<AnyEvent>,
/// Incoming [`DispatcherReq`] queries.
query_rx: Receiver<DispatcherReq>,
/// Incoming handler registrations.
interest_rx: Receiver<Interest>,
/// The `dispatcher` span created in `setup` while the parent span is entered.
// NOTE(review): presumably consumed by the `@self` logging macros, which are
// defined outside this file - confirm.
span: Span,
/// Registered handlers for dynamic events.
dynamic_hs: FuturesUnordered<HandlerTask<DynamicEvent>>,
/// Registered handlers for peer events.
peer_hs: FuturesUnordered<HandlerTask<PeerEvent>>,
/// Registered handlers for daemon events.
daemon_hs: FuturesUnordered<HandlerTask<DaemonEvent>>
}
impl Dispatcher {
    /// Builds a dispatcher and the sending halves of its input channels.
    ///
    /// Three channels are created (events, interests, queries). The receiving
    /// halves go into the returned [`Dispatcher`]; the sending halves are
    /// handed back through [`DispatcherSetup`]. The dispatcher's own
    /// `dispatcher` span is created while `parent_span` is entered, and all
    /// three handler sets start out empty.
    pub fn setup(parent_span: &Span) -> DispatcherSetup {
        let (event_tx, event_rx) = new_channel::<AnyEvent>();
        let (interest_tx, interest_rx) = new_channel::<Interest>();
        let (query_tx, query_rx) = new_channel::<DispatcherReq>();
        let span = parent_span.in_scope(|| info_span!("dispatcher"));

        DispatcherSetup {
            dispatcher: Self {
                event_rx,
                query_rx,
                interest_rx,
                span,
                dynamic_hs: FuturesUnordered::new(),
                peer_hs: FuturesUnordered::new(),
                daemon_hs: FuturesUnordered::new(),
            },
            query_tx,
            event_tx,
            interest_tx,
        }
    }

    /// Drives the dispatcher until the event channel yields `None`.
    ///
    /// Each loop iteration waits for whichever of these is ready first:
    /// a handler task finishing (logged at debug level), a new [`Interest`]
    /// to register, a [`DispatcherReq`] to answer, or an event to dispatch.
    /// Only the event branch can end the loop; the other branches simply do
    /// not fire when their source yields `None`.
    pub async fn run(mut self) {
        info!(@self, "started");
        loop {
            tokio::select! {
                Some(finished) = self.dynamic_hs.next() => {
                    debug!(@self, handler = ?finished, "handler terminated");
                },
                Some(finished) = self.peer_hs.next() => {
                    debug!(@self, handler = ?finished, "handler terminated");
                },
                Some(finished) = self.daemon_hs.next() => {
                    debug!(@self, handler = ?finished, "handler terminated");
                },
                Some(interest) = self.interest_rx.recv() => self.register(interest),
                Some(query) = self.query_rx.recv() => self.handle_query(query),
                incoming = self.event_rx.recv() => {
                    if let Some(evt) = incoming {
                        self.dispatch(evt);
                    } else {
                        // Event channel is finished: stop the dispatcher.
                        break;
                    }
                },
            }
        }
    }

    /// Adds a handler task to the set matching its [`Interest`] variant.
    fn register(&mut self, interest: Interest) {
        match interest {
            Interest::Dynamic(task) => self.dynamic_hs.push(task),
            Interest::Peer(task) => self.peer_hs.push(task),
            Interest::Daemon(task) => self.daemon_hs.push(task),
        }
    }

    /// Sends a clone of `event` to every handler registered for its kind.
    ///
    /// For dynamic events a warning is logged when no handler's `send`
    /// returned `true`. For peer and daemon events the result of `send` is
    /// ignored.
    fn dispatch(&self, event: AnyEvent) {
        trace!(@self, ?event, "dispatching");
        match event {
            AnyEvent::Dynamic(evt) => {
                // Every handler must be offered the event, so accumulate the
                // results instead of stopping at the first `true`.
                let mut handled = false;
                for task in self.dynamic_hs.iter() {
                    handled |= task.send(evt.clone());
                }
                if !handled {
                    warn!(@self, ?evt, "unhandled dynamic event");
                }
            }
            AnyEvent::Peer(evt) => {
                for task in self.peer_hs.iter() {
                    task.send(evt.clone());
                }
            }
            AnyEvent::Daemon(evt) => {
                for task in self.daemon_hs.iter() {
                    task.send(evt.clone());
                }
            }
        }
    }

    /// Returns the `Display` rendering of every registered handler, listing
    /// daemon handlers first, then peer handlers, then dynamic handlers.
    fn list_handlers(&self) -> Vec<String> {
        let daemon = self.daemon_hs.iter().map(|task| task.to_string());
        let peer = self.peer_hs.iter().map(|task| task.to_string());
        let dynamic = self.dynamic_hs.iter().map(|task| task.to_string());
        daemon.chain(peer).chain(dynamic).collect()
    }

    /// Answers a single [`DispatcherReq`].
    ///
    /// * `GetHandlers` replies with the output of `list_handlers`.
    /// * `Dispatch` converts the carried event, dispatches it, and then
    ///   replies `true` unconditionally.
    fn handle_query(&self, query: DispatcherReq) {
        match query {
            DispatcherReq::GetHandlers(req) => {
                let handlers = self.list_handlers();
                req.reply(handlers);
            }
            DispatcherReq::Dispatch((evt, replier)) => {
                let event: AnyEvent = evt.into();
                self.dispatch(event);
                replier.reply(true);
            }
        }
    }
}