detcd 0.1.4

An etcd-based implementation of service registration and discovery.
Documentation
use crate::client::Client;
use crate::watch::Watcher;
use detector::{Meta, MetaKey, Service, ServiceKey, WatchEvent};
use etcd_client::Error;
use serde::de::DeserializeOwned;
use std::collections::HashMap;
use std::fmt::Debug;
use std::pin::Pin;

#[derive(Debug, Clone)]
pub enum HistoryEvent<T: Clone + Debug> {
    Changed(T, Option<T>),
    Deleted(T),
}

/// 监听的历史类型
#[derive(Debug, Clone)]
pub enum HistoryType {
    /// 单个meta
    Meta(MetaKey),
    /// 所有的meta
    Metas(String),
    /// 单个service
    Service(ServiceKey),
    /// 某类型中的所有service
    Services(ServiceKey),
    /// 所有类型的所有的service
    AllServices(String),
}

pub trait HasPath {
    fn get_path(&self) -> Option<String>;
}

impl HasPath for Meta {
    fn get_path(&self) -> Option<String> {
        Some(self.key.path())
    }
}

impl HasPath for MetaKey {
    fn get_path(&self) -> Option<String> {
        Some(self.path())
    }
}

impl HasPath for Service {
    fn get_path(&self) -> Option<String> {
        self.key.path()
    }
}

impl HasPath for ServiceKey {
    fn get_path(&self) -> Option<String> {
        self.path()
    }
}

/// 记录历史事件, 具有断线自主恢复的功能
pub struct History<T1, T2>
where
    T1: Clone + Debug + DeserializeOwned + 'static + Send,
    T2: Clone + Debug + for<'a> TryFrom<&'a [u8]> + 'static + Send,
    for<'a> <T2 as TryFrom<&'a [u8]>>::Error: Send,
{
    create: Box<
        dyn FnMut() -> Pin<Box<dyn Future<Output = Result<Watcher<T1, T2>, Error>> + Send>> + Send,
    >,
    watcher: Option<Watcher<T1, T2>>,
    record: Option<HashMap<String, T1>>,
}

impl<T1, T2> History<T1, T2>
where
    T1: Clone + Debug + DeserializeOwned + HasPath + 'static + Send,
    T2: Clone + Debug + HasPath + for<'a> TryFrom<&'a [u8]> + 'static + Send,
    for<'a> <T2 as TryFrom<&'a [u8]>>::Error: Send,
{
    pub fn create(
        create: Box<
            dyn FnMut() -> Pin<Box<dyn Future<Output = Result<Watcher<T1, T2>, Error>> + Send>>
                + Send,
        >,
    ) -> Self {
        Self {
            create,
            watcher: None,
            record: None,
        }
    }

    pub async fn get_watcher(&mut self) -> Result<&mut Watcher<T1, T2>, Option<Vec<T1>>> {
        if self.watcher.is_none() {
            match (self.create)().await {
                Ok(w) => self.watcher = Some(w),
                Err(_) => return Err(self.flatten_record()),
            }
        }
        Ok(self.watcher.as_mut().expect("infallible"))
    }

    /// 获取事件, 失败后会返回所有的旧记录.
    /// 失败后还可以继续使用,会自动恢复.
    pub async fn event(&mut self) -> Result<HistoryEvent<T1>, Option<Vec<T1>>> {
        loop {
            let watcher = self.get_watcher().await?;
            let we = match watcher.event().await {
                Err(_) => {
                    self.watcher = None;
                    return Err(self.flatten_record());
                }
                Ok(we) => we,
            };

            match we {
                WatchEvent::Changed(m) => {
                    if let Some(path) = m.get_path() {
                        let old = self.add_record(path, m.clone());
                        return Ok(HistoryEvent::Changed(m, old));
                    }
                    continue;
                }
                WatchEvent::Deleted(k) => {
                    if let Some(path) = k.get_path() {
                        if let Some(old) = self.del_record(&path) {
                            return Ok(HistoryEvent::Deleted(old));
                        }
                    }
                    continue;
                }
            }
        }
    }

    fn flatten_record(&mut self) -> Option<Vec<T1>> {
        match self.record.take() {
            None => None,
            Some(h) => Some(h.into_values().map(|t| t).collect()),
        }
    }

    fn add_record(&mut self, path: String, t1: T1) -> Option<T1> {
        if self.record.is_none() {
            self.record = Some(HashMap::new());
        }

        let record = self.record.as_mut().unwrap();
        let old = record.insert(path, t1);
        old
    }

    fn del_record(&mut self, path: &String) -> Option<T1> {
        match self.record {
            None => None,
            Some(ref mut r) => r.remove(path),
        }
    }
}

impl History<Meta, MetaKey> {
    pub fn new(client: Client, ht: HistoryType) -> Self {
        match ht {
            HistoryType::Meta(key) => Self::create(Box::new(move || {
                let mut client = client.clone();
                let key = key.clone();
                Box::pin(async move { client.watch_meta(&key).await })
            })),
            HistoryType::Metas(ns) => Self::create(Box::new(move || {
                let mut client = client.clone();
                let ns = ns.clone();
                Box::pin(async move { client.watch_metas(ns).await })
            })),
            _ => panic!("wrong type"),
        }
    }
}

impl History<Service, ServiceKey> {
    pub fn new(client: Client, ht: HistoryType) -> Self {
        match ht {
            HistoryType::Service(key) => Self::create(Box::new(move || {
                let mut client = client.clone();
                let key = key.clone();
                Box::pin(async move { client.watch_service(&key).await })
            })),
            HistoryType::Services(key) => Self::create(Box::new(move || {
                let mut client = client.clone();
                let key = key.clone();
                Box::pin(async move { client.watch_services(&key).await })
            })),
            HistoryType::AllServices(ns) => Self::create(Box::new(move || {
                let mut client = client.clone();
                let ns = ns.clone();
                Box::pin(async move { client.watch_all_services(ns).await })
            })),
            _ => panic!("wrong type"),
        }
    }
}