1use super::flow::{self, FlowTransform};
2use super::*;
3use crate::Attributes;
4use crate::context::SourceWithContext;
5
6#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash)]
7pub struct NotUsed;
8
9type CombinedSourceFactory<Out> =
10 dyn Fn(&Materializer) -> StreamResult<BoxStream<Out>> + Send + Sync;
11
12pub struct Keep;
13
14impl Keep {
15 pub fn left<Left, Right>(left: Left, _right: Right) -> Left {
16 left
17 }
18
19 pub fn right<Left, Right>(_left: Left, right: Right) -> Right {
20 right
21 }
22
23 pub fn both<Left, Right>(left: Left, right: Right) -> (Left, Right) {
24 (left, right)
25 }
26
27 pub fn none<Left, Right>(_left: Left, _right: Right) -> NotUsed {
28 NotUsed
29 }
30}
31
32#[derive(Debug, Clone, PartialEq, Eq)]
33pub enum SourceCombineStrategy {
34 Merge {
35 eager_complete: bool,
36 },
37 Concat,
38 Prioritized {
39 priorities: Vec<usize>,
40 eager_complete: bool,
41 },
42}
43
44#[derive(Clone)]
45pub struct MaybeHandle<T> {
46 value: Arc<Mutex<Option<StreamResult<T>>>>,
47}
48
49impl<T> MaybeHandle<T> {
50 #[must_use]
51 pub fn is_completed(&self) -> bool {
52 self.value.lock().expect("maybe handle poisoned").is_some()
53 }
54
55 pub fn complete(&self, item: T) -> StreamResult<()> {
56 self.settle(Ok(item))
57 }
58
59 pub fn fail(&self, error: StreamError) -> StreamResult<()> {
60 self.settle(Err(error))
61 }
62
63 fn settle(&self, result: StreamResult<T>) -> StreamResult<()> {
64 let mut value = self.value.lock().expect("maybe handle poisoned");
65 if value.is_some() {
66 return Err(StreamError::Failed("maybe source already completed".into()));
67 }
68 *value = Some(result);
69 Ok(())
70 }
71}
72
73#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
74pub struct Demand(u64);
75
76impl Demand {
77 pub const ZERO: Self = Self(0);
78 pub const ONE: Self = Self(1);
79 pub const MAX: Self = Self(u64::MAX);
80
81 #[must_use]
82 pub const fn new(available: u64) -> Self {
83 Self(available)
84 }
85
86 #[must_use]
87 pub const fn available(self) -> u64 {
88 self.0
89 }
90
91 #[must_use]
92 pub const fn is_unbounded(self) -> bool {
93 self.0 == u64::MAX
94 }
95
96 #[must_use]
97 pub const fn is_empty(self) -> bool {
98 self.0 == 0
99 }
100
101 #[must_use]
102 pub const fn saturating_add(self, rhs: Self) -> Self {
103 Self(self.0.saturating_add(rhs.0))
104 }
105
106 pub fn consume_one(&mut self) -> bool {
107 match self.0 {
108 0 => false,
109 u64::MAX => true,
110 _ => {
111 self.0 -= 1;
112 true
113 }
114 }
115 }
116}
117
118pub trait PushOutlet<T>: Send {
119 fn push(&mut self, item: T) -> StreamResult<Demand>;
120
121 fn complete(&mut self) -> StreamResult<()> {
122 Ok(())
123 }
124
125 fn fail(&mut self, cause: StreamError) -> StreamResult<()> {
126 Err(cause)
127 }
128}
129
130#[derive(Clone)]
131pub struct Source<Out, Mat = NotUsed> {
132 pub(crate) factory: Arc<dyn SourceFactory<Out, Mat>>,
133 pub(super) hints: SourceHints,
134 pub(super) attributes: Attributes,
135 pub(crate) split_hook: Option<Arc<dyn SplitSegmentHookDyn>>,
138}
139
140impl<Out: Send + 'static> Source<Out, NotUsed> {
141 pub(super) fn from_factory<F>(factory: F) -> Self
142 where
143 F: Fn() -> BoxStream<Out> + Send + Sync + 'static,
144 {
145 Self::from_factory_with_hints(factory, SourceHints::default())
146 }
147
148 fn from_factory_with_hints<F>(factory: F, hints: SourceHints) -> Self
149 where
150 F: Fn() -> BoxStream<Out> + Send + Sync + 'static,
151 {
152 Self::from_materialized_factory_with_hints(
153 move |_materializer| Ok((factory(), NotUsed)),
154 hints,
155 )
156 }
157
158 #[must_use]
159 pub fn empty() -> Self {
160 Self::from_factory_with_hints(
161 || Box::new(std::iter::empty()),
162 SourceHints::with_inline_micro(0),
163 )
164 }
165
166 #[must_use]
167 pub fn never() -> Self {
168 Self::from_materialized_factory_with_hints(
169 move |materializer| {
170 let state = Arc::clone(&materializer.inner.state);
171 Ok((
172 Box::new(std::iter::from_fn(move || {
173 loop {
174 if state.shutdown.load(Ordering::SeqCst) {
175 return Some(Err(StreamError::AbruptTermination));
176 }
177 if super::runtime::current_stream_cancelled()
178 .as_ref()
179 .is_some_and(|cancelled| cancelled.load(Ordering::SeqCst))
180 {
181 return Some(Err(StreamError::Cancelled));
182 }
183 std::thread::park_timeout(Duration::from_millis(1));
184 }
185 })),
186 NotUsed,
187 ))
188 },
189 SourceHints::default(),
190 )
191 }
192
193 #[must_use]
194 pub fn failed(error: StreamError) -> Self {
195 Self::from_factory_with_hints(
196 move || Box::new(std::iter::once(Err(error.clone()))),
197 SourceHints::with_inline_micro(0),
199 )
200 }
201
202 #[must_use]
203 pub fn future<F, Fut>(future: F) -> Self
204 where
205 F: Fn() -> Fut + Send + Sync + 'static,
206 Fut: Future<Output = StreamResult<Out>> + Send + 'static,
207 {
208 let future = Arc::new(future);
209 Self::from_factory(move || {
210 let future = Arc::clone(&future);
211 let mut emitted = false;
212 Box::new(std::iter::from_fn(move || {
213 if emitted {
214 return None;
215 }
216 emitted = true;
217 Some(
218 catch_unwind_failed("source future factory", || future())
219 .and_then(flow::run_future_inline_or_spawn),
220 )
221 }))
222 })
223 }
224
225 #[must_use]
226 pub fn future_source<F, Fut>(future: F) -> Self
227 where
228 F: Fn() -> Fut + Send + Sync + 'static,
229 Fut: Future<Output = StreamResult<Source<Out>>> + Send + 'static,
230 {
231 let future = Arc::new(future);
232 Self::from_materialized_factory(move |materializer| {
233 let materializer = materializer.with_name_prefix(materializer.name_prefix().to_owned());
234 let future = Arc::clone(&future);
235 let mut current = None::<BoxStream<Out>>;
236 let mut initialized = false;
237 let mut terminated = false;
238 Ok((
239 Box::new(std::iter::from_fn(move || {
240 if terminated {
241 return None;
242 }
243
244 loop {
245 if let Some(stream) = current.as_mut() {
246 match stream.next() {
247 Some(item) => return Some(item),
248 None => {
249 terminated = true;
250 return None;
251 }
252 }
253 }
254
255 if initialized {
256 terminated = true;
257 return None;
258 }
259 initialized = true;
260 let source = match catch_unwind_failed("future_source factory", || future())
261 .and_then(flow::run_future_inline_or_spawn)
262 {
263 Ok(source) => source,
264 Err(error) => {
265 terminated = true;
266 return Some(Err(error));
267 }
268 };
269 current = Some(match Arc::clone(&source.factory).create(&materializer) {
270 Ok((stream, _)) => stream,
271 Err(error) => {
272 terminated = true;
273 return Some(Err(error));
274 }
275 });
276 }
277 })) as BoxStream<Out>,
278 NotUsed,
279 ))
280 })
281 }
282
283 #[must_use]
284 pub fn cycle<F, I>(factory: F) -> Self
285 where
286 F: Fn() -> I + Send + Sync + 'static,
287 I: IntoIterator<Item = Out>,
288 I::IntoIter: Send + 'static,
289 {
290 let factory = Arc::new(factory);
291 Self::from_factory(move || {
292 let factory = Arc::clone(&factory);
293 let mut current = None::<I::IntoIter>;
294 let mut terminated = false;
295 Box::new(std::iter::from_fn(move || {
296 if terminated {
297 return None;
298 }
299
300 if let Some(iter) = current.as_mut()
301 && let Some(item) = iter.next()
302 {
303 return Some(Ok(item));
304 }
305
306 let mut next = match catch_unwind_failed("cycle factory", || factory()) {
307 Ok(iterable) => iterable.into_iter(),
308 Err(error) => {
309 terminated = true;
310 return Some(Err(error));
311 }
312 };
313 match next.next() {
314 Some(item) => {
315 current = Some(next);
316 Some(Ok(item))
317 }
318 None => {
319 terminated = true;
320 Some(Err(StreamError::Failed("empty iterator".into())))
321 }
322 }
323 }))
324 })
325 }
326
327 #[must_use]
328 pub fn unfold<State, F>(initial: State, f: F) -> Self
329 where
330 State: Clone + Send + Sync + 'static,
331 F: Fn(State) -> Option<(State, Out)> + Send + Sync + 'static,
332 {
333 let f = Arc::new(f);
334 Self::from_factory(move || {
335 let f = Arc::clone(&f);
336 let mut state = Some(initial.clone());
337 let mut terminated = false;
338 Box::new(std::iter::from_fn(move || {
339 if terminated {
340 return None;
341 }
342 let current = state.take().expect("unfold state present");
343 match catch_unwind_failed("unfold function", || f(current)) {
344 Ok(Some((next_state, item))) => {
345 state = Some(next_state);
346 Some(Ok(item))
347 }
348 Ok(None) => {
349 terminated = true;
350 None
351 }
352 Err(error) => {
353 terminated = true;
354 Some(Err(error))
355 }
356 }
357 }))
358 })
359 }
360
361 #[must_use]
362 pub fn unfold_async<State, F, Fut>(initial: State, f: F) -> Self
363 where
364 State: Clone + Send + Sync + 'static,
365 F: Fn(State) -> Fut + Send + Sync + 'static,
366 Fut: Future<Output = StreamResult<Option<(State, Out)>>> + Send + 'static,
367 {
368 let f = Arc::new(f);
369 Self::from_factory(move || {
370 let f = Arc::clone(&f);
371 let mut state = Some(initial.clone());
372 let mut terminated = false;
373 Box::new(std::iter::from_fn(move || {
374 if terminated {
375 return None;
376 }
377 let current = state.take().expect("unfold_async state present");
378 match catch_unwind_failed("unfold_async factory", || f(current))
379 .and_then(flow::run_future_inline_or_spawn)
380 {
381 Ok(Some((next_state, item))) => {
382 state = Some(next_state);
383 Some(Ok(item))
384 }
385 Ok(None) => {
386 terminated = true;
387 None
388 }
389 Err(error) => {
390 terminated = true;
391 Some(Err(error))
392 }
393 }
394 }))
395 })
396 }
397
398 #[must_use]
399 pub fn unfold_resource<Resource, Create, Read, Close>(
400 create: Create,
401 read: Read,
402 close: Close,
403 ) -> Self
404 where
405 Resource: Send + 'static,
406 Create: Fn() -> StreamResult<Resource> + Send + Sync + 'static,
407 Read: Fn(&mut Resource) -> StreamResult<Option<Out>> + Send + Sync + 'static,
408 Close: Fn(Resource) -> StreamResult<()> + Send + Sync + 'static,
409 {
410 let create = Arc::new(create);
411 let read = Arc::new(read);
412 let close = Arc::new(close);
413 Self::from_factory(move || {
414 Box::new(UnfoldResourceStream {
415 create: Arc::clone(&create),
416 read: Arc::clone(&read),
417 close: Arc::clone(&close),
418 resource: None,
419 created: false,
420 terminated: false,
421 _marker: PhantomData,
422 })
423 })
424 }
425
426 #[must_use]
427 pub fn unfold_resource_async<Resource, Create, CreateFut, Read, ReadFut, Close, CloseFut>(
428 create: Create,
429 read: Read,
430 close: Close,
431 ) -> Self
432 where
433 Resource: Send + 'static,
434 Create: Fn() -> CreateFut + Send + Sync + 'static,
435 CreateFut: Future<Output = StreamResult<Resource>> + Send + 'static,
436 Read: Fn(&mut Resource) -> ReadFut + Send + Sync + 'static,
437 ReadFut: Future<Output = StreamResult<Option<Out>>> + Send + 'static,
438 Close: Fn(Resource) -> CloseFut + Send + Sync + 'static,
439 CloseFut: Future<Output = StreamResult<()>> + Send + 'static,
440 {
441 let create = Arc::new(create);
442 let read = Arc::new(read);
443 let close = Arc::new(close);
444 Self::from_factory(move || {
445 Box::new(UnfoldResourceAsyncStream {
446 create: Arc::clone(&create),
447 read: Arc::clone(&read),
448 close: Arc::clone(&close),
449 resource: None,
450 created: false,
451 terminated: false,
452 _marker: PhantomData,
453 })
454 })
455 }
456
457 #[must_use]
458 pub fn lazy_single<F>(create: F) -> Self
459 where
460 F: Fn() -> Out + Send + Sync + 'static,
461 {
462 let create = Arc::new(create);
463 Self::from_factory(move || {
464 let create = Arc::clone(&create);
465 let mut emitted = false;
466 Box::new(std::iter::from_fn(move || {
467 if emitted {
468 return None;
469 }
470 emitted = true;
471 Some(catch_unwind_failed("lazy_single factory", || create()))
472 }))
473 })
474 }
475
476 #[must_use]
477 pub fn lazy_future<F, Fut>(create: F) -> Self
478 where
479 F: Fn() -> Fut + Send + Sync + 'static,
480 Fut: Future<Output = StreamResult<Out>> + Send + 'static,
481 {
482 let create = Arc::new(create);
483 Self::from_factory(move || {
484 let create = Arc::clone(&create);
485 let mut emitted = false;
486 Box::new(std::iter::from_fn(move || {
487 if emitted {
488 return None;
489 }
490 emitted = true;
491 Some(
492 catch_unwind_failed("lazy_future factory", || create())
493 .and_then(flow::run_future_inline_or_spawn),
494 )
495 }))
496 })
497 }
498
499 #[must_use]
500 pub fn lazy_source<InnerMat, F>(create: F) -> Source<Out, StreamCompletion<InnerMat>>
501 where
502 InnerMat: Send + 'static,
503 F: Fn() -> Source<Out, InnerMat> + Send + Sync + 'static,
504 {
505 let create = Arc::new(create);
506 Source::from_materialized_factory(move |materializer| {
507 let (sender, receiver) = oneshot::channel();
508 Ok((
509 Box::new(LazySourceStream {
510 create: Arc::clone(&create),
511 materializer: materializer
512 .with_name_prefix(materializer.name_prefix().to_owned()),
513 current: None,
514 mat_sender: Some(sender),
515 initialized: false,
516 terminated: false,
517 }) as BoxStream<Out>,
518 StreamCompletion::from_receiver(receiver, None),
519 ))
520 })
521 }
522
523 #[must_use]
524 pub fn lazy_future_source<InnerMat, F, Fut>(
525 create: F,
526 ) -> Source<Out, StreamCompletion<InnerMat>>
527 where
528 InnerMat: Send + 'static,
529 F: Fn() -> Fut + Send + Sync + 'static,
530 Fut: Future<Output = StreamResult<Source<Out, InnerMat>>> + Send + 'static,
531 {
532 let create = Arc::new(create);
533 Source::from_materialized_factory(move |materializer| {
534 let (sender, receiver) = oneshot::channel();
535 Ok((
536 Box::new(LazyFutureSourceStream {
537 create: Arc::clone(&create),
538 materializer: materializer
539 .with_name_prefix(materializer.name_prefix().to_owned()),
540 current: None,
541 mat_sender: Some(sender),
542 initialized: false,
543 terminated: false,
544 _marker: PhantomData,
545 }) as BoxStream<Out>,
546 StreamCompletion::from_receiver(receiver, None),
547 ))
548 })
549 }
550
551 #[must_use]
552 pub fn from_fn_iter<F, I>(factory: F) -> Self
553 where
554 F: Fn() -> I + Send + Sync + 'static,
555 I: IntoIterator<Item = Out>,
556 I::IntoIter: Send + 'static,
557 {
558 Self::from_factory(move || Box::new(factory().into_iter().map(Ok)))
559 }
560}
561
562impl<Out: Send + 'static, Mat: Send + 'static> Source<Out, Mat> {
563 fn from_materialized_factory_with_hints<F>(factory: F, hints: SourceHints) -> Self
564 where
565 F: Fn(&Materializer) -> StreamResult<(BoxStream<Out>, Mat)> + Send + Sync + 'static,
566 {
567 Self {
568 factory: Arc::new(FnSourceFactory(factory)),
569 hints,
570 attributes: Attributes::default(),
571 split_hook: None,
572 }
573 }
574
575 pub(crate) fn from_materialized_factory<F>(factory: F) -> Self
576 where
577 F: Fn(&Materializer) -> StreamResult<(BoxStream<Out>, Mat)> + Send + Sync + 'static,
578 {
579 Self::from_materialized_factory_with_hints(factory, SourceHints::default())
580 }
581
582 pub(crate) fn from_terminal_batch_materialized_factory<F>(factory: F) -> Self
583 where
584 F: Fn(&Materializer) -> StreamResult<(BoxStream<Out>, Mat)> + Send + Sync + 'static,
585 {
586 Self::from_materialized_factory_with_hints(
587 factory,
588 SourceHints::with_terminal_consumer_batch(),
589 )
590 }
591
592 #[must_use]
593 pub fn as_source_with_context<Ctx, F>(
594 self,
595 extract_context: F,
596 ) -> SourceWithContext<Out, Ctx, Mat>
597 where
598 Ctx: Send + 'static,
599 F: Fn(&Out) -> Ctx + Send + Sync + 'static,
600 {
601 SourceWithContext::from_source(self.map(move |item| {
602 let context = extract_context(&item);
603 (item, context)
604 }))
605 }
606
607 #[must_use]
608 pub fn via<Next, FlowMat>(self, flow: Flow<Out, Next, FlowMat>) -> Source<Next, Mat>
609 where
610 Next: Send + 'static,
611 FlowMat: Send + 'static,
612 {
613 self.via_mat(flow, Keep::left)
614 }
615
616 #[must_use]
617 pub fn via_mat<Next, FlowMat, Combined, F>(
618 self,
619 flow: Flow<Out, Next, FlowMat>,
620 combine: F,
621 ) -> Source<Next, Combined>
622 where
623 Next: Send + 'static,
624 FlowMat: Send + 'static,
625 Combined: Send + 'static,
626 F: Fn(Mat, FlowMat) -> Combined + Send + Sync + 'static,
627 {
628 let source = self.factory;
629 let transform = flow.transform;
630 let materialize_flow = flow.materialize;
631 let hints = self.hints.after_flow(flow.hints);
632 let combine = Arc::new(combine);
633 Source::from_materialized_factory_with_hints(
634 move |materializer| {
635 let (stream, source_mat) = Arc::clone(&source).create(materializer)?;
636 let flow_mat = materialize_flow()?;
637 let stream = match &transform {
638 FlowTransform::Pure(transform) => transform(stream),
639 FlowTransform::Runtime(transform) => transform(stream, materializer)?,
640 };
641 Ok((stream, combine(source_mat, flow_mat)))
642 },
643 hints,
644 )
645 }
646
647 #[must_use]
648 pub fn via_mat_with<Next, FlowMat, Combined, F>(
649 self,
650 flow: Flow<Out, Next, FlowMat>,
651 combine: F,
652 ) -> Source<Next, Combined>
653 where
654 Next: Send + 'static,
655 FlowMat: Send + 'static,
656 Combined: Send + 'static,
657 F: Fn(Mat, FlowMat) -> Combined + Send + Sync + 'static,
658 {
659 self.via_mat(flow, combine)
660 }
661
662 #[must_use]
663 pub fn map<Next, F>(self, f: F) -> Source<Next, Mat>
664 where
665 Next: Send + 'static,
666 F: Fn(Out) -> Next + Send + Sync + 'static,
667 {
668 let hints = self.hints.without_inline_micro();
669 Source {
670 factory: Arc::new(MapSourceFactory {
671 source: self.factory,
672 stage: f,
673 _marker: PhantomData,
674 }),
675 hints,
676 attributes: self.attributes,
677 split_hook: None,
678 }
679 }
680
681 #[must_use]
682 pub fn attributes(&self) -> &Attributes {
683 &self.attributes
684 }
685
686 #[must_use]
687 pub fn with_attributes(mut self, attributes: Attributes) -> Self {
688 self.attributes = attributes;
689 self
690 }
691
692 #[must_use]
693 pub fn add_attributes(mut self, attributes: Attributes) -> Self {
694 self.attributes = self.attributes.and(attributes);
695 self
696 }
697
698 #[must_use]
699 pub fn named(self, name: impl Into<String>) -> Self {
700 self.add_attributes(Attributes::named(name))
701 }
702
703 #[must_use]
704 pub fn map_result<Next, F>(self, f: F) -> Source<Next, Mat>
705 where
706 Next: Send + 'static,
707 F: Fn(Out) -> StreamResult<Next> + Send + Sync + 'static,
708 {
709 self.via(Flow::identity().map_result(f))
710 }
711
712 #[must_use]
713 pub fn map_result_with_supervision<Next, F>(
714 self,
715 f: F,
716 decider: SupervisionDecider,
717 ) -> Source<Next, Mat>
718 where
719 Next: Send + 'static,
720 F: Fn(Out) -> StreamResult<Next> + Send + Sync + 'static,
721 {
722 self.via(Flow::identity().map_result_with_supervision(f, decider))
723 }
724
725 #[must_use]
726 pub fn filter<F>(self, predicate: F) -> Source<Out, Mat>
727 where
728 F: Fn(&Out) -> bool + Send + Sync + 'static,
729 {
730 self.via(Flow::identity().filter(predicate))
731 }
732
733 #[must_use]
734 pub fn filter_result<F>(self, predicate: F) -> Source<Out, Mat>
735 where
736 F: Fn(&Out) -> StreamResult<bool> + Send + Sync + 'static,
737 {
738 self.via(Flow::identity().filter_result(predicate))
739 }
740
741 #[must_use]
742 pub fn filter_result_with_supervision<F>(
743 self,
744 predicate: F,
745 decider: SupervisionDecider,
746 ) -> Source<Out, Mat>
747 where
748 F: Fn(&Out) -> StreamResult<bool> + Send + Sync + 'static,
749 {
750 self.via(Flow::identity().filter_result_with_supervision(predicate, decider))
751 }
752
753 #[must_use]
754 pub fn filter_not<F>(self, predicate: F) -> Source<Out, Mat>
755 where
756 F: Fn(&Out) -> bool + Send + Sync + 'static,
757 {
758 self.via(Flow::identity().filter_not(predicate))
759 }
760
761 #[must_use]
762 pub fn filter_map<Next, F>(self, f: F) -> Source<Next, Mat>
763 where
764 Next: Send + 'static,
765 F: Fn(Out) -> Option<Next> + Send + Sync + 'static,
766 {
767 self.via(Flow::identity().filter_map(f))
768 }
769
770 #[must_use]
771 pub fn filter_map_result<Next, F>(self, f: F) -> Source<Next, Mat>
772 where
773 Next: Send + 'static,
774 F: Fn(Out) -> StreamResult<Option<Next>> + Send + Sync + 'static,
775 {
776 self.via(Flow::identity().filter_map_result(f))
777 }
778
779 #[must_use]
780 pub fn filter_map_result_with_supervision<Next, F>(
781 self,
782 f: F,
783 decider: SupervisionDecider,
784 ) -> Source<Next, Mat>
785 where
786 Next: Send + 'static,
787 F: Fn(Out) -> StreamResult<Option<Next>> + Send + Sync + 'static,
788 {
789 self.via(Flow::identity().filter_map_result_with_supervision(f, decider))
790 }
791
792 #[must_use]
793 pub fn map_concat<Next, F, I>(self, f: F) -> Source<Next, Mat>
794 where
795 Next: Send + 'static,
796 F: Fn(Out) -> I + Send + Sync + 'static,
797 I: IntoIterator<Item = Next>,
798 I::IntoIter: Send + 'static,
799 {
800 self.via(Flow::identity().map_concat(f))
801 }
802
803 #[must_use]
804 pub fn map_concat_result<Next, F, I>(self, f: F) -> Source<Next, Mat>
805 where
806 Next: Send + 'static,
807 F: Fn(Out) -> StreamResult<I> + Send + Sync + 'static,
808 I: IntoIterator<Item = Next>,
809 I::IntoIter: Send + 'static,
810 {
811 self.via(Flow::identity().map_concat_result(f))
812 }
813
814 #[must_use]
815 pub fn map_concat_result_with_supervision<Next, F, I>(
816 self,
817 f: F,
818 decider: SupervisionDecider,
819 ) -> Source<Next, Mat>
820 where
821 Next: Send + 'static,
822 F: Fn(Out) -> StreamResult<I> + Send + Sync + 'static,
823 I: IntoIterator<Item = Next>,
824 I::IntoIter: Send + 'static,
825 {
826 self.via(Flow::identity().map_concat_result_with_supervision(f, decider))
827 }
828
829 #[must_use]
830 pub fn stateful_map<State, Next, F>(self, seed: State, f: F) -> Source<Next, Mat>
831 where
832 State: Clone + Send + Sync + 'static,
833 Next: Send + 'static,
834 F: Fn(&mut State, Out) -> Next + Send + Sync + 'static,
835 {
836 self.via(Flow::identity().stateful_map(seed, f))
837 }
838
839 #[must_use]
840 pub fn stateful_map_result<State, Next, F>(self, seed: State, f: F) -> Source<Next, Mat>
841 where
842 State: Clone + Send + Sync + 'static,
843 Next: Send + 'static,
844 F: Fn(&mut State, Out) -> StreamResult<Next> + Send + Sync + 'static,
845 {
846 self.via(Flow::identity().stateful_map_result(seed, f))
847 }
848
849 #[must_use]
850 pub fn stateful_map_result_with_supervision<State, Next, F>(
851 self,
852 seed: State,
853 f: F,
854 decider: SupervisionDecider,
855 ) -> Source<Next, Mat>
856 where
857 State: Clone + Send + Sync + 'static,
858 Next: Send + 'static,
859 F: Fn(&mut State, Out) -> StreamResult<Next> + Send + Sync + 'static,
860 {
861 self.via(Flow::identity().stateful_map_result_with_supervision(seed, f, decider))
862 }
863
864 #[must_use]
865 pub fn stateful_map_concat<State, Next, F, I>(self, seed: State, f: F) -> Source<Next, Mat>
866 where
867 State: Clone + Send + Sync + 'static,
868 Next: Send + 'static,
869 F: Fn(&mut State, Out) -> I + Send + Sync + 'static,
870 I: IntoIterator<Item = Next>,
871 I::IntoIter: Send + 'static,
872 {
873 self.via(Flow::identity().stateful_map_concat(seed, f))
874 }
875
876 #[must_use]
877 pub fn stateful_map_concat_result<State, Next, F, I>(
878 self,
879 seed: State,
880 f: F,
881 ) -> Source<Next, Mat>
882 where
883 State: Clone + Send + Sync + 'static,
884 Next: Send + 'static,
885 F: Fn(&mut State, Out) -> StreamResult<I> + Send + Sync + 'static,
886 I: IntoIterator<Item = Next>,
887 I::IntoIter: Send + 'static,
888 {
889 self.via(Flow::identity().stateful_map_concat_result(seed, f))
890 }
891
892 #[must_use]
893 pub fn stateful_map_concat_result_with_supervision<State, Next, F, I>(
894 self,
895 seed: State,
896 f: F,
897 decider: SupervisionDecider,
898 ) -> Source<Next, Mat>
899 where
900 State: Clone + Send + Sync + 'static,
901 Next: Send + 'static,
902 F: Fn(&mut State, Out) -> StreamResult<I> + Send + Sync + 'static,
903 I: IntoIterator<Item = Next>,
904 I::IntoIter: Send + 'static,
905 {
906 self.via(Flow::identity().stateful_map_concat_result_with_supervision(seed, f, decider))
907 }
908
909 #[must_use]
910 pub fn map_async<Next, F, Fut>(self, parallelism: usize, f: F) -> Source<Next, Mat>
911 where
912 Next: Send + 'static,
913 F: Fn(Out) -> Fut + Send + Sync + 'static,
914 Fut: Future<Output = StreamResult<Next>> + Send + 'static,
915 {
916 self.via(Flow::identity().map_async(parallelism, f))
917 }
918
919 #[must_use]
920 pub fn map_async_with_supervision<Next, F, Fut>(
921 self,
922 parallelism: usize,
923 f: F,
924 decider: SupervisionDecider,
925 ) -> Source<Next, Mat>
926 where
927 Next: Send + 'static,
928 F: Fn(Out) -> Fut + Send + Sync + 'static,
929 Fut: Future<Output = StreamResult<Next>> + Send + 'static,
930 {
931 self.via(Flow::identity().map_async_with_supervision(parallelism, f, decider))
932 }
933
934 #[must_use]
935 pub fn map_async_unordered<Next, F, Fut>(self, parallelism: usize, f: F) -> Source<Next, Mat>
936 where
937 Next: Send + 'static,
938 F: Fn(Out) -> Fut + Send + Sync + 'static,
939 Fut: Future<Output = StreamResult<Next>> + Send + 'static,
940 {
941 self.via(Flow::identity().map_async_unordered(parallelism, f))
942 }
943
944 #[must_use]
945 pub fn map_async_unordered_with_supervision<Next, F, Fut>(
946 self,
947 parallelism: usize,
948 f: F,
949 decider: SupervisionDecider,
950 ) -> Source<Next, Mat>
951 where
952 Next: Send + 'static,
953 F: Fn(Out) -> Fut + Send + Sync + 'static,
954 Fut: Future<Output = StreamResult<Next>> + Send + 'static,
955 {
956 self.via(Flow::identity().map_async_unordered_with_supervision(parallelism, f, decider))
957 }
958
959 #[must_use]
960 pub fn map_async_partitioned<Key, Next, Partition, F, Fut>(
961 self,
962 parallelism: usize,
963 per_partition: usize,
964 partition: Partition,
965 f: F,
966 ) -> Source<Next, Mat>
967 where
968 Key: Clone + Eq + Hash + Send + 'static,
969 Next: Send + 'static,
970 Partition: Fn(&Out) -> Key + Send + Sync + 'static,
971 F: Fn(Out) -> Fut + Send + Sync + 'static,
972 Fut: Future<Output = StreamResult<Next>> + Send + 'static,
973 {
974 self.via(Flow::identity().map_async_partitioned(parallelism, per_partition, partition, f))
975 }
976
977 #[must_use]
978 pub fn prefix_and_tail(self, n: usize) -> Source<(Vec<Out>, Source<Out>), Mat> {
979 self.via(Flow::identity().prefix_and_tail(n))
980 }
981
982 #[must_use]
983 pub fn flat_map_prefix<Next, FlowMat, F>(self, n: usize, f: F) -> Source<Next, Mat>
984 where
985 Next: Send + 'static,
986 FlowMat: Send + 'static,
987 F: Fn(Vec<Out>) -> Flow<Out, Next, FlowMat> + Send + Sync + 'static,
988 Out: Clone,
989 {
990 self.via(Flow::identity().flat_map_prefix(n, f))
991 }
992
993 #[must_use]
994 pub fn group_by<Key, F>(
995 self,
996 max_substreams: usize,
997 f: F,
998 allow_closed_substream_recreation: bool,
999 ) -> Source<Source<Out>, Mat>
1000 where
1001 Key: Clone + Eq + Hash + Send + 'static,
1002 F: Fn(&Out) -> Key + Send + Sync + 'static,
1003 Out: Clone,
1004 {
1005 let batch_mode = if self.hints.inline_micro.is_some() && !allow_closed_substream_recreation
1006 {
1007 flow::GroupByBatchMode::FiniteEagerNoRecreate
1008 } else {
1009 flow::GroupByBatchMode::Immediate
1010 };
1011 self.via(Flow::identity().group_by_with_batching(
1012 max_substreams,
1013 f,
1014 allow_closed_substream_recreation,
1015 batch_mode,
1016 ))
1017 }
1018
1019 #[must_use]
1020 pub fn split_when<F>(self, predicate: F) -> Source<Source<Out>, Mat>
1021 where
1022 F: Fn(&Out) -> bool + Send + Sync + 'static,
1023 Out: Clone,
1024 {
1025 self.via(Flow::identity().split_when(predicate))
1026 }
1027
1028 #[must_use]
1029 pub fn split_after<F>(self, predicate: F) -> Source<Source<Out>, Mat>
1030 where
1031 F: Fn(&Out) -> bool + Send + Sync + 'static,
1032 Out: Clone,
1033 {
1034 self.via(Flow::identity().split_after(predicate))
1035 }
1036
1037 #[must_use]
1038 pub fn flat_map_concat<Next, NextMat, F>(self, f: F) -> Source<Next, Mat>
1039 where
1040 Next: Send + 'static,
1041 NextMat: Send + 'static,
1042 F: Fn(Out) -> Source<Next, NextMat> + Send + Sync + 'static,
1043 {
1044 self.via(Flow::identity().flat_map_concat(f))
1045 }
1046
1047 #[must_use]
1048 pub fn flat_map_merge<Next, NextMat, F>(self, breadth: usize, f: F) -> Source<Next, Mat>
1049 where
1050 Next: Send + 'static,
1051 NextMat: Send + 'static,
1052 F: Fn(Out) -> Source<Next, NextMat> + Send + Sync + 'static,
1053 {
1054 self.via(Flow::identity().flat_map_merge(breadth, f))
1055 }
1056
1057 #[must_use]
1058 pub fn take(self, n: usize) -> Source<Out, Mat> {
1059 self.via(Flow::identity().take(n))
1060 }
1061
1062 #[must_use]
1063 pub fn drop(self, n: usize) -> Source<Out, Mat> {
1064 self.via(Flow::identity().drop(n))
1065 }
1066
1067 #[must_use]
1068 pub fn take_while<F>(self, predicate: F) -> Source<Out, Mat>
1069 where
1070 F: Fn(&Out) -> bool + Send + Sync + 'static,
1071 {
1072 self.via(Flow::identity().take_while(predicate))
1073 }
1074
1075 #[must_use]
1076 pub fn drop_while<F>(self, predicate: F) -> Source<Out, Mat>
1077 where
1078 F: Fn(&Out) -> bool + Send + Sync + 'static,
1079 {
1080 self.via(Flow::identity().drop_while(predicate))
1081 }
1082
1083 #[must_use]
1084 pub fn limit(self, max: u64) -> Source<Out, Mat> {
1085 self.via(Flow::identity().limit(max))
1086 }
1087
1088 #[must_use]
1089 pub fn grouped(self, size: usize) -> Source<Vec<Out>, Mat> {
1090 self.via(Flow::identity().grouped(size))
1091 }
1092
1093 #[must_use]
1094 pub fn scan<State, F>(self, seed: State, f: F) -> Source<State, Mat>
1095 where
1096 State: Clone + Send + Sync + 'static,
1097 F: Fn(State, Out) -> State + Send + Sync + 'static,
1098 {
1099 self.via(Flow::identity().scan(seed, f))
1100 }
1101
1102 #[must_use]
1103 pub fn scan_async<State, F, Fut>(self, seed: State, f: F) -> Source<State, Mat>
1104 where
1105 State: Clone + Send + Sync + 'static,
1106 F: Fn(State, Out) -> Fut + Send + Sync + 'static,
1107 Fut: Future<Output = StreamResult<State>> + Send + 'static,
1108 {
1109 self.via(Flow::identity().scan_async(seed, f))
1110 }
1111
1112 #[must_use]
1113 pub fn scan_result<State, F>(self, seed: State, f: F) -> Source<State, Mat>
1114 where
1115 State: Clone + Send + Sync + 'static,
1116 F: Fn(State, Out) -> StreamResult<State> + Send + Sync + 'static,
1117 {
1118 self.via(Flow::identity().scan_result(seed, f))
1119 }
1120
1121 #[must_use]
1122 pub fn scan_result_with_supervision<State, F>(
1123 self,
1124 seed: State,
1125 f: F,
1126 decider: SupervisionDecider,
1127 ) -> Source<State, Mat>
1128 where
1129 State: Clone + Send + Sync + 'static,
1130 F: Fn(State, Out) -> StreamResult<State> + Send + Sync + 'static,
1131 {
1132 self.via(Flow::identity().scan_result_with_supervision(seed, f, decider))
1133 }
1134
1135 #[must_use]
1136 pub fn sliding(self, size: usize, step: usize) -> Source<Vec<Out>, Mat>
1137 where
1138 Out: Clone,
1139 {
1140 self.via(Flow::identity().sliding(size, step))
1141 }
1142
1143 #[must_use]
1144 pub fn fold<Acc, F>(self, zero: Acc, f: F) -> Source<Acc, Mat>
1145 where
1146 Acc: Clone + Send + Sync + 'static,
1147 F: Fn(Acc, Out) -> Acc + Send + Sync + 'static,
1148 {
1149 self.via(Flow::identity().fold(zero, f))
1150 }
1151
1152 #[must_use]
1153 pub fn fold_async<Acc, F, Fut>(self, zero: Acc, f: F) -> Source<Acc, Mat>
1154 where
1155 Acc: Clone + Send + Sync + 'static,
1156 F: Fn(Acc, Out) -> Fut + Send + Sync + 'static,
1157 Fut: Future<Output = StreamResult<Acc>> + Send + 'static,
1158 {
1159 self.via(Flow::identity().fold_async(zero, f))
1160 }
1161
1162 #[must_use]
1163 pub fn map_with_resource<Resource, Next, Create, F, Close>(
1164 self,
1165 create: Create,
1166 f: F,
1167 close: Close,
1168 ) -> Source<Next, Mat>
1169 where
1170 Resource: Send + 'static,
1171 Next: Send + 'static,
1172 Create: Fn() -> StreamResult<Resource> + Send + Sync + 'static,
1173 F: Fn(&mut Resource, Out) -> StreamResult<Next> + Send + Sync + 'static,
1174 Close: Fn(Resource) -> StreamResult<Option<Next>> + Send + Sync + 'static,
1175 {
1176 self.via(Flow::identity().map_with_resource(create, f, close))
1177 }
1178
1179 #[must_use]
1180 pub fn fold_result<Acc, F>(self, zero: Acc, f: F) -> Source<Acc, Mat>
1181 where
1182 Acc: Clone + Send + Sync + 'static,
1183 F: Fn(Acc, Out) -> StreamResult<Acc> + Send + Sync + 'static,
1184 {
1185 self.via(Flow::identity().fold_result(zero, f))
1186 }
1187
1188 #[must_use]
1189 pub fn fold_result_with_supervision<Acc, F>(
1190 self,
1191 zero: Acc,
1192 f: F,
1193 decider: SupervisionDecider,
1194 ) -> Source<Acc, Mat>
1195 where
1196 Acc: Clone + Send + Sync + 'static,
1197 F: Fn(Acc, Out) -> StreamResult<Acc> + Send + Sync + 'static,
1198 {
1199 self.via(Flow::identity().fold_result_with_supervision(zero, f, decider))
1200 }
1201
1202 #[must_use]
1203 pub fn reduce<F>(self, f: F) -> Source<Out, Mat>
1204 where
1205 F: Fn(Out, Out) -> Out + Send + Sync + 'static,
1206 {
1207 self.via(Flow::identity().reduce(f))
1208 }
1209
1210 #[must_use]
1211 pub fn reduce_result<F>(self, f: F) -> Source<Out, Mat>
1212 where
1213 Out: Clone,
1214 F: Fn(Out, Out) -> StreamResult<Out> + Send + Sync + 'static,
1215 {
1216 self.via(Flow::identity().reduce_result(f))
1217 }
1218
1219 #[must_use]
1220 pub fn reduce_result_with_supervision<F>(
1221 self,
1222 f: F,
1223 decider: SupervisionDecider,
1224 ) -> Source<Out, Mat>
1225 where
1226 Out: Clone,
1227 F: Fn(Out, Out) -> StreamResult<Out> + Send + Sync + 'static,
1228 {
1229 self.via(Flow::identity().reduce_result_with_supervision(f, decider))
1230 }
1231
1232 #[must_use]
1233 pub fn map_error<F>(self, f: F) -> Source<Out, Mat>
1234 where
1235 F: Fn(StreamError) -> StreamError + Send + Sync + 'static,
1236 {
1237 self.via(Flow::identity().map_error(f))
1238 }
1239
1240 #[must_use]
1241 pub fn recover<F>(self, f: F) -> Source<Out, Mat>
1242 where
1243 F: Fn(StreamError) -> Option<Out> + Send + Sync + 'static,
1244 {
1245 self.via(Flow::identity().recover(f))
1246 }
1247
1248 #[must_use]
1249 pub fn recover_with<F>(self, f: F) -> Source<Out, Mat>
1250 where
1251 F: Fn(StreamError) -> Option<Source<Out>> + Send + Sync + 'static,
1252 {
1253 self.via(Flow::identity().recover_with(f))
1254 }
1255
1256 #[must_use]
1257 pub fn recover_with_retries<F>(self, retries: usize, f: F) -> Source<Out, Mat>
1258 where
1259 F: Fn(StreamError) -> Option<Source<Out>> + Send + Sync + 'static,
1260 {
1261 self.via(Flow::identity().recover_with_retries(retries, f))
1262 }
1263
1264 #[must_use]
1265 pub fn on_error_complete(self) -> Source<Out, Mat> {
1266 self.via(Flow::identity().on_error_complete())
1267 }
1268
1269 #[must_use]
1270 pub fn concat<Mat2>(self, that: Source<Out, Mat2>) -> Source<Out, Mat>
1271 where
1272 Mat2: Send + 'static,
1273 {
1274 let factory = self.factory;
1275 let hints = self.hints;
1276 let that_factory = that.factory;
1277 Source::from_materialized_factory_with_hints(
1278 move |materializer| {
1279 let (primary, mat) = Arc::clone(&factory).create(materializer)?;
1280 let secondary = match Arc::clone(&that_factory).create(materializer) {
1281 Ok((stream, _)) => stream,
1282 Err(error) => Box::new(std::iter::once(Err(error))) as BoxStream<Out>,
1283 };
1284 Ok((concat_source_streams(vec![primary, secondary]), mat))
1285 },
1286 hints,
1287 )
1288 }
1289
1290 #[must_use]
1291 pub fn concat_lazy<Mat2>(self, that: Source<Out, Mat2>) -> Source<Out, Mat>
1292 where
1293 Mat2: Send + 'static,
1294 {
1295 let factory = self.factory;
1296 let hints = self.hints;
1297 let that_factory = that.factory;
1298 Source::from_materialized_factory_with_hints(
1299 move |materializer| {
1300 let (primary, mat) = Arc::clone(&factory).create(materializer)?;
1301 Ok((
1302 concat_source_streams_lazy(
1303 primary,
1304 vec![Arc::clone(&that_factory)],
1305 materializer,
1306 ),
1307 mat,
1308 ))
1309 },
1310 hints,
1311 )
1312 }
1313
1314 #[must_use]
1315 pub fn concat_all_lazy<Mat2, I>(self, those: I) -> Source<Out, Mat>
1316 where
1317 Mat2: Send + 'static,
1318 I: IntoIterator<Item = Source<Out, Mat2>>,
1319 {
1320 let factory = self.factory;
1321 let hints = self.hints;
1322 let other_factories: Vec<_> = those.into_iter().map(|source| source.factory).collect();
1323 Source::from_materialized_factory_with_hints(
1324 move |materializer| {
1325 let (primary, mat) = Arc::clone(&factory).create(materializer)?;
1326 Ok((
1327 concat_source_streams_lazy(primary, other_factories.clone(), materializer),
1328 mat,
1329 ))
1330 },
1331 hints,
1332 )
1333 }
1334
1335 #[must_use]
1336 pub fn prepend<Mat2>(self, that: Source<Out, Mat2>) -> Source<Out, Mat>
1337 where
1338 Mat2: Send + 'static,
1339 {
1340 let factory = self.factory;
1341 let hints = self.hints;
1342 let that_factory = that.factory;
1343 Source::from_materialized_factory_with_hints(
1344 move |materializer| {
1345 let (primary, mat) = Arc::clone(&factory).create(materializer)?;
1346 let secondary = match Arc::clone(&that_factory).create(materializer) {
1347 Ok((stream, _)) => stream,
1348 Err(error) => Box::new(std::iter::once(Err(error))) as BoxStream<Out>,
1349 };
1350 Ok((concat_source_streams(vec![secondary, primary]), mat))
1351 },
1352 hints,
1353 )
1354 }
1355
1356 #[must_use]
1357 pub fn prepend_lazy<Mat2>(self, that: Source<Out, Mat2>) -> Source<Out, Mat>
1358 where
1359 Mat2: Send + 'static,
1360 {
1361 self.prepend(that)
1362 }
1363
1364 #[must_use]
1365 pub fn or_else<Mat2>(self, secondary: Source<Out, Mat2>) -> Source<Out, Mat>
1366 where
1367 Mat2: Send + 'static,
1368 {
1369 let factory = self.factory;
1370 let hints = self.hints;
1371 let secondary_factory = secondary.factory;
1372 Source::from_materialized_factory_with_hints(
1373 move |materializer| {
1374 let (primary, mat) = Arc::clone(&factory).create(materializer)?;
1375 let secondary = match Arc::clone(&secondary_factory).create(materializer) {
1376 Ok((stream, _)) => stream,
1377 Err(error) => Box::new(std::iter::once(Err(error))) as BoxStream<Out>,
1378 };
1379 Ok((or_else_source_stream(primary, secondary), mat))
1380 },
1381 hints,
1382 )
1383 }
1384
1385 #[must_use]
1386 pub fn interleave<Mat2>(self, that: Source<Out, Mat2>, segment_size: usize) -> Source<Out, Mat>
1387 where
1388 Mat2: Send + 'static,
1389 {
1390 self.interleave_all([that], segment_size, false)
1391 }
1392
1393 #[must_use]
1394 pub fn interleave_all<Mat2, I>(
1395 self,
1396 those: I,
1397 segment_size: usize,
1398 eager_close: bool,
1399 ) -> Source<Out, Mat>
1400 where
1401 Mat2: Send + 'static,
1402 I: IntoIterator<Item = Source<Out, Mat2>>,
1403 {
1404 let factory = self.factory;
1405 let hints = self.hints;
1406 let other_factories: Vec<_> = those.into_iter().map(|source| source.factory).collect();
1407 Source::from_materialized_factory_with_hints(
1408 move |materializer| {
1409 let (primary, mat) = Arc::clone(&factory).create(materializer)?;
1410 let mut streams = Vec::with_capacity(other_factories.len() + 1);
1411 streams.push(primary);
1412 for other in &other_factories {
1413 let stream = match Arc::clone(other).create(materializer) {
1414 Ok((stream, _)) => stream,
1415 Err(error) => Box::new(std::iter::once(Err(error))) as BoxStream<Out>,
1416 };
1417 streams.push(stream);
1418 }
1419 Ok((
1420 interleave_source_streams(streams, segment_size, eager_close),
1421 mat,
1422 ))
1423 },
1424 hints,
1425 )
1426 }
1427
1428 #[must_use]
1429 pub fn merge_sorted<Mat2>(self, that: Source<Out, Mat2>) -> Source<Out, Mat>
1430 where
1431 Out: Ord,
1432 Mat2: Send + 'static,
1433 {
1434 self.via(Flow::identity().merge_sorted(that))
1435 }
1436
1437 #[must_use]
1438 pub fn merge_latest<Mat2>(
1439 self,
1440 that: Source<Out, Mat2>,
1441 eager_complete: bool,
1442 ) -> Source<Vec<Out>, Mat>
1443 where
1444 Out: Clone,
1445 Mat2: Send + 'static,
1446 {
1447 let factory = self.factory;
1448 let hints = self.hints;
1449 let that_factory = that.factory;
1450 Source::from_materialized_factory_with_hints(
1451 move |materializer| {
1452 let (left, mat) = Arc::clone(&factory).create(materializer)?;
1453 let right = match Arc::clone(&that_factory).create(materializer) {
1454 Ok((stream, _)) => stream,
1455 Err(error) => {
1456 return Ok((
1457 Box::new(std::iter::once(Err(error))) as BoxStream<Vec<Out>>,
1458 mat,
1459 ));
1460 }
1461 };
1462 Ok((merge_latest_streams(vec![left, right], eager_complete), mat))
1463 },
1464 hints,
1465 )
1466 }
1467
1468 #[must_use]
1469 pub fn merge_all<Mat2, I>(self, those: I, eager_complete: bool) -> Source<Out, Mat>
1470 where
1471 Mat2: Send + 'static,
1472 I: IntoIterator<Item = Source<Out, Mat2>>,
1473 {
1474 let factory = self.factory;
1475 let hints = self.hints;
1476 let other_factories: Vec<_> = those.into_iter().map(|source| source.factory).collect();
1477 Source::from_materialized_factory_with_hints(
1478 move |materializer| {
1479 let (primary, mat) = Arc::clone(&factory).create(materializer)?;
1480 let mut streams = Vec::with_capacity(other_factories.len() + 1);
1481 streams.push(primary);
1482 for other in &other_factories {
1483 let stream = match Arc::clone(other).create(materializer) {
1484 Ok((stream, _)) => stream,
1485 Err(error) => {
1486 return Ok((
1487 Box::new(std::iter::once(Err(error))) as BoxStream<Out>,
1488 mat,
1489 ));
1490 }
1491 };
1492 streams.push(stream);
1493 }
1494 Ok((merge_streams(streams, eager_complete), mat))
1495 },
1496 hints,
1497 )
1498 }
1499
1500 #[must_use]
1501 pub fn zip_with<Mat2, Out2, Next, F>(
1502 self,
1503 that: Source<Out2, Mat2>,
1504 combine: F,
1505 ) -> Source<Next, Mat>
1506 where
1507 Out2: Send + 'static,
1508 Next: Send + 'static,
1509 Mat2: Send + 'static,
1510 F: Fn(Out, Out2) -> Next + Send + Sync + 'static,
1511 {
1512 self.via(Flow::identity().zip_with(that, combine))
1513 }
1514
1515 #[must_use]
1516 pub fn zip_latest<Mat2, Out2>(self, that: Source<Out2, Mat2>) -> Source<(Out, Out2), Mat>
1517 where
1518 Out: Clone,
1519 Out2: Clone + Send + 'static,
1520 Mat2: Send + 'static,
1521 {
1522 self.zip_latest_with(that, true, |left, right| (left, right))
1523 }
1524
1525 #[must_use]
1526 pub fn zip_latest_with<Mat2, Out2, Next, F>(
1527 self,
1528 that: Source<Out2, Mat2>,
1529 eager_complete: bool,
1530 combine: F,
1531 ) -> Source<Next, Mat>
1532 where
1533 Out: Clone,
1534 Out2: Clone + Send + 'static,
1535 Next: Send + 'static,
1536 Mat2: Send + 'static,
1537 F: Fn(Out, Out2) -> Next + Send + Sync + 'static,
1538 {
1539 let factory = self.factory;
1540 let hints = self.hints;
1541 let that_factory = that.factory;
1542 let combine = Arc::new(combine);
1543 Source::from_materialized_factory_with_hints(
1544 move |materializer| {
1545 let (left, mat) = Arc::clone(&factory).create(materializer)?;
1546 let right = match Arc::clone(&that_factory).create(materializer) {
1547 Ok((stream, _)) => stream,
1548 Err(error) => {
1549 return Ok((
1550 Box::new(std::iter::once(Err(error))) as BoxStream<Next>,
1551 mat,
1552 ));
1553 }
1554 };
1555 Ok((
1556 zip_latest_with_stream(left, right, eager_complete, Arc::clone(&combine)),
1557 mat,
1558 ))
1559 },
1560 hints,
1561 )
1562 }
1563
1564 #[must_use]
1565 pub fn zip_with_index(self) -> Source<(Out, u64), Mat> {
1566 let factory = self.factory;
1567 let hints = self.hints;
1568 Source::from_materialized_factory_with_hints(
1569 move |materializer| {
1570 let (mut stream, mat) = Arc::clone(&factory).create(materializer)?;
1571 let mut index = 0_u64;
1572 Ok((
1573 Box::new(std::iter::from_fn(move || {
1574 stream.next().map(|item| {
1575 item.map(|value| {
1576 let pair = (value, index);
1577 index = index.wrapping_add(1);
1578 pair
1579 })
1580 })
1581 })) as BoxStream<(Out, u64)>,
1582 mat,
1583 ))
1584 },
1585 hints,
1586 )
1587 }
1588
1589 #[must_use]
1590 pub fn zip_all<Mat2, Out2>(
1591 self,
1592 that: Source<Out2, Mat2>,
1593 this_elem: Out,
1594 that_elem: Out2,
1595 ) -> Source<(Out, Out2), Mat>
1596 where
1597 Out: Clone + Sync,
1598 Out2: Clone + Send + Sync + 'static,
1599 Mat2: Send + 'static,
1600 {
1601 let factory = self.factory;
1602 let hints = self.hints;
1603 let that_factory = that.factory;
1604 Source::from_materialized_factory_with_hints(
1605 move |materializer| {
1606 let (left, mat) = Arc::clone(&factory).create(materializer)?;
1607 let right = match Arc::clone(&that_factory).create(materializer) {
1608 Ok((stream, _)) => stream,
1609 Err(error) => {
1610 return Ok((
1611 Box::new(std::iter::once(Err(error))) as BoxStream<(Out, Out2)>,
1612 mat,
1613 ));
1614 }
1615 };
1616 Ok((
1617 zip_all_stream(left, right, this_elem.clone(), that_elem.clone()),
1618 mat,
1619 ))
1620 },
1621 hints,
1622 )
1623 }
1624
1625 #[must_use]
1626 pub fn also_to<SinkMat>(self, sink: Sink<Out, SinkMat>) -> Source<Out, Mat>
1627 where
1628 Out: Clone,
1629 SinkMat: Send + 'static,
1630 {
1631 self.via(Flow::identity().also_to(sink))
1632 }
1633
1634 #[must_use]
1635 pub fn also_to_all<SinkMat, I>(self, sinks: I) -> Source<Out, Mat>
1636 where
1637 Out: Clone,
1638 SinkMat: Send + 'static,
1639 I: IntoIterator<Item = Sink<Out, SinkMat>>,
1640 {
1641 self.via(Flow::identity().also_to_all(sinks))
1642 }
1643
1644 #[must_use]
1645 pub fn divert_to<SinkMat, F>(self, sink: Sink<Out, SinkMat>, predicate: F) -> Source<Out, Mat>
1646 where
1647 SinkMat: Send + 'static,
1648 F: Fn(&Out) -> bool + Send + Sync + 'static,
1649 {
1650 self.via(Flow::identity().divert_to(sink, predicate))
1651 }
1652
1653 #[must_use]
1654 pub fn wire_tap<SinkMat>(self, sink: Sink<Out, SinkMat>) -> Source<Out, Mat>
1655 where
1656 Out: Clone,
1657 SinkMat: Send + 'static,
1658 {
1659 self.via(Flow::identity().wire_tap(sink))
1660 }
1661
1662 pub fn run_with<SinkMat: Send + 'static>(
1663 self,
1664 sink: Sink<Out, SinkMat>,
1665 ) -> StreamResult<SinkMat> {
1666 let fast_result = self
1669 .split_hook
1670 .as_ref()
1671 .zip(sink.fold_fp.as_deref())
1672 .and_then(|(hook, fp)| fp.try_register(Arc::clone(hook)));
1673 if let Some(result) = fast_result {
1674 return result?.downcast::<SinkMat>().map(|b| *b).map_err(|_| {
1675 StreamError::Failed("split fast path: unexpected mat type (internal error)".into())
1676 });
1677 }
1678 self.to_mat(sink, Keep::right).run()
1679 }
1680
1681 pub fn run_with_materializer<SinkMat: Send + 'static>(
1682 self,
1683 sink: Sink<Out, SinkMat>,
1684 materializer: &Materializer,
1685 ) -> StreamResult<SinkMat> {
1686 self.to_mat(sink, Keep::right)
1687 .run_with_materializer(materializer)
1688 }
1689
1690 #[must_use]
1691 pub fn to<SinkMat>(self, sink: Sink<Out, SinkMat>) -> RunnableGraph<Mat>
1692 where
1693 SinkMat: Send + 'static,
1694 {
1695 self.to_mat(sink, Keep::left)
1696 }
1697
1698 #[must_use]
1699 pub fn to_mat<SinkMat, Combined, F>(
1700 self,
1701 sink: Sink<Out, SinkMat>,
1702 combine: F,
1703 ) -> RunnableGraph<Combined>
1704 where
1705 SinkMat: Send + 'static,
1706 Combined: Send + 'static,
1707 F: Fn(Mat, SinkMat) -> Combined + Send + Sync + 'static,
1708 {
1709 let factory = self.factory;
1710 let hints = self.hints;
1711 let combine = Arc::new(combine);
1712 RunnableGraph::from_runner(move |materializer| {
1713 let (stream, source_mat) = Arc::clone(&factory).create(materializer)?;
1714 let sink_mat = if hints.inline_head_terminal && sink.can_inline() {
1715 let stream =
1716 runtime_checked_stream(stream, Arc::clone(&materializer.inner.state), None);
1717 sink.run_inline(stream, materializer)?
1718 } else {
1719 sink.run_from_source(stream, materializer, hints.runtime())?
1720 };
1721 Ok(combine(source_mat, sink_mat))
1722 })
1723 }
1724
1725 pub fn run_collect(self) -> StreamResult<Vec<Out>> {
1726 self.run_with(Sink::collect())?.wait()
1727 }
1728
1729 #[must_use]
1730 pub fn map_materialized_value<NextMat, F>(self, f: F) -> Source<Out, NextMat>
1731 where
1732 NextMat: Send + 'static,
1733 F: Fn(Mat) -> NextMat + Send + Sync + 'static,
1734 {
1735 let factory = self.factory;
1736 let hints = self.hints;
1737 let f = Arc::new(f);
1738 Source::from_materialized_factory_with_hints(
1739 move |materializer| {
1740 let (stream, mat) = Arc::clone(&factory).create(materializer)?;
1741 Ok((stream, f(mat)))
1742 },
1743 hints,
1744 )
1745 }
1746}
1747
1748impl<Out: Clone + Send + Sync + 'static> Source<Out, NotUsed> {
1749 #[must_use]
1750 pub fn combine<Mat1, Mat2, MatRest, I>(
1751 first: Source<Out, Mat1>,
1752 second: Source<Out, Mat2>,
1753 rest: I,
1754 strategy: SourceCombineStrategy,
1755 ) -> Source<Out, NotUsed>
1756 where
1757 Mat1: Send + 'static,
1758 Mat2: Send + 'static,
1759 MatRest: Send + 'static,
1760 I: IntoIterator<Item = Source<Out, MatRest>>,
1761 {
1762 let mut factories: Vec<Arc<CombinedSourceFactory<Out>>> = vec![
1763 Arc::new(move |materializer| {
1764 Arc::clone(&first.factory)
1765 .create(materializer)
1766 .map(|(stream, _)| stream)
1767 }),
1768 Arc::new(move |materializer| {
1769 Arc::clone(&second.factory)
1770 .create(materializer)
1771 .map(|(stream, _)| stream)
1772 }),
1773 ];
1774 factories.extend(rest.into_iter().map(|source| {
1775 Arc::new(move |materializer: &Materializer| {
1776 Arc::clone(&source.factory)
1777 .create(materializer)
1778 .map(|(stream, _)| stream)
1779 }) as Arc<CombinedSourceFactory<Out>>
1780 }));
1781 Source::from_materialized_factory(move |materializer| {
1782 let mut streams = Vec::with_capacity(factories.len());
1783 for factory in &factories {
1784 let stream = match factory(materializer) {
1785 Ok(stream) => stream,
1786 Err(error) => {
1787 return Ok((
1788 Box::new(std::iter::once(Err(error))) as BoxStream<Out>,
1789 NotUsed,
1790 ));
1791 }
1792 };
1793 streams.push(stream);
1794 }
1795 let stream = match &strategy {
1796 SourceCombineStrategy::Merge { eager_complete } => {
1797 merge_streams(streams, *eager_complete)
1798 }
1799 SourceCombineStrategy::Concat => concat_source_streams(streams),
1800 SourceCombineStrategy::Prioritized {
1801 priorities,
1802 eager_complete,
1803 } => {
1804 if priorities.len() != streams.len() {
1805 return Err(StreamError::GraphValidation(format!(
1806 "combine priorities length {} did not match source count {}",
1807 priorities.len(),
1808 streams.len()
1809 )));
1810 }
1811 merge_prioritized_streams(streams, priorities.clone(), *eager_complete)
1812 }
1813 };
1814 Ok((stream, NotUsed))
1815 })
1816 }
1817
1818 #[must_use]
1819 pub fn zip_n<Mat2, I>(sources: I) -> Source<Vec<Out>, NotUsed>
1820 where
1821 I: IntoIterator<Item = Source<Out, Mat2>>,
1822 Mat2: Send + 'static,
1823 Out: Clone,
1824 {
1825 Self::zip_with_n(sources, |values| values)
1826 }
1827
1828 #[must_use]
1829 pub fn zip_with_n<Mat2, I, Next, F>(sources: I, zipper: F) -> Source<Next, NotUsed>
1830 where
1831 I: IntoIterator<Item = Source<Out, Mat2>>,
1832 Mat2: Send + 'static,
1833 Next: Send + 'static,
1834 F: Fn(Vec<Out>) -> Next + Send + Sync + 'static,
1835 {
1836 let factories: Vec<_> = sources.into_iter().map(|source| source.factory).collect();
1837 let zipper = Arc::new(zipper);
1838 Source::from_materialized_factory(move |materializer| {
1839 let mut streams = Vec::with_capacity(factories.len());
1840 for factory in &factories {
1841 let stream = match Arc::clone(factory).create(materializer) {
1842 Ok((stream, _)) => stream,
1843 Err(error) => {
1844 return Ok((
1845 Box::new(std::iter::once(Err(error))) as BoxStream<Next>,
1846 NotUsed,
1847 ));
1848 }
1849 };
1850 streams.push(stream);
1851 }
1852 Ok((zip_n_streams(streams, Arc::clone(&zipper)), NotUsed))
1853 })
1854 }
1855
1856 #[must_use]
1857 pub fn merge_prioritized_n<Mat2, I>(
1858 sources_and_priorities: I,
1859 eager_complete: bool,
1860 ) -> Source<Out, NotUsed>
1861 where
1862 I: IntoIterator<Item = (Source<Out, Mat2>, usize)>,
1863 Mat2: Send + 'static,
1864 {
1865 let sources_and_priorities: Vec<_> = sources_and_priorities.into_iter().collect();
1866 if sources_and_priorities.is_empty() {
1867 return Source::empty();
1868 }
1869 let (factories, priorities): (Vec<_>, Vec<_>) = sources_and_priorities
1870 .into_iter()
1871 .map(|(source, priority)| (source.factory, priority))
1872 .unzip();
1873 Source::from_materialized_factory(move |materializer| {
1874 let mut streams = Vec::with_capacity(factories.len());
1875 for factory in &factories {
1876 let stream = match Arc::clone(factory).create(materializer) {
1877 Ok((stream, _)) => stream,
1878 Err(error) => {
1879 return Ok((
1880 Box::new(std::iter::once(Err(error))) as BoxStream<Out>,
1881 NotUsed,
1882 ));
1883 }
1884 };
1885 streams.push(stream);
1886 }
1887 Ok((
1888 merge_prioritized_streams(streams, priorities.clone(), eager_complete),
1889 NotUsed,
1890 ))
1891 })
1892 }
1893
1894 #[must_use]
1895 pub fn maybe() -> (MaybeHandle<Out>, Self) {
1896 let value = Arc::new(Mutex::new(None));
1897 let handle = MaybeHandle {
1898 value: Arc::clone(&value),
1899 };
1900 let source = Self::from_factory(move || {
1901 let result = value
1902 .lock()
1903 .expect("maybe source poisoned")
1904 .clone()
1905 .unwrap_or(Err(StreamError::MaybeIncomplete));
1906 Box::new(std::iter::once(result))
1907 });
1908 (handle, source)
1909 }
1910
1911 #[must_use]
1912 pub fn single(item: Out) -> Self {
1913 Self::from_factory_with_hints(
1914 move || Box::new(std::iter::once(Ok(item.clone()))),
1915 SourceHints::with_inline_micro(1),
1916 )
1917 }
1918
1919 #[must_use]
1920 pub fn repeat(item: Out) -> Self {
1921 Self::from_factory(move || {
1922 let item = item.clone();
1923 Box::new(std::iter::repeat_with(move || Ok(item.clone())))
1924 })
1925 }
1926
1927 #[must_use]
1928 pub fn from_iterable<I>(items: I) -> Self
1929 where
1930 I: IntoIterator<Item = Out>,
1931 {
1932 items.into_iter().collect()
1933 }
1934}
1935
1936impl<Out: Clone + Send + Sync + 'static> FromIterator<Out> for Source<Out, NotUsed> {
1937 fn from_iter<T: IntoIterator<Item = Out>>(iter: T) -> Self {
1938 let items: Arc<[Out]> = iter.into_iter().collect();
1941 let len = items.len();
1942 Self::from_factory_with_hints(
1943 move || {
1944 let items = Arc::clone(&items);
1945 let mut index = 0;
1946 Box::new(std::iter::from_fn(move || {
1947 let item = items.get(index)?.clone();
1948 index += 1;
1949 Some(Ok(item))
1950 }))
1951 },
1952 SourceHints::with_inline_micro(len),
1954 )
1955 }
1956}
1957
1958#[cfg(test)]
1963pub(in crate::stream) fn test_source_with_inline_micro_hint<Out: Send + 'static>(
1964 factory: impl Fn() -> BoxStream<Out> + Send + Sync + 'static,
1965 max_success_items: usize,
1966) -> Source<Out, NotUsed> {
1967 Source::from_factory_with_hints(factory, SourceHints::with_inline_micro(max_success_items))
1968}
1969
1970struct UnfoldResourceStream<Resource, Out, Create, Read, Close>
1971where
1972 Create: Fn() -> StreamResult<Resource>,
1973 Read: Fn(&mut Resource) -> StreamResult<Option<Out>>,
1974 Close: Fn(Resource) -> StreamResult<()>,
1975{
1976 create: Arc<Create>,
1977 read: Arc<Read>,
1978 close: Arc<Close>,
1979 resource: Option<Resource>,
1980 created: bool,
1981 terminated: bool,
1982 _marker: PhantomData<fn() -> Out>,
1983}
1984
1985impl<Resource, Out, Create, Read, Close> UnfoldResourceStream<Resource, Out, Create, Read, Close>
1986where
1987 Create: Fn() -> StreamResult<Resource>,
1988 Read: Fn(&mut Resource) -> StreamResult<Option<Out>>,
1989 Close: Fn(Resource) -> StreamResult<()>,
1990{
1991 fn ensure_created(&mut self) -> StreamResult<()> {
1992 if self.created {
1993 return Ok(());
1994 }
1995 self.created = true;
1996 let resource = catch_unwind_failed("unfold_resource create", || (self.create)())
1997 .and_then(|result| result)?;
1998 self.resource = Some(resource);
1999 Ok(())
2000 }
2001
2002 fn close_resource(&mut self) -> StreamResult<()> {
2003 match self.resource.take() {
2004 Some(resource) => {
2005 catch_unwind_failed("unfold_resource close", || (self.close)(resource))
2006 .and_then(|result| result)
2007 }
2008 None => Ok(()),
2009 }
2010 }
2011}
2012
2013impl<Resource, Out, Create, Read, Close> Iterator
2014 for UnfoldResourceStream<Resource, Out, Create, Read, Close>
2015where
2016 Create: Fn() -> StreamResult<Resource>,
2017 Read: Fn(&mut Resource) -> StreamResult<Option<Out>>,
2018 Close: Fn(Resource) -> StreamResult<()>,
2019{
2020 type Item = StreamResult<Out>;
2021
2022 fn next(&mut self) -> Option<Self::Item> {
2023 if self.terminated {
2024 return None;
2025 }
2026 if let Err(error) = self.ensure_created() {
2027 self.terminated = true;
2028 return Some(Err(error));
2029 }
2030
2031 let result = {
2032 let resource = self
2033 .resource
2034 .as_mut()
2035 .expect("unfold_resource resource is open");
2036 catch_unwind_failed("unfold_resource read", || (self.read)(resource))
2037 .and_then(|result| result)
2038 };
2039
2040 match result {
2041 Ok(Some(item)) => Some(Ok(item)),
2042 Ok(None) => {
2043 self.terminated = true;
2044 match self.close_resource() {
2045 Ok(()) => None,
2046 Err(error) => Some(Err(error)),
2047 }
2048 }
2049 Err(read_error) => {
2050 self.terminated = true;
2051 let _ = self.close_resource();
2052 Some(Err(read_error))
2053 }
2054 }
2055 }
2056}
2057
2058impl<Resource, Out, Create, Read, Close> Drop
2059 for UnfoldResourceStream<Resource, Out, Create, Read, Close>
2060where
2061 Create: Fn() -> StreamResult<Resource>,
2062 Read: Fn(&mut Resource) -> StreamResult<Option<Out>>,
2063 Close: Fn(Resource) -> StreamResult<()>,
2064{
2065 fn drop(&mut self) {
2066 let _ = self.close_resource();
2067 }
2068}
2069
2070type UnfoldResourceAsyncMarker<Out, CreateFut, ReadFut, CloseFut> =
2071 fn() -> (Out, CreateFut, ReadFut, CloseFut);
2072
2073struct UnfoldResourceAsyncStream<Resource, Out, Create, CreateFut, Read, ReadFut, Close, CloseFut>
2074where
2075 Resource: Send + 'static,
2076 Create: Fn() -> CreateFut,
2077 CreateFut: Future<Output = StreamResult<Resource>> + Send + 'static,
2078 Read: Fn(&mut Resource) -> ReadFut,
2079 ReadFut: Future<Output = StreamResult<Option<Out>>> + Send + 'static,
2080 Close: Fn(Resource) -> CloseFut,
2081 CloseFut: Future<Output = StreamResult<()>> + Send + 'static,
2082{
2083 create: Arc<Create>,
2084 read: Arc<Read>,
2085 close: Arc<Close>,
2086 resource: Option<Resource>,
2087 created: bool,
2088 terminated: bool,
2089 _marker: PhantomData<UnfoldResourceAsyncMarker<Out, CreateFut, ReadFut, CloseFut>>,
2090}
2091
2092impl<Resource, Out, Create, CreateFut, Read, ReadFut, Close, CloseFut>
2093 UnfoldResourceAsyncStream<Resource, Out, Create, CreateFut, Read, ReadFut, Close, CloseFut>
2094where
2095 Resource: Send + 'static,
2096 Create: Fn() -> CreateFut,
2097 CreateFut: Future<Output = StreamResult<Resource>> + Send + 'static,
2098 Read: Fn(&mut Resource) -> ReadFut,
2099 ReadFut: Future<Output = StreamResult<Option<Out>>> + Send + 'static,
2100 Close: Fn(Resource) -> CloseFut,
2101 CloseFut: Future<Output = StreamResult<()>> + Send + 'static,
2102{
2103 fn ensure_created(&mut self) -> StreamResult<()> {
2104 if self.created {
2105 return Ok(());
2106 }
2107 self.created = true;
2108 let resource = catch_unwind_failed("unfold_resource_async create", || (self.create)())
2109 .and_then(flow::run_future_inline_or_spawn)?;
2110 self.resource = Some(resource);
2111 Ok(())
2112 }
2113
2114 fn close_resource(&mut self) -> StreamResult<()> {
2115 match self.resource.take() {
2116 Some(resource) => {
2117 catch_unwind_failed("unfold_resource_async close", || (self.close)(resource))
2118 .and_then(flow::run_future_inline_or_spawn)
2119 }
2120 None => Ok(()),
2121 }
2122 }
2123}
2124
2125impl<Resource, Out, Create, CreateFut, Read, ReadFut, Close, CloseFut> Iterator
2126 for UnfoldResourceAsyncStream<Resource, Out, Create, CreateFut, Read, ReadFut, Close, CloseFut>
2127where
2128 Resource: Send + 'static,
2129 Out: Send + 'static,
2130 Create: Fn() -> CreateFut,
2131 CreateFut: Future<Output = StreamResult<Resource>> + Send + 'static,
2132 Read: Fn(&mut Resource) -> ReadFut,
2133 ReadFut: Future<Output = StreamResult<Option<Out>>> + Send + 'static,
2134 Close: Fn(Resource) -> CloseFut,
2135 CloseFut: Future<Output = StreamResult<()>> + Send + 'static,
2136{
2137 type Item = StreamResult<Out>;
2138
2139 fn next(&mut self) -> Option<Self::Item> {
2140 if self.terminated {
2141 return None;
2142 }
2143 if let Err(error) = self.ensure_created() {
2144 self.terminated = true;
2145 return Some(Err(error));
2146 }
2147
2148 let result = {
2149 let resource = self
2150 .resource
2151 .as_mut()
2152 .expect("unfold_resource_async resource is open");
2153 catch_unwind_failed("unfold_resource_async read", || (self.read)(resource))
2154 .and_then(flow::run_future_inline_or_spawn)
2155 };
2156
2157 match result {
2158 Ok(Some(item)) => Some(Ok(item)),
2159 Ok(None) => {
2160 self.terminated = true;
2161 match self.close_resource() {
2162 Ok(()) => None,
2163 Err(error) => Some(Err(error)),
2164 }
2165 }
2166 Err(read_error) => {
2167 self.terminated = true;
2168 let _ = self.close_resource();
2169 Some(Err(read_error))
2170 }
2171 }
2172 }
2173}
2174
2175impl<Resource, Out, Create, CreateFut, Read, ReadFut, Close, CloseFut> Drop
2176 for UnfoldResourceAsyncStream<Resource, Out, Create, CreateFut, Read, ReadFut, Close, CloseFut>
2177where
2178 Resource: Send + 'static,
2179 Create: Fn() -> CreateFut,
2180 CreateFut: Future<Output = StreamResult<Resource>> + Send + 'static,
2181 Read: Fn(&mut Resource) -> ReadFut,
2182 ReadFut: Future<Output = StreamResult<Option<Out>>> + Send + 'static,
2183 Close: Fn(Resource) -> CloseFut,
2184 CloseFut: Future<Output = StreamResult<()>> + Send + 'static,
2185{
2186 fn drop(&mut self) {
2187 let _ = self.close_resource();
2188 }
2189}
2190
2191struct LazySourceStream<Out, InnerMat, F> {
2192 create: Arc<F>,
2193 materializer: Materializer,
2194 current: Option<BoxStream<Out>>,
2195 mat_sender: Option<oneshot::Sender<StreamResult<InnerMat>>>,
2196 initialized: bool,
2197 terminated: bool,
2198}
2199
2200impl<Out, InnerMat, F> LazySourceStream<Out, InnerMat, F>
2201where
2202 Out: Send + 'static,
2203 InnerMat: Send + 'static,
2204 F: Fn() -> Source<Out, InnerMat>,
2205{
2206 fn complete_mat(&mut self, result: StreamResult<InnerMat>) {
2207 if let Some(sender) = self.mat_sender.take() {
2208 let _ = sender.send(result);
2209 }
2210 }
2211
2212 fn initialize(&mut self) -> StreamResult<()> {
2213 if self.initialized {
2214 return Ok(());
2215 }
2216 self.initialized = true;
2217 let source = match catch_unwind_failed("lazy_source factory", || (self.create)()) {
2218 Ok(source) => source,
2219 Err(error) => {
2220 self.complete_mat(Err(error.clone()));
2221 return Err(error);
2222 }
2223 };
2224 match Arc::clone(&source.factory).create(&self.materializer) {
2225 Ok((stream, mat)) => {
2226 self.current = Some(stream);
2227 self.complete_mat(Ok(mat));
2228 Ok(())
2229 }
2230 Err(error) => {
2231 self.complete_mat(Err(error.clone()));
2232 Err(error)
2233 }
2234 }
2235 }
2236}
2237
2238impl<Out, InnerMat, F> Iterator for LazySourceStream<Out, InnerMat, F>
2239where
2240 Out: Send + 'static,
2241 InnerMat: Send + 'static,
2242 F: Fn() -> Source<Out, InnerMat>,
2243{
2244 type Item = StreamResult<Out>;
2245
2246 fn next(&mut self) -> Option<Self::Item> {
2247 if self.terminated {
2248 return None;
2249 }
2250 if let Err(error) = self.initialize() {
2251 self.terminated = true;
2252 return Some(Err(error));
2253 }
2254 match self
2255 .current
2256 .as_mut()
2257 .expect("lazy_source current stream initialized")
2258 .next()
2259 {
2260 Some(Ok(item)) => Some(Ok(item)),
2261 Some(Err(error)) => {
2262 self.terminated = true;
2263 Some(Err(error))
2264 }
2265 None => {
2266 self.terminated = true;
2267 None
2268 }
2269 }
2270 }
2271}
2272
2273impl<Out, InnerMat, F> Drop for LazySourceStream<Out, InnerMat, F> {
2274 fn drop(&mut self) {
2275 if !self.initialized
2276 && let Some(sender) = self.mat_sender.take()
2277 {
2278 let _ = sender.send(Err(StreamError::Failed(
2279 "lazy source was never materialized".into(),
2280 )));
2281 }
2282 }
2283}
2284
2285struct LazyFutureSourceStream<Out, InnerMat, F, Fut> {
2286 create: Arc<F>,
2287 materializer: Materializer,
2288 current: Option<BoxStream<Out>>,
2289 mat_sender: Option<oneshot::Sender<StreamResult<InnerMat>>>,
2290 initialized: bool,
2291 terminated: bool,
2292 _marker: PhantomData<fn() -> Fut>,
2293}
2294
2295impl<Out, InnerMat, F, Fut> LazyFutureSourceStream<Out, InnerMat, F, Fut>
2296where
2297 Out: Send + 'static,
2298 InnerMat: Send + 'static,
2299 F: Fn() -> Fut,
2300 Fut: Future<Output = StreamResult<Source<Out, InnerMat>>> + Send + 'static,
2301{
2302 fn complete_mat(&mut self, result: StreamResult<InnerMat>) {
2303 if let Some(sender) = self.mat_sender.take() {
2304 let _ = sender.send(result);
2305 }
2306 }
2307
2308 fn initialize(&mut self) -> StreamResult<()> {
2309 if self.initialized {
2310 return Ok(());
2311 }
2312 self.initialized = true;
2313 let source = match catch_unwind_failed("lazy_future_source factory", || (self.create)())
2314 .and_then(flow::run_future_inline_or_spawn)
2315 {
2316 Ok(source) => source,
2317 Err(error) => {
2318 self.complete_mat(Err(error.clone()));
2319 return Err(error);
2320 }
2321 };
2322 match Arc::clone(&source.factory).create(&self.materializer) {
2323 Ok((stream, mat)) => {
2324 self.current = Some(stream);
2325 self.complete_mat(Ok(mat));
2326 Ok(())
2327 }
2328 Err(error) => {
2329 self.complete_mat(Err(error.clone()));
2330 Err(error)
2331 }
2332 }
2333 }
2334}
2335
2336impl<Out, InnerMat, F, Fut> Iterator for LazyFutureSourceStream<Out, InnerMat, F, Fut>
2337where
2338 Out: Send + 'static,
2339 InnerMat: Send + 'static,
2340 F: Fn() -> Fut,
2341 Fut: Future<Output = StreamResult<Source<Out, InnerMat>>> + Send + 'static,
2342{
2343 type Item = StreamResult<Out>;
2344
2345 fn next(&mut self) -> Option<Self::Item> {
2346 if self.terminated {
2347 return None;
2348 }
2349 if let Err(error) = self.initialize() {
2350 self.terminated = true;
2351 return Some(Err(error));
2352 }
2353 match self
2354 .current
2355 .as_mut()
2356 .expect("lazy_future_source current stream initialized")
2357 .next()
2358 {
2359 Some(Ok(item)) => Some(Ok(item)),
2360 Some(Err(error)) => {
2361 self.terminated = true;
2362 Some(Err(error))
2363 }
2364 None => {
2365 self.terminated = true;
2366 None
2367 }
2368 }
2369 }
2370}
2371
2372impl<Out, InnerMat, F, Fut> Drop for LazyFutureSourceStream<Out, InnerMat, F, Fut> {
2373 fn drop(&mut self) {
2374 if !self.initialized
2375 && let Some(sender) = self.mat_sender.take()
2376 {
2377 let _ = sender.send(Err(StreamError::Failed(
2378 "lazy future source was never materialized".into(),
2379 )));
2380 }
2381 }
2382}
2383
2384fn concat_source_streams<Out>(streams: Vec<BoxStream<Out>>) -> BoxStream<Out>
2385where
2386 Out: Send + 'static,
2387{
2388 let mut streams: VecDeque<_> = streams.into();
2389 let mut current = streams.pop_front();
2390 Box::new(std::iter::from_fn(move || {
2391 loop {
2392 match current.as_mut() {
2393 Some(stream) => match stream.next() {
2394 Some(item) => return Some(item),
2395 None => current = streams.pop_front(),
2396 },
2397 None => return None,
2398 }
2399 }
2400 }))
2401}
2402
2403fn concat_source_streams_lazy<Out, Mat>(
2404 initial: BoxStream<Out>,
2405 factories: Vec<Arc<dyn SourceFactory<Out, Mat>>>,
2406 materializer: &Materializer,
2407) -> BoxStream<Out>
2408where
2409 Out: Send + 'static,
2410 Mat: Send + 'static,
2411{
2412 let mut current = Some(initial);
2413 let mut remaining: VecDeque<_> = factories.into();
2414 let materializer = materializer.with_name_prefix(materializer.name_prefix().to_owned());
2415 Box::new(std::iter::from_fn(move || {
2416 loop {
2417 match current.as_mut() {
2418 Some(stream) => match stream.next() {
2419 Some(item) => return Some(item),
2420 None => {
2421 current = remaining.pop_front().map(|factory| {
2422 match factory.create(&materializer) {
2423 Ok((stream, _)) => stream,
2424 Err(error) => {
2425 Box::new(std::iter::once(Err(error))) as BoxStream<Out>
2426 }
2427 }
2428 });
2429 }
2430 },
2431 None => return None,
2432 }
2433 }
2434 }))
2435}
2436
2437fn or_else_source_stream<Out>(
2438 mut primary: BoxStream<Out>,
2439 mut secondary: BoxStream<Out>,
2440) -> BoxStream<Out>
2441where
2442 Out: Send + 'static,
2443{
2444 let mut primary_emitted = false;
2445 let mut using_secondary = false;
2446 Box::new(std::iter::from_fn(move || {
2447 loop {
2448 if using_secondary {
2449 return secondary.next();
2450 }
2451
2452 match primary.next() {
2453 Some(Ok(item)) => {
2454 primary_emitted = true;
2455 return Some(Ok(item));
2456 }
2457 Some(Err(error)) => return Some(Err(error)),
2458 None if primary_emitted => return None,
2459 None => using_secondary = true,
2460 }
2461 }
2462 }))
2463}
2464
2465fn interleave_source_streams<Out>(
2466 streams: Vec<BoxStream<Out>>,
2467 segment_size: usize,
2468 eager_close: bool,
2469) -> BoxStream<Out>
2470where
2471 Out: Send + 'static,
2472{
2473 if segment_size == 0 {
2474 return Box::new(std::iter::once(Err(StreamError::GraphValidation(
2475 "interleave segment size must be greater than zero".into(),
2476 ))));
2477 }
2478
2479 let mut streams: Vec<Option<BoxStream<Out>>> = streams.into_iter().map(Some).collect();
2480 let mut pending: Vec<Option<StreamResult<Out>>> = (0..streams.len()).map(|_| None).collect();
2481 let mut current = 0usize;
2482 let mut emitted = 0usize;
2483 Box::new(std::iter::from_fn(move || {
2484 loop {
2485 if streams.iter().all(Option::is_none) {
2486 return None;
2487 }
2488 if streams[current].is_none() {
2489 match next_active_source_stream(&streams, current) {
2490 Some(next) => {
2491 current = next;
2492 emitted = 0;
2493 }
2494 None => return None,
2495 }
2496 }
2497
2498 let Some(stream) = streams[current].as_mut() else {
2499 continue;
2500 };
2501 let next_item = pending[current].take().or_else(|| stream.next());
2502 match next_item {
2503 Some(Ok(item)) => {
2504 emitted += 1;
2505 if emitted == segment_size {
2506 emitted = 0;
2507 if let Some(next) = next_active_source_stream(&streams, current) {
2508 current = next;
2509 }
2510 }
2511 return Some(Ok(item));
2512 }
2513 Some(Err(error)) => return Some(Err(error)),
2514 None => {
2515 streams[current] = None;
2516 emitted = 0;
2517 if eager_close {
2518 return None;
2519 }
2520 match next_active_source_stream(&streams, current) {
2521 Some(next) => current = next,
2522 None => return None,
2523 }
2524 }
2525 }
2526 }
2527 }))
2528}
2529
2530fn next_active_source_stream<Out>(
2531 streams: &[Option<BoxStream<Out>>],
2532 current: usize,
2533) -> Option<usize>
2534where
2535 Out: Send + 'static,
2536{
2537 if streams.is_empty() {
2538 return None;
2539 }
2540 for offset in 1..=streams.len() {
2541 let index = (current + offset) % streams.len();
2542 if streams[index].is_some() {
2543 return Some(index);
2544 }
2545 }
2546 None
2547}