1use super::*;
2use crate::Attributes;
3use crate::context::FlowWithContext;
4use crate::stream::error::{decide_supervision, panic_stream_error};
5use futures::{FutureExt, task::noop_waker};
6use std::any::Any;
7use std::task::Context;
8use std::{
9 collections::{HashMap, HashSet},
10 thread,
11};
12
13pub(super) enum FlowTransform<In, Out> {
14 Pure(PureTransform<In, Out>),
15 Runtime(RuntimeTransform<In, Out>),
16}
17
/// A stream-processing stage that turns a stream of `In` elements into a stream of
/// `Out` elements and produces a `Mat` value when materialized.
///
/// Cloning is cheap: only `Arc` handles and the attributes are cloned.
pub struct Flow<In, Out, Mat = NotUsed> {
    /// The element transformation, either pure or requiring a `Materializer`.
    pub(super) transform: FlowTransform<In, Out>,
    /// Produces this flow's materialized value.
    /// NOTE(review): presumably invoked once per materialization — confirm with the materializer.
    pub(super) materialize: Arc<dyn Fn() -> StreamResult<Mat> + Send + Sync>,
    /// Optimisation hints combined across `via`/`via_mat` (see `FlowHints`).
    pub(super) hints: FlowHints,
    /// Stage attributes such as the name set by `Flow::named`.
    pub(super) attributes: Attributes,
}
24
25#[derive(Clone, Copy, Debug, PartialEq, Eq)]
26pub(super) enum GroupByBatchMode {
27 Immediate,
28 FiniteEagerNoRecreate,
29}
30
31#[derive(Clone)]
32pub struct BidiFlow<I1, O1, I2, O2> {
33 top: Flow<I1, O1, NotUsed>,
34 bottom: Flow<I2, O2, NotUsed>,
35 attributes: Attributes,
36}
37
38impl<In, Out> Clone for FlowTransform<In, Out> {
39 fn clone(&self) -> Self {
40 match self {
41 Self::Pure(transform) => Self::Pure(Arc::clone(transform)),
42 Self::Runtime(transform) => Self::Runtime(Arc::clone(transform)),
43 }
44 }
45}
46
47impl<In, Out, Mat> Clone for Flow<In, Out, Mat> {
48 fn clone(&self) -> Self {
49 Self {
50 transform: self.transform.clone(),
51 materialize: Arc::clone(&self.materialize),
52 hints: self.hints,
53 attributes: self.attributes.clone(),
54 }
55 }
56}
57
58fn call_supervised<T, F>(context: &str, f: F) -> StreamResult<T>
59where
60 F: FnOnce() -> StreamResult<T>,
61{
62 catch_unwind(AssertUnwindSafe(f)).unwrap_or_else(|_| Err(panic_stream_error(context)))
63}
64
65impl<T: Send + 'static> Flow<T, T, NotUsed> {
66 #[must_use]
67 pub fn identity() -> Self {
68 Self::from_preserving_transform(|input| input)
69 }
70}
71
impl<In: Send + 'static, Out: Send + 'static> Flow<In, Out, NotUsed> {
    /// Wraps a pure stream-to-stream `transform` in a flow that materializes to
    /// `NotUsed` and carries default [`FlowHints`].
    pub(crate) fn from_transform<F>(transform: F) -> Self
    where
        F: Fn(BoxStream<In>) -> BoxStream<Out> + Send + Sync + 'static,
    {
        Self::from_parts_with_hints(transform, || Ok(NotUsed), FlowHints::default())
    }

    /// Like [`Flow::from_transform`], but tags the flow with
    /// `FlowHints::PRESERVES_INLINE_HEAD_TERMINAL`.
    ///
    /// NOTE(review): the hint's exact contract is defined by `FlowHints`, not here. In
    /// this view it is used for `identity` and `filter` — confirm a new stage satisfies
    /// it before using this constructor.
    pub(crate) fn from_preserving_transform<F>(transform: F) -> Self
    where
        F: Fn(BoxStream<In>) -> BoxStream<Out> + Send + Sync + 'static,
    {
        Self::from_parts_with_hints(
            transform,
            || Ok(NotUsed),
            FlowHints::PRESERVES_INLINE_HEAD_TERMINAL,
        )
    }

    /// Wraps a transform that receives the [`Materializer`] and may fail when building
    /// its output stream. The flow materializes to `NotUsed` with default hints.
    pub(crate) fn from_runtime_transform<F>(transform: F) -> Self
    where
        F: Fn(BoxStream<In>, &Materializer) -> StreamResult<BoxStream<Out>> + Send + Sync + 'static,
    {
        Self::from_runtime_transform_with_hints(transform, FlowHints::default())
    }

    /// Constructor behind [`Flow::from_runtime_transform`] that takes explicit `hints`.
    fn from_runtime_transform_with_hints<F>(transform: F, hints: FlowHints) -> Self
    where
        F: Fn(BoxStream<In>, &Materializer) -> StreamResult<BoxStream<Out>> + Send + Sync + 'static,
    {
        Self {
            transform: FlowTransform::Runtime(Arc::new(transform)),
            materialize: Arc::new(|| Ok(NotUsed)),
            hints,
            attributes: Attributes::default(),
        }
    }

    /// Builds a flow that feeds its input into `sink` and emits the elements of `source`,
    /// with the two sides not coupled.
    ///
    /// On every materialization the sink is run on the incoming stream and the source is
    /// created. Each side's materialized value is held in its own keepalive:
    /// - the sink's is released when the sink's input stream is dropped;
    /// - the source's is released when the output stream ends, yields an error, or is
    ///   dropped.
    ///
    /// Termination of one side does not end the other. Errors from running the sink or
    /// creating the source fail that materialization.
    #[must_use]
    pub fn from_sink_and_source<InMat, OutMat>(
        sink: Sink<In, InMat>,
        source: Source<Out, OutMat>,
    ) -> Self
    where
        InMat: Send + 'static,
        OutMat: Send + 'static,
    {
        Self::from_runtime_transform(move |input, materializer| {
            let sink_keepalive = Arc::new(MaterializedKeepalive::default());
            // Releases `sink_keepalive` when the sink drops its input (no peer in this mode).
            let sink_input = Box::new(InputKeepaliveStream {
                inner: input,
                keepalive: Arc::clone(&sink_keepalive),
                peer_keepalive: None,
            });
            let sink_mat = sink.clone().run(sink_input, materializer)?;
            // If the input was already dropped during `run`, `store` releases the value
            // immediately instead of retaining it.
            sink_keepalive.store(Box::new(sink_mat));
            let (output, source_mat) = Arc::clone(&source.factory).create(materializer)?;
            let source_keepalive = Arc::new(MaterializedKeepalive::default());
            source_keepalive.store(Box::new(source_mat));
            Ok(Box::new(CoupledStream {
                inner: output,
                source_keepalive,
                sink_keepalive: None,
                coupled: false,
            }))
        })
    }

    /// Like [`Flow::from_sink_and_source`], but the two sides are coupled.
    ///
    /// - When the sink's input stream is dropped, both keepalives are released and the
    ///   output stream returns `None` from its next call.
    /// - When the output stream ends, yields an error, or is dropped, the sink's
    ///   keepalive is released too. Releasing cancels a materialized value that is a
    ///   `Cancellable`.
    #[must_use]
    pub fn from_sink_and_source_coupled<InMat, OutMat>(
        sink: Sink<In, InMat>,
        source: Source<Out, OutMat>,
    ) -> Self
    where
        InMat: Send + 'static,
        OutMat: Send + 'static,
    {
        Self::from_runtime_transform(move |input, materializer| {
            // Created before the sink runs so the input stream can release it as its peer.
            let source_keepalive = Arc::new(MaterializedKeepalive::default());
            let sink_keepalive = Arc::new(MaterializedKeepalive::default());
            let sink_input = Box::new(InputKeepaliveStream {
                inner: input,
                keepalive: Arc::clone(&sink_keepalive),
                peer_keepalive: Some(Arc::clone(&source_keepalive)),
            });
            let sink_mat = sink.clone().run(sink_input, materializer)?;
            sink_keepalive.store(Box::new(sink_mat));
            let (output, source_mat) = Arc::clone(&source.factory).create(materializer)?;
            source_keepalive.store(Box::new(source_mat));
            Ok(Box::new(CoupledStream {
                inner: output,
                source_keepalive,
                sink_keepalive: Some(sink_keepalive),
                coupled: true,
            }))
        })
    }

    /// Builds a flow whose implementation is produced by the future returned by `future`.
    ///
    /// Each materialization creates a one-shot channel. The receiver becomes the
    /// materialized [`StreamCompletion`]; the sender is handed to the `FutureFlowStream`
    /// that drives the inner flow.
    ///
    /// The transform of one materialization can be invoked only once. A second call
    /// fails with `StreamError::Failed`.
    ///
    /// NOTE(review): when the future is polled and what is sent on the channel are
    /// implemented by `FutureFlowStream`, which is not visible here.
    #[must_use]
    pub fn future_flow<InnerMat, F, Fut>(future: F) -> Flow<In, Out, StreamCompletion<InnerMat>>
    where
        InnerMat: Send + 'static,
        F: Fn() -> Fut + Send + Sync + 'static,
        Fut: Future<Output = StreamResult<Flow<In, Out, InnerMat>>> + Send + 'static,
    {
        let future = Arc::new(future);
        Flow::from_runtime_materialized_factory(move || {
            let (sender, receiver) = oneshot::channel();
            // Shared with the transform closure, which takes the sender exactly once.
            let sender = Arc::new(Mutex::new(Some(sender)));
            let future = Arc::clone(&future);
            let transform: RuntimeTransform<In, Out> =
                Arc::new(move |input, materializer: &Materializer| {
                    let mat_sender = sender
                        .lock()
                        .expect("future_flow materialized sender poisoned")
                        .take()
                        .ok_or_else(|| {
                            StreamError::Failed("future_flow transform already materialized".into())
                        })?;
                    Ok(Box::new(FutureFlowStream {
                        future: Arc::clone(&future),
                        // Owned materializer with the same name prefix.
                        // NOTE(review): presumably needed because the stream outlives
                        // the borrowed `&Materializer` — confirm.
                        materializer: materializer
                            .with_name_prefix(materializer.name_prefix().to_owned()),
                        input: Some(input),
                        current: None,
                        mat_sender: Some(mat_sender),
                        initialized: false,
                        terminated: false,
                        _marker: PhantomData,
                    }) as BoxStream<Out>)
                });
            (transform, StreamCompletion::from_receiver(receiver, None))
        })
    }

    /// Builds a flow whose implementation is created by the synchronous `create` factory.
    ///
    /// Delegates to [`Flow::lazy_future_flow`]. Each call to `create` is wrapped in
    /// `catch_unwind_failed` with the context `"lazy_flow factory"`.
    /// NOTE(review): that helper presumably turns a panic into a stream error — confirm.
    #[must_use]
    pub fn lazy_flow<InnerMat, F>(create: F) -> Flow<In, Out, StreamCompletion<InnerMat>>
    where
        InnerMat: Send + 'static,
        F: Fn() -> Flow<In, Out, InnerMat> + Send + Sync + 'static,
    {
        let create = Arc::new(create);
        Self::lazy_future_flow(move || {
            let create = Arc::clone(&create);
            async move { catch_unwind_failed("lazy_flow factory", || create()) }
        })
    }

    /// Like [`Flow::future_flow`], but the stream is driven by `LazyFutureFlowStream`.
    ///
    /// The materialized value is a [`StreamCompletion`] fed by a per-materialization
    /// one-shot channel. The transform of one materialization can be invoked only once.
    ///
    /// NOTE(review): when `create` is invoked relative to the first element is decided
    /// by `LazyFutureFlowStream`, which is not visible here.
    #[must_use]
    pub fn lazy_future_flow<InnerMat, F, Fut>(
        create: F,
    ) -> Flow<In, Out, StreamCompletion<InnerMat>>
    where
        InnerMat: Send + 'static,
        F: Fn() -> Fut + Send + Sync + 'static,
        Fut: Future<Output = StreamResult<Flow<In, Out, InnerMat>>> + Send + 'static,
    {
        let create = Arc::new(create);
        Flow::from_runtime_materialized_factory(move || {
            let (sender, receiver) = oneshot::channel();
            // Shared with the transform closure, which takes the sender exactly once.
            let sender = Arc::new(Mutex::new(Some(sender)));
            let create = Arc::clone(&create);
            let transform: RuntimeTransform<In, Out> =
                Arc::new(move |input, materializer: &Materializer| {
                    let mat_sender = sender
                        .lock()
                        .expect("lazy_future_flow materialized sender poisoned")
                        .take()
                        .ok_or_else(|| {
                            StreamError::Failed(
                                "lazy_future_flow transform already materialized".into(),
                            )
                        })?;
                    Ok(Box::new(LazyFutureFlowStream {
                        create: Arc::clone(&create),
                        // Owned materializer with the same name prefix.
                        materializer: materializer
                            .with_name_prefix(materializer.name_prefix().to_owned()),
                        input: Some(input),
                        current: None,
                        mat_sender: Some(mat_sender),
                        initialized: false,
                        terminated: false,
                        _marker: PhantomData,
                    }) as BoxStream<Out>)
                });
            (transform, StreamCompletion::from_receiver(receiver, None))
        })
    }
}
260
/// Keeps a stage's materialized value alive until it is explicitly released.
#[derive(Default)]
struct MaterializedKeepalive {
    /// The retained value: `None` until `store` succeeds, and again after `release`.
    value: Mutex<Option<Box<dyn Any + Send>>>,
    /// Set by `release`. Once set, `store` releases its value immediately.
    released: AtomicBool,
}
266
267impl MaterializedKeepalive {
268 fn store(&self, value: Box<dyn Any + Send>) {
269 let mut slot = self.value.lock().expect("materialized keepalive poisoned");
270 if !self.released.load(Ordering::SeqCst) {
271 *slot = Some(value);
272 return;
273 }
274 drop(slot);
279 release_materialized_value(value);
280 }
281
282 fn release(&self) {
283 self.released.store(true, Ordering::SeqCst);
284 if let Some(value) = self
285 .value
286 .lock()
287 .expect("materialized keepalive poisoned")
288 .take()
289 {
290 release_materialized_value(value);
291 }
292 }
293
294 fn is_released(&self) -> bool {
295 self.released.load(Ordering::SeqCst)
296 }
297}
298
299fn release_materialized_value(value: Box<dyn Any + Send>) {
300 match value.downcast::<Cancellable>() {
301 Ok(cancellable) => {
302 cancellable.cancel();
303 }
304 Err(value) => drop(value),
305 }
306}
307
/// Wraps the stream handed to a sink so that dropping it releases the sink's
/// materialized value and, in coupled mode, the source's as well.
struct InputKeepaliveStream<In> {
    /// The flow's incoming elements, forwarded unchanged.
    inner: BoxStream<In>,
    /// Released when this stream is dropped.
    keepalive: Arc<MaterializedKeepalive>,
    /// Optional second keepalive, also released on drop (the source side in coupled mode).
    peer_keepalive: Option<Arc<MaterializedKeepalive>>,
}
313
314impl<In> Iterator for InputKeepaliveStream<In> {
315 type Item = StreamResult<In>;
316
317 fn next(&mut self) -> Option<Self::Item> {
318 self.inner.next()
319 }
320}
321
322impl<In> Drop for InputKeepaliveStream<In> {
323 fn drop(&mut self) {
324 self.keepalive.release();
325 if let Some(peer_keepalive) = &self.peer_keepalive {
326 peer_keepalive.release();
327 }
328 }
329}
330
/// Output stream of a sink/source flow: yields the source's elements while managing
/// the keepalives of both sides' materialized values.
struct CoupledStream<Out> {
    /// Elements produced by the source.
    inner: BoxStream<Out>,
    /// Source-side keepalive; released when `inner` ends, yields an error, or this
    /// stream is dropped.
    source_keepalive: Arc<MaterializedKeepalive>,
    /// Sink-side keepalive; the constructors set it only in coupled mode.
    sink_keepalive: Option<Arc<MaterializedKeepalive>>,
    /// When `true`, a released source keepalive ends this stream, and termination of
    /// this stream also releases the sink keepalive.
    coupled: bool,
}
337
338impl<Out> Iterator for CoupledStream<Out> {
339 type Item = StreamResult<Out>;
340
341 fn next(&mut self) -> Option<Self::Item> {
342 if self.coupled && self.source_keepalive.is_released() {
343 return None;
344 }
345 let next = self.inner.next();
346 if next.is_none() || next.as_ref().is_some_and(|item| item.is_err()) {
347 self.source_keepalive.release();
348 if self.coupled
349 && let Some(sink_keepalive) = &self.sink_keepalive
350 {
351 sink_keepalive.release();
352 }
353 }
354 next
355 }
356}
357
358impl<Out> Drop for CoupledStream<Out> {
359 fn drop(&mut self) {
360 self.source_keepalive.release();
361 if self.coupled
362 && let Some(sink_keepalive) = &self.sink_keepalive
363 {
364 sink_keepalive.release();
365 }
366 }
367}
368
369impl<In: Send + 'static, Out: Send + 'static, Mat: Send + 'static> Flow<In, Out, Mat> {
370 pub fn as_flow_with_context<U, CtxIn, CtxOut, Collapse, Extract>(
371 self,
372 collapse_context: Collapse,
373 extract_context: Extract,
374 ) -> FlowWithContext<U, CtxIn, Out, CtxOut, Mat>
375 where
376 U: Send + 'static,
377 CtxIn: Send + 'static,
378 CtxOut: Send + 'static,
379 Collapse: Fn(U, CtxIn) -> In + Send + Sync + 'static,
380 Extract: Fn(&Out) -> CtxOut + Send + Sync + 'static,
381 {
382 FlowWithContext::from_flow(
383 Flow::identity()
384 .map(move |(value, context)| collapse_context(value, context))
385 .via_mat(self, |_, flow_mat| flow_mat)
386 .map(move |value| {
387 let context = extract_context(&value);
388 (value, context)
389 }),
390 )
391 }
392
393 pub(crate) fn from_parts<F, M>(transform: F, materialize: M) -> Self
394 where
395 F: Fn(BoxStream<In>) -> BoxStream<Out> + Send + Sync + 'static,
396 M: Fn() -> StreamResult<Mat> + Send + Sync + 'static,
397 {
398 Self::from_parts_with_hints(transform, materialize, FlowHints::default())
399 }
400
401 pub(crate) fn from_materialized_factory<F>(factory: F) -> Self
402 where
403 F: Fn() -> (PureTransform<In, Out>, Mat) + Send + Sync + 'static,
404 {
405 struct PendingSlot<In, Out, Mat> {
406 transform: Option<PureTransform<In, Out>>,
407 mat: Option<Mat>,
408 transform_taken: bool,
409 mat_taken: bool,
410 }
411
412 let pending = Arc::new(Mutex::new(HashMap::<
413 thread::ThreadId,
414 Vec<PendingSlot<In, Out, Mat>>,
415 >::new()));
416 let factory = Arc::new(factory);
417
418 let transform = {
419 let pending = Arc::clone(&pending);
420 let factory = Arc::clone(&factory);
421 move |input| {
422 let transform = {
423 let mut pending = pending.lock().expect("flow materialized factory poisoned");
424 let thread_id = thread::current().id();
425 let slots = pending.entry(thread_id).or_default();
426 let index = slots
427 .iter()
428 .position(|slot| !slot.transform_taken && slot.mat_taken)
429 .unwrap_or_else(|| {
430 let (transform, mat) = factory();
431 slots.push(PendingSlot {
432 transform: Some(transform),
433 mat: Some(mat),
434 transform_taken: false,
435 mat_taken: false,
436 });
437 slots.len() - 1
438 });
439 let slot = slots
440 .get_mut(index)
441 .expect("pending flow materialization slot exists");
442 slot.transform_taken = true;
443 let transform = slot
444 .transform
445 .take()
446 .expect("pending flow transform present");
447 if slot.transform_taken && slot.mat_taken {
448 slots.remove(index);
449 }
450 if slots.is_empty() {
451 pending.remove(&thread_id);
452 }
453 transform
454 };
455 transform(input)
456 }
457 };
458
459 let materialize = {
460 let pending = Arc::clone(&pending);
461 let factory = Arc::clone(&factory);
462 move || {
463 let mat = {
464 let mut pending = pending.lock().expect("flow materialized factory poisoned");
465 let thread_id = thread::current().id();
466 let slots = pending.entry(thread_id).or_default();
467 let index = slots
468 .iter()
469 .position(|slot| !slot.mat_taken && slot.transform_taken)
470 .unwrap_or_else(|| {
471 let (transform, mat) = factory();
472 slots.push(PendingSlot {
473 transform: Some(transform),
474 mat: Some(mat),
475 transform_taken: false,
476 mat_taken: false,
477 });
478 slots.len() - 1
479 });
480 let slot = slots
481 .get_mut(index)
482 .expect("pending flow materialization slot exists");
483 slot.mat_taken = true;
484 let mat = slot
485 .mat
486 .take()
487 .expect("pending flow materialized value present");
488 if slot.transform_taken && slot.mat_taken {
489 slots.remove(index);
490 }
491 if slots.is_empty() {
492 pending.remove(&thread_id);
493 }
494 mat
495 };
496 Ok(mat)
497 }
498 };
499
500 Self::from_parts_with_hints(transform, materialize, FlowHints::default())
501 }
502
    /// Runtime-transform flow whose transform and materialized value come from one
    /// `factory()` call per materialization.
    ///
    /// `factory` returns a `(transform, mat)` pair, but the transform half is requested
    /// by the transform closure and the materialized half by `materialize`, as separate
    /// calls. Whichever half is requested first calls `factory` and parks the other half
    /// in a pending slot keyed by the current thread. The matching request on the same
    /// thread then claims it, so both halves come from the same `factory()` call.
    ///
    /// `factory` is invoked after the pending-slot lock has been released.
    ///
    /// NOTE(review): the pairing assumes the transform and `materialize` calls of one
    /// materialization happen on the same thread — confirm against the materializer.
    pub(crate) fn from_runtime_materialized_factory<F>(factory: F) -> Self
    where
        F: Fn() -> (RuntimeTransform<In, Out>, Mat) + Send + Sync + 'static,
    {
        // One `factory()` result whose halves have not both been claimed yet; the
        // `*_taken` flags record which half has already been handed out.
        struct PendingSlot<In, Out, Mat> {
            transform: Option<RuntimeTransform<In, Out>>,
            mat: Option<Mat>,
            transform_taken: bool,
            mat_taken: bool,
        }

        // Parked halves, grouped by the thread that produced them.
        let pending = Arc::new(Mutex::new(HashMap::<
            thread::ThreadId,
            Vec<PendingSlot<In, Out, Mat>>,
        >::new()));
        let factory = Arc::new(factory);

        let transform = {
            let pending = Arc::clone(&pending);
            let factory = Arc::clone(&factory);
            move |input, materializer: &Materializer| {
                let thread_id = thread::current().id();
                // Claim a transform parked by an earlier `materialize` call on this thread.
                let transform = {
                    let mut pending = pending.lock().expect("flow materialized factory poisoned");
                    let slots = pending.entry(thread_id).or_default();
                    if let Some(index) = slots
                        .iter()
                        .position(|slot| !slot.transform_taken && slot.mat_taken)
                    {
                        let slot = slots
                            .get_mut(index)
                            .expect("pending flow materialization slot exists");
                        slot.transform_taken = true;
                        let transform = slot
                            .transform
                            .take()
                            .expect("pending flow transform present");
                        // Both halves handed out: the slot is finished.
                        if slot.transform_taken && slot.mat_taken {
                            slots.remove(index);
                        }
                        if slots.is_empty() {
                            pending.remove(&thread_id);
                        }
                        Some(transform)
                    } else {
                        None
                    }
                };

                let transform = match transform {
                    Some(transform) => transform,
                    // Nothing parked: run the factory with the lock released, then park
                    // the materialized half for the matching `materialize` call.
                    None => {
                        let (transform, mat) = factory();
                        let mut pending =
                            pending.lock().expect("flow materialized factory poisoned");
                        let slots = pending.entry(thread_id).or_default();
                        slots.push(PendingSlot {
                            transform: None,
                            mat: Some(mat),
                            transform_taken: true,
                            mat_taken: false,
                        });
                        transform
                    }
                };

                transform(input, materializer)
            }
        };

        let materialize = {
            let pending = Arc::clone(&pending);
            let factory = Arc::clone(&factory);
            move || {
                let thread_id = thread::current().id();
                // Claim a materialized value parked by an earlier transform call on this thread.
                let mat = {
                    let mut pending = pending.lock().expect("flow materialized factory poisoned");
                    let slots = pending.entry(thread_id).or_default();
                    if let Some(index) = slots
                        .iter()
                        .position(|slot| !slot.mat_taken && slot.transform_taken)
                    {
                        let slot = slots
                            .get_mut(index)
                            .expect("pending flow materialization slot exists");
                        slot.mat_taken = true;
                        let mat = slot
                            .mat
                            .take()
                            .expect("pending flow materialized value present");
                        if slot.transform_taken && slot.mat_taken {
                            slots.remove(index);
                        }
                        if slots.is_empty() {
                            pending.remove(&thread_id);
                        }
                        Some(mat)
                    } else {
                        None
                    }
                };

                match mat {
                    Some(mat) => Ok(mat),
                    // Nothing parked: run the factory with the lock released, then park
                    // the transform half for the matching transform call.
                    None => {
                        let (transform, mat) = factory();
                        let mut pending =
                            pending.lock().expect("flow materialized factory poisoned");
                        let slots = pending.entry(thread_id).or_default();
                        slots.push(PendingSlot {
                            transform: Some(transform),
                            mat: None,
                            transform_taken: false,
                            mat_taken: true,
                        });
                        Ok(mat)
                    }
                }
            }
        };

        Flow {
            transform: FlowTransform::Runtime(Arc::new(transform)),
            materialize: Arc::new(materialize),
            hints: FlowHints::default(),
            attributes: Attributes::default(),
        }
    }
631
632 fn from_parts_with_hints<F, M>(transform: F, materialize: M, hints: FlowHints) -> Self
633 where
634 F: Fn(BoxStream<In>) -> BoxStream<Out> + Send + Sync + 'static,
635 M: Fn() -> StreamResult<Mat> + Send + Sync + 'static,
636 {
637 Self {
638 transform: FlowTransform::Pure(Arc::new(transform)),
639 materialize: Arc::new(materialize),
640 hints,
641 attributes: Attributes::default(),
642 }
643 }
644
645 #[must_use]
646 pub fn attributes(&self) -> &Attributes {
647 &self.attributes
648 }
649
650 #[must_use]
651 pub fn with_attributes(mut self, attributes: Attributes) -> Self {
652 self.attributes = attributes;
653 self
654 }
655
656 #[must_use]
657 pub fn add_attributes(mut self, attributes: Attributes) -> Self {
658 self.attributes = self.attributes.and(attributes);
659 self
660 }
661
662 #[must_use]
663 pub fn named(self, name: impl Into<String>) -> Self {
664 self.add_attributes(Attributes::named(name))
665 }
666
667 #[must_use]
668 pub fn via<Next, NextMat>(self, next: Flow<Out, Next, NextMat>) -> Flow<In, Next, Mat>
669 where
670 Next: Send + 'static,
671 NextMat: Send + 'static,
672 {
673 self.via_mat(next, Keep::left)
674 }
675
676 #[must_use]
677 pub fn via_mat<Next, NextMat, Combined, F>(
678 self,
679 next: Flow<Out, Next, NextMat>,
680 combine: F,
681 ) -> Flow<In, Next, Combined>
682 where
683 Next: Send + 'static,
684 NextMat: Send + 'static,
685 Combined: Send + 'static,
686 F: Fn(Mat, NextMat) -> Combined + Send + Sync + 'static,
687 {
688 let Flow {
689 transform: first,
690 materialize: materialize_first,
691 hints: first_hints,
692 attributes: first_attributes,
693 } = self;
694 let Flow {
695 transform: second,
696 materialize: materialize_second,
697 hints: second_hints,
698 attributes: second_attributes,
699 } = next;
700 let combine = Arc::new(combine);
701 match (first, second) {
702 (FlowTransform::Pure(first), FlowTransform::Pure(second)) => {
703 let hints = first_hints.then(second_hints);
704 Flow::from_parts_with_hints(
705 move |input| second(first(input)),
706 move || {
707 let left = materialize_first()?;
708 let right = materialize_second()?;
709 Ok(combine(left, right))
710 },
711 hints,
712 )
713 .with_attributes(first_attributes.and(second_attributes))
714 }
715 (first, second) => {
716 let hints = first_hints.then(second_hints);
717 Flow {
718 transform: FlowTransform::Runtime(Arc::new(move |input, materializer| {
719 let stream = match &first {
720 FlowTransform::Pure(first) => first(input),
721 FlowTransform::Runtime(first) => first(input, materializer)?,
722 };
723 match &second {
724 FlowTransform::Pure(second) => Ok(second(stream)),
725 FlowTransform::Runtime(second) => second(stream, materializer),
726 }
727 })),
728 materialize: Arc::new(move || {
729 let left = materialize_first()?;
730 let right = materialize_second()?;
731 Ok(combine(left, right))
732 }),
733 hints,
734 attributes: first_attributes.and(second_attributes),
735 }
736 }
737 }
738 }
739
740 #[must_use]
741 pub fn via_mat_with<Next, NextMat, Combined, F>(
742 self,
743 next: Flow<Out, Next, NextMat>,
744 combine: F,
745 ) -> Flow<In, Next, Combined>
746 where
747 Next: Send + 'static,
748 NextMat: Send + 'static,
749 Combined: Send + 'static,
750 F: Fn(Mat, NextMat) -> Combined + Send + Sync + 'static,
751 {
752 self.via_mat(next, combine)
753 }
754
755 #[must_use]
756 pub fn map<Next, F>(self, f: F) -> Flow<In, Next, Mat>
757 where
758 Next: Send + 'static,
759 F: Fn(Out) -> Next + Send + Sync + 'static,
760 {
761 let stage = Arc::new(f);
762 match &self.transform {
763 FlowTransform::Pure(_) => {
764 let Flow {
765 transform,
766 materialize,
767 hints,
768 attributes,
769 } = self;
770 let FlowTransform::Pure(transform) = transform else {
771 unreachable!("pure transform checked above");
772 };
773 Flow::from_parts_with_hints(
774 move |input| {
775 let stage = Arc::clone(&stage);
776 Box::new(transform(input).map(move |item| item.map(|item| stage(item))))
777 },
778 move || materialize(),
779 hints,
780 )
781 .with_attributes(attributes)
782 }
783 FlowTransform::Runtime(_) => self.via(Flow::from_transform(move |input| {
784 let stage = Arc::clone(&stage);
785 Box::new(input.map(move |item| item.map(|item| stage(item))))
786 })),
787 }
788 }
789
790 #[must_use]
793 pub fn map_result<Next, F>(self, f: F) -> Flow<In, Next, Mat>
794 where
795 Next: Send + 'static,
796 F: Fn(Out) -> StreamResult<Next> + Send + Sync + 'static,
797 {
798 let stage = Arc::new(f);
799 self.via(Flow::from_transform(move |input| {
800 let stage = Arc::clone(&stage);
801 Box::new(input.map(move |item| item.and_then(|item| stage(item))))
802 }))
803 }
804
805 #[must_use]
810 pub fn map_result_with_supervision<Next, F>(
811 self,
812 f: F,
813 decider: SupervisionDecider,
814 ) -> Flow<In, Next, Mat>
815 where
816 Next: Send + 'static,
817 F: Fn(Out) -> StreamResult<Next> + Send + Sync + 'static,
818 {
819 let stage = Arc::new(f);
820 self.via(Flow::from_transform(move |input| {
821 let stage = Arc::clone(&stage);
822 let decider = Arc::clone(&decider);
823 Box::new(input.filter_map(move |item| match item {
824 Ok(item) => match call_supervised("map_result callback", || stage(item)) {
825 Ok(next) => Some(Ok(next)),
826 Err(error) => match decide_supervision(&decider, &error) {
827 SupervisionDirective::Stop => Some(Err(error)),
828 SupervisionDirective::Resume | SupervisionDirective::Restart => None,
829 },
830 },
831 Err(error) => Some(Err(error)),
832 }))
833 }))
834 }
835
836 #[must_use]
837 pub fn filter<F>(self, predicate: F) -> Flow<In, Out, Mat>
838 where
839 F: Fn(&Out) -> bool + Send + Sync + 'static,
840 {
841 let predicate = Arc::new(predicate);
842 self.via(Flow::from_preserving_transform(move |input| {
843 let predicate = Arc::clone(&predicate);
844 Box::new(input.filter_map(move |item| match item {
845 Ok(item) if predicate(&item) => Some(Ok(item)),
846 Ok(_) => None,
847 Err(error) => Some(Err(error)),
848 }))
849 }))
850 }
851
852 #[must_use]
853 pub fn filter_result<F>(self, predicate: F) -> Flow<In, Out, Mat>
854 where
855 F: Fn(&Out) -> StreamResult<bool> + Send + Sync + 'static,
856 {
857 let predicate = Arc::new(predicate);
858 self.via(Flow::from_transform(move |input| {
859 let predicate = Arc::clone(&predicate);
860 Box::new(input.filter_map(move |item| match item {
861 Ok(item) => match predicate(&item) {
862 Ok(true) => Some(Ok(item)),
863 Ok(false) => None,
864 Err(error) => Some(Err(error)),
865 },
866 Err(error) => Some(Err(error)),
867 }))
868 }))
869 }
870
871 #[must_use]
872 pub fn filter_result_with_supervision<F>(
873 self,
874 predicate: F,
875 decider: SupervisionDecider,
876 ) -> Flow<In, Out, Mat>
877 where
878 F: Fn(&Out) -> StreamResult<bool> + Send + Sync + 'static,
879 {
880 let predicate = Arc::new(predicate);
881 self.via(Flow::from_transform(move |input| {
882 let predicate = Arc::clone(&predicate);
883 let decider = Arc::clone(&decider);
884 Box::new(input.filter_map(move |item| match item {
885 Ok(item) => match call_supervised("filter_result callback", || predicate(&item)) {
886 Ok(true) => Some(Ok(item)),
887 Ok(false) => None,
888 Err(error) => match decide_supervision(&decider, &error) {
889 SupervisionDirective::Stop => Some(Err(error)),
890 SupervisionDirective::Resume | SupervisionDirective::Restart => None,
891 },
892 },
893 Err(error) => Some(Err(error)),
894 }))
895 }))
896 }
897
898 #[must_use]
899 pub fn filter_not<F>(self, predicate: F) -> Flow<In, Out, Mat>
900 where
901 F: Fn(&Out) -> bool + Send + Sync + 'static,
902 {
903 let predicate = Arc::new(predicate);
904 self.filter(move |item| !predicate(item))
905 }
906
907 #[must_use]
908 pub fn filter_map<Next, F>(self, f: F) -> Flow<In, Next, Mat>
909 where
910 Next: Send + 'static,
911 F: Fn(Out) -> Option<Next> + Send + Sync + 'static,
912 {
913 let stage = Arc::new(f);
914 self.via(Flow::from_transform(move |input| {
915 let stage = Arc::clone(&stage);
916 Box::new(input.filter_map(move |item| match item {
917 Ok(item) => stage(item).map(Ok),
918 Err(error) => Some(Err(error)),
919 }))
920 }))
921 }
922
923 #[must_use]
924 pub fn filter_map_result<Next, F>(self, f: F) -> Flow<In, Next, Mat>
925 where
926 Next: Send + 'static,
927 F: Fn(Out) -> StreamResult<Option<Next>> + Send + Sync + 'static,
928 {
929 let stage = Arc::new(f);
930 self.via(Flow::from_transform(move |input| {
931 let stage = Arc::clone(&stage);
932 Box::new(input.filter_map(move |item| match item {
933 Ok(item) => match stage(item) {
934 Ok(Some(next)) => Some(Ok(next)),
935 Ok(None) => None,
936 Err(error) => Some(Err(error)),
937 },
938 Err(error) => Some(Err(error)),
939 }))
940 }))
941 }
942
943 #[must_use]
944 pub fn filter_map_result_with_supervision<Next, F>(
945 self,
946 f: F,
947 decider: SupervisionDecider,
948 ) -> Flow<In, Next, Mat>
949 where
950 Next: Send + 'static,
951 F: Fn(Out) -> StreamResult<Option<Next>> + Send + Sync + 'static,
952 {
953 let stage = Arc::new(f);
954 self.via(Flow::from_transform(move |input| {
955 let stage = Arc::clone(&stage);
956 let decider = Arc::clone(&decider);
957 Box::new(input.filter_map(move |item| match item {
958 Ok(item) => match call_supervised("filter_map_result callback", || stage(item)) {
959 Ok(Some(next)) => Some(Ok(next)),
960 Ok(None) => None,
961 Err(error) => match decide_supervision(&decider, &error) {
962 SupervisionDirective::Stop => Some(Err(error)),
963 SupervisionDirective::Resume | SupervisionDirective::Restart => None,
964 },
965 },
966 Err(error) => Some(Err(error)),
967 }))
968 }))
969 }
970
971 #[must_use]
972 pub fn map_concat<Next, F, I>(self, f: F) -> Flow<In, Next, Mat>
973 where
974 Next: Send + 'static,
975 F: Fn(Out) -> I + Send + Sync + 'static,
976 I: IntoIterator<Item = Next>,
977 I::IntoIter: Send + 'static,
978 {
979 let stage = Arc::new(f);
980 self.via(Flow::from_transform(move |mut input| {
981 let stage = Arc::clone(&stage);
982 let mut current = None::<I::IntoIter>;
983 let mut done = false;
984 Box::new(std::iter::from_fn(move || {
985 loop {
986 if let Some(iter) = &mut current {
987 if let Some(item) = iter.next() {
988 return Some(Ok(item));
989 }
990 current = None;
991 }
992
993 if done {
994 return None;
995 }
996
997 match input.next()? {
998 Ok(item) => current = Some(stage(item).into_iter()),
999 Err(error) => {
1000 done = true;
1001 return Some(Err(error));
1002 }
1003 }
1004 }
1005 }))
1006 }))
1007 }
1008
1009 #[must_use]
1010 pub fn map_concat_result<Next, F, I>(self, f: F) -> Flow<In, Next, Mat>
1011 where
1012 Next: Send + 'static,
1013 F: Fn(Out) -> StreamResult<I> + Send + Sync + 'static,
1014 I: IntoIterator<Item = Next>,
1015 I::IntoIter: Send + 'static,
1016 {
1017 let stage = Arc::new(f);
1018 self.via(Flow::from_transform(move |mut input| {
1019 let stage = Arc::clone(&stage);
1020 let mut current = None::<I::IntoIter>;
1021 let mut done = false;
1022 Box::new(std::iter::from_fn(move || {
1023 loop {
1024 if let Some(iter) = &mut current {
1025 if let Some(item) = iter.next() {
1026 return Some(Ok(item));
1027 }
1028 current = None;
1029 }
1030
1031 if done {
1032 return None;
1033 }
1034
1035 match input.next()? {
1036 Ok(item) => match stage(item) {
1037 Ok(items) => current = Some(items.into_iter()),
1038 Err(error) => {
1039 done = true;
1040 return Some(Err(error));
1041 }
1042 },
1043 Err(error) => {
1044 done = true;
1045 return Some(Err(error));
1046 }
1047 }
1048 }
1049 }))
1050 }))
1051 }
1052
1053 #[must_use]
1054 pub fn map_concat_result_with_supervision<Next, F, I>(
1055 self,
1056 f: F,
1057 decider: SupervisionDecider,
1058 ) -> Flow<In, Next, Mat>
1059 where
1060 Next: Send + 'static,
1061 F: Fn(Out) -> StreamResult<I> + Send + Sync + 'static,
1062 I: IntoIterator<Item = Next>,
1063 I::IntoIter: Send + 'static,
1064 {
1065 let stage = Arc::new(f);
1066 self.via(Flow::from_transform(move |mut input| {
1067 let stage = Arc::clone(&stage);
1068 let decider = Arc::clone(&decider);
1069 let mut current = None::<I::IntoIter>;
1070 let mut done = false;
1071 Box::new(std::iter::from_fn(move || {
1072 loop {
1073 if let Some(iter) = &mut current {
1074 if let Some(item) = iter.next() {
1075 return Some(Ok(item));
1076 }
1077 current = None;
1078 }
1079
1080 if done {
1081 return None;
1082 }
1083
1084 match input.next()? {
1085 Ok(item) => {
1086 match call_supervised("map_concat_result callback", || stage(item)) {
1087 Ok(items) => current = Some(items.into_iter()),
1088 Err(error) => match decide_supervision(&decider, &error) {
1089 SupervisionDirective::Stop => {
1090 done = true;
1091 return Some(Err(error));
1092 }
1093 SupervisionDirective::Resume
1094 | SupervisionDirective::Restart => {}
1095 },
1096 }
1097 }
1098 Err(error) => {
1099 done = true;
1100 return Some(Err(error));
1101 }
1102 }
1103 }
1104 }))
1105 }))
1106 }
1107
1108 #[must_use]
1109 pub fn stateful_map<State, Next, F>(self, seed: State, f: F) -> Flow<In, Next, Mat>
1110 where
1111 State: Clone + Send + Sync + 'static,
1112 Next: Send + 'static,
1113 F: Fn(&mut State, Out) -> Next + Send + Sync + 'static,
1114 {
1115 let stage = Arc::new(f);
1116 self.via(Flow::from_transform(move |input| {
1117 let stage = Arc::clone(&stage);
1118 let mut state = seed.clone();
1119 Box::new(input.map(move |item| item.map(|item| stage(&mut state, item))))
1120 }))
1121 }
1122
1123 #[must_use]
1124 pub fn stateful_map_result<State, Next, F>(self, seed: State, f: F) -> Flow<In, Next, Mat>
1125 where
1126 State: Clone + Send + Sync + 'static,
1127 Next: Send + 'static,
1128 F: Fn(&mut State, Out) -> StreamResult<Next> + Send + Sync + 'static,
1129 {
1130 let stage = Arc::new(f);
1131 self.via(Flow::from_transform(move |input| {
1132 let stage = Arc::clone(&stage);
1133 let mut state = seed.clone();
1134 Box::new(input.map(move |item| match item {
1135 Ok(item) => stage(&mut state, item),
1136 Err(error) => Err(error),
1137 }))
1138 }))
1139 }
1140
1141 #[must_use]
1142 pub fn stateful_map_result_with_supervision<State, Next, F>(
1143 self,
1144 seed: State,
1145 f: F,
1146 decider: SupervisionDecider,
1147 ) -> Flow<In, Next, Mat>
1148 where
1149 State: Clone + Send + Sync + 'static,
1150 Next: Send + 'static,
1151 F: Fn(&mut State, Out) -> StreamResult<Next> + Send + Sync + 'static,
1152 {
1153 let stage = Arc::new(f);
1154 self.via(Flow::from_transform(move |input| {
1155 let stage = Arc::clone(&stage);
1156 let decider = Arc::clone(&decider);
1157 let seed = seed.clone();
1158 let mut state = seed.clone();
1159 Box::new(input.filter_map(move |item| match item {
1160 Ok(item) => match call_supervised("stateful_map_result callback", || {
1161 stage(&mut state, item)
1162 }) {
1163 Ok(next) => Some(Ok(next)),
1164 Err(error) => match decide_supervision(&decider, &error) {
1165 SupervisionDirective::Stop => Some(Err(error)),
1166 SupervisionDirective::Resume => None,
1167 SupervisionDirective::Restart => {
1168 state = seed.clone();
1169 None
1170 }
1171 },
1172 },
1173 Err(error) => Some(Err(error)),
1174 }))
1175 }))
1176 }
1177
    /// Maps each element to an iterable with a stateful callback and flattens the
    /// results in order.
    ///
    /// Each materialization starts from a fresh clone of `seed`. The first
    /// upstream error is emitted after the current iterator is drained, and then
    /// the stream ends.
    #[must_use]
    pub fn stateful_map_concat<State, Next, F, I>(self, seed: State, f: F) -> Flow<In, Next, Mat>
    where
        State: Clone + Send + Sync + 'static,
        Next: Send + 'static,
        F: Fn(&mut State, Out) -> I + Send + Sync + 'static,
        I: IntoIterator<Item = Next>,
        I::IntoIter: Send + 'static,
    {
        let stage = Arc::new(f);
        self.via(Flow::from_transform(move |mut input| {
            let stage = Arc::clone(&stage);
            let mut state = seed.clone();
            // Iterator produced by the most recent callback, still being drained.
            let mut current = None::<I::IntoIter>;
            // Set after an upstream error; no further upstream pulls happen.
            let mut done = false;
            Box::new(std::iter::from_fn(move || {
                loop {
                    if let Some(iter) = &mut current {
                        if let Some(item) = iter.next() {
                            return Some(Ok(item));
                        }
                        // Clear the exhausted iterator so it is never polled again.
                        current = None;
                    }

                    if done {
                        return None;
                    }

                    // `?` ends this stream when upstream completes.
                    match input.next()? {
                        Ok(item) => current = Some(stage(&mut state, item).into_iter()),
                        Err(error) => {
                            done = true;
                            return Some(Err(error));
                        }
                    }
                }
            }))
        }))
    }
1217
    /// Fallible variant of `stateful_map_concat`: `f` returns the iterable to
    /// flatten, or an error.
    ///
    /// The first error, whether from upstream or from `f`, is emitted and then
    /// the stream ends. Each materialization starts from a fresh clone of `seed`.
    #[must_use]
    pub fn stateful_map_concat_result<State, Next, F, I>(
        self,
        seed: State,
        f: F,
    ) -> Flow<In, Next, Mat>
    where
        State: Clone + Send + Sync + 'static,
        Next: Send + 'static,
        F: Fn(&mut State, Out) -> StreamResult<I> + Send + Sync + 'static,
        I: IntoIterator<Item = Next>,
        I::IntoIter: Send + 'static,
    {
        let stage = Arc::new(f);
        self.via(Flow::from_transform(move |mut input| {
            let stage = Arc::clone(&stage);
            let mut state = seed.clone();
            // Iterator returned by the most recent successful callback.
            let mut current = None::<I::IntoIter>;
            // Set after the first error; the stream then ends.
            let mut done = false;
            Box::new(std::iter::from_fn(move || {
                loop {
                    if let Some(iter) = &mut current {
                        if let Some(item) = iter.next() {
                            return Some(Ok(item));
                        }
                        // Clear the exhausted iterator so it is never polled again.
                        current = None;
                    }

                    if done {
                        return None;
                    }

                    // `?` ends this stream when upstream completes.
                    match input.next()? {
                        Ok(item) => match stage(&mut state, item) {
                            Ok(items) => current = Some(items.into_iter()),
                            Err(error) => {
                                done = true;
                                return Some(Err(error));
                            }
                        },
                        Err(error) => {
                            done = true;
                            return Some(Err(error));
                        }
                    }
                }
            }))
        }))
    }
1267
    /// Supervised variant of `stateful_map_concat_result`.
    ///
    /// Callback failures, including panics caught by `call_supervised`, go
    /// through `decider`:
    /// - `Stop` emits the error and ends the stream.
    /// - `Resume` skips the failing element.
    /// - `Restart` skips it and resets the state to a clone of `seed`.
    ///
    /// Upstream errors are emitted and end the stream.
    #[must_use]
    pub fn stateful_map_concat_result_with_supervision<State, Next, F, I>(
        self,
        seed: State,
        f: F,
        decider: SupervisionDecider,
    ) -> Flow<In, Next, Mat>
    where
        State: Clone + Send + Sync + 'static,
        Next: Send + 'static,
        F: Fn(&mut State, Out) -> StreamResult<I> + Send + Sync + 'static,
        I: IntoIterator<Item = Next>,
        I::IntoIter: Send + 'static,
    {
        let stage = Arc::new(f);
        self.via(Flow::from_transform(move |mut input| {
            let stage = Arc::clone(&stage);
            let decider = Arc::clone(&decider);
            // Kept per materialization so `Restart` can reset the state.
            let seed = seed.clone();
            let mut state = seed.clone();
            // Iterator returned by the most recent successful callback.
            let mut current = None::<I::IntoIter>;
            // Set after a terminal error; the stream then ends.
            let mut done = false;
            Box::new(std::iter::from_fn(move || {
                loop {
                    if let Some(iter) = &mut current {
                        if let Some(item) = iter.next() {
                            return Some(Ok(item));
                        }
                        // Clear the exhausted iterator so it is never polled again.
                        current = None;
                    }

                    if done {
                        return None;
                    }

                    // `?` ends this stream when upstream completes.
                    match input.next()? {
                        Ok(item) => {
                            match call_supervised("stateful_map_concat_result callback", || {
                                stage(&mut state, item)
                            }) {
                                Ok(items) => current = Some(items.into_iter()),
                                Err(error) => match decide_supervision(&decider, &error) {
                                    SupervisionDirective::Stop => {
                                        done = true;
                                        return Some(Err(error));
                                    }
                                    // Both of these loop around to pull the next element.
                                    SupervisionDirective::Resume => {}
                                    SupervisionDirective::Restart => state = seed.clone(),
                                },
                            }
                        }
                        Err(error) => {
                            done = true;
                            return Some(Err(error));
                        }
                    }
                }
            }))
        }))
    }
1328
1329 #[must_use]
1330 pub fn map_async<Next, F, Fut>(self, parallelism: usize, f: F) -> Flow<In, Next, Mat>
1334 where
1335 Next: Send + 'static,
1336 F: Fn(Out) -> Fut + Send + Sync + 'static,
1337 Fut: Future<Output = StreamResult<Next>> + Send + 'static,
1338 {
1339 assert!(
1340 parallelism > 0,
1341 "map_async parallelism must be greater than zero"
1342 );
1343 let stage = Arc::new(f);
1344 self.via(Flow::from_runtime_transform_with_hints(
1345 move |input, _materializer| {
1346 let stage = Arc::clone(&stage);
1347 Ok(map_async_ordered(input, parallelism, stage))
1348 },
1349 FlowHints::PRESERVES_TERMINAL_CONSUMER_BATCH,
1350 ))
1351 }
1352
1353 #[must_use]
1354 pub fn map_async_with_supervision<Next, F, Fut>(
1359 self,
1360 parallelism: usize,
1361 f: F,
1362 decider: SupervisionDecider,
1363 ) -> Flow<In, Next, Mat>
1364 where
1365 Next: Send + 'static,
1366 F: Fn(Out) -> Fut + Send + Sync + 'static,
1367 Fut: Future<Output = StreamResult<Next>> + Send + 'static,
1368 {
1369 assert!(
1370 parallelism > 0,
1371 "map_async parallelism must be greater than zero"
1372 );
1373 let stage = Arc::new(f);
1374 self.via(Flow::from_runtime_transform(move |input, _materializer| {
1375 let stage = Arc::clone(&stage);
1376 let decider = Arc::clone(&decider);
1377 Ok(map_async_ordered_supervised(
1378 input,
1379 parallelism,
1380 stage,
1381 decider,
1382 ))
1383 }))
1384 }
1385
1386 #[must_use]
1387 pub fn map_async_unordered<Next, F, Fut>(self, parallelism: usize, f: F) -> Flow<In, Next, Mat>
1391 where
1392 Next: Send + 'static,
1393 F: Fn(Out) -> Fut + Send + Sync + 'static,
1394 Fut: Future<Output = StreamResult<Next>> + Send + 'static,
1395 {
1396 assert!(
1397 parallelism > 0,
1398 "map_async_unordered parallelism must be greater than zero"
1399 );
1400 let stage = Arc::new(f);
1401 self.via(Flow::from_runtime_transform_with_hints(
1402 move |input, _materializer| {
1403 let stage = Arc::clone(&stage);
1404 Ok(map_async_unordered(input, parallelism, stage))
1405 },
1406 FlowHints::PRESERVES_TERMINAL_CONSUMER_BATCH,
1407 ))
1408 }
1409
1410 #[must_use]
1411 pub fn map_async_unordered_with_supervision<Next, F, Fut>(
1412 self,
1413 parallelism: usize,
1414 f: F,
1415 decider: SupervisionDecider,
1416 ) -> Flow<In, Next, Mat>
1417 where
1418 Next: Send + 'static,
1419 F: Fn(Out) -> Fut + Send + Sync + 'static,
1420 Fut: Future<Output = StreamResult<Next>> + Send + 'static,
1421 {
1422 assert!(
1423 parallelism > 0,
1424 "map_async_unordered parallelism must be greater than zero"
1425 );
1426 let stage = Arc::new(f);
1427 self.via(Flow::from_runtime_transform(move |input, _materializer| {
1428 let stage = Arc::clone(&stage);
1429 let decider = Arc::clone(&decider);
1430 Ok(map_async_unordered_supervised(
1431 input,
1432 parallelism,
1433 stage,
1434 decider,
1435 ))
1436 }))
1437 }
1438
1439 #[must_use]
1440 pub fn map_async_partitioned<Key, Next, Partition, F, Fut>(
1444 self,
1445 parallelism: usize,
1446 per_partition: usize,
1447 partition: Partition,
1448 f: F,
1449 ) -> Flow<In, Next, Mat>
1450 where
1451 Key: Clone + Eq + Hash + Send + 'static,
1452 Next: Send + 'static,
1453 Partition: Fn(&Out) -> Key + Send + Sync + 'static,
1454 F: Fn(Out) -> Fut + Send + Sync + 'static,
1455 Fut: Future<Output = StreamResult<Next>> + Send + 'static,
1456 {
1457 assert!(
1458 parallelism > 0,
1459 "map_async_partitioned parallelism must be greater than zero"
1460 );
1461 assert!(
1462 per_partition > 0,
1463 "map_async_partitioned per_partition must be greater than zero"
1464 );
1465 let partition = Arc::new(partition);
1466 let stage = Arc::new(f);
1467 self.via(Flow::from_runtime_transform_with_hints(
1468 move |input, _materializer| {
1469 let partition = Arc::clone(&partition);
1470 let stage = Arc::clone(&stage);
1471 Ok(map_async_partitioned(
1472 input,
1473 parallelism,
1474 per_partition,
1475 partition,
1476 stage,
1477 ))
1478 },
1479 FlowHints::PRESERVES_TERMINAL_CONSUMER_BATCH,
1480 ))
1481 }
1482
1483 #[must_use]
1484 pub fn take(self, n: usize) -> Flow<In, Out, Mat> {
1485 self.via(Flow::from_transform(move |input| Box::new(input.take(n))))
1486 }
1487
1488 #[must_use]
1489 pub fn drop(self, n: usize) -> Flow<In, Out, Mat> {
1490 self.via(Flow::from_transform(move |input| {
1491 let mut remaining = n;
1492 Box::new(input.filter_map(move |item| match item {
1493 Ok(_) if remaining > 0 => {
1494 remaining -= 1;
1495 None
1496 }
1497 other => Some(other),
1498 }))
1499 }))
1500 }
1501
1502 #[must_use]
1503 pub fn take_while<F>(self, predicate: F) -> Flow<In, Out, Mat>
1504 where
1505 F: Fn(&Out) -> bool + Send + Sync + 'static,
1506 {
1507 let predicate = Arc::new(predicate);
1508 self.via(Flow::from_transform(move |mut input| {
1509 let predicate = Arc::clone(&predicate);
1510 let mut open = true;
1511 Box::new(std::iter::from_fn(move || {
1512 if !open {
1513 return None;
1514 }
1515
1516 match input.next() {
1517 Some(Ok(item)) if predicate(&item) => Some(Ok(item)),
1518 Some(Ok(_)) | None => {
1519 open = false;
1520 None
1521 }
1522 Some(Err(error)) => Some(Err(error)),
1523 }
1524 }))
1525 }))
1526 }
1527
1528 #[must_use]
1529 pub fn drop_while<F>(self, predicate: F) -> Flow<In, Out, Mat>
1530 where
1531 F: Fn(&Out) -> bool + Send + Sync + 'static,
1532 {
1533 let predicate = Arc::new(predicate);
1534 self.via(Flow::from_transform(move |mut input| {
1535 let predicate = Arc::clone(&predicate);
1536 let mut dropping = true;
1537 Box::new(std::iter::from_fn(move || {
1538 loop {
1539 let next = input.next()?;
1540 match next {
1541 Ok(item) if dropping && predicate(&item) => continue,
1542 Ok(item) => {
1543 dropping = false;
1544 return Some(Ok(item));
1545 }
1546 Err(error) => return Some(Err(error)),
1547 }
1548 }
1549 }))
1550 }))
1551 }
1552
1553 #[must_use]
1554 pub fn limit(self, max: u64) -> Flow<In, Out, Mat> {
1555 self.via(Flow::from_transform(move |input| {
1556 let mut seen = 0_u64;
1557 Box::new(input.map(move |item| match item {
1558 Ok(item) if seen < max => {
1559 seen += 1;
1560 Ok(item)
1561 }
1562 Ok(_) => Err(StreamError::LimitExceeded { max }),
1563 Err(error) => Err(error),
1564 }))
1565 }))
1566 }
1567
1568 #[must_use]
1569 pub fn grouped(self, size: usize) -> Flow<In, Vec<Out>, Mat> {
1570 assert!(size > 0, "grouped size must be greater than zero");
1571 self.via(Flow::from_transform(move |mut input| {
1572 Box::new(std::iter::from_fn(move || {
1573 let mut group = Vec::with_capacity(size);
1574 while group.len() < size {
1575 match input.next() {
1576 Some(Ok(item)) => group.push(item),
1577 Some(Err(error)) => return Some(Err(error)),
1578 None => break,
1579 }
1580 }
1581
1582 if group.is_empty() {
1583 None
1584 } else {
1585 Some(Ok(group))
1586 }
1587 }))
1588 }))
1589 }
1590
1591 #[must_use]
1592 pub fn scan<State, F>(self, seed: State, f: F) -> Flow<In, State, Mat>
1593 where
1594 State: Clone + Send + Sync + 'static,
1595 F: Fn(State, Out) -> State + Send + Sync + 'static,
1596 {
1597 let stage = Arc::new(f);
1598 self.via(Flow::from_transform(move |mut input| {
1599 let stage = Arc::clone(&stage);
1600 let mut state = Some(seed.clone());
1601 let mut emit_seed = true;
1602 Box::new(std::iter::from_fn(move || {
1603 if emit_seed {
1604 emit_seed = false;
1605 return Some(Ok(state.as_ref().expect("scan state present").clone()));
1606 }
1607
1608 match input.next()? {
1609 Ok(item) => {
1610 let prev = state.take().expect("scan state present");
1613 let next = stage(prev, item);
1614 state = Some(next.clone());
1615 Some(Ok(next))
1616 }
1617 Err(error) => Some(Err(error)),
1618 }
1619 }))
1620 }))
1621 }
1622
1623 #[must_use]
1624 pub fn scan_async<State, F, Fut>(self, seed: State, f: F) -> Flow<In, State, Mat>
1625 where
1626 State: Clone + Send + Sync + 'static,
1627 F: Fn(State, Out) -> Fut + Send + Sync + 'static,
1628 Fut: Future<Output = StreamResult<State>> + Send + 'static,
1629 {
1630 let stage = Arc::new(f);
1631 self.via(Flow::from_transform(move |mut input| {
1632 let stage = Arc::clone(&stage);
1633 let mut state = Some(seed.clone());
1634 let mut emit_seed = true;
1635 let mut terminated = false;
1636 Box::new(std::iter::from_fn(move || {
1637 if terminated {
1638 return None;
1639 }
1640 if emit_seed {
1641 emit_seed = false;
1642 return Some(Ok(state
1643 .as_ref()
1644 .expect("scan_async state present")
1645 .clone()));
1646 }
1647
1648 match input.next()? {
1649 Ok(item) => {
1650 let prev = state.take().expect("scan_async state present");
1651 match catch_unwind_failed("scan_async factory", || stage(prev, item))
1652 .and_then(run_future_inline_or_spawn)
1653 {
1654 Ok(next) => {
1655 state = Some(next.clone());
1656 Some(Ok(next))
1657 }
1658 Err(error) => {
1659 terminated = true;
1660 Some(Err(error))
1661 }
1662 }
1663 }
1664 Err(error) => {
1665 terminated = true;
1666 Some(Err(error))
1667 }
1668 }
1669 }))
1670 }))
1671 }
1672
1673 #[must_use]
1674 pub fn scan_result<State, F>(self, seed: State, f: F) -> Flow<In, State, Mat>
1675 where
1676 State: Clone + Send + Sync + 'static,
1677 F: Fn(State, Out) -> StreamResult<State> + Send + Sync + 'static,
1678 {
1679 let stage = Arc::new(f);
1680 self.via(Flow::from_transform(move |mut input| {
1681 let stage = Arc::clone(&stage);
1682 let mut state = Some(seed.clone());
1683 let mut emit_seed = true;
1684 Box::new(std::iter::from_fn(move || {
1685 if emit_seed {
1686 emit_seed = false;
1687 return Some(Ok(state.as_ref().expect("scan state present").clone()));
1688 }
1689
1690 match input.next()? {
1691 Ok(item) => {
1692 let prev = state.take().expect("scan state present");
1693 match stage(prev, item) {
1694 Ok(next) => {
1695 state = Some(next.clone());
1696 Some(Ok(next))
1697 }
1698 Err(error) => Some(Err(error)),
1699 }
1700 }
1701 Err(error) => Some(Err(error)),
1702 }
1703 }))
1704 }))
1705 }
1706
1707 #[must_use]
1708 pub fn scan_result_with_supervision<State, F>(
1709 self,
1710 seed: State,
1711 f: F,
1712 decider: SupervisionDecider,
1713 ) -> Flow<In, State, Mat>
1714 where
1715 State: Clone + Send + Sync + 'static,
1716 F: Fn(State, Out) -> StreamResult<State> + Send + Sync + 'static,
1717 {
1718 let stage = Arc::new(f);
1719 self.via(Flow::from_transform(move |mut input| {
1720 let stage = Arc::clone(&stage);
1721 let decider = Arc::clone(&decider);
1722 let seed = seed.clone();
1723 let mut state = Some(seed.clone());
1724 let mut emit_seed = true;
1725 Box::new(std::iter::from_fn(move || {
1726 loop {
1727 if emit_seed {
1728 emit_seed = false;
1729 return Some(Ok(state.as_ref().expect("scan state present").clone()));
1730 }
1731
1732 match input.next()? {
1733 Ok(item) => {
1734 let prev = state.take().expect("scan state present");
1735 match call_supervised("scan_result callback", || {
1736 stage(prev.clone(), item)
1737 }) {
1738 Ok(next) => {
1739 state = Some(next.clone());
1740 return Some(Ok(next));
1741 }
1742 Err(error) => match decide_supervision(&decider, &error) {
1743 SupervisionDirective::Stop => return Some(Err(error)),
1744 SupervisionDirective::Resume => {
1745 state = Some(prev);
1746 }
1747 SupervisionDirective::Restart => {
1748 state = Some(seed.clone());
1749 emit_seed = true;
1750 }
1751 },
1752 }
1753 }
1754 Err(error) => return Some(Err(error)),
1755 }
1756 }
1757 }))
1758 }))
1759 }
1760
    /// Emits windows of `size` consecutive elements, advancing by `step`
    /// elements between windows. Windows overlap when `step < size` and leave
    /// gaps when `step > size`.
    ///
    /// When upstream completes with at least one but fewer than `size`
    /// elements buffered, those are emitted as a final, shorter window. An
    /// upstream error is emitted and terminates the stream.
    ///
    /// # Panics
    /// Panics if `size` or `step` is zero.
    #[must_use]
    pub fn sliding(self, size: usize, step: usize) -> Flow<In, Vec<Out>, Mat>
    where
        Out: Clone,
    {
        assert!(size > 0, "sliding size must be greater than zero");
        assert!(step > 0, "sliding step must be greater than zero");
        self.via(Flow::from_transform(move |mut input| {
            let mut buffer = VecDeque::with_capacity(size.max(step));
            // Set after upstream completes or fails; no further pulls happen.
            let mut terminated = false;

            Box::new(std::iter::from_fn(move || {
                if terminated {
                    return None;
                }

                loop {
                    match input.next() {
                        Some(Ok(item)) => {
                            buffer.push_back(item);
                            if buffer.len() < size {
                                // Still filling a window.
                                continue;
                            } else if buffer.len() == size {
                                // A full window. It stays buffered so the next
                                // push can slide it forward.
                                return Some(Ok(buffer.iter().cloned().collect()));
                            } else if step <= size {
                                // One element past a full window: drop `step`
                                // elements from the front. Exactly `size` remain
                                // only when `step == 1`, in which case the next
                                // window is already complete.
                                for _ in 0..step {
                                    buffer.pop_front();
                                }
                                if buffer.len() == size {
                                    return Some(Ok(buffer.iter().cloned().collect()));
                                }
                            } else if buffer.len() == step {
                                // `step > size`: the previous window plus the
                                // skipped gap has been consumed, so start the
                                // next window from an empty buffer.
                                buffer.clear();
                            }
                        }
                        Some(Err(error)) => {
                            terminated = true;
                            return Some(Err(error));
                        }
                        None => {
                            terminated = true;
                            if !buffer.is_empty() && buffer.len() < size {
                                return Some(Ok(buffer.iter().cloned().collect()));
                            }
                            return None;
                        }
                    }
                }
            }))
        }))
    }
1818
1819 #[must_use]
1820 pub fn fold<Acc, F>(self, zero: Acc, f: F) -> Flow<In, Acc, Mat>
1821 where
1822 Acc: Clone + Send + Sync + 'static,
1823 F: Fn(Acc, Out) -> Acc + Send + Sync + 'static,
1824 {
1825 let stage = Arc::new(f);
1826 self.via(Flow::from_transform(move |input| {
1827 let stage = Arc::clone(&stage);
1828 let mut acc = zero.clone();
1829 for item in input {
1830 match item {
1831 Ok(item) => acc = stage(acc, item),
1832 Err(error) => return Box::new(std::iter::once(Err(error))),
1833 }
1834 }
1835 Box::new(std::iter::once(Ok(acc)))
1836 }))
1837 }
1838
1839 #[must_use]
1840 pub fn fold_async<Acc, F, Fut>(self, zero: Acc, f: F) -> Flow<In, Acc, Mat>
1841 where
1842 Acc: Clone + Send + Sync + 'static,
1843 F: Fn(Acc, Out) -> Fut + Send + Sync + 'static,
1844 Fut: Future<Output = StreamResult<Acc>> + Send + 'static,
1845 {
1846 let stage = Arc::new(f);
1847 self.via(Flow::from_transform(move |mut input| {
1848 let stage = Arc::clone(&stage);
1849 let mut acc = Some(zero.clone());
1850 let mut done = false;
1851 Box::new(std::iter::from_fn(move || {
1852 if done {
1853 return None;
1854 }
1855 done = true;
1856
1857 for item in input.by_ref() {
1858 let item = match item {
1859 Ok(item) => item,
1860 Err(error) => return Some(Err(error)),
1861 };
1862 let current = acc.take().expect("fold_async accumulator present");
1863 match catch_unwind_failed("fold_async factory", || stage(current, item))
1864 .and_then(run_future_inline_or_spawn)
1865 {
1866 Ok(next) => acc = Some(next),
1867 Err(error) => return Some(Err(error)),
1868 }
1869 }
1870
1871 Some(Ok(acc.take().expect("fold_async accumulator present")))
1872 }))
1873 }))
1874 }
1875
1876 #[must_use]
1877 pub fn map_with_resource<Resource, Next, Create, F, Close>(
1878 self,
1879 create: Create,
1880 f: F,
1881 close: Close,
1882 ) -> Flow<In, Next, Mat>
1883 where
1884 Resource: Send + 'static,
1885 Next: Send + 'static,
1886 Create: Fn() -> StreamResult<Resource> + Send + Sync + 'static,
1887 F: Fn(&mut Resource, Out) -> StreamResult<Next> + Send + Sync + 'static,
1888 Close: Fn(Resource) -> StreamResult<Option<Next>> + Send + Sync + 'static,
1889 {
1890 let create = Arc::new(create);
1891 let stage = Arc::new(f);
1892 let close = Arc::new(close);
1893 self.via(Flow::from_transform(move |input| {
1894 Box::new(MapWithResourceStream {
1895 input,
1896 create: Arc::clone(&create),
1897 stage: Arc::clone(&stage),
1898 close: Arc::clone(&close),
1899 resource: None,
1900 created: false,
1901 pending_terminal: None,
1902 terminated: false,
1903 _marker: PhantomData,
1904 }) as BoxStream<Next>
1905 }))
1906 }
1907
1908 #[must_use]
1909 pub fn fold_result<Acc, F>(self, zero: Acc, f: F) -> Flow<In, Acc, Mat>
1910 where
1911 Acc: Clone + Send + Sync + 'static,
1912 F: Fn(Acc, Out) -> StreamResult<Acc> + Send + Sync + 'static,
1913 {
1914 let stage = Arc::new(f);
1915 self.via(Flow::from_transform(move |input| {
1916 let stage = Arc::clone(&stage);
1917 let mut acc = zero.clone();
1918 for item in input {
1919 match item {
1920 Ok(item) => match stage(acc, item) {
1921 Ok(next) => acc = next,
1922 Err(error) => return Box::new(std::iter::once(Err(error))),
1923 },
1924 Err(error) => return Box::new(std::iter::once(Err(error))),
1925 }
1926 }
1927 Box::new(std::iter::once(Ok(acc)))
1928 }))
1929 }
1930
1931 #[must_use]
1932 pub fn fold_result_with_supervision<Acc, F>(
1933 self,
1934 zero: Acc,
1935 f: F,
1936 decider: SupervisionDecider,
1937 ) -> Flow<In, Acc, Mat>
1938 where
1939 Acc: Clone + Send + Sync + 'static,
1940 F: Fn(Acc, Out) -> StreamResult<Acc> + Send + Sync + 'static,
1941 {
1942 let stage = Arc::new(f);
1943 self.via(Flow::from_transform(move |input| {
1944 let stage = Arc::clone(&stage);
1945 let decider = Arc::clone(&decider);
1946 let mut acc = zero.clone();
1947 for item in input {
1948 match item {
1949 Ok(item) => {
1950 let previous = acc;
1951 match call_supervised("fold_result callback", || {
1952 stage(previous.clone(), item)
1953 }) {
1954 Ok(next) => acc = next,
1955 Err(error) => match decide_supervision(&decider, &error) {
1956 SupervisionDirective::Stop => {
1957 return Box::new(std::iter::once(Err(error)));
1958 }
1959 SupervisionDirective::Resume => acc = previous,
1960 SupervisionDirective::Restart => acc = zero.clone(),
1961 },
1962 }
1963 }
1964 Err(error) => return Box::new(std::iter::once(Err(error))),
1965 }
1966 }
1967 Box::new(std::iter::once(Ok(acc)))
1968 }))
1969 }
1970
1971 #[must_use]
1972 pub fn reduce<F>(self, f: F) -> Flow<In, Out, Mat>
1973 where
1974 F: Fn(Out, Out) -> Out + Send + Sync + 'static,
1975 {
1976 let stage = Arc::new(f);
1977 self.via(Flow::from_transform(move |mut input| {
1978 let Some(first) = input.next() else {
1979 return Box::new(std::iter::once(Err(StreamError::EmptyStream)));
1980 };
1981 let mut acc = match first {
1982 Ok(item) => item,
1983 Err(error) => return Box::new(std::iter::once(Err(error))),
1984 };
1985 for item in input {
1986 match item {
1987 Ok(item) => acc = stage(acc, item),
1988 Err(error) => return Box::new(std::iter::once(Err(error))),
1989 }
1990 }
1991 Box::new(std::iter::once(Ok(acc)))
1992 }))
1993 }
1994
1995 #[must_use]
1996 pub fn reduce_result<F>(self, f: F) -> Flow<In, Out, Mat>
1997 where
1998 Out: Clone,
1999 F: Fn(Out, Out) -> StreamResult<Out> + Send + Sync + 'static,
2000 {
2001 let stage = Arc::new(f);
2002 self.via(Flow::from_transform(move |mut input| {
2003 let Some(first) = input.next() else {
2004 return Box::new(std::iter::once(Err(StreamError::EmptyStream)));
2005 };
2006 let mut acc = match first {
2007 Ok(item) => item,
2008 Err(error) => return Box::new(std::iter::once(Err(error))),
2009 };
2010 for item in input {
2011 match item {
2012 Ok(item) => match stage(acc, item) {
2013 Ok(next) => acc = next,
2014 Err(error) => return Box::new(std::iter::once(Err(error))),
2015 },
2016 Err(error) => return Box::new(std::iter::once(Err(error))),
2017 }
2018 }
2019 Box::new(std::iter::once(Ok(acc)))
2020 }))
2021 }
2022
2023 #[must_use]
2024 pub fn reduce_result_with_supervision<F>(
2025 self,
2026 f: F,
2027 decider: SupervisionDecider,
2028 ) -> Flow<In, Out, Mat>
2029 where
2030 Out: Clone,
2031 F: Fn(Out, Out) -> StreamResult<Out> + Send + Sync + 'static,
2032 {
2033 let stage = Arc::new(f);
2034 self.via(Flow::from_transform(move |input| {
2035 let stage = Arc::clone(&stage);
2036 let decider = Arc::clone(&decider);
2037 let mut acc = None::<Out>;
2038 for item in input {
2039 match item {
2040 Ok(item) => {
2041 let Some(previous) = acc.take() else {
2042 acc = Some(item);
2043 continue;
2044 };
2045 match call_supervised("reduce_result callback", || {
2046 stage(previous.clone(), item)
2047 }) {
2048 Ok(next) => acc = Some(next),
2049 Err(error) => match decide_supervision(&decider, &error) {
2050 SupervisionDirective::Stop => {
2051 return Box::new(std::iter::once(Err(error)));
2052 }
2053 SupervisionDirective::Resume => acc = Some(previous),
2054 SupervisionDirective::Restart => acc = None,
2055 },
2056 }
2057 }
2058 Err(error) => return Box::new(std::iter::once(Err(error))),
2059 }
2060 }
2061 match acc {
2062 Some(acc) => Box::new(std::iter::once(Ok(acc))),
2063 None => Box::new(std::iter::once(Err(StreamError::EmptyStream))),
2064 }
2065 }))
2066 }
2067
2068 #[must_use]
2069 pub fn map_error<F>(self, f: F) -> Flow<In, Out, Mat>
2070 where
2071 F: Fn(StreamError) -> StreamError + Send + Sync + 'static,
2072 {
2073 let stage = Arc::new(f);
2074 self.via(Flow::from_transform(move |input| {
2075 let stage = Arc::clone(&stage);
2076 Box::new(input.map(move |item| item.map_err(|error| stage(error))))
2077 }))
2078 }
2079
2080 #[must_use]
2081 pub fn recover<F>(self, f: F) -> Flow<In, Out, Mat>
2082 where
2083 F: Fn(StreamError) -> Option<Out> + Send + Sync + 'static,
2084 {
2085 let stage = Arc::new(f);
2086 self.via(Flow::from_transform(move |mut input| {
2087 let stage = Arc::clone(&stage);
2088 let mut done = false;
2089 Box::new(std::iter::from_fn(move || {
2090 if done {
2091 return None;
2092 }
2093 match input.next()? {
2094 Ok(item) => Some(Ok(item)),
2095 Err(error) => {
2096 done = true;
2097 match stage(error.clone()) {
2098 Some(item) => Some(Ok(item)),
2099 None => Some(Err(error)),
2100 }
2101 }
2102 }
2103 }))
2104 }))
2105 }
2106
2107 #[must_use]
2108 pub fn recover_with<F>(self, f: F) -> Flow<In, Out, Mat>
2109 where
2110 F: Fn(StreamError) -> Option<Source<Out>> + Send + Sync + 'static,
2111 {
2112 self.recover_with_attempts(None, f)
2114 }
2115
2116 #[must_use]
2117 pub fn recover_with_retries<F>(self, retries: usize, f: F) -> Flow<In, Out, Mat>
2118 where
2119 F: Fn(StreamError) -> Option<Source<Out>> + Send + Sync + 'static,
2120 {
2121 self.recover_with_attempts(Some(retries), f)
2122 }
2123
    /// Shared implementation of `recover_with` and `recover_with_retries`.
    ///
    /// `attempts` is the replacement budget: `None` means unlimited, and
    /// `Some(n)` allows at most `n` switches. On an upstream error with budget
    /// left, `f` is asked for a replacement `Source`, which is materialized and
    /// read from next. The stream ends when:
    /// - `f` declines (the error is emitted),
    /// - replacement creation fails (the creation error is emitted),
    /// - the budget is exhausted (the error is emitted), or
    /// - the current stream completes.
    fn recover_with_attempts<F>(self, attempts: Option<usize>, f: F) -> Flow<In, Out, Mat>
    where
        F: Fn(StreamError) -> Option<Source<Out>> + Send + Sync + 'static,
    {
        let stage = Arc::new(f);
        self.via(Flow::from_runtime_transform(move |input, materializer| {
            // Replacement sources are materialized through a materializer
            // derived with the same name prefix. NOTE(review): presumably this
            // yields an owned handle usable inside the `'static` iterator —
            // confirm against `with_name_prefix`.
            let replacement_materializer =
                materializer.with_name_prefix(materializer.name_prefix().to_owned());
            let stage = Arc::clone(&stage);
            let mut remaining_retries = attempts;
            // The stream currently being read: the original input, then each replacement.
            let mut current = input;
            let mut terminated = false;

            Ok(Box::new(std::iter::from_fn(move || {
                if terminated {
                    return None;
                }

                loop {
                    match current.next() {
                        Some(Ok(item)) => return Some(Ok(item)),
                        Some(Err(error)) if remaining_retries != Some(0) => {
                            // The budget is spent before `stage` is consulted, so a
                            // declined recovery also consumes one attempt (the
                            // stream terminates in that case anyway).
                            if let Some(remaining) = remaining_retries.as_mut() {
                                *remaining -= 1;
                            }
                            match stage(error.clone()) {
                                Some(source) => match Arc::clone(&source.factory)
                                    .create(&replacement_materializer)
                                {
                                    // Its materialized value is discarded.
                                    Ok((stream, _)) => current = stream,
                                    Err(error) => {
                                        terminated = true;
                                        return Some(Err(error));
                                    }
                                },
                                None => {
                                    terminated = true;
                                    return Some(Err(error));
                                }
                            }
                        }
                        // Budget exhausted: surface the error and stop.
                        Some(Err(error)) => {
                            terminated = true;
                            return Some(Err(error));
                        }
                        None => {
                            terminated = true;
                            return None;
                        }
                    }
                }
            })) as BoxStream<Out>)
        }))
    }
2179
2180 #[must_use]
2181 pub fn on_error_complete(self) -> Flow<In, Out, Mat> {
2182 self.via(Flow::from_transform(move |mut input| {
2183 let mut done = false;
2184 Box::new(std::iter::from_fn(move || {
2185 if done {
2186 return None;
2187 }
2188 match input.next()? {
2189 Ok(item) => Some(Ok(item)),
2190 Err(_) => {
2191 done = true;
2192 None
2193 }
2194 }
2195 }))
2196 }))
2197 }
2198
2199 #[must_use]
2200 pub fn prefix_and_tail(self, n: usize) -> Flow<In, (Vec<Out>, Source<Out>), Mat> {
2201 self.via(Flow::from_runtime_transform(move |input, _materializer| {
2202 Ok(prefix_and_tail_stream(input, n))
2203 }))
2204 }
2205
2206 #[must_use]
2207 pub fn flat_map_prefix<Next, NextMat, F>(self, n: usize, f: F) -> Flow<In, Next, Mat>
2208 where
2209 Next: Send + 'static,
2210 NextMat: Send + 'static,
2211 F: Fn(Vec<Out>) -> Flow<Out, Next, NextMat> + Send + Sync + 'static,
2212 Out: Clone,
2213 {
2214 let stage = Arc::new(f);
2215 self.via(Flow::from_runtime_transform(
2216 move |mut input, materializer| {
2217 let mut prefix = Vec::with_capacity(n);
2218 while prefix.len() < n {
2219 match input.next() {
2220 Some(Ok(item)) => prefix.push(item),
2221 Some(Err(error)) => {
2222 return Ok(Box::new(std::iter::once(Err(error))) as BoxStream<Next>);
2223 }
2224 None => break,
2225 }
2226 }
2227
2228 let flow = stage(prefix);
2229 let transform = flow.transform;
2230 let _ = (flow.materialize)()?;
2231 match transform {
2232 FlowTransform::Pure(transform) => Ok(transform(input)),
2233 FlowTransform::Runtime(transform) => transform(input, materializer),
2234 }
2235 },
2236 ))
2237 }
2238
2239 #[must_use]
2240 pub fn group_by<Key, F>(
2241 self,
2242 max_substreams: usize,
2243 f: F,
2244 allow_closed_substream_recreation: bool,
2245 ) -> Flow<In, Source<Out>, Mat>
2246 where
2247 Key: Clone + Eq + Hash + Send + 'static,
2248 F: Fn(&Out) -> Key + Send + Sync + 'static,
2249 Out: Clone,
2250 {
2251 self.group_by_with_batching(
2252 max_substreams,
2253 f,
2254 allow_closed_substream_recreation,
2255 GroupByBatchMode::Immediate,
2256 )
2257 }
2258
2259 pub(super) fn group_by_with_batching<Key, F>(
2260 self,
2261 max_substreams: usize,
2262 f: F,
2263 allow_closed_substream_recreation: bool,
2264 batch_mode: GroupByBatchMode,
2265 ) -> Flow<In, Source<Out>, Mat>
2266 where
2267 Key: Clone + Eq + Hash + Send + 'static,
2268 F: Fn(&Out) -> Key + Send + Sync + 'static,
2269 Out: Clone,
2270 {
2271 assert!(
2272 max_substreams > 0,
2273 "group_by max_substreams must be greater than zero"
2274 );
2275 let key_fn = Arc::new(f);
2276 self.via(Flow::from_runtime_transform(move |input, materializer| {
2277 Ok(group_by_stream(
2278 input,
2279 max_substreams,
2280 allow_closed_substream_recreation,
2281 Arc::clone(&key_fn),
2282 batch_mode,
2283 materializer,
2284 ))
2285 }))
2286 }
2287
2288 #[must_use]
2289 pub fn split_when<F>(self, predicate: F) -> Flow<In, Source<Out>, Mat>
2290 where
2291 F: Fn(&Out) -> bool + Send + Sync + 'static,
2292 Out: Clone,
2293 {
2294 let predicate = Arc::new(predicate);
2295 self.via(Flow::from_runtime_transform(move |input, materializer| {
2296 Ok(split_streams(
2297 input,
2298 SplitMode::When,
2299 Arc::clone(&predicate),
2300 materializer,
2301 ))
2302 }))
2303 }
2304
2305 #[must_use]
2306 pub fn split_after<F>(self, predicate: F) -> Flow<In, Source<Out>, Mat>
2307 where
2308 F: Fn(&Out) -> bool + Send + Sync + 'static,
2309 Out: Clone,
2310 {
2311 let predicate = Arc::new(predicate);
2312 self.via(Flow::from_runtime_transform(move |input, materializer| {
2313 Ok(split_streams(
2314 input,
2315 SplitMode::After,
2316 Arc::clone(&predicate),
2317 materializer,
2318 ))
2319 }))
2320 }
2321
2322 #[must_use]
2323 pub fn flat_map_concat<Next, NextMat, F>(self, f: F) -> Flow<In, Next, Mat>
2324 where
2325 Next: Send + 'static,
2326 NextMat: Send + 'static,
2327 F: Fn(Out) -> Source<Next, NextMat> + Send + Sync + 'static,
2328 {
2329 let stage = Arc::new(f);
2330 self.via(Flow::from_runtime_transform(move |input, materializer| {
2331 Ok(flat_map_concat_stream(
2332 input,
2333 Arc::clone(&stage),
2334 materializer,
2335 ))
2336 }))
2337 }
2338
2339 #[must_use]
2340 pub fn flat_map_merge<Next, NextMat, F>(self, breadth: usize, f: F) -> Flow<In, Next, Mat>
2341 where
2342 Next: Send + 'static,
2343 NextMat: Send + 'static,
2344 F: Fn(Out) -> Source<Next, NextMat> + Send + Sync + 'static,
2345 {
2346 assert!(
2347 breadth > 0,
2348 "flat_map_merge breadth must be greater than zero"
2349 );
2350 let stage = Arc::new(f);
2351 self.via(Flow::from_runtime_transform_with_hints(
2352 move |input, materializer| {
2353 Ok(flat_map_merge_stream(
2354 input,
2355 breadth,
2356 Arc::clone(&stage),
2357 materializer,
2358 ))
2359 },
2360 FlowHints::PRESERVES_TERMINAL_CONSUMER_BATCH,
2361 ))
2362 }
2363
2364 #[must_use]
2365 pub fn concat<Mat2>(self, that: Source<Out, Mat2>) -> Flow<In, Out, Mat>
2366 where
2367 Mat2: Send + 'static,
2368 {
2369 self.concat_sources([that])
2370 }
2371
2372 #[must_use]
2373 pub fn concat_lazy<Mat2>(self, that: Source<Out, Mat2>) -> Flow<In, Out, Mat>
2374 where
2375 Mat2: Send + 'static,
2376 {
2377 let that_factory = that.factory;
2378 self.via(Flow::from_runtime_transform(move |input, materializer| {
2379 let primary = input;
2380 Ok(concat_streams_lazy(
2381 primary,
2382 vec![Arc::clone(&that_factory)],
2383 materializer,
2384 ))
2385 }))
2386 }
2387
2388 #[must_use]
2389 pub fn concat_all_lazy<Mat2, I>(self, those: I) -> Flow<In, Out, Mat>
2390 where
2391 Mat2: Send + 'static,
2392 I: IntoIterator<Item = Source<Out, Mat2>>,
2393 {
2394 let source_factories: Vec<_> = those.into_iter().map(|source| source.factory).collect();
2395 self.via(Flow::from_runtime_transform(move |input, materializer| {
2396 Ok(concat_streams_lazy(
2397 input,
2398 source_factories.clone(),
2399 materializer,
2400 ))
2401 }))
2402 }
2403
2404 #[must_use]
2405 pub fn prepend<Mat2>(self, that: Source<Out, Mat2>) -> Flow<In, Out, Mat>
2406 where
2407 Mat2: Send + 'static,
2408 {
2409 self.prepend_sources([that])
2410 }
2411
2412 #[must_use]
2413 pub fn prepend_lazy<Mat2>(self, that: Source<Out, Mat2>) -> Flow<In, Out, Mat>
2414 where
2415 Mat2: Send + 'static,
2416 {
2417 self.prepend(that)
2418 }
2419
2420 #[must_use]
2421 pub fn or_else<Mat2>(self, secondary: Source<Out, Mat2>) -> Flow<In, Out, Mat>
2422 where
2423 Mat2: Send + 'static,
2424 {
2425 let secondary_factory = secondary.factory;
2426 self.via(Flow::from_runtime_transform(move |input, materializer| {
2427 let secondary = match Arc::clone(&secondary_factory).create(materializer) {
2428 Ok((stream, _)) => stream,
2429 Err(error) => Box::new(std::iter::once(Err(error))) as BoxStream<Out>,
2430 };
2431 Ok(or_else_stream(input, secondary))
2432 }))
2433 }
2434
2435 #[must_use]
2436 pub fn interleave<Mat2>(
2437 self,
2438 that: Source<Out, Mat2>,
2439 segment_size: usize,
2440 ) -> Flow<In, Out, Mat>
2441 where
2442 Mat2: Send + 'static,
2443 {
2444 self.interleave_all([that], segment_size, false)
2445 }
2446
2447 #[must_use]
2448 pub fn interleave_all<Mat2, I>(
2449 self,
2450 those: I,
2451 segment_size: usize,
2452 eager_close: bool,
2453 ) -> Flow<In, Out, Mat>
2454 where
2455 Mat2: Send + 'static,
2456 I: IntoIterator<Item = Source<Out, Mat2>>,
2457 {
2458 let source_factories: Vec<_> = those.into_iter().map(|source| source.factory).collect();
2459 self.via(Flow::from_runtime_transform(move |input, materializer| {
2460 let mut streams = Vec::with_capacity(source_factories.len() + 1);
2461 streams.push(input);
2462 for factory in &source_factories {
2463 let stream = match Arc::clone(factory).create(materializer) {
2464 Ok((stream, _)) => stream,
2465 Err(error) => {
2466 return Ok(Box::new(std::iter::once(Err(error))) as BoxStream<Out>);
2467 }
2468 };
2469 streams.push(stream);
2470 }
2471 Ok(interleave_streams(streams, segment_size, eager_close))
2472 }))
2473 }
2474
2475 #[must_use]
2476 pub fn merge_sorted<Mat2>(self, that: Source<Out, Mat2>) -> Flow<In, Out, Mat>
2477 where
2478 Out: Ord,
2479 Mat2: Send + 'static,
2480 {
2481 let source_factory = that.factory;
2482 self.via(Flow::from_runtime_transform(move |input, materializer| {
2483 let other = match Arc::clone(&source_factory).create(materializer) {
2484 Ok((stream, _)) => stream,
2485 Err(error) => return Ok(Box::new(std::iter::once(Err(error))) as BoxStream<Out>),
2486 };
2487 Ok(merge_sorted_stream(input, other))
2488 }))
2489 }
2490
2491 #[must_use]
2492 pub fn merge_latest<Mat2>(
2493 self,
2494 that: Source<Out, Mat2>,
2495 eager_complete: bool,
2496 ) -> Flow<In, Vec<Out>, Mat>
2497 where
2498 Out: Clone,
2499 Mat2: Send + 'static,
2500 {
2501 let source_factory = that.factory;
2502 self.via(Flow::from_runtime_transform(move |input, materializer| {
2503 let other = match Arc::clone(&source_factory).create(materializer) {
2504 Ok((stream, _)) => stream,
2505 Err(error) => {
2506 return Ok(Box::new(std::iter::once(Err(error))) as BoxStream<Vec<Out>>);
2507 }
2508 };
2509 Ok(merge_latest_streams(vec![input, other], eager_complete))
2510 }))
2511 }
2512
2513 #[must_use]
2514 pub fn merge_all<Mat2, I>(self, those: I, eager_complete: bool) -> Flow<In, Out, Mat>
2515 where
2516 Mat2: Send + 'static,
2517 I: IntoIterator<Item = Source<Out, Mat2>>,
2518 {
2519 let source_factories: Vec<_> = those.into_iter().map(|source| source.factory).collect();
2520 self.via(Flow::from_runtime_transform(move |input, materializer| {
2521 let mut streams = Vec::with_capacity(source_factories.len() + 1);
2522 streams.push(input);
2523 for factory in &source_factories {
2524 let stream = match Arc::clone(factory).create(materializer) {
2525 Ok((stream, _)) => stream,
2526 Err(error) => {
2527 return Ok(Box::new(std::iter::once(Err(error))) as BoxStream<Out>);
2528 }
2529 };
2530 streams.push(stream);
2531 }
2532 Ok(merge_streams(streams, eager_complete))
2533 }))
2534 }
2535
2536 #[must_use]
2537 pub fn zip_with<Mat2, Out2, Next, F>(
2538 self,
2539 that: Source<Out2, Mat2>,
2540 combine: F,
2541 ) -> Flow<In, Next, Mat>
2542 where
2543 Out2: Send + 'static,
2544 Next: Send + 'static,
2545 Mat2: Send + 'static,
2546 F: Fn(Out, Out2) -> Next + Send + Sync + 'static,
2547 {
2548 let source_factory = that.factory;
2549 let combine = Arc::new(combine);
2550 self.via(Flow::from_runtime_transform(move |input, materializer| {
2551 let other = match Arc::clone(&source_factory).create(materializer) {
2552 Ok((stream, _)) => stream,
2553 Err(error) => return Ok(Box::new(std::iter::once(Err(error))) as BoxStream<Next>),
2554 };
2555 let combine = Arc::clone(&combine);
2556 Ok(Box::new(
2557 zip_streams(input, other)
2558 .map(move |item| item.map(|(left, right)| combine(left, right))),
2559 ) as BoxStream<Next>)
2560 }))
2561 }
2562
2563 #[must_use]
2564 pub fn zip_latest<Mat2, Out2>(self, that: Source<Out2, Mat2>) -> Flow<In, (Out, Out2), Mat>
2565 where
2566 Out: Clone,
2567 Out2: Clone + Send + 'static,
2568 Mat2: Send + 'static,
2569 {
2570 self.zip_latest_with(that, true, |left, right| (left, right))
2571 }
2572
2573 #[must_use]
2574 pub fn zip_latest_with<Mat2, Out2, Next, F>(
2575 self,
2576 that: Source<Out2, Mat2>,
2577 eager_complete: bool,
2578 combine: F,
2579 ) -> Flow<In, Next, Mat>
2580 where
2581 Out: Clone,
2582 Out2: Clone + Send + 'static,
2583 Next: Send + 'static,
2584 Mat2: Send + 'static,
2585 F: Fn(Out, Out2) -> Next + Send + Sync + 'static,
2586 {
2587 let source_factory = that.factory;
2588 let combine = Arc::new(combine);
2589 self.via(Flow::from_runtime_transform(move |input, materializer| {
2590 let other = match Arc::clone(&source_factory).create(materializer) {
2591 Ok((stream, _)) => stream,
2592 Err(error) => return Ok(Box::new(std::iter::once(Err(error))) as BoxStream<Next>),
2593 };
2594 Ok(zip_latest_with_stream(
2595 input,
2596 other,
2597 eager_complete,
2598 Arc::clone(&combine),
2599 ))
2600 }))
2601 }
2602
2603 #[must_use]
2604 pub fn zip_with_index(self) -> Flow<In, (Out, u64), Mat> {
2605 self.via(Flow::from_runtime_transform(
2606 move |mut input, _materializer| {
2607 let mut index = 0_u64;
2608 Ok(Box::new(std::iter::from_fn(move || {
2609 input.next().map(|item| {
2610 item.map(|value| {
2611 let pair = (value, index);
2612 index = index.wrapping_add(1);
2613 pair
2614 })
2615 })
2616 })) as BoxStream<(Out, u64)>)
2617 },
2618 ))
2619 }
2620
2621 #[must_use]
2622 pub fn zip_all<Mat2, Out2>(
2623 self,
2624 that: Source<Out2, Mat2>,
2625 this_elem: Out,
2626 that_elem: Out2,
2627 ) -> Flow<In, (Out, Out2), Mat>
2628 where
2629 Out: Clone + Sync,
2630 Out2: Clone + Send + Sync + 'static,
2631 Mat2: Send + 'static,
2632 {
2633 let source_factory = that.factory;
2634 self.via(Flow::from_runtime_transform(move |input, materializer| {
2635 let other = match Arc::clone(&source_factory).create(materializer) {
2636 Ok((stream, _)) => stream,
2637 Err(error) => {
2638 return Ok(Box::new(std::iter::once(Err(error))) as BoxStream<(Out, Out2)>);
2639 }
2640 };
2641 Ok(zip_all_stream(
2642 input,
2643 other,
2644 this_elem.clone(),
2645 that_elem.clone(),
2646 ))
2647 }))
2648 }
2649
2650 #[must_use]
2651 pub fn also_to<SideMat>(self, sink: Sink<Out, SideMat>) -> Flow<In, Out, Mat>
2652 where
2653 Out: Clone,
2654 SideMat: Send + 'static,
2655 {
2656 self.via(Flow::from_runtime_transform(
2657 move |mut input: BoxStream<Out>, materializer| {
2658 let (side_sender, side_mat) = materialize_side_sink(&sink, materializer, 0)?;
2659 let mut sender = Some(side_sender);
2660 let side_mat = side_mat;
2661 Ok(Box::new(std::iter::from_fn(move || match input.next() {
2662 Some(Ok(item)) => {
2663 let _ = &side_mat;
2664 if sender
2665 .as_ref()
2666 .is_some_and(|sender| sender.send(Ok(item.clone())).is_err())
2667 {
2668 sender = None;
2669 return None;
2670 }
2671 Some(Ok(item))
2672 }
2673 Some(Err(error)) => {
2674 let _ = &side_mat;
2675 let _ = sender
2676 .as_ref()
2677 .and_then(|sender| sender.send(Err(error.clone())).ok());
2678 sender = None;
2679 Some(Err(error))
2680 }
2681 None => {
2682 let _ = &side_mat;
2683 sender = None;
2684 None
2685 }
2686 })) as BoxStream<Out>)
2687 },
2688 ))
2689 }
2690
2691 #[must_use]
2692 pub fn also_to_all<SideMat, I>(self, sinks: I) -> Flow<In, Out, Mat>
2693 where
2694 Out: Clone,
2695 SideMat: Send + 'static,
2696 I: IntoIterator<Item = Sink<Out, SideMat>>,
2697 {
2698 let sinks: Vec<_> = sinks.into_iter().collect();
2699 if sinks.is_empty() {
2700 return self;
2701 }
2702
2703 self.via(Flow::from_runtime_transform(
2704 move |mut input: BoxStream<Out>, materializer| {
2705 let mut sides = sinks
2706 .iter()
2707 .map(|sink| materialize_side_sink(sink, materializer, 0))
2708 .collect::<StreamResult<Vec<_>>>()?;
2709 Ok(Box::new(std::iter::from_fn(move || match input.next() {
2710 Some(Ok(item)) => {
2711 for (sender, _) in &sides {
2712 if sender.send(Ok(item.clone())).is_err() {
2713 sides.clear();
2714 return None;
2715 }
2716 }
2717 Some(Ok(item))
2718 }
2719 Some(Err(error)) => {
2720 for (sender, _) in &sides {
2721 let _ = sender.send(Err(error.clone())).ok();
2722 }
2723 sides.clear();
2724 Some(Err(error))
2725 }
2726 None => {
2727 sides.clear();
2728 None
2729 }
2730 })) as BoxStream<Out>)
2731 },
2732 ))
2733 }
2734
2735 #[must_use]
2736 pub fn divert_to<SideMat, F>(self, sink: Sink<Out, SideMat>, predicate: F) -> Flow<In, Out, Mat>
2737 where
2738 SideMat: Send + 'static,
2739 F: Fn(&Out) -> bool + Send + Sync + 'static,
2740 {
2741 let predicate = Arc::new(predicate);
2742 self.via(Flow::from_runtime_transform(
2743 move |mut input: BoxStream<Out>, materializer| {
2744 let predicate = Arc::clone(&predicate);
2745 let (side_sender, side_mat) = materialize_side_sink(&sink, materializer, 0)?;
2746 let mut sender = Some(side_sender);
2747 let side_mat = side_mat;
2748 Ok(Box::new(std::iter::from_fn(move || {
2749 loop {
2750 let _ = &side_mat;
2751 match input.next() {
2752 Some(Ok(item)) if predicate(&item) => {
2753 if sender
2754 .as_ref()
2755 .is_some_and(|sender| sender.send(Ok(item)).is_err())
2756 {
2757 sender = None;
2758 return None;
2759 }
2760 }
2761 Some(Ok(item)) => return Some(Ok(item)),
2762 Some(Err(error)) => {
2763 let _ = sender
2764 .as_ref()
2765 .and_then(|sender| sender.send(Err(error.clone())).ok());
2766 sender = None;
2767 return Some(Err(error));
2768 }
2769 None => {
2770 sender = None;
2771 return None;
2772 }
2773 }
2774 }
2775 })) as BoxStream<Out>)
2776 },
2777 ))
2778 }
2779
2780 #[must_use]
2781 pub fn wire_tap<SideMat>(self, sink: Sink<Out, SideMat>) -> Flow<In, Out, Mat>
2782 where
2783 Out: Clone,
2784 SideMat: Send + 'static,
2785 {
2786 self.via(Flow::from_runtime_transform(
2787 move |mut input: BoxStream<Out>, materializer| {
2788 let (side_sender, side_mat) = materialize_side_sink(&sink, materializer, 1)?;
2789 let mut sender = Some(side_sender);
2790 let side_mat = side_mat;
2791 Ok(Box::new(std::iter::from_fn(move || match input.next() {
2792 Some(Ok(item)) => {
2793 let _ = &side_mat;
2794 if let Some(side) = sender.as_ref() {
2795 match side.try_send(Ok(item.clone())) {
2796 Ok(()) | Err(std::sync::mpsc::TrySendError::Full(_)) => {}
2797 Err(std::sync::mpsc::TrySendError::Disconnected(_)) => {
2798 sender = None
2799 }
2800 }
2801 }
2802 Some(Ok(item))
2803 }
2804 Some(Err(error)) => {
2805 let _ = &side_mat;
2806 if let Some(side) = sender.as_ref() {
2807 match side.try_send(Err(error.clone())) {
2808 Ok(())
2809 | Err(std::sync::mpsc::TrySendError::Full(_))
2810 | Err(std::sync::mpsc::TrySendError::Disconnected(_)) => {}
2811 }
2812 }
2813 sender = None;
2814 Some(Err(error))
2815 }
2816 None => {
2817 let _ = &side_mat;
2818 sender = None;
2819 None
2820 }
2821 })) as BoxStream<Out>)
2822 },
2823 ))
2824 }
2825
2826 fn concat_sources<Mat2, I>(self, those: I) -> Flow<In, Out, Mat>
2827 where
2828 Mat2: Send + 'static,
2829 I: IntoIterator<Item = Source<Out, Mat2>>,
2830 {
2831 let source_factories: Vec<_> = those.into_iter().map(|source| source.factory).collect();
2832 self.via(Flow::from_runtime_transform(move |input, materializer| {
2833 let mut streams = Vec::with_capacity(source_factories.len() + 1);
2834 streams.push(input);
2835 for factory in &source_factories {
2836 let stream = match Arc::clone(factory).create(materializer) {
2837 Ok((stream, _)) => stream,
2838 Err(error) => {
2839 return Ok(Box::new(std::iter::once(Err(error))) as BoxStream<Out>);
2840 }
2841 };
2842 streams.push(stream);
2843 }
2844 Ok(concat_streams(streams))
2845 }))
2846 }
2847
2848 fn prepend_sources<Mat2, I>(self, those: I) -> Flow<In, Out, Mat>
2849 where
2850 Mat2: Send + 'static,
2851 I: IntoIterator<Item = Source<Out, Mat2>>,
2852 {
2853 let source_factories: Vec<_> = those.into_iter().map(|source| source.factory).collect();
2854 self.via(Flow::from_runtime_transform(move |input, materializer| {
2855 let mut streams = Vec::with_capacity(source_factories.len() + 1);
2856 for factory in &source_factories {
2857 let stream = match Arc::clone(factory).create(materializer) {
2858 Ok((stream, _)) => stream,
2859 Err(error) => {
2860 return Ok(Box::new(std::iter::once(Err(error))) as BoxStream<Out>);
2861 }
2862 };
2863 streams.push(stream);
2864 }
2865 streams.push(input);
2866 Ok(concat_streams(streams))
2867 }))
2868 }
2869
2870 #[must_use]
2873 pub fn intersperse(self, inject: Out) -> Flow<In, Out, Mat>
2874 where
2875 Out: Clone + Sync,
2876 {
2877 let inject = Arc::new(inject);
2878 self.via(Flow::from_transform(move |mut input| {
2879 let inject = Arc::clone(&inject);
2880 let mut first = true;
2881 Box::new(std::iter::from_fn(move || {
2882 if first {
2883 first = false;
2884 match input.next() {
2885 None => None,
2886 Some(item) => {
2887 if item.is_err() {
2888 first = true;
2889 }
2890 Some(item)
2891 }
2892 }
2893 } else {
2894 match input.next() {
2895 None => None,
2896 Some(item) => {
2897 if item.is_err() {
2898 first = true;
2899 Some(item)
2900 } else {
2901 Some(Ok((*inject).clone()))
2902 }
2903 }
2904 }
2905 }
2906 }))
2907 }))
2908 }
2909
2910 #[must_use]
2911 pub fn flatten_optional<Inner>(self) -> Flow<In, Inner, Mat>
2912 where
2913 Out: Into<Option<Inner>>,
2914 Inner: Send + 'static,
2915 {
2916 self.filter_map(|item| item.into())
2917 }
2918
2919 #[must_use]
2920 pub fn grouped_weighted<F>(self, max_weight: usize, cost_fn: F) -> Flow<In, Vec<Out>, Mat>
2921 where
2922 F: Fn(&Out) -> usize + Send + Sync + 'static,
2923 {
2924 let cost_fn = Arc::new(cost_fn);
2925 self.via(Flow::from_transform(move |mut input| {
2926 let cost_fn = Arc::clone(&cost_fn);
2927 Box::new(std::iter::from_fn(move || {
2928 let mut group = Vec::new();
2929 let mut weight = 0usize;
2930 while weight < max_weight {
2931 match input.next() {
2932 Some(Ok(item)) => {
2933 let item_weight = cost_fn(&item);
2934 if weight > 0 && weight + item_weight > max_weight {
2935 group.push(item);
2936 break;
2937 }
2938 weight += item_weight;
2939 group.push(item);
2940 }
2941 Some(Err(error)) => return Some(Err(error)),
2942 None => break,
2943 }
2944 }
2945 if group.is_empty() {
2946 None
2947 } else {
2948 Some(Ok(group))
2949 }
2950 }))
2951 }))
2952 }
2953
2954 #[must_use]
2955 pub fn limit_weighted<F>(self, max_weight: usize, cost_fn: F) -> Flow<In, Out, Mat>
2956 where
2957 F: Fn(&Out) -> usize + Send + Sync + 'static,
2958 {
2959 let cost_fn = Arc::new(cost_fn);
2960 self.via(Flow::from_transform(move |mut input| {
2961 let cost_fn = Arc::clone(&cost_fn);
2962 let mut weight = 0usize;
2963 Box::new(std::iter::from_fn(move || match input.next()? {
2964 Ok(item) => {
2965 let item_weight = cost_fn(&item);
2966 if weight + item_weight > max_weight {
2967 Some(Err(StreamError::LimitExceeded {
2968 max: max_weight as u64,
2969 }))
2970 } else {
2971 weight += item_weight;
2972 Some(Ok(item))
2973 }
2974 }
2975 Err(error) => Some(Err(error)),
2976 }))
2977 }))
2978 }
2979
2980 #[must_use]
2981 pub fn contramap<NewIn, F>(self, f: F) -> Flow<NewIn, Out, Mat>
2982 where
2983 NewIn: Send + 'static,
2984 F: Fn(NewIn) -> In + Send + Sync + 'static,
2985 {
2986 let stage = Arc::new(f);
2987 Flow::from_transform(move |input| {
2988 let stage = Arc::clone(&stage);
2989 Box::new(input.map(move |item| item.map(|item| stage(item))))
2990 })
2991 .via_mat(self, Keep::right)
2992 }
2993
2994 #[must_use]
2995 pub fn monitor<F>(self, f: F) -> Flow<In, Out, Mat>
2996 where
2997 Out: Clone,
2998 F: Fn(&Out) + Send + Sync + 'static,
2999 {
3000 let stage = Arc::new(f);
3001 self.via(Flow::from_transform(move |input| {
3002 let stage = Arc::clone(&stage);
3003 Box::new(input.map(move |item| match item {
3004 Ok(item) => {
3005 stage(&item);
3006 Ok(item)
3007 }
3008 Err(error) => Err(error),
3009 }))
3010 }))
3011 }
3012
3013 #[must_use]
3014 pub fn watch_termination<CallbackMat, F>(self, materialize_callback: F) -> Flow<In, Out, Mat>
3015 where
3016 Mat: Clone,
3017 CallbackMat: Send + 'static,
3018 F: Fn(Mat) -> CallbackMat + Send + Sync + 'static,
3019 {
3020 let cb = Arc::new(materialize_callback);
3021 self.map_materialized_value(move |mat| {
3022 let _ = cb(mat.clone());
3023 mat
3024 })
3025 }
3026
3027 #[must_use]
3028 pub fn to<SinkMat>(self, sink: Sink<Out, SinkMat>) -> Sink<In, Mat>
3029 where
3030 SinkMat: Send + 'static,
3031 {
3032 self.to_mat(sink, Keep::left)
3033 }
3034
3035 #[must_use]
3036 pub fn to_mat<SinkMat, Combined, F>(
3037 self,
3038 sink: Sink<Out, SinkMat>,
3039 combine: F,
3040 ) -> Sink<In, Combined>
3041 where
3042 SinkMat: Send + 'static,
3043 Combined: Send + 'static,
3044 F: Fn(Mat, SinkMat) -> Combined + Send + Sync + 'static,
3045 {
3046 let transform = self.transform;
3047 let materialize = self.materialize;
3048 let combine = Arc::new(combine);
3049 Sink::from_runner(move |input, materializer| {
3050 let flow_mat = materialize()?;
3051 let input = match &transform {
3052 FlowTransform::Pure(transform) => transform(input),
3053 FlowTransform::Runtime(transform) => transform(input, materializer)?,
3054 };
3055 let sink_mat = sink.run(input, materializer)?;
3056 Ok(combine(flow_mat, sink_mat))
3057 })
3058 }
3059
3060 #[must_use]
3061 pub fn map_materialized_value<NextMat, F>(self, f: F) -> Flow<In, Out, NextMat>
3062 where
3063 NextMat: Send + 'static,
3064 F: Fn(Mat) -> NextMat + Send + Sync + 'static,
3065 {
3066 let transform = self.transform;
3067 let materialize = self.materialize;
3068 let hints = self.hints;
3069 let attributes = self.attributes;
3070 let f = Arc::new(f);
3071 match transform {
3072 FlowTransform::Pure(transform) => Flow::from_parts_with_hints(
3073 move |input| transform(input),
3074 move || {
3075 let mat = materialize()?;
3076 Ok(f(mat))
3077 },
3078 hints,
3079 )
3080 .with_attributes(attributes),
3081 FlowTransform::Runtime(transform) => Flow {
3082 transform: FlowTransform::Runtime(transform),
3083 materialize: Arc::new(move || {
3084 let mat = materialize()?;
3085 Ok(f(mat))
3086 }),
3087 hints,
3088 attributes,
3089 },
3090 }
3091 }
3092}
3093
/// Iterator backing the `map_with_resource` operator (named after its panic
/// contexts). It creates a resource on first use, maps each upstream element
/// through `stage` with mutable access to that resource, and closes the
/// resource when the stream ends, which may emit one trailing element.
struct MapWithResourceStream<In, Out, Resource, Create, F, Close>
where
    Create: Fn() -> StreamResult<Resource>,
    F: Fn(&mut Resource, In) -> StreamResult<Out>,
    Close: Fn(Resource) -> StreamResult<Option<Out>>,
{
    // Upstream elements to be mapped.
    input: BoxStream<In>,
    // Resource factory; attempted at most once (guarded by `created`).
    create: Arc<Create>,
    // Per-element mapping function.
    stage: Arc<F>,
    // Resource finalizer; may yield one final output element.
    close: Arc<Close>,
    // The open resource: `None` before creation, after a failed creation, and
    // after closing.
    resource: Option<Resource>,
    // Whether `create` has already been attempted (successfully or not).
    created: bool,
    // Error parked behind the element produced by `close`; emitted on the next call.
    pending_terminal: Option<StreamError>,
    // Once set, `next` returns `None` (after flushing `pending_terminal`).
    terminated: bool,
    _marker: PhantomData<fn() -> Out>,
}
3110
impl<In, Out, Resource, Create, F, Close> MapWithResourceStream<In, Out, Resource, Create, F, Close>
where
    Create: Fn() -> StreamResult<Resource>,
    F: Fn(&mut Resource, In) -> StreamResult<Out>,
    Close: Fn(Resource) -> StreamResult<Option<Out>>,
{
    /// Attempts to create the resource exactly once.
    ///
    /// `created` is set before calling `create`, so a failing (or panicking)
    /// factory is never retried; `resource` then stays `None`. The call is
    /// wrapped in `catch_unwind_failed` (presumably mapping a panic to `Err`),
    /// and the nested result is flattened with `and_then`.
    fn ensure_created(&mut self) -> StreamResult<()> {
        if self.created {
            return Ok(());
        }
        self.created = true;
        let resource = catch_unwind_failed("map_with_resource create", || (self.create)())
            .and_then(|result| result)?;
        self.resource = Some(resource);
        Ok(())
    }

    /// Takes the open resource (if any) and runs `close` on it.
    ///
    /// Returns `Ok(None)` when there is nothing to close: never created,
    /// creation failed, or already closed. Because the resource is `take`n,
    /// `close` runs at most once per resource.
    fn close_resource(&mut self) -> StreamResult<Option<Out>> {
        match self.resource.take() {
            Some(resource) => {
                catch_unwind_failed("map_with_resource close", || (self.close)(resource))
                    .and_then(|result| result)
            }
            None => Ok(None),
        }
    }

    /// Closes the resource and decides what to emit now, given why the stream
    /// is ending: `terminal` is `None` for normal completion and `Some(error)`
    /// for a failure.
    ///
    /// - `close` yields a final element: that element is emitted now, and any
    ///   terminal error is parked in `pending_terminal` for the next call.
    /// - `close` yields nothing: the terminal error (or end of stream) is
    ///   reported immediately.
    /// - `close` fails: the terminal error, if present, wins over the close error.
    fn close_with_terminal(&mut self, terminal: Option<StreamError>) -> Option<StreamResult<Out>> {
        // Normal completion can be terminal right away; with an error to park
        // the stream must stay live one more call so `next` can emit it.
        self.terminated = terminal.is_none();
        match self.close_resource() {
            Ok(Some(item)) => {
                self.pending_terminal = terminal;
                Some(Ok(item))
            }
            Ok(None) => match terminal {
                Some(error) => {
                    self.terminated = true;
                    Some(Err(error))
                }
                None => {
                    self.terminated = true;
                    None
                }
            },
            Err(error) => {
                self.terminated = true;
                Some(Err(terminal.unwrap_or(error)))
            }
        }
    }
}
3162
/// Pull-based driver: lazily creates the resource, maps elements, and on
/// upstream completion, upstream error or mapping error closes the resource
/// via `close_with_terminal`.
impl<In, Out, Resource, Create, F, Close> Iterator
    for MapWithResourceStream<In, Out, Resource, Create, F, Close>
where
    Create: Fn() -> StreamResult<Resource>,
    F: Fn(&mut Resource, In) -> StreamResult<Out>,
    Close: Fn(Resource) -> StreamResult<Option<Out>>,
{
    type Item = StreamResult<Out>;

    fn next(&mut self) -> Option<Self::Item> {
        // An error parked behind the close element is emitted before anything
        // else, and it ends the stream.
        if let Some(error) = self.pending_terminal.take() {
            self.terminated = true;
            return Some(Err(error));
        }
        if self.terminated {
            return None;
        }
        // Resource creation happens on the first pull, not at construction.
        if let Err(error) = self.ensure_created() {
            self.terminated = true;
            return Some(Err(error));
        }

        match self.input.next() {
            Some(Ok(item)) => {
                // Scope the mutable borrow of the resource so `self` is free
                // again for `close_with_terminal` below.
                let result = {
                    let resource = self
                        .resource
                        .as_mut()
                        .expect("map_with_resource resource is open");
                    catch_unwind_failed("map_with_resource function", || {
                        (self.stage)(resource, item)
                    })
                    .and_then(|result| result)
                };
                match result {
                    Ok(item) => Some(Ok(item)),
                    // A failing `stage` closes the resource and ends the stream.
                    Err(error) => self.close_with_terminal(Some(error)),
                }
            }
            Some(Err(error)) => self.close_with_terminal(Some(error)),
            None => self.close_with_terminal(None),
        }
    }
}
3207
3208impl<I1: Send + 'static, O1: Send + 'static, I2: Send + 'static, O2: Send + 'static>
3209 BidiFlow<I1, O1, I2, O2>
3210{
3211 #[must_use]
3212 pub fn from_flows<Mat1, Mat2>(top: Flow<I1, O1, Mat1>, bottom: Flow<I2, O2, Mat2>) -> Self
3213 where
3214 Mat1: Send + 'static,
3215 Mat2: Send + 'static,
3216 {
3217 Self {
3218 top: top.map_materialized_value(|_| NotUsed),
3219 bottom: bottom.map_materialized_value(|_| NotUsed),
3220 attributes: Attributes::default(),
3221 }
3222 }
3223}
3224
3225impl<I1: Send + 'static, O1: Send + 'static, I2: Send + 'static, O2: Send + 'static>
3226 BidiFlow<I1, O1, I2, O2>
3227{
3228 #[must_use]
3229 pub fn attributes(&self) -> &Attributes {
3230 &self.attributes
3231 }
3232
3233 #[must_use]
3234 pub fn with_attributes(mut self, attributes: Attributes) -> Self {
3235 self.attributes = attributes;
3236 self
3237 }
3238
3239 #[must_use]
3240 pub fn add_attributes(mut self, attributes: Attributes) -> Self {
3241 self.attributes = self.attributes.and(attributes);
3242 self
3243 }
3244
3245 #[must_use]
3246 pub fn named(self, name: impl Into<String>) -> Self {
3247 self.add_attributes(Attributes::named(name))
3248 }
3249
3250 #[must_use]
3251 pub fn join<Mat2>(self, flow: Flow<O1, I2, Mat2>) -> Flow<I1, O2, NotUsed>
3252 where
3253 Mat2: Send + 'static,
3254 {
3255 self.top
3256 .via(flow)
3257 .via(self.bottom)
3258 .map_materialized_value(|_| NotUsed)
3259 .with_attributes(self.attributes)
3260 }
3261
3262 #[must_use]
3263 pub fn atop<OO1: Send + 'static, II2: Send + 'static>(
3264 self,
3265 bidi: BidiFlow<O1, OO1, II2, I2>,
3266 ) -> BidiFlow<I1, OO1, II2, O2> {
3267 BidiFlow {
3268 top: self.top.via(bidi.top).map_materialized_value(|_| NotUsed),
3269 bottom: bidi
3270 .bottom
3271 .via(self.bottom)
3272 .map_materialized_value(|_| NotUsed),
3273 attributes: self.attributes.and(bidi.attributes),
3274 }
3275 }
3276
3277 #[must_use]
3278 pub fn reversed(self) -> BidiFlow<I2, O2, I1, O1> {
3279 BidiFlow {
3280 top: self.bottom,
3281 bottom: self.top.map_materialized_value(|_| NotUsed),
3282 attributes: self.attributes,
3283 }
3284 }
3285}
3286
3287impl<In, Out, Resource, Create, F, Close> Drop
3288 for MapWithResourceStream<In, Out, Resource, Create, F, Close>
3289where
3290 Create: Fn() -> StreamResult<Resource>,
3291 F: Fn(&mut Resource, In) -> StreamResult<Out>,
3292 Close: Fn(Resource) -> StreamResult<Option<Out>>,
3293{
3294 fn drop(&mut self) {
3295 let _ = self.close_resource();
3296 }
3297}
3298
3299fn materialize_inner_flow<In, Out, InnerMat>(
3300 flow: Flow<In, Out, InnerMat>,
3301 input: BoxStream<In>,
3302 materializer: &Materializer,
3303) -> StreamResult<(BoxStream<Out>, InnerMat)>
3304where
3305 In: Send + 'static,
3306 Out: Send + 'static,
3307 InnerMat: Send + 'static,
3308{
3309 let mat = (flow.materialize)()?;
3310 let stream = match flow.transform {
3311 FlowTransform::Pure(transform) => transform(input),
3312 FlowTransform::Runtime(transform) => transform(input, materializer)?,
3313 };
3314 Ok((stream, mat))
3315}
3316
/// Iterator that obtains its inner flow from a future (produced by `future`)
/// on the first pull, materializes it over the upstream input, and then
/// forwards the inner stream's output.
struct FutureFlowStream<In, Out, InnerMat, F, Fut> {
    // Factory producing the future that resolves to the inner flow.
    future: Arc<F>,
    // Materializer used to run the inner flow.
    materializer: Materializer,
    // Upstream input; taken when the inner flow is materialized.
    input: Option<BoxStream<In>>,
    // The inner flow's output stream, once materialized.
    current: Option<BoxStream<Out>>,
    // One-shot channel reporting the inner materialized value (or failure);
    // taken on first completion so it is completed at most once.
    mat_sender: Option<oneshot::Sender<StreamResult<InnerMat>>>,
    // Whether initialization has been attempted.
    initialized: bool,
    // Once set, `next` returns `None`.
    terminated: bool,
    _marker: PhantomData<fn() -> Fut>,
}
3327
3328impl<In, Out, InnerMat, F, Fut> FutureFlowStream<In, Out, InnerMat, F, Fut>
3329where
3330 In: Send + 'static,
3331 Out: Send + 'static,
3332 InnerMat: Send + 'static,
3333 F: Fn() -> Fut,
3334 Fut: Future<Output = StreamResult<Flow<In, Out, InnerMat>>> + Send + 'static,
3335{
3336 fn complete_mat(&mut self, result: StreamResult<InnerMat>) {
3337 if let Some(sender) = self.mat_sender.take() {
3338 let _ = sender.send(result);
3339 }
3340 }
3341
3342 fn initialize(&mut self) -> StreamResult<()> {
3343 if self.initialized {
3344 return Ok(());
3345 }
3346 self.initialized = true;
3347 let flow = match catch_unwind_failed("future_flow factory", || (self.future)())
3348 .and_then(run_future_inline_or_spawn)
3349 {
3350 Ok(flow) => flow,
3351 Err(error) => {
3352 self.complete_mat(Err(error.clone()));
3353 return Err(error);
3354 }
3355 };
3356 let input = self.input.take().expect("future_flow input available");
3357 match materialize_inner_flow(flow, input, &self.materializer) {
3358 Ok((stream, mat)) => {
3359 self.current = Some(stream);
3360 self.complete_mat(Ok(mat));
3361 Ok(())
3362 }
3363 Err(error) => {
3364 self.complete_mat(Err(error.clone()));
3365 Err(error)
3366 }
3367 }
3368 }
3369}
3370
3371impl<In, Out, InnerMat, F, Fut> Iterator for FutureFlowStream<In, Out, InnerMat, F, Fut>
3372where
3373 In: Send + 'static,
3374 Out: Send + 'static,
3375 InnerMat: Send + 'static,
3376 F: Fn() -> Fut,
3377 Fut: Future<Output = StreamResult<Flow<In, Out, InnerMat>>> + Send + 'static,
3378{
3379 type Item = StreamResult<Out>;
3380
3381 fn next(&mut self) -> Option<Self::Item> {
3382 if self.terminated {
3383 return None;
3384 }
3385 if let Err(error) = self.initialize() {
3386 self.terminated = true;
3387 return Some(Err(error));
3388 }
3389 match self
3390 .current
3391 .as_mut()
3392 .expect("future_flow current stream initialized")
3393 .next()
3394 {
3395 Some(Ok(item)) => Some(Ok(item)),
3396 Some(Err(error)) => {
3397 self.terminated = true;
3398 Some(Err(error))
3399 }
3400 None => {
3401 self.terminated = true;
3402 None
3403 }
3404 }
3405 }
3406}
3407
3408impl<In, Out, InnerMat, F, Fut> Drop for FutureFlowStream<In, Out, InnerMat, F, Fut> {
3409 fn drop(&mut self) {
3410 if !self.initialized
3411 && let Some(sender) = self.mat_sender.take()
3412 {
3413 let _ = sender.send(Err(StreamError::Failed(
3414 "future flow was never materialized".into(),
3415 )));
3416 }
3417 }
3418}
3419
/// Iterator that waits for the first upstream element before creating its
/// inner flow (via the future returned by `create`). It then materializes that
/// flow over the first element plus the rest of upstream and forwards its output.
struct LazyFutureFlowStream<In, Out, InnerMat, F, Fut> {
    // Factory producing the future that resolves to the inner flow.
    create: Arc<F>,
    // Materializer used to run the inner flow.
    materializer: Materializer,
    // Upstream input; peeked for one element, then taken for the inner flow.
    input: Option<BoxStream<In>>,
    // The inner flow's output stream, once materialized.
    current: Option<BoxStream<Out>>,
    // One-shot channel reporting the inner materialized value (or failure);
    // taken on first completion so it is completed at most once.
    mat_sender: Option<oneshot::Sender<StreamResult<InnerMat>>>,
    // Whether initialization has been attempted.
    initialized: bool,
    // Once set, `next` returns `None`.
    terminated: bool,
    _marker: PhantomData<fn() -> Fut>,
}
3430
impl<In, Out, InnerMat, F, Fut> LazyFutureFlowStream<In, Out, InnerMat, F, Fut>
where
    In: Send + 'static,
    Out: Send + 'static,
    InnerMat: Send + 'static,
    F: Fn() -> Fut,
    Fut: Future<Output = StreamResult<Flow<In, Out, InnerMat>>> + Send + 'static,
{
    /// Reports the inner materialized value (or its failure) at most once.
    fn complete_mat(&mut self, result: StreamResult<InnerMat>) {
        if let Some(sender) = self.mat_sender.take() {
            // The receiver may already be gone; ignore that.
            let _ = sender.send(result);
        }
    }

    /// Performs the one-time lazy initialization.
    ///
    /// Returns `Ok(true)` when the inner stream is ready (or was already set
    /// up), and `Ok(false)` when upstream completed without any element. In
    /// that case the materialized value is failed and the stream is marked
    /// terminated. Returns `Err` for an upstream error on the first pull, a
    /// failing factory/future, or a failing inner materialization; each of
    /// these is also reported via `complete_mat`.
    fn initialize(&mut self) -> Result<bool, StreamError> {
        if self.initialized {
            return Ok(true);
        }
        self.initialized = true;
        // Pull the first element before creating the flow: this is what makes
        // the flow lazy.
        let first = match self
            .input
            .as_mut()
            .expect("lazy_future_flow input available")
            .next()
        {
            Some(Ok(item)) => item,
            Some(Err(error)) => {
                self.complete_mat(Err(error.clone()));
                return Err(error);
            }
            None => {
                self.complete_mat(Err(StreamError::Failed(
                    "lazy flow was never materialized".into(),
                )));
                self.terminated = true;
                return Ok(false);
            }
        };

        let flow = match catch_unwind_failed("lazy_future_flow factory", || (self.create)())
            .and_then(run_future_inline_or_spawn)
        {
            Ok(flow) => flow,
            Err(error) => {
                self.complete_mat(Err(error.clone()));
                return Err(error);
            }
        };
        // Re-attach the already-consumed first element in front of the rest of
        // upstream so the inner flow sees the complete input.
        let input = prepend_first_stream(
            first,
            self.input
                .take()
                .expect("lazy_future_flow input available after first element"),
        );
        match materialize_inner_flow(flow, input, &self.materializer) {
            Ok((stream, mat)) => {
                self.current = Some(stream);
                self.complete_mat(Ok(mat));
                Ok(true)
            }
            Err(error) => {
                self.complete_mat(Err(error.clone()));
                Err(error)
            }
        }
    }
}
3498
3499impl<In, Out, InnerMat, F, Fut> Iterator for LazyFutureFlowStream<In, Out, InnerMat, F, Fut>
3500where
3501 In: Send + 'static,
3502 Out: Send + 'static,
3503 InnerMat: Send + 'static,
3504 F: Fn() -> Fut,
3505 Fut: Future<Output = StreamResult<Flow<In, Out, InnerMat>>> + Send + 'static,
3506{
3507 type Item = StreamResult<Out>;
3508
3509 fn next(&mut self) -> Option<Self::Item> {
3510 if self.terminated {
3511 return None;
3512 }
3513 match self.initialize() {
3514 Ok(true) => {}
3515 Ok(false) => return None,
3516 Err(error) => {
3517 self.terminated = true;
3518 return Some(Err(error));
3519 }
3520 }
3521 match self
3522 .current
3523 .as_mut()
3524 .expect("lazy_future_flow current stream initialized")
3525 .next()
3526 {
3527 Some(Ok(item)) => Some(Ok(item)),
3528 Some(Err(error)) => {
3529 self.terminated = true;
3530 Some(Err(error))
3531 }
3532 None => {
3533 self.terminated = true;
3534 None
3535 }
3536 }
3537 }
3538}
3539
3540impl<In, Out, InnerMat, F, Fut> Drop for LazyFutureFlowStream<In, Out, InnerMat, F, Fut> {
3541 fn drop(&mut self) {
3542 if !self.initialized
3543 && let Some(sender) = self.mat_sender.take()
3544 {
3545 let _ = sender.send(Err(StreamError::Failed(
3546 "lazy flow was never materialized".into(),
3547 )));
3548 }
3549 }
3550}
3551
3552fn prepend_first_stream<In>(first: In, mut rest: BoxStream<In>) -> BoxStream<In>
3553where
3554 In: Send + 'static,
3555{
3556 let mut first = Some(first);
3557 Box::new(std::iter::from_fn(move || {
3558 if let Some(item) = first.take() {
3559 Some(Ok(item))
3560 } else {
3561 rest.next()
3562 }
3563 }))
3564}
3565
3566pub(super) fn poll_once_or_pending<Fut, T>(future: Fut) -> Result<StreamResult<T>, Pin<Box<Fut>>>
3567where
3568 Fut: Future<Output = StreamResult<T>>,
3569{
3570 let mut future = Box::pin(future);
3571 let _guard = stream_tokio_runtime().enter();
3572 let waker = noop_waker();
3573 let mut cx = Context::from_waker(&waker);
3574 match catch_unwind(AssertUnwindSafe(|| future.as_mut().poll(&mut cx))) {
3575 Ok(Poll::Ready(output)) => Ok(output),
3576 Ok(Poll::Pending) => Err(future),
3577 Err(_) => Ok(Err(StreamError::Failed("future task panicked".into()))),
3578 }
3579}
3580
3581pub(super) fn spawn_completion_task<Fut, T, Msg, Map>(
3582 task_id: usize,
3583 future: Pin<Box<Fut>>,
3584 sender: std::sync::mpsc::Sender<(usize, Msg)>,
3585 map: Map,
3586) -> AbortOnDropHandle<()>
3587where
3588 Fut: Future<Output = StreamResult<T>> + Send + 'static,
3589 T: Send + 'static,
3590 Msg: Send + 'static,
3591 Map: FnOnce(StreamResult<T>) -> Msg + Send + 'static,
3592{
3593 spawn_tokio_task(async move {
3594 let result = AssertUnwindSafe(future).catch_unwind().await;
3595 let message = match result {
3596 Ok(output) => map(output),
3597 Err(_) => map(Err(StreamError::Failed("future task panicked".into()))),
3598 };
3599 let _ = sender.send((task_id, message));
3600 })
3601}
3602
3603pub(super) fn recv_completion<Msg>(
3604 receiver: &std::sync::mpsc::Receiver<(usize, Msg)>,
3605) -> Option<(usize, Msg)> {
3606 let mut idle_spins = 0;
3607 loop {
3608 match receiver.try_recv() {
3609 Ok(message) => return Some(message),
3610 Err(std::sync::mpsc::TryRecvError::Disconnected) => return None,
3611 Err(std::sync::mpsc::TryRecvError::Empty) if idle_spins < STREAM_READY_SPINS => {
3612 idle_spins += STREAM_SPIN_BACKOFF;
3613 for _ in 0..STREAM_SPIN_BACKOFF {
3614 std::hint::spin_loop();
3615 }
3616 }
3617 Err(std::sync::mpsc::TryRecvError::Empty) => {
3618 idle_spins = 0;
3619 match receiver.recv_timeout(STREAM_MAX_PARK) {
3620 Ok(message) => return Some(message),
3621 Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {}
3622 Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => return None,
3623 }
3624 }
3625 }
3626 }
3627}
3628
3629pub(super) fn run_future_inline_or_spawn<Fut, T>(future: Fut) -> StreamResult<T>
3630where
3631 Fut: Future<Output = StreamResult<T>> + Send + 'static,
3632 T: Send + 'static,
3633{
3634 match poll_once_or_pending(future) {
3635 Ok(result) => result,
3636 Err(future) => {
3637 let (sender, receiver) = std::sync::mpsc::channel::<(usize, StreamResult<T>)>();
3638 let _task = spawn_completion_task(0, future, sender, |result| result);
3639 recv_completion(&receiver)
3640 .map(|(_, result)| result)
3641 .unwrap_or_else(|| Err(StreamError::Failed("future task dropped".into())))
3642 }
3643 }
3644}
3645
/// Applies the async `stage` to each element of `input` with up to `parallelism`
/// elements in flight, and emits the results in input order.
///
/// Each element gets a sequential index. Its future is polled once inline through
/// `poll_once_or_pending`. A future that is not ready yet goes to
/// `spawn_completion_task`, which later reports `(index, result)` on the channel.
/// Results that finish ahead of their turn wait in `completed` until `next_to_emit`
/// reaches them. Buffered results count toward the `parallelism` bound together with
/// in-flight tasks.
///
/// An upstream error stops reading `input` and is queued at the next index. It is
/// therefore emitted only after the results for all earlier elements. The stream ends
/// when nothing is buffered for `next_to_emit` and no task is in flight. With
/// `parallelism == 0` no element is ever pulled, so the stream is empty.
///
/// NOTE(review): unlike `map_async_ordered_supervised`, a panic raised by calling
/// `stage(item)` itself (before a future exists) is not caught here. Confirm callers
/// handle it.
fn map_async_ordered<Out, Next, F, Fut>(
    mut input: BoxStream<Out>,
    parallelism: usize,
    stage: Arc<F>,
) -> BoxStream<Next>
where
    Out: Send + 'static,
    Next: Send + 'static,
    F: Fn(Out) -> Fut + Send + Sync + 'static,
    Fut: Future<Output = StreamResult<Next>> + Send + 'static,
{
    let (sender, receiver) = std::sync::mpsc::channel::<(usize, StreamResult<Next>)>();
    let mut tasks = HashMap::<usize, AbortOnDropHandle<()>>::with_capacity(parallelism);
    let mut next_index = 0_usize;
    let mut next_to_emit = 0_usize;
    let mut completed = BTreeMap::new();
    let mut input_done = false;

    Box::new(std::iter::from_fn(move || {
        loop {
            // Emit the next in-order result if it is already buffered.
            if let Some(result) = completed.remove(&next_to_emit) {
                next_to_emit += 1;
                return Some(result);
            }

            // Top up in-flight work; buffered results count toward the bound.
            while tasks.len() + completed.len() < parallelism && !input_done {
                match input.next() {
                    Some(Ok(item)) => {
                        let index = next_index;
                        next_index += 1;
                        match poll_once_or_pending(stage(item)) {
                            // Ready on the first poll: emit now if it is next in line.
                            Ok(result) => {
                                if index == next_to_emit {
                                    next_to_emit += 1;
                                    return Some(result);
                                }
                                completed.insert(index, result);
                            }
                            // Still pending: finish it on the runtime, reported by index.
                            Err(future) => {
                                tasks.insert(
                                    index,
                                    spawn_completion_task(
                                        index,
                                        future,
                                        sender.clone(),
                                        |result| result,
                                    ),
                                );
                            }
                        }
                    }
                    // Upstream failure is ordered after every earlier element's result.
                    Some(Err(error)) => {
                        completed.insert(next_index, Err(error));
                        next_index += 1;
                        input_done = true;
                    }
                    None => input_done = true,
                }
            }

            if let Some(result) = completed.remove(&next_to_emit) {
                next_to_emit += 1;
                return Some(result);
            }

            if tasks.is_empty() {
                return None;
            }

            // Block until some spawned task reports back.
            if let Some((index, result)) = recv_completion(&receiver) {
                tasks.remove(&index);
                if index == next_to_emit {
                    next_to_emit += 1;
                    return Some(result);
                }
                completed.insert(index, result);
            }
        }
    }))
}
3726
3727fn supervise_async_result<Next>(
3728 result: StreamResult<Next>,
3729 decider: &SupervisionDecider,
3730) -> Option<StreamResult<Next>> {
3731 match result {
3732 Ok(item) => Some(Ok(item)),
3733 Err(error) => match decide_supervision(decider, &error) {
3734 SupervisionDirective::Stop => Some(Err(error)),
3735 SupervisionDirective::Resume | SupervisionDirective::Restart => None,
3736 },
3737 }
3738}
3739
/// Ordered async map like `map_async_ordered`, with stage failures routed through the
/// supervision `decider`.
///
/// `completed` stores `Option<StreamResult<Next>>`. A `None` entry marks an element
/// whose error the decider chose to skip (`Resume`/`Restart`). Such entries still
/// occupy their index so ordering is preserved, and are silently passed over when their
/// turn comes. A panic while calling `stage(item)` is caught and supervised as a
/// `panic_stream_error("map_async callback")`. Upstream errors from `input` are not
/// supervised; they are queued in order and end the input, as in `map_async_ordered`.
fn map_async_ordered_supervised<Out, Next, F, Fut>(
    mut input: BoxStream<Out>,
    parallelism: usize,
    stage: Arc<F>,
    decider: SupervisionDecider,
) -> BoxStream<Next>
where
    Out: Send + 'static,
    Next: Send + 'static,
    F: Fn(Out) -> Fut + Send + Sync + 'static,
    Fut: Future<Output = StreamResult<Next>> + Send + 'static,
{
    let (sender, receiver) = std::sync::mpsc::channel::<(usize, StreamResult<Next>)>();
    let mut tasks = HashMap::<usize, AbortOnDropHandle<()>>::with_capacity(parallelism);
    let mut next_index = 0_usize;
    let mut next_to_emit = 0_usize;
    let mut completed = BTreeMap::<usize, Option<StreamResult<Next>>>::new();
    let mut input_done = false;

    Box::new(std::iter::from_fn(move || {
        loop {
            // Drain in-order entries, skipping those the decider dropped.
            while let Some(result) = completed.remove(&next_to_emit) {
                next_to_emit += 1;
                if let Some(result) = result {
                    return Some(result);
                }
            }

            while tasks.len() + completed.len() < parallelism && !input_done {
                match input.next() {
                    Some(Ok(item)) => {
                        let index = next_index;
                        next_index += 1;
                        // Also catch panics from `stage(item)` itself; panics during the
                        // poll are already handled inside `poll_once_or_pending`.
                        match catch_unwind(AssertUnwindSafe(|| poll_once_or_pending(stage(item)))) {
                            Ok(Ok(result)) => {
                                let result = supervise_async_result(result, &decider);
                                if index == next_to_emit {
                                    next_to_emit += 1;
                                    if let Some(result) = result {
                                        return Some(result);
                                    }
                                } else {
                                    completed.insert(index, result);
                                }
                            }
                            Ok(Err(future)) => {
                                tasks.insert(
                                    index,
                                    spawn_completion_task(
                                        index,
                                        future,
                                        sender.clone(),
                                        |result| result,
                                    ),
                                );
                            }
                            Err(_) => {
                                let error = panic_stream_error("map_async callback");
                                let result = supervise_async_result(Err(error), &decider);
                                if index == next_to_emit {
                                    next_to_emit += 1;
                                    if let Some(result) = result {
                                        return Some(result);
                                    }
                                } else {
                                    completed.insert(index, result);
                                }
                            }
                        }
                    }
                    // Upstream errors bypass the decider and terminate the input.
                    Some(Err(error)) => {
                        completed.insert(next_index, Some(Err(error)));
                        next_index += 1;
                        input_done = true;
                    }
                    None => input_done = true,
                }
            }

            while let Some(result) = completed.remove(&next_to_emit) {
                next_to_emit += 1;
                if let Some(result) = result {
                    return Some(result);
                }
            }

            if tasks.is_empty() {
                return None;
            }

            // Block until some spawned task reports back, then supervise its result.
            if let Some((index, result)) = recv_completion(&receiver) {
                tasks.remove(&index);
                let result = supervise_async_result(result, &decider);
                if index == next_to_emit {
                    next_to_emit += 1;
                    if let Some(result) = result {
                        return Some(result);
                    }
                } else {
                    completed.insert(index, result);
                }
            }
        }
    }))
}
3845
3846fn concat_streams<Out>(streams: Vec<BoxStream<Out>>) -> BoxStream<Out>
3847where
3848 Out: Send + 'static,
3849{
3850 let mut streams: VecDeque<_> = streams.into();
3851 let mut current = streams.pop_front();
3852 Box::new(std::iter::from_fn(move || {
3853 loop {
3854 match current.as_mut() {
3855 Some(stream) => match stream.next() {
3856 Some(item) => return Some(item),
3857 None => current = streams.pop_front(),
3858 },
3859 None => return None,
3860 }
3861 }
3862 }))
3863}
3864
3865fn concat_streams_lazy<Out, Mat>(
3866 initial: BoxStream<Out>,
3867 factories: Vec<Arc<dyn SourceFactory<Out, Mat>>>,
3868 materializer: &Materializer,
3869) -> BoxStream<Out>
3870where
3871 Out: Send + 'static,
3872 Mat: Send + 'static,
3873{
3874 let mut current = Some(initial);
3875 let mut remaining: VecDeque<_> = factories.into();
3876 let materializer = materializer.with_name_prefix(materializer.name_prefix().to_owned());
3877 Box::new(std::iter::from_fn(move || {
3878 loop {
3879 match current.as_mut() {
3880 Some(stream) => match stream.next() {
3881 Some(item) => return Some(item),
3882 None => {
3883 current = remaining.pop_front().map(|factory| {
3884 match factory.create(&materializer) {
3885 Ok((stream, _)) => stream,
3886 Err(error) => {
3887 Box::new(std::iter::once(Err(error))) as BoxStream<Out>
3888 }
3889 }
3890 });
3891 }
3892 },
3893 None => return None,
3894 }
3895 }
3896 }))
3897}
3898
3899fn or_else_stream<Out>(mut primary: BoxStream<Out>, mut secondary: BoxStream<Out>) -> BoxStream<Out>
3900where
3901 Out: Send + 'static,
3902{
3903 let mut primary_emitted = false;
3904 let mut using_secondary = false;
3905 Box::new(std::iter::from_fn(move || {
3906 loop {
3907 if using_secondary {
3908 return secondary.next();
3909 }
3910
3911 match primary.next() {
3912 Some(Ok(item)) => {
3913 primary_emitted = true;
3914 return Some(Ok(item));
3915 }
3916 Some(Err(error)) => return Some(Err(error)),
3917 None if primary_emitted => return None,
3918 None => using_secondary = true,
3919 }
3920 }
3921 }))
3922}
3923
3924fn interleave_streams<Out>(
3925 streams: Vec<BoxStream<Out>>,
3926 segment_size: usize,
3927 eager_close: bool,
3928) -> BoxStream<Out>
3929where
3930 Out: Send + 'static,
3931{
3932 if segment_size == 0 {
3933 return Box::new(std::iter::once(Err(StreamError::GraphValidation(
3934 "interleave segment size must be greater than zero".into(),
3935 ))));
3936 }
3937
3938 let mut streams: Vec<Option<BoxStream<Out>>> = streams.into_iter().map(Some).collect();
3939 let mut pending: Vec<Option<StreamResult<Out>>> = (0..streams.len()).map(|_| None).collect();
3940 let mut current = 0usize;
3941 let mut emitted = 0usize;
3942 Box::new(std::iter::from_fn(move || {
3943 loop {
3944 if streams.iter().all(Option::is_none) {
3945 return None;
3946 }
3947 if streams[current].is_none() {
3948 match next_active_stream(&streams, current) {
3949 Some(next) => {
3950 current = next;
3951 emitted = 0;
3952 }
3953 None => return None,
3954 }
3955 }
3956
3957 let Some(stream) = streams[current].as_mut() else {
3958 continue;
3959 };
3960 let next_item = pending[current].take().or_else(|| stream.next());
3961 match next_item {
3962 Some(Ok(item)) => {
3963 emitted += 1;
3964 if emitted == segment_size {
3965 emitted = 0;
3966 if let Some(next) = next_active_stream(&streams, current) {
3967 current = next;
3968 }
3969 }
3970 return Some(Ok(item));
3971 }
3972 Some(Err(error)) => return Some(Err(error)),
3973 None => {
3974 streams[current] = None;
3975 emitted = 0;
3976 if eager_close {
3977 return None;
3978 }
3979 match next_active_stream(&streams, current) {
3980 Some(next) => current = next,
3981 None => return None,
3982 }
3983 }
3984 }
3985 }
3986 }))
3987}
3988
3989fn next_active_stream<Out>(streams: &[Option<BoxStream<Out>>], current: usize) -> Option<usize>
3990where
3991 Out: Send + 'static,
3992{
3993 if streams.is_empty() {
3994 return None;
3995 }
3996 for offset in 1..=streams.len() {
3997 let index = (current + offset) % streams.len();
3998 if streams[index].is_some() {
3999 return Some(index);
4000 }
4001 }
4002 None
4003}
4004
4005fn materialize_side_sink<Out, Mat>(
4006 sink: &Sink<Out, Mat>,
4007 materializer: &Materializer,
4008 buffer: usize,
4009) -> StreamResult<(std::sync::mpsc::SyncSender<StreamResult<Out>>, Mat)>
4010where
4011 Out: Send + 'static,
4012 Mat: Send + 'static,
4013{
4014 let (sender, receiver) = std::sync::mpsc::sync_channel(buffer);
4015 let mat = sink.run(side_receiver_stream(receiver), materializer)?;
4016 Ok((sender, mat))
4017}
4018
4019fn side_receiver_stream<Out>(
4020 receiver: std::sync::mpsc::Receiver<StreamResult<Out>>,
4021) -> BoxStream<Out>
4022where
4023 Out: Send + 'static,
4024{
4025 Box::new(std::iter::from_fn(move || receiver.recv().ok()))
4026}
4027
/// Final state recorded on a live substream queue once no more elements will arrive.
///
/// After the queue drains, the consumer returns `None` for `Complete` and
/// `Some(Err(..))` for `Error`.
#[derive(Clone)]
enum LiveSubstreamTerminal {
    Complete,
    Error(StreamError),
}
4033
/// Default number of elements a live substream queue buffers before its producer blocks.
const LIVE_SUBSTREAM_CAPACITY: usize = 256;
/// Default maximum number of elements stored in one queued batch. It is also the
/// size at which the `group_by` and split write buffers are flushed.
const LIVE_SUBSTREAM_BATCH: usize = 64;
/// NOTE(review): not referenced in this section; presumably the number of concurrently
/// open substreams used by `flat_map_merge`. Confirm at its use site.
const FLAT_MAP_MERGE_SUBSTREAM_WINDOW: usize = 64;
4037
/// Bounded single-producer/single-consumer queue connecting a worker (producer) to a
/// `LiveSubstreamStream` (consumer).
struct LiveSubstreamShared<T> {
    /// Queued batches, element count, and terminal state, guarded together.
    state: Mutex<LiveSubstreamState<T>>,
    /// Notified by both sides: by the producer when data or a terminal arrives, and by
    /// the consumer when it frees capacity or cancels.
    available: Condvar,
    /// Set when the consuming stream is dropped; the producer checks it to stop pushing.
    cancelled: Arc<AtomicBool>,
    /// Maximum number of buffered elements before the producer blocks.
    capacity: usize,
    /// Maximum number of elements per queued batch (at least 1).
    batch_size: usize,
}
4045
/// Mutable part of a `LiveSubstreamShared` queue.
struct LiveSubstreamState<T> {
    /// Total number of elements across all queued `batches`.
    buffered: usize,
    /// FIFO of element batches; the consumer takes a whole batch at a time.
    batches: VecDeque<VecDeque<T>>,
    /// Set once by complete/fail; later attempts to set it are ignored.
    terminal: Option<LiveSubstreamTerminal>,
}
4051
4052impl<T> LiveSubstreamShared<T> {
4053 fn new() -> Arc<Self> {
4054 Self::with_capacity(LIVE_SUBSTREAM_CAPACITY)
4055 }
4056
4057 fn with_capacity(capacity: usize) -> Arc<Self> {
4058 Self::with_batching(capacity, LIVE_SUBSTREAM_BATCH)
4059 }
4060
4061 fn with_batching(capacity: usize, batch_size: usize) -> Arc<Self> {
4062 Arc::new(Self {
4063 state: Mutex::new(LiveSubstreamState {
4064 buffered: 0,
4065 batches: VecDeque::new(),
4066 terminal: None,
4067 }),
4068 available: Condvar::new(),
4069 cancelled: Arc::new(AtomicBool::new(false)),
4070 capacity,
4071 batch_size: batch_size.max(1),
4072 })
4073 }
4074}
4075
/// Consumer side of a `LiveSubstreamShared` queue, exposed as a stream.
struct LiveSubstreamStream<T> {
    /// Queue shared with the producing worker.
    shared: Arc<LiveSubstreamShared<T>>,
    /// Handle to the producing worker, when this stream owns it; released on drop.
    completion: Option<StreamCompletion<NotUsed>>,
    /// Remainder of the last batch taken from the queue, served without locking.
    local_batch: VecDeque<T>,
}
4081
4082impl<T> Iterator for LiveSubstreamStream<T> {
4083 type Item = StreamResult<T>;
4084
4085 fn next(&mut self) -> Option<Self::Item> {
4086 if let Some(item) = self.local_batch.pop_front() {
4087 return Some(Ok(item));
4088 }
4089
4090 let mut state = self
4091 .shared
4092 .state
4093 .lock()
4094 .unwrap_or_else(|poison| poison.into_inner());
4095 loop {
4096 if let Some(mut batch) = state.batches.pop_front() {
4097 state.buffered -= batch.len();
4098 drop(state);
4099 self.shared.available.notify_all();
4100 let item = batch.pop_front().expect("live substream batch has an item");
4101 self.local_batch = batch;
4102 return Some(Ok(item));
4103 }
4104 if let Some(terminal) = state.terminal.clone() {
4105 return match terminal {
4106 LiveSubstreamTerminal::Complete => None,
4107 LiveSubstreamTerminal::Error(error) => Some(Err(error)),
4108 };
4109 }
4110 state = self
4111 .shared
4112 .available
4113 .wait(state)
4114 .unwrap_or_else(|poison| poison.into_inner());
4115 }
4116 }
4117}
4118
4119impl<T> Drop for LiveSubstreamStream<T> {
4120 fn drop(&mut self) {
4121 self.shared.cancelled.store(true, Ordering::SeqCst);
4122 self.shared.available.notify_all();
4123 let _ = self.completion.take();
4124 }
4125}
4126
/// Moves every element of `batch`, in order, into the shared substream queue. Blocks
/// whenever the queue already holds `capacity` elements.
///
/// Elements are appended to the queue's tail batch while it has fewer than
/// `batch_size` elements; otherwise a new batch is started. The consumer is notified
/// only when the queue was empty before a round of appends. The consumer waits only
/// when no batch is queued, so this is the only case in which it can be asleep on
/// missing data.
///
/// Returns `Err(())` if the consumer cancelled or the substream already has a
/// terminal. In that case `batch` is cleared first; elements pushed in earlier rounds
/// stay queued.
fn push_live_substream_batch<T>(
    shared: &Arc<LiveSubstreamShared<T>>,
    batch: &mut VecDeque<T>,
) -> Result<(), ()> {
    // A batch larger than the free capacity is delivered over several rounds.
    while !batch.is_empty() {
        let mut state = shared
            .state
            .lock()
            .unwrap_or_else(|poison| poison.into_inner());
        // Wait for the consumer to free space, bailing out on cancellation.
        while state.buffered >= shared.capacity && state.terminal.is_none() {
            if shared.cancelled.load(Ordering::SeqCst) {
                batch.clear();
                return Err(());
            }
            state = shared
                .available
                .wait(state)
                .unwrap_or_else(|poison| poison.into_inner());
        }
        if shared.cancelled.load(Ordering::SeqCst) || state.terminal.is_some() {
            batch.clear();
            return Err(());
        }
        let was_empty = state.buffered == 0;
        // Fill as much of the free capacity as this batch allows.
        while state.buffered < shared.capacity && !batch.is_empty() {
            let item = batch.pop_front().expect("batch non-empty");
            if let Some(back) = state.batches.back_mut()
                && back.len() < shared.batch_size
            {
                back.push_back(item);
            } else {
                let mut new_batch =
                    VecDeque::with_capacity(shared.batch_size.min(shared.capacity.max(1)));
                new_batch.push_back(item);
                state.batches.push_back(new_batch);
            }
            state.buffered += 1;
        }
        drop(state);
        if was_empty {
            shared.available.notify_all();
        }
    }
    Ok(())
}
4176
4177fn push_live_substream<T>(shared: &Arc<LiveSubstreamShared<T>>, item: T) -> Result<(), T> {
4178 let mut state = shared
4179 .state
4180 .lock()
4181 .unwrap_or_else(|poison| poison.into_inner());
4182 while state.buffered >= shared.capacity && state.terminal.is_none() {
4183 if shared.cancelled.load(Ordering::SeqCst) {
4184 return Err(item);
4185 }
4186 state = shared
4187 .available
4188 .wait(state)
4189 .unwrap_or_else(|poison| poison.into_inner());
4190 }
4191 if shared.cancelled.load(Ordering::SeqCst) || state.terminal.is_some() {
4192 return Err(item);
4193 }
4194 let was_empty = state.buffered == 0;
4195 if let Some(batch) = state.batches.back_mut()
4196 && batch.len() < shared.batch_size
4197 {
4198 batch.push_back(item);
4199 } else {
4200 let mut batch = VecDeque::with_capacity(shared.batch_size.min(shared.capacity.max(1)));
4201 batch.push_back(item);
4202 state.batches.push_back(batch);
4203 }
4204 state.buffered += 1;
4205 drop(state);
4206 if was_empty {
4207 shared.available.notify_all();
4208 }
4209 Ok(())
4210}
4211
4212fn complete_live_substream<T>(shared: &Arc<LiveSubstreamShared<T>>) {
4213 let mut state = shared
4214 .state
4215 .lock()
4216 .unwrap_or_else(|poison| poison.into_inner());
4217 if state.terminal.is_none() {
4218 state.terminal = Some(LiveSubstreamTerminal::Complete);
4219 }
4220 drop(state);
4221 shared.available.notify_all();
4222}
4223
4224fn fail_live_substream<T>(shared: &Arc<LiveSubstreamShared<T>>, error: StreamError) {
4225 let mut state = shared
4226 .state
4227 .lock()
4228 .unwrap_or_else(|poison| poison.into_inner());
4229 if state.terminal.is_none() {
4230 state.terminal = Some(LiveSubstreamTerminal::Error(error));
4231 }
4232 drop(state);
4233 shared.available.notify_all();
4234}
4235
4236fn cancel_live_substream<T>(shared: &Arc<LiveSubstreamShared<T>>) {
4237 fail_live_substream(shared, StreamError::Cancelled);
4238}
4239
4240fn source_from_live_substream<T>(shared: Arc<LiveSubstreamShared<T>>) -> Source<T>
4241where
4242 T: Send + 'static,
4243{
4244 let claimed = Arc::new(AtomicBool::new(false));
4245 Source::from_materialized_factory(move |_materializer| {
4246 if claimed.swap(true, Ordering::SeqCst) {
4247 return Err(StreamError::Failed(
4248 "substream source cannot be materialized more than once".into(),
4249 ));
4250 }
4251 Ok((
4252 Box::new(LiveSubstreamStream {
4253 shared: Arc::clone(&shared),
4254 completion: None,
4255 local_batch: VecDeque::new(),
4256 }) as BoxStream<T>,
4257 NotUsed,
4258 ))
4259 })
4260}
4261
4262fn source_from_once_stream<T>(stream: BoxStream<T>) -> Source<T>
4263where
4264 T: Send + 'static,
4265{
4266 let stream = Arc::new(Mutex::new(Some(stream)));
4267 Source::from_materialized_factory(move |_materializer| {
4268 let mut slot = stream.lock().unwrap_or_else(|poison| poison.into_inner());
4269 let stream = slot.take().ok_or_else(|| {
4270 StreamError::Failed("substream source cannot be materialized more than once".into())
4271 })?;
4272 Ok((stream, NotUsed))
4273 })
4274}
4275
4276fn prefix_and_tail_stream<Out>(
4277 input: BoxStream<Out>,
4278 n: usize,
4279) -> BoxStream<(Vec<Out>, Source<Out>)>
4280where
4281 Out: Send + 'static,
4282{
4283 let mut input = Some(input);
4284 let mut emitted = false;
4285 Box::new(std::iter::from_fn(move || {
4286 if emitted {
4287 return None;
4288 }
4289 emitted = true;
4290
4291 let mut prefix = Vec::with_capacity(n);
4292 while prefix.len() < n {
4293 match input
4294 .as_mut()
4295 .expect("prefix_and_tail input available")
4296 .next()
4297 {
4298 Some(Ok(item)) => prefix.push(item),
4299 Some(Err(error)) => return Some(Err(error)),
4300 None => return Some(Ok((prefix, Source::empty()))),
4301 }
4302 }
4303
4304 Some(Ok((
4305 prefix,
4306 source_from_once_stream(input.take().expect("tail input available")),
4307 )))
4308 }))
4309}
4310
/// State owned by the `group_by` worker: the outer queue of substream sources, the live
/// substreams by key, and the keys that must not be reopened.
///
/// While `armed`, dropping the guard fails the outer stream and every active substream
/// with `AbruptTermination` (for example if the worker panics and unwinds). The worker
/// disarms it on each deliberate exit.
struct GroupByWorkerGuard<Key, Out> {
    /// Queue of `Source`s handed to the outer stream's consumer.
    outer: Arc<LiveSubstreamShared<Source<Out>>>,
    /// Substreams still accepting elements, keyed by group key.
    active: HashMap<Key, Arc<LiveSubstreamShared<Out>>>,
    /// Keys whose substream closed and must not be recreated (only populated when
    /// recreation is disallowed).
    closed: HashSet<Key>,
    /// Whether `Drop` should report an abrupt termination.
    armed: bool,
}
4317
4318impl<Key, Out> GroupByWorkerGuard<Key, Out>
4319where
4320 Key: Eq + Hash,
4321{
4322 fn new(outer: Arc<LiveSubstreamShared<Source<Out>>>) -> Self {
4323 Self {
4324 outer,
4325 active: HashMap::new(),
4326 closed: HashSet::new(),
4327 armed: true,
4328 }
4329 }
4330
4331 fn disarm(&mut self) {
4332 self.armed = false;
4333 }
4334
4335 fn fail_all(&self, error: StreamError)
4336 where
4337 Out: Send + 'static,
4338 {
4339 fail_live_substream(&self.outer, error.clone());
4340 for substream in self.active.values() {
4341 fail_live_substream(substream, error.clone());
4342 }
4343 }
4344
4345 fn complete_all(&self)
4346 where
4347 Out: Send + 'static,
4348 {
4349 complete_live_substream(&self.outer);
4350 for substream in self.active.values() {
4351 complete_live_substream(substream);
4352 }
4353 }
4354
4355 fn cancel_all(&self)
4356 where
4357 Out: Send + 'static,
4358 {
4359 for substream in self.active.values() {
4360 cancel_live_substream(substream);
4361 }
4362 }
4363}
4364
4365impl<Key, Out> Drop for GroupByWorkerGuard<Key, Out> {
4366 fn drop(&mut self) {
4367 if self.armed {
4368 fail_live_substream(&self.outer, StreamError::AbruptTermination);
4369 for substream in self.active.values() {
4370 fail_live_substream(substream, StreamError::AbruptTermination);
4371 }
4372 }
4373 }
4374}
4375
4376fn group_by_flush_write_batch<Key, Out>(
4377 guard: &mut GroupByWorkerGuard<Key, Out>,
4378 wb_key: &mut Option<Key>,
4379 wb_sub: &mut Option<Arc<LiveSubstreamShared<Out>>>,
4380 wb_items: &mut VecDeque<Out>,
4381 allow_closed_substream_recreation: bool,
4382) where
4383 Key: Clone + Eq + Hash,
4384 Out: Send + 'static,
4385{
4386 if wb_items.is_empty() {
4387 return;
4388 }
4389 let key = wb_key.take().expect("wb_key set when wb_items non-empty");
4390 if let Some(ref sub) = *wb_sub {
4391 if push_live_substream_batch(sub, wb_items).is_err() {
4392 guard.active.remove(&key);
4393 if !allow_closed_substream_recreation {
4394 guard.closed.insert(key);
4395 }
4396 }
4397 } else {
4398 wb_items.clear();
4399 }
4400 *wb_sub = None;
4401}
4402
4403fn group_by_stream<Out, Key, F>(
4404 mut input: BoxStream<Out>,
4405 max_substreams: usize,
4406 allow_closed_substream_recreation: bool,
4407 key_fn: Arc<F>,
4408 batch_mode: GroupByBatchMode,
4409 materializer: &Materializer,
4410) -> BoxStream<Source<Out>>
4411where
4412 Out: Clone + Send + 'static,
4413 Key: Clone + Eq + Hash + Send + 'static,
4414 F: Fn(&Out) -> Key + Send + Sync + 'static,
4415{
4416 let outer = LiveSubstreamShared::new();
4417 let worker_outer = Arc::clone(&outer);
4418 let batch_repeated_keys = batch_mode == GroupByBatchMode::FiniteEagerNoRecreate;
4419 let completion = materializer.spawn_stream(move |cancelled| {
4420 let mut guard = GroupByWorkerGuard::new(worker_outer);
4421
4422 let mut wb_key: Option<Key> = None;
4427 let mut wb_sub: Option<Arc<LiveSubstreamShared<Out>>> = None;
4428 let mut wb_items: VecDeque<Out> = VecDeque::with_capacity(LIVE_SUBSTREAM_BATCH);
4429
4430 while !cancelled.load(Ordering::SeqCst) {
4431 if guard.outer.cancelled.load(Ordering::SeqCst) {
4432 guard.disarm();
4433 return Ok(NotUsed);
4434 }
4435
4436 match input.next() {
4437 Some(Ok(item)) => {
4438 let key = match catch_unwind(AssertUnwindSafe(|| key_fn(&item))) {
4439 Ok(key) => key,
4440 Err(_panic) => {
4441 wb_items.clear();
4442 guard.fail_all(StreamError::AbruptTermination);
4443 guard.disarm();
4444 return Ok(NotUsed);
4445 }
4446 };
4447
4448 if let Some(current) = guard.active.get(&key)
4452 && current.cancelled.load(Ordering::SeqCst)
4453 {
4454 if wb_key.as_ref() == Some(&key) {
4457 wb_items.clear();
4458 wb_key = None;
4459 wb_sub = None;
4460 }
4461 guard.active.remove(&key);
4462 if !allow_closed_substream_recreation {
4463 guard.closed.insert(key.clone());
4464 }
4465 }
4466
4467 let mut item = item;
4468 if let Some(current) = guard.active.get(&key).cloned() {
4469 if !batch_repeated_keys {
4470 item = match push_live_substream(¤t, item) {
4471 Ok(()) => {
4472 continue;
4473 }
4474 Err(item) => item,
4475 };
4476 guard.active.remove(&key);
4477 if !allow_closed_substream_recreation {
4478 guard.closed.insert(key.clone());
4479 continue;
4480 }
4481 } else {
4482 if wb_key.as_ref() != Some(&key) {
4486 group_by_flush_write_batch(
4487 &mut guard,
4488 &mut wb_key,
4489 &mut wb_sub,
4490 &mut wb_items,
4491 allow_closed_substream_recreation,
4492 );
4493 wb_key = Some(key.clone());
4494 wb_sub = Some(current);
4495 }
4496 wb_items.push_back(item);
4497 if wb_items.len() >= LIVE_SUBSTREAM_BATCH {
4498 group_by_flush_write_batch(
4499 &mut guard,
4500 &mut wb_key,
4501 &mut wb_sub,
4502 &mut wb_items,
4503 allow_closed_substream_recreation,
4504 );
4505 }
4506 continue;
4507 }
4508 }
4509
4510 if !wb_items.is_empty() {
4513 group_by_flush_write_batch(
4514 &mut guard,
4515 &mut wb_key,
4516 &mut wb_sub,
4517 &mut wb_items,
4518 allow_closed_substream_recreation,
4519 );
4520 }
4521
4522 if guard.closed.contains(&key) {
4523 continue;
4524 }
4525
4526 if guard.active.len() + guard.closed.len() == max_substreams {
4527 let error = StreamError::Failed(format!(
4528 "group_by reached max_substreams ({max_substreams})"
4529 ));
4530 guard.fail_all(error.clone());
4531 guard.disarm();
4532 return Err(error);
4533 }
4534
4535 let substream = LiveSubstreamShared::with_capacity(LIVE_SUBSTREAM_CAPACITY);
4536 push_live_substream(&substream, item)
4537 .unwrap_or_else(|_| unreachable!("fresh group_by substream"));
4538 guard.active.insert(key.clone(), Arc::clone(&substream));
4539 if push_live_substream(
4540 &guard.outer,
4541 source_from_live_substream(Arc::clone(&substream)),
4542 )
4543 .is_err()
4544 {
4545 guard.cancel_all();
4546 cancel_live_substream(&substream);
4547 guard.disarm();
4548 return Ok(NotUsed);
4549 }
4550 }
4551 Some(Err(error)) => {
4552 wb_items.clear();
4553 guard.fail_all(error.clone());
4554 guard.disarm();
4555 return Err(error);
4556 }
4557 None => {
4558 group_by_flush_write_batch(
4559 &mut guard,
4560 &mut wb_key,
4561 &mut wb_sub,
4562 &mut wb_items,
4563 allow_closed_substream_recreation,
4564 );
4565 guard.complete_all();
4566 guard.disarm();
4567 return Ok(NotUsed);
4568 }
4569 }
4570 }
4571
4572 group_by_flush_write_batch(
4573 &mut guard,
4574 &mut wb_key,
4575 &mut wb_sub,
4576 &mut wb_items,
4577 allow_closed_substream_recreation,
4578 );
4579 guard.complete_all();
4580 guard.disarm();
4581 Ok(NotUsed)
4582 });
4583
4584 Box::new(LiveSubstreamStream {
4585 shared: outer,
4586 completion: Some(completion),
4587 local_batch: VecDeque::new(),
4588 })
4589}
4590
/// Where a split boundary falls relative to the element that matched the predicate.
#[derive(Clone, Copy, Debug)]
enum SplitMode {
    /// The matching element closes the open segment and starts the next one.
    When,
    /// The matching element is the last element of the current segment.
    After,
}
4596
/// Worker state for the test-only legacy split implementation.
///
/// While `armed`, dropping the guard fails the open segment and the outer stream with
/// `AbruptTermination`.
#[cfg(test)]
struct SplitWorkerGuard<Out> {
    /// Queue of segment `Source`s handed to the outer stream's consumer.
    outer: Arc<LiveSubstreamShared<Source<Out>>>,
    /// Segment currently being filled, if any.
    current: Option<Arc<LiveSubstreamShared<Out>>>,
    /// Whether `Drop` should report an abrupt termination.
    armed: bool,
    /// Elements buffered for `current`, flushed every `LIVE_SUBSTREAM_BATCH` elements
    /// and when the segment closes.
    pending: VecDeque<Out>,
}
4607
#[cfg(test)]
impl<Out> SplitWorkerGuard<Out> {
    /// Creates an armed guard with no open segment.
    fn new(outer: Arc<LiveSubstreamShared<Source<Out>>>) -> Self {
        let pending = VecDeque::with_capacity(LIVE_SUBSTREAM_BATCH);
        Self {
            outer,
            current: None,
            armed: true,
            pending,
        }
    }

    /// Marks a deliberate exit so `Drop` does not report an abrupt termination.
    fn disarm(&mut self) {
        self.armed = false;
    }

    /// Starts a new segment and publishes its source on the outer stream.
    ///
    /// The new substream becomes `current` even if publishing fails. `Err(())` means the
    /// outer queue rejected it (cancelled or already terminated).
    fn open_segment(&mut self) -> Result<(), ()>
    where
        Out: Send + 'static,
    {
        let substream = LiveSubstreamShared::new();
        let source = source_from_live_substream(Arc::clone(&substream));
        self.current = Some(substream);
        push_live_substream(&self.outer, source).map_err(|_rejected| ())
    }

    /// Delivers the buffered elements to the open segment. With no open segment the
    /// buffer is discarded.
    fn flush_pending(&mut self) -> Result<(), ()>
    where
        Out: Send + 'static,
    {
        if self.pending.is_empty() {
            return Ok(());
        }
        let Some(segment) = self.current.as_ref() else {
            self.pending.clear();
            return Ok(());
        };
        push_live_substream_batch(segment, &mut self.pending)
    }

    /// Buffers `item` for the open segment, flushing once a full batch has accumulated.
    /// Without an open segment the item is dropped.
    fn push_item(&mut self, item: Out) -> Result<(), ()>
    where
        Out: Send + 'static,
    {
        if self.current.is_some() {
            self.pending.push_back(item);
            if self.pending.len() >= LIVE_SUBSTREAM_BATCH {
                return self.flush_pending();
            }
        }
        Ok(())
    }

    /// Flushes buffered elements (best effort) and completes the open segment.
    fn close_segment(&mut self)
    where
        Out: Send + 'static,
    {
        let _ = self.flush_pending();
        if let Some(segment) = self.current.take() {
            complete_live_substream(&segment);
        }
    }

    /// Discards buffered elements and fails the open segment with `error`.
    fn fail_current(&mut self, error: StreamError)
    where
        Out: Send + 'static,
    {
        self.pending.clear();
        if let Some(segment) = self.current.take() {
            fail_live_substream(&segment, error);
        }
    }

    /// Fails the open segment and then the outer stream with `error`.
    fn fail_all(&mut self, error: StreamError)
    where
        Out: Send + 'static,
    {
        self.fail_current(error.clone());
        fail_live_substream(&self.outer, error);
    }

    /// Closes the open segment and then completes the outer stream.
    fn complete_all(&mut self)
    where
        Out: Send + 'static,
    {
        self.close_segment();
        complete_live_substream(&self.outer);
    }
}
4709
#[cfg(test)]
impl<Out> Drop for SplitWorkerGuard<Out> {
    /// On an armed drop, discards buffered elements and fails both the open segment
    /// and the outer stream with `AbruptTermination`.
    fn drop(&mut self) {
        if !self.armed {
            return;
        }
        self.pending.clear();
        if let Some(segment) = self.current.take() {
            fail_live_substream(&segment, StreamError::AbruptTermination);
        }
        fail_live_substream(&self.outer, StreamError::AbruptTermination);
    }
}
4722
4723fn split_streams<Out, F>(
4724 input: BoxStream<Out>,
4725 mode: SplitMode,
4726 predicate: Arc<F>,
4727 materializer: &Materializer,
4728) -> BoxStream<Source<Out>>
4729where
4730 Out: Clone + Send + 'static,
4731 F: Fn(&Out) -> bool + Send + Sync + 'static,
4732{
4733 #[cfg(test)]
4734 if current_substream_mode() == SubstreamExecutorMode::LegacyOnly {
4735 return split_streams_legacy(input, mode, predicate, materializer);
4736 }
4737 let parent_cancelled = Arc::new(AtomicBool::new(false));
4738 split_streams_fast(input, mode, predicate, parent_cancelled, materializer)
4739}
4740
/// Test-only reference implementation of `split_streams`. It runs the split on a worker
/// started with `materializer.spawn_stream` and hands segments out through live
/// substream queues.
///
/// In `SplitMode::When`, a matching element closes the open segment and becomes the
/// first element of a new one. In `SplitMode::After`, it is appended to the open segment,
/// which is then closed. A predicate panic fails everything with `AbruptTermination`. An
/// upstream error fails everything with that error. Completion closes the last segment
/// and completes the outer stream.
///
/// NOTE(review): the outer-cancelled, failed-open and failed-push exits only disarm the
/// guard. They appear to leave the open segment (and, on a failed push, the outer
/// stream) without a terminal, so a consumer still reading it could block. Confirm
/// whether this is acceptable for a test-only path.
#[cfg(test)]
fn split_streams_legacy<Out, F>(
    mut input: BoxStream<Out>,
    mode: SplitMode,
    predicate: Arc<F>,
    materializer: &Materializer,
) -> BoxStream<Source<Out>>
where
    Out: Clone + Send + 'static,
    F: Fn(&Out) -> bool + Send + Sync + 'static,
{
    let outer = LiveSubstreamShared::new();
    let worker_outer = Arc::clone(&outer);
    let completion = materializer.spawn_stream(move |cancelled| {
        let mut guard = SplitWorkerGuard::new(Arc::clone(&worker_outer));

        while !cancelled.load(Ordering::SeqCst) {
            // The outer consumer is gone: stop routing.
            if worker_outer.cancelled.load(Ordering::SeqCst) {
                guard.disarm();
                return Ok(NotUsed);
            }

            match input.next() {
                Some(Ok(item)) => {
                    let split = match catch_unwind(AssertUnwindSafe(|| predicate(&item))) {
                        Ok(split) => split,
                        Err(_panic) => {
                            guard.fail_all(StreamError::AbruptTermination);
                            guard.disarm();
                            return Ok(NotUsed);
                        }
                    };

                    match mode {
                        SplitMode::When => {
                            // The matching element starts a new segment.
                            if split && guard.current.is_some() {
                                guard.close_segment();
                            }
                            if guard.current.is_none() && guard.open_segment().is_err() {
                                guard.disarm();
                                return Ok(NotUsed);
                            }
                            if guard.push_item(item).is_err() {
                                guard.disarm();
                                return Ok(NotUsed);
                            }
                        }
                        SplitMode::After => {
                            // The matching element ends the current segment.
                            if guard.current.is_none() && guard.open_segment().is_err() {
                                guard.disarm();
                                return Ok(NotUsed);
                            }
                            if guard.push_item(item).is_err() {
                                guard.disarm();
                                return Ok(NotUsed);
                            }
                            if split {
                                guard.close_segment();
                            }
                        }
                    }
                }
                Some(Err(error)) => {
                    guard.fail_all(error.clone());
                    guard.disarm();
                    return Err(error);
                }
                None => {
                    guard.complete_all();
                    guard.disarm();
                    return Ok(NotUsed);
                }
            }
        }

        // Worker cancelled by the materializer: close everything normally.
        guard.complete_all();
        guard.disarm();
        Ok(NotUsed)
    });

    Box::new(LiveSubstreamStream {
        shared: outer,
        completion: Some(completion),
        local_batch: VecDeque::new(),
    })
}
4829
4830trait SplitConsumer<T: Send + 'static>: Send + 'static {
4835 fn push_item(&mut self, item: T) -> StreamResult<()>;
4836 fn complete(self: Box<Self>);
4837 fn fail(self: Box<Self>, error: StreamError);
4838}
4839
4840struct FoldConsumer<T, Acc> {
4841 acc: Option<Acc>,
4842 f: Arc<dyn Fn(Acc, T) -> Acc + Send + Sync + 'static>,
4843 tx: futures::channel::oneshot::Sender<StreamResult<Acc>>,
4844}
4845
4846impl<T: Send + 'static, Acc: Send + 'static> SplitConsumer<T> for FoldConsumer<T, Acc> {
4847 fn push_item(&mut self, item: T) -> StreamResult<()> {
4848 let acc = self.acc.take().expect("FoldConsumer: push after done");
4849 self.acc = Some((self.f)(acc, item));
4850 Ok(())
4851 }
4852 fn complete(mut self: Box<Self>) {
4853 let acc = self
4854 .acc
4855 .take()
4856 .expect("FoldConsumer: complete called twice");
4857 let _ = self.tx.send(Ok(acc));
4858 }
4859 fn fail(mut self: Box<Self>, error: StreamError) {
4860 self.acc = None;
4861 let _ = self.tx.send(Err(error));
4862 }
4863}
4864
4865struct FoldResultConsumer<T, Acc> {
4866 acc: Option<Acc>,
4867 f: Arc<dyn Fn(Acc, T) -> StreamResult<Acc> + Send + Sync + 'static>,
4868 tx: futures::channel::oneshot::Sender<StreamResult<Acc>>,
4869}
4870
4871impl<T: Send + 'static, Acc: Send + 'static> SplitConsumer<T> for FoldResultConsumer<T, Acc> {
4872 fn push_item(&mut self, item: T) -> StreamResult<()> {
4873 let acc = self
4874 .acc
4875 .take()
4876 .expect("FoldResultConsumer: push after done");
4877 match (self.f)(acc, item) {
4878 Ok(new_acc) => {
4879 self.acc = Some(new_acc);
4880 Ok(())
4881 }
4882 Err(e) => Err(e),
4883 }
4884 }
4885 fn complete(mut self: Box<Self>) {
4886 let acc = self
4887 .acc
4888 .take()
4889 .expect("FoldResultConsumer: complete called twice");
4890 let _ = self.tx.send(Ok(acc));
4891 }
4892 fn fail(mut self: Box<Self>, error: StreamError) {
4893 self.acc = None;
4894 let _ = self.tx.send(Err(error));
4895 }
4896}
4897
4898struct CollectConsumer<T> {
4899 items: Vec<T>,
4900 tx: futures::channel::oneshot::Sender<StreamResult<Vec<T>>>,
4901}
4902
4903impl<T: Send + 'static> SplitConsumer<T> for CollectConsumer<T> {
4904 fn push_item(&mut self, item: T) -> StreamResult<()> {
4905 self.items.push(item);
4906 Ok(())
4907 }
4908 fn complete(self: Box<Self>) {
4909 let _ = self.tx.send(Ok(self.items));
4910 }
4911 fn fail(self: Box<Self>, error: StreamError) {
4912 let _ = self.tx.send(Err(error));
4913 }
4914}
4915
4916struct IgnoreConsumer<T> {
4917 tx: futures::channel::oneshot::Sender<StreamResult<NotUsed>>,
4918 _phantom: std::marker::PhantomData<fn(T)>,
4919}
4920
4921impl<T: Send + 'static> SplitConsumer<T> for IgnoreConsumer<T> {
4922 fn push_item(&mut self, _item: T) -> StreamResult<()> {
4923 Ok(())
4924 }
4925 fn complete(self: Box<Self>) {
4926 let _ = self.tx.send(Ok(NotUsed));
4927 }
4928 fn fail(self: Box<Self>, error: StreamError) {
4929 let _ = self.tx.send(Err(error));
4930 }
4931}
4932
4933pub(super) struct FoldDescriptor<T, Acc> {
4936 pub(super) zero: Acc,
4937 pub(super) f: Arc<dyn Fn(Acc, T) -> Acc + Send + Sync + 'static>,
4938}
4939
4940impl<T: Send + 'static, Acc: Clone + Send + Sync + 'static> FoldFastPathDyn<T>
4941 for FoldDescriptor<T, Acc>
4942{
4943 fn try_register(
4944 &self,
4945 hook: Arc<dyn SplitSegmentHookDyn>,
4946 ) -> Option<StreamResult<Box<dyn Any + Send>>> {
4947 let hook_any = hook.as_any_arc();
4948 let slot = hook_any.downcast::<SegmentConsumerSlot<T>>().ok()?;
4949 if slot.claimed.swap(true, Ordering::SeqCst) {
4950 return Some(Err(StreamError::Failed(
4951 "substream source cannot be materialized more than once".into(),
4952 )));
4953 }
4954 let (tx, rx) = futures::channel::oneshot::channel::<StreamResult<Acc>>();
4955 {
4956 let mut state = slot.state.lock().unwrap_or_else(|p| p.into_inner());
4957
4958 if let Some(terminal) = state.terminal.take() {
4960 let buffer = std::mem::take(&mut state.buffer);
4961 state.consumer = SegmentConsumer::DirectTaken;
4962 drop(state);
4963 let mut acc = self.zero.clone();
4965 for item in buffer {
4966 acc = (self.f)(acc, item);
4967 }
4968 let result = match terminal {
4969 LiveSubstreamTerminal::Complete => Ok(acc),
4970 LiveSubstreamTerminal::Error(e) => Err(e),
4971 };
4972 let _ = tx.send(result);
4973 let completion = StreamCompletion::from_receiver(rx, None);
4974 return Some(Ok(Box::new(completion)));
4975 }
4976
4977 let consumer: Box<dyn SplitConsumer<T>> = Box::new(FoldConsumer {
4979 acc: Some(self.zero.clone()),
4980 f: Arc::clone(&self.f),
4981 tx,
4982 });
4983 state.consumer = SegmentConsumer::Direct(consumer);
4984 }
4985 slot.available.notify_all();
4986 let completion = StreamCompletion::from_receiver(rx, None);
4987 Some(Ok(Box::new(completion)))
4988 }
4989}
4990
4991pub(super) struct FoldResultDescriptor<T, Acc> {
4992 pub(super) zero: Acc,
4993 pub(super) f: Arc<dyn Fn(Acc, T) -> StreamResult<Acc> + Send + Sync + 'static>,
4994}
4995
4996impl<T: Send + 'static, Acc: Clone + Send + Sync + 'static> FoldFastPathDyn<T>
4997 for FoldResultDescriptor<T, Acc>
4998{
4999 fn try_register(
5000 &self,
5001 hook: Arc<dyn SplitSegmentHookDyn>,
5002 ) -> Option<StreamResult<Box<dyn Any + Send>>> {
5003 let hook_any = hook.as_any_arc();
5004 let slot = hook_any.downcast::<SegmentConsumerSlot<T>>().ok()?;
5005 if slot.claimed.swap(true, Ordering::SeqCst) {
5006 return Some(Err(StreamError::Failed(
5007 "substream source cannot be materialized more than once".into(),
5008 )));
5009 }
5010 let (tx, rx) = futures::channel::oneshot::channel::<StreamResult<Acc>>();
5011 {
5012 let mut state = slot.state.lock().unwrap_or_else(|p| p.into_inner());
5013
5014 if let Some(terminal) = state.terminal.take() {
5016 let buffer = std::mem::take(&mut state.buffer);
5017 state.consumer = SegmentConsumer::DirectTaken;
5018 drop(state);
5019 let acc = self.zero.clone();
5020 let result = match terminal {
5021 LiveSubstreamTerminal::Complete => buffer
5022 .into_iter()
5023 .try_fold(acc, |a, item| (self.f)(a, item)),
5024 LiveSubstreamTerminal::Error(e) => Err(e),
5025 };
5026 let _ = tx.send(result);
5027 let completion = StreamCompletion::from_receiver(rx, None);
5028 return Some(Ok(Box::new(completion)));
5029 }
5030
5031 let consumer: Box<dyn SplitConsumer<T>> = Box::new(FoldResultConsumer {
5033 acc: Some(self.zero.clone()),
5034 f: Arc::clone(&self.f),
5035 tx,
5036 });
5037 state.consumer = SegmentConsumer::Direct(consumer);
5038 }
5039 slot.available.notify_all();
5040 let completion = StreamCompletion::from_receiver(rx, None);
5041 Some(Ok(Box::new(completion)))
5042 }
5043}
5044
5045pub(super) struct CollectDescriptor<T> {
5046 pub(super) _phantom: std::marker::PhantomData<fn(T)>,
5047}
5048
5049impl<T: Send + 'static> FoldFastPathDyn<T> for CollectDescriptor<T> {
5050 fn try_register(
5051 &self,
5052 hook: Arc<dyn SplitSegmentHookDyn>,
5053 ) -> Option<StreamResult<Box<dyn Any + Send>>> {
5054 let hook_any = hook.as_any_arc();
5055 let slot = hook_any.downcast::<SegmentConsumerSlot<T>>().ok()?;
5056 if slot.claimed.swap(true, Ordering::SeqCst) {
5057 return Some(Err(StreamError::Failed(
5058 "substream source cannot be materialized more than once".into(),
5059 )));
5060 }
5061 let (tx, rx) = futures::channel::oneshot::channel::<StreamResult<Vec<T>>>();
5062 {
5063 let mut state = slot.state.lock().unwrap_or_else(|p| p.into_inner());
5064
5065 if let Some(terminal) = state.terminal.take() {
5067 let buffer = std::mem::take(&mut state.buffer);
5068 state.consumer = SegmentConsumer::DirectTaken;
5069 drop(state);
5070 let items: Vec<T> = buffer.into_iter().collect();
5071 let result = match terminal {
5072 LiveSubstreamTerminal::Complete => Ok(items),
5073 LiveSubstreamTerminal::Error(e) => Err(e),
5074 };
5075 let _ = tx.send(result);
5076 let completion = StreamCompletion::from_receiver(rx, None);
5077 return Some(Ok(Box::new(completion)));
5078 }
5079
5080 let consumer: Box<dyn SplitConsumer<T>> = Box::new(CollectConsumer {
5082 items: Vec::new(),
5083 tx,
5084 });
5085 state.consumer = SegmentConsumer::Direct(consumer);
5086 }
5087 slot.available.notify_all();
5088 let completion = StreamCompletion::from_receiver(rx, None);
5089 Some(Ok(Box::new(completion)))
5090 }
5091}
5092
5093pub(super) struct IgnoreDescriptor<T> {
5094 pub(super) _phantom: std::marker::PhantomData<fn(T)>,
5095}
5096
5097impl<T: Send + 'static> FoldFastPathDyn<T> for IgnoreDescriptor<T> {
5098 fn try_register(
5099 &self,
5100 hook: Arc<dyn SplitSegmentHookDyn>,
5101 ) -> Option<StreamResult<Box<dyn Any + Send>>> {
5102 let hook_any = hook.as_any_arc();
5103 let slot = hook_any.downcast::<SegmentConsumerSlot<T>>().ok()?;
5104 if slot.claimed.swap(true, Ordering::SeqCst) {
5105 return Some(Err(StreamError::Failed(
5106 "substream source cannot be materialized more than once".into(),
5107 )));
5108 }
5109 let (tx, rx) = futures::channel::oneshot::channel::<StreamResult<NotUsed>>();
5110 {
5111 let mut state = slot.state.lock().unwrap_or_else(|p| p.into_inner());
5112
5113 if let Some(terminal) = state.terminal.take() {
5115 state.consumer = SegmentConsumer::DirectTaken;
5116 drop(state);
5117 let result = match terminal {
5118 LiveSubstreamTerminal::Complete => Ok(NotUsed),
5119 LiveSubstreamTerminal::Error(e) => Err(e),
5120 };
5121 let _ = tx.send(result);
5122 let completion = StreamCompletion::from_receiver(rx, None);
5123 return Some(Ok(Box::new(completion)));
5124 }
5125
5126 let consumer: Box<dyn SplitConsumer<T>> = Box::new(IgnoreConsumer {
5128 tx,
5129 _phantom: std::marker::PhantomData,
5130 });
5131 state.consumer = SegmentConsumer::Direct(consumer);
5132 }
5133 slot.available.notify_all();
5134 let completion = StreamCompletion::from_receiver(rx, None);
5135 Some(Ok(Box::new(completion)))
5136 }
5137}
5138
5139enum SegmentConsumer<T: Send + 'static> {
5142 Pending,
5143 Direct(Box<dyn SplitConsumer<T>>),
5144 DirectTaken,
5145 Fallback,
5146}
5147
5148struct SegmentSlotState<T: Send + 'static> {
5149 buffer: VecDeque<T>,
5150 consumer: SegmentConsumer<T>,
5151 terminal: Option<LiveSubstreamTerminal>,
5152}
5153
5154pub(super) struct SegmentConsumerSlot<T: Send + 'static> {
5155 claimed: Arc<AtomicBool>,
5156 state: Mutex<SegmentSlotState<T>>,
5157 available: Condvar,
5158 parent_cancelled: Arc<AtomicBool>,
5159}
5160
5161impl<T: Clone + Send + 'static> SegmentConsumerSlot<T> {
5162 fn new(parent_cancelled: Arc<AtomicBool>) -> Arc<Self> {
5163 Arc::new(Self {
5164 claimed: Arc::new(AtomicBool::new(false)),
5165 state: Mutex::new(SegmentSlotState {
5166 buffer: VecDeque::new(),
5167 consumer: SegmentConsumer::Pending,
5168 terminal: None,
5169 }),
5170 available: Condvar::new(),
5171 parent_cancelled,
5172 })
5173 }
5174}
5175
5176impl<T: Clone + Send + 'static> SplitSegmentHookDyn for SegmentConsumerSlot<T> {
5177 fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
5178 self
5179 }
5180}
5181
5182impl<T: Clone + Send + 'static> SourceFactory<T, NotUsed> for SegmentConsumerSlot<T> {
5183 fn create(
5184 self: Arc<Self>,
5185 _materializer: &Materializer,
5186 ) -> StreamResult<(BoxStream<T>, NotUsed)> {
5187 if self.claimed.swap(true, Ordering::SeqCst) {
5188 return Err(StreamError::Failed(
5189 "substream source cannot be materialized more than once".into(),
5190 ));
5191 }
5192 {
5193 let mut state = self.state.lock().unwrap_or_else(|p| p.into_inner());
5194 state.consumer = SegmentConsumer::Fallback;
5195 }
5196 self.available.notify_all();
5197 let stream = FallbackSegmentStream {
5198 slot: Arc::clone(&self),
5199 };
5200 Ok((Box::new(stream), NotUsed))
5201 }
5202}
5203
5204struct FallbackSegmentStream<T: Send + 'static> {
5205 slot: Arc<SegmentConsumerSlot<T>>,
5206}
5207
5208impl<T: Clone + Send + 'static> Iterator for FallbackSegmentStream<T> {
5209 type Item = StreamResult<T>;
5210
5211 fn next(&mut self) -> Option<Self::Item> {
5212 let mut state = self.slot.state.lock().unwrap_or_else(|p| p.into_inner());
5213 loop {
5214 if let Some(item) = state.buffer.pop_front() {
5215 drop(state);
5216 self.slot.available.notify_all();
5217 return Some(Ok(item));
5218 }
5219 if let Some(terminal) = &state.terminal {
5220 return match terminal {
5221 LiveSubstreamTerminal::Complete => None,
5222 LiveSubstreamTerminal::Error(e) => Some(Err(e.clone())),
5223 };
5224 }
5225 if self.slot.parent_cancelled.load(Ordering::SeqCst) {
5226 return Some(Err(StreamError::AbruptTermination));
5227 }
5228 state = self
5229 .slot
5230 .available
5231 .wait(state)
5232 .unwrap_or_else(|p| p.into_inner());
5233 }
5234 }
5235}
5236
5237impl<T: Send + 'static> Drop for FallbackSegmentStream<T> {
5238 fn drop(&mut self) {
5239 let mut state = self.slot.state.lock().unwrap_or_else(|p| p.into_inner());
5240 if state.terminal.is_none() {
5241 state.terminal = Some(LiveSubstreamTerminal::Error(StreamError::Cancelled));
5242 }
5243 drop(state);
5244 self.slot.available.notify_all();
5245 }
5246}
5247
5248struct SplitFastWorkerGuard<Out: Send + 'static> {
5251 outer: Arc<LiveSubstreamShared<Source<Out>>>,
5252 current_slot: Option<Arc<SegmentConsumerSlot<Out>>>,
5253 current_consumer: Option<Box<dyn SplitConsumer<Out>>>,
5254 armed: bool,
5255 parent_cancelled: Arc<AtomicBool>,
5256 local_pending: VecDeque<Out>,
5257}
5258
5259impl<Out: Clone + Send + 'static> SplitFastWorkerGuard<Out> {
5260 fn new(
5261 outer: Arc<LiveSubstreamShared<Source<Out>>>,
5262 parent_cancelled: Arc<AtomicBool>,
5263 ) -> Self {
5264 Self {
5265 outer,
5266 current_slot: None,
5267 current_consumer: None,
5268 armed: true,
5269 parent_cancelled,
5270 local_pending: VecDeque::with_capacity(LIVE_SUBSTREAM_BATCH),
5271 }
5272 }
5273
5274 fn disarm(&mut self) {
5275 self.armed = false;
5276 }
5277
5278 fn open_segment(&mut self) -> Result<(), ()> {
5279 let slot = SegmentConsumerSlot::new(Arc::clone(&self.parent_cancelled));
5280 let factory: Arc<dyn SourceFactory<Out, NotUsed>> =
5281 Arc::clone(&slot) as Arc<dyn SourceFactory<Out, NotUsed>>;
5282 let hook: Arc<dyn SplitSegmentHookDyn> = Arc::clone(&slot) as Arc<dyn SplitSegmentHookDyn>;
5283 let source = Source {
5284 factory,
5285 hints: SourceHints::default(),
5286 attributes: Attributes::default(),
5287 split_hook: Some(hook),
5288 };
5289 self.current_slot = Some(slot);
5290 push_live_substream(&self.outer, source).map_err(|_| ())
5291 }
5292
5293 fn push_item(&mut self, item: Out) -> Result<(), ()> {
5294 if let Some(ref mut consumer) = self.current_consumer {
5296 if let Err(e) = consumer.push_item(item) {
5297 let c = self.current_consumer.take().unwrap();
5298 c.fail(e);
5299 return Err(());
5300 }
5301 return Ok(());
5302 }
5303
5304 self.local_pending.push_back(item);
5306 if self.local_pending.len() >= LIVE_SUBSTREAM_BATCH {
5307 self.flush_pending()
5308 } else {
5309 Ok(())
5310 }
5311 }
5312
5313 fn flush_pending(&mut self) -> Result<(), ()> {
5316 if self.local_pending.is_empty() {
5317 return Ok(());
5318 }
5319
5320 if let Some(ref mut consumer) = self.current_consumer {
5322 for item in self.local_pending.drain(..) {
5323 if let Err(e) = consumer.push_item(item) {
5324 let c = self.current_consumer.take().unwrap();
5325 c.fail(e);
5326 return Err(());
5327 }
5328 }
5329 return Ok(());
5330 }
5331
5332 let slot = match &self.current_slot {
5333 Some(s) => Arc::clone(s),
5334 None => {
5335 self.local_pending.clear();
5336 return Ok(());
5337 }
5338 };
5339
5340 loop {
5341 if self.parent_cancelled.load(Ordering::SeqCst) {
5342 self.local_pending.clear();
5343 return Err(());
5344 }
5345
5346 let mut state = slot.state.lock().unwrap_or_else(|p| p.into_inner());
5347
5348 if matches!(state.consumer, SegmentConsumer::Direct(_)) {
5349 let consumer =
5350 match std::mem::replace(&mut state.consumer, SegmentConsumer::DirectTaken) {
5351 SegmentConsumer::Direct(c) => c,
5352 _ => unreachable!(),
5353 };
5354 let mut drain_buf = std::mem::take(&mut state.buffer);
5355 drop(state);
5356 let mut consumer_box = consumer;
5357 for item in drain_buf.drain(..) {
5358 if let Err(e) = consumer_box.push_item(item) {
5359 consumer_box.fail(e);
5360 self.local_pending.clear();
5361 return Err(());
5362 }
5363 }
5364 for item in self.local_pending.drain(..) {
5365 if let Err(e) = consumer_box.push_item(item) {
5366 consumer_box.fail(e);
5367 return Err(());
5368 }
5369 }
5370 self.current_consumer = Some(consumer_box);
5371 return Ok(());
5372 }
5373
5374 if matches!(
5375 state.consumer,
5376 SegmentConsumer::Pending | SegmentConsumer::Fallback
5377 ) {
5378 let is_fallback = matches!(state.consumer, SegmentConsumer::Fallback);
5379 let cap = LIVE_SUBSTREAM_CAPACITY.saturating_sub(state.buffer.len());
5380 if cap > 0 {
5381 let to_flush = cap.min(self.local_pending.len());
5382 let items: Vec<Out> = self.local_pending.drain(..to_flush).collect();
5383 state.buffer.extend(items);
5384 let has_more = !self.local_pending.is_empty();
5385 drop(state);
5386 if is_fallback {
5387 slot.available.notify_all();
5388 }
5389 if !has_more {
5390 return Ok(());
5391 }
5392 continue;
5396 } else {
5397 state = slot
5399 .available
5400 .wait(state)
5401 .unwrap_or_else(|p| p.into_inner());
5402 continue;
5403 }
5404 }
5405
5406 return Ok(());
5408 }
5409 }
5410
5411 fn close_segment(&mut self) {
5412 let _ = self.flush_pending();
5414
5415 if let Some(consumer) = self.current_consumer.take() {
5417 consumer.complete();
5418 self.current_slot = None;
5419 return;
5420 }
5421
5422 let slot = match self.current_slot.take() {
5423 Some(s) => s,
5424 None => return,
5425 };
5426
5427 if self.parent_cancelled.load(Ordering::SeqCst) {
5428 let mut state = slot.state.lock().unwrap_or_else(|p| p.into_inner());
5429 if state.terminal.is_none() {
5430 state.terminal = Some(LiveSubstreamTerminal::Error(StreamError::AbruptTermination));
5431 }
5432 drop(state);
5433 slot.available.notify_all();
5434 return;
5435 }
5436
5437 let mut state = slot.state.lock().unwrap_or_else(|p| p.into_inner());
5438 match std::mem::replace(&mut state.consumer, SegmentConsumer::DirectTaken) {
5439 SegmentConsumer::Direct(mut consumer_box) => {
5440 let mut drain_buf = std::mem::take(&mut state.buffer);
5441 drop(state);
5442 for item in drain_buf.drain(..) {
5443 if let Err(e) = consumer_box.push_item(item) {
5444 consumer_box.fail(e);
5445 return;
5446 }
5447 }
5448 consumer_box.complete();
5449 }
5450 SegmentConsumer::DirectTaken => {
5451 }
5453 SegmentConsumer::Fallback => {
5454 state.consumer = SegmentConsumer::DirectTaken;
5455 if state.terminal.is_none() {
5456 state.terminal = Some(LiveSubstreamTerminal::Complete);
5457 }
5458 drop(state);
5459 slot.available.notify_all();
5460 }
5461 SegmentConsumer::Pending => {
5462 state.consumer = SegmentConsumer::Pending;
5464 if state.terminal.is_none() {
5465 state.terminal = Some(LiveSubstreamTerminal::Complete);
5466 }
5467 drop(state);
5468 slot.available.notify_all();
5470 }
5471 }
5472 }
5473
5474 fn fail_segment(&mut self, error: StreamError) {
5475 self.local_pending.clear();
5477
5478 if let Some(consumer) = self.current_consumer.take() {
5479 consumer.fail(error);
5480 self.current_slot = None;
5481 return;
5482 }
5483 let slot = match self.current_slot.take() {
5484 Some(s) => s,
5485 None => return,
5486 };
5487 let mut state = slot.state.lock().unwrap_or_else(|p| p.into_inner());
5488 match std::mem::replace(&mut state.consumer, SegmentConsumer::DirectTaken) {
5489 SegmentConsumer::Direct(c) => {
5490 drop(state);
5491 c.fail(error);
5492 }
5493 _ => {
5494 if state.terminal.is_none() {
5495 state.terminal = Some(LiveSubstreamTerminal::Error(error));
5496 }
5497 drop(state);
5498 slot.available.notify_all();
5499 }
5500 }
5501 }
5502
5503 fn fail_all(&mut self, error: StreamError) {
5504 self.fail_segment(error.clone());
5505 fail_live_substream(&self.outer, error);
5506 }
5507
5508 fn complete_all(&mut self) {
5509 self.close_segment();
5510 complete_live_substream(&self.outer);
5511 }
5512}
5513
5514impl<Out: Send + 'static> Drop for SplitFastWorkerGuard<Out> {
5515 fn drop(&mut self) {
5516 if self.armed {
5517 self.local_pending.clear(); if let Some(consumer) = self.current_consumer.take() {
5519 consumer.fail(StreamError::AbruptTermination);
5520 } else if let Some(slot) = self.current_slot.take() {
5521 let mut state = slot.state.lock().unwrap_or_else(|p| p.into_inner());
5522 match std::mem::replace(&mut state.consumer, SegmentConsumer::DirectTaken) {
5523 SegmentConsumer::Direct(c) => {
5524 drop(state);
5525 c.fail(StreamError::AbruptTermination);
5526 }
5527 _ => {
5528 if state.terminal.is_none() {
5529 state.terminal =
5530 Some(LiveSubstreamTerminal::Error(StreamError::AbruptTermination));
5531 }
5532 drop(state);
5533 slot.available.notify_all();
5534 }
5535 }
5536 }
5537 fail_live_substream(&self.outer, StreamError::AbruptTermination);
5538 }
5539 }
5540}
5541
5542fn split_streams_fast<Out, F>(
5543 mut input: BoxStream<Out>,
5544 mode: SplitMode,
5545 predicate: Arc<F>,
5546 parent_cancelled: Arc<AtomicBool>,
5547 materializer: &Materializer,
5548) -> BoxStream<Source<Out>>
5549where
5550 Out: Clone + Send + 'static,
5551 F: Fn(&Out) -> bool + Send + Sync + 'static,
5552{
5553 let outer = LiveSubstreamShared::new();
5554 let worker_outer = Arc::clone(&outer);
5555 let completion = materializer.spawn_stream(move |cancelled| {
5556 let mut guard =
5557 SplitFastWorkerGuard::new(Arc::clone(&worker_outer), Arc::clone(&parent_cancelled));
5558
5559 while !cancelled.load(Ordering::SeqCst) {
5560 if worker_outer.cancelled.load(Ordering::SeqCst) {
5561 guard.disarm();
5562 return Ok(NotUsed);
5563 }
5564
5565 match input.next() {
5566 Some(Ok(item)) => {
5567 let split = match catch_unwind(AssertUnwindSafe(|| predicate(&item))) {
5568 Ok(s) => s,
5569 Err(_) => {
5570 guard.fail_all(StreamError::AbruptTermination);
5571 guard.disarm();
5572 return Ok(NotUsed);
5573 }
5574 };
5575
5576 match mode {
5577 SplitMode::When => {
5578 if split
5579 && (guard.current_slot.is_some()
5580 || guard.current_consumer.is_some())
5581 {
5582 guard.close_segment();
5583 }
5584 if guard.current_slot.is_none()
5585 && guard.current_consumer.is_none()
5586 && guard.open_segment().is_err()
5587 {
5588 guard.disarm();
5589 return Ok(NotUsed);
5590 }
5591 if guard.push_item(item).is_err() {
5592 guard.disarm();
5593 return Ok(NotUsed);
5594 }
5595 }
5596 SplitMode::After => {
5597 if guard.current_slot.is_none()
5598 && guard.current_consumer.is_none()
5599 && guard.open_segment().is_err()
5600 {
5601 guard.disarm();
5602 return Ok(NotUsed);
5603 }
5604 if guard.push_item(item).is_err() {
5605 guard.disarm();
5606 return Ok(NotUsed);
5607 }
5608 if split {
5609 guard.close_segment();
5610 }
5611 }
5612 }
5613 }
5614 Some(Err(error)) => {
5615 guard.fail_all(error.clone());
5616 guard.disarm();
5617 return Err(error);
5618 }
5619 None => {
5620 guard.complete_all();
5621 guard.disarm();
5622 return Ok(NotUsed);
5623 }
5624 }
5625 }
5626 guard.complete_all();
5627 guard.disarm();
5628 Ok(NotUsed)
5629 });
5630
5631 Box::new(LiveSubstreamStream {
5632 shared: outer,
5633 completion: Some(completion),
5634 local_batch: VecDeque::new(),
5635 })
5636}
5637
#[cfg(test)]
/// State shared between the legacy flat-map-merge coordinator, its per-
/// substream workers and the merged output stream.
struct FlatMapMergeShared<T> {
    state: Mutex<FlatMapMergeState<T>>,
    /// Notified whenever `state` changes in a way a waiter may care about.
    available: Condvar,
    cancelled: Arc<AtomicBool>,
}

#[cfg(test)]
struct FlatMapMergeState<T> {
    /// Merged items waiting to be read, each tagged with its substream id.
    queued: VecDeque<(usize, T)>,
    /// Per-substream count of items that are queued but not yet read.
    window: HashMap<usize, usize>,
    /// Registered substreams that have not finished yet.
    active_streams: usize,
    input_done: bool,
    /// First failure observed. It ends the merged stream.
    terminal: Option<StreamError>,
}

#[cfg(test)]
impl<T> FlatMapMergeShared<T> {
    fn new() -> Arc<Self> {
        let state = FlatMapMergeState {
            queued: VecDeque::new(),
            window: HashMap::new(),
            active_streams: 0,
            input_done: false,
            terminal: None,
        };
        Arc::new(Self {
            state: Mutex::new(state),
            available: Condvar::new(),
            cancelled: Arc::new(AtomicBool::new(false)),
        })
    }
}
5671
#[cfg(test)]
/// Output side of the legacy flat-map-merge: yields merged items in queue order.
struct FlatMapMergeStream<T> {
    shared: Arc<FlatMapMergeShared<T>>,
    completion: Option<StreamCompletion<NotUsed>>,
}

#[cfg(test)]
impl<T> Iterator for FlatMapMergeStream<T> {
    type Item = StreamResult<T>;

    /// Pops the next queued item and returns one unit of its substream's
    /// window. Otherwise it reports the terminal error, or `None` once input
    /// and all substreams are done. If none of these applies, it waits.
    fn next(&mut self) -> Option<Self::Item> {
        let shared = &self.shared;
        let mut state = shared
            .state
            .lock()
            .unwrap_or_else(|poison| poison.into_inner());
        loop {
            if let Some((stream_id, item)) = state.queued.pop_front() {
                if let std::collections::hash_map::Entry::Occupied(mut slot) =
                    state.window.entry(stream_id)
                {
                    *slot.get_mut() -= 1;
                    if *slot.get() == 0 {
                        slot.remove();
                    }
                }
                drop(state);
                shared.available.notify_all();
                return Some(Ok(item));
            }
            if let Some(error) = &state.terminal {
                return Some(Err(error.clone()));
            }
            if state.input_done && state.active_streams == 0 {
                return None;
            }
            state = shared
                .available
                .wait(state)
                .unwrap_or_else(|poison| poison.into_inner());
        }
    }
}

#[cfg(test)]
impl<T> Drop for FlatMapMergeStream<T> {
    /// Signals cancellation to the coordinator and workers, wakes them, then
    /// releases the completion handle.
    fn drop(&mut self) {
        let shared = &self.shared;
        shared.cancelled.store(true, Ordering::SeqCst);
        shared.available.notify_all();
        drop(self.completion.take());
    }
}
5725
#[cfg(test)]
/// Counts one more active substream and wakes waiters.
fn flat_map_merge_register_stream<T>(shared: &Arc<FlatMapMergeShared<T>>) {
    // The temporary guard is released at the end of this statement, before
    // the notification.
    shared
        .state
        .lock()
        .unwrap_or_else(|poison| poison.into_inner())
        .active_streams += 1;
    shared.available.notify_all();
}
5736
5737#[cfg(test)]
5738fn flat_map_merge_finish_stream<T>(
5739 shared: &Arc<FlatMapMergeShared<T>>,
5740 stream_id: usize,
5741 terminal: Result<(), StreamError>,
5742) {
5743 let mut state = shared
5744 .state
5745 .lock()
5746 .unwrap_or_else(|poison| poison.into_inner());
5747 state.active_streams = state.active_streams.saturating_sub(1);
5748 if let Err(error) = terminal
5749 && state.terminal.is_none()
5750 {
5751 state.terminal = Some(error);
5752 }
5753 state.window.entry(stream_id).or_default();
5755 drop(state);
5756 shared.available.notify_all();
5757}
5758
#[cfg(test)]
/// Enqueues `item` from substream `stream_id` on the merged output.
///
/// Applies back-pressure: it blocks while that substream already has
/// `FLAT_MAP_MERGE_SUBSTREAM_WINDOW` undelivered items. The item is handed
/// back as `Err(item)` if the merge is cancelled or has terminated.
fn flat_map_merge_push<T>(
    shared: &Arc<FlatMapMergeShared<T>>,
    stream_id: usize,
    item: T,
) -> Result<(), T> {
    let mut state = shared
        .state
        .lock()
        .unwrap_or_else(|poison| poison.into_inner());
    loop {
        let in_flight = state.window.get(&stream_id).copied().unwrap_or(0);
        if in_flight < FLAT_MAP_MERGE_SUBSTREAM_WINDOW || state.terminal.is_some() {
            break;
        }
        if shared.cancelled.load(Ordering::SeqCst) {
            return Err(item);
        }
        state = shared
            .available
            .wait(state)
            .unwrap_or_else(|poison| poison.into_inner());
    }
    if shared.cancelled.load(Ordering::SeqCst) || state.terminal.is_some() {
        return Err(item);
    }
    *state.window.entry(stream_id).or_insert(0) += 1;
    let queue_was_empty = state.queued.is_empty();
    state.queued.push_back((stream_id, item));
    drop(state);
    // The merged-stream reader only waits while the queue is empty.
    if queue_was_empty {
        shared.available.notify_all();
    }
    Ok(())
}
5794
#[cfg(test)]
/// Guard owned by the legacy flat-map-merge coordinator thread.
struct FlatMapMergeCoordinatorGuard<T> {
    shared: Arc<FlatMapMergeShared<T>>,
    armed: bool,
}

#[cfg(test)]
impl<T> FlatMapMergeCoordinatorGuard<T> {
    fn new(shared: Arc<FlatMapMergeShared<T>>) -> Self {
        Self {
            armed: true,
            shared,
        }
    }

    /// Orderly end of the input: marks the input as done, wakes waiters and
    /// disarms the guard.
    fn finish(&mut self) {
        self.shared
            .state
            .lock()
            .unwrap_or_else(|poison| poison.into_inner())
            .input_done = true;
        self.shared.available.notify_all();
        self.armed = false;
    }
}

#[cfg(test)]
impl<T> Drop for FlatMapMergeCoordinatorGuard<T> {
    /// Abnormal exit: records `AbruptTermination` (unless an error is already
    /// stored), marks the input done, cancels, and wakes everyone.
    fn drop(&mut self) {
        if !self.armed {
            return;
        }
        {
            let mut state = self
                .shared
                .state
                .lock()
                .unwrap_or_else(|poison| poison.into_inner());
            state
                .terminal
                .get_or_insert_with(|| StreamError::AbruptTermination);
            state.input_done = true;
        }
        self.shared.cancelled.store(true, Ordering::SeqCst);
        self.shared.available.notify_all();
    }
}
5842
#[cfg(test)]
/// Guard owned by one legacy flat-map-merge substream worker.
struct FlatMapMergeWorkerGuard<T> {
    shared: Arc<FlatMapMergeShared<T>>,
    stream_id: usize,
    armed: bool,
}

#[cfg(test)]
impl<T> FlatMapMergeWorkerGuard<T> {
    fn new(shared: Arc<FlatMapMergeShared<T>>, stream_id: usize) -> Self {
        Self {
            armed: true,
            stream_id,
            shared,
        }
    }

    /// Reports this substream's outcome and disarms the guard.
    fn finish(&mut self, terminal: Result<(), StreamError>) {
        flat_map_merge_finish_stream(&self.shared, self.stream_id, terminal);
        self.armed = false;
    }
}

#[cfg(test)]
impl<T> Drop for FlatMapMergeWorkerGuard<T> {
    /// Abnormal exit: finishes the substream with `AbruptTermination` and
    /// cancels the whole merge.
    fn drop(&mut self) {
        if !self.armed {
            return;
        }
        let failure = Err(StreamError::AbruptTermination);
        flat_map_merge_finish_stream(&self.shared, self.stream_id, failure);
        self.shared.cancelled.store(true, Ordering::SeqCst);
    }
}
5879
#[cfg(test)]
/// Test-only selector for which substream executor implementation is used.
/// In this part of the file only `LegacyOnly` is consulted; the other
/// variants are presumably read elsewhere.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub(crate) enum SubstreamExecutorMode {
    Auto,
    LegacyOnly,
    ReadyRingOnly,
    SplitSinkOnly,
}

#[cfg(test)]
thread_local! {
    static SUBSTREAM_EXECUTOR_MODE: std::cell::Cell<SubstreamExecutorMode> =
        const { std::cell::Cell::new(SubstreamExecutorMode::Auto) };
}

#[cfg(test)]
/// Runs `f` with the current thread's substream executor mode set to `mode`,
/// and returns `f`'s result.
///
/// The previous mode is restored when `f` returns or unwinds, so a panicking
/// test body cannot leak its mode into later code on the same thread.
pub(crate) fn with_substream_mode<R>(mode: SubstreamExecutorMode, f: impl FnOnce() -> R) -> R {
    /// Restores the saved mode on drop, including during unwinding.
    struct RestoreMode(SubstreamExecutorMode);

    impl Drop for RestoreMode {
        fn drop(&mut self) {
            let previous = self.0;
            SUBSTREAM_EXECUTOR_MODE.with(|m| m.set(previous));
        }
    }

    let _restore = RestoreMode(SUBSTREAM_EXECUTOR_MODE.with(|m| m.replace(mode)));
    f()
}

#[cfg(test)]
/// Returns the substream executor mode in effect on the current thread.
fn current_substream_mode() -> SubstreamExecutorMode {
    SUBSTREAM_EXECUTOR_MODE.with(|m| m.get())
}
5912
5913fn flat_map_merge_stream<Out, Next, NextMat, F>(
5916 input: BoxStream<Out>,
5917 breadth: usize,
5918 stage: Arc<F>,
5919 materializer: &Materializer,
5920) -> BoxStream<Next>
5921where
5922 Out: Send + 'static,
5923 Next: Send + 'static,
5924 NextMat: Send + 'static,
5925 F: Fn(Out) -> Source<Next, NextMat> + Send + Sync + 'static,
5926{
5927 #[cfg(test)]
5928 if current_substream_mode() == SubstreamExecutorMode::LegacyOnly {
5929 return flat_map_merge_stream_legacy(input, breadth, stage, materializer);
5930 }
5931 flat_map_merge_stream_ready(input, breadth, stage, materializer)
5932}
5933
/// Legacy (test-only) `flat_map_merge` executor. It runs one coordinator task plus
/// one spawned worker task per substream, and they share a `FlatMapMergeShared`.
///
/// The coordinator:
/// 1. waits on `shared.available` while `active_streams >= breadth`, which gives
///    backpressure on how many substreams run at once;
/// 2. pulls the next upstream element, turns it into a `Source` with `stage`, and
///    materialises it with `worker_materializer`;
/// 3. registers the substream and spawns a worker that forwards each item with
///    `flat_map_merge_push`.
///
/// An error from `input` or from materialising a substream is stored as the shared
/// terminal error (the first one wins) and ends the coordinator. When `input` is
/// exhausted, the coordinator keeps running until `active_streams` drops to 0.
///
/// `flat_map_merge_stream` selects this executor when the thread's
/// `SubstreamExecutorMode` is `LegacyOnly`.
#[cfg(test)]
fn flat_map_merge_stream_legacy<Out, Next, NextMat, F>(
    mut input: BoxStream<Out>,
    breadth: usize,
    stage: Arc<F>,
    materializer: &Materializer,
) -> BoxStream<Next>
where
    Out: Send + 'static,
    Next: Send + 'static,
    NextMat: Send + 'static,
    F: Fn(Out) -> Source<Next, NextMat> + Send + Sync + 'static,
{
    let worker_materializer = materializer.with_name_prefix(materializer.name_prefix().to_owned());
    let shared = FlatMapMergeShared::new();
    let worker_shared = Arc::clone(&shared);
    let completion = materializer.spawn_stream(move |cancelled| {
        let mut next_id = 0usize;
        // Armed until `finish` is called; see `FlatMapMergeCoordinatorGuard`.
        let mut guard = FlatMapMergeCoordinatorGuard::new(worker_shared);
        let mut workers = HashMap::<usize, StreamCompletion<NotUsed>>::with_capacity(breadth);

        while !cancelled.load(Ordering::SeqCst) && !guard.shared.cancelled.load(Ordering::SeqCst) {
            // Forget worker handles whose completion has already resolved.
            workers.retain(|_, completion| completion.try_wait().is_none());
            {
                let mut state = guard
                    .shared
                    .state
                    .lock()
                    .unwrap_or_else(|poison| poison.into_inner());
                // Backpressure: block until a substream slot frees up, an error is
                // recorded, or the stage is cancelled.
                while state.active_streams >= breadth
                    && state.terminal.is_none()
                    && !cancelled.load(Ordering::SeqCst)
                    && !guard.shared.cancelled.load(Ordering::SeqCst)
                {
                    state = guard
                        .shared
                        .available
                        .wait(state)
                        .unwrap_or_else(|poison| poison.into_inner());
                }
                if state.terminal.is_some() {
                    let error = state.terminal.clone().expect("terminal checked above");
                    drop(state);
                    guard.finish();
                    return Err(error);
                }
            }

            match input.next() {
                Some(Ok(item)) => {
                    let source = stage(item);
                    let (mut stream, _) =
                        match Arc::clone(&source.factory).create(&worker_materializer) {
                            Ok(parts) => parts,
                            Err(error) => {
                                // Materialisation failure is fatal for the whole stage;
                                // record it (first error wins) and wake all waiters.
                                let mut state = guard
                                    .shared
                                    .state
                                    .lock()
                                    .unwrap_or_else(|poison| poison.into_inner());
                                if state.terminal.is_none() {
                                    state.terminal = Some(error.clone());
                                }
                                drop(state);
                                guard.shared.available.notify_all();
                                guard.finish();
                                return Err(error);
                            }
                        };
                    let stream_id = next_id;
                    next_id += 1;
                    // Presumably bumps `active_streams` for the new substream — confirm in helper.
                    flat_map_merge_register_stream(&guard.shared);
                    let worker_shared = Arc::clone(&guard.shared);
                    workers.insert(
                        stream_id,
                        worker_materializer.spawn_stream(move |inner_cancelled| {
                            let mut worker_guard =
                                FlatMapMergeWorkerGuard::new(Arc::clone(&worker_shared), stream_id);
                            while !inner_cancelled.load(Ordering::SeqCst)
                                && !worker_shared.cancelled.load(Ordering::SeqCst)
                            {
                                match stream.next() {
                                    Some(Ok(item)) => {
                                        // A push error ends this worker without an error
                                        // (NOTE(review): presumably the consumer is gone).
                                        if flat_map_merge_push(&worker_shared, stream_id, item)
                                            .is_err()
                                        {
                                            worker_guard.finish(Ok(()));
                                            return Ok(NotUsed);
                                        }
                                    }
                                    Some(Err(error)) => {
                                        worker_guard.finish(Err(error.clone()));
                                        return Err(error);
                                    }
                                    None => {
                                        worker_guard.finish(Ok(()));
                                        return Ok(NotUsed);
                                    }
                                }
                            }
                            worker_guard.finish(Ok(()));
                            Ok(NotUsed)
                        }),
                    );
                }
                Some(Err(error)) => {
                    // Upstream failure: record it (first error wins) and wake all waiters.
                    let mut state = guard
                        .shared
                        .state
                        .lock()
                        .unwrap_or_else(|poison| poison.into_inner());
                    if state.terminal.is_none() {
                        state.terminal = Some(error.clone());
                    }
                    drop(state);
                    guard.shared.available.notify_all();
                    guard.finish();
                    return Err(error);
                }
                None => {
                    guard.finish();
                    break;
                }
            }
        }

        // Reached on cancellation without `finish` having been called yet.
        if guard.armed {
            guard.finish();
        }
        // Input is done: stay alive until every worker has finished.
        while !cancelled.load(Ordering::SeqCst) && !guard.shared.cancelled.load(Ordering::SeqCst) {
            workers.retain(|_, completion| completion.try_wait().is_none());
            let state = guard
                .shared
                .state
                .lock()
                .unwrap_or_else(|poison| poison.into_inner());
            if state.active_streams == 0 {
                return Ok(NotUsed);
            }
            drop(
                guard
                    .shared
                    .available
                    .wait(state)
                    .unwrap_or_else(|poison| poison.into_inner()),
            );
        }
        Ok(NotUsed)
    });

    Box::new(FlatMapMergeStream {
        shared,
        completion: Some(completion),
    })
}
6103
/// Upper bound on how many items move in one step of the ready-ring executor.
/// It caps both the number of items the consumer drains from a lane per visit
/// and the number a worker pulls before publishing.
const FLAT_MAP_MERGE_READY_BATCH: usize = 16;

/// Substreams whose `inline_micro` hint promises at most this many items are
/// drained on the coordinator thread instead of on a dedicated worker. The limit
/// is tied to the batch size, so such a substream fits in one published batch.
const FLAT_MAP_MERGE_INLINE_MICRO_MAX: usize = FLAT_MAP_MERGE_READY_BATCH;
6112
/// State shared by the consumer (`FlatMapMergeReadyStream`), the coordinator task
/// and every substream worker of one ready-ring `flat_map_merge` stage.
struct FlatMapMergeReadyShared<T> {
    /// Coordination state; every mutation bumps `generation`.
    coordinator: Mutex<FlatMapMergeReadyState<T>>,
    /// Notified after coordinator-state changes. Waited on by the consumer in
    /// `next` and by the coordinator (backpressure and final drain).
    available: Condvar,
    /// Stage-wide cancellation flag. It is set when the consumer is dropped, on any
    /// substream or upstream error, and on abrupt termination of a guard.
    cancelled: Arc<AtomicBool>,
}
6118
/// Mutex-protected coordination state of the ready-ring executor.
struct FlatMapMergeReadyState<T> {
    /// Per-substream lanes keyed by stream id. A lane is removed once it is closed and empty.
    lanes: HashMap<usize, Arc<FlatMapMergeLane<T>>>,
    /// FIFO ("ready ring") of lane ids with buffered items. A lane is enqueued only
    /// when its `in_ready_ring` flag was clear, so it normally appears at most once.
    ready: std::collections::VecDeque<usize>,
    /// Total items published into lanes and not yet drained by the consumer.
    queued_items: usize,
    /// Substreams registered by the coordinator that have not finished yet.
    active_streams: usize,
    /// Set once the coordinator will start no more substreams.
    input_done: bool,
    /// First error recorded for the stage; it is reported to the consumer.
    terminal: Option<StreamError>,
    /// Monotonic change counter, used by waiters to detect that something changed.
    generation: u64,
}
6128
/// Per-substream buffer between one producer (worker or inline coordinator) and the consumer.
struct FlatMapMergeLane<T> {
    state: Mutex<FlatMapMergeLaneState<T>>,
    /// Notified when the consumer drains items, when a publish finishes (`publishing`
    /// cleared), and on cancellation. Workers wait on it while the buffer is full;
    /// the consumer waits on it while a publish is in progress.
    space_available: Condvar,
}
6133
/// Mutex-protected contents of a `FlatMapMergeLane`.
struct FlatMapMergeLaneState<T> {
    /// Items published but not yet drained by the consumer.
    buffer: std::collections::VecDeque<T>,
    /// True while the lane id is (or is about to be) queued in the ready ring.
    in_ready_ring: bool,
    /// True while a publish has added items but has not yet counted them in
    /// `queued_items`. The consumer waits for it to clear before draining.
    publishing: bool,
    /// Set when the substream has finished; no further items will be published.
    closed: bool,
}
6140
/// Consumer side of the ready-ring `flat_map_merge` executor; yields merged items.
struct FlatMapMergeReadyStream<T> {
    shared: Arc<FlatMapMergeReadyShared<T>>,
    /// Completion handle of the coordinator task; taken (dropped) in `Drop`.
    completion: Option<StreamCompletion<NotUsed>>,
    /// Items already drained from a lane but not yet returned from `next`.
    local_batch: std::collections::VecDeque<T>,
}
6146
impl<T: Send + 'static> Iterator for FlatMapMergeReadyStream<T> {
    type Item = StreamResult<T>;

    /// Yields the next merged item.
    ///
    /// Items already pulled into `local_batch` are returned first, without taking
    /// any lock. Otherwise the consumer:
    /// 1. takes the lane at the front of the ready ring;
    /// 2. drains up to `FLAT_MAP_MERGE_READY_BATCH` items from it;
    /// 3. re-queues the lane at the back if items remain, so substreams are served
    ///    round-robin;
    /// 4. returns the first drained item and stashes the rest in `local_batch`.
    ///
    /// A recorded terminal error takes precedence over items still sitting in lanes.
    /// Once recorded, it is returned again on every later call. `None` is returned
    /// only when input is done, no substream is active, and no items are queued.
    fn next(&mut self) -> Option<Self::Item> {
        if let Some(item) = self.local_batch.pop_front() {
            return Some(Ok(item));
        }

        let mut coord = self
            .shared
            .coordinator
            .lock()
            .unwrap_or_else(|p| p.into_inner());
        loop {
            if let Some(error) = coord.terminal.clone() {
                return Some(Err(error));
            }

            if let Some(lane_id) = coord.ready.pop_front() {
                let lane = coord.lanes.get(&lane_id).cloned();
                // Never hold the coordinator lock while taking a lane lock.
                drop(coord);

                if let Some(lane) = lane {
                    let mut lane_state = lane.state.lock().unwrap_or_else(|p| p.into_inner());
                    // Wait for any in-flight publish to finish updating `queued_items`,
                    // so every item drained below has already been counted.
                    while lane_state.publishing {
                        lane_state = lane
                            .space_available
                            .wait(lane_state)
                            .unwrap_or_else(|p| p.into_inner());
                    }
                    let drain_n = lane_state.buffer.len().min(FLAT_MAP_MERGE_READY_BATCH);
                    let mut batch = std::collections::VecDeque::with_capacity(drain_n);
                    for _ in 0..drain_n {
                        if let Some(item) = lane_state.buffer.pop_front() {
                            batch.push_back(item);
                        }
                    }
                    let still_has_items = !lane_state.buffer.is_empty();
                    let is_closed = lane_state.closed;
                    // An emptied lane leaves the ring; the next publish re-enqueues it.
                    if !still_has_items {
                        lane_state.in_ready_ring = false;
                    }
                    drop(lane_state);
                    // Freed buffer space: wake a worker blocked on a full lane.
                    if !batch.is_empty() {
                        lane.space_available.notify_all();
                    }

                    let freed = batch.len();
                    let mut coord = self
                        .shared
                        .coordinator
                        .lock()
                        .unwrap_or_else(|p| p.into_inner());
                    coord.queued_items = coord.queued_items.saturating_sub(freed);
                    if still_has_items {
                        coord.ready.push_back(lane_id);
                    } else if is_closed {
                        // Closed and fully drained: the lane is no longer needed.
                        coord.lanes.remove(&lane_id);
                    }
                    coord.generation += 1;
                    drop(coord);
                    self.shared.available.notify_all();

                    let mut iter = batch.into_iter();
                    if let Some(first) = iter.next() {
                        self.local_batch.extend(iter);
                        return Some(Ok(first));
                    }
                }
                // The lane was gone or empty; re-lock and look again.
                coord = self
                    .shared
                    .coordinator
                    .lock()
                    .unwrap_or_else(|p| p.into_inner());
                continue;
            }

            if coord.terminal.is_none()
                && coord.input_done
                && coord.active_streams == 0
                && coord.queued_items == 0
            {
                return None;
            }

            // Nothing ready: sleep until the state changes or something becomes actionable.
            let seen = coord.generation;
            coord = self
                .shared
                .available
                .wait_while(coord, |s| {
                    s.generation == seen
                        && s.terminal.is_none()
                        && s.ready.is_empty()
                        && !(s.input_done && s.active_streams == 0 && s.queued_items == 0)
                })
                .unwrap_or_else(|p| p.into_inner());
        }
    }
}
6258
6259impl<T> Drop for FlatMapMergeReadyStream<T> {
6260 fn drop(&mut self) {
6261 let lanes: Vec<Arc<FlatMapMergeLane<T>>> = {
6262 let mut coord = self
6263 .shared
6264 .coordinator
6265 .lock()
6266 .unwrap_or_else(|p| p.into_inner());
6267 coord.generation += 1;
6268 coord.lanes.values().cloned().collect()
6269 };
6270 self.shared.cancelled.store(true, Ordering::SeqCst);
6271 self.shared.available.notify_all();
6272 for lane in lanes {
6273 lane.space_available.notify_all();
6274 }
6275 let _ = self.completion.take();
6276 }
6277}
6278
6279fn finish_lane_ready<T>(
6282 shared: &Arc<FlatMapMergeReadyShared<T>>,
6283 stream_id: usize,
6284 terminal: Result<(), StreamError>,
6285) {
6286 let is_error = terminal.is_err();
6287
6288 let lane_is_empty = {
6290 let coord = shared.coordinator.lock().unwrap_or_else(|p| p.into_inner());
6291 let lane_opt = coord.lanes.get(&stream_id).cloned();
6292 drop(coord);
6293 if let Some(lane) = lane_opt {
6294 let mut ls = lane.state.lock().unwrap_or_else(|p| p.into_inner());
6295 ls.closed = true;
6296 ls.buffer.is_empty()
6297 } else {
6298 true
6299 }
6300 };
6301
6302 let lanes_to_notify: Vec<Arc<FlatMapMergeLane<T>>> = {
6304 let mut coord = shared.coordinator.lock().unwrap_or_else(|p| p.into_inner());
6305 coord.active_streams = coord.active_streams.saturating_sub(1);
6306 if let Err(ref error) = terminal
6307 && coord.terminal.is_none()
6308 {
6309 coord.terminal = Some(error.clone());
6310 }
6311 if lane_is_empty {
6312 coord.lanes.remove(&stream_id);
6313 }
6314 coord.generation += 1;
6315 if is_error {
6316 coord.lanes.values().cloned().collect()
6317 } else {
6318 vec![]
6319 }
6320 };
6321
6322 if is_error {
6323 shared.cancelled.store(true, Ordering::SeqCst);
6324 for lane in &lanes_to_notify {
6325 lane.space_available.notify_all();
6326 }
6327 }
6328 shared.available.notify_all();
6329}
6330
/// Guard held by the ready-ring coordinator task.
///
/// If it is dropped while still `armed` (i.e. `finish` was never called), the
/// `Drop` impl records `AbruptTermination` and cancels the stage.
struct FlatMapMergeReadyCoordinatorGuard<T> {
    shared: Arc<FlatMapMergeReadyShared<T>>,
    /// Cleared by `finish`.
    armed: bool,
}
6335
6336impl<T> FlatMapMergeReadyCoordinatorGuard<T> {
6337 fn new(shared: Arc<FlatMapMergeReadyShared<T>>) -> Self {
6338 Self {
6339 shared,
6340 armed: true,
6341 }
6342 }
6343
6344 fn finish(&mut self) {
6345 let mut coord = self
6346 .shared
6347 .coordinator
6348 .lock()
6349 .unwrap_or_else(|p| p.into_inner());
6350 coord.input_done = true;
6351 coord.generation += 1;
6352 drop(coord);
6353 self.shared.available.notify_all();
6354 self.armed = false;
6355 }
6356}
6357
6358impl<T> Drop for FlatMapMergeReadyCoordinatorGuard<T> {
6359 fn drop(&mut self) {
6360 if self.armed {
6361 let lanes: Vec<Arc<FlatMapMergeLane<T>>> = {
6362 let mut coord = self
6363 .shared
6364 .coordinator
6365 .lock()
6366 .unwrap_or_else(|p| p.into_inner());
6367 if coord.terminal.is_none() {
6368 coord.terminal = Some(StreamError::AbruptTermination);
6369 }
6370 coord.input_done = true;
6371 coord.generation += 1;
6372 coord.lanes.values().cloned().collect()
6373 };
6374 self.shared.cancelled.store(true, Ordering::SeqCst);
6375 self.shared.available.notify_all();
6376 for lane in lanes {
6377 lane.space_available.notify_all();
6378 }
6379 }
6380 }
6381}
6382
/// Guard held by a spawned substream worker.
///
/// If it is dropped while still `armed`, the `Drop` impl closes the lane with
/// `AbruptTermination` and cancels the stage.
struct FlatMapMergeReadyWorkerGuard<T> {
    shared: Arc<FlatMapMergeReadyShared<T>>,
    /// Id of the substream (and its lane) this worker produces.
    stream_id: usize,
    /// Cleared by `finish`.
    armed: bool,
}
6388
6389impl<T> FlatMapMergeReadyWorkerGuard<T> {
6390 fn new(shared: Arc<FlatMapMergeReadyShared<T>>, stream_id: usize) -> Self {
6391 Self {
6392 shared,
6393 stream_id,
6394 armed: true,
6395 }
6396 }
6397
6398 fn finish(&mut self, terminal: Result<(), StreamError>) {
6399 finish_lane_ready(&self.shared, self.stream_id, terminal);
6400 self.armed = false;
6401 }
6402}
6403
6404impl<T> Drop for FlatMapMergeReadyWorkerGuard<T> {
6405 fn drop(&mut self) {
6406 if self.armed {
6407 finish_lane_ready(
6408 &self.shared,
6409 self.stream_id,
6410 Err(StreamError::AbruptTermination),
6411 );
6412 self.shared.cancelled.store(true, Ordering::SeqCst);
6413 }
6414 }
6415}
6416
6417fn publish_ready_batch<T>(
6432 shared: &Arc<FlatMapMergeReadyShared<T>>,
6433 stream_id: usize,
6434 lane: &Arc<FlatMapMergeLane<T>>,
6435 batch: std::collections::VecDeque<T>,
6436) {
6437 let batch_len = batch.len();
6438 if batch_len == 0 {
6439 return;
6440 }
6441
6442 let was_not_in_ready = {
6444 let mut ls = lane.state.lock().unwrap_or_else(|p| p.into_inner());
6445 let was_not = !ls.in_ready_ring;
6446 ls.publishing = true;
6447 ls.buffer.extend(batch);
6448 if !ls.buffer.is_empty() {
6449 ls.in_ready_ring = true;
6450 }
6451 was_not
6452 };
6453
6454 {
6456 let mut coord = shared.coordinator.lock().unwrap_or_else(|p| p.into_inner());
6457 coord.queued_items += batch_len;
6458 if was_not_in_ready {
6459 coord.ready.push_back(stream_id);
6460 }
6461 coord.generation += 1;
6462 }
6463 {
6464 let mut ls = lane.state.lock().unwrap_or_else(|p| p.into_inner());
6465 ls.publishing = false;
6466 }
6467 lane.space_available.notify_all();
6468 shared.available.notify_all();
6469}
6470
/// Guard for a substream the coordinator has registered. It stays armed until the
/// substream is either drained inline (`finish`) or handed to a worker (`hand_off`).
///
/// If it is dropped while armed, the `Drop` impl finishes the lane as
/// `AbruptTermination`.
struct FlatMapMergeReadyInlineGuard<T> {
    shared: Arc<FlatMapMergeReadyShared<T>>,
    /// Id of the substream (and its lane).
    stream_id: usize,
    /// Cleared by `finish` or `hand_off`.
    armed: bool,
}
6485
6486impl<T> FlatMapMergeReadyInlineGuard<T> {
6487 fn new(shared: Arc<FlatMapMergeReadyShared<T>>, stream_id: usize) -> Self {
6488 Self {
6489 shared,
6490 stream_id,
6491 armed: true,
6492 }
6493 }
6494
6495 fn finish(&mut self, terminal: Result<(), StreamError>) {
6496 finish_lane_ready(&self.shared, self.stream_id, terminal);
6497 self.armed = false;
6498 }
6499
6500 fn hand_off(&mut self) {
6501 self.armed = false;
6502 }
6503}
6504
6505impl<T> Drop for FlatMapMergeReadyInlineGuard<T> {
6506 fn drop(&mut self) {
6507 if self.armed {
6508 finish_lane_ready(
6509 &self.shared,
6510 self.stream_id,
6511 Err(StreamError::AbruptTermination),
6512 );
6513 }
6514 }
6515}
6516
/// Ready-ring `flat_map_merge` executor.
///
/// A coordinator task (spawned with `materializer.spawn_stream`) pulls `input`,
/// maps each element to a `Source` with `stage`, and materialises it. Each
/// substream gets a `FlatMapMergeLane` registered in `lanes` and counted in
/// `active_streams`. At most `breadth` substreams are active at once: the
/// coordinator waits on `available` while `active_streams >= breadth`.
///
/// Substreams are run in one of two ways:
/// - **Inline**: used when the source's `hints.inline_micro` reports
///   `max_success_items <= FLAT_MAP_MERGE_INLINE_MICRO_MAX`. The coordinator pulls
///   up to `max_success_items + 1` items itself and publishes them as one batch.
///   NOTE(review): if the substream yields more than `max_success_items + 1`
///   items, the rest are never pulled and the lane is finished as `Ok`. The hint
///   is trusted; confirm it is a hard guarantee.
/// - **Worker**: a task is spawned per substream. It pulls batches of at most
///   `min(free window space, FLAT_MAP_MERGE_READY_BATCH)` items and blocks while
///   the lane already holds `FLAT_MAP_MERGE_SUBSTREAM_WINDOW` items.
///
/// Upstream or materialisation errors become the terminal error (the first one
/// wins) and cancel the stage. Once `input` is exhausted, the coordinator waits
/// until `active_streams` reaches 0. The returned `FlatMapMergeReadyStream` is the
/// consumer side.
fn flat_map_merge_stream_ready<Out, Next, NextMat, F>(
    mut input: BoxStream<Out>,
    breadth: usize,
    stage: Arc<F>,
    materializer: &Materializer,
) -> BoxStream<Next>
where
    Out: Send + 'static,
    Next: Send + 'static,
    NextMat: Send + 'static,
    F: Fn(Out) -> Source<Next, NextMat> + Send + Sync + 'static,
{
    let worker_materializer = materializer.with_name_prefix(materializer.name_prefix().to_owned());
    let shared: Arc<FlatMapMergeReadyShared<Next>> = Arc::new(FlatMapMergeReadyShared {
        coordinator: Mutex::new(FlatMapMergeReadyState {
            lanes: HashMap::new(),
            ready: std::collections::VecDeque::new(),
            queued_items: 0,
            active_streams: 0,
            input_done: false,
            terminal: None,
            generation: 0,
        }),
        available: Condvar::new(),
        cancelled: Arc::new(AtomicBool::new(false)),
    });

    let worker_shared = Arc::clone(&shared);
    let completion = materializer.spawn_stream(move |cancelled| {
        let mut next_id = 0usize;
        // Armed until `finish`; if the coordinator unwinds, the guard aborts the stage.
        let mut guard = FlatMapMergeReadyCoordinatorGuard::new(worker_shared);
        let mut workers = HashMap::<usize, StreamCompletion<NotUsed>>::with_capacity(breadth);

        while !cancelled.load(Ordering::SeqCst) && !guard.shared.cancelled.load(Ordering::SeqCst) {
            // Forget worker handles whose completion has already resolved.
            workers.retain(|_, c| c.try_wait().is_none());

            {
                let mut coord = guard
                    .shared
                    .coordinator
                    .lock()
                    .unwrap_or_else(|p| p.into_inner());
                // Backpressure: wait for a free substream slot, an error, or cancellation.
                loop {
                    if coord.terminal.is_some()
                        || coord.active_streams < breadth
                        || cancelled.load(Ordering::SeqCst)
                        || guard.shared.cancelled.load(Ordering::SeqCst)
                    {
                        break;
                    }
                    let seen = coord.generation;
                    coord = guard
                        .shared
                        .available
                        .wait_while(coord, |s| {
                            s.generation == seen
                                && s.terminal.is_none()
                                && s.active_streams >= breadth
                                && !cancelled.load(Ordering::SeqCst)
                                && !guard.shared.cancelled.load(Ordering::SeqCst)
                        })
                        .unwrap_or_else(|p| p.into_inner());
                }
                if coord.terminal.is_some() {
                    let error = coord.terminal.clone().expect("terminal checked");
                    drop(coord);
                    guard.finish();
                    return Err(error);
                }
            }

            match input.next() {
                Some(Ok(item)) => {
                    let source = stage(item);
                    let inline_hint = source.hints.inline_micro;
                    let (stream, _) = match Arc::clone(&source.factory).create(&worker_materializer)
                    {
                        Ok(parts) => parts,
                        Err(error) => {
                            // Materialisation failure is fatal: record it (first error
                            // wins), cancel, and wake the consumer and all producers.
                            let lanes: Vec<Arc<FlatMapMergeLane<Next>>> = {
                                let mut coord = guard
                                    .shared
                                    .coordinator
                                    .lock()
                                    .unwrap_or_else(|p| p.into_inner());
                                if coord.terminal.is_none() {
                                    coord.terminal = Some(error.clone());
                                }
                                coord.generation += 1;
                                coord.lanes.values().cloned().collect()
                            };
                            guard.shared.cancelled.store(true, Ordering::SeqCst);
                            guard.shared.available.notify_all();
                            for lane in lanes {
                                lane.space_available.notify_all();
                            }
                            guard.finish();
                            return Err(error);
                        }
                    };

                    let stream_id = next_id;
                    next_id += 1;
                    let mut stream = stream;

                    // Register the substream's lane before producing any items into it.
                    let lane: Arc<FlatMapMergeLane<Next>> = Arc::new(FlatMapMergeLane {
                        state: Mutex::new(FlatMapMergeLaneState {
                            buffer: std::collections::VecDeque::new(),
                            in_ready_ring: false,
                            publishing: false,
                            closed: false,
                        }),
                        space_available: Condvar::new(),
                    });
                    {
                        let mut coord = guard
                            .shared
                            .coordinator
                            .lock()
                            .unwrap_or_else(|p| p.into_inner());
                        coord.lanes.insert(stream_id, Arc::clone(&lane));
                        coord.active_streams += 1;
                    }

                    // Finishes the lane as `AbruptTermination` if we unwind before
                    // either finishing it inline or handing it to a worker.
                    let mut inline_guard =
                        FlatMapMergeReadyInlineGuard::new(Arc::clone(&guard.shared), stream_id);

                    let is_inline = inline_hint
                        .is_some_and(|h| h.max_success_items <= FLAT_MAP_MERGE_INLINE_MICRO_MAX);

                    if is_inline {
                        // Inline path: drain the (small) substream on this thread.
                        let max_items = inline_hint.expect("checked").max_success_items;

                        if guard.shared.cancelled.load(Ordering::SeqCst)
                            || cancelled.load(Ordering::SeqCst)
                        {
                            inline_guard.finish(Ok(()));
                            continue;
                        }

                        // One extra pull so a well-behaved substream reports its end.
                        let max_pulls = max_items.saturating_add(1);
                        let mut local_batch = std::collections::VecDeque::with_capacity(max_items);
                        let mut terminal_result: Option<Result<(), StreamError>> = None;

                        for _ in 0..max_pulls {
                            if guard.shared.cancelled.load(Ordering::SeqCst)
                                || cancelled.load(Ordering::SeqCst)
                            {
                                break;
                            }
                            match stream.next() {
                                Some(Ok(item)) => local_batch.push_back(item),
                                Some(Err(e)) => {
                                    terminal_result = Some(Err(e));
                                    break;
                                }
                                None => {
                                    terminal_result = Some(Ok(()));
                                    break;
                                }
                            }
                        }

                        // Cancelled mid-drain: discard what was pulled and close the lane.
                        if guard.shared.cancelled.load(Ordering::SeqCst)
                            || cancelled.load(Ordering::SeqCst)
                        {
                            inline_guard.finish(Ok(()));
                            continue;
                        }

                        publish_ready_batch(&guard.shared, stream_id, &lane, local_batch);

                        // `None` means the pull budget ran out without seeing the end
                        // (see the NOTE(review) above); the lane is closed anyway.
                        match terminal_result {
                            Some(Err(e)) => {
                                inline_guard.finish(Err(e));
                            }
                            Some(Ok(())) | None => {
                                inline_guard.finish(Ok(()));
                            }
                        }
                    } else {
                        // Worker path: the spawned worker's own guard now owns the lane.
                        inline_guard.hand_off();
                        let worker_shared = Arc::clone(&guard.shared);
                        let worker_lane = Arc::clone(&lane);
                        workers.insert(
                            stream_id,
                            worker_materializer.spawn_stream(move |inner_cancelled| {
                                let mut worker_guard = FlatMapMergeReadyWorkerGuard::new(
                                    Arc::clone(&worker_shared),
                                    stream_id,
                                );

                                loop {
                                    if inner_cancelled.load(Ordering::SeqCst)
                                        || worker_shared.cancelled.load(Ordering::SeqCst)
                                    {
                                        worker_guard.finish(Ok(()));
                                        return Ok(NotUsed);
                                    }

                                    // Wait for window space in the lane, then size the next batch.
                                    let capacity;
                                    {
                                        let mut ls = worker_lane
                                            .state
                                            .lock()
                                            .unwrap_or_else(|p| p.into_inner());
                                        while ls.buffer.len() >= FLAT_MAP_MERGE_SUBSTREAM_WINDOW
                                            && !worker_shared.cancelled.load(Ordering::SeqCst)
                                            && !ls.closed
                                            && !inner_cancelled.load(Ordering::SeqCst)
                                        {
                                            ls = worker_lane
                                                .space_available
                                                .wait(ls)
                                                .unwrap_or_else(|p| p.into_inner());
                                        }
                                        if worker_shared.cancelled.load(Ordering::SeqCst)
                                            || ls.closed
                                            || inner_cancelled.load(Ordering::SeqCst)
                                        {
                                            drop(ls);
                                            worker_guard.finish(Ok(()));
                                            return Ok(NotUsed);
                                        }
                                        capacity =
                                            FLAT_MAP_MERGE_SUBSTREAM_WINDOW - ls.buffer.len();
                                    }
                                    let batch_size = capacity.min(FLAT_MAP_MERGE_READY_BATCH);
                                    let mut local_batch =
                                        std::collections::VecDeque::with_capacity(batch_size);
                                    let mut terminal_result: Option<Result<(), StreamError>> = None;
                                    for _ in 0..batch_size {
                                        if inner_cancelled.load(Ordering::SeqCst)
                                            || worker_shared.cancelled.load(Ordering::SeqCst)
                                        {
                                            break;
                                        }
                                        match stream.next() {
                                            Some(Ok(item)) => local_batch.push_back(item),
                                            Some(Err(e)) => {
                                                terminal_result = Some(Err(e));
                                                break;
                                            }
                                            None => {
                                                terminal_result = Some(Ok(()));
                                                break;
                                            }
                                        }
                                    }
                                    let batch_len = local_batch.len();

                                    // Items pulled before a terminal are still delivered.
                                    publish_ready_batch(
                                        &worker_shared,
                                        stream_id,
                                        &worker_lane,
                                        local_batch,
                                    );

                                    match terminal_result {
                                        Some(Ok(())) => {
                                            worker_guard.finish(Ok(()));
                                            return Ok(NotUsed);
                                        }
                                        Some(Err(e)) => {
                                            worker_guard.finish(Err(e.clone()));
                                            return Err(e);
                                        }
                                        None => {
                                            // Nothing pulled and no terminal: the pull
                                            // loop stopped on cancellation.
                                            if batch_len == 0 {
                                                worker_guard.finish(Ok(()));
                                                return Ok(NotUsed);
                                            }
                                        }
                                    }
                                }
                            }),
                        );
                    }
                }
                Some(Err(error)) => {
                    // Upstream failure: record it (first error wins), cancel, and
                    // wake the consumer and all producers.
                    let lanes: Vec<Arc<FlatMapMergeLane<Next>>> = {
                        let mut coord = guard
                            .shared
                            .coordinator
                            .lock()
                            .unwrap_or_else(|p| p.into_inner());
                        if coord.terminal.is_none() {
                            coord.terminal = Some(error.clone());
                        }
                        coord.generation += 1;
                        coord.lanes.values().cloned().collect()
                    };
                    guard.shared.cancelled.store(true, Ordering::SeqCst);
                    guard.shared.available.notify_all();
                    for lane in lanes {
                        lane.space_available.notify_all();
                    }
                    guard.finish();
                    return Err(error);
                }
                None => {
                    guard.finish();
                    break;
                }
            }
        }

        // Reached on cancellation without `finish` having been called yet.
        if guard.armed {
            guard.finish();
        }

        // Input is done: stay alive until every substream has finished.
        while !cancelled.load(Ordering::SeqCst) && !guard.shared.cancelled.load(Ordering::SeqCst) {
            workers.retain(|_, c| c.try_wait().is_none());
            let coord = guard
                .shared
                .coordinator
                .lock()
                .unwrap_or_else(|p| p.into_inner());
            if coord.active_streams == 0 {
                return Ok(NotUsed);
            }
            let seen = coord.generation;
            drop(
                guard
                    .shared
                    .available
                    .wait_while(coord, |s| s.generation == seen && s.active_streams > 0)
                    .unwrap_or_else(|p| p.into_inner()),
            );
        }
        Ok(NotUsed)
    });

    Box::new(FlatMapMergeReadyStream {
        shared,
        completion: Some(completion),
        local_batch: std::collections::VecDeque::new(),
    })
}
6884
6885fn flat_map_concat_stream<Out, Next, NextMat, F>(
6886 mut input: BoxStream<Out>,
6887 stage: Arc<F>,
6888 materializer: &Materializer,
6889) -> BoxStream<Next>
6890where
6891 Out: Send + 'static,
6892 Next: Send + 'static,
6893 NextMat: Send + 'static,
6894 F: Fn(Out) -> Source<Next, NextMat> + Send + Sync + 'static,
6895{
6896 let materializer = materializer.with_name_prefix(materializer.name_prefix().to_owned());
6897 let mut current: Option<BoxStream<Next>> = None;
6898 Box::new(std::iter::from_fn(move || {
6899 loop {
6900 if let Some(stream) = current.as_mut() {
6901 match stream.next() {
6902 Some(item) => return Some(item),
6903 None => current = None,
6904 }
6905 }
6906
6907 match input.next() {
6908 Some(Ok(item)) => {
6909 let source = stage(item);
6910 current = Some(match Arc::clone(&source.factory).create(&materializer) {
6911 Ok((stream, _)) => stream,
6912 Err(error) => Box::new(std::iter::once(Err(error))) as BoxStream<Next>,
6913 });
6914 }
6915 Some(Err(error)) => return Some(Err(error)),
6916 None => return None,
6917 }
6918 }
6919 }))
6920}
6921
6922fn map_async_unordered<Out, Next, F, Fut>(
6923 mut input: BoxStream<Out>,
6924 parallelism: usize,
6925 stage: Arc<F>,
6926) -> BoxStream<Next>
6927where
6928 Out: Send + 'static,
6929 Next: Send + 'static,
6930 F: Fn(Out) -> Fut + Send + Sync + 'static,
6931 Fut: Future<Output = StreamResult<Next>> + Send + 'static,
6932{
6933 let (sender, receiver) = std::sync::mpsc::channel::<(usize, StreamResult<Next>)>();
6934 let mut tasks = HashMap::<usize, AbortOnDropHandle<()>>::with_capacity(parallelism);
6935 let mut next_task_id = 0_usize;
6936 let mut input_done = false;
6937
6938 Box::new(std::iter::from_fn(move || {
6939 loop {
6940 while tasks.len() < parallelism && !input_done {
6941 match input.next() {
6942 Some(Ok(item)) => match poll_once_or_pending(stage(item)) {
6943 Ok(result) => return Some(result),
6944 Err(future) => {
6945 let task_id = next_task_id;
6946 next_task_id += 1;
6947 tasks.insert(
6948 task_id,
6949 spawn_completion_task(task_id, future, sender.clone(), |result| {
6950 result
6951 }),
6952 );
6953 }
6954 },
6955 Some(Err(error)) => {
6956 input_done = true;
6957 return Some(Err(error));
6958 }
6959 None => input_done = true,
6960 }
6961 }
6962
6963 if tasks.is_empty() {
6964 return None;
6965 }
6966
6967 if let Some((task_id, result)) = recv_completion(&receiver) {
6968 tasks.remove(&task_id);
6969 return Some(result);
6970 }
6971 }
6972 }))
6973}
6974
6975fn map_async_unordered_supervised<Out, Next, F, Fut>(
6976 mut input: BoxStream<Out>,
6977 parallelism: usize,
6978 stage: Arc<F>,
6979 decider: SupervisionDecider,
6980) -> BoxStream<Next>
6981where
6982 Out: Send + 'static,
6983 Next: Send + 'static,
6984 F: Fn(Out) -> Fut + Send + Sync + 'static,
6985 Fut: Future<Output = StreamResult<Next>> + Send + 'static,
6986{
6987 let (sender, receiver) = std::sync::mpsc::channel::<(usize, StreamResult<Next>)>();
6988 let mut tasks = HashMap::<usize, AbortOnDropHandle<()>>::with_capacity(parallelism);
6989 let mut next_task_id = 0_usize;
6990 let mut input_done = false;
6991
6992 Box::new(std::iter::from_fn(move || {
6993 loop {
6994 while tasks.len() < parallelism && !input_done {
6995 match input.next() {
6996 Some(Ok(item)) => {
6997 match catch_unwind(AssertUnwindSafe(|| poll_once_or_pending(stage(item)))) {
6998 Ok(Ok(result)) => {
6999 if let Some(result) = supervise_async_result(result, &decider) {
7000 return Some(result);
7001 }
7002 }
7003 Ok(Err(future)) => {
7004 let task_id = next_task_id;
7005 next_task_id += 1;
7006 tasks.insert(
7007 task_id,
7008 spawn_completion_task(
7009 task_id,
7010 future,
7011 sender.clone(),
7012 |result| result,
7013 ),
7014 );
7015 }
7016 Err(_) => {
7017 let error = panic_stream_error("map_async_unordered callback");
7018 if let Some(result) = supervise_async_result(Err(error), &decider) {
7019 return Some(result);
7020 }
7021 }
7022 }
7023 }
7024 Some(Err(error)) => {
7025 input_done = true;
7026 return Some(Err(error));
7027 }
7028 None => input_done = true,
7029 }
7030 }
7031
7032 if tasks.is_empty() {
7033 return None;
7034 }
7035
7036 if let Some((task_id, result)) = recv_completion(&receiver) {
7037 tasks.remove(&task_id);
7038 if let Some(result) = supervise_async_result(result, &decider) {
7039 return Some(result);
7040 }
7041 }
7042 }
7043 }))
7044}
7045
7046#[inline(always)]
7047fn map_async_partitioned_serial<Out, Key, Next, Partition, F, Fut>(
7048 mut input: BoxStream<Out>,
7049 partition: Arc<Partition>,
7050 stage: Arc<F>,
7051) -> BoxStream<Next>
7052where
7053 Out: Send + 'static,
7054 Key: Send + 'static,
7055 Next: Send + 'static,
7056 Partition: Fn(&Out) -> Key + Send + Sync + 'static,
7057 F: Fn(Out) -> Fut + Send + Sync + 'static,
7058 Fut: Future<Output = StreamResult<Next>> + Send + 'static,
7059{
7060 let (sender, receiver) = std::sync::mpsc::channel::<(usize, StreamResult<Next>)>();
7061 let mut tasks = HashMap::<usize, AbortOnDropHandle<()>>::with_capacity(1);
7062 let mut next_index = 0_usize;
7063 Box::new(std::iter::from_fn(move || {
7064 if !tasks.is_empty()
7068 && let Some((task_id, result)) = recv_completion(&receiver)
7069 {
7070 tasks.remove(&task_id);
7071 return Some(result);
7072 }
7073
7074 let item = input.next()?;
7075 match item {
7076 Ok(item) => {
7077 let _ = partition(&item);
7078 let index = next_index;
7079 next_index += 1;
7080 Some(match poll_once_or_pending(stage(item)) {
7081 Ok(result) => result,
7082 Err(future) => {
7083 tasks.insert(
7084 index,
7085 spawn_completion_task(index, future, sender.clone(), |result| result),
7086 );
7087 let (task_id, result) =
7088 recv_completion(&receiver).expect("pending map_async task completion");
7089 tasks.remove(&task_id);
7090 result
7091 }
7092 })
7093 }
7094 Err(error) => Some(Err(error)),
7095 }
7096 }))
7097}
7098
/// Small-parallelism variant of `map_async_partitioned`. It keeps blocked items in
/// a `pending` queue that is scanned linearly.
///
/// Results are emitted in input order. Completed results are parked in `completed`
/// until every earlier index has been emitted. At most `parallelism` results are
/// outstanding at a time (`tasks` plus `completed`). At most `per_partition` items
/// per key run concurrently; an item whose key is saturated waits in `pending`.
/// Items from `pending` take priority over new input when they become eligible.
///
/// An upstream error is slotted into `completed` at the next index, so it is
/// emitted in order, and it stops further input pulls.
/// NOTE(review): assumes `per_partition >= 1`. With 0, no item is ever eligible,
/// and once `tasks` is empty the iterator returns `None`, leaving `pending` unprocessed.
#[inline(always)]
fn map_async_partitioned_scanning<Out, Key, Next, Partition, F, Fut>(
    mut input: BoxStream<Out>,
    parallelism: usize,
    per_partition: usize,
    partition: Arc<Partition>,
    stage: Arc<F>,
) -> BoxStream<Next>
where
    Out: Send + 'static,
    Key: Clone + Eq + Hash + Send + 'static,
    Next: Send + 'static,
    Partition: Fn(&Out) -> Key + Send + Sync + 'static,
    F: Fn(Out) -> Fut + Send + Sync + 'static,
    Fut: Future<Output = StreamResult<Next>> + Send + 'static,
{
    // Completions carry their key so the per-key counter can be released.
    let (sender, receiver) = std::sync::mpsc::channel::<(usize, (Key, StreamResult<Next>))>();
    let mut tasks = HashMap::<usize, AbortOnDropHandle<()>>::with_capacity(parallelism);
    // Running (spawned) item count per key; entries at zero are removed.
    let mut active_by_key = HashMap::<Key, usize>::with_capacity(parallelism);
    // Items whose key was saturated when they were read: (input index, key, item).
    let mut pending = VecDeque::<(usize, Key, Out)>::with_capacity(parallelism);
    // Finished results waiting for earlier indices to be emitted.
    let mut completed = BTreeMap::<usize, StreamResult<Next>>::new();
    let mut next_index = 0_usize;
    let mut next_to_emit = 0_usize;
    let mut input_done = false;

    Box::new(std::iter::from_fn(move || {
        loop {
            if let Some(result) = completed.remove(&next_to_emit) {
                next_to_emit += 1;
                return Some(result);
            }

            while tasks.len() + completed.len() < parallelism {
                // Prefer the oldest pending item whose key has room; otherwise read input.
                let next = pending
                    .iter()
                    .position(|(_, key, _)| {
                        active_by_key.get(key).copied().unwrap_or(0) < per_partition
                    })
                    .and_then(|index| pending.remove(index))
                    .or_else(|| {
                        if input_done {
                            return None;
                        }
                        match input.next() {
                            Some(Ok(item)) => {
                                let key = partition(&item);
                                let index = next_index;
                                next_index += 1;
                                Some((index, key, item))
                            }
                            Some(Err(error)) => {
                                completed.insert(next_index, Err(error));
                                next_index += 1;
                                input_done = true;
                                None
                            }
                            None => {
                                input_done = true;
                                None
                            }
                        }
                    });

                let Some((index, key, item)) = next else {
                    break;
                };
                // Key saturated: park the item, and stop once `pending` is full or input ended.
                if active_by_key.get(&key).copied().unwrap_or(0) >= per_partition {
                    pending.push_back((index, key, item));
                    if input_done || pending.len() >= parallelism {
                        break;
                    }
                    continue;
                }
                *active_by_key.entry(key.clone()).or_default() += 1;
                match poll_once_or_pending(stage(item)) {
                    Ok(result) => {
                        // Finished inline: release the key slot immediately.
                        if let Some(count) = active_by_key.get_mut(&key) {
                            *count -= 1;
                            if *count == 0 {
                                active_by_key.remove(&key);
                            }
                        }
                        completed.insert(index, result);
                    }
                    Err(future) => {
                        tasks.insert(
                            index,
                            spawn_completion_task(index, future, sender.clone(), move |result| {
                                (key, result)
                            }),
                        );
                    }
                }
            }

            if let Some(result) = completed.remove(&next_to_emit) {
                next_to_emit += 1;
                return Some(result);
            }

            if tasks.is_empty() {
                return None;
            }

            // `None` means nothing arrived this round; the loop tries again.
            if let Some((index, (key, result))) = recv_completion(&receiver) {
                tasks.remove(&index);
                if let Some(count) = active_by_key.get_mut(&key) {
                    *count -= 1;
                    if *count == 0 {
                        active_by_key.remove(&key);
                    }
                }
                if index == next_to_emit {
                    next_to_emit += 1;
                    return Some(result);
                }
                completed.insert(index, result);
            }
        }
    }))
}
7220
7221fn map_async_partitioned<Out, Key, Next, Partition, F, Fut>(
7222 mut input: BoxStream<Out>,
7223 parallelism: usize,
7224 per_partition: usize,
7225 partition: Arc<Partition>,
7226 stage: Arc<F>,
7227) -> BoxStream<Next>
7228where
7229 Out: Send + 'static,
7230 Key: Clone + Eq + Hash + Send + 'static,
7231 Next: Send + 'static,
7232 Partition: Fn(&Out) -> Key + Send + Sync + 'static,
7233 F: Fn(Out) -> Fut + Send + Sync + 'static,
7234 Fut: Future<Output = StreamResult<Next>> + Send + 'static,
7235{
7236 if parallelism == 1 {
7237 return map_async_partitioned_serial(input, partition, stage);
7238 }
7239 if parallelism <= 4 {
7241 return map_async_partitioned_scanning(input, parallelism, per_partition, partition, stage);
7242 }
7243
7244 let (sender, receiver) = std::sync::mpsc::channel::<(usize, (usize, StreamResult<Next>))>();
7245 let mut tasks = HashMap::<usize, AbortOnDropHandle<()>>::with_capacity(parallelism);
7246 let mut slots_by_key = HashMap::<Key, usize>::with_capacity(parallelism);
7247 let mut slots = Vec::<PartitionSlot<Key, Out>>::with_capacity(parallelism);
7248 let mut free_slots = Vec::<usize>::new();
7249 let mut ready_slots = VecDeque::<usize>::with_capacity(parallelism);
7250 let mut completed = BTreeMap::<usize, StreamResult<Next>>::new();
7251 let mut next_index = 0_usize;
7252 let mut next_to_emit = 0_usize;
7253 let mut input_done = false;
7254 Box::new(std::iter::from_fn(move || {
7255 loop {
7256 if let Some(result) = completed.remove(&next_to_emit) {
7257 next_to_emit += 1;
7258 return Some(result);
7259 }
7260
7261 while tasks.len() + completed.len() < parallelism {
7262 if let Some((index, slot, item)) =
7263 pop_ready_partition_slot(&mut slots, &mut ready_slots, per_partition)
7264 {
7265 match poll_once_or_pending(stage(item)) {
7266 Ok(result) => {
7267 let mut remove_empty = false;
7268 if let Some(state) = slots.get_mut(slot) {
7269 state.active -= 1;
7270 remove_empty = state.active == 0
7271 && state.queued.is_empty()
7272 && !state.in_ready_queue;
7273 }
7274 if remove_empty {
7275 retire_partition_slot(
7276 slot,
7277 &mut slots_by_key,
7278 &mut slots,
7279 &mut free_slots,
7280 );
7281 } else {
7282 ready_partition_slot(
7283 &mut slots,
7284 &mut ready_slots,
7285 slot,
7286 per_partition,
7287 );
7288 }
7289 if index == next_to_emit {
7290 next_to_emit += 1;
7291 return Some(result);
7292 }
7293 completed.insert(index, result);
7294 }
7295 Err(future) => {
7296 tasks.insert(
7297 index,
7298 spawn_completion_task(
7299 index,
7300 future,
7301 sender.clone(),
7302 move |result| (slot, result),
7303 ),
7304 );
7305 }
7306 }
7307 continue;
7308 }
7309
7310 if input_done {
7311 break;
7312 }
7313
7314 match input.next() {
7315 Some(Ok(item)) => {
7316 let key = partition(&item);
7317 let index = next_index;
7318 next_index += 1;
7319 let slot =
7320 partition_slot_for(key, &mut slots_by_key, &mut slots, &mut free_slots);
7321 let state = &mut slots[slot];
7322 if state.active < per_partition {
7323 match poll_once_or_pending(stage(item)) {
7324 Ok(result) => {
7325 if index == next_to_emit {
7326 next_to_emit += 1;
7327 if state.queued.is_empty() && !state.in_ready_queue {
7328 retire_partition_slot(
7329 slot,
7330 &mut slots_by_key,
7331 &mut slots,
7332 &mut free_slots,
7333 );
7334 }
7335 return Some(result);
7336 }
7337 completed.insert(index, result);
7338 if state.queued.is_empty() && !state.in_ready_queue {
7339 retire_partition_slot(
7340 slot,
7341 &mut slots_by_key,
7342 &mut slots,
7343 &mut free_slots,
7344 );
7345 }
7346 }
7347 Err(future) => {
7348 state.active += 1;
7349 tasks.insert(
7350 index,
7351 spawn_completion_task(
7352 index,
7353 future,
7354 sender.clone(),
7355 move |result| (slot, result),
7356 ),
7357 );
7358 }
7359 }
7360 } else {
7361 state.queued.push_back((index, item));
7362 }
7363 }
7364 Some(Err(error)) => {
7365 completed.insert(next_index, Err(error));
7366 next_index += 1;
7367 input_done = true;
7368 break;
7369 }
7370 None => {
7371 input_done = true;
7372 break;
7373 }
7374 }
7375 }
7376
7377 if let Some(result) = completed.remove(&next_to_emit) {
7378 next_to_emit += 1;
7379 return Some(result);
7380 }
7381
7382 if tasks.is_empty() {
7383 return None;
7384 }
7385
7386 if let Some((index, (slot, result))) = recv_completion(&receiver) {
7387 tasks.remove(&index);
7388 let mut remove_empty = false;
7389 if let Some(state) = slots.get_mut(slot) {
7390 state.active -= 1;
7391 remove_empty =
7392 state.active == 0 && state.queued.is_empty() && !state.in_ready_queue;
7393 }
7394 if remove_empty {
7395 retire_partition_slot(slot, &mut slots_by_key, &mut slots, &mut free_slots);
7396 } else {
7397 ready_partition_slot(&mut slots, &mut ready_slots, slot, per_partition);
7398 }
7399 if index == next_to_emit {
7400 next_to_emit += 1;
7401 return Some(result);
7402 }
7403 completed.insert(index, result);
7404 }
7405 }
7406 }))
7407}
7408
#[cfg(test)]
mod flat_map_merge_ready_ring_tests {
    //! Tests for the ready-ring substream executor behind `flat_map_merge`.
    //!
    //! Most tests run the same pipeline under `SubstreamExecutorMode::LegacyOnly` and
    //! `SubstreamExecutorMode::ReadyRingOnly` and require identical results. The rest exercise
    //! ready-ring specifics: the breadth bound, failure propagation, cancellation, and
    //! lost-wakeup stress.
    use super::*;
    use std::sync::mpsc;
    use std::time::Duration;

    /// Collects `source` and sorts the result so that merge interleaving does not matter.
    fn run_sorted<T: Ord + Send + 'static>(source: crate::Source<T>) -> Vec<T> {
        let mut v = source.run_collect().unwrap();
        v.sort_unstable();
        v
    }

    #[test]
    fn ready_ring_empty_upstream() {
        let legacy = with_substream_mode(SubstreamExecutorMode::LegacyOnly, || {
            run_sorted(crate::Source::<i32>::empty().flat_map_merge(4, crate::Source::single))
        });
        let ring = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
            run_sorted(crate::Source::<i32>::empty().flat_map_merge(4, crate::Source::single))
        });
        assert_eq!(legacy, ring);
        assert!(ring.is_empty());
    }

    #[test]
    fn ready_ring_single_lane() {
        let legacy = with_substream_mode(SubstreamExecutorMode::LegacyOnly, || {
            run_sorted(crate::Source::single(42_i32).flat_map_merge(4, crate::Source::single))
        });
        let ring = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
            run_sorted(crate::Source::single(42_i32).flat_map_merge(4, crate::Source::single))
        });
        assert_eq!(legacy, ring);
        assert_eq!(ring, vec![42]);
    }

    #[test]
    fn ready_ring_breadth_one_exact_order() {
        // With breadth 1 only one substream is open at a time, so output must follow upstream
        // order exactly. Compare unsorted; sorting would hide reordering bugs.
        let make = || {
            crate::Source::from_iter(0_i32..5)
                .flat_map_merge(1, |x| crate::Source::single(x * 10))
                .run_collect()
                .unwrap()
        };
        let legacy = with_substream_mode(SubstreamExecutorMode::LegacyOnly, make);
        let ring = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, make);
        assert_eq!(legacy, ring);
        assert_eq!(ring, vec![0, 10, 20, 30, 40]);
    }

    #[test]
    fn ready_ring_breadth_gt_input() {
        let make = || {
            run_sorted(
                crate::Source::from_iter(0_i32..3)
                    .flat_map_merge(100, |x| crate::Source::from_iter([x, x + 1, x + 2])),
            )
        };
        let legacy = with_substream_mode(SubstreamExecutorMode::LegacyOnly, make);
        let ring = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, make);
        assert_eq!(legacy, ring);
        assert_eq!(ring.len(), 9);
    }

    #[test]
    fn ready_ring_mixed_short_long() {
        let make = || {
            run_sorted(crate::Source::from_iter(0_i32..8).flat_map_merge(4, |x| {
                if x % 3 == 0 {
                    crate::Source::from_iter(0..20_i32).map(move |i| x * 100 + i)
                } else {
                    crate::Source::single(x)
                }
            }))
        };
        let legacy = with_substream_mode(SubstreamExecutorMode::LegacyOnly, make);
        let ring = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, make);
        assert_eq!(legacy, ring);
    }

    #[test]
    fn ready_ring_respects_breadth_bound() {
        // Aliased so the prelude `Ord` trait name is not shadowed.
        use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
        let active = Arc::new(AtomicUsize::new(0));
        let max_active = Arc::new(AtomicUsize::new(0));
        let a2 = Arc::clone(&active);
        let m2 = Arc::clone(&max_active);
        let mut values = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
            crate::Source::from_iter(0_i32..6)
                .flat_map_merge(2, move |item| {
                    let a = Arc::clone(&a2);
                    let m = Arc::clone(&m2);
                    crate::Source::future(move || {
                        let a = Arc::clone(&a);
                        let m = Arc::clone(&m);
                        async move {
                            // Record the high-water mark of concurrently running substreams.
                            let now = a.fetch_add(1, AtomicOrdering::SeqCst) + 1;
                            m.fetch_max(now, AtomicOrdering::SeqCst);
                            // Intentionally blocking, so overlapping lanes have time to show up
                            // in the high-water mark.
                            thread::sleep(Duration::from_millis(20));
                            a.fetch_sub(1, AtomicOrdering::SeqCst);
                            Ok(item)
                        }
                    })
                })
                .run_collect()
                .unwrap()
        });
        values.sort_unstable();
        assert_eq!(values, vec![0, 1, 2, 3, 4, 5]);
        assert!(max_active.load(AtomicOrdering::SeqCst) <= 2);
    }

    #[test]
    fn ready_ring_fairness_slow_lane_not_starved() {
        use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
        let slow_emitted = Arc::new(AtomicBool::new(false));
        let slow_flag = Arc::clone(&slow_emitted);
        let results = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
            crate::Source::from_iter(0_i32..3)
                .flat_map_merge(4, move |lane_id| {
                    let flag = Arc::clone(&slow_flag);
                    match lane_id {
                        0 => crate::Source::from_iter(0_i32..50),
                        1 => crate::Source::from_iter(100_i32..150),
                        _ => crate::Source::future(move || {
                            let flag = Arc::clone(&flag);
                            async move {
                                thread::sleep(Duration::from_millis(10));
                                flag.store(true, AtomicOrdering::SeqCst);
                                Ok(999_i32)
                            }
                        }),
                    }
                })
                .run_collect()
                .unwrap()
        });
        assert!(slow_emitted.load(AtomicOrdering::SeqCst));
        assert!(results.contains(&999));
        assert_eq!(results.len(), 101);
    }

    #[test]
    fn ready_ring_inner_failure_no_hang() {
        let result = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
            crate::Source::from_iter(0_i32..8)
                .flat_map_merge(4, |x| {
                    if x == 3 {
                        crate::Source::failed(StreamError::Failed("lane-fail".into()))
                    } else {
                        crate::Source::from_iter(0_i32..10)
                    }
                })
                .run_collect()
        });
        assert_eq!(result, Err(StreamError::Failed("lane-fail".into())));
    }

    #[test]
    fn ready_ring_factory_failure_propagates() {
        let result = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
            crate::Source::from_iter(0_i32..4)
                .flat_map_merge(2, |x| {
                    if x == 2 {
                        crate::Source::failed(StreamError::Failed("factory-fail".into()))
                    } else {
                        crate::Source::single(x)
                    }
                })
                .run_collect()
        });
        // The failing lane is the only error source, so the exact error must surface.
        assert_eq!(result, Err(StreamError::Failed("factory-fail".into())));
    }

    #[test]
    fn ready_ring_closure_not_under_coordinator_lock() {
        let guard_mutex = Arc::new(std::sync::Mutex::<()>::new(()));
        let gm = Arc::clone(&guard_mutex);
        let results = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
            crate::Source::from_iter(0_i32..10)
                .flat_map_merge(4, move |x| {
                    let _lock = gm.lock().unwrap();
                    crate::Source::single(x)
                })
                .run_collect()
                .unwrap()
        });
        assert_eq!(results.len(), 10);
    }

    #[test]
    fn ready_ring_bounded_memory_producer_blocks_at_window() {
        const WINDOW: usize = FLAT_MAP_MERGE_SUBSTREAM_WINDOW;
        const EXTRA: usize = 4;
        let (gate_tx, gate_rx) = mpsc::channel::<()>();
        let gate_tx = Arc::new(std::sync::Mutex::new(gate_tx));
        let gate_tx2 = Arc::clone(&gate_tx);
        let produced = Arc::new(std::sync::atomic::AtomicUsize::new(0));
        let prod2 = Arc::clone(&produced);

        let queue = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
            crate::Source::single(0_i32)
                .flat_map_merge(2, move |_| {
                    let tx = Arc::clone(&gate_tx2);
                    let prod = Arc::clone(&prod2);
                    crate::Source::from_factory(move || {
                        let tx = Arc::clone(&tx);
                        let prod = Arc::clone(&prod);
                        let mut i = 0_i32;
                        Box::new(std::iter::from_fn(move || {
                            if i as usize >= WINDOW + EXTRA {
                                return None;
                            }
                            // Signal when the producer reaches the window boundary.
                            if i as usize == WINDOW {
                                let _ = tx.lock().unwrap().send(());
                            }
                            prod.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
                            i += 1;
                            Some(Ok(i))
                        }))
                    })
                })
                .run_with(crate::Sink::queue())
                .unwrap()
        });

        let mut total = 0;
        while queue.pull().unwrap().is_some() {
            total += 1;
        }
        let signal = gate_rx.recv_timeout(Duration::from_secs(1));
        assert!(signal.is_ok(), "producer never reached the window boundary");
        assert_eq!(total, WINDOW + EXTRA);
        assert_eq!(
            produced.load(std::sync::atomic::Ordering::SeqCst),
            WINDOW + EXTRA,
            "producer should have been advanced once per delivered element"
        );
    }

    #[test]
    fn ready_ring_cancellation_wakes_blocked_lanes() {
        // No-hang test: dropping the queue while infinite lanes are blocked must let the
        // runtime shut down.
        let rt = crate::stream::runtime::Runtime::new();
        let queue = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
            crate::Source::from_iter(0_i32..4)
                .flat_map_merge(4, |_| crate::Source::repeat(1_i32))
                .run_with_materializer(crate::Sink::queue(), &rt)
                .unwrap()
        });
        for _ in 0..8 {
            let _ = queue.pull();
        }
        drop(queue);
        rt.shutdown();
    }

    #[test]
    fn ready_ring_lost_wakeup_stress() {
        for _ in 0..20 {
            let result = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
                crate::Source::from_iter(0_i32..50)
                    .flat_map_merge(8, |item| {
                        crate::Source::from_iter([item, item + 1, item + 2])
                    })
                    .run_with(crate::Sink::fold(0i64, |acc, v| acc + v as i64))
                    .unwrap()
                    .wait()
            });
            // sum over 0..50 of (3 * item + 3) = 3 * 1225 + 150
            assert_eq!(result, Ok(3825), "lost-wakeup stress: wrong sum");
        }
    }

    #[test]
    fn ready_ring_concurrent_streams_lost_wakeup_stress() {
        const STREAMS: usize = 32;
        const ROUNDS: usize = 8;
        // sum over item 0..32 of (20 * 100 * item + 190) = 2000 * 496 + 190 * 32
        const EXPECTED: i64 = 998_080;

        for _ in 0..ROUNDS {
            let barrier = Arc::new(std::sync::Barrier::new(STREAMS));
            let mut handles = Vec::with_capacity(STREAMS);
            for _ in 0..STREAMS {
                let barrier = Arc::clone(&barrier);
                handles.push(thread::spawn(move || {
                    barrier.wait();
                    let result = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
                        crate::Source::from_iter(0_i64..32)
                            .flat_map_merge(8, |item| {
                                crate::Source::from_iter(item * 100..item * 100 + 20)
                            })
                            .run_with(crate::Sink::fold(0i64, |acc, v| acc + v))
                            .unwrap()
                            .wait()
                    });
                    assert_eq!(result, Ok(EXPECTED), "concurrent ready-ring sum");
                }));
            }

            for handle in handles {
                handle.join().expect("ready-ring stress worker panicked");
            }
        }
    }

    #[test]
    fn ready_ring_tail_loop_stress() {
        for _ in 0..20 {
            let result = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
                crate::Source::from_iter(0_i64..100)
                    .flat_map_merge(16, |item| crate::Source::from_iter([item, item + 1000]))
                    .run_with(crate::Sink::fold(0i64, |acc, v| acc + v))
                    .unwrap()
                    .wait()
            });
            // sum over 0..100 of (2 * item + 1000) = 9_900 + 100_000
            assert_eq!(result, Ok(109_900), "tail-loop stress: wrong sum");
        }
    }

    #[test]
    fn ready_ring_auto_mode_matches_ring() {
        let make = || {
            run_sorted(
                crate::Source::from_iter(0_i32..10)
                    .flat_map_merge(4, |x| crate::Source::from_iter([x, x + 100])),
            )
        };
        let auto = with_substream_mode(SubstreamExecutorMode::Auto, make);
        let ring = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, make);
        assert_eq!(auto, ring);
    }
}
7754
#[cfg(test)]
mod inline_micro_source_tests {
    //! Tests for `flat_map_merge` with small inner sources in ready-ring mode.
    //!
    //! Pipelines are compared against `SubstreamExecutorMode::LegacyOnly`, and error,
    //! cancellation and lock-scope edge cases are covered. The inner sources here are empty,
    //! single or few-element sources, or sources built with
    //! `test_source_with_inline_micro_hint`.
    use super::*;
    use crate::stream::source::test_source_with_inline_micro_hint;
    use std::sync::mpsc;

    /// Collects `source` and sorts the result so that merge interleaving does not matter.
    fn run_sorted<T: Ord + Send + 'static>(source: crate::Source<T>) -> Vec<T> {
        let mut v = source.run_collect().unwrap();
        v.sort_unstable();
        v
    }

    #[test]
    fn inline_empty_upstream() {
        let legacy = with_substream_mode(SubstreamExecutorMode::LegacyOnly, || {
            run_sorted(crate::Source::<i32>::empty().flat_map_merge(4, crate::Source::single))
        });
        let ring = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
            run_sorted(crate::Source::<i32>::empty().flat_map_merge(4, crate::Source::single))
        });
        assert_eq!(legacy, ring);
        assert!(ring.is_empty());
    }

    #[test]
    fn inline_single_inner_source() {
        let legacy = with_substream_mode(SubstreamExecutorMode::LegacyOnly, || {
            run_sorted(
                crate::Source::single(99_i32).flat_map_merge(4, |x| crate::Source::single(x * 2)),
            )
        });
        let ring = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
            run_sorted(
                crate::Source::single(99_i32).flat_map_merge(4, |x| crate::Source::single(x * 2)),
            )
        });
        assert_eq!(legacy, ring);
        assert_eq!(ring, vec![198]);
    }

    #[test]
    fn inline_breadth_one_exact_order() {
        // With breadth 1 each inner source is drained before the next one is opened, so the
        // output must be the exact concatenation. Compare unsorted.
        let make = || {
            crate::Source::from_iter(0_i32..6)
                .flat_map_merge(1, |x| {
                    crate::Source::from_iter([x * 10, x * 10 + 1, x * 10 + 2, x * 10 + 3])
                })
                .run_collect()
                .unwrap()
        };
        let legacy = with_substream_mode(SubstreamExecutorMode::LegacyOnly, make);
        let ring = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, make);
        let expected: Vec<i32> = (0_i32..6)
            .flat_map(|x| [x * 10, x * 10 + 1, x * 10 + 2, x * 10 + 3])
            .collect();
        assert_eq!(legacy, ring);
        assert_eq!(ring, expected);
    }

    #[test]
    fn inline_breadth_gt_input() {
        let make = || {
            run_sorted(
                crate::Source::from_iter(0_i32..3)
                    .flat_map_merge(100, |x| crate::Source::from_iter([x, x + 1, x + 2, x + 3])),
            )
        };
        let legacy = with_substream_mode(SubstreamExecutorMode::LegacyOnly, make);
        let ring = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, make);
        assert_eq!(legacy, ring);
        assert_eq!(ring.len(), 12);
    }

    #[test]
    fn inline_mixed_empty_single_four_item() {
        let make = || {
            run_sorted(
                crate::Source::from_iter(0_i32..12).flat_map_merge(4, |x| match x % 3 {
                    0 => crate::Source::empty(),
                    1 => crate::Source::single(x),
                    _ => crate::Source::from_iter([x, x + 100, x + 200, x + 300]),
                }),
            )
        };
        let legacy = with_substream_mode(SubstreamExecutorMode::LegacyOnly, make);
        let ring = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, make);
        assert_eq!(legacy, ring);
    }

    #[test]
    fn inline_2k_x4_b8_benchmark_shape() {
        let make = || {
            run_sorted(
                crate::Source::from_iter(0_i32..2_000).flat_map_merge(8, |item| {
                    crate::Source::from_iter([item, item + 1, item + 2, item + 3])
                }),
            )
        };
        let legacy = with_substream_mode(SubstreamExecutorMode::LegacyOnly, make);
        let ring = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, make);
        assert_eq!(legacy, ring);
        assert_eq!(ring.len(), 8_000);
    }

    #[test]
    fn inline_lost_wakeup_stress() {
        for _ in 0..20 {
            let result = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
                crate::Source::from_iter(0_i32..50)
                    .flat_map_merge(8, |item| {
                        crate::Source::from_iter([item, item + 1, item + 2, item + 3])
                    })
                    .run_with(crate::Sink::fold(0i64, |acc, v| acc + v as i64))
                    .unwrap()
                    .wait()
            });
            // sum over 0..50 of (4 * item + 6) = 4 * 1225 + 300
            assert_eq!(result, Ok(5200), "lost-wakeup stress: wrong sum");
        }
    }

    #[test]
    fn inline_tail_loop_stress() {
        for _ in 0..20 {
            let result = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
                crate::Source::from_iter(0_i64..100)
                    .flat_map_merge(16, |item| crate::Source::from_iter([item, item + 1000]))
                    .run_with(crate::Sink::fold(0i64, |acc, v| acc + v))
                    .unwrap()
                    .wait()
            });
            // sum over 0..100 of (2 * item + 1000) = 9_900 + 100_000
            assert_eq!(result, Ok(109_900), "tail-loop stress: wrong sum");
        }
    }

    #[test]
    fn inline_large_source_uses_worker_fallback() {
        let make = || {
            run_sorted(crate::Source::from_iter(0_i32..4).flat_map_merge(2, |x| {
                crate::Source::from_iter(0_i32..20).map(move |i| x * 100 + i)
            }))
        };
        let legacy = with_substream_mode(SubstreamExecutorMode::LegacyOnly, make);
        let ring = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, make);
        assert_eq!(legacy, ring);
        assert_eq!(ring.len(), 80);
    }

    #[test]
    fn inline_inner_error_before_any_item() {
        let result = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
            crate::Source::from_iter(0_i32..4)
                .flat_map_merge(2, |x| {
                    if x == 1 {
                        crate::Source::failed(StreamError::Failed("inline-err".into()))
                    } else {
                        crate::Source::from_iter([x, x + 1])
                    }
                })
                .run_collect()
        });
        assert_eq!(result, Err(StreamError::Failed("inline-err".into())));
    }

    #[test]
    fn inline_inner_error_after_some_items() {
        let result = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
            crate::Source::single(0_i32)
                .flat_map_merge(2, |_x| {
                    // NOTE(review): the second argument is presumably the inline micro-source
                    // size hint; confirm against `test_source_with_inline_micro_hint`.
                    test_source_with_inline_micro_hint(
                        || {
                            let mut count = 0;
                            Box::new(std::iter::from_fn(move || {
                                count += 1;
                                match count {
                                    1 => Some(Ok(42_i32)),
                                    2 => Some(Err(StreamError::Failed("after-items".into()))),
                                    _ => None,
                                }
                            }))
                        },
                        1,
                    )
                })
                .run_collect()
        });
        // The only failure in the pipeline is the inner source's error.
        assert_eq!(result, Err(StreamError::Failed("after-items".into())));
    }

    #[test]
    fn inline_worker_lane_fails_during_inline_drain() {
        let result = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
            crate::Source::from_iter(0_i32..10)
                .flat_map_merge(2, |x| {
                    if x == 5 {
                        crate::Source::failed(StreamError::Failed("worker-fail".into()))
                    } else if x % 2 == 0 {
                        // Small source.
                        crate::Source::from_iter([x, x + 1, x + 2])
                    } else {
                        // Larger source.
                        crate::Source::from_iter(0_i32..40).map(move |i| x * 100 + i)
                    }
                })
                .run_collect()
        });
        // The only failure in the pipeline is lane 5's error.
        assert_eq!(result, Err(StreamError::Failed("worker-fail".into())));
    }

    #[test]
    fn inline_cancellation_drop_output() {
        // No-hang test: dropping the queue while infinite lanes are still open must let the
        // runtime shut down.
        let rt = crate::stream::runtime::Runtime::new();
        let queue = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
            crate::Source::from_iter(0_i32..100)
                .flat_map_merge(8, |x| {
                    if x % 3 == 0 {
                        crate::Source::from_iter([x, x + 1, x + 2, x + 3])
                    } else {
                        crate::Source::repeat(x)
                    }
                })
                .run_with_materializer(crate::Sink::queue(), &rt)
                .unwrap()
        });
        for _ in 0..16 {
            let _ = queue.pull();
        }
        drop(queue);
        rt.shutdown();
    }

    #[test]
    fn inline_next_not_under_coordinator_lock() {
        let (tx, rx) = mpsc::channel::<()>();
        let tx = Arc::new(std::sync::Mutex::new(tx));
        let tx2 = Arc::clone(&tx);

        let results = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
            crate::Source::from_iter(0_i32..2)
                .flat_map_merge(2, move |x| {
                    let sig = Arc::clone(&tx2);
                    if x == 0 {
                        crate::Source::from_iter(0_i32..40)
                    } else {
                        test_source_with_inline_micro_hint(
                            move || {
                                let sig = Arc::clone(&sig);
                                let mut emitted = false;
                                Box::new(std::iter::from_fn(move || {
                                    if !emitted {
                                        emitted = true;
                                        // Proves the hinted source's `next()` actually ran.
                                        let _ = sig.lock().unwrap().send(());
                                        Some(Ok(999_i32))
                                    } else {
                                        None
                                    }
                                }))
                            },
                            1,
                        )
                    }
                })
                .run_collect()
                .unwrap()
        });

        let signal = rx.recv_timeout(std::time::Duration::from_secs(5));
        assert!(signal.is_ok(), "inline next() never ran");
        assert!(results.contains(&999));
        assert_eq!(results.len(), 41);
    }

    #[test]
    fn inline_auto_matches_readyring() {
        let make = || {
            run_sorted(
                crate::Source::from_iter(0_i32..20)
                    .flat_map_merge(4, |x| crate::Source::from_iter([x, x + 1, x + 2, x + 3])),
            )
        };
        let auto = with_substream_mode(SubstreamExecutorMode::Auto, make);
        let ring = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, make);
        assert_eq!(auto, ring);
    }
}
8081
8082#[cfg(test)]
8084mod split_sink_fast_path_tests {
8085 use super::*;
8086
8087 fn run_split_fold(
8089 items: Vec<u64>,
8090 split_mode: SplitMode,
8091 executor_mode: SubstreamExecutorMode,
8092 ) -> Vec<u64> {
8093 with_substream_mode(executor_mode, || {
8094 let source = match split_mode {
8095 SplitMode::When => crate::Source::from_iter(items).split_when(|x| x % 10 == 0),
8096 SplitMode::After => crate::Source::from_iter(items).split_after(|x| x % 10 == 0),
8097 };
8098 source
8099 .run_with(crate::Sink::fold(
8100 Vec::new(),
8101 |mut acc, seg: crate::Source<u64>| {
8102 let sum = seg
8103 .run_with(crate::Sink::fold(0u64, |a, x| a + x))
8104 .unwrap()
8105 .wait()
8106 .unwrap();
8107 acc.push(sum);
8108 acc
8109 },
8110 ))
8111 .unwrap()
8112 .wait()
8113 .unwrap()
8114 })
8115 }
8116
8117 fn run_split_collect_segments(
8119 items: Vec<u64>,
8120 split_mode: SplitMode,
8121 executor_mode: SubstreamExecutorMode,
8122 ) -> Vec<Vec<u64>> {
8123 with_substream_mode(executor_mode, || {
8124 let source = match split_mode {
8125 SplitMode::When => crate::Source::from_iter(items).split_when(move |x| x % 10 == 0),
8126 SplitMode::After => {
8127 crate::Source::from_iter(items).split_after(move |x| x % 10 == 0)
8128 }
8129 };
8130 source
8131 .run_with(crate::Sink::fold(
8132 Vec::new(),
8133 |mut acc, seg: crate::Source<u64>| {
8134 let v = seg
8135 .run_with(crate::Sink::collect())
8136 .unwrap()
8137 .wait()
8138 .unwrap();
8139 acc.push(v);
8140 acc
8141 },
8142 ))
8143 .unwrap()
8144 .wait()
8145 .unwrap()
8146 })
8147 }
8148
8149 #[test]
8152 fn split_fast_equivalence_empty_input() {
8153 for sm in [SplitMode::When, SplitMode::After] {
8154 let legacy = run_split_collect_segments(vec![], sm, SubstreamExecutorMode::LegacyOnly);
8155 let fast = run_split_collect_segments(vec![], sm, SubstreamExecutorMode::SplitSinkOnly);
8156 assert_eq!(legacy, fast, "empty input, mode {sm:?}");
8157 }
8158 }
8159
8160 #[test]
8161 fn split_fast_equivalence_no_boundaries() {
8162 let items: Vec<u64> = (1..=9).collect();
8163 for sm in [SplitMode::When, SplitMode::After] {
8164 let legacy =
8165 run_split_collect_segments(items.clone(), sm, SubstreamExecutorMode::LegacyOnly);
8166 let fast =
8167 run_split_collect_segments(items.clone(), sm, SubstreamExecutorMode::SplitSinkOnly);
8168 assert_eq!(legacy, fast, "no boundaries, mode {sm:?}");
8169 }
8170 }
8171
8172 #[test]
8173 fn split_fast_equivalence_first_element_boundary_when() {
8174 let items: Vec<u64> = vec![10, 1, 2, 3];
8176 let legacy = run_split_collect_segments(
8177 items.clone(),
8178 SplitMode::When,
8179 SubstreamExecutorMode::LegacyOnly,
8180 );
8181 let fast = run_split_collect_segments(
8182 items,
8183 SplitMode::When,
8184 SubstreamExecutorMode::SplitSinkOnly,
8185 );
8186 assert_eq!(legacy, fast);
8187 }
8188
8189 #[test]
8190 fn split_fast_equivalence_first_element_boundary_after() {
8191 let items: Vec<u64> = vec![10, 1, 2, 3];
8193 let legacy = run_split_collect_segments(
8194 items.clone(),
8195 SplitMode::After,
8196 SubstreamExecutorMode::LegacyOnly,
8197 );
8198 let fast = run_split_collect_segments(
8199 items,
8200 SplitMode::After,
8201 SubstreamExecutorMode::SplitSinkOnly,
8202 );
8203 assert_eq!(legacy, fast);
8204 }
8205
8206 #[test]
8207 fn split_fast_equivalence_consecutive_matches() {
8208 let items: Vec<u64> = vec![10, 20, 30, 1, 2, 40];
8209 for sm in [SplitMode::When, SplitMode::After] {
8210 let legacy =
8211 run_split_collect_segments(items.clone(), sm, SubstreamExecutorMode::LegacyOnly);
8212 let fast =
8213 run_split_collect_segments(items.clone(), sm, SubstreamExecutorMode::SplitSinkOnly);
8214 assert_eq!(legacy, fast, "consecutive matches, mode {sm:?}");
8215 }
8216 }
8217
8218 #[test]
8219 fn split_fast_equivalence_last_element_boundary() {
8220 let items: Vec<u64> = vec![1, 2, 3, 10];
8221 for sm in [SplitMode::When, SplitMode::After] {
8222 let legacy =
8223 run_split_collect_segments(items.clone(), sm, SubstreamExecutorMode::LegacyOnly);
8224 let fast =
8225 run_split_collect_segments(items.clone(), sm, SubstreamExecutorMode::SplitSinkOnly);
8226 assert_eq!(legacy, fast, "last element boundary, mode {sm:?}");
8227 }
8228 }
8229
8230 #[test]
8231 fn split_fast_equivalence_mixed() {
8232 let items: Vec<u64> = (0..50u64).collect();
8233 for sm in [SplitMode::When, SplitMode::After] {
8234 let legacy =
8235 run_split_collect_segments(items.clone(), sm, SubstreamExecutorMode::LegacyOnly);
8236 let fast =
8237 run_split_collect_segments(items.clone(), sm, SubstreamExecutorMode::SplitSinkOnly);
8238 assert_eq!(legacy, fast, "mixed 0..50, mode {sm:?}");
8239 }
8240 }
8241
8242 #[test]
8243 fn split_fast_equivalence_fold_sums() {
8244 let items: Vec<u64> = (0..50u64).collect();
8245 for sm in [SplitMode::When, SplitMode::After] {
8246 let legacy = run_split_fold(items.clone(), sm, SubstreamExecutorMode::LegacyOnly);
8247 let fast = run_split_fold(items.clone(), sm, SubstreamExecutorMode::SplitSinkOnly);
8248 assert_eq!(legacy, fast, "fold sums, mode {sm:?}");
8249 }
8250 }
8251
8252 #[test]
8253 fn split_fast_equivalence_with_collect() {
8254 let items: Vec<u64> = (0..312u64).collect();
8256 for sm in [SplitMode::When, SplitMode::After] {
8257 let legacy =
8258 run_split_collect_segments(items.clone(), sm, SubstreamExecutorMode::LegacyOnly);
8259 let fast =
8260 run_split_collect_segments(items.clone(), sm, SubstreamExecutorMode::SplitSinkOnly);
8261 assert_eq!(legacy, fast, "collect 312 items, mode {sm:?}");
8262 }
8263 }
8264
8265 #[test]
8268 fn split_fast_fold_result_equivalence() {
8269 let items: Vec<u64> = (0..50u64).collect();
8270 let run = |executor_mode| {
8271 with_substream_mode(executor_mode, || {
8272 crate::Source::from_iter(items.clone())
8273 .split_when(move |x| x % 10 == 0)
8274 .run_with(crate::Sink::fold(
8275 Vec::new(),
8276 |mut acc, seg: crate::Source<u64>| {
8277 let sum = seg
8278 .run_with(crate::Sink::fold_result(0u64, |a, x| Ok(a + x)))
8279 .unwrap()
8280 .wait()
8281 .unwrap();
8282 acc.push(sum);
8283 acc
8284 },
8285 ))
8286 .unwrap()
8287 .wait()
8288 .unwrap()
8289 })
8290 };
8291 assert_eq!(
8292 run(SubstreamExecutorMode::LegacyOnly),
8293 run(SubstreamExecutorMode::SplitSinkOnly)
8294 );
8295 }
8296
8297 #[test]
8300 fn split_fast_ignore_equivalence() {
8301 let items: Vec<u64> = (0..50u64).collect();
8302 let run = |executor_mode| {
8303 with_substream_mode(executor_mode, || {
8304 crate::Source::from_iter(items.clone())
8305 .split_when(move |x| x % 10 == 0)
8306 .run_with(crate::Sink::fold(0u64, |count, seg: crate::Source<u64>| {
8307 seg.run_with(crate::Sink::ignore()).unwrap().wait().unwrap();
8308 count + 1
8309 }))
8310 .unwrap()
8311 .wait()
8312 .unwrap()
8313 })
8314 };
8315 let legacy = run(SubstreamExecutorMode::LegacyOnly);
8316 let fast = run(SubstreamExecutorMode::SplitSinkOnly);
8317 assert_eq!(legacy, fast, "ignore segment counts must match");
8318 }
8319
8320 #[test]
8323 fn split_fast_one_shot_cannot_materialize_twice() {
8324 with_substream_mode(SubstreamExecutorMode::SplitSinkOnly, || {
8325 let materializer = crate::Runtime::default();
8326 let result = crate::Source::from_iter(1u64..=5)
8327 .split_when(|x| x % 3 == 0)
8328 .run_with(crate::Sink::fold(0u64, |_, seg: crate::Source<u64>| {
8329 let c1 = seg.clone().run_with(crate::Sink::fold(0u64, |a, x| a + x));
8331 let c2 = seg.run_with(crate::Sink::fold(0u64, |a, x| a + x));
8333 assert!(c1.is_ok(), "first materialization should succeed");
8334 assert!(c2.is_err(), "second materialization should fail: {c2:?}");
8335 let _ = c1.unwrap().wait();
8336 0u64
8337 }));
8338 let _ = result;
8339 let _ = &materializer;
8340 });
8341 }
8342
8343 #[test]
8346 fn split_fast_predicate_panic_both_modes() {
8347 for sm in [SplitMode::When, SplitMode::After] {
8350 let result = with_substream_mode(SubstreamExecutorMode::SplitSinkOnly, || {
8351 let source = match sm {
8352 SplitMode::When => crate::Source::from_iter(0u64..10).split_when(|x| {
8353 if *x == 5 {
8354 panic!("test panic");
8355 }
8356 x % 3 == 0
8357 }),
8358 SplitMode::After => crate::Source::from_iter(0u64..10).split_after(|x| {
8359 if *x == 5 {
8360 panic!("test panic");
8361 }
8362 x % 3 == 0
8363 }),
8364 };
8365 source
8366 .run_with(crate::Sink::fold(
8367 Vec::<u64>::new(),
8368 |mut acc, seg: crate::Source<u64>| {
8369 let completion = seg.run_with(crate::Sink::ignore());
8371 if let Ok(c) = completion {
8372 let _ = c.wait();
8373 }
8374 acc.push(0u64);
8375 acc
8376 },
8377 ))
8378 .map(|c| c.wait())
8379 });
8380 let _ = result;
8382 }
8383 }
8384
/// Repeatedly compares the split-sink fast path with the legacy executor on a
/// 10 000-element input. For both split modes it checks that segment counts and
/// element sums agree.
#[test]
fn split_fast_stress_20x() {
    // The input is identical for every run, so build it once rather than
    // reallocating it on each of the 20 iterations.
    let items: Vec<u64> = (0..10_000u64).collect();
    for i in 0..20 {
        for sm in [SplitMode::When, SplitMode::After] {
            let fast = run_split_collect_segments(
                items.clone(),
                sm,
                SubstreamExecutorMode::SplitSinkOnly,
            );
            let legacy = run_split_collect_segments(
                items.clone(),
                sm,
                SubstreamExecutorMode::LegacyOnly,
            );
            assert_eq!(
                fast.len(),
                legacy.len(),
                "stress run {i} segment count mismatch, mode {sm:?}"
            );
            assert_eq!(
                fast.iter().flatten().sum::<u64>(),
                legacy.iter().flatten().sum::<u64>(),
                "stress run {i} sum mismatch, mode {sm:?}"
            );
        }
    }
}
8415
/// With `SubstreamExecutorMode::Auto`, the collected segments must be exactly
/// those produced when the split-sink fast path is forced, for both split modes.
#[test]
fn split_fast_auto_mode_matches_fast() {
    let input: Vec<u64> = (0..50).collect();
    let collect_with = |split_mode: SplitMode, executor: SubstreamExecutorMode| {
        run_split_collect_segments(input.clone(), split_mode, executor)
    };
    for split_mode in [SplitMode::When, SplitMode::After] {
        let auto_segments = collect_with(split_mode, SubstreamExecutorMode::Auto);
        let fast_segments = collect_with(split_mode, SubstreamExecutorMode::SplitSinkOnly);
        assert_eq!(
            auto_segments, fast_segments,
            "auto == fast, mode {split_mode:?}"
        );
    }
}
8428
/// Drains every segment through `Sink::foreach` under the split-sink executor.
/// Per the test name, this is presumably the executor's fallback path (verify
/// against the executor).
///
/// The test checks two things:
/// - every element 0..30 is observed exactly once, since the sum matches;
/// - `split_when(x % 10 == 0)` yields three segments.
#[test]
fn split_fast_fallback_path_via_foreach() {
    // Imported under its own name. The previous `Ordering as Ord` alias shadowed
    // the prelude `Ord` trait and was not used consistently below.
    use std::sync::atomic::{AtomicU64, Ordering};

    let total = Arc::new(AtomicU64::new(0));
    let total_for_fold = Arc::clone(&total);
    let result = with_substream_mode(SubstreamExecutorMode::SplitSinkOnly, || {
        crate::Source::from_iter(0u64..30)
            .split_when(|x| x % 10 == 0)
            .run_with(crate::Sink::fold(
                Vec::new(),
                move |mut acc, seg: crate::Source<u64>| {
                    let total_for_segment = Arc::clone(&total_for_fold);
                    seg.run_with(crate::Sink::foreach(move |x| {
                        total_for_segment.fetch_add(x, Ordering::SeqCst);
                    }))
                    .unwrap()
                    .wait()
                    .unwrap();
                    // One marker per segment; the vector length is the segment count.
                    acc.push(1u64);
                    acc
                },
            ))
            .unwrap()
            .wait()
            .unwrap()
    });
    assert_eq!(result.len(), 3, "should have 3 segments");
    assert_eq!(total.load(Ordering::SeqCst), (0..30u64).sum::<u64>());
}
8468
/// Liveness check for `split_when`: on the same input, the split-sink fast path
/// must produce as many segments as the legacy executor.
#[test]
fn split_fast_liveness_segment_count_when() {
    let split_mode = SplitMode::When;
    let input: Vec<u64> = (0..30).collect();
    let count_segments = |data: Vec<u64>, executor: SubstreamExecutorMode| {
        run_split_collect_segments(data, split_mode, executor).len()
    };
    let fast_count = count_segments(input.clone(), SubstreamExecutorMode::SplitSinkOnly);
    let legacy_count = count_segments(input, SubstreamExecutorMode::LegacyOnly);
    assert_eq!(fast_count, legacy_count);
}
8483
/// Liveness check for `split_after`: on the same input, the split-sink fast path
/// must produce as many segments as the legacy executor.
#[test]
fn split_fast_liveness_segment_count_after() {
    let split_mode = SplitMode::After;
    let input: Vec<u64> = (0..30).collect();
    let count_segments = |data: Vec<u64>, executor: SubstreamExecutorMode| {
        run_split_collect_segments(data, split_mode, executor).len()
    };
    let fast_count = count_segments(input.clone(), SubstreamExecutorMode::SplitSinkOnly);
    let legacy_count = count_segments(input, SubstreamExecutorMode::LegacyOnly);
    assert_eq!(fast_count, legacy_count);
}
8496
/// Checks that the split-sink fast path consumes a live segment while it is
/// still being produced, with bounded look-ahead.
///
/// The setup:
/// - `split_when(|_| false)` never splits, so the fold receives one segment.
/// - That segment carries `TOTAL = 2 * LIVE_SUBSTREAM_CAPACITY` items.
/// - The segment is drained with `Sink::foreach`, which forwards every item
///   over a channel to the test thread.
///
/// The test asserts that:
/// - all items arrive within a 60 s deadline, so there is no deadlock;
/// - the consumer never observes the producer more than `CAPACITY + BATCH`
///   items ahead;
/// - the fold counts exactly one segment;
/// - the received items are complete and in source order.
#[test]
fn split_fast_bounded_memory_rendezvous() {
    use std::sync::{
        atomic::{AtomicBool, AtomicUsize, Ordering},
        mpsc,
    };
    use std::time::{Duration, Instant};

    const CAPACITY: usize = LIVE_SUBSTREAM_CAPACITY;
    const BATCH: usize = LIVE_SUBSTREAM_BATCH;
    // Twice the live capacity, so the segment does not fit in that capacity all
    // at once. Presumably this forces producer and consumer to interleave.
    const TOTAL: usize = CAPACITY * 2;
    // Largest tolerated gap between items produced and items consumed.
    const MAX_IN_FLIGHT: usize = CAPACITY + BATCH;

    // Number of items yielded by the source iterator so far.
    let produced = Arc::new(AtomicUsize::new(0));
    // Number of items delivered to the segment's foreach sink so far.
    let consumed = Arc::new(AtomicUsize::new(0));
    // Set by the consumer if it ever sees produced > consumed + MAX_IN_FLIGHT.
    let bound_violated = Arc::new(AtomicBool::new(false));

    // Carries consumed items back to this thread, which enforces the deadline.
    let (item_tx, item_rx) = mpsc::channel::<u64>();

    let prod_for_factory = Arc::clone(&produced);
    let prod_for_fold = Arc::clone(&produced);
    let cons_for_fold = Arc::clone(&consumed);
    let bv_for_fold = Arc::clone(&bound_violated);

    // Run the whole stream on a separate thread. This thread waits on the
    // channel with a timeout, so a deadlock fails the test instead of hanging it.
    let join = std::thread::spawn(move || {
        with_substream_mode(SubstreamExecutorMode::SplitSinkOnly, || {
            crate::Source::from_factory(move || {
                let prod = Arc::clone(&prod_for_factory);
                let mut i = 0u64;
                // Yields 0..TOTAL, bumping `produced` before each item is handed out.
                Box::new(std::iter::from_fn(move || {
                    if i as usize >= TOTAL {
                        return None;
                    }
                    prod.fetch_add(1, Ordering::SeqCst);
                    let val = i;
                    i += 1;
                    Some(Ok(val))
                }))
            })
            .split_when(|_| false)
            .run_with(crate::Sink::fold(
                0usize,
                // Drains each segment fully, then counts it.
                move |count, seg: crate::Source<u64>| {
                    let cons = Arc::clone(&cons_for_fold);
                    let bv = Arc::clone(&bv_for_fold);
                    let prod = Arc::clone(&prod_for_fold);
                    let itx = item_tx.clone();
                    seg.run_with(crate::Sink::foreach(move |x: u64| {
                        // Count this item first so `c` includes it, then compare
                        // against how far the producer has got.
                        let c = cons.fetch_add(1, Ordering::SeqCst) + 1;
                        let p = prod.load(Ordering::SeqCst);
                        if p > c + MAX_IN_FLIGHT {
                            bv.store(true, Ordering::SeqCst);
                        }
                        // Send failures are ignored: the receiver is only gone
                        // if the test thread has already panicked.
                        let _ = itx.send(x);
                    }))
                    .unwrap()
                    .wait()
                    .unwrap();
                    count + 1
                },
            ))
            .unwrap()
            .wait()
            .unwrap()
        })
    });

    let mut received = Vec::with_capacity(TOTAL);
    // One overall deadline for all TOTAL items, rather than a per-item timeout.
    let timeout = Duration::from_secs(60);
    let deadline = Instant::now() + timeout;
    for i in 0..TOTAL {
        let remaining = deadline.saturating_duration_since(Instant::now());
        if remaining == Duration::ZERO {
            panic!(
                "deadlock: received {} of {TOTAL} items within {timeout:?}",
                received.len()
            );
        }
        match item_rx.recv_timeout(remaining) {
            Ok(item) => received.push(item),
            Err(mpsc::RecvTimeoutError::Timeout) => {
                panic!(
                    "deadlock: no item {i} before {timeout:?} rendezvous deadline; received {} of {TOTAL}",
                    received.len()
                )
            }
            // All senders (held by the fold and its foreach closures) were
            // dropped before TOTAL items arrived.
            Err(mpsc::RecvTimeoutError::Disconnected) => {
                panic!("stream ended early at item {i}")
            }
        }
    }

    // Every item has arrived; the stream thread returns the fold's segment count.
    let seg_count = join.join().expect("stream thread panicked");

    assert!(
        !bound_violated.load(Ordering::SeqCst),
        "bound violated: producer ran >MAX_IN_FLIGHT={MAX_IN_FLIGHT} ahead of consumer"
    );

    assert_eq!(seg_count, 1, "expected exactly 1 segment");
    assert_eq!(received.len(), TOTAL, "not all items received");
    let expected: Vec<u64> = (0..TOTAL as u64).collect();
    assert_eq!(received, expected, "items not in correct order");
}
8619}