Skip to main content

allsource_core/infrastructure/cluster/
geo_replication.rs

//! Geo-Replication Manager for cross-region event replication.
//!
//! Coordinates replication between AllSource Core instances in different
//! geographic regions. Uses HLC for causal ordering and CRDTs for
//! conflict-free convergence.
//!
//! # Architecture
//!
//! ```text
//! Region US-East          Region EU-West          Region AP-East
//! ┌──────────────┐        ┌──────────────┐        ┌──────────────┐
//! │ Core Leader  │◄──────►│ Core Leader  │◄──────►│ Core Leader  │
//! │ (writes OK)  │  sync  │ (writes OK)  │  sync  │ (writes OK)  │
//! │ HLC + CRDT   │        │ HLC + CRDT   │        │ HLC + CRDT   │
//! └──────────────┘        └──────────────┘        └──────────────┘
//! ```
//!
//! Each region is an independent leader that accepts writes locally.
//! Events are replicated asynchronously to peer regions using the
//! `GeoReplicationManager`. CRDT resolution ensures convergence.
//!
//! # Opt-in
//!
//! Enabled via `ALLSOURCE_GEO_REPLICATION_ENABLED=true` with:
//! - `ALLSOURCE_REGION_ID`: this region's identifier (e.g., "us-east-1")
//! - `ALLSOURCE_GEO_PEERS`: comma-separated peer URLs (e.g., "https://eu.core:3900,https://ap.core:3900");
//!   peers are assigned the region IDs `peer-0`, `peer-1`, … in list order
//!
//! Optional tuning: `ALLSOURCE_GEO_SYNC_INTERVAL_MS` (default 1000),
//! `ALLSOURCE_GEO_MAX_DRIFT_MS` (default 60000), `ALLSOURCE_GEO_BATCH_SIZE`
//! (default 100), and `ALLSOURCE_NODE_ID` (HLC node ID, default 0).
27use super::crdt::{ConflictResolution, CrdtResolver, ReplicatedEvent, VersionVector};
28use super::hlc::{HlcTimestamp, HybridLogicalClock};
29use dashmap::DashMap;
30use serde::{Deserialize, Serialize};
31use std::{sync::Arc, time::Duration};
32
/// Configuration for geo-replication.
///
/// Built by hand (starting from `Default`) or from environment variables via
/// `GeoReplicationConfig::from_env`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GeoReplicationConfig {
    /// This region's unique identifier. Stamped as `origin_region` on locally
    /// created events and sent as `source_region` in outbound sync requests.
    pub region_id: String,
    /// Peer region endpoints for replication.
    pub peers: Vec<PeerRegion>,
    /// Sync interval for pushing events to peers (ms).
    pub sync_interval_ms: u64,
    /// Maximum HLC clock drift tolerance (ms). Handed to the region's
    /// `HybridLogicalClock`; inbound events whose timestamps the clock rejects
    /// are skipped by the manager.
    pub max_clock_drift_ms: u64,
    /// Batch size for replication sync: the maximum number of events drained
    /// into a single outbound sync request.
    pub batch_size: usize,
}
47
48impl Default for GeoReplicationConfig {
49    fn default() -> Self {
50        Self {
51            region_id: "default".to_string(),
52            peers: vec![],
53            sync_interval_ms: 1000,
54            max_clock_drift_ms: 60_000,
55            batch_size: 100,
56        }
57    }
58}
59
60impl GeoReplicationConfig {
61    /// Load configuration from environment variables.
62    pub fn from_env() -> Option<Self> {
63        let enabled = std::env::var("ALLSOURCE_GEO_REPLICATION_ENABLED")
64            .map(|v| v == "true")
65            .unwrap_or(false);
66
67        if !enabled {
68            return None;
69        }
70
71        let region_id =
72            std::env::var("ALLSOURCE_REGION_ID").unwrap_or_else(|_| "default".to_string());
73
74        let peers_str = std::env::var("ALLSOURCE_GEO_PEERS").unwrap_or_default();
75        let peers: Vec<PeerRegion> = peers_str
76            .split(',')
77            .filter(|s| !s.trim().is_empty())
78            .enumerate()
79            .map(|(i, url)| PeerRegion {
80                region_id: format!("peer-{i}"),
81                api_url: url.trim().to_string(),
82                healthy: true,
83                last_sync_ms: 0,
84            })
85            .collect();
86
87        let sync_interval_ms: u64 = std::env::var("ALLSOURCE_GEO_SYNC_INTERVAL_MS")
88            .ok()
89            .and_then(|v| v.parse().ok())
90            .unwrap_or(1000);
91
92        let max_clock_drift_ms: u64 = std::env::var("ALLSOURCE_GEO_MAX_DRIFT_MS")
93            .ok()
94            .and_then(|v| v.parse().ok())
95            .unwrap_or(60_000);
96
97        let batch_size: usize = std::env::var("ALLSOURCE_GEO_BATCH_SIZE")
98            .ok()
99            .and_then(|v| v.parse().ok())
100            .unwrap_or(100);
101
102        Some(Self {
103            region_id,
104            peers,
105            sync_interval_ms,
106            max_clock_drift_ms,
107            batch_size,
108        })
109    }
110}
111
/// A peer region endpoint.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PeerRegion {
    /// Region identifier. `GeoReplicationConfig::from_env` assigns synthetic IDs
    /// (`peer-0`, `peer-1`, …) in the order the URLs are listed.
    pub region_id: String,
    /// HTTP API URL for the peer's Core instance.
    pub api_url: String,
    /// Whether the peer is considered healthy in the configuration. The manager
    /// tracks runtime health separately (see `PeerHealth`).
    pub healthy: bool,
    /// Last successful sync timestamp (ms since epoch). `0` is treated as
    /// "never synced" when replication lag is computed.
    pub last_sync_ms: u64,
}
124
/// Health status of a peer region.
///
/// Serialized in lowercase: `"healthy"`, `"degraded"`, `"unreachable"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum PeerHealth {
    /// Healthy; only peers in this state are considered for failover.
    Healthy,
    /// Degraded; not considered for failover.
    Degraded,
    /// Unreachable. This is also what the manager reports for a region it has
    /// no health entry for.
    Unreachable,
}
133
/// Status of the geo-replication system.
///
/// Snapshot produced by `GeoReplicationManager::status`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GeoReplicationStatus {
    /// This region's ID.
    pub region_id: String,
    /// Health, last sync time and lag for each configured peer.
    pub peers: Vec<PeerStatus>,
    /// Total events replicated out. Counted when an outbound sync request is
    /// built, not when its delivery is confirmed.
    pub events_sent: u64,
    /// Total events received from peers and accepted by the CRDT resolver.
    pub events_received: u64,
    /// Total inbound events skipped. This covers duplicates rejected by the CRDT
    /// resolver plus events whose HLC timestamp the local clock rejected.
    pub conflicts_resolved: u64,
    /// Current HLC state.
    pub current_hlc: HlcTimestamp,
    /// Version vectors per region, as held by the CRDT resolver.
    pub version_vectors: std::collections::BTreeMap<String, VersionVector>,
}
152
/// Status of a single peer region.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PeerStatus {
    /// Peer region identifier.
    pub region_id: String,
    /// Peer's HTTP API URL.
    pub api_url: String,
    /// Health as currently tracked by the manager.
    pub health: PeerHealth,
    /// Last successful sync timestamp (ms since epoch); `0` when none is known.
    pub last_sync_ms: u64,
    /// Milliseconds elapsed since `last_sync_ms`. Reported as `0` when
    /// `last_sync_ms` is `0`, so a never-synced peer shows no lag.
    pub replication_lag_ms: u64,
}
162
/// Replication sync request sent to a peer.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GeoSyncRequest {
    /// Source region ID.
    pub source_region: String,
    /// Events to replicate. When built by `GeoReplicationManager::build_sync_request`,
    /// this holds at most the sender's configured batch size.
    pub events: Vec<ReplicatedEvent>,
    /// The source region's own version vector (not a per-peer vector). The
    /// receiver merges it under `source_region`.
    pub version_vector: VersionVector,
}
173
/// Replication sync response from a peer.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GeoSyncResponse {
    /// Number of events accepted.
    pub accepted: usize,
    /// Number of events skipped: duplicates, plus events whose HLC timestamp
    /// the receiver's clock rejected.
    pub skipped: usize,
    /// Receiver's own version vector (for bi-directional sync). This is
    /// `VersionVector::default()` if the receiver's resolver has none for its region.
    pub version_vector: VersionVector,
}
184
185/// Geo-Replication Manager.
186///
187/// Manages cross-region event replication with CRDT conflict resolution
188/// and HLC-based causal ordering.
189pub struct GeoReplicationManager {
190    /// Configuration.
191    config: GeoReplicationConfig,
192    /// Hybrid Logical Clock for this region.
193    hlc: Arc<HybridLogicalClock>,
194    /// CRDT resolver for conflict resolution.
195    resolver: Arc<CrdtResolver>,
196    /// Peer health tracking.
197    peer_health: DashMap<String, PeerHealth>,
198    /// Outbound event buffer (events pending replication to peers).
199    outbound_buffer: DashMap<String, Vec<ReplicatedEvent>>,
200    /// Counter: events sent.
201    events_sent: std::sync::atomic::AtomicU64,
202    /// Counter: events received.
203    events_received: std::sync::atomic::AtomicU64,
204    /// Counter: conflicts resolved.
205    conflicts_resolved: std::sync::atomic::AtomicU64,
206}
207
208impl GeoReplicationManager {
209    /// Create a new geo-replication manager.
210    pub fn new(config: GeoReplicationConfig) -> Self {
211        let node_id: u32 = std::env::var("ALLSOURCE_NODE_ID")
212            .ok()
213            .and_then(|v| v.parse().ok())
214            .unwrap_or(0);
215
216        let hlc = Arc::new(HybridLogicalClock::with_max_drift(
217            node_id,
218            config.max_clock_drift_ms,
219        ));
220
221        let peer_health = DashMap::new();
222        let outbound_buffer = DashMap::new();
223        for peer in &config.peers {
224            peer_health.insert(peer.region_id.clone(), PeerHealth::Healthy);
225            outbound_buffer.insert(peer.region_id.clone(), Vec::new());
226        }
227
228        Self {
229            config,
230            hlc,
231            resolver: Arc::new(CrdtResolver::new()),
232            peer_health,
233            outbound_buffer,
234            events_sent: std::sync::atomic::AtomicU64::new(0),
235            events_received: std::sync::atomic::AtomicU64::new(0),
236            conflicts_resolved: std::sync::atomic::AtomicU64::new(0),
237        }
238    }
239
240    /// Get this region's ID.
241    pub fn region_id(&self) -> &str {
242        &self.config.region_id
243    }
244
245    /// Get the HLC instance.
246    pub fn hlc(&self) -> &Arc<HybridLogicalClock> {
247        &self.hlc
248    }
249
250    /// Get the CRDT resolver.
251    pub fn resolver(&self) -> &Arc<CrdtResolver> {
252        &self.resolver
253    }
254
255    /// Stamp a new local event with an HLC timestamp.
256    pub fn stamp_event(&self, event_id: &str, event_data: serde_json::Value) -> ReplicatedEvent {
257        let ts = self.hlc.now();
258        ReplicatedEvent {
259            event_id: event_id.to_string(),
260            hlc_timestamp: ts,
261            origin_region: self.config.region_id.clone(),
262            event_data,
263        }
264    }
265
266    /// Queue an event for replication to all peers.
267    pub fn queue_for_replication(&self, event: &ReplicatedEvent) {
268        for mut buffer in self.outbound_buffer.iter_mut() {
269            buffer.value_mut().push(event.clone());
270        }
271    }
272
273    /// Drain the outbound buffer for a specific peer.
274    pub fn drain_outbound(&self, peer_region: &str, max_batch: usize) -> Vec<ReplicatedEvent> {
275        if let Some(mut buffer) = self.outbound_buffer.get_mut(peer_region) {
276            let drain_count = max_batch.min(buffer.len());
277            buffer.drain(..drain_count).collect()
278        } else {
279            vec![]
280        }
281    }
282
283    /// Receive events from a peer region.
284    ///
285    /// Applies CRDT conflict resolution and returns the sync response.
286    pub fn receive_sync(&self, request: &GeoSyncRequest) -> GeoSyncResponse {
287        let mut accepted = 0;
288        let mut skipped = 0;
289
290        for event in &request.events {
291            // Update HLC from remote timestamp
292            if let Err(e) = self.hlc.receive(&event.hlc_timestamp) {
293                tracing::warn!(
294                    "HLC drift violation from region {}: {}",
295                    request.source_region,
296                    e,
297                );
298                skipped += 1;
299                self.conflicts_resolved
300                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
301                continue;
302            }
303
304            match self.resolver.resolve_and_accept(event) {
305                ConflictResolution::Accept => {
306                    accepted += 1;
307                    self.events_received
308                        .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
309                }
310                ConflictResolution::Skip => {
311                    skipped += 1;
312                    self.conflicts_resolved
313                        .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
314                }
315            }
316        }
317
318        // Merge remote version vector
319        self.resolver
320            .merge_version_vector(&request.source_region, &request.version_vector);
321
322        // Build our version vector for the response
323        let our_vv = self
324            .resolver
325            .version_vector_for(&self.config.region_id)
326            .unwrap_or_default();
327
328        GeoSyncResponse {
329            accepted,
330            skipped,
331            version_vector: our_vv,
332        }
333    }
334
335    /// Update a peer's health status.
336    pub fn set_peer_health(&self, peer_region: &str, health: PeerHealth) {
337        self.peer_health.insert(peer_region.to_string(), health);
338    }
339
340    /// Get a peer's health status.
341    pub fn peer_health(&self, peer_region: &str) -> PeerHealth {
342        self.peer_health
343            .get(peer_region)
344            .map_or(PeerHealth::Unreachable, |h| *h)
345    }
346
347    /// Get all peers.
348    pub fn peers(&self) -> &[PeerRegion] {
349        &self.config.peers
350    }
351
352    /// Get sync interval.
353    pub fn sync_interval(&self) -> Duration {
354        Duration::from_millis(self.config.sync_interval_ms)
355    }
356
357    /// Get batch size.
358    pub fn batch_size(&self) -> usize {
359        self.config.batch_size
360    }
361
362    /// Build a sync request for a specific peer.
363    pub fn build_sync_request(&self, peer_region: &str) -> Option<GeoSyncRequest> {
364        let events = self.drain_outbound(peer_region, self.config.batch_size);
365        if events.is_empty() {
366            return None;
367        }
368
369        self.events_sent
370            .fetch_add(events.len() as u64, std::sync::atomic::Ordering::Relaxed);
371
372        let vv = self
373            .resolver
374            .version_vector_for(&self.config.region_id)
375            .unwrap_or_default();
376
377        Some(GeoSyncRequest {
378            source_region: self.config.region_id.clone(),
379            events,
380            version_vector: vv,
381        })
382    }
383
384    /// Select the best failover region (lowest replication lag, healthy).
385    pub fn select_failover_region(&self) -> Option<String> {
386        self.config
387            .peers
388            .iter()
389            .filter(|p| {
390                self.peer_health
391                    .get(&p.region_id)
392                    .is_some_and(|h| *h == PeerHealth::Healthy)
393            })
394            .max_by_key(|p| p.last_sync_ms) // most recently synced = least data loss
395            .map(|p| p.region_id.clone())
396    }
397
398    /// Get full replication status.
399    pub fn status(&self) -> GeoReplicationStatus {
400        let peers: Vec<PeerStatus> = self
401            .config
402            .peers
403            .iter()
404            .map(|p| {
405                let health = self
406                    .peer_health
407                    .get(&p.region_id)
408                    .map_or(PeerHealth::Unreachable, |h| *h);
409
410                let lag_ms = if p.last_sync_ms > 0 {
411                    let now_ms = std::time::SystemTime::now()
412                        .duration_since(std::time::UNIX_EPOCH)
413                        .unwrap_or_default()
414                        .as_millis() as u64;
415                    now_ms.saturating_sub(p.last_sync_ms)
416                } else {
417                    0
418                };
419
420                PeerStatus {
421                    region_id: p.region_id.clone(),
422                    api_url: p.api_url.clone(),
423                    health,
424                    last_sync_ms: p.last_sync_ms,
425                    replication_lag_ms: lag_ms,
426                }
427            })
428            .collect();
429
430        GeoReplicationStatus {
431            region_id: self.config.region_id.clone(),
432            peers,
433            events_sent: self.events_sent.load(std::sync::atomic::Ordering::Relaxed),
434            events_received: self
435                .events_received
436                .load(std::sync::atomic::Ordering::Relaxed),
437            conflicts_resolved: self
438                .conflicts_resolved
439                .load(std::sync::atomic::Ordering::Relaxed),
440            current_hlc: self.hlc.current(),
441            version_vectors: self.resolver.all_version_vectors(),
442        }
443    }
444}
445
#[cfg(test)]
mod tests {
    use super::*;

    /// Wall-clock ms since the Unix epoch, used to build remote HLC timestamps
    /// that fall inside the drift window.
    fn wall_clock_ms() -> u64 {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_millis() as u64
    }

    /// A healthy peer entry with the given last-sync time.
    fn peer(region_id: &str, api_url: &str, last_sync_ms: u64) -> PeerRegion {
        PeerRegion {
            region_id: region_id.to_string(),
            api_url: api_url.to_string(),
            healthy: true,
            last_sync_ms,
        }
    }

    /// Config for `region` with a single, never-synced `eu-west` peer.
    fn test_config(region: &str) -> GeoReplicationConfig {
        GeoReplicationConfig {
            region_id: region.to_string(),
            peers: vec![peer("eu-west", "https://eu.core:3900", 0)],
            sync_interval_ms: 1000,
            max_clock_drift_ms: 60_000,
            batch_size: 100,
        }
    }

    /// Config for `us-east` with `eu-west` and `ap-east` peers at the given sync times.
    fn us_with_two_peers(eu_last_sync: u64, ap_last_sync: u64) -> GeoReplicationConfig {
        GeoReplicationConfig {
            region_id: "us-east".to_string(),
            peers: vec![
                peer("eu-west", "https://eu.core:3900", eu_last_sync),
                peer("ap-east", "https://ap.core:3900", ap_last_sync),
            ],
            ..Default::default()
        }
    }

    /// An event that originated in `eu-west`, stamped "now" with node ID 2.
    fn eu_event(event_id: &str, event_data: serde_json::Value) -> ReplicatedEvent {
        ReplicatedEvent {
            event_id: event_id.to_string(),
            hlc_timestamp: HlcTimestamp::new(wall_clock_ms(), 0, 2),
            origin_region: "eu-west".to_string(),
            event_data,
        }
    }

    /// A sync request from `eu-west` carrying `events` and an empty version vector.
    fn eu_sync(events: Vec<ReplicatedEvent>) -> GeoSyncRequest {
        GeoSyncRequest {
            source_region: "eu-west".to_string(),
            events,
            version_vector: VersionVector::new(),
        }
    }

    #[test]
    fn test_stamp_event() {
        let mgr = GeoReplicationManager::new(test_config("us-east"));
        let stamped = mgr.stamp_event("evt-1", serde_json::json!({"foo": "bar"}));

        assert_eq!(stamped.event_id, "evt-1");
        assert_eq!(stamped.origin_region, "us-east");
        assert!(stamped.hlc_timestamp.physical_ms > 0);
    }

    #[test]
    fn test_queue_and_drain() {
        let mgr = GeoReplicationManager::new(test_config("us-east"));
        mgr.queue_for_replication(&mgr.stamp_event("evt-1", serde_json::json!({})));

        let first = mgr.drain_outbound("eu-west", 10);
        assert_eq!(first.len(), 1);
        assert_eq!(first[0].event_id, "evt-1");

        // Everything was drained, so a second pass finds nothing.
        assert!(mgr.drain_outbound("eu-west", 10).is_empty());
    }

    #[test]
    fn test_receive_sync_accepts_new_events() {
        let mgr = GeoReplicationManager::new(test_config("us-east"));
        let request = eu_sync(vec![eu_event(
            "evt-remote-1",
            serde_json::json!({"source": "eu"}),
        )]);

        let response = mgr.receive_sync(&request);
        assert_eq!(response.accepted, 1);
        assert_eq!(response.skipped, 0);
    }

    #[test]
    fn test_receive_sync_skips_duplicates() {
        let mgr = GeoReplicationManager::new(test_config("us-east"));
        let dup = eu_event("evt-dup", serde_json::json!({}));

        let response = mgr.receive_sync(&eu_sync(vec![dup.clone(), dup]));
        assert_eq!(response.accepted, 1);
        assert_eq!(response.skipped, 1);
    }

    #[test]
    fn test_build_sync_request() {
        let mgr = GeoReplicationManager::new(test_config("us-east"));
        mgr.queue_for_replication(&mgr.stamp_event("evt-1", serde_json::json!({})));

        let req = mgr
            .build_sync_request("eu-west")
            .expect("one event is queued for eu-west");
        assert_eq!(req.source_region, "us-east");
        assert_eq!(req.events.len(), 1);
    }

    #[test]
    fn test_build_sync_request_empty() {
        let mgr = GeoReplicationManager::new(test_config("us-east"));
        assert!(mgr.build_sync_request("eu-west").is_none());
    }

    #[test]
    fn test_peer_health_tracking() {
        let mgr = GeoReplicationManager::new(test_config("us-east"));
        assert_eq!(mgr.peer_health("eu-west"), PeerHealth::Healthy);

        for next in [PeerHealth::Degraded, PeerHealth::Unreachable] {
            mgr.set_peer_health("eu-west", next);
            assert_eq!(mgr.peer_health("eu-west"), next);
        }
    }

    #[test]
    fn test_select_failover_region() {
        let mgr = GeoReplicationManager::new(us_with_two_peers(100, 200));

        // ap-east synced more recently → preferred failover
        assert_eq!(mgr.select_failover_region().as_deref(), Some("ap-east"));
    }

    #[test]
    fn test_select_failover_skips_unhealthy() {
        let mgr = GeoReplicationManager::new(us_with_two_peers(200, 300));
        mgr.set_peer_health("ap-east", PeerHealth::Unreachable);

        assert_eq!(mgr.select_failover_region().as_deref(), Some("eu-west"));
    }

    #[test]
    fn test_status() {
        let status = GeoReplicationManager::new(test_config("us-east")).status();

        assert_eq!(status.region_id, "us-east");
        assert_eq!(status.peers.len(), 1);
        assert_eq!(status.events_sent, 0);
        assert_eq!(status.events_received, 0);
        assert_eq!(status.conflicts_resolved, 0);
    }

    #[test]
    fn test_two_region_convergence() {
        // Two regions, each configured with the other as its only peer.
        let region = |id: &str, peer_id: &str, peer_url: &str| {
            GeoReplicationManager::new(GeoReplicationConfig {
                region_id: id.to_string(),
                peers: vec![peer(peer_id, peer_url, 0)],
                ..Default::default()
            })
        };
        let us = region("us-east", "eu-west", "http://eu:3900");
        let eu = region("eu-west", "us-east", "http://us:3900");

        // Each region accepts one local write and queues it for its peer.
        for (mgr, event_id, origin) in [(&us, "evt-1", "us"), (&eu, "evt-2", "eu")] {
            let local = mgr.stamp_event(event_id, serde_json::json!({"from": origin}));
            mgr.resolver.resolve_and_accept(&local);
            mgr.queue_for_replication(&local);
        }

        // US → EU sync
        let to_eu = us.build_sync_request("eu-west").unwrap();
        assert_eq!(eu.receive_sync(&to_eu).accepted, 1);

        // EU → US sync
        let to_us = eu.build_sync_request("us-east").unwrap();
        assert_eq!(us.receive_sync(&to_us).accepted, 1);

        // Both regions now hold both events.
        for mgr in [&us, &eu] {
            assert_eq!(mgr.resolver.seen_count(), 2);
        }
    }
}