use std::sync::Arc;
use calimero_blobstore::config::BlobStoreConfig;
use calimero_blobstore::{BlobManager as BlobStore, FileSystem};
use calimero_context_client::client::ContextClient;
use calimero_dag::{CausalDelta, DeltaKind};
use calimero_network_primitives::client::NetworkClient;
use calimero_node_primitives::client::{BlobManager, NodeClient, SyncClient};
use calimero_primitives::context::ContextId;
use calimero_primitives::identity::PublicKey;
use calimero_storage::action::Action;
use calimero_storage::logical_clock::HybridTimestamp;
use calimero_store::db::InMemoryDB;
use calimero_store::Store;
use calimero_utils_actix::LazyRecipient;
use tokio::sync::{broadcast, mpsc};
use crate::delta_store::DeltaStore;
fn make_delta(
id: [u8; 32],
parents: Vec<[u8; 32]>,
expected_root_hash: [u8; 32],
) -> CausalDelta<Vec<Action>> {
CausalDelta {
id,
parents,
payload: Vec::new(),
hlc: HybridTimestamp::default(),
expected_root_hash,
kind: DeltaKind::Regular,
}
}
#[tokio::test]
async fn add_local_applied_delta_prunes_non_head_ancestors() {
let tmp = tempfile::tempdir().expect("tempdir");
let store = Store::new(Arc::new(InMemoryDB::owned()));
let blob_config =
BlobStoreConfig::new(tmp.path().to_path_buf().try_into().expect("utf8 blob path"));
let file_system = FileSystem::new(&blob_config).await.expect("blob fs");
let blob_store = BlobStore::new(store.clone(), file_system);
let blob_manager = BlobManager::new(blob_store);
let node_recipient = LazyRecipient::new();
let context_recipient = LazyRecipient::new();
let network_recipient = LazyRecipient::new();
let network_client = NetworkClient::new(network_recipient);
let (event_sender, _) = broadcast::channel(16);
let (ctx_sync_tx, _ctx_sync_rx) = mpsc::channel(1);
let (ns_sync_tx, _ns_sync_rx) = mpsc::channel(1);
let (ns_join_tx, _ns_join_rx) = mpsc::channel(1);
let (open_subgroup_join_tx, _open_subgroup_join_rx) = mpsc::channel(1);
let sync_client = SyncClient::new(ctx_sync_tx, ns_sync_tx, ns_join_tx, open_subgroup_join_tx);
let node_client = NodeClient::new(
store.clone(),
blob_manager,
network_client,
node_recipient,
event_sender,
sync_client,
String::new(),
None,
);
let context_client = ContextClient::new(store, node_client, context_recipient);
let context_id = ContextId::from([0xAA; 32]);
let our_identity = PublicKey::from([0xBB; 32]);
let root = [0u8; 32];
let delta_store = DeltaStore::new(root, context_client, context_id, our_identity);
let parent_id = [0x11u8; 32];
let parent_hash = [0xA1; 32];
let cascaded = delta_store
.add_local_applied_delta(make_delta(parent_id, vec![[0u8; 32]], parent_hash))
.await
.expect("add parent succeeds");
assert!(
cascaded.is_empty(),
"no pending children → no cascaded events"
);
let ids = delta_store.head_root_hash_ids().await;
assert_eq!(
ids,
vec![parent_id],
"parent is the sole head after first add"
);
let child_id = [0x22u8; 32];
let child_hash = [0xC1; 32];
let cascaded = delta_store
.add_local_applied_delta(make_delta(child_id, vec![parent_id], child_hash))
.await
.expect("add child");
assert!(cascaded.is_empty(), "still no pending children to cascade");
let ids = delta_store.head_root_hash_ids().await;
assert_eq!(
ids,
vec![child_id],
"parent entry must be pruned once it is no longer a head"
);
}
#[tokio::test]
async fn add_local_applied_delta_self_logs_own_rotations() {
use std::collections::BTreeSet;
use calimero_storage::address::Id;
use calimero_storage::tests::common::{build_signed_shared_action, pubkey_of};
use ed25519_dalek::SigningKey;
use crate::delta_store::load_rotation_log_direct;
let tmp = tempfile::tempdir().expect("tempdir");
let store = Store::new(Arc::new(InMemoryDB::owned()));
let blob_config =
BlobStoreConfig::new(tmp.path().to_path_buf().try_into().expect("utf8 blob path"));
let file_system = FileSystem::new(&blob_config).await.expect("blob fs");
let blob_store = BlobStore::new(store.clone(), file_system);
let blob_manager = BlobManager::new(blob_store);
let network_client = NetworkClient::new(LazyRecipient::new());
let (event_sender, _) = broadcast::channel(16);
let (ctx_sync_tx, _r0) = mpsc::channel(1);
let (ns_sync_tx, _r1) = mpsc::channel(1);
let (ns_join_tx, _r2) = mpsc::channel(1);
let (open_subgroup_join_tx, _r3) = mpsc::channel(1);
let sync_client = SyncClient::new(ctx_sync_tx, ns_sync_tx, ns_join_tx, open_subgroup_join_tx);
let node_client = NodeClient::new(
store.clone(),
blob_manager,
network_client,
LazyRecipient::new(),
event_sender,
sync_client,
String::new(),
None,
);
let context_id = ContextId::from([0xAA; 32]);
let our_identity = PublicKey::from([0xBB; 32]);
let context_client = ContextClient::new(store, node_client, LazyRecipient::new());
let reader = context_client.clone();
let delta_store = DeltaStore::new([0u8; 32], context_client, context_id, our_identity);
let anchor = Id::new([0x33; 32]);
let alice_sk = SigningKey::from_bytes(&[0xA1; 32]);
let bob_sk = SigningKey::from_bytes(&[0xB2; 32]);
let alice = pubkey_of(&alice_sk);
let bob = pubkey_of(&bob_sk);
let writers = |set: &[PublicKey]| -> BTreeSet<PublicKey> { set.iter().copied().collect() };
let shared_action = |add: bool, w: BTreeSet<PublicKey>, nonce: u64| -> Action {
build_signed_shared_action(add, anchor, vec![1], w, nonce, &alice_sk, vec![])
};
let log_entries = || {
load_rotation_log_direct(&reader, context_id, anchor)
.expect("read log")
.map(|l| {
l.entries
.iter()
.map(|e| e.new_writers.clone())
.collect::<Vec<_>>()
})
.unwrap_or_default()
};
let mut d = make_delta([0x01; 32], vec![[0u8; 32]], [0xD1; 32]);
d.payload = vec![shared_action(true, writers(&[alice]), 1)];
let _ = delta_store
.add_local_applied_delta(d)
.await
.expect("add genesis");
assert_eq!(
log_entries(),
vec![calimero_storage::entities::full_mask(writers(&[alice]))],
"genesis Add must self-log a bootstrap entry"
);
let mut d = make_delta([0x02; 32], vec![[0x01; 32]], [0xD2; 32]);
d.payload = vec![shared_action(false, writers(&[alice, bob]), 2)];
let _ = delta_store
.add_local_applied_delta(d)
.await
.expect("add rotation");
assert_eq!(
log_entries(),
vec![
calimero_storage::entities::full_mask(writers(&[alice])),
calimero_storage::entities::full_mask(writers(&[alice, bob]))
],
"writer-set change must self-log a rotation entry"
);
let mut d = make_delta([0x03; 32], vec![[0x02; 32]], [0xD3; 32]);
d.payload = vec![shared_action(false, writers(&[alice, bob]), 3)];
let _ = delta_store
.add_local_applied_delta(d)
.await
.expect("add value-write");
assert_eq!(
log_entries(),
vec![
calimero_storage::entities::full_mask(writers(&[alice])),
calimero_storage::entities::full_mask(writers(&[alice, bob]))
],
"a value-write that doesn't change writers must NOT append a rotation entry"
);
}