1use std::fmt::Debug;
2
3use config::ConfigError;
4use serde::Deserialize;
5use serde::Serialize;
6
7use crate::Error;
8use crate::Result;
9
10#[derive(Debug, Serialize, Deserialize, Clone, Copy, Default)]
12pub struct BackoffPolicy {
13 #[serde(default = "default_max_retries")]
15 pub max_retries: usize,
16
17 #[serde(default = "default_op_timeout_ms")]
19 pub timeout_ms: u64,
20
21 #[serde(default = "default_base_delay_ms")]
23 pub base_delay_ms: u64,
24
25 #[serde(default = "default_max_delay_ms")]
27 pub max_delay_ms: u64,
28}
29
30#[derive(Debug, Serialize, Deserialize, Clone, Copy, Default)]
32pub struct InstallSnapshotBackoffPolicy {
33 #[serde(default = "default_max_retries")]
35 pub max_retries: usize,
36
37 #[serde(default = "default_op_timeout_ms")]
39 pub timeout_ms: u64,
40
41 #[serde(default = "default_base_delay_ms")]
43 pub base_delay_ms: u64,
44
45 #[serde(default = "default_max_delay_ms")]
47 pub max_delay_ms: u64,
48
49 #[serde(default = "default_per_chunk_timeout_ms")]
52 pub per_chunk_timeout_ms: u64,
53
54 #[serde(default = "default_min_timeout_ms")]
56 pub min_timeout_ms: u64,
57
58 #[serde(default = "default_max_timeout_ms")]
60 pub max_timeout_ms: u64,
61
62 #[serde(default = "default_between_chunk_timeout_ms")]
64 pub between_chunk_timeout_ms: u64,
65
66 #[serde(default = "default_push_failure_alert_threshold")]
73 pub push_failure_alert_threshold: u32,
74
75 #[serde(default = "default_push_backoff_max_delay_ms")]
85 pub push_backoff_max_delay_ms: u64,
86}
87
88#[derive(Serialize, Deserialize, Clone)]
91pub struct RetryPolicies {
92 #[serde(default)]
95 pub append_entries: BackoffPolicy,
96
97 #[serde(default)]
100 pub election: BackoffPolicy,
101
102 #[serde(default)]
105 pub membership: BackoffPolicy,
106
107 #[serde(default)]
109 pub auto_discovery: BackoffPolicy,
110
111 #[serde(default)]
113 pub join_cluster: BackoffPolicy,
114
115 #[serde(default)]
117 pub install_snapshot: InstallSnapshotBackoffPolicy,
118
119 #[serde(default)]
122 pub healthcheck: BackoffPolicy,
123
124 #[serde(default)]
127 pub internal_quorum: BackoffPolicy,
128}
129
130impl Debug for RetryPolicies {
131 fn fmt(
132 &self,
133 f: &mut std::fmt::Formatter<'_>,
134 ) -> std::fmt::Result {
135 f.debug_struct("RetryPolicies").finish()
136 }
137}
138impl Default for RetryPolicies {
140 fn default() -> Self {
141 Self {
142 append_entries: BackoffPolicy {
143 max_retries: 1,
144 timeout_ms: 100,
145 base_delay_ms: 50,
146 max_delay_ms: 1000,
147 },
148 election: BackoffPolicy {
149 max_retries: 3,
151 timeout_ms: 100,
152 base_delay_ms: 50,
153 max_delay_ms: 5000,
154 },
155 membership: BackoffPolicy {
156 max_retries: 120,
157 timeout_ms: 500,
158 base_delay_ms: 3000,
159 max_delay_ms: 60000,
160 },
161 healthcheck: BackoffPolicy {
162 max_retries: 10000,
163 timeout_ms: 100,
164 base_delay_ms: 1000,
165 max_delay_ms: 10000,
166 },
167
168 auto_discovery: BackoffPolicy {
169 max_retries: 3,
170 timeout_ms: 100,
171 base_delay_ms: 50,
172 max_delay_ms: 1000,
173 },
174
175 join_cluster: BackoffPolicy {
176 max_retries: 3,
177 timeout_ms: 500,
178 base_delay_ms: 3000,
179 max_delay_ms: 60000,
180 },
181
182 install_snapshot: InstallSnapshotBackoffPolicy {
197 max_retries: 3,
198 timeout_ms: 500,
199 base_delay_ms: 50,
200 max_delay_ms: 2000,
201 per_chunk_timeout_ms: default_per_chunk_timeout_ms(),
202 min_timeout_ms: default_min_timeout_ms(),
203 max_timeout_ms: default_max_timeout_ms(),
204 between_chunk_timeout_ms: default_between_chunk_timeout_ms(),
205 push_failure_alert_threshold: default_push_failure_alert_threshold(),
206 push_backoff_max_delay_ms: default_push_backoff_max_delay_ms(),
207 },
208
209 internal_quorum: BackoffPolicy {
210 max_retries: 3,
213 timeout_ms: 100,
214 base_delay_ms: 50,
215 max_delay_ms: 1000,
216 },
217 }
218 }
219}
220impl BackoffPolicy {
221 pub fn validate(
228 &self,
229 policy_name: &str,
230 ) -> Result<()> {
231 if self.max_retries == 0 {
233 return Err(Error::Config(ConfigError::Message(format!(
234 "{policy_name}: max_retries=0 means infinite retries - dangerous for {policy_name} operations"
235 ))));
236 }
237
238 if self.timeout_ms == 0 {
240 return Err(Error::Config(ConfigError::Message(format!(
241 "{policy_name}: timeout_ms cannot be 0"
242 ))));
243 }
244
245 if self.base_delay_ms >= self.max_delay_ms {
247 return Err(Error::Config(ConfigError::Message(format!(
248 "{}: base_delay_ms({}) must be less than max_delay_ms({})",
249 policy_name, self.base_delay_ms, self.max_delay_ms
250 ))));
251 }
252
253 if self.max_delay_ms > 120_000 {
255 return Err(Error::Config(ConfigError::Message(format!(
257 "{}: max_delay_ms({}) exceeds 2min limit",
258 policy_name, self.max_delay_ms
259 ))));
260 }
261
262 Ok(())
263 }
264}
265
266impl InstallSnapshotBackoffPolicy {
267 pub fn validate(
274 &self,
275 policy_name: &str,
276 ) -> Result<()> {
277 self.validate_base_policy(policy_name)?;
279
280 self.validate_chunk_timeouts(policy_name)?;
282
283 self.validate_rpc_timeouts(policy_name)?;
285
286 Ok(())
287 }
288
289 fn validate_base_policy(
291 &self,
292 policy_name: &str,
293 ) -> Result<()> {
294 if self.max_retries == 0 {
295 return Err(Error::Config(ConfigError::Message(format!(
296 "{policy_name}: max_retries=0 means infinite retries - dangerous for {policy_name} operations"
297 ))));
298 }
299
300 if self.timeout_ms == 0 {
301 return Err(Error::Config(ConfigError::Message(format!(
302 "{policy_name}: timeout_ms cannot be 0"
303 ))));
304 }
305
306 if self.base_delay_ms >= self.max_delay_ms {
307 return Err(Error::Config(ConfigError::Message(format!(
308 "{}: base_delay_ms({}) must be less than max_delay_ms({})",
309 policy_name, self.base_delay_ms, self.max_delay_ms
310 ))));
311 }
312
313 if self.max_delay_ms > 120_000 {
314 return Err(Error::Config(ConfigError::Message(format!(
315 "{}: max_delay_ms({}) exceeds 2min limit",
316 policy_name, self.max_delay_ms
317 ))));
318 }
319
320 if self.push_failure_alert_threshold == 0 {
321 return Err(Error::Config(ConfigError::Message(format!(
322 "{policy_name}: push_failure_alert_threshold cannot be 0"
323 ))));
324 }
325
326 if self.push_backoff_max_delay_ms < self.max_delay_ms {
327 return Err(Error::Config(ConfigError::Message(format!(
328 "{}: push_backoff_max_delay_ms({}) must be >= max_delay_ms({}) \
329 to ensure leader-side backoff is not shorter than chunk-level retry cap",
330 policy_name, self.push_backoff_max_delay_ms, self.max_delay_ms
331 ))));
332 }
333
334 Ok(())
335 }
336
337 fn validate_chunk_timeouts(
339 &self,
340 policy_name: &str,
341 ) -> Result<()> {
342 if self.per_chunk_timeout_ms == 0 {
343 return Err(Error::Config(ConfigError::Message(format!(
344 "{policy_name}: per_chunk_timeout_ms cannot be 0"
345 ))));
346 }
347
348 if self.between_chunk_timeout_ms == 0 {
349 return Err(Error::Config(ConfigError::Message(format!(
350 "{policy_name}: between_chunk_timeout_ms cannot be 0"
351 ))));
352 }
353
354 Ok(())
355 }
356
357 fn validate_rpc_timeouts(
359 &self,
360 policy_name: &str,
361 ) -> Result<()> {
362 const MAX_RPC_TIMEOUT: u64 = 86_400_000; if self.min_timeout_ms == 0 {
365 return Err(Error::Config(ConfigError::Message(format!(
366 "{policy_name}: min_timeout_ms cannot be 0"
367 ))));
368 }
369
370 if self.max_timeout_ms == 0 {
371 return Err(Error::Config(ConfigError::Message(format!(
372 "{policy_name}: max_timeout_ms cannot be 0"
373 ))));
374 }
375
376 if self.min_timeout_ms > self.max_timeout_ms {
377 return Err(Error::Config(ConfigError::Message(format!(
378 "{}: min_timeout_ms({}) > max_timeout_ms({})",
379 policy_name, self.min_timeout_ms, self.max_timeout_ms
380 ))));
381 }
382
383 if self.max_timeout_ms > MAX_RPC_TIMEOUT {
384 return Err(Error::Config(ConfigError::Message(format!(
385 "{}: max_timeout_ms({}) exceeds 24-hour limit",
386 policy_name, self.max_timeout_ms
387 ))));
388 }
389
390 Ok(())
391 }
392}
393
394impl RetryPolicies {
395 pub fn validate(&self) -> Result<()> {
397 self.validate_append_entries()?;
398 self.validate_election()?;
399 self.validate_membership()?;
400 self.validate_healthcheck()?;
401 self.validate_auto_discovery()?;
402 self.validate_join_cluster()?;
403 self.validate_install_snapshot()?;
404 self.validate_internal_quorum()?;
405 Ok(())
406 }
407
408 fn validate_append_entries(&self) -> Result<()> {
409 self.append_entries.validate("append_entries")?;
410
411 Ok(())
412 }
413
414 fn validate_election(&self) -> Result<()> {
415 self.election.validate("election")?;
416
417 Ok(())
418 }
419
420 fn validate_membership(&self) -> Result<()> {
421 self.membership.validate("membership")?;
422
423 Ok(())
424 }
425
426 fn validate_healthcheck(&self) -> Result<()> {
427 self.healthcheck.validate("healthcheck")?;
428
429 Ok(())
430 }
431
432 fn validate_install_snapshot(&self) -> Result<()> {
433 self.install_snapshot.validate("install_snapshot")?;
434
435 Ok(())
436 }
437
438 fn validate_join_cluster(&self) -> Result<()> {
439 self.join_cluster.validate("join_cluster")?;
440
441 Ok(())
442 }
443
444 fn validate_auto_discovery(&self) -> Result<()> {
445 self.auto_discovery.validate("auto_discovery")?;
446
447 Ok(())
448 }
449
450 fn validate_internal_quorum(&self) -> Result<()> {
451 self.internal_quorum.validate("internal_quorum")?;
452
453 Ok(())
454 }
455}
456
/// Serde fallback for `max_retries` when the key is absent: 3.
fn default_max_retries() -> usize {
    const MAX_RETRIES: usize = 3;
    MAX_RETRIES
}
/// Serde fallback for `timeout_ms` when the key is absent: 100 ms.
fn default_op_timeout_ms() -> u64 {
    const OP_TIMEOUT_MS: u64 = 100;
    OP_TIMEOUT_MS
}
/// Serde fallback for `base_delay_ms` when the key is absent: 50 ms.
fn default_base_delay_ms() -> u64 {
    const BASE_DELAY_MS: u64 = 50;
    BASE_DELAY_MS
}
/// Serde fallback for `max_delay_ms` when the key is absent: 1000 ms.
fn default_max_delay_ms() -> u64 {
    const MAX_DELAY_MS: u64 = 1000;
    MAX_DELAY_MS
}
/// Serde fallback for `per_chunk_timeout_ms` when the key is absent: 100 ms.
fn default_per_chunk_timeout_ms() -> u64 {
    const PER_CHUNK_TIMEOUT_MS: u64 = 100;
    PER_CHUNK_TIMEOUT_MS
}
/// Serde fallback for `min_timeout_ms` when the key is absent: 100 ms.
fn default_min_timeout_ms() -> u64 {
    const MIN_TIMEOUT_MS: u64 = 100;
    MIN_TIMEOUT_MS
}
/// Serde fallback for `max_timeout_ms` when the key is absent: 30 000 ms.
fn default_max_timeout_ms() -> u64 {
    const MAX_TIMEOUT_MS: u64 = 30 * 1_000;
    MAX_TIMEOUT_MS
}
/// Serde fallback for `between_chunk_timeout_ms` when the key is absent:
/// 30 000 ms.
fn default_between_chunk_timeout_ms() -> u64 {
    const BETWEEN_CHUNK_TIMEOUT_MS: u64 = 30 * 1_000;
    BETWEEN_CHUNK_TIMEOUT_MS
}
/// Serde fallback for `push_failure_alert_threshold` when the key is absent: 5.
fn default_push_failure_alert_threshold() -> u32 {
    const PUSH_FAILURE_ALERT_THRESHOLD: u32 = 5;
    PUSH_FAILURE_ALERT_THRESHOLD
}
/// Serde fallback for `push_backoff_max_delay_ms` when the key is absent:
/// 30 000 ms.
fn default_push_backoff_max_delay_ms() -> u64 {
    const PUSH_BACKOFF_MAX_DELAY_MS: u64 = 30 * 1_000;
    PUSH_BACKOFF_MAX_DELAY_MS
}