use std::collections::{BTreeSet, HashMap, VecDeque};
use std::time::Duration;
use calimero_context_config::types::ContextGroupId;
use calimero_node_primitives::delta_buffer::BufferedDelta;
use calimero_primitives::context::{ContextId, GroupMemberRole};
use calimero_primitives::identity::PublicKey;
use libp2p::PeerId;
use parking_lot::Mutex;
use super::state_access::SyncStateAccess;
use crate::delta_store::DeltaStore;
use crate::sync::reconciler::reconcile_cooldown;
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum SyncStateAccessCall {
DeltaStore(ContextId),
GetOrRegisterDeltaStore {
context_id: ContextId,
created: bool,
},
EndSyncSession(ContextId),
CancelSyncSession(ContextId),
PeerIdentities(PeerId),
CachedMemberPeersForGroup(ContextGroupId),
ReconcileRemainingCooldown(ContextId),
RecordReconcileSuccess(ContextId),
RecordReconcileFailure(ContextId),
}
#[derive(Default)]
pub(crate) struct MockSyncStateAccess {
delta_stores: Mutex<HashMap<ContextId, DeltaStore>>,
peer_identities: Mutex<HashMap<PeerId, BTreeSet<PublicKey>>>,
member_peers: Mutex<HashMap<ContextGroupId, Vec<(PeerId, GroupMemberRole)>>>,
end_sync_session_responses: Mutex<HashMap<ContextId, VecDeque<Option<Vec<BufferedDelta>>>>>,
reconcile_cooldowns: Mutex<HashMap<ContextId, (Duration, u32)>>,
failure_counts: Mutex<HashMap<ContextId, u32>>,
calls: Mutex<Vec<SyncStateAccessCall>>,
}
impl MockSyncStateAccess {
pub(crate) fn insert_delta_store(&self, context_id: ContextId, store: DeltaStore) {
let _replaced = self.delta_stores.lock().insert(context_id, store);
}
pub(crate) fn insert_member_peers(
&self,
group: ContextGroupId,
peers: Vec<(PeerId, GroupMemberRole)>,
) {
let _replaced = self.member_peers.lock().insert(group, peers);
}
pub(crate) fn insert_peer_identities(&self, peer: PeerId, ids: BTreeSet<PublicKey>) {
let _replaced = self.peer_identities.lock().insert(peer, ids);
}
pub(crate) fn push_end_sync_session_response(
&self,
context_id: ContextId,
response: Option<Vec<BufferedDelta>>,
) {
self.end_sync_session_responses
.lock()
.entry(context_id)
.or_default()
.push_back(response);
}
pub(crate) fn set_reconcile_cooldown(
&self,
context_id: ContextId,
cooldown: Duration,
consecutive_failures: u32,
) {
let _replaced = self
.reconcile_cooldowns
.lock()
.insert(context_id, (cooldown, consecutive_failures));
}
pub(crate) fn calls(&self) -> Vec<SyncStateAccessCall> {
self.calls.lock().clone()
}
}
impl SyncStateAccess for MockSyncStateAccess {
fn delta_store(&self, context_id: &ContextId) -> Option<DeltaStore> {
self.calls
.lock()
.push(SyncStateAccessCall::DeltaStore(*context_id));
self.delta_stores.lock().get(context_id).cloned()
}
fn get_or_register_delta_store(
&self,
context_id: ContextId,
factory: Box<dyn FnOnce() -> DeltaStore + Send>,
) -> (DeltaStore, bool) {
let mut stores = self.delta_stores.lock();
match stores.get(&context_id) {
Some(existing) => {
let store = existing.clone();
self.calls
.lock()
.push(SyncStateAccessCall::GetOrRegisterDeltaStore {
context_id,
created: false,
});
drop(stores);
(store, false)
}
None => {
let store = factory();
let _replaced = stores.insert(context_id, store.clone());
self.calls
.lock()
.push(SyncStateAccessCall::GetOrRegisterDeltaStore {
context_id,
created: true,
});
drop(stores);
(store, true)
}
}
}
fn end_sync_session(&self, context_id: &ContextId) -> Option<Vec<BufferedDelta>> {
self.calls
.lock()
.push(SyncStateAccessCall::EndSyncSession(*context_id));
self.end_sync_session_responses
.lock()
.get_mut(context_id)
.and_then(|q| q.pop_front())
.unwrap_or(None)
}
fn cancel_sync_session(&self, context_id: &ContextId) {
self.calls
.lock()
.push(SyncStateAccessCall::CancelSyncSession(*context_id));
}
fn peer_identities(&self, peer_id: &PeerId) -> Option<BTreeSet<PublicKey>> {
self.calls
.lock()
.push(SyncStateAccessCall::PeerIdentities(*peer_id));
self.peer_identities.lock().get(peer_id).cloned()
}
fn cached_member_peers_for_group(
&self,
group: &ContextGroupId,
) -> Vec<(PeerId, GroupMemberRole)> {
self.calls
.lock()
.push(SyncStateAccessCall::CachedMemberPeersForGroup(*group));
self.member_peers
.lock()
.get(group)
.cloned()
.unwrap_or_default()
}
fn reconcile_remaining_cooldown(&self, context_id: &ContextId) -> Option<(Duration, u32)> {
self.calls
.lock()
.push(SyncStateAccessCall::ReconcileRemainingCooldown(*context_id));
self.reconcile_cooldowns.lock().get(context_id).copied()
}
fn record_reconcile_success(&self, context_id: &ContextId) {
self.calls
.lock()
.push(SyncStateAccessCall::RecordReconcileSuccess(*context_id));
let _ = self.reconcile_cooldowns.lock().remove(context_id);
let _ = self.failure_counts.lock().remove(context_id);
}
fn record_reconcile_failure(&self, context_id: ContextId) -> u32 {
self.calls
.lock()
.push(SyncStateAccessCall::RecordReconcileFailure(context_id));
let mut counts = self.failure_counts.lock();
let entry = counts.entry(context_id).or_insert(0);
*entry = entry.saturating_add(1);
let failures = *entry;
drop(counts);
let cooldown = reconcile_cooldown(failures);
let _replaced = self
.reconcile_cooldowns
.lock()
.insert(context_id, (cooldown, failures));
failures
}
}
#[cfg(test)]
mod tests {
use super::*;
fn ctx(byte: u8) -> ContextId {
ContextId::from([byte; 32])
}
#[test]
fn cached_member_peers_returns_injected_and_records_call() {
let mock = MockSyncStateAccess::default();
let group = ContextGroupId::from([1u8; 32]);
let peer = PeerId::random();
mock.insert_member_peers(group, vec![(peer, GroupMemberRole::Admin)]);
assert_eq!(
mock.cached_member_peers_for_group(&group),
vec![(peer, GroupMemberRole::Admin)]
);
assert!(mock
.cached_member_peers_for_group(&ContextGroupId::from([2u8; 32]))
.is_empty());
assert!(mock
.calls()
.iter()
.any(|c| matches!(c, SyncStateAccessCall::CachedMemberPeersForGroup(_))));
}
#[test]
fn defaults_return_none_or_zero_and_calls_are_recorded() {
let mock = MockSyncStateAccess::default();
assert!(mock.delta_store(&ctx(1)).is_none());
assert!(mock.end_sync_session(&ctx(1)).is_none());
assert!(mock.peer_identities(&PeerId::random()).is_none());
assert!(mock.reconcile_remaining_cooldown(&ctx(2)).is_none());
assert_eq!(mock.record_reconcile_failure(ctx(2)), 1);
assert!(mock.reconcile_remaining_cooldown(&ctx(2)).is_some());
assert_eq!(mock.record_reconcile_failure(ctx(2)), 2);
assert_eq!(mock.record_reconcile_failure(ctx(3)), 1);
mock.record_reconcile_success(&ctx(2));
assert!(mock.reconcile_remaining_cooldown(&ctx(2)).is_none());
assert_eq!(mock.record_reconcile_failure(ctx(2)), 1);
let calls = mock.calls();
assert!(matches!(
calls.first(),
Some(SyncStateAccessCall::DeltaStore(_))
));
}
#[test]
fn end_sync_session_responses_pop_per_context_fifo() {
let mock = MockSyncStateAccess::default();
mock.push_end_sync_session_response(ctx(1), Some(vec![]));
mock.push_end_sync_session_response(ctx(1), None);
assert!(mock.end_sync_session(&ctx(1)).is_some());
assert!(mock.end_sync_session(&ctx(1)).is_none());
assert!(mock.end_sync_session(&ctx(1)).is_none());
}
}