mod common;
use std::time::Duration;
use deadline::deadline;
use quickie::*;
use tokio::time::sleep;
const NUM_MESSAGES: u8 = 3;
#[tokio::test]
async fn streams_uni() {
let (client_cfg, server_cfg) = common::client_and_server_config();
let node = common::TestNode(Node::new(Config::new(Some(client_cfg.clone()), None)));
node.start("127.0.0.1:0".parse().unwrap()).await.unwrap();
let raw_endpoint = common::raw_endpoint(client_cfg, server_cfg);
let raw_endpoint_addr = raw_endpoint.local_addr().unwrap();
let conn_handle = tokio::spawn(async move {
if let Some(conn) = raw_endpoint.accept().await {
conn.accept().unwrap().await.unwrap()
} else {
panic!("failed to accept a connection");
}
});
let conn_id = node
.connect(raw_endpoint_addr, common::SERVER_NAME)
.await
.unwrap();
let connection = conn_handle.await.unwrap();
{
let stream_id = node.open_uni(conn_id).await.unwrap();
for i in 0..NUM_MESSAGES {
node.send_msg(conn_id, stream_id, [i, i].to_vec().into())
.unwrap();
}
let node_clone = node.clone();
deadline!(Duration::from_secs(1), move || {
let stats = node_clone.get_stream_stats(conn_id, stream_id).unwrap();
stats.msgs_sent == NUM_MESSAGES as u64 && stats.bytes_sent == NUM_MESSAGES as u64 * 2
});
let mut raw_recv_stream = connection.accept_uni().await.unwrap();
let mut recv_buf = [255u8, 255];
for i in 0..NUM_MESSAGES {
raw_recv_stream.read_exact(&mut recv_buf).await.unwrap();
assert_eq!(recv_buf, [i, i]);
}
node.close_stream(conn_id, stream_id);
}
{
let mut raw_send_stream = connection.open_uni().await.unwrap();
for i in 0..NUM_MESSAGES {
raw_send_stream.write_all(&[i, i]).await.unwrap();
sleep(Duration::from_millis(1)).await;
}
let mut stream_ids = node.get_stream_ids(conn_id).unwrap();
assert_eq!(stream_ids.len(), 1);
let stream_id = stream_ids.pop().unwrap();
let node_clone = node.clone();
deadline!(Duration::from_secs(1), move || {
let stats = node_clone.get_stream_stats(conn_id, stream_id).unwrap();
stats.msgs_recv == NUM_MESSAGES as u64 && stats.bytes_recv == NUM_MESSAGES as u64 * 2
});
}
}
#[tokio::test]
async fn streams_bi() {
let (client_cfg, server_cfg) = common::client_and_server_config();
let node = common::TestNode(Node::new(Config::new(Some(client_cfg.clone()), None)));
node.start("127.0.0.1:0".parse().unwrap()).await.unwrap();
let raw_endpoint = common::raw_endpoint(client_cfg, server_cfg);
let raw_endpoint_addr = raw_endpoint.local_addr().unwrap();
let conn_handle = tokio::spawn(async move {
if let Some(conn) = raw_endpoint.accept().await {
conn.accept().unwrap().await.unwrap()
} else {
panic!("failed to accept a connection");
}
});
let conn_id = node
.connect(raw_endpoint_addr, common::SERVER_NAME)
.await
.unwrap();
let connection = conn_handle.await.unwrap();
{
let stream_id = node.open_bi(conn_id).await.unwrap();
for i in 0..NUM_MESSAGES {
node.send_msg(conn_id, stream_id, [i, i].to_vec().into())
.unwrap();
}
let node_clone = node.clone();
deadline!(Duration::from_secs(1), move || {
let stats = node_clone.get_stream_stats(conn_id, stream_id).unwrap();
stats.msgs_sent == NUM_MESSAGES as u64 && stats.bytes_sent == NUM_MESSAGES as u64 * 2
});
let (mut raw_send_stream, mut raw_recv_stream) = connection.accept_bi().await.unwrap();
let mut recv_buf = [255u8, 255];
for i in 0..NUM_MESSAGES {
raw_recv_stream.read_exact(&mut recv_buf).await.unwrap();
assert_eq!(recv_buf, [i, i]);
}
for i in 0..NUM_MESSAGES {
raw_send_stream.write_all(&[i, i]).await.unwrap();
sleep(Duration::from_millis(1)).await;
}
let node_clone = node.clone();
deadline!(Duration::from_secs(1), move || {
let stats = node_clone.get_stream_stats(conn_id, stream_id).unwrap();
stats.msgs_recv == NUM_MESSAGES as u64 && stats.bytes_recv == NUM_MESSAGES as u64 * 2
});
node.close_stream(conn_id, stream_id);
}
{
let (mut raw_send_stream, mut raw_recv_stream) = connection.open_bi().await.unwrap();
for i in 0..NUM_MESSAGES {
raw_send_stream.write_all(&[i, i]).await.unwrap();
sleep(Duration::from_millis(1)).await;
}
let mut stream_ids = node.get_stream_ids(conn_id).unwrap();
assert_eq!(stream_ids.len(), 1);
let stream_id = stream_ids.pop().unwrap();
let node_clone = node.clone();
deadline!(Duration::from_secs(1), move || {
let stats = node_clone.get_stream_stats(conn_id, stream_id).unwrap();
stats.msgs_recv == NUM_MESSAGES as u64 && stats.bytes_recv == NUM_MESSAGES as u64 * 2
});
for i in 0..NUM_MESSAGES {
node.send_msg(conn_id, stream_id, [i, i].to_vec().into())
.unwrap();
}
let node_clone = node.clone();
deadline!(Duration::from_secs(1), move || {
let stats = node_clone.get_stream_stats(conn_id, stream_id).unwrap();
stats.msgs_recv == NUM_MESSAGES as u64 && stats.bytes_recv == NUM_MESSAGES as u64 * 2
});
let mut recv_buf = [255u8, 255];
for i in 0..NUM_MESSAGES {
raw_recv_stream.read_exact(&mut recv_buf).await.unwrap();
assert_eq!(recv_buf, [i, i]);
}
}
}
#[tokio::test]
async fn datagrams() {
let (client_cfg, server_cfg) = common::client_and_server_config();
let node = common::TestNode(Node::new(Config::new(Some(client_cfg.clone()), None)));
node.start("127.0.0.1:0".parse().unwrap()).await.unwrap();
let raw_endpoint = common::raw_endpoint(client_cfg, server_cfg);
let raw_endpoint_addr = raw_endpoint.local_addr().unwrap();
let conn_handle = tokio::spawn(async move {
if let Some(conn) = raw_endpoint.accept().await {
conn.accept().unwrap().await.unwrap()
} else {
panic!("failed to accept a connection");
}
});
let conn_id = node
.connect(raw_endpoint_addr, common::SERVER_NAME)
.await
.unwrap();
let connection = conn_handle.await.unwrap();
{
for i in 0..NUM_MESSAGES {
node.send_datagram(conn_id, [i, i].to_vec().into()).unwrap();
}
let node_clone = node.clone();
deadline!(Duration::from_secs(1), move || {
let stats = node_clone.get_datagram_stats(conn_id).unwrap();
stats.msgs_sent == NUM_MESSAGES as u64 && stats.bytes_sent == NUM_MESSAGES as u64 * 2
});
for i in 0..NUM_MESSAGES {
assert_eq!(&connection.read_datagram().await.unwrap(), &[i, i][..]);
}
}
{
for i in 0..NUM_MESSAGES {
connection.send_datagram([i, i].to_vec().into()).unwrap();
}
let node_clone = node.clone();
deadline!(Duration::from_secs(1), move || {
let stats = node_clone.get_datagram_stats(conn_id).unwrap();
stats.msgs_recv == NUM_MESSAGES as u64 && stats.bytes_recv == NUM_MESSAGES as u64 * 2
});
}
}