1use std::fmt::Debug;
2use std::path::PathBuf;
3use std::time::Duration;
4
5use config::ConfigError;
6use serde::Deserialize;
7use serde::Serialize;
8use tracing::warn;
9
10use super::lease::LeaseConfig;
11use super::validate_directory;
12use crate::Error;
13use crate::Result;
14
15#[derive(Serialize, Deserialize, Clone)]
17pub struct RaftConfig {
18 #[serde(default)]
21 pub replication: ReplicationConfig,
22
23 #[serde(default)]
29 pub batching: BatchingConfig,
30
31 #[serde(default)]
34 pub election: ElectionConfig,
35
36 #[serde(default)]
39 pub membership: MembershipConfig,
40
41 #[serde(default, alias = "storage")]
45 pub state_machine: StateMachineConfig,
46
47 #[serde(default)]
49 pub snapshot: SnapshotConfig,
50
51 #[serde(default)]
54 pub persistence: PersistenceConfig,
55
56 #[serde(default = "default_learner_catchup_threshold")]
60 pub learner_catchup_threshold: u64,
61
62 #[serde(default = "default_learner_check_throttle_ms")]
66 pub learner_check_throttle_ms: u64,
67
68 #[serde(default = "default_general_timeout")]
72 pub general_raft_timeout_duration_in_ms: u64,
73
74 #[serde(default = "default_snapshot_rpc_timeout_ms")]
76 pub snapshot_rpc_timeout_ms: u64,
77
78 #[serde(default = "default_cmd_channel_capacity")]
82 pub cmd_channel_capacity: usize,
83
84 #[serde(default = "default_ordered_channel_capacity")]
88 pub ordered_channel_capacity: usize,
89
90 #[serde(default)]
92 pub auto_join: AutoJoinConfig,
93
94 #[serde(default)]
97 pub read_consistency: ReadConsistencyConfig,
98
99 #[serde(default)]
102 pub backpressure: BackpressureConfig,
103
104 #[serde(default)]
110 pub rpc_compression: RpcCompressionConfig,
111
112 #[serde(default)]
115 pub watch: WatchConfig,
116
117 #[serde(default)]
120 pub metrics: MetricsConfig,
121}
122
123impl Debug for RaftConfig {
124 fn fmt(
125 &self,
126 f: &mut std::fmt::Formatter<'_>,
127 ) -> std::fmt::Result {
128 f.debug_struct("RaftConfig").finish()
129 }
130}
131impl Default for RaftConfig {
132 fn default() -> Self {
133 Self {
134 replication: ReplicationConfig::default(),
135 batching: BatchingConfig::default(),
136 election: ElectionConfig::default(),
137 membership: MembershipConfig::default(),
138 state_machine: StateMachineConfig::default(),
139 snapshot: SnapshotConfig::default(),
140 persistence: PersistenceConfig::default(),
141 learner_catchup_threshold: default_learner_catchup_threshold(),
142 learner_check_throttle_ms: default_learner_check_throttle_ms(),
143 general_raft_timeout_duration_in_ms: default_general_timeout(),
144 auto_join: AutoJoinConfig::default(),
145 snapshot_rpc_timeout_ms: default_snapshot_rpc_timeout_ms(),
146 cmd_channel_capacity: default_cmd_channel_capacity(),
147 ordered_channel_capacity: default_ordered_channel_capacity(),
148 read_consistency: ReadConsistencyConfig::default(),
149 backpressure: BackpressureConfig::default(),
150 rpc_compression: RpcCompressionConfig::default(),
151 watch: WatchConfig::default(),
152 metrics: MetricsConfig::default(),
153 }
154 }
155}
156impl RaftConfig {
157 pub fn validate(&self) -> Result<()> {
159 if self.learner_catchup_threshold == 0 {
160 return Err(Error::Config(ConfigError::Message(
161 "learner_catchup_threshold must be greater than 0".into(),
162 )));
163 }
164
165 if self.general_raft_timeout_duration_in_ms < 1 {
166 return Err(Error::Config(ConfigError::Message(
167 "general_raft_timeout_duration_in_ms must be at least 1ms".into(),
168 )));
169 }
170
171 self.replication.validate()?;
172 self.batching.validate()?;
173 self.election.validate()?;
174 self.membership.validate()?;
175 self.state_machine.validate()?;
176 self.snapshot.validate()?;
177 self.read_consistency.validate()?;
178 self.watch.validate()?;
179 self.persistence.validate()?;
180
181 if self.read_consistency.lease_duration_ms >= self.election.election_timeout_min {
187 return Err(Error::Config(ConfigError::Message(format!(
188 "read_consistency.lease_duration_ms ({}) must be strictly less than \
189 election_timeout_min ({}ms) — required for lease-based linearizable reads \
190 to be safe under partition",
191 self.read_consistency.lease_duration_ms, self.election.election_timeout_min,
192 ))));
193 }
194
195 Ok(())
196 }
197}
198
199fn default_learner_catchup_threshold() -> u64 {
200 1
201}
202
203fn default_learner_check_throttle_ms() -> u64 {
204 1000 }
206
207fn default_general_timeout() -> u64 {
209 50
210}
211fn default_snapshot_rpc_timeout_ms() -> u64 {
212 3_600_000
214}
215
216fn default_cmd_channel_capacity() -> usize {
217 1024
218}
219
220fn default_ordered_channel_capacity() -> usize {
221 1024
222}
223
224#[derive(Debug, Serialize, Deserialize, Clone)]
225pub struct ReplicationConfig {
226 #[serde(default = "default_append_interval")]
228 pub rpc_append_entries_clock_in_ms: u64,
229
230 #[serde(default = "default_entries_per_replication")]
232 pub append_entries_max_entries_per_replication: u64,
233}
234
235impl Default for ReplicationConfig {
236 fn default() -> Self {
237 Self {
238 rpc_append_entries_clock_in_ms: default_append_interval(),
239 append_entries_max_entries_per_replication: default_entries_per_replication(),
240 }
241 }
242}
243impl ReplicationConfig {
244 fn validate(&self) -> Result<()> {
245 if self.rpc_append_entries_clock_in_ms == 0 {
246 return Err(Error::Config(ConfigError::Message(
247 "rpc_append_entries_clock_in_ms cannot be 0".into(),
248 )));
249 }
250
251 if self.append_entries_max_entries_per_replication == 0 {
252 return Err(Error::Config(ConfigError::Message(
253 "append_entries_max_entries_per_replication must be > 0".into(),
254 )));
255 }
256
257 Ok(())
258 }
259}
260
261#[derive(Debug, Serialize, Deserialize, Clone)]
277pub struct BatchingConfig {
278 #[serde(default = "default_max_batch_size")]
282 pub max_batch_size: usize,
283}
284
285impl Default for BatchingConfig {
286 fn default() -> Self {
287 Self {
288 max_batch_size: default_max_batch_size(),
289 }
290 }
291}
292
293impl BatchingConfig {
294 fn validate(&self) -> Result<()> {
295 if self.max_batch_size == 0 {
296 return Err(Error::Config(ConfigError::Message(
297 "batching.max_batch_size must be > 0".into(),
298 )));
299 }
300 Ok(())
301 }
302}
303
304fn default_append_interval() -> u64 {
305 100
306}
307fn default_max_batch_size() -> usize {
308 100
309}
310fn default_entries_per_replication() -> u64 {
311 100
312}
313#[derive(Debug, Serialize, Deserialize, Clone)]
314pub struct ElectionConfig {
315 #[serde(default = "default_election_timeout_min")]
316 pub election_timeout_min: u64,
317
318 #[serde(default = "default_election_timeout_max")]
319 pub election_timeout_max: u64,
320
321 #[serde(default = "default_peer_monitor_interval")]
322 pub rpc_peer_connectinon_monitor_interval_in_sec: u64,
323
324 #[serde(default = "default_client_request_id")]
325 pub internal_rpc_client_request_id: u32,
326}
327
328impl Default for ElectionConfig {
329 fn default() -> Self {
330 Self {
331 election_timeout_min: default_election_timeout_min(),
332 election_timeout_max: default_election_timeout_max(),
333 rpc_peer_connectinon_monitor_interval_in_sec: default_peer_monitor_interval(),
334 internal_rpc_client_request_id: default_client_request_id(),
335 }
336 }
337}
338impl ElectionConfig {
339 fn validate(&self) -> Result<()> {
340 if self.election_timeout_min >= self.election_timeout_max {
341 return Err(Error::Config(ConfigError::Message(format!(
342 "election_timeout_min {}ms must be less than election_timeout_max {}ms",
343 self.election_timeout_min, self.election_timeout_max
344 ))));
345 }
346
347 if self.rpc_peer_connectinon_monitor_interval_in_sec == 0 {
348 return Err(Error::Config(ConfigError::Message(
349 "rpc_peer_connectinon_monitor_interval_in_sec cannot be 0".into(),
350 )));
351 }
352
353 Ok(())
354 }
355}
356fn default_election_timeout_min() -> u64 {
357 500
358}
359fn default_election_timeout_max() -> u64 {
360 1000
361}
362fn default_peer_monitor_interval() -> u64 {
363 30
364}
365fn default_client_request_id() -> u32 {
366 0
367}
368
369#[derive(Debug, Serialize, Deserialize, Clone)]
370pub struct MembershipConfig {
371 #[serde(default = "default_probe_service")]
372 pub cluster_healthcheck_probe_service_name: String,
373
374 #[serde(default = "default_verify_leadership_persistent_timeout")]
375 pub verify_leadership_persistent_timeout: Duration,
376
377 #[serde(default)]
378 pub zombie: ZombieConfig,
379
380 #[serde(default)]
382 pub promotion: PromotionConfig,
383}
384impl Default for MembershipConfig {
385 fn default() -> Self {
386 Self {
387 cluster_healthcheck_probe_service_name: default_probe_service(),
388 verify_leadership_persistent_timeout: default_verify_leadership_persistent_timeout(),
389 zombie: ZombieConfig::default(),
390 promotion: PromotionConfig::default(),
391 }
392 }
393}
394fn default_probe_service() -> String {
395 "d_engine.server.cluster.ClusterManagementService".to_string()
396}
397
398fn default_verify_leadership_persistent_timeout() -> Duration {
405 Duration::from_secs(3600)
406}
407
408impl MembershipConfig {
409 fn validate(&self) -> Result<()> {
410 if self.cluster_healthcheck_probe_service_name.is_empty() {
411 return Err(Error::Config(ConfigError::Message(
412 "cluster_healthcheck_probe_service_name cannot be empty".into(),
413 )));
414 }
415 Ok(())
416 }
417}
418
419#[derive(Serialize, Deserialize, Clone, Debug)]
425#[derive(Default)]
426pub struct StateMachineConfig {
427 #[serde(alias = "ttl")]
431 pub lease: LeaseConfig,
432}
433
434impl StateMachineConfig {
435 pub fn validate(&self) -> Result<()> {
436 self.lease.validate()?;
437 Ok(())
438 }
439}
440
441#[derive(Debug, Serialize, Deserialize, Clone)]
443pub struct SnapshotConfig {
444 #[serde(default = "default_snapshot_enabled")]
446 pub enable: bool,
447
448 #[serde(default = "default_max_log_entries_before_snapshot")]
451 pub max_log_entries_before_snapshot: u64,
452
453 #[serde(default = "default_snapshot_cool_down_since_last_check")]
456 pub snapshot_cool_down_since_last_check: Duration,
457
458 #[serde(default = "default_cleanup_retain_count")]
461 pub cleanup_retain_count: u64,
462
463 #[serde(default = "default_snapshots_dir")]
467 pub snapshots_dir: PathBuf,
468
469 #[serde(default = "default_snapshots_dir_prefix")]
470 pub snapshots_dir_prefix: String,
471
472 #[serde(default = "default_chunk_size")]
476 pub chunk_size: usize,
477
478 #[serde(default = "default_retained_log_entries")]
480 pub retained_log_entries: u64,
481
482 #[serde(default = "default_sender_yield_every_n_chunks")]
484 pub sender_yield_every_n_chunks: usize,
485
486 #[serde(default = "default_receiver_yield_every_n_chunks")]
488 pub receiver_yield_every_n_chunks: usize,
489
490 #[serde(default = "default_max_bandwidth_mbps")]
491 pub max_bandwidth_mbps: u32,
492
493 #[serde(default = "default_push_queue_size")]
494 pub push_queue_size: usize,
495
496 #[serde(default = "default_cache_size")]
497 pub cache_size: usize,
498
499 #[serde(default = "default_max_retries")]
500 pub max_retries: u32,
501
502 #[serde(default = "default_transfer_timeout_in_sec")]
503 pub transfer_timeout_in_sec: u64,
504
505 #[serde(default = "default_retry_interval_in_ms")]
506 pub retry_interval_in_ms: u64,
507
508 #[serde(default = "default_snapshot_push_backoff_in_ms")]
509 pub snapshot_push_backoff_in_ms: u64,
510
511 #[serde(default = "default_snapshot_push_max_retry")]
512 pub snapshot_push_max_retry: u32,
513
514 #[serde(default = "default_push_timeout_in_ms")]
515 pub push_timeout_in_ms: u64,
516
517 #[serde(default = "default_receive_chunk_timeout_in_sec")]
522 pub receive_chunk_timeout_in_sec: u64,
523}
524impl Default for SnapshotConfig {
525 fn default() -> Self {
526 Self {
527 max_log_entries_before_snapshot: default_max_log_entries_before_snapshot(),
528 snapshot_cool_down_since_last_check: default_snapshot_cool_down_since_last_check(),
529 cleanup_retain_count: default_cleanup_retain_count(),
530 snapshots_dir: default_snapshots_dir(),
531 snapshots_dir_prefix: default_snapshots_dir_prefix(),
532 chunk_size: default_chunk_size(),
533 retained_log_entries: default_retained_log_entries(),
534 sender_yield_every_n_chunks: default_sender_yield_every_n_chunks(),
535 receiver_yield_every_n_chunks: default_receiver_yield_every_n_chunks(),
536 max_bandwidth_mbps: default_max_bandwidth_mbps(),
537 push_queue_size: default_push_queue_size(),
538 cache_size: default_cache_size(),
539 max_retries: default_max_retries(),
540 transfer_timeout_in_sec: default_transfer_timeout_in_sec(),
541 retry_interval_in_ms: default_retry_interval_in_ms(),
542 snapshot_push_backoff_in_ms: default_snapshot_push_backoff_in_ms(),
543 snapshot_push_max_retry: default_snapshot_push_max_retry(),
544 push_timeout_in_ms: default_push_timeout_in_ms(),
545 receive_chunk_timeout_in_sec: default_receive_chunk_timeout_in_sec(),
546 enable: default_snapshot_enabled(),
547 }
548 }
549}
550impl SnapshotConfig {
551 fn validate(&self) -> Result<()> {
552 if self.max_log_entries_before_snapshot == 0 {
553 return Err(Error::Config(ConfigError::Message(
554 "max_log_entries_before_snapshot must be greater than 0".into(),
555 )));
556 }
557
558 if self.cleanup_retain_count == 0 {
559 return Err(Error::Config(ConfigError::Message(
560 "cleanup_retain_count must be greater than 0".into(),
561 )));
562 }
563 validate_directory(&self.snapshots_dir, "snapshots_dir")?;
565
566 if self.chunk_size == 0 {
568 return Err(Error::Config(ConfigError::Message(format!(
569 "chunk_size must be at least {} bytes (got {})",
570 0, self.chunk_size
571 ))));
572 }
573
574 if self.retained_log_entries < 1 {
575 return Err(Error::Config(ConfigError::Message(format!(
576 "retained_log_entries must be >= 1, (got {})",
577 self.retained_log_entries
578 ))));
579 }
580
581 if self.sender_yield_every_n_chunks < 1 {
582 return Err(Error::Config(ConfigError::Message(format!(
583 "sender_yield_every_n_chunks must be >= 1, (got {})",
584 self.sender_yield_every_n_chunks
585 ))));
586 }
587
588 if self.receiver_yield_every_n_chunks < 1 {
589 return Err(Error::Config(ConfigError::Message(format!(
590 "receiver_yield_every_n_chunks must be >= 1, (got {})",
591 self.receiver_yield_every_n_chunks
592 ))));
593 }
594
595 if self.push_queue_size < 1 {
596 return Err(Error::Config(ConfigError::Message(format!(
597 "push_queue_size must be >= 1, (got {})",
598 self.push_queue_size
599 ))));
600 }
601
602 if self.receive_chunk_timeout_in_sec == 0 {
603 return Err(Error::Config(ConfigError::Message(
604 "receive_chunk_timeout_in_sec must be greater than 0".into(),
605 )));
606 }
607
608 if self.snapshot_push_max_retry < 1 {
609 return Err(Error::Config(ConfigError::Message(format!(
610 "snapshot_push_max_retry must be >= 1, (got {})",
611 self.snapshot_push_max_retry
612 ))));
613 }
614
615 Ok(())
616 }
617}
618
619fn default_snapshot_enabled() -> bool {
620 true
621}
622
623fn default_max_log_entries_before_snapshot() -> u64 {
625 1000
626}
627
628fn default_snapshot_cool_down_since_last_check() -> Duration {
633 Duration::from_secs(60)
634}
635
636fn default_cleanup_retain_count() -> u64 {
638 2
639}
640fn default_snapshots_dir() -> PathBuf {
642 PathBuf::from("/tmp/snapshots")
643}
644fn default_snapshots_dir_prefix() -> String {
646 "snapshot-".to_string()
647}
648
649fn default_chunk_size() -> usize {
651 1024
652}
653
654#[derive(Debug, Serialize, Deserialize, Clone)]
655pub struct AutoJoinConfig {
656 #[serde(default = "default_rpc_enable_compression")]
657 pub rpc_enable_compression: bool,
658}
659impl Default for AutoJoinConfig {
660 fn default() -> Self {
661 Self {
662 rpc_enable_compression: default_rpc_enable_compression(),
663 }
664 }
665}
666fn default_rpc_enable_compression() -> bool {
667 true
668}
669
670fn default_retained_log_entries() -> u64 {
671 1
672}
673
674fn default_sender_yield_every_n_chunks() -> usize {
675 1
676}
677
678fn default_receiver_yield_every_n_chunks() -> usize {
679 1
680}
681
682fn default_max_bandwidth_mbps() -> u32 {
683 1
684}
685
686fn default_push_queue_size() -> usize {
687 100
688}
689
690fn default_cache_size() -> usize {
691 10000
692}
693fn default_max_retries() -> u32 {
694 1
695}
696fn default_transfer_timeout_in_sec() -> u64 {
697 600
698}
699fn default_retry_interval_in_ms() -> u64 {
700 10
701}
702fn default_snapshot_push_backoff_in_ms() -> u64 {
703 100
704}
705fn default_snapshot_push_max_retry() -> u32 {
706 3
707}
708fn default_push_timeout_in_ms() -> u64 {
709 300_000
710}
711fn default_receive_chunk_timeout_in_sec() -> u64 {
712 30
713}
714
715#[derive(Debug, Serialize, Deserialize, Clone)]
716pub struct ZombieConfig {
717 #[serde(default = "default_zombie_threshold")]
719 pub threshold: u32,
720
721 #[serde(default = "default_zombie_purge_interval")]
722 pub purge_interval: Duration,
723}
724
725impl Default for ZombieConfig {
726 fn default() -> Self {
727 Self {
728 threshold: default_zombie_threshold(),
729 purge_interval: default_zombie_purge_interval(),
730 }
731 }
732}
733
734fn default_zombie_threshold() -> u32 {
735 3
736}
737fn default_zombie_purge_interval() -> Duration {
739 Duration::from_secs(30)
740}
741
742#[derive(Debug, Serialize, Deserialize, Clone)]
743pub struct PromotionConfig {
744 #[serde(default = "default_stale_learner_threshold")]
745 pub stale_learner_threshold: Duration,
746}
747
748impl Default for PromotionConfig {
749 fn default() -> Self {
750 Self {
751 stale_learner_threshold: default_stale_learner_threshold(),
752 }
753 }
754}
755
756fn default_stale_learner_threshold() -> Duration {
758 Duration::from_secs(300)
759}
760#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
768pub enum PersistenceStrategy {
769 MemFirst,
780}
781
782#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
793pub enum FlushPolicy {
794 Batch { idle_flush_interval_ms: u64 },
795}
796
797#[derive(Serialize, Deserialize, Clone, Debug)]
799pub struct PersistenceConfig {
800 #[serde(default = "default_persistence_strategy")]
806 pub strategy: PersistenceStrategy,
807
808 #[serde(default = "default_flush_policy")]
813 pub flush_policy: FlushPolicy,
814
815 #[serde(default = "default_max_buffered_entries")]
820 pub max_buffered_entries: usize,
821}
822
823fn default_persistence_strategy() -> PersistenceStrategy {
825 PersistenceStrategy::MemFirst
826}
827
828fn default_flush_policy() -> FlushPolicy {
833 FlushPolicy::Batch {
834 idle_flush_interval_ms: 1000,
835 }
836}
837
838fn default_max_buffered_entries() -> usize {
840 10_000
841}
842
843impl PersistenceConfig {
844 pub fn validate(&self) -> Result<()> {
845 let FlushPolicy::Batch {
846 idle_flush_interval_ms,
847 } = self.flush_policy;
848 if idle_flush_interval_ms == 0 {
849 return Err(Error::Config(ConfigError::Message(
850 "flush_policy.idle_flush_interval_ms must be greater than 0".into(),
851 )));
852 }
853 Ok(())
854 }
855}
856
857impl Default for PersistenceConfig {
858 fn default() -> Self {
859 Self {
860 strategy: default_persistence_strategy(),
861 flush_policy: default_flush_policy(),
862 max_buffered_entries: default_max_buffered_entries(),
863 }
864 }
865}
866
867#[derive(Debug, Clone, Serialize, Deserialize)]
889pub struct BackpressureConfig {
890 #[serde(default = "default_max_pending_writes")]
898 pub max_pending_writes: usize,
899
900 #[serde(default = "default_max_pending_reads")]
908 pub max_pending_reads: usize,
909}
910
911impl Default for BackpressureConfig {
912 fn default() -> Self {
913 Self {
914 max_pending_writes: default_max_pending_writes(),
915 max_pending_reads: default_max_pending_reads(),
916 }
917 }
918}
919
920fn default_max_pending_writes() -> usize {
921 10_000
922}
923
924fn default_max_pending_reads() -> usize {
925 50_000
926}
927
928impl BackpressureConfig {
929 pub fn should_reject_write(
934 &self,
935 current_pending: usize,
936 ) -> bool {
937 self.max_pending_writes > 0 && current_pending >= self.max_pending_writes
938 }
939
940 pub fn should_reject_read(
945 &self,
946 current_pending: usize,
947 ) -> bool {
948 self.max_pending_reads > 0 && current_pending >= self.max_pending_reads
949 }
950}
951
952#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
957pub enum ReadConsistencyPolicy {
958 LeaseRead,
965
966 #[default]
972 LinearizableRead,
973
974 EventualConsistency,
982}
983
984#[derive(Clone, Debug, Serialize, Deserialize)]
986pub struct ReadConsistencyConfig {
987 #[serde(default)]
992 pub default_policy: ReadConsistencyPolicy,
993
994 #[serde(default = "default_lease_duration_ms")]
1001 pub lease_duration_ms: u64,
1002
1003 #[serde(default = "default_allow_client_override")]
1008 pub allow_client_override: bool,
1009
1010 #[serde(default = "default_state_machine_sync_timeout_ms")]
1016 pub state_machine_sync_timeout_ms: u64,
1017}
1018
1019impl Default for ReadConsistencyConfig {
1020 fn default() -> Self {
1021 Self {
1022 default_policy: ReadConsistencyPolicy::default(),
1023 lease_duration_ms: default_lease_duration_ms(),
1024 allow_client_override: default_allow_client_override(),
1025 state_machine_sync_timeout_ms: default_state_machine_sync_timeout_ms(),
1026 }
1027 }
1028}
1029
1030fn default_lease_duration_ms() -> u64 {
1031 250
1033}
1034
1035fn default_allow_client_override() -> bool {
1036 true
1038}
1039
1040fn default_state_machine_sync_timeout_ms() -> u64 {
1041 10 }
1043
1044impl ReadConsistencyConfig {
1045 fn validate(&self) -> Result<()> {
1046 if self.lease_duration_ms == 0 {
1048 return Err(Error::Config(ConfigError::Message(
1049 "read_consistency.lease_duration_ms must be greater than 0".into(),
1050 )));
1051 }
1052 Ok(())
1053 }
1054}
1055
1056impl From<d_engine_proto::client::ReadConsistencyPolicy> for ReadConsistencyPolicy {
1057 fn from(proto_policy: d_engine_proto::client::ReadConsistencyPolicy) -> Self {
1058 match proto_policy {
1059 d_engine_proto::client::ReadConsistencyPolicy::LeaseRead => Self::LeaseRead,
1060 d_engine_proto::client::ReadConsistencyPolicy::LinearizableRead => {
1061 Self::LinearizableRead
1062 }
1063 d_engine_proto::client::ReadConsistencyPolicy::EventualConsistency => {
1064 Self::EventualConsistency
1065 }
1066 }
1067 }
1068}
1069
1070impl From<ReadConsistencyPolicy> for d_engine_proto::client::ReadConsistencyPolicy {
1071 fn from(config_policy: ReadConsistencyPolicy) -> Self {
1072 match config_policy {
1073 ReadConsistencyPolicy::LeaseRead => {
1074 d_engine_proto::client::ReadConsistencyPolicy::LeaseRead
1075 }
1076 ReadConsistencyPolicy::LinearizableRead => {
1077 d_engine_proto::client::ReadConsistencyPolicy::LinearizableRead
1078 }
1079 ReadConsistencyPolicy::EventualConsistency => {
1080 d_engine_proto::client::ReadConsistencyPolicy::EventualConsistency
1081 }
1082 }
1083 }
1084}
1085
1086#[derive(Debug, Clone, Serialize, Deserialize)]
1093pub struct RpcCompressionConfig {
1094 #[serde(default = "default_replication_compression")]
1102 pub replication_response: bool,
1103
1104 #[serde(default = "default_election_compression")]
1111 pub election_response: bool,
1112
1113 #[serde(default = "default_snapshot_compression")]
1120 pub snapshot_response: bool,
1121
1122 #[serde(default = "default_cluster_compression")]
1129 pub cluster_response: bool,
1130
1131 #[serde(default = "default_client_compression")]
1138 pub client_response: bool,
1139}
1140
1141impl Default for RpcCompressionConfig {
1142 fn default() -> Self {
1143 Self {
1144 replication_response: default_replication_compression(),
1145 election_response: default_election_compression(),
1146 snapshot_response: default_snapshot_compression(),
1147 cluster_response: default_cluster_compression(),
1148 client_response: default_client_compression(),
1149 }
1150 }
1151}
1152
1153fn default_replication_compression() -> bool {
1155 false
1158}
1159
1160fn default_election_compression() -> bool {
1161 true
1163}
1164
1165fn default_snapshot_compression() -> bool {
1166 true
1168}
1169
1170fn default_cluster_compression() -> bool {
1171 true
1173}
1174
1175fn default_client_compression() -> bool {
1176 false
1178}
1179
1180#[derive(Debug, Serialize, Deserialize, Clone)]
1202pub struct WatchConfig {
1203 #[serde(default = "default_event_queue_size")]
1220 pub event_queue_size: usize,
1221
1222 #[serde(default = "default_watcher_buffer_size")]
1239 pub watcher_buffer_size: usize,
1240
1241 #[serde(default = "default_enable_watch_metrics")]
1249 pub enable_metrics: bool,
1250
1251 #[serde(default = "default_max_watcher_count")]
1259 pub max_watcher_count: usize,
1260
1261 #[serde(default = "default_heartbeat_interval_ms")]
1272 pub heartbeat_interval_ms: u64,
1273}
1274
1275impl Default for WatchConfig {
1276 fn default() -> Self {
1277 Self {
1278 event_queue_size: default_event_queue_size(),
1279 watcher_buffer_size: default_watcher_buffer_size(),
1280 enable_metrics: default_enable_watch_metrics(),
1281 max_watcher_count: default_max_watcher_count(),
1282 heartbeat_interval_ms: default_heartbeat_interval_ms(),
1283 }
1284 }
1285}
1286
1287impl WatchConfig {
1288 pub fn validate(&self) -> Result<()> {
1290 if self.event_queue_size == 0 {
1291 return Err(Error::Config(ConfigError::Message(
1292 "watch.event_queue_size must be greater than 0".into(),
1293 )));
1294 }
1295
1296 if self.event_queue_size > 100_000 {
1297 warn!(
1298 "watch.event_queue_size ({}) is very large and may consume significant memory (~{}MB)",
1299 self.event_queue_size,
1300 (self.event_queue_size * 24) / 1_000_000
1301 );
1302 }
1303
1304 if self.watcher_buffer_size == 0 {
1305 return Err(Error::Config(ConfigError::Message(
1306 "watch.watcher_buffer_size must be greater than 0".into(),
1307 )));
1308 }
1309
1310 if self.watcher_buffer_size > 1000 {
1311 warn!(
1312 "watch.watcher_buffer_size ({}) is very large. Each watcher will consume ~{}KB memory",
1313 self.watcher_buffer_size,
1314 (self.watcher_buffer_size * 240) / 1000
1315 );
1316 }
1317
1318 Ok(())
1319 }
1320}
1321
1322const fn default_event_queue_size() -> usize {
1323 10240
1324}
1325
1326const fn default_watcher_buffer_size() -> usize {
1327 256
1328}
1329
1330const fn default_enable_watch_metrics() -> bool {
1331 false
1332}
1333
1334const fn default_max_watcher_count() -> usize {
1335 9_223_372_036_854_775_807
1338}
1339
1340const fn default_heartbeat_interval_ms() -> u64 {
1341 30_000
1342}
1343
1344#[derive(Debug, Serialize, Deserialize, Clone)]
1357pub struct MetricsConfig {
1358 #[serde(default = "default_enable_backpressure_metrics")]
1365 pub enable_backpressure: bool,
1366
1367 #[serde(default = "default_enable_batch_metrics")]
1374 pub enable_batch: bool,
1375
1376 #[serde(default = "default_metrics_sample_rate")]
1387 pub sample_rate: u32,
1388}
1389
1390impl Default for MetricsConfig {
1391 fn default() -> Self {
1392 Self {
1393 enable_backpressure: default_enable_backpressure_metrics(),
1394 enable_batch: default_enable_batch_metrics(),
1395 sample_rate: default_metrics_sample_rate(),
1396 }
1397 }
1398}
1399
1400fn default_enable_backpressure_metrics() -> bool {
1401 false
1402}
1403
1404fn default_enable_batch_metrics() -> bool {
1405 false
1406}
1407
1408fn default_metrics_sample_rate() -> u32 {
1409 1 }