1use super::flow::{self, FlowTransform};
11use super::*;
12use crate::Attributes;
13use crate::context::SourceWithContext;
14
/// Unit marker type for "no meaningful value".
///
/// In this file it is the default `Mat` parameter of [`Source`] and the value
/// returned by [`Keep::none`].
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash)]
pub struct NotUsed;
17
/// Unsized closure type: given a [`Materializer`], returns either a boxed
/// stream of `Out` or a stream error. The closure must be `Send + Sync`.
// NOTE(review): no user of this alias is visible in this part of the file;
// the name suggests it backs combined sources — confirm at the use site.
type CombinedSourceFactory<Out> =
    dyn Fn(&Materializer) -> StreamResult<BoxStream<Out>> + Send + Sync;
20
/// Namespace for two-argument selector functions.
///
/// Each function takes a `left` and a `right` value and decides which of them
/// (if any) is returned. [`Source::via`] passes `Keep::left` as the combiner
/// for materialized values.
pub struct Keep;

impl Keep {
    /// Returns `left` and drops `_right`.
    pub fn left<Left, Right>(left: Left, _right: Right) -> Left {
        left
    }

    /// Returns `right` and drops `_left`.
    pub fn right<Left, Right>(_left: Left, right: Right) -> Right {
        right
    }

    /// Returns both values as the tuple `(left, right)`.
    pub fn both<Left, Right>(left: Left, right: Right) -> (Left, Right) {
        (left, right)
    }

    /// Drops both values and returns [`NotUsed`].
    pub fn none<Left, Right>(_left: Left, _right: Right) -> NotUsed {
        NotUsed
    }
}
40
/// Selects how several sources are combined into one.
///
/// The code that interprets these variants is not in this part of the file;
/// the per-variant notes below are inferred from the names and should be
/// confirmed against the implementation.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SourceCombineStrategy {
    /// Presumably interleaves elements from all inputs — TODO confirm.
    Merge {
        /// Presumably: complete as soon as any input completes — TODO confirm.
        eager_complete: bool,
    },
    /// Presumably drains the inputs one after another — TODO confirm.
    Concat,
    /// Presumably a merge that prefers inputs by weight — TODO confirm.
    Prioritized {
        /// One entry per input, presumably in input order — TODO confirm.
        priorities: Vec<usize>,
        /// Presumably: complete as soon as any input completes — TODO confirm.
        eager_complete: bool,
    },
}
52
53#[derive(Clone)]
54pub struct MaybeHandle<T> {
55 value: Arc<Mutex<Option<StreamResult<T>>>>,
56}
57
58impl<T> MaybeHandle<T> {
59 #[must_use]
60 pub fn is_completed(&self) -> bool {
61 self.value.lock().expect("maybe handle poisoned").is_some()
62 }
63
64 pub fn complete(&self, item: T) -> StreamResult<()> {
65 self.settle(Ok(item))
66 }
67
68 pub fn fail(&self, error: StreamError) -> StreamResult<()> {
69 self.settle(Err(error))
70 }
71
72 fn settle(&self, result: StreamResult<T>) -> StreamResult<()> {
73 let mut value = self.value.lock().expect("maybe handle poisoned");
74 if value.is_some() {
75 return Err(StreamError::Failed("maybe source already completed".into()));
76 }
77 *value = Some(result);
78 Ok(())
79 }
80}
81
/// Number of elements a consumer is currently willing to accept.
///
/// `u64::MAX` is treated as "unbounded": it is never decremented by
/// [`Demand::consume_one`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Demand(u64);

impl Demand {
    /// No demand at all.
    pub const ZERO: Self = Self(0);
    /// Demand for exactly one element.
    pub const ONE: Self = Self(1);
    /// Unbounded demand.
    pub const MAX: Self = Self(u64::MAX);

    /// Wraps a raw element count.
    #[must_use]
    pub const fn new(available: u64) -> Self {
        Demand(available)
    }

    /// Returns the raw element count.
    #[must_use]
    pub const fn available(self) -> u64 {
        let Self(count) = self;
        count
    }

    /// Returns `true` when the count equals `u64::MAX`.
    #[must_use]
    pub const fn is_unbounded(self) -> bool {
        self.0 == Self::MAX.0
    }

    /// Returns `true` when the count is zero.
    #[must_use]
    pub const fn is_empty(self) -> bool {
        self.0 == Self::ZERO.0
    }

    /// Adds two demands, clamping at `u64::MAX` instead of overflowing.
    #[must_use]
    pub const fn saturating_add(self, rhs: Self) -> Self {
        let total = self.0.saturating_add(rhs.0);
        Self(total)
    }

    /// Tries to use up one unit of demand.
    ///
    /// Returns `false` (and changes nothing) when the demand is empty.
    /// Otherwise returns `true`; the count is decremented unless the demand
    /// is unbounded.
    pub fn consume_one(&mut self) -> bool {
        if self.is_empty() {
            return false;
        }
        if !self.is_unbounded() {
            self.0 -= 1;
        }
        true
    }
}
126
/// Push-style sink for elements of type `T`.
///
/// Implementors must provide [`PushOutlet::push`]; `complete` and `fail` have
/// default implementations.
pub trait PushOutlet<T>: Send {
    /// Hands one element to the outlet and returns a [`Demand`] on success.
    // NOTE(review): the returned demand is presumably what the outlet can
    // accept next — confirm with the implementors.
    fn push(&mut self, item: T) -> StreamResult<Demand>;

    /// Signals normal completion. The default implementation does nothing
    /// and returns `Ok(())`.
    fn complete(&mut self) -> StreamResult<()> {
        Ok(())
    }

    /// Signals failure with `cause`. The default implementation hands the
    /// cause straight back to the caller as `Err(cause)`.
    fn fail(&mut self, cause: StreamError) -> StreamResult<()> {
        Err(cause)
    }
}
138
/// Description of a stream that produces `Out` elements and yields a `Mat`
/// value when materialized.
///
/// A `Source` does not hold a running stream: it stores a factory that
/// builds one per materialization.
// NOTE(review): `#[derive(Clone)]` adds `Out: Clone` and `Mat: Clone` bounds
// to the generated impl. If every field is cloneable on its own, a
// hand-written `Clone` impl would lift that restriction — consider it.
#[derive(Clone)]
pub struct Source<Out, Mat = NotUsed> {
    /// Builds the boxed element stream together with the materialized value.
    pub(crate) factory: Arc<dyn SourceFactory<Out, Mat>>,
    /// Optional alternative factory producing a terminal hook instead of a
    /// stream; `None` unless the source was built with
    /// `from_terminal_direct_materialized_factory`.
    pub(crate) terminal_factory: Option<Arc<TerminalSourceFactory<Out, Mat>>>,
    /// Execution hints attached to this source.
    pub(super) hints: SourceHints,
    /// Attributes (such as a name) attached to this source.
    pub(super) attributes: Attributes,
    /// Optional split-segment hook; the constructors visible here set it to `None`.
    pub(crate) split_hook: Option<Arc<dyn SplitSegmentHookDyn>>,
}
149
/// Unsized closure type stored in `Source::terminal_factory`: given a
/// [`Materializer`], returns a shared terminal-source hook together with the
/// materialized value, or a stream error.
pub(crate) type TerminalSourceFactory<Out, Mat> =
    dyn Fn(&Materializer) -> StreamResult<(Arc<dyn TerminalSourceHookDyn<Out>>, Mat)> + Send + Sync;
152
153impl<Out: Send + 'static> Source<Out, NotUsed> {
    /// Builds a source from a closure that creates a fresh stream per
    /// materialization, using `SourceHints::default()`.
    pub(super) fn from_factory<F>(factory: F) -> Self
    where
        F: Fn() -> BoxStream<Out> + Send + Sync + 'static,
    {
        Self::from_factory_with_hints(factory, SourceHints::default())
    }
160
    /// Builds a source from a stream-creating closure and explicit hints.
    ///
    /// The materializer passed at materialization time is ignored and the
    /// materialized value is always [`NotUsed`].
    fn from_factory_with_hints<F>(factory: F, hints: SourceHints) -> Self
    where
        F: Fn() -> BoxStream<Out> + Send + Sync + 'static,
    {
        Self::from_materialized_factory_with_hints(
            move |_materializer| Ok((factory(), NotUsed)),
            hints,
        )
    }
170
    /// Creates a source whose stream is `std::iter::empty()`, so it yields
    /// no elements. Built with `SourceHints::with_inline_micro(0)`.
    #[must_use]
    pub fn empty() -> Self {
        Self::from_factory_with_hints(
            || Box::new(std::iter::empty()),
            SourceHints::with_inline_micro(0),
        )
    }
178
    /// Creates a source that never emits an element and never completes
    /// normally.
    ///
    /// Each pull blocks in a polling loop that ends only with an error:
    /// `StreamError::AbruptTermination` once the materializer's `shutdown`
    /// flag is set, or `StreamError::Cancelled` once the current stream's
    /// cancellation flag is set.
    #[must_use]
    pub fn never() -> Self {
        Self::from_materialized_factory_with_hints(
            move |materializer| {
                // Share the materializer state so the stream can observe shutdown.
                let state = Arc::clone(&materializer.inner.state);
                Ok((
                    Box::new(std::iter::from_fn(move || {
                        loop {
                            if state.shutdown.load(Ordering::SeqCst) {
                                return Some(Err(StreamError::AbruptTermination));
                            }
                            // The cancellation flag is optional; a missing flag
                            // counts as "not cancelled".
                            if super::runtime::current_stream_cancelled()
                                .as_ref()
                                .is_some_and(|cancelled| cancelled.load(Ordering::SeqCst))
                            {
                                return Some(Err(StreamError::Cancelled));
                            }
                            // Wait up to 1 ms before checking the flags again.
                            std::thread::park_timeout(Duration::from_millis(1));
                        }
                    })),
                    NotUsed,
                ))
            },
            SourceHints::default(),
        )
    }
205
    /// Creates a source whose stream yields exactly one item: `Err` holding
    /// a clone of `error`. Built with `SourceHints::with_inline_micro(0)`.
    #[must_use]
    pub fn failed(error: StreamError) -> Self {
        Self::from_factory_with_hints(
            move || Box::new(std::iter::once(Err(error.clone()))),
            SourceHints::with_inline_micro(0),
        )
    }
214
215 #[must_use]
216 pub fn future<F, Fut>(future: F) -> Self
217 where
218 F: Fn() -> Fut + Send + Sync + 'static,
219 Fut: Future<Output = StreamResult<Out>> + Send + 'static,
220 {
221 let future = Arc::new(future);
222 Self::from_factory(move || {
223 let future = Arc::clone(&future);
224 let mut emitted = false;
225 Box::new(std::iter::from_fn(move || {
226 if emitted {
227 return None;
228 }
229 emitted = true;
230 Some(
231 catch_unwind_failed("source future factory", || future())
232 .and_then(flow::run_future_inline_or_spawn),
233 )
234 }))
235 })
236 }
237
    /// Creates a source that emits the elements of a source produced by a
    /// future.
    ///
    /// On the first pull, `future` is invoked through `catch_unwind_failed`,
    /// its future is handed to `flow::run_future_inline_or_spawn`, and the
    /// resulting inner source is materialized. From then on the inner
    /// stream's items are forwarded. A failure while obtaining or
    /// materializing the inner source is emitted as a single `Err`, after
    /// which the stream ends. The inner source's materialized value is
    /// discarded.
    #[must_use]
    pub fn future_source<F, Fut>(future: F) -> Self
    where
        F: Fn() -> Fut + Send + Sync + 'static,
        Fut: Future<Output = StreamResult<Source<Out>>> + Send + 'static,
    {
        let future = Arc::new(future);
        Self::from_materialized_factory(move |materializer| {
            // Owned materializer (same name prefix) for the stream to keep.
            let materializer = materializer.with_name_prefix(materializer.name_prefix().to_owned());
            let future = Arc::clone(&future);
            let mut current = None::<BoxStream<Out>>;
            let mut initialized = false;
            let mut terminated = false;
            Ok((
                Box::new(std::iter::from_fn(move || {
                    if terminated {
                        return None;
                    }

                    loop {
                        // Inner stream already available: forward from it.
                        if let Some(stream) = current.as_mut() {
                            match stream.next() {
                                Some(item) => return Some(item),
                                None => {
                                    terminated = true;
                                    return None;
                                }
                            }
                        }

                        // The future is resolved at most once per stream.
                        if initialized {
                            terminated = true;
                            return None;
                        }
                        initialized = true;
                        let source = match catch_unwind_failed("future_source factory", || future())
                            .and_then(flow::run_future_inline_or_spawn)
                        {
                            Ok(source) => source,
                            Err(error) => {
                                terminated = true;
                                return Some(Err(error));
                            }
                        };
                        // Materialize the inner source; the next loop
                        // iteration starts pulling from it.
                        current = Some(match Arc::clone(&source.factory).create(&materializer) {
                            Ok((stream, _)) => stream,
                            Err(error) => {
                                terminated = true;
                                return Some(Err(error));
                            }
                        });
                    }
                })) as BoxStream<Out>,
                NotUsed,
            ))
        })
    }
295
    /// Creates a source that repeats the elements produced by `factory`
    /// without end.
    ///
    /// Whenever the current iterator is exhausted (or none exists yet),
    /// `factory` is invoked again through `catch_unwind_failed`. If the
    /// fresh iterator has no first element, the stream emits
    /// `StreamError::Failed("empty iterator")` and ends. An error returned
    /// by `catch_unwind_failed` is also emitted once before the stream ends.
    #[must_use]
    pub fn cycle<F, I>(factory: F) -> Self
    where
        F: Fn() -> I + Send + Sync + 'static,
        I: IntoIterator<Item = Out>,
        I::IntoIter: Send + 'static,
    {
        let factory = Arc::new(factory);
        Self::from_factory(move || {
            let factory = Arc::clone(&factory);
            let mut current = None::<I::IntoIter>;
            let mut terminated = false;
            Box::new(std::iter::from_fn(move || {
                if terminated {
                    return None;
                }

                // Fast path: the current round still has elements.
                if let Some(iter) = current.as_mut()
                    && let Some(item) = iter.next()
                {
                    return Some(Ok(item));
                }

                // Start a new round.
                let mut next = match catch_unwind_failed("cycle factory", || factory()) {
                    Ok(iterable) => iterable.into_iter(),
                    Err(error) => {
                        terminated = true;
                        return Some(Err(error));
                    }
                };
                match next.next() {
                    Some(item) => {
                        current = Some(next);
                        Some(Ok(item))
                    }
                    // An empty round would never produce anything, so it is
                    // reported as a failure instead of being retried.
                    None => {
                        terminated = true;
                        Some(Err(StreamError::Failed("empty iterator".into())))
                    }
                }
            }))
        })
    }
339
340 #[must_use]
341 pub fn unfold<State, F>(initial: State, f: F) -> Self
342 where
343 State: Clone + Send + Sync + 'static,
344 F: Fn(State) -> Option<(State, Out)> + Send + Sync + 'static,
345 {
346 let f = Arc::new(f);
347 Self::from_factory(move || {
348 let f = Arc::clone(&f);
349 let mut state = Some(initial.clone());
350 let mut terminated = false;
351 Box::new(std::iter::from_fn(move || {
352 if terminated {
353 return None;
354 }
355 let current = state.take().expect("unfold state present");
356 match catch_unwind_failed("unfold function", || f(current)) {
357 Ok(Some((next_state, item))) => {
358 state = Some(next_state);
359 Some(Ok(item))
360 }
361 Ok(None) => {
362 terminated = true;
363 None
364 }
365 Err(error) => {
366 terminated = true;
367 Some(Err(error))
368 }
369 }
370 }))
371 })
372 }
373
374 #[must_use]
375 pub fn unfold_async<State, F, Fut>(initial: State, f: F) -> Self
376 where
377 State: Clone + Send + Sync + 'static,
378 F: Fn(State) -> Fut + Send + Sync + 'static,
379 Fut: Future<Output = StreamResult<Option<(State, Out)>>> + Send + 'static,
380 {
381 let f = Arc::new(f);
382 Self::from_factory(move || {
383 let f = Arc::clone(&f);
384 let mut state = Some(initial.clone());
385 let mut terminated = false;
386 Box::new(std::iter::from_fn(move || {
387 if terminated {
388 return None;
389 }
390 let current = state.take().expect("unfold_async state present");
391 match catch_unwind_failed("unfold_async factory", || f(current))
392 .and_then(flow::run_future_inline_or_spawn)
393 {
394 Ok(Some((next_state, item))) => {
395 state = Some(next_state);
396 Some(Ok(item))
397 }
398 Ok(None) => {
399 terminated = true;
400 None
401 }
402 Err(error) => {
403 terminated = true;
404 Some(Err(error))
405 }
406 }
407 }))
408 })
409 }
410
    /// Creates a source backed by a resource with `create`, `read` and
    /// `close` callbacks.
    ///
    /// Each materialization builds a fresh `UnfoldResourceStream` that shares
    /// the three callbacks and starts with no resource (`resource: None`,
    /// `created: false`).
    // NOTE(review): when the callbacks are invoked (e.g. whether `close` also
    // runs on failure) is decided by `UnfoldResourceStream`, which is defined
    // outside this part of the file — confirm there.
    #[must_use]
    pub fn unfold_resource<Resource, Create, Read, Close>(
        create: Create,
        read: Read,
        close: Close,
    ) -> Self
    where
        Resource: Send + 'static,
        Create: Fn() -> StreamResult<Resource> + Send + Sync + 'static,
        Read: Fn(&mut Resource) -> StreamResult<Option<Out>> + Send + Sync + 'static,
        Close: Fn(Resource) -> StreamResult<()> + Send + Sync + 'static,
    {
        let create = Arc::new(create);
        let read = Arc::new(read);
        let close = Arc::new(close);
        Self::from_factory(move || {
            Box::new(UnfoldResourceStream {
                create: Arc::clone(&create),
                read: Arc::clone(&read),
                close: Arc::clone(&close),
                resource: None,
                created: false,
                terminated: false,
                _marker: PhantomData,
            })
        })
    }
438
    /// Variant of `unfold_resource` whose `create`, `read` and `close`
    /// callbacks each return a future.
    ///
    /// Each materialization builds a fresh `UnfoldResourceAsyncStream` that
    /// shares the three callbacks and starts with no resource.
    // NOTE(review): how and when the futures are driven is decided by
    // `UnfoldResourceAsyncStream`, defined outside this part of the file.
    #[must_use]
    pub fn unfold_resource_async<Resource, Create, CreateFut, Read, ReadFut, Close, CloseFut>(
        create: Create,
        read: Read,
        close: Close,
    ) -> Self
    where
        Resource: Send + 'static,
        Create: Fn() -> CreateFut + Send + Sync + 'static,
        CreateFut: Future<Output = StreamResult<Resource>> + Send + 'static,
        Read: Fn(&mut Resource) -> ReadFut + Send + Sync + 'static,
        ReadFut: Future<Output = StreamResult<Option<Out>>> + Send + 'static,
        Close: Fn(Resource) -> CloseFut + Send + Sync + 'static,
        CloseFut: Future<Output = StreamResult<()>> + Send + 'static,
    {
        let create = Arc::new(create);
        let read = Arc::new(read);
        let close = Arc::new(close);
        Self::from_factory(move || {
            Box::new(UnfoldResourceAsyncStream {
                create: Arc::clone(&create),
                read: Arc::clone(&read),
                close: Arc::clone(&close),
                resource: None,
                created: false,
                terminated: false,
                _marker: PhantomData,
            })
        })
    }
469
470 #[must_use]
471 pub fn lazy_single<F>(create: F) -> Self
472 where
473 F: Fn() -> Out + Send + Sync + 'static,
474 {
475 let create = Arc::new(create);
476 Self::from_factory(move || {
477 let create = Arc::clone(&create);
478 let mut emitted = false;
479 Box::new(std::iter::from_fn(move || {
480 if emitted {
481 return None;
482 }
483 emitted = true;
484 Some(catch_unwind_failed("lazy_single factory", || create()))
485 }))
486 })
487 }
488
489 #[must_use]
490 pub fn lazy_future<F, Fut>(create: F) -> Self
491 where
492 F: Fn() -> Fut + Send + Sync + 'static,
493 Fut: Future<Output = StreamResult<Out>> + Send + 'static,
494 {
495 let create = Arc::new(create);
496 Self::from_factory(move || {
497 let create = Arc::clone(&create);
498 let mut emitted = false;
499 Box::new(std::iter::from_fn(move || {
500 if emitted {
501 return None;
502 }
503 emitted = true;
504 Some(
505 catch_unwind_failed("lazy_future factory", || create())
506 .and_then(flow::run_future_inline_or_spawn),
507 )
508 }))
509 })
510 }
511
    /// Creates a source that defers building its inner source to a
    /// `LazySourceStream`.
    ///
    /// Materialization does not call `create`; it only sets up a
    /// `LazySourceStream` and a oneshot channel. The materialized value is a
    /// `StreamCompletion<InnerMat>` built from the channel's receiver.
    // NOTE(review): presumably the inner source is created on first demand
    // and its materialized value is sent through `mat_sender`; that logic
    // lives in `LazySourceStream`, outside this part of the file — confirm.
    #[must_use]
    pub fn lazy_source<InnerMat, F>(create: F) -> Source<Out, StreamCompletion<InnerMat>>
    where
        InnerMat: Send + 'static,
        F: Fn() -> Source<Out, InnerMat> + Send + Sync + 'static,
    {
        let create = Arc::new(create);
        Source::from_materialized_factory(move |materializer| {
            let (sender, receiver) = oneshot::channel();
            Ok((
                Box::new(LazySourceStream {
                    create: Arc::clone(&create),
                    // Owned materializer (same name prefix) for the stream to keep.
                    materializer: materializer
                        .with_name_prefix(materializer.name_prefix().to_owned()),
                    current: None,
                    mat_sender: Some(sender),
                    initialized: false,
                    terminated: false,
                }) as BoxStream<Out>,
                StreamCompletion::from_receiver(receiver, None),
            ))
        })
    }
535
    /// Variant of `lazy_source` whose `create` callback returns a future
    /// resolving to the inner source.
    ///
    /// Materialization does not call `create`; it only sets up a
    /// `LazyFutureSourceStream` and a oneshot channel. The materialized value
    /// is a `StreamCompletion<InnerMat>` built from the channel's receiver.
    // NOTE(review): when the future is resolved and when `mat_sender` fires
    // is decided by `LazyFutureSourceStream`, outside this part of the file.
    #[must_use]
    pub fn lazy_future_source<InnerMat, F, Fut>(
        create: F,
    ) -> Source<Out, StreamCompletion<InnerMat>>
    where
        InnerMat: Send + 'static,
        F: Fn() -> Fut + Send + Sync + 'static,
        Fut: Future<Output = StreamResult<Source<Out, InnerMat>>> + Send + 'static,
    {
        let create = Arc::new(create);
        Source::from_materialized_factory(move |materializer| {
            let (sender, receiver) = oneshot::channel();
            Ok((
                Box::new(LazyFutureSourceStream {
                    create: Arc::clone(&create),
                    // Owned materializer (same name prefix) for the stream to keep.
                    materializer: materializer
                        .with_name_prefix(materializer.name_prefix().to_owned()),
                    current: None,
                    mat_sender: Some(sender),
                    initialized: false,
                    terminated: false,
                    _marker: PhantomData,
                }) as BoxStream<Out>,
                StreamCompletion::from_receiver(receiver, None),
            ))
        })
    }
563
    /// Creates a source from a closure that returns an iterable.
    ///
    /// `factory` is called once per materialization and every element of the
    /// resulting iterator is emitted wrapped in `Ok`.
    // NOTE(review): unlike `cycle`, `lazy_single` and the `unfold*`
    // constructors, `factory` is called directly here rather than through
    // `catch_unwind_failed` — confirm that this is intentional.
    #[must_use]
    pub fn from_fn_iter<F, I>(factory: F) -> Self
    where
        F: Fn() -> I + Send + Sync + 'static,
        I: IntoIterator<Item = Out>,
        I::IntoIter: Send + 'static,
    {
        Self::from_factory(move || Box::new(factory().into_iter().map(Ok)))
    }
573}
574
575impl<Out: Send + 'static, Mat: Send + 'static> Source<Out, Mat> {
    /// Base constructor: wraps `factory` in an `FnSourceFactory` and stores
    /// the given `hints`.
    ///
    /// The new source has default attributes, no terminal factory and no
    /// split hook.
    fn from_materialized_factory_with_hints<F>(factory: F, hints: SourceHints) -> Self
    where
        F: Fn(&Materializer) -> StreamResult<(BoxStream<Out>, Mat)> + Send + Sync + 'static,
    {
        Self {
            factory: Arc::new(FnSourceFactory(factory)),
            terminal_factory: None,
            hints,
            attributes: Attributes::default(),
            split_hook: None,
        }
    }
588
    /// Builds a source from a closure that produces both the stream and the
    /// materialized value, using `SourceHints::default()`.
    pub(crate) fn from_materialized_factory<F>(factory: F) -> Self
    where
        F: Fn(&Materializer) -> StreamResult<(BoxStream<Out>, Mat)> + Send + Sync + 'static,
    {
        Self::from_materialized_factory_with_hints(factory, SourceHints::default())
    }
595
    /// Builds a source that carries both a regular stream factory and a
    /// terminal factory.
    ///
    /// `factory` is wrapped in an `FnSourceFactory`; `terminal_factory` is
    /// stored in `Source::terminal_factory`. Hints come from
    /// `SourceHints::with_terminal_consumer_batch()`; attributes are the
    /// default and no split hook is set.
    pub(crate) fn from_terminal_direct_materialized_factory<F, D>(
        factory: F,
        terminal_factory: D,
    ) -> Self
    where
        F: Fn(&Materializer) -> StreamResult<(BoxStream<Out>, Mat)> + Send + Sync + 'static,
        D: Fn(&Materializer) -> StreamResult<(Arc<dyn TerminalSourceHookDyn<Out>>, Mat)>
            + Send
            + Sync
            + 'static,
    {
        Self {
            factory: Arc::new(FnSourceFactory(factory)),
            terminal_factory: Some(Arc::new(terminal_factory)),
            hints: SourceHints::with_terminal_consumer_batch(),
            attributes: Attributes::default(),
            split_hook: None,
        }
    }
615
616 #[must_use]
617 pub fn as_source_with_context<Ctx, F>(
618 self,
619 extract_context: F,
620 ) -> SourceWithContext<Out, Ctx, Mat>
621 where
622 Ctx: Send + 'static,
623 F: Fn(&Out) -> Ctx + Send + Sync + 'static,
624 {
625 SourceWithContext::from_source(self.map(move |item| {
626 let context = extract_context(&item);
627 (item, context)
628 }))
629 }
630
    /// Connects this source to `flow` and keeps this source's materialized
    /// value. Equivalent to `self.via_mat(flow, Keep::left)`.
    #[must_use]
    pub fn via<Next, FlowMat>(self, flow: Flow<Out, Next, FlowMat>) -> Source<Next, Mat>
    where
        Next: Send + 'static,
        FlowMat: Send + 'static,
    {
        self.via_mat(flow, Keep::left)
    }
639
640 #[must_use]
641 pub fn via_mat<Next, FlowMat, Combined, F>(
642 self,
643 flow: Flow<Out, Next, FlowMat>,
644 combine: F,
645 ) -> Source<Next, Combined>
646 where
647 Next: Send + 'static,
648 FlowMat: Send + 'static,
649 Combined: Send + 'static,
650 F: Fn(Mat, FlowMat) -> Combined + Send + Sync + 'static,
651 {
652 let source = self.factory;
653 let transform = flow.transform;
654 let materialize_flow = flow.materialize;
655 let hints = self.hints.after_flow(flow.hints);
656 let combine = Arc::new(combine);
657 Source::from_materialized_factory_with_hints(
658 move |materializer| {
659 let (stream, source_mat) = Arc::clone(&source).create(materializer)?;
660 let flow_mat = materialize_flow()?;
661 let stream = match &transform {
662 FlowTransform::Pure(transform) => transform(stream),
663 FlowTransform::Runtime(transform) => transform(stream, materializer)?,
664 };
665 Ok((stream, combine(source_mat, flow_mat)))
666 },
667 hints,
668 )
669 }
670
    /// Same as [`Source::via_mat`]; forwards both arguments unchanged.
    #[must_use]
    pub fn via_mat_with<Next, FlowMat, Combined, F>(
        self,
        flow: Flow<Out, Next, FlowMat>,
        combine: F,
    ) -> Source<Next, Combined>
    where
        Next: Send + 'static,
        FlowMat: Send + 'static,
        Combined: Send + 'static,
        F: Fn(Mat, FlowMat) -> Combined + Send + Sync + 'static,
    {
        self.via_mat(flow, combine)
    }
685
    /// Returns a source whose factory is a `MapSourceFactory` wrapping this
    /// source's factory together with `f`.
    ///
    /// Unlike most other operators here, this does not go through
    /// [`Source::via`]. The attributes of `self` are carried over, the hints
    /// become `self.hints.without_inline_micro()`, and the terminal factory
    /// and split hook are reset to `None`.
    #[must_use]
    pub fn map<Next, F>(self, f: F) -> Source<Next, Mat>
    where
        Next: Send + 'static,
        F: Fn(Out) -> Next + Send + Sync + 'static,
    {
        let hints = self.hints.without_inline_micro();
        Source {
            factory: Arc::new(MapSourceFactory {
                source: self.factory,
                stage: f,
                _marker: PhantomData,
            }),
            terminal_factory: None,
            hints,
            attributes: self.attributes,
            split_hook: None,
        }
    }
705
    /// Returns the attributes currently attached to this source.
    #[must_use]
    pub fn attributes(&self) -> &Attributes {
        &self.attributes
    }
710
    /// Replaces this source's attributes with `attributes`, discarding the
    /// previous ones.
    #[must_use]
    pub fn with_attributes(mut self, attributes: Attributes) -> Self {
        self.attributes = attributes;
        self
    }
716
    /// Combines the existing attributes with `attributes` using
    /// `Attributes::and` and stores the result.
    #[must_use]
    pub fn add_attributes(mut self, attributes: Attributes) -> Self {
        self.attributes = self.attributes.and(attributes);
        self
    }
722
    /// Adds `Attributes::named(name)` to this source's attributes via
    /// [`Source::add_attributes`].
    #[must_use]
    pub fn named(self, name: impl Into<String>) -> Self {
        self.add_attributes(Attributes::named(name))
    }
727
    /// Routes this source through `Flow::identity().async_boundary()` using
    /// [`Source::via`]; the materialized value is unchanged.
    #[must_use]
    pub fn async_boundary(self) -> Self {
        self.via(Flow::identity().async_boundary())
    }
738
    /// Alias for [`Source::async_boundary`].
    #[must_use]
    pub fn r#async(self) -> Self {
        self.async_boundary()
    }
744
    /// Routes this source through
    /// `Flow::identity().async_boundary_with_config(config)` using
    /// [`Source::via`]; the materialized value is unchanged.
    #[must_use]
    pub fn async_boundary_with_config(
        self,
        config: crate::graph::AsyncBoundaryExecutionConfig,
    ) -> Self {
        self.via(Flow::identity().async_boundary_with_config(config))
    }
757
    /// Routes this source through
    /// `Flow::identity().async_boundary_with_buffer(buffer_size)` using
    /// [`Source::via`]; the materialized value is unchanged.
    #[must_use]
    pub fn async_boundary_with_buffer(self, buffer_size: usize) -> Self {
        self.via(Flow::identity().async_boundary_with_buffer(buffer_size))
    }
763
    /// Appends the flow stage `Flow::identity().try_map(f)` using
    /// [`Source::via`]; `f` is fallible and returns a `StreamResult`. The
    /// materialized value is unchanged.
    #[must_use]
    pub fn try_map<Next, F>(self, f: F) -> Source<Next, Mat>
    where
        Next: Send + 'static,
        F: Fn(Out) -> StreamResult<Next> + Send + Sync + 'static,
    {
        self.via(Flow::identity().try_map(f))
    }
772
    /// Deprecated alias that forwards to [`Source::try_map`].
    #[must_use]
    #[deprecated(
        since = "0.9.0",
        note = "renamed to `try_map` (idiomatic); the `_result` name still works"
    )]
    pub fn map_result<Next, F>(self, f: F) -> Source<Next, Mat>
    where
        Next: Send + 'static,
        F: Fn(Out) -> StreamResult<Next> + Send + Sync + 'static,
    {
        self.try_map(f)
    }
786
    /// Appends the flow stage
    /// `Flow::identity().map_result_with_supervision(f, decider)` using
    /// [`Source::via`]. How `decider` reacts to errors from `f` is defined
    /// by that flow stage. The materialized value is unchanged.
    #[must_use]
    pub fn map_result_with_supervision<Next, F>(
        self,
        f: F,
        decider: SupervisionDecider,
    ) -> Source<Next, Mat>
    where
        Next: Send + 'static,
        F: Fn(Out) -> StreamResult<Next> + Send + Sync + 'static,
    {
        self.via(Flow::identity().map_result_with_supervision(f, decider))
    }
799
    /// Appends the flow stage `Flow::identity().filter(predicate)` using
    /// [`Source::via`]; element type and materialized value are unchanged.
    #[must_use]
    pub fn filter<F>(self, predicate: F) -> Source<Out, Mat>
    where
        F: Fn(&Out) -> bool + Send + Sync + 'static,
    {
        self.via(Flow::identity().filter(predicate))
    }
807
    /// Appends the flow stage `Flow::identity().try_filter(predicate)` using
    /// [`Source::via`]; `predicate` is fallible and returns a
    /// `StreamResult<bool>`. The materialized value is unchanged.
    #[must_use]
    pub fn try_filter<F>(self, predicate: F) -> Source<Out, Mat>
    where
        F: Fn(&Out) -> StreamResult<bool> + Send + Sync + 'static,
    {
        self.via(Flow::identity().try_filter(predicate))
    }
815
    /// Deprecated alias that forwards to [`Source::try_filter`].
    #[must_use]
    #[deprecated(
        since = "0.9.0",
        note = "renamed to `try_filter` (idiomatic); the `_result` name still works"
    )]
    pub fn filter_result<F>(self, predicate: F) -> Source<Out, Mat>
    where
        F: Fn(&Out) -> StreamResult<bool> + Send + Sync + 'static,
    {
        self.try_filter(predicate)
    }
828
    /// Appends the flow stage
    /// `Flow::identity().filter_result_with_supervision(predicate, decider)`
    /// using [`Source::via`]. How `decider` reacts to errors from
    /// `predicate` is defined by that flow stage.
    #[must_use]
    pub fn filter_result_with_supervision<F>(
        self,
        predicate: F,
        decider: SupervisionDecider,
    ) -> Source<Out, Mat>
    where
        F: Fn(&Out) -> StreamResult<bool> + Send + Sync + 'static,
    {
        self.via(Flow::identity().filter_result_with_supervision(predicate, decider))
    }
840
    /// Appends the flow stage `Flow::identity().filter_not(predicate)` using
    /// [`Source::via`]; element type and materialized value are unchanged.
    #[must_use]
    pub fn filter_not<F>(self, predicate: F) -> Source<Out, Mat>
    where
        F: Fn(&Out) -> bool + Send + Sync + 'static,
    {
        self.via(Flow::identity().filter_not(predicate))
    }
848
    /// Appends the flow stage `Flow::identity().filter_map(f)` using
    /// [`Source::via`]; `f` returns an `Option<Next>` per element. The
    /// materialized value is unchanged.
    #[must_use]
    pub fn filter_map<Next, F>(self, f: F) -> Source<Next, Mat>
    where
        Next: Send + 'static,
        F: Fn(Out) -> Option<Next> + Send + Sync + 'static,
    {
        self.via(Flow::identity().filter_map(f))
    }
857
    /// Appends the flow stage `Flow::identity().try_filter_map(f)` using
    /// [`Source::via`]; `f` is fallible and returns a
    /// `StreamResult<Option<Next>>`. The materialized value is unchanged.
    #[must_use]
    pub fn try_filter_map<Next, F>(self, f: F) -> Source<Next, Mat>
    where
        Next: Send + 'static,
        F: Fn(Out) -> StreamResult<Option<Next>> + Send + Sync + 'static,
    {
        self.via(Flow::identity().try_filter_map(f))
    }
866
    /// Deprecated alias that forwards to [`Source::try_filter_map`].
    #[must_use]
    #[deprecated(
        since = "0.9.0",
        note = "renamed to `try_filter_map` (idiomatic); the `_result` name still works"
    )]
    pub fn filter_map_result<Next, F>(self, f: F) -> Source<Next, Mat>
    where
        Next: Send + 'static,
        F: Fn(Out) -> StreamResult<Option<Next>> + Send + Sync + 'static,
    {
        self.try_filter_map(f)
    }
880
    /// Appends the flow stage
    /// `Flow::identity().filter_map_result_with_supervision(f, decider)`
    /// using [`Source::via`]. How `decider` reacts to errors from `f` is
    /// defined by that flow stage.
    #[must_use]
    pub fn filter_map_result_with_supervision<Next, F>(
        self,
        f: F,
        decider: SupervisionDecider,
    ) -> Source<Next, Mat>
    where
        Next: Send + 'static,
        F: Fn(Out) -> StreamResult<Option<Next>> + Send + Sync + 'static,
    {
        self.via(Flow::identity().filter_map_result_with_supervision(f, decider))
    }
893
    /// Appends the flow stage `Flow::identity().map_concat(f)` using
    /// [`Source::via`]; `f` turns each element into an iterable of `Next`.
    /// The materialized value is unchanged.
    #[must_use]
    pub fn map_concat<Next, F, I>(self, f: F) -> Source<Next, Mat>
    where
        Next: Send + 'static,
        F: Fn(Out) -> I + Send + Sync + 'static,
        I: IntoIterator<Item = Next>,
        I::IntoIter: Send + 'static,
    {
        self.via(Flow::identity().map_concat(f))
    }
904
    /// Appends the flow stage `Flow::identity().try_map_concat(f)` using
    /// [`Source::via`]; `f` is fallible and returns a `StreamResult` holding
    /// an iterable of `Next`. The materialized value is unchanged.
    #[must_use]
    pub fn try_map_concat<Next, F, I>(self, f: F) -> Source<Next, Mat>
    where
        Next: Send + 'static,
        F: Fn(Out) -> StreamResult<I> + Send + Sync + 'static,
        I: IntoIterator<Item = Next>,
        I::IntoIter: Send + 'static,
    {
        self.via(Flow::identity().try_map_concat(f))
    }
915
    /// Deprecated alias that forwards to [`Source::try_map_concat`].
    #[must_use]
    #[deprecated(
        since = "0.9.0",
        note = "renamed to `try_map_concat` (idiomatic); the `_result` name still works"
    )]
    pub fn map_concat_result<Next, F, I>(self, f: F) -> Source<Next, Mat>
    where
        Next: Send + 'static,
        F: Fn(Out) -> StreamResult<I> + Send + Sync + 'static,
        I: IntoIterator<Item = Next>,
        I::IntoIter: Send + 'static,
    {
        self.try_map_concat(f)
    }
931
    /// Appends the flow stage
    /// `Flow::identity().map_concat_result_with_supervision(f, decider)`
    /// using [`Source::via`]. How `decider` reacts to errors from `f` is
    /// defined by that flow stage.
    #[must_use]
    pub fn map_concat_result_with_supervision<Next, F, I>(
        self,
        f: F,
        decider: SupervisionDecider,
    ) -> Source<Next, Mat>
    where
        Next: Send + 'static,
        F: Fn(Out) -> StreamResult<I> + Send + Sync + 'static,
        I: IntoIterator<Item = Next>,
        I::IntoIter: Send + 'static,
    {
        self.via(Flow::identity().map_concat_result_with_supervision(f, decider))
    }
946
    /// Appends the flow stage `Flow::identity().stateful_map(seed, f)` using
    /// [`Source::via`]; `f` receives a mutable reference to the state along
    /// with each element. The materialized value is unchanged.
    #[must_use]
    pub fn stateful_map<State, Next, F>(self, seed: State, f: F) -> Source<Next, Mat>
    where
        State: Clone + Send + Sync + 'static,
        Next: Send + 'static,
        F: Fn(&mut State, Out) -> Next + Send + Sync + 'static,
    {
        self.via(Flow::identity().stateful_map(seed, f))
    }
956
    /// Appends the flow stage `Flow::identity().try_stateful_map(seed, f)`
    /// using [`Source::via`]; the fallible counterpart of `stateful_map`.
    /// The materialized value is unchanged.
    #[must_use]
    pub fn try_stateful_map<State, Next, F>(self, seed: State, f: F) -> Source<Next, Mat>
    where
        State: Clone + Send + Sync + 'static,
        Next: Send + 'static,
        F: Fn(&mut State, Out) -> StreamResult<Next> + Send + Sync + 'static,
    {
        self.via(Flow::identity().try_stateful_map(seed, f))
    }
966
    /// Deprecated alias that forwards to [`Source::try_stateful_map`].
    #[must_use]
    #[deprecated(
        since = "0.9.0",
        note = "renamed to `try_stateful_map` (idiomatic); the `_result` name still works"
    )]
    pub fn stateful_map_result<State, Next, F>(self, seed: State, f: F) -> Source<Next, Mat>
    where
        State: Clone + Send + Sync + 'static,
        Next: Send + 'static,
        F: Fn(&mut State, Out) -> StreamResult<Next> + Send + Sync + 'static,
    {
        self.try_stateful_map(seed, f)
    }
981
    /// Appends the flow stage
    /// `Flow::identity().stateful_map_result_with_supervision(seed, f, decider)`
    /// using [`Source::via`]. How `decider` reacts to errors from `f` is
    /// defined by that flow stage.
    #[must_use]
    pub fn stateful_map_result_with_supervision<State, Next, F>(
        self,
        seed: State,
        f: F,
        decider: SupervisionDecider,
    ) -> Source<Next, Mat>
    where
        State: Clone + Send + Sync + 'static,
        Next: Send + 'static,
        F: Fn(&mut State, Out) -> StreamResult<Next> + Send + Sync + 'static,
    {
        self.via(Flow::identity().stateful_map_result_with_supervision(seed, f, decider))
    }
996
    /// Appends the flow stage `Flow::identity().stateful_map_concat(seed, f)`
    /// using [`Source::via`]; `f` receives a mutable reference to the state
    /// and returns an iterable of `Next` per element.
    #[must_use]
    pub fn stateful_map_concat<State, Next, F, I>(self, seed: State, f: F) -> Source<Next, Mat>
    where
        State: Clone + Send + Sync + 'static,
        Next: Send + 'static,
        F: Fn(&mut State, Out) -> I + Send + Sync + 'static,
        I: IntoIterator<Item = Next>,
        I::IntoIter: Send + 'static,
    {
        self.via(Flow::identity().stateful_map_concat(seed, f))
    }
1008
    /// Appends the flow stage
    /// `Flow::identity().try_stateful_map_concat(seed, f)` using
    /// [`Source::via`]; the fallible counterpart of `stateful_map_concat`.
    #[must_use]
    pub fn try_stateful_map_concat<State, Next, F, I>(self, seed: State, f: F) -> Source<Next, Mat>
    where
        State: Clone + Send + Sync + 'static,
        Next: Send + 'static,
        F: Fn(&mut State, Out) -> StreamResult<I> + Send + Sync + 'static,
        I: IntoIterator<Item = Next>,
        I::IntoIter: Send + 'static,
    {
        self.via(Flow::identity().try_stateful_map_concat(seed, f))
    }
1020
    /// Deprecated alias that forwards to
    /// [`Source::try_stateful_map_concat`].
    #[must_use]
    #[deprecated(
        since = "0.9.0",
        note = "renamed to `try_stateful_map_concat` (idiomatic); the `_result` name still works"
    )]
    pub fn stateful_map_concat_result<State, Next, F, I>(
        self,
        seed: State,
        f: F,
    ) -> Source<Next, Mat>
    where
        State: Clone + Send + Sync + 'static,
        Next: Send + 'static,
        F: Fn(&mut State, Out) -> StreamResult<I> + Send + Sync + 'static,
        I: IntoIterator<Item = Next>,
        I::IntoIter: Send + 'static,
    {
        self.try_stateful_map_concat(seed, f)
    }
1041
    /// Appends the flow stage
    /// `Flow::identity().stateful_map_concat_result_with_supervision(seed, f, decider)`
    /// using [`Source::via`]. How `decider` reacts to errors from `f` is
    /// defined by that flow stage.
    #[must_use]
    pub fn stateful_map_concat_result_with_supervision<State, Next, F, I>(
        self,
        seed: State,
        f: F,
        decider: SupervisionDecider,
    ) -> Source<Next, Mat>
    where
        State: Clone + Send + Sync + 'static,
        Next: Send + 'static,
        F: Fn(&mut State, Out) -> StreamResult<I> + Send + Sync + 'static,
        I: IntoIterator<Item = Next>,
        I::IntoIter: Send + 'static,
    {
        self.via(Flow::identity().stateful_map_concat_result_with_supervision(seed, f, decider))
    }
1058
    /// Appends the flow stage `Flow::identity().map_async(parallelism, f)`
    /// using [`Source::via`]; `f` returns a future per element. The
    /// materialized value is unchanged.
    // NOTE(review): ordering guarantees and the exact meaning of
    // `parallelism` are defined by `Flow::map_async` — see there.
    #[must_use]
    pub fn map_async<Next, F, Fut>(self, parallelism: usize, f: F) -> Source<Next, Mat>
    where
        Next: Send + 'static,
        F: Fn(Out) -> Fut + Send + Sync + 'static,
        Fut: Future<Output = StreamResult<Next>> + Send + 'static,
    {
        self.via(Flow::identity().map_async(parallelism, f))
    }
1068
    /// Appends the flow stage
    /// `Flow::identity().map_async_with_supervision(parallelism, f, decider)`
    /// using [`Source::via`]. How `decider` reacts to errors is defined by
    /// that flow stage.
    #[must_use]
    pub fn map_async_with_supervision<Next, F, Fut>(
        self,
        parallelism: usize,
        f: F,
        decider: SupervisionDecider,
    ) -> Source<Next, Mat>
    where
        Next: Send + 'static,
        F: Fn(Out) -> Fut + Send + Sync + 'static,
        Fut: Future<Output = StreamResult<Next>> + Send + 'static,
    {
        self.via(Flow::identity().map_async_with_supervision(parallelism, f, decider))
    }
1083
    /// Appends the flow stage
    /// `Flow::identity().map_async_unordered(parallelism, f)` using
    /// [`Source::via`]; `f` returns a future per element. The materialized
    /// value is unchanged.
    #[must_use]
    pub fn map_async_unordered<Next, F, Fut>(self, parallelism: usize, f: F) -> Source<Next, Mat>
    where
        Next: Send + 'static,
        F: Fn(Out) -> Fut + Send + Sync + 'static,
        Fut: Future<Output = StreamResult<Next>> + Send + 'static,
    {
        self.via(Flow::identity().map_async_unordered(parallelism, f))
    }
1093
    /// Appends the flow stage
    /// `Flow::identity().map_async_unordered_with_supervision(parallelism, f, decider)`
    /// using [`Source::via`]. How `decider` reacts to errors is defined by
    /// that flow stage.
    #[must_use]
    pub fn map_async_unordered_with_supervision<Next, F, Fut>(
        self,
        parallelism: usize,
        f: F,
        decider: SupervisionDecider,
    ) -> Source<Next, Mat>
    where
        Next: Send + 'static,
        F: Fn(Out) -> Fut + Send + Sync + 'static,
        Fut: Future<Output = StreamResult<Next>> + Send + 'static,
    {
        self.via(Flow::identity().map_async_unordered_with_supervision(parallelism, f, decider))
    }
1108
    /// Appends the flow stage
    /// `Flow::identity().map_async_partitioned(parallelism, per_partition, partition, f)`
    /// using [`Source::via`]. `partition` derives a key from each element
    /// and `f` returns a future per element.
    // NOTE(review): how `parallelism` and `per_partition` limit concurrent
    // work is defined by `Flow::map_async_partitioned` — see there.
    #[must_use]
    pub fn map_async_partitioned<Key, Next, Partition, F, Fut>(
        self,
        parallelism: usize,
        per_partition: usize,
        partition: Partition,
        f: F,
    ) -> Source<Next, Mat>
    where
        Key: Clone + Eq + Hash + Send + 'static,
        Next: Send + 'static,
        Partition: Fn(&Out) -> Key + Send + Sync + 'static,
        F: Fn(Out) -> Fut + Send + Sync + 'static,
        Fut: Future<Output = StreamResult<Next>> + Send + 'static,
    {
        self.via(Flow::identity().map_async_partitioned(parallelism, per_partition, partition, f))
    }
1126
    /// Appends the flow stage `Flow::identity().prefix_and_tail(n)` using
    /// [`Source::via`]. The resulting elements are pairs of a `Vec<Out>` and
    /// a `Source<Out>`; the materialized value is unchanged.
    #[must_use]
    pub fn prefix_and_tail(self, n: usize) -> Source<(Vec<Out>, Source<Out>), Mat> {
        self.via(Flow::identity().prefix_and_tail(n))
    }
1131
    /// Appends the flow stage `Flow::identity().flat_map_prefix(n, f)` using
    /// [`Source::via`]; `f` builds a flow from a `Vec<Out>`. Requires
    /// `Out: Clone`. The materialized value is unchanged.
    #[must_use]
    pub fn flat_map_prefix<Next, FlowMat, F>(self, n: usize, f: F) -> Source<Next, Mat>
    where
        Next: Send + 'static,
        FlowMat: Send + 'static,
        F: Fn(Vec<Out>) -> Flow<Out, Next, FlowMat> + Send + Sync + 'static,
        Out: Clone,
    {
        self.via(Flow::identity().flat_map_prefix(n, f))
    }
1142
1143 #[must_use]
1144 pub fn group_by<Key, F>(
1145 self,
1146 max_substreams: usize,
1147 f: F,
1148 allow_closed_substream_recreation: bool,
1149 ) -> Source<Source<Out>, Mat>
1150 where
1151 Key: Clone + Eq + Hash + Send + 'static,
1152 F: Fn(&Out) -> Key + Send + Sync + 'static,
1153 Out: Clone,
1154 {
1155 let batch_mode = if self.hints.inline_micro.is_some() && !allow_closed_substream_recreation
1156 {
1157 flow::GroupByBatchMode::FiniteEagerNoRecreate
1158 } else {
1159 flow::GroupByBatchMode::Immediate
1160 };
1161 self.via(Flow::identity().group_by_with_batching(
1162 max_substreams,
1163 f,
1164 allow_closed_substream_recreation,
1165 batch_mode,
1166 ))
1167 }
1168
    /// Appends the flow stage `Flow::identity().split_when(predicate)` using
    /// [`Source::via`], yielding a source of sub-sources. Requires
    /// `Out: Clone`. The materialized value is unchanged.
    #[must_use]
    pub fn split_when<F>(self, predicate: F) -> Source<Source<Out>, Mat>
    where
        F: Fn(&Out) -> bool + Send + Sync + 'static,
        Out: Clone,
    {
        self.via(Flow::identity().split_when(predicate))
    }
1177
    /// Appends the flow stage `Flow::identity().split_after(predicate)` using
    /// [`Source::via`], yielding a source of sub-sources. Requires
    /// `Out: Clone`. The materialized value is unchanged.
    #[must_use]
    pub fn split_after<F>(self, predicate: F) -> Source<Source<Out>, Mat>
    where
        F: Fn(&Out) -> bool + Send + Sync + 'static,
        Out: Clone,
    {
        self.via(Flow::identity().split_after(predicate))
    }
1186
    /// Appends the flow stage `Flow::identity().flat_map_concat(f)` using
    /// [`Source::via`]; `f` turns each element into a source of `Next`. The
    /// outer materialized value is unchanged.
    #[must_use]
    pub fn flat_map_concat<Next, NextMat, F>(self, f: F) -> Source<Next, Mat>
    where
        Next: Send + 'static,
        NextMat: Send + 'static,
        F: Fn(Out) -> Source<Next, NextMat> + Send + Sync + 'static,
    {
        self.via(Flow::identity().flat_map_concat(f))
    }
1196
    /// Appends the flow stage `Flow::identity().flat_map_merge(breadth, f)`
    /// using [`Source::via`]; `f` turns each element into a source of
    /// `Next`. The outer materialized value is unchanged.
    // NOTE(review): `breadth` presumably caps the number of concurrently
    // active inner sources — confirm in `Flow::flat_map_merge`.
    #[must_use]
    pub fn flat_map_merge<Next, NextMat, F>(self, breadth: usize, f: F) -> Source<Next, Mat>
    where
        Next: Send + 'static,
        NextMat: Send + 'static,
        F: Fn(Out) -> Source<Next, NextMat> + Send + Sync + 'static,
    {
        self.via(Flow::identity().flat_map_merge(breadth, f))
    }
1206
    /// Appends the flow stage `Flow::identity().take(n)` using
    /// [`Source::via`]; element type and materialized value are unchanged.
    #[must_use]
    pub fn take(self, n: usize) -> Source<Out, Mat> {
        self.via(Flow::identity().take(n))
    }
1211
    /// Appends the flow stage `Flow::identity().drop(n)` using
    /// [`Source::via`]; element type and materialized value are unchanged.
    ///
    /// This is a stream operator that consumes `self`; it is unrelated to
    /// `Drop::drop`.
    #[must_use]
    pub fn drop(self, n: usize) -> Source<Out, Mat> {
        self.via(Flow::identity().drop(n))
    }
1216
1217 #[must_use]
1218 pub fn take_while<F>(self, predicate: F) -> Source<Out, Mat>
1219 where
1220 F: Fn(&Out) -> bool + Send + Sync + 'static,
1221 {
1222 self.via(Flow::identity().take_while(predicate))
1223 }
1224
1225 #[must_use]
1226 pub fn drop_while<F>(self, predicate: F) -> Source<Out, Mat>
1227 where
1228 F: Fn(&Out) -> bool + Send + Sync + 'static,
1229 {
1230 self.via(Flow::identity().drop_while(predicate))
1231 }
1232
1233 #[must_use]
1234 pub fn limit(self, max: u64) -> Source<Out, Mat> {
1235 self.via(Flow::identity().limit(max))
1236 }
1237
1238 #[must_use]
1239 pub fn grouped(self, size: usize) -> Source<Vec<Out>, Mat> {
1240 self.via(Flow::identity().grouped(size))
1241 }
1242
1243 #[must_use]
1244 pub fn scan<State, F>(self, seed: State, f: F) -> Source<State, Mat>
1245 where
1246 State: Clone + Send + Sync + 'static,
1247 F: Fn(State, Out) -> State + Send + Sync + 'static,
1248 {
1249 self.via(Flow::identity().scan(seed, f))
1250 }
1251
1252 #[must_use]
1253 pub fn scan_async<State, F, Fut>(self, seed: State, f: F) -> Source<State, Mat>
1254 where
1255 State: Clone + Send + Sync + 'static,
1256 F: Fn(State, Out) -> Fut + Send + Sync + 'static,
1257 Fut: Future<Output = StreamResult<State>> + Send + 'static,
1258 {
1259 self.via(Flow::identity().scan_async(seed, f))
1260 }
1261
1262 #[must_use]
1263 pub fn try_scan<State, F>(self, seed: State, f: F) -> Source<State, Mat>
1264 where
1265 State: Clone + Send + Sync + 'static,
1266 F: Fn(State, Out) -> StreamResult<State> + Send + Sync + 'static,
1267 {
1268 self.via(Flow::identity().try_scan(seed, f))
1269 }
1270
1271 #[must_use]
1273 #[deprecated(
1274 since = "0.9.0",
1275 note = "renamed to `try_scan` (idiomatic); the `_result` name still works"
1276 )]
1277 pub fn scan_result<State, F>(self, seed: State, f: F) -> Source<State, Mat>
1278 where
1279 State: Clone + Send + Sync + 'static,
1280 F: Fn(State, Out) -> StreamResult<State> + Send + Sync + 'static,
1281 {
1282 self.try_scan(seed, f)
1283 }
1284
1285 #[must_use]
1286 pub fn scan_result_with_supervision<State, F>(
1287 self,
1288 seed: State,
1289 f: F,
1290 decider: SupervisionDecider,
1291 ) -> Source<State, Mat>
1292 where
1293 State: Clone + Send + Sync + 'static,
1294 F: Fn(State, Out) -> StreamResult<State> + Send + Sync + 'static,
1295 {
1296 self.via(Flow::identity().scan_result_with_supervision(seed, f, decider))
1297 }
1298
1299 #[must_use]
1300 pub fn sliding(self, size: usize, step: usize) -> Source<Vec<Out>, Mat>
1301 where
1302 Out: Clone,
1303 {
1304 self.via(Flow::identity().sliding(size, step))
1305 }
1306
1307 #[must_use]
1308 pub fn fold<Acc, F>(self, zero: Acc, f: F) -> Source<Acc, Mat>
1309 where
1310 Acc: Clone + Send + Sync + 'static,
1311 F: Fn(Acc, Out) -> Acc + Send + Sync + 'static,
1312 {
1313 self.via(Flow::identity().fold(zero, f))
1314 }
1315
1316 #[must_use]
1317 pub fn fold_async<Acc, F, Fut>(self, zero: Acc, f: F) -> Source<Acc, Mat>
1318 where
1319 Acc: Clone + Send + Sync + 'static,
1320 F: Fn(Acc, Out) -> Fut + Send + Sync + 'static,
1321 Fut: Future<Output = StreamResult<Acc>> + Send + 'static,
1322 {
1323 self.via(Flow::identity().fold_async(zero, f))
1324 }
1325
1326 #[must_use]
1327 pub fn map_with_resource<Resource, Next, Create, F, Close>(
1328 self,
1329 create: Create,
1330 f: F,
1331 close: Close,
1332 ) -> Source<Next, Mat>
1333 where
1334 Resource: Send + 'static,
1335 Next: Send + 'static,
1336 Create: Fn() -> StreamResult<Resource> + Send + Sync + 'static,
1337 F: Fn(&mut Resource, Out) -> StreamResult<Next> + Send + Sync + 'static,
1338 Close: Fn(Resource) -> StreamResult<Option<Next>> + Send + Sync + 'static,
1339 {
1340 self.via(Flow::identity().map_with_resource(create, f, close))
1341 }
1342
1343 #[must_use]
1344 pub fn try_fold<Acc, F>(self, zero: Acc, f: F) -> Source<Acc, Mat>
1345 where
1346 Acc: Clone + Send + Sync + 'static,
1347 F: Fn(Acc, Out) -> StreamResult<Acc> + Send + Sync + 'static,
1348 {
1349 self.via(Flow::identity().try_fold(zero, f))
1350 }
1351
1352 #[must_use]
1354 #[deprecated(
1355 since = "0.9.0",
1356 note = "renamed to `try_fold` (idiomatic); the `_result` name still works"
1357 )]
1358 pub fn fold_result<Acc, F>(self, zero: Acc, f: F) -> Source<Acc, Mat>
1359 where
1360 Acc: Clone + Send + Sync + 'static,
1361 F: Fn(Acc, Out) -> StreamResult<Acc> + Send + Sync + 'static,
1362 {
1363 self.try_fold(zero, f)
1364 }
1365
1366 #[must_use]
1367 pub fn fold_result_with_supervision<Acc, F>(
1368 self,
1369 zero: Acc,
1370 f: F,
1371 decider: SupervisionDecider,
1372 ) -> Source<Acc, Mat>
1373 where
1374 Acc: Clone + Send + Sync + 'static,
1375 F: Fn(Acc, Out) -> StreamResult<Acc> + Send + Sync + 'static,
1376 {
1377 self.via(Flow::identity().fold_result_with_supervision(zero, f, decider))
1378 }
1379
1380 #[must_use]
1381 pub fn reduce<F>(self, f: F) -> Source<Out, Mat>
1382 where
1383 F: Fn(Out, Out) -> Out + Send + Sync + 'static,
1384 {
1385 self.via(Flow::identity().reduce(f))
1386 }
1387
1388 #[must_use]
1389 pub fn try_reduce<F>(self, f: F) -> Source<Out, Mat>
1390 where
1391 Out: Clone,
1392 F: Fn(Out, Out) -> StreamResult<Out> + Send + Sync + 'static,
1393 {
1394 self.via(Flow::identity().try_reduce(f))
1395 }
1396
1397 #[must_use]
1399 #[deprecated(
1400 since = "0.9.0",
1401 note = "renamed to `try_reduce` (idiomatic); the `_result` name still works"
1402 )]
1403 pub fn reduce_result<F>(self, f: F) -> Source<Out, Mat>
1404 where
1405 Out: Clone,
1406 F: Fn(Out, Out) -> StreamResult<Out> + Send + Sync + 'static,
1407 {
1408 self.try_reduce(f)
1409 }
1410
1411 #[must_use]
1412 pub fn reduce_result_with_supervision<F>(
1413 self,
1414 f: F,
1415 decider: SupervisionDecider,
1416 ) -> Source<Out, Mat>
1417 where
1418 Out: Clone,
1419 F: Fn(Out, Out) -> StreamResult<Out> + Send + Sync + 'static,
1420 {
1421 self.via(Flow::identity().reduce_result_with_supervision(f, decider))
1422 }
1423
1424 #[must_use]
1425 pub fn map_error<F>(self, f: F) -> Source<Out, Mat>
1426 where
1427 F: Fn(StreamError) -> StreamError + Send + Sync + 'static,
1428 {
1429 self.via(Flow::identity().map_error(f))
1430 }
1431
1432 #[must_use]
1433 pub fn recover<F>(self, f: F) -> Source<Out, Mat>
1434 where
1435 F: Fn(StreamError) -> Option<Out> + Send + Sync + 'static,
1436 {
1437 self.via(Flow::identity().recover(f))
1438 }
1439
1440 #[must_use]
1441 pub fn recover_with<F>(self, f: F) -> Source<Out, Mat>
1442 where
1443 F: Fn(StreamError) -> Option<Source<Out>> + Send + Sync + 'static,
1444 {
1445 self.via(Flow::identity().recover_with(f))
1446 }
1447
1448 #[must_use]
1449 pub fn recover_with_retries<F>(self, retries: usize, f: F) -> Source<Out, Mat>
1450 where
1451 F: Fn(StreamError) -> Option<Source<Out>> + Send + Sync + 'static,
1452 {
1453 self.via(Flow::identity().recover_with_retries(retries, f))
1454 }
1455
1456 #[must_use]
1457 pub fn on_error_complete(self) -> Source<Out, Mat> {
1458 self.via(Flow::identity().on_error_complete())
1459 }
1460
1461 #[must_use]
1462 pub fn concat<Mat2>(self, that: Source<Out, Mat2>) -> Source<Out, Mat>
1463 where
1464 Mat2: Send + 'static,
1465 {
1466 let factory = self.factory;
1467 let hints = self.hints;
1468 let that_factory = that.factory;
1469 Source::from_materialized_factory_with_hints(
1470 move |materializer| {
1471 let (primary, mat) = Arc::clone(&factory).create(materializer)?;
1472 let secondary = match Arc::clone(&that_factory).create(materializer) {
1473 Ok((stream, _)) => stream,
1474 Err(error) => Box::new(std::iter::once(Err(error))) as BoxStream<Out>,
1475 };
1476 Ok((concat_source_streams(vec![primary, secondary]), mat))
1477 },
1478 hints,
1479 )
1480 }
1481
1482 #[must_use]
1483 pub fn concat_lazy<Mat2>(self, that: Source<Out, Mat2>) -> Source<Out, Mat>
1484 where
1485 Mat2: Send + 'static,
1486 {
1487 let factory = self.factory;
1488 let hints = self.hints;
1489 let that_factory = that.factory;
1490 Source::from_materialized_factory_with_hints(
1491 move |materializer| {
1492 let (primary, mat) = Arc::clone(&factory).create(materializer)?;
1493 Ok((
1494 concat_source_streams_lazy(
1495 primary,
1496 vec![Arc::clone(&that_factory)],
1497 materializer,
1498 ),
1499 mat,
1500 ))
1501 },
1502 hints,
1503 )
1504 }
1505
1506 #[must_use]
1507 pub fn concat_all_lazy<Mat2, I>(self, those: I) -> Source<Out, Mat>
1508 where
1509 Mat2: Send + 'static,
1510 I: IntoIterator<Item = Source<Out, Mat2>>,
1511 {
1512 let factory = self.factory;
1513 let hints = self.hints;
1514 let other_factories: Vec<_> = those.into_iter().map(|source| source.factory).collect();
1515 Source::from_materialized_factory_with_hints(
1516 move |materializer| {
1517 let (primary, mat) = Arc::clone(&factory).create(materializer)?;
1518 Ok((
1519 concat_source_streams_lazy(primary, other_factories.clone(), materializer),
1520 mat,
1521 ))
1522 },
1523 hints,
1524 )
1525 }
1526
1527 #[must_use]
1528 pub fn prepend<Mat2>(self, that: Source<Out, Mat2>) -> Source<Out, Mat>
1529 where
1530 Mat2: Send + 'static,
1531 {
1532 let factory = self.factory;
1533 let hints = self.hints;
1534 let that_factory = that.factory;
1535 Source::from_materialized_factory_with_hints(
1536 move |materializer| {
1537 let (primary, mat) = Arc::clone(&factory).create(materializer)?;
1538 let secondary = match Arc::clone(&that_factory).create(materializer) {
1539 Ok((stream, _)) => stream,
1540 Err(error) => Box::new(std::iter::once(Err(error))) as BoxStream<Out>,
1541 };
1542 Ok((concat_source_streams(vec![secondary, primary]), mat))
1543 },
1544 hints,
1545 )
1546 }
1547
1548 #[must_use]
1549 pub fn prepend_lazy<Mat2>(self, that: Source<Out, Mat2>) -> Source<Out, Mat>
1550 where
1551 Mat2: Send + 'static,
1552 {
1553 self.prepend(that)
1554 }
1555
1556 #[must_use]
1557 pub fn or_else<Mat2>(self, secondary: Source<Out, Mat2>) -> Source<Out, Mat>
1558 where
1559 Mat2: Send + 'static,
1560 {
1561 let factory = self.factory;
1562 let hints = self.hints;
1563 let secondary_factory = secondary.factory;
1564 Source::from_materialized_factory_with_hints(
1565 move |materializer| {
1566 let (primary, mat) = Arc::clone(&factory).create(materializer)?;
1567 let secondary = match Arc::clone(&secondary_factory).create(materializer) {
1568 Ok((stream, _)) => stream,
1569 Err(error) => Box::new(std::iter::once(Err(error))) as BoxStream<Out>,
1570 };
1571 Ok((or_else_source_stream(primary, secondary), mat))
1572 },
1573 hints,
1574 )
1575 }
1576
1577 #[must_use]
1578 pub fn interleave<Mat2>(self, that: Source<Out, Mat2>, segment_size: usize) -> Source<Out, Mat>
1579 where
1580 Mat2: Send + 'static,
1581 {
1582 self.interleave_all([that], segment_size, false)
1583 }
1584
1585 #[must_use]
1586 pub fn interleave_all<Mat2, I>(
1587 self,
1588 those: I,
1589 segment_size: usize,
1590 eager_close: bool,
1591 ) -> Source<Out, Mat>
1592 where
1593 Mat2: Send + 'static,
1594 I: IntoIterator<Item = Source<Out, Mat2>>,
1595 {
1596 let factory = self.factory;
1597 let hints = self.hints;
1598 let other_factories: Vec<_> = those.into_iter().map(|source| source.factory).collect();
1599 Source::from_materialized_factory_with_hints(
1600 move |materializer| {
1601 let (primary, mat) = Arc::clone(&factory).create(materializer)?;
1602 let mut streams = Vec::with_capacity(other_factories.len() + 1);
1603 streams.push(primary);
1604 for other in &other_factories {
1605 let stream = match Arc::clone(other).create(materializer) {
1606 Ok((stream, _)) => stream,
1607 Err(error) => Box::new(std::iter::once(Err(error))) as BoxStream<Out>,
1608 };
1609 streams.push(stream);
1610 }
1611 Ok((
1612 interleave_source_streams(streams, segment_size, eager_close),
1613 mat,
1614 ))
1615 },
1616 hints,
1617 )
1618 }
1619
1620 #[must_use]
1621 pub fn merge_sorted<Mat2>(self, that: Source<Out, Mat2>) -> Source<Out, Mat>
1622 where
1623 Out: Ord,
1624 Mat2: Send + 'static,
1625 {
1626 self.via(Flow::identity().merge_sorted(that))
1627 }
1628
1629 #[must_use]
1630 pub fn merge_latest<Mat2>(
1631 self,
1632 that: Source<Out, Mat2>,
1633 eager_complete: bool,
1634 ) -> Source<Vec<Out>, Mat>
1635 where
1636 Out: Clone,
1637 Mat2: Send + 'static,
1638 {
1639 let factory = self.factory;
1640 let hints = self.hints;
1641 let that_factory = that.factory;
1642 Source::from_materialized_factory_with_hints(
1643 move |materializer| {
1644 let (left, mat) = Arc::clone(&factory).create(materializer)?;
1645 let right = match Arc::clone(&that_factory).create(materializer) {
1646 Ok((stream, _)) => stream,
1647 Err(error) => {
1648 return Ok((
1649 Box::new(std::iter::once(Err(error))) as BoxStream<Vec<Out>>,
1650 mat,
1651 ));
1652 }
1653 };
1654 Ok((merge_latest_streams(vec![left, right], eager_complete), mat))
1655 },
1656 hints,
1657 )
1658 }
1659
1660 #[must_use]
1661 pub fn merge_all<Mat2, I>(self, those: I, eager_complete: bool) -> Source<Out, Mat>
1662 where
1663 Mat2: Send + 'static,
1664 I: IntoIterator<Item = Source<Out, Mat2>>,
1665 {
1666 let factory = self.factory;
1667 let hints = self.hints;
1668 let other_factories: Vec<_> = those.into_iter().map(|source| source.factory).collect();
1669 Source::from_materialized_factory_with_hints(
1670 move |materializer| {
1671 let (primary, mat) = Arc::clone(&factory).create(materializer)?;
1672 let mut streams = Vec::with_capacity(other_factories.len() + 1);
1673 streams.push(primary);
1674 for other in &other_factories {
1675 let stream = match Arc::clone(other).create(materializer) {
1676 Ok((stream, _)) => stream,
1677 Err(error) => {
1678 return Ok((
1679 Box::new(std::iter::once(Err(error))) as BoxStream<Out>,
1680 mat,
1681 ));
1682 }
1683 };
1684 streams.push(stream);
1685 }
1686 Ok((merge_streams(streams, eager_complete), mat))
1687 },
1688 hints,
1689 )
1690 }
1691
1692 #[must_use]
1693 pub fn zip_with<Mat2, Out2, Next, F>(
1694 self,
1695 that: Source<Out2, Mat2>,
1696 combine: F,
1697 ) -> Source<Next, Mat>
1698 where
1699 Out2: Send + 'static,
1700 Next: Send + 'static,
1701 Mat2: Send + 'static,
1702 F: Fn(Out, Out2) -> Next + Send + Sync + 'static,
1703 {
1704 self.via(Flow::identity().zip_with(that, combine))
1705 }
1706
1707 #[must_use]
1708 pub fn zip_latest<Mat2, Out2>(self, that: Source<Out2, Mat2>) -> Source<(Out, Out2), Mat>
1709 where
1710 Out: Clone,
1711 Out2: Clone + Send + 'static,
1712 Mat2: Send + 'static,
1713 {
1714 self.zip_latest_with(that, true, |left, right| (left, right))
1715 }
1716
1717 #[must_use]
1718 pub fn zip_latest_with<Mat2, Out2, Next, F>(
1719 self,
1720 that: Source<Out2, Mat2>,
1721 eager_complete: bool,
1722 combine: F,
1723 ) -> Source<Next, Mat>
1724 where
1725 Out: Clone,
1726 Out2: Clone + Send + 'static,
1727 Next: Send + 'static,
1728 Mat2: Send + 'static,
1729 F: Fn(Out, Out2) -> Next + Send + Sync + 'static,
1730 {
1731 let factory = self.factory;
1732 let hints = self.hints;
1733 let that_factory = that.factory;
1734 let combine = Arc::new(combine);
1735 Source::from_materialized_factory_with_hints(
1736 move |materializer| {
1737 let (left, mat) = Arc::clone(&factory).create(materializer)?;
1738 let right = match Arc::clone(&that_factory).create(materializer) {
1739 Ok((stream, _)) => stream,
1740 Err(error) => {
1741 return Ok((
1742 Box::new(std::iter::once(Err(error))) as BoxStream<Next>,
1743 mat,
1744 ));
1745 }
1746 };
1747 Ok((
1748 zip_latest_with_stream(left, right, eager_complete, Arc::clone(&combine)),
1749 mat,
1750 ))
1751 },
1752 hints,
1753 )
1754 }
1755
1756 #[must_use]
1757 pub fn zip_with_index(self) -> Source<(Out, u64), Mat> {
1758 let factory = self.factory;
1759 let hints = self.hints;
1760 Source::from_materialized_factory_with_hints(
1761 move |materializer| {
1762 let (mut stream, mat) = Arc::clone(&factory).create(materializer)?;
1763 let mut index = 0_u64;
1764 Ok((
1765 Box::new(std::iter::from_fn(move || {
1766 stream.next().map(|item| {
1767 item.map(|value| {
1768 let pair = (value, index);
1769 index = index.wrapping_add(1);
1770 pair
1771 })
1772 })
1773 })) as BoxStream<(Out, u64)>,
1774 mat,
1775 ))
1776 },
1777 hints,
1778 )
1779 }
1780
1781 #[must_use]
1782 pub fn zip_all<Mat2, Out2>(
1783 self,
1784 that: Source<Out2, Mat2>,
1785 this_elem: Out,
1786 that_elem: Out2,
1787 ) -> Source<(Out, Out2), Mat>
1788 where
1789 Out: Clone + Sync,
1790 Out2: Clone + Send + Sync + 'static,
1791 Mat2: Send + 'static,
1792 {
1793 let factory = self.factory;
1794 let hints = self.hints;
1795 let that_factory = that.factory;
1796 Source::from_materialized_factory_with_hints(
1797 move |materializer| {
1798 let (left, mat) = Arc::clone(&factory).create(materializer)?;
1799 let right = match Arc::clone(&that_factory).create(materializer) {
1800 Ok((stream, _)) => stream,
1801 Err(error) => {
1802 return Ok((
1803 Box::new(std::iter::once(Err(error))) as BoxStream<(Out, Out2)>,
1804 mat,
1805 ));
1806 }
1807 };
1808 Ok((
1809 zip_all_stream(left, right, this_elem.clone(), that_elem.clone()),
1810 mat,
1811 ))
1812 },
1813 hints,
1814 )
1815 }
1816
1817 #[must_use]
1818 pub fn also_to<SinkMat>(self, sink: Sink<Out, SinkMat>) -> Source<Out, Mat>
1819 where
1820 Out: Clone,
1821 SinkMat: Send + 'static,
1822 {
1823 self.via(Flow::identity().also_to(sink))
1824 }
1825
1826 #[must_use]
1827 pub fn also_to_all<SinkMat, I>(self, sinks: I) -> Source<Out, Mat>
1828 where
1829 Out: Clone,
1830 SinkMat: Send + 'static,
1831 I: IntoIterator<Item = Sink<Out, SinkMat>>,
1832 {
1833 self.via(Flow::identity().also_to_all(sinks))
1834 }
1835
1836 #[must_use]
1837 pub fn divert_to<SinkMat, F>(self, sink: Sink<Out, SinkMat>, predicate: F) -> Source<Out, Mat>
1838 where
1839 SinkMat: Send + 'static,
1840 F: Fn(&Out) -> bool + Send + Sync + 'static,
1841 {
1842 self.via(Flow::identity().divert_to(sink, predicate))
1843 }
1844
1845 #[must_use]
1846 pub fn wire_tap<SinkMat>(self, sink: Sink<Out, SinkMat>) -> Source<Out, Mat>
1847 where
1848 Out: Clone,
1849 SinkMat: Send + 'static,
1850 {
1851 self.via(Flow::identity().wire_tap(sink))
1852 }
1853
1854 pub fn run_with<SinkMat: Send + 'static>(
1855 self,
1856 sink: Sink<Out, SinkMat>,
1857 ) -> StreamResult<SinkMat> {
1858 let fast_result = self
1861 .split_hook
1862 .as_ref()
1863 .zip(sink.fold_fp.as_deref())
1864 .and_then(|(hook, fp)| fp.try_register(Arc::clone(hook)));
1865 if let Some(result) = fast_result {
1866 return result?.downcast::<SinkMat>().map(|b| *b).map_err(|_| {
1867 StreamError::Failed("split fast path: unexpected mat type (internal error)".into())
1868 });
1869 }
1870 self.to_mat(sink, Keep::right).run()
1871 }
1872
1873 pub fn run_with_materializer<SinkMat: Send + 'static>(
1874 self,
1875 sink: Sink<Out, SinkMat>,
1876 materializer: &Materializer,
1877 ) -> StreamResult<SinkMat> {
1878 self.to_mat(sink, Keep::right)
1879 .run_with_materializer(materializer)
1880 }
1881
1882 #[must_use]
1883 pub fn to<SinkMat>(self, sink: Sink<Out, SinkMat>) -> RunnableGraph<Mat>
1884 where
1885 SinkMat: Send + 'static,
1886 {
1887 self.to_mat(sink, Keep::left)
1888 }
1889
    /// Connects this source to `sink` and returns a runnable graph whose materialized
    /// value is `combine(source_mat, sink_mat)`.
    ///
    /// The runner picks one of several execution paths each time the graph is run:
    ///
    /// 1. If the source has a terminal factory and the sink exposes a fold fast path, the
    ///    terminal factory is invoked to obtain a hook and the source's materialized value.
    ///    * If the hook supports direct-terminal execution and the sink accepts it, the
    ///      type-erased result is downcast to `SinkMat` and combined.
    ///    * Otherwise, if the sink does not support terminal drain, the regular stream
    ///      factory is used with `Sink::run_from_source`.
    ///    * Otherwise the hook is registered for terminal drain.
    /// 2. Without a terminal factory or fast path, the stream is created from the regular
    ///    factory and run inline when the hints allow it and the sink can inline, or via
    ///    `Sink::run_from_source` otherwise.
    ///
    /// # Errors
    ///
    /// Running the graph fails with whatever error the factories or the sink report, or
    /// with `StreamError::Failed` when a fast path yields an unexpected materialized type.
    ///
    /// # Panics
    ///
    /// Running the graph panics if the sink advertises terminal-drain support but then
    /// declines the registration.
    #[must_use]
    pub fn to_mat<SinkMat, Combined, F>(
        self,
        sink: Sink<Out, SinkMat>,
        combine: F,
    ) -> RunnableGraph<Combined>
    where
        SinkMat: Send + 'static,
        Combined: Send + 'static,
        F: Fn(Mat, SinkMat) -> Combined + Send + Sync + 'static,
    {
        let factory = self.factory;
        let terminal_factory = self.terminal_factory;
        let hints = self.hints;
        let combine = Arc::new(combine);
        RunnableGraph::from_runner(move |materializer| {
            // Fast paths are only considered when both sides opt in.
            if let Some(terminal_factory) = &terminal_factory
                && let Some(fold_fp) = sink.fold_fp.as_deref()
            {
                let (hook, source_mat) = terminal_factory(materializer)?;
                // Preferred: hand the hook straight to the sink.
                if hook.supports_direct_terminal()
                    && let Some(sink_mat) =
                        fold_fp.try_register_direct_terminal(Arc::clone(&hook), materializer)
                {
                    let sink_mat = sink_mat.and_then(|mat| {
                        mat.downcast::<SinkMat>().map(|boxed| *boxed).map_err(|_| {
                            StreamError::Failed(
                                "terminal fast path: unexpected mat type (internal error)".into(),
                            )
                        })
                    })?;
                    return Ok(combine(source_mat, sink_mat));
                }
                if !fold_fp.supports_terminal_drain() {
                    // NOTE(review): the terminal factory has already been invoked above, and
                    // the regular factory is invoked again here, shadowing the first
                    // `source_mat`. Confirm the terminal factory is free of side effects,
                    // otherwise the source is effectively materialized twice on this path.
                    let (stream, source_mat) = Arc::clone(&factory).create(materializer)?;
                    let sink_mat = sink.run_from_source(stream, materializer, hints.runtime())?;
                    return Ok(combine(source_mat, sink_mat));
                }
                // The sink said it supports terminal drain, so a `None` here is a bug.
                let sink_mat = fold_fp
                    .try_register_terminal_drain(hook, materializer)
                    .expect("terminal drain support advertised")
                    .and_then(|mat| {
                        mat.downcast::<SinkMat>().map(|boxed| *boxed).map_err(|_| {
                            StreamError::Failed(
                                "terminal fast path: unexpected mat type (internal error)".into(),
                            )
                        })
                    })?;
                return Ok(combine(source_mat, sink_mat));
            }
            // General path: materialize the stream and let the sink consume it.
            let (stream, source_mat) = Arc::clone(&factory).create(materializer)?;
            let sink_mat = if hints.inline_head_terminal && sink.can_inline() {
                // Inline execution wraps the stream with the materializer's runtime state.
                let stream =
                    runtime_checked_stream(stream, Arc::clone(&materializer.inner.state), None);
                sink.run_inline(stream, materializer)?
            } else {
                sink.run_from_source(stream, materializer, hints.runtime())?
            };
            Ok(combine(source_mat, sink_mat))
        })
    }
1951
1952 pub fn run_collect(self) -> StreamResult<Vec<Out>> {
1953 self.run_with(Sink::collect())?.wait()
1954 }
1955
1956 pub fn run_fold<Acc, F>(self, zero: Acc, f: F) -> StreamResult<StreamCompletion<Acc>>
1957 where
1958 Acc: Clone + Send + Sync + 'static,
1959 F: Fn(Acc, Out) -> Acc + Send + Sync + 'static,
1960 {
1961 self.run_with(Sink::fold(zero, f))
1962 }
1963
1964 pub fn run_foreach<F>(self, f: F) -> StreamResult<StreamCompletion<NotUsed>>
1965 where
1966 F: Fn(Out) + Send + Sync + 'static,
1967 {
1968 self.run_with(Sink::foreach(f))
1969 }
1970
1971 pub fn run_for_each<F>(self, f: F) -> StreamResult<StreamCompletion<NotUsed>>
1972 where
1973 F: Fn(Out) + Send + Sync + 'static,
1974 {
1975 self.run_with(Sink::foreach(f))
1976 }
1977
1978 pub fn run_reduce<F>(self, f: F) -> StreamResult<StreamCompletion<Out>>
1979 where
1980 F: Fn(Out, Out) -> Out + Send + Sync + 'static,
1981 {
1982 self.run_with(Sink::reduce(f))
1983 }
1984
1985 #[must_use]
1986 pub fn map_materialized_value<NextMat, F>(self, f: F) -> Source<Out, NextMat>
1987 where
1988 NextMat: Send + 'static,
1989 F: Fn(Mat) -> NextMat + Send + Sync + 'static,
1990 {
1991 let factory = self.factory;
1992 let terminal_factory = self.terminal_factory;
1993 let hints = self.hints;
1994 let f = Arc::new(f);
1995 let factory_f = Arc::clone(&f);
1996 let mapped_terminal_factory = terminal_factory.map(|terminal_factory| {
1997 let f = Arc::clone(&f);
1998 Arc::new(move |materializer: &Materializer| {
1999 let (hook, mat) = terminal_factory(materializer)?;
2000 Ok((hook, f(mat)))
2001 }) as Arc<TerminalSourceFactory<Out, NextMat>>
2002 });
2003 Source {
2004 factory: Arc::new(FnSourceFactory(move |materializer: &Materializer| {
2005 let (stream, mat) = Arc::clone(&factory).create(materializer)?;
2006 Ok((stream, factory_f(mat)))
2007 })),
2008 terminal_factory: mapped_terminal_factory,
2009 hints,
2010 attributes: Attributes::default(),
2011 split_hook: None,
2012 }
2013 }
2014}
2015
2016impl<Out: Clone + Send + Sync + 'static> Source<Out, NotUsed> {
2017 #[must_use]
2018 pub fn combine<Mat1, Mat2, MatRest, I>(
2019 first: Source<Out, Mat1>,
2020 second: Source<Out, Mat2>,
2021 rest: I,
2022 strategy: SourceCombineStrategy,
2023 ) -> Source<Out, NotUsed>
2024 where
2025 Mat1: Send + 'static,
2026 Mat2: Send + 'static,
2027 MatRest: Send + 'static,
2028 I: IntoIterator<Item = Source<Out, MatRest>>,
2029 {
2030 let mut factories: Vec<Arc<CombinedSourceFactory<Out>>> = vec![
2031 Arc::new(move |materializer| {
2032 Arc::clone(&first.factory)
2033 .create(materializer)
2034 .map(|(stream, _)| stream)
2035 }),
2036 Arc::new(move |materializer| {
2037 Arc::clone(&second.factory)
2038 .create(materializer)
2039 .map(|(stream, _)| stream)
2040 }),
2041 ];
2042 factories.extend(rest.into_iter().map(|source| {
2043 Arc::new(move |materializer: &Materializer| {
2044 Arc::clone(&source.factory)
2045 .create(materializer)
2046 .map(|(stream, _)| stream)
2047 }) as Arc<CombinedSourceFactory<Out>>
2048 }));
2049 Source::from_materialized_factory(move |materializer| {
2050 let mut streams = Vec::with_capacity(factories.len());
2051 for factory in &factories {
2052 let stream = match factory(materializer) {
2053 Ok(stream) => stream,
2054 Err(error) => {
2055 return Ok((
2056 Box::new(std::iter::once(Err(error))) as BoxStream<Out>,
2057 NotUsed,
2058 ));
2059 }
2060 };
2061 streams.push(stream);
2062 }
2063 let stream = match &strategy {
2064 SourceCombineStrategy::Merge { eager_complete } => {
2065 merge_streams(streams, *eager_complete)
2066 }
2067 SourceCombineStrategy::Concat => concat_source_streams(streams),
2068 SourceCombineStrategy::Prioritized {
2069 priorities,
2070 eager_complete,
2071 } => {
2072 if priorities.len() != streams.len() {
2073 return Err(StreamError::GraphValidation(format!(
2074 "combine priorities length {} did not match source count {}",
2075 priorities.len(),
2076 streams.len()
2077 )));
2078 }
2079 merge_prioritized_streams(streams, priorities.clone(), *eager_complete)
2080 }
2081 };
2082 Ok((stream, NotUsed))
2083 })
2084 }
2085
2086 #[must_use]
2087 pub fn zip_n<Mat2, I>(sources: I) -> Source<Vec<Out>, NotUsed>
2088 where
2089 I: IntoIterator<Item = Source<Out, Mat2>>,
2090 Mat2: Send + 'static,
2091 Out: Clone,
2092 {
2093 Self::zip_with_n(sources, |values| values)
2094 }
2095
2096 #[must_use]
2097 pub fn zip_with_n<Mat2, I, Next, F>(sources: I, zipper: F) -> Source<Next, NotUsed>
2098 where
2099 I: IntoIterator<Item = Source<Out, Mat2>>,
2100 Mat2: Send + 'static,
2101 Next: Send + 'static,
2102 F: Fn(Vec<Out>) -> Next + Send + Sync + 'static,
2103 {
2104 let factories: Vec<_> = sources.into_iter().map(|source| source.factory).collect();
2105 let zipper = Arc::new(zipper);
2106 Source::from_materialized_factory(move |materializer| {
2107 let mut streams = Vec::with_capacity(factories.len());
2108 for factory in &factories {
2109 let stream = match Arc::clone(factory).create(materializer) {
2110 Ok((stream, _)) => stream,
2111 Err(error) => {
2112 return Ok((
2113 Box::new(std::iter::once(Err(error))) as BoxStream<Next>,
2114 NotUsed,
2115 ));
2116 }
2117 };
2118 streams.push(stream);
2119 }
2120 Ok((zip_n_streams(streams, Arc::clone(&zipper)), NotUsed))
2121 })
2122 }
2123
2124 #[must_use]
2125 pub fn merge_prioritized_n<Mat2, I>(
2126 sources_and_priorities: I,
2127 eager_complete: bool,
2128 ) -> Source<Out, NotUsed>
2129 where
2130 I: IntoIterator<Item = (Source<Out, Mat2>, usize)>,
2131 Mat2: Send + 'static,
2132 {
2133 let sources_and_priorities: Vec<_> = sources_and_priorities.into_iter().collect();
2134 if sources_and_priorities.is_empty() {
2135 return Source::empty();
2136 }
2137 let (factories, priorities): (Vec<_>, Vec<_>) = sources_and_priorities
2138 .into_iter()
2139 .map(|(source, priority)| (source.factory, priority))
2140 .unzip();
2141 Source::from_materialized_factory(move |materializer| {
2142 let mut streams = Vec::with_capacity(factories.len());
2143 for factory in &factories {
2144 let stream = match Arc::clone(factory).create(materializer) {
2145 Ok((stream, _)) => stream,
2146 Err(error) => {
2147 return Ok((
2148 Box::new(std::iter::once(Err(error))) as BoxStream<Out>,
2149 NotUsed,
2150 ));
2151 }
2152 };
2153 streams.push(stream);
2154 }
2155 Ok((
2156 merge_prioritized_streams(streams, priorities.clone(), eager_complete),
2157 NotUsed,
2158 ))
2159 })
2160 }
2161
2162 #[must_use]
2163 pub fn maybe() -> (MaybeHandle<Out>, Self) {
2164 let value = Arc::new(Mutex::new(None));
2165 let handle = MaybeHandle {
2166 value: Arc::clone(&value),
2167 };
2168 let source = Self::from_factory(move || {
2169 let result = value
2170 .lock()
2171 .expect("maybe source poisoned")
2172 .clone()
2173 .unwrap_or(Err(StreamError::MaybeIncomplete));
2174 Box::new(std::iter::once(result))
2175 });
2176 (handle, source)
2177 }
2178
2179 #[must_use]
2180 pub fn single(item: Out) -> Self {
2181 Self::from_factory_with_hints(
2182 move || Box::new(std::iter::once(Ok(item.clone()))),
2183 SourceHints::with_inline_micro(1),
2184 )
2185 }
2186
2187 #[must_use]
2188 pub fn repeat(item: Out) -> Self {
2189 Self::from_factory(move || {
2190 let item = item.clone();
2191 Box::new(std::iter::repeat_with(move || Ok(item.clone())))
2192 })
2193 }
2194
2195 #[must_use]
2196 pub fn from_iterable<I>(items: I) -> Self
2197 where
2198 I: IntoIterator<Item = Out>,
2199 {
2200 items.into_iter().collect()
2201 }
2202}
2203
2204impl<Out: Clone + Send + Sync + 'static> FromIterator<Out> for Source<Out, NotUsed> {
2205 fn from_iter<T: IntoIterator<Item = Out>>(iter: T) -> Self {
2206 let items: Arc<[Out]> = iter.into_iter().collect();
2209 let len = items.len();
2210 Self::from_factory_with_hints(
2211 move || {
2212 let items = Arc::clone(&items);
2213 let mut index = 0;
2214 Box::new(std::iter::from_fn(move || {
2215 let item = items.get(index)?.clone();
2216 index += 1;
2217 Some(Ok(item))
2218 }))
2219 },
2220 SourceHints::with_inline_micro(len),
2222 )
2223 }
2224}
2225
2226impl<Out: Clone + Send + Sync + 'static> From<Vec<Out>> for Source<Out, NotUsed> {
2227 fn from(items: Vec<Out>) -> Self {
2228 Source::from_iter(items)
2229 }
2230}
2231
2232impl<Out: Clone + Send + Sync + 'static, const N: usize> From<[Out; N]> for Source<Out, NotUsed> {
2233 fn from(items: [Out; N]) -> Self {
2234 Source::from_iter(items)
2235 }
2236}
2237
2238pub trait IntoSource: IntoIterator + Sized {
2268 #[must_use]
2269 fn into_source(self) -> Source<Self::Item, NotUsed>
2270 where
2271 Self::Item: Clone + Send + Sync + 'static,
2272 {
2273 Source::from_iter(self)
2274 }
2275}
2276
// Blanket implementation: every `IntoIterator` type gets `into_source` from the trait's
// default method.
impl<I> IntoSource for I where I: IntoIterator {}
2278
/// Test-only helper: builds a source from `factory` tagged with an inline-micro hint of
/// `max_success_items`.
#[cfg(test)]
pub(in crate::stream) fn test_source_with_inline_micro_hint<Out: Send + 'static>(
    factory: impl Fn() -> BoxStream<Out> + Send + Sync + 'static,
    max_success_items: usize,
) -> Source<Out, NotUsed> {
    let hints = SourceHints::with_inline_micro(max_success_items);
    Source::from_factory_with_hints(factory, hints)
}
2290
/// Iterator-backed stream that opens a resource on demand, reads items from it, and
/// closes it again.
///
/// The resource is created on the first call to `next`, read until `read` yields
/// `Ok(None)` or an error, and closed when the stream terminates or is dropped.
struct UnfoldResourceStream<Resource, Out, Create, Read, Close>
where
    Create: Fn() -> StreamResult<Resource>,
    Read: Fn(&mut Resource) -> StreamResult<Option<Out>>,
    Close: Fn(Resource) -> StreamResult<()>,
{
    /// Opens the resource; called at most once per stream.
    create: Arc<Create>,
    /// Reads the next item; `Ok(None)` signals the end of the data.
    read: Arc<Read>,
    /// Consumes and closes the resource.
    close: Arc<Close>,
    /// The open resource, or `None` before creation and after it has been closed.
    resource: Option<Resource>,
    /// Set once creation has been attempted, whether or not it succeeded.
    created: bool,
    /// Set once the stream has finished; `next` returns `None` from then on.
    terminated: bool,
    /// Ties the otherwise unused `Out` parameter to the struct.
    _marker: PhantomData<fn() -> Out>,
}
2305
2306impl<Resource, Out, Create, Read, Close> UnfoldResourceStream<Resource, Out, Create, Read, Close>
2307where
2308 Create: Fn() -> StreamResult<Resource>,
2309 Read: Fn(&mut Resource) -> StreamResult<Option<Out>>,
2310 Close: Fn(Resource) -> StreamResult<()>,
2311{
2312 fn ensure_created(&mut self) -> StreamResult<()> {
2313 if self.created {
2314 return Ok(());
2315 }
2316 self.created = true;
2317 let resource = catch_unwind_failed("unfold_resource create", || (self.create)())
2318 .and_then(|result| result)?;
2319 self.resource = Some(resource);
2320 Ok(())
2321 }
2322
2323 fn close_resource(&mut self) -> StreamResult<()> {
2324 match self.resource.take() {
2325 Some(resource) => {
2326 catch_unwind_failed("unfold_resource close", || (self.close)(resource))
2327 .and_then(|result| result)
2328 }
2329 None => Ok(()),
2330 }
2331 }
2332}
2333
2334impl<Resource, Out, Create, Read, Close> Iterator
2335 for UnfoldResourceStream<Resource, Out, Create, Read, Close>
2336where
2337 Create: Fn() -> StreamResult<Resource>,
2338 Read: Fn(&mut Resource) -> StreamResult<Option<Out>>,
2339 Close: Fn(Resource) -> StreamResult<()>,
2340{
2341 type Item = StreamResult<Out>;
2342
2343 fn next(&mut self) -> Option<Self::Item> {
2344 if self.terminated {
2345 return None;
2346 }
2347 if let Err(error) = self.ensure_created() {
2348 self.terminated = true;
2349 return Some(Err(error));
2350 }
2351
2352 let result = {
2353 let resource = self
2354 .resource
2355 .as_mut()
2356 .expect("unfold_resource resource is open");
2357 catch_unwind_failed("unfold_resource read", || (self.read)(resource))
2358 .and_then(|result| result)
2359 };
2360
2361 match result {
2362 Ok(Some(item)) => Some(Ok(item)),
2363 Ok(None) => {
2364 self.terminated = true;
2365 match self.close_resource() {
2366 Ok(()) => None,
2367 Err(error) => Some(Err(error)),
2368 }
2369 }
2370 Err(read_error) => {
2371 self.terminated = true;
2372 let _ = self.close_resource();
2373 Some(Err(read_error))
2374 }
2375 }
2376 }
2377}
2378
2379impl<Resource, Out, Create, Read, Close> Drop
2380 for UnfoldResourceStream<Resource, Out, Create, Read, Close>
2381where
2382 Create: Fn() -> StreamResult<Resource>,
2383 Read: Fn(&mut Resource) -> StreamResult<Option<Out>>,
2384 Close: Fn(Resource) -> StreamResult<()>,
2385{
2386 fn drop(&mut self) {
2387 let _ = self.close_resource();
2388 }
2389}
2390
/// Function-pointer shape used inside `PhantomData` by `UnfoldResourceAsyncStream` so that
/// its element and future type parameters count as used without storing any of them.
type UnfoldResourceAsyncMarker<Out, CreateFut, ReadFut, CloseFut> =
    fn() -> (Out, CreateFut, ReadFut, CloseFut);
2393
2394struct UnfoldResourceAsyncStream<Resource, Out, Create, CreateFut, Read, ReadFut, Close, CloseFut>
2395where
2396 Resource: Send + 'static,
2397 Create: Fn() -> CreateFut,
2398 CreateFut: Future<Output = StreamResult<Resource>> + Send + 'static,
2399 Read: Fn(&mut Resource) -> ReadFut,
2400 ReadFut: Future<Output = StreamResult<Option<Out>>> + Send + 'static,
2401 Close: Fn(Resource) -> CloseFut,
2402 CloseFut: Future<Output = StreamResult<()>> + Send + 'static,
2403{
2404 create: Arc<Create>,
2405 read: Arc<Read>,
2406 close: Arc<Close>,
2407 resource: Option<Resource>,
2408 created: bool,
2409 terminated: bool,
2410 _marker: PhantomData<UnfoldResourceAsyncMarker<Out, CreateFut, ReadFut, CloseFut>>,
2411}
2412
2413impl<Resource, Out, Create, CreateFut, Read, ReadFut, Close, CloseFut>
2414 UnfoldResourceAsyncStream<Resource, Out, Create, CreateFut, Read, ReadFut, Close, CloseFut>
2415where
2416 Resource: Send + 'static,
2417 Create: Fn() -> CreateFut,
2418 CreateFut: Future<Output = StreamResult<Resource>> + Send + 'static,
2419 Read: Fn(&mut Resource) -> ReadFut,
2420 ReadFut: Future<Output = StreamResult<Option<Out>>> + Send + 'static,
2421 Close: Fn(Resource) -> CloseFut,
2422 CloseFut: Future<Output = StreamResult<()>> + Send + 'static,
2423{
2424 fn ensure_created(&mut self) -> StreamResult<()> {
2425 if self.created {
2426 return Ok(());
2427 }
2428 self.created = true;
2429 let resource = catch_unwind_failed("unfold_resource_async create", || (self.create)())
2430 .and_then(flow::run_future_inline_or_spawn)?;
2431 self.resource = Some(resource);
2432 Ok(())
2433 }
2434
2435 fn close_resource(&mut self) -> StreamResult<()> {
2436 match self.resource.take() {
2437 Some(resource) => {
2438 catch_unwind_failed("unfold_resource_async close", || (self.close)(resource))
2439 .and_then(flow::run_future_inline_or_spawn)
2440 }
2441 None => Ok(()),
2442 }
2443 }
2444}
2445
2446impl<Resource, Out, Create, CreateFut, Read, ReadFut, Close, CloseFut> Iterator
2447 for UnfoldResourceAsyncStream<Resource, Out, Create, CreateFut, Read, ReadFut, Close, CloseFut>
2448where
2449 Resource: Send + 'static,
2450 Out: Send + 'static,
2451 Create: Fn() -> CreateFut,
2452 CreateFut: Future<Output = StreamResult<Resource>> + Send + 'static,
2453 Read: Fn(&mut Resource) -> ReadFut,
2454 ReadFut: Future<Output = StreamResult<Option<Out>>> + Send + 'static,
2455 Close: Fn(Resource) -> CloseFut,
2456 CloseFut: Future<Output = StreamResult<()>> + Send + 'static,
2457{
2458 type Item = StreamResult<Out>;
2459
2460 fn next(&mut self) -> Option<Self::Item> {
2461 if self.terminated {
2462 return None;
2463 }
2464 if let Err(error) = self.ensure_created() {
2465 self.terminated = true;
2466 return Some(Err(error));
2467 }
2468
2469 let result = {
2470 let resource = self
2471 .resource
2472 .as_mut()
2473 .expect("unfold_resource_async resource is open");
2474 catch_unwind_failed("unfold_resource_async read", || (self.read)(resource))
2475 .and_then(flow::run_future_inline_or_spawn)
2476 };
2477
2478 match result {
2479 Ok(Some(item)) => Some(Ok(item)),
2480 Ok(None) => {
2481 self.terminated = true;
2482 match self.close_resource() {
2483 Ok(()) => None,
2484 Err(error) => Some(Err(error)),
2485 }
2486 }
2487 Err(read_error) => {
2488 self.terminated = true;
2489 let _ = self.close_resource();
2490 Some(Err(read_error))
2491 }
2492 }
2493 }
2494}
2495
2496impl<Resource, Out, Create, CreateFut, Read, ReadFut, Close, CloseFut> Drop
2497 for UnfoldResourceAsyncStream<Resource, Out, Create, CreateFut, Read, ReadFut, Close, CloseFut>
2498where
2499 Resource: Send + 'static,
2500 Create: Fn() -> CreateFut,
2501 CreateFut: Future<Output = StreamResult<Resource>> + Send + 'static,
2502 Read: Fn(&mut Resource) -> ReadFut,
2503 ReadFut: Future<Output = StreamResult<Option<Out>>> + Send + 'static,
2504 Close: Fn(Resource) -> CloseFut,
2505 CloseFut: Future<Output = StreamResult<()>> + Send + 'static,
2506{
2507 fn drop(&mut self) {
2508 let _ = self.close_resource();
2509 }
2510}
2511
2512struct LazySourceStream<Out, InnerMat, F> {
2513 create: Arc<F>,
2514 materializer: Materializer,
2515 current: Option<BoxStream<Out>>,
2516 mat_sender: Option<oneshot::Sender<StreamResult<InnerMat>>>,
2517 initialized: bool,
2518 terminated: bool,
2519}
2520
2521impl<Out, InnerMat, F> LazySourceStream<Out, InnerMat, F>
2522where
2523 Out: Send + 'static,
2524 InnerMat: Send + 'static,
2525 F: Fn() -> Source<Out, InnerMat>,
2526{
2527 fn complete_mat(&mut self, result: StreamResult<InnerMat>) {
2528 if let Some(sender) = self.mat_sender.take() {
2529 let _ = sender.send(result);
2530 }
2531 }
2532
2533 fn initialize(&mut self) -> StreamResult<()> {
2534 if self.initialized {
2535 return Ok(());
2536 }
2537 self.initialized = true;
2538 let source = match catch_unwind_failed("lazy_source factory", || (self.create)()) {
2539 Ok(source) => source,
2540 Err(error) => {
2541 self.complete_mat(Err(error.clone()));
2542 return Err(error);
2543 }
2544 };
2545 match Arc::clone(&source.factory).create(&self.materializer) {
2546 Ok((stream, mat)) => {
2547 self.current = Some(stream);
2548 self.complete_mat(Ok(mat));
2549 Ok(())
2550 }
2551 Err(error) => {
2552 self.complete_mat(Err(error.clone()));
2553 Err(error)
2554 }
2555 }
2556 }
2557}
2558
2559impl<Out, InnerMat, F> Iterator for LazySourceStream<Out, InnerMat, F>
2560where
2561 Out: Send + 'static,
2562 InnerMat: Send + 'static,
2563 F: Fn() -> Source<Out, InnerMat>,
2564{
2565 type Item = StreamResult<Out>;
2566
2567 fn next(&mut self) -> Option<Self::Item> {
2568 if self.terminated {
2569 return None;
2570 }
2571 if let Err(error) = self.initialize() {
2572 self.terminated = true;
2573 return Some(Err(error));
2574 }
2575 match self
2576 .current
2577 .as_mut()
2578 .expect("lazy_source current stream initialized")
2579 .next()
2580 {
2581 Some(Ok(item)) => Some(Ok(item)),
2582 Some(Err(error)) => {
2583 self.terminated = true;
2584 Some(Err(error))
2585 }
2586 None => {
2587 self.terminated = true;
2588 None
2589 }
2590 }
2591 }
2592}
2593
2594impl<Out, InnerMat, F> Drop for LazySourceStream<Out, InnerMat, F> {
2595 fn drop(&mut self) {
2596 if !self.initialized
2597 && let Some(sender) = self.mat_sender.take()
2598 {
2599 let _ = sender.send(Err(StreamError::Failed(
2600 "lazy source was never materialized".into(),
2601 )));
2602 }
2603 }
2604}
2605
2606struct LazyFutureSourceStream<Out, InnerMat, F, Fut> {
2607 create: Arc<F>,
2608 materializer: Materializer,
2609 current: Option<BoxStream<Out>>,
2610 mat_sender: Option<oneshot::Sender<StreamResult<InnerMat>>>,
2611 initialized: bool,
2612 terminated: bool,
2613 _marker: PhantomData<fn() -> Fut>,
2614}
2615
2616impl<Out, InnerMat, F, Fut> LazyFutureSourceStream<Out, InnerMat, F, Fut>
2617where
2618 Out: Send + 'static,
2619 InnerMat: Send + 'static,
2620 F: Fn() -> Fut,
2621 Fut: Future<Output = StreamResult<Source<Out, InnerMat>>> + Send + 'static,
2622{
2623 fn complete_mat(&mut self, result: StreamResult<InnerMat>) {
2624 if let Some(sender) = self.mat_sender.take() {
2625 let _ = sender.send(result);
2626 }
2627 }
2628
2629 fn initialize(&mut self) -> StreamResult<()> {
2630 if self.initialized {
2631 return Ok(());
2632 }
2633 self.initialized = true;
2634 let source = match catch_unwind_failed("lazy_future_source factory", || (self.create)())
2635 .and_then(flow::run_future_inline_or_spawn)
2636 {
2637 Ok(source) => source,
2638 Err(error) => {
2639 self.complete_mat(Err(error.clone()));
2640 return Err(error);
2641 }
2642 };
2643 match Arc::clone(&source.factory).create(&self.materializer) {
2644 Ok((stream, mat)) => {
2645 self.current = Some(stream);
2646 self.complete_mat(Ok(mat));
2647 Ok(())
2648 }
2649 Err(error) => {
2650 self.complete_mat(Err(error.clone()));
2651 Err(error)
2652 }
2653 }
2654 }
2655}
2656
2657impl<Out, InnerMat, F, Fut> Iterator for LazyFutureSourceStream<Out, InnerMat, F, Fut>
2658where
2659 Out: Send + 'static,
2660 InnerMat: Send + 'static,
2661 F: Fn() -> Fut,
2662 Fut: Future<Output = StreamResult<Source<Out, InnerMat>>> + Send + 'static,
2663{
2664 type Item = StreamResult<Out>;
2665
2666 fn next(&mut self) -> Option<Self::Item> {
2667 if self.terminated {
2668 return None;
2669 }
2670 if let Err(error) = self.initialize() {
2671 self.terminated = true;
2672 return Some(Err(error));
2673 }
2674 match self
2675 .current
2676 .as_mut()
2677 .expect("lazy_future_source current stream initialized")
2678 .next()
2679 {
2680 Some(Ok(item)) => Some(Ok(item)),
2681 Some(Err(error)) => {
2682 self.terminated = true;
2683 Some(Err(error))
2684 }
2685 None => {
2686 self.terminated = true;
2687 None
2688 }
2689 }
2690 }
2691}
2692
2693impl<Out, InnerMat, F, Fut> Drop for LazyFutureSourceStream<Out, InnerMat, F, Fut> {
2694 fn drop(&mut self) {
2695 if !self.initialized
2696 && let Some(sender) = self.mat_sender.take()
2697 {
2698 let _ = sender.send(Err(StreamError::Failed(
2699 "lazy future source was never materialized".into(),
2700 )));
2701 }
2702 }
2703}
2704
2705fn concat_source_streams<Out>(streams: Vec<BoxStream<Out>>) -> BoxStream<Out>
2706where
2707 Out: Send + 'static,
2708{
2709 let mut streams: VecDeque<_> = streams.into();
2710 let mut current = streams.pop_front();
2711 Box::new(std::iter::from_fn(move || {
2712 loop {
2713 match current.as_mut() {
2714 Some(stream) => match stream.next() {
2715 Some(item) => return Some(item),
2716 None => current = streams.pop_front(),
2717 },
2718 None => return None,
2719 }
2720 }
2721 }))
2722}
2723
2724fn concat_source_streams_lazy<Out, Mat>(
2725 initial: BoxStream<Out>,
2726 factories: Vec<Arc<dyn SourceFactory<Out, Mat>>>,
2727 materializer: &Materializer,
2728) -> BoxStream<Out>
2729where
2730 Out: Send + 'static,
2731 Mat: Send + 'static,
2732{
2733 let mut current = Some(initial);
2734 let mut remaining: VecDeque<_> = factories.into();
2735 let materializer = materializer.with_name_prefix(materializer.name_prefix().to_owned());
2736 Box::new(std::iter::from_fn(move || {
2737 loop {
2738 match current.as_mut() {
2739 Some(stream) => match stream.next() {
2740 Some(item) => return Some(item),
2741 None => {
2742 current = remaining.pop_front().map(|factory| {
2743 match factory.create(&materializer) {
2744 Ok((stream, _)) => stream,
2745 Err(error) => {
2746 Box::new(std::iter::once(Err(error))) as BoxStream<Out>
2747 }
2748 }
2749 });
2750 }
2751 },
2752 None => return None,
2753 }
2754 }
2755 }))
2756}
2757
2758fn or_else_source_stream<Out>(
2759 mut primary: BoxStream<Out>,
2760 mut secondary: BoxStream<Out>,
2761) -> BoxStream<Out>
2762where
2763 Out: Send + 'static,
2764{
2765 let mut primary_emitted = false;
2766 let mut using_secondary = false;
2767 Box::new(std::iter::from_fn(move || {
2768 loop {
2769 if using_secondary {
2770 return secondary.next();
2771 }
2772
2773 match primary.next() {
2774 Some(Ok(item)) => {
2775 primary_emitted = true;
2776 return Some(Ok(item));
2777 }
2778 Some(Err(error)) => return Some(Err(error)),
2779 None if primary_emitted => return None,
2780 None => using_secondary = true,
2781 }
2782 }
2783 }))
2784}
2785
2786fn interleave_source_streams<Out>(
2787 streams: Vec<BoxStream<Out>>,
2788 segment_size: usize,
2789 eager_close: bool,
2790) -> BoxStream<Out>
2791where
2792 Out: Send + 'static,
2793{
2794 if segment_size == 0 {
2795 return Box::new(std::iter::once(Err(StreamError::GraphValidation(
2796 "interleave segment size must be greater than zero".into(),
2797 ))));
2798 }
2799
2800 let mut streams: Vec<Option<BoxStream<Out>>> = streams.into_iter().map(Some).collect();
2801 let mut pending: Vec<Option<StreamResult<Out>>> = (0..streams.len()).map(|_| None).collect();
2802 let mut current = 0usize;
2803 let mut emitted = 0usize;
2804 Box::new(std::iter::from_fn(move || {
2805 loop {
2806 if streams.iter().all(Option::is_none) {
2807 return None;
2808 }
2809 if streams[current].is_none() {
2810 match next_active_source_stream(&streams, current) {
2811 Some(next) => {
2812 current = next;
2813 emitted = 0;
2814 }
2815 None => return None,
2816 }
2817 }
2818
2819 let Some(stream) = streams[current].as_mut() else {
2820 continue;
2821 };
2822 let next_item = pending[current].take().or_else(|| stream.next());
2823 match next_item {
2824 Some(Ok(item)) => {
2825 emitted += 1;
2826 if emitted == segment_size {
2827 emitted = 0;
2828 if let Some(next) = next_active_source_stream(&streams, current) {
2829 current = next;
2830 }
2831 }
2832 return Some(Ok(item));
2833 }
2834 Some(Err(error)) => return Some(Err(error)),
2835 None => {
2836 streams[current] = None;
2837 emitted = 0;
2838 if eager_close {
2839 return None;
2840 }
2841 match next_active_source_stream(&streams, current) {
2842 Some(next) => current = next,
2843 None => return None,
2844 }
2845 }
2846 }
2847 }
2848 }))
2849}
2850
2851fn next_active_source_stream<Out>(
2852 streams: &[Option<BoxStream<Out>>],
2853 current: usize,
2854) -> Option<usize>
2855where
2856 Out: Send + 'static,
2857{
2858 if streams.is_empty() {
2859 return None;
2860 }
2861 for offset in 1..=streams.len() {
2862 let index = (current + offset) % streams.len();
2863 if streams[index].is_some() {
2864 return Some(index);
2865 }
2866 }
2867 None
2868}