1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
use std::{net::SocketAddr, time::Duration};
/// RPC client configuration.
///
/// Build one with [`RpcClientConfig::new`] for practical defaults or
/// [`RpcClientConfig::go_zero_defaults`] for a go-zero style profile.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RpcClientConfig {
    /// Endpoint URI, for example `http://127.0.0.1:50051`.
    pub endpoint: String,
    /// Connection establishment timeout.
    pub connect_timeout: Duration,
    /// Per-request timeout.
    pub request_timeout: Duration,
    /// Optional resilience behavior used by generated clients or helpers.
    pub resilience: RpcResilienceConfig,
    /// Client retry behavior.
    pub retry: RpcRetryConfig,
    /// Deadline propagation behavior.
    pub deadline: RpcDeadlineConfig,
    /// Client load-balancing behavior.
    pub load_balance: RpcLoadBalanceConfig,
    /// Optional service discovery target.
    pub discovery: RpcDiscoveryConfig,
    /// Streaming RPC instrumentation behavior.
    pub streaming: RpcStreamingConfig,
}
impl RpcClientConfig {
/// Creates a client config with practical defaults.
pub fn new(endpoint: impl Into<String>) -> Self {
Self {
endpoint: endpoint.into(),
connect_timeout: Duration::from_secs(3),
request_timeout: Duration::from_secs(5),
resilience: RpcResilienceConfig::default(),
retry: RpcRetryConfig::default(),
deadline: RpcDeadlineConfig::default(),
load_balance: RpcLoadBalanceConfig::default(),
discovery: RpcDiscoveryConfig::default(),
streaming: RpcStreamingConfig::default(),
}
}
}
impl RpcClientConfig {
    /// Creates a client config with go-zero style resilience defaults.
    ///
    /// Starts from [`RpcClientConfig::new`] and swaps in the go-zero
    /// profiles for resilience, retry, deadline, load-balance, and
    /// streaming behavior; endpoint and timeouts are left untouched.
    pub fn go_zero_defaults(endpoint: impl Into<String>) -> Self {
        let mut cfg = Self::new(endpoint);
        cfg.resilience = RpcResilienceConfig::go_zero_defaults();
        cfg.retry = RpcRetryConfig::go_zero_defaults();
        cfg.deadline = RpcDeadlineConfig::go_zero_defaults();
        cfg.load_balance = RpcLoadBalanceConfig::go_zero_defaults();
        cfg.streaming = RpcStreamingConfig::go_zero_defaults();
        cfg
    }
}
/// RPC server configuration.
///
/// Build one with [`RpcServerConfig::new`] or
/// [`RpcServerConfig::go_zero_defaults`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RpcServerConfig {
    /// Logical service name.
    pub name: String,
    /// Server listen address.
    pub addr: SocketAddr,
    /// Resilience adapter boundary for tonic services.
    pub resilience: RpcResilienceConfig,
    /// Streaming RPC instrumentation behavior.
    pub streaming: RpcStreamingConfig,
}
impl RpcServerConfig {
/// Creates server config.
pub fn new(name: impl Into<String>, addr: SocketAddr) -> Self {
Self {
name: name.into(),
addr,
resilience: RpcResilienceConfig::default(),
streaming: RpcStreamingConfig::default(),
}
}
}
impl RpcServerConfig {
    /// Creates a server config with go-zero style resilience defaults.
    ///
    /// Starts from [`RpcServerConfig::new`] and swaps in the go-zero
    /// resilience and streaming profiles.
    pub fn go_zero_defaults(name: impl Into<String>, addr: SocketAddr) -> Self {
        let mut cfg = Self::new(name, addr);
        cfg.resilience = RpcResilienceConfig::go_zero_defaults();
        cfg.streaming = RpcStreamingConfig::go_zero_defaults();
        cfg
    }
}
/// RPC retry configuration.
///
/// Retries are disabled in the `Default` profile; enable them via
/// [`RpcRetryConfig::go_zero_defaults`] or by setting fields directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RpcRetryConfig {
    /// Whether client retries are enabled.
    pub enabled: bool,
    /// Maximum total attempts including the first call.
    pub max_attempts: u32,
    /// Initial retry backoff.
    pub initial_backoff: Duration,
    /// Maximum retry backoff.
    pub max_backoff: Duration,
    /// Retryable tonic codes.
    pub retryable_codes: Vec<tonic::Code>,
}
impl Default for RpcRetryConfig {
fn default() -> Self {
Self {
enabled: false,
max_attempts: 1,
initial_backoff: Duration::from_millis(50),
max_backoff: Duration::from_secs(1),
retryable_codes: vec![
tonic::Code::Unavailable,
tonic::Code::DeadlineExceeded,
tonic::Code::ResourceExhausted,
],
}
}
}
impl RpcRetryConfig {
    /// Returns a go-zero style retry profile for client calls.
    ///
    /// Enables retries with up to 3 total attempts and tightens the
    /// backoff ceiling to 500ms; the retryable code set is inherited
    /// from `Default`.
    pub fn go_zero_defaults() -> Self {
        let mut cfg = Self::default();
        cfg.enabled = true;
        cfg.max_attempts = 3;
        cfg.initial_backoff = Duration::from_millis(50);
        cfg.max_backoff = Duration::from_millis(500);
        cfg
    }
}
/// RPC deadline propagation configuration.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RpcDeadlineConfig {
    /// Whether outgoing requests should carry remaining timeout metadata.
    pub propagate: bool,
    /// Whether retries should be clipped by remaining request budget.
    pub clip_retries_to_budget: bool,
}
impl Default for RpcDeadlineConfig {
fn default() -> Self {
Self {
propagate: false,
clip_retries_to_budget: true,
}
}
}
impl RpcDeadlineConfig {
/// Returns production-oriented deadline defaults.
pub fn go_zero_defaults() -> Self {
Self {
propagate: true,
clip_retries_to_budget: true,
}
}
}
/// RPC load balance policy.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum LoadBalancePolicy {
    /// Use the configured static endpoint directly.
    #[default]
    Static,
    /// Select discovered instances by weight and round-robin cursor.
    WeightedRoundRobin,
}
/// RPC load balance configuration.
///
/// Defaults to [`LoadBalancePolicy::Static`] via the derived `Default`.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct RpcLoadBalanceConfig {
    /// Selection policy.
    pub policy: LoadBalancePolicy,
}
impl RpcLoadBalanceConfig {
    /// Returns production-oriented load balance defaults.
    ///
    /// Selects weighted round-robin across discovered instances instead
    /// of the static single-endpoint policy.
    pub fn go_zero_defaults() -> Self {
        let mut cfg = Self::default();
        cfg.policy = LoadBalancePolicy::WeightedRoundRobin;
        cfg
    }
}
/// RPC service discovery configuration.
///
/// The derived `Default` leaves discovery unset (`service: None`).
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct RpcDiscoveryConfig {
    /// Optional logical service name used by discovery clients.
    pub service: Option<String>,
}
/// RPC streaming instrumentation configuration.
///
/// The derived `Default` disables observation and resilience mapping
/// and applies no per-stream timeout.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct RpcStreamingConfig {
    /// Whether stream send/receive events should be observed.
    pub observe: bool,
    /// Whether stream completion should be mapped to resilience outcomes.
    pub resilience: bool,
    /// Optional per-stream timeout.
    pub timeout: Option<Duration>,
}
impl RpcStreamingConfig {
    /// Returns production-oriented streaming defaults.
    ///
    /// Turns on stream observation and resilience mapping, and bounds
    /// each stream with a 30-second timeout.
    pub fn go_zero_defaults() -> Self {
        let mut cfg = Self::default();
        cfg.observe = true;
        cfg.resilience = true;
        cfg.timeout = Some(Duration::from_secs(30));
        cfg
    }
}
/// RPC resilience configuration shared by future tonic layers.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RpcResilienceConfig {
    // --- Circuit breaker ---
    /// Whether RPC breaker integration should be enabled by adapters.
    pub breaker_enabled: bool,
    /// Consecutive failures that open an RPC breaker.
    pub breaker_failure_threshold: u32,
    /// Time before an open RPC breaker allows a half-open trial call.
    pub breaker_reset_timeout: Duration,
    /// Whether the RPC breaker uses Google SRE style adaptive rejection.
    pub breaker_sre_enabled: bool,
    /// SRE breaker multiplier in millis. `1500` means `k = 1.5`.
    pub breaker_sre_k_millis: u32,
    /// Minimum total samples before SRE breaker rejection can start.
    pub breaker_sre_protection: u64,
    // --- Concurrency and timeout ---
    /// Optional maximum number of in-flight RPC calls.
    pub max_concurrency: Option<usize>,
    /// Per-call timeout used by generated service adapters.
    pub request_timeout: Duration,
    // --- Adaptive load shedding ---
    /// Whether adaptive RPC shedding is enabled.
    pub shedding_enabled: bool,
    /// Maximum in-flight calls used by the adaptive shedder.
    pub shedding_max_in_flight: Option<usize>,
    /// Minimum samples before latency-based shedding can reject.
    pub shedding_min_request_count: u64,
    /// Average latency threshold used by adaptive shedding.
    pub shedding_max_latency: Duration,
    /// CPU usage threshold used by adaptive shedding, where `1000` means 100%.
    pub shedding_cpu_threshold_millis: u32,
    /// Cool-off duration after a recent adaptive shedder drop.
    pub shedding_cool_off: Duration,
    /// Number of buckets used by adaptive shedder rolling windows.
    pub shedding_window_buckets: usize,
    /// Duration represented by each adaptive shedder bucket.
    pub shedding_window_bucket_duration: Duration,
    // --- Rate limiting (feature-gated) ---
    /// Optional Redis-backed RPC limiter.
    #[cfg(all(feature = "resil", feature = "cache-redis"))]
    pub rate_limiter: RpcRateLimiterConfig,
}
impl Default for RpcResilienceConfig {
fn default() -> Self {
Self {
breaker_enabled: false,
breaker_failure_threshold: 5,
breaker_reset_timeout: Duration::from_secs(30),
breaker_sre_enabled: false,
breaker_sre_k_millis: 1500,
breaker_sre_protection: 5,
max_concurrency: None,
request_timeout: Duration::from_secs(5),
shedding_enabled: false,
shedding_max_in_flight: None,
shedding_min_request_count: 20,
shedding_max_latency: Duration::from_millis(250),
shedding_cpu_threshold_millis: 900,
shedding_cool_off: Duration::from_secs(1),
shedding_window_buckets: 50,
shedding_window_bucket_duration: Duration::from_millis(100),
#[cfg(all(feature = "resil", feature = "cache-redis"))]
rate_limiter: RpcRateLimiterConfig::default(),
}
}
}
impl RpcResilienceConfig {
    /// Returns a go-zero style resilience profile for unary RPC calls.
    ///
    /// Enables the SRE-mode breaker and adaptive shedding, and caps
    /// in-flight calls at 1024; remaining knobs keep their `Default`
    /// values.
    pub fn go_zero_defaults() -> Self {
        let mut cfg = Self::default();
        cfg.breaker_enabled = true;
        cfg.breaker_sre_enabled = true;
        cfg.max_concurrency = Some(1024);
        cfg.request_timeout = Duration::from_secs(5);
        cfg.shedding_enabled = true;
        cfg.shedding_max_in_flight = Some(1024);
        cfg.shedding_min_request_count = 20;
        cfg.shedding_max_latency = Duration::from_millis(250);
        cfg
    }
}
/// Redis-backed RPC limiter selection.
///
/// Only compiled when both the `resil` and `cache-redis` features are
/// enabled; defaults to [`RpcRateLimiterConfig::Disabled`].
#[cfg(all(feature = "resil", feature = "cache-redis"))]
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum RpcRateLimiterConfig {
    /// No Redis limiter is applied.
    #[default]
    Disabled,
    /// Token-bucket limiter backed by Redis.
    RedisToken(crate::resil::RedisTokenLimiterConfig),
    /// Fixed-window period limiter backed by Redis.
    RedisPeriod(crate::resil::RedisPeriodLimiterConfig),
}