use borsh::{BorshDeserialize, BorshSerialize};
use calimero_crypto::Nonce;
use calimero_network_primitives::stream::Stream;
use calimero_node_primitives::sync::{
MessagePayload, SnapshotCursor, SnapshotError, StreamMessage,
};
use calimero_primitives::context::ContextId;
use calimero_primitives::hash::Hash;
use calimero_storage::env::time_now;
use calimero_store::key::ContextState as ContextStateKey;
use calimero_store::key::{Generic as GenericKey, SCOPE_SIZE};
use calimero_store::slice::Slice;
use calimero_store::types::ContextState as ContextStateValue;
use eyre::Result;
use hex;
use tracing::{debug, info, warn};
use super::manager::SyncManager;
use super::tracking::Sequencer;
pub const DEFAULT_PAGE_BYTE_LIMIT: u32 = 64 * 1024;
pub const DEFAULT_PAGE_LIMIT: u16 = 16;
const SYNC_IN_PROGRESS_SCOPE: [u8; SCOPE_SIZE] = *b"sync-in-progres\0";
impl SyncManager {
pub async fn handle_snapshot_boundary_request(
&self,
context_id: ContextId,
_requested_cutoff_timestamp: Option<u64>,
stream: &mut Stream,
_nonce: Nonce,
) -> Result<()> {
let context = match self.context_client.get_context(&context_id)? {
Some(ctx) => ctx,
None => {
warn!(%context_id, "Context not found for snapshot boundary request");
return self
.send_snapshot_error(stream, SnapshotError::InvalidBoundary)
.await;
}
};
info!(
%context_id,
root_hash = %context.root_hash,
heads_count = context.dag_heads.len(),
"Sending snapshot boundary response"
);
let mut sqx = Sequencer::default();
let msg = StreamMessage::Message {
sequence_id: sqx.next(),
payload: MessagePayload::SnapshotBoundaryResponse {
boundary_timestamp: time_now(),
boundary_root_hash: context.root_hash,
dag_heads: context.dag_heads.clone(),
},
next_nonce: super::helpers::generate_nonce(),
};
super::stream::send(stream, &msg, None).await?;
Ok(())
}
#[expect(clippy::too_many_arguments, reason = "protocol handler")]
pub async fn handle_snapshot_stream_request(
&self,
context_id: ContextId,
boundary_root_hash: Hash,
page_limit: u16,
byte_limit: u32,
resume_cursor: Option<Vec<u8>>,
stream: &mut Stream,
_nonce: Nonce,
) -> Result<()> {
let context = match self.context_client.get_context(&context_id)? {
Some(ctx) => ctx,
None => {
warn!(%context_id, "Context not found for snapshot stream");
return self
.send_snapshot_error(stream, SnapshotError::InvalidBoundary)
.await;
}
};
if context.root_hash != boundary_root_hash {
warn!(%context_id, "Boundary mismatch - state changed during sync");
return self
.send_snapshot_error(stream, SnapshotError::InvalidBoundary)
.await;
}
let start_cursor = match resume_cursor {
Some(bytes) => match SnapshotCursor::try_from_slice(&bytes) {
Ok(cursor) => Some(cursor),
Err(_) => {
return self
.send_snapshot_error(stream, SnapshotError::ResumeCursorInvalid)
.await;
}
},
None => None,
};
self.stream_snapshot_pages(
context_id,
boundary_root_hash,
start_cursor,
page_limit,
byte_limit,
stream,
)
.await
}
async fn stream_snapshot_pages(
&self,
context_id: ContextId,
boundary_root_hash: Hash,
start_cursor: Option<SnapshotCursor>,
page_limit: u16,
byte_limit: u32,
stream: &mut Stream,
) -> Result<()> {
let handle = self.context_client.datastore_handle();
let (pages, next_cursor, total_entries) = generate_snapshot_pages(
&handle,
context_id,
start_cursor.as_ref(),
page_limit,
byte_limit,
)?;
let current_context = self.context_client.get_context(&context_id)?;
if let Some(ctx) = current_context {
if ctx.root_hash != boundary_root_hash {
warn!(
%context_id,
expected = %boundary_root_hash,
actual = %ctx.root_hash,
"Root hash changed during snapshot generation"
);
return self
.send_snapshot_error(stream, SnapshotError::InvalidBoundary)
.await;
}
}
info!(%context_id, pages = pages.len(), total_entries, "Streaming snapshot");
if pages.is_empty() {
let msg = StreamMessage::Message {
sequence_id: 0,
payload: MessagePayload::SnapshotPage {
payload: Vec::new().into(),
uncompressed_len: 0,
cursor: None,
page_count: 0,
sent_count: 0,
},
next_nonce: super::helpers::generate_nonce(),
};
super::stream::send(stream, &msg, None).await?;
return Ok(());
}
let mut sqx = Sequencer::default();
let page_count = pages.len() as u64;
for (i, page_data) in pages.into_iter().enumerate() {
let is_last = i == (page_count as usize - 1) && next_cursor.is_none();
let compressed = lz4_flex::compress_prepend_size(&page_data);
let cursor = if is_last {
None
} else if i == (page_count as usize - 1) {
match next_cursor.as_ref().map(borsh::to_vec).transpose() {
Ok(value) => value,
Err(e) => {
warn!(%context_id, error = %e, "Failed to encode snapshot cursor");
return self
.send_snapshot_error(stream, SnapshotError::InvalidBoundary)
.await;
}
}
} else {
None
};
let msg = StreamMessage::Message {
sequence_id: sqx.next(),
payload: MessagePayload::SnapshotPage {
payload: compressed.into(),
uncompressed_len: page_data.len() as u32,
cursor,
page_count,
sent_count: (i + 1) as u64,
},
next_nonce: super::helpers::generate_nonce(),
};
super::stream::send(stream, &msg, None).await?;
}
debug!(%context_id, "Finished streaming snapshot pages");
Ok(())
}
async fn send_snapshot_error(&self, stream: &mut Stream, error: SnapshotError) -> Result<()> {
let msg = StreamMessage::Message {
sequence_id: 0,
payload: MessagePayload::SnapshotError { error },
next_nonce: super::helpers::generate_nonce(),
};
super::stream::send(stream, &msg, None).await
}
pub async fn request_snapshot_sync(
&self,
context_id: ContextId,
peer_id: libp2p::PeerId,
force: bool,
) -> Result<SnapshotSyncResult> {
info!(%context_id, %peer_id, force, "Starting snapshot sync");
let is_crash_recovery = self.check_sync_in_progress(context_id)?.is_some();
if !force && !is_crash_recovery {
let handle = self.context_client.datastore_handle();
let has_state_keys = has_context_state_keys(&handle, context_id)?;
let has_initialized_metadata = self
.context_client
.get_context(&context_id)?
.map(|ctx| *ctx.root_hash != [0u8; 32])
.unwrap_or(false);
let is_initialized = has_state_keys || has_initialized_metadata;
calimero_node_primitives::sync::check_snapshot_safety(is_initialized)
.map_err(|e| eyre::eyre!("Snapshot safety check failed: {:?}", e))?;
}
let mut stream = self.network_client.open_stream(peer_id).await?;
let boundary = self
.request_snapshot_boundary(context_id, &mut stream)
.await?;
info!(%context_id, root_hash = %boundary.boundary_root_hash, "Received boundary");
let applied_records = self
.request_and_apply_snapshot_pages(context_id, &boundary, &mut stream)
.await?;
match self.context_client.compute_root_hash(&context_id) {
Ok(computed_root) => {
if computed_root != *boundary.boundary_root_hash {
warn!(
%context_id,
computed_root = %hex::encode(computed_root),
claimed_root = %hex::encode(*boundary.boundary_root_hash),
"Snapshot root hash mismatch - compute_root_hash may need fixing"
);
} else {
info!(
%context_id,
computed_root = %hex::encode(computed_root),
"Snapshot root hash verified successfully"
);
}
}
Err(e) => {
warn!(
%context_id,
error = %e,
claimed_root = %hex::encode(*boundary.boundary_root_hash),
"Could not verify root hash (compute_root_hash failed), trusting peer's claimed hash"
);
}
}
let root_to_store = *boundary.boundary_root_hash;
self.context_client
.force_root_hash(&context_id, root_to_store.into())?;
self.context_client
.update_dag_heads(&context_id, boundary.dag_heads.clone())?;
self.clear_sync_in_progress_marker(context_id)?;
info!(%context_id, applied_records, "Snapshot sync completed successfully");
Ok(SnapshotSyncResult {
boundary_root_hash: boundary.boundary_root_hash,
dag_heads: boundary.dag_heads,
applied_records,
})
}
async fn request_snapshot_boundary(
&self,
context_id: ContextId,
stream: &mut Stream,
) -> Result<SnapshotBoundary> {
use calimero_node_primitives::sync::InitPayload;
let identities = self
.context_client
.get_context_members(&context_id, Some(true));
let Some((our_identity, _)) =
crate::utils::choose_stream(identities, &mut rand::thread_rng())
.await
.transpose()?
else {
eyre::bail!("No owned identity found for context: {}", context_id);
};
let msg = StreamMessage::Init {
context_id,
party_id: our_identity,
payload: InitPayload::SnapshotBoundaryRequest {
context_id,
requested_cutoff_timestamp: None,
},
next_nonce: super::helpers::generate_nonce(),
};
super::stream::send(stream, &msg, None).await?;
let response = super::stream::recv(stream, None, self.sync_config.timeout).await?;
let Some(StreamMessage::Message { payload, .. }) = response else {
eyre::bail!("Unexpected response to snapshot boundary request");
};
match payload {
MessagePayload::SnapshotBoundaryResponse {
boundary_timestamp,
boundary_root_hash,
dag_heads,
} => Ok(SnapshotBoundary {
boundary_timestamp,
boundary_root_hash,
dag_heads,
}),
MessagePayload::SnapshotError { error } => {
eyre::bail!("Snapshot boundary request failed: {:?}", error);
}
_ => eyre::bail!("Unexpected payload in snapshot boundary response"),
}
}
async fn request_and_apply_snapshot_pages(
&self,
context_id: ContextId,
boundary: &SnapshotBoundary,
stream: &mut Stream,
) -> Result<usize> {
use calimero_node_primitives::sync::InitPayload;
use std::collections::HashSet;
let identities = self
.context_client
.get_context_members(&context_id, Some(true));
let Some((our_identity, _)) =
crate::utils::choose_stream(identities, &mut rand::thread_rng())
.await
.transpose()?
else {
eyre::bail!("No owned identity found for context: {}", context_id);
};
self.set_sync_in_progress_marker(context_id, &boundary.boundary_root_hash)?;
let existing_keys: HashSet<[u8; 32]> = {
let handle = self.context_client.datastore_handle();
collect_context_state_keys(&handle, context_id)?
.into_iter()
.collect()
};
debug!(%context_id, existing_count = existing_keys.len(), "Collected existing state keys");
let mut received_keys: HashSet<[u8; 32]> = HashSet::new();
let mut total_applied = 0;
let mut resume_cursor: Option<Vec<u8>> = None;
loop {
let msg = StreamMessage::Init {
context_id,
party_id: our_identity,
payload: InitPayload::SnapshotStreamRequest {
context_id,
boundary_root_hash: boundary.boundary_root_hash,
page_limit: DEFAULT_PAGE_LIMIT,
byte_limit: DEFAULT_PAGE_BYTE_LIMIT,
resume_cursor: resume_cursor.clone(),
},
next_nonce: super::helpers::generate_nonce(),
};
super::stream::send(stream, &msg, None).await?;
let mut pages_in_burst = 0;
loop {
let response = super::stream::recv(stream, None, self.sync_config.timeout).await?;
let Some(StreamMessage::Message { payload, .. }) = response else {
eyre::bail!("Unexpected response during snapshot streaming");
};
match payload {
MessagePayload::SnapshotPage {
payload,
uncompressed_len,
cursor,
page_count,
sent_count,
} => {
if payload.is_empty() && uncompressed_len == 0 {
self.cleanup_stale_keys(context_id, &existing_keys, &received_keys)?;
return Ok(total_applied);
}
let decompressed = lz4_flex::decompress_size_prepended(&payload)
.map_err(|e| eyre::eyre!("Decompress failed: {}", e))?;
if decompressed.len() != uncompressed_len as usize {
eyre::bail!(
"Size mismatch: {} vs {}",
uncompressed_len,
decompressed.len()
);
}
let records = decode_snapshot_records(&decompressed)?;
let mut handle = self.context_client.datastore_handle();
for (state_key, value) in &records {
let key = ContextStateKey::new(context_id, *state_key);
let slice: Slice<'_> = value.clone().into();
handle.put(&key, &ContextStateValue::from(slice))?;
received_keys.insert(*state_key);
}
total_applied += records.len();
pages_in_burst += 1;
debug!(
%context_id,
pages_in_burst,
page_count,
sent_count,
total_applied,
"Applied snapshot page"
);
let is_last_in_burst = sent_count == page_count;
if is_last_in_burst {
match cursor {
None => {
self.cleanup_stale_keys(
context_id,
&existing_keys,
&received_keys,
)?;
return Ok(total_applied);
}
Some(c) => {
resume_cursor = Some(c);
break; }
}
}
}
MessagePayload::SnapshotError { error } => {
eyre::bail!("Snapshot streaming failed: {:?}", error);
}
_ => eyre::bail!("Unexpected payload during snapshot streaming"),
}
}
}
}
fn cleanup_stale_keys(
&self,
context_id: ContextId,
existing_keys: &std::collections::HashSet<[u8; 32]>,
received_keys: &std::collections::HashSet<[u8; 32]>,
) -> Result<()> {
let mut handle = self.context_client.datastore_handle();
let mut deleted = 0;
for state_key in existing_keys.difference(received_keys) {
handle.delete(&ContextStateKey::new(context_id, *state_key))?;
deleted += 1;
}
if deleted > 0 {
debug!(%context_id, deleted, "Cleaned up stale keys");
}
Ok(())
}
fn set_sync_in_progress_marker(
&self,
context_id: ContextId,
boundary_root_hash: &Hash,
) -> Result<()> {
use calimero_store::types::GenericData;
let key = GenericKey::new(SYNC_IN_PROGRESS_SCOPE, *context_id);
let value_bytes = borsh::to_vec(boundary_root_hash)?;
let value: GenericData<'_> = Slice::from(value_bytes).into();
let mut handle = self.context_client.datastore_handle();
handle.put(&key, &value)?;
debug!(%context_id, "Set sync-in-progress marker");
Ok(())
}
fn clear_sync_in_progress_marker(&self, context_id: ContextId) -> Result<()> {
let key = GenericKey::new(SYNC_IN_PROGRESS_SCOPE, *context_id);
let mut handle = self.context_client.datastore_handle();
handle.delete(&key)?;
debug!(%context_id, "Cleared sync-in-progress marker");
Ok(())
}
pub fn check_sync_in_progress(&self, context_id: ContextId) -> Result<Option<Hash>> {
let key = GenericKey::new(SYNC_IN_PROGRESS_SCOPE, *context_id);
let handle = self.context_client.datastore_handle();
let value_opt = handle.get(&key)?;
match value_opt {
Some(value) => {
let bytes: Vec<u8> = value.as_ref().to_vec();
let hash: Hash = borsh::from_slice(&bytes)?;
Ok(Some(hash))
}
None => Ok(None),
}
}
}
#[derive(Debug)]
pub struct SnapshotSyncResult {
pub boundary_root_hash: Hash,
pub dag_heads: Vec<[u8; 32]>,
pub applied_records: usize,
}
struct SnapshotBoundary {
#[allow(dead_code)]
boundary_timestamp: u64,
boundary_root_hash: Hash,
dag_heads: Vec<[u8; 32]>,
}
fn generate_snapshot_pages<L: calimero_store::layer::ReadLayer>(
handle: &calimero_store::Handle<L>,
context_id: ContextId,
start_cursor: Option<&SnapshotCursor>,
page_limit: u16,
byte_limit: u32,
) -> Result<(Vec<Vec<u8>>, Option<SnapshotCursor>, u64)> {
let mut iter = handle.iter_snapshot::<ContextStateKey>()?;
let mut entries: Vec<([u8; 32], Vec<u8>)> = Vec::new();
for (key_result, value_result) in iter.entries() {
let key = key_result?;
let value = value_result?;
if key.context_id() == context_id {
entries.push((key.state_key(), value.value.to_vec()));
}
}
entries.sort_by(|a, b| a.0.cmp(&b.0));
let total_entries = entries.len() as u64;
let start_idx = start_cursor
.map(|c| {
entries
.iter()
.position(|(k, _)| *k > c.last_key)
.unwrap_or(entries.len())
})
.unwrap_or(0);
let mut pages: Vec<Vec<u8>> = Vec::new();
let mut current_page: Vec<u8> = Vec::new();
let mut last_key: Option<[u8; 32]> = None;
for (key, value) in entries.into_iter().skip(start_idx) {
let record_bytes = borsh::to_vec(&CanonicalRecord { key, value })?;
if !current_page.is_empty() && (current_page.len() + record_bytes.len()) as u32 > byte_limit
{
pages.push(std::mem::take(&mut current_page));
if pages.len() >= page_limit as usize {
return Ok((
pages,
last_key.map(|k| SnapshotCursor { last_key: k }),
total_entries,
));
}
}
current_page.extend(record_bytes);
last_key = Some(key);
}
if !current_page.is_empty() {
pages.push(current_page);
}
Ok((pages, None, total_entries))
}
#[derive(BorshSerialize, BorshDeserialize)]
struct CanonicalRecord {
key: [u8; 32],
value: Vec<u8>,
}
fn decode_snapshot_records(payload: &[u8]) -> Result<Vec<([u8; 32], Vec<u8>)>> {
let mut records = Vec::new();
let mut offset = 0;
while offset < payload.len() {
let record: CanonicalRecord = BorshDeserialize::deserialize(&mut &payload[offset..])?;
offset += borsh::to_vec(&record)?.len();
records.push((record.key, record.value));
}
Ok(records)
}
fn has_context_state_keys<L: calimero_store::layer::ReadLayer>(
handle: &calimero_store::Handle<L>,
context_id: ContextId,
) -> Result<bool> {
let mut iter = handle.iter::<ContextStateKey>()?;
for (key_result, _) in iter.entries() {
let key = key_result?;
if key.context_id() == context_id {
return Ok(true); }
}
Ok(false)
}
fn collect_context_state_keys<L: calimero_store::layer::ReadLayer>(
handle: &calimero_store::Handle<L>,
context_id: ContextId,
) -> Result<Vec<[u8; 32]>> {
let mut keys = Vec::new();
let mut iter = handle.iter::<ContextStateKey>()?;
for (key_result, _) in iter.entries() {
let key = key_result?;
if key.context_id() == context_id {
keys.push(key.state_key());
}
}
Ok(keys)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_canonical_record_encoding() {
let record = CanonicalRecord {
key: [0u8; 32],
value: vec![1, 2, 3, 4],
};
let encoded = borsh::to_vec(&record).unwrap();
let decoded: CanonicalRecord = BorshDeserialize::deserialize(&mut &encoded[..]).unwrap();
assert_eq!(record.key, decoded.key);
assert_eq!(record.value, decoded.value);
}
#[test]
fn test_decode_snapshot_records_empty() {
let records = decode_snapshot_records(&[]).unwrap();
assert!(records.is_empty());
}
#[test]
fn test_decode_snapshot_records_single() {
let record = CanonicalRecord {
key: [1u8; 32],
value: vec![10, 20, 30],
};
let encoded = borsh::to_vec(&record).unwrap();
let records = decode_snapshot_records(&encoded).unwrap();
assert_eq!(records.len(), 1);
assert_eq!(records[0].0, [1u8; 32]);
assert_eq!(records[0].1, vec![10, 20, 30]);
}
#[test]
fn test_decode_snapshot_records_multiple() {
let record1 = CanonicalRecord {
key: [1u8; 32],
value: vec![10],
};
let record2 = CanonicalRecord {
key: [2u8; 32],
value: vec![20, 21],
};
let mut encoded = borsh::to_vec(&record1).unwrap();
encoded.extend(borsh::to_vec(&record2).unwrap());
let records = decode_snapshot_records(&encoded).unwrap();
assert_eq!(records.len(), 2);
assert_eq!(records[0].0, [1u8; 32]);
assert_eq!(records[1].0, [2u8; 32]);
}
}