use std::sync::Arc;
use std::time::{Duration, Instant};
use calimero_blobstore::BlobManager as BlobStore;
use calimero_context_client::client::ContextClient;
use calimero_node_primitives::client::NodeClient;
use calimero_primitives::{blobs::BlobId, context::ContextId};
use dashmap::DashMap;
use tracing::{debug, warn};
use crate::constants;
use crate::delta_store::DeltaStore;
use crate::run::NodeMode;
use crate::specialized_node_invite_state::{
new_pending_specialized_node_invites, PendingSpecializedNodeInvites,
};
use crate::sync::SyncManager;
/// A blob payload held in the in-memory cache, tagged with the time it was
/// last read so the eviction pass can apply an LRU policy.
#[derive(Debug, Clone)]
pub struct CachedBlob {
    /// Shared, immutable blob bytes; `Arc<[u8]>` makes clones O(1).
    pub data: Arc<[u8]>,
    /// Timestamp of the most recent access, refreshed via [`CachedBlob::touch`].
    pub last_accessed: Instant,
}

impl CachedBlob {
    /// Wraps `data` in a cache entry stamped with the current instant.
    pub fn new(data: Arc<[u8]>) -> Self {
        let last_accessed = Instant::now();
        Self { data, last_accessed }
    }

    /// Marks the entry as freshly used so LRU eviction keeps it around longer.
    pub fn touch(&mut self) {
        self.last_accessed = Instant::now();
    }
}
/// Bundle of client handles the node uses to talk to other subsystems.
/// Presumably passed around as one unit so call sites don't take each client
/// separately — TODO confirm against construction sites.
#[derive(Debug, Clone)]
pub(crate) struct NodeClients {
    // Client for context-scoped operations.
    pub(crate) context: ContextClient,
    // Client for node-level operations.
    pub(crate) node: NodeClient,
}
/// Bundle of manager handles owned by the node: blob storage and sync
/// orchestration. Grouped for convenient cloning/passing, mirroring
/// [`NodeClients`] above.
#[derive(Clone, Debug)]
pub(crate) struct NodeManagers {
    // Backing blob storage (aliased from `calimero_blobstore::BlobManager`).
    pub(crate) blobstore: BlobStore,
    // Synchronization manager driving context sync.
    pub(crate) sync: SyncManager,
}
/// State of an in-flight sync session. Currently a single state: while a sync
/// is running, incoming deltas are buffered rather than applied directly.
#[derive(Debug)]
pub(crate) enum SyncSessionState {
    /// Incoming deltas are queued into the session's delta buffer.
    BufferingDeltas,
}

impl SyncSessionState {
    /// Whether deltas arriving for this session should be buffered.
    ///
    /// Exhaustive match (rather than `matches!`) so adding a new state forces
    /// this decision to be revisited at compile time.
    pub fn should_buffer_deltas(&self) -> bool {
        match self {
            Self::BufferingDeltas => true,
        }
    }
}
/// Per-context bookkeeping for an active sync: the session state, the buffer
/// that accumulates deltas arriving mid-sync, and a rate-limit timestamp for
/// data-loss warnings.
#[derive(Debug)]
pub(crate) struct SyncSession {
    // Drives whether incoming deltas get buffered (see `SyncSessionState`).
    pub state: SyncSessionState,
    // Bounded buffer of deltas received while the sync is in progress.
    pub delta_buffer: calimero_node_primitives::delta_buffer::DeltaBuffer,
    // When the last buffer-drop warning was emitted; `None` until the first
    // drop. Used to rate-limit `warn!` output in `NodeState::buffer_delta`.
    pub last_drop_warning: Option<Instant>,
}
/// Shared, cheaply-clonable node state. All maps are `Arc<DashMap<..>>`, so
/// clones of `NodeState` observe the same underlying data concurrently.
#[derive(Clone, Debug)]
pub(crate) struct NodeState {
    // LRU-ish in-memory blob cache; pruned by `evict_old_blobs`.
    pub(crate) blob_cache: Arc<DashMap<BlobId, CachedBlob>>,
    // Per-context delta stores.
    pub(crate) delta_stores: Arc<DashMap<ContextId, DeltaStore>>,
    // Shared registry of invites awaiting acceptance (see
    // `specialized_node_invite_state`).
    pub(crate) pending_specialized_node_invites: PendingSpecializedNodeInvites,
    // Whether mock TEE attestations are accepted (presumably test/dev only —
    // TODO confirm).
    pub(crate) accept_mock_tee: bool,
    // Operating mode this node was started in.
    pub(crate) node_mode: NodeMode,
    // Active sync sessions keyed by context; presence of an entry means a
    // sync is in flight for that context.
    pub(crate) sync_sessions: Arc<DashMap<ContextId, SyncSession>>,
}
impl NodeState {
    /// Returns a cloned `Arc` handle to the shared blob cache, so other tasks
    /// can use it without borrowing `self`.
    pub(crate) fn blob_cache_handle(&self) -> Arc<DashMap<BlobId, CachedBlob>> {
        self.blob_cache.clone()
    }
    /// Returns a cloned `Arc` handle to the per-context delta stores.
    pub(crate) fn delta_stores_handle(&self) -> Arc<DashMap<ContextId, DeltaStore>> {
        self.delta_stores.clone()
    }
    /// Returns a clone of the pending-invite registry handle.
    pub(crate) fn pending_specialized_node_invites_handle(&self) -> PendingSpecializedNodeInvites {
        self.pending_specialized_node_invites.clone()
    }
    /// Whether mock TEE attestations are accepted by this node.
    pub(crate) const fn accept_mock_tee(&self) -> bool {
        self.accept_mock_tee
    }
    /// The mode this node was started in.
    pub(crate) const fn node_mode(&self) -> NodeMode {
        self.node_mode
    }
    /// Creates a fresh `NodeState` with empty cache/store/session maps.
    pub(crate) fn new(accept_mock_tee: bool, node_mode: NodeMode) -> Self {
        Self {
            blob_cache: Arc::new(DashMap::new()),
            delta_stores: Arc::new(DashMap::new()),
            pending_specialized_node_invites: new_pending_specialized_node_invites(),
            accept_mock_tee,
            node_mode,
            sync_sessions: Arc::new(DashMap::new()),
        }
    }
    /// True when a sync session exists for `context_id` and its state says
    /// incoming deltas should be buffered instead of applied directly.
    pub(crate) fn should_buffer_delta(&self, context_id: &ContextId) -> bool {
        self.sync_sessions
            .get(context_id)
            .is_some_and(|session| session.state.should_buffer_deltas())
    }
    /// Pushes `delta` into the session buffer for `context_id`.
    ///
    /// Returns `None` if no sync session is active for the context, otherwise
    /// the buffer's `PushResult`. On data loss (eviction or zero-capacity
    /// drop) a warning is emitted, rate-limited to once per
    /// `DELTA_BUFFER_DROP_WARNING_RATE_LIMIT_S` seconds per session.
    pub(crate) fn buffer_delta(
        &self,
        context_id: &ContextId,
        delta: calimero_node_primitives::delta_buffer::BufferedDelta,
    ) -> Option<calimero_node_primitives::delta_buffer::PushResult> {
        use calimero_node_primitives::delta_buffer::PushResult;
        // Note: the `get_mut` guard (a DashMap shard lock) is held for the
        // whole body, including the `warn!` call below.
        if let Some(mut session) = self.sync_sessions.get_mut(context_id) {
            // Capture the id before `push` consumes the delta.
            let incoming_delta_id = delta.id;
            let result = session.delta_buffer.push(delta);
            if result.had_data_loss() {
                // Rate-limit the warning: only warn if we never warned, or the
                // last warning is older than the configured interval.
                let should_warn = session.last_drop_warning.is_none_or(|last| {
                    last.elapsed()
                        > Duration::from_secs(constants::DELTA_BUFFER_DROP_WARNING_RATE_LIMIT_S)
                });
                if should_warn {
                    session.last_drop_warning = Some(Instant::now());
                    // Assumes `had_data_loss()` is only true for these two
                    // variants — TODO confirm against `PushResult`'s
                    // definition; a new lossy variant would panic here.
                    let (evicted_id, reason) = match &result {
                        PushResult::Evicted(id) => (id, "buffer overflow"),
                        PushResult::DroppedZeroCapacity(id) => (id, "zero capacity"),
                        _ => unreachable!(),
                    };
                    warn!(
                        %context_id,
                        lost_delta_id = ?evicted_id,
                        incoming_delta_id = ?incoming_delta_id,
                        reason = reason,
                        drops = session.delta_buffer.drops(),
                        buffer_size = session.delta_buffer.len(),
                        capacity = session.delta_buffer.capacity(),
                        "Delta buffer data loss - {} (I6 violation risk)",
                        reason
                    );
                }
            }
            Some(result)
        } else {
            None }
    }
    /// Starts a sync session for `context_id` with the default buffer capacity.
    pub(crate) fn start_sync_session(&self, context_id: ContextId, sync_start_hlc: u64) {
        self.start_sync_session_with_capacity(
            context_id,
            sync_start_hlc,
            calimero_node_primitives::delta_buffer::DEFAULT_BUFFER_CAPACITY,
        );
    }
    /// Starts (or replaces — `insert` overwrites any existing session) a sync
    /// session for `context_id`, buffering deltas from `sync_start_hlc`
    /// onward into a buffer of the given `capacity`.
    pub(crate) fn start_sync_session_with_capacity(
        &self,
        context_id: ContextId,
        sync_start_hlc: u64,
        capacity: usize,
    ) {
        use calimero_node_primitives::delta_buffer::{DeltaBuffer, MIN_RECOMMENDED_CAPACITY};
        // Undersized buffers still work, but loudly: they raise drop risk.
        if capacity < MIN_RECOMMENDED_CAPACITY {
            warn!(
                %context_id,
                capacity,
                min_recommended = MIN_RECOMMENDED_CAPACITY,
                "Delta buffer capacity below recommended minimum - may cause excessive data loss"
            );
        }
        debug!(
            %context_id,
            sync_start_hlc,
            capacity,
            "Starting sync session with delta buffering"
        );
        self.sync_sessions.insert(
            context_id,
            SyncSession {
                state: SyncSessionState::BufferingDeltas,
                delta_buffer: DeltaBuffer::new(capacity, sync_start_hlc),
                last_drop_warning: None,
            },
        );
    }
    /// Ends the sync session for `context_id`, returning the buffered deltas
    /// (drained, in buffer order) for replay. Returns `None` if no session
    /// was active. Logs a warning if any deltas were dropped during the
    /// session (partial I6 violation).
    pub(crate) fn end_sync_session(
        &self,
        context_id: &ContextId,
    ) -> Option<Vec<calimero_node_primitives::delta_buffer::BufferedDelta>> {
        if let Some((_, mut session)) = self.sync_sessions.remove(context_id) {
            let drops = session.delta_buffer.drops();
            let buffered_count = session.delta_buffer.len();
            if drops > 0 {
                warn!(
                    %context_id,
                    drops,
                    buffered_count,
                    "Sync session ended with {} dropped deltas (I6 partial violation)",
                    drops
                );
            } else {
                debug!(
                    %context_id,
                    buffered_count,
                    "Sync session ended successfully"
                );
            }
            Some(session.delta_buffer.drain())
        } else {
            None
        }
    }
    /// Removes the sync session for `context_id`, discarding any buffered
    /// deltas. No-op (and no log) when no session exists.
    pub(crate) fn cancel_sync_session(&self, context_id: &ContextId) {
        if let Some((_, session)) = self.sync_sessions.remove(context_id) {
            let drops = session.delta_buffer.drops();
            let buffered_count = session.delta_buffer.len();
            warn!(
                %context_id,
                buffered_count,
                drops,
                "Sync session cancelled - discarding buffered deltas"
            );
        }
    }
    /// Prunes the blob cache in three passes:
    /// 1. drop entries older than `MAX_BLOB_AGE_S`;
    /// 2. if more than `MAX_BLOB_CACHE_COUNT` remain, drop the least-recently
    ///    accessed until under the count limit;
    /// 3. if total bytes exceed `MAX_BLOB_CACHE_SIZE_BYTES`, keep dropping the
    ///    least-recently accessed until under the size limit.
    ///
    /// NOTE(review): other tasks may insert/remove concurrently (DashMap), so
    /// the counts and sizes observed between passes are best-effort snapshots.
    pub(crate) fn evict_old_blobs(&self) {
        let now = Instant::now();
        let before_count = self.blob_cache.len();
        // Pass 1: age-based eviction.
        self.blob_cache.retain(|_, cached_blob| {
            now.duration_since(cached_blob.last_accessed)
                < Duration::from_secs(constants::MAX_BLOB_AGE_S)
        });
        let after_time_eviction = self.blob_cache.len();
        // Pass 2: count-based eviction, LRU first. Keys/timestamps are
        // snapshotted into a Vec so we don't remove while iterating DashMap.
        if self.blob_cache.len() > constants::MAX_BLOB_CACHE_COUNT {
            let mut blobs: Vec<_> = self
                .blob_cache
                .iter()
                .map(|entry| (*entry.key(), entry.value().last_accessed))
                .collect();
            // Oldest access first.
            blobs.sort_by_key(|(_, accessed)| *accessed);
            let to_remove = self.blob_cache.len() - constants::MAX_BLOB_CACHE_COUNT;
            for (blob_id, _) in blobs.iter().take(to_remove) {
                let _removed = self.blob_cache.remove(blob_id);
            }
        }
        let after_count_eviction = self.blob_cache.len();
        // Pass 3: size-based eviction, LRU first, tracking bytes locally.
        let total_size: usize = self
            .blob_cache
            .iter()
            .map(|entry| entry.value().data.len())
            .sum();
        if total_size > constants::MAX_BLOB_CACHE_SIZE_BYTES {
            let mut blobs: Vec<_> = self
                .blob_cache
                .iter()
                .map(|entry| {
                    (
                        *entry.key(),
                        entry.value().last_accessed,
                        entry.value().data.len(),
                    )
                })
                .collect();
            blobs.sort_by_key(|(_, accessed, _)| *accessed);
            let mut current_size = total_size;
            let mut removed_count = 0;
            for (blob_id, _, size) in blobs {
                if current_size <= constants::MAX_BLOB_CACHE_SIZE_BYTES {
                    break;
                }
                let _removed = self.blob_cache.remove(&blob_id);
                current_size = current_size.saturating_sub(size);
                removed_count += 1;
            }
            if removed_count > 0 {
                #[expect(
                    clippy::integer_division,
                    reason = "MB conversion for logging, precision not critical"
                )]
                let freed_mb = total_size.saturating_sub(current_size) / 1024 / 1024;
                #[expect(
                    clippy::integer_division,
                    reason = "MB conversion for logging, precision not critical"
                )]
                let new_size_mb = current_size / 1024 / 1024;
                tracing::debug!(
                    removed_count,
                    freed_mb,
                    new_size_mb,
                    "Evicted blobs to stay under memory limit"
                );
            }
        }
        // Summary log: per-pass deltas are derived from the snapshots above,
        // so concurrent inserts can skew them slightly.
        let total_evicted = before_count.saturating_sub(self.blob_cache.len());
        if total_evicted > 0 {
            tracing::debug!(
                total_evicted,
                time_evicted = before_count.saturating_sub(after_time_eviction),
                count_evicted = after_time_eviction.saturating_sub(after_count_eviction),
                memory_evicted = after_count_eviction.saturating_sub(self.blob_cache.len()),
                remaining_count = self.blob_cache.len(),
                "Blob cache eviction completed"
            );
        }
    }
}