detcd 0.1.4

An etcd-based implementation of service registration and discovery.
Documentation
use crate::client::Client;
use detector::{Meta, MetaKey, Service, ServiceStatus};
use etcd_client::{Error, PutOptions};
use serde::{Serialize, Serializer};
use std::time::Duration;
use tokio::sync::watch::{Receiver, Sender};

/// Internal payload of a [`Registrar`]; the variant selects which
/// registration flavour the registrar implements.
enum Inner<T> {
    /// Metadata plus an optional lease TTL handed to `lease_grant`.
    /// `None` means "no lease"; `with_ttl` stores `None` for values `<= 0`.
    Meta(T, Option<i64>),
    /// A service instance together with both ends of the watch channel
    /// that carries its current `ServiceStatus`.
    Service(T, Sender<ServiceStatus>, Receiver<ServiceStatus>),
    /// A service instance registered without any status channel.
    SimpleService(T),
}

/// Newtype around `Service` used as the type parameter of
/// `Registrar<SimpleService>`, whose `register` performs a plain put and
/// does not start the lease keep-alive task.
pub struct SimpleService(Service);

impl Serialize for SimpleService {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        self.0.serialize(serializer)
    }
}

/// Registers metadata or service instances in etcd.
///
/// Registering metadata and registering a service instance behave differently:
/// - registering metadata does not renew the lease automatically (when a lease exists);
/// - registering a service instance (`Registrar<Service>`) renews the lease
///   automatically (when a lease exists) through a background keep-alive task.
pub struct Registrar<T> {
    // Project client wrapper; its `client` field is the underlying etcd client.
    client: Client,
    // What is being registered, plus any per-flavour state.
    inner: Inner<T>,
}

impl<T> Registrar<T> {
    /// Writes `value` under `path`.
    ///
    /// When `ttl` is `Some`, a lease with that TTL is granted first and the
    /// key is attached to it; the lease id is returned. With `ttl == None`
    /// the key is written without a lease and `Ok(None)` is returned.
    async fn do_register(
        &mut self,
        path: String,
        value: String,
        ttl: Option<i64>,
    ) -> Result<Option<i64>, Error> {
        let lease_id = match ttl {
            Some(seconds) => Some(self.client.client.lease_grant(seconds, None).await?.id()),
            None => None,
        };
        let options = lease_id.map(|id| PutOptions::new().with_lease(id));

        self.client.client.put(path, value, options).await?;
        Ok(lease_id)
    }

    /// Deletes the key at `path`, discarding the delete response.
    async fn do_unregister(&mut self, path: String) -> Result<(), Error> {
        self.client.client.delete(path, None).await?;
        Ok(())
    }
}

impl<T: Serialize> Registrar<T> {
    fn serialize_value(&self) -> Result<String, Error> {
        match self.inner {
            Inner::Meta(ref m, _) => {
                serde_json::to_string(m).map_err(|e| Error::InvalidArgs(format!("{:?}", e)))
            }
            Inner::Service(ref s, _, _) => {
                serde_json::to_string(s).map_err(|e| Error::InvalidArgs(format!("{:?}", e)))
            }
            Inner::SimpleService(ref s) => {
                serde_json::to_string(s).map_err(|e| Error::InvalidArgs(format!("{:?}", e)))
            }
        }
    }
}

/// Builds a metadata registrar with no lease; use `with_ttl` to add one.
impl From<(Client, Meta)> for Registrar<Meta> {
    fn from(value: (Client, Meta)) -> Self {
        let (client, meta) = value;
        Self {
            client,
            inner: Inner::Meta(meta, None),
        }
    }
}

impl Registrar<Meta> {
    /// Sets the lease TTL used by `register`.
    ///
    /// `None` or any value `<= 0` clears the TTL, i.e. the metadata is
    /// written without a lease.
    pub fn with_ttl(mut self, ttl: Option<i64>) -> Self {
        if let Inner::Meta(_, slot) = &mut self.inner {
            *slot = match ttl {
                Some(t) if t > 0 => Some(t),
                _ => None,
            };
        }
        self
    }

    /// Writes the metadata to etcd. May be called repeatedly.
    pub async fn register(&mut self) -> Result<(), Error> {
        let value = self.serialize_value()?;
        let (meta, ttl) = self.to_inner();
        let path = meta.key.path();
        let ttl = *ttl;
        self.do_register(path, value, ttl).await?;
        Ok(())
    }

    /// Deletes the metadata key from etcd. May be called repeatedly.
    pub async fn unregister(&mut self) -> Result<(), Error> {
        let path = self.to_inner().0.key.path();
        self.do_unregister(path).await
    }

    /// Borrows the metadata and its TTL.
    ///
    /// Panics if `inner` is not the `Meta` variant; the only constructor for
    /// `Registrar<Meta>` in this file always builds that variant.
    fn to_inner(&self) -> (&Meta, &Option<i64>) {
        match &self.inner {
            Inner::Meta(meta, ttl) => (meta, ttl),
            _ => panic!("never"),
        }
    }
}

/// Wraps the service in `SimpleService`, leaving its fields untouched.
impl From<(Client, Service)> for Registrar<SimpleService> {
    fn from(value: (Client, Service)) -> Self {
        let (client, service) = value;
        Self {
            client,
            inner: Inner::SimpleService(SimpleService(service)),
        }
    }
}

impl Registrar<SimpleService> {
    /// 可以多次重复注册
    pub async fn register(&mut self) -> Result<(), Error> {
        let inner = self.to_inner();
        let path = inner
            .key
            .path()
            .ok_or(Error::InvalidArgs("no id".to_string()))?;
        let ttl = inner.ttl.filter(|t| *t > 0);
        let value = self.serialize_value()?;
        self.do_register(path, value, ttl).await.map(|_| ())
    }

    /// 可以多次重复注销
    pub async fn unregister(&mut self) -> Result<(), Error> {
        let inner = self.to_inner();
        let path = inner
            .key
            .path()
            .ok_or(Error::InvalidArgs("no id".to_string()))?;
        self.do_unregister(path).await
    }

    fn to_inner(&self) -> &Service {
        match self.inner {
            Inner::SimpleService(ref s) => &s.0,
            _ => panic!("never"),
        }
    }
}

/// Builds an auto-renewing service registrar.
///
/// The TTL is normalised so that it is always present: a missing TTL becomes
/// 60 and anything below 10 is raised to 10. The status channel starts out
/// as `ServiceStatus::Unregistered`.
impl From<(Client, Service)> for Registrar<Service> {
    fn from(value: (Client, Service)) -> Self {
        let (client, mut service) = value;
        service.ttl = Some(service.ttl.map_or(60, |t| t.max(10)));

        let (tx, rx) = tokio::sync::watch::channel(ServiceStatus::Unregistered);
        Self {
            client,
            inner: Inner::Service(service, tx, rx),
        }
    }
}

impl Registrar<Service> {
    /// 如果是自动续约模式且已经注册过,则不允许重复调用
    pub async fn register(&mut self) -> Result<(), Error> {
        let mut client = self.client.clone();
        let (service, tx, _) = self.to_inner_mut();
        if service.key.has_id() {
            // 判定义为已注册
            return Err(Error::InvalidArgs(
                "not allow to register again".to_string(),
            ));
        }

        // 获取元数据
        let meta = client
            .fetch_meta(&MetaKey::new(
                service.key.name.clone(),
                service.key.ns.clone(),
            ))
            .await?
            .ok_or(Error::InvalidArgs("no meta".to_string()))?;

        // 轮询注册
        let instances = meta.instances;
        service.meta = Some(meta);
        let lease_id = {
            let mut lease_id = 0;
            // 挨个尝试
            for id in 0..instances {
                service.key.id = Some(id);
                let result = txn_register(service, client.client.clone()).await?;
                if result != 0 {
                    lease_id = result;
                    break;
                }
            }
            lease_id
        };

        if lease_id == 0 {
            // 实例数不够
            return Err(Error::InvalidArgs("no instance enough".to_string()));
        }

        // 状态修改
        change_status(&tx, ServiceStatus::Registered);

        // 维系租约
        keep_lease_alive(lease_id, service.clone(), client.client, tx.clone());
        Ok(())
    }

    pub fn service(&self) -> &Service {
        self.to_inner().0
    }

    /// 查看最新的状态值
    pub fn status(&self) -> ServiceStatus {
        *self.to_inner().2.borrow()
    }

    /// 返回失败则说明通道被关闭了
    pub async fn changed(&mut self) -> ServiceStatus {
        if !self.service().key.has_id() {
            return self.status();
        }
        let _ = self.to_inner_mut().2.changed().await.expect("impossible");
        self.status()
    }
}

impl Registrar<Service> {
    /// Mutably borrows the service and both channel ends.
    ///
    /// Panics if `inner` is not the `Service` variant; the only constructor
    /// for `Registrar<Service>` in this file always builds that variant.
    fn to_inner_mut(
        &mut self,
    ) -> (
        &mut Service,
        &mut Sender<ServiceStatus>,
        &mut Receiver<ServiceStatus>,
    ) {
        match &mut self.inner {
            Inner::Service(service, sender, receiver) => (service, sender, receiver),
            _ => panic!("never"),
        }
    }

    /// Shared-borrow counterpart of `to_inner_mut`; panics under the same
    /// condition.
    fn to_inner(&self) -> (&Service, &Sender<ServiceStatus>, &Receiver<ServiceStatus>) {
        match &self.inner {
            Inner::Service(service, sender, receiver) => (service, sender, receiver),
            _ => panic!("never"),
        }
    }
}

async fn txn_register(service: &Service, mut client: etcd_client::Client) -> Result<i64, Error> {
    let path = service.key.path().unwrap();
    // 获取租约
    let lease = client.lease_grant(service.ttl.unwrap(), None).await?;

    // key不存在时版本是0
    let cmp = etcd_client::Compare::version(&*path, etcd_client::CompareOp::Equal, 0);

    // 序列化数据
    let value = serde_json::to_string(&service).expect("Serialize Error");

    // 定义事务
    let put = etcd_client::TxnOp::put(path, value, Some(PutOptions::new().with_lease(lease.id())));

    let txn = etcd_client::Txn::new().when([cmp]).and_then([put]);

    // 执行事务
    let resp = client.txn(txn).await?;

    if resp.succeeded() {
        Ok(lease.id())
    } else {
        Ok(0)
    }
}

/// Keeps the lease alive in a background task.
///
/// Spawns a tokio task that renews `lease_id` until renewal stops, then
/// marks the service `Unregistered` and hands over to
/// `register_service_again`. The task handle is discarded, so the task is
/// not awaited by the caller.
fn keep_lease_alive(
    lease_id: i64,
    service: Service,
    mut client: etcd_client::Client,
    tx: Sender<ServiceStatus>,
) {
    /// Renewal loop. Returns when the lease is gone (expired or reported
    /// missing) or when this task holds the only remaining sender.
    async fn do_keep_alive(
        lease_id: i64,
        client: &mut etcd_client::Client,
        tx: &Sender<ServiceStatus>,
    ) {
        loop {
            // Exit: only this task's sender is left. The other sender lives in
            // the registrar, so presumably the registrar has been dropped.
            if tx.sender_count() == 1 {
                break;
            }

            match client.lease_keep_alive(lease_id).await {
                Ok((mut keeper, mut stream)) => {
                    // Keep sending keep-alive messages to maintain the lease.
                    loop {
                        // Exit: same "last sender" check as above.
                        if tx.sender_count() == 1 {
                            break;
                        }
                        // A failed send ends this stream; the outer loop opens a new one.
                        if let Err(_e) = keeper.keep_alive().await {
                            break;
                        }
                        match stream.message().await {
                            Err(_e) => {
                                break;
                            }
                            Ok(Some(r)) => {
                                if r.ttl() == 0 {
                                    // The lease has expired; the service must be registered again.
                                    return;
                                }
                                change_status(&tx, ServiceStatus::Registered);
                            }
                            Ok(None) => {
                                // The response stream was closed.
                                break;
                            }
                        }
                        // Pause briefly before the next keep-alive.
                        tokio::time::sleep(Duration::from_secs(5)).await;
                    }
                }
                Err(Error::LeaseKeepAliveError(_e)) => {
                    // The lease no longer exists.
                    return;
                }
                Err(_e) => {
                    // Any other error: wait a moment, then retry opening the stream.
                    tokio::time::sleep(Duration::from_secs(1)).await;
                }
            }

            // Mark as unregistered until a keep-alive response is seen again.
            change_status(&tx, ServiceStatus::Unregistered);
        }
    }

    let _ = tokio::spawn(async move {
        do_keep_alive(lease_id, &mut client, &tx).await;
        change_status(&tx, ServiceStatus::Unregistered);
        register_service_again(service, client, tx).await;
    });
}

/// Re-registers the service after its lease was lost, retrying every two
/// seconds until it succeeds.
///
/// If this task holds the only remaining sender, the service key is deleted
/// (best effort) and the function returns without registering. On success the
/// status becomes `Registered` and lease keep-alive is started again.
async fn register_service_again(
    service: Service,
    mut client: etcd_client::Client,
    tx: Sender<ServiceStatus>,
) {
    // Retry until a registration yields a real (non-zero) lease id.
    let lease_id = loop {
        if tx.sender_count() == 1 {
            // Nobody else holds a sender any more: remove the key and stop.
            let path = service.key.path().unwrap();
            let _ = client.delete(path, None).await;
            return;
        }

        if let Ok(id) = txn_register(&service, client.clone()).await {
            if id != 0 {
                break id;
            }
        }
        tokio::time::sleep(Duration::from_secs(2)).await;
    };

    // The loop only exits with a non-zero lease id, so no further check is needed.
    change_status(&tx, ServiceStatus::Registered);
    keep_lease_alive(lease_id, service, client, tx);
}

/// Publishes `status` on the watch channel unless it equals the current
/// value, so receivers are not woken for no-op updates. A failed send is
/// ignored.
fn change_status(tx: &Sender<ServiceStatus>, status: ServiceStatus) {
    // The borrow guard is a temporary dropped at the end of this statement,
    // i.e. before `send` is called.
    let unchanged = *tx.borrow() == status;
    if unchanged {
        return;
    }
    let _ = tx.send(status);
}