// forge_orchestration/federation/replication.rs

//! Cross-region replication for data and workloads.
//!
//! Implements replication strategies for:
//! - Active-passive failover
//! - Active-active multi-region
//! - Data synchronization

use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use tracing::{debug, info, warn};

14/// Replication policy
15#[derive(Debug, Clone, Serialize, Deserialize)]
16pub enum ReplicationPolicy {
17    /// No replication
18    None,
19    /// Replicate to specific regions
20    Explicit { regions: Vec<String> },
21    /// Replicate to N regions
22    Count { count: usize },
23    /// Replicate to all regions
24    All,
25    /// Replicate based on topology (e.g., one per zone)
26    Topology { key: String, count_per_key: usize },
27}
28
29impl Default for ReplicationPolicy {
30    fn default() -> Self {
31        Self::None
32    }
33}
34
35/// Replication status for a resource
36#[derive(Debug, Clone, Serialize, Deserialize)]
37pub struct ReplicationStatus {
38    /// Resource ID
39    pub resource_id: String,
40    /// Primary region
41    pub primary_region: String,
42    /// Replica regions
43    pub replica_regions: Vec<String>,
44    /// Replication state per region
45    pub region_states: HashMap<String, ReplicaState>,
46    /// Last sync time
47    pub last_sync: chrono::DateTime<chrono::Utc>,
48    /// Replication lag in milliseconds
49    pub lag_ms: HashMap<String, u64>,
50}
51
52/// State of a replica
53#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
54pub enum ReplicaState {
55    /// Replica is in sync
56    InSync,
57    /// Replica is syncing
58    Syncing,
59    /// Replica is lagging
60    Lagging,
61    /// Replica has failed
62    Failed,
63    /// Replica is being created
64    Creating,
65    /// Replica is being deleted
66    Deleting,
67}
68
69/// Replication controller for managing cross-region replication
70pub struct ReplicationController {
71    /// Replication status by resource
72    status: Arc<RwLock<HashMap<String, ReplicationStatus>>>,
73    /// Default replication policy
74    default_policy: RwLock<ReplicationPolicy>,
75    /// Available regions
76    regions: RwLock<HashSet<String>>,
77    /// Replication callbacks
78    callbacks: RwLock<Vec<Arc<dyn ReplicationCallback + Send + Sync>>>,
79}
80
/// Observer of replication events emitted by `ReplicationController`.
///
/// Implementors must be `Send + Sync` so they can be shared across threads.
pub trait ReplicationCallback: Send + Sync {
    /// Invoked when replication of `resource_id` from region `source` to
    /// region `target` begins.
    fn on_replicate(&self, resource_id: &str, source: &str, target: &str);

    /// Invoked when the copy of `resource_id` in `region` becomes in sync.
    fn on_sync_complete(&self, resource_id: &str, region: &str);

    /// Invoked when the copy of `resource_id` in `region` fails; `error` is a
    /// human-readable reason.
    fn on_sync_failed(&self, resource_id: &str, region: &str, error: &str);
}

93impl ReplicationController {
94    /// Create new replication controller
95    pub fn new() -> Self {
96        Self {
97            status: Arc::new(RwLock::new(HashMap::new())),
98            default_policy: RwLock::new(ReplicationPolicy::None),
99            regions: RwLock::new(HashSet::new()),
100            callbacks: RwLock::new(Vec::new()),
101        }
102    }
103
104    /// Set default replication policy
105    pub fn set_default_policy(&self, policy: ReplicationPolicy) {
106        *self.default_policy.write() = policy;
107    }
108
109    /// Register available region
110    pub fn register_region(&self, region: impl Into<String>) {
111        self.regions.write().insert(region.into());
112    }
113
114    /// Unregister region
115    pub fn unregister_region(&self, region: &str) {
116        self.regions.write().remove(region);
117    }
118
119    /// Add replication callback
120    pub fn add_callback<C: ReplicationCallback + 'static>(&self, callback: C) {
121        self.callbacks.write().push(Arc::new(callback));
122    }
123
124    /// Start replication for a resource
125    pub fn replicate(&self, resource_id: impl Into<String>, primary_region: impl Into<String>, policy: Option<ReplicationPolicy>) {
126        let resource_id = resource_id.into();
127        let primary_region = primary_region.into();
128        let policy = policy.unwrap_or_else(|| self.default_policy.read().clone());
129
130        let target_regions = self.resolve_target_regions(&primary_region, &policy);
131        
132        if target_regions.is_empty() {
133            debug!(resource_id = %resource_id, "No replication targets");
134            return;
135        }
136
137        info!(
138            resource_id = %resource_id,
139            primary = %primary_region,
140            targets = ?target_regions,
141            "Starting replication"
142        );
143
144        // Initialize status
145        let mut region_states = HashMap::new();
146        region_states.insert(primary_region.clone(), ReplicaState::InSync);
147        
148        for region in &target_regions {
149            region_states.insert(region.clone(), ReplicaState::Creating);
150        }
151
152        let status = ReplicationStatus {
153            resource_id: resource_id.clone(),
154            primary_region: primary_region.clone(),
155            replica_regions: target_regions.clone(),
156            region_states,
157            last_sync: chrono::Utc::now(),
158            lag_ms: HashMap::new(),
159        };
160
161        self.status.write().insert(resource_id.clone(), status);
162
163        // Notify callbacks
164        let callbacks = self.callbacks.read();
165        for region in &target_regions {
166            for callback in callbacks.iter() {
167                callback.on_replicate(&resource_id, &primary_region, region);
168            }
169        }
170    }
171
172    /// Resolve target regions based on policy
173    fn resolve_target_regions(&self, primary: &str, policy: &ReplicationPolicy) -> Vec<String> {
174        let regions = self.regions.read();
175        let available: Vec<_> = regions.iter()
176            .filter(|r| *r != primary)
177            .cloned()
178            .collect();
179
180        match policy {
181            ReplicationPolicy::None => Vec::new(),
182            ReplicationPolicy::Explicit { regions: targets } => {
183                targets.iter()
184                    .filter(|r| available.contains(r))
185                    .cloned()
186                    .collect()
187            }
188            ReplicationPolicy::Count { count } => {
189                available.into_iter().take(*count).collect()
190            }
191            ReplicationPolicy::All => available,
192            ReplicationPolicy::Topology { key, count_per_key } => {
193                // Simplified - in real implementation would check topology labels
194                available.into_iter().take(*count_per_key).collect()
195            }
196        }
197    }
198
199    /// Update replica state
200    pub fn update_state(&self, resource_id: &str, region: &str, state: ReplicaState) {
201        let mut statuses = self.status.write();
202        
203        if let Some(status) = statuses.get_mut(resource_id) {
204            let old_state = status.region_states.get(region).copied();
205            status.region_states.insert(region.to_string(), state);
206
207            // Notify on state changes
208            if old_state != Some(state) {
209                let callbacks = self.callbacks.read();
210                
211                match state {
212                    ReplicaState::InSync => {
213                        for callback in callbacks.iter() {
214                            callback.on_sync_complete(resource_id, region);
215                        }
216                    }
217                    ReplicaState::Failed => {
218                        for callback in callbacks.iter() {
219                            callback.on_sync_failed(resource_id, region, "Replication failed");
220                        }
221                    }
222                    _ => {}
223                }
224            }
225        }
226    }
227
228    /// Update replication lag
229    pub fn update_lag(&self, resource_id: &str, region: &str, lag_ms: u64) {
230        let mut statuses = self.status.write();
231        
232        if let Some(status) = statuses.get_mut(resource_id) {
233            status.lag_ms.insert(region.to_string(), lag_ms);
234            status.last_sync = chrono::Utc::now();
235
236            // Update state based on lag
237            let state = if lag_ms == 0 {
238                ReplicaState::InSync
239            } else if lag_ms < 1000 {
240                ReplicaState::Syncing
241            } else {
242                ReplicaState::Lagging
243            };
244
245            status.region_states.insert(region.to_string(), state);
246        }
247    }
248
249    /// Get replication status
250    pub fn get_status(&self, resource_id: &str) -> Option<ReplicationStatus> {
251        self.status.read().get(resource_id).cloned()
252    }
253
254    /// Get all replicated resources
255    pub fn list_replicated(&self) -> Vec<ReplicationStatus> {
256        self.status.read().values().cloned().collect()
257    }
258
259    /// Stop replication for a resource
260    pub fn stop_replication(&self, resource_id: &str) {
261        if let Some(mut status) = self.status.write().remove(resource_id) {
262            info!(resource_id = %resource_id, "Stopping replication");
263            
264            // Mark all replicas as deleting
265            for (region, state) in status.region_states.iter_mut() {
266                if region != &status.primary_region {
267                    *state = ReplicaState::Deleting;
268                }
269            }
270        }
271    }
272
273    /// Promote a replica to primary
274    pub fn promote(&self, resource_id: &str, new_primary: &str) -> bool {
275        let mut statuses = self.status.write();
276        
277        if let Some(status) = statuses.get_mut(resource_id) {
278            if !status.replica_regions.contains(&new_primary.to_string()) 
279                && status.primary_region != new_primary {
280                warn!(
281                    resource_id = %resource_id,
282                    new_primary = %new_primary,
283                    "Cannot promote: region is not a replica"
284                );
285                return false;
286            }
287
288            let old_primary = status.primary_region.clone();
289            status.primary_region = new_primary.to_string();
290            
291            // Update replica list
292            status.replica_regions.retain(|r| r != new_primary);
293            if !status.replica_regions.contains(&old_primary) {
294                status.replica_regions.push(old_primary);
295            }
296
297            info!(
298                resource_id = %resource_id,
299                old_primary = %status.replica_regions.last().unwrap_or(&String::new()),
300                new_primary = %new_primary,
301                "Promoted replica to primary"
302            );
303
304            return true;
305        }
306
307        false
308    }
309
310    /// Check if resource is replicated to a region
311    pub fn is_replicated_to(&self, resource_id: &str, region: &str) -> bool {
312        self.status.read()
313            .get(resource_id)
314            .map(|s| s.primary_region == region || s.replica_regions.contains(&region.to_string()))
315            .unwrap_or(false)
316    }
317
318    /// Get healthy replicas for a resource
319    pub fn healthy_replicas(&self, resource_id: &str) -> Vec<String> {
320        self.status.read()
321            .get(resource_id)
322            .map(|s| {
323                let mut healthy = vec![s.primary_region.clone()];
324                healthy.extend(
325                    s.region_states.iter()
326                        .filter(|(r, state)| {
327                            *r != &s.primary_region && **state == ReplicaState::InSync
328                        })
329                        .map(|(r, _)| r.clone())
330                );
331                healthy
332            })
333            .unwrap_or_default()
334    }
335}
336
337impl Default for ReplicationController {
338    fn default() -> Self {
339        Self::new()
340    }
341}
342
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Arc, Mutex};

    /// Builds a controller with each of `regions` registered.
    fn controller_with_regions(regions: &[&str]) -> ReplicationController {
        let controller = ReplicationController::new();
        for region in regions {
            controller.register_region(*region);
        }
        controller
    }

    /// Callback that appends a description of every event to a shared log.
    struct RecordingCallback {
        events: Arc<Mutex<Vec<String>>>,
    }

    impl ReplicationCallback for RecordingCallback {
        fn on_replicate(&self, resource_id: &str, source: &str, target: &str) {
            self.events
                .lock()
                .unwrap()
                .push(format!("replicate {} {} {}", resource_id, source, target));
        }

        fn on_sync_complete(&self, resource_id: &str, region: &str) {
            self.events
                .lock()
                .unwrap()
                .push(format!("complete {} {}", resource_id, region));
        }

        fn on_sync_failed(&self, resource_id: &str, region: &str, error: &str) {
            self.events
                .lock()
                .unwrap()
                .push(format!("failed {} {} {}", resource_id, region, error));
        }
    }

    #[test]
    fn test_replication_policy() {
        let controller = controller_with_regions(&["us-east-1", "us-west-2", "eu-west-1"]);

        controller.replicate(
            "resource-1",
            "us-east-1",
            Some(ReplicationPolicy::Count { count: 2 }),
        );

        let status = controller.get_status("resource-1").unwrap();
        assert_eq!(status.primary_region, "us-east-1");
        assert_eq!(status.replica_regions.len(), 2);
    }

    #[test]
    fn test_explicit_policy_skips_unregistered_regions() {
        let controller = controller_with_regions(&["us-east-1", "eu-west-1"]);

        controller.replicate(
            "resource-1",
            "us-east-1",
            Some(ReplicationPolicy::Explicit {
                regions: vec!["eu-west-1".to_string(), "ap-south-1".to_string()],
            }),
        );

        let status = controller.get_status("resource-1").unwrap();
        assert_eq!(status.replica_regions, vec!["eu-west-1".to_string()]);
    }

    #[test]
    fn test_no_targets_records_nothing() {
        let controller = controller_with_regions(&["us-east-1", "eu-west-1"]);

        controller.replicate("resource-1", "us-east-1", Some(ReplicationPolicy::None));

        assert!(controller.get_status("resource-1").is_none());
    }

    #[test]
    fn test_replica_promotion() {
        let controller = controller_with_regions(&["us-east-1", "eu-west-1"]);

        controller.replicate("resource-1", "us-east-1", Some(ReplicationPolicy::All));

        // Mark replica as in sync, then promote it.
        controller.update_state("resource-1", "eu-west-1", ReplicaState::InSync);
        assert!(controller.promote("resource-1", "eu-west-1"));

        let status = controller.get_status("resource-1").unwrap();
        assert_eq!(status.primary_region, "eu-west-1");
        assert!(status.replica_regions.contains(&"us-east-1".to_string()));
    }

    #[test]
    fn test_promote_rejects_unknown_region_and_resource() {
        let controller = controller_with_regions(&["us-east-1", "eu-west-1"]);
        controller.replicate("resource-1", "us-east-1", Some(ReplicationPolicy::All));

        assert!(!controller.promote("resource-1", "ap-south-1"));
        assert!(!controller.promote("missing", "eu-west-1"));
        assert_eq!(
            controller.get_status("resource-1").unwrap().primary_region,
            "us-east-1"
        );
    }

    #[test]
    fn test_update_lag_derives_state() {
        let controller = controller_with_regions(&["us-east-1", "eu-west-1"]);
        controller.replicate("resource-1", "us-east-1", Some(ReplicationPolicy::All));

        let state_after = |lag_ms| {
            controller.update_lag("resource-1", "eu-west-1", lag_ms);
            controller.get_status("resource-1").unwrap().region_states["eu-west-1"]
        };
        assert_eq!(state_after(0), ReplicaState::InSync);
        assert_eq!(state_after(500), ReplicaState::Syncing);
        assert_eq!(state_after(5_000), ReplicaState::Lagging);

        let status = controller.get_status("resource-1").unwrap();
        assert_eq!(status.lag_ms["eu-west-1"], 5_000);
    }

    #[test]
    fn test_stop_replication_removes_status() {
        let controller = controller_with_regions(&["us-east-1", "eu-west-1"]);
        controller.replicate("resource-1", "us-east-1", Some(ReplicationPolicy::All));
        assert!(controller.is_replicated_to("resource-1", "eu-west-1"));

        controller.stop_replication("resource-1");

        assert!(controller.get_status("resource-1").is_none());
        assert!(!controller.is_replicated_to("resource-1", "eu-west-1"));
    }

    #[test]
    fn test_callbacks_fire_on_replicate_and_state_changes() {
        let events = Arc::new(Mutex::new(Vec::new()));
        let controller = controller_with_regions(&["us-east-1", "eu-west-1"]);
        controller.add_callback(RecordingCallback {
            events: Arc::clone(&events),
        });

        controller.replicate("resource-1", "us-east-1", Some(ReplicationPolicy::All));
        controller.update_state("resource-1", "eu-west-1", ReplicaState::InSync);
        // Re-reporting the same state is not a change and must not notify again.
        controller.update_state("resource-1", "eu-west-1", ReplicaState::InSync);
        controller.update_state("resource-1", "eu-west-1", ReplicaState::Failed);

        assert_eq!(
            *events.lock().unwrap(),
            vec![
                "replicate resource-1 us-east-1 eu-west-1".to_string(),
                "complete resource-1 eu-west-1".to_string(),
                "failed resource-1 eu-west-1 Replication failed".to_string(),
            ]
        );
    }

    #[test]
    fn test_healthy_replicas() {
        let controller = controller_with_regions(&["us-east-1", "us-west-2", "eu-west-1"]);

        controller.replicate("resource-1", "us-east-1", Some(ReplicationPolicy::All));

        controller.update_state("resource-1", "us-west-2", ReplicaState::InSync);
        controller.update_state("resource-1", "eu-west-1", ReplicaState::Failed);

        let healthy = controller.healthy_replicas("resource-1");
        assert!(healthy.contains(&"us-east-1".to_string()));
        assert!(healthy.contains(&"us-west-2".to_string()));
        assert!(!healthy.contains(&"eu-west-1".to_string()));
    }
}