Skip to main content

datum/stream/
source.rs

1//! `Source<Out, Mat>` — the origin of a linear stream — plus its constructors,
2//! terminal runners (`run_with`/`run_fold`/`run_foreach`/`run_reduce`/…), and the
3//! supporting types `NotUsed`, `Keep`, `Demand`, `PushOutlet`, `MaybeHandle`, and
4//! `IntoSource`.
5//!
6//! Construction is side-effect-free: every constructor stores a factory closure
7//! that only runs at materialization. See this module's `AGENTS.md` and the crate
8//! `CLAUDE.md` blueprint-vs-materialization rule.
9
10use super::flow::{self, FlowTransform};
11use super::*;
12use crate::Attributes;
13use crate::context::SourceWithContext;
14
/// Unit marker for "no meaningful materialized value".
///
/// It is the default `Mat` parameter of `Source` and the value returned by
/// `Keep::none`.
#[derive(Clone, Copy, Debug, Default, Eq, Hash, PartialEq)]
pub struct NotUsed;
17
/// Thread-safe (`Send + Sync`) factory signature: given a `Materializer`,
/// build a `BoxStream<Out>` or report a stream error.
///
/// NOTE(review): presumably used by the source-combining operators; it is not
/// referenced in the part of the file visible here — confirm against callers.
type CombinedSourceFactory<Out> =
    dyn Fn(&Materializer) -> StreamResult<BoxStream<Out>> + Send + Sync;
20
21pub struct Keep;
22
23impl Keep {
24    pub fn left<Left, Right>(left: Left, _right: Right) -> Left {
25        left
26    }
27
28    pub fn right<Left, Right>(_left: Left, right: Right) -> Right {
29        right
30    }
31
32    pub fn both<Left, Right>(left: Left, right: Right) -> (Left, Right) {
33        (left, right)
34    }
35
36    pub fn none<Left, Right>(_left: Left, _right: Right) -> NotUsed {
37        NotUsed
38    }
39}
40
/// Selects how several sources are combined into one.
///
/// NOTE(review): the variants are only declared here; their exact runtime
/// semantics live in the combining code, which is not visible in this part of
/// the file. The per-variant notes below are inferred from names — confirm.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum SourceCombineStrategy {
    /// Presumably interleaves the inputs; `eager_complete` presumably ends the
    /// combined stream as soon as one input completes — TODO confirm.
    Merge {
        eager_complete: bool,
    },
    /// Presumably drains the inputs one after another — TODO confirm.
    Concat,
    /// Like `Merge`, with one `priorities` entry per input (presumably a
    /// relative weight) — TODO confirm.
    Prioritized {
        priorities: Vec<usize>,
        eager_complete: bool,
    },
}
52
53#[derive(Clone)]
54pub struct MaybeHandle<T> {
55    value: Arc<Mutex<Option<StreamResult<T>>>>,
56}
57
58impl<T> MaybeHandle<T> {
59    #[must_use]
60    pub fn is_completed(&self) -> bool {
61        self.value.lock().expect("maybe handle poisoned").is_some()
62    }
63
64    pub fn complete(&self, item: T) -> StreamResult<()> {
65        self.settle(Ok(item))
66    }
67
68    pub fn fail(&self, error: StreamError) -> StreamResult<()> {
69        self.settle(Err(error))
70    }
71
72    fn settle(&self, result: StreamResult<T>) -> StreamResult<()> {
73        let mut value = self.value.lock().expect("maybe handle poisoned");
74        if value.is_some() {
75            return Err(StreamError::Failed("maybe source already completed".into()));
76        }
77        *value = Some(result);
78        Ok(())
79    }
80}
81
/// Count of elements that may still be emitted.
///
/// `u64::MAX` is treated as "unbounded": it is never decremented.
#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct Demand(u64);

impl Demand {
    /// No demand at all.
    pub const ZERO: Self = Self(0);
    /// Demand for exactly one element.
    pub const ONE: Self = Self(1);
    /// Unbounded demand.
    pub const MAX: Self = Self(u64::MAX);

    /// Wraps a raw element count.
    #[must_use]
    pub const fn new(available: u64) -> Self {
        Self(available)
    }

    /// Returns the raw element count.
    #[must_use]
    pub const fn available(self) -> u64 {
        self.0
    }

    /// Returns `true` when the count is the unbounded sentinel (`u64::MAX`).
    #[must_use]
    pub const fn is_unbounded(self) -> bool {
        matches!(self.0, u64::MAX)
    }

    /// Returns `true` when nothing may be emitted.
    #[must_use]
    pub const fn is_empty(self) -> bool {
        matches!(self.0, 0)
    }

    /// Adds two demands, clamping at `u64::MAX` (i.e. at "unbounded").
    #[must_use]
    pub const fn saturating_add(self, rhs: Self) -> Self {
        Self::new(self.available().saturating_add(rhs.available()))
    }

    /// Tries to take one unit of demand.
    ///
    /// Returns `false` (and changes nothing) when the demand is empty;
    /// otherwise returns `true`, decrementing the count unless it is unbounded.
    pub fn consume_one(&mut self) -> bool {
        if self.is_empty() {
            return false;
        }
        if !self.is_unbounded() {
            self.0 -= 1;
        }
        true
    }
}
126
127pub trait PushOutlet<T>: Send {
128    fn push(&mut self, item: T) -> StreamResult<Demand>;
129
130    fn complete(&mut self) -> StreamResult<()> {
131        Ok(())
132    }
133
134    fn fail(&mut self, cause: StreamError) -> StreamResult<()> {
135        Err(cause)
136    }
137}
138
139#[derive(Clone)]
140pub struct Source<Out, Mat = NotUsed> {
141    pub(crate) factory: Arc<dyn SourceFactory<Out, Mat>>,
142    pub(crate) terminal_factory: Option<Arc<TerminalSourceFactory<Out, Mat>>>,
143    pub(super) hints: SourceHints,
144    pub(super) attributes: Attributes,
145    /// Set only for split-segment sources in fast mode. `None` for all other sources.
146    /// Cleared by all composition operators (via/map/to_mat etc.).
147    pub(crate) split_hook: Option<Arc<dyn SplitSegmentHookDyn>>,
148}
149
150pub(crate) type TerminalSourceFactory<Out, Mat> =
151    dyn Fn(&Materializer) -> StreamResult<(Arc<dyn TerminalSourceHookDyn<Out>>, Mat)> + Send + Sync;
152
153impl<Out: Send + 'static> Source<Out, NotUsed> {
154    pub(super) fn from_factory<F>(factory: F) -> Self
155    where
156        F: Fn() -> BoxStream<Out> + Send + Sync + 'static,
157    {
158        Self::from_factory_with_hints(factory, SourceHints::default())
159    }
160
161    fn from_factory_with_hints<F>(factory: F, hints: SourceHints) -> Self
162    where
163        F: Fn() -> BoxStream<Out> + Send + Sync + 'static,
164    {
165        Self::from_materialized_factory_with_hints(
166            move |_materializer| Ok((factory(), NotUsed)),
167            hints,
168        )
169    }
170
171    #[must_use]
172    pub fn empty() -> Self {
173        Self::from_factory_with_hints(
174            || Box::new(std::iter::empty()),
175            SourceHints::with_inline_micro(0),
176        )
177    }
178
179    #[must_use]
180    pub fn never() -> Self {
181        Self::from_materialized_factory_with_hints(
182            move |materializer| {
183                let state = Arc::clone(&materializer.inner.state);
184                Ok((
185                    Box::new(std::iter::from_fn(move || {
186                        loop {
187                            if state.shutdown.load(Ordering::SeqCst) {
188                                return Some(Err(StreamError::AbruptTermination));
189                            }
190                            if super::runtime::current_stream_cancelled()
191                                .as_ref()
192                                .is_some_and(|cancelled| cancelled.load(Ordering::SeqCst))
193                            {
194                                return Some(Err(StreamError::Cancelled));
195                            }
196                            std::thread::park_timeout(Duration::from_millis(1));
197                        }
198                    })),
199                    NotUsed,
200                ))
201            },
202            SourceHints::default(),
203        )
204    }
205
206    #[must_use]
207    pub fn failed(error: StreamError) -> Self {
208        Self::from_factory_with_hints(
209            move || Box::new(std::iter::once(Err(error.clone()))),
210            // max_success_items=0: failed() emits no Ok items before the Err.
211            SourceHints::with_inline_micro(0),
212        )
213    }
214
215    #[must_use]
216    pub fn future<F, Fut>(future: F) -> Self
217    where
218        F: Fn() -> Fut + Send + Sync + 'static,
219        Fut: Future<Output = StreamResult<Out>> + Send + 'static,
220    {
221        let future = Arc::new(future);
222        Self::from_factory(move || {
223            let future = Arc::clone(&future);
224            let mut emitted = false;
225            Box::new(std::iter::from_fn(move || {
226                if emitted {
227                    return None;
228                }
229                emitted = true;
230                Some(
231                    catch_unwind_failed("source future factory", || future())
232                        .and_then(flow::run_future_inline_or_spawn),
233                )
234            }))
235        })
236    }
237
238    #[must_use]
239    pub fn future_source<F, Fut>(future: F) -> Self
240    where
241        F: Fn() -> Fut + Send + Sync + 'static,
242        Fut: Future<Output = StreamResult<Source<Out>>> + Send + 'static,
243    {
244        let future = Arc::new(future);
245        Self::from_materialized_factory(move |materializer| {
246            let materializer = materializer.with_name_prefix(materializer.name_prefix().to_owned());
247            let future = Arc::clone(&future);
248            let mut current = None::<BoxStream<Out>>;
249            let mut initialized = false;
250            let mut terminated = false;
251            Ok((
252                Box::new(std::iter::from_fn(move || {
253                    if terminated {
254                        return None;
255                    }
256
257                    loop {
258                        if let Some(stream) = current.as_mut() {
259                            match stream.next() {
260                                Some(item) => return Some(item),
261                                None => {
262                                    terminated = true;
263                                    return None;
264                                }
265                            }
266                        }
267
268                        if initialized {
269                            terminated = true;
270                            return None;
271                        }
272                        initialized = true;
273                        let source = match catch_unwind_failed("future_source factory", || future())
274                            .and_then(flow::run_future_inline_or_spawn)
275                        {
276                            Ok(source) => source,
277                            Err(error) => {
278                                terminated = true;
279                                return Some(Err(error));
280                            }
281                        };
282                        current = Some(match Arc::clone(&source.factory).create(&materializer) {
283                            Ok((stream, _)) => stream,
284                            Err(error) => {
285                                terminated = true;
286                                return Some(Err(error));
287                            }
288                        });
289                    }
290                })) as BoxStream<Out>,
291                NotUsed,
292            ))
293        })
294    }
295
296    #[must_use]
297    pub fn cycle<F, I>(factory: F) -> Self
298    where
299        F: Fn() -> I + Send + Sync + 'static,
300        I: IntoIterator<Item = Out>,
301        I::IntoIter: Send + 'static,
302    {
303        let factory = Arc::new(factory);
304        Self::from_factory(move || {
305            let factory = Arc::clone(&factory);
306            let mut current = None::<I::IntoIter>;
307            let mut terminated = false;
308            Box::new(std::iter::from_fn(move || {
309                if terminated {
310                    return None;
311                }
312
313                if let Some(iter) = current.as_mut()
314                    && let Some(item) = iter.next()
315                {
316                    return Some(Ok(item));
317                }
318
319                let mut next = match catch_unwind_failed("cycle factory", || factory()) {
320                    Ok(iterable) => iterable.into_iter(),
321                    Err(error) => {
322                        terminated = true;
323                        return Some(Err(error));
324                    }
325                };
326                match next.next() {
327                    Some(item) => {
328                        current = Some(next);
329                        Some(Ok(item))
330                    }
331                    None => {
332                        terminated = true;
333                        Some(Err(StreamError::Failed("empty iterator".into())))
334                    }
335                }
336            }))
337        })
338    }
339
340    #[must_use]
341    pub fn unfold<State, F>(initial: State, f: F) -> Self
342    where
343        State: Clone + Send + Sync + 'static,
344        F: Fn(State) -> Option<(State, Out)> + Send + Sync + 'static,
345    {
346        let f = Arc::new(f);
347        Self::from_factory(move || {
348            let f = Arc::clone(&f);
349            let mut state = Some(initial.clone());
350            let mut terminated = false;
351            Box::new(std::iter::from_fn(move || {
352                if terminated {
353                    return None;
354                }
355                let current = state.take().expect("unfold state present");
356                match catch_unwind_failed("unfold function", || f(current)) {
357                    Ok(Some((next_state, item))) => {
358                        state = Some(next_state);
359                        Some(Ok(item))
360                    }
361                    Ok(None) => {
362                        terminated = true;
363                        None
364                    }
365                    Err(error) => {
366                        terminated = true;
367                        Some(Err(error))
368                    }
369                }
370            }))
371        })
372    }
373
374    #[must_use]
375    pub fn unfold_async<State, F, Fut>(initial: State, f: F) -> Self
376    where
377        State: Clone + Send + Sync + 'static,
378        F: Fn(State) -> Fut + Send + Sync + 'static,
379        Fut: Future<Output = StreamResult<Option<(State, Out)>>> + Send + 'static,
380    {
381        let f = Arc::new(f);
382        Self::from_factory(move || {
383            let f = Arc::clone(&f);
384            let mut state = Some(initial.clone());
385            let mut terminated = false;
386            Box::new(std::iter::from_fn(move || {
387                if terminated {
388                    return None;
389                }
390                let current = state.take().expect("unfold_async state present");
391                match catch_unwind_failed("unfold_async factory", || f(current))
392                    .and_then(flow::run_future_inline_or_spawn)
393                {
394                    Ok(Some((next_state, item))) => {
395                        state = Some(next_state);
396                        Some(Ok(item))
397                    }
398                    Ok(None) => {
399                        terminated = true;
400                        None
401                    }
402                    Err(error) => {
403                        terminated = true;
404                        Some(Err(error))
405                    }
406                }
407            }))
408        })
409    }
410
411    #[must_use]
412    pub fn unfold_resource<Resource, Create, Read, Close>(
413        create: Create,
414        read: Read,
415        close: Close,
416    ) -> Self
417    where
418        Resource: Send + 'static,
419        Create: Fn() -> StreamResult<Resource> + Send + Sync + 'static,
420        Read: Fn(&mut Resource) -> StreamResult<Option<Out>> + Send + Sync + 'static,
421        Close: Fn(Resource) -> StreamResult<()> + Send + Sync + 'static,
422    {
423        let create = Arc::new(create);
424        let read = Arc::new(read);
425        let close = Arc::new(close);
426        Self::from_factory(move || {
427            Box::new(UnfoldResourceStream {
428                create: Arc::clone(&create),
429                read: Arc::clone(&read),
430                close: Arc::clone(&close),
431                resource: None,
432                created: false,
433                terminated: false,
434                _marker: PhantomData,
435            })
436        })
437    }
438
439    #[must_use]
440    pub fn unfold_resource_async<Resource, Create, CreateFut, Read, ReadFut, Close, CloseFut>(
441        create: Create,
442        read: Read,
443        close: Close,
444    ) -> Self
445    where
446        Resource: Send + 'static,
447        Create: Fn() -> CreateFut + Send + Sync + 'static,
448        CreateFut: Future<Output = StreamResult<Resource>> + Send + 'static,
449        Read: Fn(&mut Resource) -> ReadFut + Send + Sync + 'static,
450        ReadFut: Future<Output = StreamResult<Option<Out>>> + Send + 'static,
451        Close: Fn(Resource) -> CloseFut + Send + Sync + 'static,
452        CloseFut: Future<Output = StreamResult<()>> + Send + 'static,
453    {
454        let create = Arc::new(create);
455        let read = Arc::new(read);
456        let close = Arc::new(close);
457        Self::from_factory(move || {
458            Box::new(UnfoldResourceAsyncStream {
459                create: Arc::clone(&create),
460                read: Arc::clone(&read),
461                close: Arc::clone(&close),
462                resource: None,
463                created: false,
464                terminated: false,
465                _marker: PhantomData,
466            })
467        })
468    }
469
470    #[must_use]
471    pub fn lazy_single<F>(create: F) -> Self
472    where
473        F: Fn() -> Out + Send + Sync + 'static,
474    {
475        let create = Arc::new(create);
476        Self::from_factory(move || {
477            let create = Arc::clone(&create);
478            let mut emitted = false;
479            Box::new(std::iter::from_fn(move || {
480                if emitted {
481                    return None;
482                }
483                emitted = true;
484                Some(catch_unwind_failed("lazy_single factory", || create()))
485            }))
486        })
487    }
488
489    #[must_use]
490    pub fn lazy_future<F, Fut>(create: F) -> Self
491    where
492        F: Fn() -> Fut + Send + Sync + 'static,
493        Fut: Future<Output = StreamResult<Out>> + Send + 'static,
494    {
495        let create = Arc::new(create);
496        Self::from_factory(move || {
497            let create = Arc::clone(&create);
498            let mut emitted = false;
499            Box::new(std::iter::from_fn(move || {
500                if emitted {
501                    return None;
502                }
503                emitted = true;
504                Some(
505                    catch_unwind_failed("lazy_future factory", || create())
506                        .and_then(flow::run_future_inline_or_spawn),
507                )
508            }))
509        })
510    }
511
512    #[must_use]
513    pub fn lazy_source<InnerMat, F>(create: F) -> Source<Out, StreamCompletion<InnerMat>>
514    where
515        InnerMat: Send + 'static,
516        F: Fn() -> Source<Out, InnerMat> + Send + Sync + 'static,
517    {
518        let create = Arc::new(create);
519        Source::from_materialized_factory(move |materializer| {
520            let (sender, receiver) = oneshot::channel();
521            Ok((
522                Box::new(LazySourceStream {
523                    create: Arc::clone(&create),
524                    materializer: materializer
525                        .with_name_prefix(materializer.name_prefix().to_owned()),
526                    current: None,
527                    mat_sender: Some(sender),
528                    initialized: false,
529                    terminated: false,
530                }) as BoxStream<Out>,
531                StreamCompletion::from_receiver(receiver, None),
532            ))
533        })
534    }
535
536    #[must_use]
537    pub fn lazy_future_source<InnerMat, F, Fut>(
538        create: F,
539    ) -> Source<Out, StreamCompletion<InnerMat>>
540    where
541        InnerMat: Send + 'static,
542        F: Fn() -> Fut + Send + Sync + 'static,
543        Fut: Future<Output = StreamResult<Source<Out, InnerMat>>> + Send + 'static,
544    {
545        let create = Arc::new(create);
546        Source::from_materialized_factory(move |materializer| {
547            let (sender, receiver) = oneshot::channel();
548            Ok((
549                Box::new(LazyFutureSourceStream {
550                    create: Arc::clone(&create),
551                    materializer: materializer
552                        .with_name_prefix(materializer.name_prefix().to_owned()),
553                    current: None,
554                    mat_sender: Some(sender),
555                    initialized: false,
556                    terminated: false,
557                    _marker: PhantomData,
558                }) as BoxStream<Out>,
559                StreamCompletion::from_receiver(receiver, None),
560            ))
561        })
562    }
563
564    #[must_use]
565    pub fn from_fn_iter<F, I>(factory: F) -> Self
566    where
567        F: Fn() -> I + Send + Sync + 'static,
568        I: IntoIterator<Item = Out>,
569        I::IntoIter: Send + 'static,
570    {
571        Self::from_factory(move || Box::new(factory().into_iter().map(Ok)))
572    }
573}
574
575impl<Out: Send + 'static, Mat: Send + 'static> Source<Out, Mat> {
576    fn from_materialized_factory_with_hints<F>(factory: F, hints: SourceHints) -> Self
577    where
578        F: Fn(&Materializer) -> StreamResult<(BoxStream<Out>, Mat)> + Send + Sync + 'static,
579    {
580        Self {
581            factory: Arc::new(FnSourceFactory(factory)),
582            terminal_factory: None,
583            hints,
584            attributes: Attributes::default(),
585            split_hook: None,
586        }
587    }
588
589    pub(crate) fn from_materialized_factory<F>(factory: F) -> Self
590    where
591        F: Fn(&Materializer) -> StreamResult<(BoxStream<Out>, Mat)> + Send + Sync + 'static,
592    {
593        Self::from_materialized_factory_with_hints(factory, SourceHints::default())
594    }
595
596    pub(crate) fn from_terminal_direct_materialized_factory<F, D>(
597        factory: F,
598        terminal_factory: D,
599    ) -> Self
600    where
601        F: Fn(&Materializer) -> StreamResult<(BoxStream<Out>, Mat)> + Send + Sync + 'static,
602        D: Fn(&Materializer) -> StreamResult<(Arc<dyn TerminalSourceHookDyn<Out>>, Mat)>
603            + Send
604            + Sync
605            + 'static,
606    {
607        Self {
608            factory: Arc::new(FnSourceFactory(factory)),
609            terminal_factory: Some(Arc::new(terminal_factory)),
610            hints: SourceHints::with_terminal_consumer_batch(),
611            attributes: Attributes::default(),
612            split_hook: None,
613        }
614    }
615
616    #[must_use]
617    pub fn as_source_with_context<Ctx, F>(
618        self,
619        extract_context: F,
620    ) -> SourceWithContext<Out, Ctx, Mat>
621    where
622        Ctx: Send + 'static,
623        F: Fn(&Out) -> Ctx + Send + Sync + 'static,
624    {
625        SourceWithContext::from_source(self.map(move |item| {
626            let context = extract_context(&item);
627            (item, context)
628        }))
629    }
630
631    #[must_use]
632    pub fn via<Next, FlowMat>(self, flow: Flow<Out, Next, FlowMat>) -> Source<Next, Mat>
633    where
634        Next: Send + 'static,
635        FlowMat: Send + 'static,
636    {
637        self.via_mat(flow, Keep::left)
638    }
639
640    #[must_use]
641    pub fn via_mat<Next, FlowMat, Combined, F>(
642        self,
643        flow: Flow<Out, Next, FlowMat>,
644        combine: F,
645    ) -> Source<Next, Combined>
646    where
647        Next: Send + 'static,
648        FlowMat: Send + 'static,
649        Combined: Send + 'static,
650        F: Fn(Mat, FlowMat) -> Combined + Send + Sync + 'static,
651    {
652        let source = self.factory;
653        let transform = flow.transform;
654        let materialize_flow = flow.materialize;
655        let hints = self.hints.after_flow(flow.hints);
656        let combine = Arc::new(combine);
657        Source::from_materialized_factory_with_hints(
658            move |materializer| {
659                let (stream, source_mat) = Arc::clone(&source).create(materializer)?;
660                let flow_mat = materialize_flow()?;
661                let stream = match &transform {
662                    FlowTransform::Pure(transform) => transform(stream),
663                    FlowTransform::Runtime(transform) => transform(stream, materializer)?,
664                };
665                Ok((stream, combine(source_mat, flow_mat)))
666            },
667            hints,
668        )
669    }
670
671    #[must_use]
672    pub fn via_mat_with<Next, FlowMat, Combined, F>(
673        self,
674        flow: Flow<Out, Next, FlowMat>,
675        combine: F,
676    ) -> Source<Next, Combined>
677    where
678        Next: Send + 'static,
679        FlowMat: Send + 'static,
680        Combined: Send + 'static,
681        F: Fn(Mat, FlowMat) -> Combined + Send + Sync + 'static,
682    {
683        self.via_mat(flow, combine)
684    }
685
686    #[must_use]
687    pub fn map<Next, F>(self, f: F) -> Source<Next, Mat>
688    where
689        Next: Send + 'static,
690        F: Fn(Out) -> Next + Send + Sync + 'static,
691    {
692        let hints = self.hints.without_inline_micro();
693        Source {
694            factory: Arc::new(MapSourceFactory {
695                source: self.factory,
696                stage: f,
697                _marker: PhantomData,
698            }),
699            terminal_factory: None,
700            hints,
701            attributes: self.attributes,
702            split_hook: None,
703        }
704    }
705
706    #[must_use]
707    pub fn attributes(&self) -> &Attributes {
708        &self.attributes
709    }
710
711    #[must_use]
712    pub fn with_attributes(mut self, attributes: Attributes) -> Self {
713        self.attributes = attributes;
714        self
715    }
716
717    #[must_use]
718    pub fn add_attributes(mut self, attributes: Attributes) -> Self {
719        self.attributes = self.attributes.and(attributes);
720        self
721    }
722
723    #[must_use]
724    pub fn named(self, name: impl Into<String>) -> Self {
725        self.add_attributes(Attributes::named(name))
726    }
727
728    /// Insert an async boundary after this source.
729    ///
730    /// The boundary uses the same [`crate::AsyncBoundaryExecutionConfig`] defaults
731    /// as the GraphDSL `AsyncBoundary` runner and hands elements across a bounded
732    /// Ractor-backed queue. `async_boundary` is the Rust-friendly primary name;
733    /// `Source::r#async` is provided as the Akka-mirroring alias.
734    #[must_use]
735    pub fn async_boundary(self) -> Self {
736        self.via(Flow::identity().async_boundary())
737    }
738
739    /// Akka-mirroring alias for [`Source::async_boundary`].
740    #[must_use]
741    pub fn r#async(self) -> Self {
742        self.async_boundary()
743    }
744
745    /// Insert an async boundary with an explicit bounded handoff configuration.
746    ///
747    /// `config.buffer_size` controls the number of elements that may be queued
748    /// between the upstream and downstream fused regions. A zero buffer is
749    /// rejected when the stream is materialized.
750    #[must_use]
751    pub fn async_boundary_with_config(
752        self,
753        config: crate::graph::AsyncBoundaryExecutionConfig,
754    ) -> Self {
755        self.via(Flow::identity().async_boundary_with_config(config))
756    }
757
758    /// Insert an async boundary with a custom bounded handoff size.
759    #[must_use]
760    pub fn async_boundary_with_buffer(self, buffer_size: usize) -> Self {
761        self.via(Flow::identity().async_boundary_with_buffer(buffer_size))
762    }
763
764    #[must_use]
765    pub fn try_map<Next, F>(self, f: F) -> Source<Next, Mat>
766    where
767        Next: Send + 'static,
768        F: Fn(Out) -> StreamResult<Next> + Send + Sync + 'static,
769    {
770        self.via(Flow::identity().try_map(f))
771    }
772
773    /// Deprecated alias for [`Source::try_map`].
774    #[must_use]
775    #[deprecated(
776        since = "0.9.0",
777        note = "renamed to `try_map` (idiomatic); the `_result` name still works"
778    )]
779    pub fn map_result<Next, F>(self, f: F) -> Source<Next, Mat>
780    where
781        Next: Send + 'static,
782        F: Fn(Out) -> StreamResult<Next> + Send + Sync + 'static,
783    {
784        self.try_map(f)
785    }
786
787    #[must_use]
788    pub fn map_result_with_supervision<Next, F>(
789        self,
790        f: F,
791        decider: SupervisionDecider,
792    ) -> Source<Next, Mat>
793    where
794        Next: Send + 'static,
795        F: Fn(Out) -> StreamResult<Next> + Send + Sync + 'static,
796    {
797        self.via(Flow::identity().map_result_with_supervision(f, decider))
798    }
799
800    #[must_use]
801    pub fn filter<F>(self, predicate: F) -> Source<Out, Mat>
802    where
803        F: Fn(&Out) -> bool + Send + Sync + 'static,
804    {
805        self.via(Flow::identity().filter(predicate))
806    }
807
808    #[must_use]
809    pub fn try_filter<F>(self, predicate: F) -> Source<Out, Mat>
810    where
811        F: Fn(&Out) -> StreamResult<bool> + Send + Sync + 'static,
812    {
813        self.via(Flow::identity().try_filter(predicate))
814    }
815
816    /// Deprecated alias for [`Source::try_filter`].
817    #[must_use]
818    #[deprecated(
819        since = "0.9.0",
820        note = "renamed to `try_filter` (idiomatic); the `_result` name still works"
821    )]
822    pub fn filter_result<F>(self, predicate: F) -> Source<Out, Mat>
823    where
824        F: Fn(&Out) -> StreamResult<bool> + Send + Sync + 'static,
825    {
826        self.try_filter(predicate)
827    }
828
829    #[must_use]
830    pub fn filter_result_with_supervision<F>(
831        self,
832        predicate: F,
833        decider: SupervisionDecider,
834    ) -> Source<Out, Mat>
835    where
836        F: Fn(&Out) -> StreamResult<bool> + Send + Sync + 'static,
837    {
838        self.via(Flow::identity().filter_result_with_supervision(predicate, decider))
839    }
840
841    #[must_use]
842    pub fn filter_not<F>(self, predicate: F) -> Source<Out, Mat>
843    where
844        F: Fn(&Out) -> bool + Send + Sync + 'static,
845    {
846        self.via(Flow::identity().filter_not(predicate))
847    }
848
849    #[must_use]
850    pub fn filter_map<Next, F>(self, f: F) -> Source<Next, Mat>
851    where
852        Next: Send + 'static,
853        F: Fn(Out) -> Option<Next> + Send + Sync + 'static,
854    {
855        self.via(Flow::identity().filter_map(f))
856    }
857
858    #[must_use]
859    pub fn try_filter_map<Next, F>(self, f: F) -> Source<Next, Mat>
860    where
861        Next: Send + 'static,
862        F: Fn(Out) -> StreamResult<Option<Next>> + Send + Sync + 'static,
863    {
864        self.via(Flow::identity().try_filter_map(f))
865    }
866
867    /// Deprecated alias for [`Source::try_filter_map`].
868    #[must_use]
869    #[deprecated(
870        since = "0.9.0",
871        note = "renamed to `try_filter_map` (idiomatic); the `_result` name still works"
872    )]
873    pub fn filter_map_result<Next, F>(self, f: F) -> Source<Next, Mat>
874    where
875        Next: Send + 'static,
876        F: Fn(Out) -> StreamResult<Option<Next>> + Send + Sync + 'static,
877    {
878        self.try_filter_map(f)
879    }
880
881    #[must_use]
882    pub fn filter_map_result_with_supervision<Next, F>(
883        self,
884        f: F,
885        decider: SupervisionDecider,
886    ) -> Source<Next, Mat>
887    where
888        Next: Send + 'static,
889        F: Fn(Out) -> StreamResult<Option<Next>> + Send + Sync + 'static,
890    {
891        self.via(Flow::identity().filter_map_result_with_supervision(f, decider))
892    }
893
894    #[must_use]
895    pub fn map_concat<Next, F, I>(self, f: F) -> Source<Next, Mat>
896    where
897        Next: Send + 'static,
898        F: Fn(Out) -> I + Send + Sync + 'static,
899        I: IntoIterator<Item = Next>,
900        I::IntoIter: Send + 'static,
901    {
902        self.via(Flow::identity().map_concat(f))
903    }
904
905    #[must_use]
906    pub fn try_map_concat<Next, F, I>(self, f: F) -> Source<Next, Mat>
907    where
908        Next: Send + 'static,
909        F: Fn(Out) -> StreamResult<I> + Send + Sync + 'static,
910        I: IntoIterator<Item = Next>,
911        I::IntoIter: Send + 'static,
912    {
913        self.via(Flow::identity().try_map_concat(f))
914    }
915
916    /// Deprecated alias for [`Source::try_map_concat`].
917    #[must_use]
918    #[deprecated(
919        since = "0.9.0",
920        note = "renamed to `try_map_concat` (idiomatic); the `_result` name still works"
921    )]
922    pub fn map_concat_result<Next, F, I>(self, f: F) -> Source<Next, Mat>
923    where
924        Next: Send + 'static,
925        F: Fn(Out) -> StreamResult<I> + Send + Sync + 'static,
926        I: IntoIterator<Item = Next>,
927        I::IntoIter: Send + 'static,
928    {
929        self.try_map_concat(f)
930    }
931
932    #[must_use]
933    pub fn map_concat_result_with_supervision<Next, F, I>(
934        self,
935        f: F,
936        decider: SupervisionDecider,
937    ) -> Source<Next, Mat>
938    where
939        Next: Send + 'static,
940        F: Fn(Out) -> StreamResult<I> + Send + Sync + 'static,
941        I: IntoIterator<Item = Next>,
942        I::IntoIter: Send + 'static,
943    {
944        self.via(Flow::identity().map_concat_result_with_supervision(f, decider))
945    }
946
947    #[must_use]
948    pub fn stateful_map<State, Next, F>(self, seed: State, f: F) -> Source<Next, Mat>
949    where
950        State: Clone + Send + Sync + 'static,
951        Next: Send + 'static,
952        F: Fn(&mut State, Out) -> Next + Send + Sync + 'static,
953    {
954        self.via(Flow::identity().stateful_map(seed, f))
955    }
956
957    #[must_use]
958    pub fn try_stateful_map<State, Next, F>(self, seed: State, f: F) -> Source<Next, Mat>
959    where
960        State: Clone + Send + Sync + 'static,
961        Next: Send + 'static,
962        F: Fn(&mut State, Out) -> StreamResult<Next> + Send + Sync + 'static,
963    {
964        self.via(Flow::identity().try_stateful_map(seed, f))
965    }
966
967    /// Deprecated alias for [`Source::try_stateful_map`].
968    #[must_use]
969    #[deprecated(
970        since = "0.9.0",
971        note = "renamed to `try_stateful_map` (idiomatic); the `_result` name still works"
972    )]
973    pub fn stateful_map_result<State, Next, F>(self, seed: State, f: F) -> Source<Next, Mat>
974    where
975        State: Clone + Send + Sync + 'static,
976        Next: Send + 'static,
977        F: Fn(&mut State, Out) -> StreamResult<Next> + Send + Sync + 'static,
978    {
979        self.try_stateful_map(seed, f)
980    }
981
982    #[must_use]
983    pub fn stateful_map_result_with_supervision<State, Next, F>(
984        self,
985        seed: State,
986        f: F,
987        decider: SupervisionDecider,
988    ) -> Source<Next, Mat>
989    where
990        State: Clone + Send + Sync + 'static,
991        Next: Send + 'static,
992        F: Fn(&mut State, Out) -> StreamResult<Next> + Send + Sync + 'static,
993    {
994        self.via(Flow::identity().stateful_map_result_with_supervision(seed, f, decider))
995    }
996
997    #[must_use]
998    pub fn stateful_map_concat<State, Next, F, I>(self, seed: State, f: F) -> Source<Next, Mat>
999    where
1000        State: Clone + Send + Sync + 'static,
1001        Next: Send + 'static,
1002        F: Fn(&mut State, Out) -> I + Send + Sync + 'static,
1003        I: IntoIterator<Item = Next>,
1004        I::IntoIter: Send + 'static,
1005    {
1006        self.via(Flow::identity().stateful_map_concat(seed, f))
1007    }
1008
1009    #[must_use]
1010    pub fn try_stateful_map_concat<State, Next, F, I>(self, seed: State, f: F) -> Source<Next, Mat>
1011    where
1012        State: Clone + Send + Sync + 'static,
1013        Next: Send + 'static,
1014        F: Fn(&mut State, Out) -> StreamResult<I> + Send + Sync + 'static,
1015        I: IntoIterator<Item = Next>,
1016        I::IntoIter: Send + 'static,
1017    {
1018        self.via(Flow::identity().try_stateful_map_concat(seed, f))
1019    }
1020
1021    /// Deprecated alias for [`Source::try_stateful_map_concat`].
1022    #[must_use]
1023    #[deprecated(
1024        since = "0.9.0",
1025        note = "renamed to `try_stateful_map_concat` (idiomatic); the `_result` name still works"
1026    )]
1027    pub fn stateful_map_concat_result<State, Next, F, I>(
1028        self,
1029        seed: State,
1030        f: F,
1031    ) -> Source<Next, Mat>
1032    where
1033        State: Clone + Send + Sync + 'static,
1034        Next: Send + 'static,
1035        F: Fn(&mut State, Out) -> StreamResult<I> + Send + Sync + 'static,
1036        I: IntoIterator<Item = Next>,
1037        I::IntoIter: Send + 'static,
1038    {
1039        self.try_stateful_map_concat(seed, f)
1040    }
1041
1042    #[must_use]
1043    pub fn stateful_map_concat_result_with_supervision<State, Next, F, I>(
1044        self,
1045        seed: State,
1046        f: F,
1047        decider: SupervisionDecider,
1048    ) -> Source<Next, Mat>
1049    where
1050        State: Clone + Send + Sync + 'static,
1051        Next: Send + 'static,
1052        F: Fn(&mut State, Out) -> StreamResult<I> + Send + Sync + 'static,
1053        I: IntoIterator<Item = Next>,
1054        I::IntoIter: Send + 'static,
1055    {
1056        self.via(Flow::identity().stateful_map_concat_result_with_supervision(seed, f, decider))
1057    }
1058
1059    #[must_use]
1060    pub fn map_async<Next, F, Fut>(self, parallelism: usize, f: F) -> Source<Next, Mat>
1061    where
1062        Next: Send + 'static,
1063        F: Fn(Out) -> Fut + Send + Sync + 'static,
1064        Fut: Future<Output = StreamResult<Next>> + Send + 'static,
1065    {
1066        self.via(Flow::identity().map_async(parallelism, f))
1067    }
1068
1069    #[must_use]
1070    pub fn map_async_with_supervision<Next, F, Fut>(
1071        self,
1072        parallelism: usize,
1073        f: F,
1074        decider: SupervisionDecider,
1075    ) -> Source<Next, Mat>
1076    where
1077        Next: Send + 'static,
1078        F: Fn(Out) -> Fut + Send + Sync + 'static,
1079        Fut: Future<Output = StreamResult<Next>> + Send + 'static,
1080    {
1081        self.via(Flow::identity().map_async_with_supervision(parallelism, f, decider))
1082    }
1083
1084    #[must_use]
1085    pub fn map_async_unordered<Next, F, Fut>(self, parallelism: usize, f: F) -> Source<Next, Mat>
1086    where
1087        Next: Send + 'static,
1088        F: Fn(Out) -> Fut + Send + Sync + 'static,
1089        Fut: Future<Output = StreamResult<Next>> + Send + 'static,
1090    {
1091        self.via(Flow::identity().map_async_unordered(parallelism, f))
1092    }
1093
1094    #[must_use]
1095    pub fn map_async_unordered_with_supervision<Next, F, Fut>(
1096        self,
1097        parallelism: usize,
1098        f: F,
1099        decider: SupervisionDecider,
1100    ) -> Source<Next, Mat>
1101    where
1102        Next: Send + 'static,
1103        F: Fn(Out) -> Fut + Send + Sync + 'static,
1104        Fut: Future<Output = StreamResult<Next>> + Send + 'static,
1105    {
1106        self.via(Flow::identity().map_async_unordered_with_supervision(parallelism, f, decider))
1107    }
1108
1109    #[must_use]
1110    pub fn map_async_partitioned<Key, Next, Partition, F, Fut>(
1111        self,
1112        parallelism: usize,
1113        per_partition: usize,
1114        partition: Partition,
1115        f: F,
1116    ) -> Source<Next, Mat>
1117    where
1118        Key: Clone + Eq + Hash + Send + 'static,
1119        Next: Send + 'static,
1120        Partition: Fn(&Out) -> Key + Send + Sync + 'static,
1121        F: Fn(Out) -> Fut + Send + Sync + 'static,
1122        Fut: Future<Output = StreamResult<Next>> + Send + 'static,
1123    {
1124        self.via(Flow::identity().map_async_partitioned(parallelism, per_partition, partition, f))
1125    }
1126
1127    #[must_use]
1128    pub fn prefix_and_tail(self, n: usize) -> Source<(Vec<Out>, Source<Out>), Mat> {
1129        self.via(Flow::identity().prefix_and_tail(n))
1130    }
1131
1132    #[must_use]
1133    pub fn flat_map_prefix<Next, FlowMat, F>(self, n: usize, f: F) -> Source<Next, Mat>
1134    where
1135        Next: Send + 'static,
1136        FlowMat: Send + 'static,
1137        F: Fn(Vec<Out>) -> Flow<Out, Next, FlowMat> + Send + Sync + 'static,
1138        Out: Clone,
1139    {
1140        self.via(Flow::identity().flat_map_prefix(n, f))
1141    }
1142
1143    #[must_use]
1144    pub fn group_by<Key, F>(
1145        self,
1146        max_substreams: usize,
1147        f: F,
1148        allow_closed_substream_recreation: bool,
1149    ) -> Source<Source<Out>, Mat>
1150    where
1151        Key: Clone + Eq + Hash + Send + 'static,
1152        F: Fn(&Out) -> Key + Send + Sync + 'static,
1153        Out: Clone,
1154    {
1155        let batch_mode = if self.hints.inline_micro.is_some() && !allow_closed_substream_recreation
1156        {
1157            flow::GroupByBatchMode::FiniteEagerNoRecreate
1158        } else {
1159            flow::GroupByBatchMode::Immediate
1160        };
1161        self.via(Flow::identity().group_by_with_batching(
1162            max_substreams,
1163            f,
1164            allow_closed_substream_recreation,
1165            batch_mode,
1166        ))
1167    }
1168
1169    #[must_use]
1170    pub fn split_when<F>(self, predicate: F) -> Source<Source<Out>, Mat>
1171    where
1172        F: Fn(&Out) -> bool + Send + Sync + 'static,
1173        Out: Clone,
1174    {
1175        self.via(Flow::identity().split_when(predicate))
1176    }
1177
1178    #[must_use]
1179    pub fn split_after<F>(self, predicate: F) -> Source<Source<Out>, Mat>
1180    where
1181        F: Fn(&Out) -> bool + Send + Sync + 'static,
1182        Out: Clone,
1183    {
1184        self.via(Flow::identity().split_after(predicate))
1185    }
1186
1187    #[must_use]
1188    pub fn flat_map_concat<Next, NextMat, F>(self, f: F) -> Source<Next, Mat>
1189    where
1190        Next: Send + 'static,
1191        NextMat: Send + 'static,
1192        F: Fn(Out) -> Source<Next, NextMat> + Send + Sync + 'static,
1193    {
1194        self.via(Flow::identity().flat_map_concat(f))
1195    }
1196
1197    #[must_use]
1198    pub fn flat_map_merge<Next, NextMat, F>(self, breadth: usize, f: F) -> Source<Next, Mat>
1199    where
1200        Next: Send + 'static,
1201        NextMat: Send + 'static,
1202        F: Fn(Out) -> Source<Next, NextMat> + Send + Sync + 'static,
1203    {
1204        self.via(Flow::identity().flat_map_merge(breadth, f))
1205    }
1206
1207    #[must_use]
1208    pub fn take(self, n: usize) -> Source<Out, Mat> {
1209        self.via(Flow::identity().take(n))
1210    }
1211
1212    #[must_use]
1213    pub fn drop(self, n: usize) -> Source<Out, Mat> {
1214        self.via(Flow::identity().drop(n))
1215    }
1216
1217    #[must_use]
1218    pub fn take_while<F>(self, predicate: F) -> Source<Out, Mat>
1219    where
1220        F: Fn(&Out) -> bool + Send + Sync + 'static,
1221    {
1222        self.via(Flow::identity().take_while(predicate))
1223    }
1224
1225    #[must_use]
1226    pub fn drop_while<F>(self, predicate: F) -> Source<Out, Mat>
1227    where
1228        F: Fn(&Out) -> bool + Send + Sync + 'static,
1229    {
1230        self.via(Flow::identity().drop_while(predicate))
1231    }
1232
1233    #[must_use]
1234    pub fn limit(self, max: u64) -> Source<Out, Mat> {
1235        self.via(Flow::identity().limit(max))
1236    }
1237
1238    #[must_use]
1239    pub fn grouped(self, size: usize) -> Source<Vec<Out>, Mat> {
1240        self.via(Flow::identity().grouped(size))
1241    }
1242
1243    #[must_use]
1244    pub fn scan<State, F>(self, seed: State, f: F) -> Source<State, Mat>
1245    where
1246        State: Clone + Send + Sync + 'static,
1247        F: Fn(State, Out) -> State + Send + Sync + 'static,
1248    {
1249        self.via(Flow::identity().scan(seed, f))
1250    }
1251
1252    #[must_use]
1253    pub fn scan_async<State, F, Fut>(self, seed: State, f: F) -> Source<State, Mat>
1254    where
1255        State: Clone + Send + Sync + 'static,
1256        F: Fn(State, Out) -> Fut + Send + Sync + 'static,
1257        Fut: Future<Output = StreamResult<State>> + Send + 'static,
1258    {
1259        self.via(Flow::identity().scan_async(seed, f))
1260    }
1261
1262    #[must_use]
1263    pub fn try_scan<State, F>(self, seed: State, f: F) -> Source<State, Mat>
1264    where
1265        State: Clone + Send + Sync + 'static,
1266        F: Fn(State, Out) -> StreamResult<State> + Send + Sync + 'static,
1267    {
1268        self.via(Flow::identity().try_scan(seed, f))
1269    }
1270
1271    /// Deprecated alias for [`Source::try_scan`].
1272    #[must_use]
1273    #[deprecated(
1274        since = "0.9.0",
1275        note = "renamed to `try_scan` (idiomatic); the `_result` name still works"
1276    )]
1277    pub fn scan_result<State, F>(self, seed: State, f: F) -> Source<State, Mat>
1278    where
1279        State: Clone + Send + Sync + 'static,
1280        F: Fn(State, Out) -> StreamResult<State> + Send + Sync + 'static,
1281    {
1282        self.try_scan(seed, f)
1283    }
1284
1285    #[must_use]
1286    pub fn scan_result_with_supervision<State, F>(
1287        self,
1288        seed: State,
1289        f: F,
1290        decider: SupervisionDecider,
1291    ) -> Source<State, Mat>
1292    where
1293        State: Clone + Send + Sync + 'static,
1294        F: Fn(State, Out) -> StreamResult<State> + Send + Sync + 'static,
1295    {
1296        self.via(Flow::identity().scan_result_with_supervision(seed, f, decider))
1297    }
1298
1299    #[must_use]
1300    pub fn sliding(self, size: usize, step: usize) -> Source<Vec<Out>, Mat>
1301    where
1302        Out: Clone,
1303    {
1304        self.via(Flow::identity().sliding(size, step))
1305    }
1306
1307    #[must_use]
1308    pub fn fold<Acc, F>(self, zero: Acc, f: F) -> Source<Acc, Mat>
1309    where
1310        Acc: Clone + Send + Sync + 'static,
1311        F: Fn(Acc, Out) -> Acc + Send + Sync + 'static,
1312    {
1313        self.via(Flow::identity().fold(zero, f))
1314    }
1315
1316    #[must_use]
1317    pub fn fold_async<Acc, F, Fut>(self, zero: Acc, f: F) -> Source<Acc, Mat>
1318    where
1319        Acc: Clone + Send + Sync + 'static,
1320        F: Fn(Acc, Out) -> Fut + Send + Sync + 'static,
1321        Fut: Future<Output = StreamResult<Acc>> + Send + 'static,
1322    {
1323        self.via(Flow::identity().fold_async(zero, f))
1324    }
1325
1326    #[must_use]
1327    pub fn map_with_resource<Resource, Next, Create, F, Close>(
1328        self,
1329        create: Create,
1330        f: F,
1331        close: Close,
1332    ) -> Source<Next, Mat>
1333    where
1334        Resource: Send + 'static,
1335        Next: Send + 'static,
1336        Create: Fn() -> StreamResult<Resource> + Send + Sync + 'static,
1337        F: Fn(&mut Resource, Out) -> StreamResult<Next> + Send + Sync + 'static,
1338        Close: Fn(Resource) -> StreamResult<Option<Next>> + Send + Sync + 'static,
1339    {
1340        self.via(Flow::identity().map_with_resource(create, f, close))
1341    }
1342
1343    #[must_use]
1344    pub fn try_fold<Acc, F>(self, zero: Acc, f: F) -> Source<Acc, Mat>
1345    where
1346        Acc: Clone + Send + Sync + 'static,
1347        F: Fn(Acc, Out) -> StreamResult<Acc> + Send + Sync + 'static,
1348    {
1349        self.via(Flow::identity().try_fold(zero, f))
1350    }
1351
1352    /// Deprecated alias for [`Source::try_fold`].
1353    #[must_use]
1354    #[deprecated(
1355        since = "0.9.0",
1356        note = "renamed to `try_fold` (idiomatic); the `_result` name still works"
1357    )]
1358    pub fn fold_result<Acc, F>(self, zero: Acc, f: F) -> Source<Acc, Mat>
1359    where
1360        Acc: Clone + Send + Sync + 'static,
1361        F: Fn(Acc, Out) -> StreamResult<Acc> + Send + Sync + 'static,
1362    {
1363        self.try_fold(zero, f)
1364    }
1365
1366    #[must_use]
1367    pub fn fold_result_with_supervision<Acc, F>(
1368        self,
1369        zero: Acc,
1370        f: F,
1371        decider: SupervisionDecider,
1372    ) -> Source<Acc, Mat>
1373    where
1374        Acc: Clone + Send + Sync + 'static,
1375        F: Fn(Acc, Out) -> StreamResult<Acc> + Send + Sync + 'static,
1376    {
1377        self.via(Flow::identity().fold_result_with_supervision(zero, f, decider))
1378    }
1379
1380    #[must_use]
1381    pub fn reduce<F>(self, f: F) -> Source<Out, Mat>
1382    where
1383        F: Fn(Out, Out) -> Out + Send + Sync + 'static,
1384    {
1385        self.via(Flow::identity().reduce(f))
1386    }
1387
1388    #[must_use]
1389    pub fn try_reduce<F>(self, f: F) -> Source<Out, Mat>
1390    where
1391        Out: Clone,
1392        F: Fn(Out, Out) -> StreamResult<Out> + Send + Sync + 'static,
1393    {
1394        self.via(Flow::identity().try_reduce(f))
1395    }
1396
1397    /// Deprecated alias for [`Source::try_reduce`].
1398    #[must_use]
1399    #[deprecated(
1400        since = "0.9.0",
1401        note = "renamed to `try_reduce` (idiomatic); the `_result` name still works"
1402    )]
1403    pub fn reduce_result<F>(self, f: F) -> Source<Out, Mat>
1404    where
1405        Out: Clone,
1406        F: Fn(Out, Out) -> StreamResult<Out> + Send + Sync + 'static,
1407    {
1408        self.try_reduce(f)
1409    }
1410
1411    #[must_use]
1412    pub fn reduce_result_with_supervision<F>(
1413        self,
1414        f: F,
1415        decider: SupervisionDecider,
1416    ) -> Source<Out, Mat>
1417    where
1418        Out: Clone,
1419        F: Fn(Out, Out) -> StreamResult<Out> + Send + Sync + 'static,
1420    {
1421        self.via(Flow::identity().reduce_result_with_supervision(f, decider))
1422    }
1423
1424    #[must_use]
1425    pub fn map_error<F>(self, f: F) -> Source<Out, Mat>
1426    where
1427        F: Fn(StreamError) -> StreamError + Send + Sync + 'static,
1428    {
1429        self.via(Flow::identity().map_error(f))
1430    }
1431
1432    #[must_use]
1433    pub fn recover<F>(self, f: F) -> Source<Out, Mat>
1434    where
1435        F: Fn(StreamError) -> Option<Out> + Send + Sync + 'static,
1436    {
1437        self.via(Flow::identity().recover(f))
1438    }
1439
1440    #[must_use]
1441    pub fn recover_with<F>(self, f: F) -> Source<Out, Mat>
1442    where
1443        F: Fn(StreamError) -> Option<Source<Out>> + Send + Sync + 'static,
1444    {
1445        self.via(Flow::identity().recover_with(f))
1446    }
1447
1448    #[must_use]
1449    pub fn recover_with_retries<F>(self, retries: usize, f: F) -> Source<Out, Mat>
1450    where
1451        F: Fn(StreamError) -> Option<Source<Out>> + Send + Sync + 'static,
1452    {
1453        self.via(Flow::identity().recover_with_retries(retries, f))
1454    }
1455
1456    #[must_use]
1457    pub fn on_error_complete(self) -> Source<Out, Mat> {
1458        self.via(Flow::identity().on_error_complete())
1459    }
1460
1461    #[must_use]
1462    pub fn concat<Mat2>(self, that: Source<Out, Mat2>) -> Source<Out, Mat>
1463    where
1464        Mat2: Send + 'static,
1465    {
1466        let factory = self.factory;
1467        let hints = self.hints;
1468        let that_factory = that.factory;
1469        Source::from_materialized_factory_with_hints(
1470            move |materializer| {
1471                let (primary, mat) = Arc::clone(&factory).create(materializer)?;
1472                let secondary = match Arc::clone(&that_factory).create(materializer) {
1473                    Ok((stream, _)) => stream,
1474                    Err(error) => Box::new(std::iter::once(Err(error))) as BoxStream<Out>,
1475                };
1476                Ok((concat_source_streams(vec![primary, secondary]), mat))
1477            },
1478            hints,
1479        )
1480    }
1481
1482    #[must_use]
1483    pub fn concat_lazy<Mat2>(self, that: Source<Out, Mat2>) -> Source<Out, Mat>
1484    where
1485        Mat2: Send + 'static,
1486    {
1487        let factory = self.factory;
1488        let hints = self.hints;
1489        let that_factory = that.factory;
1490        Source::from_materialized_factory_with_hints(
1491            move |materializer| {
1492                let (primary, mat) = Arc::clone(&factory).create(materializer)?;
1493                Ok((
1494                    concat_source_streams_lazy(
1495                        primary,
1496                        vec![Arc::clone(&that_factory)],
1497                        materializer,
1498                    ),
1499                    mat,
1500                ))
1501            },
1502            hints,
1503        )
1504    }
1505
1506    #[must_use]
1507    pub fn concat_all_lazy<Mat2, I>(self, those: I) -> Source<Out, Mat>
1508    where
1509        Mat2: Send + 'static,
1510        I: IntoIterator<Item = Source<Out, Mat2>>,
1511    {
1512        let factory = self.factory;
1513        let hints = self.hints;
1514        let other_factories: Vec<_> = those.into_iter().map(|source| source.factory).collect();
1515        Source::from_materialized_factory_with_hints(
1516            move |materializer| {
1517                let (primary, mat) = Arc::clone(&factory).create(materializer)?;
1518                Ok((
1519                    concat_source_streams_lazy(primary, other_factories.clone(), materializer),
1520                    mat,
1521                ))
1522            },
1523            hints,
1524        )
1525    }
1526
1527    #[must_use]
1528    pub fn prepend<Mat2>(self, that: Source<Out, Mat2>) -> Source<Out, Mat>
1529    where
1530        Mat2: Send + 'static,
1531    {
1532        let factory = self.factory;
1533        let hints = self.hints;
1534        let that_factory = that.factory;
1535        Source::from_materialized_factory_with_hints(
1536            move |materializer| {
1537                let (primary, mat) = Arc::clone(&factory).create(materializer)?;
1538                let secondary = match Arc::clone(&that_factory).create(materializer) {
1539                    Ok((stream, _)) => stream,
1540                    Err(error) => Box::new(std::iter::once(Err(error))) as BoxStream<Out>,
1541                };
1542                Ok((concat_source_streams(vec![secondary, primary]), mat))
1543            },
1544            hints,
1545        )
1546    }
1547
1548    #[must_use]
1549    pub fn prepend_lazy<Mat2>(self, that: Source<Out, Mat2>) -> Source<Out, Mat>
1550    where
1551        Mat2: Send + 'static,
1552    {
1553        self.prepend(that)
1554    }
1555
1556    #[must_use]
1557    pub fn or_else<Mat2>(self, secondary: Source<Out, Mat2>) -> Source<Out, Mat>
1558    where
1559        Mat2: Send + 'static,
1560    {
1561        let factory = self.factory;
1562        let hints = self.hints;
1563        let secondary_factory = secondary.factory;
1564        Source::from_materialized_factory_with_hints(
1565            move |materializer| {
1566                let (primary, mat) = Arc::clone(&factory).create(materializer)?;
1567                let secondary = match Arc::clone(&secondary_factory).create(materializer) {
1568                    Ok((stream, _)) => stream,
1569                    Err(error) => Box::new(std::iter::once(Err(error))) as BoxStream<Out>,
1570                };
1571                Ok((or_else_source_stream(primary, secondary), mat))
1572            },
1573            hints,
1574        )
1575    }
1576
1577    #[must_use]
1578    pub fn interleave<Mat2>(self, that: Source<Out, Mat2>, segment_size: usize) -> Source<Out, Mat>
1579    where
1580        Mat2: Send + 'static,
1581    {
1582        self.interleave_all([that], segment_size, false)
1583    }
1584
1585    #[must_use]
1586    pub fn interleave_all<Mat2, I>(
1587        self,
1588        those: I,
1589        segment_size: usize,
1590        eager_close: bool,
1591    ) -> Source<Out, Mat>
1592    where
1593        Mat2: Send + 'static,
1594        I: IntoIterator<Item = Source<Out, Mat2>>,
1595    {
1596        let factory = self.factory;
1597        let hints = self.hints;
1598        let other_factories: Vec<_> = those.into_iter().map(|source| source.factory).collect();
1599        Source::from_materialized_factory_with_hints(
1600            move |materializer| {
1601                let (primary, mat) = Arc::clone(&factory).create(materializer)?;
1602                let mut streams = Vec::with_capacity(other_factories.len() + 1);
1603                streams.push(primary);
1604                for other in &other_factories {
1605                    let stream = match Arc::clone(other).create(materializer) {
1606                        Ok((stream, _)) => stream,
1607                        Err(error) => Box::new(std::iter::once(Err(error))) as BoxStream<Out>,
1608                    };
1609                    streams.push(stream);
1610                }
1611                Ok((
1612                    interleave_source_streams(streams, segment_size, eager_close),
1613                    mat,
1614                ))
1615            },
1616            hints,
1617        )
1618    }
1619
1620    #[must_use]
1621    pub fn merge_sorted<Mat2>(self, that: Source<Out, Mat2>) -> Source<Out, Mat>
1622    where
1623        Out: Ord,
1624        Mat2: Send + 'static,
1625    {
1626        self.via(Flow::identity().merge_sorted(that))
1627    }
1628
1629    #[must_use]
1630    pub fn merge_latest<Mat2>(
1631        self,
1632        that: Source<Out, Mat2>,
1633        eager_complete: bool,
1634    ) -> Source<Vec<Out>, Mat>
1635    where
1636        Out: Clone,
1637        Mat2: Send + 'static,
1638    {
1639        let factory = self.factory;
1640        let hints = self.hints;
1641        let that_factory = that.factory;
1642        Source::from_materialized_factory_with_hints(
1643            move |materializer| {
1644                let (left, mat) = Arc::clone(&factory).create(materializer)?;
1645                let right = match Arc::clone(&that_factory).create(materializer) {
1646                    Ok((stream, _)) => stream,
1647                    Err(error) => {
1648                        return Ok((
1649                            Box::new(std::iter::once(Err(error))) as BoxStream<Vec<Out>>,
1650                            mat,
1651                        ));
1652                    }
1653                };
1654                Ok((merge_latest_streams(vec![left, right], eager_complete), mat))
1655            },
1656            hints,
1657        )
1658    }
1659
1660    #[must_use]
1661    pub fn merge_all<Mat2, I>(self, those: I, eager_complete: bool) -> Source<Out, Mat>
1662    where
1663        Mat2: Send + 'static,
1664        I: IntoIterator<Item = Source<Out, Mat2>>,
1665    {
1666        let factory = self.factory;
1667        let hints = self.hints;
1668        let other_factories: Vec<_> = those.into_iter().map(|source| source.factory).collect();
1669        Source::from_materialized_factory_with_hints(
1670            move |materializer| {
1671                let (primary, mat) = Arc::clone(&factory).create(materializer)?;
1672                let mut streams = Vec::with_capacity(other_factories.len() + 1);
1673                streams.push(primary);
1674                for other in &other_factories {
1675                    let stream = match Arc::clone(other).create(materializer) {
1676                        Ok((stream, _)) => stream,
1677                        Err(error) => {
1678                            return Ok((
1679                                Box::new(std::iter::once(Err(error))) as BoxStream<Out>,
1680                                mat,
1681                            ));
1682                        }
1683                    };
1684                    streams.push(stream);
1685                }
1686                Ok((merge_streams(streams, eager_complete), mat))
1687            },
1688            hints,
1689        )
1690    }
1691
1692    #[must_use]
1693    pub fn zip_with<Mat2, Out2, Next, F>(
1694        self,
1695        that: Source<Out2, Mat2>,
1696        combine: F,
1697    ) -> Source<Next, Mat>
1698    where
1699        Out2: Send + 'static,
1700        Next: Send + 'static,
1701        Mat2: Send + 'static,
1702        F: Fn(Out, Out2) -> Next + Send + Sync + 'static,
1703    {
1704        self.via(Flow::identity().zip_with(that, combine))
1705    }
1706
1707    #[must_use]
1708    pub fn zip_latest<Mat2, Out2>(self, that: Source<Out2, Mat2>) -> Source<(Out, Out2), Mat>
1709    where
1710        Out: Clone,
1711        Out2: Clone + Send + 'static,
1712        Mat2: Send + 'static,
1713    {
1714        self.zip_latest_with(that, true, |left, right| (left, right))
1715    }
1716
1717    #[must_use]
1718    pub fn zip_latest_with<Mat2, Out2, Next, F>(
1719        self,
1720        that: Source<Out2, Mat2>,
1721        eager_complete: bool,
1722        combine: F,
1723    ) -> Source<Next, Mat>
1724    where
1725        Out: Clone,
1726        Out2: Clone + Send + 'static,
1727        Next: Send + 'static,
1728        Mat2: Send + 'static,
1729        F: Fn(Out, Out2) -> Next + Send + Sync + 'static,
1730    {
1731        let factory = self.factory;
1732        let hints = self.hints;
1733        let that_factory = that.factory;
1734        let combine = Arc::new(combine);
1735        Source::from_materialized_factory_with_hints(
1736            move |materializer| {
1737                let (left, mat) = Arc::clone(&factory).create(materializer)?;
1738                let right = match Arc::clone(&that_factory).create(materializer) {
1739                    Ok((stream, _)) => stream,
1740                    Err(error) => {
1741                        return Ok((
1742                            Box::new(std::iter::once(Err(error))) as BoxStream<Next>,
1743                            mat,
1744                        ));
1745                    }
1746                };
1747                Ok((
1748                    zip_latest_with_stream(left, right, eager_complete, Arc::clone(&combine)),
1749                    mat,
1750                ))
1751            },
1752            hints,
1753        )
1754    }
1755
1756    #[must_use]
1757    pub fn zip_with_index(self) -> Source<(Out, u64), Mat> {
1758        let factory = self.factory;
1759        let hints = self.hints;
1760        Source::from_materialized_factory_with_hints(
1761            move |materializer| {
1762                let (mut stream, mat) = Arc::clone(&factory).create(materializer)?;
1763                let mut index = 0_u64;
1764                Ok((
1765                    Box::new(std::iter::from_fn(move || {
1766                        stream.next().map(|item| {
1767                            item.map(|value| {
1768                                let pair = (value, index);
1769                                index = index.wrapping_add(1);
1770                                pair
1771                            })
1772                        })
1773                    })) as BoxStream<(Out, u64)>,
1774                    mat,
1775                ))
1776            },
1777            hints,
1778        )
1779    }
1780
1781    #[must_use]
1782    pub fn zip_all<Mat2, Out2>(
1783        self,
1784        that: Source<Out2, Mat2>,
1785        this_elem: Out,
1786        that_elem: Out2,
1787    ) -> Source<(Out, Out2), Mat>
1788    where
1789        Out: Clone + Sync,
1790        Out2: Clone + Send + Sync + 'static,
1791        Mat2: Send + 'static,
1792    {
1793        let factory = self.factory;
1794        let hints = self.hints;
1795        let that_factory = that.factory;
1796        Source::from_materialized_factory_with_hints(
1797            move |materializer| {
1798                let (left, mat) = Arc::clone(&factory).create(materializer)?;
1799                let right = match Arc::clone(&that_factory).create(materializer) {
1800                    Ok((stream, _)) => stream,
1801                    Err(error) => {
1802                        return Ok((
1803                            Box::new(std::iter::once(Err(error))) as BoxStream<(Out, Out2)>,
1804                            mat,
1805                        ));
1806                    }
1807                };
1808                Ok((
1809                    zip_all_stream(left, right, this_elem.clone(), that_elem.clone()),
1810                    mat,
1811                ))
1812            },
1813            hints,
1814        )
1815    }
1816
1817    #[must_use]
1818    pub fn also_to<SinkMat>(self, sink: Sink<Out, SinkMat>) -> Source<Out, Mat>
1819    where
1820        Out: Clone,
1821        SinkMat: Send + 'static,
1822    {
1823        self.via(Flow::identity().also_to(sink))
1824    }
1825
1826    #[must_use]
1827    pub fn also_to_all<SinkMat, I>(self, sinks: I) -> Source<Out, Mat>
1828    where
1829        Out: Clone,
1830        SinkMat: Send + 'static,
1831        I: IntoIterator<Item = Sink<Out, SinkMat>>,
1832    {
1833        self.via(Flow::identity().also_to_all(sinks))
1834    }
1835
1836    #[must_use]
1837    pub fn divert_to<SinkMat, F>(self, sink: Sink<Out, SinkMat>, predicate: F) -> Source<Out, Mat>
1838    where
1839        SinkMat: Send + 'static,
1840        F: Fn(&Out) -> bool + Send + Sync + 'static,
1841    {
1842        self.via(Flow::identity().divert_to(sink, predicate))
1843    }
1844
1845    #[must_use]
1846    pub fn wire_tap<SinkMat>(self, sink: Sink<Out, SinkMat>) -> Source<Out, Mat>
1847    where
1848        Out: Clone,
1849        SinkMat: Send + 'static,
1850    {
1851        self.via(Flow::identity().wire_tap(sink))
1852    }
1853
1854    pub fn run_with<SinkMat: Send + 'static>(
1855        self,
1856        sink: Sink<Out, SinkMat>,
1857    ) -> StreamResult<SinkMat> {
1858        // Split-segment fast path: if source has a segment hook and sink has a
1859        // fold fast-path descriptor, drive the fold inline without spawning a task.
1860        let fast_result = self
1861            .split_hook
1862            .as_ref()
1863            .zip(sink.fold_fp.as_deref())
1864            .and_then(|(hook, fp)| fp.try_register(Arc::clone(hook)));
1865        if let Some(result) = fast_result {
1866            return result?.downcast::<SinkMat>().map(|b| *b).map_err(|_| {
1867                StreamError::Failed("split fast path: unexpected mat type (internal error)".into())
1868            });
1869        }
1870        self.to_mat(sink, Keep::right).run()
1871    }
1872
1873    pub fn run_with_materializer<SinkMat: Send + 'static>(
1874        self,
1875        sink: Sink<Out, SinkMat>,
1876        materializer: &Materializer,
1877    ) -> StreamResult<SinkMat> {
1878        self.to_mat(sink, Keep::right)
1879            .run_with_materializer(materializer)
1880    }
1881
1882    #[must_use]
1883    pub fn to<SinkMat>(self, sink: Sink<Out, SinkMat>) -> RunnableGraph<Mat>
1884    where
1885        SinkMat: Send + 'static,
1886    {
1887        self.to_mat(sink, Keep::left)
1888    }
1889
    /// Connects this source to `sink` and returns a runnable graph whose materialized
    /// value is `combine(source_mat, sink_mat)`.
    ///
    /// Nothing runs here; the closure handed to `RunnableGraph::from_runner` picks one
    /// of several execution paths each time the graph is run:
    ///
    /// 1. Terminal fast paths, considered only when the source has a `terminal_factory`
    ///    and the sink has a `fold_fp` descriptor (direct terminal, then terminal drain).
    /// 2. Otherwise the regular path: materialize the stream and either run the sink
    ///    inline or via `run_from_source`.
    ///
    /// # Errors
    ///
    /// The runner propagates factory and sink errors, and yields `StreamError::Failed`
    /// if a fast path returns a value that cannot be downcast to `SinkMat`.
    ///
    /// # Panics
    ///
    /// The runner panics if `fold_fp.supports_terminal_drain()` is true but
    /// `try_register_terminal_drain` returns `None`.
    #[must_use]
    pub fn to_mat<SinkMat, Combined, F>(
        self,
        sink: Sink<Out, SinkMat>,
        combine: F,
    ) -> RunnableGraph<Combined>
    where
        SinkMat: Send + 'static,
        Combined: Send + 'static,
        F: Fn(Mat, SinkMat) -> Combined + Send + Sync + 'static,
    {
        let factory = self.factory;
        let terminal_factory = self.terminal_factory;
        let hints = self.hints;
        let combine = Arc::new(combine);
        RunnableGraph::from_runner(move |materializer| {
            // Fast paths need both a terminal factory on the source side and a fold
            // fast-path descriptor on the sink side.
            if let Some(terminal_factory) = &terminal_factory
                && let Some(fold_fp) = sink.fold_fp.as_deref()
            {
                let (hook, source_mat) = terminal_factory(materializer)?;
                // First choice: direct terminal registration, when the hook supports it
                // and the descriptor accepts (returns `Some`).
                if hook.supports_direct_terminal()
                    && let Some(sink_mat) =
                        fold_fp.try_register_direct_terminal(Arc::clone(&hook), materializer)
                {
                    // The fast path returns a type-erased mat; recover the concrete type.
                    let sink_mat = sink_mat.and_then(|mat| {
                        mat.downcast::<SinkMat>().map(|boxed| *boxed).map_err(|_| {
                            StreamError::Failed(
                                "terminal fast path: unexpected mat type (internal error)".into(),
                            )
                        })
                    })?;
                    return Ok(combine(source_mat, sink_mat));
                }
                // The sink cannot drain a terminal hook: fall back to the regular
                // stream factory.
                // NOTE(review): the `hook` and `source_mat` obtained from
                // `terminal_factory` above are dropped unused on this path (`source_mat`
                // is shadowed), so both factories have been invoked — confirm this is
                // intended. This fallback also never takes the inline branch below.
                if !fold_fp.supports_terminal_drain() {
                    let (stream, source_mat) = Arc::clone(&factory).create(materializer)?;
                    let sink_mat = sink.run_from_source(stream, materializer, hints.runtime())?;
                    return Ok(combine(source_mat, sink_mat));
                }
                // Terminal drain was advertised just above, so `None` here is treated as
                // an invariant violation (hence `expect`).
                let sink_mat = fold_fp
                    .try_register_terminal_drain(hook, materializer)
                    .expect("terminal drain support advertised")
                    .and_then(|mat| {
                        mat.downcast::<SinkMat>().map(|boxed| *boxed).map_err(|_| {
                            StreamError::Failed(
                                "terminal fast path: unexpected mat type (internal error)".into(),
                            )
                        })
                    })?;
                return Ok(combine(source_mat, sink_mat));
            }
            // Regular path: materialize the source stream, then hand it to the sink.
            let (stream, source_mat) = Arc::clone(&factory).create(materializer)?;
            let sink_mat = if hints.inline_head_terminal && sink.can_inline() {
                // Inline execution wraps the stream with `runtime_checked_stream`, tied
                // to the materializer's state, before running the sink in place.
                let stream =
                    runtime_checked_stream(stream, Arc::clone(&materializer.inner.state), None);
                sink.run_inline(stream, materializer)?
            } else {
                sink.run_from_source(stream, materializer, hints.runtime())?
            };
            Ok(combine(source_mat, sink_mat))
        })
    }
1951
1952    pub fn run_collect(self) -> StreamResult<Vec<Out>> {
1953        self.run_with(Sink::collect())?.wait()
1954    }
1955
1956    pub fn run_fold<Acc, F>(self, zero: Acc, f: F) -> StreamResult<StreamCompletion<Acc>>
1957    where
1958        Acc: Clone + Send + Sync + 'static,
1959        F: Fn(Acc, Out) -> Acc + Send + Sync + 'static,
1960    {
1961        self.run_with(Sink::fold(zero, f))
1962    }
1963
1964    pub fn run_foreach<F>(self, f: F) -> StreamResult<StreamCompletion<NotUsed>>
1965    where
1966        F: Fn(Out) + Send + Sync + 'static,
1967    {
1968        self.run_with(Sink::foreach(f))
1969    }
1970
1971    pub fn run_for_each<F>(self, f: F) -> StreamResult<StreamCompletion<NotUsed>>
1972    where
1973        F: Fn(Out) + Send + Sync + 'static,
1974    {
1975        self.run_with(Sink::foreach(f))
1976    }
1977
1978    pub fn run_reduce<F>(self, f: F) -> StreamResult<StreamCompletion<Out>>
1979    where
1980        F: Fn(Out, Out) -> Out + Send + Sync + 'static,
1981    {
1982        self.run_with(Sink::reduce(f))
1983    }
1984
1985    #[must_use]
1986    pub fn map_materialized_value<NextMat, F>(self, f: F) -> Source<Out, NextMat>
1987    where
1988        NextMat: Send + 'static,
1989        F: Fn(Mat) -> NextMat + Send + Sync + 'static,
1990    {
1991        let factory = self.factory;
1992        let terminal_factory = self.terminal_factory;
1993        let hints = self.hints;
1994        let f = Arc::new(f);
1995        let factory_f = Arc::clone(&f);
1996        let mapped_terminal_factory = terminal_factory.map(|terminal_factory| {
1997            let f = Arc::clone(&f);
1998            Arc::new(move |materializer: &Materializer| {
1999                let (hook, mat) = terminal_factory(materializer)?;
2000                Ok((hook, f(mat)))
2001            }) as Arc<TerminalSourceFactory<Out, NextMat>>
2002        });
2003        Source {
2004            factory: Arc::new(FnSourceFactory(move |materializer: &Materializer| {
2005                let (stream, mat) = Arc::clone(&factory).create(materializer)?;
2006                Ok((stream, factory_f(mat)))
2007            })),
2008            terminal_factory: mapped_terminal_factory,
2009            hints,
2010            attributes: Attributes::default(),
2011            split_hook: None,
2012        }
2013    }
2014}
2015
2016impl<Out: Clone + Send + Sync + 'static> Source<Out, NotUsed> {
2017    #[must_use]
2018    pub fn combine<Mat1, Mat2, MatRest, I>(
2019        first: Source<Out, Mat1>,
2020        second: Source<Out, Mat2>,
2021        rest: I,
2022        strategy: SourceCombineStrategy,
2023    ) -> Source<Out, NotUsed>
2024    where
2025        Mat1: Send + 'static,
2026        Mat2: Send + 'static,
2027        MatRest: Send + 'static,
2028        I: IntoIterator<Item = Source<Out, MatRest>>,
2029    {
2030        let mut factories: Vec<Arc<CombinedSourceFactory<Out>>> = vec![
2031            Arc::new(move |materializer| {
2032                Arc::clone(&first.factory)
2033                    .create(materializer)
2034                    .map(|(stream, _)| stream)
2035            }),
2036            Arc::new(move |materializer| {
2037                Arc::clone(&second.factory)
2038                    .create(materializer)
2039                    .map(|(stream, _)| stream)
2040            }),
2041        ];
2042        factories.extend(rest.into_iter().map(|source| {
2043            Arc::new(move |materializer: &Materializer| {
2044                Arc::clone(&source.factory)
2045                    .create(materializer)
2046                    .map(|(stream, _)| stream)
2047            }) as Arc<CombinedSourceFactory<Out>>
2048        }));
2049        Source::from_materialized_factory(move |materializer| {
2050            let mut streams = Vec::with_capacity(factories.len());
2051            for factory in &factories {
2052                let stream = match factory(materializer) {
2053                    Ok(stream) => stream,
2054                    Err(error) => {
2055                        return Ok((
2056                            Box::new(std::iter::once(Err(error))) as BoxStream<Out>,
2057                            NotUsed,
2058                        ));
2059                    }
2060                };
2061                streams.push(stream);
2062            }
2063            let stream = match &strategy {
2064                SourceCombineStrategy::Merge { eager_complete } => {
2065                    merge_streams(streams, *eager_complete)
2066                }
2067                SourceCombineStrategy::Concat => concat_source_streams(streams),
2068                SourceCombineStrategy::Prioritized {
2069                    priorities,
2070                    eager_complete,
2071                } => {
2072                    if priorities.len() != streams.len() {
2073                        return Err(StreamError::GraphValidation(format!(
2074                            "combine priorities length {} did not match source count {}",
2075                            priorities.len(),
2076                            streams.len()
2077                        )));
2078                    }
2079                    merge_prioritized_streams(streams, priorities.clone(), *eager_complete)
2080                }
2081            };
2082            Ok((stream, NotUsed))
2083        })
2084    }
2085
2086    #[must_use]
2087    pub fn zip_n<Mat2, I>(sources: I) -> Source<Vec<Out>, NotUsed>
2088    where
2089        I: IntoIterator<Item = Source<Out, Mat2>>,
2090        Mat2: Send + 'static,
2091        Out: Clone,
2092    {
2093        Self::zip_with_n(sources, |values| values)
2094    }
2095
2096    #[must_use]
2097    pub fn zip_with_n<Mat2, I, Next, F>(sources: I, zipper: F) -> Source<Next, NotUsed>
2098    where
2099        I: IntoIterator<Item = Source<Out, Mat2>>,
2100        Mat2: Send + 'static,
2101        Next: Send + 'static,
2102        F: Fn(Vec<Out>) -> Next + Send + Sync + 'static,
2103    {
2104        let factories: Vec<_> = sources.into_iter().map(|source| source.factory).collect();
2105        let zipper = Arc::new(zipper);
2106        Source::from_materialized_factory(move |materializer| {
2107            let mut streams = Vec::with_capacity(factories.len());
2108            for factory in &factories {
2109                let stream = match Arc::clone(factory).create(materializer) {
2110                    Ok((stream, _)) => stream,
2111                    Err(error) => {
2112                        return Ok((
2113                            Box::new(std::iter::once(Err(error))) as BoxStream<Next>,
2114                            NotUsed,
2115                        ));
2116                    }
2117                };
2118                streams.push(stream);
2119            }
2120            Ok((zip_n_streams(streams, Arc::clone(&zipper)), NotUsed))
2121        })
2122    }
2123
2124    #[must_use]
2125    pub fn merge_prioritized_n<Mat2, I>(
2126        sources_and_priorities: I,
2127        eager_complete: bool,
2128    ) -> Source<Out, NotUsed>
2129    where
2130        I: IntoIterator<Item = (Source<Out, Mat2>, usize)>,
2131        Mat2: Send + 'static,
2132    {
2133        let sources_and_priorities: Vec<_> = sources_and_priorities.into_iter().collect();
2134        if sources_and_priorities.is_empty() {
2135            return Source::empty();
2136        }
2137        let (factories, priorities): (Vec<_>, Vec<_>) = sources_and_priorities
2138            .into_iter()
2139            .map(|(source, priority)| (source.factory, priority))
2140            .unzip();
2141        Source::from_materialized_factory(move |materializer| {
2142            let mut streams = Vec::with_capacity(factories.len());
2143            for factory in &factories {
2144                let stream = match Arc::clone(factory).create(materializer) {
2145                    Ok((stream, _)) => stream,
2146                    Err(error) => {
2147                        return Ok((
2148                            Box::new(std::iter::once(Err(error))) as BoxStream<Out>,
2149                            NotUsed,
2150                        ));
2151                    }
2152                };
2153                streams.push(stream);
2154            }
2155            Ok((
2156                merge_prioritized_streams(streams, priorities.clone(), eager_complete),
2157                NotUsed,
2158            ))
2159        })
2160    }
2161
2162    #[must_use]
2163    pub fn maybe() -> (MaybeHandle<Out>, Self) {
2164        let value = Arc::new(Mutex::new(None));
2165        let handle = MaybeHandle {
2166            value: Arc::clone(&value),
2167        };
2168        let source = Self::from_factory(move || {
2169            let result = value
2170                .lock()
2171                .expect("maybe source poisoned")
2172                .clone()
2173                .unwrap_or(Err(StreamError::MaybeIncomplete));
2174            Box::new(std::iter::once(result))
2175        });
2176        (handle, source)
2177    }
2178
2179    #[must_use]
2180    pub fn single(item: Out) -> Self {
2181        Self::from_factory_with_hints(
2182            move || Box::new(std::iter::once(Ok(item.clone()))),
2183            SourceHints::with_inline_micro(1),
2184        )
2185    }
2186
2187    #[must_use]
2188    pub fn repeat(item: Out) -> Self {
2189        Self::from_factory(move || {
2190            let item = item.clone();
2191            Box::new(std::iter::repeat_with(move || Ok(item.clone())))
2192        })
2193    }
2194
2195    #[must_use]
2196    pub fn from_iterable<I>(items: I) -> Self
2197    where
2198        I: IntoIterator<Item = Out>,
2199    {
2200        items.into_iter().collect()
2201    }
2202}
2203
2204impl<Out: Clone + Send + Sync + 'static> FromIterator<Out> for Source<Out, NotUsed> {
2205    fn from_iter<T: IntoIterator<Item = Out>>(iter: T) -> Self {
2206        // Share the backing storage across materializations and clone elements
2207        // lazily on demand, instead of cloning the whole container each run.
2208        let items: Arc<[Out]> = iter.into_iter().collect();
2209        let len = items.len();
2210        Self::from_factory_with_hints(
2211            move || {
2212                let items = Arc::clone(&items);
2213                let mut index = 0;
2214                Box::new(std::iter::from_fn(move || {
2215                    let item = items.get(index)?.clone();
2216                    index += 1;
2217                    Some(Ok(item))
2218                }))
2219            },
2220            // Exact success item count: enables the coordinator inline drain path.
2221            SourceHints::with_inline_micro(len),
2222        )
2223    }
2224}
2225
2226impl<Out: Clone + Send + Sync + 'static> From<Vec<Out>> for Source<Out, NotUsed> {
2227    fn from(items: Vec<Out>) -> Self {
2228        Source::from_iter(items)
2229    }
2230}
2231
2232impl<Out: Clone + Send + Sync + 'static, const N: usize> From<[Out; N]> for Source<Out, NotUsed> {
2233    fn from(items: [Out; N]) -> Self {
2234        Source::from_iter(items)
2235    }
2236}
2237
2238/// Converts any [`IntoIterator`] into a [`Source`] explicitly.
2239///
2240/// `Vec<T>` and `[T; N]` also implement `From` for `Source<T>`, while broader
2241/// iterables such as ranges intentionally use this trait to avoid a blanket
2242/// `From<I: IntoIterator>` inference trap.
2243///
2244/// ```
2245/// use datum::{IntoSource, Source};
2246///
2247/// let via_vec_into: Source<u64> = vec![1, 2, 3].into();
2248/// let via_array_into: Source<u64> = [1, 2, 3].into();
2249/// let via_range = (0_u64..3).into_source();
2250/// let via_from_iter = Source::from_iter(0_u64..3);
2251/// let via_from_iterator: Source<u64> = (0_u64..3).collect();
2252///
2253/// let _ = (
2254///     via_vec_into,
2255///     via_array_into,
2256///     via_range,
2257///     via_from_iter,
2258///     via_from_iterator,
2259/// );
2260/// ```
2261///
2262/// ```compile_fail
2263/// use datum::Source;
2264///
2265/// let _source: Source<u64> = (0_u64..3).into();
2266/// ```
2267pub trait IntoSource: IntoIterator + Sized {
2268    #[must_use]
2269    fn into_source(self) -> Source<Self::Item, NotUsed>
2270    where
2271        Self::Item: Clone + Send + Sync + 'static,
2272    {
2273        Source::from_iter(self)
2274    }
2275}
2276
2277impl<I> IntoSource for I where I: IntoIterator {}
2278
/// Test-only helper: wraps `factory` in a `Source` tagged as inline-micro eligible
/// for up to `max_success_items` items.
///
/// Lets tests exercise the inline drain path with custom (possibly blocking)
/// iterators without broadening production eligibility.
#[cfg(test)]
pub(in crate::stream) fn test_source_with_inline_micro_hint<Out: Send + 'static>(
    factory: impl Fn() -> BoxStream<Out> + Send + Sync + 'static,
    max_success_items: usize,
) -> Source<Out, NotUsed> {
    let hints = SourceHints::with_inline_micro(max_success_items);
    Source::from_factory_with_hints(factory, hints)
}
2290
/// Iterator state for a stream that is unfolded from a resource using three
/// synchronous callbacks: `create`, `read`, and `close`.
///
/// The resource is created on the first call to `next`, read on every call, and
/// closed when reading ends, when reading fails, or when the stream is dropped.
struct UnfoldResourceStream<Resource, Out, Create, Read, Close>
where
    Create: Fn() -> StreamResult<Resource>,
    Read: Fn(&mut Resource) -> StreamResult<Option<Out>>,
    Close: Fn(Resource) -> StreamResult<()>,
{
    // Callback that opens the resource; invoked at most once per stream.
    create: Arc<Create>,
    // Callback that produces the next item; `Ok(None)` ends the stream.
    read: Arc<Read>,
    // Callback that consumes and closes the resource.
    close: Arc<Close>,
    // `Some` while the resource is open; taken (left `None`) when it is closed.
    resource: Option<Resource>,
    // Set before `create` is invoked, so a failed create is never retried.
    created: bool,
    // Set once the stream has ended or failed; further `next` calls return `None`.
    terminated: bool,
    // Mentions `Out` in the type without storing a value of it.
    _marker: PhantomData<fn() -> Out>,
}
2305
2306impl<Resource, Out, Create, Read, Close> UnfoldResourceStream<Resource, Out, Create, Read, Close>
2307where
2308    Create: Fn() -> StreamResult<Resource>,
2309    Read: Fn(&mut Resource) -> StreamResult<Option<Out>>,
2310    Close: Fn(Resource) -> StreamResult<()>,
2311{
2312    fn ensure_created(&mut self) -> StreamResult<()> {
2313        if self.created {
2314            return Ok(());
2315        }
2316        self.created = true;
2317        let resource = catch_unwind_failed("unfold_resource create", || (self.create)())
2318            .and_then(|result| result)?;
2319        self.resource = Some(resource);
2320        Ok(())
2321    }
2322
2323    fn close_resource(&mut self) -> StreamResult<()> {
2324        match self.resource.take() {
2325            Some(resource) => {
2326                catch_unwind_failed("unfold_resource close", || (self.close)(resource))
2327                    .and_then(|result| result)
2328            }
2329            None => Ok(()),
2330        }
2331    }
2332}
2333
2334impl<Resource, Out, Create, Read, Close> Iterator
2335    for UnfoldResourceStream<Resource, Out, Create, Read, Close>
2336where
2337    Create: Fn() -> StreamResult<Resource>,
2338    Read: Fn(&mut Resource) -> StreamResult<Option<Out>>,
2339    Close: Fn(Resource) -> StreamResult<()>,
2340{
2341    type Item = StreamResult<Out>;
2342
2343    fn next(&mut self) -> Option<Self::Item> {
2344        if self.terminated {
2345            return None;
2346        }
2347        if let Err(error) = self.ensure_created() {
2348            self.terminated = true;
2349            return Some(Err(error));
2350        }
2351
2352        let result = {
2353            let resource = self
2354                .resource
2355                .as_mut()
2356                .expect("unfold_resource resource is open");
2357            catch_unwind_failed("unfold_resource read", || (self.read)(resource))
2358                .and_then(|result| result)
2359        };
2360
2361        match result {
2362            Ok(Some(item)) => Some(Ok(item)),
2363            Ok(None) => {
2364                self.terminated = true;
2365                match self.close_resource() {
2366                    Ok(()) => None,
2367                    Err(error) => Some(Err(error)),
2368                }
2369            }
2370            Err(read_error) => {
2371                self.terminated = true;
2372                let _ = self.close_resource();
2373                Some(Err(read_error))
2374            }
2375        }
2376    }
2377}
2378
2379impl<Resource, Out, Create, Read, Close> Drop
2380    for UnfoldResourceStream<Resource, Out, Create, Read, Close>
2381where
2382    Create: Fn() -> StreamResult<Resource>,
2383    Read: Fn(&mut Resource) -> StreamResult<Option<Out>>,
2384    Close: Fn(Resource) -> StreamResult<()>,
2385{
2386    fn drop(&mut self) {
2387        let _ = self.close_resource();
2388    }
2389}
2390
/// Function-pointer type naming `Out` and the three future types; used only as the
/// `PhantomData` parameter of `UnfoldResourceAsyncStream` so those type parameters
/// appear in the struct without being stored.
type UnfoldResourceAsyncMarker<Out, CreateFut, ReadFut, CloseFut> =
    fn() -> (Out, CreateFut, ReadFut, CloseFut);
2393
/// Iterator state for a stream that is unfolded from a resource using three
/// future-returning callbacks: `create`, `read`, and `close`.
///
/// Each returned future is driven to completion with
/// `flow::run_future_inline_or_spawn` before the iterator continues.
struct UnfoldResourceAsyncStream<Resource, Out, Create, CreateFut, Read, ReadFut, Close, CloseFut>
where
    Resource: Send + 'static,
    Create: Fn() -> CreateFut,
    CreateFut: Future<Output = StreamResult<Resource>> + Send + 'static,
    Read: Fn(&mut Resource) -> ReadFut,
    ReadFut: Future<Output = StreamResult<Option<Out>>> + Send + 'static,
    Close: Fn(Resource) -> CloseFut,
    CloseFut: Future<Output = StreamResult<()>> + Send + 'static,
{
    // Callback returning the future that opens the resource; invoked at most once.
    create: Arc<Create>,
    // Callback returning the future for the next item; `Ok(None)` ends the stream.
    read: Arc<Read>,
    // Callback returning the future that consumes and closes the resource.
    close: Arc<Close>,
    // `Some` while the resource is open; taken (left `None`) when it is closed.
    resource: Option<Resource>,
    // Set before `create` is invoked, so a failed create is never retried.
    created: bool,
    // Set once the stream has ended or failed; further `next` calls return `None`.
    terminated: bool,
    // Mentions `Out` and the future types without storing values of them.
    _marker: PhantomData<UnfoldResourceAsyncMarker<Out, CreateFut, ReadFut, CloseFut>>,
}
2412
2413impl<Resource, Out, Create, CreateFut, Read, ReadFut, Close, CloseFut>
2414    UnfoldResourceAsyncStream<Resource, Out, Create, CreateFut, Read, ReadFut, Close, CloseFut>
2415where
2416    Resource: Send + 'static,
2417    Create: Fn() -> CreateFut,
2418    CreateFut: Future<Output = StreamResult<Resource>> + Send + 'static,
2419    Read: Fn(&mut Resource) -> ReadFut,
2420    ReadFut: Future<Output = StreamResult<Option<Out>>> + Send + 'static,
2421    Close: Fn(Resource) -> CloseFut,
2422    CloseFut: Future<Output = StreamResult<()>> + Send + 'static,
2423{
2424    fn ensure_created(&mut self) -> StreamResult<()> {
2425        if self.created {
2426            return Ok(());
2427        }
2428        self.created = true;
2429        let resource = catch_unwind_failed("unfold_resource_async create", || (self.create)())
2430            .and_then(flow::run_future_inline_or_spawn)?;
2431        self.resource = Some(resource);
2432        Ok(())
2433    }
2434
2435    fn close_resource(&mut self) -> StreamResult<()> {
2436        match self.resource.take() {
2437            Some(resource) => {
2438                catch_unwind_failed("unfold_resource_async close", || (self.close)(resource))
2439                    .and_then(flow::run_future_inline_or_spawn)
2440            }
2441            None => Ok(()),
2442        }
2443    }
2444}
2445
2446impl<Resource, Out, Create, CreateFut, Read, ReadFut, Close, CloseFut> Iterator
2447    for UnfoldResourceAsyncStream<Resource, Out, Create, CreateFut, Read, ReadFut, Close, CloseFut>
2448where
2449    Resource: Send + 'static,
2450    Out: Send + 'static,
2451    Create: Fn() -> CreateFut,
2452    CreateFut: Future<Output = StreamResult<Resource>> + Send + 'static,
2453    Read: Fn(&mut Resource) -> ReadFut,
2454    ReadFut: Future<Output = StreamResult<Option<Out>>> + Send + 'static,
2455    Close: Fn(Resource) -> CloseFut,
2456    CloseFut: Future<Output = StreamResult<()>> + Send + 'static,
2457{
2458    type Item = StreamResult<Out>;
2459
2460    fn next(&mut self) -> Option<Self::Item> {
2461        if self.terminated {
2462            return None;
2463        }
2464        if let Err(error) = self.ensure_created() {
2465            self.terminated = true;
2466            return Some(Err(error));
2467        }
2468
2469        let result = {
2470            let resource = self
2471                .resource
2472                .as_mut()
2473                .expect("unfold_resource_async resource is open");
2474            catch_unwind_failed("unfold_resource_async read", || (self.read)(resource))
2475                .and_then(flow::run_future_inline_or_spawn)
2476        };
2477
2478        match result {
2479            Ok(Some(item)) => Some(Ok(item)),
2480            Ok(None) => {
2481                self.terminated = true;
2482                match self.close_resource() {
2483                    Ok(()) => None,
2484                    Err(error) => Some(Err(error)),
2485                }
2486            }
2487            Err(read_error) => {
2488                self.terminated = true;
2489                let _ = self.close_resource();
2490                Some(Err(read_error))
2491            }
2492        }
2493    }
2494}
2495
2496impl<Resource, Out, Create, CreateFut, Read, ReadFut, Close, CloseFut> Drop
2497    for UnfoldResourceAsyncStream<Resource, Out, Create, CreateFut, Read, ReadFut, Close, CloseFut>
2498where
2499    Resource: Send + 'static,
2500    Create: Fn() -> CreateFut,
2501    CreateFut: Future<Output = StreamResult<Resource>> + Send + 'static,
2502    Read: Fn(&mut Resource) -> ReadFut,
2503    ReadFut: Future<Output = StreamResult<Option<Out>>> + Send + 'static,
2504    Close: Fn(Resource) -> CloseFut,
2505    CloseFut: Future<Output = StreamResult<()>> + Send + 'static,
2506{
2507    fn drop(&mut self) {
2508        let _ = self.close_resource();
2509    }
2510}
2511
/// Iterator state for a source that is only built and materialized when the first
/// element is requested.
///
/// The inner source's materialized value (or the failure to obtain it) is reported
/// through `mat_sender`.
struct LazySourceStream<Out, InnerMat, F> {
    // Factory producing the inner source; invoked at most once, on the first `next`.
    create: Arc<F>,
    // Materializer used to materialize the inner source.
    materializer: Materializer,
    // The inner stream once initialization has succeeded.
    current: Option<BoxStream<Out>>,
    // One-shot channel for the inner materialized value; taken when it is sent.
    mat_sender: Option<oneshot::Sender<StreamResult<InnerMat>>>,
    // Set before the factory is invoked, so initialization is attempted only once.
    initialized: bool,
    // Set once the stream has ended or failed; further `next` calls return `None`.
    terminated: bool,
}
2520
2521impl<Out, InnerMat, F> LazySourceStream<Out, InnerMat, F>
2522where
2523    Out: Send + 'static,
2524    InnerMat: Send + 'static,
2525    F: Fn() -> Source<Out, InnerMat>,
2526{
2527    fn complete_mat(&mut self, result: StreamResult<InnerMat>) {
2528        if let Some(sender) = self.mat_sender.take() {
2529            let _ = sender.send(result);
2530        }
2531    }
2532
2533    fn initialize(&mut self) -> StreamResult<()> {
2534        if self.initialized {
2535            return Ok(());
2536        }
2537        self.initialized = true;
2538        let source = match catch_unwind_failed("lazy_source factory", || (self.create)()) {
2539            Ok(source) => source,
2540            Err(error) => {
2541                self.complete_mat(Err(error.clone()));
2542                return Err(error);
2543            }
2544        };
2545        match Arc::clone(&source.factory).create(&self.materializer) {
2546            Ok((stream, mat)) => {
2547                self.current = Some(stream);
2548                self.complete_mat(Ok(mat));
2549                Ok(())
2550            }
2551            Err(error) => {
2552                self.complete_mat(Err(error.clone()));
2553                Err(error)
2554            }
2555        }
2556    }
2557}
2558
2559impl<Out, InnerMat, F> Iterator for LazySourceStream<Out, InnerMat, F>
2560where
2561    Out: Send + 'static,
2562    InnerMat: Send + 'static,
2563    F: Fn() -> Source<Out, InnerMat>,
2564{
2565    type Item = StreamResult<Out>;
2566
2567    fn next(&mut self) -> Option<Self::Item> {
2568        if self.terminated {
2569            return None;
2570        }
2571        if let Err(error) = self.initialize() {
2572            self.terminated = true;
2573            return Some(Err(error));
2574        }
2575        match self
2576            .current
2577            .as_mut()
2578            .expect("lazy_source current stream initialized")
2579            .next()
2580        {
2581            Some(Ok(item)) => Some(Ok(item)),
2582            Some(Err(error)) => {
2583                self.terminated = true;
2584                Some(Err(error))
2585            }
2586            None => {
2587                self.terminated = true;
2588                None
2589            }
2590        }
2591    }
2592}
2593
2594impl<Out, InnerMat, F> Drop for LazySourceStream<Out, InnerMat, F> {
2595    fn drop(&mut self) {
2596        if !self.initialized
2597            && let Some(sender) = self.mat_sender.take()
2598        {
2599            let _ = sender.send(Err(StreamError::Failed(
2600                "lazy source was never materialized".into(),
2601            )));
2602        }
2603    }
2604}
2605
/// Iterator state for a source that is obtained from a future and only built and
/// materialized when the first element is requested.
///
/// The inner source's materialized value (or the failure to obtain it) is reported
/// through `mat_sender`.
struct LazyFutureSourceStream<Out, InnerMat, F, Fut> {
    // Factory producing the future of the inner source; invoked at most once.
    create: Arc<F>,
    // Materializer used to materialize the inner source.
    materializer: Materializer,
    // The inner stream once initialization has succeeded.
    current: Option<BoxStream<Out>>,
    // One-shot channel for the inner materialized value; taken when it is sent.
    mat_sender: Option<oneshot::Sender<StreamResult<InnerMat>>>,
    // Set before the factory is invoked, so initialization is attempted only once.
    initialized: bool,
    // Set once the stream has ended or failed; further `next` calls return `None`.
    terminated: bool,
    // Mentions `Fut` in the type without storing a value of it.
    _marker: PhantomData<fn() -> Fut>,
}
2615
2616impl<Out, InnerMat, F, Fut> LazyFutureSourceStream<Out, InnerMat, F, Fut>
2617where
2618    Out: Send + 'static,
2619    InnerMat: Send + 'static,
2620    F: Fn() -> Fut,
2621    Fut: Future<Output = StreamResult<Source<Out, InnerMat>>> + Send + 'static,
2622{
2623    fn complete_mat(&mut self, result: StreamResult<InnerMat>) {
2624        if let Some(sender) = self.mat_sender.take() {
2625            let _ = sender.send(result);
2626        }
2627    }
2628
2629    fn initialize(&mut self) -> StreamResult<()> {
2630        if self.initialized {
2631            return Ok(());
2632        }
2633        self.initialized = true;
2634        let source = match catch_unwind_failed("lazy_future_source factory", || (self.create)())
2635            .and_then(flow::run_future_inline_or_spawn)
2636        {
2637            Ok(source) => source,
2638            Err(error) => {
2639                self.complete_mat(Err(error.clone()));
2640                return Err(error);
2641            }
2642        };
2643        match Arc::clone(&source.factory).create(&self.materializer) {
2644            Ok((stream, mat)) => {
2645                self.current = Some(stream);
2646                self.complete_mat(Ok(mat));
2647                Ok(())
2648            }
2649            Err(error) => {
2650                self.complete_mat(Err(error.clone()));
2651                Err(error)
2652            }
2653        }
2654    }
2655}
2656
2657impl<Out, InnerMat, F, Fut> Iterator for LazyFutureSourceStream<Out, InnerMat, F, Fut>
2658where
2659    Out: Send + 'static,
2660    InnerMat: Send + 'static,
2661    F: Fn() -> Fut,
2662    Fut: Future<Output = StreamResult<Source<Out, InnerMat>>> + Send + 'static,
2663{
2664    type Item = StreamResult<Out>;
2665
2666    fn next(&mut self) -> Option<Self::Item> {
2667        if self.terminated {
2668            return None;
2669        }
2670        if let Err(error) = self.initialize() {
2671            self.terminated = true;
2672            return Some(Err(error));
2673        }
2674        match self
2675            .current
2676            .as_mut()
2677            .expect("lazy_future_source current stream initialized")
2678            .next()
2679        {
2680            Some(Ok(item)) => Some(Ok(item)),
2681            Some(Err(error)) => {
2682                self.terminated = true;
2683                Some(Err(error))
2684            }
2685            None => {
2686                self.terminated = true;
2687                None
2688            }
2689        }
2690    }
2691}
2692
2693impl<Out, InnerMat, F, Fut> Drop for LazyFutureSourceStream<Out, InnerMat, F, Fut> {
2694    fn drop(&mut self) {
2695        if !self.initialized
2696            && let Some(sender) = self.mat_sender.take()
2697        {
2698            let _ = sender.send(Err(StreamError::Failed(
2699                "lazy future source was never materialized".into(),
2700            )));
2701        }
2702    }
2703}
2704
2705fn concat_source_streams<Out>(streams: Vec<BoxStream<Out>>) -> BoxStream<Out>
2706where
2707    Out: Send + 'static,
2708{
2709    let mut streams: VecDeque<_> = streams.into();
2710    let mut current = streams.pop_front();
2711    Box::new(std::iter::from_fn(move || {
2712        loop {
2713            match current.as_mut() {
2714                Some(stream) => match stream.next() {
2715                    Some(item) => return Some(item),
2716                    None => current = streams.pop_front(),
2717                },
2718                None => return None,
2719            }
2720        }
2721    }))
2722}
2723
2724fn concat_source_streams_lazy<Out, Mat>(
2725    initial: BoxStream<Out>,
2726    factories: Vec<Arc<dyn SourceFactory<Out, Mat>>>,
2727    materializer: &Materializer,
2728) -> BoxStream<Out>
2729where
2730    Out: Send + 'static,
2731    Mat: Send + 'static,
2732{
2733    let mut current = Some(initial);
2734    let mut remaining: VecDeque<_> = factories.into();
2735    let materializer = materializer.with_name_prefix(materializer.name_prefix().to_owned());
2736    Box::new(std::iter::from_fn(move || {
2737        loop {
2738            match current.as_mut() {
2739                Some(stream) => match stream.next() {
2740                    Some(item) => return Some(item),
2741                    None => {
2742                        current = remaining.pop_front().map(|factory| {
2743                            match factory.create(&materializer) {
2744                                Ok((stream, _)) => stream,
2745                                Err(error) => {
2746                                    Box::new(std::iter::once(Err(error))) as BoxStream<Out>
2747                                }
2748                            }
2749                        });
2750                    }
2751                },
2752                None => return None,
2753            }
2754        }
2755    }))
2756}
2757
2758fn or_else_source_stream<Out>(
2759    mut primary: BoxStream<Out>,
2760    mut secondary: BoxStream<Out>,
2761) -> BoxStream<Out>
2762where
2763    Out: Send + 'static,
2764{
2765    let mut primary_emitted = false;
2766    let mut using_secondary = false;
2767    Box::new(std::iter::from_fn(move || {
2768        loop {
2769            if using_secondary {
2770                return secondary.next();
2771            }
2772
2773            match primary.next() {
2774                Some(Ok(item)) => {
2775                    primary_emitted = true;
2776                    return Some(Ok(item));
2777                }
2778                Some(Err(error)) => return Some(Err(error)),
2779                None if primary_emitted => return None,
2780                None => using_secondary = true,
2781            }
2782        }
2783    }))
2784}
2785
2786fn interleave_source_streams<Out>(
2787    streams: Vec<BoxStream<Out>>,
2788    segment_size: usize,
2789    eager_close: bool,
2790) -> BoxStream<Out>
2791where
2792    Out: Send + 'static,
2793{
2794    if segment_size == 0 {
2795        return Box::new(std::iter::once(Err(StreamError::GraphValidation(
2796            "interleave segment size must be greater than zero".into(),
2797        ))));
2798    }
2799
2800    let mut streams: Vec<Option<BoxStream<Out>>> = streams.into_iter().map(Some).collect();
2801    let mut pending: Vec<Option<StreamResult<Out>>> = (0..streams.len()).map(|_| None).collect();
2802    let mut current = 0usize;
2803    let mut emitted = 0usize;
2804    Box::new(std::iter::from_fn(move || {
2805        loop {
2806            if streams.iter().all(Option::is_none) {
2807                return None;
2808            }
2809            if streams[current].is_none() {
2810                match next_active_source_stream(&streams, current) {
2811                    Some(next) => {
2812                        current = next;
2813                        emitted = 0;
2814                    }
2815                    None => return None,
2816                }
2817            }
2818
2819            let Some(stream) = streams[current].as_mut() else {
2820                continue;
2821            };
2822            let next_item = pending[current].take().or_else(|| stream.next());
2823            match next_item {
2824                Some(Ok(item)) => {
2825                    emitted += 1;
2826                    if emitted == segment_size {
2827                        emitted = 0;
2828                        if let Some(next) = next_active_source_stream(&streams, current) {
2829                            current = next;
2830                        }
2831                    }
2832                    return Some(Ok(item));
2833                }
2834                Some(Err(error)) => return Some(Err(error)),
2835                None => {
2836                    streams[current] = None;
2837                    emitted = 0;
2838                    if eager_close {
2839                        return None;
2840                    }
2841                    match next_active_source_stream(&streams, current) {
2842                        Some(next) => current = next,
2843                        None => return None,
2844                    }
2845                }
2846            }
2847        }
2848    }))
2849}
2850
2851fn next_active_source_stream<Out>(
2852    streams: &[Option<BoxStream<Out>>],
2853    current: usize,
2854) -> Option<usize>
2855where
2856    Out: Send + 'static,
2857{
2858    if streams.is_empty() {
2859        return None;
2860    }
2861    for offset in 1..=streams.len() {
2862        let index = (current + offset) % streams.len();
2863        if streams[index].is_some() {
2864            return Some(index);
2865        }
2866    }
2867    None
2868}