#[cfg(feature = "embedded-sync")]
mod tests {
use allsource_core::{
cluster::{
ConflictResolution, CrdtResolver, HlcTimestamp, HybridLogicalClock, ReplicatedEvent,
VersionVector,
},
embedded::{Config, EmbeddedCore, IngestEvent, Query},
};
use serde_json::json;
#[test]
fn hlc_provides_total_order_across_nodes() {
let clock_a = HybridLogicalClock::new(1);
let clock_b = HybridLogicalClock::new(2);
let t1 = clock_a.now();
let t2 = clock_b.now();
assert_ne!(t1, t2);
assert!(t1 < t2 || t2 < t1);
}
#[test]
fn hlc_respects_causality_after_receive() {
let clock_a = HybridLogicalClock::new(1);
let clock_b = HybridLogicalClock::new(2);
let t1 = clock_a.now();
let _ = clock_b.receive(&t1).unwrap();
let t2 = clock_b.now();
assert!(t2 > t1);
}
#[test]
fn hlc_logical_counter_breaks_same_ms_ties() {
let clock = HybridLogicalClock::new(1);
let t1 = clock.now();
let t2 = clock.now();
assert!(t2 > t1);
}
#[test]
fn hlc_rejects_excessive_clock_drift() {
let clock = HybridLogicalClock::with_max_drift(1, 1000);
let future_ts = HlcTimestamp::new(
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64
+ 60_000, 0,
99,
);
let result = clock.receive(&future_ts);
assert!(result.is_err()); }
#[test]
fn version_vector_tracks_per_node_progress() {
let mut vv = VersionVector::new();
let ts1 = HlcTimestamp::new(1000, 0, 1);
let ts2 = HlcTimestamp::new(2000, 0, 1);
vv.advance("node-1", ts1);
assert_eq!(vv.get("node-1").unwrap().physical_ms, 1000);
vv.advance("node-1", ts2);
assert_eq!(vv.get("node-1").unwrap().physical_ms, 2000);
}
#[test]
fn version_vector_detects_unseen_events() {
let mut vv = VersionVector::new();
let ts_old = HlcTimestamp::new(1000, 0, 1);
let ts_new = HlcTimestamp::new(2000, 0, 1);
vv.advance("node-1", ts_old);
assert!(vv.is_new("node-1", &ts_new));
assert!(!vv.is_new("node-1", &ts_old));
}
#[test]
fn version_vector_merge_is_commutative() {
let mut a = VersionVector::new();
let mut b = VersionVector::new();
a.advance("node-1", HlcTimestamp::new(5000, 0, 1));
b.advance("node-2", HlcTimestamp::new(3000, 0, 2));
let mut ab = a.clone();
ab.merge(&b);
let mut ba = b.clone();
ba.merge(&a);
assert_eq!(ab.get("node-1"), ba.get("node-1"));
assert_eq!(ab.get("node-2"), ba.get("node-2"));
}
#[test]
fn version_vector_merge_takes_max() {
let mut a = VersionVector::new();
let mut b = VersionVector::new();
a.advance("node-1", HlcTimestamp::new(5000, 0, 1));
a.advance("node-2", HlcTimestamp::new(1000, 0, 2));
b.advance("node-1", HlcTimestamp::new(3000, 0, 1));
b.advance("node-2", HlcTimestamp::new(8000, 0, 2));
a.merge(&b);
assert_eq!(a.get("node-1").unwrap().physical_ms, 5000);
assert_eq!(a.get("node-2").unwrap().physical_ms, 8000);
}
#[test]
fn crdt_resolver_accepts_new_event() {
let resolver = CrdtResolver::new();
let event = ReplicatedEvent {
event_id: "evt-1".to_string(),
hlc_timestamp: HlcTimestamp::new(1000, 0, 1),
origin_region: "node-1".to_string(),
event_data: json!({"key": "value"}),
};
let result = resolver.resolve(&event);
assert_eq!(result, ConflictResolution::Accept);
}
#[test]
fn crdt_resolver_skips_duplicate_event() {
let resolver = CrdtResolver::new();
let event = ReplicatedEvent {
event_id: "evt-1".to_string(),
hlc_timestamp: HlcTimestamp::new(1000, 0, 1),
origin_region: "node-1".to_string(),
event_data: json!({"key": "value"}),
};
let _ = resolver.resolve_and_accept(&event);
let result = resolver.resolve(&event);
assert_eq!(result, ConflictResolution::Skip);
}
#[tokio::test]
async fn two_instances_sync_after_independent_writes() {
let core_a = open_core_with_node(1).await;
let core_b = open_core_with_node(2).await;
core_a
.ingest(IngestEvent {
entity_id: "doc-1",
event_type: "doc.edited",
payload: json!({"text": "hello from A"}),
metadata: None,
tenant_id: None,
})
.await
.unwrap();
core_b
.ingest(IngestEvent {
entity_id: "doc-2",
event_type: "doc.edited",
payload: json!({"text": "hello from B"}),
metadata: None,
tenant_id: None,
})
.await
.unwrap();
sync_pair(&core_a, &core_b).await.unwrap();
let a_events = core_a
.query(Query::new().event_type_prefix("doc."))
.await
.unwrap();
let b_events = core_b
.query(Query::new().event_type_prefix("doc."))
.await
.unwrap();
assert_eq!(a_events.len(), 2);
assert_eq!(b_events.len(), 2);
}
#[tokio::test]
async fn sync_is_idempotent() {
let core_a = open_core_with_node(1).await;
let core_b = open_core_with_node(2).await;
core_a
.ingest(IngestEvent {
entity_id: "e1",
event_type: "test.created",
payload: json!({}),
metadata: None,
tenant_id: None,
})
.await
.unwrap();
sync_pair(&core_a, &core_b).await.unwrap();
sync_pair(&core_a, &core_b).await.unwrap();
sync_pair(&core_a, &core_b).await.unwrap();
assert_eq!(core_a.stats().total_events, 1);
assert_eq!(core_b.stats().total_events, 1);
}
#[tokio::test]
async fn sync_conflict_last_write_wins() {
let core_a = open_core_with_node(1).await;
let core_b = open_core_with_node(2).await;
core_a
.ingest(IngestEvent {
entity_id: "config-1",
event_type: "config.updated",
payload: json!({"theme": "dark"}),
metadata: None,
tenant_id: None,
})
.await
.unwrap();
core_b
.ingest(IngestEvent {
entity_id: "config-1",
event_type: "config.updated",
payload: json!({"theme": "light"}),
metadata: None,
tenant_id: None,
})
.await
.unwrap();
sync_pair(&core_a, &core_b).await.unwrap();
let a_state = core_a.projection("entity_snapshots", "config-1");
let b_state = core_b.projection("entity_snapshots", "config-1");
assert_eq!(a_state, b_state);
assert_eq!(a_state.unwrap()["theme"], "light");
}
#[tokio::test]
async fn offline_queue_drains_on_reconnect() {
let core_local = open_core_with_node(1).await;
let core_cloud = open_core_with_node(2).await;
for i in 0..10 {
core_local
.ingest(IngestEvent {
entity_id: &format!("item-{i}"),
event_type: "item.created",
payload: json!({"i": i}),
metadata: None,
tenant_id: None,
})
.await
.unwrap();
}
assert_eq!(core_local.stats().total_events, 10);
assert_eq!(core_cloud.stats().total_events, 0);
sync_pair(&core_local, &core_cloud).await.unwrap();
assert_eq!(core_cloud.stats().total_events, 10);
}
#[tokio::test]
async fn sync_preserves_event_ordering() {
let core_a = open_core_with_node(1).await;
let core_b = open_core_with_node(2).await;
for i in 0..5 {
core_a
.ingest(IngestEvent {
entity_id: "seq-entity",
event_type: "step.completed",
payload: json!({"step": i}),
metadata: None,
tenant_id: None,
})
.await
.unwrap();
}
sync_pair(&core_a, &core_b).await.unwrap();
let b_events = core_b
.query(Query::new().entity_id("seq-entity"))
.await
.unwrap();
assert_eq!(b_events.len(), 5);
for i in 1..b_events.len() {
assert!(b_events[i].timestamp >= b_events[i - 1].timestamp);
}
}
#[test]
fn test_append_only_strategy_accepts_all() {
use allsource_core::cluster::MergeStrategy;
let resolver =
CrdtResolver::with_strategies(vec![("test.".to_string(), MergeStrategy::AppendOnly)]);
let event1 = ReplicatedEvent {
event_id: "evt-1".to_string(),
hlc_timestamp: HlcTimestamp::new(1000, 0, 1),
origin_region: "node-1".to_string(),
event_data: json!({"event_type": "test.created", "entity_id": "e-1"}),
};
let event2 = ReplicatedEvent {
event_id: "evt-2".to_string(),
hlc_timestamp: HlcTimestamp::new(2000, 0, 1),
origin_region: "node-1".to_string(),
event_data: json!({"event_type": "test.created", "entity_id": "e-1"}),
};
assert_eq!(
resolver.resolve_and_accept(&event1),
ConflictResolution::Accept
);
assert_eq!(
resolver.resolve_and_accept(&event2),
ConflictResolution::Accept
);
}
#[test]
fn test_lww_strategy_keeps_latest() {
use allsource_core::cluster::MergeStrategy;
let resolver = CrdtResolver::with_strategies(vec![(
"config.".to_string(),
MergeStrategy::LastWriteWins,
)]);
let newer = ReplicatedEvent {
event_id: "evt-2".to_string(),
hlc_timestamp: HlcTimestamp::new(2000, 0, 1),
origin_region: "node-1".to_string(),
event_data: json!({"event_type": "config.updated", "entity_id": "cfg-1"}),
};
let older = ReplicatedEvent {
event_id: "evt-1".to_string(),
hlc_timestamp: HlcTimestamp::new(1000, 0, 2),
origin_region: "node-2".to_string(),
event_data: json!({"event_type": "config.updated", "entity_id": "cfg-1"}),
};
assert_eq!(
resolver.resolve_and_accept(&newer),
ConflictResolution::Accept
);
assert_eq!(resolver.resolve(&older), ConflictResolution::Skip);
}
#[test]
fn test_fww_strategy_keeps_first() {
use allsource_core::cluster::MergeStrategy;
let resolver = CrdtResolver::with_strategies(vec![(
"init.".to_string(),
MergeStrategy::FirstWriteWins,
)]);
let first = ReplicatedEvent {
event_id: "evt-1".to_string(),
hlc_timestamp: HlcTimestamp::new(1000, 0, 1),
origin_region: "node-1".to_string(),
event_data: json!({"event_type": "init.setup", "entity_id": "app-1"}),
};
let second = ReplicatedEvent {
event_id: "evt-2".to_string(),
hlc_timestamp: HlcTimestamp::new(2000, 0, 2),
origin_region: "node-2".to_string(),
event_data: json!({"event_type": "init.setup", "entity_id": "app-1"}),
};
assert_eq!(
resolver.resolve_and_accept(&first),
ConflictResolution::Accept
);
assert_eq!(resolver.resolve(&second), ConflictResolution::Skip);
}
#[test]
fn test_strategy_prefix_matching() {
use allsource_core::cluster::MergeStrategy;
let resolver = CrdtResolver::with_strategies(vec![(
"config.".to_string(),
MergeStrategy::LastWriteWins,
)]);
let event = ReplicatedEvent {
event_id: "evt-1".to_string(),
hlc_timestamp: HlcTimestamp::new(2000, 0, 1),
origin_region: "node-1".to_string(),
event_data: json!({"event_type": "config.updated", "entity_id": "cfg-1"}),
};
resolver.resolve_and_accept(&event);
let older = ReplicatedEvent {
event_id: "evt-2".to_string(),
hlc_timestamp: HlcTimestamp::new(1000, 0, 2),
origin_region: "node-2".to_string(),
event_data: json!({"event_type": "config.updated", "entity_id": "cfg-1"}),
};
assert_eq!(resolver.resolve(&older), ConflictResolution::Skip);
let deleted = ReplicatedEvent {
event_id: "evt-3".to_string(),
hlc_timestamp: HlcTimestamp::new(3000, 0, 1),
origin_region: "node-1".to_string(),
event_data: json!({"event_type": "config.deleted", "entity_id": "cfg-1"}),
};
assert_eq!(resolver.resolve(&deleted), ConflictResolution::Accept);
}
#[test]
fn test_default_fallback_when_no_match() {
use allsource_core::cluster::MergeStrategy;
let resolver = CrdtResolver::with_strategies(vec![(
"config.".to_string(),
MergeStrategy::FirstWriteWins,
)]);
let order1 = ReplicatedEvent {
event_id: "evt-1".to_string(),
hlc_timestamp: HlcTimestamp::new(1000, 0, 1),
origin_region: "node-1".to_string(),
event_data: json!({"event_type": "order.placed", "entity_id": "o-1"}),
};
let order2 = ReplicatedEvent {
event_id: "evt-2".to_string(),
hlc_timestamp: HlcTimestamp::new(2000, 0, 1),
origin_region: "node-1".to_string(),
event_data: json!({"event_type": "order.placed", "entity_id": "o-1"}),
};
assert_eq!(
resolver.resolve_and_accept(&order1),
ConflictResolution::Accept
);
assert_eq!(
resolver.resolve_and_accept(&order2),
ConflictResolution::Accept
);
}
async fn open_core_with_node(node_id: u32) -> EmbeddedCore {
EmbeddedCore::open(Config::builder().node_id(node_id).build().unwrap())
.await
.unwrap()
}
async fn sync_pair(a: &EmbeddedCore, b: &EmbeddedCore) -> allsource_core::error::Result<()> {
a.sync_to(b).await?;
b.sync_to(a).await?;
Ok(())
}
}