rs_zero/core/dependency_config/
rpc.rs1use serde::Deserialize;
2
3#[cfg(feature = "rpc")]
4use crate::core::CoreResult;
5
6use super::etcd::EtcdDiscoverySection;
7
8#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Eq, Default)]
10#[serde(rename_all = "snake_case")]
11pub enum RpcClientProvider {
12 #[default]
14 Static,
15 Etcd,
17}
18
19#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
21#[serde(default, deny_unknown_fields)]
22pub struct RpcClientSection {
23 pub provider: RpcClientProvider,
25 pub endpoint: String,
27 pub service: String,
29 pub connect_timeout_ms: u64,
31 pub request_timeout_ms: u64,
33 pub resilience: bool,
35 pub retry: RpcRetrySection,
37 pub deadline: RpcDeadlineSection,
39 pub load_balance: RpcLoadBalanceSection,
41 pub streaming: RpcStreamingSection,
43 pub etcd: Option<EtcdDiscoverySection>,
45}
46
47impl Default for RpcClientSection {
48 fn default() -> Self {
49 Self {
50 provider: RpcClientProvider::Static,
51 endpoint: String::new(),
52 service: String::new(),
53 connect_timeout_ms: 3000,
54 request_timeout_ms: 5000,
55 resilience: true,
56 retry: RpcRetrySection::default(),
57 deadline: RpcDeadlineSection::default(),
58 load_balance: RpcLoadBalanceSection::default(),
59 streaming: RpcStreamingSection::default(),
60 etcd: None,
61 }
62 }
63}
64
65impl RpcClientSection {
66 #[cfg(feature = "rpc")]
68 pub fn to_rpc_client_config(&self) -> CoreResult<crate::rpc::RpcClientConfig> {
69 use std::time::Duration;
70
71 if self.provider == RpcClientProvider::Static && self.endpoint.trim().is_empty() {
72 return Err(super::config_error(
73 "rpc client endpoint must be set for static provider",
74 ));
75 }
76 if self.provider == RpcClientProvider::Etcd && self.service.trim().is_empty() {
77 return Err(super::config_error(
78 "rpc client service must be set for etcd provider",
79 ));
80 }
81
82 let mut config = if self.resilience {
83 crate::rpc::RpcClientConfig::production_defaults(self.endpoint.clone())
84 } else {
85 crate::rpc::RpcClientConfig::new(self.endpoint.clone())
86 };
87 config.connect_timeout = Duration::from_millis(self.connect_timeout_ms);
88 config.request_timeout = Duration::from_millis(self.request_timeout_ms);
89 config.retry = self.retry.to_rpc_retry_config();
90 config.deadline = self.deadline.to_rpc_deadline_config();
91 config.load_balance = self.load_balance.to_rpc_load_balance_config();
92 config.discovery.service = (!self.service.is_empty()).then(|| self.service.clone());
93 config.streaming = self.streaming.to_rpc_streaming_config();
94 Ok(config)
95 }
96}
97
98#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
100#[serde(default, deny_unknown_fields)]
101pub struct RpcRetrySection {
102 pub enabled: bool,
103 pub max_attempts: u32,
104 pub initial_backoff_ms: u64,
105 pub max_backoff_ms: u64,
106}
107
108impl Default for RpcRetrySection {
109 fn default() -> Self {
110 Self {
111 enabled: true,
112 max_attempts: 3,
113 initial_backoff_ms: 50,
114 max_backoff_ms: 500,
115 }
116 }
117}
118
119impl RpcRetrySection {
120 #[cfg(feature = "rpc")]
121 fn to_rpc_retry_config(&self) -> crate::rpc::RpcRetryConfig {
122 crate::rpc::RpcRetryConfig {
123 enabled: self.enabled,
124 max_attempts: self.max_attempts,
125 initial_backoff: std::time::Duration::from_millis(self.initial_backoff_ms),
126 max_backoff: std::time::Duration::from_millis(self.max_backoff_ms),
127 ..crate::rpc::RpcRetryConfig::default()
128 }
129 }
130}
131
132#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
134#[serde(default, deny_unknown_fields)]
135pub struct RpcDeadlineSection {
136 pub propagate: bool,
137 pub clip_retries_to_budget: bool,
138}
139
140impl Default for RpcDeadlineSection {
141 fn default() -> Self {
142 Self {
143 propagate: true,
144 clip_retries_to_budget: true,
145 }
146 }
147}
148
149impl RpcDeadlineSection {
150 #[cfg(feature = "rpc")]
151 fn to_rpc_deadline_config(&self) -> crate::rpc::RpcDeadlineConfig {
152 crate::rpc::RpcDeadlineConfig {
153 propagate: self.propagate,
154 clip_retries_to_budget: self.clip_retries_to_budget,
155 }
156 }
157}
158
159#[derive(Debug, Clone, Deserialize, PartialEq, Eq, Default)]
161#[serde(default, deny_unknown_fields)]
162pub struct RpcLoadBalanceSection {
163 pub policy: RpcLoadBalancePolicySection,
164}
165
166#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Eq, Default)]
168#[serde(rename_all = "snake_case")]
169pub enum RpcLoadBalancePolicySection {
170 #[default]
172 Static,
173 WeightedRoundRobin,
175}
176
177impl RpcLoadBalanceSection {
178 #[cfg(feature = "rpc")]
179 fn to_rpc_load_balance_config(&self) -> crate::rpc::RpcLoadBalanceConfig {
180 let policy = match self.policy {
181 RpcLoadBalancePolicySection::Static => crate::rpc::LoadBalancePolicy::Static,
182 RpcLoadBalancePolicySection::WeightedRoundRobin => {
183 crate::rpc::LoadBalancePolicy::WeightedRoundRobin
184 }
185 };
186 crate::rpc::RpcLoadBalanceConfig { policy }
187 }
188}
189
190#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
192#[serde(default, deny_unknown_fields)]
193pub struct RpcStreamingSection {
194 pub observe: bool,
195 pub resilience: bool,
196 pub timeout_ms: u64,
197}
198
199impl Default for RpcStreamingSection {
200 fn default() -> Self {
201 Self {
202 observe: true,
203 resilience: true,
204 timeout_ms: 30000,
205 }
206 }
207}
208
209impl RpcStreamingSection {
210 #[cfg(feature = "rpc")]
211 fn to_rpc_streaming_config(&self) -> crate::rpc::RpcStreamingConfig {
212 crate::rpc::RpcStreamingConfig {
213 observe: self.observe,
214 resilience: self.resilience,
215 timeout: (self.timeout_ms > 0)
216 .then(|| std::time::Duration::from_millis(self.timeout_ms)),
217 }
218 }
219}