Skip to main content

datum/stream/
mod.rs

1//! Core push-stream vocabulary and the first reusable Source/Flow API slice.
2
3use std::{
4    collections::{BTreeMap, HashMap, VecDeque},
5    fmt,
6    future::Future,
7    hash::Hash,
8    marker::PhantomData,
9    panic::{AssertUnwindSafe, catch_unwind},
10    pin::Pin,
11    sync::{
12        Arc, Condvar, Mutex, OnceLock,
13        atomic::{AtomicBool, AtomicUsize, Ordering},
14    },
15    task::{Context, Poll},
16    thread,
17    time::Duration,
18};
19
20use futures::{channel::oneshot, executor::block_on};
21use thiserror::Error;
22use tokio::{
23    runtime::{Builder as TokioRuntimeBuilder, Runtime as TokioRuntime},
24    task::{JoinError, JoinHandle},
25};
26
/// Boxed, sendable iterator whose items are per-element [`StreamResult`]s.
pub(crate) type BoxStream<T> = Box<dyn Iterator<Item = StreamResult<T>> + Send>;
/// Shared, thread-safe closure that turns one boxed stream into another and
/// cannot itself fail.
pub(crate) type PureTransform<In, Out> = Arc<dyn Fn(BoxStream<In>) -> BoxStream<Out> + Send + Sync>;
/// Shared, thread-safe closure that turns one boxed stream into another with
/// access to the [`Materializer`]; unlike [`PureTransform`] it may fail.
pub(crate) type RuntimeTransform<In, Out> =
    Arc<dyn Fn(BoxStream<In>, &Materializer) -> StreamResult<BoxStream<Out>> + Send + Sync>;
/// Closure that consumes a boxed stream and yields a `Mat` value or an error.
type SinkRunner<In, Mat> = dyn Fn(BoxStream<In>, &Materializer) -> StreamResult<Mat> + Send + Sync;
/// Same shape as [`SinkRunner`], with an additional [`SourceRuntimeHints`]
/// argument.
type HintedSinkRunner<In, Mat> =
    dyn Fn(BoxStream<In>, &Materializer, SourceRuntimeHints) -> StreamResult<Mat> + Send + Sync;
/// Closure that needs only a [`Materializer`] to yield a `Mat` value or an
/// error.
type RunnableGraphRunner<Mat> = dyn Fn(&Materializer) -> StreamResult<Mat> + Send + Sync;
// NOTE(review): presumably the number of readiness polls made before parking;
// the use site is outside this view — confirm.
const STREAM_READY_SPINS: usize = 256;
/// Spin-loop hints between consecutive drain-thread readiness checks. Back off
/// a small block of hints between polls so the spin window still covers the
/// common fast-completion case without burning a full busy loop before
/// parking.
const STREAM_SPIN_BACKOFF: usize = 8;
// NOTE(review): presumably the upper bound on a single park/sleep interval
// (1 ms); the use site is outside this view — confirm.
const STREAM_MAX_PARK: Duration = Duration::from_millis(1);
42
/// Blueprint-time metadata for a known finite, synchronous micro-source.
///
/// Attached only by Datum-owned constructors whose `next()` is guaranteed to
/// be memory-backed and whose total count of successful items is known when
/// the blueprint is built.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct InlineMicroSourceHint {
    max_success_items: usize,
}

/// Optimization hints attached to a source blueprint.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
struct SourceHints {
    inline_head_terminal: bool,
    /// Present only for constructors where Datum owns the whole iterator and
    /// can prove it is finite, synchronous, and memory-backed.
    inline_micro: Option<InlineMicroSourceHint>,
    /// True only when terminal `fold`/`collect`/`ignore` may amortize the
    /// checked-stream cancellation/shutdown wrapper. Unknown, probe, queue,
    /// IO, and timer-backed sources stay on the per-element checked path.
    terminal_consumer_batch: bool,
}

/// Flattened view of [`SourceHints`] produced by [`SourceHints::runtime`].
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub(crate) struct SourceRuntimeHints {
    pub(crate) inline_micro_max_success_items: Option<usize>,
    pub(crate) terminal_consumer_batch: bool,
}

impl SourceHints {
    /// Hints for an inline micro-source yielding at most `max_success_items`
    /// successful items; every flag is enabled.
    const fn with_inline_micro(max_success_items: usize) -> Self {
        let inline_micro = Some(InlineMicroSourceHint { max_success_items });
        Self {
            inline_head_terminal: true,
            inline_micro,
            terminal_consumer_batch: true,
        }
    }

    /// Hints that enable only terminal consumer batching.
    const fn with_terminal_consumer_batch() -> Self {
        Self {
            inline_head_terminal: false,
            inline_micro: None,
            terminal_consumer_batch: true,
        }
    }

    /// Hints that remain valid once `flow` has been appended to the source.
    fn after_flow(self, flow: FlowHints) -> Self {
        Self {
            // NOTE(review): this flag is taken from the flow alone; the
            // source's own `inline_head_terminal` is not consulted, unlike
            // `terminal_consumer_batch` below — confirm this is intended.
            inline_head_terminal: flow.preserves_inline_head_terminal,
            // inline_micro is intentionally never propagated through a flow:
            // even a trivial map wraps the iterator, making eligibility
            // uncertain.
            inline_micro: None,
            terminal_consumer_batch: self.terminal_consumer_batch
                && flow.preserves_terminal_consumer_batch,
        }
    }

    /// Same hints with the inline micro-source metadata removed.
    fn without_inline_micro(self) -> Self {
        Self {
            inline_micro: None,
            ..self
        }
    }

    /// Projects the hints into the crate-visible runtime form.
    fn runtime(self) -> SourceRuntimeHints {
        let Self {
            inline_micro,
            terminal_consumer_batch,
            ..
        } = self;
        SourceRuntimeHints {
            inline_micro_max_success_items: inline_micro
                .map(|InlineMicroSourceHint { max_success_items }| max_success_items),
            terminal_consumer_batch,
        }
    }
}

/// Describes which source hints a flow keeps intact.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
struct FlowHints {
    preserves_inline_head_terminal: bool,
    preserves_terminal_consumer_batch: bool,
}

impl FlowHints {
    /// Flow keeps both the inline head-terminal and the batching hint.
    const PRESERVES_INLINE_HEAD_TERMINAL: Self = Self {
        preserves_inline_head_terminal: true,
        preserves_terminal_consumer_batch: true,
    };

    /// Flow keeps only the terminal consumer batching hint.
    const PRESERVES_TERMINAL_CONSUMER_BATCH: Self = Self {
        preserves_inline_head_terminal: false,
        preserves_terminal_consumer_batch: true,
    };

    /// Hints for `self` followed by `next`: a hint survives only if both
    /// flows preserve it.
    fn then(self, next: Self) -> Self {
        let Self {
            preserves_inline_head_terminal: head,
            preserves_terminal_consumer_batch: batch,
        } = self;
        Self {
            preserves_inline_head_terminal: head && next.preserves_inline_head_terminal,
            preserves_terminal_consumer_batch: batch && next.preserves_terminal_consumer_batch,
        }
    }
}
148
/// Per-key bookkeeping entry in the reusable partition slot table.
struct PartitionSlot<Key, Out> {
    /// Key currently bound to this slot; `None` while the slot is retired.
    key: Option<Key>,
    /// Incremented each time an item is popped for this slot and compared
    /// against the per-partition limit. The matching decrement is not in this
    /// part of the file.
    active: usize,
    /// FIFO backlog of `(index, item)` pairs waiting to be popped.
    queued: VecDeque<(usize, Out)>,
    /// True while this slot's index sits in the ready queue, so it is never
    /// enqueued twice.
    in_ready_queue: bool,
}
155
156struct AbortOnDropHandle<T> {
157    handle: JoinHandle<T>,
158}
159
160impl<T> AbortOnDropHandle<T> {
161    fn new(handle: JoinHandle<T>) -> Self {
162        Self { handle }
163    }
164}
165
166impl<T> Drop for AbortOnDropHandle<T> {
167    fn drop(&mut self) {
168        self.handle.abort();
169    }
170}
171
172impl<T> Future for AbortOnDropHandle<T> {
173    type Output = Result<T, JoinError>;
174
175    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
176        Pin::new(&mut self.handle).poll(cx)
177    }
178}
179
180impl<T> Unpin for AbortOnDropHandle<T> {}
181
182pub(crate) fn stream_tokio_runtime() -> &'static TokioRuntime {
183    static RUNTIME: OnceLock<TokioRuntime> = OnceLock::new();
184    RUNTIME.get_or_init(|| {
185        TokioRuntimeBuilder::new_multi_thread()
186            .enable_all()
187            .thread_name("datum-stream-tokio")
188            .build()
189            .expect("stream tokio runtime")
190    })
191}
192
193fn spawn_tokio_task<Fut, T>(future: Fut) -> AbortOnDropHandle<T>
194where
195    Fut: Future<Output = T> + Send + 'static,
196    T: Send + 'static,
197{
198    AbortOnDropHandle::new(stream_tokio_runtime().spawn(future))
199}
200
/// Crate-visible forwarder to `runtime::current_stream_cancelled()`; returns
/// its result unchanged.
pub(crate) fn current_stream_cancelled() -> Option<Arc<AtomicBool>> {
    runtime::current_stream_cancelled()
}
204
205pub(super) fn catch_unwind_failed<T, F>(context: &'static str, f: F) -> StreamResult<T>
206where
207    F: FnOnce() -> T,
208{
209    catch_unwind(AssertUnwindSafe(f))
210        .map_err(|_| StreamError::Failed(format!("{context} panicked")))
211}
212
213impl<Key, Out> PartitionSlot<Key, Out> {
214    fn new(key: Key) -> Self {
215        Self {
216            key: Some(key),
217            active: 0,
218            queued: VecDeque::new(),
219            in_ready_queue: false,
220        }
221    }
222}
223
224#[inline(always)]
225fn partition_slot_for<Key, Out>(
226    key: Key,
227    slots_by_key: &mut HashMap<Key, usize>,
228    slots: &mut Vec<PartitionSlot<Key, Out>>,
229    free_slots: &mut Vec<usize>,
230) -> usize
231where
232    Key: Clone + Eq + Hash,
233{
234    if let Some(slot) = slots_by_key.get(&key) {
235        return *slot;
236    }
237
238    let slot = if let Some(slot) = free_slots.pop() {
239        let state = &mut slots[slot];
240        state.key = Some(key.clone());
241        state.active = 0;
242        state.queued.clear();
243        state.in_ready_queue = false;
244        slot
245    } else {
246        slots.push(PartitionSlot::new(key.clone()));
247        slots.len() - 1
248    };
249    slots_by_key.insert(key, slot);
250    slot
251}
252
253#[inline(always)]
254fn retire_partition_slot<Key, Out>(
255    slot: usize,
256    slots_by_key: &mut HashMap<Key, usize>,
257    slots: &mut [PartitionSlot<Key, Out>],
258    free_slots: &mut Vec<usize>,
259) where
260    Key: Eq + Hash,
261{
262    let state = &mut slots[slot];
263    if let Some(key) = state.key.take() {
264        slots_by_key.remove(&key);
265    }
266    state.active = 0;
267    state.queued.clear();
268    state.in_ready_queue = false;
269    free_slots.push(slot);
270}
271
272#[inline(always)]
273fn ready_partition_slot<Key, Out>(
274    slots: &mut [PartitionSlot<Key, Out>],
275    ready_slots: &mut VecDeque<usize>,
276    slot: usize,
277    per_partition: usize,
278) {
279    if let Some(state) = slots.get_mut(slot)
280        && state.key.is_some()
281        && !state.in_ready_queue
282        && state.active < per_partition
283        && !state.queued.is_empty()
284    {
285        state.in_ready_queue = true;
286        ready_slots.push_back(slot);
287    }
288}
289
290#[inline(always)]
291fn pop_ready_partition_slot<Key, Out>(
292    slots: &mut [PartitionSlot<Key, Out>],
293    ready_slots: &mut VecDeque<usize>,
294    per_partition: usize,
295) -> Option<(usize, usize, Out)> {
296    while let Some(slot) = ready_slots.pop_front() {
297        let mut requeue = false;
298        let item = if let Some(state) = slots.get_mut(slot) {
299            state.in_ready_queue = false;
300            if state.key.is_some() && state.active < per_partition {
301                let item = state.queued.pop_front().map(|(index, item)| {
302                    state.active += 1;
303                    (index, slot, item)
304                });
305                if !state.queued.is_empty() && state.active < per_partition {
306                    state.in_ready_queue = true;
307                    requeue = true;
308                }
309                item
310            } else {
311                None
312            }
313        } else {
314            None
315        };
316
317        if requeue {
318            ready_slots.push_back(slot);
319        }
320        if item.is_some() {
321            return item;
322        }
323    }
324    None
325}
326
/// Shareable factory that builds a source's element stream and its `Mat`
/// value.
pub(crate) trait SourceFactory<Out, Mat>: Send + Sync {
    /// Builds the boxed element stream together with the `Mat` value, or
    /// returns an error. Takes `Arc<Self>` so the produced stream may keep
    /// the factory alive.
    fn create(self: Arc<Self>, materializer: &Materializer) -> StreamResult<(BoxStream<Out>, Mat)>;
}
330
331struct FnSourceFactory<F>(F);
332
333impl<Out, Mat, F> SourceFactory<Out, Mat> for FnSourceFactory<F>
334where
335    F: Fn(&Materializer) -> StreamResult<(BoxStream<Out>, Mat)> + Send + Sync,
336{
337    fn create(self: Arc<Self>, materializer: &Materializer) -> StreamResult<(BoxStream<Out>, Mat)> {
338        (self.0)(materializer)
339    }
340}
341
342struct MapSourceFactory<In, Out, Mat, F> {
343    source: Arc<dyn SourceFactory<In, Mat>>,
344    stage: F,
345    _marker: PhantomData<fn(In) -> Out>,
346}
347
348impl<In, Out, Mat, F> SourceFactory<Out, Mat> for MapSourceFactory<In, Out, Mat, F>
349where
350    In: Send + 'static,
351    Out: Send + 'static,
352    Mat: Send + 'static,
353    F: Fn(In) -> Out + Send + Sync + 'static,
354{
355    fn create(self: Arc<Self>, materializer: &Materializer) -> StreamResult<(BoxStream<Out>, Mat)> {
356        let (stream, mat) = Arc::clone(&self.source).create(materializer)?;
357        Ok((
358            Box::new(MapSourceStream {
359                input: stream,
360                factory: self,
361            }),
362            mat,
363        ))
364    }
365}
366
367struct MapSourceStream<In, Out, Mat, F> {
368    input: BoxStream<In>,
369    factory: Arc<MapSourceFactory<In, Out, Mat, F>>,
370}
371
372impl<In, Out, Mat, F> Iterator for MapSourceStream<In, Out, Mat, F>
373where
374    F: Fn(In) -> Out,
375{
376    type Item = StreamResult<Out>;
377
378    fn next(&mut self) -> Option<Self::Item> {
379        self.input
380            .next()
381            .map(|item| item.map(|item| (self.factory.stage)(item)))
382    }
383}
384
385fn merge_streams<Out>(streams: Vec<BoxStream<Out>>, eager_complete: bool) -> BoxStream<Out>
386where
387    Out: Send + 'static,
388{
389    let mut streams: Vec<Option<BoxStream<Out>>> = streams.into_iter().map(Some).collect();
390    let mut current = 0usize;
391    Box::new(std::iter::from_fn(move || {
392        loop {
393            let index = next_active_optional_stream(&streams, current, |_| true)?;
394            current = (index + 1) % streams.len().max(1);
395            let Some(stream) = streams[index].as_mut() else {
396                continue;
397            };
398            match stream.next() {
399                Some(item) => return Some(item),
400                None => {
401                    streams[index] = None;
402                    if eager_complete {
403                        return None;
404                    }
405                }
406            }
407        }
408    }))
409}
410
411fn merge_prioritized_streams<Out>(
412    streams: Vec<BoxStream<Out>>,
413    priorities: Vec<usize>,
414    eager_complete: bool,
415) -> BoxStream<Out>
416where
417    Out: Send + 'static,
418{
419    let mut streams: Vec<Option<BoxStream<Out>>> = streams.into_iter().map(Some).collect();
420    let schedule: Vec<usize> = priorities
421        .into_iter()
422        .enumerate()
423        .flat_map(|(index, weight)| std::iter::repeat_n(index, weight))
424        .collect();
425    let mut schedule_index = 0usize;
426    Box::new(std::iter::from_fn(move || {
427        loop {
428            if streams.iter().all(Option::is_none) {
429                return None;
430            }
431            let index = next_weighted_stream(&streams, &schedule, &mut schedule_index)?;
432            let Some(stream) = streams[index].as_mut() else {
433                continue;
434            };
435            match stream.next() {
436                Some(item) => return Some(item),
437                None => {
438                    streams[index] = None;
439                    if eager_complete {
440                        return None;
441                    }
442                }
443            }
444        }
445    }))
446}
447
448fn merge_sorted_stream<Out>(mut left: BoxStream<Out>, mut right: BoxStream<Out>) -> BoxStream<Out>
449where
450    Out: Ord + Send + 'static,
451{
452    let mut left_next: Option<Out> = None;
453    let mut right_next: Option<Out> = None;
454    let mut left_done = false;
455    let mut right_done = false;
456    Box::new(std::iter::from_fn(move || {
457        loop {
458            if left_next.is_none() && !left_done {
459                match left.next() {
460                    Some(Ok(item)) => left_next = Some(item),
461                    Some(Err(error)) => return Some(Err(error)),
462                    None => left_done = true,
463                }
464            }
465            if right_next.is_none() && !right_done {
466                match right.next() {
467                    Some(Ok(item)) => right_next = Some(item),
468                    Some(Err(error)) => return Some(Err(error)),
469                    None => right_done = true,
470                }
471            }
472
473            let next = match (&left_next, &right_next) {
474                (Some(left_item), Some(right_item)) => {
475                    if left_item <= right_item {
476                        left_next.take()
477                    } else {
478                        right_next.take()
479                    }
480                }
481                (Some(_), None) if right_done => left_next.take(),
482                (None, Some(_)) if left_done => right_next.take(),
483                (None, None) if left_done && right_done => return None,
484                _ => continue,
485            };
486            if let Some(item) = next {
487                return Some(Ok(item));
488            }
489        }
490    }))
491}
492
493fn merge_latest_streams<Out>(
494    streams: Vec<BoxStream<Out>>,
495    eager_complete: bool,
496) -> BoxStream<Vec<Out>>
497where
498    Out: Clone + Send + 'static,
499{
500    let mut streams: Vec<Option<BoxStream<Out>>> = streams.into_iter().map(Some).collect();
501    let mut latest = vec![None; streams.len()];
502    let mut seen = 0usize;
503    let mut current = 0usize;
504    let mut pending = VecDeque::<Vec<Out>>::new();
505    Box::new(std::iter::from_fn(move || {
506        loop {
507            if let Some(output) = pending.pop_front() {
508                return Some(Ok(output));
509            }
510            if streams.iter().all(Option::is_none) {
511                return None;
512            }
513            let index = next_active_optional_stream(&streams, current, |_| true)?;
514            current = (index + 1) % streams.len().max(1);
515            let Some(stream) = streams[index].as_mut() else {
516                continue;
517            };
518            match stream.next() {
519                Some(Ok(item)) => {
520                    if latest[index].is_none() {
521                        seen += 1;
522                    }
523                    latest[index] = Some(item);
524                    if seen == latest.len() {
525                        pending.push_back(
526                            latest
527                                .iter()
528                                .map(|item| item.clone().expect("merge-latest initialized"))
529                                .collect(),
530                        );
531                    }
532                }
533                Some(Err(error)) => return Some(Err(error)),
534                None => {
535                    streams[index] = None;
536                    if eager_complete {
537                        return None;
538                    }
539                }
540            }
541        }
542    }))
543}
544
545fn zip_streams<Left, Right>(
546    mut left: BoxStream<Left>,
547    mut right: BoxStream<Right>,
548) -> BoxStream<(Left, Right)>
549where
550    Left: Send + 'static,
551    Right: Send + 'static,
552{
553    let mut left_next: Option<Left> = None;
554    let mut right_next: Option<Right> = None;
555    let mut left_done = false;
556    let mut right_done = false;
557    Box::new(std::iter::from_fn(move || {
558        loop {
559            if left_next.is_none() && !left_done {
560                match left.next() {
561                    Some(Ok(item)) => left_next = Some(item),
562                    Some(Err(error)) => return Some(Err(error)),
563                    None => left_done = true,
564                }
565            }
566            if right_next.is_none() && !right_done {
567                match right.next() {
568                    Some(Ok(item)) => right_next = Some(item),
569                    Some(Err(error)) => return Some(Err(error)),
570                    None => right_done = true,
571                }
572            }
573            match (left_next.take(), right_next.take()) {
574                (Some(left_item), Some(right_item)) => return Some(Ok((left_item, right_item))),
575                (left_item, right_item) => {
576                    left_next = left_item;
577                    right_next = right_item;
578                    if (left_done && left_next.is_none()) || (right_done && right_next.is_none()) {
579                        return None;
580                    }
581                }
582            }
583        }
584    }))
585}
586
587fn zip_latest_with_stream<Left, Right, Out, F>(
588    mut left: BoxStream<Left>,
589    mut right: BoxStream<Right>,
590    eager_complete: bool,
591    combine: Arc<F>,
592) -> BoxStream<Out>
593where
594    Left: Clone + Send + 'static,
595    Right: Clone + Send + 'static,
596    Out: Send + 'static,
597    F: Fn(Left, Right) -> Out + Send + Sync + 'static,
598{
599    let mut left_latest: Option<Left> = None;
600    let mut right_latest: Option<Right> = None;
601    let mut left_done = false;
602    let mut right_done = false;
603    let mut turn_left = true;
604    let mut pending = VecDeque::<Out>::new();
605
606    Box::new(std::iter::from_fn(move || {
607        loop {
608            if let Some(output) = pending.pop_front() {
609                return Some(Ok(output));
610            }
611            if eager_complete && (left_done || right_done) {
612                return None;
613            }
614            if left_done && right_done {
615                return None;
616            }
617            // A side that completed without ever emitting can never be paired:
618            // no further output is possible, so draining the other (possibly
619            // infinite) side would loop forever producing nothing.
620            if (left_done && left_latest.is_none()) || (right_done && right_latest.is_none()) {
621                return None;
622            }
623
624            let pull_left = if left_done {
625                false
626            } else if right_done {
627                true
628            } else {
629                let value = turn_left;
630                turn_left = !turn_left;
631                value
632            };
633
634            if pull_left {
635                match left.next() {
636                    Some(Ok(item)) => {
637                        left_latest = Some(item);
638                        if let (Some(left_item), Some(right_item)) = (&left_latest, &right_latest) {
639                            pending.push_back(combine(left_item.clone(), right_item.clone()));
640                        }
641                    }
642                    Some(Err(error)) => return Some(Err(error)),
643                    None => {
644                        left_done = true;
645                        if eager_complete {
646                            return None;
647                        }
648                    }
649                }
650            } else {
651                match right.next() {
652                    Some(Ok(item)) => {
653                        right_latest = Some(item);
654                        if let (Some(left_item), Some(right_item)) = (&left_latest, &right_latest) {
655                            pending.push_back(combine(left_item.clone(), right_item.clone()));
656                        }
657                    }
658                    Some(Err(error)) => return Some(Err(error)),
659                    None => {
660                        right_done = true;
661                        if eager_complete {
662                            return None;
663                        }
664                    }
665                }
666            }
667        }
668    }))
669}
670
671fn zip_all_stream<Left, Right>(
672    mut left: BoxStream<Left>,
673    mut right: BoxStream<Right>,
674    left_fill: Left,
675    right_fill: Right,
676) -> BoxStream<(Left, Right)>
677where
678    Left: Clone + Send + 'static,
679    Right: Clone + Send + 'static,
680{
681    let mut left_done = false;
682    let mut right_done = false;
683    Box::new(std::iter::from_fn(move || {
684        if left_done && right_done {
685            return None;
686        }
687
688        let left_item = if left_done {
689            None
690        } else {
691            match left.next() {
692                Some(Ok(item)) => Some(item),
693                Some(Err(error)) => return Some(Err(error)),
694                None => {
695                    left_done = true;
696                    None
697                }
698            }
699        };
700        let right_item = if right_done {
701            None
702        } else {
703            match right.next() {
704                Some(Ok(item)) => Some(item),
705                Some(Err(error)) => return Some(Err(error)),
706                None => {
707                    right_done = true;
708                    None
709                }
710            }
711        };
712
713        match (left_item, right_item) {
714            (None, None) if left_done && right_done => None,
715            (Some(left_value), Some(right_value)) => Some(Ok((left_value, right_value))),
716            (Some(left_value), None) => Some(Ok((left_value, right_fill.clone()))),
717            (None, Some(right_value)) => Some(Ok((left_fill.clone(), right_value))),
718            (None, None) => None,
719        }
720    }))
721}
722
723fn zip_n_streams<Out, Next, F>(streams: Vec<BoxStream<Out>>, zipper: Arc<F>) -> BoxStream<Next>
724where
725    Out: Send + 'static,
726    Next: Send + 'static,
727    F: Fn(Vec<Out>) -> Next + Send + Sync + 'static,
728{
729    let count = streams.len();
730    if count == 0 {
731        return Box::new(std::iter::empty());
732    }
733    let mut streams: Vec<Option<BoxStream<Out>>> = streams.into_iter().map(Some).collect();
734    let mut slots: Vec<Option<Out>> = (0..count).map(|_| None).collect();
735    let mut current = 0usize;
736    Box::new(std::iter::from_fn(move || {
737        loop {
738            if slots.iter().all(Option::is_some) {
739                let values = slots
740                    .iter_mut()
741                    .map(|slot| slot.take().expect("zip-n slot filled"))
742                    .collect();
743                return Some(Ok(zipper(values)));
744            }
745
746            let index = next_active_optional_stream(&streams, current, |idx| slots[idx].is_none())?;
747            current = (index + 1) % count.max(1);
748            let Some(stream) = streams[index].as_mut() else {
749                continue;
750            };
751            match stream.next() {
752                Some(Ok(item)) => slots[index] = Some(item),
753                Some(Err(error)) => return Some(Err(error)),
754                None => {
755                    streams[index] = None;
756                    slots[index].as_ref()?;
757                }
758            }
759        }
760    }))
761}
762
763fn next_active_optional_stream<T, F>(
764    streams: &[Option<BoxStream<T>>],
765    current: usize,
766    predicate: F,
767) -> Option<usize>
768where
769    T: Send + 'static,
770    F: Fn(usize) -> bool,
771{
772    if streams.is_empty() {
773        return None;
774    }
775    for offset in 0..streams.len() {
776        let index = (current + offset) % streams.len();
777        if streams[index].is_some() && predicate(index) {
778            return Some(index);
779        }
780    }
781    None
782}
783
784fn next_weighted_stream<T>(
785    streams: &[Option<BoxStream<T>>],
786    schedule: &[usize],
787    schedule_index: &mut usize,
788) -> Option<usize>
789where
790    T: Send + 'static,
791{
792    if streams.is_empty() || schedule.is_empty() {
793        return None;
794    }
795    for _ in 0..schedule.len() {
796        let index = schedule[*schedule_index % schedule.len()];
797        *schedule_index = (*schedule_index + 1) % schedule.len();
798        if streams.get(index).is_some_and(Option::is_some) {
799            return Some(index);
800        }
801    }
802    None
803}
804
805pub(crate) mod async_boundary;
806mod completion;
807mod error;
808mod flow;
809mod rate;
810mod restart;
811mod runtime;
812mod sink;
813mod source;
814mod time;
815mod timer;
816
/// Opaque hook on `Source<T>` for split-segment fast-path sources.
/// Non-`None` only on `Source`s returned by the split fast path.
/// Cleared by all composition operators (via/map/to_mat etc.).
pub(crate) trait SplitSegmentHookDyn: Send + Sync + 'static {
    /// Converts the hook into a type-erased `Arc<dyn Any + Send + Sync>`.
    // NOTE(review): presumably so callers can downcast to the concrete hook
    // type — confirm at the use site.
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Send + Sync>;
}
823
/// Opaque hook on `Source<T>` for a private source-to-terminal fast path.
///
/// Implementations synchronously fill bounded batches on the sink worker
/// spawned by the terminal sink descriptor. Sink descriptors process the batch
/// after the hook returns, so user closures do not run under source locks.
pub(crate) trait TerminalSourceHookDyn<In>: Send + Sync + 'static {
    /// Fills `batch` with elements and reports a [`TerminalSourceStatus`], or
    /// returns an error.
    // NOTE(review): `cancelled` is presumably polled to stop early, and
    // whether `batch` is appended to or replaced is decided by the
    // implementations — confirm against them.
    fn drain_terminal_batch(
        &self,
        materializer: &Materializer,
        cancelled: &Arc<AtomicBool>,
        batch: &mut Vec<In>,
    ) -> StreamResult<TerminalSourceStatus>;

    /// Cancellation notification; the default implementation does nothing.
    fn cancel_terminal(&self) {}
}
839
/// Status reported by [`TerminalSourceHookDyn::drain_terminal_batch`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum TerminalSourceStatus {
    // NOTE(review): presumably "more batches may follow" — confirm against
    // the implementations.
    Active,
    // NOTE(review): presumably "the source is exhausted" — confirm against
    // the implementations.
    Completed,
}
845
/// Fast-path descriptor for fold/collect/ignore sinks in split-segment context.
/// Non-`None` only for recognised sinks (`Sink::fold`, `fold_result`, `collect`, `ignore`).
pub(crate) trait FoldFastPathDyn<In: Send + 'static>: Send + Sync + 'static {
    /// Try to register directly with a split segment hook.
    /// Returns `None` if the hook type is not recognised.
    /// Returns `Some(Ok(mat))` on success where `mat` is `Box<StreamCompletion<Acc>>`.
    fn try_register(
        &self,
        hook: Arc<dyn SplitSegmentHookDyn>,
    ) -> Option<StreamResult<Box<dyn std::any::Any + Send>>>;

    /// Whether this descriptor supports the terminal-drain fast path.
    /// Defaults to `false`.
    fn supports_terminal_drain(&self) -> bool {
        false
    }

    /// Try to register with a terminal source hook.
    /// The default implementation ignores both arguments and returns `None`.
    fn try_register_terminal_drain(
        &self,
        _hook: Arc<dyn TerminalSourceHookDyn<In>>,
        _materializer: &Materializer,
    ) -> Option<StreamResult<Box<dyn std::any::Any + Send>>> {
        None
    }
}
869
870use self::runtime::{runtime_checked_stream, set_current_stream_cancelled};
871
872pub(crate) use self::completion::StreamCancellation;
873
874pub use self::{
875    completion::{Cancellable, StreamCompletion},
876    error::{StreamError, StreamResult, Supervision, SupervisionDecider, SupervisionDirective},
877    flow::{BidiFlow, Flow},
878    rate::{AggregateTimer, OverflowStrategy},
879    restart::{RestartFlow, RestartSettings, RestartSink, RestartSource, RetryFlow},
880    runtime::{Materializer, Runtime},
881    sink::{RunnableGraph, Sink, SinkCombineStrategy},
882    source::{Demand, Keep, MaybeHandle, NotUsed, PushOutlet, Source, SourceCombineStrategy},
883    time::{DelayOverflowStrategy, ThrottleMode},
884};
885
886#[cfg(test)]
887mod tests {
888    use super::*;
889    use crate::Attributes;
890    use crate::testkit::TestSink;
891    use std::fs;
892    use std::sync::{
893        Arc as StdArc,
894        atomic::{
895            AtomicBool as StdAtomicBool, AtomicUsize as StdAtomicUsize, Ordering as StdOrdering,
896        },
897        mpsc,
898    };
899    use std::time::Duration as StdDuration;
900    use std::time::Instant;
901
    /// Waits on `completion` and unwraps the result, panicking on error.
    fn wait<T>(completion: StreamCompletion<T>) -> T {
        completion.wait().unwrap()
    }
905
906    fn wait_until(timeout: Duration, mut condition: impl FnMut() -> bool) -> bool {
907        let deadline = Instant::now() + timeout;
908        while Instant::now() < deadline {
909            if condition() {
910                return true;
911            }
912            thread::sleep(Duration::from_millis(2));
913        }
914        condition()
915    }
916
917    fn linux_thread_count(thread_name: &str) -> usize {
918        fs::read_dir("/proc/self/task")
919            .expect("task directory readable")
920            .filter_map(Result::ok)
921            .filter_map(|entry| fs::read_to_string(entry.path().join("comm")).ok())
922            .filter(|name| name.trim() == thread_name)
923            .count()
924    }
925
926    #[test]
927    fn source_async_boundary_preserves_results() {
928        let expected = Source::from_iter(0_u64..128)
929            .map(|item| item.wrapping_add(1))
930            .filter(|item| item % 3 != 0)
931            .map(|item| item * 2)
932            .run_collect()
933            .unwrap();
934
935        let actual = Source::from_iter(0_u64..128)
936            .map(|item| item.wrapping_add(1))
937            .async_boundary()
938            .filter(|item| item % 3 != 0)
939            .map(|item| item * 2)
940            .run_collect()
941            .unwrap();
942
943        assert_eq!(actual, expected);
944    }
945
946    #[test]
947    fn flow_async_boundary_preserves_results() {
948        let expected = Source::from_iter(0_u64..128)
949            .map(|item| item + 1)
950            .map(|item| item * 3)
951            .run_collect()
952            .unwrap();
953
954        let flow = Flow::identity()
955            .map(|item: u64| item + 1)
956            .r#async()
957            .map(|item| item * 3);
958        let actual = Source::from_iter(0_u64..128)
959            .via(flow)
960            .run_collect()
961            .unwrap();
962
963        assert_eq!(actual, expected);
964    }
965
966    #[test]
967    fn linear_async_boundary_matches_graph_async_boundary_shape() {
968        use crate::{
969            AsyncBoundary, AsyncBoundaryExecutionConfig, FusedExecutionConfig, GraphDsl,
970            GraphFlowShape, MapStage,
971        };
972
973        let graph = GraphDsl::try_create(|builder| {
974            let first = builder.add(MapStage::new(|item: u64| item + 1));
975            let boundary = builder.add(AsyncBoundary::<u64>::new());
976            let second = builder.add(MapStage::new(|item: u64| item * 2));
977
978            builder.connect(first.outlet(), boundary.inlet())?;
979            builder.connect(boundary.outlet(), second.inlet())?;
980
981            Ok(GraphFlowShape::new(first.inlet(), second.outlet()))
982        })
983        .unwrap();
984
985        let linear = Source::from_iter(1_u64..=4)
986            .map(|item| item + 1)
987            .async_boundary_with_buffer(4)
988            .map(|item| item * 2)
989            .run_collect()
990            .unwrap();
991        let graph_output = graph.run_with_input(1_u64..=4).unwrap();
992        let report = graph
993            .run_async_boundary_count_with_input_report(
994                1_u64..=4,
995                AsyncBoundaryExecutionConfig {
996                    fused: FusedExecutionConfig { event_limit: 1024 },
997                    buffer_size: 4,
998                },
999            )
1000            .unwrap();
1001
1002        assert_eq!(linear, graph_output);
1003        assert_eq!(report.result, linear.len());
1004        assert_eq!(report.async_boundary_crossings, linear.len());
1005    }
1006
    /// Checks that the stage before an async boundary keeps producing while the
    /// stage after it is blocked.
    ///
    /// The downstream map parks on item 0 until released. While it is parked the
    /// test expects the upstream probe to have reported items 0 and 1.
    #[test]
    fn async_boundary_regions_run_concurrently() {
        // Probe channels: items seen upstream, a "downstream is parked" signal,
        // and the release signal sent by the test body.
        let (upstream_tx, upstream_rx) = mpsc::channel::<u64>();
        let (downstream_blocked_tx, downstream_blocked_rx) = mpsc::channel::<()>();
        let (release_tx, release_rx) = mpsc::channel::<()>();
        let release_rx = StdArc::new(Mutex::new(release_rx));

        let completion = Source::from_iter(0_u64..3)
            .map(move |item| {
                // Report every item that reaches the upstream stage.
                upstream_tx.send(item).expect("upstream probe receives");
                item
            })
            .async_boundary_with_buffer(1)
            .map({
                let release_rx = StdArc::clone(&release_rx);
                move |item| {
                    if item == 0 {
                        // Announce that downstream is about to block, then wait
                        // (at most 2 s) for the release signal.
                        downstream_blocked_tx
                            .send(())
                            .expect("downstream probe receives");
                        release_rx
                            .lock()
                            .expect("release receiver lock")
                            .recv_timeout(StdDuration::from_secs(2))
                            .expect("downstream release arrives");
                    }
                    item
                }
            })
            .run_with(Sink::collect())
            .unwrap();

        // Downstream is parked on item 0 ...
        assert_eq!(
            downstream_blocked_rx.recv_timeout(StdDuration::from_secs(2)),
            Ok(())
        );
        // ... yet upstream has already produced items 0 and 1.
        assert_eq!(upstream_rx.recv_timeout(StdDuration::from_secs(2)), Ok(0));
        assert_eq!(upstream_rx.recv_timeout(StdDuration::from_secs(2)), Ok(1));

        // Unblock downstream and check that every item arrives in order.
        release_tx.send(()).expect("release downstream");
        assert_eq!(completion.wait().unwrap(), vec![0, 1, 2]);
    }
1049
    /// Checks that a blocked downstream stage bounds how far the upstream stage
    /// can run ahead across `async_boundary_with_buffer(1)`.
    ///
    /// While downstream is parked on item 0, upstream must produce items 0 and 1,
    /// may produce item 2, and must then stop until downstream is released.
    #[test]
    fn async_boundary_backpressures_slow_downstream() {
        let (produced_tx, produced_rx) = mpsc::channel::<u64>();
        let (release_tx, release_rx) = mpsc::channel::<()>();
        let release_rx = StdArc::new(Mutex::new(release_rx));

        let completion = Source::from_iter(0_u64..8)
            .map(move |item| {
                // Report every item the upstream stage produces.
                produced_tx.send(item).expect("producer probe receives");
                item
            })
            .async_boundary_with_buffer(1)
            .map({
                let release_rx = StdArc::clone(&release_rx);
                move |item| {
                    if item == 0 {
                        // Park downstream on the first item until the test releases it.
                        release_rx
                            .lock()
                            .expect("release receiver lock")
                            .recv_timeout(StdDuration::from_secs(2))
                            .expect("downstream release arrives");
                    }
                    item
                }
            })
            .run_with(Sink::collect())
            .unwrap();

        assert_eq!(produced_rx.recv_timeout(StdDuration::from_secs(2)), Ok(0));
        assert_eq!(produced_rx.recv_timeout(StdDuration::from_secs(2)), Ok(1));
        // A third item is tolerated but not required; if one shows up it must be 2.
        if let Ok(item) = produced_rx.recv_timeout(StdDuration::from_millis(100)) {
            assert_eq!(item, 2);
        }
        // After that the producer must be stalled: anything other than a timeout
        // means the handoff was not bounded.
        match produced_rx.recv_timeout(StdDuration::from_millis(100)) {
            Err(mpsc::RecvTimeoutError::Timeout) => {}
            other => panic!("async boundary handoff was not bounded: {other:?}"),
        }

        // Release downstream and check that the whole range arrives in order.
        release_tx.send(()).expect("release downstream");
        assert_eq!(completion.wait().unwrap(), (0_u64..8).collect::<Vec<_>>());
    }
1091
1092    #[test]
1093    fn source_blueprints_are_reusable() {
1094        let source = Source::from_iter(0..5).map(|item| item + 1);
1095
1096        assert_eq!(source.clone().run_collect().unwrap(), vec![1, 2, 3, 4, 5]);
1097        assert_eq!(source.run_collect().unwrap(), vec![1, 2, 3, 4, 5]);
1098    }
1099
1100    #[test]
1101    fn source_map_preserves_materialized_value() {
1102        let graph = Source::single(1)
1103            .map_materialized_value(|_| "source")
1104            .map(|item| item + 1)
1105            .to_mat(Sink::head(), Keep::both);
1106
1107        let materialized = graph.run().unwrap();
1108        assert_eq!(materialized.0, "source");
1109        assert_eq!(wait(materialized.1), 2);
1110    }
1111
1112    #[test]
1113    fn source_and_flow_compose() {
1114        let flow = Flow::identity()
1115            .map(|item: i32| item * 2)
1116            .filter(|item| item % 3 == 0);
1117
1118        let result = Source::from_iter(0..8).via(flow).run_collect().unwrap();
1119
1120        assert_eq!(result, vec![0, 6, 12]);
1121    }
1122
1123    #[test]
1124    fn sink_setup_sees_materializer_defaults_and_local_attributes() {
1125        let observed = StdArc::new(Mutex::new(None));
1126        let observed_in_setup = StdArc::clone(&observed);
1127        let sink = Sink::<i32, StreamCompletion<NotUsed>>::setup(move |_materializer, attrs| {
1128            *observed_in_setup.lock().unwrap() = Some((
1129                attrs.name().map(str::to_owned),
1130                attrs.input_buffer_hint(),
1131                attrs.dispatcher_hint().map(str::to_owned),
1132            ));
1133            Sink::ignore()
1134        })
1135        .add_attributes(Attributes::named("sink-inner"))
1136        .add_attributes(Attributes::input_buffer(4, 4))
1137        .add_attributes(Attributes::dispatcher("bench-dispatcher"));
1138
1139        let materializer = Materializer::new().with_attributes(Attributes::named("mat-outer"));
1140        wait(
1141            Source::from_iter([1, 2, 3])
1142                .run_with_materializer(sink, &materializer)
1143                .unwrap(),
1144        );
1145
1146        assert_eq!(
1147            *observed.lock().unwrap(),
1148            Some((
1149                Some("sink-inner".to_owned()),
1150                Some((4, 4)),
1151                Some("bench-dispatcher".to_owned())
1152            ))
1153        );
1154    }
1155
1156    #[test]
1157    fn sink_pre_materialize_feeds_existing_materialization() {
1158        let materializer = Materializer::new();
1159        let (completion, pre) = Sink::<i32, StreamCompletion<Vec<i32>>>::collect()
1160            .pre_materialize(&materializer)
1161            .unwrap();
1162
1163        Source::from_iter([1, 2, 3])
1164            .run_with_materializer(pre, &materializer)
1165            .unwrap();
1166
1167        assert_eq!(wait(completion), vec![1, 2, 3]);
1168    }
1169
1170    #[test]
1171    fn flow_from_sink_and_source_connects_both_sides() {
1172        assert_eq!(
1173            Source::from_iter([1, 2, 3])
1174                .via(Flow::from_sink_and_source(
1175                    Sink::foreach(|_item: i32| {}),
1176                    Source::from_iter([10, 20, 30]),
1177                ))
1178                .run_collect()
1179                .unwrap(),
1180            vec![10, 20, 30]
1181        );
1182    }
1183
1184    #[test]
1185    fn from_sink_and_source_keeps_sink_running_after_source_side_completes() {
1186        let completed = StdArc::new(StdAtomicBool::new(false));
1187        let on_complete = StdArc::clone(&completed);
1188        let flow = Flow::from_sink_and_source(
1189            Sink::on_complete(move || {
1190                on_complete.store(true, StdOrdering::SeqCst);
1191            }),
1192            Source::single(10),
1193        );
1194
1195        let result = Source::from_iter([1, 2, 3])
1196            .via(flow)
1197            .run_collect()
1198            .unwrap();
1199
1200        assert_eq!(result, vec![10]);
1201        assert!(wait_until(StdDuration::from_secs(1), || {
1202            completed.load(StdOrdering::SeqCst)
1203        }));
1204    }
1205
1206    #[test]
1207    fn from_sink_and_source_coupled_cancels_source_when_sink_finishes_first() {
1208        let cancellable = StdArc::new(Mutex::new(None));
1209        let observed = StdArc::clone(&cancellable);
1210        let flow = Flow::from_sink_and_source_coupled(
1211            Sink::ignore(),
1212            Source::tick(
1213                StdDuration::from_millis(50),
1214                StdDuration::from_millis(50),
1215                10,
1216            )
1217            .map_materialized_value(move |handle| {
1218                *observed.lock().unwrap() = Some(handle.clone());
1219                handle
1220            }),
1221        );
1222
1223        let completion = Source::from_iter(std::iter::empty::<i32>())
1224            .via(flow)
1225            .run_with(Sink::ignore())
1226            .unwrap();
1227        assert!(wait_until(StdDuration::from_secs(1), || {
1228            cancellable
1229                .lock()
1230                .unwrap()
1231                .as_ref()
1232                .is_some_and(Cancellable::is_cancelled)
1233        }));
1234        assert_eq!(wait(completion), NotUsed);
1235    }
1236
1237    #[test]
1238    fn bidi_flow_join_and_atop_compose() {
1239        let codec = BidiFlow::from_flows(
1240            Flow::identity().map(|item: i32| item + 1),
1241            Flow::identity().map(|item: i32| item * 2),
1242        )
1243        .named("codec");
1244        let framing = BidiFlow::from_flows(
1245            Flow::identity().map(|item: i32| item * 3),
1246            Flow::identity().map(|item: i32| item - 4),
1247        );
1248
1249        let joined = codec
1250            .clone()
1251            .join(Flow::identity().map(|item: i32| item - 5));
1252        let stacked = codec.atop(framing).join(Flow::identity());
1253
1254        assert_eq!(
1255            Source::single(10).via(joined).run_collect().unwrap(),
1256            vec![12]
1257        );
1258        assert_eq!(
1259            Source::single(10).via(stacked).run_collect().unwrap(),
1260            vec![58]
1261        );
1262    }
1263
1264    #[test]
1265    fn flow_buffer_then_map_runs_end_to_end() {
1266        let flow = Flow::identity()
1267            .buffer(8, OverflowStrategy::Backpressure)
1268            .map(|item: i32| item + 1);
1269
1270        let result = Source::from_iter(0..4).via(flow).run_collect().unwrap();
1271
1272        assert_eq!(result, vec![1, 2, 3, 4]);
1273    }
1274
    /// Applies each public `Flow` combinator on top of a buffered identity flow
    /// and checks the output of the combined flow.
    ///
    /// Every assertion builds a fresh `Flow::identity().buffer(8, Backpressure)`
    /// through the local `buffered_flow` helper and chains one combinator after it.
    // NOTE(review): the test name suggests `buffer` yields a runtime-transform-backed
    // flow that later combinators must keep intact; confirm against `Flow::buffer`.
    #[test]
    fn public_flow_combinators_preserve_runtime_transform_after_buffer() {
        /// Fresh buffered identity flow used as the base of every case below.
        fn buffered_flow() -> Flow<i32, i32> {
            Flow::identity().buffer(8, OverflowStrategy::Backpressure)
        }

        // Filtering and element-wise mapping combinators.
        assert_eq!(
            Source::from_iter(0..4)
                .via(buffered_flow().filter(|item| *item % 2 == 0))
                .run_collect()
                .unwrap(),
            vec![0, 2]
        );
        assert_eq!(
            Source::from_iter(0..4)
                .via(buffered_flow().filter_not(|item| *item % 2 == 0))
                .run_collect()
                .unwrap(),
            vec![1, 3]
        );
        assert_eq!(
            Source::from_iter(0..4)
                .via(buffered_flow().filter_map(|item| (item % 2 == 0).then_some(item + 10)))
                .run_collect()
                .unwrap(),
            vec![10, 12]
        );
        assert_eq!(
            Source::from_iter(0..3)
                .via(buffered_flow().map_concat(|item| [item, item + 10]))
                .run_collect()
                .unwrap(),
            vec![0, 10, 1, 11, 2, 12]
        );
        // Stateful variants: the state starts at the given seed and accumulates items.
        assert_eq!(
            Source::from_iter(0..3)
                .via(buffered_flow().stateful_map(5, |state, item| {
                    *state += item;
                    *state
                }))
                .run_collect()
                .unwrap(),
            vec![5, 6, 8]
        );
        assert_eq!(
            Source::from_iter(0..3)
                .via(buffered_flow().stateful_map_concat(0, |state, item| {
                    *state += item;
                    [*state, item]
                }))
                .run_collect()
                .unwrap(),
            vec![0, 0, 1, 1, 3, 2]
        );
        // Async mapping combinators.
        assert_eq!(
            Source::from_iter(0..4)
                .via(buffered_flow().map_async(2, |item| async move { Ok(item + 1) }))
                .run_collect()
                .unwrap(),
            vec![1, 2, 3, 4]
        );
        assert_eq!(
            Source::from_iter(0..4)
                .via(buffered_flow().map_async_unordered(2, |item| async move { Ok(item + 1) }))
                .run_collect()
                .unwrap(),
            vec![1, 2, 3, 4]
        );
        assert_eq!(
            Source::from_iter(0..4)
                .via(buffered_flow().map_async_partitioned(
                    2,
                    1,
                    |item| item % 2,
                    |item| async move { Ok(item + 1) },
                ))
                .run_collect()
                .unwrap(),
            vec![1, 2, 3, 4]
        );
        // Slicing and limiting combinators.
        assert_eq!(
            Source::from_iter(0..5)
                .via(buffered_flow().take(3))
                .run_collect()
                .unwrap(),
            vec![0, 1, 2]
        );
        assert_eq!(
            Source::from_iter(0..5)
                .via(buffered_flow().drop(2))
                .run_collect()
                .unwrap(),
            vec![2, 3, 4]
        );
        assert_eq!(
            Source::from_iter(0..5)
                .via(buffered_flow().take_while(|item| *item < 3))
                .run_collect()
                .unwrap(),
            vec![0, 1, 2]
        );
        assert_eq!(
            Source::from_iter(0..5)
                .via(buffered_flow().drop_while(|item| *item < 3))
                .run_collect()
                .unwrap(),
            vec![3, 4]
        );
        assert_eq!(
            Source::from_iter(0..3)
                .via(buffered_flow().limit(5))
                .run_collect()
                .unwrap(),
            vec![0, 1, 2]
        );
        // Grouping and accumulating combinators.
        assert_eq!(
            Source::from_iter(0..5)
                .via(buffered_flow().grouped(2))
                .run_collect()
                .unwrap(),
            vec![vec![0, 1], vec![2, 3], vec![4]]
        );
        assert_eq!(
            Source::from_iter(1..=3)
                .via(buffered_flow().scan(0, |acc, item| acc + item))
                .run_collect()
                .unwrap(),
            vec![0, 1, 3, 6]
        );
        assert_eq!(
            Source::from_iter(1..=4)
                .via(buffered_flow().sliding(2, 1))
                .run_collect()
                .unwrap(),
            vec![vec![1, 2], vec![2, 3], vec![3, 4]]
        );
        assert_eq!(
            Source::from_iter(1..=4)
                .via(buffered_flow().fold(0, |acc, item| acc + item))
                .run_collect()
                .unwrap(),
            vec![10]
        );
        assert_eq!(
            Source::from_iter(1..=4)
                .via(buffered_flow().reduce(|acc, item| acc + item))
                .run_collect()
                .unwrap(),
            vec![10]
        );
        // Error-handling combinators, driven by failing or already-failed sources.
        assert_eq!(
            Source::from_factory(|| {
                Box::new(vec![Ok(1), Err(StreamError::Failed("boom".into())), Ok(2)].into_iter())
            })
            .via(buffered_flow().map_error(|_| StreamError::Failed("mapped".into())))
            .run_collect(),
            Err(StreamError::Failed("mapped".into()))
        );
        assert_eq!(
            Source::<i32>::failed(StreamError::Failed("boom".into()))
                .via(buffered_flow().recover(|_| Some(42)))
                .run_collect()
                .unwrap(),
            vec![42]
        );
        assert_eq!(
            Source::<i32>::failed(StreamError::Failed("boom".into()))
                .via(buffered_flow().recover_with(|_| Some(Source::from_iter([7, 8]))))
                .run_collect()
                .unwrap(),
            vec![7, 8]
        );
        assert_eq!(
            Source::<i32>::failed(StreamError::Failed("boom".into()))
                .via(buffered_flow().recover_with_retries(1, |_| Some(Source::from_iter([9]))))
                .run_collect()
                .unwrap(),
            vec![9]
        );
        assert_eq!(
            Source::from_factory(|| {
                Box::new(vec![Ok(1), Err(StreamError::Failed("ignored".into())), Ok(2)].into_iter())
            })
            .via(buffered_flow().on_error_complete())
            .run_collect()
            .unwrap(),
            vec![1]
        );

        // Materialized-value plumbing: `map_materialized_value` + `to_mat(.., Keep::both)`
        // must expose both the flow's mapped value and the fold result (2 + 3 + 4 = 9).
        let materialized = Source::from_iter([1, 2, 3])
            .run_with(
                buffered_flow()
                    .via(Flow::identity().map(|item| item + 1))
                    .map_materialized_value(|_| "buffered-flow")
                    .to_mat(Sink::fold(0, |acc, item| acc + item), Keep::both),
            )
            .unwrap();
        assert_eq!(materialized.0, "buffered-flow");
        assert_eq!(wait(materialized.1), 9);

        // `via_mat_with` followed by `to` must materialize the combiner's value.
        let kept = Source::from_iter([1, 2, 3])
            .run_with(
                buffered_flow()
                    .via_mat_with(Flow::identity().map(|item| item + 1), |_, _| "combined")
                    .to(Sink::fold(0, |acc, item| acc + item)),
            )
            .unwrap();
        assert_eq!(kept, "combined");
    }
1484
1485    #[test]
1486    fn runtime_rate_flows_compose_in_flow_form() {
1487        let conflate = Flow::identity()
1488            .conflate(|left: i32, right| left + right)
1489            .map(|item| item + 1);
1490        assert_eq!(
1491            Source::single(4).via(conflate).run_collect().unwrap(),
1492            vec![5]
1493        );
1494
1495        let batch = Flow::identity()
1496            .batch(4, |item: i32| item, |left, right| left + right)
1497            .map(|item| item + 1);
1498        assert_eq!(Source::single(4).via(batch).run_collect().unwrap(), vec![5]);
1499
1500        let expand = Flow::identity()
1501            .expand(std::iter::once::<i32>)
1502            .map(|item| item + 1);
1503        assert_eq!(
1504            Source::from_iter(0..4).via(expand).run_collect().unwrap(),
1505            vec![1, 2, 3, 4]
1506        );
1507
1508        let aggregate = Flow::identity()
1509            .aggregate_with_boundary(
1510                Vec::<i32>::new,
1511                |mut items, item| {
1512                    items.push(item);
1513                    let ready = !items.is_empty();
1514                    (items, ready)
1515                },
1516                |items| items.into_iter().sum::<i32>(),
1517                None,
1518            )
1519            .map(|item| item + 1);
1520        assert_eq!(
1521            Source::from_iter(0..4)
1522                .via(aggregate)
1523                .run_collect()
1524                .unwrap(),
1525            vec![1, 2, 3, 4]
1526        );
1527
1528        let detached = Flow::identity().detach().map(|item: i32| item + 1);
1529        assert_eq!(
1530            Source::from_iter(0..4).via(detached).run_collect().unwrap(),
1531            vec![1, 2, 3, 4]
1532        );
1533    }
1534
1535    #[test]
1536    fn high_use_source_flow_operators_work() {
1537        let result = Source::from_iter(0..8)
1538            .drop(1)
1539            .take(5)
1540            .filter_not(|item| item % 2 == 0)
1541            .map_concat(|item| [item, item + 10])
1542            .grouped(3)
1543            .run_collect()
1544            .unwrap();
1545
1546        assert_eq!(result, vec![vec![1, 11, 3], vec![13, 5, 15]]);
1547    }
1548
1549    #[test]
1550    fn prefix_and_tail_emits_prefix_and_live_tail() {
1551        let mut outer = Source::from_iter(0..5)
1552            .prefix_and_tail(2)
1553            .run_collect()
1554            .unwrap();
1555        assert_eq!(outer.len(), 1);
1556        let (prefix, tail) = outer.pop().unwrap();
1557        assert_eq!(prefix, vec![0, 1]);
1558        assert_eq!(tail.clone().run_collect().unwrap(), vec![2, 3, 4]);
1559        assert_eq!(
1560            tail.run_collect(),
1561            Err(StreamError::Failed(
1562                "substream source cannot be materialized more than once".into()
1563            ))
1564        );
1565    }
1566
1567    #[test]
1568    fn prefix_and_tail_fails_before_prefix_is_ready() {
1569        let result = Source::from_factory(|| {
1570            Box::new(vec![Ok(1), Err(StreamError::Failed("boom".into())), Ok(2)].into_iter())
1571        })
1572        .prefix_and_tail(2)
1573        .run_collect();
1574        assert!(matches!(result, Err(StreamError::Failed(message)) if message == "boom"));
1575    }
1576
1577    #[test]
1578    fn prefix_and_tail_tail_propagates_late_upstream_failure() {
1579        let mut outer = Source::from_factory(|| {
1580            Box::new(vec![Ok(1), Ok(2), Err(StreamError::Failed("boom".into())), Ok(3)].into_iter())
1581        })
1582        .prefix_and_tail(2)
1583        .run_collect()
1584        .unwrap();
1585        let (prefix, tail) = outer.pop().unwrap();
1586        assert_eq!(prefix, vec![1, 2]);
1587        assert_eq!(tail.run_collect(), Err(StreamError::Failed("boom".into())));
1588    }
1589
1590    #[test]
1591    fn prefix_and_tail_accepts_non_clone_elements() {
1592        #[derive(Debug, PartialEq, Eq)]
1593        struct NonClone(u8);
1594
1595        let mut outer = Source::from_factory(|| {
1596            Box::new(vec![Ok(NonClone(1)), Ok(NonClone(2)), Ok(NonClone(3))].into_iter())
1597        })
1598        .prefix_and_tail(2)
1599        .run_collect()
1600        .unwrap();
1601        let (prefix, tail) = outer.pop().unwrap();
1602        assert_eq!(prefix, vec![NonClone(1), NonClone(2)]);
1603        assert_eq!(tail.run_collect().unwrap(), vec![NonClone(3)]);
1604    }
1605
1606    #[test]
1607    fn flat_map_prefix_materializes_on_short_upstream_completion() {
1608        let values = Source::from_iter([1, 2])
1609            .flat_map_prefix(3, |prefix| {
1610                let sum = prefix.into_iter().sum::<i32>();
1611                Flow::identity().prepend(Source::single(sum))
1612            })
1613            .run_collect()
1614            .unwrap();
1615        assert_eq!(values, vec![3]);
1616    }
1617
1618    #[test]
1619    fn flat_map_prefix_does_not_materialize_on_early_upstream_failure() {
1620        let invoked = StdArc::new(StdAtomicBool::new(false));
1621        let invoked_for_stage = StdArc::clone(&invoked);
1622        let result = Source::from_factory(|| {
1623            Box::new(vec![Ok(1), Err(StreamError::Failed("boom".into()))].into_iter())
1624        })
1625        .flat_map_prefix(3, move |_prefix| {
1626            invoked_for_stage.store(true, StdOrdering::SeqCst);
1627            Flow::identity()
1628        })
1629        .run_collect();
1630        assert_eq!(result, Err(StreamError::Failed("boom".into())));
1631        assert!(!invoked.load(StdOrdering::SeqCst));
1632    }
1633
1634    #[test]
1635    fn flat_map_concat_flattens_nested_sources_sequentially() {
1636        let values = Source::from_iter([1, 2, 3])
1637            .flat_map_concat(|item| Source::from_iter(0..item))
1638            .run_collect()
1639            .unwrap();
1640        assert_eq!(values, vec![0, 0, 1, 0, 1, 2]);
1641    }
1642
1643    #[test]
1644    fn flat_map_merge_respects_breadth_bound() {
1645        let active = StdArc::new(StdAtomicUsize::new(0));
1646        let max_active = StdArc::new(StdAtomicUsize::new(0));
1647        let active_for_stage = StdArc::clone(&active);
1648        let max_for_stage = StdArc::clone(&max_active);
1649
1650        let mut values = Source::from_iter(0..6)
1651            .flat_map_merge(2, move |item| {
1652                let active = StdArc::clone(&active_for_stage);
1653                let max_active = StdArc::clone(&max_for_stage);
1654                Source::future(move || {
1655                    let active = StdArc::clone(&active);
1656                    let max_active = StdArc::clone(&max_active);
1657                    async move {
1658                        let now = active.fetch_add(1, StdOrdering::SeqCst) + 1;
1659                        loop {
1660                            let seen = max_active.load(StdOrdering::SeqCst);
1661                            if now <= seen {
1662                                break;
1663                            }
1664                            if max_active
1665                                .compare_exchange(
1666                                    seen,
1667                                    now,
1668                                    StdOrdering::SeqCst,
1669                                    StdOrdering::SeqCst,
1670                                )
1671                                .is_ok()
1672                            {
1673                                break;
1674                            }
1675                        }
1676                        thread::sleep(StdDuration::from_millis(20));
1677                        active.fetch_sub(1, StdOrdering::SeqCst);
1678                        Ok(item)
1679                    }
1680                })
1681            })
1682            .run_collect()
1683            .unwrap();
1684        values.sort_unstable();
1685        assert_eq!(values, vec![0, 1, 2, 3, 4, 5]);
1686        assert!(max_active.load(StdOrdering::SeqCst) <= 2);
1687    }
1688
1689    #[test]
1690    fn flat_map_merge_propagates_inner_failures() {
1691        let result = Source::from_iter([0, 1, 2])
1692            .flat_map_merge(2, |item| {
1693                if item == 1 {
1694                    Source::failed(StreamError::Failed("boom".into()))
1695                } else {
1696                    Source::single(item)
1697                }
1698            })
1699            .run_collect();
1700        assert_eq!(result, Err(StreamError::Failed("boom".into())));
1701    }
1702
    /// Checks that `flat_map_merge` hands out the first inner result while the
    /// upstream iterator is still blocked producing its second element.
    ///
    /// The upstream iterator yields 0 immediately, then waits (up to one second)
    /// on a channel before yielding 1. The test pulls 10 before sending the
    /// release signal.
    #[test]
    fn flat_map_merge_emits_ready_inner_output_while_upstream_is_blocked() {
        let (release_tx, release_rx) = mpsc::channel();
        let release_rx = StdArc::new(std::sync::Mutex::new(Some(release_rx)));
        let queue = Source::from_factory(move || {
            let release_rx = StdArc::clone(&release_rx);
            // Counts how many times the iterator has been advanced.
            let mut step = 0_u8;
            Box::new(std::iter::from_fn(move || {
                let item = match step {
                    0 => Some(Ok(0)),
                    1 => {
                        // Block until the test body sends the release signal.
                        release_rx
                            .lock()
                            .unwrap()
                            .as_ref()
                            .expect("release receiver available")
                            .recv_timeout(StdDuration::from_secs(1))
                            .expect("timed out waiting to release second upstream element");
                        Some(Ok(1))
                    }
                    _ => None,
                };
                step += 1;
                item
            }))
        })
        .flat_map_merge(2, |item| Source::single(item + 10))
        .run_with(Sink::queue())
        .unwrap();

        // The first inner result must be available before upstream is released.
        assert_eq!(queue.pull().unwrap(), Some(10));
        release_tx.send(()).unwrap();
        assert_eq!(queue.pull().unwrap(), Some(11));
        // After both elements the queue reports completion.
        assert!(queue.pull().unwrap().is_none());
    }
1738
    /// Groups `[0, 1, 2, 3, 4]` by parity, runs the first substream into
    /// `Sink::ignore()`, and checks that the second substream still receives
    /// exactly its own items.
    // NOTE(review): the test name implies that items for a closed key are dropped;
    // the assertions here only pin the odd substream's content and the end of the
    // outer stream. Confirm the drop semantics against `group_by`.
    #[test]
    fn group_by_routes_keys_and_drops_closed_keys() {
        let outer = Source::from_iter([0, 1, 2, 3, 4])
            .group_by(4, |item| item % 2, false)
            .run_with(Sink::queue())
            .unwrap();

        // First substream pulled from the queue; its items are discarded by `Sink::ignore()`.
        let even = outer.pull().unwrap().unwrap();
        let even_completion = even.run_with(Sink::ignore()).unwrap();
        // Second substream pulled from the queue.
        let odd = outer.pull().unwrap().unwrap();
        drop(even_completion);

        assert_eq!(odd.run_collect().unwrap(), vec![1, 3]);
        // No further substreams: the outer queue reports completion.
        assert!(outer.pull().unwrap().is_none());
    }
1754
1755    #[test]
1756    fn group_by_fails_when_distinct_key_limit_is_exceeded() {
1757        let outer = Source::from_iter([0, 1, 2])
1758            .group_by(2, |item| *item, false)
1759            .run_with(Sink::queue())
1760            .unwrap();
1761
1762        let _ = outer.pull().unwrap().unwrap();
1763        let _ = outer.pull().unwrap().unwrap();
1764        assert!(matches!(
1765            outer.pull(),
1766            Err(StreamError::Failed(message)) if message == "group_by reached max_substreams (2)"
1767        ));
1768    }
1769
1770    #[test]
1771    fn group_by_can_recreate_closed_substreams_when_enabled() {
1772        let (release_tx, release_rx) = mpsc::channel();
1773        let release_rx = StdArc::new(std::sync::Mutex::new(Some(release_rx)));
1774        let outer = Source::from_factory(move || {
1775            let release_rx = StdArc::clone(&release_rx);
1776            let mut step = 0_u8;
1777            Box::new(std::iter::from_fn(move || {
1778                let item = match step {
1779                    0 => Some(Ok(0)),
1780                    1 => Some(Ok(1)),
1781                    2 => {
1782                        release_rx
1783                            .lock()
1784                            .unwrap()
1785                            .as_ref()
1786                            .expect("release receiver available")
1787                            .recv_timeout(StdDuration::from_secs(1))
1788                            .expect("timed out waiting to release recreated key");
1789                        Some(Ok(0))
1790                    }
1791                    _ => None,
1792                };
1793                step += 1;
1794                item
1795            }))
1796        })
1797        .group_by(4, |item| item % 2, true)
1798        .run_with(Sink::queue())
1799        .unwrap();
1800
1801        let even = outer.pull().unwrap().unwrap();
1802        assert_eq!(wait(even.run_with(Sink::head()).unwrap()), 0);
1803        release_tx.send(()).unwrap();
1804
1805        let odd = outer.pull().unwrap().unwrap();
1806        assert_eq!(odd.run_collect().unwrap(), vec![1]);
1807
1808        let recreated_even = outer.pull().unwrap().unwrap();
1809        assert_eq!(recreated_even.run_collect().unwrap(), vec![0]);
1810        assert!(outer.pull().unwrap().is_none());
1811    }
1812
1813    #[test]
1814    fn group_by_panicking_key_fn_abruptly_terminates_live_substreams() {
1815        let outer = Source::from_iter([0, 1])
1816            .group_by(
1817                4,
1818                |item| {
1819                    assert_ne!(*item, 1, "boom");
1820                    item % 2
1821                },
1822                false,
1823            )
1824            .run_with(Sink::queue())
1825            .unwrap();
1826
1827        let substream = outer.pull().unwrap().unwrap();
1828        let (result_tx, result_rx) = mpsc::channel();
1829        thread::spawn(move || {
1830            let _ = result_tx.send(substream.run_collect());
1831        });
1832
1833        assert_eq!(
1834            result_rx.recv_timeout(StdDuration::from_secs(1)).unwrap(),
1835            Err(StreamError::AbruptTermination)
1836        );
1837        assert!(matches!(outer.pull(), Err(StreamError::AbruptTermination)));
1838    }
1839
1840    #[test]
1841    fn split_when_starts_new_substream_on_boundary_element() {
1842        let outer = Source::from_iter([1, 2, 0, 3, 0, 4, 5])
1843            .split_when(|item| *item == 0)
1844            .run_with(Sink::queue())
1845            .unwrap();
1846
1847        let first = outer.pull().unwrap().unwrap();
1848        assert_eq!(first.run_collect().unwrap(), vec![1, 2]);
1849        let second = outer.pull().unwrap().unwrap();
1850        assert_eq!(second.run_collect().unwrap(), vec![0, 3]);
1851        let third = outer.pull().unwrap().unwrap();
1852        assert_eq!(third.run_collect().unwrap(), vec![0, 4, 5]);
1853        assert!(outer.pull().unwrap().is_none());
1854    }
1855
1856    #[test]
1857    fn split_after_ends_current_substream_on_boundary_element() {
1858        let outer = Source::from_iter([1, 2, 0, 3, 0, 4, 5])
1859            .split_after(|item| *item == 0)
1860            .run_with(Sink::queue())
1861            .unwrap();
1862
1863        let first = outer.pull().unwrap().unwrap();
1864        assert_eq!(first.run_collect().unwrap(), vec![1, 2, 0]);
1865        let second = outer.pull().unwrap().unwrap();
1866        assert_eq!(second.run_collect().unwrap(), vec![3, 0]);
1867        let third = outer.pull().unwrap().unwrap();
1868        assert_eq!(third.run_collect().unwrap(), vec![4, 5]);
1869        assert!(outer.pull().unwrap().is_none());
1870    }
1871
1872    #[test]
1873    fn split_when_panicking_predicate_abruptly_terminates_live_substreams() {
1874        let outer = Source::from_iter([1, 2])
1875            .split_when(|item| {
1876                assert_ne!(*item, 2, "boom");
1877                false
1878            })
1879            .run_with(Sink::queue())
1880            .unwrap();
1881
1882        let substream = outer.pull().unwrap().unwrap();
1883        let (result_tx, result_rx) = mpsc::channel();
1884        thread::spawn(move || {
1885            let _ = result_tx.send(substream.run_collect());
1886        });
1887
1888        assert_eq!(
1889            result_rx.recv_timeout(StdDuration::from_secs(1)).unwrap(),
1890            Err(StreamError::AbruptTermination)
1891        );
1892        assert!(matches!(outer.pull(), Err(StreamError::AbruptTermination)));
1893    }
1894
1895    #[test]
1896    fn split_when_pre_buffer_segments_match_expected_count() {
1897        let outer = Source::from_iter(0..100)
1898            .split_when(|item| *item != 0 && *item % 10 == 0)
1899            .run_with(Sink::queue())
1900            .unwrap();
1901        let mut segment_count = 0;
1902        while let Some(substream) = outer.pull().unwrap() {
1903            let items: Vec<i32> = substream.run_collect().unwrap();
1904            assert!(!items.is_empty(), "segment should not be empty");
1905            segment_count += 1;
1906        }
1907        assert_eq!(segment_count, 10, "100 elements in segments of 10");
1908    }
1909
1910    #[test]
1911    fn split_after_pre_buffer_segments_match_expected_count() {
1912        let outer = Source::from_iter(0..100)
1913            .split_after(|item| (*item + 1) % 10 == 0)
1914            .run_with(Sink::queue())
1915            .unwrap();
1916        let mut segment_count = 0;
1917        let mut total = 0_i32;
1918        while let Some(substream) = outer.pull().unwrap() {
1919            let items: Vec<i32> = substream.run_collect().unwrap();
1920            assert!(!items.is_empty(), "segment should not be empty");
1921            total += items.len() as i32;
1922            segment_count += 1;
1923        }
1924        assert_eq!(segment_count, 10);
1925        assert_eq!(total, 100);
1926    }
1927
1928    #[test]
1929    fn group_by_single_key_fused_matches_general_path() {
1930        let outer = Source::from_iter(0..1000i64)
1931            .group_by(1, |_| 0u8, false)
1932            .run_with(Sink::queue())
1933            .unwrap();
1934        let substream = outer.pull().unwrap().unwrap();
1935        let items: Vec<i64> = substream.run_collect().unwrap();
1936        assert_eq!(items.len(), 1000);
1937        assert_eq!(items[0], 0);
1938        assert_eq!(items[999], 999);
1939        assert!(outer.pull().unwrap().is_none());
1940    }
1941
1942    #[test]
1943    fn group_by_single_key_fused_handles_key_change_with_substream_limit() {
1944        let outer = Source::from_iter([0, 1, 0])
1945            .group_by(2, |item| *item, false)
1946            .run_with(Sink::queue())
1947            .unwrap();
1948        let mut sources = vec![];
1949        while let Some(source) = outer.pull().unwrap() {
1950            sources.push(source);
1951        }
1952        assert_eq!(sources.len(), 2);
1953        assert_eq!(sources[0].clone().run_collect().unwrap(), vec![0, 0]);
1954        assert_eq!(sources[1].clone().run_collect().unwrap(), vec![1]);
1955    }
1956
1957    #[test]
1958    fn flat_map_merge_lock_lighter_matches_expected_count() {
1959        let items = Source::from_iter(0..20)
1960            .flat_map_merge(2, |item| Source::single(item + 100))
1961            .run_with(Sink::queue())
1962            .unwrap();
1963        let mut count = 0;
1964        while items.pull().unwrap().is_some() {
1965            count += 1;
1966        }
1967        assert_eq!(count, 20);
1968    }
1969
1970    // Verify that group_by emits the substream BEFORE upstream completes —
1971    // i.e., the worker does not buffer all elements before delivery.
1972    //
1973    // The test uses a rendezvous channel to synchronize: the group_by worker
1974    // will block waiting for more items (the channel is empty after item 0),
1975    // so if the outer substream is NOT emitted before item 1 arrives, the
1976    // `outer.pull()` call in a separate thread cannot return during that window.
1977    // With the old "single_key_buf" batching, pull() would block until a
1978    // key-change or EOF — which is a liveness bug for infinite or slow sources.
1979    #[test]
1980    fn group_by_single_key_emits_substream_before_upstream_completes() {
1981        // sync_channel(0): rendezvous — sender blocks until receiver is ready,
1982        // guaranteeing the worker processes exactly one item then waits.
1983        let (tx, rx) = mpsc::sync_channel::<i32>(0);
1984        let rx = StdArc::new(std::sync::Mutex::new(rx));
1985
1986        let outer = Source::from_factory({
1987            let rx = StdArc::clone(&rx);
1988            move || {
1989                let rx = StdArc::clone(&rx);
1990                Box::new(std::iter::from_fn(move || {
1991                    rx.lock().unwrap().recv().ok().map(Ok)
1992                })) as BoxStream<i32>
1993            }
1994        })
1995        .group_by(1, |_| 0u8, false)
1996        .run_with(Sink::queue())
1997        .unwrap();
1998
1999        // Kick off the producer: sends item 0, then waits for the outer
2000        // substream to be pulled (via a second channel) before sending more.
2001        let (sub_tx, sub_rx) = mpsc::channel::<Source<i32>>();
2002        let outer_thread = thread::spawn(move || {
2003            let substream = outer.pull().unwrap().expect("expected a substream");
2004            sub_tx.send(substream).unwrap();
2005        });
2006
2007        // Send the first item — the worker processes it and (with the fix)
2008        // immediately emits the substream to outer.
2009        tx.send(0).unwrap();
2010
2011        // The outer pull must complete within the timeout.
2012        let substream = sub_rx
2013            .recv_timeout(StdDuration::from_secs(5))
2014            .expect("timed out — group_by buffered first element before emitting substream");
2015
2016        // Now deliver the remaining items and close.
2017        for i in 1..100_i32 {
2018            tx.send(i).unwrap();
2019        }
2020        drop(tx);
2021
2022        let items: Vec<i32> = substream.run_collect().unwrap();
2023        assert_eq!(items.len(), 100);
2024        outer_thread.join().unwrap();
2025    }
2026
2027    #[test]
2028    fn group_by_concurrent_live_substreams_do_not_hold_ready_item_stress() {
2029        const STREAMS: usize = 32;
2030        const ROUNDS: usize = 8;
2031        const ITEMS: i64 = 8;
2032
2033        for _ in 0..ROUNDS {
2034            let barrier = StdArc::new(std::sync::Barrier::new(STREAMS));
2035            let mut handles = Vec::with_capacity(STREAMS);
2036
2037            for _ in 0..STREAMS {
2038                let barrier = StdArc::clone(&barrier);
2039                handles.push(thread::spawn(move || {
2040                    let (tx, rx) = mpsc::sync_channel::<i64>(0);
2041                    let rx = StdArc::new(std::sync::Mutex::new(rx));
2042
2043                    let outer = Source::from_factory({
2044                        let rx = StdArc::clone(&rx);
2045                        move || {
2046                            let rx = StdArc::clone(&rx);
2047                            Box::new(std::iter::from_fn(move || {
2048                                rx.lock().unwrap().recv().ok().map(Ok)
2049                            })) as BoxStream<i64>
2050                        }
2051                    })
2052                    .group_by(1, |_| 0_u8, false)
2053                    .run_with(Sink::queue())
2054                    .unwrap();
2055
2056                    barrier.wait();
2057
2058                    tx.send(0).unwrap();
2059                    let substream = outer.pull().unwrap().expect("expected group_by substream");
2060                    let subqueue = substream.run_with(Sink::queue()).unwrap();
2061                    assert_eq!(subqueue.pull().unwrap(), Some(0));
2062
2063                    for item in 1..ITEMS {
2064                        tx.send(item).unwrap();
2065                        assert_eq!(subqueue.pull().unwrap(), Some(item));
2066                    }
2067                    drop(tx);
2068
2069                    assert!(subqueue.pull().unwrap().is_none());
2070                    assert!(outer.pull().unwrap().is_none());
2071                }));
2072            }
2073
2074            for handle in handles {
2075                handle.join().expect("group_by stress worker panicked");
2076            }
2077        }
2078    }
2079
2080    // Verify that split_when emits each substream as a live stream, NOT after
2081    // accumulating all segment elements into memory.  The channel is sized to
2082    // be larger than LIVE_SUBSTREAM_CAPACITY (256) so that, if the old Vec
2083    // pre-buffering path were active, the first substream would never be emitted
2084    // before the boundary.  With live substreams the substream is visible
2085    // immediately upon the first item.
2086    #[test]
2087    fn split_when_emits_substream_before_segment_ends() {
2088        // 512 > LIVE_SUBSTREAM_CAPACITY (256): the live substream will block
2089        // on backpressure mid-segment, but that's fine — the outer substream
2090        // IS emitted immediately and the consumer can drain it concurrently.
2091        const SEGMENT_LEN: usize = 300;
2092
2093        let (tx, rx) = mpsc::sync_channel::<i32>(SEGMENT_LEN * 2 + 4);
2094        for i in 0..SEGMENT_LEN as i32 {
2095            tx.send(i).unwrap();
2096        }
2097        tx.send(-1).unwrap(); // boundary
2098        tx.send(99).unwrap(); // second segment
2099        drop(tx);
2100
2101        let outer = Source::from_iter(rx)
2102            .split_when(|item| *item == -1)
2103            .run_with(Sink::queue())
2104            .unwrap();
2105
2106        let (result_tx, result_rx) = mpsc::channel();
2107        thread::spawn(move || {
2108            let first = outer.pull().unwrap().expect("expected first substream");
2109            let items: Vec<i32> = first.run_collect().unwrap();
2110            let second = outer.pull().unwrap().expect("expected second substream");
2111            let items2: Vec<i32> = second.run_collect().unwrap();
2112            let done = outer.pull().unwrap().is_none();
2113            let _ = result_tx.send((items, items2, done));
2114        });
2115
2116        let (items, items2, done) = result_rx
2117            .recv_timeout(StdDuration::from_secs(5))
2118            .expect("timed out — split_when is buffering the whole segment");
2119        assert_eq!(items.len(), SEGMENT_LEN);
2120        assert_eq!(items2, vec![-1, 99]);
2121        assert!(done);
2122    }
2123
2124    #[test]
2125    fn split_after_emits_substream_before_segment_ends() {
2126        const SEGMENT_LEN: usize = 300;
2127
2128        let (tx, rx) = mpsc::sync_channel::<i32>(SEGMENT_LEN * 2 + 4);
2129        for i in 0..SEGMENT_LEN as i32 {
2130            tx.send(i).unwrap();
2131        }
2132        tx.send(-1).unwrap(); // boundary (included in first segment for split_after)
2133        tx.send(99).unwrap();
2134        drop(tx);
2135
2136        let outer = Source::from_iter(rx)
2137            .split_after(|item| *item == -1)
2138            .run_with(Sink::queue())
2139            .unwrap();
2140
2141        let (result_tx, result_rx) = mpsc::channel();
2142        thread::spawn(move || {
2143            let first = outer.pull().unwrap().expect("expected first substream");
2144            let items: Vec<i32> = first.run_collect().unwrap();
2145            let second = outer.pull().unwrap().expect("expected second substream");
2146            let items2: Vec<i32> = second.run_collect().unwrap();
2147            let done = outer.pull().unwrap().is_none();
2148            let _ = result_tx.send((items, items2, done));
2149        });
2150
2151        let (items, items2, done) = result_rx
2152            .recv_timeout(StdDuration::from_secs(5))
2153            .expect("timed out — split_after is buffering the whole segment");
2154        // split_after includes the boundary element in the first segment.
2155        assert_eq!(items.len(), SEGMENT_LEN + 1);
2156        assert_eq!(items2, vec![99]);
2157        assert!(done);
2158    }
2159
2160    // Stress test: run flat_map_merge 20 times with many concurrent inner
2161    // streams to confirm the fixed coordinator tail loop never hangs.
2162    // Intended to be run with --test-threads=32 to expose the race.
2163    #[test]
2164    fn flat_map_merge_coordinator_no_lost_wakeup_stress() {
2165        for _ in 0..20 {
2166            let result = Source::from_iter(0..50_i32)
2167                .flat_map_merge(8, |item| Source::from_iter(item..item + 3))
2168                .run_with(Sink::fold(0i64, |acc, item| acc + item as i64))
2169                .unwrap()
2170                .wait();
2171            // Each i in 0..50 contributes i + (i+1) + (i+2) = 3i + 3.
2172            // Sum = 3 * (0+1+..+49) + 3*50 = 3*1225 + 150 = 3675 + 150 = 3825.
2173            assert_eq!(result, Ok(3825), "flat_map_merge produced wrong sum");
2174        }
2175    }
2176
2177    // Stress test for the single-mutex flat_map_merge refactor.
2178    // 20 runs with high concurrency to verify no deadlock, lost wakeup,
2179    // or window-accounting corruption under the merged-lock design.
2180    // Intended to be run with --test-threads=32.
2181    #[test]
2182    fn flat_map_merge_single_mutex_race_stress() {
2183        for _ in 0..20 {
2184            let result = Source::from_iter(0..100_i64)
2185                .flat_map_merge(16, |item| Source::from_iter([item, item + 1000]))
2186                .run_with(Sink::fold(0i64, |acc, v| acc + v))
2187                .unwrap()
2188                .wait();
2189            // sum of item + (item+1000) for item in 0..100
2190            // = sum(0..100) + sum(0..100) + 100*1000
2191            // = 4950 + 4950 + 100000 = 109900
2192            assert_eq!(result, Ok(109_900), "flat_map_merge single-mutex stress");
2193        }
2194    }
2195
2196    // Bounded-memory test for split_when batching: verify that items are
2197    // delivered to a slow consumer and not held indefinitely in the writer
2198    // buffer.  Uses a rendezvous-style channel where the producer sends MANY
2199    // items into the outer queue (larger than LIVE_SUBSTREAM_BATCH = 64), then
2200    // blocks.  The consumer must be able to drain the first segment fully — i.e.
2201    // the writer must have flushed even when the batch was smaller than 64.
2202    #[test]
2203    fn split_when_bounded_memory_rendezvous() {
2204        // Segment of 100 items (> LIVE_SUBSTREAM_BATCH=64): the writer will
2205        // flush at 64, then again at the boundary (remaining 36).
2206        const SEGMENT: usize = 100;
2207        let (tx, rx) = mpsc::sync_channel::<i32>(SEGMENT * 4);
2208        for i in 0..SEGMENT as i32 {
2209            tx.send(i).unwrap();
2210        }
2211        tx.send(-1).unwrap(); // boundary (split_when: starts new segment)
2212        // Second segment
2213        for i in 0..10_i32 {
2214            tx.send(i).unwrap();
2215        }
2216        drop(tx);
2217
2218        let outer = Source::from_iter(rx)
2219            .split_when(|item| *item == -1)
2220            .run_with(Sink::queue())
2221            .unwrap();
2222
2223        let (result_tx, result_rx) = mpsc::channel();
2224        thread::spawn(move || {
2225            let first = outer.pull().unwrap().expect("first segment");
2226            let seg1: Vec<i32> = first.run_collect().unwrap();
2227            let second = outer.pull().unwrap().expect("second segment");
2228            let seg2: Vec<i32> = second.run_collect().unwrap();
2229            let done = outer.pull().unwrap().is_none();
2230            result_tx.send((seg1, seg2, done)).unwrap();
2231        });
2232
2233        let (seg1, seg2, done) = result_rx
2234            .recv_timeout(StdDuration::from_secs(5))
2235            .expect("timed out — split_when writer held items past LIVE_SUBSTREAM_BATCH");
2236        assert_eq!(seg1.len(), SEGMENT, "first segment length");
2237        assert_eq!(seg2[0], -1, "boundary element starts second segment");
2238        assert_eq!(seg2.len(), 11, "second segment: boundary + 10 items");
2239        assert!(done);
2240    }
2241
2242    // Bounded-memory test for group_by batching: verifies that a slow consumer
2243    // waiting on a single-key substream eventually receives all items (i.e. the
2244    // write batch is flushed, not held indefinitely).
2245    #[test]
2246    fn group_by_single_key_bounded_memory_rendezvous() {
2247        // 200 items, all same key, batched in groups of 64 by the writer.
2248        // The consumer blocks on run_collect(); all items must arrive.
2249        const N: usize = 200;
2250        let outer = Source::from_iter(0..N as i64)
2251            .group_by(1, |_| 0u8, false)
2252            .run_with(Sink::queue())
2253            .unwrap();
2254
2255        let (result_tx, result_rx) = mpsc::channel();
2256        thread::spawn(move || {
2257            let substream = outer.pull().unwrap().expect("substream");
2258            let items: Vec<i64> = substream.run_collect().unwrap();
2259            let done = outer.pull().unwrap().is_none();
2260            result_tx.send((items, done)).unwrap();
2261        });
2262
2263        let (items, done) = result_rx
2264            .recv_timeout(StdDuration::from_secs(5))
2265            .expect("timed out — group_by write batch held items beyond LIVE_SUBSTREAM_BATCH");
2266        assert_eq!(items.len(), N, "all items delivered");
2267        assert_eq!(items[0], 0);
2268        assert_eq!(items[N - 1], (N - 1) as i64);
2269        assert!(done);
2270    }
2271
2272    #[test]
2273    fn scan_emits_seed_and_accumulated_values() {
2274        let result = Source::from_iter(1..=3)
2275            .scan(0, |acc, item| acc + item)
2276            .run_collect()
2277            .unwrap();
2278
2279        assert_eq!(result, vec![0, 1, 3, 6]);
2280    }
2281
2282    #[test]
2283    fn limit_fails_after_max_elements() {
2284        let result = Source::from_iter(0..3).limit(2).run_collect();
2285
2286        assert_eq!(result, Err(StreamError::LimitExceeded { max: 2 }));
2287    }
2288
2289    #[test]
2290    fn limit_weighted_fails_with_limit_error_like_akka() {
2291        let result = Source::from_iter(["this", "is", "some", "string"])
2292            .via(Flow::identity().limit_weighted(15, |item: &&str| item.len()))
2293            .run_collect();
2294
2295        assert_eq!(result, Err(StreamError::LimitExceeded { max: 15 }));
2296    }
2297
2298    #[test]
2299    fn grouped_weighted_allows_oversized_first_element_like_akka() {
2300        let result = Source::from_iter([10_usize, 1, 2])
2301            .via(Flow::identity().grouped_weighted(5, |item: &usize| *item))
2302            .run_collect()
2303            .unwrap();
2304
2305        assert_eq!(result, vec![vec![10], vec![1, 2]]);
2306    }
2307
2308    #[test]
2309    fn grouped_weighted_keeps_oversized_later_element_in_current_group_like_akka() {
2310        let result = Source::from_iter([1_usize, 10, 2])
2311            .via(Flow::identity().grouped_weighted(5, |item: &usize| *item))
2312            .run_collect()
2313            .unwrap();
2314
2315        assert_eq!(result, vec![vec![1, 10], vec![2]]);
2316    }
2317
2318    #[test]
2319    fn sink_terminals_materialize_results() {
2320        let sum = Source::from_iter(1..=4)
2321            .run_with(Sink::fold(0, |acc, item| acc + item))
2322            .unwrap();
2323
2324        assert_eq!(wait(sum), 10);
2325        assert_eq!(
2326            wait(Source::from_iter(1..=4).run_with(Sink::head()).unwrap()),
2327            1
2328        );
2329        assert_eq!(
2330            wait(Source::from_iter(1..=4).run_with(Sink::last()).unwrap()),
2331            4
2332        );
2333    }
2334
2335    #[test]
2336    fn all_terminal_sink_variants_complete() {
2337        assert_eq!(
2338            wait(
2339                Source::from_iter([1, 2, 3])
2340                    .run_with(Sink::collect())
2341                    .unwrap()
2342            ),
2343            vec![1, 2, 3]
2344        );
2345        assert_eq!(
2346            wait(
2347                Source::<i32>::empty()
2348                    .run_with(Sink::head_option())
2349                    .unwrap()
2350            ),
2351            None
2352        );
2353        assert_eq!(
2354            wait(
2355                Source::from_iter([1, 2, 3])
2356                    .run_with(Sink::last_option())
2357                    .unwrap()
2358            ),
2359            Some(3)
2360        );
2361        assert_eq!(
2362            wait(
2363                Source::from_iter([1, 2, 3])
2364                    .run_with(Sink::reduce(|acc, item| acc + item))
2365                    .unwrap()
2366            ),
2367            6
2368        );
2369
2370        let seen = StdArc::new(StdAtomicUsize::new(0));
2371        let seen_by_sink = StdArc::clone(&seen);
2372        assert_eq!(
2373            wait(
2374                Source::from_iter([1_usize, 2, 3])
2375                    .run_with(Sink::foreach(move |item| {
2376                        seen_by_sink.fetch_add(item, StdOrdering::SeqCst);
2377                    }))
2378                    .unwrap()
2379            ),
2380            NotUsed
2381        );
2382        assert_eq!(seen.load(StdOrdering::SeqCst), 6);
2383    }
2384
2385    #[test]
2386    fn take_last_zero_returns_empty_vector() {
2387        let result = Source::from_iter([1, 2, 3])
2388            .run_with(Sink::take_last(0))
2389            .unwrap();
2390
2391        assert_eq!(wait(result), Vec::<i32>::new());
2392    }
2393
2394    #[test]
2395    fn bounded_head_terminals_complete_inline() {
2396        let materializer = Materializer::new();
2397
2398        let mut head = Source::from_iter(0_u64..1_000)
2399            .run_with_materializer(Sink::head(), &materializer)
2400            .unwrap();
2401        assert_eq!(materializer.active_streams(), 0);
2402        assert_eq!(head.try_wait(), Some(Ok(0)));
2403
2404        let mut filtered_head = Source::from_iter(0_u64..1_000)
2405            .filter(|item| *item >= 10)
2406            .run_with_materializer(Sink::head(), &materializer)
2407            .unwrap();
2408        assert_eq!(materializer.active_streams(), 0);
2409        assert_eq!(filtered_head.try_wait(), Some(Ok(10)));
2410
2411        let mut head_option = Source::<u64>::empty()
2412            .run_with_materializer(Sink::head_option(), &materializer)
2413            .unwrap();
2414        assert_eq!(materializer.active_streams(), 0);
2415        assert_eq!(head_option.try_wait(), Some(Ok(None)));
2416    }
2417
2418    #[test]
2419    fn bounded_head_fast_path_preserves_terminal_errors() {
2420        let materializer = Materializer::new();
2421
2422        let mut empty = Source::<u64>::empty()
2423            .run_with_materializer(Sink::head(), &materializer)
2424            .unwrap();
2425        assert_eq!(empty.try_wait(), Some(Err(StreamError::EmptyStream)));
2426
2427        let mut failed = Source::<u64>::failed(StreamError::Failed("boom".into()))
2428            .run_with_materializer(Sink::head(), &materializer)
2429            .unwrap();
2430        assert_eq!(
2431            failed.try_wait(),
2432            Some(Err(StreamError::Failed("boom".into())))
2433        );
2434        assert_eq!(materializer.active_streams(), 0);
2435    }
2436
2437    #[test]
2438    fn runnable_graph_composes_source_and_sink() {
2439        let graph = Source::from_iter(1..=4)
2440            .map(|item| item * 2)
2441            .to_mat(Sink::fold(0, |acc, item| acc + item), Keep::right);
2442
2443        assert_eq!(wait(graph.run().unwrap()), 20);
2444
2445        let graph = Source::single(1)
2446            .map_materialized_value(|_| 20)
2447            .to(Sink::ignore())
2448            .map_materialized_value(|value| value + 1);
2449        assert_eq!(graph.run().unwrap(), 21);
2450
2451        let ignored = Source::single(1).to(Sink::ignore()).run().unwrap();
2452        assert_eq!(ignored, NotUsed);
2453    }
2454
2455    #[test]
2456    fn materialized_values_follow_keep_defaults() {
2457        let source = Source::single(1).map_materialized_value(|_| "source");
2458        let flow = Flow::identity().map_materialized_value(|_| "flow");
2459
2460        let source_mat = source.clone().via(flow.clone()).to(Sink::ignore()).run();
2461        assert_eq!(source_mat.unwrap(), "source");
2462
2463        let combined = source
2464            .via_mat(flow, Keep::both)
2465            .to_mat(Sink::ignore(), Keep::both)
2466            .run()
2467            .unwrap();
2468        assert_eq!(combined.0, ("source", "flow"));
2469        assert_eq!(wait(combined.1), NotUsed);
2470
2471        let sink_mat = Source::single(41)
2472            .map_materialized_value(|_| "ignored source")
2473            .run_with(Sink::fold(1, |acc, item| acc + item))
2474            .unwrap();
2475        assert_eq!(wait(sink_mat), 42);
2476    }
2477
2478    #[test]
2479    fn flow_to_sink_preserves_flow_materialized_value_by_default() {
2480        let sink = Flow::identity()
2481            .map(|item: i32| item + 1)
2482            .map_materialized_value(|_| "flow")
2483            .to(Sink::fold(0, |acc, item| acc + item));
2484
2485        let materialized = Source::from_iter([1, 2, 3]).run_with(sink).unwrap();
2486
2487        assert_eq!(materialized, "flow");
2488        let explicit = Flow::identity()
2489            .map(|item: i32| item + 1)
2490            .map_materialized_value(|_| "flow")
2491            .to_mat(Sink::fold(0, |acc, item| acc + item), Keep::both)
2492            .run_with(Source::from_iter([1, 2, 3]))
2493            .unwrap();
2494        assert_eq!(explicit, NotUsed);
2495
2496        let explicit = Source::from_iter([1, 2, 3])
2497            .run_with(
2498                Flow::identity()
2499                    .map(|item: i32| item + 1)
2500                    .map_materialized_value(|_| "flow")
2501                    .to_mat(Sink::fold(0, |acc, item| acc + item), Keep::both),
2502            )
2503            .unwrap();
2504        assert_eq!(explicit.0, "flow");
2505        assert_eq!(wait(explicit.1), 9);
2506    }
2507
2508    #[test]
2509    fn materializer_shutdown_fails_materialization() {
2510        let materializer = Materializer::new();
2511        let named = materializer.with_name_prefix("test-stream");
2512        materializer.shutdown();
2513
2514        let graph = Source::single(1).to(Sink::ignore());
2515
2516        assert_eq!(named.name_prefix(), "test-stream");
2517        assert_eq!(
2518            graph.run_with_materializer(&named),
2519            Err(StreamError::AbruptTermination)
2520        );
2521    }
2522
2523    #[test]
2524    fn materializer_shutdown_fails_running_stream_completion() {
2525        let materializer = Materializer::new();
2526        let completion = Source::repeat(1)
2527            .run_with_materializer(Sink::ignore(), &materializer)
2528            .unwrap();
2529
2530        assert_eq!(materializer.active_streams(), 1);
2531        materializer.shutdown();
2532        assert_eq!(completion.wait(), Err(StreamError::AbruptTermination));
2533        assert_eq!(materializer.active_streams(), 0);
2534    }
2535
2536    #[test]
2537    fn dropped_stream_completion_cancels_running_stream() {
2538        let materializer = Materializer::new();
2539        let completion = Source::repeat(1)
2540            .run_with_materializer(Sink::ignore(), &materializer)
2541            .unwrap();
2542
2543        assert_eq!(materializer.active_streams(), 1);
2544        drop(completion);
2545        for _ in 0..50 {
2546            if materializer.active_streams() == 0 {
2547                break;
2548            }
2549            thread::sleep(Duration::from_millis(5));
2550        }
2551        assert_eq!(materializer.active_streams(), 0);
2552    }
2553
2554    #[test]
2555    fn runtime_timers_fire_cancel_and_stop_on_shutdown() {
2556        let materializer = Materializer::new();
2557        let (once_tx, once_rx) = mpsc::channel();
2558        let once = materializer.schedule_once(Duration::from_millis(5), move || {
2559            once_tx.send(()).unwrap();
2560        });
2561        once_rx.recv_timeout(Duration::from_millis(250)).unwrap();
2562        assert!(!once.is_cancelled());
2563
2564        let (cancelled_tx, cancelled_rx) = mpsc::channel();
2565        let cancelled = materializer.schedule_once(Duration::from_millis(25), move || {
2566            cancelled_tx.send(()).unwrap();
2567        });
2568        assert!(cancelled.cancel());
2569        assert!(!cancelled.cancel());
2570        assert!(cancelled.is_cancelled());
2571        assert!(
2572            cancelled_rx
2573                .recv_timeout(Duration::from_millis(75))
2574                .is_err()
2575        );
2576
2577        let fixed_delay_count = StdArc::new(StdAtomicUsize::new(0));
2578        let fixed_delay_task_count = StdArc::clone(&fixed_delay_count);
2579        let fixed_delay = materializer.schedule_with_fixed_delay(
2580            Duration::from_millis(1),
2581            Duration::from_millis(5),
2582            move || {
2583                fixed_delay_task_count.fetch_add(1, StdOrdering::SeqCst);
2584            },
2585        );
2586        thread::sleep(Duration::from_millis(25));
2587        assert!(fixed_delay_count.load(StdOrdering::SeqCst) > 0);
2588        fixed_delay.cancel();
2589
2590        let fixed_rate_count = StdArc::new(StdAtomicUsize::new(0));
2591        let fixed_rate_task_count = StdArc::clone(&fixed_rate_count);
2592        let fixed_rate = materializer.schedule_at_fixed_rate(
2593            Duration::from_millis(1),
2594            Duration::from_millis(5),
2595            move || {
2596                fixed_rate_task_count.fetch_add(1, StdOrdering::SeqCst);
2597            },
2598        );
2599        thread::sleep(Duration::from_millis(25));
2600        assert!(fixed_rate_count.load(StdOrdering::SeqCst) > 0);
2601        fixed_rate.cancel();
2602
2603        let shutdown_materializer = Materializer::new();
2604        let (shutdown_tx, shutdown_rx) = mpsc::channel();
2605        shutdown_materializer.schedule_once(Duration::from_millis(25), move || {
2606            shutdown_tx.send(()).unwrap();
2607        });
2608        shutdown_materializer.shutdown();
2609        assert!(shutdown_rx.recv_timeout(Duration::from_millis(75)).is_err());
2610    }
2611
2612    #[test]
2613    fn runtime_timer_driver_preserves_fixed_rate_cadence_under_slow_tasks() {
2614        use std::sync::{Condvar, Mutex};
2615
2616        #[derive(Debug)]
2617        enum TimerEvent {
2618            Started(usize, Instant),
2619            Completed(usize, Instant),
2620        }
2621
2622        let recv_event = |rx: &mpsc::Receiver<TimerEvent>, label: &str| {
2623            rx.recv_timeout(Duration::from_secs(20))
2624                .unwrap_or_else(|err| panic!("{label}: expected timer event within 20 s: {err}"))
2625        };
2626        let release = |gate: &StdArc<(Mutex<bool>, Condvar)>| {
2627            let (released, condvar) = &**gate;
2628            let mut released = released.lock().unwrap_or_else(|poison| poison.into_inner());
2629            *released = true;
2630            condvar.notify_all();
2631        };
2632
2633        let interval = Duration::from_secs(2);
2634        let overrun = interval + Duration::from_millis(250);
2635
2636        let rate_materializer = Materializer::new();
2637        let (rate_tx, rate_rx) = mpsc::channel();
2638        let rate_runs = StdArc::new(StdAtomicUsize::new(0));
2639        let rate_task_runs = StdArc::clone(&rate_runs);
2640        let rate_gate = StdArc::new((Mutex::new(false), Condvar::new()));
2641        let rate_task_gate = StdArc::clone(&rate_gate);
2642        let fixed_rate =
2643            rate_materializer.schedule_at_fixed_rate(Duration::ZERO, interval, move || {
2644                let run = rate_task_runs.fetch_add(1, StdOrdering::SeqCst) + 1;
2645                rate_tx
2646                    .send(TimerEvent::Started(run, Instant::now()))
2647                    .unwrap();
2648                if run == 1 {
2649                    let (released, condvar) = &*rate_task_gate;
2650                    let mut released = released.lock().unwrap_or_else(|poison| poison.into_inner());
2651                    while !*released {
2652                        released = condvar
2653                            .wait(released)
2654                            .unwrap_or_else(|poison| poison.into_inner());
2655                    }
2656                    rate_tx
2657                        .send(TimerEvent::Completed(run, Instant::now()))
2658                        .unwrap();
2659                }
2660            });
2661        let rate_first_started = match recv_event(&rate_rx, "fixed-rate first task") {
2662            TimerEvent::Started(1, at) => at,
2663            other => panic!("fixed-rate first task: unexpected event {other:?}"),
2664        };
2665        assert!(wait_until(Duration::from_secs(20), || {
2666            rate_first_started.elapsed() >= overrun
2667        }));
2668        release(&rate_gate);
2669        let rate_first_completed = match recv_event(&rate_rx, "fixed-rate first completion") {
2670            TimerEvent::Completed(1, at) => at,
2671            other => panic!("fixed-rate first completion: unexpected event {other:?}"),
2672        };
2673        let rate_second_started = match recv_event(&rate_rx, "fixed-rate second task") {
2674            TimerEvent::Started(2, at) => at,
2675            other => panic!("fixed-rate second task: unexpected event {other:?}"),
2676        };
2677        fixed_rate.cancel();
2678        rate_materializer.shutdown();
2679
2680        let delay_materializer = Materializer::new();
2681        let (delay_tx, delay_rx) = mpsc::channel();
2682        let delay_runs = StdArc::new(StdAtomicUsize::new(0));
2683        let delay_task_runs = StdArc::clone(&delay_runs);
2684        let delay_gate = StdArc::new((Mutex::new(false), Condvar::new()));
2685        let delay_task_gate = StdArc::clone(&delay_gate);
2686        let fixed_delay =
2687            delay_materializer.schedule_with_fixed_delay(Duration::ZERO, interval, move || {
2688                let run = delay_task_runs.fetch_add(1, StdOrdering::SeqCst) + 1;
2689                delay_tx
2690                    .send(TimerEvent::Started(run, Instant::now()))
2691                    .unwrap();
2692                if run == 1 {
2693                    let (released, condvar) = &*delay_task_gate;
2694                    let mut released = released.lock().unwrap_or_else(|poison| poison.into_inner());
2695                    while !*released {
2696                        released = condvar
2697                            .wait(released)
2698                            .unwrap_or_else(|poison| poison.into_inner());
2699                    }
2700                    delay_tx
2701                        .send(TimerEvent::Completed(run, Instant::now()))
2702                        .unwrap();
2703                }
2704            });
2705        let delay_first_started = match recv_event(&delay_rx, "fixed-delay first task") {
2706            TimerEvent::Started(1, at) => at,
2707            other => panic!("fixed-delay first task: unexpected event {other:?}"),
2708        };
2709        assert!(wait_until(Duration::from_secs(20), || {
2710            delay_first_started.elapsed() >= overrun
2711        }));
2712        release(&delay_gate);
2713        let delay_first_completed = match recv_event(&delay_rx, "fixed-delay first completion") {
2714            TimerEvent::Completed(1, at) => at,
2715            other => panic!("fixed-delay first completion: unexpected event {other:?}"),
2716        };
2717        let delay_second_started = match recv_event(&delay_rx, "fixed-delay second task") {
2718            TimerEvent::Started(2, at) => at,
2719            other => panic!("fixed-delay second task: unexpected event {other:?}"),
2720        };
2721        fixed_delay.cancel();
2722        delay_materializer.shutdown();
2723
2724        let rate_task_time = rate_first_completed.duration_since(rate_first_started);
2725        let rate_catch_up = rate_second_started.duration_since(rate_first_completed);
2726        let delay_task_time = delay_first_completed.duration_since(delay_first_started);
2727        let delay_gap = delay_second_started.duration_since(delay_first_completed);
2728        assert!(
2729            rate_task_time >= interval,
2730            "fixed-rate first task should overrun its interval; ran for {rate_task_time:?}"
2731        );
2732        assert!(
2733            rate_catch_up < interval,
2734            "fixed-rate second task should catch up after an overrun; waited {rate_catch_up:?}"
2735        );
2736        assert!(
2737            delay_task_time >= interval,
2738            "fixed-delay first task should overrun its interval; ran for {delay_task_time:?}"
2739        );
2740        assert!(
2741            delay_gap >= interval,
2742            "fixed-delay second task fired before one full delay elapsed after completion: {delay_gap:?}",
2743        );
2744    }
2745
2746    #[test]
2747    fn runtime_repeating_timer_cancellation_stops_future_fires() {
2748        let materializer = Materializer::new();
2749        let (tx, rx) = mpsc::channel();
2750        let timer = materializer.schedule_at_fixed_rate(
2751            Duration::from_millis(1),
2752            Duration::from_millis(30),
2753            move || {
2754                tx.send(()).unwrap();
2755            },
2756        );
2757
2758        rx.recv_timeout(Duration::from_millis(250)).unwrap();
2759        assert!(timer.cancel());
2760        assert!(rx.recv_timeout(Duration::from_millis(90)).is_err());
2761        materializer.shutdown();
2762    }
2763
2764    #[test]
2765    fn runtime_panicking_once_timer_does_not_kill_driver_or_later_timers() {
2766        let materializer = Materializer::new();
2767        materializer.schedule_once(Duration::from_millis(1), || {
2768            panic!("timer boom");
2769        });
2770
2771        let (tx, rx) = mpsc::channel();
2772        materializer.schedule_once(Duration::from_millis(20), move || {
2773            tx.send(()).unwrap();
2774        });
2775
2776        rx.recv_timeout(Duration::from_millis(250)).unwrap();
2777        materializer.shutdown();
2778    }
2779
2780    #[test]
2781    fn runtime_panicking_fixed_rate_timer_stops_itself_and_leaves_driver_alive() {
2782        let materializer = Materializer::new();
2783        let panic_count = StdArc::new(StdAtomicUsize::new(0));
2784        let panic_count_task = StdArc::clone(&panic_count);
2785        materializer.schedule_at_fixed_rate(Duration::ZERO, Duration::from_millis(20), move || {
2786            panic_count_task.fetch_add(1, StdOrdering::SeqCst);
2787            panic!("fixed-rate boom");
2788        });
2789
2790        assert!(wait_until(Duration::from_millis(150), || {
2791            panic_count.load(StdOrdering::SeqCst) == 1
2792        }));
2793
2794        let (tx, rx) = mpsc::channel();
2795        materializer.schedule_once(Duration::from_millis(30), move || {
2796            tx.send(()).unwrap();
2797        });
2798        rx.recv_timeout(Duration::from_millis(250)).unwrap();
2799
2800        thread::sleep(Duration::from_millis(90));
2801        assert_eq!(panic_count.load(StdOrdering::SeqCst), 1);
2802        materializer.shutdown();
2803    }
2804
2805    #[test]
2806    fn runtime_slow_timer_task_does_not_delay_unrelated_timers() {
2807        let materializer = Materializer::new();
2808        let started = StdArc::new(StdAtomicBool::new(false));
2809        let started_task = StdArc::clone(&started);
2810        let slow_timer = materializer.schedule_at_fixed_rate(
2811            Duration::ZERO,
2812            Duration::from_millis(250),
2813            move || {
2814                started_task.store(true, StdOrdering::SeqCst);
2815                thread::sleep(Duration::from_millis(200));
2816            },
2817        );
2818
2819        assert!(wait_until(Duration::from_millis(100), || {
2820            started.load(StdOrdering::SeqCst)
2821        }));
2822
2823        let start = Instant::now();
2824        let (tx, rx) = mpsc::channel();
2825        materializer.schedule_once(Duration::from_millis(10), move || {
2826            tx.send(Instant::now()).unwrap();
2827        });
2828        let fired_at = rx.recv_timeout(Duration::from_millis(350)).unwrap();
2829        let elapsed = fired_at.duration_since(start);
2830
2831        slow_timer.cancel();
2832        materializer.shutdown();
2833        assert!(
2834            elapsed < Duration::from_millis(150),
2835            "unrelated timer was delayed by a blocking timer task: {elapsed:?}",
2836        );
2837    }
2838
2839    #[test]
2840    fn runtime_shutdown_stops_timer_driver_thread() {
2841        let materializer = Materializer::new();
2842        assert!(wait_until(Duration::from_secs(1), || materializer
2843            .timer_driver_is_live()));
2844
2845        materializer.shutdown();
2846        assert!(wait_until(Duration::from_secs(2), || !materializer
2847            .timer_driver_is_live()));
2848    }
2849
2850    #[test]
2851    fn runtime_timer_driver_orders_many_timers_by_deadline() {
2852        let materializer = Materializer::new();
2853        let (tx, rx) = mpsc::channel();
2854        let schedule = [(450_u64, 4_u8), (50, 1), (350, 3), (150, 2), (550, 5)];
2855
2856        for (delay_ms, value) in schedule {
2857            let tx = tx.clone();
2858            materializer.schedule_once(Duration::from_millis(delay_ms), move || {
2859                tx.send(value).unwrap();
2860            });
2861        }
2862        drop(tx);
2863
2864        let mut received = Vec::new();
2865        for _ in 0..schedule.len() {
2866            received.push(rx.recv_timeout(Duration::from_secs(10)).unwrap());
2867        }
2868        materializer.shutdown();
2869
2870        assert_eq!(received, vec![1, 2, 3, 4, 5]);
2871    }
2872
2873    #[test]
2874    fn runtime_timer_driver_uses_one_thread_per_runtime_regardless_of_timer_count() {
2875        let materializer = Materializer::new();
2876        let thread_name = materializer.timer_thread_name().to_owned();
2877        // Linux exposes thread names through /proc/self/task/*/comm with the
2878        // kernel's 15-byte task-name limit. Once a full test process has
2879        // created enough runtimes, `datum-timer-1000` is reported as
2880        // `datum-timer-100`, so compare against the Linux-visible name and use
2881        // the observed baseline for collision-safe assertions.
2882        let linux_thread_name = thread_name.chars().take(15).collect::<String>();
2883        assert!(wait_until(Duration::from_secs(5), || {
2884            materializer.timer_driver_is_live() && linux_thread_count(&linux_thread_name) >= 1
2885        }));
2886        let live_timer_threads = linux_thread_count(&linux_thread_name);
2887
2888        for _ in 0..128 {
2889            materializer.schedule_once(Duration::from_secs(60), || {});
2890        }
2891
2892        assert!(
2893            wait_until(Duration::from_secs(5), || {
2894                materializer.timer_driver_is_live()
2895                    && linux_thread_count(&linux_thread_name) == live_timer_threads
2896            }),
2897            "scheduling timers should not create extra timer threads for a runtime",
2898        );
2899        materializer.shutdown();
2900        assert!(wait_until(Duration::from_secs(5), || {
2901            !materializer.timer_driver_is_live()
2902                && linux_thread_count(&linux_thread_name) < live_timer_threads
2903        }));
2904    }
2905
2906    #[test]
2907    fn cancelled_and_never_sinks_have_distinct_materialization_results() {
2908        assert_eq!(
2909            Source::repeat(1)
2910                .run_with(Sink::cancelled())
2911                .expect("cancelled sink materializes"),
2912            NotUsed
2913        );
2914        assert_eq!(
2915            Source::single(1)
2916                .run_with(Sink::never())
2917                .expect("never sink materializes")
2918                .try_wait(),
2919            None
2920        );
2921    }
2922
2923    #[test]
2924    fn never_sink_finishes_on_materializer_shutdown() {
2925        let materializer = Materializer::new();
2926        let completion = Source::single(1)
2927            .run_with_materializer(Sink::never(), &materializer)
2928            .unwrap();
2929
2930        materializer.shutdown();
2931        assert_eq!(completion.wait(), Err(StreamError::AbruptTermination));
2932    }
2933
2934    #[test]
2935    fn dropping_source_never_completion_releases_parked_worker() {
2936        let materializer = Materializer::new();
2937        let completion = Source::<i32>::never()
2938            .run_with_materializer(Sink::ignore(), &materializer)
2939            .unwrap();
2940
2941        assert!(wait_until(StdDuration::from_secs(1), || {
2942            materializer.active_streams() == 1
2943        }));
2944        assert_eq!(materializer.active_streams(), 1);
2945
2946        drop(completion);
2947
2948        assert!(wait_until(StdDuration::from_secs(15), || {
2949            materializer.active_streams() == 0
2950        }));
2951        assert_eq!(materializer.active_streams(), 0);
2952    }
2953
2954    #[test]
2955    fn future_and_maybe_sources_emit_values() {
2956        let future_value = Source::future(|| async { Ok(7) }).run_collect().unwrap();
2957        assert_eq!(future_value, vec![7]);
2958
2959        let future_source = Source::future_source(|| async { Ok(Source::from_iter([1, 2, 3])) })
2960            .run_collect()
2961            .unwrap();
2962        assert_eq!(future_source, vec![1, 2, 3]);
2963
2964        let (handle, source) = Source::maybe();
2965        assert_eq!(
2966            source.clone().run_collect(),
2967            Err(StreamError::MaybeIncomplete)
2968        );
2969        handle.complete(9).unwrap();
2970        assert_eq!(source.run_collect().unwrap(), vec![9]);
2971    }
2972
2973    #[test]
2974    fn wp6b_source_generators_emit_and_fail_like_stream_errors() {
2975        assert_eq!(
2976            Source::cycle(|| [1, 2, 3].into_iter())
2977                .take(8)
2978                .run_collect()
2979                .unwrap(),
2980            vec![1, 2, 3, 1, 2, 3, 1, 2]
2981        );
2982        assert_eq!(
2983            Source::<i32>::cycle(std::iter::empty::<i32>).run_collect(),
2984            Err(StreamError::Failed("empty iterator".into()))
2985        );
2986        assert_eq!(
2987            Source::unfold(0, |state| (state < 4).then_some((state + 1, state)))
2988                .run_collect()
2989                .unwrap(),
2990            vec![0, 1, 2, 3]
2991        );
2992        assert_eq!(
2993            Source::unfold_async(0, |state| async move {
2994                Ok((state < 4).then_some((state + 1, state * 2)))
2995            })
2996            .run_collect()
2997            .unwrap(),
2998            vec![0, 2, 4, 6]
2999        );
3000        assert!(matches!(
3001            Source::<i32>::lazy_single(|| panic!("boom")).run_collect(),
3002            Err(StreamError::Failed(message)) if message == "lazy_single factory panicked"
3003        ));
3004    }
3005
3006    #[test]
3007    fn wp6b_lazy_sources_defer_until_first_pull_and_complete_deferred_mat() {
3008        let created = StdArc::new(StdAtomicUsize::new(0));
3009        let created_for_source = StdArc::clone(&created);
3010        let source = Source::<i32>::lazy_source(move || {
3011            created_for_source.fetch_add(1, StdOrdering::SeqCst);
3012            Source::from_iter([7, 8]).map_materialized_value(|_| 99)
3013        });
3014        let materializer = Materializer::new();
3015        let (mut stream, mut mat) = StdArc::clone(&source.factory)
3016            .create(&materializer)
3017            .unwrap();
3018
3019        assert_eq!(created.load(StdOrdering::SeqCst), 0);
3020        assert!(mat.try_wait().is_none());
3021        assert_eq!(stream.next().unwrap().unwrap(), 7);
3022        assert_eq!(mat.wait().unwrap(), 99);
3023        assert_eq!(created.load(StdOrdering::SeqCst), 1);
3024        assert_eq!(stream.next().unwrap().unwrap(), 8);
3025
3026        let never_created = StdArc::new(StdAtomicUsize::new(0));
3027        let never_created_for_source = StdArc::clone(&never_created);
3028        let mat = Source::<i32>::lazy_future_source(move || {
3029            never_created_for_source.fetch_add(1, StdOrdering::SeqCst);
3030            async { Ok(Source::single(1)) }
3031        })
3032        .to(Sink::cancelled())
3033        .run()
3034        .unwrap();
3035        assert!(matches!(mat.wait(), Err(StreamError::Failed(_))));
3036        assert_eq!(never_created.load(StdOrdering::SeqCst), 0);
3037
3038        let lazy_future = StdArc::new(StdAtomicUsize::new(0));
3039        let lazy_future_for_source = StdArc::clone(&lazy_future);
3040        let source = Source::lazy_future(move || {
3041            lazy_future_for_source.fetch_add(1, StdOrdering::SeqCst);
3042            async { Ok(42) }
3043        });
3044        let (mut stream, _) = StdArc::clone(&source.factory)
3045            .create(&Materializer::new())
3046            .unwrap();
3047        assert_eq!(lazy_future.load(StdOrdering::SeqCst), 0);
3048        assert_eq!(stream.next().unwrap().unwrap(), 42);
3049        assert_eq!(lazy_future.load(StdOrdering::SeqCst), 1);
3050    }
3051
3052    #[test]
3053    fn wp6b_unfold_resource_closes_on_completion_failure_and_cancellation() {
3054        let closed = StdArc::new(StdAtomicUsize::new(0));
3055        let closed_on_complete = StdArc::clone(&closed);
3056        let values = Source::unfold_resource(
3057            || Ok(std::collections::VecDeque::from([1, 2, 3])),
3058            |items| Ok(items.pop_front()),
3059            move |_items| {
3060                closed_on_complete.fetch_add(1, StdOrdering::SeqCst);
3061                Ok(())
3062            },
3063        )
3064        .run_collect()
3065        .unwrap();
3066        assert_eq!(values, vec![1, 2, 3]);
3067        assert_eq!(closed.load(StdOrdering::SeqCst), 1);
3068
3069        let closed_on_failure = StdArc::new(StdAtomicUsize::new(0));
3070        let closed_on_failure_for_close = StdArc::clone(&closed_on_failure);
3071        let failed = Source::<i32>::unfold_resource(
3072            || Ok(()),
3073            |_| Err(StreamError::Failed("read".into())),
3074            move |_| {
3075                closed_on_failure_for_close.fetch_add(1, StdOrdering::SeqCst);
3076                Err(StreamError::Failed("close".into()))
3077            },
3078        )
3079        .run_collect();
3080        assert_eq!(failed, Err(StreamError::Failed("read".into())));
3081        assert_eq!(closed_on_failure.load(StdOrdering::SeqCst), 1);
3082
3083        let closed_on_cancel = StdArc::new(StdAtomicUsize::new(0));
3084        let closed_on_cancel_for_close = StdArc::clone(&closed_on_cancel);
3085        let first = Source::unfold_resource(
3086            || Ok(0_usize),
3087            |next| {
3088                let item = *next;
3089                *next += 1;
3090                Ok(Some(item))
3091            },
3092            move |_| {
3093                closed_on_cancel_for_close.fetch_add(1, StdOrdering::SeqCst);
3094                Ok(())
3095            },
3096        )
3097        .run_with(Sink::head())
3098        .unwrap();
3099        assert_eq!(first.wait().unwrap(), 0);
3100        assert!(wait_until(Duration::from_millis(250), || {
3101            closed_on_cancel.load(StdOrdering::SeqCst) == 1
3102        }));
3103    }
3104
3105    #[test]
3106    fn wp6b_async_resource_and_async_accumulators_are_sequential() {
3107        let closed = StdArc::new(StdAtomicUsize::new(0));
3108        let closed_for_close = StdArc::clone(&closed);
3109        let values = Source::unfold_resource_async(
3110            || async { Ok(std::collections::VecDeque::from([1, 2, 3])) },
3111            |items| {
3112                let item = items.pop_front();
3113                async move { Ok(item) }
3114            },
3115            move |_items| {
3116                let closed = StdArc::clone(&closed_for_close);
3117                async move {
3118                    closed.fetch_add(1, StdOrdering::SeqCst);
3119                    Ok(())
3120                }
3121            },
3122        )
3123        .run_collect()
3124        .unwrap();
3125        assert_eq!(values, vec![1, 2, 3]);
3126        assert_eq!(closed.load(StdOrdering::SeqCst), 1);
3127
3128        let closed_on_failure = StdArc::new(StdAtomicUsize::new(0));
3129        let closed_on_failure_for_close = StdArc::clone(&closed_on_failure);
3130        let failed = Source::<i32>::unfold_resource_async(
3131            || async { Ok(()) },
3132            |_resource| async { Err(StreamError::Failed("read".into())) },
3133            move |_resource| {
3134                let closed_on_failure = StdArc::clone(&closed_on_failure_for_close);
3135                async move {
3136                    closed_on_failure.fetch_add(1, StdOrdering::SeqCst);
3137                    Err(StreamError::Failed("close".into()))
3138                }
3139            },
3140        )
3141        .run_collect();
3142        assert_eq!(failed, Err(StreamError::Failed("read".into())));
3143        assert_eq!(closed_on_failure.load(StdOrdering::SeqCst), 1);
3144
3145        let active = StdArc::new(StdAtomicUsize::new(0));
3146        let max_active = StdArc::new(StdAtomicUsize::new(0));
3147        let active_for_stage = StdArc::clone(&active);
3148        let max_for_stage = StdArc::clone(&max_active);
3149        let scanned = Source::from_iter(1..=4)
3150            .scan_async(0, move |acc, item| {
3151                let active = StdArc::clone(&active_for_stage);
3152                let max_active = StdArc::clone(&max_for_stage);
3153                async move {
3154                    let now = active.fetch_add(1, StdOrdering::SeqCst) + 1;
3155                    max_active.fetch_max(now, StdOrdering::SeqCst);
3156                    tokio::time::sleep(Duration::from_millis(1)).await;
3157                    active.fetch_sub(1, StdOrdering::SeqCst);
3158                    Ok(acc + item)
3159                }
3160            })
3161            .run_collect()
3162            .unwrap();
3163        assert_eq!(scanned, vec![0, 1, 3, 6, 10]);
3164        assert_eq!(max_active.load(StdOrdering::SeqCst), 1);
3165
3166        let folded = Source::from_iter(1..=4)
3167            .fold_async(0, |acc, item| async move { Ok(acc + item) })
3168            .run_collect()
3169            .unwrap();
3170        assert_eq!(folded, vec![10]);
3171    }
3172
3173    #[test]
3174    fn wp6b_fold_async_materialization_does_not_drain_upstream() {
3175        let release = StdArc::new((std::sync::Mutex::new(false), std::sync::Condvar::new()));
3176        let started = StdArc::new(StdAtomicBool::new(false));
3177        let source = {
3178            let release = StdArc::clone(&release);
3179            let started = StdArc::clone(&started);
3180            Source::from_factory(move || {
3181                let release = StdArc::clone(&release);
3182                let started = StdArc::clone(&started);
3183                let mut emitted = false;
3184                Box::new(std::iter::from_fn(move || {
3185                    if emitted {
3186                        return None;
3187                    }
3188                    emitted = true;
3189                    started.store(true, StdOrdering::SeqCst);
3190                    let (released, available) = &*release;
3191                    let mut released = released.lock().unwrap();
3192                    while !*released {
3193                        released = available.wait(released).unwrap();
3194                    }
3195                    Some(Ok(1))
3196                }))
3197            })
3198        };
3199
3200        let (materialized_tx, materialized_rx) = mpsc::channel();
3201        let join = thread::spawn(move || {
3202            let queue = source
3203                .fold_async(0, |acc, item| async move { Ok(acc + item) })
3204                .run_with(Sink::queue())
3205                .unwrap();
3206            materialized_tx.send(queue).unwrap();
3207        });
3208
3209        let queue = match materialized_rx.recv_timeout(StdDuration::from_secs(1)) {
3210            Ok(queue) => queue,
3211            Err(error) => {
3212                let (released, available) = &*release;
3213                *released.lock().unwrap() = true;
3214                available.notify_all();
3215                let _ = join.join();
3216                panic!("fold_async materialization did not return before first pull: {error}");
3217            }
3218        };
3219        let (released, _) = &*release;
3220        assert!(
3221            !*released.lock().unwrap(),
3222            "test source was released before materialization returned"
3223        );
3224
3225        let (released, available) = &*release;
3226        *released.lock().unwrap() = true;
3227        available.notify_all();
3228        assert_eq!(queue.pull().unwrap(), Some(1));
3229        assert_eq!(queue.pull().unwrap(), None);
3230        join.join().unwrap();
3231        assert!(started.load(StdOrdering::SeqCst));
3232    }
3233
3234    #[test]
3235    fn wp6b_lazy_sink_and_flow_wait_for_first_element() {
3236        let lazy_sink_created = StdArc::new(StdAtomicUsize::new(0));
3237        let lazy_sink_created_for_factory = StdArc::clone(&lazy_sink_created);
3238        let empty_sink = Source::<i32>::empty()
3239            .run_with(Sink::lazy_sink(move || {
3240                lazy_sink_created_for_factory.fetch_add(1, StdOrdering::SeqCst);
3241                Sink::ignore()
3242            }))
3243            .unwrap();
3244        assert!(matches!(empty_sink.wait(), Err(StreamError::Failed(_))));
3245        assert_eq!(lazy_sink_created.load(StdOrdering::SeqCst), 0);
3246
3247        let foreach_sum = StdArc::new(StdAtomicUsize::new(0));
3248        let foreach_sum_for_sink = StdArc::clone(&foreach_sum);
3249        Source::from_iter([1_usize, 2, 3])
3250            .run_with(Sink::foreach_async(2, move |item| {
3251                let foreach_sum = StdArc::clone(&foreach_sum_for_sink);
3252                async move {
3253                    foreach_sum.fetch_add(item, StdOrdering::SeqCst);
3254                    Ok(())
3255                }
3256            }))
3257            .unwrap()
3258            .wait()
3259            .unwrap();
3260        assert_eq!(foreach_sum.load(StdOrdering::SeqCst), 6);
3261
3262        let lazy_flow_created = StdArc::new(StdAtomicUsize::new(0));
3263        let lazy_flow_created_for_factory = StdArc::clone(&lazy_flow_created);
3264        let lazy_flow = Flow::<i32, i32>::lazy_flow(move || {
3265            lazy_flow_created_for_factory.fetch_add(1, StdOrdering::SeqCst);
3266            Flow::identity()
3267                .map(|item: i32| item + 10)
3268                .map_materialized_value(|_| 123)
3269        });
3270        let mat = (lazy_flow.materialize)().unwrap();
3271        let mut stream = match lazy_flow.transform {
3272            flow::FlowTransform::Runtime(transform) => {
3273                transform(Box::new([Ok(1), Ok(2)].into_iter()), &Materializer::new()).unwrap()
3274            }
3275            flow::FlowTransform::Pure(_) => panic!("lazy flow must be runtime-backed"),
3276        };
3277        assert_eq!(lazy_flow_created.load(StdOrdering::SeqCst), 0);
3278        assert_eq!(stream.next().unwrap().unwrap(), 11);
3279        assert_eq!(mat.wait().unwrap(), 123);
3280        assert_eq!(lazy_flow_created.load(StdOrdering::SeqCst), 1);
3281        assert_eq!(stream.next().unwrap().unwrap(), 12);
3282
3283        let future_flow = Source::from_iter([1, 2])
3284            .via_mat(
3285                Flow::future_flow(|| async {
3286                    Ok(Flow::identity()
3287                        .map(|item: i32| item * 2)
3288                        .map_materialized_value(|_| 77))
3289                }),
3290                Keep::right,
3291            )
3292            .to_mat(Sink::collect(), Keep::both)
3293            .run()
3294            .unwrap();
3295        assert_eq!(future_flow.0.wait().unwrap(), 77);
3296        assert_eq!(future_flow.1.wait().unwrap(), vec![2, 4]);
3297    }
3298
3299    #[test]
3300    fn wp6b_lazy_flow_double_use_in_one_chain_pairs_instances_in_order() {
3301        // Order-sensitive pin for the per-thread FIFO slot pairing: one
3302        // cloned lazy-flow blueprint used twice in the SAME chain on the
3303        // same thread. element = first_stage_id * 100 + second_stage_id, so
3304        // a cross-wired (swapped) mat pairing would yield id2 * 100 + id1.
3305        for round in 0..50 {
3306            let counter = StdArc::new(StdAtomicUsize::new(1));
3307            let factory_counter = StdArc::clone(&counter);
3308            let lazy: Flow<usize, usize, _> = Flow::lazy_flow(move || {
3309                let id = factory_counter.fetch_add(1, StdOrdering::SeqCst);
3310                Flow::identity()
3311                    .map(move |x: usize| x * 100 + id)
3312                    .map_materialized_value(move |_| id)
3313            });
3314            let lazy_again = lazy.clone();
3315
3316            let ((first_mat, second_mat), out) = Source::from_iter([0usize])
3317                .via_mat(lazy, Keep::right)
3318                .via_mat(lazy_again, Keep::both)
3319                .to_mat(Sink::collect(), Keep::both)
3320                .run()
3321                .unwrap();
3322
3323            let first_id = first_mat.wait().unwrap();
3324            let second_id = second_mat.wait().unwrap();
3325            let element = out.wait().unwrap()[0];
3326            assert_eq!(
3327                element,
3328                first_id * 100 + second_id,
3329                "round {round}: mats ({first_id},{second_id}) cross-wired with transform order"
3330            );
3331            assert_ne!(
3332                first_id, second_id,
3333                "round {round}: same factory instance paired twice"
3334            );
3335        }
3336    }
3337
3338    #[test]
3339    fn wp6b_lazy_flow_clones_materialize_concurrently_without_cross_wiring() {
3340        for _ in 0..20 {
3341            let next_id = StdArc::new(StdAtomicUsize::new(0));
3342            let next_id_for_factory = StdArc::clone(&next_id);
3343            let flow = Flow::<i32, i32>::lazy_flow(move || {
3344                let id = next_id_for_factory.fetch_add(1, StdOrdering::SeqCst) + 1;
3345                Flow::identity()
3346                    .map(move |item: i32| item + (id as i32 * 100))
3347                    .map_materialized_value(move |_| id)
3348            });
3349            let barrier = StdArc::new(std::sync::Barrier::new(3));
3350
3351            let spawn_materialization = |input: i32| {
3352                let flow = flow.clone();
3353                let barrier = StdArc::clone(&barrier);
3354                thread::spawn(move || {
3355                    barrier.wait();
3356                    let (mat, values) = Source::single(input)
3357                        .via_mat(flow, Keep::right)
3358                        .to_mat(Sink::collect(), Keep::both)
3359                        .run()
3360                        .unwrap();
3361                    (input, mat.wait().unwrap(), values.wait().unwrap())
3362                })
3363            };
3364
3365            let first = spawn_materialization(1);
3366            let second = spawn_materialization(2);
3367            barrier.wait();
3368
3369            for result in [first.join().unwrap(), second.join().unwrap()] {
3370                let (input, mat_id, values) = result;
3371                assert_eq!(values, vec![input + (mat_id as i32 * 100)]);
3372            }
3373            assert_eq!(next_id.load(StdOrdering::SeqCst), 2);
3374        }
3375    }
3376
3377    #[test]
3378    fn wp6b_map_with_resource_emits_close_item_before_terminal_error() {
3379        let queue = Source::from_factory(|| {
3380            Box::new(vec![Ok(1), Err(StreamError::Failed("upstream".into()))].into_iter())
3381        })
3382        .map_with_resource(
3383            || Ok(()),
3384            |_resource, item| Ok(item + 10),
3385            |_resource| Ok(Some(99)),
3386        )
3387        .run_with(Sink::queue())
3388        .unwrap();
3389
3390        assert_eq!(queue.pull().unwrap(), Some(11));
3391        assert_eq!(queue.pull().unwrap(), Some(99));
3392        assert_eq!(queue.pull(), Err(StreamError::Failed("upstream".into())));
3393
3394        let failed: StreamResult<Vec<i32>> = Source::single(1)
3395            .map_with_resource(
3396                || Ok(()),
3397                |_resource, _item| -> StreamResult<i32> { Err(StreamError::Failed("map".into())) },
3398                |_resource| -> StreamResult<Option<i32>> {
3399                    Err(StreamError::Failed("close".into()))
3400                },
3401            )
3402            .run_collect();
3403        assert_eq!(failed, Err(StreamError::Failed("map".into())));
3404    }
3405
3406    #[test]
3407    fn stateful_and_terminal_source_operators_work() {
3408        let stateful = Source::from_iter([1, 2, 3])
3409            .stateful_map(0, |sum, item| {
3410                *sum += item;
3411                *sum
3412            })
3413            .run_collect()
3414            .unwrap();
3415        assert_eq!(stateful, vec![1, 3, 6]);
3416
3417        let concat = Source::from_iter([1, 2, 3])
3418            .stateful_map_concat(0, |sum, item| {
3419                *sum += item;
3420                [item, *sum]
3421            })
3422            .run_collect()
3423            .unwrap();
3424        assert_eq!(concat, vec![1, 1, 2, 3, 3, 6]);
3425
3426        assert_eq!(
3427            Source::from_iter([1, 2, 3])
3428                .fold(10, |acc, item| acc + item)
3429                .run_collect()
3430                .unwrap(),
3431            vec![16]
3432        );
3433        assert_eq!(
3434            Source::from_iter([1, 2, 3])
3435                .reduce(|acc, item| acc + item)
3436                .run_collect()
3437                .unwrap(),
3438            vec![6]
3439        );
3440    }
3441
3442    #[test]
3443    fn concat_and_sliding_emit_before_unbounded_upstream_finishes() {
3444        let concat = Source::single(())
3445            .map_concat(|_| 0_u64..)
3446            .take(1)
3447            .run_collect()
3448            .unwrap();
3449        assert_eq!(concat, vec![0]);
3450
3451        let sliding = Source::repeat(1_u64)
3452            .sliding(2, 1)
3453            .take(1)
3454            .run_collect()
3455            .unwrap();
3456        assert_eq!(sliding, vec![vec![1, 1]]);
3457    }
3458
3459    #[test]
3460    fn fan_in_source_operators_follow_ordering_rules() {
3461        assert_eq!(
3462            Source::from_iter([1, 2])
3463                .concat(Source::from_iter([3, 4]))
3464                .run_collect()
3465                .unwrap(),
3466            vec![1, 2, 3, 4]
3467        );
3468        assert_eq!(
3469            Source::from_iter([3, 4])
3470                .prepend(Source::from_iter([1, 2]))
3471                .run_collect()
3472                .unwrap(),
3473            vec![1, 2, 3, 4]
3474        );
3475        assert_eq!(
3476            Source::empty()
3477                .or_else(Source::from_iter([10, 20]))
3478                .run_collect()
3479                .unwrap(),
3480            vec![10, 20]
3481        );
3482        assert_eq!(
3483            Source::from_iter([1, 2])
3484                .or_else(Source::from_iter([10, 20]))
3485                .run_collect()
3486                .unwrap(),
3487            vec![1, 2]
3488        );
3489        assert_eq!(
3490            Source::from_iter([1, 2, 3])
3491                .interleave(Source::from_iter([10, 11, 12]), 2)
3492                .run_collect()
3493                .unwrap(),
3494            vec![1, 2, 10, 11, 3, 12]
3495        );
3496    }
3497
3498    #[test]
3499    fn fan_in_flow_operators_compose_with_primary_stream() {
3500        let concat = Source::from_iter([1, 2])
3501            .via(Flow::identity().concat(Source::from_iter([3, 4])))
3502            .run_collect()
3503            .unwrap();
3504        assert_eq!(concat, vec![1, 2, 3, 4]);
3505
3506        let prepend = Source::from_iter([3, 4])
3507            .via(Flow::identity().prepend(Source::from_iter([1, 2])))
3508            .run_collect()
3509            .unwrap();
3510        assert_eq!(prepend, vec![1, 2, 3, 4]);
3511
3512        let interleave = Source::from_iter([1, 2, 3])
3513            .via(Flow::identity().interleave(Source::from_iter([10, 11, 12]), 1))
3514            .run_collect()
3515            .unwrap();
3516        assert_eq!(interleave, vec![1, 10, 2, 11, 3, 12]);
3517
3518        let merge_sorted = Source::from_iter([1, 4])
3519            .via(Flow::identity().merge_sorted(Source::from_iter([2, 3, 5])))
3520            .run_collect()
3521            .unwrap();
3522        assert_eq!(merge_sorted, vec![1, 2, 3, 4, 5]);
3523
3524        let zip_latest = Source::from_iter([1, 2])
3525            .via(Flow::identity().zip_latest(Source::single(10)))
3526            .run_collect()
3527            .unwrap();
3528        assert_eq!(zip_latest, vec![(1, 10), (2, 10)]);
3529
3530        let zip_latest_with = Source::from_iter([1, 2])
3531            .via(
3532                Flow::identity()
3533                    .zip_latest_with(Source::single(10), false, |left, right| left + right),
3534            )
3535            .run_collect()
3536            .unwrap();
3537        assert_eq!(zip_latest_with, vec![11, 12]);
3538    }
3539
3540    #[test]
3541    fn fan_in_operators_propagate_errors_and_eager_close() {
3542        assert!(matches!(
3543            Source::failed(StreamError::Failed("boom".into()))
3544                .or_else(Source::from_iter([1, 2]))
3545                .run_collect(),
3546            Err(StreamError::Failed(_))
3547        ));
3548        assert!(matches!(
3549            Source::from_iter([1, 2])
3550                .prepend(Source::failed(StreamError::Failed("boom".into())))
3551                .run_collect(),
3552            Err(StreamError::Failed(_))
3553        ));
3554        assert_eq!(
3555            Source::from_iter([1, 2])
3556                .interleave_all([Source::empty()], 1, true)
3557                .run_collect()
3558                .unwrap(),
3559            vec![1]
3560        );
3561    }
3562
3563    #[test]
3564    fn interleave_lazy_pulls_only_inputs_needed_for_first_segment() {
3565        use std::sync::{Arc, atomic::AtomicUsize, atomic::Ordering};
3566
3567        let pulls: Arc<[AtomicUsize; 3]> = Arc::new([
3568            AtomicUsize::new(0),
3569            AtomicUsize::new(0),
3570            AtomicUsize::new(0),
3571        ]);
3572
3573        let make_source = |idx: usize| {
3574            let pulls = Arc::clone(&pulls);
3575            Source::from_materialized_factory(move |_| {
3576                let pulls = Arc::clone(&pulls);
3577                let mut emitted = false;
3578                Ok((
3579                    Box::new(std::iter::from_fn(move || {
3580                        pulls[idx].fetch_add(1, Ordering::SeqCst);
3581                        if !emitted && idx == 0 {
3582                            emitted = true;
3583                            Some(Ok(42))
3584                        } else {
3585                            None
3586                        }
3587                    })) as BoxStream<i32>,
3588                    NotUsed,
3589                ))
3590            })
3591        };
3592
3593        let result = make_source(0)
3594            .interleave_all([make_source(1), make_source(2)], 1, false)
3595            .run_with(Sink::head());
3596
3597        assert_eq!(wait(result.unwrap()), 42);
3598        assert_eq!(pulls[0].load(Ordering::SeqCst), 1);
3599        assert_eq!(
3600            pulls[1].load(Ordering::SeqCst),
3601            0,
3602            "second input should not be pulled when downstream cancels after first element"
3603        );
3604        assert_eq!(
3605            pulls[2].load(Ordering::SeqCst),
3606            0,
3607            "third input should not be pulled before its turn"
3608        );
3609    }
3610
3611    #[test]
3612    fn interleave_non_eager_drains_remaining_when_one_input_completes() {
3613        assert_eq!(
3614            Source::from_iter([1, 2, 3, 4])
3615                .interleave_all(
3616                    [Source::from_iter([10]), Source::from_iter([20, 21, 22])],
3617                    1,
3618                    false
3619                )
3620                .run_collect()
3621                .unwrap(),
3622            vec![1, 10, 20, 2, 21, 3, 22, 4]
3623        );
3624    }
3625
3626    #[test]
3627    fn remaining_merge_and_zip_family_matches_expected_ordering() {
3628        assert_eq!(
3629            Source::from_iter([1, 4])
3630                .merge_sorted(Source::from_iter([2, 3, 5]))
3631                .run_collect()
3632                .unwrap(),
3633            vec![1, 2, 3, 4, 5]
3634        );
3635
3636        assert_eq!(
3637            Source::from_iter([1, 2])
3638                .merge_latest(Source::single(10), false)
3639                .run_collect()
3640                .unwrap(),
3641            vec![vec![1, 10], vec![2, 10]]
3642        );
3643
3644        assert_eq!(
3645            Source::from_iter([1, 2, 3])
3646                .merge_all([Source::from_iter([10, 11])], false)
3647                .run_collect()
3648                .unwrap(),
3649            vec![1, 10, 2, 11, 3]
3650        );
3651
3652        assert_eq!(
3653            Source::from_iter([1, 2, 3])
3654                .zip_with(Source::from_iter([10, 11, 12]), |left, right| left + right)
3655                .run_collect()
3656                .unwrap(),
3657            vec![11, 13, 15]
3658        );
3659
3660        assert_eq!(
3661            Source::from_iter([1, 2])
3662                .zip_latest(Source::single(10))
3663                .run_collect()
3664                .unwrap(),
3665            vec![(1, 10), (2, 10)]
3666        );
3667
3668        assert_eq!(
3669            Source::from_iter([1, 2, 3])
3670                .zip_latest_with(Source::from_iter([10]), false, |left, right| left + right)
3671                .run_collect()
3672                .unwrap(),
3673            vec![11, 12, 13]
3674        );
3675
3676        assert_eq!(
3677            Source::from_iter([1, 2])
3678                .zip_all(Source::from_iter([10, 11, 12]), -1, -2)
3679                .run_collect()
3680                .unwrap(),
3681            vec![(1, 10), (2, 11), (-1, 12)]
3682        );
3683
3684        assert_eq!(
3685            Source::from_iter([5, 6, 7])
3686                .zip_with_index()
3687                .run_collect()
3688                .unwrap(),
3689            vec![(5, 0), (6, 1), (7, 2)]
3690        );
3691
3692        assert_eq!(
3693            Source::zip_n([Source::from_iter([1, 2]), Source::from_iter([10, 20])])
3694                .run_collect()
3695                .unwrap(),
3696            vec![vec![1, 10], vec![2, 20]]
3697        );
3698
3699        assert_eq!(
3700            Source::zip_with_n(
3701                [
3702                    Source::from_iter([1, 2]),
3703                    Source::from_iter([10, 20]),
3704                    Source::from_iter([100, 200]),
3705                ],
3706                |values| values.into_iter().sum::<i32>(),
3707            )
3708            .run_collect()
3709            .unwrap(),
3710            vec![111, 222]
3711        );
3712
3713        assert_eq!(
3714            Source::merge_prioritized_n(
3715                [
3716                    (Source::from_iter([1, 2, 3, 4]), 2),
3717                    (Source::from_iter([10, 11]), 1),
3718                ],
3719                false,
3720            )
3721            .run_collect()
3722            .unwrap(),
3723            vec![1, 2, 10, 3, 4, 11]
3724        );
3725
3726        assert_eq!(
3727            Source::combine(
3728                Source::from_iter([1, 2, 3]),
3729                Source::from_iter([10, 11]),
3730                std::iter::empty::<Source<i32, NotUsed>>(),
3731                SourceCombineStrategy::Merge {
3732                    eager_complete: false,
3733                },
3734            )
3735            .run_collect()
3736            .unwrap(),
3737            vec![1, 10, 2, 11, 3]
3738        );
3739
3740        let combined_sink = Sink::combine(
3741            Sink::ignore(),
3742            Sink::ignore(),
3743            std::iter::empty::<Sink<i32, NotUsed>>(),
3744            SinkCombineStrategy::Broadcast,
3745        );
3746        assert_eq!(
3747            Source::from_iter([1, 2, 3])
3748                .run_with(combined_sink)
3749                .unwrap(),
3750            NotUsed
3751        );
3752    }
3753
3754    #[test]
3755    fn sink_combine_broadcast_delivers_every_element_to_every_child() {
3756        // Regression: the combined children's materialized values were
3757        // dropped at materialization, tripping cancel-on-drop and silently
3758        // cancelling every child before it consumed anything (0 deliveries).
3759        let first_count = StdArc::new(StdAtomicUsize::new(0));
3760        let second_count = StdArc::new(StdAtomicUsize::new(0));
3761        let first_counter = StdArc::clone(&first_count);
3762        let second_counter = StdArc::clone(&second_count);
3763        let combined = Sink::combine(
3764            Sink::foreach(move |_: i32| {
3765                first_counter.fetch_add(1, StdOrdering::SeqCst);
3766            }),
3767            Sink::foreach(move |_: i32| {
3768                second_counter.fetch_add(1, StdOrdering::SeqCst);
3769            }),
3770            std::iter::empty::<Sink<i32, NotUsed>>(),
3771            SinkCombineStrategy::Broadcast,
3772        );
3773        assert_eq!(
3774            Source::from_iter(0..100).run_with(combined).unwrap(),
3775            NotUsed
3776        );
3777        // The rendezvous channels guarantee each child has received every
3778        // element before the parent send returns. The child callback may still
3779        // be one scheduler tick behind the parent, so assert with a bounded
3780        // condition wait instead of an immediate load.
3781        assert!(wait_until(StdDuration::from_secs(1), || {
3782            first_count.load(StdOrdering::SeqCst) == 100
3783                && second_count.load(StdOrdering::SeqCst) == 100
3784        }));
3785    }
3786
3787    #[test]
3788    fn zip_latest_completes_when_one_side_finishes_without_emitting() {
3789        // Regression: with eager_complete = false, an empty side left
3790        // `latest = None` forever while the loop drained the other side —
3791        // an infinite busy-loop when that side is unbounded.
3792        assert_eq!(
3793            Source::from_iter(std::iter::empty::<i32>())
3794                .zip_latest_with(Source::repeat(10), false, |left, right| left + right)
3795                .run_collect()
3796                .unwrap(),
3797            Vec::<i32>::new()
3798        );
3799        assert_eq!(
3800            Source::repeat(10)
3801                .zip_latest_with(
3802                    Source::from_iter(std::iter::empty::<i32>()),
3803                    false,
3804                    |left, right| left + right,
3805                )
3806                .run_collect()
3807                .unwrap(),
3808            Vec::<i32>::new()
3809        );
3810    }
3811
3812    #[test]
3813    fn zip_family_completion_boundaries_match_expected_results() {
3814        assert_eq!(
3815            Source::from_iter([1, 2, 3])
3816                .zip_with(Source::from_iter([10]), |left, right| left + right)
3817                .run_collect()
3818                .unwrap(),
3819            vec![11]
3820        );
3821
3822        assert_eq!(
3823            Source::from_iter([1, 2, 3])
3824                .zip_latest_with(Source::from_iter([10]), true, |left, right| left + right)
3825                .run_collect()
3826                .unwrap(),
3827            vec![11, 12]
3828        );
3829
3830        assert_eq!(
3831            Source::zip_n([
3832                Source::from_iter([1, 2, 3]),
3833                Source::from_iter([10]),
3834                Source::from_iter([100, 200, 300]),
3835            ])
3836            .run_collect()
3837            .unwrap(),
3838            vec![vec![1, 10, 100]]
3839        );
3840    }
3841
3842    #[test]
3843    fn combine_strategies_follow_merge_concat_and_priority_rules() {
3844        assert_eq!(
3845            Source::combine(
3846                Source::from_iter([1, 2]),
3847                Source::from_iter([10, 11]),
3848                [Source::from_iter([100])],
3849                SourceCombineStrategy::Concat,
3850            )
3851            .run_collect()
3852            .unwrap(),
3853            vec![1, 2, 10, 11, 100]
3854        );
3855
3856        assert_eq!(
3857            Source::combine(
3858                Source::from_iter([1, 2, 3, 4]),
3859                Source::from_iter([10, 11]),
3860                std::iter::empty::<Source<i32, NotUsed>>(),
3861                SourceCombineStrategy::Prioritized {
3862                    priorities: vec![2, 1],
3863                    eager_complete: false,
3864                },
3865            )
3866            .run_collect()
3867            .unwrap(),
3868            vec![1, 2, 10, 3, 4, 11]
3869        );
3870    }
3871
3872    #[test]
3873    fn concat_lazy_defers_follow_on_source_until_needed() {
3874        let source_counter = StdArc::new(StdAtomicUsize::new(0));
3875        let source_counter_clone = StdArc::clone(&source_counter);
3876        let lazy_source = Source::from_materialized_factory(move |_| {
3877            source_counter_clone.fetch_add(1, StdOrdering::SeqCst);
3878            Ok((Box::new(std::iter::once(Ok(99))), NotUsed))
3879        });
3880        let source_head = Source::single(1)
3881            .concat_lazy(lazy_source)
3882            .run_with(Sink::head());
3883        assert_eq!(wait(source_head.unwrap()), 1);
3884        assert_eq!(source_counter.load(StdOrdering::SeqCst), 0);
3885
3886        let flow_counter = StdArc::new(StdAtomicUsize::new(0));
3887        let flow_counter_clone = StdArc::clone(&flow_counter);
3888        let lazy_flow_source = Source::from_materialized_factory(move |_| {
3889            flow_counter_clone.fetch_add(1, StdOrdering::SeqCst);
3890            Ok((Box::new(std::iter::once(Ok(99))), NotUsed))
3891        });
3892        let flow_head = Source::single(1)
3893            .via(Flow::identity().concat_lazy(lazy_flow_source))
3894            .run_with(Sink::head());
3895        assert_eq!(wait(flow_head.unwrap()), 1);
3896        assert_eq!(flow_counter.load(StdOrdering::SeqCst), 0);
3897    }
3898
3899    #[test]
3900    fn also_to_completes_when_side_sink_cancels() {
3901        assert_eq!(
3902            Source::from_iter([1, 2, 3])
3903                .also_to(Sink::cancelled())
3904                .run_collect()
3905                .unwrap(),
3906            Vec::<i32>::new()
3907        );
3908        assert_eq!(
3909            Source::from_iter([1, 2, 3])
3910                .also_to_all([Sink::cancelled(), Sink::cancelled()])
3911                .run_collect()
3912                .unwrap(),
3913            Vec::<i32>::new()
3914        );
3915    }
3916
3917    #[test]
3918    fn also_to_completes_gracefully_when_side_sink_disconnects() {
3919        let result = Source::from_iter(0..100)
3920            .also_to(Sink::head())
3921            .run_collect()
3922            .unwrap();
3923        assert!(!result.is_empty(), "main should emit at least one element");
3924        assert!(
3925            result.len() < 100,
3926            "main should complete early when side disconnects"
3927        );
3928    }
3929
3930    #[test]
3931    fn also_to_propagates_original_error_when_side_is_disconnected() {
3932        let err = StreamError::Failed("distinctive-boom".into());
3933        assert!(matches!(
3934            Source::<i32>::failed(err.clone())
3935                .also_to(Sink::cancelled())
3936                .run_collect(),
3937            Err(StreamError::Failed(msg)) if msg == "distinctive-boom"
3938        ));
3939        assert!(matches!(
3940            Source::<i32>::failed(err.clone())
3941                .also_to_all([Sink::cancelled()])
3942                .run_collect(),
3943            Err(StreamError::Failed(msg)) if msg == "distinctive-boom"
3944        ));
3945        assert!(matches!(
3946            Source::<i32>::failed(err)
3947                .divert_to(Sink::cancelled(), |_: &i32| true)
3948                .run_collect(),
3949            Err(StreamError::Failed(msg)) if msg == "distinctive-boom"
3950        ));
3951    }
3952
3953    #[test]
3954    fn divert_to_routes_matching_elements_to_side_sink() {
3955        let diverted = Source::from_iter([1, 2, 3, 4])
3956            .divert_to(Sink::ignore(), |item| item % 2 == 0)
3957            .run_collect()
3958            .unwrap();
3959        assert_eq!(diverted, vec![1, 3]);
3960    }
3961
3962    #[test]
3963    fn wire_tap_drops_when_side_sink_backpressures() {
3964        let tapped = Source::from_iter([1, 2, 3])
3965            .wire_tap(Sink::head())
3966            .run_collect()
3967            .unwrap();
3968        assert_eq!(tapped, vec![1, 2, 3]);
3969
3970        let tapped_via_flow = Source::from_iter([1, 2, 3])
3971            .via(Flow::identity().wire_tap(Sink::head()))
3972            .run_collect()
3973            .unwrap();
3974        assert_eq!(tapped_via_flow, vec![1, 2, 3]);
3975    }
3976
3977    #[test]
3978    fn async_mapping_variants_complete() {
3979        let ordered = Source::from_iter(0..4)
3980            .map_async(2, |item| async move { Ok(item * 2) })
3981            .run_collect()
3982            .unwrap();
3983        assert_eq!(ordered, vec![0, 2, 4, 6]);
3984
3985        let unordered = Source::from_iter(0..4)
3986            .map_async_unordered(2, |item| async move { Ok(item * 2) })
3987            .run_collect()
3988            .unwrap();
3989        assert_eq!(unordered, vec![0, 2, 4, 6]);
3990
3991        let partitioned = Source::from_iter(0..4)
3992            .map_async_partitioned(4, 1, |item| item % 2, |item| async move { Ok(item + 1) })
3993            .run_collect()
3994            .unwrap();
3995        assert_eq!(partitioned, vec![1, 2, 3, 4]);
3996    }
3997
3998    #[test]
3999    fn map_async_ordered_bounds_pulls_behind_stuck_head() {
4000        let pulls = StdArc::new(StdAtomicUsize::new(0));
4001        let pulls_for_source = StdArc::clone(&pulls);
4002        let probe = Source::from_fn_iter(move || {
4003            let pulls = StdArc::clone(&pulls_for_source);
4004            std::iter::from_fn(move || {
4005                let next = pulls.fetch_add(1, StdOrdering::SeqCst);
4006                Some(next)
4007            })
4008        })
4009        .map_async(2, |item| async move {
4010            if item == 0 {
4011                tokio::time::sleep(StdDuration::from_millis(300)).await;
4012            }
4013            Ok(item)
4014        })
4015        .run_with(TestSink::probe())
4016        .unwrap();
4017
4018        probe.request(16);
4019        thread::sleep(StdDuration::from_millis(100));
4020        assert!(
4021            pulls.load(StdOrdering::SeqCst) <= 3,
4022            "pulled {} elements with parallelism=2 behind a stuck ordered head",
4023            pulls.load(StdOrdering::SeqCst)
4024        );
4025    }
4026
4027    #[test]
4028    fn async_mapping_parks_until_woken_future_completes() {
4029        struct WakeOnceFuture {
4030            value: Option<u64>,
4031            ready: StdArc<StdAtomicBool>,
4032            started: bool,
4033            polls: StdArc<StdAtomicUsize>,
4034            latest_waker: StdArc<Mutex<Option<std::task::Waker>>>,
4035        }
4036
4037        impl std::future::Future for WakeOnceFuture {
4038            type Output = StreamResult<u64>;
4039
4040            fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
4041                let this = self.as_mut().get_mut();
4042                this.polls.fetch_add(1, StdOrdering::SeqCst);
4043                *this.latest_waker.lock().expect("latest wake slot mutex") =
4044                    Some(cx.waker().clone());
4045                if this.ready.load(StdOrdering::SeqCst) {
4046                    return Poll::Ready(Ok(this.value.take().unwrap()));
4047                }
4048
4049                if !this.started {
4050                    this.started = true;
4051                    let ready = StdArc::clone(&this.ready);
4052                    let latest_waker = StdArc::clone(&this.latest_waker);
4053                    thread::spawn(move || {
4054                        thread::sleep(Duration::from_millis(20));
4055                        ready.store(true, StdOrdering::SeqCst);
4056                        if let Some(waker) =
4057                            latest_waker.lock().expect("latest wake slot mutex").take()
4058                        {
4059                            waker.wake();
4060                        }
4061                    });
4062                }
4063                Poll::Pending
4064            }
4065        }
4066
4067        let polls = StdArc::new(StdAtomicUsize::new(0));
4068        let polls_for_stage = StdArc::clone(&polls);
4069        let start = Instant::now();
4070
4071        let values = Source::single(41)
4072            .map_async(1, move |item| WakeOnceFuture {
4073                value: Some(item + 1),
4074                ready: StdArc::new(StdAtomicBool::new(false)),
4075                started: false,
4076                polls: StdArc::clone(&polls_for_stage),
4077                latest_waker: StdArc::new(Mutex::new(None)),
4078            })
4079            .run_collect()
4080            .unwrap();
4081
4082        assert_eq!(values, vec![42]);
4083        let elapsed = start.elapsed();
4084        assert!(
4085            elapsed >= StdDuration::from_millis(15) && elapsed < StdDuration::from_millis(250),
4086            "pending future should park until woken once, elapsed={elapsed:?}"
4087        );
4088        assert!(
4089            polls.load(StdOrdering::SeqCst) < 4096,
4090            "pending future was repolled too aggressively"
4091        );
4092    }
4093
4094    #[test]
4095    fn async_mapping_emits_before_unbounded_upstream_finishes() {
4096        let ordered = Source::repeat(1)
4097            .map_async(2, |item| async move { Ok(item + 1) })
4098            .take(1)
4099            .run_collect()
4100            .unwrap();
4101        assert_eq!(ordered, vec![2]);
4102
4103        let unordered = Source::repeat(1)
4104            .map_async_unordered(2, |item| async move { Ok(item + 1) })
4105            .take(1)
4106            .run_collect()
4107            .unwrap();
4108        assert_eq!(unordered, vec![2]);
4109
4110        let partitioned = Source::repeat(1)
4111            .map_async_partitioned(2, 1, |_| 0_u8, |item| async move { Ok(item + 1) })
4112            .take(1)
4113            .run_collect()
4114            .unwrap();
4115        assert_eq!(partitioned, vec![2]);
4116    }
4117
4118    #[test]
4119    fn partitioned_async_mapping_limits_same_key_concurrency() {
4120        let active = StdArc::new(StdAtomicUsize::new(0));
4121        let max_active = StdArc::new(StdAtomicUsize::new(0));
4122        let active_for_stage = StdArc::clone(&active);
4123        let max_for_stage = StdArc::clone(&max_active);
4124
4125        let values = Source::from_iter(0..6)
4126            .map_async_partitioned(
4127                4,
4128                1,
4129                |_| 0_u8,
4130                move |item| {
4131                    let active = StdArc::clone(&active_for_stage);
4132                    let max_active = StdArc::clone(&max_for_stage);
4133                    let current = active.fetch_add(1, StdOrdering::SeqCst) + 1;
4134                    max_active.fetch_max(current, StdOrdering::SeqCst);
4135                    async move {
4136                        thread::sleep(Duration::from_millis(1));
4137                        active.fetch_sub(1, StdOrdering::SeqCst);
4138                        Ok(item)
4139                    }
4140                },
4141            )
4142            .run_collect()
4143            .unwrap();
4144
4145        assert_eq!(values, vec![0, 1, 2, 3, 4, 5]);
4146        assert_eq!(max_active.load(StdOrdering::SeqCst), 1);
4147    }
4148
4149    #[test]
4150    fn partitioned_async_mapping_scans_past_blocked_pending_key() {
4151        let active = StdArc::new(StdAtomicUsize::new(0));
4152        let max_active = StdArc::new(StdAtomicUsize::new(0));
4153        let active_for_stage = StdArc::clone(&active);
4154        let max_for_stage = StdArc::clone(&max_active);
4155        let (release_tx, release_rx) = oneshot::channel::<()>();
4156        let release_rx = StdArc::new(std::sync::Mutex::new(Some(release_rx)));
4157        let release_rx_for_stage = StdArc::clone(&release_rx);
4158        let max_for_release = StdArc::clone(&max_active);
4159
4160        let releaser = thread::spawn(move || {
4161            let deadline = Instant::now() + StdDuration::from_secs(1);
4162            while max_for_release.load(StdOrdering::SeqCst) < 2 && Instant::now() < deadline {
4163                thread::yield_now();
4164            }
4165            let _ = release_tx.send(());
4166        });
4167
4168        let values = Source::from_iter([0, 2, 1])
4169            .map_async_partitioned(
4170                2,
4171                1,
4172                |item| item % 2,
4173                move |item| {
4174                    let active = StdArc::clone(&active_for_stage);
4175                    let max_active = StdArc::clone(&max_for_stage);
4176                    let release_rx = StdArc::clone(&release_rx_for_stage);
4177                    let current = active.fetch_add(1, StdOrdering::SeqCst) + 1;
4178                    max_active.fetch_max(current, StdOrdering::SeqCst);
4179                    async move {
4180                        if item == 0 {
4181                            let receiver = release_rx
4182                                .lock()
4183                                .expect("release receiver mutex")
4184                                .take()
4185                                .expect("release receiver present");
4186                            let _ = receiver.await;
4187                        }
4188                        active.fetch_sub(1, StdOrdering::SeqCst);
4189                        Ok(item)
4190                    }
4191                },
4192            )
4193            .run_collect()
4194            .unwrap();
4195        releaser.join().unwrap();
4196
4197        assert_eq!(values, vec![0, 2, 1]);
4198        assert_eq!(max_active.load(StdOrdering::SeqCst), 2);
4199    }
4200
4201    #[test]
4202    fn partitioned_async_mapping_p1_still_evaluates_partition() {
4203        let partitions = StdArc::new(StdAtomicUsize::new(0));
4204        let partitions_for_stage = StdArc::clone(&partitions);
4205
4206        let values = Source::from_iter(0..8)
4207            .map_async_partitioned(
4208                1,
4209                1,
4210                move |item| {
4211                    partitions_for_stage.fetch_add(1, StdOrdering::SeqCst);
4212                    item % 2
4213                },
4214                |item| async move { Ok(item + 1) },
4215            )
4216            .run_collect()
4217            .unwrap();
4218
4219        assert_eq!(values, (1..9).collect::<Vec<_>>());
4220        assert_eq!(partitions.load(StdOrdering::SeqCst), 8);
4221    }
4222
4223    #[test]
4224    fn partitioned_async_mapping_handles_many_keys_high_parallelism() {
4225        let active_by_key =
4226            StdArc::new((0..16).map(|_| StdAtomicUsize::new(0)).collect::<Vec<_>>());
4227        let max_by_key = StdArc::new((0..16).map(|_| StdAtomicUsize::new(0)).collect::<Vec<_>>());
4228        let active_for_stage = StdArc::clone(&active_by_key);
4229        let max_for_stage = StdArc::clone(&max_by_key);
4230
4231        let values = Source::from_iter(0..512_usize)
4232            .map_async_partitioned(
4233                32,
4234                1,
4235                |item| item % 16,
4236                move |item| {
4237                    let active = StdArc::clone(&active_for_stage);
4238                    let max_active = StdArc::clone(&max_for_stage);
4239                    let key = item % 16;
4240                    let current = active[key].fetch_add(1, StdOrdering::SeqCst) + 1;
4241                    max_active[key].fetch_max(current, StdOrdering::SeqCst);
4242                    async move {
4243                        active[key].fetch_sub(1, StdOrdering::SeqCst);
4244                        Ok(item)
4245                    }
4246                },
4247            )
4248            .run_collect()
4249            .unwrap();
4250
4251        assert_eq!(values, (0..512).collect::<Vec<_>>());
4252        for max_active in max_by_key.iter() {
4253            assert_eq!(max_active.load(StdOrdering::SeqCst), 1);
4254        }
4255    }
4256
4257    #[test]
4258    fn error_operators_map_recover_and_complete() {
4259        let mapped = Source::<i32>::failed(StreamError::Failed("boom".into()))
4260            .map_error(|_| StreamError::Failed("mapped".into()))
4261            .run_collect();
4262        assert_eq!(mapped, Err(StreamError::Failed("mapped".into())));
4263
4264        let recovered = Source::<i32>::failed(StreamError::Failed("boom".into()))
4265            .recover(|error| match error {
4266                StreamError::Failed(_) => Some(42),
4267                _ => None,
4268            })
4269            .run_collect()
4270            .unwrap();
4271        assert_eq!(recovered, vec![42]);
4272
4273        let unrecovered = Source::<i32>::failed(StreamError::Failed("original".into()))
4274            .recover(|_| None)
4275            .run_collect();
4276        assert_eq!(unrecovered, Err(StreamError::Failed("original".into())));
4277
4278        let recovered_with = Source::<i32>::failed(StreamError::Failed("boom".into()))
4279            .recover_with_retries(1, |_| Some(Source::from_iter([1, 2])))
4280            .run_collect()
4281            .unwrap();
4282        assert_eq!(recovered_with, vec![1, 2]);
4283
4284        let declined_recover_with = Source::<i32>::failed(StreamError::Failed("declined".into()))
4285            .recover_with_retries(1, |_| None)
4286            .run_collect();
4287        assert_eq!(
4288            declined_recover_with,
4289            Err(StreamError::Failed("declined".into()))
4290        );
4291
4292        let completed = Source::from_factory(|| {
4293            Box::new(vec![Ok(1), Err(StreamError::Failed("ignored".into())), Ok(2)].into_iter())
4294        })
4295        .on_error_complete()
4296        .run_collect()
4297        .unwrap();
4298        assert_eq!(completed, vec![1]);
4299    }
4300
4301    #[test]
4302    fn sliding_matches_akka_window_semantics() {
4303        // Full windows then stop; no spurious trailing partial windows.
4304        assert_eq!(
4305            Source::from_iter(1..=4)
4306                .sliding(3, 1)
4307                .run_collect()
4308                .unwrap(),
4309            vec![vec![1, 2, 3], vec![2, 3, 4]]
4310        );
4311        assert_eq!(
4312            Source::from_iter(1..=4)
4313                .sliding(2, 1)
4314                .run_collect()
4315                .unwrap(),
4316            vec![vec![1, 2], vec![2, 3], vec![3, 4]]
4317        );
4318        // Exact multiple of size leaves nothing to emit at finish.
4319        assert_eq!(
4320            Source::from_iter(1..=3)
4321                .sliding(3, 1)
4322                .run_collect()
4323                .unwrap(),
4324            vec![vec![1, 2, 3]]
4325        );
4326        // Short streams emit a single partial window.
4327        assert_eq!(
4328            Source::from_iter(1..=2)
4329                .sliding(3, 1)
4330                .run_collect()
4331                .unwrap(),
4332            vec![vec![1, 2]]
4333        );
4334        // Every element in its own window.
4335        assert_eq!(
4336            Source::from_iter(1..=3)
4337                .sliding(1, 1)
4338                .run_collect()
4339                .unwrap(),
4340            vec![vec![1], vec![2], vec![3]]
4341        );
4342        // step > size skips the gap between windows.
4343        assert_eq!(
4344            Source::from_iter(1..=6)
4345                .sliding(2, 3)
4346                .run_collect()
4347                .unwrap(),
4348            vec![vec![1, 2], vec![4, 5]]
4349        );
4350        // step > size with an in-progress trailing window is dropped (Akka).
4351        assert_eq!(
4352            Source::from_iter(1..=3)
4353                .sliding(2, 4)
4354                .run_collect()
4355                .unwrap(),
4356            vec![vec![1, 2]]
4357        );
4358    }
4359
4360    #[test]
4361    fn recover_with_retries_indefinitely_like_akka() {
4362        let attempts = StdArc::new(StdAtomicUsize::new(0));
4363        let attempts_in_stage = StdArc::clone(&attempts);
4364        // The first several recovery sources keep failing; `recover_with` must
4365        // retry past Datum's old single-retry limit to reach the good source.
4366        let recovered = Source::<i32>::failed(StreamError::Failed("boom".into()))
4367            .recover_with(move |_error| {
4368                if attempts_in_stage.fetch_add(1, StdOrdering::SeqCst) < 5 {
4369                    Some(Source::<i32>::failed(StreamError::Failed("again".into())))
4370                } else {
4371                    Some(Source::from_iter([42]))
4372                }
4373            })
4374            .run_collect()
4375            .unwrap();
4376        assert_eq!(recovered, vec![42]);
4377        assert_eq!(attempts.load(StdOrdering::SeqCst), 6);
4378    }
4379
4380    #[test]
4381    fn many_concurrent_streams_do_not_starve_the_pool() {
4382        // Regression: a fixed-size pool deadlocks once `num_cpus` streams each
4383        // monopolize a worker. The growing pool must keep serving new streams.
4384        //
4385        // Use a small fixed count rather than `available_parallelism() + 2`: the
4386        // point is that several concurrent never-completing streams cannot starve
4387        // a fresh one, and a fixed count keeps this from spawning a large number
4388        // of permanent threads (which, when the whole suite runs under load, can
4389        // exhaust the thread limit and wedge the shared executor).
4390        let materializer = Materializer::new();
4391        let busy = 6_usize;
4392
4393        let mut held = Vec::with_capacity(busy);
4394        for _ in 0..busy {
4395            held.push(
4396                Source::single(1_u64)
4397                    .run_with_materializer(Sink::never(), &materializer)
4398                    .unwrap(),
4399            );
4400        }
4401
4402        for _ in 0..400 {
4403            if materializer.active_streams() >= busy {
4404                break;
4405            }
4406            thread::sleep(Duration::from_millis(5));
4407        }
4408        assert_eq!(materializer.active_streams(), busy);
4409
4410        // A fresh finite stream must still complete despite every prior worker
4411        // being occupied by a never-completing sink.
4412        let sum = Source::from_iter(0_u64..5)
4413            .run_with_materializer(Sink::fold(0_u64, |acc, item| acc + item), &materializer)
4414            .unwrap();
4415        assert_eq!(sum.wait().unwrap(), 10);
4416
4417        materializer.shutdown();
4418        for completion in held {
4419            assert_eq!(completion.wait(), Err(StreamError::AbruptTermination));
4420        }
4421    }
4422}