Skip to main content

datum/stream/
source.rs

1use super::flow::{self, FlowTransform};
2use super::*;
3use crate::Attributes;
4use crate::context::SourceWithContext;
5
6#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash)]
7pub struct NotUsed;
8
9type CombinedSourceFactory<Out> =
10    dyn Fn(&Materializer) -> StreamResult<BoxStream<Out>> + Send + Sync;
11
12pub struct Keep;
13
14impl Keep {
15    pub fn left<Left, Right>(left: Left, _right: Right) -> Left {
16        left
17    }
18
19    pub fn right<Left, Right>(_left: Left, right: Right) -> Right {
20        right
21    }
22
23    pub fn both<Left, Right>(left: Left, right: Right) -> (Left, Right) {
24        (left, right)
25    }
26
27    pub fn none<Left, Right>(_left: Left, _right: Right) -> NotUsed {
28        NotUsed
29    }
30}
31
32#[derive(Debug, Clone, PartialEq, Eq)]
33pub enum SourceCombineStrategy {
34    Merge {
35        eager_complete: bool,
36    },
37    Concat,
38    Prioritized {
39        priorities: Vec<usize>,
40        eager_complete: bool,
41    },
42}
43
44#[derive(Clone)]
45pub struct MaybeHandle<T> {
46    value: Arc<Mutex<Option<StreamResult<T>>>>,
47}
48
49impl<T> MaybeHandle<T> {
50    #[must_use]
51    pub fn is_completed(&self) -> bool {
52        self.value.lock().expect("maybe handle poisoned").is_some()
53    }
54
55    pub fn complete(&self, item: T) -> StreamResult<()> {
56        self.settle(Ok(item))
57    }
58
59    pub fn fail(&self, error: StreamError) -> StreamResult<()> {
60        self.settle(Err(error))
61    }
62
63    fn settle(&self, result: StreamResult<T>) -> StreamResult<()> {
64        let mut value = self.value.lock().expect("maybe handle poisoned");
65        if value.is_some() {
66            return Err(StreamError::Failed("maybe source already completed".into()));
67        }
68        *value = Some(result);
69        Ok(())
70    }
71}
72
73#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
74pub struct Demand(u64);
75
76impl Demand {
77    pub const ZERO: Self = Self(0);
78    pub const ONE: Self = Self(1);
79    pub const MAX: Self = Self(u64::MAX);
80
81    #[must_use]
82    pub const fn new(available: u64) -> Self {
83        Self(available)
84    }
85
86    #[must_use]
87    pub const fn available(self) -> u64 {
88        self.0
89    }
90
91    #[must_use]
92    pub const fn is_unbounded(self) -> bool {
93        self.0 == u64::MAX
94    }
95
96    #[must_use]
97    pub const fn is_empty(self) -> bool {
98        self.0 == 0
99    }
100
101    #[must_use]
102    pub const fn saturating_add(self, rhs: Self) -> Self {
103        Self(self.0.saturating_add(rhs.0))
104    }
105
106    pub fn consume_one(&mut self) -> bool {
107        match self.0 {
108            0 => false,
109            u64::MAX => true,
110            _ => {
111                self.0 -= 1;
112                true
113            }
114        }
115    }
116}
117
118pub trait PushOutlet<T>: Send {
119    fn push(&mut self, item: T) -> StreamResult<Demand>;
120
121    fn complete(&mut self) -> StreamResult<()> {
122        Ok(())
123    }
124
125    fn fail(&mut self, cause: StreamError) -> StreamResult<()> {
126        Err(cause)
127    }
128}
129
130#[derive(Clone)]
131pub struct Source<Out, Mat = NotUsed> {
132    pub(crate) factory: Arc<dyn SourceFactory<Out, Mat>>,
133    pub(super) hints: SourceHints,
134    pub(super) attributes: Attributes,
135    /// Set only for split-segment sources in fast mode. `None` for all other sources.
136    /// Cleared by all composition operators (via/map/to_mat etc.).
137    pub(crate) split_hook: Option<Arc<dyn SplitSegmentHookDyn>>,
138}
139
140impl<Out: Send + 'static> Source<Out, NotUsed> {
141    pub(super) fn from_factory<F>(factory: F) -> Self
142    where
143        F: Fn() -> BoxStream<Out> + Send + Sync + 'static,
144    {
145        Self::from_factory_with_hints(factory, SourceHints::default())
146    }
147
148    fn from_factory_with_hints<F>(factory: F, hints: SourceHints) -> Self
149    where
150        F: Fn() -> BoxStream<Out> + Send + Sync + 'static,
151    {
152        Self::from_materialized_factory_with_hints(
153            move |_materializer| Ok((factory(), NotUsed)),
154            hints,
155        )
156    }
157
158    #[must_use]
159    pub fn empty() -> Self {
160        Self::from_factory_with_hints(
161            || Box::new(std::iter::empty()),
162            SourceHints::with_inline_micro(0),
163        )
164    }
165
166    #[must_use]
167    pub fn never() -> Self {
168        Self::from_materialized_factory_with_hints(
169            move |materializer| {
170                let state = Arc::clone(&materializer.inner.state);
171                Ok((
172                    Box::new(std::iter::from_fn(move || {
173                        loop {
174                            if state.shutdown.load(Ordering::SeqCst) {
175                                return Some(Err(StreamError::AbruptTermination));
176                            }
177                            if super::runtime::current_stream_cancelled()
178                                .as_ref()
179                                .is_some_and(|cancelled| cancelled.load(Ordering::SeqCst))
180                            {
181                                return Some(Err(StreamError::Cancelled));
182                            }
183                            std::thread::park_timeout(Duration::from_millis(1));
184                        }
185                    })),
186                    NotUsed,
187                ))
188            },
189            SourceHints::default(),
190        )
191    }
192
193    #[must_use]
194    pub fn failed(error: StreamError) -> Self {
195        Self::from_factory_with_hints(
196            move || Box::new(std::iter::once(Err(error.clone()))),
197            // max_success_items=0: failed() emits no Ok items before the Err.
198            SourceHints::with_inline_micro(0),
199        )
200    }
201
202    #[must_use]
203    pub fn future<F, Fut>(future: F) -> Self
204    where
205        F: Fn() -> Fut + Send + Sync + 'static,
206        Fut: Future<Output = StreamResult<Out>> + Send + 'static,
207    {
208        let future = Arc::new(future);
209        Self::from_factory(move || {
210            let future = Arc::clone(&future);
211            let mut emitted = false;
212            Box::new(std::iter::from_fn(move || {
213                if emitted {
214                    return None;
215                }
216                emitted = true;
217                Some(
218                    catch_unwind_failed("source future factory", || future())
219                        .and_then(flow::run_future_inline_or_spawn),
220                )
221            }))
222        })
223    }
224
225    #[must_use]
226    pub fn future_source<F, Fut>(future: F) -> Self
227    where
228        F: Fn() -> Fut + Send + Sync + 'static,
229        Fut: Future<Output = StreamResult<Source<Out>>> + Send + 'static,
230    {
231        let future = Arc::new(future);
232        Self::from_materialized_factory(move |materializer| {
233            let materializer = materializer.with_name_prefix(materializer.name_prefix().to_owned());
234            let future = Arc::clone(&future);
235            let mut current = None::<BoxStream<Out>>;
236            let mut initialized = false;
237            let mut terminated = false;
238            Ok((
239                Box::new(std::iter::from_fn(move || {
240                    if terminated {
241                        return None;
242                    }
243
244                    loop {
245                        if let Some(stream) = current.as_mut() {
246                            match stream.next() {
247                                Some(item) => return Some(item),
248                                None => {
249                                    terminated = true;
250                                    return None;
251                                }
252                            }
253                        }
254
255                        if initialized {
256                            terminated = true;
257                            return None;
258                        }
259                        initialized = true;
260                        let source = match catch_unwind_failed("future_source factory", || future())
261                            .and_then(flow::run_future_inline_or_spawn)
262                        {
263                            Ok(source) => source,
264                            Err(error) => {
265                                terminated = true;
266                                return Some(Err(error));
267                            }
268                        };
269                        current = Some(match Arc::clone(&source.factory).create(&materializer) {
270                            Ok((stream, _)) => stream,
271                            Err(error) => {
272                                terminated = true;
273                                return Some(Err(error));
274                            }
275                        });
276                    }
277                })) as BoxStream<Out>,
278                NotUsed,
279            ))
280        })
281    }
282
283    #[must_use]
284    pub fn cycle<F, I>(factory: F) -> Self
285    where
286        F: Fn() -> I + Send + Sync + 'static,
287        I: IntoIterator<Item = Out>,
288        I::IntoIter: Send + 'static,
289    {
290        let factory = Arc::new(factory);
291        Self::from_factory(move || {
292            let factory = Arc::clone(&factory);
293            let mut current = None::<I::IntoIter>;
294            let mut terminated = false;
295            Box::new(std::iter::from_fn(move || {
296                if terminated {
297                    return None;
298                }
299
300                if let Some(iter) = current.as_mut()
301                    && let Some(item) = iter.next()
302                {
303                    return Some(Ok(item));
304                }
305
306                let mut next = match catch_unwind_failed("cycle factory", || factory()) {
307                    Ok(iterable) => iterable.into_iter(),
308                    Err(error) => {
309                        terminated = true;
310                        return Some(Err(error));
311                    }
312                };
313                match next.next() {
314                    Some(item) => {
315                        current = Some(next);
316                        Some(Ok(item))
317                    }
318                    None => {
319                        terminated = true;
320                        Some(Err(StreamError::Failed("empty iterator".into())))
321                    }
322                }
323            }))
324        })
325    }
326
327    #[must_use]
328    pub fn unfold<State, F>(initial: State, f: F) -> Self
329    where
330        State: Clone + Send + Sync + 'static,
331        F: Fn(State) -> Option<(State, Out)> + Send + Sync + 'static,
332    {
333        let f = Arc::new(f);
334        Self::from_factory(move || {
335            let f = Arc::clone(&f);
336            let mut state = Some(initial.clone());
337            let mut terminated = false;
338            Box::new(std::iter::from_fn(move || {
339                if terminated {
340                    return None;
341                }
342                let current = state.take().expect("unfold state present");
343                match catch_unwind_failed("unfold function", || f(current)) {
344                    Ok(Some((next_state, item))) => {
345                        state = Some(next_state);
346                        Some(Ok(item))
347                    }
348                    Ok(None) => {
349                        terminated = true;
350                        None
351                    }
352                    Err(error) => {
353                        terminated = true;
354                        Some(Err(error))
355                    }
356                }
357            }))
358        })
359    }
360
361    #[must_use]
362    pub fn unfold_async<State, F, Fut>(initial: State, f: F) -> Self
363    where
364        State: Clone + Send + Sync + 'static,
365        F: Fn(State) -> Fut + Send + Sync + 'static,
366        Fut: Future<Output = StreamResult<Option<(State, Out)>>> + Send + 'static,
367    {
368        let f = Arc::new(f);
369        Self::from_factory(move || {
370            let f = Arc::clone(&f);
371            let mut state = Some(initial.clone());
372            let mut terminated = false;
373            Box::new(std::iter::from_fn(move || {
374                if terminated {
375                    return None;
376                }
377                let current = state.take().expect("unfold_async state present");
378                match catch_unwind_failed("unfold_async factory", || f(current))
379                    .and_then(flow::run_future_inline_or_spawn)
380                {
381                    Ok(Some((next_state, item))) => {
382                        state = Some(next_state);
383                        Some(Ok(item))
384                    }
385                    Ok(None) => {
386                        terminated = true;
387                        None
388                    }
389                    Err(error) => {
390                        terminated = true;
391                        Some(Err(error))
392                    }
393                }
394            }))
395        })
396    }
397
398    #[must_use]
399    pub fn unfold_resource<Resource, Create, Read, Close>(
400        create: Create,
401        read: Read,
402        close: Close,
403    ) -> Self
404    where
405        Resource: Send + 'static,
406        Create: Fn() -> StreamResult<Resource> + Send + Sync + 'static,
407        Read: Fn(&mut Resource) -> StreamResult<Option<Out>> + Send + Sync + 'static,
408        Close: Fn(Resource) -> StreamResult<()> + Send + Sync + 'static,
409    {
410        let create = Arc::new(create);
411        let read = Arc::new(read);
412        let close = Arc::new(close);
413        Self::from_factory(move || {
414            Box::new(UnfoldResourceStream {
415                create: Arc::clone(&create),
416                read: Arc::clone(&read),
417                close: Arc::clone(&close),
418                resource: None,
419                created: false,
420                terminated: false,
421                _marker: PhantomData,
422            })
423        })
424    }
425
426    #[must_use]
427    pub fn unfold_resource_async<Resource, Create, CreateFut, Read, ReadFut, Close, CloseFut>(
428        create: Create,
429        read: Read,
430        close: Close,
431    ) -> Self
432    where
433        Resource: Send + 'static,
434        Create: Fn() -> CreateFut + Send + Sync + 'static,
435        CreateFut: Future<Output = StreamResult<Resource>> + Send + 'static,
436        Read: Fn(&mut Resource) -> ReadFut + Send + Sync + 'static,
437        ReadFut: Future<Output = StreamResult<Option<Out>>> + Send + 'static,
438        Close: Fn(Resource) -> CloseFut + Send + Sync + 'static,
439        CloseFut: Future<Output = StreamResult<()>> + Send + 'static,
440    {
441        let create = Arc::new(create);
442        let read = Arc::new(read);
443        let close = Arc::new(close);
444        Self::from_factory(move || {
445            Box::new(UnfoldResourceAsyncStream {
446                create: Arc::clone(&create),
447                read: Arc::clone(&read),
448                close: Arc::clone(&close),
449                resource: None,
450                created: false,
451                terminated: false,
452                _marker: PhantomData,
453            })
454        })
455    }
456
457    #[must_use]
458    pub fn lazy_single<F>(create: F) -> Self
459    where
460        F: Fn() -> Out + Send + Sync + 'static,
461    {
462        let create = Arc::new(create);
463        Self::from_factory(move || {
464            let create = Arc::clone(&create);
465            let mut emitted = false;
466            Box::new(std::iter::from_fn(move || {
467                if emitted {
468                    return None;
469                }
470                emitted = true;
471                Some(catch_unwind_failed("lazy_single factory", || create()))
472            }))
473        })
474    }
475
476    #[must_use]
477    pub fn lazy_future<F, Fut>(create: F) -> Self
478    where
479        F: Fn() -> Fut + Send + Sync + 'static,
480        Fut: Future<Output = StreamResult<Out>> + Send + 'static,
481    {
482        let create = Arc::new(create);
483        Self::from_factory(move || {
484            let create = Arc::clone(&create);
485            let mut emitted = false;
486            Box::new(std::iter::from_fn(move || {
487                if emitted {
488                    return None;
489                }
490                emitted = true;
491                Some(
492                    catch_unwind_failed("lazy_future factory", || create())
493                        .and_then(flow::run_future_inline_or_spawn),
494                )
495            }))
496        })
497    }
498
499    #[must_use]
500    pub fn lazy_source<InnerMat, F>(create: F) -> Source<Out, StreamCompletion<InnerMat>>
501    where
502        InnerMat: Send + 'static,
503        F: Fn() -> Source<Out, InnerMat> + Send + Sync + 'static,
504    {
505        let create = Arc::new(create);
506        Source::from_materialized_factory(move |materializer| {
507            let (sender, receiver) = oneshot::channel();
508            Ok((
509                Box::new(LazySourceStream {
510                    create: Arc::clone(&create),
511                    materializer: materializer
512                        .with_name_prefix(materializer.name_prefix().to_owned()),
513                    current: None,
514                    mat_sender: Some(sender),
515                    initialized: false,
516                    terminated: false,
517                }) as BoxStream<Out>,
518                StreamCompletion::from_receiver(receiver, None),
519            ))
520        })
521    }
522
523    #[must_use]
524    pub fn lazy_future_source<InnerMat, F, Fut>(
525        create: F,
526    ) -> Source<Out, StreamCompletion<InnerMat>>
527    where
528        InnerMat: Send + 'static,
529        F: Fn() -> Fut + Send + Sync + 'static,
530        Fut: Future<Output = StreamResult<Source<Out, InnerMat>>> + Send + 'static,
531    {
532        let create = Arc::new(create);
533        Source::from_materialized_factory(move |materializer| {
534            let (sender, receiver) = oneshot::channel();
535            Ok((
536                Box::new(LazyFutureSourceStream {
537                    create: Arc::clone(&create),
538                    materializer: materializer
539                        .with_name_prefix(materializer.name_prefix().to_owned()),
540                    current: None,
541                    mat_sender: Some(sender),
542                    initialized: false,
543                    terminated: false,
544                    _marker: PhantomData,
545                }) as BoxStream<Out>,
546                StreamCompletion::from_receiver(receiver, None),
547            ))
548        })
549    }
550
551    #[must_use]
552    pub fn from_fn_iter<F, I>(factory: F) -> Self
553    where
554        F: Fn() -> I + Send + Sync + 'static,
555        I: IntoIterator<Item = Out>,
556        I::IntoIter: Send + 'static,
557    {
558        Self::from_factory(move || Box::new(factory().into_iter().map(Ok)))
559    }
560}
561
562impl<Out: Send + 'static, Mat: Send + 'static> Source<Out, Mat> {
563    fn from_materialized_factory_with_hints<F>(factory: F, hints: SourceHints) -> Self
564    where
565        F: Fn(&Materializer) -> StreamResult<(BoxStream<Out>, Mat)> + Send + Sync + 'static,
566    {
567        Self {
568            factory: Arc::new(FnSourceFactory(factory)),
569            hints,
570            attributes: Attributes::default(),
571            split_hook: None,
572        }
573    }
574
575    pub(crate) fn from_materialized_factory<F>(factory: F) -> Self
576    where
577        F: Fn(&Materializer) -> StreamResult<(BoxStream<Out>, Mat)> + Send + Sync + 'static,
578    {
579        Self::from_materialized_factory_with_hints(factory, SourceHints::default())
580    }
581
582    pub(crate) fn from_terminal_batch_materialized_factory<F>(factory: F) -> Self
583    where
584        F: Fn(&Materializer) -> StreamResult<(BoxStream<Out>, Mat)> + Send + Sync + 'static,
585    {
586        Self::from_materialized_factory_with_hints(
587            factory,
588            SourceHints::with_terminal_consumer_batch(),
589        )
590    }
591
592    #[must_use]
593    pub fn as_source_with_context<Ctx, F>(
594        self,
595        extract_context: F,
596    ) -> SourceWithContext<Out, Ctx, Mat>
597    where
598        Ctx: Send + 'static,
599        F: Fn(&Out) -> Ctx + Send + Sync + 'static,
600    {
601        SourceWithContext::from_source(self.map(move |item| {
602            let context = extract_context(&item);
603            (item, context)
604        }))
605    }
606
607    #[must_use]
608    pub fn via<Next, FlowMat>(self, flow: Flow<Out, Next, FlowMat>) -> Source<Next, Mat>
609    where
610        Next: Send + 'static,
611        FlowMat: Send + 'static,
612    {
613        self.via_mat(flow, Keep::left)
614    }
615
616    #[must_use]
617    pub fn via_mat<Next, FlowMat, Combined, F>(
618        self,
619        flow: Flow<Out, Next, FlowMat>,
620        combine: F,
621    ) -> Source<Next, Combined>
622    where
623        Next: Send + 'static,
624        FlowMat: Send + 'static,
625        Combined: Send + 'static,
626        F: Fn(Mat, FlowMat) -> Combined + Send + Sync + 'static,
627    {
628        let source = self.factory;
629        let transform = flow.transform;
630        let materialize_flow = flow.materialize;
631        let hints = self.hints.after_flow(flow.hints);
632        let combine = Arc::new(combine);
633        Source::from_materialized_factory_with_hints(
634            move |materializer| {
635                let (stream, source_mat) = Arc::clone(&source).create(materializer)?;
636                let flow_mat = materialize_flow()?;
637                let stream = match &transform {
638                    FlowTransform::Pure(transform) => transform(stream),
639                    FlowTransform::Runtime(transform) => transform(stream, materializer)?,
640                };
641                Ok((stream, combine(source_mat, flow_mat)))
642            },
643            hints,
644        )
645    }
646
647    #[must_use]
648    pub fn via_mat_with<Next, FlowMat, Combined, F>(
649        self,
650        flow: Flow<Out, Next, FlowMat>,
651        combine: F,
652    ) -> Source<Next, Combined>
653    where
654        Next: Send + 'static,
655        FlowMat: Send + 'static,
656        Combined: Send + 'static,
657        F: Fn(Mat, FlowMat) -> Combined + Send + Sync + 'static,
658    {
659        self.via_mat(flow, combine)
660    }
661
662    #[must_use]
663    pub fn map<Next, F>(self, f: F) -> Source<Next, Mat>
664    where
665        Next: Send + 'static,
666        F: Fn(Out) -> Next + Send + Sync + 'static,
667    {
668        let hints = self.hints.without_inline_micro();
669        Source {
670            factory: Arc::new(MapSourceFactory {
671                source: self.factory,
672                stage: f,
673                _marker: PhantomData,
674            }),
675            hints,
676            attributes: self.attributes,
677            split_hook: None,
678        }
679    }
680
681    #[must_use]
682    pub fn attributes(&self) -> &Attributes {
683        &self.attributes
684    }
685
686    #[must_use]
687    pub fn with_attributes(mut self, attributes: Attributes) -> Self {
688        self.attributes = attributes;
689        self
690    }
691
692    #[must_use]
693    pub fn add_attributes(mut self, attributes: Attributes) -> Self {
694        self.attributes = self.attributes.and(attributes);
695        self
696    }
697
698    #[must_use]
699    pub fn named(self, name: impl Into<String>) -> Self {
700        self.add_attributes(Attributes::named(name))
701    }
702
703    #[must_use]
704    pub fn map_result<Next, F>(self, f: F) -> Source<Next, Mat>
705    where
706        Next: Send + 'static,
707        F: Fn(Out) -> StreamResult<Next> + Send + Sync + 'static,
708    {
709        self.via(Flow::identity().map_result(f))
710    }
711
712    #[must_use]
713    pub fn map_result_with_supervision<Next, F>(
714        self,
715        f: F,
716        decider: SupervisionDecider,
717    ) -> Source<Next, Mat>
718    where
719        Next: Send + 'static,
720        F: Fn(Out) -> StreamResult<Next> + Send + Sync + 'static,
721    {
722        self.via(Flow::identity().map_result_with_supervision(f, decider))
723    }
724
725    #[must_use]
726    pub fn filter<F>(self, predicate: F) -> Source<Out, Mat>
727    where
728        F: Fn(&Out) -> bool + Send + Sync + 'static,
729    {
730        self.via(Flow::identity().filter(predicate))
731    }
732
733    #[must_use]
734    pub fn filter_result<F>(self, predicate: F) -> Source<Out, Mat>
735    where
736        F: Fn(&Out) -> StreamResult<bool> + Send + Sync + 'static,
737    {
738        self.via(Flow::identity().filter_result(predicate))
739    }
740
741    #[must_use]
742    pub fn filter_result_with_supervision<F>(
743        self,
744        predicate: F,
745        decider: SupervisionDecider,
746    ) -> Source<Out, Mat>
747    where
748        F: Fn(&Out) -> StreamResult<bool> + Send + Sync + 'static,
749    {
750        self.via(Flow::identity().filter_result_with_supervision(predicate, decider))
751    }
752
753    #[must_use]
754    pub fn filter_not<F>(self, predicate: F) -> Source<Out, Mat>
755    where
756        F: Fn(&Out) -> bool + Send + Sync + 'static,
757    {
758        self.via(Flow::identity().filter_not(predicate))
759    }
760
761    #[must_use]
762    pub fn filter_map<Next, F>(self, f: F) -> Source<Next, Mat>
763    where
764        Next: Send + 'static,
765        F: Fn(Out) -> Option<Next> + Send + Sync + 'static,
766    {
767        self.via(Flow::identity().filter_map(f))
768    }
769
770    #[must_use]
771    pub fn filter_map_result<Next, F>(self, f: F) -> Source<Next, Mat>
772    where
773        Next: Send + 'static,
774        F: Fn(Out) -> StreamResult<Option<Next>> + Send + Sync + 'static,
775    {
776        self.via(Flow::identity().filter_map_result(f))
777    }
778
779    #[must_use]
780    pub fn filter_map_result_with_supervision<Next, F>(
781        self,
782        f: F,
783        decider: SupervisionDecider,
784    ) -> Source<Next, Mat>
785    where
786        Next: Send + 'static,
787        F: Fn(Out) -> StreamResult<Option<Next>> + Send + Sync + 'static,
788    {
789        self.via(Flow::identity().filter_map_result_with_supervision(f, decider))
790    }
791
792    #[must_use]
793    pub fn map_concat<Next, F, I>(self, f: F) -> Source<Next, Mat>
794    where
795        Next: Send + 'static,
796        F: Fn(Out) -> I + Send + Sync + 'static,
797        I: IntoIterator<Item = Next>,
798        I::IntoIter: Send + 'static,
799    {
800        self.via(Flow::identity().map_concat(f))
801    }
802
803    #[must_use]
804    pub fn map_concat_result<Next, F, I>(self, f: F) -> Source<Next, Mat>
805    where
806        Next: Send + 'static,
807        F: Fn(Out) -> StreamResult<I> + Send + Sync + 'static,
808        I: IntoIterator<Item = Next>,
809        I::IntoIter: Send + 'static,
810    {
811        self.via(Flow::identity().map_concat_result(f))
812    }
813
814    #[must_use]
815    pub fn map_concat_result_with_supervision<Next, F, I>(
816        self,
817        f: F,
818        decider: SupervisionDecider,
819    ) -> Source<Next, Mat>
820    where
821        Next: Send + 'static,
822        F: Fn(Out) -> StreamResult<I> + Send + Sync + 'static,
823        I: IntoIterator<Item = Next>,
824        I::IntoIter: Send + 'static,
825    {
826        self.via(Flow::identity().map_concat_result_with_supervision(f, decider))
827    }
828
829    #[must_use]
830    pub fn stateful_map<State, Next, F>(self, seed: State, f: F) -> Source<Next, Mat>
831    where
832        State: Clone + Send + Sync + 'static,
833        Next: Send + 'static,
834        F: Fn(&mut State, Out) -> Next + Send + Sync + 'static,
835    {
836        self.via(Flow::identity().stateful_map(seed, f))
837    }
838
839    #[must_use]
840    pub fn stateful_map_result<State, Next, F>(self, seed: State, f: F) -> Source<Next, Mat>
841    where
842        State: Clone + Send + Sync + 'static,
843        Next: Send + 'static,
844        F: Fn(&mut State, Out) -> StreamResult<Next> + Send + Sync + 'static,
845    {
846        self.via(Flow::identity().stateful_map_result(seed, f))
847    }
848
849    #[must_use]
850    pub fn stateful_map_result_with_supervision<State, Next, F>(
851        self,
852        seed: State,
853        f: F,
854        decider: SupervisionDecider,
855    ) -> Source<Next, Mat>
856    where
857        State: Clone + Send + Sync + 'static,
858        Next: Send + 'static,
859        F: Fn(&mut State, Out) -> StreamResult<Next> + Send + Sync + 'static,
860    {
861        self.via(Flow::identity().stateful_map_result_with_supervision(seed, f, decider))
862    }
863
864    #[must_use]
865    pub fn stateful_map_concat<State, Next, F, I>(self, seed: State, f: F) -> Source<Next, Mat>
866    where
867        State: Clone + Send + Sync + 'static,
868        Next: Send + 'static,
869        F: Fn(&mut State, Out) -> I + Send + Sync + 'static,
870        I: IntoIterator<Item = Next>,
871        I::IntoIter: Send + 'static,
872    {
873        self.via(Flow::identity().stateful_map_concat(seed, f))
874    }
875
876    #[must_use]
877    pub fn stateful_map_concat_result<State, Next, F, I>(
878        self,
879        seed: State,
880        f: F,
881    ) -> Source<Next, Mat>
882    where
883        State: Clone + Send + Sync + 'static,
884        Next: Send + 'static,
885        F: Fn(&mut State, Out) -> StreamResult<I> + Send + Sync + 'static,
886        I: IntoIterator<Item = Next>,
887        I::IntoIter: Send + 'static,
888    {
889        self.via(Flow::identity().stateful_map_concat_result(seed, f))
890    }
891
892    #[must_use]
893    pub fn stateful_map_concat_result_with_supervision<State, Next, F, I>(
894        self,
895        seed: State,
896        f: F,
897        decider: SupervisionDecider,
898    ) -> Source<Next, Mat>
899    where
900        State: Clone + Send + Sync + 'static,
901        Next: Send + 'static,
902        F: Fn(&mut State, Out) -> StreamResult<I> + Send + Sync + 'static,
903        I: IntoIterator<Item = Next>,
904        I::IntoIter: Send + 'static,
905    {
906        self.via(Flow::identity().stateful_map_concat_result_with_supervision(seed, f, decider))
907    }
908
909    #[must_use]
910    pub fn map_async<Next, F, Fut>(self, parallelism: usize, f: F) -> Source<Next, Mat>
911    where
912        Next: Send + 'static,
913        F: Fn(Out) -> Fut + Send + Sync + 'static,
914        Fut: Future<Output = StreamResult<Next>> + Send + 'static,
915    {
916        self.via(Flow::identity().map_async(parallelism, f))
917    }
918
919    #[must_use]
920    pub fn map_async_with_supervision<Next, F, Fut>(
921        self,
922        parallelism: usize,
923        f: F,
924        decider: SupervisionDecider,
925    ) -> Source<Next, Mat>
926    where
927        Next: Send + 'static,
928        F: Fn(Out) -> Fut + Send + Sync + 'static,
929        Fut: Future<Output = StreamResult<Next>> + Send + 'static,
930    {
931        self.via(Flow::identity().map_async_with_supervision(parallelism, f, decider))
932    }
933
934    #[must_use]
935    pub fn map_async_unordered<Next, F, Fut>(self, parallelism: usize, f: F) -> Source<Next, Mat>
936    where
937        Next: Send + 'static,
938        F: Fn(Out) -> Fut + Send + Sync + 'static,
939        Fut: Future<Output = StreamResult<Next>> + Send + 'static,
940    {
941        self.via(Flow::identity().map_async_unordered(parallelism, f))
942    }
943
944    #[must_use]
945    pub fn map_async_unordered_with_supervision<Next, F, Fut>(
946        self,
947        parallelism: usize,
948        f: F,
949        decider: SupervisionDecider,
950    ) -> Source<Next, Mat>
951    where
952        Next: Send + 'static,
953        F: Fn(Out) -> Fut + Send + Sync + 'static,
954        Fut: Future<Output = StreamResult<Next>> + Send + 'static,
955    {
956        self.via(Flow::identity().map_async_unordered_with_supervision(parallelism, f, decider))
957    }
958
959    #[must_use]
960    pub fn map_async_partitioned<Key, Next, Partition, F, Fut>(
961        self,
962        parallelism: usize,
963        per_partition: usize,
964        partition: Partition,
965        f: F,
966    ) -> Source<Next, Mat>
967    where
968        Key: Clone + Eq + Hash + Send + 'static,
969        Next: Send + 'static,
970        Partition: Fn(&Out) -> Key + Send + Sync + 'static,
971        F: Fn(Out) -> Fut + Send + Sync + 'static,
972        Fut: Future<Output = StreamResult<Next>> + Send + 'static,
973    {
974        self.via(Flow::identity().map_async_partitioned(parallelism, per_partition, partition, f))
975    }
976
977    #[must_use]
978    pub fn prefix_and_tail(self, n: usize) -> Source<(Vec<Out>, Source<Out>), Mat> {
979        self.via(Flow::identity().prefix_and_tail(n))
980    }
981
982    #[must_use]
983    pub fn flat_map_prefix<Next, FlowMat, F>(self, n: usize, f: F) -> Source<Next, Mat>
984    where
985        Next: Send + 'static,
986        FlowMat: Send + 'static,
987        F: Fn(Vec<Out>) -> Flow<Out, Next, FlowMat> + Send + Sync + 'static,
988        Out: Clone,
989    {
990        self.via(Flow::identity().flat_map_prefix(n, f))
991    }
992
993    #[must_use]
994    pub fn group_by<Key, F>(
995        self,
996        max_substreams: usize,
997        f: F,
998        allow_closed_substream_recreation: bool,
999    ) -> Source<Source<Out>, Mat>
1000    where
1001        Key: Clone + Eq + Hash + Send + 'static,
1002        F: Fn(&Out) -> Key + Send + Sync + 'static,
1003        Out: Clone,
1004    {
1005        let batch_mode = if self.hints.inline_micro.is_some() && !allow_closed_substream_recreation
1006        {
1007            flow::GroupByBatchMode::FiniteEagerNoRecreate
1008        } else {
1009            flow::GroupByBatchMode::Immediate
1010        };
1011        self.via(Flow::identity().group_by_with_batching(
1012            max_substreams,
1013            f,
1014            allow_closed_substream_recreation,
1015            batch_mode,
1016        ))
1017    }
1018
1019    #[must_use]
1020    pub fn split_when<F>(self, predicate: F) -> Source<Source<Out>, Mat>
1021    where
1022        F: Fn(&Out) -> bool + Send + Sync + 'static,
1023        Out: Clone,
1024    {
1025        self.via(Flow::identity().split_when(predicate))
1026    }
1027
1028    #[must_use]
1029    pub fn split_after<F>(self, predicate: F) -> Source<Source<Out>, Mat>
1030    where
1031        F: Fn(&Out) -> bool + Send + Sync + 'static,
1032        Out: Clone,
1033    {
1034        self.via(Flow::identity().split_after(predicate))
1035    }
1036
1037    #[must_use]
1038    pub fn flat_map_concat<Next, NextMat, F>(self, f: F) -> Source<Next, Mat>
1039    where
1040        Next: Send + 'static,
1041        NextMat: Send + 'static,
1042        F: Fn(Out) -> Source<Next, NextMat> + Send + Sync + 'static,
1043    {
1044        self.via(Flow::identity().flat_map_concat(f))
1045    }
1046
1047    #[must_use]
1048    pub fn flat_map_merge<Next, NextMat, F>(self, breadth: usize, f: F) -> Source<Next, Mat>
1049    where
1050        Next: Send + 'static,
1051        NextMat: Send + 'static,
1052        F: Fn(Out) -> Source<Next, NextMat> + Send + Sync + 'static,
1053    {
1054        self.via(Flow::identity().flat_map_merge(breadth, f))
1055    }
1056
1057    #[must_use]
1058    pub fn take(self, n: usize) -> Source<Out, Mat> {
1059        self.via(Flow::identity().take(n))
1060    }
1061
1062    #[must_use]
1063    pub fn drop(self, n: usize) -> Source<Out, Mat> {
1064        self.via(Flow::identity().drop(n))
1065    }
1066
1067    #[must_use]
1068    pub fn take_while<F>(self, predicate: F) -> Source<Out, Mat>
1069    where
1070        F: Fn(&Out) -> bool + Send + Sync + 'static,
1071    {
1072        self.via(Flow::identity().take_while(predicate))
1073    }
1074
1075    #[must_use]
1076    pub fn drop_while<F>(self, predicate: F) -> Source<Out, Mat>
1077    where
1078        F: Fn(&Out) -> bool + Send + Sync + 'static,
1079    {
1080        self.via(Flow::identity().drop_while(predicate))
1081    }
1082
1083    #[must_use]
1084    pub fn limit(self, max: u64) -> Source<Out, Mat> {
1085        self.via(Flow::identity().limit(max))
1086    }
1087
1088    #[must_use]
1089    pub fn grouped(self, size: usize) -> Source<Vec<Out>, Mat> {
1090        self.via(Flow::identity().grouped(size))
1091    }
1092
1093    #[must_use]
1094    pub fn scan<State, F>(self, seed: State, f: F) -> Source<State, Mat>
1095    where
1096        State: Clone + Send + Sync + 'static,
1097        F: Fn(State, Out) -> State + Send + Sync + 'static,
1098    {
1099        self.via(Flow::identity().scan(seed, f))
1100    }
1101
1102    #[must_use]
1103    pub fn scan_async<State, F, Fut>(self, seed: State, f: F) -> Source<State, Mat>
1104    where
1105        State: Clone + Send + Sync + 'static,
1106        F: Fn(State, Out) -> Fut + Send + Sync + 'static,
1107        Fut: Future<Output = StreamResult<State>> + Send + 'static,
1108    {
1109        self.via(Flow::identity().scan_async(seed, f))
1110    }
1111
1112    #[must_use]
1113    pub fn scan_result<State, F>(self, seed: State, f: F) -> Source<State, Mat>
1114    where
1115        State: Clone + Send + Sync + 'static,
1116        F: Fn(State, Out) -> StreamResult<State> + Send + Sync + 'static,
1117    {
1118        self.via(Flow::identity().scan_result(seed, f))
1119    }
1120
1121    #[must_use]
1122    pub fn scan_result_with_supervision<State, F>(
1123        self,
1124        seed: State,
1125        f: F,
1126        decider: SupervisionDecider,
1127    ) -> Source<State, Mat>
1128    where
1129        State: Clone + Send + Sync + 'static,
1130        F: Fn(State, Out) -> StreamResult<State> + Send + Sync + 'static,
1131    {
1132        self.via(Flow::identity().scan_result_with_supervision(seed, f, decider))
1133    }
1134
1135    #[must_use]
1136    pub fn sliding(self, size: usize, step: usize) -> Source<Vec<Out>, Mat>
1137    where
1138        Out: Clone,
1139    {
1140        self.via(Flow::identity().sliding(size, step))
1141    }
1142
1143    #[must_use]
1144    pub fn fold<Acc, F>(self, zero: Acc, f: F) -> Source<Acc, Mat>
1145    where
1146        Acc: Clone + Send + Sync + 'static,
1147        F: Fn(Acc, Out) -> Acc + Send + Sync + 'static,
1148    {
1149        self.via(Flow::identity().fold(zero, f))
1150    }
1151
1152    #[must_use]
1153    pub fn fold_async<Acc, F, Fut>(self, zero: Acc, f: F) -> Source<Acc, Mat>
1154    where
1155        Acc: Clone + Send + Sync + 'static,
1156        F: Fn(Acc, Out) -> Fut + Send + Sync + 'static,
1157        Fut: Future<Output = StreamResult<Acc>> + Send + 'static,
1158    {
1159        self.via(Flow::identity().fold_async(zero, f))
1160    }
1161
1162    #[must_use]
1163    pub fn map_with_resource<Resource, Next, Create, F, Close>(
1164        self,
1165        create: Create,
1166        f: F,
1167        close: Close,
1168    ) -> Source<Next, Mat>
1169    where
1170        Resource: Send + 'static,
1171        Next: Send + 'static,
1172        Create: Fn() -> StreamResult<Resource> + Send + Sync + 'static,
1173        F: Fn(&mut Resource, Out) -> StreamResult<Next> + Send + Sync + 'static,
1174        Close: Fn(Resource) -> StreamResult<Option<Next>> + Send + Sync + 'static,
1175    {
1176        self.via(Flow::identity().map_with_resource(create, f, close))
1177    }
1178
1179    #[must_use]
1180    pub fn fold_result<Acc, F>(self, zero: Acc, f: F) -> Source<Acc, Mat>
1181    where
1182        Acc: Clone + Send + Sync + 'static,
1183        F: Fn(Acc, Out) -> StreamResult<Acc> + Send + Sync + 'static,
1184    {
1185        self.via(Flow::identity().fold_result(zero, f))
1186    }
1187
1188    #[must_use]
1189    pub fn fold_result_with_supervision<Acc, F>(
1190        self,
1191        zero: Acc,
1192        f: F,
1193        decider: SupervisionDecider,
1194    ) -> Source<Acc, Mat>
1195    where
1196        Acc: Clone + Send + Sync + 'static,
1197        F: Fn(Acc, Out) -> StreamResult<Acc> + Send + Sync + 'static,
1198    {
1199        self.via(Flow::identity().fold_result_with_supervision(zero, f, decider))
1200    }
1201
1202    #[must_use]
1203    pub fn reduce<F>(self, f: F) -> Source<Out, Mat>
1204    where
1205        F: Fn(Out, Out) -> Out + Send + Sync + 'static,
1206    {
1207        self.via(Flow::identity().reduce(f))
1208    }
1209
1210    #[must_use]
1211    pub fn reduce_result<F>(self, f: F) -> Source<Out, Mat>
1212    where
1213        Out: Clone,
1214        F: Fn(Out, Out) -> StreamResult<Out> + Send + Sync + 'static,
1215    {
1216        self.via(Flow::identity().reduce_result(f))
1217    }
1218
1219    #[must_use]
1220    pub fn reduce_result_with_supervision<F>(
1221        self,
1222        f: F,
1223        decider: SupervisionDecider,
1224    ) -> Source<Out, Mat>
1225    where
1226        Out: Clone,
1227        F: Fn(Out, Out) -> StreamResult<Out> + Send + Sync + 'static,
1228    {
1229        self.via(Flow::identity().reduce_result_with_supervision(f, decider))
1230    }
1231
1232    #[must_use]
1233    pub fn map_error<F>(self, f: F) -> Source<Out, Mat>
1234    where
1235        F: Fn(StreamError) -> StreamError + Send + Sync + 'static,
1236    {
1237        self.via(Flow::identity().map_error(f))
1238    }
1239
1240    #[must_use]
1241    pub fn recover<F>(self, f: F) -> Source<Out, Mat>
1242    where
1243        F: Fn(StreamError) -> Option<Out> + Send + Sync + 'static,
1244    {
1245        self.via(Flow::identity().recover(f))
1246    }
1247
1248    #[must_use]
1249    pub fn recover_with<F>(self, f: F) -> Source<Out, Mat>
1250    where
1251        F: Fn(StreamError) -> Option<Source<Out>> + Send + Sync + 'static,
1252    {
1253        self.via(Flow::identity().recover_with(f))
1254    }
1255
1256    #[must_use]
1257    pub fn recover_with_retries<F>(self, retries: usize, f: F) -> Source<Out, Mat>
1258    where
1259        F: Fn(StreamError) -> Option<Source<Out>> + Send + Sync + 'static,
1260    {
1261        self.via(Flow::identity().recover_with_retries(retries, f))
1262    }
1263
1264    #[must_use]
1265    pub fn on_error_complete(self) -> Source<Out, Mat> {
1266        self.via(Flow::identity().on_error_complete())
1267    }
1268
1269    #[must_use]
1270    pub fn concat<Mat2>(self, that: Source<Out, Mat2>) -> Source<Out, Mat>
1271    where
1272        Mat2: Send + 'static,
1273    {
1274        let factory = self.factory;
1275        let hints = self.hints;
1276        let that_factory = that.factory;
1277        Source::from_materialized_factory_with_hints(
1278            move |materializer| {
1279                let (primary, mat) = Arc::clone(&factory).create(materializer)?;
1280                let secondary = match Arc::clone(&that_factory).create(materializer) {
1281                    Ok((stream, _)) => stream,
1282                    Err(error) => Box::new(std::iter::once(Err(error))) as BoxStream<Out>,
1283                };
1284                Ok((concat_source_streams(vec![primary, secondary]), mat))
1285            },
1286            hints,
1287        )
1288    }
1289
1290    #[must_use]
1291    pub fn concat_lazy<Mat2>(self, that: Source<Out, Mat2>) -> Source<Out, Mat>
1292    where
1293        Mat2: Send + 'static,
1294    {
1295        let factory = self.factory;
1296        let hints = self.hints;
1297        let that_factory = that.factory;
1298        Source::from_materialized_factory_with_hints(
1299            move |materializer| {
1300                let (primary, mat) = Arc::clone(&factory).create(materializer)?;
1301                Ok((
1302                    concat_source_streams_lazy(
1303                        primary,
1304                        vec![Arc::clone(&that_factory)],
1305                        materializer,
1306                    ),
1307                    mat,
1308                ))
1309            },
1310            hints,
1311        )
1312    }
1313
1314    #[must_use]
1315    pub fn concat_all_lazy<Mat2, I>(self, those: I) -> Source<Out, Mat>
1316    where
1317        Mat2: Send + 'static,
1318        I: IntoIterator<Item = Source<Out, Mat2>>,
1319    {
1320        let factory = self.factory;
1321        let hints = self.hints;
1322        let other_factories: Vec<_> = those.into_iter().map(|source| source.factory).collect();
1323        Source::from_materialized_factory_with_hints(
1324            move |materializer| {
1325                let (primary, mat) = Arc::clone(&factory).create(materializer)?;
1326                Ok((
1327                    concat_source_streams_lazy(primary, other_factories.clone(), materializer),
1328                    mat,
1329                ))
1330            },
1331            hints,
1332        )
1333    }
1334
1335    #[must_use]
1336    pub fn prepend<Mat2>(self, that: Source<Out, Mat2>) -> Source<Out, Mat>
1337    where
1338        Mat2: Send + 'static,
1339    {
1340        let factory = self.factory;
1341        let hints = self.hints;
1342        let that_factory = that.factory;
1343        Source::from_materialized_factory_with_hints(
1344            move |materializer| {
1345                let (primary, mat) = Arc::clone(&factory).create(materializer)?;
1346                let secondary = match Arc::clone(&that_factory).create(materializer) {
1347                    Ok((stream, _)) => stream,
1348                    Err(error) => Box::new(std::iter::once(Err(error))) as BoxStream<Out>,
1349                };
1350                Ok((concat_source_streams(vec![secondary, primary]), mat))
1351            },
1352            hints,
1353        )
1354    }
1355
1356    #[must_use]
1357    pub fn prepend_lazy<Mat2>(self, that: Source<Out, Mat2>) -> Source<Out, Mat>
1358    where
1359        Mat2: Send + 'static,
1360    {
1361        self.prepend(that)
1362    }
1363
1364    #[must_use]
1365    pub fn or_else<Mat2>(self, secondary: Source<Out, Mat2>) -> Source<Out, Mat>
1366    where
1367        Mat2: Send + 'static,
1368    {
1369        let factory = self.factory;
1370        let hints = self.hints;
1371        let secondary_factory = secondary.factory;
1372        Source::from_materialized_factory_with_hints(
1373            move |materializer| {
1374                let (primary, mat) = Arc::clone(&factory).create(materializer)?;
1375                let secondary = match Arc::clone(&secondary_factory).create(materializer) {
1376                    Ok((stream, _)) => stream,
1377                    Err(error) => Box::new(std::iter::once(Err(error))) as BoxStream<Out>,
1378                };
1379                Ok((or_else_source_stream(primary, secondary), mat))
1380            },
1381            hints,
1382        )
1383    }
1384
1385    #[must_use]
1386    pub fn interleave<Mat2>(self, that: Source<Out, Mat2>, segment_size: usize) -> Source<Out, Mat>
1387    where
1388        Mat2: Send + 'static,
1389    {
1390        self.interleave_all([that], segment_size, false)
1391    }
1392
1393    #[must_use]
1394    pub fn interleave_all<Mat2, I>(
1395        self,
1396        those: I,
1397        segment_size: usize,
1398        eager_close: bool,
1399    ) -> Source<Out, Mat>
1400    where
1401        Mat2: Send + 'static,
1402        I: IntoIterator<Item = Source<Out, Mat2>>,
1403    {
1404        let factory = self.factory;
1405        let hints = self.hints;
1406        let other_factories: Vec<_> = those.into_iter().map(|source| source.factory).collect();
1407        Source::from_materialized_factory_with_hints(
1408            move |materializer| {
1409                let (primary, mat) = Arc::clone(&factory).create(materializer)?;
1410                let mut streams = Vec::with_capacity(other_factories.len() + 1);
1411                streams.push(primary);
1412                for other in &other_factories {
1413                    let stream = match Arc::clone(other).create(materializer) {
1414                        Ok((stream, _)) => stream,
1415                        Err(error) => Box::new(std::iter::once(Err(error))) as BoxStream<Out>,
1416                    };
1417                    streams.push(stream);
1418                }
1419                Ok((
1420                    interleave_source_streams(streams, segment_size, eager_close),
1421                    mat,
1422                ))
1423            },
1424            hints,
1425        )
1426    }
1427
1428    #[must_use]
1429    pub fn merge_sorted<Mat2>(self, that: Source<Out, Mat2>) -> Source<Out, Mat>
1430    where
1431        Out: Ord,
1432        Mat2: Send + 'static,
1433    {
1434        self.via(Flow::identity().merge_sorted(that))
1435    }
1436
1437    #[must_use]
1438    pub fn merge_latest<Mat2>(
1439        self,
1440        that: Source<Out, Mat2>,
1441        eager_complete: bool,
1442    ) -> Source<Vec<Out>, Mat>
1443    where
1444        Out: Clone,
1445        Mat2: Send + 'static,
1446    {
1447        let factory = self.factory;
1448        let hints = self.hints;
1449        let that_factory = that.factory;
1450        Source::from_materialized_factory_with_hints(
1451            move |materializer| {
1452                let (left, mat) = Arc::clone(&factory).create(materializer)?;
1453                let right = match Arc::clone(&that_factory).create(materializer) {
1454                    Ok((stream, _)) => stream,
1455                    Err(error) => {
1456                        return Ok((
1457                            Box::new(std::iter::once(Err(error))) as BoxStream<Vec<Out>>,
1458                            mat,
1459                        ));
1460                    }
1461                };
1462                Ok((merge_latest_streams(vec![left, right], eager_complete), mat))
1463            },
1464            hints,
1465        )
1466    }
1467
1468    #[must_use]
1469    pub fn merge_all<Mat2, I>(self, those: I, eager_complete: bool) -> Source<Out, Mat>
1470    where
1471        Mat2: Send + 'static,
1472        I: IntoIterator<Item = Source<Out, Mat2>>,
1473    {
1474        let factory = self.factory;
1475        let hints = self.hints;
1476        let other_factories: Vec<_> = those.into_iter().map(|source| source.factory).collect();
1477        Source::from_materialized_factory_with_hints(
1478            move |materializer| {
1479                let (primary, mat) = Arc::clone(&factory).create(materializer)?;
1480                let mut streams = Vec::with_capacity(other_factories.len() + 1);
1481                streams.push(primary);
1482                for other in &other_factories {
1483                    let stream = match Arc::clone(other).create(materializer) {
1484                        Ok((stream, _)) => stream,
1485                        Err(error) => {
1486                            return Ok((
1487                                Box::new(std::iter::once(Err(error))) as BoxStream<Out>,
1488                                mat,
1489                            ));
1490                        }
1491                    };
1492                    streams.push(stream);
1493                }
1494                Ok((merge_streams(streams, eager_complete), mat))
1495            },
1496            hints,
1497        )
1498    }
1499
1500    #[must_use]
1501    pub fn zip_with<Mat2, Out2, Next, F>(
1502        self,
1503        that: Source<Out2, Mat2>,
1504        combine: F,
1505    ) -> Source<Next, Mat>
1506    where
1507        Out2: Send + 'static,
1508        Next: Send + 'static,
1509        Mat2: Send + 'static,
1510        F: Fn(Out, Out2) -> Next + Send + Sync + 'static,
1511    {
1512        self.via(Flow::identity().zip_with(that, combine))
1513    }
1514
1515    #[must_use]
1516    pub fn zip_latest<Mat2, Out2>(self, that: Source<Out2, Mat2>) -> Source<(Out, Out2), Mat>
1517    where
1518        Out: Clone,
1519        Out2: Clone + Send + 'static,
1520        Mat2: Send + 'static,
1521    {
1522        self.zip_latest_with(that, true, |left, right| (left, right))
1523    }
1524
1525    #[must_use]
1526    pub fn zip_latest_with<Mat2, Out2, Next, F>(
1527        self,
1528        that: Source<Out2, Mat2>,
1529        eager_complete: bool,
1530        combine: F,
1531    ) -> Source<Next, Mat>
1532    where
1533        Out: Clone,
1534        Out2: Clone + Send + 'static,
1535        Next: Send + 'static,
1536        Mat2: Send + 'static,
1537        F: Fn(Out, Out2) -> Next + Send + Sync + 'static,
1538    {
1539        let factory = self.factory;
1540        let hints = self.hints;
1541        let that_factory = that.factory;
1542        let combine = Arc::new(combine);
1543        Source::from_materialized_factory_with_hints(
1544            move |materializer| {
1545                let (left, mat) = Arc::clone(&factory).create(materializer)?;
1546                let right = match Arc::clone(&that_factory).create(materializer) {
1547                    Ok((stream, _)) => stream,
1548                    Err(error) => {
1549                        return Ok((
1550                            Box::new(std::iter::once(Err(error))) as BoxStream<Next>,
1551                            mat,
1552                        ));
1553                    }
1554                };
1555                Ok((
1556                    zip_latest_with_stream(left, right, eager_complete, Arc::clone(&combine)),
1557                    mat,
1558                ))
1559            },
1560            hints,
1561        )
1562    }
1563
1564    #[must_use]
1565    pub fn zip_with_index(self) -> Source<(Out, u64), Mat> {
1566        let factory = self.factory;
1567        let hints = self.hints;
1568        Source::from_materialized_factory_with_hints(
1569            move |materializer| {
1570                let (mut stream, mat) = Arc::clone(&factory).create(materializer)?;
1571                let mut index = 0_u64;
1572                Ok((
1573                    Box::new(std::iter::from_fn(move || {
1574                        stream.next().map(|item| {
1575                            item.map(|value| {
1576                                let pair = (value, index);
1577                                index = index.wrapping_add(1);
1578                                pair
1579                            })
1580                        })
1581                    })) as BoxStream<(Out, u64)>,
1582                    mat,
1583                ))
1584            },
1585            hints,
1586        )
1587    }
1588
1589    #[must_use]
1590    pub fn zip_all<Mat2, Out2>(
1591        self,
1592        that: Source<Out2, Mat2>,
1593        this_elem: Out,
1594        that_elem: Out2,
1595    ) -> Source<(Out, Out2), Mat>
1596    where
1597        Out: Clone + Sync,
1598        Out2: Clone + Send + Sync + 'static,
1599        Mat2: Send + 'static,
1600    {
1601        let factory = self.factory;
1602        let hints = self.hints;
1603        let that_factory = that.factory;
1604        Source::from_materialized_factory_with_hints(
1605            move |materializer| {
1606                let (left, mat) = Arc::clone(&factory).create(materializer)?;
1607                let right = match Arc::clone(&that_factory).create(materializer) {
1608                    Ok((stream, _)) => stream,
1609                    Err(error) => {
1610                        return Ok((
1611                            Box::new(std::iter::once(Err(error))) as BoxStream<(Out, Out2)>,
1612                            mat,
1613                        ));
1614                    }
1615                };
1616                Ok((
1617                    zip_all_stream(left, right, this_elem.clone(), that_elem.clone()),
1618                    mat,
1619                ))
1620            },
1621            hints,
1622        )
1623    }
1624
1625    #[must_use]
1626    pub fn also_to<SinkMat>(self, sink: Sink<Out, SinkMat>) -> Source<Out, Mat>
1627    where
1628        Out: Clone,
1629        SinkMat: Send + 'static,
1630    {
1631        self.via(Flow::identity().also_to(sink))
1632    }
1633
1634    #[must_use]
1635    pub fn also_to_all<SinkMat, I>(self, sinks: I) -> Source<Out, Mat>
1636    where
1637        Out: Clone,
1638        SinkMat: Send + 'static,
1639        I: IntoIterator<Item = Sink<Out, SinkMat>>,
1640    {
1641        self.via(Flow::identity().also_to_all(sinks))
1642    }
1643
1644    #[must_use]
1645    pub fn divert_to<SinkMat, F>(self, sink: Sink<Out, SinkMat>, predicate: F) -> Source<Out, Mat>
1646    where
1647        SinkMat: Send + 'static,
1648        F: Fn(&Out) -> bool + Send + Sync + 'static,
1649    {
1650        self.via(Flow::identity().divert_to(sink, predicate))
1651    }
1652
1653    #[must_use]
1654    pub fn wire_tap<SinkMat>(self, sink: Sink<Out, SinkMat>) -> Source<Out, Mat>
1655    where
1656        Out: Clone,
1657        SinkMat: Send + 'static,
1658    {
1659        self.via(Flow::identity().wire_tap(sink))
1660    }
1661
1662    pub fn run_with<SinkMat: Send + 'static>(
1663        self,
1664        sink: Sink<Out, SinkMat>,
1665    ) -> StreamResult<SinkMat> {
1666        // Split-segment fast path: if source has a segment hook and sink has a
1667        // fold fast-path descriptor, drive the fold inline without spawning a task.
1668        let fast_result = self
1669            .split_hook
1670            .as_ref()
1671            .zip(sink.fold_fp.as_deref())
1672            .and_then(|(hook, fp)| fp.try_register(Arc::clone(hook)));
1673        if let Some(result) = fast_result {
1674            return result?.downcast::<SinkMat>().map(|b| *b).map_err(|_| {
1675                StreamError::Failed("split fast path: unexpected mat type (internal error)".into())
1676            });
1677        }
1678        self.to_mat(sink, Keep::right).run()
1679    }
1680
1681    pub fn run_with_materializer<SinkMat: Send + 'static>(
1682        self,
1683        sink: Sink<Out, SinkMat>,
1684        materializer: &Materializer,
1685    ) -> StreamResult<SinkMat> {
1686        self.to_mat(sink, Keep::right)
1687            .run_with_materializer(materializer)
1688    }
1689
1690    #[must_use]
1691    pub fn to<SinkMat>(self, sink: Sink<Out, SinkMat>) -> RunnableGraph<Mat>
1692    where
1693        SinkMat: Send + 'static,
1694    {
1695        self.to_mat(sink, Keep::left)
1696    }
1697
1698    #[must_use]
1699    pub fn to_mat<SinkMat, Combined, F>(
1700        self,
1701        sink: Sink<Out, SinkMat>,
1702        combine: F,
1703    ) -> RunnableGraph<Combined>
1704    where
1705        SinkMat: Send + 'static,
1706        Combined: Send + 'static,
1707        F: Fn(Mat, SinkMat) -> Combined + Send + Sync + 'static,
1708    {
1709        let factory = self.factory;
1710        let hints = self.hints;
1711        let combine = Arc::new(combine);
1712        RunnableGraph::from_runner(move |materializer| {
1713            let (stream, source_mat) = Arc::clone(&factory).create(materializer)?;
1714            let sink_mat = if hints.inline_head_terminal && sink.can_inline() {
1715                let stream =
1716                    runtime_checked_stream(stream, Arc::clone(&materializer.inner.state), None);
1717                sink.run_inline(stream, materializer)?
1718            } else {
1719                sink.run_from_source(stream, materializer, hints.runtime())?
1720            };
1721            Ok(combine(source_mat, sink_mat))
1722        })
1723    }
1724
1725    pub fn run_collect(self) -> StreamResult<Vec<Out>> {
1726        self.run_with(Sink::collect())?.wait()
1727    }
1728
1729    #[must_use]
1730    pub fn map_materialized_value<NextMat, F>(self, f: F) -> Source<Out, NextMat>
1731    where
1732        NextMat: Send + 'static,
1733        F: Fn(Mat) -> NextMat + Send + Sync + 'static,
1734    {
1735        let factory = self.factory;
1736        let hints = self.hints;
1737        let f = Arc::new(f);
1738        Source::from_materialized_factory_with_hints(
1739            move |materializer| {
1740                let (stream, mat) = Arc::clone(&factory).create(materializer)?;
1741                Ok((stream, f(mat)))
1742            },
1743            hints,
1744        )
1745    }
1746}
1747
1748impl<Out: Clone + Send + Sync + 'static> Source<Out, NotUsed> {
1749    #[must_use]
1750    pub fn combine<Mat1, Mat2, MatRest, I>(
1751        first: Source<Out, Mat1>,
1752        second: Source<Out, Mat2>,
1753        rest: I,
1754        strategy: SourceCombineStrategy,
1755    ) -> Source<Out, NotUsed>
1756    where
1757        Mat1: Send + 'static,
1758        Mat2: Send + 'static,
1759        MatRest: Send + 'static,
1760        I: IntoIterator<Item = Source<Out, MatRest>>,
1761    {
1762        let mut factories: Vec<Arc<CombinedSourceFactory<Out>>> = vec![
1763            Arc::new(move |materializer| {
1764                Arc::clone(&first.factory)
1765                    .create(materializer)
1766                    .map(|(stream, _)| stream)
1767            }),
1768            Arc::new(move |materializer| {
1769                Arc::clone(&second.factory)
1770                    .create(materializer)
1771                    .map(|(stream, _)| stream)
1772            }),
1773        ];
1774        factories.extend(rest.into_iter().map(|source| {
1775            Arc::new(move |materializer: &Materializer| {
1776                Arc::clone(&source.factory)
1777                    .create(materializer)
1778                    .map(|(stream, _)| stream)
1779            }) as Arc<CombinedSourceFactory<Out>>
1780        }));
1781        Source::from_materialized_factory(move |materializer| {
1782            let mut streams = Vec::with_capacity(factories.len());
1783            for factory in &factories {
1784                let stream = match factory(materializer) {
1785                    Ok(stream) => stream,
1786                    Err(error) => {
1787                        return Ok((
1788                            Box::new(std::iter::once(Err(error))) as BoxStream<Out>,
1789                            NotUsed,
1790                        ));
1791                    }
1792                };
1793                streams.push(stream);
1794            }
1795            let stream = match &strategy {
1796                SourceCombineStrategy::Merge { eager_complete } => {
1797                    merge_streams(streams, *eager_complete)
1798                }
1799                SourceCombineStrategy::Concat => concat_source_streams(streams),
1800                SourceCombineStrategy::Prioritized {
1801                    priorities,
1802                    eager_complete,
1803                } => {
1804                    if priorities.len() != streams.len() {
1805                        return Err(StreamError::GraphValidation(format!(
1806                            "combine priorities length {} did not match source count {}",
1807                            priorities.len(),
1808                            streams.len()
1809                        )));
1810                    }
1811                    merge_prioritized_streams(streams, priorities.clone(), *eager_complete)
1812                }
1813            };
1814            Ok((stream, NotUsed))
1815        })
1816    }
1817
1818    #[must_use]
1819    pub fn zip_n<Mat2, I>(sources: I) -> Source<Vec<Out>, NotUsed>
1820    where
1821        I: IntoIterator<Item = Source<Out, Mat2>>,
1822        Mat2: Send + 'static,
1823        Out: Clone,
1824    {
1825        Self::zip_with_n(sources, |values| values)
1826    }
1827
1828    #[must_use]
1829    pub fn zip_with_n<Mat2, I, Next, F>(sources: I, zipper: F) -> Source<Next, NotUsed>
1830    where
1831        I: IntoIterator<Item = Source<Out, Mat2>>,
1832        Mat2: Send + 'static,
1833        Next: Send + 'static,
1834        F: Fn(Vec<Out>) -> Next + Send + Sync + 'static,
1835    {
1836        let factories: Vec<_> = sources.into_iter().map(|source| source.factory).collect();
1837        let zipper = Arc::new(zipper);
1838        Source::from_materialized_factory(move |materializer| {
1839            let mut streams = Vec::with_capacity(factories.len());
1840            for factory in &factories {
1841                let stream = match Arc::clone(factory).create(materializer) {
1842                    Ok((stream, _)) => stream,
1843                    Err(error) => {
1844                        return Ok((
1845                            Box::new(std::iter::once(Err(error))) as BoxStream<Next>,
1846                            NotUsed,
1847                        ));
1848                    }
1849                };
1850                streams.push(stream);
1851            }
1852            Ok((zip_n_streams(streams, Arc::clone(&zipper)), NotUsed))
1853        })
1854    }
1855
1856    #[must_use]
1857    pub fn merge_prioritized_n<Mat2, I>(
1858        sources_and_priorities: I,
1859        eager_complete: bool,
1860    ) -> Source<Out, NotUsed>
1861    where
1862        I: IntoIterator<Item = (Source<Out, Mat2>, usize)>,
1863        Mat2: Send + 'static,
1864    {
1865        let sources_and_priorities: Vec<_> = sources_and_priorities.into_iter().collect();
1866        if sources_and_priorities.is_empty() {
1867            return Source::empty();
1868        }
1869        let (factories, priorities): (Vec<_>, Vec<_>) = sources_and_priorities
1870            .into_iter()
1871            .map(|(source, priority)| (source.factory, priority))
1872            .unzip();
1873        Source::from_materialized_factory(move |materializer| {
1874            let mut streams = Vec::with_capacity(factories.len());
1875            for factory in &factories {
1876                let stream = match Arc::clone(factory).create(materializer) {
1877                    Ok((stream, _)) => stream,
1878                    Err(error) => {
1879                        return Ok((
1880                            Box::new(std::iter::once(Err(error))) as BoxStream<Out>,
1881                            NotUsed,
1882                        ));
1883                    }
1884                };
1885                streams.push(stream);
1886            }
1887            Ok((
1888                merge_prioritized_streams(streams, priorities.clone(), eager_complete),
1889                NotUsed,
1890            ))
1891        })
1892    }
1893
1894    #[must_use]
1895    pub fn maybe() -> (MaybeHandle<Out>, Self) {
1896        let value = Arc::new(Mutex::new(None));
1897        let handle = MaybeHandle {
1898            value: Arc::clone(&value),
1899        };
1900        let source = Self::from_factory(move || {
1901            let result = value
1902                .lock()
1903                .expect("maybe source poisoned")
1904                .clone()
1905                .unwrap_or(Err(StreamError::MaybeIncomplete));
1906            Box::new(std::iter::once(result))
1907        });
1908        (handle, source)
1909    }
1910
1911    #[must_use]
1912    pub fn single(item: Out) -> Self {
1913        Self::from_factory_with_hints(
1914            move || Box::new(std::iter::once(Ok(item.clone()))),
1915            SourceHints::with_inline_micro(1),
1916        )
1917    }
1918
1919    #[must_use]
1920    pub fn repeat(item: Out) -> Self {
1921        Self::from_factory(move || {
1922            let item = item.clone();
1923            Box::new(std::iter::repeat_with(move || Ok(item.clone())))
1924        })
1925    }
1926
1927    #[must_use]
1928    pub fn from_iterable<I>(items: I) -> Self
1929    where
1930        I: IntoIterator<Item = Out>,
1931    {
1932        items.into_iter().collect()
1933    }
1934}
1935
1936impl<Out: Clone + Send + Sync + 'static> FromIterator<Out> for Source<Out, NotUsed> {
1937    fn from_iter<T: IntoIterator<Item = Out>>(iter: T) -> Self {
1938        // Share the backing storage across materializations and clone elements
1939        // lazily on demand, instead of cloning the whole container each run.
1940        let items: Arc<[Out]> = iter.into_iter().collect();
1941        let len = items.len();
1942        Self::from_factory_with_hints(
1943            move || {
1944                let items = Arc::clone(&items);
1945                let mut index = 0;
1946                Box::new(std::iter::from_fn(move || {
1947                    let item = items.get(index)?.clone();
1948                    index += 1;
1949                    Some(Ok(item))
1950                }))
1951            },
1952            // Exact success item count: enables the coordinator inline drain path.
1953            SourceHints::with_inline_micro(len),
1954        )
1955    }
1956}
1957
1958/// Test-only helper: create a Source whose factory calls the given closure but
1959/// marks it as inline-micro eligible. Used to test the inline drain path with
1960/// custom (possibly blocking) iterators without broadening production
1961/// eligibility.
1962#[cfg(test)]
1963pub(in crate::stream) fn test_source_with_inline_micro_hint<Out: Send + 'static>(
1964    factory: impl Fn() -> BoxStream<Out> + Send + Sync + 'static,
1965    max_success_items: usize,
1966) -> Source<Out, NotUsed> {
1967    Source::from_factory_with_hints(factory, SourceHints::with_inline_micro(max_success_items))
1968}
1969
1970struct UnfoldResourceStream<Resource, Out, Create, Read, Close>
1971where
1972    Create: Fn() -> StreamResult<Resource>,
1973    Read: Fn(&mut Resource) -> StreamResult<Option<Out>>,
1974    Close: Fn(Resource) -> StreamResult<()>,
1975{
1976    create: Arc<Create>,
1977    read: Arc<Read>,
1978    close: Arc<Close>,
1979    resource: Option<Resource>,
1980    created: bool,
1981    terminated: bool,
1982    _marker: PhantomData<fn() -> Out>,
1983}
1984
1985impl<Resource, Out, Create, Read, Close> UnfoldResourceStream<Resource, Out, Create, Read, Close>
1986where
1987    Create: Fn() -> StreamResult<Resource>,
1988    Read: Fn(&mut Resource) -> StreamResult<Option<Out>>,
1989    Close: Fn(Resource) -> StreamResult<()>,
1990{
1991    fn ensure_created(&mut self) -> StreamResult<()> {
1992        if self.created {
1993            return Ok(());
1994        }
1995        self.created = true;
1996        let resource = catch_unwind_failed("unfold_resource create", || (self.create)())
1997            .and_then(|result| result)?;
1998        self.resource = Some(resource);
1999        Ok(())
2000    }
2001
2002    fn close_resource(&mut self) -> StreamResult<()> {
2003        match self.resource.take() {
2004            Some(resource) => {
2005                catch_unwind_failed("unfold_resource close", || (self.close)(resource))
2006                    .and_then(|result| result)
2007            }
2008            None => Ok(()),
2009        }
2010    }
2011}
2012
2013impl<Resource, Out, Create, Read, Close> Iterator
2014    for UnfoldResourceStream<Resource, Out, Create, Read, Close>
2015where
2016    Create: Fn() -> StreamResult<Resource>,
2017    Read: Fn(&mut Resource) -> StreamResult<Option<Out>>,
2018    Close: Fn(Resource) -> StreamResult<()>,
2019{
2020    type Item = StreamResult<Out>;
2021
2022    fn next(&mut self) -> Option<Self::Item> {
2023        if self.terminated {
2024            return None;
2025        }
2026        if let Err(error) = self.ensure_created() {
2027            self.terminated = true;
2028            return Some(Err(error));
2029        }
2030
2031        let result = {
2032            let resource = self
2033                .resource
2034                .as_mut()
2035                .expect("unfold_resource resource is open");
2036            catch_unwind_failed("unfold_resource read", || (self.read)(resource))
2037                .and_then(|result| result)
2038        };
2039
2040        match result {
2041            Ok(Some(item)) => Some(Ok(item)),
2042            Ok(None) => {
2043                self.terminated = true;
2044                match self.close_resource() {
2045                    Ok(()) => None,
2046                    Err(error) => Some(Err(error)),
2047                }
2048            }
2049            Err(read_error) => {
2050                self.terminated = true;
2051                let _ = self.close_resource();
2052                Some(Err(read_error))
2053            }
2054        }
2055    }
2056}
2057
2058impl<Resource, Out, Create, Read, Close> Drop
2059    for UnfoldResourceStream<Resource, Out, Create, Read, Close>
2060where
2061    Create: Fn() -> StreamResult<Resource>,
2062    Read: Fn(&mut Resource) -> StreamResult<Option<Out>>,
2063    Close: Fn(Resource) -> StreamResult<()>,
2064{
2065    fn drop(&mut self) {
2066        let _ = self.close_resource();
2067    }
2068}
2069
2070type UnfoldResourceAsyncMarker<Out, CreateFut, ReadFut, CloseFut> =
2071    fn() -> (Out, CreateFut, ReadFut, CloseFut);
2072
2073struct UnfoldResourceAsyncStream<Resource, Out, Create, CreateFut, Read, ReadFut, Close, CloseFut>
2074where
2075    Resource: Send + 'static,
2076    Create: Fn() -> CreateFut,
2077    CreateFut: Future<Output = StreamResult<Resource>> + Send + 'static,
2078    Read: Fn(&mut Resource) -> ReadFut,
2079    ReadFut: Future<Output = StreamResult<Option<Out>>> + Send + 'static,
2080    Close: Fn(Resource) -> CloseFut,
2081    CloseFut: Future<Output = StreamResult<()>> + Send + 'static,
2082{
2083    create: Arc<Create>,
2084    read: Arc<Read>,
2085    close: Arc<Close>,
2086    resource: Option<Resource>,
2087    created: bool,
2088    terminated: bool,
2089    _marker: PhantomData<UnfoldResourceAsyncMarker<Out, CreateFut, ReadFut, CloseFut>>,
2090}
2091
2092impl<Resource, Out, Create, CreateFut, Read, ReadFut, Close, CloseFut>
2093    UnfoldResourceAsyncStream<Resource, Out, Create, CreateFut, Read, ReadFut, Close, CloseFut>
2094where
2095    Resource: Send + 'static,
2096    Create: Fn() -> CreateFut,
2097    CreateFut: Future<Output = StreamResult<Resource>> + Send + 'static,
2098    Read: Fn(&mut Resource) -> ReadFut,
2099    ReadFut: Future<Output = StreamResult<Option<Out>>> + Send + 'static,
2100    Close: Fn(Resource) -> CloseFut,
2101    CloseFut: Future<Output = StreamResult<()>> + Send + 'static,
2102{
2103    fn ensure_created(&mut self) -> StreamResult<()> {
2104        if self.created {
2105            return Ok(());
2106        }
2107        self.created = true;
2108        let resource = catch_unwind_failed("unfold_resource_async create", || (self.create)())
2109            .and_then(flow::run_future_inline_or_spawn)?;
2110        self.resource = Some(resource);
2111        Ok(())
2112    }
2113
2114    fn close_resource(&mut self) -> StreamResult<()> {
2115        match self.resource.take() {
2116            Some(resource) => {
2117                catch_unwind_failed("unfold_resource_async close", || (self.close)(resource))
2118                    .and_then(flow::run_future_inline_or_spawn)
2119            }
2120            None => Ok(()),
2121        }
2122    }
2123}
2124
2125impl<Resource, Out, Create, CreateFut, Read, ReadFut, Close, CloseFut> Iterator
2126    for UnfoldResourceAsyncStream<Resource, Out, Create, CreateFut, Read, ReadFut, Close, CloseFut>
2127where
2128    Resource: Send + 'static,
2129    Out: Send + 'static,
2130    Create: Fn() -> CreateFut,
2131    CreateFut: Future<Output = StreamResult<Resource>> + Send + 'static,
2132    Read: Fn(&mut Resource) -> ReadFut,
2133    ReadFut: Future<Output = StreamResult<Option<Out>>> + Send + 'static,
2134    Close: Fn(Resource) -> CloseFut,
2135    CloseFut: Future<Output = StreamResult<()>> + Send + 'static,
2136{
2137    type Item = StreamResult<Out>;
2138
2139    fn next(&mut self) -> Option<Self::Item> {
2140        if self.terminated {
2141            return None;
2142        }
2143        if let Err(error) = self.ensure_created() {
2144            self.terminated = true;
2145            return Some(Err(error));
2146        }
2147
2148        let result = {
2149            let resource = self
2150                .resource
2151                .as_mut()
2152                .expect("unfold_resource_async resource is open");
2153            catch_unwind_failed("unfold_resource_async read", || (self.read)(resource))
2154                .and_then(flow::run_future_inline_or_spawn)
2155        };
2156
2157        match result {
2158            Ok(Some(item)) => Some(Ok(item)),
2159            Ok(None) => {
2160                self.terminated = true;
2161                match self.close_resource() {
2162                    Ok(()) => None,
2163                    Err(error) => Some(Err(error)),
2164                }
2165            }
2166            Err(read_error) => {
2167                self.terminated = true;
2168                let _ = self.close_resource();
2169                Some(Err(read_error))
2170            }
2171        }
2172    }
2173}
2174
2175impl<Resource, Out, Create, CreateFut, Read, ReadFut, Close, CloseFut> Drop
2176    for UnfoldResourceAsyncStream<Resource, Out, Create, CreateFut, Read, ReadFut, Close, CloseFut>
2177where
2178    Resource: Send + 'static,
2179    Create: Fn() -> CreateFut,
2180    CreateFut: Future<Output = StreamResult<Resource>> + Send + 'static,
2181    Read: Fn(&mut Resource) -> ReadFut,
2182    ReadFut: Future<Output = StreamResult<Option<Out>>> + Send + 'static,
2183    Close: Fn(Resource) -> CloseFut,
2184    CloseFut: Future<Output = StreamResult<()>> + Send + 'static,
2185{
2186    fn drop(&mut self) {
2187        let _ = self.close_resource();
2188    }
2189}
2190
2191struct LazySourceStream<Out, InnerMat, F> {
2192    create: Arc<F>,
2193    materializer: Materializer,
2194    current: Option<BoxStream<Out>>,
2195    mat_sender: Option<oneshot::Sender<StreamResult<InnerMat>>>,
2196    initialized: bool,
2197    terminated: bool,
2198}
2199
2200impl<Out, InnerMat, F> LazySourceStream<Out, InnerMat, F>
2201where
2202    Out: Send + 'static,
2203    InnerMat: Send + 'static,
2204    F: Fn() -> Source<Out, InnerMat>,
2205{
2206    fn complete_mat(&mut self, result: StreamResult<InnerMat>) {
2207        if let Some(sender) = self.mat_sender.take() {
2208            let _ = sender.send(result);
2209        }
2210    }
2211
2212    fn initialize(&mut self) -> StreamResult<()> {
2213        if self.initialized {
2214            return Ok(());
2215        }
2216        self.initialized = true;
2217        let source = match catch_unwind_failed("lazy_source factory", || (self.create)()) {
2218            Ok(source) => source,
2219            Err(error) => {
2220                self.complete_mat(Err(error.clone()));
2221                return Err(error);
2222            }
2223        };
2224        match Arc::clone(&source.factory).create(&self.materializer) {
2225            Ok((stream, mat)) => {
2226                self.current = Some(stream);
2227                self.complete_mat(Ok(mat));
2228                Ok(())
2229            }
2230            Err(error) => {
2231                self.complete_mat(Err(error.clone()));
2232                Err(error)
2233            }
2234        }
2235    }
2236}
2237
2238impl<Out, InnerMat, F> Iterator for LazySourceStream<Out, InnerMat, F>
2239where
2240    Out: Send + 'static,
2241    InnerMat: Send + 'static,
2242    F: Fn() -> Source<Out, InnerMat>,
2243{
2244    type Item = StreamResult<Out>;
2245
2246    fn next(&mut self) -> Option<Self::Item> {
2247        if self.terminated {
2248            return None;
2249        }
2250        if let Err(error) = self.initialize() {
2251            self.terminated = true;
2252            return Some(Err(error));
2253        }
2254        match self
2255            .current
2256            .as_mut()
2257            .expect("lazy_source current stream initialized")
2258            .next()
2259        {
2260            Some(Ok(item)) => Some(Ok(item)),
2261            Some(Err(error)) => {
2262                self.terminated = true;
2263                Some(Err(error))
2264            }
2265            None => {
2266                self.terminated = true;
2267                None
2268            }
2269        }
2270    }
2271}
2272
2273impl<Out, InnerMat, F> Drop for LazySourceStream<Out, InnerMat, F> {
2274    fn drop(&mut self) {
2275        if !self.initialized
2276            && let Some(sender) = self.mat_sender.take()
2277        {
2278            let _ = sender.send(Err(StreamError::Failed(
2279                "lazy source was never materialized".into(),
2280            )));
2281        }
2282    }
2283}
2284
2285struct LazyFutureSourceStream<Out, InnerMat, F, Fut> {
2286    create: Arc<F>,
2287    materializer: Materializer,
2288    current: Option<BoxStream<Out>>,
2289    mat_sender: Option<oneshot::Sender<StreamResult<InnerMat>>>,
2290    initialized: bool,
2291    terminated: bool,
2292    _marker: PhantomData<fn() -> Fut>,
2293}
2294
2295impl<Out, InnerMat, F, Fut> LazyFutureSourceStream<Out, InnerMat, F, Fut>
2296where
2297    Out: Send + 'static,
2298    InnerMat: Send + 'static,
2299    F: Fn() -> Fut,
2300    Fut: Future<Output = StreamResult<Source<Out, InnerMat>>> + Send + 'static,
2301{
2302    fn complete_mat(&mut self, result: StreamResult<InnerMat>) {
2303        if let Some(sender) = self.mat_sender.take() {
2304            let _ = sender.send(result);
2305        }
2306    }
2307
2308    fn initialize(&mut self) -> StreamResult<()> {
2309        if self.initialized {
2310            return Ok(());
2311        }
2312        self.initialized = true;
2313        let source = match catch_unwind_failed("lazy_future_source factory", || (self.create)())
2314            .and_then(flow::run_future_inline_or_spawn)
2315        {
2316            Ok(source) => source,
2317            Err(error) => {
2318                self.complete_mat(Err(error.clone()));
2319                return Err(error);
2320            }
2321        };
2322        match Arc::clone(&source.factory).create(&self.materializer) {
2323            Ok((stream, mat)) => {
2324                self.current = Some(stream);
2325                self.complete_mat(Ok(mat));
2326                Ok(())
2327            }
2328            Err(error) => {
2329                self.complete_mat(Err(error.clone()));
2330                Err(error)
2331            }
2332        }
2333    }
2334}
2335
2336impl<Out, InnerMat, F, Fut> Iterator for LazyFutureSourceStream<Out, InnerMat, F, Fut>
2337where
2338    Out: Send + 'static,
2339    InnerMat: Send + 'static,
2340    F: Fn() -> Fut,
2341    Fut: Future<Output = StreamResult<Source<Out, InnerMat>>> + Send + 'static,
2342{
2343    type Item = StreamResult<Out>;
2344
2345    fn next(&mut self) -> Option<Self::Item> {
2346        if self.terminated {
2347            return None;
2348        }
2349        if let Err(error) = self.initialize() {
2350            self.terminated = true;
2351            return Some(Err(error));
2352        }
2353        match self
2354            .current
2355            .as_mut()
2356            .expect("lazy_future_source current stream initialized")
2357            .next()
2358        {
2359            Some(Ok(item)) => Some(Ok(item)),
2360            Some(Err(error)) => {
2361                self.terminated = true;
2362                Some(Err(error))
2363            }
2364            None => {
2365                self.terminated = true;
2366                None
2367            }
2368        }
2369    }
2370}
2371
2372impl<Out, InnerMat, F, Fut> Drop for LazyFutureSourceStream<Out, InnerMat, F, Fut> {
2373    fn drop(&mut self) {
2374        if !self.initialized
2375            && let Some(sender) = self.mat_sender.take()
2376        {
2377            let _ = sender.send(Err(StreamError::Failed(
2378                "lazy future source was never materialized".into(),
2379            )));
2380        }
2381    }
2382}
2383
2384fn concat_source_streams<Out>(streams: Vec<BoxStream<Out>>) -> BoxStream<Out>
2385where
2386    Out: Send + 'static,
2387{
2388    let mut streams: VecDeque<_> = streams.into();
2389    let mut current = streams.pop_front();
2390    Box::new(std::iter::from_fn(move || {
2391        loop {
2392            match current.as_mut() {
2393                Some(stream) => match stream.next() {
2394                    Some(item) => return Some(item),
2395                    None => current = streams.pop_front(),
2396                },
2397                None => return None,
2398            }
2399        }
2400    }))
2401}
2402
2403fn concat_source_streams_lazy<Out, Mat>(
2404    initial: BoxStream<Out>,
2405    factories: Vec<Arc<dyn SourceFactory<Out, Mat>>>,
2406    materializer: &Materializer,
2407) -> BoxStream<Out>
2408where
2409    Out: Send + 'static,
2410    Mat: Send + 'static,
2411{
2412    let mut current = Some(initial);
2413    let mut remaining: VecDeque<_> = factories.into();
2414    let materializer = materializer.with_name_prefix(materializer.name_prefix().to_owned());
2415    Box::new(std::iter::from_fn(move || {
2416        loop {
2417            match current.as_mut() {
2418                Some(stream) => match stream.next() {
2419                    Some(item) => return Some(item),
2420                    None => {
2421                        current = remaining.pop_front().map(|factory| {
2422                            match factory.create(&materializer) {
2423                                Ok((stream, _)) => stream,
2424                                Err(error) => {
2425                                    Box::new(std::iter::once(Err(error))) as BoxStream<Out>
2426                                }
2427                            }
2428                        });
2429                    }
2430                },
2431                None => return None,
2432            }
2433        }
2434    }))
2435}
2436
2437fn or_else_source_stream<Out>(
2438    mut primary: BoxStream<Out>,
2439    mut secondary: BoxStream<Out>,
2440) -> BoxStream<Out>
2441where
2442    Out: Send + 'static,
2443{
2444    let mut primary_emitted = false;
2445    let mut using_secondary = false;
2446    Box::new(std::iter::from_fn(move || {
2447        loop {
2448            if using_secondary {
2449                return secondary.next();
2450            }
2451
2452            match primary.next() {
2453                Some(Ok(item)) => {
2454                    primary_emitted = true;
2455                    return Some(Ok(item));
2456                }
2457                Some(Err(error)) => return Some(Err(error)),
2458                None if primary_emitted => return None,
2459                None => using_secondary = true,
2460            }
2461        }
2462    }))
2463}
2464
2465fn interleave_source_streams<Out>(
2466    streams: Vec<BoxStream<Out>>,
2467    segment_size: usize,
2468    eager_close: bool,
2469) -> BoxStream<Out>
2470where
2471    Out: Send + 'static,
2472{
2473    if segment_size == 0 {
2474        return Box::new(std::iter::once(Err(StreamError::GraphValidation(
2475            "interleave segment size must be greater than zero".into(),
2476        ))));
2477    }
2478
2479    let mut streams: Vec<Option<BoxStream<Out>>> = streams.into_iter().map(Some).collect();
2480    let mut pending: Vec<Option<StreamResult<Out>>> = (0..streams.len()).map(|_| None).collect();
2481    let mut current = 0usize;
2482    let mut emitted = 0usize;
2483    Box::new(std::iter::from_fn(move || {
2484        loop {
2485            if streams.iter().all(Option::is_none) {
2486                return None;
2487            }
2488            if streams[current].is_none() {
2489                match next_active_source_stream(&streams, current) {
2490                    Some(next) => {
2491                        current = next;
2492                        emitted = 0;
2493                    }
2494                    None => return None,
2495                }
2496            }
2497
2498            let Some(stream) = streams[current].as_mut() else {
2499                continue;
2500            };
2501            let next_item = pending[current].take().or_else(|| stream.next());
2502            match next_item {
2503                Some(Ok(item)) => {
2504                    emitted += 1;
2505                    if emitted == segment_size {
2506                        emitted = 0;
2507                        if let Some(next) = next_active_source_stream(&streams, current) {
2508                            current = next;
2509                        }
2510                    }
2511                    return Some(Ok(item));
2512                }
2513                Some(Err(error)) => return Some(Err(error)),
2514                None => {
2515                    streams[current] = None;
2516                    emitted = 0;
2517                    if eager_close {
2518                        return None;
2519                    }
2520                    match next_active_source_stream(&streams, current) {
2521                        Some(next) => current = next,
2522                        None => return None,
2523                    }
2524                }
2525            }
2526        }
2527    }))
2528}
2529
2530fn next_active_source_stream<Out>(
2531    streams: &[Option<BoxStream<Out>>],
2532    current: usize,
2533) -> Option<usize>
2534where
2535    Out: Send + 'static,
2536{
2537    if streams.is_empty() {
2538        return None;
2539    }
2540    for offset in 1..=streams.len() {
2541        let index = (current + offset) % streams.len();
2542        if streams[index].is_some() {
2543            return Some(index);
2544        }
2545    }
2546    None
2547}