use datum::{
Keep, Sink, Source, StreamError, StreamRefFrame, StreamRefId, StreamRefMessage,
StreamRefSettings, StreamRefs,
testkit::{TestSink, TestSource},
};
use datum_net::quic::{
crypto::rustls::{QuicClientConfig, QuicServerConfig},
quinn,
rustls::{
ClientConfig as RustlsClientConfig, RootCertStore, ServerConfig as RustlsServerConfig,
pki_types::{CertificateDer, PrivateKeyDer, PrivatePkcs8KeyDer},
},
};
use datum_net::{
QuicBidirectionalStream, QuicIncomingConnection, TokioQuic, serve_sink_ref_over_quic,
serve_source_ref_over_quic, sink_ref_over_quic, source_ref_over_quic,
};
use rcgen::{CertifiedKey, generate_simple_self_signed};
use std::sync::{
Arc,
atomic::{AtomicBool, AtomicUsize, Ordering},
mpsc,
};
use std::thread;
use std::time::{Duration, Instant};
// Chunk size handed to `TokioQuic::bind` and `TokioQuic::connect` by the code below
// (presumably a byte count -- confirm against the `datum_net` API).
const CHUNK_SIZE: usize = 8192;
// Chunk size handed to `open_bi_stream_available` / `accept_bi_available` when the
// tests open or accept bidirectional QUIC streams.
const STREAM_REF_CHUNK_SIZE: usize = 8192;
// Stream-ref identifier passed to both ends of every carrier created in these tests.
const STREAM_REF_ID: StreamRefId = StreamRefId::from_u128(1);
// Number of fresh client connections made per direction by the sustained churn test.
const SUSTAINED_QUIC_ROUND_TRIPS: usize = 512;
/// Polls `condition` until it returns `true` or `timeout` has elapsed.
///
/// The condition is re-evaluated after a 5 ms sleep between attempts. Once the
/// deadline has passed it is evaluated one final time and that result is returned,
/// so the condition is always checked at least once.
fn wait_until(timeout: Duration, mut condition: impl FnMut() -> bool) -> bool {
    let deadline = Instant::now() + timeout;
    loop {
        if Instant::now() >= deadline {
            // Deadline reached: the last evaluation decides the outcome.
            return condition();
        }
        if condition() {
            return true;
        }
        thread::sleep(Duration::from_millis(5));
    }
}
/// Asserts that `condition` stays `true` for the whole `timeout` window.
///
/// The condition is checked every 5 ms while the deadline has not passed, and once
/// more after it has. Panics on the first check that returns `false`.
fn assert_condition_holds(timeout: Duration, mut condition: impl FnMut() -> bool) {
    let deadline = Instant::now() + timeout;
    loop {
        // Sample the clock before the check so the final check happens after expiry.
        let expired = Instant::now() >= deadline;
        assert!(condition());
        if expired {
            return;
        }
        thread::sleep(Duration::from_millis(5));
    }
}
/// Builds the settings used by the backpressure and subscription-timeout tests:
/// the defaults with a buffer capacity of 4 and an 80 ms subscription timeout.
fn short_settings() -> StreamRefSettings {
    let subscription_timeout = Duration::from_millis(80);
    let defaults = StreamRefSettings::default();
    let small_buffer = defaults.with_buffer_capacity(4);
    small_buffer.with_subscription_timeout(subscription_timeout)
}
/// Panics unless `result` is an `Err`; the concrete error variant is not inspected.
fn assert_stream_error<T>(result: Result<T, StreamError>) {
    if result.is_ok() {
        panic!("expected StreamError, got success");
    }
}
/// Creates a matching QUIC server/client configuration pair for tests.
///
/// A self-signed certificate for `localhost` is generated; the server presents it
/// (without requesting client authentication) and the client trusts it as its only
/// root certificate. Panics if any step of the setup fails.
fn quic_configs() -> (quinn::ServerConfig, quinn::ClientConfig) {
    let CertifiedKey { cert, key_pair } =
        generate_simple_self_signed(["localhost".to_owned()]).expect("self-signed cert");
    let certificate: CertificateDer<'static> = cert.der().clone();
    let private_key = PrivateKeyDer::Pkcs8(PrivatePkcs8KeyDer::from(key_pair.serialize_der()));

    // Server side: present the self-signed certificate.
    let server_tls = RustlsServerConfig::builder()
        .with_no_client_auth()
        .with_single_cert(vec![certificate.clone()], private_key)
        .expect("server config");

    // Client side: trust exactly that certificate.
    let mut trusted_roots = RootCertStore::empty();
    trusted_roots
        .add(certificate)
        .expect("trust self-signed cert");
    let client_tls = RustlsClientConfig::builder()
        .with_root_certificates(trusted_roots)
        .with_no_client_auth();

    let quic_server = QuicServerConfig::try_from(server_tls).expect("QUIC server config");
    let server = quinn::ServerConfig::with_crypto(Arc::new(quic_server));
    let quic_client = QuicClientConfig::try_from(client_tls).expect("QUIC client config");
    let client = quinn::ClientConfig::new(Arc::new(quic_client));
    (server, client)
}
/// Binds a QUIC server on an ephemeral loopback port, connects one client to it, and
/// returns the client connection together with the server-side incoming connection.
///
/// Panics if binding, connecting, or accepting fails.
fn connected_pair() -> (datum_net::QuicConnection, QuicIncomingConnection) {
    let (server_config, client_config) = quic_configs();

    // The server graph materializes both the binding and the first incoming connection.
    let server_graph = TokioQuic::bind("127.0.0.1:0", server_config, CHUNK_SIZE)
        .to_mat(Sink::head(), Keep::both);
    let (binding_completion, incoming_completion) = server_graph
        .run()
        .expect("QUIC bind source materializes");
    // Kept in a local so the binding lives until this function returns.
    let binding = binding_completion.wait().expect("QUIC binding succeeds");

    let pending_client =
        TokioQuic::connect(binding.local_addr(), "localhost", client_config, CHUNK_SIZE)
            .run_with(Sink::head())
            .expect("QUIC client connection source materializes");
    let client_connection = pending_client.wait().expect("QUIC client connects");

    let incoming = incoming_completion
        .wait()
        .expect("incoming QUIC connection accepted");
    (client_connection, incoming)
}
/// Runs `run` on a freshly spawned thread and waits up to `timeout` for its result.
///
/// Panics with a message starting with `label` if the result does not arrive in time
/// or if the worker goes away without sending one (for example because it panicked).
/// The worker thread is not joined.
fn recv_with_timeout<T: Send + 'static>(
    label: &str,
    timeout: Duration,
    run: impl FnOnce() -> T + Send + 'static,
) -> T {
    let (result_tx, result_rx) = mpsc::channel();
    thread::spawn(move || {
        let produced = run();
        // The receiver may already have given up; a failed send is fine.
        let _ = result_tx.send(produced);
    });
    result_rx
        .recv_timeout(timeout)
        .unwrap_or_else(|failure| match failure {
            mpsc::RecvTimeoutError::Timeout => panic!("{label} timed out after {timeout:?}"),
            mpsc::RecvTimeoutError::Disconnected => panic!("{label} worker terminated"),
        })
}
/// Opens a bidirectional QUIC stream from the client connection and blocks until it
/// is available. Panics if the stream cannot be opened.
fn open_client_stream(connection: &datum_net::QuicConnection) -> QuicBidirectionalStream {
    let pending_stream = connection
        .open_bi_stream_available(STREAM_REF_CHUNK_SIZE)
        .run_with(Sink::head())
        .expect("client QUIC stream source materializes");
    pending_stream.wait().expect("client opens QUIC stream")
}
/// Accepts a bidirectional QUIC stream on the server-side connection and blocks until
/// one is available. Panics if the accept fails.
fn accept_server_stream(incoming: &QuicIncomingConnection) -> QuicBidirectionalStream {
    let pending_stream = incoming
        .accept_bi_available(STREAM_REF_CHUNK_SIZE)
        .run_with(Sink::head())
        .expect("server accept_bi_available source materializes");
    pending_stream.wait().expect("server accepts QUIC stream")
}
/// Opens a bidirectional QUIC stream from the server-side connection and blocks until
/// it is available. Panics if the stream cannot be opened.
fn open_server_stream(incoming: &QuicIncomingConnection) -> QuicBidirectionalStream {
    let pending_stream = incoming
        .open_bi_stream_available(STREAM_REF_CHUNK_SIZE)
        .run_with(Sink::head())
        .expect("server open_bi_stream_available source materializes");
    pending_stream.wait().expect("server opens QUIC stream")
}
/// Accepts a bidirectional QUIC stream on the client connection and blocks until one
/// is available. Panics if the accept fails.
fn accept_client_stream(connection: &datum_net::QuicConnection) -> QuicBidirectionalStream {
    let pending_stream = connection
        .accept_bi_available(STREAM_REF_CHUNK_SIZE)
        .run_with(Sink::head())
        .expect("client accept_bi_available source materializes");
    pending_stream.wait().expect("client accepts QUIC stream")
}
/// Churn test: many short-lived client connections against one long-lived server,
/// first with the server publishing a source ref, then with the server consuming a
/// sink ref.
///
/// Each direction performs `SUSTAINED_QUIC_ROUND_TRIPS` iterations. Every iteration
/// uses a fresh client connection, transfers `0..64`, and checks the sum. Each client
/// iteration must finish within 5 seconds. Failure messages carry the iteration
/// number so a failing round can be identified.
#[test]
fn stream_ref_quic_sustained_fresh_connections_source_and_sink_refs() {
    // ---- Phase 1: server serves a source ref, client consumes it ----
    let (server_config, client_config) = quic_configs();
    let source_server_iterations = Arc::new(AtomicUsize::new(0));
    let source_server_iterations_for_sink = Arc::clone(&source_server_iterations);
    let (source_binding_completion, source_server_completion) =
        TokioQuic::bind("127.0.0.1:0", server_config, CHUNK_SIZE)
            .to_mat(
                // Each incoming connection is handled to completion inside the sink.
                Sink::foreach(move |connection: QuicIncomingConnection| {
                    let iteration =
                        source_server_iterations_for_sink.fetch_add(1, Ordering::SeqCst);
                    let stream = connection
                        .accept_bi_available(STREAM_REF_CHUNK_SIZE)
                        .run_with(Sink::head())
                        .expect("source churn server accept_bi materializes")
                        .wait()
                        .unwrap_or_else(|error| {
                            panic!("source churn server accept_bi iteration {iteration}: {error:?}")
                        });
                    let source_ref = Source::from_iter(0_u64..64)
                        .run_with(StreamRefs::source_ref())
                        .expect("source churn source_ref materializes");
                    let handle = serve_source_ref_over_quic(
                        stream,
                        source_ref,
                        STREAM_REF_ID,
                        StreamRefSettings::default(),
                    )
                    .expect("source churn serve_source_ref_over_quic");
                    handle.wait().unwrap_or_else(|error| {
                        panic!("source churn server handle iteration {iteration}: {error:?}")
                    });
                }),
                Keep::both,
            )
            .run()
            .expect("source churn QUIC server materializes");
    let source_addr = source_binding_completion
        .wait()
        .expect("source churn QUIC server binds")
        .local_addr();
    for iteration in 0..SUSTAINED_QUIC_ROUND_TRIPS {
        let client_config = client_config.clone();
        let sum = recv_with_timeout(
            // Include the iteration so a timeout names the failing round.
            &format!("source churn client iteration {iteration}"),
            Duration::from_secs(5),
            move || {
                let connection =
                    TokioQuic::connect(source_addr, "localhost", client_config, CHUNK_SIZE)
                        .run_with(Sink::head())
                        .expect("source churn client connection materializes")
                        .wait()
                        .unwrap_or_else(|error| {
                            panic!("source churn client connects iteration {iteration}: {error:?}")
                        });
                let stream = connection
                    .open_bi_stream_available(STREAM_REF_CHUNK_SIZE)
                    .run_with(Sink::head())
                    .expect("source churn client open_bi materializes")
                    .wait()
                    .unwrap_or_else(|error| {
                        panic!("source churn client open_bi iteration {iteration}: {error:?}")
                    });
                let (remote_source, handle) = source_ref_over_quic::<u64>(
                    stream,
                    STREAM_REF_ID,
                    StreamRefSettings::default(),
                );
                let sum = remote_source
                    .run_with(Sink::fold(0_u64, |acc, item| acc + item))
                    .expect("source churn remote fold materializes")
                    .wait()
                    .unwrap_or_else(|error| {
                        panic!("source churn remote fold iteration {iteration}: {error:?}")
                    });
                handle.wait().unwrap_or_else(|error| {
                    panic!("source churn client handle iteration {iteration}: {error:?}")
                });
                sum
            },
        );
        assert_eq!(
            sum,
            (0_u64..64).sum::<u64>(),
            "source churn iteration {iteration}"
        );
    }
    // The server must have seen exactly one connection per client iteration.
    assert_eq!(
        source_server_iterations.load(Ordering::SeqCst),
        SUSTAINED_QUIC_ROUND_TRIPS
    );
    drop(source_server_completion);

    // ---- Phase 2: server consumes a sink ref, client sends into it ----
    let (server_config, client_config) = quic_configs();
    let sink_server_iterations = Arc::new(AtomicUsize::new(0));
    let sink_server_iterations_for_sink = Arc::clone(&sink_server_iterations);
    let (sink_binding_completion, sink_server_completion) =
        TokioQuic::bind("127.0.0.1:0", server_config, CHUNK_SIZE)
            .to_mat(
                Sink::foreach(move |connection: QuicIncomingConnection| {
                    let iteration = sink_server_iterations_for_sink.fetch_add(1, Ordering::SeqCst);
                    // In this direction the server opens the stream and the client accepts it.
                    let stream = connection
                        .open_bi_stream_available(STREAM_REF_CHUNK_SIZE)
                        .run_with(Sink::head())
                        .expect("sink churn server open_bi materializes")
                        .wait()
                        .unwrap_or_else(|error| {
                            panic!("sink churn server open_bi iteration {iteration}: {error:?}")
                        });
                    let (remote_source, handle) = serve_sink_ref_over_quic::<u64>(
                        stream,
                        STREAM_REF_ID,
                        StreamRefSettings::default(),
                    );
                    let sum = remote_source
                        .run_with(Sink::fold(0_u64, |acc, item| acc + item))
                        .expect("sink churn server fold materializes")
                        .wait()
                        .unwrap_or_else(|error| {
                            panic!("sink churn server fold iteration {iteration}: {error:?}")
                        });
                    assert_eq!(
                        sum,
                        (0_u64..64).sum::<u64>(),
                        "sink churn server sum iteration {iteration}"
                    );
                    handle.wait().unwrap_or_else(|error| {
                        panic!("sink churn server handle iteration {iteration}: {error:?}")
                    });
                }),
                Keep::both,
            )
            .run()
            .expect("sink churn QUIC server materializes");
    let sink_addr = sink_binding_completion
        .wait()
        .expect("sink churn QUIC server binds")
        .local_addr();
    for iteration in 0..SUSTAINED_QUIC_ROUND_TRIPS {
        let client_config = client_config.clone();
        recv_with_timeout(
            // Include the iteration so a timeout names the failing round.
            &format!("sink churn client iteration {iteration}"),
            Duration::from_secs(5),
            move || {
                let connection =
                    TokioQuic::connect(sink_addr, "localhost", client_config, CHUNK_SIZE)
                        .run_with(Sink::head())
                        .expect("sink churn client connection materializes")
                        .wait()
                        .unwrap_or_else(|error| {
                            panic!("sink churn client connects iteration {iteration}: {error:?}")
                        });
                let stream = connection
                    .accept_bi_available(STREAM_REF_CHUNK_SIZE)
                    .run_with(Sink::head())
                    .expect("sink churn client accept_bi materializes")
                    .wait()
                    .unwrap_or_else(|error| {
                        panic!("sink churn client accept_bi iteration {iteration}: {error:?}")
                    });
                let (send_sink, handle) =
                    sink_ref_over_quic::<u64>(stream, STREAM_REF_ID, StreamRefSettings::default());
                Source::from_iter(0_u64..64)
                    .run_with(send_sink)
                    .expect("sink churn sender materializes")
                    .wait()
                    .unwrap_or_else(|error| {
                        panic!("sink churn sender completion iteration {iteration}: {error:?}")
                    });
                handle.wait().unwrap_or_else(|error| {
                    panic!("sink churn client handle iteration {iteration}: {error:?}")
                });
            },
        );
    }
    assert_eq!(
        sink_server_iterations.load(Ordering::SeqCst),
        SUSTAINED_QUIC_ROUND_TRIPS
    );
    drop(sink_server_completion);
}
/// Closes the server-side connection before anything is served and checks that both
/// the remote source and the client carrier handle finish with an error within
/// 5 seconds each.
#[test]
fn stream_ref_quic_connection_loss_before_terminal_errors_source() {
    let (client_conn, server_conn) = connected_pair();
    let (remote_source, client_carrier) = source_ref_over_quic::<u64>(
        open_client_stream(&client_conn),
        STREAM_REF_ID,
        StreamRefSettings::default(),
    );

    // Collect on a background thread and report the outcome through a channel.
    let (outcome_tx, outcome_rx) = mpsc::channel();
    thread::spawn(move || {
        let collected = remote_source.run_collect();
        let _ = outcome_tx.send(collected);
    });

    // Accept the stream, then close the connection without serving a source ref.
    let server_stream = accept_server_stream(&server_conn);
    server_conn
        .connection()
        .close(b"closed before StreamRefs terminal");
    drop(server_stream);

    let source_outcome = outcome_rx
        .recv_timeout(Duration::from_secs(5))
        .expect("remote source should finish after peer connection loss");
    assert_stream_error(source_outcome);

    let carrier_outcome = recv_with_timeout(
        "connection-loss client carrier",
        Duration::from_secs(5),
        move || client_carrier.wait(),
    );
    assert_stream_error(carrier_outcome);
}
/// Serves `0..128` as a source ref from the server, collects it on the client, and
/// checks the element count and sum. Both carrier handles must complete successfully.
#[test]
fn stream_ref_quic_round_trip_checksum() {
    let (client_conn, server_conn) = connected_pair();
    let (remote_source, client_carrier) = source_ref_over_quic::<u64>(
        open_client_stream(&client_conn),
        STREAM_REF_ID,
        StreamRefSettings::default(),
    );
    // Start consuming before the server side is wired up.
    let collector = thread::spawn(move || remote_source.run_collect());

    let server_stream = accept_server_stream(&server_conn);
    let total = 128_u64;
    let source_ref = Source::from_iter(0_u64..total)
        .run_with(StreamRefs::source_ref())
        .unwrap();
    let server_carrier = serve_source_ref_over_quic(
        server_stream,
        source_ref,
        STREAM_REF_ID,
        StreamRefSettings::default(),
    )
    .unwrap();

    let received = collector.join().unwrap().unwrap();
    assert_eq!(received.len(), total as usize);
    let received_sum = received.iter().sum::<u64>();
    assert_eq!(received_sum, (0_u64..total).sum::<u64>());

    client_carrier.wait().unwrap();
    server_carrier.wait().unwrap();
}
/// Folds a remote source with `Sink::fold` on the client while the server serves
/// `1..=5`, and expects the folded sum to be 15.
#[test]
fn stream_ref_quic_terminal_fold_specialization_streams_elements() {
    let (client_conn, server_conn) = connected_pair();
    let (remote_source, client_carrier) = source_ref_over_quic::<u64>(
        open_client_stream(&client_conn),
        STREAM_REF_ID,
        StreamRefSettings::default(),
    );
    let folder = thread::spawn(move || {
        let pending_sum = remote_source
            .run_with(Sink::fold(0_u64, |acc, item| acc + item))
            .expect("remote fold materializes");
        pending_sum.wait()
    });

    let server_stream = accept_server_stream(&server_conn);
    let source_ref = Source::from_iter(1_u64..=5)
        .run_with(StreamRefs::source_ref())
        .unwrap();
    let server_carrier = serve_source_ref_over_quic(
        server_stream,
        source_ref,
        STREAM_REF_ID,
        StreamRefSettings::default(),
    )
    .unwrap();

    let folded = folder.join().unwrap().unwrap();
    assert_eq!(folded, 15);
    client_carrier.wait().unwrap();
    server_carrier.wait().unwrap();
}
/// Serves a source that fails immediately with `"boom"` and checks that the client's
/// remote source fails with a `StreamError::Failed` whose message contains `"boom"`.
///
/// The carrier handles are awaited afterwards but their results are deliberately
/// ignored.
#[test]
fn stream_ref_quic_propagates_origin_failure() {
    let (client_connection, incoming) = connected_pair();
    let client_stream = open_client_stream(&client_connection);
    let (remote_source, client_handle) =
        source_ref_over_quic::<u64>(client_stream, STREAM_REF_ID, StreamRefSettings::default());
    let client_thread = thread::spawn(move || remote_source.run_collect());
    let server_stream = accept_server_stream(&incoming);
    let source_ref = Source::<u64>::failed(StreamError::Failed("boom".to_owned()))
        .run_with(StreamRefs::source_ref())
        .unwrap();
    let server_handle = serve_source_ref_over_quic(
        server_stream,
        source_ref,
        STREAM_REF_ID,
        StreamRefSettings::default(),
    )
    .unwrap();
    // Bind the outcome so a mismatch reports what was actually received.
    let client_result = client_thread.join().unwrap();
    assert!(
        matches!(&client_result, Err(StreamError::Failed(message)) if message.contains("boom")),
        "unexpected result: {client_result:?}"
    );
    let _ = client_handle.wait();
    let _ = server_handle.wait();
}
/// Takes a single element from an unbounded remote source and checks that the
/// producer's resource is closed afterwards (within 1 second) and that both carrier
/// handles complete successfully.
#[test]
fn stream_ref_quic_downstream_cancel_reaches_producer_and_acks() {
    let (client_conn, server_conn) = connected_pair();
    let (remote_source, client_carrier) = source_ref_over_quic::<u64>(
        open_client_stream(&client_conn),
        STREAM_REF_ID,
        StreamRefSettings::default(),
    );
    // `take(1)` completes the consumer after the first element.
    let first_only = thread::spawn(move || remote_source.take(1).run_collect());
    let server_stream = accept_server_stream(&server_conn);

    // Unbounded counter whose close callback records that it ran.
    let closed = Arc::new(AtomicBool::new(false));
    let closed_by_source = Arc::clone(&closed);
    let counting_source = Source::unfold_resource(
        || Ok(0_u64),
        |counter| {
            let current = *counter;
            *counter += 1;
            Ok(Some(current))
        },
        move |_counter| {
            closed_by_source.store(true, Ordering::SeqCst);
            Ok(())
        },
    );
    let source_ref = counting_source
        .run_with(StreamRefs::source_ref())
        .expect("source_ref materializes");
    let server_carrier = serve_source_ref_over_quic(
        server_stream,
        source_ref,
        STREAM_REF_ID,
        StreamRefSettings::default(),
    )
    .unwrap();

    let received = first_only.join().unwrap().unwrap();
    assert_eq!(received, vec![0]);
    client_carrier.wait().unwrap();
    server_carrier.wait().unwrap();
    assert!(wait_until(Duration::from_secs(1), || closed.load(Ordering::SeqCst)));
}
/// With a consumer that requests only one element, checks that the producer is pulled
/// at least `buffer_capacity` times within 1 second and then stays at or below
/// `2 * buffer_capacity` pulls for 100 ms.
#[test]
fn stream_ref_quic_bounded_backpressure_with_slow_consumer() {
    let settings = short_settings();
    let (client_conn, server_conn) = connected_pair();
    let (remote_source, client_carrier) =
        source_ref_over_quic::<u64>(open_client_stream(&client_conn), STREAM_REF_ID, settings);
    let mut probe = remote_source
        .run_with(TestSink::probe())
        .expect("remote source probe materializes");
    // Demand exactly one element; everything else has to be held back upstream.
    probe.request(1);

    let server_stream = accept_server_stream(&server_conn);
    let pulled = Arc::new(AtomicUsize::new(0));
    let pull_counter = Arc::clone(&pulled);
    let counting_source = Source::unfold(0_u64, move |current| {
        pull_counter.fetch_add(1, Ordering::SeqCst);
        Some((current + 1, current))
    });
    let source_ref = counting_source
        .run_with(StreamRefs::source_ref_with_settings(settings))
        .unwrap();
    let server_carrier =
        serve_source_ref_over_quic(server_stream, source_ref, STREAM_REF_ID, settings).unwrap();

    probe.assert_next(0);
    assert!(wait_until(Duration::from_secs(1), || {
        pulled.load(Ordering::SeqCst) >= settings.buffer_capacity()
    }));
    assert_condition_holds(Duration::from_millis(100), || {
        pulled.load(Ordering::SeqCst) <= settings.buffer_capacity() * 2
    });

    probe.cancel();
    let _ = client_carrier.wait();
    let _ = server_carrier.wait();
}
/// Writes only a raw subscribe-handshake frame from the client and checks that the
/// serving side fails with a `StreamError::Failed` mentioning "first demand".
#[test]
fn stream_ref_quic_subscription_timeout_without_first_demand() {
    let settings = short_settings();
    let (client_conn, server_conn) = connected_pair();

    // Drive the client half of the stream by hand instead of through a stream ref.
    let raw_client_stream = open_client_stream(&client_conn);
    let (_client_reader, client_writer) = raw_client_stream.into_parts();
    let (publisher, _writer_completion) = TestSource::probe::<Vec<u8>>()
        .to_mat(client_writer, Keep::both)
        .run()
        .expect("client raw writer materializes");
    let handshake_frame =
        StreamRefFrame::new(STREAM_REF_ID, StreamRefMessage::OnSubscribeHandshake);
    publisher.expect_request();
    publisher.send_next(length_prefixed(handshake_frame));

    let server_stream = accept_server_stream(&server_conn);
    let source_ref = Source::repeat(1_u64)
        .run_with(StreamRefs::source_ref_with_settings(settings))
        .unwrap();
    let server_carrier =
        serve_source_ref_over_quic(server_stream, source_ref, STREAM_REF_ID, settings).unwrap();

    // No further frames are written after the handshake.
    drop(publisher);
    let error = server_carrier.wait().unwrap_err();
    assert!(
        matches!(&error, StreamError::Failed(message) if message.contains("first demand")),
        "unexpected error: {error:?}"
    );
}
/// Encodes `frame` and prepends its payload length as a 4-byte big-endian `u32`.
///
/// # Panics
///
/// Panics if the encoded payload is longer than `u32::MAX` bytes, rather than
/// silently writing a truncated length prefix.
fn length_prefixed(frame: StreamRefFrame) -> Vec<u8> {
    let payload = frame.encode_to_vec();
    // Checked conversion: an `as u32` cast would wrap for oversized payloads.
    let payload_len =
        u32::try_from(payload.len()).expect("frame payload length fits in a u32 prefix");
    let mut bytes = Vec::with_capacity(4 + payload.len());
    bytes.extend(payload_len.to_be_bytes());
    bytes.extend(payload);
    bytes
}
/// Sends `0..128` from the client through a sink ref and checks that the server
/// receives the same count, sum, and exact sequence. Both carrier handles must
/// complete successfully.
#[test]
fn sink_ref_quic_round_trip_checksum() {
    let (client_conn, server_conn) = connected_pair();

    // Server: open the stream and start collecting whatever the sink ref delivers.
    let (remote_source, server_carrier) = serve_sink_ref_over_quic::<u64>(
        open_server_stream(&server_conn),
        STREAM_REF_ID,
        StreamRefSettings::default(),
    );
    let receiver = thread::spawn(move || remote_source.run_collect());

    // Client: accept the stream and push the numbers into the sink ref.
    let (send_sink, client_carrier) = sink_ref_over_quic::<u64>(
        accept_client_stream(&client_conn),
        STREAM_REF_ID,
        StreamRefSettings::default(),
    );
    let total = 128_u64;
    let send_done = Source::from_iter(0_u64..total)
        .run_with(send_sink)
        .expect("sender source materializes");
    let sender = thread::spawn(move || send_done.wait());
    sender.join().unwrap().unwrap();

    let received = receiver.join().unwrap().unwrap();
    assert_eq!(received.len(), total as usize);
    let received_sum = received.iter().sum::<u64>();
    assert_eq!(received_sum, (0_u64..total).sum::<u64>());
    assert_eq!(received, (0_u64..total).collect::<Vec<_>>());

    client_carrier.wait().unwrap();
    server_carrier.wait().unwrap();
}
/// Runs a source that fails with `"boom"` into a sink ref and checks that both the
/// sender's completion and the receiving side's collect fail with a
/// `StreamError::Failed` whose message contains `"boom"`.
///
/// The carrier handles are awaited afterwards but their results are deliberately
/// ignored.
#[test]
fn sink_ref_quic_sender_failure_reaches_receiver() {
    let (client_connection, incoming) = connected_pair();
    let server_stream = open_server_stream(&incoming);
    let (remote_source, server_handle) =
        serve_sink_ref_over_quic::<u64>(server_stream, STREAM_REF_ID, StreamRefSettings::default());
    let collect_completion = remote_source
        .run_with(Sink::collect())
        .expect("receiver source materializes");
    let server_thread = thread::spawn(move || collect_completion.wait());
    let client_stream = accept_client_stream(&client_connection);
    let (send_sink, client_handle) =
        sink_ref_over_quic::<u64>(client_stream, STREAM_REF_ID, StreamRefSettings::default());
    let send_completion = Source::<u64>::failed(StreamError::Failed("boom".to_owned()))
        .run_with(send_sink)
        .expect("failed source materializes");
    let client_thread = thread::spawn(move || send_completion.wait());
    // Bind each outcome so a mismatch reports what was actually observed.
    let sender_result = client_thread.join().unwrap();
    assert!(
        matches!(&sender_result, Err(StreamError::Failed(message)) if message.contains("boom")),
        "unexpected sender result: {sender_result:?}"
    );
    let receiver_result = server_thread.join().unwrap();
    assert!(
        matches!(&receiver_result, Err(StreamError::Failed(message)) if message.contains("boom")),
        "unexpected receiver result: {receiver_result:?}"
    );
    let _ = client_handle.wait();
    let _ = server_handle.wait();
}
/// The receiver takes a single element from the sink ref's source; the test checks
/// that it received `[0]` and that the unbounded sender's resource is closed within
/// 2 seconds. Carrier and sender results are awaited but ignored.
#[test]
fn sink_ref_quic_receiver_cancel_reaches_sender() {
    let (client_conn, server_conn) = connected_pair();

    // Receiver: complete after the first element.
    let (remote_source, server_carrier) = serve_sink_ref_over_quic::<u64>(
        open_server_stream(&server_conn),
        STREAM_REF_ID,
        StreamRefSettings::default(),
    );
    let first_only = remote_source
        .take(1)
        .run_with(Sink::collect())
        .expect("take(1) receiver materializes");
    let receiver = thread::spawn(move || first_only.wait());

    // Sender: unbounded counter whose close callback records that it ran.
    let (send_sink, client_carrier) = sink_ref_over_quic::<u64>(
        accept_client_stream(&client_conn),
        STREAM_REF_ID,
        StreamRefSettings::default(),
    );
    let closed = Arc::new(AtomicBool::new(false));
    let closed_by_source = Arc::clone(&closed);
    let counting_source = Source::unfold_resource(
        || Ok(0_u64),
        |counter| {
            let current = *counter;
            *counter += 1;
            Ok(Some(current))
        },
        move |_counter| {
            closed_by_source.store(true, Ordering::SeqCst);
            Ok(())
        },
    );
    let send_done = counting_source
        .run_with(send_sink)
        .expect("infinite sender source materializes");
    let sender = thread::spawn(move || send_done.wait());

    let values = receiver.join().unwrap().unwrap();
    assert_eq!(values, vec![0]);
    let _ = server_carrier.wait();
    let _ = sender.join();
    let _ = client_carrier.wait();
    assert!(wait_until(Duration::from_secs(2), || closed.load(Ordering::SeqCst)));
}
/// With a receiver that requests only one element, checks that the sending source is
/// pulled at least `buffer_capacity` times within 1 second and then stays at or below
/// `2 * buffer_capacity` pulls for 100 ms.
#[test]
fn sink_ref_quic_bounded_backpressure_with_slow_receiver() {
    let settings = short_settings();
    let (client_conn, server_conn) = connected_pair();

    // Receiver: a probe that demands exactly one element.
    let (remote_source, server_carrier) =
        serve_sink_ref_over_quic::<u64>(open_server_stream(&server_conn), STREAM_REF_ID, settings);
    let mut probe = remote_source
        .run_with(TestSink::probe())
        .expect("receiver probe materializes");
    probe.request(1);

    // Sender: unbounded counter that records every pull.
    let (send_sink, client_carrier) =
        sink_ref_over_quic::<u64>(accept_client_stream(&client_conn), STREAM_REF_ID, settings);
    let pulled = Arc::new(AtomicUsize::new(0));
    let pull_counter = Arc::clone(&pulled);
    let counting_source = Source::unfold(0_u64, move |current| {
        pull_counter.fetch_add(1, Ordering::SeqCst);
        Some((current + 1, current))
    });
    let send_done = counting_source
        .run_with(send_sink)
        .expect("infinite sender source materializes");
    let sender = thread::spawn(move || send_done.wait());

    probe.assert_next(0);
    assert!(wait_until(Duration::from_secs(1), || {
        pulled.load(Ordering::SeqCst) >= settings.buffer_capacity()
    }));
    assert_condition_holds(Duration::from_millis(100), || {
        pulled.load(Ordering::SeqCst) <= settings.buffer_capacity() * 2
    });

    probe.cancel();
    let _ = server_carrier.wait();
    let _ = sender.join();
    let _ = client_carrier.wait();
}
/// Writes only a raw subscribe-handshake frame from the server and checks that the
/// client's sender completion fails with a `StreamError::Failed` mentioning
/// "first demand".
#[test]
fn sink_ref_quic_subscription_timeout_without_first_demand() {
    let settings = short_settings();
    let (client_conn, server_conn) = connected_pair();

    // Drive the server half of the stream by hand instead of through a stream ref.
    let raw_server_stream = open_server_stream(&server_conn);
    let (_server_reader, server_writer) = raw_server_stream.into_parts();
    let (publisher, writer_completion) = TestSource::probe::<Vec<u8>>()
        .to_mat(server_writer, Keep::both)
        .run()
        .expect("server raw writer materializes");
    let handshake_frame =
        StreamRefFrame::new(STREAM_REF_ID, StreamRefMessage::OnSubscribeHandshake);
    publisher.expect_request();
    publisher.send_next(length_prefixed(handshake_frame));

    // Client: a sender that never runs out of elements.
    let (send_sink, client_carrier) =
        sink_ref_over_quic::<u64>(accept_client_stream(&client_conn), STREAM_REF_ID, settings);
    let send_done = Source::repeat(1_u64)
        .run_with(send_sink)
        .expect("repeat source materializes");
    let sender = thread::spawn(move || send_done.wait());

    // No further frames are written after the handshake.
    drop(publisher);
    let _ = writer_completion.wait();

    let error = sender.join().unwrap().unwrap_err();
    assert!(
        matches!(&error, StreamError::Failed(message) if message.contains("first demand")),
        "unexpected error: {error:?}"
    );
    let _ = client_carrier.wait();
}