1use serde::{Deserialize, Serialize};
61use std::time::Duration;
62
63#[derive(Debug, Clone, Serialize, Deserialize)]
76pub struct ReplicationEngineConfig {
77 pub local_node_id: String,
80
81 pub settings: ReplicationEngineSettings,
83
84 pub peers: Vec<PeerConfig>,
87
88 #[serde(default)]
91 pub cursor: CursorConfig,
92}
93
94impl Default for ReplicationEngineConfig {
95 fn default() -> Self {
96 Self {
97 local_node_id: "local.dev.node.default".to_string(),
98 settings: ReplicationEngineSettings::default(),
99 peers: Vec::new(),
100 cursor: CursorConfig::default(),
101 }
102 }
103}
104
105impl ReplicationEngineConfig {
106 pub fn for_testing(local_node_id: &str) -> Self {
108 Self {
109 local_node_id: local_node_id.to_string(),
110 settings: ReplicationEngineSettings::default(),
111 peers: Vec::new(),
112 cursor: CursorConfig::in_memory(),
113 }
114 }
115}
116
117#[derive(Debug, Clone, Serialize, Deserialize)]
123#[derive(Default)]
124pub struct ReplicationEngineSettings {
125 #[serde(default)]
126 pub hot_path: HotPathConfig,
127 #[serde(default)]
128 pub cold_path: ColdPathConfig,
129 #[serde(default)]
130 pub peer_health: PeerHealthConfig,
131 #[serde(default)]
132 pub slo: SloConfig,
133}
134
135
136#[derive(Debug, Clone, Serialize, Deserialize)]
146pub struct SloConfig {
147 #[serde(default = "default_max_stream_read_latency_ms")]
150 pub max_stream_read_latency_ms: u64,
151
152 #[serde(default = "default_max_peer_op_latency_ms")]
155 pub max_peer_op_latency_ms: u64,
156
157 #[serde(default = "default_max_batch_flush_latency_ms")]
159 pub max_batch_flush_latency_ms: u64,
160
161 #[serde(default = "default_max_replication_lag_sec")]
164 pub max_replication_lag_sec: u64,
165}
166
167fn default_max_stream_read_latency_ms() -> u64 {
168 100 }
170
171fn default_max_peer_op_latency_ms() -> u64 {
172 500 }
174
175fn default_max_batch_flush_latency_ms() -> u64 {
176 200 }
178
179fn default_max_replication_lag_sec() -> u64 {
180 30 }
182
183impl Default for SloConfig {
184 fn default() -> Self {
185 Self {
186 max_stream_read_latency_ms: 100,
187 max_peer_op_latency_ms: 500,
188 max_batch_flush_latency_ms: 200,
189 max_replication_lag_sec: 30,
190 }
191 }
192}
193
194impl SloConfig {
195 pub fn is_stream_read_violation(&self, latency: Duration) -> bool {
197 latency.as_millis() as u64 > self.max_stream_read_latency_ms
198 }
199
200 pub fn is_peer_op_violation(&self, latency: Duration) -> bool {
202 latency.as_millis() as u64 > self.max_peer_op_latency_ms
203 }
204
205 pub fn is_batch_flush_violation(&self, latency: Duration) -> bool {
207 latency.as_millis() as u64 > self.max_batch_flush_latency_ms
208 }
209}
210
211#[derive(Debug, Clone, Serialize, Deserialize)]
213pub struct PeerHealthConfig {
214 #[serde(default = "default_true")]
216 pub enabled: bool,
217
218 #[serde(default = "default_ping_interval_sec")]
220 pub ping_interval_sec: u64,
221
222 #[serde(default = "default_idle_threshold_sec")]
224 pub idle_threshold_sec: u64,
225}
226
227fn default_ping_interval_sec() -> u64 {
228 30
229}
230
231fn default_idle_threshold_sec() -> u64 {
232 60
233}
234
235impl Default for PeerHealthConfig {
236 fn default() -> Self {
237 Self {
238 enabled: true,
239 ping_interval_sec: 30,
240 idle_threshold_sec: 60,
241 }
242 }
243}
244
245#[derive(Debug, Clone, Serialize, Deserialize)]
253pub struct PeerConfig {
254 pub node_id: String,
256
257 pub redis_url: String,
260
261 #[serde(default)]
264 pub priority: u32,
265
266 #[serde(default = "default_circuit_failure_threshold")]
268 pub circuit_failure_threshold: u32,
269
270 #[serde(default = "default_circuit_reset_timeout")]
272 pub circuit_reset_timeout_sec: u64,
273
274 #[serde(default)]
277 pub redis_prefix: Option<String>,
278}
279
280fn default_circuit_failure_threshold() -> u32 {
281 5
282}
283
284fn default_circuit_reset_timeout() -> u64 {
285 30
286}
287
288impl PeerConfig {
289 pub fn cdc_stream_key(&self) -> String {
293 let prefix = self.redis_prefix.as_deref().unwrap_or("");
294 format!("{}cdc", prefix)
295 }
296
297 pub fn for_testing(node_id: &str, redis_url: &str) -> Self {
299 Self {
300 node_id: node_id.to_string(),
301 redis_url: redis_url.to_string(),
302 priority: 0,
303 circuit_failure_threshold: 5,
304 circuit_reset_timeout_sec: 30,
305 redis_prefix: None,
306 }
307 }
308}
309
310#[derive(Debug, Clone, Serialize, Deserialize)]
316pub struct HotPathConfig {
317 #[serde(default = "default_true")]
319 pub enabled: bool,
320
321 #[serde(default = "default_batch_size")]
323 pub batch_size: usize,
324
325 #[serde(default = "default_block_timeout")]
328 pub block_timeout: String,
329
330 #[serde(default = "default_false")]
335 pub adaptive_batch_size: bool,
336
337 #[serde(default = "default_min_batch_size")]
339 pub min_batch_size: usize,
340
341 #[serde(default = "default_max_batch_size")]
343 pub max_batch_size: usize,
344
345 #[serde(default = "default_false")]
352 pub rate_limit_enabled: bool,
353
354 #[serde(default = "default_rate_limit_per_sec")]
357 pub rate_limit_per_sec: u32,
358
359 #[serde(default = "default_rate_limit_burst")]
362 pub rate_limit_burst: u32,
363}
364
365fn default_rate_limit_per_sec() -> u32 {
366 10_000 }
368
369fn default_rate_limit_burst() -> u32 {
370 1000 }
372
373fn default_true() -> bool {
374 true
375}
376
377fn default_false() -> bool {
378 false
379}
380
381fn default_batch_size() -> usize {
382 100
383}
384
385fn default_min_batch_size() -> usize {
386 50
387}
388
389fn default_max_batch_size() -> usize {
390 1000
391}
392
393fn default_block_timeout() -> String {
394 "5s".to_string()
395}
396
397impl Default for HotPathConfig {
398 fn default() -> Self {
399 Self {
400 enabled: true,
401 batch_size: 100,
402 block_timeout: "5s".to_string(),
403 adaptive_batch_size: false,
404 min_batch_size: 50,
405 max_batch_size: 1000,
406 rate_limit_enabled: false,
407 rate_limit_per_sec: 10_000,
408 rate_limit_burst: 1000,
409 }
410 }
411}
412
413impl HotPathConfig {
414 pub fn block_timeout_duration(&self) -> Duration {
416 humantime::parse_duration(&self.block_timeout).unwrap_or(Duration::from_secs(5))
417 }
418
419 pub fn rate_limit_config(&self) -> Option<crate::resilience::RateLimitConfig> {
423 if self.rate_limit_enabled {
424 Some(crate::resilience::RateLimitConfig {
425 burst_size: self.rate_limit_burst,
426 refill_rate: self.rate_limit_per_sec,
427 })
428 } else {
429 None
430 }
431 }
432}
433
434#[derive(Debug, Clone, Serialize, Deserialize)]
440pub struct ColdPathConfig {
441 #[serde(default = "default_true")]
443 pub enabled: bool,
444
445 #[serde(default = "default_interval_sec")]
447 pub interval_sec: u64,
448
449 #[serde(default = "default_max_items_per_cycle")]
452 pub max_items_per_cycle: usize,
453
454 #[serde(default = "default_backoff_base_sec")]
457 pub backoff_base_sec: u64,
458
459 #[serde(default = "default_backoff_max_sec")]
461 pub backoff_max_sec: u64,
462}
463
464fn default_interval_sec() -> u64 {
465 60
466}
467
468fn default_max_items_per_cycle() -> usize {
469 1000
470}
471
472fn default_backoff_base_sec() -> u64 {
473 5
474}
475
476fn default_backoff_max_sec() -> u64 {
477 300 }
479
480impl Default for ColdPathConfig {
481 fn default() -> Self {
482 Self {
483 enabled: true,
484 interval_sec: 60,
485 max_items_per_cycle: 1000,
486 backoff_base_sec: 5,
487 backoff_max_sec: 300,
488 }
489 }
490}
491
492impl ColdPathConfig {
493 pub fn interval(&self) -> Duration {
495 Duration::from_secs(self.interval_sec)
496 }
497
498 pub fn backoff_for_failures(&self, consecutive_failures: u32) -> Duration {
500 let backoff_secs = self.backoff_base_sec.saturating_mul(
501 2u64.saturating_pow(consecutive_failures)
502 );
503 Duration::from_secs(backoff_secs.min(self.backoff_max_sec))
504 }
505}
506
507#[derive(Debug, Clone, Serialize, Deserialize)]
516pub struct CursorConfig {
517 pub sqlite_path: String,
519
520 #[serde(default = "default_true")]
522 pub wal_mode: bool,
523}
524
525impl Default for CursorConfig {
526 fn default() -> Self {
527 Self {
528 sqlite_path: "replication_cursors.db".to_string(),
529 wal_mode: true,
530 }
531 }
532}
533
534impl CursorConfig {
535 pub fn in_memory() -> Self {
537 Self {
538 sqlite_path: ":memory:".to_string(),
539 wal_mode: false,
540 }
541 }
542}
543
544#[cfg(test)]
549mod tests {
550 use super::*;
551
552 #[test]
553 fn test_peer_cdc_stream_key() {
554 let peer = PeerConfig::for_testing("test-node", "redis://localhost:6379");
555 assert_eq!(peer.cdc_stream_key(), "cdc");
557
558 let peer_with_prefix = PeerConfig {
560 redis_prefix: Some("redsqrl:".to_string()),
561 ..PeerConfig::for_testing("test-node", "redis://localhost:6379")
562 };
563 assert_eq!(peer_with_prefix.cdc_stream_key(), "redsqrl:cdc");
564 }
565
566 #[test]
567 fn test_peer_config_defaults() {
568 let peer = PeerConfig::for_testing("node-1", "redis://host:6379");
569 assert_eq!(peer.node_id, "node-1");
570 assert_eq!(peer.redis_url, "redis://host:6379");
571 assert_eq!(peer.priority, 0);
572 assert_eq!(peer.circuit_failure_threshold, 5);
573 assert_eq!(peer.circuit_reset_timeout_sec, 30);
574 }
575
576 #[test]
577 fn test_hot_path_block_timeout_parsing() {
578 let config = HotPathConfig {
579 enabled: true,
580 batch_size: 50,
581 block_timeout: "10s".to_string(),
582 adaptive_batch_size: false,
583 min_batch_size: 50,
584 max_batch_size: 1000,
585 rate_limit_enabled: false,
586 rate_limit_per_sec: 10_000,
587 rate_limit_burst: 1000,
588 };
589 assert_eq!(config.block_timeout_duration(), Duration::from_secs(10));
590 }
591
592 #[test]
593 fn test_hot_path_block_timeout_various_formats() {
594 let test_cases = [
595 ("5s", Duration::from_secs(5)),
596 ("1m", Duration::from_secs(60)),
597 ("500ms", Duration::from_millis(500)),
598 ("2min", Duration::from_secs(120)),
599 ];
600
601 for (input, expected) in test_cases {
602 let config = HotPathConfig {
603 block_timeout: input.to_string(),
604 ..Default::default()
605 };
606 assert_eq!(config.block_timeout_duration(), expected, "Failed for input: {}", input);
607 }
608 }
609
610 #[test]
611 fn test_hot_path_block_timeout_invalid_fallback() {
612 let config = HotPathConfig {
613 block_timeout: "invalid".to_string(),
614 ..Default::default()
615 };
616 assert_eq!(config.block_timeout_duration(), Duration::from_secs(5));
618 }
619
620 #[test]
621 fn test_hot_path_rate_limit_config() {
622 let mut config = HotPathConfig::default();
623
624 assert!(config.rate_limit_config().is_none());
626
627 config.rate_limit_enabled = true;
629 config.rate_limit_per_sec = 5000;
630 config.rate_limit_burst = 500;
631
632 let rate_config = config.rate_limit_config().unwrap();
633 assert_eq!(rate_config.refill_rate, 5000);
634 assert_eq!(rate_config.burst_size, 500);
635 }
636
637 #[test]
638 fn test_hot_path_default() {
639 let config = HotPathConfig::default();
640 assert!(config.enabled);
641 assert_eq!(config.batch_size, 100);
642 assert_eq!(config.block_timeout, "5s");
643 assert!(!config.adaptive_batch_size);
644 assert_eq!(config.min_batch_size, 50);
645 assert_eq!(config.max_batch_size, 1000);
646 assert!(!config.rate_limit_enabled);
647 assert_eq!(config.rate_limit_per_sec, 10_000);
648 assert_eq!(config.rate_limit_burst, 1000);
649 }
650
651 #[test]
652 fn test_cold_path_interval() {
653 let config = ColdPathConfig {
654 enabled: true,
655 interval_sec: 120,
656 max_items_per_cycle: 500,
657 backoff_base_sec: 5,
658 backoff_max_sec: 300,
659 };
660 assert_eq!(config.interval(), Duration::from_secs(120));
661 }
662
663 #[test]
664 fn test_cold_path_default() {
665 let config = ColdPathConfig::default();
666 assert!(config.enabled);
667 assert_eq!(config.interval_sec, 60);
668 assert_eq!(config.max_items_per_cycle, 1000);
669 assert_eq!(config.backoff_base_sec, 5);
670 assert_eq!(config.backoff_max_sec, 300);
671 }
672
673 #[test]
674 fn test_cold_path_backoff() {
675 let config = ColdPathConfig {
676 enabled: true,
677 interval_sec: 60,
678 max_items_per_cycle: 1000,
679 backoff_base_sec: 5,
680 backoff_max_sec: 300,
681 };
682
683 assert_eq!(config.backoff_for_failures(0), Duration::from_secs(5));
685
686 assert_eq!(config.backoff_for_failures(1), Duration::from_secs(10));
688
689 assert_eq!(config.backoff_for_failures(2), Duration::from_secs(20));
691
692 assert_eq!(config.backoff_for_failures(3), Duration::from_secs(40));
694
695 assert_eq!(config.backoff_for_failures(6), Duration::from_secs(300));
697
698 assert_eq!(config.backoff_for_failures(10), Duration::from_secs(300));
700 }
701
702 #[test]
703 fn test_slo_config_default() {
704 let config = SloConfig::default();
705 assert_eq!(config.max_stream_read_latency_ms, 100);
706 assert_eq!(config.max_peer_op_latency_ms, 500);
707 assert_eq!(config.max_batch_flush_latency_ms, 200);
708 assert_eq!(config.max_replication_lag_sec, 30);
709 }
710
711 #[test]
712 fn test_slo_stream_read_violation() {
713 let config = SloConfig::default(); assert!(!config.is_stream_read_violation(Duration::from_millis(50)));
717 assert!(!config.is_stream_read_violation(Duration::from_millis(100)));
718
719 assert!(config.is_stream_read_violation(Duration::from_millis(101)));
721 assert!(config.is_stream_read_violation(Duration::from_secs(1)));
722 }
723
724 #[test]
725 fn test_slo_peer_op_violation() {
726 let config = SloConfig::default(); assert!(!config.is_peer_op_violation(Duration::from_millis(250)));
729 assert!(!config.is_peer_op_violation(Duration::from_millis(500)));
730 assert!(config.is_peer_op_violation(Duration::from_millis(501)));
731 }
732
733 #[test]
734 fn test_slo_batch_flush_violation() {
735 let config = SloConfig::default(); assert!(!config.is_batch_flush_violation(Duration::from_millis(100)));
738 assert!(!config.is_batch_flush_violation(Duration::from_millis(200)));
739 assert!(config.is_batch_flush_violation(Duration::from_millis(201)));
740 }
741
742 #[test]
743 fn test_peer_health_config_default() {
744 let config = PeerHealthConfig::default();
745 assert!(config.enabled);
746 assert_eq!(config.ping_interval_sec, 30);
747 assert_eq!(config.idle_threshold_sec, 60);
748 }
749
750 #[test]
751 fn test_cursor_config_default() {
752 let config = CursorConfig::default();
753 assert_eq!(config.sqlite_path, "replication_cursors.db");
754 assert!(config.wal_mode);
755 }
756
757 #[test]
758 fn test_cursor_config_in_memory() {
759 let config = CursorConfig::in_memory();
760 assert_eq!(config.sqlite_path, ":memory:");
761 assert!(!config.wal_mode);
762 }
763
764 #[test]
765 fn test_replication_engine_config_default() {
766 let config = ReplicationEngineConfig::default();
767 assert_eq!(config.local_node_id, "local.dev.node.default");
768 assert!(config.peers.is_empty());
769 assert!(config.settings.hot_path.enabled);
770 assert!(config.settings.cold_path.enabled);
771 }
772
773 #[test]
774 fn test_default_config_serializes() {
775 let config = ReplicationEngineConfig::default();
776 let json = serde_json::to_string_pretty(&config).unwrap();
777 assert!(json.contains("local.dev.node.default"));
778 }
779
780 #[test]
781 fn test_for_testing_config() {
782 let config = ReplicationEngineConfig::for_testing("test-node-1");
783 assert_eq!(config.local_node_id, "test-node-1");
784 assert_eq!(config.cursor.sqlite_path, ":memory:");
785 }
786
787 #[test]
788 fn test_replication_engine_settings_default() {
789 let settings = ReplicationEngineSettings::default();
790 assert!(settings.hot_path.enabled);
791 assert!(settings.cold_path.enabled);
792 assert!(settings.peer_health.enabled);
793 assert_eq!(settings.slo.max_replication_lag_sec, 30);
794 }
795
796 #[test]
797 fn test_config_json_roundtrip() {
798 let config = ReplicationEngineConfig {
799 local_node_id: "node-roundtrip".to_string(),
800 settings: ReplicationEngineSettings::default(),
801 peers: vec![
802 PeerConfig::for_testing("peer-1", "redis://peer1:6379"),
803 PeerConfig::for_testing("peer-2", "redis://peer2:6379"),
804 ],
805 cursor: CursorConfig::default(),
806 };
807
808 let json = serde_json::to_string(&config).unwrap();
809 let parsed: ReplicationEngineConfig = serde_json::from_str(&json).unwrap();
810
811 assert_eq!(parsed.local_node_id, "node-roundtrip");
812 assert_eq!(parsed.peers.len(), 2);
813 assert_eq!(parsed.peers[0].node_id, "peer-1");
814 assert_eq!(parsed.peers[1].node_id, "peer-2");
815 }
816
817 #[test]
818 fn test_hot_path_config_json_roundtrip() {
819 let config = HotPathConfig {
820 enabled: true,
821 batch_size: 200,
822 block_timeout: "10s".to_string(),
823 adaptive_batch_size: true,
824 min_batch_size: 25,
825 max_batch_size: 2000,
826 rate_limit_enabled: true,
827 rate_limit_per_sec: 5000,
828 rate_limit_burst: 500,
829 };
830
831 let json = serde_json::to_string(&config).unwrap();
832 let parsed: HotPathConfig = serde_json::from_str(&json).unwrap();
833
834 assert!(parsed.enabled);
835 assert_eq!(parsed.batch_size, 200);
836 assert_eq!(parsed.block_timeout, "10s");
837 assert!(parsed.adaptive_batch_size);
838 assert_eq!(parsed.min_batch_size, 25);
839 assert_eq!(parsed.max_batch_size, 2000);
840 assert!(parsed.rate_limit_enabled);
841 assert_eq!(parsed.rate_limit_per_sec, 5000);
842 assert_eq!(parsed.rate_limit_burst, 500);
843 }
844}