rs-zero 0.2.9

Rust-first microservice framework inspired by go-zero engineering practices
Documentation
use serde::Deserialize;

#[cfg(feature = "rpc")]
use crate::core::CoreResult;

use super::etcd::EtcdDiscoverySection;

/// Runtime provider used by an outbound RPC client.
///
/// Variants are deserialized from their snake_case names (`static`, `etcd`).
/// [`RpcClientProvider::Static`] is the default variant.
#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum RpcClientProvider {
    /// Use a static tonic endpoint. This is the default provider.
    #[default]
    Static,
    /// Resolve instances through etcd service discovery.
    Etcd,
}

/// Serializable RPC client dependency configuration.
///
/// Fields missing from the input fall back to the values of this type's `Default`
/// impl (`#[serde(default)]`); unknown fields are rejected (`deny_unknown_fields`).
#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
#[serde(default, deny_unknown_fields)]
pub struct RpcClientSection {
    /// Client provider: `static` or `etcd`. Defaults to `static`.
    pub provider: RpcClientProvider,
    /// Static tonic endpoint, for example `http://127.0.0.1:50051`.
    ///
    /// Defaults to an empty string; `to_rpc_client_config` rejects a blank value
    /// when the provider is `static`.
    pub endpoint: String,
    /// Logical service name used by discovery providers.
    ///
    /// Defaults to an empty string; `to_rpc_client_config` rejects a blank value
    /// when the provider is `etcd`.
    pub service: String,
    /// Connection establishment timeout in milliseconds. Defaults to `3000`.
    pub connect_timeout_ms: u64,
    /// Per-request timeout in milliseconds. Defaults to `5000`.
    pub request_timeout_ms: u64,
    /// Whether to apply production-oriented RPC resilience defaults. Defaults to `true`.
    pub resilience: bool,
    /// Retry behavior.
    pub retry: RpcRetrySection,
    /// Deadline propagation behavior.
    pub deadline: RpcDeadlineSection,
    /// Load-balance behavior.
    pub load_balance: RpcLoadBalanceSection,
    /// Streaming observation behavior.
    pub streaming: RpcStreamingSection,
    /// etcd discovery settings when `provider = "etcd"`. Defaults to `None`.
    pub etcd: Option<EtcdDiscoverySection>,
}

impl Default for RpcClientSection {
    /// Builds the baseline client section.
    ///
    /// The baseline uses the static provider with an empty endpoint and service name,
    /// a 3000 ms connect timeout, a 5000 ms request timeout, resilience switched on,
    /// every nested section at its own default, and no etcd settings.
    fn default() -> Self {
        // Both names start out empty; the static endpoint is validated later, when the
        // section is converted into the runtime config.
        let endpoint = String::new();
        let service = String::new();

        Self {
            provider: RpcClientProvider::Static,
            endpoint,
            service,
            connect_timeout_ms: 3_000,
            request_timeout_ms: 5_000,
            resilience: true,
            retry: Default::default(),
            deadline: Default::default(),
            load_balance: Default::default(),
            streaming: Default::default(),
            etcd: None,
        }
    }
}

impl RpcClientSection {
    /// Converts this section into the runtime RPC client config.
    ///
    /// The runtime config starts from `RpcClientConfig::production_defaults` when
    /// `resilience` is set and from `RpcClientConfig::new` otherwise. The timeouts and
    /// the retry, deadline, load-balance, discovery-service and streaming settings are
    /// then overwritten with the values held by this section. The `etcd` sub-section is
    /// not read by this method.
    ///
    /// A whitespace-only `endpoint` or `service` is treated as "not set", both for
    /// validation and when deciding whether a discovery service name is forwarded.
    ///
    /// # Errors
    ///
    /// Returns a configuration error when the provider is `static` and `endpoint` is
    /// blank, or when the provider is `etcd` and `service` is blank.
    #[cfg(feature = "rpc")]
    pub fn to_rpc_client_config(&self) -> CoreResult<crate::rpc::RpcClientConfig> {
        use std::time::Duration;

        // Compute blankness once so validation and the discovery assignment below
        // can never disagree about what counts as an unset value.
        let endpoint_blank = self.endpoint.trim().is_empty();
        let service_blank = self.service.trim().is_empty();

        if self.provider == RpcClientProvider::Static && endpoint_blank {
            return Err(super::config_error(
                "rpc client endpoint must be set for static provider",
            ));
        }
        if self.provider == RpcClientProvider::Etcd && service_blank {
            return Err(super::config_error(
                "rpc client service must be set for etcd provider",
            ));
        }

        let mut config = if self.resilience {
            crate::rpc::RpcClientConfig::production_defaults(self.endpoint.clone())
        } else {
            crate::rpc::RpcClientConfig::new(self.endpoint.clone())
        };
        config.connect_timeout = Duration::from_millis(self.connect_timeout_ms);
        config.request_timeout = Duration::from_millis(self.request_timeout_ms);
        config.retry = self.retry.to_rpc_retry_config();
        config.deadline = self.deadline.to_rpc_deadline_config();
        config.load_balance = self.load_balance.to_rpc_load_balance_config();
        // A whitespace-only name must not reach discovery as `Some("  ")`.
        config.discovery.service = (!service_blank).then(|| self.service.clone());
        config.streaming = self.streaming.to_rpc_streaming_config();
        Ok(config)
    }
}

/// Serializable RPC retry configuration.
///
/// Fields missing from the input fall back to the values of this type's `Default`
/// impl (`#[serde(default)]`); unknown fields are rejected (`deny_unknown_fields`).
#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
#[serde(default, deny_unknown_fields)]
pub struct RpcRetrySection {
    /// Whether retries are enabled. Defaults to `true`.
    pub enabled: bool,
    /// Attempt limit handed to the runtime retry config. Defaults to `3`.
    ///
    /// NOTE(review): presumably counts the initial call as an attempt — confirm
    /// against the runtime retry implementation.
    pub max_attempts: u32,
    /// Initial backoff in milliseconds. Defaults to `50`.
    pub initial_backoff_ms: u64,
    /// Maximum backoff in milliseconds. Defaults to `500`.
    pub max_backoff_ms: u64,
}

impl Default for RpcRetrySection {
    /// Retries enabled, an attempt limit of 3, and a backoff window of 50 ms to 500 ms.
    fn default() -> Self {
        const MAX_ATTEMPTS: u32 = 3;
        const INITIAL_BACKOFF_MS: u64 = 50;
        const MAX_BACKOFF_MS: u64 = 500;

        Self {
            enabled: true,
            max_attempts: MAX_ATTEMPTS,
            initial_backoff_ms: INITIAL_BACKOFF_MS,
            max_backoff_ms: MAX_BACKOFF_MS,
        }
    }
}

impl RpcRetrySection {
    /// Maps this section onto the runtime retry config.
    ///
    /// The two millisecond fields are converted to `Duration`s; every runtime field
    /// not set here keeps the value produced by `RpcRetryConfig::default()`.
    #[cfg(feature = "rpc")]
    fn to_rpc_retry_config(&self) -> crate::rpc::RpcRetryConfig {
        use std::time::Duration;

        let initial_backoff = Duration::from_millis(self.initial_backoff_ms);
        let max_backoff = Duration::from_millis(self.max_backoff_ms);

        crate::rpc::RpcRetryConfig {
            enabled: self.enabled,
            max_attempts: self.max_attempts,
            initial_backoff,
            max_backoff,
            ..crate::rpc::RpcRetryConfig::default()
        }
    }
}

/// Serializable RPC deadline configuration.
///
/// Fields missing from the input fall back to the values of this type's `Default`
/// impl (`#[serde(default)]`); unknown fields are rejected (`deny_unknown_fields`).
#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
#[serde(default, deny_unknown_fields)]
pub struct RpcDeadlineSection {
    /// Deadline propagation switch, copied into the runtime deadline config.
    /// Defaults to `true`.
    pub propagate: bool,
    /// Copied into the runtime deadline config. Defaults to `true`.
    ///
    /// NOTE(review): presumably limits retries to the remaining deadline budget —
    /// confirm against the runtime implementation.
    pub clip_retries_to_budget: bool,
}

impl Default for RpcDeadlineSection {
    /// Both deadline switches start out enabled.
    fn default() -> Self {
        let on = true;

        Self {
            propagate: on,
            clip_retries_to_budget: on,
        }
    }
}

impl RpcDeadlineSection {
    /// Copies both deadline switches into the runtime deadline config unchanged.
    #[cfg(feature = "rpc")]
    fn to_rpc_deadline_config(&self) -> crate::rpc::RpcDeadlineConfig {
        // Both fields are `bool`, so destructuring through the reference copies them.
        let Self {
            propagate,
            clip_retries_to_budget,
        } = *self;

        crate::rpc::RpcDeadlineConfig {
            propagate,
            clip_retries_to_budget,
        }
    }
}

/// Serializable RPC load-balance configuration.
///
/// A missing `policy` falls back to the derived default (`#[serde(default)]`);
/// unknown fields are rejected (`deny_unknown_fields`).
#[derive(Debug, Clone, Deserialize, PartialEq, Eq, Default)]
#[serde(default, deny_unknown_fields)]
pub struct RpcLoadBalanceSection {
    /// Load-balance policy to use; defaults to the policy enum's default variant.
    pub policy: RpcLoadBalancePolicySection,
}

/// Serializable RPC load-balance policy.
///
/// Variants are deserialized from their snake_case names (`static`,
/// `weighted_round_robin`). [`RpcLoadBalancePolicySection::Static`] is the default.
#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum RpcLoadBalancePolicySection {
    /// Use the configured static endpoint directly. This is the default policy.
    #[default]
    Static,
    /// Select discovered instances by weight and round-robin cursor.
    WeightedRoundRobin,
}

impl RpcLoadBalanceSection {
    /// Translates the serialized policy into the runtime load-balance config.
    ///
    /// Each section variant maps to the runtime policy variant of the same name.
    #[cfg(feature = "rpc")]
    fn to_rpc_load_balance_config(&self) -> crate::rpc::RpcLoadBalanceConfig {
        use crate::rpc::{LoadBalancePolicy, RpcLoadBalanceConfig};

        RpcLoadBalanceConfig {
            policy: match self.policy {
                RpcLoadBalancePolicySection::Static => LoadBalancePolicy::Static,
                RpcLoadBalancePolicySection::WeightedRoundRobin => {
                    LoadBalancePolicy::WeightedRoundRobin
                }
            },
        }
    }
}

/// Serializable RPC streaming configuration.
///
/// Fields missing from the input fall back to the values of this type's `Default`
/// impl (`#[serde(default)]`); unknown fields are rejected (`deny_unknown_fields`).
#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
#[serde(default, deny_unknown_fields)]
pub struct RpcStreamingSection {
    /// Streaming observation switch, copied into the runtime streaming config.
    /// Defaults to `true`.
    pub observe: bool,
    /// Streaming resilience switch, copied into the runtime streaming config.
    /// Defaults to `true`.
    pub resilience: bool,
    /// Streaming timeout in milliseconds. Defaults to `30000`.
    ///
    /// A value of `0` is converted to "no timeout" (`None`) in the runtime config.
    pub timeout_ms: u64,
}

impl Default for RpcStreamingSection {
    /// Observation and resilience enabled, with a 30000 ms streaming timeout.
    fn default() -> Self {
        const TIMEOUT_MS: u64 = 30_000;

        Self {
            observe: true,
            resilience: true,
            timeout_ms: TIMEOUT_MS,
        }
    }
}

impl RpcStreamingSection {
    /// Maps this section onto the runtime streaming config.
    ///
    /// `observe` and `resilience` are copied as-is. A `timeout_ms` of zero becomes
    /// `None`; any other value becomes `Some` duration of that many milliseconds.
    #[cfg(feature = "rpc")]
    fn to_rpc_streaming_config(&self) -> crate::rpc::RpcStreamingConfig {
        use std::time::Duration;

        let timeout = match self.timeout_ms {
            0 => None,
            millis => Some(Duration::from_millis(millis)),
        };

        crate::rpc::RpcStreamingConfig {
            observe: self.observe,
            resilience: self.resilience,
            timeout,
        }
    }
}