1use super::*;
2use crate::Attributes;
3use crate::context::FlowWithContext;
4use crate::stream::error::{decide_supervision, panic_stream_error};
5use futures::{FutureExt, task::noop_waker};
6use std::any::Any;
7use std::task::Context;
8use std::{
9 collections::{HashMap, HashSet},
10 thread,
11};
12
/// The stream-rewriting function stored in a [`Flow`].
pub(super) enum FlowTransform<In, Out> {
    /// Needs only the upstream stream and returns the downstream stream
    /// directly, so applying it cannot fail.
    Pure(PureTransform<In, Out>),
    /// Also receives the active [`Materializer`] and returns a
    /// `StreamResult`, so applying it may fail.
    Runtime(RuntimeTransform<In, Out>),
}
17
/// A stream stage that turns a stream of `In` into a stream of `Out` and
/// produces a materialized value of type `Mat` (defaults to `NotUsed`).
pub struct Flow<In, Out, Mat = NotUsed> {
    /// How the upstream stream is rewritten into the downstream stream.
    pub(super) transform: FlowTransform<In, Out>,
    /// Called to produce the materialized value; `via_mat` calls both sides'
    /// factories and combines the results.
    pub(super) materialize: Arc<dyn Fn() -> StreamResult<Mat> + Send + Sync>,
    /// Hints merged with `FlowHints::then` when flows are composed.
    pub(super) hints: FlowHints,
    /// Attributes attached to the stage (e.g. the name set by `named`).
    pub(super) attributes: Attributes,
}
24
/// Batching strategy selector for group-by processing.
///
/// NOTE(review): the variants are not used in this part of the module; confirm
/// the exact meaning of `Immediate` vs. `FiniteEagerNoRecreate` where they are
/// matched.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(super) enum GroupByBatchMode {
    Immediate,
    FiniteEagerNoRecreate,
}
30
31#[derive(Clone)]
32pub struct BidiFlow<I1, O1, I2, O2> {
33 top: Flow<I1, O1, NotUsed>,
34 bottom: Flow<I2, O2, NotUsed>,
35 attributes: Attributes,
36}
37
38impl<In, Out> Clone for FlowTransform<In, Out> {
39 fn clone(&self) -> Self {
40 match self {
41 Self::Pure(transform) => Self::Pure(Arc::clone(transform)),
42 Self::Runtime(transform) => Self::Runtime(Arc::clone(transform)),
43 }
44 }
45}
46
47impl<In, Out, Mat> Clone for Flow<In, Out, Mat> {
48 fn clone(&self) -> Self {
49 Self {
50 transform: self.transform.clone(),
51 materialize: Arc::clone(&self.materialize),
52 hints: self.hints,
53 attributes: self.attributes.clone(),
54 }
55 }
56}
57
58fn call_supervised<T, F>(context: &str, f: F) -> StreamResult<T>
59where
60 F: FnOnce() -> StreamResult<T>,
61{
62 catch_unwind(AssertUnwindSafe(f)).unwrap_or_else(|_| Err(panic_stream_error(context)))
63}
64
65impl<T: Send + 'static> Flow<T, T, NotUsed> {
66 #[must_use]
67 pub fn identity() -> Self {
68 Self::from_preserving_transform(|input| input)
69 }
70}
71
72impl<In: Send + 'static, Out: Send + 'static> Flow<In, Out, NotUsed> {
73 pub(crate) fn from_transform<F>(transform: F) -> Self
74 where
75 F: Fn(BoxStream<In>) -> BoxStream<Out> + Send + Sync + 'static,
76 {
77 Self::from_parts_with_hints(transform, || Ok(NotUsed), FlowHints::default())
78 }
79
80 pub(crate) fn from_preserving_transform<F>(transform: F) -> Self
81 where
82 F: Fn(BoxStream<In>) -> BoxStream<Out> + Send + Sync + 'static,
83 {
84 Self::from_parts_with_hints(
85 transform,
86 || Ok(NotUsed),
87 FlowHints::PRESERVES_INLINE_HEAD_TERMINAL,
88 )
89 }
90
91 pub(crate) fn from_runtime_transform<F>(transform: F) -> Self
92 where
93 F: Fn(BoxStream<In>, &Materializer) -> StreamResult<BoxStream<Out>> + Send + Sync + 'static,
94 {
95 Self::from_runtime_transform_with_hints(transform, FlowHints::default())
96 }
97
98 fn from_runtime_transform_with_hints<F>(transform: F, hints: FlowHints) -> Self
99 where
100 F: Fn(BoxStream<In>, &Materializer) -> StreamResult<BoxStream<Out>> + Send + Sync + 'static,
101 {
102 Self {
103 transform: FlowTransform::Runtime(Arc::new(transform)),
104 materialize: Arc::new(|| Ok(NotUsed)),
105 hints,
106 attributes: Attributes::default(),
107 }
108 }
109
110 #[must_use]
111 pub fn from_sink_and_source<InMat, OutMat>(
112 sink: Sink<In, InMat>,
113 source: Source<Out, OutMat>,
114 ) -> Self
115 where
116 InMat: Send + 'static,
117 OutMat: Send + 'static,
118 {
119 Self::from_runtime_transform(move |input, materializer| {
120 let sink_keepalive = Arc::new(MaterializedKeepalive::default());
121 let sink_input = Box::new(InputKeepaliveStream {
122 inner: input,
123 keepalive: Arc::clone(&sink_keepalive),
124 peer_keepalive: None,
125 });
126 let sink_mat = sink.clone().run(sink_input, materializer)?;
127 sink_keepalive.store(Box::new(sink_mat));
128 let (output, source_mat) = Arc::clone(&source.factory).create(materializer)?;
129 let source_keepalive = Arc::new(MaterializedKeepalive::default());
130 source_keepalive.store(Box::new(source_mat));
131 Ok(Box::new(CoupledStream {
132 inner: output,
133 source_keepalive,
134 sink_keepalive: None,
135 coupled: false,
136 }))
137 })
138 }
139
140 #[must_use]
141 pub fn from_sink_and_source_coupled<InMat, OutMat>(
142 sink: Sink<In, InMat>,
143 source: Source<Out, OutMat>,
144 ) -> Self
145 where
146 InMat: Send + 'static,
147 OutMat: Send + 'static,
148 {
149 Self::from_runtime_transform(move |input, materializer| {
150 let source_keepalive = Arc::new(MaterializedKeepalive::default());
151 let sink_keepalive = Arc::new(MaterializedKeepalive::default());
152 let sink_input = Box::new(InputKeepaliveStream {
153 inner: input,
154 keepalive: Arc::clone(&sink_keepalive),
155 peer_keepalive: Some(Arc::clone(&source_keepalive)),
156 });
157 let sink_mat = sink.clone().run(sink_input, materializer)?;
158 sink_keepalive.store(Box::new(sink_mat));
159 let (output, source_mat) = Arc::clone(&source.factory).create(materializer)?;
160 source_keepalive.store(Box::new(source_mat));
161 Ok(Box::new(CoupledStream {
162 inner: output,
163 source_keepalive,
164 sink_keepalive: Some(sink_keepalive),
165 coupled: true,
166 }))
167 })
168 }
169 #[must_use]
170 pub fn future_flow<InnerMat, F, Fut>(future: F) -> Flow<In, Out, StreamCompletion<InnerMat>>
171 where
172 InnerMat: Send + 'static,
173 F: Fn() -> Fut + Send + Sync + 'static,
174 Fut: Future<Output = StreamResult<Flow<In, Out, InnerMat>>> + Send + 'static,
175 {
176 let future = Arc::new(future);
177 Flow::from_runtime_materialized_factory(move || {
178 let (sender, receiver) = oneshot::channel();
179 let sender = Arc::new(Mutex::new(Some(sender)));
180 let future = Arc::clone(&future);
181 let transform: RuntimeTransform<In, Out> =
182 Arc::new(move |input, materializer: &Materializer| {
183 let mat_sender = sender
184 .lock()
185 .expect("future_flow materialized sender poisoned")
186 .take()
187 .ok_or_else(|| {
188 StreamError::Failed("future_flow transform already materialized".into())
189 })?;
190 Ok(Box::new(FutureFlowStream {
191 future: Arc::clone(&future),
192 materializer: materializer
193 .with_name_prefix(materializer.name_prefix().to_owned()),
194 input: Some(input),
195 current: None,
196 mat_sender: Some(mat_sender),
197 initialized: false,
198 terminated: false,
199 _marker: PhantomData,
200 }) as BoxStream<Out>)
201 });
202 (transform, StreamCompletion::from_receiver(receiver, None))
203 })
204 }
205
206 #[must_use]
207 pub fn lazy_flow<InnerMat, F>(create: F) -> Flow<In, Out, StreamCompletion<InnerMat>>
208 where
209 InnerMat: Send + 'static,
210 F: Fn() -> Flow<In, Out, InnerMat> + Send + Sync + 'static,
211 {
212 let create = Arc::new(create);
213 Self::lazy_future_flow(move || {
214 let create = Arc::clone(&create);
215 async move { catch_unwind_failed("lazy_flow factory", || create()) }
216 })
217 }
218
219 #[must_use]
220 pub fn lazy_future_flow<InnerMat, F, Fut>(
221 create: F,
222 ) -> Flow<In, Out, StreamCompletion<InnerMat>>
223 where
224 InnerMat: Send + 'static,
225 F: Fn() -> Fut + Send + Sync + 'static,
226 Fut: Future<Output = StreamResult<Flow<In, Out, InnerMat>>> + Send + 'static,
227 {
228 let create = Arc::new(create);
229 Flow::from_runtime_materialized_factory(move || {
230 let (sender, receiver) = oneshot::channel();
231 let sender = Arc::new(Mutex::new(Some(sender)));
232 let create = Arc::clone(&create);
233 let transform: RuntimeTransform<In, Out> =
234 Arc::new(move |input, materializer: &Materializer| {
235 let mat_sender = sender
236 .lock()
237 .expect("lazy_future_flow materialized sender poisoned")
238 .take()
239 .ok_or_else(|| {
240 StreamError::Failed(
241 "lazy_future_flow transform already materialized".into(),
242 )
243 })?;
244 Ok(Box::new(LazyFutureFlowStream {
245 create: Arc::clone(&create),
246 materializer: materializer
247 .with_name_prefix(materializer.name_prefix().to_owned()),
248 input: Some(input),
249 current: None,
250 mat_sender: Some(mat_sender),
251 initialized: false,
252 terminated: false,
253 _marker: PhantomData,
254 }) as BoxStream<Out>)
255 });
256 (transform, StreamCompletion::from_receiver(receiver, None))
257 })
258 }
259}
260
/// Keeps a materialized value alive until it is released.
///
/// After `release`, the stored value (if any) has been handed to
/// `release_materialized_value`, and later `store` calls release their value
/// immediately instead of keeping it.
#[derive(Default)]
struct MaterializedKeepalive {
    /// Set to `true` by `release`.
    released: AtomicBool,
    /// The value currently being kept alive, if any.
    value: Mutex<Option<Box<dyn Any + Send>>>,
}
266
267impl MaterializedKeepalive {
268 fn store(&self, value: Box<dyn Any + Send>) {
269 let mut slot = self.value.lock().expect("materialized keepalive poisoned");
270 if !self.released.load(Ordering::SeqCst) {
271 *slot = Some(value);
272 return;
273 }
274 drop(slot);
279 release_materialized_value(value);
280 }
281
282 fn release(&self) {
283 self.released.store(true, Ordering::SeqCst);
284 if let Some(value) = self
285 .value
286 .lock()
287 .expect("materialized keepalive poisoned")
288 .take()
289 {
290 release_materialized_value(value);
291 }
292 }
293
294 fn is_released(&self) -> bool {
295 self.released.load(Ordering::SeqCst)
296 }
297}
298
299fn release_materialized_value(value: Box<dyn Any + Send>) {
300 match value.downcast::<Cancellable>() {
301 Ok(cancellable) => {
302 cancellable.cancel();
303 }
304 Err(value) => drop(value),
305 }
306}
307
/// Wraps the stream handed to a sink so that dropping it releases the sink's
/// keepalive and, if present, a peer keepalive.
struct InputKeepaliveStream<In> {
    /// Stream forwarded unchanged.
    inner: BoxStream<In>,
    /// Released when this wrapper is dropped.
    keepalive: Arc<MaterializedKeepalive>,
    /// Additional keepalive released on drop; `from_sink_and_source_coupled`
    /// passes the source-side keepalive here.
    peer_keepalive: Option<Arc<MaterializedKeepalive>>,
}
313
314impl<In> Iterator for InputKeepaliveStream<In> {
315 type Item = StreamResult<In>;
316
317 fn next(&mut self) -> Option<Self::Item> {
318 self.inner.next()
319 }
320}
321
322impl<In> Drop for InputKeepaliveStream<In> {
323 fn drop(&mut self) {
324 self.keepalive.release();
325 if let Some(peer_keepalive) = &self.peer_keepalive {
326 peer_keepalive.release();
327 }
328 }
329}
330
/// Output stream of the `from_sink_and_source*` flows.
///
/// Forwards the source's elements. On completion, error or drop it releases
/// the source keepalive and, when `coupled`, the sink keepalive.
struct CoupledStream<Out> {
    /// The source's stream.
    inner: BoxStream<Out>,
    /// Keeps the source's materialized value alive.
    source_keepalive: Arc<MaterializedKeepalive>,
    /// `Some` only for the coupled variant.
    sink_keepalive: Option<Arc<MaterializedKeepalive>>,
    /// When true, the sink keepalive is released together with the source
    /// keepalive, and the stream ends once the source keepalive is released.
    coupled: bool,
}
337
338impl<Out> Iterator for CoupledStream<Out> {
339 type Item = StreamResult<Out>;
340
341 fn next(&mut self) -> Option<Self::Item> {
342 if self.coupled && self.source_keepalive.is_released() {
343 return None;
344 }
345 let next = self.inner.next();
346 if next.is_none() || next.as_ref().is_some_and(|item| item.is_err()) {
347 self.source_keepalive.release();
348 if self.coupled
349 && let Some(sink_keepalive) = &self.sink_keepalive
350 {
351 sink_keepalive.release();
352 }
353 }
354 next
355 }
356}
357
358impl<Out> Drop for CoupledStream<Out> {
359 fn drop(&mut self) {
360 self.source_keepalive.release();
361 if self.coupled
362 && let Some(sink_keepalive) = &self.sink_keepalive
363 {
364 sink_keepalive.release();
365 }
366 }
367}
368
369impl<In: Send + 'static, Out: Send + 'static, Mat: Send + 'static> Flow<In, Out, Mat> {
370 pub fn as_flow_with_context<U, CtxIn, CtxOut, Collapse, Extract>(
371 self,
372 collapse_context: Collapse,
373 extract_context: Extract,
374 ) -> FlowWithContext<U, CtxIn, Out, CtxOut, Mat>
375 where
376 U: Send + 'static,
377 CtxIn: Send + 'static,
378 CtxOut: Send + 'static,
379 Collapse: Fn(U, CtxIn) -> In + Send + Sync + 'static,
380 Extract: Fn(&Out) -> CtxOut + Send + Sync + 'static,
381 {
382 FlowWithContext::from_flow(
383 Flow::identity()
384 .map(move |(value, context)| collapse_context(value, context))
385 .via_mat(self, |_, flow_mat| flow_mat)
386 .map(move |value| {
387 let context = extract_context(&value);
388 (value, context)
389 }),
390 )
391 }
392
393 pub(crate) fn from_parts<F, M>(transform: F, materialize: M) -> Self
394 where
395 F: Fn(BoxStream<In>) -> BoxStream<Out> + Send + Sync + 'static,
396 M: Fn() -> StreamResult<Mat> + Send + Sync + 'static,
397 {
398 Self::from_parts_with_hints(transform, materialize, FlowHints::default())
399 }
400
401 pub(crate) fn from_materialized_factory<F>(factory: F) -> Self
402 where
403 F: Fn() -> (PureTransform<In, Out>, Mat) + Send + Sync + 'static,
404 {
405 struct PendingSlot<In, Out, Mat> {
406 transform: Option<PureTransform<In, Out>>,
407 mat: Option<Mat>,
408 transform_taken: bool,
409 mat_taken: bool,
410 }
411
412 let pending = Arc::new(Mutex::new(HashMap::<
413 thread::ThreadId,
414 Vec<PendingSlot<In, Out, Mat>>,
415 >::new()));
416 let factory = Arc::new(factory);
417
418 let transform = {
419 let pending = Arc::clone(&pending);
420 let factory = Arc::clone(&factory);
421 move |input| {
422 let transform = {
423 let mut pending = pending.lock().expect("flow materialized factory poisoned");
424 let thread_id = thread::current().id();
425 let slots = pending.entry(thread_id).or_default();
426 let index = slots
427 .iter()
428 .position(|slot| !slot.transform_taken && slot.mat_taken)
429 .unwrap_or_else(|| {
430 let (transform, mat) = factory();
431 slots.push(PendingSlot {
432 transform: Some(transform),
433 mat: Some(mat),
434 transform_taken: false,
435 mat_taken: false,
436 });
437 slots.len() - 1
438 });
439 let slot = slots
440 .get_mut(index)
441 .expect("pending flow materialization slot exists");
442 slot.transform_taken = true;
443 let transform = slot
444 .transform
445 .take()
446 .expect("pending flow transform present");
447 if slot.transform_taken && slot.mat_taken {
448 slots.remove(index);
449 }
450 if slots.is_empty() {
451 pending.remove(&thread_id);
452 }
453 transform
454 };
455 transform(input)
456 }
457 };
458
459 let materialize = {
460 let pending = Arc::clone(&pending);
461 let factory = Arc::clone(&factory);
462 move || {
463 let mat = {
464 let mut pending = pending.lock().expect("flow materialized factory poisoned");
465 let thread_id = thread::current().id();
466 let slots = pending.entry(thread_id).or_default();
467 let index = slots
468 .iter()
469 .position(|slot| !slot.mat_taken && slot.transform_taken)
470 .unwrap_or_else(|| {
471 let (transform, mat) = factory();
472 slots.push(PendingSlot {
473 transform: Some(transform),
474 mat: Some(mat),
475 transform_taken: false,
476 mat_taken: false,
477 });
478 slots.len() - 1
479 });
480 let slot = slots
481 .get_mut(index)
482 .expect("pending flow materialization slot exists");
483 slot.mat_taken = true;
484 let mat = slot
485 .mat
486 .take()
487 .expect("pending flow materialized value present");
488 if slot.transform_taken && slot.mat_taken {
489 slots.remove(index);
490 }
491 if slots.is_empty() {
492 pending.remove(&thread_id);
493 }
494 mat
495 };
496 Ok(mat)
497 }
498 };
499
500 Self::from_parts_with_hints(transform, materialize, FlowHints::default())
501 }
502
    /// Builds a runtime flow whose transform and materialized value both come
    /// from one `factory()` call.
    ///
    /// The flow's `transform` and `materialize` closures are invoked
    /// separately. The half that has not been requested yet is therefore
    /// parked in a per-thread FIFO of `PendingSlot`s keyed by
    /// `thread::current().id()`. Whichever closure runs first on a thread calls
    /// `factory()` (with the `pending` mutex unlocked) and parks the other
    /// half; the next call of the other closure on that thread claims it.
    ///
    /// NOTE(review): if the two halves are requested on different threads,
    /// each side calls `factory()` on its own. The halves parked by those calls
    /// are never claimed and stay in `pending`.
    pub(crate) fn from_runtime_materialized_factory<F>(factory: F) -> Self
    where
        F: Fn() -> (RuntimeTransform<In, Out>, Mat) + Send + Sync + 'static,
    {
        // One factory result whose two halves have not both been claimed yet.
        // A parked slot always has exactly one `*_taken` flag set; once both are
        // set the slot is removed.
        struct PendingSlot<In, Out, Mat> {
            transform: Option<RuntimeTransform<In, Out>>,
            mat: Option<Mat>,
            transform_taken: bool,
            mat_taken: bool,
        }

        // Parked halves, grouped by the thread that produced them.
        let pending = Arc::new(Mutex::new(HashMap::<
            thread::ThreadId,
            Vec<PendingSlot<In, Out, Mat>>,
        >::new()));
        let factory = Arc::new(factory);

        let transform = {
            let pending = Arc::clone(&pending);
            let factory = Arc::clone(&factory);
            move |input, materializer: &Materializer| {
                let thread_id = thread::current().id();
                // Phase 1 (under the lock): claim the oldest transform whose
                // materialized value was already taken on this thread.
                let transform = {
                    let mut pending = pending.lock().expect("flow materialized factory poisoned");
                    let slots = pending.entry(thread_id).or_default();
                    if let Some(index) = slots
                        .iter()
                        .position(|slot| !slot.transform_taken && slot.mat_taken)
                    {
                        let slot = slots
                            .get_mut(index)
                            .expect("pending flow materialization slot exists");
                        slot.transform_taken = true;
                        let transform = slot
                            .transform
                            .take()
                            .expect("pending flow transform present");
                        // Both halves are now claimed: drop the slot, and the
                        // thread's entry if it became empty.
                        if slot.transform_taken && slot.mat_taken {
                            slots.remove(index);
                        }
                        if slots.is_empty() {
                            pending.remove(&thread_id);
                        }
                        Some(transform)
                    } else {
                        None
                    }
                };

                // Phase 2: nothing parked. Call the factory with the lock
                // released, then park its materialized value for the matching
                // `materialize` call on this thread.
                let transform = match transform {
                    Some(transform) => transform,
                    None => {
                        let (transform, mat) = factory();
                        let mut pending =
                            pending.lock().expect("flow materialized factory poisoned");
                        let slots = pending.entry(thread_id).or_default();
                        slots.push(PendingSlot {
                            transform: None,
                            mat: Some(mat),
                            transform_taken: true,
                            mat_taken: false,
                        });
                        transform
                    }
                };

                transform(input, materializer)
            }
        };

        let materialize = {
            let pending = Arc::clone(&pending);
            let factory = Arc::clone(&factory);
            move || {
                let thread_id = thread::current().id();
                // Phase 1 (under the lock): claim the oldest materialized value
                // whose transform was already taken on this thread.
                let mat = {
                    let mut pending = pending.lock().expect("flow materialized factory poisoned");
                    let slots = pending.entry(thread_id).or_default();
                    if let Some(index) = slots
                        .iter()
                        .position(|slot| !slot.mat_taken && slot.transform_taken)
                    {
                        let slot = slots
                            .get_mut(index)
                            .expect("pending flow materialization slot exists");
                        slot.mat_taken = true;
                        let mat = slot
                            .mat
                            .take()
                            .expect("pending flow materialized value present");
                        // Both halves are now claimed: drop the slot, and the
                        // thread's entry if it became empty.
                        if slot.transform_taken && slot.mat_taken {
                            slots.remove(index);
                        }
                        if slots.is_empty() {
                            pending.remove(&thread_id);
                        }
                        Some(mat)
                    } else {
                        None
                    }
                };

                // Phase 2: nothing parked. Call the factory with the lock
                // released, then park its transform for the matching
                // `transform` call on this thread.
                match mat {
                    Some(mat) => Ok(mat),
                    None => {
                        let (transform, mat) = factory();
                        let mut pending =
                            pending.lock().expect("flow materialized factory poisoned");
                        let slots = pending.entry(thread_id).or_default();
                        slots.push(PendingSlot {
                            transform: Some(transform),
                            mat: None,
                            transform_taken: false,
                            mat_taken: true,
                        });
                        Ok(mat)
                    }
                }
            }
        };

        Flow {
            transform: FlowTransform::Runtime(Arc::new(transform)),
            materialize: Arc::new(materialize),
            hints: FlowHints::default(),
            attributes: Attributes::default(),
        }
    }
631
632 fn from_parts_with_hints<F, M>(transform: F, materialize: M, hints: FlowHints) -> Self
633 where
634 F: Fn(BoxStream<In>) -> BoxStream<Out> + Send + Sync + 'static,
635 M: Fn() -> StreamResult<Mat> + Send + Sync + 'static,
636 {
637 Self {
638 transform: FlowTransform::Pure(Arc::new(transform)),
639 materialize: Arc::new(materialize),
640 hints,
641 attributes: Attributes::default(),
642 }
643 }
644
645 #[must_use]
646 pub fn attributes(&self) -> &Attributes {
647 &self.attributes
648 }
649
650 #[must_use]
651 pub fn with_attributes(mut self, attributes: Attributes) -> Self {
652 self.attributes = attributes;
653 self
654 }
655
656 #[must_use]
657 pub fn add_attributes(mut self, attributes: Attributes) -> Self {
658 self.attributes = self.attributes.and(attributes);
659 self
660 }
661
662 #[must_use]
663 pub fn named(self, name: impl Into<String>) -> Self {
664 self.add_attributes(Attributes::named(name))
665 }
666
667 #[must_use]
674 pub fn async_boundary(self) -> Self {
675 self.async_boundary_with_config(crate::graph::AsyncBoundaryExecutionConfig::default())
676 }
677
678 #[must_use]
680 pub fn r#async(self) -> Self {
681 self.async_boundary()
682 }
683
684 #[must_use]
690 pub fn async_boundary_with_config(
691 self,
692 config: crate::graph::AsyncBoundaryExecutionConfig,
693 ) -> Self {
694 self.via(Flow::from_runtime_transform(move |input, materializer| {
695 super::async_boundary::linear_async_boundary_stream(input, materializer, config)
696 }))
697 }
698
699 #[must_use]
701 pub fn async_boundary_with_buffer(self, buffer_size: usize) -> Self {
702 self.async_boundary_with_config(crate::graph::AsyncBoundaryExecutionConfig {
703 buffer_size,
704 ..crate::graph::AsyncBoundaryExecutionConfig::default()
705 })
706 }
707
708 #[must_use]
709 pub fn via<Next, NextMat>(self, next: Flow<Out, Next, NextMat>) -> Flow<In, Next, Mat>
710 where
711 Next: Send + 'static,
712 NextMat: Send + 'static,
713 {
714 self.via_mat(next, Keep::left)
715 }
716
717 #[must_use]
718 pub fn via_mat<Next, NextMat, Combined, F>(
719 self,
720 next: Flow<Out, Next, NextMat>,
721 combine: F,
722 ) -> Flow<In, Next, Combined>
723 where
724 Next: Send + 'static,
725 NextMat: Send + 'static,
726 Combined: Send + 'static,
727 F: Fn(Mat, NextMat) -> Combined + Send + Sync + 'static,
728 {
729 let Flow {
730 transform: first,
731 materialize: materialize_first,
732 hints: first_hints,
733 attributes: first_attributes,
734 } = self;
735 let Flow {
736 transform: second,
737 materialize: materialize_second,
738 hints: second_hints,
739 attributes: second_attributes,
740 } = next;
741 let combine = Arc::new(combine);
742 match (first, second) {
743 (FlowTransform::Pure(first), FlowTransform::Pure(second)) => {
744 let hints = first_hints.then(second_hints);
745 Flow::from_parts_with_hints(
746 move |input| second(first(input)),
747 move || {
748 let left = materialize_first()?;
749 let right = materialize_second()?;
750 Ok(combine(left, right))
751 },
752 hints,
753 )
754 .with_attributes(first_attributes.and(second_attributes))
755 }
756 (first, second) => {
757 let hints = first_hints.then(second_hints);
758 Flow {
759 transform: FlowTransform::Runtime(Arc::new(move |input, materializer| {
760 let stream = match &first {
761 FlowTransform::Pure(first) => first(input),
762 FlowTransform::Runtime(first) => first(input, materializer)?,
763 };
764 match &second {
765 FlowTransform::Pure(second) => Ok(second(stream)),
766 FlowTransform::Runtime(second) => second(stream, materializer),
767 }
768 })),
769 materialize: Arc::new(move || {
770 let left = materialize_first()?;
771 let right = materialize_second()?;
772 Ok(combine(left, right))
773 }),
774 hints,
775 attributes: first_attributes.and(second_attributes),
776 }
777 }
778 }
779 }
780
781 #[must_use]
782 pub fn via_mat_with<Next, NextMat, Combined, F>(
783 self,
784 next: Flow<Out, Next, NextMat>,
785 combine: F,
786 ) -> Flow<In, Next, Combined>
787 where
788 Next: Send + 'static,
789 NextMat: Send + 'static,
790 Combined: Send + 'static,
791 F: Fn(Mat, NextMat) -> Combined + Send + Sync + 'static,
792 {
793 self.via_mat(next, combine)
794 }
795
796 #[must_use]
797 pub fn map<Next, F>(self, f: F) -> Flow<In, Next, Mat>
798 where
799 Next: Send + 'static,
800 F: Fn(Out) -> Next + Send + Sync + 'static,
801 {
802 let stage = Arc::new(f);
803 match &self.transform {
804 FlowTransform::Pure(_) => {
805 let Flow {
806 transform,
807 materialize,
808 hints,
809 attributes,
810 } = self;
811 let FlowTransform::Pure(transform) = transform else {
812 unreachable!("pure transform checked above");
813 };
814 Flow::from_parts_with_hints(
815 move |input| {
816 let stage = Arc::clone(&stage);
817 Box::new(transform(input).map(move |item| item.map(|item| stage(item))))
818 },
819 move || materialize(),
820 hints,
821 )
822 .with_attributes(attributes)
823 }
824 FlowTransform::Runtime(_) => self.via(Flow::from_transform(move |input| {
825 let stage = Arc::clone(&stage);
826 Box::new(input.map(move |item| item.map(|item| stage(item))))
827 })),
828 }
829 }
830
831 #[must_use]
834 pub fn map_result<Next, F>(self, f: F) -> Flow<In, Next, Mat>
835 where
836 Next: Send + 'static,
837 F: Fn(Out) -> StreamResult<Next> + Send + Sync + 'static,
838 {
839 let stage = Arc::new(f);
840 self.via(Flow::from_transform(move |input| {
841 let stage = Arc::clone(&stage);
842 Box::new(input.map(move |item| item.and_then(|item| stage(item))))
843 }))
844 }
845
846 #[must_use]
851 pub fn map_result_with_supervision<Next, F>(
852 self,
853 f: F,
854 decider: SupervisionDecider,
855 ) -> Flow<In, Next, Mat>
856 where
857 Next: Send + 'static,
858 F: Fn(Out) -> StreamResult<Next> + Send + Sync + 'static,
859 {
860 let stage = Arc::new(f);
861 self.via(Flow::from_transform(move |input| {
862 let stage = Arc::clone(&stage);
863 let decider = Arc::clone(&decider);
864 Box::new(input.filter_map(move |item| match item {
865 Ok(item) => match call_supervised("map_result callback", || stage(item)) {
866 Ok(next) => Some(Ok(next)),
867 Err(error) => match decide_supervision(&decider, &error) {
868 SupervisionDirective::Stop => Some(Err(error)),
869 SupervisionDirective::Resume | SupervisionDirective::Restart => None,
870 },
871 },
872 Err(error) => Some(Err(error)),
873 }))
874 }))
875 }
876
877 #[must_use]
878 pub fn filter<F>(self, predicate: F) -> Flow<In, Out, Mat>
879 where
880 F: Fn(&Out) -> bool + Send + Sync + 'static,
881 {
882 let predicate = Arc::new(predicate);
883 self.via(Flow::from_preserving_transform(move |input| {
884 let predicate = Arc::clone(&predicate);
885 Box::new(input.filter_map(move |item| match item {
886 Ok(item) if predicate(&item) => Some(Ok(item)),
887 Ok(_) => None,
888 Err(error) => Some(Err(error)),
889 }))
890 }))
891 }
892
893 #[must_use]
894 pub fn filter_result<F>(self, predicate: F) -> Flow<In, Out, Mat>
895 where
896 F: Fn(&Out) -> StreamResult<bool> + Send + Sync + 'static,
897 {
898 let predicate = Arc::new(predicate);
899 self.via(Flow::from_transform(move |input| {
900 let predicate = Arc::clone(&predicate);
901 Box::new(input.filter_map(move |item| match item {
902 Ok(item) => match predicate(&item) {
903 Ok(true) => Some(Ok(item)),
904 Ok(false) => None,
905 Err(error) => Some(Err(error)),
906 },
907 Err(error) => Some(Err(error)),
908 }))
909 }))
910 }
911
912 #[must_use]
913 pub fn filter_result_with_supervision<F>(
914 self,
915 predicate: F,
916 decider: SupervisionDecider,
917 ) -> Flow<In, Out, Mat>
918 where
919 F: Fn(&Out) -> StreamResult<bool> + Send + Sync + 'static,
920 {
921 let predicate = Arc::new(predicate);
922 self.via(Flow::from_transform(move |input| {
923 let predicate = Arc::clone(&predicate);
924 let decider = Arc::clone(&decider);
925 Box::new(input.filter_map(move |item| match item {
926 Ok(item) => match call_supervised("filter_result callback", || predicate(&item)) {
927 Ok(true) => Some(Ok(item)),
928 Ok(false) => None,
929 Err(error) => match decide_supervision(&decider, &error) {
930 SupervisionDirective::Stop => Some(Err(error)),
931 SupervisionDirective::Resume | SupervisionDirective::Restart => None,
932 },
933 },
934 Err(error) => Some(Err(error)),
935 }))
936 }))
937 }
938
939 #[must_use]
940 pub fn filter_not<F>(self, predicate: F) -> Flow<In, Out, Mat>
941 where
942 F: Fn(&Out) -> bool + Send + Sync + 'static,
943 {
944 let predicate = Arc::new(predicate);
945 self.filter(move |item| !predicate(item))
946 }
947
948 #[must_use]
949 pub fn filter_map<Next, F>(self, f: F) -> Flow<In, Next, Mat>
950 where
951 Next: Send + 'static,
952 F: Fn(Out) -> Option<Next> + Send + Sync + 'static,
953 {
954 let stage = Arc::new(f);
955 self.via(Flow::from_transform(move |input| {
956 let stage = Arc::clone(&stage);
957 Box::new(input.filter_map(move |item| match item {
958 Ok(item) => stage(item).map(Ok),
959 Err(error) => Some(Err(error)),
960 }))
961 }))
962 }
963
964 #[must_use]
965 pub fn filter_map_result<Next, F>(self, f: F) -> Flow<In, Next, Mat>
966 where
967 Next: Send + 'static,
968 F: Fn(Out) -> StreamResult<Option<Next>> + Send + Sync + 'static,
969 {
970 let stage = Arc::new(f);
971 self.via(Flow::from_transform(move |input| {
972 let stage = Arc::clone(&stage);
973 Box::new(input.filter_map(move |item| match item {
974 Ok(item) => match stage(item) {
975 Ok(Some(next)) => Some(Ok(next)),
976 Ok(None) => None,
977 Err(error) => Some(Err(error)),
978 },
979 Err(error) => Some(Err(error)),
980 }))
981 }))
982 }
983
984 #[must_use]
985 pub fn filter_map_result_with_supervision<Next, F>(
986 self,
987 f: F,
988 decider: SupervisionDecider,
989 ) -> Flow<In, Next, Mat>
990 where
991 Next: Send + 'static,
992 F: Fn(Out) -> StreamResult<Option<Next>> + Send + Sync + 'static,
993 {
994 let stage = Arc::new(f);
995 self.via(Flow::from_transform(move |input| {
996 let stage = Arc::clone(&stage);
997 let decider = Arc::clone(&decider);
998 Box::new(input.filter_map(move |item| match item {
999 Ok(item) => match call_supervised("filter_map_result callback", || stage(item)) {
1000 Ok(Some(next)) => Some(Ok(next)),
1001 Ok(None) => None,
1002 Err(error) => match decide_supervision(&decider, &error) {
1003 SupervisionDirective::Stop => Some(Err(error)),
1004 SupervisionDirective::Resume | SupervisionDirective::Restart => None,
1005 },
1006 },
1007 Err(error) => Some(Err(error)),
1008 }))
1009 }))
1010 }
1011
1012 #[must_use]
1013 pub fn map_concat<Next, F, I>(self, f: F) -> Flow<In, Next, Mat>
1014 where
1015 Next: Send + 'static,
1016 F: Fn(Out) -> I + Send + Sync + 'static,
1017 I: IntoIterator<Item = Next>,
1018 I::IntoIter: Send + 'static,
1019 {
1020 let stage = Arc::new(f);
1021 self.via(Flow::from_transform(move |mut input| {
1022 let stage = Arc::clone(&stage);
1023 let mut current = None::<I::IntoIter>;
1024 let mut done = false;
1025 Box::new(std::iter::from_fn(move || {
1026 loop {
1027 if let Some(iter) = &mut current {
1028 if let Some(item) = iter.next() {
1029 return Some(Ok(item));
1030 }
1031 current = None;
1032 }
1033
1034 if done {
1035 return None;
1036 }
1037
1038 match input.next()? {
1039 Ok(item) => current = Some(stage(item).into_iter()),
1040 Err(error) => {
1041 done = true;
1042 return Some(Err(error));
1043 }
1044 }
1045 }
1046 }))
1047 }))
1048 }
1049
1050 #[must_use]
1051 pub fn map_concat_result<Next, F, I>(self, f: F) -> Flow<In, Next, Mat>
1052 where
1053 Next: Send + 'static,
1054 F: Fn(Out) -> StreamResult<I> + Send + Sync + 'static,
1055 I: IntoIterator<Item = Next>,
1056 I::IntoIter: Send + 'static,
1057 {
1058 let stage = Arc::new(f);
1059 self.via(Flow::from_transform(move |mut input| {
1060 let stage = Arc::clone(&stage);
1061 let mut current = None::<I::IntoIter>;
1062 let mut done = false;
1063 Box::new(std::iter::from_fn(move || {
1064 loop {
1065 if let Some(iter) = &mut current {
1066 if let Some(item) = iter.next() {
1067 return Some(Ok(item));
1068 }
1069 current = None;
1070 }
1071
1072 if done {
1073 return None;
1074 }
1075
1076 match input.next()? {
1077 Ok(item) => match stage(item) {
1078 Ok(items) => current = Some(items.into_iter()),
1079 Err(error) => {
1080 done = true;
1081 return Some(Err(error));
1082 }
1083 },
1084 Err(error) => {
1085 done = true;
1086 return Some(Err(error));
1087 }
1088 }
1089 }
1090 }))
1091 }))
1092 }
1093
1094 #[must_use]
1095 pub fn map_concat_result_with_supervision<Next, F, I>(
1096 self,
1097 f: F,
1098 decider: SupervisionDecider,
1099 ) -> Flow<In, Next, Mat>
1100 where
1101 Next: Send + 'static,
1102 F: Fn(Out) -> StreamResult<I> + Send + Sync + 'static,
1103 I: IntoIterator<Item = Next>,
1104 I::IntoIter: Send + 'static,
1105 {
1106 let stage = Arc::new(f);
1107 self.via(Flow::from_transform(move |mut input| {
1108 let stage = Arc::clone(&stage);
1109 let decider = Arc::clone(&decider);
1110 let mut current = None::<I::IntoIter>;
1111 let mut done = false;
1112 Box::new(std::iter::from_fn(move || {
1113 loop {
1114 if let Some(iter) = &mut current {
1115 if let Some(item) = iter.next() {
1116 return Some(Ok(item));
1117 }
1118 current = None;
1119 }
1120
1121 if done {
1122 return None;
1123 }
1124
1125 match input.next()? {
1126 Ok(item) => {
1127 match call_supervised("map_concat_result callback", || stage(item)) {
1128 Ok(items) => current = Some(items.into_iter()),
1129 Err(error) => match decide_supervision(&decider, &error) {
1130 SupervisionDirective::Stop => {
1131 done = true;
1132 return Some(Err(error));
1133 }
1134 SupervisionDirective::Resume
1135 | SupervisionDirective::Restart => {}
1136 },
1137 }
1138 }
1139 Err(error) => {
1140 done = true;
1141 return Some(Err(error));
1142 }
1143 }
1144 }
1145 }))
1146 }))
1147 }
1148
1149 #[must_use]
1150 pub fn stateful_map<State, Next, F>(self, seed: State, f: F) -> Flow<In, Next, Mat>
1151 where
1152 State: Clone + Send + Sync + 'static,
1153 Next: Send + 'static,
1154 F: Fn(&mut State, Out) -> Next + Send + Sync + 'static,
1155 {
1156 let stage = Arc::new(f);
1157 self.via(Flow::from_transform(move |input| {
1158 let stage = Arc::clone(&stage);
1159 let mut state = seed.clone();
1160 Box::new(input.map(move |item| item.map(|item| stage(&mut state, item))))
1161 }))
1162 }
1163
1164 #[must_use]
1165 pub fn stateful_map_result<State, Next, F>(self, seed: State, f: F) -> Flow<In, Next, Mat>
1166 where
1167 State: Clone + Send + Sync + 'static,
1168 Next: Send + 'static,
1169 F: Fn(&mut State, Out) -> StreamResult<Next> + Send + Sync + 'static,
1170 {
1171 let stage = Arc::new(f);
1172 self.via(Flow::from_transform(move |input| {
1173 let stage = Arc::clone(&stage);
1174 let mut state = seed.clone();
1175 Box::new(input.map(move |item| match item {
1176 Ok(item) => stage(&mut state, item),
1177 Err(error) => Err(error),
1178 }))
1179 }))
1180 }
1181
1182 #[must_use]
1183 pub fn stateful_map_result_with_supervision<State, Next, F>(
1184 self,
1185 seed: State,
1186 f: F,
1187 decider: SupervisionDecider,
1188 ) -> Flow<In, Next, Mat>
1189 where
1190 State: Clone + Send + Sync + 'static,
1191 Next: Send + 'static,
1192 F: Fn(&mut State, Out) -> StreamResult<Next> + Send + Sync + 'static,
1193 {
1194 let stage = Arc::new(f);
1195 self.via(Flow::from_transform(move |input| {
1196 let stage = Arc::clone(&stage);
1197 let decider = Arc::clone(&decider);
1198 let seed = seed.clone();
1199 let mut state = seed.clone();
1200 Box::new(input.filter_map(move |item| match item {
1201 Ok(item) => match call_supervised("stateful_map_result callback", || {
1202 stage(&mut state, item)
1203 }) {
1204 Ok(next) => Some(Ok(next)),
1205 Err(error) => match decide_supervision(&decider, &error) {
1206 SupervisionDirective::Stop => Some(Err(error)),
1207 SupervisionDirective::Resume => None,
1208 SupervisionDirective::Restart => {
1209 state = seed.clone();
1210 None
1211 }
1212 },
1213 },
1214 Err(error) => Some(Err(error)),
1215 }))
1216 }))
1217 }
1218
1219 #[must_use]
1220 pub fn stateful_map_concat<State, Next, F, I>(self, seed: State, f: F) -> Flow<In, Next, Mat>
1221 where
1222 State: Clone + Send + Sync + 'static,
1223 Next: Send + 'static,
1224 F: Fn(&mut State, Out) -> I + Send + Sync + 'static,
1225 I: IntoIterator<Item = Next>,
1226 I::IntoIter: Send + 'static,
1227 {
1228 let stage = Arc::new(f);
1229 self.via(Flow::from_transform(move |mut input| {
1230 let stage = Arc::clone(&stage);
1231 let mut state = seed.clone();
1232 let mut current = None::<I::IntoIter>;
1233 let mut done = false;
1234 Box::new(std::iter::from_fn(move || {
1235 loop {
1236 if let Some(iter) = &mut current {
1237 if let Some(item) = iter.next() {
1238 return Some(Ok(item));
1239 }
1240 current = None;
1241 }
1242
1243 if done {
1244 return None;
1245 }
1246
1247 match input.next()? {
1248 Ok(item) => current = Some(stage(&mut state, item).into_iter()),
1249 Err(error) => {
1250 done = true;
1251 return Some(Err(error));
1252 }
1253 }
1254 }
1255 }))
1256 }))
1257 }
1258
1259 #[must_use]
1260 pub fn stateful_map_concat_result<State, Next, F, I>(
1261 self,
1262 seed: State,
1263 f: F,
1264 ) -> Flow<In, Next, Mat>
1265 where
1266 State: Clone + Send + Sync + 'static,
1267 Next: Send + 'static,
1268 F: Fn(&mut State, Out) -> StreamResult<I> + Send + Sync + 'static,
1269 I: IntoIterator<Item = Next>,
1270 I::IntoIter: Send + 'static,
1271 {
1272 let stage = Arc::new(f);
1273 self.via(Flow::from_transform(move |mut input| {
1274 let stage = Arc::clone(&stage);
1275 let mut state = seed.clone();
1276 let mut current = None::<I::IntoIter>;
1277 let mut done = false;
1278 Box::new(std::iter::from_fn(move || {
1279 loop {
1280 if let Some(iter) = &mut current {
1281 if let Some(item) = iter.next() {
1282 return Some(Ok(item));
1283 }
1284 current = None;
1285 }
1286
1287 if done {
1288 return None;
1289 }
1290
1291 match input.next()? {
1292 Ok(item) => match stage(&mut state, item) {
1293 Ok(items) => current = Some(items.into_iter()),
1294 Err(error) => {
1295 done = true;
1296 return Some(Err(error));
1297 }
1298 },
1299 Err(error) => {
1300 done = true;
1301 return Some(Err(error));
1302 }
1303 }
1304 }
1305 }))
1306 }))
1307 }
1308
1309 #[must_use]
1310 pub fn stateful_map_concat_result_with_supervision<State, Next, F, I>(
1311 self,
1312 seed: State,
1313 f: F,
1314 decider: SupervisionDecider,
1315 ) -> Flow<In, Next, Mat>
1316 where
1317 State: Clone + Send + Sync + 'static,
1318 Next: Send + 'static,
1319 F: Fn(&mut State, Out) -> StreamResult<I> + Send + Sync + 'static,
1320 I: IntoIterator<Item = Next>,
1321 I::IntoIter: Send + 'static,
1322 {
1323 let stage = Arc::new(f);
1324 self.via(Flow::from_transform(move |mut input| {
1325 let stage = Arc::clone(&stage);
1326 let decider = Arc::clone(&decider);
1327 let seed = seed.clone();
1328 let mut state = seed.clone();
1329 let mut current = None::<I::IntoIter>;
1330 let mut done = false;
1331 Box::new(std::iter::from_fn(move || {
1332 loop {
1333 if let Some(iter) = &mut current {
1334 if let Some(item) = iter.next() {
1335 return Some(Ok(item));
1336 }
1337 current = None;
1338 }
1339
1340 if done {
1341 return None;
1342 }
1343
1344 match input.next()? {
1345 Ok(item) => {
1346 match call_supervised("stateful_map_concat_result callback", || {
1347 stage(&mut state, item)
1348 }) {
1349 Ok(items) => current = Some(items.into_iter()),
1350 Err(error) => match decide_supervision(&decider, &error) {
1351 SupervisionDirective::Stop => {
1352 done = true;
1353 return Some(Err(error));
1354 }
1355 SupervisionDirective::Resume => {}
1356 SupervisionDirective::Restart => state = seed.clone(),
1357 },
1358 }
1359 }
1360 Err(error) => {
1361 done = true;
1362 return Some(Err(error));
1363 }
1364 }
1365 }
1366 }))
1367 }))
1368 }
1369
1370 #[must_use]
1371 pub fn map_async<Next, F, Fut>(self, parallelism: usize, f: F) -> Flow<In, Next, Mat>
1375 where
1376 Next: Send + 'static,
1377 F: Fn(Out) -> Fut + Send + Sync + 'static,
1378 Fut: Future<Output = StreamResult<Next>> + Send + 'static,
1379 {
1380 assert!(
1381 parallelism > 0,
1382 "map_async parallelism must be greater than zero"
1383 );
1384 let stage = Arc::new(f);
1385 self.via(Flow::from_runtime_transform_with_hints(
1386 move |input, _materializer| {
1387 let stage = Arc::clone(&stage);
1388 Ok(map_async_ordered(input, parallelism, stage))
1389 },
1390 FlowHints::PRESERVES_TERMINAL_CONSUMER_BATCH,
1391 ))
1392 }
1393
1394 #[must_use]
1395 pub fn map_async_with_supervision<Next, F, Fut>(
1400 self,
1401 parallelism: usize,
1402 f: F,
1403 decider: SupervisionDecider,
1404 ) -> Flow<In, Next, Mat>
1405 where
1406 Next: Send + 'static,
1407 F: Fn(Out) -> Fut + Send + Sync + 'static,
1408 Fut: Future<Output = StreamResult<Next>> + Send + 'static,
1409 {
1410 assert!(
1411 parallelism > 0,
1412 "map_async parallelism must be greater than zero"
1413 );
1414 let stage = Arc::new(f);
1415 self.via(Flow::from_runtime_transform(move |input, _materializer| {
1416 let stage = Arc::clone(&stage);
1417 let decider = Arc::clone(&decider);
1418 Ok(map_async_ordered_supervised(
1419 input,
1420 parallelism,
1421 stage,
1422 decider,
1423 ))
1424 }))
1425 }
1426
1427 #[must_use]
1428 pub fn map_async_unordered<Next, F, Fut>(self, parallelism: usize, f: F) -> Flow<In, Next, Mat>
1432 where
1433 Next: Send + 'static,
1434 F: Fn(Out) -> Fut + Send + Sync + 'static,
1435 Fut: Future<Output = StreamResult<Next>> + Send + 'static,
1436 {
1437 assert!(
1438 parallelism > 0,
1439 "map_async_unordered parallelism must be greater than zero"
1440 );
1441 let stage = Arc::new(f);
1442 self.via(Flow::from_runtime_transform_with_hints(
1443 move |input, _materializer| {
1444 let stage = Arc::clone(&stage);
1445 Ok(map_async_unordered(input, parallelism, stage))
1446 },
1447 FlowHints::PRESERVES_TERMINAL_CONSUMER_BATCH,
1448 ))
1449 }
1450
1451 #[must_use]
1452 pub fn map_async_unordered_with_supervision<Next, F, Fut>(
1453 self,
1454 parallelism: usize,
1455 f: F,
1456 decider: SupervisionDecider,
1457 ) -> Flow<In, Next, Mat>
1458 where
1459 Next: Send + 'static,
1460 F: Fn(Out) -> Fut + Send + Sync + 'static,
1461 Fut: Future<Output = StreamResult<Next>> + Send + 'static,
1462 {
1463 assert!(
1464 parallelism > 0,
1465 "map_async_unordered parallelism must be greater than zero"
1466 );
1467 let stage = Arc::new(f);
1468 self.via(Flow::from_runtime_transform(move |input, _materializer| {
1469 let stage = Arc::clone(&stage);
1470 let decider = Arc::clone(&decider);
1471 Ok(map_async_unordered_supervised(
1472 input,
1473 parallelism,
1474 stage,
1475 decider,
1476 ))
1477 }))
1478 }
1479
1480 #[must_use]
1481 pub fn map_async_partitioned<Key, Next, Partition, F, Fut>(
1485 self,
1486 parallelism: usize,
1487 per_partition: usize,
1488 partition: Partition,
1489 f: F,
1490 ) -> Flow<In, Next, Mat>
1491 where
1492 Key: Clone + Eq + Hash + Send + 'static,
1493 Next: Send + 'static,
1494 Partition: Fn(&Out) -> Key + Send + Sync + 'static,
1495 F: Fn(Out) -> Fut + Send + Sync + 'static,
1496 Fut: Future<Output = StreamResult<Next>> + Send + 'static,
1497 {
1498 assert!(
1499 parallelism > 0,
1500 "map_async_partitioned parallelism must be greater than zero"
1501 );
1502 assert!(
1503 per_partition > 0,
1504 "map_async_partitioned per_partition must be greater than zero"
1505 );
1506 let partition = Arc::new(partition);
1507 let stage = Arc::new(f);
1508 self.via(Flow::from_runtime_transform_with_hints(
1509 move |input, _materializer| {
1510 let partition = Arc::clone(&partition);
1511 let stage = Arc::clone(&stage);
1512 Ok(map_async_partitioned(
1513 input,
1514 parallelism,
1515 per_partition,
1516 partition,
1517 stage,
1518 ))
1519 },
1520 FlowHints::PRESERVES_TERMINAL_CONSUMER_BATCH,
1521 ))
1522 }
1523
1524 #[must_use]
1525 pub fn take(self, n: usize) -> Flow<In, Out, Mat> {
1526 self.via(Flow::from_transform(move |input| Box::new(input.take(n))))
1527 }
1528
1529 #[must_use]
1530 pub fn drop(self, n: usize) -> Flow<In, Out, Mat> {
1531 self.via(Flow::from_transform(move |input| {
1532 let mut remaining = n;
1533 Box::new(input.filter_map(move |item| match item {
1534 Ok(_) if remaining > 0 => {
1535 remaining -= 1;
1536 None
1537 }
1538 other => Some(other),
1539 }))
1540 }))
1541 }
1542
1543 #[must_use]
1544 pub fn take_while<F>(self, predicate: F) -> Flow<In, Out, Mat>
1545 where
1546 F: Fn(&Out) -> bool + Send + Sync + 'static,
1547 {
1548 let predicate = Arc::new(predicate);
1549 self.via(Flow::from_transform(move |mut input| {
1550 let predicate = Arc::clone(&predicate);
1551 let mut open = true;
1552 Box::new(std::iter::from_fn(move || {
1553 if !open {
1554 return None;
1555 }
1556
1557 match input.next() {
1558 Some(Ok(item)) if predicate(&item) => Some(Ok(item)),
1559 Some(Ok(_)) | None => {
1560 open = false;
1561 None
1562 }
1563 Some(Err(error)) => Some(Err(error)),
1564 }
1565 }))
1566 }))
1567 }
1568
1569 #[must_use]
1570 pub fn drop_while<F>(self, predicate: F) -> Flow<In, Out, Mat>
1571 where
1572 F: Fn(&Out) -> bool + Send + Sync + 'static,
1573 {
1574 let predicate = Arc::new(predicate);
1575 self.via(Flow::from_transform(move |mut input| {
1576 let predicate = Arc::clone(&predicate);
1577 let mut dropping = true;
1578 Box::new(std::iter::from_fn(move || {
1579 loop {
1580 let next = input.next()?;
1581 match next {
1582 Ok(item) if dropping && predicate(&item) => continue,
1583 Ok(item) => {
1584 dropping = false;
1585 return Some(Ok(item));
1586 }
1587 Err(error) => return Some(Err(error)),
1588 }
1589 }
1590 }))
1591 }))
1592 }
1593
1594 #[must_use]
1595 pub fn limit(self, max: u64) -> Flow<In, Out, Mat> {
1596 self.via(Flow::from_transform(move |input| {
1597 let mut seen = 0_u64;
1598 Box::new(input.map(move |item| match item {
1599 Ok(item) if seen < max => {
1600 seen += 1;
1601 Ok(item)
1602 }
1603 Ok(_) => Err(StreamError::LimitExceeded { max }),
1604 Err(error) => Err(error),
1605 }))
1606 }))
1607 }
1608
1609 #[must_use]
1610 pub fn grouped(self, size: usize) -> Flow<In, Vec<Out>, Mat> {
1611 assert!(size > 0, "grouped size must be greater than zero");
1612 self.via(Flow::from_transform(move |mut input| {
1613 Box::new(std::iter::from_fn(move || {
1614 let mut group = Vec::with_capacity(size);
1615 while group.len() < size {
1616 match input.next() {
1617 Some(Ok(item)) => group.push(item),
1618 Some(Err(error)) => return Some(Err(error)),
1619 None => break,
1620 }
1621 }
1622
1623 if group.is_empty() {
1624 None
1625 } else {
1626 Some(Ok(group))
1627 }
1628 }))
1629 }))
1630 }
1631
1632 #[must_use]
1633 pub fn scan<State, F>(self, seed: State, f: F) -> Flow<In, State, Mat>
1634 where
1635 State: Clone + Send + Sync + 'static,
1636 F: Fn(State, Out) -> State + Send + Sync + 'static,
1637 {
1638 let stage = Arc::new(f);
1639 self.via(Flow::from_transform(move |mut input| {
1640 let stage = Arc::clone(&stage);
1641 let mut state = Some(seed.clone());
1642 let mut emit_seed = true;
1643 Box::new(std::iter::from_fn(move || {
1644 if emit_seed {
1645 emit_seed = false;
1646 return Some(Ok(state.as_ref().expect("scan state present").clone()));
1647 }
1648
1649 match input.next()? {
1650 Ok(item) => {
1651 let prev = state.take().expect("scan state present");
1654 let next = stage(prev, item);
1655 state = Some(next.clone());
1656 Some(Ok(next))
1657 }
1658 Err(error) => Some(Err(error)),
1659 }
1660 }))
1661 }))
1662 }
1663
1664 #[must_use]
1665 pub fn scan_async<State, F, Fut>(self, seed: State, f: F) -> Flow<In, State, Mat>
1666 where
1667 State: Clone + Send + Sync + 'static,
1668 F: Fn(State, Out) -> Fut + Send + Sync + 'static,
1669 Fut: Future<Output = StreamResult<State>> + Send + 'static,
1670 {
1671 let stage = Arc::new(f);
1672 self.via(Flow::from_transform(move |mut input| {
1673 let stage = Arc::clone(&stage);
1674 let mut state = Some(seed.clone());
1675 let mut emit_seed = true;
1676 let mut terminated = false;
1677 Box::new(std::iter::from_fn(move || {
1678 if terminated {
1679 return None;
1680 }
1681 if emit_seed {
1682 emit_seed = false;
1683 return Some(Ok(state
1684 .as_ref()
1685 .expect("scan_async state present")
1686 .clone()));
1687 }
1688
1689 match input.next()? {
1690 Ok(item) => {
1691 let prev = state.take().expect("scan_async state present");
1692 match catch_unwind_failed("scan_async factory", || stage(prev, item))
1693 .and_then(run_future_inline_or_spawn)
1694 {
1695 Ok(next) => {
1696 state = Some(next.clone());
1697 Some(Ok(next))
1698 }
1699 Err(error) => {
1700 terminated = true;
1701 Some(Err(error))
1702 }
1703 }
1704 }
1705 Err(error) => {
1706 terminated = true;
1707 Some(Err(error))
1708 }
1709 }
1710 }))
1711 }))
1712 }
1713
1714 #[must_use]
1715 pub fn scan_result<State, F>(self, seed: State, f: F) -> Flow<In, State, Mat>
1716 where
1717 State: Clone + Send + Sync + 'static,
1718 F: Fn(State, Out) -> StreamResult<State> + Send + Sync + 'static,
1719 {
1720 let stage = Arc::new(f);
1721 self.via(Flow::from_transform(move |mut input| {
1722 let stage = Arc::clone(&stage);
1723 let mut state = Some(seed.clone());
1724 let mut emit_seed = true;
1725 Box::new(std::iter::from_fn(move || {
1726 if emit_seed {
1727 emit_seed = false;
1728 return Some(Ok(state.as_ref().expect("scan state present").clone()));
1729 }
1730
1731 match input.next()? {
1732 Ok(item) => {
1733 let prev = state.take().expect("scan state present");
1734 match stage(prev, item) {
1735 Ok(next) => {
1736 state = Some(next.clone());
1737 Some(Ok(next))
1738 }
1739 Err(error) => Some(Err(error)),
1740 }
1741 }
1742 Err(error) => Some(Err(error)),
1743 }
1744 }))
1745 }))
1746 }
1747
1748 #[must_use]
1749 pub fn scan_result_with_supervision<State, F>(
1750 self,
1751 seed: State,
1752 f: F,
1753 decider: SupervisionDecider,
1754 ) -> Flow<In, State, Mat>
1755 where
1756 State: Clone + Send + Sync + 'static,
1757 F: Fn(State, Out) -> StreamResult<State> + Send + Sync + 'static,
1758 {
1759 let stage = Arc::new(f);
1760 self.via(Flow::from_transform(move |mut input| {
1761 let stage = Arc::clone(&stage);
1762 let decider = Arc::clone(&decider);
1763 let seed = seed.clone();
1764 let mut state = Some(seed.clone());
1765 let mut emit_seed = true;
1766 Box::new(std::iter::from_fn(move || {
1767 loop {
1768 if emit_seed {
1769 emit_seed = false;
1770 return Some(Ok(state.as_ref().expect("scan state present").clone()));
1771 }
1772
1773 match input.next()? {
1774 Ok(item) => {
1775 let prev = state.take().expect("scan state present");
1776 match call_supervised("scan_result callback", || {
1777 stage(prev.clone(), item)
1778 }) {
1779 Ok(next) => {
1780 state = Some(next.clone());
1781 return Some(Ok(next));
1782 }
1783 Err(error) => match decide_supervision(&decider, &error) {
1784 SupervisionDirective::Stop => return Some(Err(error)),
1785 SupervisionDirective::Resume => {
1786 state = Some(prev);
1787 }
1788 SupervisionDirective::Restart => {
1789 state = Some(seed.clone());
1790 emit_seed = true;
1791 }
1792 },
1793 }
1794 }
1795 Err(error) => return Some(Err(error)),
1796 }
1797 }
1798 }))
1799 }))
1800 }
1801
/// Emits windows of `size` consecutive successful elements, advancing `step` elements
/// between the starts of consecutive windows.
///
/// * When `step <= size`, consecutive windows overlap by `size - step` elements.
/// * When `step > size`, the `step - size` elements following each window are skipped.
///
/// If upstream completes while the buffer holds a non-empty, incomplete window (fewer than
/// `size` elements), that partial window is emitted once. An upstream error is emitted and
/// the stage completes.
///
/// # Panics
/// Panics if `size` or `step` is zero.
#[must_use]
pub fn sliding(self, size: usize, step: usize) -> Flow<In, Vec<Out>, Mat>
where
    Out: Clone,
{
    assert!(size > 0, "sliding size must be greater than zero");
    assert!(step > 0, "sliding step must be greater than zero");
    self.via(Flow::from_transform(move |mut input| {
        let mut buffer = VecDeque::with_capacity(size.max(step));
        let mut terminated = false;

        Box::new(std::iter::from_fn(move || {
            if terminated {
                return None;
            }

            loop {
                match input.next() {
                    Some(Ok(item)) => {
                        buffer.push_back(item);
                        if buffer.len() < size {
                            // Still filling the first window.
                            continue;
                        } else if buffer.len() == size {
                            // A full window: emit a copy and keep the elements, because the
                            // next window may share some of them.
                            return Some(Ok(buffer.iter().cloned().collect()));
                        } else if step <= size {
                            // Overlapping mode: the buffer now holds `size + 1` elements.
                            // Slide forward by `step`. A new window is complete only when
                            // exactly `size` remain; otherwise keep pulling until it refills.
                            for _ in 0..step {
                                buffer.pop_front();
                            }
                            if buffer.len() == size {
                                return Some(Ok(buffer.iter().cloned().collect()));
                            }
                        } else if buffer.len() == step {
                            // Gapped mode (`step > size`): the last window plus the skipped
                            // elements have been consumed, so start the next window empty.
                            buffer.clear();
                        }
                        // Gapped mode with `size < len < step`: keep consuming elements
                        // that fall in the gap.
                    }
                    Some(Err(error)) => {
                        terminated = true;
                        return Some(Err(error));
                    }
                    None => {
                        terminated = true;
                        // Flush a trailing window that never reached `size`. A buffer of
                        // `size` or more was already emitted, or is gap residue.
                        if !buffer.is_empty() && buffer.len() < size {
                            return Some(Ok(buffer.iter().cloned().collect()));
                        }
                        return None;
                    }
                }
            }
        }))
    }))
}
1859
1860 #[must_use]
1861 pub fn fold<Acc, F>(self, zero: Acc, f: F) -> Flow<In, Acc, Mat>
1862 where
1863 Acc: Clone + Send + Sync + 'static,
1864 F: Fn(Acc, Out) -> Acc + Send + Sync + 'static,
1865 {
1866 let stage = Arc::new(f);
1867 self.via(Flow::from_transform(move |input| {
1868 let stage = Arc::clone(&stage);
1869 let mut acc = zero.clone();
1870 for item in input {
1871 match item {
1872 Ok(item) => acc = stage(acc, item),
1873 Err(error) => return Box::new(std::iter::once(Err(error))),
1874 }
1875 }
1876 Box::new(std::iter::once(Ok(acc)))
1877 }))
1878 }
1879
1880 #[must_use]
1881 pub fn fold_async<Acc, F, Fut>(self, zero: Acc, f: F) -> Flow<In, Acc, Mat>
1882 where
1883 Acc: Clone + Send + Sync + 'static,
1884 F: Fn(Acc, Out) -> Fut + Send + Sync + 'static,
1885 Fut: Future<Output = StreamResult<Acc>> + Send + 'static,
1886 {
1887 let stage = Arc::new(f);
1888 self.via(Flow::from_transform(move |mut input| {
1889 let stage = Arc::clone(&stage);
1890 let mut acc = Some(zero.clone());
1891 let mut done = false;
1892 Box::new(std::iter::from_fn(move || {
1893 if done {
1894 return None;
1895 }
1896 done = true;
1897
1898 for item in input.by_ref() {
1899 let item = match item {
1900 Ok(item) => item,
1901 Err(error) => return Some(Err(error)),
1902 };
1903 let current = acc.take().expect("fold_async accumulator present");
1904 match catch_unwind_failed("fold_async factory", || stage(current, item))
1905 .and_then(run_future_inline_or_spawn)
1906 {
1907 Ok(next) => acc = Some(next),
1908 Err(error) => return Some(Err(error)),
1909 }
1910 }
1911
1912 Some(Ok(acc.take().expect("fold_async accumulator present")))
1913 }))
1914 }))
1915 }
1916
1917 #[must_use]
1918 pub fn map_with_resource<Resource, Next, Create, F, Close>(
1919 self,
1920 create: Create,
1921 f: F,
1922 close: Close,
1923 ) -> Flow<In, Next, Mat>
1924 where
1925 Resource: Send + 'static,
1926 Next: Send + 'static,
1927 Create: Fn() -> StreamResult<Resource> + Send + Sync + 'static,
1928 F: Fn(&mut Resource, Out) -> StreamResult<Next> + Send + Sync + 'static,
1929 Close: Fn(Resource) -> StreamResult<Option<Next>> + Send + Sync + 'static,
1930 {
1931 let create = Arc::new(create);
1932 let stage = Arc::new(f);
1933 let close = Arc::new(close);
1934 self.via(Flow::from_transform(move |input| {
1935 Box::new(MapWithResourceStream {
1936 input,
1937 create: Arc::clone(&create),
1938 stage: Arc::clone(&stage),
1939 close: Arc::clone(&close),
1940 resource: None,
1941 created: false,
1942 pending_terminal: None,
1943 terminated: false,
1944 _marker: PhantomData,
1945 }) as BoxStream<Next>
1946 }))
1947 }
1948
1949 #[must_use]
1950 pub fn fold_result<Acc, F>(self, zero: Acc, f: F) -> Flow<In, Acc, Mat>
1951 where
1952 Acc: Clone + Send + Sync + 'static,
1953 F: Fn(Acc, Out) -> StreamResult<Acc> + Send + Sync + 'static,
1954 {
1955 let stage = Arc::new(f);
1956 self.via(Flow::from_transform(move |input| {
1957 let stage = Arc::clone(&stage);
1958 let mut acc = zero.clone();
1959 for item in input {
1960 match item {
1961 Ok(item) => match stage(acc, item) {
1962 Ok(next) => acc = next,
1963 Err(error) => return Box::new(std::iter::once(Err(error))),
1964 },
1965 Err(error) => return Box::new(std::iter::once(Err(error))),
1966 }
1967 }
1968 Box::new(std::iter::once(Ok(acc)))
1969 }))
1970 }
1971
1972 #[must_use]
1973 pub fn fold_result_with_supervision<Acc, F>(
1974 self,
1975 zero: Acc,
1976 f: F,
1977 decider: SupervisionDecider,
1978 ) -> Flow<In, Acc, Mat>
1979 where
1980 Acc: Clone + Send + Sync + 'static,
1981 F: Fn(Acc, Out) -> StreamResult<Acc> + Send + Sync + 'static,
1982 {
1983 let stage = Arc::new(f);
1984 self.via(Flow::from_transform(move |input| {
1985 let stage = Arc::clone(&stage);
1986 let decider = Arc::clone(&decider);
1987 let mut acc = zero.clone();
1988 for item in input {
1989 match item {
1990 Ok(item) => {
1991 let previous = acc;
1992 match call_supervised("fold_result callback", || {
1993 stage(previous.clone(), item)
1994 }) {
1995 Ok(next) => acc = next,
1996 Err(error) => match decide_supervision(&decider, &error) {
1997 SupervisionDirective::Stop => {
1998 return Box::new(std::iter::once(Err(error)));
1999 }
2000 SupervisionDirective::Resume => acc = previous,
2001 SupervisionDirective::Restart => acc = zero.clone(),
2002 },
2003 }
2004 }
2005 Err(error) => return Box::new(std::iter::once(Err(error))),
2006 }
2007 }
2008 Box::new(std::iter::once(Ok(acc)))
2009 }))
2010 }
2011
2012 #[must_use]
2013 pub fn reduce<F>(self, f: F) -> Flow<In, Out, Mat>
2014 where
2015 F: Fn(Out, Out) -> Out + Send + Sync + 'static,
2016 {
2017 let stage = Arc::new(f);
2018 self.via(Flow::from_transform(move |mut input| {
2019 let Some(first) = input.next() else {
2020 return Box::new(std::iter::once(Err(StreamError::EmptyStream)));
2021 };
2022 let mut acc = match first {
2023 Ok(item) => item,
2024 Err(error) => return Box::new(std::iter::once(Err(error))),
2025 };
2026 for item in input {
2027 match item {
2028 Ok(item) => acc = stage(acc, item),
2029 Err(error) => return Box::new(std::iter::once(Err(error))),
2030 }
2031 }
2032 Box::new(std::iter::once(Ok(acc)))
2033 }))
2034 }
2035
2036 #[must_use]
2037 pub fn reduce_result<F>(self, f: F) -> Flow<In, Out, Mat>
2038 where
2039 Out: Clone,
2040 F: Fn(Out, Out) -> StreamResult<Out> + Send + Sync + 'static,
2041 {
2042 let stage = Arc::new(f);
2043 self.via(Flow::from_transform(move |mut input| {
2044 let Some(first) = input.next() else {
2045 return Box::new(std::iter::once(Err(StreamError::EmptyStream)));
2046 };
2047 let mut acc = match first {
2048 Ok(item) => item,
2049 Err(error) => return Box::new(std::iter::once(Err(error))),
2050 };
2051 for item in input {
2052 match item {
2053 Ok(item) => match stage(acc, item) {
2054 Ok(next) => acc = next,
2055 Err(error) => return Box::new(std::iter::once(Err(error))),
2056 },
2057 Err(error) => return Box::new(std::iter::once(Err(error))),
2058 }
2059 }
2060 Box::new(std::iter::once(Ok(acc)))
2061 }))
2062 }
2063
2064 #[must_use]
2065 pub fn reduce_result_with_supervision<F>(
2066 self,
2067 f: F,
2068 decider: SupervisionDecider,
2069 ) -> Flow<In, Out, Mat>
2070 where
2071 Out: Clone,
2072 F: Fn(Out, Out) -> StreamResult<Out> + Send + Sync + 'static,
2073 {
2074 let stage = Arc::new(f);
2075 self.via(Flow::from_transform(move |input| {
2076 let stage = Arc::clone(&stage);
2077 let decider = Arc::clone(&decider);
2078 let mut acc = None::<Out>;
2079 for item in input {
2080 match item {
2081 Ok(item) => {
2082 let Some(previous) = acc.take() else {
2083 acc = Some(item);
2084 continue;
2085 };
2086 match call_supervised("reduce_result callback", || {
2087 stage(previous.clone(), item)
2088 }) {
2089 Ok(next) => acc = Some(next),
2090 Err(error) => match decide_supervision(&decider, &error) {
2091 SupervisionDirective::Stop => {
2092 return Box::new(std::iter::once(Err(error)));
2093 }
2094 SupervisionDirective::Resume => acc = Some(previous),
2095 SupervisionDirective::Restart => acc = None,
2096 },
2097 }
2098 }
2099 Err(error) => return Box::new(std::iter::once(Err(error))),
2100 }
2101 }
2102 match acc {
2103 Some(acc) => Box::new(std::iter::once(Ok(acc))),
2104 None => Box::new(std::iter::once(Err(StreamError::EmptyStream))),
2105 }
2106 }))
2107 }
2108
2109 #[must_use]
2110 pub fn map_error<F>(self, f: F) -> Flow<In, Out, Mat>
2111 where
2112 F: Fn(StreamError) -> StreamError + Send + Sync + 'static,
2113 {
2114 let stage = Arc::new(f);
2115 self.via(Flow::from_transform(move |input| {
2116 let stage = Arc::clone(&stage);
2117 Box::new(input.map(move |item| item.map_err(|error| stage(error))))
2118 }))
2119 }
2120
2121 #[must_use]
2122 pub fn recover<F>(self, f: F) -> Flow<In, Out, Mat>
2123 where
2124 F: Fn(StreamError) -> Option<Out> + Send + Sync + 'static,
2125 {
2126 let stage = Arc::new(f);
2127 self.via(Flow::from_transform(move |mut input| {
2128 let stage = Arc::clone(&stage);
2129 let mut done = false;
2130 Box::new(std::iter::from_fn(move || {
2131 if done {
2132 return None;
2133 }
2134 match input.next()? {
2135 Ok(item) => Some(Ok(item)),
2136 Err(error) => {
2137 done = true;
2138 match stage(error.clone()) {
2139 Some(item) => Some(Ok(item)),
2140 None => Some(Err(error)),
2141 }
2142 }
2143 }
2144 }))
2145 }))
2146 }
2147
2148 #[must_use]
2149 pub fn recover_with<F>(self, f: F) -> Flow<In, Out, Mat>
2150 where
2151 F: Fn(StreamError) -> Option<Source<Out>> + Send + Sync + 'static,
2152 {
2153 self.recover_with_attempts(None, f)
2155 }
2156
2157 #[must_use]
2158 pub fn recover_with_retries<F>(self, retries: usize, f: F) -> Flow<In, Out, Mat>
2159 where
2160 F: Fn(StreamError) -> Option<Source<Out>> + Send + Sync + 'static,
2161 {
2162 self.recover_with_attempts(Some(retries), f)
2163 }
2164
2165 fn recover_with_attempts<F>(self, attempts: Option<usize>, f: F) -> Flow<In, Out, Mat>
2166 where
2167 F: Fn(StreamError) -> Option<Source<Out>> + Send + Sync + 'static,
2168 {
2169 let stage = Arc::new(f);
2170 self.via(Flow::from_runtime_transform(move |input, materializer| {
2171 let replacement_materializer =
2172 materializer.with_name_prefix(materializer.name_prefix().to_owned());
2173 let stage = Arc::clone(&stage);
2174 let mut remaining_retries = attempts;
2176 let mut current = input;
2177 let mut terminated = false;
2178
2179 Ok(Box::new(std::iter::from_fn(move || {
2180 if terminated {
2181 return None;
2182 }
2183
2184 loop {
2185 match current.next() {
2186 Some(Ok(item)) => return Some(Ok(item)),
2187 Some(Err(error)) if remaining_retries != Some(0) => {
2188 if let Some(remaining) = remaining_retries.as_mut() {
2189 *remaining -= 1;
2190 }
2191 match stage(error.clone()) {
2192 Some(source) => match Arc::clone(&source.factory)
2193 .create(&replacement_materializer)
2194 {
2195 Ok((stream, _)) => current = stream,
2196 Err(error) => {
2197 terminated = true;
2198 return Some(Err(error));
2199 }
2200 },
2201 None => {
2202 terminated = true;
2203 return Some(Err(error));
2204 }
2205 }
2206 }
2207 Some(Err(error)) => {
2208 terminated = true;
2209 return Some(Err(error));
2210 }
2211 None => {
2212 terminated = true;
2213 return None;
2214 }
2215 }
2216 }
2217 })) as BoxStream<Out>)
2218 }))
2219 }
2220
2221 #[must_use]
2222 pub fn on_error_complete(self) -> Flow<In, Out, Mat> {
2223 self.via(Flow::from_transform(move |mut input| {
2224 let mut done = false;
2225 Box::new(std::iter::from_fn(move || {
2226 if done {
2227 return None;
2228 }
2229 match input.next()? {
2230 Ok(item) => Some(Ok(item)),
2231 Err(_) => {
2232 done = true;
2233 None
2234 }
2235 }
2236 }))
2237 }))
2238 }
2239
2240 #[must_use]
2241 pub fn prefix_and_tail(self, n: usize) -> Flow<In, (Vec<Out>, Source<Out>), Mat> {
2242 self.via(Flow::from_runtime_transform(move |input, _materializer| {
2243 Ok(prefix_and_tail_stream(input, n))
2244 }))
2245 }
2246
2247 #[must_use]
2248 pub fn flat_map_prefix<Next, NextMat, F>(self, n: usize, f: F) -> Flow<In, Next, Mat>
2249 where
2250 Next: Send + 'static,
2251 NextMat: Send + 'static,
2252 F: Fn(Vec<Out>) -> Flow<Out, Next, NextMat> + Send + Sync + 'static,
2253 Out: Clone,
2254 {
2255 let stage = Arc::new(f);
2256 self.via(Flow::from_runtime_transform(
2257 move |mut input, materializer| {
2258 let mut prefix = Vec::with_capacity(n);
2259 while prefix.len() < n {
2260 match input.next() {
2261 Some(Ok(item)) => prefix.push(item),
2262 Some(Err(error)) => {
2263 return Ok(Box::new(std::iter::once(Err(error))) as BoxStream<Next>);
2264 }
2265 None => break,
2266 }
2267 }
2268
2269 let flow = stage(prefix);
2270 let transform = flow.transform;
2271 let _ = (flow.materialize)()?;
2272 match transform {
2273 FlowTransform::Pure(transform) => Ok(transform(input)),
2274 FlowTransform::Runtime(transform) => transform(input, materializer),
2275 }
2276 },
2277 ))
2278 }
2279
2280 #[must_use]
2281 pub fn group_by<Key, F>(
2282 self,
2283 max_substreams: usize,
2284 f: F,
2285 allow_closed_substream_recreation: bool,
2286 ) -> Flow<In, Source<Out>, Mat>
2287 where
2288 Key: Clone + Eq + Hash + Send + 'static,
2289 F: Fn(&Out) -> Key + Send + Sync + 'static,
2290 Out: Clone,
2291 {
2292 self.group_by_with_batching(
2293 max_substreams,
2294 f,
2295 allow_closed_substream_recreation,
2296 GroupByBatchMode::Immediate,
2297 )
2298 }
2299
2300 pub(super) fn group_by_with_batching<Key, F>(
2301 self,
2302 max_substreams: usize,
2303 f: F,
2304 allow_closed_substream_recreation: bool,
2305 batch_mode: GroupByBatchMode,
2306 ) -> Flow<In, Source<Out>, Mat>
2307 where
2308 Key: Clone + Eq + Hash + Send + 'static,
2309 F: Fn(&Out) -> Key + Send + Sync + 'static,
2310 Out: Clone,
2311 {
2312 assert!(
2313 max_substreams > 0,
2314 "group_by max_substreams must be greater than zero"
2315 );
2316 let key_fn = Arc::new(f);
2317 self.via(Flow::from_runtime_transform(move |input, materializer| {
2318 Ok(group_by_stream(
2319 input,
2320 max_substreams,
2321 allow_closed_substream_recreation,
2322 Arc::clone(&key_fn),
2323 batch_mode,
2324 materializer,
2325 ))
2326 }))
2327 }
2328
2329 #[must_use]
2330 pub fn split_when<F>(self, predicate: F) -> Flow<In, Source<Out>, Mat>
2331 where
2332 F: Fn(&Out) -> bool + Send + Sync + 'static,
2333 Out: Clone,
2334 {
2335 let predicate = Arc::new(predicate);
2336 self.via(Flow::from_runtime_transform(move |input, materializer| {
2337 Ok(split_streams(
2338 input,
2339 SplitMode::When,
2340 Arc::clone(&predicate),
2341 materializer,
2342 ))
2343 }))
2344 }
2345
2346 #[must_use]
2347 pub fn split_after<F>(self, predicate: F) -> Flow<In, Source<Out>, Mat>
2348 where
2349 F: Fn(&Out) -> bool + Send + Sync + 'static,
2350 Out: Clone,
2351 {
2352 let predicate = Arc::new(predicate);
2353 self.via(Flow::from_runtime_transform(move |input, materializer| {
2354 Ok(split_streams(
2355 input,
2356 SplitMode::After,
2357 Arc::clone(&predicate),
2358 materializer,
2359 ))
2360 }))
2361 }
2362
2363 #[must_use]
2364 pub fn flat_map_concat<Next, NextMat, F>(self, f: F) -> Flow<In, Next, Mat>
2365 where
2366 Next: Send + 'static,
2367 NextMat: Send + 'static,
2368 F: Fn(Out) -> Source<Next, NextMat> + Send + Sync + 'static,
2369 {
2370 let stage = Arc::new(f);
2371 self.via(Flow::from_runtime_transform(move |input, materializer| {
2372 Ok(flat_map_concat_stream(
2373 input,
2374 Arc::clone(&stage),
2375 materializer,
2376 ))
2377 }))
2378 }
2379
2380 #[must_use]
2381 pub fn flat_map_merge<Next, NextMat, F>(self, breadth: usize, f: F) -> Flow<In, Next, Mat>
2382 where
2383 Next: Send + 'static,
2384 NextMat: Send + 'static,
2385 F: Fn(Out) -> Source<Next, NextMat> + Send + Sync + 'static,
2386 {
2387 assert!(
2388 breadth > 0,
2389 "flat_map_merge breadth must be greater than zero"
2390 );
2391 let stage = Arc::new(f);
2392 self.via(Flow::from_runtime_transform_with_hints(
2393 move |input, materializer| {
2394 Ok(flat_map_merge_stream(
2395 input,
2396 breadth,
2397 Arc::clone(&stage),
2398 materializer,
2399 ))
2400 },
2401 FlowHints::PRESERVES_TERMINAL_CONSUMER_BATCH,
2402 ))
2403 }
2404
2405 #[must_use]
2406 pub fn concat<Mat2>(self, that: Source<Out, Mat2>) -> Flow<In, Out, Mat>
2407 where
2408 Mat2: Send + 'static,
2409 {
2410 self.concat_sources([that])
2411 }
2412
2413 #[must_use]
2414 pub fn concat_lazy<Mat2>(self, that: Source<Out, Mat2>) -> Flow<In, Out, Mat>
2415 where
2416 Mat2: Send + 'static,
2417 {
2418 let that_factory = that.factory;
2419 self.via(Flow::from_runtime_transform(move |input, materializer| {
2420 let primary = input;
2421 Ok(concat_streams_lazy(
2422 primary,
2423 vec![Arc::clone(&that_factory)],
2424 materializer,
2425 ))
2426 }))
2427 }
2428
2429 #[must_use]
2430 pub fn concat_all_lazy<Mat2, I>(self, those: I) -> Flow<In, Out, Mat>
2431 where
2432 Mat2: Send + 'static,
2433 I: IntoIterator<Item = Source<Out, Mat2>>,
2434 {
2435 let source_factories: Vec<_> = those.into_iter().map(|source| source.factory).collect();
2436 self.via(Flow::from_runtime_transform(move |input, materializer| {
2437 Ok(concat_streams_lazy(
2438 input,
2439 source_factories.clone(),
2440 materializer,
2441 ))
2442 }))
2443 }
2444
2445 #[must_use]
2446 pub fn prepend<Mat2>(self, that: Source<Out, Mat2>) -> Flow<In, Out, Mat>
2447 where
2448 Mat2: Send + 'static,
2449 {
2450 self.prepend_sources([that])
2451 }
2452
2453 #[must_use]
2454 pub fn prepend_lazy<Mat2>(self, that: Source<Out, Mat2>) -> Flow<In, Out, Mat>
2455 where
2456 Mat2: Send + 'static,
2457 {
2458 self.prepend(that)
2459 }
2460
2461 #[must_use]
2462 pub fn or_else<Mat2>(self, secondary: Source<Out, Mat2>) -> Flow<In, Out, Mat>
2463 where
2464 Mat2: Send + 'static,
2465 {
2466 let secondary_factory = secondary.factory;
2467 self.via(Flow::from_runtime_transform(move |input, materializer| {
2468 let secondary = match Arc::clone(&secondary_factory).create(materializer) {
2469 Ok((stream, _)) => stream,
2470 Err(error) => Box::new(std::iter::once(Err(error))) as BoxStream<Out>,
2471 };
2472 Ok(or_else_stream(input, secondary))
2473 }))
2474 }
2475
2476 #[must_use]
2477 pub fn interleave<Mat2>(
2478 self,
2479 that: Source<Out, Mat2>,
2480 segment_size: usize,
2481 ) -> Flow<In, Out, Mat>
2482 where
2483 Mat2: Send + 'static,
2484 {
2485 self.interleave_all([that], segment_size, false)
2486 }
2487
2488 #[must_use]
2489 pub fn interleave_all<Mat2, I>(
2490 self,
2491 those: I,
2492 segment_size: usize,
2493 eager_close: bool,
2494 ) -> Flow<In, Out, Mat>
2495 where
2496 Mat2: Send + 'static,
2497 I: IntoIterator<Item = Source<Out, Mat2>>,
2498 {
2499 let source_factories: Vec<_> = those.into_iter().map(|source| source.factory).collect();
2500 self.via(Flow::from_runtime_transform(move |input, materializer| {
2501 let mut streams = Vec::with_capacity(source_factories.len() + 1);
2502 streams.push(input);
2503 for factory in &source_factories {
2504 let stream = match Arc::clone(factory).create(materializer) {
2505 Ok((stream, _)) => stream,
2506 Err(error) => {
2507 return Ok(Box::new(std::iter::once(Err(error))) as BoxStream<Out>);
2508 }
2509 };
2510 streams.push(stream);
2511 }
2512 Ok(interleave_streams(streams, segment_size, eager_close))
2513 }))
2514 }
2515
2516 #[must_use]
2517 pub fn merge_sorted<Mat2>(self, that: Source<Out, Mat2>) -> Flow<In, Out, Mat>
2518 where
2519 Out: Ord,
2520 Mat2: Send + 'static,
2521 {
2522 let source_factory = that.factory;
2523 self.via(Flow::from_runtime_transform(move |input, materializer| {
2524 let other = match Arc::clone(&source_factory).create(materializer) {
2525 Ok((stream, _)) => stream,
2526 Err(error) => return Ok(Box::new(std::iter::once(Err(error))) as BoxStream<Out>),
2527 };
2528 Ok(merge_sorted_stream(input, other))
2529 }))
2530 }
2531
2532 #[must_use]
2533 pub fn merge_latest<Mat2>(
2534 self,
2535 that: Source<Out, Mat2>,
2536 eager_complete: bool,
2537 ) -> Flow<In, Vec<Out>, Mat>
2538 where
2539 Out: Clone,
2540 Mat2: Send + 'static,
2541 {
2542 let source_factory = that.factory;
2543 self.via(Flow::from_runtime_transform(move |input, materializer| {
2544 let other = match Arc::clone(&source_factory).create(materializer) {
2545 Ok((stream, _)) => stream,
2546 Err(error) => {
2547 return Ok(Box::new(std::iter::once(Err(error))) as BoxStream<Vec<Out>>);
2548 }
2549 };
2550 Ok(merge_latest_streams(vec![input, other], eager_complete))
2551 }))
2552 }
2553
2554 #[must_use]
2555 pub fn merge_all<Mat2, I>(self, those: I, eager_complete: bool) -> Flow<In, Out, Mat>
2556 where
2557 Mat2: Send + 'static,
2558 I: IntoIterator<Item = Source<Out, Mat2>>,
2559 {
2560 let source_factories: Vec<_> = those.into_iter().map(|source| source.factory).collect();
2561 self.via(Flow::from_runtime_transform(move |input, materializer| {
2562 let mut streams = Vec::with_capacity(source_factories.len() + 1);
2563 streams.push(input);
2564 for factory in &source_factories {
2565 let stream = match Arc::clone(factory).create(materializer) {
2566 Ok((stream, _)) => stream,
2567 Err(error) => {
2568 return Ok(Box::new(std::iter::once(Err(error))) as BoxStream<Out>);
2569 }
2570 };
2571 streams.push(stream);
2572 }
2573 Ok(merge_streams(streams, eager_complete))
2574 }))
2575 }
2576
2577 #[must_use]
2578 pub fn zip_with<Mat2, Out2, Next, F>(
2579 self,
2580 that: Source<Out2, Mat2>,
2581 combine: F,
2582 ) -> Flow<In, Next, Mat>
2583 where
2584 Out2: Send + 'static,
2585 Next: Send + 'static,
2586 Mat2: Send + 'static,
2587 F: Fn(Out, Out2) -> Next + Send + Sync + 'static,
2588 {
2589 let source_factory = that.factory;
2590 let combine = Arc::new(combine);
2591 self.via(Flow::from_runtime_transform(move |input, materializer| {
2592 let other = match Arc::clone(&source_factory).create(materializer) {
2593 Ok((stream, _)) => stream,
2594 Err(error) => return Ok(Box::new(std::iter::once(Err(error))) as BoxStream<Next>),
2595 };
2596 let combine = Arc::clone(&combine);
2597 Ok(Box::new(
2598 zip_streams(input, other)
2599 .map(move |item| item.map(|(left, right)| combine(left, right))),
2600 ) as BoxStream<Next>)
2601 }))
2602 }
2603
2604 #[must_use]
2605 pub fn zip_latest<Mat2, Out2>(self, that: Source<Out2, Mat2>) -> Flow<In, (Out, Out2), Mat>
2606 where
2607 Out: Clone,
2608 Out2: Clone + Send + 'static,
2609 Mat2: Send + 'static,
2610 {
2611 self.zip_latest_with(that, true, |left, right| (left, right))
2612 }
2613
2614 #[must_use]
2615 pub fn zip_latest_with<Mat2, Out2, Next, F>(
2616 self,
2617 that: Source<Out2, Mat2>,
2618 eager_complete: bool,
2619 combine: F,
2620 ) -> Flow<In, Next, Mat>
2621 where
2622 Out: Clone,
2623 Out2: Clone + Send + 'static,
2624 Next: Send + 'static,
2625 Mat2: Send + 'static,
2626 F: Fn(Out, Out2) -> Next + Send + Sync + 'static,
2627 {
2628 let source_factory = that.factory;
2629 let combine = Arc::new(combine);
2630 self.via(Flow::from_runtime_transform(move |input, materializer| {
2631 let other = match Arc::clone(&source_factory).create(materializer) {
2632 Ok((stream, _)) => stream,
2633 Err(error) => return Ok(Box::new(std::iter::once(Err(error))) as BoxStream<Next>),
2634 };
2635 Ok(zip_latest_with_stream(
2636 input,
2637 other,
2638 eager_complete,
2639 Arc::clone(&combine),
2640 ))
2641 }))
2642 }
2643
2644 #[must_use]
2645 pub fn zip_with_index(self) -> Flow<In, (Out, u64), Mat> {
2646 self.via(Flow::from_runtime_transform(
2647 move |mut input, _materializer| {
2648 let mut index = 0_u64;
2649 Ok(Box::new(std::iter::from_fn(move || {
2650 input.next().map(|item| {
2651 item.map(|value| {
2652 let pair = (value, index);
2653 index = index.wrapping_add(1);
2654 pair
2655 })
2656 })
2657 })) as BoxStream<(Out, u64)>)
2658 },
2659 ))
2660 }
2661
2662 #[must_use]
2663 pub fn zip_all<Mat2, Out2>(
2664 self,
2665 that: Source<Out2, Mat2>,
2666 this_elem: Out,
2667 that_elem: Out2,
2668 ) -> Flow<In, (Out, Out2), Mat>
2669 where
2670 Out: Clone + Sync,
2671 Out2: Clone + Send + Sync + 'static,
2672 Mat2: Send + 'static,
2673 {
2674 let source_factory = that.factory;
2675 self.via(Flow::from_runtime_transform(move |input, materializer| {
2676 let other = match Arc::clone(&source_factory).create(materializer) {
2677 Ok((stream, _)) => stream,
2678 Err(error) => {
2679 return Ok(Box::new(std::iter::once(Err(error))) as BoxStream<(Out, Out2)>);
2680 }
2681 };
2682 Ok(zip_all_stream(
2683 input,
2684 other,
2685 this_elem.clone(),
2686 that_elem.clone(),
2687 ))
2688 }))
2689 }
2690
2691 #[must_use]
2692 pub fn also_to<SideMat>(self, sink: Sink<Out, SideMat>) -> Flow<In, Out, Mat>
2693 where
2694 Out: Clone,
2695 SideMat: Send + 'static,
2696 {
2697 self.via(Flow::from_runtime_transform(
2698 move |mut input: BoxStream<Out>, materializer| {
2699 let (side_sender, side_mat) = materialize_side_sink(&sink, materializer, 0)?;
2700 let mut sender = Some(side_sender);
2701 let side_mat = side_mat;
2702 Ok(Box::new(std::iter::from_fn(move || match input.next() {
2703 Some(Ok(item)) => {
2704 let _ = &side_mat;
2705 if sender
2706 .as_ref()
2707 .is_some_and(|sender| sender.send(Ok(item.clone())).is_err())
2708 {
2709 sender = None;
2710 return None;
2711 }
2712 Some(Ok(item))
2713 }
2714 Some(Err(error)) => {
2715 let _ = &side_mat;
2716 let _ = sender
2717 .as_ref()
2718 .and_then(|sender| sender.send(Err(error.clone())).ok());
2719 sender = None;
2720 Some(Err(error))
2721 }
2722 None => {
2723 let _ = &side_mat;
2724 sender = None;
2725 None
2726 }
2727 })) as BoxStream<Out>)
2728 },
2729 ))
2730 }
2731
2732 #[must_use]
2733 pub fn also_to_all<SideMat, I>(self, sinks: I) -> Flow<In, Out, Mat>
2734 where
2735 Out: Clone,
2736 SideMat: Send + 'static,
2737 I: IntoIterator<Item = Sink<Out, SideMat>>,
2738 {
2739 let sinks: Vec<_> = sinks.into_iter().collect();
2740 if sinks.is_empty() {
2741 return self;
2742 }
2743
2744 self.via(Flow::from_runtime_transform(
2745 move |mut input: BoxStream<Out>, materializer| {
2746 let mut sides = sinks
2747 .iter()
2748 .map(|sink| materialize_side_sink(sink, materializer, 0))
2749 .collect::<StreamResult<Vec<_>>>()?;
2750 Ok(Box::new(std::iter::from_fn(move || match input.next() {
2751 Some(Ok(item)) => {
2752 for (sender, _) in &sides {
2753 if sender.send(Ok(item.clone())).is_err() {
2754 sides.clear();
2755 return None;
2756 }
2757 }
2758 Some(Ok(item))
2759 }
2760 Some(Err(error)) => {
2761 for (sender, _) in &sides {
2762 let _ = sender.send(Err(error.clone())).ok();
2763 }
2764 sides.clear();
2765 Some(Err(error))
2766 }
2767 None => {
2768 sides.clear();
2769 None
2770 }
2771 })) as BoxStream<Out>)
2772 },
2773 ))
2774 }
2775
2776 #[must_use]
2777 pub fn divert_to<SideMat, F>(self, sink: Sink<Out, SideMat>, predicate: F) -> Flow<In, Out, Mat>
2778 where
2779 SideMat: Send + 'static,
2780 F: Fn(&Out) -> bool + Send + Sync + 'static,
2781 {
2782 let predicate = Arc::new(predicate);
2783 self.via(Flow::from_runtime_transform(
2784 move |mut input: BoxStream<Out>, materializer| {
2785 let predicate = Arc::clone(&predicate);
2786 let (side_sender, side_mat) = materialize_side_sink(&sink, materializer, 0)?;
2787 let mut sender = Some(side_sender);
2788 let side_mat = side_mat;
2789 Ok(Box::new(std::iter::from_fn(move || {
2790 loop {
2791 let _ = &side_mat;
2792 match input.next() {
2793 Some(Ok(item)) if predicate(&item) => {
2794 if sender
2795 .as_ref()
2796 .is_some_and(|sender| sender.send(Ok(item)).is_err())
2797 {
2798 sender = None;
2799 return None;
2800 }
2801 }
2802 Some(Ok(item)) => return Some(Ok(item)),
2803 Some(Err(error)) => {
2804 let _ = sender
2805 .as_ref()
2806 .and_then(|sender| sender.send(Err(error.clone())).ok());
2807 sender = None;
2808 return Some(Err(error));
2809 }
2810 None => {
2811 sender = None;
2812 return None;
2813 }
2814 }
2815 }
2816 })) as BoxStream<Out>)
2817 },
2818 ))
2819 }
2820
2821 #[must_use]
2822 pub fn wire_tap<SideMat>(self, sink: Sink<Out, SideMat>) -> Flow<In, Out, Mat>
2823 where
2824 Out: Clone,
2825 SideMat: Send + 'static,
2826 {
2827 self.via(Flow::from_runtime_transform(
2828 move |mut input: BoxStream<Out>, materializer| {
2829 let (side_sender, side_mat) = materialize_side_sink(&sink, materializer, 1)?;
2830 let mut sender = Some(side_sender);
2831 let side_mat = side_mat;
2832 Ok(Box::new(std::iter::from_fn(move || match input.next() {
2833 Some(Ok(item)) => {
2834 let _ = &side_mat;
2835 if let Some(side) = sender.as_ref() {
2836 match side.try_send(Ok(item.clone())) {
2837 Ok(()) | Err(std::sync::mpsc::TrySendError::Full(_)) => {}
2838 Err(std::sync::mpsc::TrySendError::Disconnected(_)) => {
2839 sender = None
2840 }
2841 }
2842 }
2843 Some(Ok(item))
2844 }
2845 Some(Err(error)) => {
2846 let _ = &side_mat;
2847 if let Some(side) = sender.as_ref() {
2848 match side.try_send(Err(error.clone())) {
2849 Ok(())
2850 | Err(std::sync::mpsc::TrySendError::Full(_))
2851 | Err(std::sync::mpsc::TrySendError::Disconnected(_)) => {}
2852 }
2853 }
2854 sender = None;
2855 Some(Err(error))
2856 }
2857 None => {
2858 let _ = &side_mat;
2859 sender = None;
2860 None
2861 }
2862 })) as BoxStream<Out>)
2863 },
2864 ))
2865 }
2866
2867 fn concat_sources<Mat2, I>(self, those: I) -> Flow<In, Out, Mat>
2868 where
2869 Mat2: Send + 'static,
2870 I: IntoIterator<Item = Source<Out, Mat2>>,
2871 {
2872 let source_factories: Vec<_> = those.into_iter().map(|source| source.factory).collect();
2873 self.via(Flow::from_runtime_transform(move |input, materializer| {
2874 let mut streams = Vec::with_capacity(source_factories.len() + 1);
2875 streams.push(input);
2876 for factory in &source_factories {
2877 let stream = match Arc::clone(factory).create(materializer) {
2878 Ok((stream, _)) => stream,
2879 Err(error) => {
2880 return Ok(Box::new(std::iter::once(Err(error))) as BoxStream<Out>);
2881 }
2882 };
2883 streams.push(stream);
2884 }
2885 Ok(concat_streams(streams))
2886 }))
2887 }
2888
2889 fn prepend_sources<Mat2, I>(self, those: I) -> Flow<In, Out, Mat>
2890 where
2891 Mat2: Send + 'static,
2892 I: IntoIterator<Item = Source<Out, Mat2>>,
2893 {
2894 let source_factories: Vec<_> = those.into_iter().map(|source| source.factory).collect();
2895 self.via(Flow::from_runtime_transform(move |input, materializer| {
2896 let mut streams = Vec::with_capacity(source_factories.len() + 1);
2897 for factory in &source_factories {
2898 let stream = match Arc::clone(factory).create(materializer) {
2899 Ok((stream, _)) => stream,
2900 Err(error) => {
2901 return Ok(Box::new(std::iter::once(Err(error))) as BoxStream<Out>);
2902 }
2903 };
2904 streams.push(stream);
2905 }
2906 streams.push(input);
2907 Ok(concat_streams(streams))
2908 }))
2909 }
2910
2911 #[must_use]
2914 pub fn intersperse(self, inject: Out) -> Flow<In, Out, Mat>
2915 where
2916 Out: Clone + Sync,
2917 {
2918 let inject = Arc::new(inject);
2919 self.via(Flow::from_transform(move |mut input| {
2920 let inject = Arc::clone(&inject);
2921 let mut first = true;
2922 Box::new(std::iter::from_fn(move || {
2923 if first {
2924 first = false;
2925 match input.next() {
2926 None => None,
2927 Some(item) => {
2928 if item.is_err() {
2929 first = true;
2930 }
2931 Some(item)
2932 }
2933 }
2934 } else {
2935 match input.next() {
2936 None => None,
2937 Some(item) => {
2938 if item.is_err() {
2939 first = true;
2940 Some(item)
2941 } else {
2942 Some(Ok((*inject).clone()))
2943 }
2944 }
2945 }
2946 }
2947 }))
2948 }))
2949 }
2950
2951 #[must_use]
2952 pub fn flatten_optional<Inner>(self) -> Flow<In, Inner, Mat>
2953 where
2954 Out: Into<Option<Inner>>,
2955 Inner: Send + 'static,
2956 {
2957 self.filter_map(|item| item.into())
2958 }
2959
2960 #[must_use]
2961 pub fn grouped_weighted<F>(self, max_weight: usize, cost_fn: F) -> Flow<In, Vec<Out>, Mat>
2962 where
2963 F: Fn(&Out) -> usize + Send + Sync + 'static,
2964 {
2965 let cost_fn = Arc::new(cost_fn);
2966 self.via(Flow::from_transform(move |mut input| {
2967 let cost_fn = Arc::clone(&cost_fn);
2968 Box::new(std::iter::from_fn(move || {
2969 let mut group = Vec::new();
2970 let mut weight = 0usize;
2971 while weight < max_weight {
2972 match input.next() {
2973 Some(Ok(item)) => {
2974 let item_weight = cost_fn(&item);
2975 if weight > 0 && weight + item_weight > max_weight {
2976 group.push(item);
2977 break;
2978 }
2979 weight += item_weight;
2980 group.push(item);
2981 }
2982 Some(Err(error)) => return Some(Err(error)),
2983 None => break,
2984 }
2985 }
2986 if group.is_empty() {
2987 None
2988 } else {
2989 Some(Ok(group))
2990 }
2991 }))
2992 }))
2993 }
2994
2995 #[must_use]
2996 pub fn limit_weighted<F>(self, max_weight: usize, cost_fn: F) -> Flow<In, Out, Mat>
2997 where
2998 F: Fn(&Out) -> usize + Send + Sync + 'static,
2999 {
3000 let cost_fn = Arc::new(cost_fn);
3001 self.via(Flow::from_transform(move |mut input| {
3002 let cost_fn = Arc::clone(&cost_fn);
3003 let mut weight = 0usize;
3004 Box::new(std::iter::from_fn(move || match input.next()? {
3005 Ok(item) => {
3006 let item_weight = cost_fn(&item);
3007 if weight + item_weight > max_weight {
3008 Some(Err(StreamError::LimitExceeded {
3009 max: max_weight as u64,
3010 }))
3011 } else {
3012 weight += item_weight;
3013 Some(Ok(item))
3014 }
3015 }
3016 Err(error) => Some(Err(error)),
3017 }))
3018 }))
3019 }
3020
3021 #[must_use]
3022 pub fn contramap<NewIn, F>(self, f: F) -> Flow<NewIn, Out, Mat>
3023 where
3024 NewIn: Send + 'static,
3025 F: Fn(NewIn) -> In + Send + Sync + 'static,
3026 {
3027 let stage = Arc::new(f);
3028 Flow::from_transform(move |input| {
3029 let stage = Arc::clone(&stage);
3030 Box::new(input.map(move |item| item.map(|item| stage(item))))
3031 })
3032 .via_mat(self, Keep::right)
3033 }
3034
3035 #[must_use]
3036 pub fn monitor<F>(self, f: F) -> Flow<In, Out, Mat>
3037 where
3038 Out: Clone,
3039 F: Fn(&Out) + Send + Sync + 'static,
3040 {
3041 let stage = Arc::new(f);
3042 self.via(Flow::from_transform(move |input| {
3043 let stage = Arc::clone(&stage);
3044 Box::new(input.map(move |item| match item {
3045 Ok(item) => {
3046 stage(&item);
3047 Ok(item)
3048 }
3049 Err(error) => Err(error),
3050 }))
3051 }))
3052 }
3053
3054 #[must_use]
3055 pub fn watch_termination<CallbackMat, F>(self, materialize_callback: F) -> Flow<In, Out, Mat>
3056 where
3057 Mat: Clone,
3058 CallbackMat: Send + 'static,
3059 F: Fn(Mat) -> CallbackMat + Send + Sync + 'static,
3060 {
3061 let cb = Arc::new(materialize_callback);
3062 self.map_materialized_value(move |mat| {
3063 let _ = cb(mat.clone());
3064 mat
3065 })
3066 }
3067
3068 #[must_use]
3069 pub fn to<SinkMat>(self, sink: Sink<Out, SinkMat>) -> Sink<In, Mat>
3070 where
3071 SinkMat: Send + 'static,
3072 {
3073 self.to_mat(sink, Keep::left)
3074 }
3075
3076 #[must_use]
3077 pub fn to_mat<SinkMat, Combined, F>(
3078 self,
3079 sink: Sink<Out, SinkMat>,
3080 combine: F,
3081 ) -> Sink<In, Combined>
3082 where
3083 SinkMat: Send + 'static,
3084 Combined: Send + 'static,
3085 F: Fn(Mat, SinkMat) -> Combined + Send + Sync + 'static,
3086 {
3087 let transform = self.transform;
3088 let materialize = self.materialize;
3089 let combine = Arc::new(combine);
3090 Sink::from_runner(move |input, materializer| {
3091 let flow_mat = materialize()?;
3092 let input = match &transform {
3093 FlowTransform::Pure(transform) => transform(input),
3094 FlowTransform::Runtime(transform) => transform(input, materializer)?,
3095 };
3096 let sink_mat = sink.run(input, materializer)?;
3097 Ok(combine(flow_mat, sink_mat))
3098 })
3099 }
3100
3101 #[must_use]
3102 pub fn map_materialized_value<NextMat, F>(self, f: F) -> Flow<In, Out, NextMat>
3103 where
3104 NextMat: Send + 'static,
3105 F: Fn(Mat) -> NextMat + Send + Sync + 'static,
3106 {
3107 let transform = self.transform;
3108 let materialize = self.materialize;
3109 let hints = self.hints;
3110 let attributes = self.attributes;
3111 let f = Arc::new(f);
3112 match transform {
3113 FlowTransform::Pure(transform) => Flow::from_parts_with_hints(
3114 move |input| transform(input),
3115 move || {
3116 let mat = materialize()?;
3117 Ok(f(mat))
3118 },
3119 hints,
3120 )
3121 .with_attributes(attributes),
3122 FlowTransform::Runtime(transform) => Flow {
3123 transform: FlowTransform::Runtime(transform),
3124 materialize: Arc::new(move || {
3125 let mat = materialize()?;
3126 Ok(f(mat))
3127 }),
3128 hints,
3129 attributes,
3130 },
3131 }
3132 }
3133}
3134
/// Iterator adapter behind the `map_with_resource` stage.
///
/// - `create` lazily opens a resource on the first poll.
/// - Each upstream element goes through `stage` with mutable access to that
///   resource.
/// - `close` disposes of the resource and may yield one final element. That
///   element is emitted before any terminal error that triggered the close.
///
/// The bounds are repeated on the struct because its `Drop` impl needs them,
/// and Rust requires `Drop` impls to use exactly the struct's bounds.
struct MapWithResourceStream<In, Out, Resource, Create, F, Close>
where
    Create: Fn() -> StreamResult<Resource>,
    F: Fn(&mut Resource, In) -> StreamResult<Out>,
    Close: Fn(Resource) -> StreamResult<Option<Out>>,
{
    // Upstream elements to be mapped.
    input: BoxStream<In>,
    // Resource factory; attempted at most once (guarded by `created`).
    create: Arc<Create>,
    // Per-element mapping function.
    stage: Arc<F>,
    // Finalizer; may produce one trailing element.
    close: Arc<Close>,
    // `Some` while the resource is open.
    resource: Option<Resource>,
    // Set as soon as `create` has been attempted, whether or not it succeeded.
    created: bool,
    // Error to emit on the poll after the trailing element produced by `close`.
    pending_terminal: Option<StreamError>,
    // Once set, the iterator yields `None` (after draining `pending_terminal`).
    terminated: bool,
    _marker: PhantomData<fn() -> Out>,
}
3151
3152impl<In, Out, Resource, Create, F, Close> MapWithResourceStream<In, Out, Resource, Create, F, Close>
3153where
3154 Create: Fn() -> StreamResult<Resource>,
3155 F: Fn(&mut Resource, In) -> StreamResult<Out>,
3156 Close: Fn(Resource) -> StreamResult<Option<Out>>,
3157{
3158 fn ensure_created(&mut self) -> StreamResult<()> {
3159 if self.created {
3160 return Ok(());
3161 }
3162 self.created = true;
3163 let resource = catch_unwind_failed("map_with_resource create", || (self.create)())
3164 .and_then(|result| result)?;
3165 self.resource = Some(resource);
3166 Ok(())
3167 }
3168
3169 fn close_resource(&mut self) -> StreamResult<Option<Out>> {
3170 match self.resource.take() {
3171 Some(resource) => {
3172 catch_unwind_failed("map_with_resource close", || (self.close)(resource))
3173 .and_then(|result| result)
3174 }
3175 None => Ok(None),
3176 }
3177 }
3178
3179 fn close_with_terminal(&mut self, terminal: Option<StreamError>) -> Option<StreamResult<Out>> {
3180 self.terminated = terminal.is_none();
3181 match self.close_resource() {
3182 Ok(Some(item)) => {
3183 self.pending_terminal = terminal;
3184 Some(Ok(item))
3185 }
3186 Ok(None) => match terminal {
3187 Some(error) => {
3188 self.terminated = true;
3189 Some(Err(error))
3190 }
3191 None => {
3192 self.terminated = true;
3193 None
3194 }
3195 },
3196 Err(error) => {
3197 self.terminated = true;
3198 Some(Err(terminal.unwrap_or(error)))
3199 }
3200 }
3201 }
3202}
3203
3204impl<In, Out, Resource, Create, F, Close> Iterator
3205 for MapWithResourceStream<In, Out, Resource, Create, F, Close>
3206where
3207 Create: Fn() -> StreamResult<Resource>,
3208 F: Fn(&mut Resource, In) -> StreamResult<Out>,
3209 Close: Fn(Resource) -> StreamResult<Option<Out>>,
3210{
3211 type Item = StreamResult<Out>;
3212
3213 fn next(&mut self) -> Option<Self::Item> {
3214 if let Some(error) = self.pending_terminal.take() {
3215 self.terminated = true;
3216 return Some(Err(error));
3217 }
3218 if self.terminated {
3219 return None;
3220 }
3221 if let Err(error) = self.ensure_created() {
3222 self.terminated = true;
3223 return Some(Err(error));
3224 }
3225
3226 match self.input.next() {
3227 Some(Ok(item)) => {
3228 let result = {
3229 let resource = self
3230 .resource
3231 .as_mut()
3232 .expect("map_with_resource resource is open");
3233 catch_unwind_failed("map_with_resource function", || {
3234 (self.stage)(resource, item)
3235 })
3236 .and_then(|result| result)
3237 };
3238 match result {
3239 Ok(item) => Some(Ok(item)),
3240 Err(error) => self.close_with_terminal(Some(error)),
3241 }
3242 }
3243 Some(Err(error)) => self.close_with_terminal(Some(error)),
3244 None => self.close_with_terminal(None),
3245 }
3246 }
3247}
3248
3249impl<I1: Send + 'static, O1: Send + 'static, I2: Send + 'static, O2: Send + 'static>
3250 BidiFlow<I1, O1, I2, O2>
3251{
3252 #[must_use]
3253 pub fn from_flows<Mat1, Mat2>(top: Flow<I1, O1, Mat1>, bottom: Flow<I2, O2, Mat2>) -> Self
3254 where
3255 Mat1: Send + 'static,
3256 Mat2: Send + 'static,
3257 {
3258 Self {
3259 top: top.map_materialized_value(|_| NotUsed),
3260 bottom: bottom.map_materialized_value(|_| NotUsed),
3261 attributes: Attributes::default(),
3262 }
3263 }
3264}
3265
3266impl<I1: Send + 'static, O1: Send + 'static, I2: Send + 'static, O2: Send + 'static>
3267 BidiFlow<I1, O1, I2, O2>
3268{
3269 #[must_use]
3270 pub fn attributes(&self) -> &Attributes {
3271 &self.attributes
3272 }
3273
3274 #[must_use]
3275 pub fn with_attributes(mut self, attributes: Attributes) -> Self {
3276 self.attributes = attributes;
3277 self
3278 }
3279
3280 #[must_use]
3281 pub fn add_attributes(mut self, attributes: Attributes) -> Self {
3282 self.attributes = self.attributes.and(attributes);
3283 self
3284 }
3285
3286 #[must_use]
3287 pub fn named(self, name: impl Into<String>) -> Self {
3288 self.add_attributes(Attributes::named(name))
3289 }
3290
3291 #[must_use]
3292 pub fn join<Mat2>(self, flow: Flow<O1, I2, Mat2>) -> Flow<I1, O2, NotUsed>
3293 where
3294 Mat2: Send + 'static,
3295 {
3296 self.top
3297 .via(flow)
3298 .via(self.bottom)
3299 .map_materialized_value(|_| NotUsed)
3300 .with_attributes(self.attributes)
3301 }
3302
3303 #[must_use]
3304 pub fn atop<OO1: Send + 'static, II2: Send + 'static>(
3305 self,
3306 bidi: BidiFlow<O1, OO1, II2, I2>,
3307 ) -> BidiFlow<I1, OO1, II2, O2> {
3308 BidiFlow {
3309 top: self.top.via(bidi.top).map_materialized_value(|_| NotUsed),
3310 bottom: bidi
3311 .bottom
3312 .via(self.bottom)
3313 .map_materialized_value(|_| NotUsed),
3314 attributes: self.attributes.and(bidi.attributes),
3315 }
3316 }
3317
3318 #[must_use]
3319 pub fn reversed(self) -> BidiFlow<I2, O2, I1, O1> {
3320 BidiFlow {
3321 top: self.bottom,
3322 bottom: self.top.map_materialized_value(|_| NotUsed),
3323 attributes: self.attributes,
3324 }
3325 }
3326}
3327
3328impl<In, Out, Resource, Create, F, Close> Drop
3329 for MapWithResourceStream<In, Out, Resource, Create, F, Close>
3330where
3331 Create: Fn() -> StreamResult<Resource>,
3332 F: Fn(&mut Resource, In) -> StreamResult<Out>,
3333 Close: Fn(Resource) -> StreamResult<Option<Out>>,
3334{
3335 fn drop(&mut self) {
3336 let _ = self.close_resource();
3337 }
3338}
3339
3340fn materialize_inner_flow<In, Out, InnerMat>(
3341 flow: Flow<In, Out, InnerMat>,
3342 input: BoxStream<In>,
3343 materializer: &Materializer,
3344) -> StreamResult<(BoxStream<Out>, InnerMat)>
3345where
3346 In: Send + 'static,
3347 Out: Send + 'static,
3348 InnerMat: Send + 'static,
3349{
3350 let mat = (flow.materialize)()?;
3351 let stream = match flow.transform {
3352 FlowTransform::Pure(transform) => transform(input),
3353 FlowTransform::Runtime(transform) => transform(input, materializer)?,
3354 };
3355 Ok((stream, mat))
3356}
3357
/// Iterator that, on its first poll:
/// 1. obtains a `Flow` by running the future produced by `future`;
/// 2. materializes that flow over `input`;
/// 3. from then on forwards the resulting stream.
///
/// The inner flow's materialized value, or the failure to produce it, is
/// reported through `mat_sender` at most once.
struct FutureFlowStream<In, Out, InnerMat, F, Fut> {
    // Produces the future that resolves to the inner flow.
    future: Arc<F>,
    materializer: Materializer,
    // Upstream; handed to the inner flow during initialization.
    input: Option<BoxStream<In>>,
    // The inner flow's output once initialized.
    current: Option<BoxStream<Out>>,
    // Taken when the materialized value (or error) is reported.
    mat_sender: Option<oneshot::Sender<StreamResult<InnerMat>>>,
    initialized: bool,
    terminated: bool,
    // `Fut` only appears in `F`'s bounds on the impls; the marker ties it to the type.
    _marker: PhantomData<fn() -> Fut>,
}
3368
3369impl<In, Out, InnerMat, F, Fut> FutureFlowStream<In, Out, InnerMat, F, Fut>
3370where
3371 In: Send + 'static,
3372 Out: Send + 'static,
3373 InnerMat: Send + 'static,
3374 F: Fn() -> Fut,
3375 Fut: Future<Output = StreamResult<Flow<In, Out, InnerMat>>> + Send + 'static,
3376{
3377 fn complete_mat(&mut self, result: StreamResult<InnerMat>) {
3378 if let Some(sender) = self.mat_sender.take() {
3379 let _ = sender.send(result);
3380 }
3381 }
3382
3383 fn initialize(&mut self) -> StreamResult<()> {
3384 if self.initialized {
3385 return Ok(());
3386 }
3387 self.initialized = true;
3388 let flow = match catch_unwind_failed("future_flow factory", || (self.future)())
3389 .and_then(run_future_inline_or_spawn)
3390 {
3391 Ok(flow) => flow,
3392 Err(error) => {
3393 self.complete_mat(Err(error.clone()));
3394 return Err(error);
3395 }
3396 };
3397 let input = self.input.take().expect("future_flow input available");
3398 match materialize_inner_flow(flow, input, &self.materializer) {
3399 Ok((stream, mat)) => {
3400 self.current = Some(stream);
3401 self.complete_mat(Ok(mat));
3402 Ok(())
3403 }
3404 Err(error) => {
3405 self.complete_mat(Err(error.clone()));
3406 Err(error)
3407 }
3408 }
3409 }
3410}
3411
3412impl<In, Out, InnerMat, F, Fut> Iterator for FutureFlowStream<In, Out, InnerMat, F, Fut>
3413where
3414 In: Send + 'static,
3415 Out: Send + 'static,
3416 InnerMat: Send + 'static,
3417 F: Fn() -> Fut,
3418 Fut: Future<Output = StreamResult<Flow<In, Out, InnerMat>>> + Send + 'static,
3419{
3420 type Item = StreamResult<Out>;
3421
3422 fn next(&mut self) -> Option<Self::Item> {
3423 if self.terminated {
3424 return None;
3425 }
3426 if let Err(error) = self.initialize() {
3427 self.terminated = true;
3428 return Some(Err(error));
3429 }
3430 match self
3431 .current
3432 .as_mut()
3433 .expect("future_flow current stream initialized")
3434 .next()
3435 {
3436 Some(Ok(item)) => Some(Ok(item)),
3437 Some(Err(error)) => {
3438 self.terminated = true;
3439 Some(Err(error))
3440 }
3441 None => {
3442 self.terminated = true;
3443 None
3444 }
3445 }
3446 }
3447}
3448
3449impl<In, Out, InnerMat, F, Fut> Drop for FutureFlowStream<In, Out, InnerMat, F, Fut> {
3450 fn drop(&mut self) {
3451 if !self.initialized
3452 && let Some(sender) = self.mat_sender.take()
3453 {
3454 let _ = sender.send(Err(StreamError::Failed(
3455 "future flow was never materialized".into(),
3456 )));
3457 }
3458 }
3459}
3460
/// Like `FutureFlowStream`, but waits for the first upstream element before
/// creating the inner flow.
///
/// That first element is then re-prepended to the input handed to the inner
/// flow. If upstream completes without any element, the inner flow is never
/// created and `mat_sender` reports a failure.
struct LazyFutureFlowStream<In, Out, InnerMat, F, Fut> {
    // Produces the future that resolves to the inner flow.
    create: Arc<F>,
    materializer: Materializer,
    // Upstream; peeked for the first element, then handed to the inner flow.
    input: Option<BoxStream<In>>,
    // The inner flow's output once initialized.
    current: Option<BoxStream<Out>>,
    // Taken when the materialized value (or error) is reported.
    mat_sender: Option<oneshot::Sender<StreamResult<InnerMat>>>,
    initialized: bool,
    terminated: bool,
    // `Fut` only appears in `F`'s bounds on the impls; the marker ties it to the type.
    _marker: PhantomData<fn() -> Fut>,
}
3471
3472impl<In, Out, InnerMat, F, Fut> LazyFutureFlowStream<In, Out, InnerMat, F, Fut>
3473where
3474 In: Send + 'static,
3475 Out: Send + 'static,
3476 InnerMat: Send + 'static,
3477 F: Fn() -> Fut,
3478 Fut: Future<Output = StreamResult<Flow<In, Out, InnerMat>>> + Send + 'static,
3479{
3480 fn complete_mat(&mut self, result: StreamResult<InnerMat>) {
3481 if let Some(sender) = self.mat_sender.take() {
3482 let _ = sender.send(result);
3483 }
3484 }
3485
3486 fn initialize(&mut self) -> Result<bool, StreamError> {
3487 if self.initialized {
3488 return Ok(true);
3489 }
3490 self.initialized = true;
3491 let first = match self
3492 .input
3493 .as_mut()
3494 .expect("lazy_future_flow input available")
3495 .next()
3496 {
3497 Some(Ok(item)) => item,
3498 Some(Err(error)) => {
3499 self.complete_mat(Err(error.clone()));
3500 return Err(error);
3501 }
3502 None => {
3503 self.complete_mat(Err(StreamError::Failed(
3504 "lazy flow was never materialized".into(),
3505 )));
3506 self.terminated = true;
3507 return Ok(false);
3508 }
3509 };
3510
3511 let flow = match catch_unwind_failed("lazy_future_flow factory", || (self.create)())
3512 .and_then(run_future_inline_or_spawn)
3513 {
3514 Ok(flow) => flow,
3515 Err(error) => {
3516 self.complete_mat(Err(error.clone()));
3517 return Err(error);
3518 }
3519 };
3520 let input = prepend_first_stream(
3521 first,
3522 self.input
3523 .take()
3524 .expect("lazy_future_flow input available after first element"),
3525 );
3526 match materialize_inner_flow(flow, input, &self.materializer) {
3527 Ok((stream, mat)) => {
3528 self.current = Some(stream);
3529 self.complete_mat(Ok(mat));
3530 Ok(true)
3531 }
3532 Err(error) => {
3533 self.complete_mat(Err(error.clone()));
3534 Err(error)
3535 }
3536 }
3537 }
3538}
3539
3540impl<In, Out, InnerMat, F, Fut> Iterator for LazyFutureFlowStream<In, Out, InnerMat, F, Fut>
3541where
3542 In: Send + 'static,
3543 Out: Send + 'static,
3544 InnerMat: Send + 'static,
3545 F: Fn() -> Fut,
3546 Fut: Future<Output = StreamResult<Flow<In, Out, InnerMat>>> + Send + 'static,
3547{
3548 type Item = StreamResult<Out>;
3549
3550 fn next(&mut self) -> Option<Self::Item> {
3551 if self.terminated {
3552 return None;
3553 }
3554 match self.initialize() {
3555 Ok(true) => {}
3556 Ok(false) => return None,
3557 Err(error) => {
3558 self.terminated = true;
3559 return Some(Err(error));
3560 }
3561 }
3562 match self
3563 .current
3564 .as_mut()
3565 .expect("lazy_future_flow current stream initialized")
3566 .next()
3567 {
3568 Some(Ok(item)) => Some(Ok(item)),
3569 Some(Err(error)) => {
3570 self.terminated = true;
3571 Some(Err(error))
3572 }
3573 None => {
3574 self.terminated = true;
3575 None
3576 }
3577 }
3578 }
3579}
3580
3581impl<In, Out, InnerMat, F, Fut> Drop for LazyFutureFlowStream<In, Out, InnerMat, F, Fut> {
3582 fn drop(&mut self) {
3583 if !self.initialized
3584 && let Some(sender) = self.mat_sender.take()
3585 {
3586 let _ = sender.send(Err(StreamError::Failed(
3587 "lazy flow was never materialized".into(),
3588 )));
3589 }
3590 }
3591}
3592
3593fn prepend_first_stream<In>(first: In, mut rest: BoxStream<In>) -> BoxStream<In>
3594where
3595 In: Send + 'static,
3596{
3597 let mut first = Some(first);
3598 Box::new(std::iter::from_fn(move || {
3599 if let Some(item) = first.take() {
3600 Some(Ok(item))
3601 } else {
3602 rest.next()
3603 }
3604 }))
3605}
3606
3607pub(super) fn poll_once_or_pending<Fut, T>(future: Fut) -> Result<StreamResult<T>, Pin<Box<Fut>>>
3608where
3609 Fut: Future<Output = StreamResult<T>>,
3610{
3611 let mut future = Box::pin(future);
3612 let _guard = stream_tokio_runtime().enter();
3613 let waker = noop_waker();
3614 let mut cx = Context::from_waker(&waker);
3615 match catch_unwind(AssertUnwindSafe(|| future.as_mut().poll(&mut cx))) {
3616 Ok(Poll::Ready(output)) => Ok(output),
3617 Ok(Poll::Pending) => Err(future),
3618 Err(_) => Ok(Err(StreamError::Failed("future task panicked".into()))),
3619 }
3620}
3621
3622pub(super) fn spawn_completion_task<Fut, T, Msg, Map>(
3623 task_id: usize,
3624 future: Pin<Box<Fut>>,
3625 sender: std::sync::mpsc::Sender<(usize, Msg)>,
3626 map: Map,
3627) -> AbortOnDropHandle<()>
3628where
3629 Fut: Future<Output = StreamResult<T>> + Send + 'static,
3630 T: Send + 'static,
3631 Msg: Send + 'static,
3632 Map: FnOnce(StreamResult<T>) -> Msg + Send + 'static,
3633{
3634 spawn_tokio_task(async move {
3635 let result = AssertUnwindSafe(future).catch_unwind().await;
3636 let message = match result {
3637 Ok(output) => map(output),
3638 Err(_) => map(Err(StreamError::Failed("future task panicked".into()))),
3639 };
3640 let _ = sender.send((task_id, message));
3641 })
3642}
3643
3644pub(super) fn recv_completion<Msg>(
3645 receiver: &std::sync::mpsc::Receiver<(usize, Msg)>,
3646) -> Option<(usize, Msg)> {
3647 let mut idle_spins = 0;
3648 loop {
3649 match receiver.try_recv() {
3650 Ok(message) => return Some(message),
3651 Err(std::sync::mpsc::TryRecvError::Disconnected) => return None,
3652 Err(std::sync::mpsc::TryRecvError::Empty) if idle_spins < STREAM_READY_SPINS => {
3653 idle_spins += STREAM_SPIN_BACKOFF;
3654 for _ in 0..STREAM_SPIN_BACKOFF {
3655 std::hint::spin_loop();
3656 }
3657 }
3658 Err(std::sync::mpsc::TryRecvError::Empty) => {
3659 idle_spins = 0;
3660 match receiver.recv_timeout(STREAM_MAX_PARK) {
3661 Ok(message) => return Some(message),
3662 Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {}
3663 Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => return None,
3664 }
3665 }
3666 }
3667 }
3668}
3669
3670pub(super) fn run_future_inline_or_spawn<Fut, T>(future: Fut) -> StreamResult<T>
3671where
3672 Fut: Future<Output = StreamResult<T>> + Send + 'static,
3673 T: Send + 'static,
3674{
3675 match poll_once_or_pending(future) {
3676 Ok(result) => result,
3677 Err(future) => {
3678 let (sender, receiver) = std::sync::mpsc::channel::<(usize, StreamResult<T>)>();
3679 let _task = spawn_completion_task(0, future, sender, |result| result);
3680 recv_completion(&receiver)
3681 .map(|(_, result)| result)
3682 .unwrap_or_else(|| Err(StreamError::Failed("future task dropped".into())))
3683 }
3684 }
3685}
3686
/// Applies the async `stage` to each upstream element, with at most
/// `parallelism` elements outstanding, and emits results in upstream order.
///
/// - Each future is first polled once inline via `poll_once_or_pending`.
///   Only futures that are still pending go to `spawn_completion_task`,
///   which reports `(index, result)` back over an mpsc channel.
/// - Results that finish ahead of their turn are parked in `completed` until
///   every lower index has been emitted.
/// - An upstream error takes the next index, so it is emitted after the
///   elements that preceded it. It also stops further pulls from upstream.
/// - `Err` results produced by `stage` are emitted like any other result.
fn map_async_ordered<Out, Next, F, Fut>(
    mut input: BoxStream<Out>,
    parallelism: usize,
    stage: Arc<F>,
) -> BoxStream<Next>
where
    Out: Send + 'static,
    Next: Send + 'static,
    F: Fn(Out) -> Fut + Send + Sync + 'static,
    Fut: Future<Output = StreamResult<Next>> + Send + 'static,
{
    let (sender, receiver) = std::sync::mpsc::channel::<(usize, StreamResult<Next>)>();
    // Handles of spawned tasks that have not reported yet, keyed by element index.
    let mut tasks = HashMap::<usize, AbortOnDropHandle<()>>::with_capacity(parallelism);
    // Index assigned to the next element pulled from upstream.
    let mut next_index = 0_usize;
    // Lowest index whose result has not been emitted yet.
    let mut next_to_emit = 0_usize;
    // Finished results waiting for all lower indices to be emitted first.
    let mut completed = BTreeMap::new();
    let mut input_done = false;

    Box::new(std::iter::from_fn(move || {
        loop {
            if let Some(result) = completed.remove(&next_to_emit) {
                next_to_emit += 1;
                return Some(result);
            }

            // Pull more work while in-flight tasks plus parked results stay
            // under the parallelism budget.
            while tasks.len() + completed.len() < parallelism && !input_done {
                match input.next() {
                    Some(Ok(item)) => {
                        let index = next_index;
                        next_index += 1;
                        match poll_once_or_pending(stage(item)) {
                            // Finished on the first poll: emit immediately if
                            // it is next in line, otherwise park it.
                            Ok(result) => {
                                if index == next_to_emit {
                                    next_to_emit += 1;
                                    return Some(result);
                                }
                                completed.insert(index, result);
                            }
                            Err(future) => {
                                tasks.insert(
                                    index,
                                    spawn_completion_task(
                                        index,
                                        future,
                                        sender.clone(),
                                        |result| result,
                                    ),
                                );
                            }
                        }
                    }
                    // Upstream failure takes the next index so it is emitted
                    // only after everything pulled before it.
                    Some(Err(error)) => {
                        completed.insert(next_index, Err(error));
                        next_index += 1;
                        input_done = true;
                    }
                    None => input_done = true,
                }
            }

            if let Some(result) = completed.remove(&next_to_emit) {
                next_to_emit += 1;
                return Some(result);
            }

            // No spawned task is outstanding, so no further result can arrive.
            if tasks.is_empty() {
                return None;
            }

            // Block until a spawned task reports. `sender` is owned by this
            // closure, so the channel cannot disconnect while it is alive.
            if let Some((index, result)) = recv_completion(&receiver) {
                tasks.remove(&index);
                if index == next_to_emit {
                    next_to_emit += 1;
                    return Some(result);
                }
                completed.insert(index, result);
            }
        }
    }))
}
3767
3768fn supervise_async_result<Next>(
3769 result: StreamResult<Next>,
3770 decider: &SupervisionDecider,
3771) -> Option<StreamResult<Next>> {
3772 match result {
3773 Ok(item) => Some(Ok(item)),
3774 Err(error) => match decide_supervision(decider, &error) {
3775 SupervisionDirective::Stop => Some(Err(error)),
3776 SupervisionDirective::Resume | SupervisionDirective::Restart => None,
3777 },
3778 }
3779}
3780
/// Supervised variant of `map_async_ordered`.
///
/// Errors from `stage` are passed through `decider` via
/// `supervise_async_result`. This covers:
/// - a panic while calling `stage`;
/// - a panic or error reported for the future.
///
/// A `Stop` decision emits the error in order. `Resume` and `Restart` drop the
/// element, but it still consumes its slot in the ordering. `completed`
/// therefore stores `Option`s, where `None` marks a dropped element. Upstream
/// errors bypass the decider and are emitted in order after the elements that
/// preceded them.
fn map_async_ordered_supervised<Out, Next, F, Fut>(
    mut input: BoxStream<Out>,
    parallelism: usize,
    stage: Arc<F>,
    decider: SupervisionDecider,
) -> BoxStream<Next>
where
    Out: Send + 'static,
    Next: Send + 'static,
    F: Fn(Out) -> Fut + Send + Sync + 'static,
    Fut: Future<Output = StreamResult<Next>> + Send + 'static,
{
    let (sender, receiver) = std::sync::mpsc::channel::<(usize, StreamResult<Next>)>();
    // Handles of spawned tasks that have not reported yet, keyed by element index.
    let mut tasks = HashMap::<usize, AbortOnDropHandle<()>>::with_capacity(parallelism);
    // Index assigned to the next element pulled from upstream.
    let mut next_index = 0_usize;
    // Lowest index whose slot has not been consumed yet.
    let mut next_to_emit = 0_usize;
    // Finished slots waiting for their turn; `None` = element dropped by the decider.
    let mut completed = BTreeMap::<usize, Option<StreamResult<Next>>>::new();
    let mut input_done = false;

    Box::new(std::iter::from_fn(move || {
        loop {
            // Consume ready slots in order, skipping dropped elements.
            while let Some(result) = completed.remove(&next_to_emit) {
                next_to_emit += 1;
                if let Some(result) = result {
                    return Some(result);
                }
            }

            // Pull more work while in-flight tasks plus parked slots stay
            // under the parallelism budget.
            while tasks.len() + completed.len() < parallelism && !input_done {
                match input.next() {
                    Some(Ok(item)) => {
                        let index = next_index;
                        next_index += 1;
                        // Catches a panic inside `stage` itself; panics while
                        // polling are already handled by `poll_once_or_pending`.
                        match catch_unwind(AssertUnwindSafe(|| poll_once_or_pending(stage(item)))) {
                            Ok(Ok(result)) => {
                                let result = supervise_async_result(result, &decider);
                                if index == next_to_emit {
                                    next_to_emit += 1;
                                    if let Some(result) = result {
                                        return Some(result);
                                    }
                                } else {
                                    completed.insert(index, result);
                                }
                            }
                            Ok(Err(future)) => {
                                tasks.insert(
                                    index,
                                    spawn_completion_task(
                                        index,
                                        future,
                                        sender.clone(),
                                        |result| result,
                                    ),
                                );
                            }
                            Err(_) => {
                                let error = panic_stream_error("map_async callback");
                                let result = supervise_async_result(Err(error), &decider);
                                if index == next_to_emit {
                                    next_to_emit += 1;
                                    if let Some(result) = result {
                                        return Some(result);
                                    }
                                } else {
                                    completed.insert(index, result);
                                }
                            }
                        }
                    }
                    // Upstream errors are not supervised; they are emitted in order.
                    Some(Err(error)) => {
                        completed.insert(next_index, Some(Err(error)));
                        next_index += 1;
                        input_done = true;
                    }
                    None => input_done = true,
                }
            }

            while let Some(result) = completed.remove(&next_to_emit) {
                next_to_emit += 1;
                if let Some(result) = result {
                    return Some(result);
                }
            }

            // No spawned task is outstanding, so no further result can arrive.
            if tasks.is_empty() {
                return None;
            }

            // Block until a spawned task reports. `sender` is owned by this
            // closure, so the channel cannot disconnect while it is alive.
            if let Some((index, result)) = recv_completion(&receiver) {
                tasks.remove(&index);
                let result = supervise_async_result(result, &decider);
                if index == next_to_emit {
                    next_to_emit += 1;
                    if let Some(result) = result {
                        return Some(result);
                    }
                } else {
                    completed.insert(index, result);
                }
            }
        }
    }))
}
3886
3887fn concat_streams<Out>(streams: Vec<BoxStream<Out>>) -> BoxStream<Out>
3888where
3889 Out: Send + 'static,
3890{
3891 let mut streams: VecDeque<_> = streams.into();
3892 let mut current = streams.pop_front();
3893 Box::new(std::iter::from_fn(move || {
3894 loop {
3895 match current.as_mut() {
3896 Some(stream) => match stream.next() {
3897 Some(item) => return Some(item),
3898 None => current = streams.pop_front(),
3899 },
3900 None => return None,
3901 }
3902 }
3903 }))
3904}
3905
3906fn concat_streams_lazy<Out, Mat>(
3907 initial: BoxStream<Out>,
3908 factories: Vec<Arc<dyn SourceFactory<Out, Mat>>>,
3909 materializer: &Materializer,
3910) -> BoxStream<Out>
3911where
3912 Out: Send + 'static,
3913 Mat: Send + 'static,
3914{
3915 let mut current = Some(initial);
3916 let mut remaining: VecDeque<_> = factories.into();
3917 let materializer = materializer.with_name_prefix(materializer.name_prefix().to_owned());
3918 Box::new(std::iter::from_fn(move || {
3919 loop {
3920 match current.as_mut() {
3921 Some(stream) => match stream.next() {
3922 Some(item) => return Some(item),
3923 None => {
3924 current = remaining.pop_front().map(|factory| {
3925 match factory.create(&materializer) {
3926 Ok((stream, _)) => stream,
3927 Err(error) => {
3928 Box::new(std::iter::once(Err(error))) as BoxStream<Out>
3929 }
3930 }
3931 });
3932 }
3933 },
3934 None => return None,
3935 }
3936 }
3937 }))
3938}
3939
3940fn or_else_stream<Out>(mut primary: BoxStream<Out>, mut secondary: BoxStream<Out>) -> BoxStream<Out>
3941where
3942 Out: Send + 'static,
3943{
3944 let mut primary_emitted = false;
3945 let mut using_secondary = false;
3946 Box::new(std::iter::from_fn(move || {
3947 loop {
3948 if using_secondary {
3949 return secondary.next();
3950 }
3951
3952 match primary.next() {
3953 Some(Ok(item)) => {
3954 primary_emitted = true;
3955 return Some(Ok(item));
3956 }
3957 Some(Err(error)) => return Some(Err(error)),
3958 None if primary_emitted => return None,
3959 None => using_secondary = true,
3960 }
3961 }
3962 }))
3963}
3964
3965fn interleave_streams<Out>(
3966 streams: Vec<BoxStream<Out>>,
3967 segment_size: usize,
3968 eager_close: bool,
3969) -> BoxStream<Out>
3970where
3971 Out: Send + 'static,
3972{
3973 if segment_size == 0 {
3974 return Box::new(std::iter::once(Err(StreamError::GraphValidation(
3975 "interleave segment size must be greater than zero".into(),
3976 ))));
3977 }
3978
3979 let mut streams: Vec<Option<BoxStream<Out>>> = streams.into_iter().map(Some).collect();
3980 let mut pending: Vec<Option<StreamResult<Out>>> = (0..streams.len()).map(|_| None).collect();
3981 let mut current = 0usize;
3982 let mut emitted = 0usize;
3983 Box::new(std::iter::from_fn(move || {
3984 loop {
3985 if streams.iter().all(Option::is_none) {
3986 return None;
3987 }
3988 if streams[current].is_none() {
3989 match next_active_stream(&streams, current) {
3990 Some(next) => {
3991 current = next;
3992 emitted = 0;
3993 }
3994 None => return None,
3995 }
3996 }
3997
3998 let Some(stream) = streams[current].as_mut() else {
3999 continue;
4000 };
4001 let next_item = pending[current].take().or_else(|| stream.next());
4002 match next_item {
4003 Some(Ok(item)) => {
4004 emitted += 1;
4005 if emitted == segment_size {
4006 emitted = 0;
4007 if let Some(next) = next_active_stream(&streams, current) {
4008 current = next;
4009 }
4010 }
4011 return Some(Ok(item));
4012 }
4013 Some(Err(error)) => return Some(Err(error)),
4014 None => {
4015 streams[current] = None;
4016 emitted = 0;
4017 if eager_close {
4018 return None;
4019 }
4020 match next_active_stream(&streams, current) {
4021 Some(next) => current = next,
4022 None => return None,
4023 }
4024 }
4025 }
4026 }
4027 }))
4028}
4029
4030fn next_active_stream<Out>(streams: &[Option<BoxStream<Out>>], current: usize) -> Option<usize>
4031where
4032 Out: Send + 'static,
4033{
4034 if streams.is_empty() {
4035 return None;
4036 }
4037 for offset in 1..=streams.len() {
4038 let index = (current + offset) % streams.len();
4039 if streams[index].is_some() {
4040 return Some(index);
4041 }
4042 }
4043 None
4044}
4045
4046fn materialize_side_sink<Out, Mat>(
4047 sink: &Sink<Out, Mat>,
4048 materializer: &Materializer,
4049 buffer: usize,
4050) -> StreamResult<(std::sync::mpsc::SyncSender<StreamResult<Out>>, Mat)>
4051where
4052 Out: Send + 'static,
4053 Mat: Send + 'static,
4054{
4055 let (sender, receiver) = std::sync::mpsc::sync_channel(buffer);
4056 let mat = sink.run(side_receiver_stream(receiver), materializer)?;
4057 Ok((sender, mat))
4058}
4059
4060fn side_receiver_stream<Out>(
4061 receiver: std::sync::mpsc::Receiver<StreamResult<Out>>,
4062) -> BoxStream<Out>
4063where
4064 Out: Send + 'static,
4065{
4066 Box::new(std::iter::from_fn(move || receiver.recv().ok()))
4067}
4068
/// Terminal signal recorded in a live substream's shared state.
#[derive(Clone)]
enum LiveSubstreamTerminal {
    /// Normal completion: the consumer yields `None` once buffered elements are drained.
    Complete,
    /// Failure: the consumer yields this error once buffered elements are drained.
    Error(StreamError),
}
4074
/// Default number of buffered elements a live substream holds before producers block.
const LIVE_SUBSTREAM_CAPACITY: usize = 256;
/// Default maximum number of elements per shared batch; also the size at which
/// group_by and split flush their local write batches.
const LIVE_SUBSTREAM_BATCH: usize = 64;
// NOTE(review): not referenced in this part of the file; presumably bounds
// how many substreams flat_map_merge keeps open at once — confirm at use site.
const FLAT_MAP_MERGE_SUBSTREAM_WINDOW: usize = 64;
4078
/// State shared between the producer side of a live substream and its
/// `LiveSubstreamStream` consumer.
struct LiveSubstreamShared<T> {
    /// Buffered elements and the terminal signal, guarded by one mutex.
    state: Mutex<LiveSubstreamState<T>>,
    /// Notified when elements become available, capacity is freed, a terminal
    /// signal is recorded, or the consumer is dropped.
    available: Condvar,
    /// Set when the consumer is dropped; producers stop pushing once they see it.
    cancelled: Arc<AtomicBool>,
    /// Maximum number of buffered elements before producers block.
    capacity: usize,
    /// Maximum number of elements per entry in `state.batches` (at least 1).
    batch_size: usize,
}
4086
/// Mutable part of a live substream, protected by `LiveSubstreamShared::state`.
struct LiveSubstreamState<T> {
    /// Total number of elements across all `batches`.
    buffered: usize,
    /// Queued elements, grouped into batches of at most `batch_size` items.
    batches: VecDeque<VecDeque<T>>,
    /// First terminal signal recorded; later complete/fail calls leave it unchanged.
    terminal: Option<LiveSubstreamTerminal>,
}
4092
4093impl<T> LiveSubstreamShared<T> {
4094 fn new() -> Arc<Self> {
4095 Self::with_capacity(LIVE_SUBSTREAM_CAPACITY)
4096 }
4097
4098 fn with_capacity(capacity: usize) -> Arc<Self> {
4099 Self::with_batching(capacity, LIVE_SUBSTREAM_BATCH)
4100 }
4101
4102 fn with_batching(capacity: usize, batch_size: usize) -> Arc<Self> {
4103 Arc::new(Self {
4104 state: Mutex::new(LiveSubstreamState {
4105 buffered: 0,
4106 batches: VecDeque::new(),
4107 terminal: None,
4108 }),
4109 available: Condvar::new(),
4110 cancelled: Arc::new(AtomicBool::new(false)),
4111 capacity,
4112 batch_size: batch_size.max(1),
4113 })
4114 }
4115}
4116
/// Consumer side of a live substream, exposed as a `BoxStream`.
struct LiveSubstreamStream<T> {
    shared: Arc<LiveSubstreamShared<T>>,
    /// Completion handle of the worker feeding this stream (set for the outer
    /// group_by stream); released when the stream is dropped.
    completion: Option<StreamCompletion<NotUsed>>,
    /// Elements already moved out of the shared queue, served without locking.
    local_batch: VecDeque<T>,
}
4122
impl<T> Iterator for LiveSubstreamStream<T> {
    type Item = StreamResult<T>;

    /// Yields the next buffered element. Blocks on `available` while the
    /// shared queue is empty and no terminal signal has been recorded.
    ///
    /// A whole batch is moved out of the shared state into `local_batch`, so
    /// later calls are served without taking the lock. Buffered elements are
    /// drained before the terminal signal is reported. The terminal is cloned,
    /// not consumed, so every call after it repeats the same outcome.
    fn next(&mut self) -> Option<Self::Item> {
        if let Some(item) = self.local_batch.pop_front() {
            return Some(Ok(item));
        }

        let mut state = self
            .shared
            .state
            .lock()
            .unwrap_or_else(|poison| poison.into_inner());
        loop {
            if let Some(mut batch) = state.batches.pop_front() {
                state.buffered -= batch.len();
                drop(state);
                // Capacity was freed: wake any producer blocked waiting for space.
                self.shared.available.notify_all();
                let item = batch.pop_front().expect("live substream batch has an item");
                self.local_batch = batch;
                return Some(Ok(item));
            }
            if let Some(terminal) = state.terminal.clone() {
                return match terminal {
                    LiveSubstreamTerminal::Complete => None,
                    LiveSubstreamTerminal::Error(error) => Some(Err(error)),
                };
            }
            // Nothing buffered and not terminated: park until a producer pushes
            // or records a terminal signal.
            state = self
                .shared
                .available
                .wait(state)
                .unwrap_or_else(|poison| poison.into_inner());
        }
    }
}
4159
4160impl<T> Drop for LiveSubstreamStream<T> {
4161 fn drop(&mut self) {
4162 self.shared.cancelled.store(true, Ordering::SeqCst);
4163 self.shared.available.notify_all();
4164 let _ = self.completion.take();
4165 }
4166}
4167
/// Moves every element of `batch` into the shared queue, blocking while the
/// queue is at `capacity`.
///
/// Elements are appended to the last shared batch until it holds `batch_size`
/// items; after that a new batch is started. If the consumer has cancelled or
/// a terminal signal is recorded, `batch` is cleared and `Err(())` is
/// returned. On success `batch` is left empty.
fn push_live_substream_batch<T>(
    shared: &Arc<LiveSubstreamShared<T>>,
    batch: &mut VecDeque<T>,
) -> Result<(), ()> {
    while !batch.is_empty() {
        let mut state = shared
            .state
            .lock()
            .unwrap_or_else(|poison| poison.into_inner());
        // Wait for free capacity; give up if the consumer cancels meanwhile.
        while state.buffered >= shared.capacity && state.terminal.is_none() {
            if shared.cancelled.load(Ordering::SeqCst) {
                batch.clear();
                return Err(());
            }
            state = shared
                .available
                .wait(state)
                .unwrap_or_else(|poison| poison.into_inner());
        }
        if shared.cancelled.load(Ordering::SeqCst) || state.terminal.is_some() {
            batch.clear();
            return Err(());
        }
        let was_empty = state.buffered == 0;
        // Move as many elements as capacity allows, topping up the last shared
        // batch before starting a new one.
        while state.buffered < shared.capacity && !batch.is_empty() {
            let item = batch.pop_front().expect("batch non-empty");
            if let Some(back) = state.batches.back_mut()
                && back.len() < shared.batch_size
            {
                back.push_back(item);
            } else {
                let mut new_batch =
                    VecDeque::with_capacity(shared.batch_size.min(shared.capacity.max(1)));
                new_batch.push_back(item);
                state.batches.push_back(new_batch);
            }
            state.buffered += 1;
        }
        drop(state);
        // The consumer only parks when nothing is buffered, so a wakeup is
        // needed only on the empty -> non-empty transition.
        if was_empty {
            shared.available.notify_all();
        }
    }
    Ok(())
}
4217
/// Appends one element to a live substream, blocking while it is at capacity.
///
/// If the consumer has cancelled or a terminal signal is already recorded,
/// returns `Err(item)` and hands the element back to the caller.
fn push_live_substream<T>(shared: &Arc<LiveSubstreamShared<T>>, item: T) -> Result<(), T> {
    let mut state = shared
        .state
        .lock()
        .unwrap_or_else(|poison| poison.into_inner());
    // Wait for free capacity; give up if the consumer cancels meanwhile.
    while state.buffered >= shared.capacity && state.terminal.is_none() {
        if shared.cancelled.load(Ordering::SeqCst) {
            return Err(item);
        }
        state = shared
            .available
            .wait(state)
            .unwrap_or_else(|poison| poison.into_inner());
    }
    if shared.cancelled.load(Ordering::SeqCst) || state.terminal.is_some() {
        return Err(item);
    }
    let was_empty = state.buffered == 0;
    // Top up the last shared batch, or start a new one when it is full.
    if let Some(batch) = state.batches.back_mut()
        && batch.len() < shared.batch_size
    {
        batch.push_back(item);
    } else {
        let mut batch = VecDeque::with_capacity(shared.batch_size.min(shared.capacity.max(1)));
        batch.push_back(item);
        state.batches.push_back(batch);
    }
    state.buffered += 1;
    drop(state);
    // The consumer only parks when nothing is buffered, so a wakeup is needed
    // only on the empty -> non-empty transition.
    if was_empty {
        shared.available.notify_all();
    }
    Ok(())
}
4252
4253fn complete_live_substream<T>(shared: &Arc<LiveSubstreamShared<T>>) {
4254 let mut state = shared
4255 .state
4256 .lock()
4257 .unwrap_or_else(|poison| poison.into_inner());
4258 if state.terminal.is_none() {
4259 state.terminal = Some(LiveSubstreamTerminal::Complete);
4260 }
4261 drop(state);
4262 shared.available.notify_all();
4263}
4264
4265fn fail_live_substream<T>(shared: &Arc<LiveSubstreamShared<T>>, error: StreamError) {
4266 let mut state = shared
4267 .state
4268 .lock()
4269 .unwrap_or_else(|poison| poison.into_inner());
4270 if state.terminal.is_none() {
4271 state.terminal = Some(LiveSubstreamTerminal::Error(error));
4272 }
4273 drop(state);
4274 shared.available.notify_all();
4275}
4276
/// Terminates a live substream with `StreamError::Cancelled`. Has no effect
/// if a terminal signal is already recorded.
fn cancel_live_substream<T>(shared: &Arc<LiveSubstreamShared<T>>) {
    fail_live_substream(shared, StreamError::Cancelled);
}
4280
4281fn source_from_live_substream<T>(shared: Arc<LiveSubstreamShared<T>>) -> Source<T>
4282where
4283 T: Send + 'static,
4284{
4285 let claimed = Arc::new(AtomicBool::new(false));
4286 Source::from_materialized_factory(move |_materializer| {
4287 if claimed.swap(true, Ordering::SeqCst) {
4288 return Err(StreamError::Failed(
4289 "substream source cannot be materialized more than once".into(),
4290 ));
4291 }
4292 Ok((
4293 Box::new(LiveSubstreamStream {
4294 shared: Arc::clone(&shared),
4295 completion: None,
4296 local_batch: VecDeque::new(),
4297 }) as BoxStream<T>,
4298 NotUsed,
4299 ))
4300 })
4301}
4302
4303fn source_from_once_stream<T>(stream: BoxStream<T>) -> Source<T>
4304where
4305 T: Send + 'static,
4306{
4307 let stream = Arc::new(Mutex::new(Some(stream)));
4308 Source::from_materialized_factory(move |_materializer| {
4309 let mut slot = stream.lock().unwrap_or_else(|poison| poison.into_inner());
4310 let stream = slot.take().ok_or_else(|| {
4311 StreamError::Failed("substream source cannot be materialized more than once".into())
4312 })?;
4313 Ok((stream, NotUsed))
4314 })
4315}
4316
4317fn prefix_and_tail_stream<Out>(
4318 input: BoxStream<Out>,
4319 n: usize,
4320) -> BoxStream<(Vec<Out>, Source<Out>)>
4321where
4322 Out: Send + 'static,
4323{
4324 let mut input = Some(input);
4325 let mut emitted = false;
4326 Box::new(std::iter::from_fn(move || {
4327 if emitted {
4328 return None;
4329 }
4330 emitted = true;
4331
4332 let mut prefix = Vec::with_capacity(n);
4333 while prefix.len() < n {
4334 match input
4335 .as_mut()
4336 .expect("prefix_and_tail input available")
4337 .next()
4338 {
4339 Some(Ok(item)) => prefix.push(item),
4340 Some(Err(error)) => return Some(Err(error)),
4341 None => return Some(Ok((prefix, Source::empty()))),
4342 }
4343 }
4344
4345 Some(Ok((
4346 prefix,
4347 source_from_once_stream(input.take().expect("tail input available")),
4348 )))
4349 }))
4350}
4351
/// Bookkeeping for a group_by worker. While `armed`, dropping it fails the
/// outer stream and every active substream with `AbruptTermination`.
struct GroupByWorkerGuard<Key, Out> {
    /// Queue of substream `Source`s delivered to the outer consumer.
    outer: Arc<LiveSubstreamShared<Source<Out>>>,
    /// Substreams currently accepting elements, by key.
    active: HashMap<Key, Arc<LiveSubstreamShared<Out>>>,
    /// Keys whose substream ended and whose later elements are skipped
    /// (populated only when recreation is not allowed).
    closed: HashSet<Key>,
    armed: bool,
}
4358
4359impl<Key, Out> GroupByWorkerGuard<Key, Out>
4360where
4361 Key: Eq + Hash,
4362{
4363 fn new(outer: Arc<LiveSubstreamShared<Source<Out>>>) -> Self {
4364 Self {
4365 outer,
4366 active: HashMap::new(),
4367 closed: HashSet::new(),
4368 armed: true,
4369 }
4370 }
4371
4372 fn disarm(&mut self) {
4373 self.armed = false;
4374 }
4375
4376 fn fail_all(&self, error: StreamError)
4377 where
4378 Out: Send + 'static,
4379 {
4380 fail_live_substream(&self.outer, error.clone());
4381 for substream in self.active.values() {
4382 fail_live_substream(substream, error.clone());
4383 }
4384 }
4385
4386 fn complete_all(&self)
4387 where
4388 Out: Send + 'static,
4389 {
4390 complete_live_substream(&self.outer);
4391 for substream in self.active.values() {
4392 complete_live_substream(substream);
4393 }
4394 }
4395
4396 fn cancel_all(&self)
4397 where
4398 Out: Send + 'static,
4399 {
4400 for substream in self.active.values() {
4401 cancel_live_substream(substream);
4402 }
4403 }
4404}
4405
4406impl<Key, Out> Drop for GroupByWorkerGuard<Key, Out> {
4407 fn drop(&mut self) {
4408 if self.armed {
4409 fail_live_substream(&self.outer, StreamError::AbruptTermination);
4410 for substream in self.active.values() {
4411 fail_live_substream(substream, StreamError::AbruptTermination);
4412 }
4413 }
4414 }
4415}
4416
4417fn group_by_flush_write_batch<Key, Out>(
4418 guard: &mut GroupByWorkerGuard<Key, Out>,
4419 wb_key: &mut Option<Key>,
4420 wb_sub: &mut Option<Arc<LiveSubstreamShared<Out>>>,
4421 wb_items: &mut VecDeque<Out>,
4422 allow_closed_substream_recreation: bool,
4423) where
4424 Key: Clone + Eq + Hash,
4425 Out: Send + 'static,
4426{
4427 if wb_items.is_empty() {
4428 return;
4429 }
4430 let key = wb_key.take().expect("wb_key set when wb_items non-empty");
4431 if let Some(ref sub) = *wb_sub {
4432 if push_live_substream_batch(sub, wb_items).is_err() {
4433 guard.active.remove(&key);
4434 if !allow_closed_substream_recreation {
4435 guard.closed.insert(key);
4436 }
4437 }
4438 } else {
4439 wb_items.clear();
4440 }
4441 *wb_sub = None;
4442}
4443
4444fn group_by_stream<Out, Key, F>(
4445 mut input: BoxStream<Out>,
4446 max_substreams: usize,
4447 allow_closed_substream_recreation: bool,
4448 key_fn: Arc<F>,
4449 batch_mode: GroupByBatchMode,
4450 materializer: &Materializer,
4451) -> BoxStream<Source<Out>>
4452where
4453 Out: Clone + Send + 'static,
4454 Key: Clone + Eq + Hash + Send + 'static,
4455 F: Fn(&Out) -> Key + Send + Sync + 'static,
4456{
4457 let outer = LiveSubstreamShared::new();
4458 let worker_outer = Arc::clone(&outer);
4459 let batch_repeated_keys = batch_mode == GroupByBatchMode::FiniteEagerNoRecreate;
4460 let completion = materializer.spawn_stream(move |cancelled| {
4461 let mut guard = GroupByWorkerGuard::new(worker_outer);
4462
4463 let mut wb_key: Option<Key> = None;
4468 let mut wb_sub: Option<Arc<LiveSubstreamShared<Out>>> = None;
4469 let mut wb_items: VecDeque<Out> = VecDeque::with_capacity(LIVE_SUBSTREAM_BATCH);
4470
4471 while !cancelled.load(Ordering::SeqCst) {
4472 if guard.outer.cancelled.load(Ordering::SeqCst) {
4473 guard.disarm();
4474 return Ok(NotUsed);
4475 }
4476
4477 match input.next() {
4478 Some(Ok(item)) => {
4479 let key = match catch_unwind(AssertUnwindSafe(|| key_fn(&item))) {
4480 Ok(key) => key,
4481 Err(_panic) => {
4482 wb_items.clear();
4483 guard.fail_all(StreamError::AbruptTermination);
4484 guard.disarm();
4485 return Ok(NotUsed);
4486 }
4487 };
4488
4489 if let Some(current) = guard.active.get(&key)
4493 && current.cancelled.load(Ordering::SeqCst)
4494 {
4495 if wb_key.as_ref() == Some(&key) {
4498 wb_items.clear();
4499 wb_key = None;
4500 wb_sub = None;
4501 }
4502 guard.active.remove(&key);
4503 if !allow_closed_substream_recreation {
4504 guard.closed.insert(key.clone());
4505 }
4506 }
4507
4508 let mut item = item;
4509 if let Some(current) = guard.active.get(&key).cloned() {
4510 if !batch_repeated_keys {
4511 item = match push_live_substream(¤t, item) {
4512 Ok(()) => {
4513 continue;
4514 }
4515 Err(item) => item,
4516 };
4517 guard.active.remove(&key);
4518 if !allow_closed_substream_recreation {
4519 guard.closed.insert(key.clone());
4520 continue;
4521 }
4522 } else {
4523 if wb_key.as_ref() != Some(&key) {
4527 group_by_flush_write_batch(
4528 &mut guard,
4529 &mut wb_key,
4530 &mut wb_sub,
4531 &mut wb_items,
4532 allow_closed_substream_recreation,
4533 );
4534 wb_key = Some(key.clone());
4535 wb_sub = Some(current);
4536 }
4537 wb_items.push_back(item);
4538 if wb_items.len() >= LIVE_SUBSTREAM_BATCH {
4539 group_by_flush_write_batch(
4540 &mut guard,
4541 &mut wb_key,
4542 &mut wb_sub,
4543 &mut wb_items,
4544 allow_closed_substream_recreation,
4545 );
4546 }
4547 continue;
4548 }
4549 }
4550
4551 if !wb_items.is_empty() {
4554 group_by_flush_write_batch(
4555 &mut guard,
4556 &mut wb_key,
4557 &mut wb_sub,
4558 &mut wb_items,
4559 allow_closed_substream_recreation,
4560 );
4561 }
4562
4563 if guard.closed.contains(&key) {
4564 continue;
4565 }
4566
4567 if guard.active.len() + guard.closed.len() == max_substreams {
4568 let error = StreamError::Failed(format!(
4569 "group_by reached max_substreams ({max_substreams})"
4570 ));
4571 guard.fail_all(error.clone());
4572 guard.disarm();
4573 return Err(error);
4574 }
4575
4576 let substream = LiveSubstreamShared::with_capacity(LIVE_SUBSTREAM_CAPACITY);
4577 push_live_substream(&substream, item)
4578 .unwrap_or_else(|_| unreachable!("fresh group_by substream"));
4579 guard.active.insert(key.clone(), Arc::clone(&substream));
4580 if push_live_substream(
4581 &guard.outer,
4582 source_from_live_substream(Arc::clone(&substream)),
4583 )
4584 .is_err()
4585 {
4586 guard.cancel_all();
4587 cancel_live_substream(&substream);
4588 guard.disarm();
4589 return Ok(NotUsed);
4590 }
4591 }
4592 Some(Err(error)) => {
4593 wb_items.clear();
4594 guard.fail_all(error.clone());
4595 guard.disarm();
4596 return Err(error);
4597 }
4598 None => {
4599 group_by_flush_write_batch(
4600 &mut guard,
4601 &mut wb_key,
4602 &mut wb_sub,
4603 &mut wb_items,
4604 allow_closed_substream_recreation,
4605 );
4606 guard.complete_all();
4607 guard.disarm();
4608 return Ok(NotUsed);
4609 }
4610 }
4611 }
4612
4613 group_by_flush_write_batch(
4614 &mut guard,
4615 &mut wb_key,
4616 &mut wb_sub,
4617 &mut wb_items,
4618 allow_closed_substream_recreation,
4619 );
4620 guard.complete_all();
4621 guard.disarm();
4622 Ok(NotUsed)
4623 });
4624
4625 Box::new(LiveSubstreamStream {
4626 shared: outer,
4627 completion: Some(completion),
4628 local_batch: VecDeque::new(),
4629 })
4630}
4631
/// Where a split-predicate match places the segment boundary.
#[derive(Clone, Copy, Debug)]
enum SplitMode {
    /// A matching element starts a new segment (split before it).
    When,
    /// A matching element ends the current segment (split after it).
    After,
}
4637
/// Bookkeeping for the legacy split worker. While `armed`, dropping it fails
/// the open segment and the outer stream with `AbruptTermination`.
#[cfg(test)]
struct SplitWorkerGuard<Out> {
    /// Queue of segment `Source`s delivered to the outer consumer.
    outer: Arc<LiveSubstreamShared<Source<Out>>>,
    /// The segment currently receiving elements, if any.
    current: Option<Arc<LiveSubstreamShared<Out>>>,
    armed: bool,
    /// Elements buffered locally before being pushed to `current` as a batch.
    pending: VecDeque<Out>,
}
4648
#[cfg(test)]
impl<Out> SplitWorkerGuard<Out> {
    /// Creates an armed guard with no open segment and an empty pending batch.
    fn new(outer: Arc<LiveSubstreamShared<Source<Out>>>) -> Self {
        let pending = VecDeque::with_capacity(LIVE_SUBSTREAM_BATCH);
        Self {
            outer,
            current: None,
            armed: true,
            pending,
        }
    }

    /// Marks the worker as having terminated deliberately, so `Drop` does nothing.
    fn disarm(&mut self) {
        self.armed = false;
    }

    /// Opens a new segment and publishes its `Source` on the outer stream.
    /// Fails if the outer stream no longer accepts elements.
    fn open_segment(&mut self) -> Result<(), ()>
    where
        Out: Send + 'static,
    {
        let segment = LiveSubstreamShared::new();
        let source = source_from_live_substream(Arc::clone(&segment));
        self.current = Some(segment);
        push_live_substream(&self.outer, source).map_err(|_| ())
    }

    /// Pushes buffered elements to the open segment. Discards them if no
    /// segment is open.
    fn flush_pending(&mut self) -> Result<(), ()>
    where
        Out: Send + 'static,
    {
        if self.pending.is_empty() {
            return Ok(());
        }
        if let Some(segment) = self.current.as_ref() {
            push_live_substream_batch(segment, &mut self.pending)
        } else {
            self.pending.clear();
            Ok(())
        }
    }

    /// Buffers `item` for the open segment and flushes once a full batch has
    /// accumulated. Silently drops `item` if no segment is open.
    fn push_item(&mut self, item: Out) -> Result<(), ()>
    where
        Out: Send + 'static,
    {
        if self.current.is_none() {
            return Ok(());
        }
        self.pending.push_back(item);
        if self.pending.len() < LIVE_SUBSTREAM_BATCH {
            return Ok(());
        }
        self.flush_pending()
    }

    /// Flushes what is buffered (ignoring failures) and completes the open segment.
    fn close_segment(&mut self)
    where
        Out: Send + 'static,
    {
        let _ = self.flush_pending();
        if let Some(segment) = self.current.take() {
            complete_live_substream(&segment);
        }
    }

    /// Discards buffered elements and fails the open segment with `error`.
    fn fail_current(&mut self, error: StreamError)
    where
        Out: Send + 'static,
    {
        self.pending.clear();
        if let Some(segment) = self.current.take() {
            fail_live_substream(&segment, error);
        }
    }

    /// Fails the open segment and the outer stream with `error`.
    fn fail_all(&mut self, error: StreamError)
    where
        Out: Send + 'static,
    {
        self.fail_current(error.clone());
        fail_live_substream(&self.outer, error);
    }

    /// Closes the open segment, then completes the outer stream.
    fn complete_all(&mut self)
    where
        Out: Send + 'static,
    {
        self.close_segment();
        complete_live_substream(&self.outer);
    }
}
4750
#[cfg(test)]
impl<Out> Drop for SplitWorkerGuard<Out> {
    /// An armed guard means the worker exited unexpectedly. Discard buffered
    /// elements and fail both the open segment and the outer stream.
    fn drop(&mut self) {
        if !self.armed {
            return;
        }
        self.pending.clear();
        if let Some(segment) = self.current.take() {
            fail_live_substream(&segment, StreamError::AbruptTermination);
        }
        fail_live_substream(&self.outer, StreamError::AbruptTermination);
    }
}
4763
4764fn split_streams<Out, F>(
4765 input: BoxStream<Out>,
4766 mode: SplitMode,
4767 predicate: Arc<F>,
4768 materializer: &Materializer,
4769) -> BoxStream<Source<Out>>
4770where
4771 Out: Clone + Send + 'static,
4772 F: Fn(&Out) -> bool + Send + Sync + 'static,
4773{
4774 #[cfg(test)]
4775 if current_substream_mode() == SubstreamExecutorMode::LegacyOnly {
4776 return split_streams_legacy(input, mode, predicate, materializer);
4777 }
4778 let parent_cancelled = Arc::new(AtomicBool::new(false));
4779 split_streams_fast(input, mode, predicate, parent_cancelled, materializer)
4780}
4781
/// Test-only split implementation built on `SplitWorkerGuard` and live substreams.
///
/// A worker spawned with `materializer.spawn_stream` pulls from `input`, evaluates
/// `predicate` on every element, and routes elements into segments:
/// - `SplitMode::When`: a `true` predicate closes the open segment (if any)
///   before the element is pushed, so the element starts a new segment.
/// - `SplitMode::After`: the element is pushed first, then a `true` predicate
///   closes the segment, so the element ends it.
///
/// Exit behavior of the worker:
/// - A predicate panic fails everything with `AbruptTermination`.
/// - An input error fails everything with that error and returns it.
/// - End of input, or the worker's own `cancelled` flag, completes everything.
/// - Cancellation of the outer stream exits without completing or failing.
///
/// Returns the outer stream of segment `Source`s.
#[cfg(test)]
fn split_streams_legacy<Out, F>(
    mut input: BoxStream<Out>,
    mode: SplitMode,
    predicate: Arc<F>,
    materializer: &Materializer,
) -> BoxStream<Source<Out>>
where
    Out: Clone + Send + 'static,
    F: Fn(&Out) -> bool + Send + Sync + 'static,
{
    let outer = LiveSubstreamShared::new();
    let worker_outer = Arc::clone(&outer);
    let completion = materializer.spawn_stream(move |cancelled| {
        // Armed guard: if the worker leaves without disarming it, Drop fails
        // the open segment and the outer stream with AbruptTermination.
        let mut guard = SplitWorkerGuard::new(Arc::clone(&worker_outer));

        while !cancelled.load(Ordering::SeqCst) {
            // The consumer of the outer stream cancelled: stop quietly.
            if worker_outer.cancelled.load(Ordering::SeqCst) {
                guard.disarm();
                return Ok(NotUsed);
            }

            match input.next() {
                Some(Ok(item)) => {
                    // A panicking predicate is converted into a stream failure.
                    let split = match catch_unwind(AssertUnwindSafe(|| predicate(&item))) {
                        Ok(split) => split,
                        Err(_panic) => {
                            guard.fail_all(StreamError::AbruptTermination);
                            guard.disarm();
                            return Ok(NotUsed);
                        }
                    };

                    match mode {
                        SplitMode::When => {
                            // Split *before* this element.
                            if split && guard.current.is_some() {
                                guard.close_segment();
                            }
                            if guard.current.is_none() && guard.open_segment().is_err() {
                                guard.disarm();
                                return Ok(NotUsed);
                            }
                            if guard.push_item(item).is_err() {
                                guard.disarm();
                                return Ok(NotUsed);
                            }
                        }
                        SplitMode::After => {
                            // Split *after* this element.
                            if guard.current.is_none() && guard.open_segment().is_err() {
                                guard.disarm();
                                return Ok(NotUsed);
                            }
                            if guard.push_item(item).is_err() {
                                guard.disarm();
                                return Ok(NotUsed);
                            }
                            if split {
                                guard.close_segment();
                            }
                        }
                    }
                }
                Some(Err(error)) => {
                    guard.fail_all(error.clone());
                    guard.disarm();
                    return Err(error);
                }
                None => {
                    guard.complete_all();
                    guard.disarm();
                    return Ok(NotUsed);
                }
            }
        }

        // The worker's own cancellation flag ended the loop.
        guard.complete_all();
        guard.disarm();
        Ok(NotUsed)
    });

    Box::new(LiveSubstreamStream {
        shared: outer,
        completion: Some(completion),
        local_batch: VecDeque::new(),
    })
}
4870
/// A consumer that a split segment can feed directly, without materializing
/// the segment as a stream. It is installed on a slot as `SegmentConsumer::Direct`.
trait SplitConsumer<T: Send + 'static>: Send + 'static {
    /// Accepts one item. On `Err`, the split worker fails this consumer with
    /// that error.
    fn push_item(&mut self, item: T) -> StreamResult<()>;
    /// Signals the normal end of the segment, consuming the consumer.
    fn complete(self: Box<Self>);
    /// Signals that the segment failed with `error`, consuming the consumer.
    fn fail(self: Box<Self>, error: StreamError);
}
4880
/// Direct segment consumer that folds items with an infallible step function
/// and reports the final accumulator (or an error) on a oneshot channel.
struct FoldConsumer<T, Acc> {
    /// Current accumulator. Briefly `None` while a step runs, and `None` after
    /// completion or failure.
    acc: Option<Acc>,
    /// Fold step, applied as `f(acc, item)`.
    f: Arc<dyn Fn(Acc, T) -> Acc + Send + Sync + 'static>,
    /// Receives `Ok(acc)` on completion or `Err(error)` on failure.
    tx: futures::channel::oneshot::Sender<StreamResult<Acc>>,
}
4886
4887impl<T: Send + 'static, Acc: Send + 'static> SplitConsumer<T> for FoldConsumer<T, Acc> {
4888 fn push_item(&mut self, item: T) -> StreamResult<()> {
4889 let acc = self.acc.take().expect("FoldConsumer: push after done");
4890 self.acc = Some((self.f)(acc, item));
4891 Ok(())
4892 }
4893 fn complete(mut self: Box<Self>) {
4894 let acc = self
4895 .acc
4896 .take()
4897 .expect("FoldConsumer: complete called twice");
4898 let _ = self.tx.send(Ok(acc));
4899 }
4900 fn fail(mut self: Box<Self>, error: StreamError) {
4901 self.acc = None;
4902 let _ = self.tx.send(Err(error));
4903 }
4904}
4905
/// Direct segment consumer that folds items with a fallible step function and
/// reports the final accumulator (or an error) on a oneshot channel.
struct FoldResultConsumer<T, Acc> {
    /// Current accumulator. Left `None` after a failed step, completion or failure.
    acc: Option<Acc>,
    /// Fallible fold step, applied as `f(acc, item)`.
    f: Arc<dyn Fn(Acc, T) -> StreamResult<Acc> + Send + Sync + 'static>,
    /// Receives `Ok(acc)` on completion or `Err(error)` on failure.
    tx: futures::channel::oneshot::Sender<StreamResult<Acc>>,
}
4911
4912impl<T: Send + 'static, Acc: Send + 'static> SplitConsumer<T> for FoldResultConsumer<T, Acc> {
4913 fn push_item(&mut self, item: T) -> StreamResult<()> {
4914 let acc = self
4915 .acc
4916 .take()
4917 .expect("FoldResultConsumer: push after done");
4918 match (self.f)(acc, item) {
4919 Ok(new_acc) => {
4920 self.acc = Some(new_acc);
4921 Ok(())
4922 }
4923 Err(e) => Err(e),
4924 }
4925 }
4926 fn complete(mut self: Box<Self>) {
4927 let acc = self
4928 .acc
4929 .take()
4930 .expect("FoldResultConsumer: complete called twice");
4931 let _ = self.tx.send(Ok(acc));
4932 }
4933 fn fail(mut self: Box<Self>, error: StreamError) {
4934 self.acc = None;
4935 let _ = self.tx.send(Err(error));
4936 }
4937}
4938
/// Direct segment consumer that gathers every item into a `Vec`.
struct CollectConsumer<T> {
    /// Items received so far, in arrival order.
    items: Vec<T>,
    /// Receives `Ok(items)` on completion or `Err(error)` on failure.
    tx: futures::channel::oneshot::Sender<StreamResult<Vec<T>>>,
}
4943
4944impl<T: Send + 'static> SplitConsumer<T> for CollectConsumer<T> {
4945 fn push_item(&mut self, item: T) -> StreamResult<()> {
4946 self.items.push(item);
4947 Ok(())
4948 }
4949 fn complete(self: Box<Self>) {
4950 let _ = self.tx.send(Ok(self.items));
4951 }
4952 fn fail(self: Box<Self>, error: StreamError) {
4953 let _ = self.tx.send(Err(error));
4954 }
4955}
4956
/// Direct segment consumer that discards items and only reports how the
/// segment terminated.
struct IgnoreConsumer<T> {
    /// Receives `Ok(NotUsed)` on completion or `Err(error)` on failure.
    tx: futures::channel::oneshot::Sender<StreamResult<NotUsed>>,
    /// Ties the consumer to its item type `T` without storing any `T`.
    _phantom: std::marker::PhantomData<fn(T)>,
}
4961
4962impl<T: Send + 'static> SplitConsumer<T> for IgnoreConsumer<T> {
4963 fn push_item(&mut self, _item: T) -> StreamResult<()> {
4964 Ok(())
4965 }
4966 fn complete(self: Box<Self>) {
4967 let _ = self.tx.send(Ok(NotUsed));
4968 }
4969 fn fail(self: Box<Self>, error: StreamError) {
4970 let _ = self.tx.send(Err(error));
4971 }
4972}
4973
/// Drop guard used by terminal-drain workers.
///
/// While armed, dropping it calls `cancel_terminal` on the wrapped hook.
struct TerminalDrainCancelGuard<T: 'static> {
    /// `Some` while armed; `None` once disarmed or already cancelled.
    hook: Option<Arc<dyn TerminalSourceHookDyn<T>>>,
}
4977
4978impl<T: 'static> TerminalDrainCancelGuard<T> {
4979 fn new(hook: Arc<dyn TerminalSourceHookDyn<T>>) -> Self {
4980 Self { hook: Some(hook) }
4981 }
4982
4983 fn disarm(&mut self) {
4984 self.hook = None;
4985 }
4986}
4987
4988impl<T: 'static> Drop for TerminalDrainCancelGuard<T> {
4989 fn drop(&mut self) {
4990 if let Some(hook) = self.hook.take() {
4991 hook.cancel_terminal();
4992 }
4993 }
4994}
4995
4996fn terminal_drain_status<T: 'static>(
4997 hook: &Arc<dyn TerminalSourceHookDyn<T>>,
4998 materializer: &Materializer,
4999 cancelled: &Arc<AtomicBool>,
5000) -> StreamResult<()> {
5001 if materializer.is_shutdown() {
5002 hook.cancel_terminal();
5003 Err(StreamError::AbruptTermination)
5004 } else if cancelled.load(Ordering::SeqCst) {
5005 hook.cancel_terminal();
5006 Err(StreamError::Cancelled)
5007 } else {
5008 Ok(())
5009 }
5010}
5011
/// Fast-path descriptor for an infallible fold over a split segment or a
/// terminal source.
pub(super) struct FoldDescriptor<T, Acc> {
    /// Initial accumulator, cloned for every registration.
    pub(super) zero: Acc,
    /// Fold step, applied as `f(acc, item)`.
    pub(super) f: Arc<dyn Fn(Acc, T) -> Acc + Send + Sync + 'static>,
}
5018
impl<T: Send + 'static, Acc: Clone + Send + Sync + 'static> FoldFastPathDyn<T>
    for FoldDescriptor<T, Acc>
{
    /// Attaches a fold directly to a split segment slot, bypassing stream
    /// materialization.
    ///
    /// Return values:
    /// - `None`: `hook` is not a `SegmentConsumerSlot<T>`.
    /// - `Some(Err(_))`: the slot was already claimed.
    /// - `Some(Ok(_))`: a boxed `StreamCompletion` that resolves to the fold result.
    fn try_register(
        &self,
        hook: Arc<dyn SplitSegmentHookDyn>,
    ) -> Option<StreamResult<Box<dyn Any + Send>>> {
        let hook_any = hook.as_any_arc();
        let slot = hook_any.downcast::<SegmentConsumerSlot<T>>().ok()?;
        if slot.claimed.swap(true, Ordering::SeqCst) {
            return Some(Err(StreamError::Failed(
                "substream source cannot be materialized more than once".into(),
            )));
        }
        let (tx, rx) = futures::channel::oneshot::channel::<StreamResult<Acc>>();
        {
            let mut state = slot.state.lock().unwrap_or_else(|p| p.into_inner());

            // The segment already terminated: fold the buffered items right
            // here, with the lock released before user code runs.
            if let Some(terminal) = state.terminal.take() {
                let buffer = std::mem::take(&mut state.buffer);
                state.consumer = SegmentConsumer::DirectTaken;
                drop(state);
                let mut acc = self.zero.clone();
                // NOTE: the buffer is folded even when the terminal is an
                // error; the accumulator is discarded in that case.
                for item in buffer {
                    acc = (self.f)(acc, item);
                }
                let result = match terminal {
                    LiveSubstreamTerminal::Complete => Ok(acc),
                    LiveSubstreamTerminal::Error(e) => Err(e),
                };
                let _ = tx.send(result);
                let completion = StreamCompletion::from_receiver(rx, None);
                return Some(Ok(Box::new(completion)));
            }

            // Still live: install the consumer for the split worker to adopt
            // (it replays the slot buffer into it).
            let consumer: Box<dyn SplitConsumer<T>> = Box::new(FoldConsumer {
                acc: Some(self.zero.clone()),
                f: Arc::clone(&self.f),
                tx,
            });
            state.consumer = SegmentConsumer::Direct(consumer);
        }
        // Wake a worker that may be waiting on this slot's condvar.
        slot.available.notify_all();
        let completion = StreamCompletion::from_receiver(rx, None);
        Some(Ok(Box::new(completion)))
    }

    fn supports_terminal_drain(&self) -> bool {
        true
    }

    /// Spawns a worker that drains the terminal source batch by batch and
    /// folds every item.
    ///
    /// Every 64 items within a batch the worker checks
    /// `terminal_drain_status`. Any early exit (an error via `?`) drops the
    /// still-armed guard, which cancels the terminal source.
    fn try_register_terminal_drain(
        &self,
        hook: Arc<dyn TerminalSourceHookDyn<T>>,
        materializer: &Materializer,
    ) -> Option<StreamResult<Box<dyn Any + Send>>> {
        let zero = self.zero.clone();
        let f = Arc::clone(&self.f);
        // NOTE(review): appears to exist only to give the worker closure an
        // owned materializer with the same name prefix — confirm.
        let worker_materializer =
            materializer.with_name_prefix(materializer.name_prefix().to_owned());
        let completion = materializer.spawn_stream(move |cancelled| {
            let mut guard = TerminalDrainCancelGuard::new(Arc::clone(&hook));
            let mut acc = Some(zero);
            let mut batch = Vec::new();
            loop {
                let status =
                    hook.drain_terminal_batch(&worker_materializer, &cancelled, &mut batch)?;
                for (index, item) in batch.drain(..).enumerate() {
                    let previous = acc.take().expect("fold accumulator present");
                    acc = Some(f(previous, item));
                    if (index + 1) % 64 == 0 {
                        terminal_drain_status(&hook, &worker_materializer, &cancelled)?;
                    }
                }
                if matches!(status, TerminalSourceStatus::Completed) {
                    guard.disarm();
                    return Ok(acc.expect("fold accumulator present"));
                }
            }
        });
        Some(Ok(Box::new(completion)))
    }
}
5105
/// Fast-path descriptor for a fallible fold over a split segment or a
/// terminal source.
pub(super) struct FoldResultDescriptor<T, Acc> {
    /// Initial accumulator, cloned for every registration.
    pub(super) zero: Acc,
    /// Fallible fold step. The first `Err` ends the fold with that error.
    pub(super) f: Arc<dyn Fn(Acc, T) -> StreamResult<Acc> + Send + Sync + 'static>,
}
5110
impl<T: Send + 'static, Acc: Clone + Send + Sync + 'static> FoldFastPathDyn<T>
    for FoldResultDescriptor<T, Acc>
{
    /// Attaches a fallible fold directly to a split segment slot.
    ///
    /// Return values:
    /// - `None`: `hook` is not a `SegmentConsumerSlot<T>`.
    /// - `Some(Err(_))`: the slot was already claimed.
    /// - `Some(Ok(_))`: a boxed `StreamCompletion` that resolves to the fold result.
    fn try_register(
        &self,
        hook: Arc<dyn SplitSegmentHookDyn>,
    ) -> Option<StreamResult<Box<dyn Any + Send>>> {
        let hook_any = hook.as_any_arc();
        let slot = hook_any.downcast::<SegmentConsumerSlot<T>>().ok()?;
        if slot.claimed.swap(true, Ordering::SeqCst) {
            return Some(Err(StreamError::Failed(
                "substream source cannot be materialized more than once".into(),
            )));
        }
        let (tx, rx) = futures::channel::oneshot::channel::<StreamResult<Acc>>();
        {
            let mut state = slot.state.lock().unwrap_or_else(|p| p.into_inner());

            // The segment already terminated: fold synchronously with the lock
            // released. Only a `Complete` terminal folds the buffer
            // (short-circuiting on the first `Err`).
            if let Some(terminal) = state.terminal.take() {
                let buffer = std::mem::take(&mut state.buffer);
                state.consumer = SegmentConsumer::DirectTaken;
                drop(state);
                let acc = self.zero.clone();
                let result = match terminal {
                    LiveSubstreamTerminal::Complete => buffer
                        .into_iter()
                        .try_fold(acc, |a, item| (self.f)(a, item)),
                    LiveSubstreamTerminal::Error(e) => Err(e),
                };
                let _ = tx.send(result);
                let completion = StreamCompletion::from_receiver(rx, None);
                return Some(Ok(Box::new(completion)));
            }

            // Still live: install the consumer for the split worker to adopt.
            let consumer: Box<dyn SplitConsumer<T>> = Box::new(FoldResultConsumer {
                acc: Some(self.zero.clone()),
                f: Arc::clone(&self.f),
                tx,
            });
            state.consumer = SegmentConsumer::Direct(consumer);
        }
        // Wake a worker that may be waiting on this slot's condvar.
        slot.available.notify_all();
        let completion = StreamCompletion::from_receiver(rx, None);
        Some(Ok(Box::new(completion)))
    }

    fn supports_terminal_drain(&self) -> bool {
        true
    }

    /// Spawns a worker that drains the terminal source and folds with `f`.
    ///
    /// The first `Err` from `f` ends the worker with that error. Like every
    /// other early exit, this drops the armed guard and so cancels the
    /// terminal source.
    fn try_register_terminal_drain(
        &self,
        hook: Arc<dyn TerminalSourceHookDyn<T>>,
        materializer: &Materializer,
    ) -> Option<StreamResult<Box<dyn Any + Send>>> {
        let zero = self.zero.clone();
        let f = Arc::clone(&self.f);
        let worker_materializer =
            materializer.with_name_prefix(materializer.name_prefix().to_owned());
        let completion = materializer.spawn_stream(move |cancelled| {
            let mut guard = TerminalDrainCancelGuard::new(Arc::clone(&hook));
            let mut acc = Some(zero);
            let mut batch = Vec::new();
            loop {
                let status =
                    hook.drain_terminal_batch(&worker_materializer, &cancelled, &mut batch)?;
                for (index, item) in batch.drain(..).enumerate() {
                    let previous = acc.take().expect("fold accumulator present");
                    match f(previous, item) {
                        Ok(next) => {
                            acc = Some(next);
                        }
                        Err(error) => return Err(error),
                    }
                    // Periodic shutdown/cancellation check inside large batches.
                    if (index + 1) % 64 == 0 {
                        terminal_drain_status(&hook, &worker_materializer, &cancelled)?;
                    }
                }
                if matches!(status, TerminalSourceStatus::Completed) {
                    guard.disarm();
                    return Ok(acc.expect("fold accumulator present"));
                }
            }
        });
        Some(Ok(Box::new(completion)))
    }
}
5200
/// Fast-path descriptor that collects a split segment or a terminal source
/// into a `Vec<T>`.
pub(super) struct CollectDescriptor<T> {
    /// Ties the descriptor to its item type `T` without storing any `T`.
    pub(super) _phantom: std::marker::PhantomData<fn(T)>,
}
5204
impl<T: Send + 'static> FoldFastPathDyn<T> for CollectDescriptor<T> {
    /// Attaches a collecting consumer directly to a split segment slot.
    ///
    /// Return values:
    /// - `None`: `hook` is not a `SegmentConsumerSlot<T>`.
    /// - `Some(Err(_))`: the slot was already claimed.
    /// - `Some(Ok(_))`: a boxed `StreamCompletion` that resolves to the collected items.
    fn try_register(
        &self,
        hook: Arc<dyn SplitSegmentHookDyn>,
    ) -> Option<StreamResult<Box<dyn Any + Send>>> {
        let hook_any = hook.as_any_arc();
        let slot = hook_any.downcast::<SegmentConsumerSlot<T>>().ok()?;
        if slot.claimed.swap(true, Ordering::SeqCst) {
            return Some(Err(StreamError::Failed(
                "substream source cannot be materialized more than once".into(),
            )));
        }
        let (tx, rx) = futures::channel::oneshot::channel::<StreamResult<Vec<T>>>();
        {
            let mut state = slot.state.lock().unwrap_or_else(|p| p.into_inner());

            // The segment already terminated: the buffer is the whole result.
            if let Some(terminal) = state.terminal.take() {
                let buffer = std::mem::take(&mut state.buffer);
                state.consumer = SegmentConsumer::DirectTaken;
                drop(state);
                let items: Vec<T> = buffer.into_iter().collect();
                let result = match terminal {
                    LiveSubstreamTerminal::Complete => Ok(items),
                    LiveSubstreamTerminal::Error(e) => Err(e),
                };
                let _ = tx.send(result);
                let completion = StreamCompletion::from_receiver(rx, None);
                return Some(Ok(Box::new(completion)));
            }

            // Still live: install the consumer for the split worker to adopt.
            let consumer: Box<dyn SplitConsumer<T>> = Box::new(CollectConsumer {
                items: Vec::new(),
                tx,
            });
            state.consumer = SegmentConsumer::Direct(consumer);
        }
        // Wake a worker that may be waiting on this slot's condvar.
        slot.available.notify_all();
        let completion = StreamCompletion::from_receiver(rx, None);
        Some(Ok(Box::new(completion)))
    }

    fn supports_terminal_drain(&self) -> bool {
        true
    }

    /// Spawns a worker that drains the terminal source into a `Vec`.
    ///
    /// Cancellation and shutdown are checked every 64 items within a batch.
    fn try_register_terminal_drain(
        &self,
        hook: Arc<dyn TerminalSourceHookDyn<T>>,
        materializer: &Materializer,
    ) -> Option<StreamResult<Box<dyn Any + Send>>> {
        let worker_materializer =
            materializer.with_name_prefix(materializer.name_prefix().to_owned());
        let completion = materializer.spawn_stream(move |cancelled| {
            let mut guard = TerminalDrainCancelGuard::new(Arc::clone(&hook));
            let mut items = Vec::new();
            let mut batch = Vec::new();
            loop {
                let status =
                    hook.drain_terminal_batch(&worker_materializer, &cancelled, &mut batch)?;
                for (index, item) in batch.drain(..).enumerate() {
                    items.push(item);
                    if (index + 1) % 64 == 0 {
                        terminal_drain_status(&hook, &worker_materializer, &cancelled)?;
                    }
                }
                if matches!(status, TerminalSourceStatus::Completed) {
                    guard.disarm();
                    return Ok(items);
                }
            }
        });
        Some(Ok(Box::new(completion)))
    }
}
5281
/// Fast-path descriptor that consumes a split segment or a terminal source,
/// discarding every item and reporting only how it terminated.
pub(super) struct IgnoreDescriptor<T> {
    /// Ties the descriptor to its item type `T` without storing any `T`.
    pub(super) _phantom: std::marker::PhantomData<fn(T)>,
}
5285
impl<T: Send + 'static> FoldFastPathDyn<T> for IgnoreDescriptor<T> {
    /// Attaches an item-discarding consumer directly to a split segment slot.
    ///
    /// Return values:
    /// - `None`: `hook` is not a `SegmentConsumerSlot<T>`.
    /// - `Some(Err(_))`: the slot was already claimed.
    /// - `Some(Ok(_))`: a boxed `StreamCompletion` that resolves to `NotUsed` or the error.
    fn try_register(
        &self,
        hook: Arc<dyn SplitSegmentHookDyn>,
    ) -> Option<StreamResult<Box<dyn Any + Send>>> {
        let hook_any = hook.as_any_arc();
        let slot = hook_any.downcast::<SegmentConsumerSlot<T>>().ok()?;
        if slot.claimed.swap(true, Ordering::SeqCst) {
            return Some(Err(StreamError::Failed(
                "substream source cannot be materialized more than once".into(),
            )));
        }
        let (tx, rx) = futures::channel::oneshot::channel::<StreamResult<NotUsed>>();
        {
            let mut state = slot.state.lock().unwrap_or_else(|p| p.into_inner());

            // The segment already terminated: only the terminal matters. The
            // slot buffer is left in place (it is not read here).
            if let Some(terminal) = state.terminal.take() {
                state.consumer = SegmentConsumer::DirectTaken;
                drop(state);
                let result = match terminal {
                    LiveSubstreamTerminal::Complete => Ok(NotUsed),
                    LiveSubstreamTerminal::Error(e) => Err(e),
                };
                let _ = tx.send(result);
                let completion = StreamCompletion::from_receiver(rx, None);
                return Some(Ok(Box::new(completion)));
            }

            // Still live: install the consumer for the split worker to adopt.
            let consumer: Box<dyn SplitConsumer<T>> = Box::new(IgnoreConsumer {
                tx,
                _phantom: std::marker::PhantomData,
            });
            state.consumer = SegmentConsumer::Direct(consumer);
        }
        // Wake a worker that may be waiting on this slot's condvar.
        slot.available.notify_all();
        let completion = StreamCompletion::from_receiver(rx, None);
        Some(Ok(Box::new(completion)))
    }

    fn supports_terminal_drain(&self) -> bool {
        true
    }

    /// Spawns a worker that drains and discards the terminal source until it
    /// reports `Completed`.
    fn try_register_terminal_drain(
        &self,
        hook: Arc<dyn TerminalSourceHookDyn<T>>,
        materializer: &Materializer,
    ) -> Option<StreamResult<Box<dyn Any + Send>>> {
        let worker_materializer =
            materializer.with_name_prefix(materializer.name_prefix().to_owned());
        let completion = materializer.spawn_stream(move |cancelled| {
            let mut guard = TerminalDrainCancelGuard::new(Arc::clone(&hook));
            let mut batch = Vec::new();
            loop {
                let status =
                    hook.drain_terminal_batch(&worker_materializer, &cancelled, &mut batch)?;
                // Items are discarded; the buffer is reused for the next batch.
                batch.clear();
                if matches!(status, TerminalSourceStatus::Completed) {
                    guard.disarm();
                    return Ok(NotUsed);
                }
            }
        });
        Some(Ok(Box::new(completion)))
    }
}
5354
/// Who, if anyone, is consuming a split segment slot.
enum SegmentConsumer<T: Send + 'static> {
    /// Nobody has claimed the segment yet; items accumulate in the slot buffer.
    Pending,
    /// A fast-path consumer registered by a descriptor, waiting for the split
    /// worker to adopt it.
    Direct(Box<dyn SplitConsumer<T>>),
    /// The direct consumer has been moved out. Either the worker adopted it,
    /// or the registration finished synchronously, or the worker closed the slot.
    DirectTaken,
    /// The segment was materialized as an ordinary stream
    /// (`FallbackSegmentStream`) that reads from the slot buffer.
    Fallback,
}
5363
/// Mutex-protected state of a `SegmentConsumerSlot`.
struct SegmentSlotState<T: Send + 'static> {
    /// Items handed over by the split worker, not yet consumed.
    buffer: VecDeque<T>,
    /// Current consumer of the segment.
    consumer: SegmentConsumer<T>,
    /// Set once the segment completes or fails, or when a fallback reader is
    /// dropped (recorded as `Cancelled`).
    terminal: Option<LiveSubstreamTerminal>,
}
5369
/// Rendezvous point between the split worker and whoever materializes one
/// segment: a fast-path descriptor or a fallback stream.
pub(super) struct SegmentConsumerSlot<T: Send + 'static> {
    /// Set by the first materialization attempt; later attempts are rejected.
    claimed: Arc<AtomicBool>,
    /// Buffer, consumer and terminal state.
    state: Mutex<SegmentSlotState<T>>,
    /// Signalled whenever `state` changes in a way a waiter may care about.
    available: Condvar,
    /// Cancellation flag shared with the split worker that created this slot.
    parent_cancelled: Arc<AtomicBool>,
}
5376
5377impl<T: Clone + Send + 'static> SegmentConsumerSlot<T> {
5378 fn new(parent_cancelled: Arc<AtomicBool>) -> Arc<Self> {
5379 Arc::new(Self {
5380 claimed: Arc::new(AtomicBool::new(false)),
5381 state: Mutex::new(SegmentSlotState {
5382 buffer: VecDeque::new(),
5383 consumer: SegmentConsumer::Pending,
5384 terminal: None,
5385 }),
5386 available: Condvar::new(),
5387 parent_cancelled,
5388 })
5389 }
5390}
5391
5392impl<T: Clone + Send + 'static> SplitSegmentHookDyn for SegmentConsumerSlot<T> {
5393 fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
5394 self
5395 }
5396}
5397
5398impl<T: Clone + Send + 'static> SourceFactory<T, NotUsed> for SegmentConsumerSlot<T> {
5399 fn create(
5400 self: Arc<Self>,
5401 _materializer: &Materializer,
5402 ) -> StreamResult<(BoxStream<T>, NotUsed)> {
5403 if self.claimed.swap(true, Ordering::SeqCst) {
5404 return Err(StreamError::Failed(
5405 "substream source cannot be materialized more than once".into(),
5406 ));
5407 }
5408 {
5409 let mut state = self.state.lock().unwrap_or_else(|p| p.into_inner());
5410 state.consumer = SegmentConsumer::Fallback;
5411 }
5412 self.available.notify_all();
5413 let stream = FallbackSegmentStream {
5414 slot: Arc::clone(&self),
5415 };
5416 Ok((Box::new(stream), NotUsed))
5417 }
5418}
5419
/// Blocking iterator over a segment that was materialized through
/// `SourceFactory::create` instead of a fast-path descriptor.
struct FallbackSegmentStream<T: Send + 'static> {
    /// Slot whose buffer and terminal this stream reads.
    slot: Arc<SegmentConsumerSlot<T>>,
}
5423
5424impl<T: Clone + Send + 'static> Iterator for FallbackSegmentStream<T> {
5425 type Item = StreamResult<T>;
5426
5427 fn next(&mut self) -> Option<Self::Item> {
5428 let mut state = self.slot.state.lock().unwrap_or_else(|p| p.into_inner());
5429 loop {
5430 if let Some(item) = state.buffer.pop_front() {
5431 drop(state);
5432 self.slot.available.notify_all();
5433 return Some(Ok(item));
5434 }
5435 if let Some(terminal) = &state.terminal {
5436 return match terminal {
5437 LiveSubstreamTerminal::Complete => None,
5438 LiveSubstreamTerminal::Error(e) => Some(Err(e.clone())),
5439 };
5440 }
5441 if self.slot.parent_cancelled.load(Ordering::SeqCst) {
5442 return Some(Err(StreamError::AbruptTermination));
5443 }
5444 state = self
5445 .slot
5446 .available
5447 .wait(state)
5448 .unwrap_or_else(|p| p.into_inner());
5449 }
5450 }
5451}
5452
5453impl<T: Send + 'static> Drop for FallbackSegmentStream<T> {
5454 fn drop(&mut self) {
5455 let mut state = self.slot.state.lock().unwrap_or_else(|p| p.into_inner());
5456 if state.terminal.is_none() {
5457 state.terminal = Some(LiveSubstreamTerminal::Error(StreamError::Cancelled));
5458 }
5459 drop(state);
5460 self.slot.available.notify_all();
5461 }
5462}
5463
/// Per-worker state of the fast split path; cleans up on abnormal exit via `Drop`.
struct SplitFastWorkerGuard<Out: Send + 'static> {
    /// Outer stream that receives one `Source` per segment.
    outer: Arc<LiveSubstreamShared<Source<Out>>>,
    /// Slot of the currently open segment, if any.
    current_slot: Option<Arc<SegmentConsumerSlot<Out>>>,
    /// Direct consumer adopted from `current_slot`; once set, items bypass the slot.
    current_consumer: Option<Box<dyn SplitConsumer<Out>>>,
    /// While `true`, `Drop` fails everything with `AbruptTermination`.
    armed: bool,
    /// Cancellation flag shared with every slot this guard opens.
    parent_cancelled: Arc<AtomicBool>,
    /// Worker-local batch of items not yet handed to the slot or consumer.
    local_pending: VecDeque<Out>,
}
5474
5475impl<Out: Clone + Send + 'static> SplitFastWorkerGuard<Out> {
5476 fn new(
5477 outer: Arc<LiveSubstreamShared<Source<Out>>>,
5478 parent_cancelled: Arc<AtomicBool>,
5479 ) -> Self {
5480 Self {
5481 outer,
5482 current_slot: None,
5483 current_consumer: None,
5484 armed: true,
5485 parent_cancelled,
5486 local_pending: VecDeque::with_capacity(LIVE_SUBSTREAM_BATCH),
5487 }
5488 }
5489
5490 fn disarm(&mut self) {
5491 self.armed = false;
5492 }
5493
5494 fn open_segment(&mut self) -> Result<(), ()> {
5495 let slot = SegmentConsumerSlot::new(Arc::clone(&self.parent_cancelled));
5496 let factory: Arc<dyn SourceFactory<Out, NotUsed>> =
5497 Arc::clone(&slot) as Arc<dyn SourceFactory<Out, NotUsed>>;
5498 let hook: Arc<dyn SplitSegmentHookDyn> = Arc::clone(&slot) as Arc<dyn SplitSegmentHookDyn>;
5499 let source = Source {
5500 factory,
5501 terminal_factory: None,
5502 hints: SourceHints::default(),
5503 attributes: Attributes::default(),
5504 split_hook: Some(hook),
5505 };
5506 self.current_slot = Some(slot);
5507 push_live_substream(&self.outer, source).map_err(|_| ())
5508 }
5509
5510 fn push_item(&mut self, item: Out) -> Result<(), ()> {
5511 if let Some(ref mut consumer) = self.current_consumer {
5513 if let Err(e) = consumer.push_item(item) {
5514 let c = self.current_consumer.take().unwrap();
5515 c.fail(e);
5516 return Err(());
5517 }
5518 return Ok(());
5519 }
5520
5521 self.local_pending.push_back(item);
5523 if self.local_pending.len() >= LIVE_SUBSTREAM_BATCH {
5524 self.flush_pending()
5525 } else {
5526 Ok(())
5527 }
5528 }
5529
5530 fn flush_pending(&mut self) -> Result<(), ()> {
5533 if self.local_pending.is_empty() {
5534 return Ok(());
5535 }
5536
5537 if let Some(ref mut consumer) = self.current_consumer {
5539 for item in self.local_pending.drain(..) {
5540 if let Err(e) = consumer.push_item(item) {
5541 let c = self.current_consumer.take().unwrap();
5542 c.fail(e);
5543 return Err(());
5544 }
5545 }
5546 return Ok(());
5547 }
5548
5549 let slot = match &self.current_slot {
5550 Some(s) => Arc::clone(s),
5551 None => {
5552 self.local_pending.clear();
5553 return Ok(());
5554 }
5555 };
5556
5557 loop {
5558 if self.parent_cancelled.load(Ordering::SeqCst) {
5559 self.local_pending.clear();
5560 return Err(());
5561 }
5562
5563 let mut state = slot.state.lock().unwrap_or_else(|p| p.into_inner());
5564
5565 if matches!(state.consumer, SegmentConsumer::Direct(_)) {
5566 let consumer =
5567 match std::mem::replace(&mut state.consumer, SegmentConsumer::DirectTaken) {
5568 SegmentConsumer::Direct(c) => c,
5569 _ => unreachable!(),
5570 };
5571 let mut drain_buf = std::mem::take(&mut state.buffer);
5572 drop(state);
5573 let mut consumer_box = consumer;
5574 for item in drain_buf.drain(..) {
5575 if let Err(e) = consumer_box.push_item(item) {
5576 consumer_box.fail(e);
5577 self.local_pending.clear();
5578 return Err(());
5579 }
5580 }
5581 for item in self.local_pending.drain(..) {
5582 if let Err(e) = consumer_box.push_item(item) {
5583 consumer_box.fail(e);
5584 return Err(());
5585 }
5586 }
5587 self.current_consumer = Some(consumer_box);
5588 return Ok(());
5589 }
5590
5591 if matches!(
5592 state.consumer,
5593 SegmentConsumer::Pending | SegmentConsumer::Fallback
5594 ) {
5595 let is_fallback = matches!(state.consumer, SegmentConsumer::Fallback);
5596 let cap = LIVE_SUBSTREAM_CAPACITY.saturating_sub(state.buffer.len());
5597 if cap > 0 {
5598 let to_flush = cap.min(self.local_pending.len());
5599 let items: Vec<Out> = self.local_pending.drain(..to_flush).collect();
5600 state.buffer.extend(items);
5601 let has_more = !self.local_pending.is_empty();
5602 drop(state);
5603 if is_fallback {
5604 slot.available.notify_all();
5605 }
5606 if !has_more {
5607 return Ok(());
5608 }
5609 continue;
5613 } else {
5614 state = slot
5616 .available
5617 .wait(state)
5618 .unwrap_or_else(|p| p.into_inner());
5619 continue;
5620 }
5621 }
5622
5623 return Ok(());
5625 }
5626 }
5627
5628 fn close_segment(&mut self) {
5629 let _ = self.flush_pending();
5631
5632 if let Some(consumer) = self.current_consumer.take() {
5634 consumer.complete();
5635 self.current_slot = None;
5636 return;
5637 }
5638
5639 let slot = match self.current_slot.take() {
5640 Some(s) => s,
5641 None => return,
5642 };
5643
5644 if self.parent_cancelled.load(Ordering::SeqCst) {
5645 let mut state = slot.state.lock().unwrap_or_else(|p| p.into_inner());
5646 if state.terminal.is_none() {
5647 state.terminal = Some(LiveSubstreamTerminal::Error(StreamError::AbruptTermination));
5648 }
5649 drop(state);
5650 slot.available.notify_all();
5651 return;
5652 }
5653
5654 let mut state = slot.state.lock().unwrap_or_else(|p| p.into_inner());
5655 match std::mem::replace(&mut state.consumer, SegmentConsumer::DirectTaken) {
5656 SegmentConsumer::Direct(mut consumer_box) => {
5657 let mut drain_buf = std::mem::take(&mut state.buffer);
5658 drop(state);
5659 for item in drain_buf.drain(..) {
5660 if let Err(e) = consumer_box.push_item(item) {
5661 consumer_box.fail(e);
5662 return;
5663 }
5664 }
5665 consumer_box.complete();
5666 }
5667 SegmentConsumer::DirectTaken => {
5668 }
5670 SegmentConsumer::Fallback => {
5671 state.consumer = SegmentConsumer::DirectTaken;
5672 if state.terminal.is_none() {
5673 state.terminal = Some(LiveSubstreamTerminal::Complete);
5674 }
5675 drop(state);
5676 slot.available.notify_all();
5677 }
5678 SegmentConsumer::Pending => {
5679 state.consumer = SegmentConsumer::Pending;
5681 if state.terminal.is_none() {
5682 state.terminal = Some(LiveSubstreamTerminal::Complete);
5683 }
5684 drop(state);
5685 slot.available.notify_all();
5687 }
5688 }
5689 }
5690
5691 fn fail_segment(&mut self, error: StreamError) {
5692 self.local_pending.clear();
5694
5695 if let Some(consumer) = self.current_consumer.take() {
5696 consumer.fail(error);
5697 self.current_slot = None;
5698 return;
5699 }
5700 let slot = match self.current_slot.take() {
5701 Some(s) => s,
5702 None => return,
5703 };
5704 let mut state = slot.state.lock().unwrap_or_else(|p| p.into_inner());
5705 match std::mem::replace(&mut state.consumer, SegmentConsumer::DirectTaken) {
5706 SegmentConsumer::Direct(c) => {
5707 drop(state);
5708 c.fail(error);
5709 }
5710 _ => {
5711 if state.terminal.is_none() {
5712 state.terminal = Some(LiveSubstreamTerminal::Error(error));
5713 }
5714 drop(state);
5715 slot.available.notify_all();
5716 }
5717 }
5718 }
5719
5720 fn fail_all(&mut self, error: StreamError) {
5721 self.fail_segment(error.clone());
5722 fail_live_substream(&self.outer, error);
5723 }
5724
5725 fn complete_all(&mut self) {
5726 self.close_segment();
5727 complete_live_substream(&self.outer);
5728 }
5729}
5730
5731impl<Out: Send + 'static> Drop for SplitFastWorkerGuard<Out> {
5732 fn drop(&mut self) {
5733 if self.armed {
5734 self.local_pending.clear(); if let Some(consumer) = self.current_consumer.take() {
5736 consumer.fail(StreamError::AbruptTermination);
5737 } else if let Some(slot) = self.current_slot.take() {
5738 let mut state = slot.state.lock().unwrap_or_else(|p| p.into_inner());
5739 match std::mem::replace(&mut state.consumer, SegmentConsumer::DirectTaken) {
5740 SegmentConsumer::Direct(c) => {
5741 drop(state);
5742 c.fail(StreamError::AbruptTermination);
5743 }
5744 _ => {
5745 if state.terminal.is_none() {
5746 state.terminal =
5747 Some(LiveSubstreamTerminal::Error(StreamError::AbruptTermination));
5748 }
5749 drop(state);
5750 slot.available.notify_all();
5751 }
5752 }
5753 }
5754 fail_live_substream(&self.outer, StreamError::AbruptTermination);
5755 }
5756 }
5757}
5758
/// Fast split implementation whose segments can be consumed either by a
/// fast-path descriptor (a `SplitConsumer` fed directly by the worker) or as
/// a fallback stream reading from the segment slot.
///
/// Segment boundaries follow the same rules as the legacy path:
/// - `SplitMode::When` closes the open segment before an element whose
///   predicate is `true`.
/// - `SplitMode::After` closes it after such an element.
///
/// `parent_cancelled` is shared with every segment slot the worker opens.
/// Returns the outer stream of segment `Source`s.
fn split_streams_fast<Out, F>(
    mut input: BoxStream<Out>,
    mode: SplitMode,
    predicate: Arc<F>,
    parent_cancelled: Arc<AtomicBool>,
    materializer: &Materializer,
) -> BoxStream<Source<Out>>
where
    Out: Clone + Send + 'static,
    F: Fn(&Out) -> bool + Send + Sync + 'static,
{
    let outer = LiveSubstreamShared::new();
    let worker_outer = Arc::clone(&outer);
    let completion = materializer.spawn_stream(move |cancelled| {
        // Armed guard: any exit without `disarm` fails everything with
        // AbruptTermination from Drop.
        let mut guard =
            SplitFastWorkerGuard::new(Arc::clone(&worker_outer), Arc::clone(&parent_cancelled));

        while !cancelled.load(Ordering::SeqCst) {
            // The consumer of the outer stream cancelled: stop quietly.
            if worker_outer.cancelled.load(Ordering::SeqCst) {
                guard.disarm();
                return Ok(NotUsed);
            }

            match input.next() {
                Some(Ok(item)) => {
                    // A panicking predicate is converted into a stream failure.
                    let split = match catch_unwind(AssertUnwindSafe(|| predicate(&item))) {
                        Ok(s) => s,
                        Err(_) => {
                            guard.fail_all(StreamError::AbruptTermination);
                            guard.disarm();
                            return Ok(NotUsed);
                        }
                    };

                    // A segment is "open" if it has a slot or an adopted consumer.
                    match mode {
                        SplitMode::When => {
                            if split
                                && (guard.current_slot.is_some()
                                    || guard.current_consumer.is_some())
                            {
                                guard.close_segment();
                            }
                            if guard.current_slot.is_none()
                                && guard.current_consumer.is_none()
                                && guard.open_segment().is_err()
                            {
                                guard.disarm();
                                return Ok(NotUsed);
                            }
                            if guard.push_item(item).is_err() {
                                guard.disarm();
                                return Ok(NotUsed);
                            }
                        }
                        SplitMode::After => {
                            if guard.current_slot.is_none()
                                && guard.current_consumer.is_none()
                                && guard.open_segment().is_err()
                            {
                                guard.disarm();
                                return Ok(NotUsed);
                            }
                            if guard.push_item(item).is_err() {
                                guard.disarm();
                                return Ok(NotUsed);
                            }
                            if split {
                                guard.close_segment();
                            }
                        }
                    }
                }
                Some(Err(error)) => {
                    guard.fail_all(error.clone());
                    guard.disarm();
                    return Err(error);
                }
                None => {
                    guard.complete_all();
                    guard.disarm();
                    return Ok(NotUsed);
                }
            }
        }
        // The worker's own cancellation flag ended the loop.
        guard.complete_all();
        guard.disarm();
        Ok(NotUsed)
    });

    Box::new(LiveSubstreamStream {
        shared: outer,
        completion: Some(completion),
        local_batch: VecDeque::new(),
    })
}
5854
/// Test-only state shared between a `FlatMapMergeStream` and its producers.
#[cfg(test)]
struct FlatMapMergeShared<T> {
    /// Queue, per-stream windows and termination bookkeeping.
    state: Mutex<FlatMapMergeState<T>>,
    /// Signalled whenever `state` changes or cancellation is requested.
    available: Condvar,
    /// Set when the merged stream is dropped.
    cancelled: Arc<AtomicBool>,
}
5862
/// Mutex-protected bookkeeping for the test-only flat-map-merge.
#[cfg(test)]
struct FlatMapMergeState<T> {
    /// Items awaiting the consumer, tagged with the id of the producing substream.
    queued: VecDeque<(usize, T)>,
    /// Per-substream count of queued items. Producers wait while it is at
    /// `FLAT_MAP_MERGE_SUBSTREAM_WINDOW`.
    window: HashMap<usize, usize>,
    /// Number of registered substreams that have not finished.
    active_streams: usize,
    /// NOTE(review): presumably set once the outer input is exhausted (not
    /// visible here). The consumer ends when this is set and no stream is active.
    input_done: bool,
    /// First error reported by any substream.
    terminal: Option<StreamError>,
}
5871
#[cfg(test)]
impl<T> FlatMapMergeShared<T> {
    /// Creates shared state with nothing queued, no active streams, input not
    /// done, no terminal error and an unset cancellation flag.
    fn new() -> Arc<Self> {
        let initial = FlatMapMergeState {
            queued: VecDeque::new(),
            window: HashMap::new(),
            active_streams: 0,
            input_done: false,
            terminal: None,
        };
        let shared = Self {
            state: Mutex::new(initial),
            available: Condvar::new(),
            cancelled: Arc::new(AtomicBool::new(false)),
        };
        Arc::new(shared)
    }
}
5888
/// Test-only consumer side of the flat-map-merge.
#[cfg(test)]
struct FlatMapMergeStream<T> {
    /// State shared with the producers.
    shared: Arc<FlatMapMergeShared<T>>,
    /// Completion handle of the producing worker; released on drop.
    completion: Option<StreamCompletion<NotUsed>>,
}
5894
#[cfg(test)]
impl<T> Iterator for FlatMapMergeStream<T> {
    type Item = StreamResult<T>;

    /// Blocks until an item is queued, an error is recorded, or every
    /// substream has finished after the input is done.
    ///
    /// Each popped item frees one slot in its substream's window.
    fn next(&mut self) -> Option<Self::Item> {
        let shared = &self.shared;
        let mut state = shared
            .state
            .lock()
            .unwrap_or_else(|poison| poison.into_inner());
        loop {
            if let Some((stream_id, item)) = state.queued.pop_front() {
                let window_empty = match state.window.get_mut(&stream_id) {
                    Some(count) => {
                        *count -= 1;
                        *count == 0
                    }
                    None => false,
                };
                if window_empty {
                    state.window.remove(&stream_id);
                }
                drop(state);
                // Wake producers waiting for window space.
                shared.available.notify_all();
                return Some(Ok(item));
            }
            if let Some(error) = &state.terminal {
                return Some(Err(error.clone()));
            }
            if state.input_done && state.active_streams == 0 {
                return None;
            }
            state = shared
                .available
                .wait(state)
                .unwrap_or_else(|poison| poison.into_inner());
        }
    }
}
5933
5934#[cfg(test)]
5935impl<T> Drop for FlatMapMergeStream<T> {
5936 fn drop(&mut self) {
5937 self.shared.cancelled.store(true, Ordering::SeqCst);
5938 self.shared.available.notify_all();
5939 let _ = self.completion.take();
5940 }
5941}
5942
/// Counts one more active substream and wakes every waiter on the shared
/// condvar.
#[cfg(test)]
fn flat_map_merge_register_stream<T>(shared: &Arc<FlatMapMergeShared<T>>) {
    {
        // Scoped so the lock is released before waking anyone.
        let mut locked = shared
            .state
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        locked.active_streams += 1;
    }
    shared.available.notify_all();
}
5953
5954#[cfg(test)]
5955fn flat_map_merge_finish_stream<T>(
5956 shared: &Arc<FlatMapMergeShared<T>>,
5957 stream_id: usize,
5958 terminal: Result<(), StreamError>,
5959) {
5960 let mut state = shared
5961 .state
5962 .lock()
5963 .unwrap_or_else(|poison| poison.into_inner());
5964 state.active_streams = state.active_streams.saturating_sub(1);
5965 if let Err(error) = terminal
5966 && state.terminal.is_none()
5967 {
5968 state.terminal = Some(error);
5969 }
5970 state.window.entry(stream_id).or_default();
5972 drop(state);
5973 shared.available.notify_all();
5974}
5975
/// Queues `item` from substream `stream_id` for the consumer, applying
/// per-substream backpressure.
///
/// Blocks while this substream already has `FLAT_MAP_MERGE_SUBSTREAM_WINDOW`
/// items queued and no terminal error is recorded.
///
/// # Errors
/// Returns `Err(item)`, handing the item back, if the merge was cancelled or
/// a terminal error is recorded; the caller should stop producing.
#[cfg(test)]
fn flat_map_merge_push<T>(
    shared: &Arc<FlatMapMergeShared<T>>,
    stream_id: usize,
    item: T,
) -> Result<(), T> {
    let mut state = shared
        .state
        .lock()
        .unwrap_or_else(|poison| poison.into_inner());
    // Backpressure: wait for the consumer to free a slot in this substream's window.
    while state.window.get(&stream_id).copied().unwrap_or(0) >= FLAT_MAP_MERGE_SUBSTREAM_WINDOW
        && state.terminal.is_none()
    {
        if shared.cancelled.load(Ordering::SeqCst) {
            return Err(item);
        }
        state = shared
            .available
            .wait(state)
            .unwrap_or_else(|poison| poison.into_inner());
    }
    if shared.cancelled.load(Ordering::SeqCst) || state.terminal.is_some() {
        return Err(item);
    }
    *state.window.entry(stream_id).or_insert(0) += 1;
    let was_empty = state.queued.is_empty();
    state.queued.push_back((stream_id, item));
    drop(state);
    // The consumer only waits after finding the queue empty (checked under
    // the lock), so it needs a wakeup only on the empty -> non-empty transition.
    if was_empty {
        shared.available.notify_all();
    }
    Ok(())
}
6011
/// Drop guard for the legacy merge coordinator task.
///
/// `finish` marks the outer input as exhausted and disarms the guard. If the
/// coordinator exits without calling `finish` (for example by panicking),
/// dropping the guard does the following:
/// - records `StreamError::AbruptTermination` unless an error is already recorded;
/// - marks the input as done;
/// - raises the cancel flag;
/// - wakes every waiter.
#[cfg(test)]
struct FlatMapMergeCoordinatorGuard<T> {
    shared: Arc<FlatMapMergeShared<T>>,
    armed: bool,
}

#[cfg(test)]
impl<T> FlatMapMergeCoordinatorGuard<T> {
    fn new(shared: Arc<FlatMapMergeShared<T>>) -> Self {
        Self {
            armed: true,
            shared,
        }
    }

    /// Normal shutdown: flag input as done, wake waiters, disarm.
    fn finish(&mut self) {
        {
            let mut locked = self
                .shared
                .state
                .lock()
                .unwrap_or_else(|poison| poison.into_inner());
            locked.input_done = true;
        }
        self.shared.available.notify_all();
        self.armed = false;
    }
}

#[cfg(test)]
impl<T> Drop for FlatMapMergeCoordinatorGuard<T> {
    fn drop(&mut self) {
        if !self.armed {
            return;
        }
        {
            let mut locked = self
                .shared
                .state
                .lock()
                .unwrap_or_else(|poison| poison.into_inner());
            // Keep an earlier error if one exists; otherwise report the abrupt exit.
            locked
                .terminal
                .get_or_insert_with(|| StreamError::AbruptTermination);
            locked.input_done = true;
        }
        self.shared.cancelled.store(true, Ordering::SeqCst);
        self.shared.available.notify_all();
    }
}
6059
/// Drop guard for one legacy substream worker.
///
/// `finish` reports the substream's outcome exactly once and disarms the
/// guard. If the worker exits without finishing (for example by panicking),
/// dropping the guard reports `StreamError::AbruptTermination` for the
/// substream and raises the shared cancel flag.
#[cfg(test)]
struct FlatMapMergeWorkerGuard<T> {
    shared: Arc<FlatMapMergeShared<T>>,
    stream_id: usize,
    armed: bool,
}

#[cfg(test)]
impl<T> FlatMapMergeWorkerGuard<T> {
    fn new(shared: Arc<FlatMapMergeShared<T>>, stream_id: usize) -> Self {
        Self {
            shared,
            stream_id,
            armed: true,
        }
    }

    /// Reports `terminal` as this substream's outcome and disarms the guard.
    fn finish(&mut self, terminal: Result<(), StreamError>) {
        let id = self.stream_id;
        flat_map_merge_finish_stream(&self.shared, id, terminal);
        self.armed = false;
    }
}

#[cfg(test)]
impl<T> Drop for FlatMapMergeWorkerGuard<T> {
    fn drop(&mut self) {
        if !self.armed {
            return;
        }
        let failure = Err(StreamError::AbruptTermination);
        flat_map_merge_finish_stream(&self.shared, self.stream_id, failure);
        self.shared.cancelled.store(true, Ordering::SeqCst);
    }
}
6096
/// Test-only override selecting which substream executor is used.
///
/// Within `flat_map_merge_stream` only `LegacyOnly` is consulted; the other
/// non-`Auto` variants presumably steer other substream operators — TODO
/// confirm against their call sites.
#[cfg(test)]
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub(crate) enum SubstreamExecutorMode {
    /// Default: no executor is forced.
    Auto,
    /// Force the legacy thread-per-substream `flat_map_merge` executor.
    LegacyOnly,
    /// Presumably forces the ready-ring executor (TODO confirm).
    ReadyRingOnly,
    /// Presumably forces a split-sink executor elsewhere (TODO confirm).
    SplitSinkOnly,
}
6107
// Per-thread executor override for tests, defaulting to `Auto`. Because it
// is thread-local, code running on other threads (including spawned stream
// tasks) sees its own value, `Auto` unless that thread sets it.
#[cfg(test)]
thread_local! {
    static SUBSTREAM_EXECUTOR_MODE: std::cell::Cell<SubstreamExecutorMode> =
        const { std::cell::Cell::new(SubstreamExecutorMode::Auto) };
}
6113
/// Runs `f` with the calling thread's `SubstreamExecutorMode` set to `mode`,
/// then restores the previous mode.
///
/// The previous mode is reinstated by a drop guard, so it is restored even if
/// `f` panics (for example under `catch_unwind` or a `#[should_panic]` test),
/// and nested calls unwind correctly. Only the current thread is affected.
#[cfg(test)]
pub(crate) fn with_substream_mode<R>(mode: SubstreamExecutorMode, f: impl FnOnce() -> R) -> R {
    /// Puts the saved mode back when dropped, including during unwinding.
    struct RestoreOnDrop(SubstreamExecutorMode);

    impl Drop for RestoreOnDrop {
        fn drop(&mut self) {
            let previous = self.0;
            // `try_with` instead of `with`: a destructor must not panic, even
            // if it runs while thread-locals are being torn down.
            let _ = SUBSTREAM_EXECUTOR_MODE.try_with(|m| m.set(previous));
        }
    }

    let previous = SUBSTREAM_EXECUTOR_MODE.with(|m| m.replace(mode));
    let _restore = RestoreOnDrop(previous);
    f()
}
6124
/// Returns the executor override in effect on the calling thread (`Auto`
/// unless it has been changed, e.g. via `with_substream_mode`).
#[cfg(test)]
fn current_substream_mode() -> SubstreamExecutorMode {
    SUBSTREAM_EXECUTOR_MODE.with(std::cell::Cell::get)
}
6129
6130fn flat_map_merge_stream<Out, Next, NextMat, F>(
6133 input: BoxStream<Out>,
6134 breadth: usize,
6135 stage: Arc<F>,
6136 materializer: &Materializer,
6137) -> BoxStream<Next>
6138where
6139 Out: Send + 'static,
6140 Next: Send + 'static,
6141 NextMat: Send + 'static,
6142 F: Fn(Out) -> Source<Next, NextMat> + Send + Sync + 'static,
6143{
6144 #[cfg(test)]
6145 if current_substream_mode() == SubstreamExecutorMode::LegacyOnly {
6146 return flat_map_merge_stream_legacy(input, breadth, stage, materializer);
6147 }
6148 flat_map_merge_stream_ready(input, breadth, stage, materializer)
6149}
6150
/// Legacy (test-only) `flat_map_merge` executor: one spawned worker per
/// substream, all feeding a single shared queue.
///
/// A coordinator task does the following for each element pulled from `input`:
/// - turns it into a substream via `stage`;
/// - materializes it with `worker_materializer`;
/// - spawns a worker that pushes the substream's items through
///   `flat_map_merge_push`.
///
/// At most `breadth` substreams are active at once: the coordinator waits for
/// one to finish before pulling more input. The first error (from `input`,
/// from materialization, or from a substream) becomes the terminal error
/// reported by the returned stream.
///
/// NOTE(review): with `breadth == 0` the coordinator waits until a terminal
/// error or cancellation; presumably callers reject 0 — confirm.
#[cfg(test)]
fn flat_map_merge_stream_legacy<Out, Next, NextMat, F>(
    mut input: BoxStream<Out>,
    breadth: usize,
    stage: Arc<F>,
    materializer: &Materializer,
) -> BoxStream<Next>
where
    Out: Send + 'static,
    Next: Send + 'static,
    NextMat: Send + 'static,
    F: Fn(Out) -> Source<Next, NextMat> + Send + Sync + 'static,
{
    let worker_materializer = materializer.with_name_prefix(materializer.name_prefix().to_owned());
    let shared = FlatMapMergeShared::new();
    let worker_shared = Arc::clone(&shared);
    let completion = materializer.spawn_stream(move |cancelled| {
        let mut next_id = 0usize;
        // Reports AbruptTermination and cancels everything if this task
        // exits without calling `finish`.
        let mut guard = FlatMapMergeCoordinatorGuard::new(worker_shared);
        let mut workers = HashMap::<usize, StreamCompletion<NotUsed>>::with_capacity(breadth);

        while !cancelled.load(Ordering::SeqCst) && !guard.shared.cancelled.load(Ordering::SeqCst) {
            // Forget handles of workers that have already completed.
            workers.retain(|_, completion| completion.try_wait().is_none());
            {
                let mut state = guard
                    .shared
                    .state
                    .lock()
                    .unwrap_or_else(|poison| poison.into_inner());
                // Concurrency limit: wait until fewer than `breadth` substreams are active.
                while state.active_streams >= breadth
                    && state.terminal.is_none()
                    && !cancelled.load(Ordering::SeqCst)
                    && !guard.shared.cancelled.load(Ordering::SeqCst)
                {
                    state = guard
                        .shared
                        .available
                        .wait(state)
                        .unwrap_or_else(|poison| poison.into_inner());
                }
                if state.terminal.is_some() {
                    let error = state.terminal.clone().expect("terminal checked above");
                    drop(state);
                    guard.finish();
                    return Err(error);
                }
            }

            match input.next() {
                Some(Ok(item)) => {
                    let source = stage(item);
                    let (mut stream, _) =
                        match Arc::clone(&source.factory).create(&worker_materializer) {
                            Ok(parts) => parts,
                            Err(error) => {
                                // Materialization failed: record it as terminal and
                                // stop. Running workers see the error on their next
                                // push and stop too.
                                let mut state = guard
                                    .shared
                                    .state
                                    .lock()
                                    .unwrap_or_else(|poison| poison.into_inner());
                                if state.terminal.is_none() {
                                    state.terminal = Some(error.clone());
                                }
                                drop(state);
                                guard.shared.available.notify_all();
                                guard.finish();
                                return Err(error);
                            }
                        };
                    let stream_id = next_id;
                    next_id += 1;
                    flat_map_merge_register_stream(&guard.shared);
                    let worker_shared = Arc::clone(&guard.shared);
                    workers.insert(
                        stream_id,
                        // Worker: forward the substream's items into the shared
                        // queue until it ends, fails, or the merge stops.
                        worker_materializer.spawn_stream(move |inner_cancelled| {
                            let mut worker_guard =
                                FlatMapMergeWorkerGuard::new(Arc::clone(&worker_shared), stream_id);
                            while !inner_cancelled.load(Ordering::SeqCst)
                                && !worker_shared.cancelled.load(Ordering::SeqCst)
                            {
                                match stream.next() {
                                    Some(Ok(item)) => {
                                        // Rejected push means cancelled or already failed.
                                        if flat_map_merge_push(&worker_shared, stream_id, item)
                                            .is_err()
                                        {
                                            worker_guard.finish(Ok(()));
                                            return Ok(NotUsed);
                                        }
                                    }
                                    Some(Err(error)) => {
                                        worker_guard.finish(Err(error.clone()));
                                        return Err(error);
                                    }
                                    None => {
                                        worker_guard.finish(Ok(()));
                                        return Ok(NotUsed);
                                    }
                                }
                            }
                            worker_guard.finish(Ok(()));
                            Ok(NotUsed)
                        }),
                    );
                }
                Some(Err(error)) => {
                    // Input failed: record it as terminal and stop pulling.
                    let mut state = guard
                        .shared
                        .state
                        .lock()
                        .unwrap_or_else(|poison| poison.into_inner());
                    if state.terminal.is_none() {
                        state.terminal = Some(error.clone());
                    }
                    drop(state);
                    guard.shared.available.notify_all();
                    guard.finish();
                    return Err(error);
                }
                None => {
                    guard.finish();
                    break;
                }
            }
        }

        // Loop left via cancellation: still mark input as done (normal exit).
        if guard.armed {
            guard.finish();
        }
        // Input exhausted: wait for the remaining substreams before completing.
        while !cancelled.load(Ordering::SeqCst) && !guard.shared.cancelled.load(Ordering::SeqCst) {
            workers.retain(|_, completion| completion.try_wait().is_none());
            let state = guard
                .shared
                .state
                .lock()
                .unwrap_or_else(|poison| poison.into_inner());
            if state.active_streams == 0 {
                return Ok(NotUsed);
            }
            drop(
                guard
                    .shared
                    .available
                    .wait(state)
                    .unwrap_or_else(|poison| poison.into_inner()),
            );
        }
        Ok(NotUsed)
    });

    Box::new(FlatMapMergeStream {
        shared,
        completion: Some(completion),
    })
}
6320
/// Maximum number of items moved per lock acquisition in the ready-ring
/// executor. The consumer drains at most this many items from a lane at a
/// time, and a spawned worker pulls at most this many items before publishing.
const FLAT_MAP_MERGE_READY_BATCH: usize = 16;

/// Largest `max_success_items` hint for which a substream is drained inline
/// on the coordinator rather than on a spawned worker.
const FLAT_MAP_MERGE_INLINE_MICRO_MAX: usize = FLAT_MAP_MERGE_READY_BATCH;
6329
/// Shared state of the ready-ring `flat_map_merge` executor, held jointly by
/// the coordinator task, substream workers, and the consuming
/// `FlatMapMergeReadyStream`.
struct FlatMapMergeReadyShared<T> {
    /// Global bookkeeping. Code in this executor never holds this lock and a
    /// lane lock at the same time.
    coordinator: Mutex<FlatMapMergeReadyState<T>>,
    /// Notified after coordinator-state changes; waiters compare
    /// `FlatMapMergeReadyState::generation` to detect a real change.
    available: Condvar,
    /// Raised when any of the following happens; workers and the coordinator
    /// poll it to stop early:
    /// - the consumer is dropped;
    /// - the input or a substream fails;
    /// - a guard fires without being finished.
    cancelled: Arc<AtomicBool>,
}

/// Coordinator-lock-protected state of the ready-ring executor.
struct FlatMapMergeReadyState<T> {
    /// Lanes by substream id. A lane is removed once its substream has
    /// finished and its buffer is empty.
    lanes: HashMap<usize, Arc<FlatMapMergeLane<T>>>,
    /// FIFO of lane ids with buffered items waiting to be drained.
    ready: std::collections::VecDeque<usize>,
    /// Total number of items buffered across all lanes.
    queued_items: usize,
    /// Substreams registered but not yet finished.
    active_streams: usize,
    /// Set once the coordinator has stopped pulling from the outer input.
    input_done: bool,
    /// First error recorded; the consumer reports it as soon as it sees it.
    terminal: Option<StreamError>,
    /// Bumped on state changes that waiters care about, so condvar waits can
    /// tell real changes from spurious wakeups.
    generation: u64,
}

/// Per-substream buffer between one producer and the consumer.
struct FlatMapMergeLane<T> {
    state: Mutex<FlatMapMergeLaneState<T>>,
    /// Notified when the consumer drains items (space for the producer) and
    /// when a publish completes (a consumer may be waiting on `publishing`).
    space_available: Condvar,
}

struct FlatMapMergeLaneState<T> {
    /// Items published by the substream and not yet taken by the consumer.
    buffer: std::collections::VecDeque<T>,
    /// `true` while the lane id is queued in `ready` or being drained by the
    /// consumer; prevents duplicate entries in the ready ring.
    in_ready_ring: bool,
    /// `true` while `publish_ready_batch` has appended items but not yet
    /// counted them in `queued_items`; the consumer waits for it to clear.
    publishing: bool,
    /// Set once the substream has finished.
    closed: bool,
}

/// Consumer side of the ready-ring executor.
struct FlatMapMergeReadyStream<T> {
    shared: Arc<FlatMapMergeReadyShared<T>>,
    /// Handle to the coordinator task; released on drop.
    completion: Option<StreamCompletion<NotUsed>>,
    /// Items already drained from a lane but not yet returned by `next`.
    local_batch: std::collections::VecDeque<T>,
}
6363
impl<T: Send + 'static> Iterator for FlatMapMergeReadyStream<T> {
    type Item = StreamResult<T>;

    /// Returns the next merged item, blocking until one is available.
    ///
    /// Items left over from the previously drained batch are returned first
    /// without taking any lock. Otherwise lanes are drained in ready-ring
    /// order, up to `FLAT_MAP_MERGE_READY_BATCH` items at a time. A lane with
    /// items left over goes to the back of the ring, so other lanes get a turn.
    ///
    /// A recorded terminal error is returned as soon as it is seen, before any
    /// items still buffered in lanes (unlike the legacy executor, which
    /// drains its queue first). Returns `None` once input is done, no
    /// substream is active, and nothing is buffered.
    fn next(&mut self) -> Option<Self::Item> {
        if let Some(item) = self.local_batch.pop_front() {
            return Some(Ok(item));
        }

        let mut coord = self
            .shared
            .coordinator
            .lock()
            .unwrap_or_else(|p| p.into_inner());
        loop {
            // Cloned rather than taken, so later calls report the error again.
            if let Some(error) = coord.terminal.clone() {
                return Some(Err(error));
            }

            if let Some(lane_id) = coord.ready.pop_front() {
                let lane = coord.lanes.get(&lane_id).cloned();
                // Never hold the coordinator lock while taking a lane lock.
                drop(coord);

                if let Some(lane) = lane {
                    let mut lane_state = lane.state.lock().unwrap_or_else(|p| p.into_inner());
                    // Wait out an in-progress publish so everything drained
                    // below is already counted in `queued_items`.
                    while lane_state.publishing {
                        lane_state = lane
                            .space_available
                            .wait(lane_state)
                            .unwrap_or_else(|p| p.into_inner());
                    }
                    let drain_n = lane_state.buffer.len().min(FLAT_MAP_MERGE_READY_BATCH);
                    let mut batch = std::collections::VecDeque::with_capacity(drain_n);
                    for _ in 0..drain_n {
                        if let Some(item) = lane_state.buffer.pop_front() {
                            batch.push_back(item);
                        }
                    }
                    let still_has_items = !lane_state.buffer.is_empty();
                    let is_closed = lane_state.closed;
                    if !still_has_items {
                        // A later publish will re-add the lane to the ring.
                        lane_state.in_ready_ring = false;
                    }
                    drop(lane_state);
                    // Freed buffer space: wake a producer waiting on a full lane.
                    if !batch.is_empty() {
                        lane.space_available.notify_all();
                    }

                    let freed = batch.len();
                    let mut coord = self
                        .shared
                        .coordinator
                        .lock()
                        .unwrap_or_else(|p| p.into_inner());
                    coord.queued_items = coord.queued_items.saturating_sub(freed);
                    if still_has_items {
                        // Round-robin: requeue behind the other ready lanes.
                        coord.ready.push_back(lane_id);
                    } else if is_closed {
                        // Finished and fully drained: forget the lane.
                        coord.lanes.remove(&lane_id);
                    }
                    coord.generation += 1;
                    drop(coord);
                    self.shared.available.notify_all();

                    let mut iter = batch.into_iter();
                    if let Some(first) = iter.next() {
                        self.local_batch.extend(iter);
                        return Some(Ok(first));
                    }
                }
                // Lane gone or nothing drained: re-lock and look for more work.
                coord = self
                    .shared
                    .coordinator
                    .lock()
                    .unwrap_or_else(|p| p.into_inner());
                continue;
            }

            if coord.terminal.is_none()
                && coord.input_done
                && coord.active_streams == 0
                && coord.queued_items == 0
            {
                return None;
            }

            // Sleep until something relevant changes (generation bump, error,
            // ready lane, or completion).
            let seen = coord.generation;
            coord = self
                .shared
                .available
                .wait_while(coord, |s| {
                    s.generation == seen
                        && s.terminal.is_none()
                        && s.ready.is_empty()
                        && !(s.input_done && s.active_streams == 0 && s.queued_items == 0)
                })
                .unwrap_or_else(|p| p.into_inner());
        }
    }
}
6475
6476impl<T> Drop for FlatMapMergeReadyStream<T> {
6477 fn drop(&mut self) {
6478 let lanes: Vec<Arc<FlatMapMergeLane<T>>> = {
6479 let mut coord = self
6480 .shared
6481 .coordinator
6482 .lock()
6483 .unwrap_or_else(|p| p.into_inner());
6484 coord.generation += 1;
6485 coord.lanes.values().cloned().collect()
6486 };
6487 self.shared.cancelled.store(true, Ordering::SeqCst);
6488 self.shared.available.notify_all();
6489 for lane in lanes {
6490 lane.space_available.notify_all();
6491 }
6492 let _ = self.completion.take();
6493 }
6494}
6495
/// Marks lane `stream_id` as finished and updates coordinator bookkeeping.
///
/// Steps:
/// - closes the lane, if it is still registered;
/// - decrements `active_streams`;
/// - records `terminal` as the merge's terminal error if it is the first error;
/// - removes the lane now if its buffer is empty (otherwise the consumer
///   removes it once drained);
/// - bumps `generation`.
///
/// On error it also raises the shared cancel flag and wakes every lane
/// waiter, so other producers stop.
fn finish_lane_ready<T>(
    shared: &Arc<FlatMapMergeReadyShared<T>>,
    stream_id: usize,
    terminal: Result<(), StreamError>,
) {
    let is_error = terminal.is_err();

    // Close the lane and check whether it still holds undelivered items. The
    // coordinator lock is released before the lane lock is taken.
    let lane_is_empty = {
        let coord = shared.coordinator.lock().unwrap_or_else(|p| p.into_inner());
        let lane_opt = coord.lanes.get(&stream_id).cloned();
        drop(coord);
        if let Some(lane) = lane_opt {
            let mut ls = lane.state.lock().unwrap_or_else(|p| p.into_inner());
            ls.closed = true;
            ls.buffer.is_empty()
        } else {
            true
        }
    };

    let lanes_to_notify: Vec<Arc<FlatMapMergeLane<T>>> = {
        let mut coord = shared.coordinator.lock().unwrap_or_else(|p| p.into_inner());
        coord.active_streams = coord.active_streams.saturating_sub(1);
        if let Err(ref error) = terminal
            && coord.terminal.is_none()
        {
            coord.terminal = Some(error.clone());
        }
        // A lane with buffered items stays registered until the consumer drains it.
        if lane_is_empty {
            coord.lanes.remove(&stream_id);
        }
        coord.generation += 1;
        // On failure, snapshot every lane so their waiters can be woken below.
        if is_error {
            coord.lanes.values().cloned().collect()
        } else {
            vec![]
        }
    };

    if is_error {
        shared.cancelled.store(true, Ordering::SeqCst);
        for lane in &lanes_to_notify {
            lane.space_available.notify_all();
        }
    }
    shared.available.notify_all();
}
6547
6548struct FlatMapMergeReadyCoordinatorGuard<T> {
6549 shared: Arc<FlatMapMergeReadyShared<T>>,
6550 armed: bool,
6551}
6552
6553impl<T> FlatMapMergeReadyCoordinatorGuard<T> {
6554 fn new(shared: Arc<FlatMapMergeReadyShared<T>>) -> Self {
6555 Self {
6556 shared,
6557 armed: true,
6558 }
6559 }
6560
6561 fn finish(&mut self) {
6562 let mut coord = self
6563 .shared
6564 .coordinator
6565 .lock()
6566 .unwrap_or_else(|p| p.into_inner());
6567 coord.input_done = true;
6568 coord.generation += 1;
6569 drop(coord);
6570 self.shared.available.notify_all();
6571 self.armed = false;
6572 }
6573}
6574
6575impl<T> Drop for FlatMapMergeReadyCoordinatorGuard<T> {
6576 fn drop(&mut self) {
6577 if self.armed {
6578 let lanes: Vec<Arc<FlatMapMergeLane<T>>> = {
6579 let mut coord = self
6580 .shared
6581 .coordinator
6582 .lock()
6583 .unwrap_or_else(|p| p.into_inner());
6584 if coord.terminal.is_none() {
6585 coord.terminal = Some(StreamError::AbruptTermination);
6586 }
6587 coord.input_done = true;
6588 coord.generation += 1;
6589 coord.lanes.values().cloned().collect()
6590 };
6591 self.shared.cancelled.store(true, Ordering::SeqCst);
6592 self.shared.available.notify_all();
6593 for lane in lanes {
6594 lane.space_available.notify_all();
6595 }
6596 }
6597 }
6598}
6599
6600struct FlatMapMergeReadyWorkerGuard<T> {
6601 shared: Arc<FlatMapMergeReadyShared<T>>,
6602 stream_id: usize,
6603 armed: bool,
6604}
6605
6606impl<T> FlatMapMergeReadyWorkerGuard<T> {
6607 fn new(shared: Arc<FlatMapMergeReadyShared<T>>, stream_id: usize) -> Self {
6608 Self {
6609 shared,
6610 stream_id,
6611 armed: true,
6612 }
6613 }
6614
6615 fn finish(&mut self, terminal: Result<(), StreamError>) {
6616 finish_lane_ready(&self.shared, self.stream_id, terminal);
6617 self.armed = false;
6618 }
6619}
6620
6621impl<T> Drop for FlatMapMergeReadyWorkerGuard<T> {
6622 fn drop(&mut self) {
6623 if self.armed {
6624 finish_lane_ready(
6625 &self.shared,
6626 self.stream_id,
6627 Err(StreamError::AbruptTermination),
6628 );
6629 self.shared.cancelled.store(true, Ordering::SeqCst);
6630 }
6631 }
6632}
6633
/// Appends `batch` to `lane` and makes it visible to the consumer.
///
/// Runs in three steps, each under a single lock:
/// 1. Lane: set `publishing`, append the items, mark the lane as in the
///    ready ring.
/// 2. Coordinator: add to `queued_items`, enqueue `stream_id` in `ready` if
///    it was not already there, bump `generation`.
/// 3. Lane: clear `publishing`.
///
/// While `publishing` is set the consumer waits (on `space_available`), so it
/// never drains items that are not yet counted in `queued_items`. An empty
/// batch is a no-op.
fn publish_ready_batch<T>(
    shared: &Arc<FlatMapMergeReadyShared<T>>,
    stream_id: usize,
    lane: &Arc<FlatMapMergeLane<T>>,
    batch: std::collections::VecDeque<T>,
) {
    let batch_len = batch.len();
    if batch_len == 0 {
        return;
    }

    // Step 1 (lane lock).
    let was_not_in_ready = {
        let mut ls = lane.state.lock().unwrap_or_else(|p| p.into_inner());
        let was_not = !ls.in_ready_ring;
        ls.publishing = true;
        ls.buffer.extend(batch);
        if !ls.buffer.is_empty() {
            ls.in_ready_ring = true;
        }
        was_not
    };

    // Step 2 (coordinator lock).
    {
        let mut coord = shared.coordinator.lock().unwrap_or_else(|p| p.into_inner());
        coord.queued_items += batch_len;
        if was_not_in_ready {
            coord.ready.push_back(stream_id);
        }
        coord.generation += 1;
    }
    // Step 3 (lane lock).
    {
        let mut ls = lane.state.lock().unwrap_or_else(|p| p.into_inner());
        ls.publishing = false;
    }
    // Resume a consumer blocked on `publishing`, then announce the new items.
    lane.space_available.notify_all();
    shared.available.notify_all();
}
6687
6688struct FlatMapMergeReadyInlineGuard<T> {
6698 shared: Arc<FlatMapMergeReadyShared<T>>,
6699 stream_id: usize,
6700 armed: bool,
6701}
6702
6703impl<T> FlatMapMergeReadyInlineGuard<T> {
6704 fn new(shared: Arc<FlatMapMergeReadyShared<T>>, stream_id: usize) -> Self {
6705 Self {
6706 shared,
6707 stream_id,
6708 armed: true,
6709 }
6710 }
6711
6712 fn finish(&mut self, terminal: Result<(), StreamError>) {
6713 finish_lane_ready(&self.shared, self.stream_id, terminal);
6714 self.armed = false;
6715 }
6716
6717 fn hand_off(&mut self) {
6718 self.armed = false;
6719 }
6720}
6721
6722impl<T> Drop for FlatMapMergeReadyInlineGuard<T> {
6723 fn drop(&mut self) {
6724 if self.armed {
6725 finish_lane_ready(
6726 &self.shared,
6727 self.stream_id,
6728 Err(StreamError::AbruptTermination),
6729 );
6730 }
6731 }
6732}
6733
/// Ready-ring `flat_map_merge` executor, used by `flat_map_merge_stream`
/// unless a test forces the legacy executor.
///
/// A coordinator task pulls elements from `input`, materializes the
/// substream `stage` returns for each, and registers a per-substream lane.
/// How a lane is filled depends on the source's `inline_micro` hint:
/// - `max_success_items <= FLAT_MAP_MERGE_INLINE_MICRO_MAX`: the substream
///   is drained inline on the coordinator.
/// - Otherwise: a spawned worker fills the lane in batches of up to
///   `FLAT_MAP_MERGE_READY_BATCH`, blocking while the lane holds
///   `FLAT_MAP_MERGE_SUBSTREAM_WINDOW` items.
///
/// At most `breadth` substreams are active at once. The returned
/// `FlatMapMergeReadyStream` drains lanes in ready-ring order. The first
/// error (input, materialization, or substream) becomes terminal and cancels
/// the remaining work.
///
/// NOTE(review): with `breadth == 0` the coordinator waits until a terminal
/// error or cancellation; presumably callers reject 0 — confirm.
fn flat_map_merge_stream_ready<Out, Next, NextMat, F>(
    mut input: BoxStream<Out>,
    breadth: usize,
    stage: Arc<F>,
    materializer: &Materializer,
) -> BoxStream<Next>
where
    Out: Send + 'static,
    Next: Send + 'static,
    NextMat: Send + 'static,
    F: Fn(Out) -> Source<Next, NextMat> + Send + Sync + 'static,
{
    let worker_materializer = materializer.with_name_prefix(materializer.name_prefix().to_owned());
    let shared: Arc<FlatMapMergeReadyShared<Next>> = Arc::new(FlatMapMergeReadyShared {
        coordinator: Mutex::new(FlatMapMergeReadyState {
            lanes: HashMap::new(),
            ready: std::collections::VecDeque::new(),
            queued_items: 0,
            active_streams: 0,
            input_done: false,
            terminal: None,
            generation: 0,
        }),
        available: Condvar::new(),
        cancelled: Arc::new(AtomicBool::new(false)),
    });

    let worker_shared = Arc::clone(&shared);
    let completion = materializer.spawn_stream(move |cancelled| {
        let mut next_id = 0usize;
        // Reports AbruptTermination and cancels everything if this task
        // exits without calling `finish`.
        let mut guard = FlatMapMergeReadyCoordinatorGuard::new(worker_shared);
        let mut workers = HashMap::<usize, StreamCompletion<NotUsed>>::with_capacity(breadth);

        while !cancelled.load(Ordering::SeqCst) && !guard.shared.cancelled.load(Ordering::SeqCst) {
            // Forget handles of workers that have already completed.
            workers.retain(|_, c| c.try_wait().is_none());

            {
                // Concurrency limit: wait until fewer than `breadth`
                // substreams are active (or an error/cancellation occurs).
                let mut coord = guard
                    .shared
                    .coordinator
                    .lock()
                    .unwrap_or_else(|p| p.into_inner());
                loop {
                    if coord.terminal.is_some()
                        || coord.active_streams < breadth
                        || cancelled.load(Ordering::SeqCst)
                        || guard.shared.cancelled.load(Ordering::SeqCst)
                    {
                        break;
                    }
                    let seen = coord.generation;
                    coord = guard
                        .shared
                        .available
                        .wait_while(coord, |s| {
                            s.generation == seen
                                && s.terminal.is_none()
                                && s.active_streams >= breadth
                                && !cancelled.load(Ordering::SeqCst)
                                && !guard.shared.cancelled.load(Ordering::SeqCst)
                        })
                        .unwrap_or_else(|p| p.into_inner());
                }
                if coord.terminal.is_some() {
                    let error = coord.terminal.clone().expect("terminal checked");
                    drop(coord);
                    guard.finish();
                    return Err(error);
                }
            }

            match input.next() {
                Some(Ok(item)) => {
                    let source = stage(item);
                    // Read before `source` is used to materialize the substream.
                    let inline_hint = source.hints.inline_micro;
                    let (stream, _) = match Arc::clone(&source.factory).create(&worker_materializer)
                    {
                        Ok(parts) => parts,
                        Err(error) => {
                            // Materialization failed: record it as terminal,
                            // cancel, and wake every waiter before stopping.
                            let lanes: Vec<Arc<FlatMapMergeLane<Next>>> = {
                                let mut coord = guard
                                    .shared
                                    .coordinator
                                    .lock()
                                    .unwrap_or_else(|p| p.into_inner());
                                if coord.terminal.is_none() {
                                    coord.terminal = Some(error.clone());
                                }
                                coord.generation += 1;
                                coord.lanes.values().cloned().collect()
                            };
                            guard.shared.cancelled.store(true, Ordering::SeqCst);
                            guard.shared.available.notify_all();
                            for lane in lanes {
                                lane.space_available.notify_all();
                            }
                            guard.finish();
                            return Err(error);
                        }
                    };

                    let stream_id = next_id;
                    next_id += 1;
                    let mut stream = stream;

                    // Register the lane and count the substream as active.
                    let lane: Arc<FlatMapMergeLane<Next>> = Arc::new(FlatMapMergeLane {
                        state: Mutex::new(FlatMapMergeLaneState {
                            buffer: std::collections::VecDeque::new(),
                            in_ready_ring: false,
                            publishing: false,
                            closed: false,
                        }),
                        space_available: Condvar::new(),
                    });
                    {
                        let mut coord = guard
                            .shared
                            .coordinator
                            .lock()
                            .unwrap_or_else(|p| p.into_inner());
                        coord.lanes.insert(stream_id, Arc::clone(&lane));
                        coord.active_streams += 1;
                    }

                    // Ensures the lane is finished if the coordinator unwinds
                    // before the lane is drained inline or handed to a worker.
                    let mut inline_guard =
                        FlatMapMergeReadyInlineGuard::new(Arc::clone(&guard.shared), stream_id);

                    let is_inline = inline_hint
                        .is_some_and(|h| h.max_success_items <= FLAT_MAP_MERGE_INLINE_MICRO_MAX);

                    if is_inline {
                        // Small substream: drain it here, publish once, finish.
                        let max_items = inline_hint.expect("checked").max_success_items;

                        if guard.shared.cancelled.load(Ordering::SeqCst)
                            || cancelled.load(Ordering::SeqCst)
                        {
                            inline_guard.finish(Ok(()));
                            continue;
                        }

                        // One extra pull lets the end of the substream be
                        // observed within the same pass.
                        let max_pulls = max_items.saturating_add(1);
                        let mut local_batch = std::collections::VecDeque::with_capacity(max_items);
                        let mut terminal_result: Option<Result<(), StreamError>> = None;

                        for _ in 0..max_pulls {
                            if guard.shared.cancelled.load(Ordering::SeqCst)
                                || cancelled.load(Ordering::SeqCst)
                            {
                                break;
                            }
                            match stream.next() {
                                Some(Ok(item)) => local_batch.push_back(item),
                                Some(Err(e)) => {
                                    terminal_result = Some(Err(e));
                                    break;
                                }
                                None => {
                                    terminal_result = Some(Ok(()));
                                    break;
                                }
                            }
                        }

                        // Cancelled mid-drain: discard what was pulled.
                        if guard.shared.cancelled.load(Ordering::SeqCst)
                            || cancelled.load(Ordering::SeqCst)
                        {
                            inline_guard.finish(Ok(()));
                            continue;
                        }

                        publish_ready_batch(&guard.shared, stream_id, &lane, local_batch);

                        // NOTE(review): `None` here means the substream yielded
                        // more than `max_items` items without ending, i.e. the
                        // `inline_micro` hint under-reported. Any items after
                        // the first `max_items + 1` are dropped when the lane
                        // is finished below. Confirm the hint is a hard guarantee.
                        match terminal_result {
                            Some(Err(e)) => {
                                inline_guard.finish(Err(e));
                            }
                            Some(Ok(())) | None => {
                                inline_guard.finish(Ok(()));
                            }
                        }
                    } else {
                        // The worker's own guard takes over finishing the lane.
                        inline_guard.hand_off();
                        let worker_shared = Arc::clone(&guard.shared);
                        let worker_lane = Arc::clone(&lane);
                        workers.insert(
                            stream_id,
                            worker_materializer.spawn_stream(move |inner_cancelled| {
                                let mut worker_guard = FlatMapMergeReadyWorkerGuard::new(
                                    Arc::clone(&worker_shared),
                                    stream_id,
                                );

                                loop {
                                    if inner_cancelled.load(Ordering::SeqCst)
                                        || worker_shared.cancelled.load(Ordering::SeqCst)
                                    {
                                        worker_guard.finish(Ok(()));
                                        return Ok(NotUsed);
                                    }

                                    // Backpressure: wait for room in this lane.
                                    // NOTE(review): this wait is woken only by
                                    // `space_available`; a bare `inner_cancelled`
                                    // flip is not observed until something
                                    // notifies the lane — confirm cancellation
                                    // always comes with a notify.
                                    let capacity;
                                    {
                                        let mut ls = worker_lane
                                            .state
                                            .lock()
                                            .unwrap_or_else(|p| p.into_inner());
                                        while ls.buffer.len() >= FLAT_MAP_MERGE_SUBSTREAM_WINDOW
                                            && !worker_shared.cancelled.load(Ordering::SeqCst)
                                            && !ls.closed
                                            && !inner_cancelled.load(Ordering::SeqCst)
                                        {
                                            ls = worker_lane
                                                .space_available
                                                .wait(ls)
                                                .unwrap_or_else(|p| p.into_inner());
                                        }
                                        if worker_shared.cancelled.load(Ordering::SeqCst)
                                            || ls.closed
                                            || inner_cancelled.load(Ordering::SeqCst)
                                        {
                                            drop(ls);
                                            worker_guard.finish(Ok(()));
                                            return Ok(NotUsed);
                                        }
                                        capacity =
                                            FLAT_MAP_MERGE_SUBSTREAM_WINDOW - ls.buffer.len();
                                    }
                                    // Pull without holding any lock, then publish in one go.
                                    let batch_size = capacity.min(FLAT_MAP_MERGE_READY_BATCH);
                                    let mut local_batch =
                                        std::collections::VecDeque::with_capacity(batch_size);
                                    let mut terminal_result: Option<Result<(), StreamError>> =
                                        None;
                                    for _ in 0..batch_size {
                                        if inner_cancelled.load(Ordering::SeqCst)
                                            || worker_shared.cancelled.load(Ordering::SeqCst)
                                        {
                                            break;
                                        }
                                        match stream.next() {
                                            Some(Ok(item)) => local_batch.push_back(item),
                                            Some(Err(e)) => {
                                                terminal_result = Some(Err(e));
                                                break;
                                            }
                                            None => {
                                                terminal_result = Some(Ok(()));
                                                break;
                                            }
                                        }
                                    }
                                    let batch_len = local_batch.len();

                                    publish_ready_batch(
                                        &worker_shared,
                                        stream_id,
                                        &worker_lane,
                                        local_batch,
                                    );

                                    match terminal_result {
                                        Some(Ok(())) => {
                                            worker_guard.finish(Ok(()));
                                            return Ok(NotUsed);
                                        }
                                        Some(Err(e)) => {
                                            worker_guard.finish(Err(e.clone()));
                                            return Err(e);
                                        }
                                        None => {
                                            // Nothing pulled and no end seen:
                                            // the batch loop was cut short
                                            // (cancellation), so stop.
                                            if batch_len == 0 {
                                                worker_guard.finish(Ok(()));
                                                return Ok(NotUsed);
                                            }
                                        }
                                    }
                                }
                            }),
                        );
                    }
                }
                Some(Err(error)) => {
                    // Input failed: record it as terminal, cancel, and wake
                    // every waiter before stopping.
                    let lanes: Vec<Arc<FlatMapMergeLane<Next>>> = {
                        let mut coord = guard
                            .shared
                            .coordinator
                            .lock()
                            .unwrap_or_else(|p| p.into_inner());
                        if coord.terminal.is_none() {
                            coord.terminal = Some(error.clone());
                        }
                        coord.generation += 1;
                        coord.lanes.values().cloned().collect()
                    };
                    guard.shared.cancelled.store(true, Ordering::SeqCst);
                    guard.shared.available.notify_all();
                    for lane in lanes {
                        lane.space_available.notify_all();
                    }
                    guard.finish();
                    return Err(error);
                }
                None => {
                    guard.finish();
                    break;
                }
            }
        }

        // Loop left via cancellation: still mark input as done (normal exit).
        if guard.armed {
            guard.finish();
        }

        // Input exhausted: wait for the remaining substreams before completing.
        while !cancelled.load(Ordering::SeqCst) && !guard.shared.cancelled.load(Ordering::SeqCst) {
            workers.retain(|_, c| c.try_wait().is_none());
            let coord = guard
                .shared
                .coordinator
                .lock()
                .unwrap_or_else(|p| p.into_inner());
            if coord.active_streams == 0 {
                return Ok(NotUsed);
            }
            let seen = coord.generation;
            drop(
                guard
                    .shared
                    .available
                    .wait_while(coord, |s| s.generation == seen && s.active_streams > 0)
                    .unwrap_or_else(|p| p.into_inner()),
            );
        }
        Ok(NotUsed)
    });

    Box::new(FlatMapMergeReadyStream {
        shared,
        completion: Some(completion),
        local_batch: std::collections::VecDeque::new(),
    })
}
7101
7102fn flat_map_concat_stream<Out, Next, NextMat, F>(
7103 mut input: BoxStream<Out>,
7104 stage: Arc<F>,
7105 materializer: &Materializer,
7106) -> BoxStream<Next>
7107where
7108 Out: Send + 'static,
7109 Next: Send + 'static,
7110 NextMat: Send + 'static,
7111 F: Fn(Out) -> Source<Next, NextMat> + Send + Sync + 'static,
7112{
7113 let materializer = materializer.with_name_prefix(materializer.name_prefix().to_owned());
7114 let mut current: Option<BoxStream<Next>> = None;
7115 Box::new(std::iter::from_fn(move || {
7116 loop {
7117 if let Some(stream) = current.as_mut() {
7118 match stream.next() {
7119 Some(item) => return Some(item),
7120 None => current = None,
7121 }
7122 }
7123
7124 match input.next() {
7125 Some(Ok(item)) => {
7126 let source = stage(item);
7127 current = Some(match Arc::clone(&source.factory).create(&materializer) {
7128 Ok((stream, _)) => stream,
7129 Err(error) => Box::new(std::iter::once(Err(error))) as BoxStream<Next>,
7130 });
7131 }
7132 Some(Err(error)) => return Some(Err(error)),
7133 None => return None,
7134 }
7135 }
7136 }))
7137}
7138
7139fn map_async_unordered<Out, Next, F, Fut>(
7140 mut input: BoxStream<Out>,
7141 parallelism: usize,
7142 stage: Arc<F>,
7143) -> BoxStream<Next>
7144where
7145 Out: Send + 'static,
7146 Next: Send + 'static,
7147 F: Fn(Out) -> Fut + Send + Sync + 'static,
7148 Fut: Future<Output = StreamResult<Next>> + Send + 'static,
7149{
7150 let (sender, receiver) = std::sync::mpsc::channel::<(usize, StreamResult<Next>)>();
7151 let mut tasks = HashMap::<usize, AbortOnDropHandle<()>>::with_capacity(parallelism);
7152 let mut next_task_id = 0_usize;
7153 let mut input_done = false;
7154
7155 Box::new(std::iter::from_fn(move || {
7156 loop {
7157 while tasks.len() < parallelism && !input_done {
7158 match input.next() {
7159 Some(Ok(item)) => match poll_once_or_pending(stage(item)) {
7160 Ok(result) => return Some(result),
7161 Err(future) => {
7162 let task_id = next_task_id;
7163 next_task_id += 1;
7164 tasks.insert(
7165 task_id,
7166 spawn_completion_task(task_id, future, sender.clone(), |result| {
7167 result
7168 }),
7169 );
7170 }
7171 },
7172 Some(Err(error)) => {
7173 input_done = true;
7174 return Some(Err(error));
7175 }
7176 None => input_done = true,
7177 }
7178 }
7179
7180 if tasks.is_empty() {
7181 return None;
7182 }
7183
7184 if let Some((task_id, result)) = recv_completion(&receiver) {
7185 tasks.remove(&task_id);
7186 return Some(result);
7187 }
7188 }
7189 }))
7190}
7191
7192fn map_async_unordered_supervised<Out, Next, F, Fut>(
7193 mut input: BoxStream<Out>,
7194 parallelism: usize,
7195 stage: Arc<F>,
7196 decider: SupervisionDecider,
7197) -> BoxStream<Next>
7198where
7199 Out: Send + 'static,
7200 Next: Send + 'static,
7201 F: Fn(Out) -> Fut + Send + Sync + 'static,
7202 Fut: Future<Output = StreamResult<Next>> + Send + 'static,
7203{
7204 let (sender, receiver) = std::sync::mpsc::channel::<(usize, StreamResult<Next>)>();
7205 let mut tasks = HashMap::<usize, AbortOnDropHandle<()>>::with_capacity(parallelism);
7206 let mut next_task_id = 0_usize;
7207 let mut input_done = false;
7208
7209 Box::new(std::iter::from_fn(move || {
7210 loop {
7211 while tasks.len() < parallelism && !input_done {
7212 match input.next() {
7213 Some(Ok(item)) => {
7214 match catch_unwind(AssertUnwindSafe(|| poll_once_or_pending(stage(item)))) {
7215 Ok(Ok(result)) => {
7216 if let Some(result) = supervise_async_result(result, &decider) {
7217 return Some(result);
7218 }
7219 }
7220 Ok(Err(future)) => {
7221 let task_id = next_task_id;
7222 next_task_id += 1;
7223 tasks.insert(
7224 task_id,
7225 spawn_completion_task(
7226 task_id,
7227 future,
7228 sender.clone(),
7229 |result| result,
7230 ),
7231 );
7232 }
7233 Err(_) => {
7234 let error = panic_stream_error("map_async_unordered callback");
7235 if let Some(result) = supervise_async_result(Err(error), &decider) {
7236 return Some(result);
7237 }
7238 }
7239 }
7240 }
7241 Some(Err(error)) => {
7242 input_done = true;
7243 return Some(Err(error));
7244 }
7245 None => input_done = true,
7246 }
7247 }
7248
7249 if tasks.is_empty() {
7250 return None;
7251 }
7252
7253 if let Some((task_id, result)) = recv_completion(&receiver) {
7254 tasks.remove(&task_id);
7255 if let Some(result) = supervise_async_result(result, &decider) {
7256 return Some(result);
7257 }
7258 }
7259 }
7260 }))
7261}
7262
7263#[inline(always)]
7264fn map_async_partitioned_serial<Out, Key, Next, Partition, F, Fut>(
7265 mut input: BoxStream<Out>,
7266 partition: Arc<Partition>,
7267 stage: Arc<F>,
7268) -> BoxStream<Next>
7269where
7270 Out: Send + 'static,
7271 Key: Send + 'static,
7272 Next: Send + 'static,
7273 Partition: Fn(&Out) -> Key + Send + Sync + 'static,
7274 F: Fn(Out) -> Fut + Send + Sync + 'static,
7275 Fut: Future<Output = StreamResult<Next>> + Send + 'static,
7276{
7277 let (sender, receiver) = std::sync::mpsc::channel::<(usize, StreamResult<Next>)>();
7278 let mut tasks = HashMap::<usize, AbortOnDropHandle<()>>::with_capacity(1);
7279 let mut next_index = 0_usize;
7280 Box::new(std::iter::from_fn(move || {
7281 if !tasks.is_empty()
7285 && let Some((task_id, result)) = recv_completion(&receiver)
7286 {
7287 tasks.remove(&task_id);
7288 return Some(result);
7289 }
7290
7291 let item = input.next()?;
7292 match item {
7293 Ok(item) => {
7294 let _ = partition(&item);
7295 let index = next_index;
7296 next_index += 1;
7297 Some(match poll_once_or_pending(stage(item)) {
7298 Ok(result) => result,
7299 Err(future) => {
7300 tasks.insert(
7301 index,
7302 spawn_completion_task(index, future, sender.clone(), |result| result),
7303 );
7304 let (task_id, result) =
7305 recv_completion(&receiver).expect("pending map_async task completion");
7306 tasks.remove(&task_id);
7307 result
7308 }
7309 })
7310 }
7311 Err(error) => Some(Err(error)),
7312 }
7313 }))
7314}
7315
/// Ordered, per-key-bounded `map_async` executor that finds runnable work by
/// linearly scanning a small deferred queue.
///
/// `map_async_partitioned` routes `parallelism` values of 2..=4 here.
///
/// The executor keeps these invariants:
///
/// * At most `parallelism` results are running or buffered at once
///   (`tasks.len() + completed.len()`).
/// * At most `per_partition` items sharing a key (as computed by `partition`)
///   are in flight at once. `active_by_key` counts them and drops a key once
///   its count reaches zero.
/// * Items whose key is saturated wait in `pending`, in input order. Such an
///   item is taken before any new input is read, as soon as its key has a
///   free permit.
/// * Results are emitted strictly in input order. Completions that finish
///   early are parked in `completed` until `next_to_emit` reaches them.
/// * An upstream error is parked in `completed` at its own index, so it is
///   emitted after every earlier item, and no further input is read.
///
/// NOTE(review): with `per_partition == 0` every item would be deferred, no
/// task would ever start, and the stream would end early with `None`.
/// Presumably callers guarantee `per_partition >= 1`; confirm.
#[inline(always)]
fn map_async_partitioned_scanning<Out, Key, Next, Partition, F, Fut>(
    mut input: BoxStream<Out>,
    parallelism: usize,
    per_partition: usize,
    partition: Arc<Partition>,
    stage: Arc<F>,
) -> BoxStream<Next>
where
    Out: Send + 'static,
    Key: Clone + Eq + Hash + Send + 'static,
    Next: Send + 'static,
    Partition: Fn(&Out) -> Key + Send + Sync + 'static,
    F: Fn(Out) -> Fut + Send + Sync + 'static,
    Fut: Future<Output = StreamResult<Next>> + Send + 'static,
{
    // Spawned tasks report `(index, (key, result))` so the key's permit can
    // be released when they complete.
    let (sender, receiver) = std::sync::mpsc::channel::<(usize, (Key, StreamResult<Next>))>();
    let mut tasks = HashMap::<usize, AbortOnDropHandle<()>>::with_capacity(parallelism);
    // In-flight item count per key; entries are removed when they reach zero.
    let mut active_by_key = HashMap::<Key, usize>::with_capacity(parallelism);
    // Items deferred because their key was saturated, kept in input order.
    let mut pending = VecDeque::<(usize, Key, Out)>::with_capacity(parallelism);
    // Results that finished before their turn, keyed by input index.
    let mut completed = BTreeMap::<usize, StreamResult<Next>>::new();
    let mut next_index = 0_usize;
    let mut next_to_emit = 0_usize;
    let mut input_done = false;

    Box::new(std::iter::from_fn(move || {
        loop {
            if let Some(result) = completed.remove(&next_to_emit) {
                next_to_emit += 1;
                return Some(result);
            }

            // Dispatch while fewer than `parallelism` results are running or buffered.
            while tasks.len() + completed.len() < parallelism {
                // Prefer the oldest deferred item whose key has a free permit;
                // only pull from upstream when none is runnable.
                let next = pending
                    .iter()
                    .position(|(_, key, _)| {
                        active_by_key.get(key).copied().unwrap_or(0) < per_partition
                    })
                    .and_then(|index| pending.remove(index))
                    .or_else(|| {
                        if input_done {
                            return None;
                        }
                        match input.next() {
                            Some(Ok(item)) => {
                                let key = partition(&item);
                                let index = next_index;
                                next_index += 1;
                                Some((index, key, item))
                            }
                            // Park the error at its own index so it is emitted
                            // after every earlier item, then stop reading input.
                            Some(Err(error)) => {
                                completed.insert(next_index, Err(error));
                                next_index += 1;
                                input_done = true;
                                None
                            }
                            None => {
                                input_done = true;
                                None
                            }
                        }
                    });

                let Some((index, key, item)) = next else {
                    break;
                };
                // Only freshly read items can be saturated here; deferred items
                // were selected precisely because their key had a free permit.
                if active_by_key.get(&key).copied().unwrap_or(0) >= per_partition {
                    pending.push_back((index, key, item));
                    // Stop reading ahead once the deferred queue is full.
                    if input_done || pending.len() >= parallelism {
                        break;
                    }
                    continue;
                }
                // Take a permit for this key before running the stage.
                *active_by_key.entry(key.clone()).or_default() += 1;
                match poll_once_or_pending(stage(item)) {
                    // Ready on the first poll: release the permit immediately.
                    Ok(result) => {
                        if let Some(count) = active_by_key.get_mut(&key) {
                            *count -= 1;
                            if *count == 0 {
                                active_by_key.remove(&key);
                            }
                        }
                        completed.insert(index, result);
                    }
                    // Still pending: the permit is released when the task reports back.
                    Err(future) => {
                        tasks.insert(
                            index,
                            spawn_completion_task(index, future, sender.clone(), move |result| {
                                (key, result)
                            }),
                        );
                    }
                }
            }

            if let Some(result) = completed.remove(&next_to_emit) {
                next_to_emit += 1;
                return Some(result);
            }

            // Nothing running and nothing buffered for `next_to_emit`: the stream is over.
            if tasks.is_empty() {
                return None;
            }

            // Collect the next spawned completion and release its key's permit.
            if let Some((index, (key, result))) = recv_completion(&receiver) {
                tasks.remove(&index);
                if let Some(count) = active_by_key.get_mut(&key) {
                    *count -= 1;
                    if *count == 0 {
                        active_by_key.remove(&key);
                    }
                }
                if index == next_to_emit {
                    next_to_emit += 1;
                    return Some(result);
                }
                completed.insert(index, result);
            }
        }
    }))
}
7437
7438fn map_async_partitioned<Out, Key, Next, Partition, F, Fut>(
7439 mut input: BoxStream<Out>,
7440 parallelism: usize,
7441 per_partition: usize,
7442 partition: Arc<Partition>,
7443 stage: Arc<F>,
7444) -> BoxStream<Next>
7445where
7446 Out: Send + 'static,
7447 Key: Clone + Eq + Hash + Send + 'static,
7448 Next: Send + 'static,
7449 Partition: Fn(&Out) -> Key + Send + Sync + 'static,
7450 F: Fn(Out) -> Fut + Send + Sync + 'static,
7451 Fut: Future<Output = StreamResult<Next>> + Send + 'static,
7452{
7453 if parallelism == 1 {
7454 return map_async_partitioned_serial(input, partition, stage);
7455 }
7456 if parallelism <= 4 {
7458 return map_async_partitioned_scanning(input, parallelism, per_partition, partition, stage);
7459 }
7460
7461 let (sender, receiver) = std::sync::mpsc::channel::<(usize, (usize, StreamResult<Next>))>();
7462 let mut tasks = HashMap::<usize, AbortOnDropHandle<()>>::with_capacity(parallelism);
7463 let mut slots_by_key = HashMap::<Key, usize>::with_capacity(parallelism);
7464 let mut slots = Vec::<PartitionSlot<Key, Out>>::with_capacity(parallelism);
7465 let mut free_slots = Vec::<usize>::new();
7466 let mut ready_slots = VecDeque::<usize>::with_capacity(parallelism);
7467 let mut completed = BTreeMap::<usize, StreamResult<Next>>::new();
7468 let mut next_index = 0_usize;
7469 let mut next_to_emit = 0_usize;
7470 let mut input_done = false;
7471 Box::new(std::iter::from_fn(move || {
7472 loop {
7473 if let Some(result) = completed.remove(&next_to_emit) {
7474 next_to_emit += 1;
7475 return Some(result);
7476 }
7477
7478 while tasks.len() + completed.len() < parallelism {
7479 if let Some((index, slot, item)) =
7480 pop_ready_partition_slot(&mut slots, &mut ready_slots, per_partition)
7481 {
7482 match poll_once_or_pending(stage(item)) {
7483 Ok(result) => {
7484 let mut remove_empty = false;
7485 if let Some(state) = slots.get_mut(slot) {
7486 state.active -= 1;
7487 remove_empty = state.active == 0
7488 && state.queued.is_empty()
7489 && !state.in_ready_queue;
7490 }
7491 if remove_empty {
7492 retire_partition_slot(
7493 slot,
7494 &mut slots_by_key,
7495 &mut slots,
7496 &mut free_slots,
7497 );
7498 } else {
7499 ready_partition_slot(
7500 &mut slots,
7501 &mut ready_slots,
7502 slot,
7503 per_partition,
7504 );
7505 }
7506 if index == next_to_emit {
7507 next_to_emit += 1;
7508 return Some(result);
7509 }
7510 completed.insert(index, result);
7511 }
7512 Err(future) => {
7513 tasks.insert(
7514 index,
7515 spawn_completion_task(
7516 index,
7517 future,
7518 sender.clone(),
7519 move |result| (slot, result),
7520 ),
7521 );
7522 }
7523 }
7524 continue;
7525 }
7526
7527 if input_done {
7528 break;
7529 }
7530
7531 match input.next() {
7532 Some(Ok(item)) => {
7533 let key = partition(&item);
7534 let index = next_index;
7535 next_index += 1;
7536 let slot =
7537 partition_slot_for(key, &mut slots_by_key, &mut slots, &mut free_slots);
7538 let state = &mut slots[slot];
7539 if state.active < per_partition {
7540 match poll_once_or_pending(stage(item)) {
7541 Ok(result) => {
7542 if index == next_to_emit {
7543 next_to_emit += 1;
7544 if state.queued.is_empty() && !state.in_ready_queue {
7545 retire_partition_slot(
7546 slot,
7547 &mut slots_by_key,
7548 &mut slots,
7549 &mut free_slots,
7550 );
7551 }
7552 return Some(result);
7553 }
7554 completed.insert(index, result);
7555 if state.queued.is_empty() && !state.in_ready_queue {
7556 retire_partition_slot(
7557 slot,
7558 &mut slots_by_key,
7559 &mut slots,
7560 &mut free_slots,
7561 );
7562 }
7563 }
7564 Err(future) => {
7565 state.active += 1;
7566 tasks.insert(
7567 index,
7568 spawn_completion_task(
7569 index,
7570 future,
7571 sender.clone(),
7572 move |result| (slot, result),
7573 ),
7574 );
7575 }
7576 }
7577 } else {
7578 state.queued.push_back((index, item));
7579 }
7580 }
7581 Some(Err(error)) => {
7582 completed.insert(next_index, Err(error));
7583 next_index += 1;
7584 input_done = true;
7585 break;
7586 }
7587 None => {
7588 input_done = true;
7589 break;
7590 }
7591 }
7592 }
7593
7594 if let Some(result) = completed.remove(&next_to_emit) {
7595 next_to_emit += 1;
7596 return Some(result);
7597 }
7598
7599 if tasks.is_empty() {
7600 return None;
7601 }
7602
7603 if let Some((index, (slot, result))) = recv_completion(&receiver) {
7604 tasks.remove(&index);
7605 let mut remove_empty = false;
7606 if let Some(state) = slots.get_mut(slot) {
7607 state.active -= 1;
7608 remove_empty =
7609 state.active == 0 && state.queued.is_empty() && !state.in_ready_queue;
7610 }
7611 if remove_empty {
7612 retire_partition_slot(slot, &mut slots_by_key, &mut slots, &mut free_slots);
7613 } else {
7614 ready_partition_slot(&mut slots, &mut ready_slots, slot, per_partition);
7615 }
7616 if index == next_to_emit {
7617 next_to_emit += 1;
7618 return Some(result);
7619 }
7620 completed.insert(index, result);
7621 }
7622 }
7623 }))
7624}
7625
#[cfg(test)]
mod flat_map_merge_ready_ring_tests {
    //! Equivalence and robustness tests for the ready-ring `flat_map_merge`
    //! executor, mostly cross-checked against `SubstreamExecutorMode::LegacyOnly`.
    //!
    //! Equivalence checks also pin the expected content, so they cannot pass
    //! vacuously when both executors drop the same items.
    use super::*;
    use std::sync::mpsc;
    use std::time::Duration;

    /// Collects `source` and sorts the output so unordered merges compare stably.
    fn run_sorted<T: Ord + Send + 'static>(source: crate::Source<T>) -> Vec<T> {
        let mut v = source.run_collect().unwrap();
        v.sort_unstable();
        v
    }

    #[test]
    fn ready_ring_empty_upstream() {
        let legacy = with_substream_mode(SubstreamExecutorMode::LegacyOnly, || {
            run_sorted(crate::Source::<i32>::empty().flat_map_merge(4, crate::Source::single))
        });
        let ring = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
            run_sorted(crate::Source::<i32>::empty().flat_map_merge(4, crate::Source::single))
        });
        assert_eq!(legacy, ring);
        assert!(ring.is_empty());
    }

    #[test]
    fn ready_ring_single_lane() {
        let legacy = with_substream_mode(SubstreamExecutorMode::LegacyOnly, || {
            run_sorted(crate::Source::single(42_i32).flat_map_merge(4, crate::Source::single))
        });
        let ring = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
            run_sorted(crate::Source::single(42_i32).flat_map_merge(4, crate::Source::single))
        });
        assert_eq!(legacy, ring);
        assert_eq!(ring, vec![42]);
    }

    #[test]
    fn ready_ring_breadth_one_exact_order() {
        let make = || {
            crate::Source::from_iter(0_i32..5)
                .flat_map_merge(1, |x| crate::Source::single(x * 10))
                .run_collect()
                .unwrap()
        };
        let mut legacy = with_substream_mode(SubstreamExecutorMode::LegacyOnly, make);
        let mut ring = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, make);
        // Only the multiset is compared; emission order is not asserted here.
        legacy.sort_unstable();
        ring.sort_unstable();
        assert_eq!(legacy, ring);
        assert_eq!(ring, vec![0, 10, 20, 30, 40]);
    }

    #[test]
    fn ready_ring_breadth_gt_input() {
        let make = || {
            run_sorted(
                crate::Source::from_iter(0_i32..3)
                    .flat_map_merge(100, |x| crate::Source::from_iter([x, x + 1, x + 2])),
            )
        };
        let legacy = with_substream_mode(SubstreamExecutorMode::LegacyOnly, make);
        let ring = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, make);
        assert_eq!(legacy, ring);
        assert_eq!(ring.len(), 9);
    }

    #[test]
    fn ready_ring_mixed_short_long() {
        let make = || {
            run_sorted(crate::Source::from_iter(0_i32..8).flat_map_merge(4, |x| {
                if x % 3 == 0 {
                    crate::Source::from_iter(0..20_i32).map(move |i| x * 100 + i)
                } else {
                    crate::Source::single(x)
                }
            }))
        };
        let legacy = with_substream_mode(SubstreamExecutorMode::LegacyOnly, make);
        let ring = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, make);
        assert_eq!(legacy, ring);
        // Lanes 0, 3 and 6 emit 20 items each; the five other lanes emit one.
        assert_eq!(ring.len(), 65);
    }

    #[test]
    fn ready_ring_respects_breadth_bound() {
        use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
        let active = Arc::new(AtomicUsize::new(0));
        let max_active = Arc::new(AtomicUsize::new(0));
        let a2 = Arc::clone(&active);
        let m2 = Arc::clone(&max_active);
        let mut values = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
            crate::Source::from_iter(0_i32..6)
                .flat_map_merge(2, move |item| {
                    let a = Arc::clone(&a2);
                    let m = Arc::clone(&m2);
                    crate::Source::future(move || {
                        let a = Arc::clone(&a);
                        let m = Arc::clone(&m);
                        async move {
                            let now = a.fetch_add(1, AtomicOrdering::SeqCst) + 1;
                            // Record the high-water mark of concurrently running lanes.
                            m.fetch_max(now, AtomicOrdering::SeqCst);
                            thread::sleep(Duration::from_millis(20));
                            a.fetch_sub(1, AtomicOrdering::SeqCst);
                            Ok(item)
                        }
                    })
                })
                .run_collect()
                .unwrap()
        });
        values.sort_unstable();
        assert_eq!(values, vec![0, 1, 2, 3, 4, 5]);
        assert!(max_active.load(AtomicOrdering::SeqCst) <= 2);
    }

    #[test]
    fn ready_ring_fairness_slow_lane_not_starved() {
        use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
        let slow_emitted = Arc::new(AtomicBool::new(false));
        let slow_flag = Arc::clone(&slow_emitted);
        let results = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
            crate::Source::from_iter(0_i32..3)
                .flat_map_merge(4, move |lane_id| {
                    let flag = Arc::clone(&slow_flag);
                    match lane_id {
                        0 => crate::Source::from_iter(0_i32..50),
                        1 => crate::Source::from_iter(100_i32..150),
                        _ => crate::Source::future(move || {
                            let flag = Arc::clone(&flag);
                            async move {
                                thread::sleep(Duration::from_millis(10));
                                flag.store(true, AtomicOrdering::SeqCst);
                                Ok(999_i32)
                            }
                        }),
                    }
                })
                .run_collect()
                .unwrap()
        });
        assert!(slow_emitted.load(AtomicOrdering::SeqCst));
        assert!(results.contains(&999));
        assert_eq!(results.len(), 101);
    }

    #[test]
    fn ready_ring_inner_failure_no_hang() {
        let result = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
            crate::Source::from_iter(0_i32..8)
                .flat_map_merge(4, |x| {
                    if x == 3 {
                        crate::Source::failed(StreamError::Failed("lane-fail".into()))
                    } else {
                        crate::Source::from_iter(0_i32..10)
                    }
                })
                .run_collect()
        });
        assert_eq!(result, Err(StreamError::Failed("lane-fail".into())));
    }

    #[test]
    fn ready_ring_factory_failure_propagates() {
        let result = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
            crate::Source::from_iter(0_i32..4)
                .flat_map_merge(2, |x| {
                    if x == 2 {
                        crate::Source::failed(StreamError::Failed("factory-fail".into()))
                    } else {
                        crate::Source::single(x)
                    }
                })
                .run_collect()
        });
        assert!(result.is_err());
    }

    #[test]
    fn ready_ring_closure_not_under_coordinator_lock() {
        let guard_mutex = Arc::new(std::sync::Mutex::<()>::new(()));
        let gm = Arc::clone(&guard_mutex);
        let results = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
            crate::Source::from_iter(0_i32..10)
                .flat_map_merge(4, move |x| {
                    let _lock = gm.lock().unwrap();
                    crate::Source::single(x)
                })
                .run_collect()
                .unwrap()
        });
        assert_eq!(results.len(), 10);
    }

    #[test]
    fn ready_ring_bounded_memory_producer_blocks_at_window() {
        const WINDOW: usize = FLAT_MAP_MERGE_SUBSTREAM_WINDOW;
        const EXTRA: usize = 4;
        let (gate_tx, gate_rx) = mpsc::channel::<()>();
        let gate_tx = Arc::new(std::sync::Mutex::new(gate_tx));
        let gate_tx2 = Arc::clone(&gate_tx);
        let produced = Arc::new(std::sync::atomic::AtomicUsize::new(0));
        let prod2 = Arc::clone(&produced);

        let queue = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
            crate::Source::single(0_i32)
                .flat_map_merge(2, move |_| {
                    let tx = Arc::clone(&gate_tx2);
                    let prod = Arc::clone(&prod2);
                    crate::Source::from_factory(move || {
                        let tx = Arc::clone(&tx);
                        let prod = Arc::clone(&prod);
                        let mut i = 0_i32;
                        Box::new(std::iter::from_fn(move || {
                            if i as usize >= WINDOW + EXTRA {
                                return None;
                            }
                            if i as usize == WINDOW {
                                let _ = tx.lock().unwrap().send(());
                            }
                            prod.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
                            i += 1;
                            Some(Ok(i))
                        }))
                    })
                })
                .run_with(crate::Sink::queue())
                .unwrap()
        });

        let mut total = 0;
        while queue.pull().unwrap().is_some() {
            total += 1;
        }
        let signal = gate_rx.recv_timeout(Duration::from_secs(1));
        assert!(signal.is_ok(), "producer never reached the window boundary");
        assert_eq!(total, WINDOW + EXTRA);
        // Every delivered item was counted exactly once by the producer.
        assert_eq!(
            produced.load(std::sync::atomic::Ordering::SeqCst),
            WINDOW + EXTRA
        );
    }

    #[test]
    fn ready_ring_cancellation_wakes_blocked_lanes() {
        let rt = crate::stream::runtime::Runtime::new();
        let queue = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
            crate::Source::from_iter(0_i32..4)
                .flat_map_merge(4, |_| crate::Source::repeat(1_i32))
                .run_with_materializer(crate::Sink::queue(), &rt)
                .unwrap()
        });
        for _ in 0..8 {
            let _ = queue.pull();
        }
        drop(queue);
        rt.shutdown();
    }

    #[test]
    fn ready_ring_lost_wakeup_stress() {
        for _ in 0..20 {
            let result = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
                crate::Source::from_iter(0_i32..50)
                    .flat_map_merge(8, |item| {
                        crate::Source::from_iter([item, item + 1, item + 2])
                    })
                    .run_with(crate::Sink::fold(0i64, |acc, v| acc + v as i64))
                    .unwrap()
                    .wait()
            });
            assert_eq!(result, Ok(3825), "lost-wakeup stress: wrong sum");
        }
    }

    #[test]
    fn ready_ring_concurrent_streams_lost_wakeup_stress() {
        const STREAMS: usize = 32;
        const ROUNDS: usize = 8;
        const EXPECTED: i64 = 998_080;

        for _ in 0..ROUNDS {
            let barrier = Arc::new(std::sync::Barrier::new(STREAMS));
            let mut handles = Vec::with_capacity(STREAMS);
            for _ in 0..STREAMS {
                let barrier = Arc::clone(&barrier);
                handles.push(thread::spawn(move || {
                    barrier.wait();
                    let result = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
                        crate::Source::from_iter(0_i64..32)
                            .flat_map_merge(8, |item| {
                                crate::Source::from_iter(item * 100..item * 100 + 20)
                            })
                            .run_with(crate::Sink::fold(0i64, |acc, v| acc + v))
                            .unwrap()
                            .wait()
                    });
                    assert_eq!(result, Ok(EXPECTED), "concurrent ready-ring sum");
                }));
            }

            for handle in handles {
                handle.join().expect("ready-ring stress worker panicked");
            }
        }
    }

    #[test]
    fn ready_ring_tail_loop_stress() {
        for _ in 0..20 {
            let result = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
                crate::Source::from_iter(0_i64..100)
                    .flat_map_merge(16, |item| crate::Source::from_iter([item, item + 1000]))
                    .run_with(crate::Sink::fold(0i64, |acc, v| acc + v))
                    .unwrap()
                    .wait()
            });
            assert_eq!(result, Ok(109_900), "tail-loop stress: wrong sum");
        }
    }

    #[test]
    fn ready_ring_auto_mode_matches_ring() {
        let make = || {
            run_sorted(
                crate::Source::from_iter(0_i32..10)
                    .flat_map_merge(4, |x| crate::Source::from_iter([x, x + 100])),
            )
        };
        let auto = with_substream_mode(SubstreamExecutorMode::Auto, make);
        let ring = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, make);
        assert_eq!(auto, ring);
        assert_eq!(ring.len(), 20);
    }
}
7971
#[cfg(test)]
mod inline_micro_source_tests {
    //! Equivalence and robustness tests for inner sources carrying an inline
    //! micro-source hint under the ready-ring `flat_map_merge` executor.
    //!
    //! Equivalence checks also pin the expected content, so they cannot pass
    //! vacuously when both executors drop the same items.
    use super::*;
    use crate::stream::source::test_source_with_inline_micro_hint;
    use std::sync::mpsc;

    /// Collects `source` and sorts the output so unordered merges compare stably.
    fn run_sorted<T: Ord + Send + 'static>(source: crate::Source<T>) -> Vec<T> {
        let mut v = source.run_collect().unwrap();
        v.sort_unstable();
        v
    }

    #[test]
    fn inline_empty_upstream() {
        let legacy = with_substream_mode(SubstreamExecutorMode::LegacyOnly, || {
            run_sorted(crate::Source::<i32>::empty().flat_map_merge(4, crate::Source::single))
        });
        let ring = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
            run_sorted(crate::Source::<i32>::empty().flat_map_merge(4, crate::Source::single))
        });
        assert_eq!(legacy, ring);
        assert!(ring.is_empty());
    }

    #[test]
    fn inline_single_inner_source() {
        let legacy = with_substream_mode(SubstreamExecutorMode::LegacyOnly, || {
            run_sorted(
                crate::Source::single(99_i32).flat_map_merge(4, |x| crate::Source::single(x * 2)),
            )
        });
        let ring = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
            run_sorted(
                crate::Source::single(99_i32).flat_map_merge(4, |x| crate::Source::single(x * 2)),
            )
        });
        assert_eq!(legacy, ring);
        assert_eq!(ring, vec![198]);
    }

    #[test]
    fn inline_breadth_one_exact_order() {
        let make = || {
            crate::Source::from_iter(0_i32..6)
                .flat_map_merge(1, |x| {
                    crate::Source::from_iter([x * 10, x * 10 + 1, x * 10 + 2, x * 10 + 3])
                })
                .run_collect()
                .unwrap()
        };
        let mut legacy = with_substream_mode(SubstreamExecutorMode::LegacyOnly, make);
        let mut ring = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, make);
        // Only the multiset is compared; emission order is not asserted here.
        legacy.sort_unstable();
        ring.sort_unstable();
        assert_eq!(legacy, ring);
        let expected: Vec<i32> = (0..6)
            .flat_map(|x| (0..4).map(move |i| x * 10 + i))
            .collect();
        assert_eq!(ring, expected);
    }

    #[test]
    fn inline_breadth_gt_input() {
        let make = || {
            run_sorted(
                crate::Source::from_iter(0_i32..3)
                    .flat_map_merge(100, |x| crate::Source::from_iter([x, x + 1, x + 2, x + 3])),
            )
        };
        let legacy = with_substream_mode(SubstreamExecutorMode::LegacyOnly, make);
        let ring = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, make);
        assert_eq!(legacy, ring);
        assert_eq!(ring.len(), 12);
    }

    #[test]
    fn inline_mixed_empty_single_four_item() {
        let make = || {
            run_sorted(
                crate::Source::from_iter(0_i32..12).flat_map_merge(4, |x| match x % 3 {
                    0 => crate::Source::empty(),
                    1 => crate::Source::single(x),
                    _ => crate::Source::from_iter([x, x + 100, x + 200, x + 300]),
                }),
            )
        };
        let legacy = with_substream_mode(SubstreamExecutorMode::LegacyOnly, make);
        let ring = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, make);
        assert_eq!(legacy, ring);
        // Four empty lanes, four single-item lanes and four four-item lanes.
        assert_eq!(ring.len(), 20);
    }

    #[test]
    fn inline_2k_x4_b8_benchmark_shape() {
        let make = || {
            run_sorted(
                crate::Source::from_iter(0_i32..2_000).flat_map_merge(8, |item| {
                    crate::Source::from_iter([item, item + 1, item + 2, item + 3])
                }),
            )
        };
        let legacy = with_substream_mode(SubstreamExecutorMode::LegacyOnly, make);
        let ring = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, make);
        assert_eq!(legacy, ring);
        assert_eq!(ring.len(), 8_000);
    }

    #[test]
    fn inline_lost_wakeup_stress() {
        for _ in 0..20 {
            let result = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
                crate::Source::from_iter(0_i32..50)
                    .flat_map_merge(8, |item| {
                        crate::Source::from_iter([item, item + 1, item + 2, item + 3])
                    })
                    .run_with(crate::Sink::fold(0i64, |acc, v| acc + v as i64))
                    .unwrap()
                    .wait()
            });
            assert_eq!(result, Ok(5200), "lost-wakeup stress: wrong sum");
        }
    }

    #[test]
    fn inline_tail_loop_stress() {
        for _ in 0..20 {
            let result = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
                crate::Source::from_iter(0_i64..100)
                    .flat_map_merge(16, |item| crate::Source::from_iter([item, item + 1000]))
                    .run_with(crate::Sink::fold(0i64, |acc, v| acc + v))
                    .unwrap()
                    .wait()
            });
            assert_eq!(result, Ok(109_900), "tail-loop stress: wrong sum");
        }
    }

    #[test]
    fn inline_large_source_uses_worker_fallback() {
        let make = || {
            run_sorted(crate::Source::from_iter(0_i32..4).flat_map_merge(2, |x| {
                crate::Source::from_iter(0_i32..20).map(move |i| x * 100 + i)
            }))
        };
        let legacy = with_substream_mode(SubstreamExecutorMode::LegacyOnly, make);
        let ring = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, make);
        assert_eq!(legacy, ring);
        assert_eq!(ring.len(), 80);
    }

    #[test]
    fn inline_inner_error_before_any_item() {
        let result = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
            crate::Source::from_iter(0_i32..4)
                .flat_map_merge(2, |x| {
                    if x == 1 {
                        crate::Source::failed(StreamError::Failed("inline-err".into()))
                    } else {
                        crate::Source::from_iter([x, x + 1])
                    }
                })
                .run_collect()
        });
        assert_eq!(result, Err(StreamError::Failed("inline-err".into())));
    }

    #[test]
    fn inline_inner_error_after_some_items() {
        let result = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
            crate::Source::single(0_i32)
                .flat_map_merge(2, |_x| {
                    test_source_with_inline_micro_hint(
                        || {
                            let mut count = 0;
                            Box::new(std::iter::from_fn(move || {
                                count += 1;
                                match count {
                                    1 => Some(Ok(42_i32)),
                                    2 => Some(Err(StreamError::Failed("after-items".into()))),
                                    _ => None,
                                }
                            }))
                        },
                        1,
                    )
                })
                .run_collect()
        });
        assert!(result.is_err());
    }

    #[test]
    fn inline_worker_lane_fails_during_inline_drain() {
        let result = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
            crate::Source::from_iter(0_i32..10)
                .flat_map_merge(2, |x| {
                    if x == 5 {
                        crate::Source::failed(StreamError::Failed("worker-fail".into()))
                    } else if x % 2 == 0 {
                        crate::Source::from_iter([x, x + 1, x + 2])
                    } else {
                        crate::Source::from_iter(0_i32..40).map(move |i| x * 100 + i)
                    }
                })
                .run_collect()
        });
        assert!(result.is_err());
    }

    #[test]
    fn inline_cancellation_drop_output() {
        let rt = crate::stream::runtime::Runtime::new();
        let queue = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
            crate::Source::from_iter(0_i32..100)
                .flat_map_merge(8, |x| {
                    if x % 3 == 0 {
                        crate::Source::from_iter([x, x + 1, x + 2, x + 3])
                    } else {
                        crate::Source::repeat(x)
                    }
                })
                .run_with_materializer(crate::Sink::queue(), &rt)
                .unwrap()
        });
        for _ in 0..16 {
            let _ = queue.pull();
        }
        drop(queue);
        rt.shutdown();
    }

    #[test]
    fn inline_next_not_under_coordinator_lock() {
        let (tx, rx) = mpsc::channel::<()>();
        let tx = Arc::new(std::sync::Mutex::new(tx));
        let tx2 = Arc::clone(&tx);

        let results = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, || {
            crate::Source::from_iter(0_i32..2)
                .flat_map_merge(2, move |x| {
                    let sig = Arc::clone(&tx2);
                    if x == 0 {
                        crate::Source::from_iter(0_i32..40)
                    } else {
                        test_source_with_inline_micro_hint(
                            move || {
                                let sig = Arc::clone(&sig);
                                let mut emitted = false;
                                Box::new(std::iter::from_fn(move || {
                                    if !emitted {
                                        emitted = true;
                                        let _ = sig.lock().unwrap().send(());
                                        Some(Ok(999_i32))
                                    } else {
                                        None
                                    }
                                }))
                            },
                            1,
                        )
                    }
                })
                .run_collect()
                .unwrap()
        });

        let signal = rx.recv_timeout(std::time::Duration::from_secs(5));
        assert!(signal.is_ok(), "inline next() never ran");
        assert!(results.contains(&999));
        assert_eq!(results.len(), 41);
    }

    #[test]
    fn inline_auto_matches_readyring() {
        let make = || {
            run_sorted(
                crate::Source::from_iter(0_i32..20)
                    .flat_map_merge(4, |x| crate::Source::from_iter([x, x + 1, x + 2, x + 3])),
            )
        };
        let auto = with_substream_mode(SubstreamExecutorMode::Auto, make);
        let ring = with_substream_mode(SubstreamExecutorMode::ReadyRingOnly, make);
        assert_eq!(auto, ring);
        assert_eq!(ring.len(), 80);
    }
}
8298
8299#[cfg(test)]
8301mod split_sink_fast_path_tests {
8302 use super::*;
8303
8304 fn run_split_fold(
8306 items: Vec<u64>,
8307 split_mode: SplitMode,
8308 executor_mode: SubstreamExecutorMode,
8309 ) -> Vec<u64> {
8310 with_substream_mode(executor_mode, || {
8311 let source = match split_mode {
8312 SplitMode::When => crate::Source::from_iter(items).split_when(|x| x % 10 == 0),
8313 SplitMode::After => crate::Source::from_iter(items).split_after(|x| x % 10 == 0),
8314 };
8315 source
8316 .run_with(crate::Sink::fold(
8317 Vec::new(),
8318 |mut acc, seg: crate::Source<u64>| {
8319 let sum = seg
8320 .run_with(crate::Sink::fold(0u64, |a, x| a + x))
8321 .unwrap()
8322 .wait()
8323 .unwrap();
8324 acc.push(sum);
8325 acc
8326 },
8327 ))
8328 .unwrap()
8329 .wait()
8330 .unwrap()
8331 })
8332 }
8333
8334 fn run_split_collect_segments(
8336 items: Vec<u64>,
8337 split_mode: SplitMode,
8338 executor_mode: SubstreamExecutorMode,
8339 ) -> Vec<Vec<u64>> {
8340 with_substream_mode(executor_mode, || {
8341 let source = match split_mode {
8342 SplitMode::When => crate::Source::from_iter(items).split_when(move |x| x % 10 == 0),
8343 SplitMode::After => {
8344 crate::Source::from_iter(items).split_after(move |x| x % 10 == 0)
8345 }
8346 };
8347 source
8348 .run_with(crate::Sink::fold(
8349 Vec::new(),
8350 |mut acc, seg: crate::Source<u64>| {
8351 let v = seg
8352 .run_with(crate::Sink::collect())
8353 .unwrap()
8354 .wait()
8355 .unwrap();
8356 acc.push(v);
8357 acc
8358 },
8359 ))
8360 .unwrap()
8361 .wait()
8362 .unwrap()
8363 })
8364 }
8365
8366 #[test]
8369 fn split_fast_equivalence_empty_input() {
8370 for sm in [SplitMode::When, SplitMode::After] {
8371 let legacy = run_split_collect_segments(vec![], sm, SubstreamExecutorMode::LegacyOnly);
8372 let fast = run_split_collect_segments(vec![], sm, SubstreamExecutorMode::SplitSinkOnly);
8373 assert_eq!(legacy, fast, "empty input, mode {sm:?}");
8374 }
8375 }
8376
8377 #[test]
8378 fn split_fast_equivalence_no_boundaries() {
8379 let items: Vec<u64> = (1..=9).collect();
8380 for sm in [SplitMode::When, SplitMode::After] {
8381 let legacy =
8382 run_split_collect_segments(items.clone(), sm, SubstreamExecutorMode::LegacyOnly);
8383 let fast =
8384 run_split_collect_segments(items.clone(), sm, SubstreamExecutorMode::SplitSinkOnly);
8385 assert_eq!(legacy, fast, "no boundaries, mode {sm:?}");
8386 }
8387 }
8388
8389 #[test]
8390 fn split_fast_equivalence_first_element_boundary_when() {
8391 let items: Vec<u64> = vec![10, 1, 2, 3];
8393 let legacy = run_split_collect_segments(
8394 items.clone(),
8395 SplitMode::When,
8396 SubstreamExecutorMode::LegacyOnly,
8397 );
8398 let fast = run_split_collect_segments(
8399 items,
8400 SplitMode::When,
8401 SubstreamExecutorMode::SplitSinkOnly,
8402 );
8403 assert_eq!(legacy, fast);
8404 }
8405
8406 #[test]
8407 fn split_fast_equivalence_first_element_boundary_after() {
8408 let items: Vec<u64> = vec![10, 1, 2, 3];
8410 let legacy = run_split_collect_segments(
8411 items.clone(),
8412 SplitMode::After,
8413 SubstreamExecutorMode::LegacyOnly,
8414 );
8415 let fast = run_split_collect_segments(
8416 items,
8417 SplitMode::After,
8418 SubstreamExecutorMode::SplitSinkOnly,
8419 );
8420 assert_eq!(legacy, fast);
8421 }
8422
8423 #[test]
8424 fn split_fast_equivalence_consecutive_matches() {
8425 let items: Vec<u64> = vec![10, 20, 30, 1, 2, 40];
8426 for sm in [SplitMode::When, SplitMode::After] {
8427 let legacy =
8428 run_split_collect_segments(items.clone(), sm, SubstreamExecutorMode::LegacyOnly);
8429 let fast =
8430 run_split_collect_segments(items.clone(), sm, SubstreamExecutorMode::SplitSinkOnly);
8431 assert_eq!(legacy, fast, "consecutive matches, mode {sm:?}");
8432 }
8433 }
8434
8435 #[test]
8436 fn split_fast_equivalence_last_element_boundary() {
8437 let items: Vec<u64> = vec![1, 2, 3, 10];
8438 for sm in [SplitMode::When, SplitMode::After] {
8439 let legacy =
8440 run_split_collect_segments(items.clone(), sm, SubstreamExecutorMode::LegacyOnly);
8441 let fast =
8442 run_split_collect_segments(items.clone(), sm, SubstreamExecutorMode::SplitSinkOnly);
8443 assert_eq!(legacy, fast, "last element boundary, mode {sm:?}");
8444 }
8445 }
8446
8447 #[test]
8448 fn split_fast_equivalence_mixed() {
8449 let items: Vec<u64> = (0..50u64).collect();
8450 for sm in [SplitMode::When, SplitMode::After] {
8451 let legacy =
8452 run_split_collect_segments(items.clone(), sm, SubstreamExecutorMode::LegacyOnly);
8453 let fast =
8454 run_split_collect_segments(items.clone(), sm, SubstreamExecutorMode::SplitSinkOnly);
8455 assert_eq!(legacy, fast, "mixed 0..50, mode {sm:?}");
8456 }
8457 }
8458
8459 #[test]
8460 fn split_fast_equivalence_fold_sums() {
8461 let items: Vec<u64> = (0..50u64).collect();
8462 for sm in [SplitMode::When, SplitMode::After] {
8463 let legacy = run_split_fold(items.clone(), sm, SubstreamExecutorMode::LegacyOnly);
8464 let fast = run_split_fold(items.clone(), sm, SubstreamExecutorMode::SplitSinkOnly);
8465 assert_eq!(legacy, fast, "fold sums, mode {sm:?}");
8466 }
8467 }
8468
8469 #[test]
8470 fn split_fast_equivalence_with_collect() {
8471 let items: Vec<u64> = (0..312u64).collect();
8473 for sm in [SplitMode::When, SplitMode::After] {
8474 let legacy =
8475 run_split_collect_segments(items.clone(), sm, SubstreamExecutorMode::LegacyOnly);
8476 let fast =
8477 run_split_collect_segments(items.clone(), sm, SubstreamExecutorMode::SplitSinkOnly);
8478 assert_eq!(legacy, fast, "collect 312 items, mode {sm:?}");
8479 }
8480 }
8481
8482 #[test]
8485 fn split_fast_fold_result_equivalence() {
8486 let items: Vec<u64> = (0..50u64).collect();
8487 let run = |executor_mode| {
8488 with_substream_mode(executor_mode, || {
8489 crate::Source::from_iter(items.clone())
8490 .split_when(move |x| x % 10 == 0)
8491 .run_with(crate::Sink::fold(
8492 Vec::new(),
8493 |mut acc, seg: crate::Source<u64>| {
8494 let sum = seg
8495 .run_with(crate::Sink::fold_result(0u64, |a, x| Ok(a + x)))
8496 .unwrap()
8497 .wait()
8498 .unwrap();
8499 acc.push(sum);
8500 acc
8501 },
8502 ))
8503 .unwrap()
8504 .wait()
8505 .unwrap()
8506 })
8507 };
8508 assert_eq!(
8509 run(SubstreamExecutorMode::LegacyOnly),
8510 run(SubstreamExecutorMode::SplitSinkOnly)
8511 );
8512 }
8513
8514 #[test]
8517 fn split_fast_ignore_equivalence() {
8518 let items: Vec<u64> = (0..50u64).collect();
8519 let run = |executor_mode| {
8520 with_substream_mode(executor_mode, || {
8521 crate::Source::from_iter(items.clone())
8522 .split_when(move |x| x % 10 == 0)
8523 .run_with(crate::Sink::fold(0u64, |count, seg: crate::Source<u64>| {
8524 seg.run_with(crate::Sink::ignore()).unwrap().wait().unwrap();
8525 count + 1
8526 }))
8527 .unwrap()
8528 .wait()
8529 .unwrap()
8530 })
8531 };
8532 let legacy = run(SubstreamExecutorMode::LegacyOnly);
8533 let fast = run(SubstreamExecutorMode::SplitSinkOnly);
8534 assert_eq!(legacy, fast, "ignore segment counts must match");
8535 }
8536
8537 #[test]
8540 fn split_fast_one_shot_cannot_materialize_twice() {
8541 with_substream_mode(SubstreamExecutorMode::SplitSinkOnly, || {
8542 let materializer = crate::Runtime::default();
8543 let result = crate::Source::from_iter(1u64..=5)
8544 .split_when(|x| x % 3 == 0)
8545 .run_with(crate::Sink::fold(0u64, |_, seg: crate::Source<u64>| {
8546 let c1 = seg.clone().run_with(crate::Sink::fold(0u64, |a, x| a + x));
8548 let c2 = seg.run_with(crate::Sink::fold(0u64, |a, x| a + x));
8550 assert!(c1.is_ok(), "first materialization should succeed");
8551 assert!(c2.is_err(), "second materialization should fail: {c2:?}");
8552 let _ = c1.unwrap().wait();
8553 0u64
8554 }));
8555 let _ = result;
8556 let _ = &materializer;
8557 });
8558 }
8559
8560 #[test]
8563 fn split_fast_predicate_panic_both_modes() {
8564 for sm in [SplitMode::When, SplitMode::After] {
8567 let result = with_substream_mode(SubstreamExecutorMode::SplitSinkOnly, || {
8568 let source = match sm {
8569 SplitMode::When => crate::Source::from_iter(0u64..10).split_when(|x| {
8570 if *x == 5 {
8571 panic!("test panic");
8572 }
8573 x % 3 == 0
8574 }),
8575 SplitMode::After => crate::Source::from_iter(0u64..10).split_after(|x| {
8576 if *x == 5 {
8577 panic!("test panic");
8578 }
8579 x % 3 == 0
8580 }),
8581 };
8582 source
8583 .run_with(crate::Sink::fold(
8584 Vec::<u64>::new(),
8585 |mut acc, seg: crate::Source<u64>| {
8586 let completion = seg.run_with(crate::Sink::ignore());
8588 if let Ok(c) = completion {
8589 let _ = c.wait();
8590 }
8591 acc.push(0u64);
8592 acc
8593 },
8594 ))
8595 .map(|c| c.wait())
8596 });
8597 let _ = result;
8599 }
8600 }
8601
8602 #[test]
8605 fn split_fast_stress_20x() {
8606 for i in 0..20 {
8607 let items: Vec<u64> = (0..10_000u64).collect();
8608 for sm in [SplitMode::When, SplitMode::After] {
8609 let fast = run_split_collect_segments(
8610 items.clone(),
8611 sm,
8612 SubstreamExecutorMode::SplitSinkOnly,
8613 );
8614 let legacy = run_split_collect_segments(
8615 items.clone(),
8616 sm,
8617 SubstreamExecutorMode::LegacyOnly,
8618 );
8619 assert_eq!(
8620 fast.len(),
8621 legacy.len(),
8622 "stress run {i} segment count mismatch, mode {sm:?}"
8623 );
8624 assert_eq!(
8625 fast.iter().flatten().sum::<u64>(),
8626 legacy.iter().flatten().sum::<u64>(),
8627 "stress run {i} sum mismatch, mode {sm:?}"
8628 );
8629 }
8630 }
8631 }
8632
8633 #[test]
8636 fn split_fast_auto_mode_matches_fast() {
8637 let items: Vec<u64> = (0..50u64).collect();
8638 for sm in [SplitMode::When, SplitMode::After] {
8639 let auto = run_split_collect_segments(items.clone(), sm, SubstreamExecutorMode::Auto);
8640 let fast =
8641 run_split_collect_segments(items.clone(), sm, SubstreamExecutorMode::SplitSinkOnly);
8642 assert_eq!(auto, fast, "auto == fast, mode {sm:?}");
8643 }
8644 }
8645
8646 #[test]
8649 fn split_fast_fallback_path_via_foreach() {
8650 use std::sync::atomic::{AtomicU64, Ordering as Ord};
8653 let total = Arc::new(AtomicU64::new(0));
8654 let total2 = Arc::clone(&total);
8655 let result = with_substream_mode(SubstreamExecutorMode::SplitSinkOnly, || {
8656 crate::Source::from_iter(0u64..30)
8657 .split_when(|x| x % 10 == 0)
8658 .run_with(crate::Sink::fold(
8659 Vec::new(),
8660 move |mut acc, seg: crate::Source<u64>| {
8661 let t = Arc::clone(&total2);
8662 seg.run_with(crate::Sink::foreach(move |x| {
8665 t.fetch_add(x, Ord::SeqCst);
8666 }))
8667 .unwrap()
8668 .wait()
8669 .unwrap();
8670 acc.push(1u64);
8671 acc
8672 },
8673 ))
8674 .unwrap()
8675 .wait()
8676 .unwrap()
8677 });
8678 assert_eq!(result.len(), 3, "should have 3 segments");
8679 assert_eq!(
8681 total.load(std::sync::atomic::Ordering::SeqCst),
8682 (0..30u64).sum::<u64>()
8683 );
8684 }
8685
8686 #[test]
8689 fn split_fast_liveness_segment_count_when() {
8690 let items: Vec<u64> = (0..30u64).collect();
8691 let fast = run_split_collect_segments(
8692 items.clone(),
8693 SplitMode::When,
8694 SubstreamExecutorMode::SplitSinkOnly,
8695 );
8696 let legacy =
8697 run_split_collect_segments(items, SplitMode::When, SubstreamExecutorMode::LegacyOnly);
8698 assert_eq!(fast.len(), legacy.len());
8699 }
8700
8701 #[test]
8702 fn split_fast_liveness_segment_count_after() {
8703 let items: Vec<u64> = (0..30u64).collect();
8704 let fast = run_split_collect_segments(
8705 items.clone(),
8706 SplitMode::After,
8707 SubstreamExecutorMode::SplitSinkOnly,
8708 );
8709 let legacy =
8710 run_split_collect_segments(items, SplitMode::After, SubstreamExecutorMode::LegacyOnly);
8711 assert_eq!(fast.len(), legacy.len());
8712 }
8713
8714 #[test]
8721 fn split_fast_bounded_memory_rendezvous() {
8722 use std::sync::{
8723 atomic::{AtomicBool, AtomicUsize, Ordering},
8724 mpsc,
8725 };
8726 use std::time::{Duration, Instant};
8727
8728 const CAPACITY: usize = LIVE_SUBSTREAM_CAPACITY;
8729 const BATCH: usize = LIVE_SUBSTREAM_BATCH;
8730 const TOTAL: usize = CAPACITY * 2;
8731 const MAX_IN_FLIGHT: usize = CAPACITY + BATCH;
8734
8735 let produced = Arc::new(AtomicUsize::new(0));
8736 let consumed = Arc::new(AtomicUsize::new(0));
8737 let bound_violated = Arc::new(AtomicBool::new(false));
8738
8739 let (item_tx, item_rx) = mpsc::channel::<u64>();
8741
8742 let prod_for_factory = Arc::clone(&produced);
8743 let prod_for_fold = Arc::clone(&produced);
8744 let cons_for_fold = Arc::clone(&consumed);
8745 let bv_for_fold = Arc::clone(&bound_violated);
8746
8747 let join = std::thread::spawn(move || {
8748 with_substream_mode(SubstreamExecutorMode::SplitSinkOnly, || {
8749 crate::Source::from_factory(move || {
8750 let prod = Arc::clone(&prod_for_factory);
8751 let mut i = 0u64;
8752 Box::new(std::iter::from_fn(move || {
8753 if i as usize >= TOTAL {
8754 return None;
8755 }
8756 prod.fetch_add(1, Ordering::SeqCst);
8757 let val = i;
8758 i += 1;
8759 Some(Ok(val))
8760 }))
8761 })
8762 .split_when(|_| false)
8764 .run_with(crate::Sink::fold(
8765 0usize,
8766 move |count, seg: crate::Source<u64>| {
8767 let cons = Arc::clone(&cons_for_fold);
8768 let bv = Arc::clone(&bv_for_fold);
8769 let prod = Arc::clone(&prod_for_fold);
8770 let itx = item_tx.clone();
8772 seg.run_with(crate::Sink::foreach(move |x: u64| {
8774 let c = cons.fetch_add(1, Ordering::SeqCst) + 1;
8775 let p = prod.load(Ordering::SeqCst);
8776 if p > c + MAX_IN_FLIGHT {
8777 bv.store(true, Ordering::SeqCst);
8778 }
8779 let _ = itx.send(x);
8780 }))
8781 .unwrap()
8782 .wait()
8783 .unwrap();
8784 count + 1
8785 },
8786 ))
8787 .unwrap()
8788 .wait()
8789 .unwrap()
8790 })
8791 });
8792
8793 let mut received = Vec::with_capacity(TOTAL);
8798 let timeout = Duration::from_secs(60);
8799 let deadline = Instant::now() + timeout;
8800 for i in 0..TOTAL {
8801 let remaining = deadline.saturating_duration_since(Instant::now());
8802 if remaining == Duration::ZERO {
8803 panic!(
8804 "deadlock: received {} of {TOTAL} items within {timeout:?}",
8805 received.len()
8806 );
8807 }
8808 match item_rx.recv_timeout(remaining) {
8809 Ok(item) => received.push(item),
8810 Err(mpsc::RecvTimeoutError::Timeout) => {
8811 panic!(
8812 "deadlock: no item {i} before {timeout:?} rendezvous deadline; received {} of {TOTAL}",
8813 received.len()
8814 )
8815 }
8816 Err(mpsc::RecvTimeoutError::Disconnected) => {
8817 panic!("stream ended early at item {i}")
8818 }
8819 }
8820 }
8821
8822 let seg_count = join.join().expect("stream thread panicked");
8823
8824 assert!(
8826 !bound_violated.load(Ordering::SeqCst),
8827 "bound violated: producer ran >MAX_IN_FLIGHT={MAX_IN_FLIGHT} ahead of consumer"
8828 );
8829
8830 assert_eq!(seg_count, 1, "expected exactly 1 segment");
8832 assert_eq!(received.len(), TOTAL, "not all items received");
8833 let expected: Vec<u64> = (0..TOTAL as u64).collect();
8834 assert_eq!(received, expected, "items not in correct order");
8835 }
8836}