1use std::fmt::Debug;
2
3use config::ConfigError;
4use serde::Deserialize;
5use serde::Serialize;
6
7use crate::Error;
8use crate::Result;
9
10#[derive(Debug, Serialize, Deserialize, Clone, Copy, Default)]
12pub struct BackoffPolicy {
13 #[serde(default = "default_max_retries")]
15 pub max_retries: usize,
16
17 #[serde(default = "default_op_timeout_ms")]
19 pub timeout_ms: u64,
20
21 #[serde(default = "default_base_delay_ms")]
23 pub base_delay_ms: u64,
24
25 #[serde(default = "default_max_delay_ms")]
27 pub max_delay_ms: u64,
28}
29
30#[derive(Debug, Serialize, Deserialize, Clone, Copy, Default)]
32pub struct InstallSnapshotBackoffPolicy {
33 #[serde(default = "default_max_retries")]
35 pub max_retries: usize,
36
37 #[serde(default = "default_op_timeout_ms")]
39 pub timeout_ms: u64,
40
41 #[serde(default = "default_base_delay_ms")]
43 pub base_delay_ms: u64,
44
45 #[serde(default = "default_max_delay_ms")]
47 pub max_delay_ms: u64,
48
49 #[serde(default = "default_per_chunk_timeout_ms")]
52 pub per_chunk_timeout_ms: u64,
53
54 #[serde(default = "default_min_timeout_ms")]
56 pub min_timeout_ms: u64,
57
58 #[serde(default = "default_max_timeout_ms")]
60 pub max_timeout_ms: u64,
61
62 #[serde(default = "default_between_chunk_timeout_ms")]
64 pub between_chunk_timeout_ms: u64,
65}
66
67#[derive(Serialize, Deserialize, Clone)]
70pub struct RetryPolicies {
71 #[serde(default)]
74 pub append_entries: BackoffPolicy,
75
76 #[serde(default)]
79 pub election: BackoffPolicy,
80
81 #[serde(default)]
84 pub membership: BackoffPolicy,
85
86 #[serde(default)]
88 pub auto_discovery: BackoffPolicy,
89
90 #[serde(default)]
92 pub join_cluster: BackoffPolicy,
93
94 #[serde(default)]
96 pub install_snapshot: InstallSnapshotBackoffPolicy,
97
98 #[serde(default)]
101 pub healthcheck: BackoffPolicy,
102
103 #[serde(default)]
106 pub internal_quorum: BackoffPolicy,
107}
108
109impl Debug for RetryPolicies {
110 fn fmt(
111 &self,
112 f: &mut std::fmt::Formatter<'_>,
113 ) -> std::fmt::Result {
114 f.debug_struct("RetryPolicies").finish()
115 }
116}
117impl Default for RetryPolicies {
119 fn default() -> Self {
120 Self {
121 append_entries: BackoffPolicy {
122 max_retries: 1,
123 timeout_ms: 100,
124 base_delay_ms: 50,
125 max_delay_ms: 1000,
126 },
127 election: BackoffPolicy {
128 max_retries: 3,
130 timeout_ms: 100,
131 base_delay_ms: 50,
132 max_delay_ms: 5000,
133 },
134 membership: BackoffPolicy {
135 max_retries: 120,
136 timeout_ms: 500,
137 base_delay_ms: 3000,
138 max_delay_ms: 60000,
139 },
140 healthcheck: BackoffPolicy {
141 max_retries: 10000,
142 timeout_ms: 100,
143 base_delay_ms: 1000,
144 max_delay_ms: 10000,
145 },
146
147 auto_discovery: BackoffPolicy {
148 max_retries: 3,
149 timeout_ms: 100,
150 base_delay_ms: 50,
151 max_delay_ms: 1000,
152 },
153
154 join_cluster: BackoffPolicy {
155 max_retries: 3,
156 timeout_ms: 500,
157 base_delay_ms: 3000,
158 max_delay_ms: 60000,
159 },
160
161 install_snapshot: InstallSnapshotBackoffPolicy {
176 max_retries: 3,
177 timeout_ms: 500,
178 base_delay_ms: 50,
179 max_delay_ms: 2000,
180 per_chunk_timeout_ms: default_per_chunk_timeout_ms(),
181 min_timeout_ms: default_min_timeout_ms(),
182 max_timeout_ms: default_max_timeout_ms(),
183 between_chunk_timeout_ms: default_between_chunk_timeout_ms(),
184 },
185
186 internal_quorum: BackoffPolicy {
187 max_retries: 3,
190 timeout_ms: 100,
191 base_delay_ms: 50,
192 max_delay_ms: 1000,
193 },
194 }
195 }
196}
197impl BackoffPolicy {
198 pub fn validate(
205 &self,
206 policy_name: &str,
207 ) -> Result<()> {
208 if self.max_retries == 0 {
210 return Err(Error::Config(ConfigError::Message(format!(
211 "{policy_name}: max_retries=0 means infinite retries - dangerous for {policy_name} operations"
212 ))));
213 }
214
215 if self.timeout_ms == 0 {
217 return Err(Error::Config(ConfigError::Message(format!(
218 "{policy_name}: timeout_ms cannot be 0"
219 ))));
220 }
221
222 if self.base_delay_ms >= self.max_delay_ms {
224 return Err(Error::Config(ConfigError::Message(format!(
225 "{}: base_delay_ms({}) must be less than max_delay_ms({})",
226 policy_name, self.base_delay_ms, self.max_delay_ms
227 ))));
228 }
229
230 if self.max_delay_ms > 120_000 {
232 return Err(Error::Config(ConfigError::Message(format!(
234 "{}: max_delay_ms({}) exceeds 2min limit",
235 policy_name, self.max_delay_ms
236 ))));
237 }
238
239 Ok(())
240 }
241}
242
243impl InstallSnapshotBackoffPolicy {
244 pub fn validate(
251 &self,
252 policy_name: &str,
253 ) -> Result<()> {
254 self.validate_base_policy(policy_name)?;
256
257 self.validate_chunk_timeouts(policy_name)?;
259
260 self.validate_rpc_timeouts(policy_name)?;
262
263 Ok(())
264 }
265
266 fn validate_base_policy(
268 &self,
269 policy_name: &str,
270 ) -> Result<()> {
271 if self.max_retries == 0 {
272 return Err(Error::Config(ConfigError::Message(format!(
273 "{policy_name}: max_retries=0 means infinite retries - dangerous for {policy_name} operations"
274 ))));
275 }
276
277 if self.timeout_ms == 0 {
278 return Err(Error::Config(ConfigError::Message(format!(
279 "{policy_name}: timeout_ms cannot be 0"
280 ))));
281 }
282
283 if self.base_delay_ms >= self.max_delay_ms {
284 return Err(Error::Config(ConfigError::Message(format!(
285 "{}: base_delay_ms({}) must be less than max_delay_ms({})",
286 policy_name, self.base_delay_ms, self.max_delay_ms
287 ))));
288 }
289
290 if self.max_delay_ms > 120_000 {
291 return Err(Error::Config(ConfigError::Message(format!(
292 "{}: max_delay_ms({}) exceeds 2min limit",
293 policy_name, self.max_delay_ms
294 ))));
295 }
296
297 Ok(())
298 }
299
300 fn validate_chunk_timeouts(
302 &self,
303 policy_name: &str,
304 ) -> Result<()> {
305 if self.per_chunk_timeout_ms == 0 {
306 return Err(Error::Config(ConfigError::Message(format!(
307 "{policy_name}: per_chunk_timeout_ms cannot be 0"
308 ))));
309 }
310
311 if self.between_chunk_timeout_ms == 0 {
312 return Err(Error::Config(ConfigError::Message(format!(
313 "{policy_name}: between_chunk_timeout_ms cannot be 0"
314 ))));
315 }
316
317 Ok(())
318 }
319
320 fn validate_rpc_timeouts(
322 &self,
323 policy_name: &str,
324 ) -> Result<()> {
325 const MAX_RPC_TIMEOUT: u64 = 86_400_000; if self.min_timeout_ms == 0 {
328 return Err(Error::Config(ConfigError::Message(format!(
329 "{policy_name}: min_timeout_ms cannot be 0"
330 ))));
331 }
332
333 if self.max_timeout_ms == 0 {
334 return Err(Error::Config(ConfigError::Message(format!(
335 "{policy_name}: max_timeout_ms cannot be 0"
336 ))));
337 }
338
339 if self.min_timeout_ms > self.max_timeout_ms {
340 return Err(Error::Config(ConfigError::Message(format!(
341 "{}: min_timeout_ms({}) > max_timeout_ms({})",
342 policy_name, self.min_timeout_ms, self.max_timeout_ms
343 ))));
344 }
345
346 if self.max_timeout_ms > MAX_RPC_TIMEOUT {
347 return Err(Error::Config(ConfigError::Message(format!(
348 "{}: max_timeout_ms({}) exceeds 24-hour limit",
349 policy_name, self.max_timeout_ms
350 ))));
351 }
352
353 Ok(())
354 }
355}
356
357impl RetryPolicies {
358 pub fn validate(&self) -> Result<()> {
360 self.validate_append_entries()?;
361 self.validate_election()?;
362 self.validate_membership()?;
363 self.validate_healthcheck()?;
364 self.validate_auto_discovery()?;
365 self.validate_join_cluster()?;
366 self.validate_install_snapshot()?;
367 self.validate_internal_quorum()?;
368 Ok(())
369 }
370
371 fn validate_append_entries(&self) -> Result<()> {
372 self.append_entries.validate("append_entries")?;
373
374 Ok(())
375 }
376
377 fn validate_election(&self) -> Result<()> {
378 self.election.validate("election")?;
379
380 Ok(())
381 }
382
383 fn validate_membership(&self) -> Result<()> {
384 self.membership.validate("membership")?;
385
386 Ok(())
387 }
388
389 fn validate_healthcheck(&self) -> Result<()> {
390 self.healthcheck.validate("healthcheck")?;
391
392 Ok(())
393 }
394
395 fn validate_install_snapshot(&self) -> Result<()> {
396 self.install_snapshot.validate("install_snapshot")?;
397
398 Ok(())
399 }
400
401 fn validate_join_cluster(&self) -> Result<()> {
402 self.join_cluster.validate("join_cluster")?;
403
404 Ok(())
405 }
406
407 fn validate_auto_discovery(&self) -> Result<()> {
408 self.auto_discovery.validate("auto_discovery")?;
409
410 Ok(())
411 }
412
413 fn validate_internal_quorum(&self) -> Result<()> {
414 self.internal_quorum.validate("internal_quorum")?;
415
416 Ok(())
417 }
418}
419
/// Serde default for the `max_retries` field of both back-off policy structs.
const fn default_max_retries() -> usize {
    3
}

/// Serde default for the `timeout_ms` field, in milliseconds.
const fn default_op_timeout_ms() -> u64 {
    100
}

/// Serde default for the `base_delay_ms` field, in milliseconds.
const fn default_base_delay_ms() -> u64 {
    50
}

/// Serde default for the `max_delay_ms` field, in milliseconds.
const fn default_max_delay_ms() -> u64 {
    1_000
}

/// Default for `InstallSnapshotBackoffPolicy::per_chunk_timeout_ms`, in milliseconds.
const fn default_per_chunk_timeout_ms() -> u64 {
    100
}

/// Default for `InstallSnapshotBackoffPolicy::min_timeout_ms`, in milliseconds.
const fn default_min_timeout_ms() -> u64 {
    100
}

/// Default for `InstallSnapshotBackoffPolicy::max_timeout_ms`, in milliseconds (30 s).
const fn default_max_timeout_ms() -> u64 {
    30_000
}

/// Default for `InstallSnapshotBackoffPolicy::between_chunk_timeout_ms`, in milliseconds (30 s).
const fn default_between_chunk_timeout_ms() -> u64 {
    30_000
}