use crate::security::authenticated::AuthenticatedSymbol;
use crate::transport::error::SinkError;
use crate::transport::{ChannelWaiter, SharedChannel};
use smallvec::SmallVec;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::task::{Context, Poll, Waker};
fn upsert_channel_waiter(
wakers: &mut SmallVec<[ChannelWaiter; 2]>,
queued: &Arc<AtomicBool>,
waker: &Waker,
) {
if let Some(existing) = wakers
.iter_mut()
.find(|entry| Arc::ptr_eq(&entry.queued, queued))
{
if !existing.waker.will_wake(waker) {
existing.waker.clone_from(waker);
}
} else {
wakers.push(ChannelWaiter {
waker: waker.clone(),
queued: Arc::clone(queued),
});
}
}
fn pop_next_queued_waiter(wakers: &mut SmallVec<[ChannelWaiter; 2]>) -> Option<ChannelWaiter> {
wakers.retain(|entry| entry.queued.load(Ordering::Acquire));
if wakers.is_empty() {
None
} else {
Some(wakers.remove(0))
}
}
pub trait SymbolSink: Send + Unpin {
fn poll_send(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
symbol: AuthenticatedSymbol,
) -> Poll<Result<(), SinkError>>;
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), SinkError>>;
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), SinkError>>;
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), SinkError>>;
}
pub trait SymbolSinkExt: SymbolSink {
fn send(&mut self, symbol: AuthenticatedSymbol) -> SendFuture<'_, Self>
where
Self: Unpin,
{
SendFuture {
sink: self,
symbol: Some(symbol),
completed: false,
}
}
fn send_all<I>(&mut self, symbols: I) -> SendAllFuture<'_, Self, I::IntoIter>
where
Self: Unpin,
I: IntoIterator<Item = AuthenticatedSymbol>,
{
SendAllFuture {
sink: self,
iter: symbols.into_iter(),
buffered: None,
count: 0,
completed: false,
iter_exhausted: false,
}
}
fn flush(&mut self) -> FlushFuture<'_, Self>
where
Self: Unpin,
{
FlushFuture {
sink: self,
completed: false,
}
}
fn close(&mut self) -> CloseFuture<'_, Self>
where
Self: Unpin,
{
CloseFuture {
sink: self,
completed: false,
}
}
fn buffer(self, capacity: usize) -> BufferedSink<Self>
where
Self: Sized,
{
BufferedSink::new(self, capacity)
}
}
impl<S: SymbolSink + ?Sized> SymbolSinkExt for S {}
const SEND_ALL_COOPERATIVE_BUDGET: usize = 1024;
pub struct SendFuture<'a, S: ?Sized> {
sink: &'a mut S,
symbol: Option<AuthenticatedSymbol>,
completed: bool,
}
impl<S: SymbolSink + Unpin + ?Sized> Future for SendFuture<'_, S> {
type Output = Result<(), SinkError>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = &mut *self;
if this.completed {
return Poll::Ready(Err(SinkError::PolledAfterCompletion));
}
if crate::cx::Cx::current().is_some_and(|c| c.checkpoint().is_err()) {
this.completed = true;
return Poll::Ready(Err(SinkError::Io {
source: std::io::Error::new(std::io::ErrorKind::Interrupted, "cancelled"),
}));
}
match Pin::new(&mut *this.sink).poll_ready(cx) {
Poll::Ready(Ok(())) => {}
Poll::Ready(Err(e)) => {
this.completed = true;
return Poll::Ready(Err(e));
}
Poll::Pending => return Poll::Pending,
}
if let Some(symbol) = this.symbol.take() {
match Pin::new(&mut *this.sink).poll_send(cx, symbol.clone()) {
Poll::Ready(Ok(())) => {
this.completed = true;
Poll::Ready(Ok(()))
}
Poll::Ready(Err(e)) => {
this.completed = true;
Poll::Ready(Err(e))
}
Poll::Pending => {
this.symbol = Some(symbol);
Poll::Pending
}
}
} else {
this.completed = true;
Poll::Ready(Err(SinkError::PolledAfterCompletion))
}
}
}
pub struct SendAllFuture<'a, S: ?Sized, I> {
sink: &'a mut S,
iter: I,
buffered: Option<AuthenticatedSymbol>,
count: usize,
completed: bool,
iter_exhausted: bool,
}
impl<S, I> Future for SendAllFuture<'_, S, I>
where
S: SymbolSink + Unpin + ?Sized,
I: Iterator<Item = AuthenticatedSymbol> + Unpin,
{
type Output = Result<usize, SinkError>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.completed {
return Poll::Ready(Err(SinkError::PolledAfterCompletion));
}
if crate::cx::Cx::current().is_some_and(|c| c.checkpoint().is_err()) {
self.completed = true;
return Poll::Ready(Err(SinkError::Io {
source: std::io::Error::new(std::io::ErrorKind::Interrupted, "cancelled"),
}));
}
let mut sent_this_poll = 0usize;
loop {
if let Some(symbol) = self.buffered.take() {
match Pin::new(&mut *self.sink).poll_ready(cx) {
Poll::Ready(Ok(())) => {}
Poll::Ready(Err(e)) => {
self.completed = true;
return Poll::Ready(Err(e));
}
Poll::Pending => {
self.buffered = Some(symbol);
return Poll::Pending;
}
}
match Pin::new(&mut *self.sink).poll_send(cx, symbol.clone()) {
Poll::Ready(Ok(())) => {
self.count += 1;
sent_this_poll += 1;
if sent_this_poll >= SEND_ALL_COOPERATIVE_BUDGET {
cx.waker().wake_by_ref();
return Poll::Pending;
}
}
Poll::Ready(Err(e)) => {
self.completed = true;
return Poll::Ready(Err(e));
}
Poll::Pending => {
self.buffered = Some(symbol);
return Poll::Pending;
}
}
}
if self.iter_exhausted {
match Pin::new(&mut *self.sink).poll_flush(cx) {
Poll::Ready(Ok(())) => {
self.completed = true;
return Poll::Ready(Ok(self.count));
}
Poll::Ready(Err(e)) => {
self.completed = true;
return Poll::Ready(Err(e));
}
Poll::Pending => return Poll::Pending,
}
}
match self.iter.next() {
Some(symbol) => self.buffered = Some(symbol),
None => {
self.iter_exhausted = true;
}
}
}
}
}
pub struct FlushFuture<'a, S: ?Sized> {
sink: &'a mut S,
completed: bool,
}
impl<S: SymbolSink + Unpin + ?Sized> Future for FlushFuture<'_, S> {
type Output = Result<(), SinkError>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.completed {
return Poll::Ready(Err(SinkError::PolledAfterCompletion));
}
if crate::cx::Cx::current().is_some_and(|c| c.checkpoint().is_err()) {
self.completed = true;
return Poll::Ready(Err(SinkError::Io {
source: std::io::Error::new(std::io::ErrorKind::Interrupted, "cancelled"),
}));
}
match Pin::new(&mut *self.sink).poll_flush(cx) {
Poll::Ready(result) => {
self.completed = true;
Poll::Ready(result)
}
Poll::Pending => Poll::Pending,
}
}
}
pub struct CloseFuture<'a, S: ?Sized> {
sink: &'a mut S,
completed: bool,
}
impl<S: SymbolSink + Unpin + ?Sized> Future for CloseFuture<'_, S> {
type Output = Result<(), SinkError>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.completed {
return Poll::Ready(Err(SinkError::PolledAfterCompletion));
}
if crate::cx::Cx::current().is_some_and(|c| c.checkpoint().is_err()) {
self.completed = true;
return Poll::Ready(Err(SinkError::Io {
source: std::io::Error::new(std::io::ErrorKind::Interrupted, "cancelled"),
}));
}
match Pin::new(&mut *self.sink).poll_close(cx) {
Poll::Ready(result) => {
self.completed = true;
Poll::Ready(result)
}
Poll::Pending => Poll::Pending,
}
}
}
use std::collections::VecDeque;
pub struct BufferedSink<S> {
inner: S,
buffer: VecDeque<AuthenticatedSymbol>,
staged_symbols: VecDeque<AuthenticatedSymbol>,
capacity: usize,
}
impl<S> BufferedSink<S> {
pub fn new(inner: S, capacity: usize) -> Self {
Self {
inner,
buffer: VecDeque::with_capacity(capacity),
staged_symbols: VecDeque::new(),
capacity,
}
}
}
impl<S: SymbolSink + Unpin> BufferedSink<S> {
fn poll_inner_terminal_error(&mut self, cx: &mut Context<'_>) -> Option<SinkError> {
match Pin::new(&mut self.inner).poll_ready(cx) {
Poll::Ready(Err(err)) => Some(err),
Poll::Ready(Ok(())) | Poll::Pending => None,
}
}
}
impl<S: SymbolSink + Unpin> SymbolSink for BufferedSink<S> {
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), SinkError>> {
let this = self.get_mut();
if this.buffer.len() < this.capacity && this.staged_symbols.is_empty() {
this.poll_inner_terminal_error(cx)
.map_or(Poll::Ready(Ok(())), |err| Poll::Ready(Err(err)))
} else {
Pin::new(this).poll_flush(cx)
}
}
fn poll_send(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
symbol: AuthenticatedSymbol,
) -> Poll<Result<(), SinkError>> {
let this = self.as_mut().get_mut();
if let Some(err) = this.poll_inner_terminal_error(cx) {
return Poll::Ready(Err(err));
}
if this.buffer.len() >= this.capacity || !this.staged_symbols.is_empty() {
if let Poll::Ready(Err(err)) = Pin::new(&mut *this).poll_flush(cx) {
return Poll::Ready(Err(err));
}
}
if this.buffer.len() < this.capacity && this.staged_symbols.is_empty() {
this.buffer.push_back(symbol);
return Poll::Ready(Ok(()));
}
if this.staged_symbols.len() >= this.capacity.saturating_mul(2).max(16) {
return Poll::Pending;
}
this.staged_symbols.push_back(symbol);
Poll::Ready(Ok(()))
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), SinkError>> {
let this = self.as_mut().get_mut();
loop {
while this.buffer.len() < this.capacity {
let Some(symbol) = this.staged_symbols.pop_front() else {
break;
};
this.buffer.push_back(symbol);
}
if this.buffer.is_empty() {
break;
}
match Pin::new(&mut this.inner).poll_ready(cx) {
Poll::Ready(Ok(())) => {}
Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
Poll::Pending => return Poll::Pending,
}
let symbol = match this.buffer.front() {
Some(symbol) => symbol.clone(),
None => break,
};
match Pin::new(&mut this.inner).poll_send(cx, symbol) {
Poll::Ready(Ok(())) => {
this.buffer.pop_front();
}
Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
Poll::Pending => {
return Poll::Pending;
}
}
}
Pin::new(&mut self.get_mut().inner).poll_flush(cx)
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), SinkError>> {
let this = self.as_mut().get_mut();
match Pin::new(this).poll_flush(cx) {
Poll::Ready(Ok(())) => {}
Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
Poll::Pending => return Poll::Pending,
}
Pin::new(&mut self.get_mut().inner).poll_close(cx)
}
}
pub struct ChannelSink {
shared: Arc<SharedChannel>,
waiter: Option<Arc<AtomicBool>>,
}
impl ChannelSink {
pub(crate) fn new(shared: Arc<SharedChannel>) -> Self {
Self {
shared,
waiter: None,
}
}
}
impl Drop for ChannelSink {
fn drop(&mut self) {
let Some(waiter) = self.waiter.as_ref() else {
return;
};
waiter.store(false, Ordering::Release);
{
let mut wakers = self.shared.send_wakers.lock();
wakers.retain(|entry| !Arc::ptr_eq(&entry.queued, waiter));
}
let has_capacity = {
let queue = self.shared.queue.lock();
queue.len() < self.shared.capacity && !self.shared.closed.load(Ordering::Acquire)
};
if has_capacity {
let next_waiter = {
let mut wakers = self.shared.send_wakers.lock();
pop_next_queued_waiter(&mut wakers)
};
if let Some(w) = next_waiter {
w.queued.store(false, Ordering::Release);
w.waker.wake();
}
}
}
}
impl SymbolSink for ChannelSink {
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), SinkError>> {
let this = self.get_mut();
let queue = this.shared.queue.lock();
if this.shared.closed.load(Ordering::Acquire) {
return Poll::Ready(Err(SinkError::Closed));
}
if queue.len() < this.shared.capacity {
if let Some(waiter) = this.waiter.as_ref() {
waiter.store(false, Ordering::Release);
}
Poll::Ready(Ok(()))
} else {
drop(queue); if this.shared.closed.load(Ordering::Acquire) {
return Poll::Ready(Err(SinkError::Closed));
}
let mut new_waiter = None;
let mut closed = false;
{
let mut wakers = this.shared.send_wakers.lock();
if this.shared.closed.load(Ordering::Acquire) {
closed = true;
} else {
match this.waiter.as_ref() {
Some(waiter) if !waiter.load(Ordering::Acquire) => {
waiter.store(true, Ordering::Release);
upsert_channel_waiter(&mut wakers, waiter, cx.waker());
}
Some(waiter) => {
upsert_channel_waiter(&mut wakers, waiter, cx.waker());
}
None => {
let waiter = Arc::new(AtomicBool::new(true));
upsert_channel_waiter(&mut wakers, &waiter, cx.waker());
new_waiter = Some(waiter);
}
}
}
drop(wakers);
}
if closed {
return Poll::Ready(Err(SinkError::Closed));
}
if let Some(waiter) = new_waiter {
this.waiter = Some(waiter);
}
{
let queue = this.shared.queue.lock();
if queue.len() < this.shared.capacity || this.shared.closed.load(Ordering::Acquire)
{
drop(queue);
cx.waker().wake_by_ref();
}
}
Poll::Pending
}
}
fn poll_send(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
symbol: AuthenticatedSymbol,
) -> Poll<Result<(), SinkError>> {
let this = self.get_mut();
{
let mut queue = this.shared.queue.lock();
if this.shared.closed.load(Ordering::Acquire) {
return Poll::Ready(Err(SinkError::Closed));
}
if queue.len() >= this.shared.capacity {
return Poll::Ready(Err(SinkError::BufferFull));
}
queue.push_back(symbol);
}
let waiter = {
let mut wakers = this.shared.recv_wakers.lock();
pop_next_queued_waiter(&mut wakers)
};
if let Some(w) = waiter {
w.queued.store(false, Ordering::Release);
w.waker.wake();
}
Poll::Ready(Ok(()))
}
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), SinkError>> {
Poll::Ready(Ok(()))
}
fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), SinkError>> {
self.shared.close();
Poll::Ready(Ok(()))
}
}
pub struct CollectingSink {
symbols: Vec<AuthenticatedSymbol>,
}
impl CollectingSink {
#[must_use]
pub fn new() -> Self {
Self {
symbols: Vec::new(),
}
}
#[must_use]
pub fn symbols(&self) -> &[AuthenticatedSymbol] {
&self.symbols
}
#[must_use]
pub fn into_symbols(self) -> Vec<AuthenticatedSymbol> {
self.symbols
}
}
impl Default for CollectingSink {
fn default() -> Self {
Self::new()
}
}
impl SymbolSink for CollectingSink {
fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), SinkError>> {
Poll::Ready(Ok(()))
}
fn poll_send(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
symbol: AuthenticatedSymbol,
) -> Poll<Result<(), SinkError>> {
self.symbols.push(symbol);
Poll::Ready(Ok(()))
}
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), SinkError>> {
Poll::Ready(Ok(()))
}
fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), SinkError>> {
Poll::Ready(Ok(()))
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::security::authenticated::AuthenticatedSymbol;
use crate::security::tag::AuthenticationTag;
use crate::transport::SharedChannel;
use crate::transport::channel;
use crate::transport::stream::SymbolStream;
use crate::transport::stream::SymbolStreamExt;
use crate::types::{Symbol, SymbolId, SymbolKind};
use futures_lite::future;
use parking_lot::Mutex;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::task::{Context, Poll, Waker};
fn init_test(name: &str) {
crate::test_utils::init_test_logging();
crate::test_phase!(name);
}
fn create_symbol(esi: u32) -> AuthenticatedSymbol {
let id = SymbolId::new_for_test(1, 0, esi);
let symbol = Symbol::new(id, vec![esi as u8], SymbolKind::Source);
let tag = AuthenticationTag::zero();
AuthenticatedSymbol::new_verified(symbol, tag)
}
fn queued_symbol_ids(shared: &SharedChannel) -> Vec<u32> {
shared
.queue
.lock()
.iter()
.map(|symbol| symbol.symbol().id().esi())
.collect()
}
fn noop_waker() -> Waker {
std::task::Waker::noop().clone()
}
struct FlagWake {
flag: Arc<AtomicBool>,
}
use std::task::Wake;
impl Wake for FlagWake {
fn wake(self: Arc<Self>) {
self.flag.store(true, Ordering::SeqCst);
}
}
fn flagged_waker(flag: Arc<AtomicBool>) -> Waker {
Waker::from(Arc::new(FlagWake { flag }))
}
#[allow(clippy::struct_excessive_bools)]
struct TrackingSinkState {
ready_after: usize,
ready_polls: usize,
send_pending_once: bool,
send_pending_done: bool,
send_error_once: bool,
sent: Vec<AuthenticatedSymbol>,
flush_count: usize,
closed: bool,
}
impl TrackingSinkState {
fn new() -> Self {
Self {
ready_after: 0,
ready_polls: 0,
send_pending_once: false,
send_pending_done: false,
send_error_once: false,
sent: Vec::new(),
flush_count: 0,
closed: false,
}
}
}
#[derive(Clone)]
struct TrackingSink {
state: Arc<Mutex<TrackingSinkState>>,
}
impl TrackingSink {
fn new(state: TrackingSinkState) -> Self {
Self {
state: Arc::new(Mutex::new(state)),
}
}
fn state(&self) -> Arc<Mutex<TrackingSinkState>> {
Arc::clone(&self.state)
}
}
impl SymbolSink for TrackingSink {
fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), SinkError>> {
let mut state = self.state.lock();
if state.closed {
drop(state);
return Poll::Ready(Err(SinkError::Closed));
}
if state.ready_polls < state.ready_after {
state.ready_polls += 1;
drop(state);
return Poll::Pending;
}
drop(state);
Poll::Ready(Ok(()))
}
fn poll_send(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
symbol: AuthenticatedSymbol,
) -> Poll<Result<(), SinkError>> {
let mut state = self.state.lock();
if state.closed {
drop(state);
return Poll::Ready(Err(SinkError::Closed));
}
if state.send_error_once {
state.send_error_once = false;
drop(state);
return Poll::Ready(Err(SinkError::SendFailed {
reason: "send failed".to_string(),
}));
}
if state.send_pending_once && !state.send_pending_done {
state.send_pending_done = true;
drop(state);
return Poll::Pending;
}
state.sent.push(symbol);
drop(state);
Poll::Ready(Ok(()))
}
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), SinkError>> {
let mut state = self.state.lock();
if state.closed {
drop(state);
return Poll::Ready(Err(SinkError::Closed));
}
state.flush_count += 1;
drop(state);
Poll::Ready(Ok(()))
}
fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), SinkError>> {
let mut state = self.state.lock();
state.closed = true;
drop(state);
Poll::Ready(Ok(()))
}
}
#[derive(Default)]
struct SequencedSendSinkState {
send_calls: usize,
pending_send_call: usize,
sent: Vec<AuthenticatedSymbol>,
flush_count: usize,
}
#[derive(Clone)]
struct SequencedSendSink {
state: Arc<Mutex<SequencedSendSinkState>>,
}
impl SequencedSendSink {
fn new(pending_send_call: usize) -> Self {
Self {
state: Arc::new(Mutex::new(SequencedSendSinkState {
pending_send_call,
..SequencedSendSinkState::default()
})),
}
}
fn state(&self) -> Arc<Mutex<SequencedSendSinkState>> {
Arc::clone(&self.state)
}
}
impl SymbolSink for SequencedSendSink {
fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), SinkError>> {
Poll::Ready(Ok(()))
}
fn poll_send(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
symbol: AuthenticatedSymbol,
) -> Poll<Result<(), SinkError>> {
let mut state = self.state.lock();
state.send_calls = state.send_calls.saturating_add(1);
if state.pending_send_call != 0 && state.send_calls == state.pending_send_call {
return Poll::Pending;
}
state.sent.push(symbol);
drop(state);
Poll::Ready(Ok(()))
}
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), SinkError>> {
let mut state = self.state.lock();
state.flush_count = state.flush_count.saturating_add(1);
drop(state);
Poll::Ready(Ok(()))
}
fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), SinkError>> {
Poll::Ready(Ok(()))
}
}
#[test]
fn test_send_future_pending_then_ready() {
init_test("test_send_future_pending_then_ready");
let mut sink = TrackingSink::new({
let mut state = TrackingSinkState::new();
state.ready_after = 1;
state
});
let waker = noop_waker();
let mut context = Context::from_waker(&waker);
let mut fut = sink.send(create_symbol(1));
let mut fut = Pin::new(&mut fut);
let first = fut.as_mut().poll(&mut context);
crate::assert_with_log!(
matches!(first, Poll::Pending),
"pending",
true,
matches!(first, Poll::Pending)
);
let second = fut.as_mut().poll(&mut context);
crate::assert_with_log!(
matches!(second, Poll::Ready(Ok(()))),
"ready",
true,
matches!(second, Poll::Ready(Ok(())))
);
let sent_len = {
let state = sink.state.lock();
state.sent.len()
};
crate::assert_with_log!(sent_len == 1, "sent", 1usize, sent_len);
crate::test_complete!("test_send_future_pending_then_ready");
}
#[test]
fn test_send_future_propagates_send_error() {
init_test("test_send_future_propagates_send_error");
let mut sink = TrackingSink::new({
let mut state = TrackingSinkState::new();
state.send_error_once = true;
state
});
let res = future::block_on(async { sink.send(create_symbol(2)).await });
crate::assert_with_log!(
matches!(res, Err(SinkError::SendFailed { .. })),
"send failed",
true,
matches!(res, Err(SinkError::SendFailed { .. }))
);
let sent_empty = {
let state = sink.state.lock();
state.sent.is_empty()
};
crate::assert_with_log!(sent_empty, "no sent", true, sent_empty);
crate::test_complete!("test_send_future_propagates_send_error");
}
#[test]
fn test_send_future_repoll_after_completion_fails_closed() {
init_test("test_send_future_repoll_after_completion_fails_closed");
let mut sink = TrackingSink::new(TrackingSinkState::new());
let waker = noop_waker();
let mut context = Context::from_waker(&waker);
let mut future = sink.send(create_symbol(44));
let mut future = Pin::new(&mut future);
let first = future.as_mut().poll(&mut context);
crate::assert_with_log!(
matches!(first, Poll::Ready(Ok(()))),
"first send completes",
true,
matches!(first, Poll::Ready(Ok(())))
);
let second = future.as_mut().poll(&mut context);
crate::assert_with_log!(
matches!(second, Poll::Ready(Err(SinkError::PolledAfterCompletion))),
"second poll fails closed",
true,
matches!(second, Poll::Ready(Err(SinkError::PolledAfterCompletion)))
);
let sent_len = {
let state = sink.state.lock();
state.sent.len()
};
crate::assert_with_log!(sent_len == 1, "symbol sent once", 1usize, sent_len);
crate::test_complete!("test_send_future_repoll_after_completion_fails_closed");
}
#[test]
fn test_send_all_counts_and_flushes() {
init_test("test_send_all_counts_and_flushes");
let mut sink = TrackingSink::new(TrackingSinkState::new());
let symbols = vec![create_symbol(1), create_symbol(2), create_symbol(3)];
let count = future::block_on(async { sink.send_all(symbols).await.unwrap() });
let (sent_len, flush_count) = {
let state = sink.state.lock();
(state.sent.len(), state.flush_count)
};
crate::assert_with_log!(count == 3, "count", 3usize, count);
crate::assert_with_log!(sent_len == 3, "sent", 3usize, sent_len);
crate::assert_with_log!(flush_count == 1, "flush count", 1usize, flush_count);
crate::test_complete!("test_send_all_counts_and_flushes");
}
#[test]
fn test_send_all_yields_after_budget_on_always_ready_sink() {
init_test("test_send_all_yields_after_budget_on_always_ready_sink");
let mut sink = TrackingSink::new(TrackingSinkState::new());
let state = sink.state();
let symbols = (0..(SEND_ALL_COOPERATIVE_BUDGET as u32 + 5))
.map(create_symbol)
.collect::<Vec<_>>();
let woke = Arc::new(AtomicBool::new(false));
let waker = flagged_waker(woke.clone());
let mut context = Context::from_waker(&waker);
let mut future = sink.send_all(symbols);
let mut future = Pin::new(&mut future);
let first = future.as_mut().poll(&mut context);
crate::assert_with_log!(
matches!(first, Poll::Pending),
"first poll yields cooperatively",
"Poll::Pending",
first
);
crate::assert_with_log!(
woke.load(Ordering::SeqCst),
"cooperative yield self-wakes",
true,
woke.load(Ordering::SeqCst)
);
crate::assert_with_log!(
future.count == SEND_ALL_COOPERATIVE_BUDGET,
"budgeted sends counted before yielding",
SEND_ALL_COOPERATIVE_BUDGET,
future.count
);
let (sent_len, flush_count) = {
let state = state.lock();
(state.sent.len(), state.flush_count)
};
crate::assert_with_log!(
sent_len == SEND_ALL_COOPERATIVE_BUDGET,
"sink observed only budgeted sends on first poll",
SEND_ALL_COOPERATIVE_BUDGET,
sent_len
);
crate::assert_with_log!(
flush_count == 0,
"flush deferred until iterator drains",
0usize,
flush_count
);
let second = future.as_mut().poll(&mut context);
let second_total = match second {
Poll::Ready(Ok(total)) => Some(total),
_ => None,
};
crate::assert_with_log!(
second_total == Some(SEND_ALL_COOPERATIVE_BUDGET + 5),
"second poll completes remaining sends",
Some(SEND_ALL_COOPERATIVE_BUDGET + 5),
second_total
);
let (sent_len, flush_count) = {
let state = state.lock();
(state.sent.len(), state.flush_count)
};
crate::assert_with_log!(
sent_len == SEND_ALL_COOPERATIVE_BUDGET + 5,
"all symbols were eventually sent",
SEND_ALL_COOPERATIVE_BUDGET + 5,
sent_len
);
crate::assert_with_log!(
flush_count == 1,
"final completion still flushes exactly once",
1usize,
flush_count
);
crate::test_complete!("test_send_all_yields_after_budget_on_always_ready_sink");
}
#[test]
fn test_send_all_repoll_after_completion_fails_closed() {
init_test("test_send_all_repoll_after_completion_fails_closed");
let mut sink = TrackingSink::new(TrackingSinkState::new());
let waker = noop_waker();
let mut context = Context::from_waker(&waker);
let mut future = sink.send_all(vec![create_symbol(8), create_symbol(9)]);
let mut future = Pin::new(&mut future);
let first = future.as_mut().poll(&mut context);
crate::assert_with_log!(
matches!(first, Poll::Ready(Ok(2))),
"first poll sends all",
true,
matches!(first, Poll::Ready(Ok(2)))
);
let second = future.as_mut().poll(&mut context);
crate::assert_with_log!(
matches!(second, Poll::Ready(Err(SinkError::PolledAfterCompletion))),
"second poll fails closed",
true,
matches!(second, Poll::Ready(Err(SinkError::PolledAfterCompletion)))
);
let (sent_len, flush_count) = {
let state = sink.state.lock();
(state.sent.len(), state.flush_count)
};
crate::assert_with_log!(sent_len == 2, "symbols sent once", 2usize, sent_len);
crate::assert_with_log!(flush_count == 1, "flush ran once", 1usize, flush_count);
crate::test_complete!("test_send_all_repoll_after_completion_fails_closed");
}
#[test]
fn test_send_all_propagates_error() {
init_test("test_send_all_propagates_error");
let mut sink = TrackingSink::new({
let mut state = TrackingSinkState::new();
state.send_error_once = true;
state
});
let res = future::block_on(async { sink.send_all(vec![create_symbol(9)]).await });
crate::assert_with_log!(
matches!(res, Err(SinkError::SendFailed { .. })),
"error",
true,
matches!(res, Err(SinkError::SendFailed { .. }))
);
crate::test_complete!("test_send_all_propagates_error");
}
#[test]
fn metamorphic_batching_preserves_sink_send_order() {
init_test("metamorphic_batching_preserves_sink_send_order");
let symbols = vec![
create_symbol(11),
create_symbol(12),
create_symbol(13),
create_symbol(14),
];
let mut baseline_sink = TrackingSink::new(TrackingSinkState::new());
for symbol in symbols.clone() {
future::block_on(async { baseline_sink.send(symbol).await.unwrap() });
}
let baseline_ids = {
let state = baseline_sink.state.lock();
state
.sent
.iter()
.map(|symbol| symbol.symbol().id().esi())
.collect::<Vec<_>>()
};
let mut transformed_sink = TrackingSink::new(TrackingSinkState::new());
let transformed_count =
future::block_on(async { transformed_sink.send_all(symbols).await.unwrap() });
let (transformed_ids, transformed_flushes) = {
let state = transformed_sink.state.lock();
(
state
.sent
.iter()
.map(|symbol| symbol.symbol().id().esi())
.collect::<Vec<_>>(),
state.flush_count,
)
};
crate::assert_with_log!(
transformed_ids == baseline_ids,
"batching the same symbol sequence must preserve sink emission order",
baseline_ids,
transformed_ids
);
crate::assert_with_log!(
transformed_count == 4,
"send_all reports the full batched symbol count",
4usize,
transformed_count
);
crate::assert_with_log!(
transformed_flushes == 1,
"send_all still flushes exactly once after batching",
1usize,
transformed_flushes
);
crate::test_complete!("metamorphic_batching_preserves_sink_send_order");
}
#[test]
fn metamorphic_batch_partitioning_preserves_symbol_count_and_order() {
init_test("metamorphic_batch_partitioning_preserves_symbol_count_and_order");
let symbols = vec![
create_symbol(31),
create_symbol(32),
create_symbol(33),
create_symbol(34),
create_symbol(35),
create_symbol(36),
];
let mut baseline_sink = TrackingSink::new(TrackingSinkState::new());
let baseline_count =
future::block_on(async { baseline_sink.send_all(symbols.clone()).await.unwrap() });
let baseline_ids = {
let state = baseline_sink.state.lock();
state
.sent
.iter()
.map(|symbol| symbol.symbol().id().esi())
.collect::<Vec<_>>()
};
let partitions = [2usize, 1usize, 3usize];
let mut transformed_sink = TrackingSink::new(TrackingSinkState::new());
let mut transformed_count = 0usize;
let mut offset = 0usize;
for width in partitions {
let upper = offset + width;
transformed_count += future::block_on(async {
transformed_sink
.send_all(symbols[offset..upper].iter().cloned())
.await
.unwrap()
});
offset = upper;
}
let (transformed_ids, transformed_flushes) = {
let state = transformed_sink.state.lock();
(
state
.sent
.iter()
.map(|symbol| symbol.symbol().id().esi())
.collect::<Vec<_>>(),
state.flush_count,
)
};
crate::assert_with_log!(
transformed_ids == baseline_ids,
"partitioning one batch into smaller batches must preserve emitted order",
baseline_ids,
transformed_ids
);
crate::assert_with_log!(
transformed_count == baseline_count,
"partitioning must preserve the total number of emitted symbols",
baseline_count,
transformed_count
);
crate::assert_with_log!(
transformed_flushes == partitions.len(),
"each partition still flushes exactly once",
partitions.len(),
transformed_flushes
);
crate::test_complete!("metamorphic_batch_partitioning_preserves_symbol_count_and_order");
}
#[test]
fn metamorphic_cancelled_batch_preserves_committed_prefix() {
init_test("metamorphic_cancelled_batch_preserves_committed_prefix");
let symbols = vec![
create_symbol(41),
create_symbol(42),
create_symbol(43),
create_symbol(44),
];
let mut baseline_sink = TrackingSink::new(TrackingSinkState::new());
future::block_on(async {
baseline_sink.send(symbols[0].clone()).await.unwrap();
baseline_sink.send(symbols[1].clone()).await.unwrap();
});
let baseline_prefix_ids = {
let state = baseline_sink.state.lock();
state
.sent
.iter()
.map(|symbol| symbol.symbol().id().esi())
.collect::<Vec<_>>()
};
let cx = crate::cx::Cx::for_testing();
let _current_guard = crate::cx::Cx::set_current(Some(cx.clone()));
let mut sink = SequencedSendSink::new(3);
let state = sink.state();
let waker = noop_waker();
let mut context = Context::from_waker(&waker);
let mut future = sink.send_all(symbols);
let mut future = Pin::new(&mut future);
let first = future.as_mut().poll(&mut context);
crate::assert_with_log!(
matches!(first, Poll::Pending),
"partial batch reaches backpressure before cancellation",
true,
matches!(first, Poll::Pending)
);
let committed_prefix_ids = {
let state = state.lock();
state
.sent
.iter()
.map(|symbol| symbol.symbol().id().esi())
.collect::<Vec<_>>()
};
crate::assert_with_log!(
committed_prefix_ids == baseline_prefix_ids,
"partial progress before cancellation preserves the committed prefix",
baseline_prefix_ids.clone(),
committed_prefix_ids
);
cx.set_cancel_requested(true);
let cancelled = future.as_mut().poll(&mut context);
let cancel_kind = match cancelled {
Poll::Ready(Err(SinkError::Io { source })) => Some(source.kind()),
_ => None,
};
crate::assert_with_log!(
cancel_kind == Some(std::io::ErrorKind::Interrupted),
"cancellation converts the in-flight batch into an interrupted sink error",
Some(std::io::ErrorKind::Interrupted),
cancel_kind
);
let (cancelled_ids, flush_count) = {
let state = state.lock();
(
state
.sent
.iter()
.map(|symbol| symbol.symbol().id().esi())
.collect::<Vec<_>>(),
state.flush_count,
)
};
crate::assert_with_log!(
cancelled_ids == baseline_prefix_ids,
"cancellation must not duplicate or reorder the already-committed prefix",
baseline_prefix_ids,
cancelled_ids
);
crate::assert_with_log!(
flush_count == 0,
"cancelled partial batches must not flush a not-yet-exhausted iterator",
0usize,
flush_count
);
crate::test_complete!("metamorphic_cancelled_batch_preserves_committed_prefix");
}
#[test]
fn metamorphic_buffered_sink_backpressure_preserves_capacity_and_delivery_order() {
init_test("metamorphic_buffered_sink_backpressure_preserves_capacity_and_delivery_order");
let symbols = [1_u32, 2, 3, 4].map(create_symbol);
let mut baseline_sink = TrackingSink::new(TrackingSinkState::new());
let baseline_count = future::block_on(async {
baseline_sink
.send_all(symbols.iter().cloned())
.await
.unwrap()
});
let baseline_ids = {
let state = baseline_sink.state.lock();
state
.sent
.iter()
.map(|symbol| symbol.symbol().id().esi())
.collect::<Vec<_>>()
};
let inner = TrackingSink::new({
let mut state = TrackingSinkState::new();
state.ready_after = 10;
state
});
let state = inner.state();
let mut buffered = BufferedSink::new(inner, 2);
let waker = noop_waker();
let mut context = Context::from_waker(&waker);
for symbol in symbols.iter().cloned() {
let poll = Pin::new(&mut buffered).poll_send(&mut context, symbol);
crate::assert_with_log!(
matches!(poll, Poll::Ready(Ok(()))),
"buffered send accepted despite downstream backpressure",
true,
matches!(poll, Poll::Ready(Ok(())))
);
crate::assert_with_log!(
buffered.buffer.len() <= buffered.capacity,
"primary buffer never exceeds configured capacity",
true,
buffered.buffer.len() <= buffered.capacity
);
}
let mut flush_completed = false;
for _ in 0..=10 {
let poll = Pin::new(&mut buffered).poll_flush(&mut context);
crate::assert_with_log!(
buffered.buffer.len() <= buffered.capacity,
"flush retries keep the primary buffer bounded",
true,
buffered.buffer.len() <= buffered.capacity
);
if matches!(poll, Poll::Ready(Ok(()))) {
flush_completed = true;
break;
}
crate::assert_with_log!(
matches!(poll, Poll::Pending),
"intermediate flushes stay pending until the inner sink is ready",
true,
matches!(poll, Poll::Pending)
);
}
let transformed_ids = {
let state = state.lock();
state
.sent
.iter()
.map(|symbol| symbol.symbol().id().esi())
.collect::<Vec<_>>()
};
crate::assert_with_log!(
flush_completed,
"eventual flush completes once backpressure clears",
true,
flush_completed
);
crate::assert_with_log!(
transformed_ids == baseline_ids,
"backpressure retries preserve final delivery order",
baseline_ids.clone(),
transformed_ids
);
crate::assert_with_log!(
baseline_count == baseline_ids.len(),
"baseline count matches delivered symbols",
baseline_ids.len(),
baseline_count
);
crate::assert_with_log!(
buffered.buffer.is_empty() && buffered.staged_symbols.is_empty(),
"all local buffering drains after the final flush",
true,
buffered.buffer.is_empty() && buffered.staged_symbols.is_empty()
);
crate::test_complete!(
"metamorphic_buffered_sink_backpressure_preserves_capacity_and_delivery_order"
);
}
#[test]
fn metamorphic_cancelled_send_future_does_not_consume_released_channel_capacity() {
init_test("metamorphic_cancelled_send_future_does_not_consume_released_channel_capacity");
let waker = noop_waker();
let mut context = Context::from_waker(&waker);
let (mut baseline_fill, mut baseline_stream) = channel(1);
let baseline_shared = Arc::clone(&baseline_fill.shared);
future::block_on(async {
baseline_fill.send(create_symbol(1)).await.unwrap();
let received = baseline_stream.next().await.unwrap().unwrap();
crate::assert_with_log!(
received.symbol().id().esi() == 1,
"baseline drain receives the queued symbol",
1_u32,
received.symbol().id().esi()
);
});
let mut baseline_replacement = ChannelSink::new(Arc::clone(&baseline_shared));
let baseline_ready = Pin::new(&mut baseline_replacement).poll_ready(&mut context);
let baseline_send =
Pin::new(&mut baseline_replacement).poll_send(&mut context, create_symbol(2));
crate::assert_with_log!(
matches!(baseline_ready, Poll::Ready(Ok(()))),
"baseline replacement sender sees freed capacity",
true,
matches!(baseline_ready, Poll::Ready(Ok(())))
);
crate::assert_with_log!(
matches!(baseline_send, Poll::Ready(Ok(()))),
"baseline replacement send succeeds",
true,
matches!(baseline_send, Poll::Ready(Ok(())))
);
let baseline_ids = queued_symbol_ids(&baseline_shared);
let (mut transformed_fill, mut transformed_stream) = channel(1);
let transformed_shared = Arc::clone(&transformed_fill.shared);
future::block_on(async {
transformed_fill.send(create_symbol(1)).await.unwrap();
});
let mut cancelled_sender = ChannelSink::new(Arc::clone(&transformed_shared));
let mut replacement_sender = ChannelSink::new(Arc::clone(&transformed_shared));
{
let cx = crate::cx::Cx::for_testing();
let _current_guard = crate::cx::Cx::set_current(Some(cx.clone()));
let mut pending_send = cancelled_sender.send(create_symbol(99));
let mut pending_send = Pin::new(&mut pending_send);
let first = pending_send.as_mut().poll(&mut context);
crate::assert_with_log!(
matches!(first, Poll::Pending),
"full channel blocks the in-flight send before cancellation",
true,
matches!(first, Poll::Pending)
);
let waiter_registered = transformed_shared.send_wakers.lock().len();
crate::assert_with_log!(
waiter_registered == 1,
"pending send registers exactly one waiter",
1usize,
waiter_registered
);
cx.set_cancel_requested(true);
let cancelled = pending_send.as_mut().poll(&mut context);
let cancel_kind = match cancelled {
Poll::Ready(Err(SinkError::Io { source })) => Some(source.kind()),
_ => None,
};
crate::assert_with_log!(
cancel_kind == Some(std::io::ErrorKind::Interrupted),
"cancellation converts the blocked send into an interrupted error",
Some(std::io::ErrorKind::Interrupted),
cancel_kind
);
}
future::block_on(async {
let received = transformed_stream.next().await.unwrap().unwrap();
crate::assert_with_log!(
received.symbol().id().esi() == 1,
"transformed drain receives the original queued symbol",
1_u32,
received.symbol().id().esi()
);
});
let transformed_ready = Pin::new(&mut replacement_sender).poll_ready(&mut context);
let transformed_send =
Pin::new(&mut replacement_sender).poll_send(&mut context, create_symbol(2));
crate::assert_with_log!(
matches!(transformed_ready, Poll::Ready(Ok(()))),
"replacement sender sees the released capacity after cancellation",
true,
matches!(transformed_ready, Poll::Ready(Ok(())))
);
crate::assert_with_log!(
matches!(transformed_send, Poll::Ready(Ok(()))),
"replacement sender can commit after the cancelled send drops out",
true,
matches!(transformed_send, Poll::Ready(Ok(())))
);
let transformed_ids = queued_symbol_ids(&transformed_shared);
let waiter_cleared = cancelled_sender
.waiter
.as_ref()
.is_some_and(|waiter| !waiter.load(Ordering::Acquire));
let remaining_waiters = transformed_shared.send_wakers.lock().len();
crate::assert_with_log!(
transformed_ids == baseline_ids,
"cancelled sends do not consume the next released buffer slot",
baseline_ids,
transformed_ids
);
crate::assert_with_log!(
waiter_cleared,
"draining the queued symbol clears the cancelled sender waiter",
true,
waiter_cleared
);
crate::assert_with_log!(
remaining_waiters == 0,
"no stale send waiters remain after the cancelled path drains",
0usize,
remaining_waiters
);
crate::test_complete!(
"metamorphic_cancelled_send_future_does_not_consume_released_channel_capacity"
);
}
#[test]
fn metamorphic_channel_sink_concurrent_poll_ready_preserves_total_send_order() {
init_test("metamorphic_channel_sink_concurrent_poll_ready_preserves_total_send_order");
let waker = noop_waker();
let mut context = Context::from_waker(&waker);
let baseline_shared = Arc::new(SharedChannel::new(2));
let mut baseline_first = ChannelSink::new(Arc::clone(&baseline_shared));
let mut baseline_second = ChannelSink::new(Arc::clone(&baseline_shared));
let baseline_first_ready = Pin::new(&mut baseline_first).poll_ready(&mut context);
let baseline_first_send =
Pin::new(&mut baseline_first).poll_send(&mut context, create_symbol(11));
let baseline_second_ready = Pin::new(&mut baseline_second).poll_ready(&mut context);
let baseline_second_send =
Pin::new(&mut baseline_second).poll_send(&mut context, create_symbol(12));
crate::assert_with_log!(
matches!(baseline_first_ready, Poll::Ready(Ok(())))
&& matches!(baseline_first_send, Poll::Ready(Ok(())))
&& matches!(baseline_second_ready, Poll::Ready(Ok(())))
&& matches!(baseline_second_send, Poll::Ready(Ok(()))),
"baseline send order commits cleanly",
true,
matches!(baseline_first_ready, Poll::Ready(Ok(())))
&& matches!(baseline_first_send, Poll::Ready(Ok(())))
&& matches!(baseline_second_ready, Poll::Ready(Ok(())))
&& matches!(baseline_second_send, Poll::Ready(Ok(())))
);
let baseline_ids = queued_symbol_ids(&baseline_shared);
let transformed_shared = Arc::new(SharedChannel::new(2));
let mut transformed_first = ChannelSink::new(Arc::clone(&transformed_shared));
let mut transformed_second = ChannelSink::new(Arc::clone(&transformed_shared));
let transformed_first_ready = Pin::new(&mut transformed_first).poll_ready(&mut context);
let transformed_second_ready = Pin::new(&mut transformed_second).poll_ready(&mut context);
let transformed_first_send =
Pin::new(&mut transformed_first).poll_send(&mut context, create_symbol(11));
let transformed_second_send =
Pin::new(&mut transformed_second).poll_send(&mut context, create_symbol(12));
crate::assert_with_log!(
matches!(transformed_first_ready, Poll::Ready(Ok(())))
&& matches!(transformed_second_ready, Poll::Ready(Ok(())))
&& matches!(transformed_first_send, Poll::Ready(Ok(())))
&& matches!(transformed_second_send, Poll::Ready(Ok(()))),
"concurrent ready polling still allows both sends to commit",
true,
matches!(transformed_first_ready, Poll::Ready(Ok(())))
&& matches!(transformed_second_ready, Poll::Ready(Ok(())))
&& matches!(transformed_first_send, Poll::Ready(Ok(())))
&& matches!(transformed_second_send, Poll::Ready(Ok(())))
);
let transformed_ids = queued_symbol_ids(&transformed_shared);
crate::assert_with_log!(
transformed_ids == baseline_ids,
"extra poll_ready interleaving preserves the committed channel order",
baseline_ids.clone(),
transformed_ids
);
crate::assert_with_log!(
baseline_ids == vec![11, 12],
"queue order matches the committed send order",
vec![11_u32, 12_u32],
baseline_ids
);
crate::test_complete!(
"metamorphic_channel_sink_concurrent_poll_ready_preserves_total_send_order"
);
}
#[test]
fn test_flush_future_repoll_after_completion_fails_closed() {
init_test("test_flush_future_repoll_after_completion_fails_closed");
let mut sink = TrackingSink::new(TrackingSinkState::new());
let waker = noop_waker();
let mut context = Context::from_waker(&waker);
let mut future = sink.flush();
let mut future = Pin::new(&mut future);
let first = future.as_mut().poll(&mut context);
crate::assert_with_log!(
matches!(first, Poll::Ready(Ok(()))),
"first flush completes",
true,
matches!(first, Poll::Ready(Ok(())))
);
let second = future.as_mut().poll(&mut context);
crate::assert_with_log!(
matches!(second, Poll::Ready(Err(SinkError::PolledAfterCompletion))),
"second flush poll fails closed",
true,
matches!(second, Poll::Ready(Err(SinkError::PolledAfterCompletion)))
);
let flush_count = {
let state = sink.state.lock();
state.flush_count
};
crate::assert_with_log!(flush_count == 1, "flush invoked once", 1usize, flush_count);
crate::test_complete!("test_flush_future_repoll_after_completion_fails_closed");
}
#[test]
fn test_close_future_repoll_after_completion_fails_closed() {
init_test("test_close_future_repoll_after_completion_fails_closed");
let mut sink = TrackingSink::new(TrackingSinkState::new());
let waker = noop_waker();
let mut context = Context::from_waker(&waker);
let mut future = sink.close();
let mut future = Pin::new(&mut future);
let first = future.as_mut().poll(&mut context);
crate::assert_with_log!(
matches!(first, Poll::Ready(Ok(()))),
"first close completes",
true,
matches!(first, Poll::Ready(Ok(())))
);
let second = future.as_mut().poll(&mut context);
crate::assert_with_log!(
matches!(second, Poll::Ready(Err(SinkError::PolledAfterCompletion))),
"second close poll fails closed",
true,
matches!(second, Poll::Ready(Err(SinkError::PolledAfterCompletion)))
);
let closed = {
let state = sink.state.lock();
state.closed
};
crate::assert_with_log!(closed, "sink closed", true, closed);
crate::test_complete!("test_close_future_repoll_after_completion_fails_closed");
}
#[test]
fn test_buffered_sink_defers_send_until_flush() {
init_test("test_buffered_sink_defers_send_until_flush");
let mut buffered = BufferedSink::new(CollectingSink::new(), 2);
let waker = noop_waker();
let mut context = Context::from_waker(&waker);
let first = Pin::new(&mut buffered).poll_send(&mut context, create_symbol(1));
let second = Pin::new(&mut buffered).poll_send(&mut context, create_symbol(2));
crate::assert_with_log!(
matches!(first, Poll::Ready(Ok(()))),
"first buffered",
true,
matches!(first, Poll::Ready(Ok(())))
);
crate::assert_with_log!(
matches!(second, Poll::Ready(Ok(()))),
"second buffered",
true,
matches!(second, Poll::Ready(Ok(())))
);
crate::assert_with_log!(
buffered.inner.symbols.is_empty(),
"inner empty before flush",
true,
buffered.inner.symbols.is_empty()
);
let flushed = Pin::new(&mut buffered).poll_flush(&mut context);
crate::assert_with_log!(
matches!(flushed, Poll::Ready(Ok(()))),
"flush ok",
true,
matches!(flushed, Poll::Ready(Ok(())))
);
crate::assert_with_log!(
buffered.inner.symbols.len() == 2,
"inner received",
2usize,
buffered.inner.symbols.len()
);
crate::test_complete!("test_buffered_sink_defers_send_until_flush");
}
#[test]
fn test_buffered_sink_ready_pending_when_inner_not_ready() {
init_test("test_buffered_sink_ready_pending_when_inner_not_ready");
let inner = TrackingSink::new({
let mut state = TrackingSinkState::new();
state.ready_after = 2;
state
});
let mut buffered = BufferedSink::new(inner, 1);
let waker = noop_waker();
let mut context = Context::from_waker(&waker);
let send = Pin::new(&mut buffered).poll_send(&mut context, create_symbol(7));
crate::assert_with_log!(
matches!(send, Poll::Ready(Ok(()))),
"buffered send",
true,
matches!(send, Poll::Ready(Ok(())))
);
let ready = Pin::new(&mut buffered).poll_ready(&mut context);
crate::assert_with_log!(
matches!(ready, Poll::Pending),
"ready pending",
true,
matches!(ready, Poll::Pending)
);
crate::assert_with_log!(
buffered.buffer.len() == 1,
"buffer retained",
1usize,
buffered.buffer.len()
);
crate::test_complete!("test_buffered_sink_ready_pending_when_inner_not_ready");
}
#[test]
fn test_buffered_sink_pending_full_send_retains_staged_symbol() {
init_test("test_buffered_sink_pending_full_send_retains_staged_symbol");
let inner = TrackingSink::new({
let mut state = TrackingSinkState::new();
state.ready_after = 3;
state
});
let mut buffered = BufferedSink::new(inner, 1);
let waker = noop_waker();
let mut context = Context::from_waker(&waker);
let first = Pin::new(&mut buffered).poll_send(&mut context, create_symbol(1));
crate::assert_with_log!(
matches!(first, Poll::Ready(Ok(()))),
"first symbol buffered",
true,
matches!(first, Poll::Ready(Ok(())))
);
let second = Pin::new(&mut buffered).poll_send(&mut context, create_symbol(2));
crate::assert_with_log!(
matches!(second, Poll::Pending),
"second send backpressures instead of dropping",
true,
matches!(second, Poll::Pending)
);
crate::assert_with_log!(
buffered.buffer.len() == 1,
"full buffer still holds original symbol",
1usize,
buffered.buffer.len()
);
crate::assert_with_log!(
buffered.staged_symbols.len() == 1,
"staged symbol retained across flush backpressure",
true,
buffered.staged_symbols.len() == 1
);
let flushed = Pin::new(&mut buffered).poll_flush(&mut context);
crate::assert_with_log!(
matches!(flushed, Poll::Ready(Ok(()))),
"flush drains both buffered and pending symbols",
true,
matches!(flushed, Poll::Ready(Ok(())))
);
let sent_ids = {
let state = buffered.inner.state.lock();
state
.sent
.iter()
.map(|symbol| symbol.symbol().id().esi())
.collect::<Vec<_>>()
};
crate::assert_with_log!(
sent_ids == vec![1, 2],
"flush preserves symbol order and retention",
vec![1_u32, 2_u32],
sent_ids
);
crate::assert_with_log!(
buffered.staged_symbols.is_empty(),
"staged backlog cleared after flush",
true,
buffered.staged_symbols.is_empty()
);
crate::test_complete!("test_buffered_sink_pending_full_send_retains_staged_symbol");
}
#[test]
fn test_buffered_sink_direct_poll_send_preserves_fifo_with_staged_backlog() {
init_test("test_buffered_sink_direct_poll_send_preserves_fifo_with_staged_backlog");
let inner = SequencedSendSink::new(2);
let state = inner.state();
let mut buffered = BufferedSink::new(inner, 2);
let waker = noop_waker();
let mut context = Context::from_waker(&waker);
let first = Pin::new(&mut buffered).poll_send(&mut context, create_symbol(1));
let second = Pin::new(&mut buffered).poll_send(&mut context, create_symbol(2));
let third = Pin::new(&mut buffered).poll_send(&mut context, create_symbol(3));
crate::assert_with_log!(
matches!(first, Poll::Ready(Ok(()))),
"first symbol buffered",
true,
matches!(first, Poll::Ready(Ok(())))
);
crate::assert_with_log!(
matches!(second, Poll::Ready(Ok(()))),
"second symbol buffered",
true,
matches!(second, Poll::Ready(Ok(())))
);
crate::assert_with_log!(
matches!(third, Poll::Pending),
"third symbol stalls on partial drain",
true,
matches!(third, Poll::Pending)
);
let sent_after_third = {
let state = state.lock();
state
.sent
.iter()
.map(|symbol| symbol.symbol().id().esi())
.collect::<Vec<_>>()
};
crate::assert_with_log!(
sent_after_third == vec![1],
"partial drain only sends the head symbol",
vec![1_u32],
sent_after_third
);
let fourth = Pin::new(&mut buffered).poll_send(&mut context, create_symbol(4));
crate::assert_with_log!(
matches!(fourth, Poll::Ready(Ok(()))),
"fourth direct send is retained and drained without reordering",
true,
matches!(fourth, Poll::Ready(Ok(())))
);
let flushed = Pin::new(&mut buffered).poll_flush(&mut context);
crate::assert_with_log!(
matches!(flushed, Poll::Ready(Ok(()))),
"final flush completes",
true,
matches!(flushed, Poll::Ready(Ok(())))
);
let sent_ids = {
let state = state.lock();
state
.sent
.iter()
.map(|symbol| symbol.symbol().id().esi())
.collect::<Vec<_>>()
};
crate::assert_with_log!(
sent_ids == vec![1, 2, 3, 4],
"all directly-polled sends retain FIFO order",
vec![1_u32, 2_u32, 3_u32, 4_u32],
sent_ids
);
crate::assert_with_log!(
buffered.buffer.is_empty(),
"local buffer drained",
true,
buffered.buffer.is_empty()
);
crate::assert_with_log!(
buffered.staged_symbols.is_empty(),
"staged backlog drained",
true,
buffered.staged_symbols.is_empty()
);
crate::test_complete!(
"test_buffered_sink_direct_poll_send_preserves_fifo_with_staged_backlog"
);
}
#[test]
fn test_buffered_sink_ready_errors_when_inner_closed() {
init_test("test_buffered_sink_ready_errors_when_inner_closed");
let inner = TrackingSink::new({
let mut state = TrackingSinkState::new();
state.closed = true;
state
});
let mut buffered = BufferedSink::new(inner, 2);
let waker = noop_waker();
let mut context = Context::from_waker(&waker);
let ready = Pin::new(&mut buffered).poll_ready(&mut context);
crate::assert_with_log!(
matches!(ready, Poll::Ready(Err(SinkError::Closed))),
"closed inner fails poll_ready",
true,
matches!(ready, Poll::Ready(Err(SinkError::Closed)))
);
crate::assert_with_log!(
buffered.buffer.is_empty(),
"buffer remains empty",
true,
buffered.buffer.is_empty()
);
crate::test_complete!("test_buffered_sink_ready_errors_when_inner_closed");
}
#[test]
fn test_buffered_sink_send_future_rejects_closed_inner_without_buffering() {
init_test("test_buffered_sink_send_future_rejects_closed_inner_without_buffering");
let inner = TrackingSink::new({
let mut state = TrackingSinkState::new();
state.closed = true;
state
});
let mut buffered = BufferedSink::new(inner, 2);
let waker = noop_waker();
let mut context = Context::from_waker(&waker);
{
let mut send = buffered.send(create_symbol(13));
let poll = Pin::new(&mut send).poll(&mut context);
crate::assert_with_log!(
matches!(poll, Poll::Ready(Err(SinkError::Closed))),
"closed inner fails send future",
true,
matches!(poll, Poll::Ready(Err(SinkError::Closed)))
);
}
crate::assert_with_log!(
buffered.buffer.is_empty(),
"symbol not stranded in buffer",
true,
buffered.buffer.is_empty()
);
crate::test_complete!(
"test_buffered_sink_send_future_rejects_closed_inner_without_buffering"
);
}
#[test]
fn test_buffered_sink_ready_still_buffers_through_transient_inner_pending() {
init_test("test_buffered_sink_ready_still_buffers_through_transient_inner_pending");
let inner = TrackingSink::new({
let mut state = TrackingSinkState::new();
state.ready_after = 1;
state
});
let mut buffered = BufferedSink::new(inner, 2);
let waker = noop_waker();
let mut context = Context::from_waker(&waker);
let ready = Pin::new(&mut buffered).poll_ready(&mut context);
crate::assert_with_log!(
matches!(ready, Poll::Ready(Ok(()))),
"transient inner pending still permits buffering",
true,
matches!(ready, Poll::Ready(Ok(())))
);
let send = Pin::new(&mut buffered).poll_send(&mut context, create_symbol(14));
crate::assert_with_log!(
matches!(send, Poll::Ready(Ok(()))),
"send buffers symbol despite transient backpressure",
true,
matches!(send, Poll::Ready(Ok(())))
);
crate::assert_with_log!(
buffered.buffer.len() == 1,
"symbol buffered locally",
1usize,
buffered.buffer.len()
);
crate::test_complete!(
"test_buffered_sink_ready_still_buffers_through_transient_inner_pending"
);
}
#[test]
fn test_channel_sink_pending_when_full_and_ready_after_recv() {
init_test("test_channel_sink_pending_when_full_and_ready_after_recv");
let (mut sink, mut stream) = channel(1);
let waker = noop_waker();
let mut context = Context::from_waker(&waker);
let ready = Pin::new(&mut sink).poll_ready(&mut context);
crate::assert_with_log!(
matches!(ready, Poll::Ready(Ok(()))),
"ready ok",
true,
matches!(ready, Poll::Ready(Ok(())))
);
let send = Pin::new(&mut sink).poll_send(&mut context, create_symbol(1));
crate::assert_with_log!(
matches!(send, Poll::Ready(Ok(()))),
"send ok",
true,
matches!(send, Poll::Ready(Ok(())))
);
let pending = Pin::new(&mut sink).poll_ready(&mut context);
crate::assert_with_log!(
matches!(pending, Poll::Pending),
"pending when full",
true,
matches!(pending, Poll::Pending)
);
let queued = sink
.waiter
.as_ref()
.is_some_and(|flag| flag.load(Ordering::Acquire));
crate::assert_with_log!(queued, "waiter queued", true, queued);
future::block_on(async {
let _ = stream.next().await.unwrap().unwrap();
});
let ready_after = Pin::new(&mut sink).poll_ready(&mut context);
crate::assert_with_log!(
matches!(ready_after, Poll::Ready(Ok(()))),
"ready after recv",
true,
matches!(ready_after, Poll::Ready(Ok(())))
);
let queued_after = sink
.waiter
.as_ref()
.is_some_and(|flag| flag.load(Ordering::Acquire));
crate::assert_with_log!(!queued_after, "waiter cleared", false, queued_after);
crate::test_complete!("test_channel_sink_pending_when_full_and_ready_after_recv");
}
#[test]
fn test_channel_sink_drop_removes_queued_waiter() {
init_test("test_channel_sink_drop_removes_queued_waiter");
let shared = Arc::new(SharedChannel::new(1));
{
let mut queue = shared.queue.lock();
queue.push_back(create_symbol(1));
}
let waker = noop_waker();
let mut context = Context::from_waker(&waker);
let mut sink = ChannelSink::new(Arc::clone(&shared));
let pending = Pin::new(&mut sink).poll_ready(&mut context);
crate::assert_with_log!(
matches!(pending, Poll::Pending),
"ready pending when full",
true,
matches!(pending, Poll::Pending)
);
let queued_before = shared.send_wakers.lock().len();
crate::assert_with_log!(
queued_before == 1,
"one waiter registered",
1usize,
queued_before
);
drop(sink);
let queued_after = shared.send_wakers.lock().len();
crate::assert_with_log!(
queued_after == 0,
"queued waiter removed on drop",
0usize,
queued_after
);
crate::test_complete!("test_channel_sink_drop_removes_queued_waiter");
}
#[test]
fn test_channel_sink_refreshes_queued_waker_on_repoll() {
init_test("test_channel_sink_refreshes_queued_waker_on_repoll");
let (mut sink, mut stream) = channel(1);
let ready_waker = noop_waker();
let mut ready_context = Context::from_waker(&ready_waker);
let _ = Pin::new(&mut sink).poll_send(&mut ready_context, create_symbol(1));
let first_flag = Arc::new(AtomicBool::new(false));
let second_flag = Arc::new(AtomicBool::new(false));
let first_waker = flagged_waker(Arc::clone(&first_flag));
let second_waker = flagged_waker(Arc::clone(&second_flag));
let mut first_context = Context::from_waker(&first_waker);
let mut second_context = Context::from_waker(&second_waker);
let first_pending = Pin::new(&mut sink).poll_ready(&mut first_context);
crate::assert_with_log!(
matches!(first_pending, Poll::Pending),
"first poll pending",
true,
matches!(first_pending, Poll::Pending)
);
let second_pending = Pin::new(&mut sink).poll_ready(&mut second_context);
crate::assert_with_log!(
matches!(second_pending, Poll::Pending),
"second poll pending",
true,
matches!(second_pending, Poll::Pending)
);
let _ = SymbolStream::poll_next(Pin::new(&mut stream), &mut ready_context);
let first_woke = first_flag.load(Ordering::Acquire);
let second_woke = second_flag.load(Ordering::Acquire);
crate::assert_with_log!(!first_woke, "stale waker not used", false, first_woke);
crate::assert_with_log!(second_woke, "latest waker used", true, second_woke);
crate::test_complete!("test_channel_sink_refreshes_queued_waker_on_repoll");
}
#[test]
fn test_channel_sink_skips_stale_recv_waiter_entries() {
init_test("test_channel_sink_skips_stale_recv_waiter_entries");
let shared = Arc::new(SharedChannel::new(1));
let mut sink = ChannelSink::new(Arc::clone(&shared));
let stale_flag = Arc::new(AtomicBool::new(false));
let active_flag = Arc::new(AtomicBool::new(false));
let stale_queued = Arc::new(AtomicBool::new(false));
let active_queued = Arc::new(AtomicBool::new(true));
{
let mut recv_wakers = shared.recv_wakers.lock();
recv_wakers.push(ChannelWaiter {
waker: flagged_waker(Arc::clone(&active_flag)),
queued: Arc::clone(&active_queued),
});
recv_wakers.push(ChannelWaiter {
waker: flagged_waker(Arc::clone(&stale_flag)),
queued: Arc::clone(&stale_queued),
});
}
let waker = noop_waker();
let mut context = Context::from_waker(&waker);
let send = Pin::new(&mut sink).poll_send(&mut context, create_symbol(5));
crate::assert_with_log!(
matches!(send, Poll::Ready(Ok(()))),
"send succeeds",
true,
matches!(send, Poll::Ready(Ok(())))
);
let stale_woke = stale_flag.load(Ordering::Acquire);
let active_woke = active_flag.load(Ordering::Acquire);
crate::assert_with_log!(!stale_woke, "stale waiter not woken", false, stale_woke);
crate::assert_with_log!(active_woke, "active waiter woken", true, active_woke);
let active_cleared = !active_queued.load(Ordering::Acquire);
crate::assert_with_log!(
active_cleared,
"active waiter flag cleared",
true,
active_cleared
);
let recv_waiters_empty = shared.recv_wakers.lock().is_empty();
crate::assert_with_log!(
recv_waiters_empty,
"stale entries pruned",
true,
recv_waiters_empty
);
crate::test_complete!("test_channel_sink_skips_stale_recv_waiter_entries");
}
#[test]
fn test_channel_sink_wakes_oldest_recv_waiter_first() {
init_test("test_channel_sink_wakes_oldest_recv_waiter_first");
let shared = Arc::new(SharedChannel::new(2));
let mut sink = ChannelSink::new(Arc::clone(&shared));
let first_flag = Arc::new(AtomicBool::new(false));
let second_flag = Arc::new(AtomicBool::new(false));
let first_queued = Arc::new(AtomicBool::new(true));
let second_queued = Arc::new(AtomicBool::new(true));
{
let mut recv_wakers = shared.recv_wakers.lock();
recv_wakers.push(ChannelWaiter {
waker: flagged_waker(Arc::clone(&first_flag)),
queued: Arc::clone(&first_queued),
});
recv_wakers.push(ChannelWaiter {
waker: flagged_waker(Arc::clone(&second_flag)),
queued: Arc::clone(&second_queued),
});
}
let waker = noop_waker();
let mut context = Context::from_waker(&waker);
let send = Pin::new(&mut sink).poll_send(&mut context, create_symbol(9));
crate::assert_with_log!(
matches!(send, Poll::Ready(Ok(()))),
"send succeeds",
true,
matches!(send, Poll::Ready(Ok(())))
);
let first_woke = first_flag.load(Ordering::Acquire);
let second_woke = second_flag.load(Ordering::Acquire);
crate::assert_with_log!(first_woke, "first waiter woken", true, first_woke);
crate::assert_with_log!(
!second_woke,
"second waiter still waiting",
false,
second_woke
);
let second_still_queued = second_queued.load(Ordering::Acquire);
crate::assert_with_log!(
second_still_queued,
"second waiter remains queued",
true,
second_still_queued
);
let queued_len = shared.recv_wakers.lock().len();
crate::assert_with_log!(queued_len == 1, "one waiter remains", 1usize, queued_len);
crate::test_complete!("test_channel_sink_wakes_oldest_recv_waiter_first");
}
#[test]
fn test_channel_sink_poll_send_buffer_full() {
init_test("test_channel_sink_poll_send_buffer_full");
let (mut sink, _stream) = channel(1);
let waker = noop_waker();
let mut context = Context::from_waker(&waker);
let ready = Pin::new(&mut sink).poll_ready(&mut context);
crate::assert_with_log!(
matches!(ready, Poll::Ready(Ok(()))),
"ready ok",
true,
matches!(ready, Poll::Ready(Ok(())))
);
let send = Pin::new(&mut sink).poll_send(&mut context, create_symbol(1));
crate::assert_with_log!(
matches!(send, Poll::Ready(Ok(()))),
"send ok",
true,
matches!(send, Poll::Ready(Ok(())))
);
let full = Pin::new(&mut sink).poll_send(&mut context, create_symbol(2));
crate::assert_with_log!(
matches!(full, Poll::Ready(Err(SinkError::BufferFull))),
"buffer full",
true,
matches!(full, Poll::Ready(Err(SinkError::BufferFull)))
);
crate::test_complete!("test_channel_sink_poll_send_buffer_full");
}
#[test]
fn test_collecting_sink_collects() {
init_test("test_collecting_sink_collects");
let mut sink = CollectingSink::new();
future::block_on(async {
sink.send(create_symbol(1)).await.unwrap();
sink.send(create_symbol(2)).await.unwrap();
});
crate::assert_with_log!(
sink.symbols().len() == 2,
"len",
2usize,
sink.symbols().len()
);
crate::test_complete!("test_collecting_sink_collects");
}
#[test]
fn test_channel_sink_close_sets_closed_and_ready_errors() {
init_test("test_channel_sink_close_sets_closed_and_ready_errors");
let (mut sink, _stream) = channel(1);
let waker = noop_waker();
let mut context = Context::from_waker(&waker);
let close = Pin::new(&mut sink).poll_close(&mut context);
crate::assert_with_log!(
matches!(close, Poll::Ready(Ok(()))),
"close ok",
true,
matches!(close, Poll::Ready(Ok(())))
);
let ready = Pin::new(&mut sink).poll_ready(&mut context);
crate::assert_with_log!(
matches!(ready, Poll::Ready(Err(SinkError::Closed))),
"ready closed",
true,
matches!(ready, Poll::Ready(Err(SinkError::Closed)))
);
crate::test_complete!("test_channel_sink_close_sets_closed_and_ready_errors");
}
}