// torsh_package/replication.rs
//! Package Replication and High Availability
//!
//! This module provides comprehensive replication and high availability capabilities
//! for package distribution across multiple regions and nodes, including consistency
//! management, automatic failover, load balancing, and conflict resolution.
//!
//! # Features
//!
//! - **Multi-Region Replication**: Replicate packages across geographic regions
//! - **Consistency Models**: Support for eventual, strong, and causal consistency
//! - **Automatic Failover**: Detect and recover from node failures
//! - **Load Balancing**: Distribute requests across healthy replicas
//! - **Conflict Resolution**: Handle concurrent updates with configurable strategies
//! - **Topology Management**: Configure and manage replication topologies
//! - **Health Monitoring**: Track replica health and synchronization status
//! - **Split-Brain Detection**: Detect and resolve network partitions
//!
//! # Examples
//!
//! ```rust
//! use torsh_package::replication::{
//!     ReplicationManager, ReplicationConfig, ConsistencyLevel, ReplicationNode
//! };
//!
//! // Create replication manager
//! let config = ReplicationConfig {
//!     consistency: ConsistencyLevel::Eventual,
//!     replication_factor: 3,
//!     auto_failover: true,
//!     sync_interval_secs: 60,
//! };
//!
//! let mut manager = ReplicationManager::new(config);
//!
//! // Add replication nodes
//! let node = ReplicationNode::new(
//!     "node1".to_string(),
//!     "us-east-1".to_string(),
//!     "https://node1.example.com".to_string(),
//!     1,
//!     1000,
//! );
//! manager.add_node(node).unwrap();
//!
//! // Replicate a package
//! manager.replicate_package("my-package", "1.0.0", b"package data").unwrap();
//! ```
48
49use chrono::{DateTime, Utc};
50use serde::{Deserialize, Serialize};
51use std::collections::{HashMap, VecDeque};
52use torsh_core::error::TorshError;
53
/// Consistency level for replication.
///
/// Selects which replication path `ReplicationManager::replicate_package`
/// dispatches to.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ConsistencyLevel {
    /// Eventual consistency - fastest, weakest guarantees
    Eventual,
    /// Quorum consistency - balanced performance and consistency
    Quorum,
    /// Strong consistency - slowest, strongest guarantees
    Strong,
    /// Causal consistency - preserves causality.
    /// NOTE(review): currently dispatched to the same asynchronous path
    /// as `Eventual` in `replicate_package`.
    Causal,
}
66
/// Replication strategy.
///
/// NOTE(review): not currently consumed by `ReplicationManager` in this
/// module — dispatch is driven by `ConsistencyLevel` instead. Presumably
/// intended for callers/configuration elsewhere; confirm before removing.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ReplicationStrategy {
    /// Synchronous replication - wait for all replicas
    Synchronous,
    /// Asynchronous replication - fire and forget
    Asynchronous,
    /// Semi-synchronous - wait for majority
    SemiSynchronous,
}
77
/// Conflict resolution strategy.
///
/// Used by `ReplicationManager::resolve_conflicts` to pick a winning
/// replica among conflicting copies.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ConflictResolution {
    /// Last write wins based on timestamp (`ReplicaMetadata::last_sync`)
    LastWriteWins,
    /// First write wins
    FirstWriteWins,
    /// Custom merge strategy (left unresolved by `resolve_conflicts`)
    Custom,
    /// Manual resolution required (left unresolved by `resolve_conflicts`)
    Manual,
}
90
91/// Replication node status
92#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
93pub enum NodeStatus {
94    /// Node is healthy and replicating
95    Healthy,
96    /// Node is experiencing degraded performance
97    Degraded,
98    /// Node is unhealthy and not replicating
99    Unhealthy,
100    /// Node is in maintenance mode
101    Maintenance,
102    /// Node is offline
103    Offline,
104}
105
106impl Default for NodeStatus {
107    fn default() -> Self {
108        Self::Healthy
109    }
110}
111
/// Replication node.
///
/// Static identity/capacity fields are serialized; runtime health fields
/// are `#[serde(skip)]` and therefore reset to their `Default` values on
/// deserialization (Healthy / no health check / zero lag).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicationNode {
    /// Unique node identifier (used as the key in `ReplicationManager::nodes`)
    pub id: String,
    /// Geographic region
    pub region: String,
    /// Node endpoint URL
    pub endpoint: String,
    /// Priority (higher = preferred)
    pub priority: u32,
    /// Storage capacity in bytes
    pub capacity: u64,
    /// Current status (runtime-only, not serialized)
    #[serde(skip)]
    pub status: NodeStatus,
    /// Last health check timestamp (runtime-only, not serialized)
    #[serde(skip)]
    pub last_health_check: Option<DateTime<Utc>>,
    /// Replication lag in seconds (runtime-only, not serialized)
    #[serde(skip)]
    pub replication_lag_secs: f64,
}
135
136impl ReplicationNode {
137    /// Create a new replication node
138    pub fn new(id: String, region: String, endpoint: String, priority: u32, capacity: u64) -> Self {
139        Self {
140            id,
141            region,
142            endpoint,
143            priority,
144            capacity,
145            status: NodeStatus::Healthy,
146            last_health_check: None,
147            replication_lag_secs: 0.0,
148        }
149    }
150}
151
/// Replication configuration.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicationConfig {
    /// Consistency level used when replicating packages
    pub consistency: ConsistencyLevel,
    /// Number of replicas to maintain per package
    pub replication_factor: usize,
    /// Enable automatic failover during health checks
    pub auto_failover: bool,
    /// Synchronization interval in seconds.
    /// NOTE(review): stored but not read anywhere in this module —
    /// presumably consumed by an external scheduler calling `synchronize`.
    pub sync_interval_secs: u64,
}
164
/// Package replica metadata.
///
/// One entry per (package, version, node) copy; stored in
/// `ReplicationManager::replicas` under the key `"package_id:version"`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicaMetadata {
    /// Package ID
    pub package_id: String,
    /// Package version
    pub version: String,
    /// Node ID where replica is stored
    pub node_id: String,
    /// Replica version counter; `synchronize` re-syncs replicas whose
    /// counter is below the maximum seen for the package
    pub replica_version: u64,
    /// Checksum for integrity
    pub checksum: String,
    /// Last synchronized timestamp (also the conflict-resolution tiebreaker)
    pub last_sync: DateTime<Utc>,
    /// Size in bytes
    pub size_bytes: u64,
}
183
/// Replication operation record, queued in `ReplicationManager::operations`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicationOperation {
    /// Operation ID (UUID v4)
    pub id: String,
    /// Package ID
    pub package_id: String,
    /// Package version
    pub version: String,
    /// Operation type (Create, Update, Delete).
    /// NOTE(review): stringly-typed; only "Create" is produced in this
    /// module — consider an enum if callers don't depend on the string.
    pub operation_type: String,
    /// Source node ID
    pub source_node: String,
    /// Target node IDs
    pub target_nodes: Vec<String>,
    /// Operation timestamp
    pub timestamp: DateTime<Utc>,
    /// Operation status
    pub status: OperationStatus,
}
204
/// Status of a queued [`ReplicationOperation`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum OperationStatus {
    /// Operation is pending
    Pending,
    /// Operation is in progress
    InProgress,
    /// Operation completed successfully (counted as a success in statistics)
    Completed,
    /// Operation failed (counted as a failure in statistics)
    Failed,
}
217
/// Replication conflict between concurrent copies of one package version.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicationConflict {
    /// Conflict ID
    pub id: String,
    /// Package ID
    pub package_id: String,
    /// Package version
    pub version: String,
    /// Conflicting replicas
    pub conflicting_replicas: Vec<ReplicaMetadata>,
    /// Conflict detection timestamp
    pub detected_at: DateTime<Utc>,
    /// Resolution status; resolved conflicts are pruned by
    /// `ReplicationManager::resolve_conflicts`
    pub resolved: bool,
    /// Resolution strategy used, if any
    pub resolution_strategy: Option<ConflictResolution>,
}
236
/// Replication statistics snapshot, recomputed by
/// `ReplicationManager::update_statistics`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ReplicationStatistics {
    /// Total nodes
    pub total_nodes: usize,
    /// Nodes currently in `NodeStatus::Healthy`
    pub healthy_nodes: usize,
    /// Total replicas across all packages
    pub total_replicas: usize,
    /// Total replication operations queued
    pub total_operations: u64,
    /// Operations with `OperationStatus::Completed`
    pub successful_operations: u64,
    /// Operations with `OperationStatus::Failed`
    pub failed_operations: u64,
    /// Active (unresolved) conflicts
    pub active_conflicts: usize,
    /// Average replication lag across all nodes (seconds)
    pub avg_replication_lag_secs: f64,
    /// Total bandwidth used (bytes).
    /// NOTE(review): never updated anywhere in this module.
    pub total_bandwidth_bytes: u64,
}
259
/// Replication manager
///
/// Manages package replication across multiple nodes with configurable
/// consistency levels, automatic failover, and conflict resolution.
pub struct ReplicationManager {
    /// Replication configuration
    config: ReplicationConfig,
    /// Replication nodes keyed by `ReplicationNode::id`
    nodes: HashMap<String, ReplicationNode>,
    /// Package replicas keyed by `"package_id:version"`
    replicas: HashMap<String, Vec<ReplicaMetadata>>,
    /// Replication operation log.
    /// NOTE(review): grows without bound — nothing ever pops from it.
    operations: VecDeque<ReplicationOperation>,
    /// Active conflicts (resolved ones are pruned)
    conflicts: Vec<ReplicationConflict>,
    /// Cached statistics, refreshed by `update_statistics`
    statistics: ReplicationStatistics,
}
278
impl ReplicationManager {
    /// Create a new, empty replication manager using `config`.
    ///
    /// No nodes, replicas, operations, or conflicts exist yet;
    /// statistics start at their zero values.
    pub fn new(config: ReplicationConfig) -> Self {
        Self {
            nodes: HashMap::new(),
            replicas: HashMap::new(),
            operations: VecDeque::new(),
            conflicts: Vec::new(),
            statistics: ReplicationStatistics::default(),
            config,
        }
    }
291
292    /// Add a replication node
293    pub fn add_node(&mut self, node: ReplicationNode) -> Result<(), TorshError> {
294        if self.nodes.contains_key(&node.id) {
295            return Err(TorshError::InvalidArgument(format!(
296                "Node {} already exists",
297                node.id
298            )));
299        }
300
301        self.nodes.insert(node.id.clone(), node);
302        self.update_statistics();
303
304        Ok(())
305    }
306
307    /// Remove a replication node
308    pub fn remove_node(&mut self, node_id: &str) -> Result<(), TorshError> {
309        self.nodes
310            .remove(node_id)
311            .ok_or_else(|| TorshError::InvalidArgument(format!("Node {} not found", node_id)))?;
312
313        // Handle replica redistribution
314        self.redistribute_replicas(node_id)?;
315        self.update_statistics();
316
317        Ok(())
318    }
319
320    /// Replicate a package to all nodes
321    pub fn replicate_package(
322        &mut self,
323        package_id: &str,
324        version: &str,
325        _data: &[u8],
326    ) -> Result<(), TorshError> {
327        // Select target nodes based on replication factor
328        let target_nodes = self.select_replication_nodes(package_id)?;
329
330        // Create replication operation
331        let operation = ReplicationOperation {
332            id: uuid::Uuid::new_v4().to_string(),
333            package_id: package_id.to_string(),
334            version: version.to_string(),
335            operation_type: "Create".to_string(),
336            source_node: "primary".to_string(),
337            target_nodes: target_nodes.iter().map(|n| n.id.clone()).collect(),
338            timestamp: Utc::now(),
339            status: OperationStatus::Pending,
340        };
341
342        self.operations.push_back(operation);
343
344        // Execute replication based on strategy
345        match self.config.consistency {
346            ConsistencyLevel::Strong => self.replicate_synchronously(package_id, version)?,
347            ConsistencyLevel::Quorum => self.replicate_to_quorum(package_id, version)?,
348            ConsistencyLevel::Eventual | ConsistencyLevel::Causal => {
349                self.replicate_asynchronously(package_id, version)?
350            }
351        }
352
353        self.update_statistics();
354
355        Ok(())
356    }
357
358    /// Get package from best available replica
359    pub fn get_package(&self, package_id: &str, version: &str) -> Result<String, TorshError> {
360        let key = format!("{}:{}", package_id, version);
361
362        let replicas = self
363            .replicas
364            .get(&key)
365            .ok_or_else(|| TorshError::InvalidArgument(format!("Package {} not found", key)))?;
366
367        // Select best replica based on node health and latency
368        let best_replica = self.select_best_replica(replicas)?;
369
370        Ok(best_replica.node_id.clone())
371    }
372
373    /// Perform health check on all nodes
374    pub fn health_check(&mut self) -> Result<(), TorshError> {
375        let now = Utc::now();
376
377        // Collect node IDs first to avoid borrow checker issues
378        let node_ids: Vec<String> = self.nodes.keys().cloned().collect();
379
380        for node_id in node_ids {
381            // Mock health check (in production, send actual health check request)
382            let is_healthy = self.check_node_health(&node_id);
383
384            if let Some(node) = self.nodes.get_mut(&node_id) {
385                node.status = if is_healthy {
386                    NodeStatus::Healthy
387                } else {
388                    NodeStatus::Unhealthy
389                };
390
391                node.last_health_check = Some(now);
392            }
393        }
394
395        self.update_statistics();
396
397        // Handle failover if auto_failover is enabled
398        if self.config.auto_failover {
399            self.handle_failover()?;
400        }
401
402        Ok(())
403    }
404
405    /// Synchronize replicas across nodes
406    pub fn synchronize(&mut self) -> Result<(), TorshError> {
407        // Identify replicas that are out of sync
408        let mut to_sync = Vec::new();
409
410        for (key, replicas) in &self.replicas {
411            let max_version = replicas
412                .iter()
413                .map(|r| r.replica_version)
414                .max()
415                .unwrap_or(0);
416
417            for replica in replicas {
418                if replica.replica_version < max_version {
419                    to_sync.push((key.clone(), replica.node_id.clone()));
420                }
421            }
422        }
423
424        // Synchronize out-of-sync replicas
425        for (key, node_id) in to_sync {
426            self.sync_replica(&key, &node_id)?;
427        }
428
429        Ok(())
430    }
431
    /// Detect and resolve conflicts.
    ///
    /// Walks every unresolved conflict, picks a winning replica according
    /// to the resolution strategy, propagates each winner, then prunes
    /// resolved conflicts.
    ///
    /// NOTE(review): the strategy is hard-coded to `LastWriteWins` despite
    /// the inline comment — `ReplicationConfig` has no strategy field to
    /// read it from; confirm the intended source of the configuration.
    pub fn resolve_conflicts(&mut self) -> Result<(), TorshError> {
        // Winners are collected first and propagated after the loop so
        // `self.conflicts` is not borrowed while `propagate_replica`
        // takes `&mut self`.
        let mut replicas_to_propagate = Vec::new();

        // Collect replicas to propagate and mark conflicts as resolved
        for conflict in &mut self.conflicts {
            if !conflict.resolved {
                // Apply conflict resolution strategy
                let strategy = ConflictResolution::LastWriteWins; // Use configured strategy

                match strategy {
                    ConflictResolution::LastWriteWins => {
                        // Keep replica with latest timestamp
                        if let Some(latest) = conflict
                            .conflicting_replicas
                            .iter()
                            .max_by_key(|r| r.last_sync)
                            .cloned()
                        {
                            replicas_to_propagate.push(latest);
                            conflict.resolved = true;
                            conflict.resolution_strategy = Some(strategy);
                        }
                    }
                    ConflictResolution::FirstWriteWins => {
                        // Keep replica with earliest timestamp
                        if let Some(earliest) = conflict
                            .conflicting_replicas
                            .iter()
                            .min_by_key(|r| r.last_sync)
                            .cloned()
                        {
                            replicas_to_propagate.push(earliest);
                            conflict.resolved = true;
                            conflict.resolution_strategy = Some(strategy);
                        }
                    }
                    ConflictResolution::Custom | ConflictResolution::Manual => {
                        // Require manual intervention; conflict stays unresolved.
                    }
                }
            }
        }

        // Propagate replicas after collecting them
        for replica in replicas_to_propagate {
            self.propagate_replica(&replica)?;
        }

        // Remove resolved conflicts
        self.conflicts.retain(|c| !c.resolved);

        Ok(())
    }
486
487    /// Get replication statistics
488    pub fn get_statistics(&self) -> &ReplicationStatistics {
489        &self.statistics
490    }
491
492    /// Get node status
493    pub fn get_node_status(&self, node_id: &str) -> Option<NodeStatus> {
494        self.nodes.get(node_id).map(|n| n.status)
495    }
496
497    /// List all nodes
498    pub fn list_nodes(&self) -> Vec<&ReplicationNode> {
499        self.nodes.values().collect()
500    }
501
502    /// List replicas for a package
503    pub fn list_replicas(&self, package_id: &str, version: &str) -> Vec<&ReplicaMetadata> {
504        let key = format!("{}:{}", package_id, version);
505        self.replicas
506            .get(&key)
507            .map(|replicas| replicas.iter().collect())
508            .unwrap_or_default()
509    }
510
511    /// Get active conflicts
512    pub fn get_conflicts(&self) -> Vec<&ReplicationConflict> {
513        self.conflicts.iter().collect()
514    }
515
516    // Private helper methods
517
518    fn select_replication_nodes(
519        &self,
520        _package_id: &str,
521    ) -> Result<Vec<&ReplicationNode>, TorshError> {
522        let healthy_nodes: Vec<&ReplicationNode> = self
523            .nodes
524            .values()
525            .filter(|n| n.status == NodeStatus::Healthy)
526            .collect();
527
528        if healthy_nodes.is_empty() {
529            return Err(TorshError::RuntimeError(
530                "No healthy nodes available".to_string(),
531            ));
532        }
533
534        // Select nodes based on priority and capacity
535        let mut selected: Vec<&ReplicationNode> = healthy_nodes
536            .into_iter()
537            .take(self.config.replication_factor)
538            .collect();
539
540        // Sort by priority
541        selected.sort_by(|a, b| b.priority.cmp(&a.priority));
542
543        Ok(selected)
544    }
545
546    fn replicate_synchronously(
547        &mut self,
548        _package_id: &str,
549        _version: &str,
550    ) -> Result<(), TorshError> {
551        // Mock synchronous replication - mark last operation as completed
552        if let Some(op) = self.operations.back_mut() {
553            op.status = OperationStatus::Completed;
554        }
555        Ok(())
556    }
557
558    fn replicate_to_quorum(&mut self, _package_id: &str, _version: &str) -> Result<(), TorshError> {
559        // Mock quorum replication - mark last operation as completed
560        if let Some(op) = self.operations.back_mut() {
561            op.status = OperationStatus::Completed;
562        }
563        Ok(())
564    }
565
566    fn replicate_asynchronously(
567        &mut self,
568        _package_id: &str,
569        _version: &str,
570    ) -> Result<(), TorshError> {
571        // Mock asynchronous replication - mark last operation as completed
572        if let Some(op) = self.operations.back_mut() {
573            op.status = OperationStatus::Completed;
574        }
575        Ok(())
576    }
577
578    fn select_best_replica<'a>(
579        &self,
580        replicas: &'a [ReplicaMetadata],
581    ) -> Result<&'a ReplicaMetadata, TorshError> {
582        // Select replica on healthiest node with lowest lag
583        replicas
584            .iter()
585            .filter(|r| {
586                self.nodes
587                    .get(&r.node_id)
588                    .map(|n| n.status == NodeStatus::Healthy)
589                    .unwrap_or(false)
590            })
591            .min_by(|a, b| {
592                let a_lag = self
593                    .nodes
594                    .get(&a.node_id)
595                    .map(|n| n.replication_lag_secs)
596                    .unwrap_or(f64::MAX);
597                let b_lag = self
598                    .nodes
599                    .get(&b.node_id)
600                    .map(|n| n.replication_lag_secs)
601                    .unwrap_or(f64::MAX);
602                a_lag
603                    .partial_cmp(&b_lag)
604                    .expect("replication lag comparison should succeed (f64::MAX is valid)")
605            })
606            .ok_or_else(|| TorshError::RuntimeError("No healthy replicas".to_string()))
607    }
608
    /// Mock health check — always reports healthy.
    /// In production, this would probe the node's endpoint.
    fn check_node_health(&self, _node_id: &str) -> bool {
        true
    }
613
614    fn handle_failover(&mut self) -> Result<(), TorshError> {
615        let unhealthy_nodes: Vec<String> = self
616            .nodes
617            .iter()
618            .filter(|(_, n)| n.status == NodeStatus::Unhealthy)
619            .map(|(id, _)| id.clone())
620            .collect();
621
622        for node_id in unhealthy_nodes {
623            // Redistribute replicas from unhealthy node
624            self.redistribute_replicas(&node_id)?;
625        }
626
627        Ok(())
628    }
629
630    fn redistribute_replicas(&mut self, node_id: &str) -> Result<(), TorshError> {
631        // Find replicas on the removed/failed node
632        let mut replicas_to_move = Vec::new();
633
634        for (key, replicas) in &self.replicas {
635            if replicas.iter().any(|r| r.node_id == node_id) {
636                replicas_to_move.push(key.clone());
637            }
638        }
639
640        // Replicate to other healthy nodes
641        for key in replicas_to_move {
642            if let Some(replicas) = self.replicas.get_mut(&key) {
643                replicas.retain(|r| r.node_id != node_id);
644
645                // If below replication factor, create new replica
646                if replicas.len() < self.config.replication_factor {
647                    // Mock: add new replica on another node
648                    // In production, actually replicate the data
649                }
650            }
651        }
652
653        Ok(())
654    }
655
    /// Mock synchronization of a single replica — no-op.
    /// In production, this would pull the latest data onto `node_id`.
    fn sync_replica(&mut self, _key: &str, _node_id: &str) -> Result<(), TorshError> {
        Ok(())
    }

    /// Mock propagation of a conflict-winning replica — no-op.
    /// In production, this would push the winner to all other nodes.
    fn propagate_replica(&mut self, _replica: &ReplicaMetadata) -> Result<(), TorshError> {
        Ok(())
    }
665
666    fn update_statistics(&mut self) {
667        let mut stats = ReplicationStatistics::default();
668
669        stats.total_nodes = self.nodes.len();
670        stats.healthy_nodes = self
671            .nodes
672            .values()
673            .filter(|n| n.status == NodeStatus::Healthy)
674            .count();
675
676        stats.total_replicas = self.replicas.values().map(|v| v.len()).sum();
677
678        stats.total_operations = self.operations.len() as u64;
679        stats.successful_operations = self
680            .operations
681            .iter()
682            .filter(|op| op.status == OperationStatus::Completed)
683            .count() as u64;
684        stats.failed_operations = self
685            .operations
686            .iter()
687            .filter(|op| op.status == OperationStatus::Failed)
688            .count() as u64;
689
690        stats.active_conflicts = self.conflicts.len();
691
692        // Calculate average replication lag
693        let total_lag: f64 = self.nodes.values().map(|n| n.replication_lag_secs).sum();
694        stats.avg_replication_lag_secs = if !self.nodes.is_empty() {
695            total_lag / self.nodes.len() as f64
696        } else {
697            0.0
698        };
699
700        self.statistics = stats;
701    }
702}
703
#[cfg(test)]
mod tests {
    use super::*;

    /// Default test configuration: eventual consistency, factor 3,
    /// auto-failover on.
    fn create_test_config() -> ReplicationConfig {
        ReplicationConfig {
            consistency: ConsistencyLevel::Eventual,
            replication_factor: 3,
            auto_failover: true,
            sync_interval_secs: 60,
        }
    }

    /// Build a 1 GiB-capacity test node `node{i}` with priority `i`.
    /// Deduplicates the node-construction boilerplate the tests repeated.
    fn create_test_node(i: u32) -> ReplicationNode {
        ReplicationNode::new(
            format!("node{}", i),
            "us-east-1".to_string(),
            format!("https://node{}.example.com", i),
            i,
            1024 * 1024 * 1024,
        )
    }

    #[test]
    fn test_replication_manager_creation() {
        let manager = ReplicationManager::new(create_test_config());
        assert_eq!(manager.get_statistics().total_nodes, 0);
    }

    #[test]
    fn test_add_node() {
        let mut manager = ReplicationManager::new(create_test_config());

        manager.add_node(create_test_node(1)).unwrap();

        let stats = manager.get_statistics();
        assert_eq!(stats.total_nodes, 1);
        assert_eq!(stats.healthy_nodes, 1);
    }

    #[test]
    fn test_remove_node() {
        let mut manager = ReplicationManager::new(create_test_config());

        manager.add_node(create_test_node(1)).unwrap();
        assert_eq!(manager.get_statistics().total_nodes, 1);

        manager.remove_node("node1").unwrap();
        assert_eq!(manager.get_statistics().total_nodes, 0);
    }

    #[test]
    fn test_replicate_package() {
        let mut manager = ReplicationManager::new(create_test_config());

        for i in 1..=3 {
            manager.add_node(create_test_node(i)).unwrap();
        }

        let result = manager.replicate_package("test-pkg", "1.0.0", b"data");
        assert!(result.is_ok());

        assert!(manager.get_statistics().successful_operations > 0);
    }

    #[test]
    fn test_health_check() {
        let mut manager = ReplicationManager::new(create_test_config());

        manager.add_node(create_test_node(1)).unwrap();
        manager.health_check().unwrap();

        assert!(manager.get_node_status("node1").is_some());
    }

    #[test]
    fn test_list_nodes() {
        let mut manager = ReplicationManager::new(create_test_config());

        for i in 1..=3 {
            manager.add_node(create_test_node(i)).unwrap();
        }

        assert_eq!(manager.list_nodes().len(), 3);
    }

    #[test]
    fn test_consistency_levels() {
        let configs = vec![
            ConsistencyLevel::Eventual,
            ConsistencyLevel::Quorum,
            ConsistencyLevel::Strong,
            ConsistencyLevel::Causal,
        ];

        for consistency in configs {
            let config = ReplicationConfig {
                consistency,
                replication_factor: 3,
                auto_failover: true,
                sync_interval_secs: 60,
            };

            let manager = ReplicationManager::new(config);
            assert_eq!(manager.config.consistency, consistency);
        }
    }

    #[test]
    fn test_replication_statistics() {
        let mut manager = ReplicationManager::new(create_test_config());

        for i in 1..=5 {
            manager.add_node(create_test_node(i)).unwrap();
        }

        manager.replicate_package("pkg1", "1.0.0", b"data").unwrap();
        manager.replicate_package("pkg2", "1.0.0", b"data").unwrap();

        let stats = manager.get_statistics();
        assert_eq!(stats.total_nodes, 5);
        assert_eq!(stats.healthy_nodes, 5);
        assert!(stats.successful_operations >= 2);
    }
}