replication_engine/
config.rs

1// Copyright (c) 2025-2026 Adrian Robinson. Licensed under the AGPL-3.0.
2// See LICENSE file in the project root for full license text.
3
4//! Configuration for the replication engine.
5//!
6//! This module defines all configuration types needed to run the replication engine.
7//! Configuration is passed to [`ReplicationEngine::new()`](crate::ReplicationEngine::new)
8//! and can be constructed programmatically or deserialized from YAML/JSON.
9//!
10//! # Quick Start
11//!
12//! ```rust
13//! use replication_engine::config::{ReplicationConfig, PeerConfig};
14//!
15//! let config = ReplicationConfig {
16//!     local_node_id: "node-1".into(),
17//!     peers: vec![
18//!         PeerConfig::for_testing("node-2", "redis://peer2:6379"),
19//!     ],
20//!     ..Default::default()
21//! };
22//! ```
23//!
24//! # Configuration Structure
25//!
26//! ```text
27//! ReplicationConfig
28//! ├── local_node_id: String        # This node's unique ID
29//! ├── settings: ReplicationSettings
30//! │   ├── hot_path: HotPathConfig  # CDC stream tailing
31//! │   ├── cold_path: ColdPathConfig # Merkle anti-entropy  
32//! │   ├── peer_health: PeerHealthConfig
33//! │   └── slo: SloConfig           # SLO thresholds
34//! ├── peers: Vec<PeerConfig>       # Remote nodes to replicate from
35//! └── cursor: CursorConfig         # SQLite cursor persistence
36//! ```
37//!
38//! # YAML Example
39//!
40//! ```yaml
41//! local_node_id: "uk.node.london-1"
42//!
43//! settings:
44//!   hot_path:
45//!     enabled: true
46//!     batch_size: 100
47//!     block_timeout: "5s"
48//!   cold_path:
49//!     enabled: true
50//!     interval_sec: 60
51//!
52//! peers:
53//!   - node_id: "uk.node.manchester-1"
54//!     redis_url: "redis://peer1:6379"
55//!
56//! cursor:
57//!   sqlite_path: "/var/lib/app/cursors.db"
58//! ```
59
60use serde::{Deserialize, Serialize};
61use std::time::Duration;
62
63// ═══════════════════════════════════════════════════════════════════════════════
64// Top-level config: passed from daemon to ReplicationEngine::new()
65// ═══════════════════════════════════════════════════════════════════════════════
66
67/// The top-level config object passed to `ReplicationEngine::new()`.
68///
69/// # Fields
70///
71/// - `local_node_id`: Unique identifier for this node. Used to filter self from peer lists.
72/// - `settings`: Tunable parameters for hot path, cold path, health checks, and SLOs.
73/// - `peers`: List of remote nodes to replicate from.
74/// - `cursor`: SQLite cursor persistence settings.
75#[derive(Debug, Clone, Serialize, Deserialize)]
76pub struct ReplicationConfig {
77    /// The identity of the local node running this engine.
78    /// Used to filter "self" out of peer lists and identify our own streams.
79    pub local_node_id: String,
80
81    /// General settings for the replication logic (timeouts, batch sizes, etc.)
82    pub settings: ReplicationSettings,
83
84    /// The list of peers to replicate from.
85    /// Each peer represents a remote node with a Redis CDC stream.
86    pub peers: Vec<PeerConfig>,
87
88    /// Cursor persistence settings.
89    /// Cursors are stored in SQLite for crash recovery.
90    #[serde(default)]
91    pub cursor: CursorConfig,
92}
93
94impl Default for ReplicationConfig {
95    fn default() -> Self {
96        Self {
97            local_node_id: "local.dev.node.default".to_string(),
98            settings: ReplicationSettings::default(),
99            peers: Vec::new(),
100            cursor: CursorConfig::default(),
101        }
102    }
103}
104
105impl ReplicationConfig {
106    /// Create a minimal config for testing.
107    pub fn for_testing(local_node_id: &str) -> Self {
108        Self {
109            local_node_id: local_node_id.to_string(),
110            settings: ReplicationSettings::default(),
111            peers: Vec::new(),
112            cursor: CursorConfig::in_memory(),
113        }
114    }
115}
116
117// ═══════════════════════════════════════════════════════════════════════════════
118// ReplicationSettings: hot path and cold path config
119// ═══════════════════════════════════════════════════════════════════════════════
120
121/// General settings for the replication logic.
122#[derive(Debug, Clone, Serialize, Deserialize)]
123#[derive(Default)]
124pub struct ReplicationSettings {
125    #[serde(default)]
126    pub hot_path: HotPathConfig,
127    #[serde(default)]
128    pub cold_path: ColdPathConfig,
129    #[serde(default)]
130    pub peer_health: PeerHealthConfig,
131    #[serde(default)]
132    pub slo: SloConfig,
133}
134
135
136// ═══════════════════════════════════════════════════════════════════════════════
137// SloConfig: Service Level Objectives for alerting
138// ═══════════════════════════════════════════════════════════════════════════════
139
140/// SLO thresholds for detecting performance degradation.
141///
142/// These thresholds trigger warnings when exceeded, helping operators
143/// detect issues before they become critical. Violations are logged
144/// and exposed via metrics.
145#[derive(Debug, Clone, Serialize, Deserialize)]
146pub struct SloConfig {
147    /// Maximum acceptable stream read latency (ms).
148    /// Exceeding this triggers a warning.
149    #[serde(default = "default_max_stream_read_latency_ms")]
150    pub max_stream_read_latency_ms: u64,
151
152    /// Maximum acceptable peer operation latency (ms).
153    /// Applies to Merkle queries, key fetches, etc.
154    #[serde(default = "default_max_peer_op_latency_ms")]
155    pub max_peer_op_latency_ms: u64,
156
157    /// Maximum acceptable batch flush latency (ms).
158    #[serde(default = "default_max_batch_flush_latency_ms")]
159    pub max_batch_flush_latency_ms: u64,
160
161    /// Maximum acceptable replication lag (seconds).
162    /// If we fall this far behind the stream head, trigger warning.
163    #[serde(default = "default_max_replication_lag_sec")]
164    pub max_replication_lag_sec: u64,
165}
166
167fn default_max_stream_read_latency_ms() -> u64 {
168    100 // 100ms
169}
170
171fn default_max_peer_op_latency_ms() -> u64 {
172    500 // 500ms
173}
174
175fn default_max_batch_flush_latency_ms() -> u64 {
176    200 // 200ms
177}
178
179fn default_max_replication_lag_sec() -> u64 {
180    30 // 30 seconds
181}
182
183impl Default for SloConfig {
184    fn default() -> Self {
185        Self {
186            max_stream_read_latency_ms: 100,
187            max_peer_op_latency_ms: 500,
188            max_batch_flush_latency_ms: 200,
189            max_replication_lag_sec: 30,
190        }
191    }
192}
193
194impl SloConfig {
195    /// Check if a stream read latency violates SLO.
196    pub fn is_stream_read_violation(&self, latency: Duration) -> bool {
197        latency.as_millis() as u64 > self.max_stream_read_latency_ms
198    }
199
200    /// Check if a peer operation latency violates SLO.
201    pub fn is_peer_op_violation(&self, latency: Duration) -> bool {
202        latency.as_millis() as u64 > self.max_peer_op_latency_ms
203    }
204
205    /// Check if a batch flush latency violates SLO.
206    pub fn is_batch_flush_violation(&self, latency: Duration) -> bool {
207        latency.as_millis() as u64 > self.max_batch_flush_latency_ms
208    }
209}
210
211/// Peer health check configuration.
212#[derive(Debug, Clone, Serialize, Deserialize)]
213pub struct PeerHealthConfig {
214    /// Whether to enable idle peer ping checks.
215    #[serde(default = "default_true")]
216    pub enabled: bool,
217
218    /// How often to check for idle peers (in seconds).
219    #[serde(default = "default_ping_interval_sec")]
220    pub ping_interval_sec: u64,
221
222    /// Consider a peer idle if no successful contact for this many seconds.
223    #[serde(default = "default_idle_threshold_sec")]
224    pub idle_threshold_sec: u64,
225}
226
227fn default_ping_interval_sec() -> u64 {
228    30
229}
230
231fn default_idle_threshold_sec() -> u64 {
232    60
233}
234
235impl Default for PeerHealthConfig {
236    fn default() -> Self {
237        Self {
238            enabled: true,
239            ping_interval_sec: 30,
240            idle_threshold_sec: 60,
241        }
242    }
243}
244
245// ═══════════════════════════════════════════════════════════════════════════════
246// PeerConfig: one entry per remote node
247// ═══════════════════════════════════════════════════════════════════════════════
248
249/// Configuration for a single peer node.
250///
251/// Each peer represents a remote node whose CDC stream we tail.
252#[derive(Debug, Clone, Serialize, Deserialize)]
253pub struct PeerConfig {
254    /// Peer's unique node ID (for logging and deduplication).
255    pub node_id: String,
256
257    /// Redis URL for connecting to the peer's CDC stream.
258    /// Example: `"redis://peer1.example.com:6379"`
259    pub redis_url: String,
260
261    /// Optional: Priority for sync (lower = higher priority).
262    /// Can be used for tiered replication strategies.
263    #[serde(default)]
264    pub priority: u32,
265
266    /// Number of consecutive failures before circuit opens.
267    #[serde(default = "default_circuit_failure_threshold")]
268    pub circuit_failure_threshold: u32,
269
270    /// How long to wait before trying again after circuit opens (seconds).
271    #[serde(default = "default_circuit_reset_timeout")]
272    pub circuit_reset_timeout_sec: u64,
273
274    /// Redis key prefix used by the peer (e.g., "sync:").
275    /// Must match the peer's sync-engine configuration.
276    #[serde(default)]
277    pub redis_prefix: Option<String>,
278}
279
280fn default_circuit_failure_threshold() -> u32 {
281    5
282}
283
284fn default_circuit_reset_timeout() -> u64 {
285    30
286}
287
288impl PeerConfig {
289    /// Get the CDC stream key for this peer.
290    /// Uses the configured prefix + "__local__:cdc".
291    pub fn cdc_stream_key(&self) -> String {
292        let prefix = self.redis_prefix.as_deref().unwrap_or("");
293        format!("{}__local__:cdc", prefix)
294    }
295
296    /// Create a peer config for testing.
297    pub fn for_testing(node_id: &str, redis_url: &str) -> Self {
298        Self {
299            node_id: node_id.to_string(),
300            redis_url: redis_url.to_string(),
301            priority: 0,
302            circuit_failure_threshold: 5,
303            circuit_reset_timeout_sec: 30,
304            redis_prefix: None,
305        }
306    }
307}
308
309// ═══════════════════════════════════════════════════════════════════════════════
310// HotPathConfig: CDC stream tailing settings
311// ═══════════════════════════════════════════════════════════════════════════════
312
313/// Hot path (CDC stream tailing) configuration.
314#[derive(Debug, Clone, Serialize, Deserialize)]
315pub struct HotPathConfig {
316    /// Whether hot path replication is enabled.
317    #[serde(default = "default_true")]
318    pub enabled: bool,
319
320    /// Maximum entries to read per XREAD call (initial batch size).
321    #[serde(default = "default_batch_size")]
322    pub batch_size: usize,
323
324    /// XREAD block timeout as a duration string (e.g., "5s").
325    /// Parsed to Duration internally.
326    #[serde(default = "default_block_timeout")]
327    pub block_timeout: String,
328
329    /// Enable adaptive batch sizing (AIMD - Additive Increase, Multiplicative Decrease).
330    /// When enabled, batch size adjusts based on replication lag:
331    /// - Increases when caught up (empty reads)
332    /// - Decreases when lagging (full batches)
333    #[serde(default = "default_false")]
334    pub adaptive_batch_size: bool,
335
336    /// Minimum batch size for adaptive sizing.
337    #[serde(default = "default_min_batch_size")]
338    pub min_batch_size: usize,
339
340    /// Maximum batch size for adaptive sizing.
341    #[serde(default = "default_max_batch_size")]
342    pub max_batch_size: usize,
343
344    // ─────────────────────────────────────────────────────────────────────────
345    // Rate Limiting (thundering herd prevention)
346    // ─────────────────────────────────────────────────────────────────────────
347
348    /// Enable rate limiting for event processing.
349    /// Prevents thundering herd when many peers reconnect simultaneously.
350    #[serde(default = "default_false")]
351    pub rate_limit_enabled: bool,
352
353    /// Maximum events per second (sustained rate).
354    /// Tokens refill at this rate.
355    #[serde(default = "default_rate_limit_per_sec")]
356    pub rate_limit_per_sec: u32,
357
358    /// Maximum burst size for rate limiting.
359    /// Allows short bursts above the sustained rate.
360    #[serde(default = "default_rate_limit_burst")]
361    pub rate_limit_burst: u32,
362}
363
364fn default_rate_limit_per_sec() -> u32 {
365    10_000 // 10k events/sec default
366}
367
368fn default_rate_limit_burst() -> u32 {
369    1000 // Allow bursts of 1000 events
370}
371
372fn default_true() -> bool {
373    true
374}
375
376fn default_false() -> bool {
377    false
378}
379
380fn default_batch_size() -> usize {
381    100
382}
383
384fn default_min_batch_size() -> usize {
385    50
386}
387
388fn default_max_batch_size() -> usize {
389    1000
390}
391
392fn default_block_timeout() -> String {
393    "5s".to_string()
394}
395
396impl Default for HotPathConfig {
397    fn default() -> Self {
398        Self {
399            enabled: true,
400            batch_size: 100,
401            block_timeout: "5s".to_string(),
402            adaptive_batch_size: false,
403            min_batch_size: 50,
404            max_batch_size: 1000,
405            rate_limit_enabled: false,
406            rate_limit_per_sec: 10_000,
407            rate_limit_burst: 1000,
408        }
409    }
410}
411
412impl HotPathConfig {
413    /// Parse the block_timeout string to a Duration.
414    pub fn block_timeout_duration(&self) -> Duration {
415        humantime::parse_duration(&self.block_timeout).unwrap_or(Duration::from_secs(5))
416    }
417
418    /// Create rate limit configuration from hot path settings.
419    ///
420    /// Returns `None` if rate limiting is disabled.
421    pub fn rate_limit_config(&self) -> Option<crate::resilience::RateLimitConfig> {
422        if self.rate_limit_enabled {
423            Some(crate::resilience::RateLimitConfig {
424                burst_size: self.rate_limit_burst,
425                refill_rate: self.rate_limit_per_sec,
426            })
427        } else {
428            None
429        }
430    }
431}
432
433// ═══════════════════════════════════════════════════════════════════════════════
434// ColdPathConfig: Merkle anti-entropy settings
435// ═══════════════════════════════════════════════════════════════════════════════
436
437/// Cold path (Merkle anti-entropy) configuration.
438#[derive(Debug, Clone, Serialize, Deserialize)]
439pub struct ColdPathConfig {
440    /// Whether cold path repair is enabled.
441    #[serde(default = "default_true")]
442    pub enabled: bool,
443
444    /// How often to run Merkle comparison (in seconds).
445    #[serde(default = "default_interval_sec")]
446    pub interval_sec: u64,
447
448    /// Maximum items to repair per cycle (prevents memory pressure).
449    /// If more items are divergent, remaining items will be repaired in subsequent cycles.
450    #[serde(default = "default_max_items_per_cycle")]
451    pub max_items_per_cycle: usize,
452
453    /// Base backoff time in seconds when a peer fails repair.
454    /// Actual backoff = min(base * 2^consecutive_failures, max).
455    #[serde(default = "default_backoff_base_sec")]
456    pub backoff_base_sec: u64,
457
458    /// Maximum backoff time in seconds (ceiling).
459    #[serde(default = "default_backoff_max_sec")]
460    pub backoff_max_sec: u64,
461}
462
463fn default_interval_sec() -> u64 {
464    60
465}
466
467fn default_max_items_per_cycle() -> usize {
468    1000
469}
470
471fn default_backoff_base_sec() -> u64 {
472    5
473}
474
475fn default_backoff_max_sec() -> u64 {
476    300 // 5 minutes
477}
478
479impl Default for ColdPathConfig {
480    fn default() -> Self {
481        Self {
482            enabled: true,
483            interval_sec: 60,
484            max_items_per_cycle: 1000,
485            backoff_base_sec: 5,
486            backoff_max_sec: 300,
487        }
488    }
489}
490
491impl ColdPathConfig {
492    /// Get the interval as a Duration.
493    pub fn interval(&self) -> Duration {
494        Duration::from_secs(self.interval_sec)
495    }
496
497    /// Calculate backoff duration for a given number of consecutive failures.
498    pub fn backoff_for_failures(&self, consecutive_failures: u32) -> Duration {
499        let backoff_secs = self.backoff_base_sec.saturating_mul(
500            2u64.saturating_pow(consecutive_failures)
501        );
502        Duration::from_secs(backoff_secs.min(self.backoff_max_sec))
503    }
504}
505
506// ═══════════════════════════════════════════════════════════════════════════════
507// CursorConfig: cursor persistence (internal, not from daemon)
508// ═══════════════════════════════════════════════════════════════════════════════
509
510/// Cursor persistence configuration.
511///
512/// Cursors track the last-read position in each peer's CDC stream.
513/// We persist to SQLite because Redis streams are ephemeral (may be trimmed).
514#[derive(Debug, Clone, Serialize, Deserialize)]
515pub struct CursorConfig {
516    /// Path to SQLite database for cursor storage.
517    pub sqlite_path: String,
518
519    /// Whether to use WAL mode for SQLite (recommended).
520    #[serde(default = "default_true")]
521    pub wal_mode: bool,
522}
523
524impl Default for CursorConfig {
525    fn default() -> Self {
526        Self {
527            sqlite_path: "replication_cursors.db".to_string(),
528            wal_mode: true,
529        }
530    }
531}
532
533impl CursorConfig {
534    /// Create an in-memory config for testing.
535    pub fn in_memory() -> Self {
536        Self {
537            sqlite_path: ":memory:".to_string(),
538            wal_mode: false,
539        }
540    }
541}
542
543// ═══════════════════════════════════════════════════════════════════════════════
544// Tests
545// ═══════════════════════════════════════════════════════════════════════════════
546
547#[cfg(test)]
548mod tests {
549    use super::*;
550
551    #[test]
552    fn test_peer_cdc_stream_key() {
553        let peer = PeerConfig::for_testing("test-node", "redis://localhost:6379");
554        assert_eq!(peer.cdc_stream_key(), "__local__:cdc");
555    }
556
557    #[test]
558    fn test_peer_config_defaults() {
559        let peer = PeerConfig::for_testing("node-1", "redis://host:6379");
560        assert_eq!(peer.node_id, "node-1");
561        assert_eq!(peer.redis_url, "redis://host:6379");
562        assert_eq!(peer.priority, 0);
563        assert_eq!(peer.circuit_failure_threshold, 5);
564        assert_eq!(peer.circuit_reset_timeout_sec, 30);
565    }
566
567    #[test]
568    fn test_hot_path_block_timeout_parsing() {
569        let config = HotPathConfig {
570            enabled: true,
571            batch_size: 50,
572            block_timeout: "10s".to_string(),
573            adaptive_batch_size: false,
574            min_batch_size: 50,
575            max_batch_size: 1000,
576            rate_limit_enabled: false,
577            rate_limit_per_sec: 10_000,
578            rate_limit_burst: 1000,
579        };
580        assert_eq!(config.block_timeout_duration(), Duration::from_secs(10));
581    }
582
583    #[test]
584    fn test_hot_path_block_timeout_various_formats() {
585        let test_cases = [
586            ("5s", Duration::from_secs(5)),
587            ("1m", Duration::from_secs(60)),
588            ("500ms", Duration::from_millis(500)),
589            ("2min", Duration::from_secs(120)),
590        ];
591
592        for (input, expected) in test_cases {
593            let config = HotPathConfig {
594                block_timeout: input.to_string(),
595                ..Default::default()
596            };
597            assert_eq!(config.block_timeout_duration(), expected, "Failed for input: {}", input);
598        }
599    }
600
601    #[test]
602    fn test_hot_path_block_timeout_invalid_fallback() {
603        let config = HotPathConfig {
604            block_timeout: "invalid".to_string(),
605            ..Default::default()
606        };
607        // Should fall back to 5 seconds
608        assert_eq!(config.block_timeout_duration(), Duration::from_secs(5));
609    }
610
611    #[test]
612    fn test_hot_path_rate_limit_config() {
613        let mut config = HotPathConfig::default();
614        
615        // Disabled by default
616        assert!(config.rate_limit_config().is_none());
617
618        // Enable it
619        config.rate_limit_enabled = true;
620        config.rate_limit_per_sec = 5000;
621        config.rate_limit_burst = 500;
622        
623        let rate_config = config.rate_limit_config().unwrap();
624        assert_eq!(rate_config.refill_rate, 5000);
625        assert_eq!(rate_config.burst_size, 500);
626    }
627
628    #[test]
629    fn test_hot_path_default() {
630        let config = HotPathConfig::default();
631        assert!(config.enabled);
632        assert_eq!(config.batch_size, 100);
633        assert_eq!(config.block_timeout, "5s");
634        assert!(!config.adaptive_batch_size);
635        assert_eq!(config.min_batch_size, 50);
636        assert_eq!(config.max_batch_size, 1000);
637        assert!(!config.rate_limit_enabled);
638        assert_eq!(config.rate_limit_per_sec, 10_000);
639        assert_eq!(config.rate_limit_burst, 1000);
640    }
641
642    #[test]
643    fn test_cold_path_interval() {
644        let config = ColdPathConfig {
645            enabled: true,
646            interval_sec: 120,
647            max_items_per_cycle: 500,
648            backoff_base_sec: 5,
649            backoff_max_sec: 300,
650        };
651        assert_eq!(config.interval(), Duration::from_secs(120));
652    }
653
654    #[test]
655    fn test_cold_path_default() {
656        let config = ColdPathConfig::default();
657        assert!(config.enabled);
658        assert_eq!(config.interval_sec, 60);
659        assert_eq!(config.max_items_per_cycle, 1000);
660        assert_eq!(config.backoff_base_sec, 5);
661        assert_eq!(config.backoff_max_sec, 300);
662    }
663
664    #[test]
665    fn test_cold_path_backoff() {
666        let config = ColdPathConfig {
667            enabled: true,
668            interval_sec: 60,
669            max_items_per_cycle: 1000,
670            backoff_base_sec: 5,
671            backoff_max_sec: 300,
672        };
673
674        // 0 failures = 5 * 2^0 = 5 seconds (but we start from 1 failure in practice)
675        assert_eq!(config.backoff_for_failures(0), Duration::from_secs(5));
676
677        // 1 failure = 5 * 2^1 = 10 seconds
678        assert_eq!(config.backoff_for_failures(1), Duration::from_secs(10));
679
680        // 2 failures = 5 * 2^2 = 20 seconds
681        assert_eq!(config.backoff_for_failures(2), Duration::from_secs(20));
682
683        // 3 failures = 5 * 2^3 = 40 seconds
684        assert_eq!(config.backoff_for_failures(3), Duration::from_secs(40));
685
686        // 6 failures = 5 * 2^6 = 320 seconds, capped at 300
687        assert_eq!(config.backoff_for_failures(6), Duration::from_secs(300));
688
689        // Many failures = still capped at 300
690        assert_eq!(config.backoff_for_failures(10), Duration::from_secs(300));
691    }
692
693    #[test]
694    fn test_slo_config_default() {
695        let config = SloConfig::default();
696        assert_eq!(config.max_stream_read_latency_ms, 100);
697        assert_eq!(config.max_peer_op_latency_ms, 500);
698        assert_eq!(config.max_batch_flush_latency_ms, 200);
699        assert_eq!(config.max_replication_lag_sec, 30);
700    }
701
702    #[test]
703    fn test_slo_stream_read_violation() {
704        let config = SloConfig::default(); // 100ms threshold
705
706        // Under threshold - no violation
707        assert!(!config.is_stream_read_violation(Duration::from_millis(50)));
708        assert!(!config.is_stream_read_violation(Duration::from_millis(100)));
709
710        // Over threshold - violation
711        assert!(config.is_stream_read_violation(Duration::from_millis(101)));
712        assert!(config.is_stream_read_violation(Duration::from_secs(1)));
713    }
714
715    #[test]
716    fn test_slo_peer_op_violation() {
717        let config = SloConfig::default(); // 500ms threshold
718
719        assert!(!config.is_peer_op_violation(Duration::from_millis(250)));
720        assert!(!config.is_peer_op_violation(Duration::from_millis(500)));
721        assert!(config.is_peer_op_violation(Duration::from_millis(501)));
722    }
723
724    #[test]
725    fn test_slo_batch_flush_violation() {
726        let config = SloConfig::default(); // 200ms threshold
727
728        assert!(!config.is_batch_flush_violation(Duration::from_millis(100)));
729        assert!(!config.is_batch_flush_violation(Duration::from_millis(200)));
730        assert!(config.is_batch_flush_violation(Duration::from_millis(201)));
731    }
732
733    #[test]
734    fn test_peer_health_config_default() {
735        let config = PeerHealthConfig::default();
736        assert!(config.enabled);
737        assert_eq!(config.ping_interval_sec, 30);
738        assert_eq!(config.idle_threshold_sec, 60);
739    }
740
741    #[test]
742    fn test_cursor_config_default() {
743        let config = CursorConfig::default();
744        assert_eq!(config.sqlite_path, "replication_cursors.db");
745        assert!(config.wal_mode);
746    }
747
748    #[test]
749    fn test_cursor_config_in_memory() {
750        let config = CursorConfig::in_memory();
751        assert_eq!(config.sqlite_path, ":memory:");
752        assert!(!config.wal_mode);
753    }
754
755    #[test]
756    fn test_replication_config_default() {
757        let config = ReplicationConfig::default();
758        assert_eq!(config.local_node_id, "local.dev.node.default");
759        assert!(config.peers.is_empty());
760        assert!(config.settings.hot_path.enabled);
761        assert!(config.settings.cold_path.enabled);
762    }
763
764    #[test]
765    fn test_default_config_serializes() {
766        let config = ReplicationConfig::default();
767        let json = serde_json::to_string_pretty(&config).unwrap();
768        assert!(json.contains("local.dev.node.default"));
769    }
770
771    #[test]
772    fn test_for_testing_config() {
773        let config = ReplicationConfig::for_testing("test-node-1");
774        assert_eq!(config.local_node_id, "test-node-1");
775        assert_eq!(config.cursor.sqlite_path, ":memory:");
776    }
777
778    #[test]
779    fn test_replication_settings_default() {
780        let settings = ReplicationSettings::default();
781        assert!(settings.hot_path.enabled);
782        assert!(settings.cold_path.enabled);
783        assert!(settings.peer_health.enabled);
784        assert_eq!(settings.slo.max_replication_lag_sec, 30);
785    }
786
787    #[test]
788    fn test_config_json_roundtrip() {
789        let config = ReplicationConfig {
790            local_node_id: "node-roundtrip".to_string(),
791            settings: ReplicationSettings::default(),
792            peers: vec![
793                PeerConfig::for_testing("peer-1", "redis://peer1:6379"),
794                PeerConfig::for_testing("peer-2", "redis://peer2:6379"),
795            ],
796            cursor: CursorConfig::default(),
797        };
798
799        let json = serde_json::to_string(&config).unwrap();
800        let parsed: ReplicationConfig = serde_json::from_str(&json).unwrap();
801
802        assert_eq!(parsed.local_node_id, "node-roundtrip");
803        assert_eq!(parsed.peers.len(), 2);
804        assert_eq!(parsed.peers[0].node_id, "peer-1");
805        assert_eq!(parsed.peers[1].node_id, "peer-2");
806    }
807
808    #[test]
809    fn test_hot_path_config_json_roundtrip() {
810        let config = HotPathConfig {
811            enabled: true,
812            batch_size: 200,
813            block_timeout: "10s".to_string(),
814            adaptive_batch_size: true,
815            min_batch_size: 25,
816            max_batch_size: 2000,
817            rate_limit_enabled: true,
818            rate_limit_per_sec: 5000,
819            rate_limit_burst: 500,
820        };
821
822        let json = serde_json::to_string(&config).unwrap();
823        let parsed: HotPathConfig = serde_json::from_str(&json).unwrap();
824
825        assert!(parsed.enabled);
826        assert_eq!(parsed.batch_size, 200);
827        assert_eq!(parsed.block_timeout, "10s");
828        assert!(parsed.adaptive_batch_size);
829        assert_eq!(parsed.min_batch_size, 25);
830        assert_eq!(parsed.max_batch_size, 2000);
831        assert!(parsed.rate_limit_enabled);
832        assert_eq!(parsed.rate_limit_per_sec, 5000);
833        assert_eq!(parsed.rate_limit_burst, 500);
834    }
835}