use std::collections::BTreeMap;
use calimero_context_config::types::ContextGroupId;
use calimero_governance_store::op_events::{self, OpEvent};
use calimero_network_primitives::client::NetworkClient;
use calimero_store::key::Generic as GenericKey;
use calimero_store::slice::Slice;
use calimero_store::types::GenericData;
use calimero_store::Store;
use libp2p::PeerId;
use tracing::{debug, info, warn};
use crate::peer_identity_cache::{PeerIdentityCache, PeerScoreTier, PEER_IDENTITY_TTL_SECS};
use crate::state::{now_unix_secs, NodeState};
const SNAPSHOT_INTERVAL: std::time::Duration = std::time::Duration::from_secs(30);
fn store_key() -> GenericKey {
GenericKey::new(*b"calimero-idpeers", [0u8; 32])
}
pub(crate) fn persist(state: &NodeState, store: &Store) {
let blob = state
.lock_peer_identity_cache()
.to_persisted_all(now_unix_secs(), PEER_IDENTITY_TTL_SECS);
if blob.groups.is_empty() {
return;
}
let bytes = match serde_json::to_vec(&blob) {
Ok(bytes) => bytes,
Err(err) => {
debug!(?err, "failed to serialize peer-identity cache");
return;
}
};
let data = GenericData::from(Slice::from(bytes));
let mut handle = store.handle();
if let Err(err) = handle.put(&store_key(), &data) {
debug!(?err, "failed to persist peer-identity cache to store");
}
}
pub(crate) fn hydrate(state: &NodeState, store: &Store) {
let now = now_unix_secs();
let blob = match store.handle().get(&store_key()) {
Ok(Some(data)) => match serde_json::from_slice(data.as_ref()) {
Ok(blob) => blob,
Err(err) => {
warn!(?err, "ignoring corrupt peer-identity cache blob in store");
return;
}
},
Ok(None) => return, Err(err) => {
debug!(?err, "failed to read peer-identity cache from store");
return;
}
};
let cache = PeerIdentityCache::load_all_from_persisted(blob, now, PEER_IDENTITY_TTL_SECS);
let pairs = cache.all_peer_identity_pairs();
let pair_count = pairs.len();
*state.lock_peer_identity_cache() = cache;
for (peer, identity) in pairs {
let _ = state
.peer_identities
.entry(peer)
.or_default()
.insert(identity);
}
if pair_count > 0 {
info!(
pair_count,
"hydrated peer-identity cache from store for cold-start member selection"
);
}
}
pub(crate) fn spawn_snapshot_tick(
state: NodeState,
store: Store,
network: NetworkClient,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
let mut interval = tokio::time::interval(SNAPSHOT_INTERVAL);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
let _ = interval.tick().await;
let mut ticks: u64 = 0;
loop {
let _ = interval.tick().await;
persist(&state, &store);
ticks = ticks.wrapping_add(1);
let force_full = ticks % SCORE_FULL_RESYNC_TICKS == 0;
reconcile_peer_scores(&state, &network, force_full);
}
})
}
const SCORE_FULL_RESYNC_TICKS: u64 = 20;
fn desired_score_tiers(
cache: &PeerIdentityCache,
now_secs: u64,
ttl_secs: u64,
) -> BTreeMap<PeerId, PeerScoreTier> {
let mut desired: BTreeMap<PeerId, PeerScoreTier> = BTreeMap::new();
for group in cache.groups() {
for member in cache.members_for_group(group, now_secs, ttl_secs) {
let tier = PeerScoreTier::from_role(&member.role);
for peer in member.peers {
let entry = desired.entry(peer).or_insert(tier);
*entry = (*entry).max(tier);
}
}
}
desired
}
fn compute_score_updates(
desired: &BTreeMap<PeerId, PeerScoreTier>,
tracker: &BTreeMap<PeerId, PeerScoreTier>,
force_full: bool,
) -> (Vec<(PeerId, PeerScoreTier)>, Vec<PeerId>) {
let pushes = desired
.iter()
.filter(|(peer, tier)| force_full || tracker.get(peer) != Some(tier))
.map(|(peer, tier)| (*peer, *tier))
.collect();
let clears = tracker
.keys()
.filter(|peer| !desired.contains_key(peer))
.copied()
.collect();
(pushes, clears)
}
pub(crate) fn reconcile_peer_scores(state: &NodeState, network: &NetworkClient, force_full: bool) {
let now = now_unix_secs();
let desired = {
let cache = state.lock_peer_identity_cache();
desired_score_tiers(&cache, now, PEER_IDENTITY_TTL_SECS)
};
let (pushes, clears) = {
let mut tracker = state.lock_peer_scores();
let (pushes, clears) = compute_score_updates(&desired, &tracker, force_full);
for peer in &clears {
let _ = tracker.remove(peer);
}
for (peer, tier) in &pushes {
let _ = tracker.insert(*peer, *tier);
}
(pushes, clears)
};
for (peer, tier) in pushes {
network.set_peer_score(peer, tier.score());
}
for peer in clears {
network.set_peer_score(peer, 0.0);
}
}
fn apply_invalidation_event(state: &NodeState, event: &OpEvent) {
if let OpEvent::MemberRemoved { group_id, member } = event {
state
.lock_peer_identity_cache()
.remove_member(&ContextGroupId::from(*group_id), member);
debug!(
group_id = %hex::encode(group_id),
%member,
"dropped removed member from peer-identity cache"
);
}
}
pub(crate) fn spawn_invalidation_task(state: NodeState) -> tokio::task::JoinHandle<()> {
let mut rx = op_events::subscribe();
tokio::spawn(async move {
loop {
match rx.recv().await {
Ok(event) => apply_invalidation_event(&state, &event),
Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
warn!(
skipped,
"peer-identity invalidation subscriber lagged; missed MemberRemoved \
events age out via TTL and are re-derived on restart"
);
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
}
}
})
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use calimero_context_config::types::ContextGroupId;
use calimero_primitives::context::GroupMemberRole;
use calimero_primitives::identity::PublicKey;
use calimero_store::db::InMemoryDB;
use libp2p::PeerId;
use super::*;
use crate::peer_identity_cache::ObservedMembership;
use crate::run::NodeMode;
fn store() -> Store {
Store::new(Arc::new(InMemoryDB::owned()))
}
#[test]
fn compute_score_updates_diffs_desired_against_tracker() {
let mut cache = PeerIdentityCache::default();
let group = ContextGroupId::from([1u8; 32]);
let admin_peer = PeerId::random();
let member_peer = PeerId::random();
cache.record(
group,
PublicKey::from([1u8; 32]),
admin_peer,
GroupMemberRole::Admin,
100,
);
cache.record(
group,
PublicKey::from([2u8; 32]),
member_peer,
GroupMemberRole::Member,
100,
);
let desired = desired_score_tiers(&cache, 100, 1000);
let empty = BTreeMap::new();
let (pushes, clears) = compute_score_updates(&desired, &empty, false);
let pushed: BTreeMap<_, _> = pushes.into_iter().collect();
assert_eq!(pushed.get(&admin_peer), Some(&PeerScoreTier::Anchor));
assert_eq!(pushed.get(&member_peer), Some(&PeerScoreTier::Member));
assert!(clears.is_empty());
let matched = BTreeMap::from([
(admin_peer, PeerScoreTier::Anchor),
(member_peer, PeerScoreTier::Member),
]);
let (pushes, clears) = compute_score_updates(&desired, &matched, false);
assert!(pushes.is_empty(), "unchanged tiers produce no push");
assert!(clears.is_empty());
let (pushes, _) = compute_score_updates(&desired, &matched, true);
assert_eq!(pushes.len(), 2, "force_full re-pushes all desired peers");
let stranger = PeerId::random();
let mut stale = matched.clone();
let _ = stale.insert(stranger, PeerScoreTier::Member);
let (pushes, clears) = compute_score_updates(&desired, &stale, false);
assert!(pushes.is_empty());
assert_eq!(clears, vec![stranger], "dropped peer is cleared");
}
#[test]
fn persist_then_hydrate_round_trips_through_store() {
let store = store();
let group = ContextGroupId::from([7u8; 32]);
let identity = PublicKey::from([9u8; 32]);
let peer = PeerId::random();
let state = NodeState::new(false, NodeMode::Standard);
state.observe_peer_identity(
peer,
identity,
Some(ObservedMembership {
group_id: group,
role: GroupMemberRole::Admin,
}),
);
persist(&state, &store);
let restored = NodeState::new(false, NodeMode::Standard);
assert!(restored.peer_identities.is_empty(), "starts empty");
hydrate(&restored, &store);
assert!(
restored
.peer_identities
.get(&peer)
.is_some_and(|ids| ids.contains(&identity)),
"reverse view hydrated"
);
let members = restored.lock_peer_identity_cache().members_for_group(
&group,
now_unix_secs(),
PEER_IDENTITY_TTL_SECS,
);
assert_eq!(members.len(), 1);
assert_eq!(members[0].identity, identity);
assert_eq!(members[0].role, GroupMemberRole::Admin);
assert_eq!(members[0].peers, vec![peer]);
}
#[test]
fn member_removed_event_drops_cached_member() {
let state = NodeState::new(false, NodeMode::Standard);
let group = ContextGroupId::from([7u8; 32]);
let member = PublicKey::from([9u8; 32]);
let peer = PeerId::random();
state.observe_peer_identity(
peer,
member,
Some(ObservedMembership {
group_id: group,
role: GroupMemberRole::Admin,
}),
);
let cached = |s: &NodeState| {
!s.lock_peer_identity_cache()
.members_for_group(&group, now_unix_secs(), PEER_IDENTITY_TTL_SECS)
.is_empty()
};
assert!(cached(&state), "seeded");
apply_invalidation_event(
&state,
&OpEvent::MemberRemoved {
group_id: [7u8; 32],
member,
},
);
assert!(!cached(&state), "MemberRemoved dropped the cached member");
assert!(
state
.peer_identities
.get(&peer)
.is_some_and(|ids| ids.contains(&member)),
"reverse view deliberately retained after MemberRemoved"
);
}
#[test]
fn hydrate_with_no_blob_is_a_noop() {
let store = store();
let state = NodeState::new(false, NodeMode::Standard);
hydrate(&state, &store);
assert!(state.peer_identities.is_empty());
assert_eq!(state.lock_peer_identity_cache().groups().count(), 0);
}
#[test]
fn observation_without_membership_does_not_persist() {
let store = store();
let state = NodeState::new(false, NodeMode::Standard);
state.observe_peer_identity(PeerId::random(), PublicKey::from([1u8; 32]), None);
persist(&state, &store);
let restored = NodeState::new(false, NodeMode::Standard);
hydrate(&restored, &store);
assert_eq!(restored.lock_peer_identity_cache().groups().count(), 0);
assert!(restored.peer_identities.is_empty());
}
}