use std::sync::Arc;
use calimero_dag::{ApplyError, CausalDelta, DagStore, DeltaApplier, MAX_DELTA_QUERY_LIMIT};
use calimero_storage::action::Action;
use calimero_storage::address::Id;
use calimero_storage::entities::{ChildInfo, Metadata};
use calimero_storage::env::time_now;
use calimero_storage::index::Index;
use calimero_storage::interface::Interface;
use calimero_storage::store::MainStorage;
use tokio::sync::Mutex;
struct StorageApplier {
applied: Arc<Mutex<Vec<AppliedDelta>>>,
should_fail: Arc<Mutex<bool>>,
}
impl StorageApplier {
fn new() -> Self {
Self {
applied: Arc::new(Mutex::new(Vec::new())),
should_fail: Arc::new(Mutex::new(false)),
}
}
async fn set_should_fail(&self, value: bool) {
*self.should_fail.lock().await = value;
}
async fn get_applied(&self) -> Vec<AppliedDelta> {
self.applied.lock().await.clone()
}
}
#[derive(Debug, Clone)]
struct AppliedDelta {
delta_id: [u8; 32],
action_count: usize,
}
#[async_trait::async_trait]
impl DeltaApplier<Vec<Action>> for StorageApplier {
async fn apply(&self, delta: &CausalDelta<Vec<Action>>) -> Result<(), ApplyError> {
if *self.should_fail.lock().await {
return Err(ApplyError::Application("Simulated failure".to_string()));
}
for action in &delta.payload {
Interface::<MainStorage>::apply_action(action.clone())
.map_err(|e| ApplyError::Application(e.to_string()))?;
}
self.applied.lock().await.push(AppliedDelta {
delta_id: delta.id,
action_count: delta.payload.len(),
});
Ok(())
}
}
#[tokio::test]
async fn test_dag_applies_deltas_to_storage_in_order() {
let applier = StorageApplier::new();
let mut dag = DagStore::new([0; 32]);
let id1 = Id::new([1; 32]);
let id2 = Id::new([2; 32]);
Index::<MainStorage>::add_root(ChildInfo::new(id1, [10; 32], Metadata::default())).unwrap();
Index::<MainStorage>::add_root(ChildInfo::new(id2, [20; 32], Metadata::default())).unwrap();
let action1 = Action::Update {
id: id1,
data: b"data from delta 1".to_vec(),
ancestors: vec![],
metadata: Metadata::default(),
};
let action2 = Action::Update {
id: id2,
data: b"data from delta 2".to_vec(),
ancestors: vec![],
metadata: Metadata::default(),
};
let delta1 = CausalDelta::new_test([1; 32], vec![[0; 32]], vec![action1]);
let delta2 = CausalDelta::new_test([2; 32], vec![[1; 32]], vec![action2]);
let applied1 = dag.add_delta(delta1, &applier).await.unwrap();
let applied2 = dag.add_delta(delta2, &applier).await.unwrap();
assert!(applied1, "Delta1 should be applied");
assert!(applied2, "Delta2 should be applied");
let applied_deltas = applier.get_applied().await;
assert_eq!(applied_deltas.len(), 2);
assert_eq!(applied_deltas[0].delta_id, [1; 32]);
assert_eq!(applied_deltas[1].delta_id, [2; 32]);
let stored1 = Interface::<MainStorage>::get(id1).unwrap();
let stored2 = Interface::<MainStorage>::get(id2).unwrap();
assert_eq!(
stored1, b"data from delta 1",
"Storage should have data from delta1"
);
assert_eq!(
stored2, b"data from delta 2",
"Storage should have data from delta2"
);
assert_eq!(dag.get_heads(), vec![[2; 32]]);
assert_eq!(dag.stats().applied_deltas, 3); }
#[tokio::test]
async fn test_dag_handles_out_of_order_and_applies_to_storage() {
let applier = StorageApplier::new();
let mut dag = DagStore::new([0; 32]);
let id1 = Id::new([10; 32]);
let id2 = Id::new([20; 32]);
Index::<MainStorage>::add_root(ChildInfo::new(id1, [11; 32], Metadata::default())).unwrap();
Index::<MainStorage>::add_root(ChildInfo::new(id2, [22; 32], Metadata::default())).unwrap();
let action1 = Action::Update {
id: id1,
data: b"first delta data".to_vec(),
ancestors: vec![],
metadata: Metadata::default(),
};
let action2 = Action::Update {
id: id2,
data: b"second delta data".to_vec(),
ancestors: vec![],
metadata: Metadata::default(),
};
let delta1 = CausalDelta::new_test([1; 32], vec![[0; 32]], vec![action1]);
let delta2 = CausalDelta::new_test([2; 32], vec![[1; 32]], vec![action2]);
let applied2 = dag.add_delta(delta2.clone(), &applier).await.unwrap();
assert!(!applied2, "Delta2 should be pending (missing parent)");
assert_eq!(
applier.get_applied().await.len(),
0,
"No deltas applied yet"
);
assert_eq!(dag.pending_stats().count, 1, "Delta2 should be pending");
assert_eq!(
dag.get_missing_parents(MAX_DELTA_QUERY_LIMIT),
vec![[1; 32]],
"Missing parent delta1"
);
let applied1 = dag.add_delta(delta1, &applier).await.unwrap();
assert!(applied1, "Delta1 should be applied");
let applied_deltas = applier.get_applied().await;
assert_eq!(applied_deltas.len(), 2, "Both deltas should be applied");
assert_eq!(applied_deltas[0].delta_id, [1; 32], "Delta1 applied first");
assert_eq!(
applied_deltas[1].delta_id, [2; 32],
"Delta2 auto-applied second"
);
let stored1 = Interface::<MainStorage>::get(id1).unwrap();
let stored2 = Interface::<MainStorage>::get(id2).unwrap();
assert_eq!(stored1, b"first delta data", "Storage has delta1 data");
assert_eq!(
stored2, b"second delta data",
"Storage has delta2 data (auto-applied!)"
);
assert_eq!(dag.pending_stats().count, 0, "No pending deltas");
assert_eq!(dag.stats().applied_deltas, 3); assert_eq!(dag.get_heads(), vec![[2; 32]]);
}
#[tokio::test]
async fn test_dag_concurrent_updates_both_applied_to_storage() {
let applier = StorageApplier::new();
let mut dag = DagStore::new([0; 32]);
let id_a = Id::new([100; 32]);
let id_b = Id::new([200; 32]);
Index::<MainStorage>::add_root(ChildInfo::new(id_a, [101; 32], Metadata::default())).unwrap();
Index::<MainStorage>::add_root(ChildInfo::new(id_b, [201; 32], Metadata::default())).unwrap();
let action_a = Action::Update {
id: id_a,
data: b"concurrent update A".to_vec(),
ancestors: vec![],
metadata: Metadata::default(),
};
let action_b = Action::Update {
id: id_b,
data: b"concurrent update B".to_vec(),
ancestors: vec![],
metadata: Metadata::default(),
};
let delta_a = CausalDelta::new_test([10; 32], vec![[0; 32]], vec![action_a]);
let delta_b = CausalDelta::new_test([20; 32], vec![[0; 32]], vec![action_b]);
dag.add_delta(delta_a, &applier).await.unwrap();
dag.add_delta(delta_b, &applier).await.unwrap();
let mut heads = dag.get_heads();
heads.sort();
assert_eq!(heads.len(), 2, "Concurrent updates create two heads");
assert!(heads.contains(&[10; 32]));
assert!(heads.contains(&[20; 32]));
let applied_deltas = applier.get_applied().await;
assert_eq!(applied_deltas.len(), 2, "Both concurrent deltas applied");
let stored_a = Interface::<MainStorage>::get(id_a).unwrap();
let stored_b = Interface::<MainStorage>::get(id_b).unwrap();
assert_eq!(stored_a, b"concurrent update A", "Storage has update A");
assert_eq!(stored_b, b"concurrent update B", "Storage has update B");
let delta_merge = CausalDelta::new_test(
[30; 32],
vec![[10; 32], [20; 32]], vec![], );
dag.add_delta(delta_merge, &applier).await.unwrap();
assert_eq!(dag.get_heads(), vec![[30; 32]]);
assert_eq!(dag.stats().applied_deltas, 4);
assert_eq!(
Interface::<MainStorage>::get(id_a).unwrap(),
b"concurrent update A"
);
assert_eq!(
Interface::<MainStorage>::get(id_b).unwrap(),
b"concurrent update B"
);
}
#[tokio::test]
async fn test_dag_storage_error_handling() {
let applier = StorageApplier::new();
let mut dag = DagStore::new([0; 32]);
let id1 = Id::new([1; 32]);
Index::<MainStorage>::add_root(ChildInfo::new(id1, [10; 32], Metadata::default())).unwrap();
let action1 = Action::Update {
id: id1,
data: b"delta 1".to_vec(),
ancestors: vec![],
metadata: Metadata::default(),
};
let delta1 = CausalDelta::new_test([1; 32], vec![[0; 32]], vec![action1]);
applier.set_should_fail(true).await;
let result = dag.add_delta(delta1.clone(), &applier).await;
assert!(result.is_err(), "Should fail when applier fails");
assert!(dag.has_delta(&[1; 32]));
assert_eq!(dag.stats().applied_deltas, 1); assert_eq!(applier.get_applied().await.len(), 0);
}
#[tokio::test]
async fn test_dag_storage_lww_through_deltas() {
let applier = StorageApplier::new();
let mut dag = DagStore::new([0; 32]);
let id = Id::new([1; 32]);
Index::<MainStorage>::add_root(ChildInfo::new(id, [10; 32], Metadata::default())).unwrap();
let action1 = Action::Update {
id,
data: b"version 1".to_vec(),
ancestors: vec![],
metadata: Metadata::default(),
};
std::thread::sleep(std::time::Duration::from_millis(2));
let action2 = Action::Update {
id,
data: b"version 2".to_vec(),
ancestors: vec![],
metadata: Metadata::default(),
};
let delta1 = CausalDelta::new_test([1; 32], vec![[0; 32]], vec![action1]);
let delta2 = CausalDelta::new_test([2; 32], vec![[0; 32]], vec![action2]);
dag.add_delta(delta1, &applier).await.unwrap();
dag.add_delta(delta2, &applier).await.unwrap();
let stored = Interface::<MainStorage>::get(id).unwrap();
assert_eq!(stored, b"version 2");
}
#[tokio::test]
async fn test_dag_storage_delete_via_delta() {
let applier = StorageApplier::new();
let mut dag = DagStore::new([0; 32]);
let id = Id::new([1; 32]);
Index::<MainStorage>::add_root(ChildInfo::new(id, [10; 32], Metadata::default())).unwrap();
let add_action = Action::Add {
id,
data: b"test data".to_vec(),
ancestors: vec![],
metadata: Metadata::default(),
};
let delta1 = CausalDelta::new_test([1; 32], vec![[0; 32]], vec![add_action]);
dag.add_delta(delta1, &applier).await.unwrap();
assert!(Interface::<MainStorage>::get(id).is_ok());
let delete_action = Action::DeleteRef {
id,
deleted_at: time_now(),
metadata: Metadata::default(),
};
let delta2 = CausalDelta::new_test([2; 32], vec![[1; 32]], vec![delete_action]);
dag.add_delta(delta2, &applier).await.unwrap();
assert!(Index::<MainStorage>::is_deleted(id).unwrap());
}
#[tokio::test]
async fn test_dag_storage_multiple_actions_per_delta() {
let applier = StorageApplier::new();
let mut dag = DagStore::new([0; 32]);
let id1 = Id::new([1; 32]);
let id2 = Id::new([2; 32]);
let id3 = Id::new([3; 32]);
Index::<MainStorage>::add_root(ChildInfo::new(id1, [11; 32], Metadata::default())).unwrap();
Index::<MainStorage>::add_root(ChildInfo::new(id2, [22; 32], Metadata::default())).unwrap();
Index::<MainStorage>::add_root(ChildInfo::new(id3, [33; 32], Metadata::default())).unwrap();
let actions = vec![
Action::Update {
id: id1,
data: b"update 1".to_vec(),
ancestors: vec![],
metadata: Metadata::default(),
},
Action::Update {
id: id2,
data: b"update 2".to_vec(),
ancestors: vec![],
metadata: Metadata::default(),
},
Action::Update {
id: id3,
data: b"update 3".to_vec(),
ancestors: vec![],
metadata: Metadata::default(),
},
];
let delta = CausalDelta::new_test([1; 32], vec![[0; 32]], actions.clone());
dag.add_delta(delta, &applier).await.unwrap();
let applied_deltas = applier.get_applied().await;
assert_eq!(applied_deltas.len(), 1);
assert_eq!(applied_deltas[0].action_count, 3);
assert_eq!(Interface::<MainStorage>::get(id1).unwrap(), b"update 1");
assert_eq!(Interface::<MainStorage>::get(id2).unwrap(), b"update 2");
assert_eq!(Interface::<MainStorage>::get(id3).unwrap(), b"update 3");
}
#[tokio::test]
async fn test_dag_storage_deep_chain_out_of_order() {
let applier = StorageApplier::new();
let mut dag = DagStore::new([0; 32]);
let ids: Vec<_> = (1..=10).map(|i| Id::new([i; 32])).collect();
for id in &ids {
Index::<MainStorage>::add_root(ChildInfo::new(*id, [0; 32], Metadata::default())).unwrap();
}
let deltas: Vec<_> = (1..=10)
.map(|i| {
let action = Action::Update {
id: ids[i - 1],
data: format!("value {}", i).into_bytes(),
ancestors: vec![],
metadata: Metadata::default(),
};
CausalDelta::new_test([i as u8; 32], vec![[(i - 1) as u8; 32]], vec![action])
})
.collect();
for delta in deltas.iter().rev() {
dag.add_delta(delta.clone(), &applier).await.unwrap();
}
let applied = applier.get_applied().await;
assert_eq!(applied.len(), 10);
for i in 1..=10 {
assert_eq!(applied[i - 1].delta_id, [i as u8; 32]);
}
for i in 1..=10 {
let stored = Interface::<MainStorage>::get(ids[i - 1]).unwrap();
assert_eq!(stored, format!("value {}", i).as_bytes());
}
assert_eq!(dag.pending_stats().count, 0);
}
#[tokio::test]
async fn test_dag_storage_concurrent_branches_merge() {
let applier = StorageApplier::new();
let mut dag = DagStore::new([0; 32]);
let id_a = Id::new([10; 32]);
let id_b = Id::new([20; 32]);
Index::<MainStorage>::add_root(ChildInfo::new(id_a, [11; 32], Metadata::default())).unwrap();
Index::<MainStorage>::add_root(ChildInfo::new(id_b, [22; 32], Metadata::default())).unwrap();
let delta_a = CausalDelta::new_test(
[1; 32],
vec![[0; 32]],
vec![Action::Update {
id: id_a,
data: b"branch A v1".to_vec(),
ancestors: vec![],
metadata: Metadata::default(),
}],
);
let delta_a2 = CausalDelta::new_test(
[2; 32],
vec![[1; 32]],
vec![Action::Update {
id: id_a,
data: b"branch A v2".to_vec(),
ancestors: vec![],
metadata: Metadata::default(),
}],
);
let delta_b = CausalDelta::new_test(
[3; 32],
vec![[0; 32]],
vec![Action::Update {
id: id_b,
data: b"branch B".to_vec(),
ancestors: vec![],
metadata: Metadata::default(),
}],
);
dag.add_delta(delta_a, &applier).await.unwrap();
dag.add_delta(delta_a2, &applier).await.unwrap();
dag.add_delta(delta_b, &applier).await.unwrap();
let mut heads = dag.get_heads();
heads.sort();
assert_eq!(heads.len(), 2);
let merge = CausalDelta::new_test(
[99; 32],
vec![[2; 32], [3; 32]],
vec![], );
dag.add_delta(merge, &applier).await.unwrap();
assert_eq!(dag.get_heads(), vec![[99; 32]]);
assert_eq!(Interface::<MainStorage>::get(id_a).unwrap(), b"branch A v2");
assert_eq!(Interface::<MainStorage>::get(id_b).unwrap(), b"branch B");
}
#[tokio::test]
async fn test_dag_storage_stress_many_deltas() {
let applier = StorageApplier::new();
let mut dag = DagStore::new([0; 32]);
let id = Id::new([1; 32]);
Index::<MainStorage>::add_root(ChildInfo::new(id, [10; 32], Metadata::default())).unwrap();
for i in 1..=100 {
let action = Action::Update {
id,
data: format!("version {}", i).into_bytes(),
ancestors: vec![],
metadata: Metadata::default(),
};
let delta = CausalDelta::new_test([i as u8; 32], vec![[(i - 1) as u8; 32]], vec![action]);
dag.add_delta(delta, &applier).await.unwrap();
}
assert_eq!(applier.get_applied().await.len(), 100);
assert_eq!(dag.stats().applied_deltas, 101);
let stored = Interface::<MainStorage>::get(id).unwrap();
assert_eq!(stored, b"version 100");
}
#[tokio::test]
async fn test_dag_heads_tracked_after_linear_deltas() {
let applier = StorageApplier::new();
let mut dag = DagStore::new([0; 32]);
assert_eq!(dag.get_heads(), vec![[0; 32]]);
let delta1 = CausalDelta::new_test([1; 32], vec![[0; 32]], vec![]);
dag.add_delta(delta1, &applier).await.unwrap();
assert_eq!(dag.get_heads(), vec![[1; 32]]);
let delta2 = CausalDelta::new_test([2; 32], vec![[1; 32]], vec![]);
dag.add_delta(delta2, &applier).await.unwrap();
assert_eq!(dag.get_heads(), vec![[2; 32]]);
}
#[tokio::test]
async fn test_dag_heads_multiple_concurrent_branches() {
let applier = StorageApplier::new();
let mut dag = DagStore::new([0; 32]);
let delta_a = CausalDelta::new_test([10; 32], vec![[0; 32]], vec![]);
let delta_b = CausalDelta::new_test([20; 32], vec![[0; 32]], vec![]);
dag.add_delta(delta_a, &applier).await.unwrap();
dag.add_delta(delta_b, &applier).await.unwrap();
let mut heads = dag.get_heads();
heads.sort();
assert_eq!(heads.len(), 2);
assert_eq!(heads, vec![[10; 32], [20; 32]]);
}
#[tokio::test]
async fn test_dag_heads_merge_reduces_to_single_head() {
let applier = StorageApplier::new();
let mut dag = DagStore::new([0; 32]);
let delta_a = CausalDelta::new_test([1; 32], vec![[0; 32]], vec![]);
let delta_b = CausalDelta::new_test([2; 32], vec![[0; 32]], vec![]);
dag.add_delta(delta_a, &applier).await.unwrap();
dag.add_delta(delta_b, &applier).await.unwrap();
assert_eq!(dag.get_heads().len(), 2);
let merge = CausalDelta::new_test([99; 32], vec![[1; 32], [2; 32]], vec![]);
dag.add_delta(merge, &applier).await.unwrap();
assert_eq!(dag.get_heads(), vec![[99; 32]]);
}
#[tokio::test]
async fn test_concurrent_branches_deterministic_root_hash() {
let applier = StorageApplier::new();
let mut dag = DagStore::new([0; 32]);
let delta_a = create_test_delta(
[10; 32], vec![[0; 32]], vec![], [0xAA; 32], );
let delta_b = create_test_delta(
[20; 32], vec![[0; 32]], vec![], [0xBB; 32], );
let _ = dag.add_delta(delta_a.clone(), &applier).await.unwrap();
let _ = dag.add_delta(delta_b.clone(), &applier).await.unwrap();
let mut heads = dag.get_heads();
heads.sort();
assert_eq!(heads.len(), 2);
assert!(heads.contains(&[10; 32]));
assert!(heads.contains(&[20; 32]));
let applier2 = StorageApplier::new();
let mut dag2 = DagStore::new([0; 32]);
let _ = dag2.add_delta(delta_b.clone(), &applier2).await.unwrap();
let _ = dag2.add_delta(delta_a.clone(), &applier2).await.unwrap();
let mut heads2 = dag2.get_heads();
heads2.sort();
assert_eq!(heads2, heads);
}
fn create_test_delta(
id: [u8; 32],
parents: Vec<[u8; 32]>,
actions: Vec<Action>,
expected_root_hash: [u8; 32],
) -> CausalDelta<Vec<Action>> {
CausalDelta {
id,
parents,
payload: actions,
hlc: calimero_storage::logical_clock::HybridTimestamp::default(),
expected_root_hash,
kind: calimero_dag::DeltaKind::Regular,
}
}