use commonware_macros::stability_scope;
stability_scope!(ALPHA {
pub(crate) mod audited;
pub(crate) mod deterministic;
});
stability_scope!(BETA {
pub(crate) mod metered;
});
stability_scope!(BETA, cfg(all(not(target_arch = "wasm32"), not(feature = "iouring-network"))) {
pub(crate) mod tokio;
});
stability_scope!(ALPHA, cfg(all(not(target_arch = "wasm32"), feature = "iouring-network")) {
pub(crate) mod iouring;
});
#[cfg(test)]
mod tests {
use crate::{IoBuf, IoBufs, Listener, Sink, Stream};
use commonware_utils::sync::Barrier;
use futures::join;
use std::{net::SocketAddr, sync::Arc};
use tokio::task::JoinSet;
const CLIENT_SEND_DATA: &[u8] = b"client_send_data";
const SERVER_SEND_DATA: &[u8] = b"server_send_data";
pub(super) async fn test_network_trait<N, F>(new_network: F)
where
F: Fn() -> N,
N: crate::Network,
{
test_network_bind_and_dial(new_network()).await;
test_network_vectored_send(new_network()).await;
test_network_multiple_clients(new_network()).await;
test_network_large_data(new_network()).await;
test_network_connection_errors(new_network()).await;
test_network_peek(new_network()).await;
}
async fn test_network_bind_and_dial<N: crate::Network>(network: N) {
let mut listener = network
.bind(SocketAddr::from(([127, 0, 0, 1], 0)))
.await
.expect("Failed to bind");
let listener_addr = listener.local_addr().expect("Failed to get local address");
let server = tokio::spawn(async move {
let (_, mut sink, mut stream) = listener.accept().await.expect("Failed to accept");
let received = stream
.recv(CLIENT_SEND_DATA.len())
.await
.expect("Failed to receive");
assert_eq!(received.coalesce(), CLIENT_SEND_DATA);
sink.send(IoBuf::from(SERVER_SEND_DATA))
.await
.expect("Failed to send");
(sink, stream)
});
let client = tokio::spawn(async move {
let (mut sink, mut stream) = network
.dial(listener_addr)
.await
.expect("Failed to dial server");
sink.send(IoBuf::from(CLIENT_SEND_DATA))
.await
.expect("Failed to send data");
let received = stream
.recv(SERVER_SEND_DATA.len())
.await
.expect("Failed to receive data");
assert_eq!(received.coalesce(), SERVER_SEND_DATA);
(sink, stream)
});
let (server_result, client_result) = join!(server, client);
server_result.expect("Server task failed");
client_result.expect("Client task failed");
}
async fn test_network_vectored_send<N: crate::Network>(network: N) {
let mut listener = network
.bind(SocketAddr::from(([127, 0, 0, 1], 0)))
.await
.expect("Failed to bind");
let listener_addr = listener.local_addr().expect("Failed to get local address");
let message = IoBufs::from(vec![
IoBuf::from(b"client_".to_vec()),
IoBuf::from(b"vectored_".to_vec()),
IoBuf::from(b"send".to_vec()),
]);
let expected = message.clone().coalesce();
let server = tokio::spawn(async move {
let (_, sink, mut stream) = listener.accept().await.expect("Failed to accept");
let received = stream
.recv(expected.len())
.await
.expect("Failed to receive");
assert_eq!(received.coalesce(), expected.as_ref());
(sink, stream)
});
let client = tokio::spawn(async move {
let (mut sink, stream) = network
.dial(listener_addr)
.await
.expect("Failed to dial server");
sink.send(message).await.expect("Failed to send data");
(sink, stream)
});
let (server_result, client_result) = join!(server, client);
server_result.expect("Server task failed");
client_result.expect("Client task failed");
}
async fn test_network_multiple_clients<N: crate::Network>(network: N) {
const NUM_CLIENTS: usize = 3;
let mut listener = network
.bind(SocketAddr::from(([127, 0, 0, 1], 0)))
.await
.expect("Failed to bind");
let listener_addr = listener.local_addr().expect("Failed to get local address");
let barrier = Arc::new(Barrier::new(NUM_CLIENTS * 2));
let server_barrier = barrier.clone();
let server = tokio::spawn(async move {
let mut set = JoinSet::new();
for _ in 0..NUM_CLIENTS {
let (_, mut sink, mut stream) = listener.accept().await.expect("Failed to accept");
let barrier = server_barrier.clone();
set.spawn(async move {
let received = stream
.recv(CLIENT_SEND_DATA.len())
.await
.expect("Failed to receive");
assert_eq!(received.coalesce(), CLIENT_SEND_DATA);
sink.send(IoBuf::from(SERVER_SEND_DATA))
.await
.expect("Failed to send");
barrier.wait().await;
});
}
while let Some(result) = set.join_next().await {
result.expect("Server connection task failed");
}
});
let mut set = JoinSet::new();
for _ in 0..NUM_CLIENTS {
let network = network.clone();
let barrier = barrier.clone();
set.spawn(async move {
let (mut sink, mut stream) = network
.dial(listener_addr)
.await
.expect("Failed to dial server");
sink.send(IoBuf::from(CLIENT_SEND_DATA))
.await
.expect("Failed to send data");
let received = stream
.recv(SERVER_SEND_DATA.len())
.await
.expect("Failed to receive data");
assert_eq!(received.coalesce(), SERVER_SEND_DATA);
barrier.wait().await;
});
}
while let Some(result) = set.join_next().await {
result.expect("Client task failed");
}
server.await.expect("Server task failed");
}
async fn test_network_large_data<N: crate::Network>(network: N) {
const NUM_CHUNKS: usize = 1_000;
const CHUNK_SIZE: usize = 8 * 1024;
let mut listener = network
.bind(SocketAddr::from(([127, 0, 0, 1], 0)))
.await
.expect("Failed to bind");
let listener_addr = listener.local_addr().expect("Failed to get local address");
let server = tokio::spawn(async move {
let (_, mut sink, mut stream) = listener.accept().await.expect("Failed to accept");
for _ in 0..NUM_CHUNKS {
let received = stream
.recv(CHUNK_SIZE)
.await
.expect("Failed to receive chunk");
sink.send(received).await.expect("Failed to send chunk");
}
(sink, stream)
});
let client = tokio::spawn(async move {
let (mut sink, mut stream) = network
.dial(listener_addr)
.await
.expect("Failed to dial server");
let pattern = (0..CHUNK_SIZE).map(|i| (i % 256) as u8).collect::<Vec<_>>();
for _ in 0..NUM_CHUNKS {
sink.send(pattern.clone())
.await
.expect("Failed to send chunk");
let received = stream
.recv(CHUNK_SIZE)
.await
.expect("Failed to receive chunk");
assert_eq!(received.coalesce(), &pattern[..]);
}
(sink, stream)
});
let (server_result, client_result) = join!(server, client);
server_result.expect("Server task failed");
client_result.expect("Client task failed");
}
async fn test_network_connection_errors<N: crate::Network>(network: N) {
let invalid_addr = SocketAddr::from(([127, 0, 0, 1], 1));
let result = network.dial(invalid_addr).await;
assert!(matches!(result, Err(crate::Error::ConnectionFailed)));
let listener = network
.bind(SocketAddr::from(([127, 0, 0, 1], 0)))
.await
.expect("Failed to bind");
let listener_addr = listener.local_addr().expect("Failed to get local address");
let result = network.bind(listener_addr).await;
assert!(matches!(result, Err(crate::Error::BindFailed)));
}
async fn test_network_peek<N: crate::Network>(network: N) {
const DATA: &[u8] = b"hello world - peek test data";
let mut listener = network
.bind(SocketAddr::from(([127, 0, 0, 1], 0)))
.await
.expect("Failed to bind");
let listener_addr = listener.local_addr().expect("Failed to get local address");
let server = tokio::spawn(async move {
let (_, mut sink, stream) = listener.accept().await.expect("Failed to accept");
sink.send(IoBuf::from(DATA)).await.expect("Failed to send");
(sink, stream)
});
let client = tokio::spawn(async move {
let (sink, mut stream) = network
.dial(listener_addr)
.await
.expect("Failed to dial server");
let first = stream.recv(5).await.expect("Failed to receive");
assert_eq!(first.coalesce(), b"hello");
let peeked = stream.peek(100);
assert!(!peeked.is_empty());
let peeked_again = stream.peek(100);
assert_eq!(peeked, peeked_again, "peek should be non-consuming");
if peeked.len() >= 3 {
let peeked_small = stream.peek(3);
assert_eq!(peeked_small.len(), 3);
assert_eq!(peeked_small, &peeked[..3]);
}
let rest_len = DATA.len() - 5;
let rest = stream.recv(rest_len).await.expect("Failed to receive");
assert_eq!(rest.coalesce(), &DATA[5..]);
let final_peek = stream.peek(100);
assert!(final_peek.is_empty());
(sink, stream)
});
let (server_result, client_result) = join!(server, client);
server_result.expect("Server task failed");
client_result.expect("Client task failed");
}
pub(super) async fn stress_test_network_trait<N, F>(new_network: F)
where
F: Fn() -> N,
N: crate::Network,
{
stress_concurrent_streams(new_network()).await;
}
async fn stress_concurrent_streams<N: crate::Network>(network: N) {
const NUM_CLIENTS: usize = 96;
const NUM_MESSAGES: usize = 16_384;
const MESSAGE_SIZE: usize = 4096;
let mut listener = network
.bind(SocketAddr::from(([127, 0, 0, 1], 0)))
.await
.unwrap();
let addr = listener.local_addr().unwrap();
let barrier = Arc::new(Barrier::new(NUM_CLIENTS * 2));
let server_barrier = barrier.clone();
let server = tokio::spawn(async move {
let mut set = JoinSet::new();
for _ in 0..NUM_CLIENTS {
let (_, mut sink, mut stream) = listener.accept().await.unwrap();
let barrier = server_barrier.clone();
set.spawn(async move {
for _ in 0..NUM_MESSAGES {
let received = stream.recv(MESSAGE_SIZE).await.unwrap();
sink.send(received).await.unwrap();
}
barrier.wait().await;
});
}
while let Some(result) = set.join_next().await {
result.unwrap();
}
});
let mut set = JoinSet::new();
for _ in 0..NUM_CLIENTS {
let network = network.clone();
let barrier = barrier.clone();
set.spawn(async move {
let (mut sink, mut stream) = network.dial(addr).await.unwrap();
let payload = vec![42u8; MESSAGE_SIZE];
for _ in 0..NUM_MESSAGES {
sink.send(payload.clone()).await.unwrap();
let received = stream.recv(MESSAGE_SIZE).await.unwrap();
assert_eq!(received.coalesce(), &payload[..]);
}
barrier.wait().await;
});
}
while let Some(result) = set.join_next().await {
result.unwrap();
}
server.await.unwrap();
}
}