use calimero_context_client::client::ContextClient;
use calimero_node_primitives::sync::{EntityDeletion, TreeLeafData};
use calimero_primitives::application::ApplicationId;
use calimero_primitives::context::ContextId;
use calimero_primitives::identity::PublicKey;
use calimero_storage::address::Id;
use calimero_storage::entities::{ChildInfo, Metadata, StorageType};
use calimero_storage::index::Index;
use calimero_storage::interface::{Action, ApplyContext, Interface};
use calimero_storage::store::MainStorage;
use calimero_store::Store;
use eyre::{bail, Result};
use rand::Rng;
/// Returns the locally stored full root hash for `context_id`.
///
/// The root entity id is built from the raw bytes of the context id. If the
/// index holds no hashes for that id, or the lookup itself fails (the failure
/// is logged at `warn` level), an all-zero hash is returned instead, so as
/// written this function never produces `Err`.
pub fn get_local_root_hash_for_context(context_id: ContextId) -> Result<[u8; 32]> {
    const ZERO_HASH: [u8; 32] = [0u8; 32];

    let root_id = Id::new(*context_id.as_ref());

    // A failed lookup is downgraded to "no hashes" after being logged.
    let hashes = Index::<MainStorage>::get_hashes_for(root_id).unwrap_or_else(|e| {
        tracing::warn!(%context_id, error = %e, "Failed to get root hash");
        None
    });

    Ok(hashes.map_or(ZERO_HASH, |(full_hash, _)| full_hash))
}
/// Checks that two application ids are equal.
///
/// # Errors
///
/// Returns an `application mismatch` error naming both ids when they differ.
#[allow(dead_code, reason = "utility function for application validation")]
pub fn validate_application_id(ours: &ApplicationId, theirs: &ApplicationId) -> eyre::Result<()> {
    if ours == theirs {
        return Ok(());
    }
    bail!("application mismatch: expected {}, got {}", ours, theirs)
}
/// Produces a fresh nonce drawn from the thread-local random number generator.
#[must_use]
pub fn generate_nonce() -> calimero_crypto::Nonce {
    let mut rng = rand::thread_rng();
    rng.gen()
}
/// Selects the storage type to return as authorization for an entity.
///
/// Returns a clone of the entity's storage type for `Shared`, `User` and
/// `SharedMember` entities, and `None` for `Public` and `Frozen` ones.
pub fn wire_authorization_for(
    metadata: &Metadata,
) -> Option<calimero_storage::entities::StorageType> {
    let storage_type = &metadata.storage_type;
    // Kept exhaustive on purpose: a new variant must be classified explicitly.
    match storage_type {
        StorageType::User { .. }
        | StorageType::Shared { .. }
        | StorageType::SharedMember { .. } => Some(storage_type.clone()),
        StorageType::Public | StorageType::Frozen => None,
    }
}
/// Derives the author key from a leaf's authorization, if one is present.
///
/// * `User` storage: the owner.
/// * `Shared` / `SharedMember` storage: the `signer` recorded in the signature
///   data, when both the signature data and the signer are present.
/// * `Public`, `Frozen`, or no authorization at all: `None`.
fn extract_author_from_leaf_authorization(
    authorization: Option<&StorageType>,
) -> Option<PublicKey> {
    match authorization {
        None | Some(StorageType::Public | StorageType::Frozen) => None,
        Some(StorageType::User { owner, .. }) => Some(*owner),
        Some(
            StorageType::Shared { signature_data, .. }
            | StorageType::SharedMember { signature_data, .. },
        ) => signature_data.as_ref()?.signer,
    }
}
/// Decides whether a synced leaf may be applied, based on its claimed author.
///
/// A leaf whose authorization yields no author is accepted without any lookup.
/// Otherwise the author is checked with
/// `calimero_context::group_store::is_currently_authorized_for_context`.
/// A negative answer rejects the leaf and records an `"unauthorized"` drop;
/// a failed lookup is logged at `error` level, records a `"lookup_error"`
/// drop, and also rejects the leaf.
pub fn is_leaf_currently_authorized(
    store: &Store,
    context_id: &ContextId,
    leaf: &TreeLeafData,
) -> bool {
    let author = match extract_author_from_leaf_authorization(leaf.metadata.authorization.as_ref())
    {
        Some(author) => author,
        None => return true,
    };

    let lookup = calimero_context::group_store::is_currently_authorized_for_context(
        store, context_id, &author,
    );

    let drop_reason = match lookup {
        Ok(true) => return true,
        Ok(false) => "unauthorized",
        Err(err) => {
            tracing::error!(
                %context_id,
                %author,
                error = %err,
                "is_leaf_currently_authorized: membership lookup failed; dropping entity to avoid silent bypass"
            );
            "lookup_error"
        }
    };

    crate::node_metrics::record_hc_leaf_drop(drop_reason);
    false
}
/// Reports whether `crdt_type` is an `LwwRegister` whose inner type name equals
/// `OPAQUE_LEAF_CRDT_TYPE_NAME` from the hash-comparison protocol module.
fn is_opaque_crdt_type(crdt_type: &calimero_primitives::crdt::CrdtType) -> bool {
    use calimero_primitives::crdt::CrdtType;

    match crdt_type {
        CrdtType::LwwRegister { inner_type } => {
            inner_type == crate::sync::hash_comparison_protocol::OPAQUE_LEAF_CRDT_TYPE_NAME
        }
        _ => false,
    }
}
/// Tells whether a leaf that arrived without an ancestor chain can be placed
/// directly under its parent.
///
/// Placement is unsafe only when the parent is neither the root nor present
/// locally; every other combination is safe.
fn empty_chain_placement_is_safe(parent_is_root: bool, parent_present_locally: bool) -> bool {
    match (parent_is_root, parent_present_locally) {
        (false, false) => false,
        _ => true,
    }
}
/// Result of a schema-gated leaf apply (`apply_leaf_with_crdt_merge_gated`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LeafOutcome {
    /// The leaf was handed to `apply_leaf_with_crdt_merge` and that call
    /// returned without error.
    Applied,
    /// The leaf's schema key differed from the loaded one, so it was saved to
    /// the absorb buffer instead of being applied.
    Buffered,
}
/// Applies a leaf unless it was authored under a different schema than the
/// one currently loaded.
///
/// When the leaf carries a `schema_app_key` that differs from
/// `loaded_app_key`, the borsh-serialized leaf is saved to the absorb buffer
/// for `context_id`, a metric and a warning are emitted, and
/// `LeafOutcome::Buffered` is returned. Leaves with a matching key, or with
/// no key at all, go through `apply_leaf_with_crdt_merge` and yield
/// `LeafOutcome::Applied`.
///
/// # Errors
///
/// Propagates serialization, absorb-buffer save, and apply failures.
pub fn apply_leaf_with_crdt_merge_gated(
    store: &Store,
    context_id: ContextId,
    leaf: &TreeLeafData,
    loaded_app_key: [u8; 32],
) -> Result<LeafOutcome> {
    use calimero_context::group_store::{AbsorbRecord, AbsorbRepository};

    // Only a present *and* mismatching schema key diverts the leaf.
    let schema = match leaf.metadata.schema_app_key {
        Some(schema) if schema != loaded_app_key => schema,
        _ => {
            apply_leaf_with_crdt_merge(context_id, leaf)?;
            return Ok(LeafOutcome::Applied);
        }
    };

    let leaf_bytes = borsh::to_vec(leaf)?;
    let record = AbsorbRecord::from_leaf(leaf.key, leaf_bytes, schema);
    AbsorbRepository::new(store).save(&context_id, schema, &record)?;

    crate::node_metrics::record_delta_outcome("absorbed_leaf_future_schema");
    tracing::warn!(
        %context_id,
        key = %hex::encode(leaf.key),
        ?schema,
        ?loaded_app_key,
        "sync-repair leaf authored under a newer schema than the loaded \
         reader — buffered into the absorb buffer instead of storing \
         unreadable bytes (will replay once the reader advances)"
    );

    Ok(LeafOutcome::Buffered)
}
/// Applies one synced leaf to local storage as an `Add` or `Update` action.
///
/// Several situations end in `Ok(())` without writing the leaf:
/// * an app-root entry whose CRDT type is not opaque (only a warning is logged);
/// * an already-indexed entity whose resolved storage type is `Frozen`;
/// * a new entity with no ancestor chain whose non-root parent is not indexed
///   locally (a warning is logged).
///
/// Callers therefore cannot tell from `Ok(())` alone whether data was stored.
///
/// # Errors
///
/// Propagates failures from `write_pre_merged_root_state` and `apply_action`.
/// Index and hash lookup failures are not propagated; they are treated as
/// "not present".
pub fn apply_leaf_with_crdt_merge(context_id: ContextId, leaf: &TreeLeafData) -> Result<()> {
    let entity_id = Id::new(leaf.key);
    // The root id is the context id's raw bytes.
    let root_id = Id::new(*context_id.as_ref());
    if calimero_storage::collections::is_app_root_entry(entity_id) {
        if is_opaque_crdt_type(&leaf.metadata.crdt_type) {
            // Opaque root state is written as-is with only the timestamps
            // taken from the leaf; all other metadata stays at its default.
            let mut md = Metadata::default();
            md.created_at = leaf.metadata.created_at;
            md.updated_at = leaf.metadata.hlc_timestamp.into();
            calimero_storage::interface::Interface::<MainStorage>::write_pre_merged_root_state(
                entity_id,
                &leaf.value,
                md,
            )?;
            return Ok(());
        }
        // Non-opaque root entries are not written here at all.
        tracing::warn!(
            %context_id,
            entity_id = %entity_id,
            "HC apply: skipping root-entity merge on host (no host-side merge dispatch); \
caller dispatches via ContextClient::merge_root_state"
        );
        return Ok(());
    }
    // A failed index read is indistinguishable from "entity does not exist".
    let existing_index = Index::<MainStorage>::get_index(entity_id).ok().flatten();
    let mut metadata = Metadata::default();
    metadata.crdt_type = Some(leaf.metadata.crdt_type.clone());
    metadata.updated_at = leaf.metadata.hlc_timestamp.into();
    metadata.created_at = leaf.metadata.created_at;
    // Storage type precedence: authorization carried by the leaf, then the
    // locally indexed storage type, then `Frozen` for `FrozenStorage` CRDTs;
    // otherwise the `Metadata::default()` value is kept.
    if let Some(wire_auth) = leaf.metadata.authorization.as_ref() {
        metadata.storage_type = wire_auth.clone();
    } else if let Some(ref existing) = existing_index {
        metadata.storage_type = existing.metadata.storage_type.clone();
    } else if matches!(
        leaf.metadata.crdt_type,
        calimero_primitives::crdt::CrdtType::FrozenStorage
    ) {
        metadata.storage_type = StorageType::Frozen;
    }
    let action = if existing_index.is_some() {
        // An existing entity that resolves to `Frozen` is never updated.
        if matches!(metadata.storage_type, StorageType::Frozen) {
            return Ok(());
        }
        Action::Update {
            id: entity_id,
            data: leaf.value.clone(),
            ancestors: vec![], metadata,
        }
    } else {
        // Leaves without an explicit parent are attached to the root.
        let parent_id = leaf.metadata.parent_id.map(Id::new).unwrap_or(root_id);
        // If the parent is the root and has no index entry yet, apply an
        // empty `Update` on it first so the new child has a parent.
        if parent_id.is_root()
            && Index::<MainStorage>::get_index(parent_id)
                .ok()
                .flatten()
                .is_none()
        {
            let parent_init = Action::Update {
                id: parent_id,
                data: vec![],
                ancestors: vec![],
                metadata: Metadata::default(),
            };
            Interface::<MainStorage>::apply_action(parent_init, &ApplyContext::empty())?;
        }
        let ancestors = if !leaf.metadata.ancestors.is_empty() {
            // Prefer the ancestor chain supplied with the leaf.
            leaf.metadata.ancestors.clone()
        } else {
            // No chain supplied: fall back to a single-element chain built
            // from the local parent, but only when that placement is safe.
            let parent_index = Index::<MainStorage>::get_index(parent_id).ok().flatten();
            if !empty_chain_placement_is_safe(parent_id.is_root(), parent_index.is_some()) {
                tracing::warn!(
                    %context_id,
                    %entity_id,
                    %parent_id,
                    "HC apply: leaf arrived without an ancestor chain and its parent \
is not present locally; deferring rather than guessing its tree \
position (avoids a divergent root hash HashComparison cannot heal)"
                );
                return Ok(());
            }
            // Missing or unreadable parent hashes fall back to all zeros.
            let parent_hash = Index::<MainStorage>::get_hashes_for(parent_id)
                .ok()
                .flatten()
                .map(|(full, _)| full)
                .unwrap_or([0; 32]);
            let parent_metadata = parent_index
                .map(|idx| idx.metadata.clone())
                .unwrap_or_default();
            vec![ChildInfo::new(parent_id, parent_hash, parent_metadata)]
        };
        Action::Add {
            id: entity_id,
            data: leaf.value.clone(),
            ancestors,
            metadata,
        }
    };
    Interface::<MainStorage>::apply_action(action, &ApplyContext::empty())?;
    Ok(())
}
/// Maximum number of entities processed from a single push;
/// `handle_entity_push` logs a warning and truncates longer batches to this
/// length.
pub const MAX_ENTITIES_PER_PUSH: usize = 500;
/// Summary returned after processing a batch of pushed entities.
#[derive(Debug, Default)]
pub struct EntityPushOutcome {
    /// Number of leaves whose apply call completed and was counted as applied.
    pub applied: u32,
    /// Non-opaque app-root entries that were not applied by the batch loop,
    /// stored as `(leaf key, leaf value, HLC timestamp)` tuples.
    pub deferred_root_merges: Vec<([u8; 32], Vec<u8>, u64)>,
}
/// Applies a pushed batch of entities to the given context.
///
/// Batches longer than `MAX_ENTITIES_PER_PUSH` are truncated to that length
/// (with a warning). The loaded reader schema key is then resolved from the
/// store and the batch is handed to `apply_entity_push_batch`.
pub fn handle_entity_push(
    store: &Store,
    runtime_env: &calimero_storage::env::RuntimeEnv,
    context_id: ContextId,
    entities: &[TreeLeafData],
) -> EntityPushOutcome {
    let received = entities.len();
    if received > MAX_ENTITIES_PER_PUSH {
        tracing::warn!(
            %context_id,
            received,
            max = MAX_ENTITIES_PER_PUSH,
            "EntityPush exceeds max, truncating"
        );
    }
    let batch = &entities[..received.min(MAX_ENTITIES_PER_PUSH)];

    let loaded_app_key = calimero_context::hlc_fence::loaded_reader_app_key(store, &context_id);
    apply_entity_push_batch(store, runtime_env, context_id, batch, loaded_app_key)
}
/// Applies each entity of a pushed batch inside the given runtime environment.
///
/// If `loaded_app_key` is an error the whole batch is skipped and an empty
/// outcome is returned. Otherwise, per leaf:
/// 1. leaves failing `TreeLeafData::is_valid()` are skipped;
/// 2. leaves failing `is_leaf_currently_authorized` are dropped;
/// 3. non-opaque app-root entries are collected into `deferred_root_merges`;
/// 4. everything else is applied, gated on the schema key when one is loaded
///    (`Some`), ungated otherwise (`None`).
///
/// Apply failures are logged and do not abort the rest of the batch.
fn apply_entity_push_batch(
    store: &Store,
    runtime_env: &calimero_storage::env::RuntimeEnv,
    context_id: ContextId,
    entities: &[TreeLeafData],
    loaded_app_key: Result<Option<[u8; 32]>>,
) -> EntityPushOutcome {
    let schema_gate = match loaded_app_key {
        Err(e) => {
            tracing::warn!(
                %context_id,
                error = %e,
                count = entities.len(),
                "EntityPush: could not resolve loaded reader schema (store error); \
                 skipping batch fail-closed — leaves will be re-pushed next sync"
            );
            return EntityPushOutcome::default();
        }
        Ok(gate) => gate,
    };

    calimero_storage::env::with_runtime_env(runtime_env.clone(), || {
        let mut outcome = EntityPushOutcome::default();
        let mut dropped_unauthorized = 0u32;
        let mut buffered = 0u32;

        for leaf in entities {
            if !leaf.is_valid() {
                tracing::warn!(
                    %context_id,
                    key = %hex::encode(leaf.key),
                    len = leaf.value.len(),
                    "pushed entity failed TreeLeafData::is_valid(), skipping"
                );
                continue;
            }

            if !is_leaf_currently_authorized(store, &context_id, leaf) {
                dropped_unauthorized += 1;
                tracing::warn!(
                    %context_id,
                    key = %hex::encode(leaf.key),
                    "pushed entity dropped: claimed author is not currently authorized for this context"
                );
                continue;
            }

            // Non-opaque root entries are handed back to the caller instead
            // of being applied here.
            let is_root_entry = calimero_storage::collections::is_app_root_entry(Id::new(leaf.key));
            if is_root_entry && !is_opaque_crdt_type(&leaf.metadata.crdt_type) {
                outcome.deferred_root_merges.push((
                    leaf.key,
                    leaf.value.clone(),
                    leaf.metadata.hlc_timestamp,
                ));
                continue;
            }

            let leaf_result = match schema_gate {
                Some(loaded) => apply_leaf_with_crdt_merge_gated(store, context_id, leaf, loaded),
                None => apply_leaf_with_crdt_merge(context_id, leaf).map(|()| LeafOutcome::Applied),
            };

            match leaf_result {
                Ok(LeafOutcome::Applied) => outcome.applied += 1,
                Ok(LeafOutcome::Buffered) => buffered += 1,
                Err(e) => {
                    tracing::warn!(
                        %context_id,
                        key = %hex::encode(leaf.key),
                        error = %e,
                        "Failed to apply pushed entity"
                    );
                }
            }
        }

        if buffered > 0 {
            tracing::info!(
                %context_id,
                buffered,
                "EntityPush: buffered future-schema entities into the absorb buffer"
            );
        }
        if dropped_unauthorized > 0 {
            tracing::info!(
                %context_id,
                dropped_unauthorized,
                "EntityPush: dropped entities whose author is no longer authorized"
            );
        }

        outcome
    })
}
/// Runs `f` inside `runtime_env`, first acquiring the lock for `context_id`
/// when a context client is supplied.
///
/// Whatever `acquire_lock` returns is kept alive until `f` has finished.
/// Without a client, `f` runs with no lock taken.
pub async fn apply_under_context_lock<R>(
    context_client: Option<&ContextClient>,
    context_id: ContextId,
    runtime_env: &calimero_storage::env::RuntimeEnv,
    f: impl FnOnce() -> R,
) -> R {
    // Bound to a named variable (not `_`) so it lives to the end of the scope.
    let _lock_guard = if let Some(client) = context_client {
        client.acquire_lock(&context_id).await
    } else {
        None
    };

    calimero_storage::env::with_runtime_env(runtime_env.clone(), f)
}
/// Lock-taking variant of `handle_entity_push`.
///
/// When a context client is supplied, the lock for `context_id` is acquired
/// first and held for the duration of the push handling.
pub async fn handle_entity_push_locked(
    context_client: Option<&ContextClient>,
    store: &Store,
    runtime_env: &calimero_storage::env::RuntimeEnv,
    context_id: ContextId,
    entities: &[TreeLeafData],
) -> EntityPushOutcome {
    // Bound to a named variable (not `_`) so it lives to the end of the scope.
    let _lock_guard = if let Some(client) = context_client {
        client.acquire_lock(&context_id).await
    } else {
        None
    };

    handle_entity_push(store, runtime_env, context_id, entities)
}
/// Applies each deletion as a `DeleteRef` action inside `runtime_env` and
/// returns how many of them were applied successfully.
///
/// A deletion whose action fails is logged at `debug` level and skipped; it
/// does not stop the remaining deletions from being processed.
fn apply_entity_deletions(
    context_id: ContextId,
    runtime_env: &calimero_storage::env::RuntimeEnv,
    deletions: &[EntityDeletion],
) -> u32 {
    calimero_storage::env::with_runtime_env(runtime_env.clone(), || {
        deletions.iter().fold(0u32, |applied, deletion| {
            let tombstone = Action::DeleteRef {
                id: Id::new(deletion.id),
                deleted_at: deletion.deleted_at,
                metadata: deletion.metadata.clone(),
            };

            match Interface::<MainStorage>::apply_action(tombstone, &ApplyContext::empty()) {
                Ok(_) => applied + 1,
                Err(e) => {
                    tracing::debug!(
                        %context_id,
                        id = %hex::encode(deletion.id),
                        error = %e,
                        "EntityDeletePush: skipped a tombstone (lost LWW or unauthorized)"
                    );
                    applied
                }
            }
        })
    })
}
/// Applies pushed entity deletions, holding the lock for `context_id` when a
/// context client is supplied.
///
/// Returns the number of deletions that were applied successfully.
pub async fn handle_entity_delete_push_locked(
    context_client: Option<&ContextClient>,
    context_id: ContextId,
    runtime_env: &calimero_storage::env::RuntimeEnv,
    deletions: &[EntityDeletion],
) -> u32 {
    // Bound to a named variable (not `_`) so it lives to the end of the scope.
    let _lock_guard = if let Some(client) = context_client {
        client.acquire_lock(&context_id).await
    } else {
        None
    };

    apply_entity_deletions(context_id, runtime_env, deletions)
}
/// Decodes a `SignedNamespaceOp` from borsh-encoded bytes.
///
/// The bytes are first tried as a `StoredNamespaceEntry`; if that decodes to
/// the `Signed` variant its payload is returned. In every other case the
/// bytes are tried as a bare `SignedNamespaceOp`, and `None` is returned if
/// that fails too.
pub fn extract_signed_op(
    skeleton_bytes: &[u8],
) -> Option<calimero_context_client::local_governance::SignedNamespaceOp> {
    use calimero_context_client::local_governance::{SignedNamespaceOp, StoredNamespaceEntry};

    match borsh::from_slice::<StoredNamespaceEntry>(skeleton_bytes) {
        Ok(StoredNamespaceEntry::Signed(op)) => Some(op),
        _ => borsh::from_slice::<SignedNamespaceOp>(skeleton_bytes).ok(),
    }
}
/// Extracts the signed op from `skeleton_bytes` and re-serializes it with
/// borsh, returning `None` if either extraction or serialization fails.
pub fn extract_signed_op_bytes(skeleton_bytes: &[u8]) -> Option<Vec<u8>> {
    let op = extract_signed_op(skeleton_bytes)?;
    borsh::to_vec(&op).ok()
}
#[cfg(test)]
mod tests {
    // All imports live at the top of the module. The previously unused
    // `std::collections::BTreeSet` import has been dropped.
    use super::*;
    use calimero_node_primitives::sync::{LeafMetadata, TreeLeafData};
    use calimero_primitives::application::ApplicationId;
    use calimero_primitives::context::ContextId;
    use calimero_primitives::crdt::CrdtType;
    use calimero_storage::address::Id;
    use calimero_storage::entities::{SignatureData, StorageType};
    use calimero_storage::index::Index;
    use calimero_storage::store::MainStorage;
    use calimero_store::db::InMemoryDB;
    use calimero_store::Store;
    use std::sync::Arc;

    // --- validate_application_id -------------------------------------------

    #[test]
    fn test_validate_application_id_matching() {
        let app_id = ApplicationId::from([1u8; 32]);
        assert!(validate_application_id(&app_id, &app_id).is_ok());
    }

    #[test]
    fn test_validate_application_id_mismatch() {
        let app1 = ApplicationId::from([1u8; 32]);
        let app2 = ApplicationId::from([2u8; 32]);
        let result = validate_application_id(&app1, &app2);
        assert!(result.is_err());
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("application mismatch"));
    }

    // --- generate_nonce ----------------------------------------------------

    #[test]
    fn test_generate_nonce_returns_value() {
        let nonce = generate_nonce();
        assert_ne!(nonce, [0u8; 12]);
    }

    #[test]
    fn test_generate_nonce_is_random() {
        let nonce1 = generate_nonce();
        let nonce2 = generate_nonce();
        assert_ne!(nonce1, nonce2, "Nonces should be randomly generated");
    }

    // --- extract_author_from_leaf_authorization ----------------------------

    #[test]
    fn extract_author_user_returns_owner() {
        let owner = PublicKey::from([7u8; 32]);
        let st = StorageType::User {
            owner,
            signature_data: None,
        };
        assert_eq!(
            extract_author_from_leaf_authorization(Some(&st)),
            Some(owner),
        );
    }

    #[test]
    fn extract_author_shared_with_signer_hint_returns_signer() {
        let signer = PublicKey::from([9u8; 32]);
        let st = StorageType::Shared {
            writers: std::collections::BTreeMap::from([(
                signer,
                calimero_storage::entities::OpMask::FULL,
            )]),
            signature_data: Some(SignatureData {
                signer: Some(signer),
                signature: [0u8; 64],
                nonce: 0,
            }),
        };
        assert_eq!(
            extract_author_from_leaf_authorization(Some(&st)),
            Some(signer),
        );
    }

    #[test]
    fn extract_author_shared_without_signer_hint_returns_none() {
        let st = StorageType::Shared {
            writers: std::collections::BTreeMap::from([(
                PublicKey::from([1u8; 32]),
                calimero_storage::entities::OpMask::FULL,
            )]),
            signature_data: Some(SignatureData {
                signer: None,
                signature: [0u8; 64],
                nonce: 0,
            }),
        };
        assert_eq!(extract_author_from_leaf_authorization(Some(&st)), None);
    }

    #[test]
    fn extract_author_public_returns_none() {
        assert_eq!(
            extract_author_from_leaf_authorization(Some(&StorageType::Public)),
            None,
        );
    }

    #[test]
    fn extract_author_frozen_returns_none() {
        assert_eq!(
            extract_author_from_leaf_authorization(Some(&StorageType::Frozen)),
            None,
        );
    }

    #[test]
    fn extract_author_no_authorization_returns_none() {
        assert_eq!(extract_author_from_leaf_authorization(None), None);
    }

    // --- schema-gated apply / entity push batch ----------------------------

    /// Builds an LWW-register leaf with a fixed payload, optionally tagged
    /// with a schema app key.
    fn opaque_leaf_with_schema(key: [u8; 32], schema: Option<[u8; 32]>) -> TreeLeafData {
        let mut md = LeafMetadata::new(CrdtType::lww_register("test"), 100, [0u8; 32]);
        if let Some(k) = schema {
            md = md.with_schema_app_key(k);
        }
        TreeLeafData::new(key, b"v2-bytes".to_vec(), md)
    }

    #[test]
    fn leaf_with_future_schema_is_buffered_not_stored() {
        let context_id = ContextId::from([0xCA; 32]);
        let identity = PublicKey::from([0u8; 32]);
        let store = Store::new(Arc::new(InMemoryDB::owned()));
        let runtime_env = calimero_node_primitives::sync::storage_bridge::create_runtime_env(
            &store, context_id, identity,
        );
        let leaf_key = [0x42u8; 32];
        let leaf = opaque_leaf_with_schema(leaf_key, Some([2u8; 32]));
        let loaded_v1 = [1u8; 32];
        let outcome = calimero_storage::env::with_runtime_env(runtime_env.clone(), || {
            apply_leaf_with_crdt_merge_gated(&store, context_id, &leaf, loaded_v1)
        })
        .expect("gated apply must not error");
        assert!(
            matches!(outcome, LeafOutcome::Buffered),
            "future-schema leaf must be buffered, got {outcome:?}"
        );
        let stored = calimero_storage::env::with_runtime_env(runtime_env.clone(), || {
            Index::<MainStorage>::get_index(Id::new(leaf_key))
                .ok()
                .flatten()
        });
        assert!(stored.is_none(), "future-schema leaf must NOT be stored");
        let pending = calimero_context::group_store::AbsorbRepository::new(&store)
            .enumerate_pending(&context_id)
            .expect("enumerate pending");
        assert_eq!(pending.len(), 1, "future-schema leaf must be buffered");
    }

    #[test]
    fn leaf_with_matching_schema_applies() {
        let context_id = ContextId::from([0xCB; 32]);
        let identity = PublicKey::from([0u8; 32]);
        let store = Store::new(Arc::new(InMemoryDB::owned()));
        let runtime_env = calimero_node_primitives::sync::storage_bridge::create_runtime_env(
            &store, context_id, identity,
        );
        let leaf_key = [0x43u8; 32];
        let loaded = [1u8; 32];
        let leaf = opaque_leaf_with_schema(leaf_key, Some(loaded));
        let outcome = calimero_storage::env::with_runtime_env(runtime_env.clone(), || {
            apply_leaf_with_crdt_merge_gated(&store, context_id, &leaf, loaded)
        })
        .expect("gated apply must not error");
        assert!(matches!(outcome, LeafOutcome::Applied));
    }

    #[test]
    fn legacy_leaf_without_schema_marker_applies() {
        let context_id = ContextId::from([0xCC; 32]);
        let identity = PublicKey::from([0u8; 32]);
        let store = Store::new(Arc::new(InMemoryDB::owned()));
        let runtime_env = calimero_node_primitives::sync::storage_bridge::create_runtime_env(
            &store, context_id, identity,
        );
        let leaf_key = [0x44u8; 32];
        let leaf = opaque_leaf_with_schema(leaf_key, None);
        let outcome = calimero_storage::env::with_runtime_env(runtime_env.clone(), || {
            apply_leaf_with_crdt_merge_gated(&store, context_id, &leaf, [1u8; 32])
        })
        .expect("gated apply must not error");
        assert!(matches!(outcome, LeafOutcome::Applied));
    }

    #[test]
    fn store_error_resolving_gate_skips_batch_not_applies() {
        let context_id = ContextId::from([0xCD; 32]);
        let identity = PublicKey::from([0u8; 32]);
        let store = Store::new(Arc::new(InMemoryDB::owned()));
        let runtime_env = calimero_node_primitives::sync::storage_bridge::create_runtime_env(
            &store, context_id, identity,
        );
        let leaf_key = [0x45u8; 32];
        let leaf = opaque_leaf_with_schema(leaf_key, None);
        let outcome = apply_entity_push_batch(
            &store,
            &runtime_env,
            context_id,
            std::slice::from_ref(&leaf),
            Err(eyre::eyre!("simulated transient store error")),
        );
        assert_eq!(
            outcome.applied, 0,
            "fail-closed: a store error must skip the batch, not apply it"
        );
        let stored = calimero_storage::env::with_runtime_env(runtime_env.clone(), || {
            Index::<MainStorage>::get_index(Id::new(leaf_key))
                .ok()
                .flatten()
        });
        assert!(
            stored.is_none(),
            "store error must NOT result in an ungated apply/store"
        );
    }

    #[test]
    fn no_gate_ok_none_still_applies_leaf() {
        let context_id = ContextId::from([0xCE; 32]);
        let identity = PublicKey::from([0u8; 32]);
        let store = Store::new(Arc::new(InMemoryDB::owned()));
        let runtime_env = calimero_node_primitives::sync::storage_bridge::create_runtime_env(
            &store, context_id, identity,
        );
        let leaf_key = [0x46u8; 32];
        let leaf = opaque_leaf_with_schema(leaf_key, None);
        let outcome = apply_entity_push_batch(
            &store,
            &runtime_env,
            context_id,
            std::slice::from_ref(&leaf),
            Ok(None),
        );
        assert_eq!(
            outcome.applied, 1,
            "Ok(None) legitimate-no-gate case must apply the leaf"
        );
    }
}
#[cfg(test)]
mod empty_chain_placement_tests {
    use super::empty_chain_placement_is_safe as is_safe;

    /// A root parent is a safe placement whether or not it is present locally.
    #[test]
    fn safe_when_parent_is_the_context_root() {
        for parent_present_locally in [false, true] {
            assert!(is_safe(true, parent_present_locally));
        }
    }

    /// A non-root parent is fine as long as it is present locally.
    #[test]
    fn safe_when_nonroot_parent_already_exists_locally() {
        let (parent_is_root, parent_present_locally) = (false, true);
        assert!(is_safe(parent_is_root, parent_present_locally));
    }

    /// A non-root parent that is absent locally is the one unsafe case.
    #[test]
    fn unsafe_for_nested_entity_whose_parent_is_absent() {
        let (parent_is_root, parent_present_locally) = (false, false);
        assert!(!is_safe(parent_is_root, parent_present_locally));
    }
}