use datum::{
Keep, Sink, Source, StreamCompletion, StreamError,
testkit::{TestSink, TestSource},
};
use datum_net::quic::{
crypto::rustls::{QuicClientConfig, QuicServerConfig},
quinn,
rustls::{
ClientConfig as RustlsClientConfig, RootCertStore, ServerConfig as RustlsServerConfig,
pki_types::{CertificateDer, PrivateKeyDer, PrivatePkcs8KeyDer},
},
};
use datum_net::{QuicIncomingConnection, TokioQuic};
use rcgen::{CertifiedKey, generate_simple_self_signed};
use std::net::{SocketAddr, UdpSocket as StdUdpSocket};
use std::sync::{
Arc,
atomic::{AtomicUsize, Ordering},
};
use std::thread;
use std::time::{Duration, Instant};
const CHUNK_SIZE: usize = 8192;
fn wait_until(timeout: Duration, condition: impl Fn() -> bool) -> bool {
let deadline = Instant::now() + timeout;
while Instant::now() < deadline {
if condition() {
return true;
}
thread::sleep(Duration::from_millis(5));
}
condition()
}
fn assert_failed(error: StreamError) {
assert!(
matches!(
error,
StreamError::Failed(_) | StreamError::AbruptTermination | StreamError::Cancelled
),
"expected QUIC failure-shaped stream error, got {error:?}"
);
}
fn can_bind_udp(addr: SocketAddr) -> bool {
StdUdpSocket::bind(addr).is_ok()
}
fn quic_configs() -> (
quinn::ServerConfig,
quinn::ClientConfig,
quinn::ClientConfig,
) {
let CertifiedKey { cert, key_pair } =
generate_simple_self_signed(["localhost".to_owned()]).expect("self-signed cert");
let cert_der: CertificateDer<'static> = cert.der().clone();
let key_der = PrivateKeyDer::Pkcs8(PrivatePkcs8KeyDer::from(key_pair.serialize_der()));
let server_crypto = RustlsServerConfig::builder()
.with_no_client_auth()
.with_single_cert(vec![cert_der.clone()], key_der)
.expect("server config");
let mut roots = RootCertStore::empty();
roots.add(cert_der).expect("trust self-signed cert");
let trusted_client_crypto = RustlsClientConfig::builder()
.with_root_certificates(roots)
.with_no_client_auth();
let untrusted_client_crypto = RustlsClientConfig::builder()
.with_root_certificates(RootCertStore::empty())
.with_no_client_auth();
(
quinn::ServerConfig::with_crypto(Arc::new(
QuicServerConfig::try_from(server_crypto).expect("QUIC server config"),
)),
quinn::ClientConfig::new(Arc::new(
QuicClientConfig::try_from(trusted_client_crypto).expect("QUIC client config"),
)),
quinn::ClientConfig::new(Arc::new(
QuicClientConfig::try_from(untrusted_client_crypto).expect("QUIC client config"),
)),
)
}
#[test]
fn quic_bind_connect_and_bidirectional_stream_round_trip() {
let (server_config, client_config, _untrusted) = quic_configs();
let (binding_completion, incoming_completion) =
TokioQuic::bind("127.0.0.1:0", server_config, CHUNK_SIZE)
.to_mat(Sink::head(), Keep::both)
.run()
.expect("QUIC bind source materializes");
let binding = binding_completion.wait().expect("QUIC binding succeeds");
let client_connection =
TokioQuic::connect(binding.local_addr(), "localhost", client_config, CHUNK_SIZE)
.run_with(Sink::head())
.expect("QUIC client connection source materializes")
.wait()
.expect("QUIC client connects");
assert_eq!(client_connection.remote_addr(), binding.local_addr());
let incoming = incoming_completion
.wait()
.expect("incoming QUIC connection accepted");
assert_eq!(incoming.remote_addr(), client_connection.local_addr());
let (stream_completion, client_response) = Source::single(b"ping".to_vec())
.via_mat(client_connection.open_bi_default(), Keep::right)
.to_mat(Sink::head(), Keep::both)
.run()
.expect("QUIC client bi stream materializes");
let accepted_stream = incoming
.accept_bi_default()
.run_with(Sink::head())
.expect("server accept_bi source materializes")
.wait()
.expect("server accepts QUIC bi stream");
let (server_source, server_sink) = accepted_stream.into_parts();
let server_read = server_source
.run_with(Sink::head())
.expect("server read materializes")
.wait()
.expect("server reads request");
assert_eq!(server_read, b"ping".to_vec());
Source::single(server_read)
.run_with(server_sink)
.expect("server write materializes")
.wait()
.expect("server write completes");
assert_eq!(
client_response.wait().expect("client receives echo"),
b"ping".to_vec()
);
stream_completion
.wait()
.expect("client QUIC stream materializes");
}
#[test]
fn quic_handshake_failure_surfaces_as_stream_error() {
let (server_config, _trusted, untrusted_client_config) = quic_configs();
let (binding_completion, incoming_completion) =
TokioQuic::bind("127.0.0.1:0", server_config, CHUNK_SIZE)
.to_mat(Sink::head(), Keep::both)
.run()
.expect("QUIC bind source materializes");
let binding = binding_completion.wait().expect("QUIC binding succeeds");
let (connection_completion, client_completion) = TokioQuic::connect(
binding.local_addr(),
"localhost",
untrusted_client_config,
CHUNK_SIZE,
)
.to_mat(Sink::head(), Keep::both)
.run()
.expect("failed QUIC client source materializes");
assert_failed(
connection_completion
.wait()
.expect_err("client connection materialization fails"),
);
assert_failed(
client_completion
.wait()
.expect_err("client stream fails without panic"),
);
assert_failed(match incoming_completion.wait() {
Ok(_) => panic!("server incoming stream should see handshake failure"),
Err(error) => error,
});
}
#[test]
fn dropping_quic_connection_and_stream_tears_down_endpoints() {
let (server_config, client_config, _untrusted) = quic_configs();
let (binding_completion, incoming_completion) =
TokioQuic::bind("127.0.0.1:0", server_config, CHUNK_SIZE)
.to_mat(Sink::head(), Keep::both)
.run()
.expect("QUIC bind source materializes");
let binding = binding_completion.wait().expect("QUIC binding succeeds");
let server_addr = binding.local_addr();
let client_connection = TokioQuic::connect(server_addr, "localhost", client_config, CHUNK_SIZE)
.run_with(Sink::head())
.expect("QUIC client connection source materializes")
.wait()
.expect("QUIC client connects");
let client_addr = client_connection.local_addr();
let incoming = incoming_completion
.wait()
.expect("incoming QUIC connection accepted");
let ((publisher, stream_completion), _probe) = TestSource::probe::<Vec<u8>>()
.via_mat(client_connection.open_bi_default(), Keep::both)
.to_mat(TestSink::probe(), Keep::both)
.run()
.expect("client QUIC stream materializes");
_probe.request(1);
publisher.expect_request();
publisher.send_next(b"cancel".to_vec());
let accepted_stream = incoming
.accept_bi_default()
.run_with(Sink::head())
.expect("server accept_bi source materializes")
.wait()
.expect("server accepts client stream");
drop(accepted_stream);
drop(stream_completion);
drop(publisher);
drop(incoming);
drop(client_connection);
assert!(
wait_until(Duration::from_secs(3), || can_bind_udp(client_addr)),
"dropping QUIC client connection should close the client UDP socket"
);
assert!(
wait_until(Duration::from_secs(3), || can_bind_udp(server_addr)),
"dropping accepted connection and bind source should close the server UDP socket"
);
}
#[test]
fn slow_quic_consumer_backpressures_writer() {
let (server_config, client_config, _untrusted) = quic_configs();
let (binding_completion, incoming_completion) =
TokioQuic::bind("127.0.0.1:0", server_config, CHUNK_SIZE)
.to_mat(Sink::head(), Keep::both)
.run()
.expect("QUIC bind source materializes");
let binding = binding_completion.wait().expect("QUIC binding succeeds");
let client_connection =
TokioQuic::connect(binding.local_addr(), "localhost", client_config, CHUNK_SIZE)
.run_with(Sink::head())
.expect("QUIC client connection source materializes")
.wait()
.expect("QUIC client connects");
let incoming: QuicIncomingConnection = incoming_completion
.wait()
.expect("incoming QUIC connection accepted");
let (stream_completion, probe) = Source::<Vec<u8>>::empty()
.via_mat(client_connection.open_bi_default(), Keep::right)
.to_mat(TestSink::probe(), Keep::both)
.run()
.expect("client QUIC stream probe materializes");
probe.request(1);
stream_completion.wait().expect("client QUIC stream opens");
let accepted_stream = incoming
.accept_bi_default()
.run_with(Sink::head())
.expect("server accept_bi source materializes")
.wait()
.expect("server accepts QUIC bi stream");
let (_server_source, server_sink) = accepted_stream.into_parts();
let produced = Arc::new(AtomicUsize::new(0));
let total_chunks = 8192usize;
let write_completion: StreamCompletion<datum::NotUsed> = Source::unfold(0usize, {
let produced = Arc::clone(&produced);
move |index| {
if index == total_chunks {
None
} else {
produced.fetch_add(1, Ordering::SeqCst);
Some((index + 1, vec![b'x'; CHUNK_SIZE]))
}
}
})
.run_with(server_sink)
.expect("server QUIC write materializes");
let first = probe.expect_next();
assert_eq!(first.len(), CHUNK_SIZE);
assert!(
wait_until(Duration::from_secs(1), || produced.load(Ordering::SeqCst)
> 1),
"server writer should begin producing after first downstream demand"
);
assert!(
!wait_until(Duration::from_millis(250), || {
produced.load(Ordering::SeqCst) == total_chunks
}),
"withheld downstream demand should prevent the QUIC writer from consuming all chunks"
);
for _ in 1..total_chunks {
probe.request(1);
assert_eq!(probe.expect_next().len(), CHUNK_SIZE);
}
probe.request(1);
probe.expect_complete();
write_completion
.wait()
.expect("server QUIC write completes after downstream drains");
}