1use std::fmt::Debug;
2
3use config::ConfigError;
4use serde::Deserialize;
5use serde::Serialize;
6
7use crate::Error;
8use crate::Result;
9
10#[derive(Debug, Serialize, Deserialize, Clone, Copy, Default)]
12pub struct BackoffPolicy {
13 #[serde(default = "default_max_retries")]
15 pub max_retries: usize,
16
17 #[serde(default = "default_op_timeout_ms")]
19 pub timeout_ms: u64,
20
21 #[serde(default = "default_base_delay_ms")]
23 pub base_delay_ms: u64,
24
25 #[serde(default = "default_max_delay_ms")]
27 pub max_delay_ms: u64,
28}
29
30#[derive(Debug, Serialize, Deserialize, Clone, Copy, Default)]
32pub struct InstallSnapshotBackoffPolicy {
33 #[serde(default = "default_max_retries")]
35 pub max_retries: usize,
36
37 #[serde(default = "default_op_timeout_ms")]
39 pub timeout_ms: u64,
40
41 #[serde(default = "default_base_delay_ms")]
43 pub base_delay_ms: u64,
44
45 #[serde(default = "default_max_delay_ms")]
47 pub max_delay_ms: u64,
48
49 #[serde(default = "default_per_chunk_timeout_ms")]
52 pub per_chunk_timeout_ms: u64,
53
54 #[serde(default = "default_min_timeout_ms")]
56 pub min_timeout_ms: u64,
57
58 #[serde(default = "default_max_timeout_ms")]
60 pub max_timeout_ms: u64,
61
62 #[serde(default = "default_between_chunk_timeout_ms")]
64 pub between_chunk_timeout_ms: u64,
65}
66
67#[derive(Serialize, Deserialize, Clone)]
70pub struct RetryPolicies {
71 #[serde(default)]
74 pub append_entries: BackoffPolicy,
75
76 #[serde(default)]
79 pub election: BackoffPolicy,
80
81 #[serde(default)]
84 pub membership: BackoffPolicy,
85
86 #[serde(default)]
88 pub auto_discovery: BackoffPolicy,
89
90 #[serde(default)]
92 pub join_cluster: BackoffPolicy,
93
94 #[serde(default)]
96 pub install_snapshot: InstallSnapshotBackoffPolicy,
97
98 #[serde(default)]
100 pub purge_log: BackoffPolicy,
101
102 #[serde(default)]
105 pub healthcheck: BackoffPolicy,
106
107 #[serde(default)]
110 pub internal_quorum: BackoffPolicy,
111}
112
113impl Debug for RetryPolicies {
114 fn fmt(
115 &self,
116 f: &mut std::fmt::Formatter<'_>,
117 ) -> std::fmt::Result {
118 f.debug_struct("RetryPolicies").finish()
119 }
120}
121impl Default for RetryPolicies {
123 fn default() -> Self {
124 Self {
125 append_entries: BackoffPolicy {
126 max_retries: 1,
127 timeout_ms: 100,
128 base_delay_ms: 50,
129 max_delay_ms: 1000,
130 },
131 election: BackoffPolicy {
132 max_retries: 3,
134 timeout_ms: 100,
135 base_delay_ms: 50,
136 max_delay_ms: 5000,
137 },
138 membership: BackoffPolicy {
139 max_retries: 120,
140 timeout_ms: 500,
141 base_delay_ms: 3000,
142 max_delay_ms: 60000,
143 },
144 healthcheck: BackoffPolicy {
145 max_retries: 10000,
146 timeout_ms: 100,
147 base_delay_ms: 1000,
148 max_delay_ms: 10000,
149 },
150
151 auto_discovery: BackoffPolicy {
152 max_retries: 3,
153 timeout_ms: 100,
154 base_delay_ms: 50,
155 max_delay_ms: 1000,
156 },
157
158 join_cluster: BackoffPolicy {
159 max_retries: 3,
160 timeout_ms: 500,
161 base_delay_ms: 3000,
162 max_delay_ms: 60000,
163 },
164
165 install_snapshot: InstallSnapshotBackoffPolicy {
180 max_retries: 3,
181 timeout_ms: 500,
182 base_delay_ms: 50,
183 max_delay_ms: 2000,
184 per_chunk_timeout_ms: default_per_chunk_timeout_ms(),
185 min_timeout_ms: default_min_timeout_ms(),
186 max_timeout_ms: default_max_timeout_ms(),
187 between_chunk_timeout_ms: default_between_chunk_timeout_ms(),
188 },
189
190 purge_log: BackoffPolicy {
191 max_retries: 1,
192 timeout_ms: 100,
193 base_delay_ms: 50,
194 max_delay_ms: 1000,
195 },
196 internal_quorum: BackoffPolicy {
197 max_retries: 3,
200 timeout_ms: 100,
201 base_delay_ms: 50,
202 max_delay_ms: 1000,
203 },
204 }
205 }
206}
207impl BackoffPolicy {
208 pub fn validate(
215 &self,
216 policy_name: &str,
217 ) -> Result<()> {
218 if self.max_retries == 0 {
220 return Err(Error::Config(ConfigError::Message(format!(
221 "{policy_name}: max_retries=0 means infinite retries - dangerous for {policy_name} operations"
222 ))));
223 }
224
225 if self.timeout_ms == 0 {
227 return Err(Error::Config(ConfigError::Message(format!(
228 "{policy_name}: timeout_ms cannot be 0"
229 ))));
230 }
231
232 if self.base_delay_ms >= self.max_delay_ms {
234 return Err(Error::Config(ConfigError::Message(format!(
235 "{}: base_delay_ms({}) must be less than max_delay_ms({})",
236 policy_name, self.base_delay_ms, self.max_delay_ms
237 ))));
238 }
239
240 if self.max_delay_ms > 120_000 {
242 return Err(Error::Config(ConfigError::Message(format!(
244 "{}: max_delay_ms({}) exceeds 2min limit",
245 policy_name, self.max_delay_ms
246 ))));
247 }
248
249 Ok(())
250 }
251}
252
253impl InstallSnapshotBackoffPolicy {
254 pub fn validate(
261 &self,
262 policy_name: &str,
263 ) -> Result<()> {
264 self.validate_base_policy(policy_name)?;
266
267 self.validate_chunk_timeouts(policy_name)?;
269
270 self.validate_rpc_timeouts(policy_name)?;
272
273 Ok(())
274 }
275
276 fn validate_base_policy(
278 &self,
279 policy_name: &str,
280 ) -> Result<()> {
281 if self.max_retries == 0 {
282 return Err(Error::Config(ConfigError::Message(format!(
283 "{policy_name}: max_retries=0 means infinite retries - dangerous for {policy_name} operations"
284 ))));
285 }
286
287 if self.timeout_ms == 0 {
288 return Err(Error::Config(ConfigError::Message(format!(
289 "{policy_name}: timeout_ms cannot be 0"
290 ))));
291 }
292
293 if self.base_delay_ms >= self.max_delay_ms {
294 return Err(Error::Config(ConfigError::Message(format!(
295 "{}: base_delay_ms({}) must be less than max_delay_ms({})",
296 policy_name, self.base_delay_ms, self.max_delay_ms
297 ))));
298 }
299
300 if self.max_delay_ms > 120_000 {
301 return Err(Error::Config(ConfigError::Message(format!(
302 "{}: max_delay_ms({}) exceeds 2min limit",
303 policy_name, self.max_delay_ms
304 ))));
305 }
306
307 Ok(())
308 }
309
310 fn validate_chunk_timeouts(
312 &self,
313 policy_name: &str,
314 ) -> Result<()> {
315 if self.per_chunk_timeout_ms == 0 {
316 return Err(Error::Config(ConfigError::Message(format!(
317 "{policy_name}: per_chunk_timeout_ms cannot be 0"
318 ))));
319 }
320
321 if self.between_chunk_timeout_ms == 0 {
322 return Err(Error::Config(ConfigError::Message(format!(
323 "{policy_name}: between_chunk_timeout_ms cannot be 0"
324 ))));
325 }
326
327 Ok(())
328 }
329
330 fn validate_rpc_timeouts(
332 &self,
333 policy_name: &str,
334 ) -> Result<()> {
335 const MAX_RPC_TIMEOUT: u64 = 86_400_000; if self.min_timeout_ms == 0 {
338 return Err(Error::Config(ConfigError::Message(format!(
339 "{policy_name}: min_timeout_ms cannot be 0"
340 ))));
341 }
342
343 if self.max_timeout_ms == 0 {
344 return Err(Error::Config(ConfigError::Message(format!(
345 "{policy_name}: max_timeout_ms cannot be 0"
346 ))));
347 }
348
349 if self.min_timeout_ms > self.max_timeout_ms {
350 return Err(Error::Config(ConfigError::Message(format!(
351 "{}: min_timeout_ms({}) > max_timeout_ms({})",
352 policy_name, self.min_timeout_ms, self.max_timeout_ms
353 ))));
354 }
355
356 if self.max_timeout_ms > MAX_RPC_TIMEOUT {
357 return Err(Error::Config(ConfigError::Message(format!(
358 "{}: max_timeout_ms({}) exceeds 24-hour limit",
359 policy_name, self.max_timeout_ms
360 ))));
361 }
362
363 Ok(())
364 }
365}
366
367impl RetryPolicies {
368 pub fn validate(&self) -> Result<()> {
370 self.validate_append_entries()?;
371 self.validate_election()?;
372 self.validate_membership()?;
373 self.validate_healthcheck()?;
374 self.validate_auto_discovery()?;
375 self.validate_join_cluster()?;
376 self.validate_install_snapshot()?;
377 self.validate_purge_log()?;
378 self.validate_internal_quorum()?;
379 Ok(())
380 }
381
382 fn validate_append_entries(&self) -> Result<()> {
383 self.append_entries.validate("append_entries")?;
384
385 Ok(())
386 }
387
388 fn validate_election(&self) -> Result<()> {
389 self.election.validate("election")?;
390
391 Ok(())
392 }
393
394 fn validate_membership(&self) -> Result<()> {
395 self.membership.validate("membership")?;
396
397 Ok(())
398 }
399
400 fn validate_healthcheck(&self) -> Result<()> {
401 self.healthcheck.validate("healthcheck")?;
402
403 Ok(())
404 }
405
406 fn validate_purge_log(&self) -> Result<()> {
407 self.purge_log.validate("purge_log")?;
408
409 Ok(())
410 }
411
412 fn validate_install_snapshot(&self) -> Result<()> {
413 self.install_snapshot.validate("install_snapshot")?;
414
415 Ok(())
416 }
417
418 fn validate_join_cluster(&self) -> Result<()> {
419 self.join_cluster.validate("join_cluster")?;
420
421 Ok(())
422 }
423
424 fn validate_auto_discovery(&self) -> Result<()> {
425 self.auto_discovery.validate("auto_discovery")?;
426
427 Ok(())
428 }
429
430 fn validate_internal_quorum(&self) -> Result<()> {
431 self.internal_quorum.validate("internal_quorum")?;
432
433 Ok(())
434 }
435}
436
/// Serde fallback for a policy's `max_retries` field.
fn default_max_retries() -> usize {
    const FALLBACK: usize = 3;
    FALLBACK
}
/// Serde fallback for a policy's `timeout_ms` field, in milliseconds.
fn default_op_timeout_ms() -> u64 {
    const FALLBACK_MS: u64 = 100;
    FALLBACK_MS
}
/// Serde fallback for a policy's `base_delay_ms` field, in milliseconds.
fn default_base_delay_ms() -> u64 {
    const FALLBACK_MS: u64 = 50;
    FALLBACK_MS
}
/// Serde fallback for a policy's `max_delay_ms` field, in milliseconds.
fn default_max_delay_ms() -> u64 {
    const FALLBACK_MS: u64 = 1000;
    FALLBACK_MS
}
/// Fallback for the snapshot policy's `per_chunk_timeout_ms`, in milliseconds.
fn default_per_chunk_timeout_ms() -> u64 {
    const FALLBACK_MS: u64 = 100;
    FALLBACK_MS
}
/// Fallback for the snapshot policy's `min_timeout_ms`, in milliseconds.
fn default_min_timeout_ms() -> u64 {
    const FALLBACK_MS: u64 = 100;
    FALLBACK_MS
}
/// Fallback for the snapshot policy's `max_timeout_ms`: 30 seconds, in milliseconds.
fn default_max_timeout_ms() -> u64 {
    const FALLBACK_MS: u64 = 30 * 1_000;
    FALLBACK_MS
}
/// Fallback for the snapshot policy's `between_chunk_timeout_ms`: 30 seconds,
/// in milliseconds.
fn default_between_chunk_timeout_ms() -> u64 {
    const FALLBACK_MS: u64 = 30 * 1_000;
    FALLBACK_MS
}