use commonware_macros::stability_scope;
// Crate-private submodules, each declared inside a `stability_scope!`
// invocation that names a stability level (ALPHA or BETA) and, for the last
// two, a `cfg` predicate.
// NOTE(review): what the stability level actually gates is decided by
// `commonware_macros::stability_scope`, whose definition is not visible here.
stability_scope!(ALPHA {
pub(crate) mod audited;
pub(crate) mod deterministic;
});
stability_scope!(BETA {
pub(crate) mod metered;
});
// Predicate: not wasm32 AND the `iouring-network` feature is disabled.
stability_scope!(BETA, cfg(all(not(target_arch = "wasm32"), not(feature = "iouring-network"))) {
pub(crate) mod tokio;
});
// Predicate: not wasm32 AND the `iouring-network` feature is enabled. This is
// mutually exclusive with the predicate used for `tokio` above.
stability_scope!(ALPHA, cfg(all(not(target_arch = "wasm32"), feature = "iouring-network")) {
pub(crate) mod iouring;
});
#[cfg(test)]
mod tests {
use crate::{IoBuf, IoBufs, Listener, Sink, Stream};
use commonware_macros::select;
use commonware_utils::sync::Barrier;
use futures::{join, FutureExt};
use std::{net::SocketAddr, sync::Arc, time::Duration};
use tokio::{sync::oneshot, task::JoinSet};
// Payload the client side sends in the request/response scenarios below.
const CLIENT_SEND_DATA: &[u8] = b"client_send_data";
// Payload the server side sends back in those same scenarios.
const SERVER_SEND_DATA: &[u8] = b"server_send_data";
/// Runs every conformance scenario in this module against `N`.
///
/// `new_network` is called once per scenario, so each scenario operates on the
/// value returned by its own call. Scenarios run one after another in the
/// order listed; the first failing scenario panics and stops the run.
pub(super) async fn test_network_trait<N: crate::Network, F: Fn() -> N>(new_network: F) {
    // Connectivity and plain data transfer.
    test_network_bind_and_dial(new_network()).await;
    test_network_vectored_send(new_network()).await;
    test_network_multiple_clients(new_network()).await;
    test_network_large_data(new_network()).await;

    // Error reporting for dial/bind.
    test_network_connection_errors(new_network()).await;

    // Non-consuming reads.
    test_network_peek(new_network()).await;

    // Poisoning of one half of a connection after cancellation or failure.
    test_network_canceled_recv_poisons_stream(new_network()).await;
    test_network_canceled_send_poisons_sink(new_network()).await;
    test_network_recv_error_poisons_stream(new_network()).await;
    test_network_send_error_poisons_sink(new_network()).await;
}
/// Exercises a single request/response round trip over a newly bound listener.
///
/// The server task accepts one connection, expects `CLIENT_SEND_DATA`, and
/// answers with `SERVER_SEND_DATA`. The client task dials the listener, sends
/// its payload, and checks the answer. Any failure panics through `expect` or
/// `assert_eq!`.
async fn test_network_bind_and_dial<N: crate::Network>(network: N) {
    // Bind to loopback port 0 and read the effective address back from the
    // listener so the client knows where to dial.
    let bind_addr = SocketAddr::from(([127, 0, 0, 1], 0));
    let mut listener = network.bind(bind_addr).await.expect("Failed to bind");
    let server_addr = listener.local_addr().expect("Failed to get local address");

    let server_task = tokio::spawn(async move {
        let (_, mut reply_sink, mut request_stream) =
            listener.accept().await.expect("Failed to accept");

        let request = request_stream
            .recv(CLIENT_SEND_DATA.len())
            .await
            .expect("Failed to receive");
        assert_eq!(request.coalesce(), CLIENT_SEND_DATA);

        let reply = IoBuf::from(SERVER_SEND_DATA);
        reply_sink.send(reply).await.expect("Failed to send");

        // Returned rather than dropped here, so both halves outlive the task body.
        (reply_sink, request_stream)
    });

    let client_task = tokio::spawn(async move {
        let (mut request_sink, mut reply_stream) = network
            .dial(server_addr)
            .await
            .expect("Failed to dial server");

        let request = IoBuf::from(CLIENT_SEND_DATA);
        request_sink
            .send(request)
            .await
            .expect("Failed to send data");

        let reply = reply_stream
            .recv(SERVER_SEND_DATA.len())
            .await
            .expect("Failed to receive data");
        assert_eq!(reply.coalesce(), SERVER_SEND_DATA);

        (request_sink, reply_stream)
    });

    // Wait for both sides before surfacing a panic from either task.
    let (server_outcome, client_outcome) = join!(server_task, client_task);
    server_outcome.expect("Server task failed");
    client_outcome.expect("Client task failed");
}
/// Sends one multi-buffer `IoBufs` message and checks the bytes that arrive.
///
/// The client sends three separate buffers in a single `send` call; the server
/// receives as many bytes as the coalesced message holds and compares them
/// with the coalesced form.
async fn test_network_vectored_send<N: crate::Network>(network: N) {
    let mut listener = network
        .bind(SocketAddr::from(([127, 0, 0, 1], 0)))
        .await
        .expect("Failed to bind");
    let server_addr = listener.local_addr().expect("Failed to get local address");

    // Build the message from three independently allocated buffers.
    let fragments: [&[u8]; 3] = [b"client_", b"vectored_", b"send"];
    let buffers = fragments
        .iter()
        .map(|fragment| IoBuf::from(fragment.to_vec()))
        .collect::<Vec<_>>();
    let message = IoBufs::from(buffers);
    // The single contiguous buffer the server should observe.
    let expected = message.clone().coalesce();

    let server_task = tokio::spawn(async move {
        let (_, sink, mut stream) = listener.accept().await.expect("Failed to accept");
        let payload = stream
            .recv(expected.len())
            .await
            .expect("Failed to receive");
        assert_eq!(payload.coalesce(), expected.as_ref());
        (sink, stream)
    });

    let client_task = tokio::spawn(async move {
        let (mut sink, stream) = network
            .dial(server_addr)
            .await
            .expect("Failed to dial server");
        sink.send(message).await.expect("Failed to send data");
        (sink, stream)
    });

    let (server_outcome, client_outcome) = join!(server_task, client_task);
    server_outcome.expect("Server task failed");
    client_outcome.expect("Client task failed");
}
/// Checks that one listener can serve several clients concurrently.
///
/// `NUM_CLIENTS` client tasks each perform one request/response exchange with
/// a dedicated server-side task. After its exchange, every client task and
/// every server-side task waits on a shared barrier sized for all of them.
async fn test_network_multiple_clients<N: crate::Network>(network: N) {
    const NUM_CLIENTS: usize = 3;

    // Wrapped in an `Arc` so each client task can dial through the same network.
    let network = Arc::new(network);
    let mut listener = network
        .bind(SocketAddr::from(([127, 0, 0, 1], 0)))
        .await
        .expect("Failed to bind");
    let server_addr = listener.local_addr().expect("Failed to get local address");

    // One barrier slot per client task plus one per server-side connection task.
    let rendezvous = Arc::new(Barrier::new(NUM_CLIENTS * 2));

    let acceptor_rendezvous = Arc::clone(&rendezvous);
    let acceptor = tokio::spawn(async move {
        let mut handlers = JoinSet::new();
        for _ in 0..NUM_CLIENTS {
            let (_, mut reply_sink, mut request_stream) =
                listener.accept().await.expect("Failed to accept");
            let rendezvous = Arc::clone(&acceptor_rendezvous);
            handlers.spawn(async move {
                let request = request_stream
                    .recv(CLIENT_SEND_DATA.len())
                    .await
                    .expect("Failed to receive");
                assert_eq!(request.coalesce(), CLIENT_SEND_DATA);
                reply_sink
                    .send(IoBuf::from(SERVER_SEND_DATA))
                    .await
                    .expect("Failed to send");
                rendezvous.wait().await;
            });
        }
        // Surface a panic from any per-connection task.
        while let Some(outcome) = handlers.join_next().await {
            outcome.expect("Server connection task failed");
        }
    });

    let mut clients = JoinSet::new();
    for _ in 0..NUM_CLIENTS {
        let network = Arc::clone(&network);
        let rendezvous = Arc::clone(&rendezvous);
        clients.spawn(async move {
            let (mut request_sink, mut reply_stream) = network
                .dial(server_addr)
                .await
                .expect("Failed to dial server");
            request_sink
                .send(IoBuf::from(CLIENT_SEND_DATA))
                .await
                .expect("Failed to send data");
            let reply = reply_stream
                .recv(SERVER_SEND_DATA.len())
                .await
                .expect("Failed to receive data");
            assert_eq!(reply.coalesce(), SERVER_SEND_DATA);
            rendezvous.wait().await;
        });
    }
    while let Some(outcome) = clients.join_next().await {
        outcome.expect("Client task failed");
    }

    acceptor.await.expect("Server task failed");
}
/// Echoes `NUM_CHUNKS` chunks of `CHUNK_SIZE` bytes through one connection.
///
/// The client sends a chunk and waits for it to come back before sending the
/// next; the server returns each received chunk unchanged. Every echoed chunk
/// is compared with the pattern that was sent.
async fn test_network_large_data<N: crate::Network>(network: N) {
    const NUM_CHUNKS: usize = 1_000;
    const CHUNK_SIZE: usize = 8 * 1024;

    let mut listener = network
        .bind(SocketAddr::from(([127, 0, 0, 1], 0)))
        .await
        .expect("Failed to bind");
    let server_addr = listener.local_addr().expect("Failed to get local address");

    let echo_server = tokio::spawn(async move {
        let (_, mut sink, mut stream) = listener.accept().await.expect("Failed to accept");
        for _ in 0..NUM_CHUNKS {
            let chunk = stream
                .recv(CHUNK_SIZE)
                .await
                .expect("Failed to receive chunk");
            sink.send(chunk).await.expect("Failed to send chunk");
        }
        (sink, stream)
    });

    let echo_client = tokio::spawn(async move {
        let (mut sink, mut stream) = network
            .dial(server_addr)
            .await
            .expect("Failed to dial server");

        // Byte values cycle 0..=255 across the chunk.
        let pattern: Vec<u8> = (0..CHUNK_SIZE).map(|offset| (offset % 256) as u8).collect();

        for _ in 0..NUM_CHUNKS {
            sink.send(pattern.clone())
                .await
                .expect("Failed to send chunk");
            let echoed = stream
                .recv(CHUNK_SIZE)
                .await
                .expect("Failed to receive chunk");
            assert_eq!(echoed.coalesce(), &pattern[..]);
        }
        (sink, stream)
    });

    let (server_outcome, client_outcome) = join!(echo_server, echo_client);
    server_outcome.expect("Server task failed");
    client_outcome.expect("Client task failed");
}
/// Checks the error variants reported for a failed dial and a failed bind.
///
/// * Dialing `127.0.0.1:1` must yield `Error::ConnectionFailed`
///   (presumably nothing listens on that port — confirm for the target
///   environment).
/// * Binding an address that a live listener already holds must yield
///   `Error::BindFailed`.
async fn test_network_connection_errors<N: crate::Network>(network: N) {
    // Failed dial.
    let result = network.dial(SocketAddr::from(([127, 0, 0, 1], 1))).await;
    assert!(matches!(result, Err(crate::Error::ConnectionFailed)));

    // Failed bind: `occupied` stays alive while the same address is bound again.
    let any_port = SocketAddr::from(([127, 0, 0, 1], 0));
    let occupied = network.bind(any_port).await.expect("Failed to bind");
    let taken_addr = occupied.local_addr().expect("Failed to get local address");
    let result = network.bind(taken_addr).await;
    assert!(matches!(result, Err(crate::Error::BindFailed)));
}
/// Checks that `Stream::peek` exposes buffered bytes without consuming them.
///
/// The server sends `DATA` once. The client consumes the first five bytes,
/// then peeks: the peek must be non-empty, repeatable, and a shorter peek must
/// be a prefix of a longer one. After the remaining bytes are received, a
/// final peek must be empty.
async fn test_network_peek<N: crate::Network>(network: N) {
    const DATA: &[u8] = b"hello world - peek test data";
    // Bytes consumed by the first `recv` (the length of "hello").
    const HEAD_LEN: usize = 5;

    let mut listener = network
        .bind(SocketAddr::from(([127, 0, 0, 1], 0)))
        .await
        .expect("Failed to bind");
    let server_addr = listener.local_addr().expect("Failed to get local address");

    let server_task = tokio::spawn(async move {
        let (_, mut sink, stream) = listener.accept().await.expect("Failed to accept");
        sink.send(IoBuf::from(DATA)).await.expect("Failed to send");
        (sink, stream)
    });

    let client_task = tokio::spawn(async move {
        let (sink, mut stream) = network
            .dial(server_addr)
            .await
            .expect("Failed to dial server");

        let head = stream.recv(HEAD_LEN).await.expect("Failed to receive");
        assert_eq!(head.coalesce(), b"hello");

        // The test expects leftover bytes to be visible right after the first recv.
        let peeked = stream.peek(100);
        assert!(!peeked.is_empty());

        // Peeking twice must return the same bytes.
        let second_look = stream.peek(100);
        assert_eq!(peeked, second_look, "peek should be non-consuming");

        // A bounded peek returns exactly the requested prefix when enough is buffered.
        if peeked.len() >= 3 {
            let short_look = stream.peek(3);
            assert_eq!(short_look.len(), 3);
            assert_eq!(short_look, &peeked[..3]);
        }

        // Drain everything that was not consumed by the first recv.
        let tail = stream
            .recv(DATA.len() - HEAD_LEN)
            .await
            .expect("Failed to receive");
        assert_eq!(tail.coalesce(), &DATA[HEAD_LEN..]);

        let final_peek = stream.peek(100);
        assert!(final_peek.is_empty());
        (sink, stream)
    });

    let (server_outcome, client_outcome) = join!(server_task, client_task);
    server_outcome.expect("Server task failed");
    client_outcome.expect("Client task failed");
}
/// Checks that abandoning an in-flight `recv` leaves the stream reporting
/// `Error::Closed`, while the sink of the same connection keeps working.
async fn test_network_canceled_recv_poisons_stream<N: crate::Network>(network: N) {
let mut listener = network
.bind(SocketAddr::from(([127, 0, 0, 1], 0)))
.await
.expect("Failed to bind");
let listener_addr = listener.local_addr().expect("Failed to get local address");
let server = tokio::spawn(async move {
let (_, mut sink, mut stream) = listener.accept().await.expect("Failed to accept");
// Race a `recv` against a 5ms timer. The client in this test never sends,
// so the `recv` arm completing is treated as a failure; the timer arm is
// the expected winner.
// NOTE(review): presumably `select!` drops the unfinished `recv` future
// when the timer arm wins, which is the cancellation under test — confirm.
select! {
v = stream.recv(100) => {
panic!("unexpected value: {v:?}");
},
_ = tokio::time::sleep(Duration::from_millis(5)) => {},
};
// Any later recv on the same stream must now fail with `Closed`.
assert!(matches!(stream.recv(1).await, Err(crate::Error::Closed)));
// The send half must be unaffected by the abandoned recv.
sink.send(IoBuf::from(b"ok"))
.await
.expect("sink should remain usable after stream cancellation");
(sink, stream)
});
let client = tokio::spawn(async move {
// The client only reads: it waits for the two-byte "ok" sent by the server.
let (sink, mut stream) = network
.dial(listener_addr)
.await
.expect("Failed to dial server");
let received = stream.recv(2).await.expect("Failed to receive response");
assert_eq!(received.coalesce(), b"ok");
(sink, stream)
});
// Wait for both tasks, then surface a panic from either one.
let (server_result, client_result) = join!(server, client);
server_result.expect("Server task failed");
client_result.expect("Client task failed");
}
/// Checks that abandoning an in-flight `send` leaves the sink reporting
/// `Error::Closed`, while the stream of the same connection keeps working.
async fn test_network_canceled_send_poisons_sink<N: crate::Network>(network: N) {
// The scenario is skipped entirely on Windows.
// NOTE(review): the reason is not visible here — presumably the
// backpressure behaviour relied on below differs on that platform; confirm.
if cfg!(target_os = "windows") {
return;
}
let mut listener = network
.bind(SocketAddr::from(([127, 0, 0, 1], 0)))
.await
.expect("Failed to bind");
let listener_addr = listener.local_addr().expect("Failed to get local address");
// Lets the server tell the client that the send has been abandoned.
let (canceled_sender, canceled_receiver) = oneshot::channel();
let server = tokio::spawn(async move {
let (_, mut sink, mut stream) = listener.accept().await.expect("Failed to accept");
// Push 128 KiB buffers, polling each send exactly once via `now_or_never`.
// The client does not read during this phase, so at some point a send is
// expected to stay pending; `now_or_never` then returns `None` and the
// pending send future is discarded. At most 1024 sends are attempted.
let mut blocked = false;
for _ in 0..1024 {
match sink.send(vec![0u8; 128 * 1024]).now_or_never() {
// Completed on the first poll: keep filling.
Some(Ok(())) => {}
Some(Err(err)) => panic!("send failed before blocking: {err:?}"),
// Still pending after one poll: this is the abandoned send.
None => {
blocked = true;
break;
}
}
}
assert!(blocked, "send should have blocked on backpressure");
// Any later send on the same sink must now fail with `Closed`.
assert!(matches!(
sink.send(b"after".as_slice()).await,
Err(crate::Error::Closed)
));
// Release the client, which has been waiting for this signal.
canceled_sender
.send(())
.expect("client should wait for send cancellation");
// The receive half must be unaffected by the abandoned send.
let received = stream.recv(2).await.expect("stream should remain usable");
assert_eq!(received.coalesce(), b"ok");
(sink, stream)
});
let client = tokio::spawn(async move {
let (mut sink, stream) = network
.dial(listener_addr)
.await
.expect("Failed to dial server");
// Do nothing (in particular, do not read) until the server reports that
// its send was abandoned.
canceled_receiver
.await
.expect("server should cancel the send first");
sink.send(IoBuf::from(b"ok")).await.expect("Failed to send");
(sink, stream)
});
// Wait for both tasks, then surface a panic from either one.
let (server_result, client_result) = join!(server, client);
server_result.expect("Server task failed");
client_result.expect("Client task failed");
}
/// Checks that a `recv` failing with `Error::RecvFailed` leaves the stream
/// reporting `Error::Closed`, while the sink of the same connection keeps
/// working.
async fn test_network_recv_error_poisons_stream<N: crate::Network>(network: N) {
let mut listener = network
.bind(SocketAddr::from(([127, 0, 0, 1], 0)))
.await
.expect("Failed to bind");
let listener_addr = listener.local_addr().expect("Failed to get local address");
let server = tokio::spawn(async move {
let (_, mut sink, mut stream) = listener.accept().await.expect("Failed to accept");
// Ask for 100 bytes although the client only ever sends 50 before
// dropping its sink; the recv is expected to fail.
let err = stream
.recv(100)
.await
.expect_err("recv should fail after a partial read");
assert!(matches!(err, crate::Error::RecvFailed));
// After the failure, further recvs must report `Closed`.
assert!(matches!(stream.recv(1).await, Err(crate::Error::Closed)));
// The send half must still be able to deliver data to the client.
sink.send(IoBuf::from(b"ok"))
.await
.expect("sink should remain usable after stream error");
(sink, stream)
});
let client = tokio::spawn(async move {
let (mut sink, mut stream) = network
.dial(listener_addr)
.await
.expect("Failed to dial server");
// Send half of what the server asked for, then drop the send half.
// NOTE(review): presumably dropping the sink closes the client's write
// side so the server's recv ends early — confirm per implementation.
sink.send([1; 50].as_slice())
.await
.expect("Failed to send partial payload");
drop(sink);
// The client's own stream is still expected to receive the server's "ok".
let received = stream.recv(2).await.expect("Failed to receive response");
assert_eq!(received.coalesce(), b"ok");
stream
});
// Wait for both tasks, then surface a panic from either one.
let (server_result, client_result) = join!(server, client);
server_result.expect("Server task failed");
client_result.expect("Client task failed");
}
/// Checks that a `send` failing with `Error::SendFailed` leaves the sink
/// reporting `Error::Closed`, while bytes already buffered on the stream of
/// the same connection can still be received.
async fn test_network_send_error_poisons_sink<N: crate::Network>(network: N) {
const DATA: &[u8] = b"okay";
let mut listener = network
.bind(SocketAddr::from(([127, 0, 0, 1], 0)))
.await
.expect("Failed to bind");
let listener_addr = listener.local_addr().expect("Failed to get local address");
// client -> server: the server's response has been observed in the client's buffer.
let (buffered_sender, buffered_receiver) = oneshot::channel();
// server -> client: the server has dropped both halves of the connection.
let (closed_sender, closed_receiver) = oneshot::channel();
let server = tokio::spawn(async move {
let (_, mut sink, stream) = listener.accept().await.expect("Failed to accept");
sink.send(IoBuf::from(DATA))
.await
.expect("stream peer should remain readable after sink error");
// Keep the connection open until the client confirms it holds the data.
buffered_receiver
.await
.expect("client should signal once the response is buffered");
// Drop both halves of the server side, then tell the client.
drop(sink);
drop(stream);
closed_sender
.send(())
.expect("client should still be waiting for the close");
});
let client = tokio::spawn(async move {
let (mut sink, mut stream) = network
.dial(listener_addr)
.await
.expect("Failed to dial server");
// Consume the first two bytes and verify via `peek` that the last two
// are already available without another recv.
let prefix = stream.recv(2).await.expect("Failed to receive response");
assert_eq!(prefix.coalesce(), &DATA[..2]);
assert_eq!(stream.peek(2), &DATA[2..]);
buffered_sender
.send(())
.expect("server should still be waiting for the client");
closed_receiver
.await
.expect("server should signal after closing the connection");
// Send one byte at a time until a send fails: up to 10 attempts, sleeping
// 5ms after each send that still succeeds.
let mut err = None;
for _ in 0..10 {
match sink.send([9u8].as_slice()).await {
Ok(()) => tokio::time::sleep(Duration::from_millis(5)).await,
Err(send_err) => {
err = Some(send_err);
break;
}
}
}
let err = err.expect("send should fail after the peer closes");
assert!(matches!(err, crate::Error::SendFailed));
// After the failure, further sends must report `Closed`.
assert!(matches!(
sink.send([9u8].as_slice()).await,
Err(crate::Error::Closed)
));
// The bytes peeked earlier must still be receivable despite the sink failure.
let suffix = stream
.recv(2)
.await
.expect("Failed to receive buffered response");
assert_eq!(suffix.coalesce(), &DATA[2..]);
(sink, stream)
});
// Wait for both tasks, then surface a panic from either one.
let (server_result, client_result) = join!(server, client);
server_result.expect("Server task failed");
client_result.expect("Client task failed");
}
/// Runs the stress scenarios in this module against `N`.
///
/// `new_network` is called once for each scenario; currently the only
/// scenario is the concurrent-streams echo test.
pub(super) async fn stress_test_network_trait<N: crate::Network, F: Fn() -> N>(new_network: F) {
    let network = new_network();
    stress_concurrent_streams(network).await;
}
/// Stress scenario: many clients echo many messages through one listener.
///
/// `NUM_CLIENTS` client tasks each send `NUM_MESSAGES` messages of
/// `MESSAGE_SIZE` bytes, waiting for every message to be echoed back before
/// sending the next. A dedicated server-side task per connection returns each
/// received message unchanged. After its loop, every client task and every
/// server-side task waits on a shared barrier sized for all of them.
///
/// Every fallible step uses `expect` with a message naming the step, so a
/// failure among the many concurrent tasks identifies what went wrong.
async fn stress_concurrent_streams<N: crate::Network>(network: N) {
    const NUM_CLIENTS: usize = 96;
    const NUM_MESSAGES: usize = 16_384;
    const MESSAGE_SIZE: usize = 4096;

    // Wrapped in an `Arc` so each client task can dial through the same network.
    let network = Arc::new(network);
    let mut listener = network
        .bind(SocketAddr::from(([127, 0, 0, 1], 0)))
        .await
        .expect("Failed to bind");
    let addr = listener.local_addr().expect("Failed to get local address");

    // One barrier slot per client task plus one per server-side connection task.
    let barrier = Arc::new(Barrier::new(NUM_CLIENTS * 2));
    let server_barrier = barrier.clone();

    let server = tokio::spawn(async move {
        let mut set = JoinSet::new();
        for _ in 0..NUM_CLIENTS {
            let (_, mut sink, mut stream) = listener.accept().await.expect("Failed to accept");
            let barrier = server_barrier.clone();
            set.spawn(async move {
                for _ in 0..NUM_MESSAGES {
                    let received = stream
                        .recv(MESSAGE_SIZE)
                        .await
                        .expect("Failed to receive message");
                    sink.send(received).await.expect("Failed to echo message");
                }
                barrier.wait().await;
            });
        }
        // Surface a panic from any per-connection task.
        while let Some(result) = set.join_next().await {
            result.expect("Server connection task failed");
        }
    });

    let mut set = JoinSet::new();
    for _ in 0..NUM_CLIENTS {
        let network = network.clone();
        let barrier = barrier.clone();
        set.spawn(async move {
            let (mut sink, mut stream) = network
                .dial(addr)
                .await
                .expect("Failed to dial server");
            let payload = vec![42u8; MESSAGE_SIZE];
            for _ in 0..NUM_MESSAGES {
                sink.send(payload.clone())
                    .await
                    .expect("Failed to send message");
                let received = stream
                    .recv(MESSAGE_SIZE)
                    .await
                    .expect("Failed to receive echo");
                assert_eq!(received.coalesce(), &payload[..]);
            }
            barrier.wait().await;
        });
    }
    while let Some(result) = set.join_next().await {
        result.expect("Client task failed");
    }

    server.await.expect("Server task failed");
}
}