use datum::{
Keep, Sink, Source, StreamError,
io::{Compression, Framing},
testkit::TestSink,
};
use datum_net::quic::{
crypto::rustls::{QuicClientConfig, QuicServerConfig},
quinn,
rustls::{
ClientConfig as QuicRustlsClientConfig, RootCertStore as QuicRootCertStore,
ServerConfig as QuicRustlsServerConfig,
pki_types::{
CertificateDer as QuicCertificateDer, PrivateKeyDer as QuicPrivateKeyDer,
PrivatePkcs8KeyDer as QuicPrivatePkcs8KeyDer,
},
},
};
use datum_net::tls::rustls::{
ClientConfig, RootCertStore, ServerConfig,
pki_types::{CertificateDer, PrivateKeyDer, PrivatePkcs8KeyDer, ServerName},
};
use datum_net::{ConnectionSettings, RetryPolicy, TokioQuic, TokioTls, TokioUdp};
use rcgen::{CertifiedKey, generate_simple_self_signed};
use std::net::{SocketAddr, TcpListener, TcpStream, UdpSocket as StdUdpSocket};
use std::sync::{Arc, mpsc};
use std::thread;
use std::time::{Duration, Instant};
const SMALL_CHUNK: usize = 3;
const UDP_DATAGRAM_SIZE: usize = 2048;
const UDP_RECEIVE_BUFFER: usize = 8;
fn wait_until(timeout: Duration, condition: impl Fn() -> bool) -> bool {
let deadline = Instant::now() + timeout;
while Instant::now() < deadline {
if condition() {
return true;
}
thread::sleep(Duration::from_millis(5));
}
condition()
}
fn assert_failed(error: StreamError) {
assert!(
matches!(
error,
StreamError::Failed(_) | StreamError::AbruptTermination | StreamError::Cancelled
),
"expected failure-shaped stream error, got {error:?}"
);
}
fn tls_configs() -> (Arc<ServerConfig>, Arc<ClientConfig>, ServerName<'static>) {
let CertifiedKey { cert, key_pair } =
generate_simple_self_signed(["localhost".to_owned()]).expect("self-signed cert");
let cert_der: CertificateDer<'static> = cert.der().clone();
let key_der = PrivateKeyDer::Pkcs8(PrivatePkcs8KeyDer::from(key_pair.serialize_der()));
let server_config = ServerConfig::builder()
.with_no_client_auth()
.with_single_cert(vec![cert_der.clone()], key_der)
.expect("server config");
let mut roots = RootCertStore::empty();
roots.add(cert_der).expect("trust self-signed cert");
let client_config = ClientConfig::builder()
.with_root_certificates(roots)
.with_no_client_auth();
(
Arc::new(server_config),
Arc::new(client_config),
ServerName::try_from("localhost")
.expect("server name")
.to_owned(),
)
}
fn quic_configs() -> (quinn::ServerConfig, quinn::ClientConfig) {
let CertifiedKey { cert, key_pair } =
generate_simple_self_signed(["localhost".to_owned()]).expect("self-signed cert");
let cert_der: QuicCertificateDer<'static> = cert.der().clone();
let key_der = QuicPrivateKeyDer::Pkcs8(QuicPrivatePkcs8KeyDer::from(key_pair.serialize_der()));
let server_crypto = QuicRustlsServerConfig::builder()
.with_no_client_auth()
.with_single_cert(vec![cert_der.clone()], key_der)
.expect("server config");
let mut roots = QuicRootCertStore::empty();
roots.add(cert_der).expect("trust self-signed cert");
let client_crypto = QuicRustlsClientConfig::builder()
.with_root_certificates(roots)
.with_no_client_auth();
(
quinn::ServerConfig::with_crypto(Arc::new(
QuicServerConfig::try_from(server_crypto).expect("QUIC server config"),
)),
quinn::ClientConfig::new(Arc::new(
QuicClientConfig::try_from(client_crypto).expect("QUIC client config"),
)),
)
}
fn accept_one(listener: TcpListener, timeout: Duration) -> TcpStream {
listener
.set_nonblocking(true)
.expect("set listener nonblocking");
let deadline = Instant::now() + timeout;
loop {
match listener.accept() {
Ok((stream, _)) => return stream,
Err(error) if error.kind() == std::io::ErrorKind::WouldBlock => {
assert!(
Instant::now() < deadline,
"timed out waiting for TCP accept"
);
thread::sleep(Duration::from_millis(5));
}
Err(error) => panic!("TCP accept failed: {error}"),
}
}
}
fn can_bind_udp(addr: SocketAddr) -> bool {
StdUdpSocket::bind(addr).is_ok()
}
fn lifecycle_settings() -> ConnectionSettings {
ConnectionSettings::default()
.connect_timeout(Duration::from_millis(100))
.handshake_timeout(Duration::from_millis(100))
.retry_policy(
RetryPolicy::default()
.max_attempts(8)
.initial_backoff(Duration::from_millis(10))
.max_backoff(Duration::from_millis(40)),
)
}
#[test]
fn tls_delimiter_framing_preserves_frames_across_tls_chunks() {
let (server_config, client_config, server_name) = tls_configs();
let (binding_completion, incoming_completion) =
TokioTls::bind("127.0.0.1:0", server_config, SMALL_CHUNK)
.to_mat(Sink::head(), Keep::both)
.run()
.expect("TLS bind source materializes");
let binding = binding_completion.wait().expect("TLS binding succeeds");
let (connection_completion, client_response) =
Source::from_iterable([b"alp".to_vec(), b"ha\nbe".to_vec(), b"ta\ngamma\n".to_vec()])
.via_mat(
TokioTls::outgoing_connection(
binding.local_addr(),
server_name,
client_config,
SMALL_CHUNK,
),
Keep::right,
)
.via(Framing::delimiter(b"\n".to_vec(), 64, false))
.to_mat(Sink::collect(), Keep::both)
.run()
.expect("framed TLS client stream materializes");
let incoming = incoming_completion
.wait()
.expect("server accepts TLS connection");
connection_completion
.wait()
.expect("client TLS connection completes");
let (server_source, server_sink) = incoming.into_parts();
let frames = server_source
.via(Framing::delimiter(b"\n".to_vec(), 64, false))
.take(3)
.run_with(Sink::collect())
.expect("server framed read materializes")
.wait()
.expect("server reads framed request");
assert_eq!(
frames,
vec![b"alpha".to_vec(), b"beta".to_vec(), b"gamma".to_vec()]
);
Source::from_iterable([b"ok:alpha\nok:".to_vec(), b"beta\nok:gamma\n".to_vec()])
.run_with(server_sink)
.expect("server framed response materializes")
.wait()
.expect("server writes framed response");
assert_eq!(
client_response
.wait()
.expect("client decodes framed response"),
vec![
b"ok:alpha".to_vec(),
b"ok:beta".to_vec(),
b"ok:gamma".to_vec()
]
);
}
#[test]
fn tls_compression_carries_gzip_payload_to_decompressing_peer() {
let (server_config, client_config, server_name) = tls_configs();
let (binding_completion, incoming_completion) =
TokioTls::bind("127.0.0.1:0", server_config, SMALL_CHUNK)
.to_mat(Sink::head(), Keep::both)
.run()
.expect("TLS bind source materializes");
let binding = binding_completion.wait().expect("TLS binding succeeds");
let payload = b"gzip over TLS with enough bytes to cross several TLS read chunks".repeat(4);
let (connection_completion, mut client_probe) = Source::<Vec<u8>>::empty()
.via_mat(
TokioTls::outgoing_connection(
binding.local_addr(),
server_name,
client_config,
SMALL_CHUNK,
),
Keep::right,
)
.via(Compression::gunzip())
.to_mat(TestSink::probe(), Keep::both)
.run()
.expect("decompressing TLS client stream materializes");
client_probe.set_timeout(Duration::from_secs(3));
client_probe.request(128);
let incoming = incoming_completion
.wait()
.expect("server accepts TLS connection");
connection_completion
.wait()
.expect("client TLS connection completes");
let (_server_source, server_sink) = incoming.into_parts();
Source::from_iterable([
payload[..17].to_vec(),
payload[17..71].to_vec(),
payload[71..].to_vec(),
])
.via(Compression::gzip())
.run_with(server_sink)
.expect("server gzip response materializes")
.wait()
.expect("server writes compressed response");
let mut decoded = Vec::new();
while decoded.len() < payload.len() {
decoded.extend(client_probe.expect_next());
}
assert_eq!(decoded, payload);
}
#[test]
fn quic_bidirectional_stream_json_framing_round_trip() {
let (server_config, client_config) = quic_configs();
let (binding_completion, incoming_completion) =
TokioQuic::bind("127.0.0.1:0", server_config, SMALL_CHUNK)
.to_mat(Sink::head(), Keep::both)
.run()
.expect("QUIC bind source materializes");
let binding = binding_completion.wait().expect("QUIC binding succeeds");
let client_connection = TokioQuic::connect(
binding.local_addr(),
"localhost",
client_config,
SMALL_CHUNK,
)
.run_with(Sink::head())
.expect("QUIC client connection source materializes")
.wait()
.expect("QUIC client connects");
let incoming = incoming_completion
.wait()
.expect("server accepts QUIC connection");
let (stream_completion, client_response) = Source::from_iterable([
br#"[{"id":1,"payload":"al"#.to_vec(),
br#"pha"},{"id":2,"payload":"beta"}]"#.to_vec(),
])
.via_mat(client_connection.open_bi(SMALL_CHUNK), Keep::right)
.via(Framing::json(128))
.to_mat(Sink::collect(), Keep::both)
.run()
.expect("QUIC JSON-framed stream materializes");
let accepted_stream = incoming
.accept_bi(SMALL_CHUNK)
.run_with(Sink::head())
.expect("server accept_bi source materializes")
.wait()
.expect("server accepts QUIC bi stream");
let (server_source, server_sink) = accepted_stream.into_parts();
let frames = server_source
.via(Framing::json(128))
.take(2)
.run_with(Sink::collect())
.expect("server JSON framing materializes")
.wait()
.expect("server reads JSON frames");
assert_eq!(
frames,
vec![
br#"{"id":1,"payload":"alpha"}"#.to_vec(),
br#"{"id":2,"payload":"beta"}"#.to_vec()
]
);
Source::from_iterable([
br#"{"ok":true,"id":1}"#.to_vec(),
br#"{"ok":true,"id":2}"#.to_vec(),
])
.run_with(server_sink)
.expect("server JSON response materializes")
.wait()
.expect("server writes JSON response");
assert_eq!(
client_response.wait().expect("client receives JSON frames"),
vec![
br#"{"ok":true,"id":1}"#.to_vec(),
br#"{"ok":true,"id":2}"#.to_vec()
]
);
stream_completion.wait().expect("QUIC stream opens");
}
#[test]
fn udp_payloads_flow_through_core_map_filter_and_drop_releases_socket() {
let (binding_completion, mut probe) =
TokioUdp::bind("127.0.0.1:0", UDP_DATAGRAM_SIZE, UDP_RECEIVE_BUFFER)
.map(|datagram| datagram.into_payload())
.filter(|payload| payload.starts_with(b"keep"))
.to_mat(TestSink::probe(), Keep::both)
.run()
.expect("UDP bind source with core operators materializes");
probe.set_timeout(Duration::from_secs(3));
probe.request(2);
let binding = binding_completion.wait().expect("UDP bind succeeds");
let sender = StdUdpSocket::bind("127.0.0.1:0").expect("sender UDP socket");
for payload in [b"drop-one".as_slice(), b"keep-one", b"keep-two"] {
sender
.send_to(payload, binding.local_addr())
.expect("send UDP datagram");
}
let mut received = vec![probe.expect_next(), probe.expect_next()];
received.sort();
assert_eq!(received, vec![b"keep-one".to_vec(), b"keep-two".to_vec()]);
drop(probe);
assert!(
wait_until(Duration::from_secs(3), || can_bind_udp(
binding.local_addr()
)),
"dropping composed UDP stream should release the bound socket"
);
}
#[test]
fn lifecycle_tls_retry_then_carries_framed_data() {
let (server_config, client_config, server_name) = tls_configs();
let dummy_listener = TcpListener::bind("127.0.0.1:0").expect("dummy TCP listener");
let addr = dummy_listener.local_addr().expect("dummy TCP addr");
let (server_done_sender, server_done_receiver) = mpsc::channel();
let server = thread::spawn(move || {
let first_attempt = accept_one(dummy_listener, Duration::from_secs(3));
drop(first_attempt);
let (binding_completion, incoming_completion) =
TokioTls::bind(addr, server_config, SMALL_CHUNK)
.to_mat(Sink::head(), Keep::both)
.run()
.expect("real TLS server materializes");
binding_completion
.wait()
.expect("real TLS server binds after first failure");
let incoming = incoming_completion
.wait()
.expect("retry accepts TLS connection");
let (source, sink) = incoming.into_parts();
let request = source
.via(Framing::delimiter(b"\n".to_vec(), 64, false))
.run_with(Sink::head())
.expect("server framed read materializes")
.wait()
.expect("server reads framed request");
assert_eq!(request, b"retry-frame".to_vec());
Source::single(b"retry-ok\n".to_vec())
.run_with(sink)
.expect("server framed echo materializes")
.wait()
.expect("server framed echo completes");
server_done_sender.send(()).expect("send server done");
});
let (connection_completion, response_completion) = Source::single(b"retry-frame\n".to_vec())
.via_mat(
TokioTls::outgoing_connection_with_lifecycle(
addr,
server_name,
client_config,
lifecycle_settings(),
),
Keep::right,
)
.via(Framing::delimiter(b"\n".to_vec(), 64, false))
.to_mat(Sink::head(), Keep::both)
.run()
.expect("lifecycle client stream materializes");
let connection = connection_completion
.wait()
.expect("client connects after retry");
assert_eq!(connection.remote_addr(), addr);
assert_eq!(
response_completion
.wait()
.expect("client receives framed response"),
b"retry-ok".to_vec()
);
server_done_receiver
.recv_timeout(Duration::from_secs(3))
.expect("server finishes");
server.join().expect("server thread joins");
}
#[test]
fn peer_drop_mid_tls_framed_stream_surfaces_stream_error() {
let (server_config, client_config, server_name) = tls_configs();
let (binding_completion, incoming_completion) =
TokioTls::bind("127.0.0.1:0", server_config, SMALL_CHUNK)
.to_mat(Sink::head(), Keep::both)
.run()
.expect("TLS bind source materializes");
let binding = binding_completion.wait().expect("TLS binding succeeds");
let (connection_completion, client_response) = Source::single(b"request\n".to_vec())
.via_mat(
TokioTls::outgoing_connection(
binding.local_addr(),
server_name,
client_config,
SMALL_CHUNK,
),
Keep::right,
)
.via(Framing::delimiter(b"\n".to_vec(), 64, false))
.to_mat(Sink::head(), Keep::both)
.run()
.expect("framed TLS client stream materializes");
let incoming = incoming_completion
.wait()
.expect("server accepts TLS connection");
connection_completion
.wait()
.expect("client TLS connection completes");
drop(incoming);
let error = client_response
.wait()
.expect_err("peer drop should fail the framed response stream");
assert_failed(error);
}