use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::time::Instant;
use borsh::BorshDeserialize;
use calimero_crypto::Nonce;
use calimero_network_primitives::stream::Stream;
use calimero_node_primitives::sync::snapshot::{
snapshot_record_kind, SnapshotRecord, MAX_SNAPSHOT_PAGE_SIZE,
};
use calimero_node_primitives::sync::{
MessagePayload, SnapshotCursor, SnapshotError, StreamMessage,
};
use calimero_node_primitives::{SyncState, SyncStatusSnapshot};
use calimero_primitives::context::ContextId;
use calimero_primitives::events::{
ContextEvent, ContextEventPayload, NodeEvent, SyncStatusPayload,
};
use calimero_primitives::hash::Hash;
use calimero_storage::address::Id;
use calimero_storage::env::time_now;
use calimero_storage::interface::Interface;
use calimero_storage::store::{Key as StorageKey, MainStorage};
use calimero_store::key::ContextState as ContextStateKey;
use calimero_store::key::{Generic as GenericKey, SCOPE_SIZE};
use calimero_store::slice::Slice;
use calimero_store::types::ContextState as ContextStateValue;
use eyre::Result;
use hex;
use tracing::{debug, info, warn};
use super::manager::SyncManager;
use super::tracking::Sequencer;
/// Per-page byte budget this node puts in the `SnapshotStreamRequest` it sends.
pub const DEFAULT_PAGE_BYTE_LIMIT: u32 = 64 * 1024;
/// Per-burst page budget this node puts in the `SnapshotStreamRequest` it sends.
pub const DEFAULT_PAGE_LIMIT: u16 = 16;
/// Marker byte written at the start of every length-framed page; the decoder
/// checks for it to tell framed pages apart from the legacy unframed layout.
const SNAPSHOT_PAGE_FORMAT_V2: u8 = 0xFF;
/// Scope of the generic datastore key (keyed by context id) under which the
/// sync-in-progress marker is stored.
const SYNC_IN_PROGRESS_SCOPE: [u8; SCOPE_SIZE] = *b"sync-in-progres\0";
/// Gate deciding whether a snapshot entity can be stored directly.
///
/// The entity is declined only when both the record's schema key and the
/// locally loaded key are known and they differ. If either side is `None`
/// the entity is treated as readable.
fn snapshot_entity_is_readable(
    schema_app_key: Option<[u8; 32]>,
    loaded_app_key: Option<[u8; 32]>,
) -> bool {
    if let (Some(schema), Some(loaded)) = (schema_app_key, loaded_app_key) {
        return schema == loaded;
    }
    true
}
impl SyncManager {
/// Answers a peer's snapshot boundary request for `context_id`.
///
/// Replies on `stream` with the context's current root hash and DAG heads,
/// stamped with the current time. If the context is unknown, replies with
/// `SnapshotError::InvalidBoundary` instead. The requested cutoff timestamp
/// and the nonce are accepted but not used here.
///
/// # Errors
/// Propagates failures from the context lookup and from sending on `stream`.
pub async fn handle_snapshot_boundary_request(
    &self,
    context_id: ContextId,
    _requested_cutoff_timestamp: Option<u64>,
    stream: &mut Stream,
    _nonce: Nonce,
) -> Result<()> {
    let Some(context) = self.context_client.get_context(&context_id)? else {
        warn!(%context_id, "Context not found for snapshot boundary request");
        return self
            .send_snapshot_error(stream, SnapshotError::InvalidBoundary)
            .await;
    };

    info!(
        %context_id,
        root_hash = %context.root_hash,
        heads_count = context.dag_heads.len(),
        "Sending snapshot boundary response"
    );

    let mut sequencer = Sequencer::default();
    let response = StreamMessage::Message {
        sequence_id: sequencer.next(),
        payload: MessagePayload::SnapshotBoundaryResponse {
            boundary_timestamp: time_now(),
            boundary_root_hash: context.root_hash,
            dag_heads: context.dag_heads.clone(),
        },
        next_nonce: super::helpers::generate_nonce(),
    };
    super::stream::send(stream, &response, None).await
}
/// Answers a peer's request to stream snapshot pages for `context_id`.
///
/// Checks, in order, that the context exists, that its root hash still
/// equals `boundary_root_hash`, and that `resume_cursor` (if any) decodes as
/// a `SnapshotCursor`. Each failed check sends a `SnapshotError` to the peer
/// and returns. Otherwise page generation and streaming are delegated to
/// `stream_snapshot_pages`.
///
/// # Errors
/// Propagates failures from the context lookup, page generation, and sending
/// on `stream`.
#[expect(clippy::too_many_arguments, reason = "protocol handler")]
pub async fn handle_snapshot_stream_request(
    &self,
    context_id: ContextId,
    boundary_root_hash: Hash,
    page_limit: u16,
    byte_limit: u32,
    resume_cursor: Option<Vec<u8>>,
    stream: &mut Stream,
    _nonce: Nonce,
) -> Result<()> {
    let Some(context) = self.context_client.get_context(&context_id)? else {
        warn!(%context_id, "Context not found for snapshot stream");
        return self
            .send_snapshot_error(stream, SnapshotError::InvalidBoundary)
            .await;
    };

    if context.root_hash != boundary_root_hash {
        warn!(%context_id, "Boundary mismatch - state changed during sync");
        return self
            .send_snapshot_error(stream, SnapshotError::InvalidBoundary)
            .await;
    }

    // No cursor means "start from the beginning"; an undecodable cursor is
    // reported to the peer rather than silently ignored.
    let parsed_cursor = resume_cursor
        .as_deref()
        .map(|bytes| SnapshotCursor::try_from_slice(bytes))
        .transpose();
    let start_cursor = match parsed_cursor {
        Ok(cursor) => cursor,
        Err(_) => {
            return self
                .send_snapshot_error(stream, SnapshotError::ResumeCursorInvalid)
                .await;
        }
    };

    self.stream_snapshot_pages(
        context_id,
        boundary_root_hash,
        start_cursor,
        page_limit,
        byte_limit,
        stream,
    )
    .await
}
/// Generates one burst of snapshot pages and sends them on `stream`.
///
/// After generation the context's root hash is re-read. If it no longer
/// equals `boundary_root_hash`, `SnapshotError::InvalidBoundary` is sent
/// instead of any page. An empty burst is signalled by a single page with an
/// empty payload. Otherwise every page is LZ4-compressed (size-prepended),
/// and only the last page of the burst carries the encoded resume cursor,
/// when there is one.
///
/// # Errors
/// Propagates failures from page generation, the context lookup, and sending
/// on `stream`.
async fn stream_snapshot_pages(
    &self,
    context_id: ContextId,
    boundary_root_hash: Hash,
    start_cursor: Option<SnapshotCursor>,
    page_limit: u16,
    byte_limit: u32,
    stream: &mut Stream,
) -> Result<()> {
    let handle = self.context_client.datastore_handle();
    let schema_app_key = calimero_context::hlc_fence::loaded_reader_app_key(
        self.context_client.datastore(),
        &context_id,
    )
    .ok()
    .flatten();

    let (pages, next_cursor, total_entries) = generate_snapshot_pages(
        &handle,
        context_id,
        start_cursor.as_ref(),
        page_limit,
        byte_limit,
        schema_app_key,
    )?;

    // Re-validate the boundary: state may have moved while pages were built.
    if let Some(ctx) = self.context_client.get_context(&context_id)? {
        if ctx.root_hash != boundary_root_hash {
            warn!(
                %context_id,
                expected = %boundary_root_hash,
                actual = %ctx.root_hash,
                "Root hash changed during snapshot generation"
            );
            return self
                .send_snapshot_error(stream, SnapshotError::InvalidBoundary)
                .await;
        }
    }

    info!(%context_id, pages = pages.len(), total_entries, "Streaming snapshot");

    if pages.is_empty() {
        // Nothing to send: a single empty page tells the receiver it is done.
        let empty_page = StreamMessage::Message {
            sequence_id: 0,
            payload: MessagePayload::SnapshotPage {
                payload: Vec::new().into(),
                uncompressed_len: 0,
                cursor: None,
                page_count: 0,
                sent_count: 0,
                total_records: 0,
            },
            next_nonce: super::helpers::generate_nonce(),
        };
        super::stream::send(stream, &empty_page, None).await?;
        return Ok(());
    }

    let mut sequencer = Sequencer::default();
    let page_count = pages.len() as u64;
    // `pages` is non-empty here, so this cannot underflow.
    let final_index = pages.len() - 1;

    for (index, page_data) in pages.into_iter().enumerate() {
        let compressed = lz4_flex::compress_prepend_size(&page_data);

        // Only the final page of the burst may carry a resume cursor; with no
        // `next_cursor` it carries `None`, marking the end of the snapshot.
        let cursor = if index == final_index {
            match next_cursor.as_ref().map(borsh::to_vec).transpose() {
                Ok(encoded) => encoded,
                Err(e) => {
                    warn!(%context_id, error = %e, "Failed to encode snapshot cursor");
                    return self
                        .send_snapshot_error(stream, SnapshotError::InvalidBoundary)
                        .await;
                }
            }
        } else {
            None
        };

        let page_msg = StreamMessage::Message {
            sequence_id: sequencer.next(),
            payload: MessagePayload::SnapshotPage {
                payload: compressed.into(),
                uncompressed_len: page_data.len() as u32,
                cursor,
                page_count,
                sent_count: (index + 1) as u64,
                total_records: total_entries,
            },
            next_nonce: super::helpers::generate_nonce(),
        };
        super::stream::send(stream, &page_msg, None).await?;
    }

    debug!(%context_id, "Finished streaming snapshot pages");
    Ok(())
}
/// Sends a `SnapshotError` payload carrying `error` to the peer on `stream`.
///
/// The message always uses sequence id 0 and a freshly generated nonce.
///
/// # Errors
/// Returns whatever error the underlying stream send produces.
async fn send_snapshot_error(&self, stream: &mut Stream, error: SnapshotError) -> Result<()> {
    let error_reply = StreamMessage::Message {
        sequence_id: 0,
        payload: MessagePayload::SnapshotError { error },
        next_nonce: super::helpers::generate_nonce(),
    };
    super::stream::send(stream, &error_reply, None).await
}
/// Runs a full snapshot sync of `context_id` from `peer_id`.
///
/// Steps:
/// 1. Unless `force` is set or a sync-in-progress marker exists (crash
///    recovery), run the snapshot safety check against whether the context
///    already has state keys or a non-zero root hash.
/// 2. Open a stream, fetch the peer's boundary, then request and apply all
///    snapshot pages.
/// 3. Recompute the root hash from local storage and store it. On a mismatch
///    with the peer's claim the computed hash still wins (with a warning). If
///    it cannot be computed, the peer's claimed hash is stored.
/// 4. Store the peer's DAG heads and clear the sync-in-progress marker.
///
/// # Errors
/// Fails if the safety check rejects the sync, or if any network, datastore
/// or context-client step fails.
pub async fn request_snapshot_sync(
    &self,
    context_id: ContextId,
    peer_id: libp2p::PeerId,
    force: bool,
) -> Result<SnapshotSyncResult> {
    info!(%context_id, %peer_id, force, "Starting snapshot sync");

    let is_crash_recovery = self.check_sync_in_progress(context_id)?.is_some();
    let must_check_safety = !(force || is_crash_recovery);
    if must_check_safety {
        let handle = self.context_client.datastore_handle();
        let has_state_keys = has_context_state_keys(&handle, context_id)?;
        let has_initialized_metadata = self
            .context_client
            .get_context(&context_id)?
            .is_some_and(|ctx| *ctx.root_hash != [0u8; 32]);
        calimero_node_primitives::sync::check_snapshot_safety(
            has_state_keys || has_initialized_metadata,
        )
        .map_err(|e| eyre::eyre!("Snapshot safety check failed: {:?}", e))?;
    }

    let mut stream = self.sync_network.open_stream(peer_id).await?;

    let boundary = self
        .request_snapshot_boundary(context_id, &mut stream)
        .await?;
    info!(%context_id, root_hash = %boundary.boundary_root_hash, "Received boundary");

    let applied_records = self
        .request_and_apply_snapshot_pages(context_id, &boundary, &mut stream)
        .await?;

    // The locally computed root is preferred over the peer's claim whenever
    // it can be computed at all.
    let claimed_root = *boundary.boundary_root_hash;
    let root_to_store = match self.context_client.compute_root_hash(&context_id) {
        Ok(computed_root) if computed_root == claimed_root => {
            info!(
                %context_id,
                root_hash = %hex::encode(computed_root),
                "Snapshot root hash verified successfully"
            );
            computed_root
        }
        Ok(computed_root) => {
            warn!(
                %context_id,
                computed_root = %hex::encode(computed_root),
                claimed_root = %hex::encode(claimed_root),
                "Snapshot root hash mismatch - using computed hash from storage"
            );
            computed_root
        }
        Err(e) => {
            warn!(
                %context_id,
                error = %e,
                claimed_root = %hex::encode(claimed_root),
                "Could not compute root hash, trusting peer's claimed hash"
            );
            claimed_root
        }
    };

    self.context_client
        .force_root_hash(&context_id, root_to_store.into())?;
    self.context_client
        .update_dag_heads(&context_id, boundary.dag_heads.clone())?;
    self.clear_sync_in_progress_marker(context_id)?;

    info!(%context_id, applied_records, "Snapshot sync completed successfully");

    Ok(SnapshotSyncResult {
        boundary_root_hash: boundary.boundary_root_hash,
        dag_heads: boundary.dag_heads,
        applied_records,
    })
}
async fn request_snapshot_boundary(
&self,
context_id: ContextId,
stream: &mut Stream,
) -> Result<SnapshotBoundary> {
use calimero_node_primitives::sync::InitPayload;
let identities = self
.context_client
.get_context_members(&context_id, Some(true));
let Some((our_identity, _)) =
crate::utils::choose_stream(identities, &mut rand::thread_rng())
.await
.transpose()?
else {
eyre::bail!("No owned identity found for context: {}", context_id);
};
let msg = StreamMessage::Init {
context_id,
party_id: our_identity,
payload: InitPayload::SnapshotBoundaryRequest {
context_id,
requested_cutoff_timestamp: None,
},
next_nonce: super::helpers::generate_nonce(),
};
super::stream::send(stream, &msg, None).await?;
let response = super::stream::recv(stream, None, self.sync_config.timeout).await?;
let Some(StreamMessage::Message { payload, .. }) = response else {
eyre::bail!("Unexpected response to snapshot boundary request");
};
match payload {
MessagePayload::SnapshotBoundaryResponse {
boundary_timestamp,
boundary_root_hash,
dag_heads,
} => Ok(SnapshotBoundary {
boundary_timestamp,
boundary_root_hash,
dag_heads,
}),
MessagePayload::SnapshotError { error } => {
eyre::bail!("Snapshot boundary request failed: {:?}", error);
}
_ => eyre::bail!("Unexpected payload in snapshot boundary response"),
}
}
/// Pulls snapshot pages from the peer on `stream`, burst by burst, and writes
/// the accepted entity records into the context's state.
///
/// Returns the number of entity records applied. State keys that existed
/// before the sync but were not covered by the snapshot are deleted before
/// returning.
///
/// # Errors
/// Fails when no owned identity exists for the context, when the peer sends
/// a `SnapshotError` or an unexpected message, when a page cannot be
/// decompressed or decoded, or when a datastore or stream operation fails.
async fn request_and_apply_snapshot_pages(
&self,
context_id: ContextId,
boundary: &SnapshotBoundary,
stream: &mut Stream,
) -> Result<usize> {
use calimero_node_primitives::sync::InitPayload;
let identities = self
.context_client
.get_context_members(&context_id, Some(true));
let Some((our_identity, _)) =
crate::utils::choose_stream(identities, &mut rand::thread_rng())
.await
.transpose()?
else {
eyre::bail!("No owned identity found for context: {}", context_id);
};
// Persist the marker before any state is written so that an interrupted
// sync is detectable via `check_sync_in_progress`.
self.set_sync_in_progress_marker(context_id, &boundary.boundary_root_hash)?;
let started_at = Instant::now();
// Keys present before the sync; whatever the snapshot does not cover is
// removed by `cleanup_stale_keys` at the end.
let existing_keys: HashSet<[u8; 32]> = {
let handle = self.context_client.datastore_handle();
collect_context_state_keys(&handle, context_id)?
.into_iter()
.collect()
};
debug!(%context_id, existing_count = existing_keys.len(), "Collected existing state keys");
let mut received_keys: HashSet<[u8; 32]> = HashSet::new();
let mut total_applied = 0;
let mut resume_cursor: Option<Vec<u8>> = None;
// A lookup failure is treated the same as "no key loaded".
let loaded_app_key = calimero_context::hlc_fence::loaded_reader_app_key(
self.context_client.datastore(),
&context_id,
)
.ok()
.flatten();
// Writer sets of the `Shared` entities applied so far, keyed by entity id;
// used to authenticate deferred `SharedMember` records.
let mut anchor_writers: HashMap<
Id,
BTreeMap<calimero_primitives::identity::PublicKey, calimero_storage::entities::OpMask>,
> = HashMap::new();
// `SharedMember` records (id, entry bytes, index bytes) held back until the
// final page has been received.
let mut deferred_members: Vec<(Id, Vec<u8>, Vec<u8>)> = Vec::new();
// Outer loop: one stream request per burst. Inner loop: one received page
// per iteration.
loop {
let msg = StreamMessage::Init {
context_id,
party_id: our_identity,
payload: InitPayload::SnapshotStreamRequest {
context_id,
boundary_root_hash: boundary.boundary_root_hash,
page_limit: DEFAULT_PAGE_LIMIT,
byte_limit: DEFAULT_PAGE_BYTE_LIMIT,
resume_cursor: resume_cursor.clone(),
},
next_nonce: super::helpers::generate_nonce(),
};
super::stream::send(stream, &msg, None).await?;
let mut pages_in_burst = 0;
loop {
let response = super::stream::recv(stream, None, self.sync_config.timeout).await?;
let Some(StreamMessage::Message { payload, .. }) = response else {
eyre::bail!("Unexpected response during snapshot streaming");
};
match payload {
MessagePayload::SnapshotPage {
payload,
uncompressed_len,
cursor,
page_count,
sent_count,
total_records,
} => {
// An empty page means the peer has nothing (more) to send.
// NOTE(review): this path returns without applying `deferred_members`;
// confirm an empty page can never follow a burst that deferred records.
if payload.is_empty() && uncompressed_len == 0 {
self.cleanup_stale_keys(context_id, &existing_keys, &received_keys)?;
return Ok(total_applied);
}
let decompressed = decompress_snapshot_page(&payload, uncompressed_len)?;
let records = decode_snapshot_records(&decompressed)?;
let mut handle = self.context_client.datastore_handle();
let mut applied = 0usize;
let mut rejected = 0usize;
for record in &records {
match record {
SnapshotRecord::Entity {
id,
entry,
index,
schema_app_key,
} => {
// Records written under a different schema key than the loaded one
// are buffered instead of stored, and counted as rejected. A
// buffering failure is only logged.
if !snapshot_entity_is_readable(*schema_app_key, loaded_app_key)
{
if let Err(e) = self.buffer_future_schema_snapshot_entity(
context_id,
*id,
entry,
index,
schema_app_key.expect("gate only declines when Some"),
) {
warn!(
%context_id,
id = ?id,
error = ?e,
"snapshot Entity record: failed to buffer \
future-schema entity into the absorb buffer"
);
}
rejected += 1;
continue;
}
let index_entity: calimero_storage::index::EntityIndex =
match borsh::from_slice(index) {
Ok(idx) => idx,
Err(e) => {
warn!(
%context_id,
id = ?id,
error = ?e,
"snapshot Entity record: index blob \
failed to deserialize as EntityIndex — \
dropping"
);
rejected += 1;
continue;
}
};
let id_obj = Id::new(*id);
// `SharedMember` records are verified against their anchor's writers,
// so they wait until the final page; they count as neither applied
// nor rejected here.
if matches!(
index_entity.metadata.storage_type,
calimero_storage::entities::StorageType::SharedMember { .. }
) {
deferred_members.push((
id_obj,
entry.clone(),
index.clone(),
));
continue;
}
if let Err(e) =
Interface::<MainStorage>::verify_snapshot_entity_signature(
id_obj,
entry,
&index_entity.metadata,
)
{
warn!(
%context_id,
id = ?id,
error = ?e,
storage_type = ?index_entity.metadata.storage_type,
"snapshot Entity record: signature \
verification failed — dropping"
);
rejected += 1;
continue;
}
let entry_state_key = StorageKey::Entry(id_obj).to_bytes();
let index_state_key = StorageKey::Index(id_obj).to_bytes();
let entry_key =
ContextStateKey::new(context_id, entry_state_key);
let index_key =
ContextStateKey::new(context_id, index_state_key);
let entry_slice: Slice<'_> = entry.clone().into();
let index_slice: Slice<'_> = index.clone().into();
handle
.put(&entry_key, &ContextStateValue::from(entry_slice))?;
handle
.put(&index_key, &ContextStateValue::from(index_slice))?;
let _ = received_keys.insert(entry_state_key);
let _ = received_keys.insert(index_state_key);
// Mark the entity's rotation-log key as received too, so the stale-key
// cleanup leaves it alone.
let _ = received_keys
.insert(StorageKey::RotationLog(id_obj).to_bytes());
applied += 1;
// Remember the writers of `Shared` entities for the deferred members,
// and seed the rotation log; a seeding failure is only logged.
if let calimero_storage::entities::StorageType::Shared {
writers,
..
} = &index_entity.metadata.storage_type
{
let _ = anchor_writers.insert(id_obj, writers.clone());
if let Err(e) =
crate::delta_store::seed_rotation_log_genesis_direct(
&self.context_client,
context_id,
id_obj,
writers.clone(),
)
{
warn!(
%context_id,
id = ?id_obj.as_bytes(),
error = ?e,
"snapshot: failed to seed anchor rotation-log floor"
);
}
}
}
// Auxiliary records are never applied by this receiver.
SnapshotRecord::Auxiliary { kind, id, .. } => {
warn!(
%context_id,
kind,
id = ?id,
"snapshot Auxiliary record: rejecting — no kind \
currently has per-record authentication (issue \
#2387 follow-up: sign each rotation-log entry \
at write time)"
);
rejected += 1;
continue;
}
}
}
if rejected > 0 {
warn!(
%context_id,
applied,
rejected,
page_records = records.len(),
"snapshot page applied with rejections"
);
}
total_applied += applied;
pages_in_burst += 1;
debug!(
%context_id,
pages_in_burst,
page_count,
sent_count,
total_applied,
"Applied snapshot page"
);
self.emit_snapshot_progress(
context_id,
total_applied as u64,
total_records,
started_at.elapsed(),
);
// The last page of a burst either ends the snapshot (no cursor) or
// carries the cursor for the next burst.
let is_last_in_burst = sent_count == page_count;
if is_last_in_burst {
match cursor {
None => {
// Second pass: apply the deferred `SharedMember` records now that the
// anchors from every page have been collected.
if !deferred_members.is_empty() {
let mut handle = self.context_client.datastore_handle();
for (id_obj, entry, index) in deferred_members.drain(..) {
let metadata = match borsh::from_slice::<
calimero_storage::index::EntityIndex,
>(
&index
) {
Ok(idx) => idx.metadata,
Err(e) => {
warn!(
%context_id,
id = ?id_obj.as_bytes(),
error = ?e,
"snapshot deferred SharedMember: index \
blob failed to deserialize — dropping"
);
continue;
}
};
let anchor = match &metadata.storage_type {
calimero_storage::entities::StorageType::SharedMember {
anchor,
..
} => *anchor,
_ => continue,
};
let Some(writers) = anchor_writers.get(&anchor) else {
warn!(
%context_id,
id = ?id_obj.as_bytes(),
anchor = ?anchor.as_bytes(),
"snapshot deferred SharedMember: anchor not \
present in snapshot — dropping (member's \
writers unresolvable)"
);
continue;
};
if let Err(e) =
Interface::<MainStorage>::verify_snapshot_member_signature(
id_obj, &entry, &metadata, writers,
)
{
warn!(
%context_id,
id = ?id_obj.as_bytes(),
error = ?e,
"snapshot deferred SharedMember: signature \
verification failed — dropping"
);
continue;
}
let entry_state_key =
StorageKey::Entry(id_obj).to_bytes();
let index_state_key =
StorageKey::Index(id_obj).to_bytes();
let entry_key =
ContextStateKey::new(context_id, entry_state_key);
let index_key =
ContextStateKey::new(context_id, index_state_key);
let entry_slice: Slice<'_> = entry.into();
let index_slice: Slice<'_> = index.into();
handle.put(
&entry_key,
&ContextStateValue::from(entry_slice),
)?;
handle.put(
&index_key,
&ContextStateValue::from(index_slice),
)?;
let _ = received_keys.insert(entry_state_key);
let _ = received_keys.insert(index_state_key);
let _ = received_keys
.insert(StorageKey::RotationLog(id_obj).to_bytes());
total_applied += 1;
}
}
self.cleanup_stale_keys(
context_id,
&existing_keys,
&received_keys,
)?;
return Ok(total_applied);
}
// More data remains: leave the inner loop and request the next burst.
Some(c) => {
resume_cursor = Some(c);
break; }
}
}
}
MessagePayload::SnapshotError { error } => {
eyre::bail!("Snapshot streaming failed: {:?}", error);
}
_ => eyre::bail!("Unexpected payload during snapshot streaming"),
}
}
}
}
/// Deletes every state key of `context_id` that is in `existing_keys` but
/// not in `received_keys`.
///
/// # Errors
/// Stops at and returns the first datastore delete failure.
fn cleanup_stale_keys(
    &self,
    context_id: ContextId,
    existing_keys: &std::collections::HashSet<[u8; 32]>,
    received_keys: &std::collections::HashSet<[u8; 32]>,
) -> Result<()> {
    let mut handle = self.context_client.datastore_handle();
    let mut removed = 0;

    for stale_key in existing_keys.difference(received_keys) {
        let store_key = ContextStateKey::new(context_id, *stale_key);
        handle.delete(&store_key)?;
        removed += 1;
    }

    if removed == 0 {
        return Ok(());
    }
    debug!(%context_id, deleted = removed, "Cleaned up stale keys");
    Ok(())
}
/// Stores the sync-in-progress marker for `context_id`.
///
/// The marker's value is the borsh encoding of `boundary_root_hash`; it is
/// read back by `check_sync_in_progress`.
///
/// # Errors
/// Fails if the hash cannot be encoded or the datastore write fails.
fn set_sync_in_progress_marker(
    &self,
    context_id: ContextId,
    boundary_root_hash: &Hash,
) -> Result<()> {
    use calimero_store::types::GenericData;

    let marker_key = GenericKey::new(SYNC_IN_PROGRESS_SCOPE, *context_id);
    let encoded_root = borsh::to_vec(boundary_root_hash)?;
    let marker_value: GenericData<'_> = Slice::from(encoded_root).into();

    let mut handle = self.context_client.datastore_handle();
    handle.put(&marker_key, &marker_value)?;

    debug!(%context_id, "Set sync-in-progress marker");
    Ok(())
}
/// Publishes snapshot-receive progress for `context_id`.
///
/// Builds a `SyncState::ReceivingSnapshot` from the progress estimate,
/// stores it in the node's sync-status map (carrying over the previous
/// failure count and last error, if any), and emits a `SyncStatus` context
/// event. A failure to emit the event is only logged at debug level.
fn emit_snapshot_progress(
    &self,
    context_id: ContextId,
    records_received: u64,
    total_records: u64,
    elapsed: std::time::Duration,
) {
    let (percent, eta_secs) =
        snapshot_progress_estimate(records_received, total_records, elapsed);
    let state = SyncState::ReceivingSnapshot {
        records_received,
        percent,
        eta_secs,
    };

    let status_map = self.node_state.sync_status_handle();
    // Progress updates must not reset the failure bookkeeping.
    let (failure_count, last_error) = if let Some(previous) = status_map.get(&context_id) {
        (previous.failure_count, previous.last_error.clone())
    } else {
        (0, None)
    };

    let snapshot = SyncStatusSnapshot {
        state,
        failure_count,
        last_error: last_error.clone(),
    };
    let _previous = status_map.insert(context_id, snapshot);

    let payload = ContextEventPayload::SyncStatus(SyncStatusPayload {
        sync_state: state,
        failure_count,
        last_error,
    });
    let event = NodeEvent::Context(ContextEvent {
        context_id,
        payload,
    });
    if let Err(err) = self.node_client.send_event(event) {
        debug!(%context_id, %err, "failed to emit snapshot-progress event");
    }
}
/// Removes the sync-in-progress marker for `context_id`.
///
/// # Errors
/// Fails if the datastore delete fails.
fn clear_sync_in_progress_marker(&self, context_id: ContextId) -> Result<()> {
    let marker_key = GenericKey::new(SYNC_IN_PROGRESS_SCOPE, *context_id);

    let mut handle = self.context_client.datastore_handle();
    handle.delete(&marker_key)?;

    debug!(%context_id, "Cleared sync-in-progress marker");
    Ok(())
}
/// Looks up the sync-in-progress marker for `context_id`.
///
/// Returns `Ok(Some(hash))` with the boundary root hash stored in the marker
/// when one exists, and `Ok(None)` otherwise.
///
/// # Errors
/// Fails if the datastore read fails or the stored bytes do not decode as a
/// `Hash`.
pub fn check_sync_in_progress(&self, context_id: ContextId) -> Result<Option<Hash>> {
    let marker_key = GenericKey::new(SYNC_IN_PROGRESS_SCOPE, *context_id);
    let handle = self.context_client.datastore_handle();

    let Some(value) = handle.get(&marker_key)? else {
        return Ok(None);
    };

    let bytes: Vec<u8> = value.as_ref().to_vec();
    let hash: Hash = borsh::from_slice(&bytes)?;
    Ok(Some(hash))
}
/// Saves a snapshot entity written under a different schema key into the
/// absorb buffer instead of the context state.
///
/// After a successful save, records the
/// `absorbed_snapshot_entity_future_schema` delta outcome and logs a warning.
///
/// # Errors
/// Fails if saving the record to the absorb repository fails; in that case
/// neither the metric nor the warning is emitted.
fn buffer_future_schema_snapshot_entity(
    &self,
    context_id: ContextId,
    id: [u8; 32],
    entry: &[u8],
    index: &[u8],
    schema: [u8; 32],
) -> Result<()> {
    let absorb_record = calimero_context::group_store::AbsorbRecord::from_snapshot_entity(
        id,
        entry.to_vec(),
        index.to_vec(),
        schema,
    );

    calimero_context::group_store::AbsorbRepository::new(self.context_client.datastore())
        .save(&context_id, schema, &absorb_record)?;

    crate::node_metrics::record_delta_outcome("absorbed_snapshot_entity_future_schema");
    warn!(
        %context_id,
        id = ?id,
        ?schema,
        "snapshot entity authored under a newer schema than the loaded reader \
         — buffered into the absorb buffer instead of storing unreadable bytes \
         (will re-verify + persist once the reader advances)"
    );
    Ok(())
}
}
/// Result of trying to persist one buffered snapshot entity via
/// `persist_buffered_snapshot_entity`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum SnapshotEntityDrainOutcome {
/// The entity passed verification and its entry and index were written.
Persisted,
/// The record is a `SharedMember`; nothing was written by the drain.
RedrivenElsewhere,
/// The index did not deserialize or the signature did not verify; nothing
/// was written.
Pending,
}
/// Verifies a buffered snapshot entity and, if it checks out, writes its
/// entry and index into the state of `context_id`.
///
/// Outcomes:
/// - `Pending` when `index` does not decode as an `EntityIndex` or the
///   entity signature does not verify (nothing is written),
/// - `RedrivenElsewhere` when the entity is a `SharedMember` (nothing is
///   written),
/// - `Persisted` after both writes succeed.
///
/// # Errors
/// Fails only if a datastore write fails.
pub(crate) fn persist_buffered_snapshot_entity(
    handle: &mut calimero_store::Handle<calimero_store::Store>,
    context_id: ContextId,
    id: [u8; 32],
    entry: &[u8],
    index: &[u8],
) -> Result<SnapshotEntityDrainOutcome> {
    let metadata = match borsh::from_slice::<calimero_storage::index::EntityIndex>(index) {
        Ok(decoded) => decoded.metadata,
        Err(e) => {
            warn!(%context_id, id = ?id, error = ?e,
                "absorb entity drain: index blob failed to deserialize — leaving pending");
            return Ok(SnapshotEntityDrainOutcome::Pending);
        }
    };

    let id_obj = Id::new(id);

    if let calimero_storage::entities::StorageType::SharedMember { .. } = metadata.storage_type {
        warn!(%context_id, id = ?id,
            "absorb entity drain: SharedMember is re-applied by the snapshot pass-2 \
             re-drive (anchor-writer authentication) — deleting the orphaned buffer \
             record now the reader has advanced to its schema");
        return Ok(SnapshotEntityDrainOutcome::RedrivenElsewhere);
    }

    let verification =
        Interface::<MainStorage>::verify_snapshot_entity_signature(id_obj, entry, &metadata);
    if let Err(e) = verification {
        warn!(%context_id, id = ?id, error = ?e,
            "absorb entity drain: signature verification failed — leaving pending");
        return Ok(SnapshotEntityDrainOutcome::Pending);
    }

    // Entry first, then index, as two separate writes.
    let entry_slice: Slice<'_> = entry.to_vec().into();
    handle.put(
        &ContextStateKey::new(context_id, StorageKey::Entry(id_obj).to_bytes()),
        &ContextStateValue::from(entry_slice),
    )?;

    let index_slice: Slice<'_> = index.to_vec().into();
    handle.put(
        &ContextStateKey::new(context_id, StorageKey::Index(id_obj).to_bytes()),
        &ContextStateValue::from(index_slice),
    )?;

    Ok(SnapshotEntityDrainOutcome::Persisted)
}
/// Summary returned by `SyncManager::request_snapshot_sync` on success.
#[derive(Debug)]
pub struct SnapshotSyncResult {
/// Root hash the peer advertised in its boundary response. This is the
/// peer's claim; the hash actually stored may be the locally computed one.
pub boundary_root_hash: Hash,
/// DAG heads the peer advertised in its boundary response.
pub dag_heads: Vec<[u8; 32]>,
/// Number of entity records applied while receiving pages.
pub applied_records: usize,
}
/// Boundary a peer returned in a `SnapshotBoundaryResponse`.
struct SnapshotBoundary {
// Captured from the response but not read anywhere in the visible code.
#[allow(dead_code)]
boundary_timestamp: u64,
/// Root hash the peer reported; sent back in every stream request.
boundary_root_hash: Hash,
/// DAG heads the peer reported.
dag_heads: Vec<[u8; 32]>,
}
/// Builds one burst of snapshot pages for `context_id`.
///
/// Returns `(pages, next_cursor, total_entries)`:
/// - `pages`: uncompressed pages, each starting with the page-format marker
///   byte followed by length-framed `SnapshotRecord::Entity` records,
/// - `next_cursor`: `Some` when the burst stopped at `page_limit` and more
///   entities remain, `None` when the scan reached the end,
/// - `total_entries`: number of entities in the context that have both an
///   Index and an Entry, counted regardless of `start_cursor`.
///
/// Entities are emitted in ascending byte order of their id, starting after
/// `start_cursor.last_key` when a cursor is given.
///
/// # Errors
/// Fails if iterating or reading the datastore fails, or if a record cannot
/// be encoded.
fn generate_snapshot_pages<L: calimero_store::layer::ReadLayer>(
handle: &calimero_store::Handle<L>,
context_id: ContextId,
start_cursor: Option<&SnapshotCursor>,
page_limit: u16,
byte_limit: u32,
schema_app_key: Option<[u8; 32]>,
) -> Result<(Vec<Vec<u8>>, Option<SnapshotCursor>, u64)> {
// Pass 1: scan every state record, keep the keys that belong to this
// context, and collect the ids of entities found through their Index.
let mut iter = handle.iter_snapshot::<ContextStateKey>()?;
let mut present_keys: HashSet<[u8; 32]> = HashSet::new();
let mut entity_ids: Vec<Id> = Vec::new();
for (key_result, value_result) in iter.entries() {
let key = key_result?;
let value = value_result?;
if key.context_id() != context_id {
continue;
}
let state_key = key.state_key();
// A value counts as an Index only if it decodes as `EntityIndex` AND is
// stored under the Index key derived from its own id.
if let Ok(index_entity) =
borsh::from_slice::<calimero_storage::index::EntityIndex>(value.value.as_ref())
{
if StorageKey::Index(index_entity.id()).to_bytes() == state_key {
entity_ids.push(index_entity.id());
}
}
let _ = present_keys.insert(state_key);
}
let total_records = present_keys.len();
// Byte-wise id order defines the emission order the cursor refers to.
entity_ids.sort_by(|a, b| a.as_bytes().cmp(b.as_bytes()));
let start_after_id = start_cursor.map(|c| c.last_key);
// Pass 2: accounting only. Counts complete entities and tracks which keys
// belong to an entity bundle, so leftover keys can be reported below.
let mut consumed_keys: HashSet<[u8; 32]> = HashSet::new();
let mut orphan_index_without_entry: u64 = 0;
let mut orphan_entry_without_index: u64 = 0;
let mut total_entries: u64 = 0;
for id in &entity_ids {
let id_bytes = *id.as_bytes();
let index_key = StorageKey::Index(*id).to_bytes();
let entry_key = StorageKey::Entry(*id).to_bytes();
let rotation_log_key = StorageKey::RotationLog(*id).to_bytes();
let has_index = present_keys.contains(&index_key);
let has_entry = present_keys.contains(&entry_key);
let has_rotation_log = present_keys.contains(&rotation_log_key);
if has_index && has_entry {
total_entries += 1;
}
// Entities at or before the cursor were handled by an earlier burst; mark
// their keys as consumed so they are not reported as unrecognized.
if let Some(after) = start_after_id {
if id_bytes <= after {
if has_index {
let _ = consumed_keys.insert(index_key);
}
if has_entry {
let _ = consumed_keys.insert(entry_key);
}
if has_rotation_log {
let _ = consumed_keys.insert(rotation_log_key);
}
continue;
}
}
match (has_index, has_entry) {
(true, true) => {
let _ = consumed_keys.insert(index_key);
let _ = consumed_keys.insert(entry_key);
}
(true, false) => {
debug!(
%context_id, id = ?id_bytes,
"dropping orphan Index (no matching Entry) — would be \
unverifiable on the receiver"
);
orphan_index_without_entry += 1;
}
(false, true) => {
debug!(
%context_id, id = ?id_bytes,
"unreachable: entity_id without matching Index in present_keys"
);
orphan_entry_without_index += 1;
}
(false, false) => {}
}
if has_rotation_log {
let _ = consumed_keys.insert(rotation_log_key);
}
}
// Diagnostics only: keys of this context that no entity bundle accounted for.
let unrecognized_count =
u64::try_from(total_records.saturating_sub(consumed_keys.len())).unwrap_or(u64::MAX);
if unrecognized_count > 0 {
warn!(
%context_id,
unrecognized_count,
orphan_index_without_entry,
orphan_entry_without_index,
"snapshot generation: dropping non-bundle records (orphans + truly \
unrecognized) — well-formed state trees shouldn't have these"
);
}
// `entity_ids` is sorted, so the first id past the cursor is found by
// binary search.
let emit_start = match start_after_id {
Some(after) => entity_ids.partition_point(|id| *id.as_bytes() <= after),
None => 0,
};
// Pass 3: emit framed Entity records into pages.
let mut pages: Vec<Vec<u8>> = Vec::new();
let mut current_page: Vec<u8> = Vec::new();
let mut last_id: Option<[u8; 32]> = None;
for id in &entity_ids[emit_start..] {
let id_bytes = *id.as_bytes();
let index_key = StorageKey::Index(*id).to_bytes();
let entry_key = StorageKey::Entry(*id).to_bytes();
// Only entities with both an Index and an Entry are emitted.
if !(present_keys.contains(&index_key) && present_keys.contains(&entry_key)) {
continue;
}
let Some(index) = handle.get(&ContextStateKey::new(context_id, index_key))? else {
warn!(
%context_id, id = ?id_bytes,
"snapshot emit: Index value vanished between scan and read \
(concurrent delete?) — skipping entity; root_hash recheck guards correctness"
);
continue;
};
let index = index.value.to_vec();
let Some(entry) = handle.get(&ContextStateKey::new(context_id, entry_key))? else {
warn!(
%context_id, id = ?id_bytes,
"snapshot emit: Entry value vanished between scan and read \
(concurrent delete?) — skipping entity; root_hash recheck guards correctness"
);
continue;
};
let entry = entry.value.to_vec();
let record_bytes = encode_framed_snapshot_record(&SnapshotRecord::Entity {
id: id_bytes,
entry,
index,
schema_app_key,
})?;
// Close the current page when this record would push it past `byte_limit`.
// A record is never split, so a single oversized record still gets a page
// of its own.
// NOTE(review): the `as u32` cast truncates if the summed length exceeds
// u32::MAX — presumably unreachable at realistic page sizes; confirm.
if !current_page.is_empty() && (current_page.len() + record_bytes.len()) as u32 > byte_limit
{
pages.push(std::mem::take(&mut current_page));
// Page budget reached: stop here. The record just encoded has not been
// emitted, and the cursor points at the last emitted id, so a resumed scan
// starts with this record.
if pages.len() >= page_limit as usize {
return Ok((
pages,
last_id.map(|k| SnapshotCursor { last_key: k }),
total_entries,
));
}
}
// Every page begins with the format marker byte.
if current_page.is_empty() {
current_page.push(SNAPSHOT_PAGE_FORMAT_V2);
}
current_page.extend(record_bytes);
last_id = Some(id_bytes);
}
if !current_page.is_empty() {
pages.push(current_page);
}
// Reached the end of the entity list: no cursor, the snapshot is complete.
Ok((pages, None, total_entries))
}
fn encode_framed_snapshot_record(record: &SnapshotRecord) -> Result<Vec<u8>> {
let body = borsh::to_vec(record)?;
let len = u32::try_from(body.len())
.map_err(|_| eyre::eyre!("snapshot record exceeds u32 length frame"))?;
let mut framed = Vec::with_capacity(4 + body.len());
framed.extend_from_slice(&len.to_le_bytes());
framed.extend(body);
Ok(framed)
}
/// Estimates snapshot progress as `(percent, eta_secs)`.
///
/// - With `total_records == 0` nothing can be estimated: `(None, None)`.
/// - `percent` is `records_received * 100 / total_records`, capped at 100.
/// - `eta_secs` extrapolates the observed rate (`records_received` per
///   elapsed second) over the remaining records, rounded up. It is `None`
///   until at least one record has arrived and some time has elapsed, or
///   when the estimate is not finite.
fn snapshot_progress_estimate(
    records_received: u64,
    total_records: u64,
    elapsed: std::time::Duration,
) -> (Option<u8>, Option<u64>) {
    if total_records == 0 {
        return (None, None);
    }

    let raw_percent = records_received.saturating_mul(100) / total_records;
    let percent = Some(raw_percent.min(100) as u8);

    let elapsed_secs = elapsed.as_secs_f64();
    let eta_secs = (records_received > 0 && elapsed_secs > 0.0)
        .then(|| {
            let rate = records_received as f64 / elapsed_secs;
            let remaining = total_records.saturating_sub(records_received) as f64;
            (remaining / rate).ceil()
        })
        .filter(|estimate| estimate.is_finite())
        .map(|estimate| estimate.min(u64::MAX as f64) as u64);

    (percent, eta_secs)
}
/// Decompresses one size-prepended LZ4 snapshot page.
///
/// The declared `uncompressed_len` is checked against
/// `MAX_SNAPSHOT_PAGE_SIZE` before anything is allocated, and must match
/// both the 4-byte little-endian size prefix in `payload` and the number of
/// bytes actually produced.
///
/// # Errors
/// Fails when the declared size exceeds the limit, the payload is shorter
/// than its prefix, the prefix disagrees with `uncompressed_len`, LZ4
/// decoding fails, or the decoded size differs from the declared size.
fn decompress_snapshot_page(payload: &[u8], uncompressed_len: u32) -> Result<Vec<u8>> {
    if uncompressed_len > MAX_SNAPSHOT_PAGE_SIZE {
        eyre::bail!(
            "Snapshot page uncompressed size {} exceeds limit {}",
            uncompressed_len,
            MAX_SNAPSHOT_PAGE_SIZE
        );
    }

    if payload.len() < 4 {
        eyre::bail!("Snapshot page payload too short");
    }
    let (prefix, block) = payload.split_at(4);

    let declared = u32::from_le_bytes([prefix[0], prefix[1], prefix[2], prefix[3]]);
    if declared != uncompressed_len {
        eyre::bail!(
            "Snapshot page size prefix {} disagrees with declared length {}",
            declared,
            uncompressed_len
        );
    }

    let expected_len = uncompressed_len as usize;
    let mut decompressed = vec![0u8; expected_len];
    let written = match lz4_flex::decompress_into(block, &mut decompressed) {
        Ok(count) => count,
        Err(e) => eyre::bail!("Decompress failed: {}", e),
    };
    if written != expected_len {
        eyre::bail!(
            "Size mismatch: declared {} bytes, decompressed {} bytes",
            uncompressed_len,
            written
        );
    }

    Ok(decompressed)
}
/// Decodes the records of one decompressed snapshot page.
///
/// A page whose first byte is the page-format marker is decoded as
/// length-framed records (marker stripped); any other page, including an
/// empty one, goes through the legacy decoder.
fn decode_snapshot_records(payload: &[u8]) -> Result<Vec<SnapshotRecord>> {
    match payload.split_first() {
        Some((&marker, framed)) if marker == SNAPSHOT_PAGE_FORMAT_V2 => {
            decode_framed_snapshot_records(framed)
        }
        _ => decode_legacy_snapshot_records(payload),
    }
}
/// Decodes a sequence of length-framed records: each record is a 4-byte
/// little-endian length followed by that many bytes of borsh-encoded
/// `SnapshotRecord`.
///
/// # Errors
/// Fails on a dangling (shorter than 4 bytes) length frame, on a length that
/// runs past the end of the input, or when a record body fails to decode.
fn decode_framed_snapshot_records(mut remaining: &[u8]) -> Result<Vec<SnapshotRecord>> {
    let mut records = Vec::new();

    while !remaining.is_empty() {
        if remaining.len() < 4 {
            eyre::bail!("snapshot page truncated: dangling record length frame");
        }
        let (frame, after_frame) = remaining.split_at(4);
        let body_len = u32::from_le_bytes([frame[0], frame[1], frame[2], frame[3]]) as usize;

        if body_len > after_frame.len() {
            eyre::bail!("snapshot record length frame overruns page");
        }
        let (body, tail) = after_frame.split_at(body_len);

        records.push(borsh::from_slice::<SnapshotRecord>(body)?);
        remaining = tail;
    }

    Ok(records)
}
/// Decodes a legacy (unframed) page: records are laid out back to back and
/// read one after another until the input is exhausted.
///
/// # Errors
/// Fails when a record fails to decode, or when a decode step consumes no
/// bytes (which would otherwise loop forever).
fn decode_legacy_snapshot_records(payload: &[u8]) -> Result<Vec<SnapshotRecord>> {
    let mut decoded = Vec::new();
    let mut unread = payload;

    while !unread.is_empty() {
        let len_before = unread.len();
        // Reading through `&mut &[u8]` advances `unread` past the record.
        let record = decode_legacy_record(&mut unread)?;
        if unread.len() == len_before {
            eyre::bail!("snapshot record deserialization made no progress");
        }
        decoded.push(record);
    }

    Ok(decoded)
}
/// Reads one legacy-layout `SnapshotRecord` from `reader`.
///
/// The first byte selects the variant: `0` is an `Entity` (id, entry, index;
/// the legacy layout has no schema key, so it is set to `None`) and `1` is an
/// `Auxiliary` (kind, id, value).
///
/// # Errors
/// Fails on any read/decode error or an unknown variant byte.
fn decode_legacy_record<R: borsh::io::Read>(reader: &mut R) -> Result<SnapshotRecord> {
    // Struct-literal fields are evaluated in the order written, which is the
    // order the fields appear on the wire.
    let record = match u8::deserialize_reader(reader)? {
        0 => SnapshotRecord::Entity {
            id: <[u8; 32]>::deserialize_reader(reader)?,
            entry: Vec::<u8>::deserialize_reader(reader)?,
            index: Vec::<u8>::deserialize_reader(reader)?,
            schema_app_key: None,
        },
        1 => SnapshotRecord::Auxiliary {
            kind: u8::deserialize_reader(reader)?,
            id: <[u8; 32]>::deserialize_reader(reader)?,
            value: Vec::<u8>::deserialize_reader(reader)?,
        },
        other => eyre::bail!("invalid legacy SnapshotRecord variant discriminant {other}"),
    };
    Ok(record)
}
/// Reports whether at least one state key belonging to `context_id` exists.
///
/// Stops at the first match.
///
/// # Errors
/// Fails if creating the iterator or reading a key fails.
fn has_context_state_keys<L: calimero_store::layer::ReadLayer>(
    handle: &calimero_store::Handle<L>,
    context_id: ContextId,
) -> Result<bool> {
    let mut iter = handle.iter::<ContextStateKey>()?;
    for (key_result, _value) in iter.entries() {
        if key_result?.context_id() == context_id {
            return Ok(true);
        }
    }
    Ok(false)
}
/// Collects the state-key part of every state record that belongs to
/// `context_id`, in iteration order.
///
/// # Errors
/// Fails if creating the iterator or reading a key fails.
fn collect_context_state_keys<L: calimero_store::layer::ReadLayer>(
    handle: &calimero_store::Handle<L>,
    context_id: ContextId,
) -> Result<Vec<[u8; 32]>> {
    let mut iter = handle.iter::<ContextStateKey>()?;
    let mut collected = Vec::new();

    for (key_result, _value) in iter.entries() {
        let key = key_result?;
        if key.context_id() != context_id {
            continue;
        }
        collected.push(key.state_key());
    }

    Ok(collected)
}
#[cfg(test)]
mod tests {
use std::collections::BTreeSet;
use std::sync::Arc;
use std::time::Duration;
use calimero_primitives::context::ContextId;
use calimero_storage::index::EntityIndex;
use calimero_store::db::InMemoryDB;
use calimero_store::Store;
use super::*;
/// With `0` passed as the total, neither a percentage nor an ETA is reported.
#[test]
fn snapshot_progress_unknown_total_yields_no_estimate() {
    let estimate = snapshot_progress_estimate(5, 0, Duration::from_secs(1));
    assert_eq!(estimate.0, None);
    assert_eq!(estimate.1, None);
}
/// 50 of 200 after five seconds gives 25% with 15 seconds to go; a count
/// above the total is reported as 100%.
#[test]
fn snapshot_progress_percent_is_clamped_and_eta_extrapolates() {
    let partial = snapshot_progress_estimate(50, 200, Duration::from_secs(5));
    assert_eq!(partial.0, Some(25));
    assert_eq!(partial.1, Some(15));

    let overshoot = snapshot_progress_estimate(250, 200, Duration::from_secs(1));
    assert_eq!(overshoot.0, Some(100));
}
/// With nothing processed and zero elapsed time the percentage is 0 and no
/// ETA is produced.
#[test]
fn snapshot_progress_eta_none_before_any_record_or_time() {
    let estimate = snapshot_progress_estimate(0, 100, Duration::ZERO);
    assert_eq!(estimate.0, Some(0));
    assert_eq!(estimate.1, None);
}
/// Once the count equals the total, progress is 100% and the ETA is zero.
#[test]
fn snapshot_progress_complete_reports_zero_eta() {
    let estimate = snapshot_progress_estimate(100, 100, Duration::from_secs(2));
    assert_eq!(estimate.0, Some(100));
    assert_eq!(estimate.1, Some(0));
}
/// Test helper: writes one entity into `store` under context `ctx`.
///
/// Two rows are stored for the id built from `id_bytes`: an index row holding
/// a borsh-encoded `EntityIndex::minimal_for_test`, and an entry row holding
/// `entry_len` bytes of `0xEE`. The index row is written first.
fn put_entity(store: &Store, ctx: ContextId, id_bytes: [u8; 32], entry_len: usize) {
    let entity_id = Id::new(id_bytes);
    let rows = [
        (
            StorageKey::Index(entity_id),
            borsh::to_vec(&EntityIndex::minimal_for_test(entity_id)).unwrap(),
        ),
        (StorageKey::Entry(entity_id), vec![0xEE_u8; entry_len]),
    ];
    let mut writer = store.handle();
    for (storage_key, bytes) in rows {
        let key = ContextStateKey::new(ctx, storage_key.to_bytes());
        let value = ContextStateValue::from(Slice::from(bytes));
        writer.put(&key, &value).unwrap();
    }
}
/// Test helper: repeatedly calls `generate_snapshot_pages`, feeding each
/// returned cursor back in, until no cursor is returned.
///
/// Returns the entity ids in the order they were decoded, together with the
/// total reported by each call. Panics if an `Auxiliary` record is seen or if
/// the cursor has not reached `None` within 10,000 calls.
fn drain_all_pages(
    store: &Store,
    ctx: ContextId,
    page_limit: u16,
    byte_limit: u32,
) -> (Vec<[u8; 32]>, Vec<u64>) {
    let handle = store.handle();
    let mut seen_ids = Vec::new();
    let mut reported_totals = Vec::new();
    let mut cursor: Option<SnapshotCursor> = None;
    let mut finished = false;
    // Bounded loop so a cursor that never advances fails the test instead of
    // hanging it.
    for _ in 0..10_000 {
        let (pages, next, total) = generate_snapshot_pages(
            &handle,
            ctx,
            cursor.as_ref(),
            page_limit,
            byte_limit,
            None,
        )
        .unwrap();
        reported_totals.push(total);
        for page in &pages {
            for record in decode_snapshot_records(page).unwrap() {
                if let SnapshotRecord::Entity { id, .. } = record {
                    seen_ids.push(id);
                } else {
                    panic!("unexpected Auxiliary record");
                }
            }
        }
        cursor = next;
        if cursor.is_none() {
            finished = true;
            break;
        }
    }
    assert!(
        finished,
        "drain_all_pages hit its iteration cap before the cursor reached None — \
         non-advancing cursor or unexpectedly many pages"
    );
    (seen_ids, reported_totals)
}
/// A context with no stored state yields no pages, no resume cursor and a
/// total of zero.
#[test]
fn test_generate_snapshot_pages_empty_context() {
    let ctx = ContextId::from([1u8; 32]);
    let store = Store::new(Arc::new(InMemoryDB::owned()));
    let handle = store.handle();

    let generated = generate_snapshot_pages(
        &handle,
        ctx,
        None,
        DEFAULT_PAGE_LIMIT,
        DEFAULT_PAGE_BYTE_LIMIT,
        None,
    );
    let (pages, next_cursor, total_entries) = generated.unwrap();

    assert!(pages.is_empty());
    assert!(next_cursor.is_none());
    assert_eq!(total_entries, 0);
}
/// Twenty small entities under a `1 << 20` byte limit come back on a single
/// page, with no resume cursor, and decode to exactly the ids stored.
#[test]
fn test_generate_snapshot_pages_single_page_round_trips_all_entities() {
    let store = Store::new(Arc::new(InMemoryDB::owned()));
    let ctx = ContextId::from([2u8; 32]);

    // Seed the store and remember every id written.
    let mut expected: BTreeSet<[u8; 32]> = BTreeSet::new();
    for i in 0..20u8 {
        let mut id = [0u8; 32];
        id[0] = i;
        put_entity(&store, ctx, id, 16);
        let _ = expected.insert(id);
    }

    let (pages, cursor, total) = generate_snapshot_pages(
        &store.handle(),
        ctx,
        None,
        DEFAULT_PAGE_LIMIT,
        1 << 20,
        None,
    )
    .unwrap();
    assert!(cursor.is_none(), "single page should not request a resume");
    assert_eq!(pages.len(), 1, "20 small entities should fit on one page");
    assert_eq!(total, 20);

    // Only Entity records contribute ids; anything else is ignored.
    let got: BTreeSet<[u8; 32]> = pages
        .iter()
        .flat_map(|page| decode_snapshot_records(page).unwrap())
        .filter_map(|record| match record {
            SnapshotRecord::Entity { id, .. } => Some(id),
            SnapshotRecord::Auxiliary { .. } => None,
        })
        .collect();
    assert_eq!(got, expected);
}
/// Draining fifty 200-byte entities with a page limit of 1 and a 512-byte
/// limit must return each id exactly once, in sorted order, with every call
/// reporting a total of 50.
#[test]
fn test_generate_snapshot_pages_pagination_is_complete_and_dedup() {
    let store = Store::new(Arc::new(InMemoryDB::owned()));
    let ctx = ContextId::from([3u8; 32]);

    let mut expected: BTreeSet<[u8; 32]> = BTreeSet::new();
    for i in 0..50u16 {
        // Spread the ids over the first two bytes.
        let mut id = [0u8; 32];
        id[0] = (i / 8) as u8;
        id[1] = (i % 8) as u8;
        put_entity(&store, ctx, id, 200);
        let _ = expected.insert(id);
    }

    let (ids, totals) = drain_all_pages(&store, ctx, 1, 512);
    assert_eq!(ids.len(), expected.len(), "duplicate or dropped entity");

    let got = ids.iter().copied().collect::<BTreeSet<[u8; 32]>>();
    assert_eq!(got, expected);

    let sorted = {
        let mut ordered = ids.clone();
        ordered.sort();
        ordered
    };
    assert_eq!(ids, sorted, "pages must be emitted in id-sorted order");

    assert!(
        totals.iter().all(|&t| t == 50),
        "total_entries drifted: {totals:?}"
    );
}
/// An index row with no matching entry row is left out of the snapshot: only
/// the complete entity is returned and the reported total stays at 1.
#[test]
fn test_generate_snapshot_pages_drops_orphan_index_without_entry() {
    // Builds a 32-byte id that is zero except for its first byte.
    let id_starting_with = |first: u8| {
        let mut id = [0u8; 32];
        id[0] = first;
        id
    };

    let store = Store::new(Arc::new(InMemoryDB::owned()));
    let ctx = ContextId::from([4u8; 32]);

    let good = id_starting_with(1);
    put_entity(&store, ctx, good, 16);

    // Write only the index row for the second id.
    let orphan_id = Id::new(id_starting_with(2));
    let mut handle = store.handle();
    let orphan_key = ContextStateKey::new(ctx, StorageKey::Index(orphan_id).to_bytes());
    let orphan_index = borsh::to_vec(&EntityIndex::minimal_for_test(orphan_id)).unwrap();
    let orphan_value = ContextStateValue::from(Slice::from(orphan_index));
    handle.put(&orphan_key, &orphan_value).unwrap();

    let (ids, totals) =
        drain_all_pages(&store, ctx, DEFAULT_PAGE_LIMIT, DEFAULT_PAGE_BYTE_LIMIT);
    assert_eq!(ids, vec![good]);
    assert!(totals.iter().all(|&t| t == 1), "{totals:?}");
}
/// Four 200-byte entities drained with a page limit of 1 and a 400-byte
/// limit: every entity must still arrive exactly once, in sorted order, with
/// a stable total of 4.
#[test]
fn test_generate_snapshot_pages_page_limit_boundary_defers_entity() {
    let store = Store::new(Arc::new(InMemoryDB::owned()));
    let ctx = ContextId::from([5u8; 32]);

    let mut expected: BTreeSet<[u8; 32]> = BTreeSet::new();
    for i in 0..4u8 {
        let mut id = [0u8; 32];
        id[0] = i;
        put_entity(&store, ctx, id, 200);
        let _ = expected.insert(id);
    }

    let (ids, totals) = drain_all_pages(&store, ctx, 1, 400);
    assert_eq!(
        ids.len(),
        expected.len(),
        "deferred entity dropped or duplicated"
    );

    let got = ids.iter().copied().collect::<BTreeSet<[u8; 32]>>();
    assert_eq!(got, expected);

    let sorted = {
        let mut ordered = ids.clone();
        ordered.sort();
        ordered
    };
    assert_eq!(
        ids, sorted,
        "entities must arrive in id-sorted order across bursts"
    );

    assert!(
        totals.iter().all(|&t| t == 4),
        "total_entries drifted: {totals:?}"
    );
}
/// `snapshot_entity_is_readable` rejects only the case where both keys are
/// present and differ; every combination involving `None`, or equal keys, is
/// accepted.
#[test]
fn test_snapshot_entity_future_schema_is_declined_not_stored() {
    let v1 = [1u8; 32];
    let v2 = [2u8; 32];

    assert!(
        !snapshot_entity_is_readable(Some(v2), Some(v1)),
        "a v2-authored snapshot entity must be declined by a v1 reader \
         (else its bytes are stored unreadable)"
    );

    // (schema key, loaded key) pairs that must all be readable.
    let readable_cases = [
        (Some(v1), Some(v1)),
        (None, Some(v1)),
        (Some(v2), None),
        (None, None),
    ];
    for (schema_key, loaded_key) in readable_cases {
        assert!(snapshot_entity_is_readable(schema_key, loaded_key));
    }
}
/// A buffered entity whose index decodes to a `SharedMember` storage type is
/// reported as `RedrivenElsewhere`; one whose index bytes are `[0xFF, 0xFF]`
/// is reported as `Pending`.
#[test]
fn test_persist_buffered_snapshot_entity_sharedmember_is_redriven_not_pending() {
    let store = Store::new(Arc::new(InMemoryDB::owned()));
    let mut handle = store.handle();
    let ctx = ContextId::from([5u8; 32]);
    let id = [6u8; 32];

    let shared_member_index = {
        let mut index = EntityIndex::minimal_for_test(Id::new(id));
        index.metadata.storage_type = calimero_storage::entities::StorageType::SharedMember {
            anchor: Id::new([7u8; 32]),
            signature_data: None,
        };
        borsh::to_vec(&index).unwrap()
    };

    let redriven = persist_buffered_snapshot_entity(
        &mut handle,
        ctx,
        id,
        &[1, 2, 3],
        &shared_member_index,
    )
    .unwrap();
    assert_eq!(
        redriven,
        SnapshotEntityDrainOutcome::RedrivenElsewhere,
        "a matching-schema SharedMember must be marked redriven (delete), not left pending"
    );

    let still_pending =
        persist_buffered_snapshot_entity(&mut handle, ctx, id, &[1], &[0xFF, 0xFF]).unwrap();
    assert_eq!(still_pending, SnapshotEntityDrainOutcome::Pending);
}
/// Decoding an empty payload succeeds and produces no records.
#[test]
fn test_decode_snapshot_records_empty() {
    let decoded = decode_snapshot_records(&[]).unwrap();
    assert!(decoded.is_empty());
}
/// A v2 page holding one `Entity` record decodes back to one record with the
/// same id, entry and index bytes.
#[test]
fn test_decode_snapshot_records_single_entity() {
    let encoded = build_snapshot_page_v2(&[SnapshotRecord::Entity {
        id: [1u8; 32],
        entry: vec![10, 20, 30],
        index: vec![40, 50, 60],
        schema_app_key: None,
    }]);

    let decoded = decode_snapshot_records(&encoded).unwrap();
    assert_eq!(decoded.len(), 1);

    if let SnapshotRecord::Entity {
        id, entry, index, ..
    } = &decoded[0]
    {
        assert_eq!(*id, [1u8; 32]);
        assert_eq!(entry, &vec![10, 20, 30]);
        assert_eq!(index, &vec![40, 50, 60]);
    } else {
        panic!("expected Entity record");
    }
}
/// Test helper: builds a v2 page — the `SNAPSHOT_PAGE_FORMAT_V2` marker byte
/// followed by each record as encoded by `encode_framed_snapshot_record`.
fn build_snapshot_page_v2(records: &[SnapshotRecord]) -> Vec<u8> {
    let mut page = Vec::new();
    page.push(SNAPSHOT_PAGE_FORMAT_V2);
    records
        .iter()
        .for_each(|record| page.extend(encode_framed_snapshot_record(record).unwrap()));
    page
}
/// Two legacy-format `Entity` records concatenated on one page must both
/// decode, each with `schema_app_key` set to `None`.
#[test]
fn test_decode_legacy_multi_record_page_no_desync() {
    // Hand-rolled legacy encoding: discriminant 0, raw id, then the
    // borsh-encoded entry and index vectors.
    fn legacy_entity_bytes(id: [u8; 32], entry: Vec<u8>, index: Vec<u8>) -> Vec<u8> {
        [
            vec![0u8],
            id.to_vec(),
            borsh::to_vec(&entry).unwrap(),
            borsh::to_vec(&index).unwrap(),
        ]
        .concat()
    }

    let page = [
        legacy_entity_bytes([1u8; 32], vec![10, 20, 30], vec![40, 50]),
        legacy_entity_bytes([2u8; 32], vec![60], vec![70, 80, 90]),
    ]
    .concat();

    let records = decode_snapshot_records(&page).expect("legacy multi-record page decodes");
    assert_eq!(records.len(), 2, "both legacy records must decode");

    let expected = [
        SnapshotRecord::Entity {
            id: [1u8; 32],
            entry: vec![10, 20, 30],
            index: vec![40, 50],
            schema_app_key: None,
        },
        SnapshotRecord::Entity {
            id: [2u8; 32],
            entry: vec![60],
            index: vec![70, 80, 90],
            schema_app_key: None,
        },
    ];
    assert_eq!(records[0], expected[0]);
    assert_eq!(records[1], expected[1]);
}
/// A v2 framed page holding one `Entity` (with a schema app key) and one
/// `Auxiliary` record decodes back to both records with every field intact.
#[test]
fn test_decode_v2_framed_page_round_trips_schema() {
    // The page is only read, so it is bound immutably and not cleared
    // before it goes out of scope.
    let page = build_snapshot_page_v2(&[
        SnapshotRecord::Entity {
            id: [3u8; 32],
            entry: vec![1, 2],
            index: vec![3, 4],
            schema_app_key: Some([7u8; 32]),
        },
        SnapshotRecord::Auxiliary {
            kind: snapshot_record_kind::ROTATION_LOG,
            id: [4u8; 32],
            value: vec![5, 6, 7],
        },
    ]);
    let records = decode_snapshot_records(&page).unwrap();
    assert_eq!(records.len(), 2);
    assert_eq!(
        records[0],
        SnapshotRecord::Entity {
            id: [3u8; 32],
            entry: vec![1, 2],
            index: vec![3, 4],
            schema_app_key: Some([7u8; 32]),
        }
    );
    assert!(matches!(
        &records[1],
        SnapshotRecord::Auxiliary { kind, id, value }
            if *kind == snapshot_record_kind::ROTATION_LOG
                && *id == [4u8; 32]
                && value == &vec![5, 6, 7]
    ));
}
/// A v2 page holding an `Entity` followed by an `Auxiliary` record decodes to
/// both records, in the same order.
#[test]
fn test_decode_snapshot_records_mixed() {
    let encoded = build_snapshot_page_v2(&[
        SnapshotRecord::Entity {
            id: [1u8; 32],
            entry: vec![10],
            index: vec![20],
            schema_app_key: None,
        },
        SnapshotRecord::Auxiliary {
            kind: snapshot_record_kind::ROTATION_LOG,
            id: [2u8; 32],
            value: vec![30, 31],
        },
    ]);

    let decoded = decode_snapshot_records(&encoded).unwrap();
    assert_eq!(decoded.len(), 2);

    let first = &decoded[0];
    assert!(matches!(
        first,
        SnapshotRecord::Entity { id, .. } if *id == [1u8; 32]
    ));

    let second = &decoded[1];
    assert!(matches!(
        second,
        SnapshotRecord::Auxiliary { kind, id, .. }
            if *kind == snapshot_record_kind::ROTATION_LOG && *id == [2u8; 32]
    ));
}
/// 4096 bytes compressed with `lz4_flex::compress_prepend_size` decompress
/// back to the same bytes when the true length is declared.
#[test]
fn test_decompress_snapshot_page_round_trips() {
    let plain = vec![7u8; 4096];
    let declared_len = plain.len() as u32;
    let compressed = lz4_flex::compress_prepend_size(&plain);
    let restored = decompress_snapshot_page(&compressed, declared_len).unwrap();
    assert_eq!(restored, plain);
}
/// A page four times `DEFAULT_PAGE_BYTE_LIMIT` in size still decompresses
/// successfully.
#[test]
fn test_decompress_snapshot_page_accepts_oversized_single_record_page() {
    let oversized_len = (DEFAULT_PAGE_BYTE_LIMIT as usize) * 4;
    let plain = vec![9u8; oversized_len];
    let compressed = lz4_flex::compress_prepend_size(&plain);
    let restored = decompress_snapshot_page(&compressed, plain.len() as u32).unwrap();
    assert_eq!(restored, plain);
}
/// A declared length one past `MAX_SNAPSHOT_PAGE_SIZE` is rejected with an
/// error mentioning "exceeds limit".
#[test]
fn test_decompress_snapshot_page_rejects_oversized_declared_len() {
    let compressed = lz4_flex::compress_prepend_size(&[0u8; 16]);
    let too_large = MAX_SNAPSHOT_PAGE_SIZE + 1;
    let err = decompress_snapshot_page(&compressed, too_large).unwrap_err();
    assert!(err.to_string().contains("exceeds limit"), "{err}");
}
/// Overwriting the first four payload bytes with `u32::MAX` while declaring
/// 256 bytes must fail with an error mentioning "disagrees".
#[test]
fn test_decompress_snapshot_page_rejects_inconsistent_size_prefix() {
    let plain = vec![3u8; 256];
    let mut tampered = lz4_flex::compress_prepend_size(&plain);
    let bogus_prefix = u32::MAX.to_le_bytes();
    tampered[0..4].copy_from_slice(&bogus_prefix);
    let err = decompress_snapshot_page(&tampered, 256).unwrap_err();
    assert!(err.to_string().contains("disagrees"), "{err}");
}
/// A payload that really holds 4096 bytes, but whose first four bytes and
/// declared length both say 8, must fail to decompress rather than expand.
#[test]
fn test_decompress_snapshot_page_resists_expansion_beyond_buffer() {
    let plain = vec![3u8; 4096];
    let mut tampered = lz4_flex::compress_prepend_size(&plain);
    let understated = 8u32.to_le_bytes();
    tampered[0..4].copy_from_slice(&understated);

    let msg = decompress_snapshot_page(&tampered, 8)
        .unwrap_err()
        .to_string();
    let is_expected_failure = msg.contains("Decompress failed") || msg.contains("Size mismatch");
    assert!(is_expected_failure, "{msg}");
}
/// A three-byte payload is rejected with an error mentioning "too short".
#[test]
fn test_decompress_snapshot_page_rejects_short_payload() {
    let truncated = [1u8, 2, 3];
    let err = decompress_snapshot_page(&truncated, 8).unwrap_err();
    assert!(err.to_string().contains("too short"), "{err}");
}
}