use async_trait::async_trait;
use calimero_context_client::client::ContextClient;
use calimero_network_primitives::stream::Stream;
use calimero_node_primitives::sync::{ProtocolSelection, SyncProtocol, SyncProtocolExecutor};
use calimero_primitives::context::ContextId;
use calimero_primitives::hash::Hash;
use calimero_primitives::identity::PublicKey;
use eyre::{bail, Result, WrapErr};
use libp2p::PeerId;
use tracing::{debug, info, warn};
use super::hash_comparison_protocol::{HashComparisonConfig, HashComparisonProtocol};
use super::level_sync::{LevelWiseConfig, LevelWiseProtocol};
#[async_trait(?Send)]
pub(crate) trait ProtocolDispatch {
async fn open_stream(&self, peer: PeerId) -> Result<Stream>;
async fn request_dag_heads_and_sync(
&self,
context_id: ContextId,
chosen_peer: PeerId,
our_identity: PublicKey,
stream: &mut Stream,
) -> Result<SyncProtocol>;
async fn fallback_to_snapshot_sync(
&self,
context_id: ContextId,
our_identity: PublicKey,
chosen_peer: PeerId,
) -> Result<SyncProtocol>;
}
#[derive(Clone)]
pub(crate) struct ProtocolSelector {
context_client: ContextClient,
}
impl ProtocolSelector {
pub(crate) fn new(context_client: ContextClient) -> Self {
Self { context_client }
}
pub(crate) async fn execute<D: ProtocolDispatch>(
&self,
dispatch: &D,
selection: ProtocolSelection,
context_id: ContextId,
chosen_peer: PeerId,
our_identity: PublicKey,
local_root_hash: &Hash,
peer_root_hash: &Hash,
stream: &mut Stream,
) -> Result<Option<SyncProtocol>> {
match selection.protocol {
SyncProtocol::None => {
debug!(
%context_id,
%chosen_peer,
root_hash = %local_root_hash,
reason = %selection.reason,
"No sync needed: {}",
selection.reason
);
Ok(None)
}
SyncProtocol::Snapshot { compressed, .. } => {
info!(
%context_id,
%chosen_peer,
compressed,
reason = %selection.reason,
"Initiating snapshot sync"
);
let result = dispatch
.fallback_to_snapshot_sync(context_id, our_identity, chosen_peer)
.await
.wrap_err("snapshot sync")?;
Ok(Some(result))
}
SyncProtocol::DeltaSync { .. } => {
info!(
%context_id,
%chosen_peer,
reason = %selection.reason,
"Initiating delta sync via DAG heads request"
);
let result = dispatch
.request_dag_heads_and_sync(context_id, chosen_peer, our_identity, stream)
.await
.wrap_err("delta sync")?;
if matches!(result, SyncProtocol::None) {
bail!(
"Peer {chosen_peer} has no data for context {context_id} \
despite handshake indicating overlap"
);
}
Ok(Some(result))
}
SyncProtocol::HashComparison { root_hash, .. } => {
info!(
%context_id,
reason = %selection.reason,
"Starting HashComparison sync"
);
let mut transport = super::stream::StreamTransport::new(stream);
let store = self.context_client.datastore_handle().into_inner();
let config = HashComparisonConfig {
remote_root_hash: root_hash,
};
match HashComparisonProtocol::run_initiator(
&mut transport,
&store,
context_id,
our_identity,
config,
)
.await
{
Ok(stats) => {
info!(
%context_id,
nodes_compared = stats.nodes_compared,
entities_merged = stats.entities_merged,
nodes_skipped = stats.nodes_skipped,
deferred_root_merges = stats.deferred_root_merges.len(),
"HashComparison sync completed successfully"
);
if !stats.deferred_root_merges.is_empty() {
dispatch_deferred_root_merges(
&self.context_client,
&store,
context_id,
our_identity,
&stats.deferred_root_merges,
)
.await;
}
Ok(Some(SyncProtocol::HashComparison {
root_hash,
divergent_subtrees: vec![],
}))
}
Err(e) => {
warn!(
%context_id,
error = %e,
"HashComparison sync failed, falling back to DAG catchup"
);
let mut fallback_stream = dispatch
.open_stream(chosen_peer)
.await
.wrap_err("open stream for hash-comparison fallback")?;
let result = dispatch
.request_dag_heads_and_sync(
context_id,
chosen_peer,
our_identity,
&mut fallback_stream,
)
.await
.wrap_err("hash comparison fallback")?;
if matches!(result, SyncProtocol::None) {
info!(
%context_id,
"DAG catchup failed, falling back to snapshot sync"
);
drop(fallback_stream);
let result = dispatch
.fallback_to_snapshot_sync(context_id, our_identity, chosen_peer)
.await
.wrap_err("snapshot fallback")?;
return Ok(Some(result));
}
Ok(Some(result))
}
}
}
SyncProtocol::BloomFilter { .. } => {
warn!(
%context_id,
reason = %selection.reason,
"BloomFilter not yet implemented, falling back to snapshot"
);
let result = dispatch
.fallback_to_snapshot_sync(context_id, our_identity, chosen_peer)
.await
.wrap_err("bloom filter fallback")?;
Ok(Some(result))
}
SyncProtocol::SubtreePrefetch { .. } => {
warn!(
%context_id,
reason = %selection.reason,
"SubtreePrefetch not yet implemented, falling back to snapshot"
);
let result = dispatch
.fallback_to_snapshot_sync(context_id, our_identity, chosen_peer)
.await
.wrap_err("subtree prefetch fallback")?;
Ok(Some(result))
}
SyncProtocol::LevelWise { max_depth } => {
info!(
%context_id,
max_depth,
reason = %selection.reason,
"Starting LevelWise sync"
);
let mut transport = super::stream::StreamTransport::new(stream);
let store = self.context_client.datastore_handle().into_inner();
let config = LevelWiseConfig {
remote_root_hash: **peer_root_hash,
max_depth,
};
match LevelWiseProtocol::run_initiator(
&mut transport,
&store,
context_id,
our_identity,
config,
)
.await
{
Ok(stats) => {
info!(
%context_id,
levels_synced = stats.levels_synced,
nodes_compared = stats.nodes_compared,
entities_merged = stats.entities_merged,
nodes_skipped = stats.nodes_skipped,
deferred_root_merges = stats.deferred_root_merges.len(),
"LevelWise sync completed successfully"
);
if !stats.deferred_root_merges.is_empty() {
dispatch_deferred_root_merges(
&self.context_client,
&store,
context_id,
our_identity,
&stats.deferred_root_merges,
)
.await;
}
Ok(Some(SyncProtocol::LevelWise { max_depth }))
}
Err(e) => {
warn!(
%context_id,
error = %e,
"LevelWise sync failed, falling back to DAG catchup"
);
let mut fallback_stream = dispatch
.open_stream(chosen_peer)
.await
.wrap_err("open stream for level-wise fallback")?;
let result = dispatch
.request_dag_heads_and_sync(
context_id,
chosen_peer,
our_identity,
&mut fallback_stream,
)
.await
.wrap_err("level-wise fallback")?;
if matches!(result, SyncProtocol::None) {
info!(
%context_id,
"DAG catchup insufficient, attempting snapshot"
);
drop(fallback_stream);
let snapshot_result = dispatch
.fallback_to_snapshot_sync(context_id, our_identity, chosen_peer)
.await
.wrap_err("level-wise snapshot fallback")?;
return Ok(Some(snapshot_result));
}
Ok(Some(result))
}
}
}
}
}
}
pub(crate) async fn dispatch_deferred_root_merges(
context_client: &ContextClient,
store: &calimero_store::Store,
context_id: ContextId,
our_identity: PublicKey,
deferred: &[([u8; 32], Vec<u8>, u64)],
) {
use calimero_storage::address::Id;
use calimero_storage::entities::Metadata;
use calimero_storage::env::with_runtime_env;
use calimero_storage::index::Index;
use calimero_storage::interface::Interface;
use calimero_storage::merge::MergeRootStateRequest;
use calimero_storage::store::{MainStorage, StorageAdaptor};
let runtime_env =
calimero_node_primitives::sync::create_runtime_env(store, context_id, our_identity);
for (key, incoming, incoming_hlc_ts) in deferred {
let entity_id = Id::new(*key);
let read_result: eyre::Result<(Vec<u8>, Metadata)> =
with_runtime_env(runtime_env.clone(), || {
let meta = Index::<MainStorage>::get_index(entity_id)
.map_err(|e| eyre::eyre!("get_index: {e}"))?
.map(|idx| idx.metadata)
.unwrap_or_default();
let existing = <MainStorage as StorageAdaptor>::storage_read(
calimero_storage::store::Key::Entry(entity_id),
)
.unwrap_or_default();
Ok((existing, meta))
});
let (existing, existing_metadata) = match read_result {
Ok(pair) => pair,
Err(err) => {
warn!(
%context_id,
entity_id = %hex::encode(key),
%err,
"deferred root merge: failed to read existing root state, skipping"
);
continue;
}
};
let existing_ts: u64 = (*existing_metadata.updated_at).into();
let incoming_ts: u64 = *incoming_hlc_ts;
let request = MergeRootStateRequest {
existing,
incoming: incoming.clone(),
existing_created_at: existing_metadata.created_at,
existing_ts,
incoming_ts,
};
let merged = match context_client
.merge_root_state(&context_id, &our_identity, request)
.await
{
Ok(bytes) => bytes,
Err(err) => {
warn!(
%context_id,
entity_id = %hex::encode(key),
?err,
"deferred root merge: WASM dispatch failed, skipping"
);
continue;
}
};
let mut new_metadata = existing_metadata.clone();
new_metadata.updated_at = existing_ts.max(incoming_ts).into();
let write_result = with_runtime_env(runtime_env.clone(), || {
Interface::<MainStorage>::write_pre_merged_root_state(entity_id, &merged, new_metadata)
.map_err(|e| eyre::eyre!("write_pre_merged_root_state: {e}"))
});
match write_result {
Ok(_full_hash) => {
info!(
%context_id,
entity_id = %hex::encode(key),
"deferred root merge: applied"
);
}
Err(err) => {
warn!(
%context_id,
entity_id = %hex::encode(key),
%err,
"deferred root merge: failed to write merged bytes back"
);
}
}
}
}
#[cfg(test)]
mod tests {
}