use std::collections::VecDeque;
use std::sync::atomic::Ordering;
use std::sync::{Arc, Condvar, Mutex};
use crate::stream::{BoxStream, NotUsed, OverflowStrategy, Sink, Source, StreamCompletion};
use crate::{StreamError, StreamResult};
use futures::channel::oneshot;
/// Outcome reported to a producer after it offers an element to a queue.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum QueueOfferResult {
    /// The element was accepted into the buffer.
    Enqueued,
    /// The element was rejected because the buffer had no free slot.
    Dropped,
    /// The queue no longer accepts elements; the offered element was discarded.
    QueueClosed,
    /// The offer itself caused the queue to fail with the carried error.
    Failure(StreamError),
}
/// Terminal state recorded once a queue stops accepting elements.
#[derive(Clone)]
enum TerminalSignal {
    /// The queue finished normally.
    Complete,
    /// The queue failed; the error is surfaced to the consuming stream.
    Error(StreamError),
}
/// State shared between every `BoundedSourceQueue` handle and the consuming stream.
struct BoundedQueueShared<T> {
    /// Buffer and terminal flag, guarded by a mutex.
    state: Mutex<BoundedQueueState<T>>,
    /// Signalled whenever the buffer or the terminal state changes.
    available: Condvar,
    /// Maximum number of buffered elements.
    capacity: usize,
}
/// Mutable part of a bounded queue; only accessed while holding the mutex.
struct BoundedQueueState<T> {
    /// Elements accepted by `offer` and not yet pulled by the stream.
    buffer: VecDeque<T>,
    /// `Some` once the queue has been completed or failed.
    terminal: Option<TerminalSignal>,
}
impl<T> BoundedQueueShared<T> {
    /// Builds the shared state for an open, empty queue holding at most
    /// `capacity` elements. The buffer is pre-allocated for that many items.
    fn new(capacity: usize) -> Arc<Self> {
        let initial = BoundedQueueState {
            buffer: VecDeque::with_capacity(capacity),
            terminal: None,
        };
        Arc::new(Self {
            state: Mutex::new(initial),
            available: Condvar::new(),
            capacity,
        })
    }
}
/// Cloneable producer handle for a bounded, non-blocking source queue.
///
/// All clones feed the same buffer.
#[derive(Clone)]
pub struct BoundedSourceQueue<T> {
    /// State shared with the other handles and the consuming stream.
    shared: Arc<BoundedQueueShared<T>>,
}
impl<T> BoundedSourceQueue<T> {
pub fn offer(&self, elem: T) -> QueueOfferResult {
let mut state = self
.shared
.state
.lock()
.unwrap_or_else(|poison| poison.into_inner());
match &state.terminal {
Some(TerminalSignal::Complete) | Some(TerminalSignal::Error(_)) => {
return QueueOfferResult::QueueClosed;
}
None => {}
}
if state.buffer.len() < self.shared.capacity {
state.buffer.push_back(elem);
self.shared.available.notify_all();
QueueOfferResult::Enqueued
} else {
QueueOfferResult::Dropped
}
}
pub fn complete(&self) {
let mut state = self
.shared
.state
.lock()
.unwrap_or_else(|poison| poison.into_inner());
if state.terminal.is_none() {
state.terminal = Some(TerminalSignal::Complete);
}
drop(state);
self.shared.available.notify_all();
}
pub fn fail(&self, error: StreamError) {
let mut state = self
.shared
.state
.lock()
.unwrap_or_else(|poison| poison.into_inner());
if state.terminal.is_none() {
state.terminal = Some(TerminalSignal::Error(error));
}
drop(state);
self.shared.available.notify_all();
}
}
impl<T> Drop for BoundedSourceQueue<T> {
    /// Completes the queue when the handle being dropped appears to be the
    /// last producer.
    ///
    /// A strong count of exactly 2 is taken to mean "this handle plus the
    /// stream". Any other count leaves the state untouched.
    ///
    /// NOTE(review): the count is read without synchronisation against other
    /// handles being dropped at the same moment; two concurrent drops could
    /// both observe a count of 3 and neither would complete the queue —
    /// confirm whether that matters for callers.
    fn drop(&mut self) {
        let shared = &self.shared;
        if Arc::strong_count(shared) == 2 {
            {
                let mut guard = shared
                    .state
                    .lock()
                    .unwrap_or_else(|poisoned| poisoned.into_inner());
                if guard.terminal.is_none() {
                    guard.terminal = Some(TerminalSignal::Complete);
                }
            }
            shared.available.notify_all();
        }
    }
}
/// Consuming side of a bounded queue; yields buffered elements as an iterator.
struct BoundedQueueStream<T> {
    /// State shared with the producer handles.
    shared: Arc<BoundedQueueShared<T>>,
}
impl<T> Iterator for BoundedQueueStream<T> {
type Item = StreamResult<T>;
fn next(&mut self) -> Option<Self::Item> {
let mut state = self
.shared
.state
.lock()
.unwrap_or_else(|poison| poison.into_inner());
loop {
if let Some(item) = state.buffer.pop_front() {
self.shared.available.notify_all();
return Some(Ok(item));
}
if let Some(terminal) = state.terminal.clone() {
return match terminal {
TerminalSignal::Complete => None,
TerminalSignal::Error(error) => Some(Err(error)),
};
}
state = self
.shared
.available
.wait(state)
.unwrap_or_else(|poison| poison.into_inner());
}
}
}
impl<T> Drop for BoundedQueueStream<T> {
    /// Closes the queue when the consumer goes away, so later offers report
    /// `QueueClosed`. An existing terminal state is preserved.
    fn drop(&mut self) {
        let shared = &self.shared;
        let mut guard = shared
            .state
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        if guard.terminal.is_none() {
            guard.terminal = Some(TerminalSignal::Complete);
        }
        shared.available.notify_all();
    }
}
/// State shared between every `SourceQueue` handle and the consuming stream.
struct SourceQueueShared<T> {
    /// Buffer, terminal flags and pending-offer bookkeeping.
    state: Mutex<SourceQueueState<T>>,
    /// Signalled whenever the buffer or the terminal state changes.
    available: Condvar,
    /// Maximum number of buffered elements.
    capacity: usize,
    /// What `offer` does when the buffer is full.
    strategy: OverflowStrategy,
}
/// Mutable part of a source queue; only accessed while holding the mutex.
struct SourceQueueState<T> {
    /// Elements accepted by `offer` and not yet pulled by the stream.
    buffer: VecDeque<T>,
    /// `Some` once the queue has been completed or failed.
    terminal: Option<TerminalSignal>,
    /// Set by `complete()` when work is still outstanding; the stream turns
    /// it into a real completion once the buffer has drained.
    terminating: bool,
    /// Number of backpressured offers currently blocked waiting for space.
    pending_count: usize,
}
impl<T> SourceQueueShared<T> {
    /// Builds the shared state for an open, empty queue with the given
    /// `capacity` and overflow `strategy`.
    fn new(capacity: usize, strategy: OverflowStrategy) -> Arc<Self> {
        let initial = SourceQueueState {
            buffer: VecDeque::with_capacity(capacity),
            terminal: None,
            terminating: false,
            pending_count: 0,
        };
        Arc::new(Self {
            state: Mutex::new(initial),
            available: Condvar::new(),
            capacity,
            strategy,
        })
    }
}
/// Producer handle for a source queue with a configurable overflow strategy.
pub struct SourceQueue<T> {
    /// State shared with the other handles and the consuming stream.
    shared: Arc<SourceQueueShared<T>>,
    /// Completion handle handed out by `watch_completion`.
    completion: Option<StreamCompletion<NotUsed>>,
}
impl<T> Clone for SourceQueue<T> {
    /// Creates another producer handle for the same queue.
    ///
    /// The clone does not share the original completion handle; its own
    /// handle is already resolved with an error directing callers to the
    /// original.
    fn clone(&self) -> Self {
        let unavailable = StreamError::Failed(
            "cannot clone queue completion handle; use watch_completion() on the original handle"
                .into(),
        );
        Self {
            shared: Arc::clone(&self.shared),
            completion: Some(StreamCompletion::ready(Err(unavailable))),
        }
    }
}
impl<T: Send + 'static> SourceQueue<T> {
pub fn watch_completion(mut self) -> StreamCompletion<NotUsed> {
self.completion.take().unwrap_or_else(|| {
StreamCompletion::ready(Err(StreamError::Failed(
"queue completion handle already taken".into(),
)))
})
}
pub fn offer(&self, elem: T) -> StreamResult<QueueOfferResult> {
let strategy = self.shared.strategy;
let capacity = self.shared.capacity;
let mut state = self
.shared
.state
.lock()
.unwrap_or_else(|poison| poison.into_inner());
if state.terminal.is_some() {
return Ok(QueueOfferResult::QueueClosed);
}
if state.terminating {
return Ok(QueueOfferResult::QueueClosed);
}
if state.buffer.len() < capacity {
state.buffer.push_back(elem);
drop(state);
self.shared.available.notify_all();
return Ok(QueueOfferResult::Enqueued);
}
match strategy {
OverflowStrategy::DropHead => {
let _ = state.buffer.pop_front();
state.buffer.push_back(elem);
drop(state);
self.shared.available.notify_all();
Ok(QueueOfferResult::Enqueued)
}
OverflowStrategy::DropTail => {
let _ = state.buffer.pop_back();
state.buffer.push_back(elem);
drop(state);
self.shared.available.notify_all();
Ok(QueueOfferResult::Enqueued)
}
OverflowStrategy::DropBuffer => {
state.buffer.clear();
state.buffer.push_back(elem);
drop(state);
self.shared.available.notify_all();
Ok(QueueOfferResult::Enqueued)
}
OverflowStrategy::DropNew => Ok(QueueOfferResult::Dropped),
OverflowStrategy::Fail => {
state.buffer.clear();
let error =
StreamError::Failed(format!("Buffer overflow (max capacity was: {capacity})!"));
state.terminal = Some(TerminalSignal::Error(error.clone()));
drop(state);
self.shared.available.notify_all();
Ok(QueueOfferResult::Failure(error))
}
OverflowStrategy::Backpressure => {
if state.pending_count >= 1 {
return Err(StreamError::Failed(
"Too many concurrent offers. Specified maximum is 1. \
You have to wait for the previous offer to resolve to send another request"
.into(),
));
}
state.pending_count += 1;
loop {
if state.terminal.is_some() || state.terminating {
state.pending_count -= 1;
return Ok(QueueOfferResult::QueueClosed);
}
if state.buffer.len() < capacity {
state.pending_count -= 1;
state.buffer.push_back(elem);
drop(state);
self.shared.available.notify_all();
return Ok(QueueOfferResult::Enqueued);
}
state = self
.shared
.available
.wait(state)
.unwrap_or_else(|poison| poison.into_inner());
}
}
}
}
pub fn complete(&self) {
let mut state = self
.shared
.state
.lock()
.unwrap_or_else(|poison| poison.into_inner());
if state.buffer.is_empty() && state.pending_count == 0 {
if state.terminal.is_none() {
state.terminal = Some(TerminalSignal::Complete);
}
} else {
state.terminating = true;
}
drop(state);
self.shared.available.notify_all();
}
pub fn fail(&self, error: StreamError) {
let mut state = self
.shared
.state
.lock()
.unwrap_or_else(|poison| poison.into_inner());
if state.terminal.is_none() {
state.terminal = Some(TerminalSignal::Error(error));
}
drop(state);
self.shared.available.notify_all();
}
}
impl<T> Drop for SourceQueue<T> {
    /// Completes the queue when the handle being dropped appears to be the
    /// last producer (strong count of exactly 2: this handle plus the stream).
    ///
    /// A queue that is already terminal or draining is left as it is, but
    /// waiters are still woken.
    ///
    /// NOTE(review): the count is read without synchronisation against other
    /// handles being dropped concurrently — confirm that two simultaneous
    /// drops cannot both skip the completion.
    fn drop(&mut self) {
        let shared = &self.shared;
        if Arc::strong_count(shared) == 2 {
            {
                let mut guard = shared
                    .state
                    .lock()
                    .unwrap_or_else(|poisoned| poisoned.into_inner());
                let already_closing = guard.terminating || guard.terminal.is_some();
                if !already_closing {
                    guard.terminal = Some(TerminalSignal::Complete);
                }
            }
            shared.available.notify_all();
        }
    }
}
/// Consuming side of a `SourceQueue`; yields buffered elements as an iterator.
struct SourceQueueStream<T: Send + 'static> {
    /// State shared with the producer handles.
    shared: Arc<SourceQueueShared<T>>,
    /// One-shot sender for the final result; `None` once it has been sent.
    completion_sender: Option<oneshot::Sender<StreamResult<NotUsed>>>,
}
impl<T: Send + 'static> Iterator for SourceQueueStream<T> {
type Item = StreamResult<T>;
fn next(&mut self) -> Option<Self::Item> {
let mut state = self
.shared
.state
.lock()
.unwrap_or_else(|poison| poison.into_inner());
loop {
if let Some(TerminalSignal::Error(error)) = &state.terminal {
let error = error.clone();
drop(state);
self.signal_completion(Err(error.clone()));
self.shared.available.notify_all();
return Some(Err(error));
}
if let Some(item) = state.buffer.pop_front() {
drop(state);
self.shared.available.notify_all();
return Some(Ok(item));
}
if let Some(terminal) = state.terminal.clone() {
if state.terminating {
state.terminating = false;
}
drop(state);
self.signal_completion(match &terminal {
TerminalSignal::Complete => Ok(NotUsed),
TerminalSignal::Error(error) => Err(error.clone()),
});
self.shared.available.notify_all();
return match terminal {
TerminalSignal::Complete => None,
TerminalSignal::Error(error) => Some(Err(error)),
};
}
if state.terminating && state.buffer.is_empty() && state.pending_count == 0 {
state.terminal = Some(TerminalSignal::Complete);
state.terminating = false;
drop(state);
self.signal_completion(Ok(NotUsed));
self.shared.available.notify_all();
return None;
}
state = self
.shared
.available
.wait(state)
.unwrap_or_else(|poison| poison.into_inner());
}
}
}
impl<T: Send + 'static> SourceQueueStream<T> {
    /// Resolves the completion handle with `result`.
    ///
    /// Only the first call has any effect, because the sender is taken out
    /// of `self`. A failed send is ignored.
    fn signal_completion(&mut self, result: StreamResult<NotUsed>) {
        match self.completion_sender.take() {
            Some(sender) => {
                let _ = sender.send(result);
            }
            None => {}
        }
    }
}
impl<T: Send + 'static> Drop for SourceQueueStream<T> {
fn drop(&mut self) {
let mut state = self
.shared
.state
.lock()
.unwrap_or_else(|poison| poison.into_inner());
if state.terminal.is_none() {
state.terminal = Some(TerminalSignal::Complete);
}
state.terminating = false;
drop(state);
self.signal_completion(Ok(NotUsed));
self.shared.available.notify_all();
}
}
/// State shared between a `SinkQueue` handle and the worker draining upstream.
struct SinkQueueShared<T> {
    /// Buffer plus terminal flags.
    state: Mutex<SinkQueueState<T>>,
    /// Signalled whenever the worker changes the state.
    available: Condvar,
}
/// Mutable part of a sink queue; only accessed while holding the mutex.
struct SinkQueueState<T> {
    /// Elements received from upstream and not yet pulled.
    buffer: VecDeque<T>,
    /// First upstream error, if any.
    error: Option<StreamError>,
    /// `true` once upstream has finished normally.
    completed: bool,
}
impl<T> SinkQueueShared<T> {
    /// Builds the shared state for an empty sink queue with no terminal
    /// signal recorded.
    fn new() -> Arc<Self> {
        let initial = SinkQueueState {
            buffer: VecDeque::new(),
            error: None,
            completed: false,
        };
        Arc::new(Self {
            state: Mutex::new(initial),
            available: Condvar::new(),
        })
    }
}
/// Pull-based handle materialized by `Sink::queue`.
pub struct SinkQueue<T> {
    /// State shared with the worker that drains upstream.
    shared: Arc<SinkQueueShared<T>>,
    /// Completion handle of the spawned worker; stored but never read here.
    _completion: StreamCompletion<NotUsed>,
}
impl<T> SinkQueue<T> {
    /// Blocks until the next upstream element or terminal signal is available.
    ///
    /// Buffered elements are delivered first. After that an upstream failure
    /// is returned as `Err` and normal completion as `Ok(None)`; both are
    /// repeated on every subsequent call.
    pub fn pull(&self) -> StreamResult<Option<T>> {
        let shared = &self.shared;
        let mut guard = shared
            .state
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        loop {
            let front = guard.buffer.pop_front();
            if front.is_some() {
                return Ok(front);
            }
            if let Some(error) = &guard.error {
                return Err(error.clone());
            }
            if guard.completed {
                return Ok(None);
            }
            guard = shared
                .available
                .wait(guard)
                .unwrap_or_else(|poisoned| poisoned.into_inner());
        }
    }
}
impl<T: Send + 'static> Source<T, NotUsed> {
    /// Creates a source fed through a cloneable, non-blocking
    /// `BoundedSourceQueue` that drops offers once `capacity` elements are
    /// buffered.
    ///
    /// # Panics
    ///
    /// Panics if `capacity` is zero.
    #[must_use]
    pub fn queue_bounded(capacity: usize) -> Source<T, BoundedSourceQueue<T>> {
        assert!(capacity > 0, "queue capacity must be greater than zero");
        Source::from_materialized_factory(move |_materializer| {
            // Every materialization gets its own independent queue.
            let handle = BoundedSourceQueue {
                shared: BoundedQueueShared::new(capacity),
            };
            let stream: BoxStream<T> = Box::new(BoundedQueueStream {
                shared: Arc::clone(&handle.shared),
            });
            Ok((stream, handle))
        })
    }

    /// Creates a source fed through a `SourceQueue` that applies `strategy`
    /// whenever `capacity` elements are already buffered.
    ///
    /// # Panics
    ///
    /// Panics if `capacity` is zero.
    #[must_use]
    pub fn queue(capacity: usize, strategy: OverflowStrategy) -> Source<T, SourceQueue<T>> {
        assert!(capacity > 0, "queue capacity must be greater than zero");
        Source::from_materialized_factory(move |_materializer| {
            let (completion_sender, completion_receiver) = oneshot::channel();
            let shared = SourceQueueShared::new(capacity, strategy);
            let handle = SourceQueue {
                shared: Arc::clone(&shared),
                completion: Some(StreamCompletion::from_receiver(completion_receiver, None)),
            };
            let stream: BoxStream<T> = Box::new(SourceQueueStream {
                shared,
                completion_sender: Some(completion_sender),
            });
            Ok((stream, handle))
        })
    }
}
impl<T: Send + 'static> Sink<T, SinkQueue<T>> {
    /// Creates a sink that materializes a `SinkQueue` for pulling elements.
    ///
    /// A worker spawned through the materializer moves upstream elements
    /// into a buffer with no size limit, and records the first upstream
    /// error or normal completion. It re-checks the cancellation flag before
    /// each upstream pull.
    #[must_use]
    pub fn queue() -> Self {
        Sink::from_runner(move |mut input, materializer| {
            let shared = SinkQueueShared::new();
            let worker_shared = Arc::clone(&shared);
            let completion = materializer.spawn_stream(move |stream_cancelled| {
                while !stream_cancelled.load(Ordering::SeqCst) {
                    let event = input.next();
                    let mut guard = worker_shared
                        .state
                        .lock()
                        .unwrap_or_else(|poisoned| poisoned.into_inner());
                    // `finished` is true once upstream delivered a terminal signal.
                    let finished = match event {
                        Some(Ok(item)) => {
                            guard.buffer.push_back(item);
                            false
                        }
                        Some(Err(error)) => {
                            if guard.error.is_none() {
                                guard.error = Some(error);
                            }
                            true
                        }
                        None => {
                            guard.completed = true;
                            true
                        }
                    };
                    drop(guard);
                    worker_shared.available.notify_all();
                    if finished {
                        return Ok(NotUsed);
                    }
                }
                Ok(NotUsed)
            });
            Ok(SinkQueue {
                shared,
                _completion: completion,
            })
        })
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::stream::Materializer;
    use std::sync::atomic::AtomicBool;
    use std::sync::atomic::Ordering;
    use std::sync::mpsc;
    use std::thread;
    use std::time::{Duration, Instant};

    // ---- BoundedSourceQueue -------------------------------------------------

    /// Offers are accepted as soon as the consumer frees a slot.
    #[test]
    fn bounded_offer_accepted_vs_processed_distinct() {
        let (queue, mut stream) = materialize_bounded_queue(2);
        assert_eq!(queue.offer(1), QueueOfferResult::Enqueued);
        assert_eq!(queue.offer(2), QueueOfferResult::Enqueued);
        assert_eq!(stream.next(), Some(Ok(1)));
        assert_eq!(queue.offer(3), QueueOfferResult::Enqueued);
        assert_eq!(stream.next(), Some(Ok(2)));
        assert_eq!(stream.next(), Some(Ok(3)));
    }

    /// A full buffer rejects further offers with `Dropped`.
    #[test]
    fn bounded_offer_dropped_when_full() {
        let (queue, mut stream) = materialize_bounded_queue(1);
        assert_eq!(queue.offer(1), QueueOfferResult::Enqueued);
        assert_eq!(queue.offer(2), QueueOfferResult::Dropped);
        assert_eq!(queue.offer(3), QueueOfferResult::Dropped);
        queue.complete();
        assert_eq!(stream.next(), Some(Ok(1)));
        assert_eq!(stream.next(), None);
    }

    #[test]
    fn bounded_queue_closed_after_complete() {
        let (queue, mut stream) = materialize_bounded_queue(2);
        assert_eq!(queue.offer(1), QueueOfferResult::Enqueued);
        queue.complete();
        assert_eq!(stream.next(), Some(Ok(1)));
        assert_eq!(stream.next(), None);
        assert_eq!(queue.offer(2), QueueOfferResult::QueueClosed);
    }

    /// Buffered elements are still delivered before the failure.
    #[test]
    fn bounded_queue_closed_after_fail() {
        let (queue, mut stream) = materialize_bounded_queue(2);
        assert_eq!(queue.offer(1), QueueOfferResult::Enqueued);
        queue.fail(StreamError::Failed("boom".into()));
        assert_eq!(queue.offer(2), QueueOfferResult::QueueClosed);
        assert_eq!(stream.next(), Some(Ok(1)));
        assert_eq!(stream.next(), Some(Err(StreamError::Failed("boom".into()))));
    }

    /// The stream is dropped at the end of the inner scope, which closes the
    /// queue for the surviving handle.
    #[test]
    fn bounded_drop_handle_completes_stream() {
        let queue = {
            let (queue, mut stream) = materialize_bounded_queue::<i32>(2);
            queue.offer(1);
            queue.offer(2);
            assert_eq!(stream.next(), Some(Ok(1)));
            queue
        };
        assert_eq!(queue.offer(3), QueueOfferResult::QueueClosed);
    }

    #[test]
    fn bounded_drop_last_producer_completes_stream() {
        let (queue, mut stream) = materialize_bounded_queue::<i32>(1);
        let producer = queue.clone();
        let consumer = thread::spawn(move || {
            assert_eq!(stream.next(), None);
        });
        drop(producer);
        drop(queue);
        consumer.join().unwrap();
    }

    #[test]
    fn bounded_drain_before_complete() {
        let (queue, mut stream) = materialize_bounded_queue(3);
        assert_eq!(queue.offer(1), QueueOfferResult::Enqueued);
        assert_eq!(queue.offer(2), QueueOfferResult::Enqueued);
        assert_eq!(queue.offer(3), QueueOfferResult::Enqueued);
        assert_eq!(queue.offer(4), QueueOfferResult::Dropped);
        assert_eq!(queue.offer(5), QueueOfferResult::Dropped);
        queue.complete();
        assert_eq!(stream.next(), Some(Ok(1)));
        assert_eq!(stream.next(), Some(Ok(2)));
        assert_eq!(stream.next(), Some(Ok(3)));
        assert_eq!(stream.next(), None);
    }

    #[test]
    fn bounded_terminal_completion_is_sticky() {
        let (queue, mut stream) = materialize_bounded_queue(2);
        queue.offer(1);
        queue.complete();
        assert_eq!(stream.next(), Some(Ok(1)));
        assert_eq!(stream.next(), None);
        assert_eq!(stream.next(), None);
        assert_eq!(stream.next(), None);
    }

    #[test]
    fn bounded_terminal_failure_is_sticky() {
        let (queue, mut stream) = materialize_bounded_queue::<i32>(2);
        queue.fail(StreamError::Failed("boom".into()));
        assert_eq!(stream.next(), Some(Err(StreamError::Failed("boom".into()))));
        assert_eq!(stream.next(), Some(Err(StreamError::Failed("boom".into()))));
    }

    #[test]
    fn bounded_producer_consumer_across_threads() {
        let (queue, mut stream) = materialize_bounded_queue::<i32>(16);
        let consumer = thread::spawn(move || {
            let mut collected = Vec::new();
            while let Some(Ok(item)) = stream.next() {
                collected.push(item);
            }
            collected
        });
        let producer = thread::spawn({
            let queue = queue.clone();
            move || {
                for i in 0..10 {
                    assert_eq!(queue.offer(i), QueueOfferResult::Enqueued);
                }
                queue.complete();
            }
        });
        producer.join().unwrap();
        let collected = consumer.join().unwrap();
        assert_eq!(collected, (0..10).collect::<Vec<_>>());
    }

    /// Every element from every producer must arrive exactly once.
    #[test]
    fn bounded_multi_producer_single_consumer() {
        let n = 50_i32;
        let producer_count = 4;
        let total = (producer_count * n) as usize;
        let (queue, mut stream) = materialize_bounded_queue::<i32>(total);
        let consumer = thread::spawn(move || {
            let mut collected = Vec::new();
            while let Some(Ok(item)) = stream.next() {
                collected.push(item);
            }
            collected
        });
        let mut handles = Vec::new();
        for p in 0..producer_count {
            let q = queue.clone();
            handles.push(thread::spawn(move || {
                for i in 0..n {
                    assert_eq!(q.offer(p * n + i), QueueOfferResult::Enqueued);
                }
            }));
        }
        for h in handles {
            h.join().unwrap();
        }
        queue.complete();
        let mut collected = consumer.join().unwrap();
        // Interleaving across producers is arbitrary, so compare the sorted
        // contents: this checks the values, not just how many arrived.
        collected.sort_unstable();
        let expected: Vec<i32> = (0..producer_count * n).collect();
        assert_eq!(collected.len(), total);
        assert_eq!(collected, expected);
    }

    // ---- SourceQueue --------------------------------------------------------

    /// Materializes `Source::queue` directly and returns `(handle, stream)`.
    fn materialize_source_queue<T: Send + 'static>(
        capacity: usize,
        strategy: OverflowStrategy,
    ) -> (SourceQueue<T>, BoxStream<T>) {
        let materializer = Materializer::new();
        let (stream, queue) = Source::<T>::queue(capacity, strategy)
            .factory
            .create(&materializer)
            .unwrap();
        (queue, stream)
    }

    #[test]
    fn source_queue_offer_enqueued() {
        let (queue, mut stream) =
            materialize_source_queue::<i32>(2, OverflowStrategy::Backpressure);
        assert_eq!(queue.offer(1).unwrap(), QueueOfferResult::Enqueued);
        assert_eq!(queue.offer(2).unwrap(), QueueOfferResult::Enqueued);
        assert_eq!(stream.next(), Some(Ok(1)));
        assert_eq!(stream.next(), Some(Ok(2)));
    }

    #[test]
    fn source_queue_drop_head() {
        let (queue, mut stream) = materialize_source_queue::<i32>(2, OverflowStrategy::DropHead);
        assert_eq!(queue.offer(1).unwrap(), QueueOfferResult::Enqueued);
        assert_eq!(queue.offer(2).unwrap(), QueueOfferResult::Enqueued);
        assert_eq!(queue.offer(3).unwrap(), QueueOfferResult::Enqueued);
        queue.complete();
        assert_eq!(stream.next(), Some(Ok(2)));
        assert_eq!(stream.next(), Some(Ok(3)));
        assert_eq!(stream.next(), None);
    }

    #[test]
    fn source_queue_drop_tail() {
        let (queue, mut stream) = materialize_source_queue::<i32>(2, OverflowStrategy::DropTail);
        assert_eq!(queue.offer(1).unwrap(), QueueOfferResult::Enqueued);
        assert_eq!(queue.offer(2).unwrap(), QueueOfferResult::Enqueued);
        assert_eq!(queue.offer(3).unwrap(), QueueOfferResult::Enqueued);
        queue.complete();
        assert_eq!(stream.next(), Some(Ok(1)));
        assert_eq!(stream.next(), Some(Ok(3)));
        assert_eq!(stream.next(), None);
    }

    #[test]
    fn source_queue_drop_buffer() {
        let (queue, mut stream) = materialize_source_queue::<i32>(2, OverflowStrategy::DropBuffer);
        assert_eq!(queue.offer(1).unwrap(), QueueOfferResult::Enqueued);
        assert_eq!(queue.offer(2).unwrap(), QueueOfferResult::Enqueued);
        assert_eq!(queue.offer(3).unwrap(), QueueOfferResult::Enqueued);
        queue.complete();
        assert_eq!(stream.next(), Some(Ok(3)));
        assert_eq!(stream.next(), None);
    }

    #[test]
    fn source_queue_drop_new() {
        let (queue, mut stream) = materialize_source_queue::<i32>(2, OverflowStrategy::DropNew);
        assert_eq!(queue.offer(1).unwrap(), QueueOfferResult::Enqueued);
        assert_eq!(queue.offer(2).unwrap(), QueueOfferResult::Enqueued);
        assert_eq!(queue.offer(3).unwrap(), QueueOfferResult::Dropped);
        queue.complete();
        assert_eq!(stream.next(), Some(Ok(1)));
        assert_eq!(stream.next(), Some(Ok(2)));
        assert_eq!(stream.next(), None);
    }

    #[test]
    fn source_queue_fail_strategy() {
        let (queue, _stream) = materialize_source_queue::<i32>(2, OverflowStrategy::Fail);
        assert_eq!(queue.offer(1).unwrap(), QueueOfferResult::Enqueued);
        assert_eq!(queue.offer(2).unwrap(), QueueOfferResult::Enqueued);
        match queue.offer(3).unwrap() {
            QueueOfferResult::Failure(e) => {
                assert!(format!("{e:?}").contains("Buffer overflow"));
            }
            other => panic!("expected Failure, got {other:?}"),
        }
    }

    /// Once the consumer has freed the single slot, a new offer is accepted.
    #[test]
    fn source_queue_backpressure_blocks() {
        let (queue, mut stream) =
            materialize_source_queue::<i32>(1, OverflowStrategy::Backpressure);
        assert_eq!(queue.offer(1).unwrap(), QueueOfferResult::Enqueued);
        let consumed = Arc::new(AtomicBool::new(false));
        let c = Arc::clone(&consumed);
        let (release_tx, release_rx) = mpsc::channel();
        let consumer = thread::spawn(move || {
            assert_eq!(stream.next(), Some(Ok(1)));
            c.store(true, Ordering::SeqCst);
            // Keep the stream alive until the main thread has offered.
            release_rx.recv().unwrap();
        });
        wait_until(Duration::from_secs(1), || consumed.load(Ordering::SeqCst));
        assert_eq!(queue.offer(2).unwrap(), QueueOfferResult::Enqueued);
        release_tx.send(()).unwrap();
        consumer.join().unwrap();
        assert!(consumed.load(Ordering::SeqCst));
    }

    /// A second offer while one is already blocked is rejected with an error.
    #[test]
    fn source_queue_concurrent_offer_violation() {
        let (queue, mut stream) =
            materialize_source_queue::<i32>(1, OverflowStrategy::Backpressure);
        assert_eq!(queue.offer(1).unwrap(), QueueOfferResult::Enqueued);
        let q = queue.clone();
        let started = Arc::new(AtomicBool::new(false));
        let s = Arc::clone(&started);
        let blocker = thread::spawn(move || {
            s.store(true, Ordering::SeqCst);
            let _ = q.offer(2);
        });
        while !started.load(Ordering::SeqCst) {
            thread::yield_now();
        }
        // Wait until the blocker is actually parked inside `offer`.
        wait_until(Duration::from_secs(1), || {
            queue
                .shared
                .state
                .lock()
                .unwrap_or_else(|poison| poison.into_inner())
                .pending_count
                == 1
        });
        let result = queue.offer(3);
        assert!(result.is_err());
        assert!(format!("{result:?}").contains("Too many concurrent offers"));
        // Freeing the slot lets the blocked offer finish so the thread can join.
        assert_eq!(stream.next(), Some(Ok(1)));
        blocker.join().unwrap();
    }

    #[test]
    fn source_queue_watch_completion() {
        let (queue, mut stream) =
            materialize_source_queue::<i32>(1, OverflowStrategy::Backpressure);
        queue.offer(1).unwrap();
        queue.complete();
        assert_eq!(stream.next(), Some(Ok(1)));
        assert_eq!(stream.next(), None);
        assert_eq!(queue.watch_completion().wait(), Ok(NotUsed));
    }

    #[test]
    fn source_queue_watch_completion_on_failure() {
        let (queue, mut stream) =
            materialize_source_queue::<i32>(1, OverflowStrategy::Backpressure);
        queue.fail(StreamError::Failed("boom".into()));
        assert_eq!(stream.next(), Some(Err(StreamError::Failed("boom".into()))));
        assert_eq!(
            queue.watch_completion().wait(),
            Err(StreamError::Failed("boom".into()))
        );
    }

    #[test]
    fn source_queue_terminal_completion_is_sticky() {
        let (queue, mut stream) =
            materialize_source_queue::<i32>(1, OverflowStrategy::Backpressure);
        queue.complete();
        assert_eq!(stream.next(), None);
        assert_eq!(stream.next(), None);
        assert_eq!(stream.next(), None);
    }

    #[test]
    fn source_queue_offer_after_complete_returns_queue_closed() {
        let (queue, _stream) = materialize_source_queue::<i32>(1, OverflowStrategy::Backpressure);
        queue.complete();
        assert_eq!(queue.offer(1).unwrap(), QueueOfferResult::QueueClosed);
    }

    #[test]
    fn source_queue_drop_stream_closes_queue() {
        let (queue, stream) = materialize_source_queue::<i32>(1, OverflowStrategy::Backpressure);
        queue.offer(1).unwrap();
        drop(stream);
        assert_eq!(queue.offer(2).unwrap(), QueueOfferResult::QueueClosed);
    }

    #[test]
    fn source_queue_drop_last_producer_completes_stream() {
        let (queue, mut stream) =
            materialize_source_queue::<i32>(1, OverflowStrategy::Backpressure);
        let producer = queue.clone();
        let consumer = thread::spawn(move || {
            assert_eq!(stream.next(), None);
        });
        drop(producer);
        drop(queue);
        consumer.join().unwrap();
    }

    /// `complete()` releases a blocked offer with `QueueClosed` and the
    /// already-buffered element is still delivered.
    #[test]
    fn source_queue_backpressure_unblocks_on_complete() {
        let (queue, mut stream) =
            materialize_source_queue::<i32>(1, OverflowStrategy::Backpressure);
        assert_eq!(queue.offer(1).unwrap(), QueueOfferResult::Enqueued);
        let q = queue.clone();
        let blocker = thread::spawn(move || q.offer(2));
        wait_until(Duration::from_secs(1), || {
            queue
                .shared
                .state
                .lock()
                .unwrap_or_else(|poison| poison.into_inner())
                .pending_count
                == 1
        });
        queue.complete();
        let result = blocker.join().unwrap();
        assert_eq!(result.unwrap(), QueueOfferResult::QueueClosed);
        assert_eq!(stream.next(), Some(Ok(1)));
        assert_eq!(stream.next(), None);
    }

    // ---- SinkQueue ----------------------------------------------------------

    /// Runs `source` into `Sink::queue()` and returns the materialized handle.
    fn materialize_sink_queue<T: Send + 'static>(source: Source<T>) -> SinkQueue<T> {
        source.run_with(Sink::queue()).unwrap()
    }

    #[test]
    fn sink_queue_pull_elements() {
        let queue = materialize_sink_queue(Source::from_iter([1, 2, 3]));
        assert_eq!(queue.pull().unwrap(), Some(1));
        assert_eq!(queue.pull().unwrap(), Some(2));
        assert_eq!(queue.pull().unwrap(), Some(3));
        assert_eq!(queue.pull().unwrap(), None);
    }

    #[test]
    fn sink_queue_pull_none_after_upstream_completion() {
        let queue = materialize_sink_queue(Source::from_iter([42]));
        assert_eq!(queue.pull().unwrap(), Some(42));
        assert_eq!(queue.pull().unwrap(), None);
        assert_eq!(queue.pull().unwrap(), None);
    }

    #[test]
    fn sink_queue_pull_error_from_upstream_failure() {
        let queue =
            materialize_sink_queue(Source::<i32>::failed(StreamError::Failed("boom".into())));
        assert_eq!(
            queue.pull().unwrap_err(),
            StreamError::Failed("boom".into())
        );
        assert_eq!(
            queue.pull().unwrap_err(),
            StreamError::Failed("boom".into())
        );
    }

    #[test]
    fn sink_queue_terminal_stickiness() {
        let queue = materialize_sink_queue(Source::<i32>::empty());
        assert_eq!(queue.pull().unwrap(), None);
        assert_eq!(queue.pull().unwrap(), None);
        assert_eq!(queue.pull().unwrap(), None);
    }

    #[test]
    fn sink_queue_terminal_failure_stickiness() {
        let queue =
            materialize_sink_queue(Source::<i32>::failed(StreamError::Failed("boom".into())));
        assert_eq!(
            queue.pull().unwrap_err(),
            StreamError::Failed("boom".into())
        );
        assert_eq!(
            queue.pull().unwrap_err(),
            StreamError::Failed("boom".into())
        );
    }

    /// Dropping the handle of an endless upstream must not hang the test.
    #[test]
    fn sink_queue_drop_cancels_upstream() {
        let queue = materialize_sink_queue(Source::repeat(42));
        drop(queue);
    }

    #[test]
    fn sink_queue_drain_multiple_items() {
        let queue = materialize_sink_queue(Source::from_iter(0..100));
        for i in 0..100 {
            assert_eq!(queue.pull().unwrap(), Some(i));
        }
        assert_eq!(queue.pull().unwrap(), None);
    }

    // ---- helpers ------------------------------------------------------------

    /// Materializes `Source::queue_bounded` directly and returns
    /// `(handle, stream)`.
    fn materialize_bounded_queue<T: Send + 'static>(
        capacity: usize,
    ) -> (BoundedSourceQueue<T>, BoxStream<T>) {
        let materializer = Materializer::new();
        let (stream, queue) = Source::<T>::queue_bounded(capacity)
            .factory
            .create(&materializer)
            .unwrap();
        (queue, stream)
    }

    /// Polls `condition` roughly every millisecond until it holds, and fails
    /// the test if it still does not hold once `timeout` has elapsed.
    fn wait_until(timeout: Duration, condition: impl Fn() -> bool) {
        let deadline = Instant::now() + timeout;
        while Instant::now() < deadline {
            if condition() {
                return;
            }
            thread::yield_now();
            thread::sleep(Duration::from_millis(1));
        }
        assert!(condition(), "condition was not met within {timeout:?}");
    }
}