use datum::{
Keep, Sink, Source, StreamError,
testkit::{TestSink, TestSource},
};
use datum_net::{Datagram, TokioUdp};
use std::net::{SocketAddr, UdpSocket as StdUdpSocket};
use std::thread;
use std::time::{Duration, Instant};
/// Size argument handed to `TokioUdp::bind`, `TokioUdp::bind_flow`, and
/// `TokioUdp::connect` throughout these tests.
// NOTE(review): presumably the maximum datagram payload size in bytes — confirm
// against the `datum_net` documentation.
const DATAGRAM_SIZE: usize = 2048;
/// Argument passed right after `DATAGRAM_SIZE` to the same `TokioUdp` constructors.
// NOTE(review): presumably the number of received datagrams that may be buffered
// ahead of downstream demand — confirm against the `datum_net` documentation.
const RECEIVE_BUFFER: usize = 8;
/// Polls `condition` until it reports `true` or `timeout` has elapsed.
///
/// The condition is evaluated immediately, then again after each pause of at most
/// 5 ms. It is always evaluated at least once, and once more after the deadline has
/// passed, so a condition that becomes true during the last pause is still observed.
///
/// `condition` may be any `FnMut`, so callers can pass closures that update captured
/// state (for example, an attempt counter).
///
/// Returns `true` as soon as `condition` does, and `false` if the deadline passes
/// without the condition ever holding.
fn wait_until(timeout: Duration, mut condition: impl FnMut() -> bool) -> bool {
    const POLL_INTERVAL: Duration = Duration::from_millis(5);

    let deadline = Instant::now() + timeout;
    loop {
        if condition() {
            return true;
        }
        let remaining = deadline.saturating_duration_since(Instant::now());
        if remaining.is_zero() {
            return false;
        }
        // Never sleep past the deadline: a short timeout should not be stretched to a
        // full poll interval.
        thread::sleep(remaining.min(POLL_INTERVAL));
    }
}
/// Panics unless `error` is one of the variants these tests accept as a UDP failure:
/// `StreamError::Failed`, `StreamError::AbruptTermination`, or `StreamError::Cancelled`.
fn assert_failed(error: StreamError) {
    let failure_shaped = matches!(
        error,
        StreamError::Failed(_) | StreamError::AbruptTermination | StreamError::Cancelled
    );
    if !failure_shaped {
        panic!("expected UDP failure-shaped stream error, got {error:?}");
    }
}
/// Reports whether a fresh UDP socket can be bound to `addr` right now.
///
/// The probe socket, if one was created, is closed again before this returns.
fn can_bind(addr: SocketAddr) -> bool {
    let probe = StdUdpSocket::bind(addr);
    probe.is_ok()
}
/// Returns a loopback UDP address whose port was just handed out by the OS.
///
/// A socket is bound to `127.0.0.1:0` only long enough to read its local address and
/// is closed before the address is returned, so the port is free again for the caller
/// (and, in principle, for anyone else) to bind.
///
/// # Panics
/// Panics if the socket cannot be bound or its local address cannot be read.
fn temporary_udp_addr() -> SocketAddr {
    let lookup = {
        let placeholder = StdUdpSocket::bind("127.0.0.1:0").expect("reserve UDP port");
        placeholder.local_addr()
    };
    lookup.expect("reserved UDP local addr")
}
/// Sends three payloads through a `TokioUdp::send_sink` stream to a `TokioUdp::bind`
/// source and checks what the source collected.
///
/// Every datagram must come from the same loopback sender with a non-zero port, and
/// the set of payloads must match what was sent; arrival order is not asserted.
#[test]
fn udp_send_sink_to_bind_round_trip_preserves_datagrams() {
    let mut sent_payloads = vec![b"one".to_vec(), b"two-two".to_vec(), vec![0, 1, 2, 3, 4]];

    // Receiving side: collect exactly as many datagrams as will be sent.
    let (bind_outcome, collected) = TokioUdp::bind("127.0.0.1:0", DATAGRAM_SIZE, RECEIVE_BUFFER)
        .take(sent_payloads.len())
        .to_mat(Sink::collect(), Keep::both)
        .run()
        .expect("UDP bind source materializes");
    let receiver = bind_outcome.wait().expect("UDP bind succeeds");
    let receiver_addr = receiver.local_addr();

    // Sending side: address each payload to the receiver and run to completion.
    let outgoing = sent_payloads
        .iter()
        .map(|bytes| Datagram::new(bytes.clone(), receiver_addr));
    Source::from_iterable(outgoing)
        .run_with(TokioUdp::send_sink("127.0.0.1:0"))
        .expect("UDP send sink materializes")
        .wait()
        .expect("UDP send sink completes");

    let arrivals = collected.wait().expect("UDP receiver collects datagrams");
    let sender_addr = arrivals
        .first()
        .expect("at least one received datagram")
        .remote();
    assert!(sender_addr.ip().is_loopback());
    assert_ne!(sender_addr.port(), 0);
    assert!(arrivals.iter().all(|arrival| arrival.remote() == sender_addr));

    // Compare as sorted lists so that reordering in flight does not fail the test.
    let mut received_payloads: Vec<_> = arrivals
        .into_iter()
        .map(Datagram::into_payload)
        .collect();
    received_payloads.sort_unstable();
    sent_payloads.sort_unstable();
    assert_eq!(received_payloads, sent_payloads);
}
/// Drives a `TokioUdp::bind_flow` with test probes on both ends and a plain std UDP
/// socket as the peer.
///
/// The peer's datagram must surface downstream with the peer's address as its remote,
/// and a datagram pushed upstream for that remote must reach the peer from the flow's
/// own local address.
#[test]
fn udp_bind_flow_receives_and_replies_on_same_socket() {
    let ((upstream, bind_outcome), downstream) = TestSource::probe::<Datagram>()
        .via_mat(
            TokioUdp::bind_flow("127.0.0.1:0", DATAGRAM_SIZE, RECEIVE_BUFFER),
            Keep::both,
        )
        .to_mat(TestSink::probe(), Keep::both)
        .run()
        .expect("UDP bind_flow materializes");
    downstream.request(1);
    let flow = bind_outcome.wait().expect("UDP bind_flow binds");
    let flow_addr = flow.local_addr();

    // Peer -> flow.
    let peer = StdUdpSocket::bind("127.0.0.1:0").expect("client UDP socket");
    let reply_deadline = Duration::from_secs(3);
    peer.set_read_timeout(Some(reply_deadline))
        .expect("client read timeout");
    peer.send_to(b"flow", flow_addr)
        .expect("client sends datagram");

    let request = downstream.expect_next();
    assert_eq!(request.payload(), b"flow");
    let reply_to = request.remote();
    assert_eq!(reply_to, peer.local_addr().expect("client addr"));

    // Flow -> peer: echo the payload back to whoever sent it.
    upstream.expect_request();
    upstream.send_next(Datagram::new(request.payload().to_vec(), reply_to));

    let mut echo = [0_u8; 16];
    let (echo_len, echo_from) = peer.recv_from(&mut echo).expect("client receives echo");
    assert_eq!(&echo[..echo_len], b"flow");
    assert_eq!(echo_from, flow_addr);
    upstream.send_complete();
}
/// Sends one payload through a `TokioUdp::connect` flow to a std UDP socket that
/// echoes it back from a helper thread, and checks the flow emits the echoed bytes.
///
/// Also checks that the materialized connection reports the echo server as its
/// remote address.
#[test]
fn udp_connected_flow_echo_round_trip() {
    /// Receives one datagram on `socket` and sends the same bytes back to its sender.
    fn echo_once(socket: StdUdpSocket) {
        let mut scratch = [0_u8; 64];
        let (len, peer) = socket.recv_from(&mut scratch).expect("server receives");
        socket
            .send_to(&scratch[..len], peer)
            .expect("server echoes datagram");
    }

    let echo_socket = StdUdpSocket::bind("127.0.0.1:0").expect("server UDP socket");
    echo_socket
        .set_read_timeout(Some(Duration::from_secs(3)))
        .expect("server read timeout");
    let echo_addr = echo_socket.local_addr().expect("server UDP addr");
    let echo_worker = thread::spawn(move || echo_once(echo_socket));

    let (connect_outcome, first_response) = Source::single(b"connected".to_vec())
        .via_mat(
            TokioUdp::connect("127.0.0.1:0", echo_addr, DATAGRAM_SIZE, RECEIVE_BUFFER),
            Keep::right,
        )
        .to_mat(Sink::head(), Keep::both)
        .run()
        .expect("connected UDP flow materializes");

    let link = connect_outcome
        .wait()
        .expect("connected UDP flow connects");
    assert_eq!(link.remote_addr(), echo_addr);

    let echoed = first_response.wait().expect("client receives echo");
    assert_eq!(echoed, b"connected".to_vec());

    echo_worker.join().expect("server thread joins");
}
/// Asks `TokioUdp::bind` for an address that a std UDP socket already holds and checks
/// that both the binding completion and the stream completion end in an error accepted
/// by `assert_failed`.
#[test]
fn udp_bind_failure_surfaces_as_stream_error() {
    // Kept alive for the whole test so the address stays taken.
    let squatter = StdUdpSocket::bind("127.0.0.1:0").expect("occupied UDP socket");
    let taken_addr = squatter.local_addr().expect("occupied UDP addr");

    let (bind_outcome, stream_outcome) =
        TokioUdp::bind(taken_addr, DATAGRAM_SIZE, RECEIVE_BUFFER)
            .to_mat(Sink::head(), Keep::both)
            .run()
            .expect("failed UDP bind source materializes");

    let bind_error = bind_outcome
        .wait()
        .expect_err("binding completion fails");
    assert_failed(bind_error);

    let stream_error = stream_outcome
        .wait()
        .expect_err("stream completion fails");
    assert_failed(stream_error);
}
/// Checks that dropping the materialized handles of a UDP source, and then of a UDP
/// sink, releases the address each stream had bound.
///
/// Release is observed by polling `can_bind` on the address (via `wait_until`) until a
/// fresh std socket can be bound there. For that check to mean anything, each half
/// first proves the stream really is using the address: the source by delivering a
/// datagram sent to it, the sink by sending a datagram whose source address is the
/// requested one.
#[test]
fn dropping_udp_source_and_sink_tears_down_sockets() {
    // ---- Source half -------------------------------------------------------------
    let (binding_completion, probe) = TokioUdp::bind("127.0.0.1:0", DATAGRAM_SIZE, RECEIVE_BUFFER)
        .to_mat(TestSink::probe(), Keep::both)
        .run()
        .expect("UDP bind source materializes");
    probe.request(1);
    let binding = binding_completion.wait().expect("UDP bind succeeds");
    let source_sender = StdUdpSocket::bind("127.0.0.1:0").expect("source cancellation sender");
    source_sender
        .send_to(b"close", binding.local_addr())
        .expect("source cancellation datagram");
    // Receiving the datagram shows the source is live on `binding.local_addr()`.
    assert_eq!(probe.expect_next().payload(), b"close");
    drop(probe);
    assert!(
        wait_until(Duration::from_secs(3), || can_bind(binding.local_addr())),
        "dropping UDP source should close the bound socket"
    );

    // ---- Sink half ---------------------------------------------------------------
    // `temporary_udp_addr` has already closed its own socket, so the sink can bind here.
    let local_addr = temporary_udp_addr();
    let remote = StdUdpSocket::bind("127.0.0.1:0").expect("remote UDP socket");
    remote
        .set_read_timeout(Some(Duration::from_secs(3)))
        .expect("remote read timeout");
    let (publisher, completion) = TestSource::probe::<Datagram>()
        .to_mat(TokioUdp::send_sink(local_addr), Keep::both)
        .run()
        .expect("UDP send sink materializes");
    publisher.expect_request();
    publisher.send_next(Datagram::new(
        b"cancel".to_vec(),
        remote.local_addr().expect("remote local addr"),
    ));
    let mut buffer = [0_u8; 16];
    let (read, sink_addr) = remote
        .recv_from(&mut buffer)
        .expect("remote receives first datagram");
    assert_eq!(&buffer[..read], b"cancel");
    // Without this, the `can_bind(local_addr)` check below could pass even if the sink
    // had never bound `local_addr` in the first place.
    assert_eq!(
        sink_addr, local_addr,
        "UDP sink should send from the address it was asked to bind"
    );
    drop(completion);
    drop(publisher);
    assert!(
        wait_until(Duration::from_secs(3), || can_bind(local_addr)),
        "dropping UDP sink should close the bound socket"
    );
}
/// Floods a `TokioUdp::bind` source (created with `1` as its buffer argument) with
/// 4096 datagrams of 512 bytes while the downstream probe has no outstanding demand,
/// then checks that one further request still yields a 512-byte datagram within the
/// probe's 3-second timeout.
#[test]
fn slow_udp_consumer_stays_bounded_and_responsive_under_burst() {
    const BURST_DATAGRAMS: usize = 4096;
    const BURST_PAYLOAD_LEN: usize = 512;

    let (bind_outcome, mut downstream) = TokioUdp::bind("127.0.0.1:0", DATAGRAM_SIZE, 1)
        .to_mat(TestSink::probe(), Keep::both)
        .run()
        .expect("UDP bind source materializes");
    downstream.set_timeout(Duration::from_secs(3));
    downstream.request(1);
    let receiver = bind_outcome.wait().expect("UDP bind succeeds");
    let receiver_addr = receiver.local_addr();

    // One datagram to consume the initial demand and show the path works.
    let burst_sender = StdUdpSocket::bind("127.0.0.1:0").expect("sender UDP socket");
    burst_sender
        .send_to(b"warmup", receiver_addr)
        .expect("send warmup datagram");
    let warmup = downstream.expect_next();
    assert_eq!(warmup.payload(), b"warmup");

    // Burst while nothing is being requested downstream.
    let filler = vec![b'x'; BURST_PAYLOAD_LEN];
    (0..BURST_DATAGRAMS).for_each(|_| {
        burst_sender
            .send_to(&filler, receiver_addr)
            .expect("send burst datagram");
    });

    downstream.request(1);
    let after_burst = downstream.expect_next();
    assert_eq!(after_burst.payload().len(), filler.len());
}