use std::collections::{BTreeMap, HashMap};
use std::sync::Mutex;
use std::time::{Duration, Instant};
use actix::{Actor, AsyncContext, Context, Handler, Message};
use calimero_context::group_store::NamespaceRepository;
use calimero_context_client::local_governance::{NamespaceTopicMsg, SignedMigrationHeartbeat};
use calimero_node_primitives::client::NodeClient;
use calimero_node_primitives::messages::MigrationStatusReport;
use calimero_node_primitives::sync::BroadcastMessage;
use calimero_primitives::identity::PublicKey;
use calimero_store::Store;
use zeroize::Zeroize;
pub const DEFAULT_HEARTBEAT_TTL: Duration = Duration::from_secs(60);
pub const MAX_HEARTBEAT_CLOCK_DRIFT_MS: u64 = 60_000;
#[derive(Debug, Clone)]
pub struct CacheEntry {
pub schema_version: u32,
pub residue_auto: u64,
pub residue_identity: u64,
pub synced_up_to_hlc: u64,
pub authored_remaining: u64,
pub ts_millis: u64,
pub received_at: Instant,
}
#[must_use]
pub fn cache_entry_to_report(entry: &CacheEntry) -> MigrationStatusReport {
MigrationStatusReport {
schema_version: entry.schema_version,
residue_auto: entry.residue_auto,
residue_identity: entry.residue_identity,
synced_up_to_hlc: entry.synced_up_to_hlc,
reported_at: entry.ts_millis,
authored_remaining: entry.authored_remaining,
}
}
#[derive(Debug, Default)]
pub struct MigrationStatusCache {
entries: Mutex<BTreeMap<([u8; 32], PublicKey), CacheEntry>>,
}
impl MigrationStatusCache {
fn entries_lock(
&self,
) -> std::sync::MutexGuard<'_, BTreeMap<([u8; 32], PublicKey), CacheEntry>> {
self.entries
.lock()
.unwrap_or_else(|poisoned| poisoned.into_inner())
}
pub fn insert(&self, hb: &SignedMigrationHeartbeat) {
let now_ms = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0);
if hb.ts_millis > now_ms.saturating_add(MAX_HEARTBEAT_CLOCK_DRIFT_MS) {
return;
}
let now = Instant::now();
let mut g = self.entries_lock();
let key = (hb.namespace_id, hb.peer_pubkey);
if let Some(existing) = g.get(&key) {
if hb.ts_millis < existing.ts_millis
|| (hb.ts_millis == existing.ts_millis
&& hb.synced_up_to_hlc <= existing.synced_up_to_hlc)
{
return;
}
}
let evict_window = Duration::from_millis(MAX_HEARTBEAT_CLOCK_DRIFT_MS.saturating_mul(2));
g.retain(|(ns, _), entry| {
*ns != hb.namespace_id || now.duration_since(entry.received_at) <= evict_window
});
let _ = g.insert(
key,
CacheEntry {
schema_version: hb.schema_version,
residue_auto: hb.residue_auto,
residue_identity: hb.residue_identity,
synced_up_to_hlc: hb.synced_up_to_hlc,
authored_remaining: hb.authored_remaining,
ts_millis: hb.ts_millis,
received_at: now,
},
);
}
pub fn fresh_peers(&self, ns: [u8; 32], ttl: Duration) -> Vec<(PublicKey, CacheEntry)> {
let g = self.entries_lock();
let now = Instant::now();
g.iter()
.filter(|((nns, _), e)| *nns == ns && now.duration_since(e.received_at) <= ttl)
.map(|((_, pk), e)| (*pk, e.clone()))
.collect()
}
pub fn migration_status_reports(
&self,
ns: [u8; 32],
ttl: Duration,
) -> BTreeMap<PublicKey, MigrationStatusReport> {
self.fresh_peers(ns, ttl)
.into_iter()
.map(|(pk, e)| (pk, cache_entry_to_report(&e)))
.collect()
}
pub fn peer_entry(&self, ns: [u8; 32], peer: PublicKey, ttl: Duration) -> Option<CacheEntry> {
let g = self.entries_lock();
let now = Instant::now();
g.get(&(ns, peer))
.filter(|e| now.duration_since(e.received_at) <= ttl)
.cloned()
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct MigrationFacts {
pub schema_version: u32,
pub residue_auto: u64,
pub residue_identity: u64,
pub synced_up_to_hlc: u64,
pub authored_remaining: u64,
}
#[must_use]
pub fn should_emit_on_change(last: Option<MigrationFacts>, current: MigrationFacts) -> bool {
match last {
None => true,
Some(prev) => {
prev.schema_version != current.schema_version
|| prev.residue_auto != current.residue_auto
|| prev.residue_identity != current.residue_identity
|| prev.authored_remaining != current.authored_remaining
}
}
}
pub fn record_facts_update(
last_emitted: &mut HashMap<[u8; 32], MigrationFacts>,
namespace_id: [u8; 32],
facts: MigrationFacts,
) -> bool {
let prior = last_emitted.get(&namespace_id).copied();
let emit = should_emit_on_change(prior, facts);
let _ = last_emitted.insert(namespace_id, facts);
emit
}
fn parse_major_version(version: &str) -> Option<u32> {
version
.split('.')
.next()
.and_then(|major| major.trim().parse::<u32>().ok())
}
fn derive_target_version(
datastore: &Store,
group_id: &calimero_context_config::types::ContextGroupId,
) -> u32 {
match calimero_context::group_store::UpgradesRepository::new(datastore)
.load(group_id)
.ok()
.flatten()
{
None => 0,
Some(record) => parse_major_version(&record.to_version).unwrap_or(u32::MAX),
}
}
fn loaded_context_version(
datastore: &Store,
context_id: &calimero_primitives::context::ContextId,
) -> Option<u32> {
let handle = datastore.handle();
let ctx_meta = handle
.get(&calimero_store::key::ContextMeta::new(*context_id))
.ok()
.flatten()?;
let app_meta = handle.get(&ctx_meta.application).ok().flatten()?;
parse_major_version(&app_meta.version)
}
#[must_use]
pub fn compute_namespace_migration_facts(
datastore: &Store,
namespace_id: [u8; 32],
) -> MigrationFacts {
let group_id = calimero_context_config::types::ContextGroupId::from(namespace_id);
let target_version = derive_target_version(datastore, &group_id);
let contexts = calimero_context::group_store::enumerate_group_contexts(
datastore,
&group_id,
0,
usize::MAX,
)
.unwrap_or_default();
let mut min_loaded: Option<u32> = None;
let mut residue_auto: u64 = 0;
let mut authored_remaining: u64 = 0;
for context_id in &contexts {
if let Ok(Some(entry)) =
datastore
.handle()
.get(&calimero_store::key::ContextAuthoredRemaining::new(
*context_id,
))
{
authored_remaining = authored_remaining.saturating_add(u64::from(entry.count));
}
let Some(loaded) = loaded_context_version(datastore, context_id) else {
continue;
};
min_loaded = Some(min_loaded.map_or(loaded, |m| m.min(loaded)));
if loaded < target_version {
residue_auto += 1;
}
}
let schema_version = match min_loaded {
Some(loaded) => loaded,
None if target_version == u32::MAX => 0,
None => target_version,
};
let residue_identity = residue_identity_count::<CommittedStateScan>(target_version);
MigrationFacts {
schema_version,
residue_auto,
residue_identity,
synced_up_to_hlc: 0,
authored_remaining,
}
}
#[must_use]
fn residue_identity_count<S>(target_version: u32) -> u64
where
S: calimero_storage::store::IterableStorage,
{
calimero_storage::index::Index::<S>::count_unconverted_identity_gated(target_version)
.map_or(0, |count| count as u64)
}
#[derive(Clone, Copy, Debug, Eq, Ord, PartialEq, PartialOrd)]
struct CommittedStateScan;
impl calimero_storage::store::StorageAdaptor for CommittedStateScan {
fn storage_read(_key: calimero_storage::store::Key) -> Option<Vec<u8>> {
None
}
fn storage_remove(_key: calimero_storage::store::Key) -> bool {
false
}
fn storage_write(_key: calimero_storage::store::Key, _value: &[u8]) -> bool {
false
}
}
impl calimero_storage::store::IterableStorage for CommittedStateScan {
fn storage_iter_keys() -> Vec<calimero_storage::store::Key> {
Vec::new()
}
}
pub fn build_signed_heartbeat(
signer_sk: &calimero_primitives::identity::PrivateKey,
namespace_id: [u8; 32],
facts: MigrationFacts,
ts_millis: u64,
) -> Result<SignedMigrationHeartbeat, calimero_context_client::local_governance::GovernanceError> {
let peer_pubkey = signer_sk.public_key();
let mut hb = SignedMigrationHeartbeat {
namespace_id,
peer_pubkey,
schema_version: facts.schema_version,
residue_auto: facts.residue_auto,
residue_identity: facts.residue_identity,
synced_up_to_hlc: facts.synced_up_to_hlc,
authored_remaining: facts.authored_remaining,
ts_millis,
signature: [0u8; 64],
};
let signable = hb.signable_bytes()?;
let signature = signer_sk.sign(&signable)?.to_bytes();
hb.signature = signature;
Ok(hb)
}
pub const DEFAULT_EMIT_INTERVAL: Duration = Duration::from_secs(30);
pub struct MigrationEmitter {
pub node_client: NodeClient,
pub datastore: Store,
pub interval: Duration,
pub last_emitted: HashMap<[u8; 32], MigrationFacts>,
}
impl Actor for MigrationEmitter {
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
ctx.run_interval(self.interval, |this, _ctx| {
let ns_ids: Vec<[u8; 32]> = this.last_emitted.keys().copied().collect();
for ns_id in ns_ids {
let facts = compute_namespace_migration_facts(&this.datastore, ns_id);
let facts = this.refresh_hlc(ns_id, facts);
this.last_emitted.insert(ns_id, facts);
this.publish_heartbeat(ns_id, facts);
}
});
}
}
#[derive(Message)]
#[rtype(result = "()")]
pub struct MigrationFactsUpdate {
pub namespace_id: [u8; 32],
pub facts: MigrationFacts,
}
impl Handler<MigrationFactsUpdate> for MigrationEmitter {
type Result = ();
fn handle(&mut self, msg: MigrationFactsUpdate, _ctx: &mut Self::Context) {
let facts = self.refresh_hlc(msg.namespace_id, msg.facts);
let last = self.last_emitted.get(&msg.namespace_id).copied();
if should_emit_on_change(last, facts) {
self.last_emitted.insert(msg.namespace_id, facts);
self.publish_heartbeat(msg.namespace_id, facts);
} else {
self.last_emitted.insert(msg.namespace_id, facts);
}
}
}
impl MigrationEmitter {
fn refresh_hlc(&self, ns_id: [u8; 32], mut facts: MigrationFacts) -> MigrationFacts {
let handle = self.datastore.handle();
let key = calimero_store::key::NamespaceGovHead::new(ns_id);
if let Ok(Some(head)) = handle.get(&key) {
facts.synced_up_to_hlc = head.sequence;
}
facts
}
fn publish_heartbeat(&self, ns_id: [u8; 32], facts: MigrationFacts) {
let group_id = calimero_context_config::types::ContextGroupId::from(ns_id);
let identity = match NamespaceRepository::new(&self.datastore).identity(&group_id) {
Ok(Some(id)) => id,
Ok(None) => return, Err(err) => {
tracing::debug!(?err, ?ns_id, "MigrationHeartbeat: identity load failed");
return;
}
};
let (_peer_pubkey, mut sk_bytes, mut sender_key) = identity;
sender_key.zeroize();
let ts_millis = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0);
let signing_key = calimero_primitives::identity::PrivateKey::from(sk_bytes);
sk_bytes.zeroize();
let heartbeat = match build_signed_heartbeat(&signing_key, ns_id, facts, ts_millis) {
Ok(hb) => hb,
Err(err) => {
tracing::debug!(?err, "MigrationHeartbeat: sign failed");
return;
}
};
let topic = calimero_context::governance_broadcast::ns_topic(ns_id);
let inner = match borsh::to_vec(&NamespaceTopicMsg::MigrationHeartbeat(heartbeat)) {
Ok(b) => b,
Err(err) => {
tracing::debug!(?err, "MigrationHeartbeat: borsh encode (inner) failed");
return;
}
};
let envelope = BroadcastMessage::NamespaceGovernanceDelta {
namespace_id: ns_id,
delta_id: [0u8; 32],
parent_ids: Vec::new(),
payload: inner,
};
let bytes = match borsh::to_vec(&envelope) {
Ok(b) => b,
Err(err) => {
tracing::debug!(?err, "MigrationHeartbeat: borsh encode (envelope) failed");
return;
}
};
let net = self.node_client.network_client().clone();
let log_ns = ns_id;
let log_schema = facts.schema_version;
let log_residue = facts.residue_identity;
let log_authored = facts.authored_remaining;
actix::spawn(async move {
match net.publish(topic, bytes).await {
Ok(_) => tracing::debug!(
namespace_id = %hex::encode(log_ns),
schema_version = log_schema,
residue_identity = log_residue,
authored_remaining = log_authored,
"migration heartbeat emitted"
),
Err(err) => {
tracing::debug!(?err, "MigrationHeartbeat publish failed (non-fatal)");
}
}
});
}
}
#[cfg(test)]
mod tests {
use calimero_context_client::local_governance::wire::{
SignableMigrationHeartbeat, MIGRATION_HEARTBEAT_SIGN_DOMAIN,
};
use calimero_primitives::identity::PrivateKey;
use super::*;
const NS: [u8; 32] = [42u8; 32];
fn signed_hb(
sk: &PrivateKey,
ns: [u8; 32],
schema_version: u32,
residue_auto: u64,
residue_identity: u64,
ts_millis: u64,
) -> SignedMigrationHeartbeat {
let peer_pubkey = sk.public_key();
let body = SignableMigrationHeartbeat {
namespace_id: ns,
peer_pubkey,
schema_version,
residue_auto,
residue_identity,
synced_up_to_hlc: 0,
ts_millis,
};
let mut signable = Vec::new();
signable.extend_from_slice(MIGRATION_HEARTBEAT_SIGN_DOMAIN);
signable.extend_from_slice(&borsh::to_vec(&body).unwrap());
let signature = sk.sign(&signable).unwrap().to_bytes();
SignedMigrationHeartbeat {
namespace_id: ns,
peer_pubkey,
schema_version,
residue_auto,
residue_identity,
synced_up_to_hlc: 0,
ts_millis,
signature,
authored_remaining: 0,
}
}
#[test]
fn verified_heartbeat_is_cached_and_readable() {
let cache = MigrationStatusCache::default();
let sk = PrivateKey::random(&mut rand::thread_rng());
let hb = signed_hb(&sk, NS, 2, 0, 0, 0);
assert!(hb.verify_signature().is_ok());
cache.insert(&hb);
let fresh = cache.fresh_peers(NS, DEFAULT_HEARTBEAT_TTL);
assert_eq!(fresh.len(), 1, "verified heartbeat must be cached");
let entry = cache
.peer_entry(NS, sk.public_key(), DEFAULT_HEARTBEAT_TTL)
.expect("entry readable by (ns, peer)");
assert_eq!(entry.schema_version, 2);
assert_eq!(entry.residue_identity, 0);
}
#[test]
fn migration_status_reports_projects_fresh_entries() {
let cache = MigrationStatusCache::default();
let sk = PrivateKey::random(&mut rand::thread_rng());
let hb = signed_hb(&sk, NS, 2, 1, 3, 7);
cache.insert(&hb);
let reports = cache.migration_status_reports(NS, DEFAULT_HEARTBEAT_TTL);
assert_eq!(reports.len(), 1, "fresh entry must project into a report");
let report = reports
.get(&sk.public_key())
.copied()
.expect("report keyed by peer pubkey");
assert_eq!(report.schema_version, 2);
assert_eq!(report.residue_auto, 1);
assert_eq!(report.residue_identity, 3);
assert_eq!(report.reported_at, 7, "ts_millis projects to reported_at");
}
#[test]
fn migration_status_reports_excludes_other_namespaces() {
let cache = MigrationStatusCache::default();
let sk = PrivateKey::random(&mut rand::thread_rng());
let other_ns = [7u8; 32];
cache.insert(&signed_hb(&sk, other_ns, 2, 0, 0, 0));
let reports = cache.migration_status_reports(NS, DEFAULT_HEARTBEAT_TTL);
assert!(
reports.is_empty(),
"an other-namespace heartbeat must not appear in this namespace's reports"
);
}
#[test]
fn wire_verify_signature_rejects_field_substitution() {
let sk = PrivateKey::random(&mut rand::thread_rng());
let mut hb = signed_hb(&sk, NS, 2, 0, 5, 0);
assert!(hb.verify_signature().is_ok());
hb.residue_identity = 0; assert!(
hb.verify_signature().is_err(),
"verify_signature must reject a mutated residue_identity"
);
}
#[test]
fn stale_entry_is_filtered_after_ttl() {
let cache = MigrationStatusCache::default();
let sk = PrivateKey::random(&mut rand::thread_rng());
cache.insert(&signed_hb(&sk, NS, 2, 0, 0, 0));
std::thread::sleep(Duration::from_millis(10));
assert!(
cache.fresh_peers(NS, Duration::from_millis(5)).is_empty(),
"entry past TTL must not be fresh"
);
assert!(
cache
.peer_entry(NS, sk.public_key(), Duration::from_millis(5))
.is_none(),
"stale (ns, peer) lookup must report unknown (None)"
);
}
#[test]
fn insert_drops_stale_heartbeat_from_same_peer() {
let cache = MigrationStatusCache::default();
let sk = PrivateKey::random(&mut rand::thread_rng());
let mut fresh = signed_hb(&sk, NS, 2, 0, 0, 2000);
fresh.residue_identity = 0;
let mut stale = signed_hb(&sk, NS, 1, 0, 9, 1000);
stale.residue_identity = 9;
cache.insert(&fresh);
cache.insert(&stale); let entry = cache
.peer_entry(NS, sk.public_key(), DEFAULT_HEARTBEAT_TTL)
.unwrap();
assert_eq!(
entry.schema_version, 2,
"stale heartbeat must not overwrite fresher entry from same peer"
);
assert_eq!(entry.residue_identity, 0);
}
#[test]
fn insert_accepts_newer_heartbeat_from_same_peer() {
let cache = MigrationStatusCache::default();
let sk = PrivateKey::random(&mut rand::thread_rng());
let older = signed_hb(&sk, NS, 1, 0, 9, 1000);
let newer = signed_hb(&sk, NS, 2, 0, 0, 2000);
cache.insert(&older);
cache.insert(&newer);
let entry = cache
.peer_entry(NS, sk.public_key(), DEFAULT_HEARTBEAT_TTL)
.unwrap();
assert_eq!(
entry.schema_version, 2,
"newer heartbeat must replace older"
);
assert_eq!(entry.residue_identity, 0);
}
#[test]
fn insert_rejects_far_future_ts_millis() {
let cache = MigrationStatusCache::default();
let sk = PrivateKey::random(&mut rand::thread_rng());
let now_ms = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
let poison = signed_hb(&sk, NS, 9, 0, 0, now_ms + 600_000);
cache.insert(&poison);
assert!(
cache.fresh_peers(NS, DEFAULT_HEARTBEAT_TTL).is_empty(),
"far-future heartbeat must be rejected to prevent cache poisoning"
);
let legit = signed_hb(&sk, NS, 2, 0, 0, now_ms);
cache.insert(&legit);
let entry = cache
.peer_entry(NS, sk.public_key(), DEFAULT_HEARTBEAT_TTL)
.unwrap();
assert_eq!(entry.schema_version, 2);
}
#[test]
fn on_change_emit_fires_when_residue_changes() {
let base = MigrationFacts {
schema_version: 2,
residue_auto: 0,
residue_identity: 4,
synced_up_to_hlc: 10,
authored_remaining: 0,
};
assert!(should_emit_on_change(None, base));
let drained = MigrationFacts {
residue_identity: 0,
..base
};
assert!(
should_emit_on_change(Some(base), drained),
"residue_identity change must edge-trigger an emit"
);
let auto_changed = MigrationFacts {
residue_auto: 1,
..base
};
assert!(should_emit_on_change(Some(base), auto_changed));
let bumped = MigrationFacts {
schema_version: 3,
..base
};
assert!(should_emit_on_change(Some(base), bumped));
}
#[test]
fn on_change_emit_suppressed_when_only_hlc_advances() {
let prev = MigrationFacts {
schema_version: 2,
residue_auto: 0,
residue_identity: 0,
synced_up_to_hlc: 10,
authored_remaining: 0,
};
let advanced = MigrationFacts {
synced_up_to_hlc: 99,
..prev
};
assert!(
!should_emit_on_change(Some(prev), advanced),
"an HLC-only advance must not edge-trigger an emit"
);
}
#[test]
fn record_facts_update_seeds_namespace_and_reports_edge() {
let mut last_emitted: HashMap<[u8; 32], MigrationFacts> = HashMap::new();
let facts = MigrationFacts {
schema_version: 2,
residue_auto: 0,
residue_identity: 3,
synced_up_to_hlc: 10,
authored_remaining: 0,
};
let emit = record_facts_update(&mut last_emitted, NS, facts);
assert!(emit, "first-ever facts for a namespace must edge-trigger");
assert_eq!(
last_emitted.get(&NS).copied(),
Some(facts),
"first update must seed last_emitted so the periodic tick has work \
(the dead-empty-map regression)"
);
let advanced = MigrationFacts {
synced_up_to_hlc: 99,
..facts
};
let emit = record_facts_update(&mut last_emitted, NS, advanced);
assert!(!emit, "HLC-only advance must not edge-trigger");
assert_eq!(
last_emitted.get(&NS).copied(),
Some(advanced),
"carry-forward value must still update on a non-edge"
);
let drained = MigrationFacts {
residue_identity: 0,
..advanced
};
let emit = record_facts_update(&mut last_emitted, NS, drained);
assert!(emit, "residue drop must edge-trigger");
}
#[test]
fn facts_for_namespace_reads_target_from_upgrade_record() {
use calimero_context::group_store::UpgradesRepository;
use calimero_context_config::types::ContextGroupId;
use calimero_store::db::InMemoryDB;
use calimero_store::key::{GroupUpgradeStatus, GroupUpgradeValue};
use std::sync::Arc;
let store = Store::new(Arc::new(InMemoryDB::owned()));
let ns = [0x55u8; 32];
let facts = compute_namespace_migration_facts(&store, ns);
assert_eq!(
facts.schema_version, 0,
"no upgrade record => baseline schema version 0"
);
assert_eq!(facts.residue_auto, 0);
assert_eq!(facts.residue_identity, 0);
UpgradesRepository::new(&store)
.save(
&ContextGroupId::from(ns),
&GroupUpgradeValue {
from_version: "1".to_owned(),
to_version: "2".to_owned(),
migration: None,
initiated_at: 0,
initiated_by: PrivateKey::random(&mut rand::thread_rng()).public_key(),
status: GroupUpgradeStatus::Completed { completed_at: None },
cascade_hlc: None,
cascade_seq: None,
},
)
.unwrap();
let facts = compute_namespace_migration_facts(&store, ns);
assert_eq!(
facts.schema_version, 2,
"an upgrade record targeting v2 must advertise schema_version 2"
);
}
fn install_loaded_context(store: &Store, ns: [u8; 32], ctx: [u8; 32], version: &str) {
use calimero_primitives::application::ApplicationId;
use calimero_store::key::{
ApplicationMeta as ApplicationMetaKey, ContextMeta as ContextMetaKey,
};
use calimero_store::types::{ApplicationMeta, ContextMeta};
let app_id = ApplicationId::from(ctx); let blob = calimero_store::key::BlobMeta::new(calimero_primitives::blobs::BlobId::from(
[0x9Au8; 32],
));
let app_meta = ApplicationMeta::new(
blob,
1,
"test://loaded".to_owned().into_boxed_str(),
Box::new([]),
blob,
"loaded-test-pkg".to_owned().into_boxed_str(),
version.to_owned().into_boxed_str(),
"loaded-test-signer".to_owned().into_boxed_str(),
);
let mut handle = store.handle();
handle
.put(&ApplicationMetaKey::new(app_id), &app_meta)
.expect("put ApplicationMeta");
handle
.put(
&ContextMetaKey::new(ctx.into()),
&ContextMeta::new(
ApplicationMetaKey::new(app_id),
[0x01; 32],
Vec::new(),
None,
),
)
.expect("put ContextMeta");
calimero_context::group_store::register_context_in_group(
store,
&calimero_context_config::types::ContextGroupId::from(ns),
&ctx.into(),
)
.expect("register context in group");
}
#[test]
fn facts_report_loaded_version_not_target_when_binary_behind() {
use calimero_context::group_store::UpgradesRepository;
use calimero_context_config::types::ContextGroupId;
use calimero_store::db::InMemoryDB;
use calimero_store::key::{GroupUpgradeStatus, GroupUpgradeValue};
use std::sync::Arc;
let store = Store::new(Arc::new(InMemoryDB::owned()));
let ns = [0x77u8; 32];
UpgradesRepository::new(&store)
.save(
&ContextGroupId::from(ns),
&GroupUpgradeValue {
from_version: "1".to_owned(),
to_version: "2".to_owned(),
migration: None,
initiated_at: 0,
initiated_by: PrivateKey::random(&mut rand::thread_rng()).public_key(),
status: GroupUpgradeStatus::InProgress {
total: 1,
completed: 0,
failed: 0,
},
cascade_hlc: None,
cascade_seq: None,
},
)
.unwrap();
install_loaded_context(&store, ns, [0xC1u8; 32], "1.0.0");
let facts = compute_namespace_migration_facts(&store, ns);
assert_eq!(
facts.schema_version, 1,
"a node whose loaded binary is behind must report the LOADED (v1) \
version, not the migration target (v2)"
);
assert_eq!(
facts.residue_auto, 1,
"a context whose loaded version trails the target is unconverted \
(residue_auto), keeping all_migrated false"
);
}
#[test]
fn facts_report_target_and_zero_residue_once_binary_swapped() {
use calimero_context::group_store::UpgradesRepository;
use calimero_context_config::types::ContextGroupId;
use calimero_store::db::InMemoryDB;
use calimero_store::key::{GroupUpgradeStatus, GroupUpgradeValue};
use std::sync::Arc;
let store = Store::new(Arc::new(InMemoryDB::owned()));
let ns = [0x78u8; 32];
UpgradesRepository::new(&store)
.save(
&ContextGroupId::from(ns),
&GroupUpgradeValue {
from_version: "1".to_owned(),
to_version: "2".to_owned(),
migration: None,
initiated_at: 0,
initiated_by: PrivateKey::random(&mut rand::thread_rng()).public_key(),
status: GroupUpgradeStatus::Completed { completed_at: None },
cascade_hlc: None,
cascade_seq: None,
},
)
.unwrap();
install_loaded_context(&store, ns, [0xC2u8; 32], "2.0.0");
let facts = compute_namespace_migration_facts(&store, ns);
assert_eq!(facts.schema_version, 2, "loaded binary at v2 reports v2");
assert_eq!(
facts.residue_auto, 0,
"a context at the target version contributes no residue_auto"
);
}
#[test]
fn built_heartbeat_verifies_and_carries_facts() {
let sk = PrivateKey::random(&mut rand::thread_rng());
let facts = MigrationFacts {
schema_version: 2,
residue_auto: 3,
residue_identity: 1,
synced_up_to_hlc: 77,
authored_remaining: 0,
};
let hb = build_signed_heartbeat(&sk, NS, facts, 1234).expect("sign");
assert!(hb.verify_signature().is_ok());
assert_eq!(hb.peer_pubkey, sk.public_key());
assert_eq!(hb.schema_version, 2);
assert_eq!(hb.residue_auto, 3);
assert_eq!(hb.residue_identity, 1);
assert_eq!(hb.synced_up_to_hlc, 77);
let cache = MigrationStatusCache::default();
cache.insert(&hb);
let entry = cache
.peer_entry(NS, sk.public_key(), DEFAULT_HEARTBEAT_TTL)
.expect("built heartbeat is cacheable");
assert_eq!(entry.residue_identity, 1);
}
#[test]
fn residue_identity_count_invokes_the_scan() {
use calimero_storage::address::Id;
use calimero_storage::entities::{ChildInfo, Metadata, StorageType};
use calimero_storage::index::Index;
use calimero_storage::store::MockedStorage;
type S = MockedStorage<7200>;
let owner = PublicKey::from([0xAAu8; 32]);
let seed_user = |id: Id, schema: Option<u32>| {
let mut md = Metadata::new(1, 1);
md.storage_type = StorageType::User {
owner,
signature_data: None,
};
md.schema_version = schema;
<Index<S>>::add_root(ChildInfo::new(id, [0u8; 32], md)).expect("seed user entry");
};
seed_user(Id::new([1; 32]), None);
seed_user(Id::new([2; 32]), Some(1));
seed_user(Id::new([3; 32]), Some(2));
let mut public_md = Metadata::new(1, 1);
public_md.storage_type = StorageType::Public;
<Index<S>>::add_root(ChildInfo::new(Id::new([4; 32]), [0u8; 32], public_md))
.expect("seed public entry");
assert_eq!(
residue_identity_count::<S>(2),
2,
"residue_identity must INVOKE the scan and count only stale \
identity-gated entries"
);
}
}