Skip to main content

datum/stream/
mod.rs

1//! The linear Source/Flow/Sink DSL and the runtime that materializes it — Datum's
2//! primary public surface, mirroring Akka/Pekko Streams Typed.
3//!
4//! This module root owns the shared vocabulary: the `BoxStream`/`PureTransform`/
5//! `RuntimeTransform` type aliases, the `SourceHints`/`FlowHints` optimization-hint
6//! system, the fan-in stream combinators (`merge_*`/`zip_*`), the spin-then-park
7//! constants, and the re-export block. The public types live in the submodules
8//! (`source`, `flow`, `sink`, `runtime`, `rate`, `time`, `restart`, `error`,
9//! `completion`). See this directory's `AGENTS.md` for the map and the crate
10//! `CLAUDE.md` for the blueprint-vs-materialization rule and execution model.
11
12use std::{
13    collections::{BTreeMap, HashMap, VecDeque},
14    fmt,
15    future::Future,
16    hash::Hash,
17    marker::PhantomData,
18    panic::{AssertUnwindSafe, catch_unwind},
19    pin::Pin,
20    sync::{
21        Arc, Condvar, Mutex, OnceLock,
22        atomic::{AtomicBool, AtomicUsize, Ordering},
23    },
24    task::{Context, Poll},
25    thread,
26    time::Duration,
27};
28
29use futures::{channel::oneshot, executor::block_on};
30use thiserror::Error;
31use tokio::{
32    runtime::{Builder as TokioRuntimeBuilder, Runtime as TokioRuntime},
33    task::{JoinError, JoinHandle},
34};
35
36pub(crate) type BoxStream<T> = Box<dyn Iterator<Item = StreamResult<T>> + Send>;
37pub(crate) type PureTransform<In, Out> = Arc<dyn Fn(BoxStream<In>) -> BoxStream<Out> + Send + Sync>;
38pub(crate) type RuntimeTransform<In, Out> =
39    Arc<dyn Fn(BoxStream<In>, &Materializer) -> StreamResult<BoxStream<Out>> + Send + Sync>;
40type SinkRunner<In, Mat> = dyn Fn(BoxStream<In>, &Materializer) -> StreamResult<Mat> + Send + Sync;
41type HintedSinkRunner<In, Mat> =
42    dyn Fn(BoxStream<In>, &Materializer, SourceRuntimeHints) -> StreamResult<Mat> + Send + Sync;
43type RunnableGraphRunner<Mat> = dyn Fn(&Materializer) -> StreamResult<Mat> + Send + Sync;
44const STREAM_READY_SPINS: usize = 256;
45/// Spin-loop hints between consecutive drain-thread readiness checks. Back off
46/// a small block of hints between polls so the spin window still covers the
47/// common fast-completion case without burning a full busy loop before
48/// parking.
49const STREAM_SPIN_BACKOFF: usize = 8;
50const STREAM_MAX_PARK: Duration = Duration::from_millis(1);
51
52/// Private metadata for known finite, synchronous micro-sources.
53/// Set only on Datum-owned constructors whose `next()` is guaranteed
54/// memory-backed and whose total success item count is known at blueprint time.
55#[derive(Clone, Copy, Debug, PartialEq, Eq)]
56struct InlineMicroSourceHint {
57    max_success_items: usize,
58}
59
60#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
61struct SourceHints {
62    inline_head_terminal: bool,
63    /// Set only for constructors where Datum owns the full iterator and can
64    /// prove it is finite, synchronous, and memory-backed.
65    inline_micro: Option<InlineMicroSourceHint>,
66    /// Set only when terminal `fold`/`collect`/`ignore` may amortize the
67    /// checked-stream cancellation/shutdown wrapper. Unknown, probe, queue, IO,
68    /// and timer-backed sources keep the per-element checked path.
69    terminal_consumer_batch: bool,
70}
71
72#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
73pub(crate) struct SourceRuntimeHints {
74    pub(crate) inline_micro_max_success_items: Option<usize>,
75    pub(crate) terminal_consumer_batch: bool,
76}
77
78impl SourceHints {
79    const fn with_inline_micro(max_success_items: usize) -> Self {
80        Self {
81            inline_head_terminal: true,
82            inline_micro: Some(InlineMicroSourceHint { max_success_items }),
83            terminal_consumer_batch: true,
84        }
85    }
86
87    const fn with_terminal_consumer_batch() -> Self {
88        Self {
89            inline_head_terminal: false,
90            inline_micro: None,
91            terminal_consumer_batch: true,
92        }
93    }
94
95    fn after_flow(self, flow: FlowHints) -> Self {
96        if flow.preserves_inline_head_terminal {
97            // inline_micro is intentionally not propagated through any flow:
98            // even a trivial map wraps the iterator, making eligibility uncertain.
99            Self {
100                inline_head_terminal: true,
101                inline_micro: None,
102                terminal_consumer_batch: self.terminal_consumer_batch
103                    && flow.preserves_terminal_consumer_batch,
104            }
105        } else {
106            Self {
107                inline_head_terminal: false,
108                inline_micro: None,
109                terminal_consumer_batch: self.terminal_consumer_batch
110                    && flow.preserves_terminal_consumer_batch,
111            }
112        }
113    }
114
115    fn without_inline_micro(self) -> Self {
116        Self {
117            inline_head_terminal: self.inline_head_terminal,
118            inline_micro: None,
119            terminal_consumer_batch: self.terminal_consumer_batch,
120        }
121    }
122
123    fn runtime(self) -> SourceRuntimeHints {
124        SourceRuntimeHints {
125            inline_micro_max_success_items: self.inline_micro.map(|hint| hint.max_success_items),
126            terminal_consumer_batch: self.terminal_consumer_batch,
127        }
128    }
129}
130
131#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
132struct FlowHints {
133    preserves_inline_head_terminal: bool,
134    preserves_terminal_consumer_batch: bool,
135}
136
137impl FlowHints {
138    const PRESERVES_INLINE_HEAD_TERMINAL: Self = Self {
139        preserves_inline_head_terminal: true,
140        preserves_terminal_consumer_batch: true,
141    };
142
143    const PRESERVES_TERMINAL_CONSUMER_BATCH: Self = Self {
144        preserves_inline_head_terminal: false,
145        preserves_terminal_consumer_batch: true,
146    };
147
148    fn then(self, next: Self) -> Self {
149        Self {
150            preserves_inline_head_terminal: self.preserves_inline_head_terminal
151                && next.preserves_inline_head_terminal,
152            preserves_terminal_consumer_batch: self.preserves_terminal_consumer_batch
153                && next.preserves_terminal_consumer_batch,
154        }
155    }
156}
157
158struct PartitionSlot<Key, Out> {
159    key: Option<Key>,
160    active: usize,
161    queued: VecDeque<(usize, Out)>,
162    in_ready_queue: bool,
163}
164
165struct AbortOnDropHandle<T> {
166    handle: JoinHandle<T>,
167}
168
169impl<T> AbortOnDropHandle<T> {
170    fn new(handle: JoinHandle<T>) -> Self {
171        Self { handle }
172    }
173}
174
175impl<T> Drop for AbortOnDropHandle<T> {
176    fn drop(&mut self) {
177        self.handle.abort();
178    }
179}
180
181impl<T> Future for AbortOnDropHandle<T> {
182    type Output = Result<T, JoinError>;
183
184    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
185        Pin::new(&mut self.handle).poll(cx)
186    }
187}
188
189impl<T> Unpin for AbortOnDropHandle<T> {}
190
191pub(crate) fn stream_tokio_runtime() -> &'static TokioRuntime {
192    static RUNTIME: OnceLock<TokioRuntime> = OnceLock::new();
193    RUNTIME.get_or_init(|| {
194        TokioRuntimeBuilder::new_multi_thread()
195            .enable_all()
196            .thread_name("datum-stream-tokio")
197            .build()
198            .expect("stream tokio runtime")
199    })
200}
201
202fn spawn_tokio_task<Fut, T>(future: Fut) -> AbortOnDropHandle<T>
203where
204    Fut: Future<Output = T> + Send + 'static,
205    T: Send + 'static,
206{
207    AbortOnDropHandle::new(stream_tokio_runtime().spawn(future))
208}
209
210pub(crate) fn current_stream_cancelled() -> Option<Arc<AtomicBool>> {
211    runtime::current_stream_cancelled()
212}
213
214pub(super) fn catch_unwind_failed<T, F>(context: &'static str, f: F) -> StreamResult<T>
215where
216    F: FnOnce() -> T,
217{
218    catch_unwind(AssertUnwindSafe(f))
219        .map_err(|_| StreamError::Failed(format!("{context} panicked")))
220}
221
222impl<Key, Out> PartitionSlot<Key, Out> {
223    fn new(key: Key) -> Self {
224        Self {
225            key: Some(key),
226            active: 0,
227            queued: VecDeque::new(),
228            in_ready_queue: false,
229        }
230    }
231}
232
233#[inline(always)]
234fn partition_slot_for<Key, Out>(
235    key: Key,
236    slots_by_key: &mut HashMap<Key, usize>,
237    slots: &mut Vec<PartitionSlot<Key, Out>>,
238    free_slots: &mut Vec<usize>,
239) -> usize
240where
241    Key: Clone + Eq + Hash,
242{
243    if let Some(slot) = slots_by_key.get(&key) {
244        return *slot;
245    }
246
247    let slot = if let Some(slot) = free_slots.pop() {
248        let state = &mut slots[slot];
249        state.key = Some(key.clone());
250        state.active = 0;
251        state.queued.clear();
252        state.in_ready_queue = false;
253        slot
254    } else {
255        slots.push(PartitionSlot::new(key.clone()));
256        slots.len() - 1
257    };
258    slots_by_key.insert(key, slot);
259    slot
260}
261
262#[inline(always)]
263fn retire_partition_slot<Key, Out>(
264    slot: usize,
265    slots_by_key: &mut HashMap<Key, usize>,
266    slots: &mut [PartitionSlot<Key, Out>],
267    free_slots: &mut Vec<usize>,
268) where
269    Key: Eq + Hash,
270{
271    let state = &mut slots[slot];
272    if let Some(key) = state.key.take() {
273        slots_by_key.remove(&key);
274    }
275    state.active = 0;
276    state.queued.clear();
277    state.in_ready_queue = false;
278    free_slots.push(slot);
279}
280
281#[inline(always)]
282fn ready_partition_slot<Key, Out>(
283    slots: &mut [PartitionSlot<Key, Out>],
284    ready_slots: &mut VecDeque<usize>,
285    slot: usize,
286    per_partition: usize,
287) {
288    if let Some(state) = slots.get_mut(slot)
289        && state.key.is_some()
290        && !state.in_ready_queue
291        && state.active < per_partition
292        && !state.queued.is_empty()
293    {
294        state.in_ready_queue = true;
295        ready_slots.push_back(slot);
296    }
297}
298
299#[inline(always)]
300fn pop_ready_partition_slot<Key, Out>(
301    slots: &mut [PartitionSlot<Key, Out>],
302    ready_slots: &mut VecDeque<usize>,
303    per_partition: usize,
304) -> Option<(usize, usize, Out)> {
305    while let Some(slot) = ready_slots.pop_front() {
306        let mut requeue = false;
307        let item = if let Some(state) = slots.get_mut(slot) {
308            state.in_ready_queue = false;
309            if state.key.is_some() && state.active < per_partition {
310                let item = state.queued.pop_front().map(|(index, item)| {
311                    state.active += 1;
312                    (index, slot, item)
313                });
314                if !state.queued.is_empty() && state.active < per_partition {
315                    state.in_ready_queue = true;
316                    requeue = true;
317                }
318                item
319            } else {
320                None
321            }
322        } else {
323            None
324        };
325
326        if requeue {
327            ready_slots.push_back(slot);
328        }
329        if item.is_some() {
330            return item;
331        }
332    }
333    None
334}
335
336pub(crate) trait SourceFactory<Out, Mat>: Send + Sync {
337    fn create(self: Arc<Self>, materializer: &Materializer) -> StreamResult<(BoxStream<Out>, Mat)>;
338}
339
340struct FnSourceFactory<F>(F);
341
342impl<Out, Mat, F> SourceFactory<Out, Mat> for FnSourceFactory<F>
343where
344    F: Fn(&Materializer) -> StreamResult<(BoxStream<Out>, Mat)> + Send + Sync,
345{
346    fn create(self: Arc<Self>, materializer: &Materializer) -> StreamResult<(BoxStream<Out>, Mat)> {
347        (self.0)(materializer)
348    }
349}
350
351struct MapSourceFactory<In, Out, Mat, F> {
352    source: Arc<dyn SourceFactory<In, Mat>>,
353    stage: F,
354    _marker: PhantomData<fn(In) -> Out>,
355}
356
357impl<In, Out, Mat, F> SourceFactory<Out, Mat> for MapSourceFactory<In, Out, Mat, F>
358where
359    In: Send + 'static,
360    Out: Send + 'static,
361    Mat: Send + 'static,
362    F: Fn(In) -> Out + Send + Sync + 'static,
363{
364    fn create(self: Arc<Self>, materializer: &Materializer) -> StreamResult<(BoxStream<Out>, Mat)> {
365        let (stream, mat) = Arc::clone(&self.source).create(materializer)?;
366        Ok((
367            Box::new(MapSourceStream {
368                input: stream,
369                factory: self,
370            }),
371            mat,
372        ))
373    }
374}
375
376struct MapSourceStream<In, Out, Mat, F> {
377    input: BoxStream<In>,
378    factory: Arc<MapSourceFactory<In, Out, Mat, F>>,
379}
380
381impl<In, Out, Mat, F> Iterator for MapSourceStream<In, Out, Mat, F>
382where
383    F: Fn(In) -> Out,
384{
385    type Item = StreamResult<Out>;
386
387    fn next(&mut self) -> Option<Self::Item> {
388        self.input
389            .next()
390            .map(|item| item.map(|item| (self.factory.stage)(item)))
391    }
392}
393
394fn merge_streams<Out>(streams: Vec<BoxStream<Out>>, eager_complete: bool) -> BoxStream<Out>
395where
396    Out: Send + 'static,
397{
398    let mut streams: Vec<Option<BoxStream<Out>>> = streams.into_iter().map(Some).collect();
399    let mut current = 0usize;
400    Box::new(std::iter::from_fn(move || {
401        loop {
402            let index = next_active_optional_stream(&streams, current, |_| true)?;
403            current = (index + 1) % streams.len().max(1);
404            let Some(stream) = streams[index].as_mut() else {
405                continue;
406            };
407            match stream.next() {
408                Some(item) => return Some(item),
409                None => {
410                    streams[index] = None;
411                    if eager_complete {
412                        return None;
413                    }
414                }
415            }
416        }
417    }))
418}
419
420fn merge_prioritized_streams<Out>(
421    streams: Vec<BoxStream<Out>>,
422    priorities: Vec<usize>,
423    eager_complete: bool,
424) -> BoxStream<Out>
425where
426    Out: Send + 'static,
427{
428    let mut streams: Vec<Option<BoxStream<Out>>> = streams.into_iter().map(Some).collect();
429    let schedule: Vec<usize> = priorities
430        .into_iter()
431        .enumerate()
432        .flat_map(|(index, weight)| std::iter::repeat_n(index, weight))
433        .collect();
434    let mut schedule_index = 0usize;
435    Box::new(std::iter::from_fn(move || {
436        loop {
437            if streams.iter().all(Option::is_none) {
438                return None;
439            }
440            let index = next_weighted_stream(&streams, &schedule, &mut schedule_index)?;
441            let Some(stream) = streams[index].as_mut() else {
442                continue;
443            };
444            match stream.next() {
445                Some(item) => return Some(item),
446                None => {
447                    streams[index] = None;
448                    if eager_complete {
449                        return None;
450                    }
451                }
452            }
453        }
454    }))
455}
456
457fn merge_sorted_stream<Out>(mut left: BoxStream<Out>, mut right: BoxStream<Out>) -> BoxStream<Out>
458where
459    Out: Ord + Send + 'static,
460{
461    let mut left_next: Option<Out> = None;
462    let mut right_next: Option<Out> = None;
463    let mut left_done = false;
464    let mut right_done = false;
465    Box::new(std::iter::from_fn(move || {
466        loop {
467            if left_next.is_none() && !left_done {
468                match left.next() {
469                    Some(Ok(item)) => left_next = Some(item),
470                    Some(Err(error)) => return Some(Err(error)),
471                    None => left_done = true,
472                }
473            }
474            if right_next.is_none() && !right_done {
475                match right.next() {
476                    Some(Ok(item)) => right_next = Some(item),
477                    Some(Err(error)) => return Some(Err(error)),
478                    None => right_done = true,
479                }
480            }
481
482            let next = match (&left_next, &right_next) {
483                (Some(left_item), Some(right_item)) => {
484                    if left_item <= right_item {
485                        left_next.take()
486                    } else {
487                        right_next.take()
488                    }
489                }
490                (Some(_), None) if right_done => left_next.take(),
491                (None, Some(_)) if left_done => right_next.take(),
492                (None, None) if left_done && right_done => return None,
493                _ => continue,
494            };
495            if let Some(item) = next {
496                return Some(Ok(item));
497            }
498        }
499    }))
500}
501
502fn merge_latest_streams<Out>(
503    streams: Vec<BoxStream<Out>>,
504    eager_complete: bool,
505) -> BoxStream<Vec<Out>>
506where
507    Out: Clone + Send + 'static,
508{
509    let mut streams: Vec<Option<BoxStream<Out>>> = streams.into_iter().map(Some).collect();
510    let mut latest = vec![None; streams.len()];
511    let mut seen = 0usize;
512    let mut current = 0usize;
513    let mut pending = VecDeque::<Vec<Out>>::new();
514    Box::new(std::iter::from_fn(move || {
515        loop {
516            if let Some(output) = pending.pop_front() {
517                return Some(Ok(output));
518            }
519            if streams.iter().all(Option::is_none) {
520                return None;
521            }
522            let index = next_active_optional_stream(&streams, current, |_| true)?;
523            current = (index + 1) % streams.len().max(1);
524            let Some(stream) = streams[index].as_mut() else {
525                continue;
526            };
527            match stream.next() {
528                Some(Ok(item)) => {
529                    if latest[index].is_none() {
530                        seen += 1;
531                    }
532                    latest[index] = Some(item);
533                    if seen == latest.len() {
534                        pending.push_back(
535                            latest
536                                .iter()
537                                .map(|item| item.clone().expect("merge-latest initialized"))
538                                .collect(),
539                        );
540                    }
541                }
542                Some(Err(error)) => return Some(Err(error)),
543                None => {
544                    streams[index] = None;
545                    if eager_complete {
546                        return None;
547                    }
548                }
549            }
550        }
551    }))
552}
553
554fn zip_streams<Left, Right>(
555    mut left: BoxStream<Left>,
556    mut right: BoxStream<Right>,
557) -> BoxStream<(Left, Right)>
558where
559    Left: Send + 'static,
560    Right: Send + 'static,
561{
562    let mut left_next: Option<Left> = None;
563    let mut right_next: Option<Right> = None;
564    let mut left_done = false;
565    let mut right_done = false;
566    Box::new(std::iter::from_fn(move || {
567        loop {
568            if left_next.is_none() && !left_done {
569                match left.next() {
570                    Some(Ok(item)) => left_next = Some(item),
571                    Some(Err(error)) => return Some(Err(error)),
572                    None => left_done = true,
573                }
574            }
575            if right_next.is_none() && !right_done {
576                match right.next() {
577                    Some(Ok(item)) => right_next = Some(item),
578                    Some(Err(error)) => return Some(Err(error)),
579                    None => right_done = true,
580                }
581            }
582            match (left_next.take(), right_next.take()) {
583                (Some(left_item), Some(right_item)) => return Some(Ok((left_item, right_item))),
584                (left_item, right_item) => {
585                    left_next = left_item;
586                    right_next = right_item;
587                    if (left_done && left_next.is_none()) || (right_done && right_next.is_none()) {
588                        return None;
589                    }
590                }
591            }
592        }
593    }))
594}
595
596fn zip_latest_with_stream<Left, Right, Out, F>(
597    mut left: BoxStream<Left>,
598    mut right: BoxStream<Right>,
599    eager_complete: bool,
600    combine: Arc<F>,
601) -> BoxStream<Out>
602where
603    Left: Clone + Send + 'static,
604    Right: Clone + Send + 'static,
605    Out: Send + 'static,
606    F: Fn(Left, Right) -> Out + Send + Sync + 'static,
607{
608    let mut left_latest: Option<Left> = None;
609    let mut right_latest: Option<Right> = None;
610    let mut left_done = false;
611    let mut right_done = false;
612    let mut turn_left = true;
613    let mut pending = VecDeque::<Out>::new();
614
615    Box::new(std::iter::from_fn(move || {
616        loop {
617            if let Some(output) = pending.pop_front() {
618                return Some(Ok(output));
619            }
620            if eager_complete && (left_done || right_done) {
621                return None;
622            }
623            if left_done && right_done {
624                return None;
625            }
626            // A side that completed without ever emitting can never be paired:
627            // no further output is possible, so draining the other (possibly
628            // infinite) side would loop forever producing nothing.
629            if (left_done && left_latest.is_none()) || (right_done && right_latest.is_none()) {
630                return None;
631            }
632
633            let pull_left = if left_done {
634                false
635            } else if right_done {
636                true
637            } else {
638                let value = turn_left;
639                turn_left = !turn_left;
640                value
641            };
642
643            if pull_left {
644                match left.next() {
645                    Some(Ok(item)) => {
646                        left_latest = Some(item);
647                        if let (Some(left_item), Some(right_item)) = (&left_latest, &right_latest) {
648                            pending.push_back(combine(left_item.clone(), right_item.clone()));
649                        }
650                    }
651                    Some(Err(error)) => return Some(Err(error)),
652                    None => {
653                        left_done = true;
654                        if eager_complete {
655                            return None;
656                        }
657                    }
658                }
659            } else {
660                match right.next() {
661                    Some(Ok(item)) => {
662                        right_latest = Some(item);
663                        if let (Some(left_item), Some(right_item)) = (&left_latest, &right_latest) {
664                            pending.push_back(combine(left_item.clone(), right_item.clone()));
665                        }
666                    }
667                    Some(Err(error)) => return Some(Err(error)),
668                    None => {
669                        right_done = true;
670                        if eager_complete {
671                            return None;
672                        }
673                    }
674                }
675            }
676        }
677    }))
678}
679
680fn zip_all_stream<Left, Right>(
681    mut left: BoxStream<Left>,
682    mut right: BoxStream<Right>,
683    left_fill: Left,
684    right_fill: Right,
685) -> BoxStream<(Left, Right)>
686where
687    Left: Clone + Send + 'static,
688    Right: Clone + Send + 'static,
689{
690    let mut left_done = false;
691    let mut right_done = false;
692    Box::new(std::iter::from_fn(move || {
693        if left_done && right_done {
694            return None;
695        }
696
697        let left_item = if left_done {
698            None
699        } else {
700            match left.next() {
701                Some(Ok(item)) => Some(item),
702                Some(Err(error)) => return Some(Err(error)),
703                None => {
704                    left_done = true;
705                    None
706                }
707            }
708        };
709        let right_item = if right_done {
710            None
711        } else {
712            match right.next() {
713                Some(Ok(item)) => Some(item),
714                Some(Err(error)) => return Some(Err(error)),
715                None => {
716                    right_done = true;
717                    None
718                }
719            }
720        };
721
722        match (left_item, right_item) {
723            (None, None) if left_done && right_done => None,
724            (Some(left_value), Some(right_value)) => Some(Ok((left_value, right_value))),
725            (Some(left_value), None) => Some(Ok((left_value, right_fill.clone()))),
726            (None, Some(right_value)) => Some(Ok((left_fill.clone(), right_value))),
727            (None, None) => None,
728        }
729    }))
730}
731
732fn zip_n_streams<Out, Next, F>(streams: Vec<BoxStream<Out>>, zipper: Arc<F>) -> BoxStream<Next>
733where
734    Out: Send + 'static,
735    Next: Send + 'static,
736    F: Fn(Vec<Out>) -> Next + Send + Sync + 'static,
737{
738    let count = streams.len();
739    if count == 0 {
740        return Box::new(std::iter::empty());
741    }
742    let mut streams: Vec<Option<BoxStream<Out>>> = streams.into_iter().map(Some).collect();
743    let mut slots: Vec<Option<Out>> = (0..count).map(|_| None).collect();
744    let mut current = 0usize;
745    Box::new(std::iter::from_fn(move || {
746        loop {
747            if slots.iter().all(Option::is_some) {
748                let values = slots
749                    .iter_mut()
750                    .map(|slot| slot.take().expect("zip-n slot filled"))
751                    .collect();
752                return Some(Ok(zipper(values)));
753            }
754
755            let index = next_active_optional_stream(&streams, current, |idx| slots[idx].is_none())?;
756            current = (index + 1) % count.max(1);
757            let Some(stream) = streams[index].as_mut() else {
758                continue;
759            };
760            match stream.next() {
761                Some(Ok(item)) => slots[index] = Some(item),
762                Some(Err(error)) => return Some(Err(error)),
763                None => {
764                    streams[index] = None;
765                    slots[index].as_ref()?;
766                }
767            }
768        }
769    }))
770}
771
772fn next_active_optional_stream<T, F>(
773    streams: &[Option<BoxStream<T>>],
774    current: usize,
775    predicate: F,
776) -> Option<usize>
777where
778    T: Send + 'static,
779    F: Fn(usize) -> bool,
780{
781    if streams.is_empty() {
782        return None;
783    }
784    for offset in 0..streams.len() {
785        let index = (current + offset) % streams.len();
786        if streams[index].is_some() && predicate(index) {
787            return Some(index);
788        }
789    }
790    None
791}
792
793fn next_weighted_stream<T>(
794    streams: &[Option<BoxStream<T>>],
795    schedule: &[usize],
796    schedule_index: &mut usize,
797) -> Option<usize>
798where
799    T: Send + 'static,
800{
801    if streams.is_empty() || schedule.is_empty() {
802        return None;
803    }
804    for _ in 0..schedule.len() {
805        let index = schedule[*schedule_index % schedule.len()];
806        *schedule_index = (*schedule_index + 1) % schedule.len();
807        if streams.get(index).is_some_and(Option::is_some) {
808            return Some(index);
809        }
810    }
811    None
812}
813
814pub(crate) mod async_boundary;
815mod completion;
816mod error;
817mod flow;
818mod rate;
819mod restart;
820mod runtime;
821mod sink;
822mod source;
823mod time;
824mod timer;
825
826/// Opaque hook on `Source<T>` for split-segment fast-path sources.
827/// Non-`None` only on `Source`s returned by the split fast path.
828/// Cleared by all composition operators (via/map/to_mat etc.).
829pub(crate) trait SplitSegmentHookDyn: Send + Sync + 'static {
830    fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Send + Sync>;
831}
832
833/// Opaque hook on `Source<T>` for a private source-to-terminal fast path.
834///
835/// Implementations synchronously fill bounded batches on the sink worker
836/// spawned by the terminal sink descriptor. Sink descriptors process the batch
837/// after the hook returns, so user closures do not run under source locks.
838pub(crate) trait TerminalSourceHookDyn<In>: Send + Sync + 'static {
839    fn drain_terminal_batch(
840        &self,
841        materializer: &Materializer,
842        cancelled: &Arc<AtomicBool>,
843        batch: &mut Vec<In>,
844    ) -> StreamResult<TerminalSourceStatus>;
845
846    fn supports_direct_terminal(&self) -> bool {
847        false
848    }
849
850    fn try_register_direct_terminal(
851        &self,
852        _consumer: Box<dyn TerminalSinkConsumerDyn<In>>,
853        _cancelled: Arc<AtomicBool>,
854    ) -> Option<StreamResult<()>> {
855        None
856    }
857
858    fn cancel_terminal(&self) {}
859}
860
861#[derive(Clone, Copy, Debug, PartialEq, Eq)]
862pub(crate) enum TerminalSourceStatus {
863    Active,
864    Completed,
865}
866
867pub(crate) trait TerminalSinkConsumerDyn<In>: Send + 'static {
868    fn on_item(&mut self, item: In) -> StreamResult<()>;
869    fn finish(self: Box<Self>, result: StreamResult<()>);
870}
871
872/// Fast-path descriptor for fold/collect/ignore sinks in split-segment context.
873/// Non-`None` only for recognised sinks (`Sink::fold`, `fold_result`, `collect`, `ignore`).
874pub(crate) trait FoldFastPathDyn<In: Send + 'static>: Send + Sync + 'static {
875    /// Try to register directly with a split segment hook.
876    /// Returns `None` if the hook type is not recognised.
877    /// Returns `Some(Ok(mat))` on success where `mat` is `Box<StreamCompletion<Acc>>`.
878    fn try_register(
879        &self,
880        hook: Arc<dyn SplitSegmentHookDyn>,
881    ) -> Option<StreamResult<Box<dyn std::any::Any + Send>>>;
882
883    fn supports_terminal_drain(&self) -> bool {
884        false
885    }
886
887    fn try_register_direct_terminal(
888        &self,
889        _hook: Arc<dyn TerminalSourceHookDyn<In>>,
890        _materializer: &Materializer,
891    ) -> Option<StreamResult<Box<dyn std::any::Any + Send>>> {
892        None
893    }
894
895    fn try_register_terminal_drain(
896        &self,
897        _hook: Arc<dyn TerminalSourceHookDyn<In>>,
898        _materializer: &Materializer,
899    ) -> Option<StreamResult<Box<dyn std::any::Any + Send>>> {
900        None
901    }
902}
903
904use self::runtime::{runtime_checked_stream, set_current_stream_cancelled};
905
906pub(crate) use self::completion::StreamCancellation;
907
908pub use self::{
909    completion::{Cancellable, StreamCompletion},
910    error::{StreamError, StreamResult, Supervision, SupervisionDecider, SupervisionDirective},
911    flow::{BidiFlow, Flow},
912    rate::{AggregateTimer, OverflowStrategy},
913    restart::{RestartFlow, RestartSettings, RestartSink, RestartSource, RetryFlow},
914    runtime::{Materializer, Runtime},
915    sink::{RunnableGraph, Sink, SinkCombineStrategy},
916    source::{
917        Demand, IntoSource, Keep, MaybeHandle, NotUsed, PushOutlet, Source, SourceCombineStrategy,
918    },
919    time::{DelayOverflowStrategy, ThrottleMode},
920};
921
922#[cfg(test)]
923mod tests {
924    use super::*;
925    use crate::Attributes;
926    use crate::testkit::TestSink;
927    use std::fs;
928    use std::sync::{
929        Arc as StdArc,
930        atomic::{
931            AtomicBool as StdAtomicBool, AtomicUsize as StdAtomicUsize, Ordering as StdOrdering,
932        },
933        mpsc,
934    };
935    use std::time::Duration as StdDuration;
936    use std::time::Instant;
937
938    fn wait<T>(completion: StreamCompletion<T>) -> T {
939        completion.wait().unwrap()
940    }
941
942    fn wait_until(timeout: Duration, mut condition: impl FnMut() -> bool) -> bool {
943        let deadline = Instant::now() + timeout;
944        while Instant::now() < deadline {
945            if condition() {
946                return true;
947            }
948            thread::sleep(Duration::from_millis(2));
949        }
950        condition()
951    }
952
953    fn linux_thread_count(thread_name: &str) -> usize {
954        fs::read_dir("/proc/self/task")
955            .expect("task directory readable")
956            .filter_map(Result::ok)
957            .filter_map(|entry| fs::read_to_string(entry.path().join("comm")).ok())
958            .filter(|name| name.trim() == thread_name)
959            .count()
960    }
961
962    #[test]
963    fn source_run_terminal_shortcuts_match_explicit_sinks() {
964        let explicit_fold: StreamResult<StreamCompletion<u64>> =
965            Source::from_iter(1_u64..=4).run_with(Sink::fold(0_u64, |acc, item| acc + item));
966        let sugared_fold: StreamResult<StreamCompletion<u64>> =
967            Source::from_iter(1_u64..=4).run_fold(0_u64, |acc, item| acc + item);
968        assert_eq!(wait(explicit_fold.unwrap()), 10);
969        assert_eq!(wait(sugared_fold.unwrap()), 10);
970
971        let explicit_reduce: StreamResult<StreamCompletion<u64>> =
972            Source::from_iter(1_u64..=4).run_with(Sink::reduce(|left: u64, right| left + right));
973        let sugared_reduce: StreamResult<StreamCompletion<u64>> =
974            Source::from_iter(1_u64..=4).run_reduce(|left, right| left + right);
975        assert_eq!(wait(explicit_reduce.unwrap()), 10);
976        assert_eq!(wait(sugared_reduce.unwrap()), 10);
977
978        let explicit_empty_reduce = Source::<u64>::empty()
979            .run_with(Sink::reduce(|left: u64, right| left + right))
980            .unwrap()
981            .wait();
982        let sugared_empty_reduce = Source::<u64>::empty()
983            .run_reduce(|left, right| left + right)
984            .unwrap()
985            .wait();
986        assert_eq!(sugared_empty_reduce, explicit_empty_reduce);
987        assert_eq!(sugared_empty_reduce, Err(StreamError::EmptyStream));
988
989        let explicit_sum = StdArc::new(StdAtomicUsize::new(0));
990        let explicit_sum_sink = StdArc::clone(&explicit_sum);
991        let explicit_foreach: StreamResult<StreamCompletion<NotUsed>> =
992            Source::from_iter(1_usize..=4).run_with(Sink::foreach(move |item| {
993                explicit_sum_sink.fetch_add(item, StdOrdering::SeqCst);
994            }));
995        assert_eq!(wait(explicit_foreach.unwrap()), NotUsed);
996
997        let sugared_sum = StdArc::new(StdAtomicUsize::new(0));
998        let sugared_sum_sink = StdArc::clone(&sugared_sum);
999        let sugared_foreach: StreamResult<StreamCompletion<NotUsed>> =
1000            Source::from_iter(1_usize..=4).run_foreach(move |item| {
1001                sugared_sum_sink.fetch_add(item, StdOrdering::SeqCst);
1002            });
1003        assert_eq!(wait(sugared_foreach.unwrap()), NotUsed);
1004
1005        let alias_sum = StdArc::new(StdAtomicUsize::new(0));
1006        let alias_sum_sink = StdArc::clone(&alias_sum);
1007        let alias_for_each: StreamResult<StreamCompletion<NotUsed>> =
1008            Source::from_iter(1_usize..=4).run_for_each(move |item| {
1009                alias_sum_sink.fetch_add(item, StdOrdering::SeqCst);
1010            });
1011        assert_eq!(wait(alias_for_each.unwrap()), NotUsed);
1012
1013        assert_eq!(explicit_sum.load(StdOrdering::SeqCst), 10);
1014        assert_eq!(sugared_sum.load(StdOrdering::SeqCst), 10);
1015        assert_eq!(alias_sum.load(StdOrdering::SeqCst), 10);
1016    }
1017
1018    #[test]
1019    fn source_constructor_sugar_matches_from_iter() {
1020        let expected = Source::from_iter(1_u64..=4).run_collect().unwrap();
1021
1022        assert_eq!(
1023            Source::from(vec![1_u64, 2, 3, 4]).run_collect().unwrap(),
1024            expected
1025        );
1026        assert_eq!(
1027            Source::from([1_u64, 2, 3, 4]).run_collect().unwrap(),
1028            expected
1029        );
1030        assert_eq!((1_u64..=4).into_source().run_collect().unwrap(), expected);
1031    }
1032
1033    #[test]
1034    fn source_constructor_sugar_keeps_existing_inference_paths() {
1035        let from_vec_into: Source<u64> = vec![1, 2, 3].into();
1036        let from_array_into: Source<u64> = [1, 2, 3].into();
1037        let from_iter = Source::from_iter(1_u64..=3);
1038        let from_iterable = Source::from_iterable(1_u64..=3);
1039        let from_iterator: Source<u64> = (1_u64..=3).collect();
1040        let from_range_into_source: Source<u64> = (1_u64..=3).into_source();
1041
1042        let expected = vec![1, 2, 3];
1043        assert_eq!(from_vec_into.run_collect().unwrap(), expected);
1044        assert_eq!(from_array_into.run_collect().unwrap(), expected);
1045        assert_eq!(from_iter.run_collect().unwrap(), expected);
1046        assert_eq!(from_iterable.run_collect().unwrap(), expected);
1047        assert_eq!(from_iterator.run_collect().unwrap(), expected);
1048        assert_eq!(from_range_into_source.run_collect().unwrap(), expected);
1049    }
1050
1051    #[test]
1052    fn source_async_boundary_preserves_results() {
1053        let expected = Source::from_iter(0_u64..128)
1054            .map(|item| item.wrapping_add(1))
1055            .filter(|item| item % 3 != 0)
1056            .map(|item| item * 2)
1057            .run_collect()
1058            .unwrap();
1059
1060        let actual = Source::from_iter(0_u64..128)
1061            .map(|item| item.wrapping_add(1))
1062            .async_boundary()
1063            .filter(|item| item % 3 != 0)
1064            .map(|item| item * 2)
1065            .run_collect()
1066            .unwrap();
1067
1068        assert_eq!(actual, expected);
1069    }
1070
1071    #[test]
1072    fn flow_async_boundary_preserves_results() {
1073        let expected = Source::from_iter(0_u64..128)
1074            .map(|item| item + 1)
1075            .map(|item| item * 3)
1076            .run_collect()
1077            .unwrap();
1078
1079        let flow = Flow::identity()
1080            .map(|item: u64| item + 1)
1081            .r#async()
1082            .map(|item| item * 3);
1083        let actual = Source::from_iter(0_u64..128)
1084            .via(flow)
1085            .run_collect()
1086            .unwrap();
1087
1088        assert_eq!(actual, expected);
1089    }
1090
1091    #[test]
1092    fn linear_async_boundary_matches_graph_async_boundary_shape() {
1093        use crate::{
1094            AsyncBoundary, AsyncBoundaryExecutionConfig, FusedExecutionConfig, GraphDsl,
1095            GraphFlowShape, MapStage,
1096        };
1097
1098        let graph = GraphDsl::try_create(|builder| {
1099            let first = builder.add(MapStage::new(|item: u64| item + 1));
1100            let boundary = builder.add(AsyncBoundary::<u64>::new());
1101            let second = builder.add(MapStage::new(|item: u64| item * 2));
1102
1103            builder.connect(first.outlet(), boundary.inlet())?;
1104            builder.connect(boundary.outlet(), second.inlet())?;
1105
1106            Ok(GraphFlowShape::new(first.inlet(), second.outlet()))
1107        })
1108        .unwrap();
1109
1110        let linear = Source::from_iter(1_u64..=4)
1111            .map(|item| item + 1)
1112            .async_boundary_with_buffer(4)
1113            .map(|item| item * 2)
1114            .run_collect()
1115            .unwrap();
1116        let graph_output = graph.run_with_input(1_u64..=4).unwrap();
1117        let report = graph
1118            .run_async_boundary_count_with_input_report(
1119                1_u64..=4,
1120                AsyncBoundaryExecutionConfig {
1121                    fused: FusedExecutionConfig { event_limit: 1024 },
1122                    buffer_size: 4,
1123                },
1124            )
1125            .unwrap();
1126
1127        assert_eq!(linear, graph_output);
1128        assert_eq!(report.result, linear.len());
1129        assert_eq!(report.async_boundary_crossings, linear.len());
1130    }
1131
1132    #[test]
1133    fn async_boundary_regions_run_concurrently() {
1134        let (upstream_tx, upstream_rx) = mpsc::channel::<u64>();
1135        let (downstream_blocked_tx, downstream_blocked_rx) = mpsc::channel::<()>();
1136        let (release_tx, release_rx) = mpsc::channel::<()>();
1137        let release_rx = StdArc::new(Mutex::new(release_rx));
1138
1139        let completion = Source::from_iter(0_u64..3)
1140            .map(move |item| {
1141                upstream_tx.send(item).expect("upstream probe receives");
1142                item
1143            })
1144            .async_boundary_with_buffer(1)
1145            .map({
1146                let release_rx = StdArc::clone(&release_rx);
1147                move |item| {
1148                    if item == 0 {
1149                        downstream_blocked_tx
1150                            .send(())
1151                            .expect("downstream probe receives");
1152                        release_rx
1153                            .lock()
1154                            .expect("release receiver lock")
1155                            .recv_timeout(StdDuration::from_secs(2))
1156                            .expect("downstream release arrives");
1157                    }
1158                    item
1159                }
1160            })
1161            .run_with(Sink::collect())
1162            .unwrap();
1163
1164        assert_eq!(
1165            downstream_blocked_rx.recv_timeout(StdDuration::from_secs(2)),
1166            Ok(())
1167        );
1168        assert_eq!(upstream_rx.recv_timeout(StdDuration::from_secs(2)), Ok(0));
1169        assert_eq!(upstream_rx.recv_timeout(StdDuration::from_secs(2)), Ok(1));
1170
1171        release_tx.send(()).expect("release downstream");
1172        assert_eq!(completion.wait().unwrap(), vec![0, 1, 2]);
1173    }
1174
1175    #[test]
1176    fn async_boundary_backpressures_slow_downstream() {
1177        let (produced_tx, produced_rx) = mpsc::channel::<u64>();
1178        let (release_tx, release_rx) = mpsc::channel::<()>();
1179        let release_rx = StdArc::new(Mutex::new(release_rx));
1180
1181        let completion = Source::from_iter(0_u64..8)
1182            .map(move |item| {
1183                produced_tx.send(item).expect("producer probe receives");
1184                item
1185            })
1186            .async_boundary_with_buffer(1)
1187            .map({
1188                let release_rx = StdArc::clone(&release_rx);
1189                move |item| {
1190                    if item == 0 {
1191                        release_rx
1192                            .lock()
1193                            .expect("release receiver lock")
1194                            .recv_timeout(StdDuration::from_secs(2))
1195                            .expect("downstream release arrives");
1196                    }
1197                    item
1198                }
1199            })
1200            .run_with(Sink::collect())
1201            .unwrap();
1202
1203        assert_eq!(produced_rx.recv_timeout(StdDuration::from_secs(2)), Ok(0));
1204        assert_eq!(produced_rx.recv_timeout(StdDuration::from_secs(2)), Ok(1));
1205        if let Ok(item) = produced_rx.recv_timeout(StdDuration::from_millis(100)) {
1206            assert_eq!(item, 2);
1207        }
1208        match produced_rx.recv_timeout(StdDuration::from_millis(100)) {
1209            Err(mpsc::RecvTimeoutError::Timeout) => {}
1210            other => panic!("async boundary handoff was not bounded: {other:?}"),
1211        }
1212
1213        release_tx.send(()).expect("release downstream");
1214        assert_eq!(completion.wait().unwrap(), (0_u64..8).collect::<Vec<_>>());
1215    }
1216
1217    #[test]
1218    fn source_blueprints_are_reusable() {
1219        let source = Source::from_iter(0..5).map(|item| item + 1);
1220
1221        assert_eq!(source.clone().run_collect().unwrap(), vec![1, 2, 3, 4, 5]);
1222        assert_eq!(source.run_collect().unwrap(), vec![1, 2, 3, 4, 5]);
1223    }
1224
1225    #[test]
1226    fn source_map_preserves_materialized_value() {
1227        let graph = Source::single(1)
1228            .map_materialized_value(|_| "source")
1229            .map(|item| item + 1)
1230            .to_mat(Sink::head(), Keep::both);
1231
1232        let materialized = graph.run().unwrap();
1233        assert_eq!(materialized.0, "source");
1234        assert_eq!(wait(materialized.1), 2);
1235    }
1236
1237    #[test]
1238    fn source_and_flow_compose() {
1239        let flow = Flow::identity()
1240            .map(|item: i32| item * 2)
1241            .filter(|item| item % 3 == 0);
1242
1243        let result = Source::from_iter(0..8).via(flow).run_collect().unwrap();
1244
1245        assert_eq!(result, vec![0, 6, 12]);
1246    }
1247
1248    #[test]
1249    fn sink_setup_sees_materializer_defaults_and_local_attributes() {
1250        let observed = StdArc::new(Mutex::new(None));
1251        let observed_in_setup = StdArc::clone(&observed);
1252        let sink = Sink::<i32, StreamCompletion<NotUsed>>::setup(move |_materializer, attrs| {
1253            *observed_in_setup.lock().unwrap() = Some((
1254                attrs.name().map(str::to_owned),
1255                attrs.input_buffer_hint(),
1256                attrs.dispatcher_hint().map(str::to_owned),
1257            ));
1258            Sink::ignore()
1259        })
1260        .add_attributes(Attributes::named("sink-inner"))
1261        .add_attributes(Attributes::input_buffer(4, 4))
1262        .add_attributes(Attributes::dispatcher("bench-dispatcher"));
1263
1264        let materializer = Materializer::new().with_attributes(Attributes::named("mat-outer"));
1265        wait(
1266            Source::from_iter([1, 2, 3])
1267                .run_with_materializer(sink, &materializer)
1268                .unwrap(),
1269        );
1270
1271        assert_eq!(
1272            *observed.lock().unwrap(),
1273            Some((
1274                Some("sink-inner".to_owned()),
1275                Some((4, 4)),
1276                Some("bench-dispatcher".to_owned())
1277            ))
1278        );
1279    }
1280
1281    #[test]
1282    fn sink_pre_materialize_feeds_existing_materialization() {
1283        let materializer = Materializer::new();
1284        let (completion, pre) = Sink::<i32, StreamCompletion<Vec<i32>>>::collect()
1285            .pre_materialize(&materializer)
1286            .unwrap();
1287
1288        Source::from_iter([1, 2, 3])
1289            .run_with_materializer(pre, &materializer)
1290            .unwrap();
1291
1292        assert_eq!(wait(completion), vec![1, 2, 3]);
1293    }
1294
1295    #[test]
1296    fn flow_from_sink_and_source_connects_both_sides() {
1297        assert_eq!(
1298            Source::from_iter([1, 2, 3])
1299                .via(Flow::from_sink_and_source(
1300                    Sink::foreach(|_item: i32| {}),
1301                    Source::from_iter([10, 20, 30]),
1302                ))
1303                .run_collect()
1304                .unwrap(),
1305            vec![10, 20, 30]
1306        );
1307    }
1308
1309    #[test]
1310    fn from_sink_and_source_keeps_sink_running_after_source_side_completes() {
1311        let completed = StdArc::new(StdAtomicBool::new(false));
1312        let on_complete = StdArc::clone(&completed);
1313        let flow = Flow::from_sink_and_source(
1314            Sink::on_complete(move || {
1315                on_complete.store(true, StdOrdering::SeqCst);
1316            }),
1317            Source::single(10),
1318        );
1319
1320        let result = Source::from_iter([1, 2, 3])
1321            .via(flow)
1322            .run_collect()
1323            .unwrap();
1324
1325        assert_eq!(result, vec![10]);
1326        assert!(wait_until(StdDuration::from_secs(1), || {
1327            completed.load(StdOrdering::SeqCst)
1328        }));
1329    }
1330
1331    #[test]
1332    fn from_sink_and_source_coupled_cancels_source_when_sink_finishes_first() {
1333        let cancellable = StdArc::new(Mutex::new(None));
1334        let observed = StdArc::clone(&cancellable);
1335        let flow = Flow::from_sink_and_source_coupled(
1336            Sink::ignore(),
1337            Source::tick(
1338                StdDuration::from_millis(50),
1339                StdDuration::from_millis(50),
1340                10,
1341            )
1342            .map_materialized_value(move |handle| {
1343                *observed.lock().unwrap() = Some(handle.clone());
1344                handle
1345            }),
1346        );
1347
1348        let completion = Source::from_iter(std::iter::empty::<i32>())
1349            .via(flow)
1350            .run_with(Sink::ignore())
1351            .unwrap();
1352        assert!(wait_until(StdDuration::from_secs(1), || {
1353            cancellable
1354                .lock()
1355                .unwrap()
1356                .as_ref()
1357                .is_some_and(Cancellable::is_cancelled)
1358        }));
1359        assert_eq!(wait(completion), NotUsed);
1360    }
1361
1362    #[test]
1363    fn bidi_flow_join_and_atop_compose() {
1364        let codec = BidiFlow::from_flows(
1365            Flow::identity().map(|item: i32| item + 1),
1366            Flow::identity().map(|item: i32| item * 2),
1367        )
1368        .named("codec");
1369        let framing = BidiFlow::from_flows(
1370            Flow::identity().map(|item: i32| item * 3),
1371            Flow::identity().map(|item: i32| item - 4),
1372        );
1373
1374        let joined = codec
1375            .clone()
1376            .join(Flow::identity().map(|item: i32| item - 5));
1377        let stacked = codec.atop(framing).join(Flow::identity());
1378
1379        assert_eq!(
1380            Source::single(10).via(joined).run_collect().unwrap(),
1381            vec![12]
1382        );
1383        assert_eq!(
1384            Source::single(10).via(stacked).run_collect().unwrap(),
1385            vec![58]
1386        );
1387    }
1388
1389    #[test]
1390    fn flow_buffer_then_map_runs_end_to_end() {
1391        let flow = Flow::identity()
1392            .buffer(8, OverflowStrategy::Backpressure)
1393            .map(|item: i32| item + 1);
1394
1395        let result = Source::from_iter(0..4).via(flow).run_collect().unwrap();
1396
1397        assert_eq!(result, vec![1, 2, 3, 4]);
1398    }
1399
1400    #[test]
1401    fn public_flow_combinators_preserve_runtime_transform_after_buffer() {
1402        fn buffered_flow() -> Flow<i32, i32> {
1403            Flow::identity().buffer(8, OverflowStrategy::Backpressure)
1404        }
1405
1406        assert_eq!(
1407            Source::from_iter(0..4)
1408                .via(buffered_flow().filter(|item| *item % 2 == 0))
1409                .run_collect()
1410                .unwrap(),
1411            vec![0, 2]
1412        );
1413        assert_eq!(
1414            Source::from_iter(0..4)
1415                .via(buffered_flow().filter_not(|item| *item % 2 == 0))
1416                .run_collect()
1417                .unwrap(),
1418            vec![1, 3]
1419        );
1420        assert_eq!(
1421            Source::from_iter(0..4)
1422                .via(buffered_flow().filter_map(|item| (item % 2 == 0).then_some(item + 10)))
1423                .run_collect()
1424                .unwrap(),
1425            vec![10, 12]
1426        );
1427        assert_eq!(
1428            Source::from_iter(0..3)
1429                .via(buffered_flow().map_concat(|item| [item, item + 10]))
1430                .run_collect()
1431                .unwrap(),
1432            vec![0, 10, 1, 11, 2, 12]
1433        );
1434        assert_eq!(
1435            Source::from_iter(0..3)
1436                .via(buffered_flow().stateful_map(5, |state, item| {
1437                    *state += item;
1438                    *state
1439                }))
1440                .run_collect()
1441                .unwrap(),
1442            vec![5, 6, 8]
1443        );
1444        assert_eq!(
1445            Source::from_iter(0..3)
1446                .via(buffered_flow().stateful_map_concat(0, |state, item| {
1447                    *state += item;
1448                    [*state, item]
1449                }))
1450                .run_collect()
1451                .unwrap(),
1452            vec![0, 0, 1, 1, 3, 2]
1453        );
1454        assert_eq!(
1455            Source::from_iter(0..4)
1456                .via(buffered_flow().map_async(2, |item| async move { Ok(item + 1) }))
1457                .run_collect()
1458                .unwrap(),
1459            vec![1, 2, 3, 4]
1460        );
1461        assert_eq!(
1462            Source::from_iter(0..4)
1463                .via(buffered_flow().map_async_unordered(2, |item| async move { Ok(item + 1) }))
1464                .run_collect()
1465                .unwrap(),
1466            vec![1, 2, 3, 4]
1467        );
1468        assert_eq!(
1469            Source::from_iter(0..4)
1470                .via(buffered_flow().map_async_partitioned(
1471                    2,
1472                    1,
1473                    |item| item % 2,
1474                    |item| async move { Ok(item + 1) },
1475                ))
1476                .run_collect()
1477                .unwrap(),
1478            vec![1, 2, 3, 4]
1479        );
1480        assert_eq!(
1481            Source::from_iter(0..5)
1482                .via(buffered_flow().take(3))
1483                .run_collect()
1484                .unwrap(),
1485            vec![0, 1, 2]
1486        );
1487        assert_eq!(
1488            Source::from_iter(0..5)
1489                .via(buffered_flow().drop(2))
1490                .run_collect()
1491                .unwrap(),
1492            vec![2, 3, 4]
1493        );
1494        assert_eq!(
1495            Source::from_iter(0..5)
1496                .via(buffered_flow().take_while(|item| *item < 3))
1497                .run_collect()
1498                .unwrap(),
1499            vec![0, 1, 2]
1500        );
1501        assert_eq!(
1502            Source::from_iter(0..5)
1503                .via(buffered_flow().drop_while(|item| *item < 3))
1504                .run_collect()
1505                .unwrap(),
1506            vec![3, 4]
1507        );
1508        assert_eq!(
1509            Source::from_iter(0..3)
1510                .via(buffered_flow().limit(5))
1511                .run_collect()
1512                .unwrap(),
1513            vec![0, 1, 2]
1514        );
1515        assert_eq!(
1516            Source::from_iter(0..5)
1517                .via(buffered_flow().grouped(2))
1518                .run_collect()
1519                .unwrap(),
1520            vec![vec![0, 1], vec![2, 3], vec![4]]
1521        );
1522        assert_eq!(
1523            Source::from_iter(1..=3)
1524                .via(buffered_flow().scan(0, |acc, item| acc + item))
1525                .run_collect()
1526                .unwrap(),
1527            vec![0, 1, 3, 6]
1528        );
1529        assert_eq!(
1530            Source::from_iter(1..=4)
1531                .via(buffered_flow().sliding(2, 1))
1532                .run_collect()
1533                .unwrap(),
1534            vec![vec![1, 2], vec![2, 3], vec![3, 4]]
1535        );
1536        assert_eq!(
1537            Source::from_iter(1..=4)
1538                .via(buffered_flow().fold(0, |acc, item| acc + item))
1539                .run_collect()
1540                .unwrap(),
1541            vec![10]
1542        );
1543        assert_eq!(
1544            Source::from_iter(1..=4)
1545                .via(buffered_flow().reduce(|acc, item| acc + item))
1546                .run_collect()
1547                .unwrap(),
1548            vec![10]
1549        );
1550        assert_eq!(
1551            Source::from_factory(|| {
1552                Box::new(vec![Ok(1), Err(StreamError::Failed("boom".into())), Ok(2)].into_iter())
1553            })
1554            .via(buffered_flow().map_error(|_| StreamError::Failed("mapped".into())))
1555            .run_collect(),
1556            Err(StreamError::Failed("mapped".into()))
1557        );
1558        assert_eq!(
1559            Source::<i32>::failed(StreamError::Failed("boom".into()))
1560                .via(buffered_flow().recover(|_| Some(42)))
1561                .run_collect()
1562                .unwrap(),
1563            vec![42]
1564        );
1565        assert_eq!(
1566            Source::<i32>::failed(StreamError::Failed("boom".into()))
1567                .via(buffered_flow().recover_with(|_| Some(Source::from_iter([7, 8]))))
1568                .run_collect()
1569                .unwrap(),
1570            vec![7, 8]
1571        );
1572        assert_eq!(
1573            Source::<i32>::failed(StreamError::Failed("boom".into()))
1574                .via(buffered_flow().recover_with_retries(1, |_| Some(Source::from_iter([9]))))
1575                .run_collect()
1576                .unwrap(),
1577            vec![9]
1578        );
1579        assert_eq!(
1580            Source::from_factory(|| {
1581                Box::new(vec![Ok(1), Err(StreamError::Failed("ignored".into())), Ok(2)].into_iter())
1582            })
1583            .via(buffered_flow().on_error_complete())
1584            .run_collect()
1585            .unwrap(),
1586            vec![1]
1587        );
1588
1589        let materialized = Source::from_iter([1, 2, 3])
1590            .run_with(
1591                buffered_flow()
1592                    .via(Flow::identity().map(|item| item + 1))
1593                    .map_materialized_value(|_| "buffered-flow")
1594                    .to_mat(Sink::fold(0, |acc, item| acc + item), Keep::both),
1595            )
1596            .unwrap();
1597        assert_eq!(materialized.0, "buffered-flow");
1598        assert_eq!(wait(materialized.1), 9);
1599
1600        let kept = Source::from_iter([1, 2, 3])
1601            .run_with(
1602                buffered_flow()
1603                    .via_mat_with(Flow::identity().map(|item| item + 1), |_, _| "combined")
1604                    .to(Sink::fold(0, |acc, item| acc + item)),
1605            )
1606            .unwrap();
1607        assert_eq!(kept, "combined");
1608    }
1609
1610    #[test]
1611    fn runtime_rate_flows_compose_in_flow_form() {
1612        let conflate = Flow::identity()
1613            .conflate(|left: i32, right| left + right)
1614            .map(|item| item + 1);
1615        assert_eq!(
1616            Source::single(4).via(conflate).run_collect().unwrap(),
1617            vec![5]
1618        );
1619
1620        let batch = Flow::identity()
1621            .batch(4, |item: i32| item, |left, right| left + right)
1622            .map(|item| item + 1);
1623        assert_eq!(Source::single(4).via(batch).run_collect().unwrap(), vec![5]);
1624
1625        let expand = Flow::identity()
1626            .expand(std::iter::once::<i32>)
1627            .map(|item| item + 1);
1628        assert_eq!(
1629            Source::from_iter(0..4).via(expand).run_collect().unwrap(),
1630            vec![1, 2, 3, 4]
1631        );
1632
1633        let aggregate = Flow::identity()
1634            .aggregate_with_boundary(
1635                Vec::<i32>::new,
1636                |mut items, item| {
1637                    items.push(item);
1638                    let ready = !items.is_empty();
1639                    (items, ready)
1640                },
1641                |items| items.into_iter().sum::<i32>(),
1642                None,
1643            )
1644            .map(|item| item + 1);
1645        assert_eq!(
1646            Source::from_iter(0..4)
1647                .via(aggregate)
1648                .run_collect()
1649                .unwrap(),
1650            vec![1, 2, 3, 4]
1651        );
1652
1653        let detached = Flow::identity().detach().map(|item: i32| item + 1);
1654        assert_eq!(
1655            Source::from_iter(0..4).via(detached).run_collect().unwrap(),
1656            vec![1, 2, 3, 4]
1657        );
1658    }
1659
1660    #[test]
1661    fn high_use_source_flow_operators_work() {
1662        let result = Source::from_iter(0..8)
1663            .drop(1)
1664            .take(5)
1665            .filter_not(|item| item % 2 == 0)
1666            .map_concat(|item| [item, item + 10])
1667            .grouped(3)
1668            .run_collect()
1669            .unwrap();
1670
1671        assert_eq!(result, vec![vec![1, 11, 3], vec![13, 5, 15]]);
1672    }
1673
1674    #[test]
1675    fn prefix_and_tail_emits_prefix_and_live_tail() {
1676        let mut outer = Source::from_iter(0..5)
1677            .prefix_and_tail(2)
1678            .run_collect()
1679            .unwrap();
1680        assert_eq!(outer.len(), 1);
1681        let (prefix, tail) = outer.pop().unwrap();
1682        assert_eq!(prefix, vec![0, 1]);
1683        assert_eq!(tail.clone().run_collect().unwrap(), vec![2, 3, 4]);
1684        assert_eq!(
1685            tail.run_collect(),
1686            Err(StreamError::Failed(
1687                "substream source cannot be materialized more than once".into()
1688            ))
1689        );
1690    }
1691
1692    #[test]
1693    fn prefix_and_tail_fails_before_prefix_is_ready() {
1694        let result = Source::from_factory(|| {
1695            Box::new(vec![Ok(1), Err(StreamError::Failed("boom".into())), Ok(2)].into_iter())
1696        })
1697        .prefix_and_tail(2)
1698        .run_collect();
1699        assert!(matches!(result, Err(StreamError::Failed(message)) if message == "boom"));
1700    }
1701
1702    #[test]
1703    fn prefix_and_tail_tail_propagates_late_upstream_failure() {
1704        let mut outer = Source::from_factory(|| {
1705            Box::new(vec![Ok(1), Ok(2), Err(StreamError::Failed("boom".into())), Ok(3)].into_iter())
1706        })
1707        .prefix_and_tail(2)
1708        .run_collect()
1709        .unwrap();
1710        let (prefix, tail) = outer.pop().unwrap();
1711        assert_eq!(prefix, vec![1, 2]);
1712        assert_eq!(tail.run_collect(), Err(StreamError::Failed("boom".into())));
1713    }
1714
1715    #[test]
1716    fn prefix_and_tail_accepts_non_clone_elements() {
1717        #[derive(Debug, PartialEq, Eq)]
1718        struct NonClone(u8);
1719
1720        let mut outer = Source::from_factory(|| {
1721            Box::new(vec![Ok(NonClone(1)), Ok(NonClone(2)), Ok(NonClone(3))].into_iter())
1722        })
1723        .prefix_and_tail(2)
1724        .run_collect()
1725        .unwrap();
1726        let (prefix, tail) = outer.pop().unwrap();
1727        assert_eq!(prefix, vec![NonClone(1), NonClone(2)]);
1728        assert_eq!(tail.run_collect().unwrap(), vec![NonClone(3)]);
1729    }
1730
1731    #[test]
1732    fn flat_map_prefix_materializes_on_short_upstream_completion() {
1733        let values = Source::from_iter([1, 2])
1734            .flat_map_prefix(3, |prefix| {
1735                let sum = prefix.into_iter().sum::<i32>();
1736                Flow::identity().prepend(Source::single(sum))
1737            })
1738            .run_collect()
1739            .unwrap();
1740        assert_eq!(values, vec![3]);
1741    }
1742
1743    #[test]
1744    fn flat_map_prefix_does_not_materialize_on_early_upstream_failure() {
1745        let invoked = StdArc::new(StdAtomicBool::new(false));
1746        let invoked_for_stage = StdArc::clone(&invoked);
1747        let result = Source::from_factory(|| {
1748            Box::new(vec![Ok(1), Err(StreamError::Failed("boom".into()))].into_iter())
1749        })
1750        .flat_map_prefix(3, move |_prefix| {
1751            invoked_for_stage.store(true, StdOrdering::SeqCst);
1752            Flow::identity()
1753        })
1754        .run_collect();
1755        assert_eq!(result, Err(StreamError::Failed("boom".into())));
1756        assert!(!invoked.load(StdOrdering::SeqCst));
1757    }
1758
1759    #[test]
1760    fn flat_map_concat_flattens_nested_sources_sequentially() {
1761        let values = Source::from_iter([1, 2, 3])
1762            .flat_map_concat(|item| Source::from_iter(0..item))
1763            .run_collect()
1764            .unwrap();
1765        assert_eq!(values, vec![0, 0, 1, 0, 1, 2]);
1766    }
1767
1768    #[test]
1769    fn flat_map_merge_respects_breadth_bound() {
1770        let active = StdArc::new(StdAtomicUsize::new(0));
1771        let max_active = StdArc::new(StdAtomicUsize::new(0));
1772        let active_for_stage = StdArc::clone(&active);
1773        let max_for_stage = StdArc::clone(&max_active);
1774
1775        let mut values = Source::from_iter(0..6)
1776            .flat_map_merge(2, move |item| {
1777                let active = StdArc::clone(&active_for_stage);
1778                let max_active = StdArc::clone(&max_for_stage);
1779                Source::future(move || {
1780                    let active = StdArc::clone(&active);
1781                    let max_active = StdArc::clone(&max_active);
1782                    async move {
1783                        let now = active.fetch_add(1, StdOrdering::SeqCst) + 1;
1784                        loop {
1785                            let seen = max_active.load(StdOrdering::SeqCst);
1786                            if now <= seen {
1787                                break;
1788                            }
1789                            if max_active
1790                                .compare_exchange(
1791                                    seen,
1792                                    now,
1793                                    StdOrdering::SeqCst,
1794                                    StdOrdering::SeqCst,
1795                                )
1796                                .is_ok()
1797                            {
1798                                break;
1799                            }
1800                        }
1801                        thread::sleep(StdDuration::from_millis(20));
1802                        active.fetch_sub(1, StdOrdering::SeqCst);
1803                        Ok(item)
1804                    }
1805                })
1806            })
1807            .run_collect()
1808            .unwrap();
1809        values.sort_unstable();
1810        assert_eq!(values, vec![0, 1, 2, 3, 4, 5]);
1811        assert!(max_active.load(StdOrdering::SeqCst) <= 2);
1812    }
1813
1814    #[test]
1815    fn flat_map_merge_propagates_inner_failures() {
1816        let result = Source::from_iter([0, 1, 2])
1817            .flat_map_merge(2, |item| {
1818                if item == 1 {
1819                    Source::failed(StreamError::Failed("boom".into()))
1820                } else {
1821                    Source::single(item)
1822                }
1823            })
1824            .run_collect();
1825        assert_eq!(result, Err(StreamError::Failed("boom".into())));
1826    }
1827
1828    #[test]
1829    fn flat_map_merge_emits_ready_inner_output_while_upstream_is_blocked() {
1830        let (release_tx, release_rx) = mpsc::channel();
1831        let release_rx = StdArc::new(std::sync::Mutex::new(Some(release_rx)));
1832        let queue = Source::from_factory(move || {
1833            let release_rx = StdArc::clone(&release_rx);
1834            let mut step = 0_u8;
1835            Box::new(std::iter::from_fn(move || {
1836                let item = match step {
1837                    0 => Some(Ok(0)),
1838                    1 => {
1839                        release_rx
1840                            .lock()
1841                            .unwrap()
1842                            .as_ref()
1843                            .expect("release receiver available")
1844                            .recv_timeout(StdDuration::from_secs(1))
1845                            .expect("timed out waiting to release second upstream element");
1846                        Some(Ok(1))
1847                    }
1848                    _ => None,
1849                };
1850                step += 1;
1851                item
1852            }))
1853        })
1854        .flat_map_merge(2, |item| Source::single(item + 10))
1855        .run_with(Sink::queue())
1856        .unwrap();
1857
1858        assert_eq!(queue.pull().unwrap(), Some(10));
1859        release_tx.send(()).unwrap();
1860        assert_eq!(queue.pull().unwrap(), Some(11));
1861        assert!(queue.pull().unwrap().is_none());
1862    }
1863
1864    #[test]
1865    fn group_by_routes_keys_and_drops_closed_keys() {
1866        let outer = Source::from_iter([0, 1, 2, 3, 4])
1867            .group_by(4, |item| item % 2, false)
1868            .run_with(Sink::queue())
1869            .unwrap();
1870
1871        let even = outer.pull().unwrap().unwrap();
1872        let even_completion = even.run_with(Sink::ignore()).unwrap();
1873        let odd = outer.pull().unwrap().unwrap();
1874        drop(even_completion);
1875
1876        assert_eq!(odd.run_collect().unwrap(), vec![1, 3]);
1877        assert!(outer.pull().unwrap().is_none());
1878    }
1879
1880    #[test]
1881    fn group_by_fails_when_distinct_key_limit_is_exceeded() {
1882        let outer = Source::from_iter([0, 1, 2])
1883            .group_by(2, |item| *item, false)
1884            .run_with(Sink::queue())
1885            .unwrap();
1886
1887        let _ = outer.pull().unwrap().unwrap();
1888        let _ = outer.pull().unwrap().unwrap();
1889        assert!(matches!(
1890            outer.pull(),
1891            Err(StreamError::Failed(message)) if message == "group_by reached max_substreams (2)"
1892        ));
1893    }
1894
1895    #[test]
1896    fn group_by_can_recreate_closed_substreams_when_enabled() {
1897        let (release_tx, release_rx) = mpsc::channel();
1898        let release_rx = StdArc::new(std::sync::Mutex::new(Some(release_rx)));
1899        let outer = Source::from_factory(move || {
1900            let release_rx = StdArc::clone(&release_rx);
1901            let mut step = 0_u8;
1902            Box::new(std::iter::from_fn(move || {
1903                let item = match step {
1904                    0 => Some(Ok(0)),
1905                    1 => Some(Ok(1)),
1906                    2 => {
1907                        release_rx
1908                            .lock()
1909                            .unwrap()
1910                            .as_ref()
1911                            .expect("release receiver available")
1912                            .recv_timeout(StdDuration::from_secs(1))
1913                            .expect("timed out waiting to release recreated key");
1914                        Some(Ok(0))
1915                    }
1916                    _ => None,
1917                };
1918                step += 1;
1919                item
1920            }))
1921        })
1922        .group_by(4, |item| item % 2, true)
1923        .run_with(Sink::queue())
1924        .unwrap();
1925
1926        let even = outer.pull().unwrap().unwrap();
1927        assert_eq!(wait(even.run_with(Sink::head()).unwrap()), 0);
1928        release_tx.send(()).unwrap();
1929
1930        let odd = outer.pull().unwrap().unwrap();
1931        assert_eq!(odd.run_collect().unwrap(), vec![1]);
1932
1933        let recreated_even = outer.pull().unwrap().unwrap();
1934        assert_eq!(recreated_even.run_collect().unwrap(), vec![0]);
1935        assert!(outer.pull().unwrap().is_none());
1936    }
1937
1938    #[test]
1939    fn group_by_panicking_key_fn_abruptly_terminates_live_substreams() {
1940        let outer = Source::from_iter([0, 1])
1941            .group_by(
1942                4,
1943                |item| {
1944                    assert_ne!(*item, 1, "boom");
1945                    item % 2
1946                },
1947                false,
1948            )
1949            .run_with(Sink::queue())
1950            .unwrap();
1951
1952        let substream = outer.pull().unwrap().unwrap();
1953        let (result_tx, result_rx) = mpsc::channel();
1954        thread::spawn(move || {
1955            let _ = result_tx.send(substream.run_collect());
1956        });
1957
1958        assert_eq!(
1959            result_rx.recv_timeout(StdDuration::from_secs(1)).unwrap(),
1960            Err(StreamError::AbruptTermination)
1961        );
1962        assert!(matches!(outer.pull(), Err(StreamError::AbruptTermination)));
1963    }
1964
1965    #[test]
1966    fn split_when_starts_new_substream_on_boundary_element() {
1967        let outer = Source::from_iter([1, 2, 0, 3, 0, 4, 5])
1968            .split_when(|item| *item == 0)
1969            .run_with(Sink::queue())
1970            .unwrap();
1971
1972        let first = outer.pull().unwrap().unwrap();
1973        assert_eq!(first.run_collect().unwrap(), vec![1, 2]);
1974        let second = outer.pull().unwrap().unwrap();
1975        assert_eq!(second.run_collect().unwrap(), vec![0, 3]);
1976        let third = outer.pull().unwrap().unwrap();
1977        assert_eq!(third.run_collect().unwrap(), vec![0, 4, 5]);
1978        assert!(outer.pull().unwrap().is_none());
1979    }
1980
1981    #[test]
1982    fn split_after_ends_current_substream_on_boundary_element() {
1983        let outer = Source::from_iter([1, 2, 0, 3, 0, 4, 5])
1984            .split_after(|item| *item == 0)
1985            .run_with(Sink::queue())
1986            .unwrap();
1987
1988        let first = outer.pull().unwrap().unwrap();
1989        assert_eq!(first.run_collect().unwrap(), vec![1, 2, 0]);
1990        let second = outer.pull().unwrap().unwrap();
1991        assert_eq!(second.run_collect().unwrap(), vec![3, 0]);
1992        let third = outer.pull().unwrap().unwrap();
1993        assert_eq!(third.run_collect().unwrap(), vec![4, 5]);
1994        assert!(outer.pull().unwrap().is_none());
1995    }
1996
1997    #[test]
1998    fn split_when_panicking_predicate_abruptly_terminates_live_substreams() {
1999        let outer = Source::from_iter([1, 2])
2000            .split_when(|item| {
2001                assert_ne!(*item, 2, "boom");
2002                false
2003            })
2004            .run_with(Sink::queue())
2005            .unwrap();
2006
2007        let substream = outer.pull().unwrap().unwrap();
2008        let (result_tx, result_rx) = mpsc::channel();
2009        thread::spawn(move || {
2010            let _ = result_tx.send(substream.run_collect());
2011        });
2012
2013        assert_eq!(
2014            result_rx.recv_timeout(StdDuration::from_secs(1)).unwrap(),
2015            Err(StreamError::AbruptTermination)
2016        );
2017        assert!(matches!(outer.pull(), Err(StreamError::AbruptTermination)));
2018    }
2019
2020    #[test]
2021    fn split_when_pre_buffer_segments_match_expected_count() {
2022        let outer = Source::from_iter(0..100)
2023            .split_when(|item| *item != 0 && *item % 10 == 0)
2024            .run_with(Sink::queue())
2025            .unwrap();
2026        let mut segment_count = 0;
2027        while let Some(substream) = outer.pull().unwrap() {
2028            let items: Vec<i32> = substream.run_collect().unwrap();
2029            assert!(!items.is_empty(), "segment should not be empty");
2030            segment_count += 1;
2031        }
2032        assert_eq!(segment_count, 10, "100 elements in segments of 10");
2033    }
2034
2035    #[test]
2036    fn split_after_pre_buffer_segments_match_expected_count() {
2037        let outer = Source::from_iter(0..100)
2038            .split_after(|item| (*item + 1) % 10 == 0)
2039            .run_with(Sink::queue())
2040            .unwrap();
2041        let mut segment_count = 0;
2042        let mut total = 0_i32;
2043        while let Some(substream) = outer.pull().unwrap() {
2044            let items: Vec<i32> = substream.run_collect().unwrap();
2045            assert!(!items.is_empty(), "segment should not be empty");
2046            total += items.len() as i32;
2047            segment_count += 1;
2048        }
2049        assert_eq!(segment_count, 10);
2050        assert_eq!(total, 100);
2051    }
2052
2053    #[test]
2054    fn group_by_single_key_fused_matches_general_path() {
2055        let outer = Source::from_iter(0..1000i64)
2056            .group_by(1, |_| 0u8, false)
2057            .run_with(Sink::queue())
2058            .unwrap();
2059        let substream = outer.pull().unwrap().unwrap();
2060        let items: Vec<i64> = substream.run_collect().unwrap();
2061        assert_eq!(items.len(), 1000);
2062        assert_eq!(items[0], 0);
2063        assert_eq!(items[999], 999);
2064        assert!(outer.pull().unwrap().is_none());
2065    }
2066
2067    #[test]
2068    fn group_by_single_key_fused_handles_key_change_with_substream_limit() {
2069        let outer = Source::from_iter([0, 1, 0])
2070            .group_by(2, |item| *item, false)
2071            .run_with(Sink::queue())
2072            .unwrap();
2073        let mut sources = vec![];
2074        while let Some(source) = outer.pull().unwrap() {
2075            sources.push(source);
2076        }
2077        assert_eq!(sources.len(), 2);
2078        assert_eq!(sources[0].clone().run_collect().unwrap(), vec![0, 0]);
2079        assert_eq!(sources[1].clone().run_collect().unwrap(), vec![1]);
2080    }
2081
2082    #[test]
2083    fn flat_map_merge_lock_lighter_matches_expected_count() {
2084        let items = Source::from_iter(0..20)
2085            .flat_map_merge(2, |item| Source::single(item + 100))
2086            .run_with(Sink::queue())
2087            .unwrap();
2088        let mut count = 0;
2089        while items.pull().unwrap().is_some() {
2090            count += 1;
2091        }
2092        assert_eq!(count, 20);
2093    }
2094
2095    // Verify that group_by emits the substream BEFORE upstream completes —
2096    // i.e., the worker does not buffer all elements before delivery.
2097    //
2098    // The test uses a rendezvous channel to synchronize: the group_by worker
2099    // will block waiting for more items (the channel is empty after item 0),
2100    // so if the outer substream is NOT emitted before item 1 arrives, the
2101    // `outer.pull()` call in a separate thread cannot return during that window.
2102    // With the old "single_key_buf" batching, pull() would block until a
2103    // key-change or EOF — which is a liveness bug for infinite or slow sources.
2104    #[test]
2105    fn group_by_single_key_emits_substream_before_upstream_completes() {
2106        // sync_channel(0): rendezvous — sender blocks until receiver is ready,
2107        // guaranteeing the worker processes exactly one item then waits.
2108        let (tx, rx) = mpsc::sync_channel::<i32>(0);
2109        let rx = StdArc::new(std::sync::Mutex::new(rx));
2110
2111        let outer = Source::from_factory({
2112            let rx = StdArc::clone(&rx);
2113            move || {
2114                let rx = StdArc::clone(&rx);
2115                Box::new(std::iter::from_fn(move || {
2116                    rx.lock().unwrap().recv().ok().map(Ok)
2117                })) as BoxStream<i32>
2118            }
2119        })
2120        .group_by(1, |_| 0u8, false)
2121        .run_with(Sink::queue())
2122        .unwrap();
2123
2124        // Kick off the producer: sends item 0, then waits for the outer
2125        // substream to be pulled (via a second channel) before sending more.
2126        let (sub_tx, sub_rx) = mpsc::channel::<Source<i32>>();
2127        let outer_thread = thread::spawn(move || {
2128            let substream = outer.pull().unwrap().expect("expected a substream");
2129            sub_tx.send(substream).unwrap();
2130        });
2131
2132        // Send the first item — the worker processes it and (with the fix)
2133        // immediately emits the substream to outer.
2134        tx.send(0).unwrap();
2135
2136        // The outer pull must complete within the timeout.
2137        let substream = sub_rx
2138            .recv_timeout(StdDuration::from_secs(5))
2139            .expect("timed out — group_by buffered first element before emitting substream");
2140
2141        // Now deliver the remaining items and close.
2142        for i in 1..100_i32 {
2143            tx.send(i).unwrap();
2144        }
2145        drop(tx);
2146
2147        let items: Vec<i32> = substream.run_collect().unwrap();
2148        assert_eq!(items.len(), 100);
2149        outer_thread.join().unwrap();
2150    }
2151
2152    #[test]
2153    fn group_by_concurrent_live_substreams_do_not_hold_ready_item_stress() {
2154        const STREAMS: usize = 32;
2155        const ROUNDS: usize = 8;
2156        const ITEMS: i64 = 8;
2157
2158        for _ in 0..ROUNDS {
2159            let barrier = StdArc::new(std::sync::Barrier::new(STREAMS));
2160            let mut handles = Vec::with_capacity(STREAMS);
2161
2162            for _ in 0..STREAMS {
2163                let barrier = StdArc::clone(&barrier);
2164                handles.push(thread::spawn(move || {
2165                    let (tx, rx) = mpsc::sync_channel::<i64>(0);
2166                    let rx = StdArc::new(std::sync::Mutex::new(rx));
2167
2168                    let outer = Source::from_factory({
2169                        let rx = StdArc::clone(&rx);
2170                        move || {
2171                            let rx = StdArc::clone(&rx);
2172                            Box::new(std::iter::from_fn(move || {
2173                                rx.lock().unwrap().recv().ok().map(Ok)
2174                            })) as BoxStream<i64>
2175                        }
2176                    })
2177                    .group_by(1, |_| 0_u8, false)
2178                    .run_with(Sink::queue())
2179                    .unwrap();
2180
2181                    barrier.wait();
2182
2183                    tx.send(0).unwrap();
2184                    let substream = outer.pull().unwrap().expect("expected group_by substream");
2185                    let subqueue = substream.run_with(Sink::queue()).unwrap();
2186                    assert_eq!(subqueue.pull().unwrap(), Some(0));
2187
2188                    for item in 1..ITEMS {
2189                        tx.send(item).unwrap();
2190                        assert_eq!(subqueue.pull().unwrap(), Some(item));
2191                    }
2192                    drop(tx);
2193
2194                    assert!(subqueue.pull().unwrap().is_none());
2195                    assert!(outer.pull().unwrap().is_none());
2196                }));
2197            }
2198
2199            for handle in handles {
2200                handle.join().expect("group_by stress worker panicked");
2201            }
2202        }
2203    }
2204
2205    // Verify that split_when emits each substream as a live stream, NOT after
2206    // accumulating all segment elements into memory.  The channel is sized to
2207    // be larger than LIVE_SUBSTREAM_CAPACITY (256) so that, if the old Vec
2208    // pre-buffering path were active, the first substream would never be emitted
2209    // before the boundary.  With live substreams the substream is visible
2210    // immediately upon the first item.
2211    #[test]
2212    fn split_when_emits_substream_before_segment_ends() {
2213        // 512 > LIVE_SUBSTREAM_CAPACITY (256): the live substream will block
2214        // on backpressure mid-segment, but that's fine — the outer substream
2215        // IS emitted immediately and the consumer can drain it concurrently.
2216        const SEGMENT_LEN: usize = 300;
2217
2218        let (tx, rx) = mpsc::sync_channel::<i32>(SEGMENT_LEN * 2 + 4);
2219        for i in 0..SEGMENT_LEN as i32 {
2220            tx.send(i).unwrap();
2221        }
2222        tx.send(-1).unwrap(); // boundary
2223        tx.send(99).unwrap(); // second segment
2224        drop(tx);
2225
2226        let outer = Source::from_iter(rx)
2227            .split_when(|item| *item == -1)
2228            .run_with(Sink::queue())
2229            .unwrap();
2230
2231        let (result_tx, result_rx) = mpsc::channel();
2232        thread::spawn(move || {
2233            let first = outer.pull().unwrap().expect("expected first substream");
2234            let items: Vec<i32> = first.run_collect().unwrap();
2235            let second = outer.pull().unwrap().expect("expected second substream");
2236            let items2: Vec<i32> = second.run_collect().unwrap();
2237            let done = outer.pull().unwrap().is_none();
2238            let _ = result_tx.send((items, items2, done));
2239        });
2240
2241        let (items, items2, done) = result_rx
2242            .recv_timeout(StdDuration::from_secs(5))
2243            .expect("timed out — split_when is buffering the whole segment");
2244        assert_eq!(items.len(), SEGMENT_LEN);
2245        assert_eq!(items2, vec![-1, 99]);
2246        assert!(done);
2247    }
2248
2249    #[test]
2250    fn split_after_emits_substream_before_segment_ends() {
2251        const SEGMENT_LEN: usize = 300;
2252
2253        let (tx, rx) = mpsc::sync_channel::<i32>(SEGMENT_LEN * 2 + 4);
2254        for i in 0..SEGMENT_LEN as i32 {
2255            tx.send(i).unwrap();
2256        }
2257        tx.send(-1).unwrap(); // boundary (included in first segment for split_after)
2258        tx.send(99).unwrap();
2259        drop(tx);
2260
2261        let outer = Source::from_iter(rx)
2262            .split_after(|item| *item == -1)
2263            .run_with(Sink::queue())
2264            .unwrap();
2265
2266        let (result_tx, result_rx) = mpsc::channel();
2267        thread::spawn(move || {
2268            let first = outer.pull().unwrap().expect("expected first substream");
2269            let items: Vec<i32> = first.run_collect().unwrap();
2270            let second = outer.pull().unwrap().expect("expected second substream");
2271            let items2: Vec<i32> = second.run_collect().unwrap();
2272            let done = outer.pull().unwrap().is_none();
2273            let _ = result_tx.send((items, items2, done));
2274        });
2275
2276        let (items, items2, done) = result_rx
2277            .recv_timeout(StdDuration::from_secs(5))
2278            .expect("timed out — split_after is buffering the whole segment");
2279        // split_after includes the boundary element in the first segment.
2280        assert_eq!(items.len(), SEGMENT_LEN + 1);
2281        assert_eq!(items2, vec![99]);
2282        assert!(done);
2283    }
2284
2285    // Stress test: run flat_map_merge 20 times with many concurrent inner
2286    // streams to confirm the fixed coordinator tail loop never hangs.
2287    // Intended to be run with --test-threads=32 to expose the race.
2288    #[test]
2289    fn flat_map_merge_coordinator_no_lost_wakeup_stress() {
2290        for _ in 0..20 {
2291            let result = Source::from_iter(0..50_i32)
2292                .flat_map_merge(8, |item| Source::from_iter(item..item + 3))
2293                .run_with(Sink::fold(0i64, |acc, item| acc + item as i64))
2294                .unwrap()
2295                .wait();
2296            // Each i in 0..50 contributes i + (i+1) + (i+2) = 3i + 3.
2297            // Sum = 3 * (0+1+..+49) + 3*50 = 3*1225 + 150 = 3675 + 150 = 3825.
2298            assert_eq!(result, Ok(3825), "flat_map_merge produced wrong sum");
2299        }
2300    }
2301
2302    // Stress test for the single-mutex flat_map_merge refactor.
2303    // 20 runs with high concurrency to verify no deadlock, lost wakeup,
2304    // or window-accounting corruption under the merged-lock design.
2305    // Intended to be run with --test-threads=32.
2306    #[test]
2307    fn flat_map_merge_single_mutex_race_stress() {
2308        for _ in 0..20 {
2309            let result = Source::from_iter(0..100_i64)
2310                .flat_map_merge(16, |item| Source::from_iter([item, item + 1000]))
2311                .run_with(Sink::fold(0i64, |acc, v| acc + v))
2312                .unwrap()
2313                .wait();
2314            // sum of item + (item+1000) for item in 0..100
2315            // = sum(0..100) + sum(0..100) + 100*1000
2316            // = 4950 + 4950 + 100000 = 109900
2317            assert_eq!(result, Ok(109_900), "flat_map_merge single-mutex stress");
2318        }
2319    }
2320
2321    // Bounded-memory test for split_when batching: verify that items are
2322    // delivered to a slow consumer and not held indefinitely in the writer
2323    // buffer.  Uses a rendezvous-style channel where the producer sends MANY
2324    // items into the outer queue (larger than LIVE_SUBSTREAM_BATCH = 64), then
2325    // blocks.  The consumer must be able to drain the first segment fully — i.e.
2326    // the writer must have flushed even when the batch was smaller than 64.
2327    #[test]
2328    fn split_when_bounded_memory_rendezvous() {
2329        // Segment of 100 items (> LIVE_SUBSTREAM_BATCH=64): the writer will
2330        // flush at 64, then again at the boundary (remaining 36).
2331        const SEGMENT: usize = 100;
2332        let (tx, rx) = mpsc::sync_channel::<i32>(SEGMENT * 4);
2333        for i in 0..SEGMENT as i32 {
2334            tx.send(i).unwrap();
2335        }
2336        tx.send(-1).unwrap(); // boundary (split_when: starts new segment)
2337        // Second segment
2338        for i in 0..10_i32 {
2339            tx.send(i).unwrap();
2340        }
2341        drop(tx);
2342
2343        let outer = Source::from_iter(rx)
2344            .split_when(|item| *item == -1)
2345            .run_with(Sink::queue())
2346            .unwrap();
2347
2348        let (result_tx, result_rx) = mpsc::channel();
2349        thread::spawn(move || {
2350            let first = outer.pull().unwrap().expect("first segment");
2351            let seg1: Vec<i32> = first.run_collect().unwrap();
2352            let second = outer.pull().unwrap().expect("second segment");
2353            let seg2: Vec<i32> = second.run_collect().unwrap();
2354            let done = outer.pull().unwrap().is_none();
2355            result_tx.send((seg1, seg2, done)).unwrap();
2356        });
2357
2358        let (seg1, seg2, done) = result_rx
2359            .recv_timeout(StdDuration::from_secs(5))
2360            .expect("timed out — split_when writer held items past LIVE_SUBSTREAM_BATCH");
2361        assert_eq!(seg1.len(), SEGMENT, "first segment length");
2362        assert_eq!(seg2[0], -1, "boundary element starts second segment");
2363        assert_eq!(seg2.len(), 11, "second segment: boundary + 10 items");
2364        assert!(done);
2365    }
2366
2367    // Bounded-memory test for group_by batching: verifies that a slow consumer
2368    // waiting on a single-key substream eventually receives all items (i.e. the
2369    // write batch is flushed, not held indefinitely).
2370    #[test]
2371    fn group_by_single_key_bounded_memory_rendezvous() {
2372        // 200 items, all same key, batched in groups of 64 by the writer.
2373        // The consumer blocks on run_collect(); all items must arrive.
2374        const N: usize = 200;
2375        let outer = Source::from_iter(0..N as i64)
2376            .group_by(1, |_| 0u8, false)
2377            .run_with(Sink::queue())
2378            .unwrap();
2379
2380        let (result_tx, result_rx) = mpsc::channel();
2381        thread::spawn(move || {
2382            let substream = outer.pull().unwrap().expect("substream");
2383            let items: Vec<i64> = substream.run_collect().unwrap();
2384            let done = outer.pull().unwrap().is_none();
2385            result_tx.send((items, done)).unwrap();
2386        });
2387
2388        let (items, done) = result_rx
2389            .recv_timeout(StdDuration::from_secs(5))
2390            .expect("timed out — group_by write batch held items beyond LIVE_SUBSTREAM_BATCH");
2391        assert_eq!(items.len(), N, "all items delivered");
2392        assert_eq!(items[0], 0);
2393        assert_eq!(items[N - 1], (N - 1) as i64);
2394        assert!(done);
2395    }
2396
2397    #[test]
2398    fn scan_emits_seed_and_accumulated_values() {
2399        let result = Source::from_iter(1..=3)
2400            .scan(0, |acc, item| acc + item)
2401            .run_collect()
2402            .unwrap();
2403
2404        assert_eq!(result, vec![0, 1, 3, 6]);
2405    }
2406
2407    #[test]
2408    fn limit_fails_after_max_elements() {
2409        let result = Source::from_iter(0..3).limit(2).run_collect();
2410
2411        assert_eq!(result, Err(StreamError::LimitExceeded { max: 2 }));
2412    }
2413
2414    #[test]
2415    fn limit_weighted_fails_with_limit_error_like_akka() {
2416        let result = Source::from_iter(["this", "is", "some", "string"])
2417            .via(Flow::identity().limit_weighted(15, |item: &&str| item.len()))
2418            .run_collect();
2419
2420        assert_eq!(result, Err(StreamError::LimitExceeded { max: 15 }));
2421    }
2422
2423    #[test]
2424    fn grouped_weighted_allows_oversized_first_element_like_akka() {
2425        let result = Source::from_iter([10_usize, 1, 2])
2426            .via(Flow::identity().grouped_weighted(5, |item: &usize| *item))
2427            .run_collect()
2428            .unwrap();
2429
2430        assert_eq!(result, vec![vec![10], vec![1, 2]]);
2431    }
2432
2433    #[test]
2434    fn grouped_weighted_keeps_oversized_later_element_in_current_group_like_akka() {
2435        let result = Source::from_iter([1_usize, 10, 2])
2436            .via(Flow::identity().grouped_weighted(5, |item: &usize| *item))
2437            .run_collect()
2438            .unwrap();
2439
2440        assert_eq!(result, vec![vec![1, 10], vec![2]]);
2441    }
2442
2443    #[test]
2444    fn sink_terminals_materialize_results() {
2445        let sum = Source::from_iter(1..=4)
2446            .run_with(Sink::fold(0, |acc, item| acc + item))
2447            .unwrap();
2448
2449        assert_eq!(wait(sum), 10);
2450        assert_eq!(
2451            wait(Source::from_iter(1..=4).run_with(Sink::head()).unwrap()),
2452            1
2453        );
2454        assert_eq!(
2455            wait(Source::from_iter(1..=4).run_with(Sink::last()).unwrap()),
2456            4
2457        );
2458    }
2459
2460    #[test]
2461    fn all_terminal_sink_variants_complete() {
2462        assert_eq!(
2463            wait(
2464                Source::from_iter([1, 2, 3])
2465                    .run_with(Sink::collect())
2466                    .unwrap()
2467            ),
2468            vec![1, 2, 3]
2469        );
2470        assert_eq!(
2471            wait(
2472                Source::<i32>::empty()
2473                    .run_with(Sink::head_option())
2474                    .unwrap()
2475            ),
2476            None
2477        );
2478        assert_eq!(
2479            wait(
2480                Source::from_iter([1, 2, 3])
2481                    .run_with(Sink::last_option())
2482                    .unwrap()
2483            ),
2484            Some(3)
2485        );
2486        assert_eq!(
2487            wait(
2488                Source::from_iter([1, 2, 3])
2489                    .run_with(Sink::reduce(|acc, item| acc + item))
2490                    .unwrap()
2491            ),
2492            6
2493        );
2494
2495        let seen = StdArc::new(StdAtomicUsize::new(0));
2496        let seen_by_sink = StdArc::clone(&seen);
2497        assert_eq!(
2498            wait(
2499                Source::from_iter([1_usize, 2, 3])
2500                    .run_with(Sink::foreach(move |item| {
2501                        seen_by_sink.fetch_add(item, StdOrdering::SeqCst);
2502                    }))
2503                    .unwrap()
2504            ),
2505            NotUsed
2506        );
2507        assert_eq!(seen.load(StdOrdering::SeqCst), 6);
2508    }
2509
2510    #[test]
2511    fn take_last_zero_returns_empty_vector() {
2512        let result = Source::from_iter([1, 2, 3])
2513            .run_with(Sink::take_last(0))
2514            .unwrap();
2515
2516        assert_eq!(wait(result), Vec::<i32>::new());
2517    }
2518
2519    #[test]
2520    fn bounded_head_terminals_complete_inline() {
2521        let materializer = Materializer::new();
2522
2523        let mut head = Source::from_iter(0_u64..1_000)
2524            .run_with_materializer(Sink::head(), &materializer)
2525            .unwrap();
2526        assert_eq!(materializer.active_streams(), 0);
2527        assert_eq!(head.try_wait(), Some(Ok(0)));
2528
2529        let mut filtered_head = Source::from_iter(0_u64..1_000)
2530            .filter(|item| *item >= 10)
2531            .run_with_materializer(Sink::head(), &materializer)
2532            .unwrap();
2533        assert_eq!(materializer.active_streams(), 0);
2534        assert_eq!(filtered_head.try_wait(), Some(Ok(10)));
2535
2536        let mut head_option = Source::<u64>::empty()
2537            .run_with_materializer(Sink::head_option(), &materializer)
2538            .unwrap();
2539        assert_eq!(materializer.active_streams(), 0);
2540        assert_eq!(head_option.try_wait(), Some(Ok(None)));
2541    }
2542
2543    #[test]
2544    fn bounded_head_fast_path_preserves_terminal_errors() {
2545        let materializer = Materializer::new();
2546
2547        let mut empty = Source::<u64>::empty()
2548            .run_with_materializer(Sink::head(), &materializer)
2549            .unwrap();
2550        assert_eq!(empty.try_wait(), Some(Err(StreamError::EmptyStream)));
2551
2552        let mut failed = Source::<u64>::failed(StreamError::Failed("boom".into()))
2553            .run_with_materializer(Sink::head(), &materializer)
2554            .unwrap();
2555        assert_eq!(
2556            failed.try_wait(),
2557            Some(Err(StreamError::Failed("boom".into())))
2558        );
2559        assert_eq!(materializer.active_streams(), 0);
2560    }
2561
2562    #[test]
2563    fn runnable_graph_composes_source_and_sink() {
2564        let graph = Source::from_iter(1..=4)
2565            .map(|item| item * 2)
2566            .to_mat(Sink::fold(0, |acc, item| acc + item), Keep::right);
2567
2568        assert_eq!(wait(graph.run().unwrap()), 20);
2569
2570        let graph = Source::single(1)
2571            .map_materialized_value(|_| 20)
2572            .to(Sink::ignore())
2573            .map_materialized_value(|value| value + 1);
2574        assert_eq!(graph.run().unwrap(), 21);
2575
2576        let ignored = Source::single(1).to(Sink::ignore()).run().unwrap();
2577        assert_eq!(ignored, NotUsed);
2578    }
2579
2580    #[test]
2581    fn materialized_values_follow_keep_defaults() {
2582        let source = Source::single(1).map_materialized_value(|_| "source");
2583        let flow = Flow::identity().map_materialized_value(|_| "flow");
2584
2585        let source_mat = source.clone().via(flow.clone()).to(Sink::ignore()).run();
2586        assert_eq!(source_mat.unwrap(), "source");
2587
2588        let combined = source
2589            .via_mat(flow, Keep::both)
2590            .to_mat(Sink::ignore(), Keep::both)
2591            .run()
2592            .unwrap();
2593        assert_eq!(combined.0, ("source", "flow"));
2594        assert_eq!(wait(combined.1), NotUsed);
2595
2596        let sink_mat = Source::single(41)
2597            .map_materialized_value(|_| "ignored source")
2598            .run_with(Sink::fold(1, |acc, item| acc + item))
2599            .unwrap();
2600        assert_eq!(wait(sink_mat), 42);
2601    }
2602
2603    #[test]
2604    fn flow_to_sink_preserves_flow_materialized_value_by_default() {
2605        let sink = Flow::identity()
2606            .map(|item: i32| item + 1)
2607            .map_materialized_value(|_| "flow")
2608            .to(Sink::fold(0, |acc, item| acc + item));
2609
2610        let materialized = Source::from_iter([1, 2, 3]).run_with(sink).unwrap();
2611
2612        assert_eq!(materialized, "flow");
2613        let explicit = Flow::identity()
2614            .map(|item: i32| item + 1)
2615            .map_materialized_value(|_| "flow")
2616            .to_mat(Sink::fold(0, |acc, item| acc + item), Keep::both)
2617            .run_with(Source::from_iter([1, 2, 3]))
2618            .unwrap();
2619        assert_eq!(explicit, NotUsed);
2620
2621        let explicit = Source::from_iter([1, 2, 3])
2622            .run_with(
2623                Flow::identity()
2624                    .map(|item: i32| item + 1)
2625                    .map_materialized_value(|_| "flow")
2626                    .to_mat(Sink::fold(0, |acc, item| acc + item), Keep::both),
2627            )
2628            .unwrap();
2629        assert_eq!(explicit.0, "flow");
2630        assert_eq!(wait(explicit.1), 9);
2631    }
2632
2633    #[test]
2634    fn materializer_shutdown_fails_materialization() {
2635        let materializer = Materializer::new();
2636        let named = materializer.with_name_prefix("test-stream");
2637        materializer.shutdown();
2638
2639        let graph = Source::single(1).to(Sink::ignore());
2640
2641        assert_eq!(named.name_prefix(), "test-stream");
2642        assert_eq!(
2643            graph.run_with_materializer(&named),
2644            Err(StreamError::AbruptTermination)
2645        );
2646    }
2647
2648    #[test]
2649    fn materializer_shutdown_fails_running_stream_completion() {
2650        let materializer = Materializer::new();
2651        let completion = Source::repeat(1)
2652            .run_with_materializer(Sink::ignore(), &materializer)
2653            .unwrap();
2654
2655        assert_eq!(materializer.active_streams(), 1);
2656        materializer.shutdown();
2657        assert_eq!(completion.wait(), Err(StreamError::AbruptTermination));
2658        assert_eq!(materializer.active_streams(), 0);
2659    }
2660
2661    #[test]
2662    fn dropped_stream_completion_cancels_running_stream() {
2663        let materializer = Materializer::new();
2664        let completion = Source::repeat(1)
2665            .run_with_materializer(Sink::ignore(), &materializer)
2666            .unwrap();
2667
2668        assert_eq!(materializer.active_streams(), 1);
2669        drop(completion);
2670        for _ in 0..50 {
2671            if materializer.active_streams() == 0 {
2672                break;
2673            }
2674            thread::sleep(Duration::from_millis(5));
2675        }
2676        assert_eq!(materializer.active_streams(), 0);
2677    }
2678
2679    #[test]
2680    fn runtime_timers_fire_cancel_and_stop_on_shutdown() {
2681        let materializer = Materializer::new();
2682        let (once_tx, once_rx) = mpsc::channel();
2683        let once = materializer.schedule_once(Duration::from_millis(5), move || {
2684            once_tx.send(()).unwrap();
2685        });
2686        once_rx.recv_timeout(Duration::from_millis(250)).unwrap();
2687        assert!(!once.is_cancelled());
2688
2689        let (cancelled_tx, cancelled_rx) = mpsc::channel();
2690        let cancelled = materializer.schedule_once(Duration::from_millis(25), move || {
2691            cancelled_tx.send(()).unwrap();
2692        });
2693        assert!(cancelled.cancel());
2694        assert!(!cancelled.cancel());
2695        assert!(cancelled.is_cancelled());
2696        assert!(
2697            cancelled_rx
2698                .recv_timeout(Duration::from_millis(75))
2699                .is_err()
2700        );
2701
2702        let fixed_delay_count = StdArc::new(StdAtomicUsize::new(0));
2703        let fixed_delay_task_count = StdArc::clone(&fixed_delay_count);
2704        let fixed_delay = materializer.schedule_with_fixed_delay(
2705            Duration::from_millis(1),
2706            Duration::from_millis(5),
2707            move || {
2708                fixed_delay_task_count.fetch_add(1, StdOrdering::SeqCst);
2709            },
2710        );
2711        thread::sleep(Duration::from_millis(25));
2712        assert!(fixed_delay_count.load(StdOrdering::SeqCst) > 0);
2713        fixed_delay.cancel();
2714
2715        let fixed_rate_count = StdArc::new(StdAtomicUsize::new(0));
2716        let fixed_rate_task_count = StdArc::clone(&fixed_rate_count);
2717        let fixed_rate = materializer.schedule_at_fixed_rate(
2718            Duration::from_millis(1),
2719            Duration::from_millis(5),
2720            move || {
2721                fixed_rate_task_count.fetch_add(1, StdOrdering::SeqCst);
2722            },
2723        );
2724        thread::sleep(Duration::from_millis(25));
2725        assert!(fixed_rate_count.load(StdOrdering::SeqCst) > 0);
2726        fixed_rate.cancel();
2727
2728        let shutdown_materializer = Materializer::new();
2729        let (shutdown_tx, shutdown_rx) = mpsc::channel();
2730        shutdown_materializer.schedule_once(Duration::from_millis(25), move || {
2731            shutdown_tx.send(()).unwrap();
2732        });
2733        shutdown_materializer.shutdown();
2734        assert!(shutdown_rx.recv_timeout(Duration::from_millis(75)).is_err());
2735    }
2736
2737    #[test]
2738    fn runtime_timer_driver_preserves_fixed_rate_cadence_under_slow_tasks() {
2739        use std::sync::{Condvar, Mutex};
2740
2741        #[derive(Debug)]
2742        enum TimerEvent {
2743            Started(usize, Instant),
2744            Completed(usize, Instant),
2745        }
2746
2747        let recv_event = |rx: &mpsc::Receiver<TimerEvent>, label: &str| {
2748            rx.recv_timeout(Duration::from_secs(20))
2749                .unwrap_or_else(|err| panic!("{label}: expected timer event within 20 s: {err}"))
2750        };
2751        let release = |gate: &StdArc<(Mutex<bool>, Condvar)>| {
2752            let (released, condvar) = &**gate;
2753            let mut released = released.lock().unwrap_or_else(|poison| poison.into_inner());
2754            *released = true;
2755            condvar.notify_all();
2756        };
2757
2758        let interval = Duration::from_secs(2);
2759        let overrun = interval + Duration::from_millis(250);
2760
2761        let rate_materializer = Materializer::new();
2762        let (rate_tx, rate_rx) = mpsc::channel();
2763        let rate_runs = StdArc::new(StdAtomicUsize::new(0));
2764        let rate_task_runs = StdArc::clone(&rate_runs);
2765        let rate_gate = StdArc::new((Mutex::new(false), Condvar::new()));
2766        let rate_task_gate = StdArc::clone(&rate_gate);
2767        let fixed_rate =
2768            rate_materializer.schedule_at_fixed_rate(Duration::ZERO, interval, move || {
2769                let run = rate_task_runs.fetch_add(1, StdOrdering::SeqCst) + 1;
2770                rate_tx
2771                    .send(TimerEvent::Started(run, Instant::now()))
2772                    .unwrap();
2773                if run == 1 {
2774                    let (released, condvar) = &*rate_task_gate;
2775                    let mut released = released.lock().unwrap_or_else(|poison| poison.into_inner());
2776                    while !*released {
2777                        released = condvar
2778                            .wait(released)
2779                            .unwrap_or_else(|poison| poison.into_inner());
2780                    }
2781                    rate_tx
2782                        .send(TimerEvent::Completed(run, Instant::now()))
2783                        .unwrap();
2784                }
2785            });
2786        let rate_first_started = match recv_event(&rate_rx, "fixed-rate first task") {
2787            TimerEvent::Started(1, at) => at,
2788            other => panic!("fixed-rate first task: unexpected event {other:?}"),
2789        };
2790        assert!(wait_until(Duration::from_secs(20), || {
2791            rate_first_started.elapsed() >= overrun
2792        }));
2793        release(&rate_gate);
2794        let rate_first_completed = match recv_event(&rate_rx, "fixed-rate first completion") {
2795            TimerEvent::Completed(1, at) => at,
2796            other => panic!("fixed-rate first completion: unexpected event {other:?}"),
2797        };
2798        let rate_second_started = match recv_event(&rate_rx, "fixed-rate second task") {
2799            TimerEvent::Started(2, at) => at,
2800            other => panic!("fixed-rate second task: unexpected event {other:?}"),
2801        };
2802        fixed_rate.cancel();
2803        rate_materializer.shutdown();
2804
2805        let delay_materializer = Materializer::new();
2806        let (delay_tx, delay_rx) = mpsc::channel();
2807        let delay_runs = StdArc::new(StdAtomicUsize::new(0));
2808        let delay_task_runs = StdArc::clone(&delay_runs);
2809        let delay_gate = StdArc::new((Mutex::new(false), Condvar::new()));
2810        let delay_task_gate = StdArc::clone(&delay_gate);
2811        let fixed_delay =
2812            delay_materializer.schedule_with_fixed_delay(Duration::ZERO, interval, move || {
2813                let run = delay_task_runs.fetch_add(1, StdOrdering::SeqCst) + 1;
2814                delay_tx
2815                    .send(TimerEvent::Started(run, Instant::now()))
2816                    .unwrap();
2817                if run == 1 {
2818                    let (released, condvar) = &*delay_task_gate;
2819                    let mut released = released.lock().unwrap_or_else(|poison| poison.into_inner());
2820                    while !*released {
2821                        released = condvar
2822                            .wait(released)
2823                            .unwrap_or_else(|poison| poison.into_inner());
2824                    }
2825                    delay_tx
2826                        .send(TimerEvent::Completed(run, Instant::now()))
2827                        .unwrap();
2828                }
2829            });
2830        let delay_first_started = match recv_event(&delay_rx, "fixed-delay first task") {
2831            TimerEvent::Started(1, at) => at,
2832            other => panic!("fixed-delay first task: unexpected event {other:?}"),
2833        };
2834        assert!(wait_until(Duration::from_secs(20), || {
2835            delay_first_started.elapsed() >= overrun
2836        }));
2837        release(&delay_gate);
2838        let delay_first_completed = match recv_event(&delay_rx, "fixed-delay first completion") {
2839            TimerEvent::Completed(1, at) => at,
2840            other => panic!("fixed-delay first completion: unexpected event {other:?}"),
2841        };
2842        let delay_second_started = match recv_event(&delay_rx, "fixed-delay second task") {
2843            TimerEvent::Started(2, at) => at,
2844            other => panic!("fixed-delay second task: unexpected event {other:?}"),
2845        };
2846        fixed_delay.cancel();
2847        delay_materializer.shutdown();
2848
2849        let rate_task_time = rate_first_completed.duration_since(rate_first_started);
2850        let rate_catch_up = rate_second_started.duration_since(rate_first_completed);
2851        let delay_task_time = delay_first_completed.duration_since(delay_first_started);
2852        let delay_gap = delay_second_started.duration_since(delay_first_completed);
2853        assert!(
2854            rate_task_time >= interval,
2855            "fixed-rate first task should overrun its interval; ran for {rate_task_time:?}"
2856        );
2857        assert!(
2858            rate_catch_up < interval,
2859            "fixed-rate second task should catch up after an overrun; waited {rate_catch_up:?}"
2860        );
2861        assert!(
2862            delay_task_time >= interval,
2863            "fixed-delay first task should overrun its interval; ran for {delay_task_time:?}"
2864        );
2865        assert!(
2866            delay_gap >= interval,
2867            "fixed-delay second task fired before one full delay elapsed after completion: {delay_gap:?}",
2868        );
2869    }
2870
2871    #[test]
2872    fn runtime_repeating_timer_cancellation_stops_future_fires() {
2873        let materializer = Materializer::new();
2874        let (tx, rx) = mpsc::channel();
2875        let timer = materializer.schedule_at_fixed_rate(
2876            Duration::from_millis(1),
2877            Duration::from_millis(30),
2878            move || {
2879                tx.send(()).unwrap();
2880            },
2881        );
2882
2883        rx.recv_timeout(Duration::from_millis(250)).unwrap();
2884        assert!(timer.cancel());
2885        assert!(rx.recv_timeout(Duration::from_millis(90)).is_err());
2886        materializer.shutdown();
2887    }
2888
2889    #[test]
2890    fn runtime_panicking_once_timer_does_not_kill_driver_or_later_timers() {
2891        let materializer = Materializer::new();
2892        materializer.schedule_once(Duration::from_millis(1), || {
2893            panic!("timer boom");
2894        });
2895
2896        let (tx, rx) = mpsc::channel();
2897        materializer.schedule_once(Duration::from_millis(20), move || {
2898            tx.send(()).unwrap();
2899        });
2900
2901        rx.recv_timeout(Duration::from_millis(250)).unwrap();
2902        materializer.shutdown();
2903    }
2904
2905    #[test]
2906    fn runtime_panicking_fixed_rate_timer_stops_itself_and_leaves_driver_alive() {
2907        let materializer = Materializer::new();
2908        let panic_count = StdArc::new(StdAtomicUsize::new(0));
2909        let panic_count_task = StdArc::clone(&panic_count);
2910        materializer.schedule_at_fixed_rate(Duration::ZERO, Duration::from_millis(20), move || {
2911            panic_count_task.fetch_add(1, StdOrdering::SeqCst);
2912            panic!("fixed-rate boom");
2913        });
2914
2915        assert!(wait_until(Duration::from_millis(150), || {
2916            panic_count.load(StdOrdering::SeqCst) == 1
2917        }));
2918
2919        let (tx, rx) = mpsc::channel();
2920        materializer.schedule_once(Duration::from_millis(30), move || {
2921            tx.send(()).unwrap();
2922        });
2923        rx.recv_timeout(Duration::from_millis(250)).unwrap();
2924
2925        thread::sleep(Duration::from_millis(90));
2926        assert_eq!(panic_count.load(StdOrdering::SeqCst), 1);
2927        materializer.shutdown();
2928    }
2929
2930    #[test]
2931    fn runtime_slow_timer_task_does_not_delay_unrelated_timers() {
2932        let materializer = Materializer::new();
2933        let started = StdArc::new(StdAtomicBool::new(false));
2934        let started_task = StdArc::clone(&started);
2935        let slow_timer = materializer.schedule_at_fixed_rate(
2936            Duration::ZERO,
2937            Duration::from_millis(250),
2938            move || {
2939                started_task.store(true, StdOrdering::SeqCst);
2940                thread::sleep(Duration::from_millis(200));
2941            },
2942        );
2943
2944        assert!(wait_until(Duration::from_millis(100), || {
2945            started.load(StdOrdering::SeqCst)
2946        }));
2947
2948        let start = Instant::now();
2949        let (tx, rx) = mpsc::channel();
2950        materializer.schedule_once(Duration::from_millis(10), move || {
2951            tx.send(Instant::now()).unwrap();
2952        });
2953        let fired_at = rx.recv_timeout(Duration::from_millis(350)).unwrap();
2954        let elapsed = fired_at.duration_since(start);
2955
2956        slow_timer.cancel();
2957        materializer.shutdown();
2958        assert!(
2959            elapsed < Duration::from_millis(150),
2960            "unrelated timer was delayed by a blocking timer task: {elapsed:?}",
2961        );
2962    }
2963
2964    #[test]
2965    fn runtime_shutdown_stops_timer_driver_thread() {
2966        let materializer = Materializer::new();
2967        assert!(wait_until(Duration::from_secs(1), || materializer
2968            .timer_driver_is_live()));
2969
2970        materializer.shutdown();
2971        assert!(wait_until(Duration::from_secs(2), || !materializer
2972            .timer_driver_is_live()));
2973    }
2974
2975    #[test]
2976    fn runtime_timer_driver_orders_many_timers_by_deadline() {
2977        let materializer = Materializer::new();
2978        let (tx, rx) = mpsc::channel();
2979        let schedule = [(450_u64, 4_u8), (50, 1), (350, 3), (150, 2), (550, 5)];
2980
2981        for (delay_ms, value) in schedule {
2982            let tx = tx.clone();
2983            materializer.schedule_once(Duration::from_millis(delay_ms), move || {
2984                tx.send(value).unwrap();
2985            });
2986        }
2987        drop(tx);
2988
2989        let mut received = Vec::new();
2990        for _ in 0..schedule.len() {
2991            received.push(rx.recv_timeout(Duration::from_secs(10)).unwrap());
2992        }
2993        materializer.shutdown();
2994
2995        assert_eq!(received, vec![1, 2, 3, 4, 5]);
2996    }
2997
2998    #[test]
2999    fn runtime_timer_driver_uses_one_thread_per_runtime_regardless_of_timer_count() {
3000        let materializer = Materializer::new();
3001        let thread_name = materializer.timer_thread_name().to_owned();
3002        // Linux exposes thread names through /proc/self/task/*/comm with the
3003        // kernel's 15-byte task-name limit. Once a full test process has
3004        // created enough runtimes, `datum-timer-1000` is reported as
3005        // `datum-timer-100`, so compare against the Linux-visible name and use
3006        // the observed baseline for collision-safe assertions.
3007        let linux_thread_name = thread_name.chars().take(15).collect::<String>();
3008        assert!(wait_until(Duration::from_secs(5), || {
3009            materializer.timer_driver_is_live() && linux_thread_count(&linux_thread_name) >= 1
3010        }));
3011        let live_timer_threads = linux_thread_count(&linux_thread_name);
3012
3013        for _ in 0..128 {
3014            materializer.schedule_once(Duration::from_secs(60), || {});
3015        }
3016
3017        assert!(
3018            wait_until(Duration::from_secs(5), || {
3019                materializer.timer_driver_is_live()
3020                    && linux_thread_count(&linux_thread_name) == live_timer_threads
3021            }),
3022            "scheduling timers should not create extra timer threads for a runtime",
3023        );
3024        materializer.shutdown();
3025        assert!(wait_until(Duration::from_secs(5), || {
3026            !materializer.timer_driver_is_live()
3027                && linux_thread_count(&linux_thread_name) < live_timer_threads
3028        }));
3029    }
3030
3031    #[test]
3032    fn cancelled_and_never_sinks_have_distinct_materialization_results() {
3033        assert_eq!(
3034            Source::repeat(1)
3035                .run_with(Sink::cancelled())
3036                .expect("cancelled sink materializes"),
3037            NotUsed
3038        );
3039        assert_eq!(
3040            Source::single(1)
3041                .run_with(Sink::never())
3042                .expect("never sink materializes")
3043                .try_wait(),
3044            None
3045        );
3046    }
3047
3048    #[test]
3049    fn never_sink_finishes_on_materializer_shutdown() {
3050        let materializer = Materializer::new();
3051        let completion = Source::single(1)
3052            .run_with_materializer(Sink::never(), &materializer)
3053            .unwrap();
3054
3055        materializer.shutdown();
3056        assert_eq!(completion.wait(), Err(StreamError::AbruptTermination));
3057    }
3058
3059    #[test]
3060    fn dropping_source_never_completion_releases_parked_worker() {
3061        let materializer = Materializer::new();
3062        let completion = Source::<i32>::never()
3063            .run_with_materializer(Sink::ignore(), &materializer)
3064            .unwrap();
3065
3066        assert!(wait_until(StdDuration::from_secs(1), || {
3067            materializer.active_streams() == 1
3068        }));
3069        assert_eq!(materializer.active_streams(), 1);
3070
3071        drop(completion);
3072
3073        assert!(wait_until(StdDuration::from_secs(15), || {
3074            materializer.active_streams() == 0
3075        }));
3076        assert_eq!(materializer.active_streams(), 0);
3077    }
3078
3079    #[test]
3080    fn future_and_maybe_sources_emit_values() {
3081        let future_value = Source::future(|| async { Ok(7) }).run_collect().unwrap();
3082        assert_eq!(future_value, vec![7]);
3083
3084        let future_source = Source::future_source(|| async { Ok(Source::from_iter([1, 2, 3])) })
3085            .run_collect()
3086            .unwrap();
3087        assert_eq!(future_source, vec![1, 2, 3]);
3088
3089        let (handle, source) = Source::maybe();
3090        assert_eq!(
3091            source.clone().run_collect(),
3092            Err(StreamError::MaybeIncomplete)
3093        );
3094        handle.complete(9).unwrap();
3095        assert_eq!(source.run_collect().unwrap(), vec![9]);
3096    }
3097
3098    #[test]
3099    fn wp6b_source_generators_emit_and_fail_like_stream_errors() {
3100        assert_eq!(
3101            Source::cycle(|| [1, 2, 3].into_iter())
3102                .take(8)
3103                .run_collect()
3104                .unwrap(),
3105            vec![1, 2, 3, 1, 2, 3, 1, 2]
3106        );
3107        assert_eq!(
3108            Source::<i32>::cycle(std::iter::empty::<i32>).run_collect(),
3109            Err(StreamError::Failed("empty iterator".into()))
3110        );
3111        assert_eq!(
3112            Source::unfold(0, |state| (state < 4).then_some((state + 1, state)))
3113                .run_collect()
3114                .unwrap(),
3115            vec![0, 1, 2, 3]
3116        );
3117        assert_eq!(
3118            Source::unfold_async(0, |state| async move {
3119                Ok((state < 4).then_some((state + 1, state * 2)))
3120            })
3121            .run_collect()
3122            .unwrap(),
3123            vec![0, 2, 4, 6]
3124        );
3125        assert!(matches!(
3126            Source::<i32>::lazy_single(|| panic!("boom")).run_collect(),
3127            Err(StreamError::Failed(message)) if message == "lazy_single factory panicked"
3128        ));
3129    }
3130
3131    #[test]
3132    fn wp6b_lazy_sources_defer_until_first_pull_and_complete_deferred_mat() {
3133        let created = StdArc::new(StdAtomicUsize::new(0));
3134        let created_for_source = StdArc::clone(&created);
3135        let source = Source::<i32>::lazy_source(move || {
3136            created_for_source.fetch_add(1, StdOrdering::SeqCst);
3137            Source::from_iter([7, 8]).map_materialized_value(|_| 99)
3138        });
3139        let materializer = Materializer::new();
3140        let (mut stream, mut mat) = StdArc::clone(&source.factory)
3141            .create(&materializer)
3142            .unwrap();
3143
3144        assert_eq!(created.load(StdOrdering::SeqCst), 0);
3145        assert!(mat.try_wait().is_none());
3146        assert_eq!(stream.next().unwrap().unwrap(), 7);
3147        assert_eq!(mat.wait().unwrap(), 99);
3148        assert_eq!(created.load(StdOrdering::SeqCst), 1);
3149        assert_eq!(stream.next().unwrap().unwrap(), 8);
3150
3151        let never_created = StdArc::new(StdAtomicUsize::new(0));
3152        let never_created_for_source = StdArc::clone(&never_created);
3153        let mat = Source::<i32>::lazy_future_source(move || {
3154            never_created_for_source.fetch_add(1, StdOrdering::SeqCst);
3155            async { Ok(Source::single(1)) }
3156        })
3157        .to(Sink::cancelled())
3158        .run()
3159        .unwrap();
3160        assert!(matches!(mat.wait(), Err(StreamError::Failed(_))));
3161        assert_eq!(never_created.load(StdOrdering::SeqCst), 0);
3162
3163        let lazy_future = StdArc::new(StdAtomicUsize::new(0));
3164        let lazy_future_for_source = StdArc::clone(&lazy_future);
3165        let source = Source::lazy_future(move || {
3166            lazy_future_for_source.fetch_add(1, StdOrdering::SeqCst);
3167            async { Ok(42) }
3168        });
3169        let (mut stream, _) = StdArc::clone(&source.factory)
3170            .create(&Materializer::new())
3171            .unwrap();
3172        assert_eq!(lazy_future.load(StdOrdering::SeqCst), 0);
3173        assert_eq!(stream.next().unwrap().unwrap(), 42);
3174        assert_eq!(lazy_future.load(StdOrdering::SeqCst), 1);
3175    }
3176
3177    #[test]
3178    fn wp6b_unfold_resource_closes_on_completion_failure_and_cancellation() {
3179        let closed = StdArc::new(StdAtomicUsize::new(0));
3180        let closed_on_complete = StdArc::clone(&closed);
3181        let values = Source::unfold_resource(
3182            || Ok(std::collections::VecDeque::from([1, 2, 3])),
3183            |items| Ok(items.pop_front()),
3184            move |_items| {
3185                closed_on_complete.fetch_add(1, StdOrdering::SeqCst);
3186                Ok(())
3187            },
3188        )
3189        .run_collect()
3190        .unwrap();
3191        assert_eq!(values, vec![1, 2, 3]);
3192        assert_eq!(closed.load(StdOrdering::SeqCst), 1);
3193
3194        let closed_on_failure = StdArc::new(StdAtomicUsize::new(0));
3195        let closed_on_failure_for_close = StdArc::clone(&closed_on_failure);
3196        let failed = Source::<i32>::unfold_resource(
3197            || Ok(()),
3198            |_| Err(StreamError::Failed("read".into())),
3199            move |_| {
3200                closed_on_failure_for_close.fetch_add(1, StdOrdering::SeqCst);
3201                Err(StreamError::Failed("close".into()))
3202            },
3203        )
3204        .run_collect();
3205        assert_eq!(failed, Err(StreamError::Failed("read".into())));
3206        assert_eq!(closed_on_failure.load(StdOrdering::SeqCst), 1);
3207
3208        let closed_on_cancel = StdArc::new(StdAtomicUsize::new(0));
3209        let closed_on_cancel_for_close = StdArc::clone(&closed_on_cancel);
3210        let first = Source::unfold_resource(
3211            || Ok(0_usize),
3212            |next| {
3213                let item = *next;
3214                *next += 1;
3215                Ok(Some(item))
3216            },
3217            move |_| {
3218                closed_on_cancel_for_close.fetch_add(1, StdOrdering::SeqCst);
3219                Ok(())
3220            },
3221        )
3222        .run_with(Sink::head())
3223        .unwrap();
3224        assert_eq!(first.wait().unwrap(), 0);
3225        assert!(wait_until(Duration::from_millis(250), || {
3226            closed_on_cancel.load(StdOrdering::SeqCst) == 1
3227        }));
3228    }
3229
3230    #[test]
3231    fn wp6b_async_resource_and_async_accumulators_are_sequential() {
3232        let closed = StdArc::new(StdAtomicUsize::new(0));
3233        let closed_for_close = StdArc::clone(&closed);
3234        let values = Source::unfold_resource_async(
3235            || async { Ok(std::collections::VecDeque::from([1, 2, 3])) },
3236            |items| {
3237                let item = items.pop_front();
3238                async move { Ok(item) }
3239            },
3240            move |_items| {
3241                let closed = StdArc::clone(&closed_for_close);
3242                async move {
3243                    closed.fetch_add(1, StdOrdering::SeqCst);
3244                    Ok(())
3245                }
3246            },
3247        )
3248        .run_collect()
3249        .unwrap();
3250        assert_eq!(values, vec![1, 2, 3]);
3251        assert_eq!(closed.load(StdOrdering::SeqCst), 1);
3252
3253        let closed_on_failure = StdArc::new(StdAtomicUsize::new(0));
3254        let closed_on_failure_for_close = StdArc::clone(&closed_on_failure);
3255        let failed = Source::<i32>::unfold_resource_async(
3256            || async { Ok(()) },
3257            |_resource| async { Err(StreamError::Failed("read".into())) },
3258            move |_resource| {
3259                let closed_on_failure = StdArc::clone(&closed_on_failure_for_close);
3260                async move {
3261                    closed_on_failure.fetch_add(1, StdOrdering::SeqCst);
3262                    Err(StreamError::Failed("close".into()))
3263                }
3264            },
3265        )
3266        .run_collect();
3267        assert_eq!(failed, Err(StreamError::Failed("read".into())));
3268        assert_eq!(closed_on_failure.load(StdOrdering::SeqCst), 1);
3269
3270        let active = StdArc::new(StdAtomicUsize::new(0));
3271        let max_active = StdArc::new(StdAtomicUsize::new(0));
3272        let active_for_stage = StdArc::clone(&active);
3273        let max_for_stage = StdArc::clone(&max_active);
3274        let scanned = Source::from_iter(1..=4)
3275            .scan_async(0, move |acc, item| {
3276                let active = StdArc::clone(&active_for_stage);
3277                let max_active = StdArc::clone(&max_for_stage);
3278                async move {
3279                    let now = active.fetch_add(1, StdOrdering::SeqCst) + 1;
3280                    max_active.fetch_max(now, StdOrdering::SeqCst);
3281                    tokio::time::sleep(Duration::from_millis(1)).await;
3282                    active.fetch_sub(1, StdOrdering::SeqCst);
3283                    Ok(acc + item)
3284                }
3285            })
3286            .run_collect()
3287            .unwrap();
3288        assert_eq!(scanned, vec![0, 1, 3, 6, 10]);
3289        assert_eq!(max_active.load(StdOrdering::SeqCst), 1);
3290
3291        let folded = Source::from_iter(1..=4)
3292            .fold_async(0, |acc, item| async move { Ok(acc + item) })
3293            .run_collect()
3294            .unwrap();
3295        assert_eq!(folded, vec![10]);
3296    }
3297
3298    #[test]
3299    fn wp6b_fold_async_materialization_does_not_drain_upstream() {
3300        let release = StdArc::new((std::sync::Mutex::new(false), std::sync::Condvar::new()));
3301        let started = StdArc::new(StdAtomicBool::new(false));
3302        let source = {
3303            let release = StdArc::clone(&release);
3304            let started = StdArc::clone(&started);
3305            Source::from_factory(move || {
3306                let release = StdArc::clone(&release);
3307                let started = StdArc::clone(&started);
3308                let mut emitted = false;
3309                Box::new(std::iter::from_fn(move || {
3310                    if emitted {
3311                        return None;
3312                    }
3313                    emitted = true;
3314                    started.store(true, StdOrdering::SeqCst);
3315                    let (released, available) = &*release;
3316                    let mut released = released.lock().unwrap();
3317                    while !*released {
3318                        released = available.wait(released).unwrap();
3319                    }
3320                    Some(Ok(1))
3321                }))
3322            })
3323        };
3324
3325        let (materialized_tx, materialized_rx) = mpsc::channel();
3326        let join = thread::spawn(move || {
3327            let queue = source
3328                .fold_async(0, |acc, item| async move { Ok(acc + item) })
3329                .run_with(Sink::queue())
3330                .unwrap();
3331            materialized_tx.send(queue).unwrap();
3332        });
3333
3334        let queue = match materialized_rx.recv_timeout(StdDuration::from_secs(1)) {
3335            Ok(queue) => queue,
3336            Err(error) => {
3337                let (released, available) = &*release;
3338                *released.lock().unwrap() = true;
3339                available.notify_all();
3340                let _ = join.join();
3341                panic!("fold_async materialization did not return before first pull: {error}");
3342            }
3343        };
3344        let (released, _) = &*release;
3345        assert!(
3346            !*released.lock().unwrap(),
3347            "test source was released before materialization returned"
3348        );
3349
3350        let (released, available) = &*release;
3351        *released.lock().unwrap() = true;
3352        available.notify_all();
3353        assert_eq!(queue.pull().unwrap(), Some(1));
3354        assert_eq!(queue.pull().unwrap(), None);
3355        join.join().unwrap();
3356        assert!(started.load(StdOrdering::SeqCst));
3357    }
3358
3359    #[test]
3360    fn wp6b_lazy_sink_and_flow_wait_for_first_element() {
3361        let lazy_sink_created = StdArc::new(StdAtomicUsize::new(0));
3362        let lazy_sink_created_for_factory = StdArc::clone(&lazy_sink_created);
3363        let empty_sink = Source::<i32>::empty()
3364            .run_with(Sink::lazy_sink(move || {
3365                lazy_sink_created_for_factory.fetch_add(1, StdOrdering::SeqCst);
3366                Sink::ignore()
3367            }))
3368            .unwrap();
3369        assert!(matches!(empty_sink.wait(), Err(StreamError::Failed(_))));
3370        assert_eq!(lazy_sink_created.load(StdOrdering::SeqCst), 0);
3371
3372        let foreach_sum = StdArc::new(StdAtomicUsize::new(0));
3373        let foreach_sum_for_sink = StdArc::clone(&foreach_sum);
3374        Source::from_iter([1_usize, 2, 3])
3375            .run_with(Sink::foreach_async(2, move |item| {
3376                let foreach_sum = StdArc::clone(&foreach_sum_for_sink);
3377                async move {
3378                    foreach_sum.fetch_add(item, StdOrdering::SeqCst);
3379                    Ok(())
3380                }
3381            }))
3382            .unwrap()
3383            .wait()
3384            .unwrap();
3385        assert_eq!(foreach_sum.load(StdOrdering::SeqCst), 6);
3386
3387        let lazy_flow_created = StdArc::new(StdAtomicUsize::new(0));
3388        let lazy_flow_created_for_factory = StdArc::clone(&lazy_flow_created);
3389        let lazy_flow = Flow::<i32, i32>::lazy_flow(move || {
3390            lazy_flow_created_for_factory.fetch_add(1, StdOrdering::SeqCst);
3391            Flow::identity()
3392                .map(|item: i32| item + 10)
3393                .map_materialized_value(|_| 123)
3394        });
3395        let mat = (lazy_flow.materialize)().unwrap();
3396        let mut stream = match lazy_flow.transform {
3397            flow::FlowTransform::Runtime(transform) => {
3398                transform(Box::new([Ok(1), Ok(2)].into_iter()), &Materializer::new()).unwrap()
3399            }
3400            flow::FlowTransform::Pure(_) => panic!("lazy flow must be runtime-backed"),
3401        };
3402        assert_eq!(lazy_flow_created.load(StdOrdering::SeqCst), 0);
3403        assert_eq!(stream.next().unwrap().unwrap(), 11);
3404        assert_eq!(mat.wait().unwrap(), 123);
3405        assert_eq!(lazy_flow_created.load(StdOrdering::SeqCst), 1);
3406        assert_eq!(stream.next().unwrap().unwrap(), 12);
3407
3408        let future_flow = Source::from_iter([1, 2])
3409            .via_mat(
3410                Flow::future_flow(|| async {
3411                    Ok(Flow::identity()
3412                        .map(|item: i32| item * 2)
3413                        .map_materialized_value(|_| 77))
3414                }),
3415                Keep::right,
3416            )
3417            .to_mat(Sink::collect(), Keep::both)
3418            .run()
3419            .unwrap();
3420        assert_eq!(future_flow.0.wait().unwrap(), 77);
3421        assert_eq!(future_flow.1.wait().unwrap(), vec![2, 4]);
3422    }
3423
3424    #[test]
3425    fn wp6b_lazy_flow_double_use_in_one_chain_pairs_instances_in_order() {
3426        // Order-sensitive pin for the per-thread FIFO slot pairing: one
3427        // cloned lazy-flow blueprint used twice in the SAME chain on the
3428        // same thread. element = first_stage_id * 100 + second_stage_id, so
3429        // a cross-wired (swapped) mat pairing would yield id2 * 100 + id1.
3430        for round in 0..50 {
3431            let counter = StdArc::new(StdAtomicUsize::new(1));
3432            let factory_counter = StdArc::clone(&counter);
3433            let lazy: Flow<usize, usize, _> = Flow::lazy_flow(move || {
3434                let id = factory_counter.fetch_add(1, StdOrdering::SeqCst);
3435                Flow::identity()
3436                    .map(move |x: usize| x * 100 + id)
3437                    .map_materialized_value(move |_| id)
3438            });
3439            let lazy_again = lazy.clone();
3440
3441            let ((first_mat, second_mat), out) = Source::from_iter([0usize])
3442                .via_mat(lazy, Keep::right)
3443                .via_mat(lazy_again, Keep::both)
3444                .to_mat(Sink::collect(), Keep::both)
3445                .run()
3446                .unwrap();
3447
3448            let first_id = first_mat.wait().unwrap();
3449            let second_id = second_mat.wait().unwrap();
3450            let element = out.wait().unwrap()[0];
3451            assert_eq!(
3452                element,
3453                first_id * 100 + second_id,
3454                "round {round}: mats ({first_id},{second_id}) cross-wired with transform order"
3455            );
3456            assert_ne!(
3457                first_id, second_id,
3458                "round {round}: same factory instance paired twice"
3459            );
3460        }
3461    }
3462
3463    #[test]
3464    fn wp6b_lazy_flow_clones_materialize_concurrently_without_cross_wiring() {
3465        for _ in 0..20 {
3466            let next_id = StdArc::new(StdAtomicUsize::new(0));
3467            let next_id_for_factory = StdArc::clone(&next_id);
3468            let flow = Flow::<i32, i32>::lazy_flow(move || {
3469                let id = next_id_for_factory.fetch_add(1, StdOrdering::SeqCst) + 1;
3470                Flow::identity()
3471                    .map(move |item: i32| item + (id as i32 * 100))
3472                    .map_materialized_value(move |_| id)
3473            });
3474            let barrier = StdArc::new(std::sync::Barrier::new(3));
3475
3476            let spawn_materialization = |input: i32| {
3477                let flow = flow.clone();
3478                let barrier = StdArc::clone(&barrier);
3479                thread::spawn(move || {
3480                    barrier.wait();
3481                    let (mat, values) = Source::single(input)
3482                        .via_mat(flow, Keep::right)
3483                        .to_mat(Sink::collect(), Keep::both)
3484                        .run()
3485                        .unwrap();
3486                    (input, mat.wait().unwrap(), values.wait().unwrap())
3487                })
3488            };
3489
3490            let first = spawn_materialization(1);
3491            let second = spawn_materialization(2);
3492            barrier.wait();
3493
3494            for result in [first.join().unwrap(), second.join().unwrap()] {
3495                let (input, mat_id, values) = result;
3496                assert_eq!(values, vec![input + (mat_id as i32 * 100)]);
3497            }
3498            assert_eq!(next_id.load(StdOrdering::SeqCst), 2);
3499        }
3500    }
3501
3502    #[test]
3503    fn wp6b_map_with_resource_emits_close_item_before_terminal_error() {
3504        let queue = Source::from_factory(|| {
3505            Box::new(vec![Ok(1), Err(StreamError::Failed("upstream".into()))].into_iter())
3506        })
3507        .map_with_resource(
3508            || Ok(()),
3509            |_resource, item| Ok(item + 10),
3510            |_resource| Ok(Some(99)),
3511        )
3512        .run_with(Sink::queue())
3513        .unwrap();
3514
3515        assert_eq!(queue.pull().unwrap(), Some(11));
3516        assert_eq!(queue.pull().unwrap(), Some(99));
3517        assert_eq!(queue.pull(), Err(StreamError::Failed("upstream".into())));
3518
3519        let failed: StreamResult<Vec<i32>> = Source::single(1)
3520            .map_with_resource(
3521                || Ok(()),
3522                |_resource, _item| -> StreamResult<i32> { Err(StreamError::Failed("map".into())) },
3523                |_resource| -> StreamResult<Option<i32>> {
3524                    Err(StreamError::Failed("close".into()))
3525                },
3526            )
3527            .run_collect();
3528        assert_eq!(failed, Err(StreamError::Failed("map".into())));
3529    }
3530
3531    #[test]
3532    fn stateful_and_terminal_source_operators_work() {
3533        let stateful = Source::from_iter([1, 2, 3])
3534            .stateful_map(0, |sum, item| {
3535                *sum += item;
3536                *sum
3537            })
3538            .run_collect()
3539            .unwrap();
3540        assert_eq!(stateful, vec![1, 3, 6]);
3541
3542        let concat = Source::from_iter([1, 2, 3])
3543            .stateful_map_concat(0, |sum, item| {
3544                *sum += item;
3545                [item, *sum]
3546            })
3547            .run_collect()
3548            .unwrap();
3549        assert_eq!(concat, vec![1, 1, 2, 3, 3, 6]);
3550
3551        assert_eq!(
3552            Source::from_iter([1, 2, 3])
3553                .fold(10, |acc, item| acc + item)
3554                .run_collect()
3555                .unwrap(),
3556            vec![16]
3557        );
3558        assert_eq!(
3559            Source::from_iter([1, 2, 3])
3560                .reduce(|acc, item| acc + item)
3561                .run_collect()
3562                .unwrap(),
3563            vec![6]
3564        );
3565    }
3566
3567    #[test]
3568    fn concat_and_sliding_emit_before_unbounded_upstream_finishes() {
3569        let concat = Source::single(())
3570            .map_concat(|_| 0_u64..)
3571            .take(1)
3572            .run_collect()
3573            .unwrap();
3574        assert_eq!(concat, vec![0]);
3575
3576        let sliding = Source::repeat(1_u64)
3577            .sliding(2, 1)
3578            .take(1)
3579            .run_collect()
3580            .unwrap();
3581        assert_eq!(sliding, vec![vec![1, 1]]);
3582    }
3583
3584    #[test]
3585    fn fan_in_source_operators_follow_ordering_rules() {
3586        assert_eq!(
3587            Source::from_iter([1, 2])
3588                .concat(Source::from_iter([3, 4]))
3589                .run_collect()
3590                .unwrap(),
3591            vec![1, 2, 3, 4]
3592        );
3593        assert_eq!(
3594            Source::from_iter([3, 4])
3595                .prepend(Source::from_iter([1, 2]))
3596                .run_collect()
3597                .unwrap(),
3598            vec![1, 2, 3, 4]
3599        );
3600        assert_eq!(
3601            Source::empty()
3602                .or_else(Source::from_iter([10, 20]))
3603                .run_collect()
3604                .unwrap(),
3605            vec![10, 20]
3606        );
3607        assert_eq!(
3608            Source::from_iter([1, 2])
3609                .or_else(Source::from_iter([10, 20]))
3610                .run_collect()
3611                .unwrap(),
3612            vec![1, 2]
3613        );
3614        assert_eq!(
3615            Source::from_iter([1, 2, 3])
3616                .interleave(Source::from_iter([10, 11, 12]), 2)
3617                .run_collect()
3618                .unwrap(),
3619            vec![1, 2, 10, 11, 3, 12]
3620        );
3621    }
3622
3623    #[test]
3624    fn fan_in_flow_operators_compose_with_primary_stream() {
3625        let concat = Source::from_iter([1, 2])
3626            .via(Flow::identity().concat(Source::from_iter([3, 4])))
3627            .run_collect()
3628            .unwrap();
3629        assert_eq!(concat, vec![1, 2, 3, 4]);
3630
3631        let prepend = Source::from_iter([3, 4])
3632            .via(Flow::identity().prepend(Source::from_iter([1, 2])))
3633            .run_collect()
3634            .unwrap();
3635        assert_eq!(prepend, vec![1, 2, 3, 4]);
3636
3637        let interleave = Source::from_iter([1, 2, 3])
3638            .via(Flow::identity().interleave(Source::from_iter([10, 11, 12]), 1))
3639            .run_collect()
3640            .unwrap();
3641        assert_eq!(interleave, vec![1, 10, 2, 11, 3, 12]);
3642
3643        let merge_sorted = Source::from_iter([1, 4])
3644            .via(Flow::identity().merge_sorted(Source::from_iter([2, 3, 5])))
3645            .run_collect()
3646            .unwrap();
3647        assert_eq!(merge_sorted, vec![1, 2, 3, 4, 5]);
3648
3649        let zip_latest = Source::from_iter([1, 2])
3650            .via(Flow::identity().zip_latest(Source::single(10)))
3651            .run_collect()
3652            .unwrap();
3653        assert_eq!(zip_latest, vec![(1, 10), (2, 10)]);
3654
3655        let zip_latest_with = Source::from_iter([1, 2])
3656            .via(
3657                Flow::identity()
3658                    .zip_latest_with(Source::single(10), false, |left, right| left + right),
3659            )
3660            .run_collect()
3661            .unwrap();
3662        assert_eq!(zip_latest_with, vec![11, 12]);
3663    }
3664
3665    #[test]
3666    fn fan_in_operators_propagate_errors_and_eager_close() {
3667        assert!(matches!(
3668            Source::failed(StreamError::Failed("boom".into()))
3669                .or_else(Source::from_iter([1, 2]))
3670                .run_collect(),
3671            Err(StreamError::Failed(_))
3672        ));
3673        assert!(matches!(
3674            Source::from_iter([1, 2])
3675                .prepend(Source::failed(StreamError::Failed("boom".into())))
3676                .run_collect(),
3677            Err(StreamError::Failed(_))
3678        ));
3679        assert_eq!(
3680            Source::from_iter([1, 2])
3681                .interleave_all([Source::empty()], 1, true)
3682                .run_collect()
3683                .unwrap(),
3684            vec![1]
3685        );
3686    }
3687
3688    #[test]
3689    fn interleave_lazy_pulls_only_inputs_needed_for_first_segment() {
3690        use std::sync::{Arc, atomic::AtomicUsize, atomic::Ordering};
3691
3692        let pulls: Arc<[AtomicUsize; 3]> = Arc::new([
3693            AtomicUsize::new(0),
3694            AtomicUsize::new(0),
3695            AtomicUsize::new(0),
3696        ]);
3697
3698        let make_source = |idx: usize| {
3699            let pulls = Arc::clone(&pulls);
3700            Source::from_materialized_factory(move |_| {
3701                let pulls = Arc::clone(&pulls);
3702                let mut emitted = false;
3703                Ok((
3704                    Box::new(std::iter::from_fn(move || {
3705                        pulls[idx].fetch_add(1, Ordering::SeqCst);
3706                        if !emitted && idx == 0 {
3707                            emitted = true;
3708                            Some(Ok(42))
3709                        } else {
3710                            None
3711                        }
3712                    })) as BoxStream<i32>,
3713                    NotUsed,
3714                ))
3715            })
3716        };
3717
3718        let result = make_source(0)
3719            .interleave_all([make_source(1), make_source(2)], 1, false)
3720            .run_with(Sink::head());
3721
3722        assert_eq!(wait(result.unwrap()), 42);
3723        assert_eq!(pulls[0].load(Ordering::SeqCst), 1);
3724        assert_eq!(
3725            pulls[1].load(Ordering::SeqCst),
3726            0,
3727            "second input should not be pulled when downstream cancels after first element"
3728        );
3729        assert_eq!(
3730            pulls[2].load(Ordering::SeqCst),
3731            0,
3732            "third input should not be pulled before its turn"
3733        );
3734    }
3735
3736    #[test]
3737    fn interleave_non_eager_drains_remaining_when_one_input_completes() {
3738        assert_eq!(
3739            Source::from_iter([1, 2, 3, 4])
3740                .interleave_all(
3741                    [Source::from_iter([10]), Source::from_iter([20, 21, 22])],
3742                    1,
3743                    false
3744                )
3745                .run_collect()
3746                .unwrap(),
3747            vec![1, 10, 20, 2, 21, 3, 22, 4]
3748        );
3749    }
3750
3751    #[test]
3752    fn remaining_merge_and_zip_family_matches_expected_ordering() {
3753        assert_eq!(
3754            Source::from_iter([1, 4])
3755                .merge_sorted(Source::from_iter([2, 3, 5]))
3756                .run_collect()
3757                .unwrap(),
3758            vec![1, 2, 3, 4, 5]
3759        );
3760
3761        assert_eq!(
3762            Source::from_iter([1, 2])
3763                .merge_latest(Source::single(10), false)
3764                .run_collect()
3765                .unwrap(),
3766            vec![vec![1, 10], vec![2, 10]]
3767        );
3768
3769        assert_eq!(
3770            Source::from_iter([1, 2, 3])
3771                .merge_all([Source::from_iter([10, 11])], false)
3772                .run_collect()
3773                .unwrap(),
3774            vec![1, 10, 2, 11, 3]
3775        );
3776
3777        assert_eq!(
3778            Source::from_iter([1, 2, 3])
3779                .zip_with(Source::from_iter([10, 11, 12]), |left, right| left + right)
3780                .run_collect()
3781                .unwrap(),
3782            vec![11, 13, 15]
3783        );
3784
3785        assert_eq!(
3786            Source::from_iter([1, 2])
3787                .zip_latest(Source::single(10))
3788                .run_collect()
3789                .unwrap(),
3790            vec![(1, 10), (2, 10)]
3791        );
3792
3793        assert_eq!(
3794            Source::from_iter([1, 2, 3])
3795                .zip_latest_with(Source::from_iter([10]), false, |left, right| left + right)
3796                .run_collect()
3797                .unwrap(),
3798            vec![11, 12, 13]
3799        );
3800
3801        assert_eq!(
3802            Source::from_iter([1, 2])
3803                .zip_all(Source::from_iter([10, 11, 12]), -1, -2)
3804                .run_collect()
3805                .unwrap(),
3806            vec![(1, 10), (2, 11), (-1, 12)]
3807        );
3808
3809        assert_eq!(
3810            Source::from_iter([5, 6, 7])
3811                .zip_with_index()
3812                .run_collect()
3813                .unwrap(),
3814            vec![(5, 0), (6, 1), (7, 2)]
3815        );
3816
3817        assert_eq!(
3818            Source::zip_n([Source::from_iter([1, 2]), Source::from_iter([10, 20])])
3819                .run_collect()
3820                .unwrap(),
3821            vec![vec![1, 10], vec![2, 20]]
3822        );
3823
3824        assert_eq!(
3825            Source::zip_with_n(
3826                [
3827                    Source::from_iter([1, 2]),
3828                    Source::from_iter([10, 20]),
3829                    Source::from_iter([100, 200]),
3830                ],
3831                |values| values.into_iter().sum::<i32>(),
3832            )
3833            .run_collect()
3834            .unwrap(),
3835            vec![111, 222]
3836        );
3837
3838        assert_eq!(
3839            Source::merge_prioritized_n(
3840                [
3841                    (Source::from_iter([1, 2, 3, 4]), 2),
3842                    (Source::from_iter([10, 11]), 1),
3843                ],
3844                false,
3845            )
3846            .run_collect()
3847            .unwrap(),
3848            vec![1, 2, 10, 3, 4, 11]
3849        );
3850
3851        assert_eq!(
3852            Source::combine(
3853                Source::from_iter([1, 2, 3]),
3854                Source::from_iter([10, 11]),
3855                std::iter::empty::<Source<i32, NotUsed>>(),
3856                SourceCombineStrategy::Merge {
3857                    eager_complete: false,
3858                },
3859            )
3860            .run_collect()
3861            .unwrap(),
3862            vec![1, 10, 2, 11, 3]
3863        );
3864
3865        let combined_sink = Sink::combine(
3866            Sink::ignore(),
3867            Sink::ignore(),
3868            std::iter::empty::<Sink<i32, NotUsed>>(),
3869            SinkCombineStrategy::Broadcast,
3870        );
3871        assert_eq!(
3872            Source::from_iter([1, 2, 3])
3873                .run_with(combined_sink)
3874                .unwrap(),
3875            NotUsed
3876        );
3877    }
3878
3879    #[test]
3880    fn sink_combine_broadcast_delivers_every_element_to_every_child() {
3881        // Regression: the combined children's materialized values were
3882        // dropped at materialization, tripping cancel-on-drop and silently
3883        // cancelling every child before it consumed anything (0 deliveries).
3884        let first_count = StdArc::new(StdAtomicUsize::new(0));
3885        let second_count = StdArc::new(StdAtomicUsize::new(0));
3886        let first_counter = StdArc::clone(&first_count);
3887        let second_counter = StdArc::clone(&second_count);
3888        let combined = Sink::combine(
3889            Sink::foreach(move |_: i32| {
3890                first_counter.fetch_add(1, StdOrdering::SeqCst);
3891            }),
3892            Sink::foreach(move |_: i32| {
3893                second_counter.fetch_add(1, StdOrdering::SeqCst);
3894            }),
3895            std::iter::empty::<Sink<i32, NotUsed>>(),
3896            SinkCombineStrategy::Broadcast,
3897        );
3898        assert_eq!(
3899            Source::from_iter(0..100).run_with(combined).unwrap(),
3900            NotUsed
3901        );
3902        // The rendezvous channels guarantee each child has received every
3903        // element before the parent send returns. The child callback may still
3904        // be one scheduler tick behind the parent, so assert with a bounded
3905        // condition wait instead of an immediate load.
3906        assert!(wait_until(StdDuration::from_secs(1), || {
3907            first_count.load(StdOrdering::SeqCst) == 100
3908                && second_count.load(StdOrdering::SeqCst) == 100
3909        }));
3910    }
3911
3912    #[test]
3913    fn zip_latest_completes_when_one_side_finishes_without_emitting() {
3914        // Regression: with eager_complete = false, an empty side left
3915        // `latest = None` forever while the loop drained the other side —
3916        // an infinite busy-loop when that side is unbounded.
3917        assert_eq!(
3918            Source::from_iter(std::iter::empty::<i32>())
3919                .zip_latest_with(Source::repeat(10), false, |left, right| left + right)
3920                .run_collect()
3921                .unwrap(),
3922            Vec::<i32>::new()
3923        );
3924        assert_eq!(
3925            Source::repeat(10)
3926                .zip_latest_with(
3927                    Source::from_iter(std::iter::empty::<i32>()),
3928                    false,
3929                    |left, right| left + right,
3930                )
3931                .run_collect()
3932                .unwrap(),
3933            Vec::<i32>::new()
3934        );
3935    }
3936
3937    #[test]
3938    fn zip_family_completion_boundaries_match_expected_results() {
3939        assert_eq!(
3940            Source::from_iter([1, 2, 3])
3941                .zip_with(Source::from_iter([10]), |left, right| left + right)
3942                .run_collect()
3943                .unwrap(),
3944            vec![11]
3945        );
3946
3947        assert_eq!(
3948            Source::from_iter([1, 2, 3])
3949                .zip_latest_with(Source::from_iter([10]), true, |left, right| left + right)
3950                .run_collect()
3951                .unwrap(),
3952            vec![11, 12]
3953        );
3954
3955        assert_eq!(
3956            Source::zip_n([
3957                Source::from_iter([1, 2, 3]),
3958                Source::from_iter([10]),
3959                Source::from_iter([100, 200, 300]),
3960            ])
3961            .run_collect()
3962            .unwrap(),
3963            vec![vec![1, 10, 100]]
3964        );
3965    }
3966
3967    #[test]
3968    fn combine_strategies_follow_merge_concat_and_priority_rules() {
3969        assert_eq!(
3970            Source::combine(
3971                Source::from_iter([1, 2]),
3972                Source::from_iter([10, 11]),
3973                [Source::from_iter([100])],
3974                SourceCombineStrategy::Concat,
3975            )
3976            .run_collect()
3977            .unwrap(),
3978            vec![1, 2, 10, 11, 100]
3979        );
3980
3981        assert_eq!(
3982            Source::combine(
3983                Source::from_iter([1, 2, 3, 4]),
3984                Source::from_iter([10, 11]),
3985                std::iter::empty::<Source<i32, NotUsed>>(),
3986                SourceCombineStrategy::Prioritized {
3987                    priorities: vec![2, 1],
3988                    eager_complete: false,
3989                },
3990            )
3991            .run_collect()
3992            .unwrap(),
3993            vec![1, 2, 10, 3, 4, 11]
3994        );
3995    }
3996
3997    #[test]
3998    fn concat_lazy_defers_follow_on_source_until_needed() {
3999        let source_counter = StdArc::new(StdAtomicUsize::new(0));
4000        let source_counter_clone = StdArc::clone(&source_counter);
4001        let lazy_source = Source::from_materialized_factory(move |_| {
4002            source_counter_clone.fetch_add(1, StdOrdering::SeqCst);
4003            Ok((Box::new(std::iter::once(Ok(99))), NotUsed))
4004        });
4005        let source_head = Source::single(1)
4006            .concat_lazy(lazy_source)
4007            .run_with(Sink::head());
4008        assert_eq!(wait(source_head.unwrap()), 1);
4009        assert_eq!(source_counter.load(StdOrdering::SeqCst), 0);
4010
4011        let flow_counter = StdArc::new(StdAtomicUsize::new(0));
4012        let flow_counter_clone = StdArc::clone(&flow_counter);
4013        let lazy_flow_source = Source::from_materialized_factory(move |_| {
4014            flow_counter_clone.fetch_add(1, StdOrdering::SeqCst);
4015            Ok((Box::new(std::iter::once(Ok(99))), NotUsed))
4016        });
4017        let flow_head = Source::single(1)
4018            .via(Flow::identity().concat_lazy(lazy_flow_source))
4019            .run_with(Sink::head());
4020        assert_eq!(wait(flow_head.unwrap()), 1);
4021        assert_eq!(flow_counter.load(StdOrdering::SeqCst), 0);
4022    }
4023
4024    #[test]
4025    fn also_to_completes_when_side_sink_cancels() {
4026        assert_eq!(
4027            Source::from_iter([1, 2, 3])
4028                .also_to(Sink::cancelled())
4029                .run_collect()
4030                .unwrap(),
4031            Vec::<i32>::new()
4032        );
4033        assert_eq!(
4034            Source::from_iter([1, 2, 3])
4035                .also_to_all([Sink::cancelled(), Sink::cancelled()])
4036                .run_collect()
4037                .unwrap(),
4038            Vec::<i32>::new()
4039        );
4040    }
4041
4042    #[test]
4043    fn also_to_completes_gracefully_when_side_sink_disconnects() {
4044        let result = Source::from_iter(0..100)
4045            .also_to(Sink::head())
4046            .run_collect()
4047            .unwrap();
4048        assert!(!result.is_empty(), "main should emit at least one element");
4049        assert!(
4050            result.len() < 100,
4051            "main should complete early when side disconnects"
4052        );
4053    }
4054
4055    #[test]
4056    fn also_to_propagates_original_error_when_side_is_disconnected() {
4057        let err = StreamError::Failed("distinctive-boom".into());
4058        assert!(matches!(
4059            Source::<i32>::failed(err.clone())
4060                .also_to(Sink::cancelled())
4061                .run_collect(),
4062            Err(StreamError::Failed(msg)) if msg == "distinctive-boom"
4063        ));
4064        assert!(matches!(
4065            Source::<i32>::failed(err.clone())
4066                .also_to_all([Sink::cancelled()])
4067                .run_collect(),
4068            Err(StreamError::Failed(msg)) if msg == "distinctive-boom"
4069        ));
4070        assert!(matches!(
4071            Source::<i32>::failed(err)
4072                .divert_to(Sink::cancelled(), |_: &i32| true)
4073                .run_collect(),
4074            Err(StreamError::Failed(msg)) if msg == "distinctive-boom"
4075        ));
4076    }
4077
4078    #[test]
4079    fn divert_to_routes_matching_elements_to_side_sink() {
4080        let diverted = Source::from_iter([1, 2, 3, 4])
4081            .divert_to(Sink::ignore(), |item| item % 2 == 0)
4082            .run_collect()
4083            .unwrap();
4084        assert_eq!(diverted, vec![1, 3]);
4085    }
4086
4087    #[test]
4088    fn wire_tap_drops_when_side_sink_backpressures() {
4089        let tapped = Source::from_iter([1, 2, 3])
4090            .wire_tap(Sink::head())
4091            .run_collect()
4092            .unwrap();
4093        assert_eq!(tapped, vec![1, 2, 3]);
4094
4095        let tapped_via_flow = Source::from_iter([1, 2, 3])
4096            .via(Flow::identity().wire_tap(Sink::head()))
4097            .run_collect()
4098            .unwrap();
4099        assert_eq!(tapped_via_flow, vec![1, 2, 3]);
4100    }
4101
4102    #[test]
4103    fn async_mapping_variants_complete() {
4104        let ordered = Source::from_iter(0..4)
4105            .map_async(2, |item| async move { Ok(item * 2) })
4106            .run_collect()
4107            .unwrap();
4108        assert_eq!(ordered, vec![0, 2, 4, 6]);
4109
4110        let unordered = Source::from_iter(0..4)
4111            .map_async_unordered(2, |item| async move { Ok(item * 2) })
4112            .run_collect()
4113            .unwrap();
4114        assert_eq!(unordered, vec![0, 2, 4, 6]);
4115
4116        let partitioned = Source::from_iter(0..4)
4117            .map_async_partitioned(4, 1, |item| item % 2, |item| async move { Ok(item + 1) })
4118            .run_collect()
4119            .unwrap();
4120        assert_eq!(partitioned, vec![1, 2, 3, 4]);
4121    }
4122
4123    #[test]
4124    fn map_async_ordered_bounds_pulls_behind_stuck_head() {
4125        let pulls = StdArc::new(StdAtomicUsize::new(0));
4126        let pulls_for_source = StdArc::clone(&pulls);
4127        let probe = Source::from_fn_iter(move || {
4128            let pulls = StdArc::clone(&pulls_for_source);
4129            std::iter::from_fn(move || {
4130                let next = pulls.fetch_add(1, StdOrdering::SeqCst);
4131                Some(next)
4132            })
4133        })
4134        .map_async(2, |item| async move {
4135            if item == 0 {
4136                tokio::time::sleep(StdDuration::from_millis(300)).await;
4137            }
4138            Ok(item)
4139        })
4140        .run_with(TestSink::probe())
4141        .unwrap();
4142
4143        probe.request(16);
4144        thread::sleep(StdDuration::from_millis(100));
4145        assert!(
4146            pulls.load(StdOrdering::SeqCst) <= 3,
4147            "pulled {} elements with parallelism=2 behind a stuck ordered head",
4148            pulls.load(StdOrdering::SeqCst)
4149        );
4150    }
4151
4152    #[test]
4153    fn async_mapping_parks_until_woken_future_completes() {
4154        struct WakeOnceFuture {
4155            value: Option<u64>,
4156            ready: StdArc<StdAtomicBool>,
4157            started: bool,
4158            polls: StdArc<StdAtomicUsize>,
4159            latest_waker: StdArc<Mutex<Option<std::task::Waker>>>,
4160        }
4161
4162        impl std::future::Future for WakeOnceFuture {
4163            type Output = StreamResult<u64>;
4164
4165            fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
4166                let this = self.as_mut().get_mut();
4167                this.polls.fetch_add(1, StdOrdering::SeqCst);
4168                *this.latest_waker.lock().expect("latest wake slot mutex") =
4169                    Some(cx.waker().clone());
4170                if this.ready.load(StdOrdering::SeqCst) {
4171                    return Poll::Ready(Ok(this.value.take().unwrap()));
4172                }
4173
4174                if !this.started {
4175                    this.started = true;
4176                    let ready = StdArc::clone(&this.ready);
4177                    let latest_waker = StdArc::clone(&this.latest_waker);
4178                    thread::spawn(move || {
4179                        thread::sleep(Duration::from_millis(20));
4180                        ready.store(true, StdOrdering::SeqCst);
4181                        if let Some(waker) =
4182                            latest_waker.lock().expect("latest wake slot mutex").take()
4183                        {
4184                            waker.wake();
4185                        }
4186                    });
4187                }
4188                Poll::Pending
4189            }
4190        }
4191
4192        let polls = StdArc::new(StdAtomicUsize::new(0));
4193        let polls_for_stage = StdArc::clone(&polls);
4194        let start = Instant::now();
4195
4196        let values = Source::single(41)
4197            .map_async(1, move |item| WakeOnceFuture {
4198                value: Some(item + 1),
4199                ready: StdArc::new(StdAtomicBool::new(false)),
4200                started: false,
4201                polls: StdArc::clone(&polls_for_stage),
4202                latest_waker: StdArc::new(Mutex::new(None)),
4203            })
4204            .run_collect()
4205            .unwrap();
4206
4207        assert_eq!(values, vec![42]);
4208        let elapsed = start.elapsed();
4209        assert!(
4210            elapsed >= StdDuration::from_millis(15) && elapsed < StdDuration::from_millis(250),
4211            "pending future should park until woken once, elapsed={elapsed:?}"
4212        );
4213        assert!(
4214            polls.load(StdOrdering::SeqCst) < 4096,
4215            "pending future was repolled too aggressively"
4216        );
4217    }
4218
4219    #[test]
4220    fn async_mapping_emits_before_unbounded_upstream_finishes() {
4221        let ordered = Source::repeat(1)
4222            .map_async(2, |item| async move { Ok(item + 1) })
4223            .take(1)
4224            .run_collect()
4225            .unwrap();
4226        assert_eq!(ordered, vec![2]);
4227
4228        let unordered = Source::repeat(1)
4229            .map_async_unordered(2, |item| async move { Ok(item + 1) })
4230            .take(1)
4231            .run_collect()
4232            .unwrap();
4233        assert_eq!(unordered, vec![2]);
4234
4235        let partitioned = Source::repeat(1)
4236            .map_async_partitioned(2, 1, |_| 0_u8, |item| async move { Ok(item + 1) })
4237            .take(1)
4238            .run_collect()
4239            .unwrap();
4240        assert_eq!(partitioned, vec![2]);
4241    }
4242
4243    #[test]
4244    fn partitioned_async_mapping_limits_same_key_concurrency() {
4245        let active = StdArc::new(StdAtomicUsize::new(0));
4246        let max_active = StdArc::new(StdAtomicUsize::new(0));
4247        let active_for_stage = StdArc::clone(&active);
4248        let max_for_stage = StdArc::clone(&max_active);
4249
4250        let values = Source::from_iter(0..6)
4251            .map_async_partitioned(
4252                4,
4253                1,
4254                |_| 0_u8,
4255                move |item| {
4256                    let active = StdArc::clone(&active_for_stage);
4257                    let max_active = StdArc::clone(&max_for_stage);
4258                    let current = active.fetch_add(1, StdOrdering::SeqCst) + 1;
4259                    max_active.fetch_max(current, StdOrdering::SeqCst);
4260                    async move {
4261                        thread::sleep(Duration::from_millis(1));
4262                        active.fetch_sub(1, StdOrdering::SeqCst);
4263                        Ok(item)
4264                    }
4265                },
4266            )
4267            .run_collect()
4268            .unwrap();
4269
4270        assert_eq!(values, vec![0, 1, 2, 3, 4, 5]);
4271        assert_eq!(max_active.load(StdOrdering::SeqCst), 1);
4272    }
4273
4274    #[test]
4275    fn partitioned_async_mapping_scans_past_blocked_pending_key() {
4276        let active = StdArc::new(StdAtomicUsize::new(0));
4277        let max_active = StdArc::new(StdAtomicUsize::new(0));
4278        let active_for_stage = StdArc::clone(&active);
4279        let max_for_stage = StdArc::clone(&max_active);
4280        let (release_tx, release_rx) = oneshot::channel::<()>();
4281        let release_rx = StdArc::new(std::sync::Mutex::new(Some(release_rx)));
4282        let release_rx_for_stage = StdArc::clone(&release_rx);
4283        let max_for_release = StdArc::clone(&max_active);
4284
4285        let releaser = thread::spawn(move || {
4286            let deadline = Instant::now() + StdDuration::from_secs(1);
4287            while max_for_release.load(StdOrdering::SeqCst) < 2 && Instant::now() < deadline {
4288                thread::yield_now();
4289            }
4290            let _ = release_tx.send(());
4291        });
4292
4293        let values = Source::from_iter([0, 2, 1])
4294            .map_async_partitioned(
4295                2,
4296                1,
4297                |item| item % 2,
4298                move |item| {
4299                    let active = StdArc::clone(&active_for_stage);
4300                    let max_active = StdArc::clone(&max_for_stage);
4301                    let release_rx = StdArc::clone(&release_rx_for_stage);
4302                    let current = active.fetch_add(1, StdOrdering::SeqCst) + 1;
4303                    max_active.fetch_max(current, StdOrdering::SeqCst);
4304                    async move {
4305                        if item == 0 {
4306                            let receiver = release_rx
4307                                .lock()
4308                                .expect("release receiver mutex")
4309                                .take()
4310                                .expect("release receiver present");
4311                            let _ = receiver.await;
4312                        }
4313                        active.fetch_sub(1, StdOrdering::SeqCst);
4314                        Ok(item)
4315                    }
4316                },
4317            )
4318            .run_collect()
4319            .unwrap();
4320        releaser.join().unwrap();
4321
4322        assert_eq!(values, vec![0, 2, 1]);
4323        assert_eq!(max_active.load(StdOrdering::SeqCst), 2);
4324    }
4325
4326    #[test]
4327    fn partitioned_async_mapping_p1_still_evaluates_partition() {
4328        let partitions = StdArc::new(StdAtomicUsize::new(0));
4329        let partitions_for_stage = StdArc::clone(&partitions);
4330
4331        let values = Source::from_iter(0..8)
4332            .map_async_partitioned(
4333                1,
4334                1,
4335                move |item| {
4336                    partitions_for_stage.fetch_add(1, StdOrdering::SeqCst);
4337                    item % 2
4338                },
4339                |item| async move { Ok(item + 1) },
4340            )
4341            .run_collect()
4342            .unwrap();
4343
4344        assert_eq!(values, (1..9).collect::<Vec<_>>());
4345        assert_eq!(partitions.load(StdOrdering::SeqCst), 8);
4346    }
4347
4348    #[test]
4349    fn partitioned_async_mapping_handles_many_keys_high_parallelism() {
4350        let active_by_key =
4351            StdArc::new((0..16).map(|_| StdAtomicUsize::new(0)).collect::<Vec<_>>());
4352        let max_by_key = StdArc::new((0..16).map(|_| StdAtomicUsize::new(0)).collect::<Vec<_>>());
4353        let active_for_stage = StdArc::clone(&active_by_key);
4354        let max_for_stage = StdArc::clone(&max_by_key);
4355
4356        let values = Source::from_iter(0..512_usize)
4357            .map_async_partitioned(
4358                32,
4359                1,
4360                |item| item % 16,
4361                move |item| {
4362                    let active = StdArc::clone(&active_for_stage);
4363                    let max_active = StdArc::clone(&max_for_stage);
4364                    let key = item % 16;
4365                    let current = active[key].fetch_add(1, StdOrdering::SeqCst) + 1;
4366                    max_active[key].fetch_max(current, StdOrdering::SeqCst);
4367                    async move {
4368                        active[key].fetch_sub(1, StdOrdering::SeqCst);
4369                        Ok(item)
4370                    }
4371                },
4372            )
4373            .run_collect()
4374            .unwrap();
4375
4376        assert_eq!(values, (0..512).collect::<Vec<_>>());
4377        for max_active in max_by_key.iter() {
4378            assert_eq!(max_active.load(StdOrdering::SeqCst), 1);
4379        }
4380    }
4381
4382    #[test]
4383    fn error_operators_map_recover_and_complete() {
4384        let mapped = Source::<i32>::failed(StreamError::Failed("boom".into()))
4385            .map_error(|_| StreamError::Failed("mapped".into()))
4386            .run_collect();
4387        assert_eq!(mapped, Err(StreamError::Failed("mapped".into())));
4388
4389        let recovered = Source::<i32>::failed(StreamError::Failed("boom".into()))
4390            .recover(|error| match error {
4391                StreamError::Failed(_) => Some(42),
4392                _ => None,
4393            })
4394            .run_collect()
4395            .unwrap();
4396        assert_eq!(recovered, vec![42]);
4397
4398        let unrecovered = Source::<i32>::failed(StreamError::Failed("original".into()))
4399            .recover(|_| None)
4400            .run_collect();
4401        assert_eq!(unrecovered, Err(StreamError::Failed("original".into())));
4402
4403        let recovered_with = Source::<i32>::failed(StreamError::Failed("boom".into()))
4404            .recover_with_retries(1, |_| Some(Source::from_iter([1, 2])))
4405            .run_collect()
4406            .unwrap();
4407        assert_eq!(recovered_with, vec![1, 2]);
4408
4409        let declined_recover_with = Source::<i32>::failed(StreamError::Failed("declined".into()))
4410            .recover_with_retries(1, |_| None)
4411            .run_collect();
4412        assert_eq!(
4413            declined_recover_with,
4414            Err(StreamError::Failed("declined".into()))
4415        );
4416
4417        let completed = Source::from_factory(|| {
4418            Box::new(vec![Ok(1), Err(StreamError::Failed("ignored".into())), Ok(2)].into_iter())
4419        })
4420        .on_error_complete()
4421        .run_collect()
4422        .unwrap();
4423        assert_eq!(completed, vec![1]);
4424    }
4425
4426    #[test]
4427    fn sliding_matches_akka_window_semantics() {
4428        // Full windows then stop; no spurious trailing partial windows.
4429        assert_eq!(
4430            Source::from_iter(1..=4)
4431                .sliding(3, 1)
4432                .run_collect()
4433                .unwrap(),
4434            vec![vec![1, 2, 3], vec![2, 3, 4]]
4435        );
4436        assert_eq!(
4437            Source::from_iter(1..=4)
4438                .sliding(2, 1)
4439                .run_collect()
4440                .unwrap(),
4441            vec![vec![1, 2], vec![2, 3], vec![3, 4]]
4442        );
4443        // Exact multiple of size leaves nothing to emit at finish.
4444        assert_eq!(
4445            Source::from_iter(1..=3)
4446                .sliding(3, 1)
4447                .run_collect()
4448                .unwrap(),
4449            vec![vec![1, 2, 3]]
4450        );
4451        // Short streams emit a single partial window.
4452        assert_eq!(
4453            Source::from_iter(1..=2)
4454                .sliding(3, 1)
4455                .run_collect()
4456                .unwrap(),
4457            vec![vec![1, 2]]
4458        );
4459        // Every element in its own window.
4460        assert_eq!(
4461            Source::from_iter(1..=3)
4462                .sliding(1, 1)
4463                .run_collect()
4464                .unwrap(),
4465            vec![vec![1], vec![2], vec![3]]
4466        );
4467        // step > size skips the gap between windows.
4468        assert_eq!(
4469            Source::from_iter(1..=6)
4470                .sliding(2, 3)
4471                .run_collect()
4472                .unwrap(),
4473            vec![vec![1, 2], vec![4, 5]]
4474        );
4475        // step > size with an in-progress trailing window is dropped (Akka).
4476        assert_eq!(
4477            Source::from_iter(1..=3)
4478                .sliding(2, 4)
4479                .run_collect()
4480                .unwrap(),
4481            vec![vec![1, 2]]
4482        );
4483    }
4484
4485    #[test]
4486    fn recover_with_retries_indefinitely_like_akka() {
4487        let attempts = StdArc::new(StdAtomicUsize::new(0));
4488        let attempts_in_stage = StdArc::clone(&attempts);
4489        // The first several recovery sources keep failing; `recover_with` must
4490        // retry past Datum's old single-retry limit to reach the good source.
4491        let recovered = Source::<i32>::failed(StreamError::Failed("boom".into()))
4492            .recover_with(move |_error| {
4493                if attempts_in_stage.fetch_add(1, StdOrdering::SeqCst) < 5 {
4494                    Some(Source::<i32>::failed(StreamError::Failed("again".into())))
4495                } else {
4496                    Some(Source::from_iter([42]))
4497                }
4498            })
4499            .run_collect()
4500            .unwrap();
4501        assert_eq!(recovered, vec![42]);
4502        assert_eq!(attempts.load(StdOrdering::SeqCst), 6);
4503    }
4504
4505    #[test]
4506    fn many_concurrent_streams_do_not_starve_the_pool() {
4507        // Regression: a fixed-size pool deadlocks once `num_cpus` streams each
4508        // monopolize a worker. The growing pool must keep serving new streams.
4509        //
4510        // Use a small fixed count rather than `available_parallelism() + 2`: the
4511        // point is that several concurrent never-completing streams cannot starve
4512        // a fresh one, and a fixed count keeps this from spawning a large number
4513        // of permanent threads (which, when the whole suite runs under load, can
4514        // exhaust the thread limit and wedge the shared executor).
4515        let materializer = Materializer::new();
4516        let busy = 6_usize;
4517
4518        let mut held = Vec::with_capacity(busy);
4519        for _ in 0..busy {
4520            held.push(
4521                Source::single(1_u64)
4522                    .run_with_materializer(Sink::never(), &materializer)
4523                    .unwrap(),
4524            );
4525        }
4526
4527        for _ in 0..400 {
4528            if materializer.active_streams() >= busy {
4529                break;
4530            }
4531            thread::sleep(Duration::from_millis(5));
4532        }
4533        assert_eq!(materializer.active_streams(), busy);
4534
4535        // A fresh finite stream must still complete despite every prior worker
4536        // being occupied by a never-completing sink.
4537        let sum = Source::from_iter(0_u64..5)
4538            .run_with_materializer(Sink::fold(0_u64, |acc, item| acc + item), &materializer)
4539            .unwrap();
4540        assert_eq!(sum.wait().unwrap(), 10);
4541
4542        materializer.shutdown();
4543        for completion in held {
4544            assert_eq!(completion.wait(), Err(StreamError::AbruptTermination));
4545        }
4546    }
4547}