use calimero_crypto::Nonce;
use calimero_node_primitives::sync::{
create_runtime_env, InitPayload, LeafMetadata, MessagePayload, StreamMessage, SyncTransport,
TreeLeafData, TreeNode, TreeNodeResponse, MAX_NODES_PER_RESPONSE,
};
use calimero_primitives::context::ContextId;
use calimero_storage::address::Id;
use calimero_storage::env::{with_runtime_env, RuntimeEnv};
use calimero_storage::index::Index;
use calimero_storage::interface::Interface;
use calimero_storage::store::MainStorage;
use eyre::Result;
use tracing::{debug, info, trace, warn};
use super::manager::SyncManager;
/// Upper bound on the `max_depth` honoured for a tree node request.
///
/// Depths supplied with a request are clamped to this value before the
/// response is built.
pub const MAX_REQUEST_DEPTH: u8 = 16;
impl SyncManager {
/// Runs the responder side of a HashComparison sync for one stream.
///
/// The request that opened the stream (`first_node_id`, `first_max_depth`)
/// is answered first. The responder then keeps reading from `transport` and
/// answers each further `InitPayload::TreeNodeRequest` until the stream
/// closes or a message of any other kind arrives; both cases end the
/// session with `Ok(())`.
///
/// Requested depths are clamped to [`MAX_REQUEST_DEPTH`] before a response
/// is built.
///
/// If no identity can be selected for `context_id`, a single empty
/// `not_found` response is sent and the function returns `Ok(())`.
///
/// `_nonce` is part of the signature but is not used by this function.
///
/// # Errors
///
/// Propagates failures from identity selection, from building a response,
/// and from sending or receiving on `transport`.
pub async fn handle_tree_node_request<T: SyncTransport>(
    &self,
    context_id: ContextId,
    first_node_id: [u8; 32],
    first_max_depth: Option<u8>,
    transport: &mut T,
    _nonce: Nonce,
) -> Result<()> {
    info!(%context_id, "Starting HashComparison responder");

    let identities = self
        .context_client
        .get_context_members(&context_id, Some(true));

    // One sequencer for the whole stream, whichever path ends up responding.
    let mut sqx = super::tracking::Sequencer::default();

    let our_identity = match crate::utils::choose_stream(identities, &mut rand::thread_rng())
        .await
        .transpose()?
    {
        Some((identity, _)) => identity,
        None => {
            warn!(%context_id, "No owned identity for context, cannot respond to TreeNodeRequest");
            let msg = StreamMessage::Message {
                sequence_id: sqx.next(),
                payload: MessagePayload::TreeNodeResponse {
                    nodes: vec![],
                    not_found: true,
                },
                next_nonce: super::helpers::generate_nonce(),
            };
            transport.send(&msg).await?;
            return Ok(());
        }
    };

    let datastore = self.context_client.datastore_handle().into_inner();
    let runtime_env = create_runtime_env(&datastore, context_id, our_identity);

    let mut requests_handled = 0u64;

    // The request that opened the stream and every follow-up request go
    // through the same answer path, so the two cannot drift apart.
    let mut pending = Some((first_node_id, first_max_depth));

    while let Some((node_id, max_depth)) = pending {
        let clamped_depth = max_depth.map(|d| d.min(MAX_REQUEST_DEPTH));
        let response = self
            .build_tree_node_response(context_id, &node_id, clamped_depth, &runtime_env)
            .await?;
        let msg = StreamMessage::Message {
            sequence_id: sqx.next(),
            payload: MessagePayload::TreeNodeResponse {
                nodes: response.nodes,
                not_found: response.not_found,
            },
            next_nonce: super::helpers::generate_nonce(),
        };
        transport.send(&msg).await?;
        requests_handled += 1;

        // Wait for the next request; anything that is not another
        // TreeNodeRequest ends the session without an error.
        pending = match transport.recv().await? {
            None => {
                debug!(%context_id, requests_handled, "Stream closed, responder done");
                None
            }
            Some(StreamMessage::Init {
                payload:
                    InitPayload::TreeNodeRequest {
                        node_id, max_depth, ..
                    },
                ..
            }) => {
                trace!(
                    %context_id,
                    node_id = %hex::encode(node_id),
                    ?max_depth,
                    "Handling subsequent TreeNodeRequest"
                );
                Some((node_id, max_depth))
            }
            Some(StreamMessage::Init { .. }) => {
                debug!(%context_id, "Received non-TreeNodeRequest, ending responder");
                None
            }
            Some(_) => {
                debug!(%context_id, "Received non-Init message, ending responder");
                None
            }
        };
    }

    info!(%context_id, requests_handled, "HashComparison responder complete");
    Ok(())
}
/// Builds the response for a single tree node request.
///
/// Returns `TreeNodeResponse::not_found()` when the context is unknown or
/// when no node could be loaded locally for `node_id`. Otherwise the
/// response starts with the requested node. When `max_depth` is `Some(d)`
/// with `d > 0` and the node is internal, those of its direct children that
/// can be loaded locally are appended, up to `MAX_NODES_PER_RESPONSE` nodes
/// in total.
///
/// Any non-zero depth expands exactly one level; deeper levels are not
/// walked here.
///
/// A `node_id` equal to the context's root hash is treated as a request for
/// the root entity. All local lookups run inside `runtime_env`.
///
/// # Errors
///
/// Propagates errors from the context lookup and from loading local nodes.
async fn build_tree_node_response(
    &self,
    context_id: ContextId,
    node_id: &[u8; 32],
    max_depth: Option<u8>,
    runtime_env: &RuntimeEnv,
) -> Result<TreeNodeResponse> {
    let context = self.context_client.get_context(&context_id)?;
    let Some(context) = context else {
        debug!(
            %context_id,
            "Context not found for TreeNodeRequest"
        );
        return Ok(TreeNodeResponse::not_found());
    };

    let is_root_request = node_id == context.root_hash.as_ref();

    let local_node = with_runtime_env(runtime_env.clone(), || {
        self.get_local_tree_node_from_index(context_id, node_id, is_root_request)
    })?;
    let Some(node) = local_node else {
        debug!(
            %context_id,
            node_id = %hex::encode(node_id),
            "TreeNodeRequest: node not found"
        );
        return Ok(TreeNodeResponse::not_found());
    };

    // Copy only the child ids, and only when children will be expanded, so
    // the requested node (which may carry a leaf payload) is moved into the
    // response instead of being cloned.
    let expand_children = max_depth.unwrap_or(0) > 0 && node.is_internal();
    let child_ids = if expand_children {
        node.children.clone()
    } else {
        Vec::new()
    };

    let mut nodes = vec![node];
    for child_id in &child_ids {
        // Check the cap before fetching so it is never exceeded and no
        // lookup is wasted once the response is full.
        if nodes.len() >= MAX_NODES_PER_RESPONSE {
            break;
        }
        let child_node = with_runtime_env(runtime_env.clone(), || {
            self.get_local_tree_node_from_index(context_id, child_id, false)
        })?;
        // Children that cannot be loaded locally are skipped.
        if let Some(child) = child_node {
            nodes.push(child);
        }
    }

    debug!(
        %context_id,
        node_id = %hex::encode(node_id),
        nodes_count = nodes.len(),
        "TreeNodeRequest: returning nodes"
    );
    Ok(TreeNodeResponse::new(nodes))
}
/// Loads one node of the local tree from the storage index.
///
/// For a root request the entity is looked up under the context id;
/// otherwise it is looked up under `node_id`.
///
/// Returns `Ok(None)` when the entity has no index entry or when reading the
/// index fails (the failure is logged as a warning). An entity with indexed
/// children becomes an internal node. An entity without children becomes a
/// leaf when raw entry data exists for it, and an internal node with no
/// children otherwise.
///
/// # Errors
///
/// Fails when a leaf entity's index metadata has no CRDT type.
fn get_local_tree_node_from_index(
    &self,
    context_id: ContextId,
    node_id: &[u8; 32],
    is_root_request: bool,
) -> Result<Option<TreeNode>> {
    let entity_id = if is_root_request {
        Id::new(*context_id.as_ref())
    } else {
        Id::new(*node_id)
    };

    // A failed index read is logged and then handled like a missing entry.
    let lookup = Index::<MainStorage>::get_index(entity_id).unwrap_or_else(|e| {
        warn!(
            %context_id,
            entity_id = %entity_id,
            error = %e,
            "Failed to get index for entity"
        );
        None
    });
    let Some(index) = lookup else {
        return Ok(None);
    };

    let full_hash = index.full_hash();
    let children_ids: Vec<[u8; 32]> = match index.children() {
        Some(children) => children
            .iter()
            .map(|child| *child.id().as_bytes())
            .collect(),
        None => Vec::new(),
    };
    let id_bytes = *entity_id.as_bytes();

    // Entities with indexed children are always reported as internal nodes.
    if !children_ids.is_empty() {
        return Ok(Some(TreeNode::internal(id_bytes, full_hash, children_ids)));
    }

    // No children and no raw entry data: report an internal node with no children.
    let Some(entry_data) = Interface::<MainStorage>::find_by_id_raw(entity_id) else {
        return Ok(Some(TreeNode::internal(id_bytes, full_hash, vec![])));
    };

    let crdt_type = index.metadata.crdt_type.clone().ok_or_else(|| {
        eyre::eyre!(
            "Missing CRDT type metadata for leaf entity {}: data integrity issue",
            entity_id
        )
    })?;
    // NOTE(review): the third argument is passed as all zeroes; confirm the
    // intended meaning against `LeafMetadata::new`.
    let metadata = LeafMetadata::new(crdt_type, index.metadata.updated_at(), [0u8; 32]);
    let leaf_data = TreeLeafData::new(id_bytes, entry_data, metadata);

    Ok(Some(TreeNode::leaf(id_bytes, full_hash, leaf_data)))
}
}