Skip to main content

datum/stream/
mod.rs

1//! Core push-stream vocabulary and the first reusable Source/Flow API slice.
2
3use std::{
4    collections::{BTreeMap, HashMap, VecDeque},
5    fmt,
6    future::Future,
7    hash::Hash,
8    marker::PhantomData,
9    panic::{AssertUnwindSafe, catch_unwind},
10    pin::Pin,
11    sync::{
12        Arc, Condvar, Mutex, OnceLock,
13        atomic::{AtomicBool, AtomicUsize, Ordering},
14    },
15    task::{Context, Poll},
16    thread,
17    time::Duration,
18};
19
20use futures::{channel::oneshot, executor::block_on};
21use thiserror::Error;
22use tokio::{
23    runtime::{Builder as TokioRuntimeBuilder, Runtime as TokioRuntime},
24    task::{JoinError, JoinHandle},
25};
26
/// Boxed, `Send` iterator of fallible items: the pull-side shape every stage in this file consumes and returns.
pub(crate) type BoxStream<T> = Box<dyn Iterator<Item = StreamResult<T>> + Send>;
/// Shared, thread-safe closure that rewraps an input stream into an output stream without a materializer.
pub(crate) type PureTransform<In, Out> = Arc<dyn Fn(BoxStream<In>) -> BoxStream<Out> + Send + Sync>;
/// Shared, thread-safe closure that rewraps an input stream with access to the
/// `Materializer`; building the output stream may itself fail.
pub(crate) type RuntimeTransform<In, Out> =
    Arc<dyn Fn(BoxStream<In>, &Materializer) -> StreamResult<BoxStream<Out>> + Send + Sync>;
/// Unsized callback shape that consumes a whole stream and produces a materialized value or an error.
type SinkRunner<In, Mat> = dyn Fn(BoxStream<In>, &Materializer) -> StreamResult<Mat> + Send + Sync;
/// Unsized callback shape that runs a closed graph against a `Materializer`.
type RunnableGraphRunner<Mat> = dyn Fn(&Materializer) -> StreamResult<Mat> + Send + Sync;
// NOTE(review): not referenced in this part of the file; presumably the number of
// readiness polls attempted before the waiter parks — confirm against the user.
const STREAM_READY_SPINS: usize = 256;
/// Spin-loop hints between consecutive drain-thread readiness checks. Back off
/// a small block of hints between polls so the spin window still covers the
/// common fast-completion case without burning a full busy loop before
/// parking.
const STREAM_SPIN_BACKOFF: usize = 8;
// NOTE(review): not referenced in this part of the file; presumably the longest
// single park interval once spinning gives up — confirm against the user.
const STREAM_MAX_PARK: Duration = Duration::from_millis(1);
40
/// Private metadata for known finite, synchronous micro-sources.
/// Set only on Datum-owned constructors whose `next()` is guaranteed
/// memory-backed and whose total success item count is known at blueprint time.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct InlineMicroSourceHint {
    /// Largest number of successful items the source can emit.
    // NOTE(review): the code that consumes this value is outside this view —
    // confirm whether it is treated as an exact count or only as a ceiling.
    max_success_items: usize,
}
48
49#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
50struct SourceHints {
51    inline_head_terminal: bool,
52    /// Set only for constructors where Datum owns the full iterator and can
53    /// prove it is finite, synchronous, and memory-backed.
54    inline_micro: Option<InlineMicroSourceHint>,
55}
56
57impl SourceHints {
58    const fn with_inline_micro(max_success_items: usize) -> Self {
59        Self {
60            inline_head_terminal: true,
61            inline_micro: Some(InlineMicroSourceHint { max_success_items }),
62        }
63    }
64
65    fn after_flow(self, flow: FlowHints) -> Self {
66        if flow.preserves_inline_head_terminal {
67            // inline_micro is intentionally not propagated through any flow:
68            // even a trivial map wraps the iterator, making eligibility uncertain.
69            Self {
70                inline_head_terminal: true,
71                inline_micro: None,
72            }
73        } else {
74            Self::default()
75        }
76    }
77}
78
/// Optimisation hints attached to a flow blueprint.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
struct FlowHints {
    /// Whether appending this flow keeps a source's inline head-terminal hint intact.
    preserves_inline_head_terminal: bool,
}

impl FlowHints {
    /// Hints for a flow that keeps the inline head-terminal hint intact.
    const PRESERVES_INLINE_HEAD_TERMINAL: Self = Self {
        preserves_inline_head_terminal: true,
    };

    /// Hints for `self` followed by `next`: the composition preserves the
    /// hint only when both stages do.
    fn then(self, next: Self) -> Self {
        let both_preserve =
            self.preserves_inline_head_terminal && next.preserves_inline_head_terminal;
        Self {
            preserves_inline_head_terminal: both_preserve,
        }
    }
}
96
/// Reusable per-key bookkeeping record used by the partition helpers in this file.
struct PartitionSlot<Key, Out> {
    /// Key currently bound to the slot; `None` once the slot has been retired.
    key: Option<Key>,
    /// Number of items handed out from this slot; bumped each time an item is popped.
    active: usize,
    /// Items waiting in this slot, each stored with a `usize` index.
    queued: VecDeque<(usize, Out)>,
    /// Whether the slot's index is currently sitting in the ready queue.
    in_ready_queue: bool,
}
103
104struct AbortOnDropHandle<T> {
105    handle: JoinHandle<T>,
106}
107
108impl<T> AbortOnDropHandle<T> {
109    fn new(handle: JoinHandle<T>) -> Self {
110        Self { handle }
111    }
112}
113
114impl<T> Drop for AbortOnDropHandle<T> {
115    fn drop(&mut self) {
116        self.handle.abort();
117    }
118}
119
120impl<T> Future for AbortOnDropHandle<T> {
121    type Output = Result<T, JoinError>;
122
123    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
124        Pin::new(&mut self.handle).poll(cx)
125    }
126}
127
128impl<T> Unpin for AbortOnDropHandle<T> {}
129
130pub(crate) fn stream_tokio_runtime() -> &'static TokioRuntime {
131    static RUNTIME: OnceLock<TokioRuntime> = OnceLock::new();
132    RUNTIME.get_or_init(|| {
133        TokioRuntimeBuilder::new_multi_thread()
134            .enable_all()
135            .thread_name("datum-stream-tokio")
136            .build()
137            .expect("stream tokio runtime")
138    })
139}
140
141fn spawn_tokio_task<Fut, T>(future: Fut) -> AbortOnDropHandle<T>
142where
143    Fut: Future<Output = T> + Send + 'static,
144    T: Send + 'static,
145{
146    AbortOnDropHandle::new(stream_tokio_runtime().spawn(future))
147}
148
/// Crate-visible forwarder to `runtime::current_stream_cancelled`.
///
/// Returns whatever the runtime module reports: an optional shared flag.
// NOTE(review): presumably the flag of the stream running on the calling
// thread, `None` when there is none — confirm in the `runtime` module.
pub(crate) fn current_stream_cancelled() -> Option<Arc<AtomicBool>> {
    runtime::current_stream_cancelled()
}
152
153pub(super) fn catch_unwind_failed<T, F>(context: &'static str, f: F) -> StreamResult<T>
154where
155    F: FnOnce() -> T,
156{
157    catch_unwind(AssertUnwindSafe(f))
158        .map_err(|_| StreamError::Failed(format!("{context} panicked")))
159}
160
161impl<Key, Out> PartitionSlot<Key, Out> {
162    fn new(key: Key) -> Self {
163        Self {
164            key: Some(key),
165            active: 0,
166            queued: VecDeque::new(),
167            in_ready_queue: false,
168        }
169    }
170}
171
172#[inline(always)]
173fn partition_slot_for<Key, Out>(
174    key: Key,
175    slots_by_key: &mut HashMap<Key, usize>,
176    slots: &mut Vec<PartitionSlot<Key, Out>>,
177    free_slots: &mut Vec<usize>,
178) -> usize
179where
180    Key: Clone + Eq + Hash,
181{
182    if let Some(slot) = slots_by_key.get(&key) {
183        return *slot;
184    }
185
186    let slot = if let Some(slot) = free_slots.pop() {
187        let state = &mut slots[slot];
188        state.key = Some(key.clone());
189        state.active = 0;
190        state.queued.clear();
191        state.in_ready_queue = false;
192        slot
193    } else {
194        slots.push(PartitionSlot::new(key.clone()));
195        slots.len() - 1
196    };
197    slots_by_key.insert(key, slot);
198    slot
199}
200
201#[inline(always)]
202fn retire_partition_slot<Key, Out>(
203    slot: usize,
204    slots_by_key: &mut HashMap<Key, usize>,
205    slots: &mut [PartitionSlot<Key, Out>],
206    free_slots: &mut Vec<usize>,
207) where
208    Key: Eq + Hash,
209{
210    let state = &mut slots[slot];
211    if let Some(key) = state.key.take() {
212        slots_by_key.remove(&key);
213    }
214    state.active = 0;
215    state.queued.clear();
216    state.in_ready_queue = false;
217    free_slots.push(slot);
218}
219
220#[inline(always)]
221fn ready_partition_slot<Key, Out>(
222    slots: &mut [PartitionSlot<Key, Out>],
223    ready_slots: &mut VecDeque<usize>,
224    slot: usize,
225    per_partition: usize,
226) {
227    if let Some(state) = slots.get_mut(slot)
228        && state.key.is_some()
229        && !state.in_ready_queue
230        && state.active < per_partition
231        && !state.queued.is_empty()
232    {
233        state.in_ready_queue = true;
234        ready_slots.push_back(slot);
235    }
236}
237
238#[inline(always)]
239fn pop_ready_partition_slot<Key, Out>(
240    slots: &mut [PartitionSlot<Key, Out>],
241    ready_slots: &mut VecDeque<usize>,
242    per_partition: usize,
243) -> Option<(usize, usize, Out)> {
244    while let Some(slot) = ready_slots.pop_front() {
245        let mut requeue = false;
246        let item = if let Some(state) = slots.get_mut(slot) {
247            state.in_ready_queue = false;
248            if state.key.is_some() && state.active < per_partition {
249                let item = state.queued.pop_front().map(|(index, item)| {
250                    state.active += 1;
251                    (index, slot, item)
252                });
253                if !state.queued.is_empty() && state.active < per_partition {
254                    state.in_ready_queue = true;
255                    requeue = true;
256                }
257                item
258            } else {
259                None
260            }
261        } else {
262            None
263        };
264
265        if requeue {
266            ready_slots.push_back(slot);
267        }
268        if item.is_some() {
269            return item;
270        }
271    }
272    None
273}
274
/// Shareable recipe that produces a fresh stream each time it is asked.
///
/// Implementors are held behind `Arc` and must be `Send + Sync`.
pub(crate) trait SourceFactory<Out, Mat>: Send + Sync {
    /// Builds one stream together with its materialized value.
    ///
    /// Takes `Arc<Self>` by value so the produced stream may keep the factory
    /// alive (as `MapSourceFactory` does in this file). Returns an error when
    /// the stream cannot be built.
    fn create(self: Arc<Self>, materializer: &Materializer) -> StreamResult<(BoxStream<Out>, Mat)>;
}
278
279struct FnSourceFactory<F>(F);
280
281impl<Out, Mat, F> SourceFactory<Out, Mat> for FnSourceFactory<F>
282where
283    F: Fn(&Materializer) -> StreamResult<(BoxStream<Out>, Mat)> + Send + Sync,
284{
285    fn create(self: Arc<Self>, materializer: &Materializer) -> StreamResult<(BoxStream<Out>, Mat)> {
286        (self.0)(materializer)
287    }
288}
289
290struct MapSourceFactory<In, Out, Mat, F> {
291    source: Arc<dyn SourceFactory<In, Mat>>,
292    stage: F,
293    _marker: PhantomData<fn(In) -> Out>,
294}
295
296impl<In, Out, Mat, F> SourceFactory<Out, Mat> for MapSourceFactory<In, Out, Mat, F>
297where
298    In: Send + 'static,
299    Out: Send + 'static,
300    Mat: Send + 'static,
301    F: Fn(In) -> Out + Send + Sync + 'static,
302{
303    fn create(self: Arc<Self>, materializer: &Materializer) -> StreamResult<(BoxStream<Out>, Mat)> {
304        let (stream, mat) = Arc::clone(&self.source).create(materializer)?;
305        Ok((
306            Box::new(MapSourceStream {
307                input: stream,
308                factory: self,
309            }),
310            mat,
311        ))
312    }
313}
314
315struct MapSourceStream<In, Out, Mat, F> {
316    input: BoxStream<In>,
317    factory: Arc<MapSourceFactory<In, Out, Mat, F>>,
318}
319
320impl<In, Out, Mat, F> Iterator for MapSourceStream<In, Out, Mat, F>
321where
322    F: Fn(In) -> Out,
323{
324    type Item = StreamResult<Out>;
325
326    fn next(&mut self) -> Option<Self::Item> {
327        self.input
328            .next()
329            .map(|item| item.map(|item| (self.factory.stage)(item)))
330    }
331}
332
333fn merge_streams<Out>(streams: Vec<BoxStream<Out>>, eager_complete: bool) -> BoxStream<Out>
334where
335    Out: Send + 'static,
336{
337    let mut streams: Vec<Option<BoxStream<Out>>> = streams.into_iter().map(Some).collect();
338    let mut current = 0usize;
339    Box::new(std::iter::from_fn(move || {
340        loop {
341            let index = next_active_optional_stream(&streams, current, |_| true)?;
342            current = (index + 1) % streams.len().max(1);
343            let Some(stream) = streams[index].as_mut() else {
344                continue;
345            };
346            match stream.next() {
347                Some(item) => return Some(item),
348                None => {
349                    streams[index] = None;
350                    if eager_complete {
351                        return None;
352                    }
353                }
354            }
355        }
356    }))
357}
358
359fn merge_prioritized_streams<Out>(
360    streams: Vec<BoxStream<Out>>,
361    priorities: Vec<usize>,
362    eager_complete: bool,
363) -> BoxStream<Out>
364where
365    Out: Send + 'static,
366{
367    let mut streams: Vec<Option<BoxStream<Out>>> = streams.into_iter().map(Some).collect();
368    let schedule: Vec<usize> = priorities
369        .into_iter()
370        .enumerate()
371        .flat_map(|(index, weight)| std::iter::repeat_n(index, weight))
372        .collect();
373    let mut schedule_index = 0usize;
374    Box::new(std::iter::from_fn(move || {
375        loop {
376            if streams.iter().all(Option::is_none) {
377                return None;
378            }
379            let index = next_weighted_stream(&streams, &schedule, &mut schedule_index)?;
380            let Some(stream) = streams[index].as_mut() else {
381                continue;
382            };
383            match stream.next() {
384                Some(item) => return Some(item),
385                None => {
386                    streams[index] = None;
387                    if eager_complete {
388                        return None;
389                    }
390                }
391            }
392        }
393    }))
394}
395
396fn merge_sorted_stream<Out>(mut left: BoxStream<Out>, mut right: BoxStream<Out>) -> BoxStream<Out>
397where
398    Out: Ord + Send + 'static,
399{
400    let mut left_next: Option<Out> = None;
401    let mut right_next: Option<Out> = None;
402    let mut left_done = false;
403    let mut right_done = false;
404    Box::new(std::iter::from_fn(move || {
405        loop {
406            if left_next.is_none() && !left_done {
407                match left.next() {
408                    Some(Ok(item)) => left_next = Some(item),
409                    Some(Err(error)) => return Some(Err(error)),
410                    None => left_done = true,
411                }
412            }
413            if right_next.is_none() && !right_done {
414                match right.next() {
415                    Some(Ok(item)) => right_next = Some(item),
416                    Some(Err(error)) => return Some(Err(error)),
417                    None => right_done = true,
418                }
419            }
420
421            let next = match (&left_next, &right_next) {
422                (Some(left_item), Some(right_item)) => {
423                    if left_item <= right_item {
424                        left_next.take()
425                    } else {
426                        right_next.take()
427                    }
428                }
429                (Some(_), None) if right_done => left_next.take(),
430                (None, Some(_)) if left_done => right_next.take(),
431                (None, None) if left_done && right_done => return None,
432                _ => continue,
433            };
434            if let Some(item) = next {
435                return Some(Ok(item));
436            }
437        }
438    }))
439}
440
441fn merge_latest_streams<Out>(
442    streams: Vec<BoxStream<Out>>,
443    eager_complete: bool,
444) -> BoxStream<Vec<Out>>
445where
446    Out: Clone + Send + 'static,
447{
448    let mut streams: Vec<Option<BoxStream<Out>>> = streams.into_iter().map(Some).collect();
449    let mut latest = vec![None; streams.len()];
450    let mut seen = 0usize;
451    let mut current = 0usize;
452    let mut pending = VecDeque::<Vec<Out>>::new();
453    Box::new(std::iter::from_fn(move || {
454        loop {
455            if let Some(output) = pending.pop_front() {
456                return Some(Ok(output));
457            }
458            if streams.iter().all(Option::is_none) {
459                return None;
460            }
461            let index = next_active_optional_stream(&streams, current, |_| true)?;
462            current = (index + 1) % streams.len().max(1);
463            let Some(stream) = streams[index].as_mut() else {
464                continue;
465            };
466            match stream.next() {
467                Some(Ok(item)) => {
468                    if latest[index].is_none() {
469                        seen += 1;
470                    }
471                    latest[index] = Some(item);
472                    if seen == latest.len() {
473                        pending.push_back(
474                            latest
475                                .iter()
476                                .map(|item| item.clone().expect("merge-latest initialized"))
477                                .collect(),
478                        );
479                    }
480                }
481                Some(Err(error)) => return Some(Err(error)),
482                None => {
483                    streams[index] = None;
484                    if eager_complete {
485                        return None;
486                    }
487                }
488            }
489        }
490    }))
491}
492
493fn zip_streams<Left, Right>(
494    mut left: BoxStream<Left>,
495    mut right: BoxStream<Right>,
496) -> BoxStream<(Left, Right)>
497where
498    Left: Send + 'static,
499    Right: Send + 'static,
500{
501    let mut left_next: Option<Left> = None;
502    let mut right_next: Option<Right> = None;
503    let mut left_done = false;
504    let mut right_done = false;
505    Box::new(std::iter::from_fn(move || {
506        loop {
507            if left_next.is_none() && !left_done {
508                match left.next() {
509                    Some(Ok(item)) => left_next = Some(item),
510                    Some(Err(error)) => return Some(Err(error)),
511                    None => left_done = true,
512                }
513            }
514            if right_next.is_none() && !right_done {
515                match right.next() {
516                    Some(Ok(item)) => right_next = Some(item),
517                    Some(Err(error)) => return Some(Err(error)),
518                    None => right_done = true,
519                }
520            }
521            match (left_next.take(), right_next.take()) {
522                (Some(left_item), Some(right_item)) => return Some(Ok((left_item, right_item))),
523                (left_item, right_item) => {
524                    left_next = left_item;
525                    right_next = right_item;
526                    if (left_done && left_next.is_none()) || (right_done && right_next.is_none()) {
527                        return None;
528                    }
529                }
530            }
531        }
532    }))
533}
534
535fn zip_latest_with_stream<Left, Right, Out, F>(
536    mut left: BoxStream<Left>,
537    mut right: BoxStream<Right>,
538    eager_complete: bool,
539    combine: Arc<F>,
540) -> BoxStream<Out>
541where
542    Left: Clone + Send + 'static,
543    Right: Clone + Send + 'static,
544    Out: Send + 'static,
545    F: Fn(Left, Right) -> Out + Send + Sync + 'static,
546{
547    let mut left_latest: Option<Left> = None;
548    let mut right_latest: Option<Right> = None;
549    let mut left_done = false;
550    let mut right_done = false;
551    let mut turn_left = true;
552    let mut pending = VecDeque::<Out>::new();
553
554    Box::new(std::iter::from_fn(move || {
555        loop {
556            if let Some(output) = pending.pop_front() {
557                return Some(Ok(output));
558            }
559            if eager_complete && (left_done || right_done) {
560                return None;
561            }
562            if left_done && right_done {
563                return None;
564            }
565            // A side that completed without ever emitting can never be paired:
566            // no further output is possible, so draining the other (possibly
567            // infinite) side would loop forever producing nothing.
568            if (left_done && left_latest.is_none()) || (right_done && right_latest.is_none()) {
569                return None;
570            }
571
572            let pull_left = if left_done {
573                false
574            } else if right_done {
575                true
576            } else {
577                let value = turn_left;
578                turn_left = !turn_left;
579                value
580            };
581
582            if pull_left {
583                match left.next() {
584                    Some(Ok(item)) => {
585                        left_latest = Some(item);
586                        if let (Some(left_item), Some(right_item)) = (&left_latest, &right_latest) {
587                            pending.push_back(combine(left_item.clone(), right_item.clone()));
588                        }
589                    }
590                    Some(Err(error)) => return Some(Err(error)),
591                    None => {
592                        left_done = true;
593                        if eager_complete {
594                            return None;
595                        }
596                    }
597                }
598            } else {
599                match right.next() {
600                    Some(Ok(item)) => {
601                        right_latest = Some(item);
602                        if let (Some(left_item), Some(right_item)) = (&left_latest, &right_latest) {
603                            pending.push_back(combine(left_item.clone(), right_item.clone()));
604                        }
605                    }
606                    Some(Err(error)) => return Some(Err(error)),
607                    None => {
608                        right_done = true;
609                        if eager_complete {
610                            return None;
611                        }
612                    }
613                }
614            }
615        }
616    }))
617}
618
619fn zip_all_stream<Left, Right>(
620    mut left: BoxStream<Left>,
621    mut right: BoxStream<Right>,
622    left_fill: Left,
623    right_fill: Right,
624) -> BoxStream<(Left, Right)>
625where
626    Left: Clone + Send + 'static,
627    Right: Clone + Send + 'static,
628{
629    let mut left_done = false;
630    let mut right_done = false;
631    Box::new(std::iter::from_fn(move || {
632        if left_done && right_done {
633            return None;
634        }
635
636        let left_item = if left_done {
637            None
638        } else {
639            match left.next() {
640                Some(Ok(item)) => Some(item),
641                Some(Err(error)) => return Some(Err(error)),
642                None => {
643                    left_done = true;
644                    None
645                }
646            }
647        };
648        let right_item = if right_done {
649            None
650        } else {
651            match right.next() {
652                Some(Ok(item)) => Some(item),
653                Some(Err(error)) => return Some(Err(error)),
654                None => {
655                    right_done = true;
656                    None
657                }
658            }
659        };
660
661        match (left_item, right_item) {
662            (None, None) if left_done && right_done => None,
663            (Some(left_value), Some(right_value)) => Some(Ok((left_value, right_value))),
664            (Some(left_value), None) => Some(Ok((left_value, right_fill.clone()))),
665            (None, Some(right_value)) => Some(Ok((left_fill.clone(), right_value))),
666            (None, None) => None,
667        }
668    }))
669}
670
671fn zip_n_streams<Out, Next, F>(streams: Vec<BoxStream<Out>>, zipper: Arc<F>) -> BoxStream<Next>
672where
673    Out: Send + 'static,
674    Next: Send + 'static,
675    F: Fn(Vec<Out>) -> Next + Send + Sync + 'static,
676{
677    let count = streams.len();
678    if count == 0 {
679        return Box::new(std::iter::empty());
680    }
681    let mut streams: Vec<Option<BoxStream<Out>>> = streams.into_iter().map(Some).collect();
682    let mut slots: Vec<Option<Out>> = (0..count).map(|_| None).collect();
683    let mut current = 0usize;
684    Box::new(std::iter::from_fn(move || {
685        loop {
686            if slots.iter().all(Option::is_some) {
687                let values = slots
688                    .iter_mut()
689                    .map(|slot| slot.take().expect("zip-n slot filled"))
690                    .collect();
691                return Some(Ok(zipper(values)));
692            }
693
694            let index = next_active_optional_stream(&streams, current, |idx| slots[idx].is_none())?;
695            current = (index + 1) % count.max(1);
696            let Some(stream) = streams[index].as_mut() else {
697                continue;
698            };
699            match stream.next() {
700                Some(Ok(item)) => slots[index] = Some(item),
701                Some(Err(error)) => return Some(Err(error)),
702                None => {
703                    streams[index] = None;
704                    slots[index].as_ref()?;
705                }
706            }
707        }
708    }))
709}
710
711fn next_active_optional_stream<T, F>(
712    streams: &[Option<BoxStream<T>>],
713    current: usize,
714    predicate: F,
715) -> Option<usize>
716where
717    T: Send + 'static,
718    F: Fn(usize) -> bool,
719{
720    if streams.is_empty() {
721        return None;
722    }
723    for offset in 0..streams.len() {
724        let index = (current + offset) % streams.len();
725        if streams[index].is_some() && predicate(index) {
726            return Some(index);
727        }
728    }
729    None
730}
731
732fn next_weighted_stream<T>(
733    streams: &[Option<BoxStream<T>>],
734    schedule: &[usize],
735    schedule_index: &mut usize,
736) -> Option<usize>
737where
738    T: Send + 'static,
739{
740    if streams.is_empty() || schedule.is_empty() {
741        return None;
742    }
743    for _ in 0..schedule.len() {
744        let index = schedule[*schedule_index % schedule.len()];
745        *schedule_index = (*schedule_index + 1) % schedule.len();
746        if streams.get(index).is_some_and(Option::is_some) {
747            return Some(index);
748        }
749    }
750    None
751}
752
753mod completion;
754mod error;
755mod flow;
756mod rate;
757mod restart;
758mod runtime;
759mod sink;
760mod source;
761mod time;
762mod timer;
763
/// Opaque hook on `Source<T>` for split-segment fast-path sources.
/// Non-`None` only on `Source`s returned by the split fast path.
/// Cleared by all composition operators (via/map/to_mat etc.).
pub(crate) trait SplitSegmentHookDyn: Send + Sync + 'static {
    /// Converts the shared hook into a type-erased `Arc<dyn Any + Send + Sync>`.
    // NOTE(review): presumably so a caller can downcast to the concrete hook
    // type — confirm at the call sites, which are outside this view.
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Send + Sync>;
}
770
/// Fast-path descriptor for fold/collect/ignore sinks in split-segment context.
/// Non-`None` only for recognised sinks (`Sink::fold`, `fold_result`, `collect`, `ignore`).
pub(crate) trait FoldFastPathDyn<In: Send + 'static>: Send + Sync + 'static {
    /// Try to register directly with a split segment hook.
    /// Returns `None` if the hook type is not recognised.
    /// Returns `Some(Ok(mat))` on success where `mat` is `Box<StreamCompletion<Acc>>`.
    /// `Some(Err(_))` is also representable by the return type.
    // NOTE(review): presumably `Some(Err(_))` means the hook was recognised
    // but registration failed — confirm in the implementors.
    fn try_register(
        &self,
        hook: Arc<dyn SplitSegmentHookDyn>,
    ) -> Option<StreamResult<Box<dyn std::any::Any + Send>>>;
}
782
783use self::runtime::runtime_checked_stream;
784
785pub use self::{
786    completion::{Cancellable, StreamCompletion},
787    error::{StreamError, StreamResult, Supervision, SupervisionDecider, SupervisionDirective},
788    flow::{BidiFlow, Flow},
789    rate::{AggregateTimer, OverflowStrategy},
790    restart::{RestartFlow, RestartSettings, RestartSink, RestartSource, RetryFlow},
791    runtime::{Materializer, Runtime},
792    sink::{RunnableGraph, Sink, SinkCombineStrategy},
793    source::{Demand, Keep, MaybeHandle, NotUsed, PushOutlet, Source, SourceCombineStrategy},
794    time::{DelayOverflowStrategy, ThrottleMode},
795};
796
797#[cfg(test)]
798mod tests {
799    use super::*;
800    use crate::Attributes;
801    use crate::testkit::TestSink;
802    use std::fs;
803    use std::sync::{
804        Arc as StdArc,
805        atomic::{
806            AtomicBool as StdAtomicBool, AtomicUsize as StdAtomicUsize, Ordering as StdOrdering,
807        },
808        mpsc,
809    };
810    use std::time::Duration as StdDuration;
811    use std::time::Instant;
812
    /// Test helper: calls `wait()` on the completion and unwraps the result,
    /// panicking if it is an error.
    fn wait<T>(completion: StreamCompletion<T>) -> T {
        completion.wait().unwrap()
    }
816
817    fn wait_until(timeout: Duration, mut condition: impl FnMut() -> bool) -> bool {
818        let deadline = Instant::now() + timeout;
819        while Instant::now() < deadline {
820            if condition() {
821                return true;
822            }
823            thread::sleep(Duration::from_millis(2));
824        }
825        condition()
826    }
827
828    fn linux_thread_count(thread_name: &str) -> usize {
829        fs::read_dir("/proc/self/task")
830            .expect("task directory readable")
831            .filter_map(Result::ok)
832            .filter_map(|entry| fs::read_to_string(entry.path().join("comm")).ok())
833            .filter(|name| name.trim() == thread_name)
834            .count()
835    }
836
    /// Running a cloned source and then the original yields the same mapped
    /// items (`0..5` plus one) both times.
    #[test]
    fn source_blueprints_are_reusable() {
        let source = Source::from_iter(0..5).map(|item| item + 1);

        assert_eq!(source.clone().run_collect().unwrap(), vec![1, 2, 3, 4, 5]);
        assert_eq!(source.run_collect().unwrap(), vec![1, 2, 3, 4, 5]);
    }
844
    /// A `map` placed after `map_materialized_value` keeps the source's
    /// materialized value ("source") while still transforming the element
    /// (1 becomes 2) seen by `Sink::head`.
    #[test]
    fn source_map_preserves_materialized_value() {
        let graph = Source::single(1)
            .map_materialized_value(|_| "source")
            .map(|item| item + 1)
            .to_mat(Sink::head(), Keep::both);

        // `Keep::both` materializes a (source value, sink value) pair.
        let materialized = graph.run().unwrap();
        assert_eq!(materialized.0, "source");
        assert_eq!(wait(materialized.1), 2);
    }
856
    /// A flow built from `map` (double) then `filter` (multiples of 3),
    /// attached with `via`, turns `0..8` into `[0, 6, 12]`.
    #[test]
    fn source_and_flow_compose() {
        let flow = Flow::identity()
            .map(|item: i32| item * 2)
            .filter(|item| item % 3 == 0);

        let result = Source::from_iter(0..8).via(flow).run_collect().unwrap();

        assert_eq!(result, vec![0, 6, 12]);
    }
867
    /// The attributes handed to a `Sink::setup` callback carry the values
    /// added on the sink itself: the name is "sink-inner" (not the
    /// materializer's "mat-outer"), the input buffer hint is `(4, 4)`, and the
    /// dispatcher hint is "bench-dispatcher".
    #[test]
    fn sink_setup_sees_materializer_defaults_and_local_attributes() {
        // Shared cell the setup callback writes what it observed into.
        let observed = StdArc::new(Mutex::new(None));
        let observed_in_setup = StdArc::clone(&observed);
        let sink = Sink::<i32, StreamCompletion<NotUsed>>::setup(move |_materializer, attrs| {
            *observed_in_setup.lock().unwrap() = Some((
                attrs.name().map(str::to_owned),
                attrs.input_buffer_hint(),
                attrs.dispatcher_hint().map(str::to_owned),
            ));
            Sink::ignore()
        })
        .add_attributes(Attributes::named("sink-inner"))
        .add_attributes(Attributes::input_buffer(4, 4))
        .add_attributes(Attributes::dispatcher("bench-dispatcher"));

        let materializer = Materializer::new().with_attributes(Attributes::named("mat-outer"));
        wait(
            Source::from_iter([1, 2, 3])
                .run_with_materializer(sink, &materializer)
                .unwrap(),
        );

        assert_eq!(
            *observed.lock().unwrap(),
            Some((
                Some("sink-inner".to_owned()),
                Some((4, 4)),
                Some("bench-dispatcher".to_owned())
            ))
        );
    }
900
    /// The completion returned by `pre_materialize` resolves to the items
    /// later fed through the companion sink that `pre_materialize` returned
    /// alongside it.
    #[test]
    fn sink_pre_materialize_feeds_existing_materialization() {
        let materializer = Materializer::new();
        let (completion, pre) = Sink::<i32, StreamCompletion<Vec<i32>>>::collect()
            .pre_materialize(&materializer)
            .unwrap();

        Source::from_iter([1, 2, 3])
            .run_with_materializer(pre, &materializer)
            .unwrap();

        assert_eq!(wait(completion), vec![1, 2, 3]);
    }
914
    /// With `Flow::from_sink_and_source`, downstream receives the embedded
    /// source's items (`[10, 20, 30]`), not the upstream items that are fed
    /// to the embedded sink.
    #[test]
    fn flow_from_sink_and_source_connects_both_sides() {
        assert_eq!(
            Source::from_iter([1, 2, 3])
                .via(Flow::from_sink_and_source(
                    Sink::foreach(|_item: i32| {}),
                    Source::from_iter([10, 20, 30]),
                ))
                .run_collect()
                .unwrap(),
            vec![10, 20, 30]
        );
    }
928
    /// In the uncoupled variant, the embedded sink's `on_complete` callback
    /// still fires (within one second) even though the embedded single-item
    /// source has already produced the flow's entire output (`[10]`).
    #[test]
    fn from_sink_and_source_keeps_sink_running_after_source_side_completes() {
        // Flag flipped by the sink's completion callback.
        let completed = StdArc::new(StdAtomicBool::new(false));
        let on_complete = StdArc::clone(&completed);
        let flow = Flow::from_sink_and_source(
            Sink::on_complete(move || {
                on_complete.store(true, StdOrdering::SeqCst);
            }),
            Source::single(10),
        );

        let result = Source::from_iter([1, 2, 3])
            .via(flow)
            .run_collect()
            .unwrap();

        assert_eq!(result, vec![10]);
        assert!(wait_until(StdDuration::from_secs(1), || {
            completed.load(StdOrdering::SeqCst)
        }));
    }
950
// In the coupled variant, an empty upstream finishes the sink side first; the
// embedded tick source's materialized handle must then report cancellation.
#[test]
fn from_sink_and_source_coupled_cancels_source_when_sink_finishes_first() {
    // Slot that captures the tick source's materialized handle for inspection.
    let handle_slot = StdArc::new(Mutex::new(None));
    let slot_writer = StdArc::clone(&handle_slot);

    let flow = Flow::from_sink_and_source_coupled(
        Sink::ignore(),
        Source::tick(
            StdDuration::from_millis(50),
            StdDuration::from_millis(50),
            10,
        )
        .map_materialized_value(move |handle| {
            *slot_writer.lock().unwrap() = Some(handle.clone());
            handle
        }),
    );

    let completion = Source::from_iter(std::iter::empty::<i32>())
        .via(flow)
        .run_with(Sink::ignore())
        .unwrap();

    let cancelled = wait_until(StdDuration::from_secs(1), || {
        let slot = handle_slot.lock().unwrap();
        slot.as_ref().is_some_and(Cancellable::is_cancelled)
    });
    assert!(cancelled);
    assert_eq!(wait(completion), NotUsed);
}
981
// Checks `BidiFlow::join` and `BidiFlow::atop` by pushing 10 through two
// compositions and comparing against hand-computed results.
#[test]
fn bidi_flow_join_and_atop_compose() {
    let codec = BidiFlow::from_flows(
        Flow::identity().map(|value: i32| value + 1),
        Flow::identity().map(|value: i32| value * 2),
    )
    .named("codec");
    let framing = BidiFlow::from_flows(
        Flow::identity().map(|value: i32| value * 3),
        Flow::identity().map(|value: i32| value - 4),
    );

    let minus_five = Flow::identity().map(|value: i32| value - 5);
    let joined = codec.clone().join(minus_five);
    let stacked = codec.atop(framing).join(Flow::identity());

    // Expected 12 matches ((10 + 1) - 5) * 2.
    let joined_out = Source::single(10).via(joined).run_collect().unwrap();
    assert_eq!(joined_out, vec![12]);

    // Expected 58 matches (((10 + 1) * 3) - 4) * 2.
    let stacked_out = Source::single(10).via(stacked).run_collect().unwrap();
    assert_eq!(stacked_out, vec![58]);
}
1008
// Smoke test: a `map` chained after a backpressured `buffer` still transforms
// every element in order.
#[test]
fn flow_buffer_then_map_runs_end_to_end() {
    let incrementing = Flow::identity()
        .buffer(8, OverflowStrategy::Backpressure)
        .map(|value: i32| value + 1);

    let emitted = Source::from_iter(0..4)
        .via(incrementing)
        .run_collect()
        .unwrap();

    assert_eq!(emitted, vec![1, 2, 3, 4]);
}
1019
// Applies each public `Flow` combinator on top of a buffered identity flow and
// checks the collected output, then checks that the materialized-value
// combinators (`map_materialized_value`, `to_mat`, `via_mat_with`, `to`) still
// yield the expected materialized values after `buffer`.
#[test]
fn public_flow_combinators_preserve_runtime_transform_after_buffer() {
    // Fresh buffered identity flow; every case below builds on its own instance.
    fn buffered() -> Flow<i32, i32> {
        Flow::identity().buffer(8, OverflowStrategy::Backpressure)
    }

    // Shorthand for the `StreamError::Failed` values injected and expected below.
    fn failure(reason: &'static str) -> StreamError {
        StreamError::Failed(reason.into())
    }

    // --- element-wise filtering and mapping ---
    let kept_even = Source::from_iter(0..4)
        .via(buffered().filter(|value| *value % 2 == 0))
        .run_collect()
        .unwrap();
    assert_eq!(kept_even, vec![0, 2]);

    let kept_odd = Source::from_iter(0..4)
        .via(buffered().filter_not(|value| *value % 2 == 0))
        .run_collect()
        .unwrap();
    assert_eq!(kept_odd, vec![1, 3]);

    let shifted_even = Source::from_iter(0..4)
        .via(buffered().filter_map(|value| (value % 2 == 0).then_some(value + 10)))
        .run_collect()
        .unwrap();
    assert_eq!(shifted_even, vec![10, 12]);

    let fanned_out = Source::from_iter(0..3)
        .via(buffered().map_concat(|value| [value, value + 10]))
        .run_collect()
        .unwrap();
    assert_eq!(fanned_out, vec![0, 10, 1, 11, 2, 12]);

    // --- stateful mapping ---
    let running_from_five = Source::from_iter(0..3)
        .via(buffered().stateful_map(5, |total, value| {
            *total += value;
            *total
        }))
        .run_collect()
        .unwrap();
    assert_eq!(running_from_five, vec![5, 6, 8]);

    let running_with_input = Source::from_iter(0..3)
        .via(buffered().stateful_map_concat(0, |total, value| {
            *total += value;
            [*total, value]
        }))
        .run_collect()
        .unwrap();
    assert_eq!(running_with_input, vec![0, 0, 1, 1, 3, 2]);

    // --- async mapping ---
    let async_ordered = Source::from_iter(0..4)
        .via(buffered().map_async(2, |value| async move { Ok(value + 1) }))
        .run_collect()
        .unwrap();
    assert_eq!(async_ordered, vec![1, 2, 3, 4]);

    let async_unordered = Source::from_iter(0..4)
        .via(buffered().map_async_unordered(2, |value| async move { Ok(value + 1) }))
        .run_collect()
        .unwrap();
    assert_eq!(async_unordered, vec![1, 2, 3, 4]);

    let async_partitioned = Source::from_iter(0..4)
        .via(buffered().map_async_partitioned(
            2,
            1,
            |value| value % 2,
            |value| async move { Ok(value + 1) },
        ))
        .run_collect()
        .unwrap();
    assert_eq!(async_partitioned, vec![1, 2, 3, 4]);

    // --- truncation and skipping ---
    let first_three = Source::from_iter(0..5)
        .via(buffered().take(3))
        .run_collect()
        .unwrap();
    assert_eq!(first_three, vec![0, 1, 2]);

    let after_two = Source::from_iter(0..5)
        .via(buffered().drop(2))
        .run_collect()
        .unwrap();
    assert_eq!(after_two, vec![2, 3, 4]);

    let below_three = Source::from_iter(0..5)
        .via(buffered().take_while(|value| *value < 3))
        .run_collect()
        .unwrap();
    assert_eq!(below_three, vec![0, 1, 2]);

    let from_three = Source::from_iter(0..5)
        .via(buffered().drop_while(|value| *value < 3))
        .run_collect()
        .unwrap();
    assert_eq!(from_three, vec![3, 4]);

    let within_limit = Source::from_iter(0..3)
        .via(buffered().limit(5))
        .run_collect()
        .unwrap();
    assert_eq!(within_limit, vec![0, 1, 2]);

    // --- grouping and accumulation ---
    let pairs = Source::from_iter(0..5)
        .via(buffered().grouped(2))
        .run_collect()
        .unwrap();
    assert_eq!(pairs, vec![vec![0, 1], vec![2, 3], vec![4]]);

    let scanned = Source::from_iter(1..=3)
        .via(buffered().scan(0, |total, value| total + value))
        .run_collect()
        .unwrap();
    assert_eq!(scanned, vec![0, 1, 3, 6]);

    let windows = Source::from_iter(1..=4)
        .via(buffered().sliding(2, 1))
        .run_collect()
        .unwrap();
    assert_eq!(windows, vec![vec![1, 2], vec![2, 3], vec![3, 4]]);

    let folded = Source::from_iter(1..=4)
        .via(buffered().fold(0, |total, value| total + value))
        .run_collect()
        .unwrap();
    assert_eq!(folded, vec![10]);

    let reduced = Source::from_iter(1..=4)
        .via(buffered().reduce(|total, value| total + value))
        .run_collect()
        .unwrap();
    assert_eq!(reduced, vec![10]);

    // --- error handling ---
    let remapped = Source::from_factory(|| {
        Box::new(vec![Ok(1), Err(failure("boom")), Ok(2)].into_iter())
    })
    .via(buffered().map_error(|_| failure("mapped")))
    .run_collect();
    assert_eq!(remapped, Err(failure("mapped")));

    let recovered = Source::<i32>::failed(failure("boom"))
        .via(buffered().recover(|_| Some(42)))
        .run_collect()
        .unwrap();
    assert_eq!(recovered, vec![42]);

    let recovered_with = Source::<i32>::failed(failure("boom"))
        .via(buffered().recover_with(|_| Some(Source::from_iter([7, 8]))))
        .run_collect()
        .unwrap();
    assert_eq!(recovered_with, vec![7, 8]);

    let recovered_with_retries = Source::<i32>::failed(failure("boom"))
        .via(buffered().recover_with_retries(1, |_| Some(Source::from_iter([9]))))
        .run_collect()
        .unwrap();
    assert_eq!(recovered_with_retries, vec![9]);

    let completed_on_error = Source::from_factory(|| {
        Box::new(vec![Ok(1), Err(failure("ignored")), Ok(2)].into_iter())
    })
    .via(buffered().on_error_complete())
    .run_collect()
    .unwrap();
    assert_eq!(completed_on_error, vec![1]);

    // --- materialized values ---
    // `Keep::both` pairs the flow's mapped value with the fold sink's completion.
    let both = Source::from_iter([1, 2, 3])
        .run_with(
            buffered()
                .via(Flow::identity().map(|value| value + 1))
                .map_materialized_value(|_| "buffered-flow")
                .to_mat(Sink::fold(0, |total, value| total + value), Keep::both),
        )
        .unwrap();
    assert_eq!(both.0, "buffered-flow");
    assert_eq!(wait(both.1), 9);

    // `to` keeps the value produced by the `via_mat_with` combiner.
    let combined = Source::from_iter([1, 2, 3])
        .run_with(
            buffered()
                .via_mat_with(Flow::identity().map(|value| value + 1), |_, _| "combined")
                .to(Sink::fold(0, |total, value| total + value)),
        )
        .unwrap();
    assert_eq!(combined, "combined");
}
1229
// Each rate-related combinator (`conflate`, `batch`, `expand`,
// `aggregate_with_boundary`, `detach`) is built in flow form, followed by a
// `map`, and must still yield the incremented elements.
#[test]
fn runtime_rate_flows_compose_in_flow_form() {
    let conflating = Flow::identity()
        .conflate(|acc: i32, next| acc + next)
        .map(|value| value + 1);
    let conflated = Source::single(4).via(conflating).run_collect().unwrap();
    assert_eq!(conflated, vec![5]);

    let batching = Flow::identity()
        .batch(4, |seed: i32| seed, |acc, next| acc + next)
        .map(|value| value + 1);
    let batched = Source::single(4).via(batching).run_collect().unwrap();
    assert_eq!(batched, vec![5]);

    let expanding = Flow::identity()
        .expand(std::iter::once::<i32>)
        .map(|value| value + 1);
    let expanded = Source::from_iter(0..4).via(expanding).run_collect().unwrap();
    assert_eq!(expanded, vec![1, 2, 3, 4]);

    let aggregating = Flow::identity()
        .aggregate_with_boundary(
            Vec::<i32>::new,
            |mut pending, value| {
                pending.push(value);
                // Never empty right after a push, so the flag is always `true` here.
                let ready = !pending.is_empty();
                (pending, ready)
            },
            |pending| pending.into_iter().sum::<i32>(),
            None,
        )
        .map(|value| value + 1);
    let aggregated = Source::from_iter(0..4)
        .via(aggregating)
        .run_collect()
        .unwrap();
    assert_eq!(aggregated, vec![1, 2, 3, 4]);

    let detaching = Flow::identity().detach().map(|value: i32| value + 1);
    let detached = Source::from_iter(0..4).via(detaching).run_collect().unwrap();
    assert_eq!(detached, vec![1, 2, 3, 4]);
}
1279
// Chains several common operators directly on a `Source`:
// 0..8 -> drop 1 -> take 5 -> keep odds -> duplicate with +10 -> groups of 3.
#[test]
fn high_use_source_flow_operators_work() {
    let chunks = Source::from_iter(0..8)
        .drop(1)
        .take(5)
        .filter_not(|value| value % 2 == 0)
        .map_concat(|value| [value, value + 10])
        .grouped(3)
        .run_collect()
        .unwrap();

    let expected = vec![vec![1, 11, 3], vec![13, 5, 15]];
    assert_eq!(chunks, expected);
}
1293
// `prefix_and_tail(2)` emits exactly one `(prefix, tail)` pair; the tail can
// be run once, and a second materialization is rejected with a `Failed` error.
#[test]
fn prefix_and_tail_emits_prefix_and_live_tail() {
    let mut emitted = Source::from_iter(0..5)
        .prefix_and_tail(2)
        .run_collect()
        .unwrap();
    assert_eq!(emitted.len(), 1);

    let (head, rest) = emitted.pop().unwrap();
    assert_eq!(head, vec![0, 1]);

    // First run goes through a clone; the original handle is used for the second run.
    let first_run = rest.clone().run_collect().unwrap();
    assert_eq!(first_run, vec![2, 3, 4]);

    let second_run = rest.run_collect();
    assert_eq!(
        second_run,
        Err(StreamError::Failed(
            "substream source cannot be materialized more than once".into()
        ))
    );
}
1311
// When upstream fails before two prefix elements have arrived, the outer
// stream itself fails with the upstream error.
#[test]
fn prefix_and_tail_fails_before_prefix_is_ready() {
    let outcome = Source::from_factory(|| {
        let script = vec![Ok(1), Err(StreamError::Failed("boom".into())), Ok(2)];
        Box::new(script.into_iter())
    })
    .prefix_and_tail(2)
    .run_collect();

    let failed_with_boom =
        matches!(outcome, Err(StreamError::Failed(message)) if message == "boom");
    assert!(failed_with_boom);
}
1321
// When upstream fails after the prefix is complete, the outer stream succeeds
// and the failure is reported by the tail source instead.
#[test]
fn prefix_and_tail_tail_propagates_late_upstream_failure() {
    let mut emitted = Source::from_factory(|| {
        let script = vec![
            Ok(1),
            Ok(2),
            Err(StreamError::Failed("boom".into())),
            Ok(3),
        ];
        Box::new(script.into_iter())
    })
    .prefix_and_tail(2)
    .run_collect()
    .unwrap();

    let (head, rest) = emitted.pop().unwrap();
    assert_eq!(head, vec![1, 2]);

    let tail_outcome = rest.run_collect();
    assert_eq!(tail_outcome, Err(StreamError::Failed("boom".into())));
}
1334
// `prefix_and_tail` must work for element types that do not implement `Clone`.
#[test]
fn prefix_and_tail_accepts_non_clone_elements() {
    // Deliberately derives neither `Clone` nor `Copy`.
    #[derive(Debug, PartialEq, Eq)]
    struct NonClone(u8);

    let mut emitted = Source::from_factory(|| {
        let script = vec![Ok(NonClone(1)), Ok(NonClone(2)), Ok(NonClone(3))];
        Box::new(script.into_iter())
    })
    .prefix_and_tail(2)
    .run_collect()
    .unwrap();

    let (head, rest) = emitted.pop().unwrap();
    assert_eq!(head, vec![NonClone(1), NonClone(2)]);

    let remainder = rest.run_collect().unwrap();
    assert_eq!(remainder, vec![NonClone(3)]);
}
1350
// Upstream completes after two elements although a prefix of three was
// requested; the factory must still run with the short prefix (sum 1 + 2 = 3).
#[test]
fn flat_map_prefix_materializes_on_short_upstream_completion() {
    let emitted = Source::from_iter([1, 2])
        .flat_map_prefix(3, |prefix| {
            let total: i32 = prefix.into_iter().sum();
            Flow::identity().prepend(Source::single(total))
        })
        .run_collect()
        .unwrap();

    assert_eq!(emitted, vec![3]);
}
1362
// When upstream fails before the prefix is complete, the stream fails with
// that error and the flow factory is never called.
#[test]
fn flat_map_prefix_does_not_materialize_on_early_upstream_failure() {
    let factory_called = StdArc::new(StdAtomicBool::new(false));
    let flag = StdArc::clone(&factory_called);

    let outcome = Source::from_factory(|| {
        Box::new(vec![Ok(1), Err(StreamError::Failed("boom".into()))].into_iter())
    })
    .flat_map_prefix(3, move |_prefix| {
        flag.store(true, StdOrdering::SeqCst);
        Flow::identity()
    })
    .run_collect();

    assert_eq!(outcome, Err(StreamError::Failed("boom".into())));
    let was_called = factory_called.load(StdOrdering::SeqCst);
    assert!(!was_called);
}
1378
// Each element `n` expands to `0..n`; the inner sources are concatenated in
// upstream order.
#[test]
fn flat_map_concat_flattens_nested_sources_sequentially() {
    let flattened = Source::from_iter([1, 2, 3])
        .flat_map_concat(|upper| Source::from_iter(0..upper))
        .run_collect()
        .unwrap();

    let expected = vec![0, 0, 1, 0, 1, 2];
    assert_eq!(flattened, expected);
}
1387
// With breadth 2, no more than two inner sources may be running at once.
// Each inner future bumps a shared in-flight counter, records the highest
// value it observed, holds its slot for 20 ms, then releases it.
#[test]
fn flat_map_merge_respects_breadth_bound() {
    let active = StdArc::new(StdAtomicUsize::new(0));
    let max_active = StdArc::new(StdAtomicUsize::new(0));
    let active_for_stage = StdArc::clone(&active);
    let max_for_stage = StdArc::clone(&max_active);

    let mut values = Source::from_iter(0..6)
        .flat_map_merge(2, move |item| {
            let active = StdArc::clone(&active_for_stage);
            let max_active = StdArc::clone(&max_for_stage);
            Source::future(move || {
                let active = StdArc::clone(&active);
                let max_active = StdArc::clone(&max_active);
                async move {
                    let now = active.fetch_add(1, StdOrdering::SeqCst) + 1;
                    // Atomically raise the recorded peak to `now` if it is higher.
                    max_active.fetch_max(now, StdOrdering::SeqCst);
                    // Hold the slot so that overlapping inner sources are observable.
                    thread::sleep(StdDuration::from_millis(20));
                    active.fetch_sub(1, StdOrdering::SeqCst);
                    Ok(item)
                }
            })
        })
        .run_collect()
        .unwrap();

    // Merge order is not asserted; only the set of emitted values is.
    values.sort_unstable();
    assert_eq!(values, vec![0, 1, 2, 3, 4, 5]);
    assert!(max_active.load(StdOrdering::SeqCst) <= 2);
}
1433
// One of the inner sources fails; the merged stream must fail with that error.
#[test]
fn flat_map_merge_propagates_inner_failures() {
    let outcome = Source::from_iter([0, 1, 2])
        .flat_map_merge(2, |item| match item {
            1 => Source::failed(StreamError::Failed("boom".into())),
            other => Source::single(other),
        })
        .run_collect();

    let expected = Err(StreamError::Failed("boom".into()));
    assert_eq!(outcome, expected);
}
1447
// The upstream yields 0 immediately and then blocks until the test releases
// it. The first merged element must be pullable while upstream is still
// blocked; the second arrives only after the release signal.
#[test]
fn flat_map_merge_emits_ready_inner_output_while_upstream_is_blocked() {
    let (release_tx, release_rx) = mpsc::channel();
    let gate = StdArc::new(std::sync::Mutex::new(Some(release_rx)));

    let queue = Source::from_factory(move || {
        let gate = StdArc::clone(&gate);
        let mut step = 0_u8;
        Box::new(std::iter::from_fn(move || {
            let next = match step {
                0 => Some(Ok(0)),
                1 => {
                    // Hold back the second element until the test sends the release signal.
                    gate.lock()
                        .unwrap()
                        .as_ref()
                        .expect("release receiver available")
                        .recv_timeout(StdDuration::from_secs(1))
                        .expect("timed out waiting to release second upstream element");
                    Some(Ok(1))
                }
                _ => None,
            };
            step += 1;
            next
        }))
    })
    .flat_map_merge(2, |item| Source::single(item + 10))
    .run_with(Sink::queue())
    .unwrap();

    let first = queue.pull().unwrap();
    assert_eq!(first, Some(10));

    release_tx.send(()).unwrap();
    let second = queue.pull().unwrap();
    assert_eq!(second, Some(11));

    let end = queue.pull().unwrap();
    assert!(end.is_none());
}
1483
// Elements are keyed by parity. The even substream is run into an ignoring
// sink and its completion handle is dropped; the odd substream must still
// receive exactly the odd elements, after which the outer stream ends.
#[test]
fn group_by_routes_keys_and_drops_closed_keys() {
    let outer = Source::from_iter([0, 1, 2, 3, 4])
        .group_by(4, |item| item % 2, false)
        .run_with(Sink::queue())
        .unwrap();

    let even = outer.pull().unwrap().unwrap();
    let even_completion = even.run_with(Sink::ignore()).unwrap();
    let odd = outer.pull().unwrap().unwrap();
    // The handle is released only after the odd substream has been obtained.
    drop(even_completion);

    let odd_items = odd.run_collect().unwrap();
    assert_eq!(odd_items, vec![1, 3]);

    let end = outer.pull().unwrap();
    assert!(end.is_none());
}
1499
// Three distinct keys with `max_substreams = 2`: the first two substreams are
// emitted, and the third pull fails with the limit error.
#[test]
fn group_by_fails_when_distinct_key_limit_is_exceeded() {
    let outer = Source::from_iter([0, 1, 2])
        .group_by(2, |item| *item, false)
        .run_with(Sink::queue())
        .unwrap();

    // Take and immediately discard the two substreams that fit within the limit.
    for _ in 0..2 {
        drop(outer.pull().unwrap().unwrap());
    }

    let third = outer.pull();
    assert!(matches!(
        third,
        Err(StreamError::Failed(message)) if message == "group_by reached max_substreams (2)"
    ));
}
1514
// The final `group_by` argument is `true` here. The even substream is consumed
// with `Sink::head` (taking only its first element); when a later even
// element is released, a new even substream must be emitted for it.
#[test]
fn group_by_can_recreate_closed_substreams_when_enabled() {
    let (release_tx, release_rx) = mpsc::channel();
    let gate = StdArc::new(std::sync::Mutex::new(Some(release_rx)));

    let outer = Source::from_factory(move || {
        let gate = StdArc::clone(&gate);
        let mut step = 0_u8;
        Box::new(std::iter::from_fn(move || {
            let next = match step {
                0 => Some(Ok(0)),
                1 => Some(Ok(1)),
                2 => {
                    // Hold back the second even element until the test releases it.
                    gate.lock()
                        .unwrap()
                        .as_ref()
                        .expect("release receiver available")
                        .recv_timeout(StdDuration::from_secs(1))
                        .expect("timed out waiting to release recreated key");
                    Some(Ok(0))
                }
                _ => None,
            };
            step += 1;
            next
        }))
    })
    .group_by(4, |item| item % 2, true)
    .run_with(Sink::queue())
    .unwrap();

    let even = outer.pull().unwrap().unwrap();
    let first_even = wait(even.run_with(Sink::head()).unwrap());
    assert_eq!(first_even, 0);
    release_tx.send(()).unwrap();

    let odd = outer.pull().unwrap().unwrap();
    let odd_items = odd.run_collect().unwrap();
    assert_eq!(odd_items, vec![1]);

    let recreated_even = outer.pull().unwrap().unwrap();
    let recreated_items = recreated_even.run_collect().unwrap();
    assert_eq!(recreated_items, vec![0]);

    let end = outer.pull().unwrap();
    assert!(end.is_none());
}
1557
// The key function panics on the second element. Both the already-emitted
// substream and the outer stream must end with `AbruptTermination`.
#[test]
fn group_by_panicking_key_fn_abruptly_terminates_live_substreams() {
    let outer = Source::from_iter([0, 1])
        .group_by(
            4,
            |item| {
                assert_ne!(*item, 1, "boom");
                item % 2
            },
            false,
        )
        .run_with(Sink::queue())
        .unwrap();

    let live = outer.pull().unwrap().unwrap();

    // Collect on a helper thread so the wait below can be bounded by a timeout.
    let (result_tx, result_rx) = mpsc::channel();
    thread::spawn(move || {
        result_tx.send(live.run_collect()).ok();
    });

    let substream_outcome = result_rx.recv_timeout(StdDuration::from_secs(1)).unwrap();
    assert_eq!(substream_outcome, Err(StreamError::AbruptTermination));
    assert!(matches!(outer.pull(), Err(StreamError::AbruptTermination)));
}
1584
// With `split_when`, an element matching the predicate becomes the first
// element of the next substream.
#[test]
fn split_when_starts_new_substream_on_boundary_element() {
    let outer = Source::from_iter([1, 2, 0, 3, 0, 4, 5])
        .split_when(|item| *item == 0)
        .run_with(Sink::queue())
        .unwrap();

    // Pull and fully drain each segment before asking for the next one.
    for expected in [vec![1, 2], vec![0, 3], vec![0, 4, 5]] {
        let segment = outer.pull().unwrap().unwrap();
        assert_eq!(segment.run_collect().unwrap(), expected);
    }

    let end = outer.pull().unwrap();
    assert!(end.is_none());
}
1600
// With `split_after`, an element matching the predicate is the last element
// of the current substream.
#[test]
fn split_after_ends_current_substream_on_boundary_element() {
    let outer = Source::from_iter([1, 2, 0, 3, 0, 4, 5])
        .split_after(|item| *item == 0)
        .run_with(Sink::queue())
        .unwrap();

    // Pull and fully drain each segment before asking for the next one.
    for expected in [vec![1, 2, 0], vec![3, 0], vec![4, 5]] {
        let segment = outer.pull().unwrap().unwrap();
        assert_eq!(segment.run_collect().unwrap(), expected);
    }

    let end = outer.pull().unwrap();
    assert!(end.is_none());
}
1616
// The split predicate panics on the second element. Both the already-emitted
// substream and the outer stream must end with `AbruptTermination`.
#[test]
fn split_when_panicking_predicate_abruptly_terminates_live_substreams() {
    let outer = Source::from_iter([1, 2])
        .split_when(|item| {
            assert_ne!(*item, 2, "boom");
            false
        })
        .run_with(Sink::queue())
        .unwrap();

    let live = outer.pull().unwrap().unwrap();

    // Collect on a helper thread so the wait below can be bounded by a timeout.
    let (result_tx, result_rx) = mpsc::channel();
    thread::spawn(move || {
        result_tx.send(live.run_collect()).ok();
    });

    let substream_outcome = result_rx.recv_timeout(StdDuration::from_secs(1)).unwrap();
    assert_eq!(substream_outcome, Err(StreamError::AbruptTermination));
    assert!(matches!(outer.pull(), Err(StreamError::AbruptTermination)));
}
1639
// Splitting 0..100 at every non-zero multiple of ten must produce ten
// non-empty segments.
#[test]
fn split_when_pre_buffer_segments_match_expected_count() {
    let outer = Source::from_iter(0..100)
        .split_when(|item| *item != 0 && *item % 10 == 0)
        .run_with(Sink::queue())
        .unwrap();

    let mut segment_count = 0;
    // Each pull yields the next substream until the outer stream reports completion.
    for substream in std::iter::from_fn(|| outer.pull().unwrap()) {
        let items: Vec<i32> = substream.run_collect().unwrap();
        assert!(!items.is_empty(), "segment should not be empty");
        segment_count += 1;
    }

    assert_eq!(segment_count, 10, "100 elements in segments of 10");
}
1654
// Splitting 0..100 after every tenth element must produce ten non-empty
// segments that together contain all one hundred elements.
#[test]
fn split_after_pre_buffer_segments_match_expected_count() {
    let outer = Source::from_iter(0..100)
        .split_after(|item| (*item + 1) % 10 == 0)
        .run_with(Sink::queue())
        .unwrap();

    let mut segment_count = 0;
    // Counted in `usize`, the native type of `Vec::len`, so no narrowing cast is needed.
    let mut total = 0_usize;
    while let Some(substream) = outer.pull().unwrap() {
        let items: Vec<i32> = substream.run_collect().unwrap();
        assert!(!items.is_empty(), "segment should not be empty");
        total += items.len();
        segment_count += 1;
    }

    assert_eq!(segment_count, 10);
    assert_eq!(total, 100);
}
1672
// Every element maps to the same key, so a single substream must carry all
// one thousand elements in order, after which the outer stream ends.
#[test]
fn group_by_single_key_fused_matches_general_path() {
    let outer = Source::from_iter(0..1000i64)
        .group_by(1, |_| 0u8, false)
        .run_with(Sink::queue())
        .unwrap();

    let only_group = outer.pull().unwrap().unwrap();
    let items: Vec<i64> = only_group.run_collect().unwrap();

    assert_eq!(items.len(), 1000);
    assert_eq!(items.first(), Some(&0));
    assert_eq!(items.last(), Some(&999));

    let end = outer.pull().unwrap();
    assert!(end.is_none());
}
1686
// The key changes mid-stream and then returns to the first key: two
// substreams are expected, with the repeated key routed to the first one.
#[test]
fn group_by_single_key_fused_handles_key_change_with_substream_limit() {
    let outer = Source::from_iter([0, 1, 0])
        .group_by(2, |item| *item, false)
        .run_with(Sink::queue())
        .unwrap();

    // Drain the outer stream completely before running any substream.
    let sources: Vec<_> = std::iter::from_fn(|| outer.pull().unwrap()).collect();
    assert_eq!(sources.len(), 2);

    let zeros = sources[0].clone().run_collect().unwrap();
    assert_eq!(zeros, vec![0, 0]);

    let ones = sources[1].clone().run_collect().unwrap();
    assert_eq!(ones, vec![1]);
}
1701
// Twenty upstream elements, each mapped to a single-element inner source,
// must yield exactly twenty merged elements.
#[test]
fn flat_map_merge_lock_lighter_matches_expected_count() {
    let merged = Source::from_iter(0..20)
        .flat_map_merge(2, |item| Source::single(item + 100))
        .run_with(Sink::queue())
        .unwrap();

    // Pull until the queue reports completion, counting the delivered elements.
    let delivered = std::iter::from_fn(|| merged.pull().unwrap()).count();

    assert_eq!(delivered, 20);
}
1714
// Verifies that `group_by` hands out a substream before upstream completes,
// i.e. that it does not wait for all elements before delivering.
//
// Upstream is fed through a rendezvous channel, so after the first element
// it blocks until the test sends more. A helper thread pulls from the outer
// stream during that window; if the substream were only emitted on a key
// change or at end of input, that pull could not return in time. The
// original author's note attributes such blocking to an earlier
// "single_key_buf" batching path, a liveness problem for slow or infinite
// sources.
#[test]
fn group_by_single_key_emits_substream_before_upstream_completes() {
    // Capacity 0 makes every `send` wait until the upstream iterator receives it.
    let (tx, rx) = mpsc::sync_channel::<i32>(0);
    let shared_rx = StdArc::new(std::sync::Mutex::new(rx));

    let outer = Source::from_factory({
        let shared_rx = StdArc::clone(&shared_rx);
        move || {
            let shared_rx = StdArc::clone(&shared_rx);
            // Ends the stream once the sending side has been dropped.
            Box::new(std::iter::from_fn(move || {
                shared_rx.lock().unwrap().recv().ok().map(Ok)
            })) as BoxStream<i32>
        }
    })
    .group_by(1, |_| 0u8, false)
    .run_with(Sink::queue())
    .unwrap();

    // Consumer thread: pulls the first substream from the outer stream and
    // passes it back to the test thread.
    let (sub_tx, sub_rx) = mpsc::channel::<Source<i32>>();
    let puller = thread::spawn(move || {
        let substream = outer.pull().unwrap().expect("expected a substream");
        sub_tx.send(substream).unwrap();
    });

    // Feed only the first element; nothing else is available to upstream yet.
    tx.send(0).unwrap();

    // The substream must show up while upstream is still waiting for more input.
    let substream = sub_rx
        .recv_timeout(StdDuration::from_secs(5))
        .expect("timed out — group_by buffered first element before emitting substream");

    // Feed the remaining elements, then close the channel to complete upstream.
    (1..100_i32).for_each(|value| tx.send(value).unwrap());
    drop(tx);

    let items: Vec<i32> = substream.run_collect().unwrap();
    assert_eq!(items.len(), 100);
    puller.join().unwrap();
}
1771
1772    // Verify that split_when emits each substream as a live stream, NOT after
1773    // accumulating all segment elements into memory.  The channel is sized to
1774    // be larger than LIVE_SUBSTREAM_CAPACITY (256) so that, if the old Vec
1775    // pre-buffering path were active, the first substream would never be emitted
1776    // before the boundary.  With live substreams the substream is visible
1777    // immediately upon the first item.
1778    #[test]
1779    fn split_when_emits_substream_before_segment_ends() {
1780        // 512 > LIVE_SUBSTREAM_CAPACITY (256): the live substream will block
1781        // on backpressure mid-segment, but that's fine — the outer substream
1782        // IS emitted immediately and the consumer can drain it concurrently.
1783        const SEGMENT_LEN: usize = 300;
1784
1785        let (tx, rx) = mpsc::sync_channel::<i32>(SEGMENT_LEN * 2 + 4);
1786        for i in 0..SEGMENT_LEN as i32 {
1787            tx.send(i).unwrap();
1788        }
1789        tx.send(-1).unwrap(); // boundary
1790        tx.send(99).unwrap(); // second segment
1791        drop(tx);
1792
1793        let outer = Source::from_iter(rx)
1794            .split_when(|item| *item == -1)
1795            .run_with(Sink::queue())
1796            .unwrap();
1797
1798        let (result_tx, result_rx) = mpsc::channel();
1799        thread::spawn(move || {
1800            let first = outer.pull().unwrap().expect("expected first substream");
1801            let items: Vec<i32> = first.run_collect().unwrap();
1802            let second = outer.pull().unwrap().expect("expected second substream");
1803            let items2: Vec<i32> = second.run_collect().unwrap();
1804            let done = outer.pull().unwrap().is_none();
1805            let _ = result_tx.send((items, items2, done));
1806        });
1807
1808        let (items, items2, done) = result_rx
1809            .recv_timeout(StdDuration::from_secs(5))
1810            .expect("timed out — split_when is buffering the whole segment");
1811        assert_eq!(items.len(), SEGMENT_LEN);
1812        assert_eq!(items2, vec![-1, 99]);
1813        assert!(done);
1814    }
1815
1816    #[test]
1817    fn split_after_emits_substream_before_segment_ends() {
1818        const SEGMENT_LEN: usize = 300;
1819
1820        let (tx, rx) = mpsc::sync_channel::<i32>(SEGMENT_LEN * 2 + 4);
1821        for i in 0..SEGMENT_LEN as i32 {
1822            tx.send(i).unwrap();
1823        }
1824        tx.send(-1).unwrap(); // boundary (included in first segment for split_after)
1825        tx.send(99).unwrap();
1826        drop(tx);
1827
1828        let outer = Source::from_iter(rx)
1829            .split_after(|item| *item == -1)
1830            .run_with(Sink::queue())
1831            .unwrap();
1832
1833        let (result_tx, result_rx) = mpsc::channel();
1834        thread::spawn(move || {
1835            let first = outer.pull().unwrap().expect("expected first substream");
1836            let items: Vec<i32> = first.run_collect().unwrap();
1837            let second = outer.pull().unwrap().expect("expected second substream");
1838            let items2: Vec<i32> = second.run_collect().unwrap();
1839            let done = outer.pull().unwrap().is_none();
1840            let _ = result_tx.send((items, items2, done));
1841        });
1842
1843        let (items, items2, done) = result_rx
1844            .recv_timeout(StdDuration::from_secs(5))
1845            .expect("timed out — split_after is buffering the whole segment");
1846        // split_after includes the boundary element in the first segment.
1847        assert_eq!(items.len(), SEGMENT_LEN + 1);
1848        assert_eq!(items2, vec![99]);
1849        assert!(done);
1850    }
1851
1852    // Stress test: run flat_map_merge 20 times with many concurrent inner
1853    // streams to confirm the fixed coordinator tail loop never hangs.
1854    // Intended to be run with --test-threads=32 to expose the race.
1855    #[test]
1856    fn flat_map_merge_coordinator_no_lost_wakeup_stress() {
1857        for _ in 0..20 {
1858            let result = Source::from_iter(0..50_i32)
1859                .flat_map_merge(8, |item| Source::from_iter(item..item + 3))
1860                .run_with(Sink::fold(0i64, |acc, item| acc + item as i64))
1861                .unwrap()
1862                .wait();
1863            // Each i in 0..50 contributes i + (i+1) + (i+2) = 3i + 3.
1864            // Sum = 3 * (0+1+..+49) + 3*50 = 3*1225 + 150 = 3675 + 150 = 3825.
1865            assert_eq!(result, Ok(3825), "flat_map_merge produced wrong sum");
1866        }
1867    }
1868
1869    // Stress test for the single-mutex flat_map_merge refactor.
1870    // 20 runs with high concurrency to verify no deadlock, lost wakeup,
1871    // or window-accounting corruption under the merged-lock design.
1872    // Intended to be run with --test-threads=32.
1873    #[test]
1874    fn flat_map_merge_single_mutex_race_stress() {
1875        for _ in 0..20 {
1876            let result = Source::from_iter(0..100_i64)
1877                .flat_map_merge(16, |item| Source::from_iter([item, item + 1000]))
1878                .run_with(Sink::fold(0i64, |acc, v| acc + v))
1879                .unwrap()
1880                .wait();
1881            // sum of item + (item+1000) for item in 0..100
1882            // = sum(0..100) + sum(0..100) + 100*1000
1883            // = 4950 + 4950 + 100000 = 109900
1884            assert_eq!(result, Ok(109_900), "flat_map_merge single-mutex stress");
1885        }
1886    }
1887
1888    // Bounded-memory test for split_when batching: verify that items are
1889    // delivered to a slow consumer and not held indefinitely in the writer
1890    // buffer.  Uses a rendezvous-style channel where the producer sends MANY
1891    // items into the outer queue (larger than LIVE_SUBSTREAM_BATCH = 64), then
1892    // blocks.  The consumer must be able to drain the first segment fully — i.e.
1893    // the writer must have flushed even when the batch was smaller than 64.
1894    #[test]
1895    fn split_when_bounded_memory_rendezvous() {
1896        // Segment of 100 items (> LIVE_SUBSTREAM_BATCH=64): the writer will
1897        // flush at 64, then again at the boundary (remaining 36).
1898        const SEGMENT: usize = 100;
1899        let (tx, rx) = mpsc::sync_channel::<i32>(SEGMENT * 4);
1900        for i in 0..SEGMENT as i32 {
1901            tx.send(i).unwrap();
1902        }
1903        tx.send(-1).unwrap(); // boundary (split_when: starts new segment)
1904        // Second segment
1905        for i in 0..10_i32 {
1906            tx.send(i).unwrap();
1907        }
1908        drop(tx);
1909
1910        let outer = Source::from_iter(rx)
1911            .split_when(|item| *item == -1)
1912            .run_with(Sink::queue())
1913            .unwrap();
1914
1915        let (result_tx, result_rx) = mpsc::channel();
1916        thread::spawn(move || {
1917            let first = outer.pull().unwrap().expect("first segment");
1918            let seg1: Vec<i32> = first.run_collect().unwrap();
1919            let second = outer.pull().unwrap().expect("second segment");
1920            let seg2: Vec<i32> = second.run_collect().unwrap();
1921            let done = outer.pull().unwrap().is_none();
1922            result_tx.send((seg1, seg2, done)).unwrap();
1923        });
1924
1925        let (seg1, seg2, done) = result_rx
1926            .recv_timeout(StdDuration::from_secs(5))
1927            .expect("timed out — split_when writer held items past LIVE_SUBSTREAM_BATCH");
1928        assert_eq!(seg1.len(), SEGMENT, "first segment length");
1929        assert_eq!(seg2[0], -1, "boundary element starts second segment");
1930        assert_eq!(seg2.len(), 11, "second segment: boundary + 10 items");
1931        assert!(done);
1932    }
1933
1934    // Bounded-memory test for group_by batching: verifies that a slow consumer
1935    // waiting on a single-key substream eventually receives all items (i.e. the
1936    // write batch is flushed, not held indefinitely).
1937    #[test]
1938    fn group_by_single_key_bounded_memory_rendezvous() {
1939        // 200 items, all same key, batched in groups of 64 by the writer.
1940        // The consumer blocks on run_collect(); all items must arrive.
1941        const N: usize = 200;
1942        let outer = Source::from_iter(0..N as i64)
1943            .group_by(1, |_| 0u8, false)
1944            .run_with(Sink::queue())
1945            .unwrap();
1946
1947        let (result_tx, result_rx) = mpsc::channel();
1948        thread::spawn(move || {
1949            let substream = outer.pull().unwrap().expect("substream");
1950            let items: Vec<i64> = substream.run_collect().unwrap();
1951            let done = outer.pull().unwrap().is_none();
1952            result_tx.send((items, done)).unwrap();
1953        });
1954
1955        let (items, done) = result_rx
1956            .recv_timeout(StdDuration::from_secs(5))
1957            .expect("timed out — group_by write batch held items beyond LIVE_SUBSTREAM_BATCH");
1958        assert_eq!(items.len(), N, "all items delivered");
1959        assert_eq!(items[0], 0);
1960        assert_eq!(items[N - 1], (N - 1) as i64);
1961        assert!(done);
1962    }
1963
1964    #[test]
1965    fn scan_emits_seed_and_accumulated_values() {
1966        let result = Source::from_iter(1..=3)
1967            .scan(0, |acc, item| acc + item)
1968            .run_collect()
1969            .unwrap();
1970
1971        assert_eq!(result, vec![0, 1, 3, 6]);
1972    }
1973
1974    #[test]
1975    fn limit_fails_after_max_elements() {
1976        let result = Source::from_iter(0..3).limit(2).run_collect();
1977
1978        assert_eq!(result, Err(StreamError::LimitExceeded { max: 2 }));
1979    }
1980
1981    #[test]
1982    fn limit_weighted_fails_with_limit_error_like_akka() {
1983        let result = Source::from_iter(["this", "is", "some", "string"])
1984            .via(Flow::identity().limit_weighted(15, |item: &&str| item.len()))
1985            .run_collect();
1986
1987        assert_eq!(result, Err(StreamError::LimitExceeded { max: 15 }));
1988    }
1989
1990    #[test]
1991    fn grouped_weighted_allows_oversized_first_element_like_akka() {
1992        let result = Source::from_iter([10_usize, 1, 2])
1993            .via(Flow::identity().grouped_weighted(5, |item: &usize| *item))
1994            .run_collect()
1995            .unwrap();
1996
1997        assert_eq!(result, vec![vec![10], vec![1, 2]]);
1998    }
1999
2000    #[test]
2001    fn grouped_weighted_keeps_oversized_later_element_in_current_group_like_akka() {
2002        let result = Source::from_iter([1_usize, 10, 2])
2003            .via(Flow::identity().grouped_weighted(5, |item: &usize| *item))
2004            .run_collect()
2005            .unwrap();
2006
2007        assert_eq!(result, vec![vec![1, 10], vec![2]]);
2008    }
2009
2010    #[test]
2011    fn sink_terminals_materialize_results() {
2012        let sum = Source::from_iter(1..=4)
2013            .run_with(Sink::fold(0, |acc, item| acc + item))
2014            .unwrap();
2015
2016        assert_eq!(wait(sum), 10);
2017        assert_eq!(
2018            wait(Source::from_iter(1..=4).run_with(Sink::head()).unwrap()),
2019            1
2020        );
2021        assert_eq!(
2022            wait(Source::from_iter(1..=4).run_with(Sink::last()).unwrap()),
2023            4
2024        );
2025    }
2026
2027    #[test]
2028    fn all_terminal_sink_variants_complete() {
2029        assert_eq!(
2030            wait(
2031                Source::from_iter([1, 2, 3])
2032                    .run_with(Sink::collect())
2033                    .unwrap()
2034            ),
2035            vec![1, 2, 3]
2036        );
2037        assert_eq!(
2038            wait(
2039                Source::<i32>::empty()
2040                    .run_with(Sink::head_option())
2041                    .unwrap()
2042            ),
2043            None
2044        );
2045        assert_eq!(
2046            wait(
2047                Source::from_iter([1, 2, 3])
2048                    .run_with(Sink::last_option())
2049                    .unwrap()
2050            ),
2051            Some(3)
2052        );
2053        assert_eq!(
2054            wait(
2055                Source::from_iter([1, 2, 3])
2056                    .run_with(Sink::reduce(|acc, item| acc + item))
2057                    .unwrap()
2058            ),
2059            6
2060        );
2061
2062        let seen = StdArc::new(StdAtomicUsize::new(0));
2063        let seen_by_sink = StdArc::clone(&seen);
2064        assert_eq!(
2065            wait(
2066                Source::from_iter([1_usize, 2, 3])
2067                    .run_with(Sink::foreach(move |item| {
2068                        seen_by_sink.fetch_add(item, StdOrdering::SeqCst);
2069                    }))
2070                    .unwrap()
2071            ),
2072            NotUsed
2073        );
2074        assert_eq!(seen.load(StdOrdering::SeqCst), 6);
2075    }
2076
2077    #[test]
2078    fn take_last_zero_returns_empty_vector() {
2079        let result = Source::from_iter([1, 2, 3])
2080            .run_with(Sink::take_last(0))
2081            .unwrap();
2082
2083        assert_eq!(wait(result), Vec::<i32>::new());
2084    }
2085
2086    #[test]
2087    fn bounded_head_terminals_complete_inline() {
2088        let materializer = Materializer::new();
2089
2090        let mut head = Source::from_iter(0_u64..1_000)
2091            .run_with_materializer(Sink::head(), &materializer)
2092            .unwrap();
2093        assert_eq!(materializer.active_streams(), 0);
2094        assert_eq!(head.try_wait(), Some(Ok(0)));
2095
2096        let mut filtered_head = Source::from_iter(0_u64..1_000)
2097            .filter(|item| *item >= 10)
2098            .run_with_materializer(Sink::head(), &materializer)
2099            .unwrap();
2100        assert_eq!(materializer.active_streams(), 0);
2101        assert_eq!(filtered_head.try_wait(), Some(Ok(10)));
2102
2103        let mut head_option = Source::<u64>::empty()
2104            .run_with_materializer(Sink::head_option(), &materializer)
2105            .unwrap();
2106        assert_eq!(materializer.active_streams(), 0);
2107        assert_eq!(head_option.try_wait(), Some(Ok(None)));
2108    }
2109
2110    #[test]
2111    fn bounded_head_fast_path_preserves_terminal_errors() {
2112        let materializer = Materializer::new();
2113
2114        let mut empty = Source::<u64>::empty()
2115            .run_with_materializer(Sink::head(), &materializer)
2116            .unwrap();
2117        assert_eq!(empty.try_wait(), Some(Err(StreamError::EmptyStream)));
2118
2119        let mut failed = Source::<u64>::failed(StreamError::Failed("boom".into()))
2120            .run_with_materializer(Sink::head(), &materializer)
2121            .unwrap();
2122        assert_eq!(
2123            failed.try_wait(),
2124            Some(Err(StreamError::Failed("boom".into())))
2125        );
2126        assert_eq!(materializer.active_streams(), 0);
2127    }
2128
2129    #[test]
2130    fn runnable_graph_composes_source_and_sink() {
2131        let graph = Source::from_iter(1..=4)
2132            .map(|item| item * 2)
2133            .to_mat(Sink::fold(0, |acc, item| acc + item), Keep::right);
2134
2135        assert_eq!(wait(graph.run().unwrap()), 20);
2136
2137        let graph = Source::single(1)
2138            .map_materialized_value(|_| 20)
2139            .to(Sink::ignore())
2140            .map_materialized_value(|value| value + 1);
2141        assert_eq!(graph.run().unwrap(), 21);
2142
2143        let ignored = Source::single(1).to(Sink::ignore()).run().unwrap();
2144        assert_eq!(ignored, NotUsed);
2145    }
2146
2147    #[test]
2148    fn materialized_values_follow_keep_defaults() {
2149        let source = Source::single(1).map_materialized_value(|_| "source");
2150        let flow = Flow::identity().map_materialized_value(|_| "flow");
2151
2152        let source_mat = source.clone().via(flow.clone()).to(Sink::ignore()).run();
2153        assert_eq!(source_mat.unwrap(), "source");
2154
2155        let combined = source
2156            .via_mat(flow, Keep::both)
2157            .to_mat(Sink::ignore(), Keep::both)
2158            .run()
2159            .unwrap();
2160        assert_eq!(combined.0, ("source", "flow"));
2161        assert_eq!(wait(combined.1), NotUsed);
2162
2163        let sink_mat = Source::single(41)
2164            .map_materialized_value(|_| "ignored source")
2165            .run_with(Sink::fold(1, |acc, item| acc + item))
2166            .unwrap();
2167        assert_eq!(wait(sink_mat), 42);
2168    }
2169
2170    #[test]
2171    fn flow_to_sink_preserves_flow_materialized_value_by_default() {
2172        let sink = Flow::identity()
2173            .map(|item: i32| item + 1)
2174            .map_materialized_value(|_| "flow")
2175            .to(Sink::fold(0, |acc, item| acc + item));
2176
2177        let materialized = Source::from_iter([1, 2, 3]).run_with(sink).unwrap();
2178
2179        assert_eq!(materialized, "flow");
2180        let explicit = Flow::identity()
2181            .map(|item: i32| item + 1)
2182            .map_materialized_value(|_| "flow")
2183            .to_mat(Sink::fold(0, |acc, item| acc + item), Keep::both)
2184            .run_with(Source::from_iter([1, 2, 3]))
2185            .unwrap();
2186        assert_eq!(explicit, NotUsed);
2187
2188        let explicit = Source::from_iter([1, 2, 3])
2189            .run_with(
2190                Flow::identity()
2191                    .map(|item: i32| item + 1)
2192                    .map_materialized_value(|_| "flow")
2193                    .to_mat(Sink::fold(0, |acc, item| acc + item), Keep::both),
2194            )
2195            .unwrap();
2196        assert_eq!(explicit.0, "flow");
2197        assert_eq!(wait(explicit.1), 9);
2198    }
2199
2200    #[test]
2201    fn materializer_shutdown_fails_materialization() {
2202        let materializer = Materializer::new();
2203        let named = materializer.with_name_prefix("test-stream");
2204        materializer.shutdown();
2205
2206        let graph = Source::single(1).to(Sink::ignore());
2207
2208        assert_eq!(named.name_prefix(), "test-stream");
2209        assert_eq!(
2210            graph.run_with_materializer(&named),
2211            Err(StreamError::AbruptTermination)
2212        );
2213    }
2214
2215    #[test]
2216    fn materializer_shutdown_fails_running_stream_completion() {
2217        let materializer = Materializer::new();
2218        let completion = Source::repeat(1)
2219            .run_with_materializer(Sink::ignore(), &materializer)
2220            .unwrap();
2221
2222        assert_eq!(materializer.active_streams(), 1);
2223        materializer.shutdown();
2224        assert_eq!(completion.wait(), Err(StreamError::AbruptTermination));
2225        assert_eq!(materializer.active_streams(), 0);
2226    }
2227
2228    #[test]
2229    fn dropped_stream_completion_cancels_running_stream() {
2230        let materializer = Materializer::new();
2231        let completion = Source::repeat(1)
2232            .run_with_materializer(Sink::ignore(), &materializer)
2233            .unwrap();
2234
2235        assert_eq!(materializer.active_streams(), 1);
2236        drop(completion);
2237        for _ in 0..50 {
2238            if materializer.active_streams() == 0 {
2239                break;
2240            }
2241            thread::sleep(Duration::from_millis(5));
2242        }
2243        assert_eq!(materializer.active_streams(), 0);
2244    }
2245
2246    #[test]
2247    fn runtime_timers_fire_cancel_and_stop_on_shutdown() {
2248        let materializer = Materializer::new();
2249        let (once_tx, once_rx) = mpsc::channel();
2250        let once = materializer.schedule_once(Duration::from_millis(5), move || {
2251            once_tx.send(()).unwrap();
2252        });
2253        once_rx.recv_timeout(Duration::from_millis(250)).unwrap();
2254        assert!(!once.is_cancelled());
2255
2256        let (cancelled_tx, cancelled_rx) = mpsc::channel();
2257        let cancelled = materializer.schedule_once(Duration::from_millis(25), move || {
2258            cancelled_tx.send(()).unwrap();
2259        });
2260        assert!(cancelled.cancel());
2261        assert!(!cancelled.cancel());
2262        assert!(cancelled.is_cancelled());
2263        assert!(
2264            cancelled_rx
2265                .recv_timeout(Duration::from_millis(75))
2266                .is_err()
2267        );
2268
2269        let fixed_delay_count = StdArc::new(StdAtomicUsize::new(0));
2270        let fixed_delay_task_count = StdArc::clone(&fixed_delay_count);
2271        let fixed_delay = materializer.schedule_with_fixed_delay(
2272            Duration::from_millis(1),
2273            Duration::from_millis(5),
2274            move || {
2275                fixed_delay_task_count.fetch_add(1, StdOrdering::SeqCst);
2276            },
2277        );
2278        thread::sleep(Duration::from_millis(25));
2279        assert!(fixed_delay_count.load(StdOrdering::SeqCst) > 0);
2280        fixed_delay.cancel();
2281
2282        let fixed_rate_count = StdArc::new(StdAtomicUsize::new(0));
2283        let fixed_rate_task_count = StdArc::clone(&fixed_rate_count);
2284        let fixed_rate = materializer.schedule_at_fixed_rate(
2285            Duration::from_millis(1),
2286            Duration::from_millis(5),
2287            move || {
2288                fixed_rate_task_count.fetch_add(1, StdOrdering::SeqCst);
2289            },
2290        );
2291        thread::sleep(Duration::from_millis(25));
2292        assert!(fixed_rate_count.load(StdOrdering::SeqCst) > 0);
2293        fixed_rate.cancel();
2294
2295        let shutdown_materializer = Materializer::new();
2296        let (shutdown_tx, shutdown_rx) = mpsc::channel();
2297        shutdown_materializer.schedule_once(Duration::from_millis(25), move || {
2298            shutdown_tx.send(()).unwrap();
2299        });
2300        shutdown_materializer.shutdown();
2301        assert!(shutdown_rx.recv_timeout(Duration::from_millis(75)).is_err());
2302    }
2303
2304    #[test]
2305    fn runtime_timer_driver_preserves_fixed_rate_cadence_under_slow_tasks() {
2306        let interval = Duration::from_millis(20);
2307        let task_time = Duration::from_millis(50);
2308
2309        let rate_materializer = Materializer::new();
2310        let (rate_tx, rate_rx) = mpsc::channel();
2311        let fixed_rate =
2312            rate_materializer.schedule_at_fixed_rate(Duration::ZERO, interval, move || {
2313                thread::sleep(task_time);
2314                rate_tx.send(Instant::now()).unwrap();
2315            });
2316        let rate_first = rate_rx.recv_timeout(Duration::from_millis(250)).unwrap();
2317        let rate_second = rate_rx.recv_timeout(Duration::from_millis(250)).unwrap();
2318        fixed_rate.cancel();
2319        rate_materializer.shutdown();
2320
2321        let delay_materializer = Materializer::new();
2322        let (delay_tx, delay_rx) = mpsc::channel();
2323        let fixed_delay =
2324            delay_materializer.schedule_with_fixed_delay(Duration::ZERO, interval, move || {
2325                thread::sleep(task_time);
2326                delay_tx.send(Instant::now()).unwrap();
2327            });
2328        let delay_first = delay_rx.recv_timeout(Duration::from_millis(250)).unwrap();
2329        let delay_second = delay_rx.recv_timeout(Duration::from_millis(250)).unwrap();
2330        fixed_delay.cancel();
2331        delay_materializer.shutdown();
2332
2333        let rate_gap = rate_second.duration_since(rate_first);
2334        let delay_gap = delay_second.duration_since(delay_first);
2335        assert!(
2336            delay_gap > rate_gap + Duration::from_millis(12),
2337            "fixed-rate gap {rate_gap:?} should stay meaningfully below fixed-delay gap {delay_gap:?}",
2338        );
2339    }
2340
2341    #[test]
2342    fn runtime_repeating_timer_cancellation_stops_future_fires() {
2343        let materializer = Materializer::new();
2344        let (tx, rx) = mpsc::channel();
2345        let timer = materializer.schedule_at_fixed_rate(
2346            Duration::from_millis(1),
2347            Duration::from_millis(30),
2348            move || {
2349                tx.send(()).unwrap();
2350            },
2351        );
2352
2353        rx.recv_timeout(Duration::from_millis(250)).unwrap();
2354        assert!(timer.cancel());
2355        assert!(rx.recv_timeout(Duration::from_millis(90)).is_err());
2356        materializer.shutdown();
2357    }
2358
2359    #[test]
2360    fn runtime_panicking_once_timer_does_not_kill_driver_or_later_timers() {
2361        let materializer = Materializer::new();
2362        materializer.schedule_once(Duration::from_millis(1), || {
2363            panic!("timer boom");
2364        });
2365
2366        let (tx, rx) = mpsc::channel();
2367        materializer.schedule_once(Duration::from_millis(20), move || {
2368            tx.send(()).unwrap();
2369        });
2370
2371        rx.recv_timeout(Duration::from_millis(250)).unwrap();
2372        materializer.shutdown();
2373    }
2374
2375    #[test]
2376    fn runtime_panicking_fixed_rate_timer_stops_itself_and_leaves_driver_alive() {
2377        let materializer = Materializer::new();
2378        let panic_count = StdArc::new(StdAtomicUsize::new(0));
2379        let panic_count_task = StdArc::clone(&panic_count);
2380        materializer.schedule_at_fixed_rate(Duration::ZERO, Duration::from_millis(20), move || {
2381            panic_count_task.fetch_add(1, StdOrdering::SeqCst);
2382            panic!("fixed-rate boom");
2383        });
2384
2385        assert!(wait_until(Duration::from_millis(150), || {
2386            panic_count.load(StdOrdering::SeqCst) == 1
2387        }));
2388
2389        let (tx, rx) = mpsc::channel();
2390        materializer.schedule_once(Duration::from_millis(30), move || {
2391            tx.send(()).unwrap();
2392        });
2393        rx.recv_timeout(Duration::from_millis(250)).unwrap();
2394
2395        thread::sleep(Duration::from_millis(90));
2396        assert_eq!(panic_count.load(StdOrdering::SeqCst), 1);
2397        materializer.shutdown();
2398    }
2399
2400    #[test]
2401    fn runtime_slow_timer_task_does_not_delay_unrelated_timers() {
2402        let materializer = Materializer::new();
2403        let started = StdArc::new(StdAtomicBool::new(false));
2404        let started_task = StdArc::clone(&started);
2405        let slow_timer = materializer.schedule_at_fixed_rate(
2406            Duration::ZERO,
2407            Duration::from_millis(250),
2408            move || {
2409                started_task.store(true, StdOrdering::SeqCst);
2410                thread::sleep(Duration::from_millis(200));
2411            },
2412        );
2413
2414        assert!(wait_until(Duration::from_millis(100), || {
2415            started.load(StdOrdering::SeqCst)
2416        }));
2417
2418        let start = Instant::now();
2419        let (tx, rx) = mpsc::channel();
2420        materializer.schedule_once(Duration::from_millis(10), move || {
2421            tx.send(Instant::now()).unwrap();
2422        });
2423        let fired_at = rx.recv_timeout(Duration::from_millis(350)).unwrap();
2424        let elapsed = fired_at.duration_since(start);
2425
2426        slow_timer.cancel();
2427        materializer.shutdown();
2428        assert!(
2429            elapsed < Duration::from_millis(150),
2430            "unrelated timer was delayed by a blocking timer task: {elapsed:?}",
2431        );
2432    }
2433
2434    #[test]
2435    fn runtime_shutdown_stops_timer_driver_thread() {
2436        let materializer = Materializer::new();
2437        assert!(wait_until(Duration::from_secs(1), || materializer
2438            .timer_driver_is_live()));
2439
2440        materializer.shutdown();
2441        assert!(wait_until(Duration::from_secs(2), || !materializer
2442            .timer_driver_is_live()));
2443    }
2444
2445    #[test]
2446    fn runtime_timer_driver_orders_many_timers_by_deadline() {
2447        let materializer = Materializer::new();
2448        let (tx, rx) = mpsc::channel();
2449        let schedule = [(45_u64, 4_u8), (5, 1), (35, 3), (15, 2), (55, 5)];
2450
2451        for (delay_ms, value) in schedule {
2452            let tx = tx.clone();
2453            materializer.schedule_once(Duration::from_millis(delay_ms), move || {
2454                tx.send(value).unwrap();
2455            });
2456        }
2457        drop(tx);
2458
2459        let mut received = Vec::new();
2460        for _ in 0..schedule.len() {
2461            received.push(rx.recv_timeout(Duration::from_millis(250)).unwrap());
2462        }
2463        materializer.shutdown();
2464
2465        assert_eq!(received, vec![1, 2, 3, 4, 5]);
2466    }
2467
2468    #[test]
2469    fn runtime_timer_driver_uses_one_thread_per_runtime_regardless_of_timer_count() {
2470        let materializer = Materializer::new();
2471        let thread_name = materializer.timer_thread_name().to_owned();
2472        assert!(wait_until(Duration::from_millis(200), || {
2473            linux_thread_count(&thread_name) == 1
2474        }));
2475
2476        for _ in 0..128 {
2477            materializer.schedule_once(Duration::from_secs(60), || {});
2478        }
2479
2480        assert!(
2481            linux_thread_count(&thread_name) == 1,
2482            "scheduling timers should not create extra timer threads for a runtime",
2483        );
2484        materializer.shutdown();
2485        assert!(wait_until(Duration::from_millis(200), || {
2486            linux_thread_count(&thread_name) == 0
2487        }));
2488    }
2489
2490    #[test]
2491    fn cancelled_and_never_sinks_have_distinct_materialization_results() {
2492        assert_eq!(
2493            Source::repeat(1)
2494                .run_with(Sink::cancelled())
2495                .expect("cancelled sink materializes"),
2496            NotUsed
2497        );
2498        assert_eq!(
2499            Source::single(1)
2500                .run_with(Sink::never())
2501                .expect("never sink materializes")
2502                .try_wait(),
2503            None
2504        );
2505    }
2506
2507    #[test]
2508    fn never_sink_finishes_on_materializer_shutdown() {
2509        let materializer = Materializer::new();
2510        let completion = Source::single(1)
2511            .run_with_materializer(Sink::never(), &materializer)
2512            .unwrap();
2513
2514        materializer.shutdown();
2515        assert_eq!(completion.wait(), Err(StreamError::AbruptTermination));
2516    }
2517
/// Dropping the completion of a `Source::never()` stream brings the
/// materializer's active-stream count back to zero.
#[test]
fn dropping_source_never_completion_releases_parked_worker() {
    let mat = Materializer::new();
    let never_completion = Source::<i32>::never()
        .run_with_materializer(Sink::ignore(), &mat)
        .unwrap();

    // Wait (bounded) for the stream to show up as active before dropping.
    let startup_window = StdDuration::from_secs(1);
    let worker_registered = wait_until(startup_window, || mat.active_streams() == 1);
    assert!(worker_registered);
    assert_eq!(mat.active_streams(), 1);

    drop(never_completion);

    // Wait (bounded) for the count to fall back to zero after the drop.
    let release_window = StdDuration::from_secs(15);
    let worker_released = wait_until(release_window, || mat.active_streams() == 0);
    assert!(worker_released);
    assert_eq!(mat.active_streams(), 0);
}
2537
/// `Source::future`, `Source::future_source` and `Source::maybe` each emit the
/// expected values; an uncompleted `maybe` source fails with `MaybeIncomplete`.
#[test]
fn future_and_maybe_sources_emit_values() {
    let from_future = Source::future(|| async { Ok(7) }).run_collect().unwrap();
    assert_eq!(from_future, vec![7]);

    let flattened = Source::future_source(|| async { Ok(Source::from_iter([1, 2, 3])) })
        .run_collect()
        .unwrap();
    assert_eq!(flattened, vec![1, 2, 3]);

    let (completer, maybe_source) = Source::maybe();

    // Running a clone before the handle is completed reports the incomplete state.
    let before_completion = maybe_source.clone().run_collect();
    assert_eq!(before_completion, Err(StreamError::MaybeIncomplete));

    completer.complete(9).unwrap();
    let after_completion = maybe_source.run_collect().unwrap();
    assert_eq!(after_completion, vec![9]);
}
2556
/// Checks `cycle`, `unfold`, `unfold_async` and `lazy_single`: emitted values
/// on success, and `StreamError::Failed` for an empty cycle or a panicking factory.
#[test]
fn wp6b_source_generators_emit_and_fail_like_stream_errors() {
    let cycled = Source::cycle(|| [1, 2, 3].into_iter())
        .take(8)
        .run_collect()
        .unwrap();
    assert_eq!(cycled, vec![1, 2, 3, 1, 2, 3, 1, 2]);

    let empty_cycle = Source::<i32>::cycle(std::iter::empty::<i32>).run_collect();
    assert_eq!(
        empty_cycle,
        Err(StreamError::Failed("empty iterator".into()))
    );

    let unfolded = Source::unfold(0, |state| (state < 4).then_some((state + 1, state)))
        .run_collect()
        .unwrap();
    assert_eq!(unfolded, vec![0, 1, 2, 3]);

    let unfolded_async = Source::unfold_async(0, |state| async move {
        Ok((state < 4).then_some((state + 1, state * 2)))
    })
    .run_collect()
    .unwrap();
    assert_eq!(unfolded_async, vec![0, 2, 4, 6]);

    let panicking_factory = Source::<i32>::lazy_single(|| panic!("boom")).run_collect();
    assert!(matches!(
        panicking_factory,
        Err(StreamError::Failed(message)) if message == "lazy_single factory panicked"
    ));
}
2589
/// Lazy sources must not invoke their factory before the first pull, and the
/// deferred materialized value resolves once the factory has run.
#[test]
fn wp6b_lazy_sources_defer_until_first_pull_and_complete_deferred_mat() {
    // lazy_source: drive the factory directly so pulls can be interleaved with checks.
    let factory_runs = StdArc::new(StdAtomicUsize::new(0));
    let factory_runs_in_source = StdArc::clone(&factory_runs);
    let lazy = Source::<i32>::lazy_source(move || {
        factory_runs_in_source.fetch_add(1, StdOrdering::SeqCst);
        Source::from_iter([7, 8]).map_materialized_value(|_| 99)
    });
    let materializer = Materializer::new();
    let (mut items, mut deferred_mat) = StdArc::clone(&lazy.factory)
        .create(&materializer)
        .unwrap();

    // Nothing has been pulled yet: no factory call, no materialized value.
    assert_eq!(factory_runs.load(StdOrdering::SeqCst), 0);
    assert!(deferred_mat.try_wait().is_none());

    let first_item = items.next().unwrap().unwrap();
    assert_eq!(first_item, 7);
    let resolved_mat = deferred_mat.wait().unwrap();
    assert_eq!(resolved_mat, 99);
    assert_eq!(factory_runs.load(StdOrdering::SeqCst), 1);
    let second_item = items.next().unwrap().unwrap();
    assert_eq!(second_item, 8);

    // lazy_future_source into a cancelled sink: the factory is never invoked
    // and the materialized value fails.
    let unused_factory_runs = StdArc::new(StdAtomicUsize::new(0));
    let unused_runs_in_source = StdArc::clone(&unused_factory_runs);
    let cancelled_mat = Source::<i32>::lazy_future_source(move || {
        unused_runs_in_source.fetch_add(1, StdOrdering::SeqCst);
        async { Ok(Source::single(1)) }
    })
    .to(Sink::cancelled())
    .run()
    .unwrap();
    assert!(matches!(cancelled_mat.wait(), Err(StreamError::Failed(_))));
    assert_eq!(unused_factory_runs.load(StdOrdering::SeqCst), 0);

    // lazy_future: the factory runs on the first pull, not at creation.
    let future_runs = StdArc::new(StdAtomicUsize::new(0));
    let future_runs_in_source = StdArc::clone(&future_runs);
    let lazy_value = Source::lazy_future(move || {
        future_runs_in_source.fetch_add(1, StdOrdering::SeqCst);
        async { Ok(42) }
    });
    let (mut value_items, _) = StdArc::clone(&lazy_value.factory)
        .create(&Materializer::new())
        .unwrap();
    assert_eq!(future_runs.load(StdOrdering::SeqCst), 0);
    let only_item = value_items.next().unwrap().unwrap();
    assert_eq!(only_item, 42);
    assert_eq!(future_runs.load(StdOrdering::SeqCst), 1);
}
2635
/// `unfold_resource` must call its close callback exactly once on normal
/// completion, on a read failure, and after downstream cancellation.
#[test]
fn wp6b_unfold_resource_closes_on_completion_failure_and_cancellation() {
    // Completion: the resource is drained, then closed once.
    let completion_closes = StdArc::new(StdAtomicUsize::new(0));
    let completion_closes_in_cb = StdArc::clone(&completion_closes);
    let drained = Source::unfold_resource(
        || Ok(std::collections::VecDeque::from([1, 2, 3])),
        |pending| Ok(pending.pop_front()),
        move |_pending| {
            completion_closes_in_cb.fetch_add(1, StdOrdering::SeqCst);
            Ok(())
        },
    )
    .run_collect()
    .unwrap();
    assert_eq!(drained, vec![1, 2, 3]);
    assert_eq!(completion_closes.load(StdOrdering::SeqCst), 1);

    // Failure: the read error is reported even though close fails as well.
    let failure_closes = StdArc::new(StdAtomicUsize::new(0));
    let failure_closes_in_cb = StdArc::clone(&failure_closes);
    let read_failure = Source::<i32>::unfold_resource(
        || Ok(()),
        |_| Err(StreamError::Failed("read".into())),
        move |_| {
            failure_closes_in_cb.fetch_add(1, StdOrdering::SeqCst);
            Err(StreamError::Failed("close".into()))
        },
    )
    .run_collect();
    assert_eq!(read_failure, Err(StreamError::Failed("read".into())));
    assert_eq!(failure_closes.load(StdOrdering::SeqCst), 1);

    // Cancellation: `Sink::head()` takes one element from an endless counter.
    let cancel_closes = StdArc::new(StdAtomicUsize::new(0));
    let cancel_closes_in_cb = StdArc::clone(&cancel_closes);
    let head = Source::unfold_resource(
        || Ok(0_usize),
        |counter| {
            let current = *counter;
            *counter += 1;
            Ok(Some(current))
        },
        move |_| {
            cancel_closes_in_cb.fetch_add(1, StdOrdering::SeqCst);
            Ok(())
        },
    )
    .run_with(Sink::head())
    .unwrap();
    assert_eq!(head.wait().unwrap(), 0);
    // The close is observed with a bounded wait rather than an immediate load.
    let closed_after_cancel = wait_until(Duration::from_millis(250), || {
        cancel_closes.load(StdOrdering::SeqCst) == 1
    });
    assert!(closed_after_cancel);
}
2688
/// Covers `unfold_resource_async` (close on completion and on failure) and
/// checks that `scan_async` / `fold_async` never run two steps at once.
#[test]
fn wp6b_async_resource_and_async_accumulators_are_sequential() {
    // Completion path: items are drained and the resource is closed once.
    let completion_closes = StdArc::new(StdAtomicUsize::new(0));
    let completion_closes_in_cb = StdArc::clone(&completion_closes);
    let drained = Source::unfold_resource_async(
        || async { Ok(std::collections::VecDeque::from([1, 2, 3])) },
        |pending| {
            let next = pending.pop_front();
            async move { Ok(next) }
        },
        move |_pending| {
            let counter = StdArc::clone(&completion_closes_in_cb);
            async move {
                counter.fetch_add(1, StdOrdering::SeqCst);
                Ok(())
            }
        },
    )
    .run_collect()
    .unwrap();
    assert_eq!(drained, vec![1, 2, 3]);
    assert_eq!(completion_closes.load(StdOrdering::SeqCst), 1);

    // Failure path: the read error wins over the close error; close still runs once.
    let failure_closes = StdArc::new(StdAtomicUsize::new(0));
    let failure_closes_in_cb = StdArc::clone(&failure_closes);
    let read_failure = Source::<i32>::unfold_resource_async(
        || async { Ok(()) },
        |_resource| async { Err(StreamError::Failed("read".into())) },
        move |_resource| {
            let counter = StdArc::clone(&failure_closes_in_cb);
            async move {
                counter.fetch_add(1, StdOrdering::SeqCst);
                Err(StreamError::Failed("close".into()))
            }
        },
    )
    .run_collect();
    assert_eq!(read_failure, Err(StreamError::Failed("read".into())));
    assert_eq!(failure_closes.load(StdOrdering::SeqCst), 1);

    // scan_async: track in-flight steps and the highest concurrency observed.
    let in_flight = StdArc::new(StdAtomicUsize::new(0));
    let peak = StdArc::new(StdAtomicUsize::new(0));
    let in_flight_in_stage = StdArc::clone(&in_flight);
    let peak_in_stage = StdArc::clone(&peak);
    let running_sums = Source::from_iter(1..=4)
        .scan_async(0, move |acc, item| {
            let in_flight = StdArc::clone(&in_flight_in_stage);
            let peak = StdArc::clone(&peak_in_stage);
            async move {
                let concurrent = in_flight.fetch_add(1, StdOrdering::SeqCst) + 1;
                peak.fetch_max(concurrent, StdOrdering::SeqCst);
                tokio::time::sleep(Duration::from_millis(1)).await;
                in_flight.fetch_sub(1, StdOrdering::SeqCst);
                Ok(acc + item)
            }
        })
        .run_collect()
        .unwrap();
    assert_eq!(running_sums, vec![0, 1, 3, 6, 10]);
    assert_eq!(peak.load(StdOrdering::SeqCst), 1);

    let total = Source::from_iter(1..=4)
        .fold_async(0, |acc, item| async move { Ok(acc + item) })
        .run_collect()
        .unwrap();
    assert_eq!(total, vec![10]);
}
2756
/// Materializing `fold_async` into `Sink::queue()` must return while the
/// source's first pull is still blocked on the gate, i.e. before upstream is drained.
#[test]
fn wp6b_fold_async_materialization_does_not_drain_upstream() {
    // (flag, condvar) gate: the source parks on its first pull until the flag is true.
    let gate = StdArc::new((std::sync::Mutex::new(false), std::sync::Condvar::new()));
    let pulled = StdArc::new(StdAtomicBool::new(false));
    let blocking_source = {
        let gate = StdArc::clone(&gate);
        let pulled = StdArc::clone(&pulled);
        Source::from_factory(move || {
            let gate = StdArc::clone(&gate);
            let pulled = StdArc::clone(&pulled);
            let mut emitted = false;
            Box::new(std::iter::from_fn(move || {
                // Single-element stream: every pull after the first ends it.
                if std::mem::replace(&mut emitted, true) {
                    return None;
                }
                pulled.store(true, StdOrdering::SeqCst);
                let (flag, changed) = &*gate;
                // Park until the test opens the gate; the guard is held until return.
                let _open = changed
                    .wait_while(flag.lock().unwrap(), |open| !*open)
                    .unwrap();
                Some(Ok(1))
            }))
        })
    };

    let (queue_tx, queue_rx) = mpsc::channel();
    let worker = thread::spawn(move || {
        let queue = blocking_source
            .fold_async(0, |acc, item| async move { Ok(acc + item) })
            .run_with(Sink::queue())
            .unwrap();
        queue_tx.send(queue).unwrap();
    });

    // Opens the gate and wakes any parked pull.
    let open_gate = || {
        let (flag, changed) = &*gate;
        *flag.lock().unwrap() = true;
        changed.notify_all();
    };

    let queue = match queue_rx.recv_timeout(StdDuration::from_secs(1)) {
        Ok(queue) => queue,
        Err(error) => {
            // Unblock the source so the worker can finish before failing the test.
            open_gate();
            let _ = worker.join();
            panic!("fold_async materialization did not return before first pull: {error}");
        }
    };

    let still_gated = !*gate.0.lock().unwrap();
    assert!(
        still_gated,
        "test source was released before materialization returned"
    );

    open_gate();
    let folded = queue.pull().unwrap();
    assert_eq!(folded, Some(1));
    let end_of_stream = queue.pull().unwrap();
    assert_eq!(end_of_stream, None);
    worker.join().unwrap();
    assert!(pulled.load(StdOrdering::SeqCst));
}
2817
/// Lazy sinks and lazy flows must not build their inner stage until a first
/// element arrives; also exercises `foreach_async` and `future_flow`.
#[test]
fn wp6b_lazy_sink_and_flow_wait_for_first_element() {
    // lazy_sink over an empty source: factory never runs and the mat fails.
    let sink_factory_runs = StdArc::new(StdAtomicUsize::new(0));
    let sink_runs_in_factory = StdArc::clone(&sink_factory_runs);
    let empty_mat = Source::<i32>::empty()
        .run_with(Sink::lazy_sink(move || {
            sink_runs_in_factory.fetch_add(1, StdOrdering::SeqCst);
            Sink::ignore()
        }))
        .unwrap();
    assert!(matches!(empty_mat.wait(), Err(StreamError::Failed(_))));
    assert_eq!(sink_factory_runs.load(StdOrdering::SeqCst), 0);

    // foreach_async: every element is added to the shared total.
    let total = StdArc::new(StdAtomicUsize::new(0));
    let total_in_sink = StdArc::clone(&total);
    Source::from_iter([1_usize, 2, 3])
        .run_with(Sink::foreach_async(2, move |item| {
            let total = StdArc::clone(&total_in_sink);
            async move {
                total.fetch_add(item, StdOrdering::SeqCst);
                Ok(())
            }
        }))
        .unwrap()
        .wait()
        .unwrap();
    assert_eq!(total.load(StdOrdering::SeqCst), 6);

    // lazy_flow: drive the runtime transform by hand to observe the deferral.
    let flow_factory_runs = StdArc::new(StdAtomicUsize::new(0));
    let flow_runs_in_factory = StdArc::clone(&flow_factory_runs);
    let deferred_flow = Flow::<i32, i32>::lazy_flow(move || {
        flow_runs_in_factory.fetch_add(1, StdOrdering::SeqCst);
        Flow::identity()
            .map(|item: i32| item + 10)
            .map_materialized_value(|_| 123)
    });
    let flow_mat = (deferred_flow.materialize)().unwrap();
    let mut transformed = match deferred_flow.transform {
        flow::FlowTransform::Pure(_) => panic!("lazy flow must be runtime-backed"),
        flow::FlowTransform::Runtime(transform) => {
            transform(Box::new([Ok(1), Ok(2)].into_iter()), &Materializer::new()).unwrap()
        }
    };
    assert_eq!(flow_factory_runs.load(StdOrdering::SeqCst), 0);
    let first_out = transformed.next().unwrap().unwrap();
    assert_eq!(first_out, 11);
    let resolved_mat = flow_mat.wait().unwrap();
    assert_eq!(resolved_mat, 123);
    assert_eq!(flow_factory_runs.load(StdOrdering::SeqCst), 1);
    let second_out = transformed.next().unwrap().unwrap();
    assert_eq!(second_out, 12);

    // future_flow: both the flow's mat and the collected output are checked.
    let future_flow_mats = Source::from_iter([1, 2])
        .via_mat(
            Flow::future_flow(|| async {
                Ok(Flow::identity()
                    .map(|item: i32| item * 2)
                    .map_materialized_value(|_| 77))
            }),
            Keep::right,
        )
        .to_mat(Sink::collect(), Keep::both)
        .run()
        .unwrap();
    assert_eq!(future_flow_mats.0.wait().unwrap(), 77);
    assert_eq!(future_flow_mats.1.wait().unwrap(), vec![2, 4]);
}
2882
/// One lazy-flow blueprint cloned and used twice in the same chain must pair
/// each materialized value with the transform instance in the same position.
#[test]
fn wp6b_lazy_flow_double_use_in_one_chain_pairs_instances_in_order() {
    // Order-sensitive pin for the per-thread FIFO slot pairing. Each factory
    // call takes a fresh id; the stage maps x -> x * 100 + id and exposes the
    // id as its mat. With input 0 the output is first_id * 100 + second_id,
    // whereas a swapped mat pairing would give second_id * 100 + first_id.
    for round in 0..50 {
        let next_id = StdArc::new(StdAtomicUsize::new(1));
        let next_id_in_factory = StdArc::clone(&next_id);
        let blueprint: Flow<usize, usize, _> = Flow::lazy_flow(move || {
            let id = next_id_in_factory.fetch_add(1, StdOrdering::SeqCst);
            Flow::identity()
                .map(move |x: usize| x * 100 + id)
                .map_materialized_value(move |_| id)
        });
        let blueprint_again = blueprint.clone();

        let ((first_mat, second_mat), collected) = Source::from_iter([0usize])
            .via_mat(blueprint, Keep::right)
            .via_mat(blueprint_again, Keep::both)
            .to_mat(Sink::collect(), Keep::both)
            .run()
            .unwrap();

        let first_id = first_mat.wait().unwrap();
        let second_id = second_mat.wait().unwrap();
        let output = collected.wait().unwrap();
        let element = output[0];
        let expected = first_id * 100 + second_id;
        assert_eq!(
            element, expected,
            "round {round}: mats ({first_id},{second_id}) cross-wired with transform order"
        );
        assert_ne!(
            first_id, second_id,
            "round {round}: same factory instance paired twice"
        );
    }
}
2921
/// Two threads materialize clones of one lazy flow at the same time; each
/// thread's output must match the id reported by its own materialized value.
#[test]
fn wp6b_lazy_flow_clones_materialize_concurrently_without_cross_wiring() {
    for _ in 0..20 {
        let issued_ids = StdArc::new(StdAtomicUsize::new(0));
        let issued_ids_in_factory = StdArc::clone(&issued_ids);
        let shared_flow = Flow::<i32, i32>::lazy_flow(move || {
            let id = issued_ids_in_factory.fetch_add(1, StdOrdering::SeqCst) + 1;
            Flow::identity()
                .map(move |item: i32| item + (id as i32 * 100))
                .map_materialized_value(move |_| id)
        });
        // Three parties: the two workers plus this thread.
        let start_line = StdArc::new(std::sync::Barrier::new(3));

        let launch = |input: i32| {
            let flow = shared_flow.clone();
            let start_line = StdArc::clone(&start_line);
            thread::spawn(move || {
                start_line.wait();
                let (mat, values) = Source::single(input)
                    .via_mat(flow, Keep::right)
                    .to_mat(Sink::collect(), Keep::both)
                    .run()
                    .unwrap();
                (input, mat.wait().unwrap(), values.wait().unwrap())
            })
        };

        let worker_a = launch(1);
        let worker_b = launch(2);
        start_line.wait();

        for (input, mat_id, values) in [worker_a.join().unwrap(), worker_b.join().unwrap()] {
            assert_eq!(values, vec![input + (mat_id as i32 * 100)]);
        }
        assert_eq!(issued_ids.load(StdOrdering::SeqCst), 2);
    }
}
2960
/// When upstream fails, `map_with_resource` emits the close callback's item
/// before the error; when the mapper fails, the map error is what is reported.
#[test]
fn wp6b_map_with_resource_emits_close_item_before_terminal_error() {
    let pulled = Source::from_factory(|| {
        Box::new(vec![Ok(1), Err(StreamError::Failed("upstream".into()))].into_iter())
    })
    .map_with_resource(
        || Ok(()),
        |_resource, item| Ok(item + 10),
        |_resource| Ok(Some(99)),
    )
    .run_with(Sink::queue())
    .unwrap();

    // Expected order: mapped element, close item, then the upstream failure.
    let mapped = pulled.pull().unwrap();
    assert_eq!(mapped, Some(11));
    let close_item = pulled.pull().unwrap();
    assert_eq!(close_item, Some(99));
    let terminal = pulled.pull();
    assert_eq!(terminal, Err(StreamError::Failed("upstream".into())));

    let map_failure: StreamResult<Vec<i32>> = Source::single(1)
        .map_with_resource(
            || Ok(()),
            |_resource, _item| -> StreamResult<i32> { Err(StreamError::Failed("map".into())) },
            |_resource| -> StreamResult<Option<i32>> {
                Err(StreamError::Failed("close".into()))
            },
        )
        .run_collect();
    assert_eq!(map_failure, Err(StreamError::Failed("map".into())));
}
2989
/// Exercises `stateful_map`, `stateful_map_concat`, `fold` and `reduce` on a
/// three-element source.
#[test]
fn stateful_and_terminal_source_operators_work() {
    let running_totals = Source::from_iter([1, 2, 3])
        .stateful_map(0, |total, item| {
            *total += item;
            *total
        })
        .run_collect()
        .unwrap();
    assert_eq!(running_totals, vec![1, 3, 6]);

    // Each input yields two outputs: the item itself and the running total.
    let item_and_total = Source::from_iter([1, 2, 3])
        .stateful_map_concat(0, |total, item| {
            *total += item;
            [item, *total]
        })
        .run_collect()
        .unwrap();
    assert_eq!(item_and_total, vec![1, 1, 2, 3, 3, 6]);

    let folded = Source::from_iter([1, 2, 3])
        .fold(10, |acc, item| acc + item)
        .run_collect()
        .unwrap();
    assert_eq!(folded, vec![16]);

    let reduced = Source::from_iter([1, 2, 3])
        .reduce(|acc, item| acc + item)
        .run_collect()
        .unwrap();
    assert_eq!(reduced, vec![6]);
}
3025
/// `map_concat` and `sliding` must produce their first output from an
/// unbounded input so that `take(1)` can finish.
#[test]
fn concat_and_sliding_emit_before_unbounded_upstream_finishes() {
    // The inner iterator `0_u64..` is unbounded; only its first element is taken.
    let first_concat = Source::single(())
        .map_concat(|_| 0_u64..)
        .take(1)
        .run_collect()
        .unwrap();
    assert_eq!(first_concat, vec![0]);

    // `Source::repeat` is the unbounded side here; only one window is taken.
    let first_window = Source::repeat(1_u64)
        .sliding(2, 1)
        .take(1)
        .run_collect()
        .unwrap();
    assert_eq!(first_window, vec![vec![1, 1]]);
}
3042
/// Output ordering of `concat`, `prepend`, `or_else` and `interleave` on sources.
#[test]
fn fan_in_source_operators_follow_ordering_rules() {
    let concatenated = Source::from_iter([1, 2])
        .concat(Source::from_iter([3, 4]))
        .run_collect()
        .unwrap();
    assert_eq!(concatenated, vec![1, 2, 3, 4]);

    let prepended = Source::from_iter([3, 4])
        .prepend(Source::from_iter([1, 2]))
        .run_collect()
        .unwrap();
    assert_eq!(prepended, vec![1, 2, 3, 4]);

    // Empty primary: the fallback's elements are emitted.
    let fallback_used = Source::empty()
        .or_else(Source::from_iter([10, 20]))
        .run_collect()
        .unwrap();
    assert_eq!(fallback_used, vec![10, 20]);

    // Non-empty primary: the fallback's elements do not appear.
    let fallback_skipped = Source::from_iter([1, 2])
        .or_else(Source::from_iter([10, 20]))
        .run_collect()
        .unwrap();
    assert_eq!(fallback_skipped, vec![1, 2]);

    let interleaved = Source::from_iter([1, 2, 3])
        .interleave(Source::from_iter([10, 11, 12]), 2)
        .run_collect()
        .unwrap();
    assert_eq!(interleaved, vec![1, 2, 10, 11, 3, 12]);
}
3081
/// The flow-level fan-in operators, applied through `via`, combine the primary
/// stream with a secondary source in the expected order.
#[test]
fn fan_in_flow_operators_compose_with_primary_stream() {
    let concatenated = Source::from_iter([1, 2])
        .via(Flow::identity().concat(Source::from_iter([3, 4])))
        .run_collect()
        .unwrap();
    assert_eq!(concatenated, vec![1, 2, 3, 4]);

    let prepended = Source::from_iter([3, 4])
        .via(Flow::identity().prepend(Source::from_iter([1, 2])))
        .run_collect()
        .unwrap();
    assert_eq!(prepended, vec![1, 2, 3, 4]);

    let interleaved = Source::from_iter([1, 2, 3])
        .via(Flow::identity().interleave(Source::from_iter([10, 11, 12]), 1))
        .run_collect()
        .unwrap();
    assert_eq!(interleaved, vec![1, 10, 2, 11, 3, 12]);

    let sorted_merge = Source::from_iter([1, 4])
        .via(Flow::identity().merge_sorted(Source::from_iter([2, 3, 5])))
        .run_collect()
        .unwrap();
    assert_eq!(sorted_merge, vec![1, 2, 3, 4, 5]);

    let latest_pairs = Source::from_iter([1, 2])
        .via(Flow::identity().zip_latest(Source::single(10)))
        .run_collect()
        .unwrap();
    assert_eq!(latest_pairs, vec![(1, 10), (2, 10)]);

    let latest_sums = Source::from_iter([1, 2])
        .via(
            Flow::identity()
                .zip_latest_with(Source::single(10), false, |left, right| left + right),
        )
        .run_collect()
        .unwrap();
    assert_eq!(latest_sums, vec![11, 12]);
}
3123
/// A failed input surfaces as `StreamError::Failed` through `or_else` and
/// `prepend`; `interleave_all` with the eager flag set stops after one element
/// when the other input is empty.
#[test]
fn fan_in_operators_propagate_errors_and_eager_close() {
    let failed_primary = Source::failed(StreamError::Failed("boom".into()))
        .or_else(Source::from_iter([1, 2]))
        .run_collect();
    assert!(matches!(failed_primary, Err(StreamError::Failed(_))));

    let failed_prefix = Source::from_iter([1, 2])
        .prepend(Source::failed(StreamError::Failed("boom".into())))
        .run_collect();
    assert!(matches!(failed_prefix, Err(StreamError::Failed(_))));

    let eager_close = Source::from_iter([1, 2])
        .interleave_all([Source::empty()], 1, true)
        .run_collect()
        .unwrap();
    assert_eq!(eager_close, vec![1]);
}
3146
/// With `Sink::head()` downstream, `interleave_all` must pull the first input
/// once and leave the other two inputs untouched.
#[test]
fn interleave_lazy_pulls_only_inputs_needed_for_first_segment() {
    use std::sync::{Arc, atomic::AtomicUsize, atomic::Ordering};

    // One pull counter per input, indexed by input position.
    let pull_counts: Arc<[AtomicUsize; 3]> =
        Arc::new(std::array::from_fn(|_| AtomicUsize::new(0)));

    // Builds a source that counts every pull; only input 0 emits (a single 42).
    let counted_source = |idx: usize| {
        let pull_counts = Arc::clone(&pull_counts);
        Source::from_materialized_factory(move |_| {
            let pull_counts = Arc::clone(&pull_counts);
            let mut emitted = false;
            Ok((
                Box::new(std::iter::from_fn(move || {
                    pull_counts[idx].fetch_add(1, Ordering::SeqCst);
                    if emitted || idx != 0 {
                        return None;
                    }
                    emitted = true;
                    Some(Ok(42))
                })) as BoxStream<i32>,
                NotUsed,
            ))
        })
    };

    let head = counted_source(0)
        .interleave_all([counted_source(1), counted_source(2)], 1, false)
        .run_with(Sink::head());

    assert_eq!(wait(head.unwrap()), 42);
    assert_eq!(pull_counts[0].load(Ordering::SeqCst), 1);
    assert_eq!(
        pull_counts[1].load(Ordering::SeqCst),
        0,
        "second input should not be pulled when downstream cancels after first element"
    );
    assert_eq!(
        pull_counts[2].load(Ordering::SeqCst),
        0,
        "third input should not be pulled before its turn"
    );
}
3194
/// With the eager flag off, `interleave_all` keeps rotating over the inputs
/// that still have elements after a shorter input has completed.
#[test]
fn interleave_non_eager_drains_remaining_when_one_input_completes() {
    let others = [Source::from_iter([10]), Source::from_iter([20, 21, 22])];
    let interleaved = Source::from_iter([1, 2, 3, 4])
        .interleave_all(others, 1, false)
        .run_collect()
        .unwrap();
    assert_eq!(interleaved, vec![1, 10, 20, 2, 21, 3, 22, 4]);
}
3209
/// Expected output ordering for the merge/zip operator family, plus
/// `Source::combine` and a broadcast `Sink::combine`.
#[test]
fn remaining_merge_and_zip_family_matches_expected_ordering() {
    let sorted_merge = Source::from_iter([1, 4])
        .merge_sorted(Source::from_iter([2, 3, 5]))
        .run_collect()
        .unwrap();
    assert_eq!(sorted_merge, vec![1, 2, 3, 4, 5]);

    let latest_merge = Source::from_iter([1, 2])
        .merge_latest(Source::single(10), false)
        .run_collect()
        .unwrap();
    assert_eq!(latest_merge, vec![vec![1, 10], vec![2, 10]]);

    let merged_all = Source::from_iter([1, 2, 3])
        .merge_all([Source::from_iter([10, 11])], false)
        .run_collect()
        .unwrap();
    assert_eq!(merged_all, vec![1, 10, 2, 11, 3]);

    let zipped_sums = Source::from_iter([1, 2, 3])
        .zip_with(Source::from_iter([10, 11, 12]), |left, right| left + right)
        .run_collect()
        .unwrap();
    assert_eq!(zipped_sums, vec![11, 13, 15]);

    let latest_pairs = Source::from_iter([1, 2])
        .zip_latest(Source::single(10))
        .run_collect()
        .unwrap();
    assert_eq!(latest_pairs, vec![(1, 10), (2, 10)]);

    let latest_sums = Source::from_iter([1, 2, 3])
        .zip_latest_with(Source::from_iter([10]), false, |left, right| left + right)
        .run_collect()
        .unwrap();
    assert_eq!(latest_sums, vec![11, 12, 13]);

    // The shorter left side is padded with -1 once it runs out.
    let padded_pairs = Source::from_iter([1, 2])
        .zip_all(Source::from_iter([10, 11, 12]), -1, -2)
        .run_collect()
        .unwrap();
    assert_eq!(padded_pairs, vec![(1, 10), (2, 11), (-1, 12)]);

    let indexed = Source::from_iter([5, 6, 7])
        .zip_with_index()
        .run_collect()
        .unwrap();
    assert_eq!(indexed, vec![(5, 0), (6, 1), (7, 2)]);

    let zipped_n = Source::zip_n([Source::from_iter([1, 2]), Source::from_iter([10, 20])])
        .run_collect()
        .unwrap();
    assert_eq!(zipped_n, vec![vec![1, 10], vec![2, 20]]);

    let zipped_n_sums = Source::zip_with_n(
        [
            Source::from_iter([1, 2]),
            Source::from_iter([10, 20]),
            Source::from_iter([100, 200]),
        ],
        |values| values.into_iter().sum::<i32>(),
    )
    .run_collect()
    .unwrap();
    assert_eq!(zipped_n_sums, vec![111, 222]);

    let prioritized = Source::merge_prioritized_n(
        [
            (Source::from_iter([1, 2, 3, 4]), 2),
            (Source::from_iter([10, 11]), 1),
        ],
        false,
    )
    .run_collect()
    .unwrap();
    assert_eq!(prioritized, vec![1, 2, 10, 3, 4, 11]);

    let combined_source = Source::combine(
        Source::from_iter([1, 2, 3]),
        Source::from_iter([10, 11]),
        std::iter::empty::<Source<i32, NotUsed>>(),
        SourceCombineStrategy::Merge {
            eager_complete: false,
        },
    )
    .run_collect()
    .unwrap();
    assert_eq!(combined_source, vec![1, 10, 2, 11, 3]);

    // A broadcast of two ignore sinks materializes to `NotUsed`.
    let broadcast_sink = Sink::combine(
        Sink::ignore(),
        Sink::ignore(),
        std::iter::empty::<Sink<i32, NotUsed>>(),
        SinkCombineStrategy::Broadcast,
    );
    let broadcast_mat = Source::from_iter([1, 2, 3])
        .run_with(broadcast_sink)
        .unwrap();
    assert_eq!(broadcast_mat, NotUsed);
}
3337
3338    #[test]
3339    fn sink_combine_broadcast_delivers_every_element_to_every_child() {
3340        // Regression: the combined children's materialized values were
3341        // dropped at materialization, tripping cancel-on-drop and silently
3342        // cancelling every child before it consumed anything (0 deliveries).
3343        let first_count = StdArc::new(StdAtomicUsize::new(0));
3344        let second_count = StdArc::new(StdAtomicUsize::new(0));
3345        let first_counter = StdArc::clone(&first_count);
3346        let second_counter = StdArc::clone(&second_count);
3347        let combined = Sink::combine(
3348            Sink::foreach(move |_: i32| {
3349                first_counter.fetch_add(1, StdOrdering::SeqCst);
3350            }),
3351            Sink::foreach(move |_: i32| {
3352                second_counter.fetch_add(1, StdOrdering::SeqCst);
3353            }),
3354            std::iter::empty::<Sink<i32, NotUsed>>(),
3355            SinkCombineStrategy::Broadcast,
3356        );
3357        assert_eq!(
3358            Source::from_iter(0..100).run_with(combined).unwrap(),
3359            NotUsed
3360        );
3361        // The rendezvous channels guarantee each child has received every
3362        // element before the parent send returns. The child callback may still
3363        // be one scheduler tick behind the parent, so assert with a bounded
3364        // condition wait instead of an immediate load.
3365        assert!(wait_until(StdDuration::from_secs(1), || {
3366            first_count.load(StdOrdering::SeqCst) == 100
3367                && second_count.load(StdOrdering::SeqCst) == 100
3368        }));
3369    }
3370
3371    #[test]
3372    fn zip_latest_completes_when_one_side_finishes_without_emitting() {
3373        // Regression: with eager_complete = false, an empty side left
3374        // `latest = None` forever while the loop drained the other side —
3375        // an infinite busy-loop when that side is unbounded.
3376        assert_eq!(
3377            Source::from_iter(std::iter::empty::<i32>())
3378                .zip_latest_with(Source::repeat(10), false, |left, right| left + right)
3379                .run_collect()
3380                .unwrap(),
3381            Vec::<i32>::new()
3382        );
3383        assert_eq!(
3384            Source::repeat(10)
3385                .zip_latest_with(
3386                    Source::from_iter(std::iter::empty::<i32>()),
3387                    false,
3388                    |left, right| left + right,
3389                )
3390                .run_collect()
3391                .unwrap(),
3392            Vec::<i32>::new()
3393        );
3394    }
3395
3396    #[test]
3397    fn zip_family_completion_boundaries_match_expected_results() {
3398        assert_eq!(
3399            Source::from_iter([1, 2, 3])
3400                .zip_with(Source::from_iter([10]), |left, right| left + right)
3401                .run_collect()
3402                .unwrap(),
3403            vec![11]
3404        );
3405
3406        assert_eq!(
3407            Source::from_iter([1, 2, 3])
3408                .zip_latest_with(Source::from_iter([10]), true, |left, right| left + right)
3409                .run_collect()
3410                .unwrap(),
3411            vec![11, 12]
3412        );
3413
3414        assert_eq!(
3415            Source::zip_n([
3416                Source::from_iter([1, 2, 3]),
3417                Source::from_iter([10]),
3418                Source::from_iter([100, 200, 300]),
3419            ])
3420            .run_collect()
3421            .unwrap(),
3422            vec![vec![1, 10, 100]]
3423        );
3424    }
3425
3426    #[test]
3427    fn combine_strategies_follow_merge_concat_and_priority_rules() {
3428        assert_eq!(
3429            Source::combine(
3430                Source::from_iter([1, 2]),
3431                Source::from_iter([10, 11]),
3432                [Source::from_iter([100])],
3433                SourceCombineStrategy::Concat,
3434            )
3435            .run_collect()
3436            .unwrap(),
3437            vec![1, 2, 10, 11, 100]
3438        );
3439
3440        assert_eq!(
3441            Source::combine(
3442                Source::from_iter([1, 2, 3, 4]),
3443                Source::from_iter([10, 11]),
3444                std::iter::empty::<Source<i32, NotUsed>>(),
3445                SourceCombineStrategy::Prioritized {
3446                    priorities: vec![2, 1],
3447                    eager_complete: false,
3448                },
3449            )
3450            .run_collect()
3451            .unwrap(),
3452            vec![1, 2, 10, 3, 4, 11]
3453        );
3454    }
3455
3456    #[test]
3457    fn concat_lazy_defers_follow_on_source_until_needed() {
3458        let source_counter = StdArc::new(StdAtomicUsize::new(0));
3459        let source_counter_clone = StdArc::clone(&source_counter);
3460        let lazy_source = Source::from_materialized_factory(move |_| {
3461            source_counter_clone.fetch_add(1, StdOrdering::SeqCst);
3462            Ok((Box::new(std::iter::once(Ok(99))), NotUsed))
3463        });
3464        let source_head = Source::single(1)
3465            .concat_lazy(lazy_source)
3466            .run_with(Sink::head());
3467        assert_eq!(wait(source_head.unwrap()), 1);
3468        assert_eq!(source_counter.load(StdOrdering::SeqCst), 0);
3469
3470        let flow_counter = StdArc::new(StdAtomicUsize::new(0));
3471        let flow_counter_clone = StdArc::clone(&flow_counter);
3472        let lazy_flow_source = Source::from_materialized_factory(move |_| {
3473            flow_counter_clone.fetch_add(1, StdOrdering::SeqCst);
3474            Ok((Box::new(std::iter::once(Ok(99))), NotUsed))
3475        });
3476        let flow_head = Source::single(1)
3477            .via(Flow::identity().concat_lazy(lazy_flow_source))
3478            .run_with(Sink::head());
3479        assert_eq!(wait(flow_head.unwrap()), 1);
3480        assert_eq!(flow_counter.load(StdOrdering::SeqCst), 0);
3481    }
3482
3483    #[test]
3484    fn also_to_completes_when_side_sink_cancels() {
3485        assert_eq!(
3486            Source::from_iter([1, 2, 3])
3487                .also_to(Sink::cancelled())
3488                .run_collect()
3489                .unwrap(),
3490            Vec::<i32>::new()
3491        );
3492        assert_eq!(
3493            Source::from_iter([1, 2, 3])
3494                .also_to_all([Sink::cancelled(), Sink::cancelled()])
3495                .run_collect()
3496                .unwrap(),
3497            Vec::<i32>::new()
3498        );
3499    }
3500
3501    #[test]
3502    fn also_to_completes_gracefully_when_side_sink_disconnects() {
3503        let result = Source::from_iter(0..100)
3504            .also_to(Sink::head())
3505            .run_collect()
3506            .unwrap();
3507        assert!(!result.is_empty(), "main should emit at least one element");
3508        assert!(
3509            result.len() < 100,
3510            "main should complete early when side disconnects"
3511        );
3512    }
3513
3514    #[test]
3515    fn also_to_propagates_original_error_when_side_is_disconnected() {
3516        let err = StreamError::Failed("distinctive-boom".into());
3517        assert!(matches!(
3518            Source::<i32>::failed(err.clone())
3519                .also_to(Sink::cancelled())
3520                .run_collect(),
3521            Err(StreamError::Failed(msg)) if msg == "distinctive-boom"
3522        ));
3523        assert!(matches!(
3524            Source::<i32>::failed(err.clone())
3525                .also_to_all([Sink::cancelled()])
3526                .run_collect(),
3527            Err(StreamError::Failed(msg)) if msg == "distinctive-boom"
3528        ));
3529        assert!(matches!(
3530            Source::<i32>::failed(err)
3531                .divert_to(Sink::cancelled(), |_: &i32| true)
3532                .run_collect(),
3533            Err(StreamError::Failed(msg)) if msg == "distinctive-boom"
3534        ));
3535    }
3536
3537    #[test]
3538    fn divert_to_routes_matching_elements_to_side_sink() {
3539        let diverted = Source::from_iter([1, 2, 3, 4])
3540            .divert_to(Sink::ignore(), |item| item % 2 == 0)
3541            .run_collect()
3542            .unwrap();
3543        assert_eq!(diverted, vec![1, 3]);
3544    }
3545
3546    #[test]
3547    fn wire_tap_drops_when_side_sink_backpressures() {
3548        let tapped = Source::from_iter([1, 2, 3])
3549            .wire_tap(Sink::head())
3550            .run_collect()
3551            .unwrap();
3552        assert_eq!(tapped, vec![1, 2, 3]);
3553
3554        let tapped_via_flow = Source::from_iter([1, 2, 3])
3555            .via(Flow::identity().wire_tap(Sink::head()))
3556            .run_collect()
3557            .unwrap();
3558        assert_eq!(tapped_via_flow, vec![1, 2, 3]);
3559    }
3560
3561    #[test]
3562    fn async_mapping_variants_complete() {
3563        let ordered = Source::from_iter(0..4)
3564            .map_async(2, |item| async move { Ok(item * 2) })
3565            .run_collect()
3566            .unwrap();
3567        assert_eq!(ordered, vec![0, 2, 4, 6]);
3568
3569        let unordered = Source::from_iter(0..4)
3570            .map_async_unordered(2, |item| async move { Ok(item * 2) })
3571            .run_collect()
3572            .unwrap();
3573        assert_eq!(unordered, vec![0, 2, 4, 6]);
3574
3575        let partitioned = Source::from_iter(0..4)
3576            .map_async_partitioned(4, 1, |item| item % 2, |item| async move { Ok(item + 1) })
3577            .run_collect()
3578            .unwrap();
3579        assert_eq!(partitioned, vec![1, 2, 3, 4]);
3580    }
3581
3582    #[test]
3583    fn map_async_ordered_bounds_pulls_behind_stuck_head() {
3584        let pulls = StdArc::new(StdAtomicUsize::new(0));
3585        let pulls_for_source = StdArc::clone(&pulls);
3586        let probe = Source::from_fn_iter(move || {
3587            let pulls = StdArc::clone(&pulls_for_source);
3588            std::iter::from_fn(move || {
3589                let next = pulls.fetch_add(1, StdOrdering::SeqCst);
3590                Some(next)
3591            })
3592        })
3593        .map_async(2, |item| async move {
3594            if item == 0 {
3595                tokio::time::sleep(StdDuration::from_millis(300)).await;
3596            }
3597            Ok(item)
3598        })
3599        .run_with(TestSink::probe())
3600        .unwrap();
3601
3602        probe.request(16);
3603        thread::sleep(StdDuration::from_millis(100));
3604        assert!(
3605            pulls.load(StdOrdering::SeqCst) <= 3,
3606            "pulled {} elements with parallelism=2 behind a stuck ordered head",
3607            pulls.load(StdOrdering::SeqCst)
3608        );
3609    }
3610
3611    #[test]
3612    fn async_mapping_parks_until_woken_future_completes() {
3613        struct WakeOnceFuture {
3614            value: Option<u64>,
3615            ready: StdArc<StdAtomicBool>,
3616            started: bool,
3617            polls: StdArc<StdAtomicUsize>,
3618            latest_waker: StdArc<Mutex<Option<std::task::Waker>>>,
3619        }
3620
3621        impl std::future::Future for WakeOnceFuture {
3622            type Output = StreamResult<u64>;
3623
3624            fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3625                let this = self.as_mut().get_mut();
3626                this.polls.fetch_add(1, StdOrdering::SeqCst);
3627                *this.latest_waker.lock().expect("latest wake slot mutex") =
3628                    Some(cx.waker().clone());
3629                if this.ready.load(StdOrdering::SeqCst) {
3630                    return Poll::Ready(Ok(this.value.take().unwrap()));
3631                }
3632
3633                if !this.started {
3634                    this.started = true;
3635                    let ready = StdArc::clone(&this.ready);
3636                    let latest_waker = StdArc::clone(&this.latest_waker);
3637                    thread::spawn(move || {
3638                        thread::sleep(Duration::from_millis(20));
3639                        ready.store(true, StdOrdering::SeqCst);
3640                        if let Some(waker) =
3641                            latest_waker.lock().expect("latest wake slot mutex").take()
3642                        {
3643                            waker.wake();
3644                        }
3645                    });
3646                }
3647                Poll::Pending
3648            }
3649        }
3650
3651        let polls = StdArc::new(StdAtomicUsize::new(0));
3652        let polls_for_stage = StdArc::clone(&polls);
3653        let start = Instant::now();
3654
3655        let values = Source::single(41)
3656            .map_async(1, move |item| WakeOnceFuture {
3657                value: Some(item + 1),
3658                ready: StdArc::new(StdAtomicBool::new(false)),
3659                started: false,
3660                polls: StdArc::clone(&polls_for_stage),
3661                latest_waker: StdArc::new(Mutex::new(None)),
3662            })
3663            .run_collect()
3664            .unwrap();
3665
3666        assert_eq!(values, vec![42]);
3667        let elapsed = start.elapsed();
3668        assert!(
3669            elapsed >= StdDuration::from_millis(15) && elapsed < StdDuration::from_millis(250),
3670            "pending future should park until woken once, elapsed={elapsed:?}"
3671        );
3672        assert!(
3673            polls.load(StdOrdering::SeqCst) < 4096,
3674            "pending future was repolled too aggressively"
3675        );
3676    }
3677
3678    #[test]
3679    fn async_mapping_emits_before_unbounded_upstream_finishes() {
3680        let ordered = Source::repeat(1)
3681            .map_async(2, |item| async move { Ok(item + 1) })
3682            .take(1)
3683            .run_collect()
3684            .unwrap();
3685        assert_eq!(ordered, vec![2]);
3686
3687        let unordered = Source::repeat(1)
3688            .map_async_unordered(2, |item| async move { Ok(item + 1) })
3689            .take(1)
3690            .run_collect()
3691            .unwrap();
3692        assert_eq!(unordered, vec![2]);
3693
3694        let partitioned = Source::repeat(1)
3695            .map_async_partitioned(2, 1, |_| 0_u8, |item| async move { Ok(item + 1) })
3696            .take(1)
3697            .run_collect()
3698            .unwrap();
3699        assert_eq!(partitioned, vec![2]);
3700    }
3701
3702    #[test]
3703    fn partitioned_async_mapping_limits_same_key_concurrency() {
3704        let active = StdArc::new(StdAtomicUsize::new(0));
3705        let max_active = StdArc::new(StdAtomicUsize::new(0));
3706        let active_for_stage = StdArc::clone(&active);
3707        let max_for_stage = StdArc::clone(&max_active);
3708
3709        let values = Source::from_iter(0..6)
3710            .map_async_partitioned(
3711                4,
3712                1,
3713                |_| 0_u8,
3714                move |item| {
3715                    let active = StdArc::clone(&active_for_stage);
3716                    let max_active = StdArc::clone(&max_for_stage);
3717                    let current = active.fetch_add(1, StdOrdering::SeqCst) + 1;
3718                    max_active.fetch_max(current, StdOrdering::SeqCst);
3719                    async move {
3720                        thread::sleep(Duration::from_millis(1));
3721                        active.fetch_sub(1, StdOrdering::SeqCst);
3722                        Ok(item)
3723                    }
3724                },
3725            )
3726            .run_collect()
3727            .unwrap();
3728
3729        assert_eq!(values, vec![0, 1, 2, 3, 4, 5]);
3730        assert_eq!(max_active.load(StdOrdering::SeqCst), 1);
3731    }
3732
3733    #[test]
3734    fn partitioned_async_mapping_scans_past_blocked_pending_key() {
3735        let active = StdArc::new(StdAtomicUsize::new(0));
3736        let max_active = StdArc::new(StdAtomicUsize::new(0));
3737        let active_for_stage = StdArc::clone(&active);
3738        let max_for_stage = StdArc::clone(&max_active);
3739        let (release_tx, release_rx) = oneshot::channel::<()>();
3740        let release_rx = StdArc::new(std::sync::Mutex::new(Some(release_rx)));
3741        let release_rx_for_stage = StdArc::clone(&release_rx);
3742        let max_for_release = StdArc::clone(&max_active);
3743
3744        let releaser = thread::spawn(move || {
3745            let deadline = Instant::now() + StdDuration::from_secs(1);
3746            while max_for_release.load(StdOrdering::SeqCst) < 2 && Instant::now() < deadline {
3747                thread::yield_now();
3748            }
3749            let _ = release_tx.send(());
3750        });
3751
3752        let values = Source::from_iter([0, 2, 1])
3753            .map_async_partitioned(
3754                2,
3755                1,
3756                |item| item % 2,
3757                move |item| {
3758                    let active = StdArc::clone(&active_for_stage);
3759                    let max_active = StdArc::clone(&max_for_stage);
3760                    let release_rx = StdArc::clone(&release_rx_for_stage);
3761                    let current = active.fetch_add(1, StdOrdering::SeqCst) + 1;
3762                    max_active.fetch_max(current, StdOrdering::SeqCst);
3763                    async move {
3764                        if item == 0 {
3765                            let receiver = release_rx
3766                                .lock()
3767                                .expect("release receiver mutex")
3768                                .take()
3769                                .expect("release receiver present");
3770                            let _ = receiver.await;
3771                        }
3772                        active.fetch_sub(1, StdOrdering::SeqCst);
3773                        Ok(item)
3774                    }
3775                },
3776            )
3777            .run_collect()
3778            .unwrap();
3779        releaser.join().unwrap();
3780
3781        assert_eq!(values, vec![0, 2, 1]);
3782        assert_eq!(max_active.load(StdOrdering::SeqCst), 2);
3783    }
3784
3785    #[test]
3786    fn partitioned_async_mapping_p1_still_evaluates_partition() {
3787        let partitions = StdArc::new(StdAtomicUsize::new(0));
3788        let partitions_for_stage = StdArc::clone(&partitions);
3789
3790        let values = Source::from_iter(0..8)
3791            .map_async_partitioned(
3792                1,
3793                1,
3794                move |item| {
3795                    partitions_for_stage.fetch_add(1, StdOrdering::SeqCst);
3796                    item % 2
3797                },
3798                |item| async move { Ok(item + 1) },
3799            )
3800            .run_collect()
3801            .unwrap();
3802
3803        assert_eq!(values, (1..9).collect::<Vec<_>>());
3804        assert_eq!(partitions.load(StdOrdering::SeqCst), 8);
3805    }
3806
3807    #[test]
3808    fn partitioned_async_mapping_handles_many_keys_high_parallelism() {
3809        let active_by_key =
3810            StdArc::new((0..16).map(|_| StdAtomicUsize::new(0)).collect::<Vec<_>>());
3811        let max_by_key = StdArc::new((0..16).map(|_| StdAtomicUsize::new(0)).collect::<Vec<_>>());
3812        let active_for_stage = StdArc::clone(&active_by_key);
3813        let max_for_stage = StdArc::clone(&max_by_key);
3814
3815        let values = Source::from_iter(0..512_usize)
3816            .map_async_partitioned(
3817                32,
3818                1,
3819                |item| item % 16,
3820                move |item| {
3821                    let active = StdArc::clone(&active_for_stage);
3822                    let max_active = StdArc::clone(&max_for_stage);
3823                    let key = item % 16;
3824                    let current = active[key].fetch_add(1, StdOrdering::SeqCst) + 1;
3825                    max_active[key].fetch_max(current, StdOrdering::SeqCst);
3826                    async move {
3827                        active[key].fetch_sub(1, StdOrdering::SeqCst);
3828                        Ok(item)
3829                    }
3830                },
3831            )
3832            .run_collect()
3833            .unwrap();
3834
3835        assert_eq!(values, (0..512).collect::<Vec<_>>());
3836        for max_active in max_by_key.iter() {
3837            assert_eq!(max_active.load(StdOrdering::SeqCst), 1);
3838        }
3839    }
3840
3841    #[test]
3842    fn error_operators_map_recover_and_complete() {
3843        let mapped = Source::<i32>::failed(StreamError::Failed("boom".into()))
3844            .map_error(|_| StreamError::Failed("mapped".into()))
3845            .run_collect();
3846        assert_eq!(mapped, Err(StreamError::Failed("mapped".into())));
3847
3848        let recovered = Source::<i32>::failed(StreamError::Failed("boom".into()))
3849            .recover(|error| match error {
3850                StreamError::Failed(_) => Some(42),
3851                _ => None,
3852            })
3853            .run_collect()
3854            .unwrap();
3855        assert_eq!(recovered, vec![42]);
3856
3857        let unrecovered = Source::<i32>::failed(StreamError::Failed("original".into()))
3858            .recover(|_| None)
3859            .run_collect();
3860        assert_eq!(unrecovered, Err(StreamError::Failed("original".into())));
3861
3862        let recovered_with = Source::<i32>::failed(StreamError::Failed("boom".into()))
3863            .recover_with_retries(1, |_| Some(Source::from_iter([1, 2])))
3864            .run_collect()
3865            .unwrap();
3866        assert_eq!(recovered_with, vec![1, 2]);
3867
3868        let declined_recover_with = Source::<i32>::failed(StreamError::Failed("declined".into()))
3869            .recover_with_retries(1, |_| None)
3870            .run_collect();
3871        assert_eq!(
3872            declined_recover_with,
3873            Err(StreamError::Failed("declined".into()))
3874        );
3875
3876        let completed = Source::from_factory(|| {
3877            Box::new(vec![Ok(1), Err(StreamError::Failed("ignored".into())), Ok(2)].into_iter())
3878        })
3879        .on_error_complete()
3880        .run_collect()
3881        .unwrap();
3882        assert_eq!(completed, vec![1]);
3883    }
3884
3885    #[test]
3886    fn sliding_matches_akka_window_semantics() {
3887        // Full windows then stop; no spurious trailing partial windows.
3888        assert_eq!(
3889            Source::from_iter(1..=4)
3890                .sliding(3, 1)
3891                .run_collect()
3892                .unwrap(),
3893            vec![vec![1, 2, 3], vec![2, 3, 4]]
3894        );
3895        assert_eq!(
3896            Source::from_iter(1..=4)
3897                .sliding(2, 1)
3898                .run_collect()
3899                .unwrap(),
3900            vec![vec![1, 2], vec![2, 3], vec![3, 4]]
3901        );
3902        // Exact multiple of size leaves nothing to emit at finish.
3903        assert_eq!(
3904            Source::from_iter(1..=3)
3905                .sliding(3, 1)
3906                .run_collect()
3907                .unwrap(),
3908            vec![vec![1, 2, 3]]
3909        );
3910        // Short streams emit a single partial window.
3911        assert_eq!(
3912            Source::from_iter(1..=2)
3913                .sliding(3, 1)
3914                .run_collect()
3915                .unwrap(),
3916            vec![vec![1, 2]]
3917        );
3918        // Every element in its own window.
3919        assert_eq!(
3920            Source::from_iter(1..=3)
3921                .sliding(1, 1)
3922                .run_collect()
3923                .unwrap(),
3924            vec![vec![1], vec![2], vec![3]]
3925        );
3926        // step > size skips the gap between windows.
3927        assert_eq!(
3928            Source::from_iter(1..=6)
3929                .sliding(2, 3)
3930                .run_collect()
3931                .unwrap(),
3932            vec![vec![1, 2], vec![4, 5]]
3933        );
3934        // step > size with an in-progress trailing window is dropped (Akka).
3935        assert_eq!(
3936            Source::from_iter(1..=3)
3937                .sliding(2, 4)
3938                .run_collect()
3939                .unwrap(),
3940            vec![vec![1, 2]]
3941        );
3942    }
3943
3944    #[test]
3945    fn recover_with_retries_indefinitely_like_akka() {
3946        let attempts = StdArc::new(StdAtomicUsize::new(0));
3947        let attempts_in_stage = StdArc::clone(&attempts);
3948        // The first several recovery sources keep failing; `recover_with` must
3949        // retry past Datum's old single-retry limit to reach the good source.
3950        let recovered = Source::<i32>::failed(StreamError::Failed("boom".into()))
3951            .recover_with(move |_error| {
3952                if attempts_in_stage.fetch_add(1, StdOrdering::SeqCst) < 5 {
3953                    Some(Source::<i32>::failed(StreamError::Failed("again".into())))
3954                } else {
3955                    Some(Source::from_iter([42]))
3956                }
3957            })
3958            .run_collect()
3959            .unwrap();
3960        assert_eq!(recovered, vec![42]);
3961        assert_eq!(attempts.load(StdOrdering::SeqCst), 6);
3962    }
3963
3964    #[test]
3965    fn many_concurrent_streams_do_not_starve_the_pool() {
3966        // Regression: a fixed-size pool deadlocks once `num_cpus` streams each
3967        // monopolize a worker. The growing pool must keep serving new streams.
3968        //
3969        // Use a small fixed count rather than `available_parallelism() + 2`: the
3970        // point is that several concurrent never-completing streams cannot starve
3971        // a fresh one, and a fixed count keeps this from spawning a large number
3972        // of permanent threads (which, when the whole suite runs under load, can
3973        // exhaust the thread limit and wedge the shared executor).
3974        let materializer = Materializer::new();
3975        let busy = 6_usize;
3976
3977        let mut held = Vec::with_capacity(busy);
3978        for _ in 0..busy {
3979            held.push(
3980                Source::single(1_u64)
3981                    .run_with_materializer(Sink::never(), &materializer)
3982                    .unwrap(),
3983            );
3984        }
3985
3986        for _ in 0..400 {
3987            if materializer.active_streams() >= busy {
3988                break;
3989            }
3990            thread::sleep(Duration::from_millis(5));
3991        }
3992        assert_eq!(materializer.active_streams(), busy);
3993
3994        // A fresh finite stream must still complete despite every prior worker
3995        // being occupied by a never-completing sink.
3996        let sum = Source::from_iter(0_u64..5)
3997            .run_with_materializer(Sink::fold(0_u64, |acc, item| acc + item), &materializer)
3998            .unwrap();
3999        assert_eq!(sum.wait().unwrap(), 10);
4000
4001        materializer.shutdown();
4002        for completion in held {
4003            assert_eq!(completion.wait(), Err(StreamError::AbruptTermination));
4004        }
4005    }
4006}