use std::collections::HashMap;
use std::fmt::{Debug, Formatter, Result as FmtResult};
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tracing::*;
use crate::proto::ReadFrom;
use crate::zookeeper::RawResponse;
use crate::{KeeperState, WatchedEventType};
#[derive(Clone, Debug)]
pub struct WatchedEvent {
pub event_type: WatchedEventType,
pub keeper_state: KeeperState,
pub path: Option<String>,
}
#[derive(Debug, PartialEq)]
pub enum WatchType {
Child,
Data,
Exist,
}
pub struct Watch {
pub path: String,
pub watch_type: WatchType,
pub watcher: Box<dyn FnOnce(WatchedEvent) + Send>,
}
impl Debug for Watch {
fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
f.debug_struct("Watch")
.field("path", &self.path)
.field("watch_type", &self.watch_type)
.finish()
}
}
pub trait Watcher: Send {
fn handle(&self, event: WatchedEvent);
}
impl<F> Watcher for F
where
F: Fn(WatchedEvent) + Send,
{
fn handle(&self, event: WatchedEvent) {
self(event)
}
}
#[derive(Debug)]
pub enum WatchMessage {
Event(RawResponse),
Watch(Watch),
}
pub struct ZkWatch<W: Watcher> {
watcher: W,
watches: HashMap<String, Vec<Watch>>,
chroot: Option<String>,
rx: Receiver<WatchMessage>,
}
impl<W: Watcher> ZkWatch<W> {
pub fn new(watcher: W, chroot: Option<String>) -> (Self, Sender<WatchMessage>) {
trace!("ZkWatch::new");
let (tx, rx) = channel(64);
let watch = ZkWatch {
watches: HashMap::new(),
watcher,
chroot,
rx,
};
(watch, tx)
}
pub async fn run(mut self) {
while let Some(msg) = self.rx.recv().await {
self.process_message(msg);
}
}
fn process_message(&mut self, message: WatchMessage) {
match message {
WatchMessage::Event(response) => {
info!("Event thread got response {:?}", response.header);
let mut data = response.data;
match response.header.err {
0 => match WatchedEvent::read_from(&mut data) {
Ok(mut event) => {
self.cut_chroot(&mut event);
self.dispatch(&event);
}
Err(e) => error!("Failed to parse WatchedEvent {:?}", e),
},
e => error!("WatchedEvent.error {:?}", e),
}
}
WatchMessage::Watch(watch) => {
self.watches
.entry(watch.path.clone())
.or_insert_with(Vec::new)
.push(watch);
}
}
}
fn cut_chroot(&self, event: &mut WatchedEvent) {
if let Some(ref chroot) = self.chroot {
if event.path.is_some() {
event.path = Some(event.path.as_ref().unwrap()[chroot.len()..].to_owned());
}
}
}
fn dispatch(&mut self, event: &WatchedEvent) {
debug!("{:?}", event);
if let Some(watches) = self.find_watches(event) {
for watch in watches.into_iter() {
(watch.watcher)(event.clone())
}
} else {
self.watcher.handle(event.clone())
}
}
fn find_watches(&mut self, event: &WatchedEvent) -> Option<Vec<Watch>> {
if let Some(ref path) = event.path {
match self.watches.remove(path) {
Some(watches) => {
let (matching, left): (_, Vec<Watch>) =
watches.into_iter().partition(|w| match event.event_type {
WatchedEventType::NodeChildrenChanged => {
w.watch_type == WatchType::Child
}
WatchedEventType::NodeCreated | WatchedEventType::NodeDataChanged => {
w.watch_type == WatchType::Data || w.watch_type == WatchType::Exist
}
WatchedEventType::NodeDeleted => true,
_ => false,
});
if !left.is_empty() {
self.watches.insert(path.to_owned(), left);
}
if matching.is_empty() {
None
} else {
Some(matching)
}
}
None => None,
}
} else {
None
}
}
}