1use super::flow::{self, FlowTransform};
2use super::*;
3use crate::Attributes;
4use crate::context::SourceWithContext;
5
6#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash)]
7pub struct NotUsed;
8
9type CombinedSourceFactory<Out> =
10 dyn Fn(&Materializer) -> StreamResult<BoxStream<Out>> + Send + Sync;
11
12pub struct Keep;
13
14impl Keep {
15 pub fn left<Left, Right>(left: Left, _right: Right) -> Left {
16 left
17 }
18
19 pub fn right<Left, Right>(_left: Left, right: Right) -> Right {
20 right
21 }
22
23 pub fn both<Left, Right>(left: Left, right: Right) -> (Left, Right) {
24 (left, right)
25 }
26
27 pub fn none<Left, Right>(_left: Left, _right: Right) -> NotUsed {
28 NotUsed
29 }
30}
31
32#[derive(Debug, Clone, PartialEq, Eq)]
33pub enum SourceCombineStrategy {
34 Merge {
35 eager_complete: bool,
36 },
37 Concat,
38 Prioritized {
39 priorities: Vec<usize>,
40 eager_complete: bool,
41 },
42}
43
44#[derive(Clone)]
45pub struct MaybeHandle<T> {
46 value: Arc<Mutex<Option<StreamResult<T>>>>,
47}
48
49impl<T> MaybeHandle<T> {
50 #[must_use]
51 pub fn is_completed(&self) -> bool {
52 self.value.lock().expect("maybe handle poisoned").is_some()
53 }
54
55 pub fn complete(&self, item: T) -> StreamResult<()> {
56 self.settle(Ok(item))
57 }
58
59 pub fn fail(&self, error: StreamError) -> StreamResult<()> {
60 self.settle(Err(error))
61 }
62
63 fn settle(&self, result: StreamResult<T>) -> StreamResult<()> {
64 let mut value = self.value.lock().expect("maybe handle poisoned");
65 if value.is_some() {
66 return Err(StreamError::Failed("maybe source already completed".into()));
67 }
68 *value = Some(result);
69 Ok(())
70 }
71}
72
73#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
74pub struct Demand(u64);
75
76impl Demand {
77 pub const ZERO: Self = Self(0);
78 pub const ONE: Self = Self(1);
79 pub const MAX: Self = Self(u64::MAX);
80
81 #[must_use]
82 pub const fn new(available: u64) -> Self {
83 Self(available)
84 }
85
86 #[must_use]
87 pub const fn available(self) -> u64 {
88 self.0
89 }
90
91 #[must_use]
92 pub const fn is_unbounded(self) -> bool {
93 self.0 == u64::MAX
94 }
95
96 #[must_use]
97 pub const fn is_empty(self) -> bool {
98 self.0 == 0
99 }
100
101 #[must_use]
102 pub const fn saturating_add(self, rhs: Self) -> Self {
103 Self(self.0.saturating_add(rhs.0))
104 }
105
106 pub fn consume_one(&mut self) -> bool {
107 match self.0 {
108 0 => false,
109 u64::MAX => true,
110 _ => {
111 self.0 -= 1;
112 true
113 }
114 }
115 }
116}
117
118pub trait PushOutlet<T>: Send {
119 fn push(&mut self, item: T) -> StreamResult<Demand>;
120
121 fn complete(&mut self) -> StreamResult<()> {
122 Ok(())
123 }
124
125 fn fail(&mut self, cause: StreamError) -> StreamResult<()> {
126 Err(cause)
127 }
128}
129
130#[derive(Clone)]
131pub struct Source<Out, Mat = NotUsed> {
132 pub(crate) factory: Arc<dyn SourceFactory<Out, Mat>>,
133 pub(super) hints: SourceHints,
134 pub(super) attributes: Attributes,
135 pub(crate) split_hook: Option<Arc<dyn SplitSegmentHookDyn>>,
138}
139
140impl<Out: Send + 'static> Source<Out, NotUsed> {
141 pub(super) fn from_factory<F>(factory: F) -> Self
142 where
143 F: Fn() -> BoxStream<Out> + Send + Sync + 'static,
144 {
145 Self::from_factory_with_hints(factory, SourceHints::default())
146 }
147
148 fn from_factory_with_hints<F>(factory: F, hints: SourceHints) -> Self
149 where
150 F: Fn() -> BoxStream<Out> + Send + Sync + 'static,
151 {
152 Self::from_materialized_factory_with_hints(
153 move |_materializer| Ok((factory(), NotUsed)),
154 hints,
155 )
156 }
157
158 #[must_use]
159 pub fn empty() -> Self {
160 Self::from_factory_with_hints(
161 || Box::new(std::iter::empty()),
162 SourceHints::with_inline_micro(0),
163 )
164 }
165
166 #[must_use]
167 pub fn never() -> Self {
168 Self::from_materialized_factory_with_hints(
169 move |materializer| {
170 let state = Arc::clone(&materializer.inner.state);
171 Ok((
172 Box::new(std::iter::from_fn(move || {
173 loop {
174 if state.shutdown.load(Ordering::SeqCst) {
175 return Some(Err(StreamError::AbruptTermination));
176 }
177 if super::runtime::current_stream_cancelled()
178 .as_ref()
179 .is_some_and(|cancelled| cancelled.load(Ordering::SeqCst))
180 {
181 return Some(Err(StreamError::Cancelled));
182 }
183 std::thread::park_timeout(Duration::from_millis(1));
184 }
185 })),
186 NotUsed,
187 ))
188 },
189 SourceHints::default(),
190 )
191 }
192
193 #[must_use]
194 pub fn failed(error: StreamError) -> Self {
195 Self::from_factory_with_hints(
196 move || Box::new(std::iter::once(Err(error.clone()))),
197 SourceHints::with_inline_micro(0),
199 )
200 }
201
202 #[must_use]
203 pub fn future<F, Fut>(future: F) -> Self
204 where
205 F: Fn() -> Fut + Send + Sync + 'static,
206 Fut: Future<Output = StreamResult<Out>> + Send + 'static,
207 {
208 let future = Arc::new(future);
209 Self::from_factory(move || {
210 let future = Arc::clone(&future);
211 let mut emitted = false;
212 Box::new(std::iter::from_fn(move || {
213 if emitted {
214 return None;
215 }
216 emitted = true;
217 Some(
218 catch_unwind_failed("source future factory", || future())
219 .and_then(flow::run_future_inline_or_spawn),
220 )
221 }))
222 })
223 }
224
225 #[must_use]
226 pub fn future_source<F, Fut>(future: F) -> Self
227 where
228 F: Fn() -> Fut + Send + Sync + 'static,
229 Fut: Future<Output = StreamResult<Source<Out>>> + Send + 'static,
230 {
231 let future = Arc::new(future);
232 Self::from_materialized_factory(move |materializer| {
233 let materializer = materializer.with_name_prefix(materializer.name_prefix().to_owned());
234 let future = Arc::clone(&future);
235 let mut current = None::<BoxStream<Out>>;
236 let mut initialized = false;
237 let mut terminated = false;
238 Ok((
239 Box::new(std::iter::from_fn(move || {
240 if terminated {
241 return None;
242 }
243
244 loop {
245 if let Some(stream) = current.as_mut() {
246 match stream.next() {
247 Some(item) => return Some(item),
248 None => {
249 terminated = true;
250 return None;
251 }
252 }
253 }
254
255 if initialized {
256 terminated = true;
257 return None;
258 }
259 initialized = true;
260 let source = match catch_unwind_failed("future_source factory", || future())
261 .and_then(flow::run_future_inline_or_spawn)
262 {
263 Ok(source) => source,
264 Err(error) => {
265 terminated = true;
266 return Some(Err(error));
267 }
268 };
269 current = Some(match Arc::clone(&source.factory).create(&materializer) {
270 Ok((stream, _)) => stream,
271 Err(error) => {
272 terminated = true;
273 return Some(Err(error));
274 }
275 });
276 }
277 })) as BoxStream<Out>,
278 NotUsed,
279 ))
280 })
281 }
282
283 #[must_use]
284 pub fn cycle<F, I>(factory: F) -> Self
285 where
286 F: Fn() -> I + Send + Sync + 'static,
287 I: IntoIterator<Item = Out>,
288 I::IntoIter: Send + 'static,
289 {
290 let factory = Arc::new(factory);
291 Self::from_factory(move || {
292 let factory = Arc::clone(&factory);
293 let mut current = None::<I::IntoIter>;
294 let mut terminated = false;
295 Box::new(std::iter::from_fn(move || {
296 if terminated {
297 return None;
298 }
299
300 if let Some(iter) = current.as_mut()
301 && let Some(item) = iter.next()
302 {
303 return Some(Ok(item));
304 }
305
306 let mut next = match catch_unwind_failed("cycle factory", || factory()) {
307 Ok(iterable) => iterable.into_iter(),
308 Err(error) => {
309 terminated = true;
310 return Some(Err(error));
311 }
312 };
313 match next.next() {
314 Some(item) => {
315 current = Some(next);
316 Some(Ok(item))
317 }
318 None => {
319 terminated = true;
320 Some(Err(StreamError::Failed("empty iterator".into())))
321 }
322 }
323 }))
324 })
325 }
326
327 #[must_use]
328 pub fn unfold<State, F>(initial: State, f: F) -> Self
329 where
330 State: Clone + Send + Sync + 'static,
331 F: Fn(State) -> Option<(State, Out)> + Send + Sync + 'static,
332 {
333 let f = Arc::new(f);
334 Self::from_factory(move || {
335 let f = Arc::clone(&f);
336 let mut state = Some(initial.clone());
337 let mut terminated = false;
338 Box::new(std::iter::from_fn(move || {
339 if terminated {
340 return None;
341 }
342 let current = state.take().expect("unfold state present");
343 match catch_unwind_failed("unfold function", || f(current)) {
344 Ok(Some((next_state, item))) => {
345 state = Some(next_state);
346 Some(Ok(item))
347 }
348 Ok(None) => {
349 terminated = true;
350 None
351 }
352 Err(error) => {
353 terminated = true;
354 Some(Err(error))
355 }
356 }
357 }))
358 })
359 }
360
361 #[must_use]
362 pub fn unfold_async<State, F, Fut>(initial: State, f: F) -> Self
363 where
364 State: Clone + Send + Sync + 'static,
365 F: Fn(State) -> Fut + Send + Sync + 'static,
366 Fut: Future<Output = StreamResult<Option<(State, Out)>>> + Send + 'static,
367 {
368 let f = Arc::new(f);
369 Self::from_factory(move || {
370 let f = Arc::clone(&f);
371 let mut state = Some(initial.clone());
372 let mut terminated = false;
373 Box::new(std::iter::from_fn(move || {
374 if terminated {
375 return None;
376 }
377 let current = state.take().expect("unfold_async state present");
378 match catch_unwind_failed("unfold_async factory", || f(current))
379 .and_then(flow::run_future_inline_or_spawn)
380 {
381 Ok(Some((next_state, item))) => {
382 state = Some(next_state);
383 Some(Ok(item))
384 }
385 Ok(None) => {
386 terminated = true;
387 None
388 }
389 Err(error) => {
390 terminated = true;
391 Some(Err(error))
392 }
393 }
394 }))
395 })
396 }
397
398 #[must_use]
399 pub fn unfold_resource<Resource, Create, Read, Close>(
400 create: Create,
401 read: Read,
402 close: Close,
403 ) -> Self
404 where
405 Resource: Send + 'static,
406 Create: Fn() -> StreamResult<Resource> + Send + Sync + 'static,
407 Read: Fn(&mut Resource) -> StreamResult<Option<Out>> + Send + Sync + 'static,
408 Close: Fn(Resource) -> StreamResult<()> + Send + Sync + 'static,
409 {
410 let create = Arc::new(create);
411 let read = Arc::new(read);
412 let close = Arc::new(close);
413 Self::from_factory(move || {
414 Box::new(UnfoldResourceStream {
415 create: Arc::clone(&create),
416 read: Arc::clone(&read),
417 close: Arc::clone(&close),
418 resource: None,
419 created: false,
420 terminated: false,
421 _marker: PhantomData,
422 })
423 })
424 }
425
426 #[must_use]
427 pub fn unfold_resource_async<Resource, Create, CreateFut, Read, ReadFut, Close, CloseFut>(
428 create: Create,
429 read: Read,
430 close: Close,
431 ) -> Self
432 where
433 Resource: Send + 'static,
434 Create: Fn() -> CreateFut + Send + Sync + 'static,
435 CreateFut: Future<Output = StreamResult<Resource>> + Send + 'static,
436 Read: Fn(&mut Resource) -> ReadFut + Send + Sync + 'static,
437 ReadFut: Future<Output = StreamResult<Option<Out>>> + Send + 'static,
438 Close: Fn(Resource) -> CloseFut + Send + Sync + 'static,
439 CloseFut: Future<Output = StreamResult<()>> + Send + 'static,
440 {
441 let create = Arc::new(create);
442 let read = Arc::new(read);
443 let close = Arc::new(close);
444 Self::from_factory(move || {
445 Box::new(UnfoldResourceAsyncStream {
446 create: Arc::clone(&create),
447 read: Arc::clone(&read),
448 close: Arc::clone(&close),
449 resource: None,
450 created: false,
451 terminated: false,
452 _marker: PhantomData,
453 })
454 })
455 }
456
457 #[must_use]
458 pub fn lazy_single<F>(create: F) -> Self
459 where
460 F: Fn() -> Out + Send + Sync + 'static,
461 {
462 let create = Arc::new(create);
463 Self::from_factory(move || {
464 let create = Arc::clone(&create);
465 let mut emitted = false;
466 Box::new(std::iter::from_fn(move || {
467 if emitted {
468 return None;
469 }
470 emitted = true;
471 Some(catch_unwind_failed("lazy_single factory", || create()))
472 }))
473 })
474 }
475
476 #[must_use]
477 pub fn lazy_future<F, Fut>(create: F) -> Self
478 where
479 F: Fn() -> Fut + Send + Sync + 'static,
480 Fut: Future<Output = StreamResult<Out>> + Send + 'static,
481 {
482 let create = Arc::new(create);
483 Self::from_factory(move || {
484 let create = Arc::clone(&create);
485 let mut emitted = false;
486 Box::new(std::iter::from_fn(move || {
487 if emitted {
488 return None;
489 }
490 emitted = true;
491 Some(
492 catch_unwind_failed("lazy_future factory", || create())
493 .and_then(flow::run_future_inline_or_spawn),
494 )
495 }))
496 })
497 }
498
499 #[must_use]
500 pub fn lazy_source<InnerMat, F>(create: F) -> Source<Out, StreamCompletion<InnerMat>>
501 where
502 InnerMat: Send + 'static,
503 F: Fn() -> Source<Out, InnerMat> + Send + Sync + 'static,
504 {
505 let create = Arc::new(create);
506 Source::from_materialized_factory(move |materializer| {
507 let (sender, receiver) = oneshot::channel();
508 Ok((
509 Box::new(LazySourceStream {
510 create: Arc::clone(&create),
511 materializer: materializer
512 .with_name_prefix(materializer.name_prefix().to_owned()),
513 current: None,
514 mat_sender: Some(sender),
515 initialized: false,
516 terminated: false,
517 }) as BoxStream<Out>,
518 StreamCompletion::from_receiver(receiver, None),
519 ))
520 })
521 }
522
523 #[must_use]
524 pub fn lazy_future_source<InnerMat, F, Fut>(
525 create: F,
526 ) -> Source<Out, StreamCompletion<InnerMat>>
527 where
528 InnerMat: Send + 'static,
529 F: Fn() -> Fut + Send + Sync + 'static,
530 Fut: Future<Output = StreamResult<Source<Out, InnerMat>>> + Send + 'static,
531 {
532 let create = Arc::new(create);
533 Source::from_materialized_factory(move |materializer| {
534 let (sender, receiver) = oneshot::channel();
535 Ok((
536 Box::new(LazyFutureSourceStream {
537 create: Arc::clone(&create),
538 materializer: materializer
539 .with_name_prefix(materializer.name_prefix().to_owned()),
540 current: None,
541 mat_sender: Some(sender),
542 initialized: false,
543 terminated: false,
544 _marker: PhantomData,
545 }) as BoxStream<Out>,
546 StreamCompletion::from_receiver(receiver, None),
547 ))
548 })
549 }
550
551 #[must_use]
552 pub fn from_fn_iter<F, I>(factory: F) -> Self
553 where
554 F: Fn() -> I + Send + Sync + 'static,
555 I: IntoIterator<Item = Out>,
556 I::IntoIter: Send + 'static,
557 {
558 Self::from_factory(move || Box::new(factory().into_iter().map(Ok)))
559 }
560}
561
562impl<Out: Send + 'static, Mat: Send + 'static> Source<Out, Mat> {
563 fn from_materialized_factory_with_hints<F>(factory: F, hints: SourceHints) -> Self
564 where
565 F: Fn(&Materializer) -> StreamResult<(BoxStream<Out>, Mat)> + Send + Sync + 'static,
566 {
567 Self {
568 factory: Arc::new(FnSourceFactory(factory)),
569 hints,
570 attributes: Attributes::default(),
571 split_hook: None,
572 }
573 }
574
575 pub(crate) fn from_materialized_factory<F>(factory: F) -> Self
576 where
577 F: Fn(&Materializer) -> StreamResult<(BoxStream<Out>, Mat)> + Send + Sync + 'static,
578 {
579 Self::from_materialized_factory_with_hints(factory, SourceHints::default())
580 }
581
582 #[must_use]
583 pub fn as_source_with_context<Ctx, F>(
584 self,
585 extract_context: F,
586 ) -> SourceWithContext<Out, Ctx, Mat>
587 where
588 Ctx: Send + 'static,
589 F: Fn(&Out) -> Ctx + Send + Sync + 'static,
590 {
591 SourceWithContext::from_source(self.map(move |item| {
592 let context = extract_context(&item);
593 (item, context)
594 }))
595 }
596
597 #[must_use]
598 pub fn via<Next, FlowMat>(self, flow: Flow<Out, Next, FlowMat>) -> Source<Next, Mat>
599 where
600 Next: Send + 'static,
601 FlowMat: Send + 'static,
602 {
603 self.via_mat(flow, Keep::left)
604 }
605
606 #[must_use]
607 pub fn via_mat<Next, FlowMat, Combined, F>(
608 self,
609 flow: Flow<Out, Next, FlowMat>,
610 combine: F,
611 ) -> Source<Next, Combined>
612 where
613 Next: Send + 'static,
614 FlowMat: Send + 'static,
615 Combined: Send + 'static,
616 F: Fn(Mat, FlowMat) -> Combined + Send + Sync + 'static,
617 {
618 let source = self.factory;
619 let transform = flow.transform;
620 let materialize_flow = flow.materialize;
621 let hints = self.hints.after_flow(flow.hints);
622 let combine = Arc::new(combine);
623 Source::from_materialized_factory_with_hints(
624 move |materializer| {
625 let (stream, source_mat) = Arc::clone(&source).create(materializer)?;
626 let flow_mat = materialize_flow()?;
627 let stream = match &transform {
628 FlowTransform::Pure(transform) => transform(stream),
629 FlowTransform::Runtime(transform) => transform(stream, materializer)?,
630 };
631 Ok((stream, combine(source_mat, flow_mat)))
632 },
633 hints,
634 )
635 }
636
637 #[must_use]
638 pub fn via_mat_with<Next, FlowMat, Combined, F>(
639 self,
640 flow: Flow<Out, Next, FlowMat>,
641 combine: F,
642 ) -> Source<Next, Combined>
643 where
644 Next: Send + 'static,
645 FlowMat: Send + 'static,
646 Combined: Send + 'static,
647 F: Fn(Mat, FlowMat) -> Combined + Send + Sync + 'static,
648 {
649 self.via_mat(flow, combine)
650 }
651
652 #[must_use]
653 pub fn map<Next, F>(self, f: F) -> Source<Next, Mat>
654 where
655 Next: Send + 'static,
656 F: Fn(Out) -> Next + Send + Sync + 'static,
657 {
658 Source {
659 factory: Arc::new(MapSourceFactory {
660 source: self.factory,
661 stage: f,
662 _marker: PhantomData,
663 }),
664 hints: self.hints,
665 attributes: self.attributes,
666 split_hook: None,
667 }
668 }
669
670 #[must_use]
671 pub fn attributes(&self) -> &Attributes {
672 &self.attributes
673 }
674
675 #[must_use]
676 pub fn with_attributes(mut self, attributes: Attributes) -> Self {
677 self.attributes = attributes;
678 self
679 }
680
681 #[must_use]
682 pub fn add_attributes(mut self, attributes: Attributes) -> Self {
683 self.attributes = self.attributes.and(attributes);
684 self
685 }
686
687 #[must_use]
688 pub fn named(self, name: impl Into<String>) -> Self {
689 self.add_attributes(Attributes::named(name))
690 }
691
692 #[must_use]
693 pub fn map_result<Next, F>(self, f: F) -> Source<Next, Mat>
694 where
695 Next: Send + 'static,
696 F: Fn(Out) -> StreamResult<Next> + Send + Sync + 'static,
697 {
698 self.via(Flow::identity().map_result(f))
699 }
700
701 #[must_use]
702 pub fn map_result_with_supervision<Next, F>(
703 self,
704 f: F,
705 decider: SupervisionDecider,
706 ) -> Source<Next, Mat>
707 where
708 Next: Send + 'static,
709 F: Fn(Out) -> StreamResult<Next> + Send + Sync + 'static,
710 {
711 self.via(Flow::identity().map_result_with_supervision(f, decider))
712 }
713
714 #[must_use]
715 pub fn filter<F>(self, predicate: F) -> Source<Out, Mat>
716 where
717 F: Fn(&Out) -> bool + Send + Sync + 'static,
718 {
719 self.via(Flow::identity().filter(predicate))
720 }
721
722 #[must_use]
723 pub fn filter_result<F>(self, predicate: F) -> Source<Out, Mat>
724 where
725 F: Fn(&Out) -> StreamResult<bool> + Send + Sync + 'static,
726 {
727 self.via(Flow::identity().filter_result(predicate))
728 }
729
730 #[must_use]
731 pub fn filter_result_with_supervision<F>(
732 self,
733 predicate: F,
734 decider: SupervisionDecider,
735 ) -> Source<Out, Mat>
736 where
737 F: Fn(&Out) -> StreamResult<bool> + Send + Sync + 'static,
738 {
739 self.via(Flow::identity().filter_result_with_supervision(predicate, decider))
740 }
741
742 #[must_use]
743 pub fn filter_not<F>(self, predicate: F) -> Source<Out, Mat>
744 where
745 F: Fn(&Out) -> bool + Send + Sync + 'static,
746 {
747 self.via(Flow::identity().filter_not(predicate))
748 }
749
750 #[must_use]
751 pub fn filter_map<Next, F>(self, f: F) -> Source<Next, Mat>
752 where
753 Next: Send + 'static,
754 F: Fn(Out) -> Option<Next> + Send + Sync + 'static,
755 {
756 self.via(Flow::identity().filter_map(f))
757 }
758
759 #[must_use]
760 pub fn filter_map_result<Next, F>(self, f: F) -> Source<Next, Mat>
761 where
762 Next: Send + 'static,
763 F: Fn(Out) -> StreamResult<Option<Next>> + Send + Sync + 'static,
764 {
765 self.via(Flow::identity().filter_map_result(f))
766 }
767
768 #[must_use]
769 pub fn filter_map_result_with_supervision<Next, F>(
770 self,
771 f: F,
772 decider: SupervisionDecider,
773 ) -> Source<Next, Mat>
774 where
775 Next: Send + 'static,
776 F: Fn(Out) -> StreamResult<Option<Next>> + Send + Sync + 'static,
777 {
778 self.via(Flow::identity().filter_map_result_with_supervision(f, decider))
779 }
780
781 #[must_use]
782 pub fn map_concat<Next, F, I>(self, f: F) -> Source<Next, Mat>
783 where
784 Next: Send + 'static,
785 F: Fn(Out) -> I + Send + Sync + 'static,
786 I: IntoIterator<Item = Next>,
787 I::IntoIter: Send + 'static,
788 {
789 self.via(Flow::identity().map_concat(f))
790 }
791
792 #[must_use]
793 pub fn map_concat_result<Next, F, I>(self, f: F) -> Source<Next, Mat>
794 where
795 Next: Send + 'static,
796 F: Fn(Out) -> StreamResult<I> + Send + Sync + 'static,
797 I: IntoIterator<Item = Next>,
798 I::IntoIter: Send + 'static,
799 {
800 self.via(Flow::identity().map_concat_result(f))
801 }
802
803 #[must_use]
804 pub fn map_concat_result_with_supervision<Next, F, I>(
805 self,
806 f: F,
807 decider: SupervisionDecider,
808 ) -> Source<Next, Mat>
809 where
810 Next: Send + 'static,
811 F: Fn(Out) -> StreamResult<I> + Send + Sync + 'static,
812 I: IntoIterator<Item = Next>,
813 I::IntoIter: Send + 'static,
814 {
815 self.via(Flow::identity().map_concat_result_with_supervision(f, decider))
816 }
817
818 #[must_use]
819 pub fn stateful_map<State, Next, F>(self, seed: State, f: F) -> Source<Next, Mat>
820 where
821 State: Clone + Send + Sync + 'static,
822 Next: Send + 'static,
823 F: Fn(&mut State, Out) -> Next + Send + Sync + 'static,
824 {
825 self.via(Flow::identity().stateful_map(seed, f))
826 }
827
828 #[must_use]
829 pub fn stateful_map_result<State, Next, F>(self, seed: State, f: F) -> Source<Next, Mat>
830 where
831 State: Clone + Send + Sync + 'static,
832 Next: Send + 'static,
833 F: Fn(&mut State, Out) -> StreamResult<Next> + Send + Sync + 'static,
834 {
835 self.via(Flow::identity().stateful_map_result(seed, f))
836 }
837
838 #[must_use]
839 pub fn stateful_map_result_with_supervision<State, Next, F>(
840 self,
841 seed: State,
842 f: F,
843 decider: SupervisionDecider,
844 ) -> Source<Next, Mat>
845 where
846 State: Clone + Send + Sync + 'static,
847 Next: Send + 'static,
848 F: Fn(&mut State, Out) -> StreamResult<Next> + Send + Sync + 'static,
849 {
850 self.via(Flow::identity().stateful_map_result_with_supervision(seed, f, decider))
851 }
852
853 #[must_use]
854 pub fn stateful_map_concat<State, Next, F, I>(self, seed: State, f: F) -> Source<Next, Mat>
855 where
856 State: Clone + Send + Sync + 'static,
857 Next: Send + 'static,
858 F: Fn(&mut State, Out) -> I + Send + Sync + 'static,
859 I: IntoIterator<Item = Next>,
860 I::IntoIter: Send + 'static,
861 {
862 self.via(Flow::identity().stateful_map_concat(seed, f))
863 }
864
865 #[must_use]
866 pub fn stateful_map_concat_result<State, Next, F, I>(
867 self,
868 seed: State,
869 f: F,
870 ) -> Source<Next, Mat>
871 where
872 State: Clone + Send + Sync + 'static,
873 Next: Send + 'static,
874 F: Fn(&mut State, Out) -> StreamResult<I> + Send + Sync + 'static,
875 I: IntoIterator<Item = Next>,
876 I::IntoIter: Send + 'static,
877 {
878 self.via(Flow::identity().stateful_map_concat_result(seed, f))
879 }
880
881 #[must_use]
882 pub fn stateful_map_concat_result_with_supervision<State, Next, F, I>(
883 self,
884 seed: State,
885 f: F,
886 decider: SupervisionDecider,
887 ) -> Source<Next, Mat>
888 where
889 State: Clone + Send + Sync + 'static,
890 Next: Send + 'static,
891 F: Fn(&mut State, Out) -> StreamResult<I> + Send + Sync + 'static,
892 I: IntoIterator<Item = Next>,
893 I::IntoIter: Send + 'static,
894 {
895 self.via(Flow::identity().stateful_map_concat_result_with_supervision(seed, f, decider))
896 }
897
898 #[must_use]
899 pub fn map_async<Next, F, Fut>(self, parallelism: usize, f: F) -> Source<Next, Mat>
900 where
901 Next: Send + 'static,
902 F: Fn(Out) -> Fut + Send + Sync + 'static,
903 Fut: Future<Output = StreamResult<Next>> + Send + 'static,
904 {
905 self.via(Flow::identity().map_async(parallelism, f))
906 }
907
908 #[must_use]
909 pub fn map_async_with_supervision<Next, F, Fut>(
910 self,
911 parallelism: usize,
912 f: F,
913 decider: SupervisionDecider,
914 ) -> Source<Next, Mat>
915 where
916 Next: Send + 'static,
917 F: Fn(Out) -> Fut + Send + Sync + 'static,
918 Fut: Future<Output = StreamResult<Next>> + Send + 'static,
919 {
920 self.via(Flow::identity().map_async_with_supervision(parallelism, f, decider))
921 }
922
923 #[must_use]
924 pub fn map_async_unordered<Next, F, Fut>(self, parallelism: usize, f: F) -> Source<Next, Mat>
925 where
926 Next: Send + 'static,
927 F: Fn(Out) -> Fut + Send + Sync + 'static,
928 Fut: Future<Output = StreamResult<Next>> + Send + 'static,
929 {
930 self.via(Flow::identity().map_async_unordered(parallelism, f))
931 }
932
933 #[must_use]
934 pub fn map_async_unordered_with_supervision<Next, F, Fut>(
935 self,
936 parallelism: usize,
937 f: F,
938 decider: SupervisionDecider,
939 ) -> Source<Next, Mat>
940 where
941 Next: Send + 'static,
942 F: Fn(Out) -> Fut + Send + Sync + 'static,
943 Fut: Future<Output = StreamResult<Next>> + Send + 'static,
944 {
945 self.via(Flow::identity().map_async_unordered_with_supervision(parallelism, f, decider))
946 }
947
948 #[must_use]
949 pub fn map_async_partitioned<Key, Next, Partition, F, Fut>(
950 self,
951 parallelism: usize,
952 per_partition: usize,
953 partition: Partition,
954 f: F,
955 ) -> Source<Next, Mat>
956 where
957 Key: Clone + Eq + Hash + Send + 'static,
958 Next: Send + 'static,
959 Partition: Fn(&Out) -> Key + Send + Sync + 'static,
960 F: Fn(Out) -> Fut + Send + Sync + 'static,
961 Fut: Future<Output = StreamResult<Next>> + Send + 'static,
962 {
963 self.via(Flow::identity().map_async_partitioned(parallelism, per_partition, partition, f))
964 }
965
966 #[must_use]
967 pub fn prefix_and_tail(self, n: usize) -> Source<(Vec<Out>, Source<Out>), Mat> {
968 self.via(Flow::identity().prefix_and_tail(n))
969 }
970
971 #[must_use]
972 pub fn flat_map_prefix<Next, FlowMat, F>(self, n: usize, f: F) -> Source<Next, Mat>
973 where
974 Next: Send + 'static,
975 FlowMat: Send + 'static,
976 F: Fn(Vec<Out>) -> Flow<Out, Next, FlowMat> + Send + Sync + 'static,
977 Out: Clone,
978 {
979 self.via(Flow::identity().flat_map_prefix(n, f))
980 }
981
982 #[must_use]
983 pub fn group_by<Key, F>(
984 self,
985 max_substreams: usize,
986 f: F,
987 allow_closed_substream_recreation: bool,
988 ) -> Source<Source<Out>, Mat>
989 where
990 Key: Clone + Eq + Hash + Send + 'static,
991 F: Fn(&Out) -> Key + Send + Sync + 'static,
992 Out: Clone,
993 {
994 self.via(Flow::identity().group_by(max_substreams, f, allow_closed_substream_recreation))
995 }
996
997 #[must_use]
998 pub fn split_when<F>(self, predicate: F) -> Source<Source<Out>, Mat>
999 where
1000 F: Fn(&Out) -> bool + Send + Sync + 'static,
1001 Out: Clone,
1002 {
1003 self.via(Flow::identity().split_when(predicate))
1004 }
1005
1006 #[must_use]
1007 pub fn split_after<F>(self, predicate: F) -> Source<Source<Out>, Mat>
1008 where
1009 F: Fn(&Out) -> bool + Send + Sync + 'static,
1010 Out: Clone,
1011 {
1012 self.via(Flow::identity().split_after(predicate))
1013 }
1014
1015 #[must_use]
1016 pub fn flat_map_concat<Next, NextMat, F>(self, f: F) -> Source<Next, Mat>
1017 where
1018 Next: Send + 'static,
1019 NextMat: Send + 'static,
1020 F: Fn(Out) -> Source<Next, NextMat> + Send + Sync + 'static,
1021 {
1022 self.via(Flow::identity().flat_map_concat(f))
1023 }
1024
1025 #[must_use]
1026 pub fn flat_map_merge<Next, NextMat, F>(self, breadth: usize, f: F) -> Source<Next, Mat>
1027 where
1028 Next: Send + 'static,
1029 NextMat: Send + 'static,
1030 F: Fn(Out) -> Source<Next, NextMat> + Send + Sync + 'static,
1031 {
1032 self.via(Flow::identity().flat_map_merge(breadth, f))
1033 }
1034
1035 #[must_use]
1036 pub fn take(self, n: usize) -> Source<Out, Mat> {
1037 self.via(Flow::identity().take(n))
1038 }
1039
1040 #[must_use]
1041 pub fn drop(self, n: usize) -> Source<Out, Mat> {
1042 self.via(Flow::identity().drop(n))
1043 }
1044
1045 #[must_use]
1046 pub fn take_while<F>(self, predicate: F) -> Source<Out, Mat>
1047 where
1048 F: Fn(&Out) -> bool + Send + Sync + 'static,
1049 {
1050 self.via(Flow::identity().take_while(predicate))
1051 }
1052
1053 #[must_use]
1054 pub fn drop_while<F>(self, predicate: F) -> Source<Out, Mat>
1055 where
1056 F: Fn(&Out) -> bool + Send + Sync + 'static,
1057 {
1058 self.via(Flow::identity().drop_while(predicate))
1059 }
1060
1061 #[must_use]
1062 pub fn limit(self, max: u64) -> Source<Out, Mat> {
1063 self.via(Flow::identity().limit(max))
1064 }
1065
1066 #[must_use]
1067 pub fn grouped(self, size: usize) -> Source<Vec<Out>, Mat> {
1068 self.via(Flow::identity().grouped(size))
1069 }
1070
1071 #[must_use]
1072 pub fn scan<State, F>(self, seed: State, f: F) -> Source<State, Mat>
1073 where
1074 State: Clone + Send + Sync + 'static,
1075 F: Fn(State, Out) -> State + Send + Sync + 'static,
1076 {
1077 self.via(Flow::identity().scan(seed, f))
1078 }
1079
1080 #[must_use]
1081 pub fn scan_async<State, F, Fut>(self, seed: State, f: F) -> Source<State, Mat>
1082 where
1083 State: Clone + Send + Sync + 'static,
1084 F: Fn(State, Out) -> Fut + Send + Sync + 'static,
1085 Fut: Future<Output = StreamResult<State>> + Send + 'static,
1086 {
1087 self.via(Flow::identity().scan_async(seed, f))
1088 }
1089
1090 #[must_use]
1091 pub fn scan_result<State, F>(self, seed: State, f: F) -> Source<State, Mat>
1092 where
1093 State: Clone + Send + Sync + 'static,
1094 F: Fn(State, Out) -> StreamResult<State> + Send + Sync + 'static,
1095 {
1096 self.via(Flow::identity().scan_result(seed, f))
1097 }
1098
1099 #[must_use]
1100 pub fn scan_result_with_supervision<State, F>(
1101 self,
1102 seed: State,
1103 f: F,
1104 decider: SupervisionDecider,
1105 ) -> Source<State, Mat>
1106 where
1107 State: Clone + Send + Sync + 'static,
1108 F: Fn(State, Out) -> StreamResult<State> + Send + Sync + 'static,
1109 {
1110 self.via(Flow::identity().scan_result_with_supervision(seed, f, decider))
1111 }
1112
1113 #[must_use]
1114 pub fn sliding(self, size: usize, step: usize) -> Source<Vec<Out>, Mat>
1115 where
1116 Out: Clone,
1117 {
1118 self.via(Flow::identity().sliding(size, step))
1119 }
1120
1121 #[must_use]
1122 pub fn fold<Acc, F>(self, zero: Acc, f: F) -> Source<Acc, Mat>
1123 where
1124 Acc: Clone + Send + Sync + 'static,
1125 F: Fn(Acc, Out) -> Acc + Send + Sync + 'static,
1126 {
1127 self.via(Flow::identity().fold(zero, f))
1128 }
1129
1130 #[must_use]
1131 pub fn fold_async<Acc, F, Fut>(self, zero: Acc, f: F) -> Source<Acc, Mat>
1132 where
1133 Acc: Clone + Send + Sync + 'static,
1134 F: Fn(Acc, Out) -> Fut + Send + Sync + 'static,
1135 Fut: Future<Output = StreamResult<Acc>> + Send + 'static,
1136 {
1137 self.via(Flow::identity().fold_async(zero, f))
1138 }
1139
1140 #[must_use]
1141 pub fn map_with_resource<Resource, Next, Create, F, Close>(
1142 self,
1143 create: Create,
1144 f: F,
1145 close: Close,
1146 ) -> Source<Next, Mat>
1147 where
1148 Resource: Send + 'static,
1149 Next: Send + 'static,
1150 Create: Fn() -> StreamResult<Resource> + Send + Sync + 'static,
1151 F: Fn(&mut Resource, Out) -> StreamResult<Next> + Send + Sync + 'static,
1152 Close: Fn(Resource) -> StreamResult<Option<Next>> + Send + Sync + 'static,
1153 {
1154 self.via(Flow::identity().map_with_resource(create, f, close))
1155 }
1156
1157 #[must_use]
1158 pub fn fold_result<Acc, F>(self, zero: Acc, f: F) -> Source<Acc, Mat>
1159 where
1160 Acc: Clone + Send + Sync + 'static,
1161 F: Fn(Acc, Out) -> StreamResult<Acc> + Send + Sync + 'static,
1162 {
1163 self.via(Flow::identity().fold_result(zero, f))
1164 }
1165
1166 #[must_use]
1167 pub fn fold_result_with_supervision<Acc, F>(
1168 self,
1169 zero: Acc,
1170 f: F,
1171 decider: SupervisionDecider,
1172 ) -> Source<Acc, Mat>
1173 where
1174 Acc: Clone + Send + Sync + 'static,
1175 F: Fn(Acc, Out) -> StreamResult<Acc> + Send + Sync + 'static,
1176 {
1177 self.via(Flow::identity().fold_result_with_supervision(zero, f, decider))
1178 }
1179
1180 #[must_use]
1181 pub fn reduce<F>(self, f: F) -> Source<Out, Mat>
1182 where
1183 F: Fn(Out, Out) -> Out + Send + Sync + 'static,
1184 {
1185 self.via(Flow::identity().reduce(f))
1186 }
1187
1188 #[must_use]
1189 pub fn reduce_result<F>(self, f: F) -> Source<Out, Mat>
1190 where
1191 Out: Clone,
1192 F: Fn(Out, Out) -> StreamResult<Out> + Send + Sync + 'static,
1193 {
1194 self.via(Flow::identity().reduce_result(f))
1195 }
1196
1197 #[must_use]
1198 pub fn reduce_result_with_supervision<F>(
1199 self,
1200 f: F,
1201 decider: SupervisionDecider,
1202 ) -> Source<Out, Mat>
1203 where
1204 Out: Clone,
1205 F: Fn(Out, Out) -> StreamResult<Out> + Send + Sync + 'static,
1206 {
1207 self.via(Flow::identity().reduce_result_with_supervision(f, decider))
1208 }
1209
1210 #[must_use]
1211 pub fn map_error<F>(self, f: F) -> Source<Out, Mat>
1212 where
1213 F: Fn(StreamError) -> StreamError + Send + Sync + 'static,
1214 {
1215 self.via(Flow::identity().map_error(f))
1216 }
1217
1218 #[must_use]
1219 pub fn recover<F>(self, f: F) -> Source<Out, Mat>
1220 where
1221 F: Fn(StreamError) -> Option<Out> + Send + Sync + 'static,
1222 {
1223 self.via(Flow::identity().recover(f))
1224 }
1225
1226 #[must_use]
1227 pub fn recover_with<F>(self, f: F) -> Source<Out, Mat>
1228 where
1229 F: Fn(StreamError) -> Option<Source<Out>> + Send + Sync + 'static,
1230 {
1231 self.via(Flow::identity().recover_with(f))
1232 }
1233
1234 #[must_use]
1235 pub fn recover_with_retries<F>(self, retries: usize, f: F) -> Source<Out, Mat>
1236 where
1237 F: Fn(StreamError) -> Option<Source<Out>> + Send + Sync + 'static,
1238 {
1239 self.via(Flow::identity().recover_with_retries(retries, f))
1240 }
1241
1242 #[must_use]
1243 pub fn on_error_complete(self) -> Source<Out, Mat> {
1244 self.via(Flow::identity().on_error_complete())
1245 }
1246
1247 #[must_use]
1248 pub fn concat<Mat2>(self, that: Source<Out, Mat2>) -> Source<Out, Mat>
1249 where
1250 Mat2: Send + 'static,
1251 {
1252 let factory = self.factory;
1253 let hints = self.hints;
1254 let that_factory = that.factory;
1255 Source::from_materialized_factory_with_hints(
1256 move |materializer| {
1257 let (primary, mat) = Arc::clone(&factory).create(materializer)?;
1258 let secondary = match Arc::clone(&that_factory).create(materializer) {
1259 Ok((stream, _)) => stream,
1260 Err(error) => Box::new(std::iter::once(Err(error))) as BoxStream<Out>,
1261 };
1262 Ok((concat_source_streams(vec![primary, secondary]), mat))
1263 },
1264 hints,
1265 )
1266 }
1267
1268 #[must_use]
1269 pub fn concat_lazy<Mat2>(self, that: Source<Out, Mat2>) -> Source<Out, Mat>
1270 where
1271 Mat2: Send + 'static,
1272 {
1273 let factory = self.factory;
1274 let hints = self.hints;
1275 let that_factory = that.factory;
1276 Source::from_materialized_factory_with_hints(
1277 move |materializer| {
1278 let (primary, mat) = Arc::clone(&factory).create(materializer)?;
1279 Ok((
1280 concat_source_streams_lazy(
1281 primary,
1282 vec![Arc::clone(&that_factory)],
1283 materializer,
1284 ),
1285 mat,
1286 ))
1287 },
1288 hints,
1289 )
1290 }
1291
1292 #[must_use]
1293 pub fn concat_all_lazy<Mat2, I>(self, those: I) -> Source<Out, Mat>
1294 where
1295 Mat2: Send + 'static,
1296 I: IntoIterator<Item = Source<Out, Mat2>>,
1297 {
1298 let factory = self.factory;
1299 let hints = self.hints;
1300 let other_factories: Vec<_> = those.into_iter().map(|source| source.factory).collect();
1301 Source::from_materialized_factory_with_hints(
1302 move |materializer| {
1303 let (primary, mat) = Arc::clone(&factory).create(materializer)?;
1304 Ok((
1305 concat_source_streams_lazy(primary, other_factories.clone(), materializer),
1306 mat,
1307 ))
1308 },
1309 hints,
1310 )
1311 }
1312
1313 #[must_use]
1314 pub fn prepend<Mat2>(self, that: Source<Out, Mat2>) -> Source<Out, Mat>
1315 where
1316 Mat2: Send + 'static,
1317 {
1318 let factory = self.factory;
1319 let hints = self.hints;
1320 let that_factory = that.factory;
1321 Source::from_materialized_factory_with_hints(
1322 move |materializer| {
1323 let (primary, mat) = Arc::clone(&factory).create(materializer)?;
1324 let secondary = match Arc::clone(&that_factory).create(materializer) {
1325 Ok((stream, _)) => stream,
1326 Err(error) => Box::new(std::iter::once(Err(error))) as BoxStream<Out>,
1327 };
1328 Ok((concat_source_streams(vec![secondary, primary]), mat))
1329 },
1330 hints,
1331 )
1332 }
1333
1334 #[must_use]
1335 pub fn prepend_lazy<Mat2>(self, that: Source<Out, Mat2>) -> Source<Out, Mat>
1336 where
1337 Mat2: Send + 'static,
1338 {
1339 self.prepend(that)
1340 }
1341
1342 #[must_use]
1343 pub fn or_else<Mat2>(self, secondary: Source<Out, Mat2>) -> Source<Out, Mat>
1344 where
1345 Mat2: Send + 'static,
1346 {
1347 let factory = self.factory;
1348 let hints = self.hints;
1349 let secondary_factory = secondary.factory;
1350 Source::from_materialized_factory_with_hints(
1351 move |materializer| {
1352 let (primary, mat) = Arc::clone(&factory).create(materializer)?;
1353 let secondary = match Arc::clone(&secondary_factory).create(materializer) {
1354 Ok((stream, _)) => stream,
1355 Err(error) => Box::new(std::iter::once(Err(error))) as BoxStream<Out>,
1356 };
1357 Ok((or_else_source_stream(primary, secondary), mat))
1358 },
1359 hints,
1360 )
1361 }
1362
1363 #[must_use]
1364 pub fn interleave<Mat2>(self, that: Source<Out, Mat2>, segment_size: usize) -> Source<Out, Mat>
1365 where
1366 Mat2: Send + 'static,
1367 {
1368 self.interleave_all([that], segment_size, false)
1369 }
1370
1371 #[must_use]
1372 pub fn interleave_all<Mat2, I>(
1373 self,
1374 those: I,
1375 segment_size: usize,
1376 eager_close: bool,
1377 ) -> Source<Out, Mat>
1378 where
1379 Mat2: Send + 'static,
1380 I: IntoIterator<Item = Source<Out, Mat2>>,
1381 {
1382 let factory = self.factory;
1383 let hints = self.hints;
1384 let other_factories: Vec<_> = those.into_iter().map(|source| source.factory).collect();
1385 Source::from_materialized_factory_with_hints(
1386 move |materializer| {
1387 let (primary, mat) = Arc::clone(&factory).create(materializer)?;
1388 let mut streams = Vec::with_capacity(other_factories.len() + 1);
1389 streams.push(primary);
1390 for other in &other_factories {
1391 let stream = match Arc::clone(other).create(materializer) {
1392 Ok((stream, _)) => stream,
1393 Err(error) => Box::new(std::iter::once(Err(error))) as BoxStream<Out>,
1394 };
1395 streams.push(stream);
1396 }
1397 Ok((
1398 interleave_source_streams(streams, segment_size, eager_close),
1399 mat,
1400 ))
1401 },
1402 hints,
1403 )
1404 }
1405
1406 #[must_use]
1407 pub fn merge_sorted<Mat2>(self, that: Source<Out, Mat2>) -> Source<Out, Mat>
1408 where
1409 Out: Ord,
1410 Mat2: Send + 'static,
1411 {
1412 self.via(Flow::identity().merge_sorted(that))
1413 }
1414
1415 #[must_use]
1416 pub fn merge_latest<Mat2>(
1417 self,
1418 that: Source<Out, Mat2>,
1419 eager_complete: bool,
1420 ) -> Source<Vec<Out>, Mat>
1421 where
1422 Out: Clone,
1423 Mat2: Send + 'static,
1424 {
1425 let factory = self.factory;
1426 let hints = self.hints;
1427 let that_factory = that.factory;
1428 Source::from_materialized_factory_with_hints(
1429 move |materializer| {
1430 let (left, mat) = Arc::clone(&factory).create(materializer)?;
1431 let right = match Arc::clone(&that_factory).create(materializer) {
1432 Ok((stream, _)) => stream,
1433 Err(error) => {
1434 return Ok((
1435 Box::new(std::iter::once(Err(error))) as BoxStream<Vec<Out>>,
1436 mat,
1437 ));
1438 }
1439 };
1440 Ok((merge_latest_streams(vec![left, right], eager_complete), mat))
1441 },
1442 hints,
1443 )
1444 }
1445
1446 #[must_use]
1447 pub fn merge_all<Mat2, I>(self, those: I, eager_complete: bool) -> Source<Out, Mat>
1448 where
1449 Mat2: Send + 'static,
1450 I: IntoIterator<Item = Source<Out, Mat2>>,
1451 {
1452 let factory = self.factory;
1453 let hints = self.hints;
1454 let other_factories: Vec<_> = those.into_iter().map(|source| source.factory).collect();
1455 Source::from_materialized_factory_with_hints(
1456 move |materializer| {
1457 let (primary, mat) = Arc::clone(&factory).create(materializer)?;
1458 let mut streams = Vec::with_capacity(other_factories.len() + 1);
1459 streams.push(primary);
1460 for other in &other_factories {
1461 let stream = match Arc::clone(other).create(materializer) {
1462 Ok((stream, _)) => stream,
1463 Err(error) => {
1464 return Ok((
1465 Box::new(std::iter::once(Err(error))) as BoxStream<Out>,
1466 mat,
1467 ));
1468 }
1469 };
1470 streams.push(stream);
1471 }
1472 Ok((merge_streams(streams, eager_complete), mat))
1473 },
1474 hints,
1475 )
1476 }
1477
1478 #[must_use]
1479 pub fn zip_with<Mat2, Out2, Next, F>(
1480 self,
1481 that: Source<Out2, Mat2>,
1482 combine: F,
1483 ) -> Source<Next, Mat>
1484 where
1485 Out2: Send + 'static,
1486 Next: Send + 'static,
1487 Mat2: Send + 'static,
1488 F: Fn(Out, Out2) -> Next + Send + Sync + 'static,
1489 {
1490 self.via(Flow::identity().zip_with(that, combine))
1491 }
1492
1493 #[must_use]
1494 pub fn zip_latest<Mat2, Out2>(self, that: Source<Out2, Mat2>) -> Source<(Out, Out2), Mat>
1495 where
1496 Out: Clone,
1497 Out2: Clone + Send + 'static,
1498 Mat2: Send + 'static,
1499 {
1500 self.zip_latest_with(that, true, |left, right| (left, right))
1501 }
1502
1503 #[must_use]
1504 pub fn zip_latest_with<Mat2, Out2, Next, F>(
1505 self,
1506 that: Source<Out2, Mat2>,
1507 eager_complete: bool,
1508 combine: F,
1509 ) -> Source<Next, Mat>
1510 where
1511 Out: Clone,
1512 Out2: Clone + Send + 'static,
1513 Next: Send + 'static,
1514 Mat2: Send + 'static,
1515 F: Fn(Out, Out2) -> Next + Send + Sync + 'static,
1516 {
1517 let factory = self.factory;
1518 let hints = self.hints;
1519 let that_factory = that.factory;
1520 let combine = Arc::new(combine);
1521 Source::from_materialized_factory_with_hints(
1522 move |materializer| {
1523 let (left, mat) = Arc::clone(&factory).create(materializer)?;
1524 let right = match Arc::clone(&that_factory).create(materializer) {
1525 Ok((stream, _)) => stream,
1526 Err(error) => {
1527 return Ok((
1528 Box::new(std::iter::once(Err(error))) as BoxStream<Next>,
1529 mat,
1530 ));
1531 }
1532 };
1533 Ok((
1534 zip_latest_with_stream(left, right, eager_complete, Arc::clone(&combine)),
1535 mat,
1536 ))
1537 },
1538 hints,
1539 )
1540 }
1541
1542 #[must_use]
1543 pub fn zip_with_index(self) -> Source<(Out, u64), Mat> {
1544 let factory = self.factory;
1545 let hints = self.hints;
1546 Source::from_materialized_factory_with_hints(
1547 move |materializer| {
1548 let (mut stream, mat) = Arc::clone(&factory).create(materializer)?;
1549 let mut index = 0_u64;
1550 Ok((
1551 Box::new(std::iter::from_fn(move || {
1552 stream.next().map(|item| {
1553 item.map(|value| {
1554 let pair = (value, index);
1555 index = index.wrapping_add(1);
1556 pair
1557 })
1558 })
1559 })) as BoxStream<(Out, u64)>,
1560 mat,
1561 ))
1562 },
1563 hints,
1564 )
1565 }
1566
1567 #[must_use]
1568 pub fn zip_all<Mat2, Out2>(
1569 self,
1570 that: Source<Out2, Mat2>,
1571 this_elem: Out,
1572 that_elem: Out2,
1573 ) -> Source<(Out, Out2), Mat>
1574 where
1575 Out: Clone + Sync,
1576 Out2: Clone + Send + Sync + 'static,
1577 Mat2: Send + 'static,
1578 {
1579 let factory = self.factory;
1580 let hints = self.hints;
1581 let that_factory = that.factory;
1582 Source::from_materialized_factory_with_hints(
1583 move |materializer| {
1584 let (left, mat) = Arc::clone(&factory).create(materializer)?;
1585 let right = match Arc::clone(&that_factory).create(materializer) {
1586 Ok((stream, _)) => stream,
1587 Err(error) => {
1588 return Ok((
1589 Box::new(std::iter::once(Err(error))) as BoxStream<(Out, Out2)>,
1590 mat,
1591 ));
1592 }
1593 };
1594 Ok((
1595 zip_all_stream(left, right, this_elem.clone(), that_elem.clone()),
1596 mat,
1597 ))
1598 },
1599 hints,
1600 )
1601 }
1602
1603 #[must_use]
1604 pub fn also_to<SinkMat>(self, sink: Sink<Out, SinkMat>) -> Source<Out, Mat>
1605 where
1606 Out: Clone,
1607 SinkMat: Send + 'static,
1608 {
1609 self.via(Flow::identity().also_to(sink))
1610 }
1611
1612 #[must_use]
1613 pub fn also_to_all<SinkMat, I>(self, sinks: I) -> Source<Out, Mat>
1614 where
1615 Out: Clone,
1616 SinkMat: Send + 'static,
1617 I: IntoIterator<Item = Sink<Out, SinkMat>>,
1618 {
1619 self.via(Flow::identity().also_to_all(sinks))
1620 }
1621
1622 #[must_use]
1623 pub fn divert_to<SinkMat, F>(self, sink: Sink<Out, SinkMat>, predicate: F) -> Source<Out, Mat>
1624 where
1625 SinkMat: Send + 'static,
1626 F: Fn(&Out) -> bool + Send + Sync + 'static,
1627 {
1628 self.via(Flow::identity().divert_to(sink, predicate))
1629 }
1630
1631 #[must_use]
1632 pub fn wire_tap<SinkMat>(self, sink: Sink<Out, SinkMat>) -> Source<Out, Mat>
1633 where
1634 Out: Clone,
1635 SinkMat: Send + 'static,
1636 {
1637 self.via(Flow::identity().wire_tap(sink))
1638 }
1639
1640 pub fn run_with<SinkMat: Send + 'static>(
1641 self,
1642 sink: Sink<Out, SinkMat>,
1643 ) -> StreamResult<SinkMat> {
1644 let fast_result = self
1647 .split_hook
1648 .as_ref()
1649 .zip(sink.fold_fp.as_deref())
1650 .and_then(|(hook, fp)| fp.try_register(Arc::clone(hook)));
1651 if let Some(result) = fast_result {
1652 return result?.downcast::<SinkMat>().map(|b| *b).map_err(|_| {
1653 StreamError::Failed("split fast path: unexpected mat type (internal error)".into())
1654 });
1655 }
1656 self.to_mat(sink, Keep::right).run()
1657 }
1658
1659 pub fn run_with_materializer<SinkMat: Send + 'static>(
1660 self,
1661 sink: Sink<Out, SinkMat>,
1662 materializer: &Materializer,
1663 ) -> StreamResult<SinkMat> {
1664 self.to_mat(sink, Keep::right)
1665 .run_with_materializer(materializer)
1666 }
1667
1668 #[must_use]
1669 pub fn to<SinkMat>(self, sink: Sink<Out, SinkMat>) -> RunnableGraph<Mat>
1670 where
1671 SinkMat: Send + 'static,
1672 {
1673 self.to_mat(sink, Keep::left)
1674 }
1675
1676 #[must_use]
1677 pub fn to_mat<SinkMat, Combined, F>(
1678 self,
1679 sink: Sink<Out, SinkMat>,
1680 combine: F,
1681 ) -> RunnableGraph<Combined>
1682 where
1683 SinkMat: Send + 'static,
1684 Combined: Send + 'static,
1685 F: Fn(Mat, SinkMat) -> Combined + Send + Sync + 'static,
1686 {
1687 let factory = self.factory;
1688 let hints = self.hints;
1689 let combine = Arc::new(combine);
1690 RunnableGraph::from_runner(move |materializer| {
1691 let (stream, source_mat) = Arc::clone(&factory).create(materializer)?;
1692 let stream =
1693 runtime_checked_stream(stream, Arc::clone(&materializer.inner.state), None);
1694 let sink_mat = if hints.inline_head_terminal && sink.can_inline() {
1695 sink.run_inline(stream, materializer)?
1696 } else {
1697 sink.run(stream, materializer)?
1698 };
1699 Ok(combine(source_mat, sink_mat))
1700 })
1701 }
1702
1703 pub fn run_collect(self) -> StreamResult<Vec<Out>> {
1704 self.run_with(Sink::collect())?.wait()
1705 }
1706
1707 #[must_use]
1708 pub fn map_materialized_value<NextMat, F>(self, f: F) -> Source<Out, NextMat>
1709 where
1710 NextMat: Send + 'static,
1711 F: Fn(Mat) -> NextMat + Send + Sync + 'static,
1712 {
1713 let factory = self.factory;
1714 let hints = self.hints;
1715 let f = Arc::new(f);
1716 Source::from_materialized_factory_with_hints(
1717 move |materializer| {
1718 let (stream, mat) = Arc::clone(&factory).create(materializer)?;
1719 Ok((stream, f(mat)))
1720 },
1721 hints,
1722 )
1723 }
1724}
1725
1726impl<Out: Clone + Send + Sync + 'static> Source<Out, NotUsed> {
1727 #[must_use]
1728 pub fn combine<Mat1, Mat2, MatRest, I>(
1729 first: Source<Out, Mat1>,
1730 second: Source<Out, Mat2>,
1731 rest: I,
1732 strategy: SourceCombineStrategy,
1733 ) -> Source<Out, NotUsed>
1734 where
1735 Mat1: Send + 'static,
1736 Mat2: Send + 'static,
1737 MatRest: Send + 'static,
1738 I: IntoIterator<Item = Source<Out, MatRest>>,
1739 {
1740 let mut factories: Vec<Arc<CombinedSourceFactory<Out>>> = vec![
1741 Arc::new(move |materializer| {
1742 Arc::clone(&first.factory)
1743 .create(materializer)
1744 .map(|(stream, _)| stream)
1745 }),
1746 Arc::new(move |materializer| {
1747 Arc::clone(&second.factory)
1748 .create(materializer)
1749 .map(|(stream, _)| stream)
1750 }),
1751 ];
1752 factories.extend(rest.into_iter().map(|source| {
1753 Arc::new(move |materializer: &Materializer| {
1754 Arc::clone(&source.factory)
1755 .create(materializer)
1756 .map(|(stream, _)| stream)
1757 }) as Arc<CombinedSourceFactory<Out>>
1758 }));
1759 Source::from_materialized_factory(move |materializer| {
1760 let mut streams = Vec::with_capacity(factories.len());
1761 for factory in &factories {
1762 let stream = match factory(materializer) {
1763 Ok(stream) => stream,
1764 Err(error) => {
1765 return Ok((
1766 Box::new(std::iter::once(Err(error))) as BoxStream<Out>,
1767 NotUsed,
1768 ));
1769 }
1770 };
1771 streams.push(stream);
1772 }
1773 let stream = match &strategy {
1774 SourceCombineStrategy::Merge { eager_complete } => {
1775 merge_streams(streams, *eager_complete)
1776 }
1777 SourceCombineStrategy::Concat => concat_source_streams(streams),
1778 SourceCombineStrategy::Prioritized {
1779 priorities,
1780 eager_complete,
1781 } => {
1782 if priorities.len() != streams.len() {
1783 return Err(StreamError::GraphValidation(format!(
1784 "combine priorities length {} did not match source count {}",
1785 priorities.len(),
1786 streams.len()
1787 )));
1788 }
1789 merge_prioritized_streams(streams, priorities.clone(), *eager_complete)
1790 }
1791 };
1792 Ok((stream, NotUsed))
1793 })
1794 }
1795
1796 #[must_use]
1797 pub fn zip_n<Mat2, I>(sources: I) -> Source<Vec<Out>, NotUsed>
1798 where
1799 I: IntoIterator<Item = Source<Out, Mat2>>,
1800 Mat2: Send + 'static,
1801 Out: Clone,
1802 {
1803 Self::zip_with_n(sources, |values| values)
1804 }
1805
1806 #[must_use]
1807 pub fn zip_with_n<Mat2, I, Next, F>(sources: I, zipper: F) -> Source<Next, NotUsed>
1808 where
1809 I: IntoIterator<Item = Source<Out, Mat2>>,
1810 Mat2: Send + 'static,
1811 Next: Send + 'static,
1812 F: Fn(Vec<Out>) -> Next + Send + Sync + 'static,
1813 {
1814 let factories: Vec<_> = sources.into_iter().map(|source| source.factory).collect();
1815 let zipper = Arc::new(zipper);
1816 Source::from_materialized_factory(move |materializer| {
1817 let mut streams = Vec::with_capacity(factories.len());
1818 for factory in &factories {
1819 let stream = match Arc::clone(factory).create(materializer) {
1820 Ok((stream, _)) => stream,
1821 Err(error) => {
1822 return Ok((
1823 Box::new(std::iter::once(Err(error))) as BoxStream<Next>,
1824 NotUsed,
1825 ));
1826 }
1827 };
1828 streams.push(stream);
1829 }
1830 Ok((zip_n_streams(streams, Arc::clone(&zipper)), NotUsed))
1831 })
1832 }
1833
1834 #[must_use]
1835 pub fn merge_prioritized_n<Mat2, I>(
1836 sources_and_priorities: I,
1837 eager_complete: bool,
1838 ) -> Source<Out, NotUsed>
1839 where
1840 I: IntoIterator<Item = (Source<Out, Mat2>, usize)>,
1841 Mat2: Send + 'static,
1842 {
1843 let sources_and_priorities: Vec<_> = sources_and_priorities.into_iter().collect();
1844 if sources_and_priorities.is_empty() {
1845 return Source::empty();
1846 }
1847 let (factories, priorities): (Vec<_>, Vec<_>) = sources_and_priorities
1848 .into_iter()
1849 .map(|(source, priority)| (source.factory, priority))
1850 .unzip();
1851 Source::from_materialized_factory(move |materializer| {
1852 let mut streams = Vec::with_capacity(factories.len());
1853 for factory in &factories {
1854 let stream = match Arc::clone(factory).create(materializer) {
1855 Ok((stream, _)) => stream,
1856 Err(error) => {
1857 return Ok((
1858 Box::new(std::iter::once(Err(error))) as BoxStream<Out>,
1859 NotUsed,
1860 ));
1861 }
1862 };
1863 streams.push(stream);
1864 }
1865 Ok((
1866 merge_prioritized_streams(streams, priorities.clone(), eager_complete),
1867 NotUsed,
1868 ))
1869 })
1870 }
1871
1872 #[must_use]
1873 pub fn maybe() -> (MaybeHandle<Out>, Self) {
1874 let value = Arc::new(Mutex::new(None));
1875 let handle = MaybeHandle {
1876 value: Arc::clone(&value),
1877 };
1878 let source = Self::from_factory(move || {
1879 let result = value
1880 .lock()
1881 .expect("maybe source poisoned")
1882 .clone()
1883 .unwrap_or(Err(StreamError::MaybeIncomplete));
1884 Box::new(std::iter::once(result))
1885 });
1886 (handle, source)
1887 }
1888
1889 #[must_use]
1890 pub fn single(item: Out) -> Self {
1891 Self::from_factory_with_hints(
1892 move || Box::new(std::iter::once(Ok(item.clone()))),
1893 SourceHints::with_inline_micro(1),
1894 )
1895 }
1896
1897 #[must_use]
1898 pub fn repeat(item: Out) -> Self {
1899 Self::from_factory(move || {
1900 let item = item.clone();
1901 Box::new(std::iter::repeat_with(move || Ok(item.clone())))
1902 })
1903 }
1904
1905 #[must_use]
1906 pub fn from_iterable<I>(items: I) -> Self
1907 where
1908 I: IntoIterator<Item = Out>,
1909 {
1910 items.into_iter().collect()
1911 }
1912}
1913
1914impl<Out: Clone + Send + Sync + 'static> FromIterator<Out> for Source<Out, NotUsed> {
1915 fn from_iter<T: IntoIterator<Item = Out>>(iter: T) -> Self {
1916 let items: Arc<[Out]> = iter.into_iter().collect();
1919 let len = items.len();
1920 Self::from_factory_with_hints(
1921 move || {
1922 let items = Arc::clone(&items);
1923 let mut index = 0;
1924 Box::new(std::iter::from_fn(move || {
1925 let item = items.get(index)?.clone();
1926 index += 1;
1927 Some(Ok(item))
1928 }))
1929 },
1930 SourceHints::with_inline_micro(len),
1932 )
1933 }
1934}
1935
1936#[cfg(test)]
1941pub(in crate::stream) fn test_source_with_inline_micro_hint<Out: Send + 'static>(
1942 factory: impl Fn() -> BoxStream<Out> + Send + Sync + 'static,
1943 max_success_items: usize,
1944) -> Source<Out, NotUsed> {
1945 Source::from_factory_with_hints(factory, SourceHints::with_inline_micro(max_success_items))
1946}
1947
1948struct UnfoldResourceStream<Resource, Out, Create, Read, Close>
1949where
1950 Create: Fn() -> StreamResult<Resource>,
1951 Read: Fn(&mut Resource) -> StreamResult<Option<Out>>,
1952 Close: Fn(Resource) -> StreamResult<()>,
1953{
1954 create: Arc<Create>,
1955 read: Arc<Read>,
1956 close: Arc<Close>,
1957 resource: Option<Resource>,
1958 created: bool,
1959 terminated: bool,
1960 _marker: PhantomData<fn() -> Out>,
1961}
1962
1963impl<Resource, Out, Create, Read, Close> UnfoldResourceStream<Resource, Out, Create, Read, Close>
1964where
1965 Create: Fn() -> StreamResult<Resource>,
1966 Read: Fn(&mut Resource) -> StreamResult<Option<Out>>,
1967 Close: Fn(Resource) -> StreamResult<()>,
1968{
1969 fn ensure_created(&mut self) -> StreamResult<()> {
1970 if self.created {
1971 return Ok(());
1972 }
1973 self.created = true;
1974 let resource = catch_unwind_failed("unfold_resource create", || (self.create)())
1975 .and_then(|result| result)?;
1976 self.resource = Some(resource);
1977 Ok(())
1978 }
1979
1980 fn close_resource(&mut self) -> StreamResult<()> {
1981 match self.resource.take() {
1982 Some(resource) => {
1983 catch_unwind_failed("unfold_resource close", || (self.close)(resource))
1984 .and_then(|result| result)
1985 }
1986 None => Ok(()),
1987 }
1988 }
1989}
1990
1991impl<Resource, Out, Create, Read, Close> Iterator
1992 for UnfoldResourceStream<Resource, Out, Create, Read, Close>
1993where
1994 Create: Fn() -> StreamResult<Resource>,
1995 Read: Fn(&mut Resource) -> StreamResult<Option<Out>>,
1996 Close: Fn(Resource) -> StreamResult<()>,
1997{
1998 type Item = StreamResult<Out>;
1999
2000 fn next(&mut self) -> Option<Self::Item> {
2001 if self.terminated {
2002 return None;
2003 }
2004 if let Err(error) = self.ensure_created() {
2005 self.terminated = true;
2006 return Some(Err(error));
2007 }
2008
2009 let result = {
2010 let resource = self
2011 .resource
2012 .as_mut()
2013 .expect("unfold_resource resource is open");
2014 catch_unwind_failed("unfold_resource read", || (self.read)(resource))
2015 .and_then(|result| result)
2016 };
2017
2018 match result {
2019 Ok(Some(item)) => Some(Ok(item)),
2020 Ok(None) => {
2021 self.terminated = true;
2022 match self.close_resource() {
2023 Ok(()) => None,
2024 Err(error) => Some(Err(error)),
2025 }
2026 }
2027 Err(read_error) => {
2028 self.terminated = true;
2029 let _ = self.close_resource();
2030 Some(Err(read_error))
2031 }
2032 }
2033 }
2034}
2035
2036impl<Resource, Out, Create, Read, Close> Drop
2037 for UnfoldResourceStream<Resource, Out, Create, Read, Close>
2038where
2039 Create: Fn() -> StreamResult<Resource>,
2040 Read: Fn(&mut Resource) -> StreamResult<Option<Out>>,
2041 Close: Fn(Resource) -> StreamResult<()>,
2042{
2043 fn drop(&mut self) {
2044 let _ = self.close_resource();
2045 }
2046}
2047
2048type UnfoldResourceAsyncMarker<Out, CreateFut, ReadFut, CloseFut> =
2049 fn() -> (Out, CreateFut, ReadFut, CloseFut);
2050
2051struct UnfoldResourceAsyncStream<Resource, Out, Create, CreateFut, Read, ReadFut, Close, CloseFut>
2052where
2053 Resource: Send + 'static,
2054 Create: Fn() -> CreateFut,
2055 CreateFut: Future<Output = StreamResult<Resource>> + Send + 'static,
2056 Read: Fn(&mut Resource) -> ReadFut,
2057 ReadFut: Future<Output = StreamResult<Option<Out>>> + Send + 'static,
2058 Close: Fn(Resource) -> CloseFut,
2059 CloseFut: Future<Output = StreamResult<()>> + Send + 'static,
2060{
2061 create: Arc<Create>,
2062 read: Arc<Read>,
2063 close: Arc<Close>,
2064 resource: Option<Resource>,
2065 created: bool,
2066 terminated: bool,
2067 _marker: PhantomData<UnfoldResourceAsyncMarker<Out, CreateFut, ReadFut, CloseFut>>,
2068}
2069
2070impl<Resource, Out, Create, CreateFut, Read, ReadFut, Close, CloseFut>
2071 UnfoldResourceAsyncStream<Resource, Out, Create, CreateFut, Read, ReadFut, Close, CloseFut>
2072where
2073 Resource: Send + 'static,
2074 Create: Fn() -> CreateFut,
2075 CreateFut: Future<Output = StreamResult<Resource>> + Send + 'static,
2076 Read: Fn(&mut Resource) -> ReadFut,
2077 ReadFut: Future<Output = StreamResult<Option<Out>>> + Send + 'static,
2078 Close: Fn(Resource) -> CloseFut,
2079 CloseFut: Future<Output = StreamResult<()>> + Send + 'static,
2080{
2081 fn ensure_created(&mut self) -> StreamResult<()> {
2082 if self.created {
2083 return Ok(());
2084 }
2085 self.created = true;
2086 let resource = catch_unwind_failed("unfold_resource_async create", || (self.create)())
2087 .and_then(flow::run_future_inline_or_spawn)?;
2088 self.resource = Some(resource);
2089 Ok(())
2090 }
2091
2092 fn close_resource(&mut self) -> StreamResult<()> {
2093 match self.resource.take() {
2094 Some(resource) => {
2095 catch_unwind_failed("unfold_resource_async close", || (self.close)(resource))
2096 .and_then(flow::run_future_inline_or_spawn)
2097 }
2098 None => Ok(()),
2099 }
2100 }
2101}
2102
2103impl<Resource, Out, Create, CreateFut, Read, ReadFut, Close, CloseFut> Iterator
2104 for UnfoldResourceAsyncStream<Resource, Out, Create, CreateFut, Read, ReadFut, Close, CloseFut>
2105where
2106 Resource: Send + 'static,
2107 Out: Send + 'static,
2108 Create: Fn() -> CreateFut,
2109 CreateFut: Future<Output = StreamResult<Resource>> + Send + 'static,
2110 Read: Fn(&mut Resource) -> ReadFut,
2111 ReadFut: Future<Output = StreamResult<Option<Out>>> + Send + 'static,
2112 Close: Fn(Resource) -> CloseFut,
2113 CloseFut: Future<Output = StreamResult<()>> + Send + 'static,
2114{
2115 type Item = StreamResult<Out>;
2116
2117 fn next(&mut self) -> Option<Self::Item> {
2118 if self.terminated {
2119 return None;
2120 }
2121 if let Err(error) = self.ensure_created() {
2122 self.terminated = true;
2123 return Some(Err(error));
2124 }
2125
2126 let result = {
2127 let resource = self
2128 .resource
2129 .as_mut()
2130 .expect("unfold_resource_async resource is open");
2131 catch_unwind_failed("unfold_resource_async read", || (self.read)(resource))
2132 .and_then(flow::run_future_inline_or_spawn)
2133 };
2134
2135 match result {
2136 Ok(Some(item)) => Some(Ok(item)),
2137 Ok(None) => {
2138 self.terminated = true;
2139 match self.close_resource() {
2140 Ok(()) => None,
2141 Err(error) => Some(Err(error)),
2142 }
2143 }
2144 Err(read_error) => {
2145 self.terminated = true;
2146 let _ = self.close_resource();
2147 Some(Err(read_error))
2148 }
2149 }
2150 }
2151}
2152
2153impl<Resource, Out, Create, CreateFut, Read, ReadFut, Close, CloseFut> Drop
2154 for UnfoldResourceAsyncStream<Resource, Out, Create, CreateFut, Read, ReadFut, Close, CloseFut>
2155where
2156 Resource: Send + 'static,
2157 Create: Fn() -> CreateFut,
2158 CreateFut: Future<Output = StreamResult<Resource>> + Send + 'static,
2159 Read: Fn(&mut Resource) -> ReadFut,
2160 ReadFut: Future<Output = StreamResult<Option<Out>>> + Send + 'static,
2161 Close: Fn(Resource) -> CloseFut,
2162 CloseFut: Future<Output = StreamResult<()>> + Send + 'static,
2163{
2164 fn drop(&mut self) {
2165 let _ = self.close_resource();
2166 }
2167}
2168
2169struct LazySourceStream<Out, InnerMat, F> {
2170 create: Arc<F>,
2171 materializer: Materializer,
2172 current: Option<BoxStream<Out>>,
2173 mat_sender: Option<oneshot::Sender<StreamResult<InnerMat>>>,
2174 initialized: bool,
2175 terminated: bool,
2176}
2177
2178impl<Out, InnerMat, F> LazySourceStream<Out, InnerMat, F>
2179where
2180 Out: Send + 'static,
2181 InnerMat: Send + 'static,
2182 F: Fn() -> Source<Out, InnerMat>,
2183{
2184 fn complete_mat(&mut self, result: StreamResult<InnerMat>) {
2185 if let Some(sender) = self.mat_sender.take() {
2186 let _ = sender.send(result);
2187 }
2188 }
2189
2190 fn initialize(&mut self) -> StreamResult<()> {
2191 if self.initialized {
2192 return Ok(());
2193 }
2194 self.initialized = true;
2195 let source = match catch_unwind_failed("lazy_source factory", || (self.create)()) {
2196 Ok(source) => source,
2197 Err(error) => {
2198 self.complete_mat(Err(error.clone()));
2199 return Err(error);
2200 }
2201 };
2202 match Arc::clone(&source.factory).create(&self.materializer) {
2203 Ok((stream, mat)) => {
2204 self.current = Some(stream);
2205 self.complete_mat(Ok(mat));
2206 Ok(())
2207 }
2208 Err(error) => {
2209 self.complete_mat(Err(error.clone()));
2210 Err(error)
2211 }
2212 }
2213 }
2214}
2215
2216impl<Out, InnerMat, F> Iterator for LazySourceStream<Out, InnerMat, F>
2217where
2218 Out: Send + 'static,
2219 InnerMat: Send + 'static,
2220 F: Fn() -> Source<Out, InnerMat>,
2221{
2222 type Item = StreamResult<Out>;
2223
2224 fn next(&mut self) -> Option<Self::Item> {
2225 if self.terminated {
2226 return None;
2227 }
2228 if let Err(error) = self.initialize() {
2229 self.terminated = true;
2230 return Some(Err(error));
2231 }
2232 match self
2233 .current
2234 .as_mut()
2235 .expect("lazy_source current stream initialized")
2236 .next()
2237 {
2238 Some(Ok(item)) => Some(Ok(item)),
2239 Some(Err(error)) => {
2240 self.terminated = true;
2241 Some(Err(error))
2242 }
2243 None => {
2244 self.terminated = true;
2245 None
2246 }
2247 }
2248 }
2249}
2250
2251impl<Out, InnerMat, F> Drop for LazySourceStream<Out, InnerMat, F> {
2252 fn drop(&mut self) {
2253 if !self.initialized
2254 && let Some(sender) = self.mat_sender.take()
2255 {
2256 let _ = sender.send(Err(StreamError::Failed(
2257 "lazy source was never materialized".into(),
2258 )));
2259 }
2260 }
2261}
2262
2263struct LazyFutureSourceStream<Out, InnerMat, F, Fut> {
2264 create: Arc<F>,
2265 materializer: Materializer,
2266 current: Option<BoxStream<Out>>,
2267 mat_sender: Option<oneshot::Sender<StreamResult<InnerMat>>>,
2268 initialized: bool,
2269 terminated: bool,
2270 _marker: PhantomData<fn() -> Fut>,
2271}
2272
2273impl<Out, InnerMat, F, Fut> LazyFutureSourceStream<Out, InnerMat, F, Fut>
2274where
2275 Out: Send + 'static,
2276 InnerMat: Send + 'static,
2277 F: Fn() -> Fut,
2278 Fut: Future<Output = StreamResult<Source<Out, InnerMat>>> + Send + 'static,
2279{
2280 fn complete_mat(&mut self, result: StreamResult<InnerMat>) {
2281 if let Some(sender) = self.mat_sender.take() {
2282 let _ = sender.send(result);
2283 }
2284 }
2285
2286 fn initialize(&mut self) -> StreamResult<()> {
2287 if self.initialized {
2288 return Ok(());
2289 }
2290 self.initialized = true;
2291 let source = match catch_unwind_failed("lazy_future_source factory", || (self.create)())
2292 .and_then(flow::run_future_inline_or_spawn)
2293 {
2294 Ok(source) => source,
2295 Err(error) => {
2296 self.complete_mat(Err(error.clone()));
2297 return Err(error);
2298 }
2299 };
2300 match Arc::clone(&source.factory).create(&self.materializer) {
2301 Ok((stream, mat)) => {
2302 self.current = Some(stream);
2303 self.complete_mat(Ok(mat));
2304 Ok(())
2305 }
2306 Err(error) => {
2307 self.complete_mat(Err(error.clone()));
2308 Err(error)
2309 }
2310 }
2311 }
2312}
2313
2314impl<Out, InnerMat, F, Fut> Iterator for LazyFutureSourceStream<Out, InnerMat, F, Fut>
2315where
2316 Out: Send + 'static,
2317 InnerMat: Send + 'static,
2318 F: Fn() -> Fut,
2319 Fut: Future<Output = StreamResult<Source<Out, InnerMat>>> + Send + 'static,
2320{
2321 type Item = StreamResult<Out>;
2322
2323 fn next(&mut self) -> Option<Self::Item> {
2324 if self.terminated {
2325 return None;
2326 }
2327 if let Err(error) = self.initialize() {
2328 self.terminated = true;
2329 return Some(Err(error));
2330 }
2331 match self
2332 .current
2333 .as_mut()
2334 .expect("lazy_future_source current stream initialized")
2335 .next()
2336 {
2337 Some(Ok(item)) => Some(Ok(item)),
2338 Some(Err(error)) => {
2339 self.terminated = true;
2340 Some(Err(error))
2341 }
2342 None => {
2343 self.terminated = true;
2344 None
2345 }
2346 }
2347 }
2348}
2349
2350impl<Out, InnerMat, F, Fut> Drop for LazyFutureSourceStream<Out, InnerMat, F, Fut> {
2351 fn drop(&mut self) {
2352 if !self.initialized
2353 && let Some(sender) = self.mat_sender.take()
2354 {
2355 let _ = sender.send(Err(StreamError::Failed(
2356 "lazy future source was never materialized".into(),
2357 )));
2358 }
2359 }
2360}
2361
2362fn concat_source_streams<Out>(streams: Vec<BoxStream<Out>>) -> BoxStream<Out>
2363where
2364 Out: Send + 'static,
2365{
2366 let mut streams: VecDeque<_> = streams.into();
2367 let mut current = streams.pop_front();
2368 Box::new(std::iter::from_fn(move || {
2369 loop {
2370 match current.as_mut() {
2371 Some(stream) => match stream.next() {
2372 Some(item) => return Some(item),
2373 None => current = streams.pop_front(),
2374 },
2375 None => return None,
2376 }
2377 }
2378 }))
2379}
2380
2381fn concat_source_streams_lazy<Out, Mat>(
2382 initial: BoxStream<Out>,
2383 factories: Vec<Arc<dyn SourceFactory<Out, Mat>>>,
2384 materializer: &Materializer,
2385) -> BoxStream<Out>
2386where
2387 Out: Send + 'static,
2388 Mat: Send + 'static,
2389{
2390 let mut current = Some(initial);
2391 let mut remaining: VecDeque<_> = factories.into();
2392 let materializer = materializer.with_name_prefix(materializer.name_prefix().to_owned());
2393 Box::new(std::iter::from_fn(move || {
2394 loop {
2395 match current.as_mut() {
2396 Some(stream) => match stream.next() {
2397 Some(item) => return Some(item),
2398 None => {
2399 current = remaining.pop_front().map(|factory| {
2400 match factory.create(&materializer) {
2401 Ok((stream, _)) => stream,
2402 Err(error) => {
2403 Box::new(std::iter::once(Err(error))) as BoxStream<Out>
2404 }
2405 }
2406 });
2407 }
2408 },
2409 None => return None,
2410 }
2411 }
2412 }))
2413}
2414
2415fn or_else_source_stream<Out>(
2416 mut primary: BoxStream<Out>,
2417 mut secondary: BoxStream<Out>,
2418) -> BoxStream<Out>
2419where
2420 Out: Send + 'static,
2421{
2422 let mut primary_emitted = false;
2423 let mut using_secondary = false;
2424 Box::new(std::iter::from_fn(move || {
2425 loop {
2426 if using_secondary {
2427 return secondary.next();
2428 }
2429
2430 match primary.next() {
2431 Some(Ok(item)) => {
2432 primary_emitted = true;
2433 return Some(Ok(item));
2434 }
2435 Some(Err(error)) => return Some(Err(error)),
2436 None if primary_emitted => return None,
2437 None => using_secondary = true,
2438 }
2439 }
2440 }))
2441}
2442
2443fn interleave_source_streams<Out>(
2444 streams: Vec<BoxStream<Out>>,
2445 segment_size: usize,
2446 eager_close: bool,
2447) -> BoxStream<Out>
2448where
2449 Out: Send + 'static,
2450{
2451 if segment_size == 0 {
2452 return Box::new(std::iter::once(Err(StreamError::GraphValidation(
2453 "interleave segment size must be greater than zero".into(),
2454 ))));
2455 }
2456
2457 let mut streams: Vec<Option<BoxStream<Out>>> = streams.into_iter().map(Some).collect();
2458 let mut pending: Vec<Option<StreamResult<Out>>> = (0..streams.len()).map(|_| None).collect();
2459 let mut current = 0usize;
2460 let mut emitted = 0usize;
2461 Box::new(std::iter::from_fn(move || {
2462 loop {
2463 if streams.iter().all(Option::is_none) {
2464 return None;
2465 }
2466 if streams[current].is_none() {
2467 match next_active_source_stream(&streams, current) {
2468 Some(next) => {
2469 current = next;
2470 emitted = 0;
2471 }
2472 None => return None,
2473 }
2474 }
2475
2476 let Some(stream) = streams[current].as_mut() else {
2477 continue;
2478 };
2479 let next_item = pending[current].take().or_else(|| stream.next());
2480 match next_item {
2481 Some(Ok(item)) => {
2482 emitted += 1;
2483 if emitted == segment_size {
2484 emitted = 0;
2485 if let Some(next) = next_active_source_stream(&streams, current) {
2486 current = next;
2487 }
2488 }
2489 return Some(Ok(item));
2490 }
2491 Some(Err(error)) => return Some(Err(error)),
2492 None => {
2493 streams[current] = None;
2494 emitted = 0;
2495 if eager_close {
2496 return None;
2497 }
2498 match next_active_source_stream(&streams, current) {
2499 Some(next) => current = next,
2500 None => return None,
2501 }
2502 }
2503 }
2504 }
2505 }))
2506}
2507
2508fn next_active_source_stream<Out>(
2509 streams: &[Option<BoxStream<Out>>],
2510 current: usize,
2511) -> Option<usize>
2512where
2513 Out: Send + 'static,
2514{
2515 if streams.is_empty() {
2516 return None;
2517 }
2518 for offset in 1..=streams.len() {
2519 let index = (current + offset) % streams.len();
2520 if streams[index].is_some() {
2521 return Some(index);
2522 }
2523 }
2524 None
2525}