etcd-detector 0.5.2

An etcd-based implementation of service registration and discovery.
Documentation
use crate::response::parse_service_response;
use crate::trace::debug;
use crate::watch::Oneself;
use crate::{MetaDetector, Watcher};
use detector::{Meta, MetaKey, Service, ServiceKey, ServiceStatus, Services};
use std::collections::HashMap;
use std::time::Duration;

/// Internal state of a [`Detector`]: the managed service, the etcd connection, and both
/// ends of the watch channel that publishes this service's registration status.
struct Inner {
    /// The service being registered/watched; its key's id is filled in during registration.
    service: Service,
    /// Connection to the etcd cluster (cloned into background tasks).
    client: etcd_client::Client,
    /// Publishes `ServiceStatus` changes. Background tasks compare `sender_count()` against
    /// their own clone, presumably to detect that this `Inner` has been dropped.
    sender: tokio::sync::watch::Sender<ServiceStatus>,
    /// Template receiver; `Oneself` handles are created by cloning it.
    receiver: tokio::sync::watch::Receiver<ServiceStatus>,
}

/// etcd-backed registration and discovery for a single [`Service`].
///
/// Build one with `Detector::connect` or `Detector::new`, then use `register` to announce the
/// service and `fetch`/`watch` (or their `_all` variants) to discover instances. Dropping the
/// detector is what the background keep-alive task watches for to stop and clean up.
pub struct Detector {
    inner: Inner,
}

impl Detector {
    /// Connect to etcd servers from given endpoints.
    pub async fn connect<E: AsRef<str>, S: AsRef<[E]>>(
        service: Service,
        endpoints: S,
    ) -> Result<crate::Detector, etcd_client::Error> {
        let options = Some(
            etcd_client::ConnectOptions::new()
                .with_keep_alive(Duration::from_secs(5), Duration::from_secs(10)),
        );
        let client = etcd_client::Client::connect(endpoints, options).await?;
        Ok(crate::Detector::new(client, service))
    }

    pub fn new(client: etcd_client::Client, service: Service) -> Self {
        let (sender, receiver) = tokio::sync::watch::channel(ServiceStatus::Unregistered);
        Self {
            inner: Inner {
                service,
                client,
                sender,
                receiver,
            },
        }
    }

    async fn real_fetch(&mut self) -> Result<etcd_client::GetResponse, etcd_client::Error> {
        self.inner
            .client
            .get(
                self.inner.service.key.parent_path(),
                Some(etcd_client::GetOptions::new().with_prefix()),
            )
            .await
    }

    async fn real_fetch_all(&mut self) -> Result<etcd_client::GetResponse, etcd_client::Error> {
        self.inner
            .client
            .get(
                self.inner.service.key.root_path(),
                Some(etcd_client::GetOptions::new().with_prefix()),
            )
            .await
    }

    async fn watch_inner(
        &mut self,
        resp: etcd_client::GetResponse,
        key: String,
    ) -> Result<Watcher<Service, ServiceKey>, etcd_client::Error> {
        // 2. 获取 revision
        let revision = resp.header().unwrap().revision();

        // 3. 创建 watch
        let watch = self
            .inner
            .client
            .watch(
                key,
                Some(
                    etcd_client::WatchOptions::new()
                        .with_prefix()
                        .with_start_revision(revision + 1),
                ),
            )
            .await?;

        let init: Vec<Service> = parse_service_response(resp)
            .into_values()
            .map(|s| s.services)
            .flatten()
            .collect();
        Ok(Watcher::new(init, watch.0, watch.1))
    }
}

impl Detector {
    /// Returns the service this detector manages.
    pub fn service(&self) -> &Service {
        &self.inner.service
    }

    /// Fetches all registered instances of this service type.
    ///
    /// Returns an empty `Services` when no instance with this service's name is registered.
    pub async fn fetch(&mut self) -> Result<Services, etcd_client::Error> {
        let resp = self.real_fetch().await?;
        let mut by_name = parse_service_response(resp);
        // Fetch first, then borrow the name: no clone is needed to satisfy the borrow checker.
        Ok(by_name
            .remove(&self.inner.service.key.name)
            .unwrap_or_else(Services::new))
    }

    /// Fetches every registered service instance, grouped by service name.
    pub async fn fetch_all(&mut self) -> Result<HashMap<String, Services>, etcd_client::Error> {
        Ok(parse_service_response(self.real_fetch_all().await?))
    }

    /// Watches instances of this service type, seeded with the current snapshot.
    pub async fn watch(&mut self) -> Result<Watcher<Service, ServiceKey>, etcd_client::Error> {
        let resp = self.real_fetch().await?;
        let parent_path = self.inner.service.key.parent_path();
        self.watch_inner(resp, parent_path).await
    }

    /// Watches every registered service instance, seeded with the current snapshot.
    pub async fn watch_all(&mut self) -> Result<Watcher<Service, ServiceKey>, etcd_client::Error> {
        let resp = self.real_fetch_all().await?;
        let root_path = self.inner.service.key.root_path();
        self.watch_inner(resp, root_path).await
    }

    /// Returns a handle for observing this service's own registration status.
    pub fn oneself(&self) -> Oneself {
        Oneself::new(self.inner.receiver.clone())
    }

    /// Registers this service in etcd and returns a status handle.
    ///
    /// If the first registration attempt fails, the error is returned. On success, a
    /// background task keeps the lease alive and re-registers after disconnections.
    /// If the service key already has an id, no registration is attempted and the
    /// status handle is returned as is.
    ///
    /// # Errors
    /// Propagates errors from the initial registration attempt.
    pub async fn register(&mut self) -> Result<Oneself, etcd_client::Error> {
        if !self.inner.service.key.has_id() {
            register_service(&mut self.inner).await?;
        }
        Ok(self.oneself())
    }
}

async fn register_service(inner: &mut Inner) -> Result<(), etcd_client::Error> {
    // 先获取元数据
    let mut meta_detect = MetaDetector::new(
        inner.client.clone(),
        Meta::from_key(MetaKey::new(
            inner.service.key.name.clone(),
            inner.service.key.ns.clone(),
        )),
    );

    let meta = meta_detect
        .fetch()
        .await?
        .ok_or(etcd_client::Error::InvalidArgs("no meta".to_string()))?;
    let instances = meta.instances;

    // 填充meta
    inner.service.meta = Some(meta);
    // 如果没设置ttl则给个默认的
    inner.service.ttl = Some(
        inner
            .service
            .ttl
            .map_or(60, |t| if t < 10 { 10 } else { t }),
    );

    let lease_id = {
        let mut lease_id = 0;
        // 挨个尝试
        for id in 0..instances {
            inner.service.key.id = Some(id);
            let result = do_register(&inner.service, &mut inner.client).await?;
            if result != 0 {
                lease_id = result;
                break;
            }
        }
        lease_id
    };

    if lease_id == 0 {
        debug!(
            "failed to register cause of no instance enough: {:?}",
            inner.service
        );
        // 实例数不够
        return Err(etcd_client::Error::InvalidArgs(
            "no instance enough".to_string(),
        ));
    }

    // 状态修改
    let _ = inner.sender.send(ServiceStatus::Registered);

    // 维系租约
    keep_alive(
        lease_id,
        inner.service.clone(),
        inner.client.clone(),
        inner.sender.clone(),
    );
    Ok(())
}

/// Tries to claim `service.key.path()` in etcd under a freshly granted lease.
///
/// Returns the lease id on success, or `0` when the key already exists (the slot is taken).
/// The lease granted for a failed attempt is revoked best-effort. Otherwise every unsuccessful
/// probe would leave an orphan lease alive until its TTL expires.
///
/// # Errors
/// Propagates etcd errors from granting the lease or running the transaction.
///
/// # Panics
/// Panics if the key has no path, if `service.ttl` is unset, or if the service cannot be
/// serialized. Callers set the id and TTL beforehand.
async fn do_register(
    service: &Service,
    client: &mut etcd_client::Client,
) -> Result<i64, etcd_client::Error> {
    debug!("try to register with:{:?}", service);
    let path = service.key.path().unwrap();
    let lease_id = client.lease_grant(service.ttl.unwrap(), None).await?.id();

    // A key that does not exist has version 0, so the put only happens if the slot is free.
    let cmp = etcd_client::Compare::version(&*path, etcd_client::CompareOp::Equal, 0);
    let value = serde_json::to_string(&service).expect("Serialize Error");
    let put = etcd_client::TxnOp::put(
        path,
        value,
        Some(etcd_client::PutOptions::new().with_lease(lease_id)),
    );
    let txn = etcd_client::Txn::new().when([cmp]).and_then([put]);

    let resp = match client.txn(txn).await {
        Ok(resp) => resp,
        Err(e) => {
            // Best effort: don't leave the unused lease behind.
            let _ = client.lease_revoke(lease_id).await;
            return Err(e);
        }
    };

    if resp.succeeded() {
        debug!("register ok, lease id:{:?}", lease_id);
        Ok(lease_id)
    } else {
        // Slot taken; the lease was never attached to a key, so release it now.
        let _ = client.lease_revoke(lease_id).await;
        Ok(0)
    }
}

/// Spawns a background task that keeps the etcd lease `lease_id` alive for `service`.
///
/// The keep-alive loop ends when the lease expires, the lease is reported gone, or shutdown
/// is detected. The task then publishes `ServiceStatus::Unregistered` and hands over to
/// `register_service_again`. That function either re-registers the service or, if the
/// detector has been dropped, deletes its key.
fn keep_alive(
    lease_id: i64,
    service: Service,
    mut client: etcd_client::Client,
    sender: tokio::sync::watch::Sender<ServiceStatus>,
) {
    /// Drives the keep-alive protocol for `lease_id` until the lease is lost or shutdown is
    /// detected. Publishes `Registered` after each keep-alive response with a non-zero TTL.
    ///
    /// `sender.sender_count() == 1` means only the task's own sender is left. Presumably the
    /// owning `Detector` (whose `Inner` holds the other sender) has been dropped, and this is
    /// used as the stop signal.
    async fn do_keep_alive(
        lease_id: i64,
        client: &mut etcd_client::Client,
        sender: &tokio::sync::watch::Sender<ServiceStatus>,
    ) {
        debug!("lease id:{} enter keep_alive", lease_id);
        loop {
            // Exit: the detector appears to have been dropped.
            if sender.sender_count() == 1 {
                break;
            }

            match client.lease_keep_alive(lease_id).await {
                Ok((mut keeper, mut stream)) => {
                    // Keep sending keep-alive requests to maintain the lease.
                    loop {
                        // Exit: the detector appears to have been dropped.
                        if sender.sender_count() == 1 {
                            break;
                        }
                        if let Err(_e) = keeper.keep_alive().await {
                            debug!(
                                "lease id:{} send keep_alive request happened error:{:?}",
                                lease_id, _e
                            );
                            break;
                        }
                        match stream.message().await {
                            Err(_e) => {
                                debug!(
                                    "lease id:{} recv keep_alive response happened error:{:?}",
                                    lease_id, _e
                                );
                                break;
                            }
                            Ok(Some(r)) => {
                                if r.ttl() == 0 {
                                    // The lease has expired; return so the caller re-registers.
                                    return;
                                }
                                let _ = sender.send(ServiceStatus::Registered);
                            }
                            Ok(None) => {
                                // The response stream was closed.
                                break;
                            }
                        }
                        // Pause briefly before the next keep-alive. NOTE(review): this
                        // assumes the lease TTL is comfortably above 5 s; register_service
                        // raises the TTL to at least 10.
                        tokio::time::sleep(Duration::from_secs(5)).await;
                    }
                }
                Err(etcd_client::Error::LeaseKeepAliveError(_e)) => {
                    // The lease no longer exists; return so the caller re-registers.
                    debug!(
                        "lease id:{} get keep_alive stream happened error:{:?}",
                        lease_id, _e
                    );
                    return;
                }
                Err(_e) => {
                    // Any other error: wait 1 s and retry with the same lease.
                    debug!(
                        "lease id:{} get keep_alive stream happened error:{:?}",
                        lease_id, _e
                    );
                    tokio::time::sleep(Duration::from_secs(1)).await;
                }
            }

            // Mark the service unregistered until a keep-alive succeeds again.
            // NOTE(review): after the inner loop breaks there is no delay before the next
            // attempt. A stream that keeps failing immediately could make this loop spin.
            let _ = sender.send(ServiceStatus::Unregistered);
        }
    }

    tokio::spawn(async move {
        do_keep_alive(lease_id, &mut client, &sender).await;
        // Keep-alive is over: report the status change, then re-register (or clean up).
        let _ = sender.send(ServiceStatus::Unregistered);
        register_service_again(service, client, sender).await;
    });
}

/// Re-registers `service` after its lease was lost, retrying every 2 s until it succeeds.
///
/// If the owning detector has gone away (only this task's sender remains), the service key
/// is deleted instead and the task ends. On success the status becomes `Registered` and a
/// new keep-alive task is spawned.
async fn register_service_again(
    service: Service,
    mut client: etcd_client::Client,
    sender: tokio::sync::watch::Sender<ServiceStatus>,
) {
    // Retry until registration succeeds or the detector is dropped.
    let lease_id = loop {
        if sender.sender_count() == 1 {
            // Nobody holds the detector any more: remove our key and stop.
            // NOTE(review): if our lease already expired and another instance has since
            // claimed this same key, this unconditional delete removes *their* registration.
            // Consider guarding it with a txn on our lease.
            let path = service.key.path().unwrap();
            let _ = client.delete(path, None).await;
            debug!("unregister ok: {:?}", service);
            return;
        }

        match do_register(&service, &mut client).await {
            Ok(granted) if granted != 0 => break granted,
            // Slot taken or etcd error: wait, then try again.
            _ => tokio::time::sleep(Duration::from_secs(2)).await,
        }
    };

    // The loop only exits with a non-zero lease id, so registration succeeded here.
    let _ = sender.send(ServiceStatus::Registered);
    keep_alive(lease_id, service, client, sender);
}