1use serde::{Deserialize, Serialize};
61use std::time::Duration;
62
63#[derive(Debug, Clone, Serialize, Deserialize)]
76pub struct ReplicationConfig {
77 pub local_node_id: String,
80
81 pub settings: ReplicationSettings,
83
84 pub peers: Vec<PeerConfig>,
87
88 #[serde(default)]
91 pub cursor: CursorConfig,
92}
93
94impl Default for ReplicationConfig {
95 fn default() -> Self {
96 Self {
97 local_node_id: "local.dev.node.default".to_string(),
98 settings: ReplicationSettings::default(),
99 peers: Vec::new(),
100 cursor: CursorConfig::default(),
101 }
102 }
103}
104
105impl ReplicationConfig {
106 pub fn for_testing(local_node_id: &str) -> Self {
108 Self {
109 local_node_id: local_node_id.to_string(),
110 settings: ReplicationSettings::default(),
111 peers: Vec::new(),
112 cursor: CursorConfig::in_memory(),
113 }
114 }
115}
116
117#[derive(Debug, Clone, Serialize, Deserialize)]
123#[derive(Default)]
124pub struct ReplicationSettings {
125 #[serde(default)]
126 pub hot_path: HotPathConfig,
127 #[serde(default)]
128 pub cold_path: ColdPathConfig,
129 #[serde(default)]
130 pub peer_health: PeerHealthConfig,
131 #[serde(default)]
132 pub slo: SloConfig,
133}
134
135
136#[derive(Debug, Clone, Serialize, Deserialize)]
146pub struct SloConfig {
147 #[serde(default = "default_max_stream_read_latency_ms")]
150 pub max_stream_read_latency_ms: u64,
151
152 #[serde(default = "default_max_peer_op_latency_ms")]
155 pub max_peer_op_latency_ms: u64,
156
157 #[serde(default = "default_max_batch_flush_latency_ms")]
159 pub max_batch_flush_latency_ms: u64,
160
161 #[serde(default = "default_max_replication_lag_sec")]
164 pub max_replication_lag_sec: u64,
165}
166
167fn default_max_stream_read_latency_ms() -> u64 {
168 100 }
170
171fn default_max_peer_op_latency_ms() -> u64 {
172 500 }
174
175fn default_max_batch_flush_latency_ms() -> u64 {
176 200 }
178
179fn default_max_replication_lag_sec() -> u64 {
180 30 }
182
183impl Default for SloConfig {
184 fn default() -> Self {
185 Self {
186 max_stream_read_latency_ms: 100,
187 max_peer_op_latency_ms: 500,
188 max_batch_flush_latency_ms: 200,
189 max_replication_lag_sec: 30,
190 }
191 }
192}
193
194impl SloConfig {
195 pub fn is_stream_read_violation(&self, latency: Duration) -> bool {
197 latency.as_millis() as u64 > self.max_stream_read_latency_ms
198 }
199
200 pub fn is_peer_op_violation(&self, latency: Duration) -> bool {
202 latency.as_millis() as u64 > self.max_peer_op_latency_ms
203 }
204
205 pub fn is_batch_flush_violation(&self, latency: Duration) -> bool {
207 latency.as_millis() as u64 > self.max_batch_flush_latency_ms
208 }
209}
210
211#[derive(Debug, Clone, Serialize, Deserialize)]
213pub struct PeerHealthConfig {
214 #[serde(default = "default_true")]
216 pub enabled: bool,
217
218 #[serde(default = "default_ping_interval_sec")]
220 pub ping_interval_sec: u64,
221
222 #[serde(default = "default_idle_threshold_sec")]
224 pub idle_threshold_sec: u64,
225}
226
227fn default_ping_interval_sec() -> u64 {
228 30
229}
230
231fn default_idle_threshold_sec() -> u64 {
232 60
233}
234
235impl Default for PeerHealthConfig {
236 fn default() -> Self {
237 Self {
238 enabled: true,
239 ping_interval_sec: 30,
240 idle_threshold_sec: 60,
241 }
242 }
243}
244
245#[derive(Debug, Clone, Serialize, Deserialize)]
253pub struct PeerConfig {
254 pub node_id: String,
256
257 pub redis_url: String,
260
261 #[serde(default)]
264 pub priority: u32,
265
266 #[serde(default = "default_circuit_failure_threshold")]
268 pub circuit_failure_threshold: u32,
269
270 #[serde(default = "default_circuit_reset_timeout")]
272 pub circuit_reset_timeout_sec: u64,
273
274 #[serde(default)]
277 pub redis_prefix: Option<String>,
278}
279
280fn default_circuit_failure_threshold() -> u32 {
281 5
282}
283
284fn default_circuit_reset_timeout() -> u64 {
285 30
286}
287
288impl PeerConfig {
289 pub fn cdc_stream_key(&self) -> String {
292 let prefix = self.redis_prefix.as_deref().unwrap_or("");
293 format!("{}__local__:cdc", prefix)
294 }
295
296 pub fn for_testing(node_id: &str, redis_url: &str) -> Self {
298 Self {
299 node_id: node_id.to_string(),
300 redis_url: redis_url.to_string(),
301 priority: 0,
302 circuit_failure_threshold: 5,
303 circuit_reset_timeout_sec: 30,
304 redis_prefix: None,
305 }
306 }
307}
308
309#[derive(Debug, Clone, Serialize, Deserialize)]
315pub struct HotPathConfig {
316 #[serde(default = "default_true")]
318 pub enabled: bool,
319
320 #[serde(default = "default_batch_size")]
322 pub batch_size: usize,
323
324 #[serde(default = "default_block_timeout")]
327 pub block_timeout: String,
328
329 #[serde(default = "default_false")]
334 pub adaptive_batch_size: bool,
335
336 #[serde(default = "default_min_batch_size")]
338 pub min_batch_size: usize,
339
340 #[serde(default = "default_max_batch_size")]
342 pub max_batch_size: usize,
343
344 #[serde(default = "default_false")]
351 pub rate_limit_enabled: bool,
352
353 #[serde(default = "default_rate_limit_per_sec")]
356 pub rate_limit_per_sec: u32,
357
358 #[serde(default = "default_rate_limit_burst")]
361 pub rate_limit_burst: u32,
362}
363
364fn default_rate_limit_per_sec() -> u32 {
365 10_000 }
367
368fn default_rate_limit_burst() -> u32 {
369 1000 }
371
372fn default_true() -> bool {
373 true
374}
375
376fn default_false() -> bool {
377 false
378}
379
380fn default_batch_size() -> usize {
381 100
382}
383
384fn default_min_batch_size() -> usize {
385 50
386}
387
388fn default_max_batch_size() -> usize {
389 1000
390}
391
392fn default_block_timeout() -> String {
393 "5s".to_string()
394}
395
396impl Default for HotPathConfig {
397 fn default() -> Self {
398 Self {
399 enabled: true,
400 batch_size: 100,
401 block_timeout: "5s".to_string(),
402 adaptive_batch_size: false,
403 min_batch_size: 50,
404 max_batch_size: 1000,
405 rate_limit_enabled: false,
406 rate_limit_per_sec: 10_000,
407 rate_limit_burst: 1000,
408 }
409 }
410}
411
412impl HotPathConfig {
413 pub fn block_timeout_duration(&self) -> Duration {
415 humantime::parse_duration(&self.block_timeout).unwrap_or(Duration::from_secs(5))
416 }
417
418 pub fn rate_limit_config(&self) -> Option<crate::resilience::RateLimitConfig> {
422 if self.rate_limit_enabled {
423 Some(crate::resilience::RateLimitConfig {
424 burst_size: self.rate_limit_burst,
425 refill_rate: self.rate_limit_per_sec,
426 })
427 } else {
428 None
429 }
430 }
431}
432
433#[derive(Debug, Clone, Serialize, Deserialize)]
439pub struct ColdPathConfig {
440 #[serde(default = "default_true")]
442 pub enabled: bool,
443
444 #[serde(default = "default_interval_sec")]
446 pub interval_sec: u64,
447
448 #[serde(default = "default_max_items_per_cycle")]
451 pub max_items_per_cycle: usize,
452
453 #[serde(default = "default_backoff_base_sec")]
456 pub backoff_base_sec: u64,
457
458 #[serde(default = "default_backoff_max_sec")]
460 pub backoff_max_sec: u64,
461}
462
463fn default_interval_sec() -> u64 {
464 60
465}
466
467fn default_max_items_per_cycle() -> usize {
468 1000
469}
470
471fn default_backoff_base_sec() -> u64 {
472 5
473}
474
475fn default_backoff_max_sec() -> u64 {
476 300 }
478
479impl Default for ColdPathConfig {
480 fn default() -> Self {
481 Self {
482 enabled: true,
483 interval_sec: 60,
484 max_items_per_cycle: 1000,
485 backoff_base_sec: 5,
486 backoff_max_sec: 300,
487 }
488 }
489}
490
491impl ColdPathConfig {
492 pub fn interval(&self) -> Duration {
494 Duration::from_secs(self.interval_sec)
495 }
496
497 pub fn backoff_for_failures(&self, consecutive_failures: u32) -> Duration {
499 let backoff_secs = self.backoff_base_sec.saturating_mul(
500 2u64.saturating_pow(consecutive_failures)
501 );
502 Duration::from_secs(backoff_secs.min(self.backoff_max_sec))
503 }
504}
505
506#[derive(Debug, Clone, Serialize, Deserialize)]
515pub struct CursorConfig {
516 pub sqlite_path: String,
518
519 #[serde(default = "default_true")]
521 pub wal_mode: bool,
522}
523
524impl Default for CursorConfig {
525 fn default() -> Self {
526 Self {
527 sqlite_path: "replication_cursors.db".to_string(),
528 wal_mode: true,
529 }
530 }
531}
532
533impl CursorConfig {
534 pub fn in_memory() -> Self {
536 Self {
537 sqlite_path: ":memory:".to_string(),
538 wal_mode: false,
539 }
540 }
541}
542
543#[cfg(test)]
548mod tests {
549 use super::*;
550
551 #[test]
552 fn test_peer_cdc_stream_key() {
553 let peer = PeerConfig::for_testing("test-node", "redis://localhost:6379");
554 assert_eq!(peer.cdc_stream_key(), "__local__:cdc");
555 }
556
557 #[test]
558 fn test_peer_config_defaults() {
559 let peer = PeerConfig::for_testing("node-1", "redis://host:6379");
560 assert_eq!(peer.node_id, "node-1");
561 assert_eq!(peer.redis_url, "redis://host:6379");
562 assert_eq!(peer.priority, 0);
563 assert_eq!(peer.circuit_failure_threshold, 5);
564 assert_eq!(peer.circuit_reset_timeout_sec, 30);
565 }
566
567 #[test]
568 fn test_hot_path_block_timeout_parsing() {
569 let config = HotPathConfig {
570 enabled: true,
571 batch_size: 50,
572 block_timeout: "10s".to_string(),
573 adaptive_batch_size: false,
574 min_batch_size: 50,
575 max_batch_size: 1000,
576 rate_limit_enabled: false,
577 rate_limit_per_sec: 10_000,
578 rate_limit_burst: 1000,
579 };
580 assert_eq!(config.block_timeout_duration(), Duration::from_secs(10));
581 }
582
583 #[test]
584 fn test_hot_path_block_timeout_various_formats() {
585 let test_cases = [
586 ("5s", Duration::from_secs(5)),
587 ("1m", Duration::from_secs(60)),
588 ("500ms", Duration::from_millis(500)),
589 ("2min", Duration::from_secs(120)),
590 ];
591
592 for (input, expected) in test_cases {
593 let config = HotPathConfig {
594 block_timeout: input.to_string(),
595 ..Default::default()
596 };
597 assert_eq!(config.block_timeout_duration(), expected, "Failed for input: {}", input);
598 }
599 }
600
601 #[test]
602 fn test_hot_path_block_timeout_invalid_fallback() {
603 let config = HotPathConfig {
604 block_timeout: "invalid".to_string(),
605 ..Default::default()
606 };
607 assert_eq!(config.block_timeout_duration(), Duration::from_secs(5));
609 }
610
611 #[test]
612 fn test_hot_path_rate_limit_config() {
613 let mut config = HotPathConfig::default();
614
615 assert!(config.rate_limit_config().is_none());
617
618 config.rate_limit_enabled = true;
620 config.rate_limit_per_sec = 5000;
621 config.rate_limit_burst = 500;
622
623 let rate_config = config.rate_limit_config().unwrap();
624 assert_eq!(rate_config.refill_rate, 5000);
625 assert_eq!(rate_config.burst_size, 500);
626 }
627
628 #[test]
629 fn test_hot_path_default() {
630 let config = HotPathConfig::default();
631 assert!(config.enabled);
632 assert_eq!(config.batch_size, 100);
633 assert_eq!(config.block_timeout, "5s");
634 assert!(!config.adaptive_batch_size);
635 assert_eq!(config.min_batch_size, 50);
636 assert_eq!(config.max_batch_size, 1000);
637 assert!(!config.rate_limit_enabled);
638 assert_eq!(config.rate_limit_per_sec, 10_000);
639 assert_eq!(config.rate_limit_burst, 1000);
640 }
641
642 #[test]
643 fn test_cold_path_interval() {
644 let config = ColdPathConfig {
645 enabled: true,
646 interval_sec: 120,
647 max_items_per_cycle: 500,
648 backoff_base_sec: 5,
649 backoff_max_sec: 300,
650 };
651 assert_eq!(config.interval(), Duration::from_secs(120));
652 }
653
654 #[test]
655 fn test_cold_path_default() {
656 let config = ColdPathConfig::default();
657 assert!(config.enabled);
658 assert_eq!(config.interval_sec, 60);
659 assert_eq!(config.max_items_per_cycle, 1000);
660 assert_eq!(config.backoff_base_sec, 5);
661 assert_eq!(config.backoff_max_sec, 300);
662 }
663
664 #[test]
665 fn test_cold_path_backoff() {
666 let config = ColdPathConfig {
667 enabled: true,
668 interval_sec: 60,
669 max_items_per_cycle: 1000,
670 backoff_base_sec: 5,
671 backoff_max_sec: 300,
672 };
673
674 assert_eq!(config.backoff_for_failures(0), Duration::from_secs(5));
676
677 assert_eq!(config.backoff_for_failures(1), Duration::from_secs(10));
679
680 assert_eq!(config.backoff_for_failures(2), Duration::from_secs(20));
682
683 assert_eq!(config.backoff_for_failures(3), Duration::from_secs(40));
685
686 assert_eq!(config.backoff_for_failures(6), Duration::from_secs(300));
688
689 assert_eq!(config.backoff_for_failures(10), Duration::from_secs(300));
691 }
692
693 #[test]
694 fn test_slo_config_default() {
695 let config = SloConfig::default();
696 assert_eq!(config.max_stream_read_latency_ms, 100);
697 assert_eq!(config.max_peer_op_latency_ms, 500);
698 assert_eq!(config.max_batch_flush_latency_ms, 200);
699 assert_eq!(config.max_replication_lag_sec, 30);
700 }
701
702 #[test]
703 fn test_slo_stream_read_violation() {
704 let config = SloConfig::default(); assert!(!config.is_stream_read_violation(Duration::from_millis(50)));
708 assert!(!config.is_stream_read_violation(Duration::from_millis(100)));
709
710 assert!(config.is_stream_read_violation(Duration::from_millis(101)));
712 assert!(config.is_stream_read_violation(Duration::from_secs(1)));
713 }
714
715 #[test]
716 fn test_slo_peer_op_violation() {
717 let config = SloConfig::default(); assert!(!config.is_peer_op_violation(Duration::from_millis(250)));
720 assert!(!config.is_peer_op_violation(Duration::from_millis(500)));
721 assert!(config.is_peer_op_violation(Duration::from_millis(501)));
722 }
723
724 #[test]
725 fn test_slo_batch_flush_violation() {
726 let config = SloConfig::default(); assert!(!config.is_batch_flush_violation(Duration::from_millis(100)));
729 assert!(!config.is_batch_flush_violation(Duration::from_millis(200)));
730 assert!(config.is_batch_flush_violation(Duration::from_millis(201)));
731 }
732
733 #[test]
734 fn test_peer_health_config_default() {
735 let config = PeerHealthConfig::default();
736 assert!(config.enabled);
737 assert_eq!(config.ping_interval_sec, 30);
738 assert_eq!(config.idle_threshold_sec, 60);
739 }
740
741 #[test]
742 fn test_cursor_config_default() {
743 let config = CursorConfig::default();
744 assert_eq!(config.sqlite_path, "replication_cursors.db");
745 assert!(config.wal_mode);
746 }
747
748 #[test]
749 fn test_cursor_config_in_memory() {
750 let config = CursorConfig::in_memory();
751 assert_eq!(config.sqlite_path, ":memory:");
752 assert!(!config.wal_mode);
753 }
754
755 #[test]
756 fn test_replication_config_default() {
757 let config = ReplicationConfig::default();
758 assert_eq!(config.local_node_id, "local.dev.node.default");
759 assert!(config.peers.is_empty());
760 assert!(config.settings.hot_path.enabled);
761 assert!(config.settings.cold_path.enabled);
762 }
763
764 #[test]
765 fn test_default_config_serializes() {
766 let config = ReplicationConfig::default();
767 let json = serde_json::to_string_pretty(&config).unwrap();
768 assert!(json.contains("local.dev.node.default"));
769 }
770
771 #[test]
772 fn test_for_testing_config() {
773 let config = ReplicationConfig::for_testing("test-node-1");
774 assert_eq!(config.local_node_id, "test-node-1");
775 assert_eq!(config.cursor.sqlite_path, ":memory:");
776 }
777
778 #[test]
779 fn test_replication_settings_default() {
780 let settings = ReplicationSettings::default();
781 assert!(settings.hot_path.enabled);
782 assert!(settings.cold_path.enabled);
783 assert!(settings.peer_health.enabled);
784 assert_eq!(settings.slo.max_replication_lag_sec, 30);
785 }
786
787 #[test]
788 fn test_config_json_roundtrip() {
789 let config = ReplicationConfig {
790 local_node_id: "node-roundtrip".to_string(),
791 settings: ReplicationSettings::default(),
792 peers: vec![
793 PeerConfig::for_testing("peer-1", "redis://peer1:6379"),
794 PeerConfig::for_testing("peer-2", "redis://peer2:6379"),
795 ],
796 cursor: CursorConfig::default(),
797 };
798
799 let json = serde_json::to_string(&config).unwrap();
800 let parsed: ReplicationConfig = serde_json::from_str(&json).unwrap();
801
802 assert_eq!(parsed.local_node_id, "node-roundtrip");
803 assert_eq!(parsed.peers.len(), 2);
804 assert_eq!(parsed.peers[0].node_id, "peer-1");
805 assert_eq!(parsed.peers[1].node_id, "peer-2");
806 }
807
808 #[test]
809 fn test_hot_path_config_json_roundtrip() {
810 let config = HotPathConfig {
811 enabled: true,
812 batch_size: 200,
813 block_timeout: "10s".to_string(),
814 adaptive_batch_size: true,
815 min_batch_size: 25,
816 max_batch_size: 2000,
817 rate_limit_enabled: true,
818 rate_limit_per_sec: 5000,
819 rate_limit_burst: 500,
820 };
821
822 let json = serde_json::to_string(&config).unwrap();
823 let parsed: HotPathConfig = serde_json::from_str(&json).unwrap();
824
825 assert!(parsed.enabled);
826 assert_eq!(parsed.batch_size, 200);
827 assert_eq!(parsed.block_timeout, "10s");
828 assert!(parsed.adaptive_batch_size);
829 assert_eq!(parsed.min_batch_size, 25);
830 assert_eq!(parsed.max_batch_size, 2000);
831 assert!(parsed.rate_limit_enabled);
832 assert_eq!(parsed.rate_limit_per_sec, 5000);
833 assert_eq!(parsed.rate_limit_burst, 500);
834 }
835}