1use super::crdt::{ConflictResolution, CrdtResolver, ReplicatedEvent, VersionVector};
28use super::hlc::{HlcTimestamp, HybridLogicalClock};
29use dashmap::DashMap;
30use serde::{Deserialize, Serialize};
31use std::{sync::Arc, time::Duration};
32
33#[derive(Debug, Clone, Serialize, Deserialize)]
35pub struct GeoReplicationConfig {
36 pub region_id: String,
38 pub peers: Vec<PeerRegion>,
40 pub sync_interval_ms: u64,
42 pub max_clock_drift_ms: u64,
44 pub batch_size: usize,
46}
47
48impl Default for GeoReplicationConfig {
49 fn default() -> Self {
50 Self {
51 region_id: "default".to_string(),
52 peers: vec![],
53 sync_interval_ms: 1000,
54 max_clock_drift_ms: 60_000,
55 batch_size: 100,
56 }
57 }
58}
59
60impl GeoReplicationConfig {
61 pub fn from_env() -> Option<Self> {
63 let enabled = std::env::var("ALLSOURCE_GEO_REPLICATION_ENABLED")
64 .map(|v| v == "true")
65 .unwrap_or(false);
66
67 if !enabled {
68 return None;
69 }
70
71 let region_id =
72 std::env::var("ALLSOURCE_REGION_ID").unwrap_or_else(|_| "default".to_string());
73
74 let peers_str = std::env::var("ALLSOURCE_GEO_PEERS").unwrap_or_default();
75 let peers: Vec<PeerRegion> = peers_str
76 .split(',')
77 .filter(|s| !s.trim().is_empty())
78 .enumerate()
79 .map(|(i, url)| PeerRegion {
80 region_id: format!("peer-{}", i),
81 api_url: url.trim().to_string(),
82 healthy: true,
83 last_sync_ms: 0,
84 })
85 .collect();
86
87 let sync_interval_ms: u64 = std::env::var("ALLSOURCE_GEO_SYNC_INTERVAL_MS")
88 .ok()
89 .and_then(|v| v.parse().ok())
90 .unwrap_or(1000);
91
92 let max_clock_drift_ms: u64 = std::env::var("ALLSOURCE_GEO_MAX_DRIFT_MS")
93 .ok()
94 .and_then(|v| v.parse().ok())
95 .unwrap_or(60_000);
96
97 let batch_size: usize = std::env::var("ALLSOURCE_GEO_BATCH_SIZE")
98 .ok()
99 .and_then(|v| v.parse().ok())
100 .unwrap_or(100);
101
102 Some(Self {
103 region_id,
104 peers,
105 sync_interval_ms,
106 max_clock_drift_ms,
107 batch_size,
108 })
109 }
110}
111
112#[derive(Debug, Clone, Serialize, Deserialize)]
114pub struct PeerRegion {
115 pub region_id: String,
117 pub api_url: String,
119 pub healthy: bool,
121 pub last_sync_ms: u64,
123}
124
125#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
127#[serde(rename_all = "lowercase")]
128pub enum PeerHealth {
129 Healthy,
130 Degraded,
131 Unreachable,
132}
133
134#[derive(Debug, Clone, Serialize, Deserialize)]
136pub struct GeoReplicationStatus {
137 pub region_id: String,
139 pub peers: Vec<PeerStatus>,
141 pub events_sent: u64,
143 pub events_received: u64,
145 pub conflicts_resolved: u64,
147 pub current_hlc: HlcTimestamp,
149 pub version_vectors: std::collections::BTreeMap<String, VersionVector>,
151}
152
153#[derive(Debug, Clone, Serialize, Deserialize)]
155pub struct PeerStatus {
156 pub region_id: String,
157 pub api_url: String,
158 pub health: PeerHealth,
159 pub last_sync_ms: u64,
160 pub replication_lag_ms: u64,
161}
162
163#[derive(Debug, Clone, Serialize, Deserialize)]
165pub struct GeoSyncRequest {
166 pub source_region: String,
168 pub events: Vec<ReplicatedEvent>,
170 pub version_vector: VersionVector,
172}
173
174#[derive(Debug, Clone, Serialize, Deserialize)]
176pub struct GeoSyncResponse {
177 pub accepted: usize,
179 pub skipped: usize,
181 pub version_vector: VersionVector,
183}
184
185pub struct GeoReplicationManager {
190 config: GeoReplicationConfig,
192 hlc: Arc<HybridLogicalClock>,
194 resolver: Arc<CrdtResolver>,
196 peer_health: DashMap<String, PeerHealth>,
198 outbound_buffer: DashMap<String, Vec<ReplicatedEvent>>,
200 events_sent: std::sync::atomic::AtomicU64,
202 events_received: std::sync::atomic::AtomicU64,
204 conflicts_resolved: std::sync::atomic::AtomicU64,
206}
207
208impl GeoReplicationManager {
209 pub fn new(config: GeoReplicationConfig) -> Self {
211 let node_id: u32 = std::env::var("ALLSOURCE_NODE_ID")
212 .ok()
213 .and_then(|v| v.parse().ok())
214 .unwrap_or(0);
215
216 let hlc = Arc::new(HybridLogicalClock::with_max_drift(
217 node_id,
218 config.max_clock_drift_ms,
219 ));
220
221 let peer_health = DashMap::new();
222 let outbound_buffer = DashMap::new();
223 for peer in &config.peers {
224 peer_health.insert(peer.region_id.clone(), PeerHealth::Healthy);
225 outbound_buffer.insert(peer.region_id.clone(), Vec::new());
226 }
227
228 Self {
229 config,
230 hlc,
231 resolver: Arc::new(CrdtResolver::new()),
232 peer_health,
233 outbound_buffer,
234 events_sent: std::sync::atomic::AtomicU64::new(0),
235 events_received: std::sync::atomic::AtomicU64::new(0),
236 conflicts_resolved: std::sync::atomic::AtomicU64::new(0),
237 }
238 }
239
240 pub fn region_id(&self) -> &str {
242 &self.config.region_id
243 }
244
245 pub fn hlc(&self) -> &Arc<HybridLogicalClock> {
247 &self.hlc
248 }
249
250 pub fn resolver(&self) -> &Arc<CrdtResolver> {
252 &self.resolver
253 }
254
255 pub fn stamp_event(&self, event_id: &str, event_data: serde_json::Value) -> ReplicatedEvent {
257 let ts = self.hlc.now();
258 ReplicatedEvent {
259 event_id: event_id.to_string(),
260 hlc_timestamp: ts,
261 origin_region: self.config.region_id.clone(),
262 event_data,
263 }
264 }
265
266 pub fn queue_for_replication(&self, event: ReplicatedEvent) {
268 for mut buffer in self.outbound_buffer.iter_mut() {
269 buffer.value_mut().push(event.clone());
270 }
271 }
272
273 pub fn drain_outbound(&self, peer_region: &str, max_batch: usize) -> Vec<ReplicatedEvent> {
275 if let Some(mut buffer) = self.outbound_buffer.get_mut(peer_region) {
276 let drain_count = max_batch.min(buffer.len());
277 buffer.drain(..drain_count).collect()
278 } else {
279 vec![]
280 }
281 }
282
283 pub fn receive_sync(&self, request: &GeoSyncRequest) -> GeoSyncResponse {
287 let mut accepted = 0;
288 let mut skipped = 0;
289
290 for event in &request.events {
291 if let Err(e) = self.hlc.receive(&event.hlc_timestamp) {
293 tracing::warn!(
294 "HLC drift violation from region {}: {}",
295 request.source_region,
296 e,
297 );
298 skipped += 1;
299 self.conflicts_resolved
300 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
301 continue;
302 }
303
304 match self.resolver.resolve_and_accept(event) {
305 ConflictResolution::Accept => {
306 accepted += 1;
307 self.events_received
308 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
309 }
310 ConflictResolution::Skip => {
311 skipped += 1;
312 self.conflicts_resolved
313 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
314 }
315 }
316 }
317
318 self.resolver
320 .merge_version_vector(&request.source_region, &request.version_vector);
321
322 let our_vv = self
324 .resolver
325 .version_vector_for(&self.config.region_id)
326 .unwrap_or_default();
327
328 GeoSyncResponse {
329 accepted,
330 skipped,
331 version_vector: our_vv,
332 }
333 }
334
335 pub fn set_peer_health(&self, peer_region: &str, health: PeerHealth) {
337 self.peer_health.insert(peer_region.to_string(), health);
338 }
339
340 pub fn peer_health(&self, peer_region: &str) -> PeerHealth {
342 self.peer_health
343 .get(peer_region)
344 .map(|h| *h)
345 .unwrap_or(PeerHealth::Unreachable)
346 }
347
348 pub fn peers(&self) -> &[PeerRegion] {
350 &self.config.peers
351 }
352
353 pub fn sync_interval(&self) -> Duration {
355 Duration::from_millis(self.config.sync_interval_ms)
356 }
357
358 pub fn batch_size(&self) -> usize {
360 self.config.batch_size
361 }
362
363 pub fn build_sync_request(&self, peer_region: &str) -> Option<GeoSyncRequest> {
365 let events = self.drain_outbound(peer_region, self.config.batch_size);
366 if events.is_empty() {
367 return None;
368 }
369
370 self.events_sent
371 .fetch_add(events.len() as u64, std::sync::atomic::Ordering::Relaxed);
372
373 let vv = self
374 .resolver
375 .version_vector_for(&self.config.region_id)
376 .unwrap_or_default();
377
378 Some(GeoSyncRequest {
379 source_region: self.config.region_id.clone(),
380 events,
381 version_vector: vv,
382 })
383 }
384
385 pub fn select_failover_region(&self) -> Option<String> {
387 self.config
388 .peers
389 .iter()
390 .filter(|p| {
391 self.peer_health
392 .get(&p.region_id)
393 .map(|h| *h == PeerHealth::Healthy)
394 .unwrap_or(false)
395 })
396 .max_by_key(|p| p.last_sync_ms) .map(|p| p.region_id.clone())
398 }
399
400 pub fn status(&self) -> GeoReplicationStatus {
402 let peers: Vec<PeerStatus> = self
403 .config
404 .peers
405 .iter()
406 .map(|p| {
407 let health = self
408 .peer_health
409 .get(&p.region_id)
410 .map(|h| *h)
411 .unwrap_or(PeerHealth::Unreachable);
412
413 let lag_ms = if p.last_sync_ms > 0 {
414 let now_ms = std::time::SystemTime::now()
415 .duration_since(std::time::UNIX_EPOCH)
416 .unwrap_or_default()
417 .as_millis() as u64;
418 now_ms.saturating_sub(p.last_sync_ms)
419 } else {
420 0
421 };
422
423 PeerStatus {
424 region_id: p.region_id.clone(),
425 api_url: p.api_url.clone(),
426 health,
427 last_sync_ms: p.last_sync_ms,
428 replication_lag_ms: lag_ms,
429 }
430 })
431 .collect();
432
433 GeoReplicationStatus {
434 region_id: self.config.region_id.clone(),
435 peers,
436 events_sent: self.events_sent.load(std::sync::atomic::Ordering::Relaxed),
437 events_received: self
438 .events_received
439 .load(std::sync::atomic::Ordering::Relaxed),
440 conflicts_resolved: self
441 .conflicts_resolved
442 .load(std::sync::atomic::Ordering::Relaxed),
443 current_hlc: self.hlc.current(),
444 version_vectors: self.resolver.all_version_vectors(),
445 }
446 }
447}
448
#[cfg(test)]
mod tests {
    use super::*;

    /// Current wall-clock time in milliseconds since the Unix epoch.
    fn unix_now_ms() -> u64 {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_millis() as u64
    }

    /// Builds a healthy peer entry with the given id, URL and last-sync time.
    fn peer(region_id: &str, api_url: &str, last_sync_ms: u64) -> PeerRegion {
        PeerRegion {
            region_id: region_id.to_string(),
            api_url: api_url.to_string(),
            healthy: true,
            last_sync_ms,
        }
    }

    /// Configuration for `region` with a single `eu-west` peer.
    fn test_config(region: &str) -> GeoReplicationConfig {
        GeoReplicationConfig {
            region_id: region.to_string(),
            peers: vec![peer("eu-west", "https://eu.core:3900", 0)],
            sync_interval_ms: 1000,
            max_clock_drift_ms: 60_000,
            batch_size: 100,
        }
    }

    /// Configuration for `region` with the given peers and default tuning.
    fn config_with_peers(region: &str, peers: Vec<PeerRegion>) -> GeoReplicationConfig {
        GeoReplicationConfig {
            region_id: region.to_string(),
            peers,
            ..Default::default()
        }
    }

    /// Event originating in `eu-west`, stamped by node 2 at `physical_ms`.
    fn eu_event(event_id: &str, physical_ms: u64, data: serde_json::Value) -> ReplicatedEvent {
        ReplicatedEvent {
            event_id: event_id.to_string(),
            hlc_timestamp: HlcTimestamp::new(physical_ms, 0, 2),
            origin_region: "eu-west".to_string(),
            event_data: data,
        }
    }

    /// Sync request from `eu-west` carrying `events` and an empty version vector.
    fn eu_request(events: Vec<ReplicatedEvent>) -> GeoSyncRequest {
        GeoSyncRequest {
            source_region: "eu-west".to_string(),
            events,
            version_vector: VersionVector::new(),
        }
    }

    #[test]
    fn test_stamp_event() {
        let manager = GeoReplicationManager::new(test_config("us-east"));

        let stamped = manager.stamp_event("evt-1", serde_json::json!({"foo": "bar"}));

        assert_eq!(stamped.event_id, "evt-1");
        assert_eq!(stamped.origin_region, "us-east");
        assert!(stamped.hlc_timestamp.physical_ms > 0);
    }

    #[test]
    fn test_queue_and_drain() {
        let manager = GeoReplicationManager::new(test_config("us-east"));
        manager.queue_for_replication(manager.stamp_event("evt-1", serde_json::json!({})));

        let first = manager.drain_outbound("eu-west", 10);
        assert_eq!(first.len(), 1);
        assert_eq!(first[0].event_id, "evt-1");

        // The buffer must be empty once its only event has been drained.
        let second = manager.drain_outbound("eu-west", 10);
        assert!(second.is_empty());
    }

    #[test]
    fn test_receive_sync_accepts_new_events() {
        let manager = GeoReplicationManager::new(test_config("us-east"));
        let remote = eu_event(
            "evt-remote-1",
            unix_now_ms(),
            serde_json::json!({"source": "eu"}),
        );

        let response = manager.receive_sync(&eu_request(vec![remote]));

        assert_eq!(response.accepted, 1);
        assert_eq!(response.skipped, 0);
    }

    #[test]
    fn test_receive_sync_skips_duplicates() {
        let manager = GeoReplicationManager::new(test_config("us-east"));
        let original = eu_event("evt-dup", unix_now_ms(), serde_json::json!({}));
        let duplicate = original.clone();

        let response = manager.receive_sync(&eu_request(vec![duplicate, original]));

        assert_eq!(response.accepted, 1);
        assert_eq!(response.skipped, 1);
    }

    #[test]
    fn test_build_sync_request() {
        let manager = GeoReplicationManager::new(test_config("us-east"));
        manager.queue_for_replication(manager.stamp_event("evt-1", serde_json::json!({})));

        let maybe_request = manager.build_sync_request("eu-west");
        assert!(maybe_request.is_some());

        let request = maybe_request.unwrap();
        assert_eq!(request.source_region, "us-east");
        assert_eq!(request.events.len(), 1);
    }

    #[test]
    fn test_build_sync_request_empty() {
        let manager = GeoReplicationManager::new(test_config("us-east"));

        assert!(manager.build_sync_request("eu-west").is_none());
    }

    #[test]
    fn test_peer_health_tracking() {
        let manager = GeoReplicationManager::new(test_config("us-east"));
        assert_eq!(manager.peer_health("eu-west"), PeerHealth::Healthy);

        for next in [PeerHealth::Degraded, PeerHealth::Unreachable] {
            manager.set_peer_health("eu-west", next);
            assert_eq!(manager.peer_health("eu-west"), next);
        }
    }

    #[test]
    fn test_select_failover_region() {
        let manager = GeoReplicationManager::new(config_with_peers(
            "us-east",
            vec![
                peer("eu-west", "https://eu.core:3900", 100),
                peer("ap-east", "https://ap.core:3900", 200),
            ],
        ));

        // The peer with the larger last-sync value wins.
        assert_eq!(
            manager.select_failover_region(),
            Some("ap-east".to_string())
        );
    }

    #[test]
    fn test_select_failover_skips_unhealthy() {
        let manager = GeoReplicationManager::new(config_with_peers(
            "us-east",
            vec![
                peer("eu-west", "https://eu.core:3900", 200),
                peer("ap-east", "https://ap.core:3900", 300),
            ],
        ));
        manager.set_peer_health("ap-east", PeerHealth::Unreachable);

        assert_eq!(
            manager.select_failover_region(),
            Some("eu-west".to_string())
        );
    }

    #[test]
    fn test_status() {
        let manager = GeoReplicationManager::new(test_config("us-east"));

        let snapshot = manager.status();

        assert_eq!(snapshot.region_id, "us-east");
        assert_eq!(snapshot.peers.len(), 1);
        assert_eq!(snapshot.events_sent, 0);
        assert_eq!(snapshot.events_received, 0);
        assert_eq!(snapshot.conflicts_resolved, 0);
    }

    #[test]
    fn test_two_region_convergence() {
        let us = GeoReplicationManager::new(config_with_peers(
            "us-east",
            vec![peer("eu-west", "http://eu:3900", 0)],
        ));
        let eu = GeoReplicationManager::new(config_with_peers(
            "eu-west",
            vec![peer("us-east", "http://us:3900", 0)],
        ));

        // Each region records its own event locally, then queues it.
        let from_us = us.stamp_event("evt-1", serde_json::json!({"from": "us"}));
        us.resolver.resolve_and_accept(&from_us);
        us.queue_for_replication(from_us);

        let from_eu = eu.stamp_event("evt-2", serde_json::json!({"from": "eu"}));
        eu.resolver.resolve_and_accept(&from_eu);
        eu.queue_for_replication(from_eu);

        // us -> eu
        let us_to_eu = us.build_sync_request("eu-west").unwrap();
        let eu_response = eu.receive_sync(&us_to_eu);
        assert_eq!(eu_response.accepted, 1);

        // eu -> us
        let eu_to_us = eu.build_sync_request("us-east").unwrap();
        let us_response = us.receive_sync(&eu_to_us);
        assert_eq!(us_response.accepted, 1);

        // Both regions have now seen both events.
        assert_eq!(us.resolver.seen_count(), 2);
        assert_eq!(eu.resolver.seen_count(), 2);
    }
}