1use super::*;
2use crate::Attributes;
3use crate::context::FlowWithContext;
4use crate::stream::error::{decide_supervision, panic_stream_error};
5use futures::{FutureExt, task::noop_waker};
6use std::any::Any;
7use std::task::Context;
8use std::{
9 collections::{HashMap, HashSet},
10 thread,
11};
12
/// The stream-building function stored inside a [`Flow`].
pub(super) enum FlowTransform<In, Out> {
    /// Transform that is called with the input stream only.
    Pure(PureTransform<In, Out>),
    /// Transform that is called with the input stream and a `&Materializer`,
    /// and whose result is fallible.
    Runtime(RuntimeTransform<In, Out>),
}
17
/// A stream stage that accepts `In` elements, emits `Out` elements and yields
/// a materialized value of type `Mat`.
pub struct Flow<In, Out, Mat = NotUsed> {
    /// Function that turns the input stream into the output stream.
    pub(super) transform: FlowTransform<In, Out>,
    /// Produces the materialized value; may fail with a stream error.
    pub(super) materialize: Arc<dyn Fn() -> StreamResult<Mat> + Send + Sync>,
    /// Hints attached to this flow (see `FlowHints`).
    pub(super) hints: FlowHints,
    /// Attributes attached to this flow.
    pub(super) attributes: Attributes,
}
24
25#[derive(Clone)]
26pub struct BidiFlow<I1, O1, I2, O2> {
27 top: Flow<I1, O1, NotUsed>,
28 bottom: Flow<I2, O2, NotUsed>,
29 attributes: Attributes,
30}
31
32impl<In, Out> Clone for FlowTransform<In, Out> {
33 fn clone(&self) -> Self {
34 match self {
35 Self::Pure(transform) => Self::Pure(Arc::clone(transform)),
36 Self::Runtime(transform) => Self::Runtime(Arc::clone(transform)),
37 }
38 }
39}
40
41impl<In, Out, Mat> Clone for Flow<In, Out, Mat> {
42 fn clone(&self) -> Self {
43 Self {
44 transform: self.transform.clone(),
45 materialize: Arc::clone(&self.materialize),
46 hints: self.hints,
47 attributes: self.attributes.clone(),
48 }
49 }
50}
51
52fn call_supervised<T, F>(context: &str, f: F) -> StreamResult<T>
53where
54 F: FnOnce() -> StreamResult<T>,
55{
56 catch_unwind(AssertUnwindSafe(f)).unwrap_or_else(|_| Err(panic_stream_error(context)))
57}
58
59impl<T: Send + 'static> Flow<T, T, NotUsed> {
60 #[must_use]
61 pub fn identity() -> Self {
62 Self::from_preserving_transform(|input| input)
63 }
64}
65
66impl<In: Send + 'static, Out: Send + 'static> Flow<In, Out, NotUsed> {
67 pub(crate) fn from_transform<F>(transform: F) -> Self
68 where
69 F: Fn(BoxStream<In>) -> BoxStream<Out> + Send + Sync + 'static,
70 {
71 Self::from_parts_with_hints(transform, || Ok(NotUsed), FlowHints::default())
72 }
73
74 pub(crate) fn from_preserving_transform<F>(transform: F) -> Self
75 where
76 F: Fn(BoxStream<In>) -> BoxStream<Out> + Send + Sync + 'static,
77 {
78 Self::from_parts_with_hints(
79 transform,
80 || Ok(NotUsed),
81 FlowHints::PRESERVES_INLINE_HEAD_TERMINAL,
82 )
83 }
84
85 pub(crate) fn from_runtime_transform<F>(transform: F) -> Self
86 where
87 F: Fn(BoxStream<In>, &Materializer) -> StreamResult<BoxStream<Out>> + Send + Sync + 'static,
88 {
89 Self {
90 transform: FlowTransform::Runtime(Arc::new(transform)),
91 materialize: Arc::new(|| Ok(NotUsed)),
92 hints: FlowHints::default(),
93 attributes: Attributes::default(),
94 }
95 }
96
97 #[must_use]
98 pub fn from_sink_and_source<InMat, OutMat>(
99 sink: Sink<In, InMat>,
100 source: Source<Out, OutMat>,
101 ) -> Self
102 where
103 InMat: Send + 'static,
104 OutMat: Send + 'static,
105 {
106 Self::from_runtime_transform(move |input, materializer| {
107 let sink_keepalive = Arc::new(MaterializedKeepalive::default());
108 let sink_input = Box::new(InputKeepaliveStream {
109 inner: input,
110 keepalive: Arc::clone(&sink_keepalive),
111 peer_keepalive: None,
112 });
113 let sink_mat = sink.clone().run(sink_input, materializer)?;
114 sink_keepalive.store(Box::new(sink_mat));
115 let (output, source_mat) = Arc::clone(&source.factory).create(materializer)?;
116 let source_keepalive = Arc::new(MaterializedKeepalive::default());
117 source_keepalive.store(Box::new(source_mat));
118 Ok(Box::new(CoupledStream {
119 inner: output,
120 source_keepalive,
121 sink_keepalive: None,
122 coupled: false,
123 }))
124 })
125 }
126
127 #[must_use]
128 pub fn from_sink_and_source_coupled<InMat, OutMat>(
129 sink: Sink<In, InMat>,
130 source: Source<Out, OutMat>,
131 ) -> Self
132 where
133 InMat: Send + 'static,
134 OutMat: Send + 'static,
135 {
136 Self::from_runtime_transform(move |input, materializer| {
137 let source_keepalive = Arc::new(MaterializedKeepalive::default());
138 let sink_keepalive = Arc::new(MaterializedKeepalive::default());
139 let sink_input = Box::new(InputKeepaliveStream {
140 inner: input,
141 keepalive: Arc::clone(&sink_keepalive),
142 peer_keepalive: Some(Arc::clone(&source_keepalive)),
143 });
144 let sink_mat = sink.clone().run(sink_input, materializer)?;
145 sink_keepalive.store(Box::new(sink_mat));
146 let (output, source_mat) = Arc::clone(&source.factory).create(materializer)?;
147 source_keepalive.store(Box::new(source_mat));
148 Ok(Box::new(CoupledStream {
149 inner: output,
150 source_keepalive,
151 sink_keepalive: Some(sink_keepalive),
152 coupled: true,
153 }))
154 })
155 }
156 #[must_use]
157 pub fn future_flow<InnerMat, F, Fut>(future: F) -> Flow<In, Out, StreamCompletion<InnerMat>>
158 where
159 InnerMat: Send + 'static,
160 F: Fn() -> Fut + Send + Sync + 'static,
161 Fut: Future<Output = StreamResult<Flow<In, Out, InnerMat>>> + Send + 'static,
162 {
163 let future = Arc::new(future);
164 Flow::from_runtime_materialized_factory(move || {
165 let (sender, receiver) = oneshot::channel();
166 let sender = Arc::new(Mutex::new(Some(sender)));
167 let future = Arc::clone(&future);
168 let transform: RuntimeTransform<In, Out> =
169 Arc::new(move |input, materializer: &Materializer| {
170 let mat_sender = sender
171 .lock()
172 .expect("future_flow materialized sender poisoned")
173 .take()
174 .ok_or_else(|| {
175 StreamError::Failed("future_flow transform already materialized".into())
176 })?;
177 Ok(Box::new(FutureFlowStream {
178 future: Arc::clone(&future),
179 materializer: materializer
180 .with_name_prefix(materializer.name_prefix().to_owned()),
181 input: Some(input),
182 current: None,
183 mat_sender: Some(mat_sender),
184 initialized: false,
185 terminated: false,
186 _marker: PhantomData,
187 }) as BoxStream<Out>)
188 });
189 (transform, StreamCompletion::from_receiver(receiver, None))
190 })
191 }
192
193 #[must_use]
194 pub fn lazy_flow<InnerMat, F>(create: F) -> Flow<In, Out, StreamCompletion<InnerMat>>
195 where
196 InnerMat: Send + 'static,
197 F: Fn() -> Flow<In, Out, InnerMat> + Send + Sync + 'static,
198 {
199 let create = Arc::new(create);
200 Self::lazy_future_flow(move || {
201 let create = Arc::clone(&create);
202 async move { catch_unwind_failed("lazy_flow factory", || create()) }
203 })
204 }
205
206 #[must_use]
207 pub fn lazy_future_flow<InnerMat, F, Fut>(
208 create: F,
209 ) -> Flow<In, Out, StreamCompletion<InnerMat>>
210 where
211 InnerMat: Send + 'static,
212 F: Fn() -> Fut + Send + Sync + 'static,
213 Fut: Future<Output = StreamResult<Flow<In, Out, InnerMat>>> + Send + 'static,
214 {
215 let create = Arc::new(create);
216 Flow::from_runtime_materialized_factory(move || {
217 let (sender, receiver) = oneshot::channel();
218 let sender = Arc::new(Mutex::new(Some(sender)));
219 let create = Arc::clone(&create);
220 let transform: RuntimeTransform<In, Out> =
221 Arc::new(move |input, materializer: &Materializer| {
222 let mat_sender = sender
223 .lock()
224 .expect("lazy_future_flow materialized sender poisoned")
225 .take()
226 .ok_or_else(|| {
227 StreamError::Failed(
228 "lazy_future_flow transform already materialized".into(),
229 )
230 })?;
231 Ok(Box::new(LazyFutureFlowStream {
232 create: Arc::clone(&create),
233 materializer: materializer
234 .with_name_prefix(materializer.name_prefix().to_owned()),
235 input: Some(input),
236 current: None,
237 mat_sender: Some(mat_sender),
238 initialized: false,
239 terminated: false,
240 _marker: PhantomData,
241 }) as BoxStream<Out>)
242 });
243 (transform, StreamCompletion::from_receiver(receiver, None))
244 })
245 }
246}
247
/// Holder for a type-erased materialized value that can be released once.
#[derive(Default)]
struct MaterializedKeepalive {
    /// Set to `true` by `release`; later `store` calls release their value
    /// immediately instead of parking it.
    released: AtomicBool,
    /// The parked value, if one was stored and not yet released.
    value: Mutex<Option<Box<dyn Any + Send>>>,
}
253
254impl MaterializedKeepalive {
255 fn store(&self, value: Box<dyn Any + Send>) {
256 let mut slot = self.value.lock().expect("materialized keepalive poisoned");
257 if !self.released.load(Ordering::SeqCst) {
258 *slot = Some(value);
259 return;
260 }
261 drop(slot);
266 release_materialized_value(value);
267 }
268
269 fn release(&self) {
270 self.released.store(true, Ordering::SeqCst);
271 if let Some(value) = self
272 .value
273 .lock()
274 .expect("materialized keepalive poisoned")
275 .take()
276 {
277 release_materialized_value(value);
278 }
279 }
280
281 fn is_released(&self) -> bool {
282 self.released.load(Ordering::SeqCst)
283 }
284}
285
286fn release_materialized_value(value: Box<dyn Any + Send>) {
287 match value.downcast::<Cancellable>() {
288 Ok(cancellable) => {
289 cancellable.cancel();
290 }
291 Err(value) => drop(value),
292 }
293}
294
295struct InputKeepaliveStream<In> {
296 inner: BoxStream<In>,
297 keepalive: Arc<MaterializedKeepalive>,
298 peer_keepalive: Option<Arc<MaterializedKeepalive>>,
299}
300
301impl<In> Iterator for InputKeepaliveStream<In> {
302 type Item = StreamResult<In>;
303
304 fn next(&mut self) -> Option<Self::Item> {
305 self.inner.next()
306 }
307}
308
309impl<In> Drop for InputKeepaliveStream<In> {
310 fn drop(&mut self) {
311 self.keepalive.release();
312 if let Some(peer_keepalive) = &self.peer_keepalive {
313 peer_keepalive.release();
314 }
315 }
316}
317
318struct CoupledStream<Out> {
319 inner: BoxStream<Out>,
320 source_keepalive: Arc<MaterializedKeepalive>,
321 sink_keepalive: Option<Arc<MaterializedKeepalive>>,
322 coupled: bool,
323}
324
325impl<Out> Iterator for CoupledStream<Out> {
326 type Item = StreamResult<Out>;
327
328 fn next(&mut self) -> Option<Self::Item> {
329 if self.coupled && self.source_keepalive.is_released() {
330 return None;
331 }
332 let next = self.inner.next();
333 if next.is_none() || next.as_ref().is_some_and(|item| item.is_err()) {
334 self.source_keepalive.release();
335 if self.coupled
336 && let Some(sink_keepalive) = &self.sink_keepalive
337 {
338 sink_keepalive.release();
339 }
340 }
341 next
342 }
343}
344
345impl<Out> Drop for CoupledStream<Out> {
346 fn drop(&mut self) {
347 self.source_keepalive.release();
348 if self.coupled
349 && let Some(sink_keepalive) = &self.sink_keepalive
350 {
351 sink_keepalive.release();
352 }
353 }
354}
355
356impl<In: Send + 'static, Out: Send + 'static, Mat: Send + 'static> Flow<In, Out, Mat> {
357 pub fn as_flow_with_context<U, CtxIn, CtxOut, Collapse, Extract>(
358 self,
359 collapse_context: Collapse,
360 extract_context: Extract,
361 ) -> FlowWithContext<U, CtxIn, Out, CtxOut, Mat>
362 where
363 U: Send + 'static,
364 CtxIn: Send + 'static,
365 CtxOut: Send + 'static,
366 Collapse: Fn(U, CtxIn) -> In + Send + Sync + 'static,
367 Extract: Fn(&Out) -> CtxOut + Send + Sync + 'static,
368 {
369 FlowWithContext::from_flow(
370 Flow::identity()
371 .map(move |(value, context)| collapse_context(value, context))
372 .via_mat(self, |_, flow_mat| flow_mat)
373 .map(move |value| {
374 let context = extract_context(&value);
375 (value, context)
376 }),
377 )
378 }
379
380 pub(crate) fn from_parts<F, M>(transform: F, materialize: M) -> Self
381 where
382 F: Fn(BoxStream<In>) -> BoxStream<Out> + Send + Sync + 'static,
383 M: Fn() -> StreamResult<Mat> + Send + Sync + 'static,
384 {
385 Self::from_parts_with_hints(transform, materialize, FlowHints::default())
386 }
387
388 pub(crate) fn from_materialized_factory<F>(factory: F) -> Self
389 where
390 F: Fn() -> (PureTransform<In, Out>, Mat) + Send + Sync + 'static,
391 {
392 struct PendingSlot<In, Out, Mat> {
393 transform: Option<PureTransform<In, Out>>,
394 mat: Option<Mat>,
395 transform_taken: bool,
396 mat_taken: bool,
397 }
398
399 let pending = Arc::new(Mutex::new(HashMap::<
400 thread::ThreadId,
401 Vec<PendingSlot<In, Out, Mat>>,
402 >::new()));
403 let factory = Arc::new(factory);
404
405 let transform = {
406 let pending = Arc::clone(&pending);
407 let factory = Arc::clone(&factory);
408 move |input| {
409 let transform = {
410 let mut pending = pending.lock().expect("flow materialized factory poisoned");
411 let thread_id = thread::current().id();
412 let slots = pending.entry(thread_id).or_default();
413 let index = slots
414 .iter()
415 .position(|slot| !slot.transform_taken && slot.mat_taken)
416 .unwrap_or_else(|| {
417 let (transform, mat) = factory();
418 slots.push(PendingSlot {
419 transform: Some(transform),
420 mat: Some(mat),
421 transform_taken: false,
422 mat_taken: false,
423 });
424 slots.len() - 1
425 });
426 let slot = slots
427 .get_mut(index)
428 .expect("pending flow materialization slot exists");
429 slot.transform_taken = true;
430 let transform = slot
431 .transform
432 .take()
433 .expect("pending flow transform present");
434 if slot.transform_taken && slot.mat_taken {
435 slots.remove(index);
436 }
437 if slots.is_empty() {
438 pending.remove(&thread_id);
439 }
440 transform
441 };
442 transform(input)
443 }
444 };
445
446 let materialize = {
447 let pending = Arc::clone(&pending);
448 let factory = Arc::clone(&factory);
449 move || {
450 let mat = {
451 let mut pending = pending.lock().expect("flow materialized factory poisoned");
452 let thread_id = thread::current().id();
453 let slots = pending.entry(thread_id).or_default();
454 let index = slots
455 .iter()
456 .position(|slot| !slot.mat_taken && slot.transform_taken)
457 .unwrap_or_else(|| {
458 let (transform, mat) = factory();
459 slots.push(PendingSlot {
460 transform: Some(transform),
461 mat: Some(mat),
462 transform_taken: false,
463 mat_taken: false,
464 });
465 slots.len() - 1
466 });
467 let slot = slots
468 .get_mut(index)
469 .expect("pending flow materialization slot exists");
470 slot.mat_taken = true;
471 let mat = slot
472 .mat
473 .take()
474 .expect("pending flow materialized value present");
475 if slot.transform_taken && slot.mat_taken {
476 slots.remove(index);
477 }
478 if slots.is_empty() {
479 pending.remove(&thread_id);
480 }
481 mat
482 };
483 Ok(mat)
484 }
485 };
486
487 Self::from_parts_with_hints(transform, materialize, FlowHints::default())
488 }
489
/// Builds a runtime flow whose transform and materialized value both come
/// from the same `factory()` call.
///
/// The transform and materializer closures are invoked separately, in
/// either order. Whichever runs first on a thread calls `factory` (with the
/// lock released), keeps its own half and parks the other half in a
/// per-thread slot; the later call on that thread picks up the first
/// matching parked half instead of calling `factory` again.
///
/// # Panics
/// The returned closures panic if the internal mutex is poisoned.
pub(crate) fn from_runtime_materialized_factory<F>(factory: F) -> Self
where
    F: Fn() -> (RuntimeTransform<In, Out>, Mat) + Send + Sync + 'static,
{
    // One `factory()` result of which one half is still waiting.
    struct PendingSlot<In, Out, Mat> {
        transform: Option<RuntimeTransform<In, Out>>,
        mat: Option<Mat>,
        transform_taken: bool,
        mat_taken: bool,
    }

    // Parked halves, keyed by the thread that produced them.
    let pending = Arc::new(Mutex::new(HashMap::<
        thread::ThreadId,
        Vec<PendingSlot<In, Out, Mat>>,
    >::new()));
    let factory = Arc::new(factory);

    let transform = {
        let pending = Arc::clone(&pending);
        let factory = Arc::clone(&factory);
        move |input, materializer: &Materializer| {
            let thread_id = thread::current().id();
            // Look for a transform parked by an earlier materialize call.
            let transform = {
                let mut pending = pending.lock().expect("flow materialized factory poisoned");
                let slots = pending.entry(thread_id).or_default();
                if let Some(index) = slots
                    .iter()
                    .position(|slot| !slot.transform_taken && slot.mat_taken)
                {
                    let slot = slots
                        .get_mut(index)
                        .expect("pending flow materialization slot exists");
                    slot.transform_taken = true;
                    let transform = slot
                        .transform
                        .take()
                        .expect("pending flow transform present");
                    // Both halves consumed: the slot is finished.
                    if slot.transform_taken && slot.mat_taken {
                        slots.remove(index);
                    }
                    if slots.is_empty() {
                        pending.remove(&thread_id);
                    }
                    Some(transform)
                } else {
                    None
                }
            };

            let transform = match transform {
                Some(transform) => transform,
                None => {
                    // Nothing parked: the factory runs after the guard above
                    // has been dropped, then the materialized value is parked.
                    let (transform, mat) = factory();
                    let mut pending =
                        pending.lock().expect("flow materialized factory poisoned");
                    let slots = pending.entry(thread_id).or_default();
                    slots.push(PendingSlot {
                        transform: None,
                        mat: Some(mat),
                        transform_taken: true,
                        mat_taken: false,
                    });
                    transform
                }
            };

            transform(input, materializer)
        }
    };

    let materialize = {
        let pending = Arc::clone(&pending);
        let factory = Arc::clone(&factory);
        move || {
            let thread_id = thread::current().id();
            // Look for a materialized value parked by an earlier transform call.
            let mat = {
                let mut pending = pending.lock().expect("flow materialized factory poisoned");
                let slots = pending.entry(thread_id).or_default();
                if let Some(index) = slots
                    .iter()
                    .position(|slot| !slot.mat_taken && slot.transform_taken)
                {
                    let slot = slots
                        .get_mut(index)
                        .expect("pending flow materialization slot exists");
                    slot.mat_taken = true;
                    let mat = slot
                        .mat
                        .take()
                        .expect("pending flow materialized value present");
                    // Both halves consumed: the slot is finished.
                    if slot.transform_taken && slot.mat_taken {
                        slots.remove(index);
                    }
                    if slots.is_empty() {
                        pending.remove(&thread_id);
                    }
                    Some(mat)
                } else {
                    None
                }
            };

            match mat {
                Some(mat) => Ok(mat),
                None => {
                    // Nothing parked: the factory runs after the guard above
                    // has been dropped, then the transform is parked.
                    let (transform, mat) = factory();
                    let mut pending =
                        pending.lock().expect("flow materialized factory poisoned");
                    let slots = pending.entry(thread_id).or_default();
                    slots.push(PendingSlot {
                        transform: Some(transform),
                        mat: None,
                        transform_taken: false,
                        mat_taken: true,
                    });
                    Ok(mat)
                }
            }
        }
    };

    Flow {
        transform: FlowTransform::Runtime(Arc::new(transform)),
        materialize: Arc::new(materialize),
        hints: FlowHints::default(),
        attributes: Attributes::default(),
    }
}
618
619 fn from_parts_with_hints<F, M>(transform: F, materialize: M, hints: FlowHints) -> Self
620 where
621 F: Fn(BoxStream<In>) -> BoxStream<Out> + Send + Sync + 'static,
622 M: Fn() -> StreamResult<Mat> + Send + Sync + 'static,
623 {
624 Self {
625 transform: FlowTransform::Pure(Arc::new(transform)),
626 materialize: Arc::new(materialize),
627 hints,
628 attributes: Attributes::default(),
629 }
630 }
631
632 #[must_use]
633 pub fn attributes(&self) -> &Attributes {
634 &self.attributes
635 }
636
637 #[must_use]
638 pub fn with_attributes(mut self, attributes: Attributes) -> Self {
639 self.attributes = attributes;
640 self
641 }
642
643 #[must_use]
644 pub fn add_attributes(mut self, attributes: Attributes) -> Self {
645 self.attributes = self.attributes.and(attributes);
646 self
647 }
648
649 #[must_use]
650 pub fn named(self, name: impl Into<String>) -> Self {
651 self.add_attributes(Attributes::named(name))
652 }
653
654 #[must_use]
655 pub fn via<Next, NextMat>(self, next: Flow<Out, Next, NextMat>) -> Flow<In, Next, Mat>
656 where
657 Next: Send + 'static,
658 NextMat: Send + 'static,
659 {
660 self.via_mat(next, Keep::left)
661 }
662
663 #[must_use]
664 pub fn via_mat<Next, NextMat, Combined, F>(
665 self,
666 next: Flow<Out, Next, NextMat>,
667 combine: F,
668 ) -> Flow<In, Next, Combined>
669 where
670 Next: Send + 'static,
671 NextMat: Send + 'static,
672 Combined: Send + 'static,
673 F: Fn(Mat, NextMat) -> Combined + Send + Sync + 'static,
674 {
675 let Flow {
676 transform: first,
677 materialize: materialize_first,
678 hints: first_hints,
679 attributes: first_attributes,
680 } = self;
681 let Flow {
682 transform: second,
683 materialize: materialize_second,
684 hints: second_hints,
685 attributes: second_attributes,
686 } = next;
687 let combine = Arc::new(combine);
688 match (first, second) {
689 (FlowTransform::Pure(first), FlowTransform::Pure(second)) => {
690 let hints = first_hints.then(second_hints);
691 Flow::from_parts_with_hints(
692 move |input| second(first(input)),
693 move || {
694 let left = materialize_first()?;
695 let right = materialize_second()?;
696 Ok(combine(left, right))
697 },
698 hints,
699 )
700 .with_attributes(first_attributes.and(second_attributes))
701 }
702 (first, second) => {
703 let hints = FlowHints::default();
704 Flow {
705 transform: FlowTransform::Runtime(Arc::new(move |input, materializer| {
706 let stream = match &first {
707 FlowTransform::Pure(first) => first(input),
708 FlowTransform::Runtime(first) => first(input, materializer)?,
709 };
710 match &second {
711 FlowTransform::Pure(second) => Ok(second(stream)),
712 FlowTransform::Runtime(second) => second(stream, materializer),
713 }
714 })),
715 materialize: Arc::new(move || {
716 let left = materialize_first()?;
717 let right = materialize_second()?;
718 Ok(combine(left, right))
719 }),
720 hints,
721 attributes: first_attributes.and(second_attributes),
722 }
723 }
724 }
725 }
726
727 #[must_use]
728 pub fn via_mat_with<Next, NextMat, Combined, F>(
729 self,
730 next: Flow<Out, Next, NextMat>,
731 combine: F,
732 ) -> Flow<In, Next, Combined>
733 where
734 Next: Send + 'static,
735 NextMat: Send + 'static,
736 Combined: Send + 'static,
737 F: Fn(Mat, NextMat) -> Combined + Send + Sync + 'static,
738 {
739 self.via_mat(next, combine)
740 }
741
742 #[must_use]
743 pub fn map<Next, F>(self, f: F) -> Flow<In, Next, Mat>
744 where
745 Next: Send + 'static,
746 F: Fn(Out) -> Next + Send + Sync + 'static,
747 {
748 let stage = Arc::new(f);
749 match &self.transform {
750 FlowTransform::Pure(_) => {
751 let Flow {
752 transform,
753 materialize,
754 hints,
755 attributes,
756 } = self;
757 let FlowTransform::Pure(transform) = transform else {
758 unreachable!("pure transform checked above");
759 };
760 Flow::from_parts_with_hints(
761 move |input| {
762 let stage = Arc::clone(&stage);
763 Box::new(transform(input).map(move |item| item.map(|item| stage(item))))
764 },
765 move || materialize(),
766 hints,
767 )
768 .with_attributes(attributes)
769 }
770 FlowTransform::Runtime(_) => self.via(Flow::from_transform(move |input| {
771 let stage = Arc::clone(&stage);
772 Box::new(input.map(move |item| item.map(|item| stage(item))))
773 })),
774 }
775 }
776
777 #[must_use]
780 pub fn map_result<Next, F>(self, f: F) -> Flow<In, Next, Mat>
781 where
782 Next: Send + 'static,
783 F: Fn(Out) -> StreamResult<Next> + Send + Sync + 'static,
784 {
785 let stage = Arc::new(f);
786 self.via(Flow::from_transform(move |input| {
787 let stage = Arc::clone(&stage);
788 Box::new(input.map(move |item| item.and_then(|item| stage(item))))
789 }))
790 }
791
792 #[must_use]
797 pub fn map_result_with_supervision<Next, F>(
798 self,
799 f: F,
800 decider: SupervisionDecider,
801 ) -> Flow<In, Next, Mat>
802 where
803 Next: Send + 'static,
804 F: Fn(Out) -> StreamResult<Next> + Send + Sync + 'static,
805 {
806 let stage = Arc::new(f);
807 self.via(Flow::from_transform(move |input| {
808 let stage = Arc::clone(&stage);
809 let decider = Arc::clone(&decider);
810 Box::new(input.filter_map(move |item| match item {
811 Ok(item) => match call_supervised("map_result callback", || stage(item)) {
812 Ok(next) => Some(Ok(next)),
813 Err(error) => match decide_supervision(&decider, &error) {
814 SupervisionDirective::Stop => Some(Err(error)),
815 SupervisionDirective::Resume | SupervisionDirective::Restart => None,
816 },
817 },
818 Err(error) => Some(Err(error)),
819 }))
820 }))
821 }
822
823 #[must_use]
824 pub fn filter<F>(self, predicate: F) -> Flow<In, Out, Mat>
825 where
826 F: Fn(&Out) -> bool + Send + Sync + 'static,
827 {
828 let predicate = Arc::new(predicate);
829 self.via(Flow::from_preserving_transform(move |input| {
830 let predicate = Arc::clone(&predicate);
831 Box::new(input.filter_map(move |item| match item {
832 Ok(item) if predicate(&item) => Some(Ok(item)),
833 Ok(_) => None,
834 Err(error) => Some(Err(error)),
835 }))
836 }))
837 }
838
839 #[must_use]
840 pub fn filter_result<F>(self, predicate: F) -> Flow<In, Out, Mat>
841 where
842 F: Fn(&Out) -> StreamResult<bool> + Send + Sync + 'static,
843 {
844 let predicate = Arc::new(predicate);
845 self.via(Flow::from_transform(move |input| {
846 let predicate = Arc::clone(&predicate);
847 Box::new(input.filter_map(move |item| match item {
848 Ok(item) => match predicate(&item) {
849 Ok(true) => Some(Ok(item)),
850 Ok(false) => None,
851 Err(error) => Some(Err(error)),
852 },
853 Err(error) => Some(Err(error)),
854 }))
855 }))
856 }
857
858 #[must_use]
859 pub fn filter_result_with_supervision<F>(
860 self,
861 predicate: F,
862 decider: SupervisionDecider,
863 ) -> Flow<In, Out, Mat>
864 where
865 F: Fn(&Out) -> StreamResult<bool> + Send + Sync + 'static,
866 {
867 let predicate = Arc::new(predicate);
868 self.via(Flow::from_transform(move |input| {
869 let predicate = Arc::clone(&predicate);
870 let decider = Arc::clone(&decider);
871 Box::new(input.filter_map(move |item| match item {
872 Ok(item) => match call_supervised("filter_result callback", || predicate(&item)) {
873 Ok(true) => Some(Ok(item)),
874 Ok(false) => None,
875 Err(error) => match decide_supervision(&decider, &error) {
876 SupervisionDirective::Stop => Some(Err(error)),
877 SupervisionDirective::Resume | SupervisionDirective::Restart => None,
878 },
879 },
880 Err(error) => Some(Err(error)),
881 }))
882 }))
883 }
884
885 #[must_use]
886 pub fn filter_not<F>(self, predicate: F) -> Flow<In, Out, Mat>
887 where
888 F: Fn(&Out) -> bool + Send + Sync + 'static,
889 {
890 let predicate = Arc::new(predicate);
891 self.filter(move |item| !predicate(item))
892 }
893
894 #[must_use]
895 pub fn filter_map<Next, F>(self, f: F) -> Flow<In, Next, Mat>
896 where
897 Next: Send + 'static,
898 F: Fn(Out) -> Option<Next> + Send + Sync + 'static,
899 {
900 let stage = Arc::new(f);
901 self.via(Flow::from_transform(move |input| {
902 let stage = Arc::clone(&stage);
903 Box::new(input.filter_map(move |item| match item {
904 Ok(item) => stage(item).map(Ok),
905 Err(error) => Some(Err(error)),
906 }))
907 }))
908 }
909
910 #[must_use]
911 pub fn filter_map_result<Next, F>(self, f: F) -> Flow<In, Next, Mat>
912 where
913 Next: Send + 'static,
914 F: Fn(Out) -> StreamResult<Option<Next>> + Send + Sync + 'static,
915 {
916 let stage = Arc::new(f);
917 self.via(Flow::from_transform(move |input| {
918 let stage = Arc::clone(&stage);
919 Box::new(input.filter_map(move |item| match item {
920 Ok(item) => match stage(item) {
921 Ok(Some(next)) => Some(Ok(next)),
922 Ok(None) => None,
923 Err(error) => Some(Err(error)),
924 },
925 Err(error) => Some(Err(error)),
926 }))
927 }))
928 }
929
930 #[must_use]
931 pub fn filter_map_result_with_supervision<Next, F>(
932 self,
933 f: F,
934 decider: SupervisionDecider,
935 ) -> Flow<In, Next, Mat>
936 where
937 Next: Send + 'static,
938 F: Fn(Out) -> StreamResult<Option<Next>> + Send + Sync + 'static,
939 {
940 let stage = Arc::new(f);
941 self.via(Flow::from_transform(move |input| {
942 let stage = Arc::clone(&stage);
943 let decider = Arc::clone(&decider);
944 Box::new(input.filter_map(move |item| match item {
945 Ok(item) => match call_supervised("filter_map_result callback", || stage(item)) {
946 Ok(Some(next)) => Some(Ok(next)),
947 Ok(None) => None,
948 Err(error) => match decide_supervision(&decider, &error) {
949 SupervisionDirective::Stop => Some(Err(error)),
950 SupervisionDirective::Resume | SupervisionDirective::Restart => None,
951 },
952 },
953 Err(error) => Some(Err(error)),
954 }))
955 }))
956 }
957
958 #[must_use]
959 pub fn map_concat<Next, F, I>(self, f: F) -> Flow<In, Next, Mat>
960 where
961 Next: Send + 'static,
962 F: Fn(Out) -> I + Send + Sync + 'static,
963 I: IntoIterator<Item = Next>,
964 I::IntoIter: Send + 'static,
965 {
966 let stage = Arc::new(f);
967 self.via(Flow::from_transform(move |mut input| {
968 let stage = Arc::clone(&stage);
969 let mut current = None::<I::IntoIter>;
970 let mut done = false;
971 Box::new(std::iter::from_fn(move || {
972 loop {
973 if let Some(iter) = &mut current {
974 if let Some(item) = iter.next() {
975 return Some(Ok(item));
976 }
977 current = None;
978 }
979
980 if done {
981 return None;
982 }
983
984 match input.next()? {
985 Ok(item) => current = Some(stage(item).into_iter()),
986 Err(error) => {
987 done = true;
988 return Some(Err(error));
989 }
990 }
991 }
992 }))
993 }))
994 }
995
996 #[must_use]
997 pub fn map_concat_result<Next, F, I>(self, f: F) -> Flow<In, Next, Mat>
998 where
999 Next: Send + 'static,
1000 F: Fn(Out) -> StreamResult<I> + Send + Sync + 'static,
1001 I: IntoIterator<Item = Next>,
1002 I::IntoIter: Send + 'static,
1003 {
1004 let stage = Arc::new(f);
1005 self.via(Flow::from_transform(move |mut input| {
1006 let stage = Arc::clone(&stage);
1007 let mut current = None::<I::IntoIter>;
1008 let mut done = false;
1009 Box::new(std::iter::from_fn(move || {
1010 loop {
1011 if let Some(iter) = &mut current {
1012 if let Some(item) = iter.next() {
1013 return Some(Ok(item));
1014 }
1015 current = None;
1016 }
1017
1018 if done {
1019 return None;
1020 }
1021
1022 match input.next()? {
1023 Ok(item) => match stage(item) {
1024 Ok(items) => current = Some(items.into_iter()),
1025 Err(error) => {
1026 done = true;
1027 return Some(Err(error));
1028 }
1029 },
1030 Err(error) => {
1031 done = true;
1032 return Some(Err(error));
1033 }
1034 }
1035 }
1036 }))
1037 }))
1038 }
1039
1040 #[must_use]
1041 pub fn map_concat_result_with_supervision<Next, F, I>(
1042 self,
1043 f: F,
1044 decider: SupervisionDecider,
1045 ) -> Flow<In, Next, Mat>
1046 where
1047 Next: Send + 'static,
1048 F: Fn(Out) -> StreamResult<I> + Send + Sync + 'static,
1049 I: IntoIterator<Item = Next>,
1050 I::IntoIter: Send + 'static,
1051 {
1052 let stage = Arc::new(f);
1053 self.via(Flow::from_transform(move |mut input| {
1054 let stage = Arc::clone(&stage);
1055 let decider = Arc::clone(&decider);
1056 let mut current = None::<I::IntoIter>;
1057 let mut done = false;
1058 Box::new(std::iter::from_fn(move || {
1059 loop {
1060 if let Some(iter) = &mut current {
1061 if let Some(item) = iter.next() {
1062 return Some(Ok(item));
1063 }
1064 current = None;
1065 }
1066
1067 if done {
1068 return None;
1069 }
1070
1071 match input.next()? {
1072 Ok(item) => {
1073 match call_supervised("map_concat_result callback", || stage(item)) {
1074 Ok(items) => current = Some(items.into_iter()),
1075 Err(error) => match decide_supervision(&decider, &error) {
1076 SupervisionDirective::Stop => {
1077 done = true;
1078 return Some(Err(error));
1079 }
1080 SupervisionDirective::Resume
1081 | SupervisionDirective::Restart => {}
1082 },
1083 }
1084 }
1085 Err(error) => {
1086 done = true;
1087 return Some(Err(error));
1088 }
1089 }
1090 }
1091 }))
1092 }))
1093 }
1094
1095 #[must_use]
1096 pub fn stateful_map<State, Next, F>(self, seed: State, f: F) -> Flow<In, Next, Mat>
1097 where
1098 State: Clone + Send + Sync + 'static,
1099 Next: Send + 'static,
1100 F: Fn(&mut State, Out) -> Next + Send + Sync + 'static,
1101 {
1102 let stage = Arc::new(f);
1103 self.via(Flow::from_transform(move |input| {
1104 let stage = Arc::clone(&stage);
1105 let mut state = seed.clone();
1106 Box::new(input.map(move |item| item.map(|item| stage(&mut state, item))))
1107 }))
1108 }
1109
1110 #[must_use]
1111 pub fn stateful_map_result<State, Next, F>(self, seed: State, f: F) -> Flow<In, Next, Mat>
1112 where
1113 State: Clone + Send + Sync + 'static,
1114 Next: Send + 'static,
1115 F: Fn(&mut State, Out) -> StreamResult<Next> + Send + Sync + 'static,
1116 {
1117 let stage = Arc::new(f);
1118 self.via(Flow::from_transform(move |input| {
1119 let stage = Arc::clone(&stage);
1120 let mut state = seed.clone();
1121 Box::new(input.map(move |item| match item {
1122 Ok(item) => stage(&mut state, item),
1123 Err(error) => Err(error),
1124 }))
1125 }))
1126 }
1127
1128 #[must_use]
1129 pub fn stateful_map_result_with_supervision<State, Next, F>(
1130 self,
1131 seed: State,
1132 f: F,
1133 decider: SupervisionDecider,
1134 ) -> Flow<In, Next, Mat>
1135 where
1136 State: Clone + Send + Sync + 'static,
1137 Next: Send + 'static,
1138 F: Fn(&mut State, Out) -> StreamResult<Next> + Send + Sync + 'static,
1139 {
1140 let stage = Arc::new(f);
1141 self.via(Flow::from_transform(move |input| {
1142 let stage = Arc::clone(&stage);
1143 let decider = Arc::clone(&decider);
1144 let seed = seed.clone();
1145 let mut state = seed.clone();
1146 Box::new(input.filter_map(move |item| match item {
1147 Ok(item) => match call_supervised("stateful_map_result callback", || {
1148 stage(&mut state, item)
1149 }) {
1150 Ok(next) => Some(Ok(next)),
1151 Err(error) => match decide_supervision(&decider, &error) {
1152 SupervisionDirective::Stop => Some(Err(error)),
1153 SupervisionDirective::Resume => None,
1154 SupervisionDirective::Restart => {
1155 state = seed.clone();
1156 None
1157 }
1158 },
1159 },
1160 Err(error) => Some(Err(error)),
1161 }))
1162 }))
1163 }
1164
1165 #[must_use]
1166 pub fn stateful_map_concat<State, Next, F, I>(self, seed: State, f: F) -> Flow<In, Next, Mat>
1167 where
1168 State: Clone + Send + Sync + 'static,
1169 Next: Send + 'static,
1170 F: Fn(&mut State, Out) -> I + Send + Sync + 'static,
1171 I: IntoIterator<Item = Next>,
1172 I::IntoIter: Send + 'static,
1173 {
1174 let stage = Arc::new(f);
1175 self.via(Flow::from_transform(move |mut input| {
1176 let stage = Arc::clone(&stage);
1177 let mut state = seed.clone();
1178 let mut current = None::<I::IntoIter>;
1179 let mut done = false;
1180 Box::new(std::iter::from_fn(move || {
1181 loop {
1182 if let Some(iter) = &mut current {
1183 if let Some(item) = iter.next() {
1184 return Some(Ok(item));
1185 }
1186 current = None;
1187 }
1188
1189 if done {
1190 return None;
1191 }
1192
1193 match input.next()? {
1194 Ok(item) => current = Some(stage(&mut state, item).into_iter()),
1195 Err(error) => {
1196 done = true;
1197 return Some(Err(error));
1198 }
1199 }
1200 }
1201 }))
1202 }))
1203 }
1204
1205 #[must_use]
1206 pub fn stateful_map_concat_result<State, Next, F, I>(
1207 self,
1208 seed: State,
1209 f: F,
1210 ) -> Flow<In, Next, Mat>
1211 where
1212 State: Clone + Send + Sync + 'static,
1213 Next: Send + 'static,
1214 F: Fn(&mut State, Out) -> StreamResult<I> + Send + Sync + 'static,
1215 I: IntoIterator<Item = Next>,
1216 I::IntoIter: Send + 'static,
1217 {
1218 let stage = Arc::new(f);
1219 self.via(Flow::from_transform(move |mut input| {
1220 let stage = Arc::clone(&stage);
1221 let mut state = seed.clone();
1222 let mut current = None::<I::IntoIter>;
1223 let mut done = false;
1224 Box::new(std::iter::from_fn(move || {
1225 loop {
1226 if let Some(iter) = &mut current {
1227 if let Some(item) = iter.next() {
1228 return Some(Ok(item));
1229 }
1230 current = None;
1231 }
1232
1233 if done {
1234 return None;
1235 }
1236
1237 match input.next()? {
1238 Ok(item) => match stage(&mut state, item) {
1239 Ok(items) => current = Some(items.into_iter()),
1240 Err(error) => {
1241 done = true;
1242 return Some(Err(error));
1243 }
1244 },
1245 Err(error) => {
1246 done = true;
1247 return Some(Err(error));
1248 }
1249 }
1250 }
1251 }))
1252 }))
1253 }
1254
1255 #[must_use]
1256 pub fn stateful_map_concat_result_with_supervision<State, Next, F, I>(
1257 self,
1258 seed: State,
1259 f: F,
1260 decider: SupervisionDecider,
1261 ) -> Flow<In, Next, Mat>
1262 where
1263 State: Clone + Send + Sync + 'static,
1264 Next: Send + 'static,
1265 F: Fn(&mut State, Out) -> StreamResult<I> + Send + Sync + 'static,
1266 I: IntoIterator<Item = Next>,
1267 I::IntoIter: Send + 'static,
1268 {
1269 let stage = Arc::new(f);
1270 self.via(Flow::from_transform(move |mut input| {
1271 let stage = Arc::clone(&stage);
1272 let decider = Arc::clone(&decider);
1273 let seed = seed.clone();
1274 let mut state = seed.clone();
1275 let mut current = None::<I::IntoIter>;
1276 let mut done = false;
1277 Box::new(std::iter::from_fn(move || {
1278 loop {
1279 if let Some(iter) = &mut current {
1280 if let Some(item) = iter.next() {
1281 return Some(Ok(item));
1282 }
1283 current = None;
1284 }
1285
1286 if done {
1287 return None;
1288 }
1289
1290 match input.next()? {
1291 Ok(item) => {
1292 match call_supervised("stateful_map_concat_result callback", || {
1293 stage(&mut state, item)
1294 }) {
1295 Ok(items) => current = Some(items.into_iter()),
1296 Err(error) => match decide_supervision(&decider, &error) {
1297 SupervisionDirective::Stop => {
1298 done = true;
1299 return Some(Err(error));
1300 }
1301 SupervisionDirective::Resume => {}
1302 SupervisionDirective::Restart => state = seed.clone(),
1303 },
1304 }
1305 }
1306 Err(error) => {
1307 done = true;
1308 return Some(Err(error));
1309 }
1310 }
1311 }
1312 }))
1313 }))
1314 }
1315
1316 #[must_use]
1317 pub fn map_async<Next, F, Fut>(self, parallelism: usize, f: F) -> Flow<In, Next, Mat>
1321 where
1322 Next: Send + 'static,
1323 F: Fn(Out) -> Fut + Send + Sync + 'static,
1324 Fut: Future<Output = StreamResult<Next>> + Send + 'static,
1325 {
1326 assert!(
1327 parallelism > 0,
1328 "map_async parallelism must be greater than zero"
1329 );
1330 let stage = Arc::new(f);
1331 self.via(Flow::from_runtime_transform(move |input, _materializer| {
1332 let stage = Arc::clone(&stage);
1333 Ok(map_async_ordered(input, parallelism, stage))
1334 }))
1335 }
1336
1337 #[must_use]
1338 pub fn map_async_with_supervision<Next, F, Fut>(
1343 self,
1344 parallelism: usize,
1345 f: F,
1346 decider: SupervisionDecider,
1347 ) -> Flow<In, Next, Mat>
1348 where
1349 Next: Send + 'static,
1350 F: Fn(Out) -> Fut + Send + Sync + 'static,
1351 Fut: Future<Output = StreamResult<Next>> + Send + 'static,
1352 {
1353 assert!(
1354 parallelism > 0,
1355 "map_async parallelism must be greater than zero"
1356 );
1357 let stage = Arc::new(f);
1358 self.via(Flow::from_runtime_transform(move |input, _materializer| {
1359 let stage = Arc::clone(&stage);
1360 let decider = Arc::clone(&decider);
1361 Ok(map_async_ordered_supervised(
1362 input,
1363 parallelism,
1364 stage,
1365 decider,
1366 ))
1367 }))
1368 }
1369
1370 #[must_use]
1371 pub fn map_async_unordered<Next, F, Fut>(self, parallelism: usize, f: F) -> Flow<In, Next, Mat>
1375 where
1376 Next: Send + 'static,
1377 F: Fn(Out) -> Fut + Send + Sync + 'static,
1378 Fut: Future<Output = StreamResult<Next>> + Send + 'static,
1379 {
1380 assert!(
1381 parallelism > 0,
1382 "map_async_unordered parallelism must be greater than zero"
1383 );
1384 let stage = Arc::new(f);
1385 self.via(Flow::from_runtime_transform(move |input, _materializer| {
1386 let stage = Arc::clone(&stage);
1387 Ok(map_async_unordered(input, parallelism, stage))
1388 }))
1389 }
1390
1391 #[must_use]
1392 pub fn map_async_unordered_with_supervision<Next, F, Fut>(
1393 self,
1394 parallelism: usize,
1395 f: F,
1396 decider: SupervisionDecider,
1397 ) -> Flow<In, Next, Mat>
1398 where
1399 Next: Send + 'static,
1400 F: Fn(Out) -> Fut + Send + Sync + 'static,
1401 Fut: Future<Output = StreamResult<Next>> + Send + 'static,
1402 {
1403 assert!(
1404 parallelism > 0,
1405 "map_async_unordered parallelism must be greater than zero"
1406 );
1407 let stage = Arc::new(f);
1408 self.via(Flow::from_runtime_transform(move |input, _materializer| {
1409 let stage = Arc::clone(&stage);
1410 let decider = Arc::clone(&decider);
1411 Ok(map_async_unordered_supervised(
1412 input,
1413 parallelism,
1414 stage,
1415 decider,
1416 ))
1417 }))
1418 }
1419
1420 #[must_use]
1421 pub fn map_async_partitioned<Key, Next, Partition, F, Fut>(
1425 self,
1426 parallelism: usize,
1427 per_partition: usize,
1428 partition: Partition,
1429 f: F,
1430 ) -> Flow<In, Next, Mat>
1431 where
1432 Key: Clone + Eq + Hash + Send + 'static,
1433 Next: Send + 'static,
1434 Partition: Fn(&Out) -> Key + Send + Sync + 'static,
1435 F: Fn(Out) -> Fut + Send + Sync + 'static,
1436 Fut: Future<Output = StreamResult<Next>> + Send + 'static,
1437 {
1438 assert!(
1439 parallelism > 0,
1440 "map_async_partitioned parallelism must be greater than zero"
1441 );
1442 assert!(
1443 per_partition > 0,
1444 "map_async_partitioned per_partition must be greater than zero"
1445 );
1446 let partition = Arc::new(partition);
1447 let stage = Arc::new(f);
1448 self.via(Flow::from_runtime_transform(move |input, _materializer| {
1449 let partition = Arc::clone(&partition);
1450 let stage = Arc::clone(&stage);
1451 Ok(map_async_partitioned(
1452 input,
1453 parallelism,
1454 per_partition,
1455 partition,
1456 stage,
1457 ))
1458 }))
1459 }
1460
1461 #[must_use]
1462 pub fn take(self, n: usize) -> Flow<In, Out, Mat> {
1463 self.via(Flow::from_transform(move |input| Box::new(input.take(n))))
1464 }
1465
1466 #[must_use]
1467 pub fn drop(self, n: usize) -> Flow<In, Out, Mat> {
1468 self.via(Flow::from_transform(move |input| {
1469 let mut remaining = n;
1470 Box::new(input.filter_map(move |item| match item {
1471 Ok(_) if remaining > 0 => {
1472 remaining -= 1;
1473 None
1474 }
1475 other => Some(other),
1476 }))
1477 }))
1478 }
1479
1480 #[must_use]
1481 pub fn take_while<F>(self, predicate: F) -> Flow<In, Out, Mat>
1482 where
1483 F: Fn(&Out) -> bool + Send + Sync + 'static,
1484 {
1485 let predicate = Arc::new(predicate);
1486 self.via(Flow::from_transform(move |mut input| {
1487 let predicate = Arc::clone(&predicate);
1488 let mut open = true;
1489 Box::new(std::iter::from_fn(move || {
1490 if !open {
1491 return None;
1492 }
1493
1494 match input.next() {
1495 Some(Ok(item)) if predicate(&item) => Some(Ok(item)),
1496 Some(Ok(_)) | None => {
1497 open = false;
1498 None
1499 }
1500 Some(Err(error)) => Some(Err(error)),
1501 }
1502 }))
1503 }))
1504 }
1505
1506 #[must_use]
1507 pub fn drop_while<F>(self, predicate: F) -> Flow<In, Out, Mat>
1508 where
1509 F: Fn(&Out) -> bool + Send + Sync + 'static,
1510 {
1511 let predicate = Arc::new(predicate);
1512 self.via(Flow::from_transform(move |mut input| {
1513 let predicate = Arc::clone(&predicate);
1514 let mut dropping = true;
1515 Box::new(std::iter::from_fn(move || {
1516 loop {
1517 let next = input.next()?;
1518 match next {
1519 Ok(item) if dropping && predicate(&item) => continue,
1520 Ok(item) => {
1521 dropping = false;
1522 return Some(Ok(item));
1523 }
1524 Err(error) => return Some(Err(error)),
1525 }
1526 }
1527 }))
1528 }))
1529 }
1530
1531 #[must_use]
1532 pub fn limit(self, max: u64) -> Flow<In, Out, Mat> {
1533 self.via(Flow::from_transform(move |input| {
1534 let mut seen = 0_u64;
1535 Box::new(input.map(move |item| match item {
1536 Ok(item) if seen < max => {
1537 seen += 1;
1538 Ok(item)
1539 }
1540 Ok(_) => Err(StreamError::LimitExceeded { max }),
1541 Err(error) => Err(error),
1542 }))
1543 }))
1544 }
1545
1546 #[must_use]
1547 pub fn grouped(self, size: usize) -> Flow<In, Vec<Out>, Mat> {
1548 assert!(size > 0, "grouped size must be greater than zero");
1549 self.via(Flow::from_transform(move |mut input| {
1550 Box::new(std::iter::from_fn(move || {
1551 let mut group = Vec::with_capacity(size);
1552 while group.len() < size {
1553 match input.next() {
1554 Some(Ok(item)) => group.push(item),
1555 Some(Err(error)) => return Some(Err(error)),
1556 None => break,
1557 }
1558 }
1559
1560 if group.is_empty() {
1561 None
1562 } else {
1563 Some(Ok(group))
1564 }
1565 }))
1566 }))
1567 }
1568
1569 #[must_use]
1570 pub fn scan<State, F>(self, seed: State, f: F) -> Flow<In, State, Mat>
1571 where
1572 State: Clone + Send + Sync + 'static,
1573 F: Fn(State, Out) -> State + Send + Sync + 'static,
1574 {
1575 let stage = Arc::new(f);
1576 self.via(Flow::from_transform(move |mut input| {
1577 let stage = Arc::clone(&stage);
1578 let mut state = Some(seed.clone());
1579 let mut emit_seed = true;
1580 Box::new(std::iter::from_fn(move || {
1581 if emit_seed {
1582 emit_seed = false;
1583 return Some(Ok(state.as_ref().expect("scan state present").clone()));
1584 }
1585
1586 match input.next()? {
1587 Ok(item) => {
1588 let prev = state.take().expect("scan state present");
1591 let next = stage(prev, item);
1592 state = Some(next.clone());
1593 Some(Ok(next))
1594 }
1595 Err(error) => Some(Err(error)),
1596 }
1597 }))
1598 }))
1599 }
1600
1601 #[must_use]
1602 pub fn scan_async<State, F, Fut>(self, seed: State, f: F) -> Flow<In, State, Mat>
1603 where
1604 State: Clone + Send + Sync + 'static,
1605 F: Fn(State, Out) -> Fut + Send + Sync + 'static,
1606 Fut: Future<Output = StreamResult<State>> + Send + 'static,
1607 {
1608 let stage = Arc::new(f);
1609 self.via(Flow::from_transform(move |mut input| {
1610 let stage = Arc::clone(&stage);
1611 let mut state = Some(seed.clone());
1612 let mut emit_seed = true;
1613 let mut terminated = false;
1614 Box::new(std::iter::from_fn(move || {
1615 if terminated {
1616 return None;
1617 }
1618 if emit_seed {
1619 emit_seed = false;
1620 return Some(Ok(state
1621 .as_ref()
1622 .expect("scan_async state present")
1623 .clone()));
1624 }
1625
1626 match input.next()? {
1627 Ok(item) => {
1628 let prev = state.take().expect("scan_async state present");
1629 match catch_unwind_failed("scan_async factory", || stage(prev, item))
1630 .and_then(run_future_inline_or_spawn)
1631 {
1632 Ok(next) => {
1633 state = Some(next.clone());
1634 Some(Ok(next))
1635 }
1636 Err(error) => {
1637 terminated = true;
1638 Some(Err(error))
1639 }
1640 }
1641 }
1642 Err(error) => {
1643 terminated = true;
1644 Some(Err(error))
1645 }
1646 }
1647 }))
1648 }))
1649 }
1650
1651 #[must_use]
1652 pub fn scan_result<State, F>(self, seed: State, f: F) -> Flow<In, State, Mat>
1653 where
1654 State: Clone + Send + Sync + 'static,
1655 F: Fn(State, Out) -> StreamResult<State> + Send + Sync + 'static,
1656 {
1657 let stage = Arc::new(f);
1658 self.via(Flow::from_transform(move |mut input| {
1659 let stage = Arc::clone(&stage);
1660 let mut state = Some(seed.clone());
1661 let mut emit_seed = true;
1662 Box::new(std::iter::from_fn(move || {
1663 if emit_seed {
1664 emit_seed = false;
1665 return Some(Ok(state.as_ref().expect("scan state present").clone()));
1666 }
1667
1668 match input.next()? {
1669 Ok(item) => {
1670 let prev = state.take().expect("scan state present");
1671 match stage(prev, item) {
1672 Ok(next) => {
1673 state = Some(next.clone());
1674 Some(Ok(next))
1675 }
1676 Err(error) => Some(Err(error)),
1677 }
1678 }
1679 Err(error) => Some(Err(error)),
1680 }
1681 }))
1682 }))
1683 }
1684
1685 #[must_use]
1686 pub fn scan_result_with_supervision<State, F>(
1687 self,
1688 seed: State,
1689 f: F,
1690 decider: SupervisionDecider,
1691 ) -> Flow<In, State, Mat>
1692 where
1693 State: Clone + Send + Sync + 'static,
1694 F: Fn(State, Out) -> StreamResult<State> + Send + Sync + 'static,
1695 {
1696 let stage = Arc::new(f);
1697 self.via(Flow::from_transform(move |mut input| {
1698 let stage = Arc::clone(&stage);
1699 let decider = Arc::clone(&decider);
1700 let seed = seed.clone();
1701 let mut state = Some(seed.clone());
1702 let mut emit_seed = true;
1703 Box::new(std::iter::from_fn(move || {
1704 loop {
1705 if emit_seed {
1706 emit_seed = false;
1707 return Some(Ok(state.as_ref().expect("scan state present").clone()));
1708 }
1709
1710 match input.next()? {
1711 Ok(item) => {
1712 let prev = state.take().expect("scan state present");
1713 match call_supervised("scan_result callback", || {
1714 stage(prev.clone(), item)
1715 }) {
1716 Ok(next) => {
1717 state = Some(next.clone());
1718 return Some(Ok(next));
1719 }
1720 Err(error) => match decide_supervision(&decider, &error) {
1721 SupervisionDirective::Stop => return Some(Err(error)),
1722 SupervisionDirective::Resume => {
1723 state = Some(prev);
1724 }
1725 SupervisionDirective::Restart => {
1726 state = Some(seed.clone());
1727 emit_seed = true;
1728 }
1729 },
1730 }
1731 }
1732 Err(error) => return Some(Err(error)),
1733 }
1734 }
1735 }))
1736 }))
1737 }
1738
1739 #[must_use]
1740 pub fn sliding(self, size: usize, step: usize) -> Flow<In, Vec<Out>, Mat>
1741 where
1742 Out: Clone,
1743 {
1744 assert!(size > 0, "sliding size must be greater than zero");
1745 assert!(step > 0, "sliding step must be greater than zero");
1746 self.via(Flow::from_transform(move |mut input| {
1747 let mut buffer = VecDeque::with_capacity(size.max(step));
1753 let mut terminated = false;
1754
1755 Box::new(std::iter::from_fn(move || {
1756 if terminated {
1757 return None;
1758 }
1759
1760 loop {
1761 match input.next() {
1762 Some(Ok(item)) => {
1763 buffer.push_back(item);
1764 if buffer.len() < size {
1765 continue;
1766 } else if buffer.len() == size {
1767 return Some(Ok(buffer.iter().cloned().collect()));
1768 } else if step <= size {
1769 for _ in 0..step {
1770 buffer.pop_front();
1771 }
1772 if buffer.len() == size {
1773 return Some(Ok(buffer.iter().cloned().collect()));
1774 }
1775 } else if buffer.len() == step {
1776 buffer.clear();
1778 }
1779 }
1780 Some(Err(error)) => {
1781 terminated = true;
1782 return Some(Err(error));
1783 }
1784 None => {
1785 terminated = true;
1786 if !buffer.is_empty() && buffer.len() < size {
1787 return Some(Ok(buffer.iter().cloned().collect()));
1788 }
1789 return None;
1790 }
1791 }
1792 }
1793 }))
1794 }))
1795 }
1796
1797 #[must_use]
1798 pub fn fold<Acc, F>(self, zero: Acc, f: F) -> Flow<In, Acc, Mat>
1799 where
1800 Acc: Clone + Send + Sync + 'static,
1801 F: Fn(Acc, Out) -> Acc + Send + Sync + 'static,
1802 {
1803 let stage = Arc::new(f);
1804 self.via(Flow::from_transform(move |input| {
1805 let stage = Arc::clone(&stage);
1806 let mut acc = zero.clone();
1807 for item in input {
1808 match item {
1809 Ok(item) => acc = stage(acc, item),
1810 Err(error) => return Box::new(std::iter::once(Err(error))),
1811 }
1812 }
1813 Box::new(std::iter::once(Ok(acc)))
1814 }))
1815 }
1816
1817 #[must_use]
1818 pub fn fold_async<Acc, F, Fut>(self, zero: Acc, f: F) -> Flow<In, Acc, Mat>
1819 where
1820 Acc: Clone + Send + Sync + 'static,
1821 F: Fn(Acc, Out) -> Fut + Send + Sync + 'static,
1822 Fut: Future<Output = StreamResult<Acc>> + Send + 'static,
1823 {
1824 let stage = Arc::new(f);
1825 self.via(Flow::from_transform(move |mut input| {
1826 let stage = Arc::clone(&stage);
1827 let mut acc = Some(zero.clone());
1828 let mut done = false;
1829 Box::new(std::iter::from_fn(move || {
1830 if done {
1831 return None;
1832 }
1833 done = true;
1834
1835 for item in input.by_ref() {
1836 let item = match item {
1837 Ok(item) => item,
1838 Err(error) => return Some(Err(error)),
1839 };
1840 let current = acc.take().expect("fold_async accumulator present");
1841 match catch_unwind_failed("fold_async factory", || stage(current, item))
1842 .and_then(run_future_inline_or_spawn)
1843 {
1844 Ok(next) => acc = Some(next),
1845 Err(error) => return Some(Err(error)),
1846 }
1847 }
1848
1849 Some(Ok(acc.take().expect("fold_async accumulator present")))
1850 }))
1851 }))
1852 }
1853
1854 #[must_use]
1855 pub fn map_with_resource<Resource, Next, Create, F, Close>(
1856 self,
1857 create: Create,
1858 f: F,
1859 close: Close,
1860 ) -> Flow<In, Next, Mat>
1861 where
1862 Resource: Send + 'static,
1863 Next: Send + 'static,
1864 Create: Fn() -> StreamResult<Resource> + Send + Sync + 'static,
1865 F: Fn(&mut Resource, Out) -> StreamResult<Next> + Send + Sync + 'static,
1866 Close: Fn(Resource) -> StreamResult<Option<Next>> + Send + Sync + 'static,
1867 {
1868 let create = Arc::new(create);
1869 let stage = Arc::new(f);
1870 let close = Arc::new(close);
1871 self.via(Flow::from_transform(move |input| {
1872 Box::new(MapWithResourceStream {
1873 input,
1874 create: Arc::clone(&create),
1875 stage: Arc::clone(&stage),
1876 close: Arc::clone(&close),
1877 resource: None,
1878 created: false,
1879 pending_terminal: None,
1880 terminated: false,
1881 _marker: PhantomData,
1882 }) as BoxStream<Next>
1883 }))
1884 }
1885
1886 #[must_use]
1887 pub fn fold_result<Acc, F>(self, zero: Acc, f: F) -> Flow<In, Acc, Mat>
1888 where
1889 Acc: Clone + Send + Sync + 'static,
1890 F: Fn(Acc, Out) -> StreamResult<Acc> + Send + Sync + 'static,
1891 {
1892 let stage = Arc::new(f);
1893 self.via(Flow::from_transform(move |input| {
1894 let stage = Arc::clone(&stage);
1895 let mut acc = zero.clone();
1896 for item in input {
1897 match item {
1898 Ok(item) => match stage(acc, item) {
1899 Ok(next) => acc = next,
1900 Err(error) => return Box::new(std::iter::once(Err(error))),
1901 },
1902 Err(error) => return Box::new(std::iter::once(Err(error))),
1903 }
1904 }
1905 Box::new(std::iter::once(Ok(acc)))
1906 }))
1907 }
1908
1909 #[must_use]
1910 pub fn fold_result_with_supervision<Acc, F>(
1911 self,
1912 zero: Acc,
1913 f: F,
1914 decider: SupervisionDecider,
1915 ) -> Flow<In, Acc, Mat>
1916 where
1917 Acc: Clone + Send + Sync + 'static,
1918 F: Fn(Acc, Out) -> StreamResult<Acc> + Send + Sync + 'static,
1919 {
1920 let stage = Arc::new(f);
1921 self.via(Flow::from_transform(move |input| {
1922 let stage = Arc::clone(&stage);
1923 let decider = Arc::clone(&decider);
1924 let mut acc = zero.clone();
1925 for item in input {
1926 match item {
1927 Ok(item) => {
1928 let previous = acc;
1929 match call_supervised("fold_result callback", || {
1930 stage(previous.clone(), item)
1931 }) {
1932 Ok(next) => acc = next,
1933 Err(error) => match decide_supervision(&decider, &error) {
1934 SupervisionDirective::Stop => {
1935 return Box::new(std::iter::once(Err(error)));
1936 }
1937 SupervisionDirective::Resume => acc = previous,
1938 SupervisionDirective::Restart => acc = zero.clone(),
1939 },
1940 }
1941 }
1942 Err(error) => return Box::new(std::iter::once(Err(error))),
1943 }
1944 }
1945 Box::new(std::iter::once(Ok(acc)))
1946 }))
1947 }
1948
1949 #[must_use]
1950 pub fn reduce<F>(self, f: F) -> Flow<In, Out, Mat>
1951 where
1952 F: Fn(Out, Out) -> Out + Send + Sync + 'static,
1953 {
1954 let stage = Arc::new(f);
1955 self.via(Flow::from_transform(move |mut input| {
1956 let Some(first) = input.next() else {
1957 return Box::new(std::iter::once(Err(StreamError::EmptyStream)));
1958 };
1959 let mut acc = match first {
1960 Ok(item) => item,
1961 Err(error) => return Box::new(std::iter::once(Err(error))),
1962 };
1963 for item in input {
1964 match item {
1965 Ok(item) => acc = stage(acc, item),
1966 Err(error) => return Box::new(std::iter::once(Err(error))),
1967 }
1968 }
1969 Box::new(std::iter::once(Ok(acc)))
1970 }))
1971 }
1972
1973 #[must_use]
1974 pub fn reduce_result<F>(self, f: F) -> Flow<In, Out, Mat>
1975 where
1976 Out: Clone,
1977 F: Fn(Out, Out) -> StreamResult<Out> + Send + Sync + 'static,
1978 {
1979 let stage = Arc::new(f);
1980 self.via(Flow::from_transform(move |mut input| {
1981 let Some(first) = input.next() else {
1982 return Box::new(std::iter::once(Err(StreamError::EmptyStream)));
1983 };
1984 let mut acc = match first {
1985 Ok(item) => item,
1986 Err(error) => return Box::new(std::iter::once(Err(error))),
1987 };
1988 for item in input {
1989 match item {
1990 Ok(item) => match stage(acc, item) {
1991 Ok(next) => acc = next,
1992 Err(error) => return Box::new(std::iter::once(Err(error))),
1993 },
1994 Err(error) => return Box::new(std::iter::once(Err(error))),
1995 }
1996 }
1997 Box::new(std::iter::once(Ok(acc)))
1998 }))
1999 }
2000
2001 #[must_use]
2002 pub fn reduce_result_with_supervision<F>(
2003 self,
2004 f: F,
2005 decider: SupervisionDecider,
2006 ) -> Flow<In, Out, Mat>
2007 where
2008 Out: Clone,
2009 F: Fn(Out, Out) -> StreamResult<Out> + Send + Sync + 'static,
2010 {
2011 let stage = Arc::new(f);
2012 self.via(Flow::from_transform(move |input| {
2013 let stage = Arc::clone(&stage);
2014 let decider = Arc::clone(&decider);
2015 let mut acc = None::<Out>;
2016 for item in input {
2017 match item {
2018 Ok(item) => {
2019 let Some(previous) = acc.take() else {
2020 acc = Some(item);
2021 continue;
2022 };
2023 match call_supervised("reduce_result callback", || {
2024 stage(previous.clone(), item)
2025 }) {
2026 Ok(next) => acc = Some(next),
2027 Err(error) => match decide_supervision(&decider, &error) {
2028 SupervisionDirective::Stop => {
2029 return Box::new(std::iter::once(Err(error)));
2030 }
2031 SupervisionDirective::Resume => acc = Some(previous),
2032 SupervisionDirective::Restart => acc = None,
2033 },
2034 }
2035 }
2036 Err(error) => return Box::new(std::iter::once(Err(error))),
2037 }
2038 }
2039 match acc {
2040 Some(acc) => Box::new(std::iter::once(Ok(acc))),
2041 None => Box::new(std::iter::once(Err(StreamError::EmptyStream))),
2042 }
2043 }))
2044 }
2045
2046 #[must_use]
2047 pub fn map_error<F>(self, f: F) -> Flow<In, Out, Mat>
2048 where
2049 F: Fn(StreamError) -> StreamError + Send + Sync + 'static,
2050 {
2051 let stage = Arc::new(f);
2052 self.via(Flow::from_transform(move |input| {
2053 let stage = Arc::clone(&stage);
2054 Box::new(input.map(move |item| item.map_err(|error| stage(error))))
2055 }))
2056 }
2057
2058 #[must_use]
2059 pub fn recover<F>(self, f: F) -> Flow<In, Out, Mat>
2060 where
2061 F: Fn(StreamError) -> Option<Out> + Send + Sync + 'static,
2062 {
2063 let stage = Arc::new(f);
2064 self.via(Flow::from_transform(move |mut input| {
2065 let stage = Arc::clone(&stage);
2066 let mut done = false;
2067 Box::new(std::iter::from_fn(move || {
2068 if done {
2069 return None;
2070 }
2071 match input.next()? {
2072 Ok(item) => Some(Ok(item)),
2073 Err(error) => {
2074 done = true;
2075 match stage(error.clone()) {
2076 Some(item) => Some(Ok(item)),
2077 None => Some(Err(error)),
2078 }
2079 }
2080 }
2081 }))
2082 }))
2083 }
2084
2085 #[must_use]
2086 pub fn recover_with<F>(self, f: F) -> Flow<In, Out, Mat>
2087 where
2088 F: Fn(StreamError) -> Option<Source<Out>> + Send + Sync + 'static,
2089 {
2090 self.recover_with_attempts(None, f)
2092 }
2093
2094 #[must_use]
2095 pub fn recover_with_retries<F>(self, retries: usize, f: F) -> Flow<In, Out, Mat>
2096 where
2097 F: Fn(StreamError) -> Option<Source<Out>> + Send + Sync + 'static,
2098 {
2099 self.recover_with_attempts(Some(retries), f)
2100 }
2101
2102 fn recover_with_attempts<F>(self, attempts: Option<usize>, f: F) -> Flow<In, Out, Mat>
2103 where
2104 F: Fn(StreamError) -> Option<Source<Out>> + Send + Sync + 'static,
2105 {
2106 let stage = Arc::new(f);
2107 self.via(Flow::from_runtime_transform(move |input, materializer| {
2108 let replacement_materializer =
2109 materializer.with_name_prefix(materializer.name_prefix().to_owned());
2110 let stage = Arc::clone(&stage);
2111 let mut remaining_retries = attempts;
2113 let mut current = input;
2114 let mut terminated = false;
2115
2116 Ok(Box::new(std::iter::from_fn(move || {
2117 if terminated {
2118 return None;
2119 }
2120
2121 loop {
2122 match current.next() {
2123 Some(Ok(item)) => return Some(Ok(item)),
2124 Some(Err(error)) if remaining_retries != Some(0) => {
2125 if let Some(remaining) = remaining_retries.as_mut() {
2126 *remaining -= 1;
2127 }
2128 match stage(error.clone()) {
2129 Some(source) => match Arc::clone(&source.factory)
2130 .create(&replacement_materializer)
2131 {
2132 Ok((stream, _)) => current = stream,
2133 Err(error) => {
2134 terminated = true;
2135 return Some(Err(error));
2136 }
2137 },
2138 None => {
2139 terminated = true;
2140 return Some(Err(error));
2141 }
2142 }
2143 }
2144 Some(Err(error)) => {
2145 terminated = true;
2146 return Some(Err(error));
2147 }
2148 None => {
2149 terminated = true;
2150 return None;
2151 }
2152 }
2153 }
2154 })) as BoxStream<Out>)
2155 }))
2156 }
2157
2158 #[must_use]
2159 pub fn on_error_complete(self) -> Flow<In, Out, Mat> {
2160 self.via(Flow::from_transform(move |mut input| {
2161 let mut done = false;
2162 Box::new(std::iter::from_fn(move || {
2163 if done {
2164 return None;
2165 }
2166 match input.next()? {
2167 Ok(item) => Some(Ok(item)),
2168 Err(_) => {
2169 done = true;
2170 None
2171 }
2172 }
2173 }))
2174 }))
2175 }
2176
2177 #[must_use]
2178 pub fn prefix_and_tail(self, n: usize) -> Flow<In, (Vec<Out>, Source<Out>), Mat> {
2179 self.via(Flow::from_runtime_transform(move |input, _materializer| {
2180 Ok(prefix_and_tail_stream(input, n))
2181 }))
2182 }
2183
2184 #[must_use]
2185 pub fn flat_map_prefix<Next, NextMat, F>(self, n: usize, f: F) -> Flow<In, Next, Mat>
2186 where
2187 Next: Send + 'static,
2188 NextMat: Send + 'static,
2189 F: Fn(Vec<Out>) -> Flow<Out, Next, NextMat> + Send + Sync + 'static,
2190 Out: Clone,
2191 {
2192 let stage = Arc::new(f);
2193 self.via(Flow::from_runtime_transform(
2194 move |mut input, materializer| {
2195 let mut prefix = Vec::with_capacity(n);
2196 while prefix.len() < n {
2197 match input.next() {
2198 Some(Ok(item)) => prefix.push(item),
2199 Some(Err(error)) => {
2200 return Ok(Box::new(std::iter::once(Err(error))) as BoxStream<Next>);
2201 }
2202 None => break,
2203 }
2204 }
2205
2206 let flow = stage(prefix);
2207 let transform = flow.transform;
2208 let _ = (flow.materialize)()?;
2209 match transform {
2210 FlowTransform::Pure(transform) => Ok(transform(input)),
2211 FlowTransform::Runtime(transform) => transform(input, materializer),
2212 }
2213 },
2214 ))
2215 }
2216
2217 #[must_use]
2218 pub fn group_by<Key, F>(
2219 self,
2220 max_substreams: usize,
2221 f: F,
2222 allow_closed_substream_recreation: bool,
2223 ) -> Flow<In, Source<Out>, Mat>
2224 where
2225 Key: Clone + Eq + Hash + Send + 'static,
2226 F: Fn(&Out) -> Key + Send + Sync + 'static,
2227 Out: Clone,
2228 {
2229 assert!(
2230 max_substreams > 0,
2231 "group_by max_substreams must be greater than zero"
2232 );
2233 let key_fn = Arc::new(f);
2234 self.via(Flow::from_runtime_transform(move |input, materializer| {
2235 Ok(group_by_stream(
2236 input,
2237 max_substreams,
2238 allow_closed_substream_recreation,
2239 Arc::clone(&key_fn),
2240 materializer,
2241 ))
2242 }))
2243 }
2244
2245 #[must_use]
2246 pub fn split_when<F>(self, predicate: F) -> Flow<In, Source<Out>, Mat>
2247 where
2248 F: Fn(&Out) -> bool + Send + Sync + 'static,
2249 Out: Clone,
2250 {
2251 let predicate = Arc::new(predicate);
2252 self.via(Flow::from_runtime_transform(move |input, materializer| {
2253 Ok(split_streams(
2254 input,
2255 SplitMode::When,
2256 Arc::clone(&predicate),
2257 materializer,
2258 ))
2259 }))
2260 }
2261
2262 #[must_use]
2263 pub fn split_after<F>(self, predicate: F) -> Flow<In, Source<Out>, Mat>
2264 where
2265 F: Fn(&Out) -> bool + Send + Sync + 'static,
2266 Out: Clone,
2267 {
2268 let predicate = Arc::new(predicate);
2269 self.via(Flow::from_runtime_transform(move |input, materializer| {
2270 Ok(split_streams(
2271 input,
2272 SplitMode::After,
2273 Arc::clone(&predicate),
2274 materializer,
2275 ))
2276 }))
2277 }
2278
2279 #[must_use]
2280 pub fn flat_map_concat<Next, NextMat, F>(self, f: F) -> Flow<In, Next, Mat>
2281 where
2282 Next: Send + 'static,
2283 NextMat: Send + 'static,
2284 F: Fn(Out) -> Source<Next, NextMat> + Send + Sync + 'static,
2285 {
2286 let stage = Arc::new(f);
2287 self.via(Flow::from_runtime_transform(move |input, materializer| {
2288 Ok(flat_map_concat_stream(
2289 input,
2290 Arc::clone(&stage),
2291 materializer,
2292 ))
2293 }))
2294 }
2295
2296 #[must_use]
2297 pub fn flat_map_merge<Next, NextMat, F>(self, breadth: usize, f: F) -> Flow<In, Next, Mat>
2298 where
2299 Next: Send + 'static,
2300 NextMat: Send + 'static,
2301 F: Fn(Out) -> Source<Next, NextMat> + Send + Sync + 'static,
2302 {
2303 assert!(
2304 breadth > 0,
2305 "flat_map_merge breadth must be greater than zero"
2306 );
2307 let stage = Arc::new(f);
2308 self.via(Flow::from_runtime_transform(move |input, materializer| {
2309 Ok(flat_map_merge_stream(
2310 input,
2311 breadth,
2312 Arc::clone(&stage),
2313 materializer,
2314 ))
2315 }))
2316 }
2317
2318 #[must_use]
2319 pub fn concat<Mat2>(self, that: Source<Out, Mat2>) -> Flow<In, Out, Mat>
2320 where
2321 Mat2: Send + 'static,
2322 {
2323 self.concat_sources([that])
2324 }
2325
2326 #[must_use]
2327 pub fn concat_lazy<Mat2>(self, that: Source<Out, Mat2>) -> Flow<In, Out, Mat>
2328 where
2329 Mat2: Send + 'static,
2330 {
2331 let that_factory = that.factory;
2332 self.via(Flow::from_runtime_transform(move |input, materializer| {
2333 let primary = input;
2334 Ok(concat_streams_lazy(
2335 primary,
2336 vec![Arc::clone(&that_factory)],
2337 materializer,
2338 ))
2339 }))
2340 }
2341
2342 #[must_use]
2343 pub fn concat_all_lazy<Mat2, I>(self, those: I) -> Flow<In, Out, Mat>
2344 where
2345 Mat2: Send + 'static,
2346 I: IntoIterator<Item = Source<Out, Mat2>>,
2347 {
2348 let source_factories: Vec<_> = those.into_iter().map(|source| source.factory).collect();
2349 self.via(Flow::from_runtime_transform(move |input, materializer| {
2350 Ok(concat_streams_lazy(
2351 input,
2352 source_factories.clone(),
2353 materializer,
2354 ))
2355 }))
2356 }
2357
2358 #[must_use]
2359 pub fn prepend<Mat2>(self, that: Source<Out, Mat2>) -> Flow<In, Out, Mat>
2360 where
2361 Mat2: Send + 'static,
2362 {
2363 self.prepend_sources([that])
2364 }
2365
2366 #[must_use]
2367 pub fn prepend_lazy<Mat2>(self, that: Source<Out, Mat2>) -> Flow<In, Out, Mat>
2368 where
2369 Mat2: Send + 'static,
2370 {
2371 self.prepend(that)
2372 }
2373
2374 #[must_use]
2375 pub fn or_else<Mat2>(self, secondary: Source<Out, Mat2>) -> Flow<In, Out, Mat>
2376 where
2377 Mat2: Send + 'static,
2378 {
2379 let secondary_factory = secondary.factory;
2380 self.via(Flow::from_runtime_transform(move |input, materializer| {
2381 let secondary = match Arc::clone(&secondary_factory).create(materializer) {
2382 Ok((stream, _)) => stream,
2383 Err(error) => Box::new(std::iter::once(Err(error))) as BoxStream<Out>,
2384 };
2385 Ok(or_else_stream(input, secondary))
2386 }))
2387 }
2388
2389 #[must_use]
2390 pub fn interleave<Mat2>(
2391 self,
2392 that: Source<Out, Mat2>,
2393 segment_size: usize,
2394 ) -> Flow<In, Out, Mat>
2395 where
2396 Mat2: Send + 'static,
2397 {
2398 self.interleave_all([that], segment_size, false)
2399 }
2400
2401 #[must_use]
2402 pub fn interleave_all<Mat2, I>(
2403 self,
2404 those: I,
2405 segment_size: usize,
2406 eager_close: bool,
2407 ) -> Flow<In, Out, Mat>
2408 where
2409 Mat2: Send + 'static,
2410 I: IntoIterator<Item = Source<Out, Mat2>>,
2411 {
2412 let source_factories: Vec<_> = those.into_iter().map(|source| source.factory).collect();
2413 self.via(Flow::from_runtime_transform(move |input, materializer| {
2414 let mut streams = Vec::with_capacity(source_factories.len() + 1);
2415 streams.push(input);
2416 for factory in &source_factories {
2417 let stream = match Arc::clone(factory).create(materializer) {
2418 Ok((stream, _)) => stream,
2419 Err(error) => {
2420 return Ok(Box::new(std::iter::once(Err(error))) as BoxStream<Out>);
2421 }
2422 };
2423 streams.push(stream);
2424 }
2425 Ok(interleave_streams(streams, segment_size, eager_close))
2426 }))
2427 }
2428
2429 #[must_use]
2430 pub fn merge_sorted<Mat2>(self, that: Source<Out, Mat2>) -> Flow<In, Out, Mat>
2431 where
2432 Out: Ord,
2433 Mat2: Send + 'static,
2434 {
2435 let source_factory = that.factory;
2436 self.via(Flow::from_runtime_transform(move |input, materializer| {
2437 let other = match Arc::clone(&source_factory).create(materializer) {
2438 Ok((stream, _)) => stream,
2439 Err(error) => return Ok(Box::new(std::iter::once(Err(error))) as BoxStream<Out>),
2440 };
2441 Ok(merge_sorted_stream(input, other))
2442 }))
2443 }
2444
2445 #[must_use]
2446 pub fn merge_latest<Mat2>(
2447 self,
2448 that: Source<Out, Mat2>,
2449 eager_complete: bool,
2450 ) -> Flow<In, Vec<Out>, Mat>
2451 where
2452 Out: Clone,
2453 Mat2: Send + 'static,
2454 {
2455 let source_factory = that.factory;
2456 self.via(Flow::from_runtime_transform(move |input, materializer| {
2457 let other = match Arc::clone(&source_factory).create(materializer) {
2458 Ok((stream, _)) => stream,
2459 Err(error) => {
2460 return Ok(Box::new(std::iter::once(Err(error))) as BoxStream<Vec<Out>>);
2461 }
2462 };
2463 Ok(merge_latest_streams(vec![input, other], eager_complete))
2464 }))
2465 }
2466
2467 #[must_use]
2468 pub fn merge_all<Mat2, I>(self, those: I, eager_complete: bool) -> Flow<In, Out, Mat>
2469 where
2470 Mat2: Send + 'static,
2471 I: IntoIterator<Item = Source<Out, Mat2>>,
2472 {
2473 let source_factories: Vec<_> = those.into_iter().map(|source| source.factory).collect();
2474 self.via(Flow::from_runtime_transform(move |input, materializer| {
2475 let mut streams = Vec::with_capacity(source_factories.len() + 1);
2476 streams.push(input);
2477 for factory in &source_factories {
2478 let stream = match Arc::clone(factory).create(materializer) {
2479 Ok((stream, _)) => stream,
2480 Err(error) => {
2481 return Ok(Box::new(std::iter::once(Err(error))) as BoxStream<Out>);
2482 }
2483 };
2484 streams.push(stream);
2485 }
2486 Ok(merge_streams(streams, eager_complete))
2487 }))
2488 }
2489
2490 #[must_use]
2491 pub fn zip_with<Mat2, Out2, Next, F>(
2492 self,
2493 that: Source<Out2, Mat2>,
2494 combine: F,
2495 ) -> Flow<In, Next, Mat>
2496 where
2497 Out2: Send + 'static,
2498 Next: Send + 'static,
2499 Mat2: Send + 'static,
2500 F: Fn(Out, Out2) -> Next + Send + Sync + 'static,
2501 {
2502 let source_factory = that.factory;
2503 let combine = Arc::new(combine);
2504 self.via(Flow::from_runtime_transform(move |input, materializer| {
2505 let other = match Arc::clone(&source_factory).create(materializer) {
2506 Ok((stream, _)) => stream,
2507 Err(error) => return Ok(Box::new(std::iter::once(Err(error))) as BoxStream<Next>),
2508 };
2509 let combine = Arc::clone(&combine);
2510 Ok(Box::new(
2511 zip_streams(input, other)
2512 .map(move |item| item.map(|(left, right)| combine(left, right))),
2513 ) as BoxStream<Next>)
2514 }))
2515 }
2516
2517 #[must_use]
2518 pub fn zip_latest<Mat2, Out2>(self, that: Source<Out2, Mat2>) -> Flow<In, (Out, Out2), Mat>
2519 where
2520 Out: Clone,
2521 Out2: Clone + Send + 'static,
2522 Mat2: Send + 'static,
2523 {
2524 self.zip_latest_with(that, true, |left, right| (left, right))
2525 }
2526
2527 #[must_use]
2528 pub fn zip_latest_with<Mat2, Out2, Next, F>(
2529 self,
2530 that: Source<Out2, Mat2>,
2531 eager_complete: bool,
2532 combine: F,
2533 ) -> Flow<In, Next, Mat>
2534 where
2535 Out: Clone,
2536 Out2: Clone + Send + 'static,
2537 Next: Send + 'static,
2538 Mat2: Send + 'static,
2539 F: Fn(Out, Out2) -> Next + Send + Sync + 'static,
2540 {
2541 let source_factory = that.factory;
2542 let combine = Arc::new(combine);
2543 self.via(Flow::from_runtime_transform(move |input, materializer| {
2544 let other = match Arc::clone(&source_factory).create(materializer) {
2545 Ok((stream, _)) => stream,
2546 Err(error) => return Ok(Box::new(std::iter::once(Err(error))) as BoxStream<Next>),
2547 };
2548 Ok(zip_latest_with_stream(
2549 input,
2550 other,
2551 eager_complete,
2552 Arc::clone(&combine),
2553 ))
2554 }))
2555 }
2556
2557 #[must_use]
2558 pub fn zip_with_index(self) -> Flow<In, (Out, u64), Mat> {
2559 self.via(Flow::from_runtime_transform(
2560 move |mut input, _materializer| {
2561 let mut index = 0_u64;
2562 Ok(Box::new(std::iter::from_fn(move || {
2563 input.next().map(|item| {
2564 item.map(|value| {
2565 let pair = (value, index);
2566 index = index.wrapping_add(1);
2567 pair
2568 })
2569 })
2570 })) as BoxStream<(Out, u64)>)
2571 },
2572 ))
2573 }
2574
2575 #[must_use]
2576 pub fn zip_all<Mat2, Out2>(
2577 self,
2578 that: Source<Out2, Mat2>,
2579 this_elem: Out,
2580 that_elem: Out2,
2581 ) -> Flow<In, (Out, Out2), Mat>
2582 where
2583 Out: Clone + Sync,
2584 Out2: Clone + Send + Sync + 'static,
2585 Mat2: Send + 'static,
2586 {
2587 let source_factory = that.factory;
2588 self.via(Flow::from_runtime_transform(move |input, materializer| {
2589 let other = match Arc::clone(&source_factory).create(materializer) {
2590 Ok((stream, _)) => stream,
2591 Err(error) => {
2592 return Ok(Box::new(std::iter::once(Err(error))) as BoxStream<(Out, Out2)>);
2593 }
2594 };
2595 Ok(zip_all_stream(
2596 input,
2597 other,
2598 this_elem.clone(),
2599 that_elem.clone(),
2600 ))
2601 }))
2602 }
2603
2604 #[must_use]
2605 pub fn also_to<SideMat>(self, sink: Sink<Out, SideMat>) -> Flow<In, Out, Mat>
2606 where
2607 Out: Clone,
2608 SideMat: Send + 'static,
2609 {
2610 self.via(Flow::from_runtime_transform(
2611 move |mut input: BoxStream<Out>, materializer| {
2612 let (side_sender, side_mat) = materialize_side_sink(&sink, materializer, 0)?;
2613 let mut sender = Some(side_sender);
2614 let side_mat = side_mat;
2615 Ok(Box::new(std::iter::from_fn(move || match input.next() {
2616 Some(Ok(item)) => {
2617 let _ = &side_mat;
2618 if sender
2619 .as_ref()
2620 .is_some_and(|sender| sender.send(Ok(item.clone())).is_err())
2621 {
2622 sender = None;
2623 return None;
2624 }
2625 Some(Ok(item))
2626 }
2627 Some(Err(error)) => {
2628 let _ = &side_mat;
2629 let _ = sender
2630 .as_ref()
2631 .and_then(|sender| sender.send(Err(error.clone())).ok());
2632 sender = None;
2633 Some(Err(error))
2634 }
2635 None => {
2636 let _ = &side_mat;
2637 sender = None;
2638 None
2639 }
2640 })) as BoxStream<Out>)
2641 },
2642 ))
2643 }
2644
2645 #[must_use]
2646 pub fn also_to_all<SideMat, I>(self, sinks: I) -> Flow<In, Out, Mat>
2647 where
2648 Out: Clone,
2649 SideMat: Send + 'static,
2650 I: IntoIterator<Item = Sink<Out, SideMat>>,
2651 {
2652 let sinks: Vec<_> = sinks.into_iter().collect();
2653 if sinks.is_empty() {
2654 return self;
2655 }
2656
2657 self.via(Flow::from_runtime_transform(
2658 move |mut input: BoxStream<Out>, materializer| {
2659 let mut sides = sinks
2660 .iter()
2661 .map(|sink| materialize_side_sink(sink, materializer, 0))
2662 .collect::<StreamResult<Vec<_>>>()?;
2663 Ok(Box::new(std::iter::from_fn(move || match input.next() {
2664 Some(Ok(item)) => {
2665 for (sender, _) in &sides {
2666 if sender.send(Ok(item.clone())).is_err() {
2667 sides.clear();
2668 return None;
2669 }
2670 }
2671 Some(Ok(item))
2672 }
2673 Some(Err(error)) => {
2674 for (sender, _) in &sides {
2675 let _ = sender.send(Err(error.clone())).ok();
2676 }
2677 sides.clear();
2678 Some(Err(error))
2679 }
2680 None => {
2681 sides.clear();
2682 None
2683 }
2684 })) as BoxStream<Out>)
2685 },
2686 ))
2687 }
2688
2689 #[must_use]
2690 pub fn divert_to<SideMat, F>(self, sink: Sink<Out, SideMat>, predicate: F) -> Flow<In, Out, Mat>
2691 where
2692 SideMat: Send + 'static,
2693 F: Fn(&Out) -> bool + Send + Sync + 'static,
2694 {
2695 let predicate = Arc::new(predicate);
2696 self.via(Flow::from_runtime_transform(
2697 move |mut input: BoxStream<Out>, materializer| {
2698 let predicate = Arc::clone(&predicate);
2699 let (side_sender, side_mat) = materialize_side_sink(&sink, materializer, 0)?;
2700 let mut sender = Some(side_sender);
2701 let side_mat = side_mat;
2702 Ok(Box::new(std::iter::from_fn(move || {
2703 loop {
2704 let _ = &side_mat;
2705 match input.next() {
2706 Some(Ok(item)) if predicate(&item) => {
2707 if sender
2708 .as_ref()
2709 .is_some_and(|sender| sender.send(Ok(item)).is_err())
2710 {
2711 sender = None;
2712 return None;
2713 }
2714 }
2715 Some(Ok(item)) => return Some(Ok(item)),
2716 Some(Err(error)) => {
2717 let _ = sender
2718 .as_ref()
2719 .and_then(|sender| sender.send(Err(error.clone())).ok());
2720 sender = None;
2721 return Some(Err(error));
2722 }
2723 None => {
2724 sender = None;
2725 return None;
2726 }
2727 }
2728 }
2729 })) as BoxStream<Out>)
2730 },
2731 ))
2732 }
2733
2734 #[must_use]
2735 pub fn wire_tap<SideMat>(self, sink: Sink<Out, SideMat>) -> Flow<In, Out, Mat>
2736 where
2737 Out: Clone,
2738 SideMat: Send + 'static,
2739 {
2740 self.via(Flow::from_runtime_transform(
2741 move |mut input: BoxStream<Out>, materializer| {
2742 let (side_sender, side_mat) = materialize_side_sink(&sink, materializer, 1)?;
2743 let mut sender = Some(side_sender);
2744 let side_mat = side_mat;
2745 Ok(Box::new(std::iter::from_fn(move || match input.next() {
2746 Some(Ok(item)) => {
2747 let _ = &side_mat;
2748 if let Some(side) = sender.as_ref() {
2749 match side.try_send(Ok(item.clone())) {
2750 Ok(()) | Err(std::sync::mpsc::TrySendError::Full(_)) => {}
2751 Err(std::sync::mpsc::TrySendError::Disconnected(_)) => {
2752 sender = None
2753 }
2754 }
2755 }
2756 Some(Ok(item))
2757 }
2758 Some(Err(error)) => {
2759 let _ = &side_mat;
2760 if let Some(side) = sender.as_ref() {
2761 match side.try_send(Err(error.clone())) {
2762 Ok(())
2763 | Err(std::sync::mpsc::TrySendError::Full(_))
2764 | Err(std::sync::mpsc::TrySendError::Disconnected(_)) => {}
2765 }
2766 }
2767 sender = None;
2768 Some(Err(error))
2769 }
2770 None => {
2771 let _ = &side_mat;
2772 sender = None;
2773 None
2774 }
2775 })) as BoxStream<Out>)
2776 },
2777 ))
2778 }
2779
2780 fn concat_sources<Mat2, I>(self, those: I) -> Flow<In, Out, Mat>
2781 where
2782 Mat2: Send + 'static,
2783 I: IntoIterator<Item = Source<Out, Mat2>>,
2784 {
2785 let source_factories: Vec<_> = those.into_iter().map(|source| source.factory).collect();
2786 self.via(Flow::from_runtime_transform(move |input, materializer| {
2787 let mut streams = Vec::with_capacity(source_factories.len() + 1);
2788 streams.push(input);
2789 for factory in &source_factories {
2790 let stream = match Arc::clone(factory).create(materializer) {
2791 Ok((stream, _)) => stream,
2792 Err(error) => {
2793 return Ok(Box::new(std::iter::once(Err(error))) as BoxStream<Out>);
2794 }
2795 };
2796 streams.push(stream);
2797 }
2798 Ok(concat_streams(streams))
2799 }))
2800 }
2801
2802 fn prepend_sources<Mat2, I>(self, those: I) -> Flow<In, Out, Mat>
2803 where
2804 Mat2: Send + 'static,
2805 I: IntoIterator<Item = Source<Out, Mat2>>,
2806 {
2807 let source_factories: Vec<_> = those.into_iter().map(|source| source.factory).collect();
2808 self.via(Flow::from_runtime_transform(move |input, materializer| {
2809 let mut streams = Vec::with_capacity(source_factories.len() + 1);
2810 for factory in &source_factories {
2811 let stream = match Arc::clone(factory).create(materializer) {
2812 Ok((stream, _)) => stream,
2813 Err(error) => {
2814 return Ok(Box::new(std::iter::once(Err(error))) as BoxStream<Out>);
2815 }
2816 };
2817 streams.push(stream);
2818 }
2819 streams.push(input);
2820 Ok(concat_streams(streams))
2821 }))
2822 }
2823
2824 #[must_use]
2827 pub fn intersperse(self, inject: Out) -> Flow<In, Out, Mat>
2828 where
2829 Out: Clone + Sync,
2830 {
2831 let inject = Arc::new(inject);
2832 self.via(Flow::from_transform(move |mut input| {
2833 let inject = Arc::clone(&inject);
2834 let mut first = true;
2835 Box::new(std::iter::from_fn(move || {
2836 if first {
2837 first = false;
2838 match input.next() {
2839 None => None,
2840 Some(item) => {
2841 if item.is_err() {
2842 first = true;
2843 }
2844 Some(item)
2845 }
2846 }
2847 } else {
2848 match input.next() {
2849 None => None,
2850 Some(item) => {
2851 if item.is_err() {
2852 first = true;
2853 Some(item)
2854 } else {
2855 Some(Ok((*inject).clone()))
2856 }
2857 }
2858 }
2859 }
2860 }))
2861 }))
2862 }
2863
2864 #[must_use]
2865 pub fn flatten_optional<Inner>(self) -> Flow<In, Inner, Mat>
2866 where
2867 Out: Into<Option<Inner>>,
2868 Inner: Send + 'static,
2869 {
2870 self.filter_map(|item| item.into())
2871 }
2872
2873 #[must_use]
2874 pub fn grouped_weighted<F>(self, max_weight: usize, cost_fn: F) -> Flow<In, Vec<Out>, Mat>
2875 where
2876 F: Fn(&Out) -> usize + Send + Sync + 'static,
2877 {
2878 let cost_fn = Arc::new(cost_fn);
2879 self.via(Flow::from_transform(move |mut input| {
2880 let cost_fn = Arc::clone(&cost_fn);
2881 Box::new(std::iter::from_fn(move || {
2882 let mut group = Vec::new();
2883 let mut weight = 0usize;
2884 while weight < max_weight {
2885 match input.next() {
2886 Some(Ok(item)) => {
2887 let item_weight = cost_fn(&item);
2888 if weight > 0 && weight + item_weight > max_weight {
2889 group.push(item);
2890 break;
2891 }
2892 weight += item_weight;
2893 group.push(item);
2894 }
2895 Some(Err(error)) => return Some(Err(error)),
2896 None => break,
2897 }
2898 }
2899 if group.is_empty() {
2900 None
2901 } else {
2902 Some(Ok(group))
2903 }
2904 }))
2905 }))
2906 }
2907
2908 #[must_use]
2909 pub fn limit_weighted<F>(self, max_weight: usize, cost_fn: F) -> Flow<In, Out, Mat>
2910 where
2911 F: Fn(&Out) -> usize + Send + Sync + 'static,
2912 {
2913 let cost_fn = Arc::new(cost_fn);
2914 self.via(Flow::from_transform(move |mut input| {
2915 let cost_fn = Arc::clone(&cost_fn);
2916 let mut weight = 0usize;
2917 Box::new(std::iter::from_fn(move || match input.next()? {
2918 Ok(item) => {
2919 let item_weight = cost_fn(&item);
2920 if weight + item_weight > max_weight {
2921 Some(Err(StreamError::LimitExceeded {
2922 max: max_weight as u64,
2923 }))
2924 } else {
2925 weight += item_weight;
2926 Some(Ok(item))
2927 }
2928 }
2929 Err(error) => Some(Err(error)),
2930 }))
2931 }))
2932 }
2933
2934 #[must_use]
2935 pub fn contramap<NewIn, F>(self, f: F) -> Flow<NewIn, Out, Mat>
2936 where
2937 NewIn: Send + 'static,
2938 F: Fn(NewIn) -> In + Send + Sync + 'static,
2939 {
2940 let stage = Arc::new(f);
2941 Flow::from_transform(move |input| {
2942 let stage = Arc::clone(&stage);
2943 Box::new(input.map(move |item| item.map(|item| stage(item))))
2944 })
2945 .via_mat(self, Keep::right)
2946 }
2947
2948 #[must_use]
2949 pub fn monitor<F>(self, f: F) -> Flow<In, Out, Mat>
2950 where
2951 Out: Clone,
2952 F: Fn(&Out) + Send + Sync + 'static,
2953 {
2954 let stage = Arc::new(f);
2955 self.via(Flow::from_transform(move |input| {
2956 let stage = Arc::clone(&stage);
2957 Box::new(input.map(move |item| match item {
2958 Ok(item) => {
2959 stage(&item);
2960 Ok(item)
2961 }
2962 Err(error) => Err(error),
2963 }))
2964 }))
2965 }
2966
2967 #[must_use]
2968 pub fn watch_termination<CallbackMat, F>(self, materialize_callback: F) -> Flow<In, Out, Mat>
2969 where
2970 Mat: Clone,
2971 CallbackMat: Send + 'static,
2972 F: Fn(Mat) -> CallbackMat + Send + Sync + 'static,
2973 {
2974 let cb = Arc::new(materialize_callback);
2975 self.map_materialized_value(move |mat| {
2976 let _ = cb(mat.clone());
2977 mat
2978 })
2979 }
2980
2981 #[must_use]
2982 pub fn to<SinkMat>(self, sink: Sink<Out, SinkMat>) -> Sink<In, Mat>
2983 where
2984 SinkMat: Send + 'static,
2985 {
2986 self.to_mat(sink, Keep::left)
2987 }
2988
2989 #[must_use]
2990 pub fn to_mat<SinkMat, Combined, F>(
2991 self,
2992 sink: Sink<Out, SinkMat>,
2993 combine: F,
2994 ) -> Sink<In, Combined>
2995 where
2996 SinkMat: Send + 'static,
2997 Combined: Send + 'static,
2998 F: Fn(Mat, SinkMat) -> Combined + Send + Sync + 'static,
2999 {
3000 let transform = self.transform;
3001 let materialize = self.materialize;
3002 let combine = Arc::new(combine);
3003 Sink::from_runner(move |input, materializer| {
3004 let flow_mat = materialize()?;
3005 let input = match &transform {
3006 FlowTransform::Pure(transform) => transform(input),
3007 FlowTransform::Runtime(transform) => transform(input, materializer)?,
3008 };
3009 let sink_mat = sink.run(input, materializer)?;
3010 Ok(combine(flow_mat, sink_mat))
3011 })
3012 }
3013
3014 #[must_use]
3015 pub fn map_materialized_value<NextMat, F>(self, f: F) -> Flow<In, Out, NextMat>
3016 where
3017 NextMat: Send + 'static,
3018 F: Fn(Mat) -> NextMat + Send + Sync + 'static,
3019 {
3020 let transform = self.transform;
3021 let materialize = self.materialize;
3022 let hints = self.hints;
3023 let attributes = self.attributes;
3024 let f = Arc::new(f);
3025 match transform {
3026 FlowTransform::Pure(transform) => Flow::from_parts_with_hints(
3027 move |input| transform(input),
3028 move || {
3029 let mat = materialize()?;
3030 Ok(f(mat))
3031 },
3032 hints,
3033 )
3034 .with_attributes(attributes),
3035 FlowTransform::Runtime(transform) => Flow {
3036 transform: FlowTransform::Runtime(transform),
3037 materialize: Arc::new(move || {
3038 let mat = materialize()?;
3039 Ok(f(mat))
3040 }),
3041 hints,
3042 attributes,
3043 },
3044 }
3045 }
3046}
3047
3048struct MapWithResourceStream<In, Out, Resource, Create, F, Close>
3049where
3050 Create: Fn() -> StreamResult<Resource>,
3051 F: Fn(&mut Resource, In) -> StreamResult<Out>,
3052 Close: Fn(Resource) -> StreamResult<Option<Out>>,
3053{
3054 input: BoxStream<In>,
3055 create: Arc<Create>,
3056 stage: Arc<F>,
3057 close: Arc<Close>,
3058 resource: Option<Resource>,
3059 created: bool,
3060 pending_terminal: Option<StreamError>,
3061 terminated: bool,
3062 _marker: PhantomData<fn() -> Out>,
3063}
3064
3065impl<In, Out, Resource, Create, F, Close> MapWithResourceStream<In, Out, Resource, Create, F, Close>
3066where
3067 Create: Fn() -> StreamResult<Resource>,
3068 F: Fn(&mut Resource, In) -> StreamResult<Out>,
3069 Close: Fn(Resource) -> StreamResult<Option<Out>>,
3070{
3071 fn ensure_created(&mut self) -> StreamResult<()> {
3072 if self.created {
3073 return Ok(());
3074 }
3075 self.created = true;
3076 let resource = catch_unwind_failed("map_with_resource create", || (self.create)())
3077 .and_then(|result| result)?;
3078 self.resource = Some(resource);
3079 Ok(())
3080 }
3081
3082 fn close_resource(&mut self) -> StreamResult<Option<Out>> {
3083 match self.resource.take() {
3084 Some(resource) => {
3085 catch_unwind_failed("map_with_resource close", || (self.close)(resource))
3086 .and_then(|result| result)
3087 }
3088 None => Ok(None),
3089 }
3090 }
3091
3092 fn close_with_terminal(&mut self, terminal: Option<StreamError>) -> Option<StreamResult<Out>> {
3093 self.terminated = terminal.is_none();
3094 match self.close_resource() {
3095 Ok(Some(item)) => {
3096 self.pending_terminal = terminal;
3097 Some(Ok(item))
3098 }
3099 Ok(None) => match terminal {
3100 Some(error) => {
3101 self.terminated = true;
3102 Some(Err(error))
3103 }
3104 None => {
3105 self.terminated = true;
3106 None
3107 }
3108 },
3109 Err(error) => {
3110 self.terminated = true;
3111 Some(Err(terminal.unwrap_or(error)))
3112 }
3113 }
3114 }
3115}
3116
3117impl<In, Out, Resource, Create, F, Close> Iterator
3118 for MapWithResourceStream<In, Out, Resource, Create, F, Close>
3119where
3120 Create: Fn() -> StreamResult<Resource>,
3121 F: Fn(&mut Resource, In) -> StreamResult<Out>,
3122 Close: Fn(Resource) -> StreamResult<Option<Out>>,
3123{
3124 type Item = StreamResult<Out>;
3125
3126 fn next(&mut self) -> Option<Self::Item> {
3127 if let Some(error) = self.pending_terminal.take() {
3128 self.terminated = true;
3129 return Some(Err(error));
3130 }
3131 if self.terminated {
3132 return None;
3133 }
3134 if let Err(error) = self.ensure_created() {
3135 self.terminated = true;
3136 return Some(Err(error));
3137 }
3138
3139 match self.input.next() {
3140 Some(Ok(item)) => {
3141 let result = {
3142 let resource = self
3143 .resource
3144 .as_mut()
3145 .expect("map_with_resource resource is open");
3146 catch_unwind_failed("map_with_resource function", || {
3147 (self.stage)(resource, item)
3148 })
3149 .and_then(|result| result)
3150 };
3151 match result {
3152 Ok(item) => Some(Ok(item)),
3153 Err(error) => self.close_with_terminal(Some(error)),
3154 }
3155 }
3156 Some(Err(error)) => self.close_with_terminal(Some(error)),
3157 None => self.close_with_terminal(None),
3158 }
3159 }
3160}
3161
3162impl<I1: Send + 'static, O1: Send + 'static, I2: Send + 'static, O2: Send + 'static>
3163 BidiFlow<I1, O1, I2, O2>
3164{
3165 #[must_use]
3166 pub fn from_flows<Mat1, Mat2>(top: Flow<I1, O1, Mat1>, bottom: Flow<I2, O2, Mat2>) -> Self
3167 where
3168 Mat1: Send + 'static,
3169 Mat2: Send + 'static,
3170 {
3171 Self {
3172 top: top.map_materialized_value(|_| NotUsed),
3173 bottom: bottom.map_materialized_value(|_| NotUsed),
3174 attributes: Attributes::default(),
3175 }
3176 }
3177}
3178
3179impl<I1: Send + 'static, O1: Send + 'static, I2: Send + 'static, O2: Send + 'static>
3180 BidiFlow<I1, O1, I2, O2>
3181{
3182 #[must_use]
3183 pub fn attributes(&self) -> &Attributes {
3184 &self.attributes
3185 }
3186
3187 #[must_use]
3188 pub fn with_attributes(mut self, attributes: Attributes) -> Self {
3189 self.attributes = attributes;
3190 self
3191 }
3192
3193 #[must_use]
3194 pub fn add_attributes(mut self, attributes: Attributes) -> Self {
3195 self.attributes = self.attributes.and(attributes);
3196 self
3197 }
3198
3199 #[must_use]
3200 pub fn named(self, name: impl Into<String>) -> Self {
3201 self.add_attributes(Attributes::named(name))
3202 }
3203
3204 #[must_use]
3205 pub fn join<Mat2>(self, flow: Flow<O1, I2, Mat2>) -> Flow<I1, O2, NotUsed>
3206 where
3207 Mat2: Send + 'static,
3208 {
3209 self.top
3210 .via(flow)
3211 .via(self.bottom)
3212 .map_materialized_value(|_| NotUsed)
3213 .with_attributes(self.attributes)
3214 }
3215
3216 #[must_use]
3217 pub fn atop<OO1: Send + 'static, II2: Send + 'static>(
3218 self,
3219 bidi: BidiFlow<O1, OO1, II2, I2>,
3220 ) -> BidiFlow<I1, OO1, II2, O2> {
3221 BidiFlow {
3222 top: self.top.via(bidi.top).map_materialized_value(|_| NotUsed),
3223 bottom: bidi
3224 .bottom
3225 .via(self.bottom)
3226 .map_materialized_value(|_| NotUsed),
3227 attributes: self.attributes.and(bidi.attributes),
3228 }
3229 }
3230
3231 #[must_use]
3232 pub fn reversed(self) -> BidiFlow<I2, O2, I1, O1> {
3233 BidiFlow {
3234 top: self.bottom,
3235 bottom: self.top.map_materialized_value(|_| NotUsed),
3236 attributes: self.attributes,
3237 }
3238 }
3239}
3240
3241impl<In, Out, Resource, Create, F, Close> Drop
3242 for MapWithResourceStream<In, Out, Resource, Create, F, Close>
3243where
3244 Create: Fn() -> StreamResult<Resource>,
3245 F: Fn(&mut Resource, In) -> StreamResult<Out>,
3246 Close: Fn(Resource) -> StreamResult<Option<Out>>,
3247{
3248 fn drop(&mut self) {
3249 let _ = self.close_resource();
3250 }
3251}
3252
3253fn materialize_inner_flow<In, Out, InnerMat>(
3254 flow: Flow<In, Out, InnerMat>,
3255 input: BoxStream<In>,
3256 materializer: &Materializer,
3257) -> StreamResult<(BoxStream<Out>, InnerMat)>
3258where
3259 In: Send + 'static,
3260 Out: Send + 'static,
3261 InnerMat: Send + 'static,
3262{
3263 let mat = (flow.materialize)()?;
3264 let stream = match flow.transform {
3265 FlowTransform::Pure(transform) => transform(input),
3266 FlowTransform::Runtime(transform) => transform(input, materializer)?,
3267 };
3268 Ok((stream, mat))
3269}
3270
3271struct FutureFlowStream<In, Out, InnerMat, F, Fut> {
3272 future: Arc<F>,
3273 materializer: Materializer,
3274 input: Option<BoxStream<In>>,
3275 current: Option<BoxStream<Out>>,
3276 mat_sender: Option<oneshot::Sender<StreamResult<InnerMat>>>,
3277 initialized: bool,
3278 terminated: bool,
3279 _marker: PhantomData<fn() -> Fut>,
3280}
3281
3282impl<In, Out, InnerMat, F, Fut> FutureFlowStream<In, Out, InnerMat, F, Fut>
3283where
3284 In: Send + 'static,
3285 Out: Send + 'static,
3286 InnerMat: Send + 'static,
3287 F: Fn() -> Fut,
3288 Fut: Future<Output = StreamResult<Flow<In, Out, InnerMat>>> + Send + 'static,
3289{
3290 fn complete_mat(&mut self, result: StreamResult<InnerMat>) {
3291 if let Some(sender) = self.mat_sender.take() {
3292 let _ = sender.send(result);
3293 }
3294 }
3295
3296 fn initialize(&mut self) -> StreamResult<()> {
3297 if self.initialized {
3298 return Ok(());
3299 }
3300 self.initialized = true;
3301 let flow = match catch_unwind_failed("future_flow factory", || (self.future)())
3302 .and_then(run_future_inline_or_spawn)
3303 {
3304 Ok(flow) => flow,
3305 Err(error) => {
3306 self.complete_mat(Err(error.clone()));
3307 return Err(error);
3308 }
3309 };
3310 let input = self.input.take().expect("future_flow input available");
3311 match materialize_inner_flow(flow, input, &self.materializer) {
3312 Ok((stream, mat)) => {
3313 self.current = Some(stream);
3314 self.complete_mat(Ok(mat));
3315 Ok(())
3316 }
3317 Err(error) => {
3318 self.complete_mat(Err(error.clone()));
3319 Err(error)
3320 }
3321 }
3322 }
3323}
3324
3325impl<In, Out, InnerMat, F, Fut> Iterator for FutureFlowStream<In, Out, InnerMat, F, Fut>
3326where
3327 In: Send + 'static,
3328 Out: Send + 'static,
3329 InnerMat: Send + 'static,
3330 F: Fn() -> Fut,
3331 Fut: Future<Output = StreamResult<Flow<In, Out, InnerMat>>> + Send + 'static,
3332{
3333 type Item = StreamResult<Out>;
3334
3335 fn next(&mut self) -> Option<Self::Item> {
3336 if self.terminated {
3337 return None;
3338 }
3339 if let Err(error) = self.initialize() {
3340 self.terminated = true;
3341 return Some(Err(error));
3342 }
3343 match self
3344 .current
3345 .as_mut()
3346 .expect("future_flow current stream initialized")
3347 .next()
3348 {
3349 Some(Ok(item)) => Some(Ok(item)),
3350 Some(Err(error)) => {
3351 self.terminated = true;
3352 Some(Err(error))
3353 }
3354 None => {
3355 self.terminated = true;
3356 None
3357 }
3358 }
3359 }
3360}
3361
3362impl<In, Out, InnerMat, F, Fut> Drop for FutureFlowStream<In, Out, InnerMat, F, Fut> {
3363 fn drop(&mut self) {
3364 if !self.initialized
3365 && let Some(sender) = self.mat_sender.take()
3366 {
3367 let _ = sender.send(Err(StreamError::Failed(
3368 "future flow was never materialized".into(),
3369 )));
3370 }
3371 }
3372}
3373
3374struct LazyFutureFlowStream<In, Out, InnerMat, F, Fut> {
3375 create: Arc<F>,
3376 materializer: Materializer,
3377 input: Option<BoxStream<In>>,
3378 current: Option<BoxStream<Out>>,
3379 mat_sender: Option<oneshot::Sender<StreamResult<InnerMat>>>,
3380 initialized: bool,
3381 terminated: bool,
3382 _marker: PhantomData<fn() -> Fut>,
3383}
3384
3385impl<In, Out, InnerMat, F, Fut> LazyFutureFlowStream<In, Out, InnerMat, F, Fut>
3386where
3387 In: Send + 'static,
3388 Out: Send + 'static,
3389 InnerMat: Send + 'static,
3390 F: Fn() -> Fut,
3391 Fut: Future<Output = StreamResult<Flow<In, Out, InnerMat>>> + Send + 'static,
3392{
3393 fn complete_mat(&mut self, result: StreamResult<InnerMat>) {
3394 if let Some(sender) = self.mat_sender.take() {
3395 let _ = sender.send(result);
3396 }
3397 }
3398
3399 fn initialize(&mut self) -> Result<bool, StreamError> {
3400 if self.initialized {
3401 return Ok(true);
3402 }
3403 self.initialized = true;
3404 let first = match self
3405 .input
3406 .as_mut()
3407 .expect("lazy_future_flow input available")
3408 .next()
3409 {
3410 Some(Ok(item)) => item,
3411 Some(Err(error)) => {
3412 self.complete_mat(Err(error.clone()));
3413 return Err(error);
3414 }
3415 None => {
3416 self.complete_mat(Err(StreamError::Failed(
3417 "lazy flow was never materialized".into(),
3418 )));
3419 self.terminated = true;
3420 return Ok(false);
3421 }
3422 };
3423
3424 let flow = match catch_unwind_failed("lazy_future_flow factory", || (self.create)())
3425 .and_then(run_future_inline_or_spawn)
3426 {
3427 Ok(flow) => flow,
3428 Err(error) => {
3429 self.complete_mat(Err(error.clone()));
3430 return Err(error);
3431 }
3432 };
3433 let input = prepend_first_stream(
3434 first,
3435 self.input
3436 .take()
3437 .expect("lazy_future_flow input available after first element"),
3438 );
3439 match materialize_inner_flow(flow, input, &self.materializer) {
3440 Ok((stream, mat)) => {
3441 self.current = Some(stream);
3442 self.complete_mat(Ok(mat));
3443 Ok(true)
3444 }
3445 Err(error) => {
3446 self.complete_mat(Err(error.clone()));
3447 Err(error)
3448 }
3449 }
3450 }
3451}
3452
3453impl<In, Out, InnerMat, F, Fut> Iterator for LazyFutureFlowStream<In, Out, InnerMat, F, Fut>
3454where
3455 In: Send + 'static,
3456 Out: Send + 'static,
3457 InnerMat: Send + 'static,
3458 F: Fn() -> Fut,
3459 Fut: Future<Output = StreamResult<Flow<In, Out, InnerMat>>> + Send + 'static,
3460{
3461 type Item = StreamResult<Out>;
3462
3463 fn next(&mut self) -> Option<Self::Item> {
3464 if self.terminated {
3465 return None;
3466 }
3467 match self.initialize() {
3468 Ok(true) => {}
3469 Ok(false) => return None,
3470 Err(error) => {
3471 self.terminated = true;
3472 return Some(Err(error));
3473 }
3474 }
3475 match self
3476 .current
3477 .as_mut()
3478 .expect("lazy_future_flow current stream initialized")
3479 .next()
3480 {
3481 Some(Ok(item)) => Some(Ok(item)),
3482 Some(Err(error)) => {
3483 self.terminated = true;
3484 Some(Err(error))
3485 }
3486 None => {
3487 self.terminated = true;
3488 None
3489 }
3490 }
3491 }
3492}
3493
3494impl<In, Out, InnerMat, F, Fut> Drop for LazyFutureFlowStream<In, Out, InnerMat, F, Fut> {
3495 fn drop(&mut self) {
3496 if !self.initialized
3497 && let Some(sender) = self.mat_sender.take()
3498 {
3499 let _ = sender.send(Err(StreamError::Failed(
3500 "lazy flow was never materialized".into(),
3501 )));
3502 }
3503 }
3504}
3505
3506fn prepend_first_stream<In>(first: In, mut rest: BoxStream<In>) -> BoxStream<In>
3507where
3508 In: Send + 'static,
3509{
3510 let mut first = Some(first);
3511 Box::new(std::iter::from_fn(move || {
3512 if let Some(item) = first.take() {
3513 Some(Ok(item))
3514 } else {
3515 rest.next()
3516 }
3517 }))
3518}
3519
3520pub(super) fn poll_once_or_pending<Fut, T>(future: Fut) -> Result<StreamResult<T>, Pin<Box<Fut>>>
3521where
3522 Fut: Future<Output = StreamResult<T>>,
3523{
3524 let mut future = Box::pin(future);
3525 let _guard = stream_tokio_runtime().enter();
3526 let waker = noop_waker();
3527 let mut cx = Context::from_waker(&waker);
3528 match catch_unwind(AssertUnwindSafe(|| future.as_mut().poll(&mut cx))) {
3529 Ok(Poll::Ready(output)) => Ok(output),
3530 Ok(Poll::Pending) => Err(future),
3531 Err(_) => Ok(Err(StreamError::Failed("future task panicked".into()))),
3532 }
3533}
3534
3535pub(super) fn spawn_completion_task<Fut, T, Msg, Map>(
3536 task_id: usize,
3537 future: Pin<Box<Fut>>,
3538 sender: std::sync::mpsc::Sender<(usize, Msg)>,
3539 map: Map,
3540) -> AbortOnDropHandle<()>
3541where
3542 Fut: Future<Output = StreamResult<T>> + Send + 'static,
3543 T: Send + 'static,
3544 Msg: Send + 'static,
3545 Map: FnOnce(StreamResult<T>) -> Msg + Send + 'static,
3546{
3547 spawn_tokio_task(async move {
3548 let result = AssertUnwindSafe(future).catch_unwind().await;
3549 let message = match result {
3550 Ok(output) => map(output),
3551 Err(_) => map(Err(StreamError::Failed("future task panicked".into()))),
3552 };
3553 let _ = sender.send((task_id, message));
3554 })
3555}
3556
3557pub(super) fn recv_completion<Msg>(
3558 receiver: &std::sync::mpsc::Receiver<(usize, Msg)>,
3559) -> Option<(usize, Msg)> {
3560 let mut idle_spins = 0;
3561 loop {
3562 match receiver.try_recv() {
3563 Ok(message) => return Some(message),
3564 Err(std::sync::mpsc::TryRecvError::Disconnected) => return None,
3565 Err(std::sync::mpsc::TryRecvError::Empty) if idle_spins < STREAM_READY_SPINS => {
3566 idle_spins += STREAM_SPIN_BACKOFF;
3567 for _ in 0..STREAM_SPIN_BACKOFF {
3568 std::hint::spin_loop();
3569 }
3570 }
3571 Err(std::sync::mpsc::TryRecvError::Empty) => {
3572 idle_spins = 0;
3573 match receiver.recv_timeout(STREAM_MAX_PARK) {
3574 Ok(message) => return Some(message),
3575 Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {}
3576 Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => return None,
3577 }
3578 }
3579 }
3580 }
3581}
3582
3583pub(super) fn run_future_inline_or_spawn<Fut, T>(future: Fut) -> StreamResult<T>
3584where
3585 Fut: Future<Output = StreamResult<T>> + Send + 'static,
3586 T: Send + 'static,
3587{
3588 match poll_once_or_pending(future) {
3589 Ok(result) => result,
3590 Err(future) => {
3591 let (sender, receiver) = std::sync::mpsc::channel::<(usize, StreamResult<T>)>();
3592 let _task = spawn_completion_task(0, future, sender, |result| result);
3593 recv_completion(&receiver)
3594 .map(|(_, result)| result)
3595 .unwrap_or_else(|| Err(StreamError::Failed("future task dropped".into())))
3596 }
3597 }
3598}
3599
/// Applies the async `stage` to every input element and emits the results in input order.
///
/// Each element gets a monotonically increasing index. Its future is polled once inline;
/// futures that are still pending are handed to a spawned completion task. Results that
/// finish ahead of their turn are parked in `completed` until `next_to_emit` reaches them.
/// The number of running tasks plus parked results is kept below `parallelism` before more
/// input is pulled.
fn map_async_ordered<Out, Next, F, Fut>(
    mut input: BoxStream<Out>,
    parallelism: usize,
    stage: Arc<F>,
) -> BoxStream<Next>
where
    Out: Send + 'static,
    Next: Send + 'static,
    F: Fn(Out) -> Fut + Send + Sync + 'static,
    Fut: Future<Output = StreamResult<Next>> + Send + 'static,
{
    let (sender, receiver) = std::sync::mpsc::channel::<(usize, StreamResult<Next>)>();
    // Handles of spawned tasks, keyed by element index.
    let mut tasks = HashMap::<usize, AbortOnDropHandle<()>>::with_capacity(parallelism);
    // Index assigned to the next element pulled from `input`.
    let mut next_index = 0_usize;
    // Index of the result that must be emitted next to preserve input order.
    let mut next_to_emit = 0_usize;
    // Finished results waiting for their turn, keyed by element index.
    let mut completed = BTreeMap::new();
    let mut input_done = false;

    Box::new(std::iter::from_fn(move || {
        loop {
            // Emit a parked result if it is the next one in order.
            if let Some(result) = completed.remove(&next_to_emit) {
                next_to_emit += 1;
                return Some(result);
            }

            // Pull more input while there is spare capacity.
            while tasks.len() + completed.len() < parallelism && !input_done {
                match input.next() {
                    Some(Ok(item)) => {
                        let index = next_index;
                        next_index += 1;
                        match poll_once_or_pending(stage(item)) {
                            // Ready on first poll: emit directly if in order, otherwise park.
                            Ok(result) => {
                                if index == next_to_emit {
                                    next_to_emit += 1;
                                    return Some(result);
                                }
                                completed.insert(index, result);
                            }
                            // Still pending: finish it on a spawned task.
                            Err(future) => {
                                tasks.insert(
                                    index,
                                    spawn_completion_task(
                                        index,
                                        future,
                                        sender.clone(),
                                        |result| result,
                                    ),
                                );
                            }
                        }
                    }
                    // An upstream error takes the next index so it is emitted after all
                    // earlier results; no further input is pulled.
                    Some(Err(error)) => {
                        completed.insert(next_index, Err(error));
                        next_index += 1;
                        input_done = true;
                    }
                    None => input_done = true,
                }
            }

            if let Some(result) = completed.remove(&next_to_emit) {
                next_to_emit += 1;
                return Some(result);
            }

            // Nothing parked for the current index and nothing running: the stream is done.
            if tasks.is_empty() {
                return None;
            }

            // Wait for any spawned task to finish, then emit or park its result.
            if let Some((index, result)) = recv_completion(&receiver) {
                tasks.remove(&index);
                if index == next_to_emit {
                    next_to_emit += 1;
                    return Some(result);
                }
                completed.insert(index, result);
            }
        }
    }))
}
3680
3681fn supervise_async_result<Next>(
3682 result: StreamResult<Next>,
3683 decider: &SupervisionDecider,
3684) -> Option<StreamResult<Next>> {
3685 match result {
3686 Ok(item) => Some(Ok(item)),
3687 Err(error) => match decide_supervision(decider, &error) {
3688 SupervisionDirective::Stop => Some(Err(error)),
3689 SupervisionDirective::Resume | SupervisionDirective::Restart => None,
3690 },
3691 }
3692}
3693
/// Supervised variant of ordered async mapping.
///
/// Works like the unsupervised ordered mapping, with two differences visible here: the call
/// to `stage` and its first poll are wrapped in `catch_unwind`, and every stage result goes
/// through `supervise_async_result`. A result that supervision drops is recorded as `None`
/// so its index is still consumed and ordering is preserved. Upstream errors are emitted
/// as-is, without consulting the decider.
fn map_async_ordered_supervised<Out, Next, F, Fut>(
    mut input: BoxStream<Out>,
    parallelism: usize,
    stage: Arc<F>,
    decider: SupervisionDecider,
) -> BoxStream<Next>
where
    Out: Send + 'static,
    Next: Send + 'static,
    F: Fn(Out) -> Fut + Send + Sync + 'static,
    Fut: Future<Output = StreamResult<Next>> + Send + 'static,
{
    let (sender, receiver) = std::sync::mpsc::channel::<(usize, StreamResult<Next>)>();
    // Handles of spawned tasks, keyed by element index.
    let mut tasks = HashMap::<usize, AbortOnDropHandle<()>>::with_capacity(parallelism);
    let mut next_index = 0_usize;
    let mut next_to_emit = 0_usize;
    // `None` marks an element that supervision decided to skip.
    let mut completed = BTreeMap::<usize, Option<StreamResult<Next>>>::new();
    let mut input_done = false;

    Box::new(std::iter::from_fn(move || {
        loop {
            // Drain in-order parked entries, skipping the ones supervision dropped.
            while let Some(result) = completed.remove(&next_to_emit) {
                next_to_emit += 1;
                if let Some(result) = result {
                    return Some(result);
                }
            }

            while tasks.len() + completed.len() < parallelism && !input_done {
                match input.next() {
                    Some(Ok(item)) => {
                        let index = next_index;
                        next_index += 1;
                        match catch_unwind(AssertUnwindSafe(|| poll_once_or_pending(stage(item)))) {
                            // Ready on first poll.
                            Ok(Ok(result)) => {
                                let result = supervise_async_result(result, &decider);
                                if index == next_to_emit {
                                    next_to_emit += 1;
                                    if let Some(result) = result {
                                        return Some(result);
                                    }
                                } else {
                                    completed.insert(index, result);
                                }
                            }
                            // Still pending: finish it on a spawned task.
                            Ok(Err(future)) => {
                                tasks.insert(
                                    index,
                                    spawn_completion_task(
                                        index,
                                        future,
                                        sender.clone(),
                                        |result| result,
                                    ),
                                );
                            }
                            // A panic is turned into a stream error and supervised like any
                            // other failure.
                            Err(_) => {
                                let error = panic_stream_error("map_async callback");
                                let result = supervise_async_result(Err(error), &decider);
                                if index == next_to_emit {
                                    next_to_emit += 1;
                                    if let Some(result) = result {
                                        return Some(result);
                                    }
                                } else {
                                    completed.insert(index, result);
                                }
                            }
                        }
                    }
                    // Upstream errors bypass the decider and stop further input.
                    Some(Err(error)) => {
                        completed.insert(next_index, Some(Err(error)));
                        next_index += 1;
                        input_done = true;
                    }
                    None => input_done = true,
                }
            }

            while let Some(result) = completed.remove(&next_to_emit) {
                next_to_emit += 1;
                if let Some(result) = result {
                    return Some(result);
                }
            }

            // Nothing parked for the current index and nothing running: the stream is done.
            if tasks.is_empty() {
                return None;
            }

            // Wait for any spawned task, supervise its result, then emit or park it.
            if let Some((index, result)) = recv_completion(&receiver) {
                tasks.remove(&index);
                let result = supervise_async_result(result, &decider);
                if index == next_to_emit {
                    next_to_emit += 1;
                    if let Some(result) = result {
                        return Some(result);
                    }
                } else {
                    completed.insert(index, result);
                }
            }
        }
    }))
}
3799
3800fn concat_streams<Out>(streams: Vec<BoxStream<Out>>) -> BoxStream<Out>
3801where
3802 Out: Send + 'static,
3803{
3804 let mut streams: VecDeque<_> = streams.into();
3805 let mut current = streams.pop_front();
3806 Box::new(std::iter::from_fn(move || {
3807 loop {
3808 match current.as_mut() {
3809 Some(stream) => match stream.next() {
3810 Some(item) => return Some(item),
3811 None => current = streams.pop_front(),
3812 },
3813 None => return None,
3814 }
3815 }
3816 }))
3817}
3818
3819fn concat_streams_lazy<Out, Mat>(
3820 initial: BoxStream<Out>,
3821 factories: Vec<Arc<dyn SourceFactory<Out, Mat>>>,
3822 materializer: &Materializer,
3823) -> BoxStream<Out>
3824where
3825 Out: Send + 'static,
3826 Mat: Send + 'static,
3827{
3828 let mut current = Some(initial);
3829 let mut remaining: VecDeque<_> = factories.into();
3830 let materializer = materializer.with_name_prefix(materializer.name_prefix().to_owned());
3831 Box::new(std::iter::from_fn(move || {
3832 loop {
3833 match current.as_mut() {
3834 Some(stream) => match stream.next() {
3835 Some(item) => return Some(item),
3836 None => {
3837 current = remaining.pop_front().map(|factory| {
3838 match factory.create(&materializer) {
3839 Ok((stream, _)) => stream,
3840 Err(error) => {
3841 Box::new(std::iter::once(Err(error))) as BoxStream<Out>
3842 }
3843 }
3844 });
3845 }
3846 },
3847 None => return None,
3848 }
3849 }
3850 }))
3851}
3852
3853fn or_else_stream<Out>(mut primary: BoxStream<Out>, mut secondary: BoxStream<Out>) -> BoxStream<Out>
3854where
3855 Out: Send + 'static,
3856{
3857 let mut primary_emitted = false;
3858 let mut using_secondary = false;
3859 Box::new(std::iter::from_fn(move || {
3860 loop {
3861 if using_secondary {
3862 return secondary.next();
3863 }
3864
3865 match primary.next() {
3866 Some(Ok(item)) => {
3867 primary_emitted = true;
3868 return Some(Ok(item));
3869 }
3870 Some(Err(error)) => return Some(Err(error)),
3871 None if primary_emitted => return None,
3872 None => using_secondary = true,
3873 }
3874 }
3875 }))
3876}
3877
3878fn interleave_streams<Out>(
3879 streams: Vec<BoxStream<Out>>,
3880 segment_size: usize,
3881 eager_close: bool,
3882) -> BoxStream<Out>
3883where
3884 Out: Send + 'static,
3885{
3886 if segment_size == 0 {
3887 return Box::new(std::iter::once(Err(StreamError::GraphValidation(
3888 "interleave segment size must be greater than zero".into(),
3889 ))));
3890 }
3891
3892 let mut streams: Vec<Option<BoxStream<Out>>> = streams.into_iter().map(Some).collect();
3893 let mut pending: Vec<Option<StreamResult<Out>>> = (0..streams.len()).map(|_| None).collect();
3894 let mut current = 0usize;
3895 let mut emitted = 0usize;
3896 Box::new(std::iter::from_fn(move || {
3897 loop {
3898 if streams.iter().all(Option::is_none) {
3899 return None;
3900 }
3901 if streams[current].is_none() {
3902 match next_active_stream(&streams, current) {
3903 Some(next) => {
3904 current = next;
3905 emitted = 0;
3906 }
3907 None => return None,
3908 }
3909 }
3910
3911 let Some(stream) = streams[current].as_mut() else {
3912 continue;
3913 };
3914 let next_item = pending[current].take().or_else(|| stream.next());
3915 match next_item {
3916 Some(Ok(item)) => {
3917 emitted += 1;
3918 if emitted == segment_size {
3919 emitted = 0;
3920 if let Some(next) = next_active_stream(&streams, current) {
3921 current = next;
3922 }
3923 }
3924 return Some(Ok(item));
3925 }
3926 Some(Err(error)) => return Some(Err(error)),
3927 None => {
3928 streams[current] = None;
3929 emitted = 0;
3930 if eager_close {
3931 return None;
3932 }
3933 match next_active_stream(&streams, current) {
3934 Some(next) => current = next,
3935 None => return None,
3936 }
3937 }
3938 }
3939 }
3940 }))
3941}
3942
3943fn next_active_stream<Out>(streams: &[Option<BoxStream<Out>>], current: usize) -> Option<usize>
3944where
3945 Out: Send + 'static,
3946{
3947 if streams.is_empty() {
3948 return None;
3949 }
3950 for offset in 1..=streams.len() {
3951 let index = (current + offset) % streams.len();
3952 if streams[index].is_some() {
3953 return Some(index);
3954 }
3955 }
3956 None
3957}
3958
3959fn materialize_side_sink<Out, Mat>(
3960 sink: &Sink<Out, Mat>,
3961 materializer: &Materializer,
3962 buffer: usize,
3963) -> StreamResult<(std::sync::mpsc::SyncSender<StreamResult<Out>>, Mat)>
3964where
3965 Out: Send + 'static,
3966 Mat: Send + 'static,
3967{
3968 let (sender, receiver) = std::sync::mpsc::sync_channel(buffer);
3969 let mat = sink.run(side_receiver_stream(receiver), materializer)?;
3970 Ok((sender, mat))
3971}
3972
3973fn side_receiver_stream<Out>(
3974 receiver: std::sync::mpsc::Receiver<StreamResult<Out>>,
3975) -> BoxStream<Out>
3976where
3977 Out: Send + 'static,
3978{
3979 Box::new(std::iter::from_fn(move || receiver.recv().ok()))
3980}
3981
/// Terminal state recorded on a live substream; once set it is never overwritten.
#[derive(Clone)]
enum LiveSubstreamTerminal {
    /// The substream finished normally.
    Complete,
    /// The substream finished with the given error.
    Error(StreamError),
}
3987
/// Default maximum number of buffered elements in a live substream.
const LIVE_SUBSTREAM_CAPACITY: usize = 256;
/// Default maximum number of elements per buffered batch in a live substream.
const LIVE_SUBSTREAM_BATCH: usize = 64;
// NOTE(review): not referenced in this part of the file; presumably a window size used by
// flat_map_merge — confirm at the use site.
const FLAT_MAP_MERGE_SUBSTREAM_WINDOW: usize = 64;
3991
/// State shared between the producer and the consumer of a live substream.
struct LiveSubstreamShared<T> {
    /// Buffered batches plus the terminal marker, guarded by a mutex.
    state: Mutex<LiveSubstreamState<T>>,
    /// Condition variable used with `state`; producers and consumers both wait on it.
    available: Condvar,
    /// Set when the consuming stream is dropped; producers check it before pushing.
    cancelled: Arc<AtomicBool>,
    /// Upper bound on `LiveSubstreamState::buffered`; producers wait while it is reached.
    capacity: usize,
    /// Maximum number of elements appended to one buffered batch (at least 1).
    batch_size: usize,
}
3999
/// Mutex-protected part of a live substream.
struct LiveSubstreamState<T> {
    /// Total number of elements currently held across all `batches`.
    buffered: usize,
    /// Buffered elements, grouped into batches that the consumer takes one at a time.
    batches: VecDeque<VecDeque<T>>,
    /// Set once the producer has completed or failed the substream.
    terminal: Option<LiveSubstreamTerminal>,
}
4005
4006impl<T> LiveSubstreamShared<T> {
4007 fn new() -> Arc<Self> {
4008 Self::with_capacity(LIVE_SUBSTREAM_CAPACITY)
4009 }
4010
4011 fn with_capacity(capacity: usize) -> Arc<Self> {
4012 Self::with_batching(capacity, LIVE_SUBSTREAM_BATCH)
4013 }
4014
4015 fn with_batching(capacity: usize, batch_size: usize) -> Arc<Self> {
4016 Arc::new(Self {
4017 state: Mutex::new(LiveSubstreamState {
4018 buffered: 0,
4019 batches: VecDeque::new(),
4020 terminal: None,
4021 }),
4022 available: Condvar::new(),
4023 cancelled: Arc::new(AtomicBool::new(false)),
4024 capacity,
4025 batch_size: batch_size.max(1),
4026 })
4027 }
4028}
4029
/// Consumer side of a live substream, exposed as a blocking iterator.
struct LiveSubstreamStream<T> {
    /// State shared with the producer.
    shared: Arc<LiveSubstreamShared<T>>,
    /// Optional completion handle of the worker feeding this stream; released on drop.
    completion: Option<StreamCompletion<NotUsed>>,
    /// Batch already taken out of the shared buffer; drained without locking.
    local_batch: VecDeque<T>,
}
4035
impl<T> Iterator for LiveSubstreamStream<T> {
    type Item = StreamResult<T>;

    /// Returns the next element, blocking until one is buffered or a terminal is recorded.
    ///
    /// Elements are served from the local batch first. Otherwise a whole batch is taken out
    /// of the shared buffer under the lock. Buffered elements are always delivered before
    /// the terminal state is reported.
    fn next(&mut self) -> Option<Self::Item> {
        // Fast path: no locking while the locally held batch still has elements.
        if let Some(item) = self.local_batch.pop_front() {
            return Some(Ok(item));
        }

        // A poisoned mutex is recovered rather than propagated.
        let mut state = self
            .shared
            .state
            .lock()
            .unwrap_or_else(|poison| poison.into_inner());
        loop {
            if let Some(mut batch) = state.batches.pop_front() {
                state.buffered -= batch.len();
                // Release the lock before notifying so woken producers can proceed at once.
                drop(state);
                self.shared.available.notify_all();
                let item = batch.pop_front().expect("live substream batch has an item");
                self.local_batch = batch;
                return Some(Ok(item));
            }
            // Buffer is empty: report the terminal state if there is one.
            if let Some(terminal) = state.terminal.clone() {
                return match terminal {
                    LiveSubstreamTerminal::Complete => None,
                    LiveSubstreamTerminal::Error(error) => Some(Err(error)),
                };
            }
            // Nothing buffered and not terminated: wait for the producer.
            state = self
                .shared
                .available
                .wait(state)
                .unwrap_or_else(|poison| poison.into_inner());
        }
    }
}
4072
4073impl<T> Drop for LiveSubstreamStream<T> {
4074 fn drop(&mut self) {
4075 self.shared.cancelled.store(true, Ordering::SeqCst);
4076 self.shared.available.notify_all();
4077 let _ = self.completion.take();
4078 }
4079}
4080
/// Moves every element of `batch` into the substream, blocking while it is at capacity.
///
/// Returns `Err(())` — after clearing `batch` — if the consumer has cancelled or a terminal
/// state has been recorded. On `Ok(())` the batch is empty and all elements are buffered.
fn push_live_substream_batch<T>(
    shared: &Arc<LiveSubstreamShared<T>>,
    batch: &mut VecDeque<T>,
) -> Result<(), ()> {
    // Each outer iteration takes the lock once and moves as many elements as currently fit.
    while !batch.is_empty() {
        let mut state = shared
            .state
            .lock()
            .unwrap_or_else(|poison| poison.into_inner());
        // Wait for free capacity, giving up as soon as cancellation is observed.
        while state.buffered >= shared.capacity && state.terminal.is_none() {
            if shared.cancelled.load(Ordering::SeqCst) {
                batch.clear();
                return Err(());
            }
            state = shared
                .available
                .wait(state)
                .unwrap_or_else(|poison| poison.into_inner());
        }
        if shared.cancelled.load(Ordering::SeqCst) || state.terminal.is_some() {
            batch.clear();
            return Err(());
        }
        // The consumer only waits when nothing is buffered, so a wake-up is needed only
        // when the buffer goes from empty to non-empty.
        let was_empty = state.buffered == 0;
        while state.buffered < shared.capacity && !batch.is_empty() {
            let item = batch.pop_front().expect("batch non-empty");
            // Append to the last buffered batch while it has room, else start a new one.
            if let Some(back) = state.batches.back_mut()
                && back.len() < shared.batch_size
            {
                back.push_back(item);
            } else {
                let mut new_batch =
                    VecDeque::with_capacity(shared.batch_size.min(shared.capacity.max(1)));
                new_batch.push_back(item);
                state.batches.push_back(new_batch);
            }
            state.buffered += 1;
        }
        drop(state);
        if was_empty {
            shared.available.notify_all();
        }
    }
    Ok(())
}
4130
/// Pushes a single `item` into the substream, blocking while it is at capacity.
///
/// Returns the item back as `Err(item)` if the consumer has cancelled or a terminal state
/// has been recorded.
fn push_live_substream<T>(shared: &Arc<LiveSubstreamShared<T>>, item: T) -> Result<(), T> {
    let mut state = shared
        .state
        .lock()
        .unwrap_or_else(|poison| poison.into_inner());
    // Wait for free capacity, giving up as soon as cancellation is observed.
    while state.buffered >= shared.capacity && state.terminal.is_none() {
        if shared.cancelled.load(Ordering::SeqCst) {
            return Err(item);
        }
        state = shared
            .available
            .wait(state)
            .unwrap_or_else(|poison| poison.into_inner());
    }
    if shared.cancelled.load(Ordering::SeqCst) || state.terminal.is_some() {
        return Err(item);
    }
    // The consumer only waits when nothing is buffered, so a wake-up is needed only when
    // the buffer goes from empty to non-empty.
    let was_empty = state.buffered == 0;
    // Append to the last buffered batch while it has room, else start a new one.
    if let Some(batch) = state.batches.back_mut()
        && batch.len() < shared.batch_size
    {
        batch.push_back(item);
    } else {
        let mut batch = VecDeque::with_capacity(shared.batch_size.min(shared.capacity.max(1)));
        batch.push_back(item);
        state.batches.push_back(batch);
    }
    state.buffered += 1;
    drop(state);
    if was_empty {
        shared.available.notify_all();
    }
    Ok(())
}
4165
4166fn complete_live_substream<T>(shared: &Arc<LiveSubstreamShared<T>>) {
4167 let mut state = shared
4168 .state
4169 .lock()
4170 .unwrap_or_else(|poison| poison.into_inner());
4171 if state.terminal.is_none() {
4172 state.terminal = Some(LiveSubstreamTerminal::Complete);
4173 }
4174 drop(state);
4175 shared.available.notify_all();
4176}
4177
4178fn fail_live_substream<T>(shared: &Arc<LiveSubstreamShared<T>>, error: StreamError) {
4179 let mut state = shared
4180 .state
4181 .lock()
4182 .unwrap_or_else(|poison| poison.into_inner());
4183 if state.terminal.is_none() {
4184 state.terminal = Some(LiveSubstreamTerminal::Error(error));
4185 }
4186 drop(state);
4187 shared.available.notify_all();
4188}
4189
/// Terminates the substream with `StreamError::Cancelled` (no effect if a terminal state is
/// already recorded, per `fail_live_substream`).
fn cancel_live_substream<T>(shared: &Arc<LiveSubstreamShared<T>>) {
    fail_live_substream(shared, StreamError::Cancelled);
}
4193
4194fn source_from_live_substream<T>(shared: Arc<LiveSubstreamShared<T>>) -> Source<T>
4195where
4196 T: Send + 'static,
4197{
4198 let claimed = Arc::new(AtomicBool::new(false));
4199 Source::from_materialized_factory(move |_materializer| {
4200 if claimed.swap(true, Ordering::SeqCst) {
4201 return Err(StreamError::Failed(
4202 "substream source cannot be materialized more than once".into(),
4203 ));
4204 }
4205 Ok((
4206 Box::new(LiveSubstreamStream {
4207 shared: Arc::clone(&shared),
4208 completion: None,
4209 local_batch: VecDeque::new(),
4210 }) as BoxStream<T>,
4211 NotUsed,
4212 ))
4213 })
4214}
4215
4216fn source_from_once_stream<T>(stream: BoxStream<T>) -> Source<T>
4217where
4218 T: Send + 'static,
4219{
4220 let stream = Arc::new(Mutex::new(Some(stream)));
4221 Source::from_materialized_factory(move |_materializer| {
4222 let mut slot = stream.lock().unwrap_or_else(|poison| poison.into_inner());
4223 let stream = slot.take().ok_or_else(|| {
4224 StreamError::Failed("substream source cannot be materialized more than once".into())
4225 })?;
4226 Ok((stream, NotUsed))
4227 })
4228}
4229
4230fn prefix_and_tail_stream<Out>(
4231 input: BoxStream<Out>,
4232 n: usize,
4233) -> BoxStream<(Vec<Out>, Source<Out>)>
4234where
4235 Out: Send + 'static,
4236{
4237 let mut input = Some(input);
4238 let mut emitted = false;
4239 Box::new(std::iter::from_fn(move || {
4240 if emitted {
4241 return None;
4242 }
4243 emitted = true;
4244
4245 let mut prefix = Vec::with_capacity(n);
4246 while prefix.len() < n {
4247 match input
4248 .as_mut()
4249 .expect("prefix_and_tail input available")
4250 .next()
4251 {
4252 Some(Ok(item)) => prefix.push(item),
4253 Some(Err(error)) => return Some(Err(error)),
4254 None => return Some(Ok((prefix, Source::empty()))),
4255 }
4256 }
4257
4258 Some(Ok((
4259 prefix,
4260 source_from_once_stream(input.take().expect("tail input available")),
4261 )))
4262 }))
4263}
4264
/// Bookkeeping for the group_by worker. While `armed`, dropping the guard fails the outer
/// stream and every active substream with `StreamError::AbruptTermination`.
struct GroupByWorkerGuard<Key, Out> {
    /// Outer stream on which newly created substream sources are published.
    outer: Arc<LiveSubstreamShared<Source<Out>>>,
    /// Substreams that are currently open, by key.
    active: HashMap<Key, Arc<LiveSubstreamShared<Out>>>,
    /// Keys whose substream was closed and must not be recreated.
    closed: HashSet<Key>,
    /// Whether the drop-time failure is still in effect; cleared by `disarm`.
    armed: bool,
}
4271
4272impl<Key, Out> GroupByWorkerGuard<Key, Out>
4273where
4274 Key: Eq + Hash,
4275{
4276 fn new(outer: Arc<LiveSubstreamShared<Source<Out>>>) -> Self {
4277 Self {
4278 outer,
4279 active: HashMap::new(),
4280 closed: HashSet::new(),
4281 armed: true,
4282 }
4283 }
4284
4285 fn disarm(&mut self) {
4286 self.armed = false;
4287 }
4288
4289 fn fail_all(&self, error: StreamError)
4290 where
4291 Out: Send + 'static,
4292 {
4293 fail_live_substream(&self.outer, error.clone());
4294 for substream in self.active.values() {
4295 fail_live_substream(substream, error.clone());
4296 }
4297 }
4298
4299 fn complete_all(&self)
4300 where
4301 Out: Send + 'static,
4302 {
4303 complete_live_substream(&self.outer);
4304 for substream in self.active.values() {
4305 complete_live_substream(substream);
4306 }
4307 }
4308
4309 fn cancel_all(&self)
4310 where
4311 Out: Send + 'static,
4312 {
4313 for substream in self.active.values() {
4314 cancel_live_substream(substream);
4315 }
4316 }
4317}
4318
4319impl<Key, Out> Drop for GroupByWorkerGuard<Key, Out> {
4320 fn drop(&mut self) {
4321 if self.armed {
4322 fail_live_substream(&self.outer, StreamError::AbruptTermination);
4323 for substream in self.active.values() {
4324 fail_live_substream(substream, StreamError::AbruptTermination);
4325 }
4326 }
4327 }
4328}
4329
4330fn group_by_flush_write_batch<Key, Out>(
4331 guard: &mut GroupByWorkerGuard<Key, Out>,
4332 wb_key: &mut Option<Key>,
4333 wb_sub: &mut Option<Arc<LiveSubstreamShared<Out>>>,
4334 wb_items: &mut VecDeque<Out>,
4335 allow_closed_substream_recreation: bool,
4336) where
4337 Key: Clone + Eq + Hash,
4338 Out: Send + 'static,
4339{
4340 if wb_items.is_empty() {
4341 return;
4342 }
4343 let key = wb_key.take().expect("wb_key set when wb_items non-empty");
4344 if let Some(ref sub) = *wb_sub {
4345 if push_live_substream_batch(sub, wb_items).is_err() {
4346 guard.active.remove(&key);
4347 if !allow_closed_substream_recreation {
4348 guard.closed.insert(key);
4349 }
4350 }
4351 } else {
4352 wb_items.clear();
4353 }
4354 *wb_sub = None;
4355}
4356
/// Splits `input` into one substream per key computed by `key_fn`.
///
/// A worker spawned on `materializer` reads `input`, routes every element to the substream
/// for its key and publishes each newly created substream as a `Source` on the returned
/// outer stream. Consecutive elements with the same key are collected in a write batch
/// (`wb_*`) and pushed together.
///
/// Failure modes visible here:
/// * an upstream error fails the outer stream and all active substreams and is returned
///   from the worker;
/// * a panic in `key_fn` fails everything with `StreamError::AbruptTermination`;
/// * needing a new substream when active plus closed keys equal `max_substreams` fails
///   everything with `StreamError::Failed`.
///
/// When a substream's consumer cancels, the key is dropped from the active set and, unless
/// `allow_closed_substream_recreation` is set, remembered as closed so later elements with
/// that key are discarded.
fn group_by_stream<Out, Key, F>(
    mut input: BoxStream<Out>,
    max_substreams: usize,
    allow_closed_substream_recreation: bool,
    key_fn: Arc<F>,
    materializer: &Materializer,
) -> BoxStream<Source<Out>>
where
    Out: Clone + Send + 'static,
    Key: Clone + Eq + Hash + Send + 'static,
    F: Fn(&Out) -> Key + Send + Sync + 'static,
{
    let outer = LiveSubstreamShared::new();
    let worker_outer = Arc::clone(&outer);
    let completion = materializer.spawn_stream(move |cancelled| {
        // Fails all streams with AbruptTermination if the worker exits without disarming.
        let mut guard = GroupByWorkerGuard::new(worker_outer);

        // Write batch: pending elements that all belong to `wb_key` / `wb_sub`.
        let mut wb_key: Option<Key> = None;
        let mut wb_sub: Option<Arc<LiveSubstreamShared<Out>>> = None;
        let mut wb_items: VecDeque<Out> = VecDeque::with_capacity(LIVE_SUBSTREAM_BATCH);

        while !cancelled.load(Ordering::SeqCst) {
            // The consumer of the outer stream is gone: stop without signalling anything.
            if guard.outer.cancelled.load(Ordering::SeqCst) {
                guard.disarm();
                return Ok(NotUsed);
            }

            match input.next() {
                Some(Ok(item)) => {
                    let key = match catch_unwind(AssertUnwindSafe(|| key_fn(&item))) {
                        Ok(key) => key,
                        Err(_panic) => {
                            wb_items.clear();
                            guard.fail_all(StreamError::AbruptTermination);
                            guard.disarm();
                            return Ok(NotUsed);
                        }
                    };

                    // The substream for this key was cancelled by its consumer: forget it,
                    // together with any write batch that targets it.
                    if let Some(current) = guard.active.get(&key)
                        && current.cancelled.load(Ordering::SeqCst)
                    {
                        if wb_key.as_ref() == Some(&key) {
                            wb_items.clear();
                            wb_key = None;
                            wb_sub = None;
                        }
                        guard.active.remove(&key);
                        if !allow_closed_substream_recreation {
                            guard.closed.insert(key.clone());
                        }
                    }

                    // Existing substream: add the element to the write batch.
                    if let Some(current) = guard.active.get(&key).cloned() {
                        if wb_key.as_ref() != Some(&key) {
                            // Key changed: flush the previous batch, then retarget.
                            group_by_flush_write_batch(
                                &mut guard,
                                &mut wb_key,
                                &mut wb_sub,
                                &mut wb_items,
                                allow_closed_substream_recreation,
                            );
                            wb_key = Some(key.clone());
                            wb_sub = Some(current);
                        }
                        wb_items.push_back(item);
                        if wb_items.len() >= LIVE_SUBSTREAM_BATCH {
                            group_by_flush_write_batch(
                                &mut guard,
                                &mut wb_key,
                                &mut wb_sub,
                                &mut wb_items,
                                allow_closed_substream_recreation,
                            );
                        }
                        continue;
                    }

                    // No active substream for this key: flush pending writes first.
                    if !wb_items.is_empty() {
                        group_by_flush_write_batch(
                            &mut guard,
                            &mut wb_key,
                            &mut wb_sub,
                            &mut wb_items,
                            allow_closed_substream_recreation,
                        );
                    }

                    // Elements for closed keys are discarded.
                    if guard.closed.contains(&key) {
                        continue;
                    }

                    if guard.active.len() + guard.closed.len() == max_substreams {
                        let error = StreamError::Failed(format!(
                            "group_by reached max_substreams ({max_substreams})"
                        ));
                        guard.fail_all(error.clone());
                        guard.disarm();
                        return Err(error);
                    }

                    // Open a new substream, seed it with the element, then publish it.
                    let substream = LiveSubstreamShared::with_capacity(LIVE_SUBSTREAM_CAPACITY);
                    push_live_substream(&substream, item)
                        .unwrap_or_else(|_| unreachable!("fresh group_by substream"));
                    guard.active.insert(key.clone(), Arc::clone(&substream));
                    if push_live_substream(
                        &guard.outer,
                        source_from_live_substream(Arc::clone(&substream)),
                    )
                    .is_err()
                    {
                        // The outer stream no longer accepts elements: cancel all substreams.
                        guard.cancel_all();
                        cancel_live_substream(&substream);
                        guard.disarm();
                        return Ok(NotUsed);
                    }
                }
                Some(Err(error)) => {
                    wb_items.clear();
                    guard.fail_all(error.clone());
                    guard.disarm();
                    return Err(error);
                }
                None => {
                    group_by_flush_write_batch(
                        &mut guard,
                        &mut wb_key,
                        &mut wb_sub,
                        &mut wb_items,
                        allow_closed_substream_recreation,
                    );
                    guard.complete_all();
                    guard.disarm();
                    return Ok(NotUsed);
                }
            }
        }

        // The `cancelled` flag passed to the worker was set: flush and complete normally.
        group_by_flush_write_batch(
            &mut guard,
            &mut wb_key,
            &mut wb_sub,
            &mut wb_items,
            allow_closed_substream_recreation,
        );
        guard.complete_all();
        guard.disarm();
        Ok(NotUsed)
    });

    Box::new(LiveSubstreamStream {
        shared: outer,
        completion: Some(completion),
        local_batch: VecDeque::new(),
    })
}
4528
/// Where a segment boundary is placed relative to the element that matched the predicate.
#[derive(Clone, Copy, Debug)]
enum SplitMode {
    /// The matching element starts a new segment.
    When,
    /// The matching element is the last element of its segment.
    After,
}
4534
/// Bookkeeping for the legacy split worker (test builds only). While `armed`, dropping the
/// guard fails the current segment and the outer stream with
/// `StreamError::AbruptTermination`.
#[cfg(test)]
struct SplitWorkerGuard<Out> {
    /// Outer stream on which segment sources are published.
    outer: Arc<LiveSubstreamShared<Source<Out>>>,
    /// Segment currently being written, if any.
    current: Option<Arc<LiveSubstreamShared<Out>>>,
    /// Whether the drop-time failure is still in effect; cleared by `disarm`.
    armed: bool,
    /// Elements accepted for the current segment but not yet pushed to it.
    pending: VecDeque<Out>,
}
4545
#[cfg(test)]
impl<Out> SplitWorkerGuard<Out> {
    /// Creates an armed guard with no open segment and an empty pending batch.
    fn new(outer: Arc<LiveSubstreamShared<Source<Out>>>) -> Self {
        Self {
            outer,
            current: None,
            armed: true,
            pending: VecDeque::with_capacity(LIVE_SUBSTREAM_BATCH),
        }
    }

    /// Turns off the drop-time `AbruptTermination` failure.
    fn disarm(&mut self) {
        self.armed = false;
    }

    /// Opens a new segment and publishes its source on the outer stream.
    ///
    /// Returns `Err(())` if the outer stream no longer accepts elements. The new segment is
    /// recorded as current even in that case.
    fn open_segment(&mut self) -> Result<(), ()>
    where
        Out: Send + 'static,
    {
        let substream = LiveSubstreamShared::new();
        self.current = Some(Arc::clone(&substream));
        push_live_substream(&self.outer, source_from_live_substream(substream)).map_err(|_| ())
    }

    /// Pushes all pending elements into the current segment. Without a current segment the
    /// pending elements are discarded.
    fn flush_pending(&mut self) -> Result<(), ()>
    where
        Out: Send + 'static,
    {
        if self.pending.is_empty() {
            return Ok(());
        }
        match self.current {
            Some(ref current) => push_live_substream_batch(current, &mut self.pending),
            None => {
                self.pending.clear();
                Ok(())
            }
        }
    }

    /// Queues `item` for the current segment, flushing once `LIVE_SUBSTREAM_BATCH` elements
    /// are pending. The item is dropped when no segment is open.
    fn push_item(&mut self, item: Out) -> Result<(), ()>
    where
        Out: Send + 'static,
    {
        if self.current.is_none() {
            return Ok(());
        }
        self.pending.push_back(item);
        if self.pending.len() >= LIVE_SUBSTREAM_BATCH {
            self.flush_pending()
        } else {
            Ok(())
        }
    }

    /// Flushes pending elements (ignoring a failed flush) and completes the current segment.
    fn close_segment(&mut self)
    where
        Out: Send + 'static,
    {
        let _ = self.flush_pending();
        if let Some(current) = self.current.take() {
            complete_live_substream(&current);
        }
    }

    /// Discards pending elements and fails the current segment with `error`.
    fn fail_current(&mut self, error: StreamError)
    where
        Out: Send + 'static,
    {
        self.pending.clear();
        if let Some(current) = self.current.take() {
            fail_live_substream(&current, error);
        }
    }

    /// Fails the current segment and then the outer stream with `error`.
    fn fail_all(&mut self, error: StreamError)
    where
        Out: Send + 'static,
    {
        self.fail_current(error.clone());
        fail_live_substream(&self.outer, error);
    }

    /// Closes the current segment and then completes the outer stream.
    fn complete_all(&mut self)
    where
        Out: Send + 'static,
    {
        self.close_segment();
        complete_live_substream(&self.outer);
    }
}
4647
#[cfg(test)]
impl<Out> Drop for SplitWorkerGuard<Out> {
    /// If the guard was never disarmed, discards pending elements and fails the current
    /// segment and the outer stream with `StreamError::AbruptTermination`.
    fn drop(&mut self) {
        if self.armed {
            self.pending.clear();
            if let Some(current) = self.current.take() {
                fail_live_substream(&current, StreamError::AbruptTermination);
            }
            fail_live_substream(&self.outer, StreamError::AbruptTermination);
        }
    }
}
4660
/// Splits `input` into consecutive segments according to `mode` and `predicate`, returning a
/// stream of segment sources.
///
/// Delegates to `split_streams_fast` with a fresh, unset parent-cancellation flag. In test
/// builds the legacy implementation is used instead when the current substream mode is
/// `SubstreamExecutorMode::LegacyOnly`.
fn split_streams<Out, F>(
    input: BoxStream<Out>,
    mode: SplitMode,
    predicate: Arc<F>,
    materializer: &Materializer,
) -> BoxStream<Source<Out>>
where
    Out: Clone + Send + 'static,
    F: Fn(&Out) -> bool + Send + Sync + 'static,
{
    #[cfg(test)]
    if current_substream_mode() == SubstreamExecutorMode::LegacyOnly {
        return split_streams_legacy(input, mode, predicate, materializer);
    }
    let parent_cancelled = Arc::new(AtomicBool::new(false));
    split_streams_fast(input, mode, predicate, parent_cancelled, materializer)
}
4678
/// Legacy split implementation (test builds only).
///
/// A worker spawned on `materializer` reads `input`, evaluates `predicate` for every element
/// and writes elements into the current segment, opening and closing segments according to
/// `mode`. Each opened segment is published as a `Source` on the returned outer stream.
///
/// An upstream error fails the current segment and the outer stream and is returned from
/// the worker. A panic in `predicate` fails both with `StreamError::AbruptTermination`.
#[cfg(test)]
fn split_streams_legacy<Out, F>(
    mut input: BoxStream<Out>,
    mode: SplitMode,
    predicate: Arc<F>,
    materializer: &Materializer,
) -> BoxStream<Source<Out>>
where
    Out: Clone + Send + 'static,
    F: Fn(&Out) -> bool + Send + Sync + 'static,
{
    let outer = LiveSubstreamShared::new();
    let worker_outer = Arc::clone(&outer);
    let completion = materializer.spawn_stream(move |cancelled| {
        // Fails all streams with AbruptTermination if the worker exits without disarming.
        let mut guard = SplitWorkerGuard::new(Arc::clone(&worker_outer));

        while !cancelled.load(Ordering::SeqCst) {
            // The consumer of the outer stream is gone: stop without signalling anything.
            if worker_outer.cancelled.load(Ordering::SeqCst) {
                guard.disarm();
                return Ok(NotUsed);
            }

            match input.next() {
                Some(Ok(item)) => {
                    let split = match catch_unwind(AssertUnwindSafe(|| predicate(&item))) {
                        Ok(split) => split,
                        Err(_panic) => {
                            guard.fail_all(StreamError::AbruptTermination);
                            guard.disarm();
                            return Ok(NotUsed);
                        }
                    };

                    match mode {
                        // Boundary before the matching element: close first, then write the
                        // element into a (possibly new) segment.
                        SplitMode::When => {
                            if split && guard.current.is_some() {
                                guard.close_segment();
                            }
                            if guard.current.is_none() && guard.open_segment().is_err() {
                                guard.disarm();
                                return Ok(NotUsed);
                            }
                            if guard.push_item(item).is_err() {
                                guard.disarm();
                                return Ok(NotUsed);
                            }
                        }
                        // Boundary after the matching element: write the element, then
                        // close the segment.
                        SplitMode::After => {
                            if guard.current.is_none() && guard.open_segment().is_err() {
                                guard.disarm();
                                return Ok(NotUsed);
                            }
                            if guard.push_item(item).is_err() {
                                guard.disarm();
                                return Ok(NotUsed);
                            }
                            if split {
                                guard.close_segment();
                            }
                        }
                    }
                }
                Some(Err(error)) => {
                    guard.fail_all(error.clone());
                    guard.disarm();
                    return Err(error);
                }
                None => {
                    guard.complete_all();
                    guard.disarm();
                    return Ok(NotUsed);
                }
            }
        }

        // The `cancelled` flag passed to the worker was set: complete normally.
        guard.complete_all();
        guard.disarm();
        Ok(NotUsed)
    });

    Box::new(LiveSubstreamStream {
        shared: outer,
        completion: Some(completion),
        local_batch: VecDeque::new(),
    })
}
4767
/// Push-style consumer of one stream of `T` values with explicit terminal callbacks.
// NOTE(review): presumably driven by the fast split path, one consumer per segment —
// confirm at the call sites.
trait SplitConsumer<T: Send + 'static>: Send + 'static {
    /// Feeds one element to the consumer; an error is reported back to the caller.
    fn push_item(&mut self, item: T) -> StreamResult<()>;
    /// Signals normal completion and consumes the consumer.
    fn complete(self: Box<Self>);
    /// Signals failure with `error` and consumes the consumer.
    fn fail(self: Box<Self>, error: StreamError);
}
4777
/// `SplitConsumer` that folds a segment's items with an infallible step function.
struct FoldConsumer<T, Acc> {
    /// Running accumulator; `None` only while a step runs or after the consumer finished.
    acc: Option<Acc>,
    /// Fold step applied to the accumulator and each pushed item.
    f: Arc<dyn Fn(Acc, T) -> Acc + Send + Sync + 'static>,
    /// One-shot channel that carries the final accumulator or the failure.
    tx: futures::channel::oneshot::Sender<StreamResult<Acc>>,
}
4783
4784impl<T: Send + 'static, Acc: Send + 'static> SplitConsumer<T> for FoldConsumer<T, Acc> {
4785 fn push_item(&mut self, item: T) -> StreamResult<()> {
4786 let acc = self.acc.take().expect("FoldConsumer: push after done");
4787 self.acc = Some((self.f)(acc, item));
4788 Ok(())
4789 }
4790 fn complete(mut self: Box<Self>) {
4791 let acc = self
4792 .acc
4793 .take()
4794 .expect("FoldConsumer: complete called twice");
4795 let _ = self.tx.send(Ok(acc));
4796 }
4797 fn fail(mut self: Box<Self>, error: StreamError) {
4798 self.acc = None;
4799 let _ = self.tx.send(Err(error));
4800 }
4801}
4802
/// `SplitConsumer` that folds a segment's items with a fallible step function.
struct FoldResultConsumer<T, Acc> {
    /// Running accumulator; `None` while a step runs, after a failed step, or once finished.
    acc: Option<Acc>,
    /// Fold step; an `Err` is returned to the caller of `push_item`.
    f: Arc<dyn Fn(Acc, T) -> StreamResult<Acc> + Send + Sync + 'static>,
    /// One-shot channel that carries the final accumulator or the failure.
    tx: futures::channel::oneshot::Sender<StreamResult<Acc>>,
}
4808
4809impl<T: Send + 'static, Acc: Send + 'static> SplitConsumer<T> for FoldResultConsumer<T, Acc> {
4810 fn push_item(&mut self, item: T) -> StreamResult<()> {
4811 let acc = self
4812 .acc
4813 .take()
4814 .expect("FoldResultConsumer: push after done");
4815 match (self.f)(acc, item) {
4816 Ok(new_acc) => {
4817 self.acc = Some(new_acc);
4818 Ok(())
4819 }
4820 Err(e) => Err(e),
4821 }
4822 }
4823 fn complete(mut self: Box<Self>) {
4824 let acc = self
4825 .acc
4826 .take()
4827 .expect("FoldResultConsumer: complete called twice");
4828 let _ = self.tx.send(Ok(acc));
4829 }
4830 fn fail(mut self: Box<Self>, error: StreamError) {
4831 self.acc = None;
4832 let _ = self.tx.send(Err(error));
4833 }
4834}
4835
/// `SplitConsumer` that gathers a segment's items into a `Vec`.
struct CollectConsumer<T> {
    /// Items received so far, in arrival order.
    items: Vec<T>,
    /// One-shot channel that carries the collected items or the failure.
    tx: futures::channel::oneshot::Sender<StreamResult<Vec<T>>>,
}
4840
4841impl<T: Send + 'static> SplitConsumer<T> for CollectConsumer<T> {
4842 fn push_item(&mut self, item: T) -> StreamResult<()> {
4843 self.items.push(item);
4844 Ok(())
4845 }
4846 fn complete(self: Box<Self>) {
4847 let _ = self.tx.send(Ok(self.items));
4848 }
4849 fn fail(self: Box<Self>, error: StreamError) {
4850 let _ = self.tx.send(Err(error));
4851 }
4852}
4853
/// `SplitConsumer` that drops every item and only reports how the segment ended.
struct IgnoreConsumer<T> {
    /// One-shot channel that carries `NotUsed` on completion or the failure.
    tx: futures::channel::oneshot::Sender<StreamResult<NotUsed>>,
    /// Ties the consumer to the item type `T` without storing a value of it.
    _phantom: std::marker::PhantomData<fn(T)>,
}
4858
4859impl<T: Send + 'static> SplitConsumer<T> for IgnoreConsumer<T> {
4860 fn push_item(&mut self, _item: T) -> StreamResult<()> {
4861 Ok(())
4862 }
4863 fn complete(self: Box<Self>) {
4864 let _ = self.tx.send(Ok(NotUsed));
4865 }
4866 fn fail(self: Box<Self>, error: StreamError) {
4867 let _ = self.tx.send(Err(error));
4868 }
4869}
4870
/// Description of an infallible fold that can register itself directly on a
/// split segment through `FoldFastPathDyn::try_register`.
pub(super) struct FoldDescriptor<T, Acc> {
    /// Initial accumulator; cloned for every registration.
    pub(super) zero: Acc,
    /// Fold step applied to the accumulator and each segment item.
    pub(super) f: Arc<dyn Fn(Acc, T) -> Acc + Send + Sync + 'static>,
}
4877
4878impl<T: Send + 'static, Acc: Clone + Send + Sync + 'static> FoldFastPathDyn<T>
4879 for FoldDescriptor<T, Acc>
4880{
4881 fn try_register(
4882 &self,
4883 hook: Arc<dyn SplitSegmentHookDyn>,
4884 ) -> Option<StreamResult<Box<dyn Any + Send>>> {
4885 let hook_any = hook.as_any_arc();
4886 let slot = hook_any.downcast::<SegmentConsumerSlot<T>>().ok()?;
4887 if slot.claimed.swap(true, Ordering::SeqCst) {
4888 return Some(Err(StreamError::Failed(
4889 "substream source cannot be materialized more than once".into(),
4890 )));
4891 }
4892 let (tx, rx) = futures::channel::oneshot::channel::<StreamResult<Acc>>();
4893 {
4894 let mut state = slot.state.lock().unwrap_or_else(|p| p.into_inner());
4895
4896 if let Some(terminal) = state.terminal.take() {
4898 let buffer = std::mem::take(&mut state.buffer);
4899 state.consumer = SegmentConsumer::DirectTaken;
4900 drop(state);
4901 let mut acc = self.zero.clone();
4903 for item in buffer {
4904 acc = (self.f)(acc, item);
4905 }
4906 let result = match terminal {
4907 LiveSubstreamTerminal::Complete => Ok(acc),
4908 LiveSubstreamTerminal::Error(e) => Err(e),
4909 };
4910 let _ = tx.send(result);
4911 let completion = StreamCompletion::from_receiver(rx, None);
4912 return Some(Ok(Box::new(completion)));
4913 }
4914
4915 let consumer: Box<dyn SplitConsumer<T>> = Box::new(FoldConsumer {
4917 acc: Some(self.zero.clone()),
4918 f: Arc::clone(&self.f),
4919 tx,
4920 });
4921 state.consumer = SegmentConsumer::Direct(consumer);
4922 }
4923 slot.available.notify_all();
4924 let completion = StreamCompletion::from_receiver(rx, None);
4925 Some(Ok(Box::new(completion)))
4926 }
4927}
4928
/// Description of a fallible fold that can register itself directly on a
/// split segment through `FoldFastPathDyn::try_register`.
pub(super) struct FoldResultDescriptor<T, Acc> {
    /// Initial accumulator; cloned for every registration.
    pub(super) zero: Acc,
    /// Fold step; returning `Err` aborts the fold with that error.
    pub(super) f: Arc<dyn Fn(Acc, T) -> StreamResult<Acc> + Send + Sync + 'static>,
}
4933
4934impl<T: Send + 'static, Acc: Clone + Send + Sync + 'static> FoldFastPathDyn<T>
4935 for FoldResultDescriptor<T, Acc>
4936{
4937 fn try_register(
4938 &self,
4939 hook: Arc<dyn SplitSegmentHookDyn>,
4940 ) -> Option<StreamResult<Box<dyn Any + Send>>> {
4941 let hook_any = hook.as_any_arc();
4942 let slot = hook_any.downcast::<SegmentConsumerSlot<T>>().ok()?;
4943 if slot.claimed.swap(true, Ordering::SeqCst) {
4944 return Some(Err(StreamError::Failed(
4945 "substream source cannot be materialized more than once".into(),
4946 )));
4947 }
4948 let (tx, rx) = futures::channel::oneshot::channel::<StreamResult<Acc>>();
4949 {
4950 let mut state = slot.state.lock().unwrap_or_else(|p| p.into_inner());
4951
4952 if let Some(terminal) = state.terminal.take() {
4954 let buffer = std::mem::take(&mut state.buffer);
4955 state.consumer = SegmentConsumer::DirectTaken;
4956 drop(state);
4957 let acc = self.zero.clone();
4958 let result = match terminal {
4959 LiveSubstreamTerminal::Complete => buffer
4960 .into_iter()
4961 .try_fold(acc, |a, item| (self.f)(a, item)),
4962 LiveSubstreamTerminal::Error(e) => Err(e),
4963 };
4964 let _ = tx.send(result);
4965 let completion = StreamCompletion::from_receiver(rx, None);
4966 return Some(Ok(Box::new(completion)));
4967 }
4968
4969 let consumer: Box<dyn SplitConsumer<T>> = Box::new(FoldResultConsumer {
4971 acc: Some(self.zero.clone()),
4972 f: Arc::clone(&self.f),
4973 tx,
4974 });
4975 state.consumer = SegmentConsumer::Direct(consumer);
4976 }
4977 slot.available.notify_all();
4978 let completion = StreamCompletion::from_receiver(rx, None);
4979 Some(Ok(Box::new(completion)))
4980 }
4981}
4982
/// Description of a collect-into-`Vec` sink that can register itself directly
/// on a split segment through `FoldFastPathDyn::try_register`.
pub(super) struct CollectDescriptor<T> {
    /// Ties the descriptor to the item type `T` without storing a value of it.
    pub(super) _phantom: std::marker::PhantomData<fn(T)>,
}
4986
4987impl<T: Send + 'static> FoldFastPathDyn<T> for CollectDescriptor<T> {
4988 fn try_register(
4989 &self,
4990 hook: Arc<dyn SplitSegmentHookDyn>,
4991 ) -> Option<StreamResult<Box<dyn Any + Send>>> {
4992 let hook_any = hook.as_any_arc();
4993 let slot = hook_any.downcast::<SegmentConsumerSlot<T>>().ok()?;
4994 if slot.claimed.swap(true, Ordering::SeqCst) {
4995 return Some(Err(StreamError::Failed(
4996 "substream source cannot be materialized more than once".into(),
4997 )));
4998 }
4999 let (tx, rx) = futures::channel::oneshot::channel::<StreamResult<Vec<T>>>();
5000 {
5001 let mut state = slot.state.lock().unwrap_or_else(|p| p.into_inner());
5002
5003 if let Some(terminal) = state.terminal.take() {
5005 let buffer = std::mem::take(&mut state.buffer);
5006 state.consumer = SegmentConsumer::DirectTaken;
5007 drop(state);
5008 let items: Vec<T> = buffer.into_iter().collect();
5009 let result = match terminal {
5010 LiveSubstreamTerminal::Complete => Ok(items),
5011 LiveSubstreamTerminal::Error(e) => Err(e),
5012 };
5013 let _ = tx.send(result);
5014 let completion = StreamCompletion::from_receiver(rx, None);
5015 return Some(Ok(Box::new(completion)));
5016 }
5017
5018 let consumer: Box<dyn SplitConsumer<T>> = Box::new(CollectConsumer {
5020 items: Vec::new(),
5021 tx,
5022 });
5023 state.consumer = SegmentConsumer::Direct(consumer);
5024 }
5025 slot.available.notify_all();
5026 let completion = StreamCompletion::from_receiver(rx, None);
5027 Some(Ok(Box::new(completion)))
5028 }
5029}
5030
/// Description of an item-discarding sink that can register itself directly
/// on a split segment through `FoldFastPathDyn::try_register`.
pub(super) struct IgnoreDescriptor<T> {
    /// Ties the descriptor to the item type `T` without storing a value of it.
    pub(super) _phantom: std::marker::PhantomData<fn(T)>,
}
5034
5035impl<T: Send + 'static> FoldFastPathDyn<T> for IgnoreDescriptor<T> {
5036 fn try_register(
5037 &self,
5038 hook: Arc<dyn SplitSegmentHookDyn>,
5039 ) -> Option<StreamResult<Box<dyn Any + Send>>> {
5040 let hook_any = hook.as_any_arc();
5041 let slot = hook_any.downcast::<SegmentConsumerSlot<T>>().ok()?;
5042 if slot.claimed.swap(true, Ordering::SeqCst) {
5043 return Some(Err(StreamError::Failed(
5044 "substream source cannot be materialized more than once".into(),
5045 )));
5046 }
5047 let (tx, rx) = futures::channel::oneshot::channel::<StreamResult<NotUsed>>();
5048 {
5049 let mut state = slot.state.lock().unwrap_or_else(|p| p.into_inner());
5050
5051 if let Some(terminal) = state.terminal.take() {
5053 state.consumer = SegmentConsumer::DirectTaken;
5054 drop(state);
5055 let result = match terminal {
5056 LiveSubstreamTerminal::Complete => Ok(NotUsed),
5057 LiveSubstreamTerminal::Error(e) => Err(e),
5058 };
5059 let _ = tx.send(result);
5060 let completion = StreamCompletion::from_receiver(rx, None);
5061 return Some(Ok(Box::new(completion)));
5062 }
5063
5064 let consumer: Box<dyn SplitConsumer<T>> = Box::new(IgnoreConsumer {
5066 tx,
5067 _phantom: std::marker::PhantomData,
5068 });
5069 state.consumer = SegmentConsumer::Direct(consumer);
5070 }
5071 slot.available.notify_all();
5072 let completion = StreamCompletion::from_receiver(rx, None);
5073 Some(Ok(Box::new(completion)))
5074 }
5075}
5076
/// Who, if anyone, is consuming a segment slot.
enum SegmentConsumer<T: Send + 'static> {
    /// Initial state set by `SegmentConsumerSlot::new`: nothing has attached yet.
    Pending,
    /// A consumer was registered through `try_register` and waits in the slot
    /// for the worker to pick it up.
    Direct(Box<dyn SplitConsumer<T>>),
    /// The slot no longer holds a consumer: it was taken out, or the segment
    /// was settled or closed without one.
    DirectTaken,
    /// The segment was materialized through `SourceFactory::create`; items are
    /// read from the slot buffer by a `FallbackSegmentStream`.
    Fallback,
}
5085
/// Mutex-protected part of a `SegmentConsumerSlot`.
struct SegmentSlotState<T: Send + 'static> {
    /// Items queued for the segment that no consumer has received yet.
    buffer: VecDeque<T>,
    /// Current consumer attachment for the segment.
    consumer: SegmentConsumer<T>,
    /// How the segment ended, once it has ended.
    terminal: Option<LiveSubstreamTerminal>,
}
5091
/// Rendezvous point between the split worker and whatever consumes one segment.
pub(super) struct SegmentConsumerSlot<T: Send + 'static> {
    /// Set by the first materialization attempt; later attempts are rejected.
    claimed: Arc<AtomicBool>,
    /// Buffer, consumer attachment and terminal state of the segment.
    state: Mutex<SegmentSlotState<T>>,
    /// Notified when `state` changes in a way a waiter may care about.
    available: Condvar,
    /// Cancellation flag handed in at construction; read by the fallback stream.
    parent_cancelled: Arc<AtomicBool>,
}
5098
5099impl<T: Clone + Send + 'static> SegmentConsumerSlot<T> {
5100 fn new(parent_cancelled: Arc<AtomicBool>) -> Arc<Self> {
5101 Arc::new(Self {
5102 claimed: Arc::new(AtomicBool::new(false)),
5103 state: Mutex::new(SegmentSlotState {
5104 buffer: VecDeque::new(),
5105 consumer: SegmentConsumer::Pending,
5106 terminal: None,
5107 }),
5108 available: Condvar::new(),
5109 parent_cancelled,
5110 })
5111 }
5112}
5113
5114impl<T: Clone + Send + 'static> SplitSegmentHookDyn for SegmentConsumerSlot<T> {
5115 fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
5116 self
5117 }
5118}
5119
5120impl<T: Clone + Send + 'static> SourceFactory<T, NotUsed> for SegmentConsumerSlot<T> {
5121 fn create(
5122 self: Arc<Self>,
5123 _materializer: &Materializer,
5124 ) -> StreamResult<(BoxStream<T>, NotUsed)> {
5125 if self.claimed.swap(true, Ordering::SeqCst) {
5126 return Err(StreamError::Failed(
5127 "substream source cannot be materialized more than once".into(),
5128 ));
5129 }
5130 {
5131 let mut state = self.state.lock().unwrap_or_else(|p| p.into_inner());
5132 state.consumer = SegmentConsumer::Fallback;
5133 }
5134 self.available.notify_all();
5135 let stream = FallbackSegmentStream {
5136 slot: Arc::clone(&self),
5137 };
5138 Ok((Box::new(stream), NotUsed))
5139 }
5140}
5141
/// Blocking iterator that reads one segment's items out of its slot buffer.
struct FallbackSegmentStream<T: Send + 'static> {
    /// Slot shared with the split worker that fills the buffer.
    slot: Arc<SegmentConsumerSlot<T>>,
}
5145
5146impl<T: Clone + Send + 'static> Iterator for FallbackSegmentStream<T> {
5147 type Item = StreamResult<T>;
5148
5149 fn next(&mut self) -> Option<Self::Item> {
5150 let mut state = self.slot.state.lock().unwrap_or_else(|p| p.into_inner());
5151 loop {
5152 if let Some(item) = state.buffer.pop_front() {
5153 drop(state);
5154 self.slot.available.notify_all();
5155 return Some(Ok(item));
5156 }
5157 if let Some(terminal) = &state.terminal {
5158 return match terminal {
5159 LiveSubstreamTerminal::Complete => None,
5160 LiveSubstreamTerminal::Error(e) => Some(Err(e.clone())),
5161 };
5162 }
5163 if self.slot.parent_cancelled.load(Ordering::SeqCst) {
5164 return Some(Err(StreamError::AbruptTermination));
5165 }
5166 state = self
5167 .slot
5168 .available
5169 .wait(state)
5170 .unwrap_or_else(|p| p.into_inner());
5171 }
5172 }
5173}
5174
5175impl<T: Send + 'static> Drop for FallbackSegmentStream<T> {
5176 fn drop(&mut self) {
5177 let mut state = self.slot.state.lock().unwrap_or_else(|p| p.into_inner());
5178 if state.terminal.is_none() {
5179 state.terminal = Some(LiveSubstreamTerminal::Error(StreamError::Cancelled));
5180 }
5181 drop(state);
5182 self.slot.available.notify_all();
5183 }
5184}
5185
/// Worker-side state of the fast split path: it tracks the open segment and,
/// while still armed when dropped, fails that segment and the outer stream.
struct SplitFastWorkerGuard<Out: Send + 'static> {
    /// Outer stream of segment sources that this worker feeds.
    outer: Arc<LiveSubstreamShared<Source<Out>>>,
    /// Slot of the segment that is currently open, if any.
    current_slot: Option<Arc<SegmentConsumerSlot<Out>>>,
    /// Consumer taken out of the current slot; items are pushed to it directly.
    current_consumer: Option<Box<dyn SplitConsumer<Out>>>,
    /// While `true`, dropping the guard signals `AbruptTermination`.
    armed: bool,
    /// Cancellation flag shared with every slot this guard opens.
    parent_cancelled: Arc<AtomicBool>,
    /// Items accepted for the current segment but not yet flushed to it.
    local_pending: VecDeque<Out>,
}
5196
5197impl<Out: Clone + Send + 'static> SplitFastWorkerGuard<Out> {
5198 fn new(
5199 outer: Arc<LiveSubstreamShared<Source<Out>>>,
5200 parent_cancelled: Arc<AtomicBool>,
5201 ) -> Self {
5202 Self {
5203 outer,
5204 current_slot: None,
5205 current_consumer: None,
5206 armed: true,
5207 parent_cancelled,
5208 local_pending: VecDeque::with_capacity(LIVE_SUBSTREAM_BATCH),
5209 }
5210 }
5211
5212 fn disarm(&mut self) {
5213 self.armed = false;
5214 }
5215
5216 fn open_segment(&mut self) -> Result<(), ()> {
5217 let slot = SegmentConsumerSlot::new(Arc::clone(&self.parent_cancelled));
5218 let factory: Arc<dyn SourceFactory<Out, NotUsed>> =
5219 Arc::clone(&slot) as Arc<dyn SourceFactory<Out, NotUsed>>;
5220 let hook: Arc<dyn SplitSegmentHookDyn> = Arc::clone(&slot) as Arc<dyn SplitSegmentHookDyn>;
5221 let source = Source {
5222 factory,
5223 hints: SourceHints::default(),
5224 attributes: Attributes::default(),
5225 split_hook: Some(hook),
5226 };
5227 self.current_slot = Some(slot);
5228 push_live_substream(&self.outer, source).map_err(|_| ())
5229 }
5230
5231 fn push_item(&mut self, item: Out) -> Result<(), ()> {
5232 if let Some(ref mut consumer) = self.current_consumer {
5234 if let Err(e) = consumer.push_item(item) {
5235 let c = self.current_consumer.take().unwrap();
5236 c.fail(e);
5237 return Err(());
5238 }
5239 return Ok(());
5240 }
5241
5242 self.local_pending.push_back(item);
5244 if self.local_pending.len() >= LIVE_SUBSTREAM_BATCH {
5245 self.flush_pending()
5246 } else {
5247 Ok(())
5248 }
5249 }
5250
5251 fn flush_pending(&mut self) -> Result<(), ()> {
5254 if self.local_pending.is_empty() {
5255 return Ok(());
5256 }
5257
5258 if let Some(ref mut consumer) = self.current_consumer {
5260 for item in self.local_pending.drain(..) {
5261 if let Err(e) = consumer.push_item(item) {
5262 let c = self.current_consumer.take().unwrap();
5263 c.fail(e);
5264 return Err(());
5265 }
5266 }
5267 return Ok(());
5268 }
5269
5270 let slot = match &self.current_slot {
5271 Some(s) => Arc::clone(s),
5272 None => {
5273 self.local_pending.clear();
5274 return Ok(());
5275 }
5276 };
5277
5278 loop {
5279 if self.parent_cancelled.load(Ordering::SeqCst) {
5280 self.local_pending.clear();
5281 return Err(());
5282 }
5283
5284 let mut state = slot.state.lock().unwrap_or_else(|p| p.into_inner());
5285
5286 if matches!(state.consumer, SegmentConsumer::Direct(_)) {
5287 let consumer =
5288 match std::mem::replace(&mut state.consumer, SegmentConsumer::DirectTaken) {
5289 SegmentConsumer::Direct(c) => c,
5290 _ => unreachable!(),
5291 };
5292 let mut drain_buf = std::mem::take(&mut state.buffer);
5293 drop(state);
5294 let mut consumer_box = consumer;
5295 for item in drain_buf.drain(..) {
5296 if let Err(e) = consumer_box.push_item(item) {
5297 consumer_box.fail(e);
5298 self.local_pending.clear();
5299 return Err(());
5300 }
5301 }
5302 for item in self.local_pending.drain(..) {
5303 if let Err(e) = consumer_box.push_item(item) {
5304 consumer_box.fail(e);
5305 return Err(());
5306 }
5307 }
5308 self.current_consumer = Some(consumer_box);
5309 return Ok(());
5310 }
5311
5312 if matches!(
5313 state.consumer,
5314 SegmentConsumer::Pending | SegmentConsumer::Fallback
5315 ) {
5316 let is_fallback = matches!(state.consumer, SegmentConsumer::Fallback);
5317 let cap = LIVE_SUBSTREAM_CAPACITY.saturating_sub(state.buffer.len());
5318 if cap > 0 {
5319 let to_flush = cap.min(self.local_pending.len());
5320 let items: Vec<Out> = self.local_pending.drain(..to_flush).collect();
5321 state.buffer.extend(items);
5322 let has_more = !self.local_pending.is_empty();
5323 drop(state);
5324 if is_fallback {
5325 slot.available.notify_all();
5326 }
5327 if !has_more {
5328 return Ok(());
5329 }
5330 let mut state2 = slot.state.lock().unwrap_or_else(|p| p.into_inner());
5332 state2 = slot
5333 .available
5334 .wait(state2)
5335 .unwrap_or_else(|p| p.into_inner());
5336 drop(state2);
5337 continue;
5338 } else {
5339 state = slot
5341 .available
5342 .wait(state)
5343 .unwrap_or_else(|p| p.into_inner());
5344 continue;
5345 }
5346 }
5347
5348 return Ok(());
5350 }
5351 }
5352
5353 fn close_segment(&mut self) {
5354 let _ = self.flush_pending();
5356
5357 if let Some(consumer) = self.current_consumer.take() {
5359 consumer.complete();
5360 self.current_slot = None;
5361 return;
5362 }
5363
5364 let slot = match self.current_slot.take() {
5365 Some(s) => s,
5366 None => return,
5367 };
5368
5369 if self.parent_cancelled.load(Ordering::SeqCst) {
5370 let mut state = slot.state.lock().unwrap_or_else(|p| p.into_inner());
5371 if state.terminal.is_none() {
5372 state.terminal = Some(LiveSubstreamTerminal::Error(StreamError::AbruptTermination));
5373 }
5374 drop(state);
5375 slot.available.notify_all();
5376 return;
5377 }
5378
5379 let mut state = slot.state.lock().unwrap_or_else(|p| p.into_inner());
5380 match std::mem::replace(&mut state.consumer, SegmentConsumer::DirectTaken) {
5381 SegmentConsumer::Direct(mut consumer_box) => {
5382 let mut drain_buf = std::mem::take(&mut state.buffer);
5383 drop(state);
5384 for item in drain_buf.drain(..) {
5385 if let Err(e) = consumer_box.push_item(item) {
5386 consumer_box.fail(e);
5387 return;
5388 }
5389 }
5390 consumer_box.complete();
5391 }
5392 SegmentConsumer::DirectTaken => {
5393 }
5395 SegmentConsumer::Fallback => {
5396 state.consumer = SegmentConsumer::DirectTaken;
5397 if state.terminal.is_none() {
5398 state.terminal = Some(LiveSubstreamTerminal::Complete);
5399 }
5400 drop(state);
5401 slot.available.notify_all();
5402 }
5403 SegmentConsumer::Pending => {
5404 state.consumer = SegmentConsumer::Pending;
5406 if state.terminal.is_none() {
5407 state.terminal = Some(LiveSubstreamTerminal::Complete);
5408 }
5409 drop(state);
5410 slot.available.notify_all();
5412 }
5413 }
5414 }
5415
5416 fn fail_segment(&mut self, error: StreamError) {
5417 self.local_pending.clear();
5419
5420 if let Some(consumer) = self.current_consumer.take() {
5421 consumer.fail(error);
5422 self.current_slot = None;
5423 return;
5424 }
5425 let slot = match self.current_slot.take() {
5426 Some(s) => s,
5427 None => return,
5428 };
5429 let mut state = slot.state.lock().unwrap_or_else(|p| p.into_inner());
5430 match std::mem::replace(&mut state.consumer, SegmentConsumer::DirectTaken) {
5431 SegmentConsumer::Direct(c) => {
5432 drop(state);
5433 c.fail(error);
5434 }
5435 _ => {
5436 if state.terminal.is_none() {
5437 state.terminal = Some(LiveSubstreamTerminal::Error(error));
5438 }
5439 drop(state);
5440 slot.available.notify_all();
5441 }
5442 }
5443 }
5444
5445 fn fail_all(&mut self, error: StreamError) {
5446 self.fail_segment(error.clone());
5447 fail_live_substream(&self.outer, error);
5448 }
5449
5450 fn complete_all(&mut self) {
5451 self.close_segment();
5452 complete_live_substream(&self.outer);
5453 }
5454}
5455
5456impl<Out: Send + 'static> Drop for SplitFastWorkerGuard<Out> {
5457 fn drop(&mut self) {
5458 if self.armed {
5459 self.local_pending.clear(); if let Some(consumer) = self.current_consumer.take() {
5461 consumer.fail(StreamError::AbruptTermination);
5462 } else if let Some(slot) = self.current_slot.take() {
5463 let mut state = slot.state.lock().unwrap_or_else(|p| p.into_inner());
5464 match std::mem::replace(&mut state.consumer, SegmentConsumer::DirectTaken) {
5465 SegmentConsumer::Direct(c) => {
5466 drop(state);
5467 c.fail(StreamError::AbruptTermination);
5468 }
5469 _ => {
5470 if state.terminal.is_none() {
5471 state.terminal =
5472 Some(LiveSubstreamTerminal::Error(StreamError::AbruptTermination));
5473 }
5474 drop(state);
5475 slot.available.notify_all();
5476 }
5477 }
5478 }
5479 fail_live_substream(&self.outer, StreamError::AbruptTermination);
5480 }
5481 }
5482}
5483
5484fn split_streams_fast<Out, F>(
5485 mut input: BoxStream<Out>,
5486 mode: SplitMode,
5487 predicate: Arc<F>,
5488 parent_cancelled: Arc<AtomicBool>,
5489 materializer: &Materializer,
5490) -> BoxStream<Source<Out>>
5491where
5492 Out: Clone + Send + 'static,
5493 F: Fn(&Out) -> bool + Send + Sync + 'static,
5494{
5495 let outer = LiveSubstreamShared::new();
5496 let worker_outer = Arc::clone(&outer);
5497 let completion = materializer.spawn_stream(move |cancelled| {
5498 let mut guard =
5499 SplitFastWorkerGuard::new(Arc::clone(&worker_outer), Arc::clone(&parent_cancelled));
5500
5501 while !cancelled.load(Ordering::SeqCst) {
5502 if worker_outer.cancelled.load(Ordering::SeqCst) {
5503 guard.disarm();
5504 return Ok(NotUsed);
5505 }
5506
5507 match input.next() {
5508 Some(Ok(item)) => {
5509 let split = match catch_unwind(AssertUnwindSafe(|| predicate(&item))) {
5510 Ok(s) => s,
5511 Err(_) => {
5512 guard.fail_all(StreamError::AbruptTermination);
5513 guard.disarm();
5514 return Ok(NotUsed);
5515 }
5516 };
5517
5518 match mode {
5519 SplitMode::When => {
5520 if split
5521 && (guard.current_slot.is_some()
5522 || guard.current_consumer.is_some())
5523 {
5524 guard.close_segment();
5525 }
5526 if guard.current_slot.is_none()
5527 && guard.current_consumer.is_none()
5528 && guard.open_segment().is_err()
5529 {
5530 guard.disarm();
5531 return Ok(NotUsed);
5532 }
5533 if guard.push_item(item).is_err() {
5534 guard.disarm();
5535 return Ok(NotUsed);
5536 }
5537 }
5538 SplitMode::After => {
5539 if guard.current_slot.is_none()
5540 && guard.current_consumer.is_none()
5541 && guard.open_segment().is_err()
5542 {
5543 guard.disarm();
5544 return Ok(NotUsed);
5545 }
5546 if guard.push_item(item).is_err() {
5547 guard.disarm();
5548 return Ok(NotUsed);
5549 }
5550 if split {
5551 guard.close_segment();
5552 }
5553 }
5554 }
5555 }
5556 Some(Err(error)) => {
5557 guard.fail_all(error.clone());
5558 guard.disarm();
5559 return Err(error);
5560 }
5561 None => {
5562 guard.complete_all();
5563 guard.disarm();
5564 return Ok(NotUsed);
5565 }
5566 }
5567 }
5568 guard.complete_all();
5569 guard.disarm();
5570 Ok(NotUsed)
5571 });
5572
5573 Box::new(LiveSubstreamStream {
5574 shared: outer,
5575 completion: Some(completion),
5576 local_batch: VecDeque::new(),
5577 })
5578}
5579
#[cfg(test)]
/// State shared by the coordinator, the substream workers and the output
/// stream of the test-only flat-map-merge implementation.
struct FlatMapMergeShared<T> {
    /// Queue, per-stream counters and terminal state, guarded together.
    state: Mutex<FlatMapMergeState<T>>,
    /// Notified whenever `state` changes or cancellation is requested.
    available: Condvar,
    /// Set when the output stream is dropped or a guard is dropped while armed.
    cancelled: Arc<AtomicBool>,
}
5587
#[cfg(test)]
/// Mutex-protected part of `FlatMapMergeShared`.
struct FlatMapMergeState<T> {
    /// Items waiting for the output stream, tagged with their substream id.
    queued: VecDeque<(usize, T)>,
    /// Number of queued, not yet consumed items per substream id.
    window: HashMap<usize, usize>,
    /// Number of substreams that were registered and have not finished.
    active_streams: usize,
    /// Set once the coordinator has finished reading the input.
    input_done: bool,
    /// First error recorded for the merge, if any.
    terminal: Option<StreamError>,
}
5596
#[cfg(test)]
impl<T> FlatMapMergeShared<T> {
    /// Creates the shared state with an empty queue, no active substreams,
    /// no terminal error and the cancellation flag cleared.
    fn new() -> Arc<Self> {
        let initial = FlatMapMergeState {
            queued: VecDeque::new(),
            window: HashMap::new(),
            active_streams: 0,
            input_done: false,
            terminal: None,
        };
        let shared = Self {
            state: Mutex::new(initial),
            available: Condvar::new(),
            cancelled: Arc::new(AtomicBool::new(false)),
        };
        Arc::new(shared)
    }
}
5613
#[cfg(test)]
/// Output side of the test-only flat-map-merge: a blocking iterator over the
/// merged items.
struct FlatMapMergeStream<T> {
    /// State shared with the coordinator and the substream workers.
    shared: Arc<FlatMapMergeShared<T>>,
    /// Completion handle of the coordinator; released when the stream is dropped.
    completion: Option<StreamCompletion<NotUsed>>,
}
5619
#[cfg(test)]
impl<T> Iterator for FlatMapMergeStream<T> {
    type Item = StreamResult<T>;

    /// Returns the next merged item, blocking while none is available.
    ///
    /// Queued items are delivered before a recorded error is reported. The
    /// stream ends once the input is done and no substream is active.
    fn next(&mut self) -> Option<Self::Item> {
        let shared = &*self.shared;
        let mut state = shared
            .state
            .lock()
            .unwrap_or_else(|poison| poison.into_inner());
        loop {
            if let Some((stream_id, item)) = state.queued.pop_front() {
                // Give the producing substream one slot of its window back.
                if let std::collections::hash_map::Entry::Occupied(mut credit) =
                    state.window.entry(stream_id)
                {
                    *credit.get_mut() -= 1;
                    if *credit.get() == 0 {
                        credit.remove();
                    }
                }
                drop(state);
                shared.available.notify_all();
                return Some(Ok(item));
            }

            if let Some(error) = state.terminal.clone() {
                return Some(Err(error));
            }

            let drained = state.input_done && state.active_streams == 0;
            if drained {
                return None;
            }

            state = shared
                .available
                .wait(state)
                .unwrap_or_else(|poison| poison.into_inner());
        }
    }
}
5658
#[cfg(test)]
impl<T> Drop for FlatMapMergeStream<T> {
    /// Requests cancellation, wakes every waiter and releases the coordinator
    /// completion handle.
    fn drop(&mut self) {
        self.shared.cancelled.store(true, Ordering::SeqCst);
        // Waiters test `cancelled` while holding the state mutex and then
        // park on the condition variable. Passing through the mutex before
        // notifying guarantees that such a waiter is either already parked or
        // will observe the flag; otherwise the notification could land
        // between its check and its wait and be lost.
        drop(
            self.shared
                .state
                .lock()
                .unwrap_or_else(|poison| poison.into_inner()),
        );
        self.shared.available.notify_all();
        let _ = self.completion.take();
    }
}
5667
#[cfg(test)]
/// Counts one more active substream and wakes every waiter.
fn flat_map_merge_register_stream<T>(shared: &Arc<FlatMapMergeShared<T>>) {
    {
        let mut state = shared
            .state
            .lock()
            .unwrap_or_else(|poison| poison.into_inner());
        state.active_streams += 1;
    }
    shared.available.notify_all();
}
5678
#[cfg(test)]
/// Records that substream `stream_id` has finished and wakes every waiter.
///
/// The active-substream count is decremented (saturating at zero) and an
/// `Err` outcome becomes the merge's terminal error unless one is already
/// recorded.
fn flat_map_merge_finish_stream<T>(
    shared: &Arc<FlatMapMergeShared<T>>,
    stream_id: usize,
    terminal: Result<(), StreamError>,
) {
    {
        let mut state = shared
            .state
            .lock()
            .unwrap_or_else(|poison| poison.into_inner());
        state.active_streams = state.active_streams.saturating_sub(1);
        if let Err(error) = terminal {
            // The first recorded error wins.
            let _ = state.terminal.get_or_insert(error);
        }
        // NOTE(review): this inserts a zero-count window entry when the
        // stream has no queued items, and nothing visible here removes such
        // an entry again; confirm whether that is intended.
        state.window.entry(stream_id).or_default();
    }
    shared.available.notify_all();
}
5700
#[cfg(test)]
/// Queues `item` for substream `stream_id`, blocking while that substream
/// already has `FLAT_MAP_MERGE_SUBSTREAM_WINDOW` unconsumed items queued.
///
/// Returns the item back as `Err` when the merge was cancelled or has a
/// terminal error, in which case nothing is queued.
fn flat_map_merge_push<T>(
    shared: &Arc<FlatMapMergeShared<T>>,
    stream_id: usize,
    item: T,
) -> Result<(), T> {
    let mut state = shared
        .state
        .lock()
        .unwrap_or_else(|poison| poison.into_inner());

    loop {
        let in_flight = state.window.get(&stream_id).copied().unwrap_or(0);
        let window_full = in_flight >= FLAT_MAP_MERGE_SUBSTREAM_WINDOW;
        if !window_full || state.terminal.is_some() {
            break;
        }
        if shared.cancelled.load(Ordering::SeqCst) {
            return Err(item);
        }
        state = shared
            .available
            .wait(state)
            .unwrap_or_else(|poison| poison.into_inner());
    }

    let rejected = shared.cancelled.load(Ordering::SeqCst) || state.terminal.is_some();
    if rejected {
        return Err(item);
    }

    *state.window.entry(stream_id).or_insert(0) += 1;
    // Only the empty-to-non-empty transition triggers a notification.
    let queue_was_empty = state.queued.is_empty();
    state.queued.push_back((stream_id, item));
    drop(state);
    if queue_was_empty {
        shared.available.notify_all();
    }
    Ok(())
}
5736
#[cfg(test)]
/// Guard held by the flat-map-merge coordinator; if dropped while still armed
/// it records `AbruptTermination` and cancels the merge.
struct FlatMapMergeCoordinatorGuard<T> {
    /// State shared with the workers and the output stream.
    shared: Arc<FlatMapMergeShared<T>>,
    /// `true` until `finish` has been called.
    armed: bool,
}
5742
#[cfg(test)]
impl<T> FlatMapMergeCoordinatorGuard<T> {
    /// Creates an armed guard over `shared`.
    fn new(shared: Arc<FlatMapMergeShared<T>>) -> Self {
        let armed = true;
        Self { shared, armed }
    }

    /// Marks the input as done, wakes every waiter and disarms the guard.
    fn finish(&mut self) {
        // The lock guard is a temporary and is released at the end of the statement.
        self.shared
            .state
            .lock()
            .unwrap_or_else(|poison| poison.into_inner())
            .input_done = true;
        self.shared.available.notify_all();
        self.armed = false;
    }
}
5764
#[cfg(test)]
impl<T> Drop for FlatMapMergeCoordinatorGuard<T> {
    /// While still armed, records `AbruptTermination` unless an error is
    /// already recorded, marks the input done, requests cancellation and
    /// wakes every waiter.
    fn drop(&mut self) {
        if !self.armed {
            return;
        }

        let shared = &*self.shared;
        {
            let mut state = shared
                .state
                .lock()
                .unwrap_or_else(|poison| poison.into_inner());
            let _ = state
                .terminal
                .get_or_insert(StreamError::AbruptTermination);
            state.input_done = true;
        }
        shared.cancelled.store(true, Ordering::SeqCst);
        shared.available.notify_all();
    }
}
5784
#[cfg(test)]
/// Guard held by one flat-map-merge substream worker; if dropped while still
/// armed it finishes the substream with `AbruptTermination`.
struct FlatMapMergeWorkerGuard<T> {
    /// State shared with the coordinator and the output stream.
    shared: Arc<FlatMapMergeShared<T>>,
    /// Identifier of the substream this worker drains.
    stream_id: usize,
    /// `true` until `finish` has been called.
    armed: bool,
}
5791
#[cfg(test)]
impl<T> FlatMapMergeWorkerGuard<T> {
    /// Creates an armed guard for substream `stream_id`.
    fn new(shared: Arc<FlatMapMergeShared<T>>, stream_id: usize) -> Self {
        let armed = true;
        Self {
            shared,
            stream_id,
            armed,
        }
    }

    /// Reports the substream outcome to the shared state and disarms the guard.
    fn finish(&mut self, terminal: Result<(), StreamError>) {
        let (shared, stream_id) = (&self.shared, self.stream_id);
        flat_map_merge_finish_stream(shared, stream_id, terminal);
        self.armed = false;
    }
}
5807
#[cfg(test)]
impl<T> Drop for FlatMapMergeWorkerGuard<T> {
    /// While still armed, finishes the substream with `AbruptTermination`
    /// and requests cancellation of the merge.
    fn drop(&mut self) {
        if !self.armed {
            return;
        }
        let outcome = Err(StreamError::AbruptTermination);
        flat_map_merge_finish_stream(&self.shared, self.stream_id, outcome);
        self.shared.cancelled.store(true, Ordering::SeqCst);
    }
}
5821
#[cfg(test)]
/// Test-only selector for the substream executor used on the current thread.
///
/// In this part of the file only `LegacyOnly` is inspected (by
/// `flat_map_merge_stream`); the remaining variants are presumably read
/// elsewhere.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub(crate) enum SubstreamExecutorMode {
    Auto,
    LegacyOnly,
    ReadyRingOnly,
    SplitSinkOnly,
}

#[cfg(test)]
thread_local! {
    /// Executor mode of the current thread; starts as `Auto`.
    static SUBSTREAM_EXECUTOR_MODE: std::cell::Cell<SubstreamExecutorMode> =
        const { std::cell::Cell::new(SubstreamExecutorMode::Auto) };
}

#[cfg(test)]
/// Runs `f` with the current thread's executor mode set to `mode` and returns
/// its result.
///
/// The previous mode is restored when `f` returns and also when `f` panics,
/// so a caught panic cannot leak the override into later code on the same
/// thread. Calls may be nested.
pub(crate) fn with_substream_mode<R>(mode: SubstreamExecutorMode, f: impl FnOnce() -> R) -> R {
    /// Puts the saved mode back when dropped, including during unwinding.
    struct Restore(SubstreamExecutorMode);

    impl Drop for Restore {
        fn drop(&mut self) {
            // `try_with` so a drop during thread teardown can never panic.
            let _ = SUBSTREAM_EXECUTOR_MODE.try_with(|m| m.set(self.0));
        }
    }

    let _restore = Restore(SUBSTREAM_EXECUTOR_MODE.with(|m| m.replace(mode)));
    f()
}

#[cfg(test)]
/// Returns the executor mode currently selected for this thread.
fn current_substream_mode() -> SubstreamExecutorMode {
    SUBSTREAM_EXECUTOR_MODE.with(|m| m.get())
}
5854
5855fn flat_map_merge_stream<Out, Next, NextMat, F>(
5858 input: BoxStream<Out>,
5859 breadth: usize,
5860 stage: Arc<F>,
5861 materializer: &Materializer,
5862) -> BoxStream<Next>
5863where
5864 Out: Send + 'static,
5865 Next: Send + 'static,
5866 NextMat: Send + 'static,
5867 F: Fn(Out) -> Source<Next, NextMat> + Send + Sync + 'static,
5868{
5869 #[cfg(test)]
5870 if current_substream_mode() == SubstreamExecutorMode::LegacyOnly {
5871 return flat_map_merge_stream_legacy(input, breadth, stage, materializer);
5872 }
5873 flat_map_merge_stream_ready(input, breadth, stage, materializer)
5874}
5875
/// Test-only variant of the merging flat-map stage, built on `FlatMapMergeShared`.
///
/// A coordinator is spawned through `materializer.spawn_stream`. For every
/// upstream element it calls `stage` to obtain a `Source`, materializes that
/// source with a materializer carrying the same name prefix, and spawns one
/// worker per sub-stream that forwards items via `flat_map_merge_push`.
/// At most `breadth` sub-streams are allowed to be active at a time: the
/// coordinator parks on `shared.available` while `active_streams >= breadth`.
///
/// The first error (from upstream, from materialization, or from a sub-stream)
/// is recorded in `state.terminal` and returned from the coordinator.
///
/// Returns a `FlatMapMergeStream` holding the shared state and the
/// coordinator's completion handle.
///
/// NOTE(review): `FlatMapMergeShared`, `FlatMapMergeCoordinatorGuard`,
/// `FlatMapMergeWorkerGuard`, `flat_map_merge_register_stream` and
/// `flat_map_merge_push` are defined outside this section; their exact
/// semantics should be confirmed there.
#[cfg(test)]
fn flat_map_merge_stream_legacy<Out, Next, NextMat, F>(
    mut input: BoxStream<Out>,
    breadth: usize,
    stage: Arc<F>,
    materializer: &Materializer,
) -> BoxStream<Next>
where
    Out: Send + 'static,
    Next: Send + 'static,
    NextMat: Send + 'static,
    F: Fn(Out) -> Source<Next, NextMat> + Send + Sync + 'static,
{
    let worker_materializer = materializer.with_name_prefix(materializer.name_prefix().to_owned());
    let shared = FlatMapMergeShared::new();
    let worker_shared = Arc::clone(&shared);
    let completion = materializer.spawn_stream(move |cancelled| {
        // Monotonic id handed to each materialized sub-stream.
        let mut next_id = 0usize;
        let mut guard = FlatMapMergeCoordinatorGuard::new(worker_shared);
        // Completion handles of the spawned workers, keyed by stream id.
        let mut workers = HashMap::<usize, StreamCompletion<NotUsed>>::with_capacity(breadth);

        // Main loop: runs until either cancellation flag is raised or upstream ends.
        while !cancelled.load(Ordering::SeqCst) && !guard.shared.cancelled.load(Ordering::SeqCst) {
            // Forget workers whose completion already has a result.
            workers.retain(|_, completion| completion.try_wait().is_none());
            {
                let mut state = guard
                    .shared
                    .state
                    .lock()
                    .unwrap_or_else(|poison| poison.into_inner());
                // Back-pressure: wait until fewer than `breadth` sub-streams are active,
                // a terminal error was recorded, or cancellation was requested.
                while state.active_streams >= breadth
                    && state.terminal.is_none()
                    && !cancelled.load(Ordering::SeqCst)
                    && !guard.shared.cancelled.load(Ordering::SeqCst)
                {
                    state = guard
                        .shared
                        .available
                        .wait(state)
                        .unwrap_or_else(|poison| poison.into_inner());
                }
                if state.terminal.is_some() {
                    let error = state.terminal.clone().expect("terminal checked above");
                    // Release the state lock before `finish`, which is called without it held.
                    drop(state);
                    guard.finish();
                    return Err(error);
                }
            }

            match input.next() {
                Some(Ok(item)) => {
                    let source = stage(item);
                    // Materialize the sub-stream; its materialized value is discarded.
                    let (mut stream, _) =
                        match Arc::clone(&source.factory).create(&worker_materializer) {
                            Ok(parts) => parts,
                            Err(error) => {
                                // Record the failure (first error wins) and wake all waiters.
                                let mut state = guard
                                    .shared
                                    .state
                                    .lock()
                                    .unwrap_or_else(|poison| poison.into_inner());
                                if state.terminal.is_none() {
                                    state.terminal = Some(error.clone());
                                }
                                drop(state);
                                guard.shared.available.notify_all();
                                guard.finish();
                                return Err(error);
                            }
                        };
                    let stream_id = next_id;
                    next_id += 1;
                    flat_map_merge_register_stream(&guard.shared);
                    let worker_shared = Arc::clone(&guard.shared);
                    workers.insert(
                        stream_id,
                        worker_materializer.spawn_stream(move |inner_cancelled| {
                            let mut worker_guard =
                                FlatMapMergeWorkerGuard::new(Arc::clone(&worker_shared), stream_id);
                            // Forward items until the sub-stream ends, fails, or is cancelled.
                            while !inner_cancelled.load(Ordering::SeqCst)
                                && !worker_shared.cancelled.load(Ordering::SeqCst)
                            {
                                match stream.next() {
                                    Some(Ok(item)) => {
                                        // A failed push ends this worker without reporting an error.
                                        if flat_map_merge_push(&worker_shared, stream_id, item)
                                            .is_err()
                                        {
                                            worker_guard.finish(Ok(()));
                                            return Ok(NotUsed);
                                        }
                                    }
                                    Some(Err(error)) => {
                                        worker_guard.finish(Err(error.clone()));
                                        return Err(error);
                                    }
                                    None => {
                                        worker_guard.finish(Ok(()));
                                        return Ok(NotUsed);
                                    }
                                }
                            }
                            // Cancelled: finish cleanly.
                            worker_guard.finish(Ok(()));
                            Ok(NotUsed)
                        }),
                    );
                }
                Some(Err(error)) => {
                    // Upstream failure: record it (first error wins) and wake all waiters.
                    let mut state = guard
                        .shared
                        .state
                        .lock()
                        .unwrap_or_else(|poison| poison.into_inner());
                    if state.terminal.is_none() {
                        state.terminal = Some(error.clone());
                    }
                    drop(state);
                    guard.shared.available.notify_all();
                    guard.finish();
                    return Err(error);
                }
                None => {
                    // Upstream exhausted.
                    guard.finish();
                    break;
                }
            }
        }

        // Reached via cancellation without an explicit finish: finish now.
        if guard.armed {
            guard.finish();
        }
        // Wait until every sub-stream has finished or cancellation is requested.
        while !cancelled.load(Ordering::SeqCst) && !guard.shared.cancelled.load(Ordering::SeqCst) {
            workers.retain(|_, completion| completion.try_wait().is_none());
            let state = guard
                .shared
                .state
                .lock()
                .unwrap_or_else(|poison| poison.into_inner());
            if state.active_streams == 0 {
                return Ok(NotUsed);
            }
            // Park until notified; the returned lock guard is released immediately.
            drop(
                guard
                    .shared
                    .available
                    .wait(state)
                    .unwrap_or_else(|poison| poison.into_inner()),
            );
        }
        Ok(NotUsed)
    });

    Box::new(FlatMapMergeStream {
        shared,
        completion: Some(completion),
    })
}
6045
/// Maximum number of items moved in one step by the ready-ring merge: the
/// consumer drains at most this many items per lane visit, and a worker pulls
/// at most this many items from its sub-stream before publishing them.
const FLAT_MAP_MERGE_READY_BATCH: usize = 16;

/// Largest `max_success_items` hint for which `flat_map_merge_stream_ready`
/// runs a sub-stream inline on the coordinator instead of spawning a worker.
const FLAT_MAP_MERGE_INLINE_MICRO_MAX: usize = FLAT_MAP_MERGE_READY_BATCH;
6054
/// State shared between the consumer, the coordinator and the workers of the
/// ready-ring `flat_map_merge` implementation.
struct FlatMapMergeReadyShared<T> {
    /// Global bookkeeping (lanes, ready queue, counters, terminal error).
    coordinator: Mutex<FlatMapMergeReadyState<T>>,
    /// Signalled whenever the state behind `coordinator` changes.
    available: Condvar,
    /// Set when the merge should stop: on consumer drop, on error, or on abrupt termination.
    cancelled: Arc<AtomicBool>,
}
6060
/// Bookkeeping guarded by `FlatMapMergeReadyShared::coordinator`.
struct FlatMapMergeReadyState<T> {
    /// Registered lanes keyed by sub-stream id.
    lanes: HashMap<usize, Arc<FlatMapMergeLane<T>>>,
    /// Ids of lanes that have buffered items, in the order the consumer visits them.
    ready: std::collections::VecDeque<usize>,
    /// Number of items published to lanes and not yet drained by the consumer.
    queued_items: usize,
    /// Number of sub-streams registered and not yet finished.
    active_streams: usize,
    /// Set once the coordinator will not register any further sub-streams.
    input_done: bool,
    /// First error recorded; once set the consumer reports it.
    terminal: Option<StreamError>,
    /// Counter bumped on state changes so `wait_while` predicates can detect progress.
    generation: u64,
}
6070
/// Per-sub-stream buffer with its own lock and condition variable.
struct FlatMapMergeLane<T> {
    /// Buffered items and lane flags.
    state: Mutex<FlatMapMergeLaneState<T>>,
    /// Signalled when the consumer drains items, when a publish completes,
    /// and when the merge is cancelled.
    space_available: Condvar,
}
6075
/// State guarded by `FlatMapMergeLane::state`.
struct FlatMapMergeLaneState<T> {
    /// Items published by the producer and not yet drained by the consumer.
    buffer: std::collections::VecDeque<T>,
    /// Whether the lane is currently considered enqueued in the ready queue.
    in_ready_ring: bool,
    /// True while a publish has extended `buffer` but has not yet updated the
    /// coordinator counters; the consumer waits for it to clear before draining.
    publishing: bool,
    /// Set when the sub-stream feeding this lane has finished.
    closed: bool,
}
6082
/// Consumer side of the ready-ring `flat_map_merge`; yields merged items as an iterator.
struct FlatMapMergeReadyStream<T> {
    /// State shared with the coordinator and the workers.
    shared: Arc<FlatMapMergeReadyShared<T>>,
    /// Completion handle of the coordinator; released when the stream is dropped.
    completion: Option<StreamCompletion<NotUsed>>,
    /// Items already drained from a lane and waiting to be returned one by one.
    local_batch: std::collections::VecDeque<T>,
}
6088
/// Pulls merged items out of the lanes.
///
/// Items are served from `local_batch` first. When it is empty the next ready
/// lane is drained (at most `FLAT_MAP_MERGE_READY_BATCH` items), the first item
/// is returned and the rest is stashed in `local_batch`. The iterator ends once
/// the input is done, no sub-stream is active and nothing is queued. A recorded
/// terminal error is returned as `Some(Err(..))`.
impl<T: Send + 'static> Iterator for FlatMapMergeReadyStream<T> {
    type Item = StreamResult<T>;

    fn next(&mut self) -> Option<Self::Item> {
        // Fast path: serve items drained on a previous call without taking any lock.
        if let Some(item) = self.local_batch.pop_front() {
            return Some(Ok(item));
        }

        let mut coord = self
            .shared
            .coordinator
            .lock()
            .unwrap_or_else(|p| p.into_inner());
        loop {
            // A recorded error takes precedence over any buffered items.
            if let Some(error) = coord.terminal.clone() {
                return Some(Err(error));
            }

            if let Some(lane_id) = coord.ready.pop_front() {
                let lane = coord.lanes.get(&lane_id).cloned();
                // Never hold the coordinator lock while taking a lane lock.
                drop(coord);

                if let Some(lane) = lane {
                    let mut lane_state = lane.state.lock().unwrap_or_else(|p| p.into_inner());
                    // Wait for an in-flight publish to finish its coordinator bookkeeping,
                    // so `queued_items` already accounts for everything in the buffer.
                    while lane_state.publishing {
                        lane_state = lane
                            .space_available
                            .wait(lane_state)
                            .unwrap_or_else(|p| p.into_inner());
                    }
                    let drain_n = lane_state.buffer.len().min(FLAT_MAP_MERGE_READY_BATCH);
                    let mut batch = std::collections::VecDeque::with_capacity(drain_n);
                    for _ in 0..drain_n {
                        if let Some(item) = lane_state.buffer.pop_front() {
                            batch.push_back(item);
                        }
                    }
                    let still_has_items = !lane_state.buffer.is_empty();
                    let is_closed = lane_state.closed;
                    if !still_has_items {
                        // The lane leaves the ready ring; the next publish re-enqueues it.
                        lane_state.in_ready_ring = false;
                    }
                    drop(lane_state);
                    if !batch.is_empty() {
                        // Space was freed: wake a producer blocked on a full lane.
                        lane.space_available.notify_all();
                    }

                    let freed = batch.len();
                    let mut coord = self
                        .shared
                        .coordinator
                        .lock()
                        .unwrap_or_else(|p| p.into_inner());
                    coord.queued_items = coord.queued_items.saturating_sub(freed);
                    if still_has_items {
                        // More to drain: rotate the lane to the back of the ready queue.
                        coord.ready.push_back(lane_id);
                    } else if is_closed {
                        // Finished and fully drained: forget the lane.
                        coord.lanes.remove(&lane_id);
                    }
                    coord.generation += 1;
                    drop(coord);
                    self.shared.available.notify_all();

                    let mut iter = batch.into_iter();
                    if let Some(first) = iter.next() {
                        self.local_batch.extend(iter);
                        return Some(Ok(first));
                    }
                }
                // Nothing was obtained from this lane: re-acquire the lock and retry.
                coord = self
                    .shared
                    .coordinator
                    .lock()
                    .unwrap_or_else(|p| p.into_inner());
                continue;
            }

            // Normal completion: nothing more can ever arrive.
            if coord.terminal.is_none()
                && coord.input_done
                && coord.active_streams == 0
                && coord.queued_items == 0
            {
                return None;
            }

            // Park until the generation moves, an error is recorded, a lane becomes
            // ready, or the completion condition holds.
            let seen = coord.generation;
            coord = self
                .shared
                .available
                .wait_while(coord, |s| {
                    s.generation == seen
                        && s.terminal.is_none()
                        && s.ready.is_empty()
                        && !(s.input_done && s.active_streams == 0 && s.queued_items == 0)
                })
                .unwrap_or_else(|p| p.into_inner());
        }
    }
}
6200
6201impl<T> Drop for FlatMapMergeReadyStream<T> {
6202 fn drop(&mut self) {
6203 let lanes: Vec<Arc<FlatMapMergeLane<T>>> = {
6204 let mut coord = self
6205 .shared
6206 .coordinator
6207 .lock()
6208 .unwrap_or_else(|p| p.into_inner());
6209 coord.generation += 1;
6210 coord.lanes.values().cloned().collect()
6211 };
6212 self.shared.cancelled.store(true, Ordering::SeqCst);
6213 self.shared.available.notify_all();
6214 for lane in lanes {
6215 lane.space_available.notify_all();
6216 }
6217 let _ = self.completion.take();
6218 }
6219}
6220
6221fn finish_lane_ready<T>(
6224 shared: &Arc<FlatMapMergeReadyShared<T>>,
6225 stream_id: usize,
6226 terminal: Result<(), StreamError>,
6227) {
6228 let is_error = terminal.is_err();
6229
6230 let lane_is_empty = {
6232 let coord = shared.coordinator.lock().unwrap_or_else(|p| p.into_inner());
6233 let lane_opt = coord.lanes.get(&stream_id).cloned();
6234 drop(coord);
6235 if let Some(lane) = lane_opt {
6236 let mut ls = lane.state.lock().unwrap_or_else(|p| p.into_inner());
6237 ls.closed = true;
6238 ls.buffer.is_empty()
6239 } else {
6240 true
6241 }
6242 };
6243
6244 let lanes_to_notify: Vec<Arc<FlatMapMergeLane<T>>> = {
6246 let mut coord = shared.coordinator.lock().unwrap_or_else(|p| p.into_inner());
6247 coord.active_streams = coord.active_streams.saturating_sub(1);
6248 if let Err(ref error) = terminal
6249 && coord.terminal.is_none()
6250 {
6251 coord.terminal = Some(error.clone());
6252 }
6253 if lane_is_empty {
6254 coord.lanes.remove(&stream_id);
6255 }
6256 coord.generation += 1;
6257 if is_error {
6258 coord.lanes.values().cloned().collect()
6259 } else {
6260 vec![]
6261 }
6262 };
6263
6264 if is_error {
6265 shared.cancelled.store(true, Ordering::SeqCst);
6266 for lane in &lanes_to_notify {
6267 lane.space_available.notify_all();
6268 }
6269 }
6270 shared.available.notify_all();
6271}
6272
/// Guard owned by the coordinator. If it is dropped while still armed, the
/// merge is terminated with `StreamError::AbruptTermination`.
struct FlatMapMergeReadyCoordinatorGuard<T> {
    /// State shared with the consumer and the workers.
    shared: Arc<FlatMapMergeReadyShared<T>>,
    /// True until `finish` has been called.
    armed: bool,
}
6277
6278impl<T> FlatMapMergeReadyCoordinatorGuard<T> {
6279 fn new(shared: Arc<FlatMapMergeReadyShared<T>>) -> Self {
6280 Self {
6281 shared,
6282 armed: true,
6283 }
6284 }
6285
6286 fn finish(&mut self) {
6287 let mut coord = self
6288 .shared
6289 .coordinator
6290 .lock()
6291 .unwrap_or_else(|p| p.into_inner());
6292 coord.input_done = true;
6293 coord.generation += 1;
6294 drop(coord);
6295 self.shared.available.notify_all();
6296 self.armed = false;
6297 }
6298}
6299
6300impl<T> Drop for FlatMapMergeReadyCoordinatorGuard<T> {
6301 fn drop(&mut self) {
6302 if self.armed {
6303 let lanes: Vec<Arc<FlatMapMergeLane<T>>> = {
6304 let mut coord = self
6305 .shared
6306 .coordinator
6307 .lock()
6308 .unwrap_or_else(|p| p.into_inner());
6309 if coord.terminal.is_none() {
6310 coord.terminal = Some(StreamError::AbruptTermination);
6311 }
6312 coord.input_done = true;
6313 coord.generation += 1;
6314 coord.lanes.values().cloned().collect()
6315 };
6316 self.shared.cancelled.store(true, Ordering::SeqCst);
6317 self.shared.available.notify_all();
6318 for lane in lanes {
6319 lane.space_available.notify_all();
6320 }
6321 }
6322 }
6323}
6324
/// Guard owned by a sub-stream worker. If it is dropped while still armed, the
/// lane is finished with `StreamError::AbruptTermination`.
struct FlatMapMergeReadyWorkerGuard<T> {
    /// State shared with the consumer and the coordinator.
    shared: Arc<FlatMapMergeReadyShared<T>>,
    /// Id of the sub-stream (and lane) this worker feeds.
    stream_id: usize,
    /// True until `finish` has been called.
    armed: bool,
}
6330
6331impl<T> FlatMapMergeReadyWorkerGuard<T> {
6332 fn new(shared: Arc<FlatMapMergeReadyShared<T>>, stream_id: usize) -> Self {
6333 Self {
6334 shared,
6335 stream_id,
6336 armed: true,
6337 }
6338 }
6339
6340 fn finish(&mut self, terminal: Result<(), StreamError>) {
6341 finish_lane_ready(&self.shared, self.stream_id, terminal);
6342 self.armed = false;
6343 }
6344}
6345
6346impl<T> Drop for FlatMapMergeReadyWorkerGuard<T> {
6347 fn drop(&mut self) {
6348 if self.armed {
6349 finish_lane_ready(
6350 &self.shared,
6351 self.stream_id,
6352 Err(StreamError::AbruptTermination),
6353 );
6354 self.shared.cancelled.store(true, Ordering::SeqCst);
6355 }
6356 }
6357}
6358
6359fn publish_ready_batch<T>(
6374 shared: &Arc<FlatMapMergeReadyShared<T>>,
6375 stream_id: usize,
6376 lane: &Arc<FlatMapMergeLane<T>>,
6377 batch: std::collections::VecDeque<T>,
6378) {
6379 let batch_len = batch.len();
6380 if batch_len == 0 {
6381 return;
6382 }
6383
6384 let was_not_in_ready = {
6386 let mut ls = lane.state.lock().unwrap_or_else(|p| p.into_inner());
6387 let was_not = !ls.in_ready_ring;
6388 ls.publishing = true;
6389 ls.buffer.extend(batch);
6390 if !ls.buffer.is_empty() {
6391 ls.in_ready_ring = true;
6392 }
6393 was_not
6394 };
6395
6396 {
6398 let mut coord = shared.coordinator.lock().unwrap_or_else(|p| p.into_inner());
6399 coord.queued_items += batch_len;
6400 if was_not_in_ready {
6401 coord.ready.push_back(stream_id);
6402 }
6403 coord.generation += 1;
6404 }
6405 {
6406 let mut ls = lane.state.lock().unwrap_or_else(|p| p.into_inner());
6407 ls.publishing = false;
6408 }
6409 lane.space_available.notify_all();
6410 shared.available.notify_all();
6411}
6412
/// Guard covering a freshly registered lane on the coordinator thread, until
/// the sub-stream is either finished inline or handed off to a worker. If it
/// is dropped while still armed, the lane is finished with
/// `StreamError::AbruptTermination`.
struct FlatMapMergeReadyInlineGuard<T> {
    /// State shared with the consumer and the workers.
    shared: Arc<FlatMapMergeReadyShared<T>>,
    /// Id of the lane being guarded.
    stream_id: usize,
    /// True until `finish` or `hand_off` has been called.
    armed: bool,
}
6427
6428impl<T> FlatMapMergeReadyInlineGuard<T> {
6429 fn new(shared: Arc<FlatMapMergeReadyShared<T>>, stream_id: usize) -> Self {
6430 Self {
6431 shared,
6432 stream_id,
6433 armed: true,
6434 }
6435 }
6436
6437 fn finish(&mut self, terminal: Result<(), StreamError>) {
6438 finish_lane_ready(&self.shared, self.stream_id, terminal);
6439 self.armed = false;
6440 }
6441
6442 fn hand_off(&mut self) {
6443 self.armed = false;
6444 }
6445}
6446
6447impl<T> Drop for FlatMapMergeReadyInlineGuard<T> {
6448 fn drop(&mut self) {
6449 if self.armed {
6450 finish_lane_ready(
6451 &self.shared,
6452 self.stream_id,
6453 Err(StreamError::AbruptTermination),
6454 );
6455 }
6456 }
6457}
6458
/// Merging flat-map stage built on per-sub-stream lanes and a ready queue.
///
/// A coordinator is spawned through `materializer.spawn_stream`. For every
/// upstream element it calls `stage`, materializes the resulting `Source` and
/// registers a lane for it. At most `breadth` sub-streams are active at a time;
/// the coordinator parks on `shared.available` while the limit is reached.
///
/// Each sub-stream is then handled in one of two ways:
/// * **inline** – when the source carries an `inline_micro` hint whose
///   `max_success_items` does not exceed `FLAT_MAP_MERGE_INLINE_MICRO_MAX`, the
///   coordinator pulls up to `max_success_items + 1` times itself and publishes
///   the collected items as one batch;
/// * **worker** – otherwise a worker is spawned that pulls batches of up to
///   `FLAT_MAP_MERGE_READY_BATCH` items, blocking while its lane already holds
///   `FLAT_MAP_MERGE_SUBSTREAM_WINDOW` items.
///
/// The first error (from upstream, from materialization, or from a sub-stream)
/// becomes the terminal error, raises `cancelled`, and is returned by the
/// coordinator.
///
/// Returns a `FlatMapMergeReadyStream` that the caller iterates to receive the
/// merged items.
fn flat_map_merge_stream_ready<Out, Next, NextMat, F>(
    mut input: BoxStream<Out>,
    breadth: usize,
    stage: Arc<F>,
    materializer: &Materializer,
) -> BoxStream<Next>
where
    Out: Send + 'static,
    Next: Send + 'static,
    NextMat: Send + 'static,
    F: Fn(Out) -> Source<Next, NextMat> + Send + Sync + 'static,
{
    let worker_materializer = materializer.with_name_prefix(materializer.name_prefix().to_owned());
    let shared: Arc<FlatMapMergeReadyShared<Next>> = Arc::new(FlatMapMergeReadyShared {
        coordinator: Mutex::new(FlatMapMergeReadyState {
            lanes: HashMap::new(),
            ready: std::collections::VecDeque::new(),
            queued_items: 0,
            active_streams: 0,
            input_done: false,
            terminal: None,
            generation: 0,
        }),
        available: Condvar::new(),
        cancelled: Arc::new(AtomicBool::new(false)),
    });

    let worker_shared = Arc::clone(&shared);
    let completion = materializer.spawn_stream(move |cancelled| {
        // Monotonic id handed to each materialized sub-stream.
        let mut next_id = 0usize;
        let mut guard = FlatMapMergeReadyCoordinatorGuard::new(worker_shared);
        // Completion handles of the spawned workers, keyed by stream id.
        let mut workers = HashMap::<usize, StreamCompletion<NotUsed>>::with_capacity(breadth);

        // Main loop: runs until either cancellation flag is raised or upstream ends.
        while !cancelled.load(Ordering::SeqCst) && !guard.shared.cancelled.load(Ordering::SeqCst) {
            // Forget workers whose completion already has a result.
            workers.retain(|_, c| c.try_wait().is_none());

            {
                let mut coord = guard
                    .shared
                    .coordinator
                    .lock()
                    .unwrap_or_else(|p| p.into_inner());
                // Back-pressure: wait until a slot is free, an error is recorded,
                // or cancellation is requested.
                loop {
                    if coord.terminal.is_some()
                        || coord.active_streams < breadth
                        || cancelled.load(Ordering::SeqCst)
                        || guard.shared.cancelled.load(Ordering::SeqCst)
                    {
                        break;
                    }
                    let seen = coord.generation;
                    coord = guard
                        .shared
                        .available
                        .wait_while(coord, |s| {
                            s.generation == seen
                                && s.terminal.is_none()
                                && s.active_streams >= breadth
                                && !cancelled.load(Ordering::SeqCst)
                                && !guard.shared.cancelled.load(Ordering::SeqCst)
                        })
                        .unwrap_or_else(|p| p.into_inner());
                }
                if coord.terminal.is_some() {
                    let error = coord.terminal.clone().expect("terminal checked");
                    // Release the lock before `finish`, which takes it again.
                    drop(coord);
                    guard.finish();
                    return Err(error);
                }
            }

            match input.next() {
                Some(Ok(item)) => {
                    let source = stage(item);
                    let inline_hint = source.hints.inline_micro;
                    // Materialize the sub-stream; its materialized value is discarded.
                    let (stream, _) = match Arc::clone(&source.factory).create(&worker_materializer)
                    {
                        Ok(parts) => parts,
                        Err(error) => {
                            // Record the failure (first error wins), cancel, wake everyone.
                            let lanes: Vec<Arc<FlatMapMergeLane<Next>>> = {
                                let mut coord = guard
                                    .shared
                                    .coordinator
                                    .lock()
                                    .unwrap_or_else(|p| p.into_inner());
                                if coord.terminal.is_none() {
                                    coord.terminal = Some(error.clone());
                                }
                                coord.generation += 1;
                                coord.lanes.values().cloned().collect()
                            };
                            guard.shared.cancelled.store(true, Ordering::SeqCst);
                            guard.shared.available.notify_all();
                            for lane in lanes {
                                lane.space_available.notify_all();
                            }
                            guard.finish();
                            return Err(error);
                        }
                    };

                    let stream_id = next_id;
                    next_id += 1;
                    let mut stream = stream;

                    // Register a fresh, empty lane for this sub-stream.
                    let lane: Arc<FlatMapMergeLane<Next>> = Arc::new(FlatMapMergeLane {
                        state: Mutex::new(FlatMapMergeLaneState {
                            buffer: std::collections::VecDeque::new(),
                            in_ready_ring: false,
                            publishing: false,
                            closed: false,
                        }),
                        space_available: Condvar::new(),
                    });
                    {
                        let mut coord = guard
                            .shared
                            .coordinator
                            .lock()
                            .unwrap_or_else(|p| p.into_inner());
                        coord.lanes.insert(stream_id, Arc::clone(&lane));
                        coord.active_streams += 1;
                    }

                    // From here on the lane must be finished exactly once; the guard
                    // covers early exits until the lane is finished or handed off.
                    let mut inline_guard =
                        FlatMapMergeReadyInlineGuard::new(Arc::clone(&guard.shared), stream_id);

                    let is_inline = inline_hint
                        .is_some_and(|h| h.max_success_items <= FLAT_MAP_MERGE_INLINE_MICRO_MAX);

                    if is_inline {
                        let max_items = inline_hint.expect("checked").max_success_items;

                        if guard.shared.cancelled.load(Ordering::SeqCst)
                            || cancelled.load(Ordering::SeqCst)
                        {
                            inline_guard.finish(Ok(()));
                            continue;
                        }

                        // One pull more than the hinted item count, so the end of the
                        // sub-stream (or an error) can be observed as well.
                        let max_pulls = max_items.saturating_add(1);
                        let mut local_batch = std::collections::VecDeque::with_capacity(max_items);
                        let mut terminal_result: Option<Result<(), StreamError>> = None;

                        for _ in 0..max_pulls {
                            if guard.shared.cancelled.load(Ordering::SeqCst)
                                || cancelled.load(Ordering::SeqCst)
                            {
                                break;
                            }
                            match stream.next() {
                                Some(Ok(item)) => local_batch.push_back(item),
                                Some(Err(e)) => {
                                    terminal_result = Some(Err(e));
                                    break;
                                }
                                None => {
                                    terminal_result = Some(Ok(()));
                                    break;
                                }
                            }
                        }

                        // Cancelled while pulling: discard the batch and finish cleanly.
                        if guard.shared.cancelled.load(Ordering::SeqCst)
                            || cancelled.load(Ordering::SeqCst)
                        {
                            inline_guard.finish(Ok(()));
                            continue;
                        }

                        publish_ready_batch(&guard.shared, stream_id, &lane, local_batch);

                        match terminal_result {
                            Some(Err(e)) => {
                                inline_guard.finish(Err(e));
                            }
                            // NOTE(review): `None` means the pull budget ran out before the
                            // sub-stream ended; the lane is still finished as successful.
                            Some(Ok(())) | None => {
                                inline_guard.finish(Ok(()));
                            }
                        }
                    } else {
                        // The worker's own guard takes over responsibility for the lane.
                        inline_guard.hand_off();
                        let worker_shared = Arc::clone(&guard.shared);
                        let worker_lane = Arc::clone(&lane);
                        workers.insert(
                            stream_id,
                            worker_materializer.spawn_stream(move |inner_cancelled| {
                                let mut worker_guard = FlatMapMergeReadyWorkerGuard::new(
                                    Arc::clone(&worker_shared),
                                    stream_id,
                                );

                                loop {
                                    if inner_cancelled.load(Ordering::SeqCst)
                                        || worker_shared.cancelled.load(Ordering::SeqCst)
                                    {
                                        worker_guard.finish(Ok(()));
                                        return Ok(NotUsed);
                                    }

                                    let capacity;
                                    {
                                        let mut ls = worker_lane
                                            .state
                                            .lock()
                                            .unwrap_or_else(|p| p.into_inner());
                                        // Block while the lane is full, unless cancelled or closed.
                                        while ls.buffer.len() >= FLAT_MAP_MERGE_SUBSTREAM_WINDOW
                                            && !worker_shared.cancelled.load(Ordering::SeqCst)
                                            && !ls.closed
                                            && !inner_cancelled.load(Ordering::SeqCst)
                                        {
                                            ls = worker_lane
                                                .space_available
                                                .wait(ls)
                                                .unwrap_or_else(|p| p.into_inner());
                                        }
                                        if worker_shared.cancelled.load(Ordering::SeqCst)
                                            || ls.closed
                                            || inner_cancelled.load(Ordering::SeqCst)
                                        {
                                            // Release the lane lock before finishing,
                                            // which locks the lane again.
                                            drop(ls);
                                            worker_guard.finish(Ok(()));
                                            return Ok(NotUsed);
                                        }
                                        capacity =
                                            FLAT_MAP_MERGE_SUBSTREAM_WINDOW - ls.buffer.len();
                                    }
                                    // Pull no more than fits into the lane and into one batch.
                                    let batch_size = capacity.min(FLAT_MAP_MERGE_READY_BATCH);
                                    let mut local_batch =
                                        std::collections::VecDeque::with_capacity(batch_size);
                                    let mut terminal_result: Option<Result<(), StreamError>> = None;
                                    for _ in 0..batch_size {
                                        if inner_cancelled.load(Ordering::SeqCst)
                                            || worker_shared.cancelled.load(Ordering::SeqCst)
                                        {
                                            break;
                                        }
                                        match stream.next() {
                                            Some(Ok(item)) => local_batch.push_back(item),
                                            Some(Err(e)) => {
                                                terminal_result = Some(Err(e));
                                                break;
                                            }
                                            None => {
                                                terminal_result = Some(Ok(()));
                                                break;
                                            }
                                        }
                                    }
                                    let batch_len = local_batch.len();

                                    // Items pulled before an error or end-of-stream are
                                    // still published.
                                    publish_ready_batch(
                                        &worker_shared,
                                        stream_id,
                                        &worker_lane,
                                        local_batch,
                                    );

                                    match terminal_result {
                                        Some(Ok(())) => {
                                            worker_guard.finish(Ok(()));
                                            return Ok(NotUsed);
                                        }
                                        Some(Err(e)) => {
                                            worker_guard.finish(Err(e.clone()));
                                            return Err(e);
                                        }
                                        None => {
                                            // No terminal event and nothing pulled: the pull
                                            // loop was cut short by cancellation.
                                            if batch_len == 0 {
                                                worker_guard.finish(Ok(()));
                                                return Ok(NotUsed);
                                            }
                                        }
                                    }
                                }
                            }),
                        );
                    }
                }
                Some(Err(error)) => {
                    // Upstream failure: record it (first error wins), cancel, wake everyone.
                    let lanes: Vec<Arc<FlatMapMergeLane<Next>>> = {
                        let mut coord = guard
                            .shared
                            .coordinator
                            .lock()
                            .unwrap_or_else(|p| p.into_inner());
                        if coord.terminal.is_none() {
                            coord.terminal = Some(error.clone());
                        }
                        coord.generation += 1;
                        coord.lanes.values().cloned().collect()
                    };
                    guard.shared.cancelled.store(true, Ordering::SeqCst);
                    guard.shared.available.notify_all();
                    for lane in lanes {
                        lane.space_available.notify_all();
                    }
                    guard.finish();
                    return Err(error);
                }
                None => {
                    // Upstream exhausted.
                    guard.finish();
                    break;
                }
            }
        }

        // Reached via cancellation without an explicit finish: finish now.
        if guard.armed {
            guard.finish();
        }

        // Wait until every sub-stream has finished or cancellation is requested.
        while !cancelled.load(Ordering::SeqCst) && !guard.shared.cancelled.load(Ordering::SeqCst) {
            workers.retain(|_, c| c.try_wait().is_none());
            let coord = guard
                .shared
                .coordinator
                .lock()
                .unwrap_or_else(|p| p.into_inner());
            if coord.active_streams == 0 {
                return Ok(NotUsed);
            }
            let seen = coord.generation;
            // Park until the generation moves or the last sub-stream finishes; the
            // returned lock guard is released immediately.
            drop(
                guard
                    .shared
                    .available
                    .wait_while(coord, |s| s.generation == seen && s.active_streams > 0)
                    .unwrap_or_else(|p| p.into_inner()),
            );
        }
        Ok(NotUsed)
    });

    Box::new(FlatMapMergeReadyStream {
        shared,
        completion: Some(completion),
        local_batch: std::collections::VecDeque::new(),
    })
}
6826
6827fn flat_map_concat_stream<Out, Next, NextMat, F>(
6828 mut input: BoxStream<Out>,
6829 stage: Arc<F>,
6830 materializer: &Materializer,
6831) -> BoxStream<Next>
6832where
6833 Out: Send + 'static,
6834 Next: Send + 'static,
6835 NextMat: Send + 'static,
6836 F: Fn(Out) -> Source<Next, NextMat> + Send + Sync + 'static,
6837{
6838 let materializer = materializer.with_name_prefix(materializer.name_prefix().to_owned());
6839 let mut current: Option<BoxStream<Next>> = None;
6840 Box::new(std::iter::from_fn(move || {
6841 loop {
6842 if let Some(stream) = current.as_mut() {
6843 match stream.next() {
6844 Some(item) => return Some(item),
6845 None => current = None,
6846 }
6847 }
6848
6849 match input.next() {
6850 Some(Ok(item)) => {
6851 let source = stage(item);
6852 current = Some(match Arc::clone(&source.factory).create(&materializer) {
6853 Ok((stream, _)) => stream,
6854 Err(error) => Box::new(std::iter::once(Err(error))) as BoxStream<Next>,
6855 });
6856 }
6857 Some(Err(error)) => return Some(Err(error)),
6858 None => return None,
6859 }
6860 }
6861 }))
6862}
6863
6864fn map_async_unordered<Out, Next, F, Fut>(
6865 mut input: BoxStream<Out>,
6866 parallelism: usize,
6867 stage: Arc<F>,
6868) -> BoxStream<Next>
6869where
6870 Out: Send + 'static,
6871 Next: Send + 'static,
6872 F: Fn(Out) -> Fut + Send + Sync + 'static,
6873 Fut: Future<Output = StreamResult<Next>> + Send + 'static,
6874{
6875 let (sender, receiver) = std::sync::mpsc::channel::<(usize, StreamResult<Next>)>();
6876 let mut tasks = HashMap::<usize, AbortOnDropHandle<()>>::with_capacity(parallelism);
6877 let mut next_task_id = 0_usize;
6878 let mut input_done = false;
6879
6880 Box::new(std::iter::from_fn(move || {
6881 loop {
6882 while tasks.len() < parallelism && !input_done {
6883 match input.next() {
6884 Some(Ok(item)) => match poll_once_or_pending(stage(item)) {
6885 Ok(result) => return Some(result),
6886 Err(future) => {
6887 let task_id = next_task_id;
6888 next_task_id += 1;
6889 tasks.insert(
6890 task_id,
6891 spawn_completion_task(task_id, future, sender.clone(), |result| {
6892 result
6893 }),
6894 );
6895 }
6896 },
6897 Some(Err(error)) => {
6898 input_done = true;
6899 return Some(Err(error));
6900 }
6901 None => input_done = true,
6902 }
6903 }
6904
6905 if tasks.is_empty() {
6906 return None;
6907 }
6908
6909 if let Some((task_id, result)) = recv_completion(&receiver) {
6910 tasks.remove(&task_id);
6911 return Some(result);
6912 }
6913 }
6914 }))
6915}
6916
6917fn map_async_unordered_supervised<Out, Next, F, Fut>(
6918 mut input: BoxStream<Out>,
6919 parallelism: usize,
6920 stage: Arc<F>,
6921 decider: SupervisionDecider,
6922) -> BoxStream<Next>
6923where
6924 Out: Send + 'static,
6925 Next: Send + 'static,
6926 F: Fn(Out) -> Fut + Send + Sync + 'static,
6927 Fut: Future<Output = StreamResult<Next>> + Send + 'static,
6928{
6929 let (sender, receiver) = std::sync::mpsc::channel::<(usize, StreamResult<Next>)>();
6930 let mut tasks = HashMap::<usize, AbortOnDropHandle<()>>::with_capacity(parallelism);
6931 let mut next_task_id = 0_usize;
6932 let mut input_done = false;
6933
6934 Box::new(std::iter::from_fn(move || {
6935 loop {
6936 while tasks.len() < parallelism && !input_done {
6937 match input.next() {
6938 Some(Ok(item)) => {
6939 match catch_unwind(AssertUnwindSafe(|| poll_once_or_pending(stage(item)))) {
6940 Ok(Ok(result)) => {
6941 if let Some(result) = supervise_async_result(result, &decider) {
6942 return Some(result);
6943 }
6944 }
6945 Ok(Err(future)) => {
6946 let task_id = next_task_id;
6947 next_task_id += 1;
6948 tasks.insert(
6949 task_id,
6950 spawn_completion_task(
6951 task_id,
6952 future,
6953 sender.clone(),
6954 |result| result,
6955 ),
6956 );
6957 }
6958 Err(_) => {
6959 let error = panic_stream_error("map_async_unordered callback");
6960 if let Some(result) = supervise_async_result(Err(error), &decider) {
6961 return Some(result);
6962 }
6963 }
6964 }
6965 }
6966 Some(Err(error)) => {
6967 input_done = true;
6968 return Some(Err(error));
6969 }
6970 None => input_done = true,
6971 }
6972 }
6973
6974 if tasks.is_empty() {
6975 return None;
6976 }
6977
6978 if let Some((task_id, result)) = recv_completion(&receiver) {
6979 tasks.remove(&task_id);
6980 if let Some(result) = supervise_async_result(result, &decider) {
6981 return Some(result);
6982 }
6983 }
6984 }
6985 }))
6986}
6987
6988#[inline(always)]
6989fn map_async_partitioned_serial<Out, Key, Next, Partition, F, Fut>(
6990 mut input: BoxStream<Out>,
6991 partition: Arc<Partition>,
6992 stage: Arc<F>,
6993) -> BoxStream<Next>
6994where
6995 Out: Send + 'static,
6996 Key: Send + 'static,
6997 Next: Send + 'static,
6998 Partition: Fn(&Out) -> Key + Send + Sync + 'static,
6999 F: Fn(Out) -> Fut + Send + Sync + 'static,
7000 Fut: Future<Output = StreamResult<Next>> + Send + 'static,
7001{
7002 let (sender, receiver) = std::sync::mpsc::channel::<(usize, StreamResult<Next>)>();
7003 let mut tasks = HashMap::<usize, AbortOnDropHandle<()>>::with_capacity(1);
7004 let mut next_index = 0_usize;
7005 Box::new(std::iter::from_fn(move || {
7006 if !tasks.is_empty()
7010 && let Some((task_id, result)) = recv_completion(&receiver)
7011 {
7012 tasks.remove(&task_id);
7013 return Some(result);
7014 }
7015
7016 let item = input.next()?;
7017 match item {
7018 Ok(item) => {
7019 let _ = partition(&item);
7020 let index = next_index;
7021 next_index += 1;
7022 Some(match poll_once_or_pending(stage(item)) {
7023 Ok(result) => result,
7024 Err(future) => {
7025 tasks.insert(
7026 index,
7027 spawn_completion_task(index, future, sender.clone(), |result| result),
7028 );
7029 let (task_id, result) =
7030 recv_completion(&receiver).expect("pending map_async task completion");
7031 tasks.remove(&task_id);
7032 result
7033 }
7034 })
7035 }
7036 Err(error) => Some(Err(error)),
7037 }
7038 }))
7039}
7040
/// Partitioned async map that finds runnable work by linearly scanning a small
/// pending queue.
///
/// * `input` – upstream elements; an upstream error takes the next index, is
///   emitted in order, and stops further pulling from upstream.
/// * `parallelism` – bound on `tasks.len() + completed.len()`, i.e. in-flight
///   tasks plus results waiting to be emitted.
/// * `per_partition` – maximum number of concurrently active elements per key.
/// * `partition` – computes the key of an element.
/// * `stage` – produces the future for an element.
///
/// Results are emitted in input order: each element receives an index when it
/// is pulled, finished results are parked in `completed`, and only the result
/// for `next_to_emit` is ever returned.
#[inline(always)]
fn map_async_partitioned_scanning<Out, Key, Next, Partition, F, Fut>(
    mut input: BoxStream<Out>,
    parallelism: usize,
    per_partition: usize,
    partition: Arc<Partition>,
    stage: Arc<F>,
) -> BoxStream<Next>
where
    Out: Send + 'static,
    Key: Clone + Eq + Hash + Send + 'static,
    Next: Send + 'static,
    Partition: Fn(&Out) -> Key + Send + Sync + 'static,
    F: Fn(Out) -> Fut + Send + Sync + 'static,
    Fut: Future<Output = StreamResult<Next>> + Send + 'static,
{
    let (sender, receiver) = std::sync::mpsc::channel::<(usize, (Key, StreamResult<Next>))>();
    // Spawned tasks keyed by element index.
    let mut tasks = HashMap::<usize, AbortOnDropHandle<()>>::with_capacity(parallelism);
    // Number of currently active elements per partition key.
    let mut active_by_key = HashMap::<Key, usize>::with_capacity(parallelism);
    // Elements pulled from upstream whose partition was saturated at the time.
    let mut pending = VecDeque::<(usize, Key, Out)>::with_capacity(parallelism);
    // Finished results waiting for their turn, ordered by index.
    let mut completed = BTreeMap::<usize, StreamResult<Next>>::new();
    let mut next_index = 0_usize;
    let mut next_to_emit = 0_usize;
    let mut input_done = false;

    Box::new(std::iter::from_fn(move || {
        loop {
            // Emit the next in-order result if it is already available.
            if let Some(result) = completed.remove(&next_to_emit) {
                next_to_emit += 1;
                return Some(result);
            }

            // Start as much work as the parallelism budget allows.
            while tasks.len() + completed.len() < parallelism {
                // Prefer the first pending element whose partition has capacity;
                // otherwise pull a new element from upstream.
                let next = pending
                    .iter()
                    .position(|(_, key, _)| {
                        active_by_key.get(key).copied().unwrap_or(0) < per_partition
                    })
                    .and_then(|index| pending.remove(index))
                    .or_else(|| {
                        if input_done {
                            return None;
                        }
                        match input.next() {
                            Some(Ok(item)) => {
                                let key = partition(&item);
                                let index = next_index;
                                next_index += 1;
                                Some((index, key, item))
                            }
                            Some(Err(error)) => {
                                // The error occupies its own slot in the output order.
                                completed.insert(next_index, Err(error));
                                next_index += 1;
                                input_done = true;
                                None
                            }
                            None => {
                                input_done = true;
                                None
                            }
                        }
                    });

                let Some((index, key, item)) = next else {
                    break;
                };
                // A freshly pulled element may belong to a saturated partition:
                // park it and keep looking unless the pending queue is full.
                if active_by_key.get(&key).copied().unwrap_or(0) >= per_partition {
                    pending.push_back((index, key, item));
                    if input_done || pending.len() >= parallelism {
                        break;
                    }
                    continue;
                }
                *active_by_key.entry(key.clone()).or_default() += 1;
                match poll_once_or_pending(stage(item)) {
                    Ok(result) => {
                        // Completed on the first poll: release the partition slot.
                        if let Some(count) = active_by_key.get_mut(&key) {
                            *count -= 1;
                            if *count == 0 {
                                active_by_key.remove(&key);
                            }
                        }
                        completed.insert(index, result);
                    }
                    Err(future) => {
                        // The key travels with the result so the slot can be released later.
                        tasks.insert(
                            index,
                            spawn_completion_task(index, future, sender.clone(), move |result| {
                                (key, result)
                            }),
                        );
                    }
                }
            }

            // The fill step may have produced the next in-order result.
            if let Some(result) = completed.remove(&next_to_emit) {
                next_to_emit += 1;
                return Some(result);
            }

            // Nothing in flight: the stream is finished.
            if tasks.is_empty() {
                return None;
            }

            if let Some((index, (key, result))) = recv_completion(&receiver) {
                tasks.remove(&index);
                // Release the partition slot held by the finished task.
                if let Some(count) = active_by_key.get_mut(&key) {
                    *count -= 1;
                    if *count == 0 {
                        active_by_key.remove(&key);
                    }
                }
                if index == next_to_emit {
                    next_to_emit += 1;
                    return Some(result);
                }
                // Finished out of order: hold the result until its turn comes.
                completed.insert(index, result);
            }
        }
    }))
}
7162
7163fn map_async_partitioned<Out, Key, Next, Partition, F, Fut>(
7164 mut input: BoxStream<Out>,
7165 parallelism: usize,
7166 per_partition: usize,
7167 partition: Arc<Partition>,
7168 stage: Arc<F>,
7169) -> BoxStream<Next>
7170where
7171 Out: Send + 'static,
7172 Key: Clone + Eq + Hash + Send + 'static,
7173 Next: Send + 'static,
7174 Partition: Fn(&Out) -> Key + Send + Sync + 'static,
7175 F: Fn(Out) -> Fut + Send + Sync + 'static,
7176 Fut: Future<Output = StreamResult<Next>> + Send + 'static,
7177{
7178 if parallelism == 1 {
7179 return map_async_partitioned_serial(input, partition, stage);
7180 }
7181 if parallelism <= 4 {
7183 return map_async_partitioned_scanning(input, parallelism, per_partition, partition, stage);
7184 }
7185
7186 let (sender, receiver) = std::sync::mpsc::channel::<(usize, (usize, StreamResult<Next>))>();
7187 let mut tasks = HashMap::<usize, AbortOnDropHandle<()>>::with_capacity(parallelism);
7188 let mut slots_by_key = HashMap::<Key, usize>::with_capacity(parallelism);
7189 let mut slots = Vec::<PartitionSlot<Key, Out>>::with_capacity(parallelism);
7190 let mut free_slots = Vec::<usize>::new();
7191 let mut ready_slots = VecDeque::<usize>::with_capacity(parallelism);
7192 let mut completed = BTreeMap::<usize, StreamResult<Next>>::new();
7193 let mut next_index = 0_usize;
7194 let mut next_to_emit = 0_usize;
7195 let mut input_done = false;
7196 Box::new(std::iter::from_fn(move || {
7197 loop {
7198 if let Some(result) = completed.remove(&next_to_emit) {
7199 next_to_emit += 1;
7200 return Some(result);
7201 }
7202
7203 while tasks.len() + completed.len() < parallelism {
7204 if let Some((index, slot, item)) =
7205 pop_ready_partition_slot(&mut slots, &mut ready_slots, per_partition)
7206 {
7207 match poll_once_or_pending(stage(item)) {
7208 Ok(result) => {
7209 let mut remove_empty = false;
7210 if let Some(state) = slots.get_mut(slot) {
7211 state.active -= 1;
7212 remove_empty = state.active == 0
7213 && state.queued.is_empty()
7214 && !state.in_ready_queue;
7215 }
7216 if remove_empty {
7217 retire_partition_slot(
7218 slot,
7219 &mut slots_by_key,
7220 &mut slots,
7221 &mut free_slots,
7222 );
7223 } else {
7224 ready_partition_slot(
7225 &mut slots,
7226 &mut ready_slots,
7227 slot,
7228 per_partition,
7229 );
7230 }
7231 if index == next_to_emit {
7232 next_to_emit += 1;
7233 return Some(result);
7234 }
7235 completed.insert(index, result);
7236 }
7237 Err(future) => {
7238 tasks.insert(
7239 index,
7240 spawn_completion_task(
7241 index,
7242 future,
7243 sender.clone(),
7244 move |result| (slot, result),
7245 ),
7246 );
7247 }
7248 }
7249 continue;
7250 }
7251
7252 if input_done {
7253 break;
7254 }
7255
7256 match input.next() {
7257 Some(Ok(item)) => {
7258 let key = partition(&item);
7259 let index = next_index;
7260 next_index += 1;
7261 let slot =
7262 partition_slot_for(key, &mut slots_by_key, &mut slots, &mut free_slots);
7263 let state = &mut slots[slot];
7264 if state.active < per_partition {
7265 match poll_once_or_pending(stage(item)) {
7266 Ok(result) => {
7267 if index == next_to_emit {
7268 next_to_emit += 1;
7269 if state.queued.is_empty() && !state.in_ready_queue {
7270 retire_partition_slot(
7271 slot,
7272 &mut slots_by_key,
7273 &mut slots,
7274 &mut free_slots,
7275 );
7276 }
7277 return Some(result);
7278 }
7279 completed.insert(index, result);
7280 if state.queued.is_empty() && !state.in_ready_queue {
7281 retire_partition_slot(
7282 slot,
7283 &mut slots_by_key,
7284 &mut slots,
7285 &mut free_slots,
7286 );
7287 }
7288 }
7289 Err(future) => {
7290 state.active += 1;
7291 tasks.insert(
7292 index,
7293 spawn_completion_task(
7294 index,
7295 future,
7296 sender.clone(),
7297 move |result| (slot, result),
7298 ),
7299 );
7300 }
7301 }
7302 } else {
7303 state.queued.push_back((index, item));
7304 }
7305 }
7306 Some(Err(error)) => {
7307 completed.insert(next_index, Err(error));
7308 next_index += 1;
7309 input_done = true;
7310 break;
7311 }
7312 None => {
7313 input_done = true;
7314 break;
7315 }
7316 }
7317 }
7318
7319 if let Some(result) = completed.remove(&next_to_emit) {
7320 next_to_emit += 1;
7321 return Some(result);
7322 }
7323
7324 if tasks.is_empty() {
7325 return None;
7326 }
7327
7328 if let Some((index, (slot, result))) = recv_completion(&receiver) {
7329 tasks.remove(&index);
7330 let mut remove_empty = false;
7331 if let Some(state) = slots.get_mut(slot) {
7332 state.active -= 1;
7333 remove_empty =
7334 state.active == 0 && state.queued.is_empty() && !state.in_ready_queue;
7335 }
7336 if remove_empty {
7337 retire_partition_slot(slot, &mut slots_by_key, &mut slots, &mut free_slots);
7338 } else {
7339 ready_partition_slot(&mut slots, &mut ready_slots, slot, per_partition);
7340 }
7341 if index == next_to_emit {
7342 next_to_emit += 1;
7343 return Some(result);
7344 }
7345 completed.insert(index, result);
7346 }
7347 }
7348 }))
7349}
7350
7351#[cfg(test)]
7352mod flat_map_merge_ready_ring_tests {
7353 use super::*;
7354 use std::sync::mpsc;
7355 use std::time::Duration;
7356
7357 fn run_sorted<T: Ord + Send + 'static>(source: crate::Source<T>) -> Vec<T> {
7358 let mut v = source.run_collect().unwrap();
7359 v.sort_unstable();
7360 v
7361 }
7362
7363 #[test]
7364 fn ready_ring_empty_upstream() {
7365 let legacy = with_substream_mode(SubstreamExecutorMode::LegacyOnly, || {
7366 run_sorted(crate::Source::<i32>::empty().flat_map_merge(4, crate::Source::single))
7367 });
7368 let ring = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
7369 run_sorted(crate::Source::<i32>::empty().flat_map_merge(4, crate::Source::single))
7370 });
7371 assert_eq!(legacy, ring);
7372 assert!(ring.is_empty());
7373 }
7374
7375 #[test]
7376 fn ready_ring_single_lane() {
7377 let legacy = with_substream_mode(SubstreamExecutorMode::LegacyOnly, || {
7378 run_sorted(crate::Source::single(42_i32).flat_map_merge(4, crate::Source::single))
7379 });
7380 let ring = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
7381 run_sorted(crate::Source::single(42_i32).flat_map_merge(4, crate::Source::single))
7382 });
7383 assert_eq!(legacy, ring);
7384 assert_eq!(ring, vec![42]);
7385 }
7386
7387 #[test]
7388 fn ready_ring_breadth_one_exact_order() {
7389 let make = || {
7391 crate::Source::from_iter(0_i32..5)
7392 .flat_map_merge(1, |x| crate::Source::single(x * 10))
7393 .run_collect()
7394 .unwrap()
7395 };
7396 let mut legacy = with_substream_mode(SubstreamExecutorMode::LegacyOnly, make);
7397 let mut ring = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, make);
7398 legacy.sort_unstable();
7399 ring.sort_unstable();
7400 assert_eq!(legacy, ring);
7401 }
7402
7403 #[test]
7404 fn ready_ring_breadth_gt_input() {
7405 let make = || {
7406 run_sorted(
7407 crate::Source::from_iter(0_i32..3)
7408 .flat_map_merge(100, |x| crate::Source::from_iter([x, x + 1, x + 2])),
7409 )
7410 };
7411 let legacy = with_substream_mode(SubstreamExecutorMode::LegacyOnly, make);
7412 let ring = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, make);
7413 assert_eq!(legacy, ring);
7414 assert_eq!(ring.len(), 9);
7415 }
7416
7417 #[test]
7418 fn ready_ring_mixed_short_long() {
7419 let make = || {
7420 run_sorted(crate::Source::from_iter(0_i32..8).flat_map_merge(4, |x| {
7421 if x % 3 == 0 {
7422 crate::Source::from_iter(0..20_i32).map(move |i| x * 100 + i)
7423 } else {
7424 crate::Source::single(x)
7425 }
7426 }))
7427 };
7428 let legacy = with_substream_mode(SubstreamExecutorMode::LegacyOnly, make);
7429 let ring = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, make);
7430 assert_eq!(legacy, ring);
7431 }
7432
7433 #[test]
7434 fn ready_ring_respects_breadth_bound() {
7435 use std::sync::atomic::{AtomicUsize, Ordering as Ord};
7436 let active = Arc::new(AtomicUsize::new(0));
7437 let max_active = Arc::new(AtomicUsize::new(0));
7438 let a2 = Arc::clone(&active);
7439 let m2 = Arc::clone(&max_active);
7440 let mut values = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
7441 crate::Source::from_iter(0_i32..6)
7442 .flat_map_merge(2, move |item| {
7443 let a = Arc::clone(&a2);
7444 let m = Arc::clone(&m2);
7445 crate::Source::future(move || {
7446 let a = Arc::clone(&a);
7447 let m = Arc::clone(&m);
7448 async move {
7449 let now = a.fetch_add(1, Ord::SeqCst) + 1;
7450 let mut seen = m.load(Ord::SeqCst);
7451 while now > seen {
7452 match m.compare_exchange(seen, now, Ord::SeqCst, Ord::SeqCst) {
7453 Ok(_) => break,
7454 Err(v) => seen = v,
7455 }
7456 }
7457 thread::sleep(Duration::from_millis(20));
7458 a.fetch_sub(1, Ord::SeqCst);
7459 Ok(item)
7460 }
7461 })
7462 })
7463 .run_collect()
7464 .unwrap()
7465 });
7466 values.sort_unstable();
7467 assert_eq!(values, vec![0, 1, 2, 3, 4, 5]);
7468 assert!(max_active.load(std::sync::atomic::Ordering::SeqCst) <= 2);
7469 }
7470
    /// Merges two 50-element lanes with a third lane whose only element comes from a future
    /// that sleeps 10 ms, and checks that the slow lane's element (999) is part of the output
    /// and that all 101 elements arrive.
    #[test]
    fn ready_ring_fairness_slow_lane_not_starved() {
        use std::sync::atomic::{AtomicBool, Ordering as Ord};
        // Set by the slow lane's future once its sleep has finished.
        let slow_emitted = Arc::new(AtomicBool::new(false));
        let slow_flag = Arc::clone(&slow_emitted);
        let results = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
            crate::Source::from_iter(0_i32..3)
                .flat_map_merge(4, move |lane_id| {
                    let flag = Arc::clone(&slow_flag);
                    match lane_id {
                        0 => crate::Source::from_iter(0_i32..50),
                        1 => crate::Source::from_iter(100_i32..150),
                        _ => crate::Source::future(move || {
                            let flag = Arc::clone(&flag);
                            async move {
                                thread::sleep(Duration::from_millis(10));
                                flag.store(true, Ord::SeqCst);
                                Ok(999_i32)
                            }
                        }),
                    }
                })
                .run_collect()
                .unwrap()
        });
        assert!(slow_emitted.load(std::sync::atomic::Ordering::SeqCst));
        assert!(results.contains(&999));
        // 50 + 50 fast elements plus the single slow one.
        assert_eq!(results.len(), 101);
    }
7500
7501 #[test]
7502 fn ready_ring_inner_failure_no_hang() {
7503 let result = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
7504 crate::Source::from_iter(0_i32..8)
7505 .flat_map_merge(4, |x| {
7506 if x == 3 {
7507 crate::Source::failed(StreamError::Failed("lane-fail".into()))
7508 } else {
7509 crate::Source::from_iter(0_i32..10)
7510 }
7511 })
7512 .run_collect()
7513 });
7514 assert_eq!(result, Err(StreamError::Failed("lane-fail".into())));
7515 }
7516
    /// When the substream produced for upstream element 2 is a failed source, the merged run
    /// under `ReadyRingOnly` ends with an error. Only `is_err()` is checked, not the payload.
    #[test]
    fn ready_ring_factory_failure_propagates() {
        let result = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
            crate::Source::from_iter(0_i32..4)
                .flat_map_merge(2, |x| {
                    if x == 2 {
                        crate::Source::failed(StreamError::Failed("factory-fail".into()))
                    } else {
                        crate::Source::single(x)
                    }
                })
                .run_collect()
        });
        assert!(result.is_err());
    }
7532
    /// The substream factory takes a user-owned mutex on every call; the run must still
    /// complete and deliver all 10 elements.
    ///
    /// NOTE(review): per the test name this is meant to show the factory is not invoked while
    /// an internal coordinator lock is held; the test itself only observes completion.
    #[test]
    fn ready_ring_closure_not_under_coordinator_lock() {
        let guard_mutex = Arc::new(std::sync::Mutex::<()>::new(()));
        let gm = Arc::clone(&guard_mutex);
        let results = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
            crate::Source::from_iter(0_i32..10)
                .flat_map_merge(4, move |x| {
                    // Held only for the duration of the factory call.
                    let _lock = gm.lock().unwrap();
                    crate::Source::single(x)
                })
                .run_collect()
                .unwrap()
        });
        assert_eq!(results.len(), 10);
    }
7548
    /// Drives one substream that produces `WINDOW + EXTRA` elements through a queue sink and
    /// checks that every element is delivered and that the producer signalled when it reached
    /// element index `WINDOW`.
    ///
    /// NOTE(review): the `produced` counter is incremented by the producer but never asserted,
    /// and the gate signal is only read after the queue has been fully drained, so this test
    /// does not by itself observe the producer blocking — confirm the intended check.
    #[test]
    fn ready_ring_bounded_memory_producer_blocks_at_window() {
        const WINDOW: usize = FLAT_MAP_MERGE_SUBSTREAM_WINDOW;
        const EXTRA: usize = 4;
        let (gate_tx, gate_rx) = mpsc::channel::<()>();
        // Wrapped in a mutex so the sender can be shared through `Arc` by the factory closures.
        let gate_tx = Arc::new(std::sync::Mutex::new(gate_tx));
        let gate_tx2 = Arc::clone(&gate_tx);
        let produced = Arc::new(std::sync::atomic::AtomicUsize::new(0));
        let prod2 = Arc::clone(&produced);

        let queue = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
            crate::Source::single(0_i32)
                .flat_map_merge(2, move |_| {
                    let tx = Arc::clone(&gate_tx2);
                    let prod = Arc::clone(&prod2);
                    crate::Source::from_factory(move || {
                        let tx = Arc::clone(&tx);
                        let prod = Arc::clone(&prod);
                        let mut i = 0_i32;
                        Box::new(std::iter::from_fn(move || {
                            if i as usize >= WINDOW + EXTRA {
                                return None;
                            }
                            // Signal once, just before producing the element past the window.
                            if i as usize == WINDOW {
                                let _ = tx.lock().unwrap().send(());
                            }
                            prod.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
                            i += 1;
                            Some(Ok(i))
                        }))
                    })
                })
                .run_with(crate::Sink::queue())
                .unwrap()
        });

        let mut total = 0;
        // Drain until the queue reports end of stream.
        while queue.pull().unwrap().is_some() {
            total += 1;
        }
        let signal = gate_rx.recv_timeout(Duration::from_secs(1));
        assert!(signal.is_ok(), "producer never reached the window boundary");
        assert_eq!(total, WINDOW + EXTRA);
    }
7604
    /// Starts four endless (`repeat`) lanes on a dedicated runtime, pulls a few elements, then
    /// drops the queue and shuts the runtime down. There are no assertions; the test passes if
    /// it returns.
    ///
    /// NOTE(review): presumably `rt.shutdown()` would block if lanes were never released after
    /// the drop — confirm against `Runtime::shutdown`.
    #[test]
    fn ready_ring_cancellation_wakes_blocked_lanes() {
        let rt = crate::stream::runtime::Runtime::new();
        let queue = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
            crate::Source::from_iter(0_i32..4)
                .flat_map_merge(4, |_| crate::Source::repeat(1_i32))
                .run_with_materializer(crate::Sink::queue(), &rt)
                .unwrap()
        });
        for _ in 0..8 {
            let _ = queue.pull();
        }
        // Dropping the consumer side is the cancellation under test.
        drop(queue);
        rt.shutdown();
    }
7620
7621 #[test]
7622 fn ready_ring_lost_wakeup_stress() {
7623 for _ in 0..20 {
7624 let result = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
7625 crate::Source::from_iter(0_i32..50)
7626 .flat_map_merge(8, |item| {
7627 crate::Source::from_iter([item, item + 1, item + 2])
7628 })
7629 .run_with(crate::Sink::fold(0i64, |acc, v| acc + v as i64))
7630 .unwrap()
7631 .wait()
7632 });
7633 assert_eq!(result, Ok(3825), "lost-wakeup stress: wrong sum");
7634 }
7635 }
7636
    /// Runs `STREAMS` identical merge pipelines on separate OS threads, released together by a
    /// barrier, for `ROUNDS` rounds, and checks every pipeline folds to `EXPECTED`.
    #[test]
    fn ready_ring_concurrent_streams_lost_wakeup_stress() {
        const STREAMS: usize = 32;
        const ROUNDS: usize = 8;
        // Sum over item in 0..32 of the 20 values item*100 .. item*100 + 20.
        const EXPECTED: i64 = 998_080;

        for _ in 0..ROUNDS {
            let barrier = Arc::new(std::sync::Barrier::new(STREAMS));
            let mut handles = Vec::with_capacity(STREAMS);
            for _ in 0..STREAMS {
                let barrier = Arc::clone(&barrier);
                handles.push(thread::spawn(move || {
                    // Start all pipelines at the same moment.
                    barrier.wait();
                    let result = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
                        crate::Source::from_iter(0_i64..32)
                            .flat_map_merge(8, |item| {
                                crate::Source::from_iter(item * 100..item * 100 + 20)
                            })
                            .run_with(crate::Sink::fold(0i64, |acc, v| acc + v))
                            .unwrap()
                            .wait()
                    });
                    assert_eq!(result, Ok(EXPECTED), "concurrent ready-ring sum");
                }));
            }

            // A failed assertion inside a worker surfaces here as a join error.
            for handle in handles {
                handle.join().expect("ready-ring stress worker panicked");
            }
        }
    }
7668
7669 #[test]
7670 fn ready_ring_tail_loop_stress() {
7671 for _ in 0..20 {
7672 let result = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
7673 crate::Source::from_iter(0_i64..100)
7674 .flat_map_merge(16, |item| crate::Source::from_iter([item, item + 1000]))
7675 .run_with(crate::Sink::fold(0i64, |acc, v| acc + v))
7676 .unwrap()
7677 .wait()
7678 });
7679 assert_eq!(result, Ok(109_900), "tail-loop stress: wrong sum");
7680 }
7681 }
7682
7683 #[test]
7684 fn ready_ring_auto_mode_matches_ring() {
7685 let make = || {
7686 run_sorted(
7687 crate::Source::from_iter(0_i32..10)
7688 .flat_map_merge(4, |x| crate::Source::from_iter([x, x + 100])),
7689 )
7690 };
7691 let auto = with_substream_mode(SubstreamExecutorMode::Auto, make);
7692 let ring = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, make);
7693 assert_eq!(auto, ring);
7694 }
7695}
7696
7697#[cfg(test)]
7699mod inline_micro_source_tests {
7700 use super::*;
7701 use crate::stream::source::test_source_with_inline_micro_hint;
7702 use std::sync::mpsc;
7703
7704 fn run_sorted<T: Ord + Send + 'static>(source: crate::Source<T>) -> Vec<T> {
7705 let mut v = source.run_collect().unwrap();
7706 v.sort_unstable();
7707 v
7708 }
7709
7710 #[test]
7713 fn inline_empty_upstream() {
7714 let legacy = with_substream_mode(SubstreamExecutorMode::LegacyOnly, || {
7715 run_sorted(crate::Source::<i32>::empty().flat_map_merge(4, crate::Source::single))
7716 });
7717 let ring = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
7718 run_sorted(crate::Source::<i32>::empty().flat_map_merge(4, crate::Source::single))
7719 });
7720 assert_eq!(legacy, ring);
7721 assert!(ring.is_empty());
7722 }
7723
7724 #[test]
7725 fn inline_single_inner_source() {
7726 let legacy = with_substream_mode(SubstreamExecutorMode::LegacyOnly, || {
7727 run_sorted(
7728 crate::Source::single(99_i32).flat_map_merge(4, |x| crate::Source::single(x * 2)),
7729 )
7730 });
7731 let ring = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
7732 run_sorted(
7733 crate::Source::single(99_i32).flat_map_merge(4, |x| crate::Source::single(x * 2)),
7734 )
7735 });
7736 assert_eq!(legacy, ring);
7737 assert_eq!(ring, vec![198]);
7738 }
7739
7740 #[test]
7741 fn inline_breadth_one_exact_order() {
7742 let make = || {
7744 crate::Source::from_iter(0_i32..6)
7745 .flat_map_merge(1, |x| {
7746 crate::Source::from_iter([x * 10, x * 10 + 1, x * 10 + 2, x * 10 + 3])
7747 })
7748 .run_collect()
7749 .unwrap()
7750 };
7751 let mut legacy = with_substream_mode(SubstreamExecutorMode::LegacyOnly, make);
7752 let mut ring = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, make);
7753 legacy.sort_unstable();
7754 ring.sort_unstable();
7755 assert_eq!(legacy, ring);
7756 }
7757
7758 #[test]
7759 fn inline_breadth_gt_input() {
7760 let make = || {
7761 run_sorted(
7762 crate::Source::from_iter(0_i32..3)
7763 .flat_map_merge(100, |x| crate::Source::from_iter([x, x + 1, x + 2, x + 3])),
7764 )
7765 };
7766 let legacy = with_substream_mode(SubstreamExecutorMode::LegacyOnly, make);
7767 let ring = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, make);
7768 assert_eq!(legacy, ring);
7769 assert_eq!(ring.len(), 12);
7770 }
7771
7772 #[test]
7773 fn inline_mixed_empty_single_four_item() {
7774 let make = || {
7775 run_sorted(
7776 crate::Source::from_iter(0_i32..12).flat_map_merge(4, |x| match x % 3 {
7777 0 => crate::Source::empty(),
7778 1 => crate::Source::single(x),
7779 _ => crate::Source::from_iter([x, x + 100, x + 200, x + 300]),
7780 }),
7781 )
7782 };
7783 let legacy = with_substream_mode(SubstreamExecutorMode::LegacyOnly, make);
7784 let ring = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, make);
7785 assert_eq!(legacy, ring);
7786 }
7787
7788 #[test]
7789 fn inline_2k_x4_b8_benchmark_shape() {
7790 let make = || {
7791 run_sorted(
7792 crate::Source::from_iter(0_i32..2_000).flat_map_merge(8, |item| {
7793 crate::Source::from_iter([item, item + 1, item + 2, item + 3])
7794 }),
7795 )
7796 };
7797 let legacy = with_substream_mode(SubstreamExecutorMode::LegacyOnly, make);
7798 let ring = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, make);
7799 assert_eq!(legacy, ring);
7800 assert_eq!(ring.len(), 8_000);
7801 }
7802
7803 #[test]
7806 fn inline_lost_wakeup_stress() {
7807 for _ in 0..20 {
7808 let result = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
7809 crate::Source::from_iter(0_i32..50)
7810 .flat_map_merge(8, |item| {
7811 crate::Source::from_iter([item, item + 1, item + 2, item + 3])
7812 })
7813 .run_with(crate::Sink::fold(0i64, |acc, v| acc + v as i64))
7814 .unwrap()
7815 .wait()
7816 });
7817 assert_eq!(result, Ok(5200), "lost-wakeup stress: wrong sum");
7821 }
7822 }
7823
7824 #[test]
7825 fn inline_tail_loop_stress() {
7826 for _ in 0..20 {
7827 let result = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
7828 crate::Source::from_iter(0_i64..100)
7829 .flat_map_merge(16, |item| crate::Source::from_iter([item, item + 1000]))
7830 .run_with(crate::Sink::fold(0i64, |acc, v| acc + v))
7831 .unwrap()
7832 .wait()
7833 });
7834 assert_eq!(result, Ok(109_900), "tail-loop stress: wrong sum");
7836 }
7837 }
7838
7839 #[test]
7842 fn inline_large_source_uses_worker_fallback() {
7843 let make = || {
7846 run_sorted(crate::Source::from_iter(0_i32..4).flat_map_merge(2, |x| {
7847 crate::Source::from_iter(0_i32..20).map(move |i| x * 100 + i)
7848 }))
7849 };
7850 let legacy = with_substream_mode(SubstreamExecutorMode::LegacyOnly, make);
7851 let ring = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, make);
7852 assert_eq!(legacy, ring);
7853 assert_eq!(ring.len(), 80);
7854 }
7855
7856 #[test]
7859 fn inline_inner_error_before_any_item() {
7860 let result = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
7861 crate::Source::from_iter(0_i32..4)
7862 .flat_map_merge(2, |x| {
7863 if x == 1 {
7864 crate::Source::failed(StreamError::Failed("inline-err".into()))
7865 } else {
7866 crate::Source::from_iter([x, x + 1])
7867 }
7868 })
7869 .run_collect()
7870 });
7871 assert_eq!(result, Err(StreamError::Failed("inline-err".into())));
7872 }
7873
    /// A substream that yields one element (42) and then an error makes the merged run end
    /// with an error. Only `is_err()` is checked, not the payload.
    #[test]
    fn inline_inner_error_after_some_items() {
        let result = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
            crate::Source::single(0_i32)
                .flat_map_merge(2, |_x| {
                    // NOTE(review): the second argument is presumably the inline-micro size
                    // hint — confirm against `test_source_with_inline_micro_hint`.
                    test_source_with_inline_micro_hint(
                        || {
                            let mut count = 0;
                            Box::new(std::iter::from_fn(move || {
                                count += 1;
                                match count {
                                    1 => Some(Ok(42_i32)),
                                    2 => Some(Err(StreamError::Failed("after-items".into()))),
                                    _ => None,
                                }
                            }))
                        },
                        1,
                    )
                })
                .run_collect()
        });
        assert!(result.is_err());
    }
7904
    /// Mixes three-element lanes (even inputs), 40-element mapped lanes (odd inputs) and one
    /// failed lane (input 5); the merged run must end with an error.
    #[test]
    fn inline_worker_lane_fails_during_inline_drain() {
        let result = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
            crate::Source::from_iter(0_i32..10)
                .flat_map_merge(2, |x| {
                    if x == 5 {
                        crate::Source::failed(StreamError::Failed("worker-fail".into()))
                    } else if x % 2 == 0 {
                        crate::Source::from_iter([x, x + 1, x + 2])
                    } else {
                        crate::Source::from_iter(0_i32..40).map(move |i| x * 100 + i)
                    }
                })
                .run_collect()
        });
        assert!(result.is_err());
    }
7926
    /// Mixes four-element lanes with endless (`repeat`) lanes on a dedicated runtime, pulls 16
    /// elements, then drops the queue and shuts the runtime down. There are no assertions; the
    /// test passes if it returns.
    #[test]
    fn inline_cancellation_drop_output() {
        let rt = crate::stream::runtime::Runtime::new();
        let queue = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
            crate::Source::from_iter(0_i32..100)
                .flat_map_merge(8, |x| {
                    if x % 3 == 0 {
                        crate::Source::from_iter([x, x + 1, x + 2, x + 3])
                    } else {
                        crate::Source::repeat(x)
                    }
                })
                .run_with_materializer(crate::Sink::queue(), &rt)
                .unwrap()
        });
        for _ in 0..16 {
            let _ = queue.pull();
        }
        // Dropping the consumer side is the cancellation under test.
        drop(queue);
        rt.shutdown();
    }
7953
    /// Runs a 40-element lane next to a hinted one-element lane whose `next()` sends on a
    /// channel, then checks the signal was sent and that all 41 elements (including 999) were
    /// collected.
    ///
    /// NOTE(review): per the test name this is meant to show the inline lane's `next()` is not
    /// called under an internal coordinator lock; the test itself only observes that `next()`
    /// ran and the run completed.
    #[test]
    fn inline_next_not_under_coordinator_lock() {
        let (tx, rx) = mpsc::channel::<()>();
        // Wrapped in a mutex so the sender can be shared through `Arc` by the factory closures.
        let tx = Arc::new(std::sync::Mutex::new(tx));
        let tx2 = Arc::clone(&tx);

        let results = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
            crate::Source::from_iter(0_i32..2)
                .flat_map_merge(2, move |x| {
                    let sig = Arc::clone(&tx2);
                    if x == 0 {
                        crate::Source::from_iter(0_i32..40)
                    } else {
                        test_source_with_inline_micro_hint(
                            move || {
                                let sig = Arc::clone(&sig);
                                let mut emitted = false;
                                Box::new(std::iter::from_fn(move || {
                                    if !emitted {
                                        emitted = true;
                                        // Record that `next()` was actually invoked.
                                        let _ = sig.lock().unwrap().send(());
                                        Some(Ok(999_i32))
                                    } else {
                                        None
                                    }
                                }))
                            },
                            1,
                        )
                    }
                })
                .run_collect()
                .unwrap()
        });

        let signal = rx.recv_timeout(std::time::Duration::from_secs(5));
        assert!(signal.is_ok(), "inline next() never ran");
        assert!(results.contains(&999));
        assert_eq!(results.len(), 41);
    }
8007
8008 #[test]
8011 fn inline_auto_matches_readyring() {
8012 let make = || {
8013 run_sorted(
8014 crate::Source::from_iter(0_i32..20)
8015 .flat_map_merge(4, |x| crate::Source::from_iter([x, x + 1, x + 2, x + 3])),
8016 )
8017 };
8018 let auto = with_substream_mode(SubstreamExecutorMode::Auto, make);
8019 let ring = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, make);
8020 assert_eq!(auto, ring);
8021 }
8022}
8023
8024#[cfg(test)]
8026mod split_sink_fast_path_tests {
8027 use super::*;
8028
    /// Splits `items` at multiples of 10 using `split_mode` (`split_when` or `split_after`),
    /// runs every resulting segment through a summing fold, and returns the per-segment sums
    /// in segment order. The whole pipeline runs under `executor_mode`.
    ///
    /// Panics if any materialization or completion fails.
    fn run_split_fold(
        items: Vec<u64>,
        split_mode: SplitMode,
        executor_mode: SubstreamExecutorMode,
    ) -> Vec<u64> {
        with_substream_mode(executor_mode, || {
            let source = match split_mode {
                SplitMode::When => crate::Source::from_iter(items).split_when(|x| x % 10 == 0),
                SplitMode::After => crate::Source::from_iter(items).split_after(|x| x % 10 == 0),
            };
            source
                .run_with(crate::Sink::fold(
                    Vec::new(),
                    |mut acc, seg: crate::Source<u64>| {
                        // Each segment is itself a source and is drained before the next one.
                        let sum = seg
                            .run_with(crate::Sink::fold(0u64, |a, x| a + x))
                            .unwrap()
                            .wait()
                            .unwrap();
                        acc.push(sum);
                        acc
                    },
                ))
                .unwrap()
                .wait()
                .unwrap()
        })
    }
8058
    /// Splits `items` at multiples of 10 using `split_mode` (`split_when` or `split_after`),
    /// collects every resulting segment into a `Vec`, and returns the segments in order. The
    /// whole pipeline runs under `executor_mode`.
    ///
    /// Panics if any materialization or completion fails.
    fn run_split_collect_segments(
        items: Vec<u64>,
        split_mode: SplitMode,
        executor_mode: SubstreamExecutorMode,
    ) -> Vec<Vec<u64>> {
        with_substream_mode(executor_mode, || {
            let source = match split_mode {
                SplitMode::When => crate::Source::from_iter(items).split_when(move |x| x % 10 == 0),
                SplitMode::After => {
                    crate::Source::from_iter(items).split_after(move |x| x % 10 == 0)
                }
            };
            source
                .run_with(crate::Sink::fold(
                    Vec::new(),
                    |mut acc, seg: crate::Source<u64>| {
                        // Each segment is itself a source and is drained before the next one.
                        let v = seg
                            .run_with(crate::Sink::collect())
                            .unwrap()
                            .wait()
                            .unwrap();
                        acc.push(v);
                        acc
                    },
                ))
                .unwrap()
                .wait()
                .unwrap()
        })
    }
8090
8091 #[test]
8094 fn split_fast_equivalence_empty_input() {
8095 for sm in [SplitMode::When, SplitMode::After] {
8096 let legacy = run_split_collect_segments(vec![], sm, SubstreamExecutorMode::LegacyOnly);
8097 let fast = run_split_collect_segments(vec![], sm, SubstreamExecutorMode::SplitSinkOnly);
8098 assert_eq!(legacy, fast, "empty input, mode {sm:?}");
8099 }
8100 }
8101
8102 #[test]
8103 fn split_fast_equivalence_no_boundaries() {
8104 let items: Vec<u64> = (1..=9).collect();
8105 for sm in [SplitMode::When, SplitMode::After] {
8106 let legacy =
8107 run_split_collect_segments(items.clone(), sm, SubstreamExecutorMode::LegacyOnly);
8108 let fast =
8109 run_split_collect_segments(items.clone(), sm, SubstreamExecutorMode::SplitSinkOnly);
8110 assert_eq!(legacy, fast, "no boundaries, mode {sm:?}");
8111 }
8112 }
8113
8114 #[test]
8115 fn split_fast_equivalence_first_element_boundary_when() {
8116 let items: Vec<u64> = vec![10, 1, 2, 3];
8118 let legacy = run_split_collect_segments(
8119 items.clone(),
8120 SplitMode::When,
8121 SubstreamExecutorMode::LegacyOnly,
8122 );
8123 let fast = run_split_collect_segments(
8124 items,
8125 SplitMode::When,
8126 SubstreamExecutorMode::SplitSinkOnly,
8127 );
8128 assert_eq!(legacy, fast);
8129 }
8130
8131 #[test]
8132 fn split_fast_equivalence_first_element_boundary_after() {
8133 let items: Vec<u64> = vec![10, 1, 2, 3];
8135 let legacy = run_split_collect_segments(
8136 items.clone(),
8137 SplitMode::After,
8138 SubstreamExecutorMode::LegacyOnly,
8139 );
8140 let fast = run_split_collect_segments(
8141 items,
8142 SplitMode::After,
8143 SubstreamExecutorMode::SplitSinkOnly,
8144 );
8145 assert_eq!(legacy, fast);
8146 }
8147
8148 #[test]
8149 fn split_fast_equivalence_consecutive_matches() {
8150 let items: Vec<u64> = vec![10, 20, 30, 1, 2, 40];
8151 for sm in [SplitMode::When, SplitMode::After] {
8152 let legacy =
8153 run_split_collect_segments(items.clone(), sm, SubstreamExecutorMode::LegacyOnly);
8154 let fast =
8155 run_split_collect_segments(items.clone(), sm, SubstreamExecutorMode::SplitSinkOnly);
8156 assert_eq!(legacy, fast, "consecutive matches, mode {sm:?}");
8157 }
8158 }
8159
8160 #[test]
8161 fn split_fast_equivalence_last_element_boundary() {
8162 let items: Vec<u64> = vec![1, 2, 3, 10];
8163 for sm in [SplitMode::When, SplitMode::After] {
8164 let legacy =
8165 run_split_collect_segments(items.clone(), sm, SubstreamExecutorMode::LegacyOnly);
8166 let fast =
8167 run_split_collect_segments(items.clone(), sm, SubstreamExecutorMode::SplitSinkOnly);
8168 assert_eq!(legacy, fast, "last element boundary, mode {sm:?}");
8169 }
8170 }
8171
8172 #[test]
8173 fn split_fast_equivalence_mixed() {
8174 let items: Vec<u64> = (0..50u64).collect();
8175 for sm in [SplitMode::When, SplitMode::After] {
8176 let legacy =
8177 run_split_collect_segments(items.clone(), sm, SubstreamExecutorMode::LegacyOnly);
8178 let fast =
8179 run_split_collect_segments(items.clone(), sm, SubstreamExecutorMode::SplitSinkOnly);
8180 assert_eq!(legacy, fast, "mixed 0..50, mode {sm:?}");
8181 }
8182 }
8183
8184 #[test]
8185 fn split_fast_equivalence_fold_sums() {
8186 let items: Vec<u64> = (0..50u64).collect();
8187 for sm in [SplitMode::When, SplitMode::After] {
8188 let legacy = run_split_fold(items.clone(), sm, SubstreamExecutorMode::LegacyOnly);
8189 let fast = run_split_fold(items.clone(), sm, SubstreamExecutorMode::SplitSinkOnly);
8190 assert_eq!(legacy, fast, "fold sums, mode {sm:?}");
8191 }
8192 }
8193
8194 #[test]
8195 fn split_fast_equivalence_with_collect() {
8196 let items: Vec<u64> = (0..312u64).collect();
8198 for sm in [SplitMode::When, SplitMode::After] {
8199 let legacy =
8200 run_split_collect_segments(items.clone(), sm, SubstreamExecutorMode::LegacyOnly);
8201 let fast =
8202 run_split_collect_segments(items.clone(), sm, SubstreamExecutorMode::SplitSinkOnly);
8203 assert_eq!(legacy, fast, "collect 312 items, mode {sm:?}");
8204 }
8205 }
8206
    /// Per-segment sums computed with the fallible `Sink::fold_result` are identical under
    /// `LegacyOnly` and `SplitSinkOnly` for `split_when` over 0..50.
    #[test]
    fn split_fast_fold_result_equivalence() {
        let items: Vec<u64> = (0..50u64).collect();
        // Runs the same pipeline under the given executor mode and returns the segment sums.
        let run = |executor_mode| {
            with_substream_mode(executor_mode, || {
                crate::Source::from_iter(items.clone())
                    .split_when(move |x| x % 10 == 0)
                    .run_with(crate::Sink::fold(
                        Vec::new(),
                        |mut acc, seg: crate::Source<u64>| {
                            let sum = seg
                                .run_with(crate::Sink::fold_result(0u64, |a, x| Ok(a + x)))
                                .unwrap()
                                .wait()
                                .unwrap();
                            acc.push(sum);
                            acc
                        },
                    ))
                    .unwrap()
                    .wait()
                    .unwrap()
            })
        };
        assert_eq!(
            run(SubstreamExecutorMode::LegacyOnly),
            run(SubstreamExecutorMode::SplitSinkOnly)
        );
    }
8238
    /// Draining every segment with `Sink::ignore` visits the same number of segments under
    /// `LegacyOnly` and `SplitSinkOnly` for `split_when` over 0..50.
    #[test]
    fn split_fast_ignore_equivalence() {
        let items: Vec<u64> = (0..50u64).collect();
        // Runs the pipeline under the given executor mode and returns the segment count.
        let run = |executor_mode| {
            with_substream_mode(executor_mode, || {
                crate::Source::from_iter(items.clone())
                    .split_when(move |x| x % 10 == 0)
                    .run_with(crate::Sink::fold(0u64, |count, seg: crate::Source<u64>| {
                        seg.run_with(crate::Sink::ignore()).unwrap().wait().unwrap();
                        count + 1
                    }))
                    .unwrap()
                    .wait()
                    .unwrap()
            })
        };
        let legacy = run(SubstreamExecutorMode::LegacyOnly);
        let fast = run(SubstreamExecutorMode::SplitSinkOnly);
        assert_eq!(legacy, fast, "ignore segment counts must match");
    }
8261
    /// Inside the outer fold, materializes a clone of a segment and then the segment itself,
    /// asserting that the first materialization succeeds and the second fails.
    ///
    /// NOTE(review): the outer `run_with` result is discarded without being unwrapped or
    /// waited on. If the fold closure runs asynchronously, or a panic inside it is turned into
    /// a stream error, a failing inner assertion may not fail this test — confirm.
    #[test]
    fn split_fast_one_shot_cannot_materialize_twice() {
        with_substream_mode(SubstreamExecutorMode::SplitSinkOnly, || {
            let materializer = crate::Runtime::default();
            let result = crate::Source::from_iter(1u64..=5)
                .split_when(|x| x % 3 == 0)
                .run_with(crate::Sink::fold(0u64, |_, seg: crate::Source<u64>| {
                    let c1 = seg.clone().run_with(crate::Sink::fold(0u64, |a, x| a + x));
                    let c2 = seg.run_with(crate::Sink::fold(0u64, |a, x| a + x));
                    assert!(c1.is_ok(), "first materialization should succeed");
                    assert!(c2.is_err(), "second materialization should fail: {c2:?}");
                    let _ = c1.unwrap().wait();
                    0u64
                }));
            let _ = result;
            // Only referenced so the otherwise unused runtime stays alive until here.
            let _ = &materializer;
        });
    }
8284
    /// Runs both split modes under `SplitSinkOnly` with a predicate that panics on element 5.
    /// The outcome is discarded: the test has no assertions and passes as long as the run
    /// returns instead of hanging or unwinding out of the test.
    #[test]
    fn split_fast_predicate_panic_both_modes() {
        for sm in [SplitMode::When, SplitMode::After] {
            let result = with_substream_mode(SubstreamExecutorMode::SplitSinkOnly, || {
                let source = match sm {
                    SplitMode::When => crate::Source::from_iter(0u64..10).split_when(|x| {
                        if *x == 5 {
                            panic!("test panic");
                        }
                        x % 3 == 0
                    }),
                    SplitMode::After => crate::Source::from_iter(0u64..10).split_after(|x| {
                        if *x == 5 {
                            panic!("test panic");
                        }
                        x % 3 == 0
                    }),
                };
                source
                    .run_with(crate::Sink::fold(
                        Vec::<u64>::new(),
                        |mut acc, seg: crate::Source<u64>| {
                            let completion = seg.run_with(crate::Sink::ignore());
                            // A segment may fail to materialize or complete; both are tolerated.
                            if let Ok(c) = completion {
                                let _ = c.wait();
                            }
                            acc.push(0u64);
                            acc
                        },
                    ))
                    .map(|c| c.wait())
            });
            let _ = result;
        }
    }
8326
8327 #[test]
8330 fn split_fast_stress_20x() {
8331 for i in 0..20 {
8332 let items: Vec<u64> = (0..10_000u64).collect();
8333 for sm in [SplitMode::When, SplitMode::After] {
8334 let fast = run_split_collect_segments(
8335 items.clone(),
8336 sm,
8337 SubstreamExecutorMode::SplitSinkOnly,
8338 );
8339 let legacy = run_split_collect_segments(
8340 items.clone(),
8341 sm,
8342 SubstreamExecutorMode::LegacyOnly,
8343 );
8344 assert_eq!(
8345 fast.len(),
8346 legacy.len(),
8347 "stress run {i} segment count mismatch, mode {sm:?}"
8348 );
8349 assert_eq!(
8350 fast.iter().flatten().sum::<u64>(),
8351 legacy.iter().flatten().sum::<u64>(),
8352 "stress run {i} sum mismatch, mode {sm:?}"
8353 );
8354 }
8355 }
8356 }
8357
8358 #[test]
8361 fn split_fast_auto_mode_matches_fast() {
8362 let items: Vec<u64> = (0..50u64).collect();
8363 for sm in [SplitMode::When, SplitMode::After] {
8364 let auto = run_split_collect_segments(items.clone(), sm, SubstreamExecutorMode::Auto);
8365 let fast =
8366 run_split_collect_segments(items.clone(), sm, SubstreamExecutorMode::SplitSinkOnly);
8367 assert_eq!(auto, fast, "auto == fast, mode {sm:?}");
8368 }
8369 }
8370
    /// Consumes every segment of `split_when` over 0..30 with `Sink::foreach`, accumulating
    /// the elements into a shared atomic, and checks that three segments were seen and that
    /// their elements add up to the sum of 0..30.
    ///
    /// NOTE(review): per the test name, `Sink::foreach` is expected to take a fallback path
    /// rather than the split-sink fast path; the test only checks results — confirm.
    #[test]
    fn split_fast_fallback_path_via_foreach() {
        use std::sync::atomic::{AtomicU64, Ordering as Ord};
        let total = Arc::new(AtomicU64::new(0));
        let total2 = Arc::clone(&total);
        let result = with_substream_mode(SubstreamExecutorMode::SplitSinkOnly, || {
            crate::Source::from_iter(0u64..30)
                .split_when(|x| x % 10 == 0)
                .run_with(crate::Sink::fold(
                    Vec::new(),
                    move |mut acc, seg: crate::Source<u64>| {
                        let t = Arc::clone(&total2);
                        seg.run_with(crate::Sink::foreach(move |x| {
                            t.fetch_add(x, Ord::SeqCst);
                        }))
                        .unwrap()
                        .wait()
                        .unwrap();
                        // One marker per segment; only the count is asserted below.
                        acc.push(1u64);
                        acc
                    },
                ))
                .unwrap()
                .wait()
                .unwrap()
        });
        assert_eq!(result.len(), 3, "should have 3 segments");
        assert_eq!(
            total.load(std::sync::atomic::Ordering::SeqCst),
            (0..30u64).sum::<u64>()
        );
    }
8410
/// Checks that splitting `0..30` with `SplitMode::When` completes and yields the
/// same number of segments under `SubstreamExecutorMode::SplitSinkOnly` as under
/// `SubstreamExecutorMode::LegacyOnly`.
#[test]
fn split_fast_liveness_segment_count_when() {
    let input: Vec<u64> = (0..30u64).collect();
    let legacy_input = input.clone();

    // Fast path first, then the legacy path on an identical copy of the input.
    let fast_segments =
        run_split_collect_segments(input, SplitMode::When, SubstreamExecutorMode::SplitSinkOnly);
    let legacy_segments = run_split_collect_segments(
        legacy_input,
        SplitMode::When,
        SubstreamExecutorMode::LegacyOnly,
    );

    assert_eq!(fast_segments.len(), legacy_segments.len());
}
8425
/// Checks that splitting `0..30` with `SplitMode::After` completes and yields the
/// same number of segments under `SubstreamExecutorMode::SplitSinkOnly` as under
/// `SubstreamExecutorMode::LegacyOnly`.
#[test]
fn split_fast_liveness_segment_count_after() {
    let items: Vec<u64> = (0..30u64).collect();

    // Array `map` runs the closure in element order: fast path first, then legacy.
    let [fast_len, legacy_len] = [
        SubstreamExecutorMode::SplitSinkOnly,
        SubstreamExecutorMode::LegacyOnly,
    ]
    .map(|exec_mode| run_split_collect_segments(items.clone(), SplitMode::After, exec_mode).len());

    assert_eq!(fast_len, legacy_len);
}
8438
/// Checks that, under `SubstreamExecutorMode::SplitSinkOnly`, the producer of a
/// split stream does not run more than `MAX_IN_FLIGHT` items ahead of the
/// consumer of the segment.
///
/// A source yields `TOTAL` (`2 * LIVE_SUBSTREAM_CAPACITY`) items and counts each
/// one it produces. The split predicate is always `false`, and the test expects
/// exactly one segment. That segment is drained by a `Sink::foreach` which counts
/// consumption, raises a flag whenever `produced > consumed + MAX_IN_FLIGHT`, and
/// forwards each item over an mpsc channel. The test thread receives every item
/// with a 10 s timeout per item (a timeout is reported as a deadlock), then
/// asserts that the flag was never raised, that one segment was seen, and that
/// all items arrived in order.
#[test]
fn split_fast_bounded_memory_rendezvous() {
    use std::sync::{
        atomic::{AtomicBool, AtomicUsize, Ordering},
        mpsc,
    };
    use std::time::Duration;

    const CAPACITY: usize = LIVE_SUBSTREAM_CAPACITY;
    const BATCH: usize = LIVE_SUBSTREAM_BATCH;
    const TOTAL: usize = CAPACITY * 2;
    // NOTE(review): presumably the live-substream buffer capacity plus one batch
    // in transit — confirm against the definitions of the two constants.
    const MAX_IN_FLIGHT: usize = CAPACITY + BATCH;

    let bound_violated = Arc::new(AtomicBool::new(false));
    let (item_tx, item_rx) = mpsc::channel::<u64>();

    // Shared counters: one handle for the source factory, the rest for the fold.
    let fold_produced = Arc::new(AtomicUsize::new(0));
    let factory_produced = Arc::clone(&fold_produced);
    let fold_consumed = Arc::new(AtomicUsize::new(0));
    let fold_violation = Arc::clone(&bound_violated);

    let stream_thread = std::thread::spawn(move || {
        with_substream_mode(SubstreamExecutorMode::SplitSinkOnly, || {
            // Source of 0..TOTAL that bumps the `produced` counter per yielded item.
            let source = crate::Source::from_factory(move || {
                let counter = Arc::clone(&factory_produced);
                Box::new((0..TOTAL as u64).map(move |val| {
                    counter.fetch_add(1, Ordering::SeqCst);
                    Ok(val)
                }))
            });

            source
                .split_when(|_| false)
                .run_with(crate::Sink::fold(
                    0usize,
                    move |count, seg: crate::Source<u64>| {
                        let consumed = Arc::clone(&fold_consumed);
                        let violation = Arc::clone(&fold_violation);
                        let produced = Arc::clone(&fold_produced);
                        let sender = item_tx.clone();
                        seg.run_with(crate::Sink::foreach(move |x: u64| {
                            // Count this item as consumed, then compare against
                            // how many the source has produced so far.
                            let c = consumed.fetch_add(1, Ordering::SeqCst) + 1;
                            let p = produced.load(Ordering::SeqCst);
                            if p > c + MAX_IN_FLIGHT {
                                violation.store(true, Ordering::SeqCst);
                            }
                            // A send error is ignored here; the receiving side
                            // reports missing items itself.
                            let _ = sender.send(x);
                        }))
                        .unwrap()
                        .wait()
                        .unwrap();
                        count + 1
                    },
                ))
                .unwrap()
                .wait()
                .unwrap()
        })
    });

    // Pull exactly TOTAL items; a stalled or prematurely finished stream fails
    // the test with a specific message instead of hanging.
    let received: Vec<u64> = (0..TOTAL)
        .map(|i| match item_rx.recv_timeout(Duration::from_secs(10)) {
            Ok(item) => item,
            Err(mpsc::RecvTimeoutError::Timeout) => {
                panic!("deadlock: no item {i} within 10 s")
            }
            Err(mpsc::RecvTimeoutError::Disconnected) => {
                panic!("stream ended early at item {i}")
            }
        })
        .collect();

    let seg_count = stream_thread.join().expect("stream thread panicked");

    assert!(
        !bound_violated.load(Ordering::SeqCst),
        "bound violated: producer ran >MAX_IN_FLIGHT={MAX_IN_FLIGHT} ahead of consumer"
    );

    assert_eq!(seg_count, 1, "expected exactly 1 segment");
    assert_eq!(received.len(), TOTAL, "not all items received");
    let expected: Vec<u64> = (0..TOTAL as u64).collect();
    assert_eq!(received, expected, "items not in correct order");
}
8546}