use std::collections::HashMap;
use std::sync::Arc;
use calimero_dag::{ApplyError, CausalDelta, DagStore, DeltaApplier, MAX_DELTA_QUERY_LIMIT};
use calimero_primitives::hash::Hash;
use calimero_storage::action::Action;
use calimero_storage::address::Id;
use calimero_storage::snapshot::Snapshot;
use calimero_storage::store::MainStorage;
use calimero_storage::Interface;
use tokio::sync::RwLock;
struct TestApplier {
applied: Arc<RwLock<Vec<[u8; 32]>>>,
}
impl TestApplier {
fn new() -> Self {
Self {
applied: Arc::new(RwLock::new(Vec::new())),
}
}
async fn get_applied(&self) -> Vec<[u8; 32]> {
self.applied.read().await.clone()
}
}
#[async_trait::async_trait]
impl DeltaApplier<Vec<Action>> for TestApplier {
async fn apply(&self, delta: &CausalDelta<Vec<Action>>) -> Result<(), ApplyError> {
for action in &delta.payload {
Interface::<MainStorage>::apply_action(action.clone())
.map_err(|e| ApplyError::Application(e.to_string()))?;
}
self.applied.write().await.push(delta.id);
Ok(())
}
}
struct SimulatedNode {
node_id: String,
dag: Arc<RwLock<DagStore<Vec<Action>>>>,
applier: Arc<TestApplier>,
root_hash: Arc<RwLock<Hash>>,
}
impl SimulatedNode {
fn new(node_id: &str) -> Self {
Self {
node_id: node_id.to_string(),
dag: Arc::new(RwLock::new(DagStore::new([0; 32]))),
applier: Arc::new(TestApplier::new()),
root_hash: Arc::new(RwLock::new(Hash::from([0; 32]))),
}
}
async fn add_delta(&self, delta: CausalDelta<Vec<Action>>) -> eyre::Result<bool> {
let mut dag = self.dag.write().await;
let applied = dag.add_delta(delta, &*self.applier).await?;
if applied {
*self.root_hash.write().await = Hash::from([99; 32]); }
Ok(applied)
}
async fn get_missing_parents(&self) -> Vec<[u8; 32]> {
self.dag
.read()
.await
.get_missing_parents(MAX_DELTA_QUERY_LIMIT)
}
async fn get_heads(&self) -> Vec<[u8; 32]> {
self.dag.read().await.get_heads()
}
async fn get_root_hash(&self) -> Hash {
*self.root_hash.read().await
}
async fn get_delta(&self, delta_id: &[u8; 32]) -> Option<CausalDelta<Vec<Action>>> {
self.dag.read().await.get_delta(delta_id).cloned()
}
}
#[tokio::test]
async fn test_missing_delta_catch_up_single_parent() {
let node_a = SimulatedNode::new("node_a");
let node_b = SimulatedNode::new("node_b");
let delta1 = CausalDelta::new_test([1; 32], vec![[0; 32]], vec![]);
let delta2 = CausalDelta::new_test([2; 32], vec![[1; 32]], vec![]);
node_a.add_delta(delta1.clone()).await.unwrap();
node_a.add_delta(delta2.clone()).await.unwrap();
node_b.add_delta(delta2.clone()).await.unwrap();
let missing = node_b.get_missing_parents().await;
assert_eq!(
missing,
vec![[1; 32]],
"Node B should be missing parent delta 1"
);
let requested_delta = node_a.get_delta(&[1; 32]).await;
assert!(requested_delta.is_some());
node_b.add_delta(requested_delta.unwrap()).await.unwrap();
let missing_after = node_b.get_missing_parents().await;
assert_eq!(missing_after.len(), 0);
let applied = node_b.applier.get_applied().await;
assert_eq!(applied.len(), 2);
assert!(applied.contains(&[1; 32]));
assert!(applied.contains(&[2; 32]));
}
#[tokio::test]
async fn test_missing_delta_catch_up_multiple_parents() {
let node_a = SimulatedNode::new("node_a");
let node_b = SimulatedNode::new("node_b");
let delta1 = CausalDelta::new_test([1; 32], vec![[0; 32]], vec![]);
let delta2 = CausalDelta::new_test([2; 32], vec![[0; 32]], vec![]);
let delta3_merge = CausalDelta::new_test([3; 32], vec![[1; 32], [2; 32]], vec![]);
node_a.add_delta(delta1.clone()).await.unwrap();
node_a.add_delta(delta2.clone()).await.unwrap();
node_a.add_delta(delta3_merge.clone()).await.unwrap();
node_b.add_delta(delta3_merge).await.unwrap();
let mut missing = node_b.get_missing_parents().await;
missing.sort();
assert_eq!(missing.len(), 2);
assert!(missing.contains(&[1; 32]));
assert!(missing.contains(&[2; 32]));
for delta_id in missing {
let delta = node_a.get_delta(&delta_id).await.unwrap();
node_b.add_delta(delta).await.unwrap();
}
let applied = node_b.applier.get_applied().await;
assert_eq!(applied.len(), 3);
}
#[tokio::test]
async fn test_deep_chain_catch_up() {
let node_a = SimulatedNode::new("node_a");
let node_b = SimulatedNode::new("node_b");
let mut prev_id = [0; 32];
let mut deltas = vec![];
for i in 1..=10 {
let mut id = [0; 32];
id[0] = i;
let delta = CausalDelta::new_test(id, vec![prev_id], vec![]);
deltas.push(delta.clone());
prev_id = id;
}
for delta in &deltas {
node_a.add_delta(delta.clone()).await.unwrap();
}
node_b.add_delta(deltas[9].clone()).await.unwrap();
let missing = node_b.get_missing_parents().await;
assert_eq!(missing.len(), 1);
let mut current_missing = missing;
let mut requested_count = 0;
while !current_missing.is_empty() {
for delta_id in ¤t_missing {
let delta = node_a.get_delta(delta_id).await.unwrap();
node_b.add_delta(delta).await.unwrap();
requested_count += 1;
}
current_missing = node_b.get_missing_parents().await;
}
assert_eq!(requested_count, 9);
let applied = node_b.applier.get_applied().await;
assert_eq!(applied.len(), 10);
}
#[tokio::test]
async fn test_snapshot_transfer_fresh_node() {
let node_a = SimulatedNode::new("node_a");
let _node_b = SimulatedNode::new("node_b");
for i in 1u8..=5u8 {
let delta = CausalDelta::new_test(
[i; 32],
vec![if i == 1 { [0; 32] } else { [i - 1; 32] }],
vec![],
);
node_a.add_delta(delta).await.unwrap();
}
let node_a_heads = node_a.get_heads().await;
assert_eq!(node_a_heads, vec![[5; 32]]);
let snapshot = Snapshot {
entity_count: 5,
index_count: 0,
entries: vec![],
indexes: vec![],
root_hash: [0; 32],
timestamp: 0,
};
assert_eq!(snapshot.entity_count, 5);
}
#[tokio::test]
async fn test_snapshot_excludes_tombstones() {
let live_entries = vec![
(Id::from([1; 32]), vec![1u8; 10]),
(Id::from([3; 32]), vec![3u8; 10]),
(Id::from([5; 32]), vec![5u8; 10]),
];
let snapshot = Snapshot {
entity_count: 3,
index_count: 0,
entries: live_entries.clone(),
indexes: vec![],
root_hash: [1; 32],
timestamp: 0,
};
assert_eq!(snapshot.entries.len(), 3);
let entity_ids: Vec<u8> = snapshot
.entries
.iter()
.map(|(id, _)| id.as_bytes()[0])
.collect();
assert!(
!entity_ids.contains(&2),
"Tombstone should not be in snapshot"
);
assert!(
!entity_ids.contains(&4),
"Tombstone should not be in snapshot"
);
assert!(entity_ids.contains(&1));
assert!(entity_ids.contains(&3));
assert!(entity_ids.contains(&5));
}
#[tokio::test]
async fn test_peer_selection_prefers_peer_with_state() {
let mut peers = HashMap::new();
peers.insert("peer_a", ([0; 32], vec![[0; 32]])); peers.insert("peer_b", ([0; 32], vec![[0; 32]])); peers.insert("peer_c", ([1; 32], vec![[5; 32]]));
let selected = peers
.iter()
.find(|(_, (root_hash, _))| *root_hash != [0; 32])
.map(|(id, _)| *id);
assert_eq!(selected, Some("peer_c"));
}
#[tokio::test]
async fn test_peer_selection_random_when_all_initialized() {
let mut peers = HashMap::new();
peers.insert("peer_a", ([1; 32], vec![[3; 32]]));
peers.insert("peer_b", ([2; 32], vec![[4; 32]]));
peers.insert("peer_c", ([3; 32], vec![[5; 32]]));
let any_peer_valid = peers
.iter()
.all(|(_, (root_hash, _))| *root_hash != [0; 32]);
assert!(any_peer_valid);
}
#[tokio::test]
async fn test_hash_heartbeat_detects_silent_divergence() {
let node_a = SimulatedNode::new("node_a");
let node_b = SimulatedNode::new("node_b");
let delta1 = CausalDelta::new_test([1; 32], vec![[0; 32]], vec![]);
node_a.add_delta(delta1.clone()).await.unwrap();
node_b.add_delta(delta1).await.unwrap();
assert_eq!(node_a.get_heads().await, node_b.get_heads().await);
assert_eq!(node_a.get_heads().await, vec![[1; 32]]);
*node_a.root_hash.write().await = Hash::from([100; 32]); *node_b.root_hash.write().await = Hash::from([200; 32]);
let heads_a = node_a.get_heads().await;
let heads_b = node_b.get_heads().await;
let hash_a = node_a.get_root_hash().await;
let hash_b = node_b.get_root_hash().await;
assert_eq!(heads_a, heads_b, "Nodes should have same DAG heads");
assert_ne!(
hash_a, hash_b,
"Different root hash indicates silent divergence"
);
assert_eq!(heads_a, heads_b);
assert_ne!(hash_a, hash_b);
}
#[tokio::test]
async fn test_heartbeat_with_same_state_no_divergence() {
let node_a = SimulatedNode::new("node_a");
let node_b = SimulatedNode::new("node_b");
for i in 1..=3 {
let mut id = [0; 32];
id[0] = i;
let delta =
CausalDelta::new_test(id, vec![if i == 1 { [0; 32] } else { [i - 1; 32] }], vec![]);
node_a.add_delta(delta.clone()).await.unwrap();
node_b.add_delta(delta).await.unwrap();
}
assert_eq!(node_a.get_heads().await, node_b.get_heads().await);
assert_eq!(node_a.get_root_hash().await, node_b.get_root_hash().await);
}
#[tokio::test]
async fn test_merkle_comparison_detects_differences() {
let node_a_entities = vec![
(Id::from([1; 32]), vec![1, 2, 3]),
(Id::from([2; 32]), vec![4, 5, 6]),
];
let node_b_entities = vec![
(Id::from([1; 32]), vec![1, 2, 3]), (Id::from([2; 32]), vec![9, 9, 9]), ];
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
let hash_a = {
let mut hasher = DefaultHasher::new();
for (id, data) in &node_a_entities {
id.hash(&mut hasher);
data.hash(&mut hasher);
}
hasher.finish()
};
let hash_b = {
let mut hasher = DefaultHasher::new();
for (id, data) in &node_b_entities {
id.hash(&mut hasher);
data.hash(&mut hasher);
}
hasher.finish()
};
assert_ne!(
hash_a, hash_b,
"Merkle comparison should detect different state"
);
}
#[tokio::test]
async fn test_recovery_via_full_resync() {
let node_a = SimulatedNode::new("node_a");
let node_b = SimulatedNode::new("node_b");
let delta_a = CausalDelta::new_test([10; 32], vec![[0; 32]], vec![]);
let delta_b = CausalDelta::new_test([20; 32], vec![[0; 32]], vec![]);
node_a.add_delta(delta_a).await.unwrap();
node_b.add_delta(delta_b).await.unwrap();
assert_ne!(node_a.get_heads().await, node_b.get_heads().await);
let snapshot = Snapshot {
entity_count: 1,
index_count: 0,
entries: vec![(Id::from([100; 32]), vec![1])],
indexes: vec![],
root_hash: [1; 32],
timestamp: 0,
};
assert_eq!(snapshot.entries.len(), 1);
}
#[tokio::test]
async fn test_recovery_via_delta_replay() {
let node_a = SimulatedNode::new("node_a");
let node_b = SimulatedNode::new("node_b");
let mut deltas = vec![];
for i in 1..=5 {
let mut id = [0; 32];
id[0] = i;
let delta =
CausalDelta::new_test(id, vec![if i == 1 { [0; 32] } else { [i - 1; 32] }], vec![]);
deltas.push(delta.clone());
node_a.add_delta(delta).await.unwrap();
}
assert_eq!(node_b.get_heads().await, vec![[0; 32]]);
let node_a_heads = node_a.get_heads().await;
for delta in deltas {
node_b.add_delta(delta).await.unwrap();
}
assert_eq!(node_b.get_heads().await, node_a_heads);
}