datum-net 0.6.0

Network sources and sinks for Datum streams, built on datum-core
Documentation
use datum::{
    Keep, Sink, Source, StreamError,
    testkit::{TestSink, TestSource},
};
use datum_net::{Datagram, TokioUdp};
use std::net::{SocketAddr, UdpSocket as StdUdpSocket};
use std::thread;
use std::time::{Duration, Instant};

const DATAGRAM_SIZE: usize = 2048;
const RECEIVE_BUFFER: usize = 8;

fn wait_until(timeout: Duration, condition: impl Fn() -> bool) -> bool {
    let deadline = Instant::now() + timeout;
    while Instant::now() < deadline {
        if condition() {
            return true;
        }
        thread::sleep(Duration::from_millis(5));
    }
    condition()
}

fn assert_failed(error: StreamError) {
    assert!(
        matches!(
            error,
            StreamError::Failed(_) | StreamError::AbruptTermination | StreamError::Cancelled
        ),
        "expected UDP failure-shaped stream error, got {error:?}"
    );
}

fn can_bind(addr: SocketAddr) -> bool {
    StdUdpSocket::bind(addr).is_ok()
}

fn temporary_udp_addr() -> SocketAddr {
    let socket = StdUdpSocket::bind("127.0.0.1:0").expect("reserve UDP port");
    socket.local_addr().expect("reserved UDP local addr")
}

#[test]
fn udp_send_sink_to_bind_round_trip_preserves_datagrams() {
    let expected = vec![b"one".to_vec(), b"two-two".to_vec(), vec![0, 1, 2, 3, 4]];
    let (binding_completion, received_completion) =
        TokioUdp::bind("127.0.0.1:0", DATAGRAM_SIZE, RECEIVE_BUFFER)
            .take(expected.len())
            .to_mat(Sink::collect(), Keep::both)
            .run()
            .expect("UDP bind source materializes");
    let binding = binding_completion.wait().expect("UDP bind succeeds");

    Source::from_iterable(
        expected
            .iter()
            .cloned()
            .map(|payload| Datagram::new(payload, binding.local_addr())),
    )
    .run_with(TokioUdp::send_sink("127.0.0.1:0"))
    .expect("UDP send sink materializes")
    .wait()
    .expect("UDP send sink completes");

    let received = received_completion
        .wait()
        .expect("UDP receiver collects datagrams");
    let first_remote = received
        .first()
        .expect("at least one received datagram")
        .remote();
    assert!(first_remote.ip().is_loopback());
    assert_ne!(first_remote.port(), 0);
    assert!(
        received
            .iter()
            .all(|datagram| datagram.remote() == first_remote)
    );

    let mut actual_payloads = received
        .into_iter()
        .map(Datagram::into_payload)
        .collect::<Vec<_>>();
    let mut expected_payloads = expected;
    actual_payloads.sort();
    expected_payloads.sort();
    assert_eq!(actual_payloads, expected_payloads);
}

#[test]
fn udp_bind_flow_receives_and_replies_on_same_socket() {
    let ((publisher, binding_completion), probe) = TestSource::probe::<Datagram>()
        .via_mat(
            TokioUdp::bind_flow("127.0.0.1:0", DATAGRAM_SIZE, RECEIVE_BUFFER),
            Keep::both,
        )
        .to_mat(TestSink::probe(), Keep::both)
        .run()
        .expect("UDP bind_flow materializes");
    probe.request(1);
    let binding = binding_completion.wait().expect("UDP bind_flow binds");

    let client = StdUdpSocket::bind("127.0.0.1:0").expect("client UDP socket");
    client
        .set_read_timeout(Some(Duration::from_secs(3)))
        .expect("client read timeout");
    client
        .send_to(b"flow", binding.local_addr())
        .expect("client sends datagram");

    let inbound = probe.expect_next();
    assert_eq!(inbound.payload(), b"flow");
    assert_eq!(inbound.remote(), client.local_addr().expect("client addr"));

    publisher.expect_request();
    publisher.send_next(Datagram::new(inbound.payload().to_vec(), inbound.remote()));

    let mut buffer = [0_u8; 16];
    let (read, remote) = client.recv_from(&mut buffer).expect("client receives echo");
    assert_eq!(&buffer[..read], b"flow");
    assert_eq!(remote, binding.local_addr());

    publisher.send_complete();
}

#[test]
fn udp_connected_flow_echo_round_trip() {
    let server = StdUdpSocket::bind("127.0.0.1:0").expect("server UDP socket");
    server
        .set_read_timeout(Some(Duration::from_secs(3)))
        .expect("server read timeout");
    let server_addr = server.local_addr().expect("server UDP addr");
    let server_thread = thread::spawn(move || {
        let mut buffer = [0_u8; 64];
        let (read, remote) = server.recv_from(&mut buffer).expect("server receives");
        server
            .send_to(&buffer[..read], remote)
            .expect("server echoes datagram");
    });

    let (connection_completion, response_completion) = Source::single(b"connected".to_vec())
        .via_mat(
            TokioUdp::connect("127.0.0.1:0", server_addr, DATAGRAM_SIZE, RECEIVE_BUFFER),
            Keep::right,
        )
        .to_mat(Sink::head(), Keep::both)
        .run()
        .expect("connected UDP flow materializes");
    let connection = connection_completion
        .wait()
        .expect("connected UDP flow connects");
    assert_eq!(connection.remote_addr(), server_addr);
    assert_eq!(
        response_completion.wait().expect("client receives echo"),
        b"connected".to_vec()
    );
    server_thread.join().expect("server thread joins");
}

#[test]
fn udp_bind_failure_surfaces_as_stream_error() {
    let occupied = StdUdpSocket::bind("127.0.0.1:0").expect("occupied UDP socket");
    let occupied_addr = occupied.local_addr().expect("occupied UDP addr");

    let (binding_completion, stream_completion) =
        TokioUdp::bind(occupied_addr, DATAGRAM_SIZE, RECEIVE_BUFFER)
            .to_mat(Sink::head(), Keep::both)
            .run()
            .expect("failed UDP bind source materializes");

    assert_failed(
        binding_completion
            .wait()
            .expect_err("binding completion fails"),
    );
    assert_failed(
        stream_completion
            .wait()
            .expect_err("stream completion fails"),
    );
}

#[test]
fn dropping_udp_source_and_sink_tears_down_sockets() {
    let (binding_completion, probe) = TokioUdp::bind("127.0.0.1:0", DATAGRAM_SIZE, RECEIVE_BUFFER)
        .to_mat(TestSink::probe(), Keep::both)
        .run()
        .expect("UDP bind source materializes");
    probe.request(1);
    let binding = binding_completion.wait().expect("UDP bind succeeds");
    let source_sender = StdUdpSocket::bind("127.0.0.1:0").expect("source cancellation sender");
    source_sender
        .send_to(b"close", binding.local_addr())
        .expect("source cancellation datagram");
    assert_eq!(probe.expect_next().payload(), b"close");
    drop(probe);
    assert!(
        wait_until(Duration::from_secs(3), || can_bind(binding.local_addr())),
        "dropping UDP source should close the bound socket"
    );

    let local_addr = temporary_udp_addr();
    let remote = StdUdpSocket::bind("127.0.0.1:0").expect("remote UDP socket");
    remote
        .set_read_timeout(Some(Duration::from_secs(3)))
        .expect("remote read timeout");
    let (publisher, completion) = TestSource::probe::<Datagram>()
        .to_mat(TokioUdp::send_sink(local_addr), Keep::both)
        .run()
        .expect("UDP send sink materializes");
    publisher.expect_request();
    publisher.send_next(Datagram::new(
        b"cancel".to_vec(),
        remote.local_addr().expect("remote local addr"),
    ));
    let mut buffer = [0_u8; 16];
    remote
        .recv_from(&mut buffer)
        .expect("remote receives first datagram");

    drop(completion);
    drop(publisher);
    assert!(
        wait_until(Duration::from_secs(3), || can_bind(local_addr)),
        "dropping UDP sink should close the bound socket"
    );
}

#[test]
fn slow_udp_consumer_stays_bounded_and_responsive_under_burst() {
    let (binding_completion, mut probe) = TokioUdp::bind("127.0.0.1:0", DATAGRAM_SIZE, 1)
        .to_mat(TestSink::probe(), Keep::both)
        .run()
        .expect("UDP bind source materializes");
    probe.set_timeout(Duration::from_secs(3));
    probe.request(1);
    let binding = binding_completion.wait().expect("UDP bind succeeds");

    let sender = StdUdpSocket::bind("127.0.0.1:0").expect("sender UDP socket");
    sender
        .send_to(b"warmup", binding.local_addr())
        .expect("send warmup datagram");
    assert_eq!(probe.expect_next().payload(), b"warmup");

    let payload = vec![b'x'; 512];
    for _ in 0..4096 {
        sender
            .send_to(&payload, binding.local_addr())
            .expect("send burst datagram");
    }

    probe.request(1);
    assert_eq!(probe.expect_next().payload().len(), payload.len());
}