// forge_orchestration/federation/replication.rs

use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use tracing::{debug, info, warn};

14#[derive(Debug, Clone, Serialize, Deserialize)]
16pub enum ReplicationPolicy {
17 None,
19 Explicit { regions: Vec<String> },
21 Count { count: usize },
23 All,
25 Topology { key: String, count_per_key: usize },
27}
28
29impl Default for ReplicationPolicy {
30 fn default() -> Self {
31 Self::None
32 }
33}
34
35#[derive(Debug, Clone, Serialize, Deserialize)]
37pub struct ReplicationStatus {
38 pub resource_id: String,
40 pub primary_region: String,
42 pub replica_regions: Vec<String>,
44 pub region_states: HashMap<String, ReplicaState>,
46 pub last_sync: chrono::DateTime<chrono::Utc>,
48 pub lag_ms: HashMap<String, u64>,
50}
51
52#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
54pub enum ReplicaState {
55 InSync,
57 Syncing,
59 Lagging,
61 Failed,
63 Creating,
65 Deleting,
67}
68
69pub struct ReplicationController {
71 status: Arc<RwLock<HashMap<String, ReplicationStatus>>>,
73 default_policy: RwLock<ReplicationPolicy>,
75 regions: RwLock<HashSet<String>>,
77 callbacks: RwLock<Vec<Arc<dyn ReplicationCallback + Send + Sync>>>,
79}
80
/// Observer of replication events emitted by a `ReplicationController`.
///
/// Implementors must be `Send + Sync`; the controller keeps them behind `Arc`s.
pub trait ReplicationCallback: Send + Sync {
    /// Replication of `resource_id` from `source` (its primary region) to
    /// `target` has been started.
    fn on_replicate(&self, resource_id: &str, source: &str, target: &str);

    /// The replica of `resource_id` in `region` has become in-sync.
    fn on_sync_complete(&self, resource_id: &str, region: &str);

    /// The replica of `resource_id` in `region` has failed; `error` is a
    /// human-readable description of the failure.
    fn on_sync_failed(&self, resource_id: &str, region: &str, error: &str);
}

93impl ReplicationController {
94 pub fn new() -> Self {
96 Self {
97 status: Arc::new(RwLock::new(HashMap::new())),
98 default_policy: RwLock::new(ReplicationPolicy::None),
99 regions: RwLock::new(HashSet::new()),
100 callbacks: RwLock::new(Vec::new()),
101 }
102 }
103
104 pub fn set_default_policy(&self, policy: ReplicationPolicy) {
106 *self.default_policy.write() = policy;
107 }
108
109 pub fn register_region(&self, region: impl Into<String>) {
111 self.regions.write().insert(region.into());
112 }
113
114 pub fn unregister_region(&self, region: &str) {
116 self.regions.write().remove(region);
117 }
118
119 pub fn add_callback<C: ReplicationCallback + 'static>(&self, callback: C) {
121 self.callbacks.write().push(Arc::new(callback));
122 }
123
124 pub fn replicate(&self, resource_id: impl Into<String>, primary_region: impl Into<String>, policy: Option<ReplicationPolicy>) {
126 let resource_id = resource_id.into();
127 let primary_region = primary_region.into();
128 let policy = policy.unwrap_or_else(|| self.default_policy.read().clone());
129
130 let target_regions = self.resolve_target_regions(&primary_region, &policy);
131
132 if target_regions.is_empty() {
133 debug!(resource_id = %resource_id, "No replication targets");
134 return;
135 }
136
137 info!(
138 resource_id = %resource_id,
139 primary = %primary_region,
140 targets = ?target_regions,
141 "Starting replication"
142 );
143
144 let mut region_states = HashMap::new();
146 region_states.insert(primary_region.clone(), ReplicaState::InSync);
147
148 for region in &target_regions {
149 region_states.insert(region.clone(), ReplicaState::Creating);
150 }
151
152 let status = ReplicationStatus {
153 resource_id: resource_id.clone(),
154 primary_region: primary_region.clone(),
155 replica_regions: target_regions.clone(),
156 region_states,
157 last_sync: chrono::Utc::now(),
158 lag_ms: HashMap::new(),
159 };
160
161 self.status.write().insert(resource_id.clone(), status);
162
163 let callbacks = self.callbacks.read();
165 for region in &target_regions {
166 for callback in callbacks.iter() {
167 callback.on_replicate(&resource_id, &primary_region, region);
168 }
169 }
170 }
171
172 fn resolve_target_regions(&self, primary: &str, policy: &ReplicationPolicy) -> Vec<String> {
174 let regions = self.regions.read();
175 let available: Vec<_> = regions.iter()
176 .filter(|r| *r != primary)
177 .cloned()
178 .collect();
179
180 match policy {
181 ReplicationPolicy::None => Vec::new(),
182 ReplicationPolicy::Explicit { regions: targets } => {
183 targets.iter()
184 .filter(|r| available.contains(r))
185 .cloned()
186 .collect()
187 }
188 ReplicationPolicy::Count { count } => {
189 available.into_iter().take(*count).collect()
190 }
191 ReplicationPolicy::All => available,
192 ReplicationPolicy::Topology { key, count_per_key } => {
193 available.into_iter().take(*count_per_key).collect()
195 }
196 }
197 }
198
199 pub fn update_state(&self, resource_id: &str, region: &str, state: ReplicaState) {
201 let mut statuses = self.status.write();
202
203 if let Some(status) = statuses.get_mut(resource_id) {
204 let old_state = status.region_states.get(region).copied();
205 status.region_states.insert(region.to_string(), state);
206
207 if old_state != Some(state) {
209 let callbacks = self.callbacks.read();
210
211 match state {
212 ReplicaState::InSync => {
213 for callback in callbacks.iter() {
214 callback.on_sync_complete(resource_id, region);
215 }
216 }
217 ReplicaState::Failed => {
218 for callback in callbacks.iter() {
219 callback.on_sync_failed(resource_id, region, "Replication failed");
220 }
221 }
222 _ => {}
223 }
224 }
225 }
226 }
227
228 pub fn update_lag(&self, resource_id: &str, region: &str, lag_ms: u64) {
230 let mut statuses = self.status.write();
231
232 if let Some(status) = statuses.get_mut(resource_id) {
233 status.lag_ms.insert(region.to_string(), lag_ms);
234 status.last_sync = chrono::Utc::now();
235
236 let state = if lag_ms == 0 {
238 ReplicaState::InSync
239 } else if lag_ms < 1000 {
240 ReplicaState::Syncing
241 } else {
242 ReplicaState::Lagging
243 };
244
245 status.region_states.insert(region.to_string(), state);
246 }
247 }
248
249 pub fn get_status(&self, resource_id: &str) -> Option<ReplicationStatus> {
251 self.status.read().get(resource_id).cloned()
252 }
253
254 pub fn list_replicated(&self) -> Vec<ReplicationStatus> {
256 self.status.read().values().cloned().collect()
257 }
258
259 pub fn stop_replication(&self, resource_id: &str) {
261 if let Some(mut status) = self.status.write().remove(resource_id) {
262 info!(resource_id = %resource_id, "Stopping replication");
263
264 for (region, state) in status.region_states.iter_mut() {
266 if region != &status.primary_region {
267 *state = ReplicaState::Deleting;
268 }
269 }
270 }
271 }
272
273 pub fn promote(&self, resource_id: &str, new_primary: &str) -> bool {
275 let mut statuses = self.status.write();
276
277 if let Some(status) = statuses.get_mut(resource_id) {
278 if !status.replica_regions.contains(&new_primary.to_string())
279 && status.primary_region != new_primary {
280 warn!(
281 resource_id = %resource_id,
282 new_primary = %new_primary,
283 "Cannot promote: region is not a replica"
284 );
285 return false;
286 }
287
288 let old_primary = status.primary_region.clone();
289 status.primary_region = new_primary.to_string();
290
291 status.replica_regions.retain(|r| r != new_primary);
293 if !status.replica_regions.contains(&old_primary) {
294 status.replica_regions.push(old_primary);
295 }
296
297 info!(
298 resource_id = %resource_id,
299 old_primary = %status.replica_regions.last().unwrap_or(&String::new()),
300 new_primary = %new_primary,
301 "Promoted replica to primary"
302 );
303
304 return true;
305 }
306
307 false
308 }
309
310 pub fn is_replicated_to(&self, resource_id: &str, region: &str) -> bool {
312 self.status.read()
313 .get(resource_id)
314 .map(|s| s.primary_region == region || s.replica_regions.contains(®ion.to_string()))
315 .unwrap_or(false)
316 }
317
318 pub fn healthy_replicas(&self, resource_id: &str) -> Vec<String> {
320 self.status.read()
321 .get(resource_id)
322 .map(|s| {
323 let mut healthy = vec![s.primary_region.clone()];
324 healthy.extend(
325 s.region_states.iter()
326 .filter(|(r, state)| {
327 *r != &s.primary_region && **state == ReplicaState::InSync
328 })
329 .map(|(r, _)| r.clone())
330 );
331 healthy
332 })
333 .unwrap_or_default()
334 }
335}
336
337impl Default for ReplicationController {
338 fn default() -> Self {
339 Self::new()
340 }
341}
342
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a controller with each of `regions` registered.
    fn controller_with(regions: &[&str]) -> ReplicationController {
        let controller = ReplicationController::new();
        regions
            .iter()
            .for_each(|region| controller.register_region(*region));
        controller
    }

    #[test]
    fn test_replication_policy() {
        let controller = controller_with(&["us-east-1", "us-west-2", "eu-west-1"]);
        let policy = ReplicationPolicy::Count { count: 2 };
        controller.replicate("resource-1", "us-east-1", Some(policy));

        let status = controller.get_status("resource-1").unwrap();
        assert_eq!(status.primary_region, "us-east-1");
        assert_eq!(status.replica_regions.len(), 2);
    }

    #[test]
    fn test_replica_promotion() {
        let controller = controller_with(&["us-east-1", "eu-west-1"]);
        controller.replicate("resource-1", "us-east-1", Some(ReplicationPolicy::All));
        controller.update_state("resource-1", "eu-west-1", ReplicaState::InSync);

        let promoted = controller.promote("resource-1", "eu-west-1");
        assert!(promoted);

        let status = controller.get_status("resource-1").unwrap();
        assert_eq!(status.primary_region, "eu-west-1");
        let old_primary = "us-east-1".to_string();
        assert!(status.replica_regions.contains(&old_primary));
    }

    #[test]
    fn test_healthy_replicas() {
        let controller = controller_with(&["us-east-1", "us-west-2", "eu-west-1"]);
        controller.replicate("resource-1", "us-east-1", Some(ReplicationPolicy::All));
        controller.update_state("resource-1", "us-west-2", ReplicaState::InSync);
        controller.update_state("resource-1", "eu-west-1", ReplicaState::Failed);

        let healthy = controller.healthy_replicas("resource-1");
        let includes = |region: &str| healthy.iter().any(|r| r == region);
        assert!(includes("us-east-1"));
        assert!(includes("us-west-2"));
        assert!(!includes("eu-west-1"));
    }
}