1use chrono::{DateTime, Utc};
50use serde::{Deserialize, Serialize};
51use std::collections::{HashMap, VecDeque};
52use torsh_core::error::TorshError;
53
/// Consistency guarantee requested for replicated package data.
///
/// Drives the dispatch in `ReplicationManager::replicate_package`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ConsistencyLevel {
    /// Replicas converge eventually; reads may observe stale data.
    Eventual,
    /// Writes are acknowledged once a quorum of replicas confirms.
    Quorum,
    /// All replicas are written synchronously before acknowledgement.
    Strong,
    /// Causally-related writes observed in order (currently dispatched the
    /// same way as `Eventual` in this module).
    Causal,
}
66
/// How replica writes are propagated to target nodes.
///
/// NOTE(review): not referenced anywhere in this file — presumably consumed
/// by callers elsewhere in the crate; confirm before removing.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ReplicationStrategy {
    /// Write to all replicas before acknowledging.
    Synchronous,
    /// Acknowledge immediately and propagate in the background.
    Asynchronous,
    /// Acknowledge after a subset of replicas confirms.
    SemiSynchronous,
}
77
/// Policy for resolving divergent replicas of the same package version.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ConflictResolution {
    /// The replica with the newest `last_sync` timestamp wins.
    LastWriteWins,
    /// The replica with the oldest `last_sync` timestamp wins.
    FirstWriteWins,
    /// Delegated to an application-defined resolver (not handled here).
    Custom,
    /// Left for a human operator (not handled here).
    Manual,
}
90
/// Health state of a replication node, as recorded by `health_check`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum NodeStatus {
    /// Node is fully operational; eligible for new replicas.
    Healthy,
    /// Node is operational but impaired (never set in this module).
    Degraded,
    /// Node failed its health check; triggers failover when enabled.
    Unhealthy,
    /// Node is intentionally out of rotation (never set in this module).
    Maintenance,
    /// Node is unreachable (never set in this module).
    Offline,
}
105
106impl Default for NodeStatus {
107 fn default() -> Self {
108 Self::Healthy
109 }
110}
111
/// Static description of a replication node plus its runtime health state.
///
/// The `#[serde(skip)]` fields are runtime-only: they are not serialized, and
/// on deserialization they reset to their defaults (`Healthy`, `None`, `0.0`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicationNode {
    /// Unique node identifier; also the key in `ReplicationManager::nodes`.
    pub id: String,
    /// Deployment region label (informational in this module).
    pub region: String,
    /// Network endpoint of the node (not contacted by this module yet).
    pub endpoint: String,
    /// Selection priority; higher values are preferred for new replicas.
    pub priority: u32,
    /// Storage capacity — presumably bytes; not consulted here. TODO confirm.
    pub capacity: u64,
    /// Current health state, maintained by `health_check`.
    #[serde(skip)]
    pub status: NodeStatus,
    /// Timestamp of the most recent health probe, if any.
    #[serde(skip)]
    pub last_health_check: Option<DateTime<Utc>>,
    /// Observed replication lag in seconds; used to pick the best replica.
    #[serde(skip)]
    pub replication_lag_secs: f64,
}
135
136impl ReplicationNode {
137 pub fn new(id: String, region: String, endpoint: String, priority: u32, capacity: u64) -> Self {
139 Self {
140 id,
141 region,
142 endpoint,
143 priority,
144 capacity,
145 status: NodeStatus::Healthy,
146 last_health_check: None,
147 replication_lag_secs: 0.0,
148 }
149 }
150}
151
/// Tunable parameters for a `ReplicationManager`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicationConfig {
    /// Consistency level applied to every `replicate_package` call.
    pub consistency: ConsistencyLevel,
    /// Desired number of replicas per package version.
    pub replication_factor: usize,
    /// When true, `health_check` redistributes replicas off unhealthy nodes.
    pub auto_failover: bool,
    /// Intended sync cadence in seconds (not consumed in this module —
    /// presumably read by a scheduler elsewhere; confirm).
    pub sync_interval_secs: u64,
}
164
/// Bookkeeping record for one replica of one package version on one node.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicaMetadata {
    /// Package this replica belongs to.
    pub package_id: String,
    /// Package version string.
    pub version: String,
    /// Node hosting this replica.
    pub node_id: String,
    /// Monotonic replica version; lagging replicas have a lower value than
    /// the maximum seen for the same key (see `synchronize`).
    pub replica_version: u64,
    /// Content checksum — algorithm not established by this module.
    pub checksum: String,
    /// When this replica was last synchronized; used by conflict resolution.
    pub last_sync: DateTime<Utc>,
    /// Replica payload size in bytes.
    pub size_bytes: u64,
}
183
/// A queued or completed replication action.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicationOperation {
    /// Unique operation id (UUID v4, assigned at enqueue time).
    pub id: String,
    /// Package being replicated.
    pub package_id: String,
    /// Package version being replicated.
    pub version: String,
    /// Free-form action label; only "Create" is produced in this module.
    /// NOTE(review): stringly-typed — an enum would be safer; confirm callers.
    pub operation_type: String,
    /// Originating node id; hard-coded to "primary" in this module.
    pub source_node: String,
    /// Ids of the nodes selected to receive the replica.
    pub target_nodes: Vec<String>,
    /// When the operation was enqueued.
    pub timestamp: DateTime<Utc>,
    /// Lifecycle state of the operation.
    pub status: OperationStatus,
}
204
/// Lifecycle state of a `ReplicationOperation`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum OperationStatus {
    /// Enqueued, not yet dispatched.
    Pending,
    /// Currently executing (never set in this module's stubs).
    InProgress,
    /// Finished successfully; counted in `successful_operations`.
    Completed,
    /// Finished with an error; counted in `failed_operations`.
    Failed,
}
217
/// A detected divergence between replicas of the same package version.
///
/// NOTE(review): nothing in this file ever *creates* conflicts — detection
/// presumably lives elsewhere; `resolve_conflicts` only consumes them.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicationConflict {
    /// Unique conflict id.
    pub id: String,
    /// Package affected by the conflict.
    pub package_id: String,
    /// Package version affected.
    pub version: String,
    /// The divergent replica records to choose a winner from.
    pub conflicting_replicas: Vec<ReplicaMetadata>,
    /// When the divergence was detected.
    pub detected_at: DateTime<Utc>,
    /// Set by `resolve_conflicts`; resolved conflicts are pruned afterwards.
    pub resolved: bool,
    /// Strategy that resolved the conflict, once resolved.
    pub resolution_strategy: Option<ConflictResolution>,
}
236
/// Aggregate snapshot recomputed by `ReplicationManager::update_statistics`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ReplicationStatistics {
    /// Total registered nodes.
    pub total_nodes: usize,
    /// Nodes currently in `Healthy` status.
    pub healthy_nodes: usize,
    /// Sum of replica records across all package keys.
    pub total_replicas: usize,
    /// Operations currently retained in the queue.
    pub total_operations: u64,
    /// Retained operations with `Completed` status.
    pub successful_operations: u64,
    /// Retained operations with `Failed` status.
    pub failed_operations: u64,
    /// Conflicts not yet resolved.
    pub active_conflicts: usize,
    /// Mean replication lag across *all* nodes (healthy or not).
    pub avg_replication_lag_secs: f64,
    /// NOTE(review): never written anywhere in this module — always 0 here.
    pub total_bandwidth_bytes: u64,
}
259
/// Coordinates replica placement, health, synchronization, and conflict
/// resolution across a set of registered nodes.
pub struct ReplicationManager {
    /// Replication policy in effect.
    config: ReplicationConfig,
    /// Registered nodes, keyed by node id.
    nodes: HashMap<String, ReplicationNode>,
    /// Replica records, keyed by "package_id:version".
    replicas: HashMap<String, Vec<ReplicaMetadata>>,
    /// Operation log. NOTE(review): grows without bound — nothing in this
    /// module ever pops entries; confirm a retention policy exists elsewhere.
    operations: VecDeque<ReplicationOperation>,
    /// Outstanding conflicts; resolved ones are pruned by `resolve_conflicts`.
    conflicts: Vec<ReplicationConflict>,
    /// Cached aggregate snapshot, refreshed by `update_statistics`.
    statistics: ReplicationStatistics,
}
278
279impl ReplicationManager {
280 pub fn new(config: ReplicationConfig) -> Self {
282 Self {
283 config,
284 nodes: HashMap::new(),
285 replicas: HashMap::new(),
286 operations: VecDeque::new(),
287 conflicts: Vec::new(),
288 statistics: ReplicationStatistics::default(),
289 }
290 }
291
292 pub fn add_node(&mut self, node: ReplicationNode) -> Result<(), TorshError> {
294 if self.nodes.contains_key(&node.id) {
295 return Err(TorshError::InvalidArgument(format!(
296 "Node {} already exists",
297 node.id
298 )));
299 }
300
301 self.nodes.insert(node.id.clone(), node);
302 self.update_statistics();
303
304 Ok(())
305 }
306
307 pub fn remove_node(&mut self, node_id: &str) -> Result<(), TorshError> {
309 self.nodes
310 .remove(node_id)
311 .ok_or_else(|| TorshError::InvalidArgument(format!("Node {} not found", node_id)))?;
312
313 self.redistribute_replicas(node_id)?;
315 self.update_statistics();
316
317 Ok(())
318 }
319
320 pub fn replicate_package(
322 &mut self,
323 package_id: &str,
324 version: &str,
325 _data: &[u8],
326 ) -> Result<(), TorshError> {
327 let target_nodes = self.select_replication_nodes(package_id)?;
329
330 let operation = ReplicationOperation {
332 id: uuid::Uuid::new_v4().to_string(),
333 package_id: package_id.to_string(),
334 version: version.to_string(),
335 operation_type: "Create".to_string(),
336 source_node: "primary".to_string(),
337 target_nodes: target_nodes.iter().map(|n| n.id.clone()).collect(),
338 timestamp: Utc::now(),
339 status: OperationStatus::Pending,
340 };
341
342 self.operations.push_back(operation);
343
344 match self.config.consistency {
346 ConsistencyLevel::Strong => self.replicate_synchronously(package_id, version)?,
347 ConsistencyLevel::Quorum => self.replicate_to_quorum(package_id, version)?,
348 ConsistencyLevel::Eventual | ConsistencyLevel::Causal => {
349 self.replicate_asynchronously(package_id, version)?
350 }
351 }
352
353 self.update_statistics();
354
355 Ok(())
356 }
357
358 pub fn get_package(&self, package_id: &str, version: &str) -> Result<String, TorshError> {
360 let key = format!("{}:{}", package_id, version);
361
362 let replicas = self
363 .replicas
364 .get(&key)
365 .ok_or_else(|| TorshError::InvalidArgument(format!("Package {} not found", key)))?;
366
367 let best_replica = self.select_best_replica(replicas)?;
369
370 Ok(best_replica.node_id.clone())
371 }
372
373 pub fn health_check(&mut self) -> Result<(), TorshError> {
375 let now = Utc::now();
376
377 let node_ids: Vec<String> = self.nodes.keys().cloned().collect();
379
380 for node_id in node_ids {
381 let is_healthy = self.check_node_health(&node_id);
383
384 if let Some(node) = self.nodes.get_mut(&node_id) {
385 node.status = if is_healthy {
386 NodeStatus::Healthy
387 } else {
388 NodeStatus::Unhealthy
389 };
390
391 node.last_health_check = Some(now);
392 }
393 }
394
395 self.update_statistics();
396
397 if self.config.auto_failover {
399 self.handle_failover()?;
400 }
401
402 Ok(())
403 }
404
405 pub fn synchronize(&mut self) -> Result<(), TorshError> {
407 let mut to_sync = Vec::new();
409
410 for (key, replicas) in &self.replicas {
411 let max_version = replicas
412 .iter()
413 .map(|r| r.replica_version)
414 .max()
415 .unwrap_or(0);
416
417 for replica in replicas {
418 if replica.replica_version < max_version {
419 to_sync.push((key.clone(), replica.node_id.clone()));
420 }
421 }
422 }
423
424 for (key, node_id) in to_sync {
426 self.sync_replica(&key, &node_id)?;
427 }
428
429 Ok(())
430 }
431
    /// Resolves outstanding replica conflicts, propagates each winning
    /// replica, and prunes resolved conflicts from the queue.
    ///
    /// NOTE(review): the strategy is hard-coded to `LastWriteWins`, so the
    /// `FirstWriteWins` and `Custom`/`Manual` arms below are currently dead
    /// code — presumably the strategy should come from configuration or the
    /// conflict itself; confirm.
    pub fn resolve_conflicts(&mut self) -> Result<(), TorshError> {
        // Winners are collected first: `self.conflicts` is mutably borrowed
        // inside the loop, and `propagate_replica` needs `&mut self`.
        let mut replicas_to_propagate = Vec::new();

        for conflict in &mut self.conflicts {
            if !conflict.resolved {
                let strategy = ConflictResolution::LastWriteWins;
                match strategy {
                    ConflictResolution::LastWriteWins => {
                        // Newest `last_sync` timestamp wins.
                        if let Some(latest) = conflict
                            .conflicting_replicas
                            .iter()
                            .max_by_key(|r| r.last_sync)
                            .cloned()
                        {
                            replicas_to_propagate.push(latest);
                            conflict.resolved = true;
                            conflict.resolution_strategy = Some(strategy);
                        }
                    }
                    ConflictResolution::FirstWriteWins => {
                        // Oldest `last_sync` timestamp wins.
                        if let Some(earliest) = conflict
                            .conflicting_replicas
                            .iter()
                            .min_by_key(|r| r.last_sync)
                            .cloned()
                        {
                            replicas_to_propagate.push(earliest);
                            conflict.resolved = true;
                            conflict.resolution_strategy = Some(strategy);
                        }
                    }
                    ConflictResolution::Custom | ConflictResolution::Manual => {
                        // Left for an external resolver; conflict stays open.
                    }
                }
            }
        }

        for replica in replicas_to_propagate {
            self.propagate_replica(&replica)?;
        }

        // Keep only conflicts still awaiting resolution.
        self.conflicts.retain(|c| !c.resolved);

        Ok(())
    }
486
487 pub fn get_statistics(&self) -> &ReplicationStatistics {
489 &self.statistics
490 }
491
492 pub fn get_node_status(&self, node_id: &str) -> Option<NodeStatus> {
494 self.nodes.get(node_id).map(|n| n.status)
495 }
496
497 pub fn list_nodes(&self) -> Vec<&ReplicationNode> {
499 self.nodes.values().collect()
500 }
501
502 pub fn list_replicas(&self, package_id: &str, version: &str) -> Vec<&ReplicaMetadata> {
504 let key = format!("{}:{}", package_id, version);
505 self.replicas
506 .get(&key)
507 .map(|replicas| replicas.iter().collect())
508 .unwrap_or_default()
509 }
510
511 pub fn get_conflicts(&self) -> Vec<&ReplicationConflict> {
513 self.conflicts.iter().collect()
514 }
515
516 fn select_replication_nodes(
519 &self,
520 _package_id: &str,
521 ) -> Result<Vec<&ReplicationNode>, TorshError> {
522 let healthy_nodes: Vec<&ReplicationNode> = self
523 .nodes
524 .values()
525 .filter(|n| n.status == NodeStatus::Healthy)
526 .collect();
527
528 if healthy_nodes.is_empty() {
529 return Err(TorshError::RuntimeError(
530 "No healthy nodes available".to_string(),
531 ));
532 }
533
534 let mut selected: Vec<&ReplicationNode> = healthy_nodes
536 .into_iter()
537 .take(self.config.replication_factor)
538 .collect();
539
540 selected.sort_by(|a, b| b.priority.cmp(&a.priority));
542
543 Ok(selected)
544 }
545
    /// Completes a strongly-consistent write inline.
    ///
    /// NOTE(review): transport is not implemented — this only flips the most
    /// recently queued operation to `Completed`. It assumes the back of
    /// `operations` is the operation just enqueued by `replicate_package`;
    /// confirm once real I/O is added.
    fn replicate_synchronously(
        &mut self,
        _package_id: &str,
        _version: &str,
    ) -> Result<(), TorshError> {
        if let Some(op) = self.operations.back_mut() {
            op.status = OperationStatus::Completed;
        }
        Ok(())
    }

    /// Completes a quorum write (stub — same caveats as
    /// `replicate_synchronously`).
    fn replicate_to_quorum(&mut self, _package_id: &str, _version: &str) -> Result<(), TorshError> {
        if let Some(op) = self.operations.back_mut() {
            op.status = OperationStatus::Completed;
        }
        Ok(())
    }

    /// Schedules an asynchronous write (stub — despite the name, it marks the
    /// operation `Completed` immediately; same caveats as
    /// `replicate_synchronously`).
    fn replicate_asynchronously(
        &mut self,
        _package_id: &str,
        _version: &str,
    ) -> Result<(), TorshError> {
        if let Some(op) = self.operations.back_mut() {
            op.status = OperationStatus::Completed;
        }
        Ok(())
    }
577
578 fn select_best_replica<'a>(
579 &self,
580 replicas: &'a [ReplicaMetadata],
581 ) -> Result<&'a ReplicaMetadata, TorshError> {
582 replicas
584 .iter()
585 .filter(|r| {
586 self.nodes
587 .get(&r.node_id)
588 .map(|n| n.status == NodeStatus::Healthy)
589 .unwrap_or(false)
590 })
591 .min_by(|a, b| {
592 let a_lag = self
593 .nodes
594 .get(&a.node_id)
595 .map(|n| n.replication_lag_secs)
596 .unwrap_or(f64::MAX);
597 let b_lag = self
598 .nodes
599 .get(&b.node_id)
600 .map(|n| n.replication_lag_secs)
601 .unwrap_or(f64::MAX);
602 a_lag
603 .partial_cmp(&b_lag)
604 .expect("replication lag comparison should succeed (f64::MAX is valid)")
605 })
606 .ok_or_else(|| TorshError::RuntimeError("No healthy replicas".to_string()))
607 }
608
    /// Probes a node's health.
    ///
    /// NOTE(review): placeholder — always reports healthy. A real probe
    /// (e.g. against `ReplicationNode::endpoint`) still needs to be wired in.
    fn check_node_health(&self, _node_id: &str) -> bool {
        true
    }
613
614 fn handle_failover(&mut self) -> Result<(), TorshError> {
615 let unhealthy_nodes: Vec<String> = self
616 .nodes
617 .iter()
618 .filter(|(_, n)| n.status == NodeStatus::Unhealthy)
619 .map(|(id, _)| id.clone())
620 .collect();
621
622 for node_id in unhealthy_nodes {
623 self.redistribute_replicas(&node_id)?;
625 }
626
627 Ok(())
628 }
629
630 fn redistribute_replicas(&mut self, node_id: &str) -> Result<(), TorshError> {
631 let mut replicas_to_move = Vec::new();
633
634 for (key, replicas) in &self.replicas {
635 if replicas.iter().any(|r| r.node_id == node_id) {
636 replicas_to_move.push(key.clone());
637 }
638 }
639
640 for key in replicas_to_move {
642 if let Some(replicas) = self.replicas.get_mut(&key) {
643 replicas.retain(|r| r.node_id != node_id);
644
645 if replicas.len() < self.config.replication_factor {
647 }
650 }
651 }
652
653 Ok(())
654 }
655
    /// Pulls the latest replica version onto a lagging node.
    ///
    /// NOTE(review): transport stub — performs no work yet.
    fn sync_replica(&mut self, _key: &str, _node_id: &str) -> Result<(), TorshError> {
        Ok(())
    }

    /// Pushes a winning replica to other nodes after conflict resolution.
    ///
    /// NOTE(review): transport stub — performs no work yet.
    fn propagate_replica(&mut self, _replica: &ReplicaMetadata) -> Result<(), TorshError> {
        Ok(())
    }
665
666 fn update_statistics(&mut self) {
667 let mut stats = ReplicationStatistics::default();
668
669 stats.total_nodes = self.nodes.len();
670 stats.healthy_nodes = self
671 .nodes
672 .values()
673 .filter(|n| n.status == NodeStatus::Healthy)
674 .count();
675
676 stats.total_replicas = self.replicas.values().map(|v| v.len()).sum();
677
678 stats.total_operations = self.operations.len() as u64;
679 stats.successful_operations = self
680 .operations
681 .iter()
682 .filter(|op| op.status == OperationStatus::Completed)
683 .count() as u64;
684 stats.failed_operations = self
685 .operations
686 .iter()
687 .filter(|op| op.status == OperationStatus::Failed)
688 .count() as u64;
689
690 stats.active_conflicts = self.conflicts.len();
691
692 let total_lag: f64 = self.nodes.values().map(|n| n.replication_lag_secs).sum();
694 stats.avg_replication_lag_secs = if !self.nodes.is_empty() {
695 total_lag / self.nodes.len() as f64
696 } else {
697 0.0
698 };
699
700 self.statistics = stats;
701 }
702}
703
#[cfg(test)]
mod tests {
    use super::*;

    /// Shared fixture: eventual consistency, factor 3, auto-failover on.
    fn create_test_config() -> ReplicationConfig {
        ReplicationConfig {
            consistency: ConsistencyLevel::Eventual,
            replication_factor: 3,
            auto_failover: true,
            sync_interval_secs: 60,
        }
    }

    // A fresh manager reports zero nodes in its statistics.
    #[test]
    fn test_replication_manager_creation() {
        let config = create_test_config();
        let manager = ReplicationManager::new(config);
        let stats = manager.get_statistics();
        assert_eq!(stats.total_nodes, 0);
    }

    // Adding a node updates both total and healthy counts.
    #[test]
    fn test_add_node() {
        let config = create_test_config();
        let mut manager = ReplicationManager::new(config);

        let node = ReplicationNode::new(
            "node1".to_string(),
            "us-east-1".to_string(),
            "https://node1.example.com".to_string(),
            1,
            1024 * 1024 * 1024,
        );

        manager.add_node(node).unwrap();

        let stats = manager.get_statistics();
        assert_eq!(stats.total_nodes, 1);
        assert_eq!(stats.healthy_nodes, 1);
    }

    // Removing a registered node brings the count back to zero.
    #[test]
    fn test_remove_node() {
        let config = create_test_config();
        let mut manager = ReplicationManager::new(config);

        let node = ReplicationNode::new(
            "node1".to_string(),
            "us-east-1".to_string(),
            "https://node1.example.com".to_string(),
            1,
            1024 * 1024 * 1024,
        );

        manager.add_node(node).unwrap();
        assert_eq!(manager.get_statistics().total_nodes, 1);

        manager.remove_node("node1").unwrap();
        assert_eq!(manager.get_statistics().total_nodes, 0);
    }

    // With healthy nodes registered, replication succeeds and the queued
    // operation is marked completed (the dispatch stubs complete inline).
    #[test]
    fn test_replicate_package() {
        let config = create_test_config();
        let mut manager = ReplicationManager::new(config);

        for i in 1..=3 {
            let node = ReplicationNode::new(
                format!("node{}", i),
                "us-east-1".to_string(),
                format!("https://node{}.example.com", i),
                i,
                1024 * 1024 * 1024,
            );
            manager.add_node(node).unwrap();
        }

        let result = manager.replicate_package("test-pkg", "1.0.0", b"data");
        assert!(result.is_ok());

        let stats = manager.get_statistics();
        assert!(stats.successful_operations > 0);
    }

    // A health check records a status for every registered node.
    #[test]
    fn test_health_check() {
        let config = create_test_config();
        let mut manager = ReplicationManager::new(config);

        let node = ReplicationNode::new(
            "node1".to_string(),
            "us-east-1".to_string(),
            "https://node1.example.com".to_string(),
            1,
            1024 * 1024 * 1024,
        );

        manager.add_node(node).unwrap();

        manager.health_check().unwrap();

        let status = manager.get_node_status("node1");
        assert!(status.is_some());
    }

    // list_nodes returns every registered node.
    #[test]
    fn test_list_nodes() {
        let config = create_test_config();
        let mut manager = ReplicationManager::new(config);

        for i in 1..=3 {
            let node = ReplicationNode::new(
                format!("node{}", i),
                "us-east-1".to_string(),
                format!("https://node{}.example.com", i),
                i,
                1024 * 1024 * 1024,
            );
            manager.add_node(node).unwrap();
        }

        let nodes = manager.list_nodes();
        assert_eq!(nodes.len(), 3);
    }

    // The configured consistency level is stored verbatim for all variants.
    #[test]
    fn test_consistency_levels() {
        let configs = vec![
            ConsistencyLevel::Eventual,
            ConsistencyLevel::Quorum,
            ConsistencyLevel::Strong,
            ConsistencyLevel::Causal,
        ];

        for consistency in configs {
            let config = ReplicationConfig {
                consistency,
                replication_factor: 3,
                auto_failover: true,
                sync_interval_secs: 60,
            };

            let manager = ReplicationManager::new(config);
            assert_eq!(manager.config.consistency, consistency);
        }
    }

    // Statistics reflect node counts and completed replication operations.
    #[test]
    fn test_replication_statistics() {
        let config = create_test_config();
        let mut manager = ReplicationManager::new(config);

        for i in 1..=5 {
            let node = ReplicationNode::new(
                format!("node{}", i),
                "us-east-1".to_string(),
                format!("https://node{}.example.com", i),
                i,
                1024 * 1024 * 1024,
            );
            manager.add_node(node).unwrap();
        }

        manager.replicate_package("pkg1", "1.0.0", b"data").unwrap();
        manager.replicate_package("pkg2", "1.0.0", b"data").unwrap();

        let stats = manager.get_statistics();
        assert_eq!(stats.total_nodes, 5);
        assert_eq!(stats.healthy_nodes, 5);
        assert!(stats.successful_operations >= 2);
    }
}