Skip to main content

datum/stream/
mod.rs

1//! Core push-stream vocabulary and the first reusable Source/Flow API slice.
2
3use std::{
4    collections::{BTreeMap, HashMap, VecDeque},
5    fmt,
6    future::Future,
7    hash::Hash,
8    marker::PhantomData,
9    panic::{AssertUnwindSafe, catch_unwind},
10    pin::Pin,
11    sync::{
12        Arc, Condvar, Mutex, OnceLock,
13        atomic::{AtomicBool, AtomicUsize, Ordering},
14    },
15    task::{Context, Poll},
16    thread,
17    time::Duration,
18};
19
20use futures::{channel::oneshot, executor::block_on};
21use thiserror::Error;
22use tokio::{
23    runtime::{Builder as TokioRuntimeBuilder, Runtime as TokioRuntime},
24    task::{JoinError, JoinHandle},
25};
26
27pub(crate) type BoxStream<T> = Box<dyn Iterator<Item = StreamResult<T>> + Send>;
28pub(crate) type PureTransform<In, Out> = Arc<dyn Fn(BoxStream<In>) -> BoxStream<Out> + Send + Sync>;
29pub(crate) type RuntimeTransform<In, Out> =
30    Arc<dyn Fn(BoxStream<In>, &Materializer) -> StreamResult<BoxStream<Out>> + Send + Sync>;
31type SinkRunner<In, Mat> = dyn Fn(BoxStream<In>, &Materializer) -> StreamResult<Mat> + Send + Sync;
32type HintedSinkRunner<In, Mat> =
33    dyn Fn(BoxStream<In>, &Materializer, SourceRuntimeHints) -> StreamResult<Mat> + Send + Sync;
34type RunnableGraphRunner<Mat> = dyn Fn(&Materializer) -> StreamResult<Mat> + Send + Sync;
35const STREAM_READY_SPINS: usize = 256;
36/// Spin-loop hints between consecutive drain-thread readiness checks. Back off
37/// a small block of hints between polls so the spin window still covers the
38/// common fast-completion case without burning a full busy loop before
39/// parking.
40const STREAM_SPIN_BACKOFF: usize = 8;
41const STREAM_MAX_PARK: Duration = Duration::from_millis(1);
42
43/// Private metadata for known finite, synchronous micro-sources.
44/// Set only on Datum-owned constructors whose `next()` is guaranteed
45/// memory-backed and whose total success item count is known at blueprint time.
46#[derive(Clone, Copy, Debug, PartialEq, Eq)]
47struct InlineMicroSourceHint {
48    max_success_items: usize,
49}
50
51#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
52struct SourceHints {
53    inline_head_terminal: bool,
54    /// Set only for constructors where Datum owns the full iterator and can
55    /// prove it is finite, synchronous, and memory-backed.
56    inline_micro: Option<InlineMicroSourceHint>,
57    /// Set only when terminal `fold`/`collect`/`ignore` may amortize the
58    /// checked-stream cancellation/shutdown wrapper. Unknown, probe, queue, IO,
59    /// and timer-backed sources keep the per-element checked path.
60    terminal_consumer_batch: bool,
61}
62
63#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
64pub(crate) struct SourceRuntimeHints {
65    pub(crate) inline_micro_max_success_items: Option<usize>,
66    pub(crate) terminal_consumer_batch: bool,
67}
68
69impl SourceHints {
70    const fn with_inline_micro(max_success_items: usize) -> Self {
71        Self {
72            inline_head_terminal: true,
73            inline_micro: Some(InlineMicroSourceHint { max_success_items }),
74            terminal_consumer_batch: true,
75        }
76    }
77
78    const fn with_terminal_consumer_batch() -> Self {
79        Self {
80            inline_head_terminal: false,
81            inline_micro: None,
82            terminal_consumer_batch: true,
83        }
84    }
85
86    fn after_flow(self, flow: FlowHints) -> Self {
87        if flow.preserves_inline_head_terminal {
88            // inline_micro is intentionally not propagated through any flow:
89            // even a trivial map wraps the iterator, making eligibility uncertain.
90            Self {
91                inline_head_terminal: true,
92                inline_micro: None,
93                terminal_consumer_batch: self.terminal_consumer_batch
94                    && flow.preserves_terminal_consumer_batch,
95            }
96        } else {
97            Self {
98                inline_head_terminal: false,
99                inline_micro: None,
100                terminal_consumer_batch: self.terminal_consumer_batch
101                    && flow.preserves_terminal_consumer_batch,
102            }
103        }
104    }
105
106    fn without_inline_micro(self) -> Self {
107        Self {
108            inline_head_terminal: self.inline_head_terminal,
109            inline_micro: None,
110            terminal_consumer_batch: self.terminal_consumer_batch,
111        }
112    }
113
114    fn runtime(self) -> SourceRuntimeHints {
115        SourceRuntimeHints {
116            inline_micro_max_success_items: self.inline_micro.map(|hint| hint.max_success_items),
117            terminal_consumer_batch: self.terminal_consumer_batch,
118        }
119    }
120}
121
122#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
123struct FlowHints {
124    preserves_inline_head_terminal: bool,
125    preserves_terminal_consumer_batch: bool,
126}
127
128impl FlowHints {
129    const PRESERVES_INLINE_HEAD_TERMINAL: Self = Self {
130        preserves_inline_head_terminal: true,
131        preserves_terminal_consumer_batch: true,
132    };
133
134    const PRESERVES_TERMINAL_CONSUMER_BATCH: Self = Self {
135        preserves_inline_head_terminal: false,
136        preserves_terminal_consumer_batch: true,
137    };
138
139    fn then(self, next: Self) -> Self {
140        Self {
141            preserves_inline_head_terminal: self.preserves_inline_head_terminal
142                && next.preserves_inline_head_terminal,
143            preserves_terminal_consumer_batch: self.preserves_terminal_consumer_batch
144                && next.preserves_terminal_consumer_batch,
145        }
146    }
147}
148
149struct PartitionSlot<Key, Out> {
150    key: Option<Key>,
151    active: usize,
152    queued: VecDeque<(usize, Out)>,
153    in_ready_queue: bool,
154}
155
156struct AbortOnDropHandle<T> {
157    handle: JoinHandle<T>,
158}
159
160impl<T> AbortOnDropHandle<T> {
161    fn new(handle: JoinHandle<T>) -> Self {
162        Self { handle }
163    }
164}
165
166impl<T> Drop for AbortOnDropHandle<T> {
167    fn drop(&mut self) {
168        self.handle.abort();
169    }
170}
171
172impl<T> Future for AbortOnDropHandle<T> {
173    type Output = Result<T, JoinError>;
174
175    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
176        Pin::new(&mut self.handle).poll(cx)
177    }
178}
179
180impl<T> Unpin for AbortOnDropHandle<T> {}
181
182pub(crate) fn stream_tokio_runtime() -> &'static TokioRuntime {
183    static RUNTIME: OnceLock<TokioRuntime> = OnceLock::new();
184    RUNTIME.get_or_init(|| {
185        TokioRuntimeBuilder::new_multi_thread()
186            .enable_all()
187            .thread_name("datum-stream-tokio")
188            .build()
189            .expect("stream tokio runtime")
190    })
191}
192
193fn spawn_tokio_task<Fut, T>(future: Fut) -> AbortOnDropHandle<T>
194where
195    Fut: Future<Output = T> + Send + 'static,
196    T: Send + 'static,
197{
198    AbortOnDropHandle::new(stream_tokio_runtime().spawn(future))
199}
200
201pub(crate) fn current_stream_cancelled() -> Option<Arc<AtomicBool>> {
202    runtime::current_stream_cancelled()
203}
204
205pub(super) fn catch_unwind_failed<T, F>(context: &'static str, f: F) -> StreamResult<T>
206where
207    F: FnOnce() -> T,
208{
209    catch_unwind(AssertUnwindSafe(f))
210        .map_err(|_| StreamError::Failed(format!("{context} panicked")))
211}
212
213impl<Key, Out> PartitionSlot<Key, Out> {
214    fn new(key: Key) -> Self {
215        Self {
216            key: Some(key),
217            active: 0,
218            queued: VecDeque::new(),
219            in_ready_queue: false,
220        }
221    }
222}
223
224#[inline(always)]
225fn partition_slot_for<Key, Out>(
226    key: Key,
227    slots_by_key: &mut HashMap<Key, usize>,
228    slots: &mut Vec<PartitionSlot<Key, Out>>,
229    free_slots: &mut Vec<usize>,
230) -> usize
231where
232    Key: Clone + Eq + Hash,
233{
234    if let Some(slot) = slots_by_key.get(&key) {
235        return *slot;
236    }
237
238    let slot = if let Some(slot) = free_slots.pop() {
239        let state = &mut slots[slot];
240        state.key = Some(key.clone());
241        state.active = 0;
242        state.queued.clear();
243        state.in_ready_queue = false;
244        slot
245    } else {
246        slots.push(PartitionSlot::new(key.clone()));
247        slots.len() - 1
248    };
249    slots_by_key.insert(key, slot);
250    slot
251}
252
253#[inline(always)]
254fn retire_partition_slot<Key, Out>(
255    slot: usize,
256    slots_by_key: &mut HashMap<Key, usize>,
257    slots: &mut [PartitionSlot<Key, Out>],
258    free_slots: &mut Vec<usize>,
259) where
260    Key: Eq + Hash,
261{
262    let state = &mut slots[slot];
263    if let Some(key) = state.key.take() {
264        slots_by_key.remove(&key);
265    }
266    state.active = 0;
267    state.queued.clear();
268    state.in_ready_queue = false;
269    free_slots.push(slot);
270}
271
272#[inline(always)]
273fn ready_partition_slot<Key, Out>(
274    slots: &mut [PartitionSlot<Key, Out>],
275    ready_slots: &mut VecDeque<usize>,
276    slot: usize,
277    per_partition: usize,
278) {
279    if let Some(state) = slots.get_mut(slot)
280        && state.key.is_some()
281        && !state.in_ready_queue
282        && state.active < per_partition
283        && !state.queued.is_empty()
284    {
285        state.in_ready_queue = true;
286        ready_slots.push_back(slot);
287    }
288}
289
290#[inline(always)]
291fn pop_ready_partition_slot<Key, Out>(
292    slots: &mut [PartitionSlot<Key, Out>],
293    ready_slots: &mut VecDeque<usize>,
294    per_partition: usize,
295) -> Option<(usize, usize, Out)> {
296    while let Some(slot) = ready_slots.pop_front() {
297        let mut requeue = false;
298        let item = if let Some(state) = slots.get_mut(slot) {
299            state.in_ready_queue = false;
300            if state.key.is_some() && state.active < per_partition {
301                let item = state.queued.pop_front().map(|(index, item)| {
302                    state.active += 1;
303                    (index, slot, item)
304                });
305                if !state.queued.is_empty() && state.active < per_partition {
306                    state.in_ready_queue = true;
307                    requeue = true;
308                }
309                item
310            } else {
311                None
312            }
313        } else {
314            None
315        };
316
317        if requeue {
318            ready_slots.push_back(slot);
319        }
320        if item.is_some() {
321            return item;
322        }
323    }
324    None
325}
326
327pub(crate) trait SourceFactory<Out, Mat>: Send + Sync {
328    fn create(self: Arc<Self>, materializer: &Materializer) -> StreamResult<(BoxStream<Out>, Mat)>;
329}
330
331struct FnSourceFactory<F>(F);
332
333impl<Out, Mat, F> SourceFactory<Out, Mat> for FnSourceFactory<F>
334where
335    F: Fn(&Materializer) -> StreamResult<(BoxStream<Out>, Mat)> + Send + Sync,
336{
337    fn create(self: Arc<Self>, materializer: &Materializer) -> StreamResult<(BoxStream<Out>, Mat)> {
338        (self.0)(materializer)
339    }
340}
341
342struct MapSourceFactory<In, Out, Mat, F> {
343    source: Arc<dyn SourceFactory<In, Mat>>,
344    stage: F,
345    _marker: PhantomData<fn(In) -> Out>,
346}
347
348impl<In, Out, Mat, F> SourceFactory<Out, Mat> for MapSourceFactory<In, Out, Mat, F>
349where
350    In: Send + 'static,
351    Out: Send + 'static,
352    Mat: Send + 'static,
353    F: Fn(In) -> Out + Send + Sync + 'static,
354{
355    fn create(self: Arc<Self>, materializer: &Materializer) -> StreamResult<(BoxStream<Out>, Mat)> {
356        let (stream, mat) = Arc::clone(&self.source).create(materializer)?;
357        Ok((
358            Box::new(MapSourceStream {
359                input: stream,
360                factory: self,
361            }),
362            mat,
363        ))
364    }
365}
366
367struct MapSourceStream<In, Out, Mat, F> {
368    input: BoxStream<In>,
369    factory: Arc<MapSourceFactory<In, Out, Mat, F>>,
370}
371
372impl<In, Out, Mat, F> Iterator for MapSourceStream<In, Out, Mat, F>
373where
374    F: Fn(In) -> Out,
375{
376    type Item = StreamResult<Out>;
377
378    fn next(&mut self) -> Option<Self::Item> {
379        self.input
380            .next()
381            .map(|item| item.map(|item| (self.factory.stage)(item)))
382    }
383}
384
385fn merge_streams<Out>(streams: Vec<BoxStream<Out>>, eager_complete: bool) -> BoxStream<Out>
386where
387    Out: Send + 'static,
388{
389    let mut streams: Vec<Option<BoxStream<Out>>> = streams.into_iter().map(Some).collect();
390    let mut current = 0usize;
391    Box::new(std::iter::from_fn(move || {
392        loop {
393            let index = next_active_optional_stream(&streams, current, |_| true)?;
394            current = (index + 1) % streams.len().max(1);
395            let Some(stream) = streams[index].as_mut() else {
396                continue;
397            };
398            match stream.next() {
399                Some(item) => return Some(item),
400                None => {
401                    streams[index] = None;
402                    if eager_complete {
403                        return None;
404                    }
405                }
406            }
407        }
408    }))
409}
410
411fn merge_prioritized_streams<Out>(
412    streams: Vec<BoxStream<Out>>,
413    priorities: Vec<usize>,
414    eager_complete: bool,
415) -> BoxStream<Out>
416where
417    Out: Send + 'static,
418{
419    let mut streams: Vec<Option<BoxStream<Out>>> = streams.into_iter().map(Some).collect();
420    let schedule: Vec<usize> = priorities
421        .into_iter()
422        .enumerate()
423        .flat_map(|(index, weight)| std::iter::repeat_n(index, weight))
424        .collect();
425    let mut schedule_index = 0usize;
426    Box::new(std::iter::from_fn(move || {
427        loop {
428            if streams.iter().all(Option::is_none) {
429                return None;
430            }
431            let index = next_weighted_stream(&streams, &schedule, &mut schedule_index)?;
432            let Some(stream) = streams[index].as_mut() else {
433                continue;
434            };
435            match stream.next() {
436                Some(item) => return Some(item),
437                None => {
438                    streams[index] = None;
439                    if eager_complete {
440                        return None;
441                    }
442                }
443            }
444        }
445    }))
446}
447
448fn merge_sorted_stream<Out>(mut left: BoxStream<Out>, mut right: BoxStream<Out>) -> BoxStream<Out>
449where
450    Out: Ord + Send + 'static,
451{
452    let mut left_next: Option<Out> = None;
453    let mut right_next: Option<Out> = None;
454    let mut left_done = false;
455    let mut right_done = false;
456    Box::new(std::iter::from_fn(move || {
457        loop {
458            if left_next.is_none() && !left_done {
459                match left.next() {
460                    Some(Ok(item)) => left_next = Some(item),
461                    Some(Err(error)) => return Some(Err(error)),
462                    None => left_done = true,
463                }
464            }
465            if right_next.is_none() && !right_done {
466                match right.next() {
467                    Some(Ok(item)) => right_next = Some(item),
468                    Some(Err(error)) => return Some(Err(error)),
469                    None => right_done = true,
470                }
471            }
472
473            let next = match (&left_next, &right_next) {
474                (Some(left_item), Some(right_item)) => {
475                    if left_item <= right_item {
476                        left_next.take()
477                    } else {
478                        right_next.take()
479                    }
480                }
481                (Some(_), None) if right_done => left_next.take(),
482                (None, Some(_)) if left_done => right_next.take(),
483                (None, None) if left_done && right_done => return None,
484                _ => continue,
485            };
486            if let Some(item) = next {
487                return Some(Ok(item));
488            }
489        }
490    }))
491}
492
493fn merge_latest_streams<Out>(
494    streams: Vec<BoxStream<Out>>,
495    eager_complete: bool,
496) -> BoxStream<Vec<Out>>
497where
498    Out: Clone + Send + 'static,
499{
500    let mut streams: Vec<Option<BoxStream<Out>>> = streams.into_iter().map(Some).collect();
501    let mut latest = vec![None; streams.len()];
502    let mut seen = 0usize;
503    let mut current = 0usize;
504    let mut pending = VecDeque::<Vec<Out>>::new();
505    Box::new(std::iter::from_fn(move || {
506        loop {
507            if let Some(output) = pending.pop_front() {
508                return Some(Ok(output));
509            }
510            if streams.iter().all(Option::is_none) {
511                return None;
512            }
513            let index = next_active_optional_stream(&streams, current, |_| true)?;
514            current = (index + 1) % streams.len().max(1);
515            let Some(stream) = streams[index].as_mut() else {
516                continue;
517            };
518            match stream.next() {
519                Some(Ok(item)) => {
520                    if latest[index].is_none() {
521                        seen += 1;
522                    }
523                    latest[index] = Some(item);
524                    if seen == latest.len() {
525                        pending.push_back(
526                            latest
527                                .iter()
528                                .map(|item| item.clone().expect("merge-latest initialized"))
529                                .collect(),
530                        );
531                    }
532                }
533                Some(Err(error)) => return Some(Err(error)),
534                None => {
535                    streams[index] = None;
536                    if eager_complete {
537                        return None;
538                    }
539                }
540            }
541        }
542    }))
543}
544
545fn zip_streams<Left, Right>(
546    mut left: BoxStream<Left>,
547    mut right: BoxStream<Right>,
548) -> BoxStream<(Left, Right)>
549where
550    Left: Send + 'static,
551    Right: Send + 'static,
552{
553    let mut left_next: Option<Left> = None;
554    let mut right_next: Option<Right> = None;
555    let mut left_done = false;
556    let mut right_done = false;
557    Box::new(std::iter::from_fn(move || {
558        loop {
559            if left_next.is_none() && !left_done {
560                match left.next() {
561                    Some(Ok(item)) => left_next = Some(item),
562                    Some(Err(error)) => return Some(Err(error)),
563                    None => left_done = true,
564                }
565            }
566            if right_next.is_none() && !right_done {
567                match right.next() {
568                    Some(Ok(item)) => right_next = Some(item),
569                    Some(Err(error)) => return Some(Err(error)),
570                    None => right_done = true,
571                }
572            }
573            match (left_next.take(), right_next.take()) {
574                (Some(left_item), Some(right_item)) => return Some(Ok((left_item, right_item))),
575                (left_item, right_item) => {
576                    left_next = left_item;
577                    right_next = right_item;
578                    if (left_done && left_next.is_none()) || (right_done && right_next.is_none()) {
579                        return None;
580                    }
581                }
582            }
583        }
584    }))
585}
586
587fn zip_latest_with_stream<Left, Right, Out, F>(
588    mut left: BoxStream<Left>,
589    mut right: BoxStream<Right>,
590    eager_complete: bool,
591    combine: Arc<F>,
592) -> BoxStream<Out>
593where
594    Left: Clone + Send + 'static,
595    Right: Clone + Send + 'static,
596    Out: Send + 'static,
597    F: Fn(Left, Right) -> Out + Send + Sync + 'static,
598{
599    let mut left_latest: Option<Left> = None;
600    let mut right_latest: Option<Right> = None;
601    let mut left_done = false;
602    let mut right_done = false;
603    let mut turn_left = true;
604    let mut pending = VecDeque::<Out>::new();
605
606    Box::new(std::iter::from_fn(move || {
607        loop {
608            if let Some(output) = pending.pop_front() {
609                return Some(Ok(output));
610            }
611            if eager_complete && (left_done || right_done) {
612                return None;
613            }
614            if left_done && right_done {
615                return None;
616            }
617            // A side that completed without ever emitting can never be paired:
618            // no further output is possible, so draining the other (possibly
619            // infinite) side would loop forever producing nothing.
620            if (left_done && left_latest.is_none()) || (right_done && right_latest.is_none()) {
621                return None;
622            }
623
624            let pull_left = if left_done {
625                false
626            } else if right_done {
627                true
628            } else {
629                let value = turn_left;
630                turn_left = !turn_left;
631                value
632            };
633
634            if pull_left {
635                match left.next() {
636                    Some(Ok(item)) => {
637                        left_latest = Some(item);
638                        if let (Some(left_item), Some(right_item)) = (&left_latest, &right_latest) {
639                            pending.push_back(combine(left_item.clone(), right_item.clone()));
640                        }
641                    }
642                    Some(Err(error)) => return Some(Err(error)),
643                    None => {
644                        left_done = true;
645                        if eager_complete {
646                            return None;
647                        }
648                    }
649                }
650            } else {
651                match right.next() {
652                    Some(Ok(item)) => {
653                        right_latest = Some(item);
654                        if let (Some(left_item), Some(right_item)) = (&left_latest, &right_latest) {
655                            pending.push_back(combine(left_item.clone(), right_item.clone()));
656                        }
657                    }
658                    Some(Err(error)) => return Some(Err(error)),
659                    None => {
660                        right_done = true;
661                        if eager_complete {
662                            return None;
663                        }
664                    }
665                }
666            }
667        }
668    }))
669}
670
671fn zip_all_stream<Left, Right>(
672    mut left: BoxStream<Left>,
673    mut right: BoxStream<Right>,
674    left_fill: Left,
675    right_fill: Right,
676) -> BoxStream<(Left, Right)>
677where
678    Left: Clone + Send + 'static,
679    Right: Clone + Send + 'static,
680{
681    let mut left_done = false;
682    let mut right_done = false;
683    Box::new(std::iter::from_fn(move || {
684        if left_done && right_done {
685            return None;
686        }
687
688        let left_item = if left_done {
689            None
690        } else {
691            match left.next() {
692                Some(Ok(item)) => Some(item),
693                Some(Err(error)) => return Some(Err(error)),
694                None => {
695                    left_done = true;
696                    None
697                }
698            }
699        };
700        let right_item = if right_done {
701            None
702        } else {
703            match right.next() {
704                Some(Ok(item)) => Some(item),
705                Some(Err(error)) => return Some(Err(error)),
706                None => {
707                    right_done = true;
708                    None
709                }
710            }
711        };
712
713        match (left_item, right_item) {
714            (None, None) if left_done && right_done => None,
715            (Some(left_value), Some(right_value)) => Some(Ok((left_value, right_value))),
716            (Some(left_value), None) => Some(Ok((left_value, right_fill.clone()))),
717            (None, Some(right_value)) => Some(Ok((left_fill.clone(), right_value))),
718            (None, None) => None,
719        }
720    }))
721}
722
723fn zip_n_streams<Out, Next, F>(streams: Vec<BoxStream<Out>>, zipper: Arc<F>) -> BoxStream<Next>
724where
725    Out: Send + 'static,
726    Next: Send + 'static,
727    F: Fn(Vec<Out>) -> Next + Send + Sync + 'static,
728{
729    let count = streams.len();
730    if count == 0 {
731        return Box::new(std::iter::empty());
732    }
733    let mut streams: Vec<Option<BoxStream<Out>>> = streams.into_iter().map(Some).collect();
734    let mut slots: Vec<Option<Out>> = (0..count).map(|_| None).collect();
735    let mut current = 0usize;
736    Box::new(std::iter::from_fn(move || {
737        loop {
738            if slots.iter().all(Option::is_some) {
739                let values = slots
740                    .iter_mut()
741                    .map(|slot| slot.take().expect("zip-n slot filled"))
742                    .collect();
743                return Some(Ok(zipper(values)));
744            }
745
746            let index = next_active_optional_stream(&streams, current, |idx| slots[idx].is_none())?;
747            current = (index + 1) % count.max(1);
748            let Some(stream) = streams[index].as_mut() else {
749                continue;
750            };
751            match stream.next() {
752                Some(Ok(item)) => slots[index] = Some(item),
753                Some(Err(error)) => return Some(Err(error)),
754                None => {
755                    streams[index] = None;
756                    slots[index].as_ref()?;
757                }
758            }
759        }
760    }))
761}
762
763fn next_active_optional_stream<T, F>(
764    streams: &[Option<BoxStream<T>>],
765    current: usize,
766    predicate: F,
767) -> Option<usize>
768where
769    T: Send + 'static,
770    F: Fn(usize) -> bool,
771{
772    if streams.is_empty() {
773        return None;
774    }
775    for offset in 0..streams.len() {
776        let index = (current + offset) % streams.len();
777        if streams[index].is_some() && predicate(index) {
778            return Some(index);
779        }
780    }
781    None
782}
783
784fn next_weighted_stream<T>(
785    streams: &[Option<BoxStream<T>>],
786    schedule: &[usize],
787    schedule_index: &mut usize,
788) -> Option<usize>
789where
790    T: Send + 'static,
791{
792    if streams.is_empty() || schedule.is_empty() {
793        return None;
794    }
795    for _ in 0..schedule.len() {
796        let index = schedule[*schedule_index % schedule.len()];
797        *schedule_index = (*schedule_index + 1) % schedule.len();
798        if streams.get(index).is_some_and(Option::is_some) {
799            return Some(index);
800        }
801    }
802    None
803}
804
805mod completion;
806mod error;
807mod flow;
808mod rate;
809mod restart;
810mod runtime;
811mod sink;
812mod source;
813mod time;
814mod timer;
815
816/// Opaque hook on `Source<T>` for split-segment fast-path sources.
817/// Non-`None` only on `Source`s returned by the split fast path.
818/// Cleared by all composition operators (via/map/to_mat etc.).
819pub(crate) trait SplitSegmentHookDyn: Send + Sync + 'static {
820    fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Send + Sync>;
821}
822
823/// Fast-path descriptor for fold/collect/ignore sinks in split-segment context.
824/// Non-`None` only for recognised sinks (`Sink::fold`, `fold_result`, `collect`, `ignore`).
825pub(crate) trait FoldFastPathDyn<In: Send + 'static>: Send + Sync + 'static {
826    /// Try to register directly with a split segment hook.
827    /// Returns `None` if the hook type is not recognised.
828    /// Returns `Some(Ok(mat))` on success where `mat` is `Box<StreamCompletion<Acc>>`.
829    fn try_register(
830        &self,
831        hook: Arc<dyn SplitSegmentHookDyn>,
832    ) -> Option<StreamResult<Box<dyn std::any::Any + Send>>>;
833}
834
835use self::runtime::{runtime_checked_stream, set_current_stream_cancelled};
836
837pub use self::{
838    completion::{Cancellable, StreamCompletion},
839    error::{StreamError, StreamResult, Supervision, SupervisionDecider, SupervisionDirective},
840    flow::{BidiFlow, Flow},
841    rate::{AggregateTimer, OverflowStrategy},
842    restart::{RestartFlow, RestartSettings, RestartSink, RestartSource, RetryFlow},
843    runtime::{Materializer, Runtime},
844    sink::{RunnableGraph, Sink, SinkCombineStrategy},
845    source::{Demand, Keep, MaybeHandle, NotUsed, PushOutlet, Source, SourceCombineStrategy},
846    time::{DelayOverflowStrategy, ThrottleMode},
847};
848
849#[cfg(test)]
850mod tests {
851    use super::*;
852    use crate::Attributes;
853    use crate::testkit::TestSink;
854    use std::fs;
855    use std::sync::{
856        Arc as StdArc,
857        atomic::{
858            AtomicBool as StdAtomicBool, AtomicUsize as StdAtomicUsize, Ordering as StdOrdering,
859        },
860        mpsc,
861    };
862    use std::time::Duration as StdDuration;
863    use std::time::Instant;
864
865    fn wait<T>(completion: StreamCompletion<T>) -> T {
866        completion.wait().unwrap()
867    }
868
869    fn wait_until(timeout: Duration, mut condition: impl FnMut() -> bool) -> bool {
870        let deadline = Instant::now() + timeout;
871        while Instant::now() < deadline {
872            if condition() {
873                return true;
874            }
875            thread::sleep(Duration::from_millis(2));
876        }
877        condition()
878    }
879
880    fn linux_thread_count(thread_name: &str) -> usize {
881        fs::read_dir("/proc/self/task")
882            .expect("task directory readable")
883            .filter_map(Result::ok)
884            .filter_map(|entry| fs::read_to_string(entry.path().join("comm")).ok())
885            .filter(|name| name.trim() == thread_name)
886            .count()
887    }
888
889    #[test]
890    fn source_blueprints_are_reusable() {
891        let source = Source::from_iter(0..5).map(|item| item + 1);
892
893        assert_eq!(source.clone().run_collect().unwrap(), vec![1, 2, 3, 4, 5]);
894        assert_eq!(source.run_collect().unwrap(), vec![1, 2, 3, 4, 5]);
895    }
896
897    #[test]
898    fn source_map_preserves_materialized_value() {
899        let graph = Source::single(1)
900            .map_materialized_value(|_| "source")
901            .map(|item| item + 1)
902            .to_mat(Sink::head(), Keep::both);
903
904        let materialized = graph.run().unwrap();
905        assert_eq!(materialized.0, "source");
906        assert_eq!(wait(materialized.1), 2);
907    }
908
909    #[test]
910    fn source_and_flow_compose() {
911        let flow = Flow::identity()
912            .map(|item: i32| item * 2)
913            .filter(|item| item % 3 == 0);
914
915        let result = Source::from_iter(0..8).via(flow).run_collect().unwrap();
916
917        assert_eq!(result, vec![0, 6, 12]);
918    }
919
920    #[test]
921    fn sink_setup_sees_materializer_defaults_and_local_attributes() {
922        let observed = StdArc::new(Mutex::new(None));
923        let observed_in_setup = StdArc::clone(&observed);
924        let sink = Sink::<i32, StreamCompletion<NotUsed>>::setup(move |_materializer, attrs| {
925            *observed_in_setup.lock().unwrap() = Some((
926                attrs.name().map(str::to_owned),
927                attrs.input_buffer_hint(),
928                attrs.dispatcher_hint().map(str::to_owned),
929            ));
930            Sink::ignore()
931        })
932        .add_attributes(Attributes::named("sink-inner"))
933        .add_attributes(Attributes::input_buffer(4, 4))
934        .add_attributes(Attributes::dispatcher("bench-dispatcher"));
935
936        let materializer = Materializer::new().with_attributes(Attributes::named("mat-outer"));
937        wait(
938            Source::from_iter([1, 2, 3])
939                .run_with_materializer(sink, &materializer)
940                .unwrap(),
941        );
942
943        assert_eq!(
944            *observed.lock().unwrap(),
945            Some((
946                Some("sink-inner".to_owned()),
947                Some((4, 4)),
948                Some("bench-dispatcher".to_owned())
949            ))
950        );
951    }
952
953    #[test]
954    fn sink_pre_materialize_feeds_existing_materialization() {
955        let materializer = Materializer::new();
956        let (completion, pre) = Sink::<i32, StreamCompletion<Vec<i32>>>::collect()
957            .pre_materialize(&materializer)
958            .unwrap();
959
960        Source::from_iter([1, 2, 3])
961            .run_with_materializer(pre, &materializer)
962            .unwrap();
963
964        assert_eq!(wait(completion), vec![1, 2, 3]);
965    }
966
967    #[test]
968    fn flow_from_sink_and_source_connects_both_sides() {
969        assert_eq!(
970            Source::from_iter([1, 2, 3])
971                .via(Flow::from_sink_and_source(
972                    Sink::foreach(|_item: i32| {}),
973                    Source::from_iter([10, 20, 30]),
974                ))
975                .run_collect()
976                .unwrap(),
977            vec![10, 20, 30]
978        );
979    }
980
981    #[test]
982    fn from_sink_and_source_keeps_sink_running_after_source_side_completes() {
983        let completed = StdArc::new(StdAtomicBool::new(false));
984        let on_complete = StdArc::clone(&completed);
985        let flow = Flow::from_sink_and_source(
986            Sink::on_complete(move || {
987                on_complete.store(true, StdOrdering::SeqCst);
988            }),
989            Source::single(10),
990        );
991
992        let result = Source::from_iter([1, 2, 3])
993            .via(flow)
994            .run_collect()
995            .unwrap();
996
997        assert_eq!(result, vec![10]);
998        assert!(wait_until(StdDuration::from_secs(1), || {
999            completed.load(StdOrdering::SeqCst)
1000        }));
1001    }
1002
1003    #[test]
1004    fn from_sink_and_source_coupled_cancels_source_when_sink_finishes_first() {
1005        let cancellable = StdArc::new(Mutex::new(None));
1006        let observed = StdArc::clone(&cancellable);
1007        let flow = Flow::from_sink_and_source_coupled(
1008            Sink::ignore(),
1009            Source::tick(
1010                StdDuration::from_millis(50),
1011                StdDuration::from_millis(50),
1012                10,
1013            )
1014            .map_materialized_value(move |handle| {
1015                *observed.lock().unwrap() = Some(handle.clone());
1016                handle
1017            }),
1018        );
1019
1020        let completion = Source::from_iter(std::iter::empty::<i32>())
1021            .via(flow)
1022            .run_with(Sink::ignore())
1023            .unwrap();
1024        assert!(wait_until(StdDuration::from_secs(1), || {
1025            cancellable
1026                .lock()
1027                .unwrap()
1028                .as_ref()
1029                .is_some_and(Cancellable::is_cancelled)
1030        }));
1031        assert_eq!(wait(completion), NotUsed);
1032    }
1033
1034    #[test]
1035    fn bidi_flow_join_and_atop_compose() {
1036        let codec = BidiFlow::from_flows(
1037            Flow::identity().map(|item: i32| item + 1),
1038            Flow::identity().map(|item: i32| item * 2),
1039        )
1040        .named("codec");
1041        let framing = BidiFlow::from_flows(
1042            Flow::identity().map(|item: i32| item * 3),
1043            Flow::identity().map(|item: i32| item - 4),
1044        );
1045
1046        let joined = codec
1047            .clone()
1048            .join(Flow::identity().map(|item: i32| item - 5));
1049        let stacked = codec.atop(framing).join(Flow::identity());
1050
1051        assert_eq!(
1052            Source::single(10).via(joined).run_collect().unwrap(),
1053            vec![12]
1054        );
1055        assert_eq!(
1056            Source::single(10).via(stacked).run_collect().unwrap(),
1057            vec![58]
1058        );
1059    }
1060
1061    #[test]
1062    fn flow_buffer_then_map_runs_end_to_end() {
1063        let flow = Flow::identity()
1064            .buffer(8, OverflowStrategy::Backpressure)
1065            .map(|item: i32| item + 1);
1066
1067        let result = Source::from_iter(0..4).via(flow).run_collect().unwrap();
1068
1069        assert_eq!(result, vec![1, 2, 3, 4]);
1070    }
1071
1072    #[test]
1073    fn public_flow_combinators_preserve_runtime_transform_after_buffer() {
1074        fn buffered_flow() -> Flow<i32, i32> {
1075            Flow::identity().buffer(8, OverflowStrategy::Backpressure)
1076        }
1077
1078        assert_eq!(
1079            Source::from_iter(0..4)
1080                .via(buffered_flow().filter(|item| *item % 2 == 0))
1081                .run_collect()
1082                .unwrap(),
1083            vec![0, 2]
1084        );
1085        assert_eq!(
1086            Source::from_iter(0..4)
1087                .via(buffered_flow().filter_not(|item| *item % 2 == 0))
1088                .run_collect()
1089                .unwrap(),
1090            vec![1, 3]
1091        );
1092        assert_eq!(
1093            Source::from_iter(0..4)
1094                .via(buffered_flow().filter_map(|item| (item % 2 == 0).then_some(item + 10)))
1095                .run_collect()
1096                .unwrap(),
1097            vec![10, 12]
1098        );
1099        assert_eq!(
1100            Source::from_iter(0..3)
1101                .via(buffered_flow().map_concat(|item| [item, item + 10]))
1102                .run_collect()
1103                .unwrap(),
1104            vec![0, 10, 1, 11, 2, 12]
1105        );
1106        assert_eq!(
1107            Source::from_iter(0..3)
1108                .via(buffered_flow().stateful_map(5, |state, item| {
1109                    *state += item;
1110                    *state
1111                }))
1112                .run_collect()
1113                .unwrap(),
1114            vec![5, 6, 8]
1115        );
1116        assert_eq!(
1117            Source::from_iter(0..3)
1118                .via(buffered_flow().stateful_map_concat(0, |state, item| {
1119                    *state += item;
1120                    [*state, item]
1121                }))
1122                .run_collect()
1123                .unwrap(),
1124            vec![0, 0, 1, 1, 3, 2]
1125        );
1126        assert_eq!(
1127            Source::from_iter(0..4)
1128                .via(buffered_flow().map_async(2, |item| async move { Ok(item + 1) }))
1129                .run_collect()
1130                .unwrap(),
1131            vec![1, 2, 3, 4]
1132        );
1133        assert_eq!(
1134            Source::from_iter(0..4)
1135                .via(buffered_flow().map_async_unordered(2, |item| async move { Ok(item + 1) }))
1136                .run_collect()
1137                .unwrap(),
1138            vec![1, 2, 3, 4]
1139        );
1140        assert_eq!(
1141            Source::from_iter(0..4)
1142                .via(buffered_flow().map_async_partitioned(
1143                    2,
1144                    1,
1145                    |item| item % 2,
1146                    |item| async move { Ok(item + 1) },
1147                ))
1148                .run_collect()
1149                .unwrap(),
1150            vec![1, 2, 3, 4]
1151        );
1152        assert_eq!(
1153            Source::from_iter(0..5)
1154                .via(buffered_flow().take(3))
1155                .run_collect()
1156                .unwrap(),
1157            vec![0, 1, 2]
1158        );
1159        assert_eq!(
1160            Source::from_iter(0..5)
1161                .via(buffered_flow().drop(2))
1162                .run_collect()
1163                .unwrap(),
1164            vec![2, 3, 4]
1165        );
1166        assert_eq!(
1167            Source::from_iter(0..5)
1168                .via(buffered_flow().take_while(|item| *item < 3))
1169                .run_collect()
1170                .unwrap(),
1171            vec![0, 1, 2]
1172        );
1173        assert_eq!(
1174            Source::from_iter(0..5)
1175                .via(buffered_flow().drop_while(|item| *item < 3))
1176                .run_collect()
1177                .unwrap(),
1178            vec![3, 4]
1179        );
1180        assert_eq!(
1181            Source::from_iter(0..3)
1182                .via(buffered_flow().limit(5))
1183                .run_collect()
1184                .unwrap(),
1185            vec![0, 1, 2]
1186        );
1187        assert_eq!(
1188            Source::from_iter(0..5)
1189                .via(buffered_flow().grouped(2))
1190                .run_collect()
1191                .unwrap(),
1192            vec![vec![0, 1], vec![2, 3], vec![4]]
1193        );
1194        assert_eq!(
1195            Source::from_iter(1..=3)
1196                .via(buffered_flow().scan(0, |acc, item| acc + item))
1197                .run_collect()
1198                .unwrap(),
1199            vec![0, 1, 3, 6]
1200        );
1201        assert_eq!(
1202            Source::from_iter(1..=4)
1203                .via(buffered_flow().sliding(2, 1))
1204                .run_collect()
1205                .unwrap(),
1206            vec![vec![1, 2], vec![2, 3], vec![3, 4]]
1207        );
1208        assert_eq!(
1209            Source::from_iter(1..=4)
1210                .via(buffered_flow().fold(0, |acc, item| acc + item))
1211                .run_collect()
1212                .unwrap(),
1213            vec![10]
1214        );
1215        assert_eq!(
1216            Source::from_iter(1..=4)
1217                .via(buffered_flow().reduce(|acc, item| acc + item))
1218                .run_collect()
1219                .unwrap(),
1220            vec![10]
1221        );
1222        assert_eq!(
1223            Source::from_factory(|| {
1224                Box::new(vec![Ok(1), Err(StreamError::Failed("boom".into())), Ok(2)].into_iter())
1225            })
1226            .via(buffered_flow().map_error(|_| StreamError::Failed("mapped".into())))
1227            .run_collect(),
1228            Err(StreamError::Failed("mapped".into()))
1229        );
1230        assert_eq!(
1231            Source::<i32>::failed(StreamError::Failed("boom".into()))
1232                .via(buffered_flow().recover(|_| Some(42)))
1233                .run_collect()
1234                .unwrap(),
1235            vec![42]
1236        );
1237        assert_eq!(
1238            Source::<i32>::failed(StreamError::Failed("boom".into()))
1239                .via(buffered_flow().recover_with(|_| Some(Source::from_iter([7, 8]))))
1240                .run_collect()
1241                .unwrap(),
1242            vec![7, 8]
1243        );
1244        assert_eq!(
1245            Source::<i32>::failed(StreamError::Failed("boom".into()))
1246                .via(buffered_flow().recover_with_retries(1, |_| Some(Source::from_iter([9]))))
1247                .run_collect()
1248                .unwrap(),
1249            vec![9]
1250        );
1251        assert_eq!(
1252            Source::from_factory(|| {
1253                Box::new(vec![Ok(1), Err(StreamError::Failed("ignored".into())), Ok(2)].into_iter())
1254            })
1255            .via(buffered_flow().on_error_complete())
1256            .run_collect()
1257            .unwrap(),
1258            vec![1]
1259        );
1260
1261        let materialized = Source::from_iter([1, 2, 3])
1262            .run_with(
1263                buffered_flow()
1264                    .via(Flow::identity().map(|item| item + 1))
1265                    .map_materialized_value(|_| "buffered-flow")
1266                    .to_mat(Sink::fold(0, |acc, item| acc + item), Keep::both),
1267            )
1268            .unwrap();
1269        assert_eq!(materialized.0, "buffered-flow");
1270        assert_eq!(wait(materialized.1), 9);
1271
1272        let kept = Source::from_iter([1, 2, 3])
1273            .run_with(
1274                buffered_flow()
1275                    .via_mat_with(Flow::identity().map(|item| item + 1), |_, _| "combined")
1276                    .to(Sink::fold(0, |acc, item| acc + item)),
1277            )
1278            .unwrap();
1279        assert_eq!(kept, "combined");
1280    }
1281
1282    #[test]
1283    fn runtime_rate_flows_compose_in_flow_form() {
1284        let conflate = Flow::identity()
1285            .conflate(|left: i32, right| left + right)
1286            .map(|item| item + 1);
1287        assert_eq!(
1288            Source::single(4).via(conflate).run_collect().unwrap(),
1289            vec![5]
1290        );
1291
1292        let batch = Flow::identity()
1293            .batch(4, |item: i32| item, |left, right| left + right)
1294            .map(|item| item + 1);
1295        assert_eq!(Source::single(4).via(batch).run_collect().unwrap(), vec![5]);
1296
1297        let expand = Flow::identity()
1298            .expand(std::iter::once::<i32>)
1299            .map(|item| item + 1);
1300        assert_eq!(
1301            Source::from_iter(0..4).via(expand).run_collect().unwrap(),
1302            vec![1, 2, 3, 4]
1303        );
1304
1305        let aggregate = Flow::identity()
1306            .aggregate_with_boundary(
1307                Vec::<i32>::new,
1308                |mut items, item| {
1309                    items.push(item);
1310                    let ready = !items.is_empty();
1311                    (items, ready)
1312                },
1313                |items| items.into_iter().sum::<i32>(),
1314                None,
1315            )
1316            .map(|item| item + 1);
1317        assert_eq!(
1318            Source::from_iter(0..4)
1319                .via(aggregate)
1320                .run_collect()
1321                .unwrap(),
1322            vec![1, 2, 3, 4]
1323        );
1324
1325        let detached = Flow::identity().detach().map(|item: i32| item + 1);
1326        assert_eq!(
1327            Source::from_iter(0..4).via(detached).run_collect().unwrap(),
1328            vec![1, 2, 3, 4]
1329        );
1330    }
1331
1332    #[test]
1333    fn high_use_source_flow_operators_work() {
1334        let result = Source::from_iter(0..8)
1335            .drop(1)
1336            .take(5)
1337            .filter_not(|item| item % 2 == 0)
1338            .map_concat(|item| [item, item + 10])
1339            .grouped(3)
1340            .run_collect()
1341            .unwrap();
1342
1343        assert_eq!(result, vec![vec![1, 11, 3], vec![13, 5, 15]]);
1344    }
1345
1346    #[test]
1347    fn prefix_and_tail_emits_prefix_and_live_tail() {
1348        let mut outer = Source::from_iter(0..5)
1349            .prefix_and_tail(2)
1350            .run_collect()
1351            .unwrap();
1352        assert_eq!(outer.len(), 1);
1353        let (prefix, tail) = outer.pop().unwrap();
1354        assert_eq!(prefix, vec![0, 1]);
1355        assert_eq!(tail.clone().run_collect().unwrap(), vec![2, 3, 4]);
1356        assert_eq!(
1357            tail.run_collect(),
1358            Err(StreamError::Failed(
1359                "substream source cannot be materialized more than once".into()
1360            ))
1361        );
1362    }
1363
1364    #[test]
1365    fn prefix_and_tail_fails_before_prefix_is_ready() {
1366        let result = Source::from_factory(|| {
1367            Box::new(vec![Ok(1), Err(StreamError::Failed("boom".into())), Ok(2)].into_iter())
1368        })
1369        .prefix_and_tail(2)
1370        .run_collect();
1371        assert!(matches!(result, Err(StreamError::Failed(message)) if message == "boom"));
1372    }
1373
1374    #[test]
1375    fn prefix_and_tail_tail_propagates_late_upstream_failure() {
1376        let mut outer = Source::from_factory(|| {
1377            Box::new(vec![Ok(1), Ok(2), Err(StreamError::Failed("boom".into())), Ok(3)].into_iter())
1378        })
1379        .prefix_and_tail(2)
1380        .run_collect()
1381        .unwrap();
1382        let (prefix, tail) = outer.pop().unwrap();
1383        assert_eq!(prefix, vec![1, 2]);
1384        assert_eq!(tail.run_collect(), Err(StreamError::Failed("boom".into())));
1385    }
1386
1387    #[test]
1388    fn prefix_and_tail_accepts_non_clone_elements() {
1389        #[derive(Debug, PartialEq, Eq)]
1390        struct NonClone(u8);
1391
1392        let mut outer = Source::from_factory(|| {
1393            Box::new(vec![Ok(NonClone(1)), Ok(NonClone(2)), Ok(NonClone(3))].into_iter())
1394        })
1395        .prefix_and_tail(2)
1396        .run_collect()
1397        .unwrap();
1398        let (prefix, tail) = outer.pop().unwrap();
1399        assert_eq!(prefix, vec![NonClone(1), NonClone(2)]);
1400        assert_eq!(tail.run_collect().unwrap(), vec![NonClone(3)]);
1401    }
1402
1403    #[test]
1404    fn flat_map_prefix_materializes_on_short_upstream_completion() {
1405        let values = Source::from_iter([1, 2])
1406            .flat_map_prefix(3, |prefix| {
1407                let sum = prefix.into_iter().sum::<i32>();
1408                Flow::identity().prepend(Source::single(sum))
1409            })
1410            .run_collect()
1411            .unwrap();
1412        assert_eq!(values, vec![3]);
1413    }
1414
1415    #[test]
1416    fn flat_map_prefix_does_not_materialize_on_early_upstream_failure() {
1417        let invoked = StdArc::new(StdAtomicBool::new(false));
1418        let invoked_for_stage = StdArc::clone(&invoked);
1419        let result = Source::from_factory(|| {
1420            Box::new(vec![Ok(1), Err(StreamError::Failed("boom".into()))].into_iter())
1421        })
1422        .flat_map_prefix(3, move |_prefix| {
1423            invoked_for_stage.store(true, StdOrdering::SeqCst);
1424            Flow::identity()
1425        })
1426        .run_collect();
1427        assert_eq!(result, Err(StreamError::Failed("boom".into())));
1428        assert!(!invoked.load(StdOrdering::SeqCst));
1429    }
1430
1431    #[test]
1432    fn flat_map_concat_flattens_nested_sources_sequentially() {
1433        let values = Source::from_iter([1, 2, 3])
1434            .flat_map_concat(|item| Source::from_iter(0..item))
1435            .run_collect()
1436            .unwrap();
1437        assert_eq!(values, vec![0, 0, 1, 0, 1, 2]);
1438    }
1439
1440    #[test]
1441    fn flat_map_merge_respects_breadth_bound() {
1442        let active = StdArc::new(StdAtomicUsize::new(0));
1443        let max_active = StdArc::new(StdAtomicUsize::new(0));
1444        let active_for_stage = StdArc::clone(&active);
1445        let max_for_stage = StdArc::clone(&max_active);
1446
1447        let mut values = Source::from_iter(0..6)
1448            .flat_map_merge(2, move |item| {
1449                let active = StdArc::clone(&active_for_stage);
1450                let max_active = StdArc::clone(&max_for_stage);
1451                Source::future(move || {
1452                    let active = StdArc::clone(&active);
1453                    let max_active = StdArc::clone(&max_active);
1454                    async move {
1455                        let now = active.fetch_add(1, StdOrdering::SeqCst) + 1;
1456                        loop {
1457                            let seen = max_active.load(StdOrdering::SeqCst);
1458                            if now <= seen {
1459                                break;
1460                            }
1461                            if max_active
1462                                .compare_exchange(
1463                                    seen,
1464                                    now,
1465                                    StdOrdering::SeqCst,
1466                                    StdOrdering::SeqCst,
1467                                )
1468                                .is_ok()
1469                            {
1470                                break;
1471                            }
1472                        }
1473                        thread::sleep(StdDuration::from_millis(20));
1474                        active.fetch_sub(1, StdOrdering::SeqCst);
1475                        Ok(item)
1476                    }
1477                })
1478            })
1479            .run_collect()
1480            .unwrap();
1481        values.sort_unstable();
1482        assert_eq!(values, vec![0, 1, 2, 3, 4, 5]);
1483        assert!(max_active.load(StdOrdering::SeqCst) <= 2);
1484    }
1485
1486    #[test]
1487    fn flat_map_merge_propagates_inner_failures() {
1488        let result = Source::from_iter([0, 1, 2])
1489            .flat_map_merge(2, |item| {
1490                if item == 1 {
1491                    Source::failed(StreamError::Failed("boom".into()))
1492                } else {
1493                    Source::single(item)
1494                }
1495            })
1496            .run_collect();
1497        assert_eq!(result, Err(StreamError::Failed("boom".into())));
1498    }
1499
1500    #[test]
1501    fn flat_map_merge_emits_ready_inner_output_while_upstream_is_blocked() {
1502        let (release_tx, release_rx) = mpsc::channel();
1503        let release_rx = StdArc::new(std::sync::Mutex::new(Some(release_rx)));
1504        let queue = Source::from_factory(move || {
1505            let release_rx = StdArc::clone(&release_rx);
1506            let mut step = 0_u8;
1507            Box::new(std::iter::from_fn(move || {
1508                let item = match step {
1509                    0 => Some(Ok(0)),
1510                    1 => {
1511                        release_rx
1512                            .lock()
1513                            .unwrap()
1514                            .as_ref()
1515                            .expect("release receiver available")
1516                            .recv_timeout(StdDuration::from_secs(1))
1517                            .expect("timed out waiting to release second upstream element");
1518                        Some(Ok(1))
1519                    }
1520                    _ => None,
1521                };
1522                step += 1;
1523                item
1524            }))
1525        })
1526        .flat_map_merge(2, |item| Source::single(item + 10))
1527        .run_with(Sink::queue())
1528        .unwrap();
1529
1530        assert_eq!(queue.pull().unwrap(), Some(10));
1531        release_tx.send(()).unwrap();
1532        assert_eq!(queue.pull().unwrap(), Some(11));
1533        assert!(queue.pull().unwrap().is_none());
1534    }
1535
1536    #[test]
1537    fn group_by_routes_keys_and_drops_closed_keys() {
1538        let outer = Source::from_iter([0, 1, 2, 3, 4])
1539            .group_by(4, |item| item % 2, false)
1540            .run_with(Sink::queue())
1541            .unwrap();
1542
1543        let even = outer.pull().unwrap().unwrap();
1544        let even_completion = even.run_with(Sink::ignore()).unwrap();
1545        let odd = outer.pull().unwrap().unwrap();
1546        drop(even_completion);
1547
1548        assert_eq!(odd.run_collect().unwrap(), vec![1, 3]);
1549        assert!(outer.pull().unwrap().is_none());
1550    }
1551
1552    #[test]
1553    fn group_by_fails_when_distinct_key_limit_is_exceeded() {
1554        let outer = Source::from_iter([0, 1, 2])
1555            .group_by(2, |item| *item, false)
1556            .run_with(Sink::queue())
1557            .unwrap();
1558
1559        let _ = outer.pull().unwrap().unwrap();
1560        let _ = outer.pull().unwrap().unwrap();
1561        assert!(matches!(
1562            outer.pull(),
1563            Err(StreamError::Failed(message)) if message == "group_by reached max_substreams (2)"
1564        ));
1565    }
1566
1567    #[test]
1568    fn group_by_can_recreate_closed_substreams_when_enabled() {
1569        let (release_tx, release_rx) = mpsc::channel();
1570        let release_rx = StdArc::new(std::sync::Mutex::new(Some(release_rx)));
1571        let outer = Source::from_factory(move || {
1572            let release_rx = StdArc::clone(&release_rx);
1573            let mut step = 0_u8;
1574            Box::new(std::iter::from_fn(move || {
1575                let item = match step {
1576                    0 => Some(Ok(0)),
1577                    1 => Some(Ok(1)),
1578                    2 => {
1579                        release_rx
1580                            .lock()
1581                            .unwrap()
1582                            .as_ref()
1583                            .expect("release receiver available")
1584                            .recv_timeout(StdDuration::from_secs(1))
1585                            .expect("timed out waiting to release recreated key");
1586                        Some(Ok(0))
1587                    }
1588                    _ => None,
1589                };
1590                step += 1;
1591                item
1592            }))
1593        })
1594        .group_by(4, |item| item % 2, true)
1595        .run_with(Sink::queue())
1596        .unwrap();
1597
1598        let even = outer.pull().unwrap().unwrap();
1599        assert_eq!(wait(even.run_with(Sink::head()).unwrap()), 0);
1600        release_tx.send(()).unwrap();
1601
1602        let odd = outer.pull().unwrap().unwrap();
1603        assert_eq!(odd.run_collect().unwrap(), vec![1]);
1604
1605        let recreated_even = outer.pull().unwrap().unwrap();
1606        assert_eq!(recreated_even.run_collect().unwrap(), vec![0]);
1607        assert!(outer.pull().unwrap().is_none());
1608    }
1609
1610    #[test]
1611    fn group_by_panicking_key_fn_abruptly_terminates_live_substreams() {
1612        let outer = Source::from_iter([0, 1])
1613            .group_by(
1614                4,
1615                |item| {
1616                    assert_ne!(*item, 1, "boom");
1617                    item % 2
1618                },
1619                false,
1620            )
1621            .run_with(Sink::queue())
1622            .unwrap();
1623
1624        let substream = outer.pull().unwrap().unwrap();
1625        let (result_tx, result_rx) = mpsc::channel();
1626        thread::spawn(move || {
1627            let _ = result_tx.send(substream.run_collect());
1628        });
1629
1630        assert_eq!(
1631            result_rx.recv_timeout(StdDuration::from_secs(1)).unwrap(),
1632            Err(StreamError::AbruptTermination)
1633        );
1634        assert!(matches!(outer.pull(), Err(StreamError::AbruptTermination)));
1635    }
1636
1637    #[test]
1638    fn split_when_starts_new_substream_on_boundary_element() {
1639        let outer = Source::from_iter([1, 2, 0, 3, 0, 4, 5])
1640            .split_when(|item| *item == 0)
1641            .run_with(Sink::queue())
1642            .unwrap();
1643
1644        let first = outer.pull().unwrap().unwrap();
1645        assert_eq!(first.run_collect().unwrap(), vec![1, 2]);
1646        let second = outer.pull().unwrap().unwrap();
1647        assert_eq!(second.run_collect().unwrap(), vec![0, 3]);
1648        let third = outer.pull().unwrap().unwrap();
1649        assert_eq!(third.run_collect().unwrap(), vec![0, 4, 5]);
1650        assert!(outer.pull().unwrap().is_none());
1651    }
1652
1653    #[test]
1654    fn split_after_ends_current_substream_on_boundary_element() {
1655        let outer = Source::from_iter([1, 2, 0, 3, 0, 4, 5])
1656            .split_after(|item| *item == 0)
1657            .run_with(Sink::queue())
1658            .unwrap();
1659
1660        let first = outer.pull().unwrap().unwrap();
1661        assert_eq!(first.run_collect().unwrap(), vec![1, 2, 0]);
1662        let second = outer.pull().unwrap().unwrap();
1663        assert_eq!(second.run_collect().unwrap(), vec![3, 0]);
1664        let third = outer.pull().unwrap().unwrap();
1665        assert_eq!(third.run_collect().unwrap(), vec![4, 5]);
1666        assert!(outer.pull().unwrap().is_none());
1667    }
1668
1669    #[test]
1670    fn split_when_panicking_predicate_abruptly_terminates_live_substreams() {
1671        let outer = Source::from_iter([1, 2])
1672            .split_when(|item| {
1673                assert_ne!(*item, 2, "boom");
1674                false
1675            })
1676            .run_with(Sink::queue())
1677            .unwrap();
1678
1679        let substream = outer.pull().unwrap().unwrap();
1680        let (result_tx, result_rx) = mpsc::channel();
1681        thread::spawn(move || {
1682            let _ = result_tx.send(substream.run_collect());
1683        });
1684
1685        assert_eq!(
1686            result_rx.recv_timeout(StdDuration::from_secs(1)).unwrap(),
1687            Err(StreamError::AbruptTermination)
1688        );
1689        assert!(matches!(outer.pull(), Err(StreamError::AbruptTermination)));
1690    }
1691
1692    #[test]
1693    fn split_when_pre_buffer_segments_match_expected_count() {
1694        let outer = Source::from_iter(0..100)
1695            .split_when(|item| *item != 0 && *item % 10 == 0)
1696            .run_with(Sink::queue())
1697            .unwrap();
1698        let mut segment_count = 0;
1699        while let Some(substream) = outer.pull().unwrap() {
1700            let items: Vec<i32> = substream.run_collect().unwrap();
1701            assert!(!items.is_empty(), "segment should not be empty");
1702            segment_count += 1;
1703        }
1704        assert_eq!(segment_count, 10, "100 elements in segments of 10");
1705    }
1706
1707    #[test]
1708    fn split_after_pre_buffer_segments_match_expected_count() {
1709        let outer = Source::from_iter(0..100)
1710            .split_after(|item| (*item + 1) % 10 == 0)
1711            .run_with(Sink::queue())
1712            .unwrap();
1713        let mut segment_count = 0;
1714        let mut total = 0_i32;
1715        while let Some(substream) = outer.pull().unwrap() {
1716            let items: Vec<i32> = substream.run_collect().unwrap();
1717            assert!(!items.is_empty(), "segment should not be empty");
1718            total += items.len() as i32;
1719            segment_count += 1;
1720        }
1721        assert_eq!(segment_count, 10);
1722        assert_eq!(total, 100);
1723    }
1724
1725    #[test]
1726    fn group_by_single_key_fused_matches_general_path() {
1727        let outer = Source::from_iter(0..1000i64)
1728            .group_by(1, |_| 0u8, false)
1729            .run_with(Sink::queue())
1730            .unwrap();
1731        let substream = outer.pull().unwrap().unwrap();
1732        let items: Vec<i64> = substream.run_collect().unwrap();
1733        assert_eq!(items.len(), 1000);
1734        assert_eq!(items[0], 0);
1735        assert_eq!(items[999], 999);
1736        assert!(outer.pull().unwrap().is_none());
1737    }
1738
1739    #[test]
1740    fn group_by_single_key_fused_handles_key_change_with_substream_limit() {
1741        let outer = Source::from_iter([0, 1, 0])
1742            .group_by(2, |item| *item, false)
1743            .run_with(Sink::queue())
1744            .unwrap();
1745        let mut sources = vec![];
1746        while let Some(source) = outer.pull().unwrap() {
1747            sources.push(source);
1748        }
1749        assert_eq!(sources.len(), 2);
1750        assert_eq!(sources[0].clone().run_collect().unwrap(), vec![0, 0]);
1751        assert_eq!(sources[1].clone().run_collect().unwrap(), vec![1]);
1752    }
1753
1754    #[test]
1755    fn flat_map_merge_lock_lighter_matches_expected_count() {
1756        let items = Source::from_iter(0..20)
1757            .flat_map_merge(2, |item| Source::single(item + 100))
1758            .run_with(Sink::queue())
1759            .unwrap();
1760        let mut count = 0;
1761        while items.pull().unwrap().is_some() {
1762            count += 1;
1763        }
1764        assert_eq!(count, 20);
1765    }
1766
1767    // Verify that group_by emits the substream BEFORE upstream completes —
1768    // i.e., the worker does not buffer all elements before delivery.
1769    //
1770    // The test uses a rendezvous channel to synchronize: the group_by worker
1771    // will block waiting for more items (the channel is empty after item 0),
1772    // so if the outer substream is NOT emitted before item 1 arrives, the
1773    // `outer.pull()` call in a separate thread cannot return during that window.
1774    // With the old "single_key_buf" batching, pull() would block until a
1775    // key-change or EOF — which is a liveness bug for infinite or slow sources.
1776    #[test]
1777    fn group_by_single_key_emits_substream_before_upstream_completes() {
1778        // sync_channel(0): rendezvous — sender blocks until receiver is ready,
1779        // guaranteeing the worker processes exactly one item then waits.
1780        let (tx, rx) = mpsc::sync_channel::<i32>(0);
1781        let rx = StdArc::new(std::sync::Mutex::new(rx));
1782
1783        let outer = Source::from_factory({
1784            let rx = StdArc::clone(&rx);
1785            move || {
1786                let rx = StdArc::clone(&rx);
1787                Box::new(std::iter::from_fn(move || {
1788                    rx.lock().unwrap().recv().ok().map(Ok)
1789                })) as BoxStream<i32>
1790            }
1791        })
1792        .group_by(1, |_| 0u8, false)
1793        .run_with(Sink::queue())
1794        .unwrap();
1795
1796        // Kick off the producer: sends item 0, then waits for the outer
1797        // substream to be pulled (via a second channel) before sending more.
1798        let (sub_tx, sub_rx) = mpsc::channel::<Source<i32>>();
1799        let outer_thread = thread::spawn(move || {
1800            let substream = outer.pull().unwrap().expect("expected a substream");
1801            sub_tx.send(substream).unwrap();
1802        });
1803
1804        // Send the first item — the worker processes it and (with the fix)
1805        // immediately emits the substream to outer.
1806        tx.send(0).unwrap();
1807
1808        // The outer pull must complete within the timeout.
1809        let substream = sub_rx
1810            .recv_timeout(StdDuration::from_secs(5))
1811            .expect("timed out — group_by buffered first element before emitting substream");
1812
1813        // Now deliver the remaining items and close.
1814        for i in 1..100_i32 {
1815            tx.send(i).unwrap();
1816        }
1817        drop(tx);
1818
1819        let items: Vec<i32> = substream.run_collect().unwrap();
1820        assert_eq!(items.len(), 100);
1821        outer_thread.join().unwrap();
1822    }
1823
1824    #[test]
1825    fn group_by_concurrent_live_substreams_do_not_hold_ready_item_stress() {
1826        const STREAMS: usize = 32;
1827        const ROUNDS: usize = 8;
1828        const ITEMS: i64 = 8;
1829
1830        for _ in 0..ROUNDS {
1831            let barrier = StdArc::new(std::sync::Barrier::new(STREAMS));
1832            let mut handles = Vec::with_capacity(STREAMS);
1833
1834            for _ in 0..STREAMS {
1835                let barrier = StdArc::clone(&barrier);
1836                handles.push(thread::spawn(move || {
1837                    let (tx, rx) = mpsc::sync_channel::<i64>(0);
1838                    let rx = StdArc::new(std::sync::Mutex::new(rx));
1839
1840                    let outer = Source::from_factory({
1841                        let rx = StdArc::clone(&rx);
1842                        move || {
1843                            let rx = StdArc::clone(&rx);
1844                            Box::new(std::iter::from_fn(move || {
1845                                rx.lock().unwrap().recv().ok().map(Ok)
1846                            })) as BoxStream<i64>
1847                        }
1848                    })
1849                    .group_by(1, |_| 0_u8, false)
1850                    .run_with(Sink::queue())
1851                    .unwrap();
1852
1853                    barrier.wait();
1854
1855                    tx.send(0).unwrap();
1856                    let substream = outer.pull().unwrap().expect("expected group_by substream");
1857                    let subqueue = substream.run_with(Sink::queue()).unwrap();
1858                    assert_eq!(subqueue.pull().unwrap(), Some(0));
1859
1860                    for item in 1..ITEMS {
1861                        tx.send(item).unwrap();
1862                        assert_eq!(subqueue.pull().unwrap(), Some(item));
1863                    }
1864                    drop(tx);
1865
1866                    assert!(subqueue.pull().unwrap().is_none());
1867                    assert!(outer.pull().unwrap().is_none());
1868                }));
1869            }
1870
1871            for handle in handles {
1872                handle.join().expect("group_by stress worker panicked");
1873            }
1874        }
1875    }
1876
1877    // Verify that split_when emits each substream as a live stream, NOT after
1878    // accumulating all segment elements into memory.  The channel is sized to
1879    // be larger than LIVE_SUBSTREAM_CAPACITY (256) so that, if the old Vec
1880    // pre-buffering path were active, the first substream would never be emitted
1881    // before the boundary.  With live substreams the substream is visible
1882    // immediately upon the first item.
1883    #[test]
1884    fn split_when_emits_substream_before_segment_ends() {
1885        // 512 > LIVE_SUBSTREAM_CAPACITY (256): the live substream will block
1886        // on backpressure mid-segment, but that's fine — the outer substream
1887        // IS emitted immediately and the consumer can drain it concurrently.
1888        const SEGMENT_LEN: usize = 300;
1889
1890        let (tx, rx) = mpsc::sync_channel::<i32>(SEGMENT_LEN * 2 + 4);
1891        for i in 0..SEGMENT_LEN as i32 {
1892            tx.send(i).unwrap();
1893        }
1894        tx.send(-1).unwrap(); // boundary
1895        tx.send(99).unwrap(); // second segment
1896        drop(tx);
1897
1898        let outer = Source::from_iter(rx)
1899            .split_when(|item| *item == -1)
1900            .run_with(Sink::queue())
1901            .unwrap();
1902
1903        let (result_tx, result_rx) = mpsc::channel();
1904        thread::spawn(move || {
1905            let first = outer.pull().unwrap().expect("expected first substream");
1906            let items: Vec<i32> = first.run_collect().unwrap();
1907            let second = outer.pull().unwrap().expect("expected second substream");
1908            let items2: Vec<i32> = second.run_collect().unwrap();
1909            let done = outer.pull().unwrap().is_none();
1910            let _ = result_tx.send((items, items2, done));
1911        });
1912
1913        let (items, items2, done) = result_rx
1914            .recv_timeout(StdDuration::from_secs(5))
1915            .expect("timed out — split_when is buffering the whole segment");
1916        assert_eq!(items.len(), SEGMENT_LEN);
1917        assert_eq!(items2, vec![-1, 99]);
1918        assert!(done);
1919    }
1920
1921    #[test]
1922    fn split_after_emits_substream_before_segment_ends() {
1923        const SEGMENT_LEN: usize = 300;
1924
1925        let (tx, rx) = mpsc::sync_channel::<i32>(SEGMENT_LEN * 2 + 4);
1926        for i in 0..SEGMENT_LEN as i32 {
1927            tx.send(i).unwrap();
1928        }
1929        tx.send(-1).unwrap(); // boundary (included in first segment for split_after)
1930        tx.send(99).unwrap();
1931        drop(tx);
1932
1933        let outer = Source::from_iter(rx)
1934            .split_after(|item| *item == -1)
1935            .run_with(Sink::queue())
1936            .unwrap();
1937
1938        let (result_tx, result_rx) = mpsc::channel();
1939        thread::spawn(move || {
1940            let first = outer.pull().unwrap().expect("expected first substream");
1941            let items: Vec<i32> = first.run_collect().unwrap();
1942            let second = outer.pull().unwrap().expect("expected second substream");
1943            let items2: Vec<i32> = second.run_collect().unwrap();
1944            let done = outer.pull().unwrap().is_none();
1945            let _ = result_tx.send((items, items2, done));
1946        });
1947
1948        let (items, items2, done) = result_rx
1949            .recv_timeout(StdDuration::from_secs(5))
1950            .expect("timed out — split_after is buffering the whole segment");
1951        // split_after includes the boundary element in the first segment.
1952        assert_eq!(items.len(), SEGMENT_LEN + 1);
1953        assert_eq!(items2, vec![99]);
1954        assert!(done);
1955    }
1956
1957    // Stress test: run flat_map_merge 20 times with many concurrent inner
1958    // streams to confirm the fixed coordinator tail loop never hangs.
1959    // Intended to be run with --test-threads=32 to expose the race.
1960    #[test]
1961    fn flat_map_merge_coordinator_no_lost_wakeup_stress() {
1962        for _ in 0..20 {
1963            let result = Source::from_iter(0..50_i32)
1964                .flat_map_merge(8, |item| Source::from_iter(item..item + 3))
1965                .run_with(Sink::fold(0i64, |acc, item| acc + item as i64))
1966                .unwrap()
1967                .wait();
1968            // Each i in 0..50 contributes i + (i+1) + (i+2) = 3i + 3.
1969            // Sum = 3 * (0+1+..+49) + 3*50 = 3*1225 + 150 = 3675 + 150 = 3825.
1970            assert_eq!(result, Ok(3825), "flat_map_merge produced wrong sum");
1971        }
1972    }
1973
1974    // Stress test for the single-mutex flat_map_merge refactor.
1975    // 20 runs with high concurrency to verify no deadlock, lost wakeup,
1976    // or window-accounting corruption under the merged-lock design.
1977    // Intended to be run with --test-threads=32.
1978    #[test]
1979    fn flat_map_merge_single_mutex_race_stress() {
1980        for _ in 0..20 {
1981            let result = Source::from_iter(0..100_i64)
1982                .flat_map_merge(16, |item| Source::from_iter([item, item + 1000]))
1983                .run_with(Sink::fold(0i64, |acc, v| acc + v))
1984                .unwrap()
1985                .wait();
1986            // sum of item + (item+1000) for item in 0..100
1987            // = sum(0..100) + sum(0..100) + 100*1000
1988            // = 4950 + 4950 + 100000 = 109900
1989            assert_eq!(result, Ok(109_900), "flat_map_merge single-mutex stress");
1990        }
1991    }
1992
1993    // Bounded-memory test for split_when batching: verify that items are
1994    // delivered to a slow consumer and not held indefinitely in the writer
1995    // buffer.  Uses a rendezvous-style channel where the producer sends MANY
1996    // items into the outer queue (larger than LIVE_SUBSTREAM_BATCH = 64), then
1997    // blocks.  The consumer must be able to drain the first segment fully — i.e.
1998    // the writer must have flushed even when the batch was smaller than 64.
1999    #[test]
2000    fn split_when_bounded_memory_rendezvous() {
2001        // Segment of 100 items (> LIVE_SUBSTREAM_BATCH=64): the writer will
2002        // flush at 64, then again at the boundary (remaining 36).
2003        const SEGMENT: usize = 100;
2004        let (tx, rx) = mpsc::sync_channel::<i32>(SEGMENT * 4);
2005        for i in 0..SEGMENT as i32 {
2006            tx.send(i).unwrap();
2007        }
2008        tx.send(-1).unwrap(); // boundary (split_when: starts new segment)
2009        // Second segment
2010        for i in 0..10_i32 {
2011            tx.send(i).unwrap();
2012        }
2013        drop(tx);
2014
2015        let outer = Source::from_iter(rx)
2016            .split_when(|item| *item == -1)
2017            .run_with(Sink::queue())
2018            .unwrap();
2019
2020        let (result_tx, result_rx) = mpsc::channel();
2021        thread::spawn(move || {
2022            let first = outer.pull().unwrap().expect("first segment");
2023            let seg1: Vec<i32> = first.run_collect().unwrap();
2024            let second = outer.pull().unwrap().expect("second segment");
2025            let seg2: Vec<i32> = second.run_collect().unwrap();
2026            let done = outer.pull().unwrap().is_none();
2027            result_tx.send((seg1, seg2, done)).unwrap();
2028        });
2029
2030        let (seg1, seg2, done) = result_rx
2031            .recv_timeout(StdDuration::from_secs(5))
2032            .expect("timed out — split_when writer held items past LIVE_SUBSTREAM_BATCH");
2033        assert_eq!(seg1.len(), SEGMENT, "first segment length");
2034        assert_eq!(seg2[0], -1, "boundary element starts second segment");
2035        assert_eq!(seg2.len(), 11, "second segment: boundary + 10 items");
2036        assert!(done);
2037    }
2038
2039    // Bounded-memory test for group_by batching: verifies that a slow consumer
2040    // waiting on a single-key substream eventually receives all items (i.e. the
2041    // write batch is flushed, not held indefinitely).
2042    #[test]
2043    fn group_by_single_key_bounded_memory_rendezvous() {
2044        // 200 items, all same key, batched in groups of 64 by the writer.
2045        // The consumer blocks on run_collect(); all items must arrive.
2046        const N: usize = 200;
2047        let outer = Source::from_iter(0..N as i64)
2048            .group_by(1, |_| 0u8, false)
2049            .run_with(Sink::queue())
2050            .unwrap();
2051
2052        let (result_tx, result_rx) = mpsc::channel();
2053        thread::spawn(move || {
2054            let substream = outer.pull().unwrap().expect("substream");
2055            let items: Vec<i64> = substream.run_collect().unwrap();
2056            let done = outer.pull().unwrap().is_none();
2057            result_tx.send((items, done)).unwrap();
2058        });
2059
2060        let (items, done) = result_rx
2061            .recv_timeout(StdDuration::from_secs(5))
2062            .expect("timed out — group_by write batch held items beyond LIVE_SUBSTREAM_BATCH");
2063        assert_eq!(items.len(), N, "all items delivered");
2064        assert_eq!(items[0], 0);
2065        assert_eq!(items[N - 1], (N - 1) as i64);
2066        assert!(done);
2067    }
2068
2069    #[test]
2070    fn scan_emits_seed_and_accumulated_values() {
2071        let result = Source::from_iter(1..=3)
2072            .scan(0, |acc, item| acc + item)
2073            .run_collect()
2074            .unwrap();
2075
2076        assert_eq!(result, vec![0, 1, 3, 6]);
2077    }
2078
2079    #[test]
2080    fn limit_fails_after_max_elements() {
2081        let result = Source::from_iter(0..3).limit(2).run_collect();
2082
2083        assert_eq!(result, Err(StreamError::LimitExceeded { max: 2 }));
2084    }
2085
2086    #[test]
2087    fn limit_weighted_fails_with_limit_error_like_akka() {
2088        let result = Source::from_iter(["this", "is", "some", "string"])
2089            .via(Flow::identity().limit_weighted(15, |item: &&str| item.len()))
2090            .run_collect();
2091
2092        assert_eq!(result, Err(StreamError::LimitExceeded { max: 15 }));
2093    }
2094
2095    #[test]
2096    fn grouped_weighted_allows_oversized_first_element_like_akka() {
2097        let result = Source::from_iter([10_usize, 1, 2])
2098            .via(Flow::identity().grouped_weighted(5, |item: &usize| *item))
2099            .run_collect()
2100            .unwrap();
2101
2102        assert_eq!(result, vec![vec![10], vec![1, 2]]);
2103    }
2104
2105    #[test]
2106    fn grouped_weighted_keeps_oversized_later_element_in_current_group_like_akka() {
2107        let result = Source::from_iter([1_usize, 10, 2])
2108            .via(Flow::identity().grouped_weighted(5, |item: &usize| *item))
2109            .run_collect()
2110            .unwrap();
2111
2112        assert_eq!(result, vec![vec![1, 10], vec![2]]);
2113    }
2114
2115    #[test]
2116    fn sink_terminals_materialize_results() {
2117        let sum = Source::from_iter(1..=4)
2118            .run_with(Sink::fold(0, |acc, item| acc + item))
2119            .unwrap();
2120
2121        assert_eq!(wait(sum), 10);
2122        assert_eq!(
2123            wait(Source::from_iter(1..=4).run_with(Sink::head()).unwrap()),
2124            1
2125        );
2126        assert_eq!(
2127            wait(Source::from_iter(1..=4).run_with(Sink::last()).unwrap()),
2128            4
2129        );
2130    }
2131
2132    #[test]
2133    fn all_terminal_sink_variants_complete() {
2134        assert_eq!(
2135            wait(
2136                Source::from_iter([1, 2, 3])
2137                    .run_with(Sink::collect())
2138                    .unwrap()
2139            ),
2140            vec![1, 2, 3]
2141        );
2142        assert_eq!(
2143            wait(
2144                Source::<i32>::empty()
2145                    .run_with(Sink::head_option())
2146                    .unwrap()
2147            ),
2148            None
2149        );
2150        assert_eq!(
2151            wait(
2152                Source::from_iter([1, 2, 3])
2153                    .run_with(Sink::last_option())
2154                    .unwrap()
2155            ),
2156            Some(3)
2157        );
2158        assert_eq!(
2159            wait(
2160                Source::from_iter([1, 2, 3])
2161                    .run_with(Sink::reduce(|acc, item| acc + item))
2162                    .unwrap()
2163            ),
2164            6
2165        );
2166
2167        let seen = StdArc::new(StdAtomicUsize::new(0));
2168        let seen_by_sink = StdArc::clone(&seen);
2169        assert_eq!(
2170            wait(
2171                Source::from_iter([1_usize, 2, 3])
2172                    .run_with(Sink::foreach(move |item| {
2173                        seen_by_sink.fetch_add(item, StdOrdering::SeqCst);
2174                    }))
2175                    .unwrap()
2176            ),
2177            NotUsed
2178        );
2179        assert_eq!(seen.load(StdOrdering::SeqCst), 6);
2180    }
2181
2182    #[test]
2183    fn take_last_zero_returns_empty_vector() {
2184        let result = Source::from_iter([1, 2, 3])
2185            .run_with(Sink::take_last(0))
2186            .unwrap();
2187
2188        assert_eq!(wait(result), Vec::<i32>::new());
2189    }
2190
2191    #[test]
2192    fn bounded_head_terminals_complete_inline() {
2193        let materializer = Materializer::new();
2194
2195        let mut head = Source::from_iter(0_u64..1_000)
2196            .run_with_materializer(Sink::head(), &materializer)
2197            .unwrap();
2198        assert_eq!(materializer.active_streams(), 0);
2199        assert_eq!(head.try_wait(), Some(Ok(0)));
2200
2201        let mut filtered_head = Source::from_iter(0_u64..1_000)
2202            .filter(|item| *item >= 10)
2203            .run_with_materializer(Sink::head(), &materializer)
2204            .unwrap();
2205        assert_eq!(materializer.active_streams(), 0);
2206        assert_eq!(filtered_head.try_wait(), Some(Ok(10)));
2207
2208        let mut head_option = Source::<u64>::empty()
2209            .run_with_materializer(Sink::head_option(), &materializer)
2210            .unwrap();
2211        assert_eq!(materializer.active_streams(), 0);
2212        assert_eq!(head_option.try_wait(), Some(Ok(None)));
2213    }
2214
2215    #[test]
2216    fn bounded_head_fast_path_preserves_terminal_errors() {
2217        let materializer = Materializer::new();
2218
2219        let mut empty = Source::<u64>::empty()
2220            .run_with_materializer(Sink::head(), &materializer)
2221            .unwrap();
2222        assert_eq!(empty.try_wait(), Some(Err(StreamError::EmptyStream)));
2223
2224        let mut failed = Source::<u64>::failed(StreamError::Failed("boom".into()))
2225            .run_with_materializer(Sink::head(), &materializer)
2226            .unwrap();
2227        assert_eq!(
2228            failed.try_wait(),
2229            Some(Err(StreamError::Failed("boom".into())))
2230        );
2231        assert_eq!(materializer.active_streams(), 0);
2232    }
2233
2234    #[test]
2235    fn runnable_graph_composes_source_and_sink() {
2236        let graph = Source::from_iter(1..=4)
2237            .map(|item| item * 2)
2238            .to_mat(Sink::fold(0, |acc, item| acc + item), Keep::right);
2239
2240        assert_eq!(wait(graph.run().unwrap()), 20);
2241
2242        let graph = Source::single(1)
2243            .map_materialized_value(|_| 20)
2244            .to(Sink::ignore())
2245            .map_materialized_value(|value| value + 1);
2246        assert_eq!(graph.run().unwrap(), 21);
2247
2248        let ignored = Source::single(1).to(Sink::ignore()).run().unwrap();
2249        assert_eq!(ignored, NotUsed);
2250    }
2251
2252    #[test]
2253    fn materialized_values_follow_keep_defaults() {
2254        let source = Source::single(1).map_materialized_value(|_| "source");
2255        let flow = Flow::identity().map_materialized_value(|_| "flow");
2256
2257        let source_mat = source.clone().via(flow.clone()).to(Sink::ignore()).run();
2258        assert_eq!(source_mat.unwrap(), "source");
2259
2260        let combined = source
2261            .via_mat(flow, Keep::both)
2262            .to_mat(Sink::ignore(), Keep::both)
2263            .run()
2264            .unwrap();
2265        assert_eq!(combined.0, ("source", "flow"));
2266        assert_eq!(wait(combined.1), NotUsed);
2267
2268        let sink_mat = Source::single(41)
2269            .map_materialized_value(|_| "ignored source")
2270            .run_with(Sink::fold(1, |acc, item| acc + item))
2271            .unwrap();
2272        assert_eq!(wait(sink_mat), 42);
2273    }
2274
2275    #[test]
2276    fn flow_to_sink_preserves_flow_materialized_value_by_default() {
2277        let sink = Flow::identity()
2278            .map(|item: i32| item + 1)
2279            .map_materialized_value(|_| "flow")
2280            .to(Sink::fold(0, |acc, item| acc + item));
2281
2282        let materialized = Source::from_iter([1, 2, 3]).run_with(sink).unwrap();
2283
2284        assert_eq!(materialized, "flow");
2285        let explicit = Flow::identity()
2286            .map(|item: i32| item + 1)
2287            .map_materialized_value(|_| "flow")
2288            .to_mat(Sink::fold(0, |acc, item| acc + item), Keep::both)
2289            .run_with(Source::from_iter([1, 2, 3]))
2290            .unwrap();
2291        assert_eq!(explicit, NotUsed);
2292
2293        let explicit = Source::from_iter([1, 2, 3])
2294            .run_with(
2295                Flow::identity()
2296                    .map(|item: i32| item + 1)
2297                    .map_materialized_value(|_| "flow")
2298                    .to_mat(Sink::fold(0, |acc, item| acc + item), Keep::both),
2299            )
2300            .unwrap();
2301        assert_eq!(explicit.0, "flow");
2302        assert_eq!(wait(explicit.1), 9);
2303    }
2304
2305    #[test]
2306    fn materializer_shutdown_fails_materialization() {
2307        let materializer = Materializer::new();
2308        let named = materializer.with_name_prefix("test-stream");
2309        materializer.shutdown();
2310
2311        let graph = Source::single(1).to(Sink::ignore());
2312
2313        assert_eq!(named.name_prefix(), "test-stream");
2314        assert_eq!(
2315            graph.run_with_materializer(&named),
2316            Err(StreamError::AbruptTermination)
2317        );
2318    }
2319
2320    #[test]
2321    fn materializer_shutdown_fails_running_stream_completion() {
2322        let materializer = Materializer::new();
2323        let completion = Source::repeat(1)
2324            .run_with_materializer(Sink::ignore(), &materializer)
2325            .unwrap();
2326
2327        assert_eq!(materializer.active_streams(), 1);
2328        materializer.shutdown();
2329        assert_eq!(completion.wait(), Err(StreamError::AbruptTermination));
2330        assert_eq!(materializer.active_streams(), 0);
2331    }
2332
2333    #[test]
2334    fn dropped_stream_completion_cancels_running_stream() {
2335        let materializer = Materializer::new();
2336        let completion = Source::repeat(1)
2337            .run_with_materializer(Sink::ignore(), &materializer)
2338            .unwrap();
2339
2340        assert_eq!(materializer.active_streams(), 1);
2341        drop(completion);
2342        for _ in 0..50 {
2343            if materializer.active_streams() == 0 {
2344                break;
2345            }
2346            thread::sleep(Duration::from_millis(5));
2347        }
2348        assert_eq!(materializer.active_streams(), 0);
2349    }
2350
2351    #[test]
2352    fn runtime_timers_fire_cancel_and_stop_on_shutdown() {
2353        let materializer = Materializer::new();
2354        let (once_tx, once_rx) = mpsc::channel();
2355        let once = materializer.schedule_once(Duration::from_millis(5), move || {
2356            once_tx.send(()).unwrap();
2357        });
2358        once_rx.recv_timeout(Duration::from_millis(250)).unwrap();
2359        assert!(!once.is_cancelled());
2360
2361        let (cancelled_tx, cancelled_rx) = mpsc::channel();
2362        let cancelled = materializer.schedule_once(Duration::from_millis(25), move || {
2363            cancelled_tx.send(()).unwrap();
2364        });
2365        assert!(cancelled.cancel());
2366        assert!(!cancelled.cancel());
2367        assert!(cancelled.is_cancelled());
2368        assert!(
2369            cancelled_rx
2370                .recv_timeout(Duration::from_millis(75))
2371                .is_err()
2372        );
2373
2374        let fixed_delay_count = StdArc::new(StdAtomicUsize::new(0));
2375        let fixed_delay_task_count = StdArc::clone(&fixed_delay_count);
2376        let fixed_delay = materializer.schedule_with_fixed_delay(
2377            Duration::from_millis(1),
2378            Duration::from_millis(5),
2379            move || {
2380                fixed_delay_task_count.fetch_add(1, StdOrdering::SeqCst);
2381            },
2382        );
2383        thread::sleep(Duration::from_millis(25));
2384        assert!(fixed_delay_count.load(StdOrdering::SeqCst) > 0);
2385        fixed_delay.cancel();
2386
2387        let fixed_rate_count = StdArc::new(StdAtomicUsize::new(0));
2388        let fixed_rate_task_count = StdArc::clone(&fixed_rate_count);
2389        let fixed_rate = materializer.schedule_at_fixed_rate(
2390            Duration::from_millis(1),
2391            Duration::from_millis(5),
2392            move || {
2393                fixed_rate_task_count.fetch_add(1, StdOrdering::SeqCst);
2394            },
2395        );
2396        thread::sleep(Duration::from_millis(25));
2397        assert!(fixed_rate_count.load(StdOrdering::SeqCst) > 0);
2398        fixed_rate.cancel();
2399
2400        let shutdown_materializer = Materializer::new();
2401        let (shutdown_tx, shutdown_rx) = mpsc::channel();
2402        shutdown_materializer.schedule_once(Duration::from_millis(25), move || {
2403            shutdown_tx.send(()).unwrap();
2404        });
2405        shutdown_materializer.shutdown();
2406        assert!(shutdown_rx.recv_timeout(Duration::from_millis(75)).is_err());
2407    }
2408
2409    #[test]
2410    fn runtime_timer_driver_preserves_fixed_rate_cadence_under_slow_tasks() {
2411        use std::sync::{Condvar, Mutex};
2412
2413        #[derive(Debug)]
2414        enum TimerEvent {
2415            Started(usize, Instant),
2416            Completed(usize, Instant),
2417        }
2418
2419        let recv_event = |rx: &mpsc::Receiver<TimerEvent>, label: &str| {
2420            rx.recv_timeout(Duration::from_secs(20))
2421                .unwrap_or_else(|err| panic!("{label}: expected timer event within 20 s: {err}"))
2422        };
2423        let release = |gate: &StdArc<(Mutex<bool>, Condvar)>| {
2424            let (released, condvar) = &**gate;
2425            let mut released = released.lock().unwrap_or_else(|poison| poison.into_inner());
2426            *released = true;
2427            condvar.notify_all();
2428        };
2429
2430        let interval = Duration::from_secs(2);
2431        let overrun = interval + Duration::from_millis(250);
2432
2433        let rate_materializer = Materializer::new();
2434        let (rate_tx, rate_rx) = mpsc::channel();
2435        let rate_runs = StdArc::new(StdAtomicUsize::new(0));
2436        let rate_task_runs = StdArc::clone(&rate_runs);
2437        let rate_gate = StdArc::new((Mutex::new(false), Condvar::new()));
2438        let rate_task_gate = StdArc::clone(&rate_gate);
2439        let fixed_rate =
2440            rate_materializer.schedule_at_fixed_rate(Duration::ZERO, interval, move || {
2441                let run = rate_task_runs.fetch_add(1, StdOrdering::SeqCst) + 1;
2442                rate_tx
2443                    .send(TimerEvent::Started(run, Instant::now()))
2444                    .unwrap();
2445                if run == 1 {
2446                    let (released, condvar) = &*rate_task_gate;
2447                    let mut released = released.lock().unwrap_or_else(|poison| poison.into_inner());
2448                    while !*released {
2449                        released = condvar
2450                            .wait(released)
2451                            .unwrap_or_else(|poison| poison.into_inner());
2452                    }
2453                    rate_tx
2454                        .send(TimerEvent::Completed(run, Instant::now()))
2455                        .unwrap();
2456                }
2457            });
2458        let rate_first_started = match recv_event(&rate_rx, "fixed-rate first task") {
2459            TimerEvent::Started(1, at) => at,
2460            other => panic!("fixed-rate first task: unexpected event {other:?}"),
2461        };
2462        assert!(wait_until(Duration::from_secs(20), || {
2463            rate_first_started.elapsed() >= overrun
2464        }));
2465        release(&rate_gate);
2466        let rate_first_completed = match recv_event(&rate_rx, "fixed-rate first completion") {
2467            TimerEvent::Completed(1, at) => at,
2468            other => panic!("fixed-rate first completion: unexpected event {other:?}"),
2469        };
2470        let rate_second_started = match recv_event(&rate_rx, "fixed-rate second task") {
2471            TimerEvent::Started(2, at) => at,
2472            other => panic!("fixed-rate second task: unexpected event {other:?}"),
2473        };
2474        fixed_rate.cancel();
2475        rate_materializer.shutdown();
2476
2477        let delay_materializer = Materializer::new();
2478        let (delay_tx, delay_rx) = mpsc::channel();
2479        let delay_runs = StdArc::new(StdAtomicUsize::new(0));
2480        let delay_task_runs = StdArc::clone(&delay_runs);
2481        let delay_gate = StdArc::new((Mutex::new(false), Condvar::new()));
2482        let delay_task_gate = StdArc::clone(&delay_gate);
2483        let fixed_delay =
2484            delay_materializer.schedule_with_fixed_delay(Duration::ZERO, interval, move || {
2485                let run = delay_task_runs.fetch_add(1, StdOrdering::SeqCst) + 1;
2486                delay_tx
2487                    .send(TimerEvent::Started(run, Instant::now()))
2488                    .unwrap();
2489                if run == 1 {
2490                    let (released, condvar) = &*delay_task_gate;
2491                    let mut released = released.lock().unwrap_or_else(|poison| poison.into_inner());
2492                    while !*released {
2493                        released = condvar
2494                            .wait(released)
2495                            .unwrap_or_else(|poison| poison.into_inner());
2496                    }
2497                    delay_tx
2498                        .send(TimerEvent::Completed(run, Instant::now()))
2499                        .unwrap();
2500                }
2501            });
2502        let delay_first_started = match recv_event(&delay_rx, "fixed-delay first task") {
2503            TimerEvent::Started(1, at) => at,
2504            other => panic!("fixed-delay first task: unexpected event {other:?}"),
2505        };
2506        assert!(wait_until(Duration::from_secs(20), || {
2507            delay_first_started.elapsed() >= overrun
2508        }));
2509        release(&delay_gate);
2510        let delay_first_completed = match recv_event(&delay_rx, "fixed-delay first completion") {
2511            TimerEvent::Completed(1, at) => at,
2512            other => panic!("fixed-delay first completion: unexpected event {other:?}"),
2513        };
2514        let delay_second_started = match recv_event(&delay_rx, "fixed-delay second task") {
2515            TimerEvent::Started(2, at) => at,
2516            other => panic!("fixed-delay second task: unexpected event {other:?}"),
2517        };
2518        fixed_delay.cancel();
2519        delay_materializer.shutdown();
2520
2521        let rate_task_time = rate_first_completed.duration_since(rate_first_started);
2522        let rate_catch_up = rate_second_started.duration_since(rate_first_completed);
2523        let delay_task_time = delay_first_completed.duration_since(delay_first_started);
2524        let delay_gap = delay_second_started.duration_since(delay_first_completed);
2525        assert!(
2526            rate_task_time >= interval,
2527            "fixed-rate first task should overrun its interval; ran for {rate_task_time:?}"
2528        );
2529        assert!(
2530            rate_catch_up < interval,
2531            "fixed-rate second task should catch up after an overrun; waited {rate_catch_up:?}"
2532        );
2533        assert!(
2534            delay_task_time >= interval,
2535            "fixed-delay first task should overrun its interval; ran for {delay_task_time:?}"
2536        );
2537        assert!(
2538            delay_gap >= interval,
2539            "fixed-delay second task fired before one full delay elapsed after completion: {delay_gap:?}",
2540        );
2541    }
2542
2543    #[test]
2544    fn runtime_repeating_timer_cancellation_stops_future_fires() {
2545        let materializer = Materializer::new();
2546        let (tx, rx) = mpsc::channel();
2547        let timer = materializer.schedule_at_fixed_rate(
2548            Duration::from_millis(1),
2549            Duration::from_millis(30),
2550            move || {
2551                tx.send(()).unwrap();
2552            },
2553        );
2554
2555        rx.recv_timeout(Duration::from_millis(250)).unwrap();
2556        assert!(timer.cancel());
2557        assert!(rx.recv_timeout(Duration::from_millis(90)).is_err());
2558        materializer.shutdown();
2559    }
2560
2561    #[test]
2562    fn runtime_panicking_once_timer_does_not_kill_driver_or_later_timers() {
2563        let materializer = Materializer::new();
2564        materializer.schedule_once(Duration::from_millis(1), || {
2565            panic!("timer boom");
2566        });
2567
2568        let (tx, rx) = mpsc::channel();
2569        materializer.schedule_once(Duration::from_millis(20), move || {
2570            tx.send(()).unwrap();
2571        });
2572
2573        rx.recv_timeout(Duration::from_millis(250)).unwrap();
2574        materializer.shutdown();
2575    }
2576
2577    #[test]
2578    fn runtime_panicking_fixed_rate_timer_stops_itself_and_leaves_driver_alive() {
2579        let materializer = Materializer::new();
2580        let panic_count = StdArc::new(StdAtomicUsize::new(0));
2581        let panic_count_task = StdArc::clone(&panic_count);
2582        materializer.schedule_at_fixed_rate(Duration::ZERO, Duration::from_millis(20), move || {
2583            panic_count_task.fetch_add(1, StdOrdering::SeqCst);
2584            panic!("fixed-rate boom");
2585        });
2586
2587        assert!(wait_until(Duration::from_millis(150), || {
2588            panic_count.load(StdOrdering::SeqCst) == 1
2589        }));
2590
2591        let (tx, rx) = mpsc::channel();
2592        materializer.schedule_once(Duration::from_millis(30), move || {
2593            tx.send(()).unwrap();
2594        });
2595        rx.recv_timeout(Duration::from_millis(250)).unwrap();
2596
2597        thread::sleep(Duration::from_millis(90));
2598        assert_eq!(panic_count.load(StdOrdering::SeqCst), 1);
2599        materializer.shutdown();
2600    }
2601
2602    #[test]
2603    fn runtime_slow_timer_task_does_not_delay_unrelated_timers() {
2604        let materializer = Materializer::new();
2605        let started = StdArc::new(StdAtomicBool::new(false));
2606        let started_task = StdArc::clone(&started);
2607        let slow_timer = materializer.schedule_at_fixed_rate(
2608            Duration::ZERO,
2609            Duration::from_millis(250),
2610            move || {
2611                started_task.store(true, StdOrdering::SeqCst);
2612                thread::sleep(Duration::from_millis(200));
2613            },
2614        );
2615
2616        assert!(wait_until(Duration::from_millis(100), || {
2617            started.load(StdOrdering::SeqCst)
2618        }));
2619
2620        let start = Instant::now();
2621        let (tx, rx) = mpsc::channel();
2622        materializer.schedule_once(Duration::from_millis(10), move || {
2623            tx.send(Instant::now()).unwrap();
2624        });
2625        let fired_at = rx.recv_timeout(Duration::from_millis(350)).unwrap();
2626        let elapsed = fired_at.duration_since(start);
2627
2628        slow_timer.cancel();
2629        materializer.shutdown();
2630        assert!(
2631            elapsed < Duration::from_millis(150),
2632            "unrelated timer was delayed by a blocking timer task: {elapsed:?}",
2633        );
2634    }
2635
2636    #[test]
2637    fn runtime_shutdown_stops_timer_driver_thread() {
2638        let materializer = Materializer::new();
2639        assert!(wait_until(Duration::from_secs(1), || materializer
2640            .timer_driver_is_live()));
2641
2642        materializer.shutdown();
2643        assert!(wait_until(Duration::from_secs(2), || !materializer
2644            .timer_driver_is_live()));
2645    }
2646
2647    #[test]
2648    fn runtime_timer_driver_orders_many_timers_by_deadline() {
2649        let materializer = Materializer::new();
2650        let (tx, rx) = mpsc::channel();
2651        let schedule = [(450_u64, 4_u8), (50, 1), (350, 3), (150, 2), (550, 5)];
2652
2653        for (delay_ms, value) in schedule {
2654            let tx = tx.clone();
2655            materializer.schedule_once(Duration::from_millis(delay_ms), move || {
2656                tx.send(value).unwrap();
2657            });
2658        }
2659        drop(tx);
2660
2661        let mut received = Vec::new();
2662        for _ in 0..schedule.len() {
2663            received.push(rx.recv_timeout(Duration::from_secs(10)).unwrap());
2664        }
2665        materializer.shutdown();
2666
2667        assert_eq!(received, vec![1, 2, 3, 4, 5]);
2668    }
2669
2670    #[test]
2671    fn runtime_timer_driver_uses_one_thread_per_runtime_regardless_of_timer_count() {
2672        let materializer = Materializer::new();
2673        let thread_name = materializer.timer_thread_name().to_owned();
2674        // Linux exposes thread names through /proc/self/task/*/comm with the
2675        // kernel's 15-byte task-name limit. Once a full test process has
2676        // created enough runtimes, `datum-timer-1000` is reported as
2677        // `datum-timer-100`, so compare against the Linux-visible name and use
2678        // the observed baseline for collision-safe assertions.
2679        let linux_thread_name = thread_name.chars().take(15).collect::<String>();
2680        assert!(wait_until(Duration::from_secs(5), || {
2681            materializer.timer_driver_is_live() && linux_thread_count(&linux_thread_name) >= 1
2682        }));
2683        let live_timer_threads = linux_thread_count(&linux_thread_name);
2684
2685        for _ in 0..128 {
2686            materializer.schedule_once(Duration::from_secs(60), || {});
2687        }
2688
2689        assert!(
2690            wait_until(Duration::from_secs(5), || {
2691                materializer.timer_driver_is_live()
2692                    && linux_thread_count(&linux_thread_name) == live_timer_threads
2693            }),
2694            "scheduling timers should not create extra timer threads for a runtime",
2695        );
2696        materializer.shutdown();
2697        assert!(wait_until(Duration::from_secs(5), || {
2698            !materializer.timer_driver_is_live()
2699                && linux_thread_count(&linux_thread_name) < live_timer_threads
2700        }));
2701    }
2702
2703    #[test]
2704    fn cancelled_and_never_sinks_have_distinct_materialization_results() {
2705        assert_eq!(
2706            Source::repeat(1)
2707                .run_with(Sink::cancelled())
2708                .expect("cancelled sink materializes"),
2709            NotUsed
2710        );
2711        assert_eq!(
2712            Source::single(1)
2713                .run_with(Sink::never())
2714                .expect("never sink materializes")
2715                .try_wait(),
2716            None
2717        );
2718    }
2719
2720    #[test]
2721    fn never_sink_finishes_on_materializer_shutdown() {
2722        let materializer = Materializer::new();
2723        let completion = Source::single(1)
2724            .run_with_materializer(Sink::never(), &materializer)
2725            .unwrap();
2726
2727        materializer.shutdown();
2728        assert_eq!(completion.wait(), Err(StreamError::AbruptTermination));
2729    }
2730
2731    #[test]
2732    fn dropping_source_never_completion_releases_parked_worker() {
2733        let materializer = Materializer::new();
2734        let completion = Source::<i32>::never()
2735            .run_with_materializer(Sink::ignore(), &materializer)
2736            .unwrap();
2737
2738        assert!(wait_until(StdDuration::from_secs(1), || {
2739            materializer.active_streams() == 1
2740        }));
2741        assert_eq!(materializer.active_streams(), 1);
2742
2743        drop(completion);
2744
2745        assert!(wait_until(StdDuration::from_secs(15), || {
2746            materializer.active_streams() == 0
2747        }));
2748        assert_eq!(materializer.active_streams(), 0);
2749    }
2750
2751    #[test]
2752    fn future_and_maybe_sources_emit_values() {
2753        let future_value = Source::future(|| async { Ok(7) }).run_collect().unwrap();
2754        assert_eq!(future_value, vec![7]);
2755
2756        let future_source = Source::future_source(|| async { Ok(Source::from_iter([1, 2, 3])) })
2757            .run_collect()
2758            .unwrap();
2759        assert_eq!(future_source, vec![1, 2, 3]);
2760
2761        let (handle, source) = Source::maybe();
2762        assert_eq!(
2763            source.clone().run_collect(),
2764            Err(StreamError::MaybeIncomplete)
2765        );
2766        handle.complete(9).unwrap();
2767        assert_eq!(source.run_collect().unwrap(), vec![9]);
2768    }
2769
2770    #[test]
2771    fn wp6b_source_generators_emit_and_fail_like_stream_errors() {
2772        assert_eq!(
2773            Source::cycle(|| [1, 2, 3].into_iter())
2774                .take(8)
2775                .run_collect()
2776                .unwrap(),
2777            vec![1, 2, 3, 1, 2, 3, 1, 2]
2778        );
2779        assert_eq!(
2780            Source::<i32>::cycle(std::iter::empty::<i32>).run_collect(),
2781            Err(StreamError::Failed("empty iterator".into()))
2782        );
2783        assert_eq!(
2784            Source::unfold(0, |state| (state < 4).then_some((state + 1, state)))
2785                .run_collect()
2786                .unwrap(),
2787            vec![0, 1, 2, 3]
2788        );
2789        assert_eq!(
2790            Source::unfold_async(0, |state| async move {
2791                Ok((state < 4).then_some((state + 1, state * 2)))
2792            })
2793            .run_collect()
2794            .unwrap(),
2795            vec![0, 2, 4, 6]
2796        );
2797        assert!(matches!(
2798            Source::<i32>::lazy_single(|| panic!("boom")).run_collect(),
2799            Err(StreamError::Failed(message)) if message == "lazy_single factory panicked"
2800        ));
2801    }
2802
2803    #[test]
2804    fn wp6b_lazy_sources_defer_until_first_pull_and_complete_deferred_mat() {
2805        let created = StdArc::new(StdAtomicUsize::new(0));
2806        let created_for_source = StdArc::clone(&created);
2807        let source = Source::<i32>::lazy_source(move || {
2808            created_for_source.fetch_add(1, StdOrdering::SeqCst);
2809            Source::from_iter([7, 8]).map_materialized_value(|_| 99)
2810        });
2811        let materializer = Materializer::new();
2812        let (mut stream, mut mat) = StdArc::clone(&source.factory)
2813            .create(&materializer)
2814            .unwrap();
2815
2816        assert_eq!(created.load(StdOrdering::SeqCst), 0);
2817        assert!(mat.try_wait().is_none());
2818        assert_eq!(stream.next().unwrap().unwrap(), 7);
2819        assert_eq!(mat.wait().unwrap(), 99);
2820        assert_eq!(created.load(StdOrdering::SeqCst), 1);
2821        assert_eq!(stream.next().unwrap().unwrap(), 8);
2822
2823        let never_created = StdArc::new(StdAtomicUsize::new(0));
2824        let never_created_for_source = StdArc::clone(&never_created);
2825        let mat = Source::<i32>::lazy_future_source(move || {
2826            never_created_for_source.fetch_add(1, StdOrdering::SeqCst);
2827            async { Ok(Source::single(1)) }
2828        })
2829        .to(Sink::cancelled())
2830        .run()
2831        .unwrap();
2832        assert!(matches!(mat.wait(), Err(StreamError::Failed(_))));
2833        assert_eq!(never_created.load(StdOrdering::SeqCst), 0);
2834
2835        let lazy_future = StdArc::new(StdAtomicUsize::new(0));
2836        let lazy_future_for_source = StdArc::clone(&lazy_future);
2837        let source = Source::lazy_future(move || {
2838            lazy_future_for_source.fetch_add(1, StdOrdering::SeqCst);
2839            async { Ok(42) }
2840        });
2841        let (mut stream, _) = StdArc::clone(&source.factory)
2842            .create(&Materializer::new())
2843            .unwrap();
2844        assert_eq!(lazy_future.load(StdOrdering::SeqCst), 0);
2845        assert_eq!(stream.next().unwrap().unwrap(), 42);
2846        assert_eq!(lazy_future.load(StdOrdering::SeqCst), 1);
2847    }
2848
2849    #[test]
2850    fn wp6b_unfold_resource_closes_on_completion_failure_and_cancellation() {
2851        let closed = StdArc::new(StdAtomicUsize::new(0));
2852        let closed_on_complete = StdArc::clone(&closed);
2853        let values = Source::unfold_resource(
2854            || Ok(std::collections::VecDeque::from([1, 2, 3])),
2855            |items| Ok(items.pop_front()),
2856            move |_items| {
2857                closed_on_complete.fetch_add(1, StdOrdering::SeqCst);
2858                Ok(())
2859            },
2860        )
2861        .run_collect()
2862        .unwrap();
2863        assert_eq!(values, vec![1, 2, 3]);
2864        assert_eq!(closed.load(StdOrdering::SeqCst), 1);
2865
2866        let closed_on_failure = StdArc::new(StdAtomicUsize::new(0));
2867        let closed_on_failure_for_close = StdArc::clone(&closed_on_failure);
2868        let failed = Source::<i32>::unfold_resource(
2869            || Ok(()),
2870            |_| Err(StreamError::Failed("read".into())),
2871            move |_| {
2872                closed_on_failure_for_close.fetch_add(1, StdOrdering::SeqCst);
2873                Err(StreamError::Failed("close".into()))
2874            },
2875        )
2876        .run_collect();
2877        assert_eq!(failed, Err(StreamError::Failed("read".into())));
2878        assert_eq!(closed_on_failure.load(StdOrdering::SeqCst), 1);
2879
2880        let closed_on_cancel = StdArc::new(StdAtomicUsize::new(0));
2881        let closed_on_cancel_for_close = StdArc::clone(&closed_on_cancel);
2882        let first = Source::unfold_resource(
2883            || Ok(0_usize),
2884            |next| {
2885                let item = *next;
2886                *next += 1;
2887                Ok(Some(item))
2888            },
2889            move |_| {
2890                closed_on_cancel_for_close.fetch_add(1, StdOrdering::SeqCst);
2891                Ok(())
2892            },
2893        )
2894        .run_with(Sink::head())
2895        .unwrap();
2896        assert_eq!(first.wait().unwrap(), 0);
2897        assert!(wait_until(Duration::from_millis(250), || {
2898            closed_on_cancel.load(StdOrdering::SeqCst) == 1
2899        }));
2900    }
2901
2902    #[test]
2903    fn wp6b_async_resource_and_async_accumulators_are_sequential() {
2904        let closed = StdArc::new(StdAtomicUsize::new(0));
2905        let closed_for_close = StdArc::clone(&closed);
2906        let values = Source::unfold_resource_async(
2907            || async { Ok(std::collections::VecDeque::from([1, 2, 3])) },
2908            |items| {
2909                let item = items.pop_front();
2910                async move { Ok(item) }
2911            },
2912            move |_items| {
2913                let closed = StdArc::clone(&closed_for_close);
2914                async move {
2915                    closed.fetch_add(1, StdOrdering::SeqCst);
2916                    Ok(())
2917                }
2918            },
2919        )
2920        .run_collect()
2921        .unwrap();
2922        assert_eq!(values, vec![1, 2, 3]);
2923        assert_eq!(closed.load(StdOrdering::SeqCst), 1);
2924
2925        let closed_on_failure = StdArc::new(StdAtomicUsize::new(0));
2926        let closed_on_failure_for_close = StdArc::clone(&closed_on_failure);
2927        let failed = Source::<i32>::unfold_resource_async(
2928            || async { Ok(()) },
2929            |_resource| async { Err(StreamError::Failed("read".into())) },
2930            move |_resource| {
2931                let closed_on_failure = StdArc::clone(&closed_on_failure_for_close);
2932                async move {
2933                    closed_on_failure.fetch_add(1, StdOrdering::SeqCst);
2934                    Err(StreamError::Failed("close".into()))
2935                }
2936            },
2937        )
2938        .run_collect();
2939        assert_eq!(failed, Err(StreamError::Failed("read".into())));
2940        assert_eq!(closed_on_failure.load(StdOrdering::SeqCst), 1);
2941
2942        let active = StdArc::new(StdAtomicUsize::new(0));
2943        let max_active = StdArc::new(StdAtomicUsize::new(0));
2944        let active_for_stage = StdArc::clone(&active);
2945        let max_for_stage = StdArc::clone(&max_active);
2946        let scanned = Source::from_iter(1..=4)
2947            .scan_async(0, move |acc, item| {
2948                let active = StdArc::clone(&active_for_stage);
2949                let max_active = StdArc::clone(&max_for_stage);
2950                async move {
2951                    let now = active.fetch_add(1, StdOrdering::SeqCst) + 1;
2952                    max_active.fetch_max(now, StdOrdering::SeqCst);
2953                    tokio::time::sleep(Duration::from_millis(1)).await;
2954                    active.fetch_sub(1, StdOrdering::SeqCst);
2955                    Ok(acc + item)
2956                }
2957            })
2958            .run_collect()
2959            .unwrap();
2960        assert_eq!(scanned, vec![0, 1, 3, 6, 10]);
2961        assert_eq!(max_active.load(StdOrdering::SeqCst), 1);
2962
2963        let folded = Source::from_iter(1..=4)
2964            .fold_async(0, |acc, item| async move { Ok(acc + item) })
2965            .run_collect()
2966            .unwrap();
2967        assert_eq!(folded, vec![10]);
2968    }
2969
2970    #[test]
2971    fn wp6b_fold_async_materialization_does_not_drain_upstream() {
2972        let release = StdArc::new((std::sync::Mutex::new(false), std::sync::Condvar::new()));
2973        let started = StdArc::new(StdAtomicBool::new(false));
2974        let source = {
2975            let release = StdArc::clone(&release);
2976            let started = StdArc::clone(&started);
2977            Source::from_factory(move || {
2978                let release = StdArc::clone(&release);
2979                let started = StdArc::clone(&started);
2980                let mut emitted = false;
2981                Box::new(std::iter::from_fn(move || {
2982                    if emitted {
2983                        return None;
2984                    }
2985                    emitted = true;
2986                    started.store(true, StdOrdering::SeqCst);
2987                    let (released, available) = &*release;
2988                    let mut released = released.lock().unwrap();
2989                    while !*released {
2990                        released = available.wait(released).unwrap();
2991                    }
2992                    Some(Ok(1))
2993                }))
2994            })
2995        };
2996
2997        let (materialized_tx, materialized_rx) = mpsc::channel();
2998        let join = thread::spawn(move || {
2999            let queue = source
3000                .fold_async(0, |acc, item| async move { Ok(acc + item) })
3001                .run_with(Sink::queue())
3002                .unwrap();
3003            materialized_tx.send(queue).unwrap();
3004        });
3005
3006        let queue = match materialized_rx.recv_timeout(StdDuration::from_secs(1)) {
3007            Ok(queue) => queue,
3008            Err(error) => {
3009                let (released, available) = &*release;
3010                *released.lock().unwrap() = true;
3011                available.notify_all();
3012                let _ = join.join();
3013                panic!("fold_async materialization did not return before first pull: {error}");
3014            }
3015        };
3016        let (released, _) = &*release;
3017        assert!(
3018            !*released.lock().unwrap(),
3019            "test source was released before materialization returned"
3020        );
3021
3022        let (released, available) = &*release;
3023        *released.lock().unwrap() = true;
3024        available.notify_all();
3025        assert_eq!(queue.pull().unwrap(), Some(1));
3026        assert_eq!(queue.pull().unwrap(), None);
3027        join.join().unwrap();
3028        assert!(started.load(StdOrdering::SeqCst));
3029    }
3030
3031    #[test]
3032    fn wp6b_lazy_sink_and_flow_wait_for_first_element() {
3033        let lazy_sink_created = StdArc::new(StdAtomicUsize::new(0));
3034        let lazy_sink_created_for_factory = StdArc::clone(&lazy_sink_created);
3035        let empty_sink = Source::<i32>::empty()
3036            .run_with(Sink::lazy_sink(move || {
3037                lazy_sink_created_for_factory.fetch_add(1, StdOrdering::SeqCst);
3038                Sink::ignore()
3039            }))
3040            .unwrap();
3041        assert!(matches!(empty_sink.wait(), Err(StreamError::Failed(_))));
3042        assert_eq!(lazy_sink_created.load(StdOrdering::SeqCst), 0);
3043
3044        let foreach_sum = StdArc::new(StdAtomicUsize::new(0));
3045        let foreach_sum_for_sink = StdArc::clone(&foreach_sum);
3046        Source::from_iter([1_usize, 2, 3])
3047            .run_with(Sink::foreach_async(2, move |item| {
3048                let foreach_sum = StdArc::clone(&foreach_sum_for_sink);
3049                async move {
3050                    foreach_sum.fetch_add(item, StdOrdering::SeqCst);
3051                    Ok(())
3052                }
3053            }))
3054            .unwrap()
3055            .wait()
3056            .unwrap();
3057        assert_eq!(foreach_sum.load(StdOrdering::SeqCst), 6);
3058
3059        let lazy_flow_created = StdArc::new(StdAtomicUsize::new(0));
3060        let lazy_flow_created_for_factory = StdArc::clone(&lazy_flow_created);
3061        let lazy_flow = Flow::<i32, i32>::lazy_flow(move || {
3062            lazy_flow_created_for_factory.fetch_add(1, StdOrdering::SeqCst);
3063            Flow::identity()
3064                .map(|item: i32| item + 10)
3065                .map_materialized_value(|_| 123)
3066        });
3067        let mat = (lazy_flow.materialize)().unwrap();
3068        let mut stream = match lazy_flow.transform {
3069            flow::FlowTransform::Runtime(transform) => {
3070                transform(Box::new([Ok(1), Ok(2)].into_iter()), &Materializer::new()).unwrap()
3071            }
3072            flow::FlowTransform::Pure(_) => panic!("lazy flow must be runtime-backed"),
3073        };
3074        assert_eq!(lazy_flow_created.load(StdOrdering::SeqCst), 0);
3075        assert_eq!(stream.next().unwrap().unwrap(), 11);
3076        assert_eq!(mat.wait().unwrap(), 123);
3077        assert_eq!(lazy_flow_created.load(StdOrdering::SeqCst), 1);
3078        assert_eq!(stream.next().unwrap().unwrap(), 12);
3079
3080        let future_flow = Source::from_iter([1, 2])
3081            .via_mat(
3082                Flow::future_flow(|| async {
3083                    Ok(Flow::identity()
3084                        .map(|item: i32| item * 2)
3085                        .map_materialized_value(|_| 77))
3086                }),
3087                Keep::right,
3088            )
3089            .to_mat(Sink::collect(), Keep::both)
3090            .run()
3091            .unwrap();
3092        assert_eq!(future_flow.0.wait().unwrap(), 77);
3093        assert_eq!(future_flow.1.wait().unwrap(), vec![2, 4]);
3094    }
3095
3096    #[test]
3097    fn wp6b_lazy_flow_double_use_in_one_chain_pairs_instances_in_order() {
3098        // Order-sensitive pin for the per-thread FIFO slot pairing: one
3099        // cloned lazy-flow blueprint used twice in the SAME chain on the
3100        // same thread. element = first_stage_id * 100 + second_stage_id, so
3101        // a cross-wired (swapped) mat pairing would yield id2 * 100 + id1.
3102        for round in 0..50 {
3103            let counter = StdArc::new(StdAtomicUsize::new(1));
3104            let factory_counter = StdArc::clone(&counter);
3105            let lazy: Flow<usize, usize, _> = Flow::lazy_flow(move || {
3106                let id = factory_counter.fetch_add(1, StdOrdering::SeqCst);
3107                Flow::identity()
3108                    .map(move |x: usize| x * 100 + id)
3109                    .map_materialized_value(move |_| id)
3110            });
3111            let lazy_again = lazy.clone();
3112
3113            let ((first_mat, second_mat), out) = Source::from_iter([0usize])
3114                .via_mat(lazy, Keep::right)
3115                .via_mat(lazy_again, Keep::both)
3116                .to_mat(Sink::collect(), Keep::both)
3117                .run()
3118                .unwrap();
3119
3120            let first_id = first_mat.wait().unwrap();
3121            let second_id = second_mat.wait().unwrap();
3122            let element = out.wait().unwrap()[0];
3123            assert_eq!(
3124                element,
3125                first_id * 100 + second_id,
3126                "round {round}: mats ({first_id},{second_id}) cross-wired with transform order"
3127            );
3128            assert_ne!(
3129                first_id, second_id,
3130                "round {round}: same factory instance paired twice"
3131            );
3132        }
3133    }
3134
3135    #[test]
3136    fn wp6b_lazy_flow_clones_materialize_concurrently_without_cross_wiring() {
3137        for _ in 0..20 {
3138            let next_id = StdArc::new(StdAtomicUsize::new(0));
3139            let next_id_for_factory = StdArc::clone(&next_id);
3140            let flow = Flow::<i32, i32>::lazy_flow(move || {
3141                let id = next_id_for_factory.fetch_add(1, StdOrdering::SeqCst) + 1;
3142                Flow::identity()
3143                    .map(move |item: i32| item + (id as i32 * 100))
3144                    .map_materialized_value(move |_| id)
3145            });
3146            let barrier = StdArc::new(std::sync::Barrier::new(3));
3147
3148            let spawn_materialization = |input: i32| {
3149                let flow = flow.clone();
3150                let barrier = StdArc::clone(&barrier);
3151                thread::spawn(move || {
3152                    barrier.wait();
3153                    let (mat, values) = Source::single(input)
3154                        .via_mat(flow, Keep::right)
3155                        .to_mat(Sink::collect(), Keep::both)
3156                        .run()
3157                        .unwrap();
3158                    (input, mat.wait().unwrap(), values.wait().unwrap())
3159                })
3160            };
3161
3162            let first = spawn_materialization(1);
3163            let second = spawn_materialization(2);
3164            barrier.wait();
3165
3166            for result in [first.join().unwrap(), second.join().unwrap()] {
3167                let (input, mat_id, values) = result;
3168                assert_eq!(values, vec![input + (mat_id as i32 * 100)]);
3169            }
3170            assert_eq!(next_id.load(StdOrdering::SeqCst), 2);
3171        }
3172    }
3173
3174    #[test]
3175    fn wp6b_map_with_resource_emits_close_item_before_terminal_error() {
3176        let queue = Source::from_factory(|| {
3177            Box::new(vec![Ok(1), Err(StreamError::Failed("upstream".into()))].into_iter())
3178        })
3179        .map_with_resource(
3180            || Ok(()),
3181            |_resource, item| Ok(item + 10),
3182            |_resource| Ok(Some(99)),
3183        )
3184        .run_with(Sink::queue())
3185        .unwrap();
3186
3187        assert_eq!(queue.pull().unwrap(), Some(11));
3188        assert_eq!(queue.pull().unwrap(), Some(99));
3189        assert_eq!(queue.pull(), Err(StreamError::Failed("upstream".into())));
3190
3191        let failed: StreamResult<Vec<i32>> = Source::single(1)
3192            .map_with_resource(
3193                || Ok(()),
3194                |_resource, _item| -> StreamResult<i32> { Err(StreamError::Failed("map".into())) },
3195                |_resource| -> StreamResult<Option<i32>> {
3196                    Err(StreamError::Failed("close".into()))
3197                },
3198            )
3199            .run_collect();
3200        assert_eq!(failed, Err(StreamError::Failed("map".into())));
3201    }
3202
3203    #[test]
3204    fn stateful_and_terminal_source_operators_work() {
3205        let stateful = Source::from_iter([1, 2, 3])
3206            .stateful_map(0, |sum, item| {
3207                *sum += item;
3208                *sum
3209            })
3210            .run_collect()
3211            .unwrap();
3212        assert_eq!(stateful, vec![1, 3, 6]);
3213
3214        let concat = Source::from_iter([1, 2, 3])
3215            .stateful_map_concat(0, |sum, item| {
3216                *sum += item;
3217                [item, *sum]
3218            })
3219            .run_collect()
3220            .unwrap();
3221        assert_eq!(concat, vec![1, 1, 2, 3, 3, 6]);
3222
3223        assert_eq!(
3224            Source::from_iter([1, 2, 3])
3225                .fold(10, |acc, item| acc + item)
3226                .run_collect()
3227                .unwrap(),
3228            vec![16]
3229        );
3230        assert_eq!(
3231            Source::from_iter([1, 2, 3])
3232                .reduce(|acc, item| acc + item)
3233                .run_collect()
3234                .unwrap(),
3235            vec![6]
3236        );
3237    }
3238
3239    #[test]
3240    fn concat_and_sliding_emit_before_unbounded_upstream_finishes() {
3241        let concat = Source::single(())
3242            .map_concat(|_| 0_u64..)
3243            .take(1)
3244            .run_collect()
3245            .unwrap();
3246        assert_eq!(concat, vec![0]);
3247
3248        let sliding = Source::repeat(1_u64)
3249            .sliding(2, 1)
3250            .take(1)
3251            .run_collect()
3252            .unwrap();
3253        assert_eq!(sliding, vec![vec![1, 1]]);
3254    }
3255
3256    #[test]
3257    fn fan_in_source_operators_follow_ordering_rules() {
3258        assert_eq!(
3259            Source::from_iter([1, 2])
3260                .concat(Source::from_iter([3, 4]))
3261                .run_collect()
3262                .unwrap(),
3263            vec![1, 2, 3, 4]
3264        );
3265        assert_eq!(
3266            Source::from_iter([3, 4])
3267                .prepend(Source::from_iter([1, 2]))
3268                .run_collect()
3269                .unwrap(),
3270            vec![1, 2, 3, 4]
3271        );
3272        assert_eq!(
3273            Source::empty()
3274                .or_else(Source::from_iter([10, 20]))
3275                .run_collect()
3276                .unwrap(),
3277            vec![10, 20]
3278        );
3279        assert_eq!(
3280            Source::from_iter([1, 2])
3281                .or_else(Source::from_iter([10, 20]))
3282                .run_collect()
3283                .unwrap(),
3284            vec![1, 2]
3285        );
3286        assert_eq!(
3287            Source::from_iter([1, 2, 3])
3288                .interleave(Source::from_iter([10, 11, 12]), 2)
3289                .run_collect()
3290                .unwrap(),
3291            vec![1, 2, 10, 11, 3, 12]
3292        );
3293    }
3294
3295    #[test]
3296    fn fan_in_flow_operators_compose_with_primary_stream() {
3297        let concat = Source::from_iter([1, 2])
3298            .via(Flow::identity().concat(Source::from_iter([3, 4])))
3299            .run_collect()
3300            .unwrap();
3301        assert_eq!(concat, vec![1, 2, 3, 4]);
3302
3303        let prepend = Source::from_iter([3, 4])
3304            .via(Flow::identity().prepend(Source::from_iter([1, 2])))
3305            .run_collect()
3306            .unwrap();
3307        assert_eq!(prepend, vec![1, 2, 3, 4]);
3308
3309        let interleave = Source::from_iter([1, 2, 3])
3310            .via(Flow::identity().interleave(Source::from_iter([10, 11, 12]), 1))
3311            .run_collect()
3312            .unwrap();
3313        assert_eq!(interleave, vec![1, 10, 2, 11, 3, 12]);
3314
3315        let merge_sorted = Source::from_iter([1, 4])
3316            .via(Flow::identity().merge_sorted(Source::from_iter([2, 3, 5])))
3317            .run_collect()
3318            .unwrap();
3319        assert_eq!(merge_sorted, vec![1, 2, 3, 4, 5]);
3320
3321        let zip_latest = Source::from_iter([1, 2])
3322            .via(Flow::identity().zip_latest(Source::single(10)))
3323            .run_collect()
3324            .unwrap();
3325        assert_eq!(zip_latest, vec![(1, 10), (2, 10)]);
3326
3327        let zip_latest_with = Source::from_iter([1, 2])
3328            .via(
3329                Flow::identity()
3330                    .zip_latest_with(Source::single(10), false, |left, right| left + right),
3331            )
3332            .run_collect()
3333            .unwrap();
3334        assert_eq!(zip_latest_with, vec![11, 12]);
3335    }
3336
3337    #[test]
3338    fn fan_in_operators_propagate_errors_and_eager_close() {
3339        assert!(matches!(
3340            Source::failed(StreamError::Failed("boom".into()))
3341                .or_else(Source::from_iter([1, 2]))
3342                .run_collect(),
3343            Err(StreamError::Failed(_))
3344        ));
3345        assert!(matches!(
3346            Source::from_iter([1, 2])
3347                .prepend(Source::failed(StreamError::Failed("boom".into())))
3348                .run_collect(),
3349            Err(StreamError::Failed(_))
3350        ));
3351        assert_eq!(
3352            Source::from_iter([1, 2])
3353                .interleave_all([Source::empty()], 1, true)
3354                .run_collect()
3355                .unwrap(),
3356            vec![1]
3357        );
3358    }
3359
3360    #[test]
3361    fn interleave_lazy_pulls_only_inputs_needed_for_first_segment() {
3362        use std::sync::{Arc, atomic::AtomicUsize, atomic::Ordering};
3363
3364        let pulls: Arc<[AtomicUsize; 3]> = Arc::new([
3365            AtomicUsize::new(0),
3366            AtomicUsize::new(0),
3367            AtomicUsize::new(0),
3368        ]);
3369
3370        let make_source = |idx: usize| {
3371            let pulls = Arc::clone(&pulls);
3372            Source::from_materialized_factory(move |_| {
3373                let pulls = Arc::clone(&pulls);
3374                let mut emitted = false;
3375                Ok((
3376                    Box::new(std::iter::from_fn(move || {
3377                        pulls[idx].fetch_add(1, Ordering::SeqCst);
3378                        if !emitted && idx == 0 {
3379                            emitted = true;
3380                            Some(Ok(42))
3381                        } else {
3382                            None
3383                        }
3384                    })) as BoxStream<i32>,
3385                    NotUsed,
3386                ))
3387            })
3388        };
3389
3390        let result = make_source(0)
3391            .interleave_all([make_source(1), make_source(2)], 1, false)
3392            .run_with(Sink::head());
3393
3394        assert_eq!(wait(result.unwrap()), 42);
3395        assert_eq!(pulls[0].load(Ordering::SeqCst), 1);
3396        assert_eq!(
3397            pulls[1].load(Ordering::SeqCst),
3398            0,
3399            "second input should not be pulled when downstream cancels after first element"
3400        );
3401        assert_eq!(
3402            pulls[2].load(Ordering::SeqCst),
3403            0,
3404            "third input should not be pulled before its turn"
3405        );
3406    }
3407
3408    #[test]
3409    fn interleave_non_eager_drains_remaining_when_one_input_completes() {
3410        assert_eq!(
3411            Source::from_iter([1, 2, 3, 4])
3412                .interleave_all(
3413                    [Source::from_iter([10]), Source::from_iter([20, 21, 22])],
3414                    1,
3415                    false
3416                )
3417                .run_collect()
3418                .unwrap(),
3419            vec![1, 10, 20, 2, 21, 3, 22, 4]
3420        );
3421    }
3422
3423    #[test]
3424    fn remaining_merge_and_zip_family_matches_expected_ordering() {
3425        assert_eq!(
3426            Source::from_iter([1, 4])
3427                .merge_sorted(Source::from_iter([2, 3, 5]))
3428                .run_collect()
3429                .unwrap(),
3430            vec![1, 2, 3, 4, 5]
3431        );
3432
3433        assert_eq!(
3434            Source::from_iter([1, 2])
3435                .merge_latest(Source::single(10), false)
3436                .run_collect()
3437                .unwrap(),
3438            vec![vec![1, 10], vec![2, 10]]
3439        );
3440
3441        assert_eq!(
3442            Source::from_iter([1, 2, 3])
3443                .merge_all([Source::from_iter([10, 11])], false)
3444                .run_collect()
3445                .unwrap(),
3446            vec![1, 10, 2, 11, 3]
3447        );
3448
3449        assert_eq!(
3450            Source::from_iter([1, 2, 3])
3451                .zip_with(Source::from_iter([10, 11, 12]), |left, right| left + right)
3452                .run_collect()
3453                .unwrap(),
3454            vec![11, 13, 15]
3455        );
3456
3457        assert_eq!(
3458            Source::from_iter([1, 2])
3459                .zip_latest(Source::single(10))
3460                .run_collect()
3461                .unwrap(),
3462            vec![(1, 10), (2, 10)]
3463        );
3464
3465        assert_eq!(
3466            Source::from_iter([1, 2, 3])
3467                .zip_latest_with(Source::from_iter([10]), false, |left, right| left + right)
3468                .run_collect()
3469                .unwrap(),
3470            vec![11, 12, 13]
3471        );
3472
3473        assert_eq!(
3474            Source::from_iter([1, 2])
3475                .zip_all(Source::from_iter([10, 11, 12]), -1, -2)
3476                .run_collect()
3477                .unwrap(),
3478            vec![(1, 10), (2, 11), (-1, 12)]
3479        );
3480
3481        assert_eq!(
3482            Source::from_iter([5, 6, 7])
3483                .zip_with_index()
3484                .run_collect()
3485                .unwrap(),
3486            vec![(5, 0), (6, 1), (7, 2)]
3487        );
3488
3489        assert_eq!(
3490            Source::zip_n([Source::from_iter([1, 2]), Source::from_iter([10, 20])])
3491                .run_collect()
3492                .unwrap(),
3493            vec![vec![1, 10], vec![2, 20]]
3494        );
3495
3496        assert_eq!(
3497            Source::zip_with_n(
3498                [
3499                    Source::from_iter([1, 2]),
3500                    Source::from_iter([10, 20]),
3501                    Source::from_iter([100, 200]),
3502                ],
3503                |values| values.into_iter().sum::<i32>(),
3504            )
3505            .run_collect()
3506            .unwrap(),
3507            vec![111, 222]
3508        );
3509
3510        assert_eq!(
3511            Source::merge_prioritized_n(
3512                [
3513                    (Source::from_iter([1, 2, 3, 4]), 2),
3514                    (Source::from_iter([10, 11]), 1),
3515                ],
3516                false,
3517            )
3518            .run_collect()
3519            .unwrap(),
3520            vec![1, 2, 10, 3, 4, 11]
3521        );
3522
3523        assert_eq!(
3524            Source::combine(
3525                Source::from_iter([1, 2, 3]),
3526                Source::from_iter([10, 11]),
3527                std::iter::empty::<Source<i32, NotUsed>>(),
3528                SourceCombineStrategy::Merge {
3529                    eager_complete: false,
3530                },
3531            )
3532            .run_collect()
3533            .unwrap(),
3534            vec![1, 10, 2, 11, 3]
3535        );
3536
3537        let combined_sink = Sink::combine(
3538            Sink::ignore(),
3539            Sink::ignore(),
3540            std::iter::empty::<Sink<i32, NotUsed>>(),
3541            SinkCombineStrategy::Broadcast,
3542        );
3543        assert_eq!(
3544            Source::from_iter([1, 2, 3])
3545                .run_with(combined_sink)
3546                .unwrap(),
3547            NotUsed
3548        );
3549    }
3550
3551    #[test]
3552    fn sink_combine_broadcast_delivers_every_element_to_every_child() {
3553        // Regression: the combined children's materialized values were
3554        // dropped at materialization, tripping cancel-on-drop and silently
3555        // cancelling every child before it consumed anything (0 deliveries).
3556        let first_count = StdArc::new(StdAtomicUsize::new(0));
3557        let second_count = StdArc::new(StdAtomicUsize::new(0));
3558        let first_counter = StdArc::clone(&first_count);
3559        let second_counter = StdArc::clone(&second_count);
3560        let combined = Sink::combine(
3561            Sink::foreach(move |_: i32| {
3562                first_counter.fetch_add(1, StdOrdering::SeqCst);
3563            }),
3564            Sink::foreach(move |_: i32| {
3565                second_counter.fetch_add(1, StdOrdering::SeqCst);
3566            }),
3567            std::iter::empty::<Sink<i32, NotUsed>>(),
3568            SinkCombineStrategy::Broadcast,
3569        );
3570        assert_eq!(
3571            Source::from_iter(0..100).run_with(combined).unwrap(),
3572            NotUsed
3573        );
3574        // The rendezvous channels guarantee each child has received every
3575        // element before the parent send returns. The child callback may still
3576        // be one scheduler tick behind the parent, so assert with a bounded
3577        // condition wait instead of an immediate load.
3578        assert!(wait_until(StdDuration::from_secs(1), || {
3579            first_count.load(StdOrdering::SeqCst) == 100
3580                && second_count.load(StdOrdering::SeqCst) == 100
3581        }));
3582    }
3583
3584    #[test]
3585    fn zip_latest_completes_when_one_side_finishes_without_emitting() {
3586        // Regression: with eager_complete = false, an empty side left
3587        // `latest = None` forever while the loop drained the other side —
3588        // an infinite busy-loop when that side is unbounded.
3589        assert_eq!(
3590            Source::from_iter(std::iter::empty::<i32>())
3591                .zip_latest_with(Source::repeat(10), false, |left, right| left + right)
3592                .run_collect()
3593                .unwrap(),
3594            Vec::<i32>::new()
3595        );
3596        assert_eq!(
3597            Source::repeat(10)
3598                .zip_latest_with(
3599                    Source::from_iter(std::iter::empty::<i32>()),
3600                    false,
3601                    |left, right| left + right,
3602                )
3603                .run_collect()
3604                .unwrap(),
3605            Vec::<i32>::new()
3606        );
3607    }
3608
3609    #[test]
3610    fn zip_family_completion_boundaries_match_expected_results() {
3611        assert_eq!(
3612            Source::from_iter([1, 2, 3])
3613                .zip_with(Source::from_iter([10]), |left, right| left + right)
3614                .run_collect()
3615                .unwrap(),
3616            vec![11]
3617        );
3618
3619        assert_eq!(
3620            Source::from_iter([1, 2, 3])
3621                .zip_latest_with(Source::from_iter([10]), true, |left, right| left + right)
3622                .run_collect()
3623                .unwrap(),
3624            vec![11, 12]
3625        );
3626
3627        assert_eq!(
3628            Source::zip_n([
3629                Source::from_iter([1, 2, 3]),
3630                Source::from_iter([10]),
3631                Source::from_iter([100, 200, 300]),
3632            ])
3633            .run_collect()
3634            .unwrap(),
3635            vec![vec![1, 10, 100]]
3636        );
3637    }
3638
3639    #[test]
3640    fn combine_strategies_follow_merge_concat_and_priority_rules() {
3641        assert_eq!(
3642            Source::combine(
3643                Source::from_iter([1, 2]),
3644                Source::from_iter([10, 11]),
3645                [Source::from_iter([100])],
3646                SourceCombineStrategy::Concat,
3647            )
3648            .run_collect()
3649            .unwrap(),
3650            vec![1, 2, 10, 11, 100]
3651        );
3652
3653        assert_eq!(
3654            Source::combine(
3655                Source::from_iter([1, 2, 3, 4]),
3656                Source::from_iter([10, 11]),
3657                std::iter::empty::<Source<i32, NotUsed>>(),
3658                SourceCombineStrategy::Prioritized {
3659                    priorities: vec![2, 1],
3660                    eager_complete: false,
3661                },
3662            )
3663            .run_collect()
3664            .unwrap(),
3665            vec![1, 2, 10, 3, 4, 11]
3666        );
3667    }
3668
3669    #[test]
3670    fn concat_lazy_defers_follow_on_source_until_needed() {
3671        let source_counter = StdArc::new(StdAtomicUsize::new(0));
3672        let source_counter_clone = StdArc::clone(&source_counter);
3673        let lazy_source = Source::from_materialized_factory(move |_| {
3674            source_counter_clone.fetch_add(1, StdOrdering::SeqCst);
3675            Ok((Box::new(std::iter::once(Ok(99))), NotUsed))
3676        });
3677        let source_head = Source::single(1)
3678            .concat_lazy(lazy_source)
3679            .run_with(Sink::head());
3680        assert_eq!(wait(source_head.unwrap()), 1);
3681        assert_eq!(source_counter.load(StdOrdering::SeqCst), 0);
3682
3683        let flow_counter = StdArc::new(StdAtomicUsize::new(0));
3684        let flow_counter_clone = StdArc::clone(&flow_counter);
3685        let lazy_flow_source = Source::from_materialized_factory(move |_| {
3686            flow_counter_clone.fetch_add(1, StdOrdering::SeqCst);
3687            Ok((Box::new(std::iter::once(Ok(99))), NotUsed))
3688        });
3689        let flow_head = Source::single(1)
3690            .via(Flow::identity().concat_lazy(lazy_flow_source))
3691            .run_with(Sink::head());
3692        assert_eq!(wait(flow_head.unwrap()), 1);
3693        assert_eq!(flow_counter.load(StdOrdering::SeqCst), 0);
3694    }
3695
3696    #[test]
3697    fn also_to_completes_when_side_sink_cancels() {
3698        assert_eq!(
3699            Source::from_iter([1, 2, 3])
3700                .also_to(Sink::cancelled())
3701                .run_collect()
3702                .unwrap(),
3703            Vec::<i32>::new()
3704        );
3705        assert_eq!(
3706            Source::from_iter([1, 2, 3])
3707                .also_to_all([Sink::cancelled(), Sink::cancelled()])
3708                .run_collect()
3709                .unwrap(),
3710            Vec::<i32>::new()
3711        );
3712    }
3713
3714    #[test]
3715    fn also_to_completes_gracefully_when_side_sink_disconnects() {
3716        let result = Source::from_iter(0..100)
3717            .also_to(Sink::head())
3718            .run_collect()
3719            .unwrap();
3720        assert!(!result.is_empty(), "main should emit at least one element");
3721        assert!(
3722            result.len() < 100,
3723            "main should complete early when side disconnects"
3724        );
3725    }
3726
3727    #[test]
3728    fn also_to_propagates_original_error_when_side_is_disconnected() {
3729        let err = StreamError::Failed("distinctive-boom".into());
3730        assert!(matches!(
3731            Source::<i32>::failed(err.clone())
3732                .also_to(Sink::cancelled())
3733                .run_collect(),
3734            Err(StreamError::Failed(msg)) if msg == "distinctive-boom"
3735        ));
3736        assert!(matches!(
3737            Source::<i32>::failed(err.clone())
3738                .also_to_all([Sink::cancelled()])
3739                .run_collect(),
3740            Err(StreamError::Failed(msg)) if msg == "distinctive-boom"
3741        ));
3742        assert!(matches!(
3743            Source::<i32>::failed(err)
3744                .divert_to(Sink::cancelled(), |_: &i32| true)
3745                .run_collect(),
3746            Err(StreamError::Failed(msg)) if msg == "distinctive-boom"
3747        ));
3748    }
3749
3750    #[test]
3751    fn divert_to_routes_matching_elements_to_side_sink() {
3752        let diverted = Source::from_iter([1, 2, 3, 4])
3753            .divert_to(Sink::ignore(), |item| item % 2 == 0)
3754            .run_collect()
3755            .unwrap();
3756        assert_eq!(diverted, vec![1, 3]);
3757    }
3758
3759    #[test]
3760    fn wire_tap_drops_when_side_sink_backpressures() {
3761        let tapped = Source::from_iter([1, 2, 3])
3762            .wire_tap(Sink::head())
3763            .run_collect()
3764            .unwrap();
3765        assert_eq!(tapped, vec![1, 2, 3]);
3766
3767        let tapped_via_flow = Source::from_iter([1, 2, 3])
3768            .via(Flow::identity().wire_tap(Sink::head()))
3769            .run_collect()
3770            .unwrap();
3771        assert_eq!(tapped_via_flow, vec![1, 2, 3]);
3772    }
3773
3774    #[test]
3775    fn async_mapping_variants_complete() {
3776        let ordered = Source::from_iter(0..4)
3777            .map_async(2, |item| async move { Ok(item * 2) })
3778            .run_collect()
3779            .unwrap();
3780        assert_eq!(ordered, vec![0, 2, 4, 6]);
3781
3782        let unordered = Source::from_iter(0..4)
3783            .map_async_unordered(2, |item| async move { Ok(item * 2) })
3784            .run_collect()
3785            .unwrap();
3786        assert_eq!(unordered, vec![0, 2, 4, 6]);
3787
3788        let partitioned = Source::from_iter(0..4)
3789            .map_async_partitioned(4, 1, |item| item % 2, |item| async move { Ok(item + 1) })
3790            .run_collect()
3791            .unwrap();
3792        assert_eq!(partitioned, vec![1, 2, 3, 4]);
3793    }
3794
3795    #[test]
3796    fn map_async_ordered_bounds_pulls_behind_stuck_head() {
3797        let pulls = StdArc::new(StdAtomicUsize::new(0));
3798        let pulls_for_source = StdArc::clone(&pulls);
3799        let probe = Source::from_fn_iter(move || {
3800            let pulls = StdArc::clone(&pulls_for_source);
3801            std::iter::from_fn(move || {
3802                let next = pulls.fetch_add(1, StdOrdering::SeqCst);
3803                Some(next)
3804            })
3805        })
3806        .map_async(2, |item| async move {
3807            if item == 0 {
3808                tokio::time::sleep(StdDuration::from_millis(300)).await;
3809            }
3810            Ok(item)
3811        })
3812        .run_with(TestSink::probe())
3813        .unwrap();
3814
3815        probe.request(16);
3816        thread::sleep(StdDuration::from_millis(100));
3817        assert!(
3818            pulls.load(StdOrdering::SeqCst) <= 3,
3819            "pulled {} elements with parallelism=2 behind a stuck ordered head",
3820            pulls.load(StdOrdering::SeqCst)
3821        );
3822    }
3823
3824    #[test]
3825    fn async_mapping_parks_until_woken_future_completes() {
3826        struct WakeOnceFuture {
3827            value: Option<u64>,
3828            ready: StdArc<StdAtomicBool>,
3829            started: bool,
3830            polls: StdArc<StdAtomicUsize>,
3831            latest_waker: StdArc<Mutex<Option<std::task::Waker>>>,
3832        }
3833
3834        impl std::future::Future for WakeOnceFuture {
3835            type Output = StreamResult<u64>;
3836
3837            fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3838                let this = self.as_mut().get_mut();
3839                this.polls.fetch_add(1, StdOrdering::SeqCst);
3840                *this.latest_waker.lock().expect("latest wake slot mutex") =
3841                    Some(cx.waker().clone());
3842                if this.ready.load(StdOrdering::SeqCst) {
3843                    return Poll::Ready(Ok(this.value.take().unwrap()));
3844                }
3845
3846                if !this.started {
3847                    this.started = true;
3848                    let ready = StdArc::clone(&this.ready);
3849                    let latest_waker = StdArc::clone(&this.latest_waker);
3850                    thread::spawn(move || {
3851                        thread::sleep(Duration::from_millis(20));
3852                        ready.store(true, StdOrdering::SeqCst);
3853                        if let Some(waker) =
3854                            latest_waker.lock().expect("latest wake slot mutex").take()
3855                        {
3856                            waker.wake();
3857                        }
3858                    });
3859                }
3860                Poll::Pending
3861            }
3862        }
3863
3864        let polls = StdArc::new(StdAtomicUsize::new(0));
3865        let polls_for_stage = StdArc::clone(&polls);
3866        let start = Instant::now();
3867
3868        let values = Source::single(41)
3869            .map_async(1, move |item| WakeOnceFuture {
3870                value: Some(item + 1),
3871                ready: StdArc::new(StdAtomicBool::new(false)),
3872                started: false,
3873                polls: StdArc::clone(&polls_for_stage),
3874                latest_waker: StdArc::new(Mutex::new(None)),
3875            })
3876            .run_collect()
3877            .unwrap();
3878
3879        assert_eq!(values, vec![42]);
3880        let elapsed = start.elapsed();
3881        assert!(
3882            elapsed >= StdDuration::from_millis(15) && elapsed < StdDuration::from_millis(250),
3883            "pending future should park until woken once, elapsed={elapsed:?}"
3884        );
3885        assert!(
3886            polls.load(StdOrdering::SeqCst) < 4096,
3887            "pending future was repolled too aggressively"
3888        );
3889    }
3890
3891    #[test]
3892    fn async_mapping_emits_before_unbounded_upstream_finishes() {
3893        let ordered = Source::repeat(1)
3894            .map_async(2, |item| async move { Ok(item + 1) })
3895            .take(1)
3896            .run_collect()
3897            .unwrap();
3898        assert_eq!(ordered, vec![2]);
3899
3900        let unordered = Source::repeat(1)
3901            .map_async_unordered(2, |item| async move { Ok(item + 1) })
3902            .take(1)
3903            .run_collect()
3904            .unwrap();
3905        assert_eq!(unordered, vec![2]);
3906
3907        let partitioned = Source::repeat(1)
3908            .map_async_partitioned(2, 1, |_| 0_u8, |item| async move { Ok(item + 1) })
3909            .take(1)
3910            .run_collect()
3911            .unwrap();
3912        assert_eq!(partitioned, vec![2]);
3913    }
3914
3915    #[test]
3916    fn partitioned_async_mapping_limits_same_key_concurrency() {
3917        let active = StdArc::new(StdAtomicUsize::new(0));
3918        let max_active = StdArc::new(StdAtomicUsize::new(0));
3919        let active_for_stage = StdArc::clone(&active);
3920        let max_for_stage = StdArc::clone(&max_active);
3921
3922        let values = Source::from_iter(0..6)
3923            .map_async_partitioned(
3924                4,
3925                1,
3926                |_| 0_u8,
3927                move |item| {
3928                    let active = StdArc::clone(&active_for_stage);
3929                    let max_active = StdArc::clone(&max_for_stage);
3930                    let current = active.fetch_add(1, StdOrdering::SeqCst) + 1;
3931                    max_active.fetch_max(current, StdOrdering::SeqCst);
3932                    async move {
3933                        thread::sleep(Duration::from_millis(1));
3934                        active.fetch_sub(1, StdOrdering::SeqCst);
3935                        Ok(item)
3936                    }
3937                },
3938            )
3939            .run_collect()
3940            .unwrap();
3941
3942        assert_eq!(values, vec![0, 1, 2, 3, 4, 5]);
3943        assert_eq!(max_active.load(StdOrdering::SeqCst), 1);
3944    }
3945
3946    #[test]
3947    fn partitioned_async_mapping_scans_past_blocked_pending_key() {
3948        let active = StdArc::new(StdAtomicUsize::new(0));
3949        let max_active = StdArc::new(StdAtomicUsize::new(0));
3950        let active_for_stage = StdArc::clone(&active);
3951        let max_for_stage = StdArc::clone(&max_active);
3952        let (release_tx, release_rx) = oneshot::channel::<()>();
3953        let release_rx = StdArc::new(std::sync::Mutex::new(Some(release_rx)));
3954        let release_rx_for_stage = StdArc::clone(&release_rx);
3955        let max_for_release = StdArc::clone(&max_active);
3956
3957        let releaser = thread::spawn(move || {
3958            let deadline = Instant::now() + StdDuration::from_secs(1);
3959            while max_for_release.load(StdOrdering::SeqCst) < 2 && Instant::now() < deadline {
3960                thread::yield_now();
3961            }
3962            let _ = release_tx.send(());
3963        });
3964
3965        let values = Source::from_iter([0, 2, 1])
3966            .map_async_partitioned(
3967                2,
3968                1,
3969                |item| item % 2,
3970                move |item| {
3971                    let active = StdArc::clone(&active_for_stage);
3972                    let max_active = StdArc::clone(&max_for_stage);
3973                    let release_rx = StdArc::clone(&release_rx_for_stage);
3974                    let current = active.fetch_add(1, StdOrdering::SeqCst) + 1;
3975                    max_active.fetch_max(current, StdOrdering::SeqCst);
3976                    async move {
3977                        if item == 0 {
3978                            let receiver = release_rx
3979                                .lock()
3980                                .expect("release receiver mutex")
3981                                .take()
3982                                .expect("release receiver present");
3983                            let _ = receiver.await;
3984                        }
3985                        active.fetch_sub(1, StdOrdering::SeqCst);
3986                        Ok(item)
3987                    }
3988                },
3989            )
3990            .run_collect()
3991            .unwrap();
3992        releaser.join().unwrap();
3993
3994        assert_eq!(values, vec![0, 2, 1]);
3995        assert_eq!(max_active.load(StdOrdering::SeqCst), 2);
3996    }
3997
3998    #[test]
3999    fn partitioned_async_mapping_p1_still_evaluates_partition() {
4000        let partitions = StdArc::new(StdAtomicUsize::new(0));
4001        let partitions_for_stage = StdArc::clone(&partitions);
4002
4003        let values = Source::from_iter(0..8)
4004            .map_async_partitioned(
4005                1,
4006                1,
4007                move |item| {
4008                    partitions_for_stage.fetch_add(1, StdOrdering::SeqCst);
4009                    item % 2
4010                },
4011                |item| async move { Ok(item + 1) },
4012            )
4013            .run_collect()
4014            .unwrap();
4015
4016        assert_eq!(values, (1..9).collect::<Vec<_>>());
4017        assert_eq!(partitions.load(StdOrdering::SeqCst), 8);
4018    }
4019
4020    #[test]
4021    fn partitioned_async_mapping_handles_many_keys_high_parallelism() {
4022        let active_by_key =
4023            StdArc::new((0..16).map(|_| StdAtomicUsize::new(0)).collect::<Vec<_>>());
4024        let max_by_key = StdArc::new((0..16).map(|_| StdAtomicUsize::new(0)).collect::<Vec<_>>());
4025        let active_for_stage = StdArc::clone(&active_by_key);
4026        let max_for_stage = StdArc::clone(&max_by_key);
4027
4028        let values = Source::from_iter(0..512_usize)
4029            .map_async_partitioned(
4030                32,
4031                1,
4032                |item| item % 16,
4033                move |item| {
4034                    let active = StdArc::clone(&active_for_stage);
4035                    let max_active = StdArc::clone(&max_for_stage);
4036                    let key = item % 16;
4037                    let current = active[key].fetch_add(1, StdOrdering::SeqCst) + 1;
4038                    max_active[key].fetch_max(current, StdOrdering::SeqCst);
4039                    async move {
4040                        active[key].fetch_sub(1, StdOrdering::SeqCst);
4041                        Ok(item)
4042                    }
4043                },
4044            )
4045            .run_collect()
4046            .unwrap();
4047
4048        assert_eq!(values, (0..512).collect::<Vec<_>>());
4049        for max_active in max_by_key.iter() {
4050            assert_eq!(max_active.load(StdOrdering::SeqCst), 1);
4051        }
4052    }
4053
4054    #[test]
4055    fn error_operators_map_recover_and_complete() {
4056        let mapped = Source::<i32>::failed(StreamError::Failed("boom".into()))
4057            .map_error(|_| StreamError::Failed("mapped".into()))
4058            .run_collect();
4059        assert_eq!(mapped, Err(StreamError::Failed("mapped".into())));
4060
4061        let recovered = Source::<i32>::failed(StreamError::Failed("boom".into()))
4062            .recover(|error| match error {
4063                StreamError::Failed(_) => Some(42),
4064                _ => None,
4065            })
4066            .run_collect()
4067            .unwrap();
4068        assert_eq!(recovered, vec![42]);
4069
4070        let unrecovered = Source::<i32>::failed(StreamError::Failed("original".into()))
4071            .recover(|_| None)
4072            .run_collect();
4073        assert_eq!(unrecovered, Err(StreamError::Failed("original".into())));
4074
4075        let recovered_with = Source::<i32>::failed(StreamError::Failed("boom".into()))
4076            .recover_with_retries(1, |_| Some(Source::from_iter([1, 2])))
4077            .run_collect()
4078            .unwrap();
4079        assert_eq!(recovered_with, vec![1, 2]);
4080
4081        let declined_recover_with = Source::<i32>::failed(StreamError::Failed("declined".into()))
4082            .recover_with_retries(1, |_| None)
4083            .run_collect();
4084        assert_eq!(
4085            declined_recover_with,
4086            Err(StreamError::Failed("declined".into()))
4087        );
4088
4089        let completed = Source::from_factory(|| {
4090            Box::new(vec![Ok(1), Err(StreamError::Failed("ignored".into())), Ok(2)].into_iter())
4091        })
4092        .on_error_complete()
4093        .run_collect()
4094        .unwrap();
4095        assert_eq!(completed, vec![1]);
4096    }
4097
4098    #[test]
4099    fn sliding_matches_akka_window_semantics() {
4100        // Full windows then stop; no spurious trailing partial windows.
4101        assert_eq!(
4102            Source::from_iter(1..=4)
4103                .sliding(3, 1)
4104                .run_collect()
4105                .unwrap(),
4106            vec![vec![1, 2, 3], vec![2, 3, 4]]
4107        );
4108        assert_eq!(
4109            Source::from_iter(1..=4)
4110                .sliding(2, 1)
4111                .run_collect()
4112                .unwrap(),
4113            vec![vec![1, 2], vec![2, 3], vec![3, 4]]
4114        );
4115        // Exact multiple of size leaves nothing to emit at finish.
4116        assert_eq!(
4117            Source::from_iter(1..=3)
4118                .sliding(3, 1)
4119                .run_collect()
4120                .unwrap(),
4121            vec![vec![1, 2, 3]]
4122        );
4123        // Short streams emit a single partial window.
4124        assert_eq!(
4125            Source::from_iter(1..=2)
4126                .sliding(3, 1)
4127                .run_collect()
4128                .unwrap(),
4129            vec![vec![1, 2]]
4130        );
4131        // Every element in its own window.
4132        assert_eq!(
4133            Source::from_iter(1..=3)
4134                .sliding(1, 1)
4135                .run_collect()
4136                .unwrap(),
4137            vec![vec![1], vec![2], vec![3]]
4138        );
4139        // step > size skips the gap between windows.
4140        assert_eq!(
4141            Source::from_iter(1..=6)
4142                .sliding(2, 3)
4143                .run_collect()
4144                .unwrap(),
4145            vec![vec![1, 2], vec![4, 5]]
4146        );
4147        // step > size with an in-progress trailing window is dropped (Akka).
4148        assert_eq!(
4149            Source::from_iter(1..=3)
4150                .sliding(2, 4)
4151                .run_collect()
4152                .unwrap(),
4153            vec![vec![1, 2]]
4154        );
4155    }
4156
4157    #[test]
4158    fn recover_with_retries_indefinitely_like_akka() {
4159        let attempts = StdArc::new(StdAtomicUsize::new(0));
4160        let attempts_in_stage = StdArc::clone(&attempts);
4161        // The first several recovery sources keep failing; `recover_with` must
4162        // retry past Datum's old single-retry limit to reach the good source.
4163        let recovered = Source::<i32>::failed(StreamError::Failed("boom".into()))
4164            .recover_with(move |_error| {
4165                if attempts_in_stage.fetch_add(1, StdOrdering::SeqCst) < 5 {
4166                    Some(Source::<i32>::failed(StreamError::Failed("again".into())))
4167                } else {
4168                    Some(Source::from_iter([42]))
4169                }
4170            })
4171            .run_collect()
4172            .unwrap();
4173        assert_eq!(recovered, vec![42]);
4174        assert_eq!(attempts.load(StdOrdering::SeqCst), 6);
4175    }
4176
4177    #[test]
4178    fn many_concurrent_streams_do_not_starve_the_pool() {
4179        // Regression: a fixed-size pool deadlocks once `num_cpus` streams each
4180        // monopolize a worker. The growing pool must keep serving new streams.
4181        //
4182        // Use a small fixed count rather than `available_parallelism() + 2`: the
4183        // point is that several concurrent never-completing streams cannot starve
4184        // a fresh one, and a fixed count keeps this from spawning a large number
4185        // of permanent threads (which, when the whole suite runs under load, can
4186        // exhaust the thread limit and wedge the shared executor).
4187        let materializer = Materializer::new();
4188        let busy = 6_usize;
4189
4190        let mut held = Vec::with_capacity(busy);
4191        for _ in 0..busy {
4192            held.push(
4193                Source::single(1_u64)
4194                    .run_with_materializer(Sink::never(), &materializer)
4195                    .unwrap(),
4196            );
4197        }
4198
4199        for _ in 0..400 {
4200            if materializer.active_streams() >= busy {
4201                break;
4202            }
4203            thread::sleep(Duration::from_millis(5));
4204        }
4205        assert_eq!(materializer.active_streams(), busy);
4206
4207        // A fresh finite stream must still complete despite every prior worker
4208        // being occupied by a never-completing sink.
4209        let sum = Source::from_iter(0_u64..5)
4210            .run_with_materializer(Sink::fold(0_u64, |acc, item| acc + item), &materializer)
4211            .unwrap();
4212        assert_eq!(sum.wait().unwrap(), 10);
4213
4214        materializer.shutdown();
4215        for completion in held {
4216            assert_eq!(completion.wait(), Err(StreamError::AbruptTermination));
4217        }
4218    }
4219}