// datum-net 0.9.0
//
// Network sources and sinks for Datum streams, built on datum-core.
// Documentation
use datum::{
    Keep, Sink, Source, StreamCompletion, StreamError,
    testkit::{TestSink, TestSource},
};
use datum_net::quic::{
    crypto::rustls::{QuicClientConfig, QuicServerConfig},
    quinn,
    rustls::{
        ClientConfig as RustlsClientConfig, RootCertStore, ServerConfig as RustlsServerConfig,
        pki_types::{CertificateDer, PrivateKeyDer, PrivatePkcs8KeyDer},
    },
};
use datum_net::{QuicIncomingConnection, TokioQuic};
use rcgen::{CertifiedKey, generate_simple_self_signed};
use std::net::{SocketAddr, UdpSocket as StdUdpSocket};
use std::sync::{
    Arc,
    atomic::{AtomicUsize, Ordering},
};
use std::thread;
use std::time::{Duration, Instant};

// Chunk size handed to every `TokioQuic::bind` / `TokioQuic::connect` call in this file.
// The backpressure test also writes `Vec<u8>` chunks of exactly this many bytes and asserts
// that each chunk it reads back has this length.
const CHUNK_SIZE: usize = 8192;

/// Polls `condition` roughly every 5 ms until it reports `true` or `timeout` has elapsed.
///
/// Returns `true` as soon as a poll succeeds. Once the deadline has passed, `condition` is
/// evaluated one final time and that result is returned, so it is always checked at least
/// once, even with a zero timeout.
fn wait_until(timeout: Duration, condition: impl Fn() -> bool) -> bool {
    const POLL_INTERVAL: Duration = Duration::from_millis(5);

    let deadline = Instant::now() + timeout;
    loop {
        // Out of time: the last evaluation decides the outcome.
        if Instant::now() >= deadline {
            return condition();
        }
        if condition() {
            return true;
        }
        thread::sleep(POLL_INTERVAL);
    }
}

/// Asserts that `error` is one of the variants these tests accept as a QUIC failure:
/// `Failed`, `AbruptTermination`, or `Cancelled`.
///
/// # Panics
/// Panics with the debug rendering of `error` for any other variant.
fn assert_failed(error: StreamError) {
    if matches!(
        error,
        StreamError::Failed(_) | StreamError::AbruptTermination | StreamError::Cancelled
    ) {
        return;
    }
    panic!("expected QUIC failure-shaped stream error, got {error:?}");
}

/// Reports whether a UDP socket can currently be bound to `addr`.
///
/// The probe socket is only held for the duration of this call.
fn can_bind_udp(addr: SocketAddr) -> bool {
    let probe = StdUdpSocket::bind(addr);
    probe.is_ok()
}

fn quic_configs() -> (
    quinn::ServerConfig,
    quinn::ClientConfig,
    quinn::ClientConfig,
) {
    let CertifiedKey { cert, key_pair } =
        generate_simple_self_signed(["localhost".to_owned()]).expect("self-signed cert");
    let cert_der: CertificateDer<'static> = cert.der().clone();
    let key_der = PrivateKeyDer::Pkcs8(PrivatePkcs8KeyDer::from(key_pair.serialize_der()));
    let server_crypto = RustlsServerConfig::builder()
        .with_no_client_auth()
        .with_single_cert(vec![cert_der.clone()], key_der)
        .expect("server config");

    let mut roots = RootCertStore::empty();
    roots.add(cert_der).expect("trust self-signed cert");
    let trusted_client_crypto = RustlsClientConfig::builder()
        .with_root_certificates(roots)
        .with_no_client_auth();
    let untrusted_client_crypto = RustlsClientConfig::builder()
        .with_root_certificates(RootCertStore::empty())
        .with_no_client_auth();

    (
        quinn::ServerConfig::with_crypto(Arc::new(
            QuicServerConfig::try_from(server_crypto).expect("QUIC server config"),
        )),
        quinn::ClientConfig::new(Arc::new(
            QuicClientConfig::try_from(trusted_client_crypto).expect("QUIC client config"),
        )),
        quinn::ClientConfig::new(Arc::new(
            QuicClientConfig::try_from(untrusted_client_crypto).expect("QUIC client config"),
        )),
    )
}

/// Binds a QUIC server on an ephemeral loopback port, connects a trusting client, and
/// round-trips one `ping` payload over a single bidirectional stream: the client sends it,
/// the server reads it and writes it back, and the client reads the echo.
#[test]
fn quic_bind_connect_and_bidirectional_stream_round_trip() {
    let payload = b"ping".to_vec();
    let (server_config, client_config, _untrusted) = quic_configs();

    // Server: materialize the bind source, keeping both the binding handle and the first
    // incoming connection.
    let (bound, first_incoming) = TokioQuic::bind("127.0.0.1:0", server_config, CHUNK_SIZE)
        .to_mat(Sink::head(), Keep::both)
        .run()
        .expect("QUIC bind source materializes");
    let listener = bound.wait().expect("QUIC binding succeeds");
    let server_addr = listener.local_addr();

    // Client: connect to the bound address and check that both ends agree on addresses.
    let client_connection =
        TokioQuic::connect(server_addr, "localhost", client_config, CHUNK_SIZE)
            .run_with(Sink::head())
            .expect("QUIC client connection source materializes")
            .wait()
            .expect("QUIC client connects");
    assert_eq!(client_connection.remote_addr(), server_addr);

    let server_connection = first_incoming
        .wait()
        .expect("incoming QUIC connection accepted");
    assert_eq!(
        server_connection.remote_addr(),
        client_connection.local_addr()
    );

    // Client opens the stream first and sends the request; the echo is collected later.
    let (stream_opened, echo) = Source::single(payload.clone())
        .via_mat(client_connection.open_bi_default(), Keep::right)
        .to_mat(Sink::head(), Keep::both)
        .run()
        .expect("QUIC client bi stream materializes");

    // Server accepts that stream, reads the request, and writes it straight back.
    let server_stream = server_connection
        .accept_bi_default()
        .run_with(Sink::head())
        .expect("server accept_bi source materializes")
        .wait()
        .expect("server accepts QUIC bi stream");
    let (server_inbound, server_outbound) = server_stream.into_parts();
    let request = server_inbound
        .run_with(Sink::head())
        .expect("server read materializes")
        .wait()
        .expect("server reads request");
    assert_eq!(request, payload);

    Source::single(request)
        .run_with(server_outbound)
        .expect("server write materializes")
        .wait()
        .expect("server write completes");

    let echoed = echo.wait().expect("client receives echo");
    assert_eq!(echoed, payload);
    stream_opened
        .wait()
        .expect("client QUIC stream materializes");
}

/// Connects with the client configuration that has an empty root store and checks that the
/// failure is reported as a stream error in three places: the client's materialized
/// connection value, the client's `Sink::head` result, and the server's first incoming
/// connection.
#[test]
fn quic_handshake_failure_surfaces_as_stream_error() {
    let (server_config, _trusted, untrusted_client_config) = quic_configs();

    let (bound, first_incoming) = TokioQuic::bind("127.0.0.1:0", server_config, CHUNK_SIZE)
        .to_mat(Sink::head(), Keep::both)
        .run()
        .expect("QUIC bind source materializes");
    let listener = bound.wait().expect("QUIC binding succeeds");

    let doomed_client = TokioQuic::connect(
        listener.local_addr(),
        "localhost",
        untrusted_client_config,
        CHUNK_SIZE,
    );
    let (connect_outcome, head_outcome) = doomed_client
        .to_mat(Sink::head(), Keep::both)
        .run()
        .expect("failed QUIC client source materializes");

    let connect_error = connect_outcome
        .wait()
        .expect_err("client connection materialization fails");
    assert_failed(connect_error);

    let head_error = head_outcome
        .wait()
        .expect_err("client stream fails without panic");
    assert_failed(head_error);

    // Checked by pattern rather than `expect_err`, as the original test did.
    let Err(accept_error) = first_incoming.wait() else {
        panic!("server incoming stream should see handshake failure");
    };
    assert_failed(accept_error);
}

// Establishes a client/server QUIC connection with one bidirectional stream in flight, drops
// the handles listed below, and then checks that both the client's and the server's UDP
// addresses can be bound again within three seconds each.
//
// The order and set of `drop` calls is the subject of this test; keep them as written.
#[test]
fn dropping_quic_connection_and_stream_tears_down_endpoints() {
    let (server_config, client_config, _untrusted) = quic_configs();
    let (binding_completion, incoming_completion) =
        TokioQuic::bind("127.0.0.1:0", server_config, CHUNK_SIZE)
            .to_mat(Sink::head(), Keep::both)
            .run()
            .expect("QUIC bind source materializes");
    let binding = binding_completion.wait().expect("QUIC binding succeeds");
    // Remember both addresses now; they are re-bound at the end to detect socket release.
    let server_addr = binding.local_addr();

    let client_connection = TokioQuic::connect(server_addr, "localhost", client_config, CHUNK_SIZE)
        .run_with(Sink::head())
        .expect("QUIC client connection source materializes")
        .wait()
        .expect("QUIC client connects");
    let client_addr = client_connection.local_addr();
    let incoming = incoming_completion
        .wait()
        .expect("incoming QUIC connection accepted");

    // Client stream driven by test probes on both ends: `publisher` feeds it, `_probe` reads it.
    // NOTE(review): `_probe` is used below despite its underscore prefix, and it is never
    // dropped explicitly, so it lives until the end of the function — confirm this is intended.
    let ((publisher, stream_completion), _probe) = TestSource::probe::<Vec<u8>>()
        .via_mat(client_connection.open_bi_default(), Keep::both)
        .to_mat(TestSink::probe(), Keep::both)
        .run()
        .expect("client QUIC stream materializes");
    _probe.request(1);
    publisher.expect_request();
    // Send one element on the client stream before asking the server to accept it.
    publisher.send_next(b"cancel".to_vec());
    let accepted_stream = incoming
        .accept_bi_default()
        .run_with(Sink::head())
        .expect("server accept_bi source materializes")
        .wait()
        .expect("server accepts client stream");

    // Release the stream and connection handles on both sides.
    // NOTE(review): `binding` is not dropped here either; the server-side assertion below
    // therefore passes while `binding` is still alive — confirm that is the intended contract.
    drop(accepted_stream);
    drop(stream_completion);
    drop(publisher);
    drop(incoming);
    drop(client_connection);

    // Each address must become bindable again within its own three-second window.
    assert!(
        wait_until(Duration::from_secs(3), || can_bind_udp(client_addr)),
        "dropping QUIC client connection should close the client UDP socket"
    );
    assert!(
        wait_until(Duration::from_secs(3), || can_bind_udp(server_addr)),
        "dropping accepted connection and bind source should close the server UDP socket"
    );
}

/// Checks that a server-side writer cannot run ahead of a client that withholds demand.
///
/// The client opens a bidirectional stream with an empty outbound source and reads through a
/// test probe that initially requests a single element. The server then tries to write 8192
/// chunks of `CHUNK_SIZE` bytes. After the first chunk arrives, the generator must have
/// produced more than one chunk within a second, but must not have produced all of them
/// within a further 250 ms. The remaining chunks are then pulled one by one, after which the
/// stream and the write must both complete.
#[test]
fn slow_quic_consumer_backpressures_writer() {
    let (server_config, client_config, _untrusted) = quic_configs();
    let (bound, first_incoming) = TokioQuic::bind("127.0.0.1:0", server_config, CHUNK_SIZE)
        .to_mat(Sink::head(), Keep::both)
        .run()
        .expect("QUIC bind source materializes");
    let listener = bound.wait().expect("QUIC binding succeeds");

    let client_connection =
        TokioQuic::connect(listener.local_addr(), "localhost", client_config, CHUNK_SIZE)
            .run_with(Sink::head())
            .expect("QUIC client connection source materializes")
            .wait()
            .expect("QUIC client connects");
    let server_connection: QuicIncomingConnection = first_incoming
        .wait()
        .expect("incoming QUIC connection accepted");

    // Client side: nothing to send, one element of demand on the read half.
    let (stream_opened, reader) = Source::<Vec<u8>>::empty()
        .via_mat(client_connection.open_bi_default(), Keep::right)
        .to_mat(TestSink::probe(), Keep::both)
        .run()
        .expect("client QUIC stream probe materializes");
    reader.request(1);
    stream_opened.wait().expect("client QUIC stream opens");

    let server_stream = server_connection
        .accept_bi_default()
        .run_with(Sink::head())
        .expect("server accept_bi source materializes")
        .wait()
        .expect("server accepts QUIC bi stream");
    // The read half is unused but stays bound until the end of the test.
    let (_server_inbound, server_outbound) = server_stream.into_parts();

    // Counts how many chunks the generator has handed to the sink so far.
    let emitted = Arc::new(AtomicUsize::new(0));
    let chunk_count = 8192usize;
    let write_done: StreamCompletion<datum::NotUsed> = Source::unfold(0usize, {
        let emitted = Arc::clone(&emitted);
        move |sent| {
            (sent != chunk_count).then(|| {
                emitted.fetch_add(1, Ordering::SeqCst);
                (sent + 1, vec![b'x'; CHUNK_SIZE])
            })
        }
    })
    .run_with(server_outbound)
    .expect("server QUIC write materializes");

    let first_chunk = reader.expect_next();
    assert_eq!(first_chunk.len(), CHUNK_SIZE);

    let writer_started = wait_until(Duration::from_secs(1), || {
        emitted.load(Ordering::SeqCst) > 1
    });
    assert!(
        writer_started,
        "server writer should begin producing after first downstream demand"
    );

    let writer_ran_dry = wait_until(Duration::from_millis(250), || {
        emitted.load(Ordering::SeqCst) == chunk_count
    });
    assert!(
        !writer_ran_dry,
        "withheld downstream demand should prevent the QUIC writer from consuming all chunks"
    );

    // Drain the remaining chunks one unit of demand at a time.
    for _ in 1..chunk_count {
        reader.request(1);
        let chunk = reader.expect_next();
        assert_eq!(chunk.len(), CHUNK_SIZE);
    }
    reader.request(1);
    reader.expect_complete();
    write_done
        .wait()
        .expect("server QUIC write completes after downstream drains");
}