use async_stream::stream;
use detector::WatchEvent;
use futures_core::Stream;
use futures_util::StreamExt;
use serde::de::DeserializeOwned;
use std::fmt::Debug;
use std::pin::Pin;
pub struct Watcher<T1, T2>
where
T1: Clone + Debug + Send,
T2: Clone + Debug + Send,
{
stream: Pin<Box<dyn Stream<Item = Result<WatchEvent<T1, T2>, etcd_client::Error>> + Send>>,
}
impl<T1, T2> Watcher<T1, T2>
where
T1: Clone + Debug + DeserializeOwned + 'static + Send,
T2: Clone + Debug + for<'a> TryFrom<&'a [u8]> + 'static + Send,
for<'a> <T2 as TryFrom<&'a [u8]>>::Error: Send,
{
pub(crate) fn new(
init: Vec<T1>,
_watcher: etcd_client::Watcher,
watch_stream: etcd_client::WatchStream,
) -> Self {
let stream = Box::pin(stream! {
for i in init {
yield Ok(WatchEvent::Changed(i));
}
let _watcher = _watcher;
let mut watch_stream = watch_stream;
loop {
match watch_stream.message().await {
Err(e) => yield Err(e),
Ok(resp) => {
match resp {
None => yield Err(etcd_client::Error::WatchError(String::from("stream closed"))),
Some(resp) => {
for event in resp.events() {
if event.kv().is_none() {
continue;
}
let kv = event.kv().unwrap();
match event.event_type() {
etcd_client::EventType::Put => {
if let Ok(r) = serde_json::from_slice::<T1>(kv.value()) {
yield Ok(WatchEvent::Changed(r));
}
},
etcd_client::EventType::Delete => {
if let Ok(r) = T2::try_from(kv.key()) {
yield Ok(WatchEvent::Deleted(r));
}
}
}
}
}
}
}
}
}
});
Self { stream }
}
pub async fn event(&mut self) -> Result<WatchEvent<T1, T2>, etcd_client::Error> {
if let Some(e) = self.stream.next().await {
e
} else {
panic!("error: should not return none");
}
}
}