//! RPC configuration types (`rs_zero/rpc/config.rs`): client/server
//! configs plus per-concern sub-configurations for retry, deadlines,
//! load balancing, discovery, streaming, and resilience.
1use std::{net::SocketAddr, time::Duration};
2
/// RPC client configuration.
///
/// Bundles the target endpoint with one sub-configuration per concern
/// (resilience, retry, deadline propagation, load balancing, discovery,
/// streaming). Construct via [`RpcClientConfig::new`] or
/// [`RpcClientConfig::go_zero_defaults`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RpcClientConfig {
    /// Endpoint URI, for example `http://127.0.0.1:50051`.
    pub endpoint: String,
    /// Connection establishment timeout.
    pub connect_timeout: Duration,
    /// Per-request timeout.
    pub request_timeout: Duration,
    /// Optional resilience behavior used by generated clients or helpers.
    pub resilience: RpcResilienceConfig,
    /// Client retry behavior.
    pub retry: RpcRetryConfig,
    /// Deadline propagation behavior.
    pub deadline: RpcDeadlineConfig,
    /// Client load-balancing behavior.
    pub load_balance: RpcLoadBalanceConfig,
    /// Optional service discovery target.
    pub discovery: RpcDiscoveryConfig,
    /// Streaming RPC instrumentation behavior.
    pub streaming: RpcStreamingConfig,
}
25
26impl RpcClientConfig {
27    /// Creates a client config with practical defaults.
28    pub fn new(endpoint: impl Into<String>) -> Self {
29        Self {
30            endpoint: endpoint.into(),
31            connect_timeout: Duration::from_secs(3),
32            request_timeout: Duration::from_secs(5),
33            resilience: RpcResilienceConfig::default(),
34            retry: RpcRetryConfig::default(),
35            deadline: RpcDeadlineConfig::default(),
36            load_balance: RpcLoadBalanceConfig::default(),
37            discovery: RpcDiscoveryConfig::default(),
38            streaming: RpcStreamingConfig::default(),
39        }
40    }
41}
42
43impl RpcClientConfig {
44    /// Creates a client config with go-zero style resilience defaults.
45    pub fn go_zero_defaults(endpoint: impl Into<String>) -> Self {
46        Self {
47            resilience: RpcResilienceConfig::go_zero_defaults(),
48            retry: RpcRetryConfig::go_zero_defaults(),
49            deadline: RpcDeadlineConfig::go_zero_defaults(),
50            load_balance: RpcLoadBalanceConfig::go_zero_defaults(),
51            streaming: RpcStreamingConfig::go_zero_defaults(),
52            ..Self::new(endpoint)
53        }
54    }
55}
56
/// RPC server configuration.
///
/// Names the service and fixes its listen address, plus the resilience
/// and streaming settings applied at the tonic service boundary.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RpcServerConfig {
    /// Logical service name.
    pub name: String,
    /// Server listen address.
    pub addr: SocketAddr,
    /// Resilience adapter boundary for tonic services.
    pub resilience: RpcResilienceConfig,
    /// Streaming RPC instrumentation behavior.
    pub streaming: RpcStreamingConfig,
}
69
70impl RpcServerConfig {
71    /// Creates server config.
72    pub fn new(name: impl Into<String>, addr: SocketAddr) -> Self {
73        Self {
74            name: name.into(),
75            addr,
76            resilience: RpcResilienceConfig::default(),
77            streaming: RpcStreamingConfig::default(),
78        }
79    }
80}
81
82impl RpcServerConfig {
83    /// Creates a server config with go-zero style resilience defaults.
84    pub fn go_zero_defaults(name: impl Into<String>, addr: SocketAddr) -> Self {
85        Self {
86            resilience: RpcResilienceConfig::go_zero_defaults(),
87            streaming: RpcStreamingConfig::go_zero_defaults(),
88            ..Self::new(name, addr)
89        }
90    }
91}
92
/// RPC retry configuration.
///
/// Consumed by generated clients or helpers; the fields describe whether
/// and how failed calls may be re-issued.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RpcRetryConfig {
    /// Whether client retries are enabled.
    pub enabled: bool,
    /// Maximum total attempts including the first call.
    pub max_attempts: u32,
    /// Initial retry backoff.
    pub initial_backoff: Duration,
    /// Maximum retry backoff.
    pub max_backoff: Duration,
    /// Retryable tonic codes.
    pub retryable_codes: Vec<tonic::Code>,
}
107
108impl Default for RpcRetryConfig {
109    fn default() -> Self {
110        Self {
111            enabled: false,
112            max_attempts: 1,
113            initial_backoff: Duration::from_millis(50),
114            max_backoff: Duration::from_secs(1),
115            retryable_codes: vec![
116                tonic::Code::Unavailable,
117                tonic::Code::DeadlineExceeded,
118                tonic::Code::ResourceExhausted,
119            ],
120        }
121    }
122}
123
124impl RpcRetryConfig {
125    /// Returns a go-zero style retry profile for client calls.
126    pub fn go_zero_defaults() -> Self {
127        Self {
128            enabled: true,
129            max_attempts: 3,
130            initial_backoff: Duration::from_millis(50),
131            max_backoff: Duration::from_millis(500),
132            ..Self::default()
133        }
134    }
135}
136
/// RPC deadline propagation configuration.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RpcDeadlineConfig {
    /// Whether outgoing requests should carry remaining timeout metadata.
    pub propagate: bool,
    /// Whether retries should be clipped by remaining request budget.
    pub clip_retries_to_budget: bool,
}

impl Default for RpcDeadlineConfig {
    /// Propagation starts disabled; retry clipping stays on, so enabling
    /// retries never extends work past the remaining request budget.
    fn default() -> Self {
        Self {
            propagate: false,
            clip_retries_to_budget: true,
        }
    }
}

impl RpcDeadlineConfig {
    /// Returns production-oriented deadline defaults.
    ///
    /// Identical to [`Self::default`] except propagation is enabled.
    pub fn go_zero_defaults() -> Self {
        Self {
            propagate: true,
            ..Self::default()
        }
    }
}
164
/// RPC load balance policy.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum LoadBalancePolicy {
    /// Use the configured static endpoint directly.
    #[default]
    Static,
    /// Select discovered instances by weight and round-robin cursor.
    WeightedRoundRobin,
}

/// RPC load balance configuration.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct RpcLoadBalanceConfig {
    /// Selection policy.
    pub policy: LoadBalancePolicy,
}

impl RpcLoadBalanceConfig {
    /// Returns production-oriented load balance defaults.
    ///
    /// Picks [`LoadBalancePolicy::WeightedRoundRobin`] instead of the
    /// static-endpoint default.
    pub fn go_zero_defaults() -> Self {
        let mut config = Self::default();
        config.policy = LoadBalancePolicy::WeightedRoundRobin;
        config
    }
}
190
/// RPC service discovery configuration.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct RpcDiscoveryConfig {
    /// Optional logical service name used by discovery clients.
    // NOTE(review): presumably `None` (the default) means no discovery
    // target is configured — confirm against the discovery client code.
    pub service: Option<String>,
}
197
/// RPC streaming instrumentation configuration.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct RpcStreamingConfig {
    /// Whether stream send/receive events should be observed.
    pub observe: bool,
    /// Whether stream completion should be mapped to resilience outcomes.
    pub resilience: bool,
    /// Optional per-stream timeout.
    pub timeout: Option<Duration>,
}

impl RpcStreamingConfig {
    /// Returns production-oriented streaming defaults.
    ///
    /// Enables observation and resilience mapping, and bounds each
    /// stream at 30 seconds.
    pub fn go_zero_defaults() -> Self {
        RpcStreamingConfig {
            timeout: Some(Duration::from_secs(30)),
            observe: true,
            resilience: true,
        }
    }
}
219
/// RPC resilience configuration shared by future tonic layers.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RpcResilienceConfig {
    // --- Circuit breaker ---
    /// Whether RPC breaker integration should be enabled by adapters.
    pub breaker_enabled: bool,
    /// Consecutive failures that open an RPC breaker.
    pub breaker_failure_threshold: u32,
    /// Time before an open RPC breaker allows a half-open trial call.
    pub breaker_reset_timeout: Duration,
    /// Whether the RPC breaker uses Google SRE style adaptive rejection.
    pub breaker_sre_enabled: bool,
    /// SRE breaker multiplier in millis. `1500` means `k = 1.5`.
    pub breaker_sre_k_millis: u32,
    /// Minimum total samples before SRE breaker rejection can start.
    pub breaker_sre_protection: u64,
    // --- Concurrency and timeouts ---
    /// Optional maximum number of in-flight RPC calls.
    pub max_concurrency: Option<usize>,
    /// Per-call timeout used by generated service adapters.
    pub request_timeout: Duration,
    // --- Adaptive load shedding ---
    /// Whether adaptive RPC shedding is enabled.
    pub shedding_enabled: bool,
    /// Maximum in-flight calls used by the adaptive shedder.
    pub shedding_max_in_flight: Option<usize>,
    /// Minimum samples before latency-based shedding can reject.
    pub shedding_min_request_count: u64,
    /// Average latency threshold used by adaptive shedding.
    pub shedding_max_latency: Duration,
    /// CPU usage threshold used by adaptive shedding, where `1000` means 100%.
    pub shedding_cpu_threshold_millis: u32,
    /// Cool-off duration after a recent adaptive shedder drop.
    pub shedding_cool_off: Duration,
    /// Number of buckets used by adaptive shedder rolling windows.
    pub shedding_window_buckets: usize,
    /// Duration represented by each adaptive shedder bucket.
    pub shedding_window_bucket_duration: Duration,
    // --- Optional Redis-backed limiting ---
    /// Optional Redis-backed RPC limiter.
    #[cfg(all(feature = "resil", feature = "cache-redis"))]
    pub rate_limiter: RpcRateLimiterConfig,
}

impl Default for RpcResilienceConfig {
    /// Conservative baseline: breaker and shedding are both off, no
    /// concurrency cap, and a 5 s per-call timeout. Threshold fields are
    /// still populated so enabling a feature later behaves sensibly.
    fn default() -> Self {
        Self {
            // Breaker: disabled, 5 consecutive failures to open, 30 s reset.
            breaker_enabled: false,
            breaker_failure_threshold: 5,
            breaker_reset_timeout: Duration::from_secs(30),
            breaker_sre_enabled: false,
            breaker_sre_k_millis: 1500,
            breaker_sre_protection: 5,
            // Concurrency / timeout.
            max_concurrency: None,
            request_timeout: Duration::from_secs(5),
            // Shedding: disabled; window is 50 buckets of 100 ms = 5 s.
            shedding_enabled: false,
            shedding_max_in_flight: None,
            shedding_min_request_count: 20,
            shedding_max_latency: Duration::from_millis(250),
            shedding_cpu_threshold_millis: 900,
            shedding_cool_off: Duration::from_secs(1),
            shedding_window_buckets: 50,
            shedding_window_bucket_duration: Duration::from_millis(100),
            #[cfg(all(feature = "resil", feature = "cache-redis"))]
            rate_limiter: RpcRateLimiterConfig::default(),
        }
    }
}

impl RpcResilienceConfig {
    /// Returns a go-zero style resilience profile for unary RPC calls.
    ///
    /// Starts from [`Self::default`] and switches on the breaker (SRE
    /// mode), adaptive shedding, and a 1024-call concurrency cap. The
    /// timeout and shedding thresholds are pinned explicitly even though
    /// they currently match the defaults.
    pub fn go_zero_defaults() -> Self {
        let mut profile = Self::default();
        profile.breaker_enabled = true;
        profile.breaker_sre_enabled = true;
        profile.max_concurrency = Some(1024);
        profile.request_timeout = Duration::from_secs(5);
        profile.shedding_enabled = true;
        profile.shedding_max_in_flight = Some(1024);
        profile.shedding_min_request_count = 20;
        profile.shedding_max_latency = Duration::from_millis(250);
        profile
    }
}
301
/// Redis-backed RPC limiter selection.
///
/// Only compiled when both the `resil` and `cache-redis` features are
/// enabled, since the variants carry limiter configs from `crate::resil`.
#[cfg(all(feature = "resil", feature = "cache-redis"))]
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum RpcRateLimiterConfig {
    /// No Redis limiter is applied.
    #[default]
    Disabled,
    /// Token-bucket limiter backed by Redis.
    RedisToken(crate::resil::RedisTokenLimiterConfig),
    /// Fixed-window period limiter backed by Redis.
    RedisPeriod(crate::resil::RedisPeriodLimiterConfig),
}