use serde::Deserialize;
#[cfg(feature = "rpc")]
use crate::core::CoreResult;
use super::etcd::EtcdDiscoverySection;
/// Selects how the RPC client obtains its target.
///
/// Because of `rename_all = "snake_case"`, the variants are spelled `static`
/// and `etcd` in configuration input. `Static` is the default variant.
#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum RpcClientProvider {
/// Fixed endpoint taken from `RpcClientSection::endpoint`; that field must
/// be non-blank for this provider.
#[default]
Static,
/// etcd-based provider; `RpcClientSection::service` must be non-blank for
/// this provider.
Etcd,
}
/// Configuration section describing a single RPC client.
///
/// `#[serde(default)]` fills any key missing from the input with the value
/// from this type's `Default` impl, and `deny_unknown_fields` makes
/// deserialization fail on keys that are not listed here.
#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
#[serde(default, deny_unknown_fields)]
pub struct RpcClientSection {
/// How the client target is determined (defaults to `Static`).
pub provider: RpcClientProvider,
/// Endpoint handed to the `RpcClientConfig` constructor; required (non-blank)
/// when `provider` is `Static`. Defaults to an empty string.
pub endpoint: String,
/// Service name forwarded to `config.discovery.service`; required (non-blank)
/// when `provider` is `Etcd`. Defaults to an empty string.
pub service: String,
/// Connect timeout in milliseconds (default 3000).
pub connect_timeout_ms: u64,
/// Request timeout in milliseconds (default 5000).
pub request_timeout_ms: u64,
/// When `true` (the default) the runtime config starts from
/// `RpcClientConfig::production_defaults`, otherwise from `RpcClientConfig::new`.
pub resilience: bool,
/// Retry settings.
pub retry: RpcRetrySection,
/// Deadline settings.
pub deadline: RpcDeadlineSection,
/// Load-balancing settings.
pub load_balance: RpcLoadBalanceSection,
/// Streaming settings.
pub streaming: RpcStreamingSection,
/// Optional etcd discovery settings; `None` by default.
// NOTE(review): not read by `to_rpc_client_config` in this file — presumably
// consumed elsewhere when `provider` is `Etcd`; confirm against the caller.
pub etcd: Option<EtcdDiscoverySection>,
}
impl Default for RpcClientSection {
fn default() -> Self {
Self {
provider: RpcClientProvider::Static,
endpoint: String::new(),
service: String::new(),
connect_timeout_ms: 3000,
request_timeout_ms: 5000,
resilience: true,
retry: RpcRetrySection::default(),
deadline: RpcDeadlineSection::default(),
load_balance: RpcLoadBalanceSection::default(),
streaming: RpcStreamingSection::default(),
etcd: None,
}
}
}
impl RpcClientSection {
    /// Builds the runtime `crate::rpc::RpcClientConfig` described by this section.
    ///
    /// The config is created from `endpoint` via
    /// `RpcClientConfig::production_defaults` when `resilience` is set and via
    /// `RpcClientConfig::new` otherwise. The timeouts, retry, deadline,
    /// load-balance, discovery service and streaming settings from this section
    /// are then written over it.
    ///
    /// A `service` that is empty or consists only of whitespace is treated as
    /// "not set" both for validation and for `config.discovery.service`.
    ///
    /// # Errors
    ///
    /// Returns the error built by `super::config_error` when
    /// - `provider` is `Static` and `endpoint` is empty or whitespace-only, or
    /// - `provider` is `Etcd` and `service` is empty or whitespace-only.
    #[cfg(feature = "rpc")]
    pub fn to_rpc_client_config(&self) -> CoreResult<crate::rpc::RpcClientConfig> {
        use std::time::Duration;

        // Decide once whether a service name was supplied, so that validation
        // and the discovery setting below cannot disagree about blank input.
        let service_is_blank = self.service.trim().is_empty();

        if self.provider == RpcClientProvider::Static && self.endpoint.trim().is_empty() {
            return Err(super::config_error(
                "rpc client endpoint must be set for static provider",
            ));
        }
        if self.provider == RpcClientProvider::Etcd && service_is_blank {
            return Err(super::config_error(
                "rpc client service must be set for etcd provider",
            ));
        }

        let mut config = if self.resilience {
            crate::rpc::RpcClientConfig::production_defaults(self.endpoint.clone())
        } else {
            crate::rpc::RpcClientConfig::new(self.endpoint.clone())
        };

        // Values from this section take precedence over whatever the
        // constructor above filled in.
        config.connect_timeout = Duration::from_millis(self.connect_timeout_ms);
        config.request_timeout = Duration::from_millis(self.request_timeout_ms);
        config.retry = self.retry.to_rpc_retry_config();
        config.deadline = self.deadline.to_rpc_deadline_config();
        config.load_balance = self.load_balance.to_rpc_load_balance_config();
        // The configured value is forwarded as written; only a blank value is
        // mapped to `None`.
        config.discovery.service = (!service_is_blank).then(|| self.service.clone());
        config.streaming = self.streaming.to_rpc_streaming_config();
        Ok(config)
    }
}
/// Retry settings for an RPC client.
///
/// Missing keys fall back to this type's `Default` impl (`serde(default)`);
/// unknown keys are rejected (`deny_unknown_fields`).
#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
#[serde(default, deny_unknown_fields)]
pub struct RpcRetrySection {
/// Forwarded unchanged as `RpcRetryConfig::enabled` (default `true`).
pub enabled: bool,
/// Forwarded unchanged as `RpcRetryConfig::max_attempts` (default 3).
pub max_attempts: u32,
/// Initial backoff in milliseconds (default 50).
pub initial_backoff_ms: u64,
/// Maximum backoff in milliseconds (default 500).
pub max_backoff_ms: u64,
}
impl Default for RpcRetrySection {
fn default() -> Self {
Self {
enabled: true,
max_attempts: 3,
initial_backoff_ms: 50,
max_backoff_ms: 500,
}
}
}
impl RpcRetrySection {
    /// Converts this section into a `crate::rpc::RpcRetryConfig`.
    ///
    /// `enabled` and `max_attempts` are copied as-is, the two millisecond
    /// fields become `Duration`s, and every remaining field of the runtime
    /// config keeps the value from `RpcRetryConfig::default()`.
    #[cfg(feature = "rpc")]
    fn to_rpc_retry_config(&self) -> crate::rpc::RpcRetryConfig {
        use std::time::Duration;

        let initial_backoff = Duration::from_millis(self.initial_backoff_ms);
        let max_backoff = Duration::from_millis(self.max_backoff_ms);

        crate::rpc::RpcRetryConfig {
            enabled: self.enabled,
            max_attempts: self.max_attempts,
            initial_backoff,
            max_backoff,
            ..crate::rpc::RpcRetryConfig::default()
        }
    }
}
/// Deadline settings for an RPC client.
///
/// Missing keys fall back to this type's `Default` impl (`serde(default)`);
/// unknown keys are rejected (`deny_unknown_fields`).
#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
#[serde(default, deny_unknown_fields)]
pub struct RpcDeadlineSection {
/// Forwarded unchanged as `RpcDeadlineConfig::propagate` (default `true`).
// NOTE(review): presumably toggles deadline propagation to callees — the
// semantics live in `crate::rpc`; confirm there.
pub propagate: bool,
/// Forwarded unchanged as `RpcDeadlineConfig::clip_retries_to_budget`
/// (default `true`).
pub clip_retries_to_budget: bool,
}
impl Default for RpcDeadlineSection {
fn default() -> Self {
Self {
propagate: true,
clip_retries_to_budget: true,
}
}
}
impl RpcDeadlineSection {
    /// Converts this section into a `crate::rpc::RpcDeadlineConfig`, copying
    /// both flags across unchanged.
    #[cfg(feature = "rpc")]
    fn to_rpc_deadline_config(&self) -> crate::rpc::RpcDeadlineConfig {
        let &Self {
            propagate,
            clip_retries_to_budget,
        } = self;

        crate::rpc::RpcDeadlineConfig {
            propagate,
            clip_retries_to_budget,
        }
    }
}
/// Load-balancing settings for an RPC client.
///
/// Missing keys fall back to the derived `Default` (`serde(default)`);
/// unknown keys are rejected (`deny_unknown_fields`).
#[derive(Debug, Clone, Deserialize, PartialEq, Eq, Default)]
#[serde(default, deny_unknown_fields)]
pub struct RpcLoadBalanceSection {
/// Load-balancing policy to use; defaults to `Static`.
pub policy: RpcLoadBalancePolicySection,
}
/// Configuration-side spelling of the load-balancing policy.
///
/// Because of `rename_all = "snake_case"`, the variants are written `static`
/// and `weighted_round_robin` in configuration input. Each variant maps onto
/// the `crate::rpc::LoadBalancePolicy` variant of the same name.
#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum RpcLoadBalancePolicySection {
/// Maps to `LoadBalancePolicy::Static`; the default.
#[default]
Static,
/// Maps to `LoadBalancePolicy::WeightedRoundRobin`.
WeightedRoundRobin,
}
impl RpcLoadBalanceSection {
    /// Converts this section into a `crate::rpc::RpcLoadBalanceConfig`,
    /// translating the configured policy into the runtime variant of the same
    /// name.
    #[cfg(feature = "rpc")]
    fn to_rpc_load_balance_config(&self) -> crate::rpc::RpcLoadBalanceConfig {
        use crate::rpc::{LoadBalancePolicy, RpcLoadBalanceConfig};

        RpcLoadBalanceConfig {
            // Exhaustive on purpose: a new config variant must be mapped here.
            policy: match self.policy {
                RpcLoadBalancePolicySection::Static => LoadBalancePolicy::Static,
                RpcLoadBalancePolicySection::WeightedRoundRobin => {
                    LoadBalancePolicy::WeightedRoundRobin
                }
            },
        }
    }
}
/// Streaming settings for an RPC client.
///
/// Missing keys fall back to this type's `Default` impl (`serde(default)`);
/// unknown keys are rejected (`deny_unknown_fields`).
#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
#[serde(default, deny_unknown_fields)]
pub struct RpcStreamingSection {
/// Forwarded unchanged as `RpcStreamingConfig::observe` (default `true`).
pub observe: bool,
/// Forwarded unchanged as `RpcStreamingConfig::resilience` (default `true`).
pub resilience: bool,
/// Stream timeout in milliseconds (default 30000). A value of `0` is
/// converted to `None` rather than to a zero-length duration.
pub timeout_ms: u64,
}
impl Default for RpcStreamingSection {
fn default() -> Self {
Self {
observe: true,
resilience: true,
timeout_ms: 30000,
}
}
}
impl RpcStreamingSection {
    /// Converts this section into a `crate::rpc::RpcStreamingConfig`.
    ///
    /// The two flags are copied unchanged. `timeout_ms` becomes
    /// `Some(Duration)` when it is non-zero and `None` when it is `0`.
    #[cfg(feature = "rpc")]
    fn to_rpc_streaming_config(&self) -> crate::rpc::RpcStreamingConfig {
        use std::time::Duration;

        let timeout = match self.timeout_ms {
            0 => None,
            millis => Some(Duration::from_millis(millis)),
        };

        crate::rpc::RpcStreamingConfig {
            observe: self.observe,
            resilience: self.resilience,
            timeout,
        }
    }
}