rs-zero 0.2.6

Rust-first microservice framework inspired by go-zero engineering practices
Documentation
use std::{
    collections::BTreeMap,
    sync::{Arc, Mutex},
};

use tokio::{
    sync::oneshot,
    task::JoinHandle,
    time::{sleep, timeout},
};

use crate::discovery_etcd::EtcdDiscoveryConfig;
use crate::discovery_etcd::client::SharedEtcdClient;

/// Current state of a locally managed etcd lease.
///
/// Values of this type are stored inside [`EtcdLeaseStatus`] snapshots and
/// describe how the background keep-alive for one lease is doing.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum EtcdLeaseState {
    /// Lease keep-alive stream is active.
    Active,
    /// Lease keep-alive failed and is waiting for retry or expiration.
    ///
    /// The payload is the human-readable error message produced by the
    /// failed keep-alive attempt.
    Failed(String),
    /// Lease keep-alive was stopped by deregistration or registry drop.
    Stopped,
}

/// Status snapshot for a locally managed etcd lease.
///
/// A snapshot pairs the lease identifier with the most recently recorded
/// keep-alive state; it is a plain value and does not update itself.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EtcdLeaseStatus {
    /// etcd lease id.
    pub lease_id: i64,
    /// Current keep-alive state.
    pub state: EtcdLeaseState,
}

/// Shared table of lease status snapshots.
///
/// The map is guarded by a standard-library mutex and shared through an
/// `Arc`. Entries are keyed by the `key` string handed to
/// `set_lease_status` (presumably the registration key in etcd — confirm
/// against callers).
pub(crate) type LeaseStatuses = Arc<Mutex<BTreeMap<String, EtcdLeaseStatus>>>;

/// Handle to a spawned lease keep-alive task.
///
/// Returned by `spawn_keep_alive_task`; call `stop` to end the task.
pub(crate) struct LeaseRegistration {
    /// etcd lease id the keep-alive task is refreshing.
    pub(crate) lease_id: i64,
    /// One-shot sender used to ask the task to stop. Wrapped in `Option` so
    /// the sender can be moved out when the signal is sent.
    stop: Option<oneshot::Sender<()>>,
    /// Join handle of the spawned keep-alive task.
    task: JoinHandle<()>,
}

impl LeaseRegistration {
    /// Stops the keep-alive task that belongs to this registration.
    ///
    /// The stop signal is delivered first (if it has not been sent yet) and
    /// the task is then aborted through its join handle. The registration is
    /// consumed, so it cannot be stopped twice.
    ///
    /// NOTE(review): because `abort` follows the signal immediately, the task
    /// may be cancelled before it gets to record `EtcdLeaseState::Stopped` —
    /// confirm that callers update or remove the status entry themselves.
    pub(crate) fn stop(mut self) {
        let pending_signal = self.stop.take();
        if let Some(sender) = pending_signal {
            // A send error only means the task already dropped its receiver,
            // i.e. it is no longer running; nothing to do in that case.
            let _ = sender.send(());
        }
        self.task.abort();
    }
}

/// Reason a single keep-alive attempt ended.
enum KeepAliveOutcome {
    /// The stop signal fired while the attempt was running.
    Stopped,
    /// Opening the stream or a keep-alive round-trip failed; carries the
    /// error message.
    Failed(String),
}

/// Starts the background keep-alive task for `lease_id`.
///
/// The status entry for `key` is set to [`EtcdLeaseState::Active`] before the
/// task is spawned. The returned [`LeaseRegistration`] owns the stop channel
/// and the task's join handle.
///
/// # Panics
///
/// `tokio::spawn` panics when called outside a Tokio runtime, and
/// `set_lease_status` panics if the status mutex is poisoned.
pub(crate) fn spawn_keep_alive_task(
    config: EtcdDiscoveryConfig,
    client: SharedEtcdClient,
    key: String,
    lease_id: i64,
    statuses: LeaseStatuses,
) -> LeaseRegistration {
    // Publish the initial state first so observers see the lease right away.
    set_lease_status(&statuses, &key, lease_id, EtcdLeaseState::Active);

    let (stop_sender, stop_receiver) = oneshot::channel();
    let worker = lease_keep_alive_task(config, client, key, lease_id, statuses, stop_receiver);

    LeaseRegistration {
        lease_id,
        stop: Some(stop_sender),
        task: tokio::spawn(worker),
    }
}

/// Records `state` as the latest status of the lease stored under `key`.
///
/// Any previous entry for `key` is replaced.
///
/// # Panics
///
/// Panics if the status mutex has been poisoned by a panic in another
/// lock holder.
pub(crate) fn set_lease_status(
    statuses: &LeaseStatuses,
    key: &str,
    lease_id: i64,
    state: EtcdLeaseState,
) {
    let snapshot = EtcdLeaseStatus { lease_id, state };
    let mut table = statuses.lock().expect("lease status mutex poisoned");
    table.insert(key.to_string(), snapshot);
}

/// Keeps one etcd lease alive until a stop signal arrives.
///
/// The task repeatedly runs a keep-alive attempt via
/// [`run_keep_alive_once`]:
///
/// * if the attempt ends because `stop` resolved, the status for `key` is
///   set to [`EtcdLeaseState::Stopped`] and the task returns;
/// * if the attempt fails, the error message is recorded as
///   [`EtcdLeaseState::Failed`], the task waits `config.reconnect_interval`
///   and then starts a new attempt. The wait itself is also interrupted by
///   `stop`, in which case `Stopped` is recorded and the task returns.
///
/// Every attempt that gets a keep-alive confirmed records
/// [`EtcdLeaseState::Active`] again, so a lease that recovers after a failure
/// does not keep reporting `Failed`.
///
/// `stop` resolves both when a value is sent and when the sender is dropped;
/// either event ends the task.
async fn lease_keep_alive_task(
    config: EtcdDiscoveryConfig,
    client: SharedEtcdClient,
    key: String,
    lease_id: i64,
    statuses: LeaseStatuses,
    mut stop: oneshot::Receiver<()>,
) {
    loop {
        let outcome =
            run_keep_alive_once(&config, &client, &key, lease_id, &statuses, &mut stop).await;
        match outcome {
            KeepAliveOutcome::Stopped => {
                set_lease_status(&statuses, &key, lease_id, EtcdLeaseState::Stopped);
                break;
            }
            KeepAliveOutcome::Failed(message) => {
                set_lease_status(&statuses, &key, lease_id, EtcdLeaseState::Failed(message));
            }
        }
        // Back off before reconnecting, but stay responsive to the stop signal.
        tokio::select! {
            _ = &mut stop => {
                set_lease_status(&statuses, &key, lease_id, EtcdLeaseState::Stopped);
                break;
            }
            _ = sleep(config.reconnect_interval) => {}
        }
    }
}

/// Runs a single keep-alive attempt for `lease_id`.
///
/// Opens a keep-alive stream, sends one keep-alive immediately and, once that
/// round-trip is confirmed, records [`EtcdLeaseState::Active`] for `key` in
/// `statuses`. Afterwards a keep-alive is sent every
/// `config.keep_alive_interval` until either `stop` resolves or a round-trip
/// fails.
///
/// Returns [`KeepAliveOutcome::Stopped`] when `stop` resolved while waiting
/// for the next interval, and [`KeepAliveOutcome::Failed`] with the error
/// message when opening the stream or any keep-alive round-trip failed.
/// `stop` is only observed between round-trips, never during one.
async fn run_keep_alive_once(
    config: &EtcdDiscoveryConfig,
    client: &SharedEtcdClient,
    key: &str,
    lease_id: i64,
    statuses: &LeaseStatuses,
    stop: &mut oneshot::Receiver<()>,
) -> KeepAliveOutcome {
    let (mut keeper, mut stream) = match open_keep_alive(config, client, lease_id).await {
        Ok(stream) => stream,
        Err(error) => return KeepAliveOutcome::Failed(error),
    };
    if let Err(error) = send_keep_alive(&mut keeper, &mut stream, config.keep_alive_timeout).await {
        return KeepAliveOutcome::Failed(error);
    }
    // The server just confirmed the lease with a positive TTL. Publish that,
    // otherwise a `Failed` status left by an earlier attempt would persist
    // even though the lease is being refreshed again.
    set_lease_status(statuses, key, lease_id, EtcdLeaseState::Active);
    loop {
        tokio::select! {
            _ = &mut *stop => return KeepAliveOutcome::Stopped,
            _ = sleep(config.keep_alive_interval) => {
                if let Err(error) =
                    send_keep_alive(&mut keeper, &mut stream, config.keep_alive_timeout).await
                {
                    return KeepAliveOutcome::Failed(error);
                }
            }
        }
    }
}

/// Performs one keep-alive round-trip on an open keep-alive stream.
///
/// Sends a keep-alive request through `keeper`, then waits up to
/// `response_timeout` for the next message on `stream`.
///
/// # Errors
///
/// Returns an error message when sending the request fails, when no response
/// arrives within `response_timeout`, when the stream reports an error or
/// ends, or when the response carries a TTL that is not greater than zero.
async fn send_keep_alive(
    keeper: &mut etcd_client::LeaseKeeper,
    stream: &mut etcd_client::LeaseKeepAliveStream,
    response_timeout: std::time::Duration,
) -> Result<(), String> {
    if let Err(error) = keeper.keep_alive().await {
        return Err(error.to_string());
    }

    // Peel the layers one by one: deadline, transport error, end of stream.
    let reply = timeout(response_timeout, stream.message())
        .await
        .map_err(|_| "lease keep-alive timed out".to_string())?
        .map_err(|error| error.to_string())?
        .ok_or_else(|| "lease keep-alive stream closed".to_string())?;

    if reply.ttl() > 0 {
        Ok(())
    } else {
        Err("lease keep-alive returned non-positive ttl".to_string())
    }
}

/// Opens a keep-alive stream for `lease_id` on the shared etcd client.
///
/// The client lock is acquired first and held for the duration of the call.
/// Only the `lease_keep_alive` request itself is bounded by
/// `config.operation_timeout`; waiting for the lock is not.
///
/// # Errors
///
/// Returns an error message when the request does not complete within
/// `config.operation_timeout` or when the client reports an error.
async fn open_keep_alive(
    config: &EtcdDiscoveryConfig,
    client: &SharedEtcdClient,
    lease_id: i64,
) -> Result<(etcd_client::LeaseKeeper, etcd_client::LeaseKeepAliveStream), String> {
    let mut guard = client.lock().await;
    let attempt = timeout(config.operation_timeout, guard.lease_keep_alive(lease_id)).await;
    match attempt {
        Ok(Ok(keeper_and_stream)) => Ok(keeper_and_stream),
        Ok(Err(error)) => Err(error.to_string()),
        Err(_) => Err("lease_keep_alive timed out".to_string()),
    }
}