replication_engine/
config.rs

// Copyright (c) 2025-2026 Adrian Robinson. Licensed under the AGPL-3.0.
// See LICENSE file in the project root for full license text.

//! Configuration for the replication engine.
//!
//! This module defines all configuration types needed to run the replication engine.
//! Configuration is passed to [`ReplicationEngine::new()`](crate::ReplicationEngine::new)
//! and can be constructed programmatically or deserialized from YAML/JSON.
//!
//! # Quick Start
//!
//! ```rust
//! use replication_engine::config::{ReplicationEngineConfig, PeerConfig};
//!
//! let config = ReplicationEngineConfig {
//!     local_node_id: "node-1".into(),
//!     peers: vec![
//!         PeerConfig::for_testing("node-2", "redis://peer2:6379"),
//!     ],
//!     ..Default::default()
//! };
//! ```
//!
//! # Configuration Structure
//!
//! ```text
//! ReplicationEngineConfig
//! ├── local_node_id: String        # This node's unique ID
//! ├── settings: ReplicationEngineSettings
//! │   ├── hot_path: HotPathConfig  # CDC stream tailing
//! │   ├── cold_path: ColdPathConfig # Merkle anti-entropy
//! │   ├── peer_health: PeerHealthConfig
//! │   └── slo: SloConfig           # SLO thresholds
//! ├── peers: Vec<PeerConfig>       # Remote nodes to replicate from
//! └── cursor: CursorConfig         # SQLite cursor persistence
//! ```
//!
//! # YAML Example
//!
//! ```yaml
//! local_node_id: "uk.node.london-1"
//!
//! settings:
//!   hot_path:
//!     enabled: true
//!     batch_size: 100
//!     block_timeout: "5s"
//!   cold_path:
//!     enabled: true
//!     interval_sec: 60
//!
//! peers:
//!   - node_id: "uk.node.manchester-1"
//!     redis_url: "redis://peer1:6379"
//!
//! cursor:
//!   sqlite_path: "/var/lib/app/cursors.db"
//! ```
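//!
//! # JSON Example
//!
//! A minimal sketch of loading the same shape from JSON. It assumes `serde_json`
//! is available to doctests (the unit tests below already depend on it); only
//! `local_node_id`, `settings`, and `peers` are required fields here, everything
//! else falls back to its default.
//!
//! ```rust
//! use replication_engine::config::ReplicationEngineConfig;
//!
//! let json = r#"{
//!     "local_node_id": "uk.node.london-1",
//!     "settings": {},
//!     "peers": []
//! }"#;
//!
//! let config: ReplicationEngineConfig = serde_json::from_str(json).expect("valid config JSON");
//! assert_eq!(config.local_node_id, "uk.node.london-1");
//! assert!(config.settings.hot_path.enabled); // defaults kick in for omitted fields
//! ```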

use serde::{Deserialize, Serialize};
use std::time::Duration;

// ═══════════════════════════════════════════════════════════════════════════════
// Top-level config: passed from daemon to ReplicationEngine::new()
// ═══════════════════════════════════════════════════════════════════════════════

/// The top-level config object passed to `ReplicationEngine::new()`.
///
/// # Fields
///
/// - `local_node_id`: Unique identifier for this node. Used to filter self from peer lists.
/// - `settings`: Tunable parameters for hot path, cold path, health checks, and SLOs.
/// - `peers`: List of remote nodes to replicate from.
/// - `cursor`: SQLite cursor persistence settings.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicationEngineConfig {
    /// The identity of the local node running this engine.
    /// Used to filter "self" out of peer lists and identify our own streams.
    pub local_node_id: String,

    /// General settings for the replication logic (timeouts, batch sizes, etc.)
    pub settings: ReplicationEngineSettings,

    /// The list of peers to replicate from.
    /// Each peer represents a remote node with a Redis CDC stream.
    pub peers: Vec<PeerConfig>,

    /// Cursor persistence settings.
    /// Cursors are stored in SQLite for crash recovery.
    #[serde(default)]
    pub cursor: CursorConfig,
}

impl Default for ReplicationEngineConfig {
    fn default() -> Self {
        Self {
            local_node_id: "local.dev.node.default".to_string(),
            settings: ReplicationEngineSettings::default(),
            peers: Vec::new(),
            cursor: CursorConfig::default(),
        }
    }
}

impl ReplicationEngineConfig {
    /// Create a minimal config for testing.
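    ///
    /// Illustrative usage (mirrors `test_for_testing_config` in the tests below):
    ///
    /// ```rust
    /// use replication_engine::config::ReplicationEngineConfig;
    ///
    /// let config = ReplicationEngineConfig::for_testing("test-node-1");
    /// assert_eq!(config.local_node_id, "test-node-1");
    /// assert_eq!(config.cursor.sqlite_path, ":memory:"); // in-memory cursor store
    /// ```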
    pub fn for_testing(local_node_id: &str) -> Self {
        Self {
            local_node_id: local_node_id.to_string(),
            settings: ReplicationEngineSettings::default(),
            peers: Vec::new(),
            cursor: CursorConfig::in_memory(),
        }
    }
}

// ═══════════════════════════════════════════════════════════════════════════════
// ReplicationEngineSettings: hot path, cold path, peer health, and SLO config
// ═══════════════════════════════════════════════════════════════════════════════

/// General settings for the replication logic.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ReplicationEngineSettings {
    #[serde(default)]
    pub hot_path: HotPathConfig,
    #[serde(default)]
    pub cold_path: ColdPathConfig,
    #[serde(default)]
    pub peer_health: PeerHealthConfig,
    #[serde(default)]
    pub slo: SloConfig,
}

// ═══════════════════════════════════════════════════════════════════════════════
// SloConfig: Service Level Objectives for alerting
// ═══════════════════════════════════════════════════════════════════════════════

/// SLO thresholds for detecting performance degradation.
///
/// These thresholds trigger warnings when exceeded, helping operators
/// detect issues before they become critical. Violations are logged
/// and exposed via metrics.
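///
/// # Example
///
/// Illustrative check against the defaults defined further down in this module
/// (100 ms stream-read threshold):
///
/// ```rust
/// use replication_engine::config::SloConfig;
/// use std::time::Duration;
///
/// let slo = SloConfig::default();
/// assert!(!slo.is_stream_read_violation(Duration::from_millis(80))); // within SLO
/// assert!(slo.is_stream_read_violation(Duration::from_millis(150))); // violation
/// ```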
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SloConfig {
    /// Maximum acceptable stream read latency (ms).
    /// Exceeding this triggers a warning.
    #[serde(default = "default_max_stream_read_latency_ms")]
    pub max_stream_read_latency_ms: u64,

    /// Maximum acceptable peer operation latency (ms).
    /// Applies to Merkle queries, key fetches, etc.
    #[serde(default = "default_max_peer_op_latency_ms")]
    pub max_peer_op_latency_ms: u64,

    /// Maximum acceptable batch flush latency (ms).
    #[serde(default = "default_max_batch_flush_latency_ms")]
    pub max_batch_flush_latency_ms: u64,

    /// Maximum acceptable replication lag (seconds).
    /// If we fall this far behind the stream head, trigger a warning.
    #[serde(default = "default_max_replication_lag_sec")]
    pub max_replication_lag_sec: u64,
}

fn default_max_stream_read_latency_ms() -> u64 {
    100 // 100ms
}

fn default_max_peer_op_latency_ms() -> u64 {
    500 // 500ms
}

fn default_max_batch_flush_latency_ms() -> u64 {
    200 // 200ms
}

fn default_max_replication_lag_sec() -> u64 {
    30 // 30 seconds
}

impl Default for SloConfig {
    fn default() -> Self {
        Self {
            max_stream_read_latency_ms: 100,
            max_peer_op_latency_ms: 500,
            max_batch_flush_latency_ms: 200,
            max_replication_lag_sec: 30,
        }
    }
}

impl SloConfig {
    /// Check if a stream read latency violates the SLO.
    pub fn is_stream_read_violation(&self, latency: Duration) -> bool {
        latency.as_millis() as u64 > self.max_stream_read_latency_ms
    }

    /// Check if a peer operation latency violates the SLO.
    pub fn is_peer_op_violation(&self, latency: Duration) -> bool {
        latency.as_millis() as u64 > self.max_peer_op_latency_ms
    }

    /// Check if a batch flush latency violates the SLO.
    pub fn is_batch_flush_violation(&self, latency: Duration) -> bool {
        latency.as_millis() as u64 > self.max_batch_flush_latency_ms
    }
}

/// Peer health check configuration.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PeerHealthConfig {
    /// Whether to enable idle peer ping checks.
    #[serde(default = "default_true")]
    pub enabled: bool,

    /// How often to check for idle peers (in seconds).
    #[serde(default = "default_ping_interval_sec")]
    pub ping_interval_sec: u64,

    /// Consider a peer idle if no successful contact for this many seconds.
    #[serde(default = "default_idle_threshold_sec")]
    pub idle_threshold_sec: u64,
}

fn default_ping_interval_sec() -> u64 {
    30
}

fn default_idle_threshold_sec() -> u64 {
    60
}

impl Default for PeerHealthConfig {
    fn default() -> Self {
        Self {
            enabled: true,
            ping_interval_sec: 30,
            idle_threshold_sec: 60,
        }
    }
}

// ═══════════════════════════════════════════════════════════════════════════════
// PeerConfig: one entry per remote node
// ═══════════════════════════════════════════════════════════════════════════════

/// Configuration for a single peer node.
///
/// Each peer represents a remote node whose CDC stream we tail.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PeerConfig {
    /// Peer's unique node ID (for logging and deduplication).
    pub node_id: String,

    /// Redis URL for connecting to the peer's CDC stream.
    /// Example: `"redis://peer1.example.com:6379"`
    pub redis_url: String,

    /// Optional: priority for sync (lower = higher priority).
    /// Can be used for tiered replication strategies.
    #[serde(default)]
    pub priority: u32,

    /// Number of consecutive failures before the circuit opens.
    #[serde(default = "default_circuit_failure_threshold")]
    pub circuit_failure_threshold: u32,

    /// How long to wait before trying again after the circuit opens (seconds).
    #[serde(default = "default_circuit_reset_timeout")]
    pub circuit_reset_timeout_sec: u64,

    /// Redis key prefix used by the peer, including the trailing colon (e.g., "sync:").
    /// Must match the peer's sync-engine configuration.
    #[serde(default)]
    pub redis_prefix: Option<String>,
}

fn default_circuit_failure_threshold() -> u32 {
    5
}

fn default_circuit_reset_timeout() -> u64 {
    30
}

impl PeerConfig {
    /// Get the CDC stream key for this peer.
    /// Convention: `redis_prefix` includes the trailing colon (e.g., "redsqrl:").
    /// Result: "redsqrl:cdc", or just "cdc" if no prefix is set.
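    ///
    /// Illustrative usage (mirrors `test_peer_cdc_stream_key` below):
    ///
    /// ```rust
    /// use replication_engine::config::PeerConfig;
    ///
    /// let peer = PeerConfig {
    ///     redis_prefix: Some("redsqrl:".to_string()),
    ///     ..PeerConfig::for_testing("node-2", "redis://peer2:6379")
    /// };
    /// assert_eq!(peer.cdc_stream_key(), "redsqrl:cdc");
    /// ```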
    pub fn cdc_stream_key(&self) -> String {
        let prefix = self.redis_prefix.as_deref().unwrap_or("");
        format!("{}cdc", prefix)
    }

    /// Create a peer config for testing.
    pub fn for_testing(node_id: &str, redis_url: &str) -> Self {
        Self {
            node_id: node_id.to_string(),
            redis_url: redis_url.to_string(),
            priority: 0,
            circuit_failure_threshold: 5,
            circuit_reset_timeout_sec: 30,
            redis_prefix: None,
        }
    }
}

// ═══════════════════════════════════════════════════════════════════════════════
// HotPathConfig: CDC stream tailing settings
// ═══════════════════════════════════════════════════════════════════════════════

/// Hot path (CDC stream tailing) configuration.
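///
/// # Example
///
/// A minimal construction sketch. The AIMD adjustment itself runs in the hot-path
/// reader outside this module; the hypothetical `proposed` value below only shows
/// how `min_batch_size`/`max_batch_size` bound whatever size that logic proposes.
///
/// ```rust
/// use replication_engine::config::HotPathConfig;
///
/// let cfg = HotPathConfig {
///     adaptive_batch_size: true,
///     min_batch_size: 50,
///     max_batch_size: 1000,
///     ..Default::default()
/// };
///
/// // Hypothetical size an adaptive reader might propose after several empty reads:
/// let proposed = 4_000usize;
/// assert_eq!(proposed.clamp(cfg.min_batch_size, cfg.max_batch_size), 1000);
/// ```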
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HotPathConfig {
    /// Whether hot path replication is enabled.
    #[serde(default = "default_true")]
    pub enabled: bool,

    /// Maximum entries to read per XREAD call (initial batch size).
    #[serde(default = "default_batch_size")]
    pub batch_size: usize,

    /// XREAD block timeout as a duration string (e.g., "5s").
    /// Parsed to Duration internally.
    #[serde(default = "default_block_timeout")]
    pub block_timeout: String,

    /// Enable adaptive batch sizing (AIMD - Additive Increase, Multiplicative Decrease).
    /// When enabled, batch size adjusts based on replication lag:
    /// - Increases when caught up (empty reads)
    /// - Decreases when lagging (full batches)
    #[serde(default = "default_false")]
    pub adaptive_batch_size: bool,

    /// Minimum batch size for adaptive sizing.
    #[serde(default = "default_min_batch_size")]
    pub min_batch_size: usize,

    /// Maximum batch size for adaptive sizing.
    #[serde(default = "default_max_batch_size")]
    pub max_batch_size: usize,

    // ─────────────────────────────────────────────────────────────────────────
    // Rate Limiting (thundering herd prevention)
    // ─────────────────────────────────────────────────────────────────────────

    /// Enable rate limiting for event processing.
    /// Prevents thundering herd when many peers reconnect simultaneously.
    #[serde(default = "default_false")]
    pub rate_limit_enabled: bool,

    /// Maximum events per second (sustained rate).
    /// Tokens refill at this rate.
    #[serde(default = "default_rate_limit_per_sec")]
    pub rate_limit_per_sec: u32,

    /// Maximum burst size for rate limiting.
    /// Allows short bursts above the sustained rate.
    #[serde(default = "default_rate_limit_burst")]
    pub rate_limit_burst: u32,
}

fn default_rate_limit_per_sec() -> u32 {
    10_000 // 10k events/sec default
}

fn default_rate_limit_burst() -> u32 {
    1000 // Allow bursts of 1000 events
}

fn default_true() -> bool {
    true
}

fn default_false() -> bool {
    false
}

fn default_batch_size() -> usize {
    100
}

fn default_min_batch_size() -> usize {
    50
}

fn default_max_batch_size() -> usize {
    1000
}

fn default_block_timeout() -> String {
    "5s".to_string()
}

impl Default for HotPathConfig {
    fn default() -> Self {
        Self {
            enabled: true,
            batch_size: 100,
            block_timeout: "5s".to_string(),
            adaptive_batch_size: false,
            min_batch_size: 50,
            max_batch_size: 1000,
            rate_limit_enabled: false,
            rate_limit_per_sec: 10_000,
            rate_limit_burst: 1000,
        }
    }
}

impl HotPathConfig {
    /// Parse the block_timeout string to a Duration.
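    /// Falls back to 5 seconds if the string cannot be parsed.
    ///
    /// Illustrative parsing (duration formats accepted by `humantime`):
    ///
    /// ```rust
    /// use replication_engine::config::HotPathConfig;
    /// use std::time::Duration;
    ///
    /// let config = HotPathConfig { block_timeout: "250ms".to_string(), ..Default::default() };
    /// assert_eq!(config.block_timeout_duration(), Duration::from_millis(250));
    ///
    /// let bad = HotPathConfig { block_timeout: "not-a-duration".to_string(), ..Default::default() };
    /// assert_eq!(bad.block_timeout_duration(), Duration::from_secs(5)); // fallback
    /// ```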
    pub fn block_timeout_duration(&self) -> Duration {
        humantime::parse_duration(&self.block_timeout).unwrap_or(Duration::from_secs(5))
    }

    /// Create rate limit configuration from hot path settings.
    ///
    /// Returns `None` if rate limiting is disabled.
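    ///
    /// Illustrative usage; the returned value maps onto `crate::resilience::RateLimitConfig`
    /// exactly as constructed below (see also `test_hot_path_rate_limit_config`):
    ///
    /// ```rust
    /// use replication_engine::config::HotPathConfig;
    ///
    /// let config = HotPathConfig::default();
    /// assert!(config.rate_limit_config().is_none()); // disabled by default
    ///
    /// let enabled = HotPathConfig { rate_limit_enabled: true, ..Default::default() };
    /// assert!(enabled.rate_limit_config().is_some());
    /// ```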
    pub fn rate_limit_config(&self) -> Option<crate::resilience::RateLimitConfig> {
        if self.rate_limit_enabled {
            Some(crate::resilience::RateLimitConfig {
                burst_size: self.rate_limit_burst,
                refill_rate: self.rate_limit_per_sec,
            })
        } else {
            None
        }
    }
}

// ═══════════════════════════════════════════════════════════════════════════════
// ColdPathConfig: Merkle anti-entropy settings
// ═══════════════════════════════════════════════════════════════════════════════

/// Cold path (Merkle anti-entropy) configuration.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ColdPathConfig {
    /// Whether cold path repair is enabled.
    #[serde(default = "default_true")]
    pub enabled: bool,

    /// How often to run Merkle comparison (in seconds).
    #[serde(default = "default_interval_sec")]
    pub interval_sec: u64,

    /// Maximum items to repair per cycle (prevents memory pressure).
    /// If more items are divergent, remaining items will be repaired in subsequent cycles.
    #[serde(default = "default_max_items_per_cycle")]
    pub max_items_per_cycle: usize,

    /// Base backoff time in seconds when a peer fails repair.
    /// Actual backoff = min(base * 2^consecutive_failures, max).
    #[serde(default = "default_backoff_base_sec")]
    pub backoff_base_sec: u64,

    /// Maximum backoff time in seconds (ceiling).
    #[serde(default = "default_backoff_max_sec")]
    pub backoff_max_sec: u64,
}

fn default_interval_sec() -> u64 {
    60
}

fn default_max_items_per_cycle() -> usize {
    1000
}

fn default_backoff_base_sec() -> u64 {
    5
}

fn default_backoff_max_sec() -> u64 {
    300 // 5 minutes
}

impl Default for ColdPathConfig {
    fn default() -> Self {
        Self {
            enabled: true,
            interval_sec: 60,
            max_items_per_cycle: 1000,
            backoff_base_sec: 5,
            backoff_max_sec: 300,
        }
    }
}

impl ColdPathConfig {
    /// Get the interval as a Duration.
    pub fn interval(&self) -> Duration {
        Duration::from_secs(self.interval_sec)
    }

    /// Calculate backoff duration for a given number of consecutive failures.
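    ///
    /// Illustrative values with the defaults (base 5 s, capped at 300 s);
    /// see also `test_cold_path_backoff` below:
    ///
    /// ```rust
    /// use replication_engine::config::ColdPathConfig;
    /// use std::time::Duration;
    ///
    /// let cold = ColdPathConfig::default();
    /// assert_eq!(cold.backoff_for_failures(2), Duration::from_secs(20));   // 5 * 2^2
    /// assert_eq!(cold.backoff_for_failures(10), Duration::from_secs(300)); // capped at max
    /// ```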
    pub fn backoff_for_failures(&self, consecutive_failures: u32) -> Duration {
        let backoff_secs = self.backoff_base_sec.saturating_mul(
            2u64.saturating_pow(consecutive_failures)
        );
        Duration::from_secs(backoff_secs.min(self.backoff_max_sec))
    }
}

// ═══════════════════════════════════════════════════════════════════════════════
// CursorConfig: cursor persistence (internal, not from daemon)
// ═══════════════════════════════════════════════════════════════════════════════

/// Cursor persistence configuration.
///
/// Cursors track the last-read position in each peer's CDC stream.
/// We persist to SQLite because Redis streams are ephemeral (may be trimmed).
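///
/// # Example
///
/// Illustrative usage; `in_memory()` is intended for tests and skips WAL mode:
///
/// ```rust
/// use replication_engine::config::CursorConfig;
///
/// let persistent = CursorConfig::default();
/// assert_eq!(persistent.sqlite_path, "replication_cursors.db");
/// assert!(persistent.wal_mode);
///
/// let ephemeral = CursorConfig::in_memory();
/// assert_eq!(ephemeral.sqlite_path, ":memory:");
/// assert!(!ephemeral.wal_mode);
/// ```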
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CursorConfig {
    /// Path to SQLite database for cursor storage.
    pub sqlite_path: String,

    /// Whether to use WAL mode for SQLite (recommended).
    #[serde(default = "default_true")]
    pub wal_mode: bool,
}

impl Default for CursorConfig {
    fn default() -> Self {
        Self {
            sqlite_path: "replication_cursors.db".to_string(),
            wal_mode: true,
        }
    }
}

impl CursorConfig {
    /// Create an in-memory config for testing.
    pub fn in_memory() -> Self {
        Self {
            sqlite_path: ":memory:".to_string(),
            wal_mode: false,
        }
    }
}

// ═══════════════════════════════════════════════════════════════════════════════
// Tests
// ═══════════════════════════════════════════════════════════════════════════════

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_peer_cdc_stream_key() {
        let peer = PeerConfig::for_testing("test-node", "redis://localhost:6379");
        // No prefix -> just "cdc"
        assert_eq!(peer.cdc_stream_key(), "cdc");

        // With prefix (includes trailing colon)
        let peer_with_prefix = PeerConfig {
            redis_prefix: Some("redsqrl:".to_string()),
            ..PeerConfig::for_testing("test-node", "redis://localhost:6379")
        };
        assert_eq!(peer_with_prefix.cdc_stream_key(), "redsqrl:cdc");
    }

    #[test]
    fn test_peer_config_defaults() {
        let peer = PeerConfig::for_testing("node-1", "redis://host:6379");
        assert_eq!(peer.node_id, "node-1");
        assert_eq!(peer.redis_url, "redis://host:6379");
        assert_eq!(peer.priority, 0);
        assert_eq!(peer.circuit_failure_threshold, 5);
        assert_eq!(peer.circuit_reset_timeout_sec, 30);
    }

    #[test]
    fn test_hot_path_block_timeout_parsing() {
        let config = HotPathConfig {
            enabled: true,
            batch_size: 50,
            block_timeout: "10s".to_string(),
            adaptive_batch_size: false,
            min_batch_size: 50,
            max_batch_size: 1000,
            rate_limit_enabled: false,
            rate_limit_per_sec: 10_000,
            rate_limit_burst: 1000,
        };
        assert_eq!(config.block_timeout_duration(), Duration::from_secs(10));
    }

    #[test]
    fn test_hot_path_block_timeout_various_formats() {
        let test_cases = [
            ("5s", Duration::from_secs(5)),
            ("1m", Duration::from_secs(60)),
            ("500ms", Duration::from_millis(500)),
            ("2min", Duration::from_secs(120)),
        ];

        for (input, expected) in test_cases {
            let config = HotPathConfig {
                block_timeout: input.to_string(),
                ..Default::default()
            };
            assert_eq!(config.block_timeout_duration(), expected, "Failed for input: {}", input);
        }
    }

    #[test]
    fn test_hot_path_block_timeout_invalid_fallback() {
        let config = HotPathConfig {
            block_timeout: "invalid".to_string(),
            ..Default::default()
        };
        // Should fall back to 5 seconds
        assert_eq!(config.block_timeout_duration(), Duration::from_secs(5));
    }

    #[test]
    fn test_hot_path_rate_limit_config() {
        let mut config = HotPathConfig::default();

        // Disabled by default
        assert!(config.rate_limit_config().is_none());

        // Enable it
        config.rate_limit_enabled = true;
        config.rate_limit_per_sec = 5000;
        config.rate_limit_burst = 500;

        let rate_config = config.rate_limit_config().unwrap();
        assert_eq!(rate_config.refill_rate, 5000);
        assert_eq!(rate_config.burst_size, 500);
    }

    #[test]
    fn test_hot_path_default() {
        let config = HotPathConfig::default();
        assert!(config.enabled);
        assert_eq!(config.batch_size, 100);
        assert_eq!(config.block_timeout, "5s");
        assert!(!config.adaptive_batch_size);
        assert_eq!(config.min_batch_size, 50);
        assert_eq!(config.max_batch_size, 1000);
        assert!(!config.rate_limit_enabled);
        assert_eq!(config.rate_limit_per_sec, 10_000);
        assert_eq!(config.rate_limit_burst, 1000);
    }

    #[test]
    fn test_cold_path_interval() {
        let config = ColdPathConfig {
            enabled: true,
            interval_sec: 120,
            max_items_per_cycle: 500,
            backoff_base_sec: 5,
            backoff_max_sec: 300,
        };
        assert_eq!(config.interval(), Duration::from_secs(120));
    }

    #[test]
    fn test_cold_path_default() {
        let config = ColdPathConfig::default();
        assert!(config.enabled);
        assert_eq!(config.interval_sec, 60);
        assert_eq!(config.max_items_per_cycle, 1000);
        assert_eq!(config.backoff_base_sec, 5);
        assert_eq!(config.backoff_max_sec, 300);
    }

    #[test]
    fn test_cold_path_backoff() {
        let config = ColdPathConfig {
            enabled: true,
            interval_sec: 60,
            max_items_per_cycle: 1000,
            backoff_base_sec: 5,
            backoff_max_sec: 300,
        };

        // 0 failures = 5 * 2^0 = 5 seconds (but we start from 1 failure in practice)
        assert_eq!(config.backoff_for_failures(0), Duration::from_secs(5));

        // 1 failure = 5 * 2^1 = 10 seconds
        assert_eq!(config.backoff_for_failures(1), Duration::from_secs(10));

        // 2 failures = 5 * 2^2 = 20 seconds
        assert_eq!(config.backoff_for_failures(2), Duration::from_secs(20));

        // 3 failures = 5 * 2^3 = 40 seconds
        assert_eq!(config.backoff_for_failures(3), Duration::from_secs(40));

        // 6 failures = 5 * 2^6 = 320 seconds, capped at 300
        assert_eq!(config.backoff_for_failures(6), Duration::from_secs(300));

        // Many failures = still capped at 300
        assert_eq!(config.backoff_for_failures(10), Duration::from_secs(300));
    }

    #[test]
    fn test_slo_config_default() {
        let config = SloConfig::default();
        assert_eq!(config.max_stream_read_latency_ms, 100);
        assert_eq!(config.max_peer_op_latency_ms, 500);
        assert_eq!(config.max_batch_flush_latency_ms, 200);
        assert_eq!(config.max_replication_lag_sec, 30);
    }

    #[test]
    fn test_slo_stream_read_violation() {
        let config = SloConfig::default(); // 100ms threshold

        // Under threshold - no violation
        assert!(!config.is_stream_read_violation(Duration::from_millis(50)));
        assert!(!config.is_stream_read_violation(Duration::from_millis(100)));

        // Over threshold - violation
        assert!(config.is_stream_read_violation(Duration::from_millis(101)));
        assert!(config.is_stream_read_violation(Duration::from_secs(1)));
    }

    #[test]
    fn test_slo_peer_op_violation() {
        let config = SloConfig::default(); // 500ms threshold

        assert!(!config.is_peer_op_violation(Duration::from_millis(250)));
        assert!(!config.is_peer_op_violation(Duration::from_millis(500)));
        assert!(config.is_peer_op_violation(Duration::from_millis(501)));
    }

    #[test]
    fn test_slo_batch_flush_violation() {
        let config = SloConfig::default(); // 200ms threshold

        assert!(!config.is_batch_flush_violation(Duration::from_millis(100)));
        assert!(!config.is_batch_flush_violation(Duration::from_millis(200)));
        assert!(config.is_batch_flush_violation(Duration::from_millis(201)));
    }

    #[test]
    fn test_peer_health_config_default() {
        let config = PeerHealthConfig::default();
        assert!(config.enabled);
        assert_eq!(config.ping_interval_sec, 30);
        assert_eq!(config.idle_threshold_sec, 60);
    }

    #[test]
    fn test_cursor_config_default() {
        let config = CursorConfig::default();
        assert_eq!(config.sqlite_path, "replication_cursors.db");
        assert!(config.wal_mode);
    }

    #[test]
    fn test_cursor_config_in_memory() {
        let config = CursorConfig::in_memory();
        assert_eq!(config.sqlite_path, ":memory:");
        assert!(!config.wal_mode);
    }

    #[test]
    fn test_replication_engine_config_default() {
        let config = ReplicationEngineConfig::default();
        assert_eq!(config.local_node_id, "local.dev.node.default");
        assert!(config.peers.is_empty());
        assert!(config.settings.hot_path.enabled);
        assert!(config.settings.cold_path.enabled);
    }

    #[test]
    fn test_default_config_serializes() {
        let config = ReplicationEngineConfig::default();
        let json = serde_json::to_string_pretty(&config).unwrap();
        assert!(json.contains("local.dev.node.default"));
    }

    #[test]
    fn test_for_testing_config() {
        let config = ReplicationEngineConfig::for_testing("test-node-1");
        assert_eq!(config.local_node_id, "test-node-1");
        assert_eq!(config.cursor.sqlite_path, ":memory:");
    }

    #[test]
    fn test_replication_engine_settings_default() {
        let settings = ReplicationEngineSettings::default();
        assert!(settings.hot_path.enabled);
        assert!(settings.cold_path.enabled);
        assert!(settings.peer_health.enabled);
        assert_eq!(settings.slo.max_replication_lag_sec, 30);
    }

    #[test]
    fn test_config_json_roundtrip() {
        let config = ReplicationEngineConfig {
            local_node_id: "node-roundtrip".to_string(),
            settings: ReplicationEngineSettings::default(),
            peers: vec![
                PeerConfig::for_testing("peer-1", "redis://peer1:6379"),
                PeerConfig::for_testing("peer-2", "redis://peer2:6379"),
            ],
            cursor: CursorConfig::default(),
        };

        let json = serde_json::to_string(&config).unwrap();
        let parsed: ReplicationEngineConfig = serde_json::from_str(&json).unwrap();

        assert_eq!(parsed.local_node_id, "node-roundtrip");
        assert_eq!(parsed.peers.len(), 2);
        assert_eq!(parsed.peers[0].node_id, "peer-1");
        assert_eq!(parsed.peers[1].node_id, "peer-2");
    }

    #[test]
    fn test_hot_path_config_json_roundtrip() {
        let config = HotPathConfig {
            enabled: true,
            batch_size: 200,
            block_timeout: "10s".to_string(),
            adaptive_batch_size: true,
            min_batch_size: 25,
            max_batch_size: 2000,
            rate_limit_enabled: true,
            rate_limit_per_sec: 5000,
            rate_limit_burst: 500,
        };

        let json = serde_json::to_string(&config).unwrap();
        let parsed: HotPathConfig = serde_json::from_str(&json).unwrap();

        assert!(parsed.enabled);
        assert_eq!(parsed.batch_size, 200);
        assert_eq!(parsed.block_timeout, "10s");
        assert!(parsed.adaptive_batch_size);
        assert_eq!(parsed.min_batch_size, 25);
        assert_eq!(parsed.max_batch_size, 2000);
        assert!(parsed.rate_limit_enabled);
        assert_eq!(parsed.rate_limit_per_sec, 5000);
        assert_eq!(parsed.rate_limit_burst, 500);
    }
}