1use super::flow::{self, FlowTransform};
2use super::*;
3use crate::Attributes;
4use crate::context::SourceWithContext;
5
6#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash)]
7pub struct NotUsed;
8
9type CombinedSourceFactory<Out> =
10 dyn Fn(&Materializer) -> StreamResult<BoxStream<Out>> + Send + Sync;
11
12pub struct Keep;
13
14impl Keep {
15 pub fn left<Left, Right>(left: Left, _right: Right) -> Left {
16 left
17 }
18
19 pub fn right<Left, Right>(_left: Left, right: Right) -> Right {
20 right
21 }
22
23 pub fn both<Left, Right>(left: Left, right: Right) -> (Left, Right) {
24 (left, right)
25 }
26
27 pub fn none<Left, Right>(_left: Left, _right: Right) -> NotUsed {
28 NotUsed
29 }
30}
31
32#[derive(Debug, Clone, PartialEq, Eq)]
33pub enum SourceCombineStrategy {
34 Merge {
35 eager_complete: bool,
36 },
37 Concat,
38 Prioritized {
39 priorities: Vec<usize>,
40 eager_complete: bool,
41 },
42}
43
44#[derive(Clone)]
45pub struct MaybeHandle<T> {
46 value: Arc<Mutex<Option<StreamResult<T>>>>,
47}
48
49impl<T> MaybeHandle<T> {
50 #[must_use]
51 pub fn is_completed(&self) -> bool {
52 self.value.lock().expect("maybe handle poisoned").is_some()
53 }
54
55 pub fn complete(&self, item: T) -> StreamResult<()> {
56 self.settle(Ok(item))
57 }
58
59 pub fn fail(&self, error: StreamError) -> StreamResult<()> {
60 self.settle(Err(error))
61 }
62
63 fn settle(&self, result: StreamResult<T>) -> StreamResult<()> {
64 let mut value = self.value.lock().expect("maybe handle poisoned");
65 if value.is_some() {
66 return Err(StreamError::Failed("maybe source already completed".into()));
67 }
68 *value = Some(result);
69 Ok(())
70 }
71}
72
73#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
74pub struct Demand(u64);
75
76impl Demand {
77 pub const ZERO: Self = Self(0);
78 pub const ONE: Self = Self(1);
79 pub const MAX: Self = Self(u64::MAX);
80
81 #[must_use]
82 pub const fn new(available: u64) -> Self {
83 Self(available)
84 }
85
86 #[must_use]
87 pub const fn available(self) -> u64 {
88 self.0
89 }
90
91 #[must_use]
92 pub const fn is_unbounded(self) -> bool {
93 self.0 == u64::MAX
94 }
95
96 #[must_use]
97 pub const fn is_empty(self) -> bool {
98 self.0 == 0
99 }
100
101 #[must_use]
102 pub const fn saturating_add(self, rhs: Self) -> Self {
103 Self(self.0.saturating_add(rhs.0))
104 }
105
106 pub fn consume_one(&mut self) -> bool {
107 match self.0 {
108 0 => false,
109 u64::MAX => true,
110 _ => {
111 self.0 -= 1;
112 true
113 }
114 }
115 }
116}
117
118pub trait PushOutlet<T>: Send {
119 fn push(&mut self, item: T) -> StreamResult<Demand>;
120
121 fn complete(&mut self) -> StreamResult<()> {
122 Ok(())
123 }
124
125 fn fail(&mut self, cause: StreamError) -> StreamResult<()> {
126 Err(cause)
127 }
128}
129
130#[derive(Clone)]
131pub struct Source<Out, Mat = NotUsed> {
132 pub(crate) factory: Arc<dyn SourceFactory<Out, Mat>>,
133 pub(crate) terminal_factory: Option<Arc<TerminalSourceFactory<Out, Mat>>>,
134 pub(super) hints: SourceHints,
135 pub(super) attributes: Attributes,
136 pub(crate) split_hook: Option<Arc<dyn SplitSegmentHookDyn>>,
139}
140
141pub(crate) type TerminalSourceFactory<Out, Mat> =
142 dyn Fn(&Materializer) -> StreamResult<(Arc<dyn TerminalSourceHookDyn<Out>>, Mat)> + Send + Sync;
143
144impl<Out: Send + 'static> Source<Out, NotUsed> {
145 pub(super) fn from_factory<F>(factory: F) -> Self
146 where
147 F: Fn() -> BoxStream<Out> + Send + Sync + 'static,
148 {
149 Self::from_factory_with_hints(factory, SourceHints::default())
150 }
151
152 fn from_factory_with_hints<F>(factory: F, hints: SourceHints) -> Self
153 where
154 F: Fn() -> BoxStream<Out> + Send + Sync + 'static,
155 {
156 Self::from_materialized_factory_with_hints(
157 move |_materializer| Ok((factory(), NotUsed)),
158 hints,
159 )
160 }
161
162 #[must_use]
163 pub fn empty() -> Self {
164 Self::from_factory_with_hints(
165 || Box::new(std::iter::empty()),
166 SourceHints::with_inline_micro(0),
167 )
168 }
169
170 #[must_use]
171 pub fn never() -> Self {
172 Self::from_materialized_factory_with_hints(
173 move |materializer| {
174 let state = Arc::clone(&materializer.inner.state);
175 Ok((
176 Box::new(std::iter::from_fn(move || {
177 loop {
178 if state.shutdown.load(Ordering::SeqCst) {
179 return Some(Err(StreamError::AbruptTermination));
180 }
181 if super::runtime::current_stream_cancelled()
182 .as_ref()
183 .is_some_and(|cancelled| cancelled.load(Ordering::SeqCst))
184 {
185 return Some(Err(StreamError::Cancelled));
186 }
187 std::thread::park_timeout(Duration::from_millis(1));
188 }
189 })),
190 NotUsed,
191 ))
192 },
193 SourceHints::default(),
194 )
195 }
196
197 #[must_use]
198 pub fn failed(error: StreamError) -> Self {
199 Self::from_factory_with_hints(
200 move || Box::new(std::iter::once(Err(error.clone()))),
201 SourceHints::with_inline_micro(0),
203 )
204 }
205
206 #[must_use]
207 pub fn future<F, Fut>(future: F) -> Self
208 where
209 F: Fn() -> Fut + Send + Sync + 'static,
210 Fut: Future<Output = StreamResult<Out>> + Send + 'static,
211 {
212 let future = Arc::new(future);
213 Self::from_factory(move || {
214 let future = Arc::clone(&future);
215 let mut emitted = false;
216 Box::new(std::iter::from_fn(move || {
217 if emitted {
218 return None;
219 }
220 emitted = true;
221 Some(
222 catch_unwind_failed("source future factory", || future())
223 .and_then(flow::run_future_inline_or_spawn),
224 )
225 }))
226 })
227 }
228
229 #[must_use]
230 pub fn future_source<F, Fut>(future: F) -> Self
231 where
232 F: Fn() -> Fut + Send + Sync + 'static,
233 Fut: Future<Output = StreamResult<Source<Out>>> + Send + 'static,
234 {
235 let future = Arc::new(future);
236 Self::from_materialized_factory(move |materializer| {
237 let materializer = materializer.with_name_prefix(materializer.name_prefix().to_owned());
238 let future = Arc::clone(&future);
239 let mut current = None::<BoxStream<Out>>;
240 let mut initialized = false;
241 let mut terminated = false;
242 Ok((
243 Box::new(std::iter::from_fn(move || {
244 if terminated {
245 return None;
246 }
247
248 loop {
249 if let Some(stream) = current.as_mut() {
250 match stream.next() {
251 Some(item) => return Some(item),
252 None => {
253 terminated = true;
254 return None;
255 }
256 }
257 }
258
259 if initialized {
260 terminated = true;
261 return None;
262 }
263 initialized = true;
264 let source = match catch_unwind_failed("future_source factory", || future())
265 .and_then(flow::run_future_inline_or_spawn)
266 {
267 Ok(source) => source,
268 Err(error) => {
269 terminated = true;
270 return Some(Err(error));
271 }
272 };
273 current = Some(match Arc::clone(&source.factory).create(&materializer) {
274 Ok((stream, _)) => stream,
275 Err(error) => {
276 terminated = true;
277 return Some(Err(error));
278 }
279 });
280 }
281 })) as BoxStream<Out>,
282 NotUsed,
283 ))
284 })
285 }
286
287 #[must_use]
288 pub fn cycle<F, I>(factory: F) -> Self
289 where
290 F: Fn() -> I + Send + Sync + 'static,
291 I: IntoIterator<Item = Out>,
292 I::IntoIter: Send + 'static,
293 {
294 let factory = Arc::new(factory);
295 Self::from_factory(move || {
296 let factory = Arc::clone(&factory);
297 let mut current = None::<I::IntoIter>;
298 let mut terminated = false;
299 Box::new(std::iter::from_fn(move || {
300 if terminated {
301 return None;
302 }
303
304 if let Some(iter) = current.as_mut()
305 && let Some(item) = iter.next()
306 {
307 return Some(Ok(item));
308 }
309
310 let mut next = match catch_unwind_failed("cycle factory", || factory()) {
311 Ok(iterable) => iterable.into_iter(),
312 Err(error) => {
313 terminated = true;
314 return Some(Err(error));
315 }
316 };
317 match next.next() {
318 Some(item) => {
319 current = Some(next);
320 Some(Ok(item))
321 }
322 None => {
323 terminated = true;
324 Some(Err(StreamError::Failed("empty iterator".into())))
325 }
326 }
327 }))
328 })
329 }
330
331 #[must_use]
332 pub fn unfold<State, F>(initial: State, f: F) -> Self
333 where
334 State: Clone + Send + Sync + 'static,
335 F: Fn(State) -> Option<(State, Out)> + Send + Sync + 'static,
336 {
337 let f = Arc::new(f);
338 Self::from_factory(move || {
339 let f = Arc::clone(&f);
340 let mut state = Some(initial.clone());
341 let mut terminated = false;
342 Box::new(std::iter::from_fn(move || {
343 if terminated {
344 return None;
345 }
346 let current = state.take().expect("unfold state present");
347 match catch_unwind_failed("unfold function", || f(current)) {
348 Ok(Some((next_state, item))) => {
349 state = Some(next_state);
350 Some(Ok(item))
351 }
352 Ok(None) => {
353 terminated = true;
354 None
355 }
356 Err(error) => {
357 terminated = true;
358 Some(Err(error))
359 }
360 }
361 }))
362 })
363 }
364
365 #[must_use]
366 pub fn unfold_async<State, F, Fut>(initial: State, f: F) -> Self
367 where
368 State: Clone + Send + Sync + 'static,
369 F: Fn(State) -> Fut + Send + Sync + 'static,
370 Fut: Future<Output = StreamResult<Option<(State, Out)>>> + Send + 'static,
371 {
372 let f = Arc::new(f);
373 Self::from_factory(move || {
374 let f = Arc::clone(&f);
375 let mut state = Some(initial.clone());
376 let mut terminated = false;
377 Box::new(std::iter::from_fn(move || {
378 if terminated {
379 return None;
380 }
381 let current = state.take().expect("unfold_async state present");
382 match catch_unwind_failed("unfold_async factory", || f(current))
383 .and_then(flow::run_future_inline_or_spawn)
384 {
385 Ok(Some((next_state, item))) => {
386 state = Some(next_state);
387 Some(Ok(item))
388 }
389 Ok(None) => {
390 terminated = true;
391 None
392 }
393 Err(error) => {
394 terminated = true;
395 Some(Err(error))
396 }
397 }
398 }))
399 })
400 }
401
402 #[must_use]
403 pub fn unfold_resource<Resource, Create, Read, Close>(
404 create: Create,
405 read: Read,
406 close: Close,
407 ) -> Self
408 where
409 Resource: Send + 'static,
410 Create: Fn() -> StreamResult<Resource> + Send + Sync + 'static,
411 Read: Fn(&mut Resource) -> StreamResult<Option<Out>> + Send + Sync + 'static,
412 Close: Fn(Resource) -> StreamResult<()> + Send + Sync + 'static,
413 {
414 let create = Arc::new(create);
415 let read = Arc::new(read);
416 let close = Arc::new(close);
417 Self::from_factory(move || {
418 Box::new(UnfoldResourceStream {
419 create: Arc::clone(&create),
420 read: Arc::clone(&read),
421 close: Arc::clone(&close),
422 resource: None,
423 created: false,
424 terminated: false,
425 _marker: PhantomData,
426 })
427 })
428 }
429
430 #[must_use]
431 pub fn unfold_resource_async<Resource, Create, CreateFut, Read, ReadFut, Close, CloseFut>(
432 create: Create,
433 read: Read,
434 close: Close,
435 ) -> Self
436 where
437 Resource: Send + 'static,
438 Create: Fn() -> CreateFut + Send + Sync + 'static,
439 CreateFut: Future<Output = StreamResult<Resource>> + Send + 'static,
440 Read: Fn(&mut Resource) -> ReadFut + Send + Sync + 'static,
441 ReadFut: Future<Output = StreamResult<Option<Out>>> + Send + 'static,
442 Close: Fn(Resource) -> CloseFut + Send + Sync + 'static,
443 CloseFut: Future<Output = StreamResult<()>> + Send + 'static,
444 {
445 let create = Arc::new(create);
446 let read = Arc::new(read);
447 let close = Arc::new(close);
448 Self::from_factory(move || {
449 Box::new(UnfoldResourceAsyncStream {
450 create: Arc::clone(&create),
451 read: Arc::clone(&read),
452 close: Arc::clone(&close),
453 resource: None,
454 created: false,
455 terminated: false,
456 _marker: PhantomData,
457 })
458 })
459 }
460
461 #[must_use]
462 pub fn lazy_single<F>(create: F) -> Self
463 where
464 F: Fn() -> Out + Send + Sync + 'static,
465 {
466 let create = Arc::new(create);
467 Self::from_factory(move || {
468 let create = Arc::clone(&create);
469 let mut emitted = false;
470 Box::new(std::iter::from_fn(move || {
471 if emitted {
472 return None;
473 }
474 emitted = true;
475 Some(catch_unwind_failed("lazy_single factory", || create()))
476 }))
477 })
478 }
479
480 #[must_use]
481 pub fn lazy_future<F, Fut>(create: F) -> Self
482 where
483 F: Fn() -> Fut + Send + Sync + 'static,
484 Fut: Future<Output = StreamResult<Out>> + Send + 'static,
485 {
486 let create = Arc::new(create);
487 Self::from_factory(move || {
488 let create = Arc::clone(&create);
489 let mut emitted = false;
490 Box::new(std::iter::from_fn(move || {
491 if emitted {
492 return None;
493 }
494 emitted = true;
495 Some(
496 catch_unwind_failed("lazy_future factory", || create())
497 .and_then(flow::run_future_inline_or_spawn),
498 )
499 }))
500 })
501 }
502
503 #[must_use]
504 pub fn lazy_source<InnerMat, F>(create: F) -> Source<Out, StreamCompletion<InnerMat>>
505 where
506 InnerMat: Send + 'static,
507 F: Fn() -> Source<Out, InnerMat> + Send + Sync + 'static,
508 {
509 let create = Arc::new(create);
510 Source::from_materialized_factory(move |materializer| {
511 let (sender, receiver) = oneshot::channel();
512 Ok((
513 Box::new(LazySourceStream {
514 create: Arc::clone(&create),
515 materializer: materializer
516 .with_name_prefix(materializer.name_prefix().to_owned()),
517 current: None,
518 mat_sender: Some(sender),
519 initialized: false,
520 terminated: false,
521 }) as BoxStream<Out>,
522 StreamCompletion::from_receiver(receiver, None),
523 ))
524 })
525 }
526
527 #[must_use]
528 pub fn lazy_future_source<InnerMat, F, Fut>(
529 create: F,
530 ) -> Source<Out, StreamCompletion<InnerMat>>
531 where
532 InnerMat: Send + 'static,
533 F: Fn() -> Fut + Send + Sync + 'static,
534 Fut: Future<Output = StreamResult<Source<Out, InnerMat>>> + Send + 'static,
535 {
536 let create = Arc::new(create);
537 Source::from_materialized_factory(move |materializer| {
538 let (sender, receiver) = oneshot::channel();
539 Ok((
540 Box::new(LazyFutureSourceStream {
541 create: Arc::clone(&create),
542 materializer: materializer
543 .with_name_prefix(materializer.name_prefix().to_owned()),
544 current: None,
545 mat_sender: Some(sender),
546 initialized: false,
547 terminated: false,
548 _marker: PhantomData,
549 }) as BoxStream<Out>,
550 StreamCompletion::from_receiver(receiver, None),
551 ))
552 })
553 }
554
555 #[must_use]
556 pub fn from_fn_iter<F, I>(factory: F) -> Self
557 where
558 F: Fn() -> I + Send + Sync + 'static,
559 I: IntoIterator<Item = Out>,
560 I::IntoIter: Send + 'static,
561 {
562 Self::from_factory(move || Box::new(factory().into_iter().map(Ok)))
563 }
564}
565
566impl<Out: Send + 'static, Mat: Send + 'static> Source<Out, Mat> {
567 fn from_materialized_factory_with_hints<F>(factory: F, hints: SourceHints) -> Self
568 where
569 F: Fn(&Materializer) -> StreamResult<(BoxStream<Out>, Mat)> + Send + Sync + 'static,
570 {
571 Self {
572 factory: Arc::new(FnSourceFactory(factory)),
573 terminal_factory: None,
574 hints,
575 attributes: Attributes::default(),
576 split_hook: None,
577 }
578 }
579
580 pub(crate) fn from_materialized_factory<F>(factory: F) -> Self
581 where
582 F: Fn(&Materializer) -> StreamResult<(BoxStream<Out>, Mat)> + Send + Sync + 'static,
583 {
584 Self::from_materialized_factory_with_hints(factory, SourceHints::default())
585 }
586
587 pub(crate) fn from_terminal_direct_materialized_factory<F, D>(
588 factory: F,
589 terminal_factory: D,
590 ) -> Self
591 where
592 F: Fn(&Materializer) -> StreamResult<(BoxStream<Out>, Mat)> + Send + Sync + 'static,
593 D: Fn(&Materializer) -> StreamResult<(Arc<dyn TerminalSourceHookDyn<Out>>, Mat)>
594 + Send
595 + Sync
596 + 'static,
597 {
598 Self {
599 factory: Arc::new(FnSourceFactory(factory)),
600 terminal_factory: Some(Arc::new(terminal_factory)),
601 hints: SourceHints::with_terminal_consumer_batch(),
602 attributes: Attributes::default(),
603 split_hook: None,
604 }
605 }
606
607 #[must_use]
608 pub fn as_source_with_context<Ctx, F>(
609 self,
610 extract_context: F,
611 ) -> SourceWithContext<Out, Ctx, Mat>
612 where
613 Ctx: Send + 'static,
614 F: Fn(&Out) -> Ctx + Send + Sync + 'static,
615 {
616 SourceWithContext::from_source(self.map(move |item| {
617 let context = extract_context(&item);
618 (item, context)
619 }))
620 }
621
622 #[must_use]
623 pub fn via<Next, FlowMat>(self, flow: Flow<Out, Next, FlowMat>) -> Source<Next, Mat>
624 where
625 Next: Send + 'static,
626 FlowMat: Send + 'static,
627 {
628 self.via_mat(flow, Keep::left)
629 }
630
631 #[must_use]
632 pub fn via_mat<Next, FlowMat, Combined, F>(
633 self,
634 flow: Flow<Out, Next, FlowMat>,
635 combine: F,
636 ) -> Source<Next, Combined>
637 where
638 Next: Send + 'static,
639 FlowMat: Send + 'static,
640 Combined: Send + 'static,
641 F: Fn(Mat, FlowMat) -> Combined + Send + Sync + 'static,
642 {
643 let source = self.factory;
644 let transform = flow.transform;
645 let materialize_flow = flow.materialize;
646 let hints = self.hints.after_flow(flow.hints);
647 let combine = Arc::new(combine);
648 Source::from_materialized_factory_with_hints(
649 move |materializer| {
650 let (stream, source_mat) = Arc::clone(&source).create(materializer)?;
651 let flow_mat = materialize_flow()?;
652 let stream = match &transform {
653 FlowTransform::Pure(transform) => transform(stream),
654 FlowTransform::Runtime(transform) => transform(stream, materializer)?,
655 };
656 Ok((stream, combine(source_mat, flow_mat)))
657 },
658 hints,
659 )
660 }
661
662 #[must_use]
663 pub fn via_mat_with<Next, FlowMat, Combined, F>(
664 self,
665 flow: Flow<Out, Next, FlowMat>,
666 combine: F,
667 ) -> Source<Next, Combined>
668 where
669 Next: Send + 'static,
670 FlowMat: Send + 'static,
671 Combined: Send + 'static,
672 F: Fn(Mat, FlowMat) -> Combined + Send + Sync + 'static,
673 {
674 self.via_mat(flow, combine)
675 }
676
677 #[must_use]
678 pub fn map<Next, F>(self, f: F) -> Source<Next, Mat>
679 where
680 Next: Send + 'static,
681 F: Fn(Out) -> Next + Send + Sync + 'static,
682 {
683 let hints = self.hints.without_inline_micro();
684 Source {
685 factory: Arc::new(MapSourceFactory {
686 source: self.factory,
687 stage: f,
688 _marker: PhantomData,
689 }),
690 terminal_factory: None,
691 hints,
692 attributes: self.attributes,
693 split_hook: None,
694 }
695 }
696
697 #[must_use]
698 pub fn attributes(&self) -> &Attributes {
699 &self.attributes
700 }
701
702 #[must_use]
703 pub fn with_attributes(mut self, attributes: Attributes) -> Self {
704 self.attributes = attributes;
705 self
706 }
707
708 #[must_use]
709 pub fn add_attributes(mut self, attributes: Attributes) -> Self {
710 self.attributes = self.attributes.and(attributes);
711 self
712 }
713
714 #[must_use]
715 pub fn named(self, name: impl Into<String>) -> Self {
716 self.add_attributes(Attributes::named(name))
717 }
718
719 #[must_use]
726 pub fn async_boundary(self) -> Self {
727 self.via(Flow::identity().async_boundary())
728 }
729
730 #[must_use]
732 pub fn r#async(self) -> Self {
733 self.async_boundary()
734 }
735
736 #[must_use]
742 pub fn async_boundary_with_config(
743 self,
744 config: crate::graph::AsyncBoundaryExecutionConfig,
745 ) -> Self {
746 self.via(Flow::identity().async_boundary_with_config(config))
747 }
748
749 #[must_use]
751 pub fn async_boundary_with_buffer(self, buffer_size: usize) -> Self {
752 self.via(Flow::identity().async_boundary_with_buffer(buffer_size))
753 }
754
755 #[must_use]
756 pub fn map_result<Next, F>(self, f: F) -> Source<Next, Mat>
757 where
758 Next: Send + 'static,
759 F: Fn(Out) -> StreamResult<Next> + Send + Sync + 'static,
760 {
761 self.via(Flow::identity().map_result(f))
762 }
763
764 #[must_use]
765 pub fn map_result_with_supervision<Next, F>(
766 self,
767 f: F,
768 decider: SupervisionDecider,
769 ) -> Source<Next, Mat>
770 where
771 Next: Send + 'static,
772 F: Fn(Out) -> StreamResult<Next> + Send + Sync + 'static,
773 {
774 self.via(Flow::identity().map_result_with_supervision(f, decider))
775 }
776
777 #[must_use]
778 pub fn filter<F>(self, predicate: F) -> Source<Out, Mat>
779 where
780 F: Fn(&Out) -> bool + Send + Sync + 'static,
781 {
782 self.via(Flow::identity().filter(predicate))
783 }
784
785 #[must_use]
786 pub fn filter_result<F>(self, predicate: F) -> Source<Out, Mat>
787 where
788 F: Fn(&Out) -> StreamResult<bool> + Send + Sync + 'static,
789 {
790 self.via(Flow::identity().filter_result(predicate))
791 }
792
793 #[must_use]
794 pub fn filter_result_with_supervision<F>(
795 self,
796 predicate: F,
797 decider: SupervisionDecider,
798 ) -> Source<Out, Mat>
799 where
800 F: Fn(&Out) -> StreamResult<bool> + Send + Sync + 'static,
801 {
802 self.via(Flow::identity().filter_result_with_supervision(predicate, decider))
803 }
804
805 #[must_use]
806 pub fn filter_not<F>(self, predicate: F) -> Source<Out, Mat>
807 where
808 F: Fn(&Out) -> bool + Send + Sync + 'static,
809 {
810 self.via(Flow::identity().filter_not(predicate))
811 }
812
813 #[must_use]
814 pub fn filter_map<Next, F>(self, f: F) -> Source<Next, Mat>
815 where
816 Next: Send + 'static,
817 F: Fn(Out) -> Option<Next> + Send + Sync + 'static,
818 {
819 self.via(Flow::identity().filter_map(f))
820 }
821
822 #[must_use]
823 pub fn filter_map_result<Next, F>(self, f: F) -> Source<Next, Mat>
824 where
825 Next: Send + 'static,
826 F: Fn(Out) -> StreamResult<Option<Next>> + Send + Sync + 'static,
827 {
828 self.via(Flow::identity().filter_map_result(f))
829 }
830
831 #[must_use]
832 pub fn filter_map_result_with_supervision<Next, F>(
833 self,
834 f: F,
835 decider: SupervisionDecider,
836 ) -> Source<Next, Mat>
837 where
838 Next: Send + 'static,
839 F: Fn(Out) -> StreamResult<Option<Next>> + Send + Sync + 'static,
840 {
841 self.via(Flow::identity().filter_map_result_with_supervision(f, decider))
842 }
843
844 #[must_use]
845 pub fn map_concat<Next, F, I>(self, f: F) -> Source<Next, Mat>
846 where
847 Next: Send + 'static,
848 F: Fn(Out) -> I + Send + Sync + 'static,
849 I: IntoIterator<Item = Next>,
850 I::IntoIter: Send + 'static,
851 {
852 self.via(Flow::identity().map_concat(f))
853 }
854
855 #[must_use]
856 pub fn map_concat_result<Next, F, I>(self, f: F) -> Source<Next, Mat>
857 where
858 Next: Send + 'static,
859 F: Fn(Out) -> StreamResult<I> + Send + Sync + 'static,
860 I: IntoIterator<Item = Next>,
861 I::IntoIter: Send + 'static,
862 {
863 self.via(Flow::identity().map_concat_result(f))
864 }
865
866 #[must_use]
867 pub fn map_concat_result_with_supervision<Next, F, I>(
868 self,
869 f: F,
870 decider: SupervisionDecider,
871 ) -> Source<Next, Mat>
872 where
873 Next: Send + 'static,
874 F: Fn(Out) -> StreamResult<I> + Send + Sync + 'static,
875 I: IntoIterator<Item = Next>,
876 I::IntoIter: Send + 'static,
877 {
878 self.via(Flow::identity().map_concat_result_with_supervision(f, decider))
879 }
880
881 #[must_use]
882 pub fn stateful_map<State, Next, F>(self, seed: State, f: F) -> Source<Next, Mat>
883 where
884 State: Clone + Send + Sync + 'static,
885 Next: Send + 'static,
886 F: Fn(&mut State, Out) -> Next + Send + Sync + 'static,
887 {
888 self.via(Flow::identity().stateful_map(seed, f))
889 }
890
891 #[must_use]
892 pub fn stateful_map_result<State, Next, F>(self, seed: State, f: F) -> Source<Next, Mat>
893 where
894 State: Clone + Send + Sync + 'static,
895 Next: Send + 'static,
896 F: Fn(&mut State, Out) -> StreamResult<Next> + Send + Sync + 'static,
897 {
898 self.via(Flow::identity().stateful_map_result(seed, f))
899 }
900
901 #[must_use]
902 pub fn stateful_map_result_with_supervision<State, Next, F>(
903 self,
904 seed: State,
905 f: F,
906 decider: SupervisionDecider,
907 ) -> Source<Next, Mat>
908 where
909 State: Clone + Send + Sync + 'static,
910 Next: Send + 'static,
911 F: Fn(&mut State, Out) -> StreamResult<Next> + Send + Sync + 'static,
912 {
913 self.via(Flow::identity().stateful_map_result_with_supervision(seed, f, decider))
914 }
915
916 #[must_use]
917 pub fn stateful_map_concat<State, Next, F, I>(self, seed: State, f: F) -> Source<Next, Mat>
918 where
919 State: Clone + Send + Sync + 'static,
920 Next: Send + 'static,
921 F: Fn(&mut State, Out) -> I + Send + Sync + 'static,
922 I: IntoIterator<Item = Next>,
923 I::IntoIter: Send + 'static,
924 {
925 self.via(Flow::identity().stateful_map_concat(seed, f))
926 }
927
928 #[must_use]
929 pub fn stateful_map_concat_result<State, Next, F, I>(
930 self,
931 seed: State,
932 f: F,
933 ) -> Source<Next, Mat>
934 where
935 State: Clone + Send + Sync + 'static,
936 Next: Send + 'static,
937 F: Fn(&mut State, Out) -> StreamResult<I> + Send + Sync + 'static,
938 I: IntoIterator<Item = Next>,
939 I::IntoIter: Send + 'static,
940 {
941 self.via(Flow::identity().stateful_map_concat_result(seed, f))
942 }
943
944 #[must_use]
945 pub fn stateful_map_concat_result_with_supervision<State, Next, F, I>(
946 self,
947 seed: State,
948 f: F,
949 decider: SupervisionDecider,
950 ) -> Source<Next, Mat>
951 where
952 State: Clone + Send + Sync + 'static,
953 Next: Send + 'static,
954 F: Fn(&mut State, Out) -> StreamResult<I> + Send + Sync + 'static,
955 I: IntoIterator<Item = Next>,
956 I::IntoIter: Send + 'static,
957 {
958 self.via(Flow::identity().stateful_map_concat_result_with_supervision(seed, f, decider))
959 }
960
961 #[must_use]
962 pub fn map_async<Next, F, Fut>(self, parallelism: usize, f: F) -> Source<Next, Mat>
963 where
964 Next: Send + 'static,
965 F: Fn(Out) -> Fut + Send + Sync + 'static,
966 Fut: Future<Output = StreamResult<Next>> + Send + 'static,
967 {
968 self.via(Flow::identity().map_async(parallelism, f))
969 }
970
971 #[must_use]
972 pub fn map_async_with_supervision<Next, F, Fut>(
973 self,
974 parallelism: usize,
975 f: F,
976 decider: SupervisionDecider,
977 ) -> Source<Next, Mat>
978 where
979 Next: Send + 'static,
980 F: Fn(Out) -> Fut + Send + Sync + 'static,
981 Fut: Future<Output = StreamResult<Next>> + Send + 'static,
982 {
983 self.via(Flow::identity().map_async_with_supervision(parallelism, f, decider))
984 }
985
986 #[must_use]
987 pub fn map_async_unordered<Next, F, Fut>(self, parallelism: usize, f: F) -> Source<Next, Mat>
988 where
989 Next: Send + 'static,
990 F: Fn(Out) -> Fut + Send + Sync + 'static,
991 Fut: Future<Output = StreamResult<Next>> + Send + 'static,
992 {
993 self.via(Flow::identity().map_async_unordered(parallelism, f))
994 }
995
996 #[must_use]
997 pub fn map_async_unordered_with_supervision<Next, F, Fut>(
998 self,
999 parallelism: usize,
1000 f: F,
1001 decider: SupervisionDecider,
1002 ) -> Source<Next, Mat>
1003 where
1004 Next: Send + 'static,
1005 F: Fn(Out) -> Fut + Send + Sync + 'static,
1006 Fut: Future<Output = StreamResult<Next>> + Send + 'static,
1007 {
1008 self.via(Flow::identity().map_async_unordered_with_supervision(parallelism, f, decider))
1009 }
1010
1011 #[must_use]
1012 pub fn map_async_partitioned<Key, Next, Partition, F, Fut>(
1013 self,
1014 parallelism: usize,
1015 per_partition: usize,
1016 partition: Partition,
1017 f: F,
1018 ) -> Source<Next, Mat>
1019 where
1020 Key: Clone + Eq + Hash + Send + 'static,
1021 Next: Send + 'static,
1022 Partition: Fn(&Out) -> Key + Send + Sync + 'static,
1023 F: Fn(Out) -> Fut + Send + Sync + 'static,
1024 Fut: Future<Output = StreamResult<Next>> + Send + 'static,
1025 {
1026 self.via(Flow::identity().map_async_partitioned(parallelism, per_partition, partition, f))
1027 }
1028
1029 #[must_use]
1030 pub fn prefix_and_tail(self, n: usize) -> Source<(Vec<Out>, Source<Out>), Mat> {
1031 self.via(Flow::identity().prefix_and_tail(n))
1032 }
1033
1034 #[must_use]
1035 pub fn flat_map_prefix<Next, FlowMat, F>(self, n: usize, f: F) -> Source<Next, Mat>
1036 where
1037 Next: Send + 'static,
1038 FlowMat: Send + 'static,
1039 F: Fn(Vec<Out>) -> Flow<Out, Next, FlowMat> + Send + Sync + 'static,
1040 Out: Clone,
1041 {
1042 self.via(Flow::identity().flat_map_prefix(n, f))
1043 }
1044
1045 #[must_use]
1046 pub fn group_by<Key, F>(
1047 self,
1048 max_substreams: usize,
1049 f: F,
1050 allow_closed_substream_recreation: bool,
1051 ) -> Source<Source<Out>, Mat>
1052 where
1053 Key: Clone + Eq + Hash + Send + 'static,
1054 F: Fn(&Out) -> Key + Send + Sync + 'static,
1055 Out: Clone,
1056 {
1057 let batch_mode = if self.hints.inline_micro.is_some() && !allow_closed_substream_recreation
1058 {
1059 flow::GroupByBatchMode::FiniteEagerNoRecreate
1060 } else {
1061 flow::GroupByBatchMode::Immediate
1062 };
1063 self.via(Flow::identity().group_by_with_batching(
1064 max_substreams,
1065 f,
1066 allow_closed_substream_recreation,
1067 batch_mode,
1068 ))
1069 }
1070
1071 #[must_use]
1072 pub fn split_when<F>(self, predicate: F) -> Source<Source<Out>, Mat>
1073 where
1074 F: Fn(&Out) -> bool + Send + Sync + 'static,
1075 Out: Clone,
1076 {
1077 self.via(Flow::identity().split_when(predicate))
1078 }
1079
1080 #[must_use]
1081 pub fn split_after<F>(self, predicate: F) -> Source<Source<Out>, Mat>
1082 where
1083 F: Fn(&Out) -> bool + Send + Sync + 'static,
1084 Out: Clone,
1085 {
1086 self.via(Flow::identity().split_after(predicate))
1087 }
1088
1089 #[must_use]
1090 pub fn flat_map_concat<Next, NextMat, F>(self, f: F) -> Source<Next, Mat>
1091 where
1092 Next: Send + 'static,
1093 NextMat: Send + 'static,
1094 F: Fn(Out) -> Source<Next, NextMat> + Send + Sync + 'static,
1095 {
1096 self.via(Flow::identity().flat_map_concat(f))
1097 }
1098
1099 #[must_use]
1100 pub fn flat_map_merge<Next, NextMat, F>(self, breadth: usize, f: F) -> Source<Next, Mat>
1101 where
1102 Next: Send + 'static,
1103 NextMat: Send + 'static,
1104 F: Fn(Out) -> Source<Next, NextMat> + Send + Sync + 'static,
1105 {
1106 self.via(Flow::identity().flat_map_merge(breadth, f))
1107 }
1108
1109 #[must_use]
1110 pub fn take(self, n: usize) -> Source<Out, Mat> {
1111 self.via(Flow::identity().take(n))
1112 }
1113
1114 #[must_use]
1115 pub fn drop(self, n: usize) -> Source<Out, Mat> {
1116 self.via(Flow::identity().drop(n))
1117 }
1118
1119 #[must_use]
1120 pub fn take_while<F>(self, predicate: F) -> Source<Out, Mat>
1121 where
1122 F: Fn(&Out) -> bool + Send + Sync + 'static,
1123 {
1124 self.via(Flow::identity().take_while(predicate))
1125 }
1126
1127 #[must_use]
1128 pub fn drop_while<F>(self, predicate: F) -> Source<Out, Mat>
1129 where
1130 F: Fn(&Out) -> bool + Send + Sync + 'static,
1131 {
1132 self.via(Flow::identity().drop_while(predicate))
1133 }
1134
1135 #[must_use]
1136 pub fn limit(self, max: u64) -> Source<Out, Mat> {
1137 self.via(Flow::identity().limit(max))
1138 }
1139
1140 #[must_use]
1141 pub fn grouped(self, size: usize) -> Source<Vec<Out>, Mat> {
1142 self.via(Flow::identity().grouped(size))
1143 }
1144
1145 #[must_use]
1146 pub fn scan<State, F>(self, seed: State, f: F) -> Source<State, Mat>
1147 where
1148 State: Clone + Send + Sync + 'static,
1149 F: Fn(State, Out) -> State + Send + Sync + 'static,
1150 {
1151 self.via(Flow::identity().scan(seed, f))
1152 }
1153
1154 #[must_use]
1155 pub fn scan_async<State, F, Fut>(self, seed: State, f: F) -> Source<State, Mat>
1156 where
1157 State: Clone + Send + Sync + 'static,
1158 F: Fn(State, Out) -> Fut + Send + Sync + 'static,
1159 Fut: Future<Output = StreamResult<State>> + Send + 'static,
1160 {
1161 self.via(Flow::identity().scan_async(seed, f))
1162 }
1163
1164 #[must_use]
1165 pub fn scan_result<State, F>(self, seed: State, f: F) -> Source<State, Mat>
1166 where
1167 State: Clone + Send + Sync + 'static,
1168 F: Fn(State, Out) -> StreamResult<State> + Send + Sync + 'static,
1169 {
1170 self.via(Flow::identity().scan_result(seed, f))
1171 }
1172
1173 #[must_use]
1174 pub fn scan_result_with_supervision<State, F>(
1175 self,
1176 seed: State,
1177 f: F,
1178 decider: SupervisionDecider,
1179 ) -> Source<State, Mat>
1180 where
1181 State: Clone + Send + Sync + 'static,
1182 F: Fn(State, Out) -> StreamResult<State> + Send + Sync + 'static,
1183 {
1184 self.via(Flow::identity().scan_result_with_supervision(seed, f, decider))
1185 }
1186
1187 #[must_use]
1188 pub fn sliding(self, size: usize, step: usize) -> Source<Vec<Out>, Mat>
1189 where
1190 Out: Clone,
1191 {
1192 self.via(Flow::identity().sliding(size, step))
1193 }
1194
1195 #[must_use]
1196 pub fn fold<Acc, F>(self, zero: Acc, f: F) -> Source<Acc, Mat>
1197 where
1198 Acc: Clone + Send + Sync + 'static,
1199 F: Fn(Acc, Out) -> Acc + Send + Sync + 'static,
1200 {
1201 self.via(Flow::identity().fold(zero, f))
1202 }
1203
1204 #[must_use]
1205 pub fn fold_async<Acc, F, Fut>(self, zero: Acc, f: F) -> Source<Acc, Mat>
1206 where
1207 Acc: Clone + Send + Sync + 'static,
1208 F: Fn(Acc, Out) -> Fut + Send + Sync + 'static,
1209 Fut: Future<Output = StreamResult<Acc>> + Send + 'static,
1210 {
1211 self.via(Flow::identity().fold_async(zero, f))
1212 }
1213
1214 #[must_use]
1215 pub fn map_with_resource<Resource, Next, Create, F, Close>(
1216 self,
1217 create: Create,
1218 f: F,
1219 close: Close,
1220 ) -> Source<Next, Mat>
1221 where
1222 Resource: Send + 'static,
1223 Next: Send + 'static,
1224 Create: Fn() -> StreamResult<Resource> + Send + Sync + 'static,
1225 F: Fn(&mut Resource, Out) -> StreamResult<Next> + Send + Sync + 'static,
1226 Close: Fn(Resource) -> StreamResult<Option<Next>> + Send + Sync + 'static,
1227 {
1228 self.via(Flow::identity().map_with_resource(create, f, close))
1229 }
1230
1231 #[must_use]
1232 pub fn fold_result<Acc, F>(self, zero: Acc, f: F) -> Source<Acc, Mat>
1233 where
1234 Acc: Clone + Send + Sync + 'static,
1235 F: Fn(Acc, Out) -> StreamResult<Acc> + Send + Sync + 'static,
1236 {
1237 self.via(Flow::identity().fold_result(zero, f))
1238 }
1239
1240 #[must_use]
1241 pub fn fold_result_with_supervision<Acc, F>(
1242 self,
1243 zero: Acc,
1244 f: F,
1245 decider: SupervisionDecider,
1246 ) -> Source<Acc, Mat>
1247 where
1248 Acc: Clone + Send + Sync + 'static,
1249 F: Fn(Acc, Out) -> StreamResult<Acc> + Send + Sync + 'static,
1250 {
1251 self.via(Flow::identity().fold_result_with_supervision(zero, f, decider))
1252 }
1253
1254 #[must_use]
1255 pub fn reduce<F>(self, f: F) -> Source<Out, Mat>
1256 where
1257 F: Fn(Out, Out) -> Out + Send + Sync + 'static,
1258 {
1259 self.via(Flow::identity().reduce(f))
1260 }
1261
1262 #[must_use]
1263 pub fn reduce_result<F>(self, f: F) -> Source<Out, Mat>
1264 where
1265 Out: Clone,
1266 F: Fn(Out, Out) -> StreamResult<Out> + Send + Sync + 'static,
1267 {
1268 self.via(Flow::identity().reduce_result(f))
1269 }
1270
1271 #[must_use]
1272 pub fn reduce_result_with_supervision<F>(
1273 self,
1274 f: F,
1275 decider: SupervisionDecider,
1276 ) -> Source<Out, Mat>
1277 where
1278 Out: Clone,
1279 F: Fn(Out, Out) -> StreamResult<Out> + Send + Sync + 'static,
1280 {
1281 self.via(Flow::identity().reduce_result_with_supervision(f, decider))
1282 }
1283
1284 #[must_use]
1285 pub fn map_error<F>(self, f: F) -> Source<Out, Mat>
1286 where
1287 F: Fn(StreamError) -> StreamError + Send + Sync + 'static,
1288 {
1289 self.via(Flow::identity().map_error(f))
1290 }
1291
1292 #[must_use]
1293 pub fn recover<F>(self, f: F) -> Source<Out, Mat>
1294 where
1295 F: Fn(StreamError) -> Option<Out> + Send + Sync + 'static,
1296 {
1297 self.via(Flow::identity().recover(f))
1298 }
1299
1300 #[must_use]
1301 pub fn recover_with<F>(self, f: F) -> Source<Out, Mat>
1302 where
1303 F: Fn(StreamError) -> Option<Source<Out>> + Send + Sync + 'static,
1304 {
1305 self.via(Flow::identity().recover_with(f))
1306 }
1307
1308 #[must_use]
1309 pub fn recover_with_retries<F>(self, retries: usize, f: F) -> Source<Out, Mat>
1310 where
1311 F: Fn(StreamError) -> Option<Source<Out>> + Send + Sync + 'static,
1312 {
1313 self.via(Flow::identity().recover_with_retries(retries, f))
1314 }
1315
1316 #[must_use]
1317 pub fn on_error_complete(self) -> Source<Out, Mat> {
1318 self.via(Flow::identity().on_error_complete())
1319 }
1320
1321 #[must_use]
1322 pub fn concat<Mat2>(self, that: Source<Out, Mat2>) -> Source<Out, Mat>
1323 where
1324 Mat2: Send + 'static,
1325 {
1326 let factory = self.factory;
1327 let hints = self.hints;
1328 let that_factory = that.factory;
1329 Source::from_materialized_factory_with_hints(
1330 move |materializer| {
1331 let (primary, mat) = Arc::clone(&factory).create(materializer)?;
1332 let secondary = match Arc::clone(&that_factory).create(materializer) {
1333 Ok((stream, _)) => stream,
1334 Err(error) => Box::new(std::iter::once(Err(error))) as BoxStream<Out>,
1335 };
1336 Ok((concat_source_streams(vec![primary, secondary]), mat))
1337 },
1338 hints,
1339 )
1340 }
1341
1342 #[must_use]
1343 pub fn concat_lazy<Mat2>(self, that: Source<Out, Mat2>) -> Source<Out, Mat>
1344 where
1345 Mat2: Send + 'static,
1346 {
1347 let factory = self.factory;
1348 let hints = self.hints;
1349 let that_factory = that.factory;
1350 Source::from_materialized_factory_with_hints(
1351 move |materializer| {
1352 let (primary, mat) = Arc::clone(&factory).create(materializer)?;
1353 Ok((
1354 concat_source_streams_lazy(
1355 primary,
1356 vec![Arc::clone(&that_factory)],
1357 materializer,
1358 ),
1359 mat,
1360 ))
1361 },
1362 hints,
1363 )
1364 }
1365
1366 #[must_use]
1367 pub fn concat_all_lazy<Mat2, I>(self, those: I) -> Source<Out, Mat>
1368 where
1369 Mat2: Send + 'static,
1370 I: IntoIterator<Item = Source<Out, Mat2>>,
1371 {
1372 let factory = self.factory;
1373 let hints = self.hints;
1374 let other_factories: Vec<_> = those.into_iter().map(|source| source.factory).collect();
1375 Source::from_materialized_factory_with_hints(
1376 move |materializer| {
1377 let (primary, mat) = Arc::clone(&factory).create(materializer)?;
1378 Ok((
1379 concat_source_streams_lazy(primary, other_factories.clone(), materializer),
1380 mat,
1381 ))
1382 },
1383 hints,
1384 )
1385 }
1386
1387 #[must_use]
1388 pub fn prepend<Mat2>(self, that: Source<Out, Mat2>) -> Source<Out, Mat>
1389 where
1390 Mat2: Send + 'static,
1391 {
1392 let factory = self.factory;
1393 let hints = self.hints;
1394 let that_factory = that.factory;
1395 Source::from_materialized_factory_with_hints(
1396 move |materializer| {
1397 let (primary, mat) = Arc::clone(&factory).create(materializer)?;
1398 let secondary = match Arc::clone(&that_factory).create(materializer) {
1399 Ok((stream, _)) => stream,
1400 Err(error) => Box::new(std::iter::once(Err(error))) as BoxStream<Out>,
1401 };
1402 Ok((concat_source_streams(vec![secondary, primary]), mat))
1403 },
1404 hints,
1405 )
1406 }
1407
1408 #[must_use]
1409 pub fn prepend_lazy<Mat2>(self, that: Source<Out, Mat2>) -> Source<Out, Mat>
1410 where
1411 Mat2: Send + 'static,
1412 {
1413 self.prepend(that)
1414 }
1415
1416 #[must_use]
1417 pub fn or_else<Mat2>(self, secondary: Source<Out, Mat2>) -> Source<Out, Mat>
1418 where
1419 Mat2: Send + 'static,
1420 {
1421 let factory = self.factory;
1422 let hints = self.hints;
1423 let secondary_factory = secondary.factory;
1424 Source::from_materialized_factory_with_hints(
1425 move |materializer| {
1426 let (primary, mat) = Arc::clone(&factory).create(materializer)?;
1427 let secondary = match Arc::clone(&secondary_factory).create(materializer) {
1428 Ok((stream, _)) => stream,
1429 Err(error) => Box::new(std::iter::once(Err(error))) as BoxStream<Out>,
1430 };
1431 Ok((or_else_source_stream(primary, secondary), mat))
1432 },
1433 hints,
1434 )
1435 }
1436
1437 #[must_use]
1438 pub fn interleave<Mat2>(self, that: Source<Out, Mat2>, segment_size: usize) -> Source<Out, Mat>
1439 where
1440 Mat2: Send + 'static,
1441 {
1442 self.interleave_all([that], segment_size, false)
1443 }
1444
1445 #[must_use]
1446 pub fn interleave_all<Mat2, I>(
1447 self,
1448 those: I,
1449 segment_size: usize,
1450 eager_close: bool,
1451 ) -> Source<Out, Mat>
1452 where
1453 Mat2: Send + 'static,
1454 I: IntoIterator<Item = Source<Out, Mat2>>,
1455 {
1456 let factory = self.factory;
1457 let hints = self.hints;
1458 let other_factories: Vec<_> = those.into_iter().map(|source| source.factory).collect();
1459 Source::from_materialized_factory_with_hints(
1460 move |materializer| {
1461 let (primary, mat) = Arc::clone(&factory).create(materializer)?;
1462 let mut streams = Vec::with_capacity(other_factories.len() + 1);
1463 streams.push(primary);
1464 for other in &other_factories {
1465 let stream = match Arc::clone(other).create(materializer) {
1466 Ok((stream, _)) => stream,
1467 Err(error) => Box::new(std::iter::once(Err(error))) as BoxStream<Out>,
1468 };
1469 streams.push(stream);
1470 }
1471 Ok((
1472 interleave_source_streams(streams, segment_size, eager_close),
1473 mat,
1474 ))
1475 },
1476 hints,
1477 )
1478 }
1479
1480 #[must_use]
1481 pub fn merge_sorted<Mat2>(self, that: Source<Out, Mat2>) -> Source<Out, Mat>
1482 where
1483 Out: Ord,
1484 Mat2: Send + 'static,
1485 {
1486 self.via(Flow::identity().merge_sorted(that))
1487 }
1488
1489 #[must_use]
1490 pub fn merge_latest<Mat2>(
1491 self,
1492 that: Source<Out, Mat2>,
1493 eager_complete: bool,
1494 ) -> Source<Vec<Out>, Mat>
1495 where
1496 Out: Clone,
1497 Mat2: Send + 'static,
1498 {
1499 let factory = self.factory;
1500 let hints = self.hints;
1501 let that_factory = that.factory;
1502 Source::from_materialized_factory_with_hints(
1503 move |materializer| {
1504 let (left, mat) = Arc::clone(&factory).create(materializer)?;
1505 let right = match Arc::clone(&that_factory).create(materializer) {
1506 Ok((stream, _)) => stream,
1507 Err(error) => {
1508 return Ok((
1509 Box::new(std::iter::once(Err(error))) as BoxStream<Vec<Out>>,
1510 mat,
1511 ));
1512 }
1513 };
1514 Ok((merge_latest_streams(vec![left, right], eager_complete), mat))
1515 },
1516 hints,
1517 )
1518 }
1519
1520 #[must_use]
1521 pub fn merge_all<Mat2, I>(self, those: I, eager_complete: bool) -> Source<Out, Mat>
1522 where
1523 Mat2: Send + 'static,
1524 I: IntoIterator<Item = Source<Out, Mat2>>,
1525 {
1526 let factory = self.factory;
1527 let hints = self.hints;
1528 let other_factories: Vec<_> = those.into_iter().map(|source| source.factory).collect();
1529 Source::from_materialized_factory_with_hints(
1530 move |materializer| {
1531 let (primary, mat) = Arc::clone(&factory).create(materializer)?;
1532 let mut streams = Vec::with_capacity(other_factories.len() + 1);
1533 streams.push(primary);
1534 for other in &other_factories {
1535 let stream = match Arc::clone(other).create(materializer) {
1536 Ok((stream, _)) => stream,
1537 Err(error) => {
1538 return Ok((
1539 Box::new(std::iter::once(Err(error))) as BoxStream<Out>,
1540 mat,
1541 ));
1542 }
1543 };
1544 streams.push(stream);
1545 }
1546 Ok((merge_streams(streams, eager_complete), mat))
1547 },
1548 hints,
1549 )
1550 }
1551
1552 #[must_use]
1553 pub fn zip_with<Mat2, Out2, Next, F>(
1554 self,
1555 that: Source<Out2, Mat2>,
1556 combine: F,
1557 ) -> Source<Next, Mat>
1558 where
1559 Out2: Send + 'static,
1560 Next: Send + 'static,
1561 Mat2: Send + 'static,
1562 F: Fn(Out, Out2) -> Next + Send + Sync + 'static,
1563 {
1564 self.via(Flow::identity().zip_with(that, combine))
1565 }
1566
1567 #[must_use]
1568 pub fn zip_latest<Mat2, Out2>(self, that: Source<Out2, Mat2>) -> Source<(Out, Out2), Mat>
1569 where
1570 Out: Clone,
1571 Out2: Clone + Send + 'static,
1572 Mat2: Send + 'static,
1573 {
1574 self.zip_latest_with(that, true, |left, right| (left, right))
1575 }
1576
1577 #[must_use]
1578 pub fn zip_latest_with<Mat2, Out2, Next, F>(
1579 self,
1580 that: Source<Out2, Mat2>,
1581 eager_complete: bool,
1582 combine: F,
1583 ) -> Source<Next, Mat>
1584 where
1585 Out: Clone,
1586 Out2: Clone + Send + 'static,
1587 Next: Send + 'static,
1588 Mat2: Send + 'static,
1589 F: Fn(Out, Out2) -> Next + Send + Sync + 'static,
1590 {
1591 let factory = self.factory;
1592 let hints = self.hints;
1593 let that_factory = that.factory;
1594 let combine = Arc::new(combine);
1595 Source::from_materialized_factory_with_hints(
1596 move |materializer| {
1597 let (left, mat) = Arc::clone(&factory).create(materializer)?;
1598 let right = match Arc::clone(&that_factory).create(materializer) {
1599 Ok((stream, _)) => stream,
1600 Err(error) => {
1601 return Ok((
1602 Box::new(std::iter::once(Err(error))) as BoxStream<Next>,
1603 mat,
1604 ));
1605 }
1606 };
1607 Ok((
1608 zip_latest_with_stream(left, right, eager_complete, Arc::clone(&combine)),
1609 mat,
1610 ))
1611 },
1612 hints,
1613 )
1614 }
1615
1616 #[must_use]
1617 pub fn zip_with_index(self) -> Source<(Out, u64), Mat> {
1618 let factory = self.factory;
1619 let hints = self.hints;
1620 Source::from_materialized_factory_with_hints(
1621 move |materializer| {
1622 let (mut stream, mat) = Arc::clone(&factory).create(materializer)?;
1623 let mut index = 0_u64;
1624 Ok((
1625 Box::new(std::iter::from_fn(move || {
1626 stream.next().map(|item| {
1627 item.map(|value| {
1628 let pair = (value, index);
1629 index = index.wrapping_add(1);
1630 pair
1631 })
1632 })
1633 })) as BoxStream<(Out, u64)>,
1634 mat,
1635 ))
1636 },
1637 hints,
1638 )
1639 }
1640
1641 #[must_use]
1642 pub fn zip_all<Mat2, Out2>(
1643 self,
1644 that: Source<Out2, Mat2>,
1645 this_elem: Out,
1646 that_elem: Out2,
1647 ) -> Source<(Out, Out2), Mat>
1648 where
1649 Out: Clone + Sync,
1650 Out2: Clone + Send + Sync + 'static,
1651 Mat2: Send + 'static,
1652 {
1653 let factory = self.factory;
1654 let hints = self.hints;
1655 let that_factory = that.factory;
1656 Source::from_materialized_factory_with_hints(
1657 move |materializer| {
1658 let (left, mat) = Arc::clone(&factory).create(materializer)?;
1659 let right = match Arc::clone(&that_factory).create(materializer) {
1660 Ok((stream, _)) => stream,
1661 Err(error) => {
1662 return Ok((
1663 Box::new(std::iter::once(Err(error))) as BoxStream<(Out, Out2)>,
1664 mat,
1665 ));
1666 }
1667 };
1668 Ok((
1669 zip_all_stream(left, right, this_elem.clone(), that_elem.clone()),
1670 mat,
1671 ))
1672 },
1673 hints,
1674 )
1675 }
1676
1677 #[must_use]
1678 pub fn also_to<SinkMat>(self, sink: Sink<Out, SinkMat>) -> Source<Out, Mat>
1679 where
1680 Out: Clone,
1681 SinkMat: Send + 'static,
1682 {
1683 self.via(Flow::identity().also_to(sink))
1684 }
1685
1686 #[must_use]
1687 pub fn also_to_all<SinkMat, I>(self, sinks: I) -> Source<Out, Mat>
1688 where
1689 Out: Clone,
1690 SinkMat: Send + 'static,
1691 I: IntoIterator<Item = Sink<Out, SinkMat>>,
1692 {
1693 self.via(Flow::identity().also_to_all(sinks))
1694 }
1695
1696 #[must_use]
1697 pub fn divert_to<SinkMat, F>(self, sink: Sink<Out, SinkMat>, predicate: F) -> Source<Out, Mat>
1698 where
1699 SinkMat: Send + 'static,
1700 F: Fn(&Out) -> bool + Send + Sync + 'static,
1701 {
1702 self.via(Flow::identity().divert_to(sink, predicate))
1703 }
1704
1705 #[must_use]
1706 pub fn wire_tap<SinkMat>(self, sink: Sink<Out, SinkMat>) -> Source<Out, Mat>
1707 where
1708 Out: Clone,
1709 SinkMat: Send + 'static,
1710 {
1711 self.via(Flow::identity().wire_tap(sink))
1712 }
1713
1714 pub fn run_with<SinkMat: Send + 'static>(
1715 self,
1716 sink: Sink<Out, SinkMat>,
1717 ) -> StreamResult<SinkMat> {
1718 let fast_result = self
1721 .split_hook
1722 .as_ref()
1723 .zip(sink.fold_fp.as_deref())
1724 .and_then(|(hook, fp)| fp.try_register(Arc::clone(hook)));
1725 if let Some(result) = fast_result {
1726 return result?.downcast::<SinkMat>().map(|b| *b).map_err(|_| {
1727 StreamError::Failed("split fast path: unexpected mat type (internal error)".into())
1728 });
1729 }
1730 self.to_mat(sink, Keep::right).run()
1731 }
1732
1733 pub fn run_with_materializer<SinkMat: Send + 'static>(
1734 self,
1735 sink: Sink<Out, SinkMat>,
1736 materializer: &Materializer,
1737 ) -> StreamResult<SinkMat> {
1738 self.to_mat(sink, Keep::right)
1739 .run_with_materializer(materializer)
1740 }
1741
1742 #[must_use]
1743 pub fn to<SinkMat>(self, sink: Sink<Out, SinkMat>) -> RunnableGraph<Mat>
1744 where
1745 SinkMat: Send + 'static,
1746 {
1747 self.to_mat(sink, Keep::left)
1748 }
1749
1750 #[must_use]
1751 pub fn to_mat<SinkMat, Combined, F>(
1752 self,
1753 sink: Sink<Out, SinkMat>,
1754 combine: F,
1755 ) -> RunnableGraph<Combined>
1756 where
1757 SinkMat: Send + 'static,
1758 Combined: Send + 'static,
1759 F: Fn(Mat, SinkMat) -> Combined + Send + Sync + 'static,
1760 {
1761 let factory = self.factory;
1762 let terminal_factory = self.terminal_factory;
1763 let hints = self.hints;
1764 let combine = Arc::new(combine);
1765 RunnableGraph::from_runner(move |materializer| {
1766 if let Some(terminal_factory) = &terminal_factory
1767 && let Some(fold_fp) = sink.fold_fp.as_deref()
1768 && fold_fp.supports_terminal_drain()
1769 {
1770 let (hook, source_mat) = terminal_factory(materializer)?;
1771 let sink_mat = fold_fp
1772 .try_register_terminal_drain(hook, materializer)
1773 .expect("terminal drain support advertised")
1774 .and_then(|mat| {
1775 mat.downcast::<SinkMat>().map(|boxed| *boxed).map_err(|_| {
1776 StreamError::Failed(
1777 "terminal fast path: unexpected mat type (internal error)".into(),
1778 )
1779 })
1780 })?;
1781 return Ok(combine(source_mat, sink_mat));
1782 }
1783 let (stream, source_mat) = Arc::clone(&factory).create(materializer)?;
1784 let sink_mat = if hints.inline_head_terminal && sink.can_inline() {
1785 let stream =
1786 runtime_checked_stream(stream, Arc::clone(&materializer.inner.state), None);
1787 sink.run_inline(stream, materializer)?
1788 } else {
1789 sink.run_from_source(stream, materializer, hints.runtime())?
1790 };
1791 Ok(combine(source_mat, sink_mat))
1792 })
1793 }
1794
1795 pub fn run_collect(self) -> StreamResult<Vec<Out>> {
1796 self.run_with(Sink::collect())?.wait()
1797 }
1798
1799 #[must_use]
1800 pub fn map_materialized_value<NextMat, F>(self, f: F) -> Source<Out, NextMat>
1801 where
1802 NextMat: Send + 'static,
1803 F: Fn(Mat) -> NextMat + Send + Sync + 'static,
1804 {
1805 let factory = self.factory;
1806 let terminal_factory = self.terminal_factory;
1807 let hints = self.hints;
1808 let f = Arc::new(f);
1809 let factory_f = Arc::clone(&f);
1810 let mapped_terminal_factory = terminal_factory.map(|terminal_factory| {
1811 let f = Arc::clone(&f);
1812 Arc::new(move |materializer: &Materializer| {
1813 let (hook, mat) = terminal_factory(materializer)?;
1814 Ok((hook, f(mat)))
1815 }) as Arc<TerminalSourceFactory<Out, NextMat>>
1816 });
1817 Source {
1818 factory: Arc::new(FnSourceFactory(move |materializer: &Materializer| {
1819 let (stream, mat) = Arc::clone(&factory).create(materializer)?;
1820 Ok((stream, factory_f(mat)))
1821 })),
1822 terminal_factory: mapped_terminal_factory,
1823 hints,
1824 attributes: Attributes::default(),
1825 split_hook: None,
1826 }
1827 }
1828}
1829
1830impl<Out: Clone + Send + Sync + 'static> Source<Out, NotUsed> {
1831 #[must_use]
1832 pub fn combine<Mat1, Mat2, MatRest, I>(
1833 first: Source<Out, Mat1>,
1834 second: Source<Out, Mat2>,
1835 rest: I,
1836 strategy: SourceCombineStrategy,
1837 ) -> Source<Out, NotUsed>
1838 where
1839 Mat1: Send + 'static,
1840 Mat2: Send + 'static,
1841 MatRest: Send + 'static,
1842 I: IntoIterator<Item = Source<Out, MatRest>>,
1843 {
1844 let mut factories: Vec<Arc<CombinedSourceFactory<Out>>> = vec![
1845 Arc::new(move |materializer| {
1846 Arc::clone(&first.factory)
1847 .create(materializer)
1848 .map(|(stream, _)| stream)
1849 }),
1850 Arc::new(move |materializer| {
1851 Arc::clone(&second.factory)
1852 .create(materializer)
1853 .map(|(stream, _)| stream)
1854 }),
1855 ];
1856 factories.extend(rest.into_iter().map(|source| {
1857 Arc::new(move |materializer: &Materializer| {
1858 Arc::clone(&source.factory)
1859 .create(materializer)
1860 .map(|(stream, _)| stream)
1861 }) as Arc<CombinedSourceFactory<Out>>
1862 }));
1863 Source::from_materialized_factory(move |materializer| {
1864 let mut streams = Vec::with_capacity(factories.len());
1865 for factory in &factories {
1866 let stream = match factory(materializer) {
1867 Ok(stream) => stream,
1868 Err(error) => {
1869 return Ok((
1870 Box::new(std::iter::once(Err(error))) as BoxStream<Out>,
1871 NotUsed,
1872 ));
1873 }
1874 };
1875 streams.push(stream);
1876 }
1877 let stream = match &strategy {
1878 SourceCombineStrategy::Merge { eager_complete } => {
1879 merge_streams(streams, *eager_complete)
1880 }
1881 SourceCombineStrategy::Concat => concat_source_streams(streams),
1882 SourceCombineStrategy::Prioritized {
1883 priorities,
1884 eager_complete,
1885 } => {
1886 if priorities.len() != streams.len() {
1887 return Err(StreamError::GraphValidation(format!(
1888 "combine priorities length {} did not match source count {}",
1889 priorities.len(),
1890 streams.len()
1891 )));
1892 }
1893 merge_prioritized_streams(streams, priorities.clone(), *eager_complete)
1894 }
1895 };
1896 Ok((stream, NotUsed))
1897 })
1898 }
1899
1900 #[must_use]
1901 pub fn zip_n<Mat2, I>(sources: I) -> Source<Vec<Out>, NotUsed>
1902 where
1903 I: IntoIterator<Item = Source<Out, Mat2>>,
1904 Mat2: Send + 'static,
1905 Out: Clone,
1906 {
1907 Self::zip_with_n(sources, |values| values)
1908 }
1909
1910 #[must_use]
1911 pub fn zip_with_n<Mat2, I, Next, F>(sources: I, zipper: F) -> Source<Next, NotUsed>
1912 where
1913 I: IntoIterator<Item = Source<Out, Mat2>>,
1914 Mat2: Send + 'static,
1915 Next: Send + 'static,
1916 F: Fn(Vec<Out>) -> Next + Send + Sync + 'static,
1917 {
1918 let factories: Vec<_> = sources.into_iter().map(|source| source.factory).collect();
1919 let zipper = Arc::new(zipper);
1920 Source::from_materialized_factory(move |materializer| {
1921 let mut streams = Vec::with_capacity(factories.len());
1922 for factory in &factories {
1923 let stream = match Arc::clone(factory).create(materializer) {
1924 Ok((stream, _)) => stream,
1925 Err(error) => {
1926 return Ok((
1927 Box::new(std::iter::once(Err(error))) as BoxStream<Next>,
1928 NotUsed,
1929 ));
1930 }
1931 };
1932 streams.push(stream);
1933 }
1934 Ok((zip_n_streams(streams, Arc::clone(&zipper)), NotUsed))
1935 })
1936 }
1937
1938 #[must_use]
1939 pub fn merge_prioritized_n<Mat2, I>(
1940 sources_and_priorities: I,
1941 eager_complete: bool,
1942 ) -> Source<Out, NotUsed>
1943 where
1944 I: IntoIterator<Item = (Source<Out, Mat2>, usize)>,
1945 Mat2: Send + 'static,
1946 {
1947 let sources_and_priorities: Vec<_> = sources_and_priorities.into_iter().collect();
1948 if sources_and_priorities.is_empty() {
1949 return Source::empty();
1950 }
1951 let (factories, priorities): (Vec<_>, Vec<_>) = sources_and_priorities
1952 .into_iter()
1953 .map(|(source, priority)| (source.factory, priority))
1954 .unzip();
1955 Source::from_materialized_factory(move |materializer| {
1956 let mut streams = Vec::with_capacity(factories.len());
1957 for factory in &factories {
1958 let stream = match Arc::clone(factory).create(materializer) {
1959 Ok((stream, _)) => stream,
1960 Err(error) => {
1961 return Ok((
1962 Box::new(std::iter::once(Err(error))) as BoxStream<Out>,
1963 NotUsed,
1964 ));
1965 }
1966 };
1967 streams.push(stream);
1968 }
1969 Ok((
1970 merge_prioritized_streams(streams, priorities.clone(), eager_complete),
1971 NotUsed,
1972 ))
1973 })
1974 }
1975
1976 #[must_use]
1977 pub fn maybe() -> (MaybeHandle<Out>, Self) {
1978 let value = Arc::new(Mutex::new(None));
1979 let handle = MaybeHandle {
1980 value: Arc::clone(&value),
1981 };
1982 let source = Self::from_factory(move || {
1983 let result = value
1984 .lock()
1985 .expect("maybe source poisoned")
1986 .clone()
1987 .unwrap_or(Err(StreamError::MaybeIncomplete));
1988 Box::new(std::iter::once(result))
1989 });
1990 (handle, source)
1991 }
1992
1993 #[must_use]
1994 pub fn single(item: Out) -> Self {
1995 Self::from_factory_with_hints(
1996 move || Box::new(std::iter::once(Ok(item.clone()))),
1997 SourceHints::with_inline_micro(1),
1998 )
1999 }
2000
2001 #[must_use]
2002 pub fn repeat(item: Out) -> Self {
2003 Self::from_factory(move || {
2004 let item = item.clone();
2005 Box::new(std::iter::repeat_with(move || Ok(item.clone())))
2006 })
2007 }
2008
2009 #[must_use]
2010 pub fn from_iterable<I>(items: I) -> Self
2011 where
2012 I: IntoIterator<Item = Out>,
2013 {
2014 items.into_iter().collect()
2015 }
2016}
2017
2018impl<Out: Clone + Send + Sync + 'static> FromIterator<Out> for Source<Out, NotUsed> {
2019 fn from_iter<T: IntoIterator<Item = Out>>(iter: T) -> Self {
2020 let items: Arc<[Out]> = iter.into_iter().collect();
2023 let len = items.len();
2024 Self::from_factory_with_hints(
2025 move || {
2026 let items = Arc::clone(&items);
2027 let mut index = 0;
2028 Box::new(std::iter::from_fn(move || {
2029 let item = items.get(index)?.clone();
2030 index += 1;
2031 Some(Ok(item))
2032 }))
2033 },
2034 SourceHints::with_inline_micro(len),
2036 )
2037 }
2038}
2039
2040#[cfg(test)]
2045pub(in crate::stream) fn test_source_with_inline_micro_hint<Out: Send + 'static>(
2046 factory: impl Fn() -> BoxStream<Out> + Send + Sync + 'static,
2047 max_success_items: usize,
2048) -> Source<Out, NotUsed> {
2049 Source::from_factory_with_hints(factory, SourceHints::with_inline_micro(max_success_items))
2050}
2051
2052struct UnfoldResourceStream<Resource, Out, Create, Read, Close>
2053where
2054 Create: Fn() -> StreamResult<Resource>,
2055 Read: Fn(&mut Resource) -> StreamResult<Option<Out>>,
2056 Close: Fn(Resource) -> StreamResult<()>,
2057{
2058 create: Arc<Create>,
2059 read: Arc<Read>,
2060 close: Arc<Close>,
2061 resource: Option<Resource>,
2062 created: bool,
2063 terminated: bool,
2064 _marker: PhantomData<fn() -> Out>,
2065}
2066
2067impl<Resource, Out, Create, Read, Close> UnfoldResourceStream<Resource, Out, Create, Read, Close>
2068where
2069 Create: Fn() -> StreamResult<Resource>,
2070 Read: Fn(&mut Resource) -> StreamResult<Option<Out>>,
2071 Close: Fn(Resource) -> StreamResult<()>,
2072{
2073 fn ensure_created(&mut self) -> StreamResult<()> {
2074 if self.created {
2075 return Ok(());
2076 }
2077 self.created = true;
2078 let resource = catch_unwind_failed("unfold_resource create", || (self.create)())
2079 .and_then(|result| result)?;
2080 self.resource = Some(resource);
2081 Ok(())
2082 }
2083
2084 fn close_resource(&mut self) -> StreamResult<()> {
2085 match self.resource.take() {
2086 Some(resource) => {
2087 catch_unwind_failed("unfold_resource close", || (self.close)(resource))
2088 .and_then(|result| result)
2089 }
2090 None => Ok(()),
2091 }
2092 }
2093}
2094
2095impl<Resource, Out, Create, Read, Close> Iterator
2096 for UnfoldResourceStream<Resource, Out, Create, Read, Close>
2097where
2098 Create: Fn() -> StreamResult<Resource>,
2099 Read: Fn(&mut Resource) -> StreamResult<Option<Out>>,
2100 Close: Fn(Resource) -> StreamResult<()>,
2101{
2102 type Item = StreamResult<Out>;
2103
2104 fn next(&mut self) -> Option<Self::Item> {
2105 if self.terminated {
2106 return None;
2107 }
2108 if let Err(error) = self.ensure_created() {
2109 self.terminated = true;
2110 return Some(Err(error));
2111 }
2112
2113 let result = {
2114 let resource = self
2115 .resource
2116 .as_mut()
2117 .expect("unfold_resource resource is open");
2118 catch_unwind_failed("unfold_resource read", || (self.read)(resource))
2119 .and_then(|result| result)
2120 };
2121
2122 match result {
2123 Ok(Some(item)) => Some(Ok(item)),
2124 Ok(None) => {
2125 self.terminated = true;
2126 match self.close_resource() {
2127 Ok(()) => None,
2128 Err(error) => Some(Err(error)),
2129 }
2130 }
2131 Err(read_error) => {
2132 self.terminated = true;
2133 let _ = self.close_resource();
2134 Some(Err(read_error))
2135 }
2136 }
2137 }
2138}
2139
2140impl<Resource, Out, Create, Read, Close> Drop
2141 for UnfoldResourceStream<Resource, Out, Create, Read, Close>
2142where
2143 Create: Fn() -> StreamResult<Resource>,
2144 Read: Fn(&mut Resource) -> StreamResult<Option<Out>>,
2145 Close: Fn(Resource) -> StreamResult<()>,
2146{
2147 fn drop(&mut self) {
2148 let _ = self.close_resource();
2149 }
2150}
2151
2152type UnfoldResourceAsyncMarker<Out, CreateFut, ReadFut, CloseFut> =
2153 fn() -> (Out, CreateFut, ReadFut, CloseFut);
2154
2155struct UnfoldResourceAsyncStream<Resource, Out, Create, CreateFut, Read, ReadFut, Close, CloseFut>
2156where
2157 Resource: Send + 'static,
2158 Create: Fn() -> CreateFut,
2159 CreateFut: Future<Output = StreamResult<Resource>> + Send + 'static,
2160 Read: Fn(&mut Resource) -> ReadFut,
2161 ReadFut: Future<Output = StreamResult<Option<Out>>> + Send + 'static,
2162 Close: Fn(Resource) -> CloseFut,
2163 CloseFut: Future<Output = StreamResult<()>> + Send + 'static,
2164{
2165 create: Arc<Create>,
2166 read: Arc<Read>,
2167 close: Arc<Close>,
2168 resource: Option<Resource>,
2169 created: bool,
2170 terminated: bool,
2171 _marker: PhantomData<UnfoldResourceAsyncMarker<Out, CreateFut, ReadFut, CloseFut>>,
2172}
2173
2174impl<Resource, Out, Create, CreateFut, Read, ReadFut, Close, CloseFut>
2175 UnfoldResourceAsyncStream<Resource, Out, Create, CreateFut, Read, ReadFut, Close, CloseFut>
2176where
2177 Resource: Send + 'static,
2178 Create: Fn() -> CreateFut,
2179 CreateFut: Future<Output = StreamResult<Resource>> + Send + 'static,
2180 Read: Fn(&mut Resource) -> ReadFut,
2181 ReadFut: Future<Output = StreamResult<Option<Out>>> + Send + 'static,
2182 Close: Fn(Resource) -> CloseFut,
2183 CloseFut: Future<Output = StreamResult<()>> + Send + 'static,
2184{
2185 fn ensure_created(&mut self) -> StreamResult<()> {
2186 if self.created {
2187 return Ok(());
2188 }
2189 self.created = true;
2190 let resource = catch_unwind_failed("unfold_resource_async create", || (self.create)())
2191 .and_then(flow::run_future_inline_or_spawn)?;
2192 self.resource = Some(resource);
2193 Ok(())
2194 }
2195
2196 fn close_resource(&mut self) -> StreamResult<()> {
2197 match self.resource.take() {
2198 Some(resource) => {
2199 catch_unwind_failed("unfold_resource_async close", || (self.close)(resource))
2200 .and_then(flow::run_future_inline_or_spawn)
2201 }
2202 None => Ok(()),
2203 }
2204 }
2205}
2206
2207impl<Resource, Out, Create, CreateFut, Read, ReadFut, Close, CloseFut> Iterator
2208 for UnfoldResourceAsyncStream<Resource, Out, Create, CreateFut, Read, ReadFut, Close, CloseFut>
2209where
2210 Resource: Send + 'static,
2211 Out: Send + 'static,
2212 Create: Fn() -> CreateFut,
2213 CreateFut: Future<Output = StreamResult<Resource>> + Send + 'static,
2214 Read: Fn(&mut Resource) -> ReadFut,
2215 ReadFut: Future<Output = StreamResult<Option<Out>>> + Send + 'static,
2216 Close: Fn(Resource) -> CloseFut,
2217 CloseFut: Future<Output = StreamResult<()>> + Send + 'static,
2218{
2219 type Item = StreamResult<Out>;
2220
2221 fn next(&mut self) -> Option<Self::Item> {
2222 if self.terminated {
2223 return None;
2224 }
2225 if let Err(error) = self.ensure_created() {
2226 self.terminated = true;
2227 return Some(Err(error));
2228 }
2229
2230 let result = {
2231 let resource = self
2232 .resource
2233 .as_mut()
2234 .expect("unfold_resource_async resource is open");
2235 catch_unwind_failed("unfold_resource_async read", || (self.read)(resource))
2236 .and_then(flow::run_future_inline_or_spawn)
2237 };
2238
2239 match result {
2240 Ok(Some(item)) => Some(Ok(item)),
2241 Ok(None) => {
2242 self.terminated = true;
2243 match self.close_resource() {
2244 Ok(()) => None,
2245 Err(error) => Some(Err(error)),
2246 }
2247 }
2248 Err(read_error) => {
2249 self.terminated = true;
2250 let _ = self.close_resource();
2251 Some(Err(read_error))
2252 }
2253 }
2254 }
2255}
2256
2257impl<Resource, Out, Create, CreateFut, Read, ReadFut, Close, CloseFut> Drop
2258 for UnfoldResourceAsyncStream<Resource, Out, Create, CreateFut, Read, ReadFut, Close, CloseFut>
2259where
2260 Resource: Send + 'static,
2261 Create: Fn() -> CreateFut,
2262 CreateFut: Future<Output = StreamResult<Resource>> + Send + 'static,
2263 Read: Fn(&mut Resource) -> ReadFut,
2264 ReadFut: Future<Output = StreamResult<Option<Out>>> + Send + 'static,
2265 Close: Fn(Resource) -> CloseFut,
2266 CloseFut: Future<Output = StreamResult<()>> + Send + 'static,
2267{
2268 fn drop(&mut self) {
2269 let _ = self.close_resource();
2270 }
2271}
2272
2273struct LazySourceStream<Out, InnerMat, F> {
2274 create: Arc<F>,
2275 materializer: Materializer,
2276 current: Option<BoxStream<Out>>,
2277 mat_sender: Option<oneshot::Sender<StreamResult<InnerMat>>>,
2278 initialized: bool,
2279 terminated: bool,
2280}
2281
2282impl<Out, InnerMat, F> LazySourceStream<Out, InnerMat, F>
2283where
2284 Out: Send + 'static,
2285 InnerMat: Send + 'static,
2286 F: Fn() -> Source<Out, InnerMat>,
2287{
2288 fn complete_mat(&mut self, result: StreamResult<InnerMat>) {
2289 if let Some(sender) = self.mat_sender.take() {
2290 let _ = sender.send(result);
2291 }
2292 }
2293
2294 fn initialize(&mut self) -> StreamResult<()> {
2295 if self.initialized {
2296 return Ok(());
2297 }
2298 self.initialized = true;
2299 let source = match catch_unwind_failed("lazy_source factory", || (self.create)()) {
2300 Ok(source) => source,
2301 Err(error) => {
2302 self.complete_mat(Err(error.clone()));
2303 return Err(error);
2304 }
2305 };
2306 match Arc::clone(&source.factory).create(&self.materializer) {
2307 Ok((stream, mat)) => {
2308 self.current = Some(stream);
2309 self.complete_mat(Ok(mat));
2310 Ok(())
2311 }
2312 Err(error) => {
2313 self.complete_mat(Err(error.clone()));
2314 Err(error)
2315 }
2316 }
2317 }
2318}
2319
2320impl<Out, InnerMat, F> Iterator for LazySourceStream<Out, InnerMat, F>
2321where
2322 Out: Send + 'static,
2323 InnerMat: Send + 'static,
2324 F: Fn() -> Source<Out, InnerMat>,
2325{
2326 type Item = StreamResult<Out>;
2327
2328 fn next(&mut self) -> Option<Self::Item> {
2329 if self.terminated {
2330 return None;
2331 }
2332 if let Err(error) = self.initialize() {
2333 self.terminated = true;
2334 return Some(Err(error));
2335 }
2336 match self
2337 .current
2338 .as_mut()
2339 .expect("lazy_source current stream initialized")
2340 .next()
2341 {
2342 Some(Ok(item)) => Some(Ok(item)),
2343 Some(Err(error)) => {
2344 self.terminated = true;
2345 Some(Err(error))
2346 }
2347 None => {
2348 self.terminated = true;
2349 None
2350 }
2351 }
2352 }
2353}
2354
2355impl<Out, InnerMat, F> Drop for LazySourceStream<Out, InnerMat, F> {
2356 fn drop(&mut self) {
2357 if !self.initialized
2358 && let Some(sender) = self.mat_sender.take()
2359 {
2360 let _ = sender.send(Err(StreamError::Failed(
2361 "lazy source was never materialized".into(),
2362 )));
2363 }
2364 }
2365}
2366
2367struct LazyFutureSourceStream<Out, InnerMat, F, Fut> {
2368 create: Arc<F>,
2369 materializer: Materializer,
2370 current: Option<BoxStream<Out>>,
2371 mat_sender: Option<oneshot::Sender<StreamResult<InnerMat>>>,
2372 initialized: bool,
2373 terminated: bool,
2374 _marker: PhantomData<fn() -> Fut>,
2375}
2376
2377impl<Out, InnerMat, F, Fut> LazyFutureSourceStream<Out, InnerMat, F, Fut>
2378where
2379 Out: Send + 'static,
2380 InnerMat: Send + 'static,
2381 F: Fn() -> Fut,
2382 Fut: Future<Output = StreamResult<Source<Out, InnerMat>>> + Send + 'static,
2383{
2384 fn complete_mat(&mut self, result: StreamResult<InnerMat>) {
2385 if let Some(sender) = self.mat_sender.take() {
2386 let _ = sender.send(result);
2387 }
2388 }
2389
2390 fn initialize(&mut self) -> StreamResult<()> {
2391 if self.initialized {
2392 return Ok(());
2393 }
2394 self.initialized = true;
2395 let source = match catch_unwind_failed("lazy_future_source factory", || (self.create)())
2396 .and_then(flow::run_future_inline_or_spawn)
2397 {
2398 Ok(source) => source,
2399 Err(error) => {
2400 self.complete_mat(Err(error.clone()));
2401 return Err(error);
2402 }
2403 };
2404 match Arc::clone(&source.factory).create(&self.materializer) {
2405 Ok((stream, mat)) => {
2406 self.current = Some(stream);
2407 self.complete_mat(Ok(mat));
2408 Ok(())
2409 }
2410 Err(error) => {
2411 self.complete_mat(Err(error.clone()));
2412 Err(error)
2413 }
2414 }
2415 }
2416}
2417
2418impl<Out, InnerMat, F, Fut> Iterator for LazyFutureSourceStream<Out, InnerMat, F, Fut>
2419where
2420 Out: Send + 'static,
2421 InnerMat: Send + 'static,
2422 F: Fn() -> Fut,
2423 Fut: Future<Output = StreamResult<Source<Out, InnerMat>>> + Send + 'static,
2424{
2425 type Item = StreamResult<Out>;
2426
2427 fn next(&mut self) -> Option<Self::Item> {
2428 if self.terminated {
2429 return None;
2430 }
2431 if let Err(error) = self.initialize() {
2432 self.terminated = true;
2433 return Some(Err(error));
2434 }
2435 match self
2436 .current
2437 .as_mut()
2438 .expect("lazy_future_source current stream initialized")
2439 .next()
2440 {
2441 Some(Ok(item)) => Some(Ok(item)),
2442 Some(Err(error)) => {
2443 self.terminated = true;
2444 Some(Err(error))
2445 }
2446 None => {
2447 self.terminated = true;
2448 None
2449 }
2450 }
2451 }
2452}
2453
2454impl<Out, InnerMat, F, Fut> Drop for LazyFutureSourceStream<Out, InnerMat, F, Fut> {
2455 fn drop(&mut self) {
2456 if !self.initialized
2457 && let Some(sender) = self.mat_sender.take()
2458 {
2459 let _ = sender.send(Err(StreamError::Failed(
2460 "lazy future source was never materialized".into(),
2461 )));
2462 }
2463 }
2464}
2465
2466fn concat_source_streams<Out>(streams: Vec<BoxStream<Out>>) -> BoxStream<Out>
2467where
2468 Out: Send + 'static,
2469{
2470 let mut streams: VecDeque<_> = streams.into();
2471 let mut current = streams.pop_front();
2472 Box::new(std::iter::from_fn(move || {
2473 loop {
2474 match current.as_mut() {
2475 Some(stream) => match stream.next() {
2476 Some(item) => return Some(item),
2477 None => current = streams.pop_front(),
2478 },
2479 None => return None,
2480 }
2481 }
2482 }))
2483}
2484
2485fn concat_source_streams_lazy<Out, Mat>(
2486 initial: BoxStream<Out>,
2487 factories: Vec<Arc<dyn SourceFactory<Out, Mat>>>,
2488 materializer: &Materializer,
2489) -> BoxStream<Out>
2490where
2491 Out: Send + 'static,
2492 Mat: Send + 'static,
2493{
2494 let mut current = Some(initial);
2495 let mut remaining: VecDeque<_> = factories.into();
2496 let materializer = materializer.with_name_prefix(materializer.name_prefix().to_owned());
2497 Box::new(std::iter::from_fn(move || {
2498 loop {
2499 match current.as_mut() {
2500 Some(stream) => match stream.next() {
2501 Some(item) => return Some(item),
2502 None => {
2503 current = remaining.pop_front().map(|factory| {
2504 match factory.create(&materializer) {
2505 Ok((stream, _)) => stream,
2506 Err(error) => {
2507 Box::new(std::iter::once(Err(error))) as BoxStream<Out>
2508 }
2509 }
2510 });
2511 }
2512 },
2513 None => return None,
2514 }
2515 }
2516 }))
2517}
2518
2519fn or_else_source_stream<Out>(
2520 mut primary: BoxStream<Out>,
2521 mut secondary: BoxStream<Out>,
2522) -> BoxStream<Out>
2523where
2524 Out: Send + 'static,
2525{
2526 let mut primary_emitted = false;
2527 let mut using_secondary = false;
2528 Box::new(std::iter::from_fn(move || {
2529 loop {
2530 if using_secondary {
2531 return secondary.next();
2532 }
2533
2534 match primary.next() {
2535 Some(Ok(item)) => {
2536 primary_emitted = true;
2537 return Some(Ok(item));
2538 }
2539 Some(Err(error)) => return Some(Err(error)),
2540 None if primary_emitted => return None,
2541 None => using_secondary = true,
2542 }
2543 }
2544 }))
2545}
2546
2547fn interleave_source_streams<Out>(
2548 streams: Vec<BoxStream<Out>>,
2549 segment_size: usize,
2550 eager_close: bool,
2551) -> BoxStream<Out>
2552where
2553 Out: Send + 'static,
2554{
2555 if segment_size == 0 {
2556 return Box::new(std::iter::once(Err(StreamError::GraphValidation(
2557 "interleave segment size must be greater than zero".into(),
2558 ))));
2559 }
2560
2561 let mut streams: Vec<Option<BoxStream<Out>>> = streams.into_iter().map(Some).collect();
2562 let mut pending: Vec<Option<StreamResult<Out>>> = (0..streams.len()).map(|_| None).collect();
2563 let mut current = 0usize;
2564 let mut emitted = 0usize;
2565 Box::new(std::iter::from_fn(move || {
2566 loop {
2567 if streams.iter().all(Option::is_none) {
2568 return None;
2569 }
2570 if streams[current].is_none() {
2571 match next_active_source_stream(&streams, current) {
2572 Some(next) => {
2573 current = next;
2574 emitted = 0;
2575 }
2576 None => return None,
2577 }
2578 }
2579
2580 let Some(stream) = streams[current].as_mut() else {
2581 continue;
2582 };
2583 let next_item = pending[current].take().or_else(|| stream.next());
2584 match next_item {
2585 Some(Ok(item)) => {
2586 emitted += 1;
2587 if emitted == segment_size {
2588 emitted = 0;
2589 if let Some(next) = next_active_source_stream(&streams, current) {
2590 current = next;
2591 }
2592 }
2593 return Some(Ok(item));
2594 }
2595 Some(Err(error)) => return Some(Err(error)),
2596 None => {
2597 streams[current] = None;
2598 emitted = 0;
2599 if eager_close {
2600 return None;
2601 }
2602 match next_active_source_stream(&streams, current) {
2603 Some(next) => current = next,
2604 None => return None,
2605 }
2606 }
2607 }
2608 }
2609 }))
2610}
2611
2612fn next_active_source_stream<Out>(
2613 streams: &[Option<BoxStream<Out>>],
2614 current: usize,
2615) -> Option<usize>
2616where
2617 Out: Send + 'static,
2618{
2619 if streams.is_empty() {
2620 return None;
2621 }
2622 for offset in 1..=streams.len() {
2623 let index = (current + offset) % streams.len();
2624 if streams[index].is_some() {
2625 return Some(index);
2626 }
2627 }
2628 None
2629}