1use super::crdt::{ConflictResolution, CrdtResolver, ReplicatedEvent, VersionVector};
28use super::hlc::{HlcTimestamp, HybridLogicalClock};
29use dashmap::DashMap;
30use serde::{Deserialize, Serialize};
31use std::{sync::Arc, time::Duration};
32
/// Geo-replication settings for one region.
///
/// Build it with [`GeoReplicationConfig::from_env`] or start from
/// [`GeoReplicationConfig::default`] and override individual fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GeoReplicationConfig {
    /// Id of the local region; stamped as `origin_region` on locally created events.
    pub region_id: String,
    /// Peer regions this region replicates to; each one gets its own outbound queue.
    pub peers: Vec<PeerRegion>,
    /// Sync interval in milliseconds. The replication manager only exposes it as a
    /// `Duration` (`sync_interval()`); the loop that consumes it lives elsewhere.
    pub sync_interval_ms: u64,
    /// Maximum tolerated clock drift in milliseconds, handed to the hybrid logical clock.
    /// Incoming events whose timestamps the clock rejects are skipped.
    pub max_clock_drift_ms: u64,
    /// Maximum number of events drained into a single sync request for one peer.
    pub batch_size: usize,
}
47
48impl Default for GeoReplicationConfig {
49 fn default() -> Self {
50 Self {
51 region_id: "default".to_string(),
52 peers: vec![],
53 sync_interval_ms: 1000,
54 max_clock_drift_ms: 60_000,
55 batch_size: 100,
56 }
57 }
58}
59
60impl GeoReplicationConfig {
61 pub fn from_env() -> Option<Self> {
63 let enabled = std::env::var("ALLSOURCE_GEO_REPLICATION_ENABLED")
64 .map(|v| v == "true")
65 .unwrap_or(false);
66
67 if !enabled {
68 return None;
69 }
70
71 let region_id =
72 std::env::var("ALLSOURCE_REGION_ID").unwrap_or_else(|_| "default".to_string());
73
74 let peers_str = std::env::var("ALLSOURCE_GEO_PEERS").unwrap_or_default();
75 let peers: Vec<PeerRegion> = peers_str
76 .split(',')
77 .filter(|s| !s.trim().is_empty())
78 .enumerate()
79 .map(|(i, url)| PeerRegion {
80 region_id: format!("peer-{i}"),
81 api_url: url.trim().to_string(),
82 healthy: true,
83 last_sync_ms: 0,
84 })
85 .collect();
86
87 let sync_interval_ms: u64 = std::env::var("ALLSOURCE_GEO_SYNC_INTERVAL_MS")
88 .ok()
89 .and_then(|v| v.parse().ok())
90 .unwrap_or(1000);
91
92 let max_clock_drift_ms: u64 = std::env::var("ALLSOURCE_GEO_MAX_DRIFT_MS")
93 .ok()
94 .and_then(|v| v.parse().ok())
95 .unwrap_or(60_000);
96
97 let batch_size: usize = std::env::var("ALLSOURCE_GEO_BATCH_SIZE")
98 .ok()
99 .and_then(|v| v.parse().ok())
100 .unwrap_or(100);
101
102 Some(Self {
103 region_id,
104 peers,
105 sync_interval_ms,
106 max_clock_drift_ms,
107 batch_size,
108 })
109 }
110}
111
/// A remote region this node replicates with, as described in the configuration.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PeerRegion {
    /// Id of the peer region; used as the key for its health entry and outbound queue.
    pub region_id: String,
    /// URL of the peer's API endpoint, as configured (e.g. from `ALLSOURCE_GEO_PEERS`).
    pub api_url: String,
    /// Configured health flag for this peer. Live health is tracked separately by the
    /// replication manager (see `GeoReplicationManager::peer_health`).
    pub healthy: bool,
    /// Time of the last sync with this peer, in milliseconds since the Unix epoch;
    /// `0` means "never synced" (status reporting then shows zero lag).
    pub last_sync_ms: u64,
}
124
/// Health of a peer region as tracked by this node.
///
/// Serialized in lowercase: `"healthy"`, `"degraded"`, `"unreachable"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum PeerHealth {
    /// Marked healthy; the only state `select_failover_region` will pick.
    Healthy,
    /// Marked degraded; excluded from failover selection.
    Degraded,
    /// Marked unreachable; also the value reported for peers with no recorded health.
    Unreachable,
}
133
/// Snapshot of this region's replication state, suitable for a status endpoint.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GeoReplicationStatus {
    /// Id of the local (reporting) region.
    pub region_id: String,
    /// One entry per configured peer with its health and sync information.
    pub peers: Vec<PeerStatus>,
    /// Count of events handed to peers in sync requests.
    pub events_sent: u64,
    /// Count of remote events accepted by the CRDT resolver.
    pub events_received: u64,
    /// Count of remote events skipped: either rejected by the CRDT resolver or refused
    /// because their HLC timestamp violated the drift limit.
    pub conflicts_resolved: u64,
    /// Current reading of the local hybrid logical clock.
    pub current_hlc: HlcTimestamp,
    /// Version vectors tracked by the CRDT resolver. Keys are presumably region ids
    /// (vectors are merged per source region on receive) — confirm against `CrdtResolver`.
    pub version_vectors: std::collections::BTreeMap<String, VersionVector>,
}
152
/// Status of a single peer region within a [`GeoReplicationStatus`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PeerStatus {
    /// Id of the peer region.
    pub region_id: String,
    /// Configured API URL of the peer.
    pub api_url: String,
    /// Health currently recorded for the peer.
    pub health: PeerHealth,
    /// Last sync time in milliseconds since the Unix epoch; `0` if never synced.
    pub last_sync_ms: u64,
    /// Milliseconds elapsed since `last_sync_ms` when the status was built; `0` if never synced.
    pub replication_lag_ms: u64,
}
162
/// A batch of replicated events pushed from one region to a peer.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GeoSyncRequest {
    /// Region that sent the batch; the receiver merges `version_vector` under this id.
    pub source_region: String,
    /// Events to apply, in the sender's outbound-queue order.
    pub events: Vec<ReplicatedEvent>,
    /// The sender's version vector for its own region at the time the batch was built.
    pub version_vector: VersionVector,
}
173
/// The receiver's answer to a [`GeoSyncRequest`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GeoSyncResponse {
    /// Number of events the receiver's CRDT resolver accepted.
    pub accepted: usize,
    /// Number of events skipped (resolver rejection or HLC drift violation).
    pub skipped: usize,
    /// The receiver's version vector for its own region after applying the batch.
    pub version_vector: VersionVector,
}
184
185pub struct GeoReplicationManager {
190 config: GeoReplicationConfig,
192 hlc: Arc<HybridLogicalClock>,
194 resolver: Arc<CrdtResolver>,
196 peer_health: DashMap<String, PeerHealth>,
198 outbound_buffer: DashMap<String, Vec<ReplicatedEvent>>,
200 events_sent: std::sync::atomic::AtomicU64,
202 events_received: std::sync::atomic::AtomicU64,
204 conflicts_resolved: std::sync::atomic::AtomicU64,
206}
207
208impl GeoReplicationManager {
209 pub fn new(config: GeoReplicationConfig) -> Self {
211 let node_id: u32 = std::env::var("ALLSOURCE_NODE_ID")
212 .ok()
213 .and_then(|v| v.parse().ok())
214 .unwrap_or(0);
215
216 let hlc = Arc::new(HybridLogicalClock::with_max_drift(
217 node_id,
218 config.max_clock_drift_ms,
219 ));
220
221 let peer_health = DashMap::new();
222 let outbound_buffer = DashMap::new();
223 for peer in &config.peers {
224 peer_health.insert(peer.region_id.clone(), PeerHealth::Healthy);
225 outbound_buffer.insert(peer.region_id.clone(), Vec::new());
226 }
227
228 Self {
229 config,
230 hlc,
231 resolver: Arc::new(CrdtResolver::new()),
232 peer_health,
233 outbound_buffer,
234 events_sent: std::sync::atomic::AtomicU64::new(0),
235 events_received: std::sync::atomic::AtomicU64::new(0),
236 conflicts_resolved: std::sync::atomic::AtomicU64::new(0),
237 }
238 }
239
240 pub fn region_id(&self) -> &str {
242 &self.config.region_id
243 }
244
245 pub fn hlc(&self) -> &Arc<HybridLogicalClock> {
247 &self.hlc
248 }
249
250 pub fn resolver(&self) -> &Arc<CrdtResolver> {
252 &self.resolver
253 }
254
255 pub fn stamp_event(&self, event_id: &str, event_data: serde_json::Value) -> ReplicatedEvent {
257 let ts = self.hlc.now();
258 ReplicatedEvent {
259 event_id: event_id.to_string(),
260 hlc_timestamp: ts,
261 origin_region: self.config.region_id.clone(),
262 event_data,
263 }
264 }
265
266 pub fn queue_for_replication(&self, event: &ReplicatedEvent) {
268 for mut buffer in self.outbound_buffer.iter_mut() {
269 buffer.value_mut().push(event.clone());
270 }
271 }
272
273 pub fn drain_outbound(&self, peer_region: &str, max_batch: usize) -> Vec<ReplicatedEvent> {
275 if let Some(mut buffer) = self.outbound_buffer.get_mut(peer_region) {
276 let drain_count = max_batch.min(buffer.len());
277 buffer.drain(..drain_count).collect()
278 } else {
279 vec![]
280 }
281 }
282
283 pub fn receive_sync(&self, request: &GeoSyncRequest) -> GeoSyncResponse {
287 let mut accepted = 0;
288 let mut skipped = 0;
289
290 for event in &request.events {
291 if let Err(e) = self.hlc.receive(&event.hlc_timestamp) {
293 tracing::warn!(
294 "HLC drift violation from region {}: {}",
295 request.source_region,
296 e,
297 );
298 skipped += 1;
299 self.conflicts_resolved
300 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
301 continue;
302 }
303
304 match self.resolver.resolve_and_accept(event) {
305 ConflictResolution::Accept => {
306 accepted += 1;
307 self.events_received
308 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
309 }
310 ConflictResolution::Skip => {
311 skipped += 1;
312 self.conflicts_resolved
313 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
314 }
315 }
316 }
317
318 self.resolver
320 .merge_version_vector(&request.source_region, &request.version_vector);
321
322 let our_vv = self
324 .resolver
325 .version_vector_for(&self.config.region_id)
326 .unwrap_or_default();
327
328 GeoSyncResponse {
329 accepted,
330 skipped,
331 version_vector: our_vv,
332 }
333 }
334
335 pub fn set_peer_health(&self, peer_region: &str, health: PeerHealth) {
337 self.peer_health.insert(peer_region.to_string(), health);
338 }
339
340 pub fn peer_health(&self, peer_region: &str) -> PeerHealth {
342 self.peer_health
343 .get(peer_region)
344 .map_or(PeerHealth::Unreachable, |h| *h)
345 }
346
347 pub fn peers(&self) -> &[PeerRegion] {
349 &self.config.peers
350 }
351
352 pub fn sync_interval(&self) -> Duration {
354 Duration::from_millis(self.config.sync_interval_ms)
355 }
356
357 pub fn batch_size(&self) -> usize {
359 self.config.batch_size
360 }
361
362 pub fn build_sync_request(&self, peer_region: &str) -> Option<GeoSyncRequest> {
364 let events = self.drain_outbound(peer_region, self.config.batch_size);
365 if events.is_empty() {
366 return None;
367 }
368
369 self.events_sent
370 .fetch_add(events.len() as u64, std::sync::atomic::Ordering::Relaxed);
371
372 let vv = self
373 .resolver
374 .version_vector_for(&self.config.region_id)
375 .unwrap_or_default();
376
377 Some(GeoSyncRequest {
378 source_region: self.config.region_id.clone(),
379 events,
380 version_vector: vv,
381 })
382 }
383
384 pub fn select_failover_region(&self) -> Option<String> {
386 self.config
387 .peers
388 .iter()
389 .filter(|p| {
390 self.peer_health
391 .get(&p.region_id)
392 .is_some_and(|h| *h == PeerHealth::Healthy)
393 })
394 .max_by_key(|p| p.last_sync_ms) .map(|p| p.region_id.clone())
396 }
397
398 pub fn status(&self) -> GeoReplicationStatus {
400 let peers: Vec<PeerStatus> = self
401 .config
402 .peers
403 .iter()
404 .map(|p| {
405 let health = self
406 .peer_health
407 .get(&p.region_id)
408 .map_or(PeerHealth::Unreachable, |h| *h);
409
410 let lag_ms = if p.last_sync_ms > 0 {
411 let now_ms = std::time::SystemTime::now()
412 .duration_since(std::time::UNIX_EPOCH)
413 .unwrap_or_default()
414 .as_millis() as u64;
415 now_ms.saturating_sub(p.last_sync_ms)
416 } else {
417 0
418 };
419
420 PeerStatus {
421 region_id: p.region_id.clone(),
422 api_url: p.api_url.clone(),
423 health,
424 last_sync_ms: p.last_sync_ms,
425 replication_lag_ms: lag_ms,
426 }
427 })
428 .collect();
429
430 GeoReplicationStatus {
431 region_id: self.config.region_id.clone(),
432 peers,
433 events_sent: self.events_sent.load(std::sync::atomic::Ordering::Relaxed),
434 events_received: self
435 .events_received
436 .load(std::sync::atomic::Ordering::Relaxed),
437 conflicts_resolved: self
438 .conflicts_resolved
439 .load(std::sync::atomic::Ordering::Relaxed),
440 current_hlc: self.hlc.current(),
441 version_vectors: self.resolver.all_version_vectors(),
442 }
443 }
444}
445
#[cfg(test)]
mod tests {
    use super::*;

    /// Current wall-clock time in milliseconds since the Unix epoch.
    fn wall_clock_ms() -> u64 {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_millis() as u64
    }

    /// A healthy peer entry with the given id, URL and last-sync time.
    fn peer(region_id: &str, api_url: &str, last_sync_ms: u64) -> PeerRegion {
        PeerRegion {
            region_id: region_id.to_string(),
            api_url: api_url.to_string(),
            healthy: true,
            last_sync_ms,
        }
    }

    /// Config for `region` with a single never-synced `eu-west` peer and explicit tuning.
    fn test_config(region: &str) -> GeoReplicationConfig {
        GeoReplicationConfig {
            region_id: region.to_string(),
            peers: vec![peer("eu-west", "https://eu.core:3900", 0)],
            sync_interval_ms: 1000,
            max_clock_drift_ms: 60_000,
            batch_size: 100,
        }
    }

    /// Config for `region` replicating to `peers`, everything else defaulted.
    fn config_with_peers(region: &str, peers: Vec<PeerRegion>) -> GeoReplicationConfig {
        GeoReplicationConfig {
            region_id: region.to_string(),
            peers,
            ..Default::default()
        }
    }

    /// An event originating in `eu-west`, stamped at `physical_ms` by node 2.
    fn eu_event(event_id: &str, physical_ms: u64, data: serde_json::Value) -> ReplicatedEvent {
        ReplicatedEvent {
            event_id: event_id.to_string(),
            hlc_timestamp: HlcTimestamp::new(physical_ms, 0, 2),
            origin_region: "eu-west".to_string(),
            event_data: data,
        }
    }

    /// A sync request from `eu-west` carrying `events` and an empty version vector.
    fn sync_from_eu(events: Vec<ReplicatedEvent>) -> GeoSyncRequest {
        GeoSyncRequest {
            source_region: "eu-west".to_string(),
            events,
            version_vector: VersionVector::new(),
        }
    }

    #[test]
    fn test_stamp_event() {
        let mgr = GeoReplicationManager::new(test_config("us-east"));
        let stamped = mgr.stamp_event("evt-1", serde_json::json!({"foo": "bar"}));

        assert_eq!(stamped.event_id, "evt-1");
        assert_eq!(stamped.origin_region, "us-east");
        assert!(stamped.hlc_timestamp.physical_ms > 0);
    }

    #[test]
    fn test_queue_and_drain() {
        let mgr = GeoReplicationManager::new(test_config("us-east"));
        mgr.queue_for_replication(&mgr.stamp_event("evt-1", serde_json::json!({})));

        let first = mgr.drain_outbound("eu-west", 10);
        assert_eq!(first.len(), 1);
        assert_eq!(first[0].event_id, "evt-1");

        // The queue was emptied by the first drain.
        assert!(mgr.drain_outbound("eu-west", 10).is_empty());
    }

    #[test]
    fn test_receive_sync_accepts_new_events() {
        let mgr = GeoReplicationManager::new(test_config("us-east"));
        let request = sync_from_eu(vec![eu_event(
            "evt-remote-1",
            wall_clock_ms(),
            serde_json::json!({"source": "eu"}),
        )]);

        let response = mgr.receive_sync(&request);
        assert_eq!(response.accepted, 1);
        assert_eq!(response.skipped, 0);
    }

    #[test]
    fn test_receive_sync_skips_duplicates() {
        let mgr = GeoReplicationManager::new(test_config("us-east"));
        let dup = eu_event("evt-dup", wall_clock_ms(), serde_json::json!({}));

        let response = mgr.receive_sync(&sync_from_eu(vec![dup.clone(), dup]));
        assert_eq!(response.accepted, 1);
        assert_eq!(response.skipped, 1);
    }

    #[test]
    fn test_build_sync_request() {
        let mgr = GeoReplicationManager::new(test_config("us-east"));
        mgr.queue_for_replication(&mgr.stamp_event("evt-1", serde_json::json!({})));

        let req = mgr
            .build_sync_request("eu-west")
            .expect("a queued event must produce a sync request");
        assert_eq!(req.source_region, "us-east");
        assert_eq!(req.events.len(), 1);
    }

    #[test]
    fn test_build_sync_request_empty() {
        let mgr = GeoReplicationManager::new(test_config("us-east"));
        assert!(mgr.build_sync_request("eu-west").is_none());
    }

    #[test]
    fn test_peer_health_tracking() {
        let mgr = GeoReplicationManager::new(test_config("us-east"));
        assert_eq!(mgr.peer_health("eu-west"), PeerHealth::Healthy);

        for next in [PeerHealth::Degraded, PeerHealth::Unreachable] {
            mgr.set_peer_health("eu-west", next);
            assert_eq!(mgr.peer_health("eu-west"), next);
        }
    }

    #[test]
    fn test_select_failover_region() {
        let mgr = GeoReplicationManager::new(config_with_peers(
            "us-east",
            vec![
                peer("eu-west", "https://eu.core:3900", 100),
                peer("ap-east", "https://ap.core:3900", 200),
            ],
        ));

        // The most recently synced healthy peer wins.
        assert_eq!(mgr.select_failover_region(), Some("ap-east".to_string()));
    }

    #[test]
    fn test_select_failover_skips_unhealthy() {
        let mgr = GeoReplicationManager::new(config_with_peers(
            "us-east",
            vec![
                peer("eu-west", "https://eu.core:3900", 200),
                peer("ap-east", "https://ap.core:3900", 300),
            ],
        ));
        mgr.set_peer_health("ap-east", PeerHealth::Unreachable);

        assert_eq!(mgr.select_failover_region(), Some("eu-west".to_string()));
    }

    #[test]
    fn test_status() {
        let status = GeoReplicationManager::new(test_config("us-east")).status();

        assert_eq!(status.region_id, "us-east");
        assert_eq!(status.peers.len(), 1);
        assert_eq!(status.events_sent, 0);
        assert_eq!(status.events_received, 0);
        assert_eq!(status.conflicts_resolved, 0);
    }

    #[test]
    fn test_two_region_convergence() {
        let us = GeoReplicationManager::new(config_with_peers(
            "us-east",
            vec![peer("eu-west", "http://eu:3900", 0)],
        ));
        let eu = GeoReplicationManager::new(config_with_peers(
            "eu-west",
            vec![peer("us-east", "http://us:3900", 0)],
        ));

        // Each region writes one event locally and queues it for its peer.
        let from_us = us.stamp_event("evt-1", serde_json::json!({"from": "us"}));
        us.resolver.resolve_and_accept(&from_us);
        us.queue_for_replication(&from_us);

        let from_eu = eu.stamp_event("evt-2", serde_json::json!({"from": "eu"}));
        eu.resolver.resolve_and_accept(&from_eu);
        eu.queue_for_replication(&from_eu);

        // Exchange batches in both directions.
        let eu_resp = eu.receive_sync(&us.build_sync_request("eu-west").unwrap());
        assert_eq!(eu_resp.accepted, 1);

        let us_resp = us.receive_sync(&eu.build_sync_request("us-east").unwrap());
        assert_eq!(us_resp.accepted, 1);

        // Both sides have now seen both events.
        assert_eq!(us.resolver.seen_count(), 2);
        assert_eq!(eu.resolver.seen_count(), 2);
    }
}