use core::num::NonZeroU128;
use std::collections::BTreeSet;
use calimero_primitives::identity::PublicKey;
use calimero_storage::action::Action;
use calimero_storage::address::Id;
use calimero_storage::entities::{ChildInfo, Metadata};
use calimero_storage::index::Index;
use calimero_storage::interface::{
disable_nonce_check_for_testing, ApplyContext, Interface, StorageError,
};
use calimero_storage::logical_clock::{HybridTimestamp, Timestamp, ID, NTP64};
use calimero_storage::rotation_log;
use calimero_storage::store::{MockedStorage, StorageAdaptor};
use calimero_storage::tests::common::{build_signed_shared_action, pubkey_of};
use ed25519_dalek::SigningKey;
use crate::sync::rotation_log_reader;
use crate::sync::test_helpers::Dag;
struct Delta {
id: [u8; 32],
parents: Vec<[u8; 32]>,
hlc_ns: u64,
action: Action,
}
fn hlc(ns: u64) -> HybridTimestamp {
let node_id = ID::from(NonZeroU128::new(1).unwrap());
HybridTimestamp::new(Timestamp::new(NTP64(ns), node_id))
}
fn deliver<S: StorageAdaptor>(delta: &Delta, dag: &Dag) -> Result<(), StorageError> {
let entity_id = delta.action.id();
let effective_writers: Option<
std::collections::BTreeMap<PublicKey, calimero_storage::entities::OpMask>,
> = match rotation_log::load::<S>(entity_id)? {
Some(log) => {
rotation_log_reader::writers_at(&log, &delta.parents, |a, b| dag.happens_before(a, b))
}
None => None,
};
let ctx = ApplyContext {
effective_writers,
delta_id: Some(delta.id),
delta_hlc: Some(hlc(delta.hlc_ns)),
};
Interface::<S>::apply_action(delta.action.clone(), &ctx)
}
fn make_signing_key(seed: u8) -> SigningKey {
SigningKey::from_bytes(&[seed; 32])
}
fn setup_root<S: StorageAdaptor>() -> ChildInfo {
let root_id = Id::root();
let root_meta = Metadata::default();
Index::<S>::add_root(ChildInfo::new(root_id, [0; 32], root_meta.clone())).unwrap();
let (full_hash, _) = Index::<S>::get_hashes_for(root_id).unwrap().unwrap();
ChildInfo::new(root_id, full_hash, root_meta)
}
fn one_sec(n: u64) -> u64 {
n.saturating_mul(1_000_000_000)
}
fn writers_at_frontier<S: StorageAdaptor, F>(
id: Id,
frontier: &[[u8; 32]],
happens_before: F,
) -> Option<std::collections::BTreeMap<PublicKey, calimero_storage::entities::OpMask>>
where
F: Fn(&[u8; 32], &[u8; 32]) -> bool,
{
let log = rotation_log::load::<S>(id).unwrap()?;
rotation_log_reader::writers_at(&log, frontier, happens_before)
}
#[test]
fn update_vs_rotation_race_pre_rotation_write_accepted() {
let _nonce_off = disable_nonce_check_for_testing();
type Carol = MockedStorage<5500>;
let root = setup_root::<Carol>();
let alice_sk = make_signing_key(0xA1);
let bob_sk = make_signing_key(0xB1);
let alice = pubkey_of(&alice_sk);
let bob = pubkey_of(&bob_sk);
let id = Id::new([0x50; 32]);
let mut dag = Dag::new();
let d_root_id = [0xD0; 32];
let d_root = Delta {
id: d_root_id,
parents: vec![],
hlc_ns: one_sec(10),
action: build_signed_shared_action(
true,
id,
b"hello".to_vec(),
[alice, bob].into_iter().collect(),
one_sec(10),
&alice_sk,
vec![root.clone()],
),
};
dag.record(d_root.id, d_root.parents.clone());
deliver::<Carol>(&d_root, &dag).expect("bootstrap delivered to Carol");
let d1_id = [0xD1; 32];
let d1 = Delta {
id: d1_id,
parents: vec![d_root_id],
hlc_ns: one_sec(20),
action: build_signed_shared_action(
false,
id,
b"hello".to_vec(),
[alice].into_iter().collect(), one_sec(20),
&alice_sk,
vec![],
),
};
dag.record(d1.id, d1.parents.clone());
let d2_id = [0xD2; 32];
let d2 = Delta {
id: d2_id,
parents: vec![d_root_id],
hlc_ns: one_sec(21),
action: build_signed_shared_action(
false,
id,
b"world".to_vec(),
[alice, bob].into_iter().collect(), one_sec(21),
&bob_sk,
vec![],
),
};
dag.record(d2.id, d2.parents.clone());
deliver::<Carol>(&d1, &dag).expect("rotation delivered");
deliver::<Carol>(&d2, &dag).expect(
"Bob's pre-rotation write must be accepted — writers_at(D2.parents=[D_root]) \
includes Bob, even though stored writers post-D1 is {Alice}",
);
let log = rotation_log::load::<Carol>(id).unwrap().unwrap();
assert_eq!(log.entries.len(), 2, "log has D_root and D1");
assert_eq!(log.entries[0].delta_id, d_root_id);
assert_eq!(log.entries[1].delta_id, d1_id);
}
#[test]
fn self_removal_mid_flight_pre_accepted_post_rejected() {
let _nonce_off = disable_nonce_check_for_testing();
type Carol = MockedStorage<5510>;
let root = setup_root::<Carol>();
let alice_sk = make_signing_key(0xA2);
let bob_sk = make_signing_key(0xB2);
let alice = pubkey_of(&alice_sk);
let bob = pubkey_of(&bob_sk);
let id = Id::new([0x51; 32]);
let mut dag = Dag::new();
let d_root_id = [0xE0; 32];
let d_root = Delta {
id: d_root_id,
parents: vec![],
hlc_ns: one_sec(10),
action: build_signed_shared_action(
true,
id,
b"v0".to_vec(),
[alice, bob].into_iter().collect(),
one_sec(10),
&alice_sk,
vec![root.clone()],
),
};
dag.record(d_root.id, d_root.parents.clone());
deliver::<Carol>(&d_root, &dag).unwrap();
let d2_id = [0xE2; 32];
let d2 = Delta {
id: d2_id,
parents: vec![d_root_id],
hlc_ns: one_sec(15),
action: build_signed_shared_action(
false,
id,
b"alice-pre".to_vec(),
[alice, bob].into_iter().collect(),
one_sec(15),
&alice_sk,
vec![],
),
};
dag.record(d2.id, d2.parents.clone());
let d1_id = [0xE1; 32];
let d1 = Delta {
id: d1_id,
parents: vec![d2_id],
hlc_ns: one_sec(20),
action: build_signed_shared_action(
false,
id,
b"alice-pre".to_vec(),
[bob].into_iter().collect(), one_sec(20),
&alice_sk,
vec![],
),
};
dag.record(d1.id, d1.parents.clone());
let d3_id = [0xE3; 32];
let d3 = Delta {
id: d3_id,
parents: vec![d1_id],
hlc_ns: one_sec(25),
action: build_signed_shared_action(
false,
id,
b"alice-post".to_vec(),
[bob].into_iter().collect(),
one_sec(25),
&alice_sk,
vec![],
),
};
dag.record(d3.id, d3.parents.clone());
deliver::<Carol>(&d1, &dag).expect("rotation accepted");
deliver::<Carol>(&d2, &dag).expect(
"Alice's pre-rotation write accepted — D2 happens-before D1 in DAG, \
writers_at(D2.parents=[D_root]) includes Alice",
);
let post_result = deliver::<Carol>(&d3, &dag);
assert!(
matches!(post_result, Err(StorageError::InvalidSignature)),
"post-rotation write by removed writer must be rejected; got {post_result:?}",
);
}
#[test]
fn concurrent_conflicting_rotations_deterministic_convergence() {
let _nonce_off = disable_nonce_check_for_testing();
type Carol = MockedStorage<5520>;
type Dave = MockedStorage<5521>;
let carol_root = setup_root::<Carol>();
let dave_root = setup_root::<Dave>();
let alice_sk = make_signing_key(0xA3);
let bob_sk = make_signing_key(0xB3);
let alice = pubkey_of(&alice_sk);
let bob = pubkey_of(&bob_sk);
let id = Id::new([0x52; 32]);
let mut dag = Dag::new();
let d_root_id = [0xF0; 32];
let d_root_carol = Delta {
id: d_root_id,
parents: vec![],
hlc_ns: one_sec(10),
action: build_signed_shared_action(
true,
id,
b"v0".to_vec(),
[alice, bob].into_iter().collect(),
one_sec(10),
&alice_sk,
vec![carol_root.clone()],
),
};
let d_root_dave = Delta {
id: d_root_id,
parents: vec![],
hlc_ns: one_sec(10),
action: build_signed_shared_action(
true,
id,
b"v0".to_vec(),
[alice, bob].into_iter().collect(),
one_sec(10),
&alice_sk,
vec![dave_root.clone()],
),
};
dag.record(d_root_carol.id, d_root_carol.parents.clone());
deliver::<Carol>(&d_root_carol, &dag).unwrap();
deliver::<Dave>(&d_root_dave, &dag).unwrap();
let d1_id = [0xF1; 32];
let d1 = Delta {
id: d1_id,
parents: vec![d_root_id],
hlc_ns: one_sec(20),
action: build_signed_shared_action(
false,
id,
b"v0".to_vec(),
[alice].into_iter().collect(),
one_sec(20),
&alice_sk,
vec![],
),
};
dag.record(d1.id, d1.parents.clone());
let d2_id = [0xF2; 32];
let d2 = Delta {
id: d2_id,
parents: vec![d_root_id],
hlc_ns: one_sec(21),
action: build_signed_shared_action(
false,
id,
b"v0".to_vec(),
[bob].into_iter().collect(),
one_sec(21),
&bob_sk,
vec![],
),
};
dag.record(d2.id, d2.parents.clone());
deliver::<Carol>(&d1, &dag).expect("D1 by Alice accepted on Carol");
deliver::<Carol>(&d2, &dag).expect("D2 by Bob accepted on Carol (concurrent with D1)");
deliver::<Dave>(&d2, &dag).expect("D2 by Bob accepted on Dave");
deliver::<Dave>(&d1, &dag).expect("D1 by Alice accepted on Dave (concurrent with D2)");
let carol_log = rotation_log::load::<Carol>(id).unwrap().unwrap();
let dave_log = rotation_log::load::<Dave>(id).unwrap().unwrap();
assert_eq!(carol_log.entries.len(), 3);
assert_eq!(dave_log.entries.len(), 3);
let causal_frontier = [d1_id, d2_id];
let happens_before = |a: &[u8; 32], b: &[u8; 32]| dag.happens_before(a, b);
let carol_writers =
writers_at_frontier::<Carol, _>(id, &causal_frontier, &happens_before).unwrap();
let dave_writers =
writers_at_frontier::<Dave, _>(id, &causal_frontier, &happens_before).unwrap();
assert_eq!(carol_writers, dave_writers, "deterministic convergence");
assert_eq!(
carol_writers,
[bob]
.into_iter()
.map(|k| (k, calimero_storage::entities::OpMask::FULL))
.collect::<std::collections::BTreeMap<_, _>>(),
"D2 (HLC 21) wins over D1 (HLC 20)"
);
}
#[test]
fn long_partition_reconciliation_converges() {
let _nonce_off = disable_nonce_check_for_testing();
type Left = MockedStorage<5530>;
type Right = MockedStorage<5531>;
let left_root = setup_root::<Left>();
let right_root = setup_root::<Right>();
let alice_sk = make_signing_key(0xA4);
let bob_sk = make_signing_key(0xB4);
let carol_sk = make_signing_key(0xC4);
let dave_sk = make_signing_key(0xD4);
let alice = pubkey_of(&alice_sk);
let bob = pubkey_of(&bob_sk);
let carol = pubkey_of(&carol_sk);
let dave = pubkey_of(&dave_sk);
let id = Id::new([0x53; 32]);
let mut dag = Dag::new();
let g0 = [0x10; 32];
let bootstrap_left = Delta {
id: g0,
parents: vec![],
hlc_ns: one_sec(10),
action: build_signed_shared_action(
true,
id,
b"v0".to_vec(),
[alice, bob].into_iter().collect(),
one_sec(10),
&alice_sk,
vec![left_root.clone()],
),
};
let bootstrap_right = Delta {
id: g0,
parents: vec![],
hlc_ns: one_sec(10),
action: build_signed_shared_action(
true,
id,
b"v0".to_vec(),
[alice, bob].into_iter().collect(),
one_sec(10),
&alice_sk,
vec![right_root.clone()],
),
};
dag.record(g0, vec![]);
deliver::<Left>(&bootstrap_left, &dag).unwrap();
deliver::<Right>(&bootstrap_right, &dag).unwrap();
let l1 = [0x11; 32];
let l1_delta = Delta {
id: l1,
parents: vec![g0],
hlc_ns: one_sec(20),
action: build_signed_shared_action(
false,
id,
b"v0".to_vec(),
[alice, carol].into_iter().collect(),
one_sec(20),
&alice_sk,
vec![],
),
};
dag.record(l1, vec![g0]);
deliver::<Left>(&l1_delta, &dag).unwrap();
let l2 = [0x12; 32];
let l2_delta = Delta {
id: l2,
parents: vec![l1],
hlc_ns: one_sec(30),
action: build_signed_shared_action(
false,
id,
b"left".to_vec(),
[alice, carol].into_iter().collect(),
one_sec(30),
&carol_sk,
vec![],
),
};
dag.record(l2, vec![l1]);
deliver::<Left>(&l2_delta, &dag).unwrap();
let r1 = [0x21; 32];
let r1_delta = Delta {
id: r1,
parents: vec![g0],
hlc_ns: one_sec(25),
action: build_signed_shared_action(
false,
id,
b"v0".to_vec(),
[bob, dave].into_iter().collect(),
one_sec(25),
&bob_sk,
vec![],
),
};
dag.record(r1, vec![g0]);
deliver::<Right>(&r1_delta, &dag).unwrap();
let r2 = [0x22; 32];
let r2_delta = Delta {
id: r2,
parents: vec![r1],
hlc_ns: one_sec(35),
action: build_signed_shared_action(
false,
id,
b"right".to_vec(),
[bob, dave].into_iter().collect(),
one_sec(35),
&dave_sk,
vec![],
),
};
dag.record(r2, vec![r1]);
deliver::<Right>(&r2_delta, &dag).unwrap();
deliver::<Left>(&r1_delta, &dag).expect("R1 (Bob's rotation) accepted on Left");
deliver::<Left>(&r2_delta, &dag).expect("R2 (Dave's write) accepted on Left");
deliver::<Right>(&l1_delta, &dag).expect("L1 (Alice's rotation) accepted on Right");
deliver::<Right>(&l2_delta, &dag).expect("L2 (Carol's write) accepted on Right");
let frontier = [l2, r2];
let hb = |a: &[u8; 32], b: &[u8; 32]| dag.happens_before(a, b);
let left_writers = writers_at_frontier::<Left, _>(id, &frontier, &hb).unwrap();
let right_writers = writers_at_frontier::<Right, _>(id, &frontier, &hb).unwrap();
assert_eq!(
left_writers, right_writers,
"both sides converge on the same writer set as-of {{L2, R2}}"
);
assert_eq!(
left_writers,
[bob, dave]
.into_iter()
.map(|k| (k, calimero_storage::entities::OpMask::FULL))
.collect::<std::collections::BTreeMap<_, _>>(),
"R1 (HLC 25) wins HLC tiebreak vs L1 (HLC 20)"
);
}
#[test]
fn writer_set_diverges_when_rotation_reconciled_via_hc_until_log_union() {
let _nonce_off = disable_nonce_check_for_testing();
type Both = MockedStorage<5540>;
type Hc = MockedStorage<5541>;
let both_root = setup_root::<Both>();
let hc_root = setup_root::<Hc>();
let alice_sk = make_signing_key(0xA5);
let bob_sk = make_signing_key(0xB5);
let alice = pubkey_of(&alice_sk);
let bob = pubkey_of(&bob_sk);
let carol = pubkey_of(&make_signing_key(0xC5));
let id = Id::new([0x55; 32]);
let mut dag = Dag::new();
let g0 = [0x50; 32];
let mk_bootstrap = |root: ChildInfo| Delta {
id: g0,
parents: vec![],
hlc_ns: one_sec(10),
action: build_signed_shared_action(
true,
id,
b"v0".to_vec(),
[alice, bob].into_iter().collect(),
one_sec(10),
&alice_sk,
vec![root],
),
};
dag.record(g0, vec![]);
deliver::<Both>(&mk_bootstrap(both_root.clone()), &dag).unwrap();
deliver::<Hc>(&mk_bootstrap(hc_root.clone()), &dag).unwrap();
let d_add_c = [0x51; 32];
let rot_add_c = Delta {
id: d_add_c,
parents: vec![g0],
hlc_ns: one_sec(30),
action: build_signed_shared_action(
false,
id,
b"v0".to_vec(),
[alice, carol].into_iter().collect(),
one_sec(30),
&alice_sk,
vec![],
),
};
dag.record(d_add_c, vec![g0]);
let d_drop_b = [0x52; 32];
let rot_drop_b = Delta {
id: d_drop_b,
parents: vec![g0],
hlc_ns: one_sec(20),
action: build_signed_shared_action(
false,
id,
b"v0".to_vec(),
[alice].into_iter().collect(),
one_sec(20),
&bob_sk,
vec![],
),
};
dag.record(d_drop_b, vec![g0]);
deliver::<Both>(&rot_add_c, &dag).unwrap();
deliver::<Both>(&rot_drop_b, &dag).unwrap();
deliver::<Hc>(&rot_drop_b, &dag).unwrap();
let both_writers =
rotation_log::resolve_local(&rotation_log::load::<Both>(id).unwrap().unwrap())
.expect("Both must resolve a writer set");
let hc_writers_before =
rotation_log::resolve_local(&rotation_log::load::<Hc>(id).unwrap().unwrap())
.expect("Hc must resolve a writer set");
assert_eq!(
both_writers,
[alice, carol]
.into_iter()
.map(|k| (k, calimero_storage::entities::OpMask::FULL))
.collect::<std::collections::BTreeMap<_, _>>(),
"the node that applied both rotations resolves node-1's later-HLC {{A,C}}"
);
assert!(
both_writers.contains_key(&carol) && !hc_writers_before.contains_key(&carol),
"repro of the #2703 divergence: HC node lacks Carol because it never \
received node-1's hash-neutral rotation (HC carries no rotation log); \
both={both_writers:?} hc={hc_writers_before:?}"
);
let both_log = rotation_log::load::<Both>(id).unwrap().unwrap();
let hc_log = rotation_log::load::<Hc>(id).unwrap().unwrap();
let hc_seen: std::collections::BTreeSet<[u8; 32]> =
hc_log.entries.iter().map(|e| e.delta_id).collect();
for entry in &both_log.entries {
if !hc_seen.contains(&entry.delta_id) {
rotation_log::append::<Hc>(id, entry.clone()).unwrap();
}
}
let hc_writers_after =
rotation_log::resolve_local(&rotation_log::load::<Hc>(id).unwrap().unwrap()).unwrap();
assert_eq!(
hc_writers_after, both_writers,
"after unioning the peer's rotation log, the HC node converges on the \
same writer set (Carol is now a writer everywhere)"
);
}