1use super::*;
2
3#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
20pub(crate) enum ExecutorMode {
21 #[default]
23 Auto,
24 ErasedOnly,
26 TypedOnly,
28}
29
30impl<In, Out> GraphBlueprint<FlowShape<In, Out>>
31where
32 In: Clone + Send + 'static,
33 Out: Send + 'static,
34{
35 pub fn run_with_input<I>(&self, input: I) -> StreamResult<Vec<Out>>
36 where
37 I: IntoIterator<Item = In>,
38 {
39 Ok(self
40 .run_with_input_report(input, FusedExecutionConfig::default())?
41 .output)
42 }
43
44 pub fn run_with_input_report<I>(
45 &self,
46 input: I,
47 config: FusedExecutionConfig,
48 ) -> StreamResult<FusedExecutionReport<Out>>
49 where
50 I: IntoIterator<Item = In>,
51 {
52 self.run_with_input_report_mode(input, config, ExecutorMode::Auto)
53 }
54
55 pub(crate) fn run_with_input_report_mode<I>(
56 &self,
57 input: I,
58 config: FusedExecutionConfig,
59 mode: ExecutorMode,
60 ) -> StreamResult<FusedExecutionReport<Out>>
61 where
62 I: IntoIterator<Item = In>,
63 {
64 if mode != ExecutorMode::ErasedOnly {
66 let linear_plan = try_typed_flow_plan::<In, Out>(
68 &self.stages,
69 &self.edges,
70 self.shape.inlet().id(),
71 self.shape.outlet().id(),
72 );
73 if let Some(plan) = linear_plan {
74 let input = input.into_iter();
75 let mut output = Vec::with_capacity(input.size_hint().0);
76 let mut events = 0usize;
77 let mut async_boundary_crossings = 0usize;
78 for item in input {
79 let out =
80 plan.run_item(item, config, &mut events, &mut async_boundary_crossings)?;
81 output.push(out);
82 }
83 return Ok(FusedExecutionReport {
84 output,
85 events,
86 async_boundary_crossings,
87 });
88 }
89
90 let ms_plan = try_typed_merge_sequence_plan::<In, Out>(
92 &self.stages,
93 &self.edges,
94 self.shape.inlet().id(),
95 self.shape.outlet().id(),
96 );
97 if let Some(mut plan) = ms_plan {
98 let output = run_typed_merge_sequence(&mut plan, input)?;
99 return Ok(FusedExecutionReport {
100 output,
101 events: 0,
102 async_boundary_crossings: 0,
103 });
104 }
105
106 let inlet_id = self.shape.inlet().id();
120 let outlet_id = self.shape.outlet().id();
121 if let Some(runner) = try_build_typed_merge_latest_dispatch::<In, Out>(
122 &self.stages,
123 &self.edges,
124 inlet_id,
125 outlet_id,
126 ) {
127 let mut input_iter = input.into_iter();
128 let output = runner(&mut input_iter)?;
129 return Ok(FusedExecutionReport {
130 output,
131 events: 0,
132 async_boundary_crossings: 0,
133 });
134 }
135
136 if mode == ExecutorMode::TypedOnly {
137 return Err(StreamError::GraphValidation(
138 "typed executor does not support this graph shape".into(),
139 ));
140 }
141 }
142
143 let input = input.into_iter();
145 let mut executor = FusedExecutor::new(self, config);
146 let inlet = self.shape.inlet().id();
147 let outlet = self.shape.outlet().id();
148 let mut output = Vec::with_capacity(input.size_hint().0);
149
150 {
151 let mut output_sink = VecOutputSink {
152 output: &mut output,
153 };
154 executor.request(outlet, outlet, &mut output_sink)?;
155 for item in input {
156 executor.deliver(inlet, datum(item), outlet, &mut output_sink)?;
157 }
158 executor.complete(inlet, outlet, &mut output_sink)?;
159 }
160
161 Ok(FusedExecutionReport {
162 output,
163 events: executor.events,
164 async_boundary_crossings: executor.async_boundary_crossings,
165 })
166 }
167
168 pub fn run_count_with_input<I>(&self, input: I) -> StreamResult<usize>
169 where
170 I: IntoIterator<Item = In>,
171 {
172 Ok(self
173 .run_count_with_input_report(input, FusedExecutionConfig::default())?
174 .result)
175 }
176
177 pub fn run_count_with_input_report<I>(
178 &self,
179 input: I,
180 config: FusedExecutionConfig,
181 ) -> StreamResult<FusedTerminalReport<usize>>
182 where
183 I: IntoIterator<Item = In>,
184 {
185 self.run_count_with_input_report_mode(input, config, ExecutorMode::Auto)
186 }
187
188 pub(crate) fn run_count_with_input_report_mode<I>(
189 &self,
190 input: I,
191 config: FusedExecutionConfig,
192 mode: ExecutorMode,
193 ) -> StreamResult<FusedTerminalReport<usize>>
194 where
195 I: IntoIterator<Item = In>,
196 {
197 if mode != ExecutorMode::ErasedOnly {
199 let plan = try_typed_flow_plan::<In, Out>(
200 &self.stages,
201 &self.edges,
202 self.shape.inlet().id(),
203 self.shape.outlet().id(),
204 );
205 if let Some(plan) = plan {
206 let mut count = 0usize;
207 let mut events = 0usize;
208 let mut async_boundary_crossings = 0usize;
209 for item in input {
210 plan.run_item_count(item, config, &mut events, &mut async_boundary_crossings)?;
211 count += 1;
212 }
213 return Ok(FusedTerminalReport {
214 result: count,
215 events,
216 async_boundary_crossings,
217 });
218 } else if mode == ExecutorMode::TypedOnly {
219 return Err(StreamError::GraphValidation(
220 "typed executor does not support this graph shape".into(),
221 ));
222 }
223 }
224
225 let mut executor = FusedExecutor::new(self, config);
227 let inlet = self.shape.inlet().id();
228 let outlet = self.shape.outlet().id();
229 let mut output_sink = CountOutputSink { count: 0 };
230
231 executor.request::<Out>(outlet, outlet, &mut output_sink)?;
232 for item in input {
233 executor.deliver::<Out>(inlet, datum(item), outlet, &mut output_sink)?;
234 }
235 executor.complete::<Out>(inlet, outlet, &mut output_sink)?;
236
237 Ok(FusedTerminalReport {
238 result: output_sink.count,
239 events: executor.events,
240 async_boundary_crossings: executor.async_boundary_crossings,
241 })
242 }
243
244 pub fn run_fold_with_input<I, Acc, F>(&self, input: I, zero: Acc, fold: F) -> StreamResult<Acc>
245 where
246 I: IntoIterator<Item = In>,
247 F: FnMut(Acc, Out) -> Acc,
248 {
249 Ok(self
250 .run_fold_with_input_report(input, zero, fold, FusedExecutionConfig::default())?
251 .result)
252 }
253
254 pub fn run_fold_with_input_report<I, Acc, F>(
255 &self,
256 input: I,
257 zero: Acc,
258 fold: F,
259 config: FusedExecutionConfig,
260 ) -> StreamResult<FusedTerminalReport<Acc>>
261 where
262 I: IntoIterator<Item = In>,
263 F: FnMut(Acc, Out) -> Acc,
264 {
265 self.run_fold_with_input_report_mode(input, zero, fold, config, ExecutorMode::Auto)
266 }
267
268 pub(crate) fn run_fold_with_input_report_mode<I, Acc, F>(
269 &self,
270 input: I,
271 zero: Acc,
272 mut fold: F,
273 config: FusedExecutionConfig,
274 mode: ExecutorMode,
275 ) -> StreamResult<FusedTerminalReport<Acc>>
276 where
277 I: IntoIterator<Item = In>,
278 F: FnMut(Acc, Out) -> Acc,
279 {
280 if mode != ExecutorMode::ErasedOnly {
282 let plan = try_typed_flow_plan::<In, Out>(
283 &self.stages,
284 &self.edges,
285 self.shape.inlet().id(),
286 self.shape.outlet().id(),
287 );
288 if let Some(plan) = plan {
289 let mut accumulator = zero;
290 let mut events = 0usize;
291 let mut async_boundary_crossings = 0usize;
292 for item in input {
293 let out =
294 plan.run_item(item, config, &mut events, &mut async_boundary_crossings)?;
295 accumulator = fold(accumulator, out);
296 }
297 return Ok(FusedTerminalReport {
298 result: accumulator,
299 events,
300 async_boundary_crossings,
301 });
302 } else if mode == ExecutorMode::TypedOnly {
303 return Err(StreamError::GraphValidation(
304 "typed executor does not support this graph shape".into(),
305 ));
306 }
307 }
308
309 let mut executor = FusedExecutor::new(self, config);
311 let inlet = self.shape.inlet().id();
312 let outlet = self.shape.outlet().id();
313 let mut output_sink = FoldOutputSink {
314 accumulator: Some(zero),
315 fold,
316 };
317
318 executor.request(outlet, outlet, &mut output_sink)?;
319 for item in input {
320 executor.deliver(inlet, datum(item), outlet, &mut output_sink)?;
321 }
322 executor.complete(inlet, outlet, &mut output_sink)?;
323
324 Ok(FusedTerminalReport {
325 result: output_sink.finish(),
326 events: executor.events,
327 async_boundary_crossings: executor.async_boundary_crossings,
328 })
329 }
330
331 #[cfg_attr(not(test), allow(dead_code))]
341 pub(crate) fn run_with_input_mode<I>(
342 &self,
343 input: I,
344 mode: ExecutorMode,
345 ) -> StreamResult<Vec<Out>>
346 where
347 I: IntoIterator<Item = In>,
348 {
349 Ok(self
350 .run_with_input_report_mode(input, FusedExecutionConfig::default(), mode)?
351 .output)
352 }
353
354 #[allow(dead_code)]
358 pub(crate) fn run_count_with_input_mode<I>(
359 &self,
360 input: I,
361 mode: ExecutorMode,
362 ) -> StreamResult<usize>
363 where
364 I: IntoIterator<Item = In>,
365 {
366 Ok(self
367 .run_count_with_input_report_mode(input, FusedExecutionConfig::default(), mode)?
368 .result)
369 }
370
371 #[allow(dead_code)]
375 pub(crate) fn run_fold_with_input_mode<I, Acc, F>(
376 &self,
377 input: I,
378 zero: Acc,
379 fold: F,
380 mode: ExecutorMode,
381 ) -> StreamResult<Acc>
382 where
383 I: IntoIterator<Item = In>,
384 F: FnMut(Acc, Out) -> Acc,
385 {
386 Ok(self
387 .run_fold_with_input_report_mode(
388 input,
389 zero,
390 fold,
391 FusedExecutionConfig::default(),
392 mode,
393 )?
394 .result)
395 }
396}
397
398impl<T> GraphBlueprint<FlowShape<T, T>>
399where
400 T: Send + 'static,
401{
402 pub fn run_typed_linear_with_input<I>(&self, input: I) -> StreamResult<Vec<T>>
403 where
404 I: IntoIterator<Item = T>,
405 {
406 Ok(self
407 .run_typed_linear_with_input_report(input, FusedExecutionConfig::default())?
408 .output)
409 }
410
411 pub fn run_typed_linear_with_input_report<I>(
412 &self,
413 input: I,
414 config: FusedExecutionConfig,
415 ) -> StreamResult<FusedExecutionReport<T>>
416 where
417 I: IntoIterator<Item = T>,
418 {
419 let input = input.into_iter();
420 let plan = self.typed_linear_plan()?;
421 let mut output = Vec::with_capacity(input.size_hint().0);
422 let mut events = 0;
423 let mut async_boundary_crossings = 0;
424
425 for item in input {
426 let item = plan.run_item(item, config, &mut events, &mut async_boundary_crossings)?;
427 output.push(item);
428 }
429
430 Ok(FusedExecutionReport {
431 output,
432 events,
433 async_boundary_crossings,
434 })
435 }
436
437 pub fn run_typed_linear_count_with_input<I>(&self, input: I) -> StreamResult<usize>
438 where
439 I: IntoIterator<Item = T>,
440 {
441 Ok(self
442 .run_typed_linear_count_with_input_report(input, FusedExecutionConfig::default())?
443 .result)
444 }
445
446 pub fn run_typed_linear_count_with_input_report<I>(
447 &self,
448 input: I,
449 config: FusedExecutionConfig,
450 ) -> StreamResult<FusedTerminalReport<usize>>
451 where
452 I: IntoIterator<Item = T>,
453 {
454 let plan = self.typed_linear_plan()?;
455 let mut count = 0;
456 let mut events = 0;
457 let mut async_boundary_crossings = 0;
458
459 for item in input {
460 let _ = plan.run_item(item, config, &mut events, &mut async_boundary_crossings)?;
461 count += 1;
462 }
463
464 Ok(FusedTerminalReport {
465 result: count,
466 events,
467 async_boundary_crossings,
468 })
469 }
470
471 pub fn run_typed_linear_fold_with_input<I, Acc, F>(
472 &self,
473 input: I,
474 zero: Acc,
475 fold: F,
476 ) -> StreamResult<Acc>
477 where
478 I: IntoIterator<Item = T>,
479 F: FnMut(Acc, T) -> Acc,
480 {
481 Ok(self
482 .run_typed_linear_fold_with_input_report(
483 input,
484 zero,
485 fold,
486 FusedExecutionConfig::default(),
487 )?
488 .result)
489 }
490
491 pub fn run_typed_linear_fold_with_input_report<I, Acc, F>(
492 &self,
493 input: I,
494 zero: Acc,
495 mut fold: F,
496 config: FusedExecutionConfig,
497 ) -> StreamResult<FusedTerminalReport<Acc>>
498 where
499 I: IntoIterator<Item = T>,
500 F: FnMut(Acc, T) -> Acc,
501 {
502 let plan = self.typed_linear_plan()?;
503 let mut accumulator = zero;
504 let mut events = 0;
505 let mut async_boundary_crossings = 0;
506
507 for item in input {
508 let item = plan.run_item(item, config, &mut events, &mut async_boundary_crossings)?;
509 accumulator = fold(accumulator, item);
510 }
511
512 Ok(FusedTerminalReport {
513 result: accumulator,
514 events,
515 async_boundary_crossings,
516 })
517 }
518
519 pub fn run_async_boundary_count_with_input_report<I>(
526 &self,
527 input: I,
528 config: AsyncBoundaryExecutionConfig,
529 ) -> StreamResult<FusedTerminalReport<usize>>
530 where
531 I: IntoIterator<Item = T> + Send,
532 I::IntoIter: Send + 'static,
533 {
534 let segments = self.typed_linear_async_segments()?;
535 BoundaryCountExecutor::Ractor.run_count(input, segments, config)
536 }
537
538 fn typed_linear_plan(&self) -> StreamResult<TypedLinearPlan<T>> {
539 let graph_inlet = self.shape.inlet().id();
540 let graph_outlet = self.shape.outlet().id();
541 let type_id = TypeId::of::<T>();
542 let mut current_inlet = graph_inlet;
543 let mut seen = HashSet::new();
544 let mut steps = Vec::new();
545
546 loop {
547 let stage_index = self
548 .stages
549 .iter()
550 .position(|stage| {
551 stage
552 .spec
553 .inlets
554 .iter()
555 .any(|inlet| inlet.id() == current_inlet)
556 })
557 .ok_or_else(|| {
558 StreamError::GraphValidation(format!(
559 "typed linear fast path could not find inlet {}",
560 current_inlet.as_usize()
561 ))
562 })?;
563 if !seen.insert(stage_index) {
564 return Err(StreamError::GraphValidation(
565 "typed linear fast path does not support cycles".into(),
566 ));
567 }
568
569 let stage = &self.stages[stage_index];
570 if stage.spec.inlets.len() != 1 || stage.spec.outlets.len() != 1 {
571 return Err(StreamError::GraphValidation(format!(
572 "typed linear fast path requires single-inlet single-outlet stages; {} has {} inlet(s) and {} outlet(s)",
573 stage.spec.name(),
574 stage.spec.inlets.len(),
575 stage.spec.outlets.len()
576 )));
577 }
578 let inlet = &stage.spec.inlets[0];
579 let outlet = &stage.spec.outlets[0];
580 if inlet.type_id() != type_id || outlet.type_id() != type_id {
581 return Err(StreamError::GraphValidation(format!(
582 "typed linear fast path requires every port to use {}",
583 type_name::<T>()
584 )));
585 }
586
587 let step = match &stage.spec.kind {
588 StageKind::Identity | StageKind::Opaque => TypedLinearStep::Pass,
589 StageKind::AsyncBoundary => TypedLinearStep::AsyncBoundary,
590 StageKind::Map(map) => {
591 let mapper = map
592 .typed
593 .as_ref()
594 .downcast_ref::<Arc<dyn Fn(T) -> T + Send + Sync>>()
595 .ok_or_else(|| {
596 StreamError::GraphValidation(format!(
597 "typed linear fast path could not downcast map stage {}",
598 stage.spec.name()
599 ))
600 })?;
601 TypedLinearStep::Map(Arc::clone(mapper))
602 }
603 _ => {
604 return Err(StreamError::GraphValidation(format!(
605 "typed linear fast path does not support {}",
606 stage.spec.name()
607 )));
608 }
609 };
610 steps.push(step);
611
612 if outlet.id() == graph_outlet {
613 break;
614 }
615 current_inlet = self
616 .edges
617 .iter()
618 .find_map(|edge| (edge.outlet == outlet.id()).then_some(edge.inlet))
619 .ok_or_else(|| {
620 StreamError::GraphValidation(format!(
621 "typed linear fast path could not follow outlet {}",
622 outlet.id().as_usize()
623 ))
624 })?;
625 }
626
627 if seen.len() != self.stages.len() {
628 return Err(StreamError::GraphValidation(
629 "typed linear fast path requires all stages to be on the result path".into(),
630 ));
631 }
632
633 Ok(TypedLinearPlan { steps })
634 }
635
636 pub(super) fn typed_linear_async_segments(&self) -> StreamResult<TypedLinearSegments<T>> {
637 let plan = self.typed_linear_plan()?;
638 let mut segments = Vec::new();
639 let mut current = Vec::new();
640
641 for step in plan.steps {
642 match step {
643 TypedLinearStep::AsyncBoundary => {
644 segments.push(current);
645 current = Vec::new();
646 }
647 step => current.push(step),
648 }
649 }
650 segments.push(current);
651
652 if segments.len() == 1 {
653 return Err(StreamError::GraphValidation(
654 "async boundary execution requires at least one AsyncBoundary stage".into(),
655 ));
656 }
657
658 Ok(TypedLinearSegments { segments })
659 }
660}
661
662pub(super) struct TypedLinearPlan<T> {
663 steps: Vec<TypedLinearStep<T>>,
664}
665
666pub(super) struct TypedLinearSegments<T> {
667 segments: Vec<Vec<TypedLinearStep<T>>>,
668}
669
670pub(super) enum TypedLinearStep<T> {
671 Pass,
672 Map(Arc<dyn Fn(T) -> T + Send + Sync>),
673 AsyncBoundary,
674}
675
676impl<T> Clone for TypedLinearStep<T> {
677 fn clone(&self) -> Self {
678 match self {
679 Self::Pass => Self::Pass,
680 Self::Map(mapper) => Self::Map(Arc::clone(mapper)),
681 Self::AsyncBoundary => Self::AsyncBoundary,
682 }
683 }
684}
685
686impl<T> TypedLinearPlan<T> {
687 fn run_item(
688 &self,
689 mut item: T,
690 config: FusedExecutionConfig,
691 events: &mut usize,
692 async_boundary_crossings: &mut usize,
693 ) -> StreamResult<T>
694 where
695 T: Send + 'static,
696 {
697 for step in &self.steps {
698 bump_fused_event(events, config)?;
699 match step {
700 TypedLinearStep::Pass => {}
701 TypedLinearStep::Map(mapper) => {
702 item = mapper(item);
703 }
704 TypedLinearStep::AsyncBoundary => {
705 *async_boundary_crossings += 1;
706 }
707 }
708 bump_fused_event(events, config)?;
709 }
710 Ok(item)
711 }
712}
713
714#[allow(dead_code)]
725pub(crate) struct TypedSlot<T>(Option<T>);
726
727#[allow(dead_code)]
728impl<T> TypedSlot<T> {
729 pub(crate) fn empty() -> Self {
730 Self(None)
731 }
732
733 pub(crate) fn put(&mut self, value: T) {
734 self.0 = Some(value);
735 }
736
737 pub(crate) fn take(&mut self) -> Option<T> {
738 self.0.take()
739 }
740
741 pub(crate) fn is_some(&self) -> bool {
742 self.0.is_some()
743 }
744}
745
746#[allow(dead_code)]
752pub(crate) struct TypedPortRegistry {
753 slots: HashMap<PortId, Box<dyn Any + Send>>,
754}
755
756#[allow(dead_code)]
757impl TypedPortRegistry {
758 pub(crate) fn new() -> Self {
759 Self {
760 slots: HashMap::new(),
761 }
762 }
763
764 pub(crate) fn register<T: Any + Send>(&mut self, port_id: PortId) {
767 let prev = self
768 .slots
769 .insert(port_id, Box::new(TypedSlot::<T>::empty()));
770 assert!(prev.is_none(), "port {port_id:?} registered twice");
771 }
772
773 pub(crate) fn get_mut<T: Any + Send>(&mut self, port_id: PortId) -> Option<&mut TypedSlot<T>> {
778 self.slots.get_mut(&port_id)?.downcast_mut::<TypedSlot<T>>()
779 }
780}
781
782#[allow(dead_code)]
790pub(crate) trait TypedKernel<In, Out>: Send + Sync {
791 fn run(&self, input: In) -> Out;
792}
793
794#[allow(dead_code)]
802pub(crate) trait TypedStageFactory<In, Out>: Send + Sync {
803 fn try_build(&self, spec: &StageSpec) -> Option<Box<dyn TypedKernel<In, Out>>>;
804}
805
806enum TypedMiddleStep<T: 'static> {
816 Pass,
818 Map(Arc<dyn Fn(T) -> T + Send + Sync>),
820 AsyncBoundary,
822}
823
824enum TypedLastStep<In: 'static, Out: 'static> {
837 Map(Arc<dyn Fn(In) -> Out + Send + Sync>),
839 Identity(Arc<dyn Fn(In) -> Out + Send + Sync>),
843}
844
845pub(crate) struct TypedFlowPlan<In: 'static, Out: 'static> {
856 middle_steps: Vec<TypedMiddleStep<In>>,
858 last_step: TypedLastStep<In, Out>,
860 #[allow(dead_code)]
863 stage_count: usize,
864}
865
866impl<In: Send + 'static, Out: Send + 'static> TypedFlowPlan<In, Out> {
867 pub(crate) fn run_item(
872 &self,
873 item: In,
874 config: FusedExecutionConfig,
875 events: &mut usize,
876 async_boundary_crossings: &mut usize,
877 ) -> StreamResult<Out> {
878 let mut val = item;
879 for step in &self.middle_steps {
880 bump_fused_event(events, config)?;
881 val = match step {
882 TypedMiddleStep::Pass => val,
883 TypedMiddleStep::Map(f) => f(val),
884 TypedMiddleStep::AsyncBoundary => {
885 *async_boundary_crossings += 1;
886 val
887 }
888 };
889 bump_fused_event(events, config)?;
890 }
891 bump_fused_event(events, config)?;
893 let out = match &self.last_step {
894 TypedLastStep::Map(f) => f(val),
895 TypedLastStep::Identity(f) => f(val),
896 };
897 bump_fused_event(events, config)?;
898 Ok(out)
899 }
900
901 pub(crate) fn run_item_count(
910 &self,
911 item: In,
912 config: FusedExecutionConfig,
913 events: &mut usize,
914 async_boundary_crossings: &mut usize,
915 ) -> StreamResult<()> {
916 let mut val = item;
917 for step in &self.middle_steps {
918 bump_fused_event(events, config)?;
919 val = match step {
920 TypedMiddleStep::Pass => val,
921 TypedMiddleStep::Map(f) => f(val),
922 TypedMiddleStep::AsyncBoundary => {
923 *async_boundary_crossings += 1;
924 val
925 }
926 };
927 bump_fused_event(events, config)?;
928 }
929 bump_fused_event(events, config)?;
931 match &self.last_step {
932 TypedLastStep::Map(f) => {
933 let _ = f(val);
934 }
935 TypedLastStep::Identity(_) => {
936 drop(val);
938 }
939 }
940 bump_fused_event(events, config)?;
941 Ok(())
942 }
943}
944
945pub(crate) fn try_typed_flow_plan<In, Out>(
959 stages: &[super::builder::StageRecord],
960 edges: &[super::builder::Edge],
961 graph_inlet: PortId,
962 graph_outlet: PortId,
963) -> Option<TypedFlowPlan<In, Out>>
964where
965 In: Clone + Send + 'static,
966 Out: Send + 'static,
967{
968 let in_type_id = TypeId::of::<In>();
969 let out_type_id = TypeId::of::<Out>();
970
971 let mut current_inlet = graph_inlet;
972 let mut seen = HashSet::new();
973 let mut stage_infos: Vec<(&StageKind, TypeId)> = Vec::new();
975
976 loop {
977 let stage_index = stages.iter().position(|s| {
978 s.spec
979 .inlets
980 .iter()
981 .any(|inlet| inlet.id() == current_inlet)
982 })?;
983 if !seen.insert(stage_index) {
984 return None;
986 }
987 let stage = &stages[stage_index];
988 if stage.spec.inlets.len() != 1 || stage.spec.outlets.len() != 1 {
989 return None;
991 }
992 let inlet = &stage.spec.inlets[0];
993 let outlet = &stage.spec.outlets[0];
994
995 if inlet.type_id() != in_type_id {
997 return None;
998 }
999
1000 stage_infos.push((&stage.spec.kind, outlet.type_id()));
1001
1002 if outlet.id() == graph_outlet {
1003 break;
1004 }
1005 current_inlet = edges
1007 .iter()
1008 .find_map(|e| (e.outlet == outlet.id()).then_some(e.inlet))?;
1009 }
1010
1011 if seen.len() != stages.len() {
1012 return None;
1014 }
1015
1016 let (last_kind, last_outlet_type) = stage_infos.last()?;
1019 if *last_outlet_type != out_type_id {
1021 return None;
1022 }
1023
1024 let total = stage_infos.len();
1025 let mut middle_steps: Vec<TypedMiddleStep<In>> = Vec::with_capacity(total.saturating_sub(1));
1026
1027 for (kind, outlet_type) in &stage_infos[..total.saturating_sub(1)] {
1028 if *outlet_type != in_type_id {
1030 return None;
1031 }
1032 let step = match kind {
1033 StageKind::Identity => TypedMiddleStep::Pass,
1034 StageKind::Opaque => return None,
1038 StageKind::AsyncBoundary => TypedMiddleStep::AsyncBoundary,
1039 StageKind::Map(map) => {
1040 let f = map
1042 .typed
1043 .downcast_ref::<Arc<dyn Fn(In) -> In + Send + Sync>>()?;
1044 TypedMiddleStep::Map(Arc::clone(f))
1045 }
1046 _ => return None,
1047 };
1048 middle_steps.push(step);
1049 }
1050
1051 let last_step: TypedLastStep<In, Out> = match last_kind {
1053 StageKind::Identity => {
1054 if in_type_id != out_type_id {
1056 return None;
1057 }
1058 TypedLastStep::Identity(Arc::new(|x: In| -> Out {
1061 let boxed: Box<dyn Any + Send> = Box::new(x);
1062 *boxed
1063 .downcast::<Out>()
1064 .expect("TypeId equality verified at plan time")
1065 }))
1066 }
1067 StageKind::Opaque => return None,
1069 StageKind::AsyncBoundary => {
1070 if in_type_id != out_type_id {
1073 return None;
1074 }
1075 TypedLastStep::Identity(Arc::new(|x: In| -> Out {
1076 let boxed: Box<dyn Any + Send> = Box::new(x);
1077 *boxed
1078 .downcast::<Out>()
1079 .expect("TypeId equality verified at plan time")
1080 }))
1081 }
1082 StageKind::Map(map) => {
1083 let f = map
1085 .typed
1086 .downcast_ref::<Arc<dyn Fn(In) -> Out + Send + Sync>>()?;
1087 TypedLastStep::Map(Arc::clone(f))
1088 }
1089 _ => return None,
1090 };
1091
1092 Some(TypedFlowPlan {
1093 middle_steps,
1094 last_step,
1095 stage_count: total,
1096 })
1097}
1098
1099pub(crate) struct MergeSequenceCore<T> {
1114 next_sequence: u64,
1115 pending: Vec<(u64, T)>,
1117 output_buffer: VecDeque<T>,
1119 completed_count: usize,
1121 input_count: usize,
1123 completed: bool,
1125}
1126
1127impl<T> MergeSequenceCore<T> {
1128 pub(crate) fn new(input_count: usize) -> Self {
1129 Self {
1130 next_sequence: 0,
1131 pending: Vec::new(),
1132 output_buffer: VecDeque::new(),
1133 completed_count: 0,
1134 input_count,
1135 completed: false,
1136 }
1137 }
1138
1139 fn reset(&mut self) {
1141 self.next_sequence = 0;
1142 self.pending.clear();
1143 self.output_buffer.clear();
1144 self.completed_count = 0;
1145 self.completed = false;
1146 }
1147
1148 fn push_item(&mut self, seq: u64, val: T) -> StreamResult<()> {
1154 if seq == self.next_sequence {
1155 self.output_buffer.push_back(val);
1156 self.next_sequence += 1;
1157 while let Some(index) = self
1159 .pending
1160 .iter()
1161 .position(|(s, _)| *s == self.next_sequence)
1162 {
1163 let (_, item) = self.pending.remove(index);
1164 self.output_buffer.push_back(item);
1165 self.next_sequence += 1;
1166 }
1167 } else {
1168 if self.pending.iter().any(|(s, _)| *s == seq) {
1169 return Err(StreamError::Failed(format!(
1170 "duplicate sequence {seq} on merge sequence"
1171 )));
1172 }
1173 self.pending.push((seq, val));
1174 self.pending.sort_by_key(|(s, _)| *s);
1175 while let Some(index) = self
1177 .pending
1178 .iter()
1179 .position(|(s, _)| *s == self.next_sequence)
1180 {
1181 let (_, item) = self.pending.remove(index);
1182 self.output_buffer.push_back(item);
1183 self.next_sequence += 1;
1184 }
1185 }
1186 Ok(())
1187 }
1188
1189 fn on_inlet_complete(&mut self) -> StreamResult<bool> {
1197 self.completed_count += 1;
1198 if self.completed_count >= self.input_count && self.output_buffer.is_empty() {
1199 if !self.pending.is_empty() {
1200 return Err(StreamError::Failed(format!(
1201 "expected sequence {}, but all input ports have pushed or are complete",
1202 self.next_sequence,
1203 )));
1204 }
1205 self.completed = true;
1206 Ok(true)
1207 } else {
1208 Ok(false)
1209 }
1210 }
1211
1212 fn drain_into(&mut self, out: &mut Vec<T>) {
1214 out.extend(self.output_buffer.drain(..));
1215 }
1216}
1217
1218pub(crate) struct TypedMergeSequencePlan<In, T> {
1232 splits: Vec<Arc<dyn Fn(In) -> T + Send + Sync>>,
1235 extract_sequence: Arc<dyn Fn(&T) -> u64 + Send + Sync>,
1237 core: MergeSequenceCore<T>,
1239}
1240
1241impl<In: Clone + Send + 'static, T: Send + 'static> TypedMergeSequencePlan<In, T> {
1242 fn push_item(&mut self, item: In, out: &mut Vec<T>) -> StreamResult<()> {
1245 for split_fn in &self.splits {
1246 let val = split_fn(item.clone());
1247 let seq = (self.extract_sequence)(&val);
1248 self.core.push_item(seq, val)?;
1249 }
1250 self.core.drain_into(out);
1251 Ok(())
1252 }
1253
1254 fn finish(&mut self, out: &mut Vec<T>) -> StreamResult<()> {
1258 for _ in 0..self.splits.len() {
1259 self.core.on_inlet_complete()?;
1260 }
1261 self.core.drain_into(out);
1262 Ok(())
1263 }
1264
1265 fn reset(&mut self) {
1267 self.core.reset();
1268 }
1269}
1270
1271pub(crate) fn try_typed_merge_sequence_plan<In, Out>(
1283 stages: &[super::builder::StageRecord],
1284 edges: &[super::builder::Edge],
1285 graph_inlet: PortId,
1286 graph_outlet: PortId,
1287) -> Option<TypedMergeSequencePlan<In, Out>>
1288where
1289 In: Clone + Send + 'static,
1290 Out: Send + 'static,
1291{
1292 if stages.len() != 2 {
1294 return None;
1295 }
1296
1297 let in_type_id = TypeId::of::<In>();
1298 let out_type_id = TypeId::of::<Out>();
1299
1300 let unzip_idx = stages
1302 .iter()
1303 .position(|s| s.spec.inlets.len() == 1 && s.spec.inlets[0].id() == graph_inlet)?;
1304 let unzip_stage = &stages[unzip_idx];
1305
1306 let typed_split_any = match &unzip_stage.spec.kind {
1308 StageKind::Unzip { typed_split, .. } => Arc::clone(typed_split),
1309 _ => return None,
1310 };
1311
1312 if unzip_stage.spec.inlets[0].type_id() != in_type_id {
1314 return None;
1315 }
1316
1317 let k = unzip_stage.spec.outlets.len();
1319 if k == 0 {
1320 return None;
1321 }
1322 for outlet in &unzip_stage.spec.outlets {
1323 if outlet.type_id() != out_type_id {
1324 return None;
1325 }
1326 }
1327
1328 let ms_idx = 1 - unzip_idx;
1330 let ms_stage = &stages[ms_idx];
1331
1332 let (ms_input_count, typed_extract_any) = match &ms_stage.spec.kind {
1334 StageKind::MergeSequence {
1335 input_count,
1336 typed_extract,
1337 ..
1338 } => (*input_count, Arc::clone(typed_extract)),
1339 _ => return None,
1340 };
1341
1342 if ms_stage.spec.inlets.len() != k || ms_stage.spec.outlets.len() != 1 {
1343 return None;
1344 }
1345 if ms_input_count != k {
1346 return None;
1347 }
1348 for inlet in &ms_stage.spec.inlets {
1350 if inlet.type_id() != out_type_id {
1351 return None;
1352 }
1353 }
1354 if ms_stage.spec.outlets[0].type_id() != out_type_id {
1355 return None;
1356 }
1357
1358 if ms_stage.spec.outlets[0].id() != graph_outlet {
1360 return None;
1361 }
1362
1363 let unzip_outlet_ids: Vec<PortId> =
1367 unzip_stage.spec.outlets.iter().map(AnyOutlet::id).collect();
1368 let ms_inlet_ids: Vec<PortId> = ms_stage.spec.inlets.iter().map(AnyInlet::id).collect();
1369
1370 let mut outlet_to_ms_inlet: Vec<Option<usize>> = vec![None; k];
1372 for edge in edges {
1373 if let Some(uo_idx) = unzip_outlet_ids.iter().position(|&id| id == edge.outlet) {
1374 if let Some(mi_idx) = ms_inlet_ids.iter().position(|&id| id == edge.inlet) {
1375 outlet_to_ms_inlet[uo_idx] = Some(mi_idx);
1376 } else {
1377 return None; }
1379 }
1380 }
1381 if outlet_to_ms_inlet.iter().any(|x| x.is_none()) {
1382 return None; }
1384
1385 if k != 2 {
1392 return None;
1394 }
1395
1396 let typed_split =
1398 typed_split_any.downcast_ref::<Arc<dyn Fn(In) -> (Out, Out) + Send + Sync>>()?;
1399 let typed_split = Arc::clone(typed_split);
1400
1401 let typed_extract =
1403 typed_extract_any.downcast_ref::<Arc<dyn Fn(&Out) -> u64 + Send + Sync>>()?;
1404 let typed_extract = Arc::clone(typed_extract);
1405
1406 #[allow(clippy::type_complexity)]
1412 let mut splits: Vec<Option<Arc<dyn Fn(In) -> Out + Send + Sync>>> = vec![None; k];
1413
1414 let split0 = Arc::clone(&typed_split);
1415 let split1 = Arc::clone(&typed_split);
1416
1417 let ms_idx_for_out0 = outlet_to_ms_inlet[0].unwrap();
1418 let ms_idx_for_out1 = outlet_to_ms_inlet[1].unwrap();
1419
1420 splits[ms_idx_for_out0] = Some(Arc::new(move |input: In| split0(input).0));
1421 splits[ms_idx_for_out1] = Some(Arc::new(move |input: In| split1(input).1));
1422
1423 if splits.iter().any(|s| s.is_none()) {
1425 return None;
1426 }
1427 let splits: Vec<Arc<dyn Fn(In) -> Out + Send + Sync>> =
1428 splits.into_iter().map(|s| s.unwrap()).collect();
1429
1430 Some(TypedMergeSequencePlan {
1431 splits,
1432 extract_sequence: typed_extract,
1433 core: MergeSequenceCore::new(k),
1434 })
1435}
1436
1437pub(crate) fn run_typed_merge_sequence<In, T, I>(
1441 plan: &mut TypedMergeSequencePlan<In, T>,
1442 input: I,
1443) -> StreamResult<Vec<T>>
1444where
1445 In: Clone + Send + 'static,
1446 T: Send + 'static,
1447 I: IntoIterator<Item = In>,
1448{
1449 plan.reset();
1450 let input = input.into_iter();
1452 let hint = input.size_hint().0;
1453 let mut output: Vec<T> = Vec::with_capacity(hint * plan.splits.len());
1454 for item in input {
1455 plan.push_item(item, &mut output)?;
1456 }
1457 plan.finish(&mut output)?;
1458 Ok(output)
1459}
1460
1461pub(crate) struct MergeLatestCore<T> {
1474 latest: Vec<Option<T>>,
1476 seen_count: usize,
1478 completed_count: usize,
1480 input_count: usize,
1482 pending: VecDeque<Vec<T>>,
1484 completed: bool,
1486 eager_complete: bool,
1488}
1489
1490impl<T: Clone> MergeLatestCore<T> {
1491 pub(crate) fn new(input_count: usize, eager_complete: bool) -> Self {
1492 Self {
1493 latest: vec![None; input_count],
1494 seen_count: 0,
1495 completed_count: 0,
1496 input_count,
1497 pending: VecDeque::new(),
1498 completed: false,
1499 eager_complete,
1500 }
1501 }
1502
1503 fn reset(&mut self) {
1505 for slot in &mut self.latest {
1506 *slot = None;
1507 }
1508 self.seen_count = 0;
1509 self.completed_count = 0;
1510 self.pending.clear();
1511 self.completed = false;
1512 }
1513
1514 fn push_item(&mut self, inlet_index: usize, val: T) {
1517 if self.latest[inlet_index].is_none() {
1518 self.seen_count += 1;
1519 }
1520 self.latest[inlet_index] = Some(val);
1521 if self.seen_count >= self.input_count {
1522 let snapshot: Vec<T> = self
1524 .latest
1525 .iter()
1526 .map(|s| s.clone().expect("merge-latest typed: slot seen but None"))
1527 .collect();
1528 self.pending.push_back(snapshot);
1529 }
1530 }
1531
1532 fn on_inlet_complete(&mut self) -> bool {
1537 self.completed_count += 1;
1538 let all_done = self.completed_count >= self.input_count;
1539 let eager_done = self.eager_complete && self.pending.is_empty();
1540 if all_done || eager_done {
1541 self.completed = true;
1542 true
1543 } else {
1544 false
1545 }
1546 }
1547
1548 fn drain_into(&mut self, out: &mut Vec<Vec<T>>) {
1550 out.extend(self.pending.drain(..));
1551 }
1552}
1553
1554pub(crate) struct TypedMergeLatestPlan<In, T> {
1568 splits: Vec<Arc<dyn Fn(In) -> T + Send + Sync>>,
1571 core: MergeLatestCore<T>,
1573}
1574
1575impl<In: Clone + Send + 'static, T: Clone + Send + 'static> TypedMergeLatestPlan<In, T> {
1576 fn push_item(&mut self, item: In, out: &mut Vec<Vec<T>>) {
1578 for (idx, split_fn) in self.splits.iter().enumerate() {
1579 let val = split_fn(item.clone());
1580 self.core.push_item(idx, val);
1581 }
1582 self.core.drain_into(out);
1583 }
1584
1585 fn finish(&mut self) -> bool {
1588 for _ in 0..self.splits.len() {
1590 if self.core.on_inlet_complete() {
1591 return true;
1592 }
1593 }
1594 true
1595 }
1596
1597 fn reset(&mut self) {
1599 self.core.reset();
1600 }
1601}
1602
1603pub(crate) fn try_typed_merge_latest_plan<In, T>(
1619 stages: &[super::builder::StageRecord],
1620 edges: &[super::builder::Edge],
1621 graph_inlet: PortId,
1622 graph_outlet: PortId,
1623) -> Option<TypedMergeLatestPlan<In, T>>
1624where
1625 In: Clone + Send + 'static,
1626 T: Clone + Send + 'static,
1627{
1628 if stages.len() != 2 {
1630 return None;
1631 }
1632
1633 let in_type_id = TypeId::of::<In>();
1634 let elem_type_id = TypeId::of::<T>();
1635 let vec_type_id = TypeId::of::<Vec<T>>();
1636
1637 let unzip_idx = stages
1639 .iter()
1640 .position(|s| s.spec.inlets.len() == 1 && s.spec.inlets[0].id() == graph_inlet)?;
1641 let unzip_stage = &stages[unzip_idx];
1642
1643 let typed_split_any = match &unzip_stage.spec.kind {
1645 StageKind::Unzip { typed_split, .. } => Arc::clone(typed_split),
1646 _ => return None,
1647 };
1648
1649 if unzip_stage.spec.inlets[0].type_id() != in_type_id {
1651 return None;
1652 }
1653
1654 let k = unzip_stage.spec.outlets.len();
1656 if k == 0 {
1657 return None;
1658 }
1659 for outlet in &unzip_stage.spec.outlets {
1660 if outlet.type_id() != elem_type_id {
1661 return None;
1662 }
1663 }
1664
1665 let ml_idx = 1 - unzip_idx;
1667 let ml_stage = &stages[ml_idx];
1668
1669 let (ml_input_count, typed_snapshot_any) = match &ml_stage.spec.kind {
1671 StageKind::MergeLatest {
1672 input_count,
1673 typed_snapshot,
1674 ..
1675 } => (*input_count, Arc::clone(typed_snapshot)),
1676 _ => return None,
1677 };
1678
1679 if ml_stage.spec.inlets.len() != k || ml_stage.spec.outlets.len() != 1 {
1680 return None;
1681 }
1682 if ml_input_count != k {
1683 return None;
1684 }
1685 for inlet in &ml_stage.spec.inlets {
1687 if inlet.type_id() != elem_type_id {
1688 return None;
1689 }
1690 }
1691 if ml_stage.spec.outlets[0].type_id() != vec_type_id {
1693 return None;
1694 }
1695
1696 if ml_stage.spec.outlets[0].id() != graph_outlet {
1698 return None;
1699 }
1700
1701 let unzip_outlet_ids: Vec<PortId> =
1703 unzip_stage.spec.outlets.iter().map(AnyOutlet::id).collect();
1704 let ml_inlet_ids: Vec<PortId> = ml_stage.spec.inlets.iter().map(AnyInlet::id).collect();
1705
1706 let mut outlet_to_ml_inlet: Vec<Option<usize>> = vec![None; k];
1707 for edge in edges {
1708 if let Some(uo_idx) = unzip_outlet_ids.iter().position(|&id| id == edge.outlet) {
1709 if let Some(mi_idx) = ml_inlet_ids.iter().position(|&id| id == edge.inlet) {
1710 outlet_to_ml_inlet[uo_idx] = Some(mi_idx);
1711 } else {
1712 return None; }
1714 }
1715 }
1716 if outlet_to_ml_inlet.iter().any(|x| x.is_none()) {
1717 return None; }
1719
1720 if k != 2 {
1722 return None;
1723 }
1724
1725 type SplitFn<A, B> = Arc<dyn Fn(A) -> (B, B) + Send + Sync>;
1727 let typed_split = typed_split_any.downcast_ref::<SplitFn<In, T>>()?;
1728 let typed_split = Arc::clone(typed_split);
1729
1730 type SnapshotFn<U> = Arc<dyn Fn(&[Option<U>]) -> Vec<U> + Send + Sync>;
1732 typed_snapshot_any.downcast_ref::<SnapshotFn<T>>()?;
1733
1734 #[allow(clippy::type_complexity)]
1736 let mut splits: Vec<Option<Arc<dyn Fn(In) -> T + Send + Sync>>> = vec![None; k];
1737
1738 let split0 = Arc::clone(&typed_split);
1739 let split1 = Arc::clone(&typed_split);
1740
1741 let ml_idx_for_out0 = outlet_to_ml_inlet[0].unwrap();
1742 let ml_idx_for_out1 = outlet_to_ml_inlet[1].unwrap();
1743
1744 splits[ml_idx_for_out0] = Some(Arc::new(move |input: In| split0(input).0));
1745 splits[ml_idx_for_out1] = Some(Arc::new(move |input: In| split1(input).1));
1746
1747 if splits.iter().any(|s| s.is_none()) {
1748 return None;
1749 }
1750 let splits: Vec<Arc<dyn Fn(In) -> T + Send + Sync>> =
1751 splits.into_iter().map(|s| s.unwrap()).collect();
1752
1753 let eager_complete = match &ml_stage.spec.kind {
1755 StageKind::MergeLatest { eager_complete, .. } => *eager_complete,
1756 _ => return None,
1757 };
1758
1759 Some(TypedMergeLatestPlan {
1760 splits,
1761 core: MergeLatestCore::new(k, eager_complete),
1762 })
1763}
1764
1765type MergeLatestRunner<In, Out> =
1776 Box<dyn FnOnce(&mut dyn Iterator<Item = In>) -> StreamResult<Vec<Out>>>;
1777
1778pub(crate) fn try_build_typed_merge_latest_dispatch<In, Out>(
1798 stages: &[super::builder::StageRecord],
1799 edges: &[super::builder::Edge],
1800 inlet: PortId,
1801 outlet: PortId,
1802) -> Option<MergeLatestRunner<In, Out>>
1803where
1804 In: Clone + Send + 'static,
1805 Out: Send + 'static,
1806{
1807 let elem_type_id: TypeId = stages.iter().find_map(|s| {
1809 if let StageKind::MergeLatest { .. } = &s.spec.kind {
1810 s.spec.inlets.first().map(|i| i.type_id())
1811 } else {
1812 None
1813 }
1814 })?;
1815
1816 macro_rules! try_elem {
1826 ($($T:ty),*) => {
1827 $(
1828 if elem_type_id == TypeId::of::<$T>() {
1829 let mut plan = try_typed_merge_latest_plan::<In, $T>(stages, edges, inlet, outlet)?;
1830 let runner: MergeLatestRunner<In, Out> = Box::new(
1833 move |iter: &mut dyn Iterator<Item = In>| {
1834 plan.reset();
1835 let hint = iter.size_hint().0;
1836 let mut output: Vec<Vec<$T>> = Vec::with_capacity(hint);
1837 for item in iter {
1838 plan.push_item(item, &mut output);
1839 }
1840 plan.finish();
1841 let boxed: Box<dyn Any + Send> = Box::new(output);
1845 boxed
1846 .downcast::<Vec<Out>>()
1847 .map(|b| *b)
1848 .map_err(|_| StreamError::Failed(
1849 "merge-latest typed runner: Out type mismatch".into()
1850 ))
1851 }
1852 );
1853 return Some(runner);
1854 }
1855 )*
1856 };
1857 }
1858
1859 try_elem!(
1860 u8, u16, u32, u64, u128, usize, i8, i16, i32, i64, i128, isize, f32, f64, bool, String
1861 );
1862
1863 None
1865}
1866
1867pub(super) enum BoundaryCountExecutor {
1870 #[cfg(test)]
1871 Threaded,
1872 Ractor,
1873}
1874
1875impl BoundaryCountExecutor {
1876 pub(super) fn run_count<I, T>(
1877 &self,
1878 input: I,
1879 segments: TypedLinearSegments<T>,
1880 config: AsyncBoundaryExecutionConfig,
1881 ) -> StreamResult<FusedTerminalReport<usize>>
1882 where
1883 I: IntoIterator<Item = T> + Send,
1884 I::IntoIter: Send + 'static,
1885 T: Send + 'static,
1886 {
1887 match self {
1888 #[cfg(test)]
1889 Self::Threaded => run_threaded_async_linear_count(input, segments, config),
1890 Self::Ractor => run_ractor_async_linear_count(input, segments, config),
1891 }
1892 }
1893}
1894
1895#[cfg(test)]
1896mod tests {
1897 use super::*;
1898
1899 #[derive(Default)]
1900 struct BufferedFlowState {
1901 queued: VecDeque<i32>,
1902 upstream_closed: bool,
1903 pull_calls: usize,
1904 finish_calls: usize,
1905 }
1906
1907 struct BufferedFlowOnPull {
1908 state: Arc<Mutex<BufferedFlowState>>,
1909 }
1910
1911 impl GraphStage for BufferedFlowOnPull {
1912 type Shape = FlowShape<i32, i32>;
1913
1914 fn name(&self) -> &str {
1915 "BufferedFlowOnPull"
1916 }
1917
1918 fn allocate_shape(&self, _allocator: &mut PortAllocator) -> Self::Shape {
1919 let first_id = next_port_id_block(2);
1920 FlowShape::new(
1921 Inlet::with_id(first_id, "buffered-flow.in"),
1922 Outlet::with_id(first_id.offset(1), "buffered-flow.out"),
1923 )
1924 }
1925
1926 fn stage_spec(&self, shape: &Self::Shape) -> StageSpec {
1927 StageSpec::opaque(self.name(), shape.inlets(), shape.outlets())
1928 }
1929
1930 fn create_logic(&self, shape: &Self::Shape) -> GraphStageLogic {
1931 struct In {
1932 state: Arc<Mutex<BufferedFlowState>>,
1933 }
1934
1935 impl InHandler for In {
1936 fn on_push(
1937 &mut self,
1938 logic: &mut GraphStageLogic,
1939 inlet: AnyInlet,
1940 ) -> StreamResult<()> {
1941 let value: i32 = logic.grab_datum(inlet.id()).and_then(|value| {
1942 downcast_datum(value, "grab", || format!("inlet#{}", inlet.id().as_usize()))
1943 })?;
1944 self.state.lock().unwrap().queued.push_back(value);
1945 Ok(())
1946 }
1947
1948 fn on_upstream_finish(
1949 &mut self,
1950 _logic: &mut GraphStageLogic,
1951 _inlet: AnyInlet,
1952 ) -> StreamResult<()> {
1953 self.state.lock().unwrap().upstream_closed = true;
1954 Ok(())
1955 }
1956 }
1957
1958 struct Out {
1959 outlet: Outlet<i32>,
1960 state: Arc<Mutex<BufferedFlowState>>,
1961 }
1962
1963 impl OutHandler for Out {
1964 fn on_pull(
1965 &mut self,
1966 logic: &mut GraphStageLogic,
1967 _outlet: AnyOutlet,
1968 ) -> StreamResult<()> {
1969 let (next, upstream_closed) = {
1970 let mut state = self.state.lock().unwrap();
1971 state.pull_calls += 1;
1972 (state.queued.pop_front(), state.upstream_closed)
1973 };
1974 if let Some(value) = next {
1975 logic.emit(&self.outlet, value)
1976 } else if upstream_closed {
1977 logic.complete(&self.outlet)
1978 } else {
1979 Ok(())
1980 }
1981 }
1982
1983 fn on_downstream_finish(
1984 &mut self,
1985 logic: &mut GraphStageLogic,
1986 _outlet: AnyOutlet,
1987 ) -> StreamResult<()> {
1988 self.state.lock().unwrap().finish_calls += 1;
1989 logic.complete_stage()
1990 }
1991 }
1992
1993 let mut logic = GraphStageLogic::new(shape);
1994 logic
1995 .set_handler(
1996 &shape.inlet(),
1997 Box::new(In {
1998 state: Arc::clone(&self.state),
1999 }),
2000 )
2001 .unwrap();
2002 logic
2003 .set_out_handler(
2004 &shape.outlet(),
2005 Box::new(Out {
2006 outlet: shape.outlet(),
2007 state: Arc::clone(&self.state),
2008 }),
2009 )
2010 .unwrap();
2011 logic
2012 }
2013 }
2014
2015 struct EmitMultipleThenFailOnPush;
2016
2017 impl GraphStage for EmitMultipleThenFailOnPush {
2018 type Shape = FlowShape<i32, i32>;
2019
2020 fn name(&self) -> &str {
2021 "EmitMultipleThenFailOnPush"
2022 }
2023
2024 fn allocate_shape(&self, _allocator: &mut PortAllocator) -> Self::Shape {
2025 let first_id = next_port_id_block(2);
2026 FlowShape::new(
2027 Inlet::with_id(first_id, "emit-fail.in"),
2028 Outlet::with_id(first_id.offset(1), "emit-fail.out"),
2029 )
2030 }
2031
2032 fn stage_spec(&self, shape: &Self::Shape) -> StageSpec {
2033 StageSpec::opaque(self.name(), shape.inlets(), shape.outlets())
2034 }
2035
2036 fn create_logic(&self, shape: &Self::Shape) -> GraphStageLogic {
2037 struct Handler {
2038 outlet: Outlet<i32>,
2039 }
2040
2041 impl InHandler for Handler {
2042 fn on_push(
2043 &mut self,
2044 logic: &mut GraphStageLogic,
2045 _inlet: AnyInlet,
2046 ) -> StreamResult<()> {
2047 logic.emit_multiple(&self.outlet, [1, 2])?;
2048 Err(StreamError::Failed("emit_multiple boom".into()))
2049 }
2050 }
2051
2052 let mut logic = GraphStageLogic::new(shape);
2053 logic
2054 .set_handler(
2055 &shape.inlet(),
2056 Box::new(Handler {
2057 outlet: shape.outlet(),
2058 }),
2059 )
2060 .unwrap();
2061 logic
2062 }
2063 }
2064
2065 struct ReadNThenFailOnFinish;
2066
2067 struct EmitMultipleOnPush;
2068
2069 impl GraphStage for EmitMultipleOnPush {
2070 type Shape = FlowShape<i32, i32>;
2071
2072 fn name(&self) -> &str {
2073 "EmitMultipleOnPush"
2074 }
2075
2076 fn allocate_shape(&self, _allocator: &mut PortAllocator) -> Self::Shape {
2077 let first_id = next_port_id_block(2);
2078 FlowShape::new(
2079 Inlet::with_id(first_id, "emit-multiple.in"),
2080 Outlet::with_id(first_id.offset(1), "emit-multiple.out"),
2081 )
2082 }
2083
2084 fn stage_spec(&self, shape: &Self::Shape) -> StageSpec {
2085 StageSpec::opaque(self.name(), shape.inlets(), shape.outlets())
2086 }
2087
2088 fn create_logic(&self, shape: &Self::Shape) -> GraphStageLogic {
2089 struct Handler {
2090 outlet: Outlet<i32>,
2091 }
2092
2093 impl InHandler for Handler {
2094 fn on_push(
2095 &mut self,
2096 logic: &mut GraphStageLogic,
2097 _inlet: AnyInlet,
2098 ) -> StreamResult<()> {
2099 logic.emit_multiple(&self.outlet, [1, 2])
2100 }
2101 }
2102
2103 let mut logic = GraphStageLogic::new(shape);
2104 logic
2105 .set_handler(
2106 &shape.inlet(),
2107 Box::new(Handler {
2108 outlet: shape.outlet(),
2109 }),
2110 )
2111 .unwrap();
2112 logic
2113 }
2114 }
2115
2116 impl GraphStage for ReadNThenFailOnFinish {
2117 type Shape = FlowShape<i32, i32>;
2118
2119 fn name(&self) -> &str {
2120 "ReadNThenFailOnFinish"
2121 }
2122
2123 fn allocate_shape(&self, _allocator: &mut PortAllocator) -> Self::Shape {
2124 let first_id = next_port_id_block(2);
2125 FlowShape::new(
2126 Inlet::with_id(first_id, "read-n.in"),
2127 Outlet::with_id(first_id.offset(1), "read-n.out"),
2128 )
2129 }
2130
2131 fn stage_spec(&self, shape: &Self::Shape) -> StageSpec {
2132 StageSpec::opaque(self.name(), shape.inlets(), shape.outlets())
2133 }
2134
2135 fn create_logic(&self, shape: &Self::Shape) -> GraphStageLogic {
2136 struct Handler {
2137 inlet: Inlet<i32>,
2138 armed: bool,
2139 }
2140
2141 impl InHandler for Handler {
2142 fn on_push(
2143 &mut self,
2144 logic: &mut GraphStageLogic,
2145 _inlet: AnyInlet,
2146 ) -> StreamResult<()> {
2147 if !self.armed {
2148 self.armed = true;
2149 logic.read_n(&self.inlet, 2, |_values| {}, |_values| {})
2150 } else {
2151 Ok(())
2152 }
2153 }
2154
2155 fn on_upstream_finish(
2156 &mut self,
2157 _logic: &mut GraphStageLogic,
2158 _inlet: AnyInlet,
2159 ) -> StreamResult<()> {
2160 Err(StreamError::Failed("read_n finish boom".into()))
2161 }
2162 }
2163
2164 let mut logic = GraphStageLogic::new(shape);
2165 logic
2166 .set_handler(
2167 &shape.inlet(),
2168 Box::new(Handler {
2169 inlet: shape.inlet(),
2170 armed: false,
2171 }),
2172 )
2173 .unwrap();
2174 logic
2175 }
2176 }
2177
2178 fn single_opaque_stage_graph<G>(stage: G) -> GraphBlueprint<FlowShape<i32, i32>>
2179 where
2180 G: GraphStage<Shape = FlowShape<i32, i32>>,
2181 {
2182 GraphDsl::create(|builder| builder.add(stage)).unwrap()
2183 }
2184
2185 #[test]
2186 fn process_push_restores_handler_before_emit_multiple_error_propagates() {
2187 let graph = single_opaque_stage_graph(EmitMultipleThenFailOnPush);
2188 let inlet = graph.shape.inlet().id();
2189 let mut executor = FusedExecutor::new(&graph, FusedExecutionConfig::default());
2190
2191 let result = executor.process_stage(0, inlet, datum(10));
2192
2193 assert!(matches!(
2194 result,
2195 Err(StreamError::Failed(message)) if message == "emit_multiple boom"
2196 ));
2197 assert!(
2198 executor.opaque_logics[0]
2199 .as_mut()
2200 .unwrap()
2201 .get_in_handler_mut(inlet)
2202 .is_some()
2203 );
2204 }
2205
2206 #[test]
2207 fn process_completion_restores_handler_before_read_n_finish_error_propagates() {
2208 let graph = single_opaque_stage_graph(ReadNThenFailOnFinish);
2209 let inlet = graph.shape.inlet().id();
2210 let mut executor = FusedExecutor::new(&graph, FusedExecutionConfig::default());
2211
2212 executor.process_stage(0, inlet, datum(1)).unwrap();
2213 executor.process_stage(0, inlet, datum(2)).unwrap();
2214 let result = executor.process_completion(0, inlet);
2215
2216 assert!(matches!(
2217 result,
2218 Err(StreamError::Failed(message)) if message == "read_n finish boom"
2219 ));
2220 assert!(
2221 executor.opaque_logics[0]
2222 .as_mut()
2223 .unwrap()
2224 .get_in_handler_mut(inlet)
2225 .is_some()
2226 );
2227 }
2228
2229 #[test]
2230 fn opaque_request_drives_out_handler_for_buffered_output() {
2231 let state = Arc::new(Mutex::new(BufferedFlowState::default()));
2232 let graph = single_opaque_stage_graph(BufferedFlowOnPull {
2233 state: Arc::clone(&state),
2234 });
2235 let inlet = graph.shape.inlet().id();
2236 let outlet = graph.shape.outlet().id();
2237 let mut executor = FusedExecutor::new(&graph, FusedExecutionConfig::default());
2238 let mut output = Vec::<i32>::new();
2239 let mut output_sink = VecOutputSink {
2240 output: &mut output,
2241 };
2242
2243 executor
2244 .deliver(inlet, datum(7_i32), outlet, &mut output_sink)
2245 .unwrap();
2246 assert!(output_sink.output.is_empty());
2247
2248 executor.request(outlet, outlet, &mut output_sink).unwrap();
2249
2250 assert_eq!(&*output_sink.output, &[7]);
2251 let state = state.lock().unwrap();
2252 assert_eq!(state.pull_calls, 2);
2253 assert_eq!(state.finish_calls, 0);
2254 }
2255
2256 #[test]
2257 fn opaque_downstream_finish_before_first_demand_invokes_out_handler() {
2258 let state = Arc::new(Mutex::new(BufferedFlowState::default()));
2259 let graph = single_opaque_stage_graph(BufferedFlowOnPull {
2260 state: Arc::clone(&state),
2261 });
2262 let outlet = graph.shape.outlet().id();
2263 let mut executor = FusedExecutor::new(&graph, FusedExecutionConfig::default());
2264 let mut output = Vec::<i32>::new();
2265 let mut output_sink = VecOutputSink {
2266 output: &mut output,
2267 };
2268
2269 executor
2270 .downstream_finish(outlet, outlet, &mut output_sink)
2271 .unwrap();
2272 executor.request(outlet, outlet, &mut output_sink).unwrap();
2273
2274 assert!(output_sink.output.is_empty());
2275 let state = state.lock().unwrap();
2276 assert_eq!(state.pull_calls, 0);
2277 assert_eq!(state.finish_calls, 1);
2278 }
2279
2280 #[test]
2281 fn opaque_downstream_finish_drops_buffered_output_after_upstream_complete() {
2282 let state = Arc::new(Mutex::new(BufferedFlowState::default()));
2283 let graph = single_opaque_stage_graph(BufferedFlowOnPull {
2284 state: Arc::clone(&state),
2285 });
2286 let inlet = graph.shape.inlet().id();
2287 let outlet = graph.shape.outlet().id();
2288 let mut executor = FusedExecutor::new(&graph, FusedExecutionConfig::default());
2289 let mut output = Vec::<i32>::new();
2290 let mut output_sink = VecOutputSink {
2291 output: &mut output,
2292 };
2293
2294 executor
2295 .deliver(inlet, datum(11_i32), outlet, &mut output_sink)
2296 .unwrap();
2297 executor.complete(inlet, outlet, &mut output_sink).unwrap();
2298 executor
2299 .downstream_finish(outlet, outlet, &mut output_sink)
2300 .unwrap();
2301 executor.request(outlet, outlet, &mut output_sink).unwrap();
2302
2303 assert!(output_sink.output.is_empty());
2304 let state = state.lock().unwrap();
2305 assert_eq!(state.finish_calls, 1);
2306 }
2307
2308 #[test]
2309 fn broadcast_cancels_upstream_only_after_all_outlets_cancel() {
2310 let graph = GraphDsl::try_create(|builder| {
2311 let broadcast = builder.add(Broadcast::<i32>::new(2));
2312 let merge = builder.add(Merge::<i32>::new(2));
2313 builder.connect(broadcast.outlet(0)?, merge.inlet(0)?)?;
2314 builder.connect(broadcast.outlet(1)?, merge.inlet(1)?)?;
2315 Ok(FlowShape::new(broadcast.inlet(), merge.outlet()))
2316 })
2317 .unwrap();
2318
2319 let mut executor = FusedExecutor::new(&graph, FusedExecutionConfig::default());
2320 let broadcast_index = *executor
2321 .stage_by_inlet
2322 .get(&graph.shape.inlet().id())
2323 .unwrap();
2324 let first = graph.stages[broadcast_index].spec.outlets[0].id();
2325 let second = graph.stages[broadcast_index].spec.outlets[1].id();
2326
2327 let first_transition = executor
2328 .process_downstream_finish(broadcast_index, first)
2329 .unwrap();
2330 assert!(first_transition.cancelled_inlets.is_empty());
2331
2332 let second_transition = executor
2333 .process_downstream_finish(broadcast_index, second)
2334 .unwrap();
2335 assert_eq!(
2336 second_transition.cancelled_inlets,
2337 vec![graph.stages[broadcast_index].spec.inlets[0].id()]
2338 );
2339 }
2340
2341 #[test]
2342 fn downstream_finish_propagates_through_merge_and_broadcast() {
2343 let graph = GraphDsl::try_create(|builder| {
2344 let broadcast = builder.add(Broadcast::<i32>::new(2));
2345 let merge = builder.add(Merge::<i32>::new(2));
2346 builder.connect(broadcast.outlet(0)?, merge.inlet(0)?)?;
2347 builder.connect(broadcast.outlet(1)?, merge.inlet(1)?)?;
2348 Ok(FlowShape::new(broadcast.inlet(), merge.outlet()))
2349 })
2350 .unwrap();
2351
2352 let outlet = graph.shape.outlet().id();
2353 let mut executor = FusedExecutor::new(&graph, FusedExecutionConfig::default());
2354 let mut output = Vec::<i32>::new();
2355 let mut output_sink = VecOutputSink {
2356 output: &mut output,
2357 };
2358
2359 executor
2360 .downstream_finish(outlet, outlet, &mut output_sink)
2361 .unwrap();
2362
2363 let broadcast_index = *executor
2364 .stage_by_inlet
2365 .get(&graph.shape.inlet().id())
2366 .unwrap();
2367 let StageState::Broadcast {
2368 live_outlets,
2369 cancelled_outlets,
2370 ..
2371 } = &executor.stage_states[broadcast_index]
2372 else {
2373 panic!("expected broadcast state");
2374 };
2375 assert_eq!(*live_outlets, 0);
2376 assert_eq!(cancelled_outlets, &vec![true, true]);
2377 }
2378
2379 #[test]
2380 fn partition_holds_routed_element_until_target_outlet_pulls() {
2381 let graph = GraphDsl::create(|builder| {
2382 builder.add(Partition::<i32>::new(2, |value| usize::from(*value >= 10)))
2383 })
2384 .unwrap();
2385
2386 let stage_index = 0usize;
2387 let inlet = graph.shape.inlet().id();
2388 let out0 = graph.shape.outlet(0).unwrap().id();
2389 let out1 = graph.shape.outlet(1).unwrap().id();
2390 let mut executor = FusedExecutor::new(&graph, FusedExecutionConfig::default());
2391
2392 executor.process_pull(stage_index, out0).unwrap();
2393 let transition = executor
2394 .process_stage(stage_index, inlet, datum(11_i32))
2395 .unwrap();
2396 assert!(matches!(transition.emissions, StageEmissions::None));
2397
2398 let pull_transition = executor.process_pull(stage_index, out1).unwrap();
2399 match pull_transition.emissions {
2400 StageEmissions::One(port, value) => {
2401 assert_eq!(port, out1);
2402 assert_eq!(
2403 downcast_datum::<i32, _>(value, "emit", || "Partition.out1").unwrap(),
2404 11
2405 );
2406 }
2407 _ => panic!("expected one pending partition emission"),
2408 }
2409 }
2410
2411 #[test]
2412 fn partition_cancels_upstream_only_after_all_outlets_cancel_when_not_eager() {
2413 let graph = GraphDsl::create(|builder| {
2414 builder.add(Partition::<i32>::new(2, |value| usize::from(*value >= 10)))
2415 })
2416 .unwrap();
2417
2418 let stage_index = 0usize;
2419 let out0 = graph.shape.outlet(0).unwrap().id();
2420 let out1 = graph.shape.outlet(1).unwrap().id();
2421 let mut executor = FusedExecutor::new(&graph, FusedExecutionConfig::default());
2422
2423 let first = executor
2424 .process_downstream_finish(stage_index, out0)
2425 .unwrap();
2426 assert!(first.cancelled_inlets.is_empty());
2427
2428 let second = executor
2429 .process_downstream_finish(stage_index, out1)
2430 .unwrap();
2431 assert_eq!(second.cancelled_inlets, vec![graph.shape.inlet().id()]);
2432 }
2433
2434 #[test]
2435 fn unzip_continues_emitting_to_live_outlet_after_peer_cancels() {
2436 let graph =
2437 GraphDsl::create(|builder| builder.add(Unzip::<i32, &'static str>::new())).unwrap();
2438
2439 let stage_index = 0usize;
2440 let inlet = graph.shape.inlet().id();
2441 let out0 = graph.shape.out0().id();
2442 let out1 = graph.shape.out1().id();
2443 let mut executor = FusedExecutor::new(&graph, FusedExecutionConfig::default());
2444
2445 executor.process_pull(stage_index, out0).unwrap();
2446 executor.process_pull(stage_index, out1).unwrap();
2447 let cancel = executor
2448 .process_downstream_finish(stage_index, out1)
2449 .unwrap();
2450 assert!(cancel.cancelled_inlets.is_empty());
2451
2452 let transition = executor
2453 .process_stage(stage_index, inlet, datum((7_i32, "seven")))
2454 .unwrap();
2455 match transition.emissions {
2456 StageEmissions::One(port, value) => {
2457 assert_eq!(port, out0);
2458 assert_eq!(
2459 downcast_datum::<i32, _>(value, "emit", || "Unzip.out0").unwrap(),
2460 7
2461 );
2462 }
2463 StageEmissions::Many(values) => {
2464 assert_eq!(values.len(), 1);
2465 assert_eq!(values[0].0, out0);
2466 }
2467 _ => panic!("expected emission to the remaining live unzip outlet"),
2468 }
2469 }
2470
2471 #[test]
2472 fn unzip_cancels_upstream_only_after_both_outlets_cancel() {
2473 let graph =
2474 GraphDsl::create(|builder| builder.add(Unzip::<i32, &'static str>::new())).unwrap();
2475
2476 let stage_index = 0usize;
2477 let out0 = graph.shape.out0().id();
2478 let out1 = graph.shape.out1().id();
2479 let mut executor = FusedExecutor::new(&graph, FusedExecutionConfig::default());
2480
2481 let first = executor
2482 .process_downstream_finish(stage_index, out0)
2483 .unwrap();
2484 assert!(first.cancelled_inlets.is_empty());
2485
2486 let second = executor
2487 .process_downstream_finish(stage_index, out1)
2488 .unwrap();
2489 assert_eq!(second.cancelled_inlets, vec![graph.shape.inlet().id()]);
2490 }
2491
2492 #[test]
2493 fn opaque_internal_outlet_repulls_after_first_emission() {
2494 let graph = GraphDsl::try_create(|builder| {
2495 let opaque = builder.add(EmitMultipleOnPush);
2496 let identity = builder.add(Identity::<i32>::new());
2497 builder.connect(opaque.outlet(), identity.inlet())?;
2498 Ok(FlowShape::new(opaque.inlet(), identity.outlet()))
2499 })
2500 .unwrap();
2501
2502 assert_eq!(graph.run_with_input([10]).unwrap(), vec![1, 2]);
2503 }
2504
2505 #[test]
2518 fn executor_mode_auto_erased_identical_typed_errors() {
2519 let graph = GraphDsl::try_create(|builder| {
2520 let broadcast = builder.add(Broadcast::<i32>::new(2));
2521 let merge = builder.add(Merge::<i32>::new(2));
2522 builder.connect(broadcast.outlet(0)?, merge.inlet(0)?)?;
2523 builder.connect(broadcast.outlet(1)?, merge.inlet(1)?)?;
2524 Ok(FlowShape::new(broadcast.inlet(), merge.outlet()))
2525 })
2526 .unwrap();
2527
2528 let input = vec![1, 2, 3];
2529
2530 let auto_result = graph
2531 .run_with_input_mode(input.clone(), ExecutorMode::Auto)
2532 .unwrap();
2533 let erased_result = graph
2534 .run_with_input_mode(input.clone(), ExecutorMode::ErasedOnly)
2535 .unwrap();
2536
2537 assert_eq!(auto_result, erased_result);
2540 assert_eq!(auto_result.len(), input.len() * 2);
2542
2543 let typed_result = graph.run_with_input_mode(input.clone(), ExecutorMode::TypedOnly);
2545 assert!(
2546 matches!(
2547 typed_result,
2548 Err(StreamError::GraphValidation(ref msg))
2549 if msg.contains("typed executor does not support this graph shape")
2550 ),
2551 "expected TypedOnly to error for junction graph, got: {typed_result:?}"
2552 );
2553 }
2554
2555 fn identity_chain_bp(n: usize) -> GraphBlueprint<FlowShape<i64, i64>> {
2559 assert!(n >= 1);
2560 GraphDsl::try_create(|builder| {
2561 let first = builder.add(Identity::<i64>::new());
2562 let inlet = first.inlet();
2563 let mut outlet = first.outlet();
2564 for _ in 1..n {
2565 let next = builder.add(Identity::<i64>::new());
2566 builder.connect(outlet, next.inlet())?;
2567 outlet = next.outlet();
2568 }
2569 Ok(FlowShape::new(inlet, outlet))
2570 })
2571 .unwrap()
2572 }
2573
2574 fn map_chain_bp(n: usize) -> GraphBlueprint<FlowShape<i64, i64>> {
2576 assert!(n >= 1);
2577 GraphDsl::try_create(|builder| {
2578 let first = builder.add(MapStage::new(|x: i64| x.wrapping_mul(2)));
2579 let inlet = first.inlet();
2580 let mut outlet = first.outlet();
2581 for _ in 1..n {
2582 let next = builder.add(MapStage::new(|x: i64| x.wrapping_mul(2)));
2583 builder.connect(outlet, next.inlet())?;
2584 outlet = next.outlet();
2585 }
2586 Ok(FlowShape::new(inlet, outlet))
2587 })
2588 .unwrap()
2589 }
2590
2591 #[test]
2594 fn typed_erased_equivalence_identity_collect() {
2595 let graph = identity_chain_bp(5);
2596 let input: Vec<i64> = (0..20).collect();
2597
2598 let erased = graph
2599 .run_with_input_mode(input.clone(), ExecutorMode::ErasedOnly)
2600 .unwrap();
2601 let typed = graph
2602 .run_with_input_mode(input.clone(), ExecutorMode::TypedOnly)
2603 .unwrap();
2604
2605 assert_eq!(
2606 typed, erased,
2607 "typed and erased paths disagree on identity×5"
2608 );
2609 }
2610
2611 #[test]
2614 fn typed_erased_equivalence_identity_count() {
2615 let graph = identity_chain_bp(5);
2616 let input: Vec<i64> = (0..20).collect();
2617 let config = FusedExecutionConfig::default();
2618
2619 let erased = graph
2620 .run_count_with_input_report_mode(input.clone(), config, ExecutorMode::ErasedOnly)
2621 .unwrap()
2622 .result;
2623 let typed = graph
2624 .run_count_with_input_report_mode(input.clone(), config, ExecutorMode::TypedOnly)
2625 .unwrap()
2626 .result;
2627
2628 assert_eq!(typed, erased, "typed and erased count differ on identity×5");
2629 }
2630
2631 #[test]
2634 fn typed_erased_equivalence_map_fold() {
2635 let graph = map_chain_bp(5);
2636 let input: Vec<i64> = (1..=10).collect();
2637 let config = FusedExecutionConfig::default();
2638
2639 let erased = graph
2640 .run_fold_with_input_report_mode(
2641 input.clone(),
2642 0i64,
2643 |acc, x| acc.wrapping_add(x),
2644 config,
2645 ExecutorMode::ErasedOnly,
2646 )
2647 .unwrap()
2648 .result;
2649 let typed = graph
2650 .run_fold_with_input_report_mode(
2651 input.clone(),
2652 0i64,
2653 |acc, x| acc.wrapping_add(x),
2654 config,
2655 ExecutorMode::TypedOnly,
2656 )
2657 .unwrap()
2658 .result;
2659
2660 assert_eq!(typed, erased, "typed and erased fold differ on map×5");
2661 }
2662
2663 #[test]
2666 fn typed_erased_equivalence_map_collect() {
2667 let graph = map_chain_bp(5);
2668 let input: Vec<i64> = (0..20).collect();
2669
2670 let erased = graph
2671 .run_with_input_mode(input.clone(), ExecutorMode::ErasedOnly)
2672 .unwrap();
2673 let typed = graph
2674 .run_with_input_mode(input.clone(), ExecutorMode::TypedOnly)
2675 .unwrap();
2676
2677 assert_eq!(typed, erased, "typed and erased paths disagree on map×5");
2678 }
2679
2680 #[test]
2683 fn typed_only_errors_on_junction_graph() {
2684 let graph = GraphDsl::try_create(|builder| {
2685 let broadcast = builder.add(Broadcast::<i32>::new(2));
2686 let merge = builder.add(Merge::<i32>::new(2));
2687 builder.connect(broadcast.outlet(0)?, merge.inlet(0)?)?;
2688 builder.connect(broadcast.outlet(1)?, merge.inlet(1)?)?;
2689 Ok(FlowShape::new(broadcast.inlet(), merge.outlet()))
2690 })
2691 .unwrap();
2692
2693 let result = graph.run_with_input_mode(vec![1], ExecutorMode::TypedOnly);
2694 assert!(
2695 matches!(
2696 result,
2697 Err(StreamError::GraphValidation(ref msg))
2698 if msg.contains("typed executor does not support this graph shape")
2699 ),
2700 "expected TypedOnly to error on junction, got: {result:?}"
2701 );
2702 }
2703
2704 #[test]
2706 fn auto_falls_back_silently_for_junction_graph() {
2707 let graph = GraphDsl::try_create(|builder| {
2708 let broadcast = builder.add(Broadcast::<i32>::new(2));
2709 let merge = builder.add(Merge::<i32>::new(2));
2710 builder.connect(broadcast.outlet(0)?, merge.inlet(0)?)?;
2711 builder.connect(broadcast.outlet(1)?, merge.inlet(1)?)?;
2712 Ok(FlowShape::new(broadcast.inlet(), merge.outlet()))
2713 })
2714 .unwrap();
2715
2716 let result = graph.run_with_input_mode(vec![1, 2, 3], ExecutorMode::Auto);
2717 assert!(result.is_ok(), "Auto should succeed (fallback to erased)");
2718 assert_eq!(result.unwrap().len(), 6); }
2720
2721 fn merge_sequence_graph() -> GraphBlueprint<FlowShape<(u64, u64), u64>> {
2725 GraphDsl::try_create(|builder| {
2726 let unzip = builder.add(Unzip::<u64, u64>::new());
2727 let merge = builder.add(MergeSequence::<u64>::new(2, |item| *item));
2728 builder.connect(unzip.out0(), merge.inlet(0)?)?;
2729 builder.connect(unzip.out1(), merge.inlet(1)?)?;
2730 Ok(FlowShape::new(unzip.inlet(), merge.outlet()))
2731 })
2732 .unwrap()
2733 }
2734
2735 #[test]
2738 fn typed_only_accepts_merge_sequence_topology() {
2739 let graph = merge_sequence_graph();
2740 let result =
2741 graph.run_with_input_mode(vec![(0u64, 1u64), (2u64, 3u64)], ExecutorMode::TypedOnly);
2742 assert!(
2743 result.is_ok(),
2744 "TypedOnly should accept Unzip→MergeSequence topology, got: {result:?}"
2745 );
2746 }
2747
2748 #[test]
2751 fn typed_erased_equivalence_merge_sequence_in_order() {
2752 let graph = merge_sequence_graph();
2753 let input: Vec<(u64, u64)> = (0..10).step_by(2).map(|i| (i, i + 1)).collect();
2754
2755 let erased = graph
2756 .run_with_input_mode(input.clone(), ExecutorMode::ErasedOnly)
2757 .unwrap();
2758 let typed = graph
2759 .run_with_input_mode(input.clone(), ExecutorMode::TypedOnly)
2760 .unwrap();
2761
2762 assert_eq!(
2763 typed, erased,
2764 "typed and erased disagree on in-order merge_sequence"
2765 );
2766 let expected: Vec<u64> = (0..10).collect();
2768 assert_eq!(typed, expected);
2769 }
2770
2771 #[test]
2775 fn typed_erased_equivalence_merge_sequence_out_of_order() {
2776 let graph = merge_sequence_graph();
2777 let input: Vec<(u64, u64)> = vec![(1, 0), (3, 2), (5, 4)];
2779
2780 let erased = graph
2781 .run_with_input_mode(input.clone(), ExecutorMode::ErasedOnly)
2782 .unwrap();
2783 let typed = graph
2784 .run_with_input_mode(input.clone(), ExecutorMode::TypedOnly)
2785 .unwrap();
2786
2787 assert_eq!(
2788 typed, erased,
2789 "typed and erased disagree on out-of-order merge_sequence"
2790 );
2791 assert_eq!(typed, vec![0u64, 1, 2, 3, 4, 5]);
2793 }
2794
2795 #[test]
2800 fn typed_erased_equivalence_merge_sequence_gap_failure() {
2801 let graph = merge_sequence_graph();
2804 let input = vec![(1u64, 2u64)];
2805
2806 let erased = graph.run_with_input_mode(input.clone(), ExecutorMode::ErasedOnly);
2807 let typed = graph.run_with_input_mode(input.clone(), ExecutorMode::TypedOnly);
2808
2809 assert!(
2810 matches!(&erased, Err(StreamError::Failed(msg)) if msg.contains("expected sequence")),
2811 "ErasedOnly should fail on gap: {erased:?}"
2812 );
2813 assert!(
2814 matches!(&typed, Err(StreamError::Failed(msg)) if msg.contains("expected sequence")),
2815 "TypedOnly should fail on gap: {typed:?}"
2816 );
2817 }
2818
2819 #[test]
2822 fn typed_erased_equivalence_merge_sequence_completion() {
2823 let graph = merge_sequence_graph();
2824 let input = vec![(0u64, 1u64)];
2825
2826 let erased = graph
2827 .run_with_input_mode(input.clone(), ExecutorMode::ErasedOnly)
2828 .unwrap();
2829 let typed = graph
2830 .run_with_input_mode(input.clone(), ExecutorMode::TypedOnly)
2831 .unwrap();
2832
2833 assert_eq!(typed, erased, "typed and erased disagree on completion");
2834 assert_eq!(typed, vec![0u64, 1u64]);
2835 }
2836
2837 #[test]
2840 fn auto_selects_typed_for_merge_sequence_topology() {
2841 let graph = merge_sequence_graph();
2842 let input: Vec<(u64, u64)> = (0..20).step_by(2).map(|i| (i, i + 1)).collect();
2843
2844 let auto_result = graph
2845 .run_with_input_mode(input.clone(), ExecutorMode::Auto)
2846 .unwrap();
2847 let erased_result = graph
2848 .run_with_input_mode(input.clone(), ExecutorMode::ErasedOnly)
2849 .unwrap();
2850
2851 assert_eq!(
2852 auto_result, erased_result,
2853 "Auto and ErasedOnly disagree on merge_sequence topology"
2854 );
2855 }
2856
2857 fn merge_latest_graph_exec() -> GraphBlueprint<FlowShape<(u64, u64), Vec<u64>>> {
2861 GraphDsl::try_create(|builder| {
2862 let unzip = builder.add(Unzip::<u64, u64>::new());
2863 let merge = builder.add(MergeLatest::<u64>::new(2, false));
2864 builder.connect(unzip.out0(), merge.inlet(0)?)?;
2865 builder.connect(unzip.out1(), merge.inlet(1)?)?;
2866 Ok(FlowShape::new(unzip.inlet(), merge.outlet()))
2867 })
2868 .unwrap()
2869 }
2870
2871 fn merge_latest_eager_graph_exec() -> GraphBlueprint<FlowShape<(i32, i32), Vec<i32>>> {
2873 GraphDsl::try_create(|builder| {
2874 let unzip = builder.add(Unzip::<i32, i32>::new());
2875 let merge = builder.add(MergeLatest::<i32>::new(2, true));
2876 builder.connect(unzip.out0(), merge.inlet(0)?)?;
2877 builder.connect(unzip.out1(), merge.inlet(1)?)?;
2878 Ok(FlowShape::new(unzip.inlet(), merge.outlet()))
2879 })
2880 .unwrap()
2881 }
2882
2883 #[test]
2885 fn typed_only_accepts_merge_latest_topology() {
2886 let graph = merge_latest_graph_exec();
2887 let result =
2888 graph.run_with_input_mode(vec![(0u64, 1u64), (2u64, 3u64)], ExecutorMode::TypedOnly);
2889 assert!(
2890 result.is_ok(),
2891 "TypedOnly should accept Unzip→MergeLatest topology, got: {result:?}"
2892 );
2893 }
2894
2895 #[test]
2897 fn typed_erased_equivalence_merge_latest_snapshot_ordering() {
2898 let graph = merge_latest_graph_exec();
2899 let input: Vec<(u64, u64)> = (0..10).map(|i| (i, i + 100)).collect();
2902
2903 let erased = graph
2904 .run_with_input_mode(input.clone(), ExecutorMode::ErasedOnly)
2905 .unwrap();
2906 let typed = graph
2907 .run_with_input_mode(input.clone(), ExecutorMode::TypedOnly)
2908 .unwrap();
2909
2910 assert_eq!(
2911 typed, erased,
2912 "typed and erased disagree on snapshot ordering"
2913 );
2914 assert!(
2916 typed.iter().all(|s| s.len() == 2),
2917 "snapshots must have len 2"
2918 );
2919 }
2920
2921 #[test]
2923 fn typed_erased_equivalence_merge_latest_partial_fill() {
2924 let graph = merge_latest_graph_exec();
2927 let input = vec![(5u64, 42u64)];
2928
2929 let erased = graph
2930 .run_with_input_mode(input.clone(), ExecutorMode::ErasedOnly)
2931 .unwrap();
2932 let typed = graph
2933 .run_with_input_mode(input.clone(), ExecutorMode::TypedOnly)
2934 .unwrap();
2935
2936 assert_eq!(typed, erased, "typed and erased disagree on partial-fill");
2937 assert_eq!(typed.len(), 1, "expected exactly one snapshot");
2939 }
2940
2941 #[test]
2947 fn typed_erased_equivalence_merge_latest_eager_complete() {
2948 let graph_eager = merge_latest_eager_graph_exec();
2949 let input = vec![(1i32, 10i32)];
2950
2951 let erased = graph_eager
2952 .run_with_input_mode(input.clone(), ExecutorMode::ErasedOnly)
2953 .unwrap();
2954 let typed = graph_eager
2955 .run_with_input_mode(input.clone(), ExecutorMode::TypedOnly)
2956 .unwrap();
2957
2958 assert_eq!(
2959 typed, erased,
2960 "typed and erased disagree on eager-complete behavior"
2961 );
2962 assert!(
2963 !typed.is_empty(),
2964 "eager-complete graph should produce at least one snapshot"
2965 );
2966 }
2967
2968 #[test]
2970 fn typed_erased_equivalence_merge_latest_completion() {
2971 let graph = merge_latest_graph_exec();
2972 let input = vec![(0u64, 1u64)];
2973
2974 let erased = graph
2975 .run_with_input_mode(input.clone(), ExecutorMode::ErasedOnly)
2976 .unwrap();
2977 let typed = graph
2978 .run_with_input_mode(input.clone(), ExecutorMode::TypedOnly)
2979 .unwrap();
2980
2981 assert_eq!(typed, erased, "typed and erased disagree on completion");
2982 }
2983
2984 #[test]
2987 fn auto_selects_typed_for_merge_latest_topology() {
2988 let graph = merge_latest_graph_exec();
2989 let input: Vec<(u64, u64)> = (0..20).map(|i| (i, i + 1_000)).collect();
2990
2991 let auto_result = graph
2992 .run_with_input_mode(input.clone(), ExecutorMode::Auto)
2993 .unwrap();
2994 let erased_result = graph
2995 .run_with_input_mode(input.clone(), ExecutorMode::ErasedOnly)
2996 .unwrap();
2997
2998 assert_eq!(
2999 auto_result, erased_result,
3000 "Auto and ErasedOnly disagree on merge_latest topology"
3001 );
3002 }
3003
3004 #[test]
3014 fn merge_latest_blueprint_sequential_reuse_is_independent() {
3015 let graph = merge_latest_graph_exec();
3016 let input_a: Vec<(u64, u64)> = (0..5).map(|i| (i, i + 100)).collect();
3017 let input_b: Vec<(u64, u64)> = (10..15).map(|i| (i, i + 200)).collect();
3018
3019 let result_a_typed = graph
3021 .run_with_input_mode(input_a.clone(), ExecutorMode::TypedOnly)
3022 .unwrap();
3023 let result_b_typed = graph
3024 .run_with_input_mode(input_b.clone(), ExecutorMode::TypedOnly)
3025 .unwrap();
3026
3027 let result_a_erased = graph
3029 .run_with_input_mode(input_a, ExecutorMode::ErasedOnly)
3030 .unwrap();
3031 let result_b_erased = graph
3032 .run_with_input_mode(input_b, ExecutorMode::ErasedOnly)
3033 .unwrap();
3034
3035 assert_eq!(
3036 result_a_typed, result_a_erased,
3037 "sequential run A: typed and erased disagree"
3038 );
3039 assert_eq!(
3040 result_b_typed, result_b_erased,
3041 "sequential run B: typed and erased disagree"
3042 );
3043 assert_ne!(
3045 result_a_typed, result_b_typed,
3046 "runs A and B should differ (different inputs)"
3047 );
3048 }
3049
3050 #[test]
3056 fn merge_latest_blueprint_concurrent_reuse_is_independent() {
3057 use std::sync::Arc as StdArc;
3058
3059 let graph = StdArc::new(merge_latest_graph_exec());
3061
3062 let input_a: Vec<(u64, u64)> = (0..50).map(|i| (i, i + 1_000)).collect();
3063 let input_b: Vec<(u64, u64)> = (100..150).map(|i| (i, i + 2_000)).collect();
3064
3065 let graph_a = StdArc::clone(&graph);
3066 let graph_b = StdArc::clone(&graph);
3067 let ia = input_a.clone();
3068 let ib = input_b.clone();
3069
3070 let handle_a =
3071 std::thread::spawn(move || graph_a.run_with_input_mode(ia, ExecutorMode::TypedOnly));
3072 let handle_b =
3073 std::thread::spawn(move || graph_b.run_with_input_mode(ib, ExecutorMode::TypedOnly));
3074
3075 let result_a = handle_a.join().expect("thread A panicked").unwrap();
3076 let result_b = handle_b.join().expect("thread B panicked").unwrap();
3077
3078 let ref_a = graph
3080 .run_with_input_mode(input_a, ExecutorMode::ErasedOnly)
3081 .unwrap();
3082 let ref_b = graph
3083 .run_with_input_mode(input_b, ExecutorMode::ErasedOnly)
3084 .unwrap();
3085
3086 assert_eq!(
3087 result_a, ref_a,
3088 "concurrent run A: typed and erased disagree"
3089 );
3090 assert_eq!(
3091 result_b, ref_b,
3092 "concurrent run B: typed and erased disagree"
3093 );
3094 assert_ne!(result_a, result_b, "concurrent runs must be independent");
3096 }
3097}
3098
3099enum AsyncLinearMessage<T> {
3100 Item(T),
3101 Done,
3102 Failed(StreamError),
3103}
3104
3105enum RactorBoundaryCommand {
3106 Run,
3107}
3108
3109#[cfg(feature = "cluster")]
3110impl ractor::Message for RactorBoundaryCommand {}
3111
3112#[cfg(test)]
3113fn run_threaded_async_linear_count<I, T>(
3114 input: I,
3115 segments: TypedLinearSegments<T>,
3116 config: AsyncBoundaryExecutionConfig,
3117) -> StreamResult<FusedTerminalReport<usize>>
3118where
3119 I: IntoIterator<Item = T> + Send,
3120 I::IntoIter: Send,
3121 T: Send + 'static,
3122{
3123 let channels = segments.segments.len() + 1;
3124 let mut senders = Vec::with_capacity(channels);
3125 let mut receivers = Vec::with_capacity(channels);
3126 for _ in 0..channels {
3127 let (sender, receiver) = mpsc::sync_channel(config.buffer_size);
3128 senders.push(sender);
3129 receivers.push(Some(receiver));
3130 }
3131
3132 let first_sender = senders
3133 .first()
3134 .expect("at least one async segment channel")
3135 .clone();
3136 let mut final_receiver = Some(
3137 receivers
3138 .last_mut()
3139 .expect("at least one async segment channel")
3140 .take()
3141 .expect("final receiver is present"),
3142 );
3143 let events = AtomicUsize::new(0);
3144 let async_boundary_crossings = AtomicUsize::new(0);
3145
3146 let result = thread::scope(|scope| {
3147 let source = scope.spawn(move || feed_threaded_async_linear_input(input, first_sender));
3148 let mut workers = Vec::with_capacity(segments.segments.len());
3149
3150 for (index, steps) in segments.segments.iter().enumerate() {
3151 let input = receivers[index].take().expect("worker receiver is present");
3152 let output = senders[index + 1].clone();
3153 let has_boundary_after = index + 1 < segments.segments.len();
3154 let events = &events;
3155 let async_boundary_crossings = &async_boundary_crossings;
3156 workers.push(scope.spawn(move || {
3157 run_threaded_async_linear_segment(
3158 steps,
3159 input,
3160 output,
3161 has_boundary_after,
3162 events,
3163 async_boundary_crossings,
3164 config,
3165 )
3166 }));
3167 }
3168 drop(senders);
3169
3170 let final_rx = final_receiver.take().expect("final receiver present");
3171 let mut count = 0;
3172 let mut terminal_error = None;
3173 loop {
3174 match final_rx.recv() {
3175 Ok(AsyncLinearMessage::Item(_)) => count += 1,
3176 Ok(AsyncLinearMessage::Done) => break,
3177 Ok(AsyncLinearMessage::Failed(error)) => {
3178 terminal_error = Some(error);
3179 break;
3180 }
3181 Err(_) => {
3182 terminal_error = Some(StreamError::AbruptTermination);
3183 break;
3184 }
3185 }
3186 }
3187 drop(final_rx);
3188
3189 let mut worker_error = join_threaded_async_linear_worker(source)?;
3190 for worker in workers {
3191 if worker_error.is_none() {
3192 worker_error = join_threaded_async_linear_worker(worker)?;
3193 } else {
3194 let _ = join_threaded_async_linear_worker(worker);
3195 }
3196 }
3197
3198 match (terminal_error, worker_error) {
3199 (Some(error), _) if error != StreamError::AbruptTermination => return Err(error),
3200 (_, Some(error)) => return Err(error),
3201 (Some(error), None) => return Err(error),
3202 (None, None) => {}
3203 }
3204
3205 Ok(count)
3206 });
3207
3208 Ok(FusedTerminalReport {
3209 result: result?,
3210 events: events.load(Ordering::Relaxed),
3211 async_boundary_crossings: async_boundary_crossings.load(Ordering::Relaxed),
3212 })
3213}
3214
3215fn run_ractor_async_linear_count<I, T>(
3216 input: I,
3217 segments: TypedLinearSegments<T>,
3218 config: AsyncBoundaryExecutionConfig,
3219) -> StreamResult<FusedTerminalReport<usize>>
3220where
3221 I: IntoIterator<Item = T> + Send,
3222 I::IntoIter: Send + 'static,
3223 T: Send + 'static,
3224{
3225 if config.buffer_size == 0 {
3226 return Err(StreamError::GraphValidation(
3227 "ractor async boundary execution requires buffer_size greater than zero".into(),
3228 ));
3229 }
3230
3231 let input = input.into_iter();
3232 let runtime = ractor_boundary_runtime()?;
3233 if tokio::runtime::Handle::try_current().is_ok() {
3234 thread::scope(|scope| {
3235 let handle = scope.spawn(move || {
3236 runtime.block_on(run_ractor_async_linear_count_on_runtime(
3237 input, segments, config,
3238 ))
3239 });
3240 handle.join().map_err(|_| {
3241 StreamError::Failed("ractor async boundary runtime thread panicked".into())
3242 })?
3243 })
3244 } else {
3245 runtime.block_on(run_ractor_async_linear_count_on_runtime(
3246 input, segments, config,
3247 ))
3248 }
3249}
3250
3251fn ractor_boundary_runtime() -> StreamResult<&'static tokio::runtime::Runtime> {
3252 static RUNTIME: OnceLock<Result<tokio::runtime::Runtime, String>> = OnceLock::new();
3253
3254 match RUNTIME.get_or_init(|| {
3255 tokio::runtime::Builder::new_multi_thread()
3256 .build()
3257 .map_err(|error| format!("ractor async boundary runtime failed to start: {error}"))
3258 }) {
3259 Ok(runtime) => Ok(runtime),
3260 Err(error) => Err(StreamError::Failed(error.clone())),
3261 }
3262}
3263
3264async fn run_ractor_async_linear_count_on_runtime<I, T>(
3265 input: I,
3266 segments: TypedLinearSegments<T>,
3267 config: AsyncBoundaryExecutionConfig,
3268) -> StreamResult<FusedTerminalReport<usize>>
3269where
3270 I: Iterator<Item = T> + Send + 'static,
3271 T: Send + 'static,
3272{
3273 let channels = segments.segments.len() + 1;
3274 let mut senders = Vec::with_capacity(channels);
3275 let mut receivers = Vec::with_capacity(channels);
3276 for _ in 0..channels {
3277 let (sender, receiver) = ractor::concurrency::mpsc_bounded(config.buffer_size);
3278 senders.push(sender);
3279 receivers.push(Some(receiver));
3280 }
3281
3282 let first_sender = senders
3283 .first()
3284 .expect("at least one async segment channel")
3285 .clone();
3286 let mut final_receiver = receivers
3287 .last_mut()
3288 .expect("at least one async segment channel")
3289 .take()
3290 .expect("final receiver is present");
3291 let events = Arc::new(AtomicUsize::new(0));
3292 let async_boundary_crossings = Arc::new(AtomicUsize::new(0));
3293
3294 let (source_ref, source_handle) = Actor::spawn(
3295 None,
3296 RactorLinearSourceActor::<I, T>::new(),
3297 RactorLinearSourceState {
3298 input: Some(input),
3299 output: first_sender,
3300 },
3301 )
3302 .await
3303 .map_err(ractor_spawn_error)?;
3304
3305 let mut actors = Vec::with_capacity(segments.segments.len() + 1);
3306 actors.push((source_ref, source_handle));
3307
3308 for (index, steps) in segments.segments.into_iter().enumerate() {
3309 let input = receivers[index].take().expect("worker receiver is present");
3310 let output = senders[index + 1].clone();
3311 let has_boundary_after = index + 1 < channels - 1;
3312 let (worker_ref, worker_handle) = match Actor::spawn(
3313 None,
3314 RactorLinearSegmentActor::<T>::new(),
3315 RactorLinearSegmentState {
3316 steps,
3317 input,
3318 output,
3319 has_boundary_after,
3320 events: Arc::clone(&events),
3321 async_boundary_crossings: Arc::clone(&async_boundary_crossings),
3322 config,
3323 },
3324 )
3325 .await
3326 {
3327 Ok(actor) => actor,
3328 Err(error) => {
3329 let error = ractor_spawn_error(error);
3330 stop_ractor_async_linear_actors(&actors);
3331 let _ = join_ractor_async_linear_actors(actors).await;
3332 return Err(error);
3333 }
3334 };
3335 actors.push((worker_ref, worker_handle));
3336 }
3337 drop(senders);
3338
3339 let mut start_error = None;
3340 for (actor, _) in &actors {
3341 if actor.send_message(RactorBoundaryCommand::Run).is_err() {
3342 start_error = Some(StreamError::AbruptTermination);
3343 break;
3344 }
3345 }
3346 if let Some(error) = start_error {
3347 stop_ractor_async_linear_actors(&actors);
3348 let _ = join_ractor_async_linear_actors(actors).await;
3349 return Err(error);
3350 }
3351
3352 let mut count = 0;
3353 let mut terminal_error = None;
3354 loop {
3355 match final_receiver.recv().await {
3356 Some(AsyncLinearMessage::Item(_)) => count += 1,
3357 Some(AsyncLinearMessage::Done) => break,
3358 Some(AsyncLinearMessage::Failed(error)) => {
3359 terminal_error = Some(error);
3360 break;
3361 }
3362 None => {
3363 terminal_error = Some(StreamError::AbruptTermination);
3364 break;
3365 }
3366 }
3367 }
3368 drop(final_receiver);
3369
3370 stop_ractor_async_linear_actors(&actors);
3371 let actor_error = join_ractor_async_linear_actors(actors).await;
3372
3373 match (terminal_error, actor_error) {
3374 (Some(error), _) if error != StreamError::AbruptTermination => return Err(error),
3375 (_, Some(error)) => return Err(error),
3376 (Some(error), None) => return Err(error),
3377 (None, None) => {}
3378 }
3379
3380 Ok(FusedTerminalReport {
3381 result: count,
3382 events: events.load(Ordering::Relaxed),
3383 async_boundary_crossings: async_boundary_crossings.load(Ordering::Relaxed),
3384 })
3385}
3386
3387struct RactorLinearSourceActor<I, T> {
3388 _marker: PhantomData<fn() -> (I, T)>,
3389}
3390
3391impl<I, T> RactorLinearSourceActor<I, T> {
3392 fn new() -> Self {
3393 Self {
3394 _marker: PhantomData,
3395 }
3396 }
3397}
3398
3399struct RactorLinearSourceState<I, T> {
3400 input: Option<I>,
3401 output: ractor::concurrency::MpscSender<AsyncLinearMessage<T>>,
3402}
3403
3404impl<I, T> Actor for RactorLinearSourceActor<I, T>
3405where
3406 I: Iterator<Item = T> + Send + 'static,
3407 T: Send + 'static,
3408{
3409 type Msg = RactorBoundaryCommand;
3410 type State = RactorLinearSourceState<I, T>;
3411 type Arguments = RactorLinearSourceState<I, T>;
3412
3413 async fn pre_start(
3414 &self,
3415 _myself: ActorRef<Self::Msg>,
3416 args: Self::Arguments,
3417 ) -> Result<Self::State, ActorProcessingErr> {
3418 Ok(args)
3419 }
3420
3421 async fn handle(
3422 &self,
3423 myself: ActorRef<Self::Msg>,
3424 message: Self::Msg,
3425 state: &mut Self::State,
3426 ) -> Result<(), ActorProcessingErr> {
3427 match message {
3428 RactorBoundaryCommand::Run => {
3429 let input = state.input.take().ok_or_else(|| {
3430 actor_processing_error(StreamError::GraphValidation(
3431 "ractor async boundary source actor was run more than once".into(),
3432 ))
3433 })?;
3434 feed_ractor_async_linear_input(input, &state.output)
3435 .await
3436 .map_err(actor_processing_error)?;
3437 myself.stop(None);
3438 }
3439 }
3440 Ok(())
3441 }
3442}
3443
3444struct RactorLinearSegmentActor<T> {
3445 _marker: PhantomData<fn() -> T>,
3446}
3447
3448impl<T> RactorLinearSegmentActor<T> {
3449 fn new() -> Self {
3450 Self {
3451 _marker: PhantomData,
3452 }
3453 }
3454}
3455
3456struct RactorLinearSegmentState<T> {
3457 steps: Vec<TypedLinearStep<T>>,
3458 input: ractor::concurrency::MpscReceiver<AsyncLinearMessage<T>>,
3459 output: ractor::concurrency::MpscSender<AsyncLinearMessage<T>>,
3460 has_boundary_after: bool,
3461 events: Arc<AtomicUsize>,
3462 async_boundary_crossings: Arc<AtomicUsize>,
3463 config: AsyncBoundaryExecutionConfig,
3464}
3465
3466impl<T> Actor for RactorLinearSegmentActor<T>
3467where
3468 T: Send + 'static,
3469{
3470 type Msg = RactorBoundaryCommand;
3471 type State = RactorLinearSegmentState<T>;
3472 type Arguments = RactorLinearSegmentState<T>;
3473
3474 async fn pre_start(
3475 &self,
3476 _myself: ActorRef<Self::Msg>,
3477 args: Self::Arguments,
3478 ) -> Result<Self::State, ActorProcessingErr> {
3479 Ok(args)
3480 }
3481
3482 async fn handle(
3483 &self,
3484 myself: ActorRef<Self::Msg>,
3485 message: Self::Msg,
3486 state: &mut Self::State,
3487 ) -> Result<(), ActorProcessingErr> {
3488 match message {
3489 RactorBoundaryCommand::Run => {
3490 run_ractor_async_linear_segment(state)
3491 .await
3492 .map_err(actor_processing_error)?;
3493 myself.stop(None);
3494 }
3495 }
3496 Ok(())
3497 }
3498}
3499
3500async fn feed_ractor_async_linear_input<I, T>(
3501 input: I,
3502 output: &ractor::concurrency::MpscSender<AsyncLinearMessage<T>>,
3503) -> StreamResult<()>
3504where
3505 I: Iterator<Item = T>,
3506{
3507 for item in input {
3508 output
3509 .send(AsyncLinearMessage::Item(item))
3510 .await
3511 .map_err(|_| StreamError::AbruptTermination)?;
3512 }
3513 output
3514 .send(AsyncLinearMessage::Done)
3515 .await
3516 .map_err(|_| StreamError::AbruptTermination)
3517}
3518
3519async fn run_ractor_async_linear_segment<T>(
3520 state: &mut RactorLinearSegmentState<T>,
3521) -> StreamResult<()>
3522where
3523 T: Send + 'static,
3524{
3525 loop {
3526 match state.input.recv().await {
3527 Some(AsyncLinearMessage::Item(item)) => {
3528 let result =
3529 run_async_linear_item(item, &state.steps, &state.events, state.config.fused)
3530 .and_then(|item| {
3531 if state.has_boundary_after {
3532 bump_fused_event_atomic(&state.events, state.config.fused)?;
3533 state
3534 .async_boundary_crossings
3535 .fetch_add(1, Ordering::Relaxed);
3536 bump_fused_event_atomic(&state.events, state.config.fused)?;
3537 }
3538 Ok(item)
3539 });
3540
3541 match result {
3542 Ok(item) => state
3543 .output
3544 .send(AsyncLinearMessage::Item(item))
3545 .await
3546 .map_err(|_| StreamError::AbruptTermination)?,
3547 Err(error) => {
3548 let _ = state
3549 .output
3550 .send(AsyncLinearMessage::Failed(error.clone()))
3551 .await;
3552 return Err(error);
3553 }
3554 }
3555 }
3556 Some(AsyncLinearMessage::Done) => {
3557 state
3558 .output
3559 .send(AsyncLinearMessage::Done)
3560 .await
3561 .map_err(|_| StreamError::AbruptTermination)?;
3562 return Ok(());
3563 }
3564 Some(AsyncLinearMessage::Failed(error)) => {
3565 let _ = state
3566 .output
3567 .send(AsyncLinearMessage::Failed(error.clone()))
3568 .await;
3569 return Err(error);
3570 }
3571 None => return Err(StreamError::AbruptTermination),
3572 }
3573 }
3574}
3575
3576async fn join_ractor_async_linear_actor(
3577 handle: ractor::concurrency::JoinHandle<()>,
3578) -> StreamResult<()> {
3579 handle.await.map_err(|error| {
3580 StreamError::Failed(format!("ractor async boundary actor task failed: {error}"))
3581 })
3582}
3583
3584fn stop_ractor_async_linear_actors(
3585 actors: &[(
3586 ActorRef<RactorBoundaryCommand>,
3587 ractor::concurrency::JoinHandle<()>,
3588 )],
3589) {
3590 for (actor, _) in actors {
3591 actor.stop(None);
3592 }
3593}
3594
3595#[cfg_attr(not(test), allow(dead_code))]
3596async fn join_ractor_async_linear_actors(
3597 actors: Vec<(
3598 ActorRef<RactorBoundaryCommand>,
3599 ractor::concurrency::JoinHandle<()>,
3600 )>,
3601) -> Option<StreamError> {
3602 let mut actor_error = None;
3603 for (_, handle) in actors {
3604 let result = join_ractor_async_linear_actor(handle).await;
3605 if actor_error.is_some() {
3606 continue;
3607 }
3608 if let Err(error) = result {
3609 actor_error = Some(error);
3610 }
3611 }
3612 actor_error
3613}
3614
3615fn ractor_spawn_error(error: ractor::SpawnErr) -> StreamError {
3616 StreamError::Failed(format!(
3617 "ractor async boundary actor failed to spawn: {error}"
3618 ))
3619}
3620
3621fn actor_processing_error(error: StreamError) -> ActorProcessingErr {
3622 Box::new(error)
3623}
3624
3625#[cfg(test)]
3626fn feed_threaded_async_linear_input<I, T>(
3627 input: I,
3628 output: mpsc::SyncSender<AsyncLinearMessage<T>>,
3629) -> StreamResult<()>
3630where
3631 I: IntoIterator<Item = T>,
3632{
3633 for item in input {
3634 output
3635 .send(AsyncLinearMessage::Item(item))
3636 .map_err(|_| StreamError::AbruptTermination)?;
3637 }
3638 output
3639 .send(AsyncLinearMessage::Done)
3640 .map_err(|_| StreamError::AbruptTermination)
3641}
3642
3643#[cfg(test)]
3644fn run_threaded_async_linear_segment<T>(
3645 steps: &[TypedLinearStep<T>],
3646 input: mpsc::Receiver<AsyncLinearMessage<T>>,
3647 output: mpsc::SyncSender<AsyncLinearMessage<T>>,
3648 has_boundary_after: bool,
3649 events: &AtomicUsize,
3650 async_boundary_crossings: &AtomicUsize,
3651 config: AsyncBoundaryExecutionConfig,
3652) -> StreamResult<()>
3653where
3654 T: Send + 'static,
3655{
3656 loop {
3657 match input.recv().map_err(|_| StreamError::AbruptTermination)? {
3658 AsyncLinearMessage::Item(item) => {
3659 let result =
3660 run_async_linear_item(item, steps, events, config.fused).and_then(|item| {
3661 if has_boundary_after {
3662 bump_fused_event_atomic(events, config.fused)?;
3663 async_boundary_crossings.fetch_add(1, Ordering::Relaxed);
3664 bump_fused_event_atomic(events, config.fused)?;
3665 }
3666 output
3667 .send(AsyncLinearMessage::Item(item))
3668 .map_err(|_| StreamError::AbruptTermination)
3669 });
3670 if let Err(error) = result {
3671 let _ = output.send(AsyncLinearMessage::Failed(error.clone()));
3672 return Err(error);
3673 }
3674 }
3675 AsyncLinearMessage::Done => {
3676 output
3677 .send(AsyncLinearMessage::Done)
3678 .map_err(|_| StreamError::AbruptTermination)?;
3679 return Ok(());
3680 }
3681 AsyncLinearMessage::Failed(error) => {
3682 let _ = output.send(AsyncLinearMessage::Failed(error.clone()));
3683 return Err(error);
3684 }
3685 }
3686 }
3687}
3688
3689fn run_async_linear_item<T>(
3690 mut item: T,
3691 steps: &[TypedLinearStep<T>],
3692 events: &AtomicUsize,
3693 config: FusedExecutionConfig,
3694) -> StreamResult<T>
3695where
3696 T: Send + 'static,
3697{
3698 for step in steps {
3699 bump_fused_event_atomic(events, config)?;
3700 match step {
3701 TypedLinearStep::Pass => {}
3702 TypedLinearStep::Map(mapper) => {
3703 item = mapper(item);
3704 }
3705 TypedLinearStep::AsyncBoundary => {
3706 return Err(StreamError::GraphValidation(
3707 "async boundary execution expects pre-split linear segments".into(),
3708 ));
3709 }
3710 }
3711 bump_fused_event_atomic(events, config)?;
3712 }
3713 Ok(item)
3714}
3715
3716#[cfg(test)]
3717fn join_threaded_async_linear_worker(
3718 handle: thread::ScopedJoinHandle<'_, StreamResult<()>>,
3719) -> StreamResult<Option<StreamError>> {
3720 match handle.join() {
3721 Ok(Ok(())) => Ok(None),
3722 Ok(Err(error)) => Ok(Some(error)),
3723 Err(_) => Err(StreamError::Failed("async boundary worker panicked".into())),
3724 }
3725}
3726
3727fn bump_fused_event_atomic(events: &AtomicUsize, config: FusedExecutionConfig) -> StreamResult<()> {
3728 let events = events.fetch_add(1, Ordering::Relaxed) + 1;
3729 if events > config.event_limit {
3730 return Err(StreamError::EventLimitExceeded {
3731 limit: config.event_limit,
3732 });
3733 }
3734 Ok(())
3735}
3736
3737impl<T> GraphBlueprint<FanInShape<T, T>>
3738where
3739 T: Clone + Send + 'static,
3740{
3741 pub fn run_fan_in(&self, inputs: Vec<Vec<T>>) -> StreamResult<Vec<T>> {
3742 Ok(self
3743 .run_fan_in_report(inputs, FusedExecutionConfig::default())?
3744 .output)
3745 }
3746
3747 pub fn run_fan_in_report(
3748 &self,
3749 inputs: Vec<Vec<T>>,
3750 config: FusedExecutionConfig,
3751 ) -> StreamResult<FusedExecutionReport<T>> {
3752 if inputs.len() != self.shape.inlet_count() {
3753 return Err(StreamError::GraphValidation(format!(
3754 "expected {} input streams, got {}",
3755 self.shape.inlet_count(),
3756 inputs.len()
3757 )));
3758 }
3759
3760 let output_capacity = inputs.iter().map(Vec::len).sum();
3761 let mut queues: Vec<_> = inputs.into_iter().map(Vec::into_iter).collect();
3762 let schedule = self.fan_in_schedule();
3763 let mut schedule_index = 0;
3764 let outlet = self.shape.outlet().id();
3765 let mut executor = FusedExecutor::new(self, config);
3766 let mut output = Vec::with_capacity(output_capacity);
3767 let mut completed = vec![false; queues.len()];
3768
3769 {
3770 let mut output_sink = VecOutputSink {
3771 output: &mut output,
3772 };
3773 for (index, queue) in queues.iter().enumerate() {
3774 if queue.len() == 0 {
3775 executor.complete(self.shape.inlet(index)?.id(), outlet, &mut output_sink)?;
3776 completed[index] = true;
3777 }
3778 }
3779 while queues.iter().any(|queue| queue.len() > 0) {
3780 let input_index = next_scheduled_input(&queues, &schedule, &mut schedule_index)
3781 .ok_or_else(|| {
3782 StreamError::GraphValidation("no runnable fan-in input".into())
3783 })?;
3784 let item = queues[input_index]
3785 .next()
3786 .expect("scheduled input had an item");
3787 executor.deliver(
3788 self.shape.inlet(input_index)?.id(),
3789 datum(item),
3790 outlet,
3791 &mut output_sink,
3792 )?;
3793 if queues[input_index].len() == 0 && !completed[input_index] {
3794 executor.complete(
3795 self.shape.inlet(input_index)?.id(),
3796 outlet,
3797 &mut output_sink,
3798 )?;
3799 completed[input_index] = true;
3800 }
3801 }
3802 }
3803
3804 Ok(FusedExecutionReport {
3805 output,
3806 events: executor.events,
3807 async_boundary_crossings: executor.async_boundary_crossings,
3808 })
3809 }
3810
3811 fn fan_in_schedule(&self) -> Vec<usize> {
3812 self.stages
3813 .iter()
3814 .find_map(|stage| match &stage.spec.kind {
3815 StageKind::MergePrioritized { weights }
3816 if weights.len() == self.shape.inlet_count()
3817 && stage.spec.outlets.len() == 1
3818 && stage.spec.outlets[0].id() == self.shape.outlet().id()
3819 && stage.spec.inlets.iter().map(AnyInlet::id).eq(self
3820 .shape
3821 .inlets()
3822 .iter()
3823 .map(|inlet| inlet.id())) =>
3824 {
3825 Some(
3826 weights
3827 .iter()
3828 .enumerate()
3829 .flat_map(|(index, weight)| std::iter::repeat_n(index, *weight))
3830 .collect(),
3831 )
3832 }
3833 _ => None,
3834 })
3835 .unwrap_or_else(|| (0..self.shape.inlet_count()).collect())
3836 }
3837
3838 pub fn run_concat(&self, inputs: Vec<Vec<T>>) -> StreamResult<Vec<T>> {
3839 Ok(self
3840 .run_concat_report(inputs, FusedExecutionConfig::default())?
3841 .output)
3842 }
3843
3844 pub fn run_concat_report(
3845 &self,
3846 inputs: Vec<Vec<T>>,
3847 config: FusedExecutionConfig,
3848 ) -> StreamResult<FusedExecutionReport<T>> {
3849 if inputs.len() != self.shape.inlet_count() {
3850 return Err(StreamError::GraphValidation(format!(
3851 "expected {} input streams, got {}",
3852 self.shape.inlet_count(),
3853 inputs.len()
3854 )));
3855 }
3856
3857 let output_capacity = inputs.iter().map(Vec::len).sum();
3858 let mut queues: Vec<_> = inputs.into_iter().map(Vec::into_iter).collect();
3859 let outlet = self.shape.outlet().id();
3860 let mut executor = FusedExecutor::new(self, config);
3861 let mut output = Vec::with_capacity(output_capacity);
3862
3863 {
3864 let mut output_sink = VecOutputSink {
3865 output: &mut output,
3866 };
3867 for (index, queue) in queues.iter_mut().enumerate() {
3868 for item in queue.by_ref() {
3869 executor.deliver(
3870 self.shape.inlet(index)?.id(),
3871 datum(item),
3872 outlet,
3873 &mut output_sink,
3874 )?;
3875 }
3876 executor.complete(self.shape.inlet(index)?.id(), outlet, &mut output_sink)?;
3877 }
3878 }
3879
3880 Ok(FusedExecutionReport {
3881 output,
3882 events: executor.events,
3883 async_boundary_crossings: executor.async_boundary_crossings,
3884 })
3885 }
3886
3887 pub fn run_or_else(&self, primary: Vec<T>, secondary: Vec<T>) -> StreamResult<Vec<T>> {
3888 Ok(self
3889 .run_or_else_report(primary, secondary, FusedExecutionConfig::default())?
3890 .output)
3891 }
3892
3893 pub fn run_or_else_report(
3894 &self,
3895 primary: Vec<T>,
3896 secondary: Vec<T>,
3897 config: FusedExecutionConfig,
3898 ) -> StreamResult<FusedExecutionReport<T>> {
3899 if self.shape.inlet_count() != 2 {
3900 return Err(StreamError::GraphValidation(format!(
3901 "or-else helper expected 2 inlets, got {}",
3902 self.shape.inlet_count()
3903 )));
3904 }
3905
3906 let primary = primary.into_iter();
3907 let secondary = secondary.into_iter();
3908 let primary_inlet = self.shape.inlet(0)?.id();
3909 let secondary_inlet = self.shape.inlet(1)?.id();
3910 let outlet = self.shape.outlet().id();
3911 let mut executor = FusedExecutor::new(self, config);
3912 let mut output = Vec::new();
3913 let mut primary_emitted = false;
3914
3915 {
3916 let mut output_sink = VecOutputSink {
3917 output: &mut output,
3918 };
3919 for item in primary {
3920 primary_emitted = true;
3921 executor.deliver(primary_inlet, datum(item), outlet, &mut output_sink)?;
3922 }
3923 executor.complete(primary_inlet, outlet, &mut output_sink)?;
3924
3925 if !primary_emitted {
3926 for item in secondary {
3927 executor.deliver(secondary_inlet, datum(item), outlet, &mut output_sink)?;
3928 }
3929 }
3930 executor.complete(secondary_inlet, outlet, &mut output_sink)?;
3931 }
3932
3933 Ok(FusedExecutionReport {
3934 output,
3935 events: executor.events,
3936 async_boundary_crossings: executor.async_boundary_crossings,
3937 })
3938 }
3939
3940 pub fn run_or_else_secondary_first(
3941 &self,
3942 primary: Vec<T>,
3943 secondary: Vec<T>,
3944 ) -> StreamResult<Vec<T>> {
3945 Ok(self
3946 .run_or_else_secondary_first_report(
3947 primary,
3948 secondary,
3949 FusedExecutionConfig::default(),
3950 )?
3951 .output)
3952 }
3953
3954 pub fn run_or_else_secondary_first_report(
3955 &self,
3956 primary: Vec<T>,
3957 secondary: Vec<T>,
3958 config: FusedExecutionConfig,
3959 ) -> StreamResult<FusedExecutionReport<T>> {
3960 if self.shape.inlet_count() != 2 {
3961 return Err(StreamError::GraphValidation(format!(
3962 "or-else helper expected 2 inlets, got {}",
3963 self.shape.inlet_count()
3964 )));
3965 }
3966
3967 let primary_inlet = self.shape.inlet(0)?.id();
3968 let secondary_inlet = self.shape.inlet(1)?.id();
3969 let outlet = self.shape.outlet().id();
3970 let mut executor = FusedExecutor::new(self, config);
3971 let mut output = Vec::new();
3972
3973 {
3974 let mut output_sink = VecOutputSink {
3975 output: &mut output,
3976 };
3977 for item in secondary {
3978 executor.deliver(secondary_inlet, datum(item), outlet, &mut output_sink)?;
3979 }
3980 for item in primary {
3981 executor.deliver(primary_inlet, datum(item), outlet, &mut output_sink)?;
3982 }
3983 executor.complete(primary_inlet, outlet, &mut output_sink)?;
3984 executor.complete(secondary_inlet, outlet, &mut output_sink)?;
3985 }
3986
3987 Ok(FusedExecutionReport {
3988 output,
3989 events: executor.events,
3990 async_boundary_crossings: executor.async_boundary_crossings,
3991 })
3992 }
3993
3994 pub fn run_or_else_secondary_closed_first(&self, secondary: Vec<T>) -> StreamResult<Vec<T>> {
3995 if self.shape.inlet_count() != 2 {
3996 return Err(StreamError::GraphValidation(format!(
3997 "or-else helper expected 2 inlets, got {}",
3998 self.shape.inlet_count()
3999 )));
4000 }
4001
4002 let primary_inlet = self.shape.inlet(0)?.id();
4003 let secondary_inlet = self.shape.inlet(1)?.id();
4004 let outlet = self.shape.outlet().id();
4005 let mut executor = FusedExecutor::new(self, FusedExecutionConfig::default());
4006 let mut output = Vec::new();
4007
4008 {
4009 let mut output_sink = VecOutputSink {
4010 output: &mut output,
4011 };
4012 for item in secondary {
4013 executor.deliver(secondary_inlet, datum(item), outlet, &mut output_sink)?;
4014 }
4015 executor.complete(secondary_inlet, outlet, &mut output_sink)?;
4016 executor.complete(primary_inlet, outlet, &mut output_sink)?;
4017 }
4018
4019 Ok(output)
4020 }
4021
4022 pub fn run_interleave(
4023 &self,
4024 inputs: Vec<Vec<T>>,
4025 segment_size: usize,
4026 eager_close: bool,
4027 ) -> StreamResult<Vec<T>> {
4028 Ok(self
4029 .run_interleave_report(
4030 inputs,
4031 segment_size,
4032 eager_close,
4033 FusedExecutionConfig::default(),
4034 )?
4035 .output)
4036 }
4037
4038 pub fn run_interleave_report(
4039 &self,
4040 inputs: Vec<Vec<T>>,
4041 segment_size: usize,
4042 eager_close: bool,
4043 config: FusedExecutionConfig,
4044 ) -> StreamResult<FusedExecutionReport<T>> {
4045 if inputs.len() != self.shape.inlet_count() {
4046 return Err(StreamError::GraphValidation(format!(
4047 "expected {} input streams, got {}",
4048 self.shape.inlet_count(),
4049 inputs.len()
4050 )));
4051 }
4052 if segment_size == 0 {
4053 return Err(StreamError::GraphValidation(
4054 "interleave segment size must be greater than zero".into(),
4055 ));
4056 }
4057
4058 let output_capacity = inputs.iter().map(Vec::len).sum();
4059 let mut queues: Vec<_> = inputs.into_iter().map(Vec::into_iter).collect();
4060 let mut completed = vec![false; queues.len()];
4061 let outlet = self.shape.outlet().id();
4062 let mut executor = FusedExecutor::new(self, config);
4063 let mut output = Vec::with_capacity(output_capacity);
4064
4065 {
4066 let mut output_sink = VecOutputSink {
4067 output: &mut output,
4068 };
4069 for (index, queue) in queues.iter().enumerate() {
4070 if queue.len() == 0 {
4071 executor.complete(self.shape.inlet(index)?.id(), outlet, &mut output_sink)?;
4072 completed[index] = true;
4073 if eager_close {
4074 return Ok(FusedExecutionReport {
4075 output,
4076 events: executor.events,
4077 async_boundary_crossings: executor.async_boundary_crossings,
4078 });
4079 }
4080 }
4081 }
4082
4083 let mut current = 0usize;
4084 while completed.iter().any(|done| !done) {
4085 if completed[current] {
4086 current = next_open_index(&completed, current).ok_or_else(|| {
4087 StreamError::GraphValidation("no open interleave input".into())
4088 })?;
4089 continue;
4090 }
4091
4092 let mut emitted = 0usize;
4093 while emitted < segment_size {
4094 match queues[current].next() {
4095 Some(item) => {
4096 executor.deliver(
4097 self.shape.inlet(current)?.id(),
4098 datum(item),
4099 outlet,
4100 &mut output_sink,
4101 )?;
4102 emitted += 1;
4103 }
4104 None => {
4105 executor.complete(
4106 self.shape.inlet(current)?.id(),
4107 outlet,
4108 &mut output_sink,
4109 )?;
4110 completed[current] = true;
4111 if eager_close {
4112 return Ok(FusedExecutionReport {
4113 output,
4114 events: executor.events,
4115 async_boundary_crossings: executor.async_boundary_crossings,
4116 });
4117 }
4118 break;
4119 }
4120 }
4121 }
4122
4123 if completed.iter().all(|done| *done) {
4124 break;
4125 }
4126 current = next_open_index(&completed, current).ok_or_else(|| {
4127 StreamError::GraphValidation("no open interleave input".into())
4128 })?;
4129 }
4130 }
4131
4132 Ok(FusedExecutionReport {
4133 output,
4134 events: executor.events,
4135 async_boundary_crossings: executor.async_boundary_crossings,
4136 })
4137 }
4138}
4139
4140impl<T> GraphBlueprint<MergePreferredShape<T>>
4141where
4142 T: Clone + Send + 'static,
4143{
4144 pub fn run_merge_preferred(
4145 &self,
4146 preferred: Vec<T>,
4147 secondary_inputs: Vec<Vec<T>>,
4148 ) -> StreamResult<Vec<T>> {
4149 Ok(self
4150 .run_merge_preferred_report(
4151 preferred,
4152 secondary_inputs,
4153 FusedExecutionConfig::default(),
4154 )?
4155 .output)
4156 }
4157
4158 pub fn run_merge_preferred_report(
4159 &self,
4160 preferred: Vec<T>,
4161 secondary_inputs: Vec<Vec<T>>,
4162 config: FusedExecutionConfig,
4163 ) -> StreamResult<FusedExecutionReport<T>> {
4164 if secondary_inputs.len() != self.shape.secondary_count() {
4165 return Err(StreamError::GraphValidation(format!(
4166 "expected {} secondary input streams, got {}",
4167 self.shape.secondary_count(),
4168 secondary_inputs.len()
4169 )));
4170 }
4171
4172 let output_capacity =
4173 preferred.len() + secondary_inputs.iter().map(Vec::len).sum::<usize>();
4174 let mut preferred = preferred.into_iter();
4175 let mut secondary: Vec<_> = secondary_inputs.into_iter().map(Vec::into_iter).collect();
4176 let secondary_schedule = (0..secondary.len()).collect::<Vec<_>>();
4177 let mut secondary_index = 0;
4178 let outlet = self.shape.outlet().id();
4179 let preferred_inlet = self.shape.preferred().id();
4180 let mut executor = FusedExecutor::new(self, config);
4181 let mut output = Vec::with_capacity(output_capacity);
4182 let mut preferred_completed = false;
4183 let mut secondary_completed = vec![false; secondary.len()];
4184
4185 {
4186 let mut output_sink = VecOutputSink {
4187 output: &mut output,
4188 };
4189 if preferred.len() == 0 {
4190 executor.complete(preferred_inlet, outlet, &mut output_sink)?;
4191 preferred_completed = true;
4192 }
4193 for (index, queue) in secondary.iter().enumerate() {
4194 if queue.len() == 0 {
4195 executor.complete(
4196 self.shape.secondary(index)?.id(),
4197 outlet,
4198 &mut output_sink,
4199 )?;
4200 secondary_completed[index] = true;
4201 }
4202 }
4203 while preferred.len() > 0 || secondary.iter().any(|queue| queue.len() > 0) {
4204 if let Some(item) = preferred.next() {
4205 executor.deliver(preferred_inlet, datum(item), outlet, &mut output_sink)?;
4206 if preferred.len() == 0 && !preferred_completed {
4207 executor.complete(preferred_inlet, outlet, &mut output_sink)?;
4208 preferred_completed = true;
4209 }
4210 continue;
4211 }
4212
4213 let input_index =
4214 next_scheduled_input(&secondary, &secondary_schedule, &mut secondary_index)
4215 .ok_or_else(|| {
4216 StreamError::GraphValidation("no runnable secondary input".into())
4217 })?;
4218 let item = secondary[input_index]
4219 .next()
4220 .expect("scheduled secondary input had an item");
4221 executor.deliver(
4222 self.shape.secondary(input_index)?.id(),
4223 datum(item),
4224 outlet,
4225 &mut output_sink,
4226 )?;
4227 if secondary[input_index].len() == 0 && !secondary_completed[input_index] {
4228 executor.complete(
4229 self.shape.secondary(input_index)?.id(),
4230 outlet,
4231 &mut output_sink,
4232 )?;
4233 secondary_completed[input_index] = true;
4234 }
4235 }
4236 }
4237
4238 Ok(FusedExecutionReport {
4239 output,
4240 events: executor.events,
4241 async_boundary_crossings: executor.async_boundary_crossings,
4242 })
4243 }
4244}
4245
4246fn next_scheduled_input<I>(
4247 queues: &[I],
4248 schedule: &[usize],
4249 schedule_index: &mut usize,
4250) -> Option<usize>
4251where
4252 I: ExactSizeIterator,
4253{
4254 if schedule.is_empty() {
4255 return queues.iter().position(|queue| queue.len() > 0);
4256 }
4257
4258 for _ in 0..schedule.len() {
4259 let index = schedule[*schedule_index % schedule.len()];
4260 *schedule_index += 1;
4261 if queues.get(index).is_some_and(|queue| queue.len() > 0) {
4262 return Some(index);
4263 }
4264 }
4265
4266 queues.iter().position(|queue| queue.len() > 0)
4267}
4268
4269fn next_open_index(completed: &[bool], current: usize) -> Option<usize> {
4270 if completed.is_empty() {
4271 return None;
4272 }
4273 let len = completed.len();
4274 for offset in 1..=len {
4275 let index = (current + offset) % len;
4276 if !completed[index] {
4277 return Some(index);
4278 }
4279 }
4280 None
4281}
4282
4283pub(crate) trait FusedOutputSink<Out> {
4292 fn emit(&mut self, value: Out) -> StreamResult<()>;
4293}
4294
4295pub(crate) struct VecOutputSink<'a, Out> {
4296 pub(crate) output: &'a mut Vec<Out>,
4297}
4298
4299impl<Out> FusedOutputSink<Out> for VecOutputSink<'_, Out> {
4300 fn emit(&mut self, value: Out) -> StreamResult<()> {
4301 self.output.push(value);
4302 Ok(())
4303 }
4304}
4305
4306pub(crate) struct CountOutputSink {
4307 pub(crate) count: usize,
4308}
4309
4310impl<Out> FusedOutputSink<Out> for CountOutputSink {
4311 fn emit(&mut self, _value: Out) -> StreamResult<()> {
4312 self.count += 1;
4313 Ok(())
4314 }
4315}
4316
4317pub(crate) struct FoldOutputSink<Acc, F> {
4318 pub(crate) accumulator: Option<Acc>,
4319 pub(crate) fold: F,
4320}
4321
4322impl<Out, Acc, F> FusedOutputSink<Out> for FoldOutputSink<Acc, F>
4323where
4324 F: FnMut(Acc, Out) -> Acc,
4325{
4326 fn emit(&mut self, value: Out) -> StreamResult<()> {
4327 let accumulator = self
4328 .accumulator
4329 .take()
4330 .expect("fold accumulator is present while executor is running");
4331 self.accumulator = Some((self.fold)(accumulator, value));
4332 Ok(())
4333 }
4334}
4335
4336impl<Acc, F> FoldOutputSink<Acc, F> {
4337 pub(crate) fn finish(mut self) -> Acc {
4338 self.accumulator
4339 .take()
4340 .expect("fold accumulator is present after executor finishes")
4341 }
4342}
4343
4344#[derive(Debug)]
4356pub(crate) struct FusedExecutor<'a, S: Shape> {
4357 graph: &'a GraphBlueprint<S>,
4358 pub(crate) edge_by_outlet: HashMap<PortId, PortId>,
4360 pub(crate) edge_by_inlet: HashMap<PortId, PortId>,
4362 pub(crate) stage_by_inlet: HashMap<PortId, usize>,
4364 pub(crate) stage_by_outlet: HashMap<PortId, usize>,
4366 stage_states: Vec<StageState>,
4367 pub(crate) opaque_logics: Vec<Option<GraphStageLogic>>,
4368 config: FusedExecutionConfig,
4369 pub(crate) events: usize,
4370 pub(crate) async_boundary_crossings: usize,
4371 cancelled_outlets: HashSet<PortId>,
4372}
4373
4374#[derive(Debug)]
4375enum StageState {
4376 Stateless,
4377 Broadcast {
4378 fast_path: Option<BroadcastZipFastPath>,
4379 cancelled_outlets: Vec<bool>,
4380 live_outlets: usize,
4381 },
4382 Balance {
4383 fast_path: Option<BalanceMergeFastPath>,
4384 next: usize,
4385 cancelled_outlets: Vec<bool>,
4386 live_outlets: usize,
4387 },
4388 Merge {
4389 open_inputs: usize,
4390 eager_complete: bool,
4391 completed: bool,
4392 },
4393 OrElse {
4394 primary_emitted: bool,
4395 buffer: VecDeque<DatumValue>,
4396 primary_closed: bool,
4397 secondary_closed: bool,
4398 completed: bool,
4399 },
4400 Zip {
4401 left_inlet: PortId,
4402 right_inlet: PortId,
4403 left: VecDeque<DatumValue>,
4404 right: VecDeque<DatumValue>,
4405 left_pending_complete: bool,
4406 right_pending_complete: bool,
4407 completed: bool,
4408 },
4409 Unzip {
4410 fast_path: Option<UnzipFanInFastPath>,
4411 zip_fast_path: Option<UnzipZipFastPath>,
4412 demand: [bool; 2],
4413 cancelled: [bool; 2],
4414 upstream_closed: bool,
4415 },
4416 MergeSorted {
4417 left: VecDeque<DatumValue>,
4418 right: VecDeque<DatumValue>,
4419 left_closed: bool,
4420 right_closed: bool,
4421 pending: VecDeque<DatumValue>,
4422 completed: bool,
4423 },
4424 MergeSequence {
4425 next_sequence: u64,
4426 pending: Vec<(u64, DatumValue)>,
4427 completed_count: usize,
4428 output_buffer: VecDeque<DatumValue>,
4429 completed: bool,
4430 },
4431 MergeLatest {
4432 latest: Vec<Option<DatumValue>>,
4433 seen_count: usize,
4434 completed_count: usize,
4435 pending: VecDeque<DatumValue>,
4436 completed: bool,
4437 },
4438 Partition {
4439 fast_path: Option<PartitionMergeFastPath>,
4440 pending: Option<(usize, DatumValue)>,
4441 upstream_closed: bool,
4442 demand: Vec<bool>,
4443 cancelled: Vec<bool>,
4444 output_count: usize,
4445 completed: bool,
4446 },
4447}
4448
4449#[derive(Clone, Copy, Debug)]
4450struct BroadcastZipFastPath {
4451 zip_stage_index: usize,
4452 left_uses_clone: bool,
4453}
4454
4455#[derive(Clone, Copy, Debug)]
4456struct BalanceMergeFastPath {
4457 merge_stage_index: usize,
4458}
4459
4460#[derive(Clone, Copy, Debug)]
4461struct UnzipFanInFastPath {
4462 fan_in_stage_index: usize,
4463 target_inlet_indices: [usize; 2],
4467}
4468
4469#[derive(Clone, Copy, Debug)]
4470struct UnzipZipFastPath {
4471 zip_stage_index: usize,
4472}
4473
4474#[derive(Clone, Copy, Debug)]
4475struct PartitionMergeFastPath {
4476 merge_stage_index: usize,
4477}
4478
4479enum StageEmissions {
4480 None,
4481 One(PortId, DatumValue),
4482 Two((PortId, DatumValue), (PortId, DatumValue)),
4483 Many(Vec<(PortId, DatumValue)>),
4484}
4485
4486struct StageTransition {
4487 emissions: StageEmissions,
4488 completed_outlets: Vec<PortId>,
4489 cancelled_inlets: Vec<PortId>,
4490}
4491
4492impl StageTransition {
4493 fn none() -> Self {
4494 Self {
4495 emissions: StageEmissions::None,
4496 completed_outlets: Vec::new(),
4497 cancelled_inlets: Vec::new(),
4498 }
4499 }
4500
4501 fn emit(emissions: StageEmissions) -> Self {
4502 Self {
4503 emissions,
4504 completed_outlets: Vec::new(),
4505 cancelled_inlets: Vec::new(),
4506 }
4507 }
4508
4509 fn with_completion(mut self, completed_outlets: Vec<PortId>) -> Self {
4510 self.completed_outlets = completed_outlets;
4511 self
4512 }
4513
4514 fn with_cancellations(mut self, cancelled_inlets: Vec<PortId>) -> Self {
4515 self.cancelled_inlets = cancelled_inlets;
4516 self
4517 }
4518}
4519
4520impl<'a, S: Shape> FusedExecutor<'a, S> {
4521 fn is_outlet_cancelled(&self, outlet: PortId) -> bool {
4522 !self.cancelled_outlets.is_empty() && self.cancelled_outlets.contains(&outlet)
4523 }
4524
4525 fn new(graph: &'a GraphBlueprint<S>, config: FusedExecutionConfig) -> Self {
4526 let edge_by_outlet = graph
4527 .edges
4528 .iter()
4529 .map(|edge| (edge.outlet, edge.inlet))
4530 .collect();
4531 let edge_by_inlet = graph
4532 .edges
4533 .iter()
4534 .map(|edge| (edge.inlet, edge.outlet))
4535 .collect();
4536 let mut stage_by_inlet = HashMap::new();
4537 let mut stage_by_outlet = HashMap::new();
4538 for (index, stage) in graph.stages.iter().enumerate() {
4539 for inlet in &stage.spec.inlets {
4540 stage_by_inlet.insert(inlet.id(), index);
4541 }
4542 for outlet in &stage.spec.outlets {
4543 stage_by_outlet.insert(outlet.id(), index);
4544 }
4545 }
4546
4547 let opaque_logics: Vec<_> = graph
4548 .stages
4549 .iter()
4550 .map(|stage| stage.logic_factory.as_ref().map(|factory| factory()))
4551 .collect();
4552
4553 let stage_states = graph
4554 .stages
4555 .iter()
4556 .map(|stage| match stage.spec.kind {
4557 StageKind::Broadcast => StageState::Broadcast {
4558 fast_path: broadcast_zip_fast_path(
4559 stage,
4560 graph,
4561 &edge_by_outlet,
4562 &stage_by_inlet,
4563 ),
4564 cancelled_outlets: vec![false; stage.spec.outlets.len()],
4565 live_outlets: stage.spec.outlets.len(),
4566 },
4567 StageKind::Balance => StageState::Balance {
4568 fast_path: balance_merge_fast_path(
4569 stage,
4570 graph,
4571 &edge_by_outlet,
4572 &stage_by_inlet,
4573 ),
4574 next: 0,
4575 cancelled_outlets: vec![false; stage.spec.outlets.len()],
4576 live_outlets: stage.spec.outlets.len(),
4577 },
4578 StageKind::Merge => StageState::Merge {
4579 open_inputs: stage.spec.inlets.len(),
4580 eager_complete: false,
4581 completed: false,
4582 },
4583 StageKind::MergePreferred => StageState::Merge {
4584 open_inputs: stage.spec.inlets.len(),
4585 eager_complete: false,
4586 completed: false,
4587 },
4588 StageKind::MergePrioritized { .. } => StageState::Merge {
4589 open_inputs: stage.spec.inlets.len(),
4590 eager_complete: false,
4591 completed: false,
4592 },
4593 StageKind::Concat => StageState::Merge {
4594 open_inputs: stage.spec.inlets.len(),
4595 eager_complete: false,
4596 completed: false,
4597 },
4598 StageKind::Interleave { eager_close, .. } => StageState::Merge {
4599 open_inputs: stage.spec.inlets.len(),
4600 eager_complete: eager_close,
4601 completed: false,
4602 },
4603 StageKind::OrElse { primary_inlet: _ } => StageState::OrElse {
4604 primary_emitted: false,
4605 buffer: VecDeque::new(),
4606 primary_closed: false,
4607 secondary_closed: false,
4608 completed: false,
4609 },
4610 StageKind::Zip(_) => {
4611 if let [left, right] = stage.spec.inlets.as_slice() {
4612 StageState::Zip {
4613 left_inlet: left.id(),
4614 right_inlet: right.id(),
4615 left: VecDeque::new(),
4616 right: VecDeque::new(),
4617 left_pending_complete: false,
4618 right_pending_complete: false,
4619 completed: false,
4620 }
4621 } else {
4622 StageState::Stateless
4623 }
4624 }
4625 StageKind::Unzip { .. } => StageState::Unzip {
4626 fast_path: unzip_fan_in_fast_path(
4627 stage,
4628 graph,
4629 &edge_by_outlet,
4630 &stage_by_inlet,
4631 ),
4632 zip_fast_path: unzip_zip_fast_path(
4633 stage,
4634 graph,
4635 &edge_by_outlet,
4636 &stage_by_inlet,
4637 ),
4638 demand: [false; 2],
4639 cancelled: [false; 2],
4640 upstream_closed: false,
4641 },
4642 StageKind::MergeSorted(_) => StageState::MergeSorted {
4643 left: VecDeque::new(),
4644 right: VecDeque::new(),
4645 left_closed: false,
4646 right_closed: false,
4647 pending: VecDeque::new(),
4648 completed: false,
4649 },
4650 StageKind::MergeSequence { .. } => StageState::MergeSequence {
4651 next_sequence: 0,
4652 pending: Vec::new(),
4653 completed_count: 0,
4654 output_buffer: VecDeque::new(),
4655 completed: false,
4656 },
4657 StageKind::MergeLatest { input_count, .. } => {
4658 let mut latest = Vec::with_capacity(input_count);
4659 for _ in 0..input_count {
4660 latest.push(None);
4661 }
4662 StageState::MergeLatest {
4663 latest,
4664 seen_count: 0,
4665 completed_count: 0,
4666 pending: VecDeque::new(),
4667 completed: false,
4668 }
4669 }
4670 StageKind::Partition { output_count, .. } => StageState::Partition {
4671 fast_path: partition_merge_fast_path(
4672 stage,
4673 graph,
4674 &edge_by_outlet,
4675 &stage_by_inlet,
4676 ),
4677 pending: None,
4678 upstream_closed: false,
4679 demand: vec![false; output_count],
4680 cancelled: vec![false; output_count],
4681 output_count,
4682 completed: false,
4683 },
4684 _ => StageState::Stateless,
4685 })
4686 .collect();
4687
4688 let mut executor = Self {
4689 graph,
4690 edge_by_outlet,
4691 edge_by_inlet,
4692 stage_by_inlet,
4693 stage_by_outlet,
4694 stage_states,
4695 opaque_logics,
4696 config,
4697 events: 0,
4698 async_boundary_crossings: 0,
4699 cancelled_outlets: HashSet::new(),
4700 };
4701 executor.prime_connected_demands();
4702 executor
4703 }
4704
4705 fn deliver<Out>(
4706 &mut self,
4707 inlet: PortId,
4708 value: DatumValue,
4709 graph_outlet: PortId,
4710 output: &mut impl FusedOutputSink<Out>,
4711 ) -> StreamResult<()>
4712 where
4713 Out: Send + 'static,
4714 {
4715 self.bump_event()?;
4716 let stage_index = *self.stage_by_inlet.get(&inlet).ok_or_else(|| {
4717 StreamError::GraphValidation(format!("inlet {} has no owning stage", inlet.as_usize()))
4718 })?;
4719
4720 let transition = self.process_stage(stage_index, inlet, value)?;
4721 self.emit_all(transition.emissions, graph_outlet, output)?;
4722 self.complete_all(transition.completed_outlets, graph_outlet, output)?;
4723 self.cancel_all(transition.cancelled_inlets, graph_outlet, output)
4724 }
4725
4726 fn complete<Out>(
4727 &mut self,
4728 inlet: PortId,
4729 graph_outlet: PortId,
4730 output: &mut impl FusedOutputSink<Out>,
4731 ) -> StreamResult<()>
4732 where
4733 Out: Send + 'static,
4734 {
4735 self.bump_event()?;
4736 let stage_index = *self.stage_by_inlet.get(&inlet).ok_or_else(|| {
4737 StreamError::GraphValidation(format!("inlet {} has no owning stage", inlet.as_usize()))
4738 })?;
4739 let transition = self.process_completion(stage_index, inlet)?;
4740 self.emit_all(transition.emissions, graph_outlet, output)?;
4741 self.complete_all(transition.completed_outlets, graph_outlet, output)?;
4742 self.cancel_all(transition.cancelled_inlets, graph_outlet, output)
4743 }
4744
4745 fn request<Out>(
4746 &mut self,
4747 outlet: PortId,
4748 graph_outlet: PortId,
4749 output: &mut impl FusedOutputSink<Out>,
4750 ) -> StreamResult<()>
4751 where
4752 Out: Send + 'static,
4753 {
4754 if self.is_outlet_cancelled(outlet) {
4755 return Ok(());
4756 }
4757 self.bump_event()?;
4758 let Some(stage_index) = self.stage_by_outlet.get(&outlet).copied() else {
4759 return Ok(());
4760 };
4761 let transition = self.process_pull(stage_index, outlet)?;
4762 self.emit_all(transition.emissions, graph_outlet, output)?;
4763 self.complete_all(transition.completed_outlets, graph_outlet, output)?;
4764 self.cancel_all(transition.cancelled_inlets, graph_outlet, output)
4765 }
4766
4767 fn downstream_finish<Out>(
4768 &mut self,
4769 outlet: PortId,
4770 graph_outlet: PortId,
4771 output: &mut impl FusedOutputSink<Out>,
4772 ) -> StreamResult<()>
4773 where
4774 Out: Send + 'static,
4775 {
4776 if !self.cancelled_outlets.insert(outlet) {
4777 return Ok(());
4778 }
4779 self.bump_event()?;
4780 let stage_index = *self.stage_by_outlet.get(&outlet).ok_or_else(|| {
4781 StreamError::GraphValidation(format!(
4782 "outlet {} has no owning stage",
4783 outlet.as_usize()
4784 ))
4785 })?;
4786 let transition = self.process_downstream_finish(stage_index, outlet)?;
4787 self.emit_all(transition.emissions, graph_outlet, output)?;
4788 self.complete_all(transition.completed_outlets, graph_outlet, output)?;
4789 self.cancel_all(transition.cancelled_inlets, graph_outlet, output)
4790 }
4791
4792 fn emit_all<Out>(
4793 &mut self,
4794 emitted: StageEmissions,
4795 graph_outlet: PortId,
4796 output: &mut impl FusedOutputSink<Out>,
4797 ) -> StreamResult<()>
4798 where
4799 Out: Send + 'static,
4800 {
4801 match emitted {
4802 StageEmissions::None => Ok(()),
4803 StageEmissions::One(outlet, value) => self.emit(outlet, value, graph_outlet, output),
4804 StageEmissions::Two(first, second) => {
4805 self.emit(first.0, first.1, graph_outlet, output)?;
4806 self.emit(second.0, second.1, graph_outlet, output)
4807 }
4808 StageEmissions::Many(emitted) => {
4809 for (outlet, value) in emitted {
4810 self.emit(outlet, value, graph_outlet, output)?;
4811 }
4812 Ok(())
4813 }
4814 }
4815 }
4816
4817 fn complete_all<Out>(
4818 &mut self,
4819 completed_outlets: Vec<PortId>,
4820 graph_outlet: PortId,
4821 output: &mut impl FusedOutputSink<Out>,
4822 ) -> StreamResult<()>
4823 where
4824 Out: Send + 'static,
4825 {
4826 for outlet in completed_outlets {
4827 if self.is_outlet_cancelled(outlet) {
4828 continue;
4829 }
4830 self.complete_outlet(outlet, graph_outlet, output)?;
4831 }
4832 Ok(())
4833 }
4834
4835 fn cancel_all<Out>(
4836 &mut self,
4837 cancelled_inlets: Vec<PortId>,
4838 graph_outlet: PortId,
4839 output: &mut impl FusedOutputSink<Out>,
4840 ) -> StreamResult<()>
4841 where
4842 Out: Send + 'static,
4843 {
4844 for inlet in cancelled_inlets {
4845 if let Some(outlet) = self.edge_by_inlet.get(&inlet).copied() {
4846 self.downstream_finish(outlet, graph_outlet, output)?;
4847 }
4848 }
4849 Ok(())
4850 }
4851
4852 fn emit<Out>(
4853 &mut self,
4854 outlet: PortId,
4855 value: DatumValue,
4856 graph_outlet: PortId,
4857 output: &mut impl FusedOutputSink<Out>,
4858 ) -> StreamResult<()>
4859 where
4860 Out: Send + 'static,
4861 {
4862 self.bump_event()?;
4863 if self.is_outlet_cancelled(outlet) {
4864 return Ok(());
4865 }
4866 if outlet == graph_outlet {
4867 output.emit(downcast_datum(value, "emit", || {
4868 format!("outlet#{}", outlet.as_usize())
4869 })?)?;
4870 self.request(outlet, graph_outlet, output)?;
4871 return Ok(());
4872 }
4873
4874 let Some(inlet) = self.edge_by_outlet.get(&outlet).copied() else {
4875 return Err(StreamError::GraphValidation(format!(
4876 "outlet {} is neither connected nor graph output",
4877 outlet.as_usize()
4878 )));
4879 };
4880 self.deliver(inlet, value, graph_outlet, output)?;
4881 if self
4882 .stage_by_outlet
4883 .get(&outlet)
4884 .copied()
4885 .is_some_and(|stage_index| {
4886 matches!(
4887 self.graph.stages[stage_index].spec.kind,
4888 StageKind::Opaque | StageKind::Unzip { .. } | StageKind::Partition { .. }
4889 )
4890 })
4891 {
4892 self.request(outlet, graph_outlet, output)?;
4893 }
4894 Ok(())
4895 }
4896
4897 fn complete_outlet<Out>(
4898 &mut self,
4899 outlet: PortId,
4900 graph_outlet: PortId,
4901 output: &mut impl FusedOutputSink<Out>,
4902 ) -> StreamResult<()>
4903 where
4904 Out: Send + 'static,
4905 {
4906 self.bump_event()?;
4907 if outlet == graph_outlet {
4908 return Ok(());
4909 }
4910 let Some(inlet) = self.edge_by_outlet.get(&outlet).copied() else {
4911 return Err(StreamError::GraphValidation(format!(
4912 "outlet {} is neither connected nor graph output",
4913 outlet.as_usize()
4914 )));
4915 };
4916 self.complete(inlet, graph_outlet, output)
4917 }
4918
4919 fn process_stage(
4920 &mut self,
4921 stage_index: usize,
4922 inlet: PortId,
4923 value: DatumValue,
4924 ) -> StreamResult<StageTransition> {
4925 let stage = &self.graph.stages[stage_index];
4926 match &stage.spec.kind {
4927 StageKind::Identity => Ok(StageTransition::emit(StageEmissions::One(
4928 single_outlet(stage)?,
4929 value,
4930 ))),
4931 StageKind::Opaque => {
4932 if let Some(logic) = self
4933 .opaque_logics
4934 .get_mut(stage_index)
4935 .and_then(|l| l.as_mut())
4936 {
4937 logic.drain_async_callbacks();
4938 let inlet_ref = stage.spec.inlets.iter().find(|i| i.id() == inlet).cloned();
4939 if let Some(inlet_ref) = inlet_ref {
4940 logic.offer_datum(inlet, value)?;
4941 let mut handler = logic.take_in_handler(inlet);
4942 let result = if let Some(ref mut handler) = handler {
4943 let inlet_any = inlet_ref;
4944 handler.on_push(logic, inlet_any)
4945 } else {
4946 Ok(())
4947 };
4948 if let Some(handler) = handler {
4949 logic.restore_in_handler(inlet, handler);
4950 }
4951 result?;
4952 }
4953 self.collect_opaque_emissions(stage, stage_index)
4954 } else {
4955 Ok(StageTransition::emit(StageEmissions::One(
4956 single_outlet(stage)?,
4957 value,
4958 )))
4959 }
4960 }
4961 StageKind::Map(map) => Ok(StageTransition::emit(StageEmissions::One(
4962 single_outlet(stage)?,
4963 (map.erased)(value)?,
4964 ))),
4965 StageKind::AsyncBoundary => {
4966 self.async_boundary_crossings += 1;
4967 Ok(StageTransition::emit(StageEmissions::One(
4968 single_outlet(stage)?,
4969 value,
4970 )))
4971 }
4972 StageKind::Broadcast => {
4973 let fast_path = match &self.stage_states[stage_index] {
4974 StageState::Broadcast { fast_path, .. } => *fast_path,
4975 _ => None,
4976 };
4977 if let Some(fast_path) = fast_path {
4978 self.broadcast_zip_emissions(fast_path, value)
4979 .map(StageTransition::emit)
4980 } else {
4981 broadcast_emissions(&stage.spec.outlets, value).map(StageTransition::emit)
4982 }
4983 }
4984 StageKind::Balance => {
4985 let outlets = &stage.spec.outlets;
4986 if outlets.is_empty() {
4987 return Err(StreamError::GraphValidation(
4988 "balance has no outlets".into(),
4989 ));
4990 }
4991 let StageState::Balance {
4992 fast_path,
4993 next,
4994 cancelled_outlets,
4995 live_outlets,
4996 } = &mut self.stage_states[stage_index]
4997 else {
4998 return Err(StreamError::GraphValidation(
4999 "balance state is missing".into(),
5000 ));
5001 };
5002 if *live_outlets == 0 {
5003 return Ok(StageTransition::none());
5004 }
5005 if let Some(fast_path) = *fast_path {
5006 return self
5007 .balance_merge_emissions(fast_path, value)
5008 .map(StageTransition::emit);
5009 }
5010 let mut selected = None;
5011 for offset in 0..outlets.len() {
5012 let index = (*next + offset) % outlets.len();
5013 if !cancelled_outlets[index] {
5014 selected = Some(index);
5015 break;
5016 }
5017 }
5018 let index = selected.ok_or_else(|| {
5019 StreamError::GraphValidation("balance has no live outlets".into())
5020 })?;
5021 let outlet = outlets[index].id();
5022 *next = (index + 1) % outlets.len();
5023 Ok(StageTransition::emit(StageEmissions::One(outlet, value)))
5024 }
5025 StageKind::Merge | StageKind::MergePreferred | StageKind::MergePrioritized { .. } => {
5026 let StageState::Merge { completed, .. } = &self.stage_states[stage_index] else {
5027 return Err(StreamError::GraphValidation(
5028 "merge state is missing".into(),
5029 ));
5030 };
5031 if *completed {
5032 return Ok(StageTransition::none());
5033 }
5034 Ok(StageTransition::emit(StageEmissions::One(
5035 single_outlet(stage)?,
5036 value,
5037 )))
5038 }
5039 StageKind::Concat | StageKind::Interleave { .. } => {
5040 let StageState::Merge { completed, .. } = &self.stage_states[stage_index] else {
5041 return Err(StreamError::GraphValidation(
5042 "fan-in state is missing".into(),
5043 ));
5044 };
5045 if *completed {
5046 return Ok(StageTransition::none());
5047 }
5048 Ok(StageTransition::emit(StageEmissions::One(
5049 single_outlet(stage)?,
5050 value,
5051 )))
5052 }
5053 StageKind::OrElse { primary_inlet } => {
5054 let StageState::OrElse {
5055 primary_emitted,
5056 buffer,
5057 primary_closed,
5058 completed,
5059 ..
5060 } = &mut self.stage_states[stage_index]
5061 else {
5062 return Err(StreamError::GraphValidation(
5063 "or-else state is missing".into(),
5064 ));
5065 };
5066 if *completed {
5067 return Ok(StageTransition::none());
5068 }
5069 if inlet == *primary_inlet {
5070 *primary_emitted = true;
5071 buffer.clear();
5072 Ok(StageTransition::emit(StageEmissions::One(
5073 single_outlet(stage)?,
5074 value,
5075 )))
5076 } else if *primary_emitted {
5077 Ok(StageTransition::none())
5078 } else if *primary_closed {
5079 Ok(StageTransition::emit(StageEmissions::One(
5080 single_outlet(stage)?,
5081 value,
5082 )))
5083 } else {
5084 buffer.push_back(value);
5085 Ok(StageTransition::none())
5086 }
5087 }
5088 StageKind::Zip(zip) => {
5089 if stage.spec.inlets.len() != 2 {
5090 return Err(StreamError::GraphValidation(format!(
5091 "zip stage {} expected 2 inlets, got {}",
5092 stage.spec.name(),
5093 stage.spec.inlets.len()
5094 )));
5095 }
5096
5097 let ready = {
5098 let StageState::Zip {
5099 left_inlet,
5100 right_inlet,
5101 left,
5102 right,
5103 left_pending_complete,
5104 right_pending_complete,
5105 completed,
5106 } = &mut self.stage_states[stage_index]
5107 else {
5108 return Err(StreamError::GraphValidation("zip state is missing".into()));
5109 };
5110
5111 if *completed {
5112 return Ok(StageTransition::none());
5113 }
5114
5115 if inlet == *left_inlet {
5119 left.push_back(value);
5120 } else if inlet == *right_inlet {
5121 right.push_back(value);
5122 } else {
5123 return Err(StreamError::GraphValidation(format!(
5124 "zip inlet {} is not part of the stage",
5125 inlet.as_usize()
5126 )));
5127 }
5128
5129 match (left.front().is_some(), right.front().is_some()) {
5130 (true, true) => {
5131 let left_item =
5132 left.pop_front().expect("zip left buffer had an element");
5133 let right_item =
5134 right.pop_front().expect("zip right buffer had an element");
5135 let should_complete = (*left_pending_complete && left.is_empty())
5136 || (*right_pending_complete && right.is_empty());
5137 Some((left_item, right_item, should_complete))
5138 }
5139 _ => None,
5140 }
5141 };
5142
5143 if let Some((left, right, should_complete)) = ready {
5144 let outlet = single_outlet(stage)?;
5145 if should_complete {
5146 let StageState::Zip { completed, .. } = &mut self.stage_states[stage_index]
5147 else {
5148 return Err(StreamError::GraphValidation(
5149 "zip state is missing".into(),
5150 ));
5151 };
5152 *completed = true;
5153 }
5154 let mut transition =
5155 StageTransition::emit(StageEmissions::One(outlet, zip(left, right)?));
5156 if should_complete {
5157 transition.completed_outlets.push(outlet);
5158 }
5159 Ok(transition)
5160 } else {
5161 Ok(StageTransition::none())
5162 }
5163 }
5164 StageKind::Unzip { split, .. } => {
5165 let (fan_in, zip_fast) = match &self.stage_states[stage_index] {
5166 StageState::Unzip {
5167 fast_path,
5168 zip_fast_path,
5169 ..
5170 } => (*fast_path, *zip_fast_path),
5171 _ => (None, None),
5172 };
5173 if let Some(zip_fast) = zip_fast {
5174 let (out0_val, out1_val) = split(value);
5175 let zip_stage = &self.graph.stages[zip_fast.zip_stage_index];
5176 let StageKind::Zip(zip) = &zip_stage.spec.kind else {
5177 return Err(StreamError::GraphValidation(
5178 "unzip-zip fast path references a non-zip stage".into(),
5179 ));
5180 };
5181 let outlet = single_outlet(zip_stage)?;
5182 let zipped = zip(out0_val, out1_val)?;
5183 return Ok(StageTransition::emit(StageEmissions::One(outlet, zipped)));
5184 }
5185 if let Some(fast_path) = fan_in {
5186 let (out0_val, out1_val) = split(value);
5187 let target = &self.graph.stages[fast_path.fan_in_stage_index];
5188 match &target.spec.kind {
5189 StageKind::MergeSorted(compare) => {
5190 let result = {
5191 let StageState::MergeSorted {
5192 left,
5193 right,
5194 left_closed,
5195 right_closed,
5196 pending,
5197 completed,
5198 } = &mut self.stage_states[fast_path.fan_in_stage_index]
5199 else {
5200 return Err(StreamError::GraphValidation(
5201 "merge-sorted state is missing".into(),
5202 ));
5203 };
5204 if *completed {
5205 return Ok(StageTransition::none());
5206 }
5207 if fast_path.target_inlet_indices[0] == 0 {
5210 left.push_back(out0_val);
5211 right.push_back(out1_val);
5212 } else {
5213 left.push_back(out1_val);
5214 right.push_back(out0_val);
5215 }
5216
5217 loop {
5218 let next = match (left.front(), right.front()) {
5219 (Some(l), Some(r)) => {
5220 if compare(l, r) != std::cmp::Ordering::Greater {
5221 left.pop_front()
5222 } else {
5223 right.pop_front()
5224 }
5225 }
5226 (Some(_), None) if *right_closed => left.pop_front(),
5227 (None, Some(_)) if *left_closed => right.pop_front(),
5228 _ => break,
5229 };
5230 if let Some(val) = next {
5231 pending.push_back(val);
5232 } else {
5233 break;
5234 }
5235 }
5236
5237 if let Some(output) = pending.pop_front() {
5238 let outlet = single_outlet(target)?;
5239 let all_done = *left_closed
5240 && *right_closed
5241 && left.is_empty()
5242 && right.is_empty()
5243 && pending.is_empty();
5244 if all_done {
5245 *completed = true;
5246 StageTransition::emit(StageEmissions::One(outlet, output))
5247 .with_completion(vec![outlet])
5248 } else {
5249 StageTransition::emit(StageEmissions::One(outlet, output))
5250 }
5251 } else {
5252 StageTransition::none()
5253 }
5254 };
5255 Ok(result)
5256 }
5257 StageKind::MergeSequence {
5258 extract_sequence,
5259 input_count,
5260 ..
5261 } => {
5262 let result = {
5263 let StageState::MergeSequence {
5264 next_sequence,
5265 pending,
5266 completed_count,
5267 output_buffer,
5268 completed,
5269 } = &mut self.stage_states[fast_path.fan_in_stage_index]
5270 else {
5271 return Err(StreamError::GraphValidation(
5272 "merge-sequence state is missing".into(),
5273 ));
5274 };
5275 if *completed {
5276 return Ok(StageTransition::none());
5277 }
5278 for val in [out0_val, out1_val] {
5279 let seq = extract_sequence(&val);
5280 if seq == *next_sequence {
5281 output_buffer.push_back(val);
5282 *next_sequence += 1;
5283 while let Some(index) =
5284 pending.iter().position(|(s, _)| *s == *next_sequence)
5285 {
5286 let (_, item) = pending.remove(index);
5287 output_buffer.push_back(item);
5288 *next_sequence += 1;
5289 }
5290 } else {
5291 if pending.iter().any(|(s, _)| *s == seq) {
5292 return Err(StreamError::Failed(format!(
5293 "duplicate sequence {seq} on merge sequence"
5294 )));
5295 }
5296 pending.push((seq, val));
5297 pending.sort_by_key(|(s, _)| *s);
5298 while let Some(index) =
5299 pending.iter().position(|(s, _)| *s == *next_sequence)
5300 {
5301 let (_, item) = pending.remove(index);
5302 output_buffer.push_back(item);
5303 *next_sequence += 1;
5304 }
5305 }
5306 }
5307
5308 if !output_buffer.is_empty() {
5309 let outlet = single_outlet(target)?;
5310 let all_done = *completed_count >= *input_count;
5311 let emissions: Vec<_> =
5312 output_buffer.drain(..).map(|v| (outlet, v)).collect();
5313 if all_done {
5314 *completed = true;
5315 StageTransition::emit(StageEmissions::Many(emissions))
5316 .with_completion(vec![outlet])
5317 } else {
5318 StageTransition::emit(StageEmissions::Many(emissions))
5319 }
5320 } else {
5321 StageTransition::none()
5322 }
5323 };
5324 Ok(result)
5325 }
5326 StageKind::MergeLatest {
5327 input_count,
5328 build_snapshot,
5329 ..
5330 } => {
5331 let result = {
5332 let StageState::MergeLatest {
5333 latest,
5334 seen_count,
5335 completed_count,
5336 pending,
5337 completed,
5338 } = &mut self.stage_states[fast_path.fan_in_stage_index]
5339 else {
5340 return Err(StreamError::GraphValidation(
5341 "merge-latest state is missing".into(),
5342 ));
5343 };
5344 if *completed {
5345 return Ok(StageTransition::none());
5346 }
5347 let inlets = &target.spec.inlets;
5348 for (idx, val) in fast_path
5349 .target_inlet_indices
5350 .into_iter()
5351 .zip([out0_val, out1_val])
5352 {
5353 if idx < inlets.len() && latest[idx].is_none() {
5354 *seen_count += 1;
5355 }
5356 latest[idx] = Some(val);
5357 if *seen_count >= *input_count {
5358 let values: Vec<&DatumValue> =
5359 latest.iter().filter_map(|v| v.as_ref()).collect();
5360 let snapshot = build_snapshot(&values);
5361 pending.push_back(snapshot);
5362 }
5363 }
5364
5365 if !pending.is_empty() {
5366 let outlet = single_outlet(target)?;
5367 let all_done = *completed_count >= *input_count;
5368 let emissions: Vec<_> =
5369 pending.drain(..).map(|v| (outlet, v)).collect();
5370 if all_done {
5371 *completed = true;
5372 StageTransition::emit(StageEmissions::Many(emissions))
5373 .with_completion(vec![outlet])
5374 } else {
5375 StageTransition::emit(StageEmissions::Many(emissions))
5376 }
5377 } else {
5378 StageTransition::none()
5379 }
5380 };
5381 Ok(result)
5382 }
5383 _ => {
5384 let transition = {
5385 let StageState::Unzip { cancelled, .. } =
5386 &self.stage_states[stage_index]
5387 else {
5388 return Err(StreamError::GraphValidation(
5389 "unzip state is missing".into(),
5390 ));
5391 };
5392 let out0 = stage.spec.outlets.first().map(AnyOutlet::id);
5393 let out1 = stage.spec.outlets.get(1).map(AnyOutlet::id);
5394 let live0 = out0.is_some() && !cancelled[0];
5395 let live1 = out1.is_some() && !cancelled[1];
5396 if live0 && live1 {
5397 StageTransition::emit(StageEmissions::Two(
5398 (out0.unwrap(), out0_val),
5399 (out1.unwrap(), out1_val),
5400 ))
5401 } else if live0 {
5402 StageTransition::emit(StageEmissions::One(
5403 out0.unwrap(),
5404 out0_val,
5405 ))
5406 } else if live1 {
5407 StageTransition::emit(StageEmissions::One(
5408 out1.unwrap(),
5409 out1_val,
5410 ))
5411 } else {
5412 StageTransition::none()
5413 }
5414 };
5415 Ok(transition)
5416 }
5417 }
5418 } else {
5419 let transition = {
5420 let StageState::Unzip { cancelled, .. } = &self.stage_states[stage_index]
5421 else {
5422 return Err(StreamError::GraphValidation(
5423 "unzip state is missing".into(),
5424 ));
5425 };
5426 let (out0_val, out1_val) = split(value);
5427 let out0 = stage.spec.outlets.first().map(AnyOutlet::id);
5428 let out1 = stage.spec.outlets.get(1).map(AnyOutlet::id);
5429 let live0 = out0.is_some() && !cancelled[0];
5430 let live1 = out1.is_some() && !cancelled[1];
5431 if live0 && live1 {
5432 StageTransition::emit(StageEmissions::Two(
5433 (out0.unwrap(), out0_val),
5434 (out1.unwrap(), out1_val),
5435 ))
5436 } else if live0 {
5437 StageTransition::emit(StageEmissions::One(out0.unwrap(), out0_val))
5438 } else if live1 {
5439 StageTransition::emit(StageEmissions::One(out1.unwrap(), out1_val))
5440 } else {
5441 StageTransition::none()
5442 }
5443 };
5444 Ok(transition)
5445 }
5446 }
5447 StageKind::MergeSorted(compare) => {
5448 let result = {
5449 let StageState::MergeSorted {
5450 left,
5451 right,
5452 left_closed,
5453 right_closed,
5454 pending,
5455 completed,
5456 } = &mut self.stage_states[stage_index]
5457 else {
5458 return Err(StreamError::GraphValidation(
5459 "merge-sorted state is missing".into(),
5460 ));
5461 };
5462 if *completed {
5463 return Ok(StageTransition::none());
5464 }
5465 let is_left = stage.spec.inlets.first().is_some_and(|i| i.id() == inlet);
5466 if is_left {
5467 left.push_back(value);
5468 } else {
5469 right.push_back(value);
5470 }
5471
5472 loop {
5473 let next = match (left.front(), right.front()) {
5474 (Some(l), Some(r)) => {
5475 if compare(l, r) != std::cmp::Ordering::Greater {
5476 left.pop_front()
5477 } else {
5478 right.pop_front()
5479 }
5480 }
5481 (Some(_), None) if *right_closed => left.pop_front(),
5482 (None, Some(_)) if *left_closed => right.pop_front(),
5483 _ => break,
5484 };
5485 if let Some(val) = next {
5486 pending.push_back(val);
5487 } else {
5488 break;
5489 }
5490 }
5491
5492 if let Some(output) = pending.pop_front() {
5493 let outlet = single_outlet(stage)?;
5494 let all_done = *left_closed
5495 && *right_closed
5496 && left.is_empty()
5497 && right.is_empty()
5498 && pending.is_empty();
5499 if all_done {
5500 *completed = true;
5501 StageTransition::emit(StageEmissions::One(outlet, output))
5502 .with_completion(vec![outlet])
5503 } else {
5504 StageTransition::emit(StageEmissions::One(outlet, output))
5505 }
5506 } else {
5507 StageTransition::none()
5508 }
5509 };
5510 Ok(result)
5511 }
5512 StageKind::MergeSequence {
5513 input_count,
5514 extract_sequence,
5515 ..
5516 } => {
5517 let result = {
5518 let StageState::MergeSequence {
5519 next_sequence,
5520 pending,
5521 completed_count,
5522 output_buffer,
5523 completed,
5524 } = &mut self.stage_states[stage_index]
5525 else {
5526 return Err(StreamError::GraphValidation(
5527 "merge-sequence state is missing".into(),
5528 ));
5529 };
5530 if *completed {
5531 return Ok(StageTransition::none());
5532 }
5533 let seq = extract_sequence(&value);
5534 if seq == *next_sequence {
5535 output_buffer.push_back(value);
5536 *next_sequence += 1;
5537 while let Some(index) =
5538 pending.iter().position(|(s, _)| *s == *next_sequence)
5539 {
5540 let (_, item) = pending.remove(index);
5541 output_buffer.push_back(item);
5542 *next_sequence += 1;
5543 }
5544 } else {
5545 if pending.iter().any(|(s, _)| *s == seq) {
5546 return Err(StreamError::Failed(format!(
5547 "duplicate sequence {seq} on merge sequence"
5548 )));
5549 }
5550 pending.push((seq, value));
5551 pending.sort_by_key(|(s, _)| *s);
5552 while let Some(index) =
5553 pending.iter().position(|(s, _)| *s == *next_sequence)
5554 {
5555 let (_, item) = pending.remove(index);
5556 output_buffer.push_back(item);
5557 *next_sequence += 1;
5558 }
5559 }
5560
5561 if !output_buffer.is_empty() {
5562 let outlet = single_outlet(stage)?;
5563 let all_done = *completed_count >= *input_count;
5564 let emissions: Vec<_> =
5565 output_buffer.drain(..).map(|v| (outlet, v)).collect();
5566 if all_done {
5567 *completed = true;
5568 StageTransition::emit(StageEmissions::Many(emissions))
5569 .with_completion(vec![outlet])
5570 } else {
5571 StageTransition::emit(StageEmissions::Many(emissions))
5572 }
5573 } else {
5574 StageTransition::none()
5575 }
5576 };
5577 Ok(result)
5578 }
5579 StageKind::MergeLatest {
5580 input_count,
5581 build_snapshot,
5582 ..
5583 } => {
5584 let result = {
5585 let StageState::MergeLatest {
5586 latest,
5587 seen_count,
5588 completed_count,
5589 pending,
5590 completed,
5591 } = &mut self.stage_states[stage_index]
5592 else {
5593 return Err(StreamError::GraphValidation(
5594 "merge-latest state is missing".into(),
5595 ));
5596 };
5597 if *completed {
5598 return Ok(StageTransition::none());
5599 }
5600 let inlet_index = stage
5601 .spec
5602 .inlets
5603 .iter()
5604 .position(|i| i.id() == inlet)
5605 .ok_or_else(|| {
5606 StreamError::GraphValidation(format!(
5607 "merge-latest inlet {} not part of stage",
5608 inlet.as_usize()
5609 ))
5610 })?;
5611 if latest[inlet_index].is_none() {
5612 *seen_count += 1;
5613 }
5614 latest[inlet_index] = Some(value);
5615 if *seen_count >= *input_count {
5616 let values: Vec<&DatumValue> =
5617 latest.iter().filter_map(|v| v.as_ref()).collect();
5618 let snapshot = build_snapshot(&values);
5619 pending.push_back(snapshot);
5620 }
5621
5622 if !pending.is_empty() {
5623 let outlet = single_outlet(stage)?;
5624 let all_done = *completed_count >= *input_count;
5625 let emissions: Vec<_> = pending.drain(..).map(|v| (outlet, v)).collect();
5626 if all_done {
5627 *completed = true;
5628 StageTransition::emit(StageEmissions::Many(emissions))
5629 .with_completion(vec![outlet])
5630 } else {
5631 StageTransition::emit(StageEmissions::Many(emissions))
5632 }
5633 } else {
5634 StageTransition::none()
5635 }
5636 };
5637 Ok(result)
5638 }
5639 StageKind::Partition {
5640 output_count,
5641 partitioner,
5642 ..
5643 } => {
5644 let fast_path = match &self.stage_states[stage_index] {
5645 StageState::Partition { fast_path, .. } => *fast_path,
5646 _ => None,
5647 };
5648 if let Some(fast_path) = fast_path {
5649 let idx = partitioner(&value);
5650 if idx >= *output_count {
5651 return Err(StreamError::Failed(format!(
5652 "partitioner returned out-of-bounds index {idx} for {output_count} outputs"
5653 )));
5654 }
5655 let merge_stage = &self.graph.stages[fast_path.merge_stage_index];
5656 let StageState::Merge { completed, .. } =
5657 &self.stage_states[fast_path.merge_stage_index]
5658 else {
5659 return Err(StreamError::GraphValidation(
5660 "partition-merge fast path: merge state missing".into(),
5661 ));
5662 };
5663 if *completed {
5664 return Ok(StageTransition::none());
5665 }
5666 return Ok(StageTransition::emit(StageEmissions::One(
5667 single_outlet(merge_stage)?,
5668 value,
5669 )));
5670 }
5671 let result = {
5672 let StageState::Partition {
5673 pending,
5674 upstream_closed: _,
5675 demand,
5676 cancelled,
5677 output_count: _,
5678 completed,
5679 ..
5680 } = &mut self.stage_states[stage_index]
5681 else {
5682 return Err(StreamError::GraphValidation(
5683 "partition state is missing".into(),
5684 ));
5685 };
5686 if *completed {
5687 return Ok(StageTransition::none());
5688 }
5689 let idx = partitioner(&value);
5690 if idx >= *output_count {
5691 return Err(StreamError::Failed(format!(
5692 "partitioner returned out-of-bounds index {idx} for {output_count} outputs"
5693 )));
5694 }
5695 if cancelled[idx] {
5696 return Ok(StageTransition::none());
5697 }
5698 if demand[idx] {
5699 demand[idx] = false;
5700 let outlet = stage.spec.outlets[idx].id();
5701 StageTransition::emit(StageEmissions::One(outlet, value))
5702 } else {
5703 *pending = Some((idx, value));
5704 StageTransition::none()
5705 }
5706 };
5707 Ok(result)
5708 }
5709 }
5710 }
5711
5712 fn process_completion(
5713 &mut self,
5714 stage_index: usize,
5715 inlet: PortId,
5716 ) -> StreamResult<StageTransition> {
5717 let stage = &self.graph.stages[stage_index];
5718 match &stage.spec.kind {
5719 StageKind::Identity | StageKind::Map(_) | StageKind::AsyncBoundary => {
5720 Ok(StageTransition::emit(StageEmissions::None)
5721 .with_completion(vec![single_outlet(stage)?]))
5722 }
5723 StageKind::Opaque => {
5724 if let Some(logic) = self
5725 .opaque_logics
5726 .get_mut(stage_index)
5727 .and_then(|l| l.as_mut())
5728 {
5729 logic.drain_async_callbacks();
5730 logic.complete_inlet_by_id(inlet)?;
5731 let inlet_ref = stage.spec.inlets.iter().find(|i| i.id() == inlet).cloned();
5732 if let Some(inlet_ref) = inlet_ref {
5733 let mut handler = logic.take_in_handler(inlet);
5734 let result = if let Some(ref mut handler) = handler {
5735 let inlet_any = inlet_ref;
5736 handler.on_upstream_finish(logic, inlet_any)
5737 } else {
5738 Ok(())
5739 };
5740 if let Some(handler) = handler {
5741 logic.restore_in_handler(inlet, handler);
5742 }
5743 result?;
5744 }
5745 self.collect_opaque_emissions(stage, stage_index)
5746 } else {
5747 Ok(StageTransition::emit(StageEmissions::None)
5748 .with_completion(vec![single_outlet(stage)?]))
5749 }
5750 }
5751 StageKind::Broadcast | StageKind::Balance => {
5752 Ok(StageTransition::emit(StageEmissions::None)
5753 .with_completion(stage.spec.outlets.iter().map(AnyOutlet::id).collect()))
5754 }
5755 StageKind::Merge | StageKind::MergePreferred | StageKind::MergePrioritized { .. } => {
5756 let StageState::Merge {
5757 open_inputs,
5758 eager_complete,
5759 completed,
5760 } = &mut self.stage_states[stage_index]
5761 else {
5762 return Err(StreamError::GraphValidation(
5763 "merge state is missing".into(),
5764 ));
5765 };
5766
5767 if *completed {
5768 return Ok(StageTransition::none());
5769 }
5770 if *open_inputs == 0 {
5771 return Ok(StageTransition::none());
5772 }
5773 *open_inputs -= 1;
5774 if *eager_complete || *open_inputs == 0 {
5775 *completed = true;
5776 Ok(StageTransition::emit(StageEmissions::None)
5777 .with_completion(vec![single_outlet(stage)?]))
5778 } else {
5779 Ok(StageTransition::none())
5780 }
5781 }
5782 StageKind::Concat | StageKind::Interleave { .. } => {
5783 let StageState::Merge {
5784 open_inputs,
5785 eager_complete,
5786 completed,
5787 } = &mut self.stage_states[stage_index]
5788 else {
5789 return Err(StreamError::GraphValidation(
5790 "fan-in state is missing".into(),
5791 ));
5792 };
5793
5794 if *completed {
5795 return Ok(StageTransition::none());
5796 }
5797 if *open_inputs == 0 {
5798 return Ok(StageTransition::none());
5799 }
5800 *open_inputs -= 1;
5801 if *eager_complete || *open_inputs == 0 {
5802 *completed = true;
5803 Ok(StageTransition::emit(StageEmissions::None)
5804 .with_completion(vec![single_outlet(stage)?]))
5805 } else {
5806 Ok(StageTransition::none())
5807 }
5808 }
5809 StageKind::OrElse { primary_inlet } => {
5810 let StageState::OrElse {
5811 primary_emitted,
5812 buffer,
5813 primary_closed,
5814 secondary_closed,
5815 completed,
5816 ..
5817 } = &mut self.stage_states[stage_index]
5818 else {
5819 return Err(StreamError::GraphValidation(
5820 "or-else state is missing".into(),
5821 ));
5822 };
5823 if *completed {
5824 return Ok(StageTransition::none());
5825 }
5826 if inlet == *primary_inlet {
5827 *primary_closed = true;
5828 if *primary_emitted {
5829 *completed = true;
5830 buffer.clear();
5831 Ok(StageTransition::emit(StageEmissions::None)
5832 .with_completion(vec![single_outlet(stage)?]))
5833 } else {
5834 let outlet = single_outlet(stage)?;
5835 let emissions: Vec<_> = buffer.drain(..).map(|v| (outlet, v)).collect();
5836 if *secondary_closed {
5837 *completed = true;
5838 let transition = if emissions.is_empty() {
5839 StageTransition::emit(StageEmissions::None)
5840 } else {
5841 StageTransition::emit(StageEmissions::Many(emissions))
5842 };
5843 Ok(transition.with_completion(vec![outlet]))
5844 } else {
5845 if emissions.is_empty() {
5846 Ok(StageTransition::none())
5847 } else {
5848 Ok(StageTransition::emit(StageEmissions::Many(emissions)))
5849 }
5850 }
5851 }
5852 } else {
5853 *secondary_closed = true;
5854 if *primary_closed && !*primary_emitted {
5855 let outlet = single_outlet(stage)?;
5856 let emissions: Vec<_> = buffer.drain(..).map(|v| (outlet, v)).collect();
5857 *completed = true;
5858 if emissions.is_empty() {
5859 Ok(StageTransition::emit(StageEmissions::None)
5860 .with_completion(vec![outlet]))
5861 } else {
5862 Ok(StageTransition::emit(StageEmissions::Many(emissions))
5863 .with_completion(vec![outlet]))
5864 }
5865 } else {
5866 Ok(StageTransition::none())
5867 }
5868 }
5869 }
5870 StageKind::Zip(_) => {
5871 let StageState::Zip {
5872 left_inlet,
5873 right_inlet,
5874 left,
5875 right,
5876 left_pending_complete,
5877 right_pending_complete,
5878 completed,
5879 } = &mut self.stage_states[stage_index]
5880 else {
5881 return Err(StreamError::GraphValidation("zip state is missing".into()));
5882 };
5883 if *completed {
5884 return Ok(StageTransition::none());
5885 }
5886 let finishes_left = inlet == *left_inlet;
5887 let finishes_right = inlet == *right_inlet;
5888 if (finishes_left && left.is_empty()) || (finishes_right && right.is_empty()) {
5889 *completed = true;
5890 Ok(StageTransition::emit(StageEmissions::None)
5891 .with_completion(vec![single_outlet(stage)?]))
5892 } else {
5893 if finishes_left {
5894 *left_pending_complete = true;
5895 }
5896 if finishes_right {
5897 *right_pending_complete = true;
5898 }
5899 Ok(StageTransition::none())
5900 }
5901 }
5902 StageKind::Unzip { .. } => {
5903 let (fan_in, zip_fast) = match &self.stage_states[stage_index] {
5904 StageState::Unzip {
5905 fast_path,
5906 zip_fast_path,
5907 ..
5908 } => (*fast_path, *zip_fast_path),
5909 _ => (None, None),
5910 };
5911 if let Some(zip_fast) = zip_fast {
5912 let StageState::Unzip {
5913 upstream_closed, ..
5914 } = &mut self.stage_states[stage_index]
5915 else {
5916 return Err(StreamError::GraphValidation(
5917 "unzip state is missing".into(),
5918 ));
5919 };
5920 *upstream_closed = true;
5921 let zip_stage = &self.graph.stages[zip_fast.zip_stage_index];
5922 return Ok(StageTransition::emit(StageEmissions::None)
5923 .with_completion(vec![single_outlet(zip_stage)?]));
5924 }
5925 if let Some(fast_path) = fan_in {
5926 let StageState::Unzip {
5927 upstream_closed, ..
5928 } = &mut self.stage_states[stage_index]
5929 else {
5930 return Err(StreamError::GraphValidation(
5931 "unzip state is missing".into(),
5932 ));
5933 };
5934 *upstream_closed = true;
5935 let target_stage = &self.graph.stages[fast_path.fan_in_stage_index];
5936 let target_inlets = [
5940 target_stage.spec.inlets[fast_path.target_inlet_indices[0]].id(),
5941 target_stage.spec.inlets[fast_path.target_inlet_indices[1]].id(),
5942 ];
5943 let mut combined = StageTransition::none();
5944 for target_inlet in target_inlets {
5945 let t =
5946 self.process_completion(fast_path.fan_in_stage_index, target_inlet)?;
5947 combined.emissions = merge_emissions(combined.emissions, t.emissions);
5948 combined.completed_outlets.extend(t.completed_outlets);
5949 combined.cancelled_inlets.extend(t.cancelled_inlets);
5950 }
5951 Ok(combined)
5952 } else {
5953 let StageState::Unzip {
5954 upstream_closed, ..
5955 } = &mut self.stage_states[stage_index]
5956 else {
5957 return Err(StreamError::GraphValidation(
5958 "unzip state is missing".into(),
5959 ));
5960 };
5961 *upstream_closed = true;
5962 Ok(StageTransition::emit(StageEmissions::None)
5963 .with_completion(stage.spec.outlets.iter().map(AnyOutlet::id).collect()))
5964 }
5965 }
5966 StageKind::MergeSorted(compare) => {
5967 let result = {
5968 let StageState::MergeSorted {
5969 left,
5970 right,
5971 left_closed,
5972 right_closed,
5973 pending,
5974 completed,
5975 } = &mut self.stage_states[stage_index]
5976 else {
5977 return Err(StreamError::GraphValidation(
5978 "merge-sorted state is missing".into(),
5979 ));
5980 };
5981 if *completed {
5982 return Ok(StageTransition::none());
5983 }
5984 let is_left = stage.spec.inlets.first().is_some_and(|i| i.id() == inlet);
5985 if is_left {
5986 *left_closed = true;
5987 } else {
5988 *right_closed = true;
5989 }
5990
5991 loop {
5992 let next = match (left.front(), right.front()) {
5993 (Some(l), Some(r)) => {
5994 if compare(l, r) != std::cmp::Ordering::Greater {
5995 left.pop_front()
5996 } else {
5997 right.pop_front()
5998 }
5999 }
6000 (Some(_), None) if *right_closed => left.pop_front(),
6001 (None, Some(_)) if *left_closed => right.pop_front(),
6002 _ => break,
6003 };
6004 if let Some(val) = next {
6005 pending.push_back(val);
6006 } else {
6007 break;
6008 }
6009 }
6010
6011 if pending.is_empty() {
6012 let all_done =
6013 *left_closed && *right_closed && left.is_empty() && right.is_empty();
6014 if all_done {
6015 *completed = true;
6016 StageTransition::emit(StageEmissions::None)
6017 .with_completion(vec![single_outlet(stage)?])
6018 } else {
6019 StageTransition::none()
6020 }
6021 } else {
6022 let outlet = single_outlet(stage)?;
6023 let emissions: Vec<_> = pending.drain(..).map(|v| (outlet, v)).collect();
6024 let all_done =
6025 *left_closed && *right_closed && left.is_empty() && right.is_empty();
6026 if all_done {
6027 *completed = true;
6028 StageTransition::emit(StageEmissions::Many(emissions))
6029 .with_completion(vec![outlet])
6030 } else {
6031 StageTransition::emit(StageEmissions::Many(emissions))
6032 }
6033 }
6034 };
6035 Ok(result)
6036 }
6037 StageKind::MergeSequence { input_count, .. } => {
6038 let result = {
6039 let StageState::MergeSequence {
6040 next_sequence,
6041 pending,
6042 completed_count,
6043 output_buffer,
6044 completed,
6045 } = &mut self.stage_states[stage_index]
6046 else {
6047 return Err(StreamError::GraphValidation(
6048 "merge-sequence state is missing".into(),
6049 ));
6050 };
6051 if *completed {
6052 return Ok(StageTransition::none());
6053 }
6054 *completed_count += 1;
6055 if *completed_count >= *input_count && output_buffer.is_empty() {
6056 if !pending.is_empty() {
6057 return Err(StreamError::Failed(format!(
6061 "expected sequence {next_sequence}, but all input ports have pushed or are complete",
6062 )));
6063 }
6064 *completed = true;
6065 StageTransition::emit(StageEmissions::None)
6066 .with_completion(vec![single_outlet(stage)?])
6067 } else {
6068 StageTransition::none()
6069 }
6070 };
6071 Ok(result)
6072 }
6073 StageKind::MergeLatest {
6074 input_count,
6075 eager_complete,
6076 ..
6077 } => {
6078 let result = {
6079 let StageState::MergeLatest {
6080 completed_count,
6081 pending,
6082 completed,
6083 ..
6084 } = &mut self.stage_states[stage_index]
6085 else {
6086 return Err(StreamError::GraphValidation(
6087 "merge-latest state is missing".into(),
6088 ));
6089 };
6090 if *completed {
6091 return Ok(StageTransition::none());
6092 }
6093 *completed_count += 1;
6094 let all_done = *completed_count >= *input_count;
6099 let eager_done = *eager_complete && pending.is_empty();
6100 if all_done || eager_done {
6101 *completed = true;
6102 StageTransition::emit(StageEmissions::None)
6103 .with_completion(vec![single_outlet(stage)?])
6104 } else {
6105 StageTransition::none()
6106 }
6107 };
6108 Ok(result)
6109 }
6110 StageKind::Partition { .. } => {
6111 let fast_path = match &self.stage_states[stage_index] {
6112 StageState::Partition { fast_path, .. } => *fast_path,
6113 _ => None,
6114 };
6115 let result = {
6116 let StageState::Partition {
6117 pending,
6118 upstream_closed,
6119 completed,
6120 ..
6121 } = &mut self.stage_states[stage_index]
6122 else {
6123 return Err(StreamError::GraphValidation(
6124 "partition state is missing".into(),
6125 ));
6126 };
6127 if *completed {
6128 return Ok(StageTransition::none());
6129 }
6130 *upstream_closed = true;
6131 if pending.is_none() {
6132 *completed = true;
6133 if let Some(fast_path) = fast_path {
6134 let merge_stage = &self.graph.stages[fast_path.merge_stage_index];
6135 StageTransition::emit(StageEmissions::None)
6136 .with_completion(vec![single_outlet(merge_stage)?])
6137 } else {
6138 StageTransition::emit(StageEmissions::None).with_completion(
6139 stage.spec.outlets.iter().map(AnyOutlet::id).collect(),
6140 )
6141 }
6142 } else {
6143 StageTransition::none()
6144 }
6145 };
6146 Ok(result)
6147 }
6148 }
6149 }
6150
6151 fn process_pull(
6152 &mut self,
6153 stage_index: usize,
6154 outlet: PortId,
6155 ) -> StreamResult<StageTransition> {
6156 let stage = &self.graph.stages[stage_index];
6157 match &stage.spec.kind {
6158 StageKind::Opaque => {
6159 if let Some(logic) = self
6160 .opaque_logics
6161 .get_mut(stage_index)
6162 .and_then(|l| l.as_mut())
6163 {
6164 logic.drain_async_callbacks();
6165 logic.set_demand_by_id(outlet)?;
6166 let outlet_ref = stage
6167 .spec
6168 .outlets
6169 .iter()
6170 .find(|o| o.id() == outlet)
6171 .cloned();
6172 if let Some(outlet_ref) = outlet_ref {
6173 let mut handler = logic.take_out_handler(outlet);
6174 let result = if let Some(ref mut handler) = handler {
6175 handler.on_pull(logic, outlet_ref)
6176 } else {
6177 Ok(())
6178 };
6179 if let Some(handler) = handler
6180 && handler.keep_handler()
6181 && logic.get_out_handler_mut(outlet).is_none()
6182 {
6183 logic.restore_out_handler(outlet, handler);
6184 }
6185 result?;
6186 }
6187 self.collect_opaque_emissions(stage, stage_index)
6188 } else {
6189 Ok(StageTransition::none())
6190 }
6191 }
6192 StageKind::Unzip { .. } => {
6193 let StageState::Unzip {
6194 demand, cancelled, ..
6195 } = &mut self.stage_states[stage_index]
6196 else {
6197 return Ok(StageTransition::none());
6198 };
6199 let Some(idx) = stage.spec.outlets.iter().position(|o| o.id() == outlet) else {
6200 return Ok(StageTransition::none());
6201 };
6202 if idx < 2 && !cancelled[idx] {
6203 demand[idx] = true;
6204 }
6205 Ok(StageTransition::none())
6206 }
6207 StageKind::Partition { .. } => {
6208 let result = {
6209 let StageState::Partition {
6210 pending,
6211 upstream_closed,
6212 demand,
6213 cancelled,
6214 completed,
6215 ..
6216 } = &mut self.stage_states[stage_index]
6217 else {
6218 return Ok(StageTransition::none());
6219 };
6220 if *completed {
6221 return Ok(StageTransition::none());
6222 }
6223 let Some(idx) = stage.spec.outlets.iter().position(|o| o.id() == outlet) else {
6224 return Ok(StageTransition::none());
6225 };
6226 if cancelled[idx] {
6227 return Ok(StageTransition::none());
6228 }
6229
6230 if let Some((p_idx, p_val)) = pending.take() {
6231 if p_idx == idx {
6232 let out = stage.spec.outlets[idx].id();
6233 if *upstream_closed {
6234 *completed = true;
6235 StageTransition::emit(StageEmissions::One(out, p_val))
6236 .with_completion(
6237 stage.spec.outlets.iter().map(AnyOutlet::id).collect(),
6238 )
6239 } else {
6240 StageTransition::emit(StageEmissions::One(out, p_val))
6241 }
6242 } else {
6243 *pending = Some((p_idx, p_val));
6244 demand[idx] = true;
6245 StageTransition::none()
6246 }
6247 } else {
6248 demand[idx] = true;
6249 StageTransition::none()
6250 }
6251 };
6252 Ok(result)
6253 }
6254 _ => Ok(StageTransition::none()),
6255 }
6256 }
6257
6258 fn process_downstream_finish(
6259 &mut self,
6260 stage_index: usize,
6261 outlet: PortId,
6262 ) -> StreamResult<StageTransition> {
6263 let stage = &self.graph.stages[stage_index];
6264 match &stage.spec.kind {
6265 StageKind::Broadcast => {
6266 let StageState::Broadcast {
6267 cancelled_outlets,
6268 live_outlets,
6269 ..
6270 } = &mut self.stage_states[stage_index]
6271 else {
6272 return Err(StreamError::GraphValidation(
6273 "broadcast state is missing".into(),
6274 ));
6275 };
6276 let index = stage
6277 .spec
6278 .outlets
6279 .iter()
6280 .position(|candidate| candidate.id() == outlet)
6281 .ok_or_else(|| {
6282 StreamError::GraphValidation(format!(
6283 "broadcast outlet {} is not part of the stage",
6284 outlet.as_usize()
6285 ))
6286 })?;
6287 if cancelled_outlets[index] {
6288 return Ok(StageTransition::none());
6289 }
6290 cancelled_outlets[index] = true;
6291 *live_outlets -= 1;
6292 if *live_outlets == 0 {
6293 Ok(StageTransition::none()
6294 .with_cancellations(stage.spec.inlets.iter().map(AnyInlet::id).collect()))
6295 } else {
6296 Ok(StageTransition::none())
6297 }
6298 }
6299 StageKind::Balance => {
6300 let StageState::Balance {
6301 cancelled_outlets,
6302 live_outlets,
6303 ..
6304 } = &mut self.stage_states[stage_index]
6305 else {
6306 return Err(StreamError::GraphValidation(
6307 "balance state is missing".into(),
6308 ));
6309 };
6310 let index = stage
6311 .spec
6312 .outlets
6313 .iter()
6314 .position(|candidate| candidate.id() == outlet)
6315 .ok_or_else(|| {
6316 StreamError::GraphValidation(format!(
6317 "balance outlet {} is not part of the stage",
6318 outlet.as_usize()
6319 ))
6320 })?;
6321 if cancelled_outlets[index] {
6322 return Ok(StageTransition::none());
6323 }
6324 cancelled_outlets[index] = true;
6325 *live_outlets -= 1;
6326 if *live_outlets == 0 {
6327 Ok(StageTransition::none()
6328 .with_cancellations(stage.spec.inlets.iter().map(AnyInlet::id).collect()))
6329 } else {
6330 Ok(StageTransition::none())
6331 }
6332 }
6333 StageKind::Unzip { .. } => {
6334 let StageState::Unzip { cancelled, .. } = &mut self.stage_states[stage_index]
6335 else {
6336 return Err(StreamError::GraphValidation(
6337 "unzip state is missing".into(),
6338 ));
6339 };
6340 let idx = stage
6341 .spec
6342 .outlets
6343 .iter()
6344 .position(|o| o.id() == outlet)
6345 .unwrap_or(0);
6346 if idx < 2 && !cancelled[idx] {
6347 cancelled[idx] = true;
6348 let all_cancelled = cancelled.iter().all(|c| *c);
6349 if all_cancelled {
6350 Ok(StageTransition::none().with_cancellations(
6351 stage.spec.inlets.iter().map(AnyInlet::id).collect(),
6352 ))
6353 } else {
6354 Ok(StageTransition::none())
6355 }
6356 } else {
6357 Ok(StageTransition::none())
6358 }
6359 }
6360 StageKind::MergeSorted(_)
6361 | StageKind::MergeSequence { .. }
6362 | StageKind::MergeLatest { .. } => Ok(StageTransition::none()
6363 .with_cancellations(stage.spec.inlets.iter().map(AnyInlet::id).collect())),
6364 StageKind::Partition { eager_cancel, .. } => {
6365 let result = {
6366 let StageState::Partition {
6367 pending,
6368 cancelled,
6369 completed,
6370 ..
6371 } = &mut self.stage_states[stage_index]
6372 else {
6373 return Err(StreamError::GraphValidation(
6374 "partition state is missing".into(),
6375 ));
6376 };
6377 if *completed {
6378 return Ok(StageTransition::none());
6379 }
6380 let Some(idx) = stage.spec.outlets.iter().position(|o| o.id() == outlet) else {
6381 return Ok(StageTransition::none());
6382 };
6383 if cancelled[idx] {
6384 return Ok(StageTransition::none());
6385 }
6386 cancelled[idx] = true;
6387 if let Some((p_idx, _)) = pending
6389 && *p_idx == idx
6390 {
6391 *pending = None;
6392 }
6393 let all_cancelled = cancelled.iter().all(|c| *c);
6394 if all_cancelled || *eager_cancel {
6395 *completed = true;
6396 StageTransition::none().with_cancellations(
6397 stage.spec.inlets.iter().map(AnyInlet::id).collect(),
6398 )
6399 } else {
6400 StageTransition::none()
6401 }
6402 };
6403 Ok(result)
6404 }
6405 StageKind::Opaque => {
6406 let no_cancelled_outlets = self.cancelled_outlets.is_empty();
6407 if let Some(logic) = self
6408 .opaque_logics
6409 .get_mut(stage_index)
6410 .and_then(|l| l.as_mut())
6411 {
6412 logic.drain_async_callbacks();
6413 logic.downstream_finish_by_id(outlet, "downstream_finish")?;
6414 let outlet_ref = stage
6415 .spec
6416 .outlets
6417 .iter()
6418 .find(|o| o.id() == outlet)
6419 .cloned();
6420 if let Some(outlet_ref) = outlet_ref {
6421 let mut handler = logic.take_out_handler(outlet);
6422 let result = if let Some(ref mut handler) = handler {
6423 handler.on_downstream_finish(logic, outlet_ref)
6424 } else {
6425 Ok(())
6426 };
6427 if let Some(handler) = handler
6428 && handler.keep_handler()
6429 && logic.get_out_handler_mut(outlet).is_none()
6430 {
6431 logic.restore_out_handler(outlet, handler);
6432 }
6433 result?;
6434 }
6435 let all_outlets_closed = stage.spec.outlets.iter().all(|candidate| {
6436 logic.is_closed_by_id(candidate.id())
6437 || (!no_cancelled_outlets
6438 && self.cancelled_outlets.contains(&candidate.id()))
6439 });
6440 let mut transition = self.collect_opaque_emissions(stage, stage_index)?;
6441 if all_outlets_closed {
6442 transition.cancelled_inlets =
6443 stage.spec.inlets.iter().map(AnyInlet::id).collect();
6444 }
6445 Ok(transition)
6446 } else {
6447 Ok(StageTransition::none()
6448 .with_cancellations(stage.spec.inlets.iter().map(AnyInlet::id).collect()))
6449 }
6450 }
6451 _ => Ok(StageTransition::none()
6452 .with_cancellations(stage.spec.inlets.iter().map(AnyInlet::id).collect())),
6453 }
6454 }
6455
6456 fn collect_opaque_emissions(
6457 &mut self,
6458 stage: &StageRecord,
6459 stage_index: usize,
6460 ) -> StreamResult<StageTransition> {
6461 if let Some(logic) = self
6462 .opaque_logics
6463 .get_mut(stage_index)
6464 .and_then(|l| l.as_mut())
6465 {
6466 let emissions_slots = std::mem::take(&mut logic.pending_emissions);
6467 let completions = std::mem::take(&mut logic.pending_completions);
6468 let has_stage_failed = logic.stage_error().is_some();
6469
6470 let emissions = if emissions_slots.is_empty() {
6471 StageEmissions::None
6472 } else if emissions_slots.len() == 1 {
6473 let (port, val) = emissions_slots.into_iter().next().unwrap();
6474 StageEmissions::One(port, val)
6475 } else {
6476 StageEmissions::Many(emissions_slots)
6477 };
6478
6479 if has_stage_failed {
6480 let _ = has_stage_failed;
6481 }
6482
6483 Ok(StageTransition {
6484 emissions,
6485 completed_outlets: completions,
6486 cancelled_inlets: Vec::new(),
6487 })
6488 } else {
6489 Ok(StageTransition::emit(StageEmissions::None)
6490 .with_completion(vec![single_outlet(stage)?]))
6491 }
6492 }
6493
6494 fn bump_event(&mut self) -> StreamResult<()> {
6495 bump_fused_event(&mut self.events, self.config)
6496 }
6497
6498 fn broadcast_zip_emissions(
6499 &mut self,
6500 fast_path: BroadcastZipFastPath,
6501 value: DatumValue,
6502 ) -> StreamResult<StageEmissions> {
6503 for _ in 0..4 {
6505 self.bump_event()?;
6506 }
6507
6508 let zip_stage = &self.graph.stages[fast_path.zip_stage_index];
6509 let StageKind::Zip(zip) = &zip_stage.spec.kind else {
6510 return Err(StreamError::GraphValidation(
6511 "broadcast zip fast path references a non-zip stage".into(),
6512 ));
6513 };
6514
6515 let cloned = value.clone_box();
6516 let (left, right) = if fast_path.left_uses_clone {
6517 (cloned, value)
6518 } else {
6519 (value, cloned)
6520 };
6521
6522 Ok(StageEmissions::One(
6523 single_outlet(zip_stage)?,
6524 zip(left, right)?,
6525 ))
6526 }
6527
6528 fn balance_merge_emissions(
6529 &mut self,
6530 fast_path: BalanceMergeFastPath,
6531 value: DatumValue,
6532 ) -> StreamResult<StageEmissions> {
6533 for _ in 0..4 {
6535 self.bump_event()?;
6536 }
6537
6538 let merge_stage = &self.graph.stages[fast_path.merge_stage_index];
6539 let StageKind::Merge = merge_stage.spec.kind else {
6540 return Err(StreamError::GraphValidation(
6541 "balance merge fast path references a non-merge stage".into(),
6542 ));
6543 };
6544
6545 Ok(StageEmissions::One(single_outlet(merge_stage)?, value))
6546 }
6547
6548 fn prime_connected_demands(&mut self) {
6549 for (stage_index, stage) in self.graph.stages.iter().enumerate() {
6550 match &stage.spec.kind {
6551 StageKind::Opaque => {
6552 let Some(logic) = self
6553 .opaque_logics
6554 .get_mut(stage_index)
6555 .and_then(|logic| logic.as_mut())
6556 else {
6557 continue;
6558 };
6559 for outlet in &stage.spec.outlets {
6560 if self.edge_by_outlet.contains_key(&outlet.id()) {
6561 let _ = logic.set_demand_by_id(outlet.id());
6562 }
6563 }
6564 }
6565 StageKind::Unzip { .. } => {
6566 let StageState::Unzip { demand, .. } = &mut self.stage_states[stage_index]
6567 else {
6568 continue;
6569 };
6570 for (idx, outlet) in stage.spec.outlets.iter().enumerate() {
6571 if self.edge_by_outlet.contains_key(&outlet.id()) {
6572 demand[idx] = true;
6573 }
6574 }
6575 }
6576 StageKind::Partition { .. } => {
6577 let StageState::Partition {
6578 demand,
6579 output_count,
6580 ..
6581 } = &mut self.stage_states[stage_index]
6582 else {
6583 continue;
6584 };
6585 for (idx, demand_slot) in demand.iter_mut().enumerate().take(*output_count) {
6586 if idx < stage.spec.outlets.len()
6587 && self
6588 .edge_by_outlet
6589 .contains_key(&stage.spec.outlets[idx].id())
6590 {
6591 *demand_slot = true;
6592 }
6593 }
6594 }
6595 _ => {}
6596 }
6597 }
6598 }
6599}
6600
6601impl<Left, Right> GraphBlueprint<ZipShape<Left, Right>>
6602where
6603 Left: Clone + Send + 'static,
6604 Right: Clone + Send + 'static,
6605{
6606 pub fn run_zip(&self, left: Vec<Left>, right: Vec<Right>) -> StreamResult<Vec<(Left, Right)>> {
6607 Ok(self
6608 .run_zip_report(left, right, FusedExecutionConfig::default())?
6609 .output)
6610 }
6611
6612 pub fn run_zip_report(
6613 &self,
6614 left: Vec<Left>,
6615 right: Vec<Right>,
6616 config: FusedExecutionConfig,
6617 ) -> StreamResult<FusedExecutionReport<(Left, Right)>> {
6618 let mut left = left.into_iter();
6619 let mut right = right.into_iter();
6620 let left_inlet = self.shape.in0().id();
6621 let right_inlet = self.shape.in1().id();
6622 let outlet = self.shape.outlet().id();
6623 let mut executor = FusedExecutor::new(self, config);
6624 let mut output = Vec::with_capacity(left.len().min(right.len()));
6625 let mut left_completed = false;
6626 let mut right_completed = false;
6627
6628 {
6629 let mut output_sink = VecOutputSink {
6630 output: &mut output,
6631 };
6632 if left.len() == 0 {
6633 executor.complete(left_inlet, outlet, &mut output_sink)?;
6634 left_completed = true;
6635 }
6636 if right.len() == 0 {
6637 executor.complete(right_inlet, outlet, &mut output_sink)?;
6638 right_completed = true;
6639 }
6640
6641 while left.len() > 0 || right.len() > 0 {
6642 if let Some(item) = left.next() {
6643 executor.deliver(left_inlet, datum(item), outlet, &mut output_sink)?;
6644 if left.len() == 0 && !left_completed {
6645 executor.complete(left_inlet, outlet, &mut output_sink)?;
6646 left_completed = true;
6647 }
6648 }
6649 if let Some(item) = right.next() {
6650 executor.deliver(right_inlet, datum(item), outlet, &mut output_sink)?;
6651 if right.len() == 0 && !right_completed {
6652 executor.complete(right_inlet, outlet, &mut output_sink)?;
6653 right_completed = true;
6654 }
6655 }
6656 }
6657 }
6658
6659 Ok(FusedExecutionReport {
6660 output,
6661 events: executor.events,
6662 async_boundary_crossings: executor.async_boundary_crossings,
6663 })
6664 }
6665}
6666
6667fn broadcast_zip_fast_path<S: Shape>(
6668 stage: &StageRecord,
6669 graph: &GraphBlueprint<S>,
6670 edge_by_outlet: &HashMap<PortId, PortId>,
6671 stage_by_inlet: &HashMap<PortId, usize>,
6672) -> Option<BroadcastZipFastPath> {
6673 let [first_outlet, second_outlet] = stage.spec.outlets.as_slice() else {
6674 return None;
6675 };
6676
6677 let first_inlet = edge_by_outlet.get(&first_outlet.id()).copied()?;
6678 let second_inlet = edge_by_outlet.get(&second_outlet.id()).copied()?;
6679 let first_stage = stage_by_inlet.get(&first_inlet).copied()?;
6680 let second_stage = stage_by_inlet.get(&second_inlet).copied()?;
6681 if first_stage != second_stage {
6682 return None;
6683 }
6684
6685 let zip_stage = &graph.stages[first_stage];
6686 if !matches!(zip_stage.spec.kind, StageKind::Zip(_)) {
6687 return None;
6688 }
6689 let [left_inlet, right_inlet] = zip_stage.spec.inlets.as_slice() else {
6690 return None;
6691 };
6692
6693 if first_inlet == left_inlet.id() && second_inlet == right_inlet.id() {
6694 Some(BroadcastZipFastPath {
6695 zip_stage_index: first_stage,
6696 left_uses_clone: true,
6697 })
6698 } else if first_inlet == right_inlet.id() && second_inlet == left_inlet.id() {
6699 Some(BroadcastZipFastPath {
6700 zip_stage_index: first_stage,
6701 left_uses_clone: false,
6702 })
6703 } else {
6704 None
6705 }
6706}
6707
6708fn balance_merge_fast_path<S: Shape>(
6709 stage: &StageRecord,
6710 graph: &GraphBlueprint<S>,
6711 edge_by_outlet: &HashMap<PortId, PortId>,
6712 stage_by_inlet: &HashMap<PortId, usize>,
6713) -> Option<BalanceMergeFastPath> {
6714 let outlets = &stage.spec.outlets;
6715 let first_inlet = edge_by_outlet.get(&outlets.first()?.id()).copied()?;
6716 let merge_stage_index = *stage_by_inlet.get(&first_inlet)?;
6717 let merge_stage = graph.stages.get(merge_stage_index)?;
6718 if !matches!(merge_stage.spec.kind, StageKind::Merge) {
6719 return None;
6720 }
6721 if merge_stage.spec.inlets.len() != outlets.len() {
6722 return None;
6723 }
6724 for (outlet, merge_inlet) in outlets.iter().zip(merge_stage.spec.inlets.iter()) {
6725 if edge_by_outlet.get(&outlet.id()).copied() != Some(merge_inlet.id()) {
6726 return None;
6727 }
6728 }
6729 Some(BalanceMergeFastPath { merge_stage_index })
6730}
6731
6732fn unzip_fan_in_fast_path<S: Shape>(
6733 stage: &StageRecord,
6734 graph: &GraphBlueprint<S>,
6735 edge_by_outlet: &HashMap<PortId, PortId>,
6736 stage_by_inlet: &HashMap<PortId, usize>,
6737) -> Option<UnzipFanInFastPath> {
6738 let outlets = &stage.spec.outlets;
6739 if outlets.len() != 2 {
6740 return None;
6741 }
6742 let inlet0 = edge_by_outlet.get(&outlets[0].id()).copied()?;
6743 let inlet1 = edge_by_outlet.get(&outlets[1].id()).copied()?;
6744 let stage0 = *stage_by_inlet.get(&inlet0)?;
6745 let stage1 = *stage_by_inlet.get(&inlet1)?;
6746 if stage0 != stage1 {
6747 return None;
6748 }
6749 let target = graph.stages.get(stage0)?;
6750 if !matches!(
6751 target.spec.kind,
6752 StageKind::MergeSorted(_) | StageKind::MergeSequence { .. } | StageKind::MergeLatest { .. }
6753 ) {
6754 return None;
6755 }
6756 let idx0 = target.spec.inlets.iter().position(|i| i.id() == inlet0)?;
6759 let idx1 = target.spec.inlets.iter().position(|i| i.id() == inlet1)?;
6760 Some(UnzipFanInFastPath {
6761 fan_in_stage_index: stage0,
6762 target_inlet_indices: [idx0, idx1],
6763 })
6764}
6765
6766fn unzip_zip_fast_path<S: Shape>(
6767 stage: &StageRecord,
6768 graph: &GraphBlueprint<S>,
6769 edge_by_outlet: &HashMap<PortId, PortId>,
6770 stage_by_inlet: &HashMap<PortId, usize>,
6771) -> Option<UnzipZipFastPath> {
6772 let outlets = &stage.spec.outlets;
6773 if outlets.len() != 2 {
6774 return None;
6775 }
6776 let inlet0 = edge_by_outlet.get(&outlets[0].id()).copied()?;
6777 let inlet1 = edge_by_outlet.get(&outlets[1].id()).copied()?;
6778 let stage0 = *stage_by_inlet.get(&inlet0)?;
6779 let stage1 = *stage_by_inlet.get(&inlet1)?;
6780 if stage0 != stage1 {
6781 return None;
6782 }
6783 let target = graph.stages.get(stage0)?;
6784 if !matches!(target.spec.kind, StageKind::Zip(_)) {
6785 return None;
6786 }
6787 Some(UnzipZipFastPath {
6788 zip_stage_index: stage0,
6789 })
6790}
6791
6792fn partition_merge_fast_path<S: Shape>(
6793 stage: &StageRecord,
6794 graph: &GraphBlueprint<S>,
6795 edge_by_outlet: &HashMap<PortId, PortId>,
6796 stage_by_inlet: &HashMap<PortId, usize>,
6797) -> Option<PartitionMergeFastPath> {
6798 let outlets = &stage.spec.outlets;
6799 let first_inlet = edge_by_outlet.get(&outlets.first()?.id()).copied()?;
6800 let merge_stage_index = *stage_by_inlet.get(&first_inlet)?;
6801 let merge_stage = graph.stages.get(merge_stage_index)?;
6802 if !matches!(merge_stage.spec.kind, StageKind::Merge) {
6803 return None;
6804 }
6805 if merge_stage.spec.inlets.len() != outlets.len() {
6806 return None;
6807 }
6808 for (outlet, merge_inlet) in outlets.iter().zip(merge_stage.spec.inlets.iter()) {
6809 if edge_by_outlet.get(&outlet.id()).copied() != Some(merge_inlet.id()) {
6810 return None;
6811 }
6812 }
6813 Some(PartitionMergeFastPath { merge_stage_index })
6814}
6815
6816pub(crate) fn bump_fused_event(
6822 events: &mut usize,
6823 config: FusedExecutionConfig,
6824) -> StreamResult<()> {
6825 *events += 1;
6826 if *events > config.event_limit {
6827 return Err(StreamError::EventLimitExceeded {
6828 limit: config.event_limit,
6829 });
6830 }
6831 Ok(())
6832}
6833
6834fn broadcast_emissions(outlets: &[AnyOutlet], value: DatumValue) -> StreamResult<StageEmissions> {
6835 match outlets {
6836 [] => Err(StreamError::GraphValidation(
6837 "broadcast has no outlets".into(),
6838 )),
6839 [outlet] => Ok(StageEmissions::One(outlet.id(), value)),
6840 [first, second] => Ok(StageEmissions::Two(
6841 (first.id(), value.clone_box()),
6842 (second.id(), value),
6843 )),
6844 outlets => {
6845 let mut emitted = Vec::with_capacity(outlets.len());
6846 for outlet in &outlets[..outlets.len() - 1] {
6847 emitted.push((outlet.id(), value.clone_box()));
6848 }
6849 emitted.push((outlets[outlets.len() - 1].id(), value));
6850 Ok(StageEmissions::Many(emitted))
6851 }
6852 }
6853}
6854
6855fn single_outlet(stage: &StageRecord) -> StreamResult<PortId> {
6856 stage
6857 .spec
6858 .outlets
6859 .first()
6860 .map(AnyOutlet::id)
6861 .ok_or_else(|| {
6862 StreamError::GraphValidation(format!("stage {} has no outlet", stage.spec.name()))
6863 })
6864}
6865
6866fn merge_emissions(first: StageEmissions, second: StageEmissions) -> StageEmissions {
6867 match (first, second) {
6868 (StageEmissions::None, other) | (other, StageEmissions::None) => other,
6869 (StageEmissions::One(p1, v1), StageEmissions::One(p2, v2)) => {
6870 StageEmissions::Many(vec![(p1, v1), (p2, v2)])
6871 }
6872 (StageEmissions::One(p, v), StageEmissions::Many(mut vec))
6873 | (StageEmissions::Many(mut vec), StageEmissions::One(p, v)) => {
6874 vec.push((p, v));
6875 StageEmissions::Many(vec)
6876 }
6877 (StageEmissions::Many(mut v1), StageEmissions::Many(v2)) => {
6878 v1.extend(v2);
6879 StageEmissions::Many(v1)
6880 }
6881 (a, b) => {
6882 let mut all = Vec::new();
6883 push_emissions(&mut all, a);
6884 push_emissions(&mut all, b);
6885 StageEmissions::Many(all)
6886 }
6887 }
6888}
6889
6890fn push_emissions(out: &mut Vec<(PortId, DatumValue)>, emissions: StageEmissions) {
6891 match emissions {
6892 StageEmissions::None => {}
6893 StageEmissions::One(p, v) => out.push((p, v)),
6894 StageEmissions::Two((p1, v1), (p2, v2)) => {
6895 out.push((p1, v1));
6896 out.push((p2, v2));
6897 }
6898 StageEmissions::Many(vec) => out.extend(vec),
6899 }
6900}