use datum::{
Sink, Source, StreamError, StreamRefId, StreamRefSettings, StreamRefs, testkit::TestSink,
};
use datum_net::{
serve_sink_ref_over_tcp, serve_source_ref_over_tcp, sink_ref_over_tcp, source_ref_over_tcp,
};
use std::net::{SocketAddr, TcpListener as StdTcpListener, TcpStream as StdTcpStream};
use std::sync::{
Arc, Mutex, MutexGuard,
atomic::{AtomicBool, AtomicUsize, Ordering},
};
use std::thread;
use std::time::{Duration, Instant};
// Stream-ref identifier used throughout this file: every test passes this
// same id to each stream-ref endpoint it creates.
const STREAM_REF_ID: StreamRefId = StreamRefId::from_u128(1);
/// Acquires the process-wide lock that the tests in this file hold for their
/// whole duration, so they run one at a time.
///
/// A poisoned lock (left behind by a test that panicked while holding the
/// guard) is recovered rather than propagated, so one failing test does not
/// make every later test fail on lock acquisition.
fn tcp_stream_ref_test_guard() -> MutexGuard<'static, ()> {
    static LOCK: Mutex<()> = Mutex::new(());
    match LOCK.lock() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    }
}
/// Polls `condition` about every 5 ms until it returns `true` or `timeout`
/// elapses.
///
/// Returns `true` as soon as a poll succeeds. Once the deadline has passed the
/// condition is evaluated one final time and that result is returned, so the
/// condition is always checked at least once, even with a zero timeout.
fn wait_until(timeout: Duration, mut condition: impl FnMut() -> bool) -> bool {
    let deadline = Instant::now() + timeout;
    loop {
        if Instant::now() >= deadline {
            // Out of time: the last evaluation decides the outcome.
            break condition();
        }
        if condition() {
            break true;
        }
        thread::sleep(Duration::from_millis(5));
    }
}
/// Asserts that `condition` stays true for the whole of `timeout`.
///
/// The condition is sampled about every 5 ms until the deadline passes and
/// then one final time, so it is always evaluated at least once, even with a
/// zero timeout.
///
/// # Panics
///
/// Panics on the first sample that returns `false`. The message reports how
/// long the condition had held and how long it was expected to hold, which
/// makes timing-related failures diagnosable from the test output alone.
fn assert_condition_holds(timeout: Duration, mut condition: impl FnMut() -> bool) {
    let started = Instant::now();
    let deadline = started + timeout;
    loop {
        // Decide whether this is the final sample *before* evaluating the
        // condition, preserving the original check-then-sample ordering.
        let expired = Instant::now() >= deadline;
        assert!(
            condition(),
            "condition stopped holding after {:?} (expected it to hold for {:?})",
            started.elapsed(),
            timeout
        );
        if expired {
            break;
        }
        thread::sleep(Duration::from_millis(5));
    }
}
/// Reports whether a new TCP listener can be bound to `addr` right now.
///
/// The probe listener is dropped before the function returns.
fn can_bind(addr: SocketAddr) -> bool {
    match StdTcpListener::bind(addr) {
        Ok(_probe) => true,
        Err(_) => false,
    }
}
/// Builds the settings used by the failure and backpressure tests: the default
/// settings with a buffer capacity of 4 and an 80 ms subscription timeout.
fn short_settings() -> StreamRefSettings {
    let subscription_timeout = Duration::from_millis(80);
    let defaults = StreamRefSettings::default();
    defaults
        .with_buffer_capacity(4)
        .with_subscription_timeout(subscription_timeout)
}
/// Asserts that `result` is an `Err`; which `StreamError` it carries is not
/// inspected.
///
/// # Panics
///
/// Panics with "expected stream error, got success" when `result` is `Ok`.
fn assert_is_stream_error<T>(result: Result<T, StreamError>) {
    let failed = result.is_err();
    assert!(failed, "expected stream error, got success");
}
// A remote `String` source consumed over TCP by a fold sink must produce the
// concatenation "abc", and both TCP handles must finish without error.
#[test]
fn stream_ref_tcp_terminal_fold_specialization_streams_generic_strings() {
    let _serial = tcp_stream_ref_test_guard();

    // Origin side: materialize the source ref and serve it on an ephemeral port.
    let origin = Source::from_iter(["a".to_owned(), "b".to_owned(), "c".to_owned()])
        .run_with(StreamRefs::source_ref())
        .expect("source_ref materializes");
    let (endpoint, server_task) = serve_source_ref_over_tcp(
        "127.0.0.1:0",
        origin,
        STREAM_REF_ID,
        StreamRefSettings::default(),
    )
    .expect("source ref TCP server binds");

    // Consumer side: connect to whichever address the server actually bound.
    let (remote, client_task) = source_ref_over_tcp::<String, _>(
        endpoint.local_addr(),
        STREAM_REF_ID,
        StreamRefSettings::default(),
    )
    .expect("source ref TCP client connects");

    let joined = remote
        .run_with(Sink::fold(String::new(), |mut so_far, piece: String| {
            so_far += &piece;
            so_far
        }))
        .expect("remote fold materializes")
        .wait()
        .expect("remote fold completes");
    assert_eq!(joined, "abc");

    client_task.wait().unwrap();
    server_task.wait().unwrap();
}
// A foreach sink on a remote `u64` source must run its callback for the
// streamed elements: the accumulated total of `1..=4` has to be 10.
#[test]
fn stream_ref_tcp_terminal_foreach_specialization_runs_side_effects() {
    let _serial = tcp_stream_ref_test_guard();

    let origin = Source::from_iter(1_u64..=4)
        .run_with(StreamRefs::source_ref())
        .expect("source_ref materializes");
    let (endpoint, server_task) = serve_source_ref_over_tcp(
        "127.0.0.1:0",
        origin,
        STREAM_REF_ID,
        StreamRefSettings::default(),
    )
    .expect("source ref TCP server binds");
    let (remote, client_task) = source_ref_over_tcp::<u64, _>(
        endpoint.local_addr(),
        STREAM_REF_ID,
        StreamRefSettings::default(),
    )
    .expect("source ref TCP client connects");

    // Shared accumulator that the sink callback adds every received element to.
    let total = Arc::new(AtomicUsize::new(0));
    let total_in_sink = Arc::clone(&total);
    remote
        .run_with(Sink::foreach(move |element| {
            total_in_sink.fetch_add(element as usize, Ordering::SeqCst);
        }))
        .expect("remote foreach materializes")
        .wait()
        .expect("remote foreach completes");
    assert_eq!(total.load(Ordering::SeqCst), 10);

    client_task.wait().unwrap();
    server_task.wait().unwrap();
}
// An ignore sink on a remote 64-element source must run to completion, and
// both TCP handles must finish without error.
#[test]
fn stream_ref_tcp_terminal_ignore_specialization_completes() {
    let _serial = tcp_stream_ref_test_guard();

    let origin = Source::from_iter(0_u64..64)
        .run_with(StreamRefs::source_ref())
        .expect("source_ref materializes");
    let (endpoint, server_task) = serve_source_ref_over_tcp(
        "127.0.0.1:0",
        origin,
        STREAM_REF_ID,
        StreamRefSettings::default(),
    )
    .expect("source ref TCP server binds");
    let (remote, client_task) = source_ref_over_tcp::<u64, _>(
        endpoint.local_addr(),
        STREAM_REF_ID,
        StreamRefSettings::default(),
    )
    .expect("source ref TCP client connects");

    // Elements are discarded; only successful completion is checked.
    remote
        .run_with(Sink::ignore())
        .expect("remote ignore materializes")
        .wait()
        .expect("remote ignore completes");

    client_task.wait().unwrap();
    server_task.wait().unwrap();
}
// A failure at the origin ("terminal boom") must surface on the remote fold
// as `StreamError::Failed` with a message that still contains the original
// text.
#[test]
fn stream_ref_tcp_terminal_fold_specialization_propagates_origin_failure() {
    let _serial = tcp_stream_ref_test_guard();

    let origin = Source::<u64>::failed(StreamError::Failed("terminal boom".to_owned()))
        .run_with(StreamRefs::source_ref())
        .expect("source_ref materializes");
    let (endpoint, server_task) = serve_source_ref_over_tcp(
        "127.0.0.1:0",
        origin,
        STREAM_REF_ID,
        StreamRefSettings::default(),
    )
    .expect("source ref TCP server binds");
    let (remote, client_task) = source_ref_over_tcp::<u64, _>(
        endpoint.local_addr(),
        STREAM_REF_ID,
        StreamRefSettings::default(),
    )
    .expect("source ref TCP client connects");

    let outcome = remote
        .run_with(Sink::fold(0_u64, |running, element| running + element))
        .expect("remote fold materializes")
        .wait();
    assert!(matches!(
        &outcome,
        Err(StreamError::Failed(message)) if message.contains("terminal boom")
    ));

    // The handle results are deliberately not asserted on in this failure path.
    let _ = client_task.wait();
    let _ = server_task.wait();
}
// Dropping the materialized fold completion must reach the producer: the test
// passes only if the resource's close callback sets `closed` within 2 s after
// the completion is dropped and both TCP handles have been waited on.
#[test]
fn stream_ref_tcp_terminal_fold_specialization_drop_cancels_producer() {
let _guard = tcp_stream_ref_test_guard();
// `closed` is set by the close callback; `pulled` counts read-callback calls.
let closed = Arc::new(AtomicBool::new(false));
let pulled = Arc::new(AtomicUsize::new(0));
let close_flag = Arc::clone(&closed);
let pulled_for_source = Arc::clone(&pulled);
// Counter source whose read callback always returns `Some`, so it never
// completes by itself.
let source = Source::unfold_resource(
|| Ok(0_u64),
move |next| {
pulled_for_source.fetch_add(1, Ordering::SeqCst);
let value = *next;
*next += 1;
Ok(Some(value))
},
move |_state| {
close_flag.store(true, Ordering::SeqCst);
Ok(())
},
);
let source_ref = source
.run_with(StreamRefs::source_ref())
.expect("source_ref materializes");
let (binding, server_handle) = serve_source_ref_over_tcp(
"127.0.0.1:0",
source_ref,
STREAM_REF_ID,
StreamRefSettings::default(),
)
.expect("source ref TCP server binds");
let (remote_source, client_handle) = source_ref_over_tcp::<u64, _>(
binding.local_addr(),
STREAM_REF_ID,
StreamRefSettings::default(),
)
.expect("source ref TCP client connects");
// `wrapping_add` keeps the fold from overflowing on the endless stream.
let completion = remote_source
.run_with(Sink::fold(0_u64, |acc, item| acc.wrapping_add(item)))
.expect("remote fold materializes");
// Make sure the producer has been pulled at least once before cancelling.
assert!(wait_until(Duration::from_secs(2), || {
pulled.load(Ordering::SeqCst) > 0
}));
// The drop is the cancellation signal under test.
drop(completion);
// Handle results are ignored here; only the close callback is asserted on.
let _ = client_handle.wait();
let _ = server_handle.wait();
assert!(wait_until(Duration::from_secs(2), || closed.load(Ordering::SeqCst)));
}
// While the foreach callback is blocked on its first element, the origin must
// not be pulled more than twice the configured buffer capacity. After the
// callback is released the stream must complete normally.
#[test]
fn stream_ref_tcp_terminal_foreach_specialization_backpressures_slow_callback() {
let _guard = tcp_stream_ref_test_guard();
let settings = short_settings();
// Counts the elements the origin has produced.
let pulled = Arc::new(AtomicUsize::new(0));
let pulled_for_source = Arc::clone(&pulled);
// Finite source yielding 0..64; the counter is bumped only when an element
// is actually emitted.
let source = Source::unfold(0_u64, move |next| {
if next >= 64 {
return None;
}
pulled_for_source.fetch_add(1, Ordering::SeqCst);
Some((next + 1, next))
});
let source_ref = source
.run_with(StreamRefs::source_ref_with_settings(settings))
.expect("source_ref materializes");
let (binding, server_handle) =
serve_source_ref_over_tcp("127.0.0.1:0", source_ref, STREAM_REF_ID, settings)
.expect("source ref TCP server binds");
let (remote_source, client_handle) =
source_ref_over_tcp::<u64, _>(binding.local_addr(), STREAM_REF_ID, settings)
.expect("source ref TCP client connects");
// Handshake channels: `started` tells the test the callback is running,
// `release` lets the test unblock it.
let (started_tx, started_rx) = std::sync::mpsc::channel();
let (release_tx, release_rx) = std::sync::mpsc::channel();
let started = Arc::new(std::sync::Mutex::new(Some(started_tx)));
let release = Arc::new(std::sync::Mutex::new(release_rx));
let started_for_sink = Arc::clone(&started);
let release_for_sink = Arc::clone(&release);
let completion = remote_source
.run_with(Sink::foreach(move |_item| {
// Only the first invocation finds the sender (it is `take`n), so only the
// first element blocks; the wait is capped at 3 s.
if let Some(tx) = started_for_sink.lock().unwrap().take() {
let _ = tx.send(());
let _ = release_for_sink
.lock()
.unwrap()
.recv_timeout(Duration::from_secs(3));
}
}))
.expect("remote foreach materializes");
started_rx
.recv_timeout(Duration::from_secs(10))
.expect("slow terminal callback starts");
// With the callback blocked, the pull count must stay bounded for 100 ms.
assert_condition_holds(Duration::from_millis(100), || {
pulled.load(Ordering::SeqCst) <= settings.buffer_capacity() * 2
});
// Unblock the callback and let the remaining elements drain.
let _ = release_tx.send(());
completion.wait().expect("remote foreach completes");
client_handle.wait().unwrap();
server_handle.wait().unwrap();
}
// When the peer has already closed the connection before the fold is
// materialized, waiting on the fold must yield an error.
#[test]
fn stream_ref_tcp_terminal_late_subscription_fails_fold() {
    let _serial = tcp_stream_ref_test_guard();

    // Stand-in peer: accept one connection, close it at once, then signal.
    let dummy = StdTcpListener::bind("127.0.0.1:0").expect("dummy TCP listener");
    let dummy_addr = dummy.local_addr().expect("dummy TCP addr");
    let (hung_up_tx, hung_up_rx) = std::sync::mpsc::channel();
    let acceptor = thread::spawn(move || {
        let (connection, _) = dummy.accept().expect("dummy accept");
        drop(connection);
        let _ = hung_up_tx.send(());
    });

    let (remote, client_task) =
        source_ref_over_tcp::<u64, _>(dummy_addr, STREAM_REF_ID, short_settings())
            .expect("source ref TCP client connects");

    // Do not materialize until the peer has definitely hung up.
    hung_up_rx
        .recv_timeout(Duration::from_secs(3))
        .expect("dummy peer closes before materialization");

    assert_is_stream_error(
        remote
            .run_with(Sink::fold(0_u64, |running, element| running + element))
            .expect("late fold materializes")
            .wait(),
    );

    acceptor.join().unwrap();
    let _ = client_task.wait();
}
// Elements `1..=5` pushed into a TCP sink ref must arrive at the receiving
// side, where a fold sink sums them to 15.
#[test]
fn sink_ref_tcp_receiver_terminal_fold_specialization_streams_elements() {
    let _serial = tcp_stream_ref_test_guard();

    // Sender binds first; the receiver then connects to the bound address.
    let (outbound, endpoint, client_task) =
        sink_ref_over_tcp::<u64, _>("127.0.0.1:0", STREAM_REF_ID, StreamRefSettings::default())
            .expect("sink ref TCP sender binds");
    let (inbound, server_task) = serve_sink_ref_over_tcp::<u64, _>(
        endpoint.local_addr(),
        STREAM_REF_ID,
        StreamRefSettings::default(),
    )
    .expect("sink ref TCP receiver connects");

    // The receiver folds on its own thread while this thread drives the sender.
    let folding = thread::spawn(move || {
        inbound
            .run_with(Sink::fold(0_u64, |running, element| running + element))
            .expect("receiver fold materializes")
            .wait()
    });

    Source::from_iter(1_u64..=5)
        .run_with(outbound)
        .expect("sender source materializes")
        .wait()
        .expect("sender source completes");

    assert_eq!(folding.join().unwrap().unwrap(), 15);
    client_task.wait().unwrap();
    server_task.wait().unwrap();
}
// Streams 128 consecutive integers through a TCP source ref and checks the
// collected element count and sum against the origin range.
#[test]
fn stream_ref_tcp_round_trip_checksum() {
    let _serial = tcp_stream_ref_test_guard();

    let total = 128_u64;
    let origin = Source::from_iter(0_u64..total)
        .run_with(StreamRefs::source_ref())
        .expect("source_ref materializes");
    let (endpoint, server_task) = serve_source_ref_over_tcp(
        "127.0.0.1:0",
        origin,
        STREAM_REF_ID,
        StreamRefSettings::default(),
    )
    .expect("source ref TCP server binds");
    let (remote, client_task) = source_ref_over_tcp::<u64, _>(
        endpoint.local_addr(),
        STREAM_REF_ID,
        StreamRefSettings::default(),
    )
    .expect("source ref TCP client connects");

    let received = remote.run_collect().expect("remote source collects");
    let expected_sum: u64 = (0_u64..total).sum();
    assert_eq!(received.len(), total as usize);
    assert_eq!(received.iter().sum::<u64>(), expected_sum);

    client_task.wait().unwrap();
    server_task.wait().unwrap();
}
// A peer that accepts the connection and immediately closes it must make
// collecting the remote source fail.
#[test]
fn stream_ref_tcp_peer_drop_errors_receiver() {
    let _serial = tcp_stream_ref_test_guard();

    // Stand-in peer: accept a single connection and drop it straight away.
    let dummy = StdTcpListener::bind("127.0.0.1:0").expect("dummy TCP listener");
    let dummy_addr = dummy.local_addr().expect("dummy TCP addr");
    let acceptor = thread::spawn(move || {
        let (connection, _) = dummy.accept().expect("dummy accept");
        drop(connection);
    });

    let (remote, client_task) =
        source_ref_over_tcp::<u64, _>(dummy_addr, STREAM_REF_ID, short_settings())
            .expect("source ref TCP client connects to dummy");
    assert_is_stream_error(remote.run_collect());

    acceptor.join().unwrap();
    let _ = client_task.wait();
}
// Taking a single element from an endless remote source must shut everything
// down: both handles finish without error, the origin's close callback runs,
// and the server's address becomes bindable again.
#[test]
fn stream_ref_tcp_downstream_cancel_reaches_producer_and_socket_rebinds() {
    let _serial = tcp_stream_ref_test_guard();

    // Set by the resource's close callback.
    let released = Arc::new(AtomicBool::new(false));
    let released_in_source = Arc::clone(&released);
    // Endless counter: the read callback always yields the next integer.
    let counter = Source::unfold_resource(
        || Ok(0_u64),
        |state| {
            let current = *state;
            *state += 1;
            Ok(Some(current))
        },
        move |_state| {
            released_in_source.store(true, Ordering::SeqCst);
            Ok(())
        },
    );
    let origin = counter
        .run_with(StreamRefs::source_ref())
        .expect("source_ref materializes");
    let (endpoint, server_task) = serve_source_ref_over_tcp(
        "127.0.0.1:0",
        origin,
        STREAM_REF_ID,
        StreamRefSettings::default(),
    )
    .expect("source ref TCP server binds");
    let (remote, client_task) = source_ref_over_tcp::<u64, _>(
        endpoint.local_addr(),
        STREAM_REF_ID,
        StreamRefSettings::default(),
    )
    .expect("source ref TCP client connects");

    assert_eq!(remote.take(1).run_collect().unwrap(), vec![0]);
    client_task.wait().unwrap();
    server_task.wait().unwrap();

    assert!(wait_until(Duration::from_secs(1), || {
        released.load(Ordering::SeqCst)
    }));
    assert!(wait_until(Duration::from_secs(3), || {
        can_bind(endpoint.local_addr())
    }));
}
// With a consumer that has requested only one element, an endless origin must
// be pulled at least `buffer_capacity` times but never more than twice that.
#[test]
fn stream_ref_tcp_bounded_backpressure_with_slow_consumer() {
let _guard = tcp_stream_ref_test_guard();
let settings = short_settings();
// Counts how many times the origin's unfold closure has run.
let pulled = Arc::new(AtomicUsize::new(0));
let pulled_for_source = Arc::clone(&pulled);
// Endless counter source: the closure always returns `Some`.
let source = Source::unfold(0_u64, move |next| {
pulled_for_source.fetch_add(1, Ordering::SeqCst);
Some((next + 1, next))
});
let source_ref = source
.run_with(StreamRefs::source_ref_with_settings(settings))
.expect("source_ref materializes");
let (binding, server_handle) =
serve_source_ref_over_tcp("127.0.0.1:0", source_ref, STREAM_REF_ID, settings)
.expect("source ref TCP server binds");
let (remote_source, client_handle) =
source_ref_over_tcp::<u64, _>(binding.local_addr(), STREAM_REF_ID, settings)
.expect("source ref TCP client connects");
let mut probe = remote_source
.run_with(TestSink::probe())
.expect("remote source probe materializes");
probe.set_timeout(Duration::from_secs(10));
// Request exactly one element and leave every later one unrequested.
probe.request(1);
probe.assert_next(0);
// Lower bound: the origin is pulled at least `buffer_capacity` times within 1 s.
assert!(wait_until(Duration::from_secs(1), || {
pulled.load(Ordering::SeqCst) >= settings.buffer_capacity()
}));
// Upper bound: the pull count stays within 2x capacity for 100 ms.
assert_condition_holds(Duration::from_millis(100), || {
pulled.load(Ordering::SeqCst) <= settings.buffer_capacity() * 2
});
probe.cancel();
// Handle results are ignored after the cancel.
let _ = client_handle.wait();
let _ = server_handle.wait();
}
// Pushes 128 consecutive integers through a TCP sink ref and checks that the
// receiver collects the same count, the same sum, and the same ordered
// sequence.
#[test]
fn sink_ref_tcp_round_trip_checksum() {
    let _serial = tcp_stream_ref_test_guard();

    let (outbound, endpoint, client_task) =
        sink_ref_over_tcp::<u64, _>("127.0.0.1:0", STREAM_REF_ID, StreamRefSettings::default())
            .expect("sink ref TCP sender binds");
    let (inbound, server_task) = serve_sink_ref_over_tcp::<u64, _>(
        endpoint.local_addr(),
        STREAM_REF_ID,
        StreamRefSettings::default(),
    )
    .expect("sink ref TCP receiver connects");

    // Collect on a separate thread so the sender can run on this one.
    let collector = thread::spawn(move || inbound.run_collect());

    let total = 128_u64;
    let sending = Source::from_iter(0_u64..total)
        .run_with(outbound)
        .expect("sender source materializes");
    sending.wait().unwrap();

    let received = collector.join().unwrap().unwrap();
    assert_eq!(received.len(), total as usize);
    assert_eq!(received.iter().sum::<u64>(), (0_u64..total).sum::<u64>());
    assert_eq!(received, (0_u64..total).collect::<Vec<_>>());

    client_task.wait().unwrap();
    server_task.wait().unwrap();
}
// A receiver that connects to the sender's address and immediately hangs up
// must make the endless sending stream finish with an error.
#[test]
fn sink_ref_tcp_peer_drop_errors_sender() {
    let _serial = tcp_stream_ref_test_guard();

    let (outbound, endpoint, client_task) =
        sink_ref_over_tcp::<u64, _>("127.0.0.1:0", STREAM_REF_ID, short_settings())
            .expect("sink ref TCP sender binds");
    let sending = Source::repeat(1_u64)
        .run_with(outbound)
        .expect("repeat source materializes");
    // Wait for the sender's outcome on a separate thread.
    let sender_outcome = thread::spawn(move || sending.wait());

    // Plain TCP connection standing in for a receiver; dropped right away.
    let dummy_peer =
        StdTcpStream::connect(endpoint.local_addr()).expect("dummy receiver connects");
    drop(dummy_peer);

    assert_is_stream_error(sender_outcome.join().unwrap());
    let _ = client_task.wait();
}
// A sink-ref receiver connected to a peer that accepts and immediately closes
// the connection must fail when its source is collected.
#[test]
fn sink_ref_tcp_receiver_peer_drop_errors_receiver() {
    let _serial = tcp_stream_ref_test_guard();

    // Stand-in peer: accept a single connection and drop it straight away.
    let dummy = StdTcpListener::bind("127.0.0.1:0").expect("dummy TCP listener");
    let dummy_addr = dummy.local_addr().expect("dummy TCP addr");
    let acceptor = thread::spawn(move || {
        let (connection, _) = dummy.accept().expect("dummy accept");
        drop(connection);
    });

    let (inbound, server_task) =
        serve_sink_ref_over_tcp::<u64, _>(dummy_addr, STREAM_REF_ID, short_settings())
            .expect("sink ref TCP receiver connects to dummy");
    assert_is_stream_error(inbound.run_collect());

    acceptor.join().unwrap();
    let _ = server_task.wait();
}
// When the receiver takes a single element and stops, the sending side's
// resource must be closed and the sender's address must become bindable again.
#[test]
fn sink_ref_tcp_receiver_cancel_reaches_sender_and_socket_rebinds() {
let _guard = tcp_stream_ref_test_guard();
let (send_sink, binding, client_handle) =
sink_ref_over_tcp::<u64, _>("127.0.0.1:0", STREAM_REF_ID, StreamRefSettings::default())
.expect("sink ref TCP sender binds");
let (remote_source, server_handle) = serve_sink_ref_over_tcp::<u64, _>(
binding.local_addr(),
STREAM_REF_ID,
StreamRefSettings::default(),
)
.expect("sink ref TCP receiver connects");
// Receiver runs on its own thread and stops after one element.
let server_thread = thread::spawn(move || remote_source.take(1).run_collect());
// `closed` is set by the sending resource's close callback.
let closed = Arc::new(AtomicBool::new(false));
let close_flag = Arc::clone(&closed);
// Endless counter source: the read callback always returns `Some`.
let source = Source::unfold_resource(
|| Ok(0_u64),
|next| {
let value = *next;
*next += 1;
Ok(Some(value))
},
move |_state| {
close_flag.store(true, Ordering::SeqCst);
Ok(())
},
);
let send_completion = source
.run_with(send_sink)
.expect("infinite sender source materializes");
// Wait for the sender's outcome on a separate thread.
let client_thread = thread::spawn(move || send_completion.wait());
assert_eq!(server_thread.join().unwrap().unwrap(), vec![0]);
// Results below are ignored; the assertions that follow are what is checked.
let _ = server_handle.wait();
let _ = client_thread.join();
let _ = client_handle.wait();
assert!(wait_until(Duration::from_secs(2), || closed.load(Ordering::SeqCst)));
assert!(wait_until(Duration::from_secs(3), || can_bind(
binding.local_addr()
)));
}
// With a receiver that has requested only one element, an endless sending
// source must be pulled at least `buffer_capacity` times but never more than
// twice that.
#[test]
fn sink_ref_tcp_bounded_backpressure_with_slow_receiver() {
let _guard = tcp_stream_ref_test_guard();
let settings = short_settings();
let (send_sink, binding, client_handle) =
sink_ref_over_tcp::<u64, _>("127.0.0.1:0", STREAM_REF_ID, settings)
.expect("sink ref TCP sender binds");
let (remote_source, server_handle) =
serve_sink_ref_over_tcp::<u64, _>(binding.local_addr(), STREAM_REF_ID, settings)
.expect("sink ref TCP receiver connects");
let mut probe = remote_source
.run_with(TestSink::probe())
.expect("receiver probe materializes");
probe.set_timeout(Duration::from_secs(10));
// Request exactly one element before the sender is started.
probe.request(1);
// Counts how many times the sender's unfold closure has run.
let pulled = Arc::new(AtomicUsize::new(0));
let pulled_for_source = Arc::clone(&pulled);
// Endless counter source: the closure always returns `Some`.
let source = Source::unfold(0_u64, move |next| {
pulled_for_source.fetch_add(1, Ordering::SeqCst);
Some((next + 1, next))
});
let send_completion = source
.run_with(send_sink)
.expect("infinite sender source materializes");
// Wait for the sender's outcome on a separate thread.
let client_thread = thread::spawn(move || send_completion.wait());
probe.assert_next(0);
// Lower bound: the sender is pulled at least `buffer_capacity` times within 1 s.
assert!(wait_until(Duration::from_secs(1), || {
pulled.load(Ordering::SeqCst) >= settings.buffer_capacity()
}));
// Upper bound: the pull count stays within 2x capacity for 100 ms.
assert_condition_holds(Duration::from_millis(100), || {
pulled.load(Ordering::SeqCst) <= settings.buffer_capacity() * 2
});
probe.cancel();
// Results are ignored after the cancel.
let _ = server_handle.wait();
let _ = client_thread.join();
let _ = client_handle.wait();
}