use calimero_node::sync::{
HashComparisonConfig, HashComparisonFirstRequest, HashComparisonProtocol, HashComparisonStats,
};
use calimero_node_primitives::sync::{
InitPayload, StreamMessage, SyncProtocolExecutor, SyncTransport,
};
use calimero_primitives::identity::PublicKey;
use eyre::{bail, Result, WrapErr};
use super::node::SimNode;
use super::transport::SimStream;
#[derive(Debug, Default, Clone)]
pub struct SimSyncStats {
pub nodes_compared: u64,
pub entities_transferred: u64,
pub entities_pushed: u64,
pub nodes_skipped: u64,
pub rounds: u64,
}
impl From<HashComparisonStats> for SimSyncStats {
fn from(stats: HashComparisonStats) -> Self {
Self {
nodes_compared: stats.nodes_compared,
entities_transferred: stats.entities_merged,
entities_pushed: stats.entities_pushed,
nodes_skipped: stats.nodes_skipped,
rounds: stats.requests_sent,
}
}
}
pub async fn execute_hash_comparison_sync(
initiator: &mut SimNode,
responder: &SimNode,
) -> Result<SimSyncStats> {
let (mut init_stream, mut resp_stream) = SimStream::pair();
let init_root = initiator.root_hash();
let resp_root = responder.root_hash();
if init_root == resp_root {
return Ok(SimSyncStats::default());
}
let initiator_store = initiator.storage().store();
let responder_store = responder.storage().store();
let initiator_context = initiator.context_id();
let responder_context = responder.context_id();
let identity = PublicKey::from([0u8; 32]);
let config = HashComparisonConfig {
remote_root_hash: resp_root,
};
let initiator_fut = async {
HashComparisonProtocol::run_initiator(
&mut init_stream,
initiator_store,
initiator_context,
identity,
config,
)
.await
};
let responder_fut = async {
let first_msg = resp_stream
.recv()
.await?
.ok_or_else(|| eyre::eyre!("Stream closed before first message"))?;
let first_request = match first_msg {
StreamMessage::Init {
payload:
InitPayload::TreeNodeRequest {
node_id, max_depth, ..
},
..
} => HashComparisonFirstRequest { node_id, max_depth },
_ => bail!("Expected TreeNodeRequest Init message"),
};
HashComparisonProtocol::run_responder(
&mut resp_stream,
responder_store,
responder_context,
identity,
first_request,
)
.await
};
let (init_result, resp_result) = tokio::join!(initiator_fut, responder_fut);
resp_result.wrap_err("responder failed")?;
let stats = init_result.wrap_err("initiator failed")?;
Ok(stats.into())
}
#[cfg(test)]
mod tests {
use super::*;
use crate::sync_sim::actions::EntityMetadata;
use crate::sync_sim::types::EntityId;
use calimero_primitives::context::ContextId;
fn shared_context() -> ContextId {
ContextId::from(SimNode::DEFAULT_CONTEXT_ID)
}
#[tokio::test]
async fn test_sync_empty_to_populated() {
let ctx = shared_context();
let mut alice = SimNode::new_in_context("alice", ctx);
let mut bob = SimNode::new_in_context("bob", ctx);
bob.insert_entity_with_metadata(
EntityId::from_u64(1),
b"hello".to_vec(),
EntityMetadata::default(),
);
bob.insert_entity_with_metadata(
EntityId::from_u64(2),
b"world".to_vec(),
EntityMetadata::default(),
);
assert_ne!(alice.root_hash(), bob.root_hash());
let stats = execute_hash_comparison_sync(&mut alice, &bob)
.await
.expect("sync should succeed");
assert_eq!(
alice.root_hash(),
bob.root_hash(),
"root hashes should match after sync"
);
assert!(
stats.entities_transferred > 0,
"should have transferred entities"
);
assert_eq!(
alice.entity_count(),
bob.entity_count(),
"entity counts should match after sync"
);
}
#[tokio::test]
async fn test_sync_already_in_sync() {
let ctx = shared_context();
let mut alice = SimNode::new_in_context("alice", ctx);
let bob = SimNode::new_in_context("bob", ctx);
let stats = execute_hash_comparison_sync(&mut alice, &bob)
.await
.expect("sync should succeed");
assert_eq!(stats.rounds, 0, "no rounds needed when already in sync");
}
#[tokio::test]
async fn test_sync_partial_overlap() {
let ctx = shared_context();
let mut alice = SimNode::new_in_context("alice", ctx);
let mut bob = SimNode::new_in_context("bob", ctx);
let shared_id = EntityId::from_u64(100);
alice.insert_entity_with_metadata(shared_id, b"shared".to_vec(), EntityMetadata::default());
bob.insert_entity_with_metadata(shared_id, b"shared".to_vec(), EntityMetadata::default());
bob.insert_entity_with_metadata(
EntityId::from_u64(200),
b"bob-only".to_vec(),
EntityMetadata::default(),
);
let stats = execute_hash_comparison_sync(&mut alice, &bob)
.await
.expect("sync should succeed");
assert!(stats.entities_transferred >= 1);
}
#[tokio::test]
async fn test_three_node_chain_sync() {
let ctx = shared_context();
let mut alice = SimNode::new_in_context("alice", ctx);
let mut bob = SimNode::new_in_context("bob", ctx);
let mut charlie = SimNode::new_in_context("charlie", ctx);
for i in 1..=3 {
alice.insert_entity_with_metadata(
EntityId::from_u64(i),
format!("alice-{i}").into_bytes(),
EntityMetadata::default(),
);
}
for i in 4..=6 {
bob.insert_entity_with_metadata(
EntityId::from_u64(i),
format!("bob-{i}").into_bytes(),
EntityMetadata::default(),
);
}
for i in 7..=9 {
charlie.insert_entity_with_metadata(
EntityId::from_u64(i),
format!("charlie-{i}").into_bytes(),
EntityMetadata::default(),
);
}
assert_ne!(alice.root_hash(), bob.root_hash());
assert_ne!(bob.root_hash(), charlie.root_hash());
assert_ne!(alice.root_hash(), charlie.root_hash());
println!("Initial state:");
println!(
" Alice: {} entities, hash {:?}",
alice.entity_count(),
&alice.root_hash()[..4]
);
println!(
" Bob: {} entities, hash {:?}",
bob.entity_count(),
&bob.root_hash()[..4]
);
println!(
" Charlie: {} entities, hash {:?}",
charlie.entity_count(),
&charlie.root_hash()[..4]
);
let stats1 = execute_hash_comparison_sync(&mut alice, &bob)
.await
.expect("alice <- bob sync should succeed");
println!("\nAfter Alice <- Bob:");
println!(
" Alice: {} entities (transferred {})",
alice.entity_count(),
stats1.entities_transferred
);
let stats2 = execute_hash_comparison_sync(&mut alice, &charlie)
.await
.expect("alice <- charlie sync should succeed");
println!("\nAfter Alice <- Charlie:");
println!(
" Alice: {} entities (transferred {})",
alice.entity_count(),
stats2.entities_transferred
);
assert_eq!(alice.entity_count(), 9, "Alice should have all 9 entities");
let stats3 = execute_hash_comparison_sync(&mut bob, &alice)
.await
.expect("bob <- alice sync should succeed");
println!("\nAfter Bob <- Alice:");
println!(
" Bob: {} entities (transferred {})",
bob.entity_count(),
stats3.entities_transferred
);
let stats4 = execute_hash_comparison_sync(&mut charlie, &alice)
.await
.expect("charlie <- alice sync should succeed");
println!("\nAfter Charlie <- Alice:");
println!(
" Charlie: {} entities (transferred {})",
charlie.entity_count(),
stats4.entities_transferred
);
println!("\nFinal state:");
println!(
" Alice: {} entities, hash {:?}",
alice.entity_count(),
&alice.root_hash()[..4]
);
println!(
" Bob: {} entities, hash {:?}",
bob.entity_count(),
&bob.root_hash()[..4]
);
println!(
" Charlie: {} entities, hash {:?}",
charlie.entity_count(),
&charlie.root_hash()[..4]
);
assert_eq!(
alice.root_hash(),
bob.root_hash(),
"Alice and Bob should converge"
);
assert_eq!(
bob.root_hash(),
charlie.root_hash(),
"Bob and Charlie should converge"
);
assert_eq!(alice.entity_count(), 9);
assert_eq!(bob.entity_count(), 9);
assert_eq!(charlie.entity_count(), 9);
}
#[tokio::test]
async fn test_three_node_mesh_sync() {
let ctx = shared_context();
let mut alice = SimNode::new_in_context("alice", ctx);
let mut bob = SimNode::new_in_context("bob", ctx);
let mut charlie = SimNode::new_in_context("charlie", ctx);
for i in 0..5 {
alice.insert_entity_with_metadata(
EntityId::from_u64(100 + i),
format!("a-{i}").into_bytes(),
EntityMetadata::default(),
);
bob.insert_entity_with_metadata(
EntityId::from_u64(200 + i),
format!("b-{i}").into_bytes(),
EntityMetadata::default(),
);
charlie.insert_entity_with_metadata(
EntityId::from_u64(300 + i),
format!("c-{i}").into_bytes(),
EntityMetadata::default(),
);
}
execute_hash_comparison_sync(&mut alice, &bob)
.await
.expect("a<-b");
execute_hash_comparison_sync(&mut alice, &charlie)
.await
.expect("a<-c");
execute_hash_comparison_sync(&mut bob, &alice)
.await
.expect("b<-a");
execute_hash_comparison_sync(&mut bob, &charlie)
.await
.expect("b<-c");
execute_hash_comparison_sync(&mut charlie, &alice)
.await
.expect("c<-a");
execute_hash_comparison_sync(&mut charlie, &bob)
.await
.expect("c<-b");
assert_eq!(alice.entity_count(), 15, "Alice should have 15 entities");
assert_eq!(bob.entity_count(), 15, "Bob should have 15 entities");
assert_eq!(
charlie.entity_count(),
15,
"Charlie should have 15 entities"
);
assert_eq!(
alice.root_hash(),
bob.root_hash(),
"Alice and Bob should match"
);
assert_eq!(
bob.root_hash(),
charlie.root_hash(),
"Bob and Charlie should match"
);
}
#[tokio::test]
async fn test_three_node_fresh_join() {
let ctx = shared_context();
let mut alice = SimNode::new_in_context("alice", ctx);
let mut bob = SimNode::new_in_context("bob", ctx);
let mut charlie = SimNode::new_in_context("charlie", ctx);
for i in 1..=5 {
alice.insert_entity_with_metadata(
EntityId::from_u64(i),
format!("shared-{i}").into_bytes(),
EntityMetadata::default(),
);
bob.insert_entity_with_metadata(
EntityId::from_u64(i),
format!("shared-{i}").into_bytes(),
EntityMetadata::default(),
);
}
assert_eq!(alice.root_hash(), bob.root_hash());
assert_eq!(charlie.entity_count(), 0);
execute_hash_comparison_sync(&mut charlie, &alice)
.await
.expect("charlie <- alice sync should succeed");
assert_eq!(charlie.root_hash(), alice.root_hash());
assert_eq!(charlie.entity_count(), 5);
}
#[tokio::test]
async fn test_three_node_crdt_conflict() {
use calimero_primitives::crdt::CrdtType;
let ctx = shared_context();
let mut alice = SimNode::new_in_context("alice", ctx);
let mut bob = SimNode::new_in_context("bob", ctx);
let mut charlie = SimNode::new_in_context("charlie", ctx);
let conflict_id = EntityId::from_u64(999);
alice.insert_entity_with_metadata(
conflict_id,
b"alice-version".to_vec(),
EntityMetadata::new(CrdtType::lww_register("test"), 100), );
bob.insert_entity_with_metadata(
conflict_id,
b"bob-version".to_vec(),
EntityMetadata::new(CrdtType::lww_register("test"), 200), );
charlie.insert_entity_with_metadata(
conflict_id,
b"charlie-version".to_vec(),
EntityMetadata::new(CrdtType::lww_register("test"), 300), );
execute_hash_comparison_sync(&mut alice, &bob)
.await
.expect("a<-b");
execute_hash_comparison_sync(&mut alice, &charlie)
.await
.expect("a<-c");
execute_hash_comparison_sync(&mut bob, &alice)
.await
.expect("b<-a");
execute_hash_comparison_sync(&mut charlie, &alice)
.await
.expect("c<-a");
assert_eq!(alice.root_hash(), bob.root_hash(), "A and B should match");
assert_eq!(bob.root_hash(), charlie.root_hash(), "B and C should match");
assert_eq!(alice.entity_count(), 1);
assert_eq!(bob.entity_count(), 1);
assert_eq!(charlie.entity_count(), 1);
}
#[tokio::test]
async fn test_initiator_has_more_data_push_to_peer() {
let ctx = shared_context();
let mut alice = SimNode::new_in_context("alice", ctx);
let mut bob = SimNode::new_in_context("bob", ctx);
let base_id = EntityId::from_u64(1000);
alice.insert_entity_with_metadata(base_id, b"base".to_vec(), EntityMetadata::default());
bob.insert_entity_with_metadata(base_id, b"base".to_vec(), EntityMetadata::default());
assert_eq!(alice.root_hash(), bob.root_hash());
for i in 1..=10 {
alice.insert_entity_with_metadata(
EntityId::from_u64(i),
format!("seed-{i}").into_bytes(),
EntityMetadata::default(),
);
}
assert_eq!(alice.entity_count(), 11); assert_eq!(bob.entity_count(), 1); assert_ne!(alice.root_hash(), bob.root_hash());
let stats = execute_hash_comparison_sync(&mut alice, &bob)
.await
.expect("sync should succeed");
println!(
"Stats: entities_pushed={}, entities_transferred={}, rounds={}",
stats.entities_pushed, stats.entities_transferred, stats.rounds
);
assert_eq!(
bob.entity_count(),
11,
"Bob should have all 11 entities after bidirectional sync"
);
assert_eq!(alice.entity_count(), 11);
assert_eq!(
alice.root_hash(),
bob.root_hash(),
"Root hashes should match after sync"
);
assert!(
stats.entities_pushed >= 10,
"Should have pushed at least 10 entities, got {}",
stats.entities_pushed
);
}
#[tokio::test]
async fn test_bidirectional_both_have_unique_data() {
let ctx = shared_context();
let mut alice = SimNode::new_in_context("alice", ctx);
let mut bob = SimNode::new_in_context("bob", ctx);
let base_id = EntityId::from_u64(1000);
alice.insert_entity_with_metadata(base_id, b"base".to_vec(), EntityMetadata::default());
bob.insert_entity_with_metadata(base_id, b"base".to_vec(), EntityMetadata::default());
for i in 1..=10 {
alice.insert_entity_with_metadata(
EntityId::from_u64(i),
format!("alice-{i}").into_bytes(),
EntityMetadata::default(),
);
}
for i in 101..=103 {
bob.insert_entity_with_metadata(
EntityId::from_u64(i),
format!("bob-{i}").into_bytes(),
EntityMetadata::default(),
);
}
assert_eq!(alice.entity_count(), 11); assert_eq!(bob.entity_count(), 4); assert_ne!(alice.root_hash(), bob.root_hash());
let stats = execute_hash_comparison_sync(&mut alice, &bob)
.await
.expect("sync should succeed");
println!(
"Stats: pushed={}, transferred={}, compared={}, skipped={}, rounds={}",
stats.entities_pushed,
stats.entities_transferred,
stats.nodes_compared,
stats.nodes_skipped,
stats.rounds
);
assert_eq!(
alice.entity_count(),
14,
"Alice should have 14 entities (1 shared + 10 own + 3 from bob)"
);
assert_eq!(
bob.entity_count(),
14,
"Bob should have 14 entities (1 shared + 3 own + 10 from alice)"
);
assert_eq!(
alice.root_hash(),
bob.root_hash(),
"Root hashes should match after bidirectional sync"
);
}
#[tokio::test]
async fn test_pull_direction_still_works() {
let ctx = shared_context();
let mut alice = SimNode::new_in_context("alice", ctx);
let mut bob = SimNode::new_in_context("bob", ctx);
let base_id = EntityId::from_u64(1000);
alice.insert_entity_with_metadata(base_id, b"base".to_vec(), EntityMetadata::default());
bob.insert_entity_with_metadata(base_id, b"base".to_vec(), EntityMetadata::default());
for i in 1..=5 {
bob.insert_entity_with_metadata(
EntityId::from_u64(i),
format!("bob-{i}").into_bytes(),
EntityMetadata::default(),
);
}
let stats = execute_hash_comparison_sync(&mut alice, &bob)
.await
.expect("sync should succeed");
assert_eq!(alice.entity_count(), 6, "Alice should have 6 entities");
assert_eq!(alice.root_hash(), bob.root_hash(), "Hashes should match");
assert!(
stats.entities_transferred >= 5,
"Should have transferred at least 5 entities"
);
}
#[tokio::test]
async fn test_four_node_seed_data_propagation() {
let ctx = shared_context();
let mut node1 = SimNode::new_in_context("node1", ctx);
let mut node2 = SimNode::new_in_context("node2", ctx);
let mut node3 = SimNode::new_in_context("node3", ctx);
let mut node4 = SimNode::new_in_context("node4", ctx);
let base_id = EntityId::from_u64(1000);
for node in [&mut node1, &mut node2, &mut node3, &mut node4] {
node.insert_entity_with_metadata(base_id, b"base".to_vec(), EntityMetadata::default());
}
for i in 1..=10 {
node1.insert_entity_with_metadata(
EntityId::from_u64(i),
format!("seed-{i}").into_bytes(),
EntityMetadata::default(),
);
}
assert_eq!(node1.entity_count(), 11);
assert_eq!(node2.entity_count(), 1);
assert_eq!(node3.entity_count(), 1);
assert_eq!(node4.entity_count(), 1);
execute_hash_comparison_sync(&mut node1, &node2)
.await
.expect("n1<->n2");
execute_hash_comparison_sync(&mut node1, &node3)
.await
.expect("n1<->n3");
execute_hash_comparison_sync(&mut node1, &node4)
.await
.expect("n1<->n4");
assert_eq!(
node2.entity_count(),
11,
"Node 2 should have 11 entities after sync with node 1"
);
assert_eq!(
node3.entity_count(),
11,
"Node 3 should have 11 entities after sync with node 1"
);
assert_eq!(
node4.entity_count(),
11,
"Node 4 should have 11 entities after sync with node 1"
);
assert_eq!(node1.root_hash(), node2.root_hash());
assert_eq!(node1.root_hash(), node3.root_hash());
assert_eq!(node1.root_hash(), node4.root_hash());
}
#[tokio::test]
async fn test_single_entity_push_with_shared_base() {
let ctx = shared_context();
let mut alice = SimNode::new_in_context("alice", ctx);
let mut bob = SimNode::new_in_context("bob", ctx);
let base_id = EntityId::from_u64(1000);
alice.insert_entity_with_metadata(base_id, b"base".to_vec(), EntityMetadata::default());
bob.insert_entity_with_metadata(base_id, b"base".to_vec(), EntityMetadata::default());
alice.insert_entity_with_metadata(
EntityId::from_u64(42),
b"hello".to_vec(),
EntityMetadata::default(),
);
assert_eq!(alice.entity_count(), 2);
assert_eq!(bob.entity_count(), 1);
let stats = execute_hash_comparison_sync(&mut alice, &bob)
.await
.expect("sync should succeed");
assert_eq!(bob.entity_count(), 2, "Bob should have 2 entities");
assert_eq!(alice.root_hash(), bob.root_hash());
assert!(
stats.entities_pushed >= 1,
"Should have pushed at least 1 entity, got {}",
stats.entities_pushed,
);
}
#[tokio::test]
async fn test_crdt_conflict_during_push() {
use calimero_primitives::crdt::CrdtType;
let ctx = shared_context();
let mut alice = SimNode::new_in_context("alice", ctx);
let mut bob = SimNode::new_in_context("bob", ctx);
let base_id = EntityId::from_u64(1000);
alice.insert_entity_with_metadata(base_id, b"base".to_vec(), EntityMetadata::default());
bob.insert_entity_with_metadata(base_id, b"base".to_vec(), EntityMetadata::default());
let conflict_id = EntityId::from_u64(42);
alice.insert_entity_with_metadata(
conflict_id,
b"alice-wins".to_vec(),
EntityMetadata::new(CrdtType::lww_register("test"), 200), );
bob.insert_entity_with_metadata(
conflict_id,
b"bob-loses".to_vec(),
EntityMetadata::new(CrdtType::lww_register("test"), 100), );
for i in 1..=3 {
alice.insert_entity_with_metadata(
EntityId::from_u64(i),
format!("alice-{i}").into_bytes(),
EntityMetadata::default(),
);
}
assert_ne!(alice.root_hash(), bob.root_hash());
execute_hash_comparison_sync(&mut alice, &bob)
.await
.expect("a->b should succeed");
execute_hash_comparison_sync(&mut bob, &alice)
.await
.expect("b->a should succeed");
assert_eq!(
alice.root_hash(),
bob.root_hash(),
"Should converge after bidirectional CRDT merge"
);
assert_eq!(alice.entity_count(), 5);
assert_eq!(bob.entity_count(), 5);
}
#[tokio::test]
async fn test_symmetric_sync_converges() {
let ctx = shared_context();
let mut alice = SimNode::new_in_context("alice", ctx);
let mut bob = SimNode::new_in_context("bob", ctx);
let base_id = EntityId::from_u64(1000);
alice.insert_entity_with_metadata(base_id, b"base".to_vec(), EntityMetadata::default());
bob.insert_entity_with_metadata(base_id, b"base".to_vec(), EntityMetadata::default());
for i in 1..=5 {
alice.insert_entity_with_metadata(
EntityId::from_u64(i),
format!("a-{i}").into_bytes(),
EntityMetadata::default(),
);
bob.insert_entity_with_metadata(
EntityId::from_u64(100 + i),
format!("b-{i}").into_bytes(),
EntityMetadata::default(),
);
}
execute_hash_comparison_sync(&mut alice, &bob)
.await
.expect("a->b");
assert_eq!(alice.entity_count(), 11); assert_eq!(bob.entity_count(), 11);
assert_eq!(alice.root_hash(), bob.root_hash());
let stats = execute_hash_comparison_sync(&mut bob, &alice)
.await
.expect("b->a");
assert_eq!(stats.entities_transferred, 0, "no-op when already synced");
assert_eq!(stats.entities_pushed, 0, "no-op when already synced");
assert_eq!(alice.root_hash(), bob.root_hash());
}
}