detcd 0.1.4

An etcd-based implementation of service registration and discovery.
Documentation
use async_stream::stream;
use detector::WatchEvent;
use futures_core::Stream;
use futures_util::StreamExt;
use serde::de::DeserializeOwned;
use std::fmt::Debug;
use std::pin::Pin;

/// 监听事件
pub struct Watcher<T1, T2>
where
    T1: Clone + Debug + Send,
    T2: Clone + Debug + Send,
{
    stream: Pin<Box<dyn Stream<Item = Result<WatchEvent<T1, T2>, etcd_client::Error>> + Send>>,
}

impl<T1, T2> Watcher<T1, T2>
where
    T1: Clone + Debug + DeserializeOwned + 'static + Send,
    T2: Clone + Debug + for<'a> TryFrom<&'a [u8]> + 'static + Send,
    for<'a> <T2 as TryFrom<&'a [u8]>>::Error: Send,
{
    pub(crate) fn new(
        init: Vec<T1>,
        _watcher: etcd_client::Watcher,
        watch_stream: etcd_client::WatchStream,
    ) -> Self {
        let stream = Box::pin(stream! {
            for i in init {
                yield Ok(WatchEvent::Changed(i));
            }

            let _watcher = _watcher;
            let mut watch_stream = watch_stream;
            loop {
                match watch_stream.message().await {
                    Err(e) => yield Err(e),
                    Ok(resp) => {
                        match resp {
                            None => yield Err(etcd_client::Error::WatchError(String::from("stream closed"))),
                            Some(resp) => {
                                for event in resp.events() {
                                    if event.kv().is_none() {
                                        continue;
                                    }

                                    let kv = event.kv().unwrap();
                                    match event.event_type() {
                                        etcd_client::EventType::Put => {
                                            if let Ok(r) = serde_json::from_slice::<T1>(kv.value()) {
                                                yield Ok(WatchEvent::Changed(r));
                                            }
                                        },
                                        etcd_client::EventType::Delete => {
                                            if let Ok(r) = T2::try_from(kv.key()) {
                                                yield Ok(WatchEvent::Deleted(r));
                                            }
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
            }
        });

        Self { stream }
    }

    /// 获取监听的事件
    pub async fn event(&mut self) -> Result<WatchEvent<T1, T2>, etcd_client::Error> {
        if let Some(e) = self.stream.next().await {
            e
        } else {
            panic!("error: should not return none");
        }
    }
}