Skip to main content

datum/stream/
mod.rs

1//! Core push-stream vocabulary and the first reusable Source/Flow API slice.
2
3use std::{
4    collections::{BTreeMap, HashMap, VecDeque},
5    fmt,
6    future::Future,
7    hash::Hash,
8    marker::PhantomData,
9    panic::{AssertUnwindSafe, catch_unwind},
10    pin::Pin,
11    sync::{
12        Arc, Condvar, Mutex, OnceLock,
13        atomic::{AtomicBool, AtomicUsize, Ordering},
14    },
15    task::{Context, Poll},
16    thread,
17    time::Duration,
18};
19
20use futures::{channel::oneshot, executor::block_on};
21use thiserror::Error;
22use tokio::{
23    runtime::{Builder as TokioRuntimeBuilder, Runtime as TokioRuntime},
24    task::{JoinError, JoinHandle},
25};
26
27pub(crate) type BoxStream<T> = Box<dyn Iterator<Item = StreamResult<T>> + Send>;
28pub(crate) type PureTransform<In, Out> = Arc<dyn Fn(BoxStream<In>) -> BoxStream<Out> + Send + Sync>;
29pub(crate) type RuntimeTransform<In, Out> =
30    Arc<dyn Fn(BoxStream<In>, &Materializer) -> StreamResult<BoxStream<Out>> + Send + Sync>;
31type SinkRunner<In, Mat> = dyn Fn(BoxStream<In>, &Materializer) -> StreamResult<Mat> + Send + Sync;
32type HintedSinkRunner<In, Mat> =
33    dyn Fn(BoxStream<In>, &Materializer, SourceRuntimeHints) -> StreamResult<Mat> + Send + Sync;
34type RunnableGraphRunner<Mat> = dyn Fn(&Materializer) -> StreamResult<Mat> + Send + Sync;
35const STREAM_READY_SPINS: usize = 256;
36/// Spin-loop hints between consecutive drain-thread readiness checks. Back off
37/// a small block of hints between polls so the spin window still covers the
38/// common fast-completion case without burning a full busy loop before
39/// parking.
40const STREAM_SPIN_BACKOFF: usize = 8;
41const STREAM_MAX_PARK: Duration = Duration::from_millis(1);
42
43/// Private metadata for known finite, synchronous micro-sources.
44/// Set only on Datum-owned constructors whose `next()` is guaranteed
45/// memory-backed and whose total success item count is known at blueprint time.
46#[derive(Clone, Copy, Debug, PartialEq, Eq)]
47struct InlineMicroSourceHint {
48    max_success_items: usize,
49}
50
51#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
52struct SourceHints {
53    inline_head_terminal: bool,
54    /// Set only for constructors where Datum owns the full iterator and can
55    /// prove it is finite, synchronous, and memory-backed.
56    inline_micro: Option<InlineMicroSourceHint>,
57    /// Set only when terminal `fold`/`collect`/`ignore` may amortize the
58    /// checked-stream cancellation/shutdown wrapper. Unknown, probe, queue, IO,
59    /// and timer-backed sources keep the per-element checked path.
60    terminal_consumer_batch: bool,
61}
62
63#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
64pub(crate) struct SourceRuntimeHints {
65    pub(crate) inline_micro_max_success_items: Option<usize>,
66    pub(crate) terminal_consumer_batch: bool,
67}
68
69impl SourceHints {
70    const fn with_inline_micro(max_success_items: usize) -> Self {
71        Self {
72            inline_head_terminal: true,
73            inline_micro: Some(InlineMicroSourceHint { max_success_items }),
74            terminal_consumer_batch: true,
75        }
76    }
77
78    const fn with_terminal_consumer_batch() -> Self {
79        Self {
80            inline_head_terminal: false,
81            inline_micro: None,
82            terminal_consumer_batch: true,
83        }
84    }
85
86    fn after_flow(self, flow: FlowHints) -> Self {
87        if flow.preserves_inline_head_terminal {
88            // inline_micro is intentionally not propagated through any flow:
89            // even a trivial map wraps the iterator, making eligibility uncertain.
90            Self {
91                inline_head_terminal: true,
92                inline_micro: None,
93                terminal_consumer_batch: self.terminal_consumer_batch
94                    && flow.preserves_terminal_consumer_batch,
95            }
96        } else {
97            Self {
98                inline_head_terminal: false,
99                inline_micro: None,
100                terminal_consumer_batch: self.terminal_consumer_batch
101                    && flow.preserves_terminal_consumer_batch,
102            }
103        }
104    }
105
106    fn without_inline_micro(self) -> Self {
107        Self {
108            inline_head_terminal: self.inline_head_terminal,
109            inline_micro: None,
110            terminal_consumer_batch: self.terminal_consumer_batch,
111        }
112    }
113
114    fn runtime(self) -> SourceRuntimeHints {
115        SourceRuntimeHints {
116            inline_micro_max_success_items: self.inline_micro.map(|hint| hint.max_success_items),
117            terminal_consumer_batch: self.terminal_consumer_batch,
118        }
119    }
120}
121
122#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
123struct FlowHints {
124    preserves_inline_head_terminal: bool,
125    preserves_terminal_consumer_batch: bool,
126}
127
128impl FlowHints {
129    const PRESERVES_INLINE_HEAD_TERMINAL: Self = Self {
130        preserves_inline_head_terminal: true,
131        preserves_terminal_consumer_batch: true,
132    };
133
134    const PRESERVES_TERMINAL_CONSUMER_BATCH: Self = Self {
135        preserves_inline_head_terminal: false,
136        preserves_terminal_consumer_batch: true,
137    };
138
139    fn then(self, next: Self) -> Self {
140        Self {
141            preserves_inline_head_terminal: self.preserves_inline_head_terminal
142                && next.preserves_inline_head_terminal,
143            preserves_terminal_consumer_batch: self.preserves_terminal_consumer_batch
144                && next.preserves_terminal_consumer_batch,
145        }
146    }
147}
148
149struct PartitionSlot<Key, Out> {
150    key: Option<Key>,
151    active: usize,
152    queued: VecDeque<(usize, Out)>,
153    in_ready_queue: bool,
154}
155
156struct AbortOnDropHandle<T> {
157    handle: JoinHandle<T>,
158}
159
160impl<T> AbortOnDropHandle<T> {
161    fn new(handle: JoinHandle<T>) -> Self {
162        Self { handle }
163    }
164}
165
166impl<T> Drop for AbortOnDropHandle<T> {
167    fn drop(&mut self) {
168        self.handle.abort();
169    }
170}
171
172impl<T> Future for AbortOnDropHandle<T> {
173    type Output = Result<T, JoinError>;
174
175    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
176        Pin::new(&mut self.handle).poll(cx)
177    }
178}
179
180impl<T> Unpin for AbortOnDropHandle<T> {}
181
182pub(crate) fn stream_tokio_runtime() -> &'static TokioRuntime {
183    static RUNTIME: OnceLock<TokioRuntime> = OnceLock::new();
184    RUNTIME.get_or_init(|| {
185        TokioRuntimeBuilder::new_multi_thread()
186            .enable_all()
187            .thread_name("datum-stream-tokio")
188            .build()
189            .expect("stream tokio runtime")
190    })
191}
192
193fn spawn_tokio_task<Fut, T>(future: Fut) -> AbortOnDropHandle<T>
194where
195    Fut: Future<Output = T> + Send + 'static,
196    T: Send + 'static,
197{
198    AbortOnDropHandle::new(stream_tokio_runtime().spawn(future))
199}
200
201pub(crate) fn current_stream_cancelled() -> Option<Arc<AtomicBool>> {
202    runtime::current_stream_cancelled()
203}
204
205pub(super) fn catch_unwind_failed<T, F>(context: &'static str, f: F) -> StreamResult<T>
206where
207    F: FnOnce() -> T,
208{
209    catch_unwind(AssertUnwindSafe(f))
210        .map_err(|_| StreamError::Failed(format!("{context} panicked")))
211}
212
213impl<Key, Out> PartitionSlot<Key, Out> {
214    fn new(key: Key) -> Self {
215        Self {
216            key: Some(key),
217            active: 0,
218            queued: VecDeque::new(),
219            in_ready_queue: false,
220        }
221    }
222}
223
224#[inline(always)]
225fn partition_slot_for<Key, Out>(
226    key: Key,
227    slots_by_key: &mut HashMap<Key, usize>,
228    slots: &mut Vec<PartitionSlot<Key, Out>>,
229    free_slots: &mut Vec<usize>,
230) -> usize
231where
232    Key: Clone + Eq + Hash,
233{
234    if let Some(slot) = slots_by_key.get(&key) {
235        return *slot;
236    }
237
238    let slot = if let Some(slot) = free_slots.pop() {
239        let state = &mut slots[slot];
240        state.key = Some(key.clone());
241        state.active = 0;
242        state.queued.clear();
243        state.in_ready_queue = false;
244        slot
245    } else {
246        slots.push(PartitionSlot::new(key.clone()));
247        slots.len() - 1
248    };
249    slots_by_key.insert(key, slot);
250    slot
251}
252
253#[inline(always)]
254fn retire_partition_slot<Key, Out>(
255    slot: usize,
256    slots_by_key: &mut HashMap<Key, usize>,
257    slots: &mut [PartitionSlot<Key, Out>],
258    free_slots: &mut Vec<usize>,
259) where
260    Key: Eq + Hash,
261{
262    let state = &mut slots[slot];
263    if let Some(key) = state.key.take() {
264        slots_by_key.remove(&key);
265    }
266    state.active = 0;
267    state.queued.clear();
268    state.in_ready_queue = false;
269    free_slots.push(slot);
270}
271
272#[inline(always)]
273fn ready_partition_slot<Key, Out>(
274    slots: &mut [PartitionSlot<Key, Out>],
275    ready_slots: &mut VecDeque<usize>,
276    slot: usize,
277    per_partition: usize,
278) {
279    if let Some(state) = slots.get_mut(slot)
280        && state.key.is_some()
281        && !state.in_ready_queue
282        && state.active < per_partition
283        && !state.queued.is_empty()
284    {
285        state.in_ready_queue = true;
286        ready_slots.push_back(slot);
287    }
288}
289
290#[inline(always)]
291fn pop_ready_partition_slot<Key, Out>(
292    slots: &mut [PartitionSlot<Key, Out>],
293    ready_slots: &mut VecDeque<usize>,
294    per_partition: usize,
295) -> Option<(usize, usize, Out)> {
296    while let Some(slot) = ready_slots.pop_front() {
297        let mut requeue = false;
298        let item = if let Some(state) = slots.get_mut(slot) {
299            state.in_ready_queue = false;
300            if state.key.is_some() && state.active < per_partition {
301                let item = state.queued.pop_front().map(|(index, item)| {
302                    state.active += 1;
303                    (index, slot, item)
304                });
305                if !state.queued.is_empty() && state.active < per_partition {
306                    state.in_ready_queue = true;
307                    requeue = true;
308                }
309                item
310            } else {
311                None
312            }
313        } else {
314            None
315        };
316
317        if requeue {
318            ready_slots.push_back(slot);
319        }
320        if item.is_some() {
321            return item;
322        }
323    }
324    None
325}
326
327pub(crate) trait SourceFactory<Out, Mat>: Send + Sync {
328    fn create(self: Arc<Self>, materializer: &Materializer) -> StreamResult<(BoxStream<Out>, Mat)>;
329}
330
331struct FnSourceFactory<F>(F);
332
333impl<Out, Mat, F> SourceFactory<Out, Mat> for FnSourceFactory<F>
334where
335    F: Fn(&Materializer) -> StreamResult<(BoxStream<Out>, Mat)> + Send + Sync,
336{
337    fn create(self: Arc<Self>, materializer: &Materializer) -> StreamResult<(BoxStream<Out>, Mat)> {
338        (self.0)(materializer)
339    }
340}
341
342struct MapSourceFactory<In, Out, Mat, F> {
343    source: Arc<dyn SourceFactory<In, Mat>>,
344    stage: F,
345    _marker: PhantomData<fn(In) -> Out>,
346}
347
348impl<In, Out, Mat, F> SourceFactory<Out, Mat> for MapSourceFactory<In, Out, Mat, F>
349where
350    In: Send + 'static,
351    Out: Send + 'static,
352    Mat: Send + 'static,
353    F: Fn(In) -> Out + Send + Sync + 'static,
354{
355    fn create(self: Arc<Self>, materializer: &Materializer) -> StreamResult<(BoxStream<Out>, Mat)> {
356        let (stream, mat) = Arc::clone(&self.source).create(materializer)?;
357        Ok((
358            Box::new(MapSourceStream {
359                input: stream,
360                factory: self,
361            }),
362            mat,
363        ))
364    }
365}
366
367struct MapSourceStream<In, Out, Mat, F> {
368    input: BoxStream<In>,
369    factory: Arc<MapSourceFactory<In, Out, Mat, F>>,
370}
371
372impl<In, Out, Mat, F> Iterator for MapSourceStream<In, Out, Mat, F>
373where
374    F: Fn(In) -> Out,
375{
376    type Item = StreamResult<Out>;
377
378    fn next(&mut self) -> Option<Self::Item> {
379        self.input
380            .next()
381            .map(|item| item.map(|item| (self.factory.stage)(item)))
382    }
383}
384
385fn merge_streams<Out>(streams: Vec<BoxStream<Out>>, eager_complete: bool) -> BoxStream<Out>
386where
387    Out: Send + 'static,
388{
389    let mut streams: Vec<Option<BoxStream<Out>>> = streams.into_iter().map(Some).collect();
390    let mut current = 0usize;
391    Box::new(std::iter::from_fn(move || {
392        loop {
393            let index = next_active_optional_stream(&streams, current, |_| true)?;
394            current = (index + 1) % streams.len().max(1);
395            let Some(stream) = streams[index].as_mut() else {
396                continue;
397            };
398            match stream.next() {
399                Some(item) => return Some(item),
400                None => {
401                    streams[index] = None;
402                    if eager_complete {
403                        return None;
404                    }
405                }
406            }
407        }
408    }))
409}
410
411fn merge_prioritized_streams<Out>(
412    streams: Vec<BoxStream<Out>>,
413    priorities: Vec<usize>,
414    eager_complete: bool,
415) -> BoxStream<Out>
416where
417    Out: Send + 'static,
418{
419    let mut streams: Vec<Option<BoxStream<Out>>> = streams.into_iter().map(Some).collect();
420    let schedule: Vec<usize> = priorities
421        .into_iter()
422        .enumerate()
423        .flat_map(|(index, weight)| std::iter::repeat_n(index, weight))
424        .collect();
425    let mut schedule_index = 0usize;
426    Box::new(std::iter::from_fn(move || {
427        loop {
428            if streams.iter().all(Option::is_none) {
429                return None;
430            }
431            let index = next_weighted_stream(&streams, &schedule, &mut schedule_index)?;
432            let Some(stream) = streams[index].as_mut() else {
433                continue;
434            };
435            match stream.next() {
436                Some(item) => return Some(item),
437                None => {
438                    streams[index] = None;
439                    if eager_complete {
440                        return None;
441                    }
442                }
443            }
444        }
445    }))
446}
447
448fn merge_sorted_stream<Out>(mut left: BoxStream<Out>, mut right: BoxStream<Out>) -> BoxStream<Out>
449where
450    Out: Ord + Send + 'static,
451{
452    let mut left_next: Option<Out> = None;
453    let mut right_next: Option<Out> = None;
454    let mut left_done = false;
455    let mut right_done = false;
456    Box::new(std::iter::from_fn(move || {
457        loop {
458            if left_next.is_none() && !left_done {
459                match left.next() {
460                    Some(Ok(item)) => left_next = Some(item),
461                    Some(Err(error)) => return Some(Err(error)),
462                    None => left_done = true,
463                }
464            }
465            if right_next.is_none() && !right_done {
466                match right.next() {
467                    Some(Ok(item)) => right_next = Some(item),
468                    Some(Err(error)) => return Some(Err(error)),
469                    None => right_done = true,
470                }
471            }
472
473            let next = match (&left_next, &right_next) {
474                (Some(left_item), Some(right_item)) => {
475                    if left_item <= right_item {
476                        left_next.take()
477                    } else {
478                        right_next.take()
479                    }
480                }
481                (Some(_), None) if right_done => left_next.take(),
482                (None, Some(_)) if left_done => right_next.take(),
483                (None, None) if left_done && right_done => return None,
484                _ => continue,
485            };
486            if let Some(item) = next {
487                return Some(Ok(item));
488            }
489        }
490    }))
491}
492
493fn merge_latest_streams<Out>(
494    streams: Vec<BoxStream<Out>>,
495    eager_complete: bool,
496) -> BoxStream<Vec<Out>>
497where
498    Out: Clone + Send + 'static,
499{
500    let mut streams: Vec<Option<BoxStream<Out>>> = streams.into_iter().map(Some).collect();
501    let mut latest = vec![None; streams.len()];
502    let mut seen = 0usize;
503    let mut current = 0usize;
504    let mut pending = VecDeque::<Vec<Out>>::new();
505    Box::new(std::iter::from_fn(move || {
506        loop {
507            if let Some(output) = pending.pop_front() {
508                return Some(Ok(output));
509            }
510            if streams.iter().all(Option::is_none) {
511                return None;
512            }
513            let index = next_active_optional_stream(&streams, current, |_| true)?;
514            current = (index + 1) % streams.len().max(1);
515            let Some(stream) = streams[index].as_mut() else {
516                continue;
517            };
518            match stream.next() {
519                Some(Ok(item)) => {
520                    if latest[index].is_none() {
521                        seen += 1;
522                    }
523                    latest[index] = Some(item);
524                    if seen == latest.len() {
525                        pending.push_back(
526                            latest
527                                .iter()
528                                .map(|item| item.clone().expect("merge-latest initialized"))
529                                .collect(),
530                        );
531                    }
532                }
533                Some(Err(error)) => return Some(Err(error)),
534                None => {
535                    streams[index] = None;
536                    if eager_complete {
537                        return None;
538                    }
539                }
540            }
541        }
542    }))
543}
544
545fn zip_streams<Left, Right>(
546    mut left: BoxStream<Left>,
547    mut right: BoxStream<Right>,
548) -> BoxStream<(Left, Right)>
549where
550    Left: Send + 'static,
551    Right: Send + 'static,
552{
553    let mut left_next: Option<Left> = None;
554    let mut right_next: Option<Right> = None;
555    let mut left_done = false;
556    let mut right_done = false;
557    Box::new(std::iter::from_fn(move || {
558        loop {
559            if left_next.is_none() && !left_done {
560                match left.next() {
561                    Some(Ok(item)) => left_next = Some(item),
562                    Some(Err(error)) => return Some(Err(error)),
563                    None => left_done = true,
564                }
565            }
566            if right_next.is_none() && !right_done {
567                match right.next() {
568                    Some(Ok(item)) => right_next = Some(item),
569                    Some(Err(error)) => return Some(Err(error)),
570                    None => right_done = true,
571                }
572            }
573            match (left_next.take(), right_next.take()) {
574                (Some(left_item), Some(right_item)) => return Some(Ok((left_item, right_item))),
575                (left_item, right_item) => {
576                    left_next = left_item;
577                    right_next = right_item;
578                    if (left_done && left_next.is_none()) || (right_done && right_next.is_none()) {
579                        return None;
580                    }
581                }
582            }
583        }
584    }))
585}
586
587fn zip_latest_with_stream<Left, Right, Out, F>(
588    mut left: BoxStream<Left>,
589    mut right: BoxStream<Right>,
590    eager_complete: bool,
591    combine: Arc<F>,
592) -> BoxStream<Out>
593where
594    Left: Clone + Send + 'static,
595    Right: Clone + Send + 'static,
596    Out: Send + 'static,
597    F: Fn(Left, Right) -> Out + Send + Sync + 'static,
598{
599    let mut left_latest: Option<Left> = None;
600    let mut right_latest: Option<Right> = None;
601    let mut left_done = false;
602    let mut right_done = false;
603    let mut turn_left = true;
604    let mut pending = VecDeque::<Out>::new();
605
606    Box::new(std::iter::from_fn(move || {
607        loop {
608            if let Some(output) = pending.pop_front() {
609                return Some(Ok(output));
610            }
611            if eager_complete && (left_done || right_done) {
612                return None;
613            }
614            if left_done && right_done {
615                return None;
616            }
617            // A side that completed without ever emitting can never be paired:
618            // no further output is possible, so draining the other (possibly
619            // infinite) side would loop forever producing nothing.
620            if (left_done && left_latest.is_none()) || (right_done && right_latest.is_none()) {
621                return None;
622            }
623
624            let pull_left = if left_done {
625                false
626            } else if right_done {
627                true
628            } else {
629                let value = turn_left;
630                turn_left = !turn_left;
631                value
632            };
633
634            if pull_left {
635                match left.next() {
636                    Some(Ok(item)) => {
637                        left_latest = Some(item);
638                        if let (Some(left_item), Some(right_item)) = (&left_latest, &right_latest) {
639                            pending.push_back(combine(left_item.clone(), right_item.clone()));
640                        }
641                    }
642                    Some(Err(error)) => return Some(Err(error)),
643                    None => {
644                        left_done = true;
645                        if eager_complete {
646                            return None;
647                        }
648                    }
649                }
650            } else {
651                match right.next() {
652                    Some(Ok(item)) => {
653                        right_latest = Some(item);
654                        if let (Some(left_item), Some(right_item)) = (&left_latest, &right_latest) {
655                            pending.push_back(combine(left_item.clone(), right_item.clone()));
656                        }
657                    }
658                    Some(Err(error)) => return Some(Err(error)),
659                    None => {
660                        right_done = true;
661                        if eager_complete {
662                            return None;
663                        }
664                    }
665                }
666            }
667        }
668    }))
669}
670
671fn zip_all_stream<Left, Right>(
672    mut left: BoxStream<Left>,
673    mut right: BoxStream<Right>,
674    left_fill: Left,
675    right_fill: Right,
676) -> BoxStream<(Left, Right)>
677where
678    Left: Clone + Send + 'static,
679    Right: Clone + Send + 'static,
680{
681    let mut left_done = false;
682    let mut right_done = false;
683    Box::new(std::iter::from_fn(move || {
684        if left_done && right_done {
685            return None;
686        }
687
688        let left_item = if left_done {
689            None
690        } else {
691            match left.next() {
692                Some(Ok(item)) => Some(item),
693                Some(Err(error)) => return Some(Err(error)),
694                None => {
695                    left_done = true;
696                    None
697                }
698            }
699        };
700        let right_item = if right_done {
701            None
702        } else {
703            match right.next() {
704                Some(Ok(item)) => Some(item),
705                Some(Err(error)) => return Some(Err(error)),
706                None => {
707                    right_done = true;
708                    None
709                }
710            }
711        };
712
713        match (left_item, right_item) {
714            (None, None) if left_done && right_done => None,
715            (Some(left_value), Some(right_value)) => Some(Ok((left_value, right_value))),
716            (Some(left_value), None) => Some(Ok((left_value, right_fill.clone()))),
717            (None, Some(right_value)) => Some(Ok((left_fill.clone(), right_value))),
718            (None, None) => None,
719        }
720    }))
721}
722
723fn zip_n_streams<Out, Next, F>(streams: Vec<BoxStream<Out>>, zipper: Arc<F>) -> BoxStream<Next>
724where
725    Out: Send + 'static,
726    Next: Send + 'static,
727    F: Fn(Vec<Out>) -> Next + Send + Sync + 'static,
728{
729    let count = streams.len();
730    if count == 0 {
731        return Box::new(std::iter::empty());
732    }
733    let mut streams: Vec<Option<BoxStream<Out>>> = streams.into_iter().map(Some).collect();
734    let mut slots: Vec<Option<Out>> = (0..count).map(|_| None).collect();
735    let mut current = 0usize;
736    Box::new(std::iter::from_fn(move || {
737        loop {
738            if slots.iter().all(Option::is_some) {
739                let values = slots
740                    .iter_mut()
741                    .map(|slot| slot.take().expect("zip-n slot filled"))
742                    .collect();
743                return Some(Ok(zipper(values)));
744            }
745
746            let index = next_active_optional_stream(&streams, current, |idx| slots[idx].is_none())?;
747            current = (index + 1) % count.max(1);
748            let Some(stream) = streams[index].as_mut() else {
749                continue;
750            };
751            match stream.next() {
752                Some(Ok(item)) => slots[index] = Some(item),
753                Some(Err(error)) => return Some(Err(error)),
754                None => {
755                    streams[index] = None;
756                    slots[index].as_ref()?;
757                }
758            }
759        }
760    }))
761}
762
763fn next_active_optional_stream<T, F>(
764    streams: &[Option<BoxStream<T>>],
765    current: usize,
766    predicate: F,
767) -> Option<usize>
768where
769    T: Send + 'static,
770    F: Fn(usize) -> bool,
771{
772    if streams.is_empty() {
773        return None;
774    }
775    for offset in 0..streams.len() {
776        let index = (current + offset) % streams.len();
777        if streams[index].is_some() && predicate(index) {
778            return Some(index);
779        }
780    }
781    None
782}
783
784fn next_weighted_stream<T>(
785    streams: &[Option<BoxStream<T>>],
786    schedule: &[usize],
787    schedule_index: &mut usize,
788) -> Option<usize>
789where
790    T: Send + 'static,
791{
792    if streams.is_empty() || schedule.is_empty() {
793        return None;
794    }
795    for _ in 0..schedule.len() {
796        let index = schedule[*schedule_index % schedule.len()];
797        *schedule_index = (*schedule_index + 1) % schedule.len();
798        if streams.get(index).is_some_and(Option::is_some) {
799            return Some(index);
800        }
801    }
802    None
803}
804
805pub(crate) mod async_boundary;
806mod completion;
807mod error;
808mod flow;
809mod rate;
810mod restart;
811mod runtime;
812mod sink;
813mod source;
814mod time;
815mod timer;
816
817/// Opaque hook on `Source<T>` for split-segment fast-path sources.
818/// Non-`None` only on `Source`s returned by the split fast path.
819/// Cleared by all composition operators (via/map/to_mat etc.).
820pub(crate) trait SplitSegmentHookDyn: Send + Sync + 'static {
821    fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Send + Sync>;
822}
823
824/// Opaque hook on `Source<T>` for a private source-to-terminal fast path.
825///
826/// Implementations synchronously fill bounded batches on the sink worker
827/// spawned by the terminal sink descriptor. Sink descriptors process the batch
828/// after the hook returns, so user closures do not run under source locks.
829pub(crate) trait TerminalSourceHookDyn<In>: Send + Sync + 'static {
830    fn drain_terminal_batch(
831        &self,
832        materializer: &Materializer,
833        cancelled: &Arc<AtomicBool>,
834        batch: &mut Vec<In>,
835    ) -> StreamResult<TerminalSourceStatus>;
836
837    fn supports_direct_terminal(&self) -> bool {
838        false
839    }
840
841    fn try_register_direct_terminal(
842        &self,
843        _consumer: Box<dyn TerminalSinkConsumerDyn<In>>,
844        _cancelled: Arc<AtomicBool>,
845    ) -> Option<StreamResult<()>> {
846        None
847    }
848
849    fn cancel_terminal(&self) {}
850}
851
852#[derive(Clone, Copy, Debug, PartialEq, Eq)]
853pub(crate) enum TerminalSourceStatus {
854    Active,
855    Completed,
856}
857
858pub(crate) trait TerminalSinkConsumerDyn<In>: Send + 'static {
859    fn on_item(&mut self, item: In) -> StreamResult<()>;
860    fn finish(self: Box<Self>, result: StreamResult<()>);
861}
862
863/// Fast-path descriptor for fold/collect/ignore sinks in split-segment context.
864/// Non-`None` only for recognised sinks (`Sink::fold`, `fold_result`, `collect`, `ignore`).
865pub(crate) trait FoldFastPathDyn<In: Send + 'static>: Send + Sync + 'static {
866    /// Try to register directly with a split segment hook.
867    /// Returns `None` if the hook type is not recognised.
868    /// Returns `Some(Ok(mat))` on success where `mat` is `Box<StreamCompletion<Acc>>`.
869    fn try_register(
870        &self,
871        hook: Arc<dyn SplitSegmentHookDyn>,
872    ) -> Option<StreamResult<Box<dyn std::any::Any + Send>>>;
873
874    fn supports_terminal_drain(&self) -> bool {
875        false
876    }
877
878    fn try_register_direct_terminal(
879        &self,
880        _hook: Arc<dyn TerminalSourceHookDyn<In>>,
881        _materializer: &Materializer,
882    ) -> Option<StreamResult<Box<dyn std::any::Any + Send>>> {
883        None
884    }
885
886    fn try_register_terminal_drain(
887        &self,
888        _hook: Arc<dyn TerminalSourceHookDyn<In>>,
889        _materializer: &Materializer,
890    ) -> Option<StreamResult<Box<dyn std::any::Any + Send>>> {
891        None
892    }
893}
894
895use self::runtime::{runtime_checked_stream, set_current_stream_cancelled};
896
897pub(crate) use self::completion::StreamCancellation;
898
899pub use self::{
900    completion::{Cancellable, StreamCompletion},
901    error::{StreamError, StreamResult, Supervision, SupervisionDecider, SupervisionDirective},
902    flow::{BidiFlow, Flow},
903    rate::{AggregateTimer, OverflowStrategy},
904    restart::{RestartFlow, RestartSettings, RestartSink, RestartSource, RetryFlow},
905    runtime::{Materializer, Runtime},
906    sink::{RunnableGraph, Sink, SinkCombineStrategy},
907    source::{Demand, Keep, MaybeHandle, NotUsed, PushOutlet, Source, SourceCombineStrategy},
908    time::{DelayOverflowStrategy, ThrottleMode},
909};
910
911#[cfg(test)]
912mod tests {
913    use super::*;
914    use crate::Attributes;
915    use crate::testkit::TestSink;
916    use std::fs;
917    use std::sync::{
918        Arc as StdArc,
919        atomic::{
920            AtomicBool as StdAtomicBool, AtomicUsize as StdAtomicUsize, Ordering as StdOrdering,
921        },
922        mpsc,
923    };
924    use std::time::Duration as StdDuration;
925    use std::time::Instant;
926
927    fn wait<T>(completion: StreamCompletion<T>) -> T {
928        completion.wait().unwrap()
929    }
930
931    fn wait_until(timeout: Duration, mut condition: impl FnMut() -> bool) -> bool {
932        let deadline = Instant::now() + timeout;
933        while Instant::now() < deadline {
934            if condition() {
935                return true;
936            }
937            thread::sleep(Duration::from_millis(2));
938        }
939        condition()
940    }
941
942    fn linux_thread_count(thread_name: &str) -> usize {
943        fs::read_dir("/proc/self/task")
944            .expect("task directory readable")
945            .filter_map(Result::ok)
946            .filter_map(|entry| fs::read_to_string(entry.path().join("comm")).ok())
947            .filter(|name| name.trim() == thread_name)
948            .count()
949    }
950
951    #[test]
952    fn source_async_boundary_preserves_results() {
953        let expected = Source::from_iter(0_u64..128)
954            .map(|item| item.wrapping_add(1))
955            .filter(|item| item % 3 != 0)
956            .map(|item| item * 2)
957            .run_collect()
958            .unwrap();
959
960        let actual = Source::from_iter(0_u64..128)
961            .map(|item| item.wrapping_add(1))
962            .async_boundary()
963            .filter(|item| item % 3 != 0)
964            .map(|item| item * 2)
965            .run_collect()
966            .unwrap();
967
968        assert_eq!(actual, expected);
969    }
970
971    #[test]
972    fn flow_async_boundary_preserves_results() {
973        let expected = Source::from_iter(0_u64..128)
974            .map(|item| item + 1)
975            .map(|item| item * 3)
976            .run_collect()
977            .unwrap();
978
979        let flow = Flow::identity()
980            .map(|item: u64| item + 1)
981            .r#async()
982            .map(|item| item * 3);
983        let actual = Source::from_iter(0_u64..128)
984            .via(flow)
985            .run_collect()
986            .unwrap();
987
988        assert_eq!(actual, expected);
989    }
990
991    #[test]
992    fn linear_async_boundary_matches_graph_async_boundary_shape() {
993        use crate::{
994            AsyncBoundary, AsyncBoundaryExecutionConfig, FusedExecutionConfig, GraphDsl,
995            GraphFlowShape, MapStage,
996        };
997
998        let graph = GraphDsl::try_create(|builder| {
999            let first = builder.add(MapStage::new(|item: u64| item + 1));
1000            let boundary = builder.add(AsyncBoundary::<u64>::new());
1001            let second = builder.add(MapStage::new(|item: u64| item * 2));
1002
1003            builder.connect(first.outlet(), boundary.inlet())?;
1004            builder.connect(boundary.outlet(), second.inlet())?;
1005
1006            Ok(GraphFlowShape::new(first.inlet(), second.outlet()))
1007        })
1008        .unwrap();
1009
1010        let linear = Source::from_iter(1_u64..=4)
1011            .map(|item| item + 1)
1012            .async_boundary_with_buffer(4)
1013            .map(|item| item * 2)
1014            .run_collect()
1015            .unwrap();
1016        let graph_output = graph.run_with_input(1_u64..=4).unwrap();
1017        let report = graph
1018            .run_async_boundary_count_with_input_report(
1019                1_u64..=4,
1020                AsyncBoundaryExecutionConfig {
1021                    fused: FusedExecutionConfig { event_limit: 1024 },
1022                    buffer_size: 4,
1023                },
1024            )
1025            .unwrap();
1026
1027        assert_eq!(linear, graph_output);
1028        assert_eq!(report.result, linear.len());
1029        assert_eq!(report.async_boundary_crossings, linear.len());
1030    }
1031
1032    #[test]
1033    fn async_boundary_regions_run_concurrently() {
1034        let (upstream_tx, upstream_rx) = mpsc::channel::<u64>();
1035        let (downstream_blocked_tx, downstream_blocked_rx) = mpsc::channel::<()>();
1036        let (release_tx, release_rx) = mpsc::channel::<()>();
1037        let release_rx = StdArc::new(Mutex::new(release_rx));
1038
1039        let completion = Source::from_iter(0_u64..3)
1040            .map(move |item| {
1041                upstream_tx.send(item).expect("upstream probe receives");
1042                item
1043            })
1044            .async_boundary_with_buffer(1)
1045            .map({
1046                let release_rx = StdArc::clone(&release_rx);
1047                move |item| {
1048                    if item == 0 {
1049                        downstream_blocked_tx
1050                            .send(())
1051                            .expect("downstream probe receives");
1052                        release_rx
1053                            .lock()
1054                            .expect("release receiver lock")
1055                            .recv_timeout(StdDuration::from_secs(2))
1056                            .expect("downstream release arrives");
1057                    }
1058                    item
1059                }
1060            })
1061            .run_with(Sink::collect())
1062            .unwrap();
1063
1064        assert_eq!(
1065            downstream_blocked_rx.recv_timeout(StdDuration::from_secs(2)),
1066            Ok(())
1067        );
1068        assert_eq!(upstream_rx.recv_timeout(StdDuration::from_secs(2)), Ok(0));
1069        assert_eq!(upstream_rx.recv_timeout(StdDuration::from_secs(2)), Ok(1));
1070
1071        release_tx.send(()).expect("release downstream");
1072        assert_eq!(completion.wait().unwrap(), vec![0, 1, 2]);
1073    }
1074
1075    #[test]
1076    fn async_boundary_backpressures_slow_downstream() {
1077        let (produced_tx, produced_rx) = mpsc::channel::<u64>();
1078        let (release_tx, release_rx) = mpsc::channel::<()>();
1079        let release_rx = StdArc::new(Mutex::new(release_rx));
1080
1081        let completion = Source::from_iter(0_u64..8)
1082            .map(move |item| {
1083                produced_tx.send(item).expect("producer probe receives");
1084                item
1085            })
1086            .async_boundary_with_buffer(1)
1087            .map({
1088                let release_rx = StdArc::clone(&release_rx);
1089                move |item| {
1090                    if item == 0 {
1091                        release_rx
1092                            .lock()
1093                            .expect("release receiver lock")
1094                            .recv_timeout(StdDuration::from_secs(2))
1095                            .expect("downstream release arrives");
1096                    }
1097                    item
1098                }
1099            })
1100            .run_with(Sink::collect())
1101            .unwrap();
1102
1103        assert_eq!(produced_rx.recv_timeout(StdDuration::from_secs(2)), Ok(0));
1104        assert_eq!(produced_rx.recv_timeout(StdDuration::from_secs(2)), Ok(1));
1105        if let Ok(item) = produced_rx.recv_timeout(StdDuration::from_millis(100)) {
1106            assert_eq!(item, 2);
1107        }
1108        match produced_rx.recv_timeout(StdDuration::from_millis(100)) {
1109            Err(mpsc::RecvTimeoutError::Timeout) => {}
1110            other => panic!("async boundary handoff was not bounded: {other:?}"),
1111        }
1112
1113        release_tx.send(()).expect("release downstream");
1114        assert_eq!(completion.wait().unwrap(), (0_u64..8).collect::<Vec<_>>());
1115    }
1116
1117    #[test]
1118    fn source_blueprints_are_reusable() {
1119        let source = Source::from_iter(0..5).map(|item| item + 1);
1120
1121        assert_eq!(source.clone().run_collect().unwrap(), vec![1, 2, 3, 4, 5]);
1122        assert_eq!(source.run_collect().unwrap(), vec![1, 2, 3, 4, 5]);
1123    }
1124
1125    #[test]
1126    fn source_map_preserves_materialized_value() {
1127        let graph = Source::single(1)
1128            .map_materialized_value(|_| "source")
1129            .map(|item| item + 1)
1130            .to_mat(Sink::head(), Keep::both);
1131
1132        let materialized = graph.run().unwrap();
1133        assert_eq!(materialized.0, "source");
1134        assert_eq!(wait(materialized.1), 2);
1135    }
1136
1137    #[test]
1138    fn source_and_flow_compose() {
1139        let flow = Flow::identity()
1140            .map(|item: i32| item * 2)
1141            .filter(|item| item % 3 == 0);
1142
1143        let result = Source::from_iter(0..8).via(flow).run_collect().unwrap();
1144
1145        assert_eq!(result, vec![0, 6, 12]);
1146    }
1147
1148    #[test]
1149    fn sink_setup_sees_materializer_defaults_and_local_attributes() {
1150        let observed = StdArc::new(Mutex::new(None));
1151        let observed_in_setup = StdArc::clone(&observed);
1152        let sink = Sink::<i32, StreamCompletion<NotUsed>>::setup(move |_materializer, attrs| {
1153            *observed_in_setup.lock().unwrap() = Some((
1154                attrs.name().map(str::to_owned),
1155                attrs.input_buffer_hint(),
1156                attrs.dispatcher_hint().map(str::to_owned),
1157            ));
1158            Sink::ignore()
1159        })
1160        .add_attributes(Attributes::named("sink-inner"))
1161        .add_attributes(Attributes::input_buffer(4, 4))
1162        .add_attributes(Attributes::dispatcher("bench-dispatcher"));
1163
1164        let materializer = Materializer::new().with_attributes(Attributes::named("mat-outer"));
1165        wait(
1166            Source::from_iter([1, 2, 3])
1167                .run_with_materializer(sink, &materializer)
1168                .unwrap(),
1169        );
1170
1171        assert_eq!(
1172            *observed.lock().unwrap(),
1173            Some((
1174                Some("sink-inner".to_owned()),
1175                Some((4, 4)),
1176                Some("bench-dispatcher".to_owned())
1177            ))
1178        );
1179    }
1180
1181    #[test]
1182    fn sink_pre_materialize_feeds_existing_materialization() {
1183        let materializer = Materializer::new();
1184        let (completion, pre) = Sink::<i32, StreamCompletion<Vec<i32>>>::collect()
1185            .pre_materialize(&materializer)
1186            .unwrap();
1187
1188        Source::from_iter([1, 2, 3])
1189            .run_with_materializer(pre, &materializer)
1190            .unwrap();
1191
1192        assert_eq!(wait(completion), vec![1, 2, 3]);
1193    }
1194
1195    #[test]
1196    fn flow_from_sink_and_source_connects_both_sides() {
1197        assert_eq!(
1198            Source::from_iter([1, 2, 3])
1199                .via(Flow::from_sink_and_source(
1200                    Sink::foreach(|_item: i32| {}),
1201                    Source::from_iter([10, 20, 30]),
1202                ))
1203                .run_collect()
1204                .unwrap(),
1205            vec![10, 20, 30]
1206        );
1207    }
1208
1209    #[test]
1210    fn from_sink_and_source_keeps_sink_running_after_source_side_completes() {
1211        let completed = StdArc::new(StdAtomicBool::new(false));
1212        let on_complete = StdArc::clone(&completed);
1213        let flow = Flow::from_sink_and_source(
1214            Sink::on_complete(move || {
1215                on_complete.store(true, StdOrdering::SeqCst);
1216            }),
1217            Source::single(10),
1218        );
1219
1220        let result = Source::from_iter([1, 2, 3])
1221            .via(flow)
1222            .run_collect()
1223            .unwrap();
1224
1225        assert_eq!(result, vec![10]);
1226        assert!(wait_until(StdDuration::from_secs(1), || {
1227            completed.load(StdOrdering::SeqCst)
1228        }));
1229    }
1230
1231    #[test]
1232    fn from_sink_and_source_coupled_cancels_source_when_sink_finishes_first() {
1233        let cancellable = StdArc::new(Mutex::new(None));
1234        let observed = StdArc::clone(&cancellable);
1235        let flow = Flow::from_sink_and_source_coupled(
1236            Sink::ignore(),
1237            Source::tick(
1238                StdDuration::from_millis(50),
1239                StdDuration::from_millis(50),
1240                10,
1241            )
1242            .map_materialized_value(move |handle| {
1243                *observed.lock().unwrap() = Some(handle.clone());
1244                handle
1245            }),
1246        );
1247
1248        let completion = Source::from_iter(std::iter::empty::<i32>())
1249            .via(flow)
1250            .run_with(Sink::ignore())
1251            .unwrap();
1252        assert!(wait_until(StdDuration::from_secs(1), || {
1253            cancellable
1254                .lock()
1255                .unwrap()
1256                .as_ref()
1257                .is_some_and(Cancellable::is_cancelled)
1258        }));
1259        assert_eq!(wait(completion), NotUsed);
1260    }
1261
1262    #[test]
1263    fn bidi_flow_join_and_atop_compose() {
1264        let codec = BidiFlow::from_flows(
1265            Flow::identity().map(|item: i32| item + 1),
1266            Flow::identity().map(|item: i32| item * 2),
1267        )
1268        .named("codec");
1269        let framing = BidiFlow::from_flows(
1270            Flow::identity().map(|item: i32| item * 3),
1271            Flow::identity().map(|item: i32| item - 4),
1272        );
1273
1274        let joined = codec
1275            .clone()
1276            .join(Flow::identity().map(|item: i32| item - 5));
1277        let stacked = codec.atop(framing).join(Flow::identity());
1278
1279        assert_eq!(
1280            Source::single(10).via(joined).run_collect().unwrap(),
1281            vec![12]
1282        );
1283        assert_eq!(
1284            Source::single(10).via(stacked).run_collect().unwrap(),
1285            vec![58]
1286        );
1287    }
1288
1289    #[test]
1290    fn flow_buffer_then_map_runs_end_to_end() {
1291        let flow = Flow::identity()
1292            .buffer(8, OverflowStrategy::Backpressure)
1293            .map(|item: i32| item + 1);
1294
1295        let result = Source::from_iter(0..4).via(flow).run_collect().unwrap();
1296
1297        assert_eq!(result, vec![1, 2, 3, 4]);
1298    }
1299
1300    #[test]
1301    fn public_flow_combinators_preserve_runtime_transform_after_buffer() {
1302        fn buffered_flow() -> Flow<i32, i32> {
1303            Flow::identity().buffer(8, OverflowStrategy::Backpressure)
1304        }
1305
1306        assert_eq!(
1307            Source::from_iter(0..4)
1308                .via(buffered_flow().filter(|item| *item % 2 == 0))
1309                .run_collect()
1310                .unwrap(),
1311            vec![0, 2]
1312        );
1313        assert_eq!(
1314            Source::from_iter(0..4)
1315                .via(buffered_flow().filter_not(|item| *item % 2 == 0))
1316                .run_collect()
1317                .unwrap(),
1318            vec![1, 3]
1319        );
1320        assert_eq!(
1321            Source::from_iter(0..4)
1322                .via(buffered_flow().filter_map(|item| (item % 2 == 0).then_some(item + 10)))
1323                .run_collect()
1324                .unwrap(),
1325            vec![10, 12]
1326        );
1327        assert_eq!(
1328            Source::from_iter(0..3)
1329                .via(buffered_flow().map_concat(|item| [item, item + 10]))
1330                .run_collect()
1331                .unwrap(),
1332            vec![0, 10, 1, 11, 2, 12]
1333        );
1334        assert_eq!(
1335            Source::from_iter(0..3)
1336                .via(buffered_flow().stateful_map(5, |state, item| {
1337                    *state += item;
1338                    *state
1339                }))
1340                .run_collect()
1341                .unwrap(),
1342            vec![5, 6, 8]
1343        );
1344        assert_eq!(
1345            Source::from_iter(0..3)
1346                .via(buffered_flow().stateful_map_concat(0, |state, item| {
1347                    *state += item;
1348                    [*state, item]
1349                }))
1350                .run_collect()
1351                .unwrap(),
1352            vec![0, 0, 1, 1, 3, 2]
1353        );
1354        assert_eq!(
1355            Source::from_iter(0..4)
1356                .via(buffered_flow().map_async(2, |item| async move { Ok(item + 1) }))
1357                .run_collect()
1358                .unwrap(),
1359            vec![1, 2, 3, 4]
1360        );
1361        assert_eq!(
1362            Source::from_iter(0..4)
1363                .via(buffered_flow().map_async_unordered(2, |item| async move { Ok(item + 1) }))
1364                .run_collect()
1365                .unwrap(),
1366            vec![1, 2, 3, 4]
1367        );
1368        assert_eq!(
1369            Source::from_iter(0..4)
1370                .via(buffered_flow().map_async_partitioned(
1371                    2,
1372                    1,
1373                    |item| item % 2,
1374                    |item| async move { Ok(item + 1) },
1375                ))
1376                .run_collect()
1377                .unwrap(),
1378            vec![1, 2, 3, 4]
1379        );
1380        assert_eq!(
1381            Source::from_iter(0..5)
1382                .via(buffered_flow().take(3))
1383                .run_collect()
1384                .unwrap(),
1385            vec![0, 1, 2]
1386        );
1387        assert_eq!(
1388            Source::from_iter(0..5)
1389                .via(buffered_flow().drop(2))
1390                .run_collect()
1391                .unwrap(),
1392            vec![2, 3, 4]
1393        );
1394        assert_eq!(
1395            Source::from_iter(0..5)
1396                .via(buffered_flow().take_while(|item| *item < 3))
1397                .run_collect()
1398                .unwrap(),
1399            vec![0, 1, 2]
1400        );
1401        assert_eq!(
1402            Source::from_iter(0..5)
1403                .via(buffered_flow().drop_while(|item| *item < 3))
1404                .run_collect()
1405                .unwrap(),
1406            vec![3, 4]
1407        );
1408        assert_eq!(
1409            Source::from_iter(0..3)
1410                .via(buffered_flow().limit(5))
1411                .run_collect()
1412                .unwrap(),
1413            vec![0, 1, 2]
1414        );
1415        assert_eq!(
1416            Source::from_iter(0..5)
1417                .via(buffered_flow().grouped(2))
1418                .run_collect()
1419                .unwrap(),
1420            vec![vec![0, 1], vec![2, 3], vec![4]]
1421        );
1422        assert_eq!(
1423            Source::from_iter(1..=3)
1424                .via(buffered_flow().scan(0, |acc, item| acc + item))
1425                .run_collect()
1426                .unwrap(),
1427            vec![0, 1, 3, 6]
1428        );
1429        assert_eq!(
1430            Source::from_iter(1..=4)
1431                .via(buffered_flow().sliding(2, 1))
1432                .run_collect()
1433                .unwrap(),
1434            vec![vec![1, 2], vec![2, 3], vec![3, 4]]
1435        );
1436        assert_eq!(
1437            Source::from_iter(1..=4)
1438                .via(buffered_flow().fold(0, |acc, item| acc + item))
1439                .run_collect()
1440                .unwrap(),
1441            vec![10]
1442        );
1443        assert_eq!(
1444            Source::from_iter(1..=4)
1445                .via(buffered_flow().reduce(|acc, item| acc + item))
1446                .run_collect()
1447                .unwrap(),
1448            vec![10]
1449        );
1450        assert_eq!(
1451            Source::from_factory(|| {
1452                Box::new(vec![Ok(1), Err(StreamError::Failed("boom".into())), Ok(2)].into_iter())
1453            })
1454            .via(buffered_flow().map_error(|_| StreamError::Failed("mapped".into())))
1455            .run_collect(),
1456            Err(StreamError::Failed("mapped".into()))
1457        );
1458        assert_eq!(
1459            Source::<i32>::failed(StreamError::Failed("boom".into()))
1460                .via(buffered_flow().recover(|_| Some(42)))
1461                .run_collect()
1462                .unwrap(),
1463            vec![42]
1464        );
1465        assert_eq!(
1466            Source::<i32>::failed(StreamError::Failed("boom".into()))
1467                .via(buffered_flow().recover_with(|_| Some(Source::from_iter([7, 8]))))
1468                .run_collect()
1469                .unwrap(),
1470            vec![7, 8]
1471        );
1472        assert_eq!(
1473            Source::<i32>::failed(StreamError::Failed("boom".into()))
1474                .via(buffered_flow().recover_with_retries(1, |_| Some(Source::from_iter([9]))))
1475                .run_collect()
1476                .unwrap(),
1477            vec![9]
1478        );
1479        assert_eq!(
1480            Source::from_factory(|| {
1481                Box::new(vec![Ok(1), Err(StreamError::Failed("ignored".into())), Ok(2)].into_iter())
1482            })
1483            .via(buffered_flow().on_error_complete())
1484            .run_collect()
1485            .unwrap(),
1486            vec![1]
1487        );
1488
1489        let materialized = Source::from_iter([1, 2, 3])
1490            .run_with(
1491                buffered_flow()
1492                    .via(Flow::identity().map(|item| item + 1))
1493                    .map_materialized_value(|_| "buffered-flow")
1494                    .to_mat(Sink::fold(0, |acc, item| acc + item), Keep::both),
1495            )
1496            .unwrap();
1497        assert_eq!(materialized.0, "buffered-flow");
1498        assert_eq!(wait(materialized.1), 9);
1499
1500        let kept = Source::from_iter([1, 2, 3])
1501            .run_with(
1502                buffered_flow()
1503                    .via_mat_with(Flow::identity().map(|item| item + 1), |_, _| "combined")
1504                    .to(Sink::fold(0, |acc, item| acc + item)),
1505            )
1506            .unwrap();
1507        assert_eq!(kept, "combined");
1508    }
1509
1510    #[test]
1511    fn runtime_rate_flows_compose_in_flow_form() {
1512        let conflate = Flow::identity()
1513            .conflate(|left: i32, right| left + right)
1514            .map(|item| item + 1);
1515        assert_eq!(
1516            Source::single(4).via(conflate).run_collect().unwrap(),
1517            vec![5]
1518        );
1519
1520        let batch = Flow::identity()
1521            .batch(4, |item: i32| item, |left, right| left + right)
1522            .map(|item| item + 1);
1523        assert_eq!(Source::single(4).via(batch).run_collect().unwrap(), vec![5]);
1524
1525        let expand = Flow::identity()
1526            .expand(std::iter::once::<i32>)
1527            .map(|item| item + 1);
1528        assert_eq!(
1529            Source::from_iter(0..4).via(expand).run_collect().unwrap(),
1530            vec![1, 2, 3, 4]
1531        );
1532
1533        let aggregate = Flow::identity()
1534            .aggregate_with_boundary(
1535                Vec::<i32>::new,
1536                |mut items, item| {
1537                    items.push(item);
1538                    let ready = !items.is_empty();
1539                    (items, ready)
1540                },
1541                |items| items.into_iter().sum::<i32>(),
1542                None,
1543            )
1544            .map(|item| item + 1);
1545        assert_eq!(
1546            Source::from_iter(0..4)
1547                .via(aggregate)
1548                .run_collect()
1549                .unwrap(),
1550            vec![1, 2, 3, 4]
1551        );
1552
1553        let detached = Flow::identity().detach().map(|item: i32| item + 1);
1554        assert_eq!(
1555            Source::from_iter(0..4).via(detached).run_collect().unwrap(),
1556            vec![1, 2, 3, 4]
1557        );
1558    }
1559
1560    #[test]
1561    fn high_use_source_flow_operators_work() {
1562        let result = Source::from_iter(0..8)
1563            .drop(1)
1564            .take(5)
1565            .filter_not(|item| item % 2 == 0)
1566            .map_concat(|item| [item, item + 10])
1567            .grouped(3)
1568            .run_collect()
1569            .unwrap();
1570
1571        assert_eq!(result, vec![vec![1, 11, 3], vec![13, 5, 15]]);
1572    }
1573
1574    #[test]
1575    fn prefix_and_tail_emits_prefix_and_live_tail() {
1576        let mut outer = Source::from_iter(0..5)
1577            .prefix_and_tail(2)
1578            .run_collect()
1579            .unwrap();
1580        assert_eq!(outer.len(), 1);
1581        let (prefix, tail) = outer.pop().unwrap();
1582        assert_eq!(prefix, vec![0, 1]);
1583        assert_eq!(tail.clone().run_collect().unwrap(), vec![2, 3, 4]);
1584        assert_eq!(
1585            tail.run_collect(),
1586            Err(StreamError::Failed(
1587                "substream source cannot be materialized more than once".into()
1588            ))
1589        );
1590    }
1591
1592    #[test]
1593    fn prefix_and_tail_fails_before_prefix_is_ready() {
1594        let result = Source::from_factory(|| {
1595            Box::new(vec![Ok(1), Err(StreamError::Failed("boom".into())), Ok(2)].into_iter())
1596        })
1597        .prefix_and_tail(2)
1598        .run_collect();
1599        assert!(matches!(result, Err(StreamError::Failed(message)) if message == "boom"));
1600    }
1601
1602    #[test]
1603    fn prefix_and_tail_tail_propagates_late_upstream_failure() {
1604        let mut outer = Source::from_factory(|| {
1605            Box::new(vec![Ok(1), Ok(2), Err(StreamError::Failed("boom".into())), Ok(3)].into_iter())
1606        })
1607        .prefix_and_tail(2)
1608        .run_collect()
1609        .unwrap();
1610        let (prefix, tail) = outer.pop().unwrap();
1611        assert_eq!(prefix, vec![1, 2]);
1612        assert_eq!(tail.run_collect(), Err(StreamError::Failed("boom".into())));
1613    }
1614
1615    #[test]
1616    fn prefix_and_tail_accepts_non_clone_elements() {
1617        #[derive(Debug, PartialEq, Eq)]
1618        struct NonClone(u8);
1619
1620        let mut outer = Source::from_factory(|| {
1621            Box::new(vec![Ok(NonClone(1)), Ok(NonClone(2)), Ok(NonClone(3))].into_iter())
1622        })
1623        .prefix_and_tail(2)
1624        .run_collect()
1625        .unwrap();
1626        let (prefix, tail) = outer.pop().unwrap();
1627        assert_eq!(prefix, vec![NonClone(1), NonClone(2)]);
1628        assert_eq!(tail.run_collect().unwrap(), vec![NonClone(3)]);
1629    }
1630
1631    #[test]
1632    fn flat_map_prefix_materializes_on_short_upstream_completion() {
1633        let values = Source::from_iter([1, 2])
1634            .flat_map_prefix(3, |prefix| {
1635                let sum = prefix.into_iter().sum::<i32>();
1636                Flow::identity().prepend(Source::single(sum))
1637            })
1638            .run_collect()
1639            .unwrap();
1640        assert_eq!(values, vec![3]);
1641    }
1642
1643    #[test]
1644    fn flat_map_prefix_does_not_materialize_on_early_upstream_failure() {
1645        let invoked = StdArc::new(StdAtomicBool::new(false));
1646        let invoked_for_stage = StdArc::clone(&invoked);
1647        let result = Source::from_factory(|| {
1648            Box::new(vec![Ok(1), Err(StreamError::Failed("boom".into()))].into_iter())
1649        })
1650        .flat_map_prefix(3, move |_prefix| {
1651            invoked_for_stage.store(true, StdOrdering::SeqCst);
1652            Flow::identity()
1653        })
1654        .run_collect();
1655        assert_eq!(result, Err(StreamError::Failed("boom".into())));
1656        assert!(!invoked.load(StdOrdering::SeqCst));
1657    }
1658
1659    #[test]
1660    fn flat_map_concat_flattens_nested_sources_sequentially() {
1661        let values = Source::from_iter([1, 2, 3])
1662            .flat_map_concat(|item| Source::from_iter(0..item))
1663            .run_collect()
1664            .unwrap();
1665        assert_eq!(values, vec![0, 0, 1, 0, 1, 2]);
1666    }
1667
1668    #[test]
1669    fn flat_map_merge_respects_breadth_bound() {
1670        let active = StdArc::new(StdAtomicUsize::new(0));
1671        let max_active = StdArc::new(StdAtomicUsize::new(0));
1672        let active_for_stage = StdArc::clone(&active);
1673        let max_for_stage = StdArc::clone(&max_active);
1674
1675        let mut values = Source::from_iter(0..6)
1676            .flat_map_merge(2, move |item| {
1677                let active = StdArc::clone(&active_for_stage);
1678                let max_active = StdArc::clone(&max_for_stage);
1679                Source::future(move || {
1680                    let active = StdArc::clone(&active);
1681                    let max_active = StdArc::clone(&max_active);
1682                    async move {
1683                        let now = active.fetch_add(1, StdOrdering::SeqCst) + 1;
1684                        loop {
1685                            let seen = max_active.load(StdOrdering::SeqCst);
1686                            if now <= seen {
1687                                break;
1688                            }
1689                            if max_active
1690                                .compare_exchange(
1691                                    seen,
1692                                    now,
1693                                    StdOrdering::SeqCst,
1694                                    StdOrdering::SeqCst,
1695                                )
1696                                .is_ok()
1697                            {
1698                                break;
1699                            }
1700                        }
1701                        thread::sleep(StdDuration::from_millis(20));
1702                        active.fetch_sub(1, StdOrdering::SeqCst);
1703                        Ok(item)
1704                    }
1705                })
1706            })
1707            .run_collect()
1708            .unwrap();
1709        values.sort_unstable();
1710        assert_eq!(values, vec![0, 1, 2, 3, 4, 5]);
1711        assert!(max_active.load(StdOrdering::SeqCst) <= 2);
1712    }
1713
1714    #[test]
1715    fn flat_map_merge_propagates_inner_failures() {
1716        let result = Source::from_iter([0, 1, 2])
1717            .flat_map_merge(2, |item| {
1718                if item == 1 {
1719                    Source::failed(StreamError::Failed("boom".into()))
1720                } else {
1721                    Source::single(item)
1722                }
1723            })
1724            .run_collect();
1725        assert_eq!(result, Err(StreamError::Failed("boom".into())));
1726    }
1727
1728    #[test]
1729    fn flat_map_merge_emits_ready_inner_output_while_upstream_is_blocked() {
1730        let (release_tx, release_rx) = mpsc::channel();
1731        let release_rx = StdArc::new(std::sync::Mutex::new(Some(release_rx)));
1732        let queue = Source::from_factory(move || {
1733            let release_rx = StdArc::clone(&release_rx);
1734            let mut step = 0_u8;
1735            Box::new(std::iter::from_fn(move || {
1736                let item = match step {
1737                    0 => Some(Ok(0)),
1738                    1 => {
1739                        release_rx
1740                            .lock()
1741                            .unwrap()
1742                            .as_ref()
1743                            .expect("release receiver available")
1744                            .recv_timeout(StdDuration::from_secs(1))
1745                            .expect("timed out waiting to release second upstream element");
1746                        Some(Ok(1))
1747                    }
1748                    _ => None,
1749                };
1750                step += 1;
1751                item
1752            }))
1753        })
1754        .flat_map_merge(2, |item| Source::single(item + 10))
1755        .run_with(Sink::queue())
1756        .unwrap();
1757
1758        assert_eq!(queue.pull().unwrap(), Some(10));
1759        release_tx.send(()).unwrap();
1760        assert_eq!(queue.pull().unwrap(), Some(11));
1761        assert!(queue.pull().unwrap().is_none());
1762    }
1763
1764    #[test]
1765    fn group_by_routes_keys_and_drops_closed_keys() {
1766        let outer = Source::from_iter([0, 1, 2, 3, 4])
1767            .group_by(4, |item| item % 2, false)
1768            .run_with(Sink::queue())
1769            .unwrap();
1770
1771        let even = outer.pull().unwrap().unwrap();
1772        let even_completion = even.run_with(Sink::ignore()).unwrap();
1773        let odd = outer.pull().unwrap().unwrap();
1774        drop(even_completion);
1775
1776        assert_eq!(odd.run_collect().unwrap(), vec![1, 3]);
1777        assert!(outer.pull().unwrap().is_none());
1778    }
1779
1780    #[test]
1781    fn group_by_fails_when_distinct_key_limit_is_exceeded() {
1782        let outer = Source::from_iter([0, 1, 2])
1783            .group_by(2, |item| *item, false)
1784            .run_with(Sink::queue())
1785            .unwrap();
1786
1787        let _ = outer.pull().unwrap().unwrap();
1788        let _ = outer.pull().unwrap().unwrap();
1789        assert!(matches!(
1790            outer.pull(),
1791            Err(StreamError::Failed(message)) if message == "group_by reached max_substreams (2)"
1792        ));
1793    }
1794
1795    #[test]
1796    fn group_by_can_recreate_closed_substreams_when_enabled() {
1797        let (release_tx, release_rx) = mpsc::channel();
1798        let release_rx = StdArc::new(std::sync::Mutex::new(Some(release_rx)));
1799        let outer = Source::from_factory(move || {
1800            let release_rx = StdArc::clone(&release_rx);
1801            let mut step = 0_u8;
1802            Box::new(std::iter::from_fn(move || {
1803                let item = match step {
1804                    0 => Some(Ok(0)),
1805                    1 => Some(Ok(1)),
1806                    2 => {
1807                        release_rx
1808                            .lock()
1809                            .unwrap()
1810                            .as_ref()
1811                            .expect("release receiver available")
1812                            .recv_timeout(StdDuration::from_secs(1))
1813                            .expect("timed out waiting to release recreated key");
1814                        Some(Ok(0))
1815                    }
1816                    _ => None,
1817                };
1818                step += 1;
1819                item
1820            }))
1821        })
1822        .group_by(4, |item| item % 2, true)
1823        .run_with(Sink::queue())
1824        .unwrap();
1825
1826        let even = outer.pull().unwrap().unwrap();
1827        assert_eq!(wait(even.run_with(Sink::head()).unwrap()), 0);
1828        release_tx.send(()).unwrap();
1829
1830        let odd = outer.pull().unwrap().unwrap();
1831        assert_eq!(odd.run_collect().unwrap(), vec![1]);
1832
1833        let recreated_even = outer.pull().unwrap().unwrap();
1834        assert_eq!(recreated_even.run_collect().unwrap(), vec![0]);
1835        assert!(outer.pull().unwrap().is_none());
1836    }
1837
1838    #[test]
1839    fn group_by_panicking_key_fn_abruptly_terminates_live_substreams() {
1840        let outer = Source::from_iter([0, 1])
1841            .group_by(
1842                4,
1843                |item| {
1844                    assert_ne!(*item, 1, "boom");
1845                    item % 2
1846                },
1847                false,
1848            )
1849            .run_with(Sink::queue())
1850            .unwrap();
1851
1852        let substream = outer.pull().unwrap().unwrap();
1853        let (result_tx, result_rx) = mpsc::channel();
1854        thread::spawn(move || {
1855            let _ = result_tx.send(substream.run_collect());
1856        });
1857
1858        assert_eq!(
1859            result_rx.recv_timeout(StdDuration::from_secs(1)).unwrap(),
1860            Err(StreamError::AbruptTermination)
1861        );
1862        assert!(matches!(outer.pull(), Err(StreamError::AbruptTermination)));
1863    }
1864
1865    #[test]
1866    fn split_when_starts_new_substream_on_boundary_element() {
1867        let outer = Source::from_iter([1, 2, 0, 3, 0, 4, 5])
1868            .split_when(|item| *item == 0)
1869            .run_with(Sink::queue())
1870            .unwrap();
1871
1872        let first = outer.pull().unwrap().unwrap();
1873        assert_eq!(first.run_collect().unwrap(), vec![1, 2]);
1874        let second = outer.pull().unwrap().unwrap();
1875        assert_eq!(second.run_collect().unwrap(), vec![0, 3]);
1876        let third = outer.pull().unwrap().unwrap();
1877        assert_eq!(third.run_collect().unwrap(), vec![0, 4, 5]);
1878        assert!(outer.pull().unwrap().is_none());
1879    }
1880
1881    #[test]
1882    fn split_after_ends_current_substream_on_boundary_element() {
1883        let outer = Source::from_iter([1, 2, 0, 3, 0, 4, 5])
1884            .split_after(|item| *item == 0)
1885            .run_with(Sink::queue())
1886            .unwrap();
1887
1888        let first = outer.pull().unwrap().unwrap();
1889        assert_eq!(first.run_collect().unwrap(), vec![1, 2, 0]);
1890        let second = outer.pull().unwrap().unwrap();
1891        assert_eq!(second.run_collect().unwrap(), vec![3, 0]);
1892        let third = outer.pull().unwrap().unwrap();
1893        assert_eq!(third.run_collect().unwrap(), vec![4, 5]);
1894        assert!(outer.pull().unwrap().is_none());
1895    }
1896
1897    #[test]
1898    fn split_when_panicking_predicate_abruptly_terminates_live_substreams() {
1899        let outer = Source::from_iter([1, 2])
1900            .split_when(|item| {
1901                assert_ne!(*item, 2, "boom");
1902                false
1903            })
1904            .run_with(Sink::queue())
1905            .unwrap();
1906
1907        let substream = outer.pull().unwrap().unwrap();
1908        let (result_tx, result_rx) = mpsc::channel();
1909        thread::spawn(move || {
1910            let _ = result_tx.send(substream.run_collect());
1911        });
1912
1913        assert_eq!(
1914            result_rx.recv_timeout(StdDuration::from_secs(1)).unwrap(),
1915            Err(StreamError::AbruptTermination)
1916        );
1917        assert!(matches!(outer.pull(), Err(StreamError::AbruptTermination)));
1918    }
1919
1920    #[test]
1921    fn split_when_pre_buffer_segments_match_expected_count() {
1922        let outer = Source::from_iter(0..100)
1923            .split_when(|item| *item != 0 && *item % 10 == 0)
1924            .run_with(Sink::queue())
1925            .unwrap();
1926        let mut segment_count = 0;
1927        while let Some(substream) = outer.pull().unwrap() {
1928            let items: Vec<i32> = substream.run_collect().unwrap();
1929            assert!(!items.is_empty(), "segment should not be empty");
1930            segment_count += 1;
1931        }
1932        assert_eq!(segment_count, 10, "100 elements in segments of 10");
1933    }
1934
1935    #[test]
1936    fn split_after_pre_buffer_segments_match_expected_count() {
1937        let outer = Source::from_iter(0..100)
1938            .split_after(|item| (*item + 1) % 10 == 0)
1939            .run_with(Sink::queue())
1940            .unwrap();
1941        let mut segment_count = 0;
1942        let mut total = 0_i32;
1943        while let Some(substream) = outer.pull().unwrap() {
1944            let items: Vec<i32> = substream.run_collect().unwrap();
1945            assert!(!items.is_empty(), "segment should not be empty");
1946            total += items.len() as i32;
1947            segment_count += 1;
1948        }
1949        assert_eq!(segment_count, 10);
1950        assert_eq!(total, 100);
1951    }
1952
1953    #[test]
1954    fn group_by_single_key_fused_matches_general_path() {
1955        let outer = Source::from_iter(0..1000i64)
1956            .group_by(1, |_| 0u8, false)
1957            .run_with(Sink::queue())
1958            .unwrap();
1959        let substream = outer.pull().unwrap().unwrap();
1960        let items: Vec<i64> = substream.run_collect().unwrap();
1961        assert_eq!(items.len(), 1000);
1962        assert_eq!(items[0], 0);
1963        assert_eq!(items[999], 999);
1964        assert!(outer.pull().unwrap().is_none());
1965    }
1966
1967    #[test]
1968    fn group_by_single_key_fused_handles_key_change_with_substream_limit() {
1969        let outer = Source::from_iter([0, 1, 0])
1970            .group_by(2, |item| *item, false)
1971            .run_with(Sink::queue())
1972            .unwrap();
1973        let mut sources = vec![];
1974        while let Some(source) = outer.pull().unwrap() {
1975            sources.push(source);
1976        }
1977        assert_eq!(sources.len(), 2);
1978        assert_eq!(sources[0].clone().run_collect().unwrap(), vec![0, 0]);
1979        assert_eq!(sources[1].clone().run_collect().unwrap(), vec![1]);
1980    }
1981
1982    #[test]
1983    fn flat_map_merge_lock_lighter_matches_expected_count() {
1984        let items = Source::from_iter(0..20)
1985            .flat_map_merge(2, |item| Source::single(item + 100))
1986            .run_with(Sink::queue())
1987            .unwrap();
1988        let mut count = 0;
1989        while items.pull().unwrap().is_some() {
1990            count += 1;
1991        }
1992        assert_eq!(count, 20);
1993    }
1994
1995    // Verify that group_by emits the substream BEFORE upstream completes —
1996    // i.e., the worker does not buffer all elements before delivery.
1997    //
1998    // The test uses a rendezvous channel to synchronize: the group_by worker
1999    // will block waiting for more items (the channel is empty after item 0),
2000    // so if the outer substream is NOT emitted before item 1 arrives, the
2001    // `outer.pull()` call in a separate thread cannot return during that window.
2002    // With the old "single_key_buf" batching, pull() would block until a
2003    // key-change or EOF — which is a liveness bug for infinite or slow sources.
2004    #[test]
2005    fn group_by_single_key_emits_substream_before_upstream_completes() {
2006        // sync_channel(0): rendezvous — sender blocks until receiver is ready,
2007        // guaranteeing the worker processes exactly one item then waits.
2008        let (tx, rx) = mpsc::sync_channel::<i32>(0);
2009        let rx = StdArc::new(std::sync::Mutex::new(rx));
2010
2011        let outer = Source::from_factory({
2012            let rx = StdArc::clone(&rx);
2013            move || {
2014                let rx = StdArc::clone(&rx);
2015                Box::new(std::iter::from_fn(move || {
2016                    rx.lock().unwrap().recv().ok().map(Ok)
2017                })) as BoxStream<i32>
2018            }
2019        })
2020        .group_by(1, |_| 0u8, false)
2021        .run_with(Sink::queue())
2022        .unwrap();
2023
2024        // Kick off the producer: sends item 0, then waits for the outer
2025        // substream to be pulled (via a second channel) before sending more.
2026        let (sub_tx, sub_rx) = mpsc::channel::<Source<i32>>();
2027        let outer_thread = thread::spawn(move || {
2028            let substream = outer.pull().unwrap().expect("expected a substream");
2029            sub_tx.send(substream).unwrap();
2030        });
2031
2032        // Send the first item — the worker processes it and (with the fix)
2033        // immediately emits the substream to outer.
2034        tx.send(0).unwrap();
2035
2036        // The outer pull must complete within the timeout.
2037        let substream = sub_rx
2038            .recv_timeout(StdDuration::from_secs(5))
2039            .expect("timed out — group_by buffered first element before emitting substream");
2040
2041        // Now deliver the remaining items and close.
2042        for i in 1..100_i32 {
2043            tx.send(i).unwrap();
2044        }
2045        drop(tx);
2046
2047        let items: Vec<i32> = substream.run_collect().unwrap();
2048        assert_eq!(items.len(), 100);
2049        outer_thread.join().unwrap();
2050    }
2051
2052    #[test]
2053    fn group_by_concurrent_live_substreams_do_not_hold_ready_item_stress() {
2054        const STREAMS: usize = 32;
2055        const ROUNDS: usize = 8;
2056        const ITEMS: i64 = 8;
2057
2058        for _ in 0..ROUNDS {
2059            let barrier = StdArc::new(std::sync::Barrier::new(STREAMS));
2060            let mut handles = Vec::with_capacity(STREAMS);
2061
2062            for _ in 0..STREAMS {
2063                let barrier = StdArc::clone(&barrier);
2064                handles.push(thread::spawn(move || {
2065                    let (tx, rx) = mpsc::sync_channel::<i64>(0);
2066                    let rx = StdArc::new(std::sync::Mutex::new(rx));
2067
2068                    let outer = Source::from_factory({
2069                        let rx = StdArc::clone(&rx);
2070                        move || {
2071                            let rx = StdArc::clone(&rx);
2072                            Box::new(std::iter::from_fn(move || {
2073                                rx.lock().unwrap().recv().ok().map(Ok)
2074                            })) as BoxStream<i64>
2075                        }
2076                    })
2077                    .group_by(1, |_| 0_u8, false)
2078                    .run_with(Sink::queue())
2079                    .unwrap();
2080
2081                    barrier.wait();
2082
2083                    tx.send(0).unwrap();
2084                    let substream = outer.pull().unwrap().expect("expected group_by substream");
2085                    let subqueue = substream.run_with(Sink::queue()).unwrap();
2086                    assert_eq!(subqueue.pull().unwrap(), Some(0));
2087
2088                    for item in 1..ITEMS {
2089                        tx.send(item).unwrap();
2090                        assert_eq!(subqueue.pull().unwrap(), Some(item));
2091                    }
2092                    drop(tx);
2093
2094                    assert!(subqueue.pull().unwrap().is_none());
2095                    assert!(outer.pull().unwrap().is_none());
2096                }));
2097            }
2098
2099            for handle in handles {
2100                handle.join().expect("group_by stress worker panicked");
2101            }
2102        }
2103    }
2104
2105    // Verify that split_when emits each substream as a live stream, NOT after
2106    // accumulating all segment elements into memory.  The channel is sized to
2107    // be larger than LIVE_SUBSTREAM_CAPACITY (256) so that, if the old Vec
2108    // pre-buffering path were active, the first substream would never be emitted
2109    // before the boundary.  With live substreams the substream is visible
2110    // immediately upon the first item.
2111    #[test]
2112    fn split_when_emits_substream_before_segment_ends() {
2113        // 512 > LIVE_SUBSTREAM_CAPACITY (256): the live substream will block
2114        // on backpressure mid-segment, but that's fine — the outer substream
2115        // IS emitted immediately and the consumer can drain it concurrently.
2116        const SEGMENT_LEN: usize = 300;
2117
2118        let (tx, rx) = mpsc::sync_channel::<i32>(SEGMENT_LEN * 2 + 4);
2119        for i in 0..SEGMENT_LEN as i32 {
2120            tx.send(i).unwrap();
2121        }
2122        tx.send(-1).unwrap(); // boundary
2123        tx.send(99).unwrap(); // second segment
2124        drop(tx);
2125
2126        let outer = Source::from_iter(rx)
2127            .split_when(|item| *item == -1)
2128            .run_with(Sink::queue())
2129            .unwrap();
2130
2131        let (result_tx, result_rx) = mpsc::channel();
2132        thread::spawn(move || {
2133            let first = outer.pull().unwrap().expect("expected first substream");
2134            let items: Vec<i32> = first.run_collect().unwrap();
2135            let second = outer.pull().unwrap().expect("expected second substream");
2136            let items2: Vec<i32> = second.run_collect().unwrap();
2137            let done = outer.pull().unwrap().is_none();
2138            let _ = result_tx.send((items, items2, done));
2139        });
2140
2141        let (items, items2, done) = result_rx
2142            .recv_timeout(StdDuration::from_secs(5))
2143            .expect("timed out — split_when is buffering the whole segment");
2144        assert_eq!(items.len(), SEGMENT_LEN);
2145        assert_eq!(items2, vec![-1, 99]);
2146        assert!(done);
2147    }
2148
2149    #[test]
2150    fn split_after_emits_substream_before_segment_ends() {
2151        const SEGMENT_LEN: usize = 300;
2152
2153        let (tx, rx) = mpsc::sync_channel::<i32>(SEGMENT_LEN * 2 + 4);
2154        for i in 0..SEGMENT_LEN as i32 {
2155            tx.send(i).unwrap();
2156        }
2157        tx.send(-1).unwrap(); // boundary (included in first segment for split_after)
2158        tx.send(99).unwrap();
2159        drop(tx);
2160
2161        let outer = Source::from_iter(rx)
2162            .split_after(|item| *item == -1)
2163            .run_with(Sink::queue())
2164            .unwrap();
2165
2166        let (result_tx, result_rx) = mpsc::channel();
2167        thread::spawn(move || {
2168            let first = outer.pull().unwrap().expect("expected first substream");
2169            let items: Vec<i32> = first.run_collect().unwrap();
2170            let second = outer.pull().unwrap().expect("expected second substream");
2171            let items2: Vec<i32> = second.run_collect().unwrap();
2172            let done = outer.pull().unwrap().is_none();
2173            let _ = result_tx.send((items, items2, done));
2174        });
2175
2176        let (items, items2, done) = result_rx
2177            .recv_timeout(StdDuration::from_secs(5))
2178            .expect("timed out — split_after is buffering the whole segment");
2179        // split_after includes the boundary element in the first segment.
2180        assert_eq!(items.len(), SEGMENT_LEN + 1);
2181        assert_eq!(items2, vec![99]);
2182        assert!(done);
2183    }
2184
2185    // Stress test: run flat_map_merge 20 times with many concurrent inner
2186    // streams to confirm the fixed coordinator tail loop never hangs.
2187    // Intended to be run with --test-threads=32 to expose the race.
2188    #[test]
2189    fn flat_map_merge_coordinator_no_lost_wakeup_stress() {
2190        for _ in 0..20 {
2191            let result = Source::from_iter(0..50_i32)
2192                .flat_map_merge(8, |item| Source::from_iter(item..item + 3))
2193                .run_with(Sink::fold(0i64, |acc, item| acc + item as i64))
2194                .unwrap()
2195                .wait();
2196            // Each i in 0..50 contributes i + (i+1) + (i+2) = 3i + 3.
2197            // Sum = 3 * (0+1+..+49) + 3*50 = 3*1225 + 150 = 3675 + 150 = 3825.
2198            assert_eq!(result, Ok(3825), "flat_map_merge produced wrong sum");
2199        }
2200    }
2201
2202    // Stress test for the single-mutex flat_map_merge refactor.
2203    // 20 runs with high concurrency to verify no deadlock, lost wakeup,
2204    // or window-accounting corruption under the merged-lock design.
2205    // Intended to be run with --test-threads=32.
2206    #[test]
2207    fn flat_map_merge_single_mutex_race_stress() {
2208        for _ in 0..20 {
2209            let result = Source::from_iter(0..100_i64)
2210                .flat_map_merge(16, |item| Source::from_iter([item, item + 1000]))
2211                .run_with(Sink::fold(0i64, |acc, v| acc + v))
2212                .unwrap()
2213                .wait();
2214            // sum of item + (item+1000) for item in 0..100
2215            // = sum(0..100) + sum(0..100) + 100*1000
2216            // = 4950 + 4950 + 100000 = 109900
2217            assert_eq!(result, Ok(109_900), "flat_map_merge single-mutex stress");
2218        }
2219    }
2220
2221    // Bounded-memory test for split_when batching: verify that items are
2222    // delivered to a slow consumer and not held indefinitely in the writer
2223    // buffer.  Uses a rendezvous-style channel where the producer sends MANY
2224    // items into the outer queue (larger than LIVE_SUBSTREAM_BATCH = 64), then
2225    // blocks.  The consumer must be able to drain the first segment fully — i.e.
2226    // the writer must have flushed even when the batch was smaller than 64.
2227    #[test]
2228    fn split_when_bounded_memory_rendezvous() {
2229        // Segment of 100 items (> LIVE_SUBSTREAM_BATCH=64): the writer will
2230        // flush at 64, then again at the boundary (remaining 36).
2231        const SEGMENT: usize = 100;
2232        let (tx, rx) = mpsc::sync_channel::<i32>(SEGMENT * 4);
2233        for i in 0..SEGMENT as i32 {
2234            tx.send(i).unwrap();
2235        }
2236        tx.send(-1).unwrap(); // boundary (split_when: starts new segment)
2237        // Second segment
2238        for i in 0..10_i32 {
2239            tx.send(i).unwrap();
2240        }
2241        drop(tx);
2242
2243        let outer = Source::from_iter(rx)
2244            .split_when(|item| *item == -1)
2245            .run_with(Sink::queue())
2246            .unwrap();
2247
2248        let (result_tx, result_rx) = mpsc::channel();
2249        thread::spawn(move || {
2250            let first = outer.pull().unwrap().expect("first segment");
2251            let seg1: Vec<i32> = first.run_collect().unwrap();
2252            let second = outer.pull().unwrap().expect("second segment");
2253            let seg2: Vec<i32> = second.run_collect().unwrap();
2254            let done = outer.pull().unwrap().is_none();
2255            result_tx.send((seg1, seg2, done)).unwrap();
2256        });
2257
2258        let (seg1, seg2, done) = result_rx
2259            .recv_timeout(StdDuration::from_secs(5))
2260            .expect("timed out — split_when writer held items past LIVE_SUBSTREAM_BATCH");
2261        assert_eq!(seg1.len(), SEGMENT, "first segment length");
2262        assert_eq!(seg2[0], -1, "boundary element starts second segment");
2263        assert_eq!(seg2.len(), 11, "second segment: boundary + 10 items");
2264        assert!(done);
2265    }
2266
2267    // Bounded-memory test for group_by batching: verifies that a slow consumer
2268    // waiting on a single-key substream eventually receives all items (i.e. the
2269    // write batch is flushed, not held indefinitely).
2270    #[test]
2271    fn group_by_single_key_bounded_memory_rendezvous() {
2272        // 200 items, all same key, batched in groups of 64 by the writer.
2273        // The consumer blocks on run_collect(); all items must arrive.
2274        const N: usize = 200;
2275        let outer = Source::from_iter(0..N as i64)
2276            .group_by(1, |_| 0u8, false)
2277            .run_with(Sink::queue())
2278            .unwrap();
2279
2280        let (result_tx, result_rx) = mpsc::channel();
2281        thread::spawn(move || {
2282            let substream = outer.pull().unwrap().expect("substream");
2283            let items: Vec<i64> = substream.run_collect().unwrap();
2284            let done = outer.pull().unwrap().is_none();
2285            result_tx.send((items, done)).unwrap();
2286        });
2287
2288        let (items, done) = result_rx
2289            .recv_timeout(StdDuration::from_secs(5))
2290            .expect("timed out — group_by write batch held items beyond LIVE_SUBSTREAM_BATCH");
2291        assert_eq!(items.len(), N, "all items delivered");
2292        assert_eq!(items[0], 0);
2293        assert_eq!(items[N - 1], (N - 1) as i64);
2294        assert!(done);
2295    }
2296
2297    #[test]
2298    fn scan_emits_seed_and_accumulated_values() {
2299        let result = Source::from_iter(1..=3)
2300            .scan(0, |acc, item| acc + item)
2301            .run_collect()
2302            .unwrap();
2303
2304        assert_eq!(result, vec![0, 1, 3, 6]);
2305    }
2306
2307    #[test]
2308    fn limit_fails_after_max_elements() {
2309        let result = Source::from_iter(0..3).limit(2).run_collect();
2310
2311        assert_eq!(result, Err(StreamError::LimitExceeded { max: 2 }));
2312    }
2313
2314    #[test]
2315    fn limit_weighted_fails_with_limit_error_like_akka() {
2316        let result = Source::from_iter(["this", "is", "some", "string"])
2317            .via(Flow::identity().limit_weighted(15, |item: &&str| item.len()))
2318            .run_collect();
2319
2320        assert_eq!(result, Err(StreamError::LimitExceeded { max: 15 }));
2321    }
2322
2323    #[test]
2324    fn grouped_weighted_allows_oversized_first_element_like_akka() {
2325        let result = Source::from_iter([10_usize, 1, 2])
2326            .via(Flow::identity().grouped_weighted(5, |item: &usize| *item))
2327            .run_collect()
2328            .unwrap();
2329
2330        assert_eq!(result, vec![vec![10], vec![1, 2]]);
2331    }
2332
2333    #[test]
2334    fn grouped_weighted_keeps_oversized_later_element_in_current_group_like_akka() {
2335        let result = Source::from_iter([1_usize, 10, 2])
2336            .via(Flow::identity().grouped_weighted(5, |item: &usize| *item))
2337            .run_collect()
2338            .unwrap();
2339
2340        assert_eq!(result, vec![vec![1, 10], vec![2]]);
2341    }
2342
2343    #[test]
2344    fn sink_terminals_materialize_results() {
2345        let sum = Source::from_iter(1..=4)
2346            .run_with(Sink::fold(0, |acc, item| acc + item))
2347            .unwrap();
2348
2349        assert_eq!(wait(sum), 10);
2350        assert_eq!(
2351            wait(Source::from_iter(1..=4).run_with(Sink::head()).unwrap()),
2352            1
2353        );
2354        assert_eq!(
2355            wait(Source::from_iter(1..=4).run_with(Sink::last()).unwrap()),
2356            4
2357        );
2358    }
2359
2360    #[test]
2361    fn all_terminal_sink_variants_complete() {
2362        assert_eq!(
2363            wait(
2364                Source::from_iter([1, 2, 3])
2365                    .run_with(Sink::collect())
2366                    .unwrap()
2367            ),
2368            vec![1, 2, 3]
2369        );
2370        assert_eq!(
2371            wait(
2372                Source::<i32>::empty()
2373                    .run_with(Sink::head_option())
2374                    .unwrap()
2375            ),
2376            None
2377        );
2378        assert_eq!(
2379            wait(
2380                Source::from_iter([1, 2, 3])
2381                    .run_with(Sink::last_option())
2382                    .unwrap()
2383            ),
2384            Some(3)
2385        );
2386        assert_eq!(
2387            wait(
2388                Source::from_iter([1, 2, 3])
2389                    .run_with(Sink::reduce(|acc, item| acc + item))
2390                    .unwrap()
2391            ),
2392            6
2393        );
2394
2395        let seen = StdArc::new(StdAtomicUsize::new(0));
2396        let seen_by_sink = StdArc::clone(&seen);
2397        assert_eq!(
2398            wait(
2399                Source::from_iter([1_usize, 2, 3])
2400                    .run_with(Sink::foreach(move |item| {
2401                        seen_by_sink.fetch_add(item, StdOrdering::SeqCst);
2402                    }))
2403                    .unwrap()
2404            ),
2405            NotUsed
2406        );
2407        assert_eq!(seen.load(StdOrdering::SeqCst), 6);
2408    }
2409
2410    #[test]
2411    fn take_last_zero_returns_empty_vector() {
2412        let result = Source::from_iter([1, 2, 3])
2413            .run_with(Sink::take_last(0))
2414            .unwrap();
2415
2416        assert_eq!(wait(result), Vec::<i32>::new());
2417    }
2418
2419    #[test]
2420    fn bounded_head_terminals_complete_inline() {
2421        let materializer = Materializer::new();
2422
2423        let mut head = Source::from_iter(0_u64..1_000)
2424            .run_with_materializer(Sink::head(), &materializer)
2425            .unwrap();
2426        assert_eq!(materializer.active_streams(), 0);
2427        assert_eq!(head.try_wait(), Some(Ok(0)));
2428
2429        let mut filtered_head = Source::from_iter(0_u64..1_000)
2430            .filter(|item| *item >= 10)
2431            .run_with_materializer(Sink::head(), &materializer)
2432            .unwrap();
2433        assert_eq!(materializer.active_streams(), 0);
2434        assert_eq!(filtered_head.try_wait(), Some(Ok(10)));
2435
2436        let mut head_option = Source::<u64>::empty()
2437            .run_with_materializer(Sink::head_option(), &materializer)
2438            .unwrap();
2439        assert_eq!(materializer.active_streams(), 0);
2440        assert_eq!(head_option.try_wait(), Some(Ok(None)));
2441    }
2442
2443    #[test]
2444    fn bounded_head_fast_path_preserves_terminal_errors() {
2445        let materializer = Materializer::new();
2446
2447        let mut empty = Source::<u64>::empty()
2448            .run_with_materializer(Sink::head(), &materializer)
2449            .unwrap();
2450        assert_eq!(empty.try_wait(), Some(Err(StreamError::EmptyStream)));
2451
2452        let mut failed = Source::<u64>::failed(StreamError::Failed("boom".into()))
2453            .run_with_materializer(Sink::head(), &materializer)
2454            .unwrap();
2455        assert_eq!(
2456            failed.try_wait(),
2457            Some(Err(StreamError::Failed("boom".into())))
2458        );
2459        assert_eq!(materializer.active_streams(), 0);
2460    }
2461
2462    #[test]
2463    fn runnable_graph_composes_source_and_sink() {
2464        let graph = Source::from_iter(1..=4)
2465            .map(|item| item * 2)
2466            .to_mat(Sink::fold(0, |acc, item| acc + item), Keep::right);
2467
2468        assert_eq!(wait(graph.run().unwrap()), 20);
2469
2470        let graph = Source::single(1)
2471            .map_materialized_value(|_| 20)
2472            .to(Sink::ignore())
2473            .map_materialized_value(|value| value + 1);
2474        assert_eq!(graph.run().unwrap(), 21);
2475
2476        let ignored = Source::single(1).to(Sink::ignore()).run().unwrap();
2477        assert_eq!(ignored, NotUsed);
2478    }
2479
2480    #[test]
2481    fn materialized_values_follow_keep_defaults() {
2482        let source = Source::single(1).map_materialized_value(|_| "source");
2483        let flow = Flow::identity().map_materialized_value(|_| "flow");
2484
2485        let source_mat = source.clone().via(flow.clone()).to(Sink::ignore()).run();
2486        assert_eq!(source_mat.unwrap(), "source");
2487
2488        let combined = source
2489            .via_mat(flow, Keep::both)
2490            .to_mat(Sink::ignore(), Keep::both)
2491            .run()
2492            .unwrap();
2493        assert_eq!(combined.0, ("source", "flow"));
2494        assert_eq!(wait(combined.1), NotUsed);
2495
2496        let sink_mat = Source::single(41)
2497            .map_materialized_value(|_| "ignored source")
2498            .run_with(Sink::fold(1, |acc, item| acc + item))
2499            .unwrap();
2500        assert_eq!(wait(sink_mat), 42);
2501    }
2502
2503    #[test]
2504    fn flow_to_sink_preserves_flow_materialized_value_by_default() {
2505        let sink = Flow::identity()
2506            .map(|item: i32| item + 1)
2507            .map_materialized_value(|_| "flow")
2508            .to(Sink::fold(0, |acc, item| acc + item));
2509
2510        let materialized = Source::from_iter([1, 2, 3]).run_with(sink).unwrap();
2511
2512        assert_eq!(materialized, "flow");
2513        let explicit = Flow::identity()
2514            .map(|item: i32| item + 1)
2515            .map_materialized_value(|_| "flow")
2516            .to_mat(Sink::fold(0, |acc, item| acc + item), Keep::both)
2517            .run_with(Source::from_iter([1, 2, 3]))
2518            .unwrap();
2519        assert_eq!(explicit, NotUsed);
2520
2521        let explicit = Source::from_iter([1, 2, 3])
2522            .run_with(
2523                Flow::identity()
2524                    .map(|item: i32| item + 1)
2525                    .map_materialized_value(|_| "flow")
2526                    .to_mat(Sink::fold(0, |acc, item| acc + item), Keep::both),
2527            )
2528            .unwrap();
2529        assert_eq!(explicit.0, "flow");
2530        assert_eq!(wait(explicit.1), 9);
2531    }
2532
2533    #[test]
2534    fn materializer_shutdown_fails_materialization() {
2535        let materializer = Materializer::new();
2536        let named = materializer.with_name_prefix("test-stream");
2537        materializer.shutdown();
2538
2539        let graph = Source::single(1).to(Sink::ignore());
2540
2541        assert_eq!(named.name_prefix(), "test-stream");
2542        assert_eq!(
2543            graph.run_with_materializer(&named),
2544            Err(StreamError::AbruptTermination)
2545        );
2546    }
2547
2548    #[test]
2549    fn materializer_shutdown_fails_running_stream_completion() {
2550        let materializer = Materializer::new();
2551        let completion = Source::repeat(1)
2552            .run_with_materializer(Sink::ignore(), &materializer)
2553            .unwrap();
2554
2555        assert_eq!(materializer.active_streams(), 1);
2556        materializer.shutdown();
2557        assert_eq!(completion.wait(), Err(StreamError::AbruptTermination));
2558        assert_eq!(materializer.active_streams(), 0);
2559    }
2560
2561    #[test]
2562    fn dropped_stream_completion_cancels_running_stream() {
2563        let materializer = Materializer::new();
2564        let completion = Source::repeat(1)
2565            .run_with_materializer(Sink::ignore(), &materializer)
2566            .unwrap();
2567
2568        assert_eq!(materializer.active_streams(), 1);
2569        drop(completion);
2570        for _ in 0..50 {
2571            if materializer.active_streams() == 0 {
2572                break;
2573            }
2574            thread::sleep(Duration::from_millis(5));
2575        }
2576        assert_eq!(materializer.active_streams(), 0);
2577    }
2578
2579    #[test]
2580    fn runtime_timers_fire_cancel_and_stop_on_shutdown() {
2581        let materializer = Materializer::new();
2582        let (once_tx, once_rx) = mpsc::channel();
2583        let once = materializer.schedule_once(Duration::from_millis(5), move || {
2584            once_tx.send(()).unwrap();
2585        });
2586        once_rx.recv_timeout(Duration::from_millis(250)).unwrap();
2587        assert!(!once.is_cancelled());
2588
2589        let (cancelled_tx, cancelled_rx) = mpsc::channel();
2590        let cancelled = materializer.schedule_once(Duration::from_millis(25), move || {
2591            cancelled_tx.send(()).unwrap();
2592        });
2593        assert!(cancelled.cancel());
2594        assert!(!cancelled.cancel());
2595        assert!(cancelled.is_cancelled());
2596        assert!(
2597            cancelled_rx
2598                .recv_timeout(Duration::from_millis(75))
2599                .is_err()
2600        );
2601
2602        let fixed_delay_count = StdArc::new(StdAtomicUsize::new(0));
2603        let fixed_delay_task_count = StdArc::clone(&fixed_delay_count);
2604        let fixed_delay = materializer.schedule_with_fixed_delay(
2605            Duration::from_millis(1),
2606            Duration::from_millis(5),
2607            move || {
2608                fixed_delay_task_count.fetch_add(1, StdOrdering::SeqCst);
2609            },
2610        );
2611        thread::sleep(Duration::from_millis(25));
2612        assert!(fixed_delay_count.load(StdOrdering::SeqCst) > 0);
2613        fixed_delay.cancel();
2614
2615        let fixed_rate_count = StdArc::new(StdAtomicUsize::new(0));
2616        let fixed_rate_task_count = StdArc::clone(&fixed_rate_count);
2617        let fixed_rate = materializer.schedule_at_fixed_rate(
2618            Duration::from_millis(1),
2619            Duration::from_millis(5),
2620            move || {
2621                fixed_rate_task_count.fetch_add(1, StdOrdering::SeqCst);
2622            },
2623        );
2624        thread::sleep(Duration::from_millis(25));
2625        assert!(fixed_rate_count.load(StdOrdering::SeqCst) > 0);
2626        fixed_rate.cancel();
2627
2628        let shutdown_materializer = Materializer::new();
2629        let (shutdown_tx, shutdown_rx) = mpsc::channel();
2630        shutdown_materializer.schedule_once(Duration::from_millis(25), move || {
2631            shutdown_tx.send(()).unwrap();
2632        });
2633        shutdown_materializer.shutdown();
2634        assert!(shutdown_rx.recv_timeout(Duration::from_millis(75)).is_err());
2635    }
2636
2637    #[test]
2638    fn runtime_timer_driver_preserves_fixed_rate_cadence_under_slow_tasks() {
2639        use std::sync::{Condvar, Mutex};
2640
2641        #[derive(Debug)]
2642        enum TimerEvent {
2643            Started(usize, Instant),
2644            Completed(usize, Instant),
2645        }
2646
2647        let recv_event = |rx: &mpsc::Receiver<TimerEvent>, label: &str| {
2648            rx.recv_timeout(Duration::from_secs(20))
2649                .unwrap_or_else(|err| panic!("{label}: expected timer event within 20 s: {err}"))
2650        };
2651        let release = |gate: &StdArc<(Mutex<bool>, Condvar)>| {
2652            let (released, condvar) = &**gate;
2653            let mut released = released.lock().unwrap_or_else(|poison| poison.into_inner());
2654            *released = true;
2655            condvar.notify_all();
2656        };
2657
2658        let interval = Duration::from_secs(2);
2659        let overrun = interval + Duration::from_millis(250);
2660
2661        let rate_materializer = Materializer::new();
2662        let (rate_tx, rate_rx) = mpsc::channel();
2663        let rate_runs = StdArc::new(StdAtomicUsize::new(0));
2664        let rate_task_runs = StdArc::clone(&rate_runs);
2665        let rate_gate = StdArc::new((Mutex::new(false), Condvar::new()));
2666        let rate_task_gate = StdArc::clone(&rate_gate);
2667        let fixed_rate =
2668            rate_materializer.schedule_at_fixed_rate(Duration::ZERO, interval, move || {
2669                let run = rate_task_runs.fetch_add(1, StdOrdering::SeqCst) + 1;
2670                rate_tx
2671                    .send(TimerEvent::Started(run, Instant::now()))
2672                    .unwrap();
2673                if run == 1 {
2674                    let (released, condvar) = &*rate_task_gate;
2675                    let mut released = released.lock().unwrap_or_else(|poison| poison.into_inner());
2676                    while !*released {
2677                        released = condvar
2678                            .wait(released)
2679                            .unwrap_or_else(|poison| poison.into_inner());
2680                    }
2681                    rate_tx
2682                        .send(TimerEvent::Completed(run, Instant::now()))
2683                        .unwrap();
2684                }
2685            });
2686        let rate_first_started = match recv_event(&rate_rx, "fixed-rate first task") {
2687            TimerEvent::Started(1, at) => at,
2688            other => panic!("fixed-rate first task: unexpected event {other:?}"),
2689        };
2690        assert!(wait_until(Duration::from_secs(20), || {
2691            rate_first_started.elapsed() >= overrun
2692        }));
2693        release(&rate_gate);
2694        let rate_first_completed = match recv_event(&rate_rx, "fixed-rate first completion") {
2695            TimerEvent::Completed(1, at) => at,
2696            other => panic!("fixed-rate first completion: unexpected event {other:?}"),
2697        };
2698        let rate_second_started = match recv_event(&rate_rx, "fixed-rate second task") {
2699            TimerEvent::Started(2, at) => at,
2700            other => panic!("fixed-rate second task: unexpected event {other:?}"),
2701        };
2702        fixed_rate.cancel();
2703        rate_materializer.shutdown();
2704
2705        let delay_materializer = Materializer::new();
2706        let (delay_tx, delay_rx) = mpsc::channel();
2707        let delay_runs = StdArc::new(StdAtomicUsize::new(0));
2708        let delay_task_runs = StdArc::clone(&delay_runs);
2709        let delay_gate = StdArc::new((Mutex::new(false), Condvar::new()));
2710        let delay_task_gate = StdArc::clone(&delay_gate);
2711        let fixed_delay =
2712            delay_materializer.schedule_with_fixed_delay(Duration::ZERO, interval, move || {
2713                let run = delay_task_runs.fetch_add(1, StdOrdering::SeqCst) + 1;
2714                delay_tx
2715                    .send(TimerEvent::Started(run, Instant::now()))
2716                    .unwrap();
2717                if run == 1 {
2718                    let (released, condvar) = &*delay_task_gate;
2719                    let mut released = released.lock().unwrap_or_else(|poison| poison.into_inner());
2720                    while !*released {
2721                        released = condvar
2722                            .wait(released)
2723                            .unwrap_or_else(|poison| poison.into_inner());
2724                    }
2725                    delay_tx
2726                        .send(TimerEvent::Completed(run, Instant::now()))
2727                        .unwrap();
2728                }
2729            });
2730        let delay_first_started = match recv_event(&delay_rx, "fixed-delay first task") {
2731            TimerEvent::Started(1, at) => at,
2732            other => panic!("fixed-delay first task: unexpected event {other:?}"),
2733        };
2734        assert!(wait_until(Duration::from_secs(20), || {
2735            delay_first_started.elapsed() >= overrun
2736        }));
2737        release(&delay_gate);
2738        let delay_first_completed = match recv_event(&delay_rx, "fixed-delay first completion") {
2739            TimerEvent::Completed(1, at) => at,
2740            other => panic!("fixed-delay first completion: unexpected event {other:?}"),
2741        };
2742        let delay_second_started = match recv_event(&delay_rx, "fixed-delay second task") {
2743            TimerEvent::Started(2, at) => at,
2744            other => panic!("fixed-delay second task: unexpected event {other:?}"),
2745        };
2746        fixed_delay.cancel();
2747        delay_materializer.shutdown();
2748
2749        let rate_task_time = rate_first_completed.duration_since(rate_first_started);
2750        let rate_catch_up = rate_second_started.duration_since(rate_first_completed);
2751        let delay_task_time = delay_first_completed.duration_since(delay_first_started);
2752        let delay_gap = delay_second_started.duration_since(delay_first_completed);
2753        assert!(
2754            rate_task_time >= interval,
2755            "fixed-rate first task should overrun its interval; ran for {rate_task_time:?}"
2756        );
2757        assert!(
2758            rate_catch_up < interval,
2759            "fixed-rate second task should catch up after an overrun; waited {rate_catch_up:?}"
2760        );
2761        assert!(
2762            delay_task_time >= interval,
2763            "fixed-delay first task should overrun its interval; ran for {delay_task_time:?}"
2764        );
2765        assert!(
2766            delay_gap >= interval,
2767            "fixed-delay second task fired before one full delay elapsed after completion: {delay_gap:?}",
2768        );
2769    }
2770
2771    #[test]
2772    fn runtime_repeating_timer_cancellation_stops_future_fires() {
2773        let materializer = Materializer::new();
2774        let (tx, rx) = mpsc::channel();
2775        let timer = materializer.schedule_at_fixed_rate(
2776            Duration::from_millis(1),
2777            Duration::from_millis(30),
2778            move || {
2779                tx.send(()).unwrap();
2780            },
2781        );
2782
2783        rx.recv_timeout(Duration::from_millis(250)).unwrap();
2784        assert!(timer.cancel());
2785        assert!(rx.recv_timeout(Duration::from_millis(90)).is_err());
2786        materializer.shutdown();
2787    }
2788
2789    #[test]
2790    fn runtime_panicking_once_timer_does_not_kill_driver_or_later_timers() {
2791        let materializer = Materializer::new();
2792        materializer.schedule_once(Duration::from_millis(1), || {
2793            panic!("timer boom");
2794        });
2795
2796        let (tx, rx) = mpsc::channel();
2797        materializer.schedule_once(Duration::from_millis(20), move || {
2798            tx.send(()).unwrap();
2799        });
2800
2801        rx.recv_timeout(Duration::from_millis(250)).unwrap();
2802        materializer.shutdown();
2803    }
2804
2805    #[test]
2806    fn runtime_panicking_fixed_rate_timer_stops_itself_and_leaves_driver_alive() {
2807        let materializer = Materializer::new();
2808        let panic_count = StdArc::new(StdAtomicUsize::new(0));
2809        let panic_count_task = StdArc::clone(&panic_count);
2810        materializer.schedule_at_fixed_rate(Duration::ZERO, Duration::from_millis(20), move || {
2811            panic_count_task.fetch_add(1, StdOrdering::SeqCst);
2812            panic!("fixed-rate boom");
2813        });
2814
2815        assert!(wait_until(Duration::from_millis(150), || {
2816            panic_count.load(StdOrdering::SeqCst) == 1
2817        }));
2818
2819        let (tx, rx) = mpsc::channel();
2820        materializer.schedule_once(Duration::from_millis(30), move || {
2821            tx.send(()).unwrap();
2822        });
2823        rx.recv_timeout(Duration::from_millis(250)).unwrap();
2824
2825        thread::sleep(Duration::from_millis(90));
2826        assert_eq!(panic_count.load(StdOrdering::SeqCst), 1);
2827        materializer.shutdown();
2828    }
2829
2830    #[test]
2831    fn runtime_slow_timer_task_does_not_delay_unrelated_timers() {
2832        let materializer = Materializer::new();
2833        let started = StdArc::new(StdAtomicBool::new(false));
2834        let started_task = StdArc::clone(&started);
2835        let slow_timer = materializer.schedule_at_fixed_rate(
2836            Duration::ZERO,
2837            Duration::from_millis(250),
2838            move || {
2839                started_task.store(true, StdOrdering::SeqCst);
2840                thread::sleep(Duration::from_millis(200));
2841            },
2842        );
2843
2844        assert!(wait_until(Duration::from_millis(100), || {
2845            started.load(StdOrdering::SeqCst)
2846        }));
2847
2848        let start = Instant::now();
2849        let (tx, rx) = mpsc::channel();
2850        materializer.schedule_once(Duration::from_millis(10), move || {
2851            tx.send(Instant::now()).unwrap();
2852        });
2853        let fired_at = rx.recv_timeout(Duration::from_millis(350)).unwrap();
2854        let elapsed = fired_at.duration_since(start);
2855
2856        slow_timer.cancel();
2857        materializer.shutdown();
2858        assert!(
2859            elapsed < Duration::from_millis(150),
2860            "unrelated timer was delayed by a blocking timer task: {elapsed:?}",
2861        );
2862    }
2863
2864    #[test]
2865    fn runtime_shutdown_stops_timer_driver_thread() {
2866        let materializer = Materializer::new();
2867        assert!(wait_until(Duration::from_secs(1), || materializer
2868            .timer_driver_is_live()));
2869
2870        materializer.shutdown();
2871        assert!(wait_until(Duration::from_secs(2), || !materializer
2872            .timer_driver_is_live()));
2873    }
2874
2875    #[test]
2876    fn runtime_timer_driver_orders_many_timers_by_deadline() {
2877        let materializer = Materializer::new();
2878        let (tx, rx) = mpsc::channel();
2879        let schedule = [(450_u64, 4_u8), (50, 1), (350, 3), (150, 2), (550, 5)];
2880
2881        for (delay_ms, value) in schedule {
2882            let tx = tx.clone();
2883            materializer.schedule_once(Duration::from_millis(delay_ms), move || {
2884                tx.send(value).unwrap();
2885            });
2886        }
2887        drop(tx);
2888
2889        let mut received = Vec::new();
2890        for _ in 0..schedule.len() {
2891            received.push(rx.recv_timeout(Duration::from_secs(10)).unwrap());
2892        }
2893        materializer.shutdown();
2894
2895        assert_eq!(received, vec![1, 2, 3, 4, 5]);
2896    }
2897
2898    #[test]
2899    fn runtime_timer_driver_uses_one_thread_per_runtime_regardless_of_timer_count() {
2900        let materializer = Materializer::new();
2901        let thread_name = materializer.timer_thread_name().to_owned();
2902        // Linux exposes thread names through /proc/self/task/*/comm with the
2903        // kernel's 15-byte task-name limit. Once a full test process has
2904        // created enough runtimes, `datum-timer-1000` is reported as
2905        // `datum-timer-100`, so compare against the Linux-visible name and use
2906        // the observed baseline for collision-safe assertions.
2907        let linux_thread_name = thread_name.chars().take(15).collect::<String>();
2908        assert!(wait_until(Duration::from_secs(5), || {
2909            materializer.timer_driver_is_live() && linux_thread_count(&linux_thread_name) >= 1
2910        }));
2911        let live_timer_threads = linux_thread_count(&linux_thread_name);
2912
2913        for _ in 0..128 {
2914            materializer.schedule_once(Duration::from_secs(60), || {});
2915        }
2916
2917        assert!(
2918            wait_until(Duration::from_secs(5), || {
2919                materializer.timer_driver_is_live()
2920                    && linux_thread_count(&linux_thread_name) == live_timer_threads
2921            }),
2922            "scheduling timers should not create extra timer threads for a runtime",
2923        );
2924        materializer.shutdown();
2925        assert!(wait_until(Duration::from_secs(5), || {
2926            !materializer.timer_driver_is_live()
2927                && linux_thread_count(&linux_thread_name) < live_timer_threads
2928        }));
2929    }
2930
2931    #[test]
2932    fn cancelled_and_never_sinks_have_distinct_materialization_results() {
2933        assert_eq!(
2934            Source::repeat(1)
2935                .run_with(Sink::cancelled())
2936                .expect("cancelled sink materializes"),
2937            NotUsed
2938        );
2939        assert_eq!(
2940            Source::single(1)
2941                .run_with(Sink::never())
2942                .expect("never sink materializes")
2943                .try_wait(),
2944            None
2945        );
2946    }
2947
2948    #[test]
2949    fn never_sink_finishes_on_materializer_shutdown() {
2950        let materializer = Materializer::new();
2951        let completion = Source::single(1)
2952            .run_with_materializer(Sink::never(), &materializer)
2953            .unwrap();
2954
2955        materializer.shutdown();
2956        assert_eq!(completion.wait(), Err(StreamError::AbruptTermination));
2957    }
2958
2959    #[test]
2960    fn dropping_source_never_completion_releases_parked_worker() {
2961        let materializer = Materializer::new();
2962        let completion = Source::<i32>::never()
2963            .run_with_materializer(Sink::ignore(), &materializer)
2964            .unwrap();
2965
2966        assert!(wait_until(StdDuration::from_secs(1), || {
2967            materializer.active_streams() == 1
2968        }));
2969        assert_eq!(materializer.active_streams(), 1);
2970
2971        drop(completion);
2972
2973        assert!(wait_until(StdDuration::from_secs(15), || {
2974            materializer.active_streams() == 0
2975        }));
2976        assert_eq!(materializer.active_streams(), 0);
2977    }
2978
2979    #[test]
2980    fn future_and_maybe_sources_emit_values() {
2981        let future_value = Source::future(|| async { Ok(7) }).run_collect().unwrap();
2982        assert_eq!(future_value, vec![7]);
2983
2984        let future_source = Source::future_source(|| async { Ok(Source::from_iter([1, 2, 3])) })
2985            .run_collect()
2986            .unwrap();
2987        assert_eq!(future_source, vec![1, 2, 3]);
2988
2989        let (handle, source) = Source::maybe();
2990        assert_eq!(
2991            source.clone().run_collect(),
2992            Err(StreamError::MaybeIncomplete)
2993        );
2994        handle.complete(9).unwrap();
2995        assert_eq!(source.run_collect().unwrap(), vec![9]);
2996    }
2997
2998    #[test]
2999    fn wp6b_source_generators_emit_and_fail_like_stream_errors() {
3000        assert_eq!(
3001            Source::cycle(|| [1, 2, 3].into_iter())
3002                .take(8)
3003                .run_collect()
3004                .unwrap(),
3005            vec![1, 2, 3, 1, 2, 3, 1, 2]
3006        );
3007        assert_eq!(
3008            Source::<i32>::cycle(std::iter::empty::<i32>).run_collect(),
3009            Err(StreamError::Failed("empty iterator".into()))
3010        );
3011        assert_eq!(
3012            Source::unfold(0, |state| (state < 4).then_some((state + 1, state)))
3013                .run_collect()
3014                .unwrap(),
3015            vec![0, 1, 2, 3]
3016        );
3017        assert_eq!(
3018            Source::unfold_async(0, |state| async move {
3019                Ok((state < 4).then_some((state + 1, state * 2)))
3020            })
3021            .run_collect()
3022            .unwrap(),
3023            vec![0, 2, 4, 6]
3024        );
3025        assert!(matches!(
3026            Source::<i32>::lazy_single(|| panic!("boom")).run_collect(),
3027            Err(StreamError::Failed(message)) if message == "lazy_single factory panicked"
3028        ));
3029    }
3030
3031    #[test]
3032    fn wp6b_lazy_sources_defer_until_first_pull_and_complete_deferred_mat() {
3033        let created = StdArc::new(StdAtomicUsize::new(0));
3034        let created_for_source = StdArc::clone(&created);
3035        let source = Source::<i32>::lazy_source(move || {
3036            created_for_source.fetch_add(1, StdOrdering::SeqCst);
3037            Source::from_iter([7, 8]).map_materialized_value(|_| 99)
3038        });
3039        let materializer = Materializer::new();
3040        let (mut stream, mut mat) = StdArc::clone(&source.factory)
3041            .create(&materializer)
3042            .unwrap();
3043
3044        assert_eq!(created.load(StdOrdering::SeqCst), 0);
3045        assert!(mat.try_wait().is_none());
3046        assert_eq!(stream.next().unwrap().unwrap(), 7);
3047        assert_eq!(mat.wait().unwrap(), 99);
3048        assert_eq!(created.load(StdOrdering::SeqCst), 1);
3049        assert_eq!(stream.next().unwrap().unwrap(), 8);
3050
3051        let never_created = StdArc::new(StdAtomicUsize::new(0));
3052        let never_created_for_source = StdArc::clone(&never_created);
3053        let mat = Source::<i32>::lazy_future_source(move || {
3054            never_created_for_source.fetch_add(1, StdOrdering::SeqCst);
3055            async { Ok(Source::single(1)) }
3056        })
3057        .to(Sink::cancelled())
3058        .run()
3059        .unwrap();
3060        assert!(matches!(mat.wait(), Err(StreamError::Failed(_))));
3061        assert_eq!(never_created.load(StdOrdering::SeqCst), 0);
3062
3063        let lazy_future = StdArc::new(StdAtomicUsize::new(0));
3064        let lazy_future_for_source = StdArc::clone(&lazy_future);
3065        let source = Source::lazy_future(move || {
3066            lazy_future_for_source.fetch_add(1, StdOrdering::SeqCst);
3067            async { Ok(42) }
3068        });
3069        let (mut stream, _) = StdArc::clone(&source.factory)
3070            .create(&Materializer::new())
3071            .unwrap();
3072        assert_eq!(lazy_future.load(StdOrdering::SeqCst), 0);
3073        assert_eq!(stream.next().unwrap().unwrap(), 42);
3074        assert_eq!(lazy_future.load(StdOrdering::SeqCst), 1);
3075    }
3076
3077    #[test]
3078    fn wp6b_unfold_resource_closes_on_completion_failure_and_cancellation() {
3079        let closed = StdArc::new(StdAtomicUsize::new(0));
3080        let closed_on_complete = StdArc::clone(&closed);
3081        let values = Source::unfold_resource(
3082            || Ok(std::collections::VecDeque::from([1, 2, 3])),
3083            |items| Ok(items.pop_front()),
3084            move |_items| {
3085                closed_on_complete.fetch_add(1, StdOrdering::SeqCst);
3086                Ok(())
3087            },
3088        )
3089        .run_collect()
3090        .unwrap();
3091        assert_eq!(values, vec![1, 2, 3]);
3092        assert_eq!(closed.load(StdOrdering::SeqCst), 1);
3093
3094        let closed_on_failure = StdArc::new(StdAtomicUsize::new(0));
3095        let closed_on_failure_for_close = StdArc::clone(&closed_on_failure);
3096        let failed = Source::<i32>::unfold_resource(
3097            || Ok(()),
3098            |_| Err(StreamError::Failed("read".into())),
3099            move |_| {
3100                closed_on_failure_for_close.fetch_add(1, StdOrdering::SeqCst);
3101                Err(StreamError::Failed("close".into()))
3102            },
3103        )
3104        .run_collect();
3105        assert_eq!(failed, Err(StreamError::Failed("read".into())));
3106        assert_eq!(closed_on_failure.load(StdOrdering::SeqCst), 1);
3107
3108        let closed_on_cancel = StdArc::new(StdAtomicUsize::new(0));
3109        let closed_on_cancel_for_close = StdArc::clone(&closed_on_cancel);
3110        let first = Source::unfold_resource(
3111            || Ok(0_usize),
3112            |next| {
3113                let item = *next;
3114                *next += 1;
3115                Ok(Some(item))
3116            },
3117            move |_| {
3118                closed_on_cancel_for_close.fetch_add(1, StdOrdering::SeqCst);
3119                Ok(())
3120            },
3121        )
3122        .run_with(Sink::head())
3123        .unwrap();
3124        assert_eq!(first.wait().unwrap(), 0);
3125        assert!(wait_until(Duration::from_millis(250), || {
3126            closed_on_cancel.load(StdOrdering::SeqCst) == 1
3127        }));
3128    }
3129
3130    #[test]
3131    fn wp6b_async_resource_and_async_accumulators_are_sequential() {
3132        let closed = StdArc::new(StdAtomicUsize::new(0));
3133        let closed_for_close = StdArc::clone(&closed);
3134        let values = Source::unfold_resource_async(
3135            || async { Ok(std::collections::VecDeque::from([1, 2, 3])) },
3136            |items| {
3137                let item = items.pop_front();
3138                async move { Ok(item) }
3139            },
3140            move |_items| {
3141                let closed = StdArc::clone(&closed_for_close);
3142                async move {
3143                    closed.fetch_add(1, StdOrdering::SeqCst);
3144                    Ok(())
3145                }
3146            },
3147        )
3148        .run_collect()
3149        .unwrap();
3150        assert_eq!(values, vec![1, 2, 3]);
3151        assert_eq!(closed.load(StdOrdering::SeqCst), 1);
3152
3153        let closed_on_failure = StdArc::new(StdAtomicUsize::new(0));
3154        let closed_on_failure_for_close = StdArc::clone(&closed_on_failure);
3155        let failed = Source::<i32>::unfold_resource_async(
3156            || async { Ok(()) },
3157            |_resource| async { Err(StreamError::Failed("read".into())) },
3158            move |_resource| {
3159                let closed_on_failure = StdArc::clone(&closed_on_failure_for_close);
3160                async move {
3161                    closed_on_failure.fetch_add(1, StdOrdering::SeqCst);
3162                    Err(StreamError::Failed("close".into()))
3163                }
3164            },
3165        )
3166        .run_collect();
3167        assert_eq!(failed, Err(StreamError::Failed("read".into())));
3168        assert_eq!(closed_on_failure.load(StdOrdering::SeqCst), 1);
3169
3170        let active = StdArc::new(StdAtomicUsize::new(0));
3171        let max_active = StdArc::new(StdAtomicUsize::new(0));
3172        let active_for_stage = StdArc::clone(&active);
3173        let max_for_stage = StdArc::clone(&max_active);
3174        let scanned = Source::from_iter(1..=4)
3175            .scan_async(0, move |acc, item| {
3176                let active = StdArc::clone(&active_for_stage);
3177                let max_active = StdArc::clone(&max_for_stage);
3178                async move {
3179                    let now = active.fetch_add(1, StdOrdering::SeqCst) + 1;
3180                    max_active.fetch_max(now, StdOrdering::SeqCst);
3181                    tokio::time::sleep(Duration::from_millis(1)).await;
3182                    active.fetch_sub(1, StdOrdering::SeqCst);
3183                    Ok(acc + item)
3184                }
3185            })
3186            .run_collect()
3187            .unwrap();
3188        assert_eq!(scanned, vec![0, 1, 3, 6, 10]);
3189        assert_eq!(max_active.load(StdOrdering::SeqCst), 1);
3190
3191        let folded = Source::from_iter(1..=4)
3192            .fold_async(0, |acc, item| async move { Ok(acc + item) })
3193            .run_collect()
3194            .unwrap();
3195        assert_eq!(folded, vec![10]);
3196    }
3197
3198    #[test]
3199    fn wp6b_fold_async_materialization_does_not_drain_upstream() {
3200        let release = StdArc::new((std::sync::Mutex::new(false), std::sync::Condvar::new()));
3201        let started = StdArc::new(StdAtomicBool::new(false));
3202        let source = {
3203            let release = StdArc::clone(&release);
3204            let started = StdArc::clone(&started);
3205            Source::from_factory(move || {
3206                let release = StdArc::clone(&release);
3207                let started = StdArc::clone(&started);
3208                let mut emitted = false;
3209                Box::new(std::iter::from_fn(move || {
3210                    if emitted {
3211                        return None;
3212                    }
3213                    emitted = true;
3214                    started.store(true, StdOrdering::SeqCst);
3215                    let (released, available) = &*release;
3216                    let mut released = released.lock().unwrap();
3217                    while !*released {
3218                        released = available.wait(released).unwrap();
3219                    }
3220                    Some(Ok(1))
3221                }))
3222            })
3223        };
3224
3225        let (materialized_tx, materialized_rx) = mpsc::channel();
3226        let join = thread::spawn(move || {
3227            let queue = source
3228                .fold_async(0, |acc, item| async move { Ok(acc + item) })
3229                .run_with(Sink::queue())
3230                .unwrap();
3231            materialized_tx.send(queue).unwrap();
3232        });
3233
3234        let queue = match materialized_rx.recv_timeout(StdDuration::from_secs(1)) {
3235            Ok(queue) => queue,
3236            Err(error) => {
3237                let (released, available) = &*release;
3238                *released.lock().unwrap() = true;
3239                available.notify_all();
3240                let _ = join.join();
3241                panic!("fold_async materialization did not return before first pull: {error}");
3242            }
3243        };
3244        let (released, _) = &*release;
3245        assert!(
3246            !*released.lock().unwrap(),
3247            "test source was released before materialization returned"
3248        );
3249
3250        let (released, available) = &*release;
3251        *released.lock().unwrap() = true;
3252        available.notify_all();
3253        assert_eq!(queue.pull().unwrap(), Some(1));
3254        assert_eq!(queue.pull().unwrap(), None);
3255        join.join().unwrap();
3256        assert!(started.load(StdOrdering::SeqCst));
3257    }
3258
3259    #[test]
3260    fn wp6b_lazy_sink_and_flow_wait_for_first_element() {
3261        let lazy_sink_created = StdArc::new(StdAtomicUsize::new(0));
3262        let lazy_sink_created_for_factory = StdArc::clone(&lazy_sink_created);
3263        let empty_sink = Source::<i32>::empty()
3264            .run_with(Sink::lazy_sink(move || {
3265                lazy_sink_created_for_factory.fetch_add(1, StdOrdering::SeqCst);
3266                Sink::ignore()
3267            }))
3268            .unwrap();
3269        assert!(matches!(empty_sink.wait(), Err(StreamError::Failed(_))));
3270        assert_eq!(lazy_sink_created.load(StdOrdering::SeqCst), 0);
3271
3272        let foreach_sum = StdArc::new(StdAtomicUsize::new(0));
3273        let foreach_sum_for_sink = StdArc::clone(&foreach_sum);
3274        Source::from_iter([1_usize, 2, 3])
3275            .run_with(Sink::foreach_async(2, move |item| {
3276                let foreach_sum = StdArc::clone(&foreach_sum_for_sink);
3277                async move {
3278                    foreach_sum.fetch_add(item, StdOrdering::SeqCst);
3279                    Ok(())
3280                }
3281            }))
3282            .unwrap()
3283            .wait()
3284            .unwrap();
3285        assert_eq!(foreach_sum.load(StdOrdering::SeqCst), 6);
3286
3287        let lazy_flow_created = StdArc::new(StdAtomicUsize::new(0));
3288        let lazy_flow_created_for_factory = StdArc::clone(&lazy_flow_created);
3289        let lazy_flow = Flow::<i32, i32>::lazy_flow(move || {
3290            lazy_flow_created_for_factory.fetch_add(1, StdOrdering::SeqCst);
3291            Flow::identity()
3292                .map(|item: i32| item + 10)
3293                .map_materialized_value(|_| 123)
3294        });
3295        let mat = (lazy_flow.materialize)().unwrap();
3296        let mut stream = match lazy_flow.transform {
3297            flow::FlowTransform::Runtime(transform) => {
3298                transform(Box::new([Ok(1), Ok(2)].into_iter()), &Materializer::new()).unwrap()
3299            }
3300            flow::FlowTransform::Pure(_) => panic!("lazy flow must be runtime-backed"),
3301        };
3302        assert_eq!(lazy_flow_created.load(StdOrdering::SeqCst), 0);
3303        assert_eq!(stream.next().unwrap().unwrap(), 11);
3304        assert_eq!(mat.wait().unwrap(), 123);
3305        assert_eq!(lazy_flow_created.load(StdOrdering::SeqCst), 1);
3306        assert_eq!(stream.next().unwrap().unwrap(), 12);
3307
3308        let future_flow = Source::from_iter([1, 2])
3309            .via_mat(
3310                Flow::future_flow(|| async {
3311                    Ok(Flow::identity()
3312                        .map(|item: i32| item * 2)
3313                        .map_materialized_value(|_| 77))
3314                }),
3315                Keep::right,
3316            )
3317            .to_mat(Sink::collect(), Keep::both)
3318            .run()
3319            .unwrap();
3320        assert_eq!(future_flow.0.wait().unwrap(), 77);
3321        assert_eq!(future_flow.1.wait().unwrap(), vec![2, 4]);
3322    }
3323
3324    #[test]
3325    fn wp6b_lazy_flow_double_use_in_one_chain_pairs_instances_in_order() {
3326        // Order-sensitive pin for the per-thread FIFO slot pairing: one
3327        // cloned lazy-flow blueprint used twice in the SAME chain on the
3328        // same thread. element = first_stage_id * 100 + second_stage_id, so
3329        // a cross-wired (swapped) mat pairing would yield id2 * 100 + id1.
3330        for round in 0..50 {
3331            let counter = StdArc::new(StdAtomicUsize::new(1));
3332            let factory_counter = StdArc::clone(&counter);
3333            let lazy: Flow<usize, usize, _> = Flow::lazy_flow(move || {
3334                let id = factory_counter.fetch_add(1, StdOrdering::SeqCst);
3335                Flow::identity()
3336                    .map(move |x: usize| x * 100 + id)
3337                    .map_materialized_value(move |_| id)
3338            });
3339            let lazy_again = lazy.clone();
3340
3341            let ((first_mat, second_mat), out) = Source::from_iter([0usize])
3342                .via_mat(lazy, Keep::right)
3343                .via_mat(lazy_again, Keep::both)
3344                .to_mat(Sink::collect(), Keep::both)
3345                .run()
3346                .unwrap();
3347
3348            let first_id = first_mat.wait().unwrap();
3349            let second_id = second_mat.wait().unwrap();
3350            let element = out.wait().unwrap()[0];
3351            assert_eq!(
3352                element,
3353                first_id * 100 + second_id,
3354                "round {round}: mats ({first_id},{second_id}) cross-wired with transform order"
3355            );
3356            assert_ne!(
3357                first_id, second_id,
3358                "round {round}: same factory instance paired twice"
3359            );
3360        }
3361    }
3362
3363    #[test]
3364    fn wp6b_lazy_flow_clones_materialize_concurrently_without_cross_wiring() {
3365        for _ in 0..20 {
3366            let next_id = StdArc::new(StdAtomicUsize::new(0));
3367            let next_id_for_factory = StdArc::clone(&next_id);
3368            let flow = Flow::<i32, i32>::lazy_flow(move || {
3369                let id = next_id_for_factory.fetch_add(1, StdOrdering::SeqCst) + 1;
3370                Flow::identity()
3371                    .map(move |item: i32| item + (id as i32 * 100))
3372                    .map_materialized_value(move |_| id)
3373            });
3374            let barrier = StdArc::new(std::sync::Barrier::new(3));
3375
3376            let spawn_materialization = |input: i32| {
3377                let flow = flow.clone();
3378                let barrier = StdArc::clone(&barrier);
3379                thread::spawn(move || {
3380                    barrier.wait();
3381                    let (mat, values) = Source::single(input)
3382                        .via_mat(flow, Keep::right)
3383                        .to_mat(Sink::collect(), Keep::both)
3384                        .run()
3385                        .unwrap();
3386                    (input, mat.wait().unwrap(), values.wait().unwrap())
3387                })
3388            };
3389
3390            let first = spawn_materialization(1);
3391            let second = spawn_materialization(2);
3392            barrier.wait();
3393
3394            for result in [first.join().unwrap(), second.join().unwrap()] {
3395                let (input, mat_id, values) = result;
3396                assert_eq!(values, vec![input + (mat_id as i32 * 100)]);
3397            }
3398            assert_eq!(next_id.load(StdOrdering::SeqCst), 2);
3399        }
3400    }
3401
3402    #[test]
3403    fn wp6b_map_with_resource_emits_close_item_before_terminal_error() {
3404        let queue = Source::from_factory(|| {
3405            Box::new(vec![Ok(1), Err(StreamError::Failed("upstream".into()))].into_iter())
3406        })
3407        .map_with_resource(
3408            || Ok(()),
3409            |_resource, item| Ok(item + 10),
3410            |_resource| Ok(Some(99)),
3411        )
3412        .run_with(Sink::queue())
3413        .unwrap();
3414
3415        assert_eq!(queue.pull().unwrap(), Some(11));
3416        assert_eq!(queue.pull().unwrap(), Some(99));
3417        assert_eq!(queue.pull(), Err(StreamError::Failed("upstream".into())));
3418
3419        let failed: StreamResult<Vec<i32>> = Source::single(1)
3420            .map_with_resource(
3421                || Ok(()),
3422                |_resource, _item| -> StreamResult<i32> { Err(StreamError::Failed("map".into())) },
3423                |_resource| -> StreamResult<Option<i32>> {
3424                    Err(StreamError::Failed("close".into()))
3425                },
3426            )
3427            .run_collect();
3428        assert_eq!(failed, Err(StreamError::Failed("map".into())));
3429    }
3430
3431    #[test]
3432    fn stateful_and_terminal_source_operators_work() {
3433        let stateful = Source::from_iter([1, 2, 3])
3434            .stateful_map(0, |sum, item| {
3435                *sum += item;
3436                *sum
3437            })
3438            .run_collect()
3439            .unwrap();
3440        assert_eq!(stateful, vec![1, 3, 6]);
3441
3442        let concat = Source::from_iter([1, 2, 3])
3443            .stateful_map_concat(0, |sum, item| {
3444                *sum += item;
3445                [item, *sum]
3446            })
3447            .run_collect()
3448            .unwrap();
3449        assert_eq!(concat, vec![1, 1, 2, 3, 3, 6]);
3450
3451        assert_eq!(
3452            Source::from_iter([1, 2, 3])
3453                .fold(10, |acc, item| acc + item)
3454                .run_collect()
3455                .unwrap(),
3456            vec![16]
3457        );
3458        assert_eq!(
3459            Source::from_iter([1, 2, 3])
3460                .reduce(|acc, item| acc + item)
3461                .run_collect()
3462                .unwrap(),
3463            vec![6]
3464        );
3465    }
3466
3467    #[test]
3468    fn concat_and_sliding_emit_before_unbounded_upstream_finishes() {
3469        let concat = Source::single(())
3470            .map_concat(|_| 0_u64..)
3471            .take(1)
3472            .run_collect()
3473            .unwrap();
3474        assert_eq!(concat, vec![0]);
3475
3476        let sliding = Source::repeat(1_u64)
3477            .sliding(2, 1)
3478            .take(1)
3479            .run_collect()
3480            .unwrap();
3481        assert_eq!(sliding, vec![vec![1, 1]]);
3482    }
3483
3484    #[test]
3485    fn fan_in_source_operators_follow_ordering_rules() {
3486        assert_eq!(
3487            Source::from_iter([1, 2])
3488                .concat(Source::from_iter([3, 4]))
3489                .run_collect()
3490                .unwrap(),
3491            vec![1, 2, 3, 4]
3492        );
3493        assert_eq!(
3494            Source::from_iter([3, 4])
3495                .prepend(Source::from_iter([1, 2]))
3496                .run_collect()
3497                .unwrap(),
3498            vec![1, 2, 3, 4]
3499        );
3500        assert_eq!(
3501            Source::empty()
3502                .or_else(Source::from_iter([10, 20]))
3503                .run_collect()
3504                .unwrap(),
3505            vec![10, 20]
3506        );
3507        assert_eq!(
3508            Source::from_iter([1, 2])
3509                .or_else(Source::from_iter([10, 20]))
3510                .run_collect()
3511                .unwrap(),
3512            vec![1, 2]
3513        );
3514        assert_eq!(
3515            Source::from_iter([1, 2, 3])
3516                .interleave(Source::from_iter([10, 11, 12]), 2)
3517                .run_collect()
3518                .unwrap(),
3519            vec![1, 2, 10, 11, 3, 12]
3520        );
3521    }
3522
3523    #[test]
3524    fn fan_in_flow_operators_compose_with_primary_stream() {
3525        let concat = Source::from_iter([1, 2])
3526            .via(Flow::identity().concat(Source::from_iter([3, 4])))
3527            .run_collect()
3528            .unwrap();
3529        assert_eq!(concat, vec![1, 2, 3, 4]);
3530
3531        let prepend = Source::from_iter([3, 4])
3532            .via(Flow::identity().prepend(Source::from_iter([1, 2])))
3533            .run_collect()
3534            .unwrap();
3535        assert_eq!(prepend, vec![1, 2, 3, 4]);
3536
3537        let interleave = Source::from_iter([1, 2, 3])
3538            .via(Flow::identity().interleave(Source::from_iter([10, 11, 12]), 1))
3539            .run_collect()
3540            .unwrap();
3541        assert_eq!(interleave, vec![1, 10, 2, 11, 3, 12]);
3542
3543        let merge_sorted = Source::from_iter([1, 4])
3544            .via(Flow::identity().merge_sorted(Source::from_iter([2, 3, 5])))
3545            .run_collect()
3546            .unwrap();
3547        assert_eq!(merge_sorted, vec![1, 2, 3, 4, 5]);
3548
3549        let zip_latest = Source::from_iter([1, 2])
3550            .via(Flow::identity().zip_latest(Source::single(10)))
3551            .run_collect()
3552            .unwrap();
3553        assert_eq!(zip_latest, vec![(1, 10), (2, 10)]);
3554
3555        let zip_latest_with = Source::from_iter([1, 2])
3556            .via(
3557                Flow::identity()
3558                    .zip_latest_with(Source::single(10), false, |left, right| left + right),
3559            )
3560            .run_collect()
3561            .unwrap();
3562        assert_eq!(zip_latest_with, vec![11, 12]);
3563    }
3564
3565    #[test]
3566    fn fan_in_operators_propagate_errors_and_eager_close() {
3567        assert!(matches!(
3568            Source::failed(StreamError::Failed("boom".into()))
3569                .or_else(Source::from_iter([1, 2]))
3570                .run_collect(),
3571            Err(StreamError::Failed(_))
3572        ));
3573        assert!(matches!(
3574            Source::from_iter([1, 2])
3575                .prepend(Source::failed(StreamError::Failed("boom".into())))
3576                .run_collect(),
3577            Err(StreamError::Failed(_))
3578        ));
3579        assert_eq!(
3580            Source::from_iter([1, 2])
3581                .interleave_all([Source::empty()], 1, true)
3582                .run_collect()
3583                .unwrap(),
3584            vec![1]
3585        );
3586    }
3587
3588    #[test]
3589    fn interleave_lazy_pulls_only_inputs_needed_for_first_segment() {
3590        use std::sync::{Arc, atomic::AtomicUsize, atomic::Ordering};
3591
3592        let pulls: Arc<[AtomicUsize; 3]> = Arc::new([
3593            AtomicUsize::new(0),
3594            AtomicUsize::new(0),
3595            AtomicUsize::new(0),
3596        ]);
3597
3598        let make_source = |idx: usize| {
3599            let pulls = Arc::clone(&pulls);
3600            Source::from_materialized_factory(move |_| {
3601                let pulls = Arc::clone(&pulls);
3602                let mut emitted = false;
3603                Ok((
3604                    Box::new(std::iter::from_fn(move || {
3605                        pulls[idx].fetch_add(1, Ordering::SeqCst);
3606                        if !emitted && idx == 0 {
3607                            emitted = true;
3608                            Some(Ok(42))
3609                        } else {
3610                            None
3611                        }
3612                    })) as BoxStream<i32>,
3613                    NotUsed,
3614                ))
3615            })
3616        };
3617
3618        let result = make_source(0)
3619            .interleave_all([make_source(1), make_source(2)], 1, false)
3620            .run_with(Sink::head());
3621
3622        assert_eq!(wait(result.unwrap()), 42);
3623        assert_eq!(pulls[0].load(Ordering::SeqCst), 1);
3624        assert_eq!(
3625            pulls[1].load(Ordering::SeqCst),
3626            0,
3627            "second input should not be pulled when downstream cancels after first element"
3628        );
3629        assert_eq!(
3630            pulls[2].load(Ordering::SeqCst),
3631            0,
3632            "third input should not be pulled before its turn"
3633        );
3634    }
3635
3636    #[test]
3637    fn interleave_non_eager_drains_remaining_when_one_input_completes() {
3638        assert_eq!(
3639            Source::from_iter([1, 2, 3, 4])
3640                .interleave_all(
3641                    [Source::from_iter([10]), Source::from_iter([20, 21, 22])],
3642                    1,
3643                    false
3644                )
3645                .run_collect()
3646                .unwrap(),
3647            vec![1, 10, 20, 2, 21, 3, 22, 4]
3648        );
3649    }
3650
3651    #[test]
3652    fn remaining_merge_and_zip_family_matches_expected_ordering() {
3653        assert_eq!(
3654            Source::from_iter([1, 4])
3655                .merge_sorted(Source::from_iter([2, 3, 5]))
3656                .run_collect()
3657                .unwrap(),
3658            vec![1, 2, 3, 4, 5]
3659        );
3660
3661        assert_eq!(
3662            Source::from_iter([1, 2])
3663                .merge_latest(Source::single(10), false)
3664                .run_collect()
3665                .unwrap(),
3666            vec![vec![1, 10], vec![2, 10]]
3667        );
3668
3669        assert_eq!(
3670            Source::from_iter([1, 2, 3])
3671                .merge_all([Source::from_iter([10, 11])], false)
3672                .run_collect()
3673                .unwrap(),
3674            vec![1, 10, 2, 11, 3]
3675        );
3676
3677        assert_eq!(
3678            Source::from_iter([1, 2, 3])
3679                .zip_with(Source::from_iter([10, 11, 12]), |left, right| left + right)
3680                .run_collect()
3681                .unwrap(),
3682            vec![11, 13, 15]
3683        );
3684
3685        assert_eq!(
3686            Source::from_iter([1, 2])
3687                .zip_latest(Source::single(10))
3688                .run_collect()
3689                .unwrap(),
3690            vec![(1, 10), (2, 10)]
3691        );
3692
3693        assert_eq!(
3694            Source::from_iter([1, 2, 3])
3695                .zip_latest_with(Source::from_iter([10]), false, |left, right| left + right)
3696                .run_collect()
3697                .unwrap(),
3698            vec![11, 12, 13]
3699        );
3700
3701        assert_eq!(
3702            Source::from_iter([1, 2])
3703                .zip_all(Source::from_iter([10, 11, 12]), -1, -2)
3704                .run_collect()
3705                .unwrap(),
3706            vec![(1, 10), (2, 11), (-1, 12)]
3707        );
3708
3709        assert_eq!(
3710            Source::from_iter([5, 6, 7])
3711                .zip_with_index()
3712                .run_collect()
3713                .unwrap(),
3714            vec![(5, 0), (6, 1), (7, 2)]
3715        );
3716
3717        assert_eq!(
3718            Source::zip_n([Source::from_iter([1, 2]), Source::from_iter([10, 20])])
3719                .run_collect()
3720                .unwrap(),
3721            vec![vec![1, 10], vec![2, 20]]
3722        );
3723
3724        assert_eq!(
3725            Source::zip_with_n(
3726                [
3727                    Source::from_iter([1, 2]),
3728                    Source::from_iter([10, 20]),
3729                    Source::from_iter([100, 200]),
3730                ],
3731                |values| values.into_iter().sum::<i32>(),
3732            )
3733            .run_collect()
3734            .unwrap(),
3735            vec![111, 222]
3736        );
3737
3738        assert_eq!(
3739            Source::merge_prioritized_n(
3740                [
3741                    (Source::from_iter([1, 2, 3, 4]), 2),
3742                    (Source::from_iter([10, 11]), 1),
3743                ],
3744                false,
3745            )
3746            .run_collect()
3747            .unwrap(),
3748            vec![1, 2, 10, 3, 4, 11]
3749        );
3750
3751        assert_eq!(
3752            Source::combine(
3753                Source::from_iter([1, 2, 3]),
3754                Source::from_iter([10, 11]),
3755                std::iter::empty::<Source<i32, NotUsed>>(),
3756                SourceCombineStrategy::Merge {
3757                    eager_complete: false,
3758                },
3759            )
3760            .run_collect()
3761            .unwrap(),
3762            vec![1, 10, 2, 11, 3]
3763        );
3764
3765        let combined_sink = Sink::combine(
3766            Sink::ignore(),
3767            Sink::ignore(),
3768            std::iter::empty::<Sink<i32, NotUsed>>(),
3769            SinkCombineStrategy::Broadcast,
3770        );
3771        assert_eq!(
3772            Source::from_iter([1, 2, 3])
3773                .run_with(combined_sink)
3774                .unwrap(),
3775            NotUsed
3776        );
3777    }
3778
3779    #[test]
3780    fn sink_combine_broadcast_delivers_every_element_to_every_child() {
3781        // Regression: the combined children's materialized values were
3782        // dropped at materialization, tripping cancel-on-drop and silently
3783        // cancelling every child before it consumed anything (0 deliveries).
3784        let first_count = StdArc::new(StdAtomicUsize::new(0));
3785        let second_count = StdArc::new(StdAtomicUsize::new(0));
3786        let first_counter = StdArc::clone(&first_count);
3787        let second_counter = StdArc::clone(&second_count);
3788        let combined = Sink::combine(
3789            Sink::foreach(move |_: i32| {
3790                first_counter.fetch_add(1, StdOrdering::SeqCst);
3791            }),
3792            Sink::foreach(move |_: i32| {
3793                second_counter.fetch_add(1, StdOrdering::SeqCst);
3794            }),
3795            std::iter::empty::<Sink<i32, NotUsed>>(),
3796            SinkCombineStrategy::Broadcast,
3797        );
3798        assert_eq!(
3799            Source::from_iter(0..100).run_with(combined).unwrap(),
3800            NotUsed
3801        );
3802        // The rendezvous channels guarantee each child has received every
3803        // element before the parent send returns. The child callback may still
3804        // be one scheduler tick behind the parent, so assert with a bounded
3805        // condition wait instead of an immediate load.
3806        assert!(wait_until(StdDuration::from_secs(1), || {
3807            first_count.load(StdOrdering::SeqCst) == 100
3808                && second_count.load(StdOrdering::SeqCst) == 100
3809        }));
3810    }
3811
3812    #[test]
3813    fn zip_latest_completes_when_one_side_finishes_without_emitting() {
3814        // Regression: with eager_complete = false, an empty side left
3815        // `latest = None` forever while the loop drained the other side —
3816        // an infinite busy-loop when that side is unbounded.
3817        assert_eq!(
3818            Source::from_iter(std::iter::empty::<i32>())
3819                .zip_latest_with(Source::repeat(10), false, |left, right| left + right)
3820                .run_collect()
3821                .unwrap(),
3822            Vec::<i32>::new()
3823        );
3824        assert_eq!(
3825            Source::repeat(10)
3826                .zip_latest_with(
3827                    Source::from_iter(std::iter::empty::<i32>()),
3828                    false,
3829                    |left, right| left + right,
3830                )
3831                .run_collect()
3832                .unwrap(),
3833            Vec::<i32>::new()
3834        );
3835    }
3836
3837    #[test]
3838    fn zip_family_completion_boundaries_match_expected_results() {
3839        assert_eq!(
3840            Source::from_iter([1, 2, 3])
3841                .zip_with(Source::from_iter([10]), |left, right| left + right)
3842                .run_collect()
3843                .unwrap(),
3844            vec![11]
3845        );
3846
3847        assert_eq!(
3848            Source::from_iter([1, 2, 3])
3849                .zip_latest_with(Source::from_iter([10]), true, |left, right| left + right)
3850                .run_collect()
3851                .unwrap(),
3852            vec![11, 12]
3853        );
3854
3855        assert_eq!(
3856            Source::zip_n([
3857                Source::from_iter([1, 2, 3]),
3858                Source::from_iter([10]),
3859                Source::from_iter([100, 200, 300]),
3860            ])
3861            .run_collect()
3862            .unwrap(),
3863            vec![vec![1, 10, 100]]
3864        );
3865    }
3866
3867    #[test]
3868    fn combine_strategies_follow_merge_concat_and_priority_rules() {
3869        assert_eq!(
3870            Source::combine(
3871                Source::from_iter([1, 2]),
3872                Source::from_iter([10, 11]),
3873                [Source::from_iter([100])],
3874                SourceCombineStrategy::Concat,
3875            )
3876            .run_collect()
3877            .unwrap(),
3878            vec![1, 2, 10, 11, 100]
3879        );
3880
3881        assert_eq!(
3882            Source::combine(
3883                Source::from_iter([1, 2, 3, 4]),
3884                Source::from_iter([10, 11]),
3885                std::iter::empty::<Source<i32, NotUsed>>(),
3886                SourceCombineStrategy::Prioritized {
3887                    priorities: vec![2, 1],
3888                    eager_complete: false,
3889                },
3890            )
3891            .run_collect()
3892            .unwrap(),
3893            vec![1, 2, 10, 3, 4, 11]
3894        );
3895    }
3896
3897    #[test]
3898    fn concat_lazy_defers_follow_on_source_until_needed() {
3899        let source_counter = StdArc::new(StdAtomicUsize::new(0));
3900        let source_counter_clone = StdArc::clone(&source_counter);
3901        let lazy_source = Source::from_materialized_factory(move |_| {
3902            source_counter_clone.fetch_add(1, StdOrdering::SeqCst);
3903            Ok((Box::new(std::iter::once(Ok(99))), NotUsed))
3904        });
3905        let source_head = Source::single(1)
3906            .concat_lazy(lazy_source)
3907            .run_with(Sink::head());
3908        assert_eq!(wait(source_head.unwrap()), 1);
3909        assert_eq!(source_counter.load(StdOrdering::SeqCst), 0);
3910
3911        let flow_counter = StdArc::new(StdAtomicUsize::new(0));
3912        let flow_counter_clone = StdArc::clone(&flow_counter);
3913        let lazy_flow_source = Source::from_materialized_factory(move |_| {
3914            flow_counter_clone.fetch_add(1, StdOrdering::SeqCst);
3915            Ok((Box::new(std::iter::once(Ok(99))), NotUsed))
3916        });
3917        let flow_head = Source::single(1)
3918            .via(Flow::identity().concat_lazy(lazy_flow_source))
3919            .run_with(Sink::head());
3920        assert_eq!(wait(flow_head.unwrap()), 1);
3921        assert_eq!(flow_counter.load(StdOrdering::SeqCst), 0);
3922    }
3923
3924    #[test]
3925    fn also_to_completes_when_side_sink_cancels() {
3926        assert_eq!(
3927            Source::from_iter([1, 2, 3])
3928                .also_to(Sink::cancelled())
3929                .run_collect()
3930                .unwrap(),
3931            Vec::<i32>::new()
3932        );
3933        assert_eq!(
3934            Source::from_iter([1, 2, 3])
3935                .also_to_all([Sink::cancelled(), Sink::cancelled()])
3936                .run_collect()
3937                .unwrap(),
3938            Vec::<i32>::new()
3939        );
3940    }
3941
3942    #[test]
3943    fn also_to_completes_gracefully_when_side_sink_disconnects() {
3944        let result = Source::from_iter(0..100)
3945            .also_to(Sink::head())
3946            .run_collect()
3947            .unwrap();
3948        assert!(!result.is_empty(), "main should emit at least one element");
3949        assert!(
3950            result.len() < 100,
3951            "main should complete early when side disconnects"
3952        );
3953    }
3954
3955    #[test]
3956    fn also_to_propagates_original_error_when_side_is_disconnected() {
3957        let err = StreamError::Failed("distinctive-boom".into());
3958        assert!(matches!(
3959            Source::<i32>::failed(err.clone())
3960                .also_to(Sink::cancelled())
3961                .run_collect(),
3962            Err(StreamError::Failed(msg)) if msg == "distinctive-boom"
3963        ));
3964        assert!(matches!(
3965            Source::<i32>::failed(err.clone())
3966                .also_to_all([Sink::cancelled()])
3967                .run_collect(),
3968            Err(StreamError::Failed(msg)) if msg == "distinctive-boom"
3969        ));
3970        assert!(matches!(
3971            Source::<i32>::failed(err)
3972                .divert_to(Sink::cancelled(), |_: &i32| true)
3973                .run_collect(),
3974            Err(StreamError::Failed(msg)) if msg == "distinctive-boom"
3975        ));
3976    }
3977
3978    #[test]
3979    fn divert_to_routes_matching_elements_to_side_sink() {
3980        let diverted = Source::from_iter([1, 2, 3, 4])
3981            .divert_to(Sink::ignore(), |item| item % 2 == 0)
3982            .run_collect()
3983            .unwrap();
3984        assert_eq!(diverted, vec![1, 3]);
3985    }
3986
3987    #[test]
3988    fn wire_tap_drops_when_side_sink_backpressures() {
3989        let tapped = Source::from_iter([1, 2, 3])
3990            .wire_tap(Sink::head())
3991            .run_collect()
3992            .unwrap();
3993        assert_eq!(tapped, vec![1, 2, 3]);
3994
3995        let tapped_via_flow = Source::from_iter([1, 2, 3])
3996            .via(Flow::identity().wire_tap(Sink::head()))
3997            .run_collect()
3998            .unwrap();
3999        assert_eq!(tapped_via_flow, vec![1, 2, 3]);
4000    }
4001
4002    #[test]
4003    fn async_mapping_variants_complete() {
4004        let ordered = Source::from_iter(0..4)
4005            .map_async(2, |item| async move { Ok(item * 2) })
4006            .run_collect()
4007            .unwrap();
4008        assert_eq!(ordered, vec![0, 2, 4, 6]);
4009
4010        let unordered = Source::from_iter(0..4)
4011            .map_async_unordered(2, |item| async move { Ok(item * 2) })
4012            .run_collect()
4013            .unwrap();
4014        assert_eq!(unordered, vec![0, 2, 4, 6]);
4015
4016        let partitioned = Source::from_iter(0..4)
4017            .map_async_partitioned(4, 1, |item| item % 2, |item| async move { Ok(item + 1) })
4018            .run_collect()
4019            .unwrap();
4020        assert_eq!(partitioned, vec![1, 2, 3, 4]);
4021    }
4022
4023    #[test]
4024    fn map_async_ordered_bounds_pulls_behind_stuck_head() {
4025        let pulls = StdArc::new(StdAtomicUsize::new(0));
4026        let pulls_for_source = StdArc::clone(&pulls);
4027        let probe = Source::from_fn_iter(move || {
4028            let pulls = StdArc::clone(&pulls_for_source);
4029            std::iter::from_fn(move || {
4030                let next = pulls.fetch_add(1, StdOrdering::SeqCst);
4031                Some(next)
4032            })
4033        })
4034        .map_async(2, |item| async move {
4035            if item == 0 {
4036                tokio::time::sleep(StdDuration::from_millis(300)).await;
4037            }
4038            Ok(item)
4039        })
4040        .run_with(TestSink::probe())
4041        .unwrap();
4042
4043        probe.request(16);
4044        thread::sleep(StdDuration::from_millis(100));
4045        assert!(
4046            pulls.load(StdOrdering::SeqCst) <= 3,
4047            "pulled {} elements with parallelism=2 behind a stuck ordered head",
4048            pulls.load(StdOrdering::SeqCst)
4049        );
4050    }
4051
4052    #[test]
4053    fn async_mapping_parks_until_woken_future_completes() {
4054        struct WakeOnceFuture {
4055            value: Option<u64>,
4056            ready: StdArc<StdAtomicBool>,
4057            started: bool,
4058            polls: StdArc<StdAtomicUsize>,
4059            latest_waker: StdArc<Mutex<Option<std::task::Waker>>>,
4060        }
4061
4062        impl std::future::Future for WakeOnceFuture {
4063            type Output = StreamResult<u64>;
4064
4065            fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
4066                let this = self.as_mut().get_mut();
4067                this.polls.fetch_add(1, StdOrdering::SeqCst);
4068                *this.latest_waker.lock().expect("latest wake slot mutex") =
4069                    Some(cx.waker().clone());
4070                if this.ready.load(StdOrdering::SeqCst) {
4071                    return Poll::Ready(Ok(this.value.take().unwrap()));
4072                }
4073
4074                if !this.started {
4075                    this.started = true;
4076                    let ready = StdArc::clone(&this.ready);
4077                    let latest_waker = StdArc::clone(&this.latest_waker);
4078                    thread::spawn(move || {
4079                        thread::sleep(Duration::from_millis(20));
4080                        ready.store(true, StdOrdering::SeqCst);
4081                        if let Some(waker) =
4082                            latest_waker.lock().expect("latest wake slot mutex").take()
4083                        {
4084                            waker.wake();
4085                        }
4086                    });
4087                }
4088                Poll::Pending
4089            }
4090        }
4091
4092        let polls = StdArc::new(StdAtomicUsize::new(0));
4093        let polls_for_stage = StdArc::clone(&polls);
4094        let start = Instant::now();
4095
4096        let values = Source::single(41)
4097            .map_async(1, move |item| WakeOnceFuture {
4098                value: Some(item + 1),
4099                ready: StdArc::new(StdAtomicBool::new(false)),
4100                started: false,
4101                polls: StdArc::clone(&polls_for_stage),
4102                latest_waker: StdArc::new(Mutex::new(None)),
4103            })
4104            .run_collect()
4105            .unwrap();
4106
4107        assert_eq!(values, vec![42]);
4108        let elapsed = start.elapsed();
4109        assert!(
4110            elapsed >= StdDuration::from_millis(15) && elapsed < StdDuration::from_millis(250),
4111            "pending future should park until woken once, elapsed={elapsed:?}"
4112        );
4113        assert!(
4114            polls.load(StdOrdering::SeqCst) < 4096,
4115            "pending future was repolled too aggressively"
4116        );
4117    }
4118
4119    #[test]
4120    fn async_mapping_emits_before_unbounded_upstream_finishes() {
4121        let ordered = Source::repeat(1)
4122            .map_async(2, |item| async move { Ok(item + 1) })
4123            .take(1)
4124            .run_collect()
4125            .unwrap();
4126        assert_eq!(ordered, vec![2]);
4127
4128        let unordered = Source::repeat(1)
4129            .map_async_unordered(2, |item| async move { Ok(item + 1) })
4130            .take(1)
4131            .run_collect()
4132            .unwrap();
4133        assert_eq!(unordered, vec![2]);
4134
4135        let partitioned = Source::repeat(1)
4136            .map_async_partitioned(2, 1, |_| 0_u8, |item| async move { Ok(item + 1) })
4137            .take(1)
4138            .run_collect()
4139            .unwrap();
4140        assert_eq!(partitioned, vec![2]);
4141    }
4142
4143    #[test]
4144    fn partitioned_async_mapping_limits_same_key_concurrency() {
4145        let active = StdArc::new(StdAtomicUsize::new(0));
4146        let max_active = StdArc::new(StdAtomicUsize::new(0));
4147        let active_for_stage = StdArc::clone(&active);
4148        let max_for_stage = StdArc::clone(&max_active);
4149
4150        let values = Source::from_iter(0..6)
4151            .map_async_partitioned(
4152                4,
4153                1,
4154                |_| 0_u8,
4155                move |item| {
4156                    let active = StdArc::clone(&active_for_stage);
4157                    let max_active = StdArc::clone(&max_for_stage);
4158                    let current = active.fetch_add(1, StdOrdering::SeqCst) + 1;
4159                    max_active.fetch_max(current, StdOrdering::SeqCst);
4160                    async move {
4161                        thread::sleep(Duration::from_millis(1));
4162                        active.fetch_sub(1, StdOrdering::SeqCst);
4163                        Ok(item)
4164                    }
4165                },
4166            )
4167            .run_collect()
4168            .unwrap();
4169
4170        assert_eq!(values, vec![0, 1, 2, 3, 4, 5]);
4171        assert_eq!(max_active.load(StdOrdering::SeqCst), 1);
4172    }
4173
4174    #[test]
4175    fn partitioned_async_mapping_scans_past_blocked_pending_key() {
4176        let active = StdArc::new(StdAtomicUsize::new(0));
4177        let max_active = StdArc::new(StdAtomicUsize::new(0));
4178        let active_for_stage = StdArc::clone(&active);
4179        let max_for_stage = StdArc::clone(&max_active);
4180        let (release_tx, release_rx) = oneshot::channel::<()>();
4181        let release_rx = StdArc::new(std::sync::Mutex::new(Some(release_rx)));
4182        let release_rx_for_stage = StdArc::clone(&release_rx);
4183        let max_for_release = StdArc::clone(&max_active);
4184
4185        let releaser = thread::spawn(move || {
4186            let deadline = Instant::now() + StdDuration::from_secs(1);
4187            while max_for_release.load(StdOrdering::SeqCst) < 2 && Instant::now() < deadline {
4188                thread::yield_now();
4189            }
4190            let _ = release_tx.send(());
4191        });
4192
4193        let values = Source::from_iter([0, 2, 1])
4194            .map_async_partitioned(
4195                2,
4196                1,
4197                |item| item % 2,
4198                move |item| {
4199                    let active = StdArc::clone(&active_for_stage);
4200                    let max_active = StdArc::clone(&max_for_stage);
4201                    let release_rx = StdArc::clone(&release_rx_for_stage);
4202                    let current = active.fetch_add(1, StdOrdering::SeqCst) + 1;
4203                    max_active.fetch_max(current, StdOrdering::SeqCst);
4204                    async move {
4205                        if item == 0 {
4206                            let receiver = release_rx
4207                                .lock()
4208                                .expect("release receiver mutex")
4209                                .take()
4210                                .expect("release receiver present");
4211                            let _ = receiver.await;
4212                        }
4213                        active.fetch_sub(1, StdOrdering::SeqCst);
4214                        Ok(item)
4215                    }
4216                },
4217            )
4218            .run_collect()
4219            .unwrap();
4220        releaser.join().unwrap();
4221
4222        assert_eq!(values, vec![0, 2, 1]);
4223        assert_eq!(max_active.load(StdOrdering::SeqCst), 2);
4224    }
4225
4226    #[test]
4227    fn partitioned_async_mapping_p1_still_evaluates_partition() {
4228        let partitions = StdArc::new(StdAtomicUsize::new(0));
4229        let partitions_for_stage = StdArc::clone(&partitions);
4230
4231        let values = Source::from_iter(0..8)
4232            .map_async_partitioned(
4233                1,
4234                1,
4235                move |item| {
4236                    partitions_for_stage.fetch_add(1, StdOrdering::SeqCst);
4237                    item % 2
4238                },
4239                |item| async move { Ok(item + 1) },
4240            )
4241            .run_collect()
4242            .unwrap();
4243
4244        assert_eq!(values, (1..9).collect::<Vec<_>>());
4245        assert_eq!(partitions.load(StdOrdering::SeqCst), 8);
4246    }
4247
4248    #[test]
4249    fn partitioned_async_mapping_handles_many_keys_high_parallelism() {
4250        let active_by_key =
4251            StdArc::new((0..16).map(|_| StdAtomicUsize::new(0)).collect::<Vec<_>>());
4252        let max_by_key = StdArc::new((0..16).map(|_| StdAtomicUsize::new(0)).collect::<Vec<_>>());
4253        let active_for_stage = StdArc::clone(&active_by_key);
4254        let max_for_stage = StdArc::clone(&max_by_key);
4255
4256        let values = Source::from_iter(0..512_usize)
4257            .map_async_partitioned(
4258                32,
4259                1,
4260                |item| item % 16,
4261                move |item| {
4262                    let active = StdArc::clone(&active_for_stage);
4263                    let max_active = StdArc::clone(&max_for_stage);
4264                    let key = item % 16;
4265                    let current = active[key].fetch_add(1, StdOrdering::SeqCst) + 1;
4266                    max_active[key].fetch_max(current, StdOrdering::SeqCst);
4267                    async move {
4268                        active[key].fetch_sub(1, StdOrdering::SeqCst);
4269                        Ok(item)
4270                    }
4271                },
4272            )
4273            .run_collect()
4274            .unwrap();
4275
4276        assert_eq!(values, (0..512).collect::<Vec<_>>());
4277        for max_active in max_by_key.iter() {
4278            assert_eq!(max_active.load(StdOrdering::SeqCst), 1);
4279        }
4280    }
4281
4282    #[test]
4283    fn error_operators_map_recover_and_complete() {
4284        let mapped = Source::<i32>::failed(StreamError::Failed("boom".into()))
4285            .map_error(|_| StreamError::Failed("mapped".into()))
4286            .run_collect();
4287        assert_eq!(mapped, Err(StreamError::Failed("mapped".into())));
4288
4289        let recovered = Source::<i32>::failed(StreamError::Failed("boom".into()))
4290            .recover(|error| match error {
4291                StreamError::Failed(_) => Some(42),
4292                _ => None,
4293            })
4294            .run_collect()
4295            .unwrap();
4296        assert_eq!(recovered, vec![42]);
4297
4298        let unrecovered = Source::<i32>::failed(StreamError::Failed("original".into()))
4299            .recover(|_| None)
4300            .run_collect();
4301        assert_eq!(unrecovered, Err(StreamError::Failed("original".into())));
4302
4303        let recovered_with = Source::<i32>::failed(StreamError::Failed("boom".into()))
4304            .recover_with_retries(1, |_| Some(Source::from_iter([1, 2])))
4305            .run_collect()
4306            .unwrap();
4307        assert_eq!(recovered_with, vec![1, 2]);
4308
4309        let declined_recover_with = Source::<i32>::failed(StreamError::Failed("declined".into()))
4310            .recover_with_retries(1, |_| None)
4311            .run_collect();
4312        assert_eq!(
4313            declined_recover_with,
4314            Err(StreamError::Failed("declined".into()))
4315        );
4316
4317        let completed = Source::from_factory(|| {
4318            Box::new(vec![Ok(1), Err(StreamError::Failed("ignored".into())), Ok(2)].into_iter())
4319        })
4320        .on_error_complete()
4321        .run_collect()
4322        .unwrap();
4323        assert_eq!(completed, vec![1]);
4324    }
4325
4326    #[test]
4327    fn sliding_matches_akka_window_semantics() {
4328        // Full windows then stop; no spurious trailing partial windows.
4329        assert_eq!(
4330            Source::from_iter(1..=4)
4331                .sliding(3, 1)
4332                .run_collect()
4333                .unwrap(),
4334            vec![vec![1, 2, 3], vec![2, 3, 4]]
4335        );
4336        assert_eq!(
4337            Source::from_iter(1..=4)
4338                .sliding(2, 1)
4339                .run_collect()
4340                .unwrap(),
4341            vec![vec![1, 2], vec![2, 3], vec![3, 4]]
4342        );
4343        // Exact multiple of size leaves nothing to emit at finish.
4344        assert_eq!(
4345            Source::from_iter(1..=3)
4346                .sliding(3, 1)
4347                .run_collect()
4348                .unwrap(),
4349            vec![vec![1, 2, 3]]
4350        );
4351        // Short streams emit a single partial window.
4352        assert_eq!(
4353            Source::from_iter(1..=2)
4354                .sliding(3, 1)
4355                .run_collect()
4356                .unwrap(),
4357            vec![vec![1, 2]]
4358        );
4359        // Every element in its own window.
4360        assert_eq!(
4361            Source::from_iter(1..=3)
4362                .sliding(1, 1)
4363                .run_collect()
4364                .unwrap(),
4365            vec![vec![1], vec![2], vec![3]]
4366        );
4367        // step > size skips the gap between windows.
4368        assert_eq!(
4369            Source::from_iter(1..=6)
4370                .sliding(2, 3)
4371                .run_collect()
4372                .unwrap(),
4373            vec![vec![1, 2], vec![4, 5]]
4374        );
4375        // step > size with an in-progress trailing window is dropped (Akka).
4376        assert_eq!(
4377            Source::from_iter(1..=3)
4378                .sliding(2, 4)
4379                .run_collect()
4380                .unwrap(),
4381            vec![vec![1, 2]]
4382        );
4383    }
4384
4385    #[test]
4386    fn recover_with_retries_indefinitely_like_akka() {
4387        let attempts = StdArc::new(StdAtomicUsize::new(0));
4388        let attempts_in_stage = StdArc::clone(&attempts);
4389        // The first several recovery sources keep failing; `recover_with` must
4390        // retry past Datum's old single-retry limit to reach the good source.
4391        let recovered = Source::<i32>::failed(StreamError::Failed("boom".into()))
4392            .recover_with(move |_error| {
4393                if attempts_in_stage.fetch_add(1, StdOrdering::SeqCst) < 5 {
4394                    Some(Source::<i32>::failed(StreamError::Failed("again".into())))
4395                } else {
4396                    Some(Source::from_iter([42]))
4397                }
4398            })
4399            .run_collect()
4400            .unwrap();
4401        assert_eq!(recovered, vec![42]);
4402        assert_eq!(attempts.load(StdOrdering::SeqCst), 6);
4403    }
4404
4405    #[test]
4406    fn many_concurrent_streams_do_not_starve_the_pool() {
4407        // Regression: a fixed-size pool deadlocks once `num_cpus` streams each
4408        // monopolize a worker. The growing pool must keep serving new streams.
4409        //
4410        // Use a small fixed count rather than `available_parallelism() + 2`: the
4411        // point is that several concurrent never-completing streams cannot starve
4412        // a fresh one, and a fixed count keeps this from spawning a large number
4413        // of permanent threads (which, when the whole suite runs under load, can
4414        // exhaust the thread limit and wedge the shared executor).
4415        let materializer = Materializer::new();
4416        let busy = 6_usize;
4417
4418        let mut held = Vec::with_capacity(busy);
4419        for _ in 0..busy {
4420            held.push(
4421                Source::single(1_u64)
4422                    .run_with_materializer(Sink::never(), &materializer)
4423                    .unwrap(),
4424            );
4425        }
4426
4427        for _ in 0..400 {
4428            if materializer.active_streams() >= busy {
4429                break;
4430            }
4431            thread::sleep(Duration::from_millis(5));
4432        }
4433        assert_eq!(materializer.active_streams(), busy);
4434
4435        // A fresh finite stream must still complete despite every prior worker
4436        // being occupied by a never-completing sink.
4437        let sum = Source::from_iter(0_u64..5)
4438            .run_with_materializer(Sink::fold(0_u64, |acc, item| acc + item), &materializer)
4439            .unwrap();
4440        assert_eq!(sum.wait().unwrap(), 10);
4441
4442        materializer.shutdown();
4443        for completion in held {
4444            assert_eq!(completion.wait(), Err(StreamError::AbruptTermination));
4445        }
4446    }
4447}