d_engine_core/config/
raft.rs

use std::fmt::Debug;
use std::path::PathBuf;
use std::time::Duration;

use config::ConfigError;
use serde::Deserialize;
use serde::Serialize;
use tracing::warn;

use super::lease::LeaseConfig;
use super::validate_directory;
use crate::Error;
use crate::Result;

/// Configuration parameters for the Raft consensus algorithm implementation
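///
/// Top-level scalar fields live under `[raft]` in TOML, following the nesting
/// convention shown in the [`WatchConfig`] example below. A minimal sketch,
/// with the default values spelled out:
///
/// ```toml
/// [raft]
/// learner_catchup_threshold = 1
/// learner_check_throttle_ms = 1000
/// general_raft_timeout_duration_in_ms = 50
/// snapshot_rpc_timeout_ms = 3600000
/// ```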
#[derive(Serialize, Deserialize, Clone)]
pub struct RaftConfig {
    /// Configuration settings related to log replication
    /// Includes parameters like replication batch size and network retry behavior
    #[serde(default)]
    pub replication: ReplicationConfig,

    /// Configuration settings for the leader election mechanism
    /// Controls timeouts and randomization factors for election timing
    #[serde(default)]
    pub election: ElectionConfig,

    /// Configuration settings for cluster membership changes
    /// Handles joint consensus transitions and cluster reconfiguration rules
    #[serde(default)]
    pub membership: MembershipConfig,

    /// Configuration settings for commit application handling
    /// Controls how committed log entries are applied to the state machine
    #[serde(default)]
    pub commit_handler: CommitHandlerConfig,

    /// Configuration settings for state machine behavior
    /// Controls state machine operations like lease management, compaction, etc.
    /// For backward compatibility, can also be configured via `storage` in TOML files.
    #[serde(default, alias = "storage")]
    pub state_machine: StateMachineConfig,

    /// Configuration settings for the snapshot feature
    #[serde(default)]
    pub snapshot: SnapshotConfig,

    /// Configuration settings for log persistence behavior
    /// Controls how and when log entries are persisted to stable storage
    #[serde(default)]
    pub persistence: PersistenceConfig,

    /// Maximum allowed log entry gap between leader and learner nodes
    /// Learners with larger gaps than this value will trigger catch-up replication
    /// Default: 1
    #[serde(default = "default_learner_catchup_threshold")]
    pub learner_catchup_threshold: u64,

    /// Throttle interval (milliseconds) for learner progress checks
    /// Prevents excessive checking of learner promotion eligibility
    /// Default: 1000 (1 second)
    #[serde(default = "default_learner_check_throttle_ms")]
    pub learner_check_throttle_ms: u64,

    /// Base timeout duration (in milliseconds) for general Raft operations
    /// Used as the fallback timeout when operation-specific timeouts are not set
    /// Default: 50
    #[serde(default = "default_general_timeout")]
    pub general_raft_timeout_duration_in_ms: u64,

    /// Timeout for snapshot RPC operations (milliseconds)
    #[serde(default = "default_snapshot_rpc_timeout_ms")]
    pub snapshot_rpc_timeout_ms: u64,

    /// Configuration settings for the new-node auto-join feature
    #[serde(default)]
    pub auto_join: AutoJoinConfig,

    /// Configuration for read operation consistency behavior
    /// Controls the trade-off between read performance and consistency guarantees
    #[serde(default)]
    pub read_consistency: ReadConsistencyConfig,

    /// RPC compression configuration for different service types
    ///
    /// Controls which RPC service types use response compression.
    /// Allows fine-tuning for performance optimization based on
    /// deployment environment and traffic patterns.
    #[serde(default)]
    pub rpc_compression: RpcCompressionConfig,

    /// Configuration for the Watch mechanism that monitors key changes
    /// Controls event queue sizes and metrics behavior
    #[serde(default)]
    pub watch: WatchConfig,
}

// The manual Debug impl prints only the type name; field contents are omitted.
impl Debug for RaftConfig {
    fn fmt(
        &self,
        f: &mut std::fmt::Formatter<'_>,
    ) -> std::fmt::Result {
        f.debug_struct("RaftConfig").finish()
    }
}

impl Default for RaftConfig {
    fn default() -> Self {
        Self {
            replication: ReplicationConfig::default(),
            election: ElectionConfig::default(),
            membership: MembershipConfig::default(),
            commit_handler: CommitHandlerConfig::default(),
            state_machine: StateMachineConfig::default(),
            snapshot: SnapshotConfig::default(),
            persistence: PersistenceConfig::default(),
            learner_catchup_threshold: default_learner_catchup_threshold(),
            learner_check_throttle_ms: default_learner_check_throttle_ms(),
            general_raft_timeout_duration_in_ms: default_general_timeout(),
            auto_join: AutoJoinConfig::default(),
            snapshot_rpc_timeout_ms: default_snapshot_rpc_timeout_ms(),
            read_consistency: ReadConsistencyConfig::default(),
            rpc_compression: RpcCompressionConfig::default(),
            watch: WatchConfig::default(),
        }
    }
}

impl RaftConfig {
    /// Validates all Raft subsystem configurations
    pub fn validate(&self) -> Result<()> {
        if self.learner_catchup_threshold == 0 {
            return Err(Error::Config(ConfigError::Message(
                "learner_catchup_threshold must be greater than 0".into(),
            )));
        }

        if self.general_raft_timeout_duration_in_ms == 0 {
            return Err(Error::Config(ConfigError::Message(
                "general_raft_timeout_duration_in_ms must be greater than 0".into(),
            )));
        }

        self.replication.validate()?;
        self.election.validate()?;
        self.membership.validate()?;
        self.commit_handler.validate()?;
        self.state_machine.validate()?;
        self.snapshot.validate()?;
        self.read_consistency.validate()?;
        self.watch.validate()?;

        // Warn if the lease duration is too long relative to the election timeout
        if self.read_consistency.lease_duration_ms > self.election.election_timeout_min / 2 {
            warn!(
                "read_consistency.lease_duration_ms ({}) is greater than half of election_timeout_min ({}ms). \
                 This may cause lease expiration during normal operation.",
                self.read_consistency.lease_duration_ms,
                self.election.election_timeout_min / 2
            );
        }

        Ok(())
    }
}

fn default_learner_catchup_threshold() -> u64 {
    1
}

fn default_learner_check_throttle_ms() -> u64 {
    1000 // 1 second
}

/// Default general Raft timeout in milliseconds
fn default_general_timeout() -> u64 {
    50
}

fn default_snapshot_rpc_timeout_ms() -> u64 {
    // 1 hour - sufficient for large snapshots
    3_600_000
}
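
/// Log replication configuration
///
/// Example TOML, assuming the `[raft.*]` nesting used elsewhere in this file
/// (values shown are the defaults):
///
/// ```toml
/// [raft.replication]
/// rpc_append_entries_clock_in_ms = 100
/// rpc_append_entries_in_batch_threshold = 100
/// rpc_append_entries_batch_process_delay_in_ms = 1
/// append_entries_max_entries_per_replication = 100
/// ```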
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ReplicationConfig {
    /// Interval (ms) of the AppendEntries replication clock
    #[serde(default = "default_append_interval")]
    pub rpc_append_entries_clock_in_ms: u64,

    /// Number of buffered requests that triggers batch processing
    #[serde(default = "default_batch_threshold")]
    pub rpc_append_entries_in_batch_threshold: usize,

    /// Delay (ms) before a pending batch is processed; must be less than the clock interval
    #[serde(default = "default_batch_delay")]
    pub rpc_append_entries_batch_process_delay_in_ms: u64,

    /// Maximum number of log entries sent per replication round
    #[serde(default = "default_entries_per_replication")]
    pub append_entries_max_entries_per_replication: u64,
}

impl Default for ReplicationConfig {
    fn default() -> Self {
        Self {
            rpc_append_entries_clock_in_ms: default_append_interval(),
            rpc_append_entries_in_batch_threshold: default_batch_threshold(),
            rpc_append_entries_batch_process_delay_in_ms: default_batch_delay(),
            append_entries_max_entries_per_replication: default_entries_per_replication(),
        }
    }
}

impl ReplicationConfig {
    fn validate(&self) -> Result<()> {
        if self.rpc_append_entries_clock_in_ms == 0 {
            return Err(Error::Config(ConfigError::Message(
                "rpc_append_entries_clock_in_ms cannot be 0".into(),
            )));
        }

        if self.rpc_append_entries_in_batch_threshold == 0 {
            return Err(Error::Config(ConfigError::Message(
                "rpc_append_entries_in_batch_threshold must be > 0".into(),
            )));
        }

        if self.append_entries_max_entries_per_replication == 0 {
            return Err(Error::Config(ConfigError::Message(
                "append_entries_max_entries_per_replication must be > 0".into(),
            )));
        }

        if self.rpc_append_entries_batch_process_delay_in_ms >= self.rpc_append_entries_clock_in_ms
        {
            return Err(Error::Config(ConfigError::Message(format!(
                "batch_delay {}ms should be less than append_interval {}ms",
                self.rpc_append_entries_batch_process_delay_in_ms,
                self.rpc_append_entries_clock_in_ms
            ))));
        }

        Ok(())
    }
}

fn default_append_interval() -> u64 {
    100
}

fn default_batch_threshold() -> usize {
    100
}

fn default_batch_delay() -> u64 {
    1
}

fn default_entries_per_replication() -> u64 {
    100
}
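
/// Leader election configuration
///
/// Example TOML, assuming the `[raft.*]` nesting used elsewhere in this file
/// (values shown are the defaults; field names match the serde keys exactly):
///
/// ```toml
/// [raft.election]
/// election_timeout_min = 500
/// election_timeout_max = 1000
/// rpc_peer_connectinon_monitor_interval_in_sec = 30
/// internal_rpc_client_request_id = 0
/// ```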
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ElectionConfig {
    /// Lower bound (ms) of the randomized election timeout
    #[serde(default = "default_election_timeout_min")]
    pub election_timeout_min: u64,

    /// Upper bound (ms) of the randomized election timeout; must be greater than the minimum
    #[serde(default = "default_election_timeout_max")]
    pub election_timeout_max: u64,

    /// Interval (seconds) between peer connection monitor checks
    #[serde(default = "default_peer_monitor_interval")]
    pub rpc_peer_connectinon_monitor_interval_in_sec: u64,

    /// Request ID used by the internal RPC client
    #[serde(default = "default_client_request_id")]
    pub internal_rpc_client_request_id: u32,
}

impl Default for ElectionConfig {
    fn default() -> Self {
        Self {
            election_timeout_min: default_election_timeout_min(),
            election_timeout_max: default_election_timeout_max(),
            rpc_peer_connectinon_monitor_interval_in_sec: default_peer_monitor_interval(),
            internal_rpc_client_request_id: default_client_request_id(),
        }
    }
}

impl ElectionConfig {
    fn validate(&self) -> Result<()> {
        if self.election_timeout_min >= self.election_timeout_max {
            return Err(Error::Config(ConfigError::Message(format!(
                "election_timeout_min {}ms must be less than election_timeout_max {}ms",
                self.election_timeout_min, self.election_timeout_max
            ))));
        }

        if self.rpc_peer_connectinon_monitor_interval_in_sec == 0 {
            return Err(Error::Config(ConfigError::Message(
                "rpc_peer_connectinon_monitor_interval_in_sec cannot be 0".into(),
            )));
        }

        Ok(())
    }
}

fn default_election_timeout_min() -> u64 {
    500
}

fn default_election_timeout_max() -> u64 {
    1000
}

fn default_peer_monitor_interval() -> u64 {
    30
}

fn default_client_request_id() -> u32 {
    0
}

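/// Cluster membership configuration
///
/// Example TOML, assuming the `[raft.*]` nesting used elsewhere in this file.
/// `Duration` fields use serde's standard `{ secs, nanos }` encoding; values
/// shown are the defaults:
///
/// ```toml
/// [raft.membership]
/// cluster_healthcheck_probe_service_name = "d_engine.server.cluster.ClusterManagementService"
/// verify_leadership_persistent_timeout = { secs = 3600, nanos = 0 }
/// membership_maintenance_interval = { secs = 30, nanos = 0 }
///
/// [raft.membership.zombie]
/// threshold = 3
/// purge_interval = { secs = 30, nanos = 0 }
/// ```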
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct MembershipConfig {
    /// gRPC service name probed during cluster health checks
    #[serde(default = "default_probe_service")]
    pub cluster_healthcheck_probe_service_name: String,

    /// How long the leader keeps retrying leadership verification before stepping down
    #[serde(default = "default_verify_leadership_persistent_timeout")]
    pub verify_leadership_persistent_timeout: Duration,

    /// Interval between membership maintenance runs
    #[serde(default = "default_membership_maintenance_interval")]
    pub membership_maintenance_interval: Duration,

    /// Configuration for detecting and purging zombie peers
    #[serde(default)]
    pub zombie: ZombieConfig,

    /// Configuration settings for promoting ready learners
    #[serde(default)]
    pub promotion: PromotionConfig,
}

impl Default for MembershipConfig {
    fn default() -> Self {
        Self {
            cluster_healthcheck_probe_service_name: default_probe_service(),
            verify_leadership_persistent_timeout: default_verify_leadership_persistent_timeout(),
            membership_maintenance_interval: default_membership_maintenance_interval(),
            zombie: ZombieConfig::default(),
            promotion: PromotionConfig::default(),
        }
    }
}

fn default_probe_service() -> String {
    "d_engine.server.cluster.ClusterManagementService".to_string()
}

/// Default membership maintenance interval: 30 seconds
fn default_membership_maintenance_interval() -> Duration {
    Duration::from_secs(30)
}

/// Default timeout for the leader to keep verifying its leadership.
///
/// In Raft, the leader may retry sending no-op entries to confirm it still holds leadership.
/// This timeout defines how long the leader will keep retrying before stepping down.
///
/// Default: 1 hour.
fn default_verify_leadership_persistent_timeout() -> Duration {
    Duration::from_secs(3600)
}

impl MembershipConfig {
    fn validate(&self) -> Result<()> {
        if self.cluster_healthcheck_probe_service_name.is_empty() {
            return Err(Error::Config(ConfigError::Message(
                "cluster_healthcheck_probe_service_name cannot be empty".into(),
            )));
        }
        Ok(())
    }
}

/// Commit handler (submit processor) configuration
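///
/// Example TOML, assuming the `[raft.*]` nesting used elsewhere in this file
/// (values shown are the defaults):
///
/// ```toml
/// [raft.commit_handler]
/// batch_size_threshold = 100
/// process_interval_ms = 10
/// max_entries_per_chunk = 10
/// ```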
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct CommitHandlerConfig {
    /// Number of pending entries that triggers a batch apply
    #[serde(default = "default_batch_size_threshold")]
    pub batch_size_threshold: u64,

    /// Interval (ms) at which pending commits are processed
    #[serde(default = "default_process_interval_ms")]
    pub process_interval_ms: u64,

    /// Maximum number of entries applied per chunk
    #[serde(default = "default_max_entries_per_chunk")]
    pub max_entries_per_chunk: usize,
}

impl Default for CommitHandlerConfig {
    fn default() -> Self {
        Self {
            batch_size_threshold: default_batch_size_threshold(),
            process_interval_ms: default_process_interval_ms(),
            max_entries_per_chunk: default_max_entries_per_chunk(),
        }
    }
}

impl CommitHandlerConfig {
    fn validate(&self) -> Result<()> {
        if self.batch_size_threshold == 0 {
            return Err(Error::Config(ConfigError::Message(
                "batch_size_threshold must be > 0".into(),
            )));
        }

        if self.process_interval_ms == 0 {
            return Err(Error::Config(ConfigError::Message(
                "process_interval_ms must be > 0".into(),
            )));
        }

        if self.max_entries_per_chunk == 0 {
            return Err(Error::Config(ConfigError::Message(
                "max_entries_per_chunk must be > 0".into(),
            )));
        }

        Ok(())
    }
}

fn default_batch_size_threshold() -> u64 {
    100
}

fn default_process_interval_ms() -> u64 {
    10
}

fn default_max_entries_per_chunk() -> usize {
    10
}

/// State machine behavior configuration
///
/// Controls state machine operations including lease management, compaction policies,
/// and other data lifecycle features. This configuration affects how the state machine
/// processes applied log entries and manages data.
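///
/// Example TOML. The `storage` alias on the parent field and the `ttl` alias on
/// `lease` accept the same keys, so both forms below are equivalent (the
/// `LeaseConfig` keys themselves are defined elsewhere):
///
/// ```toml
/// # Preferred form
/// [raft.state_machine.lease]
/// # ... LeaseConfig keys ...
///
/// # Backward-compatible aliases
/// [raft.storage.ttl]
/// # ... same keys ...
/// ```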
#[derive(Serialize, Deserialize, Clone, Debug, Default)]
pub struct StateMachineConfig {
    /// Lease (time-based expiration) configuration
    ///
    /// For backward compatibility, can also be configured via `ttl` in TOML files.
    #[serde(alias = "ttl")]
    pub lease: LeaseConfig,
}

impl StateMachineConfig {
    pub fn validate(&self) -> Result<()> {
        self.lease.validate()?;
        Ok(())
    }
}

/// Snapshot feature configuration
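///
/// Example TOML for the most commonly tuned fields, assuming the `[raft.*]`
/// nesting used elsewhere in this file (scalar values shown are the defaults;
/// `Duration` fields use serde's standard `{ secs, nanos }` encoding):
///
/// ```toml
/// [raft.snapshot]
/// enable = true
/// max_log_entries_before_snapshot = 1000
/// snapshot_cool_down_since_last_check = { secs = 3600, nanos = 0 }
/// cleanup_retain_count = 2
/// snapshots_dir = "/tmp/snapshots"
/// chunk_size = 1024
/// ```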
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct SnapshotConfig {
    /// Whether the snapshot feature is enabled
    #[serde(default = "default_snapshot_enabled")]
    pub enable: bool,

    /// Maximum number of log entries to accumulate before triggering snapshot creation
    /// This helps control memory usage by enforcing periodic state compaction
    #[serde(default = "default_max_log_entries_before_snapshot")]
    pub max_log_entries_before_snapshot: u64,

    /// Minimum duration to wait between consecutive snapshot checks.
    /// Acts as a cooldown period to avoid overly frequent snapshot evaluations.
    #[serde(default = "default_snapshot_cool_down_since_last_check")]
    pub snapshot_cool_down_since_last_check: Duration,

    /// Number of historical snapshot versions to retain during cleanup
    /// Ensures we maintain a safety buffer of previous states for recovery
    #[serde(default = "default_cleanup_retain_count")]
    pub cleanup_retain_count: u64,

    /// Snapshot storage directory
    ///
    /// Default: `default_snapshots_dir()` (/tmp/snapshots)
    #[serde(default = "default_snapshots_dir")]
    pub snapshots_dir: PathBuf,

    /// Prefix for snapshot directory names
    #[serde(default = "default_snapshots_dir_prefix")]
    pub snapshots_dir_prefix: String,

    /// Size (in bytes) of individual chunks when transferring snapshots
    ///
    /// Default: `default_chunk_size()` (1KB)
    #[serde(default = "default_chunk_size")]
    pub chunk_size: usize,

    /// Number of log entries to retain (must be >= 1)
    #[serde(default = "default_retained_log_entries")]
    pub retained_log_entries: u64,

    /// Number of chunks the sender processes before yielding the task
    #[serde(default = "default_sender_yield_every_n_chunks")]
    pub sender_yield_every_n_chunks: usize,

    /// Number of chunks the receiver processes before yielding the task
    #[serde(default = "default_receiver_yield_every_n_chunks")]
    pub receiver_yield_every_n_chunks: usize,

    /// Maximum snapshot transfer bandwidth (Mbps)
    #[serde(default = "default_max_bandwidth_mbps")]
    pub max_bandwidth_mbps: u32,

    /// Capacity of the snapshot push queue
    #[serde(default = "default_push_queue_size")]
    pub push_queue_size: usize,

    /// Capacity of the snapshot cache
    #[serde(default = "default_cache_size")]
    pub cache_size: usize,

    /// Maximum number of retries for a failed transfer
    #[serde(default = "default_max_retries")]
    pub max_retries: u32,

    /// Overall snapshot transfer timeout (seconds)
    #[serde(default = "default_transfer_timeout_in_sec")]
    pub transfer_timeout_in_sec: u64,

    /// Interval (ms) between transfer retries
    #[serde(default = "default_retry_interval_in_ms")]
    pub retry_interval_in_ms: u64,

    /// Backoff (ms) between snapshot push attempts
    #[serde(default = "default_snapshot_push_backoff_in_ms")]
    pub snapshot_push_backoff_in_ms: u64,

    /// Maximum number of snapshot push retries (must be >= 1)
    #[serde(default = "default_snapshot_push_max_retry")]
    pub snapshot_push_max_retry: u32,

    /// Timeout (ms) for a snapshot push operation
    #[serde(default = "default_push_timeout_in_ms")]
    pub push_timeout_in_ms: u64,
}

impl Default for SnapshotConfig {
    fn default() -> Self {
        Self {
            enable: default_snapshot_enabled(),
            max_log_entries_before_snapshot: default_max_log_entries_before_snapshot(),
            snapshot_cool_down_since_last_check: default_snapshot_cool_down_since_last_check(),
            cleanup_retain_count: default_cleanup_retain_count(),
            snapshots_dir: default_snapshots_dir(),
            snapshots_dir_prefix: default_snapshots_dir_prefix(),
            chunk_size: default_chunk_size(),
            retained_log_entries: default_retained_log_entries(),
            sender_yield_every_n_chunks: default_sender_yield_every_n_chunks(),
            receiver_yield_every_n_chunks: default_receiver_yield_every_n_chunks(),
            max_bandwidth_mbps: default_max_bandwidth_mbps(),
            push_queue_size: default_push_queue_size(),
            cache_size: default_cache_size(),
            max_retries: default_max_retries(),
            transfer_timeout_in_sec: default_transfer_timeout_in_sec(),
            retry_interval_in_ms: default_retry_interval_in_ms(),
            snapshot_push_backoff_in_ms: default_snapshot_push_backoff_in_ms(),
            snapshot_push_max_retry: default_snapshot_push_max_retry(),
            push_timeout_in_ms: default_push_timeout_in_ms(),
        }
    }
}

impl SnapshotConfig {
    fn validate(&self) -> Result<()> {
        if self.max_log_entries_before_snapshot == 0 {
            return Err(Error::Config(ConfigError::Message(
                "max_log_entries_before_snapshot must be greater than 0".into(),
            )));
        }

        if self.cleanup_retain_count == 0 {
            return Err(Error::Config(ConfigError::Message(
                "cleanup_retain_count must be greater than 0".into(),
            )));
        }

        // Validate storage paths
        validate_directory(&self.snapshots_dir, "snapshots_dir")?;

        if self.chunk_size == 0 {
            return Err(Error::Config(ConfigError::Message(format!(
                "chunk_size must be greater than 0 bytes (got {})",
                self.chunk_size
            ))));
        }

        if self.retained_log_entries == 0 {
            return Err(Error::Config(ConfigError::Message(format!(
                "retained_log_entries must be >= 1 (got {})",
                self.retained_log_entries
            ))));
        }

        if self.sender_yield_every_n_chunks == 0 {
            return Err(Error::Config(ConfigError::Message(format!(
                "sender_yield_every_n_chunks must be >= 1 (got {})",
                self.sender_yield_every_n_chunks
            ))));
        }

        if self.receiver_yield_every_n_chunks == 0 {
            return Err(Error::Config(ConfigError::Message(format!(
                "receiver_yield_every_n_chunks must be >= 1 (got {})",
                self.receiver_yield_every_n_chunks
            ))));
        }

        if self.push_queue_size == 0 {
            return Err(Error::Config(ConfigError::Message(format!(
                "push_queue_size must be >= 1 (got {})",
                self.push_queue_size
            ))));
        }

        if self.snapshot_push_max_retry == 0 {
            return Err(Error::Config(ConfigError::Message(format!(
                "snapshot_push_max_retry must be >= 1 (got {})",
                self.snapshot_push_max_retry
            ))));
        }

        Ok(())
    }
}

fn default_snapshot_enabled() -> bool {
    true
}

/// Default threshold for triggering snapshot creation
fn default_max_log_entries_before_snapshot() -> u64 {
    1000
}

/// Default cooldown duration between snapshot checks.
///
/// Prevents constant evaluation of snapshot conditions in tight loops.
/// Currently set to 1 hour (3600 seconds).
fn default_snapshot_cool_down_since_last_check() -> Duration {
    Duration::from_secs(3600)
}

/// Default number of historical snapshots to retain
fn default_cleanup_retain_count() -> u64 {
    2
}

/// Default snapshots storage path
fn default_snapshots_dir() -> PathBuf {
    PathBuf::from("/tmp/snapshots")
}

/// Default snapshots directory prefix
fn default_snapshots_dir_prefix() -> String {
    "snapshot-".to_string()
}

/// 1KB chunks by default
fn default_chunk_size() -> usize {
    1024
}

/// Automatic cluster join configuration for new nodes
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct AutoJoinConfig {
    /// Whether RPC compression is enabled for auto-join traffic
    #[serde(default = "default_rpc_enable_compression")]
    pub rpc_enable_compression: bool,
}

impl Default for AutoJoinConfig {
    fn default() -> Self {
        Self {
            rpc_enable_compression: default_rpc_enable_compression(),
        }
    }
}

fn default_rpc_enable_compression() -> bool {
    true
}

// Defaults for the remaining SnapshotConfig fields

fn default_retained_log_entries() -> u64 {
    1
}

fn default_sender_yield_every_n_chunks() -> usize {
    1
}

fn default_receiver_yield_every_n_chunks() -> usize {
    1
}

fn default_max_bandwidth_mbps() -> u32 {
    1
}

fn default_push_queue_size() -> usize {
    100
}

fn default_cache_size() -> usize {
    10000
}

fn default_max_retries() -> u32 {
    1
}

fn default_transfer_timeout_in_sec() -> u64 {
    600
}

fn default_retry_interval_in_ms() -> u64 {
    10
}

fn default_snapshot_push_backoff_in_ms() -> u64 {
    100
}

fn default_snapshot_push_max_retry() -> u32 {
    3
}

fn default_push_timeout_in_ms() -> u64 {
    300_000
}

/// Zombie peer detection and cleanup configuration
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ZombieConfig {
    /// Number of consecutive connection failures before a peer is treated as a zombie
    #[serde(default = "default_zombie_threshold")]
    pub threshold: u32,

    /// Interval at which zombie peers are purged
    #[serde(default = "default_zombie_purge_interval")]
    pub purge_interval: Duration,
}

impl Default for ZombieConfig {
    fn default() -> Self {
        Self {
            threshold: default_zombie_threshold(),
            purge_interval: default_zombie_purge_interval(),
        }
    }
}

fn default_zombie_threshold() -> u32 {
    3
}

/// Default zombie purge interval: 30 seconds
fn default_zombie_purge_interval() -> Duration {
    Duration::from_secs(30)
}

/// Learner promotion configuration
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct PromotionConfig {
    /// How long a learner may go without progress before it is considered stale
    #[serde(default = "default_stale_learner_threshold")]
    pub stale_learner_threshold: Duration,

    /// Interval at which learners are checked for staleness
    #[serde(default = "default_stale_check_interval")]
    pub stale_check_interval: Duration,
}

impl Default for PromotionConfig {
    fn default() -> Self {
        Self {
            stale_learner_threshold: default_stale_learner_threshold(),
            stale_check_interval: default_stale_check_interval(),
        }
    }
}

/// Default stale learner threshold: 5 minutes
fn default_stale_learner_threshold() -> Duration {
    Duration::from_secs(300)
}

/// Default stale check interval: 30 seconds
fn default_stale_check_interval() -> Duration {
    Duration::from_secs(30)
}

/// Defines how Raft log entries are persisted and accessed.
///
/// All strategies use a configurable [`FlushPolicy`] to control when memory contents
/// are flushed to disk, affecting write latency and durability guarantees.
///
/// **Note:** Both strategies now fully load all log entries from disk into memory at startup.
/// The in-memory `SkipMap` serves as the primary data structure for reads in all modes.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum PersistenceStrategy {
    /// Disk-first persistence strategy.
    ///
    /// - **Write path**: On append, the log entry is first written to disk. Only after a successful
    ///   disk write is it acknowledged and stored in the in-memory `SkipMap`.
    ///
    /// - **Read path**: Reads are always served from the in-memory `SkipMap`.
    ///
    /// - **Startup behavior**: All log entries are loaded from disk into memory at startup,
    ///   ensuring consistent access speed regardless of disk state.
    ///
    /// - Suitable for systems prioritizing strong durability while still providing in-memory
    ///   performance for reads.
    DiskFirst,

    /// Memory-first persistence strategy.
    ///
    /// - **Write path**: On append, the log entry is first written to the in-memory `SkipMap` and
    ///   acknowledged immediately. Disk persistence happens asynchronously in the background,
    ///   governed by [`FlushPolicy`].
    ///
    /// - **Read path**: Reads are always served from the in-memory `SkipMap`.
    ///
    /// - **Startup behavior**: All log entries are loaded from disk into memory at startup, the
    ///   same as `DiskFirst`.
    ///
    /// - Suitable for systems that favor lower write latency and faster failover, while still
    ///   retaining a disk-backed log for crash recovery.
    MemFirst,
}

/// Controls when in-memory logs should be flushed to disk.
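///
/// With serde's default externally tagged enum encoding, the two variants are
/// written in TOML as a plain string or an inline table. A sketch, assuming
/// the `[raft.*]` nesting used elsewhere in this file:
///
/// ```toml
/// [raft.persistence]
/// # Flush every write immediately:
/// # flush_policy = "Immediate"
/// # Or batch flushes by count and interval:
/// flush_policy = { Batch = { threshold = 1024, interval_ms = 100 } }
/// ```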
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum FlushPolicy {
    /// Flush each log write immediately to disk.
    ///
    /// - Guarantees the highest durability.
    /// - Each append operation causes a disk write.
    Immediate,

    /// Flush entries to disk when either of two conditions is met:
    /// - The number of unflushed entries reaches the given threshold.
    /// - The elapsed time since the last flush exceeds the configured interval.
    ///
    /// - Balances performance and durability.
    /// - Recent unflushed entries may be lost in the event of a crash or power failure.
    Batch { threshold: usize, interval_ms: u64 },
}

/// Configuration parameters for log persistence behavior
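///
/// Example TOML with the default values (a sketch, assuming the `[raft.*]`
/// nesting used elsewhere in this file):
///
/// ```toml
/// [raft.persistence]
/// strategy = "MemFirst"
/// flush_policy = { Batch = { threshold = 1024, interval_ms = 100 } }
/// max_buffered_entries = 10000
/// flush_workers = 2
/// channel_capacity = 100
/// ```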
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct PersistenceConfig {
    /// Strategy for persisting Raft logs
    ///
    /// This controls the trade-off between durability guarantees and performance
    /// characteristics. The choice impacts both write throughput and recovery
    /// behavior after node failures.
    #[serde(default = "default_persistence_strategy")]
    pub strategy: PersistenceStrategy,

    /// Flush policy for asynchronous strategies
    ///
    /// This controls when log entries are flushed to disk. The choice impacts
    /// write performance and durability guarantees.
    #[serde(default = "default_flush_policy")]
    pub flush_policy: FlushPolicy,

    /// Maximum number of in-memory log entries to buffer when using async strategies
    ///
    /// This acts as a safety valve to prevent memory exhaustion during periods of
    /// high write throughput or when disk persistence is slow.
    #[serde(default = "default_max_buffered_entries")]
    pub max_buffered_entries: usize,

    /// Number of flush worker threads to use for log persistence.
    ///
    /// - If set to 0, the system falls back to spawning a new task per flush (legacy behavior,
    ///   lower latency but less stable under high load).
    /// - If set to a positive number, a worker pool of that size will be created to process flush
    ///   requests (more stable and efficient under high load).
    ///
    /// This parameter allows tuning between throughput and latency depending on
    /// workload characteristics.
    #[serde(default = "default_flush_workers")]
    pub flush_workers: usize,

    /// Capacity of the internal task channel for flush workers.
    ///
    /// - Provides **backpressure** during high write throughput.
    /// - Prevents unbounded task accumulation in memory when disk I/O is slow.
    /// - Larger values improve throughput at the cost of higher memory usage, while smaller values
    ///   apply stricter flow control but may reduce parallelism.
    #[serde(default = "default_channel_capacity")]
    pub channel_capacity: usize,
}

/// Default persistence strategy (optimized for balanced workloads)
fn default_persistence_strategy() -> PersistenceStrategy {
    PersistenceStrategy::MemFirst
}

/// Default value for flush_workers
fn default_flush_workers() -> usize {
    2
}

/// Default value for channel_capacity
fn default_channel_capacity() -> usize {
    100
}

/// Default flush policy for asynchronous strategies
///
/// This controls when log entries are flushed to disk. The choice impacts
/// write performance and durability guarantees.
fn default_flush_policy() -> FlushPolicy {
    FlushPolicy::Batch {
        threshold: 1024,
        interval_ms: 100,
    }
}

/// Default maximum buffered log entries
fn default_max_buffered_entries() -> usize {
    10_000
}

impl Default for PersistenceConfig {
    fn default() -> Self {
        Self {
            strategy: default_persistence_strategy(),
            flush_policy: default_flush_policy(),
            max_buffered_entries: default_max_buffered_entries(),
            flush_workers: default_flush_workers(),
            channel_capacity: default_channel_capacity(),
        }
    }
}

/// Policy for read operation consistency guarantees
///
/// Determines the trade-off between read consistency and performance.
/// Clients can choose the appropriate level based on their requirements.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub enum ReadConsistencyPolicy {
    /// Lease-based reads for better performance with weaker consistency
    ///
    /// The leader serves reads locally without contacting followers
    /// during the valid lease period. Assumes bounded clock drift between nodes.
    /// Provides lower latency but slightly weaker consistency guarantees
    /// compared to LinearizableRead.
    LeaseRead,

    /// Fully linearizable reads for strongest consistency
    ///
    /// The leader verifies its leadership with a quorum before serving
    /// the read, ensuring strict linearizability. This guarantees that
    /// all reads reflect the most recent committed value in the cluster.
    #[default]
    LinearizableRead,

    /// Eventually consistent reads from any node
    ///
    /// Allows reading from any node (leader, follower, or candidate) without
    /// additional consistency checks. May return stale data but provides
    /// best read performance and availability. Suitable for scenarios where
    /// eventual consistency is acceptable.
    /// **Can be served by non-leader nodes.**
    EventualConsistency,
}

/// Configuration for read operation consistency behavior
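///
/// Example TOML with the default values. Policy variants serialize as plain
/// strings under serde's default enum encoding (a sketch, assuming the
/// `[raft.*]` nesting used elsewhere in this file):
///
/// ```toml
/// [raft.read_consistency]
/// default_policy = "LinearizableRead"
/// lease_duration_ms = 250
/// allow_client_override = true
/// state_machine_sync_timeout_ms = 10
///
/// [raft.read_consistency.read_batching]
/// size_threshold = 50
/// time_threshold_ms = 10
/// ```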
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ReadConsistencyConfig {
    /// Default read consistency policy for the cluster
    ///
    /// This sets the cluster-wide default behavior. Individual read requests
    /// can still override this setting when needed for specific use cases.
    #[serde(default)]
    pub default_policy: ReadConsistencyPolicy,

    /// Lease duration in milliseconds for the LeaseRead policy
    ///
    /// Only applicable when using the LeaseRead policy. The leader considers
    /// itself valid for this duration after successfully heartbeating to a quorum.
    #[serde(default = "default_lease_duration_ms")]
    pub lease_duration_ms: u64,

    /// Whether to allow clients to override the default policy per request
    ///
    /// When true, clients can specify consistency requirements per read request.
    /// When false, all reads use the cluster's default_policy setting.
    #[serde(default = "default_allow_client_override")]
    pub allow_client_override: bool,

    /// Timeout in milliseconds to wait for the state machine to catch up with the commit index
    ///
    /// Used by LinearizableRead to ensure the state machine has applied all committed
    /// entries before serving reads. Typical apply latency is <1ms on local SSD.
    /// Default: 10ms (safe buffer for single-node local deployments)
    #[serde(default = "default_state_machine_sync_timeout_ms")]
    pub state_machine_sync_timeout_ms: u64,

    /// ReadIndex batching configuration
    #[serde(default)]
    pub read_batching: ReadBatchingConfig,
}

/// Read batching configuration
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ReadBatchingConfig {
    /// Flush when the buffer reaches this size
    pub size_threshold: usize,

    /// Flush when the first request ages beyond this (milliseconds)
    pub time_threshold_ms: u64,
}

impl Default for ReadBatchingConfig {
    fn default() -> Self {
        Self {
            size_threshold: 50,
            time_threshold_ms: 10,
        }
    }
}

impl Default for ReadConsistencyConfig {
    fn default() -> Self {
        Self {
            default_policy: ReadConsistencyPolicy::default(),
            lease_duration_ms: default_lease_duration_ms(),
            allow_client_override: default_allow_client_override(),
            state_machine_sync_timeout_ms: default_state_machine_sync_timeout_ms(),
            read_batching: ReadBatchingConfig::default(),
        }
    }
}

fn default_lease_duration_ms() -> u64 {
    // Conservative default: half of the default election_timeout_min (500ms),
    // matching the warning threshold in RaftConfig::validate
    250
}

fn default_allow_client_override() -> bool {
    // Allow flexibility by default; clients can choose stronger consistency when needed
    true
}

fn default_state_machine_sync_timeout_ms() -> u64 {
    10 // 10ms is safe for typical <1ms apply latency on local SSD
}

impl ReadConsistencyConfig {
    fn validate(&self) -> Result<()> {
        if self.lease_duration_ms == 0 {
            return Err(Error::Config(ConfigError::Message(
                "read_consistency.lease_duration_ms must be greater than 0".into(),
            )));
        }
        Ok(())
    }
}

impl From<d_engine_proto::client::ReadConsistencyPolicy> for ReadConsistencyPolicy {
    fn from(proto_policy: d_engine_proto::client::ReadConsistencyPolicy) -> Self {
        match proto_policy {
            d_engine_proto::client::ReadConsistencyPolicy::LeaseRead => Self::LeaseRead,
            d_engine_proto::client::ReadConsistencyPolicy::LinearizableRead => {
                Self::LinearizableRead
            }
            d_engine_proto::client::ReadConsistencyPolicy::EventualConsistency => {
                Self::EventualConsistency
            }
        }
    }
}

impl From<ReadConsistencyPolicy> for d_engine_proto::client::ReadConsistencyPolicy {
    fn from(config_policy: ReadConsistencyPolicy) -> Self {
        match config_policy {
            ReadConsistencyPolicy::LeaseRead => {
                d_engine_proto::client::ReadConsistencyPolicy::LeaseRead
            }
            ReadConsistencyPolicy::LinearizableRead => {
                d_engine_proto::client::ReadConsistencyPolicy::LinearizableRead
            }
            ReadConsistencyPolicy::EventualConsistency => {
                d_engine_proto::client::ReadConsistencyPolicy::EventualConsistency
            }
        }
    }
}

/// Configuration for controlling gRPC compression settings per service type
///
/// Provides fine-grained control over when to enable compression based on
/// the RPC service type and deployment environment. Each service can be
/// independently configured to use compression based on its data
/// characteristics and frequency.
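///
/// Example TOML with the default values (a sketch, assuming the `[raft.*]`
/// nesting used elsewhere in this file):
///
/// ```toml
/// [raft.rpc_compression]
/// replication_response = false
/// election_response = true
/// snapshot_response = true
/// cluster_response = true
/// client_response = false
/// ```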
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RpcCompressionConfig {
    /// Controls compression for Raft replication response data
    ///
    /// Replication traffic is typically high-frequency with small payloads
    /// in LAN environments, making compression less beneficial. In WAN
    /// deployments with bandwidth constraints, enabling may help.
    ///
    /// **Default**: `false` - Optimized for LAN/same-VPC deployments
    #[serde(default = "default_replication_compression")]
    pub replication_response: bool,

    /// Controls compression for Raft election response data
    ///
    /// Election traffic is low-frequency but time-sensitive. Compression
    /// rarely benefits election traffic due to small payload size.
    ///
    /// **Default**: `true` for backward compatibility
    #[serde(default = "default_election_compression")]
    pub election_response: bool,

    /// Controls compression for snapshot transfer response data
    ///
    /// Snapshot transfers involve large data volumes where compression
    /// is typically beneficial, even in low-latency environments.
    ///
    /// **Default**: `true` - Recommended for all environments
    #[serde(default = "default_snapshot_compression")]
    pub snapshot_response: bool,

    /// Controls compression for cluster management response data
    ///
    /// Cluster operations are infrequent but may contain configuration data.
    /// Compression is generally beneficial for these operations.
    ///
    /// **Default**: `true` for backward compatibility
    #[serde(default = "default_cluster_compression")]
    pub cluster_response: bool,

    /// Controls compression for client request response data
    ///
    /// Client responses may vary in size. In LAN/VPC environments,
    /// compression CPU overhead typically outweighs network benefits.
    ///
    /// **Default**: `false` - Optimized for LAN/same-VPC deployments
    #[serde(default = "default_client_compression")]
    pub client_response: bool,
}

impl Default for RpcCompressionConfig {
    fn default() -> Self {
        Self {
            replication_response: default_replication_compression(),
            election_response: default_election_compression(),
            snapshot_response: default_snapshot_compression(),
            cluster_response: default_cluster_compression(),
            client_response: default_client_compression(),
        }
    }
}

// Default values for RPC compression settings
fn default_replication_compression() -> bool {
    // Replication traffic is high-frequency with typically small payloads
    // For LAN/VPC deployments, compression adds CPU overhead without significant benefit
    false
}

fn default_election_compression() -> bool {
    // Kept enabled for backward compatibility, though minimal benefit
    true
}

fn default_snapshot_compression() -> bool {
    // Snapshot data is large and benefits from compression in all environments
    true
}

fn default_cluster_compression() -> bool {
    // Kept enabled for backward compatibility
    true
}

fn default_client_compression() -> bool {
    // Client responses in LAN/VPC environments typically do not benefit from compression
    false
}

/// Configuration for the Watch mechanism that monitors key changes
///
/// The watch system allows clients to monitor specific keys for changes with
/// minimal overhead on the write path. It uses a lock-free event queue and
/// configurable buffer sizes to balance performance and memory usage.
///
/// # Performance Characteristics
///
/// - Write path overhead: < 0.01% with 100+ watchers
/// - Event notification latency: typically < 100μs end-to-end
/// - Memory per watcher: ~2.4KB with default buffer size
///
/// # Configuration Example
///
/// ```toml
/// [raft.watch]
/// event_queue_size = 1000
/// watcher_buffer_size = 10
/// enable_metrics = false
/// ```
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct WatchConfig {
    /// Buffer size for the global event queue shared across all watchers
    ///
    /// This queue sits between the write path and the dispatcher thread.
    /// A larger queue reduces the chance of dropped events under burst load,
    /// but increases memory usage.
    ///
    /// **Performance Impact**:
    /// - Memory: ~24 bytes per slot (key + value pointers + event type)
    /// - Default 1000 slots ≈ 24KB memory
    ///
    /// **Tuning Guidelines**:
    /// - Low traffic (< 1K writes/sec): 500-1000
    /// - Medium traffic (1K-10K writes/sec): 1000-2000
    /// - High traffic (> 10K writes/sec): 2000-5000
    ///
    /// **Default**: 1000
    #[serde(default = "default_event_queue_size")]
    pub event_queue_size: usize,

    /// Buffer size for each individual watcher's channel
    ///
    /// Each registered watcher gets its own channel to receive events.
    /// Smaller buffers reduce memory usage but increase the risk of
    /// dropping events for slow consumers.
    ///
    /// **Performance Impact**:
    /// - Memory: ~240 bytes per slot per watcher
    /// - 10 slots × 100 watchers = ~240KB total
    ///
    /// **Tuning Guidelines**:
    /// - Fast consumers (< 1ms processing): 5-10
    /// - Normal consumers (1-10ms processing): 10-20
    /// - Slow consumers (> 10ms processing): 20-50
    ///
    /// **Default**: 10
    #[serde(default = "default_watcher_buffer_size")]
    pub watcher_buffer_size: usize,

    /// Enable detailed metrics and logging for watch operations
    ///
    /// When enabled, logs warnings for dropped events and tracks watch
    /// performance metrics. Adds minimal overhead (~0.001%) but is useful
    /// for debugging and monitoring.
    ///
    /// **Default**: false (minimal overhead in production)
    #[serde(default = "default_enable_watch_metrics")]
    pub enable_metrics: bool,
}

impl Default for WatchConfig {
    fn default() -> Self {
        Self {
            event_queue_size: default_event_queue_size(),
            watcher_buffer_size: default_watcher_buffer_size(),
            enable_metrics: default_enable_watch_metrics(),
        }
    }
}

impl WatchConfig {
    /// Validates watch configuration parameters
    pub fn validate(&self) -> Result<()> {
        if self.event_queue_size == 0 {
            return Err(Error::Config(ConfigError::Message(
                "watch.event_queue_size must be greater than 0".into(),
            )));
        }

        if self.event_queue_size > 100_000 {
            warn!(
                "watch.event_queue_size ({}) is very large and may consume significant memory (~{}MB)",
                self.event_queue_size,
                (self.event_queue_size * 24) / 1_000_000
            );
        }

        if self.watcher_buffer_size == 0 {
            return Err(Error::Config(ConfigError::Message(
                "watch.watcher_buffer_size must be greater than 0".into(),
            )));
        }

        if self.watcher_buffer_size > 1000 {
            warn!(
                "watch.watcher_buffer_size ({}) is very large. Each watcher will consume ~{}KB memory",
                self.watcher_buffer_size,
                (self.watcher_buffer_size * 240) / 1000
            );
        }

        Ok(())
    }
}

const fn default_event_queue_size() -> usize {
    1000
}

const fn default_watcher_buffer_size() -> usize {
    10
}

const fn default_enable_watch_metrics() -> bool {
    false
}