use std::collections::HashSet;
use calimero_network_primitives::stream::Stream;
use libp2p::gossipsub::TopicHash;
use libp2p::PeerId;
use rand::seq::SliceRandom;
use tokio::time;
use tracing::debug;
use crate::sync::network::SyncNetwork;
/// Number of per-peer `open_timeout` slots budgeted for each retry round when
/// `open_namespace_join_stream` computes its overall connect deadline.
///
/// This only sizes the deadline. It does not limit how many peers are tried in a
/// round; the deadline check inside the peer loop is what stops a long round.
const DEADLINE_MAX_PEERS_PER_ROUND: u32 = 4;
/// Opens a namespace-join stream to one of the peers that `sync_network.mesh_peers`
/// reports for the namespace topic `ns/<hex(namespace_id)>`.
///
/// Each round does the following:
/// - Fetches the current peer list and drops every peer listed in `excluded_peers`.
/// - Shuffles the remainder.
/// - Calls `open_stream` on each peer in turn, bounding every call with `open_timeout`.
///
/// At most `mesh_retries` rounds are run. Between rounds the function sleeps for
/// `mesh_retry_delay`, but only when that sleep would still end before the overall deadline.
///
/// The overall deadline is
/// `(mesh_retry_delay + DEADLINE_MAX_PEERS_PER_ROUND * open_timeout) * mesh_retries`,
/// with every step saturating. It is checked before each round and before each peer
/// attempt. An attempt already in flight is not shortened, so the total time spent can
/// exceed the deadline by up to one `open_timeout`.
///
/// # Returns
///
/// The first stream that opened successfully, paired with the peer it was opened to.
///
/// # Errors
///
/// Returns an error when no stream could be opened before the rounds or the deadline
/// ran out. The message reports the deadline, the elapsed time, and the number of
/// excluded peers.
///
/// # Panics
///
/// Panics if `mesh_retries` is zero.
pub(super) async fn open_namespace_join_stream(
    sync_network: &dyn SyncNetwork,
    namespace_id: [u8; 32],
    open_timeout: std::time::Duration,
    mesh_retries: u32,
    mesh_retry_delay: std::time::Duration,
    excluded_peers: &HashSet<PeerId>,
) -> eyre::Result<(Stream, PeerId)> {
    assert!(
        mesh_retries > 0,
        "mesh_retries must be > 0; got {mesh_retries}"
    );

    let ns_hex = hex::encode(namespace_id);
    let topic = TopicHash::from_raw(format!("ns/{ns_hex}"));

    // Budget for a single round: a fixed number of per-peer open slots plus the retry sleep.
    let round_budget = open_timeout
        .saturating_mul(DEADLINE_MAX_PEERS_PER_ROUND)
        .saturating_add(mesh_retry_delay);
    let connect_deadline = round_budget.saturating_mul(mesh_retries);

    let started = time::Instant::now();
    let deadline_passed = || started.elapsed() >= connect_deadline;

    'rounds: for attempt in 1..=mesh_retries {
        if deadline_passed() {
            debug!(
                namespace_id = %ns_hex,
                attempt,
                elapsed_ms = started.elapsed().as_millis() as u64,
                "namespace-join connect-loop deadline exceeded, giving up"
            );
            break;
        }

        let mut candidates = sync_network.mesh_peers(topic.clone()).await;
        if !excluded_peers.is_empty() {
            candidates.retain(|candidate| !excluded_peers.contains(candidate));
        }
        // Randomize the order in which this round's peers are tried.
        candidates.shuffle(&mut rand::thread_rng());

        for peer in &candidates {
            // Stop starting new attempts once the overall budget is spent.
            if deadline_passed() {
                break 'rounds;
            }
            match time::timeout(open_timeout, sync_network.open_stream(*peer)).await {
                Ok(Ok(stream)) => return Ok((stream, *peer)),
                Ok(Err(err)) => {
                    debug!(
                        namespace_id = %ns_hex,
                        %peer,
                        attempt,
                        %err,
                        "Failed to open namespace-join stream, trying next peer..."
                    );
                }
                Err(_) => {
                    debug!(
                        namespace_id = %ns_hex,
                        %peer,
                        attempt,
                        "Timed out opening namespace-join stream, trying next peer..."
                    );
                }
            }
        }

        // Only sleep when another round follows and the sleep itself ends before the deadline.
        let is_last_round = attempt == mesh_retries;
        if !is_last_round
            && started.elapsed().saturating_add(mesh_retry_delay) < connect_deadline
        {
            debug!(
                namespace_id = %ns_hex,
                attempt,
                peer_count = candidates.len(),
                "No reachable namespace mesh peer yet, retrying..."
            );
            time::sleep(mesh_retry_delay).await;
        }
    }

    Err(eyre::eyre!(
        "could not open a namespace-join stream to any mesh peer for namespace {} \
         (deadline {}ms, elapsed {}ms, excluded {})",
        ns_hex,
        connect_deadline.as_millis(),
        started.elapsed().as_millis(),
        excluded_peers.len()
    ))
}
#[cfg(test)]
mod tests {
// Unit tests for `open_namespace_join_stream`, driven by `MockSyncNetwork`.
// Every test uses `start_paused = true`, so tokio's clock is paused and (per tokio's
// `start_paused` semantics) auto-advances when the runtime is idle. Sleeps and
// timeouts therefore finish without real waiting.
// NOTE(review): several tests appear to assume that the mock returns the most recently
// pushed mesh-peer list on every `mesh_peers` call, and an empty list when none was
// pushed. Confirm this against `MockSyncNetwork`.
use std::sync::Arc;
use std::time::Duration;
use libp2p::PeerId;
use super::*;
use crate::sync::network::mock::MockSyncNetwork;
const NAMESPACE_ID: [u8; 32] = [0xA1; 32];
/// Shared `(open_timeout, mesh_retries, mesh_retry_delay)` used by most tests:
/// 100ms per-peer timeout, 3 rounds, 50ms between rounds.
fn defaults() -> (Duration, u32, Duration) {
(Duration::from_millis(100), 3, Duration::from_millis(50))
}
/// Empty exclusion set: every mesh peer is eligible.
fn no_excluded() -> HashSet<PeerId> {
HashSet::new()
}
/// Two mesh peers fail every open attempt in every round. The test queues one error
/// per expected open call (`retries` rounds x 2 peers), checks that they are all
/// consumed, and checks that the final error reports the deadline and elapsed time.
#[tokio::test(start_paused = true)]
async fn all_peers_fail_every_round_returns_err() {
let mock = MockSyncNetwork::default();
let p1 = PeerId::random();
let p2 = PeerId::random();
mock.push_mesh_peers(vec![p1, p2]);
let (open_timeout, retries, retry_delay) = defaults();
let expected_open_calls = (retries as usize) * 2;
for i in 0..expected_open_calls {
mock.push_open_stream_err(format!("err-{i}"));
}
let result = open_namespace_join_stream(
&mock,
NAMESPACE_ID,
open_timeout,
retries,
retry_delay,
&no_excluded(),
)
.await;
let err = result.unwrap_err().to_string();
assert!(
err.contains("could not open a namespace-join stream"),
"unexpected err: {err}"
);
assert!(
err.contains("deadline"),
"err should report deadline: {err}"
);
assert!(err.contains("elapsed"), "err should report elapsed: {err}");
mock.assert_all_consumed();
}
/// Each `open_stream` call hangs for 10s, so every attempt must be cut off by the
/// per-peer `open_timeout`. The total elapsed time may exceed the connect deadline by
/// at most one `open_timeout`, because an attempt that is already in flight is not
/// shortened. The deadline is recomputed here with the same formula the function uses.
#[tokio::test(start_paused = true)]
async fn hanging_peers_are_interrupted_by_per_peer_timeout() {
let mock = MockSyncNetwork::default();
mock.push_mesh_peers(vec![PeerId::random(), PeerId::random()]);
// Queue more hanging responses than the loop should need; consumption is not checked.
for i in 0..20 {
mock.push_open_stream_hang(Duration::from_secs(10), format!("hang-{i}"));
}
let (open_timeout, retries, retry_delay) = defaults();
let connect_deadline = retry_delay
.saturating_add(open_timeout.saturating_mul(DEADLINE_MAX_PEERS_PER_ROUND))
.saturating_mul(retries);
let start = time::Instant::now();
let result = open_namespace_join_stream(
&mock,
NAMESPACE_ID,
open_timeout,
retries,
retry_delay,
&no_excluded(),
)
.await;
let elapsed = start.elapsed();
assert!(result.is_err(), "expected Err from hanging peers, got Ok");
let upper_bound = connect_deadline.saturating_add(open_timeout);
assert!(
elapsed <= upper_bound,
"loop took {elapsed:?}, expected ≤ {upper_bound:?} (deadline {connect_deadline:?} \
+ one open_timeout slot)"
);
}
/// No mesh peers are pushed, so no round has anyone to try and the call must fail.
#[tokio::test(start_paused = true)]
async fn empty_mesh_every_round_returns_err() {
let mock = MockSyncNetwork::default();
let (open_timeout, retries, retry_delay) = defaults();
let result = open_namespace_join_stream(
&mock,
NAMESPACE_ID,
open_timeout,
retries,
retry_delay,
&no_excluded(),
)
.await;
assert!(
result.is_err(),
"expected Err when mesh stays empty across all retries"
);
}
/// Ten hanging peers at 200ms each take longer than one round's
/// `DEADLINE_MAX_PEERS_PER_ROUND`-slot budget. The deadline check inside the peer
/// loop must therefore end the call, which must finish in under 3s of virtual time.
#[tokio::test(start_paused = true)]
async fn outer_deadline_fires_inside_peer_loop_on_large_mesh() {
let mock = MockSyncNetwork::default();
let many_peers: Vec<PeerId> = (0..10).map(|_| PeerId::random()).collect();
mock.push_mesh_peers(many_peers);
for i in 0..50 {
mock.push_open_stream_hang(Duration::from_secs(60), format!("h-{i}"));
}
let open_timeout = Duration::from_millis(200);
let mesh_retries: u32 = 3;
let mesh_retry_delay = Duration::from_millis(10);
let start = time::Instant::now();
let result = open_namespace_join_stream(
&mock,
NAMESPACE_ID,
open_timeout,
mesh_retries,
mesh_retry_delay,
&no_excluded(),
)
.await;
let elapsed = start.elapsed();
assert!(result.is_err());
assert!(
elapsed < Duration::from_secs(3),
"loop took {elapsed:?}, expected outer deadline + per-peer guard to bound this"
);
}
/// Checks that the function can be called through an `Arc<dyn SyncNetwork>` (as
/// `&*mock`). A default mock with nothing queued yields an error.
#[tokio::test(start_paused = true)]
async fn accepts_arc_dyn_sync_network() {
let mock: Arc<dyn SyncNetwork> = Arc::new(MockSyncNetwork::default());
let (open_timeout, retries, retry_delay) = defaults();
let result = open_namespace_join_stream(
&*mock,
NAMESPACE_ID,
open_timeout,
retries,
retry_delay,
&no_excluded(),
)
.await;
assert!(result.is_err());
}
/// Every mesh peer is excluded, so the filtered candidate list is empty in each round.
/// No open responses are queued (presumably an unexpected open call would surface as
/// a mock failure; confirm). The error must report `excluded 2`.
#[tokio::test(start_paused = true)]
async fn all_peers_excluded_returns_err_without_open_attempts() {
let mock = MockSyncNetwork::default();
let p1 = PeerId::random();
let p2 = PeerId::random();
mock.push_mesh_peers(vec![p1, p2]);
let mut excluded = HashSet::new();
excluded.insert(p1);
excluded.insert(p2);
let (open_timeout, retries, retry_delay) = defaults();
let result = open_namespace_join_stream(
&mock,
NAMESPACE_ID,
open_timeout,
retries,
retry_delay,
&excluded,
)
.await;
let err = result.unwrap_err().to_string();
assert!(
err.contains("could not open a namespace-join stream"),
"unexpected err: {err}"
);
assert!(
err.contains("excluded 2"),
"err should report the excluded-peer count: {err}"
);
mock.assert_all_consumed();
}
/// Only `kept` survives exclusion. One error is queued per round for it, and all of
/// them must be consumed. The final error reports `excluded 1`.
#[tokio::test(start_paused = true)]
async fn excluded_peer_skipped_other_mesh_peer_still_attempted() {
let mock = MockSyncNetwork::default();
let kept = PeerId::random();
let blocked = PeerId::random();
mock.push_mesh_peers(vec![kept, blocked]);
let mut excluded = HashSet::new();
excluded.insert(blocked);
let (open_timeout, retries, retry_delay) = defaults();
for i in 0..(retries as usize) {
mock.push_open_stream_err(format!("kept-err-{i}"));
}
let result = open_namespace_join_stream(
&mock,
NAMESPACE_ID,
open_timeout,
retries,
retry_delay,
&excluded,
)
.await;
let err = result.unwrap_err().to_string();
assert!(
err.contains("excluded 1"),
"err should report 1 excluded peer for diagnostic symmetry: {err}"
);
mock.assert_all_consumed();
}
}