Skip to main content

allsource_core/infrastructure/cluster/
geo_replication.rs

//! Geo-Replication Manager for cross-region event replication.
//!
//! Coordinates replication between AllSource Core instances in different
//! geographic regions. Uses a Hybrid Logical Clock (HLC) for causal ordering
//! and CRDTs for conflict-free convergence.
//!
//! # Architecture
//!
//! ```text
//! Region US-East          Region EU-West          Region AP-East
//! ┌──────────────┐        ┌──────────────┐        ┌──────────────┐
//! │ Core Leader  │◄──────►│ Core Leader  │◄──────►│ Core Leader  │
//! │ (writes OK)  │  sync  │ (writes OK)  │  sync  │ (writes OK)  │
//! │ HLC + CRDT   │        │ HLC + CRDT   │        │ HLC + CRDT   │
//! └──────────────┘        └──────────────┘        └──────────────┘
//! ```
//!
//! Each region is an independent leader that accepts writes locally.
//! Events are replicated asynchronously to peer regions by the
//! `GeoReplicationManager`; CRDT resolution ensures the regions converge.
//!
//! # Opt-in
//!
//! Enabled via `ALLSOURCE_GEO_REPLICATION_ENABLED=true` with:
//! - `ALLSOURCE_REGION_ID`: this region's identifier (e.g., "us-east-1")
//! - `ALLSOURCE_GEO_PEERS`: comma-separated peer URLs (e.g., "https://eu.core:3900,https://ap.core:3900");
//!   peers are assigned the region IDs `peer-0`, `peer-1`, … in list order
//!
//! Optional tuning: `ALLSOURCE_GEO_SYNC_INTERVAL_MS`, `ALLSOURCE_GEO_MAX_DRIFT_MS`,
//! `ALLSOURCE_GEO_BATCH_SIZE`, and `ALLSOURCE_NODE_ID` (the HLC node ID).
27use super::crdt::{ConflictResolution, CrdtResolver, ReplicatedEvent, VersionVector};
28use super::hlc::{HlcTimestamp, HybridLogicalClock};
29use dashmap::DashMap;
30use serde::{Deserialize, Serialize};
31use std::{sync::Arc, time::Duration};
32
33/// Configuration for geo-replication.
34#[derive(Debug, Clone, Serialize, Deserialize)]
35pub struct GeoReplicationConfig {
36    /// This region's unique identifier.
37    pub region_id: String,
38    /// Peer region endpoints for replication.
39    pub peers: Vec<PeerRegion>,
40    /// Sync interval for pushing events to peers (ms).
41    pub sync_interval_ms: u64,
42    /// Maximum HLC clock drift tolerance (ms).
43    pub max_clock_drift_ms: u64,
44    /// Batch size for replication sync.
45    pub batch_size: usize,
46}
47
48impl Default for GeoReplicationConfig {
49    fn default() -> Self {
50        Self {
51            region_id: "default".to_string(),
52            peers: vec![],
53            sync_interval_ms: 1000,
54            max_clock_drift_ms: 60_000,
55            batch_size: 100,
56        }
57    }
58}
59
60impl GeoReplicationConfig {
61    /// Load configuration from environment variables.
62    pub fn from_env() -> Option<Self> {
63        let enabled = std::env::var("ALLSOURCE_GEO_REPLICATION_ENABLED")
64            .map(|v| v == "true")
65            .unwrap_or(false);
66
67        if !enabled {
68            return None;
69        }
70
71        let region_id =
72            std::env::var("ALLSOURCE_REGION_ID").unwrap_or_else(|_| "default".to_string());
73
74        let peers_str = std::env::var("ALLSOURCE_GEO_PEERS").unwrap_or_default();
75        let peers: Vec<PeerRegion> = peers_str
76            .split(',')
77            .filter(|s| !s.trim().is_empty())
78            .enumerate()
79            .map(|(i, url)| PeerRegion {
80                region_id: format!("peer-{}", i),
81                api_url: url.trim().to_string(),
82                healthy: true,
83                last_sync_ms: 0,
84            })
85            .collect();
86
87        let sync_interval_ms: u64 = std::env::var("ALLSOURCE_GEO_SYNC_INTERVAL_MS")
88            .ok()
89            .and_then(|v| v.parse().ok())
90            .unwrap_or(1000);
91
92        let max_clock_drift_ms: u64 = std::env::var("ALLSOURCE_GEO_MAX_DRIFT_MS")
93            .ok()
94            .and_then(|v| v.parse().ok())
95            .unwrap_or(60_000);
96
97        let batch_size: usize = std::env::var("ALLSOURCE_GEO_BATCH_SIZE")
98            .ok()
99            .and_then(|v| v.parse().ok())
100            .unwrap_or(100);
101
102        Some(Self {
103            region_id,
104            peers,
105            sync_interval_ms,
106            max_clock_drift_ms,
107            batch_size,
108        })
109    }
110}
111
112/// A peer region endpoint.
113#[derive(Debug, Clone, Serialize, Deserialize)]
114pub struct PeerRegion {
115    /// Region identifier.
116    pub region_id: String,
117    /// HTTP API URL for the peer's Core instance.
118    pub api_url: String,
119    /// Whether the peer is currently healthy.
120    pub healthy: bool,
121    /// Last successful sync timestamp (ms since epoch).
122    pub last_sync_ms: u64,
123}
124
125/// Health status of a peer region.
126#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
127#[serde(rename_all = "lowercase")]
128pub enum PeerHealth {
129    Healthy,
130    Degraded,
131    Unreachable,
132}
133
134/// Status of the geo-replication system.
135#[derive(Debug, Clone, Serialize, Deserialize)]
136pub struct GeoReplicationStatus {
137    /// This region's ID.
138    pub region_id: String,
139    /// Peer regions and their health.
140    pub peers: Vec<PeerStatus>,
141    /// Total events replicated out.
142    pub events_sent: u64,
143    /// Total events received from peers.
144    pub events_received: u64,
145    /// Total conflicts resolved (skipped duplicates).
146    pub conflicts_resolved: u64,
147    /// Current HLC state.
148    pub current_hlc: HlcTimestamp,
149    /// Version vectors per region.
150    pub version_vectors: std::collections::BTreeMap<String, VersionVector>,
151}
152
153/// Status of a single peer region.
154#[derive(Debug, Clone, Serialize, Deserialize)]
155pub struct PeerStatus {
156    pub region_id: String,
157    pub api_url: String,
158    pub health: PeerHealth,
159    pub last_sync_ms: u64,
160    pub replication_lag_ms: u64,
161}
162
163/// Replication sync request sent to a peer.
164#[derive(Debug, Clone, Serialize, Deserialize)]
165pub struct GeoSyncRequest {
166    /// Source region ID.
167    pub source_region: String,
168    /// Events to replicate.
169    pub events: Vec<ReplicatedEvent>,
170    /// Source's version vector for this peer.
171    pub version_vector: VersionVector,
172}
173
174/// Replication sync response from a peer.
175#[derive(Debug, Clone, Serialize, Deserialize)]
176pub struct GeoSyncResponse {
177    /// Number of events accepted.
178    pub accepted: usize,
179    /// Number of events skipped (duplicates).
180    pub skipped: usize,
181    /// Peer's current version vector (for bi-directional sync).
182    pub version_vector: VersionVector,
183}
184
185/// Geo-Replication Manager.
186///
187/// Manages cross-region event replication with CRDT conflict resolution
188/// and HLC-based causal ordering.
189pub struct GeoReplicationManager {
190    /// Configuration.
191    config: GeoReplicationConfig,
192    /// Hybrid Logical Clock for this region.
193    hlc: Arc<HybridLogicalClock>,
194    /// CRDT resolver for conflict resolution.
195    resolver: Arc<CrdtResolver>,
196    /// Peer health tracking.
197    peer_health: DashMap<String, PeerHealth>,
198    /// Outbound event buffer (events pending replication to peers).
199    outbound_buffer: DashMap<String, Vec<ReplicatedEvent>>,
200    /// Counter: events sent.
201    events_sent: std::sync::atomic::AtomicU64,
202    /// Counter: events received.
203    events_received: std::sync::atomic::AtomicU64,
204    /// Counter: conflicts resolved.
205    conflicts_resolved: std::sync::atomic::AtomicU64,
206}
207
208impl GeoReplicationManager {
209    /// Create a new geo-replication manager.
210    pub fn new(config: GeoReplicationConfig) -> Self {
211        let node_id: u32 = std::env::var("ALLSOURCE_NODE_ID")
212            .ok()
213            .and_then(|v| v.parse().ok())
214            .unwrap_or(0);
215
216        let hlc = Arc::new(HybridLogicalClock::with_max_drift(
217            node_id,
218            config.max_clock_drift_ms,
219        ));
220
221        let peer_health = DashMap::new();
222        let outbound_buffer = DashMap::new();
223        for peer in &config.peers {
224            peer_health.insert(peer.region_id.clone(), PeerHealth::Healthy);
225            outbound_buffer.insert(peer.region_id.clone(), Vec::new());
226        }
227
228        Self {
229            config,
230            hlc,
231            resolver: Arc::new(CrdtResolver::new()),
232            peer_health,
233            outbound_buffer,
234            events_sent: std::sync::atomic::AtomicU64::new(0),
235            events_received: std::sync::atomic::AtomicU64::new(0),
236            conflicts_resolved: std::sync::atomic::AtomicU64::new(0),
237        }
238    }
239
240    /// Get this region's ID.
241    pub fn region_id(&self) -> &str {
242        &self.config.region_id
243    }
244
245    /// Get the HLC instance.
246    pub fn hlc(&self) -> &Arc<HybridLogicalClock> {
247        &self.hlc
248    }
249
250    /// Get the CRDT resolver.
251    pub fn resolver(&self) -> &Arc<CrdtResolver> {
252        &self.resolver
253    }
254
255    /// Stamp a new local event with an HLC timestamp.
256    pub fn stamp_event(&self, event_id: &str, event_data: serde_json::Value) -> ReplicatedEvent {
257        let ts = self.hlc.now();
258        ReplicatedEvent {
259            event_id: event_id.to_string(),
260            hlc_timestamp: ts,
261            origin_region: self.config.region_id.clone(),
262            event_data,
263        }
264    }
265
266    /// Queue an event for replication to all peers.
267    pub fn queue_for_replication(&self, event: ReplicatedEvent) {
268        for mut buffer in self.outbound_buffer.iter_mut() {
269            buffer.value_mut().push(event.clone());
270        }
271    }
272
273    /// Drain the outbound buffer for a specific peer.
274    pub fn drain_outbound(&self, peer_region: &str, max_batch: usize) -> Vec<ReplicatedEvent> {
275        if let Some(mut buffer) = self.outbound_buffer.get_mut(peer_region) {
276            let drain_count = max_batch.min(buffer.len());
277            buffer.drain(..drain_count).collect()
278        } else {
279            vec![]
280        }
281    }
282
283    /// Receive events from a peer region.
284    ///
285    /// Applies CRDT conflict resolution and returns the sync response.
286    pub fn receive_sync(&self, request: &GeoSyncRequest) -> GeoSyncResponse {
287        let mut accepted = 0;
288        let mut skipped = 0;
289
290        for event in &request.events {
291            // Update HLC from remote timestamp
292            if let Err(e) = self.hlc.receive(&event.hlc_timestamp) {
293                tracing::warn!(
294                    "HLC drift violation from region {}: {}",
295                    request.source_region,
296                    e,
297                );
298                skipped += 1;
299                self.conflicts_resolved
300                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
301                continue;
302            }
303
304            match self.resolver.resolve_and_accept(event) {
305                ConflictResolution::Accept => {
306                    accepted += 1;
307                    self.events_received
308                        .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
309                }
310                ConflictResolution::Skip => {
311                    skipped += 1;
312                    self.conflicts_resolved
313                        .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
314                }
315            }
316        }
317
318        // Merge remote version vector
319        self.resolver
320            .merge_version_vector(&request.source_region, &request.version_vector);
321
322        // Build our version vector for the response
323        let our_vv = self
324            .resolver
325            .version_vector_for(&self.config.region_id)
326            .unwrap_or_default();
327
328        GeoSyncResponse {
329            accepted,
330            skipped,
331            version_vector: our_vv,
332        }
333    }
334
335    /// Update a peer's health status.
336    pub fn set_peer_health(&self, peer_region: &str, health: PeerHealth) {
337        self.peer_health.insert(peer_region.to_string(), health);
338    }
339
340    /// Get a peer's health status.
341    pub fn peer_health(&self, peer_region: &str) -> PeerHealth {
342        self.peer_health
343            .get(peer_region)
344            .map(|h| *h)
345            .unwrap_or(PeerHealth::Unreachable)
346    }
347
348    /// Get all peers.
349    pub fn peers(&self) -> &[PeerRegion] {
350        &self.config.peers
351    }
352
353    /// Get sync interval.
354    pub fn sync_interval(&self) -> Duration {
355        Duration::from_millis(self.config.sync_interval_ms)
356    }
357
358    /// Get batch size.
359    pub fn batch_size(&self) -> usize {
360        self.config.batch_size
361    }
362
363    /// Build a sync request for a specific peer.
364    pub fn build_sync_request(&self, peer_region: &str) -> Option<GeoSyncRequest> {
365        let events = self.drain_outbound(peer_region, self.config.batch_size);
366        if events.is_empty() {
367            return None;
368        }
369
370        self.events_sent
371            .fetch_add(events.len() as u64, std::sync::atomic::Ordering::Relaxed);
372
373        let vv = self
374            .resolver
375            .version_vector_for(&self.config.region_id)
376            .unwrap_or_default();
377
378        Some(GeoSyncRequest {
379            source_region: self.config.region_id.clone(),
380            events,
381            version_vector: vv,
382        })
383    }
384
385    /// Select the best failover region (lowest replication lag, healthy).
386    pub fn select_failover_region(&self) -> Option<String> {
387        self.config
388            .peers
389            .iter()
390            .filter(|p| {
391                self.peer_health
392                    .get(&p.region_id)
393                    .map(|h| *h == PeerHealth::Healthy)
394                    .unwrap_or(false)
395            })
396            .max_by_key(|p| p.last_sync_ms) // most recently synced = least data loss
397            .map(|p| p.region_id.clone())
398    }
399
400    /// Get full replication status.
401    pub fn status(&self) -> GeoReplicationStatus {
402        let peers: Vec<PeerStatus> = self
403            .config
404            .peers
405            .iter()
406            .map(|p| {
407                let health = self
408                    .peer_health
409                    .get(&p.region_id)
410                    .map(|h| *h)
411                    .unwrap_or(PeerHealth::Unreachable);
412
413                let lag_ms = if p.last_sync_ms > 0 {
414                    let now_ms = std::time::SystemTime::now()
415                        .duration_since(std::time::UNIX_EPOCH)
416                        .unwrap_or_default()
417                        .as_millis() as u64;
418                    now_ms.saturating_sub(p.last_sync_ms)
419                } else {
420                    0
421                };
422
423                PeerStatus {
424                    region_id: p.region_id.clone(),
425                    api_url: p.api_url.clone(),
426                    health,
427                    last_sync_ms: p.last_sync_ms,
428                    replication_lag_ms: lag_ms,
429                }
430            })
431            .collect();
432
433        GeoReplicationStatus {
434            region_id: self.config.region_id.clone(),
435            peers,
436            events_sent: self.events_sent.load(std::sync::atomic::Ordering::Relaxed),
437            events_received: self
438                .events_received
439                .load(std::sync::atomic::Ordering::Relaxed),
440            conflicts_resolved: self
441                .conflicts_resolved
442                .load(std::sync::atomic::Ordering::Relaxed),
443            current_hlc: self.hlc.current(),
444            version_vectors: self.resolver.all_version_vectors(),
445        }
446    }
447}
448
#[cfg(test)]
mod tests {
    use super::*;

    /// Current wall-clock time in milliseconds since the Unix epoch.
    fn now_ms() -> u64 {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_millis() as u64
    }

    /// A healthy peer entry with the given last-sync timestamp.
    fn peer(region_id: &str, api_url: &str, last_sync_ms: u64) -> PeerRegion {
        PeerRegion {
            region_id: region_id.to_string(),
            api_url: api_url.to_string(),
            healthy: true,
            last_sync_ms,
        }
    }

    /// Config for `region` with a single never-synced peer, `eu-west`.
    fn test_config(region: &str) -> GeoReplicationConfig {
        GeoReplicationConfig {
            region_id: region.to_string(),
            peers: vec![peer("eu-west", "https://eu.core:3900", 0)],
            sync_interval_ms: 1000,
            max_clock_drift_ms: 60_000,
            batch_size: 100,
        }
    }

    /// Config for `us-east` with peers `eu-west` and `ap-east`, last synced at
    /// the given timestamps.
    fn failover_config(eu_last_sync_ms: u64, ap_last_sync_ms: u64) -> GeoReplicationConfig {
        GeoReplicationConfig {
            region_id: "us-east".to_string(),
            peers: vec![
                peer("eu-west", "https://eu.core:3900", eu_last_sync_ms),
                peer("ap-east", "https://ap.core:3900", ap_last_sync_ms),
            ],
            ..Default::default()
        }
    }

    /// An event stamped "now" that claims to originate from `eu-west` (HLC node 2).
    fn eu_event(event_id: &str, event_data: serde_json::Value) -> ReplicatedEvent {
        ReplicatedEvent {
            event_id: event_id.to_string(),
            hlc_timestamp: HlcTimestamp::new(now_ms(), 0, 2),
            origin_region: "eu-west".to_string(),
            event_data,
        }
    }

    /// A sync request from `eu-west` carrying `events` and an empty version vector.
    fn request_from_eu(events: Vec<ReplicatedEvent>) -> GeoSyncRequest {
        GeoSyncRequest {
            source_region: "eu-west".to_string(),
            events,
            version_vector: VersionVector::new(),
        }
    }

    /// A manager for region `local` whose only peer is `peer_id` at `peer_url`.
    fn two_region_manager(local: &str, peer_id: &str, peer_url: &str) -> GeoReplicationManager {
        GeoReplicationManager::new(GeoReplicationConfig {
            region_id: local.to_string(),
            peers: vec![peer(peer_id, peer_url, 0)],
            ..Default::default()
        })
    }

    #[test]
    fn test_stamp_event() {
        let mgr = GeoReplicationManager::new(test_config("us-east"));
        let stamped = mgr.stamp_event("evt-1", serde_json::json!({"foo": "bar"}));

        assert_eq!(stamped.event_id, "evt-1");
        assert_eq!(stamped.origin_region, "us-east");
        assert!(stamped.hlc_timestamp.physical_ms > 0);
    }

    #[test]
    fn test_queue_and_drain() {
        let mgr = GeoReplicationManager::new(test_config("us-east"));
        mgr.queue_for_replication(mgr.stamp_event("evt-1", serde_json::json!({})));

        let first = mgr.drain_outbound("eu-west", 10);
        assert_eq!(first.len(), 1);
        assert_eq!(first[0].event_id, "evt-1");

        // The first drain emptied the buffer, so a second one finds nothing.
        assert!(mgr.drain_outbound("eu-west", 10).is_empty());
    }

    #[test]
    fn test_receive_sync_accepts_new_events() {
        let mgr = GeoReplicationManager::new(test_config("us-east"));
        let request = request_from_eu(vec![eu_event(
            "evt-remote-1",
            serde_json::json!({"source": "eu"}),
        )]);

        let response = mgr.receive_sync(&request);
        assert_eq!(response.accepted, 1);
        assert_eq!(response.skipped, 0);
    }

    #[test]
    fn test_receive_sync_skips_duplicates() {
        let mgr = GeoReplicationManager::new(test_config("us-east"));
        let duplicate = eu_event("evt-dup", serde_json::json!({}));

        let response = mgr.receive_sync(&request_from_eu(vec![duplicate.clone(), duplicate]));
        assert_eq!(response.accepted, 1);
        assert_eq!(response.skipped, 1);
    }

    #[test]
    fn test_build_sync_request() {
        let mgr = GeoReplicationManager::new(test_config("us-east"));
        mgr.queue_for_replication(mgr.stamp_event("evt-1", serde_json::json!({})));

        let req = mgr
            .build_sync_request("eu-west")
            .expect("a queued event must yield a sync request");
        assert_eq!(req.source_region, "us-east");
        assert_eq!(req.events.len(), 1);
    }

    #[test]
    fn test_build_sync_request_empty() {
        let mgr = GeoReplicationManager::new(test_config("us-east"));
        assert!(mgr.build_sync_request("eu-west").is_none());
    }

    #[test]
    fn test_peer_health_tracking() {
        let mgr = GeoReplicationManager::new(test_config("us-east"));
        assert_eq!(mgr.peer_health("eu-west"), PeerHealth::Healthy);

        for health in [PeerHealth::Degraded, PeerHealth::Unreachable] {
            mgr.set_peer_health("eu-west", health);
            assert_eq!(mgr.peer_health("eu-west"), health);
        }
    }

    #[test]
    fn test_select_failover_region() {
        let mgr = GeoReplicationManager::new(failover_config(100, 200));

        // ap-east synced more recently, so it is the preferred failover target.
        assert_eq!(mgr.select_failover_region(), Some("ap-east".to_string()));
    }

    #[test]
    fn test_select_failover_skips_unhealthy() {
        let mgr = GeoReplicationManager::new(failover_config(200, 300));
        mgr.set_peer_health("ap-east", PeerHealth::Unreachable);

        assert_eq!(mgr.select_failover_region(), Some("eu-west".to_string()));
    }

    #[test]
    fn test_status() {
        let status = GeoReplicationManager::new(test_config("us-east")).status();

        assert_eq!(status.region_id, "us-east");
        assert_eq!(status.peers.len(), 1);
        assert_eq!(status.events_sent, 0);
        assert_eq!(status.events_received, 0);
        assert_eq!(status.conflicts_resolved, 0);
    }

    #[test]
    fn test_two_region_convergence() {
        // Two regions, each the other's only peer.
        let us = two_region_manager("us-east", "eu-west", "http://eu:3900");
        let eu = two_region_manager("eu-west", "us-east", "http://us:3900");

        // Each region writes one event locally and queues it for its peer.
        let from_us = us.stamp_event("evt-1", serde_json::json!({"from": "us"}));
        us.resolver.resolve_and_accept(&from_us);
        us.queue_for_replication(from_us);

        let from_eu = eu.stamp_event("evt-2", serde_json::json!({"from": "eu"}));
        eu.resolver.resolve_and_accept(&from_eu);
        eu.queue_for_replication(from_eu);

        // US → EU
        let to_eu = us.build_sync_request("eu-west").unwrap();
        assert_eq!(eu.receive_sync(&to_eu).accepted, 1);

        // EU → US
        let to_us = eu.build_sync_request("us-east").unwrap();
        assert_eq!(us.receive_sync(&to_us).accepted, 1);

        // Both regions now hold both events.
        assert_eq!(us.resolver.seen_count(), 2);
        assert_eq!(eu.resolver.seen_count(), 2);
    }
}