Skip to main content

datum/stream/
source.rs

1use super::flow::{self, FlowTransform};
2use super::*;
3use crate::Attributes;
4use crate::context::SourceWithContext;
5
6#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash)]
7pub struct NotUsed;
8
9type CombinedSourceFactory<Out> =
10    dyn Fn(&Materializer) -> StreamResult<BoxStream<Out>> + Send + Sync;
11
12pub struct Keep;
13
14impl Keep {
15    pub fn left<Left, Right>(left: Left, _right: Right) -> Left {
16        left
17    }
18
19    pub fn right<Left, Right>(_left: Left, right: Right) -> Right {
20        right
21    }
22
23    pub fn both<Left, Right>(left: Left, right: Right) -> (Left, Right) {
24        (left, right)
25    }
26
27    pub fn none<Left, Right>(_left: Left, _right: Right) -> NotUsed {
28        NotUsed
29    }
30}
31
32#[derive(Debug, Clone, PartialEq, Eq)]
33pub enum SourceCombineStrategy {
34    Merge {
35        eager_complete: bool,
36    },
37    Concat,
38    Prioritized {
39        priorities: Vec<usize>,
40        eager_complete: bool,
41    },
42}
43
44#[derive(Clone)]
45pub struct MaybeHandle<T> {
46    value: Arc<Mutex<Option<StreamResult<T>>>>,
47}
48
49impl<T> MaybeHandle<T> {
50    #[must_use]
51    pub fn is_completed(&self) -> bool {
52        self.value.lock().expect("maybe handle poisoned").is_some()
53    }
54
55    pub fn complete(&self, item: T) -> StreamResult<()> {
56        self.settle(Ok(item))
57    }
58
59    pub fn fail(&self, error: StreamError) -> StreamResult<()> {
60        self.settle(Err(error))
61    }
62
63    fn settle(&self, result: StreamResult<T>) -> StreamResult<()> {
64        let mut value = self.value.lock().expect("maybe handle poisoned");
65        if value.is_some() {
66            return Err(StreamError::Failed("maybe source already completed".into()));
67        }
68        *value = Some(result);
69        Ok(())
70    }
71}
72
73#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
74pub struct Demand(u64);
75
76impl Demand {
77    pub const ZERO: Self = Self(0);
78    pub const ONE: Self = Self(1);
79    pub const MAX: Self = Self(u64::MAX);
80
81    #[must_use]
82    pub const fn new(available: u64) -> Self {
83        Self(available)
84    }
85
86    #[must_use]
87    pub const fn available(self) -> u64 {
88        self.0
89    }
90
91    #[must_use]
92    pub const fn is_unbounded(self) -> bool {
93        self.0 == u64::MAX
94    }
95
96    #[must_use]
97    pub const fn is_empty(self) -> bool {
98        self.0 == 0
99    }
100
101    #[must_use]
102    pub const fn saturating_add(self, rhs: Self) -> Self {
103        Self(self.0.saturating_add(rhs.0))
104    }
105
106    pub fn consume_one(&mut self) -> bool {
107        match self.0 {
108            0 => false,
109            u64::MAX => true,
110            _ => {
111                self.0 -= 1;
112                true
113            }
114        }
115    }
116}
117
118pub trait PushOutlet<T>: Send {
119    fn push(&mut self, item: T) -> StreamResult<Demand>;
120
121    fn complete(&mut self) -> StreamResult<()> {
122        Ok(())
123    }
124
125    fn fail(&mut self, cause: StreamError) -> StreamResult<()> {
126        Err(cause)
127    }
128}
129
130#[derive(Clone)]
131pub struct Source<Out, Mat = NotUsed> {
132    pub(crate) factory: Arc<dyn SourceFactory<Out, Mat>>,
133    pub(crate) terminal_factory: Option<Arc<TerminalSourceFactory<Out, Mat>>>,
134    pub(super) hints: SourceHints,
135    pub(super) attributes: Attributes,
136    /// Set only for split-segment sources in fast mode. `None` for all other sources.
137    /// Cleared by all composition operators (via/map/to_mat etc.).
138    pub(crate) split_hook: Option<Arc<dyn SplitSegmentHookDyn>>,
139}
140
141pub(crate) type TerminalSourceFactory<Out, Mat> =
142    dyn Fn(&Materializer) -> StreamResult<(Arc<dyn TerminalSourceHookDyn<Out>>, Mat)> + Send + Sync;
143
144impl<Out: Send + 'static> Source<Out, NotUsed> {
145    pub(super) fn from_factory<F>(factory: F) -> Self
146    where
147        F: Fn() -> BoxStream<Out> + Send + Sync + 'static,
148    {
149        Self::from_factory_with_hints(factory, SourceHints::default())
150    }
151
152    fn from_factory_with_hints<F>(factory: F, hints: SourceHints) -> Self
153    where
154        F: Fn() -> BoxStream<Out> + Send + Sync + 'static,
155    {
156        Self::from_materialized_factory_with_hints(
157            move |_materializer| Ok((factory(), NotUsed)),
158            hints,
159        )
160    }
161
162    #[must_use]
163    pub fn empty() -> Self {
164        Self::from_factory_with_hints(
165            || Box::new(std::iter::empty()),
166            SourceHints::with_inline_micro(0),
167        )
168    }
169
170    #[must_use]
171    pub fn never() -> Self {
172        Self::from_materialized_factory_with_hints(
173            move |materializer| {
174                let state = Arc::clone(&materializer.inner.state);
175                Ok((
176                    Box::new(std::iter::from_fn(move || {
177                        loop {
178                            if state.shutdown.load(Ordering::SeqCst) {
179                                return Some(Err(StreamError::AbruptTermination));
180                            }
181                            if super::runtime::current_stream_cancelled()
182                                .as_ref()
183                                .is_some_and(|cancelled| cancelled.load(Ordering::SeqCst))
184                            {
185                                return Some(Err(StreamError::Cancelled));
186                            }
187                            std::thread::park_timeout(Duration::from_millis(1));
188                        }
189                    })),
190                    NotUsed,
191                ))
192            },
193            SourceHints::default(),
194        )
195    }
196
197    #[must_use]
198    pub fn failed(error: StreamError) -> Self {
199        Self::from_factory_with_hints(
200            move || Box::new(std::iter::once(Err(error.clone()))),
201            // max_success_items=0: failed() emits no Ok items before the Err.
202            SourceHints::with_inline_micro(0),
203        )
204    }
205
206    #[must_use]
207    pub fn future<F, Fut>(future: F) -> Self
208    where
209        F: Fn() -> Fut + Send + Sync + 'static,
210        Fut: Future<Output = StreamResult<Out>> + Send + 'static,
211    {
212        let future = Arc::new(future);
213        Self::from_factory(move || {
214            let future = Arc::clone(&future);
215            let mut emitted = false;
216            Box::new(std::iter::from_fn(move || {
217                if emitted {
218                    return None;
219                }
220                emitted = true;
221                Some(
222                    catch_unwind_failed("source future factory", || future())
223                        .and_then(flow::run_future_inline_or_spawn),
224                )
225            }))
226        })
227    }
228
229    #[must_use]
230    pub fn future_source<F, Fut>(future: F) -> Self
231    where
232        F: Fn() -> Fut + Send + Sync + 'static,
233        Fut: Future<Output = StreamResult<Source<Out>>> + Send + 'static,
234    {
235        let future = Arc::new(future);
236        Self::from_materialized_factory(move |materializer| {
237            let materializer = materializer.with_name_prefix(materializer.name_prefix().to_owned());
238            let future = Arc::clone(&future);
239            let mut current = None::<BoxStream<Out>>;
240            let mut initialized = false;
241            let mut terminated = false;
242            Ok((
243                Box::new(std::iter::from_fn(move || {
244                    if terminated {
245                        return None;
246                    }
247
248                    loop {
249                        if let Some(stream) = current.as_mut() {
250                            match stream.next() {
251                                Some(item) => return Some(item),
252                                None => {
253                                    terminated = true;
254                                    return None;
255                                }
256                            }
257                        }
258
259                        if initialized {
260                            terminated = true;
261                            return None;
262                        }
263                        initialized = true;
264                        let source = match catch_unwind_failed("future_source factory", || future())
265                            .and_then(flow::run_future_inline_or_spawn)
266                        {
267                            Ok(source) => source,
268                            Err(error) => {
269                                terminated = true;
270                                return Some(Err(error));
271                            }
272                        };
273                        current = Some(match Arc::clone(&source.factory).create(&materializer) {
274                            Ok((stream, _)) => stream,
275                            Err(error) => {
276                                terminated = true;
277                                return Some(Err(error));
278                            }
279                        });
280                    }
281                })) as BoxStream<Out>,
282                NotUsed,
283            ))
284        })
285    }
286
287    #[must_use]
288    pub fn cycle<F, I>(factory: F) -> Self
289    where
290        F: Fn() -> I + Send + Sync + 'static,
291        I: IntoIterator<Item = Out>,
292        I::IntoIter: Send + 'static,
293    {
294        let factory = Arc::new(factory);
295        Self::from_factory(move || {
296            let factory = Arc::clone(&factory);
297            let mut current = None::<I::IntoIter>;
298            let mut terminated = false;
299            Box::new(std::iter::from_fn(move || {
300                if terminated {
301                    return None;
302                }
303
304                if let Some(iter) = current.as_mut()
305                    && let Some(item) = iter.next()
306                {
307                    return Some(Ok(item));
308                }
309
310                let mut next = match catch_unwind_failed("cycle factory", || factory()) {
311                    Ok(iterable) => iterable.into_iter(),
312                    Err(error) => {
313                        terminated = true;
314                        return Some(Err(error));
315                    }
316                };
317                match next.next() {
318                    Some(item) => {
319                        current = Some(next);
320                        Some(Ok(item))
321                    }
322                    None => {
323                        terminated = true;
324                        Some(Err(StreamError::Failed("empty iterator".into())))
325                    }
326                }
327            }))
328        })
329    }
330
331    #[must_use]
332    pub fn unfold<State, F>(initial: State, f: F) -> Self
333    where
334        State: Clone + Send + Sync + 'static,
335        F: Fn(State) -> Option<(State, Out)> + Send + Sync + 'static,
336    {
337        let f = Arc::new(f);
338        Self::from_factory(move || {
339            let f = Arc::clone(&f);
340            let mut state = Some(initial.clone());
341            let mut terminated = false;
342            Box::new(std::iter::from_fn(move || {
343                if terminated {
344                    return None;
345                }
346                let current = state.take().expect("unfold state present");
347                match catch_unwind_failed("unfold function", || f(current)) {
348                    Ok(Some((next_state, item))) => {
349                        state = Some(next_state);
350                        Some(Ok(item))
351                    }
352                    Ok(None) => {
353                        terminated = true;
354                        None
355                    }
356                    Err(error) => {
357                        terminated = true;
358                        Some(Err(error))
359                    }
360                }
361            }))
362        })
363    }
364
365    #[must_use]
366    pub fn unfold_async<State, F, Fut>(initial: State, f: F) -> Self
367    where
368        State: Clone + Send + Sync + 'static,
369        F: Fn(State) -> Fut + Send + Sync + 'static,
370        Fut: Future<Output = StreamResult<Option<(State, Out)>>> + Send + 'static,
371    {
372        let f = Arc::new(f);
373        Self::from_factory(move || {
374            let f = Arc::clone(&f);
375            let mut state = Some(initial.clone());
376            let mut terminated = false;
377            Box::new(std::iter::from_fn(move || {
378                if terminated {
379                    return None;
380                }
381                let current = state.take().expect("unfold_async state present");
382                match catch_unwind_failed("unfold_async factory", || f(current))
383                    .and_then(flow::run_future_inline_or_spawn)
384                {
385                    Ok(Some((next_state, item))) => {
386                        state = Some(next_state);
387                        Some(Ok(item))
388                    }
389                    Ok(None) => {
390                        terminated = true;
391                        None
392                    }
393                    Err(error) => {
394                        terminated = true;
395                        Some(Err(error))
396                    }
397                }
398            }))
399        })
400    }
401
402    #[must_use]
403    pub fn unfold_resource<Resource, Create, Read, Close>(
404        create: Create,
405        read: Read,
406        close: Close,
407    ) -> Self
408    where
409        Resource: Send + 'static,
410        Create: Fn() -> StreamResult<Resource> + Send + Sync + 'static,
411        Read: Fn(&mut Resource) -> StreamResult<Option<Out>> + Send + Sync + 'static,
412        Close: Fn(Resource) -> StreamResult<()> + Send + Sync + 'static,
413    {
414        let create = Arc::new(create);
415        let read = Arc::new(read);
416        let close = Arc::new(close);
417        Self::from_factory(move || {
418            Box::new(UnfoldResourceStream {
419                create: Arc::clone(&create),
420                read: Arc::clone(&read),
421                close: Arc::clone(&close),
422                resource: None,
423                created: false,
424                terminated: false,
425                _marker: PhantomData,
426            })
427        })
428    }
429
430    #[must_use]
431    pub fn unfold_resource_async<Resource, Create, CreateFut, Read, ReadFut, Close, CloseFut>(
432        create: Create,
433        read: Read,
434        close: Close,
435    ) -> Self
436    where
437        Resource: Send + 'static,
438        Create: Fn() -> CreateFut + Send + Sync + 'static,
439        CreateFut: Future<Output = StreamResult<Resource>> + Send + 'static,
440        Read: Fn(&mut Resource) -> ReadFut + Send + Sync + 'static,
441        ReadFut: Future<Output = StreamResult<Option<Out>>> + Send + 'static,
442        Close: Fn(Resource) -> CloseFut + Send + Sync + 'static,
443        CloseFut: Future<Output = StreamResult<()>> + Send + 'static,
444    {
445        let create = Arc::new(create);
446        let read = Arc::new(read);
447        let close = Arc::new(close);
448        Self::from_factory(move || {
449            Box::new(UnfoldResourceAsyncStream {
450                create: Arc::clone(&create),
451                read: Arc::clone(&read),
452                close: Arc::clone(&close),
453                resource: None,
454                created: false,
455                terminated: false,
456                _marker: PhantomData,
457            })
458        })
459    }
460
461    #[must_use]
462    pub fn lazy_single<F>(create: F) -> Self
463    where
464        F: Fn() -> Out + Send + Sync + 'static,
465    {
466        let create = Arc::new(create);
467        Self::from_factory(move || {
468            let create = Arc::clone(&create);
469            let mut emitted = false;
470            Box::new(std::iter::from_fn(move || {
471                if emitted {
472                    return None;
473                }
474                emitted = true;
475                Some(catch_unwind_failed("lazy_single factory", || create()))
476            }))
477        })
478    }
479
480    #[must_use]
481    pub fn lazy_future<F, Fut>(create: F) -> Self
482    where
483        F: Fn() -> Fut + Send + Sync + 'static,
484        Fut: Future<Output = StreamResult<Out>> + Send + 'static,
485    {
486        let create = Arc::new(create);
487        Self::from_factory(move || {
488            let create = Arc::clone(&create);
489            let mut emitted = false;
490            Box::new(std::iter::from_fn(move || {
491                if emitted {
492                    return None;
493                }
494                emitted = true;
495                Some(
496                    catch_unwind_failed("lazy_future factory", || create())
497                        .and_then(flow::run_future_inline_or_spawn),
498                )
499            }))
500        })
501    }
502
503    #[must_use]
504    pub fn lazy_source<InnerMat, F>(create: F) -> Source<Out, StreamCompletion<InnerMat>>
505    where
506        InnerMat: Send + 'static,
507        F: Fn() -> Source<Out, InnerMat> + Send + Sync + 'static,
508    {
509        let create = Arc::new(create);
510        Source::from_materialized_factory(move |materializer| {
511            let (sender, receiver) = oneshot::channel();
512            Ok((
513                Box::new(LazySourceStream {
514                    create: Arc::clone(&create),
515                    materializer: materializer
516                        .with_name_prefix(materializer.name_prefix().to_owned()),
517                    current: None,
518                    mat_sender: Some(sender),
519                    initialized: false,
520                    terminated: false,
521                }) as BoxStream<Out>,
522                StreamCompletion::from_receiver(receiver, None),
523            ))
524        })
525    }
526
527    #[must_use]
528    pub fn lazy_future_source<InnerMat, F, Fut>(
529        create: F,
530    ) -> Source<Out, StreamCompletion<InnerMat>>
531    where
532        InnerMat: Send + 'static,
533        F: Fn() -> Fut + Send + Sync + 'static,
534        Fut: Future<Output = StreamResult<Source<Out, InnerMat>>> + Send + 'static,
535    {
536        let create = Arc::new(create);
537        Source::from_materialized_factory(move |materializer| {
538            let (sender, receiver) = oneshot::channel();
539            Ok((
540                Box::new(LazyFutureSourceStream {
541                    create: Arc::clone(&create),
542                    materializer: materializer
543                        .with_name_prefix(materializer.name_prefix().to_owned()),
544                    current: None,
545                    mat_sender: Some(sender),
546                    initialized: false,
547                    terminated: false,
548                    _marker: PhantomData,
549                }) as BoxStream<Out>,
550                StreamCompletion::from_receiver(receiver, None),
551            ))
552        })
553    }
554
555    #[must_use]
556    pub fn from_fn_iter<F, I>(factory: F) -> Self
557    where
558        F: Fn() -> I + Send + Sync + 'static,
559        I: IntoIterator<Item = Out>,
560        I::IntoIter: Send + 'static,
561    {
562        Self::from_factory(move || Box::new(factory().into_iter().map(Ok)))
563    }
564}
565
566impl<Out: Send + 'static, Mat: Send + 'static> Source<Out, Mat> {
567    fn from_materialized_factory_with_hints<F>(factory: F, hints: SourceHints) -> Self
568    where
569        F: Fn(&Materializer) -> StreamResult<(BoxStream<Out>, Mat)> + Send + Sync + 'static,
570    {
571        Self {
572            factory: Arc::new(FnSourceFactory(factory)),
573            terminal_factory: None,
574            hints,
575            attributes: Attributes::default(),
576            split_hook: None,
577        }
578    }
579
580    pub(crate) fn from_materialized_factory<F>(factory: F) -> Self
581    where
582        F: Fn(&Materializer) -> StreamResult<(BoxStream<Out>, Mat)> + Send + Sync + 'static,
583    {
584        Self::from_materialized_factory_with_hints(factory, SourceHints::default())
585    }
586
587    pub(crate) fn from_terminal_direct_materialized_factory<F, D>(
588        factory: F,
589        terminal_factory: D,
590    ) -> Self
591    where
592        F: Fn(&Materializer) -> StreamResult<(BoxStream<Out>, Mat)> + Send + Sync + 'static,
593        D: Fn(&Materializer) -> StreamResult<(Arc<dyn TerminalSourceHookDyn<Out>>, Mat)>
594            + Send
595            + Sync
596            + 'static,
597    {
598        Self {
599            factory: Arc::new(FnSourceFactory(factory)),
600            terminal_factory: Some(Arc::new(terminal_factory)),
601            hints: SourceHints::with_terminal_consumer_batch(),
602            attributes: Attributes::default(),
603            split_hook: None,
604        }
605    }
606
607    #[must_use]
608    pub fn as_source_with_context<Ctx, F>(
609        self,
610        extract_context: F,
611    ) -> SourceWithContext<Out, Ctx, Mat>
612    where
613        Ctx: Send + 'static,
614        F: Fn(&Out) -> Ctx + Send + Sync + 'static,
615    {
616        SourceWithContext::from_source(self.map(move |item| {
617            let context = extract_context(&item);
618            (item, context)
619        }))
620    }
621
622    #[must_use]
623    pub fn via<Next, FlowMat>(self, flow: Flow<Out, Next, FlowMat>) -> Source<Next, Mat>
624    where
625        Next: Send + 'static,
626        FlowMat: Send + 'static,
627    {
628        self.via_mat(flow, Keep::left)
629    }
630
631    #[must_use]
632    pub fn via_mat<Next, FlowMat, Combined, F>(
633        self,
634        flow: Flow<Out, Next, FlowMat>,
635        combine: F,
636    ) -> Source<Next, Combined>
637    where
638        Next: Send + 'static,
639        FlowMat: Send + 'static,
640        Combined: Send + 'static,
641        F: Fn(Mat, FlowMat) -> Combined + Send + Sync + 'static,
642    {
643        let source = self.factory;
644        let transform = flow.transform;
645        let materialize_flow = flow.materialize;
646        let hints = self.hints.after_flow(flow.hints);
647        let combine = Arc::new(combine);
648        Source::from_materialized_factory_with_hints(
649            move |materializer| {
650                let (stream, source_mat) = Arc::clone(&source).create(materializer)?;
651                let flow_mat = materialize_flow()?;
652                let stream = match &transform {
653                    FlowTransform::Pure(transform) => transform(stream),
654                    FlowTransform::Runtime(transform) => transform(stream, materializer)?,
655                };
656                Ok((stream, combine(source_mat, flow_mat)))
657            },
658            hints,
659        )
660    }
661
662    #[must_use]
663    pub fn via_mat_with<Next, FlowMat, Combined, F>(
664        self,
665        flow: Flow<Out, Next, FlowMat>,
666        combine: F,
667    ) -> Source<Next, Combined>
668    where
669        Next: Send + 'static,
670        FlowMat: Send + 'static,
671        Combined: Send + 'static,
672        F: Fn(Mat, FlowMat) -> Combined + Send + Sync + 'static,
673    {
674        self.via_mat(flow, combine)
675    }
676
677    #[must_use]
678    pub fn map<Next, F>(self, f: F) -> Source<Next, Mat>
679    where
680        Next: Send + 'static,
681        F: Fn(Out) -> Next + Send + Sync + 'static,
682    {
683        let hints = self.hints.without_inline_micro();
684        Source {
685            factory: Arc::new(MapSourceFactory {
686                source: self.factory,
687                stage: f,
688                _marker: PhantomData,
689            }),
690            terminal_factory: None,
691            hints,
692            attributes: self.attributes,
693            split_hook: None,
694        }
695    }
696
697    #[must_use]
698    pub fn attributes(&self) -> &Attributes {
699        &self.attributes
700    }
701
702    #[must_use]
703    pub fn with_attributes(mut self, attributes: Attributes) -> Self {
704        self.attributes = attributes;
705        self
706    }
707
708    #[must_use]
709    pub fn add_attributes(mut self, attributes: Attributes) -> Self {
710        self.attributes = self.attributes.and(attributes);
711        self
712    }
713
714    #[must_use]
715    pub fn named(self, name: impl Into<String>) -> Self {
716        self.add_attributes(Attributes::named(name))
717    }
718
719    /// Insert an async boundary after this source.
720    ///
721    /// The boundary uses the same [`crate::AsyncBoundaryExecutionConfig`] defaults
722    /// as the GraphDSL `AsyncBoundary` runner and hands elements across a bounded
723    /// Ractor-backed queue. `async_boundary` is the Rust-friendly primary name;
724    /// `Source::r#async` is provided as the Akka-mirroring alias.
725    #[must_use]
726    pub fn async_boundary(self) -> Self {
727        self.via(Flow::identity().async_boundary())
728    }
729
730    /// Akka-mirroring alias for [`Source::async_boundary`].
731    #[must_use]
732    pub fn r#async(self) -> Self {
733        self.async_boundary()
734    }
735
736    /// Insert an async boundary with an explicit bounded handoff configuration.
737    ///
738    /// `config.buffer_size` controls the number of elements that may be queued
739    /// between the upstream and downstream fused regions. A zero buffer is
740    /// rejected when the stream is materialized.
741    #[must_use]
742    pub fn async_boundary_with_config(
743        self,
744        config: crate::graph::AsyncBoundaryExecutionConfig,
745    ) -> Self {
746        self.via(Flow::identity().async_boundary_with_config(config))
747    }
748
749    /// Insert an async boundary with a custom bounded handoff size.
750    #[must_use]
751    pub fn async_boundary_with_buffer(self, buffer_size: usize) -> Self {
752        self.via(Flow::identity().async_boundary_with_buffer(buffer_size))
753    }
754
755    #[must_use]
756    pub fn map_result<Next, F>(self, f: F) -> Source<Next, Mat>
757    where
758        Next: Send + 'static,
759        F: Fn(Out) -> StreamResult<Next> + Send + Sync + 'static,
760    {
761        self.via(Flow::identity().map_result(f))
762    }
763
764    #[must_use]
765    pub fn map_result_with_supervision<Next, F>(
766        self,
767        f: F,
768        decider: SupervisionDecider,
769    ) -> Source<Next, Mat>
770    where
771        Next: Send + 'static,
772        F: Fn(Out) -> StreamResult<Next> + Send + Sync + 'static,
773    {
774        self.via(Flow::identity().map_result_with_supervision(f, decider))
775    }
776
777    #[must_use]
778    pub fn filter<F>(self, predicate: F) -> Source<Out, Mat>
779    where
780        F: Fn(&Out) -> bool + Send + Sync + 'static,
781    {
782        self.via(Flow::identity().filter(predicate))
783    }
784
785    #[must_use]
786    pub fn filter_result<F>(self, predicate: F) -> Source<Out, Mat>
787    where
788        F: Fn(&Out) -> StreamResult<bool> + Send + Sync + 'static,
789    {
790        self.via(Flow::identity().filter_result(predicate))
791    }
792
793    #[must_use]
794    pub fn filter_result_with_supervision<F>(
795        self,
796        predicate: F,
797        decider: SupervisionDecider,
798    ) -> Source<Out, Mat>
799    where
800        F: Fn(&Out) -> StreamResult<bool> + Send + Sync + 'static,
801    {
802        self.via(Flow::identity().filter_result_with_supervision(predicate, decider))
803    }
804
805    #[must_use]
806    pub fn filter_not<F>(self, predicate: F) -> Source<Out, Mat>
807    where
808        F: Fn(&Out) -> bool + Send + Sync + 'static,
809    {
810        self.via(Flow::identity().filter_not(predicate))
811    }
812
813    #[must_use]
814    pub fn filter_map<Next, F>(self, f: F) -> Source<Next, Mat>
815    where
816        Next: Send + 'static,
817        F: Fn(Out) -> Option<Next> + Send + Sync + 'static,
818    {
819        self.via(Flow::identity().filter_map(f))
820    }
821
822    #[must_use]
823    pub fn filter_map_result<Next, F>(self, f: F) -> Source<Next, Mat>
824    where
825        Next: Send + 'static,
826        F: Fn(Out) -> StreamResult<Option<Next>> + Send + Sync + 'static,
827    {
828        self.via(Flow::identity().filter_map_result(f))
829    }
830
831    #[must_use]
832    pub fn filter_map_result_with_supervision<Next, F>(
833        self,
834        f: F,
835        decider: SupervisionDecider,
836    ) -> Source<Next, Mat>
837    where
838        Next: Send + 'static,
839        F: Fn(Out) -> StreamResult<Option<Next>> + Send + Sync + 'static,
840    {
841        self.via(Flow::identity().filter_map_result_with_supervision(f, decider))
842    }
843
844    #[must_use]
845    pub fn map_concat<Next, F, I>(self, f: F) -> Source<Next, Mat>
846    where
847        Next: Send + 'static,
848        F: Fn(Out) -> I + Send + Sync + 'static,
849        I: IntoIterator<Item = Next>,
850        I::IntoIter: Send + 'static,
851    {
852        self.via(Flow::identity().map_concat(f))
853    }
854
855    #[must_use]
856    pub fn map_concat_result<Next, F, I>(self, f: F) -> Source<Next, Mat>
857    where
858        Next: Send + 'static,
859        F: Fn(Out) -> StreamResult<I> + Send + Sync + 'static,
860        I: IntoIterator<Item = Next>,
861        I::IntoIter: Send + 'static,
862    {
863        self.via(Flow::identity().map_concat_result(f))
864    }
865
866    #[must_use]
867    pub fn map_concat_result_with_supervision<Next, F, I>(
868        self,
869        f: F,
870        decider: SupervisionDecider,
871    ) -> Source<Next, Mat>
872    where
873        Next: Send + 'static,
874        F: Fn(Out) -> StreamResult<I> + Send + Sync + 'static,
875        I: IntoIterator<Item = Next>,
876        I::IntoIter: Send + 'static,
877    {
878        self.via(Flow::identity().map_concat_result_with_supervision(f, decider))
879    }
880
881    #[must_use]
882    pub fn stateful_map<State, Next, F>(self, seed: State, f: F) -> Source<Next, Mat>
883    where
884        State: Clone + Send + Sync + 'static,
885        Next: Send + 'static,
886        F: Fn(&mut State, Out) -> Next + Send + Sync + 'static,
887    {
888        self.via(Flow::identity().stateful_map(seed, f))
889    }
890
891    #[must_use]
892    pub fn stateful_map_result<State, Next, F>(self, seed: State, f: F) -> Source<Next, Mat>
893    where
894        State: Clone + Send + Sync + 'static,
895        Next: Send + 'static,
896        F: Fn(&mut State, Out) -> StreamResult<Next> + Send + Sync + 'static,
897    {
898        self.via(Flow::identity().stateful_map_result(seed, f))
899    }
900
901    #[must_use]
902    pub fn stateful_map_result_with_supervision<State, Next, F>(
903        self,
904        seed: State,
905        f: F,
906        decider: SupervisionDecider,
907    ) -> Source<Next, Mat>
908    where
909        State: Clone + Send + Sync + 'static,
910        Next: Send + 'static,
911        F: Fn(&mut State, Out) -> StreamResult<Next> + Send + Sync + 'static,
912    {
913        self.via(Flow::identity().stateful_map_result_with_supervision(seed, f, decider))
914    }
915
916    #[must_use]
917    pub fn stateful_map_concat<State, Next, F, I>(self, seed: State, f: F) -> Source<Next, Mat>
918    where
919        State: Clone + Send + Sync + 'static,
920        Next: Send + 'static,
921        F: Fn(&mut State, Out) -> I + Send + Sync + 'static,
922        I: IntoIterator<Item = Next>,
923        I::IntoIter: Send + 'static,
924    {
925        self.via(Flow::identity().stateful_map_concat(seed, f))
926    }
927
928    #[must_use]
929    pub fn stateful_map_concat_result<State, Next, F, I>(
930        self,
931        seed: State,
932        f: F,
933    ) -> Source<Next, Mat>
934    where
935        State: Clone + Send + Sync + 'static,
936        Next: Send + 'static,
937        F: Fn(&mut State, Out) -> StreamResult<I> + Send + Sync + 'static,
938        I: IntoIterator<Item = Next>,
939        I::IntoIter: Send + 'static,
940    {
941        self.via(Flow::identity().stateful_map_concat_result(seed, f))
942    }
943
944    #[must_use]
945    pub fn stateful_map_concat_result_with_supervision<State, Next, F, I>(
946        self,
947        seed: State,
948        f: F,
949        decider: SupervisionDecider,
950    ) -> Source<Next, Mat>
951    where
952        State: Clone + Send + Sync + 'static,
953        Next: Send + 'static,
954        F: Fn(&mut State, Out) -> StreamResult<I> + Send + Sync + 'static,
955        I: IntoIterator<Item = Next>,
956        I::IntoIter: Send + 'static,
957    {
958        self.via(Flow::identity().stateful_map_concat_result_with_supervision(seed, f, decider))
959    }
960
961    #[must_use]
962    pub fn map_async<Next, F, Fut>(self, parallelism: usize, f: F) -> Source<Next, Mat>
963    where
964        Next: Send + 'static,
965        F: Fn(Out) -> Fut + Send + Sync + 'static,
966        Fut: Future<Output = StreamResult<Next>> + Send + 'static,
967    {
968        self.via(Flow::identity().map_async(parallelism, f))
969    }
970
971    #[must_use]
972    pub fn map_async_with_supervision<Next, F, Fut>(
973        self,
974        parallelism: usize,
975        f: F,
976        decider: SupervisionDecider,
977    ) -> Source<Next, Mat>
978    where
979        Next: Send + 'static,
980        F: Fn(Out) -> Fut + Send + Sync + 'static,
981        Fut: Future<Output = StreamResult<Next>> + Send + 'static,
982    {
983        self.via(Flow::identity().map_async_with_supervision(parallelism, f, decider))
984    }
985
986    #[must_use]
987    pub fn map_async_unordered<Next, F, Fut>(self, parallelism: usize, f: F) -> Source<Next, Mat>
988    where
989        Next: Send + 'static,
990        F: Fn(Out) -> Fut + Send + Sync + 'static,
991        Fut: Future<Output = StreamResult<Next>> + Send + 'static,
992    {
993        self.via(Flow::identity().map_async_unordered(parallelism, f))
994    }
995
996    #[must_use]
997    pub fn map_async_unordered_with_supervision<Next, F, Fut>(
998        self,
999        parallelism: usize,
1000        f: F,
1001        decider: SupervisionDecider,
1002    ) -> Source<Next, Mat>
1003    where
1004        Next: Send + 'static,
1005        F: Fn(Out) -> Fut + Send + Sync + 'static,
1006        Fut: Future<Output = StreamResult<Next>> + Send + 'static,
1007    {
1008        self.via(Flow::identity().map_async_unordered_with_supervision(parallelism, f, decider))
1009    }
1010
1011    #[must_use]
1012    pub fn map_async_partitioned<Key, Next, Partition, F, Fut>(
1013        self,
1014        parallelism: usize,
1015        per_partition: usize,
1016        partition: Partition,
1017        f: F,
1018    ) -> Source<Next, Mat>
1019    where
1020        Key: Clone + Eq + Hash + Send + 'static,
1021        Next: Send + 'static,
1022        Partition: Fn(&Out) -> Key + Send + Sync + 'static,
1023        F: Fn(Out) -> Fut + Send + Sync + 'static,
1024        Fut: Future<Output = StreamResult<Next>> + Send + 'static,
1025    {
1026        self.via(Flow::identity().map_async_partitioned(parallelism, per_partition, partition, f))
1027    }
1028
1029    #[must_use]
1030    pub fn prefix_and_tail(self, n: usize) -> Source<(Vec<Out>, Source<Out>), Mat> {
1031        self.via(Flow::identity().prefix_and_tail(n))
1032    }
1033
1034    #[must_use]
1035    pub fn flat_map_prefix<Next, FlowMat, F>(self, n: usize, f: F) -> Source<Next, Mat>
1036    where
1037        Next: Send + 'static,
1038        FlowMat: Send + 'static,
1039        F: Fn(Vec<Out>) -> Flow<Out, Next, FlowMat> + Send + Sync + 'static,
1040        Out: Clone,
1041    {
1042        self.via(Flow::identity().flat_map_prefix(n, f))
1043    }
1044
1045    #[must_use]
1046    pub fn group_by<Key, F>(
1047        self,
1048        max_substreams: usize,
1049        f: F,
1050        allow_closed_substream_recreation: bool,
1051    ) -> Source<Source<Out>, Mat>
1052    where
1053        Key: Clone + Eq + Hash + Send + 'static,
1054        F: Fn(&Out) -> Key + Send + Sync + 'static,
1055        Out: Clone,
1056    {
1057        let batch_mode = if self.hints.inline_micro.is_some() && !allow_closed_substream_recreation
1058        {
1059            flow::GroupByBatchMode::FiniteEagerNoRecreate
1060        } else {
1061            flow::GroupByBatchMode::Immediate
1062        };
1063        self.via(Flow::identity().group_by_with_batching(
1064            max_substreams,
1065            f,
1066            allow_closed_substream_recreation,
1067            batch_mode,
1068        ))
1069    }
1070
1071    #[must_use]
1072    pub fn split_when<F>(self, predicate: F) -> Source<Source<Out>, Mat>
1073    where
1074        F: Fn(&Out) -> bool + Send + Sync + 'static,
1075        Out: Clone,
1076    {
1077        self.via(Flow::identity().split_when(predicate))
1078    }
1079
1080    #[must_use]
1081    pub fn split_after<F>(self, predicate: F) -> Source<Source<Out>, Mat>
1082    where
1083        F: Fn(&Out) -> bool + Send + Sync + 'static,
1084        Out: Clone,
1085    {
1086        self.via(Flow::identity().split_after(predicate))
1087    }
1088
1089    #[must_use]
1090    pub fn flat_map_concat<Next, NextMat, F>(self, f: F) -> Source<Next, Mat>
1091    where
1092        Next: Send + 'static,
1093        NextMat: Send + 'static,
1094        F: Fn(Out) -> Source<Next, NextMat> + Send + Sync + 'static,
1095    {
1096        self.via(Flow::identity().flat_map_concat(f))
1097    }
1098
1099    #[must_use]
1100    pub fn flat_map_merge<Next, NextMat, F>(self, breadth: usize, f: F) -> Source<Next, Mat>
1101    where
1102        Next: Send + 'static,
1103        NextMat: Send + 'static,
1104        F: Fn(Out) -> Source<Next, NextMat> + Send + Sync + 'static,
1105    {
1106        self.via(Flow::identity().flat_map_merge(breadth, f))
1107    }
1108
1109    #[must_use]
1110    pub fn take(self, n: usize) -> Source<Out, Mat> {
1111        self.via(Flow::identity().take(n))
1112    }
1113
1114    #[must_use]
1115    pub fn drop(self, n: usize) -> Source<Out, Mat> {
1116        self.via(Flow::identity().drop(n))
1117    }
1118
1119    #[must_use]
1120    pub fn take_while<F>(self, predicate: F) -> Source<Out, Mat>
1121    where
1122        F: Fn(&Out) -> bool + Send + Sync + 'static,
1123    {
1124        self.via(Flow::identity().take_while(predicate))
1125    }
1126
1127    #[must_use]
1128    pub fn drop_while<F>(self, predicate: F) -> Source<Out, Mat>
1129    where
1130        F: Fn(&Out) -> bool + Send + Sync + 'static,
1131    {
1132        self.via(Flow::identity().drop_while(predicate))
1133    }
1134
1135    #[must_use]
1136    pub fn limit(self, max: u64) -> Source<Out, Mat> {
1137        self.via(Flow::identity().limit(max))
1138    }
1139
1140    #[must_use]
1141    pub fn grouped(self, size: usize) -> Source<Vec<Out>, Mat> {
1142        self.via(Flow::identity().grouped(size))
1143    }
1144
1145    #[must_use]
1146    pub fn scan<State, F>(self, seed: State, f: F) -> Source<State, Mat>
1147    where
1148        State: Clone + Send + Sync + 'static,
1149        F: Fn(State, Out) -> State + Send + Sync + 'static,
1150    {
1151        self.via(Flow::identity().scan(seed, f))
1152    }
1153
1154    #[must_use]
1155    pub fn scan_async<State, F, Fut>(self, seed: State, f: F) -> Source<State, Mat>
1156    where
1157        State: Clone + Send + Sync + 'static,
1158        F: Fn(State, Out) -> Fut + Send + Sync + 'static,
1159        Fut: Future<Output = StreamResult<State>> + Send + 'static,
1160    {
1161        self.via(Flow::identity().scan_async(seed, f))
1162    }
1163
1164    #[must_use]
1165    pub fn scan_result<State, F>(self, seed: State, f: F) -> Source<State, Mat>
1166    where
1167        State: Clone + Send + Sync + 'static,
1168        F: Fn(State, Out) -> StreamResult<State> + Send + Sync + 'static,
1169    {
1170        self.via(Flow::identity().scan_result(seed, f))
1171    }
1172
1173    #[must_use]
1174    pub fn scan_result_with_supervision<State, F>(
1175        self,
1176        seed: State,
1177        f: F,
1178        decider: SupervisionDecider,
1179    ) -> Source<State, Mat>
1180    where
1181        State: Clone + Send + Sync + 'static,
1182        F: Fn(State, Out) -> StreamResult<State> + Send + Sync + 'static,
1183    {
1184        self.via(Flow::identity().scan_result_with_supervision(seed, f, decider))
1185    }
1186
1187    #[must_use]
1188    pub fn sliding(self, size: usize, step: usize) -> Source<Vec<Out>, Mat>
1189    where
1190        Out: Clone,
1191    {
1192        self.via(Flow::identity().sliding(size, step))
1193    }
1194
1195    #[must_use]
1196    pub fn fold<Acc, F>(self, zero: Acc, f: F) -> Source<Acc, Mat>
1197    where
1198        Acc: Clone + Send + Sync + 'static,
1199        F: Fn(Acc, Out) -> Acc + Send + Sync + 'static,
1200    {
1201        self.via(Flow::identity().fold(zero, f))
1202    }
1203
1204    #[must_use]
1205    pub fn fold_async<Acc, F, Fut>(self, zero: Acc, f: F) -> Source<Acc, Mat>
1206    where
1207        Acc: Clone + Send + Sync + 'static,
1208        F: Fn(Acc, Out) -> Fut + Send + Sync + 'static,
1209        Fut: Future<Output = StreamResult<Acc>> + Send + 'static,
1210    {
1211        self.via(Flow::identity().fold_async(zero, f))
1212    }
1213
1214    #[must_use]
1215    pub fn map_with_resource<Resource, Next, Create, F, Close>(
1216        self,
1217        create: Create,
1218        f: F,
1219        close: Close,
1220    ) -> Source<Next, Mat>
1221    where
1222        Resource: Send + 'static,
1223        Next: Send + 'static,
1224        Create: Fn() -> StreamResult<Resource> + Send + Sync + 'static,
1225        F: Fn(&mut Resource, Out) -> StreamResult<Next> + Send + Sync + 'static,
1226        Close: Fn(Resource) -> StreamResult<Option<Next>> + Send + Sync + 'static,
1227    {
1228        self.via(Flow::identity().map_with_resource(create, f, close))
1229    }
1230
1231    #[must_use]
1232    pub fn fold_result<Acc, F>(self, zero: Acc, f: F) -> Source<Acc, Mat>
1233    where
1234        Acc: Clone + Send + Sync + 'static,
1235        F: Fn(Acc, Out) -> StreamResult<Acc> + Send + Sync + 'static,
1236    {
1237        self.via(Flow::identity().fold_result(zero, f))
1238    }
1239
1240    #[must_use]
1241    pub fn fold_result_with_supervision<Acc, F>(
1242        self,
1243        zero: Acc,
1244        f: F,
1245        decider: SupervisionDecider,
1246    ) -> Source<Acc, Mat>
1247    where
1248        Acc: Clone + Send + Sync + 'static,
1249        F: Fn(Acc, Out) -> StreamResult<Acc> + Send + Sync + 'static,
1250    {
1251        self.via(Flow::identity().fold_result_with_supervision(zero, f, decider))
1252    }
1253
1254    #[must_use]
1255    pub fn reduce<F>(self, f: F) -> Source<Out, Mat>
1256    where
1257        F: Fn(Out, Out) -> Out + Send + Sync + 'static,
1258    {
1259        self.via(Flow::identity().reduce(f))
1260    }
1261
1262    #[must_use]
1263    pub fn reduce_result<F>(self, f: F) -> Source<Out, Mat>
1264    where
1265        Out: Clone,
1266        F: Fn(Out, Out) -> StreamResult<Out> + Send + Sync + 'static,
1267    {
1268        self.via(Flow::identity().reduce_result(f))
1269    }
1270
1271    #[must_use]
1272    pub fn reduce_result_with_supervision<F>(
1273        self,
1274        f: F,
1275        decider: SupervisionDecider,
1276    ) -> Source<Out, Mat>
1277    where
1278        Out: Clone,
1279        F: Fn(Out, Out) -> StreamResult<Out> + Send + Sync + 'static,
1280    {
1281        self.via(Flow::identity().reduce_result_with_supervision(f, decider))
1282    }
1283
1284    #[must_use]
1285    pub fn map_error<F>(self, f: F) -> Source<Out, Mat>
1286    where
1287        F: Fn(StreamError) -> StreamError + Send + Sync + 'static,
1288    {
1289        self.via(Flow::identity().map_error(f))
1290    }
1291
1292    #[must_use]
1293    pub fn recover<F>(self, f: F) -> Source<Out, Mat>
1294    where
1295        F: Fn(StreamError) -> Option<Out> + Send + Sync + 'static,
1296    {
1297        self.via(Flow::identity().recover(f))
1298    }
1299
1300    #[must_use]
1301    pub fn recover_with<F>(self, f: F) -> Source<Out, Mat>
1302    where
1303        F: Fn(StreamError) -> Option<Source<Out>> + Send + Sync + 'static,
1304    {
1305        self.via(Flow::identity().recover_with(f))
1306    }
1307
1308    #[must_use]
1309    pub fn recover_with_retries<F>(self, retries: usize, f: F) -> Source<Out, Mat>
1310    where
1311        F: Fn(StreamError) -> Option<Source<Out>> + Send + Sync + 'static,
1312    {
1313        self.via(Flow::identity().recover_with_retries(retries, f))
1314    }
1315
1316    #[must_use]
1317    pub fn on_error_complete(self) -> Source<Out, Mat> {
1318        self.via(Flow::identity().on_error_complete())
1319    }
1320
1321    #[must_use]
1322    pub fn concat<Mat2>(self, that: Source<Out, Mat2>) -> Source<Out, Mat>
1323    where
1324        Mat2: Send + 'static,
1325    {
1326        let factory = self.factory;
1327        let hints = self.hints;
1328        let that_factory = that.factory;
1329        Source::from_materialized_factory_with_hints(
1330            move |materializer| {
1331                let (primary, mat) = Arc::clone(&factory).create(materializer)?;
1332                let secondary = match Arc::clone(&that_factory).create(materializer) {
1333                    Ok((stream, _)) => stream,
1334                    Err(error) => Box::new(std::iter::once(Err(error))) as BoxStream<Out>,
1335                };
1336                Ok((concat_source_streams(vec![primary, secondary]), mat))
1337            },
1338            hints,
1339        )
1340    }
1341
1342    #[must_use]
1343    pub fn concat_lazy<Mat2>(self, that: Source<Out, Mat2>) -> Source<Out, Mat>
1344    where
1345        Mat2: Send + 'static,
1346    {
1347        let factory = self.factory;
1348        let hints = self.hints;
1349        let that_factory = that.factory;
1350        Source::from_materialized_factory_with_hints(
1351            move |materializer| {
1352                let (primary, mat) = Arc::clone(&factory).create(materializer)?;
1353                Ok((
1354                    concat_source_streams_lazy(
1355                        primary,
1356                        vec![Arc::clone(&that_factory)],
1357                        materializer,
1358                    ),
1359                    mat,
1360                ))
1361            },
1362            hints,
1363        )
1364    }
1365
1366    #[must_use]
1367    pub fn concat_all_lazy<Mat2, I>(self, those: I) -> Source<Out, Mat>
1368    where
1369        Mat2: Send + 'static,
1370        I: IntoIterator<Item = Source<Out, Mat2>>,
1371    {
1372        let factory = self.factory;
1373        let hints = self.hints;
1374        let other_factories: Vec<_> = those.into_iter().map(|source| source.factory).collect();
1375        Source::from_materialized_factory_with_hints(
1376            move |materializer| {
1377                let (primary, mat) = Arc::clone(&factory).create(materializer)?;
1378                Ok((
1379                    concat_source_streams_lazy(primary, other_factories.clone(), materializer),
1380                    mat,
1381                ))
1382            },
1383            hints,
1384        )
1385    }
1386
1387    #[must_use]
1388    pub fn prepend<Mat2>(self, that: Source<Out, Mat2>) -> Source<Out, Mat>
1389    where
1390        Mat2: Send + 'static,
1391    {
1392        let factory = self.factory;
1393        let hints = self.hints;
1394        let that_factory = that.factory;
1395        Source::from_materialized_factory_with_hints(
1396            move |materializer| {
1397                let (primary, mat) = Arc::clone(&factory).create(materializer)?;
1398                let secondary = match Arc::clone(&that_factory).create(materializer) {
1399                    Ok((stream, _)) => stream,
1400                    Err(error) => Box::new(std::iter::once(Err(error))) as BoxStream<Out>,
1401                };
1402                Ok((concat_source_streams(vec![secondary, primary]), mat))
1403            },
1404            hints,
1405        )
1406    }
1407
1408    #[must_use]
1409    pub fn prepend_lazy<Mat2>(self, that: Source<Out, Mat2>) -> Source<Out, Mat>
1410    where
1411        Mat2: Send + 'static,
1412    {
1413        self.prepend(that)
1414    }
1415
1416    #[must_use]
1417    pub fn or_else<Mat2>(self, secondary: Source<Out, Mat2>) -> Source<Out, Mat>
1418    where
1419        Mat2: Send + 'static,
1420    {
1421        let factory = self.factory;
1422        let hints = self.hints;
1423        let secondary_factory = secondary.factory;
1424        Source::from_materialized_factory_with_hints(
1425            move |materializer| {
1426                let (primary, mat) = Arc::clone(&factory).create(materializer)?;
1427                let secondary = match Arc::clone(&secondary_factory).create(materializer) {
1428                    Ok((stream, _)) => stream,
1429                    Err(error) => Box::new(std::iter::once(Err(error))) as BoxStream<Out>,
1430                };
1431                Ok((or_else_source_stream(primary, secondary), mat))
1432            },
1433            hints,
1434        )
1435    }
1436
1437    #[must_use]
1438    pub fn interleave<Mat2>(self, that: Source<Out, Mat2>, segment_size: usize) -> Source<Out, Mat>
1439    where
1440        Mat2: Send + 'static,
1441    {
1442        self.interleave_all([that], segment_size, false)
1443    }
1444
1445    #[must_use]
1446    pub fn interleave_all<Mat2, I>(
1447        self,
1448        those: I,
1449        segment_size: usize,
1450        eager_close: bool,
1451    ) -> Source<Out, Mat>
1452    where
1453        Mat2: Send + 'static,
1454        I: IntoIterator<Item = Source<Out, Mat2>>,
1455    {
1456        let factory = self.factory;
1457        let hints = self.hints;
1458        let other_factories: Vec<_> = those.into_iter().map(|source| source.factory).collect();
1459        Source::from_materialized_factory_with_hints(
1460            move |materializer| {
1461                let (primary, mat) = Arc::clone(&factory).create(materializer)?;
1462                let mut streams = Vec::with_capacity(other_factories.len() + 1);
1463                streams.push(primary);
1464                for other in &other_factories {
1465                    let stream = match Arc::clone(other).create(materializer) {
1466                        Ok((stream, _)) => stream,
1467                        Err(error) => Box::new(std::iter::once(Err(error))) as BoxStream<Out>,
1468                    };
1469                    streams.push(stream);
1470                }
1471                Ok((
1472                    interleave_source_streams(streams, segment_size, eager_close),
1473                    mat,
1474                ))
1475            },
1476            hints,
1477        )
1478    }
1479
1480    #[must_use]
1481    pub fn merge_sorted<Mat2>(self, that: Source<Out, Mat2>) -> Source<Out, Mat>
1482    where
1483        Out: Ord,
1484        Mat2: Send + 'static,
1485    {
1486        self.via(Flow::identity().merge_sorted(that))
1487    }
1488
1489    #[must_use]
1490    pub fn merge_latest<Mat2>(
1491        self,
1492        that: Source<Out, Mat2>,
1493        eager_complete: bool,
1494    ) -> Source<Vec<Out>, Mat>
1495    where
1496        Out: Clone,
1497        Mat2: Send + 'static,
1498    {
1499        let factory = self.factory;
1500        let hints = self.hints;
1501        let that_factory = that.factory;
1502        Source::from_materialized_factory_with_hints(
1503            move |materializer| {
1504                let (left, mat) = Arc::clone(&factory).create(materializer)?;
1505                let right = match Arc::clone(&that_factory).create(materializer) {
1506                    Ok((stream, _)) => stream,
1507                    Err(error) => {
1508                        return Ok((
1509                            Box::new(std::iter::once(Err(error))) as BoxStream<Vec<Out>>,
1510                            mat,
1511                        ));
1512                    }
1513                };
1514                Ok((merge_latest_streams(vec![left, right], eager_complete), mat))
1515            },
1516            hints,
1517        )
1518    }
1519
1520    #[must_use]
1521    pub fn merge_all<Mat2, I>(self, those: I, eager_complete: bool) -> Source<Out, Mat>
1522    where
1523        Mat2: Send + 'static,
1524        I: IntoIterator<Item = Source<Out, Mat2>>,
1525    {
1526        let factory = self.factory;
1527        let hints = self.hints;
1528        let other_factories: Vec<_> = those.into_iter().map(|source| source.factory).collect();
1529        Source::from_materialized_factory_with_hints(
1530            move |materializer| {
1531                let (primary, mat) = Arc::clone(&factory).create(materializer)?;
1532                let mut streams = Vec::with_capacity(other_factories.len() + 1);
1533                streams.push(primary);
1534                for other in &other_factories {
1535                    let stream = match Arc::clone(other).create(materializer) {
1536                        Ok((stream, _)) => stream,
1537                        Err(error) => {
1538                            return Ok((
1539                                Box::new(std::iter::once(Err(error))) as BoxStream<Out>,
1540                                mat,
1541                            ));
1542                        }
1543                    };
1544                    streams.push(stream);
1545                }
1546                Ok((merge_streams(streams, eager_complete), mat))
1547            },
1548            hints,
1549        )
1550    }
1551
1552    #[must_use]
1553    pub fn zip_with<Mat2, Out2, Next, F>(
1554        self,
1555        that: Source<Out2, Mat2>,
1556        combine: F,
1557    ) -> Source<Next, Mat>
1558    where
1559        Out2: Send + 'static,
1560        Next: Send + 'static,
1561        Mat2: Send + 'static,
1562        F: Fn(Out, Out2) -> Next + Send + Sync + 'static,
1563    {
1564        self.via(Flow::identity().zip_with(that, combine))
1565    }
1566
1567    #[must_use]
1568    pub fn zip_latest<Mat2, Out2>(self, that: Source<Out2, Mat2>) -> Source<(Out, Out2), Mat>
1569    where
1570        Out: Clone,
1571        Out2: Clone + Send + 'static,
1572        Mat2: Send + 'static,
1573    {
1574        self.zip_latest_with(that, true, |left, right| (left, right))
1575    }
1576
1577    #[must_use]
1578    pub fn zip_latest_with<Mat2, Out2, Next, F>(
1579        self,
1580        that: Source<Out2, Mat2>,
1581        eager_complete: bool,
1582        combine: F,
1583    ) -> Source<Next, Mat>
1584    where
1585        Out: Clone,
1586        Out2: Clone + Send + 'static,
1587        Next: Send + 'static,
1588        Mat2: Send + 'static,
1589        F: Fn(Out, Out2) -> Next + Send + Sync + 'static,
1590    {
1591        let factory = self.factory;
1592        let hints = self.hints;
1593        let that_factory = that.factory;
1594        let combine = Arc::new(combine);
1595        Source::from_materialized_factory_with_hints(
1596            move |materializer| {
1597                let (left, mat) = Arc::clone(&factory).create(materializer)?;
1598                let right = match Arc::clone(&that_factory).create(materializer) {
1599                    Ok((stream, _)) => stream,
1600                    Err(error) => {
1601                        return Ok((
1602                            Box::new(std::iter::once(Err(error))) as BoxStream<Next>,
1603                            mat,
1604                        ));
1605                    }
1606                };
1607                Ok((
1608                    zip_latest_with_stream(left, right, eager_complete, Arc::clone(&combine)),
1609                    mat,
1610                ))
1611            },
1612            hints,
1613        )
1614    }
1615
1616    #[must_use]
1617    pub fn zip_with_index(self) -> Source<(Out, u64), Mat> {
1618        let factory = self.factory;
1619        let hints = self.hints;
1620        Source::from_materialized_factory_with_hints(
1621            move |materializer| {
1622                let (mut stream, mat) = Arc::clone(&factory).create(materializer)?;
1623                let mut index = 0_u64;
1624                Ok((
1625                    Box::new(std::iter::from_fn(move || {
1626                        stream.next().map(|item| {
1627                            item.map(|value| {
1628                                let pair = (value, index);
1629                                index = index.wrapping_add(1);
1630                                pair
1631                            })
1632                        })
1633                    })) as BoxStream<(Out, u64)>,
1634                    mat,
1635                ))
1636            },
1637            hints,
1638        )
1639    }
1640
1641    #[must_use]
1642    pub fn zip_all<Mat2, Out2>(
1643        self,
1644        that: Source<Out2, Mat2>,
1645        this_elem: Out,
1646        that_elem: Out2,
1647    ) -> Source<(Out, Out2), Mat>
1648    where
1649        Out: Clone + Sync,
1650        Out2: Clone + Send + Sync + 'static,
1651        Mat2: Send + 'static,
1652    {
1653        let factory = self.factory;
1654        let hints = self.hints;
1655        let that_factory = that.factory;
1656        Source::from_materialized_factory_with_hints(
1657            move |materializer| {
1658                let (left, mat) = Arc::clone(&factory).create(materializer)?;
1659                let right = match Arc::clone(&that_factory).create(materializer) {
1660                    Ok((stream, _)) => stream,
1661                    Err(error) => {
1662                        return Ok((
1663                            Box::new(std::iter::once(Err(error))) as BoxStream<(Out, Out2)>,
1664                            mat,
1665                        ));
1666                    }
1667                };
1668                Ok((
1669                    zip_all_stream(left, right, this_elem.clone(), that_elem.clone()),
1670                    mat,
1671                ))
1672            },
1673            hints,
1674        )
1675    }
1676
1677    #[must_use]
1678    pub fn also_to<SinkMat>(self, sink: Sink<Out, SinkMat>) -> Source<Out, Mat>
1679    where
1680        Out: Clone,
1681        SinkMat: Send + 'static,
1682    {
1683        self.via(Flow::identity().also_to(sink))
1684    }
1685
1686    #[must_use]
1687    pub fn also_to_all<SinkMat, I>(self, sinks: I) -> Source<Out, Mat>
1688    where
1689        Out: Clone,
1690        SinkMat: Send + 'static,
1691        I: IntoIterator<Item = Sink<Out, SinkMat>>,
1692    {
1693        self.via(Flow::identity().also_to_all(sinks))
1694    }
1695
1696    #[must_use]
1697    pub fn divert_to<SinkMat, F>(self, sink: Sink<Out, SinkMat>, predicate: F) -> Source<Out, Mat>
1698    where
1699        SinkMat: Send + 'static,
1700        F: Fn(&Out) -> bool + Send + Sync + 'static,
1701    {
1702        self.via(Flow::identity().divert_to(sink, predicate))
1703    }
1704
1705    #[must_use]
1706    pub fn wire_tap<SinkMat>(self, sink: Sink<Out, SinkMat>) -> Source<Out, Mat>
1707    where
1708        Out: Clone,
1709        SinkMat: Send + 'static,
1710    {
1711        self.via(Flow::identity().wire_tap(sink))
1712    }
1713
1714    pub fn run_with<SinkMat: Send + 'static>(
1715        self,
1716        sink: Sink<Out, SinkMat>,
1717    ) -> StreamResult<SinkMat> {
1718        // Split-segment fast path: if source has a segment hook and sink has a
1719        // fold fast-path descriptor, drive the fold inline without spawning a task.
1720        let fast_result = self
1721            .split_hook
1722            .as_ref()
1723            .zip(sink.fold_fp.as_deref())
1724            .and_then(|(hook, fp)| fp.try_register(Arc::clone(hook)));
1725        if let Some(result) = fast_result {
1726            return result?.downcast::<SinkMat>().map(|b| *b).map_err(|_| {
1727                StreamError::Failed("split fast path: unexpected mat type (internal error)".into())
1728            });
1729        }
1730        self.to_mat(sink, Keep::right).run()
1731    }
1732
1733    pub fn run_with_materializer<SinkMat: Send + 'static>(
1734        self,
1735        sink: Sink<Out, SinkMat>,
1736        materializer: &Materializer,
1737    ) -> StreamResult<SinkMat> {
1738        self.to_mat(sink, Keep::right)
1739            .run_with_materializer(materializer)
1740    }
1741
1742    #[must_use]
1743    pub fn to<SinkMat>(self, sink: Sink<Out, SinkMat>) -> RunnableGraph<Mat>
1744    where
1745        SinkMat: Send + 'static,
1746    {
1747        self.to_mat(sink, Keep::left)
1748    }
1749
1750    #[must_use]
1751    pub fn to_mat<SinkMat, Combined, F>(
1752        self,
1753        sink: Sink<Out, SinkMat>,
1754        combine: F,
1755    ) -> RunnableGraph<Combined>
1756    where
1757        SinkMat: Send + 'static,
1758        Combined: Send + 'static,
1759        F: Fn(Mat, SinkMat) -> Combined + Send + Sync + 'static,
1760    {
1761        let factory = self.factory;
1762        let terminal_factory = self.terminal_factory;
1763        let hints = self.hints;
1764        let combine = Arc::new(combine);
1765        RunnableGraph::from_runner(move |materializer| {
1766            if let Some(terminal_factory) = &terminal_factory
1767                && let Some(fold_fp) = sink.fold_fp.as_deref()
1768                && fold_fp.supports_terminal_drain()
1769            {
1770                let (hook, source_mat) = terminal_factory(materializer)?;
1771                let sink_mat = fold_fp
1772                    .try_register_terminal_drain(hook, materializer)
1773                    .expect("terminal drain support advertised")
1774                    .and_then(|mat| {
1775                        mat.downcast::<SinkMat>().map(|boxed| *boxed).map_err(|_| {
1776                            StreamError::Failed(
1777                                "terminal fast path: unexpected mat type (internal error)".into(),
1778                            )
1779                        })
1780                    })?;
1781                return Ok(combine(source_mat, sink_mat));
1782            }
1783            let (stream, source_mat) = Arc::clone(&factory).create(materializer)?;
1784            let sink_mat = if hints.inline_head_terminal && sink.can_inline() {
1785                let stream =
1786                    runtime_checked_stream(stream, Arc::clone(&materializer.inner.state), None);
1787                sink.run_inline(stream, materializer)?
1788            } else {
1789                sink.run_from_source(stream, materializer, hints.runtime())?
1790            };
1791            Ok(combine(source_mat, sink_mat))
1792        })
1793    }
1794
1795    pub fn run_collect(self) -> StreamResult<Vec<Out>> {
1796        self.run_with(Sink::collect())?.wait()
1797    }
1798
1799    #[must_use]
1800    pub fn map_materialized_value<NextMat, F>(self, f: F) -> Source<Out, NextMat>
1801    where
1802        NextMat: Send + 'static,
1803        F: Fn(Mat) -> NextMat + Send + Sync + 'static,
1804    {
1805        let factory = self.factory;
1806        let terminal_factory = self.terminal_factory;
1807        let hints = self.hints;
1808        let f = Arc::new(f);
1809        let factory_f = Arc::clone(&f);
1810        let mapped_terminal_factory = terminal_factory.map(|terminal_factory| {
1811            let f = Arc::clone(&f);
1812            Arc::new(move |materializer: &Materializer| {
1813                let (hook, mat) = terminal_factory(materializer)?;
1814                Ok((hook, f(mat)))
1815            }) as Arc<TerminalSourceFactory<Out, NextMat>>
1816        });
1817        Source {
1818            factory: Arc::new(FnSourceFactory(move |materializer: &Materializer| {
1819                let (stream, mat) = Arc::clone(&factory).create(materializer)?;
1820                Ok((stream, factory_f(mat)))
1821            })),
1822            terminal_factory: mapped_terminal_factory,
1823            hints,
1824            attributes: Attributes::default(),
1825            split_hook: None,
1826        }
1827    }
1828}
1829
1830impl<Out: Clone + Send + Sync + 'static> Source<Out, NotUsed> {
1831    #[must_use]
1832    pub fn combine<Mat1, Mat2, MatRest, I>(
1833        first: Source<Out, Mat1>,
1834        second: Source<Out, Mat2>,
1835        rest: I,
1836        strategy: SourceCombineStrategy,
1837    ) -> Source<Out, NotUsed>
1838    where
1839        Mat1: Send + 'static,
1840        Mat2: Send + 'static,
1841        MatRest: Send + 'static,
1842        I: IntoIterator<Item = Source<Out, MatRest>>,
1843    {
1844        let mut factories: Vec<Arc<CombinedSourceFactory<Out>>> = vec![
1845            Arc::new(move |materializer| {
1846                Arc::clone(&first.factory)
1847                    .create(materializer)
1848                    .map(|(stream, _)| stream)
1849            }),
1850            Arc::new(move |materializer| {
1851                Arc::clone(&second.factory)
1852                    .create(materializer)
1853                    .map(|(stream, _)| stream)
1854            }),
1855        ];
1856        factories.extend(rest.into_iter().map(|source| {
1857            Arc::new(move |materializer: &Materializer| {
1858                Arc::clone(&source.factory)
1859                    .create(materializer)
1860                    .map(|(stream, _)| stream)
1861            }) as Arc<CombinedSourceFactory<Out>>
1862        }));
1863        Source::from_materialized_factory(move |materializer| {
1864            let mut streams = Vec::with_capacity(factories.len());
1865            for factory in &factories {
1866                let stream = match factory(materializer) {
1867                    Ok(stream) => stream,
1868                    Err(error) => {
1869                        return Ok((
1870                            Box::new(std::iter::once(Err(error))) as BoxStream<Out>,
1871                            NotUsed,
1872                        ));
1873                    }
1874                };
1875                streams.push(stream);
1876            }
1877            let stream = match &strategy {
1878                SourceCombineStrategy::Merge { eager_complete } => {
1879                    merge_streams(streams, *eager_complete)
1880                }
1881                SourceCombineStrategy::Concat => concat_source_streams(streams),
1882                SourceCombineStrategy::Prioritized {
1883                    priorities,
1884                    eager_complete,
1885                } => {
1886                    if priorities.len() != streams.len() {
1887                        return Err(StreamError::GraphValidation(format!(
1888                            "combine priorities length {} did not match source count {}",
1889                            priorities.len(),
1890                            streams.len()
1891                        )));
1892                    }
1893                    merge_prioritized_streams(streams, priorities.clone(), *eager_complete)
1894                }
1895            };
1896            Ok((stream, NotUsed))
1897        })
1898    }
1899
1900    #[must_use]
1901    pub fn zip_n<Mat2, I>(sources: I) -> Source<Vec<Out>, NotUsed>
1902    where
1903        I: IntoIterator<Item = Source<Out, Mat2>>,
1904        Mat2: Send + 'static,
1905        Out: Clone,
1906    {
1907        Self::zip_with_n(sources, |values| values)
1908    }
1909
1910    #[must_use]
1911    pub fn zip_with_n<Mat2, I, Next, F>(sources: I, zipper: F) -> Source<Next, NotUsed>
1912    where
1913        I: IntoIterator<Item = Source<Out, Mat2>>,
1914        Mat2: Send + 'static,
1915        Next: Send + 'static,
1916        F: Fn(Vec<Out>) -> Next + Send + Sync + 'static,
1917    {
1918        let factories: Vec<_> = sources.into_iter().map(|source| source.factory).collect();
1919        let zipper = Arc::new(zipper);
1920        Source::from_materialized_factory(move |materializer| {
1921            let mut streams = Vec::with_capacity(factories.len());
1922            for factory in &factories {
1923                let stream = match Arc::clone(factory).create(materializer) {
1924                    Ok((stream, _)) => stream,
1925                    Err(error) => {
1926                        return Ok((
1927                            Box::new(std::iter::once(Err(error))) as BoxStream<Next>,
1928                            NotUsed,
1929                        ));
1930                    }
1931                };
1932                streams.push(stream);
1933            }
1934            Ok((zip_n_streams(streams, Arc::clone(&zipper)), NotUsed))
1935        })
1936    }
1937
1938    #[must_use]
1939    pub fn merge_prioritized_n<Mat2, I>(
1940        sources_and_priorities: I,
1941        eager_complete: bool,
1942    ) -> Source<Out, NotUsed>
1943    where
1944        I: IntoIterator<Item = (Source<Out, Mat2>, usize)>,
1945        Mat2: Send + 'static,
1946    {
1947        let sources_and_priorities: Vec<_> = sources_and_priorities.into_iter().collect();
1948        if sources_and_priorities.is_empty() {
1949            return Source::empty();
1950        }
1951        let (factories, priorities): (Vec<_>, Vec<_>) = sources_and_priorities
1952            .into_iter()
1953            .map(|(source, priority)| (source.factory, priority))
1954            .unzip();
1955        Source::from_materialized_factory(move |materializer| {
1956            let mut streams = Vec::with_capacity(factories.len());
1957            for factory in &factories {
1958                let stream = match Arc::clone(factory).create(materializer) {
1959                    Ok((stream, _)) => stream,
1960                    Err(error) => {
1961                        return Ok((
1962                            Box::new(std::iter::once(Err(error))) as BoxStream<Out>,
1963                            NotUsed,
1964                        ));
1965                    }
1966                };
1967                streams.push(stream);
1968            }
1969            Ok((
1970                merge_prioritized_streams(streams, priorities.clone(), eager_complete),
1971                NotUsed,
1972            ))
1973        })
1974    }
1975
1976    #[must_use]
1977    pub fn maybe() -> (MaybeHandle<Out>, Self) {
1978        let value = Arc::new(Mutex::new(None));
1979        let handle = MaybeHandle {
1980            value: Arc::clone(&value),
1981        };
1982        let source = Self::from_factory(move || {
1983            let result = value
1984                .lock()
1985                .expect("maybe source poisoned")
1986                .clone()
1987                .unwrap_or(Err(StreamError::MaybeIncomplete));
1988            Box::new(std::iter::once(result))
1989        });
1990        (handle, source)
1991    }
1992
1993    #[must_use]
1994    pub fn single(item: Out) -> Self {
1995        Self::from_factory_with_hints(
1996            move || Box::new(std::iter::once(Ok(item.clone()))),
1997            SourceHints::with_inline_micro(1),
1998        )
1999    }
2000
2001    #[must_use]
2002    pub fn repeat(item: Out) -> Self {
2003        Self::from_factory(move || {
2004            let item = item.clone();
2005            Box::new(std::iter::repeat_with(move || Ok(item.clone())))
2006        })
2007    }
2008
2009    #[must_use]
2010    pub fn from_iterable<I>(items: I) -> Self
2011    where
2012        I: IntoIterator<Item = Out>,
2013    {
2014        items.into_iter().collect()
2015    }
2016}
2017
2018impl<Out: Clone + Send + Sync + 'static> FromIterator<Out> for Source<Out, NotUsed> {
2019    fn from_iter<T: IntoIterator<Item = Out>>(iter: T) -> Self {
2020        // Share the backing storage across materializations and clone elements
2021        // lazily on demand, instead of cloning the whole container each run.
2022        let items: Arc<[Out]> = iter.into_iter().collect();
2023        let len = items.len();
2024        Self::from_factory_with_hints(
2025            move || {
2026                let items = Arc::clone(&items);
2027                let mut index = 0;
2028                Box::new(std::iter::from_fn(move || {
2029                    let item = items.get(index)?.clone();
2030                    index += 1;
2031                    Some(Ok(item))
2032                }))
2033            },
2034            // Exact success item count: enables the coordinator inline drain path.
2035            SourceHints::with_inline_micro(len),
2036        )
2037    }
2038}
2039
2040/// Test-only helper: create a Source whose factory calls the given closure but
2041/// marks it as inline-micro eligible. Used to test the inline drain path with
2042/// custom (possibly blocking) iterators without broadening production
2043/// eligibility.
2044#[cfg(test)]
2045pub(in crate::stream) fn test_source_with_inline_micro_hint<Out: Send + 'static>(
2046    factory: impl Fn() -> BoxStream<Out> + Send + Sync + 'static,
2047    max_success_items: usize,
2048) -> Source<Out, NotUsed> {
2049    Source::from_factory_with_hints(factory, SourceHints::with_inline_micro(max_success_items))
2050}
2051
2052struct UnfoldResourceStream<Resource, Out, Create, Read, Close>
2053where
2054    Create: Fn() -> StreamResult<Resource>,
2055    Read: Fn(&mut Resource) -> StreamResult<Option<Out>>,
2056    Close: Fn(Resource) -> StreamResult<()>,
2057{
2058    create: Arc<Create>,
2059    read: Arc<Read>,
2060    close: Arc<Close>,
2061    resource: Option<Resource>,
2062    created: bool,
2063    terminated: bool,
2064    _marker: PhantomData<fn() -> Out>,
2065}
2066
2067impl<Resource, Out, Create, Read, Close> UnfoldResourceStream<Resource, Out, Create, Read, Close>
2068where
2069    Create: Fn() -> StreamResult<Resource>,
2070    Read: Fn(&mut Resource) -> StreamResult<Option<Out>>,
2071    Close: Fn(Resource) -> StreamResult<()>,
2072{
2073    fn ensure_created(&mut self) -> StreamResult<()> {
2074        if self.created {
2075            return Ok(());
2076        }
2077        self.created = true;
2078        let resource = catch_unwind_failed("unfold_resource create", || (self.create)())
2079            .and_then(|result| result)?;
2080        self.resource = Some(resource);
2081        Ok(())
2082    }
2083
2084    fn close_resource(&mut self) -> StreamResult<()> {
2085        match self.resource.take() {
2086            Some(resource) => {
2087                catch_unwind_failed("unfold_resource close", || (self.close)(resource))
2088                    .and_then(|result| result)
2089            }
2090            None => Ok(()),
2091        }
2092    }
2093}
2094
2095impl<Resource, Out, Create, Read, Close> Iterator
2096    for UnfoldResourceStream<Resource, Out, Create, Read, Close>
2097where
2098    Create: Fn() -> StreamResult<Resource>,
2099    Read: Fn(&mut Resource) -> StreamResult<Option<Out>>,
2100    Close: Fn(Resource) -> StreamResult<()>,
2101{
2102    type Item = StreamResult<Out>;
2103
2104    fn next(&mut self) -> Option<Self::Item> {
2105        if self.terminated {
2106            return None;
2107        }
2108        if let Err(error) = self.ensure_created() {
2109            self.terminated = true;
2110            return Some(Err(error));
2111        }
2112
2113        let result = {
2114            let resource = self
2115                .resource
2116                .as_mut()
2117                .expect("unfold_resource resource is open");
2118            catch_unwind_failed("unfold_resource read", || (self.read)(resource))
2119                .and_then(|result| result)
2120        };
2121
2122        match result {
2123            Ok(Some(item)) => Some(Ok(item)),
2124            Ok(None) => {
2125                self.terminated = true;
2126                match self.close_resource() {
2127                    Ok(()) => None,
2128                    Err(error) => Some(Err(error)),
2129                }
2130            }
2131            Err(read_error) => {
2132                self.terminated = true;
2133                let _ = self.close_resource();
2134                Some(Err(read_error))
2135            }
2136        }
2137    }
2138}
2139
2140impl<Resource, Out, Create, Read, Close> Drop
2141    for UnfoldResourceStream<Resource, Out, Create, Read, Close>
2142where
2143    Create: Fn() -> StreamResult<Resource>,
2144    Read: Fn(&mut Resource) -> StreamResult<Option<Out>>,
2145    Close: Fn(Resource) -> StreamResult<()>,
2146{
2147    fn drop(&mut self) {
2148        let _ = self.close_resource();
2149    }
2150}
2151
2152type UnfoldResourceAsyncMarker<Out, CreateFut, ReadFut, CloseFut> =
2153    fn() -> (Out, CreateFut, ReadFut, CloseFut);
2154
2155struct UnfoldResourceAsyncStream<Resource, Out, Create, CreateFut, Read, ReadFut, Close, CloseFut>
2156where
2157    Resource: Send + 'static,
2158    Create: Fn() -> CreateFut,
2159    CreateFut: Future<Output = StreamResult<Resource>> + Send + 'static,
2160    Read: Fn(&mut Resource) -> ReadFut,
2161    ReadFut: Future<Output = StreamResult<Option<Out>>> + Send + 'static,
2162    Close: Fn(Resource) -> CloseFut,
2163    CloseFut: Future<Output = StreamResult<()>> + Send + 'static,
2164{
2165    create: Arc<Create>,
2166    read: Arc<Read>,
2167    close: Arc<Close>,
2168    resource: Option<Resource>,
2169    created: bool,
2170    terminated: bool,
2171    _marker: PhantomData<UnfoldResourceAsyncMarker<Out, CreateFut, ReadFut, CloseFut>>,
2172}
2173
2174impl<Resource, Out, Create, CreateFut, Read, ReadFut, Close, CloseFut>
2175    UnfoldResourceAsyncStream<Resource, Out, Create, CreateFut, Read, ReadFut, Close, CloseFut>
2176where
2177    Resource: Send + 'static,
2178    Create: Fn() -> CreateFut,
2179    CreateFut: Future<Output = StreamResult<Resource>> + Send + 'static,
2180    Read: Fn(&mut Resource) -> ReadFut,
2181    ReadFut: Future<Output = StreamResult<Option<Out>>> + Send + 'static,
2182    Close: Fn(Resource) -> CloseFut,
2183    CloseFut: Future<Output = StreamResult<()>> + Send + 'static,
2184{
2185    fn ensure_created(&mut self) -> StreamResult<()> {
2186        if self.created {
2187            return Ok(());
2188        }
2189        self.created = true;
2190        let resource = catch_unwind_failed("unfold_resource_async create", || (self.create)())
2191            .and_then(flow::run_future_inline_or_spawn)?;
2192        self.resource = Some(resource);
2193        Ok(())
2194    }
2195
2196    fn close_resource(&mut self) -> StreamResult<()> {
2197        match self.resource.take() {
2198            Some(resource) => {
2199                catch_unwind_failed("unfold_resource_async close", || (self.close)(resource))
2200                    .and_then(flow::run_future_inline_or_spawn)
2201            }
2202            None => Ok(()),
2203        }
2204    }
2205}
2206
2207impl<Resource, Out, Create, CreateFut, Read, ReadFut, Close, CloseFut> Iterator
2208    for UnfoldResourceAsyncStream<Resource, Out, Create, CreateFut, Read, ReadFut, Close, CloseFut>
2209where
2210    Resource: Send + 'static,
2211    Out: Send + 'static,
2212    Create: Fn() -> CreateFut,
2213    CreateFut: Future<Output = StreamResult<Resource>> + Send + 'static,
2214    Read: Fn(&mut Resource) -> ReadFut,
2215    ReadFut: Future<Output = StreamResult<Option<Out>>> + Send + 'static,
2216    Close: Fn(Resource) -> CloseFut,
2217    CloseFut: Future<Output = StreamResult<()>> + Send + 'static,
2218{
2219    type Item = StreamResult<Out>;
2220
2221    fn next(&mut self) -> Option<Self::Item> {
2222        if self.terminated {
2223            return None;
2224        }
2225        if let Err(error) = self.ensure_created() {
2226            self.terminated = true;
2227            return Some(Err(error));
2228        }
2229
2230        let result = {
2231            let resource = self
2232                .resource
2233                .as_mut()
2234                .expect("unfold_resource_async resource is open");
2235            catch_unwind_failed("unfold_resource_async read", || (self.read)(resource))
2236                .and_then(flow::run_future_inline_or_spawn)
2237        };
2238
2239        match result {
2240            Ok(Some(item)) => Some(Ok(item)),
2241            Ok(None) => {
2242                self.terminated = true;
2243                match self.close_resource() {
2244                    Ok(()) => None,
2245                    Err(error) => Some(Err(error)),
2246                }
2247            }
2248            Err(read_error) => {
2249                self.terminated = true;
2250                let _ = self.close_resource();
2251                Some(Err(read_error))
2252            }
2253        }
2254    }
2255}
2256
2257impl<Resource, Out, Create, CreateFut, Read, ReadFut, Close, CloseFut> Drop
2258    for UnfoldResourceAsyncStream<Resource, Out, Create, CreateFut, Read, ReadFut, Close, CloseFut>
2259where
2260    Resource: Send + 'static,
2261    Create: Fn() -> CreateFut,
2262    CreateFut: Future<Output = StreamResult<Resource>> + Send + 'static,
2263    Read: Fn(&mut Resource) -> ReadFut,
2264    ReadFut: Future<Output = StreamResult<Option<Out>>> + Send + 'static,
2265    Close: Fn(Resource) -> CloseFut,
2266    CloseFut: Future<Output = StreamResult<()>> + Send + 'static,
2267{
2268    fn drop(&mut self) {
2269        let _ = self.close_resource();
2270    }
2271}
2272
2273struct LazySourceStream<Out, InnerMat, F> {
2274    create: Arc<F>,
2275    materializer: Materializer,
2276    current: Option<BoxStream<Out>>,
2277    mat_sender: Option<oneshot::Sender<StreamResult<InnerMat>>>,
2278    initialized: bool,
2279    terminated: bool,
2280}
2281
2282impl<Out, InnerMat, F> LazySourceStream<Out, InnerMat, F>
2283where
2284    Out: Send + 'static,
2285    InnerMat: Send + 'static,
2286    F: Fn() -> Source<Out, InnerMat>,
2287{
2288    fn complete_mat(&mut self, result: StreamResult<InnerMat>) {
2289        if let Some(sender) = self.mat_sender.take() {
2290            let _ = sender.send(result);
2291        }
2292    }
2293
2294    fn initialize(&mut self) -> StreamResult<()> {
2295        if self.initialized {
2296            return Ok(());
2297        }
2298        self.initialized = true;
2299        let source = match catch_unwind_failed("lazy_source factory", || (self.create)()) {
2300            Ok(source) => source,
2301            Err(error) => {
2302                self.complete_mat(Err(error.clone()));
2303                return Err(error);
2304            }
2305        };
2306        match Arc::clone(&source.factory).create(&self.materializer) {
2307            Ok((stream, mat)) => {
2308                self.current = Some(stream);
2309                self.complete_mat(Ok(mat));
2310                Ok(())
2311            }
2312            Err(error) => {
2313                self.complete_mat(Err(error.clone()));
2314                Err(error)
2315            }
2316        }
2317    }
2318}
2319
2320impl<Out, InnerMat, F> Iterator for LazySourceStream<Out, InnerMat, F>
2321where
2322    Out: Send + 'static,
2323    InnerMat: Send + 'static,
2324    F: Fn() -> Source<Out, InnerMat>,
2325{
2326    type Item = StreamResult<Out>;
2327
2328    fn next(&mut self) -> Option<Self::Item> {
2329        if self.terminated {
2330            return None;
2331        }
2332        if let Err(error) = self.initialize() {
2333            self.terminated = true;
2334            return Some(Err(error));
2335        }
2336        match self
2337            .current
2338            .as_mut()
2339            .expect("lazy_source current stream initialized")
2340            .next()
2341        {
2342            Some(Ok(item)) => Some(Ok(item)),
2343            Some(Err(error)) => {
2344                self.terminated = true;
2345                Some(Err(error))
2346            }
2347            None => {
2348                self.terminated = true;
2349                None
2350            }
2351        }
2352    }
2353}
2354
2355impl<Out, InnerMat, F> Drop for LazySourceStream<Out, InnerMat, F> {
2356    fn drop(&mut self) {
2357        if !self.initialized
2358            && let Some(sender) = self.mat_sender.take()
2359        {
2360            let _ = sender.send(Err(StreamError::Failed(
2361                "lazy source was never materialized".into(),
2362            )));
2363        }
2364    }
2365}
2366
2367struct LazyFutureSourceStream<Out, InnerMat, F, Fut> {
2368    create: Arc<F>,
2369    materializer: Materializer,
2370    current: Option<BoxStream<Out>>,
2371    mat_sender: Option<oneshot::Sender<StreamResult<InnerMat>>>,
2372    initialized: bool,
2373    terminated: bool,
2374    _marker: PhantomData<fn() -> Fut>,
2375}
2376
2377impl<Out, InnerMat, F, Fut> LazyFutureSourceStream<Out, InnerMat, F, Fut>
2378where
2379    Out: Send + 'static,
2380    InnerMat: Send + 'static,
2381    F: Fn() -> Fut,
2382    Fut: Future<Output = StreamResult<Source<Out, InnerMat>>> + Send + 'static,
2383{
2384    fn complete_mat(&mut self, result: StreamResult<InnerMat>) {
2385        if let Some(sender) = self.mat_sender.take() {
2386            let _ = sender.send(result);
2387        }
2388    }
2389
2390    fn initialize(&mut self) -> StreamResult<()> {
2391        if self.initialized {
2392            return Ok(());
2393        }
2394        self.initialized = true;
2395        let source = match catch_unwind_failed("lazy_future_source factory", || (self.create)())
2396            .and_then(flow::run_future_inline_or_spawn)
2397        {
2398            Ok(source) => source,
2399            Err(error) => {
2400                self.complete_mat(Err(error.clone()));
2401                return Err(error);
2402            }
2403        };
2404        match Arc::clone(&source.factory).create(&self.materializer) {
2405            Ok((stream, mat)) => {
2406                self.current = Some(stream);
2407                self.complete_mat(Ok(mat));
2408                Ok(())
2409            }
2410            Err(error) => {
2411                self.complete_mat(Err(error.clone()));
2412                Err(error)
2413            }
2414        }
2415    }
2416}
2417
2418impl<Out, InnerMat, F, Fut> Iterator for LazyFutureSourceStream<Out, InnerMat, F, Fut>
2419where
2420    Out: Send + 'static,
2421    InnerMat: Send + 'static,
2422    F: Fn() -> Fut,
2423    Fut: Future<Output = StreamResult<Source<Out, InnerMat>>> + Send + 'static,
2424{
2425    type Item = StreamResult<Out>;
2426
2427    fn next(&mut self) -> Option<Self::Item> {
2428        if self.terminated {
2429            return None;
2430        }
2431        if let Err(error) = self.initialize() {
2432            self.terminated = true;
2433            return Some(Err(error));
2434        }
2435        match self
2436            .current
2437            .as_mut()
2438            .expect("lazy_future_source current stream initialized")
2439            .next()
2440        {
2441            Some(Ok(item)) => Some(Ok(item)),
2442            Some(Err(error)) => {
2443                self.terminated = true;
2444                Some(Err(error))
2445            }
2446            None => {
2447                self.terminated = true;
2448                None
2449            }
2450        }
2451    }
2452}
2453
2454impl<Out, InnerMat, F, Fut> Drop for LazyFutureSourceStream<Out, InnerMat, F, Fut> {
2455    fn drop(&mut self) {
2456        if !self.initialized
2457            && let Some(sender) = self.mat_sender.take()
2458        {
2459            let _ = sender.send(Err(StreamError::Failed(
2460                "lazy future source was never materialized".into(),
2461            )));
2462        }
2463    }
2464}
2465
2466fn concat_source_streams<Out>(streams: Vec<BoxStream<Out>>) -> BoxStream<Out>
2467where
2468    Out: Send + 'static,
2469{
2470    let mut streams: VecDeque<_> = streams.into();
2471    let mut current = streams.pop_front();
2472    Box::new(std::iter::from_fn(move || {
2473        loop {
2474            match current.as_mut() {
2475                Some(stream) => match stream.next() {
2476                    Some(item) => return Some(item),
2477                    None => current = streams.pop_front(),
2478                },
2479                None => return None,
2480            }
2481        }
2482    }))
2483}
2484
2485fn concat_source_streams_lazy<Out, Mat>(
2486    initial: BoxStream<Out>,
2487    factories: Vec<Arc<dyn SourceFactory<Out, Mat>>>,
2488    materializer: &Materializer,
2489) -> BoxStream<Out>
2490where
2491    Out: Send + 'static,
2492    Mat: Send + 'static,
2493{
2494    let mut current = Some(initial);
2495    let mut remaining: VecDeque<_> = factories.into();
2496    let materializer = materializer.with_name_prefix(materializer.name_prefix().to_owned());
2497    Box::new(std::iter::from_fn(move || {
2498        loop {
2499            match current.as_mut() {
2500                Some(stream) => match stream.next() {
2501                    Some(item) => return Some(item),
2502                    None => {
2503                        current = remaining.pop_front().map(|factory| {
2504                            match factory.create(&materializer) {
2505                                Ok((stream, _)) => stream,
2506                                Err(error) => {
2507                                    Box::new(std::iter::once(Err(error))) as BoxStream<Out>
2508                                }
2509                            }
2510                        });
2511                    }
2512                },
2513                None => return None,
2514            }
2515        }
2516    }))
2517}
2518
2519fn or_else_source_stream<Out>(
2520    mut primary: BoxStream<Out>,
2521    mut secondary: BoxStream<Out>,
2522) -> BoxStream<Out>
2523where
2524    Out: Send + 'static,
2525{
2526    let mut primary_emitted = false;
2527    let mut using_secondary = false;
2528    Box::new(std::iter::from_fn(move || {
2529        loop {
2530            if using_secondary {
2531                return secondary.next();
2532            }
2533
2534            match primary.next() {
2535                Some(Ok(item)) => {
2536                    primary_emitted = true;
2537                    return Some(Ok(item));
2538                }
2539                Some(Err(error)) => return Some(Err(error)),
2540                None if primary_emitted => return None,
2541                None => using_secondary = true,
2542            }
2543        }
2544    }))
2545}
2546
2547fn interleave_source_streams<Out>(
2548    streams: Vec<BoxStream<Out>>,
2549    segment_size: usize,
2550    eager_close: bool,
2551) -> BoxStream<Out>
2552where
2553    Out: Send + 'static,
2554{
2555    if segment_size == 0 {
2556        return Box::new(std::iter::once(Err(StreamError::GraphValidation(
2557            "interleave segment size must be greater than zero".into(),
2558        ))));
2559    }
2560
2561    let mut streams: Vec<Option<BoxStream<Out>>> = streams.into_iter().map(Some).collect();
2562    let mut pending: Vec<Option<StreamResult<Out>>> = (0..streams.len()).map(|_| None).collect();
2563    let mut current = 0usize;
2564    let mut emitted = 0usize;
2565    Box::new(std::iter::from_fn(move || {
2566        loop {
2567            if streams.iter().all(Option::is_none) {
2568                return None;
2569            }
2570            if streams[current].is_none() {
2571                match next_active_source_stream(&streams, current) {
2572                    Some(next) => {
2573                        current = next;
2574                        emitted = 0;
2575                    }
2576                    None => return None,
2577                }
2578            }
2579
2580            let Some(stream) = streams[current].as_mut() else {
2581                continue;
2582            };
2583            let next_item = pending[current].take().or_else(|| stream.next());
2584            match next_item {
2585                Some(Ok(item)) => {
2586                    emitted += 1;
2587                    if emitted == segment_size {
2588                        emitted = 0;
2589                        if let Some(next) = next_active_source_stream(&streams, current) {
2590                            current = next;
2591                        }
2592                    }
2593                    return Some(Ok(item));
2594                }
2595                Some(Err(error)) => return Some(Err(error)),
2596                None => {
2597                    streams[current] = None;
2598                    emitted = 0;
2599                    if eager_close {
2600                        return None;
2601                    }
2602                    match next_active_source_stream(&streams, current) {
2603                        Some(next) => current = next,
2604                        None => return None,
2605                    }
2606                }
2607            }
2608        }
2609    }))
2610}
2611
2612fn next_active_source_stream<Out>(
2613    streams: &[Option<BoxStream<Out>>],
2614    current: usize,
2615) -> Option<usize>
2616where
2617    Out: Send + 'static,
2618{
2619    if streams.is_empty() {
2620        return None;
2621    }
2622    for offset in 1..=streams.len() {
2623        let index = (current + offset) % streams.len();
2624        if streams[index].is_some() {
2625            return Some(index);
2626        }
2627    }
2628    None
2629}