use std::{
net::{IpAddr, Ipv4Addr, SocketAddr},
sync::Arc,
time::{Duration, Instant},
};
use comnoq::{ClientBuilder, ConnectionError, Endpoint, TransportConfig};
use compio::io::AsyncWriteExt;
use futures_util::join;
mod common;
use common::{config_pair, subscribe};
/// Connecting to `127.0.0.1:1` must fail with [`ConnectionError::TimedOut`],
/// and the failure must arrive after more than one but less than two idle
/// timeouts.
///
/// NOTE(review): presumably nothing is listening on port 1, so the handshake
/// never gets a reply — confirm for the CI environment.
#[compio::test]
async fn handshake_timeout() {
    const IDLE_TIMEOUT: Duration = Duration::from_millis(100);

    let _guard = subscribe();
    let endpoint = Endpoint::client("127.0.0.1:0").await.unwrap();

    // Short idle timeout and initial RTT so the test completes quickly.
    let mut transport = TransportConfig::default();
    transport
        .max_idle_timeout(Some(IDLE_TIMEOUT.try_into().unwrap()))
        .initial_rtt(Duration::from_millis(10));
    let mut config = ClientBuilder::new_with_no_server_verification().build();
    config.transport_config(Arc::new(transport));

    let began = Instant::now();
    let target = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 1);
    let connecting = endpoint
        .connect(target, "localhost", Some(config))
        .unwrap();
    match connecting.await {
        Ok(_) => panic!("unexpected success"),
        Err(ConnectionError::TimedOut) => {}
        Err(e) => panic!("unexpected error: {e:?}"),
    }

    // The timeout must fire within the window (IDLE_TIMEOUT, 2 * IDLE_TIMEOUT).
    let elapsed = began.elapsed();
    assert!(elapsed > IDLE_TIMEOUT && elapsed < 2 * IDLE_TIMEOUT);

    endpoint.shutdown().await.unwrap();
}
/// Closing the endpoint while a connection attempt is still pending must make
/// that attempt resolve to [`ConnectionError::LocallyClosed`].
#[compio::test]
async fn close_endpoint() {
    let _guard = subscribe();
    let endpoint = ClientBuilder::new_with_no_server_verification()
        .bind("127.0.0.1:0")
        .await
        .unwrap();

    // Start the attempt but do not await it until the endpoint has been closed.
    let peer = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 1);
    let pending = endpoint.connect(peer, "localhost", None).unwrap();
    endpoint.close(0u32.into(), b"");

    match pending.await {
        Ok(_) => {
            panic!("unexpected success");
        }
        Err(ConnectionError::LocallyClosed) => (),
        Err(e) => panic!("unexpected error: {e}"),
    }

    endpoint.shutdown().await.unwrap();
}
/// Builds a server endpoint bound to `127.0.0.1:0` whose default client
/// configuration is the client half returned by `config_pair(None)`, so the
/// same endpoint can be used for both connecting and accepting.
async fn endpoint() -> Endpoint {
    let (server_cfg, client_cfg) = config_pair(None);
    let mut ep = Endpoint::server("127.0.0.1:0", server_cfg)
        .await
        .unwrap();
    ep.default_client_config = Some(client_cfg);
    ep
}
/// The accepting side writes `MSG` on a unidirectional stream, finishes it and
/// waits on `stopped()`; the connecting side must read exactly `MSG` with
/// `read_to_end`.
#[compio::test]
async fn read_after_close() {
    const MSG: &[u8] = b"goodbye!";

    let _guard = subscribe();
    let ep = endpoint().await;

    // Accepting side: send the message and finish the stream.
    let sender = async {
        let incoming = ep.wait_incoming().await.unwrap();
        let conn = incoming.await.unwrap();
        let mut stream = conn.open_uni().unwrap();
        stream.write_all(MSG).await.unwrap();
        stream.finish().unwrap();
        // The result is deliberately ignored.
        let _ = stream.stopped().await;
    };

    // Connecting side: read the whole stream and compare.
    let receiver = async {
        let addr = ep.local_addr().unwrap();
        let conn = ep
            .connect(addr, "localhost", None)
            .unwrap()
            .await
            .unwrap();
        let mut stream = conn.accept_uni().await.unwrap();
        let (_, received) = stream.read_to_end(vec![]).await.unwrap();
        assert_eq!(received, MSG);
    };

    join!(sender, receiver);
    ep.shutdown().await.unwrap();
}
/// Both ends of one connection must export identical keying material when
/// given the same label (`b"qaq"`) and context (`b"qwq"`).
#[compio::test]
async fn export_keying_material() {
    let _guard = subscribe();
    let ep = endpoint().await;

    // Establish a loopback connection through the same endpoint.
    let dial = async {
        let addr = ep.local_addr().unwrap();
        ep.connect(addr, "localhost", None)
            .unwrap()
            .await
            .unwrap()
    };
    let accept = async {
        let incoming = ep.wait_incoming().await.unwrap();
        incoming.await.unwrap()
    };
    let (client, server) = join!(dial, accept);

    let mut client_key = [0u8; 64];
    let mut server_key = [0u8; 64];
    client
        .export_keying_material(&mut client_key, b"qaq", b"qwq")
        .unwrap();
    server
        .export_keying_material(&mut server_key, b"qaq", b"qwq")
        .unwrap();
    assert_eq!(client_key, server_key);

    // Release both connection handles before shutting the endpoint down.
    drop(client);
    drop(server);
    ep.shutdown().await.unwrap();
}
/// Exercises 0-RTT over two consecutive loopback connections made through the
/// same endpoint.
///
/// The accepting side handles two incoming connections, converting each with
/// `into_0rtt()`. The connecting side first makes a connection on which
/// `into_0rtt()` is expected to fail, then a second one on which it is
/// expected to succeed and on which `accepted_0rtt()` must resolve to `true`.
#[compio::test]
async fn zero_rtt() {
let _guard = subscribe();
let endpoint = endpoint().await;
const MSG0: &[u8] = b"zero";
const MSG1: &[u8] = b"one";
join!(
// Accepting side: serve exactly two connections, one after the other.
async {
for _ in 0..2 {
let conn = endpoint
.wait_incoming()
.await
.unwrap()
.accept()
.unwrap()
.into_0rtt()
.unwrap();
join!(
// Drain incoming uni streams until `accept_uni` returns an error; every
// stream received here must carry MSG0.
// NOTE(review): presumably the error arrives once the peer's connection
// handle is dropped — confirm.
async {
while let Ok(mut recv) = conn.accept_uni().await {
let (_, buf) = recv.read_to_end(vec![]).await.unwrap();
assert_eq!(buf, MSG0);
}
},
// Send MSG0 right away, then MSG1 only after `accepted_0rtt()` resolves.
async {
let mut send = conn.open_uni().unwrap();
send.write_all(MSG0).await.unwrap();
send.finish().unwrap();
conn.accepted_0rtt().await.unwrap();
let mut send = conn.open_uni().unwrap();
send.write_all(MSG1).await.unwrap();
send.finish().unwrap();
},
);
}
},
// Connecting side.
async {
// First connection: `into_0rtt()` must return `Err`; the value inside the
// `Err` is awaited to obtain the connection.
// NOTE(review): presumably 0-RTT is unavailable here because there is no
// state from an earlier connection yet — confirm.
{
let conn = endpoint
.connect(endpoint.local_addr().unwrap(), "localhost", None)
.unwrap()
.into_0rtt()
.unwrap_err()
.await
.unwrap();
let mut recv = conn.accept_uni().await.unwrap();
let (_, mut buf) = recv.read_to_end(vec![]).await.expect("read_to_end");
assert_eq!(buf, MSG0);
// Reuse the same buffer for the second stream.
buf.clear();
let mut recv = conn.accept_uni().await.unwrap();
let (_, buf) = recv.read_to_end(buf).await.expect("read_to_end");
assert_eq!(buf, MSG1);
}
// Second connection: `into_0rtt()` must now succeed, and MSG0 is written
// immediately, before `accepted_0rtt()` has been awaited.
let conn = endpoint
.connect(endpoint.local_addr().unwrap(), "localhost", None)
.unwrap()
.into_0rtt()
.unwrap();
let mut send = conn.open_uni().unwrap();
send.write_all(MSG0).await.unwrap();
send.finish().unwrap();
let mut recv = conn.accept_uni().await.unwrap();
let (_, mut buf) = recv.read_to_end(vec![]).await.expect("read_to_end");
assert_eq!(buf, MSG0);
// `accepted_0rtt()` must resolve to `true` on this connection.
assert!(conn.accepted_0rtt().await.unwrap());
buf.clear();
let mut recv = conn.accept_uni().await.unwrap();
let (_, buf) = recv.read_to_end(buf).await.expect("read_to_end");
assert_eq!(buf, MSG1);
},
);
endpoint.shutdown().await.unwrap();
}
/// Two concurrent `recv_datagram` calls on the same connection must each
/// receive one of the two datagrams sent by the peer.
///
/// The second datagram is sent only after one reader has signalled over the
/// channel that it received something.
#[compio::test]
async fn two_datagram_readers() {
    const MSG1: &[u8] = b"one";
    const MSG2: &[u8] = b"two";

    let _guard = subscribe();
    let ep = endpoint().await;

    // Establish a loopback connection through the same endpoint.
    let dial = async {
        let addr = ep.local_addr().unwrap();
        ep.connect(addr, "localhost", None)
            .unwrap()
            .await
            .unwrap()
    };
    let accept = async {
        let incoming = ep.wait_incoming().await.unwrap();
        incoming.await.unwrap()
    };
    let (client, server) = join!(dial, accept);

    let (tx, rx) = flume::bounded::<()>(1);

    // Each reader notifies the writer once it has a datagram; the `try_send`
    // result is ignored on purpose.
    let first_reader = async {
        let datagram = client.recv_datagram().await.unwrap();
        let _ = tx.try_send(());
        datagram
    };
    let second_reader = async {
        let datagram = client.recv_datagram().await.unwrap();
        let _ = tx.try_send(());
        datagram
    };
    let writer = async {
        server.send_datagram(MSG1.into()).unwrap();
        rx.recv_async().await.unwrap();
        server.send_datagram_wait(MSG2.into()).await.unwrap();
    };
    let (a, b, _) = join!(first_reader, second_reader, writer);

    // Either reader may have received either message.
    assert!(a == MSG1 || b == MSG1);
    assert!(a == MSG2 || b == MSG2);

    drop(client);
    drop(server);
    ep.shutdown().await.unwrap();
}
/// After two datagrams are sent with `send_datagram_wait`, the peer must get
/// the first from `recv_datagram` and the second from `try_recv_datagram`.
#[compio::test]
async fn try_recv_datagram() {
    const MSG1: &[u8] = b"one";
    const MSG2: &[u8] = b"two";

    let _guard = subscribe();
    let ep = endpoint().await;

    // Establish a loopback connection through the same endpoint.
    let dial = async {
        let addr = ep.local_addr().unwrap();
        ep.connect(addr, "localhost", None)
            .unwrap()
            .await
            .unwrap()
    };
    let accept = async {
        let incoming = ep.wait_incoming().await.unwrap();
        incoming.await.unwrap()
    };
    let (client, server) = join!(dial, accept);

    client.send_datagram_wait(MSG1.into()).await.unwrap();
    client.send_datagram_wait(MSG2.into()).await.unwrap();

    let first = server.recv_datagram().await.unwrap();
    assert_eq!(first, MSG1);
    // The second datagram is fetched without awaiting.
    let second = server.try_recv_datagram().unwrap().unwrap();
    assert_eq!(second, MSG2);

    drop(client);
    drop(server);
    ep.shutdown().await.unwrap();
}