use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use calimero_context::governance_broadcast::ns_topic;
use calimero_context::group_store::{self, namespace_member_pubkeys, NamespaceGovernance};
use calimero_context_client::local_governance::{
AckRouter, NamespaceOp, NamespaceTopicMsg, ReadinessProbe, RootOp,
};
use calimero_context_config::types::{ContextGroupId, SignedGroupOpenInvitation};
use calimero_node_primitives::client::NodeClient;
use calimero_node_primitives::sync::BroadcastMessage;
use calimero_primitives::application::ApplicationId;
use calimero_primitives::context::UpgradePolicy;
use calimero_primitives::identity::{PrivateKey, PublicKey};
use calimero_store::key::GroupMetaValue;
use calimero_store::Store;
use rand::Rng;
use thiserror::Error;
use tracing::debug;
use zeroize::Zeroize;
use crate::readiness::{ReadinessCache, ReadinessCacheNotify, ReadinessConfig};
/// Result of a successful [`join_namespace`] call: the namespace being joined and the
/// peer selected as sync partner from the readiness cache.
#[derive(Debug, Clone)]
pub struct JoinStarted {
    /// Namespace id derived from this node's namespace identity for the invited group.
    pub namespace_id: [u8; 32],
    /// Peer returned by the readiness cache's `await_first_fresh_beacon`.
    pub sync_partner: PublicKey,
    /// `head` value from the partner's readiness-cache entry.
    pub partner_head: [u8; 32],
    /// `applied_through` value from the partner's readiness-cache entry.
    pub partner_applied: u64,
    /// Milliseconds from the start of `join_namespace` until it produced this value.
    pub elapsed_ms: u64,
}
/// Report returned by [`await_namespace_ready`] (and therefore by
/// [`join_and_wait_ready`]) after the `MemberJoined` op has been published.
#[derive(Debug, Clone)]
pub struct ReadyReport {
    /// Namespace that was joined.
    pub namespace_id: [u8; 32],
    /// NOTE(review): `await_namespace_ready` currently fills this with `[0u8; 32]`;
    /// it is a placeholder, not the real governance head.
    pub final_head: [u8; 32],
    /// NOTE(review): `await_namespace_ready` currently fills this with `0` (placeholder).
    pub applied_through: u64,
    /// Size of the local namespace member list after publishing; `0` if the lookup
    /// did not yield a value.
    pub members_learned: usize,
    /// Peers listed in the publish report's `acked_by`.
    pub acked_by: Vec<PublicKey>,
    /// Milliseconds spent inside `await_namespace_ready`. When returned from
    /// `join_and_wait_ready`, this excludes the time spent in the join phase.
    pub elapsed_ms: u64,
}
/// Errors returned by [`join_namespace`] and [`join_namespace_with_retry`].
#[derive(Debug, Error)]
pub enum JoinError {
    /// No fresh readiness beacon was observed before the deadline (or, for the retry
    /// wrapper, before the total retry budget ran out).
    #[error("no ready peers responded within {waited_ms}ms")]
    NoReadyPeers { waited_ms: u64 },
    /// The invitation was rejected, e.g. because its expiration timestamp has passed.
    #[error("invitation invalid: {0}")]
    InvalidInvitation(String),
    /// Subscribing to the namespace failed, or a probe message could not be serialized.
    #[error("transport: {0}")]
    Transport(String),
    /// Reading or writing local state (namespace identity, group metadata) failed.
    #[error("local: {0}")]
    Local(String),
}
/// Errors returned by [`await_namespace_ready`] and [`join_and_wait_ready`].
#[derive(Debug, Error)]
pub enum ReadyError {
    /// NOTE(review): not constructed by any function in this section;
    /// `join_and_wait_ready` reports `JoinError::NoReadyPeers` as `JoinFailed`.
    /// Confirm whether other code relies on this variant.
    #[error("no ready peers in cache")]
    NoReadyPeers,
    /// NOTE(review): not constructed in this section. A failed namespace sync
    /// request in `await_namespace_ready` is only logged, not returned.
    #[error("backfill: {0}")]
    Backfill(String),
    /// Signing or publishing the `MemberJoined` governance op failed.
    #[error("publish MemberJoined: {0}")]
    PublishMemberJoined(String),
    /// Loading or creating the local namespace identity failed.
    #[error("local: {0}")]
    Local(String),
    /// The join phase failed for any reason other than an invalid invitation;
    /// carries the `JoinError`'s display text.
    #[error("join failed: {0}")]
    JoinFailed(String),
    /// The join phase rejected the invitation.
    #[error("invitation invalid: {0}")]
    InvalidInvitation(String),
}
/// Starts joining the namespace described by `invitation` and picks a sync partner.
///
/// In order, this:
/// 1. Rejects the invitation when its `expiration_timestamp` is non-zero and lies
///    before the current wall-clock time (seconds since the Unix epoch).
/// 2. Loads or creates this node's identity for the invited group and derives the
///    namespace id from it. The returned secret material is zeroized right away
///    because it is not needed here.
/// 3. If no group metadata is stored yet, writes placeholder metadata with the
///    inviter as admin (zeroed application id and app key, `created_at: 0`,
///    `auto_join: true`).
/// 4. Subscribes to the namespace and broadcasts a `ReadinessProbe` with a random
///    nonce on the namespace topic. A failed broadcast is only logged.
/// 5. Waits, for whatever remains of `deadline`, for the first fresh readiness
///    beacon (freshness window: `config.ttl_heartbeat`).
///
/// # Errors
/// - [`JoinError::InvalidInvitation`] if the invitation has expired.
/// - [`JoinError::Local`] if identity or group metadata access fails.
/// - [`JoinError::Transport`] if subscribing fails or a message cannot be serialized.
/// - [`JoinError::NoReadyPeers`] if no fresh beacon shows up in time.
pub async fn join_namespace(
    store: &Store,
    node_client: &NodeClient,
    readiness_cache: &Arc<ReadinessCache>,
    readiness_notify: &Arc<ReadinessCacheNotify>,
    config: &ReadinessConfig,
    invitation: SignedGroupOpenInvitation,
    deadline: Duration,
) -> Result<JoinStarted, JoinError> {
    let began = Instant::now();
    let group_id = invitation.invitation.group_id;

    // A zero expiration skips the check entirely.
    let expires_at = invitation.invitation.expiration_timestamp;
    if expires_at != 0 {
        let now_secs = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|since_epoch| since_epoch.as_secs())
            .unwrap_or(0);
        if now_secs > expires_at {
            return Err(JoinError::InvalidInvitation("invitation expired".into()));
        }
    }

    let identity = group_store::get_or_create_namespace_identity(store, &group_id)
        .map_err(|e| JoinError::Local(e.to_string()))?;
    let (ns_id, _public_key, mut secret_bytes, mut sender_key) = identity;
    let namespace_id = ns_id.to_bytes();
    // Only the namespace id is needed here; scrub the key material immediately.
    secret_bytes.zeroize();
    sender_key.zeroize();

    let needs_meta = group_store::load_group_meta(store, &group_id)
        .map_err(|e| JoinError::Local(e.to_string()))?
        .is_none();
    if needs_meta {
        let placeholder = GroupMetaValue {
            admin_identity: PublicKey::from(invitation.invitation.inviter_identity.to_bytes()),
            target_application_id: ApplicationId::from([0u8; 32]),
            app_key: [0u8; 32],
            upgrade_policy: UpgradePolicy::default(),
            migration: None,
            created_at: 0,
            auto_join: true,
        };
        group_store::save_group_meta(store, &group_id, &placeholder)
            .map_err(|e| JoinError::Local(e.to_string()))?;
    }

    node_client
        .subscribe_namespace(namespace_id)
        .await
        .map_err(|e| JoinError::Transport(e.to_string()))?;

    let probe_msg = NamespaceTopicMsg::ReadinessProbe(ReadinessProbe {
        namespace_id,
        nonce: rand::random(),
    });
    let payload =
        borsh::to_vec(&probe_msg).map_err(|e| JoinError::Transport(e.to_string()))?;
    let envelope = BroadcastMessage::NamespaceGovernanceDelta {
        namespace_id,
        delta_id: [0u8; 32],
        parent_ids: Vec::new(),
        payload,
    };
    let wire = borsh::to_vec(&envelope).map_err(|e| JoinError::Transport(e.to_string()))?;

    // Best effort: the broadcast failing does not abort the join.
    let publish_outcome = node_client
        .network_client()
        .publish(ns_topic(namespace_id), wire)
        .await;
    if let Err(err) = publish_outcome {
        debug!(
            ?err,
            "ReadinessProbe publish failed (non-fatal — solo or no-peers)"
        );
    }

    let budget_left = deadline.saturating_sub(began.elapsed());
    let beacon = readiness_cache
        .await_first_fresh_beacon(
            readiness_notify,
            namespace_id,
            config.ttl_heartbeat,
            budget_left,
        )
        .await;
    let (sync_partner, entry) = match beacon {
        Some(found) => found,
        None => {
            return Err(JoinError::NoReadyPeers {
                waited_ms: began.elapsed().as_millis() as u64,
            })
        }
    };

    Ok(JoinStarted {
        namespace_id,
        sync_partner,
        partner_head: entry.head,
        partner_applied: entry.applied_through,
        elapsed_ms: began.elapsed().as_millis() as u64,
    })
}
/// Finishes a namespace join by publishing a signed `MemberJoined` root op.
///
/// Steps:
/// 1. Requests a namespace sync. A failure is logged and otherwise ignored.
/// 2. Sleeps for a quarter of `deadline`, capped at 2 s, to let the backfill progress.
/// 3. Polls every 200 ms, for at most 3 s and never past `deadline`, until the
///    namespace mesh has at least `min(mesh_n_low, known_subscribers)` peers.
///    It publishes anyway if that target is not reached in time.
/// 4. Signs and publishes `RootOp::MemberJoined` (without applying it locally) using
///    this node's namespace identity and the given `invitation`.
///
/// The returned report's `final_head` and `applied_through` are zero placeholders.
/// `elapsed_ms` covers only the time spent in this function.
///
/// # Errors
/// - [`ReadyError::Local`] if the namespace identity cannot be loaded or created.
/// - [`ReadyError::PublishMemberJoined`] if signing or publishing the op fails.
pub async fn await_namespace_ready(
    store: &Store,
    node_client: &NodeClient,
    ack_router: &AckRouter,
    invitation: SignedGroupOpenInvitation,
    namespace_id: [u8; 32],
    deadline: Duration,
) -> Result<ReadyReport, ReadyError> {
    let start = Instant::now();
    if let Err(err) = node_client.sync_namespace(namespace_id).await {
        debug!(
            ?err,
            "namespace sync request failed (non-fatal — try publish anyway)"
        );
    }

    // Bounded head start for the backfill: a quarter of the budget, at most 2 s.
    let backfill_wait = std::cmp::min(deadline / 4, Duration::from_secs(2));
    tokio::time::sleep(backfill_wait).await;

    let mesh_poll_window = std::cmp::min(
        deadline.saturating_sub(start.elapsed()),
        Duration::from_secs(3),
    );
    let mesh_deadline = Instant::now() + mesh_poll_window;
    let mesh_n_low = node_client.gossipsub_mesh_n_low();
    {
        // The topic never changes while polling, so compute it once. The enclosing
        // block drops it again before the publish below.
        let topic = ns_topic(namespace_id);
        loop {
            let mesh = node_client
                .mesh_peer_count_for_namespace(namespace_id)
                .await;
            let known = node_client.known_subscribers(&topic);
            // Never demand more mesh peers than there are known subscribers. When
            // `known` is 0, `required` is 0 and the loop exits immediately.
            let required = std::cmp::min(mesh_n_low, known);
            if mesh >= required {
                break;
            }
            if Instant::now() >= mesh_deadline {
                debug!(
                    ?mesh,
                    ?required,
                    "mesh poll window elapsed before reaching required peers (non-fatal — publishing anyway)"
                );
                break;
            }
            tokio::time::sleep(Duration::from_millis(200)).await;
        }
    }

    // NOTE(review): the identity is looked up under `ContextGroupId::from(namespace_id)`,
    // whereas `join_namespace` looks it up under the invitation's group id and derives
    // the namespace id from the result. This assumes both ids coincide — confirm.
    let group_id = ContextGroupId::from(namespace_id);
    let (_, my_pk, mut my_sk_bytes, mut sender_key) =
        group_store::get_or_create_namespace_identity(store, &group_id)
            .map_err(|e| ReadyError::Local(e.to_string()))?;
    // Only the signing key is needed. Scrub the sender key, and scrub the local secret
    // bytes once they have been copied into `signing_key`.
    sender_key.zeroize();
    let signing_key = PrivateKey::from(my_sk_bytes);
    my_sk_bytes.zeroize();

    let op = NamespaceOp::Root(RootOp::MemberJoined {
        member: my_pk,
        signed_invitation: invitation,
    });
    let report = NamespaceGovernance::new(store, namespace_id)
        .sign_and_publish_without_apply(node_client, ack_router, &signing_key, op, None)
        .await
        .map_err(|e| ReadyError::PublishMemberJoined(e.to_string()))?;

    let members_learned = namespace_member_pubkeys(store, namespace_id)
        .map(|m| m.len())
        .unwrap_or(0);

    Ok(ReadyReport {
        namespace_id,
        // Placeholders: the final head and applied-through index are not computed here.
        final_head: [0u8; 32],
        applied_through: 0,
        members_learned,
        acked_by: report.acked_by,
        elapsed_ms: start.elapsed().as_millis() as u64,
    })
}
/// Runs [`join_namespace`] followed by [`await_namespace_ready`] under one overall
/// `deadline`.
///
/// The join phase gets half of `deadline`, capped at 2 s. The ready phase gets
/// whatever is left afterwards, which may be zero.
///
/// # Errors
/// - `JoinError::InvalidInvitation` from the join phase becomes
///   [`ReadyError::InvalidInvitation`].
/// - Any other join error, including `JoinError::NoReadyPeers`, becomes
///   [`ReadyError::JoinFailed`] with the join error's display text.
/// - Errors from the ready phase are returned unchanged.
pub async fn join_and_wait_ready(
    store: &Store,
    node_client: &NodeClient,
    ack_router: &AckRouter,
    readiness_cache: &Arc<ReadinessCache>,
    readiness_notify: &Arc<ReadinessCacheNotify>,
    config: &ReadinessConfig,
    invitation: SignedGroupOpenInvitation,
    deadline: Duration,
) -> Result<ReadyReport, ReadyError> {
    let overall_start = Instant::now();
    let join_budget = (deadline / 2).min(Duration::from_secs(2));

    let join_outcome = join_namespace(
        store,
        node_client,
        readiness_cache,
        readiness_notify,
        config,
        invitation.clone(),
        join_budget,
    )
    .await;
    let started = match join_outcome {
        Ok(started) => started,
        Err(JoinError::InvalidInvitation(msg)) => {
            return Err(ReadyError::InvalidInvitation(msg))
        }
        Err(other) => return Err(ReadyError::JoinFailed(other.to_string())),
    };

    let ready_budget = deadline.saturating_sub(overall_start.elapsed());
    await_namespace_ready(
        store,
        node_client,
        ack_router,
        invitation,
        started.namespace_id,
        ready_budget,
    )
    .await
}
/// Upper bound on the beacon wait of a single attempt in `join_namespace_with_retry`.
const ATTEMPT_DEADLINE: Duration = Duration::from_millis(10_000);
/// Ceiling for the doubling backoff between retry attempts.
const MAX_BACKOFF: Duration = Duration::from_millis(30_000);
/// Backoff used after the first failed attempt; it doubles after each further failure.
const INITIAL_BACKOFF: Duration = Duration::from_millis(3_000);
/// Retries [`join_namespace`] until it succeeds, fails with a non-retryable error,
/// or `max_total` is used up.
///
/// Behaviour:
/// - Only `JoinError::NoReadyPeers` is retried.
/// - Each attempt is bounded by `ATTEMPT_DEADLINE`, or by the remaining budget if
///   that is smaller.
/// - Between attempts, the function sleeps for the current backoff plus a random
///   jitter in `[0, backoff / 4)` milliseconds.
/// - The backoff starts at `INITIAL_BACKOFF`, doubles after each failure, and is
///   capped at `MAX_BACKOFF`.
/// - If that sleep would overrun `max_total`, it gives up immediately.
///
/// # Errors
/// - `JoinError::NoReadyPeers` when the budget is exhausted; `waited_ms` is the total
///   time spent in this function.
/// - Any other `JoinError` from an attempt, returned without retrying.
pub async fn join_namespace_with_retry(
    store: &Store,
    node_client: &NodeClient,
    readiness_cache: &Arc<ReadinessCache>,
    readiness_notify: &Arc<ReadinessCacheNotify>,
    config: &ReadinessConfig,
    invitation: SignedGroupOpenInvitation,
    max_total: Duration,
) -> Result<JoinStarted, JoinError> {
    let began = Instant::now();
    let mut backoff = INITIAL_BACKOFF;
    loop {
        let budget_left = max_total.saturating_sub(began.elapsed());
        if budget_left.is_zero() {
            return Err(JoinError::NoReadyPeers {
                waited_ms: began.elapsed().as_millis() as u64,
            });
        }

        let attempt = join_namespace(
            store,
            node_client,
            readiness_cache,
            readiness_notify,
            config,
            invitation.clone(),
            std::cmp::min(ATTEMPT_DEADLINE, budget_left),
        )
        .await;
        match attempt {
            Ok(started) => return Ok(started),
            Err(JoinError::NoReadyPeers { .. }) => {}
            Err(other) => return Err(other),
        }

        // Up to a quarter of the backoff as random jitter, to de-synchronise retries.
        let jitter_cap_ms = delay_quarter_ms(backoff);
        let jitter = if jitter_cap_ms > 0 {
            Duration::from_millis(rand::thread_rng().gen_range(0..jitter_cap_ms))
        } else {
            Duration::ZERO
        };
        let pause = backoff + jitter;
        if began.elapsed() + pause > max_total {
            return Err(JoinError::NoReadyPeers {
                waited_ms: began.elapsed().as_millis() as u64,
            });
        }
        tokio::time::sleep(pause).await;
        backoff = std::cmp::min(backoff * 2, MAX_BACKOFF);
    }

    /// A quarter of `d`, in whole milliseconds (computed as `millis as u64 / 4`).
    fn delay_quarter_ms(d: Duration) -> u64 {
        d.as_millis() as u64 / 4
    }
}