use std::collections::{BTreeMap, HashMap};
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use serde::{Deserialize, Serialize};
use crate::conflict_resolution::VectorClock;
use crate::cross_dc::ConsistencyLevel;
use crate::error::{ClusterError, Result};
use crate::raft::OxirsNodeId;
/// Identifier of a geographic region, e.g. `"us-east-1"`.
pub type RegionId = String;
/// Strategy used to decide between a stored record and an incoming write for the same key.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum ConflictResolutionMode {
/// The larger `timestamp_ms` wins; equal timestamps are broken by comparing region ids.
LastWriterWins,
/// Vector-clock ordering is consulted first; clocks that are not ordered either way fall
/// back to the last-writer-wins rule.
VectorClock,
}
impl Default for ConflictResolutionMode {
fn default() -> Self {
ConflictResolutionMode::LastWriterWins
}
}
/// Result of routing a write: the region that takes it first and the regions it is fanned
/// out to afterwards.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WriteRoutingDecision {
/// Region selected as primary for the write.
pub primary_region: RegionId,
/// Every configured region other than `primary_region`, in configuration order.
pub fanout_regions: Vec<RegionId>,
/// Consistency level of the matching rule, or `LocalAsync` when no rule matched.
pub consistency: ConsistencyLevel,
}
/// Routing rule that directs matching writes to a target region.
///
/// A criterion left as `None` places no restriction on the write.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RegionRoutingRule {
/// Name of the rule; `default_to` uses `"default"`.
pub name: String,
/// When set, the write's subject must start with this prefix.
pub subject_prefix: Option<String>,
/// When set, the write must carry exactly this tenant id.
pub tenant_id: Option<String>,
/// Region that becomes the primary for matching writes.
pub target_region: RegionId,
/// Consistency level reported for matching writes.
pub consistency: ConsistencyLevel,
}
impl RegionRoutingRule {
    /// Builds the catch-all rule named `"default"`: no subject prefix and no tenant
    /// restriction, so it matches every write and routes it to `region`.
    pub fn default_to(region: impl Into<RegionId>, consistency: ConsistencyLevel) -> Self {
        let target_region = region.into();
        Self {
            name: String::from("default"),
            subject_prefix: None,
            tenant_id: None,
            target_region,
            consistency,
        }
    }

    /// Returns `true` when the write satisfies every criterion this rule sets.
    ///
    /// * `subject` must start with `subject_prefix`, if one is configured.
    /// * `tenant` must be `Some` and equal to `tenant_id`, if one is configured.
    pub fn matches(&self, subject: &str, tenant: Option<&str>) -> bool {
        let prefix_ok = self
            .subject_prefix
            .as_deref()
            .map_or(true, |prefix| subject.starts_with(prefix));
        let tenant_ok = self
            .tenant_id
            .as_deref()
            .map_or(true, |wanted| tenant == Some(wanted));
        prefix_ok && tenant_ok
    }
}
/// Configuration of an active-active, multi-region deployment.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ActiveActiveGeoConfig {
/// All regions taking part in the deployment.
pub regions: Vec<RegionId>,
/// Region this node belongs to; used as the primary when no routing rule matches.
pub local_region: RegionId,
/// Named tiers of regions; `multi_region` registers every region under `"primary"`.
pub primary_tier: BTreeMap<String, Vec<RegionId>>,
/// Routing rules, evaluated in order; the first matching rule decides.
pub routing_rules: Vec<RegionRoutingRule>,
/// Strategy used to resolve conflicting writes to the same key.
pub conflict_mode: ConflictResolutionMode,
/// Not read anywhere in this file; presumably the tolerated replication lag (TODO confirm).
/// The provided constructors set it to five seconds.
pub max_lag: Duration,
}
impl ActiveActiveGeoConfig {
    /// Configuration for a deployment consisting of `local` only.
    ///
    /// The single routing rule sends everything to `local` with `LocalAsync` consistency;
    /// no primary tier is registered.
    pub fn single_region(local: impl Into<RegionId>) -> Self {
        let local_region: RegionId = local.into();
        let default_rule =
            RegionRoutingRule::default_to(local_region.clone(), ConsistencyLevel::LocalAsync);
        Self {
            regions: vec![local_region.clone()],
            local_region,
            primary_tier: BTreeMap::new(),
            routing_rules: vec![default_rule],
            conflict_mode: ConflictResolutionMode::default(),
            max_lag: Duration::from_secs(5),
        }
    }

    /// Configuration for a deployment spanning `regions`.
    ///
    /// `local` is put at the front of the region list when the caller did not include it.
    /// All regions are registered under the `"primary"` tier, and the single routing rule
    /// sends everything to `local` with `LocalAsync` consistency.
    pub fn multi_region(local: impl Into<RegionId>, regions: Vec<RegionId>) -> Self {
        let local_region: RegionId = local.into();
        let mut regions = regions;
        if regions.iter().all(|known| *known != local_region) {
            regions.insert(0, local_region.clone());
        }
        let primary_tier: BTreeMap<String, Vec<RegionId>> =
            std::iter::once(("primary".to_string(), regions.clone())).collect();
        let default_rule =
            RegionRoutingRule::default_to(local_region.clone(), ConsistencyLevel::LocalAsync);
        Self {
            regions,
            local_region,
            primary_tier,
            routing_rules: vec![default_rule],
            conflict_mode: ConflictResolutionMode::default(),
            max_lag: Duration::from_secs(5),
        }
    }

    /// Decides where a write for `subject` (and optional `tenant`) goes.
    ///
    /// The first routing rule that matches supplies the primary region and consistency
    /// level. When none matches, the local region is the primary and consistency is
    /// `LocalAsync`. The fan-out list is every configured region except the primary.
    pub fn route(&self, subject: &str, tenant: Option<&str>) -> WriteRoutingDecision {
        let first_match = self
            .routing_rules
            .iter()
            .find(|rule| rule.matches(subject, tenant));
        let (primary, consistency) = match first_match {
            Some(rule) => (&rule.target_region, rule.consistency.clone()),
            None => (&self.local_region, ConsistencyLevel::LocalAsync),
        };
        let fanout_regions = self
            .regions
            .iter()
            .filter(|candidate| *candidate != primary)
            .cloned()
            .collect();
        WriteRoutingDecision {
            primary_region: primary.clone(),
            fanout_regions,
            consistency,
        }
    }
}
/// Descriptor of the Raft group that serves one region.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RegionRaftGroup {
/// Region this group belongs to.
pub region_id: RegionId,
/// Numeric identifier of the group.
pub group_id: u64,
/// Node ids that are members of the group.
pub members: Vec<OxirsNodeId>,
/// Highest sequence number recorded for the group; `new` starts it at 0.
pub committed_seq: u64,
}
impl RegionRaftGroup {
    /// Creates a group descriptor for `region_id` with the given members and a committed
    /// sequence of 0.
    pub fn new(region_id: impl Into<RegionId>, group_id: u64, members: Vec<OxirsNodeId>) -> Self {
        let region_id: RegionId = region_id.into();
        Self {
            region_id,
            group_id,
            members,
            committed_seq: 0,
        }
    }
}
/// Outcome of applying a write through the replicator.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum GeoWriteOutcome {
/// The write was accepted; `seq` is the region's sequence counter after the write.
Committed { region: RegionId, seq: u64 },
/// The write lost conflict resolution against the record already stored for the key.
RejectedByConflict {
/// Region of the rejected write.
region: RegionId,
/// Region of the stored record that won.
winner_region: RegionId,
/// Timestamp (milliseconds) of the stored record that won.
winner_timestamp_ms: u64,
},
}
/// Value stored for a key together with the metadata used for conflict resolution.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct GeoRecord {
/// Region in which the write originated.
pub region: RegionId,
/// Caller-supplied write timestamp in milliseconds.
pub timestamp_ms: u64,
/// Caller-supplied vector clock of the write.
pub clock: VectorClock,
/// The stored value.
pub value: String,
}
/// Applies local and remote writes, resolves conflicts between them and queues accepted
/// local writes for fan-out to the other regions.
#[derive(Debug)]
pub struct ActiveActiveReplicator {
// Deployment configuration; not modified by any method in this file.
config: ActiveActiveGeoConfig,
// Mutable replicator state; every access goes through the mutex.
state: Arc<Mutex<ReplicatorState>>,
}
/// Mutable state of the replicator, kept behind the replicator's mutex.
#[derive(Debug)]
struct ReplicatorState {
// Raft group descriptor per region, keyed by the group's region id.
groups: HashMap<RegionId, RegionRaftGroup>,
// Current winning record per key.
records: HashMap<String, GeoRecord>,
// Count of accepted writes per originating region; doubles as the sequence number source.
region_seq: HashMap<RegionId, u64>,
// Accepted local writes still awaiting delivery, keyed by the source region.
pending_fanout: HashMap<RegionId, Vec<PendingFanout>>,
}
/// An accepted local write that still has to be delivered to one or more regions.
#[derive(Debug, Clone)]
struct PendingFanout {
// Key that was written.
key: String,
// Record that was accepted for the key.
record: GeoRecord,
// Sequence number assigned to the write in its source region.
seq: u64,
// Regions that have not drained this entry yet.
targets: Vec<RegionId>,
}
impl ActiveActiveReplicator {
pub fn new(config: ActiveActiveGeoConfig, groups: Vec<RegionRaftGroup>) -> Result<Self> {
if !config.regions.contains(&config.local_region) {
return Err(ClusterError::Config(format!(
"local_region '{}' is not in the configured regions list",
config.local_region
)));
}
for g in &groups {
if !config.regions.contains(&g.region_id) {
return Err(ClusterError::Config(format!(
"Raft group region '{}' is not in the configured regions list",
g.region_id
)));
}
}
let region_seq = config
.regions
.iter()
.map(|r| (r.clone(), 0u64))
.collect::<HashMap<_, _>>();
let pending_fanout = config
.regions
.iter()
.map(|r| (r.clone(), Vec::new()))
.collect::<HashMap<_, _>>();
let groups_map = groups
.into_iter()
.map(|g| (g.region_id.clone(), g))
.collect();
let state = ReplicatorState {
groups: groups_map,
records: HashMap::new(),
region_seq,
pending_fanout,
};
Ok(Self {
config,
state: Arc::new(Mutex::new(state)),
})
}
pub fn config(&self) -> &ActiveActiveGeoConfig {
&self.config
}
pub fn apply_local_write(
&self,
region: &RegionId,
key: &str,
value: &str,
timestamp_ms: u64,
clock: VectorClock,
) -> Result<GeoWriteOutcome> {
let mut st = self.lock_state()?;
if !st.region_seq.contains_key(region) {
return Err(ClusterError::Config(format!(
"Region '{}' is not part of the active-active deployment",
region
)));
}
let new_record = GeoRecord {
region: region.clone(),
timestamp_ms,
clock,
value: value.to_owned(),
};
match self.resolve(&st, key, &new_record) {
Resolution::Accept => {
let seq = {
let counter = st.region_seq.entry(region.clone()).or_insert(0);
*counter += 1;
*counter
};
if let Some(group) = st.groups.get_mut(region) {
group.committed_seq = group.committed_seq.max(seq);
}
let targets: Vec<RegionId> = self
.config
.regions
.iter()
.filter(|r| r != ®ion)
.cloned()
.collect();
let pending = PendingFanout {
key: key.to_string(),
record: new_record.clone(),
seq,
targets,
};
st.records.insert(key.to_string(), new_record);
if let Some(queue) = st.pending_fanout.get_mut(region) {
queue.push(pending);
}
Ok(GeoWriteOutcome::Committed {
region: region.clone(),
seq,
})
}
Resolution::Reject(winner) => Ok(GeoWriteOutcome::RejectedByConflict {
region: region.clone(),
winner_region: winner.region.clone(),
winner_timestamp_ms: winner.timestamp_ms,
}),
}
}
pub fn apply_remote_write(
&self,
origin_region: &RegionId,
key: &str,
value: &str,
timestamp_ms: u64,
clock: VectorClock,
) -> Result<GeoWriteOutcome> {
let mut st = self.lock_state()?;
if !st.region_seq.contains_key(origin_region) {
return Err(ClusterError::Config(format!(
"Origin region '{}' is not part of the active-active deployment",
origin_region
)));
}
let new_record = GeoRecord {
region: origin_region.clone(),
timestamp_ms,
clock,
value: value.to_owned(),
};
match self.resolve(&st, key, &new_record) {
Resolution::Accept => {
st.records.insert(key.to_string(), new_record);
let counter = st.region_seq.entry(origin_region.clone()).or_insert(0);
*counter += 1;
let seq = *counter;
Ok(GeoWriteOutcome::Committed {
region: origin_region.clone(),
seq,
})
}
Resolution::Reject(winner) => Ok(GeoWriteOutcome::RejectedByConflict {
region: origin_region.clone(),
winner_region: winner.region.clone(),
winner_timestamp_ms: winner.timestamp_ms,
}),
}
}
pub fn drain_fanout(
&self,
source_region: &RegionId,
target_region: &RegionId,
) -> Result<Vec<(String, GeoRecord, u64)>> {
let mut st = self.lock_state()?;
let mut keep: Vec<PendingFanout> = Vec::new();
let mut emit: Vec<(String, GeoRecord, u64)> = Vec::new();
let queue = st.pending_fanout.remove(source_region).unwrap_or_default();
for mut entry in queue {
if let Some(idx) = entry.targets.iter().position(|t| t == target_region) {
emit.push((entry.key.clone(), entry.record.clone(), entry.seq));
entry.targets.remove(idx);
}
if !entry.targets.is_empty() {
keep.push(entry);
}
}
st.pending_fanout.insert(source_region.clone(), keep);
Ok(emit)
}
pub fn get_record(&self, key: &str) -> Result<Option<GeoRecord>> {
let st = self.lock_state()?;
Ok(st.records.get(key).cloned())
}
pub fn record_count(&self) -> Result<usize> {
let st = self.lock_state()?;
Ok(st.records.len())
}
pub fn region_sequences(&self) -> Result<BTreeMap<RegionId, u64>> {
let st = self.lock_state()?;
Ok(st.region_seq.iter().map(|(k, v)| (k.clone(), *v)).collect())
}
pub fn pending_fanout_len(&self, source_region: &RegionId) -> Result<usize> {
let st = self.lock_state()?;
Ok(st
.pending_fanout
.get(source_region)
.map(|v| v.len())
.unwrap_or(0))
}
fn resolve<'r>(
&self,
st: &'r ReplicatorState,
key: &str,
new_record: &GeoRecord,
) -> Resolution<'r> {
let existing = match st.records.get(key) {
Some(r) => r,
None => return Resolution::Accept,
};
match self.config.conflict_mode {
ConflictResolutionMode::LastWriterWins => lww_resolve(existing, new_record),
ConflictResolutionMode::VectorClock => vector_clock_resolve(existing, new_record),
}
}
fn lock_state(&self) -> Result<std::sync::MutexGuard<'_, ReplicatorState>> {
self.state
.lock()
.map_err(|e| ClusterError::Lock(format!("active-active state lock poisoned: {}", e)))
}
}
/// Verdict of conflict resolution for an incoming write.
enum Resolution<'r> {
/// The incoming write may replace whatever is stored.
Accept,
/// The incoming write loses; carries a reference to the stored record that wins.
Reject(&'r GeoRecord),
}
/// Last-writer-wins resolution.
///
/// The incoming record is accepted only when its `(timestamp_ms, region)` pair is strictly
/// greater than the stored record's: a newer timestamp wins outright, and on equal
/// timestamps the larger region id wins. Anything else keeps the stored record.
fn lww_resolve<'r>(existing: &'r GeoRecord, new_record: &GeoRecord) -> Resolution<'r> {
    let incoming_rank = (new_record.timestamp_ms, &new_record.region);
    let stored_rank = (existing.timestamp_ms, &existing.region);
    // Tuples compare lexicographically: timestamp first, region id as the tie-breaker.
    if incoming_rank > stored_rank {
        Resolution::Accept
    } else {
        Resolution::Reject(existing)
    }
}
/// Vector-clock resolution with a last-writer-wins fallback.
///
/// The incoming record is accepted when the stored clock `happens_before` the incoming
/// clock and rejected when the incoming clock `happens_before` the stored one. When
/// neither holds, the decision is delegated to `lww_resolve`.
fn vector_clock_resolve<'r>(existing: &'r GeoRecord, new_record: &GeoRecord) -> Resolution<'r> {
    let stored_clock = &existing.clock;
    let incoming_clock = &new_record.clock;
    if stored_clock.happens_before(incoming_clock) {
        return Resolution::Accept;
    }
    if incoming_clock.happens_before(stored_clock) {
        return Resolution::Reject(existing);
    }
    lww_resolve(existing, new_record)
}
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields 0 when the system clock reports a time before the epoch.
pub fn current_timestamp_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Three-region configuration with `us-east-1` as the local region.
    fn three_region_config() -> ActiveActiveGeoConfig {
        ActiveActiveGeoConfig::multi_region(
            "us-east-1",
            vec![
                "us-east-1".to_string(),
                "eu-west-1".to_string(),
                "ap-northeast-1".to_string(),
            ],
        )
    }

    /// Replicator over the three-region configuration with one Raft group per region.
    fn replicator() -> ActiveActiveReplicator {
        let config = three_region_config();
        let groups = config
            .regions
            .iter()
            .enumerate()
            .map(|(i, r)| RegionRaftGroup::new(r.clone(), 1_000 + i as u64, vec![1, 2, 3]))
            .collect();
        ActiveActiveReplicator::new(config, groups).expect("valid config")
    }

    #[test]
    fn route_default_uses_local_region_for_unmatched_writes() {
        let cfg = three_region_config();
        let decision = cfg.route("http://example.org/s", None);
        assert_eq!(decision.primary_region, "us-east-1");
        assert!(!decision.fanout_regions.contains(&"us-east-1".to_string()));
        assert_eq!(decision.fanout_regions.len(), 2);
    }

    #[test]
    fn route_matches_subject_prefix_rule() {
        let mut cfg = three_region_config();
        // Inserted at the front so it is evaluated before the catch-all default rule.
        cfg.routing_rules.insert(
            0,
            RegionRoutingRule {
                name: "eu-tenant".into(),
                subject_prefix: Some("https://eu.example.org/".into()),
                tenant_id: None,
                target_region: "eu-west-1".into(),
                consistency: ConsistencyLevel::EachQuorum,
            },
        );
        let decision = cfg.route("https://eu.example.org/foo", None);
        assert_eq!(decision.primary_region, "eu-west-1");
        assert_eq!(decision.consistency, ConsistencyLevel::EachQuorum);
    }

    #[test]
    fn local_write_assigns_monotonic_seq() {
        let r = replicator();
        let region: RegionId = "us-east-1".into();
        for i in 0..5 {
            let outcome = r
                .apply_local_write(
                    &region,
                    &format!("key-{}", i),
                    &format!("v{}", i),
                    current_timestamp_ms(),
                    VectorClock::new(),
                )
                .expect("apply");
            match outcome {
                GeoWriteOutcome::Committed { seq, .. } => assert_eq!(seq, (i + 1) as u64),
                _ => panic!("unexpected outcome"),
            }
        }
        assert_eq!(r.record_count().expect("count"), 5);
    }

    #[test]
    fn lww_newer_timestamp_wins() {
        let r = replicator();
        let us: RegionId = "us-east-1".into();
        let eu: RegionId = "eu-west-1".into();
        r.apply_local_write(&us, "k", "from-us", 100, VectorClock::new())
            .expect("first");
        let outcome = r
            .apply_remote_write(&eu, "k", "from-eu", 200, VectorClock::new())
            .expect("second");
        assert!(matches!(outcome, GeoWriteOutcome::Committed { .. }));
        let rec = r.get_record("k").expect("get").expect("present");
        assert_eq!(rec.value, "from-eu");
        assert_eq!(rec.region, "eu-west-1");
    }

    #[test]
    fn lww_older_timestamp_rejected() {
        let r = replicator();
        let us: RegionId = "us-east-1".into();
        let eu: RegionId = "eu-west-1".into();
        r.apply_local_write(&us, "k", "from-us", 200, VectorClock::new())
            .expect("first");
        let outcome = r
            .apply_remote_write(&eu, "k", "from-eu", 100, VectorClock::new())
            .expect("second");
        assert!(matches!(
            outcome,
            GeoWriteOutcome::RejectedByConflict { .. }
        ));
        let rec = r.get_record("k").expect("get").expect("present");
        assert_eq!(rec.value, "from-us");
    }

    #[test]
    fn lww_tie_breaks_on_region_id() {
        let r = replicator();
        let us: RegionId = "us-east-1".into();
        let eu: RegionId = "eu-west-1".into();
        r.apply_local_write(&us, "k", "from-us", 100, VectorClock::new())
            .expect("first");
        // Same timestamp: "eu-west-1" sorts below "us-east-1", so the incoming write loses.
        let outcome = r
            .apply_remote_write(&eu, "k", "from-eu", 100, VectorClock::new())
            .expect("second");
        assert!(matches!(
            outcome,
            GeoWriteOutcome::RejectedByConflict { .. }
        ));
        let rec = r.get_record("k").expect("get").expect("present");
        assert_eq!(rec.value, "from-us");
    }

    #[test]
    fn vector_clock_mode_accepts_strictly_later() {
        let mut cfg = three_region_config();
        cfg.conflict_mode = ConflictResolutionMode::VectorClock;
        let groups = cfg
            .regions
            .iter()
            .enumerate()
            .map(|(i, r)| RegionRaftGroup::new(r.clone(), 5_000 + i as u64, vec![1, 2, 3]))
            .collect();
        let r = ActiveActiveReplicator::new(cfg, groups).expect("valid");
        let mut clk1 = VectorClock::new();
        clk1.increment(1);
        r.apply_local_write(&"us-east-1".into(), "k", "v1", 100, clk1.clone())
            .expect("first");
        // The second clock extends the first, and the write carries an older timestamp:
        // the clock ordering, not the timestamp, must decide.
        let mut clk2 = clk1.clone();
        clk2.increment(2);
        let outcome = r
            .apply_remote_write(&"eu-west-1".into(), "k", "v2", 50, clk2)
            .expect("second");
        assert!(matches!(outcome, GeoWriteOutcome::Committed { .. }));
        let rec = r.get_record("k").expect("get").expect("present");
        assert_eq!(rec.value, "v2");
    }

    #[test]
    fn fanout_drain_routes_only_to_target() {
        let r = replicator();
        let us: RegionId = "us-east-1".into();
        r.apply_local_write(&us, "k", "v", 100, VectorClock::new())
            .expect("apply");
        assert_eq!(r.pending_fanout_len(&us).expect("len"), 1);
        let drained_eu = r.drain_fanout(&us, &"eu-west-1".into()).expect("drain eu");
        assert_eq!(drained_eu.len(), 1);
        assert_eq!(drained_eu[0].0, "k");
        // The entry stays queued until the remaining target has drained it as well.
        assert_eq!(r.pending_fanout_len(&us).expect("len"), 1);
        let drained_ap = r
            .drain_fanout(&us, &"ap-northeast-1".into())
            .expect("drain ap");
        assert_eq!(drained_ap.len(), 1);
        assert_eq!(r.pending_fanout_len(&us).expect("len"), 0);
    }

    #[test]
    fn config_rejects_unknown_local_region() {
        let cfg = ActiveActiveGeoConfig {
            regions: vec!["us-east-1".into()],
            local_region: "eu-west-1".into(),
            primary_tier: BTreeMap::new(),
            routing_rules: Vec::new(),
            conflict_mode: ConflictResolutionMode::default(),
            max_lag: Duration::from_secs(5),
        };
        let res = ActiveActiveReplicator::new(cfg, Vec::new());
        assert!(res.is_err());
    }
}