detcd 0.1.4

An etcd-based implementation of service registration and discovery.
Documentation
use std::collections::HashMap;
use std::time::Duration;

use detector::{Meta, MetaKey, Service, ServiceKey, Services};
use etcd_client::{Error, GetOptions, GetResponse, WatchOptions};

use crate::response::{parse_meta_response, parse_service_response};
use crate::watch::Watcher;

#[derive(Clone)]
pub struct Client {
    pub(crate) client: etcd_client::Client,
}

pub struct Builder {
    opt: etcd_client::ConnectOptions,
}

impl Builder {
    pub fn new() -> Self {
        Self {
            opt: etcd_client::ConnectOptions::new(),
        }
    }

    pub async fn build<E: AsRef<str>, S: AsRef<[E]>>(self, e: S) -> Result<Client, Error> {
        let client = etcd_client::Client::connect(e, Some(self.opt)).await?;
        Ok(Client { client })
    }

    pub fn with_user(mut self, name: impl Into<String>, password: impl Into<String>) -> Self {
        self.opt = self.opt.with_user(name, password);
        self
    }

    pub fn with_keep_alive(mut self, interval: Duration, timeout: Duration) -> Self {
        self.opt = self.opt.with_keep_alive(interval, timeout);
        self
    }

    pub fn with_timeout(mut self, timeout: Duration) -> Self {
        self.opt = self.opt.with_timeout(timeout);
        self
    }

    pub fn with_connect_timeout(mut self, timeout: Duration) -> Self {
        self.opt = self.opt.with_connect_timeout(timeout);
        self
    }

    pub fn with_tcp_keepalive(mut self, tcp_keepalive: Duration) -> Self {
        self.opt = self.opt.with_tcp_keepalive(tcp_keepalive);
        self
    }

    /// Whether send keep alive pings even there are no active requests.
    /// If disabled, keep-alive pings are only sent while there are opened request/response streams.
    /// If enabled, pings are also sent when no streams are active.
    /// NOTE: Some implementations of gRPC server may send GOAWAY if there are too many pings.
    ///       This would be useful if you meet some error like `too many pings`.
    pub fn with_keep_alive_while_idle(mut self, enabled: bool) -> Self {
        self.opt = self.opt.with_keep_alive_while_idle(enabled);
        self
    }

    pub fn with_require_leader(mut self, require_leader: bool) -> Self {
        self.opt = self.opt.with_require_leader(require_leader);
        self
    }
}

impl Client {
    async fn do_fetch_meta(&mut self, key: &MetaKey) -> Result<GetResponse, Error> {
        self.client.get(key.path(), None).await
    }

    async fn do_fetch_metas(&mut self, key: &MetaKey) -> Result<GetResponse, Error> {
        self.client
            .get(key.root_path(), Some(GetOptions::new().with_prefix()))
            .await
    }

    async fn do_fetch_service(&mut self, key: &ServiceKey) -> Result<GetResponse, Error> {
        self.client
            .get(
                key.path().ok_or(Error::InvalidArgs("no id".to_string()))?,
                None,
            )
            .await
    }

    async fn do_fetch_services(&mut self, key: &ServiceKey) -> Result<GetResponse, Error> {
        self.client
            .get(key.parent_path(), Some(GetOptions::new().with_prefix()))
            .await
    }

    async fn do_fetch_all_services(&mut self, key: &ServiceKey) -> Result<GetResponse, Error> {
        self.client
            .get(key.root_path(), Some(GetOptions::new().with_prefix()))
            .await
    }

    async fn do_watch_meta<G>(
        &mut self,
        resp: GetResponse,
        key: String,
        watch_opts_fn: G,
    ) -> Result<Watcher<Meta, MetaKey>, Error>
    where
        G: FnOnce(i64) -> WatchOptions,
    {
        // 获取 revision
        let revision = resp.header().unwrap().revision();
        // 创建 watch
        let watch = self
            .client
            .watch(key, Some(watch_opts_fn(revision + 1)))
            .await?;

        Ok(Watcher::new(parse_meta_response(resp), watch.0, watch.1))
    }

    async fn do_watch_service<G>(
        &mut self,
        resp: GetResponse,
        key: String,
        watch_opts_fn: G,
    ) -> Result<Watcher<Service, ServiceKey>, Error>
    where
        G: FnOnce(i64) -> WatchOptions,
    {
        // 获取 revision
        let revision = resp.header().unwrap().revision();
        // 创建 watch
        let watch = self
            .client
            .watch(key, Some(watch_opts_fn(revision + 1)))
            .await?;

        let init: Vec<Service> = parse_service_response(resp)
            .into_values()
            .map(|s| s.services)
            .flatten()
            .collect();
        Ok(Watcher::new(init, watch.0, watch.1))
    }
}

impl Client {
    /// 获取元数据
    pub async fn fetch_meta(&mut self, key: &MetaKey) -> Result<Option<Meta>, Error> {
        let metas = parse_meta_response(self.do_fetch_meta(key).await?);
        if metas.is_empty() {
            Ok(None)
        } else {
            Ok(Some(metas[0].clone()))
        }
    }

    /// 获取所有的元数据
    pub async fn fetch_metas<S: Into<String>>(&mut self, namespace: S) -> Result<Vec<Meta>, Error> {
        Ok(parse_meta_response(
            self.do_fetch_metas(&MetaKey::new("whatever", namespace))
                .await?,
        ))
    }

    /// 获取单个服务实例
    pub async fn fetch_service(&mut self, key: &ServiceKey) -> Result<Option<Service>, Error> {
        let name = key.name.clone();
        let mut resp = parse_service_response(self.do_fetch_service(key).await?);
        match resp.remove(&name) {
            None => Ok(None),
            Some(mut s) => Ok(s.services.pop()),
        }
    }

    /// 获取该类型的所有的服务实例
    pub async fn fetch_services(&mut self, key: &ServiceKey) -> Result<Services, Error> {
        let name = key.name.clone();
        let mut resp = parse_service_response(self.do_fetch_services(key).await?);
        match resp.remove(&name) {
            None => Ok(Services::new()),
            Some(s) => Ok(s),
        }
    }

    /// 获取所有注册的服务实例
    pub async fn fetch_all_services<S: Into<String>>(
        &mut self,
        namespace: S,
    ) -> Result<HashMap<String, Services>, Error> {
        Ok(parse_service_response(
            self.do_fetch_all_services(&ServiceKey::new("whatever", namespace))
                .await?,
        ))
    }

    /// 监听meta
    pub async fn watch_meta(&mut self, key: &MetaKey) -> Result<Watcher<Meta, MetaKey>, Error> {
        let resp = self.do_fetch_meta(key).await?;
        self.do_watch_meta(resp, key.path(), |rev| {
            WatchOptions::new().with_start_revision(rev)
        })
        .await
    }

    /// 监听所有的metas
    pub async fn watch_metas<S: Into<String>>(
        &mut self,
        namespace: S,
    ) -> Result<Watcher<Meta, MetaKey>, Error> {
        let key = MetaKey::new("whatever", namespace);
        let resp = self.do_fetch_metas(&key).await?;
        self.do_watch_meta(resp, key.root_path(), |rev| {
            WatchOptions::new().with_prefix().with_start_revision(rev)
        })
        .await
    }

    /// 监听服务实例
    pub async fn watch_service(
        &mut self,
        key: &ServiceKey,
    ) -> Result<Watcher<Service, ServiceKey>, Error> {
        let resp = self.do_fetch_service(key).await?;
        self.do_watch_service(resp, key.path().clone().unwrap(), |rev| {
            WatchOptions::new().with_start_revision(rev)
        })
        .await
    }

    /// 监听该类型的所有的服务实例.
    pub async fn watch_services(
        &mut self,
        key: &ServiceKey,
    ) -> Result<Watcher<Service, ServiceKey>, Error> {
        let resp = self.do_fetch_services(key).await?;
        self.do_watch_service(resp, key.parent_path(), |rev| {
            WatchOptions::new().with_prefix().with_start_revision(rev)
        })
        .await
    }

    /// 监听所有的服务实例.
    pub async fn watch_all_services<S: Into<String>>(
        &mut self,
        namespace: S,
    ) -> Result<Watcher<Service, ServiceKey>, Error> {
        let key = ServiceKey::new("whatever", namespace);
        let resp = self.do_fetch_all_services(&key).await?;
        self.do_watch_service(resp, key.root_path(), |rev| {
            WatchOptions::new().with_prefix().with_start_revision(rev)
        })
        .await
    }
}