use std::collections::VecDeque;
use std::time::Duration;
use async_trait::async_trait;
use libp2p::gossipsub::TopicHash;
use libp2p::PeerId;
use parking_lot::Mutex;
use tokio::time;
use super::SyncNetwork;
pub(crate) enum OpenStreamResponse {
Err(String),
SleepThenErr(Duration, String),
}
#[derive(Default)]
pub(crate) struct MockSyncNetwork {
mesh_peers_responses: Mutex<VecDeque<Vec<PeerId>>>,
open_stream_responses: Mutex<VecDeque<OpenStreamResponse>>,
mesh_peers_calls: Mutex<u32>,
}
impl MockSyncNetwork {
pub(crate) fn push_mesh_peers(&self, peers: Vec<PeerId>) -> &Self {
self.mesh_peers_responses.lock().push_back(peers);
self
}
pub(crate) fn push_open_stream_err(&self, msg: impl Into<String>) -> &Self {
self.open_stream_responses
.lock()
.push_back(OpenStreamResponse::Err(msg.into()));
self
}
pub(crate) fn push_open_stream_hang(
&self,
sleep_for: Duration,
then_msg: impl Into<String>,
) -> &Self {
self.open_stream_responses
.lock()
.push_back(OpenStreamResponse::SleepThenErr(sleep_for, then_msg.into()));
self
}
#[track_caller]
pub(crate) fn assert_all_consumed(&self) {
let mesh_remaining = self.mesh_peers_responses.lock().len();
let mesh_calls = *self.mesh_peers_calls.lock();
let open_stream_remaining = self.open_stream_responses.lock().len();
if mesh_remaining > 1 {
panic!(
"MockSyncNetwork: {} unconsumed `mesh_peers` responses queued (sticky-last \
leaves 1 by design; >1 means the code under test made fewer calls than expected)",
mesh_remaining
);
}
if mesh_remaining > 0 && mesh_calls == 0 {
panic!(
"MockSyncNetwork: `mesh_peers` was seeded with {} entries but the code under \
test never called it",
mesh_remaining
);
}
if open_stream_remaining > 0 {
panic!(
"MockSyncNetwork: {} unconsumed `open_stream` responses queued",
open_stream_remaining
);
}
}
}
#[async_trait]
impl SyncNetwork for MockSyncNetwork {
async fn mesh_peers(&self, _topic: TopicHash) -> Vec<PeerId> {
*self.mesh_peers_calls.lock() += 1;
enum Take {
Empty,
Stick,
Pop(Vec<PeerId>),
}
let take = {
let mut queue = self.mesh_peers_responses.lock();
match queue.len() {
0 => Take::Empty,
1 => Take::Stick,
_ => Take::Pop(queue.pop_front().unwrap_or_default()),
}
};
match take {
Take::Empty => Vec::new(),
Take::Stick => {
self.mesh_peers_responses
.lock()
.front()
.cloned()
.unwrap_or_default()
}
Take::Pop(peers) => peers,
}
}
async fn open_stream(
&self,
_peer_id: PeerId,
) -> eyre::Result<calimero_network_primitives::stream::Stream> {
let response = self.open_stream_responses.lock().pop_front();
match response {
None => Err(eyre::eyre!(
"MockSyncNetwork: open_stream called with no queued response"
)),
Some(OpenStreamResponse::Err(msg)) => Err(eyre::eyre!(msg)),
Some(OpenStreamResponse::SleepThenErr(sleep_for, msg)) => {
time::sleep(sleep_for).await;
Err(eyre::eyre!(msg))
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn mesh_peers_returns_queued_value_then_repeats_last() {
let mock = MockSyncNetwork::default();
let peer_a = PeerId::random();
let peer_b = PeerId::random();
mock.push_mesh_peers(vec![peer_a])
.push_mesh_peers(vec![peer_b]);
let topic = TopicHash::from_raw("test");
assert_eq!(mock.mesh_peers(topic.clone()).await, vec![peer_a]);
assert_eq!(mock.mesh_peers(topic.clone()).await, vec![peer_b]);
assert_eq!(mock.mesh_peers(topic.clone()).await, vec![peer_b]);
assert_eq!(mock.mesh_peers(topic).await, vec![peer_b]);
}
#[tokio::test]
async fn mesh_peers_empty_when_never_seeded() {
let mock = MockSyncNetwork::default();
assert!(mock.mesh_peers(TopicHash::from_raw("x")).await.is_empty());
}
#[tokio::test]
async fn mesh_peers_sticky_last_at_len_1_boundary() {
let mock = MockSyncNetwork::default();
let peer_a = PeerId::random();
let peer_b = PeerId::random();
mock.push_mesh_peers(vec![peer_a])
.push_mesh_peers(vec![peer_b]);
let topic = TopicHash::from_raw("test");
assert_eq!(mock.mesh_peers(topic.clone()).await, vec![peer_a]);
assert_eq!(mock.mesh_peers(topic.clone()).await, vec![peer_b]);
assert_eq!(mock.mesh_peers(topic).await, vec![peer_b]);
}
#[tokio::test]
async fn open_stream_returns_queued_errors_then_default_after_exhaustion() {
let mock = MockSyncNetwork::default();
mock.push_open_stream_err("first")
.push_open_stream_err("second");
let peer = PeerId::random();
let e1 = mock.open_stream(peer).await.unwrap_err().to_string();
assert_eq!(e1, "first");
let e2 = mock.open_stream(peer).await.unwrap_err().to_string();
assert_eq!(e2, "second");
let e3 = mock.open_stream(peer).await.unwrap_err().to_string();
assert!(e3.contains("no queued response"), "got: {e3}");
}
#[tokio::test(start_paused = true)]
async fn open_stream_hang_sleeps_then_errors() {
let mock = MockSyncNetwork::default();
mock.push_open_stream_hang(Duration::from_secs(5), "hung");
let peer = PeerId::random();
let start = tokio::time::Instant::now();
let err = mock.open_stream(peer).await.unwrap_err();
assert!(start.elapsed() >= Duration::from_secs(5));
assert_eq!(err.to_string(), "hung");
}
#[tokio::test(start_paused = true)]
async fn open_stream_hang_is_interruptible_by_timeout() {
let mock = MockSyncNetwork::default();
mock.push_open_stream_hang(Duration::from_secs(30), "hung");
let peer = PeerId::random();
let outer = time::timeout(Duration::from_millis(100), mock.open_stream(peer)).await;
assert!(outer.is_err(), "expected timeout, got {outer:?}");
}
#[tokio::test]
async fn assert_all_consumed_passes_when_all_used() {
let mock = MockSyncNetwork::default();
mock.push_open_stream_err("first")
.push_open_stream_err("second");
let peer = PeerId::random();
let _ = mock.open_stream(peer).await;
let _ = mock.open_stream(peer).await;
mock.assert_all_consumed();
}
#[tokio::test]
async fn assert_all_consumed_passes_with_sticky_last_mesh_entry() {
let mock = MockSyncNetwork::default();
let peer = PeerId::random();
mock.push_mesh_peers(vec![peer]);
let _ = mock.mesh_peers(TopicHash::from_raw("x")).await;
mock.assert_all_consumed();
}
#[tokio::test]
#[should_panic(expected = "unconsumed `open_stream` responses")]
async fn assert_all_consumed_panics_on_unused_open_stream() {
let mock = MockSyncNetwork::default();
mock.push_open_stream_err("never-popped");
mock.assert_all_consumed();
}
#[tokio::test]
#[should_panic(expected = "unconsumed `mesh_peers` responses")]
async fn assert_all_consumed_panics_on_excess_mesh_peers() {
let mock = MockSyncNetwork::default();
let p1 = PeerId::random();
let p2 = PeerId::random();
mock.push_mesh_peers(vec![p1]).push_mesh_peers(vec![p2]);
mock.assert_all_consumed();
}
#[tokio::test]
#[should_panic(expected = "never called it")]
async fn assert_all_consumed_panics_on_mesh_peers_seeded_but_never_called() {
let mock = MockSyncNetwork::default();
mock.push_mesh_peers(vec![PeerId::random()]);
mock.assert_all_consumed();
}
}