d_engine/config/
raft.rs

1use std::fmt::Debug;
2
3use serde::Deserialize;
4use serde::Serialize;
5
6use crate::Error;
7use crate::Result;
8
9/// Configuration parameters for the Raft consensus algorithm implementation
10#[derive(Serialize, Deserialize, Clone)]
11pub struct RaftConfig {
12    /// Configuration settings related to log replication
13    /// Includes parameters like replication batch size and network retry behavior
14    #[serde(default)]
15    pub replication: ReplicationConfig,
16
17    /// Configuration settings for leader election mechanism
18    /// Controls timeouts and randomization factors for election timing
19    #[serde(default)]
20    pub election: ElectionConfig,
21
22    /// Configuration settings for cluster membership changes
23    /// Handles joint consensus transitions and cluster reconfiguration rules
24    #[serde(default)]
25    pub membership: MembershipConfig,
26
27    /// Configuration settings for commit application handling
28    /// Controls how committed log entries are applied to the state machine
29    #[serde(default)]
30    pub commit_handler: CommitHandlerConfig,
31
32    /// Maximum allowed log entry gap between leader and learner nodes
33    /// Learners with larger gaps than this value will trigger catch-up replication
34    /// Default value is set via default_learner_gap() function
35    #[serde(default = "default_learner_gap")]
36    pub learner_raft_log_gap: u64,
37
38    /// Base timeout duration (in milliseconds) for general Raft operations
39    /// Used as fallback timeout when operation-specific timeouts are not set
40    /// Default value is set via default_general_timeout() function
41    #[serde(default = "default_general_timeout")]
42    pub general_raft_timeout_duration_in_ms: u64,
43}
44
45impl Debug for RaftConfig {
46    fn fmt(
47        &self,
48        f: &mut std::fmt::Formatter<'_>,
49    ) -> std::fmt::Result {
50        f.debug_struct("RaftConfig").finish()
51    }
52}
53impl Default for RaftConfig {
54    fn default() -> Self {
55        Self {
56            replication: ReplicationConfig::default(),
57            election: ElectionConfig::default(),
58            membership: MembershipConfig::default(),
59            commit_handler: CommitHandlerConfig::default(),
60            learner_raft_log_gap: default_learner_gap(),
61            general_raft_timeout_duration_in_ms: default_general_timeout(),
62        }
63    }
64}
65impl RaftConfig {
66    /// Validates all Raft subsystem configurations
67    pub fn validate(&self) -> Result<()> {
68        if self.learner_raft_log_gap == 0 {
69            return Err(Error::InvalidConfig(
70                "learner_raft_log_gap must be greater than 0".into(),
71            ));
72        }
73
74        if self.general_raft_timeout_duration_in_ms < 1 {
75            return Err(Error::InvalidConfig(
76                "general_raft_timeout_duration_in_ms must be at least 1ms".into(),
77            ));
78        }
79
80        self.replication.validate()?;
81        self.election.validate()?;
82        self.membership.validate()?;
83        self.commit_handler.validate()?;
84
85        Ok(())
86    }
87}
88
89fn default_learner_gap() -> u64 {
90    10
91}
92// in ms
93fn default_general_timeout() -> u64 {
94    50
95}
96
97#[derive(Debug, Serialize, Deserialize, Clone)]
98pub struct ReplicationConfig {
99    #[serde(default = "default_append_interval")]
100    pub rpc_append_entries_clock_in_ms: u64,
101
102    #[serde(default = "default_batch_threshold")]
103    pub rpc_append_entries_in_batch_threshold: usize,
104
105    #[serde(default = "default_batch_delay")]
106    pub rpc_append_entries_batch_process_delay_in_ms: u64,
107
108    #[serde(default = "default_entries_per_replication")]
109    pub append_entries_max_entries_per_replication: u64,
110}
111
112impl Default for ReplicationConfig {
113    fn default() -> Self {
114        Self {
115            rpc_append_entries_clock_in_ms: default_append_interval(),
116            rpc_append_entries_in_batch_threshold: default_batch_threshold(),
117            rpc_append_entries_batch_process_delay_in_ms: default_batch_delay(),
118            append_entries_max_entries_per_replication: default_entries_per_replication(),
119        }
120    }
121}
122impl ReplicationConfig {
123    fn validate(&self) -> Result<()> {
124        if self.rpc_append_entries_clock_in_ms == 0 {
125            return Err(Error::InvalidConfig(
126                "rpc_append_entries_clock_in_ms cannot be 0".into(),
127            ));
128        }
129
130        if self.rpc_append_entries_in_batch_threshold == 0 {
131            return Err(Error::InvalidConfig(
132                "rpc_append_entries_in_batch_threshold must be > 0".into(),
133            ));
134        }
135
136        if self.append_entries_max_entries_per_replication == 0 {
137            return Err(Error::InvalidConfig(
138                "append_entries_max_entries_per_replication must be > 0".into(),
139            ));
140        }
141
142        if self.rpc_append_entries_batch_process_delay_in_ms >= self.rpc_append_entries_clock_in_ms {
143            return Err(Error::InvalidConfig(format!(
144                "batch_delay {}ms should be less than append_interval {}ms",
145                self.rpc_append_entries_batch_process_delay_in_ms, self.rpc_append_entries_clock_in_ms
146            )));
147        }
148
149        Ok(())
150    }
151}
152fn default_append_interval() -> u64 {
153    100
154}
155fn default_batch_threshold() -> usize {
156    100
157}
158fn default_batch_delay() -> u64 {
159    1
160}
161fn default_entries_per_replication() -> u64 {
162    100
163}
164#[derive(Debug, Serialize, Deserialize, Clone)]
165pub struct ElectionConfig {
166    #[serde(default = "default_election_timeout_min")]
167    pub election_timeout_min: u64,
168
169    #[serde(default = "default_election_timeout_max")]
170    pub election_timeout_max: u64,
171
172    #[serde(default = "default_peer_monitor_interval")]
173    pub rpc_peer_connectinon_monitor_interval_in_sec: u64,
174
175    #[serde(default = "default_client_request_id")]
176    pub internal_rpc_client_request_id: u32,
177}
178
179impl Default for ElectionConfig {
180    fn default() -> Self {
181        Self {
182            election_timeout_min: default_election_timeout_min(),
183            election_timeout_max: default_election_timeout_max(),
184            rpc_peer_connectinon_monitor_interval_in_sec: default_peer_monitor_interval(),
185            internal_rpc_client_request_id: default_client_request_id(),
186        }
187    }
188}
189impl ElectionConfig {
190    fn validate(&self) -> Result<()> {
191        if self.election_timeout_min >= self.election_timeout_max {
192            return Err(Error::InvalidConfig(format!(
193                "election_timeout_min {}ms must be less than election_timeout_max {}ms",
194                self.election_timeout_min, self.election_timeout_max
195            )));
196        }
197
198        if self.rpc_peer_connectinon_monitor_interval_in_sec == 0 {
199            return Err(Error::InvalidConfig(
200                "rpc_peer_connectinon_monitor_interval_in_sec cannot be 0".into(),
201            ));
202        }
203
204        Ok(())
205    }
206}
207fn default_election_timeout_min() -> u64 {
208    500
209}
210fn default_election_timeout_max() -> u64 {
211    1000
212}
213fn default_peer_monitor_interval() -> u64 {
214    30
215}
216fn default_client_request_id() -> u32 {
217    0
218}
219
220#[derive(Debug, Serialize, Deserialize, Clone)]
221pub struct MembershipConfig {
222    #[serde(default = "default_probe_service")]
223    pub cluster_healthcheck_probe_service_name: String,
224}
225impl Default for MembershipConfig {
226    fn default() -> Self {
227        Self {
228            cluster_healthcheck_probe_service_name: default_probe_service(),
229        }
230    }
231}
232fn default_probe_service() -> String {
233    "rpc_service.RpcService".to_string()
234}
235
236impl MembershipConfig {
237    fn validate(&self) -> Result<()> {
238        if self.cluster_healthcheck_probe_service_name.is_empty() {
239            return Err(Error::InvalidConfig(
240                "cluster_healthcheck_probe_service_name cannot be empty".into(),
241            ));
242        }
243        Ok(())
244    }
245}
246
247/// Submit processor-specific configuration
248#[derive(Debug, Serialize, Deserialize, Clone)]
249pub struct CommitHandlerConfig {
250    #[serde(default = "default_batch_size")]
251    pub batch_size: u64,
252
253    #[serde(default = "default_process_interval_ms")]
254    pub process_interval_ms: u64,
255
256    #[serde(default = "default_max_entries_per_chunk")]
257    pub max_entries_per_chunk: usize,
258}
259impl Default for CommitHandlerConfig {
260    fn default() -> Self {
261        Self {
262            batch_size: default_batch_size(),
263            process_interval_ms: default_process_interval_ms(),
264            max_entries_per_chunk: default_max_entries_per_chunk(),
265        }
266    }
267}
268impl CommitHandlerConfig {
269    fn validate(&self) -> Result<()> {
270        if self.batch_size == 0 {
271            return Err(Error::InvalidConfig("batch_size must be > 0".into()));
272        }
273
274        if self.process_interval_ms == 0 {
275            return Err(Error::InvalidConfig("process_interval_ms must be > 0".into()));
276        }
277
278        if self.max_entries_per_chunk == 0 {
279            return Err(Error::InvalidConfig("max_entries_per_chunk must be > 0".into()));
280        }
281
282        Ok(())
283    }
284}
285fn default_batch_size() -> u64 {
286    100
287}
288fn default_process_interval_ms() -> u64 {
289    10
290}
291fn default_max_entries_per_chunk() -> usize {
292    100
293}