use std::sync::Arc;
use std::time::Duration;
use actix::Actor;
use calimero_blobstore::config::BlobStoreConfig;
use calimero_blobstore::{BlobManager as BlobStore, FileSystem};
use calimero_context::group_store::{
add_group_member, apply_local_signed_group_op, check_group_membership, count_group_members,
get_group_member_value, get_local_gov_nonce, save_group_meta, store_group_signing_key,
store_namespace_identity,
};
use calimero_context::ContextManager;
use calimero_context_client::client::ContextClient;
use calimero_context_client::group::SetMemberAutoFollowRequest;
use calimero_context_client::local_governance::{GroupOp, SignedGroupOp};
use calimero_context_config::types::ContextGroupId;
use calimero_network_primitives::client::NetworkClient;
use calimero_network_primitives::messages::{IdentTopic, Message, MessageId, NetworkEvent};
use calimero_network_primitives::specialized_node_invite::SpecializedNodeType;
use calimero_node_primitives::client::{BlobManager, NodeClient, SyncClient};
use calimero_node_primitives::messages::NodeMessage;
use calimero_node_primitives::sync::BroadcastMessage;
use calimero_node_primitives::NodeMode;
use calimero_primitives::application::ApplicationId;
use calimero_primitives::context::{GroupMemberRole, UpgradePolicy};
use calimero_primitives::identity::{PrivateKey, PublicKey};
use calimero_store::db::InMemoryDB;
use calimero_store::key::GroupMetaValue;
use calimero_store::Store;
use calimero_utils_actix::LazyRecipient;
use prometheus_client::registry::Registry;
use rand::rngs::OsRng;
use sha2::{Digest, Sha256};
use tempfile::TempDir;
use tokio::sync::{broadcast, mpsc};
use tokio::time::sleep;
use crate::arbiter_pool::ArbiterPool;
use crate::sync::{SyncConfig, SyncManager};
use crate::{NodeManager, NodeState};
struct StubNetworkActor;
impl actix::Actor for StubNetworkActor {
type Context = actix::Context<Self>;
}
impl actix::Handler<calimero_network_primitives::messages::NetworkMessage> for StubNetworkActor {
type Result = ();
fn handle(
&mut self,
msg: calimero_network_primitives::messages::NetworkMessage,
_ctx: &mut Self::Context,
) -> Self::Result {
use calimero_network_primitives::messages::NetworkMessage;
match msg {
NetworkMessage::MeshPeerCount { outcome, .. } => {
let _ = outcome.send(0);
}
NetworkMessage::MeshPeers { outcome, .. } => {
let _ = outcome.send(Vec::new());
}
NetworkMessage::MeshStats { outcome, .. } => {
let _ = outcome.send(Vec::new());
}
NetworkMessage::PeerCount { outcome, .. } => {
let _ = outcome.send(0);
}
NetworkMessage::Publish { outcome, .. } => {
let _ = outcome.send(Ok(MessageId(b"stub".to_vec())));
}
_ => {}
}
}
}
fn sample_meta(admin: PublicKey) -> GroupMetaValue {
GroupMetaValue {
app_key: [0xBB; 32],
target_application_id: ApplicationId::from([0xCC; 32]),
upgrade_policy: UpgradePolicy::Automatic,
created_at: 1_700_000_000,
admin_identity: admin,
owner_identity: admin,
migration: None,
auto_join: true,
}
}
struct TestNode {
_pool: ArbiterPool,
_tmp: TempDir,
store: Store,
context_client: ContextClient,
node_addr: actix::Addr<NodeManager>,
}
async fn boot_test_node() -> TestNode {
let mut pool = ArbiterPool::new().await.expect("arbiter pool");
let tmp = tempfile::tempdir().expect("tempdir");
let db = InMemoryDB::owned();
let store = Store::new(Arc::new(db));
let blob_store_config =
BlobStoreConfig::new(tmp.path().to_path_buf().try_into().expect("utf8 blob path"));
let file_system = FileSystem::new(&blob_store_config).await.expect("blob fs");
let blob_store = BlobStore::new(store.clone(), file_system);
let blob_manager = BlobManager::new(blob_store.clone());
let node_recipient = LazyRecipient::<NodeMessage>::new();
let context_recipient = LazyRecipient::new();
let network_recipient = LazyRecipient::new();
let network_client = NetworkClient::new(network_recipient.clone());
let (event_sender, _) = broadcast::channel(16);
let (ctx_sync_tx, ctx_sync_rx) = mpsc::channel(64);
let (ns_sync_tx, ns_sync_rx) = mpsc::channel(16);
let (ns_join_tx, ns_join_rx) = mpsc::channel(16);
let (open_subgroup_join_tx, open_subgroup_join_rx) = mpsc::channel(16);
let sync_client = SyncClient::new(ctx_sync_tx, ns_sync_tx, ns_join_tx, open_subgroup_join_tx);
let node_client = NodeClient::new(
store.clone(),
blob_manager.clone(),
network_client.clone(),
node_recipient.clone(),
event_sender,
sync_client,
String::new(),
None,
);
let context_client = ContextClient::new(
store.clone(),
node_client.clone(),
context_recipient.clone(),
);
let mut registry = Registry::default();
let context_manager = ContextManager::new(
store.clone(),
node_client.clone(),
context_client.clone(),
Some(&mut registry),
);
let node_state = NodeState::new(false, NodeMode::Standard);
let mut sync_manager = SyncManager::new(
SyncConfig::default(),
node_client.clone(),
context_client.clone(),
network_client.clone(),
node_state.clone(),
ctx_sync_rx,
ns_sync_rx,
ns_join_rx,
open_subgroup_join_rx,
);
let state_delta_arbiter = pool.get().await.expect("state-delta arbiter");
let state_delta_tx = crate::state_delta_bridge::start_state_delta_actor(
&state_delta_arbiter,
crate::state_delta_bridge::STATE_DELTA_CHANNEL_CAPACITY,
);
let sync_session_arbiter = pool.get().await.expect("sync-session arbiter");
let (session_result_tx, session_result_rx) = tokio::sync::mpsc::unbounded_channel();
let sync_session_tx = crate::sync_session_bridge::start_sync_session_actor(
&sync_session_arbiter,
crate::sync_session_bridge::SYNC_SESSION_CHANNEL_CAPACITY,
SyncConfig::default().max_concurrent,
sync_manager.clone(),
SyncConfig::default().session_deadline,
Some(session_result_tx),
&mut registry,
);
sync_manager.set_session_handles(sync_session_tx.clone(), session_result_rx);
let node_manager = NodeManager::new(
blob_store,
sync_manager,
context_client.clone(),
node_client,
store.clone(),
node_state,
state_delta_tx,
sync_session_tx,
prometheus_client::metrics::counter::Counter::default(),
);
let arb = pool.get().await.expect("arbiter");
let _context_addr = Actor::start_in_arbiter(&arb, move |ctx| {
assert!(context_recipient.init(ctx), "context recipient");
context_manager
});
let arb2 = pool.get().await.expect("arbiter 2");
let node_addr = Actor::start_in_arbiter(&arb2, move |ctx| {
assert!(node_recipient.init(ctx), "node recipient");
node_manager
});
let arb3 = pool.get().await.expect("arbiter 3");
let _network_addr = Actor::start_in_arbiter(&arb3, move |ctx| {
assert!(network_recipient.init(ctx), "network recipient");
StubNetworkActor
});
sleep(Duration::from_millis(50)).await;
TestNode {
_pool: pool,
_tmp: tmp,
store,
context_client,
node_addr,
}
}
#[tokio::test]
async fn apply_signed_group_op_via_context_client() {
let node = boot_test_node().await;
let mut rng = OsRng;
let gid = ContextGroupId::from([0x77u8; 32]);
let gid_bytes = gid.to_bytes();
let admin_sk = PrivateKey::random(&mut rng);
let admin_pk = admin_sk.public_key();
save_group_meta(&node.store, &gid, &sample_meta(admin_pk)).expect("save_group_meta");
add_group_member(&node.store, &gid, &admin_pk, GroupMemberRole::Admin).expect("add admin");
let new_member = PrivateKey::random(&mut rng).public_key();
let op = SignedGroupOp::sign(
&admin_sk,
gid_bytes,
vec![],
[0u8; 32],
1,
GroupOp::MemberAdded {
member: new_member,
role: GroupMemberRole::Member,
},
)
.expect("sign op");
match node
.context_client
.apply_signed_group_op(op)
.await
.expect("apply")
{
true => {}
false => panic!("expected op applied immediately (no pending parents)"),
}
assert!(
check_group_membership(&node.store, &gid, &new_member).expect("check_group_membership"),
"member should be present after apply_signed_group_op"
);
assert_eq!(
get_local_gov_nonce(&node.store, &gid, &admin_pk)
.expect("get_local_gov_nonce")
.expect("nonce row"),
1
);
}
#[tokio::test]
async fn set_member_auto_follow_handler_error_paths() {
let node = boot_test_node().await;
let mut rng = OsRng;
let gid = ContextGroupId::from([0x55u8; 32]);
let admin_sk = PrivateKey::random(&mut rng);
let alice_sk = PrivateKey::random(&mut rng);
let stranger = PrivateKey::random(&mut rng).public_key();
save_group_meta(&node.store, &gid, &sample_meta(admin_sk.public_key())).unwrap();
add_group_member(
&node.store,
&gid,
&admin_sk.public_key(),
GroupMemberRole::Admin,
)
.unwrap();
add_group_member(
&node.store,
&gid,
&alice_sk.public_key(),
GroupMemberRole::Member,
)
.unwrap();
store_group_signing_key(&node.store, &gid, &admin_sk.public_key(), &admin_sk).unwrap();
let unknown_gid = ContextGroupId::from([0xEE; 32]);
let err = node
.context_client
.set_member_auto_follow(SetMemberAutoFollowRequest {
group_id: unknown_gid,
target: alice_sk.public_key(),
auto_follow_contexts: true,
auto_follow_subgroups: false,
requester: Some(admin_sk.public_key()),
})
.await
.expect_err("unknown group should fail preflight");
assert!(
err.to_string().contains("not found"),
"unexpected error: {err}"
);
let err = node
.context_client
.set_member_auto_follow(SetMemberAutoFollowRequest {
group_id: gid,
target: stranger,
auto_follow_contexts: true,
auto_follow_subgroups: false,
requester: Some(admin_sk.public_key()),
})
.await
.expect_err("stranger is not a member");
assert!(
err.to_string().contains("not a member"),
"unexpected error: {err}"
);
let alice_row = get_group_member_value(&node.store, &gid, &alice_sk.public_key())
.unwrap()
.expect("alice row");
assert!(alice_row.auto_follow.contexts);
assert!(!alice_row.auto_follow.subgroups);
}
const MOCK_QUOTE_HEADER: &[u8] = b"MOCK_TDX_QUOTE_V1";
const MOCK_MEASUREMENT_48_HEX: &str =
"000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000";
fn mock_quote_bytes(nonce: &[u8; 32], pk_hash: &[u8; 32]) -> Vec<u8> {
let report_data = calimero_tee_attestation::build_report_data(nonce, Some(pk_hash));
let mut quote_bytes = Vec::with_capacity(256);
quote_bytes.extend_from_slice(MOCK_QUOTE_HEADER);
quote_bytes.extend_from_slice(&report_data);
quote_bytes.resize(256, 0);
quote_bytes
}
fn announce_network_event(
source: libp2p::PeerId,
topic: &str,
quote_bytes: Vec<u8>,
public_key: PublicKey,
nonce: [u8; 32],
) -> NetworkEvent {
let payload = BroadcastMessage::TeeAttestationAnnounce {
quote_bytes,
public_key,
nonce,
node_type: SpecializedNodeType::ReadOnly,
};
let data = borsh::to_vec(&payload).expect("borsh encode TeeAttestationAnnounce");
NetworkEvent::Message {
id: MessageId(b"test-announce".to_vec()),
message: Message {
source: Some(source),
data,
sequence_number: Some(1),
topic: IdentTopic::new(topic.to_owned()).hash(),
},
}
}
fn provision_tee_owner(node: &TestNode, gid: &ContextGroupId, rng: &mut OsRng) -> PublicKey {
let owner_sk = PrivateKey::random(rng);
let owner_pk = owner_sk.public_key();
save_group_meta(&node.store, gid, &sample_meta(owner_pk)).expect("save_group_meta");
add_group_member(&node.store, gid, &owner_pk, GroupMemberRole::Admin).expect("add owner admin");
store_namespace_identity(&node.store, gid, &owner_pk, &owner_sk, &[0u8; 32])
.expect("store_namespace_identity");
let policy_op = SignedGroupOp::sign(
&owner_sk,
gid.to_bytes(),
vec![],
[0u8; 32],
1,
GroupOp::TeeAdmissionPolicySet {
allowed_mrtd: vec![MOCK_MEASUREMENT_48_HEX.to_owned()],
allowed_rtmr0: vec![],
allowed_rtmr1: vec![],
allowed_rtmr2: vec![],
allowed_rtmr3: vec![],
allowed_tcb_statuses: vec![],
accept_mock: true,
},
)
.expect("sign TeeAdmissionPolicySet");
apply_local_signed_group_op(&node.store, &policy_op).expect("apply policy op");
owner_pk
}
async fn wait_until<F: Fn() -> bool>(cond: F) -> bool {
for _ in 0..100 {
if cond() {
return true;
}
sleep(Duration::from_millis(50)).await;
}
cond()
}
#[tokio::test]
async fn ns_announce_admits_announcer_as_read_only_tee_member() {
let node = boot_test_node().await;
let mut rng = OsRng;
let gid = ContextGroupId::from([0x91u8; 32]);
let owner_pk = provision_tee_owner(&node, &gid, &mut rng);
assert_eq!(count_group_members(&node.store, &gid).expect("count"), 1);
assert!(
check_group_membership(&node.store, &gid, &owner_pk).expect("owner membership"),
"owner must be the sole member before the announce"
);
let announcer_pk = PrivateKey::random(&mut rng).public_key();
let nonce = [0x7Au8; 32];
let pk_hash: [u8; 32] = Sha256::digest(*announcer_pk).into();
let quote_bytes = mock_quote_bytes(&nonce, &pk_hash);
let topic = format!("ns/{}", hex::encode(gid.to_bytes()));
let event = announce_network_event(
libp2p::PeerId::random(),
&topic,
quote_bytes,
announcer_pk,
nonce,
);
node.node_addr
.send(event)
.await
.expect("deliver NetworkEvent to node actor");
let admitted = wait_until(|| {
get_group_member_value(&node.store, &gid, &announcer_pk)
.ok()
.flatten()
.map(|v| v.role == GroupMemberRole::ReadOnlyTee)
.unwrap_or(false)
})
.await;
assert!(
admitted,
"announcer must be admitted as a ReadOnlyTee member after a TeeAttestationAnnounce on the ns/ topic"
);
assert_eq!(
count_group_members(&node.store, &gid).expect("count after admit"),
2,
"member_count must increment to 2 (owner + admitted TEE node)"
);
}
#[tokio::test]
async fn group_topic_announce_is_not_routed_as_namespace_admission() {
let node = boot_test_node().await;
let mut rng = OsRng;
let gid = ContextGroupId::from([0x92u8; 32]);
let _owner_pk = provision_tee_owner(&node, &gid, &mut rng);
let announcer_pk = PrivateKey::random(&mut rng).public_key();
let nonce = [0x7Bu8; 32];
let pk_hash: [u8; 32] = Sha256::digest(*announcer_pk).into();
let quote_bytes = mock_quote_bytes(&nonce, &pk_hash);
let topic = format!("group/{}", hex::encode(gid.to_bytes()));
let event = announce_network_event(
libp2p::PeerId::random(),
&topic,
quote_bytes,
announcer_pk,
nonce,
);
node.node_addr
.send(event)
.await
.expect("deliver NetworkEvent to node actor");
assert!(
get_group_member_value(&node.store, &gid, &announcer_pk)
.ok()
.flatten()
.is_none(),
"a group/ announce was routed into admission synchronously (the #2096 bug shape)"
);
let leaked = wait_until(|| {
get_group_member_value(&node.store, &gid, &announcer_pk)
.ok()
.flatten()
.is_some()
})
.await;
assert!(
!leaked,
"a TeeAttestationAnnounce on a group/ topic must not be routed into namespace admission"
);
assert_eq!(
count_group_members(&node.store, &gid).expect("count"),
1,
"no member should be admitted from a group/ topic announce (owner only)"
);
}