use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use bytes::Bytes;
use kitsune2_api::{AgentId, Id, LocalAgent, MessageBlockCount, SpaceId};
use kitsune2_api::{
AgentInfoSigned, BlockTarget, BoxFut, Builder, DynSpace, DynTransport,
K2Result, SpaceHandler, TxBaseHandler, TxHandler, TxModuleHandler,
TxSpaceHandler, Url,
};
use kitsune2_core::factories::CoreGossipStubFactory;
use kitsune2_core::{Ed25519LocalAgent, Ed25519Verifier};
use kitsune2_test_utils::agent::AgentBuilder;
use kitsune2_test_utils::iter_check;
use kitsune2_test_utils::noop_bootstrap::NoopBootstrapFactory;
use kitsune2_test_utils::{enable_tracing, space::TEST_SPACE_ID};
#[cfg(feature = "transport-iroh")]
use kitsune2_transport_iroh::{
IrohTransportFactory,
config::{IrohTransportConfig, IrohTransportModConfig},
};
use std::sync::mpsc::{Receiver, Sender};
use tokio::sync::OnceCell;
#[derive(Debug)]
struct TestSpaceHandler {
recv_notify_sender: Sender<bytes::Bytes>,
}
impl TestSpaceHandler {
fn create() -> (Self, Receiver<bytes::Bytes>) {
let (send, recv) = std::sync::mpsc::channel();
(
Self {
recv_notify_sender: send,
},
recv,
)
}
}
impl SpaceHandler for TestSpaceHandler {
fn recv_notify(
&self,
_from_peer: Url,
_space_id: kitsune2_api::SpaceId,
data: bytes::Bytes,
) -> K2Result<()> {
self.recv_notify_sender.send(data).unwrap();
Ok(())
}
}
#[derive(Debug)]
pub struct TestTxHandler {
pub peer_url: std::sync::Mutex<Url>,
space: Arc<OnceCell<DynSpace>>,
recv_module_msg_sender: Sender<bytes::Bytes>,
peer_disconnect_sender: Sender<Url>,
}
impl TestTxHandler {
fn create(
peer_url: Mutex<Url>,
space: Arc<OnceCell<DynSpace>>,
) -> (Arc<Self>, Receiver<bytes::Bytes>, Receiver<Url>) {
let (recv_module_msg_sender, recv_module_msg_recv) =
std::sync::mpsc::channel();
let (peer_disconnect_sender, peer_disconnect_recv) =
std::sync::mpsc::channel();
(
Arc::new(Self {
peer_url,
space,
recv_module_msg_sender,
peer_disconnect_sender,
}),
recv_module_msg_recv,
peer_disconnect_recv,
)
}
}
impl TxModuleHandler for TestTxHandler {
fn recv_module_msg(
&self,
_peer: Url,
_space_id: kitsune2_api::SpaceId,
_module: String,
data: bytes::Bytes,
) -> K2Result<()> {
self.recv_module_msg_sender.send(data).unwrap();
Ok(())
}
}
impl TxBaseHandler for TestTxHandler {
fn new_listening_address(&self, this_url: Url) -> BoxFut<'static, ()> {
*(self.peer_url.lock().unwrap()) = this_url;
Box::pin(async {})
}
fn peer_disconnect(&self, peer: Url, _reason: Option<String>) {
if self.peer_disconnect_sender.send(peer).is_err() {
tracing::error!(
"Failed to send peer disconnect. This is okay if it happens at the end of a test if the receiver has been dropped before the connection got dropped."
);
};
}
}
impl TxHandler for TestTxHandler {
fn preflight_gather_outgoing(
&self,
_peer_url: Url,
) -> BoxFut<'_, K2Result<bytes::Bytes>> {
let space = self
.space
.get()
.expect("Space OnceCell has not been initialized in time.")
.clone();
Box::pin(async move {
let agents = space.peer_store().get_all().await.unwrap();
let agents_encoded: Vec<String> =
agents.into_iter().map(|a| a.encode().unwrap()).collect();
Ok(serde_json::to_vec(&agents_encoded).unwrap().into())
})
}
fn preflight_validate_incoming(
&self,
_peer_url: Url,
data: bytes::Bytes,
) -> BoxFut<'_, K2Result<()>> {
let agents_encoded: Vec<String> =
serde_json::from_slice(&data).unwrap();
let agents: Vec<Arc<AgentInfoSigned>> = agents_encoded
.iter()
.map(|a| {
AgentInfoSigned::decode(&Ed25519Verifier, a.as_bytes()).unwrap()
})
.collect();
let space = self
.space
.get()
.expect("Space OnceCell has not been initialized in time.")
.clone();
Box::pin(async move {
space.peer_store().insert(agents).await?;
Ok(())
})
}
}
#[cfg(feature = "transport-iroh")]
async fn builder_with_iroh() -> (
Arc<Builder>,
kitsune2_test_utils::bootstrap::TestBootstrapSrv,
) {
let bootstrap_server =
kitsune2_test_utils::bootstrap::TestBootstrapSrv::new(false).await;
let relay_url = format!("{}/relay", bootstrap_server.addr());
let builder = Builder {
transport: IrohTransportFactory::create(),
gossip: CoreGossipStubFactory::create(),
bootstrap: Arc::new(NoopBootstrapFactory {}),
..kitsune2_core::default_test_builder()
}
.with_default_config()
.unwrap();
builder
.config
.set_module_config(&IrohTransportModConfig {
iroh_transport: IrohTransportConfig {
relay_url: Some(relay_url),
..Default::default()
},
})
.unwrap();
(Arc::new(builder), bootstrap_server)
}
macro_rules! builder_with_relay {
() => {{
#[cfg(feature = "transport-iroh")]
{
builder_with_iroh().await
}
}};
}
pub struct TestPeer {
space: DynSpace,
transport: DynTransport,
peer_url: Url,
agent_id: AgentId,
agent_info: Arc<AgentInfoSigned>,
recv_notify_recv: Receiver<bytes::Bytes>,
recv_module_msg_recv: Receiver<bytes::Bytes>,
peer_disconnect_recv: Receiver<Url>,
}
pub async fn make_test_peer(builder: Arc<Builder>) -> TestPeer {
let space_once_cell = Arc::new(OnceCell::new());
let (tx_handler, recv_module_msg_recv, peer_disconnect_recv) =
TestTxHandler::create(
Mutex::new(
Url::from_str("ws://127.0.0.1:80").unwrap(),
),
space_once_cell.clone(),
);
let transport = builder
.transport
.create(builder.clone(), tx_handler.clone())
.await
.unwrap();
transport.register_module_handler(TEST_SPACE_ID, "test".into(), tx_handler);
let peer_url = iter_check!(5000, 100, {
let stats = transport.dump_network_stats().await.unwrap();
let peer_url = stats.transport_stats.peer_urls.first();
if let Some(url) = peer_url {
return url.clone();
}
});
let (space_handler, recv_notify_recv) = TestSpaceHandler::create();
let report = builder
.report
.create(builder.clone(), transport.clone())
.await
.unwrap();
let space = builder
.space
.create(
builder.clone(),
None,
Arc::new(space_handler),
TEST_SPACE_ID,
report,
transport.clone(),
)
.await
.unwrap();
let agent_info =
join_new_local_agent_and_wait_for_agent_info(space.clone()).await;
space_once_cell.set(space.clone()).unwrap();
TestPeer {
space,
transport,
agent_id: agent_info.agent.clone(),
agent_info,
peer_url,
recv_notify_recv,
recv_module_msg_recv,
peer_disconnect_recv,
}
}
const TEST_SPACE_ID_1: SpaceId =
SpaceId(Id(Bytes::from_static(b"test_space_1")));
const TEST_SPACE_ID_2: SpaceId =
SpaceId(Id(Bytes::from_static(b"test_space_2")));
pub struct TestPeerLight {
space1: DynSpace,
space2: DynSpace,
dummy_agent_info_1: Arc<AgentInfoSigned>,
dummy_agent_info_2: Arc<AgentInfoSigned>,
transport: DynTransport,
peer_url: Url,
}
pub async fn make_test_peer_light(builder: Arc<Builder>) -> TestPeerLight {
#[derive(Debug)]
struct NoopHandler;
impl TxHandler for NoopHandler {}
impl TxBaseHandler for NoopHandler {}
impl SpaceHandler for NoopHandler {}
impl TxSpaceHandler for NoopHandler {
fn is_any_agent_at_url_blocked(
&self,
_peer_url: &Url,
) -> K2Result<bool> {
Ok(false)
}
}
let transport = builder
.transport
.create(builder.clone(), Arc::new(NoopHandler))
.await
.unwrap();
let peer_url = iter_check!(5000, {
let stats = transport.dump_network_stats().await.unwrap();
let peer_url = stats.transport_stats.peer_urls.first();
if let Some(url) = peer_url {
return url.clone();
}
});
let report = builder
.report
.create(builder.clone(), transport.clone())
.await
.unwrap();
let space1 = builder
.space
.create(
builder.clone(),
None,
Arc::new(NoopHandler),
TEST_SPACE_ID_1,
report.clone(),
transport.clone(),
)
.await
.unwrap();
let space2 = builder
.space
.create(
builder.clone(),
None,
Arc::new(NoopHandler),
TEST_SPACE_ID_2,
report,
transport.clone(),
)
.await
.unwrap();
let local_agent_1 = Ed25519LocalAgent::default();
let dummy_agent_info_1 = AgentBuilder::default()
.with_space(TEST_SPACE_ID_1)
.with_url(Some(peer_url.clone()))
.build(local_agent_1);
let local_agent_2 = Ed25519LocalAgent::default();
let dummy_agent_info_2 = AgentBuilder::default()
.with_space(TEST_SPACE_ID_2)
.with_url(Some(peer_url.clone()))
.build(local_agent_2);
TestPeerLight {
space1,
space2,
dummy_agent_info_1,
dummy_agent_info_2,
transport,
peer_url,
}
}
async fn block_agent_in_space(agent: AgentId, space: DynSpace) {
let block_targets = vec![BlockTarget::Agent(agent.clone())];
let any_blocked = space
.blocks()
.is_any_blocked(block_targets.clone())
.await
.unwrap();
assert!(!any_blocked);
space
.blocks()
.block(BlockTarget::Agent(agent.clone()))
.await
.unwrap();
space.peer_store().remove(agent.clone()).await.unwrap();
let any_blocked =
space.blocks().is_any_blocked(block_targets).await.unwrap();
assert!(any_blocked);
}
async fn join_new_local_agent_and_wait_for_agent_info(
space: DynSpace,
) -> Arc<AgentInfoSigned> {
let local_agent = Arc::new(Ed25519LocalAgent::default());
space.local_agent_join(local_agent.clone()).await.unwrap();
let agent_id = local_agent.agent().clone();
let peer_store = space.peer_store().clone();
iter_check!(5000, 5, {
let agents = peer_store.get_all().await.unwrap();
if agents.iter().any(|a| a.agent == agent_id) {
break;
}
});
space
.peer_store()
.get(local_agent.agent().clone())
.await
.unwrap()
.unwrap()
}
#[tokio::test(flavor = "multi_thread")]
async fn incoming_message_block_count_increases_correctly() {
enable_tracing();
let (builder, _relay_server) = builder_with_relay!();
let TestPeerLight {
space1: space_alice_1,
space2: space_alice_2,
transport: transport_alice,
peer_url: peer_url_alice,
..
} = make_test_peer_light(builder.clone()).await;
let TestPeerLight {
space1: space_bob_1,
space2: _space_bob_2, transport: transport_bob,
peer_url: peer_url_bob,
..
} = make_test_peer_light(builder.clone()).await;
space_bob_1
.local_agent_store()
.add(Arc::new(Ed25519LocalAgent::default()))
.await
.unwrap();
let TestPeerLight {
space1: space_carol_1,
space2: space_carol_2,
dummy_agent_info_1: dummy_agent_info_carol_1,
dummy_agent_info_2: dummy_agent_info_carol_2,
transport: transport_carol,
peer_url: peer_url_carol,
} = make_test_peer_light(builder.clone()).await;
space_carol_1
.local_agent_store()
.add(Arc::new(Ed25519LocalAgent::default()))
.await
.unwrap();
space_carol_2
.local_agent_store()
.add(Arc::new(Ed25519LocalAgent::default()))
.await
.unwrap();
space_alice_1
.local_agent_store()
.add(Arc::new(Ed25519LocalAgent::default()))
.await
.unwrap();
space_alice_2
.local_agent_store()
.add(Arc::new(Ed25519LocalAgent::default()))
.await
.unwrap();
space_alice_1
.peer_store()
.insert(vec![dummy_agent_info_carol_1.clone()])
.await
.unwrap();
space_alice_2
.peer_store()
.insert(vec![dummy_agent_info_carol_2.clone()])
.await
.unwrap();
let stats = transport_carol.dump_network_stats().await.unwrap();
assert_eq!(stats.blocked_message_counts.len(), 0);
transport_alice
.send_space_notify(
peer_url_carol.clone(),
TEST_SPACE_ID_1,
Bytes::new(),
)
.await
.unwrap();
let expected_blocked_message_counts: HashMap<
Url,
HashMap<SpaceId, MessageBlockCount>,
> = [(
peer_url_alice.clone(),
[(
TEST_SPACE_ID_1,
MessageBlockCount {
incoming: 1,
outgoing: 0,
},
)]
.into(),
)]
.into();
iter_check!(500, {
let net_stats = transport_carol.dump_network_stats().await.unwrap();
if net_stats.blocked_message_counts == expected_blocked_message_counts {
break;
}
});
transport_alice
.send_space_notify(
peer_url_carol.clone(),
TEST_SPACE_ID_1,
Bytes::new(),
)
.await
.unwrap();
let expected_blocked_message_counts: HashMap<
Url,
HashMap<SpaceId, MessageBlockCount>,
> = [(
peer_url_alice.clone(),
[(
TEST_SPACE_ID_1,
MessageBlockCount {
incoming: 2,
outgoing: 0,
},
)]
.into(),
)]
.into();
iter_check!(500, {
let net_stats = transport_carol.dump_network_stats().await.unwrap();
if net_stats.blocked_message_counts == expected_blocked_message_counts {
break;
}
});
transport_alice
.send_space_notify(
peer_url_carol.clone(),
TEST_SPACE_ID_2,
Bytes::new(),
)
.await
.unwrap();
let expected_blocked_message_counts: HashMap<
Url,
HashMap<SpaceId, MessageBlockCount>,
> = [(
peer_url_alice.clone(),
[
(
TEST_SPACE_ID_1,
MessageBlockCount {
incoming: 2,
outgoing: 0,
},
),
(
TEST_SPACE_ID_2,
MessageBlockCount {
incoming: 1,
outgoing: 0,
},
),
]
.into(),
)]
.into();
iter_check!(500, {
let net_stats = transport_carol.dump_network_stats().await.unwrap();
if net_stats.blocked_message_counts == expected_blocked_message_counts {
break;
}
});
space_bob_1
.peer_store()
.insert(vec![dummy_agent_info_carol_1.clone()])
.await
.unwrap();
transport_bob
.send_space_notify(
peer_url_carol.clone(),
TEST_SPACE_ID_1,
Bytes::new(),
)
.await
.unwrap();
let expected_blocked_message_counts: HashMap<
Url,
HashMap<SpaceId, MessageBlockCount>,
> = [
(
peer_url_alice.clone(),
[
(
TEST_SPACE_ID_1,
MessageBlockCount {
incoming: 2,
outgoing: 0,
},
),
(
TEST_SPACE_ID_2,
MessageBlockCount {
incoming: 1,
outgoing: 0,
},
),
]
.into(),
),
(
peer_url_bob,
[(
TEST_SPACE_ID_1,
MessageBlockCount {
incoming: 1,
outgoing: 0,
},
)]
.into(),
),
]
.into();
iter_check!(500, {
let net_stats = transport_carol.dump_network_stats().await.unwrap();
if net_stats.blocked_message_counts == expected_blocked_message_counts {
break;
}
});
}
#[tokio::test(flavor = "multi_thread")]
async fn outgoing_message_block_count_increases_correctly() {
enable_tracing();
let (builder, _relay_server) = builder_with_relay!();
let TestPeerLight {
space1: space_alice_1, space2: space_alice_2, transport: transport_alice,
..
} = make_test_peer_light(builder.clone()).await;
let TestPeerLight {
space1: _space_bob_1, space2: _space_bob_2, peer_url: peer_url_bob,
..
} = make_test_peer_light(builder.clone()).await;
let TestPeerLight {
space1: _space_carol_1, space2: _space_carol_2, peer_url: peer_url_carol,
..
} = make_test_peer_light(builder.clone()).await;
space_alice_1
.local_agent_store()
.add(Arc::new(Ed25519LocalAgent::default()))
.await
.unwrap();
space_alice_2
.local_agent_store()
.add(Arc::new(Ed25519LocalAgent::default()))
.await
.unwrap();
let stats = transport_alice.dump_network_stats().await.unwrap();
assert_eq!(stats.blocked_message_counts.len(), 0);
transport_alice
.send_space_notify(
peer_url_carol.clone(),
TEST_SPACE_ID_1,
Bytes::new(),
)
.await
.unwrap();
let expected_blocked_message_counts: HashMap<
Url,
HashMap<SpaceId, MessageBlockCount>,
> = [(
peer_url_carol.clone(),
[(
TEST_SPACE_ID_1,
MessageBlockCount {
incoming: 0,
outgoing: 1,
},
)]
.into(),
)]
.into();
iter_check!(500, {
let net_stats = transport_alice.dump_network_stats().await.unwrap();
if net_stats.blocked_message_counts == expected_blocked_message_counts {
break;
}
});
transport_alice
.send_space_notify(
peer_url_carol.clone(),
TEST_SPACE_ID_1,
Bytes::new(),
)
.await
.unwrap();
let expected_blocked_message_counts: HashMap<
Url,
HashMap<SpaceId, MessageBlockCount>,
> = [(
peer_url_carol.clone(),
[(
TEST_SPACE_ID_1,
MessageBlockCount {
incoming: 0,
outgoing: 2,
},
)]
.into(),
)]
.into();
iter_check!(500, {
let net_stats = transport_alice.dump_network_stats().await.unwrap();
if net_stats.blocked_message_counts == expected_blocked_message_counts {
break;
}
});
transport_alice
.send_space_notify(
peer_url_carol.clone(),
TEST_SPACE_ID_2,
Bytes::new(),
)
.await
.unwrap();
let expected_blocked_message_counts: HashMap<
Url,
HashMap<SpaceId, MessageBlockCount>,
> = [(
peer_url_carol.clone(),
[
(
TEST_SPACE_ID_1,
MessageBlockCount {
incoming: 0,
outgoing: 2,
},
),
(
TEST_SPACE_ID_2,
MessageBlockCount {
incoming: 0,
outgoing: 1,
},
),
]
.into(),
)]
.into();
iter_check!(500, {
let net_stats = transport_alice.dump_network_stats().await.unwrap();
if net_stats.blocked_message_counts == expected_blocked_message_counts {
break;
}
});
transport_alice
.send_space_notify(peer_url_bob.clone(), TEST_SPACE_ID_1, Bytes::new())
.await
.unwrap();
let expected_blocked_message_counts: HashMap<
Url,
HashMap<SpaceId, MessageBlockCount>,
> = [
(
peer_url_carol.clone(),
[
(
TEST_SPACE_ID_1,
MessageBlockCount {
incoming: 0,
outgoing: 2,
},
),
(
TEST_SPACE_ID_2,
MessageBlockCount {
incoming: 0,
outgoing: 1,
},
),
]
.into(),
),
(
peer_url_bob,
[(
TEST_SPACE_ID_1,
MessageBlockCount {
incoming: 0,
outgoing: 1,
},
)]
.into(),
),
]
.into();
iter_check!(500, {
let net_stats = transport_alice.dump_network_stats().await.unwrap();
if net_stats.blocked_message_counts == expected_blocked_message_counts {
break;
}
});
}
#[tokio::test(flavor = "multi_thread")]
async fn incoming_notify_messages_from_blocked_peers_are_dropped() {
enable_tracing();
let (builder, _relay_server) = builder_with_relay!();
let TestPeer {
space: space_alice,
transport: transport_alice,
peer_url: peer_url_alice,
agent_id: agent_id_alice,
peer_disconnect_recv: peer_disconnect_recv_alice,
..
} = make_test_peer(builder.clone()).await;
let TestPeer {
space: space_bob,
transport: transport_bob,
peer_url: peer_url_bob,
agent_info: agent_info_bob,
recv_notify_recv: recv_notify_recv_bob,
peer_disconnect_recv: _peer_disconnect_recv_bob,
..
} = make_test_peer(builder).await;
space_alice
.peer_store()
.insert(vec![agent_info_bob])
.await
.unwrap();
let payload = Bytes::from("Hello world");
transport_alice
.send_space_notify(peer_url_bob.clone(), TEST_SPACE_ID, payload.clone())
.await
.unwrap();
let payload_received = recv_notify_recv_bob
.recv_timeout(std::time::Duration::from_secs(2))
.expect("timed out waiting for space message");
assert_eq!(payload, payload_received);
let net_stats_bob = transport_bob.dump_network_stats().await.unwrap();
assert_eq!(net_stats_bob.transport_stats.connections.len(), 1);
assert_eq!(net_stats_bob.blocked_message_counts.len(), 0);
let agents_in_peer_store = space_bob.peer_store().get_all().await.unwrap();
assert_eq!(agents_in_peer_store.len(), 2);
block_agent_in_space(agent_id_alice, space_bob.clone()).await;
transport_alice
.send_space_notify(
peer_url_bob.clone(),
TEST_SPACE_ID,
Bytes::from("Sending to blocker"),
)
.await
.unwrap();
iter_check!(500, {
let net_stats = transport_bob.dump_network_stats().await.unwrap();
if net_stats.blocked_message_counts.len() == 1 {
break;
}
});
transport_bob.disconnect(peer_url_alice.clone(), None).await;
iter_check!(500, {
if let Ok(peer_url) = peer_disconnect_recv_alice.try_recv()
&& peer_url == peer_url_bob
{
break;
}
});
assert!(
recv_notify_recv_bob
.recv_timeout(std::time::Duration::from_millis(50))
.is_err()
);
let alice_local_agent_2 = Arc::new(Ed25519LocalAgent::default());
space_alice
.local_agent_join(alice_local_agent_2.clone())
.await
.unwrap();
transport_alice
.send_space_notify(
peer_url_bob.clone(),
TEST_SPACE_ID,
Bytes::from("Should still be blocked"),
)
.await
.unwrap();
iter_check!(500, {
let net_stats = transport_bob.dump_network_stats().await.unwrap();
if let Some(space_blocks) =
net_stats.blocked_message_counts.get(&peer_url_alice)
&& let Some(c) = space_blocks.get(&TEST_SPACE_ID)
&& c.incoming >= 2
{
break;
}
});
assert!(
recv_notify_recv_bob
.recv_timeout(std::time::Duration::from_millis(200))
.is_err()
);
}
#[tokio::test(flavor = "multi_thread")]
async fn incoming_module_messages_from_blocked_peers_are_dropped() {
enable_tracing();
let (builder, _relay_server) = builder_with_relay!();
let TestPeer {
space: space_alice,
transport: transport_alice,
peer_url: peer_url_alice,
agent_id: agent_id_alice,
peer_disconnect_recv: peer_disconnect_recv_alice,
..
} = make_test_peer(builder.clone()).await;
let TestPeer {
space: space_bob,
transport: transport_bob,
peer_url: peer_url_bob,
agent_info: agent_info_bob,
recv_module_msg_recv: recv_module_msg_recv_bob,
peer_disconnect_recv: _peer_disconnect_recv_bob,
..
} = make_test_peer(builder).await;
space_alice
.peer_store()
.insert(vec![agent_info_bob])
.await
.unwrap();
let payload_module = Bytes::from("Hello module world");
transport_alice
.send_module(
peer_url_bob.clone(),
TEST_SPACE_ID,
"test".into(),
payload_module.clone(),
)
.await
.unwrap();
let payload_module_received = recv_module_msg_recv_bob
.recv_timeout(Duration::from_secs(2))
.expect("timed out waiting for module message");
assert_eq!(payload_module, payload_module_received);
let net_stats_bob = transport_bob.dump_network_stats().await.unwrap();
assert_eq!(net_stats_bob.transport_stats.connections.len(), 1);
assert_eq!(net_stats_bob.blocked_message_counts.len(), 0);
let agents_in_peer_store = space_bob.peer_store().get_all().await.unwrap();
assert_eq!(agents_in_peer_store.len(), 2);
block_agent_in_space(agent_id_alice, space_bob.clone()).await;
transport_alice
.send_module(
peer_url_bob.clone(),
TEST_SPACE_ID,
"test".into(),
Bytes::from("Sending to blocker"),
)
.await
.unwrap();
iter_check!(500, {
let net_stats = transport_bob.dump_network_stats().await.unwrap();
if net_stats.blocked_message_counts.len() == 1 {
break;
}
});
transport_bob.disconnect(peer_url_alice.clone(), None).await;
iter_check!(500, {
if let Ok(peer_url) = peer_disconnect_recv_alice.try_recv()
&& peer_url == peer_url_bob
{
break;
}
});
assert!(
recv_module_msg_recv_bob
.recv_timeout(std::time::Duration::from_millis(50))
.is_err()
);
let alice_local_agent_2 = Arc::new(Ed25519LocalAgent::default());
space_alice
.local_agent_join(alice_local_agent_2.clone())
.await
.unwrap();
transport_alice
.send_module(
peer_url_bob.clone(),
TEST_SPACE_ID,
"test".into(),
Bytes::from("Should still be blocked"),
)
.await
.unwrap();
iter_check!(500, {
let net_stats = transport_bob.dump_network_stats().await.unwrap();
if let Some(space_blocks) =
net_stats.blocked_message_counts.get(&peer_url_alice)
&& let Some(c) = space_blocks.get(&TEST_SPACE_ID)
&& c.incoming >= 2
{
break;
}
});
assert!(
recv_module_msg_recv_bob
.recv_timeout(std::time::Duration::from_millis(200))
.is_err()
);
}
#[tokio::test(flavor = "multi_thread")]
async fn outgoing_notify_messages_to_blocked_peers_are_dropped() {
enable_tracing();
let (builder, _relay_server) = builder_with_relay!();
let TestPeer {
space: space_alice,
transport: transport_alice,
peer_url: peer_url_alice,
peer_disconnect_recv: peer_disconnect_recv_alice,
..
} = make_test_peer(builder.clone()).await;
let TestPeer {
space: space_bob,
transport: transport_bob,
peer_url: peer_url_bob,
agent_id: agent_id_bob,
agent_info: agent_info_bob,
recv_notify_recv: recv_notify_recv_bob,
peer_disconnect_recv: _peer_disconnect_recv_bob,
..
} = make_test_peer(builder).await;
space_alice
.peer_store()
.insert(vec![agent_info_bob.clone()])
.await
.unwrap();
let payload = Bytes::from("Hello world");
transport_alice
.send_space_notify(peer_url_bob.clone(), TEST_SPACE_ID, payload.clone())
.await
.unwrap();
let payload_received = recv_notify_recv_bob
.recv_timeout(std::time::Duration::from_secs(2))
.expect("timed out waiting for space message");
assert_eq!(payload, payload_received);
let agents_in_peer_store =
space_alice.peer_store().get_all().await.unwrap();
assert_eq!(agents_in_peer_store.len(), 2);
block_agent_in_space(agent_id_bob, space_alice.clone()).await;
transport_alice
.send_space_notify(
peer_url_bob.clone(),
TEST_SPACE_ID,
Bytes::from("Sending to blocker"),
)
.await
.unwrap();
iter_check!(500, {
let net_stats = transport_alice.dump_network_stats().await.unwrap();
if let Some(space_blocks) =
net_stats.blocked_message_counts.get(&peer_url_bob)
&& let Some(c) = space_blocks.get(&TEST_SPACE_ID)
&& c.outgoing == 1
{
break;
}
});
transport_bob.disconnect(peer_url_alice.clone(), None).await;
iter_check!(500, {
if let Ok(peer_url) = peer_disconnect_recv_alice.try_recv()
&& peer_url == peer_url_bob
{
break;
}
});
assert!(
recv_notify_recv_bob
.recv_timeout(std::time::Duration::from_millis(50))
.is_err()
);
let agent_info_bob_2 =
join_new_local_agent_and_wait_for_agent_info(space_bob).await;
space_alice
.peer_store()
.insert(vec![agent_info_bob_2])
.await
.unwrap();
transport_alice
.send_space_notify(
peer_url_bob.clone(),
TEST_SPACE_ID,
Bytes::from("Should still be blocked"),
)
.await
.unwrap();
iter_check!(500, {
let net_stats = transport_alice.dump_network_stats().await.unwrap();
if let Some(space_blocks) =
net_stats.blocked_message_counts.get(&peer_url_bob)
&& let Some(c) = space_blocks.get(&TEST_SPACE_ID)
&& c.outgoing == 2
{
break;
}
});
assert!(
recv_notify_recv_bob
.recv_timeout(std::time::Duration::from_millis(200))
.is_err()
);
}
#[tokio::test(flavor = "multi_thread")]
async fn outgoing_module_messages_to_blocked_peers_are_dropped() {
enable_tracing();
let (builder, _relay_server) = builder_with_relay!();
let TestPeer {
space: space_alice,
transport: transport_alice,
peer_url: peer_url_alice,
peer_disconnect_recv: peer_disconnect_recv_alice,
..
} = make_test_peer(builder.clone()).await;
let TestPeer {
space: space_bob,
transport: transport_bob,
peer_url: peer_url_bob,
agent_id: agent_id_bob,
agent_info: agent_info_bob,
recv_module_msg_recv: recv_module_msg_recv_bob,
peer_disconnect_recv: _peer_disconnect_recv_bob,
..
} = make_test_peer(builder).await;
space_alice
.peer_store()
.insert(vec![agent_info_bob.clone()])
.await
.unwrap();
let payload = Bytes::from("Hello module world");
transport_alice
.send_module(
peer_url_bob.clone(),
TEST_SPACE_ID,
"test".into(),
payload.clone(),
)
.await
.unwrap();
let payload_received = recv_module_msg_recv_bob
.recv_timeout(std::time::Duration::from_secs(2))
.expect("timed out waiting for module message");
assert_eq!(payload, payload_received);
let agents_in_peer_store =
space_alice.peer_store().get_all().await.unwrap();
assert_eq!(agents_in_peer_store.len(), 2);
block_agent_in_space(agent_id_bob, space_alice.clone()).await;
transport_alice
.send_module(
peer_url_bob.clone(),
TEST_SPACE_ID,
"test".into(),
Bytes::from("Sending to blocker"),
)
.await
.unwrap();
iter_check!(500, {
let net_stats = transport_alice.dump_network_stats().await.unwrap();
if let Some(space_blocks) =
net_stats.blocked_message_counts.get(&peer_url_bob)
&& let Some(c) = space_blocks.get(&TEST_SPACE_ID)
&& c.outgoing == 1
{
break;
}
});
transport_bob.disconnect(peer_url_alice, None).await;
iter_check!(500, {
if let Ok(peer_url) = peer_disconnect_recv_alice.try_recv()
&& peer_url == peer_url_bob
{
break;
}
});
assert!(
recv_module_msg_recv_bob
.recv_timeout(std::time::Duration::from_millis(50))
.is_err()
);
let agent_info_bob_2 =
join_new_local_agent_and_wait_for_agent_info(space_bob).await;
space_alice
.peer_store()
.insert(vec![agent_info_bob_2])
.await
.unwrap();
transport_alice
.send_module(
peer_url_bob.clone(),
TEST_SPACE_ID,
"test".into(),
Bytes::from("Should still be blocked"),
)
.await
.unwrap();
iter_check!(500, {
let net_stats = transport_alice.dump_network_stats().await.unwrap();
if let Some(space_blocks) =
net_stats.blocked_message_counts.get(&peer_url_bob)
&& let Some(c) = space_blocks.get(&TEST_SPACE_ID)
&& c.outgoing == 2
{
break;
}
});
assert!(
recv_module_msg_recv_bob
.recv_timeout(std::time::Duration::from_millis(200))
.is_err()
);
}
#[tokio::test(flavor = "multi_thread")]
async fn send_before_local_agent_join_returns_error() {
enable_tracing();
let (builder, _relay_server) = builder_with_relay!();
let space_once_cell_alice = Arc::new(OnceCell::new());
let (
tx_handler_alice,
_recv_module_msg_recv_alice,
_peer_disconnect_recv_alice,
) = TestTxHandler::create(
Mutex::new(Url::from_str("ws://127.0.0.1:80").unwrap()),
space_once_cell_alice.clone(),
);
let transport_alice = builder
.transport
.create(builder.clone(), tx_handler_alice.clone())
.await
.unwrap();
transport_alice.register_module_handler(
TEST_SPACE_ID,
"test".into(),
tx_handler_alice,
);
let (space_handler_alice, _recv_notify_recv_alice) =
TestSpaceHandler::create();
let report_alice = builder
.report
.create(builder.clone(), transport_alice.clone())
.await
.unwrap();
let space_alice = builder
.space
.create(
builder.clone(),
None,
Arc::new(space_handler_alice),
TEST_SPACE_ID,
report_alice,
transport_alice.clone(),
)
.await
.unwrap();
space_once_cell_alice.set(space_alice.clone()).unwrap();
let bob = make_test_peer(builder.clone()).await;
space_alice
.peer_store()
.insert(vec![bob.agent_info.clone()])
.await
.unwrap();
let result = transport_alice
.send_space_notify(
bob.peer_url.clone(),
TEST_SPACE_ID,
Bytes::from("Hello from Alice"),
)
.await;
assert!(
result.is_err(),
"Expected error when sending before local agent joins, but send succeeded"
);
#[cfg(feature = "transport-iroh")]
{
let local_agent_alice = Arc::new(Ed25519LocalAgent::default());
space_alice
.local_agent_join(local_agent_alice.clone())
.await
.unwrap();
let agent_id = local_agent_alice.agent().clone();
let peer_store = space_alice.peer_store().clone();
iter_check!(5000, 5, {
let agents = peer_store.get_all().await.unwrap();
if agents.iter().any(|a| a.agent == agent_id) {
break;
}
});
transport_alice
.send_space_notify(
bob.peer_url.clone(),
TEST_SPACE_ID,
Bytes::from("Hello after join"),
)
.await
.unwrap();
}
}
#[tokio::test(flavor = "multi_thread")]
async fn send_module_before_local_agent_join_returns_error() {
enable_tracing();
let (builder, _relay_server) = builder_with_relay!();
let space_once_cell_alice = Arc::new(OnceCell::new());
let (
tx_handler_alice,
_recv_module_msg_recv_alice,
_peer_disconnect_recv_alice,
) = TestTxHandler::create(
Mutex::new(Url::from_str("ws://127.0.0.1:80").unwrap()),
space_once_cell_alice.clone(),
);
let transport_alice = builder
.transport
.create(builder.clone(), tx_handler_alice.clone())
.await
.unwrap();
transport_alice.register_module_handler(
TEST_SPACE_ID,
"test".into(),
tx_handler_alice,
);
let (space_handler_alice, _recv_notify_recv_alice) =
TestSpaceHandler::create();
let report_alice = builder
.report
.create(builder.clone(), transport_alice.clone())
.await
.unwrap();
let space_alice = builder
.space
.create(
builder.clone(),
None,
Arc::new(space_handler_alice),
TEST_SPACE_ID,
report_alice,
transport_alice.clone(),
)
.await
.unwrap();
space_once_cell_alice.set(space_alice.clone()).unwrap();
let bob = make_test_peer(builder.clone()).await;
space_alice
.peer_store()
.insert(vec![bob.agent_info.clone()])
.await
.unwrap();
let result = transport_alice
.send_module(
bob.peer_url.clone(),
TEST_SPACE_ID,
"test".into(),
Bytes::from("Hello module from Alice"),
)
.await;
assert!(
result.is_err(),
"Expected error when sending module message before local agent joins, but send succeeded"
);
#[cfg(feature = "transport-iroh")]
{
let local_agent_alice = Arc::new(Ed25519LocalAgent::default());
space_alice
.local_agent_join(local_agent_alice.clone())
.await
.unwrap();
let agent_id = local_agent_alice.agent().clone();
let peer_store = space_alice.peer_store().clone();
iter_check!(5000, 5, {
let agents = peer_store.get_all().await.unwrap();
if agents.iter().any(|a| a.agent == agent_id) {
break;
}
});
transport_alice
.send_module(
bob.peer_url.clone(),
TEST_SPACE_ID,
"test".into(),
Bytes::from("Hello module after join"),
)
.await
.unwrap();
}
}