use std::collections::{HashMap, VecDeque};
use std::time::Duration;
use async_trait::async_trait;
use calimero_network_primitives::stream::Stream;
use libp2p::gossipsub::TopicHash;
use libp2p::PeerId;
use parking_lot::Mutex;
use tokio::time;
use super::SyncNetwork;
/// One scripted outcome for a single `MockSyncNetwork::open_stream` call.
pub(crate) enum OpenStreamResponse {
/// The call succeeds and hands back this stream.
Ok(Stream),
/// The call fails immediately with an error built from this message.
Err(String),
/// The call sleeps for the given duration, then fails with an error built
/// from this message.
SleepThenErr(Duration, String),
}
/// Takes the next scripted answer from `queue` using "sticky-last" semantics.
///
/// While more than one entry is queued the front entry is removed and
/// returned. Once a single entry is left it is cloned on every call and never
/// removed, so the final answer repeats indefinitely. An empty queue yields
/// an empty peer list.
fn sticky_last(queue: &mut VecDeque<Vec<PeerId>>) -> Vec<PeerId> {
    if queue.len() > 1 {
        // More answers follow this one, so it can be consumed.
        return queue.pop_front().unwrap_or_default();
    }
    // Zero or one entry: peek without removing. `front()` is `None` for an
    // empty queue, which falls back to the empty `Vec`.
    queue.front().cloned().unwrap_or_default()
}
/// Scriptable test double for `SyncNetwork`.
///
/// Answers are queued up front through the `push_*` methods and handed out by
/// the trait methods. All state sits behind mutexes so it can be seeded and
/// read through `&self`.
#[derive(Default)]
pub(crate) struct MockSyncNetwork {
/// Shared `subscribed_peers` answers, used for any topic that has no
/// per-topic queue.
subscribed_peers_responses: Mutex<VecDeque<Vec<PeerId>>>,
/// Per-topic `subscribed_peers` answers; a topic with an entry here is
/// answered from it instead of the shared queue.
subscribed_peers_by_topic: Mutex<HashMap<TopicHash, VecDeque<Vec<PeerId>>>>,
/// Scripted `open_stream` outcomes, consumed front to back, one per call.
open_stream_responses: Mutex<VecDeque<OpenStreamResponse>>,
/// Number of `subscribed_peers` calls that fell through to the shared queue
/// (counted even when that queue is empty).
shared_queue_reads: Mutex<u32>,
/// Number of `subscribed_peers` calls answered from each topic's own queue.
subscribed_peers_reads_by_topic: Mutex<HashMap<TopicHash, u32>>,
}
impl MockSyncNetwork {
pub(crate) fn push_subscribed_peers(&self, peers: Vec<PeerId>) -> &Self {
self.subscribed_peers_responses.lock().push_back(peers);
self
}
pub(crate) fn push_subscribed_peers_for(&self, topic: TopicHash, peers: Vec<PeerId>) -> &Self {
self.subscribed_peers_by_topic
.lock()
.entry(topic)
.or_default()
.push_back(peers);
self
}
pub(crate) fn push_open_stream_ok(&self) -> &Self {
let (stream, _peer_end) = Stream::test_pair();
self.open_stream_responses
.lock()
.push_back(OpenStreamResponse::Ok(stream));
self
}
pub(crate) fn push_open_stream_err(&self, msg: impl Into<String>) -> &Self {
self.open_stream_responses
.lock()
.push_back(OpenStreamResponse::Err(msg.into()));
self
}
pub(crate) fn push_open_stream_hang(
&self,
sleep_for: Duration,
then_msg: impl Into<String>,
) -> &Self {
self.open_stream_responses
.lock()
.push_back(OpenStreamResponse::SleepThenErr(sleep_for, then_msg.into()));
self
}
#[track_caller]
pub(crate) fn assert_all_consumed(&self) {
let shared_remaining = self.subscribed_peers_responses.lock().len();
let shared_reads = *self.shared_queue_reads.lock();
let open_stream_remaining = self.open_stream_responses.lock().len();
if shared_remaining > 1 {
panic!(
"MockSyncNetwork: {shared_remaining} unconsumed `subscribed_peers` responses queued \
(sticky-last leaves 1 by design; >1 means the code under test made fewer calls \
than expected)",
);
}
if shared_remaining > 0 && shared_reads == 0 {
panic!(
"MockSyncNetwork: `subscribed_peers` shared queue was seeded with {shared_remaining} \
entries but the code under test never read it",
);
}
let by_topic = self.subscribed_peers_by_topic.lock();
let reads_by_topic = self.subscribed_peers_reads_by_topic.lock();
for (topic, queue) in by_topic.iter() {
let remaining = queue.len();
if remaining > 1 {
panic!(
"MockSyncNetwork: {remaining} unconsumed `subscribed_peers` responses queued \
for topic {topic:?} (sticky-last leaves 1 by design; >1 means the code under \
test made fewer calls than expected)",
);
}
if remaining > 0 && reads_by_topic.get(topic).copied().unwrap_or(0) == 0 {
panic!(
"MockSyncNetwork: `subscribed_peers` was seeded with {remaining} entries for \
topic {topic:?} but the code under test never queried it",
);
}
}
if open_stream_remaining > 0 {
panic!(
"MockSyncNetwork: {} unconsumed `open_stream` responses queued",
open_stream_remaining
);
}
}
}
#[async_trait]
impl SyncNetwork for MockSyncNetwork {
    /// Returns the next scripted peer list for `topic`.
    ///
    /// A topic that has its own queue is answered from it; any other topic is
    /// answered from the shared queue. Both use sticky-last semantics, and
    /// each path bumps its own read counter for `assert_all_consumed`.
    async fn subscribed_peers(&self, topic: TopicHash) -> Vec<PeerId> {
        // The map guard is a temporary of this statement, so it is released
        // before the read counters are locked below.
        let topic_answer = self
            .subscribed_peers_by_topic
            .lock()
            .get_mut(&topic)
            .map(sticky_last);

        match topic_answer {
            Some(peers) => {
                let mut reads = self.subscribed_peers_reads_by_topic.lock();
                *reads.entry(topic).or_default() += 1;
                peers
            }
            None => {
                *self.shared_queue_reads.lock() += 1;
                let mut shared = self.subscribed_peers_responses.lock();
                sticky_last(&mut shared)
            }
        }
    }

    /// Plays back the next scripted `open_stream` outcome.
    ///
    /// The peer id is ignored. An exhausted queue is reported as an error
    /// rather than a panic.
    async fn open_stream(&self, _peer_id: PeerId) -> eyre::Result<Stream> {
        // Pop in its own statement so the lock is released before any
        // `.await` further down.
        let scripted = self.open_stream_responses.lock().pop_front();

        // Both failing variants end in the same error; they differ only in
        // whether a sleep comes first.
        let (delay, msg) = match scripted {
            Some(OpenStreamResponse::Ok(stream)) => return Ok(stream),
            Some(OpenStreamResponse::Err(msg)) => (None, msg),
            Some(OpenStreamResponse::SleepThenErr(sleep_for, msg)) => (Some(sleep_for), msg),
            None => {
                return Err(eyre::eyre!(
                    "MockSyncNetwork: open_stream called with no queued response"
                ))
            }
        };

        if let Some(sleep_for) = delay {
            time::sleep(sleep_for).await;
        }
        Err(eyre::eyre!(msg))
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a topic hash from a raw label.
    fn topic_hash(raw: &str) -> TopicHash {
        TopicHash::from_raw(raw)
    }

    // ---- `subscribed_peers`: shared queue -------------------------------

    /// The shared queue hands out entries in order, then repeats the last one.
    #[tokio::test]
    async fn shared_queue_returns_queued_value_then_repeats_last() {
        let net = MockSyncNetwork::default();
        let (first, last) = (PeerId::random(), PeerId::random());
        net.push_subscribed_peers(vec![first])
            .push_subscribed_peers(vec![last]);

        let topic = topic_hash("test");
        for expected in [first, last, last, last] {
            assert_eq!(net.subscribed_peers(topic.clone()).await, vec![expected]);
        }
    }

    /// With nothing seeded, the answer is an empty peer list.
    #[tokio::test]
    async fn subscribed_peers_empty_when_never_seeded() {
        let net = MockSyncNetwork::default();
        let peers = net.subscribed_peers(topic_hash("x")).await;
        assert!(peers.is_empty());
    }

    /// The entry that becomes the last one is served and then stays in place.
    #[tokio::test]
    async fn shared_queue_sticky_last_at_len_1_boundary() {
        let net = MockSyncNetwork::default();
        let (first, last) = (PeerId::random(), PeerId::random());
        net.push_subscribed_peers(vec![first])
            .push_subscribed_peers(vec![last]);

        let topic = topic_hash("test");
        for expected in [first, last, last] {
            assert_eq!(net.subscribed_peers(topic.clone()).await, vec![expected]);
        }
    }

    // ---- `subscribed_peers`: per-topic queues ---------------------------

    /// Reading one topic never disturbs another topic's queue.
    #[tokio::test]
    async fn per_topic_queues_are_independent() {
        let net = MockSyncNetwork::default();
        let ctx_peer = PeerId::random();
        let ns_peer = PeerId::random();
        let ctx_topic = topic_hash("ctx");
        let ns_topic = topic_hash("ns");
        net.push_subscribed_peers_for(ctx_topic.clone(), vec![ctx_peer])
            .push_subscribed_peers_for(ns_topic.clone(), vec![ns_peer]);

        // Interleave the topics: ns, ctx, ctx, ns.
        let ns_first = net.subscribed_peers(ns_topic.clone()).await;
        assert_eq!(ns_first, vec![ns_peer]);
        let ctx_first = net.subscribed_peers(ctx_topic.clone()).await;
        assert_eq!(ctx_first, vec![ctx_peer]);
        let ctx_again = net.subscribed_peers(ctx_topic).await;
        assert_eq!(ctx_again, vec![ctx_peer]);
        let ns_again = net.subscribed_peers(ns_topic).await;
        assert_eq!(ns_again, vec![ns_peer]);
    }

    /// A seeded topic uses its own queue; an unseeded one uses the shared queue.
    #[tokio::test]
    async fn per_topic_takes_precedence_then_shared_fallthrough() {
        let net = MockSyncNetwork::default();
        let seeded = PeerId::random();
        let shared = PeerId::random();
        let seeded_topic = topic_hash("seeded");
        net.push_subscribed_peers_for(seeded_topic.clone(), vec![seeded])
            .push_subscribed_peers(vec![shared]);

        let from_topic = net.subscribed_peers(seeded_topic).await;
        assert_eq!(from_topic, vec![seeded]);

        let from_shared = net.subscribed_peers(topic_hash("other")).await;
        assert_eq!(from_shared, vec![shared]);
    }

    /// Per-topic queues follow the same sticky-last rule as the shared queue.
    #[tokio::test]
    async fn per_topic_sticky_last_sequence() {
        let net = MockSyncNetwork::default();
        let peer = PeerId::random();
        let topic = topic_hash("ctx");
        net.push_subscribed_peers_for(topic.clone(), vec![])
            .push_subscribed_peers_for(topic.clone(), vec![peer]);

        let first = net.subscribed_peers(topic.clone()).await;
        assert!(first.is_empty());
        for _ in 0..2 {
            assert_eq!(net.subscribed_peers(topic.clone()).await, vec![peer]);
        }
    }

    // ---- `open_stream` ---------------------------------------------------

    /// Queued errors come back in order; afterwards the default error is used.
    #[tokio::test]
    async fn open_stream_returns_queued_errors_then_default_after_exhaustion() {
        let net = MockSyncNetwork::default();
        net.push_open_stream_err("first")
            .push_open_stream_err("second");
        let peer = PeerId::random();

        for expected in ["first", "second"] {
            let got = net.open_stream(peer).await.unwrap_err().to_string();
            assert_eq!(got, expected);
        }

        let exhausted = net.open_stream(peer).await.unwrap_err().to_string();
        assert!(exhausted.contains("no queued response"), "got: {exhausted}");
    }

    /// A "hang" outcome sleeps for the full duration before it errors.
    #[tokio::test(start_paused = true)]
    async fn open_stream_hang_sleeps_then_errors() {
        let net = MockSyncNetwork::default();
        let hang = Duration::from_secs(5);
        net.push_open_stream_hang(hang, "hung");
        let peer = PeerId::random();

        let started = time::Instant::now();
        let err = net.open_stream(peer).await.unwrap_err();
        assert!(started.elapsed() >= hang);
        assert_eq!(err.to_string(), "hung");
    }

    /// An outer timeout shorter than the hang wins the race.
    #[tokio::test(start_paused = true)]
    async fn open_stream_hang_is_interruptible_by_timeout() {
        let net = MockSyncNetwork::default();
        net.push_open_stream_hang(Duration::from_secs(30), "hung");
        let peer = PeerId::random();

        let outer = time::timeout(Duration::from_millis(100), net.open_stream(peer)).await;
        assert!(outer.is_err(), "expected timeout, got {outer:?}");
    }

    // ---- `assert_all_consumed`: passing cases ----------------------------

    /// Draining every `open_stream` outcome satisfies the check.
    #[tokio::test]
    async fn assert_all_consumed_passes_when_all_used() {
        let net = MockSyncNetwork::default();
        net.push_open_stream_err("first")
            .push_open_stream_err("second");
        let peer = PeerId::random();

        for _ in 0..2 {
            let _ = net.open_stream(peer).await;
        }
        net.assert_all_consumed();
    }

    /// One leftover shared entry is fine once the shared queue has been read.
    #[tokio::test]
    async fn assert_all_consumed_passes_with_sticky_last_shared_entry() {
        let net = MockSyncNetwork::default();
        net.push_subscribed_peers(vec![PeerId::random()]);

        let _ = net.subscribed_peers(topic_hash("x")).await;
        net.assert_all_consumed();
    }

    // ---- `assert_all_consumed`: failing cases ----------------------------

    /// A queued `open_stream` outcome that nobody popped is reported.
    #[tokio::test]
    #[should_panic(expected = "unconsumed `open_stream` responses")]
    async fn assert_all_consumed_panics_on_unused_open_stream() {
        let net = MockSyncNetwork::default();
        net.push_open_stream_err("never-popped");
        net.assert_all_consumed();
    }

    /// More than one leftover shared entry is reported.
    #[tokio::test]
    #[should_panic(expected = "unconsumed `subscribed_peers` responses")]
    async fn assert_all_consumed_panics_on_excess_shared_responses() {
        let net = MockSyncNetwork::default();
        let (one, two) = (PeerId::random(), PeerId::random());
        net.push_subscribed_peers(vec![one])
            .push_subscribed_peers(vec![two]);
        net.assert_all_consumed();
    }

    /// A seeded shared queue that was never read is reported.
    #[tokio::test]
    #[should_panic(expected = "never read it")]
    async fn assert_all_consumed_panics_on_shared_seeded_but_never_read() {
        let net = MockSyncNetwork::default();
        net.push_subscribed_peers(vec![PeerId::random()]);
        net.assert_all_consumed();
    }

    /// A per-topic read does not count as a read of the shared queue.
    #[tokio::test]
    #[should_panic(expected = "shared queue was seeded")]
    async fn assert_all_consumed_detects_shared_leak_despite_per_topic_read() {
        let net = MockSyncNetwork::default();
        let topic = topic_hash("ctx");
        net.push_subscribed_peers_for(topic.clone(), vec![PeerId::random()])
            .push_subscribed_peers(vec![PeerId::random()]);

        let _ = net.subscribed_peers(topic).await;
        net.assert_all_consumed();
    }

    /// More than one leftover per-topic entry is reported.
    #[tokio::test]
    #[should_panic(expected = "fewer calls than expected")]
    async fn assert_all_consumed_panics_on_excess_per_topic_responses() {
        let net = MockSyncNetwork::default();
        let topic = topic_hash("ctx");
        net.push_subscribed_peers_for(topic.clone(), vec![PeerId::random()])
            .push_subscribed_peers_for(topic, vec![PeerId::random()]);
        net.assert_all_consumed();
    }

    /// A seeded topic that was never queried is reported, even if another
    /// topic was.
    #[tokio::test]
    #[should_panic(expected = "never queried it")]
    async fn assert_all_consumed_panics_on_per_topic_seeded_but_never_queried() {
        let net = MockSyncNetwork::default();
        net.push_subscribed_peers_for(topic_hash("never"), vec![PeerId::random()]);

        let _ = net.subscribed_peers(topic_hash("other")).await;
        net.assert_all_consumed();
    }

    /// One leftover per-topic entry is fine once that topic has been queried.
    #[tokio::test]
    async fn assert_all_consumed_passes_with_sticky_last_per_topic_entry() {
        let net = MockSyncNetwork::default();
        let topic = topic_hash("ctx");
        net.push_subscribed_peers_for(topic.clone(), vec![PeerId::random()]);

        let _ = net.subscribed_peers(topic).await;
        net.assert_all_consumed();
    }
}