use std::{sync::Arc, time::Duration};
use dquic::{
prelude::{handy::*, *},
qbase::param::{ClientParameters, ServerParameters},
qinterface::{bind_uri::BindUri, component::route::QuicRouter},
qresolve::Source,
};
use tokio::task::JoinSet;
use tokio_util::task::AbortOnDropHandle;
use tracing::Instrument;
mod common;
use common::*;
mod echo_common;
use echo_common::*;
#[test]
fn single_stream() -> Result<(), BoxError> {
run(async {
let router = Arc::new(QuicRouter::default());
let (listeners, server_task) =
launch_echo_server(router.clone(), server_parameters()).await?;
let _server_task = AbortOnDropHandle::new(tokio::spawn(server_task));
let server_addr = get_server_addr(&listeners);
let client = launch_test_client(router, client_parameters());
let connection = client
.connected_to_with_source("localhost", [(Source::System, server_addr.into())])
.await?;
send_and_verify_echo(&connection, TEST_DATA).await?;
listeners.shutdown();
Ok(())
})
}
#[test]
fn signal_big_stream() -> Result<(), BoxError> {
run(async {
let router = Arc::new(QuicRouter::default());
let (listeners, server_task) =
launch_echo_server(router.clone(), server_parameters()).await?;
let _server_task = AbortOnDropHandle::new(tokio::spawn(server_task));
let server_addr = get_server_addr(&listeners);
let client = launch_test_client(router, client_parameters());
let connection = client
.connected_to_with_source("localhost", [(Source::System, server_addr.into())])
.await?;
send_and_verify_echo(&connection, &TEST_DATA.to_vec().repeat(16)).await?;
listeners.shutdown();
Ok(())
})
}
#[test]
fn empty_stream() -> Result<(), BoxError> {
run(async {
let router = Arc::new(QuicRouter::default());
let (listeners, server_task) =
launch_echo_server(router.clone(), server_parameters()).await?;
let _server_task = AbortOnDropHandle::new(tokio::spawn(server_task));
let server_addr = get_server_addr(&listeners);
let client = launch_test_client(router, client_parameters());
let connection = client
.connected_to_with_source("localhost", [(Source::System, server_addr.into())])
.await?;
send_and_verify_echo(&connection, b"").await?;
listeners.shutdown();
Ok(())
})
}
#[test]
fn shutdown() -> Result<(), BoxError> {
run(async {
async fn serve_only_one_stream(listeners: Arc<QuicListeners>) {
while let Ok((connection, server, pathway, _link)) = listeners.accept().await {
assert_eq!(server, "localhost");
tracing::info!(source = ?pathway.remote(), "accepted new connection");
tokio::spawn(async move {
let (_sid, (reader, writer)) = connection.accept_bi_stream().await?;
echo_stream(reader, writer).await;
_ = connection.close("Bye bye", 0);
Result::<(), BoxError>::Ok(())
});
}
}
let router = Arc::new(QuicRouter::default());
let listeners = QuicListeners::builder()
.with_router(router.clone())
.without_client_cert_verifier()
.with_parameters(server_parameters())
.with_qlog(qlogger())
.listen(128)?;
listeners
.add_server(
"localhost",
SERVER_CERT,
SERVER_KEY,
[BindUri::from("inet://127.0.0.1:0").alloc_port()],
None,
)
.await?;
let server_task = serve_only_one_stream(listeners.clone());
let _server_task = AbortOnDropHandle::new(tokio::spawn(server_task));
let server_addr = get_server_addr(&listeners);
let client = launch_test_client(router, client_parameters());
let connection = client
.connected_to_with_source("localhost", [(Source::System, server_addr.into())])
.await?;
_ = connection.handshaked().await;
assert!(
send_and_verify_echo(&connection, b"").await.is_err()
|| send_and_verify_echo(&connection, b"").await.is_err()
);
connection.terminated().await;
listeners.shutdown();
Ok(())
})
}
#[test]
fn idle_timeout() -> Result<(), BoxError> {
run(async {
fn server_parameters() -> ServerParameters {
let mut params = handy::server_parameters();
params
.set(ParameterId::MaxIdleTimeout, Duration::from_secs(1))
.expect("unreachable");
params
}
let router = Arc::new(QuicRouter::default());
let (listeners, server_task) =
launch_echo_server(router.clone(), server_parameters()).await?;
let _server_task = AbortOnDropHandle::new(tokio::spawn(server_task));
let server_addr = get_server_addr(&listeners);
let client = launch_test_client(router, client_parameters());
let connection = client
.connected_to_with_source("localhost", [(Source::System, server_addr.into())])
.await?;
connection.terminated().await;
listeners.shutdown();
Ok(())
})
}
#[test]
fn double_connections() -> Result<(), BoxError> {
run(async {
fn client_parameters() -> ClientParameters {
let mut params = handy::client_parameters();
params
.set(ParameterId::MaxIdleTimeout, Duration::from_secs(60))
.expect("unreachable");
params
}
fn server_parameters() -> ServerParameters {
let mut params = handy::server_parameters();
params
.set(ParameterId::MaxIdleTimeout, Duration::from_secs(60))
.expect("unreachable");
params
}
let router = Arc::new(QuicRouter::default());
let (listeners, server_task) =
launch_echo_server(router.clone(), server_parameters()).await?;
let _server_task = AbortOnDropHandle::new(tokio::spawn(server_task));
let server_addr = get_server_addr(&listeners);
let client = launch_test_client(router, client_parameters());
let mut connections = JoinSet::new();
for conn_idx in 0..2 {
let connection = client
.connected_to_with_source("localhost", [(Source::System, server_addr.into())])
.await?;
connections.spawn(
async move { send_and_verify_echo(&connection, TEST_DATA).await }
.instrument(tracing::info_span!("stream", conn_idx)),
);
}
connections
.join_all()
.await
.into_iter()
.collect::<Result<(), BoxError>>()?;
listeners.shutdown();
Ok(())
})
}
const PARALLEL_ECHO_CONNS: usize = 3;
const PARALLEL_ECHO_STREAMS: usize = 2;
#[test]
fn parallel_stream() -> Result<(), BoxError> {
run(async {
fn client_parameters() -> ClientParameters {
let mut params = handy::client_parameters();
params
.set(ParameterId::MaxIdleTimeout, Duration::from_secs(60))
.expect("unreachable");
params
}
fn server_parameters() -> ServerParameters {
let mut params = handy::server_parameters();
params
.set(ParameterId::MaxIdleTimeout, Duration::from_secs(60))
.expect("unreachable");
params
}
let router = Arc::new(QuicRouter::default());
let (listeners, server_task) =
launch_echo_server(router.clone(), server_parameters()).await?;
let _server_task = AbortOnDropHandle::new(tokio::spawn(server_task));
let server_addr = get_server_addr(&listeners);
let client = launch_test_client(router, client_parameters());
let mut streams = JoinSet::new();
for conn_idx in 0..PARALLEL_ECHO_CONNS {
tracing::info!(conn_idx, "Starting connection");
let connection = Arc::new(
client
.connected_to_with_source("localhost", [(Source::System, server_addr.into())])
.await?,
);
tracing::info!(conn_idx, "Connected");
for stream_idx in 0..PARALLEL_ECHO_STREAMS {
let connection = connection.clone();
streams.spawn(
async move { send_and_verify_echo(&connection, TEST_DATA).await }
.instrument(tracing::info_span!("stream", conn_idx, stream_idx)),
);
}
}
streams
.join_all()
.await
.into_iter()
.collect::<Result<(), BoxError>>()?;
listeners.shutdown();
Ok(())
})
}
#[test]
fn parallel_big_stream() -> Result<(), BoxError> {
run(async {
fn client_parameters() -> ClientParameters {
let mut params = handy::client_parameters();
params
.set(ParameterId::MaxIdleTimeout, Duration::from_secs(60))
.expect("unreachable");
params
}
fn server_parameters() -> ServerParameters {
let mut params = handy::server_parameters();
params
.set(ParameterId::MaxIdleTimeout, Duration::from_secs(60))
.expect("unreachable");
params
}
let router = Arc::new(QuicRouter::default());
let (listeners, server_task) =
launch_echo_server(router.clone(), server_parameters()).await?;
let _server_task = AbortOnDropHandle::new(tokio::spawn(server_task));
let server_addr = get_server_addr(&listeners);
let client = launch_test_client(router, client_parameters());
let mut big_streams = JoinSet::new();
let test_data = Arc::new(TEST_DATA.to_vec().repeat(4));
for conn_idx in 0..PARALLEL_ECHO_CONNS {
let connection = client
.connected_to_with_source("localhost", [(Source::System, server_addr.into())])
.await?;
let test_data = test_data.clone();
big_streams.spawn(
async move { send_and_verify_echo(&connection, &test_data).await }
.instrument(tracing::info_span!("stream", conn_idx)),
);
}
big_streams
.join_all()
.await
.into_iter()
.collect::<Result<(), BoxError>>()?;
listeners.shutdown();
Ok(())
})
}
#[test]
fn limited_streams() -> Result<(), BoxError> {
run(async {
pub fn client_parameters() -> ClientParameters {
let mut params = ClientParameters::default();
for (id, value) in [
(ParameterId::InitialMaxStreamsBidi, 2u32),
(ParameterId::InitialMaxStreamsUni, 0u32),
(ParameterId::InitialMaxData, 1u32 << 10),
(ParameterId::InitialMaxStreamDataBidiLocal, 1u32 << 10),
(ParameterId::InitialMaxStreamDataBidiRemote, 1u32 << 10),
(ParameterId::InitialMaxStreamDataUni, 1u32 << 10),
] {
params.set(id, value).expect("unreachable");
}
params
}
pub fn server_parameters() -> ServerParameters {
let mut params = ServerParameters::default();
for (id, value) in [
(ParameterId::InitialMaxStreamsBidi, 2u32),
(ParameterId::InitialMaxStreamsUni, 2u32),
(ParameterId::InitialMaxData, 1u32 << 20),
(ParameterId::InitialMaxStreamDataBidiLocal, 1u32 << 10),
(ParameterId::InitialMaxStreamDataBidiRemote, 1u32 << 10),
(ParameterId::InitialMaxStreamDataUni, 1u32 << 10),
] {
params.set(id, value).expect("unreachable");
}
params
.set(ParameterId::MaxIdleTimeout, Duration::from_secs(30))
.expect("unreachable");
params
}
let router = Arc::new(QuicRouter::default());
let (listeners, server_task) =
launch_echo_server(router.clone(), server_parameters()).await?;
let _server_task = AbortOnDropHandle::new(tokio::spawn(server_task));
let server_addr = get_server_addr(&listeners);
let client = launch_test_client(router, client_parameters());
let mut streams = JoinSet::new();
for conn_idx in 0..PARALLEL_ECHO_CONNS / 2 {
let connection = Arc::new(
client
.connected_to_with_source("localhost", [(Source::System, server_addr.into())])
.await?,
);
for stream_idx in 0..PARALLEL_ECHO_STREAMS / 2 {
let connection = connection.clone();
streams.spawn(
async move { send_and_verify_echo(&connection, TEST_DATA).await }
.instrument(tracing::info_span!("stream", conn_idx, stream_idx)),
);
}
}
streams
.join_all()
.await
.into_iter()
.collect::<Result<(), BoxError>>()?;
listeners.shutdown();
Ok(())
})
}