use calimero_context::group_store::{
MembershipRepository, MetaRepository, MetadataRepository, NamespaceRepository,
SigningKeysRepository, UpgradesRepository,
};
use std::time::Duration;
use calimero_context::group_store::register_context_in_group;
use calimero_context_client::group::UpgradeGroupRequest;
use calimero_context_client::messages::MigrationParams;
use calimero_context_config::types::ContextGroupId;
use calimero_primitives::application::ApplicationId;
use calimero_primitives::context::{ContextId, GroupMemberRole, UpgradePolicy};
use calimero_primitives::identity::{PrivateKey, PublicKey};
use calimero_store::key::{
self, ApplicationMeta as ApplicationMetaKey, ContextMeta as ContextMetaKey, GroupMetaValue,
GroupUpgradeStatus, GroupUpgradeValue,
};
use calimero_store::types::{ApplicationMeta, ContextMeta};
use calimero_store::Store;
use rand::rngs::OsRng;
use tokio::time::sleep;
use crate::local_governance_node_e2e::boot_test_node;
const APP_KEY_V1: [u8; 32] = [0x11; 32];
const APP_KEY_V2: [u8; 32] = [0x22; 32];
const APP_KEY_OTHER: [u8; 32] = [0x33; 32];
fn app_id_v1() -> ApplicationId {
ApplicationId::from([0xAA; 32])
}
fn app_id_v2() -> ApplicationId {
ApplicationId::from([0xBB; 32])
}
fn app_id_other() -> ApplicationId {
ApplicationId::from([0xCC; 32])
}
fn meta_for(admin: PublicKey, app_key: [u8; 32], target: ApplicationId) -> GroupMetaValue {
GroupMetaValue {
app_key,
target_application_id: target,
upgrade_policy: UpgradePolicy::Automatic,
created_at: 1_700_000_000,
admin_identity: admin,
owner_identity: admin,
migration: None,
auto_join: true,
}
}
fn provision_group(
store: &Store,
gid: &ContextGroupId,
admin: PublicKey,
app_key: [u8; 32],
target: ApplicationId,
) {
MetaRepository::new(store)
.save(gid, &meta_for(admin, app_key, target))
.expect("save_group_meta");
MembershipRepository::new(store)
.add_member(gid, &admin, GroupMemberRole::Admin)
.expect("add admin");
}
fn install_application(store: &Store, app_id: ApplicationId, app_key: [u8; 32], version: &str) {
let bytecode_blob = key::BlobMeta::new(calimero_primitives::blobs::BlobId::from(app_key));
let meta = ApplicationMeta::new(
bytecode_blob,
1,
"test://cascade".to_owned().into_boxed_str(),
Box::new([]),
bytecode_blob,
"cascade-test-pkg".to_owned().into_boxed_str(),
version.to_owned().into_boxed_str(),
"cascade-test-signer".to_owned().into_boxed_str(),
);
let mut handle = store.handle();
handle
.put(&ApplicationMetaKey::new(app_id), &meta)
.expect("put ApplicationMeta");
}
fn register_context_for(
store: &Store,
group_id: &ContextGroupId,
context_id: ContextId,
app_id: ApplicationId,
) {
let meta = ContextMeta::new(
ApplicationMetaKey::new(app_id),
[0x01; 32],
Vec::new(),
None,
);
let mut handle = store.handle();
handle
.put(&ContextMetaKey::new(context_id), &meta)
.expect("put ContextMeta");
register_context_in_group(store, group_id, &context_id).expect("register_context_in_group");
}
struct CascadeFixture {
admin_pk: PublicKey,
ns: ContextGroupId,
g1: ContextGroupId,
g2: ContextGroupId,
ctx_ns: ContextId,
ctx_g1: ContextId,
ctx_g2: ContextId,
}
fn provision_namespace(
store: &Store,
admin_sk: &PrivateKey,
g2_app_key: [u8; 32],
) -> CascadeFixture {
let admin_pk = admin_sk.public_key();
let ns = ContextGroupId::from([0x70; 32]);
let g1 = ContextGroupId::from([0xA1; 32]);
let g2 = ContextGroupId::from([0xA2; 32]);
provision_group(store, &ns, admin_pk, APP_KEY_V1, app_id_v1());
provision_group(store, &g1, admin_pk, APP_KEY_V1, app_id_v1());
let g2_target = if g2_app_key == APP_KEY_V1 {
app_id_v1()
} else {
app_id_other()
};
provision_group(store, &g2, admin_pk, g2_app_key, g2_target);
NamespaceRepository::new(store)
.nest(&ns, &g1)
.expect("nest g1");
NamespaceRepository::new(store)
.nest(&ns, &g2)
.expect("nest g2");
install_application(store, app_id_v1(), APP_KEY_V1, "0.1.0");
install_application(store, app_id_v2(), APP_KEY_V2, "0.2.0");
if g2_app_key != APP_KEY_V1 {
install_application(store, app_id_other(), APP_KEY_OTHER, "0.1.0-other");
}
let ctx_ns = ContextId::from([0xC0; 32]);
let ctx_g1 = ContextId::from([0xC1; 32]);
let ctx_g2 = ContextId::from([0xC2; 32]);
register_context_for(store, &ns, ctx_ns, app_id_v1());
register_context_for(store, &g1, ctx_g1, app_id_v1());
register_context_for(
store,
&g2,
ctx_g2,
if g2_app_key == APP_KEY_V1 {
app_id_v1()
} else {
app_id_other()
},
);
SigningKeysRepository::new(store)
.store_key(&ns, &admin_pk, admin_sk)
.expect("store signing key");
CascadeFixture {
admin_pk,
ns,
g1,
g2,
ctx_ns,
ctx_g1,
ctx_g2,
}
}
async fn wait_until<F: Fn() -> bool>(cond: F) -> bool {
for _ in 0..200 {
if cond() {
return true;
}
sleep(Duration::from_millis(25)).await;
}
cond()
}
#[tokio::test]
async fn cascade_dispatch_e2e_single_node_emitter() {
let node = boot_test_node().await;
let mut rng = OsRng;
let admin_sk = PrivateKey::random(&mut rng);
let fx = provision_namespace(&node.store, &admin_sk, APP_KEY_V1);
let response = node
.context_client
.upgrade_group(UpgradeGroupRequest {
group_id: fx.ns,
target_application_id: app_id_v2(),
requester: Some(fx.admin_pk),
migration: Some(MigrationParams {
method: "migrate_v1_to_v2".to_owned(),
}),
cascade: true,
})
.await
.expect("cascade upgrade should succeed");
assert_eq!(response.group_id, fx.ns, "response must echo signed group");
for gid in [&fx.ns, &fx.g1, &fx.g2] {
let meta = MetaRepository::new(&node.store)
.load(gid)
.expect("load_group_meta")
.expect("meta exists");
assert_eq!(
meta.app_key,
APP_KEY_V2,
"group {} must have rotated app_key",
hex::encode(gid.to_bytes())
);
assert_eq!(
meta.target_application_id,
app_id_v2(),
"group {} must point at app_v2",
hex::encode(gid.to_bytes())
);
}
for gid in [&fx.ns, &fx.g1, &fx.g2] {
let observed = wait_until(|| {
UpgradesRepository::new(&node.store)
.load(gid)
.ok()
.flatten()
.is_some()
})
.await;
assert!(
observed,
"per-descendant GroupUpgradeValue must exist for {}",
hex::encode(gid.to_bytes())
);
let upgrade = UpgradesRepository::new(&node.store)
.load(gid)
.expect("load_group_upgrade")
.expect("upgrade row");
match upgrade.status {
GroupUpgradeStatus::InProgress {
total,
completed: _,
failed: _,
} => {
let expected_total = MetadataRepository::new(&node.store)
.count_contexts(gid)
.expect("count_group_contexts") as u32;
assert_eq!(
total,
expected_total,
"InProgress.total for {} must match enumerated context count",
hex::encode(gid.to_bytes())
);
}
GroupUpgradeStatus::Completed { .. } => {
}
}
assert_eq!(upgrade.initiated_by, fx.admin_pk, "initiated_by mismatch");
}
}
#[tokio::test]
async fn cascade_dispatch_e2e_write_gate_blocks_user_calls() {
let node = boot_test_node().await;
let mut rng = OsRng;
let admin_sk = PrivateKey::random(&mut rng);
let fx = provision_namespace(&node.store, &admin_sk, APP_KEY_V1);
UpgradesRepository::new(&node.store)
.save(
&fx.g1,
&GroupUpgradeValue {
from_version: "0.1.0".to_owned(),
to_version: "0.2.0".to_owned(),
migration: None,
initiated_at: 1_700_000_000,
initiated_by: fx.admin_pk,
status: GroupUpgradeStatus::InProgress {
total: 1,
completed: 0,
failed: 0,
},
},
)
.expect("save_group_upgrade InProgress for G1");
let err = node
.context_client
.execute(
&fx.ctx_g1,
&fx.admin_pk,
"set_description".to_owned(),
Vec::new(),
Vec::new(),
None,
)
.await
.expect_err("execute must be refused while G1 is InProgress");
use calimero_context_client::messages::ExecuteError;
match err {
ExecuteError::UpgradeInProgress { group_id } => {
assert_eq!(
group_id, fx.g1,
"gate must surface the owning group of the targeted context"
);
}
other => panic!(
"expected ExecuteError::UpgradeInProgress, got {other:?} — \
write-gate is not firing on a pre-set InProgress status"
),
}
}
#[tokio::test]
async fn cascade_dispatch_e2e_predicate_skip_on_heterogeneous() {
let node = boot_test_node().await;
let mut rng = OsRng;
let admin_sk = PrivateKey::random(&mut rng);
let fx = provision_namespace(&node.store, &admin_sk, APP_KEY_OTHER);
node.context_client
.upgrade_group(UpgradeGroupRequest {
group_id: fx.ns,
target_application_id: app_id_v2(),
requester: Some(fx.admin_pk),
migration: Some(MigrationParams {
method: "migrate_v1_to_v2".to_owned(),
}),
cascade: true,
})
.await
.expect("cascade upgrade should succeed");
for gid in [&fx.ns, &fx.g1] {
let meta = MetaRepository::new(&node.store)
.load(gid)
.expect("load_group_meta")
.expect("meta exists");
assert_eq!(
meta.app_key,
APP_KEY_V2,
"{} must migrate",
hex::encode(gid.to_bytes())
);
assert_eq!(meta.target_application_id, app_id_v2());
}
let meta_g2 = MetaRepository::new(&node.store)
.load(&fx.g2)
.expect("load_group_meta g2")
.expect("g2 meta exists");
assert_eq!(
meta_g2.app_key, APP_KEY_OTHER,
"G2 must NOT be touched — predicate skip on heterogeneous app_key"
);
assert_eq!(meta_g2.target_application_id, app_id_other());
assert!(
UpgradesRepository::new(&node.store).load(&fx.g2)
.expect("load_group_upgrade g2")
.is_none(),
"G2 must NOT have a GroupUpgradeValue row — predicate skip means no propagator and no status write"
);
for gid in [&fx.ns, &fx.g1] {
assert!(
UpgradesRepository::new(&node.store)
.load(gid)
.expect("load_group_upgrade")
.is_some(),
"{} must have a GroupUpgradeValue row",
hex::encode(gid.to_bytes())
);
}
let _ = (fx.ctx_ns, fx.ctx_g1, fx.ctx_g2);
sleep(Duration::from_millis(25)).await;
}