Skip to main content

d_engine_core/config/
retry.rs

1use std::fmt::Debug;
2
3use config::ConfigError;
4use serde::Deserialize;
5use serde::Serialize;
6
7use crate::Error;
8use crate::Result;
9
10/// Configuration for exponential backoff retry strategy
11#[derive(Debug, Serialize, Deserialize, Clone, Copy, Default)]
12pub struct BackoffPolicy {
13    /// Maximum number of retries (0 means unlimited retries)
14    #[serde(default = "default_max_retries")]
15    pub max_retries: usize,
16
17    /// Single operation timeout (unit: milliseconds)
18    #[serde(default = "default_op_timeout_ms")]
19    pub timeout_ms: u64,
20
21    /// Backoff base (unit: milliseconds)
22    #[serde(default = "default_base_delay_ms")]
23    pub base_delay_ms: u64,
24
25    /// Maximum backoff time (unit: milliseconds)
26    #[serde(default = "default_max_delay_ms")]
27    pub max_delay_ms: u64,
28}
29
30/// Configuration for exponential backoff retry strategy
31#[derive(Debug, Serialize, Deserialize, Clone, Copy, Default)]
32pub struct InstallSnapshotBackoffPolicy {
33    /// Maximum number of retries (0 means unlimited retries)
34    #[serde(default = "default_max_retries")]
35    pub max_retries: usize,
36
37    /// Single operation timeout (unit: milliseconds)
38    #[serde(default = "default_op_timeout_ms")]
39    pub timeout_ms: u64,
40
41    /// Backoff base (unit: milliseconds)
42    #[serde(default = "default_base_delay_ms")]
43    pub base_delay_ms: u64,
44
45    /// Maximum backoff time (unit: milliseconds)
46    #[serde(default = "default_max_delay_ms")]
47    pub max_delay_ms: u64,
48
49    // New fields for snapshot transfers
50    /// Timeout per chunk during transfer (milliseconds)
51    #[serde(default = "default_per_chunk_timeout_ms")]
52    pub per_chunk_timeout_ms: u64,
53
54    /// Minimum overall timeout for snapshot RPC (milliseconds)
55    #[serde(default = "default_min_timeout_ms")]
56    pub min_timeout_ms: u64,
57
58    /// Maximum overall timeout for snapshot RPC (milliseconds)
59    #[serde(default = "default_max_timeout_ms")]
60    pub max_timeout_ms: u64,
61
62    /// Timeout between chunks on receiver side (milliseconds)
63    #[serde(default = "default_between_chunk_timeout_ms")]
64    pub between_chunk_timeout_ms: u64,
65
66    /// Number of consecutive leader-initiated snapshot push failures before emitting an
67    /// error-level alert (tracing::error! + metrics counter).
68    ///
69    /// Protecting the leader's replication throughput is the highest priority.
70    /// A value of 5 avoids alert storms from brief network partitions while still
71    /// surfacing persistent peer failures (disk full, crashed node) within seconds.
72    #[serde(default = "default_push_failure_alert_threshold")]
73    pub push_failure_alert_threshold: u32,
74
75    /// Maximum backoff interval between consecutive leader-initiated snapshot push
76    /// attempts for the same peer (milliseconds).
77    ///
78    /// Intentionally higher than `max_delay_ms` (chunk-level RPC retry cap):
79    /// once the chunk-level retries are exhausted, the leader should wait a full
80    /// 30 s before re-attempting the entire transfer. This prevents a permanently
81    /// unreachable peer from consuming I/O bandwidth on every heartbeat cycle and
82    /// starving healthy followers' AppendEntries. Leader protection is the
83    /// highest priority.
84    #[serde(default = "default_push_backoff_max_delay_ms")]
85    pub push_backoff_max_delay_ms: u64,
86}
87
88/// Domain-specific retry strategy configurations for Raft subsystems
89/// Enables fine-grained control over different RPC types and operations
90#[derive(Serialize, Deserialize, Clone)]
91pub struct RetryPolicies {
92    /// Retry policy for AppendEntries RPC operations
93    /// Governs log replication attempts between leader and followers
94    #[serde(default)]
95    pub append_entries: BackoffPolicy,
96
97    /// Retry policy for RequestVote RPC operations
98    /// Controls election-related communication retry behavior
99    #[serde(default)]
100    pub election: BackoffPolicy,
101
102    /// Retry policy for cluster membership changes
103    /// Requires higher reliability for configuration change operations
104    #[serde(default)]
105    pub membership: BackoffPolicy,
106
107    /// Retry policy for leader auto discovery request
108    #[serde(default)]
109    pub auto_discovery: BackoffPolicy,
110
111    /// Retry policy for join cluster request
112    #[serde(default)]
113    pub join_cluster: BackoffPolicy,
114
115    /// Retry policy for install snapshot requests
116    #[serde(default)]
117    pub install_snapshot: InstallSnapshotBackoffPolicy,
118
119    /// Retry policy for node health checks
120    /// Optimized for frequent liveness detection with lower overhead
121    #[serde(default)]
122    pub healthcheck: BackoffPolicy,
123
124    /// Retry policy for internal quorum verification
125    /// Used to confirm leadership status through internal consensus checks
126    #[serde(default)]
127    pub internal_quorum: BackoffPolicy,
128}
129
130impl Debug for RetryPolicies {
131    fn fmt(
132        &self,
133        f: &mut std::fmt::Formatter<'_>,
134    ) -> std::fmt::Result {
135        f.debug_struct("RetryPolicies").finish()
136    }
137}
138// Default value implementation
139impl Default for RetryPolicies {
140    fn default() -> Self {
141        Self {
142            append_entries: BackoffPolicy {
143                max_retries: 1,
144                timeout_ms: 100,
145                base_delay_ms: 50,
146                max_delay_ms: 1000,
147            },
148            election: BackoffPolicy {
149                // Note: `retries` > 3 might prevent a successful election.
150                max_retries: 3,
151                timeout_ms: 100,
152                base_delay_ms: 50,
153                max_delay_ms: 5000,
154            },
155            membership: BackoffPolicy {
156                max_retries: 120,
157                timeout_ms: 500,
158                base_delay_ms: 3000,
159                max_delay_ms: 60000,
160            },
161            healthcheck: BackoffPolicy {
162                max_retries: 10000,
163                timeout_ms: 100,
164                base_delay_ms: 1000,
165                max_delay_ms: 10000,
166            },
167
168            auto_discovery: BackoffPolicy {
169                max_retries: 3,
170                timeout_ms: 100,
171                base_delay_ms: 50,
172                max_delay_ms: 1000,
173            },
174
175            join_cluster: BackoffPolicy {
176                max_retries: 3,
177                timeout_ms: 500,
178                base_delay_ms: 3000,
179                max_delay_ms: 60000,
180            },
181
182            // Recommended configuration examples for different network scenarios:
183            //
184            // Scenario                      Recommended Settings
185            // ------------------------------------------------------------
186            // Local Data Center (Low Latency)     -> max_retries=3, timeout_ms=500,
187            // max_delay_ms=2000
188            //
189            // Cross-Region Network (High Latency) -> max_retries=10,
190            // timeout_ms=5000, max_delay_ms=10000
191            //
192            // Edge Network (Unstable)
193            // -> max_retries=10, timeout_ms=3000, max_delay_ms=30000
194            //
195            // Current config:
196            install_snapshot: InstallSnapshotBackoffPolicy {
197                max_retries: 3,
198                timeout_ms: 500,
199                base_delay_ms: 50,
200                max_delay_ms: 2000,
201                per_chunk_timeout_ms: default_per_chunk_timeout_ms(),
202                min_timeout_ms: default_min_timeout_ms(),
203                max_timeout_ms: default_max_timeout_ms(),
204                between_chunk_timeout_ms: default_between_chunk_timeout_ms(),
205                push_failure_alert_threshold: default_push_failure_alert_threshold(),
206                push_backoff_max_delay_ms: default_push_backoff_max_delay_ms(),
207            },
208
209            internal_quorum: BackoffPolicy {
210                // Minimum must be 3: the first quorum check may fail if the leader is newly elected
211                // and followers haven't yet advanced their next_index
212                max_retries: 3,
213                timeout_ms: 100,
214                base_delay_ms: 50,
215                max_delay_ms: 1000,
216            },
217        }
218    }
219}
220impl BackoffPolicy {
221    /// Validates backoff policy parameters
222    /// # Errors
223    /// Returns `Error::InvalidConfig` when:
224    /// - Timeout exceeds maximum delay
225    /// - Base delay > max delay
226    /// - Infinite retries without proper safeguards
227    pub fn validate(
228        &self,
229        policy_name: &str,
230    ) -> Result<()> {
231        // Validate retry limits
232        if self.max_retries == 0 {
233            return Err(Error::Config(ConfigError::Message(format!(
234                "{policy_name}: max_retries=0 means infinite retries - dangerous for {policy_name} operations"
235            ))));
236        }
237
238        // Validate timeout constraints
239        if self.timeout_ms == 0 {
240            return Err(Error::Config(ConfigError::Message(format!(
241                "{policy_name}: timeout_ms cannot be 0"
242            ))));
243        }
244
245        // Validate delay progression
246        if self.base_delay_ms >= self.max_delay_ms {
247            return Err(Error::Config(ConfigError::Message(format!(
248                "{}: base_delay_ms({}) must be less than max_delay_ms({})",
249                policy_name, self.base_delay_ms, self.max_delay_ms
250            ))));
251        }
252
253        // Ensure reasonable maximums
254        if self.max_delay_ms > 120_000 {
255            // 2 minutes
256            return Err(Error::Config(ConfigError::Message(format!(
257                "{}: max_delay_ms({}) exceeds 2min limit",
258                policy_name, self.max_delay_ms
259            ))));
260        }
261
262        Ok(())
263    }
264}
265
266impl InstallSnapshotBackoffPolicy {
267    /// Validates snapshot backoff policy parameters
268    /// # Errors
269    /// Returns `Error::InvalidConfig` when:
270    /// - Inherited backoff parameters fail validation (see BackoffPolicy)
271    /// - Timeouts for snapshot chunks/transfers are invalid
272    /// - Snapshot RPC timeout constraints are violated
273    pub fn validate(
274        &self,
275        policy_name: &str,
276    ) -> Result<()> {
277        // First validate common backoff parameters
278        self.validate_base_policy(policy_name)?;
279
280        // Validate chunk-related timeouts
281        self.validate_chunk_timeouts(policy_name)?;
282
283        // Validate snapshot RPC timeout constraints
284        self.validate_rpc_timeouts(policy_name)?;
285
286        Ok(())
287    }
288
289    /// Validate parameters inherited from BackoffPolicy
290    fn validate_base_policy(
291        &self,
292        policy_name: &str,
293    ) -> Result<()> {
294        if self.max_retries == 0 {
295            return Err(Error::Config(ConfigError::Message(format!(
296                "{policy_name}: max_retries=0 means infinite retries - dangerous for {policy_name} operations"
297            ))));
298        }
299
300        if self.timeout_ms == 0 {
301            return Err(Error::Config(ConfigError::Message(format!(
302                "{policy_name}: timeout_ms cannot be 0"
303            ))));
304        }
305
306        if self.base_delay_ms >= self.max_delay_ms {
307            return Err(Error::Config(ConfigError::Message(format!(
308                "{}: base_delay_ms({}) must be less than max_delay_ms({})",
309                policy_name, self.base_delay_ms, self.max_delay_ms
310            ))));
311        }
312
313        if self.max_delay_ms > 120_000 {
314            return Err(Error::Config(ConfigError::Message(format!(
315                "{}: max_delay_ms({}) exceeds 2min limit",
316                policy_name, self.max_delay_ms
317            ))));
318        }
319
320        if self.push_failure_alert_threshold == 0 {
321            return Err(Error::Config(ConfigError::Message(format!(
322                "{policy_name}: push_failure_alert_threshold cannot be 0"
323            ))));
324        }
325
326        if self.push_backoff_max_delay_ms < self.max_delay_ms {
327            return Err(Error::Config(ConfigError::Message(format!(
328                "{}: push_backoff_max_delay_ms({}) must be >= max_delay_ms({}) \
329                 to ensure leader-side backoff is not shorter than chunk-level retry cap",
330                policy_name, self.push_backoff_max_delay_ms, self.max_delay_ms
331            ))));
332        }
333
334        Ok(())
335    }
336
337    /// Validate snapshot chunk transfer parameters
338    fn validate_chunk_timeouts(
339        &self,
340        policy_name: &str,
341    ) -> Result<()> {
342        if self.per_chunk_timeout_ms == 0 {
343            return Err(Error::Config(ConfigError::Message(format!(
344                "{policy_name}: per_chunk_timeout_ms cannot be 0"
345            ))));
346        }
347
348        if self.between_chunk_timeout_ms == 0 {
349            return Err(Error::Config(ConfigError::Message(format!(
350                "{policy_name}: between_chunk_timeout_ms cannot be 0"
351            ))));
352        }
353
354        Ok(())
355    }
356
357    /// Validate snapshot RPC timeout hierarchy
358    fn validate_rpc_timeouts(
359        &self,
360        policy_name: &str,
361    ) -> Result<()> {
362        const MAX_RPC_TIMEOUT: u64 = 86_400_000; // 24 hours
363
364        if self.min_timeout_ms == 0 {
365            return Err(Error::Config(ConfigError::Message(format!(
366                "{policy_name}: min_timeout_ms cannot be 0"
367            ))));
368        }
369
370        if self.max_timeout_ms == 0 {
371            return Err(Error::Config(ConfigError::Message(format!(
372                "{policy_name}: max_timeout_ms cannot be 0"
373            ))));
374        }
375
376        if self.min_timeout_ms > self.max_timeout_ms {
377            return Err(Error::Config(ConfigError::Message(format!(
378                "{}: min_timeout_ms({}) > max_timeout_ms({})",
379                policy_name, self.min_timeout_ms, self.max_timeout_ms
380            ))));
381        }
382
383        if self.max_timeout_ms > MAX_RPC_TIMEOUT {
384            return Err(Error::Config(ConfigError::Message(format!(
385                "{}: max_timeout_ms({}) exceeds 24-hour limit",
386                policy_name, self.max_timeout_ms
387            ))));
388        }
389
390        Ok(())
391    }
392}
393
394impl RetryPolicies {
395    /// Validates all retry policies according to Raft protocol requirements
396    pub fn validate(&self) -> Result<()> {
397        self.validate_append_entries()?;
398        self.validate_election()?;
399        self.validate_membership()?;
400        self.validate_healthcheck()?;
401        self.validate_auto_discovery()?;
402        self.validate_join_cluster()?;
403        self.validate_install_snapshot()?;
404        self.validate_internal_quorum()?;
405        Ok(())
406    }
407
408    fn validate_append_entries(&self) -> Result<()> {
409        self.append_entries.validate("append_entries")?;
410
411        Ok(())
412    }
413
414    fn validate_election(&self) -> Result<()> {
415        self.election.validate("election")?;
416
417        Ok(())
418    }
419
420    fn validate_membership(&self) -> Result<()> {
421        self.membership.validate("membership")?;
422
423        Ok(())
424    }
425
426    fn validate_healthcheck(&self) -> Result<()> {
427        self.healthcheck.validate("healthcheck")?;
428
429        Ok(())
430    }
431
432    fn validate_install_snapshot(&self) -> Result<()> {
433        self.install_snapshot.validate("install_snapshot")?;
434
435        Ok(())
436    }
437
438    fn validate_join_cluster(&self) -> Result<()> {
439        self.join_cluster.validate("join_cluster")?;
440
441        Ok(())
442    }
443
444    fn validate_auto_discovery(&self) -> Result<()> {
445        self.auto_discovery.validate("auto_discovery")?;
446
447        Ok(())
448    }
449
450    fn validate_internal_quorum(&self) -> Result<()> {
451        self.internal_quorum.validate("internal_quorum")?;
452
453        Ok(())
454    }
455}
456
/// Serde default for `max_retries`: 3 retries.
fn default_max_retries() -> usize {
    3
}

/// Serde default for `timeout_ms`: 100 ms per operation.
fn default_op_timeout_ms() -> u64 {
    100
}

/// Serde default for `base_delay_ms`: 50 ms backoff base.
fn default_base_delay_ms() -> u64 {
    50
}

/// Serde default for `max_delay_ms`: 1 s backoff cap.
fn default_max_delay_ms() -> u64 {
    1_000
}

/// Default for `per_chunk_timeout_ms`: 100 ms per snapshot chunk.
fn default_per_chunk_timeout_ms() -> u64 {
    100
}

/// Default for `min_timeout_ms`: 100 ms lower bound for the snapshot RPC timeout.
fn default_min_timeout_ms() -> u64 {
    100
}

/// Default for `max_timeout_ms`: 30 s upper bound for the snapshot RPC timeout.
fn default_max_timeout_ms() -> u64 {
    30_000
}

/// Default for `between_chunk_timeout_ms`: 30 s between chunks.
fn default_between_chunk_timeout_ms() -> u64 {
    30_000
}

/// Default for `push_failure_alert_threshold`: 5 consecutive failures.
fn default_push_failure_alert_threshold() -> u32 {
    5
}

/// Default for `push_backoff_max_delay_ms`: 30 s between push attempts.
fn default_push_backoff_max_delay_ms() -> u64 {
    30_000
}