Skip to main content

rs_zero/core/dependency_config/
rpc.rs

1use serde::Deserialize;
2
3#[cfg(feature = "rpc")]
4use crate::core::CoreResult;
5
6use super::etcd::EtcdDiscoverySection;
7
8/// Runtime provider used by an outbound RPC client.
9#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Eq, Default)]
10#[serde(rename_all = "snake_case")]
11pub enum RpcClientProvider {
12    /// Use a static tonic endpoint.
13    #[default]
14    Static,
15    /// Resolve instances through etcd service discovery.
16    Etcd,
17}
18
19/// Serializable RPC client dependency configuration.
20#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
21#[serde(default, deny_unknown_fields)]
22pub struct RpcClientSection {
23    /// Client provider: `static` or `etcd`.
24    pub provider: RpcClientProvider,
25    /// Static tonic endpoint, for example `http://127.0.0.1:50051`.
26    pub endpoint: String,
27    /// Logical service name used by discovery providers.
28    pub service: String,
29    /// Connection establishment timeout in milliseconds.
30    pub connect_timeout_ms: u64,
31    /// Per-request timeout in milliseconds.
32    pub request_timeout_ms: u64,
33    /// Whether to apply production-oriented RPC resilience defaults.
34    pub resilience: bool,
35    /// Retry behavior.
36    pub retry: RpcRetrySection,
37    /// Deadline propagation behavior.
38    pub deadline: RpcDeadlineSection,
39    /// Load-balance behavior.
40    pub load_balance: RpcLoadBalanceSection,
41    /// Streaming observation behavior.
42    pub streaming: RpcStreamingSection,
43    /// etcd discovery settings when `provider = "etcd"`.
44    pub etcd: Option<EtcdDiscoverySection>,
45}
46
47impl Default for RpcClientSection {
48    fn default() -> Self {
49        Self {
50            provider: RpcClientProvider::Static,
51            endpoint: String::new(),
52            service: String::new(),
53            connect_timeout_ms: 3000,
54            request_timeout_ms: 5000,
55            resilience: true,
56            retry: RpcRetrySection::default(),
57            deadline: RpcDeadlineSection::default(),
58            load_balance: RpcLoadBalanceSection::default(),
59            streaming: RpcStreamingSection::default(),
60            etcd: None,
61        }
62    }
63}
64
65impl RpcClientSection {
66    /// Converts this section into the runtime RPC client config.
67    #[cfg(feature = "rpc")]
68    pub fn to_rpc_client_config(&self) -> CoreResult<crate::rpc::RpcClientConfig> {
69        use std::time::Duration;
70
71        if self.provider == RpcClientProvider::Static && self.endpoint.trim().is_empty() {
72            return Err(super::config_error(
73                "rpc client endpoint must be set for static provider",
74            ));
75        }
76        if self.provider == RpcClientProvider::Etcd && self.service.trim().is_empty() {
77            return Err(super::config_error(
78                "rpc client service must be set for etcd provider",
79            ));
80        }
81
82        let mut config = if self.resilience {
83            crate::rpc::RpcClientConfig::production_defaults(self.endpoint.clone())
84        } else {
85            crate::rpc::RpcClientConfig::new(self.endpoint.clone())
86        };
87        config.connect_timeout = Duration::from_millis(self.connect_timeout_ms);
88        config.request_timeout = Duration::from_millis(self.request_timeout_ms);
89        config.retry = self.retry.to_rpc_retry_config();
90        config.deadline = self.deadline.to_rpc_deadline_config();
91        config.load_balance = self.load_balance.to_rpc_load_balance_config();
92        config.discovery.service = (!self.service.is_empty()).then(|| self.service.clone());
93        config.streaming = self.streaming.to_rpc_streaming_config();
94        Ok(config)
95    }
96}
97
98/// Serializable RPC retry configuration.
99#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
100#[serde(default, deny_unknown_fields)]
101pub struct RpcRetrySection {
102    pub enabled: bool,
103    pub max_attempts: u32,
104    pub initial_backoff_ms: u64,
105    pub max_backoff_ms: u64,
106}
107
108impl Default for RpcRetrySection {
109    fn default() -> Self {
110        Self {
111            enabled: true,
112            max_attempts: 3,
113            initial_backoff_ms: 50,
114            max_backoff_ms: 500,
115        }
116    }
117}
118
119impl RpcRetrySection {
120    #[cfg(feature = "rpc")]
121    fn to_rpc_retry_config(&self) -> crate::rpc::RpcRetryConfig {
122        crate::rpc::RpcRetryConfig {
123            enabled: self.enabled,
124            max_attempts: self.max_attempts,
125            initial_backoff: std::time::Duration::from_millis(self.initial_backoff_ms),
126            max_backoff: std::time::Duration::from_millis(self.max_backoff_ms),
127            ..crate::rpc::RpcRetryConfig::default()
128        }
129    }
130}
131
132/// Serializable RPC deadline configuration.
133#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
134#[serde(default, deny_unknown_fields)]
135pub struct RpcDeadlineSection {
136    pub propagate: bool,
137    pub clip_retries_to_budget: bool,
138}
139
140impl Default for RpcDeadlineSection {
141    fn default() -> Self {
142        Self {
143            propagate: true,
144            clip_retries_to_budget: true,
145        }
146    }
147}
148
149impl RpcDeadlineSection {
150    #[cfg(feature = "rpc")]
151    fn to_rpc_deadline_config(&self) -> crate::rpc::RpcDeadlineConfig {
152        crate::rpc::RpcDeadlineConfig {
153            propagate: self.propagate,
154            clip_retries_to_budget: self.clip_retries_to_budget,
155        }
156    }
157}
158
159/// Serializable RPC load-balance configuration.
160#[derive(Debug, Clone, Deserialize, PartialEq, Eq, Default)]
161#[serde(default, deny_unknown_fields)]
162pub struct RpcLoadBalanceSection {
163    pub policy: RpcLoadBalancePolicySection,
164}
165
166/// Serializable RPC load-balance policy.
167#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Eq, Default)]
168#[serde(rename_all = "snake_case")]
169pub enum RpcLoadBalancePolicySection {
170    /// Use the configured static endpoint directly.
171    #[default]
172    Static,
173    /// Select discovered instances by weight and round-robin cursor.
174    WeightedRoundRobin,
175}
176
177impl RpcLoadBalanceSection {
178    #[cfg(feature = "rpc")]
179    fn to_rpc_load_balance_config(&self) -> crate::rpc::RpcLoadBalanceConfig {
180        let policy = match self.policy {
181            RpcLoadBalancePolicySection::Static => crate::rpc::LoadBalancePolicy::Static,
182            RpcLoadBalancePolicySection::WeightedRoundRobin => {
183                crate::rpc::LoadBalancePolicy::WeightedRoundRobin
184            }
185        };
186        crate::rpc::RpcLoadBalanceConfig { policy }
187    }
188}
189
190/// Serializable RPC streaming configuration.
191#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
192#[serde(default, deny_unknown_fields)]
193pub struct RpcStreamingSection {
194    pub observe: bool,
195    pub resilience: bool,
196    pub timeout_ms: u64,
197}
198
199impl Default for RpcStreamingSection {
200    fn default() -> Self {
201        Self {
202            observe: true,
203            resilience: true,
204            timeout_ms: 30000,
205        }
206    }
207}
208
209impl RpcStreamingSection {
210    #[cfg(feature = "rpc")]
211    fn to_rpc_streaming_config(&self) -> crate::rpc::RpcStreamingConfig {
212        crate::rpc::RpcStreamingConfig {
213            observe: self.observe,
214            resilience: self.resilience,
215            timeout: (self.timeout_ms > 0)
216                .then(|| std::time::Duration::from_millis(self.timeout_ms)),
217        }
218    }
219}