etcd-detector 0.5.2

An etcd-based implementation of service registration and discovery.
Documentation
use crate::response::parse_meta_response;
use crate::watch::Watcher;
use detector::{Meta, MetaKey};
use std::time::Duration;

pub struct MetaDetector {
    meta: Meta,
    client: etcd_client::Client,
}

impl MetaDetector {
    async fn real_fetch(&mut self) -> Result<etcd_client::GetResponse, etcd_client::Error> {
        self.client.get(self.meta.key.path(), None).await
    }

    async fn real_fetch_all(&mut self) -> Result<etcd_client::GetResponse, etcd_client::Error> {
        self.client
            .get(
                self.meta.key.root_path(),
                Some(etcd_client::GetOptions::new().with_prefix()),
            )
            .await
    }

    async fn watch_inner<G>(
        &mut self,
        resp: etcd_client::GetResponse,
        key: String,
        watch_opts_fn: G,
    ) -> Result<Watcher<Meta, MetaKey>, etcd_client::Error>
    where
        G: FnOnce(i64) -> etcd_client::WatchOptions,
    {
        // 2. 获取 revision
        let revision = resp.header().unwrap().revision();

        // 3. 创建 watch
        let watch = self
            .client
            .watch(key, Some(watch_opts_fn(revision + 1)))
            .await?;

        Ok(Watcher::new(parse_meta_response(resp), watch.0, watch.1))
    }
}

impl MetaDetector {
    pub async fn connect<E: AsRef<str>, S: AsRef<[E]>>(
        meta: Meta,
        endpoints: S,
    ) -> Result<Self, etcd_client::Error> {
        let options = Some(
            etcd_client::ConnectOptions::new()
                .with_keep_alive(Duration::from_secs(5), Duration::from_secs(10)),
        );
        let client = etcd_client::Client::connect(endpoints, options).await?;
        Ok(Self::new(client, meta))
    }

    pub fn new(client: etcd_client::Client, meta: Meta) -> Self {
        Self { meta, client }
    }
}

impl MetaDetector {
    /// 注册服务元数据, 不需要ttl
    pub async fn register(&mut self) -> Result<(), etcd_client::Error> {
        let key = self.meta.key.path();
        let value = serde_json::to_string(&self.meta).expect("Serialize Error");
        self.client.put(key, value, None).await.map(|_| ())
    }

    pub async fn unregister(&mut self) -> Result<(), etcd_client::Error> {
        let key = self.meta.key.path();
        self.client.delete(key, None).await.map(|_| ())
    }

    pub async fn fetch(&mut self) -> Result<Option<Meta>, etcd_client::Error> {
        let metas = parse_meta_response(self.real_fetch().await?);
        if metas.is_empty() {
            Ok(None)
        } else {
            Ok(Some(metas[0].clone()))
        }
    }

    pub async fn fetch_all(&mut self) -> Result<Vec<Meta>, etcd_client::Error> {
        Ok(parse_meta_response(self.real_fetch_all().await?))
    }

    /// 监听元数据
    pub async fn watch(&mut self) -> Result<Watcher<Meta, MetaKey>, etcd_client::Error> {
        let resp = self.real_fetch().await?;
        self.watch_inner(resp, self.meta.key.path(), |rev| {
            etcd_client::WatchOptions::new().with_start_revision(rev)
        })
        .await
    }

    /// 监听所有的元数据
    pub async fn watch_all(&mut self) -> Result<Watcher<Meta, MetaKey>, etcd_client::Error> {
        let resp = self.real_fetch_all().await?;
        self.watch_inner(resp, self.meta.key.root_path(), |rev| {
            etcd_client::WatchOptions::new()
                .with_prefix()
                .with_start_revision(rev)
        })
        .await
    }
}