1use std::fmt::Debug;
2
3use serde::Deserialize;
4use serde::Serialize;
5
6use crate::Error;
7use crate::Result;
8
9#[derive(Serialize, Deserialize, Clone)]
11pub struct RaftConfig {
12 #[serde(default)]
15 pub replication: ReplicationConfig,
16
17 #[serde(default)]
20 pub election: ElectionConfig,
21
22 #[serde(default)]
25 pub membership: MembershipConfig,
26
27 #[serde(default)]
30 pub commit_handler: CommitHandlerConfig,
31
32 #[serde(default = "default_learner_gap")]
36 pub learner_raft_log_gap: u64,
37
38 #[serde(default = "default_general_timeout")]
42 pub general_raft_timeout_duration_in_ms: u64,
43}
44
45impl Debug for RaftConfig {
46 fn fmt(
47 &self,
48 f: &mut std::fmt::Formatter<'_>,
49 ) -> std::fmt::Result {
50 f.debug_struct("RaftConfig").finish()
51 }
52}
53impl Default for RaftConfig {
54 fn default() -> Self {
55 Self {
56 replication: ReplicationConfig::default(),
57 election: ElectionConfig::default(),
58 membership: MembershipConfig::default(),
59 commit_handler: CommitHandlerConfig::default(),
60 learner_raft_log_gap: default_learner_gap(),
61 general_raft_timeout_duration_in_ms: default_general_timeout(),
62 }
63 }
64}
65impl RaftConfig {
66 pub fn validate(&self) -> Result<()> {
68 if self.learner_raft_log_gap == 0 {
69 return Err(Error::InvalidConfig(
70 "learner_raft_log_gap must be greater than 0".into(),
71 ));
72 }
73
74 if self.general_raft_timeout_duration_in_ms < 1 {
75 return Err(Error::InvalidConfig(
76 "general_raft_timeout_duration_in_ms must be at least 1ms".into(),
77 ));
78 }
79
80 self.replication.validate()?;
81 self.election.validate()?;
82 self.membership.validate()?;
83 self.commit_handler.validate()?;
84
85 Ok(())
86 }
87}
88
/// Fallback for `RaftConfig::learner_raft_log_gap`; returns `10`.
fn default_learner_gap() -> u64 {
    const LEARNER_GAP: u64 = 10;
    LEARNER_GAP
}
/// Fallback for `RaftConfig::general_raft_timeout_duration_in_ms`; returns `50`.
fn default_general_timeout() -> u64 {
    const GENERAL_TIMEOUT_MS: u64 = 50;
    GENERAL_TIMEOUT_MS
}
96
97#[derive(Debug, Serialize, Deserialize, Clone)]
98pub struct ReplicationConfig {
99 #[serde(default = "default_append_interval")]
100 pub rpc_append_entries_clock_in_ms: u64,
101
102 #[serde(default = "default_batch_threshold")]
103 pub rpc_append_entries_in_batch_threshold: usize,
104
105 #[serde(default = "default_batch_delay")]
106 pub rpc_append_entries_batch_process_delay_in_ms: u64,
107
108 #[serde(default = "default_entries_per_replication")]
109 pub append_entries_max_entries_per_replication: u64,
110}
111
112impl Default for ReplicationConfig {
113 fn default() -> Self {
114 Self {
115 rpc_append_entries_clock_in_ms: default_append_interval(),
116 rpc_append_entries_in_batch_threshold: default_batch_threshold(),
117 rpc_append_entries_batch_process_delay_in_ms: default_batch_delay(),
118 append_entries_max_entries_per_replication: default_entries_per_replication(),
119 }
120 }
121}
122impl ReplicationConfig {
123 fn validate(&self) -> Result<()> {
124 if self.rpc_append_entries_clock_in_ms == 0 {
125 return Err(Error::InvalidConfig(
126 "rpc_append_entries_clock_in_ms cannot be 0".into(),
127 ));
128 }
129
130 if self.rpc_append_entries_in_batch_threshold == 0 {
131 return Err(Error::InvalidConfig(
132 "rpc_append_entries_in_batch_threshold must be > 0".into(),
133 ));
134 }
135
136 if self.append_entries_max_entries_per_replication == 0 {
137 return Err(Error::InvalidConfig(
138 "append_entries_max_entries_per_replication must be > 0".into(),
139 ));
140 }
141
142 if self.rpc_append_entries_batch_process_delay_in_ms >= self.rpc_append_entries_clock_in_ms {
143 return Err(Error::InvalidConfig(format!(
144 "batch_delay {}ms should be less than append_interval {}ms",
145 self.rpc_append_entries_batch_process_delay_in_ms, self.rpc_append_entries_clock_in_ms
146 )));
147 }
148
149 Ok(())
150 }
151}
/// Fallback for `ReplicationConfig::rpc_append_entries_clock_in_ms`; returns `100`.
fn default_append_interval() -> u64 {
    const APPEND_INTERVAL_MS: u64 = 100;
    APPEND_INTERVAL_MS
}
/// Fallback for `ReplicationConfig::rpc_append_entries_in_batch_threshold`; returns `100`.
fn default_batch_threshold() -> usize {
    const BATCH_THRESHOLD: usize = 100;
    BATCH_THRESHOLD
}
/// Fallback for `ReplicationConfig::rpc_append_entries_batch_process_delay_in_ms`; returns `1`.
fn default_batch_delay() -> u64 {
    const BATCH_DELAY_MS: u64 = 1;
    BATCH_DELAY_MS
}
/// Fallback for `ReplicationConfig::append_entries_max_entries_per_replication`; returns `100`.
fn default_entries_per_replication() -> u64 {
    const ENTRIES_PER_REPLICATION: u64 = 100;
    ENTRIES_PER_REPLICATION
}
164#[derive(Debug, Serialize, Deserialize, Clone)]
165pub struct ElectionConfig {
166 #[serde(default = "default_election_timeout_min")]
167 pub election_timeout_min: u64,
168
169 #[serde(default = "default_election_timeout_max")]
170 pub election_timeout_max: u64,
171
172 #[serde(default = "default_peer_monitor_interval")]
173 pub rpc_peer_connectinon_monitor_interval_in_sec: u64,
174
175 #[serde(default = "default_client_request_id")]
176 pub internal_rpc_client_request_id: u32,
177}
178
179impl Default for ElectionConfig {
180 fn default() -> Self {
181 Self {
182 election_timeout_min: default_election_timeout_min(),
183 election_timeout_max: default_election_timeout_max(),
184 rpc_peer_connectinon_monitor_interval_in_sec: default_peer_monitor_interval(),
185 internal_rpc_client_request_id: default_client_request_id(),
186 }
187 }
188}
189impl ElectionConfig {
190 fn validate(&self) -> Result<()> {
191 if self.election_timeout_min >= self.election_timeout_max {
192 return Err(Error::InvalidConfig(format!(
193 "election_timeout_min {}ms must be less than election_timeout_max {}ms",
194 self.election_timeout_min, self.election_timeout_max
195 )));
196 }
197
198 if self.rpc_peer_connectinon_monitor_interval_in_sec == 0 {
199 return Err(Error::InvalidConfig(
200 "rpc_peer_connectinon_monitor_interval_in_sec cannot be 0".into(),
201 ));
202 }
203
204 Ok(())
205 }
206}
/// Fallback for `ElectionConfig::election_timeout_min`; returns `500`.
fn default_election_timeout_min() -> u64 {
    const ELECTION_TIMEOUT_MIN: u64 = 500;
    ELECTION_TIMEOUT_MIN
}
/// Fallback for `ElectionConfig::election_timeout_max`; returns `1000`.
fn default_election_timeout_max() -> u64 {
    const ELECTION_TIMEOUT_MAX: u64 = 1_000;
    ELECTION_TIMEOUT_MAX
}
/// Fallback for `ElectionConfig::rpc_peer_connectinon_monitor_interval_in_sec`; returns `30`.
fn default_peer_monitor_interval() -> u64 {
    const PEER_MONITOR_INTERVAL_SEC: u64 = 30;
    PEER_MONITOR_INTERVAL_SEC
}
/// Fallback for `ElectionConfig::internal_rpc_client_request_id`; returns `0`.
fn default_client_request_id() -> u32 {
    const CLIENT_REQUEST_ID: u32 = 0;
    CLIENT_REQUEST_ID
}
219
220#[derive(Debug, Serialize, Deserialize, Clone)]
221pub struct MembershipConfig {
222 #[serde(default = "default_probe_service")]
223 pub cluster_healthcheck_probe_service_name: String,
224}
225impl Default for MembershipConfig {
226 fn default() -> Self {
227 Self {
228 cluster_healthcheck_probe_service_name: default_probe_service(),
229 }
230 }
231}
/// Fallback for `MembershipConfig::cluster_healthcheck_probe_service_name`.
fn default_probe_service() -> String {
    String::from("rpc_service.RpcService")
}
235
236impl MembershipConfig {
237 fn validate(&self) -> Result<()> {
238 if self.cluster_healthcheck_probe_service_name.is_empty() {
239 return Err(Error::InvalidConfig(
240 "cluster_healthcheck_probe_service_name cannot be empty".into(),
241 ));
242 }
243 Ok(())
244 }
245}
246
247#[derive(Debug, Serialize, Deserialize, Clone)]
249pub struct CommitHandlerConfig {
250 #[serde(default = "default_batch_size")]
251 pub batch_size: u64,
252
253 #[serde(default = "default_process_interval_ms")]
254 pub process_interval_ms: u64,
255
256 #[serde(default = "default_max_entries_per_chunk")]
257 pub max_entries_per_chunk: usize,
258}
259impl Default for CommitHandlerConfig {
260 fn default() -> Self {
261 Self {
262 batch_size: default_batch_size(),
263 process_interval_ms: default_process_interval_ms(),
264 max_entries_per_chunk: default_max_entries_per_chunk(),
265 }
266 }
267}
268impl CommitHandlerConfig {
269 fn validate(&self) -> Result<()> {
270 if self.batch_size == 0 {
271 return Err(Error::InvalidConfig("batch_size must be > 0".into()));
272 }
273
274 if self.process_interval_ms == 0 {
275 return Err(Error::InvalidConfig("process_interval_ms must be > 0".into()));
276 }
277
278 if self.max_entries_per_chunk == 0 {
279 return Err(Error::InvalidConfig("max_entries_per_chunk must be > 0".into()));
280 }
281
282 Ok(())
283 }
284}
/// Fallback for `CommitHandlerConfig::batch_size`; returns `100`.
fn default_batch_size() -> u64 {
    const BATCH_SIZE: u64 = 100;
    BATCH_SIZE
}
/// Fallback for `CommitHandlerConfig::process_interval_ms`; returns `10`.
fn default_process_interval_ms() -> u64 {
    const PROCESS_INTERVAL_MS: u64 = 10;
    PROCESS_INTERVAL_MS
}
/// Fallback for `CommitHandlerConfig::max_entries_per_chunk`; returns `100`.
fn default_max_entries_per_chunk() -> usize {
    const MAX_ENTRIES_PER_CHUNK: usize = 100;
    MAX_ENTRIES_PER_CHUNK
}