1use std::fmt::Debug;
2use std::path::PathBuf;
3use std::time::Duration;
4
5use config::ConfigError;
6use serde::Deserialize;
7use serde::Serialize;
8use tracing::warn;
9
10use super::lease::LeaseConfig;
11use super::validate_directory;
12use crate::Error;
13use crate::Result;
14
15#[derive(Serialize, Deserialize, Clone)]
17pub struct RaftConfig {
18 #[serde(default)]
21 pub replication: ReplicationConfig,
22
23 #[serde(default)]
26 pub election: ElectionConfig,
27
28 #[serde(default)]
31 pub membership: MembershipConfig,
32
33 #[serde(default)]
36 pub commit_handler: CommitHandlerConfig,
37
38 #[serde(default, alias = "storage")]
42 pub state_machine: StateMachineConfig,
43
44 #[serde(default)]
46 pub snapshot: SnapshotConfig,
47
48 #[serde(default)]
51 pub persistence: PersistenceConfig,
52
53 #[serde(default = "default_learner_catchup_threshold")]
57 pub learner_catchup_threshold: u64,
58
59 #[serde(default = "default_learner_check_throttle_ms")]
63 pub learner_check_throttle_ms: u64,
64
65 #[serde(default = "default_general_timeout")]
69 pub general_raft_timeout_duration_in_ms: u64,
70
71 #[serde(default = "default_snapshot_rpc_timeout_ms")]
73 pub snapshot_rpc_timeout_ms: u64,
74
75 #[serde(default)]
77 pub auto_join: AutoJoinConfig,
78
79 #[serde(default)]
82 pub read_consistency: ReadConsistencyConfig,
83
84 #[serde(default)]
90 pub rpc_compression: RpcCompressionConfig,
91
92 #[serde(default)]
95 pub watch: WatchConfig,
96}
97
98impl Debug for RaftConfig {
99 fn fmt(
100 &self,
101 f: &mut std::fmt::Formatter<'_>,
102 ) -> std::fmt::Result {
103 f.debug_struct("RaftConfig").finish()
104 }
105}
106impl Default for RaftConfig {
107 fn default() -> Self {
108 Self {
109 replication: ReplicationConfig::default(),
110 election: ElectionConfig::default(),
111 membership: MembershipConfig::default(),
112 commit_handler: CommitHandlerConfig::default(),
113 state_machine: StateMachineConfig::default(),
114 snapshot: SnapshotConfig::default(),
115 persistence: PersistenceConfig::default(),
116 learner_catchup_threshold: default_learner_catchup_threshold(),
117 learner_check_throttle_ms: default_learner_check_throttle_ms(),
118 general_raft_timeout_duration_in_ms: default_general_timeout(),
119 auto_join: AutoJoinConfig::default(),
120 snapshot_rpc_timeout_ms: default_snapshot_rpc_timeout_ms(),
121 read_consistency: ReadConsistencyConfig::default(),
122 rpc_compression: RpcCompressionConfig::default(),
123 watch: WatchConfig::default(),
124 }
125 }
126}
127impl RaftConfig {
128 pub fn validate(&self) -> Result<()> {
130 if self.learner_catchup_threshold == 0 {
131 return Err(Error::Config(ConfigError::Message(
132 "learner_catchup_threshold must be greater than 0".into(),
133 )));
134 }
135
136 if self.general_raft_timeout_duration_in_ms < 1 {
137 return Err(Error::Config(ConfigError::Message(
138 "general_raft_timeout_duration_in_ms must be at least 1ms".into(),
139 )));
140 }
141
142 self.replication.validate()?;
143 self.election.validate()?;
144 self.membership.validate()?;
145 self.commit_handler.validate()?;
146 self.state_machine.validate()?;
147 self.snapshot.validate()?;
148 self.read_consistency.validate()?;
149 self.watch.validate()?;
150
151 if self.read_consistency.lease_duration_ms > self.election.election_timeout_min / 2 {
153 warn!(
154 "read_consistency.lease_duration_ms ({}) is greater than half of election_timeout_min ({}ms). \
155 This may cause lease expiration during normal operation.",
156 self.read_consistency.lease_duration_ms,
157 self.election.election_timeout_min / 2
158 );
159 }
160
161 Ok(())
162 }
163}
164
/// Fallback for `RaftConfig::learner_catchup_threshold`.
fn default_learner_catchup_threshold() -> u64 {
    const THRESHOLD: u64 = 1;
    THRESHOLD
}

/// Fallback for `RaftConfig::learner_check_throttle_ms` (one second).
fn default_learner_check_throttle_ms() -> u64 {
    1_000
}

/// Fallback for `RaftConfig::general_raft_timeout_duration_in_ms`.
fn default_general_timeout() -> u64 {
    const TIMEOUT_MS: u64 = 50;
    TIMEOUT_MS
}

/// Fallback for `RaftConfig::snapshot_rpc_timeout_ms` (one hour, in ms).
fn default_snapshot_rpc_timeout_ms() -> u64 {
    60 * 60 * 1_000
}
181#[derive(Debug, Serialize, Deserialize, Clone)]
182pub struct ReplicationConfig {
183 #[serde(default = "default_append_interval")]
184 pub rpc_append_entries_clock_in_ms: u64,
185
186 #[serde(default = "default_batch_threshold")]
187 pub rpc_append_entries_in_batch_threshold: usize,
188
189 #[serde(default = "default_batch_delay")]
190 pub rpc_append_entries_batch_process_delay_in_ms: u64,
191
192 #[serde(default = "default_entries_per_replication")]
193 pub append_entries_max_entries_per_replication: u64,
194}
195
196impl Default for ReplicationConfig {
197 fn default() -> Self {
198 Self {
199 rpc_append_entries_clock_in_ms: default_append_interval(),
200 rpc_append_entries_in_batch_threshold: default_batch_threshold(),
201 rpc_append_entries_batch_process_delay_in_ms: default_batch_delay(),
202 append_entries_max_entries_per_replication: default_entries_per_replication(),
203 }
204 }
205}
206impl ReplicationConfig {
207 fn validate(&self) -> Result<()> {
208 if self.rpc_append_entries_clock_in_ms == 0 {
209 return Err(Error::Config(ConfigError::Message(
210 "rpc_append_entries_clock_in_ms cannot be 0".into(),
211 )));
212 }
213
214 if self.rpc_append_entries_in_batch_threshold == 0 {
215 return Err(Error::Config(ConfigError::Message(
216 "rpc_append_entries_in_batch_threshold must be > 0".into(),
217 )));
218 }
219
220 if self.append_entries_max_entries_per_replication == 0 {
221 return Err(Error::Config(ConfigError::Message(
222 "append_entries_max_entries_per_replication must be > 0".into(),
223 )));
224 }
225
226 if self.rpc_append_entries_batch_process_delay_in_ms >= self.rpc_append_entries_clock_in_ms
227 {
228 return Err(Error::Config(ConfigError::Message(format!(
229 "batch_delay {}ms should be less than append_interval {}ms",
230 self.rpc_append_entries_batch_process_delay_in_ms,
231 self.rpc_append_entries_clock_in_ms
232 ))));
233 }
234
235 Ok(())
236 }
237}
/// Fallback for `rpc_append_entries_clock_in_ms`.
fn default_append_interval() -> u64 {
    const CLOCK_MS: u64 = 100;
    CLOCK_MS
}

/// Fallback for `rpc_append_entries_in_batch_threshold`.
fn default_batch_threshold() -> usize {
    const THRESHOLD: usize = 100;
    THRESHOLD
}

/// Fallback for `rpc_append_entries_batch_process_delay_in_ms`.
fn default_batch_delay() -> u64 {
    const DELAY_MS: u64 = 1;
    DELAY_MS
}

/// Fallback for `append_entries_max_entries_per_replication`.
fn default_entries_per_replication() -> u64 {
    const MAX_ENTRIES: u64 = 100;
    MAX_ENTRIES
}
250#[derive(Debug, Serialize, Deserialize, Clone)]
251pub struct ElectionConfig {
252 #[serde(default = "default_election_timeout_min")]
253 pub election_timeout_min: u64,
254
255 #[serde(default = "default_election_timeout_max")]
256 pub election_timeout_max: u64,
257
258 #[serde(default = "default_peer_monitor_interval")]
259 pub rpc_peer_connectinon_monitor_interval_in_sec: u64,
260
261 #[serde(default = "default_client_request_id")]
262 pub internal_rpc_client_request_id: u32,
263}
264
265impl Default for ElectionConfig {
266 fn default() -> Self {
267 Self {
268 election_timeout_min: default_election_timeout_min(),
269 election_timeout_max: default_election_timeout_max(),
270 rpc_peer_connectinon_monitor_interval_in_sec: default_peer_monitor_interval(),
271 internal_rpc_client_request_id: default_client_request_id(),
272 }
273 }
274}
275impl ElectionConfig {
276 fn validate(&self) -> Result<()> {
277 if self.election_timeout_min >= self.election_timeout_max {
278 return Err(Error::Config(ConfigError::Message(format!(
279 "election_timeout_min {}ms must be less than election_timeout_max {}ms",
280 self.election_timeout_min, self.election_timeout_max
281 ))));
282 }
283
284 if self.rpc_peer_connectinon_monitor_interval_in_sec == 0 {
285 return Err(Error::Config(ConfigError::Message(
286 "rpc_peer_connectinon_monitor_interval_in_sec cannot be 0".into(),
287 )));
288 }
289
290 Ok(())
291 }
292}
/// Fallback for `ElectionConfig::election_timeout_min`.
fn default_election_timeout_min() -> u64 {
    const TIMEOUT_MIN: u64 = 500;
    TIMEOUT_MIN
}

/// Fallback for `ElectionConfig::election_timeout_max`.
fn default_election_timeout_max() -> u64 {
    1_000
}

/// Fallback for `rpc_peer_connectinon_monitor_interval_in_sec`.
fn default_peer_monitor_interval() -> u64 {
    const INTERVAL_SECS: u64 = 30;
    INTERVAL_SECS
}

/// Fallback for `ElectionConfig::internal_rpc_client_request_id`.
fn default_client_request_id() -> u32 {
    const REQUEST_ID: u32 = 0;
    REQUEST_ID
}
305
306#[derive(Debug, Serialize, Deserialize, Clone)]
307pub struct MembershipConfig {
308 #[serde(default = "default_probe_service")]
309 pub cluster_healthcheck_probe_service_name: String,
310
311 #[serde(default = "default_verify_leadership_persistent_timeout")]
312 pub verify_leadership_persistent_timeout: Duration,
313
314 #[serde(default = "default_membership_maintenance_interval")]
315 pub membership_maintenance_interval: Duration,
316
317 #[serde(default)]
318 pub zombie: ZombieConfig,
319
320 #[serde(default)]
322 pub promotion: PromotionConfig,
323}
324impl Default for MembershipConfig {
325 fn default() -> Self {
326 Self {
327 cluster_healthcheck_probe_service_name: default_probe_service(),
328 verify_leadership_persistent_timeout: default_verify_leadership_persistent_timeout(),
329 membership_maintenance_interval: default_membership_maintenance_interval(),
330 zombie: ZombieConfig::default(),
331 promotion: PromotionConfig::default(),
332 }
333 }
334}
/// Fallback for `MembershipConfig::cluster_healthcheck_probe_service_name`.
fn default_probe_service() -> String {
    String::from("d_engine.server.cluster.ClusterManagementService")
}

/// Fallback for `MembershipConfig::membership_maintenance_interval` (30 s).
fn default_membership_maintenance_interval() -> Duration {
    const INTERVAL_SECS: u64 = 30;
    Duration::from_secs(INTERVAL_SECS)
}

/// Fallback for `MembershipConfig::verify_leadership_persistent_timeout`
/// (one hour).
fn default_verify_leadership_persistent_timeout() -> Duration {
    Duration::from_secs(60 * 60)
}
353
354impl MembershipConfig {
355 fn validate(&self) -> Result<()> {
356 if self.cluster_healthcheck_probe_service_name.is_empty() {
357 return Err(Error::Config(ConfigError::Message(
358 "cluster_healthcheck_probe_service_name cannot be empty".into(),
359 )));
360 }
361 Ok(())
362 }
363}
364
365#[derive(Debug, Serialize, Deserialize, Clone)]
367pub struct CommitHandlerConfig {
368 #[serde(default = "default_batch_size_threshold")]
369 pub batch_size_threshold: u64,
370
371 #[serde(default = "default_process_interval_ms")]
372 pub process_interval_ms: u64,
373
374 #[serde(default = "default_max_entries_per_chunk")]
375 pub max_entries_per_chunk: usize,
376}
377impl Default for CommitHandlerConfig {
378 fn default() -> Self {
379 Self {
380 batch_size_threshold: default_batch_size_threshold(),
381 process_interval_ms: default_process_interval_ms(),
382 max_entries_per_chunk: default_max_entries_per_chunk(),
383 }
384 }
385}
386impl CommitHandlerConfig {
387 fn validate(&self) -> Result<()> {
388 if self.batch_size_threshold == 0 {
389 return Err(Error::Config(ConfigError::Message(
390 "batch_size_threshold must be > 0".into(),
391 )));
392 }
393
394 if self.process_interval_ms == 0 {
395 return Err(Error::Config(ConfigError::Message(
396 "process_interval_ms must be > 0".into(),
397 )));
398 }
399
400 if self.max_entries_per_chunk == 0 {
401 return Err(Error::Config(ConfigError::Message(
402 "max_entries_per_chunk must be > 0".into(),
403 )));
404 }
405
406 Ok(())
407 }
408}
/// Fallback for `CommitHandlerConfig::batch_size_threshold`.
fn default_batch_size_threshold() -> u64 {
    const THRESHOLD: u64 = 100;
    THRESHOLD
}

/// Fallback for `CommitHandlerConfig::process_interval_ms`.
fn default_process_interval_ms() -> u64 {
    const INTERVAL_MS: u64 = 10;
    INTERVAL_MS
}

/// Fallback for `CommitHandlerConfig::max_entries_per_chunk`.
fn default_max_entries_per_chunk() -> usize {
    const MAX_ENTRIES: usize = 10;
    MAX_ENTRIES
}
418
419#[derive(Serialize, Deserialize, Clone, Debug)]
425#[derive(Default)]
426pub struct StateMachineConfig {
427 #[serde(alias = "ttl")]
431 pub lease: LeaseConfig,
432}
433
434impl StateMachineConfig {
435 pub fn validate(&self) -> Result<()> {
436 self.lease.validate()?;
437 Ok(())
438 }
439}
440
441#[derive(Debug, Serialize, Deserialize, Clone)]
443pub struct SnapshotConfig {
444 #[serde(default = "default_snapshot_enabled")]
446 pub enable: bool,
447
448 #[serde(default = "default_max_log_entries_before_snapshot")]
451 pub max_log_entries_before_snapshot: u64,
452
453 #[serde(default = "default_snapshot_cool_down_since_last_check")]
456 pub snapshot_cool_down_since_last_check: Duration,
457
458 #[serde(default = "default_cleanup_retain_count")]
461 pub cleanup_retain_count: u64,
462
463 #[serde(default = "default_snapshots_dir")]
467 pub snapshots_dir: PathBuf,
468
469 #[serde(default = "default_snapshots_dir_prefix")]
470 pub snapshots_dir_prefix: String,
471
472 #[serde(default = "default_chunk_size")]
476 pub chunk_size: usize,
477
478 #[serde(default = "default_retained_log_entries")]
480 pub retained_log_entries: u64,
481
482 #[serde(default = "default_sender_yield_every_n_chunks")]
484 pub sender_yield_every_n_chunks: usize,
485
486 #[serde(default = "default_receiver_yield_every_n_chunks")]
488 pub receiver_yield_every_n_chunks: usize,
489
490 #[serde(default = "default_max_bandwidth_mbps")]
491 pub max_bandwidth_mbps: u32,
492
493 #[serde(default = "default_push_queue_size")]
494 pub push_queue_size: usize,
495
496 #[serde(default = "default_cache_size")]
497 pub cache_size: usize,
498
499 #[serde(default = "default_max_retries")]
500 pub max_retries: u32,
501
502 #[serde(default = "default_transfer_timeout_in_sec")]
503 pub transfer_timeout_in_sec: u64,
504
505 #[serde(default = "default_retry_interval_in_ms")]
506 pub retry_interval_in_ms: u64,
507
508 #[serde(default = "default_snapshot_push_backoff_in_ms")]
509 pub snapshot_push_backoff_in_ms: u64,
510
511 #[serde(default = "default_snapshot_push_max_retry")]
512 pub snapshot_push_max_retry: u32,
513
514 #[serde(default = "default_push_timeout_in_ms")]
515 pub push_timeout_in_ms: u64,
516}
517impl Default for SnapshotConfig {
518 fn default() -> Self {
519 Self {
520 max_log_entries_before_snapshot: default_max_log_entries_before_snapshot(),
521 snapshot_cool_down_since_last_check: default_snapshot_cool_down_since_last_check(),
522 cleanup_retain_count: default_cleanup_retain_count(),
523 snapshots_dir: default_snapshots_dir(),
524 snapshots_dir_prefix: default_snapshots_dir_prefix(),
525 chunk_size: default_chunk_size(),
526 retained_log_entries: default_retained_log_entries(),
527 sender_yield_every_n_chunks: default_sender_yield_every_n_chunks(),
528 receiver_yield_every_n_chunks: default_receiver_yield_every_n_chunks(),
529 max_bandwidth_mbps: default_max_bandwidth_mbps(),
530 push_queue_size: default_push_queue_size(),
531 cache_size: default_cache_size(),
532 max_retries: default_max_retries(),
533 transfer_timeout_in_sec: default_transfer_timeout_in_sec(),
534 retry_interval_in_ms: default_retry_interval_in_ms(),
535 snapshot_push_backoff_in_ms: default_snapshot_push_backoff_in_ms(),
536 snapshot_push_max_retry: default_snapshot_push_max_retry(),
537 push_timeout_in_ms: default_push_timeout_in_ms(),
538 enable: default_snapshot_enabled(),
539 }
540 }
541}
542impl SnapshotConfig {
543 fn validate(&self) -> Result<()> {
544 if self.max_log_entries_before_snapshot == 0 {
545 return Err(Error::Config(ConfigError::Message(
546 "max_log_entries_before_snapshot must be greater than 0".into(),
547 )));
548 }
549
550 if self.cleanup_retain_count == 0 {
551 return Err(Error::Config(ConfigError::Message(
552 "cleanup_retain_count must be greater than 0".into(),
553 )));
554 }
555 validate_directory(&self.snapshots_dir, "snapshots_dir")?;
557
558 if self.chunk_size == 0 {
560 return Err(Error::Config(ConfigError::Message(format!(
561 "chunk_size must be at least {} bytes (got {})",
562 0, self.chunk_size
563 ))));
564 }
565
566 if self.retained_log_entries < 1 {
567 return Err(Error::Config(ConfigError::Message(format!(
568 "retained_log_entries must be >= 1, (got {})",
569 self.retained_log_entries
570 ))));
571 }
572
573 if self.sender_yield_every_n_chunks < 1 {
574 return Err(Error::Config(ConfigError::Message(format!(
575 "sender_yield_every_n_chunks must be >= 1, (got {})",
576 self.sender_yield_every_n_chunks
577 ))));
578 }
579
580 if self.receiver_yield_every_n_chunks < 1 {
581 return Err(Error::Config(ConfigError::Message(format!(
582 "receiver_yield_every_n_chunks must be >= 1, (got {})",
583 self.receiver_yield_every_n_chunks
584 ))));
585 }
586
587 if self.push_queue_size < 1 {
588 return Err(Error::Config(ConfigError::Message(format!(
589 "push_queue_size must be >= 1, (got {})",
590 self.push_queue_size
591 ))));
592 }
593
594 if self.snapshot_push_max_retry < 1 {
595 return Err(Error::Config(ConfigError::Message(format!(
596 "snapshot_push_max_retry must be >= 1, (got {})",
597 self.snapshot_push_max_retry
598 ))));
599 }
600
601 Ok(())
602 }
603}
604
/// Fallback for `SnapshotConfig::enable`.
fn default_snapshot_enabled() -> bool {
    const ENABLED: bool = true;
    ENABLED
}

/// Fallback for `SnapshotConfig::max_log_entries_before_snapshot`.
fn default_max_log_entries_before_snapshot() -> u64 {
    1_000
}

/// Fallback for `SnapshotConfig::snapshot_cool_down_since_last_check`
/// (one hour).
fn default_snapshot_cool_down_since_last_check() -> Duration {
    Duration::from_secs(60 * 60)
}

/// Fallback for `SnapshotConfig::cleanup_retain_count`.
fn default_cleanup_retain_count() -> u64 {
    const RETAIN: u64 = 2;
    RETAIN
}

/// Fallback for `SnapshotConfig::snapshots_dir`.
fn default_snapshots_dir() -> PathBuf {
    "/tmp/snapshots".into()
}

/// Fallback for `SnapshotConfig::snapshots_dir_prefix`.
fn default_snapshots_dir_prefix() -> String {
    String::from("snapshot-")
}

/// Fallback for `SnapshotConfig::chunk_size`.
fn default_chunk_size() -> usize {
    1_024
}
639
640#[derive(Debug, Serialize, Deserialize, Clone)]
641pub struct AutoJoinConfig {
642 #[serde(default = "default_rpc_enable_compression")]
643 pub rpc_enable_compression: bool,
644}
645impl Default for AutoJoinConfig {
646 fn default() -> Self {
647 Self {
648 rpc_enable_compression: default_rpc_enable_compression(),
649 }
650 }
651}
/// Fallback for `AutoJoinConfig::rpc_enable_compression`.
fn default_rpc_enable_compression() -> bool {
    const ENABLED: bool = true;
    ENABLED
}
655
/// Fallback for `SnapshotConfig::retained_log_entries`.
fn default_retained_log_entries() -> u64 {
    const RETAINED: u64 = 1;
    RETAINED
}

/// Fallback for `SnapshotConfig::sender_yield_every_n_chunks`.
fn default_sender_yield_every_n_chunks() -> usize {
    const EVERY_N: usize = 1;
    EVERY_N
}

/// Fallback for `SnapshotConfig::receiver_yield_every_n_chunks`.
fn default_receiver_yield_every_n_chunks() -> usize {
    const EVERY_N: usize = 1;
    EVERY_N
}

/// Fallback for `SnapshotConfig::max_bandwidth_mbps`.
fn default_max_bandwidth_mbps() -> u32 {
    const MBPS: u32 = 1;
    MBPS
}

/// Fallback for `SnapshotConfig::push_queue_size`.
fn default_push_queue_size() -> usize {
    const QUEUE_SIZE: usize = 100;
    QUEUE_SIZE
}

/// Fallback for `SnapshotConfig::cache_size`.
fn default_cache_size() -> usize {
    10_000
}

/// Fallback for `SnapshotConfig::max_retries`.
fn default_max_retries() -> u32 {
    const RETRIES: u32 = 1;
    RETRIES
}

/// Fallback for `SnapshotConfig::transfer_timeout_in_sec` (ten minutes).
fn default_transfer_timeout_in_sec() -> u64 {
    10 * 60
}

/// Fallback for `SnapshotConfig::retry_interval_in_ms`.
fn default_retry_interval_in_ms() -> u64 {
    const INTERVAL_MS: u64 = 10;
    INTERVAL_MS
}

/// Fallback for `SnapshotConfig::snapshot_push_backoff_in_ms`.
fn default_snapshot_push_backoff_in_ms() -> u64 {
    const BACKOFF_MS: u64 = 100;
    BACKOFF_MS
}

/// Fallback for `SnapshotConfig::snapshot_push_max_retry`.
fn default_snapshot_push_max_retry() -> u32 {
    const MAX_RETRY: u32 = 3;
    MAX_RETRY
}

/// Fallback for `SnapshotConfig::push_timeout_in_ms` (five minutes, in ms).
fn default_push_timeout_in_ms() -> u64 {
    5 * 60 * 1_000
}
697
698#[derive(Debug, Serialize, Deserialize, Clone)]
699pub struct ZombieConfig {
700 #[serde(default = "default_zombie_threshold")]
702 pub threshold: u32,
703
704 #[serde(default = "default_zombie_purge_interval")]
705 pub purge_interval: Duration,
706}
707
708impl Default for ZombieConfig {
709 fn default() -> Self {
710 Self {
711 threshold: default_zombie_threshold(),
712 purge_interval: default_zombie_purge_interval(),
713 }
714 }
715}
716
/// Fallback for `ZombieConfig::threshold`.
fn default_zombie_threshold() -> u32 {
    const THRESHOLD: u32 = 3;
    THRESHOLD
}

/// Fallback for `ZombieConfig::purge_interval` (30 s).
fn default_zombie_purge_interval() -> Duration {
    const INTERVAL_SECS: u64 = 30;
    Duration::from_secs(INTERVAL_SECS)
}
724
725#[derive(Debug, Serialize, Deserialize, Clone)]
726pub struct PromotionConfig {
727 #[serde(default = "default_stale_learner_threshold")]
728 pub stale_learner_threshold: Duration,
729 #[serde(default = "default_stale_check_interval")]
730 pub stale_check_interval: Duration,
731}
732
733impl Default for PromotionConfig {
734 fn default() -> Self {
735 Self {
736 stale_learner_threshold: default_stale_learner_threshold(),
737 stale_check_interval: default_stale_check_interval(),
738 }
739 }
740}
741
/// Fallback for `PromotionConfig::stale_learner_threshold` (five minutes).
fn default_stale_learner_threshold() -> Duration {
    Duration::from_secs(5 * 60)
}

/// Fallback for `PromotionConfig::stale_check_interval` (30 s).
fn default_stale_check_interval() -> Duration {
    const INTERVAL_SECS: u64 = 30;
    Duration::from_secs(INTERVAL_SECS)
}
750#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
758pub enum PersistenceStrategy {
759 DiskFirst,
772
773 MemFirst,
787}
788
789#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
791pub enum FlushPolicy {
792 Immediate,
797
798 Batch { threshold: usize, interval_ms: u64 },
805}
806
807#[derive(Serialize, Deserialize, Clone, Debug)]
809pub struct PersistenceConfig {
810 #[serde(default = "default_persistence_strategy")]
816 pub strategy: PersistenceStrategy,
817
818 #[serde(default = "default_flush_policy")]
823 pub flush_policy: FlushPolicy,
824
825 #[serde(default = "default_max_buffered_entries")]
830 pub max_buffered_entries: usize,
831
832 #[serde(default = "default_flush_workers")]
842 pub flush_workers: usize,
843
844 #[serde(default = "default_channel_capacity")]
851 pub channel_capacity: usize,
852}
853
854fn default_persistence_strategy() -> PersistenceStrategy {
856 PersistenceStrategy::MemFirst
857}
858
/// Fallback for `PersistenceConfig::flush_workers`.
fn default_flush_workers() -> usize {
    const WORKERS: usize = 2;
    WORKERS
}

/// Fallback for `PersistenceConfig::channel_capacity`.
fn default_channel_capacity() -> usize {
    const CAPACITY: usize = 100;
    CAPACITY
}
868
869fn default_flush_policy() -> FlushPolicy {
874 FlushPolicy::Batch {
875 threshold: 1024,
876 interval_ms: 100,
877 }
878}
879
/// Fallback for `PersistenceConfig::max_buffered_entries`.
fn default_max_buffered_entries() -> usize {
    const MAX_BUFFERED: usize = 10 * 1_000;
    MAX_BUFFERED
}
884
885impl Default for PersistenceConfig {
886 fn default() -> Self {
887 Self {
888 strategy: default_persistence_strategy(),
889 flush_policy: default_flush_policy(),
890 max_buffered_entries: default_max_buffered_entries(),
891 flush_workers: default_flush_workers(),
892 channel_capacity: default_channel_capacity(),
893 }
894 }
895}
896
897#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
902pub enum ReadConsistencyPolicy {
903 LeaseRead,
910
911 #[default]
917 LinearizableRead,
918
919 EventualConsistency,
927}
928
929#[derive(Clone, Debug, Serialize, Deserialize)]
931pub struct ReadConsistencyConfig {
932 #[serde(default)]
937 pub default_policy: ReadConsistencyPolicy,
938
939 #[serde(default = "default_lease_duration_ms")]
944 pub lease_duration_ms: u64,
945
946 #[serde(default = "default_allow_client_override")]
951 pub allow_client_override: bool,
952
953 #[serde(default = "default_state_machine_sync_timeout_ms")]
959 pub state_machine_sync_timeout_ms: u64,
960
961 #[serde(default)]
963 pub read_batching: ReadBatchingConfig,
964}
965
966#[derive(Clone, Debug, Serialize, Deserialize)]
968pub struct ReadBatchingConfig {
969 pub size_threshold: usize,
971
972 pub time_threshold_ms: u64,
974}
975
976impl Default for ReadBatchingConfig {
977 fn default() -> Self {
978 Self {
979 size_threshold: 50,
980 time_threshold_ms: 10,
981 }
982 }
983}
984
985impl Default for ReadConsistencyConfig {
986 fn default() -> Self {
987 Self {
988 default_policy: ReadConsistencyPolicy::default(),
989 lease_duration_ms: default_lease_duration_ms(),
990 allow_client_override: default_allow_client_override(),
991 state_machine_sync_timeout_ms: default_state_machine_sync_timeout_ms(),
992 read_batching: ReadBatchingConfig::default(),
993 }
994 }
995}
996
/// Fallback for `ReadConsistencyConfig::lease_duration_ms`.
fn default_lease_duration_ms() -> u64 {
    const LEASE_MS: u64 = 250;
    LEASE_MS
}

/// Fallback for `ReadConsistencyConfig::allow_client_override`.
fn default_allow_client_override() -> bool {
    const ALLOWED: bool = true;
    ALLOWED
}

/// Fallback for `ReadConsistencyConfig::state_machine_sync_timeout_ms`.
fn default_state_machine_sync_timeout_ms() -> u64 {
    const TIMEOUT_MS: u64 = 10;
    TIMEOUT_MS
}
1010
1011impl ReadConsistencyConfig {
1012 fn validate(&self) -> Result<()> {
1013 if self.lease_duration_ms == 0 {
1015 return Err(Error::Config(ConfigError::Message(
1016 "read_consistency.lease_duration_ms must be greater than 0".into(),
1017 )));
1018 }
1019 Ok(())
1020 }
1021}
1022
1023impl From<d_engine_proto::client::ReadConsistencyPolicy> for ReadConsistencyPolicy {
1024 fn from(proto_policy: d_engine_proto::client::ReadConsistencyPolicy) -> Self {
1025 match proto_policy {
1026 d_engine_proto::client::ReadConsistencyPolicy::LeaseRead => Self::LeaseRead,
1027 d_engine_proto::client::ReadConsistencyPolicy::LinearizableRead => {
1028 Self::LinearizableRead
1029 }
1030 d_engine_proto::client::ReadConsistencyPolicy::EventualConsistency => {
1031 Self::EventualConsistency
1032 }
1033 }
1034 }
1035}
1036
1037impl From<ReadConsistencyPolicy> for d_engine_proto::client::ReadConsistencyPolicy {
1038 fn from(config_policy: ReadConsistencyPolicy) -> Self {
1039 match config_policy {
1040 ReadConsistencyPolicy::LeaseRead => {
1041 d_engine_proto::client::ReadConsistencyPolicy::LeaseRead
1042 }
1043 ReadConsistencyPolicy::LinearizableRead => {
1044 d_engine_proto::client::ReadConsistencyPolicy::LinearizableRead
1045 }
1046 ReadConsistencyPolicy::EventualConsistency => {
1047 d_engine_proto::client::ReadConsistencyPolicy::EventualConsistency
1048 }
1049 }
1050 }
1051}
1052
1053#[derive(Debug, Clone, Serialize, Deserialize)]
1060pub struct RpcCompressionConfig {
1061 #[serde(default = "default_replication_compression")]
1069 pub replication_response: bool,
1070
1071 #[serde(default = "default_election_compression")]
1078 pub election_response: bool,
1079
1080 #[serde(default = "default_snapshot_compression")]
1087 pub snapshot_response: bool,
1088
1089 #[serde(default = "default_cluster_compression")]
1096 pub cluster_response: bool,
1097
1098 #[serde(default = "default_client_compression")]
1105 pub client_response: bool,
1106}
1107
1108impl Default for RpcCompressionConfig {
1109 fn default() -> Self {
1110 Self {
1111 replication_response: default_replication_compression(),
1112 election_response: default_election_compression(),
1113 snapshot_response: default_snapshot_compression(),
1114 cluster_response: default_cluster_compression(),
1115 client_response: default_client_compression(),
1116 }
1117 }
1118}
1119
/// Fallback for `RpcCompressionConfig::replication_response` (off).
fn default_replication_compression() -> bool {
    const ENABLED: bool = false;
    ENABLED
}

/// Fallback for `RpcCompressionConfig::election_response` (on).
fn default_election_compression() -> bool {
    const ENABLED: bool = true;
    ENABLED
}

/// Fallback for `RpcCompressionConfig::snapshot_response` (on).
fn default_snapshot_compression() -> bool {
    const ENABLED: bool = true;
    ENABLED
}

/// Fallback for `RpcCompressionConfig::cluster_response` (on).
fn default_cluster_compression() -> bool {
    const ENABLED: bool = true;
    ENABLED
}

/// Fallback for `RpcCompressionConfig::client_response` (off).
fn default_client_compression() -> bool {
    const ENABLED: bool = false;
    ENABLED
}
1146
1147#[derive(Debug, Serialize, Deserialize, Clone)]
1168pub struct WatchConfig {
1169 #[serde(default = "default_event_queue_size")]
1186 pub event_queue_size: usize,
1187
1188 #[serde(default = "default_watcher_buffer_size")]
1205 pub watcher_buffer_size: usize,
1206
1207 #[serde(default = "default_enable_watch_metrics")]
1215 pub enable_metrics: bool,
1216}
1217
1218impl Default for WatchConfig {
1219 fn default() -> Self {
1220 Self {
1221 event_queue_size: default_event_queue_size(),
1222 watcher_buffer_size: default_watcher_buffer_size(),
1223 enable_metrics: default_enable_watch_metrics(),
1224 }
1225 }
1226}
1227
1228impl WatchConfig {
1229 pub fn validate(&self) -> Result<()> {
1231 if self.event_queue_size == 0 {
1232 return Err(Error::Config(ConfigError::Message(
1233 "watch.event_queue_size must be greater than 0".into(),
1234 )));
1235 }
1236
1237 if self.event_queue_size > 100_000 {
1238 warn!(
1239 "watch.event_queue_size ({}) is very large and may consume significant memory (~{}MB)",
1240 self.event_queue_size,
1241 (self.event_queue_size * 24) / 1_000_000
1242 );
1243 }
1244
1245 if self.watcher_buffer_size == 0 {
1246 return Err(Error::Config(ConfigError::Message(
1247 "watch.watcher_buffer_size must be greater than 0".into(),
1248 )));
1249 }
1250
1251 if self.watcher_buffer_size > 1000 {
1252 warn!(
1253 "watch.watcher_buffer_size ({}) is very large. Each watcher will consume ~{}KB memory",
1254 self.watcher_buffer_size,
1255 (self.watcher_buffer_size * 240) / 1000
1256 );
1257 }
1258
1259 Ok(())
1260 }
1261}
1262
/// Fallback for `WatchConfig::event_queue_size`.
const fn default_event_queue_size() -> usize {
    1_000
}

/// Fallback for `WatchConfig::watcher_buffer_size`.
const fn default_watcher_buffer_size() -> usize {
    const BUFFER_SIZE: usize = 10;
    BUFFER_SIZE
}

/// Fallback for `WatchConfig::enable_metrics` (off).
const fn default_enable_watch_metrics() -> bool {
    const ENABLED: bool = false;
    ENABLED
}