use std::{
collections::{BTreeMap, HashMap, VecDeque},
fmt,
future::Future,
hash::Hash,
marker::PhantomData,
panic::{AssertUnwindSafe, catch_unwind},
pin::Pin,
sync::{
Arc, Condvar, Mutex, OnceLock,
atomic::{AtomicBool, AtomicUsize, Ordering},
},
task::{Context, Poll},
thread,
time::Duration,
};
use futures::{channel::oneshot, executor::block_on};
use thiserror::Error;
use tokio::{
runtime::{Builder as TokioRuntimeBuilder, Runtime as TokioRuntime},
task::{JoinError, JoinHandle},
};
/// Boxed, sendable, pull-based stream: an iterator whose items are `StreamResult<T>`.
pub(crate) type BoxStream<T> = Box<dyn Iterator<Item = StreamResult<T>> + Send>;
/// Shared, thread-safe closure that turns one boxed stream into another without needing a
/// `Materializer` and without being able to fail at construction time.
pub(crate) type PureTransform<In, Out> = Arc<dyn Fn(BoxStream<In>) -> BoxStream<Out> + Send + Sync>;
/// Shared, thread-safe closure that turns one boxed stream into another, additionally receiving
/// the `Materializer`; building the output stream may fail with a stream error.
pub(crate) type RuntimeTransform<In, Out> =
Arc<dyn Fn(BoxStream<In>, &Materializer) -> StreamResult<BoxStream<Out>> + Send + Sync>;
/// Unsized closure type that consumes a stream with a `Materializer` and produces the
/// materialized value `Mat` (or a stream error).
type SinkRunner<In, Mat> = dyn Fn(BoxStream<In>, &Materializer) -> StreamResult<Mat> + Send + Sync;
/// Same shape as `SinkRunner`, with an extra `SourceRuntimeHints` argument.
type HintedSinkRunner<In, Mat> =
dyn Fn(BoxStream<In>, &Materializer, SourceRuntimeHints) -> StreamResult<Mat> + Send + Sync;
/// Unsized closure type that, given only a `Materializer`, produces the materialized value
/// `Mat` (or a stream error).
type RunnableGraphRunner<Mat> = dyn Fn(&Materializer) -> StreamResult<Mat> + Send + Sync;
// NOTE(review): none of these constants is used in the part of the file visible here; the
// descriptions below are inferred from the names and values only — confirm against the users.
/// Presumably the number of spin iterations spent checking for readiness before backing off —
/// TODO confirm.
const STREAM_READY_SPINS: usize = 256;
/// Presumably a spin back-off step or factor — TODO confirm.
const STREAM_SPIN_BACKOFF: usize = 8;
/// One millisecond; by its name an upper bound on a single park — TODO confirm.
const STREAM_MAX_PARK: Duration = Duration::from_millis(1);
/// Payload of the "inline micro" hint: the `max_success_items` bound supplied by the source.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct InlineMicroSourceHint {
    max_success_items: usize,
}

/// Hints tracked for a source while a pipeline is being assembled.
///
/// All hints default to "off" (`false` / `None`).
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
struct SourceHints {
    inline_head_terminal: bool,
    inline_micro: Option<InlineMicroSourceHint>,
    terminal_consumer_batch: bool,
}

/// Subset of [`SourceHints`] that is handed on at run time (see [`SourceHints::runtime`]).
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub(crate) struct SourceRuntimeHints {
    pub(crate) inline_micro_max_success_items: Option<usize>,
    pub(crate) terminal_consumer_batch: bool,
}

impl SourceHints {
    /// Hints with every flag enabled and the inline-micro bound set to `max_success_items`.
    const fn with_inline_micro(max_success_items: usize) -> Self {
        let inline_micro = Some(InlineMicroSourceHint { max_success_items });
        Self {
            inline_head_terminal: true,
            inline_micro,
            terminal_consumer_batch: true,
        }
    }

    /// Hints with only `terminal_consumer_batch` enabled.
    const fn with_terminal_consumer_batch() -> Self {
        Self {
            terminal_consumer_batch: true,
            inline_head_terminal: false,
            inline_micro: None,
        }
    }

    /// Hints that remain after the source has been piped through a flow with `flow` hints.
    ///
    /// * `inline_micro` is always cleared.
    /// * `terminal_consumer_batch` survives only if it was set and the flow preserves it.
    /// * `inline_head_terminal` is taken from the flow alone.
    fn after_flow(self, flow: FlowHints) -> Self {
        // NOTE(review): unlike `terminal_consumer_batch`, this flag ignores the receiver's
        // previous value, so a flow that "preserves" it also switches it on — confirm intended.
        let inline_head_terminal = flow.preserves_inline_head_terminal;
        let terminal_consumer_batch =
            flow.preserves_terminal_consumer_batch && self.terminal_consumer_batch;
        Self {
            inline_head_terminal,
            inline_micro: None,
            terminal_consumer_batch,
        }
    }

    /// Same hints with the inline-micro hint removed.
    fn without_inline_micro(self) -> Self {
        Self {
            inline_micro: None,
            ..self
        }
    }

    /// Projects the hints onto their run-time form.
    fn runtime(self) -> SourceRuntimeHints {
        let inline_micro_max_success_items = match self.inline_micro {
            Some(InlineMicroSourceHint { max_success_items }) => Some(max_success_items),
            None => None,
        };
        SourceRuntimeHints {
            inline_micro_max_success_items,
            terminal_consumer_batch: self.terminal_consumer_batch,
        }
    }
}

/// Which source hints a flow leaves intact. Both flags default to `false`.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
struct FlowHints {
    preserves_inline_head_terminal: bool,
    preserves_terminal_consumer_batch: bool,
}

impl FlowHints {
    /// Both flags set.
    const PRESERVES_INLINE_HEAD_TERMINAL: Self = Self {
        preserves_inline_head_terminal: true,
        preserves_terminal_consumer_batch: true,
    };
    /// Only `preserves_terminal_consumer_batch` set.
    const PRESERVES_TERMINAL_CONSUMER_BATCH: Self = Self {
        preserves_inline_head_terminal: false,
        preserves_terminal_consumer_batch: true,
    };

    /// Hints of `self` followed by `next`: a flag stays set only if both stages set it.
    fn then(self, next: Self) -> Self {
        let Self {
            preserves_inline_head_terminal: head_so_far,
            preserves_terminal_consumer_batch: batch_so_far,
        } = self;
        Self {
            preserves_inline_head_terminal: head_so_far && next.preserves_inline_head_terminal,
            preserves_terminal_consumer_batch: batch_so_far
                && next.preserves_terminal_consumer_batch,
        }
    }
}
/// Bookkeeping for one partition key, stored in a reusable slot.
struct PartitionSlot<Key, Out> {
// Key currently owning the slot; `None` once the slot has been retired.
key: Option<Key>,
// Number of items handed out from this slot; compared against the per-partition limit.
active: usize,
// Items waiting to be handed out, each paired with a `usize` index.
queued: VecDeque<(usize, Out)>,
// True while the slot's index sits in the ready queue, so it is not enqueued twice.
in_ready_queue: bool,
}
/// Wrapper around a tokio [`JoinHandle`] that aborts the task when the wrapper is dropped.
///
/// Awaiting the wrapper yields the same `Result<T, JoinError>` as awaiting the inner handle.
struct AbortOnDropHandle<T> {
    handle: JoinHandle<T>,
}

impl<T> AbortOnDropHandle<T> {
    /// Takes ownership of `handle`; the task is aborted when the returned value is dropped.
    fn new(handle: JoinHandle<T>) -> Self {
        AbortOnDropHandle { handle }
    }
}

impl<T> Drop for AbortOnDropHandle<T> {
    fn drop(&mut self) {
        // Runs on every drop, including after the task has already finished.
        self.handle.abort();
    }
}

impl<T> Future for AbortOnDropHandle<T> {
    type Output = Result<T, JoinError>;

    /// Forwards the poll to the wrapped join handle.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // `Self: Unpin` (declared below), so the pin can be unwrapped safely.
        let inner = &mut self.get_mut().handle;
        Pin::new(inner).poll(cx)
    }
}

impl<T> Unpin for AbortOnDropHandle<T> {}
/// Returns the process-wide tokio runtime used by streams, building it on first use.
///
/// The runtime is multi-threaded, has all tokio drivers enabled and names its worker threads
/// `datum-stream-tokio`.
///
/// # Panics
///
/// Panics with `stream tokio runtime` if the runtime cannot be built.
pub(crate) fn stream_tokio_runtime() -> &'static TokioRuntime {
    static RUNTIME: OnceLock<TokioRuntime> = OnceLock::new();

    fn build_runtime() -> TokioRuntime {
        let mut builder = TokioRuntimeBuilder::new_multi_thread();
        builder.enable_all();
        builder.thread_name("datum-stream-tokio");
        builder.build().expect("stream tokio runtime")
    }

    RUNTIME.get_or_init(build_runtime)
}
/// Spawns `future` on the shared stream runtime and returns a handle that aborts the task
/// when dropped.
fn spawn_tokio_task<Fut, T>(future: Fut) -> AbortOnDropHandle<T>
where
    Fut: Future<Output = T> + Send + 'static,
    T: Send + 'static,
{
    let join_handle = stream_tokio_runtime().spawn(future);
    AbortOnDropHandle::new(join_handle)
}
/// Crate-visible forwarder to `runtime::current_stream_cancelled()`.
///
/// Returns whatever the runtime module reports: an optional shared `AtomicBool`.
// NOTE(review): presumably the cancellation flag of the stream running on the current thread,
// and `None` when no stream is running — confirm in the `runtime` module.
pub(crate) fn current_stream_cancelled() -> Option<Arc<AtomicBool>> {
runtime::current_stream_cancelled()
}
/// Runs `f`, converting a panic into `StreamError::Failed("<context> panicked")`.
///
/// The closure is wrapped in `AssertUnwindSafe`, so the caller is responsible for any state
/// the closure may leave half-updated. The panic payload itself is discarded.
pub(super) fn catch_unwind_failed<T, F>(context: &'static str, f: F) -> StreamResult<T>
where
    F: FnOnce() -> T,
{
    match catch_unwind(AssertUnwindSafe(f)) {
        Ok(value) => Ok(value),
        Err(_payload) => Err(StreamError::Failed(format!("{context} panicked"))),
    }
}
impl<Key, Out> PartitionSlot<Key, Out> {
fn new(key: Key) -> Self {
Self {
key: Some(key),
active: 0,
queued: VecDeque::new(),
in_ready_queue: false,
}
}
}
/// Returns the slot index assigned to `key`, assigning one if the key has none yet.
///
/// * A key already present in `slots_by_key` gets its existing index back.
/// * Otherwise a retired index is popped from `free_slots` and re-initialised for `key`;
///   if there is none, a fresh slot is appended to `slots`.
///
/// The key is looked up through the map's entry API, so it is hashed once even when a new
/// mapping has to be inserted.
///
/// # Panics
///
/// Panics if `free_slots` contains an index that is out of bounds for `slots`.
#[inline(always)]
fn partition_slot_for<Key, Out>(
    key: Key,
    slots_by_key: &mut HashMap<Key, usize>,
    slots: &mut Vec<PartitionSlot<Key, Out>>,
    free_slots: &mut Vec<usize>,
) -> usize
where
    Key: Clone + Eq + Hash,
{
    use std::collections::hash_map::Entry;

    match slots_by_key.entry(key) {
        Entry::Occupied(existing) => *existing.get(),
        Entry::Vacant(vacancy) => {
            let slot = match free_slots.pop() {
                Some(recycled) => {
                    // Reuse a retired slot: reset every field before handing it out.
                    let state = &mut slots[recycled];
                    state.key = Some(vacancy.key().clone());
                    state.active = 0;
                    state.queued.clear();
                    state.in_ready_queue = false;
                    recycled
                }
                None => {
                    slots.push(PartitionSlot::new(vacancy.key().clone()));
                    slots.len() - 1
                }
            };
            vacancy.insert(slot);
            slot
        }
    }
}
/// Retires `slot`: removes its key from `slots_by_key`, resets its state and puts the index
/// on `free_slots` for reuse.
///
/// Retiring a slot that is already retired (its key is `None`) still resets the state but
/// does not push the index onto `free_slots` again. A duplicate entry there would later hand
/// the same slot to two different keys.
///
/// # Panics
///
/// Panics if `slot` is out of bounds for `slots`.
#[inline(always)]
fn retire_partition_slot<Key, Out>(
    slot: usize,
    slots_by_key: &mut HashMap<Key, usize>,
    slots: &mut [PartitionSlot<Key, Out>],
    free_slots: &mut Vec<usize>,
) where
    Key: Eq + Hash,
{
    let state = &mut slots[slot];
    let was_live = match state.key.take() {
        Some(key) => {
            slots_by_key.remove(&key);
            true
        }
        None => false,
    };
    state.active = 0;
    state.queued.clear();
    state.in_ready_queue = false;
    if was_live {
        free_slots.push(slot);
    }
}
#[inline(always)]
fn ready_partition_slot<Key, Out>(
slots: &mut [PartitionSlot<Key, Out>],
ready_slots: &mut VecDeque<usize>,
slot: usize,
per_partition: usize,
) {
if let Some(state) = slots.get_mut(slot)
&& state.key.is_some()
&& !state.in_ready_queue
&& state.active < per_partition
&& !state.queued.is_empty()
{
state.in_ready_queue = true;
ready_slots.push_back(slot);
}
}
/// Takes the next runnable item from the ready queue.
///
/// Slot indices are popped from the front of `ready_slots` until one yields an item. Each
/// popped slot has its `in_ready_queue` flag cleared; slots that are out of range, retired,
/// already at `per_partition` active items or empty are skipped. When an item is taken the
/// slot's `active` count is incremented, and the slot is re-enqueued at the back if it still
/// has queued items and spare capacity.
///
/// Returns `(item index, slot index, item)`, or `None` once the ready queue is drained.
#[inline(always)]
fn pop_ready_partition_slot<Key, Out>(
    slots: &mut [PartitionSlot<Key, Out>],
    ready_slots: &mut VecDeque<usize>,
    per_partition: usize,
) -> Option<(usize, usize, Out)> {
    while let Some(slot) = ready_slots.pop_front() {
        let Some(state) = slots.get_mut(slot) else {
            continue;
        };
        state.in_ready_queue = false;
        if state.key.is_none() || state.active >= per_partition {
            continue;
        }
        let Some((index, item)) = state.queued.pop_front() else {
            continue;
        };
        state.active += 1;
        if !state.queued.is_empty() && state.active < per_partition {
            state.in_ready_queue = true;
            ready_slots.push_back(slot);
        }
        return Some((index, slot, item));
    }
    None
}
/// Factory that produces a stream of `Out` items together with a materialized value `Mat`.
///
/// Implementations must be `Send + Sync`; `create` receives the factory behind an `Arc`.
pub(crate) trait SourceFactory<Out, Mat>: Send + Sync {
/// Builds the stream and its materialized value for `materializer`, or fails with a
/// stream error.
fn create(self: Arc<Self>, materializer: &Materializer) -> StreamResult<(BoxStream<Out>, Mat)>;
}
/// Adapter that lets a plain closure act as a [`SourceFactory`].
struct FnSourceFactory<F>(F);

impl<Out, Mat, F> SourceFactory<Out, Mat> for FnSourceFactory<F>
where
    F: Fn(&Materializer) -> StreamResult<(BoxStream<Out>, Mat)> + Send + Sync,
{
    /// Invokes the wrapped closure with `materializer` and returns its result unchanged.
    fn create(self: Arc<Self>, materializer: &Materializer) -> StreamResult<(BoxStream<Out>, Mat)> {
        let FnSourceFactory(build) = &*self;
        build(materializer)
    }
}
/// Source factory that applies `stage` to every successful item of an upstream factory.
struct MapSourceFactory<In, Out, Mat, F> {
    source: Arc<dyn SourceFactory<In, Mat>>,
    stage: F,
    _marker: PhantomData<fn(In) -> Out>,
}

impl<In, Out, Mat, F> SourceFactory<Out, Mat> for MapSourceFactory<In, Out, Mat, F>
where
    In: Send + 'static,
    Out: Send + 'static,
    Mat: Send + 'static,
    F: Fn(In) -> Out + Send + Sync + 'static,
{
    /// Materializes the upstream source and wraps its stream in a [`MapSourceStream`].
    ///
    /// The upstream materialized value is passed through untouched; an upstream failure is
    /// returned as-is. The returned stream keeps this factory alive through the `Arc`.
    fn create(self: Arc<Self>, materializer: &Materializer) -> StreamResult<(BoxStream<Out>, Mat)> {
        let upstream = Arc::clone(&self.source);
        let (input, mat) = upstream.create(materializer)?;
        let mapped: BoxStream<Out> = Box::new(MapSourceStream {
            input,
            factory: self,
        });
        Ok((mapped, mat))
    }
}
/// Stream produced by [`MapSourceFactory`]: pulls from `input` and maps successful items
/// with the factory's `stage` function.
struct MapSourceStream<In, Out, Mat, F> {
    input: BoxStream<In>,
    factory: Arc<MapSourceFactory<In, Out, Mat, F>>,
}

impl<In, Out, Mat, F> Iterator for MapSourceStream<In, Out, Mat, F>
where
    F: Fn(In) -> Out,
{
    type Item = StreamResult<Out>;

    /// Pulls one upstream element; `Ok` values are mapped, errors pass through unchanged.
    fn next(&mut self) -> Option<Self::Item> {
        let upstream = self.input.next()?;
        Some(upstream.map(|value| (self.factory.stage)(value)))
    }
}
/// Merges several streams into one by polling them in round-robin order.
///
/// Each pull asks `next_active_optional_stream` for the next input that is still present,
/// starting just after the input polled last. Whatever that input yields, `Ok` or `Err`, is
/// forwarded unchanged. An exhausted input is removed from the rotation.
///
/// * `eager_complete == true`: the merged stream returns `None` as soon as any input runs dry.
/// * `eager_complete == false`: it returns `None` only when no input is left.
fn merge_streams<Out>(streams: Vec<BoxStream<Out>>, eager_complete: bool) -> BoxStream<Out>
where
    Out: Send + 'static,
{
    let mut inputs: Vec<Option<BoxStream<Out>>> = streams.into_iter().map(Some).collect();
    // The vector never changes length; `.max(1)` keeps the modulo defined when it is empty.
    let modulus = inputs.len().max(1);
    let mut cursor = 0usize;
    Box::new(std::iter::from_fn(move || {
        while let Some(index) = next_active_optional_stream(&inputs, cursor, |_| true) {
            cursor = (index + 1) % modulus;
            let Some(stream) = inputs[index].as_mut() else {
                continue;
            };
            if let Some(item) = stream.next() {
                return Some(item);
            }
            inputs[index] = None;
            if eager_complete {
                return None;
            }
        }
        None
    }))
}
/// Merges streams according to a weighted schedule.
///
/// `priorities[i]` is the weight of `streams[i]`: the schedule contains index `i` that many
/// times in a row, and `next_weighted_stream` cycles through it to pick the next input.
/// Elements are forwarded unchanged, errors included. An exhausted input is removed.
///
/// * `eager_complete == true`: the merged stream returns `None` as soon as any input runs dry.
/// * `eager_complete == false`: it returns `None` when no input is left, or when none of the
///   remaining inputs appears in the schedule.
// NOTE(review): an input with weight 0, or one without a matching `priorities` entry, is never
// polled — presumably callers validate the weights; confirm.
fn merge_prioritized_streams<Out>(
    streams: Vec<BoxStream<Out>>,
    priorities: Vec<usize>,
    eager_complete: bool,
) -> BoxStream<Out>
where
    Out: Send + 'static,
{
    let mut inputs: Vec<Option<BoxStream<Out>>> = streams.into_iter().map(Some).collect();
    let mut schedule: Vec<usize> = Vec::new();
    for (index, weight) in priorities.into_iter().enumerate() {
        schedule.extend(std::iter::repeat(index).take(weight));
    }
    let mut cursor = 0usize;
    Box::new(std::iter::from_fn(move || {
        while inputs.iter().any(Option::is_some) {
            let index = next_weighted_stream(&inputs, &schedule, &mut cursor)?;
            let Some(stream) = inputs[index].as_mut() else {
                continue;
            };
            if let Some(item) = stream.next() {
                return Some(item);
            }
            inputs[index] = None;
            if eager_complete {
                return None;
            }
        }
        None
    }))
}
/// Merges two streams by always emitting the smaller of their current head elements.
///
/// One element per side is buffered. When both heads are available the smaller one is
/// emitted, the left one on ties. Once a side is exhausted, the other side is drained as-is.
/// The output is sorted only if both inputs are already sorted.
///
/// An error from either side is emitted immediately. A head already buffered from the other
/// side is kept, and pulling continues on the next call.
fn merge_sorted_stream<Out>(mut left: BoxStream<Out>, mut right: BoxStream<Out>) -> BoxStream<Out>
where
    Out: Ord + Send + 'static,
{
    let mut left_head: Option<Out> = None;
    let mut right_head: Option<Out> = None;
    let mut left_finished = false;
    let mut right_finished = false;
    Box::new(std::iter::from_fn(move || loop {
        // Refill whichever head is missing, left side first.
        if !left_finished && left_head.is_none() {
            match left.next() {
                None => left_finished = true,
                Some(Err(error)) => return Some(Err(error)),
                Some(Ok(item)) => left_head = Some(item),
            }
        }
        if !right_finished && right_head.is_none() {
            match right.next() {
                None => right_finished = true,
                Some(Err(error)) => return Some(Err(error)),
                Some(Ok(item)) => right_head = Some(item),
            }
        }
        let take_left = match (left_head.as_ref(), right_head.as_ref()) {
            (Some(left_item), Some(right_item)) => left_item <= right_item,
            (Some(_), None) if right_finished => true,
            (None, Some(_)) if left_finished => false,
            (None, None) if left_finished && right_finished => return None,
            _ => continue,
        };
        let chosen = if take_left {
            left_head.take()
        } else {
            right_head.take()
        };
        if let Some(item) = chosen {
            return Some(Ok(item));
        }
    }))
}
/// Combines streams into snapshots of their most recent values.
///
/// Inputs are polled round-robin. Nothing is emitted until every input has produced at
/// least one value. From then on each new value emits a `Vec` holding the latest value of
/// every input, in input order. Errors are forwarded immediately. An exhausted input is
/// removed but its last value keeps appearing in later snapshots.
///
/// * `eager_complete == true`: the stream returns `None` as soon as any input runs dry.
/// * `eager_complete == false`: it returns `None` only when no input is left.
fn merge_latest_streams<Out>(
    streams: Vec<BoxStream<Out>>,
    eager_complete: bool,
) -> BoxStream<Vec<Out>>
where
    Out: Clone + Send + 'static,
{
    let mut inputs: Vec<Option<BoxStream<Out>>> = streams.into_iter().map(Some).collect();
    let width = inputs.len();
    let mut latest: Vec<Option<Out>> = vec![None; width];
    // Number of inputs that have produced at least one value so far.
    let mut populated = 0usize;
    let mut cursor = 0usize;
    let mut pending: VecDeque<Vec<Out>> = VecDeque::new();
    Box::new(std::iter::from_fn(move || loop {
        if let Some(snapshot) = pending.pop_front() {
            return Some(Ok(snapshot));
        }
        if !inputs.iter().any(Option::is_some) {
            return None;
        }
        let index = next_active_optional_stream(&inputs, cursor, |_| true)?;
        cursor = (index + 1) % width.max(1);
        let Some(stream) = inputs[index].as_mut() else {
            continue;
        };
        match stream.next() {
            None => {
                inputs[index] = None;
                if eager_complete {
                    return None;
                }
            }
            Some(Err(error)) => return Some(Err(error)),
            Some(Ok(item)) => {
                if latest[index].replace(item).is_none() {
                    populated += 1;
                }
                if populated == width {
                    let snapshot: Vec<Out> = latest
                        .iter()
                        .cloned()
                        .map(|value| value.expect("merge-latest initialized"))
                        .collect();
                    pending.push_back(snapshot);
                }
            }
        }
    }))
}
/// Pairs elements of two streams positionally.
///
/// Each pull refills the missing head of the left side, then of the right side, and emits
/// the pair once both heads are present. The zipped stream returns `None` as soon as one
/// side is exhausted with no head buffered. An error from either side is emitted
/// immediately; a head already buffered on the other side is kept.
fn zip_streams<Left, Right>(
    mut left: BoxStream<Left>,
    mut right: BoxStream<Right>,
) -> BoxStream<(Left, Right)>
where
    Left: Send + 'static,
    Right: Send + 'static,
{
    let mut left_head: Option<Left> = None;
    let mut right_head: Option<Right> = None;
    let mut left_finished = false;
    let mut right_finished = false;
    Box::new(std::iter::from_fn(move || loop {
        if !left_finished && left_head.is_none() {
            match left.next() {
                None => left_finished = true,
                Some(Err(error)) => return Some(Err(error)),
                Some(Ok(item)) => left_head = Some(item),
            }
        }
        if !right_finished && right_head.is_none() {
            match right.next() {
                None => right_finished = true,
                Some(Err(error)) => return Some(Err(error)),
                Some(Ok(item)) => right_head = Some(item),
            }
        }
        if left_head.is_some() && right_head.is_some() {
            return left_head.take().zip(right_head.take()).map(Ok);
        }
        let left_starved = left_finished && left_head.is_none();
        let right_starved = right_finished && right_head.is_none();
        if left_starved || right_starved {
            return None;
        }
    }))
}
/// Combines the most recent values of two streams with `combine`.
///
/// While both sides are live they are pulled alternately, left first. Once one side is
/// exhausted only the other is pulled. Every new value is combined with the latest value of
/// the opposite side, as soon as that side has produced one, and the result is emitted.
/// Errors are forwarded immediately.
///
/// The stream returns `None` when
/// * both sides are exhausted, or
/// * a side is exhausted without ever having produced a value, or
/// * `eager_complete` is set and either side is exhausted.
fn zip_latest_with_stream<Left, Right, Out, F>(
    mut left: BoxStream<Left>,
    mut right: BoxStream<Right>,
    eager_complete: bool,
    combine: Arc<F>,
) -> BoxStream<Out>
where
    Left: Clone + Send + 'static,
    Right: Clone + Send + 'static,
    Out: Send + 'static,
    F: Fn(Left, Right) -> Out + Send + Sync + 'static,
{
    let mut left_latest: Option<Left> = None;
    let mut right_latest: Option<Right> = None;
    let mut left_finished = false;
    let mut right_finished = false;
    let mut left_turn = true;
    let mut pending: VecDeque<Out> = VecDeque::new();
    Box::new(std::iter::from_fn(move || loop {
        if let Some(ready) = pending.pop_front() {
            return Some(Ok(ready));
        }
        let any_finished = left_finished || right_finished;
        let both_finished = left_finished && right_finished;
        let starved = (left_finished && left_latest.is_none())
            || (right_finished && right_latest.is_none());
        if (eager_complete && any_finished) || both_finished || starved {
            return None;
        }
        let pull_left = match (left_finished, right_finished) {
            (true, _) => false,
            (false, true) => true,
            (false, false) => {
                // Alternate sides only while both are still live.
                let chosen = left_turn;
                left_turn = !chosen;
                chosen
            }
        };
        if pull_left {
            match left.next() {
                None => {
                    left_finished = true;
                    if eager_complete {
                        return None;
                    }
                }
                Some(Err(error)) => return Some(Err(error)),
                Some(Ok(item)) => {
                    left_latest = Some(item);
                    if let (Some(left_item), Some(right_item)) =
                        (left_latest.as_ref(), right_latest.as_ref())
                    {
                        pending.push_back(combine(left_item.clone(), right_item.clone()));
                    }
                }
            }
        } else {
            match right.next() {
                None => {
                    right_finished = true;
                    if eager_complete {
                        return None;
                    }
                }
                Some(Err(error)) => return Some(Err(error)),
                Some(Ok(item)) => {
                    right_latest = Some(item);
                    if let (Some(left_item), Some(right_item)) =
                        (left_latest.as_ref(), right_latest.as_ref())
                    {
                        pending.push_back(combine(left_item.clone(), right_item.clone()));
                    }
                }
            }
        }
    }))
}
/// Pairs elements of two streams positionally, padding the shorter side.
///
/// Each pull takes one element from every side that is still live, left first. When only
/// one side delivers, the missing half of the pair is a clone of `left_fill` or
/// `right_fill`. The stream returns `None` once neither side delivers.
///
/// An error is emitted immediately. If the right side fails after the left side has already
/// delivered in the same pull, that left element is dropped.
fn zip_all_stream<Left, Right>(
    mut left: BoxStream<Left>,
    mut right: BoxStream<Right>,
    left_fill: Left,
    right_fill: Right,
) -> BoxStream<(Left, Right)>
where
    Left: Clone + Send + 'static,
    Right: Clone + Send + 'static,
{
    let mut left_finished = false;
    let mut right_finished = false;
    Box::new(std::iter::from_fn(move || {
        if left_finished && right_finished {
            return None;
        }
        let mut left_item: Option<Left> = None;
        if !left_finished {
            match left.next() {
                Some(Ok(item)) => left_item = Some(item),
                Some(Err(error)) => return Some(Err(error)),
                None => left_finished = true,
            }
        }
        let mut right_item: Option<Right> = None;
        if !right_finished {
            match right.next() {
                Some(Ok(item)) => right_item = Some(item),
                Some(Err(error)) => return Some(Err(error)),
                None => right_finished = true,
            }
        }
        let pair = match (left_item, right_item) {
            (None, None) => return None,
            (Some(left_value), Some(right_value)) => (left_value, right_value),
            (Some(left_value), None) => (left_value, right_fill.clone()),
            (None, Some(right_value)) => (left_fill.clone(), right_value),
        };
        Some(Ok(pair))
    }))
}
/// Zips any number of streams: one element is collected from every input, then the row is
/// passed to `zipper` and its result emitted.
///
/// Inputs whose slot is still empty are polled round-robin. Errors are forwarded
/// immediately. The stream returns `None` when an input is exhausted while its slot is
/// empty, or when no input with an empty slot is left to poll. With no inputs the result is
/// an empty stream.
fn zip_n_streams<Out, Next, F>(streams: Vec<BoxStream<Out>>, zipper: Arc<F>) -> BoxStream<Next>
where
    Out: Send + 'static,
    Next: Send + 'static,
    F: Fn(Vec<Out>) -> Next + Send + Sync + 'static,
{
    let width = streams.len();
    if width == 0 {
        return Box::new(std::iter::empty());
    }
    let mut inputs: Vec<Option<BoxStream<Out>>> = streams.into_iter().map(Some).collect();
    let mut slots: Vec<Option<Out>> = std::iter::repeat_with(|| None).take(width).collect();
    let mut cursor = 0usize;
    Box::new(std::iter::from_fn(move || loop {
        if slots.iter().all(Option::is_some) {
            let row: Vec<Out> = slots
                .iter_mut()
                .map(|slot| slot.take().expect("zip-n slot filled"))
                .collect();
            return Some(Ok(zipper(row)));
        }
        let index =
            next_active_optional_stream(&inputs, cursor, |candidate| slots[candidate].is_none())?;
        // `width` is non-zero here because of the early return above.
        cursor = (index + 1) % width;
        let Some(stream) = inputs[index].as_mut() else {
            continue;
        };
        match stream.next() {
            Some(Ok(item)) => slots[index] = Some(item),
            Some(Err(error)) => return Some(Err(error)),
            None => {
                inputs[index] = None;
                if slots[index].is_none() {
                    return None;
                }
            }
        }
    }))
}
/// Finds the next stream that is still present and accepted by `predicate`.
///
/// The search starts at `current` (taken modulo the number of streams), wraps around once
/// and visits every position exactly once. Returns the index found, or `None` if there is
/// no such stream or `streams` is empty.
fn next_active_optional_stream<T, F>(
    streams: &[Option<BoxStream<T>>],
    current: usize,
    predicate: F,
) -> Option<usize>
where
    T: Send + 'static,
    F: Fn(usize) -> bool,
{
    let len = streams.len();
    if len == 0 {
        return None;
    }
    (0..len)
        .map(|offset| (current + offset) % len)
        .find(|&index| streams[index].is_some() && predicate(index))
}
/// Picks the next stream according to a weighted schedule.
///
/// `schedule` is a list of stream indices and `schedule_index` is the cursor into it. At
/// most one full lap of the schedule is examined; the cursor advances (wrapping) past every
/// examined entry. The first entry that names a stream which is still present is returned.
/// Entries that are out of range for `streams` are skipped.
///
/// Returns `None` if `streams` or `schedule` is empty, or if a full lap finds nothing.
fn next_weighted_stream<T>(
    streams: &[Option<BoxStream<T>>],
    schedule: &[usize],
    schedule_index: &mut usize,
) -> Option<usize>
where
    T: Send + 'static,
{
    let lap = schedule.len();
    if streams.is_empty() || lap == 0 {
        return None;
    }
    (0..lap).find_map(|_| {
        let index = schedule[*schedule_index % lap];
        *schedule_index = (*schedule_index + 1) % lap;
        matches!(streams.get(index), Some(Some(_))).then_some(index)
    })
}
pub(crate) mod async_boundary;
mod completion;
mod error;
mod flow;
mod rate;
mod restart;
mod runtime;
mod sink;
mod source;
mod time;
mod timer;
/// Object-safe hook type that can be converted into `Arc<dyn Any + Send + Sync>`.
// NOTE(review): presumably this lets the receiver downcast the hook to its concrete type —
// confirm at the call sites.
pub(crate) trait SplitSegmentHookDyn: Send + Sync + 'static {
/// Converts the shared hook into a type-erased `Arc<dyn Any + Send + Sync>`.
fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Send + Sync>;
}
/// Object-safe hook through which a terminal consumer drains a source in batches.
pub(crate) trait TerminalSourceHookDyn<In>: Send + Sync + 'static {
/// Drains into `batch` and reports whether the source is still active or has completed.
///
/// Receives the `materializer` and a shared `cancelled` flag; fails with a stream error.
// NOTE(review): presumably items are appended to `batch` and `cancelled` is polled to stop
// early — confirm against the implementations.
fn drain_terminal_batch(
&self,
materializer: &Materializer,
cancelled: &Arc<AtomicBool>,
batch: &mut Vec<In>,
) -> StreamResult<TerminalSourceStatus>;
/// Cancellation notification; the default implementation does nothing.
fn cancel_terminal(&self) {}
}
/// Status returned by `TerminalSourceHookDyn::drain_terminal_batch`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum TerminalSourceStatus {
// NOTE(review): presumably "more batches may follow" — confirm against the implementations.
Active,
// NOTE(review): presumably "the source is finished" — confirm against the implementations.
Completed,
}
/// Object-safe interface for registering fast-path hooks.
///
/// Both registration methods return `None` or `Some(result)`, where the result is a
/// type-erased `Box<dyn Any + Send>` or a stream error.
// NOTE(review): presumably `None` means "fast path not applicable, use the regular path" —
// confirm at the call sites.
pub(crate) trait FoldFastPathDyn<In: Send + 'static>: Send + Sync + 'static {
/// Offers a split-segment hook to the implementation.
fn try_register(
&self,
hook: Arc<dyn SplitSegmentHookDyn>,
) -> Option<StreamResult<Box<dyn std::any::Any + Send>>>;
/// Whether terminal draining is supported; `false` unless overridden.
fn supports_terminal_drain(&self) -> bool {
false
}
/// Offers a terminal-drain hook to the implementation; the default declines with `None`.
fn try_register_terminal_drain(
&self,
_hook: Arc<dyn TerminalSourceHookDyn<In>>,
_materializer: &Materializer,
) -> Option<StreamResult<Box<dyn std::any::Any + Send>>> {
None
}
}
use self::runtime::{runtime_checked_stream, set_current_stream_cancelled};
pub(crate) use self::completion::StreamCancellation;
pub use self::{
completion::{Cancellable, StreamCompletion},
error::{StreamError, StreamResult, Supervision, SupervisionDecider, SupervisionDirective},
flow::{BidiFlow, Flow},
rate::{AggregateTimer, OverflowStrategy},
restart::{RestartFlow, RestartSettings, RestartSink, RestartSource, RetryFlow},
runtime::{Materializer, Runtime},
sink::{RunnableGraph, Sink, SinkCombineStrategy},
source::{Demand, Keep, MaybeHandle, NotUsed, PushOutlet, Source, SourceCombineStrategy},
time::{DelayOverflowStrategy, ThrottleMode},
};
#[cfg(test)]
mod tests {
use super::*;
use crate::Attributes;
use crate::testkit::TestSink;
use std::fs;
use std::sync::{
Arc as StdArc,
atomic::{
AtomicBool as StdAtomicBool, AtomicUsize as StdAtomicUsize, Ordering as StdOrdering,
},
mpsc,
};
use std::time::Duration as StdDuration;
use std::time::Instant;
// Test helper: waits on `completion` and unwraps the result, panicking if it is an error.
fn wait<T>(completion: StreamCompletion<T>) -> T {
completion.wait().unwrap()
}
// Test helper: polls `condition` every 2 ms until it returns true or `timeout` has elapsed.
// Once the deadline is reached the condition is evaluated one final time and that result is
// returned, so even a zero timeout checks the condition once.
fn wait_until(timeout: Duration, mut condition: impl FnMut() -> bool) -> bool {
    let poll_interval = Duration::from_millis(2);
    let deadline = Instant::now() + timeout;
    loop {
        if Instant::now() >= deadline {
            return condition();
        }
        if condition() {
            return true;
        }
        thread::sleep(poll_interval);
    }
}
// Test helper (Linux only): counts the threads of the current process whose `comm` name,
// read from `/proc/self/task/<tid>/comm`, equals `thread_name` after trimming whitespace.
// Unreadable entries are skipped; panics if the task directory itself cannot be read.
fn linux_thread_count(thread_name: &str) -> usize {
    let tasks = fs::read_dir("/proc/self/task").expect("task directory readable");
    let mut matching = 0usize;
    for entry in tasks {
        let Ok(entry) = entry else {
            continue;
        };
        let Ok(comm) = fs::read_to_string(entry.path().join("comm")) else {
            continue;
        };
        if comm.trim() == thread_name {
            matching += 1;
        }
    }
    matching
}
// Inserting `async_boundary()` into a source pipeline must not change the collected output.
#[test]
fn source_async_boundary_preserves_results() {
// Reference run without a boundary.
let expected = Source::from_iter(0_u64..128)
.map(|item| item.wrapping_add(1))
.filter(|item| item % 3 != 0)
.map(|item| item * 2)
.run_collect()
.unwrap();
// Same stages with a boundary after the first map.
let actual = Source::from_iter(0_u64..128)
.map(|item| item.wrapping_add(1))
.async_boundary()
.filter(|item| item % 3 != 0)
.map(|item| item * 2)
.run_collect()
.unwrap();
assert_eq!(actual, expected);
}
// A flow containing `r#async()` between two maps must produce the same output as the two
// maps applied directly on the source.
#[test]
fn flow_async_boundary_preserves_results() {
let expected = Source::from_iter(0_u64..128)
.map(|item| item + 1)
.map(|item| item * 3)
.run_collect()
.unwrap();
let flow = Flow::identity()
.map(|item: u64| item + 1)
.r#async()
.map(|item| item * 3);
let actual = Source::from_iter(0_u64..128)
.via(flow)
.run_collect()
.unwrap();
assert_eq!(actual, expected);
}
// The linear `async_boundary_with_buffer` pipeline and the equivalent graph-DSL shape
// (map -> AsyncBoundary -> map) must agree on the output, and the graph's report must count
// one boundary crossing per element.
#[test]
fn linear_async_boundary_matches_graph_async_boundary_shape() {
use crate::{
AsyncBoundary, AsyncBoundaryExecutionConfig, FusedExecutionConfig, GraphDsl,
GraphFlowShape, MapStage,
};
// Graph form: (+1) -> async boundary -> (*2).
let graph = GraphDsl::try_create(|builder| {
let first = builder.add(MapStage::new(|item: u64| item + 1));
let boundary = builder.add(AsyncBoundary::<u64>::new());
let second = builder.add(MapStage::new(|item: u64| item * 2));
builder.connect(first.outlet(), boundary.inlet())?;
builder.connect(boundary.outlet(), second.inlet())?;
Ok(GraphFlowShape::new(first.inlet(), second.outlet()))
})
.unwrap();
// Linear form of the same pipeline.
let linear = Source::from_iter(1_u64..=4)
.map(|item| item + 1)
.async_boundary_with_buffer(4)
.map(|item| item * 2)
.run_collect()
.unwrap();
let graph_output = graph.run_with_input(1_u64..=4).unwrap();
let report = graph
.run_async_boundary_count_with_input_report(
1_u64..=4,
AsyncBoundaryExecutionConfig {
fused: FusedExecutionConfig { event_limit: 1024 },
buffer_size: 4,
},
)
.unwrap();
assert_eq!(linear, graph_output);
assert_eq!(report.result, linear.len());
assert_eq!(report.async_boundary_crossings, linear.len());
}
// The stages on either side of an async boundary must make progress independently: while
// the downstream map is blocked on element 0, the upstream map must still reach element 1.
#[test]
fn async_boundary_regions_run_concurrently() {
// Probe channels: upstream progress, "downstream is blocked" signal, and the release.
let (upstream_tx, upstream_rx) = mpsc::channel::<u64>();
let (downstream_blocked_tx, downstream_blocked_rx) = mpsc::channel::<()>();
let (release_tx, release_rx) = mpsc::channel::<()>();
let release_rx = StdArc::new(Mutex::new(release_rx));
let completion = Source::from_iter(0_u64..3)
.map(move |item| {
upstream_tx.send(item).expect("upstream probe receives");
item
})
.async_boundary_with_buffer(1)
.map({
let release_rx = StdArc::clone(&release_rx);
move |item| {
// Block on the first element until the test releases it.
if item == 0 {
downstream_blocked_tx
.send(())
.expect("downstream probe receives");
release_rx
.lock()
.expect("release receiver lock")
.recv_timeout(StdDuration::from_secs(2))
.expect("downstream release arrives");
}
item
}
})
.run_with(Sink::collect())
.unwrap();
assert_eq!(
downstream_blocked_rx.recv_timeout(StdDuration::from_secs(2)),
Ok(())
);
// Upstream keeps producing while downstream is still blocked.
assert_eq!(upstream_rx.recv_timeout(StdDuration::from_secs(2)), Ok(0));
assert_eq!(upstream_rx.recv_timeout(StdDuration::from_secs(2)), Ok(1));
release_tx.send(()).expect("release downstream");
assert_eq!(completion.wait().unwrap(), vec![0, 1, 2]);
}
// With a boundary buffer of 1 and a blocked downstream, the upstream may run only a bounded
// number of elements ahead: 0 and 1 must be produced, 2 may be, and nothing further may
// arrive until downstream is released.
#[test]
fn async_boundary_backpressures_slow_downstream() {
let (produced_tx, produced_rx) = mpsc::channel::<u64>();
let (release_tx, release_rx) = mpsc::channel::<()>();
let release_rx = StdArc::new(Mutex::new(release_rx));
let completion = Source::from_iter(0_u64..8)
.map(move |item| {
produced_tx.send(item).expect("producer probe receives");
item
})
.async_boundary_with_buffer(1)
.map({
let release_rx = StdArc::clone(&release_rx);
move |item| {
// Block on the first element until the test releases it.
if item == 0 {
release_rx
.lock()
.expect("release receiver lock")
.recv_timeout(StdDuration::from_secs(2))
.expect("downstream release arrives");
}
item
}
})
.run_with(Sink::collect())
.unwrap();
assert_eq!(produced_rx.recv_timeout(StdDuration::from_secs(2)), Ok(0));
assert_eq!(produced_rx.recv_timeout(StdDuration::from_secs(2)), Ok(1));
// A third element is tolerated but, if it shows up, it must be element 2.
if let Ok(item) = produced_rx.recv_timeout(StdDuration::from_millis(100)) {
assert_eq!(item, 2);
}
// Anything beyond that means the handoff is not bounded.
match produced_rx.recv_timeout(StdDuration::from_millis(100)) {
Err(mpsc::RecvTimeoutError::Timeout) => {}
other => panic!("async boundary handoff was not bounded: {other:?}"),
}
release_tx.send(()).expect("release downstream");
assert_eq!(completion.wait().unwrap(), (0_u64..8).collect::<Vec<_>>());
}
// A source can be cloned and run more than once, yielding the same elements each time.
#[test]
fn source_blueprints_are_reusable() {
let source = Source::from_iter(0..5).map(|item| item + 1);
assert_eq!(source.clone().run_collect().unwrap(), vec![1, 2, 3, 4, 5]);
assert_eq!(source.run_collect().unwrap(), vec![1, 2, 3, 4, 5]);
}
// `map` applied after `map_materialized_value` must keep the source's materialized value
// while still transforming the elements.
#[test]
fn source_map_preserves_materialized_value() {
let graph = Source::single(1)
.map_materialized_value(|_| "source")
.map(|item| item + 1)
.to_mat(Sink::head(), Keep::both);
let materialized = graph.run().unwrap();
assert_eq!(materialized.0, "source");
assert_eq!(wait(materialized.1), 2);
}
// A map+filter flow attached with `via` transforms the source's elements as expected.
#[test]
fn source_and_flow_compose() {
let flow = Flow::identity()
.map(|item: i32| item * 2)
.filter(|item| item % 3 == 0);
let result = Source::from_iter(0..8).via(flow).run_collect().unwrap();
assert_eq!(result, vec![0, 6, 12]);
}
// The closure passed to `Sink::setup` must observe the attributes added to the sink itself
// (name, input buffer, dispatcher) when run on a materializer that has its own attributes.
#[test]
fn sink_setup_sees_materializer_defaults_and_local_attributes() {
let observed = StdArc::new(Mutex::new(None));
let observed_in_setup = StdArc::clone(&observed);
let sink = Sink::<i32, StreamCompletion<NotUsed>>::setup(move |_materializer, attrs| {
// Record what the setup closure sees.
*observed_in_setup.lock().unwrap() = Some((
attrs.name().map(str::to_owned),
attrs.input_buffer_hint(),
attrs.dispatcher_hint().map(str::to_owned),
));
Sink::ignore()
})
.add_attributes(Attributes::named("sink-inner"))
.add_attributes(Attributes::input_buffer(4, 4))
.add_attributes(Attributes::dispatcher("bench-dispatcher"));
let materializer = Materializer::new().with_attributes(Attributes::named("mat-outer"));
wait(
Source::from_iter([1, 2, 3])
.run_with_materializer(sink, &materializer)
.unwrap(),
);
// The sink's own name is observed, not the materializer's "mat-outer".
assert_eq!(
*observed.lock().unwrap(),
Some((
Some("sink-inner".to_owned()),
Some((4, 4)),
Some("bench-dispatcher".to_owned())
))
);
}
// Elements run into the sink returned by `pre_materialize` must show up in the completion
// that `pre_materialize` handed out beforehand.
#[test]
fn sink_pre_materialize_feeds_existing_materialization() {
let materializer = Materializer::new();
let (completion, pre) = Sink::<i32, StreamCompletion<Vec<i32>>>::collect()
.pre_materialize(&materializer)
.unwrap();
Source::from_iter([1, 2, 3])
.run_with_materializer(pre, &materializer)
.unwrap();
assert_eq!(wait(completion), vec![1, 2, 3]);
}
// With `Flow::from_sink_and_source`, the downstream output comes from the embedded source
// (10, 20, 30), while the upstream elements go to the embedded sink.
#[test]
fn flow_from_sink_and_source_connects_both_sides() {
assert_eq!(
Source::from_iter([1, 2, 3])
.via(Flow::from_sink_and_source(
Sink::foreach(|_item: i32| {}),
Source::from_iter([10, 20, 30]),
))
.run_collect()
.unwrap(),
vec![10, 20, 30]
);
}
// In the uncoupled `from_sink_and_source`, the sink side must still reach completion (its
// `on_complete` callback fires within one second) even though the embedded single-element
// source finishes the output side early.
#[test]
fn from_sink_and_source_keeps_sink_running_after_source_side_completes() {
let completed = StdArc::new(StdAtomicBool::new(false));
let on_complete = StdArc::clone(&completed);
let flow = Flow::from_sink_and_source(
Sink::on_complete(move || {
on_complete.store(true, StdOrdering::SeqCst);
}),
Source::single(10),
);
let result = Source::from_iter([1, 2, 3])
.via(flow)
.run_collect()
.unwrap();
assert_eq!(result, vec![10]);
assert!(wait_until(StdDuration::from_secs(1), || {
completed.load(StdOrdering::SeqCst)
}));
}
// In the coupled variant, an empty upstream finishes the sink side first; the embedded tick
// source must then be cancelled (observed through its materialized handle) and the overall
// stream must complete.
#[test]
fn from_sink_and_source_coupled_cancels_source_when_sink_finishes_first() {
let cancellable = StdArc::new(Mutex::new(None));
let observed = StdArc::clone(&cancellable);
let flow = Flow::from_sink_and_source_coupled(
Sink::ignore(),
Source::tick(
StdDuration::from_millis(50),
StdDuration::from_millis(50),
10,
)
.map_materialized_value(move |handle| {
// Keep a copy of the tick handle so the test can inspect it.
*observed.lock().unwrap() = Some(handle.clone());
handle
}),
);
let completion = Source::from_iter(std::iter::empty::<i32>())
.via(flow)
.run_with(Sink::ignore())
.unwrap();
assert!(wait_until(StdDuration::from_secs(1), || {
cancellable
.lock()
.unwrap()
.as_ref()
.is_some_and(Cancellable::is_cancelled)
}));
assert_eq!(wait(completion), NotUsed);
}
// Checks `BidiFlow::join` and `BidiFlow::atop`: for the input 10, the joined codec yields 12
// and the codec stacked atop the framing layer yields 58.
#[test]
fn bidi_flow_join_and_atop_compose() {
let codec = BidiFlow::from_flows(
Flow::identity().map(|item: i32| item + 1),
Flow::identity().map(|item: i32| item * 2),
)
.named("codec");
let framing = BidiFlow::from_flows(
Flow::identity().map(|item: i32| item * 3),
Flow::identity().map(|item: i32| item - 4),
);
let joined = codec
.clone()
.join(Flow::identity().map(|item: i32| item - 5));
let stacked = codec.atop(framing).join(Flow::identity());
// (10 + 1 - 5) * 2 = 12
assert_eq!(
Source::single(10).via(joined).run_collect().unwrap(),
vec![12]
);
// ((10 + 1) * 3 - 4) * 2 = 58
assert_eq!(
Source::single(10).via(stacked).run_collect().unwrap(),
vec![58]
);
}
// A flow consisting of a backpressuring buffer followed by a map runs and maps every element.
#[test]
fn flow_buffer_then_map_runs_end_to_end() {
let flow = Flow::identity()
.buffer(8, OverflowStrategy::Backpressure)
.map(|item: i32| item + 1);
let result = Source::from_iter(0..4).via(flow).run_collect().unwrap();
assert_eq!(result, vec![1, 2, 3, 4]);
}
// Applies each public `Flow` combinator on top of a buffered flow and checks the output, so
// that every combinator is exercised when the flow it extends starts with `buffer(..)`.
#[test]
fn public_flow_combinators_preserve_runtime_transform_after_buffer() {
// Fresh buffered flow for every case.
fn buffered_flow() -> Flow<i32, i32> {
Flow::identity().buffer(8, OverflowStrategy::Backpressure)
}
// Filtering combinators.
assert_eq!(
Source::from_iter(0..4)
.via(buffered_flow().filter(|item| *item % 2 == 0))
.run_collect()
.unwrap(),
vec![0, 2]
);
assert_eq!(
Source::from_iter(0..4)
.via(buffered_flow().filter_not(|item| *item % 2 == 0))
.run_collect()
.unwrap(),
vec![1, 3]
);
assert_eq!(
Source::from_iter(0..4)
.via(buffered_flow().filter_map(|item| (item % 2 == 0).then_some(item + 10)))
.run_collect()
.unwrap(),
vec![10, 12]
);
// Expanding and stateful combinators.
assert_eq!(
Source::from_iter(0..3)
.via(buffered_flow().map_concat(|item| [item, item + 10]))
.run_collect()
.unwrap(),
vec![0, 10, 1, 11, 2, 12]
);
assert_eq!(
Source::from_iter(0..3)
.via(buffered_flow().stateful_map(5, |state, item| {
*state += item;
*state
}))
.run_collect()
.unwrap(),
vec![5, 6, 8]
);
assert_eq!(
Source::from_iter(0..3)
.via(buffered_flow().stateful_map_concat(0, |state, item| {
*state += item;
[*state, item]
}))
.run_collect()
.unwrap(),
vec![0, 0, 1, 1, 3, 2]
);
// Async mapping combinators.
assert_eq!(
Source::from_iter(0..4)
.via(buffered_flow().map_async(2, |item| async move { Ok(item + 1) }))
.run_collect()
.unwrap(),
vec![1, 2, 3, 4]
);
assert_eq!(
Source::from_iter(0..4)
.via(buffered_flow().map_async_unordered(2, |item| async move { Ok(item + 1) }))
.run_collect()
.unwrap(),
vec![1, 2, 3, 4]
);
assert_eq!(
Source::from_iter(0..4)
.via(buffered_flow().map_async_partitioned(
2,
1,
|item| item % 2,
|item| async move { Ok(item + 1) },
))
.run_collect()
.unwrap(),
vec![1, 2, 3, 4]
);
// Limiting combinators.
assert_eq!(
Source::from_iter(0..5)
.via(buffered_flow().take(3))
.run_collect()
.unwrap(),
vec![0, 1, 2]
);
assert_eq!(
Source::from_iter(0..5)
.via(buffered_flow().drop(2))
.run_collect()
.unwrap(),
vec![2, 3, 4]
);
assert_eq!(
Source::from_iter(0..5)
.via(buffered_flow().take_while(|item| *item < 3))
.run_collect()
.unwrap(),
vec![0, 1, 2]
);
assert_eq!(
Source::from_iter(0..5)
.via(buffered_flow().drop_while(|item| *item < 3))
.run_collect()
.unwrap(),
vec![3, 4]
);
assert_eq!(
Source::from_iter(0..3)
.via(buffered_flow().limit(5))
.run_collect()
.unwrap(),
vec![0, 1, 2]
);
// Grouping and aggregating combinators.
assert_eq!(
Source::from_iter(0..5)
.via(buffered_flow().grouped(2))
.run_collect()
.unwrap(),
vec![vec![0, 1], vec![2, 3], vec![4]]
);
assert_eq!(
Source::from_iter(1..=3)
.via(buffered_flow().scan(0, |acc, item| acc + item))
.run_collect()
.unwrap(),
vec![0, 1, 3, 6]
);
assert_eq!(
Source::from_iter(1..=4)
.via(buffered_flow().sliding(2, 1))
.run_collect()
.unwrap(),
vec![vec![1, 2], vec![2, 3], vec![3, 4]]
);
assert_eq!(
Source::from_iter(1..=4)
.via(buffered_flow().fold(0, |acc, item| acc + item))
.run_collect()
.unwrap(),
vec![10]
);
assert_eq!(
Source::from_iter(1..=4)
.via(buffered_flow().reduce(|acc, item| acc + item))
.run_collect()
.unwrap(),
vec![10]
);
// Error-handling combinators.
assert_eq!(
Source::from_factory(|| {
Box::new(vec![Ok(1), Err(StreamError::Failed("boom".into())), Ok(2)].into_iter())
})
.via(buffered_flow().map_error(|_| StreamError::Failed("mapped".into())))
.run_collect(),
Err(StreamError::Failed("mapped".into()))
);
assert_eq!(
Source::<i32>::failed(StreamError::Failed("boom".into()))
.via(buffered_flow().recover(|_| Some(42)))
.run_collect()
.unwrap(),
vec![42]
);
assert_eq!(
Source::<i32>::failed(StreamError::Failed("boom".into()))
.via(buffered_flow().recover_with(|_| Some(Source::from_iter([7, 8]))))
.run_collect()
.unwrap(),
vec![7, 8]
);
assert_eq!(
Source::<i32>::failed(StreamError::Failed("boom".into()))
.via(buffered_flow().recover_with_retries(1, |_| Some(Source::from_iter([9]))))
.run_collect()
.unwrap(),
vec![9]
);
assert_eq!(
Source::from_factory(|| {
Box::new(vec![Ok(1), Err(StreamError::Failed("ignored".into())), Ok(2)].into_iter())
})
.via(buffered_flow().on_error_complete())
.run_collect()
.unwrap(),
vec![1]
);
// Composition and materialized-value combinators.
let materialized = Source::from_iter([1, 2, 3])
.run_with(
buffered_flow()
.via(Flow::identity().map(|item| item + 1))
.map_materialized_value(|_| "buffered-flow")
.to_mat(Sink::fold(0, |acc, item| acc + item), Keep::both),
)
.unwrap();
assert_eq!(materialized.0, "buffered-flow");
assert_eq!(wait(materialized.1), 9);
let kept = Source::from_iter([1, 2, 3])
.run_with(
buffered_flow()
.via_mat_with(Flow::identity().map(|item| item + 1), |_, _| "combined")
.to(Sink::fold(0, |acc, item| acc + item)),
)
.unwrap();
assert_eq!(kept, "combined");
}
/// Builds each rate-shaping stage (`conflate`, `batch`, `expand`,
/// `aggregate_with_boundary`, `detach`) on top of `Flow::identity()`, chains a
/// trailing `map`, and checks the output when the flow is attached via `via`.
#[test]
fn runtime_rate_flows_compose_in_flow_form() {
    // Every stage below is followed by the same `+ 1` mapping.
    let conflating = Flow::identity()
        .conflate(|lhs: i32, rhs| lhs + rhs)
        .map(|value| value + 1);
    let conflated = Source::single(4).via(conflating).run_collect().unwrap();
    assert_eq!(conflated, vec![5]);

    let batching = Flow::identity()
        .batch(4, |seed: i32| seed, |lhs, rhs| lhs + rhs)
        .map(|value| value + 1);
    let batched = Source::single(4).via(batching).run_collect().unwrap();
    assert_eq!(batched, vec![5]);

    let expanding = Flow::identity()
        .expand(std::iter::once::<i32>)
        .map(|value| value + 1);
    let expanded = Source::from_iter(0..4)
        .via(expanding)
        .run_collect()
        .unwrap();
    assert_eq!(expanded, vec![1, 2, 3, 4]);

    // The aggregate reports "ready" after every push, so each input element
    // is summed and emitted on its own.
    let aggregating = Flow::identity()
        .aggregate_with_boundary(
            Vec::<i32>::new,
            |mut pending, value| {
                pending.push(value);
                let ready = !pending.is_empty();
                (pending, ready)
            },
            |pending| pending.into_iter().sum::<i32>(),
            None,
        )
        .map(|value| value + 1);
    let aggregated = Source::from_iter(0..4)
        .via(aggregating)
        .run_collect()
        .unwrap();
    assert_eq!(aggregated, vec![1, 2, 3, 4]);

    let detaching = Flow::identity().detach().map(|value: i32| value + 1);
    let detached = Source::from_iter(0..4)
        .via(detaching)
        .run_collect()
        .unwrap();
    assert_eq!(detached, vec![1, 2, 3, 4]);
}
/// Chains `drop`, `take`, `filter_not`, `map_concat` and `grouped` directly on
/// a `Source` and checks the combined output.
#[test]
fn high_use_source_flow_operators_work() {
    let pipeline = Source::from_iter(0..8)
        .drop(1)
        .take(5)
        .filter_not(|value| value % 2 == 0)
        .map_concat(|value| [value, value + 10])
        .grouped(3);
    let groups = pipeline.run_collect().unwrap();
    assert_eq!(groups, vec![vec![1, 11, 3], vec![13, 5, 15]]);
}
/// `prefix_and_tail(2)` emits exactly one `(prefix, tail)` pair; the tail
/// yields the remaining elements on its first run and fails on a second run.
#[test]
fn prefix_and_tail_emits_prefix_and_live_tail() {
    let mut pairs = Source::from_iter(0..5)
        .prefix_and_tail(2)
        .run_collect()
        .unwrap();
    assert_eq!(pairs.len(), 1);

    let (head, rest) = pairs.pop().unwrap();
    assert_eq!(head, vec![0, 1]);

    // First materialization (through a clone) drains what is left upstream.
    let first_run = rest.clone().run_collect();
    assert_eq!(first_run.unwrap(), vec![2, 3, 4]);

    // Materializing the tail again is rejected with a descriptive failure.
    let second_run = rest.run_collect();
    assert_eq!(
        second_run,
        Err(StreamError::Failed(
            "substream source cannot be materialized more than once".into()
        ))
    );
}
/// An upstream failure that arrives before the two-element prefix is complete
/// fails the whole `prefix_and_tail` stream with that same failure.
#[test]
fn prefix_and_tail_fails_before_prefix_is_ready() {
    let upstream = Source::from_factory(|| {
        Box::new(vec![Ok(1), Err(StreamError::Failed("boom".into())), Ok(2)].into_iter())
    });
    let outcome = upstream.prefix_and_tail(2).run_collect();
    assert!(matches!(outcome, Err(StreamError::Failed(message)) if message == "boom"));
}
/// When the upstream fails only after the prefix has been taken, the prefix
/// is still delivered and the failure surfaces when the tail is run.
#[test]
fn prefix_and_tail_tail_propagates_late_upstream_failure() {
    let upstream = Source::from_factory(|| {
        Box::new(vec![Ok(1), Ok(2), Err(StreamError::Failed("boom".into())), Ok(3)].into_iter())
    });
    let mut pairs = upstream.prefix_and_tail(2).run_collect().unwrap();

    let (head, rest) = pairs.pop().unwrap();
    assert_eq!(head, vec![1, 2]);

    let tail_outcome = rest.run_collect();
    assert_eq!(tail_outcome, Err(StreamError::Failed("boom".into())));
}
/// `prefix_and_tail` must work for an element type that does not implement
/// `Clone`.
#[test]
fn prefix_and_tail_accepts_non_clone_elements() {
    // Deliberately derives neither `Clone` nor `Copy`.
    #[derive(Debug, PartialEq, Eq)]
    struct NonClone(u8);

    let upstream = Source::from_factory(|| {
        Box::new(vec![Ok(NonClone(1)), Ok(NonClone(2)), Ok(NonClone(3))].into_iter())
    });
    let mut pairs = upstream.prefix_and_tail(2).run_collect().unwrap();

    let (head, rest) = pairs.pop().unwrap();
    assert_eq!(head, vec![NonClone(1), NonClone(2)]);
    assert_eq!(rest.run_collect().unwrap(), vec![NonClone(3)]);
}
/// If the upstream completes with fewer elements than the requested prefix
/// length, `flat_map_prefix` still invokes the factory with what it collected.
#[test]
fn flat_map_prefix_materializes_on_short_upstream_completion() {
    let collected = Source::from_iter([1, 2])
        .flat_map_prefix(3, |head| {
            // Emit the sum of the (short) prefix ahead of the identity flow.
            let total = head.into_iter().sum::<i32>();
            Flow::identity().prepend(Source::single(total))
        })
        .run_collect()
        .unwrap();
    assert_eq!(collected, vec![3]);
}
/// If the upstream fails while the prefix is still being gathered, the
/// `flat_map_prefix` factory must never be called and the failure propagates.
#[test]
fn flat_map_prefix_does_not_materialize_on_early_upstream_failure() {
    // Set to `true` by the factory closure if it is ever invoked.
    let factory_called = StdArc::new(StdAtomicBool::new(false));
    let factory_flag = StdArc::clone(&factory_called);

    let upstream = Source::from_factory(|| {
        Box::new(vec![Ok(1), Err(StreamError::Failed("boom".into()))].into_iter())
    });
    let outcome = upstream
        .flat_map_prefix(3, move |_prefix| {
            factory_flag.store(true, StdOrdering::SeqCst);
            Flow::identity()
        })
        .run_collect();

    assert_eq!(outcome, Err(StreamError::Failed("boom".into())));
    assert!(!factory_called.load(StdOrdering::SeqCst));
}
/// `flat_map_concat` runs each inner source to completion, in upstream order.
#[test]
fn flat_map_concat_flattens_nested_sources_sequentially() {
    let flattened = Source::from_iter([1, 2, 3])
        .flat_map_concat(|upper| Source::from_iter(0..upper))
        .run_collect()
        .unwrap();
    assert_eq!(flattened, vec![0, 0, 1, 0, 1, 2]);
}
/// Checks that `flat_map_merge(2, ..)` never has more than two inner futures
/// executing at the same time, and that all six results are still delivered.
#[test]
fn flat_map_merge_respects_breadth_bound() {
    // `active` counts inner futures currently executing; `max_active` records
    // the highest value `active` ever reached.
    let active = StdArc::new(StdAtomicUsize::new(0));
    let max_active = StdArc::new(StdAtomicUsize::new(0));
    let active_for_stage = StdArc::clone(&active);
    let max_for_stage = StdArc::clone(&max_active);
    let mut values = Source::from_iter(0..6)
        .flat_map_merge(2, move |item| {
            let active = StdArc::clone(&active_for_stage);
            let max_active = StdArc::clone(&max_for_stage);
            Source::future(move || {
                let active = StdArc::clone(&active);
                let max_active = StdArc::clone(&max_active);
                async move {
                    let now = active.fetch_add(1, StdOrdering::SeqCst) + 1;
                    // Atomically raise the high-water mark to `now` when it is
                    // larger; replaces a hand-written compare-exchange loop.
                    max_active.fetch_max(now, StdOrdering::SeqCst);
                    // Stay "active" for a while so overlapping inner sources
                    // would show up in the counter.
                    thread::sleep(StdDuration::from_millis(20));
                    active.fetch_sub(1, StdOrdering::SeqCst);
                    Ok(item)
                }
            })
        })
        .run_collect()
        .unwrap();
    // Only the set of results is asserted, not the merge order.
    values.sort_unstable();
    assert_eq!(values, vec![0, 1, 2, 3, 4, 5]);
    assert!(max_active.load(StdOrdering::SeqCst) <= 2);
}
/// A failing inner source fails the merged stream with the same error.
#[test]
fn flat_map_merge_propagates_inner_failures() {
    let merged = Source::from_iter([0, 1, 2]).flat_map_merge(2, |value| {
        // Only the middle element maps to a failing inner source.
        if value == 1 {
            Source::failed(StreamError::Failed("boom".into()))
        } else {
            Source::single(value)
        }
    });
    let outcome = merged.run_collect();
    assert_eq!(outcome, Err(StreamError::Failed("boom".into())));
}
/// Checks that `flat_map_merge` delivers the output of an inner source that is
/// already available while the upstream iterator is still blocked producing
/// its next element.
#[test]
fn flat_map_merge_emits_ready_inner_output_while_upstream_is_blocked() {
// Channel the test body uses to unblock the upstream's second element.
let (release_tx, release_rx) = mpsc::channel();
// Wrapped in `Arc<Mutex<..>>` so the receiver can be shared with the factory closure.
let release_rx = StdArc::new(std::sync::Mutex::new(Some(release_rx)));
let queue = Source::from_factory(move || {
let release_rx = StdArc::clone(&release_rx);
let mut step = 0_u8;
Box::new(std::iter::from_fn(move || {
let item = match step {
// First pull: emit 0 right away.
0 => Some(Ok(0)),
// Second pull: block (for at most 1 s) until the test sends the release signal.
1 => {
release_rx
.lock()
.unwrap()
.as_ref()
.expect("release receiver available")
.recv_timeout(StdDuration::from_secs(1))
.expect("timed out waiting to release second upstream element");
Some(Ok(1))
}
// Any later pull: the upstream is complete.
_ => None,
};
step += 1;
item
}))
})
.flat_map_merge(2, |item| Source::single(item + 10))
.run_with(Sink::queue())
.unwrap();
// 10 must be observable before the upstream has been released.
assert_eq!(queue.pull().unwrap(), Some(10));
release_tx.send(()).unwrap();
assert_eq!(queue.pull().unwrap(), Some(11));
assert!(queue.pull().unwrap().is_none());
}
/// Groups by parity: the even substream is run into `Sink::ignore` and its
/// completion handle dropped, after which the odd substream must still yield
/// `1` and `3` and the outer stream must complete.
#[test]
fn group_by_routes_keys_and_drops_closed_keys() {
    let substreams = Source::from_iter([0, 1, 2, 3, 4])
        .group_by(4, |value| value % 2, false)
        .run_with(Sink::queue())
        .unwrap();

    let even_stream = substreams.pull().unwrap().unwrap();
    let even_done = even_stream.run_with(Sink::ignore()).unwrap();
    let odd_stream = substreams.pull().unwrap().unwrap();
    // Drop the even completion handle before the odd substream is consumed.
    drop(even_done);

    assert_eq!(odd_stream.run_collect().unwrap(), vec![1, 3]);
    assert!(substreams.pull().unwrap().is_none());
}
/// With `max_substreams = 2`, a third distinct key fails the outer stream with
/// a message that names the limit.
#[test]
fn group_by_fails_when_distinct_key_limit_is_exceeded() {
    let substreams = Source::from_iter([0, 1, 2])
        .group_by(2, |value| *value, false)
        .run_with(Sink::queue())
        .unwrap();

    // The first two keys are accepted; the substreams themselves are discarded.
    let _ = substreams.pull().unwrap().unwrap();
    let _ = substreams.pull().unwrap().unwrap();

    let third = substreams.pull();
    assert!(matches!(
        third,
        Err(StreamError::Failed(message)) if message == "group_by reached max_substreams (2)"
    ));
}
/// With the third `group_by` argument set to `true`, a key whose substream was
/// already consumed via `Sink::head` shows up again as a new substream when a
/// later element with that key arrives.
#[test]
fn group_by_can_recreate_closed_substreams_when_enabled() {
// Channel the test body uses to unblock the upstream's third element.
let (release_tx, release_rx) = mpsc::channel();
let release_rx = StdArc::new(std::sync::Mutex::new(Some(release_rx)));
let outer = Source::from_factory(move || {
let release_rx = StdArc::clone(&release_rx);
let mut step = 0_u8;
Box::new(std::iter::from_fn(move || {
let item = match step {
// Elements 0 (even key) and 1 (odd key) are emitted without waiting.
0 => Some(Ok(0)),
1 => Some(Ok(1)),
// The second even element is held back (for at most 1 s) until released.
2 => {
release_rx
.lock()
.unwrap()
.as_ref()
.expect("release receiver available")
.recv_timeout(StdDuration::from_secs(1))
.expect("timed out waiting to release recreated key");
Some(Ok(0))
}
_ => None,
};
step += 1;
item
}))
})
.group_by(4, |item| item % 2, true)
.run_with(Sink::queue())
.unwrap();
// Consume only the head of the first even substream before releasing the upstream.
let even = outer.pull().unwrap().unwrap();
assert_eq!(wait(even.run_with(Sink::head()).unwrap()), 0);
release_tx.send(()).unwrap();
let odd = outer.pull().unwrap().unwrap();
assert_eq!(odd.run_collect().unwrap(), vec![1]);
// The late even element arrives on a freshly emitted substream.
let recreated_even = outer.pull().unwrap().unwrap();
assert_eq!(recreated_even.run_collect().unwrap(), vec![0]);
assert!(outer.pull().unwrap().is_none());
}
/// A panic inside the `group_by` key function must surface as
/// `StreamError::AbruptTermination` both on an already emitted substream and
/// on the outer stream.
#[test]
fn group_by_panicking_key_fn_abruptly_terminates_live_substreams() {
let outer = Source::from_iter([0, 1])
.group_by(
4,
|item| {
// Panics when the key function sees the second element.
assert_ne!(*item, 1, "boom");
item % 2
},
false,
)
.run_with(Sink::queue())
.unwrap();
let substream = outer.pull().unwrap().unwrap();
// Collect the substream on another thread so the test can bound the wait with a timeout.
let (result_tx, result_rx) = mpsc::channel();
thread::spawn(move || {
let _ = result_tx.send(substream.run_collect());
});
assert_eq!(
result_rx.recv_timeout(StdDuration::from_secs(1)).unwrap(),
Err(StreamError::AbruptTermination)
);
assert!(matches!(outer.pull(), Err(StreamError::AbruptTermination)));
}
/// With `split_when`, the element matching the predicate becomes the first
/// element of the next substream.
#[test]
fn split_when_starts_new_substream_on_boundary_element() {
    let segments = Source::from_iter([1, 2, 0, 3, 0, 4, 5])
        .split_when(|value| *value == 0)
        .run_with(Sink::queue())
        .unwrap();

    let segment = segments.pull().unwrap().unwrap();
    assert_eq!(segment.run_collect().unwrap(), vec![1, 2]);

    let segment = segments.pull().unwrap().unwrap();
    assert_eq!(segment.run_collect().unwrap(), vec![0, 3]);

    let segment = segments.pull().unwrap().unwrap();
    assert_eq!(segment.run_collect().unwrap(), vec![0, 4, 5]);

    assert!(segments.pull().unwrap().is_none());
}
/// With `split_after`, the element matching the predicate is the last element
/// of the current substream.
#[test]
fn split_after_ends_current_substream_on_boundary_element() {
    let segments = Source::from_iter([1, 2, 0, 3, 0, 4, 5])
        .split_after(|value| *value == 0)
        .run_with(Sink::queue())
        .unwrap();

    let segment = segments.pull().unwrap().unwrap();
    assert_eq!(segment.run_collect().unwrap(), vec![1, 2, 0]);

    let segment = segments.pull().unwrap().unwrap();
    assert_eq!(segment.run_collect().unwrap(), vec![3, 0]);

    let segment = segments.pull().unwrap().unwrap();
    assert_eq!(segment.run_collect().unwrap(), vec![4, 5]);

    assert!(segments.pull().unwrap().is_none());
}
/// A panic inside the `split_when` predicate must surface as
/// `StreamError::AbruptTermination` both on an already emitted substream and
/// on the outer stream.
#[test]
fn split_when_panicking_predicate_abruptly_terminates_live_substreams() {
let outer = Source::from_iter([1, 2])
.split_when(|item| {
// Panics when the predicate sees the second element.
assert_ne!(*item, 2, "boom");
false
})
.run_with(Sink::queue())
.unwrap();
let substream = outer.pull().unwrap().unwrap();
// Collect the substream on another thread so the test can bound the wait with a timeout.
let (result_tx, result_rx) = mpsc::channel();
thread::spawn(move || {
let _ = result_tx.send(substream.run_collect());
});
assert_eq!(
result_rx.recv_timeout(StdDuration::from_secs(1)).unwrap(),
Err(StreamError::AbruptTermination)
);
assert!(matches!(outer.pull(), Err(StreamError::AbruptTermination)));
}
/// Splitting `0..100` at every non-zero multiple of ten must give ten
/// non-empty segments.
#[test]
fn split_when_pre_buffer_segments_match_expected_count() {
    let segments = Source::from_iter(0..100)
        .split_when(|value| *value != 0 && *value % 10 == 0)
        .run_with(Sink::queue())
        .unwrap();

    let mut seen_segments = 0;
    while let Some(segment) = segments.pull().unwrap() {
        let contents: Vec<i32> = segment.run_collect().unwrap();
        assert!(!contents.is_empty(), "segment should not be empty");
        seen_segments += 1;
    }

    assert_eq!(seen_segments, 10, "100 elements in segments of 10");
}
/// Splitting `0..100` after every tenth element must give ten non-empty
/// segments that together contain all 100 elements.
#[test]
fn split_after_pre_buffer_segments_match_expected_count() {
    let outer = Source::from_iter(0..100)
        .split_after(|item| (*item + 1) % 10 == 0)
        .run_with(Sink::queue())
        .unwrap();
    let mut segment_count = 0;
    // Element counts are accumulated as `usize`, the type `len()` returns, so
    // no narrowing `as` cast is needed.
    let mut total = 0_usize;
    while let Some(substream) = outer.pull().unwrap() {
        let items: Vec<i32> = substream.run_collect().unwrap();
        assert!(!items.is_empty(), "segment should not be empty");
        total += items.len();
        segment_count += 1;
    }
    assert_eq!(segment_count, 10);
    assert_eq!(total, 100);
}
/// With a single constant key, `group_by` must deliver all 1000 elements, in
/// order, on one substream and then complete the outer stream.
#[test]
fn group_by_single_key_fused_matches_general_path() {
    let substreams = Source::from_iter(0..1000i64)
        .group_by(1, |_| 0u8, false)
        .run_with(Sink::queue())
        .unwrap();

    let only = substreams.pull().unwrap().unwrap();
    let contents: Vec<i64> = only.run_collect().unwrap();

    assert_eq!(contents.len(), 1000);
    // The length was checked above, so first/last are elements 0 and 999.
    assert_eq!(contents.first(), Some(&0));
    assert_eq!(contents.last(), Some(&999));
    assert!(substreams.pull().unwrap().is_none());
}
/// Input `[0, 1, 0]` keyed by value must give exactly two substreams, with the
/// second `0` routed back to the first one.
#[test]
fn group_by_single_key_fused_handles_key_change_with_substream_limit() {
    let substreams = Source::from_iter([0, 1, 0])
        .group_by(2, |value| *value, false)
        .run_with(Sink::queue())
        .unwrap();

    // Drain the outer stream completely before running any substream.
    let mut emitted = Vec::new();
    while let Some(substream) = substreams.pull().unwrap() {
        emitted.push(substream);
    }

    assert_eq!(emitted.len(), 2);
    let zeros = emitted[0].clone().run_collect().unwrap();
    assert_eq!(zeros, vec![0, 0]);
    let ones = emitted[1].clone().run_collect().unwrap();
    assert_eq!(ones, vec![1]);
}
/// Twenty single-element inner sources merged with breadth 2 must yield
/// exactly twenty elements through a queue sink.
#[test]
fn flat_map_merge_lock_lighter_matches_expected_count() {
    let merged = Source::from_iter(0..20)
        .flat_map_merge(2, |value| Source::single(value + 100))
        .run_with(Sink::queue())
        .unwrap();

    // Pull until the queue reports completion, counting the elements seen.
    let pulled = std::iter::from_fn(|| merged.pull().unwrap()).count();
    assert_eq!(pulled, 20);
}
/// Checks that `group_by` hands out the substream as soon as the first element
/// is available, while the upstream is still open.
#[test]
fn group_by_single_key_emits_substream_before_upstream_completes() {
// Capacity 0 makes this a rendezvous channel: every `send` blocks until the
// upstream iterator performs the matching `recv`.
let (tx, rx) = mpsc::sync_channel::<i32>(0);
let rx = StdArc::new(std::sync::Mutex::new(rx));
let outer = Source::from_factory({
let rx = StdArc::clone(&rx);
move || {
let rx = StdArc::clone(&rx);
// The upstream yields whatever arrives on the channel and ends once the sender is dropped.
Box::new(std::iter::from_fn(move || {
rx.lock().unwrap().recv().ok().map(Ok)
})) as BoxStream<i32>
}
})
.group_by(1, |_| 0u8, false)
.run_with(Sink::queue())
.unwrap();
// A helper thread pulls the substream and passes it back to the test thread.
let (sub_tx, sub_rx) = mpsc::channel::<Source<i32>>();
let outer_thread = thread::spawn(move || {
let substream = outer.pull().unwrap().expect("expected a substream");
sub_tx.send(substream).unwrap();
});
tx.send(0).unwrap();
// Only one element has been sent and `tx` is still open at this point.
let substream = sub_rx
.recv_timeout(StdDuration::from_secs(5))
.expect("timed out — group_by buffered first element before emitting substream");
for i in 1..100_i32 {
tx.send(i).unwrap();
}
// Dropping the sender ends the upstream so `run_collect` can finish.
drop(tx);
let items: Vec<i32> = substream.run_collect().unwrap();
assert_eq!(items.len(), 100);
outer_thread.join().unwrap();
}
/// Stress test: many threads each drive their own single-key `group_by`
/// stream one element at a time and require every sent element to be pullable
/// from the substream queue before the next one is sent.
#[test]
fn group_by_concurrent_live_substreams_do_not_hold_ready_item_stress() {
// Worker threads per round, number of rounds, and elements sent per stream.
const STREAMS: usize = 32;
const ROUNDS: usize = 8;
const ITEMS: i64 = 8;
for _ in 0..ROUNDS {
// The barrier makes all workers of a round start feeding their streams together.
let barrier = StdArc::new(std::sync::Barrier::new(STREAMS));
let mut handles = Vec::with_capacity(STREAMS);
for _ in 0..STREAMS {
let barrier = StdArc::clone(&barrier);
handles.push(thread::spawn(move || {
// Capacity 0: each `send` blocks until the upstream iterator receives it.
let (tx, rx) = mpsc::sync_channel::<i64>(0);
let rx = StdArc::new(std::sync::Mutex::new(rx));
let outer = Source::from_factory({
let rx = StdArc::clone(&rx);
move || {
let rx = StdArc::clone(&rx);
Box::new(std::iter::from_fn(move || {
rx.lock().unwrap().recv().ok().map(Ok)
})) as BoxStream<i64>
}
})
.group_by(1, |_| 0_u8, false)
.run_with(Sink::queue())
.unwrap();
barrier.wait();
tx.send(0).unwrap();
let substream = outer.pull().unwrap().expect("expected group_by substream");
let subqueue = substream.run_with(Sink::queue()).unwrap();
assert_eq!(subqueue.pull().unwrap(), Some(0));
// Lock-step: send one element, then require it to come out of the substream.
for item in 1..ITEMS {
tx.send(item).unwrap();
assert_eq!(subqueue.pull().unwrap(), Some(item));
}
// Closing the sender ends the upstream; both queues must then complete.
drop(tx);
assert!(subqueue.pull().unwrap().is_none());
assert!(outer.pull().unwrap().is_none());
}));
}
for handle in handles {
handle.join().expect("group_by stress worker panicked");
}
}
}
/// A 300-element first segment followed by a `-1` boundary and one more
/// element must be fully consumable through `split_when` within the timeout.
#[test]
fn split_when_emits_substream_before_segment_ends() {
    const SEGMENT_LEN: usize = 300;

    // Pre-fill the channel: first segment, the boundary marker, one trailing
    // element; then close it so the upstream terminates.
    let (tx, rx) = mpsc::sync_channel::<i32>(SEGMENT_LEN * 2 + 4);
    for value in 0..SEGMENT_LEN as i32 {
        tx.send(value).unwrap();
    }
    tx.send(-1).unwrap();
    tx.send(99).unwrap();
    drop(tx);

    let segments = Source::from_iter(rx)
        .split_when(|value| *value == -1)
        .run_with(Sink::queue())
        .unwrap();

    // Consume on a worker thread so a stall shows up as a timeout below.
    let (report_tx, report_rx) = mpsc::channel();
    thread::spawn(move || {
        let leading = segments.pull().unwrap().expect("expected first substream");
        let leading_items: Vec<i32> = leading.run_collect().unwrap();
        let trailing = segments.pull().unwrap().expect("expected second substream");
        let trailing_items: Vec<i32> = trailing.run_collect().unwrap();
        let finished = segments.pull().unwrap().is_none();
        let _ = report_tx.send((leading_items, trailing_items, finished));
    });

    let (leading_items, trailing_items, finished) = report_rx
        .recv_timeout(StdDuration::from_secs(5))
        .expect("timed out — split_when is buffering the whole segment");
    assert_eq!(leading_items.len(), SEGMENT_LEN);
    assert_eq!(trailing_items, vec![-1, 99]);
    assert!(finished);
}
/// A 300-element run followed by a `-1` boundary and one more element must be
/// fully consumable through `split_after` within the timeout; the boundary
/// belongs to the first segment.
#[test]
fn split_after_emits_substream_before_segment_ends() {
    const SEGMENT_LEN: usize = 300;

    // Pre-fill the channel: first run, the boundary marker, one trailing
    // element; then close it so the upstream terminates.
    let (tx, rx) = mpsc::sync_channel::<i32>(SEGMENT_LEN * 2 + 4);
    for value in 0..SEGMENT_LEN as i32 {
        tx.send(value).unwrap();
    }
    tx.send(-1).unwrap();
    tx.send(99).unwrap();
    drop(tx);

    let segments = Source::from_iter(rx)
        .split_after(|value| *value == -1)
        .run_with(Sink::queue())
        .unwrap();

    // Consume on a worker thread so a stall shows up as a timeout below.
    let (report_tx, report_rx) = mpsc::channel();
    thread::spawn(move || {
        let leading = segments.pull().unwrap().expect("expected first substream");
        let leading_items: Vec<i32> = leading.run_collect().unwrap();
        let trailing = segments.pull().unwrap().expect("expected second substream");
        let trailing_items: Vec<i32> = trailing.run_collect().unwrap();
        let finished = segments.pull().unwrap().is_none();
        let _ = report_tx.send((leading_items, trailing_items, finished));
    });

    let (leading_items, trailing_items, finished) = report_rx
        .recv_timeout(StdDuration::from_secs(5))
        .expect("timed out — split_after is buffering the whole segment");
    assert_eq!(leading_items.len(), SEGMENT_LEN + 1);
    assert_eq!(trailing_items, vec![99]);
    assert!(finished);
}
/// Repeats a breadth-8 merge of fifty three-element inner sources twenty times
/// and checks the folded sum on every run.
#[test]
fn flat_map_merge_coordinator_no_lost_wakeup_stress() {
    for _ in 0..20 {
        let completion = Source::from_iter(0..50_i32)
            .flat_map_merge(8, |start| Source::from_iter(start..start + 3))
            .run_with(Sink::fold(0i64, |sum, value| sum + value as i64))
            .unwrap();
        // Sum over start in 0..50 of (3 * start + 3) = 3825.
        assert_eq!(
            completion.wait(),
            Ok(3825),
            "flat_map_merge produced wrong sum"
        );
    }
}
/// Repeats a breadth-16 merge of one hundred two-element inner sources twenty
/// times and checks the folded sum on every run.
#[test]
fn flat_map_merge_single_mutex_race_stress() {
    for _ in 0..20 {
        let completion = Source::from_iter(0..100_i64)
            .flat_map_merge(16, |value| Source::from_iter([value, value + 1000]))
            .run_with(Sink::fold(0i64, |sum, next| sum + next))
            .unwrap();
        // Sum over value in 0..100 of (2 * value + 1000) = 109_900.
        assert_eq!(
            completion.wait(),
            Ok(109_900),
            "flat_map_merge single-mutex stress"
        );
    }
}
/// A 100-element segment, a `-1` boundary and ten further elements must be
/// fully consumable through `split_when` within the timeout, with the boundary
/// opening the second segment.
#[test]
fn split_when_bounded_memory_rendezvous() {
    const SEGMENT: usize = 100;

    // Pre-fill the channel, then close it so the upstream terminates.
    let (tx, rx) = mpsc::sync_channel::<i32>(SEGMENT * 4);
    for value in 0..SEGMENT as i32 {
        tx.send(value).unwrap();
    }
    tx.send(-1).unwrap();
    for value in 0..10_i32 {
        tx.send(value).unwrap();
    }
    drop(tx);

    let segments = Source::from_iter(rx)
        .split_when(|value| *value == -1)
        .run_with(Sink::queue())
        .unwrap();

    // Consume on a worker thread so a stall shows up as a timeout below.
    let (report_tx, report_rx) = mpsc::channel();
    thread::spawn(move || {
        let leading = segments.pull().unwrap().expect("first segment");
        let leading_items: Vec<i32> = leading.run_collect().unwrap();
        let trailing = segments.pull().unwrap().expect("second segment");
        let trailing_items: Vec<i32> = trailing.run_collect().unwrap();
        let finished = segments.pull().unwrap().is_none();
        report_tx
            .send((leading_items, trailing_items, finished))
            .unwrap();
    });

    let (leading_items, trailing_items, finished) = report_rx
        .recv_timeout(StdDuration::from_secs(5))
        .expect("timed out — split_when writer held items past LIVE_SUBSTREAM_BATCH");
    assert_eq!(leading_items.len(), SEGMENT, "first segment length");
    assert_eq!(
        trailing_items[0], -1,
        "boundary element starts second segment"
    );
    assert_eq!(
        trailing_items.len(),
        11,
        "second segment: boundary + 10 items"
    );
    assert!(finished);
}
/// A single-key `group_by` over 200 elements must deliver all of them, in
/// order, on one substream within the timeout.
#[test]
fn group_by_single_key_bounded_memory_rendezvous() {
    const N: usize = 200;

    let substreams = Source::from_iter(0..N as i64)
        .group_by(1, |_| 0u8, false)
        .run_with(Sink::queue())
        .unwrap();

    // Consume on a worker thread so a stall shows up as a timeout below.
    let (report_tx, report_rx) = mpsc::channel();
    thread::spawn(move || {
        let only = substreams.pull().unwrap().expect("substream");
        let contents: Vec<i64> = only.run_collect().unwrap();
        let finished = substreams.pull().unwrap().is_none();
        report_tx.send((contents, finished)).unwrap();
    });

    let (contents, finished) = report_rx
        .recv_timeout(StdDuration::from_secs(5))
        .expect("timed out — group_by write batch held items beyond LIVE_SUBSTREAM_BATCH");
    assert_eq!(contents.len(), N, "all items delivered");
    assert_eq!(contents[0], 0);
    assert_eq!(contents[N - 1], (N - 1) as i64);
    assert!(finished);
}
/// `scan` emits the seed first, followed by every running total.
#[test]
fn scan_emits_seed_and_accumulated_values() {
    let running_totals = Source::from_iter(1..=3)
        .scan(0, |total, value| total + value)
        .run_collect()
        .unwrap();
    assert_eq!(running_totals, vec![0, 1, 3, 6]);
}
/// Three elements through `limit(2)` fail with `LimitExceeded { max: 2 }`.
#[test]
fn limit_fails_after_max_elements() {
    let limited = Source::from_iter(0..3).limit(2);
    let outcome = limited.run_collect();
    assert_eq!(outcome, Err(StreamError::LimitExceeded { max: 2 }));
}
/// `limit_weighted` with string length as the weight fails with
/// `LimitExceeded` carrying the configured maximum once the total exceeds it.
#[test]
fn limit_weighted_fails_with_limit_error_like_akka() {
    // Lengths 4 + 2 + 4 + 6 = 16, which is above the limit of 15.
    let by_length = Flow::identity().limit_weighted(15, |word: &&str| word.len());
    let outcome = Source::from_iter(["this", "is", "some", "string"])
        .via(by_length)
        .run_collect();
    assert_eq!(outcome, Err(StreamError::LimitExceeded { max: 15 }));
}
/// A first element heavier than the weight limit is emitted as a group of its
/// own rather than failing the stream.
#[test]
fn grouped_weighted_allows_oversized_first_element_like_akka() {
    let by_value = Flow::identity().grouped_weighted(5, |weight: &usize| *weight);
    let groups = Source::from_iter([10_usize, 1, 2])
        .via(by_value)
        .run_collect()
        .unwrap();
    assert_eq!(groups, vec![vec![10], vec![1, 2]]);
}
/// An oversized element that arrives while a group is open is added to that
/// group, which is then closed.
#[test]
fn grouped_weighted_keeps_oversized_later_element_in_current_group_like_akka() {
    let by_value = Flow::identity().grouped_weighted(5, |weight: &usize| *weight);
    let groups = Source::from_iter([1_usize, 10, 2])
        .via(by_value)
        .run_collect()
        .unwrap();
    assert_eq!(groups, vec![vec![1, 10], vec![2]]);
}
/// `Sink::fold`, `Sink::head` and `Sink::last` each materialize a completion
/// that resolves to the expected value for `1..=4`.
#[test]
fn sink_terminals_materialize_results() {
    let folded = Source::from_iter(1..=4)
        .run_with(Sink::fold(0, |total, value| total + value))
        .unwrap();
    assert_eq!(wait(folded), 10);

    let first = Source::from_iter(1..=4).run_with(Sink::head()).unwrap();
    assert_eq!(wait(first), 1);

    let final_item = Source::from_iter(1..=4).run_with(Sink::last()).unwrap();
    assert_eq!(wait(final_item), 4);
}
/// Runs `collect`, `head_option`, `last_option`, `reduce` and `foreach` sinks
/// and checks the value each completion resolves to.
#[test]
fn all_terminal_sink_variants_complete() {
    let collected = wait(
        Source::from_iter([1, 2, 3])
            .run_with(Sink::collect())
            .unwrap(),
    );
    assert_eq!(collected, vec![1, 2, 3]);

    let first_of_empty = wait(
        Source::<i32>::empty()
            .run_with(Sink::head_option())
            .unwrap(),
    );
    assert_eq!(first_of_empty, None);

    let final_item = wait(
        Source::from_iter([1, 2, 3])
            .run_with(Sink::last_option())
            .unwrap(),
    );
    assert_eq!(final_item, Some(3));

    let reduced = wait(
        Source::from_iter([1, 2, 3])
            .run_with(Sink::reduce(|total, value| total + value))
            .unwrap(),
    );
    assert_eq!(reduced, 6);

    // `foreach` resolves to `NotUsed`; its effect is observed via the counter.
    let running_sum = StdArc::new(StdAtomicUsize::new(0));
    let sink_sum = StdArc::clone(&running_sum);
    let visited = wait(
        Source::from_iter([1_usize, 2, 3])
            .run_with(Sink::foreach(move |value| {
                sink_sum.fetch_add(value, StdOrdering::SeqCst);
            }))
            .unwrap(),
    );
    assert_eq!(visited, NotUsed);
    assert_eq!(running_sum.load(StdOrdering::SeqCst), 6);
}
/// `Sink::take_last(0)` resolves to an empty vector.
#[test]
fn take_last_zero_returns_empty_vector() {
    let completion = Source::from_iter([1, 2, 3])
        .run_with(Sink::take_last(0))
        .unwrap();
    let kept = wait(completion);
    assert_eq!(kept, Vec::<i32>::new());
}
/// `Sink::head` / `Sink::head_option` on these sources leave no active stream
/// on the materializer and have their result ready right after materialization.
#[test]
fn bounded_head_terminals_complete_inline() {
    let materializer = Materializer::new();

    let mut plain = Source::from_iter(0_u64..1_000)
        .run_with_materializer(Sink::head(), &materializer)
        .unwrap();
    assert_eq!(materializer.active_streams(), 0);
    assert_eq!(plain.try_wait(), Some(Ok(0)));

    let mut after_filter = Source::from_iter(0_u64..1_000)
        .filter(|value| *value >= 10)
        .run_with_materializer(Sink::head(), &materializer)
        .unwrap();
    assert_eq!(materializer.active_streams(), 0);
    assert_eq!(after_filter.try_wait(), Some(Ok(10)));

    let mut of_empty = Source::<u64>::empty()
        .run_with_materializer(Sink::head_option(), &materializer)
        .unwrap();
    assert_eq!(materializer.active_streams(), 0);
    assert_eq!(of_empty.try_wait(), Some(Ok(None)));
}
/// `Sink::head` reports `EmptyStream` for an empty source and the upstream
/// failure for a failed source, without leaving an active stream behind.
#[test]
fn bounded_head_fast_path_preserves_terminal_errors() {
    let materializer = Materializer::new();

    let mut of_empty = Source::<u64>::empty()
        .run_with_materializer(Sink::head(), &materializer)
        .unwrap();
    assert_eq!(of_empty.try_wait(), Some(Err(StreamError::EmptyStream)));

    let mut of_failed = Source::<u64>::failed(StreamError::Failed("boom".into()))
        .run_with_materializer(Sink::head(), &materializer)
        .unwrap();
    let failure = of_failed.try_wait();
    assert_eq!(failure, Some(Err(StreamError::Failed("boom".into()))));

    assert_eq!(materializer.active_streams(), 0);
}
/// Covers three ways of closing a source into a runnable graph: `to_mat` with
/// `Keep::right`, `to` followed by `map_materialized_value`, and plain `to`.
#[test]
fn runnable_graph_composes_source_and_sink() {
    let summing = Source::from_iter(1..=4)
        .map(|value| value * 2)
        .to_mat(Sink::fold(0, |total, value| total + value), Keep::right);
    assert_eq!(wait(summing.run().unwrap()), 20);

    // `to` keeps the source's materialized value, which is then mapped again.
    let bumped = Source::single(1)
        .map_materialized_value(|_| 20)
        .to(Sink::ignore())
        .map_materialized_value(|value| value + 1);
    assert_eq!(bumped.run().unwrap(), 21);

    let plain = Source::single(1).to(Sink::ignore()).run().unwrap();
    assert_eq!(plain, NotUsed);
}
/// `via`/`to` keep the source's materialized value, `Keep::both` pairs values
/// up, and `run_with` returns the sink's materialized value.
#[test]
fn materialized_values_follow_keep_defaults() {
    let tagged_source = Source::single(1).map_materialized_value(|_| "source");
    let tagged_flow = Flow::identity().map_materialized_value(|_| "flow");

    // Default combination: the source's value wins.
    let defaulted = tagged_source
        .clone()
        .via(tagged_flow.clone())
        .to(Sink::ignore())
        .run();
    assert_eq!(defaulted.unwrap(), "source");

    // Explicit `Keep::both` at both junctions nests the values.
    let paired = tagged_source
        .via_mat(tagged_flow, Keep::both)
        .to_mat(Sink::ignore(), Keep::both)
        .run()
        .unwrap();
    assert_eq!(paired.0, ("source", "flow"));
    assert_eq!(wait(paired.1), NotUsed);

    let from_sink = Source::single(41)
        .map_materialized_value(|_| "ignored source")
        .run_with(Sink::fold(1, |total, value| total + value))
        .unwrap();
    assert_eq!(wait(from_sink), 42);
}
/// `Flow::to` keeps the flow's materialized value, and `to_mat(.., Keep::both)`
/// exposes both the flow's and the sink's values when run from the source side.
#[test]
fn flow_to_sink_preserves_flow_materialized_value_by_default() {
    let flow_sink = Flow::identity()
        .map(|value: i32| value + 1)
        .map_materialized_value(|_| "flow")
        .to(Sink::fold(0, |total, value| total + value));
    let defaulted = Source::from_iter([1, 2, 3]).run_with(flow_sink).unwrap();
    assert_eq!(defaulted, "flow");

    // Running from the sink side with a source yields `NotUsed`.
    let sink_side = Flow::identity()
        .map(|value: i32| value + 1)
        .map_materialized_value(|_| "flow")
        .to_mat(Sink::fold(0, |total, value| total + value), Keep::both)
        .run_with(Source::from_iter([1, 2, 3]))
        .unwrap();
    assert_eq!(sink_side, NotUsed);

    // Running from the source side yields the `(flow, sink)` pair.
    let paired_sink = Flow::identity()
        .map(|value: i32| value + 1)
        .map_materialized_value(|_| "flow")
        .to_mat(Sink::fold(0, |total, value| total + value), Keep::both);
    let source_side = Source::from_iter([1, 2, 3]).run_with(paired_sink).unwrap();
    assert_eq!(source_side.0, "flow");
    assert_eq!(wait(source_side.1), 9);
}
/// After the original materializer is shut down, running a graph through a
/// handle derived with `with_name_prefix` fails with `AbruptTermination`,
/// while the handle still reports its prefix.
#[test]
fn materializer_shutdown_fails_materialization() {
    let materializer = Materializer::new();
    let prefixed = materializer.with_name_prefix("test-stream");
    materializer.shutdown();

    let graph = Source::single(1).to(Sink::ignore());
    assert_eq!(prefixed.name_prefix(), "test-stream");

    let outcome = graph.run_with_materializer(&prefixed);
    assert_eq!(outcome, Err(StreamError::AbruptTermination));
}
/// Shutting the materializer down while an endless stream is running fails
/// that stream's completion and clears the active-stream count.
#[test]
fn materializer_shutdown_fails_running_stream_completion() {
    let materializer = Materializer::new();
    let endless = Source::repeat(1)
        .run_with_materializer(Sink::ignore(), &materializer)
        .unwrap();
    assert_eq!(materializer.active_streams(), 1);

    materializer.shutdown();

    let outcome = endless.wait();
    assert_eq!(outcome, Err(StreamError::AbruptTermination));
    assert_eq!(materializer.active_streams(), 0);
}
/// Dropping the completion handle of an endless stream must bring the
/// materializer's active-stream count back to zero within the polling window.
#[test]
fn dropped_stream_completion_cancels_running_stream() {
    let materializer = Materializer::new();
    let endless = Source::repeat(1)
        .run_with_materializer(Sink::ignore(), &materializer)
        .unwrap();
    assert_eq!(materializer.active_streams(), 1);

    drop(endless);

    // Poll at most 50 times, 5 ms apart, for the stream to wind down.
    let mut polls = 0;
    while polls < 50 && materializer.active_streams() != 0 {
        thread::sleep(Duration::from_millis(5));
        polls += 1;
    }
    assert_eq!(materializer.active_streams(), 0);
}
/// Exercises the materializer's timer API: a one-shot timer fires, a cancelled
/// one-shot timer does not, fixed-delay and fixed-rate timers run at least
/// once, and a pending timer does not fire after `shutdown`.
#[test]
fn runtime_timers_fire_cancel_and_stop_on_shutdown() {
let materializer = Materializer::new();
// One-shot timer: must fire within 250 ms and must not report itself as cancelled.
let (once_tx, once_rx) = mpsc::channel();
let once = materializer.schedule_once(Duration::from_millis(5), move || {
once_tx.send(()).unwrap();
});
once_rx.recv_timeout(Duration::from_millis(250)).unwrap();
assert!(!once.is_cancelled());
// Cancelled one-shot timer: the first `cancel` returns true, a repeated one
// returns false, and the callback never sends.
let (cancelled_tx, cancelled_rx) = mpsc::channel();
let cancelled = materializer.schedule_once(Duration::from_millis(25), move || {
cancelled_tx.send(()).unwrap();
});
assert!(cancelled.cancel());
assert!(!cancelled.cancel());
assert!(cancelled.is_cancelled());
assert!(
cancelled_rx
.recv_timeout(Duration::from_millis(75))
.is_err()
);
// Fixed-delay timer: must have run at least once after 25 ms.
let fixed_delay_count = StdArc::new(StdAtomicUsize::new(0));
let fixed_delay_task_count = StdArc::clone(&fixed_delay_count);
let fixed_delay = materializer.schedule_with_fixed_delay(
Duration::from_millis(1),
Duration::from_millis(5),
move || {
fixed_delay_task_count.fetch_add(1, StdOrdering::SeqCst);
},
);
thread::sleep(Duration::from_millis(25));
assert!(fixed_delay_count.load(StdOrdering::SeqCst) > 0);
fixed_delay.cancel();
// Fixed-rate timer: must have run at least once after 25 ms.
let fixed_rate_count = StdArc::new(StdAtomicUsize::new(0));
let fixed_rate_task_count = StdArc::clone(&fixed_rate_count);
let fixed_rate = materializer.schedule_at_fixed_rate(
Duration::from_millis(1),
Duration::from_millis(5),
move || {
fixed_rate_task_count.fetch_add(1, StdOrdering::SeqCst);
},
);
thread::sleep(Duration::from_millis(25));
assert!(fixed_rate_count.load(StdOrdering::SeqCst) > 0);
fixed_rate.cancel();
// Shutdown: a timer that is still pending on a separate materializer must not
// fire once that materializer has been shut down.
let shutdown_materializer = Materializer::new();
let (shutdown_tx, shutdown_rx) = mpsc::channel();
shutdown_materializer.schedule_once(Duration::from_millis(25), move || {
shutdown_tx.send(()).unwrap();
});
shutdown_materializer.shutdown();
assert!(shutdown_rx.recv_timeout(Duration::from_millis(75)).is_err());
}
/// Compares fixed-rate and fixed-delay scheduling when the first task run
/// takes longer than the interval: the fixed-rate timer must start its second
/// run less than one interval after the first run completes, while the
/// fixed-delay timer must wait at least one full interval.
#[test]
fn runtime_timer_driver_preserves_fixed_rate_cadence_under_slow_tasks() {
use std::sync::{Condvar, Mutex};
// Events reported by the timer tasks: run number plus the instant it was recorded.
#[derive(Debug)]
enum TimerEvent {
Started(usize, Instant),
Completed(usize, Instant),
}
// Receives the next event, panicking with `label` if none arrives within 20 s.
let recv_event = |rx: &mpsc::Receiver<TimerEvent>, label: &str| {
rx.recv_timeout(Duration::from_secs(20))
.unwrap_or_else(|err| panic!("{label}: expected timer event within 20 s: {err}"))
};
// Opens a gate: sets its flag and wakes every waiter. A poisoned lock is tolerated.
let release = |gate: &StdArc<(Mutex<bool>, Condvar)>| {
let (released, condvar) = &**gate;
let mut released = released.lock().unwrap_or_else(|poison| poison.into_inner());
*released = true;
condvar.notify_all();
};
// The first run of each timer is held for `overrun`, i.e. longer than one interval.
let interval = Duration::from_secs(2);
let overrun = interval + Duration::from_millis(250);
// --- Fixed-rate timer ---
let rate_materializer = Materializer::new();
let (rate_tx, rate_rx) = mpsc::channel();
let rate_runs = StdArc::new(StdAtomicUsize::new(0));
let rate_task_runs = StdArc::clone(&rate_runs);
let rate_gate = StdArc::new((Mutex::new(false), Condvar::new()));
let rate_task_gate = StdArc::clone(&rate_gate);
let fixed_rate =
rate_materializer.schedule_at_fixed_rate(Duration::ZERO, interval, move || {
// Every run reports its start; only the first run blocks on the gate
// and reports its completion.
let run = rate_task_runs.fetch_add(1, StdOrdering::SeqCst) + 1;
rate_tx
.send(TimerEvent::Started(run, Instant::now()))
.unwrap();
if run == 1 {
let (released, condvar) = &*rate_task_gate;
let mut released = released.lock().unwrap_or_else(|poison| poison.into_inner());
while !*released {
released = condvar
.wait(released)
.unwrap_or_else(|poison| poison.into_inner());
}
rate_tx
.send(TimerEvent::Completed(run, Instant::now()))
.unwrap();
}
});
let rate_first_started = match recv_event(&rate_rx, "fixed-rate first task") {
TimerEvent::Started(1, at) => at,
other => panic!("fixed-rate first task: unexpected event {other:?}"),
};
// Keep the first run blocked until it has overrun the interval, then release it.
assert!(wait_until(Duration::from_secs(20), || {
rate_first_started.elapsed() >= overrun
}));
release(&rate_gate);
let rate_first_completed = match recv_event(&rate_rx, "fixed-rate first completion") {
TimerEvent::Completed(1, at) => at,
other => panic!("fixed-rate first completion: unexpected event {other:?}"),
};
let rate_second_started = match recv_event(&rate_rx, "fixed-rate second task") {
TimerEvent::Started(2, at) => at,
other => panic!("fixed-rate second task: unexpected event {other:?}"),
};
fixed_rate.cancel();
rate_materializer.shutdown();
// --- Fixed-delay timer: same scenario on a separate materializer ---
let delay_materializer = Materializer::new();
let (delay_tx, delay_rx) = mpsc::channel();
let delay_runs = StdArc::new(StdAtomicUsize::new(0));
let delay_task_runs = StdArc::clone(&delay_runs);
let delay_gate = StdArc::new((Mutex::new(false), Condvar::new()));
let delay_task_gate = StdArc::clone(&delay_gate);
let fixed_delay =
delay_materializer.schedule_with_fixed_delay(Duration::ZERO, interval, move || {
let run = delay_task_runs.fetch_add(1, StdOrdering::SeqCst) + 1;
delay_tx
.send(TimerEvent::Started(run, Instant::now()))
.unwrap();
if run == 1 {
let (released, condvar) = &*delay_task_gate;
let mut released = released.lock().unwrap_or_else(|poison| poison.into_inner());
while !*released {
released = condvar
.wait(released)
.unwrap_or_else(|poison| poison.into_inner());
}
delay_tx
.send(TimerEvent::Completed(run, Instant::now()))
.unwrap();
}
});
let delay_first_started = match recv_event(&delay_rx, "fixed-delay first task") {
TimerEvent::Started(1, at) => at,
other => panic!("fixed-delay first task: unexpected event {other:?}"),
};
assert!(wait_until(Duration::from_secs(20), || {
delay_first_started.elapsed() >= overrun
}));
release(&delay_gate);
let delay_first_completed = match recv_event(&delay_rx, "fixed-delay first completion") {
TimerEvent::Completed(1, at) => at,
other => panic!("fixed-delay first completion: unexpected event {other:?}"),
};
let delay_second_started = match recv_event(&delay_rx, "fixed-delay second task") {
TimerEvent::Started(2, at) => at,
other => panic!("fixed-delay second task: unexpected event {other:?}"),
};
fixed_delay.cancel();
delay_materializer.shutdown();
// Durations derived from the recorded instants.
let rate_task_time = rate_first_completed.duration_since(rate_first_started);
let rate_catch_up = rate_second_started.duration_since(rate_first_completed);
let delay_task_time = delay_first_completed.duration_since(delay_first_started);
let delay_gap = delay_second_started.duration_since(delay_first_completed);
// Sanity check: the first fixed-rate run really did overrun the interval.
assert!(
rate_task_time >= interval,
"fixed-rate first task should overrun its interval; ran for {rate_task_time:?}"
);
// Fixed rate: the second run starts less than one interval after the first completes.
assert!(
rate_catch_up < interval,
"fixed-rate second task should catch up after an overrun; waited {rate_catch_up:?}"
);
// Sanity check: the first fixed-delay run really did overrun the interval.
assert!(
delay_task_time >= interval,
"fixed-delay first task should overrun its interval; ran for {delay_task_time:?}"
);
// Fixed delay: the second run starts at least one interval after the first completes.
assert!(
delay_gap >= interval,
"fixed-delay second task fired before one full delay elapsed after completion: {delay_gap:?}",
);
}
/// After a fixed-rate timer has fired once and been cancelled, no further fire may be observed.
#[test]
fn runtime_repeating_timer_cancellation_stops_future_fires() {
    let materializer = Materializer::new();
    let (fired_tx, fired_rx) = mpsc::channel();
    let first_delay = Duration::from_millis(1);
    let interval = Duration::from_millis(30);
    let timer = materializer.schedule_at_fixed_rate(first_delay, interval, move || {
        fired_tx.send(()).unwrap();
    });

    // Block until the first fire is seen, then cancel.
    fired_rx.recv_timeout(Duration::from_millis(250)).unwrap();
    assert!(timer.cancel());

    // Listen for three intervals' worth of time; nothing may arrive.
    let after_cancel = fired_rx.recv_timeout(Duration::from_millis(90));
    assert!(after_cancel.is_err());
    materializer.shutdown();
}
/// A one-shot timer whose task panics must not stop a later one-shot timer from firing.
#[test]
fn runtime_panicking_once_timer_does_not_kill_driver_or_later_timers() {
    let materializer = Materializer::new();

    // First timer: panics as soon as it runs.
    let panicking_delay = Duration::from_millis(1);
    materializer.schedule_once(panicking_delay, || {
        panic!("timer boom");
    });

    // Second timer: scheduled to fire after the panicking one.
    let (survivor_tx, survivor_rx) = mpsc::channel();
    let survivor_delay = Duration::from_millis(20);
    materializer.schedule_once(survivor_delay, move || {
        survivor_tx.send(()).unwrap();
    });

    survivor_rx.recv_timeout(Duration::from_millis(250)).unwrap();
    materializer.shutdown();
}
/// A fixed-rate timer whose task panics on its first run must not run again, and a one-shot
/// timer scheduled afterwards must still fire.
#[test]
fn runtime_panicking_fixed_rate_timer_stops_itself_and_leaves_driver_alive() {
    let materializer = Materializer::new();
    let panics_seen = StdArc::new(StdAtomicUsize::new(0));
    let panics_in_task = StdArc::clone(&panics_seen);
    materializer.schedule_at_fixed_rate(Duration::ZERO, Duration::from_millis(20), move || {
        panics_in_task.fetch_add(1, StdOrdering::SeqCst);
        panic!("fixed-rate boom");
    });
    let panicked_once = wait_until(Duration::from_millis(150), || {
        panics_seen.load(StdOrdering::SeqCst) == 1
    });
    assert!(panicked_once);

    // The driver must still serve a timer registered after the panic.
    let (later_tx, later_rx) = mpsc::channel();
    materializer.schedule_once(Duration::from_millis(30), move || {
        later_tx.send(()).unwrap();
    });
    later_rx.recv_timeout(Duration::from_millis(250)).unwrap();

    // Leave room for several more 20 ms intervals; the counter must stay at one.
    thread::sleep(Duration::from_millis(90));
    assert_eq!(panics_seen.load(StdOrdering::SeqCst), 1);
    materializer.shutdown();
}
/// While one timer task is sleeping for 200 ms, a separate 10 ms one-shot timer must still
/// fire within 150 ms of being scheduled.
#[test]
fn runtime_slow_timer_task_does_not_delay_unrelated_timers() {
    let materializer = Materializer::new();
    let slow_running = StdArc::new(StdAtomicBool::new(false));
    let slow_running_flag = StdArc::clone(&slow_running);
    let slow_timer = materializer.schedule_at_fixed_rate(
        Duration::ZERO,
        Duration::from_millis(250),
        move || {
            slow_running_flag.store(true, StdOrdering::SeqCst);
            thread::sleep(Duration::from_millis(200));
        },
    );
    // Only schedule the fast timer once the slow task is known to be running.
    let slow_task_began = wait_until(Duration::from_millis(100), || {
        slow_running.load(StdOrdering::SeqCst)
    });
    assert!(slow_task_began);

    let start = Instant::now();
    let (fired_tx, fired_rx) = mpsc::channel();
    materializer.schedule_once(Duration::from_millis(10), move || {
        fired_tx.send(Instant::now()).unwrap();
    });
    let fired_at = fired_rx.recv_timeout(Duration::from_millis(350)).unwrap();
    let elapsed = fired_at.duration_since(start);

    // Tear down before asserting so the timer is cancelled even when the check fails.
    slow_timer.cancel();
    materializer.shutdown();
    assert!(
        elapsed < Duration::from_millis(150),
        "unrelated timer was delayed by a blocking timer task: {elapsed:?}",
    );
}
/// The timer driver reports live after construction and not live after `shutdown`.
#[test]
fn runtime_shutdown_stops_timer_driver_thread() {
    let materializer = Materializer::new();

    let came_up = wait_until(Duration::from_secs(1), || {
        materializer.timer_driver_is_live()
    });
    assert!(came_up);

    materializer.shutdown();

    let went_down = wait_until(Duration::from_secs(2), || {
        !materializer.timer_driver_is_live()
    });
    assert!(went_down);
}
/// One-shot timers registered in scrambled order must fire in order of their delays.
#[test]
fn runtime_timer_driver_orders_many_timers_by_deadline() {
    let materializer = Materializer::new();
    let (tx, rx) = mpsc::channel();
    // Each entry is (delay in milliseconds, tag sent when the timer fires).
    let schedule = [(450_u64, 4_u8), (50, 1), (350, 3), (150, 2), (550, 5)];
    for (delay_ms, tag) in schedule {
        let tag_tx = tx.clone();
        materializer.schedule_once(Duration::from_millis(delay_ms), move || {
            tag_tx.send(tag).unwrap();
        });
    }
    // Only the clones held by the timer tasks remain.
    drop(tx);

    let received: Vec<u8> = (0..schedule.len())
        .map(|_| rx.recv_timeout(Duration::from_secs(10)).unwrap())
        .collect();
    materializer.shutdown();
    assert_eq!(received, vec![1, 2, 3, 4, 5]);
}
/// Scheduling 128 timers must leave the number of timer threads unchanged, and shutting the
/// materializer down must reduce it.
#[test]
fn runtime_timer_driver_uses_one_thread_per_runtime_regardless_of_timer_count() {
    let materializer = Materializer::new();
    let full_name = materializer.timer_thread_name().to_owned();
    // Only the first 15 characters are used for the lookup — presumably because Linux
    // truncates thread names to that length; confirm against `linux_thread_count`.
    let linux_thread_name: String = full_name.chars().take(15).collect();

    let driver_visible = wait_until(Duration::from_secs(5), || {
        materializer.timer_driver_is_live() && linux_thread_count(&linux_thread_name) >= 1
    });
    assert!(driver_visible);

    // Baseline thread count before any timers are scheduled.
    let live_timer_threads = linux_thread_count(&linux_thread_name);
    (0..128).for_each(|_| {
        materializer.schedule_once(Duration::from_secs(60), || {});
    });

    let count_unchanged = wait_until(Duration::from_secs(5), || {
        materializer.timer_driver_is_live()
            && linux_thread_count(&linux_thread_name) == live_timer_threads
    });
    assert!(
        count_unchanged,
        "scheduling timers should not create extra timer threads for a runtime",
    );

    materializer.shutdown();
    let driver_gone = wait_until(Duration::from_secs(5), || {
        !materializer.timer_driver_is_live()
            && linux_thread_count(&linux_thread_name) < live_timer_threads
    });
    assert!(driver_gone);
}
/// `Sink::cancelled` materializes to `NotUsed`, while `Sink::never` materializes to a
/// completion whose `try_wait` is still `None`.
#[test]
fn cancelled_and_never_sinks_have_distinct_materialization_results() {
    let cancelled_mat = Source::repeat(1)
        .run_with(Sink::cancelled())
        .expect("cancelled sink materializes");
    assert_eq!(cancelled_mat, NotUsed);

    let never_state = Source::single(1)
        .run_with(Sink::never())
        .expect("never sink materializes")
        .try_wait();
    assert_eq!(never_state, None);
}
/// Shutting the materializer down resolves a `Sink::never` completion with `AbruptTermination`.
#[test]
fn never_sink_finishes_on_materializer_shutdown() {
    let materializer = Materializer::new();
    let pending = Source::single(1)
        .run_with_materializer(Sink::never(), &materializer)
        .unwrap();

    materializer.shutdown();

    let outcome = pending.wait();
    assert_eq!(outcome, Err(StreamError::AbruptTermination));
}
/// Dropping the completion handle of a `Source::never` stream brings the materializer's
/// active stream count from one back to zero.
#[test]
fn dropping_source_never_completion_releases_parked_worker() {
    let materializer = Materializer::new();
    let handle = Source::<i32>::never()
        .run_with_materializer(Sink::ignore(), &materializer)
        .unwrap();

    // The stream must show up as active first.
    let became_active = wait_until(StdDuration::from_secs(1), || {
        materializer.active_streams() == 1
    });
    assert!(became_active);
    assert_eq!(materializer.active_streams(), 1);

    drop(handle);

    let became_idle = wait_until(StdDuration::from_secs(15), || {
        materializer.active_streams() == 0
    });
    assert!(became_idle);
    assert_eq!(materializer.active_streams(), 0);
}
/// Covers `Source::future`, `Source::future_source`, and `Source::maybe` before and after
/// its handle is completed.
#[test]
fn future_and_maybe_sources_emit_values() {
    let from_future = Source::future(|| async { Ok(7) }).run_collect().unwrap();
    assert_eq!(from_future, vec![7]);

    let from_future_source =
        Source::future_source(|| async { Ok(Source::from_iter([1, 2, 3])) })
            .run_collect()
            .unwrap();
    assert_eq!(from_future_source, vec![1, 2, 3]);

    let (handle, maybe) = Source::maybe();
    // Running before the handle is completed reports `MaybeIncomplete`.
    let before_completion = maybe.clone().run_collect();
    assert_eq!(before_completion, Err(StreamError::MaybeIncomplete));

    handle.complete(9).unwrap();
    let after_completion = maybe.run_collect().unwrap();
    assert_eq!(after_completion, vec![9]);
}
/// Covers `cycle`, `unfold`, `unfold_async` and `lazy_single`, including the failures reported
/// for an empty cycle and for a panicking `lazy_single` factory.
#[test]
fn wp6b_source_generators_emit_and_fail_like_stream_errors() {
    let cycled = Source::cycle(|| [1, 2, 3].into_iter())
        .take(8)
        .run_collect()
        .unwrap();
    assert_eq!(cycled, vec![1, 2, 3, 1, 2, 3, 1, 2]);

    let empty_cycle = Source::<i32>::cycle(std::iter::empty::<i32>).run_collect();
    assert_eq!(
        empty_cycle,
        Err(StreamError::Failed("empty iterator".into()))
    );

    let unfolded = Source::unfold(0, |state| (state < 4).then_some((state + 1, state)))
        .run_collect()
        .unwrap();
    assert_eq!(unfolded, vec![0, 1, 2, 3]);

    let unfolded_async = Source::unfold_async(0, |state| async move {
        Ok((state < 4).then_some((state + 1, state * 2)))
    })
    .run_collect()
    .unwrap();
    assert_eq!(unfolded_async, vec![0, 2, 4, 6]);

    // A panic inside the factory is reported as a stream failure.
    assert!(matches!(
        Source::<i32>::lazy_single(|| panic!("boom")).run_collect(),
        Err(StreamError::Failed(message)) if message == "lazy_single factory panicked"
    ));
}
/// Checks three lazy source constructors:
/// - `lazy_source`: the factory is not called by `create`, only by the first pull, after which
///   the deferred materialized value resolves to the inner source's value (99).
/// - `lazy_future_source` run into `Sink::cancelled`: the factory is never called and the
///   materialized value resolves to a `Failed` error.
/// - `lazy_future`: the factory is called on the first pull, not on `create`.
#[test]
fn wp6b_lazy_sources_defer_until_first_pull_and_complete_deferred_mat() {
// Counts how many times the `lazy_source` factory has been invoked.
let created = StdArc::new(StdAtomicUsize::new(0));
let created_for_source = StdArc::clone(&created);
let source = Source::<i32>::lazy_source(move || {
created_for_source.fetch_add(1, StdOrdering::SeqCst);
Source::from_iter([7, 8]).map_materialized_value(|_| 99)
});
let materializer = Materializer::new();
// Drive the source's factory directly so the stream can be pulled one element at a time.
let (mut stream, mut mat) = StdArc::clone(&source.factory)
.create(&materializer)
.unwrap();
// Nothing has been pulled yet: factory untouched, materialized value still pending.
assert_eq!(created.load(StdOrdering::SeqCst), 0);
assert!(mat.try_wait().is_none());
// The first pull yields the first element and makes the materialized value available.
assert_eq!(stream.next().unwrap().unwrap(), 7);
assert_eq!(mat.wait().unwrap(), 99);
assert_eq!(created.load(StdOrdering::SeqCst), 1);
assert_eq!(stream.next().unwrap().unwrap(), 8);
// `lazy_future_source` into a cancelled sink: the factory must never run.
let never_created = StdArc::new(StdAtomicUsize::new(0));
let never_created_for_source = StdArc::clone(&never_created);
let mat = Source::<i32>::lazy_future_source(move || {
never_created_for_source.fetch_add(1, StdOrdering::SeqCst);
async { Ok(Source::single(1)) }
})
.to(Sink::cancelled())
.run()
.unwrap();
assert!(matches!(mat.wait(), Err(StreamError::Failed(_))));
assert_eq!(never_created.load(StdOrdering::SeqCst), 0);
// `lazy_future`: the factory runs on the first pull, not on `create`.
let lazy_future = StdArc::new(StdAtomicUsize::new(0));
let lazy_future_for_source = StdArc::clone(&lazy_future);
let source = Source::lazy_future(move || {
lazy_future_for_source.fetch_add(1, StdOrdering::SeqCst);
async { Ok(42) }
});
let (mut stream, _) = StdArc::clone(&source.factory)
.create(&Materializer::new())
.unwrap();
assert_eq!(lazy_future.load(StdOrdering::SeqCst), 0);
assert_eq!(stream.next().unwrap().unwrap(), 42);
assert_eq!(lazy_future.load(StdOrdering::SeqCst), 1);
}
/// `unfold_resource` must call its close function exactly once when the stream completes,
/// when the read function fails (the read error is the one reported), and when the
/// downstream `Sink::head` stops after the first element.
#[test]
fn wp6b_unfold_resource_closes_on_completion_failure_and_cancellation() {
    // Normal completion.
    let completion_closes = StdArc::new(StdAtomicUsize::new(0));
    let completion_closes_in_close = StdArc::clone(&completion_closes);
    let drained = Source::unfold_resource(
        || Ok(std::collections::VecDeque::from([1, 2, 3])),
        |items| Ok(items.pop_front()),
        move |_items| {
            completion_closes_in_close.fetch_add(1, StdOrdering::SeqCst);
            Ok(())
        },
    )
    .run_collect()
    .unwrap();
    assert_eq!(drained, vec![1, 2, 3]);
    assert_eq!(completion_closes.load(StdOrdering::SeqCst), 1);

    // Read failure: close also fails, but the read error is what surfaces.
    let failure_closes = StdArc::new(StdAtomicUsize::new(0));
    let failure_closes_in_close = StdArc::clone(&failure_closes);
    let read_outcome = Source::<i32>::unfold_resource(
        || Ok(()),
        |_| Err(StreamError::Failed("read".into())),
        move |_| {
            failure_closes_in_close.fetch_add(1, StdOrdering::SeqCst);
            Err(StreamError::Failed("close".into()))
        },
    )
    .run_collect();
    assert_eq!(read_outcome, Err(StreamError::Failed("read".into())));
    assert_eq!(failure_closes.load(StdOrdering::SeqCst), 1);

    // Endless counter consumed by `Sink::head`.
    let cancel_closes = StdArc::new(StdAtomicUsize::new(0));
    let cancel_closes_in_close = StdArc::clone(&cancel_closes);
    let head = Source::unfold_resource(
        || Ok(0_usize),
        |next| {
            let item = *next;
            *next += 1;
            Ok(Some(item))
        },
        move |_| {
            cancel_closes_in_close.fetch_add(1, StdOrdering::SeqCst);
            Ok(())
        },
    )
    .run_with(Sink::head())
    .unwrap();
    assert_eq!(head.wait().unwrap(), 0);
    // The close is polled for, not asserted immediately after `head` resolves.
    let closed_after_cancel = wait_until(Duration::from_millis(250), || {
        cancel_closes.load(StdOrdering::SeqCst) == 1
    });
    assert!(closed_after_cancel);
}
/// Covers `unfold_resource_async` (close runs once on completion and on read failure),
/// `scan_async` (never more than one accumulator future in flight) and `fold_async`.
#[test]
fn wp6b_async_resource_and_async_accumulators_are_sequential() {
    // Completion: every element is emitted and close runs once.
    let completion_closes = StdArc::new(StdAtomicUsize::new(0));
    let completion_closes_in_close = StdArc::clone(&completion_closes);
    let drained = Source::unfold_resource_async(
        || async { Ok(std::collections::VecDeque::from([1, 2, 3])) },
        |items| {
            let item = items.pop_front();
            async move { Ok(item) }
        },
        move |_items| {
            let counter = StdArc::clone(&completion_closes_in_close);
            async move {
                counter.fetch_add(1, StdOrdering::SeqCst);
                Ok(())
            }
        },
    )
    .run_collect()
    .unwrap();
    assert_eq!(drained, vec![1, 2, 3]);
    assert_eq!(completion_closes.load(StdOrdering::SeqCst), 1);

    // Read failure: close also fails, but the read error is what surfaces.
    let failure_closes = StdArc::new(StdAtomicUsize::new(0));
    let failure_closes_in_close = StdArc::clone(&failure_closes);
    let read_outcome = Source::<i32>::unfold_resource_async(
        || async { Ok(()) },
        |_resource| async { Err(StreamError::Failed("read".into())) },
        move |_resource| {
            let counter = StdArc::clone(&failure_closes_in_close);
            async move {
                counter.fetch_add(1, StdOrdering::SeqCst);
                Err(StreamError::Failed("close".into()))
            }
        },
    )
    .run_collect();
    assert_eq!(read_outcome, Err(StreamError::Failed("read".into())));
    assert_eq!(failure_closes.load(StdOrdering::SeqCst), 1);

    // `scan_async`: record the peak number of futures running at once.
    let in_flight = StdArc::new(StdAtomicUsize::new(0));
    let peak_in_flight = StdArc::new(StdAtomicUsize::new(0));
    let in_flight_for_stage = StdArc::clone(&in_flight);
    let peak_for_stage = StdArc::clone(&peak_in_flight);
    let running_sums = Source::from_iter(1..=4)
        .scan_async(0, move |acc, item| {
            let in_flight = StdArc::clone(&in_flight_for_stage);
            let peak = StdArc::clone(&peak_for_stage);
            async move {
                let now = in_flight.fetch_add(1, StdOrdering::SeqCst) + 1;
                peak.fetch_max(now, StdOrdering::SeqCst);
                tokio::time::sleep(Duration::from_millis(1)).await;
                in_flight.fetch_sub(1, StdOrdering::SeqCst);
                Ok(acc + item)
            }
        })
        .run_collect()
        .unwrap();
    assert_eq!(running_sums, vec![0, 1, 3, 6, 10]);
    assert_eq!(peak_in_flight.load(StdOrdering::SeqCst), 1);

    // `fold_async` emits a single total.
    let total = Source::from_iter(1..=4)
        .fold_async(0, |acc, item| async move { Ok(acc + item) })
        .run_collect()
        .unwrap();
    assert_eq!(total, vec![10]);
}
/// Materializing `fold_async` into `Sink::queue` must return before the upstream produces
/// its element.
///
/// The source's only element is gated on a mutex/condvar flag that the test sets only after
/// it has received the materialized queue. If materialization drained the upstream first,
/// the queue would not arrive within the one-second timeout.
#[test]
fn wp6b_fold_async_materialization_does_not_drain_upstream() {
// Gate: the source blocks in its first pull until the flag is set to `true`.
let release = StdArc::new((std::sync::Mutex::new(false), std::sync::Condvar::new()));
// Set by the source when its first pull begins.
let started = StdArc::new(StdAtomicBool::new(false));
let source = {
let release = StdArc::clone(&release);
let started = StdArc::clone(&started);
Source::from_factory(move || {
let release = StdArc::clone(&release);
let started = StdArc::clone(&started);
let mut emitted = false;
Box::new(std::iter::from_fn(move || {
// The second pull ends the stream.
if emitted {
return None;
}
emitted = true;
started.store(true, StdOrdering::SeqCst);
let (released, available) = &*release;
let mut released = released.lock().unwrap();
// Block here until the test opens the gate.
while !*released {
released = available.wait(released).unwrap();
}
Some(Ok(1))
}))
})
};
// Materialize on a separate thread so the test can put a timeout on it.
let (materialized_tx, materialized_rx) = mpsc::channel();
let join = thread::spawn(move || {
let queue = source
.fold_async(0, |acc, item| async move { Ok(acc + item) })
.run_with(Sink::queue())
.unwrap();
materialized_tx.send(queue).unwrap();
});
let queue = match materialized_rx.recv_timeout(StdDuration::from_secs(1)) {
Ok(queue) => queue,
Err(error) => {
// Open the gate and join the thread before failing, so the worker is not left blocked.
let (released, available) = &*release;
*released.lock().unwrap() = true;
available.notify_all();
let _ = join.join();
panic!("fold_async materialization did not return before first pull: {error}");
}
};
// Sanity check: the gate is still closed at this point.
let (released, _) = &*release;
assert!(
!*released.lock().unwrap(),
"test source was released before materialization returned"
);
// Open the gate; the fold result then arrives, followed by end of stream.
let (released, available) = &*release;
*released.lock().unwrap() = true;
available.notify_all();
assert_eq!(queue.pull().unwrap(), Some(1));
assert_eq!(queue.pull().unwrap(), None);
join.join().unwrap();
assert!(started.load(StdOrdering::SeqCst));
}
/// Covers four lazily or asynchronously created stages:
/// - `Sink::lazy_sink` on an empty source: the factory is never called and the materialized
///   value resolves to a `Failed` error.
/// - `Sink::foreach_async`: every element is processed (the items sum to 6).
/// - `Flow::lazy_flow`: the factory is called on the first pulled element, after which the
///   deferred materialized value resolves to the inner flow's value (123).
/// - `Flow::future_flow`: both the materialized value and the transformed elements come from
///   the inner flow.
#[test]
fn wp6b_lazy_sink_and_flow_wait_for_first_element() {
// `lazy_sink` fed by an empty source.
let lazy_sink_created = StdArc::new(StdAtomicUsize::new(0));
let lazy_sink_created_for_factory = StdArc::clone(&lazy_sink_created);
let empty_sink = Source::<i32>::empty()
.run_with(Sink::lazy_sink(move || {
lazy_sink_created_for_factory.fetch_add(1, StdOrdering::SeqCst);
Sink::ignore()
}))
.unwrap();
assert!(matches!(empty_sink.wait(), Err(StreamError::Failed(_))));
assert_eq!(lazy_sink_created.load(StdOrdering::SeqCst), 0);
// `foreach_async` accumulates the items into a shared sum.
let foreach_sum = StdArc::new(StdAtomicUsize::new(0));
let foreach_sum_for_sink = StdArc::clone(&foreach_sum);
Source::from_iter([1_usize, 2, 3])
.run_with(Sink::foreach_async(2, move |item| {
let foreach_sum = StdArc::clone(&foreach_sum_for_sink);
async move {
foreach_sum.fetch_add(item, StdOrdering::SeqCst);
Ok(())
}
}))
.unwrap()
.wait()
.unwrap();
assert_eq!(foreach_sum.load(StdOrdering::SeqCst), 6);
// `lazy_flow`, driven through its `materialize` and `transform` parts directly.
let lazy_flow_created = StdArc::new(StdAtomicUsize::new(0));
let lazy_flow_created_for_factory = StdArc::clone(&lazy_flow_created);
let lazy_flow = Flow::<i32, i32>::lazy_flow(move || {
lazy_flow_created_for_factory.fetch_add(1, StdOrdering::SeqCst);
Flow::identity()
.map(|item: i32| item + 10)
.map_materialized_value(|_| 123)
});
let mat = (lazy_flow.materialize)().unwrap();
let mut stream = match lazy_flow.transform {
flow::FlowTransform::Runtime(transform) => {
transform(Box::new([Ok(1), Ok(2)].into_iter()), &Materializer::new()).unwrap()
}
flow::FlowTransform::Pure(_) => panic!("lazy flow must be runtime-backed"),
};
// Building the stream does not call the factory; the first pull does.
assert_eq!(lazy_flow_created.load(StdOrdering::SeqCst), 0);
assert_eq!(stream.next().unwrap().unwrap(), 11);
assert_eq!(mat.wait().unwrap(), 123);
assert_eq!(lazy_flow_created.load(StdOrdering::SeqCst), 1);
assert_eq!(stream.next().unwrap().unwrap(), 12);
// `future_flow`: keep the flow's materialized value alongside the collected output.
let future_flow = Source::from_iter([1, 2])
.via_mat(
Flow::future_flow(|| async {
Ok(Flow::identity()
.map(|item: i32| item * 2)
.map_materialized_value(|_| 77))
}),
Keep::right,
)
.to_mat(Sink::collect(), Keep::both)
.run()
.unwrap();
assert_eq!(future_flow.0.wait().unwrap(), 77);
assert_eq!(future_flow.1.wait().unwrap(), vec![2, 4]);
}
/// Using the same lazy flow twice in one chain must create two distinct instances, and each
/// materialized id must belong to the stage at its own position in the chain.
///
/// Each instance maps `x` to `x * 100 + id`, so for input 0 the output is
/// `first_id * 100 + second_id` only when the ids are paired in chain order.
#[test]
fn wp6b_lazy_flow_double_use_in_one_chain_pairs_instances_in_order() {
    for round in 0..50 {
        // Ids start at 1, so neither stage's contribution is zero.
        let next_instance = StdArc::new(StdAtomicUsize::new(1));
        let next_instance_for_factory = StdArc::clone(&next_instance);
        let first_use: Flow<usize, usize, _> = Flow::lazy_flow(move || {
            let id = next_instance_for_factory.fetch_add(1, StdOrdering::SeqCst);
            Flow::identity()
                .map(move |x: usize| x * 100 + id)
                .map_materialized_value(move |_| id)
        });
        let second_use = first_use.clone();

        let ((first_mat, second_mat), collected) = Source::from_iter([0usize])
            .via_mat(first_use, Keep::right)
            .via_mat(second_use, Keep::both)
            .to_mat(Sink::collect(), Keep::both)
            .run()
            .unwrap();

        let first_id = first_mat.wait().unwrap();
        let second_id = second_mat.wait().unwrap();
        let element = collected.wait().unwrap()[0];
        assert_eq!(
            element,
            first_id * 100 + second_id,
            "round {round}: mats ({first_id},{second_id}) cross-wired with transform order"
        );
        assert_ne!(
            first_id, second_id,
            "round {round}: same factory instance paired twice"
        );
    }
}
/// Two clones of one lazy flow, run at the same moment on separate threads, must each see a
/// materialized id that matches the instance that transformed their element.
///
/// Each instance adds `id * 100` to its input, so the collected value must equal
/// `input + mat_id * 100`. The factory must have been called exactly twice per round.
#[test]
fn wp6b_lazy_flow_clones_materialize_concurrently_without_cross_wiring() {
for _ in 0..20 {
// Hands out instance ids 1, 2, ... (the `+ 1` turns the previous count into the id).
let next_id = StdArc::new(StdAtomicUsize::new(0));
let next_id_for_factory = StdArc::clone(&next_id);
let flow = Flow::<i32, i32>::lazy_flow(move || {
let id = next_id_for_factory.fetch_add(1, StdOrdering::SeqCst) + 1;
Flow::identity()
.map(move |item: i32| item + (id as i32 * 100))
.map_materialized_value(move |_| id)
});
// Three parties: the two worker threads plus this test thread.
let barrier = StdArc::new(std::sync::Barrier::new(3));
let spawn_materialization = |input: i32| {
let flow = flow.clone();
let barrier = StdArc::clone(&barrier);
thread::spawn(move || {
// Hold until all parties are ready, so both runs start together.
barrier.wait();
let (mat, values) = Source::single(input)
.via_mat(flow, Keep::right)
.to_mat(Sink::collect(), Keep::both)
.run()
.unwrap();
(input, mat.wait().unwrap(), values.wait().unwrap())
})
};
let first = spawn_materialization(1);
let second = spawn_materialization(2);
// Release both workers.
barrier.wait();
for result in [first.join().unwrap(), second.join().unwrap()] {
let (input, mat_id, values) = result;
assert_eq!(values, vec![input + (mat_id as i32 * 100)]);
}
assert_eq!(next_id.load(StdOrdering::SeqCst), 2);
}
}
/// `map_with_resource` must emit the item returned by its close function before passing on
/// an upstream error. When both the map and the close function fail, the map error is the
/// one reported.
#[test]
fn wp6b_map_with_resource_emits_close_item_before_terminal_error() {
    // Upstream yields one element and then fails.
    let failing_upstream = Source::from_factory(|| {
        Box::new(vec![Ok(1), Err(StreamError::Failed("upstream".into()))].into_iter())
    });
    let queue = failing_upstream
        .map_with_resource(
            || Ok(()),
            |_resource, item| Ok(item + 10),
            |_resource| Ok(Some(99)),
        )
        .run_with(Sink::queue())
        .unwrap();

    // Mapped element, then the close item, then the upstream error.
    assert_eq!(queue.pull().unwrap(), Some(11));
    assert_eq!(queue.pull().unwrap(), Some(99));
    assert_eq!(queue.pull(), Err(StreamError::Failed("upstream".into())));

    let map_failure: StreamResult<Vec<i32>> = Source::single(1)
        .map_with_resource(
            || Ok(()),
            |_resource, _item| -> StreamResult<i32> { Err(StreamError::Failed("map".into())) },
            |_resource| -> StreamResult<Option<i32>> {
                Err(StreamError::Failed("close".into()))
            },
        )
        .run_collect();
    assert_eq!(map_failure, Err(StreamError::Failed("map".into())));
}
/// Covers `stateful_map`, `stateful_map_concat`, `fold` and `reduce` over `[1, 2, 3]`.
#[test]
fn stateful_and_terminal_source_operators_work() {
    // Running sum emitted per element.
    let running_sums = Source::from_iter([1, 2, 3])
        .stateful_map(0, |sum, item| {
            *sum += item;
            *sum
        })
        .run_collect()
        .unwrap();
    assert_eq!(running_sums, vec![1, 3, 6]);

    // Each element expands to itself followed by the running sum.
    let item_and_sum = Source::from_iter([1, 2, 3])
        .stateful_map_concat(0, |sum, item| {
            *sum += item;
            [item, *sum]
        })
        .run_collect()
        .unwrap();
    assert_eq!(item_and_sum, vec![1, 1, 2, 3, 3, 6]);

    let folded = Source::from_iter([1, 2, 3])
        .fold(10, |acc, item| acc + item)
        .run_collect()
        .unwrap();
    assert_eq!(folded, vec![16]);

    let reduced = Source::from_iter([1, 2, 3])
        .reduce(|acc, item| acc + item)
        .run_collect()
        .unwrap();
    assert_eq!(reduced, vec![6]);
}
/// `map_concat` over an unbounded range and `sliding` over a repeating source must both
/// produce a first output when followed by `take(1)`.
#[test]
fn concat_and_sliding_emit_before_unbounded_upstream_finishes() {
    let first_from_unbounded = Source::single(())
        .map_concat(|_| 0_u64..)
        .take(1)
        .run_collect()
        .unwrap();
    assert_eq!(first_from_unbounded, vec![0]);

    let first_window = Source::repeat(1_u64)
        .sliding(2, 1)
        .take(1)
        .run_collect()
        .unwrap();
    assert_eq!(first_window, vec![vec![1, 1]]);
}
/// Pins the element order produced by `concat`, `prepend`, `or_else` and `interleave` on
/// sources.
#[test]
fn fan_in_source_operators_follow_ordering_rules() {
    let concatenated = Source::from_iter([1, 2])
        .concat(Source::from_iter([3, 4]))
        .run_collect()
        .unwrap();
    assert_eq!(concatenated, vec![1, 2, 3, 4]);

    let prepended = Source::from_iter([3, 4])
        .prepend(Source::from_iter([1, 2]))
        .run_collect()
        .unwrap();
    assert_eq!(prepended, vec![1, 2, 3, 4]);

    // Empty primary: the fallback's elements are emitted.
    let fallback_used = Source::empty()
        .or_else(Source::from_iter([10, 20]))
        .run_collect()
        .unwrap();
    assert_eq!(fallback_used, vec![10, 20]);

    // Non-empty primary: the fallback's elements are not emitted.
    let fallback_unused = Source::from_iter([1, 2])
        .or_else(Source::from_iter([10, 20]))
        .run_collect()
        .unwrap();
    assert_eq!(fallback_unused, vec![1, 2]);

    // Segments of two, alternating between the inputs.
    let interleaved = Source::from_iter([1, 2, 3])
        .interleave(Source::from_iter([10, 11, 12]), 2)
        .run_collect()
        .unwrap();
    assert_eq!(interleaved, vec![1, 2, 10, 11, 3, 12]);
}
/// The fan-in operators on `Flow` (`concat`, `prepend`, `interleave`, `merge_sorted`,
/// `zip_latest`, `zip_latest_with`) must produce the expected output when attached to a
/// source through `via`.
#[test]
fn fan_in_flow_operators_compose_with_primary_stream() {
    assert_eq!(
        Source::from_iter([1, 2])
            .via(Flow::identity().concat(Source::from_iter([3, 4])))
            .run_collect()
            .unwrap(),
        vec![1, 2, 3, 4]
    );
    assert_eq!(
        Source::from_iter([3, 4])
            .via(Flow::identity().prepend(Source::from_iter([1, 2])))
            .run_collect()
            .unwrap(),
        vec![1, 2, 3, 4]
    );
    assert_eq!(
        Source::from_iter([1, 2, 3])
            .via(Flow::identity().interleave(Source::from_iter([10, 11, 12]), 1))
            .run_collect()
            .unwrap(),
        vec![1, 10, 2, 11, 3, 12]
    );
    assert_eq!(
        Source::from_iter([1, 4])
            .via(Flow::identity().merge_sorted(Source::from_iter([2, 3, 5])))
            .run_collect()
            .unwrap(),
        vec![1, 2, 3, 4, 5]
    );
    assert_eq!(
        Source::from_iter([1, 2])
            .via(Flow::identity().zip_latest(Source::single(10)))
            .run_collect()
            .unwrap(),
        vec![(1, 10), (2, 10)]
    );
    assert_eq!(
        Source::from_iter([1, 2])
            .via(
                Flow::identity()
                    .zip_latest_with(Source::single(10), false, |left, right| left + right),
            )
            .run_collect()
            .unwrap(),
        vec![11, 12]
    );
}
/// A failed primary under `or_else` and a failed prefix under `prepend` must both surface as
/// `Failed`. `interleave_all` with its last flag set to `true` and an empty extra input
/// emits only `[1]`.
#[test]
fn fan_in_operators_propagate_errors_and_eager_close() {
    let failed_primary = Source::failed(StreamError::Failed("boom".into()))
        .or_else(Source::from_iter([1, 2]))
        .run_collect();
    assert!(matches!(failed_primary, Err(StreamError::Failed(_))));

    let failed_prefix = Source::from_iter([1, 2])
        .prepend(Source::failed(StreamError::Failed("boom".into())))
        .run_collect();
    assert!(matches!(failed_prefix, Err(StreamError::Failed(_))));

    let eager_close = Source::from_iter([1, 2])
        .interleave_all([Source::empty()], 1, true)
        .run_collect()
        .unwrap();
    assert_eq!(eager_close, vec![1]);
}
/// When `Sink::head` takes the first element of a three-input `interleave_all`, only the
/// first input may have been pulled (exactly once); the other two must be untouched.
#[test]
fn interleave_lazy_pulls_only_inputs_needed_for_first_segment() {
    use std::sync::{Arc, atomic::AtomicUsize, atomic::Ordering};
    // One pull counter per input, indexed by input position.
    let pull_counts: Arc<[AtomicUsize; 3]> = Arc::new([
        AtomicUsize::new(0),
        AtomicUsize::new(0),
        AtomicUsize::new(0),
    ]);
    // Builds an input that counts its pulls; only input 0 emits an element (42).
    let counting_source = |idx: usize| {
        let counts_for_factory = Arc::clone(&pull_counts);
        Source::from_materialized_factory(move |_| {
            let counts = Arc::clone(&counts_for_factory);
            let mut emitted = false;
            Ok((
                Box::new(std::iter::from_fn(move || {
                    counts[idx].fetch_add(1, Ordering::SeqCst);
                    if !emitted && idx == 0 {
                        emitted = true;
                        Some(Ok(42))
                    } else {
                        None
                    }
                })) as BoxStream<i32>,
                NotUsed,
            ))
        })
    };

    let head = counting_source(0)
        .interleave_all([counting_source(1), counting_source(2)], 1, false)
        .run_with(Sink::head());
    assert_eq!(wait(head.unwrap()), 42);

    assert_eq!(pull_counts[0].load(Ordering::SeqCst), 1);
    assert_eq!(
        pull_counts[1].load(Ordering::SeqCst),
        0,
        "second input should not be pulled when downstream cancels after first element"
    );
    assert_eq!(
        pull_counts[2].load(Ordering::SeqCst),
        0,
        "third input should not be pulled before its turn"
    );
}
/// With the last `interleave_all` flag set to `false`, inputs that still have elements keep
/// being interleaved after a shorter input has finished.
#[test]
fn interleave_non_eager_drains_remaining_when_one_input_completes() {
    let extra_inputs = [Source::from_iter([10]), Source::from_iter([20, 21, 22])];
    let interleaved = Source::from_iter([1, 2, 3, 4])
        .interleave_all(extra_inputs, 1, false)
        .run_collect()
        .unwrap();
    assert_eq!(interleaved, vec![1, 10, 20, 2, 21, 3, 22, 4]);
}
/// Pins the output of the merge and zip operator family on small fixed inputs, plus
/// `Source::combine` with the `Merge` strategy and `Sink::combine` with `Broadcast`.
#[test]
fn remaining_merge_and_zip_family_matches_expected_ordering() {
    let merged_sorted = Source::from_iter([1, 4])
        .merge_sorted(Source::from_iter([2, 3, 5]))
        .run_collect()
        .unwrap();
    assert_eq!(merged_sorted, vec![1, 2, 3, 4, 5]);

    let merged_latest = Source::from_iter([1, 2])
        .merge_latest(Source::single(10), false)
        .run_collect()
        .unwrap();
    assert_eq!(merged_latest, vec![vec![1, 10], vec![2, 10]]);

    let merged_all = Source::from_iter([1, 2, 3])
        .merge_all([Source::from_iter([10, 11])], false)
        .run_collect()
        .unwrap();
    assert_eq!(merged_all, vec![1, 10, 2, 11, 3]);

    let zipped_with = Source::from_iter([1, 2, 3])
        .zip_with(Source::from_iter([10, 11, 12]), |left, right| left + right)
        .run_collect()
        .unwrap();
    assert_eq!(zipped_with, vec![11, 13, 15]);

    let zipped_latest = Source::from_iter([1, 2])
        .zip_latest(Source::single(10))
        .run_collect()
        .unwrap();
    assert_eq!(zipped_latest, vec![(1, 10), (2, 10)]);

    let zipped_latest_with = Source::from_iter([1, 2, 3])
        .zip_latest_with(Source::from_iter([10]), false, |left, right| left + right)
        .run_collect()
        .unwrap();
    assert_eq!(zipped_latest_with, vec![11, 12, 13]);

    // The shorter left side is padded with -1.
    let zipped_all = Source::from_iter([1, 2])
        .zip_all(Source::from_iter([10, 11, 12]), -1, -2)
        .run_collect()
        .unwrap();
    assert_eq!(zipped_all, vec![(1, 10), (2, 11), (-1, 12)]);

    let indexed = Source::from_iter([5, 6, 7])
        .zip_with_index()
        .run_collect()
        .unwrap();
    assert_eq!(indexed, vec![(5, 0), (6, 1), (7, 2)]);

    let zipped_n = Source::zip_n([Source::from_iter([1, 2]), Source::from_iter([10, 20])])
        .run_collect()
        .unwrap();
    assert_eq!(zipped_n, vec![vec![1, 10], vec![2, 20]]);

    let zipped_with_n = Source::zip_with_n(
        [
            Source::from_iter([1, 2]),
            Source::from_iter([10, 20]),
            Source::from_iter([100, 200]),
        ],
        |values| values.into_iter().sum::<i32>(),
    )
    .run_collect()
    .unwrap();
    assert_eq!(zipped_with_n, vec![111, 222]);

    // Priorities 2 and 1: two elements from the first input per one from the second.
    let prioritized = Source::merge_prioritized_n(
        [
            (Source::from_iter([1, 2, 3, 4]), 2),
            (Source::from_iter([10, 11]), 1),
        ],
        false,
    )
    .run_collect()
    .unwrap();
    assert_eq!(prioritized, vec![1, 2, 10, 3, 4, 11]);

    let combined_sources = Source::combine(
        Source::from_iter([1, 2, 3]),
        Source::from_iter([10, 11]),
        std::iter::empty::<Source<i32, NotUsed>>(),
        SourceCombineStrategy::Merge {
            eager_complete: false,
        },
    )
    .run_collect()
    .unwrap();
    assert_eq!(combined_sources, vec![1, 10, 2, 11, 3]);

    let broadcast_sink = Sink::combine(
        Sink::ignore(),
        Sink::ignore(),
        std::iter::empty::<Sink<i32, NotUsed>>(),
        SinkCombineStrategy::Broadcast,
    );
    let broadcast_mat = Source::from_iter([1, 2, 3])
        .run_with(broadcast_sink)
        .unwrap();
    assert_eq!(broadcast_mat, NotUsed);
}
/// With the `Broadcast` strategy, each of the two combined `foreach` sinks must see all 100
/// elements.
#[test]
fn sink_combine_broadcast_delivers_every_element_to_every_child() {
    let seen_by_first = StdArc::new(StdAtomicUsize::new(0));
    let seen_by_second = StdArc::new(StdAtomicUsize::new(0));
    let first_sink = {
        let seen = StdArc::clone(&seen_by_first);
        Sink::foreach(move |_: i32| {
            seen.fetch_add(1, StdOrdering::SeqCst);
        })
    };
    let second_sink = {
        let seen = StdArc::clone(&seen_by_second);
        Sink::foreach(move |_: i32| {
            seen.fetch_add(1, StdOrdering::SeqCst);
        })
    };
    let combined = Sink::combine(
        first_sink,
        second_sink,
        std::iter::empty::<Sink<i32, NotUsed>>(),
        SinkCombineStrategy::Broadcast,
    );

    let materialized = Source::from_iter(0..100).run_with(combined).unwrap();
    assert_eq!(materialized, NotUsed);

    // The counts are polled for, not asserted immediately after `run_with` returns.
    let both_complete = wait_until(StdDuration::from_secs(1), || {
        seen_by_first.load(StdOrdering::SeqCst) == 100
            && seen_by_second.load(StdOrdering::SeqCst) == 100
    });
    assert!(both_complete);
}
/// `zip_latest_with` must finish with no output when either side is empty, even though the
/// other side repeats forever.
#[test]
fn zip_latest_completes_when_one_side_finishes_without_emitting() {
    let empty_left = Source::from_iter(std::iter::empty::<i32>())
        .zip_latest_with(Source::repeat(10), false, |left, right| left + right)
        .run_collect()
        .unwrap();
    assert_eq!(empty_left, Vec::<i32>::new());

    let empty_right = Source::repeat(10)
        .zip_latest_with(
            Source::from_iter(std::iter::empty::<i32>()),
            false,
            |left, right| left + right,
        )
        .run_collect()
        .unwrap();
    assert_eq!(empty_right, Vec::<i32>::new());
}
/// Pins how many outputs `zip_with`, `zip_latest_with` (with its flag set to `true`) and
/// `zip_n` produce when one input is shorter than the others.
#[test]
fn zip_family_completion_boundaries_match_expected_results() {
    let zipped = Source::from_iter([1, 2, 3])
        .zip_with(Source::from_iter([10]), |left, right| left + right)
        .run_collect()
        .unwrap();
    assert_eq!(zipped, vec![11]);

    let zipped_latest = Source::from_iter([1, 2, 3])
        .zip_latest_with(Source::from_iter([10]), true, |left, right| left + right)
        .run_collect()
        .unwrap();
    assert_eq!(zipped_latest, vec![11, 12]);

    let zipped_n = Source::zip_n([
        Source::from_iter([1, 2, 3]),
        Source::from_iter([10]),
        Source::from_iter([100, 200, 300]),
    ])
    .run_collect()
    .unwrap();
    assert_eq!(zipped_n, vec![vec![1, 10, 100]]);
}
/// Pins the output order of `Source::combine` under the `Concat` and `Prioritized`
/// strategies.
#[test]
fn combine_strategies_follow_merge_concat_and_priority_rules() {
    let concatenated = Source::combine(
        Source::from_iter([1, 2]),
        Source::from_iter([10, 11]),
        [Source::from_iter([100])],
        SourceCombineStrategy::Concat,
    )
    .run_collect()
    .unwrap();
    assert_eq!(concatenated, vec![1, 2, 10, 11, 100]);

    // Priorities 2 and 1: two elements from the first input per one from the second.
    let prioritized = Source::combine(
        Source::from_iter([1, 2, 3, 4]),
        Source::from_iter([10, 11]),
        std::iter::empty::<Source<i32, NotUsed>>(),
        SourceCombineStrategy::Prioritized {
            priorities: vec![2, 1],
            eager_complete: false,
        },
    )
    .run_collect()
    .unwrap();
    assert_eq!(prioritized, vec![1, 2, 10, 3, 4, 11]);
}
/// When `Sink::head` takes the first element, the source passed to `concat_lazy` must not be
/// materialized, both for `Source::concat_lazy` and for `Flow::concat_lazy`.
#[test]
fn concat_lazy_defers_follow_on_source_until_needed() {
    // Source-level `concat_lazy`.
    let source_creations = StdArc::new(StdAtomicUsize::new(0));
    let source_creations_in_factory = StdArc::clone(&source_creations);
    let follow_on = Source::from_materialized_factory(move |_| {
        source_creations_in_factory.fetch_add(1, StdOrdering::SeqCst);
        Ok((Box::new(std::iter::once(Ok(99))), NotUsed))
    });
    let head_of_source = Source::single(1)
        .concat_lazy(follow_on)
        .run_with(Sink::head());
    assert_eq!(wait(head_of_source.unwrap()), 1);
    assert_eq!(source_creations.load(StdOrdering::SeqCst), 0);

    // Flow-level `concat_lazy`.
    let flow_creations = StdArc::new(StdAtomicUsize::new(0));
    let flow_creations_in_factory = StdArc::clone(&flow_creations);
    let flow_follow_on = Source::from_materialized_factory(move |_| {
        flow_creations_in_factory.fetch_add(1, StdOrdering::SeqCst);
        Ok((Box::new(std::iter::once(Ok(99))), NotUsed))
    });
    let head_of_flow = Source::single(1)
        .via(Flow::identity().concat_lazy(flow_follow_on))
        .run_with(Sink::head());
    assert_eq!(wait(head_of_flow.unwrap()), 1);
    assert_eq!(flow_creations.load(StdOrdering::SeqCst), 0);
}
/// With `Sink::cancelled` as the side sink, `also_to` and `also_to_all` must complete
/// successfully with no elements on the main stream.
#[test]
fn also_to_completes_when_side_sink_cancels() {
    let single_side = Source::from_iter([1, 2, 3])
        .also_to(Sink::cancelled())
        .run_collect()
        .unwrap();
    assert_eq!(single_side, Vec::<i32>::new());

    let two_sides = Source::from_iter([1, 2, 3])
        .also_to_all([Sink::cancelled(), Sink::cancelled()])
        .run_collect()
        .unwrap();
    assert_eq!(two_sides, Vec::<i32>::new());
}
/// With `Sink::head` as the side sink, the main stream must emit at least one element and
/// then complete without an error before all 100 have been emitted.
#[test]
fn also_to_completes_gracefully_when_side_sink_disconnects() {
    let main_output = Source::from_iter(0..100)
        .also_to(Sink::head())
        .run_collect()
        .unwrap();
    let emitted = main_output.len();
    assert!(emitted != 0, "main should emit at least one element");
    assert!(
        emitted < 100,
        "main should complete early when side disconnects"
    );
}
/// With a cancelled side sink, `also_to`, `also_to_all` and `divert_to` must all surface the
/// upstream failure with its original message.
#[test]
fn also_to_propagates_original_error_when_side_is_disconnected() {
    let err = StreamError::Failed("distinctive-boom".into());
    // True only for a `Failed` error carrying the original message.
    let is_original_failure = |outcome: StreamResult<Vec<i32>>| {
        matches!(outcome, Err(StreamError::Failed(msg)) if msg == "distinctive-boom")
    };

    let via_also_to = Source::<i32>::failed(err.clone())
        .also_to(Sink::cancelled())
        .run_collect();
    assert!(is_original_failure(via_also_to));

    let via_also_to_all = Source::<i32>::failed(err.clone())
        .also_to_all([Sink::cancelled()])
        .run_collect();
    assert!(is_original_failure(via_also_to_all));

    let via_divert_to = Source::<i32>::failed(err)
        .divert_to(Sink::cancelled(), |_: &i32| true)
        .run_collect();
    assert!(is_original_failure(via_divert_to));
}
/// Elements matching the `divert_to` predicate (the even ones here) must not appear on the
/// main stream.
#[test]
fn divert_to_routes_matching_elements_to_side_sink() {
    let kept_on_main = Source::from_iter([1, 2, 3, 4])
        .divert_to(Sink::ignore(), |item| item % 2 == 0)
        .run_collect()
        .unwrap();
    assert_eq!(kept_on_main, vec![1, 3]);
}
/// With `Sink::head` as the tap, the main stream must still see every element, both for
/// `Source::wire_tap` and for `Flow::wire_tap`.
#[test]
fn wire_tap_drops_when_side_sink_backpressures() {
    assert_eq!(
        Source::from_iter([1, 2, 3])
            .wire_tap(Sink::head())
            .run_collect()
            .unwrap(),
        vec![1, 2, 3]
    );
    assert_eq!(
        Source::from_iter([1, 2, 3])
            .via(Flow::identity().wire_tap(Sink::head()))
            .run_collect()
            .unwrap(),
        vec![1, 2, 3]
    );
}
/// Covers `map_async`, `map_async_unordered` and `map_async_partitioned` over `0..4`.
///
/// The ordered and partitioned variants are compared in emission order. The unordered
/// variant is compared as a sorted list, so the test checks which elements were emitted
/// without depending on the order in which the futures happened to complete.
#[test]
fn async_mapping_variants_complete() {
    let ordered = Source::from_iter(0..4)
        .map_async(2, |item| async move { Ok(item * 2) })
        .run_collect()
        .unwrap();
    assert_eq!(ordered, vec![0, 2, 4, 6]);

    let mut unordered = Source::from_iter(0..4)
        .map_async_unordered(2, |item| async move { Ok(item * 2) })
        .run_collect()
        .unwrap();
    // NOTE(review): assumes `map_async_unordered` gives no ordering guarantee — confirm
    // against the operator's contract. Sorting keeps the check on content only.
    unordered.sort_unstable();
    assert_eq!(unordered, vec![0, 2, 4, 6]);

    let partitioned = Source::from_iter(0..4)
        .map_async_partitioned(4, 1, |item| item % 2, |item| async move { Ok(item + 1) })
        .run_collect()
        .unwrap();
    assert_eq!(partitioned, vec![1, 2, 3, 4]);
}
/// Checks that `map_async(2, ..)` stops pulling from upstream while its first future
/// is still pending.
///
/// The source is an endless counter that records every pull. The future for item `0`
/// sleeps for 300 ms; the test requests 16 elements from the probe, waits 100 ms and
/// asserts that at most 3 elements were pulled so far.
#[test]
fn map_async_ordered_bounds_pulls_behind_stuck_head() {
    let pulls = StdArc::new(StdAtomicUsize::new(0));
    let pulls_for_source = StdArc::clone(&pulls);
    let probe = Source::from_fn_iter(move || {
        let pulls = StdArc::clone(&pulls_for_source);
        // Each pull bumps the shared counter and yields its previous value.
        std::iter::from_fn(move || {
            let next = pulls.fetch_add(1, StdOrdering::SeqCst);
            Some(next)
        })
    })
    .map_async(2, |item| async move {
        if item == 0 {
            tokio::time::sleep(StdDuration::from_millis(300)).await;
        }
        Ok(item)
    })
    .run_with(TestSink::probe())
    .unwrap();
    probe.request(16);
    thread::sleep(StdDuration::from_millis(100));
    // Snapshot the counter once so a failure message reports the exact value that was
    // checked; the stream keeps running and could move the counter in between.
    let observed_pulls = pulls.load(StdOrdering::SeqCst);
    assert!(
        observed_pulls <= 3,
        "pulled {} elements with parallelism=2 behind a stuck ordered head",
        observed_pulls
    );
}
/// Drives `map_async(1, ..)` with a hand-written future that returns `Pending` until a
/// helper thread flips a flag and wakes it, then asserts the mapped value, the elapsed
/// time window and an upper bound on the number of polls.
#[test]
fn async_mapping_parks_until_woken_future_completes() {
// Future that becomes ready only after a background thread sets `ready`.
struct WakeOnceFuture {
// Result handed out exactly once when the future resolves.
value: Option<u64>,
// Set by the helper thread; checked on every poll.
ready: StdArc<StdAtomicBool>,
// Guards against spawning the helper thread more than once.
started: bool,
// Shared poll counter inspected by the test after the stream finishes.
polls: StdArc<StdAtomicUsize>,
// Most recent waker seen by `poll`; taken by the helper thread to wake the task.
latest_waker: StdArc<Mutex<Option<std::task::Waker>>>,
}
impl std::future::Future for WakeOnceFuture {
type Output = StreamResult<u64>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.as_mut().get_mut();
this.polls.fetch_add(1, StdOrdering::SeqCst);
// Publish the waker before reading `ready`, so the helper thread either finds this
// waker or this poll observes `ready == true`.
*this.latest_waker.lock().expect("latest wake slot mutex") =
Some(cx.waker().clone());
if this.ready.load(StdOrdering::SeqCst) {
return Poll::Ready(Ok(this.value.take().unwrap()));
}
if !this.started {
this.started = true;
let ready = StdArc::clone(&this.ready);
let latest_waker = StdArc::clone(&this.latest_waker);
// First pending poll: start the thread that completes the future after 20 ms.
thread::spawn(move || {
thread::sleep(Duration::from_millis(20));
ready.store(true, StdOrdering::SeqCst);
if let Some(waker) =
latest_waker.lock().expect("latest wake slot mutex").take()
{
waker.wake();
}
});
}
Poll::Pending
}
}
let polls = StdArc::new(StdAtomicUsize::new(0));
let polls_for_stage = StdArc::clone(&polls);
let start = Instant::now();
let values = Source::single(41)
.map_async(1, move |item| WakeOnceFuture {
value: Some(item + 1),
ready: StdArc::new(StdAtomicBool::new(false)),
started: false,
polls: StdArc::clone(&polls_for_stage),
latest_waker: StdArc::new(Mutex::new(None)),
})
.run_collect()
.unwrap();
assert_eq!(values, vec![42]);
let elapsed = start.elapsed();
// Lower bound: the result must not appear before the helper thread's 20 ms sleep is
// (nearly) over. Upper bound: the wake-up must be noticed within 250 ms.
assert!(
elapsed >= StdDuration::from_millis(15) && elapsed < StdDuration::from_millis(250),
"pending future should park until woken once, elapsed={elapsed:?}"
);
// A poll count at or above this budget is treated as busy re-polling of the pending future.
assert!(
polls.load(StdOrdering::SeqCst) < 4096,
"pending future was repolled too aggressively"
);
}
/// Runs each async mapping operator over the endless `Source::repeat(1)` followed by
/// `take(1)` and asserts that exactly one mapped element is collected.
#[test]
fn async_mapping_emits_before_unbounded_upstream_finishes() {
    // Ordered variant.
    let first_ordered = Source::repeat(1)
        .map_async(2, |value| async move { Ok(value + 1) })
        .take(1)
        .run_collect()
        .unwrap();
    assert_eq!(first_ordered, vec![2]);

    // Unordered variant.
    let first_unordered = Source::repeat(1)
        .map_async_unordered(2, |value| async move { Ok(value + 1) })
        .take(1)
        .run_collect()
        .unwrap();
    assert_eq!(first_unordered, vec![2]);

    // Partitioned variant; every element maps to the same key.
    let first_partitioned = Source::repeat(1)
        .map_async_partitioned(2, 1, |_| 0_u8, |value| async move { Ok(value + 1) })
        .take(1)
        .run_collect()
        .unwrap();
    assert_eq!(first_partitioned, vec![2]);
}
/// Maps six elements that all share one partition key and asserts that no two of their
/// futures were in flight at the same time, and that output order matches input order.
///
/// NOTE(review): the leading `4, 1` arguments are presumably the overall parallelism
/// and the per-partition limit — confirm against `map_async_partitioned`.
#[test]
fn partitioned_async_mapping_limits_same_key_concurrency() {
    let in_flight = StdArc::new(StdAtomicUsize::new(0));
    let peak_in_flight = StdArc::new(StdAtomicUsize::new(0));
    let stage_in_flight = StdArc::clone(&in_flight);
    let stage_peak = StdArc::clone(&peak_in_flight);
    let collected = Source::from_iter(0..6)
        .map_async_partitioned(
            4,
            1,
            |_| 0_u8,
            move |element| {
                // Count the future as in flight from the moment it is created.
                let running = StdArc::clone(&stage_in_flight);
                let now_running = running.fetch_add(1, StdOrdering::SeqCst) + 1;
                stage_peak.fetch_max(now_running, StdOrdering::SeqCst);
                async move {
                    thread::sleep(Duration::from_millis(1));
                    running.fetch_sub(1, StdOrdering::SeqCst);
                    Ok(element)
                }
            },
        )
        .run_collect()
        .unwrap();
    assert_eq!(collected, vec![0, 1, 2, 3, 4, 5]);
    let observed_peak = peak_in_flight.load(StdOrdering::SeqCst);
    assert_eq!(observed_peak, 1);
}
/// Feeds `[0, 2, 1]` through `map_async_partitioned` keyed by `item % 2`, with the
/// future for item `0` blocked on a oneshot channel, and asserts that two futures were
/// in flight at once and that the output keeps input order.
///
/// NOTE(review): the leading `2, 1` arguments are presumably the overall parallelism
/// and the per-partition limit — confirm against `map_async_partitioned`.
#[test]
fn partitioned_async_mapping_scans_past_blocked_pending_key() {
// `active` counts futures created but not yet finished; `max_active` keeps its peak.
let active = StdArc::new(StdAtomicUsize::new(0));
let max_active = StdArc::new(StdAtomicUsize::new(0));
let active_for_stage = StdArc::clone(&active);
let max_for_stage = StdArc::clone(&max_active);
let (release_tx, release_rx) = oneshot::channel::<()>();
// The receiver sits in an `Option` so the single future for item `0` can take it.
let release_rx = StdArc::new(std::sync::Mutex::new(Some(release_rx)));
let release_rx_for_stage = StdArc::clone(&release_rx);
let max_for_release = StdArc::clone(&max_active);
// Unblocks item `0` once a second future has been observed in flight, or after one
// second at the latest so a failing run still terminates.
let releaser = thread::spawn(move || {
let deadline = Instant::now() + StdDuration::from_secs(1);
while max_for_release.load(StdOrdering::SeqCst) < 2 && Instant::now() < deadline {
thread::yield_now();
}
let _ = release_tx.send(());
});
let values = Source::from_iter([0, 2, 1])
.map_async_partitioned(
2,
1,
|item| item % 2,
move |item| {
let active = StdArc::clone(&active_for_stage);
let max_active = StdArc::clone(&max_for_stage);
let release_rx = StdArc::clone(&release_rx_for_stage);
// Bookkeeping runs when the future is created, before it is first polled.
let current = active.fetch_add(1, StdOrdering::SeqCst) + 1;
max_active.fetch_max(current, StdOrdering::SeqCst);
async move {
if item == 0 {
// The mutex guard is a temporary dropped at the end of this statement, so it is
// not held across the `.await` below.
let receiver = release_rx
.lock()
.expect("release receiver mutex")
.take()
.expect("release receiver present");
let _ = receiver.await;
}
active.fetch_sub(1, StdOrdering::SeqCst);
Ok(item)
}
},
)
.run_collect()
.unwrap();
releaser.join().unwrap();
assert_eq!(values, vec![0, 2, 1]);
// Items `0` and `2` share key 0 and item `1` has key 1; the test expects a peak of
// exactly two futures in flight while item `0` is blocked.
assert_eq!(max_active.load(StdOrdering::SeqCst), 2);
}
/// Runs `map_async_partitioned(1, 1, ..)` over eight elements and asserts that the
/// partition function was invoked once per element and that every element was mapped.
#[test]
fn partitioned_async_mapping_p1_still_evaluates_partition() {
    let partition_calls = StdArc::new(StdAtomicUsize::new(0));
    let stage_calls = StdArc::clone(&partition_calls);
    let mapped = Source::from_iter(0..8)
        .map_async_partitioned(
            1,
            1,
            move |element| {
                // Record every call of the partition function.
                stage_calls.fetch_add(1, StdOrdering::SeqCst);
                element % 2
            },
            |element| async move { Ok(element + 1) },
        )
        .run_collect()
        .unwrap();
    let expected = (1..9).collect::<Vec<_>>();
    assert_eq!(mapped, expected);
    let observed_calls = partition_calls.load(StdOrdering::SeqCst);
    assert_eq!(observed_calls, 8);
}
/// Maps 512 elements spread over 16 keys through `map_async_partitioned(32, 1, ..)`
/// and asserts input-ordered output plus a per-key in-flight peak of exactly one.
#[test]
fn partitioned_async_mapping_handles_many_keys_high_parallelism() {
    // One counter slot per key (keys are `item % 16`).
    let zeroed_slots =
        || StdArc::new((0..16).map(|_| StdAtomicUsize::new(0)).collect::<Vec<_>>());
    let active_by_key = zeroed_slots();
    let max_by_key = zeroed_slots();
    let stage_active = StdArc::clone(&active_by_key);
    let stage_max = StdArc::clone(&max_by_key);
    let collected = Source::from_iter(0..512_usize)
        .map_async_partitioned(
            32,
            1,
            |element| element % 16,
            move |element| {
                let running = StdArc::clone(&stage_active);
                let slot = element % 16;
                // Track in-flight futures per key from creation to completion.
                let now_running = running[slot].fetch_add(1, StdOrdering::SeqCst) + 1;
                stage_max[slot].fetch_max(now_running, StdOrdering::SeqCst);
                async move {
                    running[slot].fetch_sub(1, StdOrdering::SeqCst);
                    Ok(element)
                }
            },
        )
        .run_collect()
        .unwrap();
    let expected = (0..512).collect::<Vec<_>>();
    assert_eq!(collected, expected);
    for key_peak in max_by_key.iter() {
        assert_eq!(key_peak.load(StdOrdering::SeqCst), 1);
    }
}
/// Exercises the error-handling operators on failing sources: `map_error`, `recover`
/// (accepting and declining), `recover_with_retries` (accepting and declining) and
/// `on_error_complete`.
#[test]
fn error_operators_map_recover_and_complete() {
    // Builds a source that fails immediately with the given message.
    let failing = |message: &'static str| Source::<i32>::failed(StreamError::Failed(message.into()));

    // map_error replaces the error.
    let remapped = failing("boom")
        .map_error(|_| StreamError::Failed("mapped".into()))
        .run_collect();
    assert_eq!(remapped, Err(StreamError::Failed("mapped".into())));

    // recover turns a matching error into a final element.
    let rescued = failing("boom")
        .recover(|error| matches!(error, StreamError::Failed(_)).then_some(42))
        .run_collect()
        .unwrap();
    assert_eq!(rescued, vec![42]);

    // recover returning None keeps the original error.
    let not_rescued = failing("original").recover(|_| None).run_collect();
    assert_eq!(not_rescued, Err(StreamError::Failed("original".into())));

    // recover_with_retries switches to the replacement source.
    let switched = failing("boom")
        .recover_with_retries(1, |_| Some(Source::from_iter([1, 2])))
        .run_collect()
        .unwrap();
    assert_eq!(switched, vec![1, 2]);

    // recover_with_retries returning None keeps the original error.
    let not_switched = failing("declined")
        .recover_with_retries(1, |_| None)
        .run_collect();
    assert_eq!(
        not_switched,
        Err(StreamError::Failed("declined".into()))
    );

    // on_error_complete ends the stream at the first error, dropping what follows.
    let truncated = Source::from_factory(|| {
        Box::new(vec![Ok(1), Err(StreamError::Failed("ignored".into())), Ok(2)].into_iter())
    })
    .on_error_complete()
    .run_collect()
    .unwrap();
    assert_eq!(truncated, vec![1]);
}
/// Pins the windows produced by `sliding(size, step)` for a set of size/step/input
/// combinations, including short inputs and steps larger than the window.
#[test]
fn sliding_matches_akka_window_semantics() {
    // size 3, step 1 over four elements.
    let windows = Source::from_iter(1..=4).sliding(3, 1).run_collect().unwrap();
    assert_eq!(windows, vec![vec![1, 2, 3], vec![2, 3, 4]]);

    // size 2, step 1 over four elements.
    let windows = Source::from_iter(1..=4).sliding(2, 1).run_collect().unwrap();
    assert_eq!(windows, vec![vec![1, 2], vec![2, 3], vec![3, 4]]);

    // Input exactly as long as the window.
    let windows = Source::from_iter(1..=3).sliding(3, 1).run_collect().unwrap();
    assert_eq!(windows, vec![vec![1, 2, 3]]);

    // Input shorter than the window: one partial window.
    let windows = Source::from_iter(1..=2).sliding(3, 1).run_collect().unwrap();
    assert_eq!(windows, vec![vec![1, 2]]);

    // size 1, step 1: every element on its own.
    let windows = Source::from_iter(1..=3).sliding(1, 1).run_collect().unwrap();
    assert_eq!(windows, vec![vec![1], vec![2], vec![3]]);

    // Step larger than the window skips elements between windows.
    let windows = Source::from_iter(1..=6).sliding(2, 3).run_collect().unwrap();
    assert_eq!(windows, vec![vec![1, 2], vec![4, 5]]);

    // Step larger than the remaining input: only the first window.
    let windows = Source::from_iter(1..=3).sliding(2, 4).run_collect().unwrap();
    assert_eq!(windows, vec![vec![1, 2]]);
}
/// Asserts that `recover_with` keeps invoking its handler while the replacement sources
/// fail: five failing replacements are followed by one that emits `42`, for six handler
/// calls in total.
#[test]
fn recover_with_retries_indefinitely_like_akka() {
    let handler_calls = StdArc::new(StdAtomicUsize::new(0));
    let stage_calls = StdArc::clone(&handler_calls);
    let collected = Source::<i32>::failed(StreamError::Failed("boom".into()))
        .recover_with(move |_error| {
            let earlier_calls = stage_calls.fetch_add(1, StdOrdering::SeqCst);
            // The first five replacements fail again; the sixth one succeeds.
            let replacement = if earlier_calls < 5 {
                Source::<i32>::failed(StreamError::Failed("again".into()))
            } else {
                Source::from_iter([42])
            };
            Some(replacement)
        })
        .run_collect()
        .unwrap();
    assert_eq!(collected, vec![42]);
    let total_calls = handler_calls.load(StdOrdering::SeqCst);
    assert_eq!(total_calls, 6);
}
/// Parks six never-completing streams on one materializer, then asserts that a further
/// fold stream on the same materializer still completes, and that shutting the
/// materializer down ends the parked streams with `StreamError::AbruptTermination`.
#[test]
fn many_concurrent_streams_do_not_starve_the_pool() {
    let materializer = Materializer::new();
    let busy = 6_usize;
    let held: Vec<_> = (0..busy)
        .map(|_| {
            Source::single(1_u64)
                .run_with_materializer(Sink::never(), &materializer)
                .unwrap()
        })
        .collect();

    // Wait (at most 400 polls, 5 ms apart) until all parked streams are reported active.
    let mut polls = 0;
    while polls < 400 && materializer.active_streams() < busy {
        thread::sleep(Duration::from_millis(5));
        polls += 1;
    }
    assert_eq!(materializer.active_streams(), busy);

    let total = Source::from_iter(0_u64..5)
        .run_with_materializer(Sink::fold(0_u64, |acc, item| acc + item), &materializer)
        .unwrap();
    assert_eq!(total.wait().unwrap(), 10);

    materializer.shutdown();
    for parked in held {
        assert_eq!(parked.wait(), Err(StreamError::AbruptTermination));
    }
}
}