Skip to main content

datum/stream/
source.rs

1use super::flow::{self, FlowTransform};
2use super::*;
3use crate::Attributes;
4use crate::context::SourceWithContext;
5
6#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash)]
7pub struct NotUsed;
8
9type CombinedSourceFactory<Out> =
10    dyn Fn(&Materializer) -> StreamResult<BoxStream<Out>> + Send + Sync;
11
12pub struct Keep;
13
14impl Keep {
15    pub fn left<Left, Right>(left: Left, _right: Right) -> Left {
16        left
17    }
18
19    pub fn right<Left, Right>(_left: Left, right: Right) -> Right {
20        right
21    }
22
23    pub fn both<Left, Right>(left: Left, right: Right) -> (Left, Right) {
24        (left, right)
25    }
26
27    pub fn none<Left, Right>(_left: Left, _right: Right) -> NotUsed {
28        NotUsed
29    }
30}
31
32#[derive(Debug, Clone, PartialEq, Eq)]
33pub enum SourceCombineStrategy {
34    Merge {
35        eager_complete: bool,
36    },
37    Concat,
38    Prioritized {
39        priorities: Vec<usize>,
40        eager_complete: bool,
41    },
42}
43
44#[derive(Clone)]
45pub struct MaybeHandle<T> {
46    value: Arc<Mutex<Option<StreamResult<T>>>>,
47}
48
49impl<T> MaybeHandle<T> {
50    #[must_use]
51    pub fn is_completed(&self) -> bool {
52        self.value.lock().expect("maybe handle poisoned").is_some()
53    }
54
55    pub fn complete(&self, item: T) -> StreamResult<()> {
56        self.settle(Ok(item))
57    }
58
59    pub fn fail(&self, error: StreamError) -> StreamResult<()> {
60        self.settle(Err(error))
61    }
62
63    fn settle(&self, result: StreamResult<T>) -> StreamResult<()> {
64        let mut value = self.value.lock().expect("maybe handle poisoned");
65        if value.is_some() {
66            return Err(StreamError::Failed("maybe source already completed".into()));
67        }
68        *value = Some(result);
69        Ok(())
70    }
71}
72
73#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
74pub struct Demand(u64);
75
76impl Demand {
77    pub const ZERO: Self = Self(0);
78    pub const ONE: Self = Self(1);
79    pub const MAX: Self = Self(u64::MAX);
80
81    #[must_use]
82    pub const fn new(available: u64) -> Self {
83        Self(available)
84    }
85
86    #[must_use]
87    pub const fn available(self) -> u64 {
88        self.0
89    }
90
91    #[must_use]
92    pub const fn is_unbounded(self) -> bool {
93        self.0 == u64::MAX
94    }
95
96    #[must_use]
97    pub const fn is_empty(self) -> bool {
98        self.0 == 0
99    }
100
101    #[must_use]
102    pub const fn saturating_add(self, rhs: Self) -> Self {
103        Self(self.0.saturating_add(rhs.0))
104    }
105
106    pub fn consume_one(&mut self) -> bool {
107        match self.0 {
108            0 => false,
109            u64::MAX => true,
110            _ => {
111                self.0 -= 1;
112                true
113            }
114        }
115    }
116}
117
118pub trait PushOutlet<T>: Send {
119    fn push(&mut self, item: T) -> StreamResult<Demand>;
120
121    fn complete(&mut self) -> StreamResult<()> {
122        Ok(())
123    }
124
125    fn fail(&mut self, cause: StreamError) -> StreamResult<()> {
126        Err(cause)
127    }
128}
129
130#[derive(Clone)]
131pub struct Source<Out, Mat = NotUsed> {
132    pub(crate) factory: Arc<dyn SourceFactory<Out, Mat>>,
133    pub(crate) terminal_factory: Option<Arc<TerminalSourceFactory<Out, Mat>>>,
134    pub(super) hints: SourceHints,
135    pub(super) attributes: Attributes,
136    /// Set only for split-segment sources in fast mode. `None` for all other sources.
137    /// Cleared by all composition operators (via/map/to_mat etc.).
138    pub(crate) split_hook: Option<Arc<dyn SplitSegmentHookDyn>>,
139}
140
141pub(crate) type TerminalSourceFactory<Out, Mat> =
142    dyn Fn(&Materializer) -> StreamResult<(Arc<dyn TerminalSourceHookDyn<Out>>, Mat)> + Send + Sync;
143
144impl<Out: Send + 'static> Source<Out, NotUsed> {
145    pub(super) fn from_factory<F>(factory: F) -> Self
146    where
147        F: Fn() -> BoxStream<Out> + Send + Sync + 'static,
148    {
149        Self::from_factory_with_hints(factory, SourceHints::default())
150    }
151
152    fn from_factory_with_hints<F>(factory: F, hints: SourceHints) -> Self
153    where
154        F: Fn() -> BoxStream<Out> + Send + Sync + 'static,
155    {
156        Self::from_materialized_factory_with_hints(
157            move |_materializer| Ok((factory(), NotUsed)),
158            hints,
159        )
160    }
161
162    #[must_use]
163    pub fn empty() -> Self {
164        Self::from_factory_with_hints(
165            || Box::new(std::iter::empty()),
166            SourceHints::with_inline_micro(0),
167        )
168    }
169
170    #[must_use]
171    pub fn never() -> Self {
172        Self::from_materialized_factory_with_hints(
173            move |materializer| {
174                let state = Arc::clone(&materializer.inner.state);
175                Ok((
176                    Box::new(std::iter::from_fn(move || {
177                        loop {
178                            if state.shutdown.load(Ordering::SeqCst) {
179                                return Some(Err(StreamError::AbruptTermination));
180                            }
181                            if super::runtime::current_stream_cancelled()
182                                .as_ref()
183                                .is_some_and(|cancelled| cancelled.load(Ordering::SeqCst))
184                            {
185                                return Some(Err(StreamError::Cancelled));
186                            }
187                            std::thread::park_timeout(Duration::from_millis(1));
188                        }
189                    })),
190                    NotUsed,
191                ))
192            },
193            SourceHints::default(),
194        )
195    }
196
197    #[must_use]
198    pub fn failed(error: StreamError) -> Self {
199        Self::from_factory_with_hints(
200            move || Box::new(std::iter::once(Err(error.clone()))),
201            // max_success_items=0: failed() emits no Ok items before the Err.
202            SourceHints::with_inline_micro(0),
203        )
204    }
205
206    #[must_use]
207    pub fn future<F, Fut>(future: F) -> Self
208    where
209        F: Fn() -> Fut + Send + Sync + 'static,
210        Fut: Future<Output = StreamResult<Out>> + Send + 'static,
211    {
212        let future = Arc::new(future);
213        Self::from_factory(move || {
214            let future = Arc::clone(&future);
215            let mut emitted = false;
216            Box::new(std::iter::from_fn(move || {
217                if emitted {
218                    return None;
219                }
220                emitted = true;
221                Some(
222                    catch_unwind_failed("source future factory", || future())
223                        .and_then(flow::run_future_inline_or_spawn),
224                )
225            }))
226        })
227    }
228
229    #[must_use]
230    pub fn future_source<F, Fut>(future: F) -> Self
231    where
232        F: Fn() -> Fut + Send + Sync + 'static,
233        Fut: Future<Output = StreamResult<Source<Out>>> + Send + 'static,
234    {
235        let future = Arc::new(future);
236        Self::from_materialized_factory(move |materializer| {
237            let materializer = materializer.with_name_prefix(materializer.name_prefix().to_owned());
238            let future = Arc::clone(&future);
239            let mut current = None::<BoxStream<Out>>;
240            let mut initialized = false;
241            let mut terminated = false;
242            Ok((
243                Box::new(std::iter::from_fn(move || {
244                    if terminated {
245                        return None;
246                    }
247
248                    loop {
249                        if let Some(stream) = current.as_mut() {
250                            match stream.next() {
251                                Some(item) => return Some(item),
252                                None => {
253                                    terminated = true;
254                                    return None;
255                                }
256                            }
257                        }
258
259                        if initialized {
260                            terminated = true;
261                            return None;
262                        }
263                        initialized = true;
264                        let source = match catch_unwind_failed("future_source factory", || future())
265                            .and_then(flow::run_future_inline_or_spawn)
266                        {
267                            Ok(source) => source,
268                            Err(error) => {
269                                terminated = true;
270                                return Some(Err(error));
271                            }
272                        };
273                        current = Some(match Arc::clone(&source.factory).create(&materializer) {
274                            Ok((stream, _)) => stream,
275                            Err(error) => {
276                                terminated = true;
277                                return Some(Err(error));
278                            }
279                        });
280                    }
281                })) as BoxStream<Out>,
282                NotUsed,
283            ))
284        })
285    }
286
287    #[must_use]
288    pub fn cycle<F, I>(factory: F) -> Self
289    where
290        F: Fn() -> I + Send + Sync + 'static,
291        I: IntoIterator<Item = Out>,
292        I::IntoIter: Send + 'static,
293    {
294        let factory = Arc::new(factory);
295        Self::from_factory(move || {
296            let factory = Arc::clone(&factory);
297            let mut current = None::<I::IntoIter>;
298            let mut terminated = false;
299            Box::new(std::iter::from_fn(move || {
300                if terminated {
301                    return None;
302                }
303
304                if let Some(iter) = current.as_mut()
305                    && let Some(item) = iter.next()
306                {
307                    return Some(Ok(item));
308                }
309
310                let mut next = match catch_unwind_failed("cycle factory", || factory()) {
311                    Ok(iterable) => iterable.into_iter(),
312                    Err(error) => {
313                        terminated = true;
314                        return Some(Err(error));
315                    }
316                };
317                match next.next() {
318                    Some(item) => {
319                        current = Some(next);
320                        Some(Ok(item))
321                    }
322                    None => {
323                        terminated = true;
324                        Some(Err(StreamError::Failed("empty iterator".into())))
325                    }
326                }
327            }))
328        })
329    }
330
331    #[must_use]
332    pub fn unfold<State, F>(initial: State, f: F) -> Self
333    where
334        State: Clone + Send + Sync + 'static,
335        F: Fn(State) -> Option<(State, Out)> + Send + Sync + 'static,
336    {
337        let f = Arc::new(f);
338        Self::from_factory(move || {
339            let f = Arc::clone(&f);
340            let mut state = Some(initial.clone());
341            let mut terminated = false;
342            Box::new(std::iter::from_fn(move || {
343                if terminated {
344                    return None;
345                }
346                let current = state.take().expect("unfold state present");
347                match catch_unwind_failed("unfold function", || f(current)) {
348                    Ok(Some((next_state, item))) => {
349                        state = Some(next_state);
350                        Some(Ok(item))
351                    }
352                    Ok(None) => {
353                        terminated = true;
354                        None
355                    }
356                    Err(error) => {
357                        terminated = true;
358                        Some(Err(error))
359                    }
360                }
361            }))
362        })
363    }
364
365    #[must_use]
366    pub fn unfold_async<State, F, Fut>(initial: State, f: F) -> Self
367    where
368        State: Clone + Send + Sync + 'static,
369        F: Fn(State) -> Fut + Send + Sync + 'static,
370        Fut: Future<Output = StreamResult<Option<(State, Out)>>> + Send + 'static,
371    {
372        let f = Arc::new(f);
373        Self::from_factory(move || {
374            let f = Arc::clone(&f);
375            let mut state = Some(initial.clone());
376            let mut terminated = false;
377            Box::new(std::iter::from_fn(move || {
378                if terminated {
379                    return None;
380                }
381                let current = state.take().expect("unfold_async state present");
382                match catch_unwind_failed("unfold_async factory", || f(current))
383                    .and_then(flow::run_future_inline_or_spawn)
384                {
385                    Ok(Some((next_state, item))) => {
386                        state = Some(next_state);
387                        Some(Ok(item))
388                    }
389                    Ok(None) => {
390                        terminated = true;
391                        None
392                    }
393                    Err(error) => {
394                        terminated = true;
395                        Some(Err(error))
396                    }
397                }
398            }))
399        })
400    }
401
402    #[must_use]
403    pub fn unfold_resource<Resource, Create, Read, Close>(
404        create: Create,
405        read: Read,
406        close: Close,
407    ) -> Self
408    where
409        Resource: Send + 'static,
410        Create: Fn() -> StreamResult<Resource> + Send + Sync + 'static,
411        Read: Fn(&mut Resource) -> StreamResult<Option<Out>> + Send + Sync + 'static,
412        Close: Fn(Resource) -> StreamResult<()> + Send + Sync + 'static,
413    {
414        let create = Arc::new(create);
415        let read = Arc::new(read);
416        let close = Arc::new(close);
417        Self::from_factory(move || {
418            Box::new(UnfoldResourceStream {
419                create: Arc::clone(&create),
420                read: Arc::clone(&read),
421                close: Arc::clone(&close),
422                resource: None,
423                created: false,
424                terminated: false,
425                _marker: PhantomData,
426            })
427        })
428    }
429
430    #[must_use]
431    pub fn unfold_resource_async<Resource, Create, CreateFut, Read, ReadFut, Close, CloseFut>(
432        create: Create,
433        read: Read,
434        close: Close,
435    ) -> Self
436    where
437        Resource: Send + 'static,
438        Create: Fn() -> CreateFut + Send + Sync + 'static,
439        CreateFut: Future<Output = StreamResult<Resource>> + Send + 'static,
440        Read: Fn(&mut Resource) -> ReadFut + Send + Sync + 'static,
441        ReadFut: Future<Output = StreamResult<Option<Out>>> + Send + 'static,
442        Close: Fn(Resource) -> CloseFut + Send + Sync + 'static,
443        CloseFut: Future<Output = StreamResult<()>> + Send + 'static,
444    {
445        let create = Arc::new(create);
446        let read = Arc::new(read);
447        let close = Arc::new(close);
448        Self::from_factory(move || {
449            Box::new(UnfoldResourceAsyncStream {
450                create: Arc::clone(&create),
451                read: Arc::clone(&read),
452                close: Arc::clone(&close),
453                resource: None,
454                created: false,
455                terminated: false,
456                _marker: PhantomData,
457            })
458        })
459    }
460
461    #[must_use]
462    pub fn lazy_single<F>(create: F) -> Self
463    where
464        F: Fn() -> Out + Send + Sync + 'static,
465    {
466        let create = Arc::new(create);
467        Self::from_factory(move || {
468            let create = Arc::clone(&create);
469            let mut emitted = false;
470            Box::new(std::iter::from_fn(move || {
471                if emitted {
472                    return None;
473                }
474                emitted = true;
475                Some(catch_unwind_failed("lazy_single factory", || create()))
476            }))
477        })
478    }
479
480    #[must_use]
481    pub fn lazy_future<F, Fut>(create: F) -> Self
482    where
483        F: Fn() -> Fut + Send + Sync + 'static,
484        Fut: Future<Output = StreamResult<Out>> + Send + 'static,
485    {
486        let create = Arc::new(create);
487        Self::from_factory(move || {
488            let create = Arc::clone(&create);
489            let mut emitted = false;
490            Box::new(std::iter::from_fn(move || {
491                if emitted {
492                    return None;
493                }
494                emitted = true;
495                Some(
496                    catch_unwind_failed("lazy_future factory", || create())
497                        .and_then(flow::run_future_inline_or_spawn),
498                )
499            }))
500        })
501    }
502
503    #[must_use]
504    pub fn lazy_source<InnerMat, F>(create: F) -> Source<Out, StreamCompletion<InnerMat>>
505    where
506        InnerMat: Send + 'static,
507        F: Fn() -> Source<Out, InnerMat> + Send + Sync + 'static,
508    {
509        let create = Arc::new(create);
510        Source::from_materialized_factory(move |materializer| {
511            let (sender, receiver) = oneshot::channel();
512            Ok((
513                Box::new(LazySourceStream {
514                    create: Arc::clone(&create),
515                    materializer: materializer
516                        .with_name_prefix(materializer.name_prefix().to_owned()),
517                    current: None,
518                    mat_sender: Some(sender),
519                    initialized: false,
520                    terminated: false,
521                }) as BoxStream<Out>,
522                StreamCompletion::from_receiver(receiver, None),
523            ))
524        })
525    }
526
527    #[must_use]
528    pub fn lazy_future_source<InnerMat, F, Fut>(
529        create: F,
530    ) -> Source<Out, StreamCompletion<InnerMat>>
531    where
532        InnerMat: Send + 'static,
533        F: Fn() -> Fut + Send + Sync + 'static,
534        Fut: Future<Output = StreamResult<Source<Out, InnerMat>>> + Send + 'static,
535    {
536        let create = Arc::new(create);
537        Source::from_materialized_factory(move |materializer| {
538            let (sender, receiver) = oneshot::channel();
539            Ok((
540                Box::new(LazyFutureSourceStream {
541                    create: Arc::clone(&create),
542                    materializer: materializer
543                        .with_name_prefix(materializer.name_prefix().to_owned()),
544                    current: None,
545                    mat_sender: Some(sender),
546                    initialized: false,
547                    terminated: false,
548                    _marker: PhantomData,
549                }) as BoxStream<Out>,
550                StreamCompletion::from_receiver(receiver, None),
551            ))
552        })
553    }
554
555    #[must_use]
556    pub fn from_fn_iter<F, I>(factory: F) -> Self
557    where
558        F: Fn() -> I + Send + Sync + 'static,
559        I: IntoIterator<Item = Out>,
560        I::IntoIter: Send + 'static,
561    {
562        Self::from_factory(move || Box::new(factory().into_iter().map(Ok)))
563    }
564}
565
566impl<Out: Send + 'static, Mat: Send + 'static> Source<Out, Mat> {
567    fn from_materialized_factory_with_hints<F>(factory: F, hints: SourceHints) -> Self
568    where
569        F: Fn(&Materializer) -> StreamResult<(BoxStream<Out>, Mat)> + Send + Sync + 'static,
570    {
571        Self {
572            factory: Arc::new(FnSourceFactory(factory)),
573            terminal_factory: None,
574            hints,
575            attributes: Attributes::default(),
576            split_hook: None,
577        }
578    }
579
580    pub(crate) fn from_materialized_factory<F>(factory: F) -> Self
581    where
582        F: Fn(&Materializer) -> StreamResult<(BoxStream<Out>, Mat)> + Send + Sync + 'static,
583    {
584        Self::from_materialized_factory_with_hints(factory, SourceHints::default())
585    }
586
587    pub(crate) fn from_terminal_direct_materialized_factory<F, D>(
588        factory: F,
589        terminal_factory: D,
590    ) -> Self
591    where
592        F: Fn(&Materializer) -> StreamResult<(BoxStream<Out>, Mat)> + Send + Sync + 'static,
593        D: Fn(&Materializer) -> StreamResult<(Arc<dyn TerminalSourceHookDyn<Out>>, Mat)>
594            + Send
595            + Sync
596            + 'static,
597    {
598        Self {
599            factory: Arc::new(FnSourceFactory(factory)),
600            terminal_factory: Some(Arc::new(terminal_factory)),
601            hints: SourceHints::with_terminal_consumer_batch(),
602            attributes: Attributes::default(),
603            split_hook: None,
604        }
605    }
606
607    #[must_use]
608    pub fn as_source_with_context<Ctx, F>(
609        self,
610        extract_context: F,
611    ) -> SourceWithContext<Out, Ctx, Mat>
612    where
613        Ctx: Send + 'static,
614        F: Fn(&Out) -> Ctx + Send + Sync + 'static,
615    {
616        SourceWithContext::from_source(self.map(move |item| {
617            let context = extract_context(&item);
618            (item, context)
619        }))
620    }
621
622    #[must_use]
623    pub fn via<Next, FlowMat>(self, flow: Flow<Out, Next, FlowMat>) -> Source<Next, Mat>
624    where
625        Next: Send + 'static,
626        FlowMat: Send + 'static,
627    {
628        self.via_mat(flow, Keep::left)
629    }
630
631    #[must_use]
632    pub fn via_mat<Next, FlowMat, Combined, F>(
633        self,
634        flow: Flow<Out, Next, FlowMat>,
635        combine: F,
636    ) -> Source<Next, Combined>
637    where
638        Next: Send + 'static,
639        FlowMat: Send + 'static,
640        Combined: Send + 'static,
641        F: Fn(Mat, FlowMat) -> Combined + Send + Sync + 'static,
642    {
643        let source = self.factory;
644        let transform = flow.transform;
645        let materialize_flow = flow.materialize;
646        let hints = self.hints.after_flow(flow.hints);
647        let combine = Arc::new(combine);
648        Source::from_materialized_factory_with_hints(
649            move |materializer| {
650                let (stream, source_mat) = Arc::clone(&source).create(materializer)?;
651                let flow_mat = materialize_flow()?;
652                let stream = match &transform {
653                    FlowTransform::Pure(transform) => transform(stream),
654                    FlowTransform::Runtime(transform) => transform(stream, materializer)?,
655                };
656                Ok((stream, combine(source_mat, flow_mat)))
657            },
658            hints,
659        )
660    }
661
662    #[must_use]
663    pub fn via_mat_with<Next, FlowMat, Combined, F>(
664        self,
665        flow: Flow<Out, Next, FlowMat>,
666        combine: F,
667    ) -> Source<Next, Combined>
668    where
669        Next: Send + 'static,
670        FlowMat: Send + 'static,
671        Combined: Send + 'static,
672        F: Fn(Mat, FlowMat) -> Combined + Send + Sync + 'static,
673    {
674        self.via_mat(flow, combine)
675    }
676
677    #[must_use]
678    pub fn map<Next, F>(self, f: F) -> Source<Next, Mat>
679    where
680        Next: Send + 'static,
681        F: Fn(Out) -> Next + Send + Sync + 'static,
682    {
683        let hints = self.hints.without_inline_micro();
684        Source {
685            factory: Arc::new(MapSourceFactory {
686                source: self.factory,
687                stage: f,
688                _marker: PhantomData,
689            }),
690            terminal_factory: None,
691            hints,
692            attributes: self.attributes,
693            split_hook: None,
694        }
695    }
696
697    #[must_use]
698    pub fn attributes(&self) -> &Attributes {
699        &self.attributes
700    }
701
702    #[must_use]
703    pub fn with_attributes(mut self, attributes: Attributes) -> Self {
704        self.attributes = attributes;
705        self
706    }
707
708    #[must_use]
709    pub fn add_attributes(mut self, attributes: Attributes) -> Self {
710        self.attributes = self.attributes.and(attributes);
711        self
712    }
713
714    #[must_use]
715    pub fn named(self, name: impl Into<String>) -> Self {
716        self.add_attributes(Attributes::named(name))
717    }
718
719    /// Insert an async boundary after this source.
720    ///
721    /// The boundary uses the same [`crate::AsyncBoundaryExecutionConfig`] defaults
722    /// as the GraphDSL `AsyncBoundary` runner and hands elements across a bounded
723    /// Ractor-backed queue. `async_boundary` is the Rust-friendly primary name;
724    /// `Source::r#async` is provided as the Akka-mirroring alias.
725    #[must_use]
726    pub fn async_boundary(self) -> Self {
727        self.via(Flow::identity().async_boundary())
728    }
729
730    /// Akka-mirroring alias for [`Source::async_boundary`].
731    #[must_use]
732    pub fn r#async(self) -> Self {
733        self.async_boundary()
734    }
735
736    /// Insert an async boundary with an explicit bounded handoff configuration.
737    ///
738    /// `config.buffer_size` controls the number of elements that may be queued
739    /// between the upstream and downstream fused regions. A zero buffer is
740    /// rejected when the stream is materialized.
741    #[must_use]
742    pub fn async_boundary_with_config(
743        self,
744        config: crate::graph::AsyncBoundaryExecutionConfig,
745    ) -> Self {
746        self.via(Flow::identity().async_boundary_with_config(config))
747    }
748
749    /// Insert an async boundary with a custom bounded handoff size.
750    #[must_use]
751    pub fn async_boundary_with_buffer(self, buffer_size: usize) -> Self {
752        self.via(Flow::identity().async_boundary_with_buffer(buffer_size))
753    }
754
755    #[must_use]
756    pub fn map_result<Next, F>(self, f: F) -> Source<Next, Mat>
757    where
758        Next: Send + 'static,
759        F: Fn(Out) -> StreamResult<Next> + Send + Sync + 'static,
760    {
761        self.via(Flow::identity().map_result(f))
762    }
763
764    #[must_use]
765    pub fn map_result_with_supervision<Next, F>(
766        self,
767        f: F,
768        decider: SupervisionDecider,
769    ) -> Source<Next, Mat>
770    where
771        Next: Send + 'static,
772        F: Fn(Out) -> StreamResult<Next> + Send + Sync + 'static,
773    {
774        self.via(Flow::identity().map_result_with_supervision(f, decider))
775    }
776
777    #[must_use]
778    pub fn filter<F>(self, predicate: F) -> Source<Out, Mat>
779    where
780        F: Fn(&Out) -> bool + Send + Sync + 'static,
781    {
782        self.via(Flow::identity().filter(predicate))
783    }
784
785    #[must_use]
786    pub fn filter_result<F>(self, predicate: F) -> Source<Out, Mat>
787    where
788        F: Fn(&Out) -> StreamResult<bool> + Send + Sync + 'static,
789    {
790        self.via(Flow::identity().filter_result(predicate))
791    }
792
793    #[must_use]
794    pub fn filter_result_with_supervision<F>(
795        self,
796        predicate: F,
797        decider: SupervisionDecider,
798    ) -> Source<Out, Mat>
799    where
800        F: Fn(&Out) -> StreamResult<bool> + Send + Sync + 'static,
801    {
802        self.via(Flow::identity().filter_result_with_supervision(predicate, decider))
803    }
804
805    #[must_use]
806    pub fn filter_not<F>(self, predicate: F) -> Source<Out, Mat>
807    where
808        F: Fn(&Out) -> bool + Send + Sync + 'static,
809    {
810        self.via(Flow::identity().filter_not(predicate))
811    }
812
813    #[must_use]
814    pub fn filter_map<Next, F>(self, f: F) -> Source<Next, Mat>
815    where
816        Next: Send + 'static,
817        F: Fn(Out) -> Option<Next> + Send + Sync + 'static,
818    {
819        self.via(Flow::identity().filter_map(f))
820    }
821
822    #[must_use]
823    pub fn filter_map_result<Next, F>(self, f: F) -> Source<Next, Mat>
824    where
825        Next: Send + 'static,
826        F: Fn(Out) -> StreamResult<Option<Next>> + Send + Sync + 'static,
827    {
828        self.via(Flow::identity().filter_map_result(f))
829    }
830
831    #[must_use]
832    pub fn filter_map_result_with_supervision<Next, F>(
833        self,
834        f: F,
835        decider: SupervisionDecider,
836    ) -> Source<Next, Mat>
837    where
838        Next: Send + 'static,
839        F: Fn(Out) -> StreamResult<Option<Next>> + Send + Sync + 'static,
840    {
841        self.via(Flow::identity().filter_map_result_with_supervision(f, decider))
842    }
843
844    #[must_use]
845    pub fn map_concat<Next, F, I>(self, f: F) -> Source<Next, Mat>
846    where
847        Next: Send + 'static,
848        F: Fn(Out) -> I + Send + Sync + 'static,
849        I: IntoIterator<Item = Next>,
850        I::IntoIter: Send + 'static,
851    {
852        self.via(Flow::identity().map_concat(f))
853    }
854
855    #[must_use]
856    pub fn map_concat_result<Next, F, I>(self, f: F) -> Source<Next, Mat>
857    where
858        Next: Send + 'static,
859        F: Fn(Out) -> StreamResult<I> + Send + Sync + 'static,
860        I: IntoIterator<Item = Next>,
861        I::IntoIter: Send + 'static,
862    {
863        self.via(Flow::identity().map_concat_result(f))
864    }
865
866    #[must_use]
867    pub fn map_concat_result_with_supervision<Next, F, I>(
868        self,
869        f: F,
870        decider: SupervisionDecider,
871    ) -> Source<Next, Mat>
872    where
873        Next: Send + 'static,
874        F: Fn(Out) -> StreamResult<I> + Send + Sync + 'static,
875        I: IntoIterator<Item = Next>,
876        I::IntoIter: Send + 'static,
877    {
878        self.via(Flow::identity().map_concat_result_with_supervision(f, decider))
879    }
880
881    #[must_use]
882    pub fn stateful_map<State, Next, F>(self, seed: State, f: F) -> Source<Next, Mat>
883    where
884        State: Clone + Send + Sync + 'static,
885        Next: Send + 'static,
886        F: Fn(&mut State, Out) -> Next + Send + Sync + 'static,
887    {
888        self.via(Flow::identity().stateful_map(seed, f))
889    }
890
891    #[must_use]
892    pub fn stateful_map_result<State, Next, F>(self, seed: State, f: F) -> Source<Next, Mat>
893    where
894        State: Clone + Send + Sync + 'static,
895        Next: Send + 'static,
896        F: Fn(&mut State, Out) -> StreamResult<Next> + Send + Sync + 'static,
897    {
898        self.via(Flow::identity().stateful_map_result(seed, f))
899    }
900
901    #[must_use]
902    pub fn stateful_map_result_with_supervision<State, Next, F>(
903        self,
904        seed: State,
905        f: F,
906        decider: SupervisionDecider,
907    ) -> Source<Next, Mat>
908    where
909        State: Clone + Send + Sync + 'static,
910        Next: Send + 'static,
911        F: Fn(&mut State, Out) -> StreamResult<Next> + Send + Sync + 'static,
912    {
913        self.via(Flow::identity().stateful_map_result_with_supervision(seed, f, decider))
914    }
915
916    #[must_use]
917    pub fn stateful_map_concat<State, Next, F, I>(self, seed: State, f: F) -> Source<Next, Mat>
918    where
919        State: Clone + Send + Sync + 'static,
920        Next: Send + 'static,
921        F: Fn(&mut State, Out) -> I + Send + Sync + 'static,
922        I: IntoIterator<Item = Next>,
923        I::IntoIter: Send + 'static,
924    {
925        self.via(Flow::identity().stateful_map_concat(seed, f))
926    }
927
928    #[must_use]
929    pub fn stateful_map_concat_result<State, Next, F, I>(
930        self,
931        seed: State,
932        f: F,
933    ) -> Source<Next, Mat>
934    where
935        State: Clone + Send + Sync + 'static,
936        Next: Send + 'static,
937        F: Fn(&mut State, Out) -> StreamResult<I> + Send + Sync + 'static,
938        I: IntoIterator<Item = Next>,
939        I::IntoIter: Send + 'static,
940    {
941        self.via(Flow::identity().stateful_map_concat_result(seed, f))
942    }
943
944    #[must_use]
945    pub fn stateful_map_concat_result_with_supervision<State, Next, F, I>(
946        self,
947        seed: State,
948        f: F,
949        decider: SupervisionDecider,
950    ) -> Source<Next, Mat>
951    where
952        State: Clone + Send + Sync + 'static,
953        Next: Send + 'static,
954        F: Fn(&mut State, Out) -> StreamResult<I> + Send + Sync + 'static,
955        I: IntoIterator<Item = Next>,
956        I::IntoIter: Send + 'static,
957    {
958        self.via(Flow::identity().stateful_map_concat_result_with_supervision(seed, f, decider))
959    }
960
961    #[must_use]
962    pub fn map_async<Next, F, Fut>(self, parallelism: usize, f: F) -> Source<Next, Mat>
963    where
964        Next: Send + 'static,
965        F: Fn(Out) -> Fut + Send + Sync + 'static,
966        Fut: Future<Output = StreamResult<Next>> + Send + 'static,
967    {
968        self.via(Flow::identity().map_async(parallelism, f))
969    }
970
971    #[must_use]
972    pub fn map_async_with_supervision<Next, F, Fut>(
973        self,
974        parallelism: usize,
975        f: F,
976        decider: SupervisionDecider,
977    ) -> Source<Next, Mat>
978    where
979        Next: Send + 'static,
980        F: Fn(Out) -> Fut + Send + Sync + 'static,
981        Fut: Future<Output = StreamResult<Next>> + Send + 'static,
982    {
983        self.via(Flow::identity().map_async_with_supervision(parallelism, f, decider))
984    }
985
986    #[must_use]
987    pub fn map_async_unordered<Next, F, Fut>(self, parallelism: usize, f: F) -> Source<Next, Mat>
988    where
989        Next: Send + 'static,
990        F: Fn(Out) -> Fut + Send + Sync + 'static,
991        Fut: Future<Output = StreamResult<Next>> + Send + 'static,
992    {
993        self.via(Flow::identity().map_async_unordered(parallelism, f))
994    }
995
996    #[must_use]
997    pub fn map_async_unordered_with_supervision<Next, F, Fut>(
998        self,
999        parallelism: usize,
1000        f: F,
1001        decider: SupervisionDecider,
1002    ) -> Source<Next, Mat>
1003    where
1004        Next: Send + 'static,
1005        F: Fn(Out) -> Fut + Send + Sync + 'static,
1006        Fut: Future<Output = StreamResult<Next>> + Send + 'static,
1007    {
1008        self.via(Flow::identity().map_async_unordered_with_supervision(parallelism, f, decider))
1009    }
1010
1011    #[must_use]
1012    pub fn map_async_partitioned<Key, Next, Partition, F, Fut>(
1013        self,
1014        parallelism: usize,
1015        per_partition: usize,
1016        partition: Partition,
1017        f: F,
1018    ) -> Source<Next, Mat>
1019    where
1020        Key: Clone + Eq + Hash + Send + 'static,
1021        Next: Send + 'static,
1022        Partition: Fn(&Out) -> Key + Send + Sync + 'static,
1023        F: Fn(Out) -> Fut + Send + Sync + 'static,
1024        Fut: Future<Output = StreamResult<Next>> + Send + 'static,
1025    {
1026        self.via(Flow::identity().map_async_partitioned(parallelism, per_partition, partition, f))
1027    }
1028
1029    #[must_use]
1030    pub fn prefix_and_tail(self, n: usize) -> Source<(Vec<Out>, Source<Out>), Mat> {
1031        self.via(Flow::identity().prefix_and_tail(n))
1032    }
1033
1034    #[must_use]
1035    pub fn flat_map_prefix<Next, FlowMat, F>(self, n: usize, f: F) -> Source<Next, Mat>
1036    where
1037        Next: Send + 'static,
1038        FlowMat: Send + 'static,
1039        F: Fn(Vec<Out>) -> Flow<Out, Next, FlowMat> + Send + Sync + 'static,
1040        Out: Clone,
1041    {
1042        self.via(Flow::identity().flat_map_prefix(n, f))
1043    }
1044
1045    #[must_use]
1046    pub fn group_by<Key, F>(
1047        self,
1048        max_substreams: usize,
1049        f: F,
1050        allow_closed_substream_recreation: bool,
1051    ) -> Source<Source<Out>, Mat>
1052    where
1053        Key: Clone + Eq + Hash + Send + 'static,
1054        F: Fn(&Out) -> Key + Send + Sync + 'static,
1055        Out: Clone,
1056    {
1057        let batch_mode = if self.hints.inline_micro.is_some() && !allow_closed_substream_recreation
1058        {
1059            flow::GroupByBatchMode::FiniteEagerNoRecreate
1060        } else {
1061            flow::GroupByBatchMode::Immediate
1062        };
1063        self.via(Flow::identity().group_by_with_batching(
1064            max_substreams,
1065            f,
1066            allow_closed_substream_recreation,
1067            batch_mode,
1068        ))
1069    }
1070
1071    #[must_use]
1072    pub fn split_when<F>(self, predicate: F) -> Source<Source<Out>, Mat>
1073    where
1074        F: Fn(&Out) -> bool + Send + Sync + 'static,
1075        Out: Clone,
1076    {
1077        self.via(Flow::identity().split_when(predicate))
1078    }
1079
1080    #[must_use]
1081    pub fn split_after<F>(self, predicate: F) -> Source<Source<Out>, Mat>
1082    where
1083        F: Fn(&Out) -> bool + Send + Sync + 'static,
1084        Out: Clone,
1085    {
1086        self.via(Flow::identity().split_after(predicate))
1087    }
1088
1089    #[must_use]
1090    pub fn flat_map_concat<Next, NextMat, F>(self, f: F) -> Source<Next, Mat>
1091    where
1092        Next: Send + 'static,
1093        NextMat: Send + 'static,
1094        F: Fn(Out) -> Source<Next, NextMat> + Send + Sync + 'static,
1095    {
1096        self.via(Flow::identity().flat_map_concat(f))
1097    }
1098
1099    #[must_use]
1100    pub fn flat_map_merge<Next, NextMat, F>(self, breadth: usize, f: F) -> Source<Next, Mat>
1101    where
1102        Next: Send + 'static,
1103        NextMat: Send + 'static,
1104        F: Fn(Out) -> Source<Next, NextMat> + Send + Sync + 'static,
1105    {
1106        self.via(Flow::identity().flat_map_merge(breadth, f))
1107    }
1108
1109    #[must_use]
1110    pub fn take(self, n: usize) -> Source<Out, Mat> {
1111        self.via(Flow::identity().take(n))
1112    }
1113
1114    #[must_use]
1115    pub fn drop(self, n: usize) -> Source<Out, Mat> {
1116        self.via(Flow::identity().drop(n))
1117    }
1118
1119    #[must_use]
1120    pub fn take_while<F>(self, predicate: F) -> Source<Out, Mat>
1121    where
1122        F: Fn(&Out) -> bool + Send + Sync + 'static,
1123    {
1124        self.via(Flow::identity().take_while(predicate))
1125    }
1126
1127    #[must_use]
1128    pub fn drop_while<F>(self, predicate: F) -> Source<Out, Mat>
1129    where
1130        F: Fn(&Out) -> bool + Send + Sync + 'static,
1131    {
1132        self.via(Flow::identity().drop_while(predicate))
1133    }
1134
1135    #[must_use]
1136    pub fn limit(self, max: u64) -> Source<Out, Mat> {
1137        self.via(Flow::identity().limit(max))
1138    }
1139
1140    #[must_use]
1141    pub fn grouped(self, size: usize) -> Source<Vec<Out>, Mat> {
1142        self.via(Flow::identity().grouped(size))
1143    }
1144
1145    #[must_use]
1146    pub fn scan<State, F>(self, seed: State, f: F) -> Source<State, Mat>
1147    where
1148        State: Clone + Send + Sync + 'static,
1149        F: Fn(State, Out) -> State + Send + Sync + 'static,
1150    {
1151        self.via(Flow::identity().scan(seed, f))
1152    }
1153
1154    #[must_use]
1155    pub fn scan_async<State, F, Fut>(self, seed: State, f: F) -> Source<State, Mat>
1156    where
1157        State: Clone + Send + Sync + 'static,
1158        F: Fn(State, Out) -> Fut + Send + Sync + 'static,
1159        Fut: Future<Output = StreamResult<State>> + Send + 'static,
1160    {
1161        self.via(Flow::identity().scan_async(seed, f))
1162    }
1163
1164    #[must_use]
1165    pub fn scan_result<State, F>(self, seed: State, f: F) -> Source<State, Mat>
1166    where
1167        State: Clone + Send + Sync + 'static,
1168        F: Fn(State, Out) -> StreamResult<State> + Send + Sync + 'static,
1169    {
1170        self.via(Flow::identity().scan_result(seed, f))
1171    }
1172
1173    #[must_use]
1174    pub fn scan_result_with_supervision<State, F>(
1175        self,
1176        seed: State,
1177        f: F,
1178        decider: SupervisionDecider,
1179    ) -> Source<State, Mat>
1180    where
1181        State: Clone + Send + Sync + 'static,
1182        F: Fn(State, Out) -> StreamResult<State> + Send + Sync + 'static,
1183    {
1184        self.via(Flow::identity().scan_result_with_supervision(seed, f, decider))
1185    }
1186
1187    #[must_use]
1188    pub fn sliding(self, size: usize, step: usize) -> Source<Vec<Out>, Mat>
1189    where
1190        Out: Clone,
1191    {
1192        self.via(Flow::identity().sliding(size, step))
1193    }
1194
1195    #[must_use]
1196    pub fn fold<Acc, F>(self, zero: Acc, f: F) -> Source<Acc, Mat>
1197    where
1198        Acc: Clone + Send + Sync + 'static,
1199        F: Fn(Acc, Out) -> Acc + Send + Sync + 'static,
1200    {
1201        self.via(Flow::identity().fold(zero, f))
1202    }
1203
1204    #[must_use]
1205    pub fn fold_async<Acc, F, Fut>(self, zero: Acc, f: F) -> Source<Acc, Mat>
1206    where
1207        Acc: Clone + Send + Sync + 'static,
1208        F: Fn(Acc, Out) -> Fut + Send + Sync + 'static,
1209        Fut: Future<Output = StreamResult<Acc>> + Send + 'static,
1210    {
1211        self.via(Flow::identity().fold_async(zero, f))
1212    }
1213
1214    #[must_use]
1215    pub fn map_with_resource<Resource, Next, Create, F, Close>(
1216        self,
1217        create: Create,
1218        f: F,
1219        close: Close,
1220    ) -> Source<Next, Mat>
1221    where
1222        Resource: Send + 'static,
1223        Next: Send + 'static,
1224        Create: Fn() -> StreamResult<Resource> + Send + Sync + 'static,
1225        F: Fn(&mut Resource, Out) -> StreamResult<Next> + Send + Sync + 'static,
1226        Close: Fn(Resource) -> StreamResult<Option<Next>> + Send + Sync + 'static,
1227    {
1228        self.via(Flow::identity().map_with_resource(create, f, close))
1229    }
1230
1231    #[must_use]
1232    pub fn fold_result<Acc, F>(self, zero: Acc, f: F) -> Source<Acc, Mat>
1233    where
1234        Acc: Clone + Send + Sync + 'static,
1235        F: Fn(Acc, Out) -> StreamResult<Acc> + Send + Sync + 'static,
1236    {
1237        self.via(Flow::identity().fold_result(zero, f))
1238    }
1239
1240    #[must_use]
1241    pub fn fold_result_with_supervision<Acc, F>(
1242        self,
1243        zero: Acc,
1244        f: F,
1245        decider: SupervisionDecider,
1246    ) -> Source<Acc, Mat>
1247    where
1248        Acc: Clone + Send + Sync + 'static,
1249        F: Fn(Acc, Out) -> StreamResult<Acc> + Send + Sync + 'static,
1250    {
1251        self.via(Flow::identity().fold_result_with_supervision(zero, f, decider))
1252    }
1253
1254    #[must_use]
1255    pub fn reduce<F>(self, f: F) -> Source<Out, Mat>
1256    where
1257        F: Fn(Out, Out) -> Out + Send + Sync + 'static,
1258    {
1259        self.via(Flow::identity().reduce(f))
1260    }
1261
1262    #[must_use]
1263    pub fn reduce_result<F>(self, f: F) -> Source<Out, Mat>
1264    where
1265        Out: Clone,
1266        F: Fn(Out, Out) -> StreamResult<Out> + Send + Sync + 'static,
1267    {
1268        self.via(Flow::identity().reduce_result(f))
1269    }
1270
1271    #[must_use]
1272    pub fn reduce_result_with_supervision<F>(
1273        self,
1274        f: F,
1275        decider: SupervisionDecider,
1276    ) -> Source<Out, Mat>
1277    where
1278        Out: Clone,
1279        F: Fn(Out, Out) -> StreamResult<Out> + Send + Sync + 'static,
1280    {
1281        self.via(Flow::identity().reduce_result_with_supervision(f, decider))
1282    }
1283
1284    #[must_use]
1285    pub fn map_error<F>(self, f: F) -> Source<Out, Mat>
1286    where
1287        F: Fn(StreamError) -> StreamError + Send + Sync + 'static,
1288    {
1289        self.via(Flow::identity().map_error(f))
1290    }
1291
1292    #[must_use]
1293    pub fn recover<F>(self, f: F) -> Source<Out, Mat>
1294    where
1295        F: Fn(StreamError) -> Option<Out> + Send + Sync + 'static,
1296    {
1297        self.via(Flow::identity().recover(f))
1298    }
1299
1300    #[must_use]
1301    pub fn recover_with<F>(self, f: F) -> Source<Out, Mat>
1302    where
1303        F: Fn(StreamError) -> Option<Source<Out>> + Send + Sync + 'static,
1304    {
1305        self.via(Flow::identity().recover_with(f))
1306    }
1307
1308    #[must_use]
1309    pub fn recover_with_retries<F>(self, retries: usize, f: F) -> Source<Out, Mat>
1310    where
1311        F: Fn(StreamError) -> Option<Source<Out>> + Send + Sync + 'static,
1312    {
1313        self.via(Flow::identity().recover_with_retries(retries, f))
1314    }
1315
1316    #[must_use]
1317    pub fn on_error_complete(self) -> Source<Out, Mat> {
1318        self.via(Flow::identity().on_error_complete())
1319    }
1320
1321    #[must_use]
1322    pub fn concat<Mat2>(self, that: Source<Out, Mat2>) -> Source<Out, Mat>
1323    where
1324        Mat2: Send + 'static,
1325    {
1326        let factory = self.factory;
1327        let hints = self.hints;
1328        let that_factory = that.factory;
1329        Source::from_materialized_factory_with_hints(
1330            move |materializer| {
1331                let (primary, mat) = Arc::clone(&factory).create(materializer)?;
1332                let secondary = match Arc::clone(&that_factory).create(materializer) {
1333                    Ok((stream, _)) => stream,
1334                    Err(error) => Box::new(std::iter::once(Err(error))) as BoxStream<Out>,
1335                };
1336                Ok((concat_source_streams(vec![primary, secondary]), mat))
1337            },
1338            hints,
1339        )
1340    }
1341
1342    #[must_use]
1343    pub fn concat_lazy<Mat2>(self, that: Source<Out, Mat2>) -> Source<Out, Mat>
1344    where
1345        Mat2: Send + 'static,
1346    {
1347        let factory = self.factory;
1348        let hints = self.hints;
1349        let that_factory = that.factory;
1350        Source::from_materialized_factory_with_hints(
1351            move |materializer| {
1352                let (primary, mat) = Arc::clone(&factory).create(materializer)?;
1353                Ok((
1354                    concat_source_streams_lazy(
1355                        primary,
1356                        vec![Arc::clone(&that_factory)],
1357                        materializer,
1358                    ),
1359                    mat,
1360                ))
1361            },
1362            hints,
1363        )
1364    }
1365
1366    #[must_use]
1367    pub fn concat_all_lazy<Mat2, I>(self, those: I) -> Source<Out, Mat>
1368    where
1369        Mat2: Send + 'static,
1370        I: IntoIterator<Item = Source<Out, Mat2>>,
1371    {
1372        let factory = self.factory;
1373        let hints = self.hints;
1374        let other_factories: Vec<_> = those.into_iter().map(|source| source.factory).collect();
1375        Source::from_materialized_factory_with_hints(
1376            move |materializer| {
1377                let (primary, mat) = Arc::clone(&factory).create(materializer)?;
1378                Ok((
1379                    concat_source_streams_lazy(primary, other_factories.clone(), materializer),
1380                    mat,
1381                ))
1382            },
1383            hints,
1384        )
1385    }
1386
1387    #[must_use]
1388    pub fn prepend<Mat2>(self, that: Source<Out, Mat2>) -> Source<Out, Mat>
1389    where
1390        Mat2: Send + 'static,
1391    {
1392        let factory = self.factory;
1393        let hints = self.hints;
1394        let that_factory = that.factory;
1395        Source::from_materialized_factory_with_hints(
1396            move |materializer| {
1397                let (primary, mat) = Arc::clone(&factory).create(materializer)?;
1398                let secondary = match Arc::clone(&that_factory).create(materializer) {
1399                    Ok((stream, _)) => stream,
1400                    Err(error) => Box::new(std::iter::once(Err(error))) as BoxStream<Out>,
1401                };
1402                Ok((concat_source_streams(vec![secondary, primary]), mat))
1403            },
1404            hints,
1405        )
1406    }
1407
1408    #[must_use]
1409    pub fn prepend_lazy<Mat2>(self, that: Source<Out, Mat2>) -> Source<Out, Mat>
1410    where
1411        Mat2: Send + 'static,
1412    {
1413        self.prepend(that)
1414    }
1415
1416    #[must_use]
1417    pub fn or_else<Mat2>(self, secondary: Source<Out, Mat2>) -> Source<Out, Mat>
1418    where
1419        Mat2: Send + 'static,
1420    {
1421        let factory = self.factory;
1422        let hints = self.hints;
1423        let secondary_factory = secondary.factory;
1424        Source::from_materialized_factory_with_hints(
1425            move |materializer| {
1426                let (primary, mat) = Arc::clone(&factory).create(materializer)?;
1427                let secondary = match Arc::clone(&secondary_factory).create(materializer) {
1428                    Ok((stream, _)) => stream,
1429                    Err(error) => Box::new(std::iter::once(Err(error))) as BoxStream<Out>,
1430                };
1431                Ok((or_else_source_stream(primary, secondary), mat))
1432            },
1433            hints,
1434        )
1435    }
1436
1437    #[must_use]
1438    pub fn interleave<Mat2>(self, that: Source<Out, Mat2>, segment_size: usize) -> Source<Out, Mat>
1439    where
1440        Mat2: Send + 'static,
1441    {
1442        self.interleave_all([that], segment_size, false)
1443    }
1444
1445    #[must_use]
1446    pub fn interleave_all<Mat2, I>(
1447        self,
1448        those: I,
1449        segment_size: usize,
1450        eager_close: bool,
1451    ) -> Source<Out, Mat>
1452    where
1453        Mat2: Send + 'static,
1454        I: IntoIterator<Item = Source<Out, Mat2>>,
1455    {
1456        let factory = self.factory;
1457        let hints = self.hints;
1458        let other_factories: Vec<_> = those.into_iter().map(|source| source.factory).collect();
1459        Source::from_materialized_factory_with_hints(
1460            move |materializer| {
1461                let (primary, mat) = Arc::clone(&factory).create(materializer)?;
1462                let mut streams = Vec::with_capacity(other_factories.len() + 1);
1463                streams.push(primary);
1464                for other in &other_factories {
1465                    let stream = match Arc::clone(other).create(materializer) {
1466                        Ok((stream, _)) => stream,
1467                        Err(error) => Box::new(std::iter::once(Err(error))) as BoxStream<Out>,
1468                    };
1469                    streams.push(stream);
1470                }
1471                Ok((
1472                    interleave_source_streams(streams, segment_size, eager_close),
1473                    mat,
1474                ))
1475            },
1476            hints,
1477        )
1478    }
1479
1480    #[must_use]
1481    pub fn merge_sorted<Mat2>(self, that: Source<Out, Mat2>) -> Source<Out, Mat>
1482    where
1483        Out: Ord,
1484        Mat2: Send + 'static,
1485    {
1486        self.via(Flow::identity().merge_sorted(that))
1487    }
1488
1489    #[must_use]
1490    pub fn merge_latest<Mat2>(
1491        self,
1492        that: Source<Out, Mat2>,
1493        eager_complete: bool,
1494    ) -> Source<Vec<Out>, Mat>
1495    where
1496        Out: Clone,
1497        Mat2: Send + 'static,
1498    {
1499        let factory = self.factory;
1500        let hints = self.hints;
1501        let that_factory = that.factory;
1502        Source::from_materialized_factory_with_hints(
1503            move |materializer| {
1504                let (left, mat) = Arc::clone(&factory).create(materializer)?;
1505                let right = match Arc::clone(&that_factory).create(materializer) {
1506                    Ok((stream, _)) => stream,
1507                    Err(error) => {
1508                        return Ok((
1509                            Box::new(std::iter::once(Err(error))) as BoxStream<Vec<Out>>,
1510                            mat,
1511                        ));
1512                    }
1513                };
1514                Ok((merge_latest_streams(vec![left, right], eager_complete), mat))
1515            },
1516            hints,
1517        )
1518    }
1519
1520    #[must_use]
1521    pub fn merge_all<Mat2, I>(self, those: I, eager_complete: bool) -> Source<Out, Mat>
1522    where
1523        Mat2: Send + 'static,
1524        I: IntoIterator<Item = Source<Out, Mat2>>,
1525    {
1526        let factory = self.factory;
1527        let hints = self.hints;
1528        let other_factories: Vec<_> = those.into_iter().map(|source| source.factory).collect();
1529        Source::from_materialized_factory_with_hints(
1530            move |materializer| {
1531                let (primary, mat) = Arc::clone(&factory).create(materializer)?;
1532                let mut streams = Vec::with_capacity(other_factories.len() + 1);
1533                streams.push(primary);
1534                for other in &other_factories {
1535                    let stream = match Arc::clone(other).create(materializer) {
1536                        Ok((stream, _)) => stream,
1537                        Err(error) => {
1538                            return Ok((
1539                                Box::new(std::iter::once(Err(error))) as BoxStream<Out>,
1540                                mat,
1541                            ));
1542                        }
1543                    };
1544                    streams.push(stream);
1545                }
1546                Ok((merge_streams(streams, eager_complete), mat))
1547            },
1548            hints,
1549        )
1550    }
1551
1552    #[must_use]
1553    pub fn zip_with<Mat2, Out2, Next, F>(
1554        self,
1555        that: Source<Out2, Mat2>,
1556        combine: F,
1557    ) -> Source<Next, Mat>
1558    where
1559        Out2: Send + 'static,
1560        Next: Send + 'static,
1561        Mat2: Send + 'static,
1562        F: Fn(Out, Out2) -> Next + Send + Sync + 'static,
1563    {
1564        self.via(Flow::identity().zip_with(that, combine))
1565    }
1566
1567    #[must_use]
1568    pub fn zip_latest<Mat2, Out2>(self, that: Source<Out2, Mat2>) -> Source<(Out, Out2), Mat>
1569    where
1570        Out: Clone,
1571        Out2: Clone + Send + 'static,
1572        Mat2: Send + 'static,
1573    {
1574        self.zip_latest_with(that, true, |left, right| (left, right))
1575    }
1576
1577    #[must_use]
1578    pub fn zip_latest_with<Mat2, Out2, Next, F>(
1579        self,
1580        that: Source<Out2, Mat2>,
1581        eager_complete: bool,
1582        combine: F,
1583    ) -> Source<Next, Mat>
1584    where
1585        Out: Clone,
1586        Out2: Clone + Send + 'static,
1587        Next: Send + 'static,
1588        Mat2: Send + 'static,
1589        F: Fn(Out, Out2) -> Next + Send + Sync + 'static,
1590    {
1591        let factory = self.factory;
1592        let hints = self.hints;
1593        let that_factory = that.factory;
1594        let combine = Arc::new(combine);
1595        Source::from_materialized_factory_with_hints(
1596            move |materializer| {
1597                let (left, mat) = Arc::clone(&factory).create(materializer)?;
1598                let right = match Arc::clone(&that_factory).create(materializer) {
1599                    Ok((stream, _)) => stream,
1600                    Err(error) => {
1601                        return Ok((
1602                            Box::new(std::iter::once(Err(error))) as BoxStream<Next>,
1603                            mat,
1604                        ));
1605                    }
1606                };
1607                Ok((
1608                    zip_latest_with_stream(left, right, eager_complete, Arc::clone(&combine)),
1609                    mat,
1610                ))
1611            },
1612            hints,
1613        )
1614    }
1615
1616    #[must_use]
1617    pub fn zip_with_index(self) -> Source<(Out, u64), Mat> {
1618        let factory = self.factory;
1619        let hints = self.hints;
1620        Source::from_materialized_factory_with_hints(
1621            move |materializer| {
1622                let (mut stream, mat) = Arc::clone(&factory).create(materializer)?;
1623                let mut index = 0_u64;
1624                Ok((
1625                    Box::new(std::iter::from_fn(move || {
1626                        stream.next().map(|item| {
1627                            item.map(|value| {
1628                                let pair = (value, index);
1629                                index = index.wrapping_add(1);
1630                                pair
1631                            })
1632                        })
1633                    })) as BoxStream<(Out, u64)>,
1634                    mat,
1635                ))
1636            },
1637            hints,
1638        )
1639    }
1640
1641    #[must_use]
1642    pub fn zip_all<Mat2, Out2>(
1643        self,
1644        that: Source<Out2, Mat2>,
1645        this_elem: Out,
1646        that_elem: Out2,
1647    ) -> Source<(Out, Out2), Mat>
1648    where
1649        Out: Clone + Sync,
1650        Out2: Clone + Send + Sync + 'static,
1651        Mat2: Send + 'static,
1652    {
1653        let factory = self.factory;
1654        let hints = self.hints;
1655        let that_factory = that.factory;
1656        Source::from_materialized_factory_with_hints(
1657            move |materializer| {
1658                let (left, mat) = Arc::clone(&factory).create(materializer)?;
1659                let right = match Arc::clone(&that_factory).create(materializer) {
1660                    Ok((stream, _)) => stream,
1661                    Err(error) => {
1662                        return Ok((
1663                            Box::new(std::iter::once(Err(error))) as BoxStream<(Out, Out2)>,
1664                            mat,
1665                        ));
1666                    }
1667                };
1668                Ok((
1669                    zip_all_stream(left, right, this_elem.clone(), that_elem.clone()),
1670                    mat,
1671                ))
1672            },
1673            hints,
1674        )
1675    }
1676
1677    #[must_use]
1678    pub fn also_to<SinkMat>(self, sink: Sink<Out, SinkMat>) -> Source<Out, Mat>
1679    where
1680        Out: Clone,
1681        SinkMat: Send + 'static,
1682    {
1683        self.via(Flow::identity().also_to(sink))
1684    }
1685
1686    #[must_use]
1687    pub fn also_to_all<SinkMat, I>(self, sinks: I) -> Source<Out, Mat>
1688    where
1689        Out: Clone,
1690        SinkMat: Send + 'static,
1691        I: IntoIterator<Item = Sink<Out, SinkMat>>,
1692    {
1693        self.via(Flow::identity().also_to_all(sinks))
1694    }
1695
1696    #[must_use]
1697    pub fn divert_to<SinkMat, F>(self, sink: Sink<Out, SinkMat>, predicate: F) -> Source<Out, Mat>
1698    where
1699        SinkMat: Send + 'static,
1700        F: Fn(&Out) -> bool + Send + Sync + 'static,
1701    {
1702        self.via(Flow::identity().divert_to(sink, predicate))
1703    }
1704
1705    #[must_use]
1706    pub fn wire_tap<SinkMat>(self, sink: Sink<Out, SinkMat>) -> Source<Out, Mat>
1707    where
1708        Out: Clone,
1709        SinkMat: Send + 'static,
1710    {
1711        self.via(Flow::identity().wire_tap(sink))
1712    }
1713
1714    pub fn run_with<SinkMat: Send + 'static>(
1715        self,
1716        sink: Sink<Out, SinkMat>,
1717    ) -> StreamResult<SinkMat> {
1718        // Split-segment fast path: if source has a segment hook and sink has a
1719        // fold fast-path descriptor, drive the fold inline without spawning a task.
1720        let fast_result = self
1721            .split_hook
1722            .as_ref()
1723            .zip(sink.fold_fp.as_deref())
1724            .and_then(|(hook, fp)| fp.try_register(Arc::clone(hook)));
1725        if let Some(result) = fast_result {
1726            return result?.downcast::<SinkMat>().map(|b| *b).map_err(|_| {
1727                StreamError::Failed("split fast path: unexpected mat type (internal error)".into())
1728            });
1729        }
1730        self.to_mat(sink, Keep::right).run()
1731    }
1732
1733    pub fn run_with_materializer<SinkMat: Send + 'static>(
1734        self,
1735        sink: Sink<Out, SinkMat>,
1736        materializer: &Materializer,
1737    ) -> StreamResult<SinkMat> {
1738        self.to_mat(sink, Keep::right)
1739            .run_with_materializer(materializer)
1740    }
1741
1742    #[must_use]
1743    pub fn to<SinkMat>(self, sink: Sink<Out, SinkMat>) -> RunnableGraph<Mat>
1744    where
1745        SinkMat: Send + 'static,
1746    {
1747        self.to_mat(sink, Keep::left)
1748    }
1749
1750    #[must_use]
1751    pub fn to_mat<SinkMat, Combined, F>(
1752        self,
1753        sink: Sink<Out, SinkMat>,
1754        combine: F,
1755    ) -> RunnableGraph<Combined>
1756    where
1757        SinkMat: Send + 'static,
1758        Combined: Send + 'static,
1759        F: Fn(Mat, SinkMat) -> Combined + Send + Sync + 'static,
1760    {
1761        let factory = self.factory;
1762        let terminal_factory = self.terminal_factory;
1763        let hints = self.hints;
1764        let combine = Arc::new(combine);
1765        RunnableGraph::from_runner(move |materializer| {
1766            if let Some(terminal_factory) = &terminal_factory
1767                && let Some(fold_fp) = sink.fold_fp.as_deref()
1768            {
1769                let (hook, source_mat) = terminal_factory(materializer)?;
1770                if hook.supports_direct_terminal()
1771                    && let Some(sink_mat) =
1772                        fold_fp.try_register_direct_terminal(Arc::clone(&hook), materializer)
1773                {
1774                    let sink_mat = sink_mat.and_then(|mat| {
1775                        mat.downcast::<SinkMat>().map(|boxed| *boxed).map_err(|_| {
1776                            StreamError::Failed(
1777                                "terminal fast path: unexpected mat type (internal error)".into(),
1778                            )
1779                        })
1780                    })?;
1781                    return Ok(combine(source_mat, sink_mat));
1782                }
1783                if !fold_fp.supports_terminal_drain() {
1784                    let (stream, source_mat) = Arc::clone(&factory).create(materializer)?;
1785                    let sink_mat = sink.run_from_source(stream, materializer, hints.runtime())?;
1786                    return Ok(combine(source_mat, sink_mat));
1787                }
1788                let sink_mat = fold_fp
1789                    .try_register_terminal_drain(hook, materializer)
1790                    .expect("terminal drain support advertised")
1791                    .and_then(|mat| {
1792                        mat.downcast::<SinkMat>().map(|boxed| *boxed).map_err(|_| {
1793                            StreamError::Failed(
1794                                "terminal fast path: unexpected mat type (internal error)".into(),
1795                            )
1796                        })
1797                    })?;
1798                return Ok(combine(source_mat, sink_mat));
1799            }
1800            let (stream, source_mat) = Arc::clone(&factory).create(materializer)?;
1801            let sink_mat = if hints.inline_head_terminal && sink.can_inline() {
1802                let stream =
1803                    runtime_checked_stream(stream, Arc::clone(&materializer.inner.state), None);
1804                sink.run_inline(stream, materializer)?
1805            } else {
1806                sink.run_from_source(stream, materializer, hints.runtime())?
1807            };
1808            Ok(combine(source_mat, sink_mat))
1809        })
1810    }
1811
1812    pub fn run_collect(self) -> StreamResult<Vec<Out>> {
1813        self.run_with(Sink::collect())?.wait()
1814    }
1815
1816    #[must_use]
1817    pub fn map_materialized_value<NextMat, F>(self, f: F) -> Source<Out, NextMat>
1818    where
1819        NextMat: Send + 'static,
1820        F: Fn(Mat) -> NextMat + Send + Sync + 'static,
1821    {
1822        let factory = self.factory;
1823        let terminal_factory = self.terminal_factory;
1824        let hints = self.hints;
1825        let f = Arc::new(f);
1826        let factory_f = Arc::clone(&f);
1827        let mapped_terminal_factory = terminal_factory.map(|terminal_factory| {
1828            let f = Arc::clone(&f);
1829            Arc::new(move |materializer: &Materializer| {
1830                let (hook, mat) = terminal_factory(materializer)?;
1831                Ok((hook, f(mat)))
1832            }) as Arc<TerminalSourceFactory<Out, NextMat>>
1833        });
1834        Source {
1835            factory: Arc::new(FnSourceFactory(move |materializer: &Materializer| {
1836                let (stream, mat) = Arc::clone(&factory).create(materializer)?;
1837                Ok((stream, factory_f(mat)))
1838            })),
1839            terminal_factory: mapped_terminal_factory,
1840            hints,
1841            attributes: Attributes::default(),
1842            split_hook: None,
1843        }
1844    }
1845}
1846
1847impl<Out: Clone + Send + Sync + 'static> Source<Out, NotUsed> {
1848    #[must_use]
1849    pub fn combine<Mat1, Mat2, MatRest, I>(
1850        first: Source<Out, Mat1>,
1851        second: Source<Out, Mat2>,
1852        rest: I,
1853        strategy: SourceCombineStrategy,
1854    ) -> Source<Out, NotUsed>
1855    where
1856        Mat1: Send + 'static,
1857        Mat2: Send + 'static,
1858        MatRest: Send + 'static,
1859        I: IntoIterator<Item = Source<Out, MatRest>>,
1860    {
1861        let mut factories: Vec<Arc<CombinedSourceFactory<Out>>> = vec![
1862            Arc::new(move |materializer| {
1863                Arc::clone(&first.factory)
1864                    .create(materializer)
1865                    .map(|(stream, _)| stream)
1866            }),
1867            Arc::new(move |materializer| {
1868                Arc::clone(&second.factory)
1869                    .create(materializer)
1870                    .map(|(stream, _)| stream)
1871            }),
1872        ];
1873        factories.extend(rest.into_iter().map(|source| {
1874            Arc::new(move |materializer: &Materializer| {
1875                Arc::clone(&source.factory)
1876                    .create(materializer)
1877                    .map(|(stream, _)| stream)
1878            }) as Arc<CombinedSourceFactory<Out>>
1879        }));
1880        Source::from_materialized_factory(move |materializer| {
1881            let mut streams = Vec::with_capacity(factories.len());
1882            for factory in &factories {
1883                let stream = match factory(materializer) {
1884                    Ok(stream) => stream,
1885                    Err(error) => {
1886                        return Ok((
1887                            Box::new(std::iter::once(Err(error))) as BoxStream<Out>,
1888                            NotUsed,
1889                        ));
1890                    }
1891                };
1892                streams.push(stream);
1893            }
1894            let stream = match &strategy {
1895                SourceCombineStrategy::Merge { eager_complete } => {
1896                    merge_streams(streams, *eager_complete)
1897                }
1898                SourceCombineStrategy::Concat => concat_source_streams(streams),
1899                SourceCombineStrategy::Prioritized {
1900                    priorities,
1901                    eager_complete,
1902                } => {
1903                    if priorities.len() != streams.len() {
1904                        return Err(StreamError::GraphValidation(format!(
1905                            "combine priorities length {} did not match source count {}",
1906                            priorities.len(),
1907                            streams.len()
1908                        )));
1909                    }
1910                    merge_prioritized_streams(streams, priorities.clone(), *eager_complete)
1911                }
1912            };
1913            Ok((stream, NotUsed))
1914        })
1915    }
1916
1917    #[must_use]
1918    pub fn zip_n<Mat2, I>(sources: I) -> Source<Vec<Out>, NotUsed>
1919    where
1920        I: IntoIterator<Item = Source<Out, Mat2>>,
1921        Mat2: Send + 'static,
1922        Out: Clone,
1923    {
1924        Self::zip_with_n(sources, |values| values)
1925    }
1926
1927    #[must_use]
1928    pub fn zip_with_n<Mat2, I, Next, F>(sources: I, zipper: F) -> Source<Next, NotUsed>
1929    where
1930        I: IntoIterator<Item = Source<Out, Mat2>>,
1931        Mat2: Send + 'static,
1932        Next: Send + 'static,
1933        F: Fn(Vec<Out>) -> Next + Send + Sync + 'static,
1934    {
1935        let factories: Vec<_> = sources.into_iter().map(|source| source.factory).collect();
1936        let zipper = Arc::new(zipper);
1937        Source::from_materialized_factory(move |materializer| {
1938            let mut streams = Vec::with_capacity(factories.len());
1939            for factory in &factories {
1940                let stream = match Arc::clone(factory).create(materializer) {
1941                    Ok((stream, _)) => stream,
1942                    Err(error) => {
1943                        return Ok((
1944                            Box::new(std::iter::once(Err(error))) as BoxStream<Next>,
1945                            NotUsed,
1946                        ));
1947                    }
1948                };
1949                streams.push(stream);
1950            }
1951            Ok((zip_n_streams(streams, Arc::clone(&zipper)), NotUsed))
1952        })
1953    }
1954
1955    #[must_use]
1956    pub fn merge_prioritized_n<Mat2, I>(
1957        sources_and_priorities: I,
1958        eager_complete: bool,
1959    ) -> Source<Out, NotUsed>
1960    where
1961        I: IntoIterator<Item = (Source<Out, Mat2>, usize)>,
1962        Mat2: Send + 'static,
1963    {
1964        let sources_and_priorities: Vec<_> = sources_and_priorities.into_iter().collect();
1965        if sources_and_priorities.is_empty() {
1966            return Source::empty();
1967        }
1968        let (factories, priorities): (Vec<_>, Vec<_>) = sources_and_priorities
1969            .into_iter()
1970            .map(|(source, priority)| (source.factory, priority))
1971            .unzip();
1972        Source::from_materialized_factory(move |materializer| {
1973            let mut streams = Vec::with_capacity(factories.len());
1974            for factory in &factories {
1975                let stream = match Arc::clone(factory).create(materializer) {
1976                    Ok((stream, _)) => stream,
1977                    Err(error) => {
1978                        return Ok((
1979                            Box::new(std::iter::once(Err(error))) as BoxStream<Out>,
1980                            NotUsed,
1981                        ));
1982                    }
1983                };
1984                streams.push(stream);
1985            }
1986            Ok((
1987                merge_prioritized_streams(streams, priorities.clone(), eager_complete),
1988                NotUsed,
1989            ))
1990        })
1991    }
1992
1993    #[must_use]
1994    pub fn maybe() -> (MaybeHandle<Out>, Self) {
1995        let value = Arc::new(Mutex::new(None));
1996        let handle = MaybeHandle {
1997            value: Arc::clone(&value),
1998        };
1999        let source = Self::from_factory(move || {
2000            let result = value
2001                .lock()
2002                .expect("maybe source poisoned")
2003                .clone()
2004                .unwrap_or(Err(StreamError::MaybeIncomplete));
2005            Box::new(std::iter::once(result))
2006        });
2007        (handle, source)
2008    }
2009
2010    #[must_use]
2011    pub fn single(item: Out) -> Self {
2012        Self::from_factory_with_hints(
2013            move || Box::new(std::iter::once(Ok(item.clone()))),
2014            SourceHints::with_inline_micro(1),
2015        )
2016    }
2017
2018    #[must_use]
2019    pub fn repeat(item: Out) -> Self {
2020        Self::from_factory(move || {
2021            let item = item.clone();
2022            Box::new(std::iter::repeat_with(move || Ok(item.clone())))
2023        })
2024    }
2025
2026    #[must_use]
2027    pub fn from_iterable<I>(items: I) -> Self
2028    where
2029        I: IntoIterator<Item = Out>,
2030    {
2031        items.into_iter().collect()
2032    }
2033}
2034
2035impl<Out: Clone + Send + Sync + 'static> FromIterator<Out> for Source<Out, NotUsed> {
2036    fn from_iter<T: IntoIterator<Item = Out>>(iter: T) -> Self {
2037        // Share the backing storage across materializations and clone elements
2038        // lazily on demand, instead of cloning the whole container each run.
2039        let items: Arc<[Out]> = iter.into_iter().collect();
2040        let len = items.len();
2041        Self::from_factory_with_hints(
2042            move || {
2043                let items = Arc::clone(&items);
2044                let mut index = 0;
2045                Box::new(std::iter::from_fn(move || {
2046                    let item = items.get(index)?.clone();
2047                    index += 1;
2048                    Some(Ok(item))
2049                }))
2050            },
2051            // Exact success item count: enables the coordinator inline drain path.
2052            SourceHints::with_inline_micro(len),
2053        )
2054    }
2055}
2056
2057/// Test-only helper: create a Source whose factory calls the given closure but
2058/// marks it as inline-micro eligible. Used to test the inline drain path with
2059/// custom (possibly blocking) iterators without broadening production
2060/// eligibility.
2061#[cfg(test)]
2062pub(in crate::stream) fn test_source_with_inline_micro_hint<Out: Send + 'static>(
2063    factory: impl Fn() -> BoxStream<Out> + Send + Sync + 'static,
2064    max_success_items: usize,
2065) -> Source<Out, NotUsed> {
2066    Source::from_factory_with_hints(factory, SourceHints::with_inline_micro(max_success_items))
2067}
2068
2069struct UnfoldResourceStream<Resource, Out, Create, Read, Close>
2070where
2071    Create: Fn() -> StreamResult<Resource>,
2072    Read: Fn(&mut Resource) -> StreamResult<Option<Out>>,
2073    Close: Fn(Resource) -> StreamResult<()>,
2074{
2075    create: Arc<Create>,
2076    read: Arc<Read>,
2077    close: Arc<Close>,
2078    resource: Option<Resource>,
2079    created: bool,
2080    terminated: bool,
2081    _marker: PhantomData<fn() -> Out>,
2082}
2083
2084impl<Resource, Out, Create, Read, Close> UnfoldResourceStream<Resource, Out, Create, Read, Close>
2085where
2086    Create: Fn() -> StreamResult<Resource>,
2087    Read: Fn(&mut Resource) -> StreamResult<Option<Out>>,
2088    Close: Fn(Resource) -> StreamResult<()>,
2089{
2090    fn ensure_created(&mut self) -> StreamResult<()> {
2091        if self.created {
2092            return Ok(());
2093        }
2094        self.created = true;
2095        let resource = catch_unwind_failed("unfold_resource create", || (self.create)())
2096            .and_then(|result| result)?;
2097        self.resource = Some(resource);
2098        Ok(())
2099    }
2100
2101    fn close_resource(&mut self) -> StreamResult<()> {
2102        match self.resource.take() {
2103            Some(resource) => {
2104                catch_unwind_failed("unfold_resource close", || (self.close)(resource))
2105                    .and_then(|result| result)
2106            }
2107            None => Ok(()),
2108        }
2109    }
2110}
2111
2112impl<Resource, Out, Create, Read, Close> Iterator
2113    for UnfoldResourceStream<Resource, Out, Create, Read, Close>
2114where
2115    Create: Fn() -> StreamResult<Resource>,
2116    Read: Fn(&mut Resource) -> StreamResult<Option<Out>>,
2117    Close: Fn(Resource) -> StreamResult<()>,
2118{
2119    type Item = StreamResult<Out>;
2120
2121    fn next(&mut self) -> Option<Self::Item> {
2122        if self.terminated {
2123            return None;
2124        }
2125        if let Err(error) = self.ensure_created() {
2126            self.terminated = true;
2127            return Some(Err(error));
2128        }
2129
2130        let result = {
2131            let resource = self
2132                .resource
2133                .as_mut()
2134                .expect("unfold_resource resource is open");
2135            catch_unwind_failed("unfold_resource read", || (self.read)(resource))
2136                .and_then(|result| result)
2137        };
2138
2139        match result {
2140            Ok(Some(item)) => Some(Ok(item)),
2141            Ok(None) => {
2142                self.terminated = true;
2143                match self.close_resource() {
2144                    Ok(()) => None,
2145                    Err(error) => Some(Err(error)),
2146                }
2147            }
2148            Err(read_error) => {
2149                self.terminated = true;
2150                let _ = self.close_resource();
2151                Some(Err(read_error))
2152            }
2153        }
2154    }
2155}
2156
2157impl<Resource, Out, Create, Read, Close> Drop
2158    for UnfoldResourceStream<Resource, Out, Create, Read, Close>
2159where
2160    Create: Fn() -> StreamResult<Resource>,
2161    Read: Fn(&mut Resource) -> StreamResult<Option<Out>>,
2162    Close: Fn(Resource) -> StreamResult<()>,
2163{
2164    fn drop(&mut self) {
2165        let _ = self.close_resource();
2166    }
2167}
2168
2169type UnfoldResourceAsyncMarker<Out, CreateFut, ReadFut, CloseFut> =
2170    fn() -> (Out, CreateFut, ReadFut, CloseFut);
2171
2172struct UnfoldResourceAsyncStream<Resource, Out, Create, CreateFut, Read, ReadFut, Close, CloseFut>
2173where
2174    Resource: Send + 'static,
2175    Create: Fn() -> CreateFut,
2176    CreateFut: Future<Output = StreamResult<Resource>> + Send + 'static,
2177    Read: Fn(&mut Resource) -> ReadFut,
2178    ReadFut: Future<Output = StreamResult<Option<Out>>> + Send + 'static,
2179    Close: Fn(Resource) -> CloseFut,
2180    CloseFut: Future<Output = StreamResult<()>> + Send + 'static,
2181{
2182    create: Arc<Create>,
2183    read: Arc<Read>,
2184    close: Arc<Close>,
2185    resource: Option<Resource>,
2186    created: bool,
2187    terminated: bool,
2188    _marker: PhantomData<UnfoldResourceAsyncMarker<Out, CreateFut, ReadFut, CloseFut>>,
2189}
2190
2191impl<Resource, Out, Create, CreateFut, Read, ReadFut, Close, CloseFut>
2192    UnfoldResourceAsyncStream<Resource, Out, Create, CreateFut, Read, ReadFut, Close, CloseFut>
2193where
2194    Resource: Send + 'static,
2195    Create: Fn() -> CreateFut,
2196    CreateFut: Future<Output = StreamResult<Resource>> + Send + 'static,
2197    Read: Fn(&mut Resource) -> ReadFut,
2198    ReadFut: Future<Output = StreamResult<Option<Out>>> + Send + 'static,
2199    Close: Fn(Resource) -> CloseFut,
2200    CloseFut: Future<Output = StreamResult<()>> + Send + 'static,
2201{
2202    fn ensure_created(&mut self) -> StreamResult<()> {
2203        if self.created {
2204            return Ok(());
2205        }
2206        self.created = true;
2207        let resource = catch_unwind_failed("unfold_resource_async create", || (self.create)())
2208            .and_then(flow::run_future_inline_or_spawn)?;
2209        self.resource = Some(resource);
2210        Ok(())
2211    }
2212
2213    fn close_resource(&mut self) -> StreamResult<()> {
2214        match self.resource.take() {
2215            Some(resource) => {
2216                catch_unwind_failed("unfold_resource_async close", || (self.close)(resource))
2217                    .and_then(flow::run_future_inline_or_spawn)
2218            }
2219            None => Ok(()),
2220        }
2221    }
2222}
2223
2224impl<Resource, Out, Create, CreateFut, Read, ReadFut, Close, CloseFut> Iterator
2225    for UnfoldResourceAsyncStream<Resource, Out, Create, CreateFut, Read, ReadFut, Close, CloseFut>
2226where
2227    Resource: Send + 'static,
2228    Out: Send + 'static,
2229    Create: Fn() -> CreateFut,
2230    CreateFut: Future<Output = StreamResult<Resource>> + Send + 'static,
2231    Read: Fn(&mut Resource) -> ReadFut,
2232    ReadFut: Future<Output = StreamResult<Option<Out>>> + Send + 'static,
2233    Close: Fn(Resource) -> CloseFut,
2234    CloseFut: Future<Output = StreamResult<()>> + Send + 'static,
2235{
2236    type Item = StreamResult<Out>;
2237
2238    fn next(&mut self) -> Option<Self::Item> {
2239        if self.terminated {
2240            return None;
2241        }
2242        if let Err(error) = self.ensure_created() {
2243            self.terminated = true;
2244            return Some(Err(error));
2245        }
2246
2247        let result = {
2248            let resource = self
2249                .resource
2250                .as_mut()
2251                .expect("unfold_resource_async resource is open");
2252            catch_unwind_failed("unfold_resource_async read", || (self.read)(resource))
2253                .and_then(flow::run_future_inline_or_spawn)
2254        };
2255
2256        match result {
2257            Ok(Some(item)) => Some(Ok(item)),
2258            Ok(None) => {
2259                self.terminated = true;
2260                match self.close_resource() {
2261                    Ok(()) => None,
2262                    Err(error) => Some(Err(error)),
2263                }
2264            }
2265            Err(read_error) => {
2266                self.terminated = true;
2267                let _ = self.close_resource();
2268                Some(Err(read_error))
2269            }
2270        }
2271    }
2272}
2273
2274impl<Resource, Out, Create, CreateFut, Read, ReadFut, Close, CloseFut> Drop
2275    for UnfoldResourceAsyncStream<Resource, Out, Create, CreateFut, Read, ReadFut, Close, CloseFut>
2276where
2277    Resource: Send + 'static,
2278    Create: Fn() -> CreateFut,
2279    CreateFut: Future<Output = StreamResult<Resource>> + Send + 'static,
2280    Read: Fn(&mut Resource) -> ReadFut,
2281    ReadFut: Future<Output = StreamResult<Option<Out>>> + Send + 'static,
2282    Close: Fn(Resource) -> CloseFut,
2283    CloseFut: Future<Output = StreamResult<()>> + Send + 'static,
2284{
2285    fn drop(&mut self) {
2286        let _ = self.close_resource();
2287    }
2288}
2289
2290struct LazySourceStream<Out, InnerMat, F> {
2291    create: Arc<F>,
2292    materializer: Materializer,
2293    current: Option<BoxStream<Out>>,
2294    mat_sender: Option<oneshot::Sender<StreamResult<InnerMat>>>,
2295    initialized: bool,
2296    terminated: bool,
2297}
2298
2299impl<Out, InnerMat, F> LazySourceStream<Out, InnerMat, F>
2300where
2301    Out: Send + 'static,
2302    InnerMat: Send + 'static,
2303    F: Fn() -> Source<Out, InnerMat>,
2304{
2305    fn complete_mat(&mut self, result: StreamResult<InnerMat>) {
2306        if let Some(sender) = self.mat_sender.take() {
2307            let _ = sender.send(result);
2308        }
2309    }
2310
2311    fn initialize(&mut self) -> StreamResult<()> {
2312        if self.initialized {
2313            return Ok(());
2314        }
2315        self.initialized = true;
2316        let source = match catch_unwind_failed("lazy_source factory", || (self.create)()) {
2317            Ok(source) => source,
2318            Err(error) => {
2319                self.complete_mat(Err(error.clone()));
2320                return Err(error);
2321            }
2322        };
2323        match Arc::clone(&source.factory).create(&self.materializer) {
2324            Ok((stream, mat)) => {
2325                self.current = Some(stream);
2326                self.complete_mat(Ok(mat));
2327                Ok(())
2328            }
2329            Err(error) => {
2330                self.complete_mat(Err(error.clone()));
2331                Err(error)
2332            }
2333        }
2334    }
2335}
2336
2337impl<Out, InnerMat, F> Iterator for LazySourceStream<Out, InnerMat, F>
2338where
2339    Out: Send + 'static,
2340    InnerMat: Send + 'static,
2341    F: Fn() -> Source<Out, InnerMat>,
2342{
2343    type Item = StreamResult<Out>;
2344
2345    fn next(&mut self) -> Option<Self::Item> {
2346        if self.terminated {
2347            return None;
2348        }
2349        if let Err(error) = self.initialize() {
2350            self.terminated = true;
2351            return Some(Err(error));
2352        }
2353        match self
2354            .current
2355            .as_mut()
2356            .expect("lazy_source current stream initialized")
2357            .next()
2358        {
2359            Some(Ok(item)) => Some(Ok(item)),
2360            Some(Err(error)) => {
2361                self.terminated = true;
2362                Some(Err(error))
2363            }
2364            None => {
2365                self.terminated = true;
2366                None
2367            }
2368        }
2369    }
2370}
2371
2372impl<Out, InnerMat, F> Drop for LazySourceStream<Out, InnerMat, F> {
2373    fn drop(&mut self) {
2374        if !self.initialized
2375            && let Some(sender) = self.mat_sender.take()
2376        {
2377            let _ = sender.send(Err(StreamError::Failed(
2378                "lazy source was never materialized".into(),
2379            )));
2380        }
2381    }
2382}
2383
2384struct LazyFutureSourceStream<Out, InnerMat, F, Fut> {
2385    create: Arc<F>,
2386    materializer: Materializer,
2387    current: Option<BoxStream<Out>>,
2388    mat_sender: Option<oneshot::Sender<StreamResult<InnerMat>>>,
2389    initialized: bool,
2390    terminated: bool,
2391    _marker: PhantomData<fn() -> Fut>,
2392}
2393
2394impl<Out, InnerMat, F, Fut> LazyFutureSourceStream<Out, InnerMat, F, Fut>
2395where
2396    Out: Send + 'static,
2397    InnerMat: Send + 'static,
2398    F: Fn() -> Fut,
2399    Fut: Future<Output = StreamResult<Source<Out, InnerMat>>> + Send + 'static,
2400{
2401    fn complete_mat(&mut self, result: StreamResult<InnerMat>) {
2402        if let Some(sender) = self.mat_sender.take() {
2403            let _ = sender.send(result);
2404        }
2405    }
2406
2407    fn initialize(&mut self) -> StreamResult<()> {
2408        if self.initialized {
2409            return Ok(());
2410        }
2411        self.initialized = true;
2412        let source = match catch_unwind_failed("lazy_future_source factory", || (self.create)())
2413            .and_then(flow::run_future_inline_or_spawn)
2414        {
2415            Ok(source) => source,
2416            Err(error) => {
2417                self.complete_mat(Err(error.clone()));
2418                return Err(error);
2419            }
2420        };
2421        match Arc::clone(&source.factory).create(&self.materializer) {
2422            Ok((stream, mat)) => {
2423                self.current = Some(stream);
2424                self.complete_mat(Ok(mat));
2425                Ok(())
2426            }
2427            Err(error) => {
2428                self.complete_mat(Err(error.clone()));
2429                Err(error)
2430            }
2431        }
2432    }
2433}
2434
2435impl<Out, InnerMat, F, Fut> Iterator for LazyFutureSourceStream<Out, InnerMat, F, Fut>
2436where
2437    Out: Send + 'static,
2438    InnerMat: Send + 'static,
2439    F: Fn() -> Fut,
2440    Fut: Future<Output = StreamResult<Source<Out, InnerMat>>> + Send + 'static,
2441{
2442    type Item = StreamResult<Out>;
2443
2444    fn next(&mut self) -> Option<Self::Item> {
2445        if self.terminated {
2446            return None;
2447        }
2448        if let Err(error) = self.initialize() {
2449            self.terminated = true;
2450            return Some(Err(error));
2451        }
2452        match self
2453            .current
2454            .as_mut()
2455            .expect("lazy_future_source current stream initialized")
2456            .next()
2457        {
2458            Some(Ok(item)) => Some(Ok(item)),
2459            Some(Err(error)) => {
2460                self.terminated = true;
2461                Some(Err(error))
2462            }
2463            None => {
2464                self.terminated = true;
2465                None
2466            }
2467        }
2468    }
2469}
2470
2471impl<Out, InnerMat, F, Fut> Drop for LazyFutureSourceStream<Out, InnerMat, F, Fut> {
2472    fn drop(&mut self) {
2473        if !self.initialized
2474            && let Some(sender) = self.mat_sender.take()
2475        {
2476            let _ = sender.send(Err(StreamError::Failed(
2477                "lazy future source was never materialized".into(),
2478            )));
2479        }
2480    }
2481}
2482
2483fn concat_source_streams<Out>(streams: Vec<BoxStream<Out>>) -> BoxStream<Out>
2484where
2485    Out: Send + 'static,
2486{
2487    let mut streams: VecDeque<_> = streams.into();
2488    let mut current = streams.pop_front();
2489    Box::new(std::iter::from_fn(move || {
2490        loop {
2491            match current.as_mut() {
2492                Some(stream) => match stream.next() {
2493                    Some(item) => return Some(item),
2494                    None => current = streams.pop_front(),
2495                },
2496                None => return None,
2497            }
2498        }
2499    }))
2500}
2501
2502fn concat_source_streams_lazy<Out, Mat>(
2503    initial: BoxStream<Out>,
2504    factories: Vec<Arc<dyn SourceFactory<Out, Mat>>>,
2505    materializer: &Materializer,
2506) -> BoxStream<Out>
2507where
2508    Out: Send + 'static,
2509    Mat: Send + 'static,
2510{
2511    let mut current = Some(initial);
2512    let mut remaining: VecDeque<_> = factories.into();
2513    let materializer = materializer.with_name_prefix(materializer.name_prefix().to_owned());
2514    Box::new(std::iter::from_fn(move || {
2515        loop {
2516            match current.as_mut() {
2517                Some(stream) => match stream.next() {
2518                    Some(item) => return Some(item),
2519                    None => {
2520                        current = remaining.pop_front().map(|factory| {
2521                            match factory.create(&materializer) {
2522                                Ok((stream, _)) => stream,
2523                                Err(error) => {
2524                                    Box::new(std::iter::once(Err(error))) as BoxStream<Out>
2525                                }
2526                            }
2527                        });
2528                    }
2529                },
2530                None => return None,
2531            }
2532        }
2533    }))
2534}
2535
2536fn or_else_source_stream<Out>(
2537    mut primary: BoxStream<Out>,
2538    mut secondary: BoxStream<Out>,
2539) -> BoxStream<Out>
2540where
2541    Out: Send + 'static,
2542{
2543    let mut primary_emitted = false;
2544    let mut using_secondary = false;
2545    Box::new(std::iter::from_fn(move || {
2546        loop {
2547            if using_secondary {
2548                return secondary.next();
2549            }
2550
2551            match primary.next() {
2552                Some(Ok(item)) => {
2553                    primary_emitted = true;
2554                    return Some(Ok(item));
2555                }
2556                Some(Err(error)) => return Some(Err(error)),
2557                None if primary_emitted => return None,
2558                None => using_secondary = true,
2559            }
2560        }
2561    }))
2562}
2563
2564fn interleave_source_streams<Out>(
2565    streams: Vec<BoxStream<Out>>,
2566    segment_size: usize,
2567    eager_close: bool,
2568) -> BoxStream<Out>
2569where
2570    Out: Send + 'static,
2571{
2572    if segment_size == 0 {
2573        return Box::new(std::iter::once(Err(StreamError::GraphValidation(
2574            "interleave segment size must be greater than zero".into(),
2575        ))));
2576    }
2577
2578    let mut streams: Vec<Option<BoxStream<Out>>> = streams.into_iter().map(Some).collect();
2579    let mut pending: Vec<Option<StreamResult<Out>>> = (0..streams.len()).map(|_| None).collect();
2580    let mut current = 0usize;
2581    let mut emitted = 0usize;
2582    Box::new(std::iter::from_fn(move || {
2583        loop {
2584            if streams.iter().all(Option::is_none) {
2585                return None;
2586            }
2587            if streams[current].is_none() {
2588                match next_active_source_stream(&streams, current) {
2589                    Some(next) => {
2590                        current = next;
2591                        emitted = 0;
2592                    }
2593                    None => return None,
2594                }
2595            }
2596
2597            let Some(stream) = streams[current].as_mut() else {
2598                continue;
2599            };
2600            let next_item = pending[current].take().or_else(|| stream.next());
2601            match next_item {
2602                Some(Ok(item)) => {
2603                    emitted += 1;
2604                    if emitted == segment_size {
2605                        emitted = 0;
2606                        if let Some(next) = next_active_source_stream(&streams, current) {
2607                            current = next;
2608                        }
2609                    }
2610                    return Some(Ok(item));
2611                }
2612                Some(Err(error)) => return Some(Err(error)),
2613                None => {
2614                    streams[current] = None;
2615                    emitted = 0;
2616                    if eager_close {
2617                        return None;
2618                    }
2619                    match next_active_source_stream(&streams, current) {
2620                        Some(next) => current = next,
2621                        None => return None,
2622                    }
2623                }
2624            }
2625        }
2626    }))
2627}
2628
2629fn next_active_source_stream<Out>(
2630    streams: &[Option<BoxStream<Out>>],
2631    current: usize,
2632) -> Option<usize>
2633where
2634    Out: Send + 'static,
2635{
2636    if streams.is_empty() {
2637        return None;
2638    }
2639    for offset in 1..=streams.len() {
2640        let index = (current + offset) % streams.len();
2641        if streams[index].is_some() {
2642            return Some(index);
2643        }
2644    }
2645    None
2646}