use calimero_node_primitives::sync::TreeLeafData;
use calimero_primitives::application::ApplicationId;
use calimero_primitives::context::ContextId;
use calimero_primitives::identity::PublicKey;
use calimero_storage::address::Id;
use calimero_storage::entities::{ChildInfo, Metadata, StorageType};
use calimero_storage::index::Index;
use calimero_storage::interface::{Action, ApplyContext, Interface};
use calimero_storage::store::MainStorage;
use calimero_store::Store;
use eyre::{bail, Result};
use rand::Rng;
/// Looks up the locally stored full hash of the root entity for `context_id`.
///
/// The root entity ID is derived from the raw bytes of the context ID.
///
/// This function is best-effort: when no hashes are stored for the root, or
/// when the index lookup fails (the failure is logged at `warn` level), an
/// all-zero hash is returned instead. As written it never returns `Err`.
pub fn get_local_root_hash_for_context(context_id: ContextId) -> Result<[u8; 32]> {
    const ZERO_HASH: [u8; 32] = [0u8; 32];

    let root_id = Id::new(*context_id.as_ref());

    // A failed lookup is downgraded to "no hashes stored" after logging it.
    let stored_hashes = Index::<MainStorage>::get_hashes_for(root_id).unwrap_or_else(|e| {
        tracing::warn!(%context_id, error = %e, "Failed to get root hash");
        None
    });

    Ok(stored_hashes.map_or(ZERO_HASH, |(full_hash, _)| full_hash))
}
/// Checks that the peer's application ID (`theirs`) equals the local one (`ours`).
///
/// # Errors
///
/// Returns an "application mismatch" error naming both IDs when they differ.
#[allow(dead_code, reason = "utility function for application validation")]
pub fn validate_application_id(ours: &ApplicationId, theirs: &ApplicationId) -> eyre::Result<()> {
    if ours == theirs {
        return Ok(());
    }

    bail!("application mismatch: expected {}, got {}", ours, theirs);
}
/// Produces a fresh nonce by sampling it from `rand::thread_rng()`.
#[must_use]
pub fn generate_nonce() -> calimero_crypto::Nonce {
    let mut rng = rand::thread_rng();
    rng.gen()
}
/// Selects the storage type that should accompany an entity as its authorization.
///
/// Returns a clone of the metadata's storage type for `Shared` and `User`
/// entities, and `None` for `Public` and `Frozen` ones.
pub fn wire_authorization_for(
    metadata: &Metadata,
) -> Option<calimero_storage::entities::StorageType> {
    match &metadata.storage_type {
        restricted @ (StorageType::Shared { .. } | StorageType::User { .. }) => {
            Some(restricted.clone())
        }
        StorageType::Public | StorageType::Frozen => None,
    }
}
/// Determines which public key a leaf's authorization attributes the write to.
///
/// * `User` storage: the `owner`.
/// * `Shared` storage: the `signer` recorded in the signature data, if both
///   the signature data and the signer are present.
/// * `Public` / `Frozen` storage, or no authorization at all: `None`.
fn extract_author_from_leaf_authorization(
    authorization: Option<&StorageType>,
) -> Option<PublicKey> {
    match authorization {
        Some(StorageType::User { owner, .. }) => Some(*owner),
        Some(StorageType::Shared { signature_data, .. }) => {
            signature_data.as_ref().and_then(|sd| sd.signer)
        }
        Some(StorageType::Public | StorageType::Frozen) | None => None,
    }
}
/// Decides whether `leaf` may be applied given its claimed author's current
/// authorization for `context_id`.
///
/// Leaves for which no author can be extracted from the authorization
/// (see `extract_author_from_leaf_authorization`) are accepted. Otherwise the
/// author is checked with `group_store::is_currently_authorized_for_context`:
///
/// * authorized: returns `true`;
/// * not authorized: records an `"unauthorized"` leaf-drop metric and returns `false`;
/// * lookup error: logs the error, records a `"lookup_error"` leaf-drop metric
///   and returns `false`, so a failing lookup cannot act as a bypass.
pub fn is_leaf_currently_authorized(
    store: &Store,
    context_id: &ContextId,
    leaf: &TreeLeafData,
) -> bool {
    let author = match extract_author_from_leaf_authorization(leaf.metadata.authorization.as_ref())
    {
        Some(author) => author,
        // No attributable author: nothing to check.
        None => return true,
    };

    let lookup = calimero_context::group_store::is_currently_authorized_for_context(
        store, context_id, &author,
    );

    let drop_reason = match lookup {
        Ok(true) => return true,
        Ok(false) => "unauthorized",
        Err(err) => {
            tracing::error!(
                %context_id,
                %author,
                error = %err,
                "is_leaf_currently_authorized: membership lookup failed; dropping entity to avoid silent bypass"
            );
            "lookup_error"
        }
    };

    crate::node_metrics::record_hc_leaf_drop(drop_reason);
    false
}
/// Returns `true` when `crdt_type` is an `LwwRegister` whose `inner_type`
/// equals `OPAQUE_LEAF_CRDT_TYPE_NAME` from the hash-comparison protocol.
fn is_opaque_crdt_type(crdt_type: &calimero_primitives::crdt::CrdtType) -> bool {
    use calimero_primitives::crdt::CrdtType;

    match crdt_type {
        CrdtType::LwwRegister { inner_type } => {
            inner_type == crate::sync::hash_comparison_protocol::OPAQUE_LEAF_CRDT_TYPE_NAME
        }
        _ => false,
    }
}
/// Applies a single synced leaf to local storage.
///
/// App-root entries (as reported by `is_app_root_entry`) are special-cased:
///
/// * a leaf with an opaque CRDT type is written directly through
///   `write_pre_merged_root_state`;
/// * any other root leaf is skipped with a warning and `Ok(())`. Per the log
///   message, the caller is expected to dispatch that merge through
///   `ContextClient::merge_root_state`.
///
/// For every other entity an `Action` is built and handed to
/// `Interface::apply_action`:
///
/// * `Action::Update` when an index entry already exists for the entity;
/// * `Action::Add` otherwise, under the leaf's `parent_id` (or the context
///   root when the leaf names no parent). A parent without an index entry is
///   first created with an empty `Action::Update`.
///
/// The storage type comes from the leaf's authorization when present,
/// otherwise from the existing index entry, otherwise the `Metadata` default.
///
/// # Errors
///
/// Propagates failures from `write_pre_merged_root_state` and `apply_action`.
/// Failed index or hash lookups are treated as "not found" rather than as errors.
pub fn apply_leaf_with_crdt_merge(context_id: ContextId, leaf: &TreeLeafData) -> Result<()> {
    let entity_id = Id::new(leaf.key);
    let root_id = Id::new(*context_id.as_ref());

    if calimero_storage::collections::is_app_root_entry(entity_id) {
        if !is_opaque_crdt_type(&leaf.metadata.crdt_type) {
            tracing::warn!(
                %context_id,
                entity_id = %entity_id,
                "HC apply: skipping root-entity merge on host (no host-side merge dispatch); \
                 caller dispatches via ContextClient::merge_root_state"
            );
            return Ok(());
        }

        let mut root_metadata = Metadata::default();
        root_metadata.created_at = leaf.metadata.created_at;
        root_metadata.updated_at = leaf.metadata.hlc_timestamp.into();
        calimero_storage::interface::Interface::<MainStorage>::write_pre_merged_root_state(
            entity_id,
            &leaf.value,
            root_metadata,
        )?;
        return Ok(());
    }

    // A failed lookup is deliberately folded into "entity does not exist yet".
    let existing_index = Index::<MainStorage>::get_index(entity_id).ok().flatten();

    let mut metadata = Metadata::default();
    metadata.created_at = leaf.metadata.created_at;
    metadata.updated_at = leaf.metadata.hlc_timestamp.into();
    metadata.crdt_type = Some(leaf.metadata.crdt_type.clone());

    // Wire authorization wins; otherwise keep whatever is already stored.
    match (leaf.metadata.authorization.as_ref(), existing_index.as_ref()) {
        (Some(wire_auth), _) => metadata.storage_type = wire_auth.clone(),
        (None, Some(existing)) => metadata.storage_type = existing.metadata.storage_type.clone(),
        (None, None) => {}
    }

    let action = match existing_index {
        Some(_) => Action::Update {
            id: entity_id,
            data: leaf.value.clone(),
            ancestors: vec![],
            metadata,
        },
        None => {
            let parent_id = leaf.metadata.parent_id.map_or(root_id, Id::new);

            let parent_missing = Index::<MainStorage>::get_index(parent_id)
                .ok()
                .flatten()
                .is_none();
            if parent_missing {
                // Create the parent with an empty payload so the child has an ancestor to attach to.
                let parent_init = Action::Update {
                    id: parent_id,
                    data: vec![],
                    ancestors: vec![],
                    metadata: Metadata::default(),
                };
                Interface::<MainStorage>::apply_action(parent_init, &ApplyContext::empty())?;
            }

            // Read the parent's state only after the optional creation above.
            let parent_hash = Index::<MainStorage>::get_hashes_for(parent_id)
                .ok()
                .flatten()
                .map_or([0; 32], |(full, _)| full);
            let parent_metadata = Index::<MainStorage>::get_index(parent_id)
                .ok()
                .flatten()
                .map(|idx| idx.metadata.clone())
                .unwrap_or_default();

            Action::Add {
                id: entity_id,
                data: leaf.value.clone(),
                ancestors: vec![ChildInfo::new(parent_id, parent_hash, parent_metadata)],
                metadata,
            }
        }
    };

    Interface::<MainStorage>::apply_action(action, &ApplyContext::empty())?;
    Ok(())
}
/// Upper bound on how many entities `handle_entity_push` processes from a
/// single push; anything beyond this count is truncated with a warning.
pub const MAX_ENTITIES_PER_PUSH: usize = 500;
/// Summary returned by `handle_entity_push`.
#[derive(Debug, Default)]
pub struct EntityPushOutcome {
/// Number of pushed entities for which `apply_leaf_with_crdt_merge` succeeded.
pub applied: u32,
/// App-root leaves with a non-opaque CRDT type that were not applied by
/// `handle_entity_push`; each entry is `(leaf key, leaf value, HLC timestamp)`.
pub deferred_root_merges: Vec<([u8; 32], Vec<u8>, u64)>,
}
pub fn handle_entity_push(
store: &Store,
runtime_env: &calimero_storage::env::RuntimeEnv,
context_id: ContextId,
entities: &[TreeLeafData],
) -> EntityPushOutcome {
let entities = if entities.len() > MAX_ENTITIES_PER_PUSH {
tracing::warn!(
%context_id,
received = entities.len(),
max = MAX_ENTITIES_PER_PUSH,
"EntityPush exceeds max, truncating"
);
&entities[..MAX_ENTITIES_PER_PUSH]
} else {
entities
};
calimero_storage::env::with_runtime_env(runtime_env.clone(), || {
let mut applied = 0u32;
let mut dropped_unauthorized = 0u32;
let mut deferred_root_merges: Vec<([u8; 32], Vec<u8>, u64)> = Vec::new();
for leaf in entities {
if !leaf.is_valid() {
tracing::warn!(
%context_id,
key = %hex::encode(leaf.key),
len = leaf.value.len(),
"pushed entity failed TreeLeafData::is_valid(), skipping"
);
continue;
}
if !is_leaf_currently_authorized(store, &context_id, leaf) {
dropped_unauthorized += 1;
tracing::warn!(
%context_id,
key = %hex::encode(leaf.key),
"pushed entity dropped: claimed author is not currently authorized for this context"
);
continue;
}
let entity_id = Id::new(leaf.key);
if calimero_storage::collections::is_app_root_entry(entity_id)
&& !is_opaque_crdt_type(&leaf.metadata.crdt_type)
{
deferred_root_merges.push((
leaf.key,
leaf.value.clone(),
leaf.metadata.hlc_timestamp,
));
continue;
}
match apply_leaf_with_crdt_merge(context_id, leaf) {
Ok(()) => applied += 1,
Err(e) => {
tracing::warn!(
%context_id,
key = %hex::encode(leaf.key),
error = %e,
"Failed to apply pushed entity"
);
}
}
}
if dropped_unauthorized > 0 {
tracing::info!(
%context_id,
dropped_unauthorized,
"EntityPush: dropped entities whose author is no longer authorized"
);
}
EntityPushOutcome {
applied,
deferred_root_merges,
}
})
}
/// Decodes a `SignedNamespaceOp` from borsh-encoded `skeleton_bytes`.
///
/// The bytes are first decoded as a `StoredNamespaceEntry`; if that yields the
/// `Signed` variant, its operation is returned. In every other case (decode
/// failure or a different variant) the bytes are decoded directly as a
/// `SignedNamespaceOp`. Returns `None` when that fallback fails too.
pub fn extract_signed_op(
    skeleton_bytes: &[u8],
) -> Option<calimero_context_client::local_governance::SignedNamespaceOp> {
    use calimero_context_client::local_governance::{SignedNamespaceOp, StoredNamespaceEntry};

    match borsh::from_slice::<StoredNamespaceEntry>(skeleton_bytes) {
        Ok(StoredNamespaceEntry::Signed(op)) => Some(op),
        _ => borsh::from_slice::<SignedNamespaceOp>(skeleton_bytes).ok(),
    }
}
/// Extracts the signed operation from `skeleton_bytes` (see
/// `extract_signed_op`) and re-serializes it with borsh.
///
/// Returns `None` when no operation can be extracted or serialization fails.
pub fn extract_signed_op_bytes(skeleton_bytes: &[u8]) -> Option<Vec<u8>> {
    let signed_op = extract_signed_op(skeleton_bytes)?;
    borsh::to_vec(&signed_op).ok()
}
#[cfg(test)]
mod tests {
    use std::collections::BTreeSet;

    use calimero_primitives::application::ApplicationId;
    use calimero_storage::entities::{SignatureData, StorageType};

    use super::*;

    /// Builds a `Shared` storage type with a single writer and the given signer hint.
    fn shared_storage(writer: PublicKey, signer: Option<PublicKey>) -> StorageType {
        StorageType::Shared {
            writers: BTreeSet::from([writer]),
            signature_data: Some(SignatureData {
                signer,
                signature: [0u8; 64],
                nonce: 0,
            }),
        }
    }

    // --- validate_application_id -------------------------------------------

    #[test]
    fn test_validate_application_id_matching() {
        let app_id = ApplicationId::from([1u8; 32]);

        assert!(validate_application_id(&app_id, &app_id).is_ok());
    }

    #[test]
    fn test_validate_application_id_mismatch() {
        let ours = ApplicationId::from([1u8; 32]);
        let theirs = ApplicationId::from([2u8; 32]);

        let err = validate_application_id(&ours, &theirs)
            .expect_err("differing application ids must be rejected");

        assert!(err.to_string().contains("application mismatch"));
    }

    // --- generate_nonce -----------------------------------------------------

    #[test]
    fn test_generate_nonce_returns_value() {
        assert_ne!(generate_nonce(), [0u8; 12]);
    }

    #[test]
    fn test_generate_nonce_is_random() {
        let first = generate_nonce();
        let second = generate_nonce();

        assert_ne!(first, second, "Nonces should be randomly generated");
    }

    // --- extract_author_from_leaf_authorization -----------------------------

    #[test]
    fn extract_author_user_returns_owner() {
        let owner = PublicKey::from([7u8; 32]);
        let storage = StorageType::User {
            owner,
            signature_data: None,
        };

        let author = extract_author_from_leaf_authorization(Some(&storage));

        assert_eq!(author, Some(owner));
    }

    #[test]
    fn extract_author_shared_with_signer_hint_returns_signer() {
        let signer = PublicKey::from([9u8; 32]);
        let storage = shared_storage(signer, Some(signer));

        let author = extract_author_from_leaf_authorization(Some(&storage));

        assert_eq!(author, Some(signer));
    }

    #[test]
    fn extract_author_shared_without_signer_hint_returns_none() {
        let storage = shared_storage(PublicKey::from([1u8; 32]), None);

        assert_eq!(extract_author_from_leaf_authorization(Some(&storage)), None);
    }

    #[test]
    fn extract_author_public_returns_none() {
        let author = extract_author_from_leaf_authorization(Some(&StorageType::Public));

        assert_eq!(author, None);
    }

    #[test]
    fn extract_author_frozen_returns_none() {
        let author = extract_author_from_leaf_authorization(Some(&StorageType::Frozen));

        assert_eq!(author, None);
    }

    #[test]
    fn extract_author_no_authorization_returns_none() {
        assert_eq!(extract_author_from_leaf_authorization(None), None);
    }
}