Skip to main content

d_engine_core/config/
raft.rs

1use std::fmt::Debug;
2use std::path::PathBuf;
3use std::time::Duration;
4
5use config::ConfigError;
6use serde::Deserialize;
7use serde::Serialize;
8use tracing::warn;
9
10use super::lease::LeaseConfig;
11use super::validate_directory;
12use crate::Error;
13use crate::Result;
14
/// Configuration parameters for the Raft consensus algorithm implementation
#[derive(Serialize, Deserialize, Clone)]
pub struct RaftConfig {
    /// Configuration settings related to log replication
    /// Includes parameters like heartbeat interval and AppendEntries entry count limit
    #[serde(default)]
    pub replication: ReplicationConfig,

    /// Client request batching configuration
    ///
    /// Controls flush thresholds for the leader's propose and linearizable-read buffers.
    /// Separate from replication config — this governs the client ingestion layer, not
    /// the Leader→Follower replication path.
    #[serde(default)]
    pub batching: BatchingConfig,

    /// Configuration settings for leader election mechanism
    /// Controls timeouts and randomization factors for election timing
    #[serde(default)]
    pub election: ElectionConfig,

    /// Configuration settings for cluster membership changes
    /// Handles joint consensus transitions and cluster reconfiguration rules
    #[serde(default)]
    pub membership: MembershipConfig,

    /// Configuration settings for state machine behavior
    /// Controls state machine operations like lease management, compaction, etc.
    /// For backward compatibility, can also be configured via `storage` in TOML files.
    #[serde(default, alias = "storage")]
    pub state_machine: StateMachineConfig,

    /// Configuration settings for snapshot feature
    #[serde(default)]
    pub snapshot: SnapshotConfig,

    /// Configuration settings for log persistence behavior
    /// Controls how and when log entries are persisted to stable storage
    #[serde(default)]
    pub persistence: PersistenceConfig,

    /// Maximum allowed log entry gap between leader and learner nodes
    /// Learners with larger gaps than this value will trigger catch-up replication
    /// Must be > 0 (enforced by `validate`).
    /// Default: 1, via `default_learner_catchup_threshold()`
    #[serde(default = "default_learner_catchup_threshold")]
    pub learner_catchup_threshold: u64,

    /// Throttle interval (milliseconds) for learner progress checks
    /// Prevents excessive checking of learner promotion eligibility
    /// Default: 1000 ms, via `default_learner_check_throttle_ms()`
    #[serde(default = "default_learner_check_throttle_ms")]
    pub learner_check_throttle_ms: u64,

    /// Base timeout duration (in milliseconds) for general Raft operations
    /// Used as fallback timeout when operation-specific timeouts are not set
    /// Must be >= 1 ms (enforced by `validate`). Default: 50 ms, via `default_general_timeout()`
    #[serde(default = "default_general_timeout")]
    pub general_raft_timeout_duration_in_ms: u64,

    /// Timeout for snapshot RPC operations (milliseconds)
    /// Default: 3_600_000 ms (1 hour), via `default_snapshot_rpc_timeout_ms()`
    #[serde(default = "default_snapshot_rpc_timeout_ms")]
    pub snapshot_rpc_timeout_ms: u64,

    /// Configuration settings for new node auto join feature
    #[serde(default)]
    pub auto_join: AutoJoinConfig,

    /// Configuration for read operation consistency behavior
    /// Controls the trade-off between read performance and consistency guarantees
    #[serde(default)]
    pub read_consistency: ReadConsistencyConfig,

    /// Backpressure configuration for client request flow control
    /// Prevents unbounded memory growth by rejecting excess requests
    #[serde(default)]
    pub backpressure: BackpressureConfig,

    /// RPC compression configuration for different service types
    ///
    /// Controls which RPC service types use response compression.
    /// Allows fine-tuning for performance optimization based on
    /// deployment environment and traffic patterns.
    #[serde(default)]
    pub rpc_compression: RpcCompressionConfig,

    /// Configuration for Watch mechanism that monitors key changes
    /// Controls event queue sizes and metrics behavior
    #[serde(default)]
    pub watch: WatchConfig,

    /// Performance metrics configuration
    /// Controls metrics emission and sampling for observability vs performance trade-off
    #[serde(default)]
    pub metrics: MetricsConfig,
}
110
111impl Debug for RaftConfig {
112    fn fmt(
113        &self,
114        f: &mut std::fmt::Formatter<'_>,
115    ) -> std::fmt::Result {
116        f.debug_struct("RaftConfig").finish()
117    }
118}
119impl Default for RaftConfig {
120    fn default() -> Self {
121        Self {
122            replication: ReplicationConfig::default(),
123            batching: BatchingConfig::default(),
124            election: ElectionConfig::default(),
125            membership: MembershipConfig::default(),
126            state_machine: StateMachineConfig::default(),
127            snapshot: SnapshotConfig::default(),
128            persistence: PersistenceConfig::default(),
129            learner_catchup_threshold: default_learner_catchup_threshold(),
130            learner_check_throttle_ms: default_learner_check_throttle_ms(),
131            general_raft_timeout_duration_in_ms: default_general_timeout(),
132            auto_join: AutoJoinConfig::default(),
133            snapshot_rpc_timeout_ms: default_snapshot_rpc_timeout_ms(),
134            read_consistency: ReadConsistencyConfig::default(),
135            backpressure: BackpressureConfig::default(),
136            rpc_compression: RpcCompressionConfig::default(),
137            watch: WatchConfig::default(),
138            metrics: MetricsConfig::default(),
139        }
140    }
141}
142impl RaftConfig {
143    /// Validates all Raft subsystem configurations
144    pub fn validate(&self) -> Result<()> {
145        if self.learner_catchup_threshold == 0 {
146            return Err(Error::Config(ConfigError::Message(
147                "learner_catchup_threshold must be greater than 0".into(),
148            )));
149        }
150
151        if self.general_raft_timeout_duration_in_ms < 1 {
152            return Err(Error::Config(ConfigError::Message(
153                "general_raft_timeout_duration_in_ms must be at least 1ms".into(),
154            )));
155        }
156
157        self.replication.validate()?;
158        self.batching.validate()?;
159        self.election.validate()?;
160        self.membership.validate()?;
161        self.state_machine.validate()?;
162        self.snapshot.validate()?;
163        self.read_consistency.validate()?;
164        self.watch.validate()?;
165
166        // Warn if lease duration is too long compared to election timeout
167        if self.read_consistency.lease_duration_ms > self.election.election_timeout_min / 2 {
168            warn!(
169                "read_consistency.lease_duration_ms ({}) is greater than half of election_timeout_min ({}ms). \
170                     This may cause lease expiration during normal operation.",
171                self.read_consistency.lease_duration_ms,
172                self.election.election_timeout_min / 2
173            );
174        }
175
176        Ok(())
177    }
178}
179
/// Default learner catch-up threshold (log entries): 1.
/// See `RaftConfig::learner_catchup_threshold`.
fn default_learner_catchup_threshold() -> u64 {
    1
}

/// Default throttle interval for learner progress checks: 1000 ms.
fn default_learner_check_throttle_ms() -> u64 {
    1000 // 1 second
}

/// Default general Raft operation timeout, in ms.
fn default_general_timeout() -> u64 {
    50
}

/// Default snapshot RPC timeout: 1 hour, sized for large snapshot transfers.
fn default_snapshot_rpc_timeout_ms() -> u64 {
    // 1 hour - sufficient for large snapshots
    3_600_000
}
/// Leader→Follower log replication tuning.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ReplicationConfig {
    /// Heartbeat interval (milliseconds): how often the leader sends AppendEntries RPCs.
    /// Must be > 0 (enforced by `validate`). Default: 100 ms.
    #[serde(default = "default_append_interval")]
    pub rpc_append_entries_clock_in_ms: u64,

    /// Maximum log entries per single AppendEntries RPC to a follower.
    /// Must be > 0 (enforced by `validate`). Default: 100.
    #[serde(default = "default_entries_per_replication")]
    pub append_entries_max_entries_per_replication: u64,
}
206
207impl Default for ReplicationConfig {
208    fn default() -> Self {
209        Self {
210            rpc_append_entries_clock_in_ms: default_append_interval(),
211            append_entries_max_entries_per_replication: default_entries_per_replication(),
212        }
213    }
214}
215impl ReplicationConfig {
216    fn validate(&self) -> Result<()> {
217        if self.rpc_append_entries_clock_in_ms == 0 {
218            return Err(Error::Config(ConfigError::Message(
219                "rpc_append_entries_clock_in_ms cannot be 0".into(),
220            )));
221        }
222
223        if self.append_entries_max_entries_per_replication == 0 {
224            return Err(Error::Config(ConfigError::Message(
225                "append_entries_max_entries_per_replication must be > 0".into(),
226            )));
227        }
228
229        Ok(())
230    }
231}
232
/// Batching configuration for leader-side drain loops and buffer allocation.
///
/// A single value intentionally covers all drain loops because they all operate
/// within the same heartbeat period and share the same order-of-magnitude concurrency:
/// - `raft.rs` cmd_rx drain (client propose ingestion)
/// - `raft.rs` role_rx drain (commit index event coalescing)
/// - `DefaultCommitHandler` new_commit_rx drain (state machine apply batching)
///
/// Also used as the initial Vec capacity for propose and linearizable-read buffers
/// (construction-time hint only; the propose buffer retains capacity across flushes
/// via `mem::swap`, while the read buffer resets each flush via `mem::take`).
///
/// # Tuning Guidelines
/// - Low latency priority: lower values → smaller batches, faster flush
/// - Throughput priority: higher values → larger batches, fewer RPCs
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct BatchingConfig {
    /// Maximum items to drain per heartbeat period across all drain loops.
    /// Must be > 0 (enforced by `validate`).
    ///
    /// **Default**: 100
    #[serde(default = "default_max_batch_size")]
    pub max_batch_size: usize,
}
256
257impl Default for BatchingConfig {
258    fn default() -> Self {
259        Self {
260            max_batch_size: default_max_batch_size(),
261        }
262    }
263}
264
265impl BatchingConfig {
266    fn validate(&self) -> Result<()> {
267        if self.max_batch_size == 0 {
268            return Err(Error::Config(ConfigError::Message(
269                "batching.max_batch_size must be > 0".into(),
270            )));
271        }
272        Ok(())
273    }
274}
275
/// Default heartbeat interval for AppendEntries RPCs: 100 ms.
fn default_append_interval() -> u64 {
    100
}
/// Default cap on items drained per heartbeat period: 100.
fn default_max_batch_size() -> usize {
    100
}
/// Default maximum entries per AppendEntries RPC: 100.
fn default_entries_per_replication() -> u64 {
    100
}
/// Leader election timing configuration.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ElectionConfig {
    /// Lower bound (ms) of the randomized election timeout window.
    /// Must be strictly less than `election_timeout_max` (enforced by
    /// `validate`). Default: 500 ms.
    #[serde(default = "default_election_timeout_min")]
    pub election_timeout_min: u64,

    /// Upper bound (ms) of the randomized election timeout window.
    /// Default: 1000 ms.
    #[serde(default = "default_election_timeout_max")]
    pub election_timeout_max: u64,

    /// Interval (seconds) between peer RPC connection monitor runs.
    /// Must be > 0 (enforced by `validate`). Default: 30 s.
    /// NOTE(review): "connectinon" is a typo, but the field name is part of
    /// the serde/config surface — renaming would break existing config files.
    #[serde(default = "default_peer_monitor_interval")]
    pub rpc_peer_connectinon_monitor_interval_in_sec: u64,

    /// Request id used by internal RPC clients. Default: 0.
    /// NOTE(review): semantics of this id are not visible here — confirm
    /// against the RPC client implementation.
    #[serde(default = "default_client_request_id")]
    pub internal_rpc_client_request_id: u32,
}
299
300impl Default for ElectionConfig {
301    fn default() -> Self {
302        Self {
303            election_timeout_min: default_election_timeout_min(),
304            election_timeout_max: default_election_timeout_max(),
305            rpc_peer_connectinon_monitor_interval_in_sec: default_peer_monitor_interval(),
306            internal_rpc_client_request_id: default_client_request_id(),
307        }
308    }
309}
310impl ElectionConfig {
311    fn validate(&self) -> Result<()> {
312        if self.election_timeout_min >= self.election_timeout_max {
313            return Err(Error::Config(ConfigError::Message(format!(
314                "election_timeout_min {}ms must be less than election_timeout_max {}ms",
315                self.election_timeout_min, self.election_timeout_max
316            ))));
317        }
318
319        if self.rpc_peer_connectinon_monitor_interval_in_sec == 0 {
320            return Err(Error::Config(ConfigError::Message(
321                "rpc_peer_connectinon_monitor_interval_in_sec cannot be 0".into(),
322            )));
323        }
324
325        Ok(())
326    }
327}
/// Default lower bound of the election timeout window: 500 ms.
fn default_election_timeout_min() -> u64 {
    500
}
/// Default upper bound of the election timeout window: 1000 ms.
fn default_election_timeout_max() -> u64 {
    1000
}
/// Default peer-connection monitor interval: 30 seconds.
fn default_peer_monitor_interval() -> u64 {
    30
}
/// Default request id for internal RPC clients: 0.
fn default_client_request_id() -> u32 {
    0
}
340
/// Cluster membership management configuration.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct MembershipConfig {
    /// gRPC service name probed during cluster health checks.
    /// Must be non-empty (enforced by `validate`).
    #[serde(default = "default_probe_service")]
    pub cluster_healthcheck_probe_service_name: String,

    /// How long the leader keeps retrying leadership verification before
    /// giving up. Default: 1 hour.
    #[serde(default = "default_verify_leadership_persistent_timeout")]
    pub verify_leadership_persistent_timeout: Duration,

    /// Interval between periodic membership maintenance passes. Default: 30 s.
    #[serde(default = "default_membership_maintenance_interval")]
    pub membership_maintenance_interval: Duration,

    /// Zombie (repeatedly unreachable peer) detection and purge settings.
    #[serde(default)]
    pub zombie: ZombieConfig,

    /// Configuration settings for ready learners promotion
    #[serde(default)]
    pub promotion: PromotionConfig,
}
359impl Default for MembershipConfig {
360    fn default() -> Self {
361        Self {
362            cluster_healthcheck_probe_service_name: default_probe_service(),
363            verify_leadership_persistent_timeout: default_verify_leadership_persistent_timeout(),
364            membership_maintenance_interval: default_membership_maintenance_interval(),
365            zombie: ZombieConfig::default(),
366            promotion: PromotionConfig::default(),
367        }
368    }
369}
/// Default gRPC service name probed during cluster health checks.
fn default_probe_service() -> String {
    "d_engine.server.cluster.ClusterManagementService".to_string()
}

/// Default membership maintenance interval: 30 seconds.
fn default_membership_maintenance_interval() -> Duration {
    Duration::from_secs(30)
}

/// Default timeout for leader to keep verifying its leadership.
///
/// In Raft, the leader may retry sending no-op entries to confirm it still holds leadership.
/// This timeout defines how long the leader will keep retrying before stepping down.
///
/// Default: 1 hour.
fn default_verify_leadership_persistent_timeout() -> Duration {
    Duration::from_secs(3600)
}
388
389impl MembershipConfig {
390    fn validate(&self) -> Result<()> {
391        if self.cluster_healthcheck_probe_service_name.is_empty() {
392            return Err(Error::Config(ConfigError::Message(
393                "cluster_healthcheck_probe_service_name cannot be empty".into(),
394            )));
395        }
396        Ok(())
397    }
398}
399
/// State machine behavior configuration
///
/// Controls state machine operations including lease management, compaction policies,
/// and other data lifecycle features. This configuration affects how the state machine
/// processes applied log entries and manages data.
#[derive(Serialize, Deserialize, Clone, Debug)]
#[derive(Default)]
pub struct StateMachineConfig {
    /// Lease (time-based expiration) configuration
    ///
    /// For backward compatibility, can also be configured via `ttl` in TOML files.
    /// NOTE(review): unlike sibling configs, this field has no
    /// `#[serde(default)]`, so an explicitly present but empty
    /// `state_machine` table may require `lease`/`ttl` — confirm intended.
    #[serde(alias = "ttl")]
    pub lease: LeaseConfig,
}
414
impl StateMachineConfig {
    /// Validates the state-machine sub-configuration.
    ///
    /// Currently delegates entirely to the lease config's own `validate`;
    /// the `?` preserves any error-type conversion that delegation relies on.
    pub fn validate(&self) -> Result<()> {
        self.lease.validate()?;
        Ok(())
    }
}
421
/// Snapshot creation, retention, and transfer configuration.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct SnapshotConfig {
    /// Whether the snapshot feature is enabled. Default: true.
    #[serde(default = "default_snapshot_enabled")]
    pub enable: bool,

    /// Maximum number of log entries to accumulate before triggering snapshot creation
    /// This helps control memory usage by enforcing periodic state compaction
    /// Must be > 0 (enforced by `validate`). Default: 1000.
    #[serde(default = "default_max_log_entries_before_snapshot")]
    pub max_log_entries_before_snapshot: u64,

    /// Minimum duration to wait between consecutive snapshot checks.
    /// Acts as a cooldown period to avoid overly frequent snapshot evaluations.
    /// Default: 1 hour.
    #[serde(default = "default_snapshot_cool_down_since_last_check")]
    pub snapshot_cool_down_since_last_check: Duration,

    /// Number of historical snapshot versions to retain during cleanup
    /// Ensures we maintain a safety buffer of previous states for recovery
    /// Must be > 0 (enforced by `validate`). Default: 2.
    #[serde(default = "default_cleanup_retain_count")]
    pub cleanup_retain_count: u64,

    /// Snapshot storage directory
    ///
    /// Default: `default_snapshots_dir()` (/tmp/snapshots)
    #[serde(default = "default_snapshots_dir")]
    pub snapshots_dir: PathBuf,

    /// Filename prefix for snapshot directories. Default: "snapshot-".
    #[serde(default = "default_snapshots_dir_prefix")]
    pub snapshots_dir_prefix: String,

    /// Size (in bytes) of individual chunks when transferring snapshots
    ///
    /// Must be > 0 (enforced by `validate`).
    /// Default: `default_chunk_size()` (1024 bytes = 1 KiB)
    #[serde(default = "default_chunk_size")]
    pub chunk_size: usize,

    /// Number of log entries to retain (0 = disable retention)
    /// NOTE(review): `validate` rejects 0 (`must be >= 1`), contradicting the
    /// "0 = disable" note above — confirm which behavior is intended.
    /// Default: 1.
    #[serde(default = "default_retained_log_entries")]
    pub retained_log_entries: u64,

    /// Number of chunks the sender processes before yielding the task
    /// Must be >= 1 (enforced by `validate`). Default: 1.
    #[serde(default = "default_sender_yield_every_n_chunks")]
    pub sender_yield_every_n_chunks: usize,

    /// Number of chunks the receiver processes before yielding the task
    /// Must be >= 1 (enforced by `validate`). Default: 1.
    #[serde(default = "default_receiver_yield_every_n_chunks")]
    pub receiver_yield_every_n_chunks: usize,

    /// Bandwidth cap for snapshot transfer, in MB per second. Default: 1.
    #[serde(default = "default_max_bandwidth_mbps")]
    pub max_bandwidth_mbps: u32,

    /// Capacity of the snapshot push queue.
    /// Must be >= 1 (enforced by `validate`). Default: 100.
    #[serde(default = "default_push_queue_size")]
    pub push_queue_size: usize,

    /// Cache size used by the snapshot subsystem. Default: 10000.
    /// NOTE(review): the cached unit (entries? chunks?) is not visible here — confirm.
    #[serde(default = "default_cache_size")]
    pub cache_size: usize,

    /// Maximum retry attempts for snapshot operations. Default: 1.
    #[serde(default = "default_max_retries")]
    pub max_retries: u32,

    /// Overall snapshot transfer timeout, in seconds. Default: 600.
    #[serde(default = "default_transfer_timeout_in_sec")]
    pub transfer_timeout_in_sec: u64,

    /// Interval between retries, in milliseconds. Default: 10.
    #[serde(default = "default_retry_interval_in_ms")]
    pub retry_interval_in_ms: u64,

    /// Backoff between snapshot push retries, in milliseconds. Default: 100.
    #[serde(default = "default_snapshot_push_backoff_in_ms")]
    pub snapshot_push_backoff_in_ms: u64,

    /// Maximum snapshot push retries.
    /// Must be >= 1 (enforced by `validate`). Default: 3.
    #[serde(default = "default_snapshot_push_max_retry")]
    pub snapshot_push_max_retry: u32,

    /// Snapshot push timeout, in milliseconds. Default: 300_000 (5 minutes).
    #[serde(default = "default_push_timeout_in_ms")]
    pub push_timeout_in_ms: u64,
}
498impl Default for SnapshotConfig {
499    fn default() -> Self {
500        Self {
501            max_log_entries_before_snapshot: default_max_log_entries_before_snapshot(),
502            snapshot_cool_down_since_last_check: default_snapshot_cool_down_since_last_check(),
503            cleanup_retain_count: default_cleanup_retain_count(),
504            snapshots_dir: default_snapshots_dir(),
505            snapshots_dir_prefix: default_snapshots_dir_prefix(),
506            chunk_size: default_chunk_size(),
507            retained_log_entries: default_retained_log_entries(),
508            sender_yield_every_n_chunks: default_sender_yield_every_n_chunks(),
509            receiver_yield_every_n_chunks: default_receiver_yield_every_n_chunks(),
510            max_bandwidth_mbps: default_max_bandwidth_mbps(),
511            push_queue_size: default_push_queue_size(),
512            cache_size: default_cache_size(),
513            max_retries: default_max_retries(),
514            transfer_timeout_in_sec: default_transfer_timeout_in_sec(),
515            retry_interval_in_ms: default_retry_interval_in_ms(),
516            snapshot_push_backoff_in_ms: default_snapshot_push_backoff_in_ms(),
517            snapshot_push_max_retry: default_snapshot_push_max_retry(),
518            push_timeout_in_ms: default_push_timeout_in_ms(),
519            enable: default_snapshot_enabled(),
520        }
521    }
522}
523impl SnapshotConfig {
524    fn validate(&self) -> Result<()> {
525        if self.max_log_entries_before_snapshot == 0 {
526            return Err(Error::Config(ConfigError::Message(
527                "max_log_entries_before_snapshot must be greater than 0".into(),
528            )));
529        }
530
531        if self.cleanup_retain_count == 0 {
532            return Err(Error::Config(ConfigError::Message(
533                "cleanup_retain_count must be greater than 0".into(),
534            )));
535        }
536        // Validate storage paths
537        validate_directory(&self.snapshots_dir, "snapshots_dir")?;
538
539        // chunk_size should be > 0
540        if self.chunk_size == 0 {
541            return Err(Error::Config(ConfigError::Message(format!(
542                "chunk_size must be at least {} bytes (got {})",
543                0, self.chunk_size
544            ))));
545        }
546
547        if self.retained_log_entries < 1 {
548            return Err(Error::Config(ConfigError::Message(format!(
549                "retained_log_entries must be >= 1, (got {})",
550                self.retained_log_entries
551            ))));
552        }
553
554        if self.sender_yield_every_n_chunks < 1 {
555            return Err(Error::Config(ConfigError::Message(format!(
556                "sender_yield_every_n_chunks must be >= 1, (got {})",
557                self.sender_yield_every_n_chunks
558            ))));
559        }
560
561        if self.receiver_yield_every_n_chunks < 1 {
562            return Err(Error::Config(ConfigError::Message(format!(
563                "receiver_yield_every_n_chunks must be >= 1, (got {})",
564                self.receiver_yield_every_n_chunks
565            ))));
566        }
567
568        if self.push_queue_size < 1 {
569            return Err(Error::Config(ConfigError::Message(format!(
570                "push_queue_size must be >= 1, (got {})",
571                self.push_queue_size
572            ))));
573        }
574
575        if self.snapshot_push_max_retry < 1 {
576            return Err(Error::Config(ConfigError::Message(format!(
577                "snapshot_push_max_retry must be >= 1, (got {})",
578                self.snapshot_push_max_retry
579            ))));
580        }
581
582        Ok(())
583    }
584}
585
/// Snapshotting is enabled by default.
fn default_snapshot_enabled() -> bool {
    true
}

/// Default threshold for triggering snapshot creation: 1000 log entries.
fn default_max_log_entries_before_snapshot() -> u64 {
    1000
}

/// Default cooldown duration between snapshot checks.
///
/// Prevents constant evaluation of snapshot conditions in tight loops.
/// Currently set to 1 hour (3600 seconds).
fn default_snapshot_cool_down_since_last_check() -> Duration {
    Duration::from_secs(3600)
}

/// Default number of historical snapshots to retain: 2.
fn default_cleanup_retain_count() -> u64 {
    2
}
/// Default snapshots storage path.
fn default_snapshots_dir() -> PathBuf {
    PathBuf::from("/tmp/snapshots")
}
/// Default snapshots directory prefix.
fn default_snapshots_dir_prefix() -> String {
    "snapshot-".to_string()
}

/// Default transfer chunk size: 1 KiB (1024 bytes).
fn default_chunk_size() -> usize {
    1024
}
620
/// Auto-join (new node bootstrap) configuration.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct AutoJoinConfig {
    /// Whether auto-join RPCs use compression. Default: true.
    #[serde(default = "default_rpc_enable_compression")]
    pub rpc_enable_compression: bool,
}
626impl Default for AutoJoinConfig {
627    fn default() -> Self {
628        Self {
629            rpc_enable_compression: default_rpc_enable_compression(),
630        }
631    }
632}
/// Auto-join RPC compression is enabled by default.
fn default_rpc_enable_compression() -> bool {
    true
}

/// Default number of log entries retained after compaction: 1.
fn default_retained_log_entries() -> u64 {
    1
}

/// Sender yields to the scheduler after every chunk by default.
fn default_sender_yield_every_n_chunks() -> usize {
    1
}

/// Receiver yields to the scheduler after every chunk by default.
fn default_receiver_yield_every_n_chunks() -> usize {
    1
}

/// Default snapshot transfer bandwidth cap: 1 MB/s.
fn default_max_bandwidth_mbps() -> u32 {
    1
}

/// Default snapshot push queue capacity: 100.
fn default_push_queue_size() -> usize {
    100
}

/// Default cache size: 10000.
/// NOTE(review): the cached unit is not visible in this file — confirm.
fn default_cache_size() -> usize {
    10000
}
/// Default retry attempts for snapshot operations: 1.
fn default_max_retries() -> u32 {
    1
}
/// Default snapshot transfer timeout: 600 seconds.
fn default_transfer_timeout_in_sec() -> u64 {
    600
}
/// Default retry interval: 10 ms.
fn default_retry_interval_in_ms() -> u64 {
    10
}
/// Default backoff between snapshot push retries: 100 ms.
fn default_snapshot_push_backoff_in_ms() -> u64 {
    100
}
/// Default maximum snapshot push retries: 3.
fn default_snapshot_push_max_retry() -> u32 {
    3
}
/// Default snapshot push timeout: 300_000 ms (5 minutes).
fn default_push_timeout_in_ms() -> u64 {
    300_000
}
678
/// Detection and purging of "zombie" peers (repeatedly unreachable nodes).
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ZombieConfig {
    /// Number of consecutive connection failures before a peer is treated
    /// as a zombie. Default: 3.
    #[serde(default = "default_zombie_threshold")]
    pub threshold: u32,

    /// Interval between zombie purge passes. Default: 30 s.
    #[serde(default = "default_zombie_purge_interval")]
    pub purge_interval: Duration,
}
688
689impl Default for ZombieConfig {
690    fn default() -> Self {
691        Self {
692            threshold: default_zombie_threshold(),
693            purge_interval: default_zombie_purge_interval(),
694        }
695    }
696}
697
/// Default number of consecutive connection failures before a peer is
/// treated as a zombie: 3.
fn default_zombie_threshold() -> u32 {
    3
}
/// Default zombie purge interval: 30 seconds.
fn default_zombie_purge_interval() -> Duration {
    Duration::from_secs(30)
}
705
/// Promotion settings for learners that are ready to become voters.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct PromotionConfig {
    /// Age after which a learner is considered stale. Default: 5 minutes.
    #[serde(default = "default_stale_learner_threshold")]
    pub stale_learner_threshold: Duration,
    /// Interval between stale-learner checks. Default: 30 s.
    #[serde(default = "default_stale_check_interval")]
    pub stale_check_interval: Duration,
}
713
714impl Default for PromotionConfig {
715    fn default() -> Self {
716        Self {
717            stale_learner_threshold: default_stale_learner_threshold(),
718            stale_check_interval: default_stale_check_interval(),
719        }
720    }
721}
722
/// Default staleness threshold for learners: 5 minutes.
fn default_stale_learner_threshold() -> Duration {
    Duration::from_secs(300)
}
/// Default interval between stale-learner checks: 30 seconds.
fn default_stale_check_interval() -> Duration {
    Duration::from_secs(30)
}
/// Defines how Raft log entries are persisted and accessed.
///
/// All strategies use a configurable [`FlushPolicy`] to control when memory contents
/// are flushed to disk, affecting write latency and durability guarantees.
///
/// **Note:** Both strategies now fully load all log entries from disk into memory at startup.
/// The in-memory `SkipMap` serves as the primary data structure for reads in all modes.
///
/// Default: [`PersistenceStrategy::DiskFirst`] (see `default_persistence_strategy`).
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum PersistenceStrategy {
    /// Disk-first persistence strategy.
    ///
    /// - **Write path**: On append, the log entry is first written to disk. Only after a successful
    ///   disk write is it acknowledged and stored in the in-memory `SkipMap`.
    ///
    /// - **Read path**: Reads are always served from the in-memory `SkipMap`.
    ///
    /// - **Startup behavior**: All log entries are loaded from disk into memory at startup,
    ///   ensuring consistent access speed regardless of disk state.
    ///
    /// - Suitable for systems prioritizing strong durability while still providing in-memory
    ///   performance for reads.
    DiskFirst,

    /// Memory-first persistence strategy.
    ///
    /// - **Write path**: On append, the log entry is first written to the in-memory `SkipMap` and
    ///   acknowledged immediately. Disk persistence happens asynchronously in the background,
    ///   governed by [`FlushPolicy`].
    ///
    /// - **Read path**: Reads are always served from the in-memory `SkipMap`.
    ///
    /// - **Startup behavior**: All log entries are loaded from disk into memory at startup, the
    ///   same as `DiskFirst`.
    ///
    /// - Suitable for systems that favor lower write latency and faster failover, while still
    ///   retaining a disk-backed log for crash recovery.
    MemFirst,
}
769
/// Controls when in-memory logs should be flushed to disk.
///
/// Default: `Batch { threshold: 1024, interval_ms: 100 }` (see `default_flush_policy`).
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum FlushPolicy {
    /// Flush each log write immediately to disk.
    ///
    /// - Guarantees the highest durability.
    /// - Each append operation causes a disk write.
    Immediate,

    /// Flush entries to disk when either of two conditions is met:
    /// - The number of unflushed entries reaches the given threshold.
    /// - The elapsed time since the last flush exceeds the configured interval.
    ///
    /// - Balances performance and durability.
    /// - Recent unflushed entries may be lost in the event of a crash or power failure.
    Batch { threshold: usize, interval_ms: u64 },
}
787
/// Configuration parameters for log persistence behavior
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct PersistenceConfig {
    /// Strategy for persisting Raft logs
    ///
    /// This controls the trade-off between durability guarantees and performance
    /// characteristics. The choice impacts both write throughput and recovery
    /// behavior after node failures.
    /// Default: `DiskFirst`.
    #[serde(default = "default_persistence_strategy")]
    pub strategy: PersistenceStrategy,

    /// Flush policy for asynchronous strategies
    ///
    /// This controls when log entries are flushed to disk. The choice impacts
    /// write performance and durability guarantees.
    /// Default: `Batch { threshold: 1024, interval_ms: 100 }`.
    #[serde(default = "default_flush_policy")]
    pub flush_policy: FlushPolicy,

    /// Maximum number of in-memory log entries to buffer when using async strategies
    ///
    /// This acts as a safety valve to prevent memory exhaustion during periods of
    /// high write throughput or when disk persistence is slow.
    /// Default: 10_000.
    #[serde(default = "default_max_buffered_entries")]
    pub max_buffered_entries: usize,

    /// Number of flush worker threads to use for log persistence.
    ///
    /// - If set to 0, the system falls back to spawning a new task per flush (legacy behavior,
    ///   lower latency but less stable under high load).
    /// - If set to a positive number, a worker pool of that size will be created to process flush
    ///   requests (more stable and efficient under high load).
    ///
    /// This parameter allows tuning between throughput and latency depending on
    /// workload characteristics.
    /// Default: 2.
    #[serde(default = "default_flush_workers")]
    pub flush_workers: usize,

    /// Capacity of the internal task channel for flush workers.
    ///
    /// - Provides **backpressure** during high write throughput.
    /// - Prevents unbounded task accumulation in memory when disk I/O is slow.
    /// - Larger values improve throughput at the cost of higher memory usage, while smaller values
    ///   apply stricter flow control but may reduce parallelism.
    ///
    /// Default: 100.
    #[serde(default = "default_channel_capacity")]
    pub channel_capacity: usize,
}
834
/// Default persistence strategy (optimized for balanced workloads):
/// disk-first writes with in-memory reads.
fn default_persistence_strategy() -> PersistenceStrategy {
    PersistenceStrategy::DiskFirst
}

/// Default value for flush_workers: a pool of 2 workers.
fn default_flush_workers() -> usize {
    2
}

/// Default value for channel_capacity: 100 queued flush tasks.
fn default_channel_capacity() -> usize {
    100
}

/// Default flush policy for asynchronous strategies
///
/// This controls when log entries are flushed to disk. The choice impacts
/// write performance and durability guarantees.
/// Batches up to 1024 entries or 100 ms since the last flush, whichever
/// comes first.
fn default_flush_policy() -> FlushPolicy {
    FlushPolicy::Batch {
        threshold: 1024,
        interval_ms: 100,
    }
}

/// Default maximum buffered log entries: 10_000.
fn default_max_buffered_entries() -> usize {
    10_000
}
865
866impl Default for PersistenceConfig {
867    fn default() -> Self {
868        Self {
869            strategy: default_persistence_strategy(),
870            flush_policy: default_flush_policy(),
871            max_buffered_entries: default_max_buffered_entries(),
872            flush_workers: default_flush_workers(),
873            channel_capacity: default_channel_capacity(),
874        }
875    }
876}
877
/// Backpressure configuration for client request flow control
///
/// Prevents unbounded memory growth by limiting pending client requests.
/// When limits are reached, new requests are rejected with a RESOURCE_EXHAUSTED
/// error until the system processes existing requests.
///
/// # Value Semantics
/// - `0` = unlimited (no backpressure)
/// - `> 0` = maximum pending requests allowed
///
/// # Tuning Guidelines (for reference only)
/// - Low memory (< 4GB): 1000-5000
/// - Medium memory (4-16GB): 5000-20000
/// - High memory (> 16GB): 20000-50000
///
/// # Example
/// ```toml
/// [raft.backpressure]
/// max_pending_writes = 10000
/// max_pending_reads = 50000
/// ```
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackpressureConfig {
    /// Maximum pending write (propose) requests
    ///
    /// Limits the number of client write requests waiting in the leader's
    /// propose buffer. Write requests typically consume more resources
    /// (replication, persistence) than reads.
    ///
    /// **Default**: 10000 (0 = unlimited)
    #[serde(default = "default_max_pending_writes")]
    pub max_pending_writes: usize,

    /// Maximum pending linearizable read requests
    ///
    /// Limits the number of client linearizable read requests waiting in
    /// the leader's read buffer. Read requests can tolerate higher limits
    /// as they don't require replication.
    ///
    /// **Default**: 50000 (0 = unlimited)
    #[serde(default = "default_max_pending_reads")]
    pub max_pending_reads: usize,
}
921
922impl Default for BackpressureConfig {
923    fn default() -> Self {
924        Self {
925            max_pending_writes: default_max_pending_writes(),
926            max_pending_reads: default_max_pending_reads(),
927        }
928    }
929}
930
/// Default cap on pending client write (propose) requests.
///
/// `const` for consistency with the other default-value functions in this file.
const fn default_max_pending_writes() -> usize {
    10_000
}
934
/// Default cap on pending linearizable read requests.
///
/// `const` for consistency with the other default-value functions in this file.
const fn default_max_pending_reads() -> usize {
    50_000
}
938
939impl BackpressureConfig {
940    /// Check if write request should be rejected due to backpressure
941    ///
942    /// Returns true if the current pending count exceeds the limit.
943    /// When `max_pending_writes == 0`, always returns false (unlimited).
944    pub fn should_reject_write(
945        &self,
946        current_pending: usize,
947    ) -> bool {
948        self.max_pending_writes > 0 && current_pending >= self.max_pending_writes
949    }
950
951    /// Check if read request should be rejected due to backpressure
952    ///
953    /// Returns true if the current pending count exceeds the limit.
954    /// When `max_pending_reads == 0`, always returns false (unlimited).
955    pub fn should_reject_read(
956        &self,
957        current_pending: usize,
958    ) -> bool {
959        self.max_pending_reads > 0 && current_pending >= self.max_pending_reads
960    }
961}
962
/// Policy for read operation consistency guarantees
///
/// Determines the trade-off between read consistency and performance.
/// Clients can choose the appropriate level based on their requirements.
/// The strongest level, [`ReadConsistencyPolicy::LinearizableRead`], is the default.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub enum ReadConsistencyPolicy {
    /// Lease-based reads for better performance with weaker consistency
    ///
    /// The leader serves reads locally without contacting followers
    /// during the valid lease period. Assumes bounded clock drift between nodes.
    /// Provides lower latency but slightly weaker consistency guarantees
    /// compared to LinearizableRead.
    LeaseRead,

    /// Fully linearizable reads for strongest consistency
    ///
    /// The leader verifies its leadership with a quorum before serving
    /// the read, ensuring strict linearizability. This guarantees that
    /// all reads reflect the most recent committed value in the cluster.
    #[default]
    LinearizableRead,

    /// Eventually consistent reads from any node
    ///
    /// Allows reading from any node (leader, follower, or candidate) without
    /// additional consistency checks. May return stale data but provides the
    /// best read performance and availability. Suitable for scenarios where
    /// eventual consistency is acceptable.
    /// **Can be served by non-leader nodes.**
    EventualConsistency,
}
994
/// Configuration for read operation consistency behavior
///
/// See [`ReadConsistencyPolicy`] for the available consistency levels.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ReadConsistencyConfig {
    /// Default read consistency policy for the cluster
    ///
    /// This sets the cluster-wide default behavior. Individual read requests
    /// can still override this setting when needed for specific use cases
    /// (subject to `allow_client_override`).
    #[serde(default)]
    pub default_policy: ReadConsistencyPolicy,

    /// Lease duration in milliseconds for the LeaseRead policy
    ///
    /// Only applicable when using the LeaseRead policy. The leader considers
    /// itself valid for this duration after successfully heartbeating to a quorum.
    /// Must be greater than 0 (validated at load time).
    #[serde(default = "default_lease_duration_ms")]
    pub lease_duration_ms: u64,

    /// Whether to allow clients to override the default policy per request
    ///
    /// When true, clients can specify consistency requirements per read request.
    /// When false, all reads use the cluster's default_policy setting.
    #[serde(default = "default_allow_client_override")]
    pub allow_client_override: bool,

    /// Timeout in milliseconds to wait for the state machine to catch up with the commit index
    ///
    /// Used by LinearizableRead to ensure the state machine has applied all committed
    /// entries before serving reads. Typical apply latency is <1ms on local SSD.
    /// Default: 10ms (safe buffer for single-node local deployments)
    #[serde(default = "default_state_machine_sync_timeout_ms")]
    pub state_machine_sync_timeout_ms: u64,
}
1027
1028impl Default for ReadConsistencyConfig {
1029    fn default() -> Self {
1030        Self {
1031            default_policy: ReadConsistencyPolicy::default(),
1032            lease_duration_ms: default_lease_duration_ms(),
1033            allow_client_override: default_allow_client_override(),
1034            state_machine_sync_timeout_ms: default_state_machine_sync_timeout_ms(),
1035        }
1036    }
1037}
1038
/// Default lease duration for LeaseRead, in milliseconds.
///
/// `const` for consistency with the other default-value functions in this file.
const fn default_lease_duration_ms() -> u64 {
    // Conservative default: half of a typical heartbeat interval (~300ms)
    250
}
1043
/// Default for whether clients may override the cluster read policy per request.
///
/// `const` for consistency with the other default-value functions in this file.
const fn default_allow_client_override() -> bool {
    // Allow flexibility by default — clients can choose stronger consistency when needed
    true
}
1048
/// Default timeout for the state machine to catch up to the commit index, in milliseconds.
///
/// `const` for consistency with the other default-value functions in this file.
const fn default_state_machine_sync_timeout_ms() -> u64 {
    10 // 10ms is safe for typical <1ms apply latency on local SSD
}
1052
1053impl ReadConsistencyConfig {
1054    fn validate(&self) -> Result<()> {
1055        // Validate read consistency configuration
1056        if self.lease_duration_ms == 0 {
1057            return Err(Error::Config(ConfigError::Message(
1058                "read_consistency.lease_duration_ms must be greater than 0".into(),
1059            )));
1060        }
1061        Ok(())
1062    }
1063}
1064
1065impl From<d_engine_proto::client::ReadConsistencyPolicy> for ReadConsistencyPolicy {
1066    fn from(proto_policy: d_engine_proto::client::ReadConsistencyPolicy) -> Self {
1067        match proto_policy {
1068            d_engine_proto::client::ReadConsistencyPolicy::LeaseRead => Self::LeaseRead,
1069            d_engine_proto::client::ReadConsistencyPolicy::LinearizableRead => {
1070                Self::LinearizableRead
1071            }
1072            d_engine_proto::client::ReadConsistencyPolicy::EventualConsistency => {
1073                Self::EventualConsistency
1074            }
1075        }
1076    }
1077}
1078
1079impl From<ReadConsistencyPolicy> for d_engine_proto::client::ReadConsistencyPolicy {
1080    fn from(config_policy: ReadConsistencyPolicy) -> Self {
1081        match config_policy {
1082            ReadConsistencyPolicy::LeaseRead => {
1083                d_engine_proto::client::ReadConsistencyPolicy::LeaseRead
1084            }
1085            ReadConsistencyPolicy::LinearizableRead => {
1086                d_engine_proto::client::ReadConsistencyPolicy::LinearizableRead
1087            }
1088            ReadConsistencyPolicy::EventualConsistency => {
1089                d_engine_proto::client::ReadConsistencyPolicy::EventualConsistency
1090            }
1091        }
1092    }
1093}
1094
/// Configuration for controlling gRPC compression settings per service type
///
/// Provides fine-grained control over when to enable compression based on
/// the RPC service type and deployment environment. Each service can be
/// independently configured to use compression based on its data
/// characteristics and frequency.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RpcCompressionConfig {
    /// Controls compression for Raft replication response data
    ///
    /// Replication traffic is typically high-frequency with small payloads
    /// in LAN environments, making compression less beneficial. In WAN
    /// deployments with bandwidth constraints, enabling it may help.
    ///
    /// **Default**: `false` - Optimized for LAN/same-VPC deployments
    #[serde(default = "default_replication_compression")]
    pub replication_response: bool,

    /// Controls compression for Raft election response data
    ///
    /// Election traffic is low-frequency but time-sensitive. Compression
    /// rarely benefits election traffic due to small payload size.
    ///
    /// **Default**: `true` for backward compatibility
    #[serde(default = "default_election_compression")]
    pub election_response: bool,

    /// Controls compression for snapshot transfer response data
    ///
    /// Snapshot transfers involve large data volumes where compression
    /// is typically beneficial, even in low-latency environments.
    ///
    /// **Default**: `true` - Recommended for all environments
    #[serde(default = "default_snapshot_compression")]
    pub snapshot_response: bool,

    /// Controls compression for cluster management response data
    ///
    /// Cluster operations are infrequent but may contain configuration data.
    /// Compression is generally beneficial for these operations.
    ///
    /// **Default**: `true` for backward compatibility
    #[serde(default = "default_cluster_compression")]
    pub cluster_response: bool,

    /// Controls compression for client request response data
    ///
    /// Client responses may vary in size. In LAN/VPC environments,
    /// compression CPU overhead typically outweighs network benefits.
    ///
    /// **Default**: `false` - Optimized for LAN/same-VPC deployments
    #[serde(default = "default_client_compression")]
    pub client_response: bool,
}
1149
1150impl Default for RpcCompressionConfig {
1151    fn default() -> Self {
1152        Self {
1153            replication_response: default_replication_compression(),
1154            election_response: default_election_compression(),
1155            snapshot_response: default_snapshot_compression(),
1156            cluster_response: default_cluster_compression(),
1157            client_response: default_client_compression(),
1158        }
1159    }
1160}
1161
1162// Default values for RPC compression settings
/// Default compression setting for replication responses.
///
/// `const` for consistency with the other default-value functions in this file.
const fn default_replication_compression() -> bool {
    // Replication traffic is high-frequency with typically small payloads
    // For LAN/VPC deployments, compression adds CPU overhead without significant benefit
    false
}
1168
/// Default compression setting for election responses.
///
/// `const` for consistency with the other default-value functions in this file.
const fn default_election_compression() -> bool {
    // Kept enabled for backward compatibility, though minimal benefit
    true
}
1173
/// Default compression setting for snapshot transfer responses.
///
/// `const` for consistency with the other default-value functions in this file.
const fn default_snapshot_compression() -> bool {
    // Snapshot data is large and benefits from compression in all environments
    true
}
1178
/// Default compression setting for cluster management responses.
///
/// `const` for consistency with the other default-value functions in this file.
const fn default_cluster_compression() -> bool {
    // Kept enabled for backward compatibility
    true
}
1183
/// Default compression setting for client responses.
///
/// `const` for consistency with the other default-value functions in this file.
const fn default_client_compression() -> bool {
    // Client responses in LAN/VPC environments typically benefit from no compression
    false
}
1188
/// Configuration for the Watch mechanism that monitors key changes
///
/// The watch system allows clients to monitor specific keys for changes with
/// minimal overhead on the write path. It uses a lock-free event queue and
/// configurable buffer sizes to balance performance and memory usage.
///
/// # Performance Characteristics
///
/// - Write path overhead: < 0.01% with 100+ watchers
/// - Event notification latency: typically < 100μs end-to-end
/// - Memory per watcher: ~2.4KB with default buffer size
///
/// # Configuration Example
///
/// ```toml
/// [raft.watch]
/// event_queue_size = 1000
/// watcher_buffer_size = 10
/// enable_metrics = false
/// ```
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct WatchConfig {
    /// Buffer size for the global event queue shared across all watchers
    ///
    /// This queue sits between the write path and the dispatcher thread.
    /// A larger queue reduces the chance of dropped events under burst load,
    /// but increases memory usage. Must be greater than 0 (validated at load time).
    ///
    /// **Performance Impact**:
    /// - Memory: ~24 bytes per slot (key + value pointers + event type)
    /// - Default 1000 slots ≈ 24KB memory
    ///
    /// **Tuning Guidelines**:
    /// - Low traffic (< 1K writes/sec): 500-1000
    /// - Medium traffic (1K-10K writes/sec): 1000-2000
    /// - High traffic (> 10K writes/sec): 2000-5000
    ///
    /// **Default**: 1000
    #[serde(default = "default_event_queue_size")]
    pub event_queue_size: usize,

    /// Buffer size for each individual watcher's channel
    ///
    /// Each registered watcher gets its own channel to receive events.
    /// Smaller buffers reduce memory usage but increase the risk of
    /// dropping events for slow consumers. Must be greater than 0
    /// (validated at load time).
    ///
    /// **Performance Impact**:
    /// - Memory: ~240 bytes per slot per watcher
    /// - 10 slots × 100 watchers = ~240KB total
    ///
    /// **Tuning Guidelines**:
    /// - Fast consumers (< 1ms processing): 5-10
    /// - Normal consumers (1-10ms processing): 10-20
    /// - Slow consumers (> 10ms processing): 20-50
    ///
    /// **Default**: 10
    #[serde(default = "default_watcher_buffer_size")]
    pub watcher_buffer_size: usize,

    /// Enable detailed metrics and logging for watch operations
    ///
    /// When enabled, logs warnings for dropped events and tracks watch
    /// performance metrics. Adds minimal overhead (~0.001%) but is useful
    /// for debugging and monitoring.
    ///
    /// **Default**: false (minimal overhead in production)
    #[serde(default = "default_enable_watch_metrics")]
    pub enable_metrics: bool,
}
1259
1260impl Default for WatchConfig {
1261    fn default() -> Self {
1262        Self {
1263            event_queue_size: default_event_queue_size(),
1264            watcher_buffer_size: default_watcher_buffer_size(),
1265            enable_metrics: default_enable_watch_metrics(),
1266        }
1267    }
1268}
1269
1270impl WatchConfig {
1271    /// Validates watch configuration parameters
1272    pub fn validate(&self) -> Result<()> {
1273        if self.event_queue_size == 0 {
1274            return Err(Error::Config(ConfigError::Message(
1275                "watch.event_queue_size must be greater than 0".into(),
1276            )));
1277        }
1278
1279        if self.event_queue_size > 100_000 {
1280            warn!(
1281                "watch.event_queue_size ({}) is very large and may consume significant memory (~{}MB)",
1282                self.event_queue_size,
1283                (self.event_queue_size * 24) / 1_000_000
1284            );
1285        }
1286
1287        if self.watcher_buffer_size == 0 {
1288            return Err(Error::Config(ConfigError::Message(
1289                "watch.watcher_buffer_size must be greater than 0".into(),
1290            )));
1291        }
1292
1293        if self.watcher_buffer_size > 1000 {
1294            warn!(
1295                "watch.watcher_buffer_size ({}) is very large. Each watcher will consume ~{}KB memory",
1296                self.watcher_buffer_size,
1297                (self.watcher_buffer_size * 240) / 1000
1298            );
1299        }
1300
1301        Ok(())
1302    }
1303}
1304
/// Default buffer size for the global watch event queue.
const fn default_event_queue_size() -> usize {
    1000
}
1308
/// Default per-watcher channel buffer size.
const fn default_watcher_buffer_size() -> usize {
    10
}
1312
/// Default for watch metrics collection (disabled for minimal overhead).
const fn default_enable_watch_metrics() -> bool {
    false
}
1316
/// Performance metrics configuration
///
/// Controls emission of observability metrics. Disabling metrics reduces overhead
/// in hot paths but decreases system visibility.
///
/// # Example
/// ```toml
/// [raft.metrics]
/// enable_backpressure = true
/// enable_batch = true
/// sample_rate = 1  # No sampling (record every event)
/// ```
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct MetricsConfig {
    /// Enable backpressure metrics (rejections, buffer utilization)
    ///
    /// Tracks client request backpressure for capacity planning.
    /// Disable for absolute maximum performance in trusted environments.
    ///
    /// **Default**: false
    #[serde(default = "default_enable_backpressure_metrics")]
    pub enable_backpressure: bool,

    /// Enable buffer length gauge (batch.buffer_length for propose and linearizable buffers)
    ///
    /// Tracks buffer utilization for capacity planning.
    /// Disable for absolute maximum performance in trusted environments.
    ///
    /// **Default**: false
    #[serde(default = "default_enable_batch_metrics")]
    pub enable_batch: bool,

    /// Sample rate for high-frequency gauge metrics
    ///
    /// - `1` = record every event (no sampling)
    /// - `10` = record 1 out of 10 events (10% sampling)
    /// - `100` = record 1 out of 100 events (1% sampling)
    ///
    /// Applies to: buffer_utilization, buffer_length gauges.
    /// Does NOT apply to: counters (rejections, drain/heartbeat triggers).
    ///
    /// **Default**: 1 (no sampling)
    #[serde(default = "default_metrics_sample_rate")]
    pub sample_rate: u32,
}
1362
1363impl Default for MetricsConfig {
1364    fn default() -> Self {
1365        Self {
1366            enable_backpressure: default_enable_backpressure_metrics(),
1367            enable_batch: default_enable_batch_metrics(),
1368            sample_rate: default_metrics_sample_rate(),
1369        }
1370    }
1371}
1372
/// Default for backpressure metrics collection (disabled for minimal overhead).
///
/// `const` for consistency with the other default-value functions in this file.
const fn default_enable_backpressure_metrics() -> bool {
    false
}
1376
/// Default for batch buffer metrics collection (disabled for minimal overhead).
///
/// `const` for consistency with the other default-value functions in this file.
const fn default_enable_batch_metrics() -> bool {
    false
}
1380
/// Default sample rate for high-frequency gauge metrics.
///
/// `const` for consistency with the other default-value functions in this file.
const fn default_metrics_sample_rate() -> u32 {
    1 // No sampling by default
}