Skip to main content

d_engine_core/config/
raft.rs

1use std::fmt::Debug;
2use std::path::PathBuf;
3use std::time::Duration;
4
5use config::ConfigError;
6use serde::Deserialize;
7use serde::Serialize;
8use tracing::warn;
9
10use super::lease::LeaseConfig;
11use super::validate_directory;
12use crate::Error;
13use crate::Result;
14
/// Configuration parameters for the Raft consensus algorithm implementation
///
/// Every field is optional when deserializing: nested sections fall back to their
/// `Default` implementation and scalar fields fall back to the `default_*` helper
/// named in their `#[serde(default = "...")]` attribute. Call [`RaftConfig::validate`]
/// after loading to enforce the cross-field constraints.
#[derive(Serialize, Deserialize, Clone)]
pub struct RaftConfig {
    /// Configuration settings related to log replication
    /// Includes parameters like heartbeat interval and AppendEntries entry count limit
    #[serde(default)]
    pub replication: ReplicationConfig,

    /// Client request batching configuration
    ///
    /// Controls flush thresholds for the leader's propose and linearizable-read buffers.
    /// Separate from replication config — this governs the client ingestion layer, not
    /// the Leader→Follower replication path.
    #[serde(default)]
    pub batching: BatchingConfig,

    /// Configuration settings for leader election mechanism
    /// Controls timeouts and randomization factors for election timing
    #[serde(default)]
    pub election: ElectionConfig,

    /// Configuration settings for cluster membership changes
    /// Handles joint consensus transitions and cluster reconfiguration rules
    #[serde(default)]
    pub membership: MembershipConfig,

    /// Configuration settings for state machine behavior
    /// Controls state machine operations like lease management, compaction, etc.
    /// For backward compatibility, can also be configured via `storage` in TOML files.
    #[serde(default, alias = "storage")]
    pub state_machine: StateMachineConfig,

    /// Configuration settings for snapshot feature
    #[serde(default)]
    pub snapshot: SnapshotConfig,

    /// Configuration settings for log persistence behavior
    /// Controls how and when log entries are persisted to stable storage
    #[serde(default)]
    pub persistence: PersistenceConfig,

    /// Maximum allowed log entry gap between leader and learner nodes
    /// Learners with larger gaps than this value will trigger catch-up replication
    /// Must be greater than 0 (enforced by `validate`).
    /// Default value is set via default_learner_catchup_threshold() function (1)
    #[serde(default = "default_learner_catchup_threshold")]
    pub learner_catchup_threshold: u64,

    /// Throttle interval (milliseconds) for learner progress checks
    /// Prevents excessive checking of learner promotion eligibility
    /// Default value is set via default_learner_check_throttle_ms() function (1000)
    #[serde(default = "default_learner_check_throttle_ms")]
    pub learner_check_throttle_ms: u64,

    /// Base timeout duration (in milliseconds) for general Raft operations
    /// Used as fallback timeout when operation-specific timeouts are not set
    /// Must be at least 1 (enforced by `validate`).
    /// Default value is set via default_general_timeout() function (50)
    #[serde(default = "default_general_timeout")]
    pub general_raft_timeout_duration_in_ms: u64,

    /// Timeout for snapshot RPC operations (milliseconds)
    /// Default value is set via default_snapshot_rpc_timeout_ms() function (3_600_000, one hour)
    #[serde(default = "default_snapshot_rpc_timeout_ms")]
    pub snapshot_rpc_timeout_ms: u64,

    /// Command channel capacity for client write requests
    /// Bounded channel prevents unbounded memory growth under high load
    /// Default value is set via default_cmd_channel_capacity() function (1024)
    #[serde(default = "default_cmd_channel_capacity")]
    pub cmd_channel_capacity: usize,

    /// Ordered channel capacity for stream_append_entries ordering
    /// Controls buffering of response receivers in FIFO order
    /// Default value is set via default_ordered_channel_capacity() function (1024)
    #[serde(default = "default_ordered_channel_capacity")]
    pub ordered_channel_capacity: usize,

    /// Configuration settings for new node auto join feature
    #[serde(default)]
    pub auto_join: AutoJoinConfig,

    /// Configuration for read operation consistency behavior
    /// Controls the trade-off between read performance and consistency guarantees
    ///
    /// `validate` additionally requires `read_consistency.lease_duration_ms` to be
    /// strictly less than `election.election_timeout_min`.
    #[serde(default)]
    pub read_consistency: ReadConsistencyConfig,

    /// Backpressure configuration for client request flow control
    /// Prevents unbounded memory growth by rejecting excess requests
    #[serde(default)]
    pub backpressure: BackpressureConfig,

    /// RPC compression configuration for different service types
    ///
    /// Controls which RPC service types use response compression.
    /// Allows fine-tuning for performance optimization based on
    /// deployment environment and traffic patterns.
    #[serde(default)]
    pub rpc_compression: RpcCompressionConfig,

    /// Configuration for Watch mechanism that monitors key changes
    /// Controls event queue sizes and metrics behavior
    #[serde(default)]
    pub watch: WatchConfig,

    /// Performance metrics configuration
    /// Controls metrics emission and sampling for observability vs performance trade-off
    #[serde(default)]
    pub metrics: MetricsConfig,
}
122
123impl Debug for RaftConfig {
124    fn fmt(
125        &self,
126        f: &mut std::fmt::Formatter<'_>,
127    ) -> std::fmt::Result {
128        f.debug_struct("RaftConfig").finish()
129    }
130}
131impl Default for RaftConfig {
132    fn default() -> Self {
133        Self {
134            replication: ReplicationConfig::default(),
135            batching: BatchingConfig::default(),
136            election: ElectionConfig::default(),
137            membership: MembershipConfig::default(),
138            state_machine: StateMachineConfig::default(),
139            snapshot: SnapshotConfig::default(),
140            persistence: PersistenceConfig::default(),
141            learner_catchup_threshold: default_learner_catchup_threshold(),
142            learner_check_throttle_ms: default_learner_check_throttle_ms(),
143            general_raft_timeout_duration_in_ms: default_general_timeout(),
144            auto_join: AutoJoinConfig::default(),
145            snapshot_rpc_timeout_ms: default_snapshot_rpc_timeout_ms(),
146            cmd_channel_capacity: default_cmd_channel_capacity(),
147            ordered_channel_capacity: default_ordered_channel_capacity(),
148            read_consistency: ReadConsistencyConfig::default(),
149            backpressure: BackpressureConfig::default(),
150            rpc_compression: RpcCompressionConfig::default(),
151            watch: WatchConfig::default(),
152            metrics: MetricsConfig::default(),
153        }
154    }
155}
156impl RaftConfig {
157    /// Validates all Raft subsystem configurations
158    pub fn validate(&self) -> Result<()> {
159        if self.learner_catchup_threshold == 0 {
160            return Err(Error::Config(ConfigError::Message(
161                "learner_catchup_threshold must be greater than 0".into(),
162            )));
163        }
164
165        if self.general_raft_timeout_duration_in_ms < 1 {
166            return Err(Error::Config(ConfigError::Message(
167                "general_raft_timeout_duration_in_ms must be at least 1ms".into(),
168            )));
169        }
170
171        self.replication.validate()?;
172        self.batching.validate()?;
173        self.election.validate()?;
174        self.membership.validate()?;
175        self.state_machine.validate()?;
176        self.snapshot.validate()?;
177        self.read_consistency.validate()?;
178        self.watch.validate()?;
179        self.persistence.validate()?;
180
181        // Enforce lease_duration < election_timeout (load-bearing safety constraint).
182        // The lease fast path for linearizable reads relies on the invariant that
183        // "lease valid" and "new leader elected" are mutually exclusive on the timeline.
184        // This holds only when: lease_duration < election_timeout - max_clock_drift.
185        // At minimum, lease_duration must be strictly less than election_timeout_min.
186        if self.read_consistency.lease_duration_ms >= self.election.election_timeout_min {
187            return Err(Error::Config(ConfigError::Message(format!(
188                "read_consistency.lease_duration_ms ({}) must be strictly less than \
189                 election_timeout_min ({}ms) — required for lease-based linearizable reads \
190                 to be safe under partition",
191                self.read_consistency.lease_duration_ms, self.election.election_timeout_min,
192            ))));
193        }
194
195        Ok(())
196    }
197}
198
/// Default for `learner_catchup_threshold`: a gap of one log entry.
fn default_learner_catchup_threshold() -> u64 {
    const ENTRY_GAP: u64 = 1;
    ENTRY_GAP
}

/// Default for `learner_check_throttle_ms`: one second, in milliseconds.
fn default_learner_check_throttle_ms() -> u64 {
    const ONE_SECOND_MS: u64 = 1_000;
    ONE_SECOND_MS
}

/// Default for `general_raft_timeout_duration_in_ms` (milliseconds).
fn default_general_timeout() -> u64 {
    const TIMEOUT_MS: u64 = 50;
    TIMEOUT_MS
}

/// Default for `snapshot_rpc_timeout_ms`: one hour, sufficient for large snapshots.
fn default_snapshot_rpc_timeout_ms() -> u64 {
    const ONE_HOUR_MS: u64 = 60 * 60 * 1_000;
    ONE_HOUR_MS
}

/// Default for `cmd_channel_capacity`.
fn default_cmd_channel_capacity() -> usize {
    const CAPACITY: usize = 1 << 10;
    CAPACITY
}

/// Default for `ordered_channel_capacity`.
fn default_ordered_channel_capacity() -> usize {
    const CAPACITY: usize = 1 << 10;
    CAPACITY
}
223
/// Log replication settings (heartbeat cadence and AppendEntries batch limit).
///
/// Both fields must be non-zero; `ReplicationConfig::validate` rejects `0` for either.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ReplicationConfig {
    /// Heartbeat interval (milliseconds): how often the leader sends AppendEntries RPCs.
    ///
    /// **Default**: 100
    #[serde(default = "default_append_interval")]
    pub rpc_append_entries_clock_in_ms: u64,

    /// Maximum log entries per single AppendEntries RPC to a follower.
    ///
    /// **Default**: 100
    #[serde(default = "default_entries_per_replication")]
    pub append_entries_max_entries_per_replication: u64,
}
234
235impl Default for ReplicationConfig {
236    fn default() -> Self {
237        Self {
238            rpc_append_entries_clock_in_ms: default_append_interval(),
239            append_entries_max_entries_per_replication: default_entries_per_replication(),
240        }
241    }
242}
243impl ReplicationConfig {
244    fn validate(&self) -> Result<()> {
245        if self.rpc_append_entries_clock_in_ms == 0 {
246            return Err(Error::Config(ConfigError::Message(
247                "rpc_append_entries_clock_in_ms cannot be 0".into(),
248            )));
249        }
250
251        if self.append_entries_max_entries_per_replication == 0 {
252            return Err(Error::Config(ConfigError::Message(
253                "append_entries_max_entries_per_replication must be > 0".into(),
254            )));
255        }
256
257        Ok(())
258    }
259}
260
/// Batching configuration for leader-side drain loops and buffer allocation.
///
/// A single value intentionally covers all drain loops because they all operate
/// within the same heartbeat period and share the same order-of-magnitude concurrency:
/// - `raft.rs` cmd_rx drain (client propose ingestion)
/// - `raft.rs` role_rx drain (commit index event coalescing)
/// - `DefaultCommitHandler` new_commit_rx drain (state machine apply batching)
///
/// Also used as the initial Vec capacity for propose and linearizable-read buffers
/// (construction-time hint only; the propose buffer retains capacity across flushes
/// via `mem::swap`, while the read buffer resets each flush via `mem::take`).
///
/// # Tuning Guidelines
/// - Low latency priority: lower values → smaller batches, faster flush
/// - Throughput priority: higher values → larger batches, fewer RPCs
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct BatchingConfig {
    /// Maximum items to drain per heartbeat period across all drain loops.
    ///
    /// Must be greater than 0 (enforced by `BatchingConfig::validate`).
    ///
    /// **Default**: 100
    #[serde(default = "default_max_batch_size")]
    pub max_batch_size: usize,
}
284
285impl Default for BatchingConfig {
286    fn default() -> Self {
287        Self {
288            max_batch_size: default_max_batch_size(),
289        }
290    }
291}
292
293impl BatchingConfig {
294    fn validate(&self) -> Result<()> {
295        if self.max_batch_size == 0 {
296            return Err(Error::Config(ConfigError::Message(
297                "batching.max_batch_size must be > 0".into(),
298            )));
299        }
300        Ok(())
301    }
302}
303
/// Default for `ReplicationConfig::rpc_append_entries_clock_in_ms` (milliseconds).
fn default_append_interval() -> u64 {
    const INTERVAL_MS: u64 = 100;
    INTERVAL_MS
}

/// Default for `BatchingConfig::max_batch_size`.
fn default_max_batch_size() -> usize {
    const BATCH_ITEMS: usize = 100;
    BATCH_ITEMS
}

/// Default for `ReplicationConfig::append_entries_max_entries_per_replication`.
fn default_entries_per_replication() -> u64 {
    const ENTRIES_PER_RPC: u64 = 100;
    ENTRIES_PER_RPC
}
/// Leader election settings.
///
/// `ElectionConfig::validate` requires `election_timeout_min < election_timeout_max`
/// and a non-zero peer monitor interval.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ElectionConfig {
    /// Lower bound of the election timeout, in milliseconds.
    ///
    /// `RaftConfig::validate` also requires `read_consistency.lease_duration_ms`
    /// to be strictly less than this value.
    ///
    /// **Default**: 500
    #[serde(default = "default_election_timeout_min")]
    pub election_timeout_min: u64,

    /// Upper bound of the election timeout, in milliseconds. Must be greater than
    /// `election_timeout_min`.
    ///
    /// **Default**: 1000
    #[serde(default = "default_election_timeout_max")]
    pub election_timeout_max: u64,

    /// Peer connection monitor interval, in seconds (per the field name). Must be non-zero.
    ///
    /// NOTE(review): "connectinon" is a misspelling, but the identifier doubles as the
    /// public field name and the serialized config key, so renaming it would break callers
    /// and existing config files.
    ///
    /// **Default**: 30
    #[serde(default = "default_peer_monitor_interval")]
    pub rpc_peer_connectinon_monitor_interval_in_sec: u64,

    /// Request id presumably attached to internally generated RPC client requests —
    /// TODO confirm against the call sites; its use is not visible in this file.
    ///
    /// **Default**: 0
    #[serde(default = "default_client_request_id")]
    pub internal_rpc_client_request_id: u32,
}
327
328impl Default for ElectionConfig {
329    fn default() -> Self {
330        Self {
331            election_timeout_min: default_election_timeout_min(),
332            election_timeout_max: default_election_timeout_max(),
333            rpc_peer_connectinon_monitor_interval_in_sec: default_peer_monitor_interval(),
334            internal_rpc_client_request_id: default_client_request_id(),
335        }
336    }
337}
338impl ElectionConfig {
339    fn validate(&self) -> Result<()> {
340        if self.election_timeout_min >= self.election_timeout_max {
341            return Err(Error::Config(ConfigError::Message(format!(
342                "election_timeout_min {}ms must be less than election_timeout_max {}ms",
343                self.election_timeout_min, self.election_timeout_max
344            ))));
345        }
346
347        if self.rpc_peer_connectinon_monitor_interval_in_sec == 0 {
348            return Err(Error::Config(ConfigError::Message(
349                "rpc_peer_connectinon_monitor_interval_in_sec cannot be 0".into(),
350            )));
351        }
352
353        Ok(())
354    }
355}
/// Default for `ElectionConfig::election_timeout_min` (milliseconds).
fn default_election_timeout_min() -> u64 {
    const TIMEOUT_MIN_MS: u64 = 500;
    TIMEOUT_MIN_MS
}

/// Default for `ElectionConfig::election_timeout_max` (milliseconds).
fn default_election_timeout_max() -> u64 {
    const TIMEOUT_MAX_MS: u64 = 1_000;
    TIMEOUT_MAX_MS
}

/// Default for `ElectionConfig::rpc_peer_connectinon_monitor_interval_in_sec` (seconds).
fn default_peer_monitor_interval() -> u64 {
    const INTERVAL_SECS: u64 = 30;
    INTERVAL_SECS
}

/// Default for `ElectionConfig::internal_rpc_client_request_id`.
fn default_client_request_id() -> u32 {
    const REQUEST_ID: u32 = 0;
    REQUEST_ID
}
368
/// Cluster membership settings.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct MembershipConfig {
    /// Service name used for cluster health-check probes. Must not be empty
    /// (enforced by `MembershipConfig::validate`).
    ///
    /// **Default**: `d_engine.server.cluster.ClusterManagementService`
    #[serde(default = "default_probe_service")]
    pub cluster_healthcheck_probe_service_name: String,

    /// How long the leader keeps retrying to verify its leadership before stepping down.
    ///
    /// **Default**: 1 hour
    #[serde(default = "default_verify_leadership_persistent_timeout")]
    pub verify_leadership_persistent_timeout: Duration,

    /// Zombie handling settings (failure threshold and purge interval); see `ZombieConfig`.
    #[serde(default)]
    pub zombie: ZombieConfig,

    /// Configuration settings for ready learners promotion
    #[serde(default)]
    pub promotion: PromotionConfig,
}
384impl Default for MembershipConfig {
385    fn default() -> Self {
386        Self {
387            cluster_healthcheck_probe_service_name: default_probe_service(),
388            verify_leadership_persistent_timeout: default_verify_leadership_persistent_timeout(),
389            zombie: ZombieConfig::default(),
390            promotion: PromotionConfig::default(),
391        }
392    }
393}
/// Default for `MembershipConfig::cluster_healthcheck_probe_service_name`.
fn default_probe_service() -> String {
    String::from("d_engine.server.cluster.ClusterManagementService")
}

/// Default timeout for leader to keep verifying its leadership.
///
/// In Raft, the leader may retry sending no-op entries to confirm it still holds leadership.
/// This timeout defines how long the leader will keep retrying before stepping down.
///
/// Default: 1 hour.
fn default_verify_leadership_persistent_timeout() -> Duration {
    const ONE_HOUR: Duration = Duration::from_secs(60 * 60);
    ONE_HOUR
}
407
408impl MembershipConfig {
409    fn validate(&self) -> Result<()> {
410        if self.cluster_healthcheck_probe_service_name.is_empty() {
411            return Err(Error::Config(ConfigError::Message(
412                "cluster_healthcheck_probe_service_name cannot be empty".into(),
413            )));
414        }
415        Ok(())
416    }
417}
418
419/// State machine behavior configuration
420///
421/// Controls state machine operations including lease management, compaction policies,
422/// and other data lifecycle features. This configuration affects how the state machine
423/// processes applied log entries and manages data.
424#[derive(Serialize, Deserialize, Clone, Debug)]
425#[derive(Default)]
426pub struct StateMachineConfig {
427    /// Lease (time-based expiration) configuration
428    ///
429    /// For backward compatibility, can also be configured via `ttl` in TOML files.
430    #[serde(alias = "ttl")]
431    pub lease: LeaseConfig,
432}
433
impl StateMachineConfig {
    /// Validates the state machine settings by delegating to the lease configuration.
    ///
    /// # Errors
    /// Propagates whatever error `LeaseConfig::validate` returns.
    pub fn validate(&self) -> Result<()> {
        self.lease.validate()?;
        Ok(())
    }
}
440
/// Snapshot feature configuration
///
/// Covers snapshot creation triggers, on-disk retention, and the tunables used when
/// transferring snapshots between nodes. See `SnapshotConfig::validate` for the
/// constraints enforced on these values.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct SnapshotConfig {
    /// Whether the snapshot feature is enabled
    ///
    /// Default: true
    #[serde(default = "default_snapshot_enabled")]
    pub enable: bool,

    /// Maximum number of log entries to accumulate before triggering snapshot creation
    /// This helps control memory usage by enforcing periodic state compaction
    ///
    /// Must be greater than 0. Default: 1000
    #[serde(default = "default_max_log_entries_before_snapshot")]
    pub max_log_entries_before_snapshot: u64,

    /// Minimum duration to wait between consecutive snapshot checks.
    /// Acts as a cooldown period to avoid overly frequent snapshot evaluations.
    ///
    /// Default: 60 seconds
    #[serde(default = "default_snapshot_cool_down_since_last_check")]
    pub snapshot_cool_down_since_last_check: Duration,

    /// Number of historical snapshot versions to retain during cleanup
    /// Ensures we maintain a safety buffer of previous states for recovery
    ///
    /// Must be greater than 0. Default: 2
    #[serde(default = "default_cleanup_retain_count")]
    pub cleanup_retain_count: u64,

    /// Snapshot storage directory
    ///
    /// Default: `default_snapshots_dir()` (/tmp/snapshots)
    #[serde(default = "default_snapshots_dir")]
    pub snapshots_dir: PathBuf,

    /// Name prefix for snapshot directories
    ///
    /// Default: `snapshot-`
    #[serde(default = "default_snapshots_dir_prefix")]
    pub snapshots_dir_prefix: String,

    /// Size (in bytes) of individual chunks when transferring snapshots
    ///
    /// Must be greater than 0. Default: `default_chunk_size()` (1024 bytes, i.e. 1KB)
    #[serde(default = "default_chunk_size")]
    pub chunk_size: usize,

    /// Number of log entries to retain
    ///
    /// Must be >= 1: `SnapshotConfig::validate` rejects `0`. Default: 1
    #[serde(default = "default_retained_log_entries")]
    pub retained_log_entries: u64,

    /// Number of chunks the sender processes before yielding the task
    ///
    /// Must be >= 1. Default: 1
    #[serde(default = "default_sender_yield_every_n_chunks")]
    pub sender_yield_every_n_chunks: usize,

    /// Number of chunks the receiver processes before yielding the task
    ///
    /// Must be >= 1. Default: 1
    #[serde(default = "default_receiver_yield_every_n_chunks")]
    pub receiver_yield_every_n_chunks: usize,

    /// Bandwidth cap for snapshot transfer, in Mbps per the field name — TODO confirm
    /// exact semantics against the transfer code. Default: 1
    #[serde(default = "default_max_bandwidth_mbps")]
    pub max_bandwidth_mbps: u32,

    /// Capacity of the snapshot push queue. Must be >= 1. Default: 100
    #[serde(default = "default_push_queue_size")]
    pub push_queue_size: usize,

    /// Cache size used by the snapshot subsystem — unit and consumer are not visible in
    /// this file; TODO confirm. Default: 10000
    #[serde(default = "default_cache_size")]
    pub cache_size: usize,

    /// Maximum retry count — presumably for snapshot transfer; TODO confirm. Default: 1
    #[serde(default = "default_max_retries")]
    pub max_retries: u32,

    /// Snapshot transfer timeout, in seconds. Default: 600
    #[serde(default = "default_transfer_timeout_in_sec")]
    pub transfer_timeout_in_sec: u64,

    /// Interval between retries, in milliseconds. Default: 10
    #[serde(default = "default_retry_interval_in_ms")]
    pub retry_interval_in_ms: u64,

    /// Backoff applied between snapshot push attempts, in milliseconds. Default: 100
    #[serde(default = "default_snapshot_push_backoff_in_ms")]
    pub snapshot_push_backoff_in_ms: u64,

    /// Maximum number of snapshot push retries. Must be >= 1. Default: 3
    #[serde(default = "default_snapshot_push_max_retry")]
    pub snapshot_push_max_retry: u32,

    /// Snapshot push timeout, in milliseconds. Default: 300_000
    #[serde(default = "default_push_timeout_in_ms")]
    pub push_timeout_in_ms: u64,

    /// Maximum duration to wait for a single chunk during snapshot reception.
    /// Applies per-chunk on the follower side. Increase for slow networks or large chunks.
    ///
    /// Must be greater than 0. Default: 30 seconds
    #[serde(default = "default_receive_chunk_timeout_in_sec")]
    pub receive_chunk_timeout_in_sec: u64,
}
524impl Default for SnapshotConfig {
525    fn default() -> Self {
526        Self {
527            max_log_entries_before_snapshot: default_max_log_entries_before_snapshot(),
528            snapshot_cool_down_since_last_check: default_snapshot_cool_down_since_last_check(),
529            cleanup_retain_count: default_cleanup_retain_count(),
530            snapshots_dir: default_snapshots_dir(),
531            snapshots_dir_prefix: default_snapshots_dir_prefix(),
532            chunk_size: default_chunk_size(),
533            retained_log_entries: default_retained_log_entries(),
534            sender_yield_every_n_chunks: default_sender_yield_every_n_chunks(),
535            receiver_yield_every_n_chunks: default_receiver_yield_every_n_chunks(),
536            max_bandwidth_mbps: default_max_bandwidth_mbps(),
537            push_queue_size: default_push_queue_size(),
538            cache_size: default_cache_size(),
539            max_retries: default_max_retries(),
540            transfer_timeout_in_sec: default_transfer_timeout_in_sec(),
541            retry_interval_in_ms: default_retry_interval_in_ms(),
542            snapshot_push_backoff_in_ms: default_snapshot_push_backoff_in_ms(),
543            snapshot_push_max_retry: default_snapshot_push_max_retry(),
544            push_timeout_in_ms: default_push_timeout_in_ms(),
545            receive_chunk_timeout_in_sec: default_receive_chunk_timeout_in_sec(),
546            enable: default_snapshot_enabled(),
547        }
548    }
549}
550impl SnapshotConfig {
551    fn validate(&self) -> Result<()> {
552        if self.max_log_entries_before_snapshot == 0 {
553            return Err(Error::Config(ConfigError::Message(
554                "max_log_entries_before_snapshot must be greater than 0".into(),
555            )));
556        }
557
558        if self.cleanup_retain_count == 0 {
559            return Err(Error::Config(ConfigError::Message(
560                "cleanup_retain_count must be greater than 0".into(),
561            )));
562        }
563        // Validate storage paths
564        validate_directory(&self.snapshots_dir, "snapshots_dir")?;
565
566        // chunk_size should be > 0
567        if self.chunk_size == 0 {
568            return Err(Error::Config(ConfigError::Message(format!(
569                "chunk_size must be at least {} bytes (got {})",
570                0, self.chunk_size
571            ))));
572        }
573
574        if self.retained_log_entries < 1 {
575            return Err(Error::Config(ConfigError::Message(format!(
576                "retained_log_entries must be >= 1, (got {})",
577                self.retained_log_entries
578            ))));
579        }
580
581        if self.sender_yield_every_n_chunks < 1 {
582            return Err(Error::Config(ConfigError::Message(format!(
583                "sender_yield_every_n_chunks must be >= 1, (got {})",
584                self.sender_yield_every_n_chunks
585            ))));
586        }
587
588        if self.receiver_yield_every_n_chunks < 1 {
589            return Err(Error::Config(ConfigError::Message(format!(
590                "receiver_yield_every_n_chunks must be >= 1, (got {})",
591                self.receiver_yield_every_n_chunks
592            ))));
593        }
594
595        if self.push_queue_size < 1 {
596            return Err(Error::Config(ConfigError::Message(format!(
597                "push_queue_size must be >= 1, (got {})",
598                self.push_queue_size
599            ))));
600        }
601
602        if self.receive_chunk_timeout_in_sec == 0 {
603            return Err(Error::Config(ConfigError::Message(
604                "receive_chunk_timeout_in_sec must be greater than 0".into(),
605            )));
606        }
607
608        if self.snapshot_push_max_retry < 1 {
609            return Err(Error::Config(ConfigError::Message(format!(
610                "snapshot_push_max_retry must be >= 1, (got {})",
611                self.snapshot_push_max_retry
612            ))));
613        }
614
615        Ok(())
616    }
617}
618
/// Snapshots are enabled by default.
fn default_snapshot_enabled() -> bool {
    const ENABLED: bool = true;
    ENABLED
}

/// Default threshold for triggering snapshot creation
fn default_max_log_entries_before_snapshot() -> u64 {
    const LOG_ENTRIES: u64 = 1_000;
    LOG_ENTRIES
}

/// Default cooldown duration between snapshot checks.
///
/// Prevents constant evaluation of snapshot conditions in tight loops.
/// Reduced from 3600s to 60s to allow timely log compaction under typical workloads.
fn default_snapshot_cool_down_since_last_check() -> Duration {
    const COOL_DOWN: Duration = Duration::from_secs(60);
    COOL_DOWN
}

/// Default number of historical snapshots to retain
fn default_cleanup_retain_count() -> u64 {
    const RETAINED_SNAPSHOTS: u64 = 2;
    RETAINED_SNAPSHOTS
}

/// Default snapshots storage path
fn default_snapshots_dir() -> PathBuf {
    "/tmp/snapshots".into()
}

/// Default snapshots directory prefix
fn default_snapshots_dir_prefix() -> String {
    String::from("snapshot-")
}

/// 1KB chunks by default
fn default_chunk_size() -> usize {
    const ONE_KIB: usize = 1 << 10;
    ONE_KIB
}
653
/// Settings for the new-node auto-join feature.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct AutoJoinConfig {
    /// Whether RPC compression is enabled for auto-join (per the field name; the consumer
    /// is not visible in this file).
    ///
    /// **Default**: true
    #[serde(default = "default_rpc_enable_compression")]
    pub rpc_enable_compression: bool,
}
659impl Default for AutoJoinConfig {
660    fn default() -> Self {
661        Self {
662            rpc_enable_compression: default_rpc_enable_compression(),
663        }
664    }
665}
/// Default for `AutoJoinConfig::rpc_enable_compression`.
fn default_rpc_enable_compression() -> bool {
    const ENABLED: bool = true;
    ENABLED
}

/// Default for `SnapshotConfig::retained_log_entries`.
fn default_retained_log_entries() -> u64 {
    const ENTRIES: u64 = 1;
    ENTRIES
}

/// Default for `SnapshotConfig::sender_yield_every_n_chunks`.
fn default_sender_yield_every_n_chunks() -> usize {
    const CHUNKS: usize = 1;
    CHUNKS
}

/// Default for `SnapshotConfig::receiver_yield_every_n_chunks`.
fn default_receiver_yield_every_n_chunks() -> usize {
    const CHUNKS: usize = 1;
    CHUNKS
}

/// Default for `SnapshotConfig::max_bandwidth_mbps`.
fn default_max_bandwidth_mbps() -> u32 {
    const MBPS: u32 = 1;
    MBPS
}

/// Default for `SnapshotConfig::push_queue_size`.
fn default_push_queue_size() -> usize {
    const QUEUE_SLOTS: usize = 100;
    QUEUE_SLOTS
}

/// Default for `SnapshotConfig::cache_size`.
fn default_cache_size() -> usize {
    const CACHE_SIZE: usize = 10_000;
    CACHE_SIZE
}

/// Default for `SnapshotConfig::max_retries`.
fn default_max_retries() -> u32 {
    const RETRIES: u32 = 1;
    RETRIES
}

/// Default for `SnapshotConfig::transfer_timeout_in_sec` (seconds).
fn default_transfer_timeout_in_sec() -> u64 {
    const TIMEOUT_SECS: u64 = 600;
    TIMEOUT_SECS
}

/// Default for `SnapshotConfig::retry_interval_in_ms` (milliseconds).
fn default_retry_interval_in_ms() -> u64 {
    const INTERVAL_MS: u64 = 10;
    INTERVAL_MS
}

/// Default for `SnapshotConfig::snapshot_push_backoff_in_ms` (milliseconds).
fn default_snapshot_push_backoff_in_ms() -> u64 {
    const BACKOFF_MS: u64 = 100;
    BACKOFF_MS
}

/// Default for `SnapshotConfig::snapshot_push_max_retry`.
fn default_snapshot_push_max_retry() -> u32 {
    const RETRIES: u32 = 3;
    RETRIES
}

/// Default for `SnapshotConfig::push_timeout_in_ms` (milliseconds).
fn default_push_timeout_in_ms() -> u64 {
    const TIMEOUT_MS: u64 = 300 * 1_000;
    TIMEOUT_MS
}

/// Default for `SnapshotConfig::receive_chunk_timeout_in_sec` (seconds).
fn default_receive_chunk_timeout_in_sec() -> u64 {
    const TIMEOUT_SECS: u64 = 30;
    TIMEOUT_SECS
}
714
/// Zombie handling settings, nested under `MembershipConfig::zombie`.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ZombieConfig {
    /// zombie connection failed threshold
    ///
    /// Presumably the number of failed connections after which a node is treated as a
    /// zombie — TODO confirm against the membership code.
    ///
    /// **Default**: 3
    #[serde(default = "default_zombie_threshold")]
    pub threshold: u32,

    /// Interval between zombie purge runs (per the field name).
    ///
    /// **Default**: 30 seconds
    #[serde(default = "default_zombie_purge_interval")]
    pub purge_interval: Duration,
}
724
725impl Default for ZombieConfig {
726    fn default() -> Self {
727        Self {
728            threshold: default_zombie_threshold(),
729            purge_interval: default_zombie_purge_interval(),
730        }
731    }
732}
733
/// Default for `ZombieConfig::threshold`.
fn default_zombie_threshold() -> u32 {
    const FAILURE_THRESHOLD: u32 = 3;
    FAILURE_THRESHOLD
}

/// Default for `ZombieConfig::purge_interval`: 30 seconds.
fn default_zombie_purge_interval() -> Duration {
    const PURGE_INTERVAL: Duration = Duration::from_secs(30);
    PURGE_INTERVAL
}
741
/// Learner promotion settings, nested under `MembershipConfig::promotion`.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct PromotionConfig {
    /// Duration after which a learner is presumably considered stale — TODO confirm
    /// what action the promotion logic takes once this threshold is exceeded.
    ///
    /// **Default**: 5 minutes
    #[serde(default = "default_stale_learner_threshold")]
    pub stale_learner_threshold: Duration,
}
747
748impl Default for PromotionConfig {
749    fn default() -> Self {
750        Self {
751            stale_learner_threshold: default_stale_learner_threshold(),
752        }
753    }
754}
755
/// Default for `PromotionConfig::stale_learner_threshold`: 5 minutes.
fn default_stale_learner_threshold() -> Duration {
    const FIVE_MINUTES: Duration = Duration::from_secs(5 * 60);
    FIVE_MINUTES
}
/// Defines how Raft log entries are persisted and accessed.
///
/// All strategies use a configurable [`FlushPolicy`] to control when memory contents
/// are flushed to disk, affecting write latency and durability guarantees.
///
/// **Note:** `MemFirst` is the only strategy defined here. All log entries are fully
/// loaded from disk into memory at startup, and the in-memory `SkipMap` serves as the
/// primary data structure for reads.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum PersistenceStrategy {
    /// Memory-first persistence strategy.
    ///
    /// - **Write path**: On append, the log entry is first written to the in-memory `SkipMap` and
    ///   acknowledged immediately. Disk persistence happens asynchronously in the background,
    ///   governed by [`FlushPolicy`].
    ///
    /// - **Read path**: Reads are always served from the in-memory `SkipMap`.
    ///
    /// - **Startup behavior**: All log entries are loaded from disk into memory at startup.
    ///
    MemFirst,
}
781
/// Controls when in-memory logs should be flushed to disk.
///
/// Flush is triggered by whichever comes first:
/// - An explicit `flush()` call (immediate, no wait).
/// - `append_entries` calls `write_notify.notify_one()` for an immediate persist+fsync.
/// - The idle safety-net timer fires after `idle_flush_interval_ms` of inactivity.
///
/// `idle_flush_interval_ms` must be greater than zero. It only fires when no
/// writes have arrived for that duration; normal-path latency is determined by
/// the fsync execution time (drain-then-fsync architecture).
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum FlushPolicy {
    /// Batched flushing with an idle safety-net timer.
    ///
    /// `idle_flush_interval_ms` is the idle period in milliseconds; a value of `0` is
    /// rejected by `PersistenceConfig::validate`.
    Batch { idle_flush_interval_ms: u64 },
}
796
/// Configuration parameters for log persistence behavior
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct PersistenceConfig {
    /// Strategy for persisting Raft logs
    ///
    /// This controls the trade-off between durability guarantees and performance
    /// characteristics. The choice impacts both write throughput and recovery
    /// behavior after node failures.
    ///
    /// **Default**: `PersistenceStrategy::MemFirst`
    #[serde(default = "default_persistence_strategy")]
    pub strategy: PersistenceStrategy,

    /// Flush policy for asynchronous strategies
    ///
    /// This controls when log entries are flushed to disk. The choice impacts
    /// write performance and durability guarantees.
    ///
    /// **Default**: `FlushPolicy::Batch { idle_flush_interval_ms: 1000 }`
    #[serde(default = "default_flush_policy")]
    pub flush_policy: FlushPolicy,

    /// Maximum number of in-memory log entries to buffer when using async strategies
    ///
    /// This acts as a safety valve to prevent memory exhaustion during periods of
    /// high write throughput or when disk persistence is slow.
    ///
    /// **Default**: 10_000
    #[serde(default = "default_max_buffered_entries")]
    pub max_buffered_entries: usize,
}
822
823/// Default persistence strategy (optimized for balanced workloads)
824fn default_persistence_strategy() -> PersistenceStrategy {
825    PersistenceStrategy::MemFirst
826}
827
828/// Default flush policy for asynchronous strategies
829///
830/// This controls when log entries are flushed to disk. The choice impacts
831/// write performance and durability guarantees.
832fn default_flush_policy() -> FlushPolicy {
833    FlushPolicy::Batch {
834        idle_flush_interval_ms: 1000,
835    }
836}
837
/// Default maximum buffered log entries
fn default_max_buffered_entries() -> usize {
    const BUFFERED_ENTRIES: usize = 10 * 1_000;
    BUFFERED_ENTRIES
}
842
843impl PersistenceConfig {
844    pub fn validate(&self) -> Result<()> {
845        let FlushPolicy::Batch {
846            idle_flush_interval_ms,
847        } = self.flush_policy;
848        if idle_flush_interval_ms == 0 {
849            return Err(Error::Config(ConfigError::Message(
850                "flush_policy.idle_flush_interval_ms must be greater than 0".into(),
851            )));
852        }
853        Ok(())
854    }
855}
856
857impl Default for PersistenceConfig {
858    fn default() -> Self {
859        Self {
860            strategy: default_persistence_strategy(),
861            flush_policy: default_flush_policy(),
862            max_buffered_entries: default_max_buffered_entries(),
863        }
864    }
865}
866
/// Backpressure configuration for client request flow control
///
/// Prevents unbounded memory growth by limiting pending client requests.
/// When limits are reached, new requests are rejected with RESOURCE_EXHAUSTED
/// error until the system processes existing requests.
///
/// # Value Semantics
/// - `0` = unlimited (no backpressure)
/// - `> 0` = maximum pending requests allowed; a request is rejected once the
///   pending count has reached this value (see `should_reject_write` /
///   `should_reject_read`)
///
/// # Tuning Guidelines (for reference only)
/// - Low memory (< 4GB): 1000-5000
/// - Medium memory (4-16GB): 5000-20000
/// - High memory (> 16GB): 20000-50000
///
/// # Example
/// ```toml
/// [raft.backpressure]
/// max_pending_writes = 10000
/// max_pending_reads = 50000
/// ```
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackpressureConfig {
    /// Maximum pending write (propose) requests
    ///
    /// Limits the number of client write requests waiting in the leader's
    /// propose buffer. Write requests typically consume more resources
    /// (replication, persistence) than reads.
    ///
    /// **Default**: 10000 (0 = unlimited)
    #[serde(default = "default_max_pending_writes")]
    pub max_pending_writes: usize,

    /// Maximum pending linearizable read requests
    ///
    /// Limits the number of client linearizable read requests waiting in
    /// the leader's read buffer. Read requests can tolerate higher limits
    /// as they don't require replication.
    ///
    /// **Default**: 50000 (0 = unlimited)
    #[serde(default = "default_max_pending_reads")]
    pub max_pending_reads: usize,
}
910
911impl Default for BackpressureConfig {
912    fn default() -> Self {
913        Self {
914            max_pending_writes: default_max_pending_writes(),
915            max_pending_reads: default_max_pending_reads(),
916        }
917    }
918}
919
/// Serde fallback for `max_pending_writes` (10,000 pending requests).
fn default_max_pending_writes() -> usize {
    const DEFAULT_MAX_PENDING_WRITES: usize = 10_000;
    DEFAULT_MAX_PENDING_WRITES
}
923
/// Serde fallback for `max_pending_reads` (50,000 pending requests).
fn default_max_pending_reads() -> usize {
    const DEFAULT_MAX_PENDING_READS: usize = 50_000;
    DEFAULT_MAX_PENDING_READS
}
927
928impl BackpressureConfig {
929    /// Check if write request should be rejected due to backpressure
930    ///
931    /// Returns true if the current pending count exceeds the limit.
932    /// When `max_pending_writes == 0`, always returns false (unlimited).
933    pub fn should_reject_write(
934        &self,
935        current_pending: usize,
936    ) -> bool {
937        self.max_pending_writes > 0 && current_pending >= self.max_pending_writes
938    }
939
940    /// Check if read request should be rejected due to backpressure
941    ///
942    /// Returns true if the current pending count exceeds the limit.
943    /// When `max_pending_reads == 0`, always returns false (unlimited).
944    pub fn should_reject_read(
945        &self,
946        current_pending: usize,
947    ) -> bool {
948        self.max_pending_reads > 0 && current_pending >= self.max_pending_reads
949    }
950}
951
/// Policy for read operation consistency guarantees
///
/// Determines the trade-off between read consistency and performance.
/// Clients can choose the appropriate level based on their requirements.
///
/// `LinearizableRead` is the `Default` variant. The enum converts to and from
/// `d_engine_proto::client::ReadConsistencyPolicy` variant-for-variant.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub enum ReadConsistencyPolicy {
    /// Lease-based reads for better performance with weaker consistency
    ///
    /// The leader serves reads locally without contacting followers
    /// during the valid lease period. Assumes bounded clock drift between nodes.
    /// Provides lower latency but slightly weaker consistency guarantees
    /// compared to LinearizableRead.
    LeaseRead,

    /// Fully linearizable reads for strongest consistency
    ///
    /// The leader verifies its leadership with a quorum before serving
    /// the read, ensuring strict linearizability. This guarantees that
    /// all reads reflect the most recent committed value in the cluster.
    ///
    /// This is the default policy.
    #[default]
    LinearizableRead,

    /// Eventually consistent reads from any node
    ///
    /// Allows reading from any node (leader, follower, or candidate) without
    /// additional consistency checks. May return stale data but provides
    /// best read performance and availability. Suitable for scenarios where
    /// eventual consistency is acceptable.
    /// **Can be served by non-leader nodes.**
    EventualConsistency,
}
983
/// Configuration for read operation consistency behavior
///
/// Every field has a serde default, so any subset of keys may be omitted.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ReadConsistencyConfig {
    /// Default read consistency policy for the cluster
    ///
    /// This sets the cluster-wide default behavior. Individual read requests
    /// can still override this setting when needed for specific use cases.
    ///
    /// **Default**: `LinearizableRead` (the enum's `Default` variant)
    #[serde(default)]
    pub default_policy: ReadConsistencyPolicy,

    /// Lease duration in milliseconds for LeaseRead policy
    ///
    /// Only applicable when using the LeaseRead policy. The leader considers
    /// itself valid for this duration after successfully heartbeating to a quorum.
    ///
    /// **MUST be > 0**. Config validation will reject 0 or invalid values.
    ///
    /// **Default**: 250
    #[serde(default = "default_lease_duration_ms")]
    pub lease_duration_ms: u64,

    /// Whether to allow clients to override the default policy per request
    ///
    /// When true, clients can specify consistency requirements per read request.
    /// When false, all reads use the cluster's default_policy setting.
    ///
    /// **Default**: true
    #[serde(default = "default_allow_client_override")]
    pub allow_client_override: bool,

    /// Timeout in milliseconds to wait for state machine to catch up with commit index
    ///
    /// Used by LinearizableRead to ensure the state machine has applied all committed
    /// entries before serving reads. Typical apply latency is <1ms on local SSD.
    /// Default: 10ms (safe buffer for single-node local deployments)
    // NOTE(review): validation in this file only checks `lease_duration_ms`; whether
    // 0 is a meaningful value for this timeout is not established here.
    #[serde(default = "default_state_machine_sync_timeout_ms")]
    pub state_machine_sync_timeout_ms: u64,
}
1018
1019impl Default for ReadConsistencyConfig {
1020    fn default() -> Self {
1021        Self {
1022            default_policy: ReadConsistencyPolicy::default(),
1023            lease_duration_ms: default_lease_duration_ms(),
1024            allow_client_override: default_allow_client_override(),
1025            state_machine_sync_timeout_ms: default_state_machine_sync_timeout_ms(),
1026        }
1027    }
1028}
1029
/// Serde fallback for `lease_duration_ms`: a conservative 250 ms lease.
fn default_lease_duration_ms() -> u64 {
    // NOTE(review): the earlier comment described this as half of a typical
    // (~300ms) heartbeat interval, which does not equal 250 — confirm the
    // intended relationship to the heartbeat setting.
    const DEFAULT_LEASE_DURATION_MS: u64 = 250;
    DEFAULT_LEASE_DURATION_MS
}
1034
/// Serde fallback for `allow_client_override`.
///
/// Enabled by default so clients keep the flexibility to request a stronger
/// consistency level per read.
fn default_allow_client_override() -> bool {
    const ALLOW_OVERRIDE: bool = true;
    ALLOW_OVERRIDE
}
1039
/// Serde fallback for `state_machine_sync_timeout_ms`.
///
/// 10 ms leaves headroom over the typical <1 ms apply latency on local SSD.
fn default_state_machine_sync_timeout_ms() -> u64 {
    const DEFAULT_SYNC_TIMEOUT_MS: u64 = 10;
    DEFAULT_SYNC_TIMEOUT_MS
}
1043
1044impl ReadConsistencyConfig {
1045    fn validate(&self) -> Result<()> {
1046        // Validate read consistency configuration
1047        if self.lease_duration_ms == 0 {
1048            return Err(Error::Config(ConfigError::Message(
1049                "read_consistency.lease_duration_ms must be greater than 0".into(),
1050            )));
1051        }
1052        Ok(())
1053    }
1054}
1055
1056impl From<d_engine_proto::client::ReadConsistencyPolicy> for ReadConsistencyPolicy {
1057    fn from(proto_policy: d_engine_proto::client::ReadConsistencyPolicy) -> Self {
1058        match proto_policy {
1059            d_engine_proto::client::ReadConsistencyPolicy::LeaseRead => Self::LeaseRead,
1060            d_engine_proto::client::ReadConsistencyPolicy::LinearizableRead => {
1061                Self::LinearizableRead
1062            }
1063            d_engine_proto::client::ReadConsistencyPolicy::EventualConsistency => {
1064                Self::EventualConsistency
1065            }
1066        }
1067    }
1068}
1069
1070impl From<ReadConsistencyPolicy> for d_engine_proto::client::ReadConsistencyPolicy {
1071    fn from(config_policy: ReadConsistencyPolicy) -> Self {
1072        match config_policy {
1073            ReadConsistencyPolicy::LeaseRead => {
1074                d_engine_proto::client::ReadConsistencyPolicy::LeaseRead
1075            }
1076            ReadConsistencyPolicy::LinearizableRead => {
1077                d_engine_proto::client::ReadConsistencyPolicy::LinearizableRead
1078            }
1079            ReadConsistencyPolicy::EventualConsistency => {
1080                d_engine_proto::client::ReadConsistencyPolicy::EventualConsistency
1081            }
1082        }
1083    }
1084}
1085
/// Configuration for controlling gRPC compression settings per service type
///
/// Provides fine-grained control over when to enable compression based on
/// the RPC service type and deployment environment. Each service can be
/// independently configured to use compression based on its data
/// characteristics and frequency.
///
/// Defaults at a glance: replication `false`, election `true`, snapshot `true`,
/// cluster `true`, client `false`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RpcCompressionConfig {
    /// Controls compression for Raft replication response data
    ///
    /// Replication traffic is typically high-frequency with small payloads
    /// in LAN environments, making compression less beneficial. In WAN
    /// deployments with bandwidth constraints, enabling may help.
    ///
    /// **Default**: `false` - Optimized for LAN/same-VPC deployments
    #[serde(default = "default_replication_compression")]
    pub replication_response: bool,

    /// Controls compression for Raft election response data
    ///
    /// Election traffic is low-frequency but time-sensitive. Compression
    /// rarely benefits election traffic due to small payload size.
    ///
    /// **Default**: `true` for backward compatibility
    #[serde(default = "default_election_compression")]
    pub election_response: bool,

    /// Controls compression for snapshot transfer response data
    ///
    /// Snapshot transfers involve large data volumes where compression
    /// is typically beneficial, even in low-latency environments.
    ///
    /// **Default**: `true` - Recommended for all environments
    #[serde(default = "default_snapshot_compression")]
    pub snapshot_response: bool,

    /// Controls compression for cluster management response data
    ///
    /// Cluster operations are infrequent but may contain configuration data.
    /// Compression is generally beneficial for these operations.
    ///
    /// **Default**: `true` for backward compatibility
    #[serde(default = "default_cluster_compression")]
    pub cluster_response: bool,

    /// Controls compression for client request response data
    ///
    /// Client responses may vary in size. In LAN/VPC environments,
    /// compression CPU overhead typically outweighs network benefits.
    ///
    /// **Default**: `false` - Optimized for LAN/same-VPC deployments
    #[serde(default = "default_client_compression")]
    pub client_response: bool,
}
1140
1141impl Default for RpcCompressionConfig {
1142    fn default() -> Self {
1143        Self {
1144            replication_response: default_replication_compression(),
1145            election_response: default_election_compression(),
1146            snapshot_response: default_snapshot_compression(),
1147            cluster_response: default_cluster_compression(),
1148            client_response: default_client_compression(),
1149        }
1150    }
1151}
1152
1153// Default values for RPC compression settings
/// Serde fallback for `replication_response`: compression off.
///
/// Replication is high-frequency with typically small payloads, so on LAN/VPC
/// deployments compression costs CPU without a significant benefit.
fn default_replication_compression() -> bool {
    const COMPRESS: bool = false;
    COMPRESS
}
1159
/// Serde fallback for `election_response`: compression on.
///
/// Left enabled for backward compatibility even though the benefit is minimal.
fn default_election_compression() -> bool {
    const COMPRESS: bool = true;
    COMPRESS
}
1164
/// Serde fallback for `snapshot_response`: compression on.
///
/// Snapshot payloads are large and benefit from compression in all environments.
fn default_snapshot_compression() -> bool {
    const COMPRESS: bool = true;
    COMPRESS
}
1169
/// Serde fallback for `cluster_response`: compression on.
///
/// Left enabled for backward compatibility.
fn default_cluster_compression() -> bool {
    const COMPRESS: bool = true;
    COMPRESS
}
1174
/// Serde fallback for `client_response`: compression off.
///
/// In LAN/VPC environments client responses are typically better served uncompressed.
fn default_client_compression() -> bool {
    const COMPRESS: bool = false;
    COMPRESS
}
1179
1180/// Configuration for the Watch mechanism that monitors key changes
1181///
1182/// The watch system allows clients to monitor specific keys for changes with
1183/// minimal overhead on the write path. It uses a lock-free event queue and
1184/// configurable buffer sizes to balance performance and memory usage.
1185///
1186/// # Performance Characteristics
1187///
1188/// - Write path overhead: < 0.01% with 100+ watchers
1189/// - Event notification latency: typically < 100μs end-to-end
1190/// - Memory per watcher: ~2.4KB with default buffer size
1191///
1192/// # Configuration Example
1193///
1194/// ```toml
1195/// [raft.watch]
1196/// event_queue_size = 10240
1197/// watcher_buffer_size = 256
1198/// enable_metrics = false
1199/// max_watcher_count = 10000
1200/// ```
1201#[derive(Debug, Serialize, Deserialize, Clone)]
1202pub struct WatchConfig {
1203    /// Buffer size for the global event queue shared across all watchers
1204    ///
1205    /// This queue sits between the write path and the dispatcher thread.
1206    /// A larger queue reduces the chance of dropped events under burst load,
1207    /// but increases memory usage.
1208    ///
1209    /// **Performance Impact**:
1210    /// - Memory: ~24 bytes per slot (key + value pointers + event type)
1211    /// - Default 1000 slots ≈ 24KB memory
1212    ///
1213    /// **Tuning Guidelines**:
1214    /// - Low traffic (< 1K writes/sec): 500-1000
1215    /// - Medium traffic (1K-10K writes/sec): 1000-2000
1216    /// - High traffic (> 10K writes/sec): 2000-5000
1217    ///
1218    /// **Default**: 1000
1219    #[serde(default = "default_event_queue_size")]
1220    pub event_queue_size: usize,
1221
1222    /// Buffer size for each individual watcher's channel
1223    ///
1224    /// Each registered watcher gets its own channel to receive events.
1225    /// Smaller buffers reduce memory usage but increase the risk of
1226    /// dropping events for slow consumers.
1227    ///
1228    /// **Performance Impact**:
1229    /// - Memory: ~240 bytes per slot per watcher
1230    /// - 10 slots × 100 watchers = ~240KB total
1231    ///
1232    /// **Tuning Guidelines**:
1233    /// - Fast consumers (< 1ms processing): 5-10
1234    /// - Normal consumers (1-10ms processing): 10-20
1235    /// - Slow consumers (> 10ms processing): 20-50
1236    ///
1237    /// **Default**: 10
1238    #[serde(default = "default_watcher_buffer_size")]
1239    pub watcher_buffer_size: usize,
1240
1241    /// Enable detailed metrics and logging for watch operations
1242    ///
1243    /// When enabled, logs warnings for dropped events and tracks watch
1244    /// performance metrics. Adds minimal overhead (~0.001%) but useful
1245    /// for debugging and monitoring.
1246    ///
1247    /// **Default**: false (minimal overhead in production)
1248    #[serde(default = "default_enable_watch_metrics")]
1249    pub enable_metrics: bool,
1250
1251    /// Hard cap on total active watchers (exact + prefix combined).
1252    ///
1253    /// `register()` and `register_prefix()` return `WatchError::LimitExceeded`
1254    /// once this count is reached.  Leave at the default (effectively unlimited)
1255    /// or set to a finite value to prevent runaway watcher growth.
1256    ///
1257    /// **Default**: i64::MAX (effectively unlimited)
1258    #[serde(default = "default_max_watcher_count")]
1259    pub max_watcher_count: usize,
1260
1261    /// Heartbeat interval in milliseconds for progress notifications.
1262    ///
1263    /// The dispatcher broadcasts a `Progress` event to all active watchers on
1264    /// each tick so clients can confirm the stream is alive even during quiet
1265    /// periods.  Set to 0 to disable heartbeats entirely.
1266    ///
1267    /// Use milliseconds (not seconds) so tests can set short intervals (e.g. 50ms)
1268    /// without sleeping for a full second.
1269    ///
1270    /// **Default**: 30_000 (30 seconds)
1271    #[serde(default = "default_heartbeat_interval_ms")]
1272    pub heartbeat_interval_ms: u64,
1273}
1274
1275impl Default for WatchConfig {
1276    fn default() -> Self {
1277        Self {
1278            event_queue_size: default_event_queue_size(),
1279            watcher_buffer_size: default_watcher_buffer_size(),
1280            enable_metrics: default_enable_watch_metrics(),
1281            max_watcher_count: default_max_watcher_count(),
1282            heartbeat_interval_ms: default_heartbeat_interval_ms(),
1283        }
1284    }
1285}
1286
1287impl WatchConfig {
1288    /// Validates watch configuration parameters
1289    pub fn validate(&self) -> Result<()> {
1290        if self.event_queue_size == 0 {
1291            return Err(Error::Config(ConfigError::Message(
1292                "watch.event_queue_size must be greater than 0".into(),
1293            )));
1294        }
1295
1296        if self.event_queue_size > 100_000 {
1297            warn!(
1298                "watch.event_queue_size ({}) is very large and may consume significant memory (~{}MB)",
1299                self.event_queue_size,
1300                (self.event_queue_size * 24) / 1_000_000
1301            );
1302        }
1303
1304        if self.watcher_buffer_size == 0 {
1305            return Err(Error::Config(ConfigError::Message(
1306                "watch.watcher_buffer_size must be greater than 0".into(),
1307            )));
1308        }
1309
1310        if self.watcher_buffer_size > 1000 {
1311            warn!(
1312                "watch.watcher_buffer_size ({}) is very large. Each watcher will consume ~{}KB memory",
1313                self.watcher_buffer_size,
1314                (self.watcher_buffer_size * 240) / 1000
1315            );
1316        }
1317
1318        Ok(())
1319    }
1320}
1321
/// Serde fallback for `event_queue_size` (10,240 slots).
const fn default_event_queue_size() -> usize {
    const DEFAULT_EVENT_QUEUE_SIZE: usize = 10_240;
    DEFAULT_EVENT_QUEUE_SIZE
}
1325
/// Serde fallback for `watcher_buffer_size` (256 slots per watcher).
const fn default_watcher_buffer_size() -> usize {
    const DEFAULT_WATCHER_BUFFER_SIZE: usize = 256;
    DEFAULT_WATCHER_BUFFER_SIZE
}
1329
/// Serde fallback for `enable_metrics`: watch metrics are off unless requested.
const fn default_enable_watch_metrics() -> bool {
    const ENABLED: bool = false;
    ENABLED
}
1333
/// Serde fallback for `max_watcher_count`: effectively unlimited.
///
/// The value is `i64::MAX` where `usize` can hold it, because the config crate
/// uses signed 64-bit integers internally and `usize::MAX` would overflow it.
/// On targets whose `usize` is narrower than that (e.g. 32-bit), it falls back
/// to `usize::MAX`, which still fits in an `i64`. Computing the bound instead of
/// spelling out the 19-digit literal keeps the function compiling on such
/// targets, where the literal would be out of range for `usize`.
const fn default_max_watcher_count() -> usize {
    // Compare in u128 so neither bound can overflow during the comparison.
    if (usize::MAX as u128) > (i64::MAX as u128) {
        // Lossless here: this branch is only taken when usize is wider than i64::MAX.
        i64::MAX as usize
    } else {
        usize::MAX
    }
}
1339
/// Serde fallback for `heartbeat_interval_ms` (30,000 ms = 30 seconds).
const fn default_heartbeat_interval_ms() -> u64 {
    const DEFAULT_HEARTBEAT_INTERVAL_MS: u64 = 30_000;
    DEFAULT_HEARTBEAT_INTERVAL_MS
}
1343
/// Performance metrics configuration
///
/// Controls emission of observability metrics. Disabling metrics reduces overhead
/// in hot paths but decreases system visibility.
///
/// Both metric groups are disabled by default; the example below opts in to them.
///
/// # Example
/// ```toml
/// [raft.metrics]
/// enable_backpressure = true
/// enable_batch = true
/// sample_rate = 1  # No sampling (record every event)
/// ```
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct MetricsConfig {
    /// Enable backpressure metrics (rejections, buffer utilization)
    ///
    /// Tracks client request backpressure for capacity planning.
    /// Disable for absolute maximum performance in trusted environments.
    ///
    /// **Default**: false
    #[serde(default = "default_enable_backpressure_metrics")]
    pub enable_backpressure: bool,

    /// Enable buffer length gauge (batch.buffer_length for propose and linearizable buffers)
    ///
    /// Tracks buffer utilization for capacity planning.
    /// Disable for absolute maximum performance in trusted environments.
    ///
    /// **Default**: false
    #[serde(default = "default_enable_batch_metrics")]
    pub enable_batch: bool,

    /// Sample rate for high-frequency gauge metrics
    ///
    /// - `1` = record every event (no sampling)
    /// - `10` = record 1 out of 10 events (10% sampling)
    /// - `100` = record 1 out of 100 events (1% sampling)
    ///
    /// Applies to: buffer_utilization, buffer_length gauges.
    /// Does NOT apply to: counters (rejections, drain/heartbeat triggers).
    ///
    /// NOTE(review): the meaning of `0` is not defined here and no validation
    /// for it is visible in this section — confirm how consumers handle it.
    ///
    /// **Default**: 1 (no sampling)
    #[serde(default = "default_metrics_sample_rate")]
    pub sample_rate: u32,
}
1389
1390impl Default for MetricsConfig {
1391    fn default() -> Self {
1392        Self {
1393            enable_backpressure: default_enable_backpressure_metrics(),
1394            enable_batch: default_enable_batch_metrics(),
1395            sample_rate: default_metrics_sample_rate(),
1396        }
1397    }
1398}
1399
/// Serde fallback for `enable_backpressure`: backpressure metrics are off by default.
fn default_enable_backpressure_metrics() -> bool {
    const ENABLED: bool = false;
    ENABLED
}
1403
/// Serde fallback for `enable_batch`: batch buffer gauges are off by default.
fn default_enable_batch_metrics() -> bool {
    const ENABLED: bool = false;
    ENABLED
}
1407
/// Serde fallback for `sample_rate`: 1, i.e. every event is recorded (no sampling).
fn default_metrics_sample_rate() -> u32 {
    const EVERY_EVENT: u32 = 1;
    EVERY_EVENT
}