Skip to main content

d_engine_core/config/
retry.rs

1use std::fmt::Debug;
2
3use config::ConfigError;
4use serde::Deserialize;
5use serde::Serialize;
6
7use crate::Error;
8use crate::Result;
9
10/// Configuration for exponential backoff retry strategy
11#[derive(Debug, Serialize, Deserialize, Clone, Copy, Default)]
12pub struct BackoffPolicy {
13    /// Maximum number of retries (0 means unlimited retries)
14    #[serde(default = "default_max_retries")]
15    pub max_retries: usize,
16
17    /// Single operation timeout (unit: milliseconds)
18    #[serde(default = "default_op_timeout_ms")]
19    pub timeout_ms: u64,
20
21    /// Backoff base (unit: milliseconds)
22    #[serde(default = "default_base_delay_ms")]
23    pub base_delay_ms: u64,
24
25    /// Maximum backoff time (unit: milliseconds)
26    #[serde(default = "default_max_delay_ms")]
27    pub max_delay_ms: u64,
28}
29
30/// Configuration for exponential backoff retry strategy
31#[derive(Debug, Serialize, Deserialize, Clone, Copy, Default)]
32pub struct InstallSnapshotBackoffPolicy {
33    /// Maximum number of retries (0 means unlimited retries)
34    #[serde(default = "default_max_retries")]
35    pub max_retries: usize,
36
37    /// Single operation timeout (unit: milliseconds)
38    #[serde(default = "default_op_timeout_ms")]
39    pub timeout_ms: u64,
40
41    /// Backoff base (unit: milliseconds)
42    #[serde(default = "default_base_delay_ms")]
43    pub base_delay_ms: u64,
44
45    /// Maximum backoff time (unit: milliseconds)
46    #[serde(default = "default_max_delay_ms")]
47    pub max_delay_ms: u64,
48
49    // New fields for snapshot transfers
50    /// Timeout per chunk during transfer (milliseconds)
51    #[serde(default = "default_per_chunk_timeout_ms")]
52    pub per_chunk_timeout_ms: u64,
53
54    /// Minimum overall timeout for snapshot RPC (milliseconds)
55    #[serde(default = "default_min_timeout_ms")]
56    pub min_timeout_ms: u64,
57
58    /// Maximum overall timeout for snapshot RPC (milliseconds)
59    #[serde(default = "default_max_timeout_ms")]
60    pub max_timeout_ms: u64,
61
62    /// Timeout between chunks on receiver side (milliseconds)
63    #[serde(default = "default_between_chunk_timeout_ms")]
64    pub between_chunk_timeout_ms: u64,
65}
66
67/// Domain-specific retry strategy configurations for Raft subsystems
68/// Enables fine-grained control over different RPC types and operations
69#[derive(Serialize, Deserialize, Clone)]
70pub struct RetryPolicies {
71    /// Retry policy for AppendEntries RPC operations
72    /// Governs log replication attempts between leader and followers
73    #[serde(default)]
74    pub append_entries: BackoffPolicy,
75
76    /// Retry policy for RequestVote RPC operations
77    /// Controls election-related communication retry behavior
78    #[serde(default)]
79    pub election: BackoffPolicy,
80
81    /// Retry policy for cluster membership changes
82    /// Requires higher reliability for configuration change operations
83    #[serde(default)]
84    pub membership: BackoffPolicy,
85
86    /// Retry policy for leader auto discovery request
87    #[serde(default)]
88    pub auto_discovery: BackoffPolicy,
89
90    /// Retry policy for join cluster request
91    #[serde(default)]
92    pub join_cluster: BackoffPolicy,
93
94    /// Retry policy for install snapshot requests
95    #[serde(default)]
96    pub install_snapshot: InstallSnapshotBackoffPolicy,
97
98    /// Retry policy for node health checks
99    /// Optimized for frequent liveness detection with lower overhead
100    #[serde(default)]
101    pub healthcheck: BackoffPolicy,
102
103    /// Retry policy for internal quorum verification
104    /// Used to confirm leadership status through internal consensus checks
105    #[serde(default)]
106    pub internal_quorum: BackoffPolicy,
107}
108
109impl Debug for RetryPolicies {
110    fn fmt(
111        &self,
112        f: &mut std::fmt::Formatter<'_>,
113    ) -> std::fmt::Result {
114        f.debug_struct("RetryPolicies").finish()
115    }
116}
117// Default value implementation
118impl Default for RetryPolicies {
119    fn default() -> Self {
120        Self {
121            append_entries: BackoffPolicy {
122                max_retries: 1,
123                timeout_ms: 100,
124                base_delay_ms: 50,
125                max_delay_ms: 1000,
126            },
127            election: BackoffPolicy {
128                // Note: `retries` > 3 might prevent a successful election.
129                max_retries: 3,
130                timeout_ms: 100,
131                base_delay_ms: 50,
132                max_delay_ms: 5000,
133            },
134            membership: BackoffPolicy {
135                max_retries: 120,
136                timeout_ms: 500,
137                base_delay_ms: 3000,
138                max_delay_ms: 60000,
139            },
140            healthcheck: BackoffPolicy {
141                max_retries: 10000,
142                timeout_ms: 100,
143                base_delay_ms: 1000,
144                max_delay_ms: 10000,
145            },
146
147            auto_discovery: BackoffPolicy {
148                max_retries: 3,
149                timeout_ms: 100,
150                base_delay_ms: 50,
151                max_delay_ms: 1000,
152            },
153
154            join_cluster: BackoffPolicy {
155                max_retries: 3,
156                timeout_ms: 500,
157                base_delay_ms: 3000,
158                max_delay_ms: 60000,
159            },
160
161            // Recommended configuration examples for different network scenarios:
162            //
163            // Scenario                      Recommended Settings
164            // ------------------------------------------------------------
165            // Local Data Center (Low Latency)     -> max_retries=3, timeout_ms=500,
166            // max_delay_ms=2000
167            //
168            // Cross-Region Network (High Latency) -> max_retries=10,
169            // timeout_ms=5000, max_delay_ms=10000
170            //
171            // Edge Network (Unstable)
172            // -> max_retries=10, timeout_ms=3000, max_delay_ms=30000
173            //
174            // Current config:
175            install_snapshot: InstallSnapshotBackoffPolicy {
176                max_retries: 3,
177                timeout_ms: 500,
178                base_delay_ms: 50,
179                max_delay_ms: 2000,
180                per_chunk_timeout_ms: default_per_chunk_timeout_ms(),
181                min_timeout_ms: default_min_timeout_ms(),
182                max_timeout_ms: default_max_timeout_ms(),
183                between_chunk_timeout_ms: default_between_chunk_timeout_ms(),
184            },
185
186            internal_quorum: BackoffPolicy {
187                // Minimum must be 3: the first quorum check may fail if the leader is newly elected
188                // and followers haven't yet advanced their next_index
189                max_retries: 3,
190                timeout_ms: 100,
191                base_delay_ms: 50,
192                max_delay_ms: 1000,
193            },
194        }
195    }
196}
197impl BackoffPolicy {
198    /// Validates backoff policy parameters
199    /// # Errors
200    /// Returns `Error::InvalidConfig` when:
201    /// - Timeout exceeds maximum delay
202    /// - Base delay > max delay
203    /// - Infinite retries without proper safeguards
204    pub fn validate(
205        &self,
206        policy_name: &str,
207    ) -> Result<()> {
208        // Validate retry limits
209        if self.max_retries == 0 {
210            return Err(Error::Config(ConfigError::Message(format!(
211                "{policy_name}: max_retries=0 means infinite retries - dangerous for {policy_name} operations"
212            ))));
213        }
214
215        // Validate timeout constraints
216        if self.timeout_ms == 0 {
217            return Err(Error::Config(ConfigError::Message(format!(
218                "{policy_name}: timeout_ms cannot be 0"
219            ))));
220        }
221
222        // Validate delay progression
223        if self.base_delay_ms >= self.max_delay_ms {
224            return Err(Error::Config(ConfigError::Message(format!(
225                "{}: base_delay_ms({}) must be less than max_delay_ms({})",
226                policy_name, self.base_delay_ms, self.max_delay_ms
227            ))));
228        }
229
230        // Ensure reasonable maximums
231        if self.max_delay_ms > 120_000 {
232            // 2 minutes
233            return Err(Error::Config(ConfigError::Message(format!(
234                "{}: max_delay_ms({}) exceeds 2min limit",
235                policy_name, self.max_delay_ms
236            ))));
237        }
238
239        Ok(())
240    }
241}
242
243impl InstallSnapshotBackoffPolicy {
244    /// Validates snapshot backoff policy parameters
245    /// # Errors
246    /// Returns `Error::InvalidConfig` when:
247    /// - Inherited backoff parameters fail validation (see BackoffPolicy)
248    /// - Timeouts for snapshot chunks/transfers are invalid
249    /// - Snapshot RPC timeout constraints are violated
250    pub fn validate(
251        &self,
252        policy_name: &str,
253    ) -> Result<()> {
254        // First validate common backoff parameters
255        self.validate_base_policy(policy_name)?;
256
257        // Validate chunk-related timeouts
258        self.validate_chunk_timeouts(policy_name)?;
259
260        // Validate snapshot RPC timeout constraints
261        self.validate_rpc_timeouts(policy_name)?;
262
263        Ok(())
264    }
265
266    /// Validate parameters inherited from BackoffPolicy
267    fn validate_base_policy(
268        &self,
269        policy_name: &str,
270    ) -> Result<()> {
271        if self.max_retries == 0 {
272            return Err(Error::Config(ConfigError::Message(format!(
273                "{policy_name}: max_retries=0 means infinite retries - dangerous for {policy_name} operations"
274            ))));
275        }
276
277        if self.timeout_ms == 0 {
278            return Err(Error::Config(ConfigError::Message(format!(
279                "{policy_name}: timeout_ms cannot be 0"
280            ))));
281        }
282
283        if self.base_delay_ms >= self.max_delay_ms {
284            return Err(Error::Config(ConfigError::Message(format!(
285                "{}: base_delay_ms({}) must be less than max_delay_ms({})",
286                policy_name, self.base_delay_ms, self.max_delay_ms
287            ))));
288        }
289
290        if self.max_delay_ms > 120_000 {
291            return Err(Error::Config(ConfigError::Message(format!(
292                "{}: max_delay_ms({}) exceeds 2min limit",
293                policy_name, self.max_delay_ms
294            ))));
295        }
296
297        Ok(())
298    }
299
300    /// Validate snapshot chunk transfer parameters
301    fn validate_chunk_timeouts(
302        &self,
303        policy_name: &str,
304    ) -> Result<()> {
305        if self.per_chunk_timeout_ms == 0 {
306            return Err(Error::Config(ConfigError::Message(format!(
307                "{policy_name}: per_chunk_timeout_ms cannot be 0"
308            ))));
309        }
310
311        if self.between_chunk_timeout_ms == 0 {
312            return Err(Error::Config(ConfigError::Message(format!(
313                "{policy_name}: between_chunk_timeout_ms cannot be 0"
314            ))));
315        }
316
317        Ok(())
318    }
319
320    /// Validate snapshot RPC timeout hierarchy
321    fn validate_rpc_timeouts(
322        &self,
323        policy_name: &str,
324    ) -> Result<()> {
325        const MAX_RPC_TIMEOUT: u64 = 86_400_000; // 24 hours
326
327        if self.min_timeout_ms == 0 {
328            return Err(Error::Config(ConfigError::Message(format!(
329                "{policy_name}: min_timeout_ms cannot be 0"
330            ))));
331        }
332
333        if self.max_timeout_ms == 0 {
334            return Err(Error::Config(ConfigError::Message(format!(
335                "{policy_name}: max_timeout_ms cannot be 0"
336            ))));
337        }
338
339        if self.min_timeout_ms > self.max_timeout_ms {
340            return Err(Error::Config(ConfigError::Message(format!(
341                "{}: min_timeout_ms({}) > max_timeout_ms({})",
342                policy_name, self.min_timeout_ms, self.max_timeout_ms
343            ))));
344        }
345
346        if self.max_timeout_ms > MAX_RPC_TIMEOUT {
347            return Err(Error::Config(ConfigError::Message(format!(
348                "{}: max_timeout_ms({}) exceeds 24-hour limit",
349                policy_name, self.max_timeout_ms
350            ))));
351        }
352
353        Ok(())
354    }
355}
356
357impl RetryPolicies {
358    /// Validates all retry policies according to Raft protocol requirements
359    pub fn validate(&self) -> Result<()> {
360        self.validate_append_entries()?;
361        self.validate_election()?;
362        self.validate_membership()?;
363        self.validate_healthcheck()?;
364        self.validate_auto_discovery()?;
365        self.validate_join_cluster()?;
366        self.validate_install_snapshot()?;
367        self.validate_internal_quorum()?;
368        Ok(())
369    }
370
371    fn validate_append_entries(&self) -> Result<()> {
372        self.append_entries.validate("append_entries")?;
373
374        Ok(())
375    }
376
377    fn validate_election(&self) -> Result<()> {
378        self.election.validate("election")?;
379
380        Ok(())
381    }
382
383    fn validate_membership(&self) -> Result<()> {
384        self.membership.validate("membership")?;
385
386        Ok(())
387    }
388
389    fn validate_healthcheck(&self) -> Result<()> {
390        self.healthcheck.validate("healthcheck")?;
391
392        Ok(())
393    }
394
395    fn validate_install_snapshot(&self) -> Result<()> {
396        self.install_snapshot.validate("install_snapshot")?;
397
398        Ok(())
399    }
400
401    fn validate_join_cluster(&self) -> Result<()> {
402        self.join_cluster.validate("join_cluster")?;
403
404        Ok(())
405    }
406
407    fn validate_auto_discovery(&self) -> Result<()> {
408        self.auto_discovery.validate("auto_discovery")?;
409
410        Ok(())
411    }
412
413    fn validate_internal_quorum(&self) -> Result<()> {
414        self.internal_quorum.validate("internal_quorum")?;
415
416        Ok(())
417    }
418}
419
/// Serde default for `max_retries` fields.
const fn default_max_retries() -> usize {
    3
}

/// Serde default for `timeout_ms` fields (milliseconds).
const fn default_op_timeout_ms() -> u64 {
    100
}

/// Serde default for `base_delay_ms` fields (milliseconds).
const fn default_base_delay_ms() -> u64 {
    50
}

/// Serde default for `max_delay_ms` fields (milliseconds).
const fn default_max_delay_ms() -> u64 {
    1_000
}

/// Serde default for `per_chunk_timeout_ms` (milliseconds).
const fn default_per_chunk_timeout_ms() -> u64 {
    100
}

/// Serde default for `min_timeout_ms` (milliseconds).
const fn default_min_timeout_ms() -> u64 {
    100
}

/// Serde default for `max_timeout_ms` (milliseconds).
const fn default_max_timeout_ms() -> u64 {
    30_000
}

/// Serde default for `between_chunk_timeout_ms` (milliseconds).
const fn default_between_chunk_timeout_ms() -> u64 {
    30_000
}