1use std::fmt::Debug;
2use std::path::PathBuf;
3use std::time::Duration;
4
5use config::ConfigError;
6use serde::Deserialize;
7use serde::Serialize;
8use tracing::warn;
9
10use super::lease::LeaseConfig;
11use super::validate_directory;
12use crate::Error;
13use crate::Result;
14
15#[derive(Serialize, Deserialize, Clone)]
17pub struct RaftConfig {
18 #[serde(default)]
21 pub replication: ReplicationConfig,
22
23 #[serde(default)]
29 pub batching: BatchingConfig,
30
31 #[serde(default)]
34 pub election: ElectionConfig,
35
36 #[serde(default)]
39 pub membership: MembershipConfig,
40
41 #[serde(default, alias = "storage")]
45 pub state_machine: StateMachineConfig,
46
47 #[serde(default)]
49 pub snapshot: SnapshotConfig,
50
51 #[serde(default)]
54 pub persistence: PersistenceConfig,
55
56 #[serde(default = "default_learner_catchup_threshold")]
60 pub learner_catchup_threshold: u64,
61
62 #[serde(default = "default_learner_check_throttle_ms")]
66 pub learner_check_throttle_ms: u64,
67
68 #[serde(default = "default_general_timeout")]
72 pub general_raft_timeout_duration_in_ms: u64,
73
74 #[serde(default = "default_snapshot_rpc_timeout_ms")]
76 pub snapshot_rpc_timeout_ms: u64,
77
78 #[serde(default)]
80 pub auto_join: AutoJoinConfig,
81
82 #[serde(default)]
85 pub read_consistency: ReadConsistencyConfig,
86
87 #[serde(default)]
90 pub backpressure: BackpressureConfig,
91
92 #[serde(default)]
98 pub rpc_compression: RpcCompressionConfig,
99
100 #[serde(default)]
103 pub watch: WatchConfig,
104
105 #[serde(default)]
108 pub metrics: MetricsConfig,
109}
110
111impl Debug for RaftConfig {
112 fn fmt(
113 &self,
114 f: &mut std::fmt::Formatter<'_>,
115 ) -> std::fmt::Result {
116 f.debug_struct("RaftConfig").finish()
117 }
118}
119impl Default for RaftConfig {
120 fn default() -> Self {
121 Self {
122 replication: ReplicationConfig::default(),
123 batching: BatchingConfig::default(),
124 election: ElectionConfig::default(),
125 membership: MembershipConfig::default(),
126 state_machine: StateMachineConfig::default(),
127 snapshot: SnapshotConfig::default(),
128 persistence: PersistenceConfig::default(),
129 learner_catchup_threshold: default_learner_catchup_threshold(),
130 learner_check_throttle_ms: default_learner_check_throttle_ms(),
131 general_raft_timeout_duration_in_ms: default_general_timeout(),
132 auto_join: AutoJoinConfig::default(),
133 snapshot_rpc_timeout_ms: default_snapshot_rpc_timeout_ms(),
134 read_consistency: ReadConsistencyConfig::default(),
135 backpressure: BackpressureConfig::default(),
136 rpc_compression: RpcCompressionConfig::default(),
137 watch: WatchConfig::default(),
138 metrics: MetricsConfig::default(),
139 }
140 }
141}
142impl RaftConfig {
143 pub fn validate(&self) -> Result<()> {
145 if self.learner_catchup_threshold == 0 {
146 return Err(Error::Config(ConfigError::Message(
147 "learner_catchup_threshold must be greater than 0".into(),
148 )));
149 }
150
151 if self.general_raft_timeout_duration_in_ms < 1 {
152 return Err(Error::Config(ConfigError::Message(
153 "general_raft_timeout_duration_in_ms must be at least 1ms".into(),
154 )));
155 }
156
157 self.replication.validate()?;
158 self.batching.validate()?;
159 self.election.validate()?;
160 self.membership.validate()?;
161 self.state_machine.validate()?;
162 self.snapshot.validate()?;
163 self.read_consistency.validate()?;
164 self.watch.validate()?;
165
166 if self.read_consistency.lease_duration_ms > self.election.election_timeout_min / 2 {
168 warn!(
169 "read_consistency.lease_duration_ms ({}) is greater than half of election_timeout_min ({}ms). \
170 This may cause lease expiration during normal operation.",
171 self.read_consistency.lease_duration_ms,
172 self.election.election_timeout_min / 2
173 );
174 }
175
176 Ok(())
177 }
178}
179
180fn default_learner_catchup_threshold() -> u64 {
181 1
182}
183
184fn default_learner_check_throttle_ms() -> u64 {
185 1000 }
187
188fn default_general_timeout() -> u64 {
190 50
191}
192fn default_snapshot_rpc_timeout_ms() -> u64 {
193 3_600_000
195}
196#[derive(Debug, Serialize, Deserialize, Clone)]
197pub struct ReplicationConfig {
198 #[serde(default = "default_append_interval")]
200 pub rpc_append_entries_clock_in_ms: u64,
201
202 #[serde(default = "default_entries_per_replication")]
204 pub append_entries_max_entries_per_replication: u64,
205}
206
207impl Default for ReplicationConfig {
208 fn default() -> Self {
209 Self {
210 rpc_append_entries_clock_in_ms: default_append_interval(),
211 append_entries_max_entries_per_replication: default_entries_per_replication(),
212 }
213 }
214}
215impl ReplicationConfig {
216 fn validate(&self) -> Result<()> {
217 if self.rpc_append_entries_clock_in_ms == 0 {
218 return Err(Error::Config(ConfigError::Message(
219 "rpc_append_entries_clock_in_ms cannot be 0".into(),
220 )));
221 }
222
223 if self.append_entries_max_entries_per_replication == 0 {
224 return Err(Error::Config(ConfigError::Message(
225 "append_entries_max_entries_per_replication must be > 0".into(),
226 )));
227 }
228
229 Ok(())
230 }
231}
232
233#[derive(Debug, Serialize, Deserialize, Clone)]
249pub struct BatchingConfig {
250 #[serde(default = "default_max_batch_size")]
254 pub max_batch_size: usize,
255}
256
257impl Default for BatchingConfig {
258 fn default() -> Self {
259 Self {
260 max_batch_size: default_max_batch_size(),
261 }
262 }
263}
264
265impl BatchingConfig {
266 fn validate(&self) -> Result<()> {
267 if self.max_batch_size == 0 {
268 return Err(Error::Config(ConfigError::Message(
269 "batching.max_batch_size must be > 0".into(),
270 )));
271 }
272 Ok(())
273 }
274}
275
276fn default_append_interval() -> u64 {
277 100
278}
279fn default_max_batch_size() -> usize {
280 100
281}
282fn default_entries_per_replication() -> u64 {
283 100
284}
285#[derive(Debug, Serialize, Deserialize, Clone)]
286pub struct ElectionConfig {
287 #[serde(default = "default_election_timeout_min")]
288 pub election_timeout_min: u64,
289
290 #[serde(default = "default_election_timeout_max")]
291 pub election_timeout_max: u64,
292
293 #[serde(default = "default_peer_monitor_interval")]
294 pub rpc_peer_connectinon_monitor_interval_in_sec: u64,
295
296 #[serde(default = "default_client_request_id")]
297 pub internal_rpc_client_request_id: u32,
298}
299
300impl Default for ElectionConfig {
301 fn default() -> Self {
302 Self {
303 election_timeout_min: default_election_timeout_min(),
304 election_timeout_max: default_election_timeout_max(),
305 rpc_peer_connectinon_monitor_interval_in_sec: default_peer_monitor_interval(),
306 internal_rpc_client_request_id: default_client_request_id(),
307 }
308 }
309}
310impl ElectionConfig {
311 fn validate(&self) -> Result<()> {
312 if self.election_timeout_min >= self.election_timeout_max {
313 return Err(Error::Config(ConfigError::Message(format!(
314 "election_timeout_min {}ms must be less than election_timeout_max {}ms",
315 self.election_timeout_min, self.election_timeout_max
316 ))));
317 }
318
319 if self.rpc_peer_connectinon_monitor_interval_in_sec == 0 {
320 return Err(Error::Config(ConfigError::Message(
321 "rpc_peer_connectinon_monitor_interval_in_sec cannot be 0".into(),
322 )));
323 }
324
325 Ok(())
326 }
327}
328fn default_election_timeout_min() -> u64 {
329 500
330}
331fn default_election_timeout_max() -> u64 {
332 1000
333}
334fn default_peer_monitor_interval() -> u64 {
335 30
336}
337fn default_client_request_id() -> u32 {
338 0
339}
340
341#[derive(Debug, Serialize, Deserialize, Clone)]
342pub struct MembershipConfig {
343 #[serde(default = "default_probe_service")]
344 pub cluster_healthcheck_probe_service_name: String,
345
346 #[serde(default = "default_verify_leadership_persistent_timeout")]
347 pub verify_leadership_persistent_timeout: Duration,
348
349 #[serde(default = "default_membership_maintenance_interval")]
350 pub membership_maintenance_interval: Duration,
351
352 #[serde(default)]
353 pub zombie: ZombieConfig,
354
355 #[serde(default)]
357 pub promotion: PromotionConfig,
358}
359impl Default for MembershipConfig {
360 fn default() -> Self {
361 Self {
362 cluster_healthcheck_probe_service_name: default_probe_service(),
363 verify_leadership_persistent_timeout: default_verify_leadership_persistent_timeout(),
364 membership_maintenance_interval: default_membership_maintenance_interval(),
365 zombie: ZombieConfig::default(),
366 promotion: PromotionConfig::default(),
367 }
368 }
369}
370fn default_probe_service() -> String {
371 "d_engine.server.cluster.ClusterManagementService".to_string()
372}
373
374fn default_membership_maintenance_interval() -> Duration {
376 Duration::from_secs(30)
377}
378
379fn default_verify_leadership_persistent_timeout() -> Duration {
386 Duration::from_secs(3600)
387}
388
389impl MembershipConfig {
390 fn validate(&self) -> Result<()> {
391 if self.cluster_healthcheck_probe_service_name.is_empty() {
392 return Err(Error::Config(ConfigError::Message(
393 "cluster_healthcheck_probe_service_name cannot be empty".into(),
394 )));
395 }
396 Ok(())
397 }
398}
399
400#[derive(Serialize, Deserialize, Clone, Debug)]
406#[derive(Default)]
407pub struct StateMachineConfig {
408 #[serde(alias = "ttl")]
412 pub lease: LeaseConfig,
413}
414
415impl StateMachineConfig {
416 pub fn validate(&self) -> Result<()> {
417 self.lease.validate()?;
418 Ok(())
419 }
420}
421
422#[derive(Debug, Serialize, Deserialize, Clone)]
424pub struct SnapshotConfig {
425 #[serde(default = "default_snapshot_enabled")]
427 pub enable: bool,
428
429 #[serde(default = "default_max_log_entries_before_snapshot")]
432 pub max_log_entries_before_snapshot: u64,
433
434 #[serde(default = "default_snapshot_cool_down_since_last_check")]
437 pub snapshot_cool_down_since_last_check: Duration,
438
439 #[serde(default = "default_cleanup_retain_count")]
442 pub cleanup_retain_count: u64,
443
444 #[serde(default = "default_snapshots_dir")]
448 pub snapshots_dir: PathBuf,
449
450 #[serde(default = "default_snapshots_dir_prefix")]
451 pub snapshots_dir_prefix: String,
452
453 #[serde(default = "default_chunk_size")]
457 pub chunk_size: usize,
458
459 #[serde(default = "default_retained_log_entries")]
461 pub retained_log_entries: u64,
462
463 #[serde(default = "default_sender_yield_every_n_chunks")]
465 pub sender_yield_every_n_chunks: usize,
466
467 #[serde(default = "default_receiver_yield_every_n_chunks")]
469 pub receiver_yield_every_n_chunks: usize,
470
471 #[serde(default = "default_max_bandwidth_mbps")]
472 pub max_bandwidth_mbps: u32,
473
474 #[serde(default = "default_push_queue_size")]
475 pub push_queue_size: usize,
476
477 #[serde(default = "default_cache_size")]
478 pub cache_size: usize,
479
480 #[serde(default = "default_max_retries")]
481 pub max_retries: u32,
482
483 #[serde(default = "default_transfer_timeout_in_sec")]
484 pub transfer_timeout_in_sec: u64,
485
486 #[serde(default = "default_retry_interval_in_ms")]
487 pub retry_interval_in_ms: u64,
488
489 #[serde(default = "default_snapshot_push_backoff_in_ms")]
490 pub snapshot_push_backoff_in_ms: u64,
491
492 #[serde(default = "default_snapshot_push_max_retry")]
493 pub snapshot_push_max_retry: u32,
494
495 #[serde(default = "default_push_timeout_in_ms")]
496 pub push_timeout_in_ms: u64,
497}
498impl Default for SnapshotConfig {
499 fn default() -> Self {
500 Self {
501 max_log_entries_before_snapshot: default_max_log_entries_before_snapshot(),
502 snapshot_cool_down_since_last_check: default_snapshot_cool_down_since_last_check(),
503 cleanup_retain_count: default_cleanup_retain_count(),
504 snapshots_dir: default_snapshots_dir(),
505 snapshots_dir_prefix: default_snapshots_dir_prefix(),
506 chunk_size: default_chunk_size(),
507 retained_log_entries: default_retained_log_entries(),
508 sender_yield_every_n_chunks: default_sender_yield_every_n_chunks(),
509 receiver_yield_every_n_chunks: default_receiver_yield_every_n_chunks(),
510 max_bandwidth_mbps: default_max_bandwidth_mbps(),
511 push_queue_size: default_push_queue_size(),
512 cache_size: default_cache_size(),
513 max_retries: default_max_retries(),
514 transfer_timeout_in_sec: default_transfer_timeout_in_sec(),
515 retry_interval_in_ms: default_retry_interval_in_ms(),
516 snapshot_push_backoff_in_ms: default_snapshot_push_backoff_in_ms(),
517 snapshot_push_max_retry: default_snapshot_push_max_retry(),
518 push_timeout_in_ms: default_push_timeout_in_ms(),
519 enable: default_snapshot_enabled(),
520 }
521 }
522}
523impl SnapshotConfig {
524 fn validate(&self) -> Result<()> {
525 if self.max_log_entries_before_snapshot == 0 {
526 return Err(Error::Config(ConfigError::Message(
527 "max_log_entries_before_snapshot must be greater than 0".into(),
528 )));
529 }
530
531 if self.cleanup_retain_count == 0 {
532 return Err(Error::Config(ConfigError::Message(
533 "cleanup_retain_count must be greater than 0".into(),
534 )));
535 }
536 validate_directory(&self.snapshots_dir, "snapshots_dir")?;
538
539 if self.chunk_size == 0 {
541 return Err(Error::Config(ConfigError::Message(format!(
542 "chunk_size must be at least {} bytes (got {})",
543 0, self.chunk_size
544 ))));
545 }
546
547 if self.retained_log_entries < 1 {
548 return Err(Error::Config(ConfigError::Message(format!(
549 "retained_log_entries must be >= 1, (got {})",
550 self.retained_log_entries
551 ))));
552 }
553
554 if self.sender_yield_every_n_chunks < 1 {
555 return Err(Error::Config(ConfigError::Message(format!(
556 "sender_yield_every_n_chunks must be >= 1, (got {})",
557 self.sender_yield_every_n_chunks
558 ))));
559 }
560
561 if self.receiver_yield_every_n_chunks < 1 {
562 return Err(Error::Config(ConfigError::Message(format!(
563 "receiver_yield_every_n_chunks must be >= 1, (got {})",
564 self.receiver_yield_every_n_chunks
565 ))));
566 }
567
568 if self.push_queue_size < 1 {
569 return Err(Error::Config(ConfigError::Message(format!(
570 "push_queue_size must be >= 1, (got {})",
571 self.push_queue_size
572 ))));
573 }
574
575 if self.snapshot_push_max_retry < 1 {
576 return Err(Error::Config(ConfigError::Message(format!(
577 "snapshot_push_max_retry must be >= 1, (got {})",
578 self.snapshot_push_max_retry
579 ))));
580 }
581
582 Ok(())
583 }
584}
585
586fn default_snapshot_enabled() -> bool {
587 true
588}
589
590fn default_max_log_entries_before_snapshot() -> u64 {
592 1000
593}
594
595fn default_snapshot_cool_down_since_last_check() -> Duration {
600 Duration::from_secs(3600)
601}
602
603fn default_cleanup_retain_count() -> u64 {
605 2
606}
607fn default_snapshots_dir() -> PathBuf {
609 PathBuf::from("/tmp/snapshots")
610}
611fn default_snapshots_dir_prefix() -> String {
613 "snapshot-".to_string()
614}
615
616fn default_chunk_size() -> usize {
618 1024
619}
620
621#[derive(Debug, Serialize, Deserialize, Clone)]
622pub struct AutoJoinConfig {
623 #[serde(default = "default_rpc_enable_compression")]
624 pub rpc_enable_compression: bool,
625}
626impl Default for AutoJoinConfig {
627 fn default() -> Self {
628 Self {
629 rpc_enable_compression: default_rpc_enable_compression(),
630 }
631 }
632}
633fn default_rpc_enable_compression() -> bool {
634 true
635}
636
637fn default_retained_log_entries() -> u64 {
638 1
639}
640
641fn default_sender_yield_every_n_chunks() -> usize {
642 1
643}
644
645fn default_receiver_yield_every_n_chunks() -> usize {
646 1
647}
648
649fn default_max_bandwidth_mbps() -> u32 {
650 1
651}
652
653fn default_push_queue_size() -> usize {
654 100
655}
656
657fn default_cache_size() -> usize {
658 10000
659}
660fn default_max_retries() -> u32 {
661 1
662}
663fn default_transfer_timeout_in_sec() -> u64 {
664 600
665}
666fn default_retry_interval_in_ms() -> u64 {
667 10
668}
669fn default_snapshot_push_backoff_in_ms() -> u64 {
670 100
671}
672fn default_snapshot_push_max_retry() -> u32 {
673 3
674}
675fn default_push_timeout_in_ms() -> u64 {
676 300_000
677}
678
679#[derive(Debug, Serialize, Deserialize, Clone)]
680pub struct ZombieConfig {
681 #[serde(default = "default_zombie_threshold")]
683 pub threshold: u32,
684
685 #[serde(default = "default_zombie_purge_interval")]
686 pub purge_interval: Duration,
687}
688
689impl Default for ZombieConfig {
690 fn default() -> Self {
691 Self {
692 threshold: default_zombie_threshold(),
693 purge_interval: default_zombie_purge_interval(),
694 }
695 }
696}
697
698fn default_zombie_threshold() -> u32 {
699 3
700}
701fn default_zombie_purge_interval() -> Duration {
703 Duration::from_secs(30)
704}
705
706#[derive(Debug, Serialize, Deserialize, Clone)]
707pub struct PromotionConfig {
708 #[serde(default = "default_stale_learner_threshold")]
709 pub stale_learner_threshold: Duration,
710 #[serde(default = "default_stale_check_interval")]
711 pub stale_check_interval: Duration,
712}
713
714impl Default for PromotionConfig {
715 fn default() -> Self {
716 Self {
717 stale_learner_threshold: default_stale_learner_threshold(),
718 stale_check_interval: default_stale_check_interval(),
719 }
720 }
721}
722
723fn default_stale_learner_threshold() -> Duration {
725 Duration::from_secs(300)
726}
727fn default_stale_check_interval() -> Duration {
729 Duration::from_secs(30)
730}
731#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
739pub enum PersistenceStrategy {
740 DiskFirst,
753
754 MemFirst,
768}
769
770#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
772pub enum FlushPolicy {
773 Immediate,
778
779 Batch { threshold: usize, interval_ms: u64 },
786}
787
788#[derive(Serialize, Deserialize, Clone, Debug)]
790pub struct PersistenceConfig {
791 #[serde(default = "default_persistence_strategy")]
797 pub strategy: PersistenceStrategy,
798
799 #[serde(default = "default_flush_policy")]
804 pub flush_policy: FlushPolicy,
805
806 #[serde(default = "default_max_buffered_entries")]
811 pub max_buffered_entries: usize,
812
813 #[serde(default = "default_flush_workers")]
823 pub flush_workers: usize,
824
825 #[serde(default = "default_channel_capacity")]
832 pub channel_capacity: usize,
833}
834
835fn default_persistence_strategy() -> PersistenceStrategy {
837 PersistenceStrategy::DiskFirst
838}
839
840fn default_flush_workers() -> usize {
842 2
843}
844
845fn default_channel_capacity() -> usize {
847 100
848}
849
850fn default_flush_policy() -> FlushPolicy {
855 FlushPolicy::Batch {
856 threshold: 1024,
857 interval_ms: 100,
858 }
859}
860
861fn default_max_buffered_entries() -> usize {
863 10_000
864}
865
866impl Default for PersistenceConfig {
867 fn default() -> Self {
868 Self {
869 strategy: default_persistence_strategy(),
870 flush_policy: default_flush_policy(),
871 max_buffered_entries: default_max_buffered_entries(),
872 flush_workers: default_flush_workers(),
873 channel_capacity: default_channel_capacity(),
874 }
875 }
876}
877
878#[derive(Debug, Clone, Serialize, Deserialize)]
900pub struct BackpressureConfig {
901 #[serde(default = "default_max_pending_writes")]
909 pub max_pending_writes: usize,
910
911 #[serde(default = "default_max_pending_reads")]
919 pub max_pending_reads: usize,
920}
921
922impl Default for BackpressureConfig {
923 fn default() -> Self {
924 Self {
925 max_pending_writes: default_max_pending_writes(),
926 max_pending_reads: default_max_pending_reads(),
927 }
928 }
929}
930
931fn default_max_pending_writes() -> usize {
932 10_000
933}
934
935fn default_max_pending_reads() -> usize {
936 50_000
937}
938
939impl BackpressureConfig {
940 pub fn should_reject_write(
945 &self,
946 current_pending: usize,
947 ) -> bool {
948 self.max_pending_writes > 0 && current_pending >= self.max_pending_writes
949 }
950
951 pub fn should_reject_read(
956 &self,
957 current_pending: usize,
958 ) -> bool {
959 self.max_pending_reads > 0 && current_pending >= self.max_pending_reads
960 }
961}
962
963#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
968pub enum ReadConsistencyPolicy {
969 LeaseRead,
976
977 #[default]
983 LinearizableRead,
984
985 EventualConsistency,
993}
994
995#[derive(Clone, Debug, Serialize, Deserialize)]
997pub struct ReadConsistencyConfig {
998 #[serde(default)]
1003 pub default_policy: ReadConsistencyPolicy,
1004
1005 #[serde(default = "default_lease_duration_ms")]
1010 pub lease_duration_ms: u64,
1011
1012 #[serde(default = "default_allow_client_override")]
1017 pub allow_client_override: bool,
1018
1019 #[serde(default = "default_state_machine_sync_timeout_ms")]
1025 pub state_machine_sync_timeout_ms: u64,
1026}
1027
1028impl Default for ReadConsistencyConfig {
1029 fn default() -> Self {
1030 Self {
1031 default_policy: ReadConsistencyPolicy::default(),
1032 lease_duration_ms: default_lease_duration_ms(),
1033 allow_client_override: default_allow_client_override(),
1034 state_machine_sync_timeout_ms: default_state_machine_sync_timeout_ms(),
1035 }
1036 }
1037}
1038
1039fn default_lease_duration_ms() -> u64 {
1040 250
1042}
1043
1044fn default_allow_client_override() -> bool {
1045 true
1047}
1048
1049fn default_state_machine_sync_timeout_ms() -> u64 {
1050 10 }
1052
1053impl ReadConsistencyConfig {
1054 fn validate(&self) -> Result<()> {
1055 if self.lease_duration_ms == 0 {
1057 return Err(Error::Config(ConfigError::Message(
1058 "read_consistency.lease_duration_ms must be greater than 0".into(),
1059 )));
1060 }
1061 Ok(())
1062 }
1063}
1064
1065impl From<d_engine_proto::client::ReadConsistencyPolicy> for ReadConsistencyPolicy {
1066 fn from(proto_policy: d_engine_proto::client::ReadConsistencyPolicy) -> Self {
1067 match proto_policy {
1068 d_engine_proto::client::ReadConsistencyPolicy::LeaseRead => Self::LeaseRead,
1069 d_engine_proto::client::ReadConsistencyPolicy::LinearizableRead => {
1070 Self::LinearizableRead
1071 }
1072 d_engine_proto::client::ReadConsistencyPolicy::EventualConsistency => {
1073 Self::EventualConsistency
1074 }
1075 }
1076 }
1077}
1078
1079impl From<ReadConsistencyPolicy> for d_engine_proto::client::ReadConsistencyPolicy {
1080 fn from(config_policy: ReadConsistencyPolicy) -> Self {
1081 match config_policy {
1082 ReadConsistencyPolicy::LeaseRead => {
1083 d_engine_proto::client::ReadConsistencyPolicy::LeaseRead
1084 }
1085 ReadConsistencyPolicy::LinearizableRead => {
1086 d_engine_proto::client::ReadConsistencyPolicy::LinearizableRead
1087 }
1088 ReadConsistencyPolicy::EventualConsistency => {
1089 d_engine_proto::client::ReadConsistencyPolicy::EventualConsistency
1090 }
1091 }
1092 }
1093}
1094
1095#[derive(Debug, Clone, Serialize, Deserialize)]
1102pub struct RpcCompressionConfig {
1103 #[serde(default = "default_replication_compression")]
1111 pub replication_response: bool,
1112
1113 #[serde(default = "default_election_compression")]
1120 pub election_response: bool,
1121
1122 #[serde(default = "default_snapshot_compression")]
1129 pub snapshot_response: bool,
1130
1131 #[serde(default = "default_cluster_compression")]
1138 pub cluster_response: bool,
1139
1140 #[serde(default = "default_client_compression")]
1147 pub client_response: bool,
1148}
1149
1150impl Default for RpcCompressionConfig {
1151 fn default() -> Self {
1152 Self {
1153 replication_response: default_replication_compression(),
1154 election_response: default_election_compression(),
1155 snapshot_response: default_snapshot_compression(),
1156 cluster_response: default_cluster_compression(),
1157 client_response: default_client_compression(),
1158 }
1159 }
1160}
1161
1162fn default_replication_compression() -> bool {
1164 false
1167}
1168
1169fn default_election_compression() -> bool {
1170 true
1172}
1173
1174fn default_snapshot_compression() -> bool {
1175 true
1177}
1178
1179fn default_cluster_compression() -> bool {
1180 true
1182}
1183
1184fn default_client_compression() -> bool {
1185 false
1187}
1188
1189#[derive(Debug, Serialize, Deserialize, Clone)]
1210pub struct WatchConfig {
1211 #[serde(default = "default_event_queue_size")]
1228 pub event_queue_size: usize,
1229
1230 #[serde(default = "default_watcher_buffer_size")]
1247 pub watcher_buffer_size: usize,
1248
1249 #[serde(default = "default_enable_watch_metrics")]
1257 pub enable_metrics: bool,
1258}
1259
1260impl Default for WatchConfig {
1261 fn default() -> Self {
1262 Self {
1263 event_queue_size: default_event_queue_size(),
1264 watcher_buffer_size: default_watcher_buffer_size(),
1265 enable_metrics: default_enable_watch_metrics(),
1266 }
1267 }
1268}
1269
1270impl WatchConfig {
1271 pub fn validate(&self) -> Result<()> {
1273 if self.event_queue_size == 0 {
1274 return Err(Error::Config(ConfigError::Message(
1275 "watch.event_queue_size must be greater than 0".into(),
1276 )));
1277 }
1278
1279 if self.event_queue_size > 100_000 {
1280 warn!(
1281 "watch.event_queue_size ({}) is very large and may consume significant memory (~{}MB)",
1282 self.event_queue_size,
1283 (self.event_queue_size * 24) / 1_000_000
1284 );
1285 }
1286
1287 if self.watcher_buffer_size == 0 {
1288 return Err(Error::Config(ConfigError::Message(
1289 "watch.watcher_buffer_size must be greater than 0".into(),
1290 )));
1291 }
1292
1293 if self.watcher_buffer_size > 1000 {
1294 warn!(
1295 "watch.watcher_buffer_size ({}) is very large. Each watcher will consume ~{}KB memory",
1296 self.watcher_buffer_size,
1297 (self.watcher_buffer_size * 240) / 1000
1298 );
1299 }
1300
1301 Ok(())
1302 }
1303}
1304
1305const fn default_event_queue_size() -> usize {
1306 1000
1307}
1308
1309const fn default_watcher_buffer_size() -> usize {
1310 10
1311}
1312
1313const fn default_enable_watch_metrics() -> bool {
1314 false
1315}
1316
1317#[derive(Debug, Serialize, Deserialize, Clone)]
1330pub struct MetricsConfig {
1331 #[serde(default = "default_enable_backpressure_metrics")]
1338 pub enable_backpressure: bool,
1339
1340 #[serde(default = "default_enable_batch_metrics")]
1347 pub enable_batch: bool,
1348
1349 #[serde(default = "default_metrics_sample_rate")]
1360 pub sample_rate: u32,
1361}
1362
1363impl Default for MetricsConfig {
1364 fn default() -> Self {
1365 Self {
1366 enable_backpressure: default_enable_backpressure_metrics(),
1367 enable_batch: default_enable_batch_metrics(),
1368 sample_rate: default_metrics_sample_rate(),
1369 }
1370 }
1371}
1372
1373fn default_enable_backpressure_metrics() -> bool {
1374 false
1375}
1376
1377fn default_enable_batch_metrics() -> bool {
1378 false
1379}
1380
1381fn default_metrics_sample_rate() -> u32 {
1382 1 }