1use std::fmt::Debug;
2use std::path::PathBuf;
3use std::time::Duration;
4
5use config::ConfigError;
6use serde::Deserialize;
7use serde::Serialize;
8use tracing::warn;
9
10use super::lease::LeaseConfig;
11use super::validate_directory;
12use crate::Error;
13use crate::Result;
14
/// Root Raft configuration: one nested section per subsystem plus a few node-wide
/// scalar settings.
///
/// Every field carries a serde default, so any section or key may be omitted from the
/// input. `Debug` is not derived here; this file provides a manual implementation.
#[derive(Serialize, Deserialize, Clone)]
pub struct RaftConfig {
    /// Replication section; `ReplicationConfig::default()` when the key is absent.
    #[serde(default)]
    pub replication: ReplicationConfig,

    /// Election section; `ElectionConfig::default()` when the key is absent.
    #[serde(default)]
    pub election: ElectionConfig,

    /// Membership section; `MembershipConfig::default()` when the key is absent.
    #[serde(default)]
    pub membership: MembershipConfig,

    /// Commit-handler section; `CommitHandlerConfig::default()` when the key is absent.
    #[serde(default)]
    pub commit_handler: CommitHandlerConfig,

    /// State-machine section. The key `storage` is accepted as an alias on input.
    #[serde(default, alias = "storage")]
    pub state_machine: StateMachineConfig,

    /// Snapshot section; `SnapshotConfig::default()` when the key is absent.
    #[serde(default)]
    pub snapshot: SnapshotConfig,

    /// Persistence section; `PersistenceConfig::default()` when the key is absent.
    #[serde(default)]
    pub persistence: PersistenceConfig,

    /// Learner catch-up threshold. Defaults to 1; `validate` rejects 0.
    /// NOTE(review): the unit is not visible in this file (presumably a log-index gap) — confirm.
    #[serde(default = "default_learner_catchup_threshold")]
    pub learner_catchup_threshold: u64,

    /// Learner check throttle in milliseconds. Defaults to 1000.
    #[serde(default = "default_learner_check_throttle_ms")]
    pub learner_check_throttle_ms: u64,

    /// General Raft timeout in milliseconds. Defaults to 50; `validate` requires at least 1.
    #[serde(default = "default_general_timeout")]
    pub general_raft_timeout_duration_in_ms: u64,

    /// Snapshot RPC timeout in milliseconds. Defaults to 3_600_000.
    #[serde(default = "default_snapshot_rpc_timeout_ms")]
    pub snapshot_rpc_timeout_ms: u64,

    /// Auto-join section; `AutoJoinConfig::default()` when the key is absent.
    #[serde(default)]
    pub auto_join: AutoJoinConfig,

    /// Read-consistency section; `ReadConsistencyConfig::default()` when the key is absent.
    #[serde(default)]
    pub read_consistency: ReadConsistencyConfig,

    /// RPC compression section; `RpcCompressionConfig::default()` when the key is absent.
    #[serde(default)]
    pub rpc_compression: RpcCompressionConfig,

    /// Watch section; `WatchConfig::default()` when the key is absent.
    #[serde(default)]
    pub watch: WatchConfig,
}
97
98impl Debug for RaftConfig {
99 fn fmt(
100 &self,
101 f: &mut std::fmt::Formatter<'_>,
102 ) -> std::fmt::Result {
103 f.debug_struct("RaftConfig").finish()
104 }
105}
106impl Default for RaftConfig {
107 fn default() -> Self {
108 Self {
109 replication: ReplicationConfig::default(),
110 election: ElectionConfig::default(),
111 membership: MembershipConfig::default(),
112 commit_handler: CommitHandlerConfig::default(),
113 state_machine: StateMachineConfig::default(),
114 snapshot: SnapshotConfig::default(),
115 persistence: PersistenceConfig::default(),
116 learner_catchup_threshold: default_learner_catchup_threshold(),
117 learner_check_throttle_ms: default_learner_check_throttle_ms(),
118 general_raft_timeout_duration_in_ms: default_general_timeout(),
119 auto_join: AutoJoinConfig::default(),
120 snapshot_rpc_timeout_ms: default_snapshot_rpc_timeout_ms(),
121 read_consistency: ReadConsistencyConfig::default(),
122 rpc_compression: RpcCompressionConfig::default(),
123 watch: WatchConfig::default(),
124 }
125 }
126}
127impl RaftConfig {
128 pub fn validate(&self) -> Result<()> {
130 if self.learner_catchup_threshold == 0 {
131 return Err(Error::Config(ConfigError::Message(
132 "learner_catchup_threshold must be greater than 0".into(),
133 )));
134 }
135
136 if self.general_raft_timeout_duration_in_ms < 1 {
137 return Err(Error::Config(ConfigError::Message(
138 "general_raft_timeout_duration_in_ms must be at least 1ms".into(),
139 )));
140 }
141
142 self.replication.validate()?;
143 self.election.validate()?;
144 self.membership.validate()?;
145 self.commit_handler.validate()?;
146 self.state_machine.validate()?;
147 self.snapshot.validate()?;
148 self.read_consistency.validate()?;
149 self.watch.validate()?;
150
151 if self.read_consistency.lease_duration_ms > self.election.election_timeout_min / 2 {
153 warn!(
154 "read_consistency.lease_duration_ms ({}) is greater than half of election_timeout_min ({}ms). \
155 This may cause lease expiration during normal operation.",
156 self.read_consistency.lease_duration_ms,
157 self.election.election_timeout_min / 2
158 );
159 }
160
161 Ok(())
162 }
163}
164
/// Serde default for `RaftConfig::learner_catchup_threshold`.
fn default_learner_catchup_threshold() -> u64 {
    const CATCHUP_THRESHOLD: u64 = 1;
    CATCHUP_THRESHOLD
}

/// Serde default for `RaftConfig::learner_check_throttle_ms`: 1000 ms.
fn default_learner_check_throttle_ms() -> u64 {
    const THROTTLE_MS: u64 = 1_000;
    THROTTLE_MS
}

/// Serde default for `RaftConfig::general_raft_timeout_duration_in_ms`: 50 ms.
fn default_general_timeout() -> u64 {
    const GENERAL_TIMEOUT_MS: u64 = 50;
    GENERAL_TIMEOUT_MS
}

/// Serde default for `RaftConfig::snapshot_rpc_timeout_ms`: one hour, in milliseconds.
fn default_snapshot_rpc_timeout_ms() -> u64 {
    const SNAPSHOT_RPC_TIMEOUT_MS: u64 = 60 * 60 * 1_000;
    SNAPSHOT_RPC_TIMEOUT_MS
}
/// Log-replication tunables. Every field has a serde default, so each key is optional.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ReplicationConfig {
    /// Append-entries clock interval in milliseconds. Defaults to 100; `validate`
    /// rejects 0 and requires it to exceed the batch process delay.
    #[serde(default = "default_append_interval")]
    pub rpc_append_entries_clock_in_ms: u64,

    /// Append-entries batch threshold. Defaults to 100; `validate` rejects 0.
    #[serde(default = "default_batch_threshold")]
    pub rpc_append_entries_in_batch_threshold: usize,

    /// Batch process delay in milliseconds. Defaults to 1; `validate` requires it to be
    /// strictly less than `rpc_append_entries_clock_in_ms`.
    #[serde(default = "default_batch_delay")]
    pub rpc_append_entries_batch_process_delay_in_ms: u64,

    /// Maximum entries per replication. Defaults to 100; `validate` rejects 0.
    #[serde(default = "default_entries_per_replication")]
    pub append_entries_max_entries_per_replication: u64,
}
195
196impl Default for ReplicationConfig {
197 fn default() -> Self {
198 Self {
199 rpc_append_entries_clock_in_ms: default_append_interval(),
200 rpc_append_entries_in_batch_threshold: default_batch_threshold(),
201 rpc_append_entries_batch_process_delay_in_ms: default_batch_delay(),
202 append_entries_max_entries_per_replication: default_entries_per_replication(),
203 }
204 }
205}
206impl ReplicationConfig {
207 fn validate(&self) -> Result<()> {
208 if self.rpc_append_entries_clock_in_ms == 0 {
209 return Err(Error::Config(ConfigError::Message(
210 "rpc_append_entries_clock_in_ms cannot be 0".into(),
211 )));
212 }
213
214 if self.rpc_append_entries_in_batch_threshold == 0 {
215 return Err(Error::Config(ConfigError::Message(
216 "rpc_append_entries_in_batch_threshold must be > 0".into(),
217 )));
218 }
219
220 if self.append_entries_max_entries_per_replication == 0 {
221 return Err(Error::Config(ConfigError::Message(
222 "append_entries_max_entries_per_replication must be > 0".into(),
223 )));
224 }
225
226 if self.rpc_append_entries_batch_process_delay_in_ms >= self.rpc_append_entries_clock_in_ms
227 {
228 return Err(Error::Config(ConfigError::Message(format!(
229 "batch_delay {}ms should be less than append_interval {}ms",
230 self.rpc_append_entries_batch_process_delay_in_ms,
231 self.rpc_append_entries_clock_in_ms
232 ))));
233 }
234
235 Ok(())
236 }
237}
/// Serde default for `ReplicationConfig::rpc_append_entries_clock_in_ms`: 100 ms.
fn default_append_interval() -> u64 {
    const APPEND_INTERVAL_MS: u64 = 100;
    APPEND_INTERVAL_MS
}

/// Serde default for `ReplicationConfig::rpc_append_entries_in_batch_threshold`.
fn default_batch_threshold() -> usize {
    const BATCH_THRESHOLD: usize = 100;
    BATCH_THRESHOLD
}

/// Serde default for `ReplicationConfig::rpc_append_entries_batch_process_delay_in_ms`: 1 ms.
fn default_batch_delay() -> u64 {
    const BATCH_DELAY_MS: u64 = 1;
    BATCH_DELAY_MS
}

/// Serde default for `ReplicationConfig::append_entries_max_entries_per_replication`.
fn default_entries_per_replication() -> u64 {
    const MAX_ENTRIES: u64 = 100;
    MAX_ENTRIES
}
250#[derive(Debug, Serialize, Deserialize, Clone)]
251pub struct ElectionConfig {
252 #[serde(default = "default_election_timeout_min")]
253 pub election_timeout_min: u64,
254
255 #[serde(default = "default_election_timeout_max")]
256 pub election_timeout_max: u64,
257
258 #[serde(default = "default_peer_monitor_interval")]
259 pub rpc_peer_connectinon_monitor_interval_in_sec: u64,
260
261 #[serde(default = "default_client_request_id")]
262 pub internal_rpc_client_request_id: u32,
263}
264
265impl Default for ElectionConfig {
266 fn default() -> Self {
267 Self {
268 election_timeout_min: default_election_timeout_min(),
269 election_timeout_max: default_election_timeout_max(),
270 rpc_peer_connectinon_monitor_interval_in_sec: default_peer_monitor_interval(),
271 internal_rpc_client_request_id: default_client_request_id(),
272 }
273 }
274}
275impl ElectionConfig {
276 fn validate(&self) -> Result<()> {
277 if self.election_timeout_min >= self.election_timeout_max {
278 return Err(Error::Config(ConfigError::Message(format!(
279 "election_timeout_min {}ms must be less than election_timeout_max {}ms",
280 self.election_timeout_min, self.election_timeout_max
281 ))));
282 }
283
284 if self.rpc_peer_connectinon_monitor_interval_in_sec == 0 {
285 return Err(Error::Config(ConfigError::Message(
286 "rpc_peer_connectinon_monitor_interval_in_sec cannot be 0".into(),
287 )));
288 }
289
290 Ok(())
291 }
292}
/// Serde default for `ElectionConfig::election_timeout_min`: 500 ms.
fn default_election_timeout_min() -> u64 {
    const TIMEOUT_MIN_MS: u64 = 500;
    TIMEOUT_MIN_MS
}

/// Serde default for `ElectionConfig::election_timeout_max`: 1000 ms.
fn default_election_timeout_max() -> u64 {
    const TIMEOUT_MAX_MS: u64 = 1_000;
    TIMEOUT_MAX_MS
}

/// Serde default for `ElectionConfig::rpc_peer_connectinon_monitor_interval_in_sec`: 30 s.
fn default_peer_monitor_interval() -> u64 {
    const MONITOR_INTERVAL_SECS: u64 = 30;
    MONITOR_INTERVAL_SECS
}

/// Serde default for `ElectionConfig::internal_rpc_client_request_id`.
fn default_client_request_id() -> u32 {
    const REQUEST_ID: u32 = 0;
    REQUEST_ID
}
305
/// Cluster-membership settings. Every field has a serde default, so each key is optional.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct MembershipConfig {
    /// Service name used by the cluster health-check probe. Defaults to
    /// `d_engine.server.cluster.ClusterManagementService`; `validate` rejects an empty string.
    #[serde(default = "default_probe_service")]
    pub cluster_healthcheck_probe_service_name: String,

    /// Timeout for persistent leadership verification. Defaults to 3600 seconds.
    #[serde(default = "default_verify_leadership_persistent_timeout")]
    pub verify_leadership_persistent_timeout: Duration,

    /// Interval between membership maintenance runs. Defaults to 30 seconds.
    #[serde(default = "default_membership_maintenance_interval")]
    pub membership_maintenance_interval: Duration,

    /// Zombie-node handling; `ZombieConfig::default()` when the key is absent.
    #[serde(default)]
    pub zombie: ZombieConfig,

    /// Learner promotion settings; `PromotionConfig::default()` when the key is absent.
    #[serde(default)]
    pub promotion: PromotionConfig,
}
324impl Default for MembershipConfig {
325 fn default() -> Self {
326 Self {
327 cluster_healthcheck_probe_service_name: default_probe_service(),
328 verify_leadership_persistent_timeout: default_verify_leadership_persistent_timeout(),
329 membership_maintenance_interval: default_membership_maintenance_interval(),
330 zombie: ZombieConfig::default(),
331 promotion: PromotionConfig::default(),
332 }
333 }
334}
/// Serde default for `MembershipConfig::cluster_healthcheck_probe_service_name`.
fn default_probe_service() -> String {
    String::from("d_engine.server.cluster.ClusterManagementService")
}

/// Serde default for `MembershipConfig::membership_maintenance_interval`: 30 seconds.
fn default_membership_maintenance_interval() -> Duration {
    const INTERVAL_SECS: u64 = 30;
    Duration::from_secs(INTERVAL_SECS)
}

/// Serde default for `MembershipConfig::verify_leadership_persistent_timeout`: one hour.
fn default_verify_leadership_persistent_timeout() -> Duration {
    const TIMEOUT_SECS: u64 = 60 * 60;
    Duration::from_secs(TIMEOUT_SECS)
}
353
354impl MembershipConfig {
355 fn validate(&self) -> Result<()> {
356 if self.cluster_healthcheck_probe_service_name.is_empty() {
357 return Err(Error::Config(ConfigError::Message(
358 "cluster_healthcheck_probe_service_name cannot be empty".into(),
359 )));
360 }
361 Ok(())
362 }
363}
364
/// Commit-handler tunables. Every field has a serde default, so each key is optional.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct CommitHandlerConfig {
    /// Batch size threshold. Defaults to 100; `validate` rejects 0.
    #[serde(default = "default_batch_size_threshold")]
    pub batch_size_threshold: u64,

    /// Processing interval in milliseconds. Defaults to 10; `validate` rejects 0.
    #[serde(default = "default_process_interval_ms")]
    pub process_interval_ms: u64,

    /// Maximum entries per chunk. Defaults to 10; `validate` rejects 0.
    #[serde(default = "default_max_entries_per_chunk")]
    pub max_entries_per_chunk: usize,
}
377impl Default for CommitHandlerConfig {
378 fn default() -> Self {
379 Self {
380 batch_size_threshold: default_batch_size_threshold(),
381 process_interval_ms: default_process_interval_ms(),
382 max_entries_per_chunk: default_max_entries_per_chunk(),
383 }
384 }
385}
386impl CommitHandlerConfig {
387 fn validate(&self) -> Result<()> {
388 if self.batch_size_threshold == 0 {
389 return Err(Error::Config(ConfigError::Message(
390 "batch_size_threshold must be > 0".into(),
391 )));
392 }
393
394 if self.process_interval_ms == 0 {
395 return Err(Error::Config(ConfigError::Message(
396 "process_interval_ms must be > 0".into(),
397 )));
398 }
399
400 if self.max_entries_per_chunk == 0 {
401 return Err(Error::Config(ConfigError::Message(
402 "max_entries_per_chunk must be > 0".into(),
403 )));
404 }
405
406 Ok(())
407 }
408}
/// Serde default for `CommitHandlerConfig::batch_size_threshold`.
fn default_batch_size_threshold() -> u64 {
    const BATCH_SIZE_THRESHOLD: u64 = 100;
    BATCH_SIZE_THRESHOLD
}

/// Serde default for `CommitHandlerConfig::process_interval_ms`: 10 ms.
fn default_process_interval_ms() -> u64 {
    const PROCESS_INTERVAL_MS: u64 = 10;
    PROCESS_INTERVAL_MS
}

/// Serde default for `CommitHandlerConfig::max_entries_per_chunk`.
fn default_max_entries_per_chunk() -> usize {
    const MAX_ENTRIES_PER_CHUNK: usize = 10;
    MAX_ENTRIES_PER_CHUNK
}
418
419#[derive(Serialize, Deserialize, Clone, Debug)]
425#[derive(Default)]
426pub struct StateMachineConfig {
427 #[serde(alias = "ttl")]
431 pub lease: LeaseConfig,
432}
433
434impl StateMachineConfig {
435 pub fn validate(&self) -> Result<()> {
436 self.lease.validate()?;
437 Ok(())
438 }
439}
440
/// Snapshot creation, retention and transfer settings. Every field has a serde default,
/// so each key is optional.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct SnapshotConfig {
    /// Snapshot enable flag. Defaults to `true`.
    #[serde(default = "default_snapshot_enabled")]
    pub enable: bool,

    /// Maximum log entries before a snapshot. Defaults to 1000; `validate` rejects 0.
    #[serde(default = "default_max_log_entries_before_snapshot")]
    pub max_log_entries_before_snapshot: u64,

    /// Cool-down since the last snapshot check. Defaults to 3600 seconds.
    #[serde(default = "default_snapshot_cool_down_since_last_check")]
    pub snapshot_cool_down_since_last_check: Duration,

    /// Retain count used during cleanup. Defaults to 2; `validate` rejects 0.
    #[serde(default = "default_cleanup_retain_count")]
    pub cleanup_retain_count: u64,

    /// Snapshot directory. Defaults to `/tmp/snapshots`; `validate` passes it to
    /// `validate_directory`.
    #[serde(default = "default_snapshots_dir")]
    pub snapshots_dir: PathBuf,

    /// Snapshot directory name prefix. Defaults to `snapshot-`.
    #[serde(default = "default_snapshots_dir_prefix")]
    pub snapshots_dir_prefix: String,

    /// Chunk size. Defaults to 1024; `validate` rejects 0.
    /// NOTE(review): the error message in `validate` speaks of bytes — confirm the unit.
    #[serde(default = "default_chunk_size")]
    pub chunk_size: usize,

    /// Retained log entries. Defaults to 1; `validate` requires at least 1.
    #[serde(default = "default_retained_log_entries")]
    pub retained_log_entries: u64,

    /// Sender yield cadence, in chunks. Defaults to 1; `validate` requires at least 1.
    #[serde(default = "default_sender_yield_every_n_chunks")]
    pub sender_yield_every_n_chunks: usize,

    /// Receiver yield cadence, in chunks. Defaults to 1; `validate` requires at least 1.
    #[serde(default = "default_receiver_yield_every_n_chunks")]
    pub receiver_yield_every_n_chunks: usize,

    /// Maximum bandwidth in Mbps (per the field name). Defaults to 1.
    #[serde(default = "default_max_bandwidth_mbps")]
    pub max_bandwidth_mbps: u32,

    /// Push queue size. Defaults to 100; `validate` requires at least 1.
    #[serde(default = "default_push_queue_size")]
    pub push_queue_size: usize,

    /// Cache size. Defaults to 10000.
    #[serde(default = "default_cache_size")]
    pub cache_size: usize,

    /// Maximum retries. Defaults to 1.
    #[serde(default = "default_max_retries")]
    pub max_retries: u32,

    /// Transfer timeout in seconds. Defaults to 600.
    #[serde(default = "default_transfer_timeout_in_sec")]
    pub transfer_timeout_in_sec: u64,

    /// Retry interval in milliseconds. Defaults to 10.
    #[serde(default = "default_retry_interval_in_ms")]
    pub retry_interval_in_ms: u64,

    /// Snapshot push backoff in milliseconds. Defaults to 100.
    #[serde(default = "default_snapshot_push_backoff_in_ms")]
    pub snapshot_push_backoff_in_ms: u64,

    /// Maximum snapshot push retries. Defaults to 3; `validate` requires at least 1.
    #[serde(default = "default_snapshot_push_max_retry")]
    pub snapshot_push_max_retry: u32,

    /// Push timeout in milliseconds. Defaults to 300_000.
    #[serde(default = "default_push_timeout_in_ms")]
    pub push_timeout_in_ms: u64,
}
517impl Default for SnapshotConfig {
518 fn default() -> Self {
519 Self {
520 max_log_entries_before_snapshot: default_max_log_entries_before_snapshot(),
521 snapshot_cool_down_since_last_check: default_snapshot_cool_down_since_last_check(),
522 cleanup_retain_count: default_cleanup_retain_count(),
523 snapshots_dir: default_snapshots_dir(),
524 snapshots_dir_prefix: default_snapshots_dir_prefix(),
525 chunk_size: default_chunk_size(),
526 retained_log_entries: default_retained_log_entries(),
527 sender_yield_every_n_chunks: default_sender_yield_every_n_chunks(),
528 receiver_yield_every_n_chunks: default_receiver_yield_every_n_chunks(),
529 max_bandwidth_mbps: default_max_bandwidth_mbps(),
530 push_queue_size: default_push_queue_size(),
531 cache_size: default_cache_size(),
532 max_retries: default_max_retries(),
533 transfer_timeout_in_sec: default_transfer_timeout_in_sec(),
534 retry_interval_in_ms: default_retry_interval_in_ms(),
535 snapshot_push_backoff_in_ms: default_snapshot_push_backoff_in_ms(),
536 snapshot_push_max_retry: default_snapshot_push_max_retry(),
537 push_timeout_in_ms: default_push_timeout_in_ms(),
538 enable: default_snapshot_enabled(),
539 }
540 }
541}
542impl SnapshotConfig {
543 fn validate(&self) -> Result<()> {
544 if self.max_log_entries_before_snapshot == 0 {
545 return Err(Error::Config(ConfigError::Message(
546 "max_log_entries_before_snapshot must be greater than 0".into(),
547 )));
548 }
549
550 if self.cleanup_retain_count == 0 {
551 return Err(Error::Config(ConfigError::Message(
552 "cleanup_retain_count must be greater than 0".into(),
553 )));
554 }
555 validate_directory(&self.snapshots_dir, "snapshots_dir")?;
557
558 if self.chunk_size == 0 {
560 return Err(Error::Config(ConfigError::Message(format!(
561 "chunk_size must be at least {} bytes (got {})",
562 0, self.chunk_size
563 ))));
564 }
565
566 if self.retained_log_entries < 1 {
567 return Err(Error::Config(ConfigError::Message(format!(
568 "retained_log_entries must be >= 1, (got {})",
569 self.retained_log_entries
570 ))));
571 }
572
573 if self.sender_yield_every_n_chunks < 1 {
574 return Err(Error::Config(ConfigError::Message(format!(
575 "sender_yield_every_n_chunks must be >= 1, (got {})",
576 self.sender_yield_every_n_chunks
577 ))));
578 }
579
580 if self.receiver_yield_every_n_chunks < 1 {
581 return Err(Error::Config(ConfigError::Message(format!(
582 "receiver_yield_every_n_chunks must be >= 1, (got {})",
583 self.receiver_yield_every_n_chunks
584 ))));
585 }
586
587 if self.push_queue_size < 1 {
588 return Err(Error::Config(ConfigError::Message(format!(
589 "push_queue_size must be >= 1, (got {})",
590 self.push_queue_size
591 ))));
592 }
593
594 if self.snapshot_push_max_retry < 1 {
595 return Err(Error::Config(ConfigError::Message(format!(
596 "snapshot_push_max_retry must be >= 1, (got {})",
597 self.snapshot_push_max_retry
598 ))));
599 }
600
601 Ok(())
602 }
603}
604
/// Serde default for `SnapshotConfig::enable`.
fn default_snapshot_enabled() -> bool {
    const ENABLED: bool = true;
    ENABLED
}

/// Serde default for `SnapshotConfig::max_log_entries_before_snapshot`.
fn default_max_log_entries_before_snapshot() -> u64 {
    const MAX_LOG_ENTRIES: u64 = 1_000;
    MAX_LOG_ENTRIES
}

/// Serde default for `SnapshotConfig::snapshot_cool_down_since_last_check`: one hour.
fn default_snapshot_cool_down_since_last_check() -> Duration {
    const COOL_DOWN_SECS: u64 = 60 * 60;
    Duration::from_secs(COOL_DOWN_SECS)
}

/// Serde default for `SnapshotConfig::cleanup_retain_count`.
fn default_cleanup_retain_count() -> u64 {
    const RETAIN_COUNT: u64 = 2;
    RETAIN_COUNT
}

/// Serde default for `SnapshotConfig::snapshots_dir`.
fn default_snapshots_dir() -> PathBuf {
    "/tmp/snapshots".into()
}

/// Serde default for `SnapshotConfig::snapshots_dir_prefix`.
fn default_snapshots_dir_prefix() -> String {
    String::from("snapshot-")
}

/// Serde default for `SnapshotConfig::chunk_size`.
fn default_chunk_size() -> usize {
    const CHUNK_SIZE: usize = 1_024;
    CHUNK_SIZE
}
639
/// Auto-join settings.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct AutoJoinConfig {
    /// RPC compression flag for auto-join. Defaults to `true`.
    #[serde(default = "default_rpc_enable_compression")]
    pub rpc_enable_compression: bool,
}
645impl Default for AutoJoinConfig {
646 fn default() -> Self {
647 Self {
648 rpc_enable_compression: default_rpc_enable_compression(),
649 }
650 }
651}
/// Serde default for `AutoJoinConfig::rpc_enable_compression`.
fn default_rpc_enable_compression() -> bool {
    const COMPRESSION_ENABLED: bool = true;
    COMPRESSION_ENABLED
}
655
/// Serde default for `SnapshotConfig::retained_log_entries`.
fn default_retained_log_entries() -> u64 {
    const RETAINED_LOG_ENTRIES: u64 = 1;
    RETAINED_LOG_ENTRIES
}

/// Serde default for `SnapshotConfig::sender_yield_every_n_chunks`.
fn default_sender_yield_every_n_chunks() -> usize {
    const SENDER_YIELD_CHUNKS: usize = 1;
    SENDER_YIELD_CHUNKS
}

/// Serde default for `SnapshotConfig::receiver_yield_every_n_chunks`.
fn default_receiver_yield_every_n_chunks() -> usize {
    const RECEIVER_YIELD_CHUNKS: usize = 1;
    RECEIVER_YIELD_CHUNKS
}

/// Serde default for `SnapshotConfig::max_bandwidth_mbps`.
fn default_max_bandwidth_mbps() -> u32 {
    const MAX_BANDWIDTH_MBPS: u32 = 1;
    MAX_BANDWIDTH_MBPS
}

/// Serde default for `SnapshotConfig::push_queue_size`.
fn default_push_queue_size() -> usize {
    const PUSH_QUEUE_SIZE: usize = 100;
    PUSH_QUEUE_SIZE
}

/// Serde default for `SnapshotConfig::cache_size`.
fn default_cache_size() -> usize {
    const CACHE_SIZE: usize = 10_000;
    CACHE_SIZE
}

/// Serde default for `SnapshotConfig::max_retries`.
fn default_max_retries() -> u32 {
    const MAX_RETRIES: u32 = 1;
    MAX_RETRIES
}

/// Serde default for `SnapshotConfig::transfer_timeout_in_sec`: 600 s.
fn default_transfer_timeout_in_sec() -> u64 {
    const TRANSFER_TIMEOUT_SECS: u64 = 600;
    TRANSFER_TIMEOUT_SECS
}

/// Serde default for `SnapshotConfig::retry_interval_in_ms`: 10 ms.
fn default_retry_interval_in_ms() -> u64 {
    const RETRY_INTERVAL_MS: u64 = 10;
    RETRY_INTERVAL_MS
}

/// Serde default for `SnapshotConfig::snapshot_push_backoff_in_ms`: 100 ms.
fn default_snapshot_push_backoff_in_ms() -> u64 {
    const PUSH_BACKOFF_MS: u64 = 100;
    PUSH_BACKOFF_MS
}

/// Serde default for `SnapshotConfig::snapshot_push_max_retry`.
fn default_snapshot_push_max_retry() -> u32 {
    const PUSH_MAX_RETRY: u32 = 3;
    PUSH_MAX_RETRY
}

/// Serde default for `SnapshotConfig::push_timeout_in_ms`: 300 000 ms.
fn default_push_timeout_in_ms() -> u64 {
    const PUSH_TIMEOUT_MS: u64 = 300_000;
    PUSH_TIMEOUT_MS
}
697
/// Zombie-node settings, nested under `MembershipConfig::zombie`.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ZombieConfig {
    /// Zombie threshold. Defaults to 3.
    /// NOTE(review): what is counted against this threshold is not visible here — confirm.
    #[serde(default = "default_zombie_threshold")]
    pub threshold: u32,

    /// Purge interval. Defaults to 30 seconds.
    #[serde(default = "default_zombie_purge_interval")]
    pub purge_interval: Duration,
}
707
708impl Default for ZombieConfig {
709 fn default() -> Self {
710 Self {
711 threshold: default_zombie_threshold(),
712 purge_interval: default_zombie_purge_interval(),
713 }
714 }
715}
716
/// Serde default for `ZombieConfig::threshold`.
fn default_zombie_threshold() -> u32 {
    const ZOMBIE_THRESHOLD: u32 = 3;
    ZOMBIE_THRESHOLD
}

/// Serde default for `ZombieConfig::purge_interval`: 30 seconds.
fn default_zombie_purge_interval() -> Duration {
    const PURGE_INTERVAL_SECS: u64 = 30;
    Duration::from_secs(PURGE_INTERVAL_SECS)
}
724
/// Learner promotion settings, nested under `MembershipConfig::promotion`.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct PromotionConfig {
    /// Stale-learner threshold. Defaults to 300 seconds.
    #[serde(default = "default_stale_learner_threshold")]
    pub stale_learner_threshold: Duration,
    /// Stale check interval. Defaults to 30 seconds.
    #[serde(default = "default_stale_check_interval")]
    pub stale_check_interval: Duration,
}
732
733impl Default for PromotionConfig {
734 fn default() -> Self {
735 Self {
736 stale_learner_threshold: default_stale_learner_threshold(),
737 stale_check_interval: default_stale_check_interval(),
738 }
739 }
740}
741
/// Serde default for `PromotionConfig::stale_learner_threshold`: five minutes.
fn default_stale_learner_threshold() -> Duration {
    const STALE_THRESHOLD_SECS: u64 = 5 * 60;
    Duration::from_secs(STALE_THRESHOLD_SECS)
}

/// Serde default for `PromotionConfig::stale_check_interval`: 30 seconds.
fn default_stale_check_interval() -> Duration {
    const CHECK_INTERVAL_SECS: u64 = 30;
    Duration::from_secs(CHECK_INTERVAL_SECS)
}
/// Persistence strategy selector used by `PersistenceConfig::strategy`.
///
/// NOTE(review): the exact write ordering each variant implies is not visible in this
/// file; the descriptions below only restate the variant names — confirm against the
/// storage implementation.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum PersistenceStrategy {
    /// Disk-first strategy.
    DiskFirst,

    /// Memory-first strategy; this is the value `default_persistence_strategy` returns.
    MemFirst,
}
788
/// Flush policy selector used by `PersistenceConfig::flush_policy`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum FlushPolicy {
    /// Immediate flushing.
    Immediate,

    /// Batched flushing, parameterized by a `threshold` and an interval in milliseconds.
    /// NOTE(review): presumably a flush happens when either limit is reached — confirm.
    Batch { threshold: usize, interval_ms: u64 },
}
806
/// Log persistence settings. Every field has a serde default, so each key is optional.
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct PersistenceConfig {
    /// Persistence strategy. Defaults to `PersistenceStrategy::MemFirst`.
    #[serde(default = "default_persistence_strategy")]
    pub strategy: PersistenceStrategy,

    /// Flush policy. Defaults to `FlushPolicy::Batch { threshold: 1024, interval_ms: 100 }`.
    #[serde(default = "default_flush_policy")]
    pub flush_policy: FlushPolicy,

    /// Maximum number of buffered entries. Defaults to 10_000.
    #[serde(default = "default_max_buffered_entries")]
    pub max_buffered_entries: usize,

    /// Number of flush workers. Defaults to 2.
    #[serde(default = "default_flush_workers")]
    pub flush_workers: usize,

    /// Channel capacity. Defaults to 100.
    #[serde(default = "default_channel_capacity")]
    pub channel_capacity: usize,
}
853
854fn default_persistence_strategy() -> PersistenceStrategy {
856 PersistenceStrategy::MemFirst
857}
858
/// Serde default for `PersistenceConfig::flush_workers`.
fn default_flush_workers() -> usize {
    const FLUSH_WORKERS: usize = 2;
    FLUSH_WORKERS
}

/// Serde default for `PersistenceConfig::channel_capacity`.
fn default_channel_capacity() -> usize {
    const CHANNEL_CAPACITY: usize = 100;
    CHANNEL_CAPACITY
}
868
869fn default_flush_policy() -> FlushPolicy {
874 FlushPolicy::Batch {
875 threshold: 1024,
876 interval_ms: 100,
877 }
878}
879
/// Serde default for `PersistenceConfig::max_buffered_entries`.
fn default_max_buffered_entries() -> usize {
    const MAX_BUFFERED_ENTRIES: usize = 10_000;
    MAX_BUFFERED_ENTRIES
}
884
885impl Default for PersistenceConfig {
886 fn default() -> Self {
887 Self {
888 strategy: default_persistence_strategy(),
889 flush_policy: default_flush_policy(),
890 max_buffered_entries: default_max_buffered_entries(),
891 flush_workers: default_flush_workers(),
892 channel_capacity: default_channel_capacity(),
893 }
894 }
895}
896
/// Read consistency policy. Converts to and from the `d_engine_proto::client` enum of the
/// same name via the `From` impls in this file.
///
/// NOTE(review): the guarantees of each variant are not visible here; the descriptions
/// below only restate the variant names — confirm against the read path.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub enum ReadConsistencyPolicy {
    /// Lease-based read.
    LeaseRead,

    /// Linearizable read. This is the `Default` variant.
    #[default]
    LinearizableRead,

    /// Eventually consistent read.
    EventualConsistency,
}
928
/// Read-consistency settings. Every field has a serde default, so each key is optional.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ReadConsistencyConfig {
    /// Default read policy. Falls back to `ReadConsistencyPolicy::default()`
    /// (`LinearizableRead`).
    #[serde(default)]
    pub default_policy: ReadConsistencyPolicy,

    /// Lease duration in milliseconds. Defaults to 250; `validate` rejects 0.
    #[serde(default = "default_lease_duration_ms")]
    pub lease_duration_ms: u64,

    /// Client override flag. Defaults to `true`.
    /// NOTE(review): presumably lets a client pick a policy per request — confirm.
    #[serde(default = "default_allow_client_override")]
    pub allow_client_override: bool,

    /// State-machine sync timeout in milliseconds. Defaults to 10.
    #[serde(default = "default_state_machine_sync_timeout_ms")]
    pub state_machine_sync_timeout_ms: u64,
}
961
962impl Default for ReadConsistencyConfig {
963 fn default() -> Self {
964 Self {
965 default_policy: ReadConsistencyPolicy::default(),
966 lease_duration_ms: default_lease_duration_ms(),
967 allow_client_override: default_allow_client_override(),
968 state_machine_sync_timeout_ms: default_state_machine_sync_timeout_ms(),
969 }
970 }
971}
972
/// Serde default for `ReadConsistencyConfig::lease_duration_ms`: 250 ms.
fn default_lease_duration_ms() -> u64 {
    const LEASE_DURATION_MS: u64 = 250;
    LEASE_DURATION_MS
}

/// Serde default for `ReadConsistencyConfig::allow_client_override`.
fn default_allow_client_override() -> bool {
    const ALLOW_OVERRIDE: bool = true;
    ALLOW_OVERRIDE
}

/// Serde default for `ReadConsistencyConfig::state_machine_sync_timeout_ms`: 10 ms.
fn default_state_machine_sync_timeout_ms() -> u64 {
    const SYNC_TIMEOUT_MS: u64 = 10;
    SYNC_TIMEOUT_MS
}
986
987impl ReadConsistencyConfig {
988 fn validate(&self) -> Result<()> {
989 if self.lease_duration_ms == 0 {
991 return Err(Error::Config(ConfigError::Message(
992 "read_consistency.lease_duration_ms must be greater than 0".into(),
993 )));
994 }
995 Ok(())
996 }
997}
998
999impl From<d_engine_proto::client::ReadConsistencyPolicy> for ReadConsistencyPolicy {
1000 fn from(proto_policy: d_engine_proto::client::ReadConsistencyPolicy) -> Self {
1001 match proto_policy {
1002 d_engine_proto::client::ReadConsistencyPolicy::LeaseRead => Self::LeaseRead,
1003 d_engine_proto::client::ReadConsistencyPolicy::LinearizableRead => {
1004 Self::LinearizableRead
1005 }
1006 d_engine_proto::client::ReadConsistencyPolicy::EventualConsistency => {
1007 Self::EventualConsistency
1008 }
1009 }
1010 }
1011}
1012
1013impl From<ReadConsistencyPolicy> for d_engine_proto::client::ReadConsistencyPolicy {
1014 fn from(config_policy: ReadConsistencyPolicy) -> Self {
1015 match config_policy {
1016 ReadConsistencyPolicy::LeaseRead => {
1017 d_engine_proto::client::ReadConsistencyPolicy::LeaseRead
1018 }
1019 ReadConsistencyPolicy::LinearizableRead => {
1020 d_engine_proto::client::ReadConsistencyPolicy::LinearizableRead
1021 }
1022 ReadConsistencyPolicy::EventualConsistency => {
1023 d_engine_proto::client::ReadConsistencyPolicy::EventualConsistency
1024 }
1025 }
1026 }
1027}
1028
/// Per-RPC-category compression flags. Every field has a serde default, so each key is
/// optional.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RpcCompressionConfig {
    /// Compression flag for replication responses. Defaults to `false`.
    #[serde(default = "default_replication_compression")]
    pub replication_response: bool,

    /// Compression flag for election responses. Defaults to `true`.
    #[serde(default = "default_election_compression")]
    pub election_response: bool,

    /// Compression flag for snapshot responses. Defaults to `true`.
    #[serde(default = "default_snapshot_compression")]
    pub snapshot_response: bool,

    /// Compression flag for cluster responses. Defaults to `true`.
    #[serde(default = "default_cluster_compression")]
    pub cluster_response: bool,

    /// Compression flag for client responses. Defaults to `false`.
    #[serde(default = "default_client_compression")]
    pub client_response: bool,
}
1083
1084impl Default for RpcCompressionConfig {
1085 fn default() -> Self {
1086 Self {
1087 replication_response: default_replication_compression(),
1088 election_response: default_election_compression(),
1089 snapshot_response: default_snapshot_compression(),
1090 cluster_response: default_cluster_compression(),
1091 client_response: default_client_compression(),
1092 }
1093 }
1094}
1095
/// Serde default for `RpcCompressionConfig::replication_response`.
fn default_replication_compression() -> bool {
    const COMPRESS: bool = false;
    COMPRESS
}

/// Serde default for `RpcCompressionConfig::election_response`.
fn default_election_compression() -> bool {
    const COMPRESS: bool = true;
    COMPRESS
}

/// Serde default for `RpcCompressionConfig::snapshot_response`.
fn default_snapshot_compression() -> bool {
    const COMPRESS: bool = true;
    COMPRESS
}

/// Serde default for `RpcCompressionConfig::cluster_response`.
fn default_cluster_compression() -> bool {
    const COMPRESS: bool = true;
    COMPRESS
}

/// Serde default for `RpcCompressionConfig::client_response`.
fn default_client_compression() -> bool {
    const COMPRESS: bool = false;
    COMPRESS
}
1122
/// Watch settings. Every field has a serde default, so each key is optional.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct WatchConfig {
    /// Event queue size. Defaults to 1000; `validate` rejects 0 and logs a warning
    /// above 100_000.
    #[serde(default = "default_event_queue_size")]
    pub event_queue_size: usize,

    /// Per-watcher buffer size. Defaults to 10; `validate` rejects 0 and logs a warning
    /// above 1000.
    #[serde(default = "default_watcher_buffer_size")]
    pub watcher_buffer_size: usize,

    /// Watch metrics flag. Defaults to `false`.
    #[serde(default = "default_enable_watch_metrics")]
    pub enable_metrics: bool,
}
1193
1194impl Default for WatchConfig {
1195 fn default() -> Self {
1196 Self {
1197 event_queue_size: default_event_queue_size(),
1198 watcher_buffer_size: default_watcher_buffer_size(),
1199 enable_metrics: default_enable_watch_metrics(),
1200 }
1201 }
1202}
1203
1204impl WatchConfig {
1205 pub fn validate(&self) -> Result<()> {
1207 if self.event_queue_size == 0 {
1208 return Err(Error::Config(ConfigError::Message(
1209 "watch.event_queue_size must be greater than 0".into(),
1210 )));
1211 }
1212
1213 if self.event_queue_size > 100_000 {
1214 warn!(
1215 "watch.event_queue_size ({}) is very large and may consume significant memory (~{}MB)",
1216 self.event_queue_size,
1217 (self.event_queue_size * 24) / 1_000_000
1218 );
1219 }
1220
1221 if self.watcher_buffer_size == 0 {
1222 return Err(Error::Config(ConfigError::Message(
1223 "watch.watcher_buffer_size must be greater than 0".into(),
1224 )));
1225 }
1226
1227 if self.watcher_buffer_size > 1000 {
1228 warn!(
1229 "watch.watcher_buffer_size ({}) is very large. Each watcher will consume ~{}KB memory",
1230 self.watcher_buffer_size,
1231 (self.watcher_buffer_size * 240) / 1000
1232 );
1233 }
1234
1235 Ok(())
1236 }
1237}
1238
/// Serde default for `WatchConfig::event_queue_size`.
const fn default_event_queue_size() -> usize {
    const EVENT_QUEUE_SIZE: usize = 1_000;
    EVENT_QUEUE_SIZE
}

/// Serde default for `WatchConfig::watcher_buffer_size`.
const fn default_watcher_buffer_size() -> usize {
    const WATCHER_BUFFER_SIZE: usize = 10;
    WATCHER_BUFFER_SIZE
}

/// Serde default for `WatchConfig::enable_metrics`.
const fn default_enable_watch_metrics() -> bool {
    const METRICS_ENABLED: bool = false;
    METRICS_ENABLED
}