use crate::sync::helpers::{
apply_leaf_with_crdt_merge, apply_leaf_with_crdt_merge_gated, apply_under_context_lock,
generate_nonce, get_local_root_hash_for_context, handle_entity_push,
is_leaf_currently_authorized, LeafOutcome, MAX_ENTITIES_PER_PUSH,
};
use async_trait::async_trait;
use calimero_context_client::client::ContextClient;
use calimero_node_primitives::sync::{
compare_tree_nodes, create_runtime_env, EntityDeletion, InitPayload, LeafMetadata,
MessagePayload, StreamMessage, SyncProtocolExecutor, SyncTransport, TreeCompareResult,
TreeLeafData, TreeNode, TreeNodeResponse, MAX_LEAF_VALUE_SIZE, MAX_NODES_PER_RESPONSE,
};
use calimero_primitives::context::ContextId;
use calimero_primitives::crdt::CrdtType;
use calimero_primitives::hash::Hash;
use calimero_primitives::identity::PublicKey;
use calimero_storage::address::Id;
use calimero_storage::entities::StorageType;
use calimero_storage::env::with_runtime_env;
use calimero_storage::index::Index;
use calimero_storage::interface::Interface;
use calimero_storage::rotation_log::{self, RotationLog};
use calimero_storage::store::MainStorage;
use calimero_store::Store;
use eyre::{bail, Result};
use tracing::{debug, info, trace, warn};
const MAX_PENDING_NODES: usize = 10_000;
pub(crate) const OPAQUE_LEAF_CRDT_TYPE_NAME: &str = "Opaque";
pub const MAX_REQUEST_DEPTH: u8 = 16;
pub const MAX_HASH_COMPARISON_REQUESTS: u64 = 10_000;
#[derive(Debug, Clone)]
pub struct HashComparisonConfig {
pub remote_root_hash: [u8; 32],
pub context_client: Option<ContextClient>,
}
#[derive(Debug, Clone)]
pub struct HashComparisonFirstRequest {
pub node_id: [u8; 32],
pub max_depth: Option<u8>,
}
#[derive(Debug, Default, Clone)]
pub struct HashComparisonStats {
pub nodes_compared: u64,
pub entities_merged: u64,
pub entities_pushed: u64,
pub nodes_skipped: u64,
pub requests_sent: u64,
pub root_hash_verified: bool,
pub deferred_root_merges: Vec<([u8; 32], Vec<u8>, u64)>,
}
pub struct HashComparisonProtocol;
#[async_trait(?Send)]
impl SyncProtocolExecutor for HashComparisonProtocol {
type Config = HashComparisonConfig;
type ResponderInit = HashComparisonFirstRequest;
type Stats = HashComparisonStats;
async fn run_initiator<T: SyncTransport>(
transport: &mut T,
store: &Store,
context_id: ContextId,
identity: PublicKey,
config: Self::Config,
) -> Result<Self::Stats> {
run_initiator_impl(
transport,
store,
context_id,
identity,
config.remote_root_hash,
config.context_client.as_ref(),
)
.await
}
async fn run_responder<T: SyncTransport>(
transport: &mut T,
store: &Store,
context_id: ContextId,
identity: PublicKey,
first_request: Self::ResponderInit,
) -> Result<()> {
run_responder_impl(
transport,
store,
context_id,
identity,
first_request.node_id,
first_request.max_depth,
)
.await
}
}
async fn run_initiator_impl<T: SyncTransport>(
transport: &mut T,
store: &Store,
context_id: ContextId,
identity: PublicKey,
remote_root_hash: [u8; 32],
context_client: Option<&ContextClient>,
) -> Result<HashComparisonStats> {
info!(%context_id, "Starting HashComparison sync (initiator)");
let mut stats = HashComparisonStats::default();
let runtime_env = create_runtime_env(store, context_id, identity);
let schema_app_key = calimero_context::hlc_fence::loaded_reader_app_key(store, &context_id)
.ok()
.flatten();
let mut to_compare: Vec<([u8; 32], bool)> = vec![(remote_root_hash, true)];
let mut pending_local_leaf_pushes: Vec<TreeLeafData> = Vec::new();
let mut pending_deletions: Vec<EntityDeletion> = Vec::new();
while let Some((node_id, is_root_request)) = to_compare.pop() {
if to_compare.len() > MAX_PENDING_NODES {
bail!(
"HashComparison sync aborted: pending nodes ({}) exceeds limit ({})",
to_compare.len(),
MAX_PENDING_NODES
);
}
let request_msg = StreamMessage::Init {
context_id,
party_id: identity,
payload: InitPayload::TreeNodeRequest {
context_id,
node_id,
max_depth: Some(1),
},
next_nonce: generate_nonce(),
};
transport.send(&request_msg).await?;
stats.requests_sent += 1;
let response = transport
.recv()
.await?
.ok_or_else(|| eyre::eyre!("stream closed unexpectedly"))?;
let StreamMessage::Message { payload, .. } = response else {
bail!("Unexpected response type during HashComparison sync");
};
let (nodes, not_found) = match payload {
MessagePayload::TreeNodeResponse { nodes, not_found } => (nodes, not_found),
MessagePayload::SnapshotError { error } => {
warn!(%context_id, ?error, "Peer returned error");
bail!("Peer error: {:?}", error);
}
_ => bail!("Unexpected payload type"),
};
if nodes.len() > MAX_NODES_PER_RESPONSE {
warn!(%context_id, count = nodes.len(), "Response too large, skipping");
continue;
}
if not_found {
if is_root_request {
bail!(
"HashComparison sync aborted: peer reported root node not_found \
(peer's root advanced after handshake)"
);
}
debug!(%context_id, node_id = %hex::encode(node_id), "Node not found on peer");
continue;
}
for (node_idx, remote_node) in nodes.into_iter().enumerate() {
if node_idx != 0 && node_idx % 64 == 0 {
tokio::task::yield_now().await;
}
if !remote_node.is_valid() {
warn!(%context_id, "Invalid TreeNode, skipping");
continue;
}
stats.nodes_compared += 1;
if remote_node.is_leaf() {
if let Some(ref leaf_data) = remote_node.leaf_data {
trace!(
%context_id,
key = %hex::encode(leaf_data.key),
"Merging leaf entity"
);
if !is_leaf_currently_authorized(store, &context_id, leaf_data) {
warn!(
%context_id,
key = %hex::encode(leaf_data.key),
"HC merge skipped: claimed author is not currently authorized for this context"
);
continue;
}
let entity_id = calimero_storage::address::Id::new(leaf_data.key);
let is_opaque = matches!(
&leaf_data.metadata.crdt_type,
calimero_primitives::crdt::CrdtType::LwwRegister { inner_type }
if inner_type == OPAQUE_LEAF_CRDT_TYPE_NAME
);
if calimero_storage::collections::is_app_root_entry(entity_id) && !is_opaque {
stats.deferred_root_merges.push((
leaf_data.key,
leaf_data.value.clone(),
leaf_data.metadata.hlc_timestamp,
));
continue;
}
let loaded_app_key =
calimero_context::hlc_fence::loaded_reader_app_key(store, &context_id);
let outcome =
apply_under_context_lock(context_client, context_id, &runtime_env, || {
apply_hc_leaf_gated(store, context_id, leaf_data, loaded_app_key)
})
.await?;
match outcome {
HcLeafGateOutcome::Buffered => {
continue;
}
HcLeafGateOutcome::SkippedStoreError => {
continue;
}
HcLeafGateOutcome::Applied => {}
}
stats.entities_merged += 1;
let local_node = with_runtime_env(runtime_env.clone(), || {
get_local_tree_node(context_id, &remote_node.id, false, schema_app_key)
})?;
if let Some(local) = local_node {
if local.is_leaf() && local.hash != remote_node.hash {
if let Some(local_leaf) = local.leaf_data {
if local_leaf.value.len() > MAX_LEAF_VALUE_SIZE {
warn!(
%context_id,
key = %hex::encode(local_leaf.key),
len = local_leaf.value.len(),
max = MAX_LEAF_VALUE_SIZE,
"leaf value exceeds MAX_LEAF_VALUE_SIZE, \
skipping bidirectional push"
);
} else {
pending_local_leaf_pushes.push(local_leaf);
}
}
}
}
}
} else {
let is_this_node_root = is_root_request && remote_node.id == node_id;
if !remote_node.deleted_children.is_empty() {
let applied =
apply_under_context_lock(context_client, context_id, &runtime_env, || {
apply_remote_tombstones(&remote_node.deleted_children)
})
.await;
if applied > 0 {
debug!(
%context_id,
applied,
"applied remote deleted_children (clear convergence)"
);
}
}
let local_version = with_runtime_env(runtime_env.clone(), || {
get_local_tree_node(
context_id,
&remote_node.id,
is_this_node_root,
schema_app_key,
)
})?;
match compare_tree_nodes(local_version.as_ref(), Some(&remote_node)) {
TreeCompareResult::Equal => {
stats.nodes_skipped += 1;
trace!(%context_id, "Subtree matches, skipping");
}
TreeCompareResult::LocalMissing => {
for child_id in &remote_node.children {
to_compare.push((*child_id, false));
}
}
TreeCompareResult::Different {
remote_only_children,
local_only_children,
common_children,
} => {
for child_id in remote_only_children {
let tombstone = with_runtime_env(runtime_env.clone(), || {
let id = calimero_storage::address::Id::new(child_id);
match Index::<MainStorage>::get_index(id) {
Ok(Some(idx)) => idx
.deleted_at
.map(|deleted_at| (deleted_at, idx.metadata.clone())),
_ => None,
}
});
if let Some((deleted_at, metadata)) = tombstone {
pending_deletions.push(EntityDeletion {
id: child_id,
deleted_at,
metadata,
});
} else {
to_compare.push((child_id, false));
}
}
for child_id in common_children {
to_compare.push((child_id, false));
}
if !local_only_children.is_empty() {
let pushed = push_local_subtrees(
transport,
&runtime_env,
context_id,
identity,
&local_only_children,
&mut stats,
schema_app_key,
)
.await?;
debug!(
%context_id,
local_only = local_only_children.len(),
entities_pushed = pushed,
"Pushed local-only children to peer"
);
}
}
TreeCompareResult::RemoteMissing => {
if let Some(ref local_node) = local_version {
let leaves = with_runtime_env(runtime_env.clone(), || {
collect_local_leaves(
context_id,
&local_node.id,
is_this_node_root,
schema_app_key,
)
})?;
if !leaves.is_empty() {
push_entities(transport, context_id, identity, &leaves, &mut stats)
.await?;
}
}
}
}
}
}
tokio::task::yield_now().await;
}
if !pending_local_leaf_pushes.is_empty() {
let pushed = push_entities(
transport,
context_id,
identity,
&pending_local_leaf_pushes,
&mut stats,
)
.await?;
debug!(
%context_id,
divergent_leaves = pending_local_leaf_pushes.len(),
entities_pushed = pushed,
"Flushed bidirectional leaf reconciliation pushes"
);
}
if !pending_deletions.is_empty() {
let applied = push_deletions(
transport,
context_id,
identity,
&pending_deletions,
&mut stats,
)
.await?;
debug!(
%context_id,
tombstones = pending_deletions.len(),
applied,
"Flushed clear/tombstone deletion propagation"
);
}
if let Err(e) =
reconcile_rotation_logs_with_peer(transport, context_id, identity, &runtime_env).await
{
debug!(%context_id, error = %e, "rotation-log reconciliation skipped (best-effort)");
}
let peer_current_root = match query_peer_current_root(transport, context_id, identity).await {
Ok(Some(root)) => root,
Ok(None) | Err(_) => remote_root_hash,
};
transport.close().await?;
let local_root_hash = with_runtime_env(runtime_env.clone(), || {
get_local_root_hash_for_context(context_id)
})?;
stats.root_hash_verified = local_root_hash == peer_current_root;
info!(
%context_id,
nodes_compared = stats.nodes_compared,
entities_merged = stats.entities_merged,
entities_pushed = stats.entities_pushed,
nodes_skipped = stats.nodes_skipped,
root_hash_verified = stats.root_hash_verified,
"HashComparison sync complete"
);
if !stats.root_hash_verified {
warn!(
%context_id,
local_hash = %hex::encode(&local_root_hash[..8]),
peer_hash = %hex::encode(&peer_current_root[..8]),
nodes_compared = stats.nodes_compared,
entities_merged = stats.entities_merged,
entities_pushed = stats.entities_pushed,
nodes_skipped = stats.nodes_skipped,
"HashComparison sync did not converge with the peer's live root. \
Compared against the peer's post-sync root (re-read at session end), \
so a mismatch here means the two nodes are genuinely divergent — \
persistent occurrences across interval-sync ticks indicate a real \
merge convergence bug rather than benign handshake drift."
);
}
Ok(stats)
}
async fn query_peer_current_root<T: SyncTransport>(
transport: &mut T,
context_id: ContextId,
identity: PublicKey,
) -> Result<Option<[u8; 32]>> {
let request = StreamMessage::Init {
context_id,
party_id: identity,
payload: InitPayload::DagHeadsRequest { context_id },
next_nonce: generate_nonce(),
};
transport.send(&request).await?;
let Some(response) = transport.recv().await? else {
return Ok(None);
};
match response {
StreamMessage::Message {
payload: MessagePayload::DagHeadsResponse { root_hash, .. },
..
} => Ok(Some(*root_hash)),
_ => Ok(None),
}
}
const MAX_ROTATION_LOGS_PER_SYNC: usize = 1024;
pub(crate) fn collect_local_shared_rotation_logs(
context_id: ContextId,
) -> Vec<([u8; 32], Vec<u8>)> {
let mut out = Vec::new();
collect_shared_rotation_logs_recursive(Id::new(*context_id.as_ref()), &mut out, 0);
out
}
fn collect_shared_rotation_logs_recursive(
entity_id: Id,
out: &mut Vec<([u8; 32], Vec<u8>)>,
depth: u32,
) {
if depth >= MAX_COLLECT_DEPTH || out.len() >= MAX_ROTATION_LOGS_PER_SYNC {
if out.len() >= MAX_ROTATION_LOGS_PER_SYNC {
warn!(
count = out.len(),
"rotation-log sync: hit per-sync cap, truncating the set of \
exchanged Shared rotation logs"
);
}
return;
}
let Ok(Some(index)) = Index::<MainStorage>::get_index(entity_id) else {
return;
};
if matches!(index.metadata.storage_type, StorageType::Shared { .. }) {
if let Ok(Some(log)) = rotation_log::load::<MainStorage>(entity_id) {
if let Ok(bytes) = borsh::to_vec(&log) {
out.push((*entity_id.as_bytes(), bytes));
}
}
}
if let Some(children) = index.children() {
for child in children.iter() {
collect_shared_rotation_logs_recursive(Id::new(*child.id().as_bytes()), out, depth + 1);
}
}
}
pub(crate) fn union_received_rotation_logs(logs: &[([u8; 32], Vec<u8>)]) -> usize {
let mut applied = 0_usize;
for (entity_bytes, bytes) in logs {
let entity_id = Id::new(*entity_bytes);
let remote: RotationLog = match borsh::from_slice(bytes) {
Ok(log) => log,
Err(e) => {
debug!(%entity_id, error = %e, "rotation-log sync: undecodable remote log, skipping");
continue;
}
};
for entry in remote.entries {
match rotation_log::append::<MainStorage>(entity_id, entry) {
Ok(()) => applied += 1,
Err(e) => {
debug!(%entity_id, error = %e, "rotation-log sync: append skipped");
}
}
}
}
applied
}
pub(crate) async fn reconcile_rotation_logs_with_peer<T: SyncTransport>(
transport: &mut T,
context_id: ContextId,
identity: PublicKey,
runtime_env: &calimero_storage::env::RuntimeEnv,
) -> Result<()> {
let local_logs = with_runtime_env(runtime_env.clone(), || {
collect_local_shared_rotation_logs(context_id)
});
let request = StreamMessage::Init {
context_id,
party_id: identity,
payload: InitPayload::RotationLogSyncRequest {
context_id,
logs: local_logs,
},
next_nonce: generate_nonce(),
};
transport.send(&request).await?;
let Some(response) = transport.recv().await? else {
return Ok(());
};
if let StreamMessage::Message {
payload: MessagePayload::RotationLogSyncResponse { logs },
..
} = response
{
let applied = with_runtime_env(runtime_env.clone(), || union_received_rotation_logs(&logs));
if applied > 0 {
debug!(
%context_id,
applied,
"rotation-log reconciliation: unioned peer's Shared rotation logs"
);
}
}
Ok(())
}
async fn run_responder_impl<T: SyncTransport>(
transport: &mut T,
store: &Store,
context_id: ContextId,
identity: PublicKey,
first_node_id: [u8; 32],
first_max_depth: Option<u8>,
) -> Result<()> {
info!(%context_id, "Starting HashComparison sync (responder)");
if let Some(depth) = first_max_depth {
if depth > MAX_REQUEST_DEPTH {
bail!(
"First request max_depth {} exceeds maximum {}",
depth,
MAX_REQUEST_DEPTH
);
}
}
let runtime_env = create_runtime_env(store, context_id, identity);
let schema_app_key = calimero_context::hlc_fence::loaded_reader_app_key(store, &context_id)
.ok()
.flatten();
let local_root_hash = with_runtime_env(runtime_env.clone(), || {
Index::<MainStorage>::get_hashes_for(Id::new(*context_id.as_ref()))
.ok()
.flatten()
.map(|(full, _)| full)
.unwrap_or([0; 32])
});
let mut sequence_id = 0u64;
let mut requests_handled = 0u64;
{
let clamped_depth = first_max_depth.map(|d| d.min(MAX_REQUEST_DEPTH));
let is_root_request = first_node_id == local_root_hash;
let local_node = with_runtime_env(runtime_env.clone(), || {
get_local_tree_node(context_id, &first_node_id, is_root_request, schema_app_key)
})?;
let response = build_tree_node_response_internal(
context_id,
local_node,
clamped_depth,
&runtime_env,
schema_app_key,
)?;
let msg = StreamMessage::Message {
sequence_id,
payload: MessagePayload::TreeNodeResponse {
nodes: response.nodes,
not_found: response.not_found,
},
next_nonce: generate_nonce(),
};
transport.send(&msg).await?;
sequence_id += 1;
requests_handled += 1;
}
loop {
if requests_handled >= MAX_HASH_COMPARISON_REQUESTS {
warn!(
%context_id,
requests_handled,
max = MAX_HASH_COMPARISON_REQUESTS,
"Request limit reached, closing responder"
);
break;
}
let Some(request) = transport.recv().await? else {
debug!(%context_id, requests_handled, "Stream closed, responder done");
break;
};
let StreamMessage::Init { payload, .. } = request else {
debug!(%context_id, "Received non-Init message, ending responder");
break;
};
match payload {
InitPayload::TreeNodeRequest {
node_id, max_depth, ..
} => {
trace!(
%context_id,
node_id = %hex::encode(node_id),
?max_depth,
"Handling TreeNodeRequest"
);
let clamped_depth = max_depth.map(|d| d.min(MAX_REQUEST_DEPTH));
let is_root_request = node_id == local_root_hash;
let local_node = with_runtime_env(runtime_env.clone(), || {
get_local_tree_node(context_id, &node_id, is_root_request, schema_app_key)
})?;
let response = build_tree_node_response_internal(
context_id,
local_node,
clamped_depth,
&runtime_env,
schema_app_key,
)?;
let msg = StreamMessage::Message {
sequence_id,
payload: MessagePayload::TreeNodeResponse {
nodes: response.nodes,
not_found: response.not_found,
},
next_nonce: generate_nonce(),
};
transport.send(&msg).await?;
sequence_id += 1;
requests_handled += 1;
}
InitPayload::EntityPush { entities, .. } => {
let entity_count = entities.len();
trace!(%context_id, entity_count, "Handling EntityPush from initiator");
let outcome = handle_entity_push(store, &runtime_env, context_id, &entities);
let applied = outcome.applied;
if !outcome.deferred_root_merges.is_empty() {
warn!(
%context_id,
deferred = outcome.deferred_root_merges.len(),
"EntityPush responder: dropped root-entity deferred merges \
(protocol-trait responder lacks ContextClient — initiator-side \
dispatch will pick up root divergence on next sync round)"
);
}
let msg = StreamMessage::Message {
sequence_id,
payload: MessagePayload::EntityPushAck {
applied_count: applied,
},
next_nonce: generate_nonce(),
};
transport.send(&msg).await?;
sequence_id += 1;
requests_handled += 1;
info!(
%context_id,
applied,
deferred_root_merges = outcome.deferred_root_merges.len(),
total = entity_count,
"Applied pushed entities via CRDT merge"
);
}
InitPayload::EntityDeletePush { deletions, .. } => {
let total = deletions.len();
trace!(%context_id, total, "Handling EntityDeletePush from initiator");
let mut applied: u32 = 0;
for deletion in &deletions {
let action = calimero_storage::action::Action::DeleteRef {
id: calimero_storage::address::Id::new(deletion.id),
deleted_at: deletion.deleted_at,
metadata: deletion.metadata.clone(),
};
let result = with_runtime_env(runtime_env.clone(), || {
Interface::<MainStorage>::apply_action(
action,
&calimero_storage::interface::ApplyContext::empty(),
)
});
match result {
Ok(_) => applied += 1,
Err(e) => {
debug!(
%context_id,
id = %hex::encode(deletion.id),
error = %e,
"EntityDeletePush: skipped a tombstone (lost LWW or unauthorized)"
);
}
}
}
let msg = StreamMessage::Message {
sequence_id,
payload: MessagePayload::EntityDeletePushAck {
applied_count: applied,
},
next_nonce: generate_nonce(),
};
transport.send(&msg).await?;
sequence_id += 1;
requests_handled += 1;
info!(%context_id, applied, total, "Applied pushed tombstones (delete-wins)");
}
InitPayload::DagHeadsRequest { .. } => {
let current_root = with_runtime_env(runtime_env.clone(), || {
Index::<MainStorage>::get_hashes_for(Id::new(*context_id.as_ref()))
.ok()
.flatten()
.map(|(full, _)| full)
.unwrap_or([0; 32])
});
let msg = StreamMessage::Message {
sequence_id,
payload: MessagePayload::DagHeadsResponse {
dag_heads: Vec::new(),
root_hash: Hash::from(current_root),
},
next_nonce: generate_nonce(),
};
transport.send(&msg).await?;
sequence_id += 1;
requests_handled += 1;
}
InitPayload::RotationLogSyncRequest { logs, .. } => {
let applied =
with_runtime_env(runtime_env.clone(), || union_received_rotation_logs(&logs));
let local_logs = with_runtime_env(runtime_env.clone(), || {
collect_local_shared_rotation_logs(context_id)
});
let msg = StreamMessage::Message {
sequence_id,
payload: MessagePayload::RotationLogSyncResponse { logs: local_logs },
next_nonce: generate_nonce(),
};
transport.send(&msg).await?;
sequence_id += 1;
requests_handled += 1;
if applied > 0 {
info!(%context_id, applied, "rotation-log sync: unioned initiator's Shared rotation logs");
}
}
_ => {
debug!(%context_id, "Received unknown payload, ending responder");
break;
}
}
}
info!(%context_id, requests_handled, "HashComparison responder complete");
Ok(())
}
fn build_tree_node_response_internal(
context_id: ContextId,
local_node: Option<TreeNode>,
clamped_depth: Option<u8>,
runtime_env: &calimero_storage::env::RuntimeEnv,
schema_app_key: Option<[u8; 32]>,
) -> Result<TreeNodeResponse> {
let response = if let Some(node) = local_node {
let mut nodes = vec![node.clone()];
let depth = clamped_depth.unwrap_or(0);
if depth > 0 && node.is_internal() {
for child_id in &node.children {
if let Some(child) = with_runtime_env(runtime_env.clone(), || {
get_local_tree_node(context_id, child_id, false, schema_app_key)
})? {
nodes.push(child);
if nodes.len() >= MAX_NODES_PER_RESPONSE {
break;
}
}
}
}
TreeNodeResponse::new(nodes)
} else {
TreeNodeResponse::not_found()
};
Ok(response)
}
const MAX_COLLECT_DEPTH: u32 = 64;
const MAX_LEAVES_PER_SUBTREE: usize = 10_000;
fn collect_local_leaves(
context_id: ContextId,
node_id: &[u8; 32],
is_root: bool,
schema_app_key: Option<[u8; 32]>,
) -> Result<Vec<TreeLeafData>> {
let mut leaves = Vec::new();
collect_leaves_recursive(context_id, node_id, is_root, &mut leaves, 0, schema_app_key)?;
Ok(leaves)
}
fn collect_leaves_recursive(
context_id: ContextId,
node_id: &[u8; 32],
is_root: bool,
leaves: &mut Vec<TreeLeafData>,
depth: u32,
schema_app_key: Option<[u8; 32]>,
) -> Result<()> {
if depth >= MAX_COLLECT_DEPTH {
warn!(
depth,
node_id = %hex::encode(node_id),
"collect_leaves_recursive: max depth reached, truncating"
);
return Ok(());
}
if leaves.len() > MAX_LEAVES_PER_SUBTREE {
return Ok(());
}
let entity_id = if is_root {
Id::new(*context_id.as_ref())
} else {
Id::new(*node_id)
};
let index = match Index::<MainStorage>::get_index(entity_id) {
Ok(Some(idx)) => idx,
Ok(None) => return Ok(()),
Err(e) => {
warn!(
%entity_id,
error = %e,
"collect_leaves_recursive: failed to read index, skipping subtree"
);
return Ok(());
}
};
let children_ids: Vec<[u8; 32]> = index
.children()
.map(|children| children.iter().map(|c| *c.id().as_bytes()).collect())
.unwrap_or_default();
if children_ids.is_empty() {
if let Some(entry_data) = Interface::<MainStorage>::find_by_id_raw(entity_id) {
let crdt_type = index.metadata.crdt_type.clone().unwrap_or_else(|| {
trace!(%entity_id, "opaque leaf, synthesised LWW wire type for push");
CrdtType::lww_register(OPAQUE_LEAF_CRDT_TYPE_NAME)
});
let mut metadata = LeafMetadata::new(crdt_type, index.metadata.updated_at(), [0u8; 32])
.with_created_at(index.metadata.created_at());
if let Some(parent_id) = index.parent_id() {
metadata = metadata.with_parent(*parent_id.as_bytes());
}
if let Ok(ancestors) = Index::<MainStorage>::get_ancestors_of(entity_id) {
metadata = metadata.with_ancestors(ancestors);
}
if let Some(auth) = crate::sync::helpers::wire_authorization_for(&index.metadata) {
metadata = metadata.with_authorization(auth);
}
if let Some(schema) = schema_app_key {
metadata = metadata.with_schema_app_key(schema);
}
let leaf_data = TreeLeafData::new(*entity_id.as_bytes(), entry_data, metadata);
if leaf_data.value.len() > MAX_LEAF_VALUE_SIZE {
warn!(
%entity_id,
len = leaf_data.value.len(),
"leaf value exceeds MAX_LEAF_VALUE_SIZE, skipping push"
);
} else {
leaves.push(leaf_data);
}
}
} else {
for child_id in &children_ids {
collect_leaves_recursive(
context_id,
child_id,
false,
leaves,
depth + 1,
schema_app_key,
)?;
}
}
Ok(())
}
async fn push_local_subtrees<T: SyncTransport>(
transport: &mut T,
runtime_env: &calimero_storage::env::RuntimeEnv,
context_id: ContextId,
identity: PublicKey,
local_only_children: &[[u8; 32]],
stats: &mut HashComparisonStats,
schema_app_key: Option<[u8; 32]>,
) -> Result<u64> {
let mut total = 0u64;
for child_id in local_only_children {
let leaves = with_runtime_env(runtime_env.clone(), || {
collect_local_leaves(context_id, child_id, false, schema_app_key)
})?;
if !leaves.is_empty() {
total += push_entities(transport, context_id, identity, &leaves, stats).await?;
}
}
Ok(total)
}
async fn push_entities<T: SyncTransport>(
transport: &mut T,
context_id: ContextId,
identity: PublicKey,
leaves: &[TreeLeafData],
stats: &mut HashComparisonStats,
) -> Result<u64> {
let mut total_pushed = 0u64;
for chunk in leaves.chunks(MAX_ENTITIES_PER_PUSH) {
let push_msg = StreamMessage::Init {
context_id,
party_id: identity,
payload: InitPayload::EntityPush {
context_id,
entities: chunk.to_vec(),
},
next_nonce: generate_nonce(),
};
transport.send(&push_msg).await?;
stats.requests_sent += 1;
let ack = transport
.recv()
.await?
.ok_or_else(|| eyre::eyre!("stream closed while waiting for EntityPushAck"))?;
match ack {
StreamMessage::Message {
payload: MessagePayload::EntityPushAck { applied_count },
..
} => {
total_pushed += u64::from(applied_count);
}
_ => {
bail!(
"Unexpected response to EntityPush (peer may not support bidirectional sync)"
);
}
}
}
stats.entities_pushed += total_pushed;
Ok(total_pushed)
}
async fn push_deletions<T: SyncTransport>(
transport: &mut T,
context_id: ContextId,
identity: PublicKey,
deletions: &[EntityDeletion],
stats: &mut HashComparisonStats,
) -> Result<u64> {
let mut total_applied = 0u64;
for chunk in deletions.chunks(MAX_ENTITIES_PER_PUSH) {
let push_msg = StreamMessage::Init {
context_id,
party_id: identity,
payload: InitPayload::EntityDeletePush {
context_id,
deletions: chunk.to_vec(),
},
next_nonce: generate_nonce(),
};
transport.send(&push_msg).await?;
stats.requests_sent += 1;
let ack = transport
.recv()
.await?
.ok_or_else(|| eyre::eyre!("stream closed while waiting for EntityDeletePushAck"))?;
match ack {
StreamMessage::Message {
payload: MessagePayload::EntityDeletePushAck { applied_count },
..
} => {
total_applied += u64::from(applied_count);
}
_ => {
bail!(
"Unexpected response to EntityDeletePush (peer may not support tombstone propagation)"
);
}
}
}
Ok(total_applied)
}
fn get_local_tree_node(
context_id: ContextId,
node_id: &[u8; 32],
is_root_request: bool,
schema_app_key: Option<[u8; 32]>,
) -> Result<Option<TreeNode>> {
let entity_id = if is_root_request {
Id::new(*context_id.as_ref())
} else {
Id::new(*node_id)
};
let index = match Index::<MainStorage>::get_index(entity_id) {
Ok(Some(idx)) => idx,
Ok(None) => return Ok(None),
Err(e) => {
warn!(%context_id, %entity_id, error = %e, "Failed to get index");
return Ok(None);
}
};
let full_hash = index.full_hash();
let children_ids: Vec<[u8; 32]> = index
.children()
.map(|children| children.iter().map(|c| *c.id().as_bytes()).collect())
.unwrap_or_default();
let deleted_children = collect_deleted_children_wire(&index);
if !children_ids.is_empty() || !deleted_children.is_empty() {
let mut node = TreeNode::internal(*entity_id.as_bytes(), full_hash, children_ids);
node.deleted_children = deleted_children;
return Ok(Some(node));
}
if let Some(entry_data) = Interface::<MainStorage>::find_by_id_raw(entity_id) {
let crdt_type = index.metadata.crdt_type.clone().unwrap_or_else(|| {
trace!(%entity_id, "opaque leaf, synthesised LWW wire type for sync");
CrdtType::lww_register(OPAQUE_LEAF_CRDT_TYPE_NAME)
});
let mut metadata = LeafMetadata::new(crdt_type, index.metadata.updated_at(), [0u8; 32])
.with_created_at(index.metadata.created_at());
if let Some(parent_id) = index.parent_id() {
metadata = metadata.with_parent(*parent_id.as_bytes());
}
if let Ok(ancestors) = Index::<MainStorage>::get_ancestors_of(entity_id) {
metadata = metadata.with_ancestors(ancestors);
}
if let Some(auth) = crate::sync::helpers::wire_authorization_for(&index.metadata) {
metadata = metadata.with_authorization(auth);
}
if let Some(schema) = schema_app_key {
metadata = metadata.with_schema_app_key(schema);
}
let leaf_data = TreeLeafData::new(*entity_id.as_bytes(), entry_data, metadata);
Ok(Some(TreeNode::leaf(
*entity_id.as_bytes(),
full_hash,
leaf_data,
)))
} else {
Ok(Some(TreeNode::internal(
*entity_id.as_bytes(),
full_hash,
vec![],
)))
}
}
pub(crate) fn apply_remote_tombstones(deletions: &[EntityDeletion]) -> u64 {
let mut applied = 0u64;
for deletion in deletions {
let action = calimero_storage::action::Action::DeleteRef {
id: Id::new(deletion.id),
deleted_at: deletion.deleted_at,
metadata: deletion.metadata.clone(),
};
if Interface::<MainStorage>::apply_action(
action,
&calimero_storage::interface::ApplyContext::empty(),
)
.is_ok()
{
applied += 1;
}
}
applied
}
pub(crate) fn collect_deleted_children_wire(
index: &calimero_storage::index::EntityIndex,
) -> Vec<EntityDeletion> {
index
.deleted_children()
.iter()
.filter_map(|child_id| {
let cidx = Index::<MainStorage>::get_index(*child_id).ok().flatten()?;
let deleted_at = cidx.deleted_at?;
Some(EntityDeletion {
id: *child_id.as_bytes(),
deleted_at,
metadata: cidx.metadata.clone(),
})
})
.collect()
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum HcLeafGateOutcome {
Applied,
Buffered,
SkippedStoreError,
}
fn apply_hc_leaf_gated(
store: &Store,
context_id: ContextId,
leaf: &TreeLeafData,
loaded_app_key: Result<Option<[u8; 32]>>,
) -> Result<HcLeafGateOutcome> {
let loaded_app_key = match loaded_app_key {
Ok(key) => key,
Err(e) => {
warn!(
%context_id,
error = %e,
key = %hex::encode(leaf.key),
"HC merge: could not resolve loaded reader schema (store error); \
skipping leaf fail-closed — it will be re-pushed next sync"
);
return Ok(HcLeafGateOutcome::SkippedStoreError);
}
};
match loaded_app_key {
Some(loaded) => Ok(
match apply_leaf_with_crdt_merge_gated(store, context_id, leaf, loaded)? {
LeafOutcome::Applied => HcLeafGateOutcome::Applied,
LeafOutcome::Buffered => HcLeafGateOutcome::Buffered,
},
),
None => {
apply_leaf_with_crdt_merge(context_id, leaf)?;
Ok(HcLeafGateOutcome::Applied)
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_config_creation() {
let config = HashComparisonConfig {
remote_root_hash: [1u8; 32],
context_client: None,
};
assert_eq!(config.remote_root_hash, [1u8; 32]);
}
#[test]
fn test_stats_default() {
let stats = HashComparisonStats::default();
assert_eq!(stats.nodes_compared, 0);
assert_eq!(stats.entities_merged, 0);
}
#[test]
fn get_local_tree_node_returns_leaf_for_no_crdt_entity() {
use std::sync::Arc;
use calimero_storage::action::Action;
use calimero_storage::entities::{ChildInfo, Metadata};
use calimero_storage::interface::ApplyContext;
use calimero_store::db::InMemoryDB;
use calimero_store::Store;
let context_id = ContextId::from([0xCA; 32]);
let identity = PublicKey::from([0u8; 32]);
let store = Store::new(Arc::new(InMemoryDB::owned()));
let runtime_env = create_runtime_env(&store, context_id, identity);
let root_id = Id::new(*context_id.as_ref());
let opaque_id = Id::new([118u8; 32]);
with_runtime_env(runtime_env.clone(), || {
Interface::<MainStorage>::apply_action(
Action::Update {
id: root_id,
data: vec![],
ancestors: vec![],
metadata: Metadata::default(),
},
&ApplyContext::empty(),
)
.expect("create root");
let root_hash = Index::<MainStorage>::get_hashes_for(root_id)
.ok()
.flatten()
.map(|(full, _)| full)
.unwrap_or([0; 32]);
let root_meta = Index::<MainStorage>::get_index(root_id)
.ok()
.flatten()
.map(|idx| idx.metadata.clone())
.unwrap_or_default();
Interface::<MainStorage>::apply_action(
Action::Add {
id: opaque_id,
data: b"app-root-state".to_vec(),
ancestors: vec![ChildInfo::new(root_id, root_hash, root_meta)],
metadata: Metadata::new(100, 100),
},
&ApplyContext::empty(),
)
.expect("add opaque leaf");
assert!(
Index::<MainStorage>::get_index(opaque_id)
.unwrap()
.unwrap()
.metadata
.crdt_type
.is_none(),
"seeded entity must have crdt_type == None"
);
let node = get_local_tree_node(context_id, opaque_id.as_bytes(), false, None)
.expect("get_local_tree_node should not error")
.expect("node should exist");
assert!(node.is_leaf(), "opaque entity must be emitted as a leaf");
assert!(
!node.is_internal(),
"opaque entity must not be an internal node"
);
assert!(
node.is_valid(),
"opaque leaf node must be structurally valid"
);
let leaf_data = node.leaf_data.as_ref().expect("leaf must carry leaf_data");
assert!(
matches!(leaf_data.metadata.crdt_type, CrdtType::LwwRegister { .. }),
"opaque leaf must carry a synthetic LwwRegister wire type, got {:?}",
leaf_data.metadata.crdt_type
);
assert_eq!(leaf_data.value, b"app-root-state");
});
}
#[test]
fn apply_leaf_skips_existing_frozen_entry() {
use std::sync::Arc;
use calimero_node_primitives::sync::hash_comparison::{LeafMetadata, TreeLeafData};
use calimero_primitives::crdt::CrdtType;
use calimero_storage::action::Action;
use calimero_storage::entities::{ChildInfo, Metadata, StorageType};
use calimero_storage::interface::ApplyContext;
use calimero_store::db::InMemoryDB;
use calimero_store::Store;
use sha2::{Digest, Sha256};
use crate::sync::helpers::apply_leaf_with_crdt_merge;
let context_id = ContextId::from([0xCA; 32]);
let identity = PublicKey::from([0u8; 32]);
let store = Store::new(Arc::new(InMemoryDB::owned()));
let runtime_env = create_runtime_env(&store, context_id, identity);
let root_id = Id::new(*context_id.as_ref());
let frozen_id = Id::new([0x42u8; 32]);
let value = b"immutable-frozen-payload".to_vec();
let key_hash: [u8; 32] = Sha256::digest(&value).into();
let mut blob = Vec::new();
blob.extend_from_slice(&key_hash);
blob.extend_from_slice(&value);
blob.extend_from_slice(frozen_id.as_bytes());
with_runtime_env(runtime_env.clone(), || {
Interface::<MainStorage>::apply_action(
Action::Update {
id: root_id,
data: vec![],
ancestors: vec![],
metadata: Metadata::default(),
},
&ApplyContext::empty(),
)
.expect("create root");
let root_hash = Index::<MainStorage>::get_hashes_for(root_id)
.ok()
.flatten()
.map(|(full, _)| full)
.unwrap_or([0; 32]);
let root_meta = Index::<MainStorage>::get_index(root_id)
.ok()
.flatten()
.map(|idx| idx.metadata.clone())
.unwrap_or_default();
let mut frozen_meta = Metadata::new(100, 100);
frozen_meta.storage_type = StorageType::Frozen;
frozen_meta.crdt_type = Some(CrdtType::FrozenStorage);
Interface::<MainStorage>::apply_action(
Action::Add {
id: frozen_id,
data: blob.clone(),
ancestors: vec![ChildInfo::new(root_id, root_hash, root_meta)],
metadata: frozen_meta,
},
&ApplyContext::empty(),
)
.expect("seed frozen entry");
let leaf = TreeLeafData::new(
*frozen_id.as_bytes(),
blob.clone(),
LeafMetadata::new(CrdtType::FrozenStorage, 100, *root_id.as_bytes()),
);
apply_leaf_with_crdt_merge(context_id, &leaf)
.expect("re-applying an existing frozen leaf must be a no-op, not a fatal Update");
assert!(
Index::<MainStorage>::get_index(frozen_id)
.unwrap()
.is_some(),
"frozen entry must remain present after the skipped re-apply"
);
});
}
#[test]
fn apply_leaf_new_frozen_entry_lands_as_frozen_not_public() {
use std::sync::Arc;
use calimero_node_primitives::sync::hash_comparison::{LeafMetadata, TreeLeafData};
use calimero_primitives::crdt::CrdtType;
use calimero_storage::action::Action;
use calimero_storage::entities::{Metadata, StorageType};
use calimero_storage::interface::ApplyContext;
use calimero_store::db::InMemoryDB;
use calimero_store::Store;
use sha2::{Digest, Sha256};
use crate::sync::helpers::apply_leaf_with_crdt_merge;
let context_id = ContextId::from([0xCC; 32]);
let identity = PublicKey::from([0u8; 32]);
let store = Store::new(Arc::new(InMemoryDB::owned()));
let runtime_env = create_runtime_env(&store, context_id, identity);
let root_id = Id::new(*context_id.as_ref());
let frozen_id = Id::new([0x77u8; 32]);
let value = b"freshly-pushed-frozen".to_vec();
let key_hash: [u8; 32] = Sha256::digest(&value).into();
let mut blob = Vec::new();
blob.extend_from_slice(&key_hash);
blob.extend_from_slice(&value);
blob.extend_from_slice(frozen_id.as_bytes());
with_runtime_env(runtime_env.clone(), || {
Interface::<MainStorage>::apply_action(
Action::Update {
id: root_id,
data: vec![],
ancestors: vec![],
metadata: Metadata::default(),
},
&ApplyContext::empty(),
)
.expect("create root");
let leaf = TreeLeafData::new(
*frozen_id.as_bytes(),
blob.clone(),
LeafMetadata::new(CrdtType::FrozenStorage, 100, *root_id.as_bytes())
.with_parent(*root_id.as_bytes()),
);
apply_leaf_with_crdt_merge(context_id, &leaf).expect("apply new frozen leaf");
let md = Index::<MainStorage>::get_index(frozen_id)
.unwrap()
.expect("frozen entity should have been created")
.metadata;
assert!(
matches!(md.storage_type, StorageType::Frozen),
"new frozen leaf must land as Frozen, not {:?} — else a later Frozen \
delta is rejected with Cannot change StorageType",
md.storage_type
);
});
}
#[test]
fn hashcomparison_pull_does_not_resurrect_cleared_entry() {
use std::sync::Arc;
use calimero_node_primitives::sync::hash_comparison::{LeafMetadata, TreeLeafData};
use calimero_primitives::crdt::CrdtType;
use calimero_storage::action::Action;
use calimero_storage::entities::{ChildInfo, Metadata};
use calimero_storage::interface::ApplyContext;
use calimero_store::db::InMemoryDB;
use calimero_store::Store;
use crate::sync::helpers::apply_leaf_with_crdt_merge;
let context_id = ContextId::from([0xCB; 32]);
let identity = PublicKey::from([0u8; 32]);
let store = Store::new(Arc::new(InMemoryDB::owned()));
let runtime_env = create_runtime_env(&store, context_id, identity);
let root_id = Id::new(*context_id.as_ref());
let entry_id = Id::new([0x77u8; 32]);
let child_ids = |parent: Id| -> Vec<Id> {
Index::<MainStorage>::get_children_of(parent)
.unwrap_or_default()
.iter()
.map(ChildInfo::id)
.collect()
};
with_runtime_env(runtime_env.clone(), || {
Interface::<MainStorage>::apply_action(
Action::Update {
id: root_id,
data: vec![],
ancestors: vec![],
metadata: Metadata::default(),
},
&ApplyContext::empty(),
)
.expect("create root");
let root_hash = Index::<MainStorage>::get_hashes_for(root_id)
.ok()
.flatten()
.map(|(full, _)| full)
.unwrap_or([0; 32]);
let root_meta = Index::<MainStorage>::get_index(root_id)
.ok()
.flatten()
.map(|idx| idx.metadata.clone())
.unwrap_or_default();
let mut md = Metadata::new(100, 100);
md.crdt_type = Some(CrdtType::LwwRegister {
inner_type: "String".to_owned(),
});
Interface::<MainStorage>::apply_action(
Action::Add {
id: entry_id,
data: b"peer-value".to_vec(),
ancestors: vec![ChildInfo::new(root_id, root_hash, root_meta)],
metadata: md,
},
&ApplyContext::empty(),
)
.expect("seed entry");
assert!(
child_ids(root_id).contains(&entry_id),
"entry should be seeded under root"
);
Interface::<MainStorage>::remove_child_from(root_id, entry_id).expect("clear entry");
assert!(
Index::<MainStorage>::is_deleted(entry_id).unwrap(),
"entry must be tombstoned after clear"
);
assert!(
!child_ids(root_id).contains(&entry_id),
"cleared entry must leave root's children"
);
let leaf = TreeLeafData::new(
*entry_id.as_bytes(),
b"peer-value".to_vec(),
LeafMetadata::new(
CrdtType::LwwRegister {
inner_type: "String".to_owned(),
},
100,
*root_id.as_bytes(),
),
);
apply_leaf_with_crdt_merge(context_id, &leaf).expect("apply pulled leaf");
assert!(
Index::<MainStorage>::is_deleted(entry_id).unwrap(),
"HashComparison pull resurrected a cleared entry (tombstone lost) — \
delete-wins violated, so the deletion can never converge"
);
assert!(
!child_ids(root_id).contains(&entry_id),
"HashComparison pull re-added the cleared entry to root's children — \
root hash diverges from a peer that applied the delete"
);
});
}
use calimero_node_primitives::sync::hash_comparison::{LeafMetadata, TreeLeafData};
use calimero_primitives::crdt::CrdtType;
use calimero_store::db::InMemoryDB;
use calimero_store::Store;
use std::sync::Arc;
fn hc_opaque_leaf(key: [u8; 32], schema: Option<[u8; 32]>) -> TreeLeafData {
let mut md = LeafMetadata::new(CrdtType::lww_register("test"), 100, [0u8; 32]);
if let Some(k) = schema {
md = md.with_schema_app_key(k);
}
TreeLeafData::new(key, b"v2-bytes".to_vec(), md)
}
#[test]
fn hc_store_error_resolving_gate_skips_leaf_not_applies() {
let context_id = ContextId::from([0xCD; 32]);
let identity = PublicKey::from([0u8; 32]);
let store = Store::new(Arc::new(InMemoryDB::owned()));
let runtime_env = create_runtime_env(&store, context_id, identity);
let leaf_key = [0x45u8; 32];
let leaf = hc_opaque_leaf(leaf_key, None);
let outcome = with_runtime_env(runtime_env.clone(), || {
apply_hc_leaf_gated(
&store,
context_id,
&leaf,
Err(eyre::eyre!("simulated transient store error")),
)
})
.expect("gate must not propagate the store error");
assert_eq!(
outcome,
HcLeafGateOutcome::SkippedStoreError,
"fail-closed: a store error must skip the leaf, not apply it"
);
let stored = with_runtime_env(runtime_env.clone(), || {
Index::<MainStorage>::get_index(Id::new(leaf_key))
.ok()
.flatten()
});
assert!(
stored.is_none(),
"store error must NOT result in an ungated apply/store"
);
}
#[test]
fn hc_no_gate_ok_none_still_applies_leaf() {
let context_id = ContextId::from([0xCE; 32]);
let identity = PublicKey::from([0u8; 32]);
let store = Store::new(Arc::new(InMemoryDB::owned()));
let runtime_env = create_runtime_env(&store, context_id, identity);
let leaf_key = [0x46u8; 32];
let leaf = hc_opaque_leaf(leaf_key, None);
let outcome = with_runtime_env(runtime_env.clone(), || {
apply_hc_leaf_gated(&store, context_id, &leaf, Ok(None))
})
.expect("gate must apply the leaf");
assert_eq!(
outcome,
HcLeafGateOutcome::Applied,
"Ok(None) legitimate-no-gate case must apply the leaf"
);
let stored = with_runtime_env(runtime_env.clone(), || {
Index::<MainStorage>::get_index(Id::new(leaf_key))
.ok()
.flatten()
});
assert!(
stored.is_some(),
"Ok(None) no-gate case must store the leaf"
);
}
#[test]
fn rotation_log_reconciliation_converges_divergent_shared_logs() {
use core::num::NonZeroU128;
use std::collections::{BTreeMap, BTreeSet};
use std::sync::Arc;
use calimero_storage::action::Action;
use calimero_storage::entities::{ChildInfo, Metadata};
use calimero_storage::interface::ApplyContext;
use calimero_storage::logical_clock::{HybridTimestamp, Timestamp, ID, NTP64};
use calimero_storage::tests::common::{build_signed_shared_action, pubkey_of};
use calimero_store::db::InMemoryDB;
use calimero_store::Store;
use ed25519_dalek::SigningKey;
fn hlc(ns: u64) -> HybridTimestamp {
HybridTimestamp::new(Timestamp::new(
NTP64(ns),
ID::from(NonZeroU128::new(1).unwrap()),
))
}
let context_id = ContextId::from([0xC7; 32]);
let identity = PublicKey::from([0u8; 32]);
let anchor_id = Id::new([0x77; 32]);
let alice_sk = SigningKey::from_bytes(&[0xA1; 32]);
let alice = pubkey_of(&alice_sk);
let bob = pubkey_of(&SigningKey::from_bytes(&[0xB2; 32]));
let carol = pubkey_of(&SigningKey::from_bytes(&[0xC3; 32]));
let genesis: BTreeSet<PublicKey> = [alice, bob].into_iter().collect();
let seed = |with_rotation: bool| -> Store {
let store = Store::new(Arc::new(InMemoryDB::owned()));
let env = create_runtime_env(&store, context_id, identity);
with_runtime_env(env, || {
let root_id = Id::new(*context_id.as_ref());
Interface::<MainStorage>::apply_action(
Action::Update {
id: root_id,
data: vec![],
ancestors: vec![],
metadata: Metadata::default(),
},
&ApplyContext::empty(),
)
.expect("create root");
let root_hash = Index::<MainStorage>::get_hashes_for(root_id)
.ok()
.flatten()
.map(|(full, _)| full)
.unwrap_or([0; 32]);
let root_meta = Index::<MainStorage>::get_index(root_id)
.ok()
.flatten()
.map(|idx| idx.metadata.clone())
.unwrap_or_default();
let bootstrap = build_signed_shared_action(
true,
anchor_id,
b"v0".to_vec(),
genesis.clone(),
10,
&alice_sk,
vec![ChildInfo::new(root_id, root_hash, root_meta)],
);
Interface::<MainStorage>::apply_action(
bootstrap,
&ApplyContext {
effective_writers: Some(calimero_storage::entities::full_mask(
genesis.clone(),
)),
delta_id: Some([0xE0; 32]),
delta_hlc: Some(hlc(10)),
},
)
.expect("bootstrap shared anchor");
if with_rotation {
let rotation = build_signed_shared_action(
false,
anchor_id,
b"v0".to_vec(),
[alice, carol].into_iter().collect(),
30,
&alice_sk,
vec![],
);
Interface::<MainStorage>::apply_action(
rotation,
&ApplyContext {
effective_writers: Some(calimero_storage::entities::full_mask(
genesis.clone(),
)),
delta_id: Some([0xE1; 32]),
delta_hlc: Some(hlc(30)),
},
)
.expect("rotation to {Alice, Carol}");
}
});
store
};
let full = seed(true);
let hc = seed(false);
let full_env = create_runtime_env(&full, context_id, identity);
let hc_env = create_runtime_env(&hc, context_id, identity);
let resolve = |env: &calimero_storage::env::RuntimeEnv| -> BTreeMap<PublicKey, calimero_storage::entities::OpMask> {
with_runtime_env(env.clone(), || {
rotation_log::resolve_local(
&rotation_log::load::<MainStorage>(anchor_id)
.unwrap()
.unwrap(),
)
.unwrap()
})
};
let hc_before = resolve(&hc_env);
assert!(
!hc_before.contains_key(&carol),
"precondition: HC node lacks Carol; got {hc_before:?}"
);
let collected = with_runtime_env(full_env.clone(), || {
collect_local_shared_rotation_logs(context_id)
});
assert!(
collected.iter().any(|(id, _)| *id == *anchor_id.as_bytes()),
"collect_local_shared_rotation_logs must find the Shared anchor"
);
let applied = with_runtime_env(hc_env.clone(), || union_received_rotation_logs(&collected));
assert!(applied > 0, "union must append the missing rotation entry");
let hc_after = resolve(&hc_env);
let full_writers = resolve(&full_env);
assert_eq!(
hc_after, full_writers,
"after the union both nodes resolve the same writer set"
);
assert!(
hc_after.contains_key(&carol),
"HC node now recognises Carol as a writer; got {hc_after:?}"
);
}
}