// d_engine_core/config/retry.rs

1use std::fmt::Debug;
2
3use config::ConfigError;
4use serde::Deserialize;
5use serde::Serialize;
6
7use crate::Error;
8use crate::Result;
9
10/// Configuration for exponential backoff retry strategy
11#[derive(Debug, Serialize, Deserialize, Clone, Copy, Default)]
12pub struct BackoffPolicy {
13    /// Maximum number of retries (0 means unlimited retries)
14    #[serde(default = "default_max_retries")]
15    pub max_retries: usize,
16
17    /// Single operation timeout (unit: milliseconds)
18    #[serde(default = "default_op_timeout_ms")]
19    pub timeout_ms: u64,
20
21    /// Backoff base (unit: milliseconds)
22    #[serde(default = "default_base_delay_ms")]
23    pub base_delay_ms: u64,
24
25    /// Maximum backoff time (unit: milliseconds)
26    #[serde(default = "default_max_delay_ms")]
27    pub max_delay_ms: u64,
28}
29
30/// Configuration for exponential backoff retry strategy
31#[derive(Debug, Serialize, Deserialize, Clone, Copy, Default)]
32pub struct InstallSnapshotBackoffPolicy {
33    /// Maximum number of retries (0 means unlimited retries)
34    #[serde(default = "default_max_retries")]
35    pub max_retries: usize,
36
37    /// Single operation timeout (unit: milliseconds)
38    #[serde(default = "default_op_timeout_ms")]
39    pub timeout_ms: u64,
40
41    /// Backoff base (unit: milliseconds)
42    #[serde(default = "default_base_delay_ms")]
43    pub base_delay_ms: u64,
44
45    /// Maximum backoff time (unit: milliseconds)
46    #[serde(default = "default_max_delay_ms")]
47    pub max_delay_ms: u64,
48
49    // New fields for snapshot transfers
50    /// Timeout per chunk during transfer (milliseconds)
51    #[serde(default = "default_per_chunk_timeout_ms")]
52    pub per_chunk_timeout_ms: u64,
53
54    /// Minimum overall timeout for snapshot RPC (milliseconds)
55    #[serde(default = "default_min_timeout_ms")]
56    pub min_timeout_ms: u64,
57
58    /// Maximum overall timeout for snapshot RPC (milliseconds)
59    #[serde(default = "default_max_timeout_ms")]
60    pub max_timeout_ms: u64,
61
62    /// Timeout between chunks on receiver side (milliseconds)
63    #[serde(default = "default_between_chunk_timeout_ms")]
64    pub between_chunk_timeout_ms: u64,
65}
66
67/// Domain-specific retry strategy configurations for Raft subsystems
68/// Enables fine-grained control over different RPC types and operations
69#[derive(Serialize, Deserialize, Clone)]
70pub struct RetryPolicies {
71    /// Retry policy for AppendEntries RPC operations
72    /// Governs log replication attempts between leader and followers
73    #[serde(default)]
74    pub append_entries: BackoffPolicy,
75
76    /// Retry policy for RequestVote RPC operations
77    /// Controls election-related communication retry behavior
78    #[serde(default)]
79    pub election: BackoffPolicy,
80
81    /// Retry policy for cluster membership changes
82    /// Requires higher reliability for configuration change operations
83    #[serde(default)]
84    pub membership: BackoffPolicy,
85
86    /// Retry policy for leader auto discovery request
87    #[serde(default)]
88    pub auto_discovery: BackoffPolicy,
89
90    /// Retry policy for join cluster request
91    #[serde(default)]
92    pub join_cluster: BackoffPolicy,
93
94    /// Retry policy for install snapshot requests
95    #[serde(default)]
96    pub install_snapshot: InstallSnapshotBackoffPolicy,
97
98    /// Retry policy for purge log requests
99    #[serde(default)]
100    pub purge_log: BackoffPolicy,
101
102    /// Retry policy for node health checks
103    /// Optimized for frequent liveness detection with lower overhead
104    #[serde(default)]
105    pub healthcheck: BackoffPolicy,
106
107    /// Retry policy for internal quorum verification
108    /// Used to confirm leadership status through internal consensus checks
109    #[serde(default)]
110    pub internal_quorum: BackoffPolicy,
111}
112
113impl Debug for RetryPolicies {
114    fn fmt(
115        &self,
116        f: &mut std::fmt::Formatter<'_>,
117    ) -> std::fmt::Result {
118        f.debug_struct("RetryPolicies").finish()
119    }
120}
121// Default value implementation
122impl Default for RetryPolicies {
123    fn default() -> Self {
124        Self {
125            append_entries: BackoffPolicy {
126                max_retries: 1,
127                timeout_ms: 100,
128                base_delay_ms: 50,
129                max_delay_ms: 1000,
130            },
131            election: BackoffPolicy {
132                // Note: `retries` > 3 might prevent a successful election.
133                max_retries: 3,
134                timeout_ms: 100,
135                base_delay_ms: 50,
136                max_delay_ms: 5000,
137            },
138            membership: BackoffPolicy {
139                max_retries: 120,
140                timeout_ms: 500,
141                base_delay_ms: 3000,
142                max_delay_ms: 60000,
143            },
144            healthcheck: BackoffPolicy {
145                max_retries: 10000,
146                timeout_ms: 100,
147                base_delay_ms: 1000,
148                max_delay_ms: 10000,
149            },
150
151            auto_discovery: BackoffPolicy {
152                max_retries: 3,
153                timeout_ms: 100,
154                base_delay_ms: 50,
155                max_delay_ms: 1000,
156            },
157
158            join_cluster: BackoffPolicy {
159                max_retries: 3,
160                timeout_ms: 500,
161                base_delay_ms: 3000,
162                max_delay_ms: 60000,
163            },
164
165            // Recommended configuration examples for different network scenarios:
166            //
167            // Scenario                      Recommended Settings
168            // ------------------------------------------------------------
169            // Local Data Center (Low Latency)     -> max_retries=3, timeout_ms=500,
170            // max_delay_ms=2000
171            //
172            // Cross-Region Network (High Latency) -> max_retries=10,
173            // timeout_ms=5000, max_delay_ms=10000
174            //
175            // Edge Network (Unstable)
176            // -> max_retries=10, timeout_ms=3000, max_delay_ms=30000
177            //
178            // Current config:
179            install_snapshot: InstallSnapshotBackoffPolicy {
180                max_retries: 3,
181                timeout_ms: 500,
182                base_delay_ms: 50,
183                max_delay_ms: 2000,
184                per_chunk_timeout_ms: default_per_chunk_timeout_ms(),
185                min_timeout_ms: default_min_timeout_ms(),
186                max_timeout_ms: default_max_timeout_ms(),
187                between_chunk_timeout_ms: default_between_chunk_timeout_ms(),
188            },
189
190            purge_log: BackoffPolicy {
191                max_retries: 1,
192                timeout_ms: 100,
193                base_delay_ms: 50,
194                max_delay_ms: 1000,
195            },
196            internal_quorum: BackoffPolicy {
197                // Minimum must be 3: the first quorum check may fail if the leader is newly elected
198                // and followers haven't yet advanced their next_index
199                max_retries: 3,
200                timeout_ms: 100,
201                base_delay_ms: 50,
202                max_delay_ms: 1000,
203            },
204        }
205    }
206}
207impl BackoffPolicy {
208    /// Validates backoff policy parameters
209    /// # Errors
210    /// Returns `Error::InvalidConfig` when:
211    /// - Timeout exceeds maximum delay
212    /// - Base delay > max delay
213    /// - Infinite retries without proper safeguards
214    pub fn validate(
215        &self,
216        policy_name: &str,
217    ) -> Result<()> {
218        // Validate retry limits
219        if self.max_retries == 0 {
220            return Err(Error::Config(ConfigError::Message(format!(
221                "{policy_name}: max_retries=0 means infinite retries - dangerous for {policy_name} operations"
222            ))));
223        }
224
225        // Validate timeout constraints
226        if self.timeout_ms == 0 {
227            return Err(Error::Config(ConfigError::Message(format!(
228                "{policy_name}: timeout_ms cannot be 0"
229            ))));
230        }
231
232        // Validate delay progression
233        if self.base_delay_ms >= self.max_delay_ms {
234            return Err(Error::Config(ConfigError::Message(format!(
235                "{}: base_delay_ms({}) must be less than max_delay_ms({})",
236                policy_name, self.base_delay_ms, self.max_delay_ms
237            ))));
238        }
239
240        // Ensure reasonable maximums
241        if self.max_delay_ms > 120_000 {
242            // 2 minutes
243            return Err(Error::Config(ConfigError::Message(format!(
244                "{}: max_delay_ms({}) exceeds 2min limit",
245                policy_name, self.max_delay_ms
246            ))));
247        }
248
249        Ok(())
250    }
251}
252
253impl InstallSnapshotBackoffPolicy {
254    /// Validates snapshot backoff policy parameters
255    /// # Errors
256    /// Returns `Error::InvalidConfig` when:
257    /// - Inherited backoff parameters fail validation (see BackoffPolicy)
258    /// - Timeouts for snapshot chunks/transfers are invalid
259    /// - Snapshot RPC timeout constraints are violated
260    pub fn validate(
261        &self,
262        policy_name: &str,
263    ) -> Result<()> {
264        // First validate common backoff parameters
265        self.validate_base_policy(policy_name)?;
266
267        // Validate chunk-related timeouts
268        self.validate_chunk_timeouts(policy_name)?;
269
270        // Validate snapshot RPC timeout constraints
271        self.validate_rpc_timeouts(policy_name)?;
272
273        Ok(())
274    }
275
276    /// Validate parameters inherited from BackoffPolicy
277    fn validate_base_policy(
278        &self,
279        policy_name: &str,
280    ) -> Result<()> {
281        if self.max_retries == 0 {
282            return Err(Error::Config(ConfigError::Message(format!(
283                "{policy_name}: max_retries=0 means infinite retries - dangerous for {policy_name} operations"
284            ))));
285        }
286
287        if self.timeout_ms == 0 {
288            return Err(Error::Config(ConfigError::Message(format!(
289                "{policy_name}: timeout_ms cannot be 0"
290            ))));
291        }
292
293        if self.base_delay_ms >= self.max_delay_ms {
294            return Err(Error::Config(ConfigError::Message(format!(
295                "{}: base_delay_ms({}) must be less than max_delay_ms({})",
296                policy_name, self.base_delay_ms, self.max_delay_ms
297            ))));
298        }
299
300        if self.max_delay_ms > 120_000 {
301            return Err(Error::Config(ConfigError::Message(format!(
302                "{}: max_delay_ms({}) exceeds 2min limit",
303                policy_name, self.max_delay_ms
304            ))));
305        }
306
307        Ok(())
308    }
309
310    /// Validate snapshot chunk transfer parameters
311    fn validate_chunk_timeouts(
312        &self,
313        policy_name: &str,
314    ) -> Result<()> {
315        if self.per_chunk_timeout_ms == 0 {
316            return Err(Error::Config(ConfigError::Message(format!(
317                "{policy_name}: per_chunk_timeout_ms cannot be 0"
318            ))));
319        }
320
321        if self.between_chunk_timeout_ms == 0 {
322            return Err(Error::Config(ConfigError::Message(format!(
323                "{policy_name}: between_chunk_timeout_ms cannot be 0"
324            ))));
325        }
326
327        Ok(())
328    }
329
330    /// Validate snapshot RPC timeout hierarchy
331    fn validate_rpc_timeouts(
332        &self,
333        policy_name: &str,
334    ) -> Result<()> {
335        const MAX_RPC_TIMEOUT: u64 = 86_400_000; // 24 hours
336
337        if self.min_timeout_ms == 0 {
338            return Err(Error::Config(ConfigError::Message(format!(
339                "{policy_name}: min_timeout_ms cannot be 0"
340            ))));
341        }
342
343        if self.max_timeout_ms == 0 {
344            return Err(Error::Config(ConfigError::Message(format!(
345                "{policy_name}: max_timeout_ms cannot be 0"
346            ))));
347        }
348
349        if self.min_timeout_ms > self.max_timeout_ms {
350            return Err(Error::Config(ConfigError::Message(format!(
351                "{}: min_timeout_ms({}) > max_timeout_ms({})",
352                policy_name, self.min_timeout_ms, self.max_timeout_ms
353            ))));
354        }
355
356        if self.max_timeout_ms > MAX_RPC_TIMEOUT {
357            return Err(Error::Config(ConfigError::Message(format!(
358                "{}: max_timeout_ms({}) exceeds 24-hour limit",
359                policy_name, self.max_timeout_ms
360            ))));
361        }
362
363        Ok(())
364    }
365}
366
367impl RetryPolicies {
368    /// Validates all retry policies according to Raft protocol requirements
369    pub fn validate(&self) -> Result<()> {
370        self.validate_append_entries()?;
371        self.validate_election()?;
372        self.validate_membership()?;
373        self.validate_healthcheck()?;
374        self.validate_auto_discovery()?;
375        self.validate_join_cluster()?;
376        self.validate_install_snapshot()?;
377        self.validate_purge_log()?;
378        self.validate_internal_quorum()?;
379        Ok(())
380    }
381
382    fn validate_append_entries(&self) -> Result<()> {
383        self.append_entries.validate("append_entries")?;
384
385        Ok(())
386    }
387
388    fn validate_election(&self) -> Result<()> {
389        self.election.validate("election")?;
390
391        Ok(())
392    }
393
394    fn validate_membership(&self) -> Result<()> {
395        self.membership.validate("membership")?;
396
397        Ok(())
398    }
399
400    fn validate_healthcheck(&self) -> Result<()> {
401        self.healthcheck.validate("healthcheck")?;
402
403        Ok(())
404    }
405
406    fn validate_purge_log(&self) -> Result<()> {
407        self.purge_log.validate("purge_log")?;
408
409        Ok(())
410    }
411
412    fn validate_install_snapshot(&self) -> Result<()> {
413        self.install_snapshot.validate("install_snapshot")?;
414
415        Ok(())
416    }
417
418    fn validate_join_cluster(&self) -> Result<()> {
419        self.join_cluster.validate("join_cluster")?;
420
421        Ok(())
422    }
423
424    fn validate_auto_discovery(&self) -> Result<()> {
425        self.auto_discovery.validate("auto_discovery")?;
426
427        Ok(())
428    }
429
430    fn validate_internal_quorum(&self) -> Result<()> {
431        self.internal_quorum.validate("internal_quorum")?;
432
433        Ok(())
434    }
435}
436
// Per-field fallback values referenced by the `#[serde(default = "...")]`
// attributes on the policy structs.

/// Default retry count for a backoff policy.
const fn default_max_retries() -> usize {
    3
}

/// Default single-operation timeout, in milliseconds.
const fn default_op_timeout_ms() -> u64 {
    100
}

/// Default backoff base delay, in milliseconds.
const fn default_base_delay_ms() -> u64 {
    50
}

/// Default backoff ceiling, in milliseconds.
const fn default_max_delay_ms() -> u64 {
    1_000
}

/// Default per-chunk snapshot transfer timeout, in milliseconds.
const fn default_per_chunk_timeout_ms() -> u64 {
    100
}

/// Default lower bound for the overall snapshot RPC timeout, in milliseconds.
const fn default_min_timeout_ms() -> u64 {
    100
}

/// Default upper bound for the overall snapshot RPC timeout, in milliseconds.
const fn default_max_timeout_ms() -> u64 {
    30_000
}

/// Default receiver-side wait between snapshot chunks, in milliseconds.
const fn default_between_chunk_timeout_ms() -> u64 {
    30_000
}