Skip to main content

datum/stream/
source.rs

1use super::flow::{self, FlowTransform};
2use super::*;
3use crate::Attributes;
4use crate::context::SourceWithContext;
5
/// Placeholder materialized value for stages that have nothing meaningful to expose.
///
/// It is the default `Mat` parameter of [`Source`] and the value returned by [`Keep::none`].
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash)]
pub struct NotUsed;
8
/// Unsized callable that, given a [`Materializer`], either produces a boxed stream of `Out`
/// elements or fails with a stream error.
///
/// NOTE(review): not referenced in the visible part of this file; presumably used by the
/// source-combining operators — confirm against its users.
type CombinedSourceFactory<Out> =
    dyn Fn(&Materializer) -> StreamResult<BoxStream<Out>> + Send + Sync;
11
12pub struct Keep;
13
14impl Keep {
15    pub fn left<Left, Right>(left: Left, _right: Right) -> Left {
16        left
17    }
18
19    pub fn right<Left, Right>(_left: Left, right: Right) -> Right {
20        right
21    }
22
23    pub fn both<Left, Right>(left: Left, right: Right) -> (Left, Right) {
24        (left, right)
25    }
26
27    pub fn none<Left, Right>(_left: Left, _right: Right) -> NotUsed {
28        NotUsed
29    }
30}
31
/// Strategy selector for combining several sources into one.
///
/// NOTE(review): the combining operators are outside the visible part of this file; the
/// per-variant notes below are inferred from the names and should be confirmed there.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SourceCombineStrategy {
    /// Presumably interleaves elements from all inputs as they become available.
    Merge {
        /// Presumably completes the combined stream as soon as any input completes — confirm.
        eager_complete: bool,
    },
    /// Presumably drains the inputs one after another, in order.
    Concat,
    /// Presumably a merge that favours inputs according to `priorities`.
    Prioritized {
        /// Presumably one weight per input, in input order — confirm.
        priorities: Vec<usize>,
        /// Presumably completes the combined stream as soon as any input completes — confirm.
        eager_complete: bool,
    },
}
43
44#[derive(Clone)]
45pub struct MaybeHandle<T> {
46    value: Arc<Mutex<Option<StreamResult<T>>>>,
47}
48
49impl<T> MaybeHandle<T> {
50    #[must_use]
51    pub fn is_completed(&self) -> bool {
52        self.value.lock().expect("maybe handle poisoned").is_some()
53    }
54
55    pub fn complete(&self, item: T) -> StreamResult<()> {
56        self.settle(Ok(item))
57    }
58
59    pub fn fail(&self, error: StreamError) -> StreamResult<()> {
60        self.settle(Err(error))
61    }
62
63    fn settle(&self, result: StreamResult<T>) -> StreamResult<()> {
64        let mut value = self.value.lock().expect("maybe handle poisoned");
65        if value.is_some() {
66            return Err(StreamError::Failed("maybe source already completed".into()));
67        }
68        *value = Some(result);
69        Ok(())
70    }
71}
72
/// Number of elements a consumer is currently willing to accept.
///
/// The value `u64::MAX` is treated as "unbounded": it is never decremented.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Demand(u64);

impl Demand {
    /// No demand at all.
    pub const ZERO: Self = Self(0);
    /// Demand for exactly one element.
    pub const ONE: Self = Self(1);
    /// Unbounded demand.
    pub const MAX: Self = Self(u64::MAX);

    /// Wraps a raw element count.
    #[must_use]
    pub const fn new(available: u64) -> Self {
        Demand(available)
    }

    /// Returns the raw element count.
    #[must_use]
    pub const fn available(self) -> u64 {
        let Demand(count) = self;
        count
    }

    /// Returns `true` when the demand equals [`Demand::MAX`].
    #[must_use]
    pub const fn is_unbounded(self) -> bool {
        self.0 == Self::MAX.0
    }

    /// Returns `true` when the demand equals [`Demand::ZERO`].
    #[must_use]
    pub const fn is_empty(self) -> bool {
        self.0 == Self::ZERO.0
    }

    /// Adds two demands, clamping at `u64::MAX` instead of overflowing.
    #[must_use]
    pub const fn saturating_add(self, rhs: Self) -> Self {
        let total = self.0.saturating_add(rhs.0);
        Demand(total)
    }

    /// Tries to take one unit of demand.
    ///
    /// Returns `false` (and changes nothing) when the demand is empty. Unbounded demand
    /// grants the unit without being decremented.
    pub fn consume_one(&mut self) -> bool {
        if self.is_empty() {
            return false;
        }
        if !self.is_unbounded() {
            self.0 -= 1;
        }
        true
    }
}
117
/// Receiving end of a push-style stage: elements are handed over one at a time.
pub trait PushOutlet<T>: Send {
    /// Delivers one element.
    ///
    /// NOTE(review): the returned [`Demand`] is presumably how much more the outlet is
    /// willing to accept after this push — confirm against the implementers.
    fn push(&mut self, item: T) -> StreamResult<Demand>;

    /// Signals normal completion. The default implementation does nothing and succeeds.
    fn complete(&mut self) -> StreamResult<()> {
        Ok(())
    }

    /// Signals failure. The default implementation hands `cause` straight back as the error.
    fn fail(&mut self, cause: StreamError) -> StreamResult<()> {
        Err(cause)
    }
}
129
130#[derive(Clone)]
131pub struct Source<Out, Mat = NotUsed> {
132    pub(crate) factory: Arc<dyn SourceFactory<Out, Mat>>,
133    pub(super) hints: SourceHints,
134    pub(super) attributes: Attributes,
135    /// Set only for split-segment sources in fast mode. `None` for all other sources.
136    /// Cleared by all composition operators (via/map/to_mat etc.).
137    pub(crate) split_hook: Option<Arc<dyn SplitSegmentHookDyn>>,
138}
139
140impl<Out: Send + 'static> Source<Out, NotUsed> {
141    pub(super) fn from_factory<F>(factory: F) -> Self
142    where
143        F: Fn() -> BoxStream<Out> + Send + Sync + 'static,
144    {
145        Self::from_factory_with_hints(factory, SourceHints::default())
146    }
147
148    fn from_factory_with_hints<F>(factory: F, hints: SourceHints) -> Self
149    where
150        F: Fn() -> BoxStream<Out> + Send + Sync + 'static,
151    {
152        Self::from_materialized_factory_with_hints(
153            move |_materializer| Ok((factory(), NotUsed)),
154            hints,
155        )
156    }
157
158    #[must_use]
159    pub fn empty() -> Self {
160        Self::from_factory_with_hints(
161            || Box::new(std::iter::empty()),
162            SourceHints::with_inline_micro(0),
163        )
164    }
165
166    #[must_use]
167    pub fn never() -> Self {
168        Self::from_materialized_factory_with_hints(
169            move |materializer| {
170                let state = Arc::clone(&materializer.inner.state);
171                Ok((
172                    Box::new(std::iter::from_fn(move || {
173                        loop {
174                            if state.shutdown.load(Ordering::SeqCst) {
175                                return Some(Err(StreamError::AbruptTermination));
176                            }
177                            if super::runtime::current_stream_cancelled()
178                                .as_ref()
179                                .is_some_and(|cancelled| cancelled.load(Ordering::SeqCst))
180                            {
181                                return Some(Err(StreamError::Cancelled));
182                            }
183                            std::thread::park_timeout(Duration::from_millis(1));
184                        }
185                    })),
186                    NotUsed,
187                ))
188            },
189            SourceHints::default(),
190        )
191    }
192
193    #[must_use]
194    pub fn failed(error: StreamError) -> Self {
195        Self::from_factory_with_hints(
196            move || Box::new(std::iter::once(Err(error.clone()))),
197            // max_success_items=0: failed() emits no Ok items before the Err.
198            SourceHints::with_inline_micro(0),
199        )
200    }
201
202    #[must_use]
203    pub fn future<F, Fut>(future: F) -> Self
204    where
205        F: Fn() -> Fut + Send + Sync + 'static,
206        Fut: Future<Output = StreamResult<Out>> + Send + 'static,
207    {
208        let future = Arc::new(future);
209        Self::from_factory(move || {
210            let future = Arc::clone(&future);
211            let mut emitted = false;
212            Box::new(std::iter::from_fn(move || {
213                if emitted {
214                    return None;
215                }
216                emitted = true;
217                Some(
218                    catch_unwind_failed("source future factory", || future())
219                        .and_then(flow::run_future_inline_or_spawn),
220                )
221            }))
222        })
223    }
224
225    #[must_use]
226    pub fn future_source<F, Fut>(future: F) -> Self
227    where
228        F: Fn() -> Fut + Send + Sync + 'static,
229        Fut: Future<Output = StreamResult<Source<Out>>> + Send + 'static,
230    {
231        let future = Arc::new(future);
232        Self::from_materialized_factory(move |materializer| {
233            let materializer = materializer.with_name_prefix(materializer.name_prefix().to_owned());
234            let future = Arc::clone(&future);
235            let mut current = None::<BoxStream<Out>>;
236            let mut initialized = false;
237            let mut terminated = false;
238            Ok((
239                Box::new(std::iter::from_fn(move || {
240                    if terminated {
241                        return None;
242                    }
243
244                    loop {
245                        if let Some(stream) = current.as_mut() {
246                            match stream.next() {
247                                Some(item) => return Some(item),
248                                None => {
249                                    terminated = true;
250                                    return None;
251                                }
252                            }
253                        }
254
255                        if initialized {
256                            terminated = true;
257                            return None;
258                        }
259                        initialized = true;
260                        let source = match catch_unwind_failed("future_source factory", || future())
261                            .and_then(flow::run_future_inline_or_spawn)
262                        {
263                            Ok(source) => source,
264                            Err(error) => {
265                                terminated = true;
266                                return Some(Err(error));
267                            }
268                        };
269                        current = Some(match Arc::clone(&source.factory).create(&materializer) {
270                            Ok((stream, _)) => stream,
271                            Err(error) => {
272                                terminated = true;
273                                return Some(Err(error));
274                            }
275                        });
276                    }
277                })) as BoxStream<Out>,
278                NotUsed,
279            ))
280        })
281    }
282
283    #[must_use]
284    pub fn cycle<F, I>(factory: F) -> Self
285    where
286        F: Fn() -> I + Send + Sync + 'static,
287        I: IntoIterator<Item = Out>,
288        I::IntoIter: Send + 'static,
289    {
290        let factory = Arc::new(factory);
291        Self::from_factory(move || {
292            let factory = Arc::clone(&factory);
293            let mut current = None::<I::IntoIter>;
294            let mut terminated = false;
295            Box::new(std::iter::from_fn(move || {
296                if terminated {
297                    return None;
298                }
299
300                if let Some(iter) = current.as_mut()
301                    && let Some(item) = iter.next()
302                {
303                    return Some(Ok(item));
304                }
305
306                let mut next = match catch_unwind_failed("cycle factory", || factory()) {
307                    Ok(iterable) => iterable.into_iter(),
308                    Err(error) => {
309                        terminated = true;
310                        return Some(Err(error));
311                    }
312                };
313                match next.next() {
314                    Some(item) => {
315                        current = Some(next);
316                        Some(Ok(item))
317                    }
318                    None => {
319                        terminated = true;
320                        Some(Err(StreamError::Failed("empty iterator".into())))
321                    }
322                }
323            }))
324        })
325    }
326
327    #[must_use]
328    pub fn unfold<State, F>(initial: State, f: F) -> Self
329    where
330        State: Clone + Send + Sync + 'static,
331        F: Fn(State) -> Option<(State, Out)> + Send + Sync + 'static,
332    {
333        let f = Arc::new(f);
334        Self::from_factory(move || {
335            let f = Arc::clone(&f);
336            let mut state = Some(initial.clone());
337            let mut terminated = false;
338            Box::new(std::iter::from_fn(move || {
339                if terminated {
340                    return None;
341                }
342                let current = state.take().expect("unfold state present");
343                match catch_unwind_failed("unfold function", || f(current)) {
344                    Ok(Some((next_state, item))) => {
345                        state = Some(next_state);
346                        Some(Ok(item))
347                    }
348                    Ok(None) => {
349                        terminated = true;
350                        None
351                    }
352                    Err(error) => {
353                        terminated = true;
354                        Some(Err(error))
355                    }
356                }
357            }))
358        })
359    }
360
361    #[must_use]
362    pub fn unfold_async<State, F, Fut>(initial: State, f: F) -> Self
363    where
364        State: Clone + Send + Sync + 'static,
365        F: Fn(State) -> Fut + Send + Sync + 'static,
366        Fut: Future<Output = StreamResult<Option<(State, Out)>>> + Send + 'static,
367    {
368        let f = Arc::new(f);
369        Self::from_factory(move || {
370            let f = Arc::clone(&f);
371            let mut state = Some(initial.clone());
372            let mut terminated = false;
373            Box::new(std::iter::from_fn(move || {
374                if terminated {
375                    return None;
376                }
377                let current = state.take().expect("unfold_async state present");
378                match catch_unwind_failed("unfold_async factory", || f(current))
379                    .and_then(flow::run_future_inline_or_spawn)
380                {
381                    Ok(Some((next_state, item))) => {
382                        state = Some(next_state);
383                        Some(Ok(item))
384                    }
385                    Ok(None) => {
386                        terminated = true;
387                        None
388                    }
389                    Err(error) => {
390                        terminated = true;
391                        Some(Err(error))
392                    }
393                }
394            }))
395        })
396    }
397
398    #[must_use]
399    pub fn unfold_resource<Resource, Create, Read, Close>(
400        create: Create,
401        read: Read,
402        close: Close,
403    ) -> Self
404    where
405        Resource: Send + 'static,
406        Create: Fn() -> StreamResult<Resource> + Send + Sync + 'static,
407        Read: Fn(&mut Resource) -> StreamResult<Option<Out>> + Send + Sync + 'static,
408        Close: Fn(Resource) -> StreamResult<()> + Send + Sync + 'static,
409    {
410        let create = Arc::new(create);
411        let read = Arc::new(read);
412        let close = Arc::new(close);
413        Self::from_factory(move || {
414            Box::new(UnfoldResourceStream {
415                create: Arc::clone(&create),
416                read: Arc::clone(&read),
417                close: Arc::clone(&close),
418                resource: None,
419                created: false,
420                terminated: false,
421                _marker: PhantomData,
422            })
423        })
424    }
425
426    #[must_use]
427    pub fn unfold_resource_async<Resource, Create, CreateFut, Read, ReadFut, Close, CloseFut>(
428        create: Create,
429        read: Read,
430        close: Close,
431    ) -> Self
432    where
433        Resource: Send + 'static,
434        Create: Fn() -> CreateFut + Send + Sync + 'static,
435        CreateFut: Future<Output = StreamResult<Resource>> + Send + 'static,
436        Read: Fn(&mut Resource) -> ReadFut + Send + Sync + 'static,
437        ReadFut: Future<Output = StreamResult<Option<Out>>> + Send + 'static,
438        Close: Fn(Resource) -> CloseFut + Send + Sync + 'static,
439        CloseFut: Future<Output = StreamResult<()>> + Send + 'static,
440    {
441        let create = Arc::new(create);
442        let read = Arc::new(read);
443        let close = Arc::new(close);
444        Self::from_factory(move || {
445            Box::new(UnfoldResourceAsyncStream {
446                create: Arc::clone(&create),
447                read: Arc::clone(&read),
448                close: Arc::clone(&close),
449                resource: None,
450                created: false,
451                terminated: false,
452                _marker: PhantomData,
453            })
454        })
455    }
456
457    #[must_use]
458    pub fn lazy_single<F>(create: F) -> Self
459    where
460        F: Fn() -> Out + Send + Sync + 'static,
461    {
462        let create = Arc::new(create);
463        Self::from_factory(move || {
464            let create = Arc::clone(&create);
465            let mut emitted = false;
466            Box::new(std::iter::from_fn(move || {
467                if emitted {
468                    return None;
469                }
470                emitted = true;
471                Some(catch_unwind_failed("lazy_single factory", || create()))
472            }))
473        })
474    }
475
476    #[must_use]
477    pub fn lazy_future<F, Fut>(create: F) -> Self
478    where
479        F: Fn() -> Fut + Send + Sync + 'static,
480        Fut: Future<Output = StreamResult<Out>> + Send + 'static,
481    {
482        let create = Arc::new(create);
483        Self::from_factory(move || {
484            let create = Arc::clone(&create);
485            let mut emitted = false;
486            Box::new(std::iter::from_fn(move || {
487                if emitted {
488                    return None;
489                }
490                emitted = true;
491                Some(
492                    catch_unwind_failed("lazy_future factory", || create())
493                        .and_then(flow::run_future_inline_or_spawn),
494                )
495            }))
496        })
497    }
498
499    #[must_use]
500    pub fn lazy_source<InnerMat, F>(create: F) -> Source<Out, StreamCompletion<InnerMat>>
501    where
502        InnerMat: Send + 'static,
503        F: Fn() -> Source<Out, InnerMat> + Send + Sync + 'static,
504    {
505        let create = Arc::new(create);
506        Source::from_materialized_factory(move |materializer| {
507            let (sender, receiver) = oneshot::channel();
508            Ok((
509                Box::new(LazySourceStream {
510                    create: Arc::clone(&create),
511                    materializer: materializer
512                        .with_name_prefix(materializer.name_prefix().to_owned()),
513                    current: None,
514                    mat_sender: Some(sender),
515                    initialized: false,
516                    terminated: false,
517                }) as BoxStream<Out>,
518                StreamCompletion::from_receiver(receiver, None),
519            ))
520        })
521    }
522
523    #[must_use]
524    pub fn lazy_future_source<InnerMat, F, Fut>(
525        create: F,
526    ) -> Source<Out, StreamCompletion<InnerMat>>
527    where
528        InnerMat: Send + 'static,
529        F: Fn() -> Fut + Send + Sync + 'static,
530        Fut: Future<Output = StreamResult<Source<Out, InnerMat>>> + Send + 'static,
531    {
532        let create = Arc::new(create);
533        Source::from_materialized_factory(move |materializer| {
534            let (sender, receiver) = oneshot::channel();
535            Ok((
536                Box::new(LazyFutureSourceStream {
537                    create: Arc::clone(&create),
538                    materializer: materializer
539                        .with_name_prefix(materializer.name_prefix().to_owned()),
540                    current: None,
541                    mat_sender: Some(sender),
542                    initialized: false,
543                    terminated: false,
544                    _marker: PhantomData,
545                }) as BoxStream<Out>,
546                StreamCompletion::from_receiver(receiver, None),
547            ))
548        })
549    }
550
551    #[must_use]
552    pub fn from_fn_iter<F, I>(factory: F) -> Self
553    where
554        F: Fn() -> I + Send + Sync + 'static,
555        I: IntoIterator<Item = Out>,
556        I::IntoIter: Send + 'static,
557    {
558        Self::from_factory(move || Box::new(factory().into_iter().map(Ok)))
559    }
560}
561
562impl<Out: Send + 'static, Mat: Send + 'static> Source<Out, Mat> {
563    fn from_materialized_factory_with_hints<F>(factory: F, hints: SourceHints) -> Self
564    where
565        F: Fn(&Materializer) -> StreamResult<(BoxStream<Out>, Mat)> + Send + Sync + 'static,
566    {
567        Self {
568            factory: Arc::new(FnSourceFactory(factory)),
569            hints,
570            attributes: Attributes::default(),
571            split_hook: None,
572        }
573    }
574
575    pub(crate) fn from_materialized_factory<F>(factory: F) -> Self
576    where
577        F: Fn(&Materializer) -> StreamResult<(BoxStream<Out>, Mat)> + Send + Sync + 'static,
578    {
579        Self::from_materialized_factory_with_hints(factory, SourceHints::default())
580    }
581
582    #[must_use]
583    pub fn as_source_with_context<Ctx, F>(
584        self,
585        extract_context: F,
586    ) -> SourceWithContext<Out, Ctx, Mat>
587    where
588        Ctx: Send + 'static,
589        F: Fn(&Out) -> Ctx + Send + Sync + 'static,
590    {
591        SourceWithContext::from_source(self.map(move |item| {
592            let context = extract_context(&item);
593            (item, context)
594        }))
595    }
596
597    #[must_use]
598    pub fn via<Next, FlowMat>(self, flow: Flow<Out, Next, FlowMat>) -> Source<Next, Mat>
599    where
600        Next: Send + 'static,
601        FlowMat: Send + 'static,
602    {
603        self.via_mat(flow, Keep::left)
604    }
605
606    #[must_use]
607    pub fn via_mat<Next, FlowMat, Combined, F>(
608        self,
609        flow: Flow<Out, Next, FlowMat>,
610        combine: F,
611    ) -> Source<Next, Combined>
612    where
613        Next: Send + 'static,
614        FlowMat: Send + 'static,
615        Combined: Send + 'static,
616        F: Fn(Mat, FlowMat) -> Combined + Send + Sync + 'static,
617    {
618        let source = self.factory;
619        let transform = flow.transform;
620        let materialize_flow = flow.materialize;
621        let hints = self.hints.after_flow(flow.hints);
622        let combine = Arc::new(combine);
623        Source::from_materialized_factory_with_hints(
624            move |materializer| {
625                let (stream, source_mat) = Arc::clone(&source).create(materializer)?;
626                let flow_mat = materialize_flow()?;
627                let stream = match &transform {
628                    FlowTransform::Pure(transform) => transform(stream),
629                    FlowTransform::Runtime(transform) => transform(stream, materializer)?,
630                };
631                Ok((stream, combine(source_mat, flow_mat)))
632            },
633            hints,
634        )
635    }
636
637    #[must_use]
638    pub fn via_mat_with<Next, FlowMat, Combined, F>(
639        self,
640        flow: Flow<Out, Next, FlowMat>,
641        combine: F,
642    ) -> Source<Next, Combined>
643    where
644        Next: Send + 'static,
645        FlowMat: Send + 'static,
646        Combined: Send + 'static,
647        F: Fn(Mat, FlowMat) -> Combined + Send + Sync + 'static,
648    {
649        self.via_mat(flow, combine)
650    }
651
652    #[must_use]
653    pub fn map<Next, F>(self, f: F) -> Source<Next, Mat>
654    where
655        Next: Send + 'static,
656        F: Fn(Out) -> Next + Send + Sync + 'static,
657    {
658        Source {
659            factory: Arc::new(MapSourceFactory {
660                source: self.factory,
661                stage: f,
662                _marker: PhantomData,
663            }),
664            hints: self.hints,
665            attributes: self.attributes,
666            split_hook: None,
667        }
668    }
669
670    #[must_use]
671    pub fn attributes(&self) -> &Attributes {
672        &self.attributes
673    }
674
675    #[must_use]
676    pub fn with_attributes(mut self, attributes: Attributes) -> Self {
677        self.attributes = attributes;
678        self
679    }
680
681    #[must_use]
682    pub fn add_attributes(mut self, attributes: Attributes) -> Self {
683        self.attributes = self.attributes.and(attributes);
684        self
685    }
686
687    #[must_use]
688    pub fn named(self, name: impl Into<String>) -> Self {
689        self.add_attributes(Attributes::named(name))
690    }
691
692    #[must_use]
693    pub fn map_result<Next, F>(self, f: F) -> Source<Next, Mat>
694    where
695        Next: Send + 'static,
696        F: Fn(Out) -> StreamResult<Next> + Send + Sync + 'static,
697    {
698        self.via(Flow::identity().map_result(f))
699    }
700
701    #[must_use]
702    pub fn map_result_with_supervision<Next, F>(
703        self,
704        f: F,
705        decider: SupervisionDecider,
706    ) -> Source<Next, Mat>
707    where
708        Next: Send + 'static,
709        F: Fn(Out) -> StreamResult<Next> + Send + Sync + 'static,
710    {
711        self.via(Flow::identity().map_result_with_supervision(f, decider))
712    }
713
714    #[must_use]
715    pub fn filter<F>(self, predicate: F) -> Source<Out, Mat>
716    where
717        F: Fn(&Out) -> bool + Send + Sync + 'static,
718    {
719        self.via(Flow::identity().filter(predicate))
720    }
721
722    #[must_use]
723    pub fn filter_result<F>(self, predicate: F) -> Source<Out, Mat>
724    where
725        F: Fn(&Out) -> StreamResult<bool> + Send + Sync + 'static,
726    {
727        self.via(Flow::identity().filter_result(predicate))
728    }
729
730    #[must_use]
731    pub fn filter_result_with_supervision<F>(
732        self,
733        predicate: F,
734        decider: SupervisionDecider,
735    ) -> Source<Out, Mat>
736    where
737        F: Fn(&Out) -> StreamResult<bool> + Send + Sync + 'static,
738    {
739        self.via(Flow::identity().filter_result_with_supervision(predicate, decider))
740    }
741
742    #[must_use]
743    pub fn filter_not<F>(self, predicate: F) -> Source<Out, Mat>
744    where
745        F: Fn(&Out) -> bool + Send + Sync + 'static,
746    {
747        self.via(Flow::identity().filter_not(predicate))
748    }
749
750    #[must_use]
751    pub fn filter_map<Next, F>(self, f: F) -> Source<Next, Mat>
752    where
753        Next: Send + 'static,
754        F: Fn(Out) -> Option<Next> + Send + Sync + 'static,
755    {
756        self.via(Flow::identity().filter_map(f))
757    }
758
759    #[must_use]
760    pub fn filter_map_result<Next, F>(self, f: F) -> Source<Next, Mat>
761    where
762        Next: Send + 'static,
763        F: Fn(Out) -> StreamResult<Option<Next>> + Send + Sync + 'static,
764    {
765        self.via(Flow::identity().filter_map_result(f))
766    }
767
768    #[must_use]
769    pub fn filter_map_result_with_supervision<Next, F>(
770        self,
771        f: F,
772        decider: SupervisionDecider,
773    ) -> Source<Next, Mat>
774    where
775        Next: Send + 'static,
776        F: Fn(Out) -> StreamResult<Option<Next>> + Send + Sync + 'static,
777    {
778        self.via(Flow::identity().filter_map_result_with_supervision(f, decider))
779    }
780
781    #[must_use]
782    pub fn map_concat<Next, F, I>(self, f: F) -> Source<Next, Mat>
783    where
784        Next: Send + 'static,
785        F: Fn(Out) -> I + Send + Sync + 'static,
786        I: IntoIterator<Item = Next>,
787        I::IntoIter: Send + 'static,
788    {
789        self.via(Flow::identity().map_concat(f))
790    }
791
792    #[must_use]
793    pub fn map_concat_result<Next, F, I>(self, f: F) -> Source<Next, Mat>
794    where
795        Next: Send + 'static,
796        F: Fn(Out) -> StreamResult<I> + Send + Sync + 'static,
797        I: IntoIterator<Item = Next>,
798        I::IntoIter: Send + 'static,
799    {
800        self.via(Flow::identity().map_concat_result(f))
801    }
802
803    #[must_use]
804    pub fn map_concat_result_with_supervision<Next, F, I>(
805        self,
806        f: F,
807        decider: SupervisionDecider,
808    ) -> Source<Next, Mat>
809    where
810        Next: Send + 'static,
811        F: Fn(Out) -> StreamResult<I> + Send + Sync + 'static,
812        I: IntoIterator<Item = Next>,
813        I::IntoIter: Send + 'static,
814    {
815        self.via(Flow::identity().map_concat_result_with_supervision(f, decider))
816    }
817
818    #[must_use]
819    pub fn stateful_map<State, Next, F>(self, seed: State, f: F) -> Source<Next, Mat>
820    where
821        State: Clone + Send + Sync + 'static,
822        Next: Send + 'static,
823        F: Fn(&mut State, Out) -> Next + Send + Sync + 'static,
824    {
825        self.via(Flow::identity().stateful_map(seed, f))
826    }
827
828    #[must_use]
829    pub fn stateful_map_result<State, Next, F>(self, seed: State, f: F) -> Source<Next, Mat>
830    where
831        State: Clone + Send + Sync + 'static,
832        Next: Send + 'static,
833        F: Fn(&mut State, Out) -> StreamResult<Next> + Send + Sync + 'static,
834    {
835        self.via(Flow::identity().stateful_map_result(seed, f))
836    }
837
838    #[must_use]
839    pub fn stateful_map_result_with_supervision<State, Next, F>(
840        self,
841        seed: State,
842        f: F,
843        decider: SupervisionDecider,
844    ) -> Source<Next, Mat>
845    where
846        State: Clone + Send + Sync + 'static,
847        Next: Send + 'static,
848        F: Fn(&mut State, Out) -> StreamResult<Next> + Send + Sync + 'static,
849    {
850        self.via(Flow::identity().stateful_map_result_with_supervision(seed, f, decider))
851    }
852
853    #[must_use]
854    pub fn stateful_map_concat<State, Next, F, I>(self, seed: State, f: F) -> Source<Next, Mat>
855    where
856        State: Clone + Send + Sync + 'static,
857        Next: Send + 'static,
858        F: Fn(&mut State, Out) -> I + Send + Sync + 'static,
859        I: IntoIterator<Item = Next>,
860        I::IntoIter: Send + 'static,
861    {
862        self.via(Flow::identity().stateful_map_concat(seed, f))
863    }
864
865    #[must_use]
866    pub fn stateful_map_concat_result<State, Next, F, I>(
867        self,
868        seed: State,
869        f: F,
870    ) -> Source<Next, Mat>
871    where
872        State: Clone + Send + Sync + 'static,
873        Next: Send + 'static,
874        F: Fn(&mut State, Out) -> StreamResult<I> + Send + Sync + 'static,
875        I: IntoIterator<Item = Next>,
876        I::IntoIter: Send + 'static,
877    {
878        self.via(Flow::identity().stateful_map_concat_result(seed, f))
879    }
880
881    #[must_use]
882    pub fn stateful_map_concat_result_with_supervision<State, Next, F, I>(
883        self,
884        seed: State,
885        f: F,
886        decider: SupervisionDecider,
887    ) -> Source<Next, Mat>
888    where
889        State: Clone + Send + Sync + 'static,
890        Next: Send + 'static,
891        F: Fn(&mut State, Out) -> StreamResult<I> + Send + Sync + 'static,
892        I: IntoIterator<Item = Next>,
893        I::IntoIter: Send + 'static,
894    {
895        self.via(Flow::identity().stateful_map_concat_result_with_supervision(seed, f, decider))
896    }
897
898    #[must_use]
899    pub fn map_async<Next, F, Fut>(self, parallelism: usize, f: F) -> Source<Next, Mat>
900    where
901        Next: Send + 'static,
902        F: Fn(Out) -> Fut + Send + Sync + 'static,
903        Fut: Future<Output = StreamResult<Next>> + Send + 'static,
904    {
905        self.via(Flow::identity().map_async(parallelism, f))
906    }
907
908    #[must_use]
909    pub fn map_async_with_supervision<Next, F, Fut>(
910        self,
911        parallelism: usize,
912        f: F,
913        decider: SupervisionDecider,
914    ) -> Source<Next, Mat>
915    where
916        Next: Send + 'static,
917        F: Fn(Out) -> Fut + Send + Sync + 'static,
918        Fut: Future<Output = StreamResult<Next>> + Send + 'static,
919    {
920        self.via(Flow::identity().map_async_with_supervision(parallelism, f, decider))
921    }
922
923    #[must_use]
924    pub fn map_async_unordered<Next, F, Fut>(self, parallelism: usize, f: F) -> Source<Next, Mat>
925    where
926        Next: Send + 'static,
927        F: Fn(Out) -> Fut + Send + Sync + 'static,
928        Fut: Future<Output = StreamResult<Next>> + Send + 'static,
929    {
930        self.via(Flow::identity().map_async_unordered(parallelism, f))
931    }
932
933    #[must_use]
934    pub fn map_async_unordered_with_supervision<Next, F, Fut>(
935        self,
936        parallelism: usize,
937        f: F,
938        decider: SupervisionDecider,
939    ) -> Source<Next, Mat>
940    where
941        Next: Send + 'static,
942        F: Fn(Out) -> Fut + Send + Sync + 'static,
943        Fut: Future<Output = StreamResult<Next>> + Send + 'static,
944    {
945        self.via(Flow::identity().map_async_unordered_with_supervision(parallelism, f, decider))
946    }
947
948    #[must_use]
949    pub fn map_async_partitioned<Key, Next, Partition, F, Fut>(
950        self,
951        parallelism: usize,
952        per_partition: usize,
953        partition: Partition,
954        f: F,
955    ) -> Source<Next, Mat>
956    where
957        Key: Clone + Eq + Hash + Send + 'static,
958        Next: Send + 'static,
959        Partition: Fn(&Out) -> Key + Send + Sync + 'static,
960        F: Fn(Out) -> Fut + Send + Sync + 'static,
961        Fut: Future<Output = StreamResult<Next>> + Send + 'static,
962    {
963        self.via(Flow::identity().map_async_partitioned(parallelism, per_partition, partition, f))
964    }
965
966    #[must_use]
967    pub fn prefix_and_tail(self, n: usize) -> Source<(Vec<Out>, Source<Out>), Mat> {
968        self.via(Flow::identity().prefix_and_tail(n))
969    }
970
971    #[must_use]
972    pub fn flat_map_prefix<Next, FlowMat, F>(self, n: usize, f: F) -> Source<Next, Mat>
973    where
974        Next: Send + 'static,
975        FlowMat: Send + 'static,
976        F: Fn(Vec<Out>) -> Flow<Out, Next, FlowMat> + Send + Sync + 'static,
977        Out: Clone,
978    {
979        self.via(Flow::identity().flat_map_prefix(n, f))
980    }
981
982    #[must_use]
983    pub fn group_by<Key, F>(
984        self,
985        max_substreams: usize,
986        f: F,
987        allow_closed_substream_recreation: bool,
988    ) -> Source<Source<Out>, Mat>
989    where
990        Key: Clone + Eq + Hash + Send + 'static,
991        F: Fn(&Out) -> Key + Send + Sync + 'static,
992        Out: Clone,
993    {
994        self.via(Flow::identity().group_by(max_substreams, f, allow_closed_substream_recreation))
995    }
996
997    #[must_use]
998    pub fn split_when<F>(self, predicate: F) -> Source<Source<Out>, Mat>
999    where
1000        F: Fn(&Out) -> bool + Send + Sync + 'static,
1001        Out: Clone,
1002    {
1003        self.via(Flow::identity().split_when(predicate))
1004    }
1005
1006    #[must_use]
1007    pub fn split_after<F>(self, predicate: F) -> Source<Source<Out>, Mat>
1008    where
1009        F: Fn(&Out) -> bool + Send + Sync + 'static,
1010        Out: Clone,
1011    {
1012        self.via(Flow::identity().split_after(predicate))
1013    }
1014
1015    #[must_use]
1016    pub fn flat_map_concat<Next, NextMat, F>(self, f: F) -> Source<Next, Mat>
1017    where
1018        Next: Send + 'static,
1019        NextMat: Send + 'static,
1020        F: Fn(Out) -> Source<Next, NextMat> + Send + Sync + 'static,
1021    {
1022        self.via(Flow::identity().flat_map_concat(f))
1023    }
1024
1025    #[must_use]
1026    pub fn flat_map_merge<Next, NextMat, F>(self, breadth: usize, f: F) -> Source<Next, Mat>
1027    where
1028        Next: Send + 'static,
1029        NextMat: Send + 'static,
1030        F: Fn(Out) -> Source<Next, NextMat> + Send + Sync + 'static,
1031    {
1032        self.via(Flow::identity().flat_map_merge(breadth, f))
1033    }
1034
1035    #[must_use]
1036    pub fn take(self, n: usize) -> Source<Out, Mat> {
1037        self.via(Flow::identity().take(n))
1038    }
1039
1040    #[must_use]
1041    pub fn drop(self, n: usize) -> Source<Out, Mat> {
1042        self.via(Flow::identity().drop(n))
1043    }
1044
1045    #[must_use]
1046    pub fn take_while<F>(self, predicate: F) -> Source<Out, Mat>
1047    where
1048        F: Fn(&Out) -> bool + Send + Sync + 'static,
1049    {
1050        self.via(Flow::identity().take_while(predicate))
1051    }
1052
1053    #[must_use]
1054    pub fn drop_while<F>(self, predicate: F) -> Source<Out, Mat>
1055    where
1056        F: Fn(&Out) -> bool + Send + Sync + 'static,
1057    {
1058        self.via(Flow::identity().drop_while(predicate))
1059    }
1060
1061    #[must_use]
1062    pub fn limit(self, max: u64) -> Source<Out, Mat> {
1063        self.via(Flow::identity().limit(max))
1064    }
1065
1066    #[must_use]
1067    pub fn grouped(self, size: usize) -> Source<Vec<Out>, Mat> {
1068        self.via(Flow::identity().grouped(size))
1069    }
1070
1071    #[must_use]
1072    pub fn scan<State, F>(self, seed: State, f: F) -> Source<State, Mat>
1073    where
1074        State: Clone + Send + Sync + 'static,
1075        F: Fn(State, Out) -> State + Send + Sync + 'static,
1076    {
1077        self.via(Flow::identity().scan(seed, f))
1078    }
1079
1080    #[must_use]
1081    pub fn scan_async<State, F, Fut>(self, seed: State, f: F) -> Source<State, Mat>
1082    where
1083        State: Clone + Send + Sync + 'static,
1084        F: Fn(State, Out) -> Fut + Send + Sync + 'static,
1085        Fut: Future<Output = StreamResult<State>> + Send + 'static,
1086    {
1087        self.via(Flow::identity().scan_async(seed, f))
1088    }
1089
1090    #[must_use]
1091    pub fn scan_result<State, F>(self, seed: State, f: F) -> Source<State, Mat>
1092    where
1093        State: Clone + Send + Sync + 'static,
1094        F: Fn(State, Out) -> StreamResult<State> + Send + Sync + 'static,
1095    {
1096        self.via(Flow::identity().scan_result(seed, f))
1097    }
1098
1099    #[must_use]
1100    pub fn scan_result_with_supervision<State, F>(
1101        self,
1102        seed: State,
1103        f: F,
1104        decider: SupervisionDecider,
1105    ) -> Source<State, Mat>
1106    where
1107        State: Clone + Send + Sync + 'static,
1108        F: Fn(State, Out) -> StreamResult<State> + Send + Sync + 'static,
1109    {
1110        self.via(Flow::identity().scan_result_with_supervision(seed, f, decider))
1111    }
1112
1113    #[must_use]
1114    pub fn sliding(self, size: usize, step: usize) -> Source<Vec<Out>, Mat>
1115    where
1116        Out: Clone,
1117    {
1118        self.via(Flow::identity().sliding(size, step))
1119    }
1120
1121    #[must_use]
1122    pub fn fold<Acc, F>(self, zero: Acc, f: F) -> Source<Acc, Mat>
1123    where
1124        Acc: Clone + Send + Sync + 'static,
1125        F: Fn(Acc, Out) -> Acc + Send + Sync + 'static,
1126    {
1127        self.via(Flow::identity().fold(zero, f))
1128    }
1129
1130    #[must_use]
1131    pub fn fold_async<Acc, F, Fut>(self, zero: Acc, f: F) -> Source<Acc, Mat>
1132    where
1133        Acc: Clone + Send + Sync + 'static,
1134        F: Fn(Acc, Out) -> Fut + Send + Sync + 'static,
1135        Fut: Future<Output = StreamResult<Acc>> + Send + 'static,
1136    {
1137        self.via(Flow::identity().fold_async(zero, f))
1138    }
1139
1140    #[must_use]
1141    pub fn map_with_resource<Resource, Next, Create, F, Close>(
1142        self,
1143        create: Create,
1144        f: F,
1145        close: Close,
1146    ) -> Source<Next, Mat>
1147    where
1148        Resource: Send + 'static,
1149        Next: Send + 'static,
1150        Create: Fn() -> StreamResult<Resource> + Send + Sync + 'static,
1151        F: Fn(&mut Resource, Out) -> StreamResult<Next> + Send + Sync + 'static,
1152        Close: Fn(Resource) -> StreamResult<Option<Next>> + Send + Sync + 'static,
1153    {
1154        self.via(Flow::identity().map_with_resource(create, f, close))
1155    }
1156
1157    #[must_use]
1158    pub fn fold_result<Acc, F>(self, zero: Acc, f: F) -> Source<Acc, Mat>
1159    where
1160        Acc: Clone + Send + Sync + 'static,
1161        F: Fn(Acc, Out) -> StreamResult<Acc> + Send + Sync + 'static,
1162    {
1163        self.via(Flow::identity().fold_result(zero, f))
1164    }
1165
1166    #[must_use]
1167    pub fn fold_result_with_supervision<Acc, F>(
1168        self,
1169        zero: Acc,
1170        f: F,
1171        decider: SupervisionDecider,
1172    ) -> Source<Acc, Mat>
1173    where
1174        Acc: Clone + Send + Sync + 'static,
1175        F: Fn(Acc, Out) -> StreamResult<Acc> + Send + Sync + 'static,
1176    {
1177        self.via(Flow::identity().fold_result_with_supervision(zero, f, decider))
1178    }
1179
1180    #[must_use]
1181    pub fn reduce<F>(self, f: F) -> Source<Out, Mat>
1182    where
1183        F: Fn(Out, Out) -> Out + Send + Sync + 'static,
1184    {
1185        self.via(Flow::identity().reduce(f))
1186    }
1187
1188    #[must_use]
1189    pub fn reduce_result<F>(self, f: F) -> Source<Out, Mat>
1190    where
1191        Out: Clone,
1192        F: Fn(Out, Out) -> StreamResult<Out> + Send + Sync + 'static,
1193    {
1194        self.via(Flow::identity().reduce_result(f))
1195    }
1196
1197    #[must_use]
1198    pub fn reduce_result_with_supervision<F>(
1199        self,
1200        f: F,
1201        decider: SupervisionDecider,
1202    ) -> Source<Out, Mat>
1203    where
1204        Out: Clone,
1205        F: Fn(Out, Out) -> StreamResult<Out> + Send + Sync + 'static,
1206    {
1207        self.via(Flow::identity().reduce_result_with_supervision(f, decider))
1208    }
1209
1210    #[must_use]
1211    pub fn map_error<F>(self, f: F) -> Source<Out, Mat>
1212    where
1213        F: Fn(StreamError) -> StreamError + Send + Sync + 'static,
1214    {
1215        self.via(Flow::identity().map_error(f))
1216    }
1217
1218    #[must_use]
1219    pub fn recover<F>(self, f: F) -> Source<Out, Mat>
1220    where
1221        F: Fn(StreamError) -> Option<Out> + Send + Sync + 'static,
1222    {
1223        self.via(Flow::identity().recover(f))
1224    }
1225
1226    #[must_use]
1227    pub fn recover_with<F>(self, f: F) -> Source<Out, Mat>
1228    where
1229        F: Fn(StreamError) -> Option<Source<Out>> + Send + Sync + 'static,
1230    {
1231        self.via(Flow::identity().recover_with(f))
1232    }
1233
1234    #[must_use]
1235    pub fn recover_with_retries<F>(self, retries: usize, f: F) -> Source<Out, Mat>
1236    where
1237        F: Fn(StreamError) -> Option<Source<Out>> + Send + Sync + 'static,
1238    {
1239        self.via(Flow::identity().recover_with_retries(retries, f))
1240    }
1241
1242    #[must_use]
1243    pub fn on_error_complete(self) -> Source<Out, Mat> {
1244        self.via(Flow::identity().on_error_complete())
1245    }
1246
1247    #[must_use]
1248    pub fn concat<Mat2>(self, that: Source<Out, Mat2>) -> Source<Out, Mat>
1249    where
1250        Mat2: Send + 'static,
1251    {
1252        let factory = self.factory;
1253        let hints = self.hints;
1254        let that_factory = that.factory;
1255        Source::from_materialized_factory_with_hints(
1256            move |materializer| {
1257                let (primary, mat) = Arc::clone(&factory).create(materializer)?;
1258                let secondary = match Arc::clone(&that_factory).create(materializer) {
1259                    Ok((stream, _)) => stream,
1260                    Err(error) => Box::new(std::iter::once(Err(error))) as BoxStream<Out>,
1261                };
1262                Ok((concat_source_streams(vec![primary, secondary]), mat))
1263            },
1264            hints,
1265        )
1266    }
1267
1268    #[must_use]
1269    pub fn concat_lazy<Mat2>(self, that: Source<Out, Mat2>) -> Source<Out, Mat>
1270    where
1271        Mat2: Send + 'static,
1272    {
1273        let factory = self.factory;
1274        let hints = self.hints;
1275        let that_factory = that.factory;
1276        Source::from_materialized_factory_with_hints(
1277            move |materializer| {
1278                let (primary, mat) = Arc::clone(&factory).create(materializer)?;
1279                Ok((
1280                    concat_source_streams_lazy(
1281                        primary,
1282                        vec![Arc::clone(&that_factory)],
1283                        materializer,
1284                    ),
1285                    mat,
1286                ))
1287            },
1288            hints,
1289        )
1290    }
1291
1292    #[must_use]
1293    pub fn concat_all_lazy<Mat2, I>(self, those: I) -> Source<Out, Mat>
1294    where
1295        Mat2: Send + 'static,
1296        I: IntoIterator<Item = Source<Out, Mat2>>,
1297    {
1298        let factory = self.factory;
1299        let hints = self.hints;
1300        let other_factories: Vec<_> = those.into_iter().map(|source| source.factory).collect();
1301        Source::from_materialized_factory_with_hints(
1302            move |materializer| {
1303                let (primary, mat) = Arc::clone(&factory).create(materializer)?;
1304                Ok((
1305                    concat_source_streams_lazy(primary, other_factories.clone(), materializer),
1306                    mat,
1307                ))
1308            },
1309            hints,
1310        )
1311    }
1312
1313    #[must_use]
1314    pub fn prepend<Mat2>(self, that: Source<Out, Mat2>) -> Source<Out, Mat>
1315    where
1316        Mat2: Send + 'static,
1317    {
1318        let factory = self.factory;
1319        let hints = self.hints;
1320        let that_factory = that.factory;
1321        Source::from_materialized_factory_with_hints(
1322            move |materializer| {
1323                let (primary, mat) = Arc::clone(&factory).create(materializer)?;
1324                let secondary = match Arc::clone(&that_factory).create(materializer) {
1325                    Ok((stream, _)) => stream,
1326                    Err(error) => Box::new(std::iter::once(Err(error))) as BoxStream<Out>,
1327                };
1328                Ok((concat_source_streams(vec![secondary, primary]), mat))
1329            },
1330            hints,
1331        )
1332    }
1333
1334    #[must_use]
1335    pub fn prepend_lazy<Mat2>(self, that: Source<Out, Mat2>) -> Source<Out, Mat>
1336    where
1337        Mat2: Send + 'static,
1338    {
1339        self.prepend(that)
1340    }
1341
1342    #[must_use]
1343    pub fn or_else<Mat2>(self, secondary: Source<Out, Mat2>) -> Source<Out, Mat>
1344    where
1345        Mat2: Send + 'static,
1346    {
1347        let factory = self.factory;
1348        let hints = self.hints;
1349        let secondary_factory = secondary.factory;
1350        Source::from_materialized_factory_with_hints(
1351            move |materializer| {
1352                let (primary, mat) = Arc::clone(&factory).create(materializer)?;
1353                let secondary = match Arc::clone(&secondary_factory).create(materializer) {
1354                    Ok((stream, _)) => stream,
1355                    Err(error) => Box::new(std::iter::once(Err(error))) as BoxStream<Out>,
1356                };
1357                Ok((or_else_source_stream(primary, secondary), mat))
1358            },
1359            hints,
1360        )
1361    }
1362
1363    #[must_use]
1364    pub fn interleave<Mat2>(self, that: Source<Out, Mat2>, segment_size: usize) -> Source<Out, Mat>
1365    where
1366        Mat2: Send + 'static,
1367    {
1368        self.interleave_all([that], segment_size, false)
1369    }
1370
1371    #[must_use]
1372    pub fn interleave_all<Mat2, I>(
1373        self,
1374        those: I,
1375        segment_size: usize,
1376        eager_close: bool,
1377    ) -> Source<Out, Mat>
1378    where
1379        Mat2: Send + 'static,
1380        I: IntoIterator<Item = Source<Out, Mat2>>,
1381    {
1382        let factory = self.factory;
1383        let hints = self.hints;
1384        let other_factories: Vec<_> = those.into_iter().map(|source| source.factory).collect();
1385        Source::from_materialized_factory_with_hints(
1386            move |materializer| {
1387                let (primary, mat) = Arc::clone(&factory).create(materializer)?;
1388                let mut streams = Vec::with_capacity(other_factories.len() + 1);
1389                streams.push(primary);
1390                for other in &other_factories {
1391                    let stream = match Arc::clone(other).create(materializer) {
1392                        Ok((stream, _)) => stream,
1393                        Err(error) => Box::new(std::iter::once(Err(error))) as BoxStream<Out>,
1394                    };
1395                    streams.push(stream);
1396                }
1397                Ok((
1398                    interleave_source_streams(streams, segment_size, eager_close),
1399                    mat,
1400                ))
1401            },
1402            hints,
1403        )
1404    }
1405
1406    #[must_use]
1407    pub fn merge_sorted<Mat2>(self, that: Source<Out, Mat2>) -> Source<Out, Mat>
1408    where
1409        Out: Ord,
1410        Mat2: Send + 'static,
1411    {
1412        self.via(Flow::identity().merge_sorted(that))
1413    }
1414
1415    #[must_use]
1416    pub fn merge_latest<Mat2>(
1417        self,
1418        that: Source<Out, Mat2>,
1419        eager_complete: bool,
1420    ) -> Source<Vec<Out>, Mat>
1421    where
1422        Out: Clone,
1423        Mat2: Send + 'static,
1424    {
1425        let factory = self.factory;
1426        let hints = self.hints;
1427        let that_factory = that.factory;
1428        Source::from_materialized_factory_with_hints(
1429            move |materializer| {
1430                let (left, mat) = Arc::clone(&factory).create(materializer)?;
1431                let right = match Arc::clone(&that_factory).create(materializer) {
1432                    Ok((stream, _)) => stream,
1433                    Err(error) => {
1434                        return Ok((
1435                            Box::new(std::iter::once(Err(error))) as BoxStream<Vec<Out>>,
1436                            mat,
1437                        ));
1438                    }
1439                };
1440                Ok((merge_latest_streams(vec![left, right], eager_complete), mat))
1441            },
1442            hints,
1443        )
1444    }
1445
1446    #[must_use]
1447    pub fn merge_all<Mat2, I>(self, those: I, eager_complete: bool) -> Source<Out, Mat>
1448    where
1449        Mat2: Send + 'static,
1450        I: IntoIterator<Item = Source<Out, Mat2>>,
1451    {
1452        let factory = self.factory;
1453        let hints = self.hints;
1454        let other_factories: Vec<_> = those.into_iter().map(|source| source.factory).collect();
1455        Source::from_materialized_factory_with_hints(
1456            move |materializer| {
1457                let (primary, mat) = Arc::clone(&factory).create(materializer)?;
1458                let mut streams = Vec::with_capacity(other_factories.len() + 1);
1459                streams.push(primary);
1460                for other in &other_factories {
1461                    let stream = match Arc::clone(other).create(materializer) {
1462                        Ok((stream, _)) => stream,
1463                        Err(error) => {
1464                            return Ok((
1465                                Box::new(std::iter::once(Err(error))) as BoxStream<Out>,
1466                                mat,
1467                            ));
1468                        }
1469                    };
1470                    streams.push(stream);
1471                }
1472                Ok((merge_streams(streams, eager_complete), mat))
1473            },
1474            hints,
1475        )
1476    }
1477
1478    #[must_use]
1479    pub fn zip_with<Mat2, Out2, Next, F>(
1480        self,
1481        that: Source<Out2, Mat2>,
1482        combine: F,
1483    ) -> Source<Next, Mat>
1484    where
1485        Out2: Send + 'static,
1486        Next: Send + 'static,
1487        Mat2: Send + 'static,
1488        F: Fn(Out, Out2) -> Next + Send + Sync + 'static,
1489    {
1490        self.via(Flow::identity().zip_with(that, combine))
1491    }
1492
1493    #[must_use]
1494    pub fn zip_latest<Mat2, Out2>(self, that: Source<Out2, Mat2>) -> Source<(Out, Out2), Mat>
1495    where
1496        Out: Clone,
1497        Out2: Clone + Send + 'static,
1498        Mat2: Send + 'static,
1499    {
1500        self.zip_latest_with(that, true, |left, right| (left, right))
1501    }
1502
1503    #[must_use]
1504    pub fn zip_latest_with<Mat2, Out2, Next, F>(
1505        self,
1506        that: Source<Out2, Mat2>,
1507        eager_complete: bool,
1508        combine: F,
1509    ) -> Source<Next, Mat>
1510    where
1511        Out: Clone,
1512        Out2: Clone + Send + 'static,
1513        Next: Send + 'static,
1514        Mat2: Send + 'static,
1515        F: Fn(Out, Out2) -> Next + Send + Sync + 'static,
1516    {
1517        let factory = self.factory;
1518        let hints = self.hints;
1519        let that_factory = that.factory;
1520        let combine = Arc::new(combine);
1521        Source::from_materialized_factory_with_hints(
1522            move |materializer| {
1523                let (left, mat) = Arc::clone(&factory).create(materializer)?;
1524                let right = match Arc::clone(&that_factory).create(materializer) {
1525                    Ok((stream, _)) => stream,
1526                    Err(error) => {
1527                        return Ok((
1528                            Box::new(std::iter::once(Err(error))) as BoxStream<Next>,
1529                            mat,
1530                        ));
1531                    }
1532                };
1533                Ok((
1534                    zip_latest_with_stream(left, right, eager_complete, Arc::clone(&combine)),
1535                    mat,
1536                ))
1537            },
1538            hints,
1539        )
1540    }
1541
1542    #[must_use]
1543    pub fn zip_with_index(self) -> Source<(Out, u64), Mat> {
1544        let factory = self.factory;
1545        let hints = self.hints;
1546        Source::from_materialized_factory_with_hints(
1547            move |materializer| {
1548                let (mut stream, mat) = Arc::clone(&factory).create(materializer)?;
1549                let mut index = 0_u64;
1550                Ok((
1551                    Box::new(std::iter::from_fn(move || {
1552                        stream.next().map(|item| {
1553                            item.map(|value| {
1554                                let pair = (value, index);
1555                                index = index.wrapping_add(1);
1556                                pair
1557                            })
1558                        })
1559                    })) as BoxStream<(Out, u64)>,
1560                    mat,
1561                ))
1562            },
1563            hints,
1564        )
1565    }
1566
1567    #[must_use]
1568    pub fn zip_all<Mat2, Out2>(
1569        self,
1570        that: Source<Out2, Mat2>,
1571        this_elem: Out,
1572        that_elem: Out2,
1573    ) -> Source<(Out, Out2), Mat>
1574    where
1575        Out: Clone + Sync,
1576        Out2: Clone + Send + Sync + 'static,
1577        Mat2: Send + 'static,
1578    {
1579        let factory = self.factory;
1580        let hints = self.hints;
1581        let that_factory = that.factory;
1582        Source::from_materialized_factory_with_hints(
1583            move |materializer| {
1584                let (left, mat) = Arc::clone(&factory).create(materializer)?;
1585                let right = match Arc::clone(&that_factory).create(materializer) {
1586                    Ok((stream, _)) => stream,
1587                    Err(error) => {
1588                        return Ok((
1589                            Box::new(std::iter::once(Err(error))) as BoxStream<(Out, Out2)>,
1590                            mat,
1591                        ));
1592                    }
1593                };
1594                Ok((
1595                    zip_all_stream(left, right, this_elem.clone(), that_elem.clone()),
1596                    mat,
1597                ))
1598            },
1599            hints,
1600        )
1601    }
1602
1603    #[must_use]
1604    pub fn also_to<SinkMat>(self, sink: Sink<Out, SinkMat>) -> Source<Out, Mat>
1605    where
1606        Out: Clone,
1607        SinkMat: Send + 'static,
1608    {
1609        self.via(Flow::identity().also_to(sink))
1610    }
1611
1612    #[must_use]
1613    pub fn also_to_all<SinkMat, I>(self, sinks: I) -> Source<Out, Mat>
1614    where
1615        Out: Clone,
1616        SinkMat: Send + 'static,
1617        I: IntoIterator<Item = Sink<Out, SinkMat>>,
1618    {
1619        self.via(Flow::identity().also_to_all(sinks))
1620    }
1621
1622    #[must_use]
1623    pub fn divert_to<SinkMat, F>(self, sink: Sink<Out, SinkMat>, predicate: F) -> Source<Out, Mat>
1624    where
1625        SinkMat: Send + 'static,
1626        F: Fn(&Out) -> bool + Send + Sync + 'static,
1627    {
1628        self.via(Flow::identity().divert_to(sink, predicate))
1629    }
1630
1631    #[must_use]
1632    pub fn wire_tap<SinkMat>(self, sink: Sink<Out, SinkMat>) -> Source<Out, Mat>
1633    where
1634        Out: Clone,
1635        SinkMat: Send + 'static,
1636    {
1637        self.via(Flow::identity().wire_tap(sink))
1638    }
1639
1640    pub fn run_with<SinkMat: Send + 'static>(
1641        self,
1642        sink: Sink<Out, SinkMat>,
1643    ) -> StreamResult<SinkMat> {
1644        // Split-segment fast path: if source has a segment hook and sink has a
1645        // fold fast-path descriptor, drive the fold inline without spawning a task.
1646        let fast_result = self
1647            .split_hook
1648            .as_ref()
1649            .zip(sink.fold_fp.as_deref())
1650            .and_then(|(hook, fp)| fp.try_register(Arc::clone(hook)));
1651        if let Some(result) = fast_result {
1652            return result?.downcast::<SinkMat>().map(|b| *b).map_err(|_| {
1653                StreamError::Failed("split fast path: unexpected mat type (internal error)".into())
1654            });
1655        }
1656        self.to_mat(sink, Keep::right).run()
1657    }
1658
1659    pub fn run_with_materializer<SinkMat: Send + 'static>(
1660        self,
1661        sink: Sink<Out, SinkMat>,
1662        materializer: &Materializer,
1663    ) -> StreamResult<SinkMat> {
1664        self.to_mat(sink, Keep::right)
1665            .run_with_materializer(materializer)
1666    }
1667
1668    #[must_use]
1669    pub fn to<SinkMat>(self, sink: Sink<Out, SinkMat>) -> RunnableGraph<Mat>
1670    where
1671        SinkMat: Send + 'static,
1672    {
1673        self.to_mat(sink, Keep::left)
1674    }
1675
1676    #[must_use]
1677    pub fn to_mat<SinkMat, Combined, F>(
1678        self,
1679        sink: Sink<Out, SinkMat>,
1680        combine: F,
1681    ) -> RunnableGraph<Combined>
1682    where
1683        SinkMat: Send + 'static,
1684        Combined: Send + 'static,
1685        F: Fn(Mat, SinkMat) -> Combined + Send + Sync + 'static,
1686    {
1687        let factory = self.factory;
1688        let hints = self.hints;
1689        let combine = Arc::new(combine);
1690        RunnableGraph::from_runner(move |materializer| {
1691            let (stream, source_mat) = Arc::clone(&factory).create(materializer)?;
1692            let stream =
1693                runtime_checked_stream(stream, Arc::clone(&materializer.inner.state), None);
1694            let sink_mat = if hints.inline_head_terminal && sink.can_inline() {
1695                sink.run_inline(stream, materializer)?
1696            } else {
1697                sink.run(stream, materializer)?
1698            };
1699            Ok(combine(source_mat, sink_mat))
1700        })
1701    }
1702
1703    pub fn run_collect(self) -> StreamResult<Vec<Out>> {
1704        self.run_with(Sink::collect())?.wait()
1705    }
1706
1707    #[must_use]
1708    pub fn map_materialized_value<NextMat, F>(self, f: F) -> Source<Out, NextMat>
1709    where
1710        NextMat: Send + 'static,
1711        F: Fn(Mat) -> NextMat + Send + Sync + 'static,
1712    {
1713        let factory = self.factory;
1714        let hints = self.hints;
1715        let f = Arc::new(f);
1716        Source::from_materialized_factory_with_hints(
1717            move |materializer| {
1718                let (stream, mat) = Arc::clone(&factory).create(materializer)?;
1719                Ok((stream, f(mat)))
1720            },
1721            hints,
1722        )
1723    }
1724}
1725
1726impl<Out: Clone + Send + Sync + 'static> Source<Out, NotUsed> {
1727    #[must_use]
1728    pub fn combine<Mat1, Mat2, MatRest, I>(
1729        first: Source<Out, Mat1>,
1730        second: Source<Out, Mat2>,
1731        rest: I,
1732        strategy: SourceCombineStrategy,
1733    ) -> Source<Out, NotUsed>
1734    where
1735        Mat1: Send + 'static,
1736        Mat2: Send + 'static,
1737        MatRest: Send + 'static,
1738        I: IntoIterator<Item = Source<Out, MatRest>>,
1739    {
1740        let mut factories: Vec<Arc<CombinedSourceFactory<Out>>> = vec![
1741            Arc::new(move |materializer| {
1742                Arc::clone(&first.factory)
1743                    .create(materializer)
1744                    .map(|(stream, _)| stream)
1745            }),
1746            Arc::new(move |materializer| {
1747                Arc::clone(&second.factory)
1748                    .create(materializer)
1749                    .map(|(stream, _)| stream)
1750            }),
1751        ];
1752        factories.extend(rest.into_iter().map(|source| {
1753            Arc::new(move |materializer: &Materializer| {
1754                Arc::clone(&source.factory)
1755                    .create(materializer)
1756                    .map(|(stream, _)| stream)
1757            }) as Arc<CombinedSourceFactory<Out>>
1758        }));
1759        Source::from_materialized_factory(move |materializer| {
1760            let mut streams = Vec::with_capacity(factories.len());
1761            for factory in &factories {
1762                let stream = match factory(materializer) {
1763                    Ok(stream) => stream,
1764                    Err(error) => {
1765                        return Ok((
1766                            Box::new(std::iter::once(Err(error))) as BoxStream<Out>,
1767                            NotUsed,
1768                        ));
1769                    }
1770                };
1771                streams.push(stream);
1772            }
1773            let stream = match &strategy {
1774                SourceCombineStrategy::Merge { eager_complete } => {
1775                    merge_streams(streams, *eager_complete)
1776                }
1777                SourceCombineStrategy::Concat => concat_source_streams(streams),
1778                SourceCombineStrategy::Prioritized {
1779                    priorities,
1780                    eager_complete,
1781                } => {
1782                    if priorities.len() != streams.len() {
1783                        return Err(StreamError::GraphValidation(format!(
1784                            "combine priorities length {} did not match source count {}",
1785                            priorities.len(),
1786                            streams.len()
1787                        )));
1788                    }
1789                    merge_prioritized_streams(streams, priorities.clone(), *eager_complete)
1790                }
1791            };
1792            Ok((stream, NotUsed))
1793        })
1794    }
1795
1796    #[must_use]
1797    pub fn zip_n<Mat2, I>(sources: I) -> Source<Vec<Out>, NotUsed>
1798    where
1799        I: IntoIterator<Item = Source<Out, Mat2>>,
1800        Mat2: Send + 'static,
1801        Out: Clone,
1802    {
1803        Self::zip_with_n(sources, |values| values)
1804    }
1805
1806    #[must_use]
1807    pub fn zip_with_n<Mat2, I, Next, F>(sources: I, zipper: F) -> Source<Next, NotUsed>
1808    where
1809        I: IntoIterator<Item = Source<Out, Mat2>>,
1810        Mat2: Send + 'static,
1811        Next: Send + 'static,
1812        F: Fn(Vec<Out>) -> Next + Send + Sync + 'static,
1813    {
1814        let factories: Vec<_> = sources.into_iter().map(|source| source.factory).collect();
1815        let zipper = Arc::new(zipper);
1816        Source::from_materialized_factory(move |materializer| {
1817            let mut streams = Vec::with_capacity(factories.len());
1818            for factory in &factories {
1819                let stream = match Arc::clone(factory).create(materializer) {
1820                    Ok((stream, _)) => stream,
1821                    Err(error) => {
1822                        return Ok((
1823                            Box::new(std::iter::once(Err(error))) as BoxStream<Next>,
1824                            NotUsed,
1825                        ));
1826                    }
1827                };
1828                streams.push(stream);
1829            }
1830            Ok((zip_n_streams(streams, Arc::clone(&zipper)), NotUsed))
1831        })
1832    }
1833
1834    #[must_use]
1835    pub fn merge_prioritized_n<Mat2, I>(
1836        sources_and_priorities: I,
1837        eager_complete: bool,
1838    ) -> Source<Out, NotUsed>
1839    where
1840        I: IntoIterator<Item = (Source<Out, Mat2>, usize)>,
1841        Mat2: Send + 'static,
1842    {
1843        let sources_and_priorities: Vec<_> = sources_and_priorities.into_iter().collect();
1844        if sources_and_priorities.is_empty() {
1845            return Source::empty();
1846        }
1847        let (factories, priorities): (Vec<_>, Vec<_>) = sources_and_priorities
1848            .into_iter()
1849            .map(|(source, priority)| (source.factory, priority))
1850            .unzip();
1851        Source::from_materialized_factory(move |materializer| {
1852            let mut streams = Vec::with_capacity(factories.len());
1853            for factory in &factories {
1854                let stream = match Arc::clone(factory).create(materializer) {
1855                    Ok((stream, _)) => stream,
1856                    Err(error) => {
1857                        return Ok((
1858                            Box::new(std::iter::once(Err(error))) as BoxStream<Out>,
1859                            NotUsed,
1860                        ));
1861                    }
1862                };
1863                streams.push(stream);
1864            }
1865            Ok((
1866                merge_prioritized_streams(streams, priorities.clone(), eager_complete),
1867                NotUsed,
1868            ))
1869        })
1870    }
1871
1872    #[must_use]
1873    pub fn maybe() -> (MaybeHandle<Out>, Self) {
1874        let value = Arc::new(Mutex::new(None));
1875        let handle = MaybeHandle {
1876            value: Arc::clone(&value),
1877        };
1878        let source = Self::from_factory(move || {
1879            let result = value
1880                .lock()
1881                .expect("maybe source poisoned")
1882                .clone()
1883                .unwrap_or(Err(StreamError::MaybeIncomplete));
1884            Box::new(std::iter::once(result))
1885        });
1886        (handle, source)
1887    }
1888
1889    #[must_use]
1890    pub fn single(item: Out) -> Self {
1891        Self::from_factory_with_hints(
1892            move || Box::new(std::iter::once(Ok(item.clone()))),
1893            SourceHints::with_inline_micro(1),
1894        )
1895    }
1896
1897    #[must_use]
1898    pub fn repeat(item: Out) -> Self {
1899        Self::from_factory(move || {
1900            let item = item.clone();
1901            Box::new(std::iter::repeat_with(move || Ok(item.clone())))
1902        })
1903    }
1904
1905    #[must_use]
1906    pub fn from_iterable<I>(items: I) -> Self
1907    where
1908        I: IntoIterator<Item = Out>,
1909    {
1910        items.into_iter().collect()
1911    }
1912}
1913
1914impl<Out: Clone + Send + Sync + 'static> FromIterator<Out> for Source<Out, NotUsed> {
1915    fn from_iter<T: IntoIterator<Item = Out>>(iter: T) -> Self {
1916        // Share the backing storage across materializations and clone elements
1917        // lazily on demand, instead of cloning the whole container each run.
1918        let items: Arc<[Out]> = iter.into_iter().collect();
1919        let len = items.len();
1920        Self::from_factory_with_hints(
1921            move || {
1922                let items = Arc::clone(&items);
1923                let mut index = 0;
1924                Box::new(std::iter::from_fn(move || {
1925                    let item = items.get(index)?.clone();
1926                    index += 1;
1927                    Some(Ok(item))
1928                }))
1929            },
1930            // Exact success item count: enables the coordinator inline drain path.
1931            SourceHints::with_inline_micro(len),
1932        )
1933    }
1934}
1935
/// Test-only constructor: wraps `factory` in a `Source` that carries an
/// inline-micro hint of `max_success_items`.
///
/// Lets tests drive the inline drain path with custom (possibly blocking)
/// iterators without widening which production sources are eligible.
#[cfg(test)]
pub(in crate::stream) fn test_source_with_inline_micro_hint<Out: Send + 'static>(
    factory: impl Fn() -> BoxStream<Out> + Send + Sync + 'static,
    max_success_items: usize,
) -> Source<Out, NotUsed> {
    let hints = SourceHints::with_inline_micro(max_success_items);
    Source::from_factory_with_hints(factory, hints)
}
1947
/// Iterator state for a stream that reads elements out of a resource it
/// opens, reads and closes through three caller-supplied callbacks.
///
/// Fields:
/// - `create`, `read`, `close`: the shared callbacks that open the resource,
///   pull the next element (`Ok(None)` ends the stream) and release the
///   resource.
/// - `resource`: the open resource; `None` before creation and after
///   `close_resource` has taken it.
/// - `created`: set by `ensure_created` before `create` is invoked, so
///   `create` runs at most once.
/// - `terminated`: set by `next` once the stream has ended or failed; later
///   calls to `next` return `None`.
/// - `_marker`: mentions `Out` without storing a value of that type.
1948struct UnfoldResourceStream<Resource, Out, Create, Read, Close>
1949where
1950    Create: Fn() -> StreamResult<Resource>,
1951    Read: Fn(&mut Resource) -> StreamResult<Option<Out>>,
1952    Close: Fn(Resource) -> StreamResult<()>,
1953{
1954    create: Arc<Create>,
1955    read: Arc<Read>,
1956    close: Arc<Close>,
1957    resource: Option<Resource>,
1958    created: bool,
1959    terminated: bool,
1960    _marker: PhantomData<fn() -> Out>,
1961}
1962
1963impl<Resource, Out, Create, Read, Close> UnfoldResourceStream<Resource, Out, Create, Read, Close>
1964where
1965    Create: Fn() -> StreamResult<Resource>,
1966    Read: Fn(&mut Resource) -> StreamResult<Option<Out>>,
1967    Close: Fn(Resource) -> StreamResult<()>,
1968{
1969    fn ensure_created(&mut self) -> StreamResult<()> {
1970        if self.created {
1971            return Ok(());
1972        }
1973        self.created = true;
1974        let resource = catch_unwind_failed("unfold_resource create", || (self.create)())
1975            .and_then(|result| result)?;
1976        self.resource = Some(resource);
1977        Ok(())
1978    }
1979
1980    fn close_resource(&mut self) -> StreamResult<()> {
1981        match self.resource.take() {
1982            Some(resource) => {
1983                catch_unwind_failed("unfold_resource close", || (self.close)(resource))
1984                    .and_then(|result| result)
1985            }
1986            None => Ok(()),
1987        }
1988    }
1989}
1990
1991impl<Resource, Out, Create, Read, Close> Iterator
1992    for UnfoldResourceStream<Resource, Out, Create, Read, Close>
1993where
1994    Create: Fn() -> StreamResult<Resource>,
1995    Read: Fn(&mut Resource) -> StreamResult<Option<Out>>,
1996    Close: Fn(Resource) -> StreamResult<()>,
1997{
1998    type Item = StreamResult<Out>;
1999
2000    fn next(&mut self) -> Option<Self::Item> {
2001        if self.terminated {
2002            return None;
2003        }
2004        if let Err(error) = self.ensure_created() {
2005            self.terminated = true;
2006            return Some(Err(error));
2007        }
2008
2009        let result = {
2010            let resource = self
2011                .resource
2012                .as_mut()
2013                .expect("unfold_resource resource is open");
2014            catch_unwind_failed("unfold_resource read", || (self.read)(resource))
2015                .and_then(|result| result)
2016        };
2017
2018        match result {
2019            Ok(Some(item)) => Some(Ok(item)),
2020            Ok(None) => {
2021                self.terminated = true;
2022                match self.close_resource() {
2023                    Ok(()) => None,
2024                    Err(error) => Some(Err(error)),
2025                }
2026            }
2027            Err(read_error) => {
2028                self.terminated = true;
2029                let _ = self.close_resource();
2030                Some(Err(read_error))
2031            }
2032        }
2033    }
2034}
2035
2036impl<Resource, Out, Create, Read, Close> Drop
2037    for UnfoldResourceStream<Resource, Out, Create, Read, Close>
2038where
2039    Create: Fn() -> StreamResult<Resource>,
2040    Read: Fn(&mut Resource) -> StreamResult<Option<Out>>,
2041    Close: Fn(Resource) -> StreamResult<()>,
2042{
2043    fn drop(&mut self) {
2044        let _ = self.close_resource();
2045    }
2046}
2047
/// Function-pointer signature that names the element and future types of
/// `UnfoldResourceAsyncStream`; that struct uses it as its `PhantomData`
/// parameter so the types are mentioned without being stored.
2048type UnfoldResourceAsyncMarker<Out, CreateFut, ReadFut, CloseFut> =
2049    fn() -> (Out, CreateFut, ReadFut, CloseFut);
2050
/// Iterator state for a stream that reads elements out of a resource whose
/// open, read and close callbacks each return a future.
///
/// Fields:
/// - `create`, `read`, `close`: the shared callbacks; the futures they
///   return are driven through `flow::run_future_inline_or_spawn`.
/// - `resource`: the open resource; `None` before creation and after
///   `close_resource` has taken it.
/// - `created`: set by `ensure_created` before `create` is invoked, so
///   `create` runs at most once.
/// - `terminated`: set by `next` once the stream has ended or failed; later
///   calls to `next` return `None`.
/// - `_marker`: mentions `Out` and the three future types without storing
///   values of those types.
2051struct UnfoldResourceAsyncStream<Resource, Out, Create, CreateFut, Read, ReadFut, Close, CloseFut>
2052where
2053    Resource: Send + 'static,
2054    Create: Fn() -> CreateFut,
2055    CreateFut: Future<Output = StreamResult<Resource>> + Send + 'static,
2056    Read: Fn(&mut Resource) -> ReadFut,
2057    ReadFut: Future<Output = StreamResult<Option<Out>>> + Send + 'static,
2058    Close: Fn(Resource) -> CloseFut,
2059    CloseFut: Future<Output = StreamResult<()>> + Send + 'static,
2060{
2061    create: Arc<Create>,
2062    read: Arc<Read>,
2063    close: Arc<Close>,
2064    resource: Option<Resource>,
2065    created: bool,
2066    terminated: bool,
2067    _marker: PhantomData<UnfoldResourceAsyncMarker<Out, CreateFut, ReadFut, CloseFut>>,
2068}
2069
2070impl<Resource, Out, Create, CreateFut, Read, ReadFut, Close, CloseFut>
2071    UnfoldResourceAsyncStream<Resource, Out, Create, CreateFut, Read, ReadFut, Close, CloseFut>
2072where
2073    Resource: Send + 'static,
2074    Create: Fn() -> CreateFut,
2075    CreateFut: Future<Output = StreamResult<Resource>> + Send + 'static,
2076    Read: Fn(&mut Resource) -> ReadFut,
2077    ReadFut: Future<Output = StreamResult<Option<Out>>> + Send + 'static,
2078    Close: Fn(Resource) -> CloseFut,
2079    CloseFut: Future<Output = StreamResult<()>> + Send + 'static,
2080{
2081    fn ensure_created(&mut self) -> StreamResult<()> {
2082        if self.created {
2083            return Ok(());
2084        }
2085        self.created = true;
2086        let resource = catch_unwind_failed("unfold_resource_async create", || (self.create)())
2087            .and_then(flow::run_future_inline_or_spawn)?;
2088        self.resource = Some(resource);
2089        Ok(())
2090    }
2091
2092    fn close_resource(&mut self) -> StreamResult<()> {
2093        match self.resource.take() {
2094            Some(resource) => {
2095                catch_unwind_failed("unfold_resource_async close", || (self.close)(resource))
2096                    .and_then(flow::run_future_inline_or_spawn)
2097            }
2098            None => Ok(()),
2099        }
2100    }
2101}
2102
2103impl<Resource, Out, Create, CreateFut, Read, ReadFut, Close, CloseFut> Iterator
2104    for UnfoldResourceAsyncStream<Resource, Out, Create, CreateFut, Read, ReadFut, Close, CloseFut>
2105where
2106    Resource: Send + 'static,
2107    Out: Send + 'static,
2108    Create: Fn() -> CreateFut,
2109    CreateFut: Future<Output = StreamResult<Resource>> + Send + 'static,
2110    Read: Fn(&mut Resource) -> ReadFut,
2111    ReadFut: Future<Output = StreamResult<Option<Out>>> + Send + 'static,
2112    Close: Fn(Resource) -> CloseFut,
2113    CloseFut: Future<Output = StreamResult<()>> + Send + 'static,
2114{
2115    type Item = StreamResult<Out>;
2116
2117    fn next(&mut self) -> Option<Self::Item> {
2118        if self.terminated {
2119            return None;
2120        }
2121        if let Err(error) = self.ensure_created() {
2122            self.terminated = true;
2123            return Some(Err(error));
2124        }
2125
2126        let result = {
2127            let resource = self
2128                .resource
2129                .as_mut()
2130                .expect("unfold_resource_async resource is open");
2131            catch_unwind_failed("unfold_resource_async read", || (self.read)(resource))
2132                .and_then(flow::run_future_inline_or_spawn)
2133        };
2134
2135        match result {
2136            Ok(Some(item)) => Some(Ok(item)),
2137            Ok(None) => {
2138                self.terminated = true;
2139                match self.close_resource() {
2140                    Ok(()) => None,
2141                    Err(error) => Some(Err(error)),
2142                }
2143            }
2144            Err(read_error) => {
2145                self.terminated = true;
2146                let _ = self.close_resource();
2147                Some(Err(read_error))
2148            }
2149        }
2150    }
2151}
2152
2153impl<Resource, Out, Create, CreateFut, Read, ReadFut, Close, CloseFut> Drop
2154    for UnfoldResourceAsyncStream<Resource, Out, Create, CreateFut, Read, ReadFut, Close, CloseFut>
2155where
2156    Resource: Send + 'static,
2157    Create: Fn() -> CreateFut,
2158    CreateFut: Future<Output = StreamResult<Resource>> + Send + 'static,
2159    Read: Fn(&mut Resource) -> ReadFut,
2160    ReadFut: Future<Output = StreamResult<Option<Out>>> + Send + 'static,
2161    Close: Fn(Resource) -> CloseFut,
2162    CloseFut: Future<Output = StreamResult<()>> + Send + 'static,
2163{
2164    fn drop(&mut self) {
2165        let _ = self.close_resource();
2166    }
2167}
2168
/// Iterator state for a source whose inner `Source` is built by a factory
/// on the first pull rather than up front.
///
/// Fields:
/// - `create`: shared factory that builds the inner source.
/// - `materializer`: passed to the inner source's factory when its stream
///   is created.
/// - `current`: the inner stream once `initialize` has succeeded.
/// - `mat_sender`: one-shot channel that receives the inner materialized
///   value or the initialization error; taken when the result is sent.
/// - `initialized`: set by `initialize` before the factory is invoked, so
///   initialization is attempted at most once.
/// - `terminated`: set by `next` once the stream has ended or failed; later
///   calls to `next` return `None`.
2169struct LazySourceStream<Out, InnerMat, F> {
2170    create: Arc<F>,
2171    materializer: Materializer,
2172    current: Option<BoxStream<Out>>,
2173    mat_sender: Option<oneshot::Sender<StreamResult<InnerMat>>>,
2174    initialized: bool,
2175    terminated: bool,
2176}
2177
2178impl<Out, InnerMat, F> LazySourceStream<Out, InnerMat, F>
2179where
2180    Out: Send + 'static,
2181    InnerMat: Send + 'static,
2182    F: Fn() -> Source<Out, InnerMat>,
2183{
2184    fn complete_mat(&mut self, result: StreamResult<InnerMat>) {
2185        if let Some(sender) = self.mat_sender.take() {
2186            let _ = sender.send(result);
2187        }
2188    }
2189
2190    fn initialize(&mut self) -> StreamResult<()> {
2191        if self.initialized {
2192            return Ok(());
2193        }
2194        self.initialized = true;
2195        let source = match catch_unwind_failed("lazy_source factory", || (self.create)()) {
2196            Ok(source) => source,
2197            Err(error) => {
2198                self.complete_mat(Err(error.clone()));
2199                return Err(error);
2200            }
2201        };
2202        match Arc::clone(&source.factory).create(&self.materializer) {
2203            Ok((stream, mat)) => {
2204                self.current = Some(stream);
2205                self.complete_mat(Ok(mat));
2206                Ok(())
2207            }
2208            Err(error) => {
2209                self.complete_mat(Err(error.clone()));
2210                Err(error)
2211            }
2212        }
2213    }
2214}
2215
2216impl<Out, InnerMat, F> Iterator for LazySourceStream<Out, InnerMat, F>
2217where
2218    Out: Send + 'static,
2219    InnerMat: Send + 'static,
2220    F: Fn() -> Source<Out, InnerMat>,
2221{
2222    type Item = StreamResult<Out>;
2223
2224    fn next(&mut self) -> Option<Self::Item> {
2225        if self.terminated {
2226            return None;
2227        }
2228        if let Err(error) = self.initialize() {
2229            self.terminated = true;
2230            return Some(Err(error));
2231        }
2232        match self
2233            .current
2234            .as_mut()
2235            .expect("lazy_source current stream initialized")
2236            .next()
2237        {
2238            Some(Ok(item)) => Some(Ok(item)),
2239            Some(Err(error)) => {
2240                self.terminated = true;
2241                Some(Err(error))
2242            }
2243            None => {
2244                self.terminated = true;
2245                None
2246            }
2247        }
2248    }
2249}
2250
2251impl<Out, InnerMat, F> Drop for LazySourceStream<Out, InnerMat, F> {
2252    fn drop(&mut self) {
2253        if !self.initialized
2254            && let Some(sender) = self.mat_sender.take()
2255        {
2256            let _ = sender.send(Err(StreamError::Failed(
2257                "lazy source was never materialized".into(),
2258            )));
2259        }
2260    }
2261}
2262
/// Iterator state for a source whose inner `Source` is produced by a future
/// that the factory returns on the first pull.
///
/// Fields:
/// - `create`: shared factory that returns the future resolving to the
///   inner source.
/// - `materializer`: passed to the inner source's factory when its stream
///   is created.
/// - `current`: the inner stream once `initialize` has succeeded.
/// - `mat_sender`: one-shot channel that receives the inner materialized
///   value or the initialization error; taken when the result is sent.
/// - `initialized`: set by `initialize` before the factory is invoked, so
///   initialization is attempted at most once.
/// - `terminated`: set by `next` once the stream has ended or failed; later
///   calls to `next` return `None`.
/// - `_marker`: mentions `Fut` without storing a value of that type.
2263struct LazyFutureSourceStream<Out, InnerMat, F, Fut> {
2264    create: Arc<F>,
2265    materializer: Materializer,
2266    current: Option<BoxStream<Out>>,
2267    mat_sender: Option<oneshot::Sender<StreamResult<InnerMat>>>,
2268    initialized: bool,
2269    terminated: bool,
2270    _marker: PhantomData<fn() -> Fut>,
2271}
2272
2273impl<Out, InnerMat, F, Fut> LazyFutureSourceStream<Out, InnerMat, F, Fut>
2274where
2275    Out: Send + 'static,
2276    InnerMat: Send + 'static,
2277    F: Fn() -> Fut,
2278    Fut: Future<Output = StreamResult<Source<Out, InnerMat>>> + Send + 'static,
2279{
2280    fn complete_mat(&mut self, result: StreamResult<InnerMat>) {
2281        if let Some(sender) = self.mat_sender.take() {
2282            let _ = sender.send(result);
2283        }
2284    }
2285
2286    fn initialize(&mut self) -> StreamResult<()> {
2287        if self.initialized {
2288            return Ok(());
2289        }
2290        self.initialized = true;
2291        let source = match catch_unwind_failed("lazy_future_source factory", || (self.create)())
2292            .and_then(flow::run_future_inline_or_spawn)
2293        {
2294            Ok(source) => source,
2295            Err(error) => {
2296                self.complete_mat(Err(error.clone()));
2297                return Err(error);
2298            }
2299        };
2300        match Arc::clone(&source.factory).create(&self.materializer) {
2301            Ok((stream, mat)) => {
2302                self.current = Some(stream);
2303                self.complete_mat(Ok(mat));
2304                Ok(())
2305            }
2306            Err(error) => {
2307                self.complete_mat(Err(error.clone()));
2308                Err(error)
2309            }
2310        }
2311    }
2312}
2313
2314impl<Out, InnerMat, F, Fut> Iterator for LazyFutureSourceStream<Out, InnerMat, F, Fut>
2315where
2316    Out: Send + 'static,
2317    InnerMat: Send + 'static,
2318    F: Fn() -> Fut,
2319    Fut: Future<Output = StreamResult<Source<Out, InnerMat>>> + Send + 'static,
2320{
2321    type Item = StreamResult<Out>;
2322
2323    fn next(&mut self) -> Option<Self::Item> {
2324        if self.terminated {
2325            return None;
2326        }
2327        if let Err(error) = self.initialize() {
2328            self.terminated = true;
2329            return Some(Err(error));
2330        }
2331        match self
2332            .current
2333            .as_mut()
2334            .expect("lazy_future_source current stream initialized")
2335            .next()
2336        {
2337            Some(Ok(item)) => Some(Ok(item)),
2338            Some(Err(error)) => {
2339                self.terminated = true;
2340                Some(Err(error))
2341            }
2342            None => {
2343                self.terminated = true;
2344                None
2345            }
2346        }
2347    }
2348}
2349
2350impl<Out, InnerMat, F, Fut> Drop for LazyFutureSourceStream<Out, InnerMat, F, Fut> {
2351    fn drop(&mut self) {
2352        if !self.initialized
2353            && let Some(sender) = self.mat_sender.take()
2354        {
2355            let _ = sender.send(Err(StreamError::Failed(
2356                "lazy future source was never materialized".into(),
2357            )));
2358        }
2359    }
2360}
2361
2362fn concat_source_streams<Out>(streams: Vec<BoxStream<Out>>) -> BoxStream<Out>
2363where
2364    Out: Send + 'static,
2365{
2366    let mut streams: VecDeque<_> = streams.into();
2367    let mut current = streams.pop_front();
2368    Box::new(std::iter::from_fn(move || {
2369        loop {
2370            match current.as_mut() {
2371                Some(stream) => match stream.next() {
2372                    Some(item) => return Some(item),
2373                    None => current = streams.pop_front(),
2374                },
2375                None => return None,
2376            }
2377        }
2378    }))
2379}
2380
2381fn concat_source_streams_lazy<Out, Mat>(
2382    initial: BoxStream<Out>,
2383    factories: Vec<Arc<dyn SourceFactory<Out, Mat>>>,
2384    materializer: &Materializer,
2385) -> BoxStream<Out>
2386where
2387    Out: Send + 'static,
2388    Mat: Send + 'static,
2389{
2390    let mut current = Some(initial);
2391    let mut remaining: VecDeque<_> = factories.into();
2392    let materializer = materializer.with_name_prefix(materializer.name_prefix().to_owned());
2393    Box::new(std::iter::from_fn(move || {
2394        loop {
2395            match current.as_mut() {
2396                Some(stream) => match stream.next() {
2397                    Some(item) => return Some(item),
2398                    None => {
2399                        current = remaining.pop_front().map(|factory| {
2400                            match factory.create(&materializer) {
2401                                Ok((stream, _)) => stream,
2402                                Err(error) => {
2403                                    Box::new(std::iter::once(Err(error))) as BoxStream<Out>
2404                                }
2405                            }
2406                        });
2407                    }
2408                },
2409                None => return None,
2410            }
2411        }
2412    }))
2413}
2414
2415fn or_else_source_stream<Out>(
2416    mut primary: BoxStream<Out>,
2417    mut secondary: BoxStream<Out>,
2418) -> BoxStream<Out>
2419where
2420    Out: Send + 'static,
2421{
2422    let mut primary_emitted = false;
2423    let mut using_secondary = false;
2424    Box::new(std::iter::from_fn(move || {
2425        loop {
2426            if using_secondary {
2427                return secondary.next();
2428            }
2429
2430            match primary.next() {
2431                Some(Ok(item)) => {
2432                    primary_emitted = true;
2433                    return Some(Ok(item));
2434                }
2435                Some(Err(error)) => return Some(Err(error)),
2436                None if primary_emitted => return None,
2437                None => using_secondary = true,
2438            }
2439        }
2440    }))
2441}
2442
2443fn interleave_source_streams<Out>(
2444    streams: Vec<BoxStream<Out>>,
2445    segment_size: usize,
2446    eager_close: bool,
2447) -> BoxStream<Out>
2448where
2449    Out: Send + 'static,
2450{
2451    if segment_size == 0 {
2452        return Box::new(std::iter::once(Err(StreamError::GraphValidation(
2453            "interleave segment size must be greater than zero".into(),
2454        ))));
2455    }
2456
2457    let mut streams: Vec<Option<BoxStream<Out>>> = streams.into_iter().map(Some).collect();
2458    let mut pending: Vec<Option<StreamResult<Out>>> = (0..streams.len()).map(|_| None).collect();
2459    let mut current = 0usize;
2460    let mut emitted = 0usize;
2461    Box::new(std::iter::from_fn(move || {
2462        loop {
2463            if streams.iter().all(Option::is_none) {
2464                return None;
2465            }
2466            if streams[current].is_none() {
2467                match next_active_source_stream(&streams, current) {
2468                    Some(next) => {
2469                        current = next;
2470                        emitted = 0;
2471                    }
2472                    None => return None,
2473                }
2474            }
2475
2476            let Some(stream) = streams[current].as_mut() else {
2477                continue;
2478            };
2479            let next_item = pending[current].take().or_else(|| stream.next());
2480            match next_item {
2481                Some(Ok(item)) => {
2482                    emitted += 1;
2483                    if emitted == segment_size {
2484                        emitted = 0;
2485                        if let Some(next) = next_active_source_stream(&streams, current) {
2486                            current = next;
2487                        }
2488                    }
2489                    return Some(Ok(item));
2490                }
2491                Some(Err(error)) => return Some(Err(error)),
2492                None => {
2493                    streams[current] = None;
2494                    emitted = 0;
2495                    if eager_close {
2496                        return None;
2497                    }
2498                    match next_active_source_stream(&streams, current) {
2499                        Some(next) => current = next,
2500                        None => return None,
2501                    }
2502                }
2503            }
2504        }
2505    }))
2506}
2507
2508fn next_active_source_stream<Out>(
2509    streams: &[Option<BoxStream<Out>>],
2510    current: usize,
2511) -> Option<usize>
2512where
2513    Out: Send + 'static,
2514{
2515    if streams.is_empty() {
2516        return None;
2517    }
2518    for offset in 1..=streams.len() {
2519        let index = (current + offset) % streams.len();
2520        if streams[index].is_some() {
2521            return Some(index);
2522        }
2523    }
2524    None
2525}