use std::collections::BTreeSet;
use std::time::Duration;
use calimero_primitives::context::ContextId;
use calimero_primitives::identity::PublicKey;
use libp2p::gossipsub::TopicHash;
use libp2p::PeerId;
use tokio::time;
use tracing::{debug, warn};
use super::network::SyncNetwork;
use super::state_access::SyncStateAccess;
/// Records which lookup produced the peers carried by a [`DiscoveryOutcome`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum PeerSource {
/// The peers were returned directly by the context topic's mesh.
ContextMesh,
/// The context mesh stayed empty during the retry loop; the peers are the
/// namespace topic's mesh members that also appeared in a follow-up query of
/// the context topic's mesh.
NamespaceFallback,
}
/// Successful result of `discover_mesh_peers_with_namespace_fallback`.
#[derive(Debug)]
pub(crate) struct DiscoveryOutcome {
/// Peers selected for the caller. The discovery function in this file only
/// builds an outcome when this list is non-empty.
pub(crate) peers: Vec<PeerId>,
/// Which lookup (context mesh or namespace fallback) produced `peers`.
pub(crate) source: PeerSource,
/// Number of context-mesh queries performed by the retry loop. The namespace
/// fallback reports the same counter; its own queries are not added to it.
pub(crate) attempts: u32,
/// Wall-clock time between the start of discovery and the moment this
/// outcome was built.
pub(crate) elapsed: Duration,
}
/// Finds peers to sync `context_id` with, preferring the context topic's mesh
/// and falling back to the namespace topic's mesh.
///
/// The context mesh is queried up to `max_retries` times, sleeping
/// `retry_delay` between consecutive attempts (but not after the last one).
/// The first non-empty answer is returned as [`PeerSource::ContextMesh`].
///
/// If every attempt comes back empty, `resolve_namespace_topic` is invoked.
/// When it yields a topic whose mesh is non-empty, the context mesh is queried
/// once more and only namespace peers that also appear in that answer are
/// kept; a non-empty intersection is returned as
/// [`PeerSource::NamespaceFallback`]. `attempts` in the outcome always reflects
/// the number of retry-loop queries only.
///
/// # Errors
///
/// Returns an error when neither the context mesh nor the filtered namespace
/// fallback produced any peer.
pub(crate) async fn discover_mesh_peers_with_namespace_fallback(
    sync_network: &dyn SyncNetwork,
    context_id: ContextId,
    max_retries: u32,
    retry_delay: Duration,
    resolve_namespace_topic: impl FnOnce() -> Option<TopicHash>,
) -> eyre::Result<DiscoveryOutcome> {
    let started_at = std::time::Instant::now();
    let context_topic = TopicHash::from_raw(context_id);

    // Phase 1: poll the context mesh, returning as soon as it is populated.
    let mut attempts_made = 0u32;
    while attempts_made < max_retries {
        attempts_made += 1;

        let mesh = sync_network.mesh_peers(context_topic.clone()).await;
        if !mesh.is_empty() {
            return Ok(DiscoveryOutcome {
                peers: mesh,
                source: PeerSource::ContextMesh,
                attempts: attempts_made,
                elapsed: started_at.elapsed(),
            });
        }

        // Only back off when another attempt is still to come.
        if attempts_made < max_retries {
            debug!(
                %context_id,
                attempt = attempts_made,
                max_retries,
                "No peers found yet, mesh may still be forming, retrying..."
            );
            time::sleep(retry_delay).await;
        }
    }

    // Phase 2: namespace fallback. The resolver is only consulted once the
    // context mesh has been given up on.
    let namespace_mesh = match resolve_namespace_topic() {
        Some(ns_topic) => sync_network.mesh_peers(ns_topic).await,
        None => Vec::new(),
    };

    if !namespace_mesh.is_empty() {
        let ns_candidates = namespace_mesh.len();

        // Re-query the context mesh and keep only namespace peers that show
        // up there as well, preserving the namespace mesh's ordering.
        let ctx_subscribers: BTreeSet<PeerId> = sync_network
            .mesh_peers(context_topic)
            .await
            .into_iter()
            .collect();
        let mut survivors = namespace_mesh;
        survivors.retain(|candidate| ctx_subscribers.contains(candidate));

        if !survivors.is_empty() {
            return Ok(DiscoveryOutcome {
                peers: survivors,
                source: PeerSource::NamespaceFallback,
                attempts: attempts_made,
                elapsed: started_at.elapsed(),
            });
        }

        debug!(
            %context_id,
            ns_candidates,
            ctx_subscribers = ctx_subscribers.len(),
            "namespace fallback found peers but none subscribe to the context topic; \
             not dialing — see #2422"
        );
    }

    // Both sources came up empty: report and fail.
    let elapsed = started_at.elapsed();
    warn!(
        %context_id,
        attempts = max_retries,
        ?elapsed,
        "Mesh peer discovery exhausted all retries (context mesh + namespace fallback)"
    );
    eyre::bail!("No peers to sync with for context {}", context_id);
}
/// Stably reorders `peers` in place so that anchor peers come first, and
/// returns how many anchor peers there are.
///
/// A peer counts as an anchor when `state_access.peer_identities` yields a set
/// of identities for it and at least one of them is contained in `anchors`.
/// Peers with no known identities are treated as non-anchors. Relative order
/// within the anchor group and within the non-anchor group is preserved.
///
/// When `anchors` is empty the function returns `0` immediately, without
/// looking up any identities or touching `peers`.
pub(crate) fn partition_peers_anchor_first(
    peers: &mut [PeerId],
    state_access: &dyn SyncStateAccess,
    anchors: &BTreeSet<PublicKey>,
) -> usize {
    if anchors.is_empty() {
        return 0;
    }

    // `partition` visits the peers once, in their current order, so each peer
    // triggers exactly one identity lookup and both halves keep input order.
    let (mut ordered, trailing): (Vec<PeerId>, Vec<PeerId>) =
        peers.iter().copied().partition(|peer| {
            state_access
                .peer_identities(peer)
                .map_or(false, |ids| ids.iter().any(|id| anchors.contains(id)))
        });

    let anchor_total = ordered.len();
    ordered.extend(trailing);
    peers.copy_from_slice(&ordered);
    anchor_total
}
#[cfg(test)]
mod tests {
    //! Unit tests for mesh peer discovery and anchor-first peer ordering.
    //!
    //! The discovery tests drive `MockSyncNetwork` by pushing `mesh_peers`
    //! responses with `push_mesh_peers`. The comments on each test state the
    //! order in which the function under test is expected to consume them.

    use std::time::Duration;

    use super::*;
    use crate::sync::network::mock::MockSyncNetwork;
    use crate::sync::state_access_mock::MockSyncStateAccess;

    /// Builds a `ContextId` whose 32 bytes are all `byte`.
    fn ctx(byte: u8) -> ContextId {
        ContextId::from([byte; 32])
    }

    /// Derives a deterministic `PeerId` from an ed25519 key seeded with `n`.
    fn dummy_peer(n: u8) -> PeerId {
        let seed = [n; 32];
        let kp = libp2p::identity::Keypair::ed25519_from_bytes(seed).expect("valid seed");
        PeerId::from_public_key(&kp.public())
    }

    /// Builds a `PublicKey` whose 32 bytes are all `n`.
    fn dummy_pk(n: u8) -> PublicKey {
        PublicKey::from([n; 32])
    }

    // A populated context mesh is returned straight away, on attempt 1.
    #[tokio::test(start_paused = true)]
    async fn discovery_returns_context_mesh_on_first_attempt() {
        let mock = MockSyncNetwork::default();
        let peer = dummy_peer(1);
        mock.push_mesh_peers(vec![peer]);
        let outcome = discover_mesh_peers_with_namespace_fallback(
            &mock,
            ctx(0xAA),
            3,
            Duration::from_millis(50),
            || None,
        )
        .await
        .expect("ok");
        assert_eq!(outcome.peers, vec![peer]);
        assert_eq!(outcome.source, PeerSource::ContextMesh);
        assert_eq!(outcome.attempts, 1, "must succeed on the first attempt");
    }

    // No responses are pushed and no namespace topic is resolved, so discovery
    // must fail with the "No peers to sync with" error.
    #[tokio::test(start_paused = true)]
    async fn discovery_errs_when_context_empty_and_no_namespace_fallback() {
        let mock = MockSyncNetwork::default();
        let result = discover_mesh_peers_with_namespace_fallback(
            &mock,
            ctx(0xAA),
            3,
            Duration::from_millis(50),
            || None,
        )
        .await;
        let err = result.unwrap_err().to_string();
        assert!(
            err.contains("No peers to sync with"),
            "unexpected err: {err}"
        );
    }

    // A namespace topic is resolved but no responses are pushed for either
    // mesh, so discovery must still fail.
    #[tokio::test(start_paused = true)]
    async fn discovery_errs_when_both_meshes_empty() {
        let mock = MockSyncNetwork::default();
        let ns_topic = TopicHash::from_raw("ns/fake");
        let result = discover_mesh_peers_with_namespace_fallback(
            &mock,
            ctx(0xAA),
            2,
            Duration::from_millis(10),
            || Some(ns_topic),
        )
        .await;
        assert!(
            result.is_err(),
            "both context mesh and namespace mesh empty → Err"
        );
    }

    // Expected consumption order of the pushed responses:
    //   1-2. two empty context-mesh answers (the retry loop, max_retries = 2)
    //   3.   the namespace mesh: follower + opted_out
    //   4.   the follow-up context-mesh query used as the filter: follower only
    #[tokio::test(start_paused = true)]
    async fn discovery_filters_namespace_peers_by_context_subscription() {
        let mock = MockSyncNetwork::default();
        let follower = dummy_peer(1);
        let opted_out = dummy_peer(2);
        mock.push_mesh_peers(vec![])
            .push_mesh_peers(vec![])
            .push_mesh_peers(vec![follower, opted_out])
            .push_mesh_peers(vec![follower]);
        let ns_topic = TopicHash::from_raw("ns/fake");
        let outcome = discover_mesh_peers_with_namespace_fallback(
            &mock,
            ctx(0xAA),
            2,
            Duration::from_millis(10),
            || Some(ns_topic),
        )
        .await
        .expect("intersection should return follower");
        assert_eq!(outcome.peers, vec![follower]);
        assert_eq!(outcome.source, PeerSource::NamespaceFallback);
        assert!(
            !outcome.peers.contains(&opted_out),
            "opted-out namespace member must be filtered out"
        );
    }

    // Same sequencing as the test above, but the follow-up context-mesh query
    // is empty, so the intersection is empty and discovery must fail.
    #[tokio::test(start_paused = true)]
    async fn discovery_errs_when_no_namespace_peer_subscribes_to_context() {
        let mock = MockSyncNetwork::default();
        let opted_out_a = dummy_peer(1);
        let opted_out_b = dummy_peer(2);
        mock.push_mesh_peers(vec![])
            .push_mesh_peers(vec![])
            .push_mesh_peers(vec![opted_out_a, opted_out_b])
            .push_mesh_peers(vec![]);
        let ns_topic = TopicHash::from_raw("ns/fake");
        let result = discover_mesh_peers_with_namespace_fallback(
            &mock,
            ctx(0xAA),
            2,
            Duration::from_millis(10),
            || Some(ns_topic),
        )
        .await;
        assert!(
            result.is_err(),
            "all namespace peers opted out → no candidates → Err"
        );
    }

    /// Builds a `NodeState` whose `peer_identities` map holds `entries`.
    fn node_state_with_peer_identities(
        entries: impl IntoIterator<Item = (PeerId, BTreeSet<PublicKey>)>,
    ) -> crate::state::NodeState {
        let node_state = crate::state::NodeState::new(false, crate::run::NodeMode::Standard);
        for (peer, ids) in entries {
            let _replaced = node_state.peer_identities.insert(peer, ids);
        }
        node_state
    }

    #[test]
    fn partition_empty_anchors_set_returns_zero() {
        let mut peers = vec![dummy_peer(1), dummy_peer(2), dummy_peer(3)];
        let original = peers.clone();
        let node_state = node_state_with_peer_identities([]);
        let anchors: BTreeSet<PublicKey> = BTreeSet::new();
        let count = partition_peers_anchor_first(&mut peers, &node_state, &anchors);
        assert_eq!(count, 0);
        // The early return must leave the caller's ordering untouched.
        assert_eq!(peers, original);
    }

    #[test]
    fn partition_empty_cache_no_anchors_found() {
        let mut peers = vec![dummy_peer(1), dummy_peer(2)];
        let original = peers.clone();
        let node_state = node_state_with_peer_identities([]);
        let anchors: BTreeSet<PublicKey> = [dummy_pk(0xAA)].into_iter().collect();
        let count = partition_peers_anchor_first(&mut peers, &node_state, &anchors);
        assert_eq!(count, 0);
        assert_eq!(peers, original);
    }

    #[test]
    fn partition_all_peers_are_anchors() {
        let peer1 = dummy_peer(1);
        let peer2 = dummy_peer(2);
        let pk_admin = dummy_pk(0xAA);
        let node_state = node_state_with_peer_identities([
            (peer1, [pk_admin].into_iter().collect()),
            (peer2, [pk_admin].into_iter().collect()),
        ]);
        let anchors: BTreeSet<PublicKey> = [pk_admin].into_iter().collect();
        let mut peers = vec![peer1, peer2];
        let count = partition_peers_anchor_first(&mut peers, &node_state, &anchors);
        assert_eq!(count, 2);
        assert_eq!(peers, vec![peer1, peer2]);
    }

    #[test]
    fn partition_mixed_anchor_and_non_anchor_preserves_relative_order() {
        let anchor_a = dummy_peer(1);
        let anchor_b = dummy_peer(2);
        let plain_a = dummy_peer(3);
        let plain_b = dummy_peer(4);
        let pk_admin = dummy_pk(0xAA);
        let node_state = node_state_with_peer_identities([
            (anchor_a, [pk_admin].into_iter().collect()),
            (anchor_b, [pk_admin].into_iter().collect()),
        ]);
        let anchors: BTreeSet<PublicKey> = [pk_admin].into_iter().collect();
        let mut peers = vec![plain_a, anchor_a, plain_b, anchor_b];
        let count = partition_peers_anchor_first(&mut peers, &node_state, &anchors);
        assert_eq!(count, 2);
        assert_eq!(peers, vec![anchor_a, anchor_b, plain_a, plain_b]);
    }

    #[test]
    fn partition_peer_with_one_anchor_identity_among_many_qualifies() {
        let peer = dummy_peer(1);
        let pk_admin = dummy_pk(0xAA);
        let pk_other_context = dummy_pk(0xBB);
        let node_state = node_state_with_peer_identities([(
            peer,
            [pk_admin, pk_other_context].into_iter().collect(),
        )]);
        let anchors: BTreeSet<PublicKey> = [pk_admin].into_iter().collect();
        let mut peers = vec![peer];
        let count = partition_peers_anchor_first(&mut peers, &node_state, &anchors);
        assert_eq!(count, 1);
    }

    #[test]
    fn partition_peer_with_only_non_anchor_identities_does_not_qualify() {
        let peer = dummy_peer(1);
        let pk_member = dummy_pk(0xCC);
        let pk_admin = dummy_pk(0xAA);
        let node_state =
            node_state_with_peer_identities([(peer, [pk_member].into_iter().collect())]);
        let anchors: BTreeSet<PublicKey> = [pk_admin].into_iter().collect();
        let mut peers = vec![peer];
        let count = partition_peers_anchor_first(&mut peers, &node_state, &anchors);
        assert_eq!(count, 0);
    }

    #[test]
    fn partition_works_against_mock_sync_state_access() {
        // `MockSyncStateAccess` is already imported at module level; only the
        // call-record type is needed locally.
        use crate::sync::state_access_mock::SyncStateAccessCall;

        let anchor_peer = dummy_peer(1);
        let plain_peer = dummy_peer(2);
        let pk_admin = dummy_pk(0xAA);
        let mock = MockSyncStateAccess::default();
        mock.insert_peer_identities(anchor_peer, [pk_admin].into_iter().collect());
        let anchors: BTreeSet<PublicKey> = [pk_admin].into_iter().collect();
        let mut peers = vec![plain_peer, anchor_peer];
        let count = partition_peers_anchor_first(&mut peers, &mock, &anchors);
        assert_eq!(count, 1);
        assert_eq!(peers, vec![anchor_peer, plain_peer]);
        // Exactly one identity lookup per peer, issued in the input order.
        let calls = mock.calls();
        assert_eq!(
            calls,
            vec![
                SyncStateAccessCall::PeerIdentities(plain_peer),
                SyncStateAccessCall::PeerIdentities(anchor_peer),
            ]
        );
    }
}