rs-zero 0.2.6

Rust-first microservice framework inspired by go-zero engineering practices
Documentation
use std::sync::Arc;

use tokio::{sync::Mutex, time::timeout};

use crate::discovery_etcd::{EtcdDiscoveryConfig, EtcdDiscoveryError, EtcdDiscoveryResult};

/// Shared etcd client guarded by an async mutex because the upstream client uses `&mut self`.
#[cfg(feature = "discovery-etcd")]
pub(crate) type SharedEtcdClient = Arc<Mutex<etcd_client::Client>>;

/// Factory for etcd clients.
#[derive(Debug, Clone)]
pub struct EtcdClientFactory {
    config: EtcdDiscoveryConfig,
}

impl EtcdClientFactory {
    /// Creates a factory after validating endpoints and timing configuration.
    pub fn new(config: EtcdDiscoveryConfig) -> EtcdDiscoveryResult<Self> {
        if config
            .endpoints
            .iter()
            .all(|endpoint| endpoint.trim().is_empty())
        {
            return Err(EtcdDiscoveryError::MissingEndpoint);
        }
        validate_config(&config)?;
        Ok(Self { config })
    }

    /// Returns configured endpoints.
    pub fn endpoints(&self) -> &[String] {
        &self.config.endpoints
    }

    /// Creates a real etcd client when `discovery-etcd` is enabled.
    #[cfg(feature = "discovery-etcd")]
    pub async fn connect(&self) -> EtcdDiscoveryResult<etcd_client::Client> {
        let options = self.connect_options();
        match timeout(
            self.config.connect_timeout,
            etcd_client::Client::connect(self.config.endpoints.clone(), Some(options)),
        )
        .await
        {
            Ok(Ok(client)) => Ok(client),
            Ok(Err(error)) => Err(EtcdDiscoveryError::Backend(error.to_string())),
            Err(_) => Err(EtcdDiscoveryError::Timeout {
                operation: "connect",
            }),
        }
    }

    #[cfg(feature = "discovery-etcd")]
    fn connect_options(&self) -> etcd_client::ConnectOptions {
        let mut options = etcd_client::ConnectOptions::new()
            .with_connect_timeout(self.config.connect_timeout)
            .with_timeout(self.config.operation_timeout)
            .with_keep_alive(
                self.config.keep_alive_interval,
                self.config.keep_alive_timeout,
            )
            .with_keep_alive_while_idle(true);
        if let Some(auth) = &self.config.auth {
            options = options.with_user(auth.username.clone(), auth.password.clone());
        }
        options
    }
}

pub(crate) fn validate_config(config: &EtcdDiscoveryConfig) -> EtcdDiscoveryResult<()> {
    if config.prefix.trim_matches('/').is_empty() {
        return Err(EtcdDiscoveryError::InvalidConfig(
            "prefix must not be empty".to_string(),
        ));
    }
    if config.lease_ttl <= 0 {
        return Err(EtcdDiscoveryError::InvalidConfig(
            "lease_ttl must be greater than zero".to_string(),
        ));
    }
    if config.connect_timeout.is_zero()
        || config.operation_timeout.is_zero()
        || config.keep_alive_interval.is_zero()
        || config.keep_alive_timeout.is_zero()
    {
        return Err(EtcdDiscoveryError::InvalidConfig(
            "timeouts and keep-alive interval must be greater than zero".to_string(),
        ));
    }
    Ok(())
}