Skip to main content

datum/graph/
executor.rs

1use super::*;
2
// ---------------------------------------------------------------------------
// ExecutorMode — Phase 0 scaffolding (WP-18)
// ---------------------------------------------------------------------------
6
/// Chooses which executor runs a fused graph.
///
/// Internal test/diagnostic hook only: this type is **not** part of the public
/// API and must never be re-exported through `pub use` or any other
/// user-facing surface.
///
/// Modes:
///
/// * `Auto` (the default) — attempt the typed flow plan first and fall back to
///   the erased executor when the graph is not supported by the typed path.
/// * `ErasedOnly` — run the existing erased (`Box<dyn DatumElement>`) executor
///   no matter what the graph looks like.
/// * `TypedOnly` — insist on the typed path; when no typed plan can be built
///   the run fails with `StreamError::GraphValidation("typed executor does not
///   support this graph shape")`.  **Test/diagnostic only.**
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub(crate) enum ExecutorMode {
    /// Typed path first, erased executor as the fallback.
    #[default]
    Auto,
    /// Erased executor unconditionally.
    ErasedOnly,
    /// Typed path or an error; never falls back.  Test/diagnostic only.
    TypedOnly,
}
29
30impl<In, Out> GraphBlueprint<FlowShape<In, Out>>
31where
32    In: Clone + Send + 'static,
33    Out: Send + 'static,
34{
35    pub fn run_with_input<I>(&self, input: I) -> StreamResult<Vec<Out>>
36    where
37        I: IntoIterator<Item = In>,
38    {
39        Ok(self
40            .run_with_input_report(input, FusedExecutionConfig::default())?
41            .output)
42    }
43
44    pub fn run_with_input_report<I>(
45        &self,
46        input: I,
47        config: FusedExecutionConfig,
48    ) -> StreamResult<FusedExecutionReport<Out>>
49    where
50        I: IntoIterator<Item = In>,
51    {
52        self.run_with_input_report_mode(input, config, ExecutorMode::Auto)
53    }
54
55    pub(crate) fn run_with_input_report_mode<I>(
56        &self,
57        input: I,
58        config: FusedExecutionConfig,
59        mode: ExecutorMode,
60    ) -> StreamResult<FusedExecutionReport<Out>>
61    where
62        I: IntoIterator<Item = In>,
63    {
64        // --- Typed path (Auto / TypedOnly) ---
65        if mode != ExecutorMode::ErasedOnly {
66            // Phase 2: linear typed flow plan (Identity/Map/AsyncBoundary chains).
67            let linear_plan = try_typed_flow_plan::<In, Out>(
68                &self.stages,
69                &self.edges,
70                self.shape.inlet().id(),
71                self.shape.outlet().id(),
72            );
73            if let Some(plan) = linear_plan {
74                let input = input.into_iter();
75                let mut output = Vec::with_capacity(input.size_hint().0);
76                let mut events = 0usize;
77                let mut async_boundary_crossings = 0usize;
78                for item in input {
79                    let out =
80                        plan.run_item(item, config, &mut events, &mut async_boundary_crossings)?;
81                    output.push(out);
82                }
83                return Ok(FusedExecutionReport {
84                    output,
85                    events,
86                    async_boundary_crossings,
87                });
88            }
89
90            // Phase 3a: typed MergeSequence kernel (Unzip → MergeSequence topology).
91            let ms_plan = try_typed_merge_sequence_plan::<In, Out>(
92                &self.stages,
93                &self.edges,
94                self.shape.inlet().id(),
95                self.shape.outlet().id(),
96            );
97            if let Some(mut plan) = ms_plan {
98                let output = run_typed_merge_sequence(&mut plan, input)?;
99                return Ok(FusedExecutionReport {
100                    output,
101                    events: 0,
102                    async_boundary_crossings: 0,
103                });
104            }
105
106            // Phase 3b: typed MergeLatest kernel (Unzip → MergeLatest topology).
107            //
108            // Build a FRESH TypedMergeLatestPlan per run — no cached mutable state,
109            // no Mutex.  Blueprint reuse and concurrent runs are fully independent.
110            // The per-run cost (topology/type checks, a few Arc clones, one small
111            // MergeLatestCore alloc, and one Box for the output Vec) is negligible
112            // vs a 10k-element run.
113            //
114            // try_build_typed_merge_latest_dispatch checks topology and element
115            // TypeId without consuming input; if it returns Some(runner), we then
116            // consume input for the typed path.  Custom `T` not in the bounded
117            // 17-type set returns None → falls back to the erased executor (Auto)
118            // or errors (TypedOnly).
119            let inlet_id = self.shape.inlet().id();
120            let outlet_id = self.shape.outlet().id();
121            if let Some(runner) = try_build_typed_merge_latest_dispatch::<In, Out>(
122                &self.stages,
123                &self.edges,
124                inlet_id,
125                outlet_id,
126            ) {
127                let mut input_iter = input.into_iter();
128                let output = runner(&mut input_iter)?;
129                return Ok(FusedExecutionReport {
130                    output,
131                    events: 0,
132                    async_boundary_crossings: 0,
133                });
134            }
135
136            if mode == ExecutorMode::TypedOnly {
137                return Err(StreamError::GraphValidation(
138                    "typed executor does not support this graph shape".into(),
139                ));
140            }
141        }
142
143        // --- Erased fallback ---
144        let input = input.into_iter();
145        let mut executor = FusedExecutor::new(self, config);
146        let inlet = self.shape.inlet().id();
147        let outlet = self.shape.outlet().id();
148        let mut output = Vec::with_capacity(input.size_hint().0);
149
150        {
151            let mut output_sink = VecOutputSink {
152                output: &mut output,
153            };
154            executor.request(outlet, outlet, &mut output_sink)?;
155            for item in input {
156                executor.deliver(inlet, datum(item), outlet, &mut output_sink)?;
157            }
158            executor.complete(inlet, outlet, &mut output_sink)?;
159        }
160
161        Ok(FusedExecutionReport {
162            output,
163            events: executor.events,
164            async_boundary_crossings: executor.async_boundary_crossings,
165        })
166    }
167
168    pub fn run_count_with_input<I>(&self, input: I) -> StreamResult<usize>
169    where
170        I: IntoIterator<Item = In>,
171    {
172        Ok(self
173            .run_count_with_input_report(input, FusedExecutionConfig::default())?
174            .result)
175    }
176
177    pub fn run_count_with_input_report<I>(
178        &self,
179        input: I,
180        config: FusedExecutionConfig,
181    ) -> StreamResult<FusedTerminalReport<usize>>
182    where
183        I: IntoIterator<Item = In>,
184    {
185        self.run_count_with_input_report_mode(input, config, ExecutorMode::Auto)
186    }
187
188    pub(crate) fn run_count_with_input_report_mode<I>(
189        &self,
190        input: I,
191        config: FusedExecutionConfig,
192        mode: ExecutorMode,
193    ) -> StreamResult<FusedTerminalReport<usize>>
194    where
195        I: IntoIterator<Item = In>,
196    {
197        // --- Typed path (Auto / TypedOnly) ---
198        if mode != ExecutorMode::ErasedOnly {
199            let plan = try_typed_flow_plan::<In, Out>(
200                &self.stages,
201                &self.edges,
202                self.shape.inlet().id(),
203                self.shape.outlet().id(),
204            );
205            if let Some(plan) = plan {
206                let mut count = 0usize;
207                let mut events = 0usize;
208                let mut async_boundary_crossings = 0usize;
209                for item in input {
210                    plan.run_item_count(item, config, &mut events, &mut async_boundary_crossings)?;
211                    count += 1;
212                }
213                return Ok(FusedTerminalReport {
214                    result: count,
215                    events,
216                    async_boundary_crossings,
217                });
218            } else if mode == ExecutorMode::TypedOnly {
219                return Err(StreamError::GraphValidation(
220                    "typed executor does not support this graph shape".into(),
221                ));
222            }
223        }
224
225        // --- Erased fallback ---
226        let mut executor = FusedExecutor::new(self, config);
227        let inlet = self.shape.inlet().id();
228        let outlet = self.shape.outlet().id();
229        let mut output_sink = CountOutputSink { count: 0 };
230
231        executor.request::<Out>(outlet, outlet, &mut output_sink)?;
232        for item in input {
233            executor.deliver::<Out>(inlet, datum(item), outlet, &mut output_sink)?;
234        }
235        executor.complete::<Out>(inlet, outlet, &mut output_sink)?;
236
237        Ok(FusedTerminalReport {
238            result: output_sink.count,
239            events: executor.events,
240            async_boundary_crossings: executor.async_boundary_crossings,
241        })
242    }
243
244    pub fn run_fold_with_input<I, Acc, F>(&self, input: I, zero: Acc, fold: F) -> StreamResult<Acc>
245    where
246        I: IntoIterator<Item = In>,
247        F: FnMut(Acc, Out) -> Acc,
248    {
249        Ok(self
250            .run_fold_with_input_report(input, zero, fold, FusedExecutionConfig::default())?
251            .result)
252    }
253
254    pub fn run_fold_with_input_report<I, Acc, F>(
255        &self,
256        input: I,
257        zero: Acc,
258        fold: F,
259        config: FusedExecutionConfig,
260    ) -> StreamResult<FusedTerminalReport<Acc>>
261    where
262        I: IntoIterator<Item = In>,
263        F: FnMut(Acc, Out) -> Acc,
264    {
265        self.run_fold_with_input_report_mode(input, zero, fold, config, ExecutorMode::Auto)
266    }
267
268    pub(crate) fn run_fold_with_input_report_mode<I, Acc, F>(
269        &self,
270        input: I,
271        zero: Acc,
272        mut fold: F,
273        config: FusedExecutionConfig,
274        mode: ExecutorMode,
275    ) -> StreamResult<FusedTerminalReport<Acc>>
276    where
277        I: IntoIterator<Item = In>,
278        F: FnMut(Acc, Out) -> Acc,
279    {
280        // --- Typed path (Auto / TypedOnly) ---
281        if mode != ExecutorMode::ErasedOnly {
282            let plan = try_typed_flow_plan::<In, Out>(
283                &self.stages,
284                &self.edges,
285                self.shape.inlet().id(),
286                self.shape.outlet().id(),
287            );
288            if let Some(plan) = plan {
289                let mut accumulator = zero;
290                let mut events = 0usize;
291                let mut async_boundary_crossings = 0usize;
292                for item in input {
293                    let out =
294                        plan.run_item(item, config, &mut events, &mut async_boundary_crossings)?;
295                    accumulator = fold(accumulator, out);
296                }
297                return Ok(FusedTerminalReport {
298                    result: accumulator,
299                    events,
300                    async_boundary_crossings,
301                });
302            } else if mode == ExecutorMode::TypedOnly {
303                return Err(StreamError::GraphValidation(
304                    "typed executor does not support this graph shape".into(),
305                ));
306            }
307        }
308
309        // --- Erased fallback ---
310        let mut executor = FusedExecutor::new(self, config);
311        let inlet = self.shape.inlet().id();
312        let outlet = self.shape.outlet().id();
313        let mut output_sink = FoldOutputSink {
314            accumulator: Some(zero),
315            fold,
316        };
317
318        executor.request(outlet, outlet, &mut output_sink)?;
319        for item in input {
320            executor.deliver(inlet, datum(item), outlet, &mut output_sink)?;
321        }
322        executor.complete(inlet, outlet, &mut output_sink)?;
323
324        Ok(FusedTerminalReport {
325            result: output_sink.finish(),
326            events: executor.events,
327            async_boundary_crossings: executor.async_boundary_crossings,
328        })
329    }
330
    // -- Phase 0/Phase 2 mode-aware entry points (pub(crate) / test-hook) ---
332
333    /// Like `run_with_input` but dispatches based on [`ExecutorMode`].
334    ///
335    /// * `Auto` — tries the typed flow plan; falls back to erased if unsupported.
336    /// * `ErasedOnly` — always uses the erased executor.
337    /// * `TypedOnly` — errors if the typed plan cannot be built.
338    ///
339    /// This method is `pub(crate)` and exercised in `#[cfg(test)]` modules.
340    #[cfg_attr(not(test), allow(dead_code))]
341    pub(crate) fn run_with_input_mode<I>(
342        &self,
343        input: I,
344        mode: ExecutorMode,
345    ) -> StreamResult<Vec<Out>>
346    where
347        I: IntoIterator<Item = In>,
348    {
349        Ok(self
350            .run_with_input_report_mode(input, FusedExecutionConfig::default(), mode)?
351            .output)
352    }
353
354    /// Like `run_count_with_input` but dispatches based on [`ExecutorMode`].
355    ///
356    /// `pub(crate)` test/diagnostic hook.
357    #[allow(dead_code)]
358    pub(crate) fn run_count_with_input_mode<I>(
359        &self,
360        input: I,
361        mode: ExecutorMode,
362    ) -> StreamResult<usize>
363    where
364        I: IntoIterator<Item = In>,
365    {
366        Ok(self
367            .run_count_with_input_report_mode(input, FusedExecutionConfig::default(), mode)?
368            .result)
369    }
370
371    /// Like `run_fold_with_input` but dispatches based on [`ExecutorMode`].
372    ///
373    /// `pub(crate)` test/diagnostic hook.
374    #[allow(dead_code)]
375    pub(crate) fn run_fold_with_input_mode<I, Acc, F>(
376        &self,
377        input: I,
378        zero: Acc,
379        fold: F,
380        mode: ExecutorMode,
381    ) -> StreamResult<Acc>
382    where
383        I: IntoIterator<Item = In>,
384        F: FnMut(Acc, Out) -> Acc,
385    {
386        Ok(self
387            .run_fold_with_input_report_mode(
388                input,
389                zero,
390                fold,
391                FusedExecutionConfig::default(),
392                mode,
393            )?
394            .result)
395    }
396}
397
398impl<T> GraphBlueprint<FlowShape<T, T>>
399where
400    T: Send + 'static,
401{
402    pub fn run_typed_linear_with_input<I>(&self, input: I) -> StreamResult<Vec<T>>
403    where
404        I: IntoIterator<Item = T>,
405    {
406        Ok(self
407            .run_typed_linear_with_input_report(input, FusedExecutionConfig::default())?
408            .output)
409    }
410
411    pub fn run_typed_linear_with_input_report<I>(
412        &self,
413        input: I,
414        config: FusedExecutionConfig,
415    ) -> StreamResult<FusedExecutionReport<T>>
416    where
417        I: IntoIterator<Item = T>,
418    {
419        let input = input.into_iter();
420        let plan = self.typed_linear_plan()?;
421        let mut output = Vec::with_capacity(input.size_hint().0);
422        let mut events = 0;
423        let mut async_boundary_crossings = 0;
424
425        for item in input {
426            let item = plan.run_item(item, config, &mut events, &mut async_boundary_crossings)?;
427            output.push(item);
428        }
429
430        Ok(FusedExecutionReport {
431            output,
432            events,
433            async_boundary_crossings,
434        })
435    }
436
437    pub fn run_typed_linear_count_with_input<I>(&self, input: I) -> StreamResult<usize>
438    where
439        I: IntoIterator<Item = T>,
440    {
441        Ok(self
442            .run_typed_linear_count_with_input_report(input, FusedExecutionConfig::default())?
443            .result)
444    }
445
446    pub fn run_typed_linear_count_with_input_report<I>(
447        &self,
448        input: I,
449        config: FusedExecutionConfig,
450    ) -> StreamResult<FusedTerminalReport<usize>>
451    where
452        I: IntoIterator<Item = T>,
453    {
454        let plan = self.typed_linear_plan()?;
455        let mut count = 0;
456        let mut events = 0;
457        let mut async_boundary_crossings = 0;
458
459        for item in input {
460            let _ = plan.run_item(item, config, &mut events, &mut async_boundary_crossings)?;
461            count += 1;
462        }
463
464        Ok(FusedTerminalReport {
465            result: count,
466            events,
467            async_boundary_crossings,
468        })
469    }
470
471    pub fn run_typed_linear_fold_with_input<I, Acc, F>(
472        &self,
473        input: I,
474        zero: Acc,
475        fold: F,
476    ) -> StreamResult<Acc>
477    where
478        I: IntoIterator<Item = T>,
479        F: FnMut(Acc, T) -> Acc,
480    {
481        Ok(self
482            .run_typed_linear_fold_with_input_report(
483                input,
484                zero,
485                fold,
486                FusedExecutionConfig::default(),
487            )?
488            .result)
489    }
490
491    pub fn run_typed_linear_fold_with_input_report<I, Acc, F>(
492        &self,
493        input: I,
494        zero: Acc,
495        mut fold: F,
496        config: FusedExecutionConfig,
497    ) -> StreamResult<FusedTerminalReport<Acc>>
498    where
499        I: IntoIterator<Item = T>,
500        F: FnMut(Acc, T) -> Acc,
501    {
502        let plan = self.typed_linear_plan()?;
503        let mut accumulator = zero;
504        let mut events = 0;
505        let mut async_boundary_crossings = 0;
506
507        for item in input {
508            let item = plan.run_item(item, config, &mut events, &mut async_boundary_crossings)?;
509            accumulator = fold(accumulator, item);
510        }
511
512        Ok(FusedTerminalReport {
513            result: accumulator,
514            events,
515            async_boundary_crossings,
516        })
517    }
518
519    /// Runs the internal async-boundary count path.
520    ///
521    /// The graph is still runtime-validated through the typed-linear plan, then
522    /// split at `AsyncBoundary` stages and connected with bounded handoff.
523    /// The concrete boundary executor is intentionally private so Datum does
524    /// not expose multiple public runtime backends.
525    pub fn run_async_boundary_count_with_input_report<I>(
526        &self,
527        input: I,
528        config: AsyncBoundaryExecutionConfig,
529    ) -> StreamResult<FusedTerminalReport<usize>>
530    where
531        I: IntoIterator<Item = T> + Send,
532        I::IntoIter: Send + 'static,
533    {
534        let segments = self.typed_linear_async_segments()?;
535        BoundaryCountExecutor::Ractor.run_count(input, segments, config)
536    }
537
538    fn typed_linear_plan(&self) -> StreamResult<TypedLinearPlan<T>> {
539        let graph_inlet = self.shape.inlet().id();
540        let graph_outlet = self.shape.outlet().id();
541        let type_id = TypeId::of::<T>();
542        let mut current_inlet = graph_inlet;
543        let mut seen = HashSet::new();
544        let mut steps = Vec::new();
545
546        loop {
547            let stage_index = self
548                .stages
549                .iter()
550                .position(|stage| {
551                    stage
552                        .spec
553                        .inlets
554                        .iter()
555                        .any(|inlet| inlet.id() == current_inlet)
556                })
557                .ok_or_else(|| {
558                    StreamError::GraphValidation(format!(
559                        "typed linear fast path could not find inlet {}",
560                        current_inlet.as_usize()
561                    ))
562                })?;
563            if !seen.insert(stage_index) {
564                return Err(StreamError::GraphValidation(
565                    "typed linear fast path does not support cycles".into(),
566                ));
567            }
568
569            let stage = &self.stages[stage_index];
570            if stage.spec.inlets.len() != 1 || stage.spec.outlets.len() != 1 {
571                return Err(StreamError::GraphValidation(format!(
572                    "typed linear fast path requires single-inlet single-outlet stages; {} has {} inlet(s) and {} outlet(s)",
573                    stage.spec.name(),
574                    stage.spec.inlets.len(),
575                    stage.spec.outlets.len()
576                )));
577            }
578            let inlet = &stage.spec.inlets[0];
579            let outlet = &stage.spec.outlets[0];
580            if inlet.type_id() != type_id || outlet.type_id() != type_id {
581                return Err(StreamError::GraphValidation(format!(
582                    "typed linear fast path requires every port to use {}",
583                    type_name::<T>()
584                )));
585            }
586
587            let step = match &stage.spec.kind {
588                StageKind::Identity | StageKind::Opaque => TypedLinearStep::Pass,
589                StageKind::AsyncBoundary => TypedLinearStep::AsyncBoundary,
590                StageKind::Map(map) => {
591                    let mapper = map
592                        .typed
593                        .as_ref()
594                        .downcast_ref::<Arc<dyn Fn(T) -> T + Send + Sync>>()
595                        .ok_or_else(|| {
596                            StreamError::GraphValidation(format!(
597                                "typed linear fast path could not downcast map stage {}",
598                                stage.spec.name()
599                            ))
600                        })?;
601                    TypedLinearStep::Map(Arc::clone(mapper))
602                }
603                _ => {
604                    return Err(StreamError::GraphValidation(format!(
605                        "typed linear fast path does not support {}",
606                        stage.spec.name()
607                    )));
608                }
609            };
610            steps.push(step);
611
612            if outlet.id() == graph_outlet {
613                break;
614            }
615            current_inlet = self
616                .edges
617                .iter()
618                .find_map(|edge| (edge.outlet == outlet.id()).then_some(edge.inlet))
619                .ok_or_else(|| {
620                    StreamError::GraphValidation(format!(
621                        "typed linear fast path could not follow outlet {}",
622                        outlet.id().as_usize()
623                    ))
624                })?;
625        }
626
627        if seen.len() != self.stages.len() {
628            return Err(StreamError::GraphValidation(
629                "typed linear fast path requires all stages to be on the result path".into(),
630            ));
631        }
632
633        Ok(TypedLinearPlan { steps })
634    }
635
636    pub(super) fn typed_linear_async_segments(&self) -> StreamResult<TypedLinearSegments<T>> {
637        let plan = self.typed_linear_plan()?;
638        let mut segments = Vec::new();
639        let mut current = Vec::new();
640
641        for step in plan.steps {
642            match step {
643                TypedLinearStep::AsyncBoundary => {
644                    segments.push(current);
645                    current = Vec::new();
646                }
647                step => current.push(step),
648            }
649        }
650        segments.push(current);
651
652        if segments.len() == 1 {
653            return Err(StreamError::GraphValidation(
654                "async boundary execution requires at least one AsyncBoundary stage".into(),
655            ));
656        }
657
658        Ok(TypedLinearSegments { segments })
659    }
660}
661
662pub(super) struct TypedLinearPlan<T> {
663    steps: Vec<TypedLinearStep<T>>,
664}
665
666pub(super) struct TypedLinearSegments<T> {
667    segments: Vec<Vec<TypedLinearStep<T>>>,
668}
669
670pub(super) enum TypedLinearStep<T> {
671    Pass,
672    Map(Arc<dyn Fn(T) -> T + Send + Sync>),
673    AsyncBoundary,
674}
675
676impl<T> Clone for TypedLinearStep<T> {
677    fn clone(&self) -> Self {
678        match self {
679            Self::Pass => Self::Pass,
680            Self::Map(mapper) => Self::Map(Arc::clone(mapper)),
681            Self::AsyncBoundary => Self::AsyncBoundary,
682        }
683    }
684}
685
686impl<T> TypedLinearPlan<T> {
687    fn run_item(
688        &self,
689        mut item: T,
690        config: FusedExecutionConfig,
691        events: &mut usize,
692        async_boundary_crossings: &mut usize,
693    ) -> StreamResult<T>
694    where
695        T: Send + 'static,
696    {
697        for step in &self.steps {
698            bump_fused_event(events, config)?;
699            match step {
700                TypedLinearStep::Pass => {}
701                TypedLinearStep::Map(mapper) => {
702                    item = mapper(item);
703                }
704                TypedLinearStep::AsyncBoundary => {
705                    *async_boundary_crossings += 1;
706                }
707            }
708            bump_fused_event(events, config)?;
709        }
710        Ok(item)
711    }
712}
713
// ---------------------------------------------------------------------------
// Phase 1 — Typed-port substrate (WP-18)
// ---------------------------------------------------------------------------
717
/// Storage cell holding at most one typed value for a single port.
///
/// Intended for the typed junction kernels (Phase 3+), which buffer per-port
/// values without `DatumValue` boxing.  The Phase 2 linear kernels have no
/// need for it because values move through the pipeline sequentially and are
/// never buffered.
// Phase 3+ will use TypedSlot actively; allow dead_code until then.
#[allow(dead_code)]
pub(crate) struct TypedSlot<T>(Option<T>);

#[allow(dead_code)]
impl<T> TypedSlot<T> {
    /// Creates a slot that holds no value.
    pub(crate) fn empty() -> Self {
        TypedSlot(None)
    }

    /// Stores `value`, dropping whatever the slot held before.
    pub(crate) fn put(&mut self, value: T) {
        let _previous = self.0.replace(value);
    }

    /// Removes and returns the stored value, leaving the slot empty.
    pub(crate) fn take(&mut self) -> Option<T> {
        std::mem::take(&mut self.0)
    }

    /// Reports whether the slot currently holds a value.
    pub(crate) fn is_some(&self) -> bool {
        matches!(self.0, Some(_))
    }
}
745
746/// Heterogeneous map from [`PortId`] to a type-erased [`TypedSlot<T>`].
747///
748/// Typed junction kernels (Phase 3+) register per-port slots here at plan
749/// time, then access them during execution without boxing the values.
750// Phase 3+ will use TypedPortRegistry actively; allow dead_code until then.
751#[allow(dead_code)]
752pub(crate) struct TypedPortRegistry {
753    slots: HashMap<PortId, Box<dyn Any + Send>>,
754}
755
756#[allow(dead_code)]
757impl TypedPortRegistry {
758    pub(crate) fn new() -> Self {
759        Self {
760            slots: HashMap::new(),
761        }
762    }
763
764    /// Register a typed slot for `port_id`.  Panics if a slot for that port
765    /// already exists (programming error — ports must be registered once).
766    pub(crate) fn register<T: Any + Send>(&mut self, port_id: PortId) {
767        let prev = self
768            .slots
769            .insert(port_id, Box::new(TypedSlot::<T>::empty()));
770        assert!(prev.is_none(), "port {port_id:?} registered twice");
771    }
772
773    /// Obtain a mutable reference to the typed slot for `port_id`.
774    ///
775    /// Returns `None` if the port was not registered or the stored type does
776    /// not match `T`.
777    pub(crate) fn get_mut<T: Any + Send>(&mut self, port_id: PortId) -> Option<&mut TypedSlot<T>> {
778        self.slots.get_mut(&port_id)?.downcast_mut::<TypedSlot<T>>()
779    }
780}
781
/// Per-element kernel for one stage of the typed-port executor.
///
/// A kernel maps a strongly-typed input to a strongly-typed output — **no
/// `DatumValue` boxing and no per-element downcast.**  The intended lifecycle
/// is: built once at plan time by a [`TypedStageFactory`], then reused for
/// every item of the stream.
// Phase 3+ will add typed junction kernels; allow dead_code until then.
#[allow(dead_code)]
pub(crate) trait TypedKernel<In, Out>: Send + Sync {
    /// Processes a single `input` value and returns the stage's output.
    fn run(&self, input: In) -> Out;
}
793
794/// Factory that constructs a [`TypedKernel<In, Out>`] for a specific stage at
795/// plan time.
796///
797/// Factories perform **safe `Any` downcasts** on the stage spec's stored
798/// type-erased functions once, during planning.  They return `None` when the
799/// stage is unsupported (falls back to the erased executor in `Auto` mode).
800// Phase 3+ will add typed junction factories; allow dead_code until then.
801#[allow(dead_code)]
802pub(crate) trait TypedStageFactory<In, Out>: Send + Sync {
803    fn try_build(&self, spec: &StageSpec) -> Option<Box<dyn TypedKernel<In, Out>>>;
804}
805
// ---------------------------------------------------------------------------
// Phase 2 — TypedFlowPlan<In, Out> (WP-18)
// ---------------------------------------------------------------------------
809
/// One non-terminal, per-element step of a linear flow plan.
///
/// `T` is the **intermediate** element type: every middle step maps `T → T`.
/// A change of type can only happen in the final step, which is modelled
/// separately by [`TypedLastStep`] inside [`TypedFlowPlan`].
enum TypedMiddleStep<T: 'static> {
    /// Forward the value untouched.
    Pass,
    /// Transform the value with a pure typed map function.
    Map(Arc<dyn Fn(T) -> T + Send + Sync>),
    /// Count an async-boundary crossing; the value stays on the same thread.
    AsyncBoundary,
}
823
/// Final step of a [`TypedFlowPlan`]: the one that yields the plan's `Out`.
///
/// * `Map` — the last stage carries a typed `Fn(In) → Out`, recovered at plan
///   time with a `downcast_ref` on `StageMapFns::typed`.  No per-element
///   allocation.
/// * `Identity` — the last stage is a pass-through.  Only built when
///   `TypeId::of::<In>() == TypeId::of::<Out>()`, i.e. both parameters name
///   the same type at runtime.  The stored closure does one
///   `Box<dyn Any + Send>::downcast::<Out>()` per element — a single heap
///   allocation — so the `In` value can be handed out as `Out` without
///   `unsafe`.
enum TypedLastStep<In: 'static, Out: 'static> {
    /// Typed map that produces `Out` directly.
    Map(Arc<dyn Fn(In) -> Out + Send + Sync>),
    /// Pass-through for the case where `In` and `Out` are the same runtime
    /// type (checked through [`TypeId`] at plan time); costs one allocation
    /// per element for the safe `In → Out` conversion.
    Identity(Arc<dyn Fn(In) -> Out + Send + Sync>),
}
844
845/// A typed, fused execution plan for a linear `FlowShape<In, Out>` graph.
846///
847/// Built once at plan time by [`try_typed_flow_plan`]; reused across items.
848///
849/// **No `DatumValue` per-element boxing for Map graphs.**  For Identity-only
850/// graphs a single `Box<dyn Any + Send>` allocation per element converts the
851/// final `In` value to `Out` safely (see [`TypedLastStep::Identity`]).
852///
853/// For **count** sinks the plan short-circuits before the last step: no output
854/// value is produced and no allocation occurs.
855pub(crate) struct TypedFlowPlan<In: 'static, Out: 'static> {
856    /// Intermediate steps: `In → In`.
857    middle_steps: Vec<TypedMiddleStep<In>>,
858    /// Terminal step: `In → Out`.
859    last_step: TypedLastStep<In, Out>,
860    /// Total stage count (= `middle_steps.len() + 1`).
861    /// Retained for diagnostic use; not yet read in execution.
862    #[allow(dead_code)]
863    stage_count: usize,
864}
865
866impl<In: Send + 'static, Out: Send + 'static> TypedFlowPlan<In, Out> {
867    /// Run a single item through the typed plan, producing `Out`.
868    ///
869    /// Bumps the fused event counter **twice** per stage (before and after),
870    /// matching the erased executor's accounting.
871    pub(crate) fn run_item(
872        &self,
873        item: In,
874        config: FusedExecutionConfig,
875        events: &mut usize,
876        async_boundary_crossings: &mut usize,
877    ) -> StreamResult<Out> {
878        let mut val = item;
879        for step in &self.middle_steps {
880            bump_fused_event(events, config)?;
881            val = match step {
882                TypedMiddleStep::Pass => val,
883                TypedMiddleStep::Map(f) => f(val),
884                TypedMiddleStep::AsyncBoundary => {
885                    *async_boundary_crossings += 1;
886                    val
887                }
888            };
889            bump_fused_event(events, config)?;
890        }
891        // Terminal step.
892        bump_fused_event(events, config)?;
893        let out = match &self.last_step {
894            TypedLastStep::Map(f) => f(val),
895            TypedLastStep::Identity(f) => f(val),
896        };
897        bump_fused_event(events, config)?;
898        Ok(out)
899    }
900
901    /// Run a single item through all **middle** stages only, counting events
902    /// for the terminal stage but **not** producing an `Out` value.
903    ///
904    /// Used by `run_count_with_input_report` to avoid any allocation when the
905    /// last step is an identity pass-through (`TypedLastStep::Identity`).
906    /// Map-last graphs call this path too (the terminal map function is still
907    /// executed to maintain semantic equivalence with the erased path, but
908    /// its result is discarded — pure functions have no observable side effects).
909    pub(crate) fn run_item_count(
910        &self,
911        item: In,
912        config: FusedExecutionConfig,
913        events: &mut usize,
914        async_boundary_crossings: &mut usize,
915    ) -> StreamResult<()> {
916        let mut val = item;
917        for step in &self.middle_steps {
918            bump_fused_event(events, config)?;
919            val = match step {
920                TypedMiddleStep::Pass => val,
921                TypedMiddleStep::Map(f) => f(val),
922                TypedMiddleStep::AsyncBoundary => {
923                    *async_boundary_crossings += 1;
924                    val
925                }
926            };
927            bump_fused_event(events, config)?;
928        }
929        // Terminal stage: bump events and run (discard output).
930        bump_fused_event(events, config)?;
931        match &self.last_step {
932            TypedLastStep::Map(f) => {
933                let _ = f(val);
934            }
935            TypedLastStep::Identity(_) => {
936                // Pass-through: no allocation needed.  Drop `val` in place.
937                drop(val);
938            }
939        }
940        bump_fused_event(events, config)?;
941        Ok(())
942    }
943}
944
945/// Attempt to build a [`TypedFlowPlan<In, Out>`] for the given linear graph.
946///
947/// Returns `None` when the graph cannot be executed on the typed path.  This
948/// happens for:
949/// - Non-linear graphs (junctions, multi-inlet/outlet stages).
950/// - Cyclic stage graphs.
951/// - Graphs where not all stage port types match `In`/`Out` in a way that
952///   allows safe plan-time downcasts.
953/// - Stages with `StageKind::Opaque` (custom logic; always falls back to
954///   erased).
955///
956/// On success, returns `Some(plan)` and the callers (`run_with_input_report`
957/// etc.) skip the erased executor entirely.
958pub(crate) fn try_typed_flow_plan<In, Out>(
959    stages: &[super::builder::StageRecord],
960    edges: &[super::builder::Edge],
961    graph_inlet: PortId,
962    graph_outlet: PortId,
963) -> Option<TypedFlowPlan<In, Out>>
964where
965    In: Clone + Send + 'static,
966    Out: Send + 'static,
967{
968    let in_type_id = TypeId::of::<In>();
969    let out_type_id = TypeId::of::<Out>();
970
971    let mut current_inlet = graph_inlet;
972    let mut seen = HashSet::new();
973    // Collect (StageKind, outlet_type_id) pairs as we walk the chain.
974    let mut stage_infos: Vec<(&StageKind, TypeId)> = Vec::new();
975
976    loop {
977        let stage_index = stages.iter().position(|s| {
978            s.spec
979                .inlets
980                .iter()
981                .any(|inlet| inlet.id() == current_inlet)
982        })?;
983        if !seen.insert(stage_index) {
984            // Cycle detected — not supported on typed path.
985            return None;
986        }
987        let stage = &stages[stage_index];
988        if stage.spec.inlets.len() != 1 || stage.spec.outlets.len() != 1 {
989            // Non-linear stage (junction) — fall back to erased.
990            return None;
991        }
992        let inlet = &stage.spec.inlets[0];
993        let outlet = &stage.spec.outlets[0];
994
995        // All intermediate inlets must have type In.
996        if inlet.type_id() != in_type_id {
997            return None;
998        }
999
1000        stage_infos.push((&stage.spec.kind, outlet.type_id()));
1001
1002        if outlet.id() == graph_outlet {
1003            break;
1004        }
1005        // Follow the edge to the next inlet.
1006        current_inlet = edges
1007            .iter()
1008            .find_map(|e| (e.outlet == outlet.id()).then_some(e.inlet))?;
1009    }
1010
1011    if seen.len() != stages.len() {
1012        // Not all stages are on the result path (unreachable stages).
1013        return None;
1014    }
1015
1016    // Build the plan.  All stages except the last are `In → In`; the last
1017    // stage may produce a different `Out`.
1018    let (last_kind, last_outlet_type) = stage_infos.last()?;
1019    // The last outlet must have type `Out`.
1020    if *last_outlet_type != out_type_id {
1021        return None;
1022    }
1023
1024    let total = stage_infos.len();
1025    let mut middle_steps: Vec<TypedMiddleStep<In>> = Vec::with_capacity(total.saturating_sub(1));
1026
1027    for (kind, outlet_type) in &stage_infos[..total.saturating_sub(1)] {
1028        // Intermediate outlets must all have type `In`.
1029        if *outlet_type != in_type_id {
1030            return None;
1031        }
1032        let step = match kind {
1033            StageKind::Identity => TypedMiddleStep::Pass,
1034            // Opaque stages have custom GraphStageLogic — they can emit multiple
1035            // values, buffer, or do arbitrary things.  They are never safe to
1036            // treat as a pass-through on the typed path.
1037            StageKind::Opaque => return None,
1038            StageKind::AsyncBoundary => TypedMiddleStep::AsyncBoundary,
1039            StageKind::Map(map) => {
1040                // Downcast `typed` to `Arc<Fn(In) -> In>`.
1041                let f = map
1042                    .typed
1043                    .downcast_ref::<Arc<dyn Fn(In) -> In + Send + Sync>>()?;
1044                TypedMiddleStep::Map(Arc::clone(f))
1045            }
1046            _ => return None,
1047        };
1048        middle_steps.push(step);
1049    }
1050
1051    // Build the terminal step.
1052    let last_step: TypedLastStep<In, Out> = match last_kind {
1053        StageKind::Identity => {
1054            // Only valid when In = Out (same TypeId).
1055            if in_type_id != out_type_id {
1056                return None;
1057            }
1058            // Safe reinterpretation via Box<dyn Any + Send> downcast.
1059            // One allocation per element; no `unsafe`.
1060            TypedLastStep::Identity(Arc::new(|x: In| -> Out {
1061                let boxed: Box<dyn Any + Send> = Box::new(x);
1062                *boxed
1063                    .downcast::<Out>()
1064                    .expect("TypeId equality verified at plan time")
1065            }))
1066        }
1067        // Opaque stages always fall back to the erased executor.
1068        StageKind::Opaque => return None,
1069        StageKind::AsyncBoundary => {
1070            // Async boundary as last stage: same as Identity (sync pass-through
1071            // in the non-async fused executor).
1072            if in_type_id != out_type_id {
1073                return None;
1074            }
1075            TypedLastStep::Identity(Arc::new(|x: In| -> Out {
1076                let boxed: Box<dyn Any + Send> = Box::new(x);
1077                *boxed
1078                    .downcast::<Out>()
1079                    .expect("TypeId equality verified at plan time")
1080            }))
1081        }
1082        StageKind::Map(map) => {
1083            // Downcast `typed` to `Arc<Fn(In) -> Out>`.
1084            let f = map
1085                .typed
1086                .downcast_ref::<Arc<dyn Fn(In) -> Out + Send + Sync>>()?;
1087            TypedLastStep::Map(Arc::clone(f))
1088        }
1089        _ => return None,
1090    };
1091
1092    Some(TypedFlowPlan {
1093        middle_steps,
1094        last_step,
1095        stage_count: total,
1096    })
1097}
1098
1099// ---------------------------------------------------------------------------
1100// Phase 3a — Typed MergeSequence kernel (WP-18)
1101// ---------------------------------------------------------------------------
1102
1103/// Shared ordering core for `MergeSequence<T>`.
1104///
1105/// Holds all mutable state for sequence-ordering a fan-in: the next expected
1106/// sequence number, a pending buffer for out-of-order arrivals, and an output
1107/// queue for items whose sequence number has been resolved.
1108///
1109/// Used by the **typed** `MergeSequencePlan` execution path.  The erased
1110/// executor maintains equivalent state inline in [`StageState::MergeSequence`]
1111/// (shared semantics, forced equivalence via tests — see the `typed_vs_erased`
1112/// test suite).
1113pub(crate) struct MergeSequenceCore<T> {
1114    next_sequence: u64,
1115    /// Out-of-order items waiting for their sequence slot to open.
1116    pending: Vec<(u64, T)>,
1117    /// Items ready to emit, in sequence order.
1118    output_buffer: VecDeque<T>,
1119    /// Number of input ports that have completed.
1120    completed_count: usize,
1121    /// Total number of input ports.
1122    input_count: usize,
1123    /// Whether this stage has finished (all inputs done, all outputs drained).
1124    completed: bool,
1125}
1126
1127impl<T> MergeSequenceCore<T> {
1128    pub(crate) fn new(input_count: usize) -> Self {
1129        Self {
1130            next_sequence: 0,
1131            pending: Vec::new(),
1132            output_buffer: VecDeque::new(),
1133            completed_count: 0,
1134            input_count,
1135            completed: false,
1136        }
1137    }
1138
1139    /// Reset state for reuse across benchmark iterations.
1140    fn reset(&mut self) {
1141        self.next_sequence = 0;
1142        self.pending.clear();
1143        self.output_buffer.clear();
1144        self.completed_count = 0;
1145        self.completed = false;
1146    }
1147
1148    /// Push a typed value into the core with its extracted sequence number.
1149    ///
1150    /// On success, all newly-resolved items (whose sequence is now contiguous
1151    /// from `next_sequence`) are appended to `output_buffer`.  On duplicate
1152    /// sequence, returns `StreamError::Failed`.
1153    fn push_item(&mut self, seq: u64, val: T) -> StreamResult<()> {
1154        if seq == self.next_sequence {
1155            self.output_buffer.push_back(val);
1156            self.next_sequence += 1;
1157            // Drain any pending items that are now in sequence.
1158            while let Some(index) = self
1159                .pending
1160                .iter()
1161                .position(|(s, _)| *s == self.next_sequence)
1162            {
1163                let (_, item) = self.pending.remove(index);
1164                self.output_buffer.push_back(item);
1165                self.next_sequence += 1;
1166            }
1167        } else {
1168            if self.pending.iter().any(|(s, _)| *s == seq) {
1169                return Err(StreamError::Failed(format!(
1170                    "duplicate sequence {seq} on merge sequence"
1171                )));
1172            }
1173            self.pending.push((seq, val));
1174            self.pending.sort_by_key(|(s, _)| *s);
1175            // Check whether the new item resolves any contiguous run.
1176            while let Some(index) = self
1177                .pending
1178                .iter()
1179                .position(|(s, _)| *s == self.next_sequence)
1180            {
1181                let (_, item) = self.pending.remove(index);
1182                self.output_buffer.push_back(item);
1183                self.next_sequence += 1;
1184            }
1185        }
1186        Ok(())
1187    }
1188
1189    /// Signal that one input port has completed.
1190    ///
1191    /// Returns `true` when all inputs have completed *and* the output buffer
1192    /// is empty (the stage can now terminate cleanly).
1193    ///
1194    /// Returns `Err` when all inputs have completed but there are still items
1195    /// in `pending` that cannot be resolved — a sequence gap.
1196    fn on_inlet_complete(&mut self) -> StreamResult<bool> {
1197        self.completed_count += 1;
1198        if self.completed_count >= self.input_count && self.output_buffer.is_empty() {
1199            if !self.pending.is_empty() {
1200                return Err(StreamError::Failed(format!(
1201                    "expected sequence {}, but all input ports have pushed or are complete",
1202                    self.next_sequence,
1203                )));
1204            }
1205            self.completed = true;
1206            Ok(true)
1207        } else {
1208            Ok(false)
1209        }
1210    }
1211
1212    /// Drain all available output items into `out`.
1213    fn drain_into(&mut self, out: &mut Vec<T>) {
1214        out.extend(self.output_buffer.drain(..));
1215    }
1216}
1217
1218// ---------------------------------------------------------------------------
1219// Typed MergeSequence plan: Unzip<A,B> → MergeSequence<T> topology
1220// ---------------------------------------------------------------------------
1221
1222/// A typed, fused execution plan for a graph whose only stages are one
1223/// `Unzip`-like fan-out followed by one `MergeSequence` fan-in, where every
1224/// port shares the same element type `T` and `In` is the split input type.
1225///
1226/// Built once at plan time by [`try_typed_merge_sequence_plan`]; executed by
1227/// [`run_typed_merge_sequence`].
1228///
1229/// **No `DatumValue` boxing per element.**  The split and sequence-extractor
1230/// functions are called on strongly-typed values.
1231pub(crate) struct TypedMergeSequencePlan<In, T> {
1232    /// Splits one `In` value into `n` typed `T` values (one per MergeSequence
1233    /// inlet).  Currently only n=2 is supported (Unzip → MergeSequence(2)).
1234    splits: Vec<Arc<dyn Fn(In) -> T + Send + Sync>>,
1235    /// Extracts the sequence number from a `T` value.
1236    extract_sequence: Arc<dyn Fn(&T) -> u64 + Send + Sync>,
1237    /// Mutable ordering state, reset between runs.
1238    core: MergeSequenceCore<T>,
1239}
1240
1241impl<In: Clone + Send + 'static, T: Send + 'static> TypedMergeSequencePlan<In, T> {
1242    /// Push one input item, extract and order both split values, drain ready
1243    /// outputs into `out`.
1244    fn push_item(&mut self, item: In, out: &mut Vec<T>) -> StreamResult<()> {
1245        for split_fn in &self.splits {
1246            let val = split_fn(item.clone());
1247            let seq = (self.extract_sequence)(&val);
1248            self.core.push_item(seq, val)?;
1249        }
1250        self.core.drain_into(out);
1251        Ok(())
1252    }
1253
1254    /// Signal end-of-stream (all inputs exhausted) and check for gaps.
1255    ///
1256    /// Each split function simulates one logical inlet completing.
1257    fn finish(&mut self, out: &mut Vec<T>) -> StreamResult<()> {
1258        for _ in 0..self.splits.len() {
1259            self.core.on_inlet_complete()?;
1260        }
1261        self.core.drain_into(out);
1262        Ok(())
1263    }
1264
1265    /// Reset internal state so the plan can be reused across benchmark iterations.
1266    fn reset(&mut self) {
1267        self.core.reset();
1268    }
1269}
1270
1271/// Attempt to build a [`TypedMergeSequencePlan<In, T>`] for the given graph.
1272///
1273/// Succeeds when the graph has exactly these two stages (in topological order):
1274/// 1. A `StageKind::Unzip` with one inlet (type `In`) and `k` outlets (type `T`).
1275/// 2. A `StageKind::MergeSequence` with `k` inlets (type `T`) and one outlet
1276///    (type `T` = `Out`).
1277///
1278/// All `k` outlets of the Unzip must connect to the `k` inlets of the
1279/// MergeSequence, and all port types must match `T`.
1280///
1281/// Returns `None` for any other topology (falls back to erased executor).
1282pub(crate) fn try_typed_merge_sequence_plan<In, Out>(
1283    stages: &[super::builder::StageRecord],
1284    edges: &[super::builder::Edge],
1285    graph_inlet: PortId,
1286    graph_outlet: PortId,
1287) -> Option<TypedMergeSequencePlan<In, Out>>
1288where
1289    In: Clone + Send + 'static,
1290    Out: Send + 'static,
1291{
1292    // We require exactly two stages.
1293    if stages.len() != 2 {
1294        return None;
1295    }
1296
1297    let in_type_id = TypeId::of::<In>();
1298    let out_type_id = TypeId::of::<Out>();
1299
1300    // Find the Unzip stage (the one whose inlet id matches the graph inlet).
1301    let unzip_idx = stages
1302        .iter()
1303        .position(|s| s.spec.inlets.len() == 1 && s.spec.inlets[0].id() == graph_inlet)?;
1304    let unzip_stage = &stages[unzip_idx];
1305
1306    // Must be a StageKind::Unzip.
1307    let typed_split_any = match &unzip_stage.spec.kind {
1308        StageKind::Unzip { typed_split, .. } => Arc::clone(typed_split),
1309        _ => return None,
1310    };
1311
1312    // The unzip inlet must have type In.
1313    if unzip_stage.spec.inlets[0].type_id() != in_type_id {
1314        return None;
1315    }
1316
1317    // All unzip outlets must have type Out.
1318    let k = unzip_stage.spec.outlets.len();
1319    if k == 0 {
1320        return None;
1321    }
1322    for outlet in &unzip_stage.spec.outlets {
1323        if outlet.type_id() != out_type_id {
1324            return None;
1325        }
1326    }
1327
1328    // Find the MergeSequence stage.
1329    let ms_idx = 1 - unzip_idx;
1330    let ms_stage = &stages[ms_idx];
1331
1332    // Must be a StageKind::MergeSequence with k inlets and 1 outlet.
1333    let (ms_input_count, typed_extract_any) = match &ms_stage.spec.kind {
1334        StageKind::MergeSequence {
1335            input_count,
1336            typed_extract,
1337            ..
1338        } => (*input_count, Arc::clone(typed_extract)),
1339        _ => return None,
1340    };
1341
1342    if ms_stage.spec.inlets.len() != k || ms_stage.spec.outlets.len() != 1 {
1343        return None;
1344    }
1345    if ms_input_count != k {
1346        return None;
1347    }
1348    // All MergeSequence ports must have type Out.
1349    for inlet in &ms_stage.spec.inlets {
1350        if inlet.type_id() != out_type_id {
1351            return None;
1352        }
1353    }
1354    if ms_stage.spec.outlets[0].type_id() != out_type_id {
1355        return None;
1356    }
1357
1358    // MergeSequence outlet must be the graph outlet.
1359    if ms_stage.spec.outlets[0].id() != graph_outlet {
1360        return None;
1361    }
1362
1363    // Verify edge wiring: each Unzip outlet must connect to one MergeSequence inlet.
1364    // Build a set of (unzip_outlet_id → ms_inlet_id) edges and verify all k outlets
1365    // are connected.
1366    let unzip_outlet_ids: Vec<PortId> =
1367        unzip_stage.spec.outlets.iter().map(AnyOutlet::id).collect();
1368    let ms_inlet_ids: Vec<PortId> = ms_stage.spec.inlets.iter().map(AnyInlet::id).collect();
1369
1370    // For each Unzip outlet, find the edge and verify it leads to one of the MS inlets.
1371    let mut outlet_to_ms_inlet: Vec<Option<usize>> = vec![None; k];
1372    for edge in edges {
1373        if let Some(uo_idx) = unzip_outlet_ids.iter().position(|&id| id == edge.outlet) {
1374            if let Some(mi_idx) = ms_inlet_ids.iter().position(|&id| id == edge.inlet) {
1375                outlet_to_ms_inlet[uo_idx] = Some(mi_idx);
1376            } else {
1377                return None; // Unzip outlet goes somewhere unexpected.
1378            }
1379        }
1380    }
1381    if outlet_to_ms_inlet.iter().any(|x| x.is_none()) {
1382        return None; // Not all Unzip outlets are wired to MergeSequence.
1383    }
1384
1385    // Down-cast typed_split: `Arc<StageTypedUnzipFn>` holds an
1386    // `Arc<dyn Fn(In) -> (Out0, Out1) + Send + Sync>` (for k=2) or
1387    // `Arc<dyn Fn(In) -> (Out, Out)>` etc.  We need per-outlet extractors.
1388    //
1389    // For k=2 (the only Unzip arity we support today): downcast to
1390    // `Arc<dyn Fn(In) -> (Out, Out) + Send + Sync>`.
1391    if k != 2 {
1392        // Only Unzip (2 outputs) is supported right now.
1393        return None;
1394    }
1395
1396    // Downcast typed_split to Arc<dyn Fn(In) -> (Out, Out) + Send + Sync>.
1397    let typed_split =
1398        typed_split_any.downcast_ref::<Arc<dyn Fn(In) -> (Out, Out) + Send + Sync>>()?;
1399    let typed_split = Arc::clone(typed_split);
1400
1401    // Downcast typed_extract to Arc<dyn Fn(&Out) -> u64 + Send + Sync>.
1402    let typed_extract =
1403        typed_extract_any.downcast_ref::<Arc<dyn Fn(&Out) -> u64 + Send + Sync>>()?;
1404    let typed_extract = Arc::clone(typed_extract);
1405
1406    // Build per-inlet split closures respecting the wiring order.
1407    // outlet_to_ms_inlet[0] = which MS inlet Unzip.out0 connects to.
1408    // outlet_to_ms_inlet[1] = which MS inlet Unzip.out1 connects to.
1409    //
1410    // We build `splits[ms_inlet_idx]` = the closure that extracts that value.
1411    #[allow(clippy::type_complexity)]
1412    let mut splits: Vec<Option<Arc<dyn Fn(In) -> Out + Send + Sync>>> = vec![None; k];
1413
1414    let split0 = Arc::clone(&typed_split);
1415    let split1 = Arc::clone(&typed_split);
1416
1417    let ms_idx_for_out0 = outlet_to_ms_inlet[0].unwrap();
1418    let ms_idx_for_out1 = outlet_to_ms_inlet[1].unwrap();
1419
1420    splits[ms_idx_for_out0] = Some(Arc::new(move |input: In| split0(input).0));
1421    splits[ms_idx_for_out1] = Some(Arc::new(move |input: In| split1(input).1));
1422
1423    // All slots must be filled.
1424    if splits.iter().any(|s| s.is_none()) {
1425        return None;
1426    }
1427    let splits: Vec<Arc<dyn Fn(In) -> Out + Send + Sync>> =
1428        splits.into_iter().map(|s| s.unwrap()).collect();
1429
1430    Some(TypedMergeSequencePlan {
1431        splits,
1432        extract_sequence: typed_extract,
1433        core: MergeSequenceCore::new(k),
1434    })
1435}
1436
1437/// Execute a [`TypedMergeSequencePlan`], collecting all outputs.
1438///
1439/// Called from [`run_with_input_report_mode`] when the typed plan is available.
1440pub(crate) fn run_typed_merge_sequence<In, T, I>(
1441    plan: &mut TypedMergeSequencePlan<In, T>,
1442    input: I,
1443) -> StreamResult<Vec<T>>
1444where
1445    In: Clone + Send + 'static,
1446    T: Send + 'static,
1447    I: IntoIterator<Item = In>,
1448{
1449    plan.reset();
1450    // Pre-allocate with a 2× hint (each input produces 2 outputs).
1451    let input = input.into_iter();
1452    let hint = input.size_hint().0;
1453    let mut output: Vec<T> = Vec::with_capacity(hint * plan.splits.len());
1454    for item in input {
1455        plan.push_item(item, &mut output)?;
1456    }
1457    plan.finish(&mut output)?;
1458    Ok(output)
1459}
1460
1461// ---------------------------------------------------------------------------
1462// Phase 3b — Typed MergeLatest kernel (WP-18)
1463// ---------------------------------------------------------------------------
1464
/// Latest-value bookkeeping shared by the typed `MergeLatest<T>` path.
///
/// Tracks the most recent value per inlet, how many inlets have produced a
/// value and how many have completed, plus a queue of snapshots waiting to be
/// emitted.  Snapshots are plain `Vec<T>` values built by cloning each slot.
///
/// Per the original notes, the erased executor keeps equivalent state inline
/// and typed-vs-erased tests hold the two together; that code is not visible
/// here.
pub(crate) struct MergeLatestCore<T> {
    /// Most recent value per inlet; `None` until that inlet's first push.
    latest: Vec<Option<T>>,
    /// How many inlets have received at least one value.
    seen_count: usize,
    /// How many inlets have completed.
    completed_count: usize,
    /// Total number of inlets.
    input_count: usize,
    /// Snapshots queued for emission (one per push once every inlet is seen).
    pending: VecDeque<Vec<T>>,
    /// Set once `on_inlet_complete` has reported termination.
    completed: bool,
    /// When `true`, an inlet completion terminates the stage as soon as no
    /// snapshot is pending, without waiting for the remaining inlets.
    eager_complete: bool,
}

impl<T: Clone> MergeLatestCore<T> {
    /// Create a core with `input_count` empty slots and the given completion
    /// policy.
    pub(crate) fn new(input_count: usize, eager_complete: bool) -> Self {
        let latest = (0..input_count).map(|_| None).collect();
        Self {
            latest,
            seen_count: 0,
            completed_count: 0,
            input_count,
            pending: VecDeque::new(),
            completed: false,
            eager_complete,
        }
    }

    /// Return to the freshly-constructed state, keeping `input_count` and
    /// `eager_complete`.
    fn reset(&mut self) {
        self.latest.iter_mut().for_each(|slot| *slot = None);
        self.pending.clear();
        self.seen_count = 0;
        self.completed_count = 0;
        self.completed = false;
    }

    /// Store `val` as the latest value of `inlet_index`; once every inlet has
    /// a value, queue a snapshot of all slots.
    ///
    /// Panics if `inlet_index` is not a valid slot index.
    fn push_item(&mut self, inlet_index: usize, val: T) {
        let first_value_for_inlet = self.latest[inlet_index].replace(val).is_none();
        if first_value_for_inlet {
            self.seen_count += 1;
        }
        if self.seen_count < self.input_count {
            return;
        }
        // Every slot is filled: clone them, in inlet order, into a snapshot.
        let snapshot: Vec<T> = self
            .latest
            .iter()
            .map(|slot| {
                slot.as_ref()
                    .expect("merge-latest typed: slot seen but None")
                    .clone()
            })
            .collect();
        self.pending.push_back(snapshot);
    }

    /// Record one inlet completion.
    ///
    /// Returns `true` (and marks the core completed) when every inlet has
    /// completed, or when `eager_complete` is set and no snapshot is pending;
    /// returns `false` otherwise.
    fn on_inlet_complete(&mut self) -> bool {
        self.completed_count += 1;
        let finished = self.completed_count >= self.input_count
            || (self.eager_complete && self.pending.is_empty());
        if finished {
            self.completed = true;
        }
        finished
    }

    /// Move all queued snapshots, oldest first, to the end of `out`.
    fn drain_into(&mut self, out: &mut Vec<Vec<T>>) {
        out.extend(self.pending.drain(..));
    }
}
1553
1554// ---------------------------------------------------------------------------
1555// Typed MergeLatest plan: Unzip<In> → MergeLatest<T> topology
1556// ---------------------------------------------------------------------------
1557
1558/// A typed, fused execution plan for a graph whose only stages are one
1559/// `Unzip`-like fan-out followed by one `MergeLatest` fan-in, where every
1560/// inlet shares the same element type `T` and the output is `Vec<T>`.
1561///
1562/// Built once at plan time by [`try_typed_merge_latest_plan`]; executed by
1563/// [`run_typed_merge_latest`].
1564///
1565/// **No `DatumValue` boxing per element.**  Only the final `Vec<T>` snapshot
1566/// allocation is unavoidable (it is the genuine output type).
1567pub(crate) struct TypedMergeLatestPlan<In, T> {
1568    /// Per-outlet split functions: `splits[i](input)` produces the value for
1569    /// MergeLatest inlet `i`.
1570    splits: Vec<Arc<dyn Fn(In) -> T + Send + Sync>>,
1571    /// Mutable state, reset between runs.
1572    core: MergeLatestCore<T>,
1573}
1574
1575impl<In: Clone + Send + 'static, T: Clone + Send + 'static> TypedMergeLatestPlan<In, T> {
1576    /// Push one input item, fan it out to all inlets, and drain ready snapshots.
1577    fn push_item(&mut self, item: In, out: &mut Vec<Vec<T>>) {
1578        for (idx, split_fn) in self.splits.iter().enumerate() {
1579            let val = split_fn(item.clone());
1580            self.core.push_item(idx, val);
1581        }
1582        self.core.drain_into(out);
1583    }
1584
1585    /// Signal end-of-stream (the upstream Unzip completed — all inlets complete
1586    /// simultaneously).
1587    fn finish(&mut self) -> bool {
1588        // The Unzip completes all its outlets at once; each outlet is one ML inlet.
1589        for _ in 0..self.splits.len() {
1590            if self.core.on_inlet_complete() {
1591                return true;
1592            }
1593        }
1594        true
1595    }
1596
1597    /// Reset internal state so the plan can be reused across benchmark iterations.
1598    fn reset(&mut self) {
1599        self.core.reset();
1600    }
1601}
1602
1603/// Attempt to build a [`TypedMergeLatestPlan<In, T>`] for the given graph.
1604///
1605/// Succeeds when the graph has exactly these two stages (in topological order):
1606/// 1. A `StageKind::Unzip` with one inlet (type `In`) and `k` outlets (type `T`).
1607/// 2. A `StageKind::MergeLatest` with `k` inlets (type `T`) and one outlet
1608///    (type `Vec<T>`).
1609///
1610/// All `k` outlets of the Unzip must connect to the `k` inlets of the
1611/// MergeLatest, and all port types must match.
1612///
1613/// **Type parameters**: `In` is the graph inlet type; `T` is the MergeLatest
1614/// element type (each inlet and each slot has type `T`; the output per snapshot
1615/// is `Vec<T>`).
1616///
1617/// Returns `None` for any other topology (falls back to erased executor).
1618pub(crate) fn try_typed_merge_latest_plan<In, T>(
1619    stages: &[super::builder::StageRecord],
1620    edges: &[super::builder::Edge],
1621    graph_inlet: PortId,
1622    graph_outlet: PortId,
1623) -> Option<TypedMergeLatestPlan<In, T>>
1624where
1625    In: Clone + Send + 'static,
1626    T: Clone + Send + 'static,
1627{
1628    // We require exactly two stages.
1629    if stages.len() != 2 {
1630        return None;
1631    }
1632
1633    let in_type_id = TypeId::of::<In>();
1634    let elem_type_id = TypeId::of::<T>();
1635    let vec_type_id = TypeId::of::<Vec<T>>();
1636
1637    // Find the Unzip stage (the one whose inlet id matches the graph inlet).
1638    let unzip_idx = stages
1639        .iter()
1640        .position(|s| s.spec.inlets.len() == 1 && s.spec.inlets[0].id() == graph_inlet)?;
1641    let unzip_stage = &stages[unzip_idx];
1642
1643    // Must be a StageKind::Unzip.
1644    let typed_split_any = match &unzip_stage.spec.kind {
1645        StageKind::Unzip { typed_split, .. } => Arc::clone(typed_split),
1646        _ => return None,
1647    };
1648
1649    // The unzip inlet must have type In.
1650    if unzip_stage.spec.inlets[0].type_id() != in_type_id {
1651        return None;
1652    }
1653
1654    // All unzip outlets must have type T.
1655    let k = unzip_stage.spec.outlets.len();
1656    if k == 0 {
1657        return None;
1658    }
1659    for outlet in &unzip_stage.spec.outlets {
1660        if outlet.type_id() != elem_type_id {
1661            return None;
1662        }
1663    }
1664
1665    // Find the MergeLatest stage.
1666    let ml_idx = 1 - unzip_idx;
1667    let ml_stage = &stages[ml_idx];
1668
1669    // Must be a StageKind::MergeLatest with k inlets and 1 outlet.
1670    let (ml_input_count, typed_snapshot_any) = match &ml_stage.spec.kind {
1671        StageKind::MergeLatest {
1672            input_count,
1673            typed_snapshot,
1674            ..
1675        } => (*input_count, Arc::clone(typed_snapshot)),
1676        _ => return None,
1677    };
1678
1679    if ml_stage.spec.inlets.len() != k || ml_stage.spec.outlets.len() != 1 {
1680        return None;
1681    }
1682    if ml_input_count != k {
1683        return None;
1684    }
1685    // All MergeLatest inlets must have type T.
1686    for inlet in &ml_stage.spec.inlets {
1687        if inlet.type_id() != elem_type_id {
1688            return None;
1689        }
1690    }
1691    // MergeLatest outlet must have type Vec<T>.
1692    if ml_stage.spec.outlets[0].type_id() != vec_type_id {
1693        return None;
1694    }
1695
1696    // MergeLatest outlet must be the graph outlet.
1697    if ml_stage.spec.outlets[0].id() != graph_outlet {
1698        return None;
1699    }
1700
1701    // Verify edge wiring: each Unzip outlet must connect to one MergeLatest inlet.
1702    let unzip_outlet_ids: Vec<PortId> =
1703        unzip_stage.spec.outlets.iter().map(AnyOutlet::id).collect();
1704    let ml_inlet_ids: Vec<PortId> = ml_stage.spec.inlets.iter().map(AnyInlet::id).collect();
1705
1706    let mut outlet_to_ml_inlet: Vec<Option<usize>> = vec![None; k];
1707    for edge in edges {
1708        if let Some(uo_idx) = unzip_outlet_ids.iter().position(|&id| id == edge.outlet) {
1709            if let Some(mi_idx) = ml_inlet_ids.iter().position(|&id| id == edge.inlet) {
1710                outlet_to_ml_inlet[uo_idx] = Some(mi_idx);
1711            } else {
1712                return None; // Unzip outlet goes somewhere unexpected.
1713            }
1714        }
1715    }
1716    if outlet_to_ml_inlet.iter().any(|x| x.is_none()) {
1717        return None; // Not all Unzip outlets are wired to MergeLatest.
1718    }
1719
1720    // Only k=2 Unzip is supported (same as MergeSequence typed plan).
1721    if k != 2 {
1722        return None;
1723    }
1724
1725    // Downcast typed_split to Arc<dyn Fn(In) -> (T, T) + Send + Sync>.
1726    type SplitFn<A, B> = Arc<dyn Fn(A) -> (B, B) + Send + Sync>;
1727    let typed_split = typed_split_any.downcast_ref::<SplitFn<In, T>>()?;
1728    let typed_split = Arc::clone(typed_split);
1729
1730    // Verify typed_snapshot can be downcast (plan-time only; not called per-element).
1731    type SnapshotFn<U> = Arc<dyn Fn(&[Option<U>]) -> Vec<U> + Send + Sync>;
1732    typed_snapshot_any.downcast_ref::<SnapshotFn<T>>()?;
1733
1734    // Build per-inlet split closures respecting the wiring order.
1735    #[allow(clippy::type_complexity)]
1736    let mut splits: Vec<Option<Arc<dyn Fn(In) -> T + Send + Sync>>> = vec![None; k];
1737
1738    let split0 = Arc::clone(&typed_split);
1739    let split1 = Arc::clone(&typed_split);
1740
1741    let ml_idx_for_out0 = outlet_to_ml_inlet[0].unwrap();
1742    let ml_idx_for_out1 = outlet_to_ml_inlet[1].unwrap();
1743
1744    splits[ml_idx_for_out0] = Some(Arc::new(move |input: In| split0(input).0));
1745    splits[ml_idx_for_out1] = Some(Arc::new(move |input: In| split1(input).1));
1746
1747    if splits.iter().any(|s| s.is_none()) {
1748        return None;
1749    }
1750    let splits: Vec<Arc<dyn Fn(In) -> T + Send + Sync>> =
1751        splits.into_iter().map(|s| s.unwrap()).collect();
1752
1753    // Retrieve eager_complete from the MergeLatest stage kind.
1754    let eager_complete = match &ml_stage.spec.kind {
1755        StageKind::MergeLatest { eager_complete, .. } => *eager_complete,
1756        _ => return None,
1757    };
1758
1759    Some(TypedMergeLatestPlan {
1760        splits,
1761        core: MergeLatestCore::new(k, eager_complete),
1762    })
1763}
1764
1765// ---------------------------------------------------------------------------
1766// Phase 3b — per-run typed MergeLatest plan dispatcher
1767// ---------------------------------------------------------------------------
1768
/// Type alias for the boxed per-run MergeLatest runner returned by
/// [`try_build_typed_merge_latest_dispatch`].
///
/// The runner consumes a `&mut dyn Iterator<Item = In>` and returns
/// `StreamResult<Vec<Out>>`.  It is built fresh on every call to
/// [`run_with_input_report_mode`] — no mutable state is shared between runs.
///
/// Because the boxed closure is `FnOnce`, a runner can be invoked at most
/// once.  The trait object carries no `Send` bound.
type MergeLatestRunner<In, Out> =
    Box<dyn FnOnce(&mut dyn Iterator<Item = In>) -> StreamResult<Vec<Out>>>;
1777
1778/// Build a fresh one-shot typed MergeLatest runner for the given graph.
1779///
1780/// Inspects the stages to find a `MergeLatest` stage and reads its element
1781/// `TypeId`.  Then dispatches to the concrete `try_typed_merge_latest_plan`
1782/// instantiation for the matching type among the **bounded set** of 17 common
1783/// primitive/`String` types.
1784///
1785/// Returns `Some(runner)` if the topology matches and the element type is in
1786/// the supported set.  Returns `None` otherwise — callers fall back to the
1787/// erased executor (in `Auto` mode) or return a typed-unsupported error
1788/// (`TypedOnly`).
1789///
1790/// **Bounded set note**: covers all benchmark and typical user types.
1791/// A custom `T` not in this list falls back to the erased executor in `Auto`
1792/// mode; there is no silent data-loss.
1793///
1794/// **Blueprint independence**: each call builds a new [`TypedMergeLatestPlan`]
1795/// with its own [`MergeLatestCore`]; running the same blueprint concurrently
1796/// or sequentially produces independent, correct results.
1797pub(crate) fn try_build_typed_merge_latest_dispatch<In, Out>(
1798    stages: &[super::builder::StageRecord],
1799    edges: &[super::builder::Edge],
1800    inlet: PortId,
1801    outlet: PortId,
1802) -> Option<MergeLatestRunner<In, Out>>
1803where
1804    In: Clone + Send + 'static,
1805    Out: Send + 'static,
1806{
1807    // Read the element TypeId from the MergeLatest stage's inlet ports.
1808    let elem_type_id: TypeId = stages.iter().find_map(|s| {
1809        if let StageKind::MergeLatest { .. } = &s.spec.kind {
1810            s.spec.inlets.first().map(|i| i.type_id())
1811        } else {
1812            None
1813        }
1814    })?;
1815
1816    // Dispatch to the concrete instantiation for each supported element type.
1817    // The TypeId guard ensures we only call try_typed_merge_latest_plan when
1818    // T matches; the plan builder's own checks validate the full topology.
1819    //
1820    // The `Box<dyn Any>` round-trip for the output vector (`output` is a
1821    // `Vec<Vec<$T>>` which is the same as `Vec<Out>` when `Out == Vec<$T>`)
1822    // is a single allocation of the Vec header (24 bytes); no element data is
1823    // copied.  This is the minimum necessary to safely reinterpret the output
1824    // type without `unsafe` code.
1825    macro_rules! try_elem {
1826        ($($T:ty),*) => {
1827            $(
1828                if elem_type_id == TypeId::of::<$T>() {
1829                    let mut plan = try_typed_merge_latest_plan::<In, $T>(stages, edges, inlet, outlet)?;
1830                    // At this point Out == Vec<$T> (enforced by port TypeId checks
1831                    // inside try_typed_merge_latest_plan).
1832                    let runner: MergeLatestRunner<In, Out> = Box::new(
1833                        move |iter: &mut dyn Iterator<Item = In>| {
1834                            plan.reset();
1835                            let hint = iter.size_hint().0;
1836                            let mut output: Vec<Vec<$T>> = Vec::with_capacity(hint);
1837                            for item in iter {
1838                                plan.push_item(item, &mut output);
1839                            }
1840                            plan.finish();
1841                            // Vec<Vec<$T>> → Vec<Out>: safe because Out == Vec<$T>
1842                            // guaranteed by TypeId dispatch above and by
1843                            // try_typed_merge_latest_plan's port TypeId checks.
1844                            let boxed: Box<dyn Any + Send> = Box::new(output);
1845                            boxed
1846                                .downcast::<Vec<Out>>()
1847                                .map(|b| *b)
1848                                .map_err(|_| StreamError::Failed(
1849                                    "merge-latest typed runner: Out type mismatch".into()
1850                                ))
1851                        }
1852                    );
1853                    return Some(runner);
1854                }
1855            )*
1856        };
1857    }
1858
1859    try_elem!(
1860        u8, u16, u32, u64, u128, usize, i8, i16, i32, i64, i128, isize, f32, f64, bool, String
1861    );
1862
1863    // Element type not in the supported set → fall back to erased executor.
1864    None
1865}
1866
1867// ---------------------------------------------------------------------------
1868
/// Selects which backend [`BoundaryCountExecutor::run_count`] dispatches to.
pub(super) enum BoundaryCountExecutor {
    /// Dispatches to `run_threaded_async_linear_count`; only compiled in test
    /// builds.
    #[cfg(test)]
    Threaded,
    /// Dispatches to `run_ractor_async_linear_count`.
    Ractor,
}
1874
1875impl BoundaryCountExecutor {
1876    pub(super) fn run_count<I, T>(
1877        &self,
1878        input: I,
1879        segments: TypedLinearSegments<T>,
1880        config: AsyncBoundaryExecutionConfig,
1881    ) -> StreamResult<FusedTerminalReport<usize>>
1882    where
1883        I: IntoIterator<Item = T> + Send,
1884        I::IntoIter: Send + 'static,
1885        T: Send + 'static,
1886    {
1887        match self {
1888            #[cfg(test)]
1889            Self::Threaded => run_threaded_async_linear_count(input, segments, config),
1890            Self::Ractor => run_ractor_async_linear_count(input, segments, config),
1891        }
1892    }
1893}
1894
1895#[cfg(test)]
1896mod tests {
1897    use super::*;
1898
    /// Shared bookkeeping for the `BufferedFlowOnPull` test stage, inspected by
    /// the tests after driving the executor.
    #[derive(Default)]
    struct BufferedFlowState {
        /// Elements pushed into the stage and not yet emitted (FIFO).
        queued: VecDeque<i32>,
        /// Set once the stage's `on_upstream_finish` handler has run.
        upstream_closed: bool,
        /// Number of times the stage's `on_pull` handler has run.
        pull_calls: usize,
        /// Number of times the stage's `on_downstream_finish` handler has run.
        finish_calls: usize,
    }
1906
    /// Test stage that queues pushed elements in shared state and only emits
    /// them from its `on_pull` handler.
    struct BufferedFlowOnPull {
        /// Bookkeeping shared with the test body so it can assert on it.
        state: Arc<Mutex<BufferedFlowState>>,
    }
1910
1911    impl GraphStage for BufferedFlowOnPull {
1912        type Shape = FlowShape<i32, i32>;
1913
1914        fn name(&self) -> &str {
1915            "BufferedFlowOnPull"
1916        }
1917
1918        fn allocate_shape(&self, _allocator: &mut PortAllocator) -> Self::Shape {
1919            let first_id = next_port_id_block(2);
1920            FlowShape::new(
1921                Inlet::with_id(first_id, "buffered-flow.in"),
1922                Outlet::with_id(first_id.offset(1), "buffered-flow.out"),
1923            )
1924        }
1925
1926        fn stage_spec(&self, shape: &Self::Shape) -> StageSpec {
1927            StageSpec::opaque(self.name(), shape.inlets(), shape.outlets())
1928        }
1929
1930        fn create_logic(&self, shape: &Self::Shape) -> GraphStageLogic {
1931            struct In {
1932                state: Arc<Mutex<BufferedFlowState>>,
1933            }
1934
1935            impl InHandler for In {
1936                fn on_push(
1937                    &mut self,
1938                    logic: &mut GraphStageLogic,
1939                    inlet: AnyInlet,
1940                ) -> StreamResult<()> {
1941                    let value: i32 = logic.grab_datum(inlet.id()).and_then(|value| {
1942                        downcast_datum(value, "grab", || format!("inlet#{}", inlet.id().as_usize()))
1943                    })?;
1944                    self.state.lock().unwrap().queued.push_back(value);
1945                    Ok(())
1946                }
1947
1948                fn on_upstream_finish(
1949                    &mut self,
1950                    _logic: &mut GraphStageLogic,
1951                    _inlet: AnyInlet,
1952                ) -> StreamResult<()> {
1953                    self.state.lock().unwrap().upstream_closed = true;
1954                    Ok(())
1955                }
1956            }
1957
1958            struct Out {
1959                outlet: Outlet<i32>,
1960                state: Arc<Mutex<BufferedFlowState>>,
1961            }
1962
1963            impl OutHandler for Out {
1964                fn on_pull(
1965                    &mut self,
1966                    logic: &mut GraphStageLogic,
1967                    _outlet: AnyOutlet,
1968                ) -> StreamResult<()> {
1969                    let (next, upstream_closed) = {
1970                        let mut state = self.state.lock().unwrap();
1971                        state.pull_calls += 1;
1972                        (state.queued.pop_front(), state.upstream_closed)
1973                    };
1974                    if let Some(value) = next {
1975                        logic.emit(&self.outlet, value)
1976                    } else if upstream_closed {
1977                        logic.complete(&self.outlet)
1978                    } else {
1979                        Ok(())
1980                    }
1981                }
1982
1983                fn on_downstream_finish(
1984                    &mut self,
1985                    logic: &mut GraphStageLogic,
1986                    _outlet: AnyOutlet,
1987                ) -> StreamResult<()> {
1988                    self.state.lock().unwrap().finish_calls += 1;
1989                    logic.complete_stage()
1990                }
1991            }
1992
1993            let mut logic = GraphStageLogic::new(shape);
1994            logic
1995                .set_handler(
1996                    &shape.inlet(),
1997                    Box::new(In {
1998                        state: Arc::clone(&self.state),
1999                    }),
2000                )
2001                .unwrap();
2002            logic
2003                .set_out_handler(
2004                    &shape.outlet(),
2005                    Box::new(Out {
2006                        outlet: shape.outlet(),
2007                        state: Arc::clone(&self.state),
2008                    }),
2009                )
2010                .unwrap();
2011            logic
2012        }
2013    }
2014
    /// Test stage whose push handler calls `emit_multiple` with `[1, 2]` and
    /// then returns `StreamError::Failed("emit_multiple boom")`.
    struct EmitMultipleThenFailOnPush;
2016
2017    impl GraphStage for EmitMultipleThenFailOnPush {
2018        type Shape = FlowShape<i32, i32>;
2019
2020        fn name(&self) -> &str {
2021            "EmitMultipleThenFailOnPush"
2022        }
2023
2024        fn allocate_shape(&self, _allocator: &mut PortAllocator) -> Self::Shape {
2025            let first_id = next_port_id_block(2);
2026            FlowShape::new(
2027                Inlet::with_id(first_id, "emit-fail.in"),
2028                Outlet::with_id(first_id.offset(1), "emit-fail.out"),
2029            )
2030        }
2031
2032        fn stage_spec(&self, shape: &Self::Shape) -> StageSpec {
2033            StageSpec::opaque(self.name(), shape.inlets(), shape.outlets())
2034        }
2035
2036        fn create_logic(&self, shape: &Self::Shape) -> GraphStageLogic {
2037            struct Handler {
2038                outlet: Outlet<i32>,
2039            }
2040
2041            impl InHandler for Handler {
2042                fn on_push(
2043                    &mut self,
2044                    logic: &mut GraphStageLogic,
2045                    _inlet: AnyInlet,
2046                ) -> StreamResult<()> {
2047                    logic.emit_multiple(&self.outlet, [1, 2])?;
2048                    Err(StreamError::Failed("emit_multiple boom".into()))
2049                }
2050            }
2051
2052            let mut logic = GraphStageLogic::new(shape);
2053            logic
2054                .set_handler(
2055                    &shape.inlet(),
2056                    Box::new(Handler {
2057                        outlet: shape.outlet(),
2058                    }),
2059                )
2060                .unwrap();
2061            logic
2062        }
2063    }
2064
    /// Test stage that calls `read_n` for two elements on its first push and
    /// returns `StreamError::Failed("read_n finish boom")` from
    /// `on_upstream_finish`.
    struct ReadNThenFailOnFinish;
2066
    /// Test stage whose push handler calls `emit_multiple` with `[1, 2]`.
    struct EmitMultipleOnPush;
2068
2069    impl GraphStage for EmitMultipleOnPush {
2070        type Shape = FlowShape<i32, i32>;
2071
2072        fn name(&self) -> &str {
2073            "EmitMultipleOnPush"
2074        }
2075
2076        fn allocate_shape(&self, _allocator: &mut PortAllocator) -> Self::Shape {
2077            let first_id = next_port_id_block(2);
2078            FlowShape::new(
2079                Inlet::with_id(first_id, "emit-multiple.in"),
2080                Outlet::with_id(first_id.offset(1), "emit-multiple.out"),
2081            )
2082        }
2083
2084        fn stage_spec(&self, shape: &Self::Shape) -> StageSpec {
2085            StageSpec::opaque(self.name(), shape.inlets(), shape.outlets())
2086        }
2087
2088        fn create_logic(&self, shape: &Self::Shape) -> GraphStageLogic {
2089            struct Handler {
2090                outlet: Outlet<i32>,
2091            }
2092
2093            impl InHandler for Handler {
2094                fn on_push(
2095                    &mut self,
2096                    logic: &mut GraphStageLogic,
2097                    _inlet: AnyInlet,
2098                ) -> StreamResult<()> {
2099                    logic.emit_multiple(&self.outlet, [1, 2])
2100                }
2101            }
2102
2103            let mut logic = GraphStageLogic::new(shape);
2104            logic
2105                .set_handler(
2106                    &shape.inlet(),
2107                    Box::new(Handler {
2108                        outlet: shape.outlet(),
2109                    }),
2110                )
2111                .unwrap();
2112            logic
2113        }
2114    }
2115
2116    impl GraphStage for ReadNThenFailOnFinish {
2117        type Shape = FlowShape<i32, i32>;
2118
2119        fn name(&self) -> &str {
2120            "ReadNThenFailOnFinish"
2121        }
2122
2123        fn allocate_shape(&self, _allocator: &mut PortAllocator) -> Self::Shape {
2124            let first_id = next_port_id_block(2);
2125            FlowShape::new(
2126                Inlet::with_id(first_id, "read-n.in"),
2127                Outlet::with_id(first_id.offset(1), "read-n.out"),
2128            )
2129        }
2130
2131        fn stage_spec(&self, shape: &Self::Shape) -> StageSpec {
2132            StageSpec::opaque(self.name(), shape.inlets(), shape.outlets())
2133        }
2134
2135        fn create_logic(&self, shape: &Self::Shape) -> GraphStageLogic {
2136            struct Handler {
2137                inlet: Inlet<i32>,
2138                armed: bool,
2139            }
2140
2141            impl InHandler for Handler {
2142                fn on_push(
2143                    &mut self,
2144                    logic: &mut GraphStageLogic,
2145                    _inlet: AnyInlet,
2146                ) -> StreamResult<()> {
2147                    if !self.armed {
2148                        self.armed = true;
2149                        logic.read_n(&self.inlet, 2, |_values| {}, |_values| {})
2150                    } else {
2151                        Ok(())
2152                    }
2153                }
2154
2155                fn on_upstream_finish(
2156                    &mut self,
2157                    _logic: &mut GraphStageLogic,
2158                    _inlet: AnyInlet,
2159                ) -> StreamResult<()> {
2160                    Err(StreamError::Failed("read_n finish boom".into()))
2161                }
2162            }
2163
2164            let mut logic = GraphStageLogic::new(shape);
2165            logic
2166                .set_handler(
2167                    &shape.inlet(),
2168                    Box::new(Handler {
2169                        inlet: shape.inlet(),
2170                        armed: false,
2171                    }),
2172                )
2173                .unwrap();
2174            logic
2175        }
2176    }
2177
2178    fn single_opaque_stage_graph<G>(stage: G) -> GraphBlueprint<FlowShape<i32, i32>>
2179    where
2180        G: GraphStage<Shape = FlowShape<i32, i32>>,
2181    {
2182        GraphDsl::create(|builder| builder.add(stage)).unwrap()
2183    }
2184
2185    #[test]
2186    fn process_push_restores_handler_before_emit_multiple_error_propagates() {
2187        let graph = single_opaque_stage_graph(EmitMultipleThenFailOnPush);
2188        let inlet = graph.shape.inlet().id();
2189        let mut executor = FusedExecutor::new(&graph, FusedExecutionConfig::default());
2190
2191        let result = executor.process_stage(0, inlet, datum(10));
2192
2193        assert!(matches!(
2194            result,
2195            Err(StreamError::Failed(message)) if message == "emit_multiple boom"
2196        ));
2197        assert!(
2198            executor.opaque_logics[0]
2199                .as_mut()
2200                .unwrap()
2201                .get_in_handler_mut(inlet)
2202                .is_some()
2203        );
2204    }
2205
2206    #[test]
2207    fn process_completion_restores_handler_before_read_n_finish_error_propagates() {
2208        let graph = single_opaque_stage_graph(ReadNThenFailOnFinish);
2209        let inlet = graph.shape.inlet().id();
2210        let mut executor = FusedExecutor::new(&graph, FusedExecutionConfig::default());
2211
2212        executor.process_stage(0, inlet, datum(1)).unwrap();
2213        executor.process_stage(0, inlet, datum(2)).unwrap();
2214        let result = executor.process_completion(0, inlet);
2215
2216        assert!(matches!(
2217            result,
2218            Err(StreamError::Failed(message)) if message == "read_n finish boom"
2219        ));
2220        assert!(
2221            executor.opaque_logics[0]
2222                .as_mut()
2223                .unwrap()
2224                .get_in_handler_mut(inlet)
2225                .is_some()
2226        );
2227    }
2228
2229    #[test]
2230    fn opaque_request_drives_out_handler_for_buffered_output() {
2231        let state = Arc::new(Mutex::new(BufferedFlowState::default()));
2232        let graph = single_opaque_stage_graph(BufferedFlowOnPull {
2233            state: Arc::clone(&state),
2234        });
2235        let inlet = graph.shape.inlet().id();
2236        let outlet = graph.shape.outlet().id();
2237        let mut executor = FusedExecutor::new(&graph, FusedExecutionConfig::default());
2238        let mut output = Vec::<i32>::new();
2239        let mut output_sink = VecOutputSink {
2240            output: &mut output,
2241        };
2242
2243        executor
2244            .deliver(inlet, datum(7_i32), outlet, &mut output_sink)
2245            .unwrap();
2246        assert!(output_sink.output.is_empty());
2247
2248        executor.request(outlet, outlet, &mut output_sink).unwrap();
2249
2250        assert_eq!(&*output_sink.output, &[7]);
2251        let state = state.lock().unwrap();
2252        assert_eq!(state.pull_calls, 2);
2253        assert_eq!(state.finish_calls, 0);
2254    }
2255
2256    #[test]
2257    fn opaque_downstream_finish_before_first_demand_invokes_out_handler() {
2258        let state = Arc::new(Mutex::new(BufferedFlowState::default()));
2259        let graph = single_opaque_stage_graph(BufferedFlowOnPull {
2260            state: Arc::clone(&state),
2261        });
2262        let outlet = graph.shape.outlet().id();
2263        let mut executor = FusedExecutor::new(&graph, FusedExecutionConfig::default());
2264        let mut output = Vec::<i32>::new();
2265        let mut output_sink = VecOutputSink {
2266            output: &mut output,
2267        };
2268
2269        executor
2270            .downstream_finish(outlet, outlet, &mut output_sink)
2271            .unwrap();
2272        executor.request(outlet, outlet, &mut output_sink).unwrap();
2273
2274        assert!(output_sink.output.is_empty());
2275        let state = state.lock().unwrap();
2276        assert_eq!(state.pull_calls, 0);
2277        assert_eq!(state.finish_calls, 1);
2278    }
2279
2280    #[test]
2281    fn opaque_downstream_finish_drops_buffered_output_after_upstream_complete() {
2282        let state = Arc::new(Mutex::new(BufferedFlowState::default()));
2283        let graph = single_opaque_stage_graph(BufferedFlowOnPull {
2284            state: Arc::clone(&state),
2285        });
2286        let inlet = graph.shape.inlet().id();
2287        let outlet = graph.shape.outlet().id();
2288        let mut executor = FusedExecutor::new(&graph, FusedExecutionConfig::default());
2289        let mut output = Vec::<i32>::new();
2290        let mut output_sink = VecOutputSink {
2291            output: &mut output,
2292        };
2293
2294        executor
2295            .deliver(inlet, datum(11_i32), outlet, &mut output_sink)
2296            .unwrap();
2297        executor.complete(inlet, outlet, &mut output_sink).unwrap();
2298        executor
2299            .downstream_finish(outlet, outlet, &mut output_sink)
2300            .unwrap();
2301        executor.request(outlet, outlet, &mut output_sink).unwrap();
2302
2303        assert!(output_sink.output.is_empty());
2304        let state = state.lock().unwrap();
2305        assert_eq!(state.finish_calls, 1);
2306    }
2307
2308    #[test]
2309    fn broadcast_cancels_upstream_only_after_all_outlets_cancel() {
2310        let graph = GraphDsl::try_create(|builder| {
2311            let broadcast = builder.add(Broadcast::<i32>::new(2));
2312            let merge = builder.add(Merge::<i32>::new(2));
2313            builder.connect(broadcast.outlet(0)?, merge.inlet(0)?)?;
2314            builder.connect(broadcast.outlet(1)?, merge.inlet(1)?)?;
2315            Ok(FlowShape::new(broadcast.inlet(), merge.outlet()))
2316        })
2317        .unwrap();
2318
2319        let mut executor = FusedExecutor::new(&graph, FusedExecutionConfig::default());
2320        let broadcast_index = *executor
2321            .stage_by_inlet
2322            .get(&graph.shape.inlet().id())
2323            .unwrap();
2324        let first = graph.stages[broadcast_index].spec.outlets[0].id();
2325        let second = graph.stages[broadcast_index].spec.outlets[1].id();
2326
2327        let first_transition = executor
2328            .process_downstream_finish(broadcast_index, first)
2329            .unwrap();
2330        assert!(first_transition.cancelled_inlets.is_empty());
2331
2332        let second_transition = executor
2333            .process_downstream_finish(broadcast_index, second)
2334            .unwrap();
2335        assert_eq!(
2336            second_transition.cancelled_inlets,
2337            vec![graph.stages[broadcast_index].spec.inlets[0].id()]
2338        );
2339    }
2340
2341    #[test]
2342    fn downstream_finish_propagates_through_merge_and_broadcast() {
2343        let graph = GraphDsl::try_create(|builder| {
2344            let broadcast = builder.add(Broadcast::<i32>::new(2));
2345            let merge = builder.add(Merge::<i32>::new(2));
2346            builder.connect(broadcast.outlet(0)?, merge.inlet(0)?)?;
2347            builder.connect(broadcast.outlet(1)?, merge.inlet(1)?)?;
2348            Ok(FlowShape::new(broadcast.inlet(), merge.outlet()))
2349        })
2350        .unwrap();
2351
2352        let outlet = graph.shape.outlet().id();
2353        let mut executor = FusedExecutor::new(&graph, FusedExecutionConfig::default());
2354        let mut output = Vec::<i32>::new();
2355        let mut output_sink = VecOutputSink {
2356            output: &mut output,
2357        };
2358
2359        executor
2360            .downstream_finish(outlet, outlet, &mut output_sink)
2361            .unwrap();
2362
2363        let broadcast_index = *executor
2364            .stage_by_inlet
2365            .get(&graph.shape.inlet().id())
2366            .unwrap();
2367        let StageState::Broadcast {
2368            live_outlets,
2369            cancelled_outlets,
2370            ..
2371        } = &executor.stage_states[broadcast_index]
2372        else {
2373            panic!("expected broadcast state");
2374        };
2375        assert_eq!(*live_outlets, 0);
2376        assert_eq!(cancelled_outlets, &vec![true, true]);
2377    }
2378
2379    #[test]
2380    fn partition_holds_routed_element_until_target_outlet_pulls() {
2381        let graph = GraphDsl::create(|builder| {
2382            builder.add(Partition::<i32>::new(2, |value| usize::from(*value >= 10)))
2383        })
2384        .unwrap();
2385
2386        let stage_index = 0usize;
2387        let inlet = graph.shape.inlet().id();
2388        let out0 = graph.shape.outlet(0).unwrap().id();
2389        let out1 = graph.shape.outlet(1).unwrap().id();
2390        let mut executor = FusedExecutor::new(&graph, FusedExecutionConfig::default());
2391
2392        executor.process_pull(stage_index, out0).unwrap();
2393        let transition = executor
2394            .process_stage(stage_index, inlet, datum(11_i32))
2395            .unwrap();
2396        assert!(matches!(transition.emissions, StageEmissions::None));
2397
2398        let pull_transition = executor.process_pull(stage_index, out1).unwrap();
2399        match pull_transition.emissions {
2400            StageEmissions::One(port, value) => {
2401                assert_eq!(port, out1);
2402                assert_eq!(
2403                    downcast_datum::<i32, _>(value, "emit", || "Partition.out1").unwrap(),
2404                    11
2405                );
2406            }
2407            _ => panic!("expected one pending partition emission"),
2408        }
2409    }
2410
2411    #[test]
2412    fn partition_cancels_upstream_only_after_all_outlets_cancel_when_not_eager() {
2413        let graph = GraphDsl::create(|builder| {
2414            builder.add(Partition::<i32>::new(2, |value| usize::from(*value >= 10)))
2415        })
2416        .unwrap();
2417
2418        let stage_index = 0usize;
2419        let out0 = graph.shape.outlet(0).unwrap().id();
2420        let out1 = graph.shape.outlet(1).unwrap().id();
2421        let mut executor = FusedExecutor::new(&graph, FusedExecutionConfig::default());
2422
2423        let first = executor
2424            .process_downstream_finish(stage_index, out0)
2425            .unwrap();
2426        assert!(first.cancelled_inlets.is_empty());
2427
2428        let second = executor
2429            .process_downstream_finish(stage_index, out1)
2430            .unwrap();
2431        assert_eq!(second.cancelled_inlets, vec![graph.shape.inlet().id()]);
2432    }
2433
2434    #[test]
2435    fn unzip_continues_emitting_to_live_outlet_after_peer_cancels() {
2436        let graph =
2437            GraphDsl::create(|builder| builder.add(Unzip::<i32, &'static str>::new())).unwrap();
2438
2439        let stage_index = 0usize;
2440        let inlet = graph.shape.inlet().id();
2441        let out0 = graph.shape.out0().id();
2442        let out1 = graph.shape.out1().id();
2443        let mut executor = FusedExecutor::new(&graph, FusedExecutionConfig::default());
2444
2445        executor.process_pull(stage_index, out0).unwrap();
2446        executor.process_pull(stage_index, out1).unwrap();
2447        let cancel = executor
2448            .process_downstream_finish(stage_index, out1)
2449            .unwrap();
2450        assert!(cancel.cancelled_inlets.is_empty());
2451
2452        let transition = executor
2453            .process_stage(stage_index, inlet, datum((7_i32, "seven")))
2454            .unwrap();
2455        match transition.emissions {
2456            StageEmissions::One(port, value) => {
2457                assert_eq!(port, out0);
2458                assert_eq!(
2459                    downcast_datum::<i32, _>(value, "emit", || "Unzip.out0").unwrap(),
2460                    7
2461                );
2462            }
2463            StageEmissions::Many(values) => {
2464                assert_eq!(values.len(), 1);
2465                assert_eq!(values[0].0, out0);
2466            }
2467            _ => panic!("expected emission to the remaining live unzip outlet"),
2468        }
2469    }
2470
2471    #[test]
2472    fn unzip_cancels_upstream_only_after_both_outlets_cancel() {
2473        let graph =
2474            GraphDsl::create(|builder| builder.add(Unzip::<i32, &'static str>::new())).unwrap();
2475
2476        let stage_index = 0usize;
2477        let out0 = graph.shape.out0().id();
2478        let out1 = graph.shape.out1().id();
2479        let mut executor = FusedExecutor::new(&graph, FusedExecutionConfig::default());
2480
2481        let first = executor
2482            .process_downstream_finish(stage_index, out0)
2483            .unwrap();
2484        assert!(first.cancelled_inlets.is_empty());
2485
2486        let second = executor
2487            .process_downstream_finish(stage_index, out1)
2488            .unwrap();
2489        assert_eq!(second.cancelled_inlets, vec![graph.shape.inlet().id()]);
2490    }
2491
2492    #[test]
2493    fn opaque_internal_outlet_repulls_after_first_emission() {
2494        let graph = GraphDsl::try_create(|builder| {
2495            let opaque = builder.add(EmitMultipleOnPush);
2496            let identity = builder.add(Identity::<i32>::new());
2497            builder.connect(opaque.outlet(), identity.inlet())?;
2498            Ok(FlowShape::new(opaque.inlet(), identity.outlet()))
2499        })
2500        .unwrap();
2501
2502        assert_eq!(graph.run_with_input([10]).unwrap(), vec![1, 2]);
2503    }
2504
2505    // -- Phase 0 (WP-18): ExecutorMode scaffolding tests -------------------
2506
2507    /// Verifies that `Auto` and `ErasedOnly` produce identical results on a
2508    /// representative junction graph (Broadcast→Merge) and that `TypedOnly`
2509    /// errors cleanly (junctions are not yet on the typed path).
2510    ///
2511    /// Graph topology:
2512    ///   in → Broadcast(2) → out0 → Merge(2).in0 → out
2513    ///                      → out1 → Merge(2).in1
2514    ///
2515    /// This exercises fan-out + fan-in junctions that cannot hit the typed
2516    /// linear fast path, so Auto falls back to the erased executor.
2517    #[test]
2518    fn executor_mode_auto_erased_identical_typed_errors() {
2519        let graph = GraphDsl::try_create(|builder| {
2520            let broadcast = builder.add(Broadcast::<i32>::new(2));
2521            let merge = builder.add(Merge::<i32>::new(2));
2522            builder.connect(broadcast.outlet(0)?, merge.inlet(0)?)?;
2523            builder.connect(broadcast.outlet(1)?, merge.inlet(1)?)?;
2524            Ok(FlowShape::new(broadcast.inlet(), merge.outlet()))
2525        })
2526        .unwrap();
2527
2528        let input = vec![1, 2, 3];
2529
2530        let auto_result = graph
2531            .run_with_input_mode(input.clone(), ExecutorMode::Auto)
2532            .unwrap();
2533        let erased_result = graph
2534            .run_with_input_mode(input.clone(), ExecutorMode::ErasedOnly)
2535            .unwrap();
2536
2537        // Auto and ErasedOnly must produce identical output (broadcast
2538        // duplicates each element onto both merge inlets; merge emits both).
2539        assert_eq!(auto_result, erased_result);
2540        // Each element is broadcast to both merge inlets, so output is 2× input.
2541        assert_eq!(auto_result.len(), input.len() * 2);
2542
2543        // TypedOnly must error because junctions are not on the typed path yet.
2544        let typed_result = graph.run_with_input_mode(input.clone(), ExecutorMode::TypedOnly);
2545        assert!(
2546            matches!(
2547                typed_result,
2548                Err(StreamError::GraphValidation(ref msg))
2549                if msg.contains("typed executor does not support this graph shape")
2550            ),
2551            "expected TypedOnly to error for junction graph, got: {typed_result:?}"
2552        );
2553    }
2554
2555    // -- Phase 2 (WP-18): typed-vs-erased equivalence tests ----------------
2556
2557    /// Builds an identity chain of `n` stages with the given type.
2558    fn identity_chain_bp(n: usize) -> GraphBlueprint<FlowShape<i64, i64>> {
2559        assert!(n >= 1);
2560        GraphDsl::try_create(|builder| {
2561            let first = builder.add(Identity::<i64>::new());
2562            let inlet = first.inlet();
2563            let mut outlet = first.outlet();
2564            for _ in 1..n {
2565                let next = builder.add(Identity::<i64>::new());
2566                builder.connect(outlet, next.inlet())?;
2567                outlet = next.outlet();
2568            }
2569            Ok(FlowShape::new(inlet, outlet))
2570        })
2571        .unwrap()
2572    }
2573
2574    /// Builds a map chain of `n` stages that double each value.
2575    fn map_chain_bp(n: usize) -> GraphBlueprint<FlowShape<i64, i64>> {
2576        assert!(n >= 1);
2577        GraphDsl::try_create(|builder| {
2578            let first = builder.add(MapStage::new(|x: i64| x.wrapping_mul(2)));
2579            let inlet = first.inlet();
2580            let mut outlet = first.outlet();
2581            for _ in 1..n {
2582                let next = builder.add(MapStage::new(|x: i64| x.wrapping_mul(2)));
2583                builder.connect(outlet, next.inlet())?;
2584                outlet = next.outlet();
2585            }
2586            Ok(FlowShape::new(inlet, outlet))
2587        })
2588        .unwrap()
2589    }
2590
2591    /// `ErasedOnly` and `TypedOnly` (via `run_with_input_mode`) produce
2592    /// identical output on a 5-stage identity graph.
2593    #[test]
2594    fn typed_erased_equivalence_identity_collect() {
2595        let graph = identity_chain_bp(5);
2596        let input: Vec<i64> = (0..20).collect();
2597
2598        let erased = graph
2599            .run_with_input_mode(input.clone(), ExecutorMode::ErasedOnly)
2600            .unwrap();
2601        let typed = graph
2602            .run_with_input_mode(input.clone(), ExecutorMode::TypedOnly)
2603            .unwrap();
2604
2605        assert_eq!(
2606            typed, erased,
2607            "typed and erased paths disagree on identity×5"
2608        );
2609    }
2610
2611    /// `ErasedOnly` and `TypedOnly` produce identical count on a 5-stage
2612    /// identity graph.
2613    #[test]
2614    fn typed_erased_equivalence_identity_count() {
2615        let graph = identity_chain_bp(5);
2616        let input: Vec<i64> = (0..20).collect();
2617        let config = FusedExecutionConfig::default();
2618
2619        let erased = graph
2620            .run_count_with_input_report_mode(input.clone(), config, ExecutorMode::ErasedOnly)
2621            .unwrap()
2622            .result;
2623        let typed = graph
2624            .run_count_with_input_report_mode(input.clone(), config, ExecutorMode::TypedOnly)
2625            .unwrap()
2626            .result;
2627
2628        assert_eq!(typed, erased, "typed and erased count differ on identity×5");
2629    }
2630
2631    /// `ErasedOnly` and `TypedOnly` produce identical fold result on a 5-stage
2632    /// map graph (each stage doubles the value).
2633    #[test]
2634    fn typed_erased_equivalence_map_fold() {
2635        let graph = map_chain_bp(5);
2636        let input: Vec<i64> = (1..=10).collect();
2637        let config = FusedExecutionConfig::default();
2638
2639        let erased = graph
2640            .run_fold_with_input_report_mode(
2641                input.clone(),
2642                0i64,
2643                |acc, x| acc.wrapping_add(x),
2644                config,
2645                ExecutorMode::ErasedOnly,
2646            )
2647            .unwrap()
2648            .result;
2649        let typed = graph
2650            .run_fold_with_input_report_mode(
2651                input.clone(),
2652                0i64,
2653                |acc, x| acc.wrapping_add(x),
2654                config,
2655                ExecutorMode::TypedOnly,
2656            )
2657            .unwrap()
2658            .result;
2659
2660        assert_eq!(typed, erased, "typed and erased fold differ on map×5");
2661    }
2662
2663    /// `ErasedOnly` and `TypedOnly` produce identical output on a 5-stage map
2664    /// graph.
2665    #[test]
2666    fn typed_erased_equivalence_map_collect() {
2667        let graph = map_chain_bp(5);
2668        let input: Vec<i64> = (0..20).collect();
2669
2670        let erased = graph
2671            .run_with_input_mode(input.clone(), ExecutorMode::ErasedOnly)
2672            .unwrap();
2673        let typed = graph
2674            .run_with_input_mode(input.clone(), ExecutorMode::TypedOnly)
2675            .unwrap();
2676
2677        assert_eq!(typed, erased, "typed and erased paths disagree on map×5");
2678    }
2679
2680    /// `TypedOnly` on a junction graph (Broadcast→Merge) returns a
2681    /// `GraphValidation` error (junctions not yet on the typed path).
2682    #[test]
2683    fn typed_only_errors_on_junction_graph() {
2684        let graph = GraphDsl::try_create(|builder| {
2685            let broadcast = builder.add(Broadcast::<i32>::new(2));
2686            let merge = builder.add(Merge::<i32>::new(2));
2687            builder.connect(broadcast.outlet(0)?, merge.inlet(0)?)?;
2688            builder.connect(broadcast.outlet(1)?, merge.inlet(1)?)?;
2689            Ok(FlowShape::new(broadcast.inlet(), merge.outlet()))
2690        })
2691        .unwrap();
2692
2693        let result = graph.run_with_input_mode(vec![1], ExecutorMode::TypedOnly);
2694        assert!(
2695            matches!(
2696                result,
2697                Err(StreamError::GraphValidation(ref msg))
2698                if msg.contains("typed executor does not support this graph shape")
2699            ),
2700            "expected TypedOnly to error on junction, got: {result:?}"
2701        );
2702    }
2703
2704    /// `Auto` on a junction graph falls back silently to the erased executor.
2705    #[test]
2706    fn auto_falls_back_silently_for_junction_graph() {
2707        let graph = GraphDsl::try_create(|builder| {
2708            let broadcast = builder.add(Broadcast::<i32>::new(2));
2709            let merge = builder.add(Merge::<i32>::new(2));
2710            builder.connect(broadcast.outlet(0)?, merge.inlet(0)?)?;
2711            builder.connect(broadcast.outlet(1)?, merge.inlet(1)?)?;
2712            Ok(FlowShape::new(broadcast.inlet(), merge.outlet()))
2713        })
2714        .unwrap();
2715
2716        let result = graph.run_with_input_mode(vec![1, 2, 3], ExecutorMode::Auto);
2717        assert!(result.is_ok(), "Auto should succeed (fallback to erased)");
2718        assert_eq!(result.unwrap().len(), 6); // broadcast × 2
2719    }
2720
2721    // -- Phase 3a (WP-18): typed MergeSequence equivalence tests ------------
2722
2723    /// Build the Unzip → MergeSequence graph used in the benchmark.
2724    fn merge_sequence_graph() -> GraphBlueprint<FlowShape<(u64, u64), u64>> {
2725        GraphDsl::try_create(|builder| {
2726            let unzip = builder.add(Unzip::<u64, u64>::new());
2727            let merge = builder.add(MergeSequence::<u64>::new(2, |item| *item));
2728            builder.connect(unzip.out0(), merge.inlet(0)?)?;
2729            builder.connect(unzip.out1(), merge.inlet(1)?)?;
2730            Ok(FlowShape::new(unzip.inlet(), merge.outlet()))
2731        })
2732        .unwrap()
2733    }
2734
2735    /// `TypedOnly` accepts the Unzip → MergeSequence topology (does not
2736    /// error with "typed executor does not support this graph shape").
2737    #[test]
2738    fn typed_only_accepts_merge_sequence_topology() {
2739        let graph = merge_sequence_graph();
2740        let result =
2741            graph.run_with_input_mode(vec![(0u64, 1u64), (2u64, 3u64)], ExecutorMode::TypedOnly);
2742        assert!(
2743            result.is_ok(),
2744            "TypedOnly should accept Unzip→MergeSequence topology, got: {result:?}"
2745        );
2746    }
2747
2748    /// `ErasedOnly` and `TypedOnly` produce identical output on an in-order
2749    /// Unzip → MergeSequence graph.
2750    #[test]
2751    fn typed_erased_equivalence_merge_sequence_in_order() {
2752        let graph = merge_sequence_graph();
2753        let input: Vec<(u64, u64)> = (0..10).step_by(2).map(|i| (i, i + 1)).collect();
2754
2755        let erased = graph
2756            .run_with_input_mode(input.clone(), ExecutorMode::ErasedOnly)
2757            .unwrap();
2758        let typed = graph
2759            .run_with_input_mode(input.clone(), ExecutorMode::TypedOnly)
2760            .unwrap();
2761
2762        assert_eq!(
2763            typed, erased,
2764            "typed and erased disagree on in-order merge_sequence"
2765        );
2766        // Verify the output is actually sorted.
2767        let expected: Vec<u64> = (0..10).collect();
2768        assert_eq!(typed, expected);
2769    }
2770
2771    /// `ErasedOnly` and `TypedOnly` produce identical output on an adversarial
2772    /// (out-of-order) Unzip → MergeSequence graph where pairs are (even, odd)
2773    /// with `even > odd - 1`, forcing out-of-order arrivals.
2774    #[test]
2775    fn typed_erased_equivalence_merge_sequence_out_of_order() {
2776        let graph = merge_sequence_graph();
2777        // Pairs (1,0), (3,2), (5,4): out0 gets larger sequence, out1 smaller.
2778        let input: Vec<(u64, u64)> = vec![(1, 0), (3, 2), (5, 4)];
2779
2780        let erased = graph
2781            .run_with_input_mode(input.clone(), ExecutorMode::ErasedOnly)
2782            .unwrap();
2783        let typed = graph
2784            .run_with_input_mode(input.clone(), ExecutorMode::TypedOnly)
2785            .unwrap();
2786
2787        assert_eq!(
2788            typed, erased,
2789            "typed and erased disagree on out-of-order merge_sequence"
2790        );
2791        // Output should be sorted.
2792        assert_eq!(typed, vec![0u64, 1, 2, 3, 4, 5]);
2793    }
2794
2795    /// Both `ErasedOnly` and `TypedOnly` fail with a sequence-gap error when
2796    /// the pending items can never be resolved at completion.
2797    ///
2798    /// This is the #78 regression: `merge_sequence_fails_on_gap_at_completion`.
2799    #[test]
2800    fn typed_erased_equivalence_merge_sequence_gap_failure() {
2801        // Sequences 1 and 2 arrive (via the unzip split of pair (1, 2)), but
2802        // sequence 0 never does.  Both paths must return an error.
2803        let graph = merge_sequence_graph();
2804        let input = vec![(1u64, 2u64)];
2805
2806        let erased = graph.run_with_input_mode(input.clone(), ExecutorMode::ErasedOnly);
2807        let typed = graph.run_with_input_mode(input.clone(), ExecutorMode::TypedOnly);
2808
2809        assert!(
2810            matches!(&erased, Err(StreamError::Failed(msg)) if msg.contains("expected sequence")),
2811            "ErasedOnly should fail on gap: {erased:?}"
2812        );
2813        assert!(
2814            matches!(&typed, Err(StreamError::Failed(msg)) if msg.contains("expected sequence")),
2815            "TypedOnly should fail on gap: {typed:?}"
2816        );
2817    }
2818
2819    /// `ErasedOnly` and `TypedOnly` both complete cleanly with a single-item
2820    /// in-order input.
2821    #[test]
2822    fn typed_erased_equivalence_merge_sequence_completion() {
2823        let graph = merge_sequence_graph();
2824        let input = vec![(0u64, 1u64)];
2825
2826        let erased = graph
2827            .run_with_input_mode(input.clone(), ExecutorMode::ErasedOnly)
2828            .unwrap();
2829        let typed = graph
2830            .run_with_input_mode(input.clone(), ExecutorMode::TypedOnly)
2831            .unwrap();
2832
2833        assert_eq!(typed, erased, "typed and erased disagree on completion");
2834        assert_eq!(typed, vec![0u64, 1u64]);
2835    }
2836
2837    /// `Auto` selects the typed path for the Unzip → MergeSequence topology
2838    /// and produces the same result as `ErasedOnly`.
2839    #[test]
2840    fn auto_selects_typed_for_merge_sequence_topology() {
2841        let graph = merge_sequence_graph();
2842        let input: Vec<(u64, u64)> = (0..20).step_by(2).map(|i| (i, i + 1)).collect();
2843
2844        let auto_result = graph
2845            .run_with_input_mode(input.clone(), ExecutorMode::Auto)
2846            .unwrap();
2847        let erased_result = graph
2848            .run_with_input_mode(input.clone(), ExecutorMode::ErasedOnly)
2849            .unwrap();
2850
2851        assert_eq!(
2852            auto_result, erased_result,
2853            "Auto and ErasedOnly disagree on merge_sequence topology"
2854        );
2855    }
2856
2857    // -- Phase 3b (WP-18): typed MergeLatest equivalence tests ---------------
2858
2859    /// Build the Unzip → MergeLatest graph used in the benchmark.
2860    fn merge_latest_graph_exec() -> GraphBlueprint<FlowShape<(u64, u64), Vec<u64>>> {
2861        GraphDsl::try_create(|builder| {
2862            let unzip = builder.add(Unzip::<u64, u64>::new());
2863            let merge = builder.add(MergeLatest::<u64>::new(2, false));
2864            builder.connect(unzip.out0(), merge.inlet(0)?)?;
2865            builder.connect(unzip.out1(), merge.inlet(1)?)?;
2866            Ok(FlowShape::new(unzip.inlet(), merge.outlet()))
2867        })
2868        .unwrap()
2869    }
2870
2871    /// Build a Unzip → MergeLatest graph with `eager_complete = true`.
2872    fn merge_latest_eager_graph_exec() -> GraphBlueprint<FlowShape<(i32, i32), Vec<i32>>> {
2873        GraphDsl::try_create(|builder| {
2874            let unzip = builder.add(Unzip::<i32, i32>::new());
2875            let merge = builder.add(MergeLatest::<i32>::new(2, true));
2876            builder.connect(unzip.out0(), merge.inlet(0)?)?;
2877            builder.connect(unzip.out1(), merge.inlet(1)?)?;
2878            Ok(FlowShape::new(unzip.inlet(), merge.outlet()))
2879        })
2880        .unwrap()
2881    }
2882
2883    /// `TypedOnly` accepts the Unzip → MergeLatest topology.
2884    #[test]
2885    fn typed_only_accepts_merge_latest_topology() {
2886        let graph = merge_latest_graph_exec();
2887        let result =
2888            graph.run_with_input_mode(vec![(0u64, 1u64), (2u64, 3u64)], ExecutorMode::TypedOnly);
2889        assert!(
2890            result.is_ok(),
2891            "TypedOnly should accept Unzip→MergeLatest topology, got: {result:?}"
2892        );
2893    }
2894
2895    /// `ErasedOnly` and `TypedOnly` produce identical latest-snapshot sequences.
2896    #[test]
2897    fn typed_erased_equivalence_merge_latest_snapshot_ordering() {
2898        let graph = merge_latest_graph_exec();
2899        // Each pair (a, b) updates both inlets; after the first pair all inlets
2900        // are seen and every subsequent pair also produces a snapshot.
2901        let input: Vec<(u64, u64)> = (0..10).map(|i| (i, i + 100)).collect();
2902
2903        let erased = graph
2904            .run_with_input_mode(input.clone(), ExecutorMode::ErasedOnly)
2905            .unwrap();
2906        let typed = graph
2907            .run_with_input_mode(input.clone(), ExecutorMode::TypedOnly)
2908            .unwrap();
2909
2910        assert_eq!(
2911            typed, erased,
2912            "typed and erased disagree on snapshot ordering"
2913        );
2914        // All snapshots should have length 2 (one entry per inlet).
2915        assert!(
2916            typed.iter().all(|s| s.len() == 2),
2917            "snapshots must have len 2"
2918        );
2919    }
2920
2921    /// Partial-fill: first item sees only 1 of 2 inlets; no snapshot until both seen.
2922    #[test]
2923    fn typed_erased_equivalence_merge_latest_partial_fill() {
2924        // With a single input pair, both inlets get their first value simultaneously
2925        // (Unzip splits into two values), so exactly one snapshot is produced.
2926        let graph = merge_latest_graph_exec();
2927        let input = vec![(5u64, 42u64)];
2928
2929        let erased = graph
2930            .run_with_input_mode(input.clone(), ExecutorMode::ErasedOnly)
2931            .unwrap();
2932        let typed = graph
2933            .run_with_input_mode(input.clone(), ExecutorMode::TypedOnly)
2934            .unwrap();
2935
2936        assert_eq!(typed, erased, "typed and erased disagree on partial-fill");
2937        // Exactly one snapshot, containing [5, 42] in inlet order.
2938        assert_eq!(typed.len(), 1, "expected exactly one snapshot");
2939    }
2940
2941    /// Eager-complete: #78 regression must pass in BOTH `ErasedOnly` and `TypedOnly`.
2942    ///
2943    /// With `eager_complete = true`, the graph should complete as soon as any
2944    /// inlet finishes.  Since the Unzip stage completes all outlets simultaneously,
2945    /// both paths must produce the same result.
2946    #[test]
2947    fn typed_erased_equivalence_merge_latest_eager_complete() {
2948        let graph_eager = merge_latest_eager_graph_exec();
2949        let input = vec![(1i32, 10i32)];
2950
2951        let erased = graph_eager
2952            .run_with_input_mode(input.clone(), ExecutorMode::ErasedOnly)
2953            .unwrap();
2954        let typed = graph_eager
2955            .run_with_input_mode(input.clone(), ExecutorMode::TypedOnly)
2956            .unwrap();
2957
2958        assert_eq!(
2959            typed, erased,
2960            "typed and erased disagree on eager-complete behavior"
2961        );
2962        assert!(
2963            !typed.is_empty(),
2964            "eager-complete graph should produce at least one snapshot"
2965        );
2966    }
2967
2968    /// Completion: a single pair produces one snapshot; both paths complete cleanly.
2969    #[test]
2970    fn typed_erased_equivalence_merge_latest_completion() {
2971        let graph = merge_latest_graph_exec();
2972        let input = vec![(0u64, 1u64)];
2973
2974        let erased = graph
2975            .run_with_input_mode(input.clone(), ExecutorMode::ErasedOnly)
2976            .unwrap();
2977        let typed = graph
2978            .run_with_input_mode(input.clone(), ExecutorMode::TypedOnly)
2979            .unwrap();
2980
2981        assert_eq!(typed, erased, "typed and erased disagree on completion");
2982    }
2983
2984    /// `Auto` selects the typed path for the Unzip → MergeLatest topology and
2985    /// produces the same result as `ErasedOnly`.
2986    #[test]
2987    fn auto_selects_typed_for_merge_latest_topology() {
2988        let graph = merge_latest_graph_exec();
2989        let input: Vec<(u64, u64)> = (0..20).map(|i| (i, i + 1_000)).collect();
2990
2991        let auto_result = graph
2992            .run_with_input_mode(input.clone(), ExecutorMode::Auto)
2993            .unwrap();
2994        let erased_result = graph
2995            .run_with_input_mode(input.clone(), ExecutorMode::ErasedOnly)
2996            .unwrap();
2997
2998        assert_eq!(
2999            auto_result, erased_result,
3000            "Auto and ErasedOnly disagree on merge_latest topology"
3001        );
3002    }
3003
3004    // -- Phase 3b blueprint-independence tests --------------------------------
3005
3006    /// Running the same MergeLatest blueprint twice (sequentially) produces
3007    /// independent, correct results — no shared mutable state between runs.
3008    ///
3009    /// Pins the fix that removed the `TypedPlanCache` / `MergeLatestRunnerCell`
3010    /// (which serialised concurrent reuse on a `Mutex` and shared one execution
3011    /// core across runs).  The typed path must build a fresh `MergeLatestCore`
3012    /// per call to `run_with_input_report_mode`.
3013    #[test]
3014    fn merge_latest_blueprint_sequential_reuse_is_independent() {
3015        let graph = merge_latest_graph_exec();
3016        let input_a: Vec<(u64, u64)> = (0..5).map(|i| (i, i + 100)).collect();
3017        let input_b: Vec<(u64, u64)> = (10..15).map(|i| (i, i + 200)).collect();
3018
3019        // Run the same blueprint with two different inputs.
3020        let result_a_typed = graph
3021            .run_with_input_mode(input_a.clone(), ExecutorMode::TypedOnly)
3022            .unwrap();
3023        let result_b_typed = graph
3024            .run_with_input_mode(input_b.clone(), ExecutorMode::TypedOnly)
3025            .unwrap();
3026
3027        // Each run should match its own erased reference.
3028        let result_a_erased = graph
3029            .run_with_input_mode(input_a, ExecutorMode::ErasedOnly)
3030            .unwrap();
3031        let result_b_erased = graph
3032            .run_with_input_mode(input_b, ExecutorMode::ErasedOnly)
3033            .unwrap();
3034
3035        assert_eq!(
3036            result_a_typed, result_a_erased,
3037            "sequential run A: typed and erased disagree"
3038        );
3039        assert_eq!(
3040            result_b_typed, result_b_erased,
3041            "sequential run B: typed and erased disagree"
3042        );
3043        // The two runs must NOT share state — run B must not contain run A's values.
3044        assert_ne!(
3045            result_a_typed, result_b_typed,
3046            "runs A and B should differ (different inputs)"
3047        );
3048    }
3049
3050    /// Running the same MergeLatest blueprint concurrently from two threads
3051    /// produces independent, correct results — no Mutex serialisation, no
3052    /// shared execution state.
3053    ///
3054    /// This is the concurrency variant of `merge_latest_blueprint_sequential_reuse_is_independent`.
3055    #[test]
3056    fn merge_latest_blueprint_concurrent_reuse_is_independent() {
3057        use std::sync::Arc as StdArc;
3058
3059        // Wrap in Arc so both threads can share the (immutable) blueprint.
3060        let graph = StdArc::new(merge_latest_graph_exec());
3061
3062        let input_a: Vec<(u64, u64)> = (0..50).map(|i| (i, i + 1_000)).collect();
3063        let input_b: Vec<(u64, u64)> = (100..150).map(|i| (i, i + 2_000)).collect();
3064
3065        let graph_a = StdArc::clone(&graph);
3066        let graph_b = StdArc::clone(&graph);
3067        let ia = input_a.clone();
3068        let ib = input_b.clone();
3069
3070        let handle_a =
3071            std::thread::spawn(move || graph_a.run_with_input_mode(ia, ExecutorMode::TypedOnly));
3072        let handle_b =
3073            std::thread::spawn(move || graph_b.run_with_input_mode(ib, ExecutorMode::TypedOnly));
3074
3075        let result_a = handle_a.join().expect("thread A panicked").unwrap();
3076        let result_b = handle_b.join().expect("thread B panicked").unwrap();
3077
3078        // Reference results from the erased executor.
3079        let ref_a = graph
3080            .run_with_input_mode(input_a, ExecutorMode::ErasedOnly)
3081            .unwrap();
3082        let ref_b = graph
3083            .run_with_input_mode(input_b, ExecutorMode::ErasedOnly)
3084            .unwrap();
3085
3086        assert_eq!(
3087            result_a, ref_a,
3088            "concurrent run A: typed and erased disagree"
3089        );
3090        assert_eq!(
3091            result_b, ref_b,
3092            "concurrent run B: typed and erased disagree"
3093        );
3094        // Sanity: the two runs must have produced different outputs.
3095        assert_ne!(result_a, result_b, "concurrent runs must be independent");
3096    }
3097}
3098
/// Message received from the channels used by the threaded async-linear
/// executor (see `run_threaded_async_linear_count`, which reads these from the
/// final channel).
enum AsyncLinearMessage<T> {
    /// One stream element; the counting driver adds one per `Item` received.
    Item(T),
    /// End of stream; the counting driver stops receiving without an error.
    Done,
    /// Stream failure; the counting driver stops receiving and keeps the
    /// carried error as the terminal error.
    Failed(StreamError),
}
3104
/// Command type for the ractor boundary; `Run` is its only variant.
///
/// NOTE(review): presumably sent to a boundary actor to start execution —
/// confirm against the actor implementation, which is not visible here.
enum RactorBoundaryCommand {
    Run,
}

// The `ractor::Message` marker impl exists only when the `cluster` feature is
// enabled.
#[cfg(feature = "cluster")]
impl ractor::Message for RactorBoundaryCommand {}
3111
/// Test-only driver: runs `input` through `segments` with one scoped thread
/// per segment (plus one feeder thread) and counts the items that arrive on
/// the final channel.
///
/// Segments are linked by bounded `mpsc::sync_channel`s of capacity
/// `config.buffer_size`: channel `i` feeds segment `i`, and the extra last
/// channel is read by the calling thread.
///
/// # Returns
///
/// A `FusedTerminalReport` whose `result` is the number of `Item` messages
/// received, with `events` and `async_boundary_crossings` read from the
/// atomic counters shared with the segment workers.
///
/// # Errors
///
/// * The error carried by a `Failed` message on the final channel.
/// * An error reported while joining the feeder or a segment worker.
/// * `StreamError::AbruptTermination` when the final channel disconnects
///   before `Done` and no other error was reported.
#[cfg(test)]
fn run_threaded_async_linear_count<I, T>(
    input: I,
    segments: TypedLinearSegments<T>,
    config: AsyncBoundaryExecutionConfig,
) -> StreamResult<FusedTerminalReport<usize>>
where
    I: IntoIterator<Item = T> + Send,
    I::IntoIter: Send,
    T: Send + 'static,
{
    // One channel in front of every segment, plus one after the last segment.
    let channels = segments.segments.len() + 1;
    let mut senders = Vec::with_capacity(channels);
    let mut receivers = Vec::with_capacity(channels);
    for _ in 0..channels {
        let (sender, receiver) = mpsc::sync_channel(config.buffer_size);
        senders.push(sender);
        // Wrapped in `Option` so each receiver can be moved out exactly once.
        receivers.push(Some(receiver));
    }

    // The feeder thread writes into the first channel.
    let first_sender = senders
        .first()
        .expect("at least one async segment channel")
        .clone();
    // The calling thread reads from the last channel. Kept in an `Option` so
    // it can be moved out inside the scope closure below.
    let mut final_receiver = Some(
        receivers
            .last_mut()
            .expect("at least one async segment channel")
            .take()
            .expect("final receiver is present"),
    );
    // Counters shared by reference with every segment worker.
    let events = AtomicUsize::new(0);
    let async_boundary_crossings = AtomicUsize::new(0);

    let result = thread::scope(|scope| {
        let source = scope.spawn(move || feed_threaded_async_linear_input(input, first_sender));
        let mut workers = Vec::with_capacity(segments.segments.len());

        for (index, steps) in segments.segments.iter().enumerate() {
            // Segment `index` reads channel `index` and writes channel `index + 1`.
            let input = receivers[index].take().expect("worker receiver is present");
            let output = senders[index + 1].clone();
            // Every segment except the last is followed by another segment.
            let has_boundary_after = index + 1 < segments.segments.len();
            let events = &events;
            let async_boundary_crossings = &async_boundary_crossings;
            workers.push(scope.spawn(move || {
                run_threaded_async_linear_segment(
                    steps,
                    input,
                    output,
                    has_boundary_after,
                    events,
                    async_boundary_crossings,
                    config,
                )
            }));
        }
        // Drop this thread's sender handles; only the clones moved into the
        // feeder and worker threads remain, so a channel disconnects once the
        // thread writing to it has finished.
        drop(senders);

        let final_rx = final_receiver.take().expect("final receiver present");
        let mut count = 0;
        let mut terminal_error = None;
        loop {
            match final_rx.recv() {
                Ok(AsyncLinearMessage::Item(_)) => count += 1,
                Ok(AsyncLinearMessage::Done) => break,
                Ok(AsyncLinearMessage::Failed(error)) => {
                    terminal_error = Some(error);
                    break;
                }
                // Every sender is gone without a `Done` or `Failed` message.
                Err(_) => {
                    terminal_error = Some(StreamError::AbruptTermination);
                    break;
                }
            }
        }
        // NOTE(review): presumably dropped before joining so that a worker
        // still blocked on sending to the final channel gets an error instead
        // of waiting forever — confirm against the segment worker.
        drop(final_rx);

        // Join the feeder first, then every worker. Only the first worker
        // error is kept; later workers are still joined, results ignored.
        let mut worker_error = join_threaded_async_linear_worker(source)?;
        for worker in workers {
            if worker_error.is_none() {
                worker_error = join_threaded_async_linear_worker(worker)?;
            } else {
                let _ = join_threaded_async_linear_worker(worker);
            }
        }

        // Error precedence: a terminal error other than `AbruptTermination`
        // wins, then a worker error, then a bare `AbruptTermination`.
        match (terminal_error, worker_error) {
            (Some(error), _) if error != StreamError::AbruptTermination => return Err(error),
            (_, Some(error)) => return Err(error),
            (Some(error), None) => return Err(error),
            (None, None) => {}
        }

        Ok(count)
    });

    Ok(FusedTerminalReport {
        result: result?,
        events: events.load(Ordering::Relaxed),
        async_boundary_crossings: async_boundary_crossings.load(Ordering::Relaxed),
    })
}
3214
3215fn run_ractor_async_linear_count<I, T>(
3216    input: I,
3217    segments: TypedLinearSegments<T>,
3218    config: AsyncBoundaryExecutionConfig,
3219) -> StreamResult<FusedTerminalReport<usize>>
3220where
3221    I: IntoIterator<Item = T> + Send,
3222    I::IntoIter: Send + 'static,
3223    T: Send + 'static,
3224{
3225    if config.buffer_size == 0 {
3226        return Err(StreamError::GraphValidation(
3227            "ractor async boundary execution requires buffer_size greater than zero".into(),
3228        ));
3229    }
3230
3231    let input = input.into_iter();
3232    let runtime = ractor_boundary_runtime()?;
3233    if tokio::runtime::Handle::try_current().is_ok() {
3234        thread::scope(|scope| {
3235            let handle = scope.spawn(move || {
3236                runtime.block_on(run_ractor_async_linear_count_on_runtime(
3237                    input, segments, config,
3238                ))
3239            });
3240            handle.join().map_err(|_| {
3241                StreamError::Failed("ractor async boundary runtime thread panicked".into())
3242            })?
3243        })
3244    } else {
3245        runtime.block_on(run_ractor_async_linear_count_on_runtime(
3246            input, segments, config,
3247        ))
3248    }
3249}
3250
3251fn ractor_boundary_runtime() -> StreamResult<&'static tokio::runtime::Runtime> {
3252    static RUNTIME: OnceLock<Result<tokio::runtime::Runtime, String>> = OnceLock::new();
3253
3254    match RUNTIME.get_or_init(|| {
3255        tokio::runtime::Builder::new_multi_thread()
3256            .build()
3257            .map_err(|error| format!("ractor async boundary runtime failed to start: {error}"))
3258    }) {
3259        Ok(runtime) => Ok(runtime),
3260        Err(error) => Err(StreamError::Failed(error.clone())),
3261    }
3262}
3263
3264async fn run_ractor_async_linear_count_on_runtime<I, T>(
3265    input: I,
3266    segments: TypedLinearSegments<T>,
3267    config: AsyncBoundaryExecutionConfig,
3268) -> StreamResult<FusedTerminalReport<usize>>
3269where
3270    I: Iterator<Item = T> + Send + 'static,
3271    T: Send + 'static,
3272{
3273    let channels = segments.segments.len() + 1;
3274    let mut senders = Vec::with_capacity(channels);
3275    let mut receivers = Vec::with_capacity(channels);
3276    for _ in 0..channels {
3277        let (sender, receiver) = ractor::concurrency::mpsc_bounded(config.buffer_size);
3278        senders.push(sender);
3279        receivers.push(Some(receiver));
3280    }
3281
3282    let first_sender = senders
3283        .first()
3284        .expect("at least one async segment channel")
3285        .clone();
3286    let mut final_receiver = receivers
3287        .last_mut()
3288        .expect("at least one async segment channel")
3289        .take()
3290        .expect("final receiver is present");
3291    let events = Arc::new(AtomicUsize::new(0));
3292    let async_boundary_crossings = Arc::new(AtomicUsize::new(0));
3293
3294    let (source_ref, source_handle) = Actor::spawn(
3295        None,
3296        RactorLinearSourceActor::<I, T>::new(),
3297        RactorLinearSourceState {
3298            input: Some(input),
3299            output: first_sender,
3300        },
3301    )
3302    .await
3303    .map_err(ractor_spawn_error)?;
3304
3305    let mut actors = Vec::with_capacity(segments.segments.len() + 1);
3306    actors.push((source_ref, source_handle));
3307
3308    for (index, steps) in segments.segments.into_iter().enumerate() {
3309        let input = receivers[index].take().expect("worker receiver is present");
3310        let output = senders[index + 1].clone();
3311        let has_boundary_after = index + 1 < channels - 1;
3312        let (worker_ref, worker_handle) = match Actor::spawn(
3313            None,
3314            RactorLinearSegmentActor::<T>::new(),
3315            RactorLinearSegmentState {
3316                steps,
3317                input,
3318                output,
3319                has_boundary_after,
3320                events: Arc::clone(&events),
3321                async_boundary_crossings: Arc::clone(&async_boundary_crossings),
3322                config,
3323            },
3324        )
3325        .await
3326        {
3327            Ok(actor) => actor,
3328            Err(error) => {
3329                let error = ractor_spawn_error(error);
3330                stop_ractor_async_linear_actors(&actors);
3331                let _ = join_ractor_async_linear_actors(actors).await;
3332                return Err(error);
3333            }
3334        };
3335        actors.push((worker_ref, worker_handle));
3336    }
3337    drop(senders);
3338
3339    let mut start_error = None;
3340    for (actor, _) in &actors {
3341        if actor.send_message(RactorBoundaryCommand::Run).is_err() {
3342            start_error = Some(StreamError::AbruptTermination);
3343            break;
3344        }
3345    }
3346    if let Some(error) = start_error {
3347        stop_ractor_async_linear_actors(&actors);
3348        let _ = join_ractor_async_linear_actors(actors).await;
3349        return Err(error);
3350    }
3351
3352    let mut count = 0;
3353    let mut terminal_error = None;
3354    loop {
3355        match final_receiver.recv().await {
3356            Some(AsyncLinearMessage::Item(_)) => count += 1,
3357            Some(AsyncLinearMessage::Done) => break,
3358            Some(AsyncLinearMessage::Failed(error)) => {
3359                terminal_error = Some(error);
3360                break;
3361            }
3362            None => {
3363                terminal_error = Some(StreamError::AbruptTermination);
3364                break;
3365            }
3366        }
3367    }
3368    drop(final_receiver);
3369
3370    stop_ractor_async_linear_actors(&actors);
3371    let actor_error = join_ractor_async_linear_actors(actors).await;
3372
3373    match (terminal_error, actor_error) {
3374        (Some(error), _) if error != StreamError::AbruptTermination => return Err(error),
3375        (_, Some(error)) => return Err(error),
3376        (Some(error), None) => return Err(error),
3377        (None, None) => {}
3378    }
3379
3380    Ok(FusedTerminalReport {
3381        result: count,
3382        events: events.load(Ordering::Relaxed),
3383        async_boundary_crossings: async_boundary_crossings.load(Ordering::Relaxed),
3384    })
3385}
3386
/// Stateless actor type for the source of a ractor linear pipeline.
///
/// The struct stores no `I` or `T` value; the marker only ties the actor to
/// its input iterator type `I` and item type `T`.
struct RactorLinearSourceActor<I, T> {
    _marker: PhantomData<fn() -> (I, T)>,
}

impl<I, T> RactorLinearSourceActor<I, T> {
    /// Creates the (zero-field) source actor value.
    fn new() -> Self {
        let _marker = PhantomData;
        Self { _marker }
    }
}
3398
3399struct RactorLinearSourceState<I, T> {
3400    input: Option<I>,
3401    output: ractor::concurrency::MpscSender<AsyncLinearMessage<T>>,
3402}
3403
3404impl<I, T> Actor for RactorLinearSourceActor<I, T>
3405where
3406    I: Iterator<Item = T> + Send + 'static,
3407    T: Send + 'static,
3408{
3409    type Msg = RactorBoundaryCommand;
3410    type State = RactorLinearSourceState<I, T>;
3411    type Arguments = RactorLinearSourceState<I, T>;
3412
3413    async fn pre_start(
3414        &self,
3415        _myself: ActorRef<Self::Msg>,
3416        args: Self::Arguments,
3417    ) -> Result<Self::State, ActorProcessingErr> {
3418        Ok(args)
3419    }
3420
3421    async fn handle(
3422        &self,
3423        myself: ActorRef<Self::Msg>,
3424        message: Self::Msg,
3425        state: &mut Self::State,
3426    ) -> Result<(), ActorProcessingErr> {
3427        match message {
3428            RactorBoundaryCommand::Run => {
3429                let input = state.input.take().ok_or_else(|| {
3430                    actor_processing_error(StreamError::GraphValidation(
3431                        "ractor async boundary source actor was run more than once".into(),
3432                    ))
3433                })?;
3434                feed_ractor_async_linear_input(input, &state.output)
3435                    .await
3436                    .map_err(actor_processing_error)?;
3437                myself.stop(None);
3438            }
3439        }
3440        Ok(())
3441    }
3442}
3443
/// Stateless actor type for one segment of a ractor linear pipeline.
///
/// The struct stores no `T` value; the marker only ties the actor to the item
/// type it processes.
struct RactorLinearSegmentActor<T> {
    _marker: PhantomData<fn() -> T>,
}

impl<T> RactorLinearSegmentActor<T> {
    /// Creates the (zero-field) segment actor value.
    fn new() -> Self {
        let _marker = PhantomData;
        Self { _marker }
    }
}
3455
3456struct RactorLinearSegmentState<T> {
3457    steps: Vec<TypedLinearStep<T>>,
3458    input: ractor::concurrency::MpscReceiver<AsyncLinearMessage<T>>,
3459    output: ractor::concurrency::MpscSender<AsyncLinearMessage<T>>,
3460    has_boundary_after: bool,
3461    events: Arc<AtomicUsize>,
3462    async_boundary_crossings: Arc<AtomicUsize>,
3463    config: AsyncBoundaryExecutionConfig,
3464}
3465
3466impl<T> Actor for RactorLinearSegmentActor<T>
3467where
3468    T: Send + 'static,
3469{
3470    type Msg = RactorBoundaryCommand;
3471    type State = RactorLinearSegmentState<T>;
3472    type Arguments = RactorLinearSegmentState<T>;
3473
3474    async fn pre_start(
3475        &self,
3476        _myself: ActorRef<Self::Msg>,
3477        args: Self::Arguments,
3478    ) -> Result<Self::State, ActorProcessingErr> {
3479        Ok(args)
3480    }
3481
3482    async fn handle(
3483        &self,
3484        myself: ActorRef<Self::Msg>,
3485        message: Self::Msg,
3486        state: &mut Self::State,
3487    ) -> Result<(), ActorProcessingErr> {
3488        match message {
3489            RactorBoundaryCommand::Run => {
3490                run_ractor_async_linear_segment(state)
3491                    .await
3492                    .map_err(actor_processing_error)?;
3493                myself.stop(None);
3494            }
3495        }
3496        Ok(())
3497    }
3498}
3499
3500async fn feed_ractor_async_linear_input<I, T>(
3501    input: I,
3502    output: &ractor::concurrency::MpscSender<AsyncLinearMessage<T>>,
3503) -> StreamResult<()>
3504where
3505    I: Iterator<Item = T>,
3506{
3507    for item in input {
3508        output
3509            .send(AsyncLinearMessage::Item(item))
3510            .await
3511            .map_err(|_| StreamError::AbruptTermination)?;
3512    }
3513    output
3514        .send(AsyncLinearMessage::Done)
3515        .await
3516        .map_err(|_| StreamError::AbruptTermination)
3517}
3518
3519async fn run_ractor_async_linear_segment<T>(
3520    state: &mut RactorLinearSegmentState<T>,
3521) -> StreamResult<()>
3522where
3523    T: Send + 'static,
3524{
3525    loop {
3526        match state.input.recv().await {
3527            Some(AsyncLinearMessage::Item(item)) => {
3528                let result =
3529                    run_async_linear_item(item, &state.steps, &state.events, state.config.fused)
3530                        .and_then(|item| {
3531                            if state.has_boundary_after {
3532                                bump_fused_event_atomic(&state.events, state.config.fused)?;
3533                                state
3534                                    .async_boundary_crossings
3535                                    .fetch_add(1, Ordering::Relaxed);
3536                                bump_fused_event_atomic(&state.events, state.config.fused)?;
3537                            }
3538                            Ok(item)
3539                        });
3540
3541                match result {
3542                    Ok(item) => state
3543                        .output
3544                        .send(AsyncLinearMessage::Item(item))
3545                        .await
3546                        .map_err(|_| StreamError::AbruptTermination)?,
3547                    Err(error) => {
3548                        let _ = state
3549                            .output
3550                            .send(AsyncLinearMessage::Failed(error.clone()))
3551                            .await;
3552                        return Err(error);
3553                    }
3554                }
3555            }
3556            Some(AsyncLinearMessage::Done) => {
3557                state
3558                    .output
3559                    .send(AsyncLinearMessage::Done)
3560                    .await
3561                    .map_err(|_| StreamError::AbruptTermination)?;
3562                return Ok(());
3563            }
3564            Some(AsyncLinearMessage::Failed(error)) => {
3565                let _ = state
3566                    .output
3567                    .send(AsyncLinearMessage::Failed(error.clone()))
3568                    .await;
3569                return Err(error);
3570            }
3571            None => return Err(StreamError::AbruptTermination),
3572        }
3573    }
3574}
3575
3576async fn join_ractor_async_linear_actor(
3577    handle: ractor::concurrency::JoinHandle<()>,
3578) -> StreamResult<()> {
3579    handle.await.map_err(|error| {
3580        StreamError::Failed(format!("ractor async boundary actor task failed: {error}"))
3581    })
3582}
3583
3584fn stop_ractor_async_linear_actors(
3585    actors: &[(
3586        ActorRef<RactorBoundaryCommand>,
3587        ractor::concurrency::JoinHandle<()>,
3588    )],
3589) {
3590    for (actor, _) in actors {
3591        actor.stop(None);
3592    }
3593}
3594
3595#[cfg_attr(not(test), allow(dead_code))]
3596async fn join_ractor_async_linear_actors(
3597    actors: Vec<(
3598        ActorRef<RactorBoundaryCommand>,
3599        ractor::concurrency::JoinHandle<()>,
3600    )>,
3601) -> Option<StreamError> {
3602    let mut actor_error = None;
3603    for (_, handle) in actors {
3604        let result = join_ractor_async_linear_actor(handle).await;
3605        if actor_error.is_some() {
3606            continue;
3607        }
3608        if let Err(error) = result {
3609            actor_error = Some(error);
3610        }
3611    }
3612    actor_error
3613}
3614
3615fn ractor_spawn_error(error: ractor::SpawnErr) -> StreamError {
3616    StreamError::Failed(format!(
3617        "ractor async boundary actor failed to spawn: {error}"
3618    ))
3619}
3620
3621fn actor_processing_error(error: StreamError) -> ActorProcessingErr {
3622    Box::new(error)
3623}
3624
/// Sends every element of `input` to `output` as an `Item` message, followed
/// by a single `Done`.
///
/// # Errors
///
/// Returns `StreamError::AbruptTermination` as soon as a send fails.
#[cfg(test)]
fn feed_threaded_async_linear_input<I, T>(
    input: I,
    output: mpsc::SyncSender<AsyncLinearMessage<T>>,
) -> StreamResult<()>
where
    I: IntoIterator<Item = T>,
{
    input
        .into_iter()
        .map(AsyncLinearMessage::Item)
        .chain(std::iter::once(AsyncLinearMessage::Done))
        .try_for_each(|message| {
            output
                .send(message)
                .map_err(|_| StreamError::AbruptTermination)
        })
}
3642
/// Processes one segment of the threaded pipeline until its input ends.
///
/// Every received item is run through `steps`, accounted as a boundary
/// crossing when `has_boundary_after` is set, and forwarded to `output`.
/// `Done` is forwarded and ends the segment successfully.
///
/// # Errors
///
/// * `StreamError::AbruptTermination` when the input channel closes, or when
///   forwarding an item or `Done` fails.
/// * A step or event-limit error, or the error of an upstream `Failed`
///   message. In every failure case triggered by a message, a `Failed`
///   message is sent downstream on a best-effort basis first.
#[cfg(test)]
fn run_threaded_async_linear_segment<T>(
    steps: &[TypedLinearStep<T>],
    input: mpsc::Receiver<AsyncLinearMessage<T>>,
    output: mpsc::SyncSender<AsyncLinearMessage<T>>,
    has_boundary_after: bool,
    events: &AtomicUsize,
    async_boundary_crossings: &AtomicUsize,
    config: AsyncBoundaryExecutionConfig,
) -> StreamResult<()>
where
    T: Send + 'static,
{
    // Runs the steps, accounts for the boundary, and forwards one item.
    let forward = |item: T| -> StreamResult<()> {
        let item = run_async_linear_item(item, steps, events, config.fused)?;
        if has_boundary_after {
            bump_fused_event_atomic(events, config.fused)?;
            async_boundary_crossings.fetch_add(1, Ordering::Relaxed);
            bump_fused_event_atomic(events, config.fused)?;
        }
        output
            .send(AsyncLinearMessage::Item(item))
            .map_err(|_| StreamError::AbruptTermination)
    };

    loop {
        let message = input.recv().map_err(|_| StreamError::AbruptTermination)?;
        let failure = match message {
            AsyncLinearMessage::Item(item) => match forward(item) {
                Ok(()) => continue,
                Err(error) => error,
            },
            AsyncLinearMessage::Done => {
                return output
                    .send(AsyncLinearMessage::Done)
                    .map_err(|_| StreamError::AbruptTermination);
            }
            AsyncLinearMessage::Failed(error) => error,
        };

        // Best effort: downstream may already be gone.
        let _ = output.send(AsyncLinearMessage::Failed(failure.clone()));
        return Err(failure);
    }
}
3688
3689fn run_async_linear_item<T>(
3690    mut item: T,
3691    steps: &[TypedLinearStep<T>],
3692    events: &AtomicUsize,
3693    config: FusedExecutionConfig,
3694) -> StreamResult<T>
3695where
3696    T: Send + 'static,
3697{
3698    for step in steps {
3699        bump_fused_event_atomic(events, config)?;
3700        match step {
3701            TypedLinearStep::Pass => {}
3702            TypedLinearStep::Map(mapper) => {
3703                item = mapper(item);
3704            }
3705            TypedLinearStep::AsyncBoundary => {
3706                return Err(StreamError::GraphValidation(
3707                    "async boundary execution expects pre-split linear segments".into(),
3708                ));
3709            }
3710        }
3711        bump_fused_event_atomic(events, config)?;
3712    }
3713    Ok(item)
3714}
3715
/// Joins a scoped worker thread.
///
/// Returns `Ok(None)` when the worker succeeded and `Ok(Some(error))` when it
/// returned an error.
///
/// # Errors
///
/// Returns `StreamError::Failed` when the worker thread panicked.
#[cfg(test)]
fn join_threaded_async_linear_worker(
    handle: thread::ScopedJoinHandle<'_, StreamResult<()>>,
) -> StreamResult<Option<StreamError>> {
    handle
        .join()
        .map(Result::err)
        .map_err(|_| StreamError::Failed("async boundary worker panicked".into()))
}
3726
3727fn bump_fused_event_atomic(events: &AtomicUsize, config: FusedExecutionConfig) -> StreamResult<()> {
3728    let events = events.fetch_add(1, Ordering::Relaxed) + 1;
3729    if events > config.event_limit {
3730        return Err(StreamError::EventLimitExceeded {
3731            limit: config.event_limit,
3732        });
3733    }
3734    Ok(())
3735}
3736
3737impl<T> GraphBlueprint<FanInShape<T, T>>
3738where
3739    T: Clone + Send + 'static,
3740{
3741    pub fn run_fan_in(&self, inputs: Vec<Vec<T>>) -> StreamResult<Vec<T>> {
3742        Ok(self
3743            .run_fan_in_report(inputs, FusedExecutionConfig::default())?
3744            .output)
3745    }
3746
3747    pub fn run_fan_in_report(
3748        &self,
3749        inputs: Vec<Vec<T>>,
3750        config: FusedExecutionConfig,
3751    ) -> StreamResult<FusedExecutionReport<T>> {
3752        if inputs.len() != self.shape.inlet_count() {
3753            return Err(StreamError::GraphValidation(format!(
3754                "expected {} input streams, got {}",
3755                self.shape.inlet_count(),
3756                inputs.len()
3757            )));
3758        }
3759
3760        let output_capacity = inputs.iter().map(Vec::len).sum();
3761        let mut queues: Vec<_> = inputs.into_iter().map(Vec::into_iter).collect();
3762        let schedule = self.fan_in_schedule();
3763        let mut schedule_index = 0;
3764        let outlet = self.shape.outlet().id();
3765        let mut executor = FusedExecutor::new(self, config);
3766        let mut output = Vec::with_capacity(output_capacity);
3767        let mut completed = vec![false; queues.len()];
3768
3769        {
3770            let mut output_sink = VecOutputSink {
3771                output: &mut output,
3772            };
3773            for (index, queue) in queues.iter().enumerate() {
3774                if queue.len() == 0 {
3775                    executor.complete(self.shape.inlet(index)?.id(), outlet, &mut output_sink)?;
3776                    completed[index] = true;
3777                }
3778            }
3779            while queues.iter().any(|queue| queue.len() > 0) {
3780                let input_index = next_scheduled_input(&queues, &schedule, &mut schedule_index)
3781                    .ok_or_else(|| {
3782                        StreamError::GraphValidation("no runnable fan-in input".into())
3783                    })?;
3784                let item = queues[input_index]
3785                    .next()
3786                    .expect("scheduled input had an item");
3787                executor.deliver(
3788                    self.shape.inlet(input_index)?.id(),
3789                    datum(item),
3790                    outlet,
3791                    &mut output_sink,
3792                )?;
3793                if queues[input_index].len() == 0 && !completed[input_index] {
3794                    executor.complete(
3795                        self.shape.inlet(input_index)?.id(),
3796                        outlet,
3797                        &mut output_sink,
3798                    )?;
3799                    completed[input_index] = true;
3800                }
3801            }
3802        }
3803
3804        Ok(FusedExecutionReport {
3805            output,
3806            events: executor.events,
3807            async_boundary_crossings: executor.async_boundary_crossings,
3808        })
3809    }
3810
3811    fn fan_in_schedule(&self) -> Vec<usize> {
3812        self.stages
3813            .iter()
3814            .find_map(|stage| match &stage.spec.kind {
3815                StageKind::MergePrioritized { weights }
3816                    if weights.len() == self.shape.inlet_count()
3817                        && stage.spec.outlets.len() == 1
3818                        && stage.spec.outlets[0].id() == self.shape.outlet().id()
3819                        && stage.spec.inlets.iter().map(AnyInlet::id).eq(self
3820                            .shape
3821                            .inlets()
3822                            .iter()
3823                            .map(|inlet| inlet.id())) =>
3824                {
3825                    Some(
3826                        weights
3827                            .iter()
3828                            .enumerate()
3829                            .flat_map(|(index, weight)| std::iter::repeat_n(index, *weight))
3830                            .collect(),
3831                    )
3832                }
3833                _ => None,
3834            })
3835            .unwrap_or_else(|| (0..self.shape.inlet_count()).collect())
3836    }
3837
3838    pub fn run_concat(&self, inputs: Vec<Vec<T>>) -> StreamResult<Vec<T>> {
3839        Ok(self
3840            .run_concat_report(inputs, FusedExecutionConfig::default())?
3841            .output)
3842    }
3843
3844    pub fn run_concat_report(
3845        &self,
3846        inputs: Vec<Vec<T>>,
3847        config: FusedExecutionConfig,
3848    ) -> StreamResult<FusedExecutionReport<T>> {
3849        if inputs.len() != self.shape.inlet_count() {
3850            return Err(StreamError::GraphValidation(format!(
3851                "expected {} input streams, got {}",
3852                self.shape.inlet_count(),
3853                inputs.len()
3854            )));
3855        }
3856
3857        let output_capacity = inputs.iter().map(Vec::len).sum();
3858        let mut queues: Vec<_> = inputs.into_iter().map(Vec::into_iter).collect();
3859        let outlet = self.shape.outlet().id();
3860        let mut executor = FusedExecutor::new(self, config);
3861        let mut output = Vec::with_capacity(output_capacity);
3862
3863        {
3864            let mut output_sink = VecOutputSink {
3865                output: &mut output,
3866            };
3867            for (index, queue) in queues.iter_mut().enumerate() {
3868                for item in queue.by_ref() {
3869                    executor.deliver(
3870                        self.shape.inlet(index)?.id(),
3871                        datum(item),
3872                        outlet,
3873                        &mut output_sink,
3874                    )?;
3875                }
3876                executor.complete(self.shape.inlet(index)?.id(), outlet, &mut output_sink)?;
3877            }
3878        }
3879
3880        Ok(FusedExecutionReport {
3881            output,
3882            events: executor.events,
3883            async_boundary_crossings: executor.async_boundary_crossings,
3884        })
3885    }
3886
3887    pub fn run_or_else(&self, primary: Vec<T>, secondary: Vec<T>) -> StreamResult<Vec<T>> {
3888        Ok(self
3889            .run_or_else_report(primary, secondary, FusedExecutionConfig::default())?
3890            .output)
3891    }
3892
3893    pub fn run_or_else_report(
3894        &self,
3895        primary: Vec<T>,
3896        secondary: Vec<T>,
3897        config: FusedExecutionConfig,
3898    ) -> StreamResult<FusedExecutionReport<T>> {
3899        if self.shape.inlet_count() != 2 {
3900            return Err(StreamError::GraphValidation(format!(
3901                "or-else helper expected 2 inlets, got {}",
3902                self.shape.inlet_count()
3903            )));
3904        }
3905
3906        let primary = primary.into_iter();
3907        let secondary = secondary.into_iter();
3908        let primary_inlet = self.shape.inlet(0)?.id();
3909        let secondary_inlet = self.shape.inlet(1)?.id();
3910        let outlet = self.shape.outlet().id();
3911        let mut executor = FusedExecutor::new(self, config);
3912        let mut output = Vec::new();
3913        let mut primary_emitted = false;
3914
3915        {
3916            let mut output_sink = VecOutputSink {
3917                output: &mut output,
3918            };
3919            for item in primary {
3920                primary_emitted = true;
3921                executor.deliver(primary_inlet, datum(item), outlet, &mut output_sink)?;
3922            }
3923            executor.complete(primary_inlet, outlet, &mut output_sink)?;
3924
3925            if !primary_emitted {
3926                for item in secondary {
3927                    executor.deliver(secondary_inlet, datum(item), outlet, &mut output_sink)?;
3928                }
3929            }
3930            executor.complete(secondary_inlet, outlet, &mut output_sink)?;
3931        }
3932
3933        Ok(FusedExecutionReport {
3934            output,
3935            events: executor.events,
3936            async_boundary_crossings: executor.async_boundary_crossings,
3937        })
3938    }
3939
3940    pub fn run_or_else_secondary_first(
3941        &self,
3942        primary: Vec<T>,
3943        secondary: Vec<T>,
3944    ) -> StreamResult<Vec<T>> {
3945        Ok(self
3946            .run_or_else_secondary_first_report(
3947                primary,
3948                secondary,
3949                FusedExecutionConfig::default(),
3950            )?
3951            .output)
3952    }
3953
3954    pub fn run_or_else_secondary_first_report(
3955        &self,
3956        primary: Vec<T>,
3957        secondary: Vec<T>,
3958        config: FusedExecutionConfig,
3959    ) -> StreamResult<FusedExecutionReport<T>> {
3960        if self.shape.inlet_count() != 2 {
3961            return Err(StreamError::GraphValidation(format!(
3962                "or-else helper expected 2 inlets, got {}",
3963                self.shape.inlet_count()
3964            )));
3965        }
3966
3967        let primary_inlet = self.shape.inlet(0)?.id();
3968        let secondary_inlet = self.shape.inlet(1)?.id();
3969        let outlet = self.shape.outlet().id();
3970        let mut executor = FusedExecutor::new(self, config);
3971        let mut output = Vec::new();
3972
3973        {
3974            let mut output_sink = VecOutputSink {
3975                output: &mut output,
3976            };
3977            for item in secondary {
3978                executor.deliver(secondary_inlet, datum(item), outlet, &mut output_sink)?;
3979            }
3980            for item in primary {
3981                executor.deliver(primary_inlet, datum(item), outlet, &mut output_sink)?;
3982            }
3983            executor.complete(primary_inlet, outlet, &mut output_sink)?;
3984            executor.complete(secondary_inlet, outlet, &mut output_sink)?;
3985        }
3986
3987        Ok(FusedExecutionReport {
3988            output,
3989            events: executor.events,
3990            async_boundary_crossings: executor.async_boundary_crossings,
3991        })
3992    }
3993
3994    pub fn run_or_else_secondary_closed_first(&self, secondary: Vec<T>) -> StreamResult<Vec<T>> {
3995        if self.shape.inlet_count() != 2 {
3996            return Err(StreamError::GraphValidation(format!(
3997                "or-else helper expected 2 inlets, got {}",
3998                self.shape.inlet_count()
3999            )));
4000        }
4001
4002        let primary_inlet = self.shape.inlet(0)?.id();
4003        let secondary_inlet = self.shape.inlet(1)?.id();
4004        let outlet = self.shape.outlet().id();
4005        let mut executor = FusedExecutor::new(self, FusedExecutionConfig::default());
4006        let mut output = Vec::new();
4007
4008        {
4009            let mut output_sink = VecOutputSink {
4010                output: &mut output,
4011            };
4012            for item in secondary {
4013                executor.deliver(secondary_inlet, datum(item), outlet, &mut output_sink)?;
4014            }
4015            executor.complete(secondary_inlet, outlet, &mut output_sink)?;
4016            executor.complete(primary_inlet, outlet, &mut output_sink)?;
4017        }
4018
4019        Ok(output)
4020    }
4021
4022    pub fn run_interleave(
4023        &self,
4024        inputs: Vec<Vec<T>>,
4025        segment_size: usize,
4026        eager_close: bool,
4027    ) -> StreamResult<Vec<T>> {
4028        Ok(self
4029            .run_interleave_report(
4030                inputs,
4031                segment_size,
4032                eager_close,
4033                FusedExecutionConfig::default(),
4034            )?
4035            .output)
4036    }
4037
4038    pub fn run_interleave_report(
4039        &self,
4040        inputs: Vec<Vec<T>>,
4041        segment_size: usize,
4042        eager_close: bool,
4043        config: FusedExecutionConfig,
4044    ) -> StreamResult<FusedExecutionReport<T>> {
4045        if inputs.len() != self.shape.inlet_count() {
4046            return Err(StreamError::GraphValidation(format!(
4047                "expected {} input streams, got {}",
4048                self.shape.inlet_count(),
4049                inputs.len()
4050            )));
4051        }
4052        if segment_size == 0 {
4053            return Err(StreamError::GraphValidation(
4054                "interleave segment size must be greater than zero".into(),
4055            ));
4056        }
4057
4058        let output_capacity = inputs.iter().map(Vec::len).sum();
4059        let mut queues: Vec<_> = inputs.into_iter().map(Vec::into_iter).collect();
4060        let mut completed = vec![false; queues.len()];
4061        let outlet = self.shape.outlet().id();
4062        let mut executor = FusedExecutor::new(self, config);
4063        let mut output = Vec::with_capacity(output_capacity);
4064
4065        {
4066            let mut output_sink = VecOutputSink {
4067                output: &mut output,
4068            };
4069            for (index, queue) in queues.iter().enumerate() {
4070                if queue.len() == 0 {
4071                    executor.complete(self.shape.inlet(index)?.id(), outlet, &mut output_sink)?;
4072                    completed[index] = true;
4073                    if eager_close {
4074                        return Ok(FusedExecutionReport {
4075                            output,
4076                            events: executor.events,
4077                            async_boundary_crossings: executor.async_boundary_crossings,
4078                        });
4079                    }
4080                }
4081            }
4082
4083            let mut current = 0usize;
4084            while completed.iter().any(|done| !done) {
4085                if completed[current] {
4086                    current = next_open_index(&completed, current).ok_or_else(|| {
4087                        StreamError::GraphValidation("no open interleave input".into())
4088                    })?;
4089                    continue;
4090                }
4091
4092                let mut emitted = 0usize;
4093                while emitted < segment_size {
4094                    match queues[current].next() {
4095                        Some(item) => {
4096                            executor.deliver(
4097                                self.shape.inlet(current)?.id(),
4098                                datum(item),
4099                                outlet,
4100                                &mut output_sink,
4101                            )?;
4102                            emitted += 1;
4103                        }
4104                        None => {
4105                            executor.complete(
4106                                self.shape.inlet(current)?.id(),
4107                                outlet,
4108                                &mut output_sink,
4109                            )?;
4110                            completed[current] = true;
4111                            if eager_close {
4112                                return Ok(FusedExecutionReport {
4113                                    output,
4114                                    events: executor.events,
4115                                    async_boundary_crossings: executor.async_boundary_crossings,
4116                                });
4117                            }
4118                            break;
4119                        }
4120                    }
4121                }
4122
4123                if completed.iter().all(|done| *done) {
4124                    break;
4125                }
4126                current = next_open_index(&completed, current).ok_or_else(|| {
4127                    StreamError::GraphValidation("no open interleave input".into())
4128                })?;
4129            }
4130        }
4131
4132        Ok(FusedExecutionReport {
4133            output,
4134            events: executor.events,
4135            async_boundary_crossings: executor.async_boundary_crossings,
4136        })
4137    }
4138}
4139
4140impl<T> GraphBlueprint<MergePreferredShape<T>>
4141where
4142    T: Clone + Send + 'static,
4143{
4144    pub fn run_merge_preferred(
4145        &self,
4146        preferred: Vec<T>,
4147        secondary_inputs: Vec<Vec<T>>,
4148    ) -> StreamResult<Vec<T>> {
4149        Ok(self
4150            .run_merge_preferred_report(
4151                preferred,
4152                secondary_inputs,
4153                FusedExecutionConfig::default(),
4154            )?
4155            .output)
4156    }
4157
4158    pub fn run_merge_preferred_report(
4159        &self,
4160        preferred: Vec<T>,
4161        secondary_inputs: Vec<Vec<T>>,
4162        config: FusedExecutionConfig,
4163    ) -> StreamResult<FusedExecutionReport<T>> {
4164        if secondary_inputs.len() != self.shape.secondary_count() {
4165            return Err(StreamError::GraphValidation(format!(
4166                "expected {} secondary input streams, got {}",
4167                self.shape.secondary_count(),
4168                secondary_inputs.len()
4169            )));
4170        }
4171
4172        let output_capacity =
4173            preferred.len() + secondary_inputs.iter().map(Vec::len).sum::<usize>();
4174        let mut preferred = preferred.into_iter();
4175        let mut secondary: Vec<_> = secondary_inputs.into_iter().map(Vec::into_iter).collect();
4176        let secondary_schedule = (0..secondary.len()).collect::<Vec<_>>();
4177        let mut secondary_index = 0;
4178        let outlet = self.shape.outlet().id();
4179        let preferred_inlet = self.shape.preferred().id();
4180        let mut executor = FusedExecutor::new(self, config);
4181        let mut output = Vec::with_capacity(output_capacity);
4182        let mut preferred_completed = false;
4183        let mut secondary_completed = vec![false; secondary.len()];
4184
4185        {
4186            let mut output_sink = VecOutputSink {
4187                output: &mut output,
4188            };
4189            if preferred.len() == 0 {
4190                executor.complete(preferred_inlet, outlet, &mut output_sink)?;
4191                preferred_completed = true;
4192            }
4193            for (index, queue) in secondary.iter().enumerate() {
4194                if queue.len() == 0 {
4195                    executor.complete(
4196                        self.shape.secondary(index)?.id(),
4197                        outlet,
4198                        &mut output_sink,
4199                    )?;
4200                    secondary_completed[index] = true;
4201                }
4202            }
4203            while preferred.len() > 0 || secondary.iter().any(|queue| queue.len() > 0) {
4204                if let Some(item) = preferred.next() {
4205                    executor.deliver(preferred_inlet, datum(item), outlet, &mut output_sink)?;
4206                    if preferred.len() == 0 && !preferred_completed {
4207                        executor.complete(preferred_inlet, outlet, &mut output_sink)?;
4208                        preferred_completed = true;
4209                    }
4210                    continue;
4211                }
4212
4213                let input_index =
4214                    next_scheduled_input(&secondary, &secondary_schedule, &mut secondary_index)
4215                        .ok_or_else(|| {
4216                            StreamError::GraphValidation("no runnable secondary input".into())
4217                        })?;
4218                let item = secondary[input_index]
4219                    .next()
4220                    .expect("scheduled secondary input had an item");
4221                executor.deliver(
4222                    self.shape.secondary(input_index)?.id(),
4223                    datum(item),
4224                    outlet,
4225                    &mut output_sink,
4226                )?;
4227                if secondary[input_index].len() == 0 && !secondary_completed[input_index] {
4228                    executor.complete(
4229                        self.shape.secondary(input_index)?.id(),
4230                        outlet,
4231                        &mut output_sink,
4232                    )?;
4233                    secondary_completed[input_index] = true;
4234                }
4235            }
4236        }
4237
4238        Ok(FusedExecutionReport {
4239            output,
4240            events: executor.events,
4241            async_boundary_crossings: executor.async_boundary_crossings,
4242        })
4243    }
4244}
4245
/// Picks the next input queue that still has elements, following `schedule`.
///
/// Starting at `*schedule_index` (taken modulo the schedule length), at most
/// one full pass over `schedule` is made; the cursor is advanced by one for
/// every slot examined. The first slot whose queue exists and is non-empty
/// wins. If the schedule is empty, or a full pass finds nothing, the first
/// non-empty queue by position is returned instead.
///
/// Returns `None` when every queue is empty.
fn next_scheduled_input<I>(
    queues: &[I],
    schedule: &[usize],
    schedule_index: &mut usize,
) -> Option<usize>
where
    I: ExactSizeIterator,
{
    // Fallback used when the schedule cannot name a runnable queue.
    let first_non_empty = || queues.iter().position(|queue| queue.len() > 0);

    let slots = schedule.len();
    if slots == 0 {
        return first_non_empty();
    }

    let scheduled = (0..slots).find_map(|_| {
        let candidate = schedule[*schedule_index % slots];
        *schedule_index += 1;
        let runnable = matches!(queues.get(candidate), Some(queue) if queue.len() > 0);
        runnable.then_some(candidate)
    });

    scheduled.or_else(first_non_empty)
}
4268
/// Returns the index of the next entry in `completed` that is still open
/// (`false`), scanning circularly and starting just after `current`.
///
/// `current` itself is examined last, so it is returned only when it is the
/// sole open entry. Returns `None` when the slice is empty or every entry is
/// `true`.
///
/// `current` is reduced modulo the slice length before any offset is added,
/// so an out-of-range or very large value cannot overflow the index
/// arithmetic.
fn next_open_index(completed: &[bool], current: usize) -> Option<usize> {
    let len = completed.len();
    if len == 0 {
        return None;
    }
    // `start < len` and `offset <= len`, so `start + offset` cannot overflow.
    let start = current % len;
    (1..=len)
        .map(|offset| (start + offset) % len)
        .find(|&index| !completed[index])
}
4282
// ---------------------------------------------------------------------------
// Output-sink plumbing — pub(crate) so Phase 1+ typed executors can reuse
// ---------------------------------------------------------------------------
4286
/// Typed output receiver used by the fused executor.
///
/// `pub(crate)` so the upcoming typed-port executor (Phase 1+) can reuse the
/// same sink abstractions without duplicating the erased-path infrastructure.
///
/// The executor calls [`emit`](FusedOutputSink::emit) once for every value
/// that reaches the graph outlet, after downcasting it to `Out`.
pub(crate) trait FusedOutputSink<Out> {
    /// Consumes one output value.
    ///
    /// Returning an error aborts the executor call that produced the value,
    /// because the executor propagates it with `?`.
    fn emit(&mut self, value: Out) -> StreamResult<()>;
}
4294
4295pub(crate) struct VecOutputSink<'a, Out> {
4296    pub(crate) output: &'a mut Vec<Out>,
4297}
4298
4299impl<Out> FusedOutputSink<Out> for VecOutputSink<'_, Out> {
4300    fn emit(&mut self, value: Out) -> StreamResult<()> {
4301        self.output.push(value);
4302        Ok(())
4303    }
4304}
4305
4306pub(crate) struct CountOutputSink {
4307    pub(crate) count: usize,
4308}
4309
4310impl<Out> FusedOutputSink<Out> for CountOutputSink {
4311    fn emit(&mut self, _value: Out) -> StreamResult<()> {
4312        self.count += 1;
4313        Ok(())
4314    }
4315}
4316
4317pub(crate) struct FoldOutputSink<Acc, F> {
4318    pub(crate) accumulator: Option<Acc>,
4319    pub(crate) fold: F,
4320}
4321
4322impl<Out, Acc, F> FusedOutputSink<Out> for FoldOutputSink<Acc, F>
4323where
4324    F: FnMut(Acc, Out) -> Acc,
4325{
4326    fn emit(&mut self, value: Out) -> StreamResult<()> {
4327        let accumulator = self
4328            .accumulator
4329            .take()
4330            .expect("fold accumulator is present while executor is running");
4331        self.accumulator = Some((self.fold)(accumulator, value));
4332        Ok(())
4333    }
4334}
4335
4336impl<Acc, F> FoldOutputSink<Acc, F> {
4337    pub(crate) fn finish(mut self) -> Acc {
4338        self.accumulator
4339            .take()
4340            .expect("fold accumulator is present after executor finishes")
4341    }
4342}
4343
// ---------------------------------------------------------------------------
// Graph-index scaffolding — pub(crate) for Phase 1+ typed executor reuse
// ---------------------------------------------------------------------------
4347
/// Fused graph executor over erased (`Box<dyn DatumElement>`) values.
///
/// The struct itself and its edge/stage-lookup maps are `pub(crate)` so the
/// typed-port executor introduced in Phase 1+ can reuse the graph-index
/// infrastructure (edge maps, stage-index maps) without duplicating it.
/// The per-element hot path remains encapsulated — only the structural pieces
/// are exposed.
#[derive(Debug)]
pub(crate) struct FusedExecutor<'a, S: Shape> {
    /// Blueprint being executed; stages and edges are read from it.
    graph: &'a GraphBlueprint<S>,
    /// Outlet → connected inlet (internal graph edge lookup).
    pub(crate) edge_by_outlet: HashMap<PortId, PortId>,
    /// Inlet → connected outlet (internal graph edge lookup).
    pub(crate) edge_by_inlet: HashMap<PortId, PortId>,
    /// Inlet port → owning stage index.
    pub(crate) stage_by_inlet: HashMap<PortId, usize>,
    /// Outlet port → owning stage index.
    pub(crate) stage_by_outlet: HashMap<PortId, usize>,
    /// Per-stage mutable state, in the same order as the blueprint's stages.
    stage_states: Vec<StageState>,
    /// Per-stage logic instance, in stage order: `Some` for stages that have
    /// a `logic_factory` (invoked once at construction), `None` otherwise.
    pub(crate) opaque_logics: Vec<Option<GraphStageLogic>>,
    /// Execution configuration supplied at construction.
    config: FusedExecutionConfig,
    /// Event counter, starting at zero; copied into `FusedExecutionReport`.
    /// NOTE(review): presumably advanced by `bump_event` — confirm there.
    pub(crate) events: usize,
    /// Async-boundary crossing counter, starting at zero; copied into
    /// `FusedExecutionReport`.
    pub(crate) async_boundary_crossings: usize,
    /// Outlets whose downstream has finished; recorded by `downstream_finish`
    /// and consulted by `is_outlet_cancelled`.
    cancelled_outlets: HashSet<PortId>,
}
4373
/// Mutable per-stage bookkeeping held by `FusedExecutor`, one value per graph
/// stage.
///
/// `FusedExecutor::new` selects the variant from the stage's `StageKind`;
/// kinds without a dedicated variant use `Stateless`.
#[derive(Debug)]
enum StageState {
    /// Used for every stage kind without dedicated state, and for a `Zip`
    /// stage whose spec does not have exactly two inlets.
    Stateless,
    /// State for `StageKind::Broadcast`.
    Broadcast {
        /// Result of `broadcast_zip_fast_path` for this stage.
        fast_path: Option<BroadcastZipFastPath>,
        /// One flag per outlet; all `false` initially.
        cancelled_outlets: Vec<bool>,
        /// Initialised to the number of outlets.
        live_outlets: usize,
    },
    /// State for `StageKind::Balance`.
    Balance {
        /// Result of `balance_merge_fast_path` for this stage.
        fast_path: Option<BalanceMergeFastPath>,
        /// Starts at zero. NOTE(review): presumably the next outlet to serve.
        next: usize,
        /// One flag per outlet; all `false` initially.
        cancelled_outlets: Vec<bool>,
        /// Initialised to the number of outlets.
        live_outlets: usize,
    },
    /// Shared by `Merge`, `MergePreferred`, `MergePrioritized`, `Concat` and
    /// `Interleave` stages.
    Merge {
        /// Initialised to the number of inlets.
        open_inputs: usize,
        /// Taken from `eager_close` for `Interleave`; `false` for the others.
        eager_complete: bool,
        /// Starts as `false`.
        completed: bool,
    },
    /// State for `StageKind::OrElse`; all flags start `false` and the buffer
    /// starts empty.
    OrElse {
        primary_emitted: bool,
        buffer: VecDeque<DatumValue>,
        primary_closed: bool,
        secondary_closed: bool,
        completed: bool,
    },
    /// State for a `Zip` stage with exactly two inlets.
    Zip {
        /// Id of the first inlet in the stage spec.
        left_inlet: PortId,
        /// Id of the second inlet in the stage spec.
        right_inlet: PortId,
        /// Starts empty.
        left: VecDeque<DatumValue>,
        /// Starts empty.
        right: VecDeque<DatumValue>,
        left_pending_complete: bool,
        right_pending_complete: bool,
        completed: bool,
    },
    /// State for `StageKind::Unzip`; flags start `false`.
    Unzip {
        /// Result of `unzip_fan_in_fast_path` for this stage.
        fast_path: Option<UnzipFanInFastPath>,
        /// Result of `unzip_zip_fast_path` for this stage.
        zip_fast_path: Option<UnzipZipFastPath>,
        /// One flag per outlet (two outlets).
        demand: [bool; 2],
        /// One flag per outlet (two outlets).
        cancelled: [bool; 2],
        upstream_closed: bool,
    },
    /// State for `StageKind::MergeSorted`; queues start empty, flags `false`.
    MergeSorted {
        left: VecDeque<DatumValue>,
        right: VecDeque<DatumValue>,
        left_closed: bool,
        right_closed: bool,
        pending: VecDeque<DatumValue>,
        completed: bool,
    },
    /// State for `StageKind::MergeSequence`; counters start at zero and the
    /// buffers start empty.
    MergeSequence {
        next_sequence: u64,
        /// Pairs of sequence number and value.
        pending: Vec<(u64, DatumValue)>,
        completed_count: usize,
        output_buffer: VecDeque<DatumValue>,
        completed: bool,
    },
    /// State for `StageKind::MergeLatest`.
    MergeLatest {
        /// One slot per input (`input_count` slots), all `None` initially.
        latest: Vec<Option<DatumValue>>,
        seen_count: usize,
        completed_count: usize,
        pending: VecDeque<DatumValue>,
        completed: bool,
    },
    /// State for `StageKind::Partition`.
    Partition {
        /// Result of `partition_merge_fast_path` for this stage.
        fast_path: Option<PartitionMergeFastPath>,
        /// Starts as `None`. NOTE(review): presumably an element waiting for
        /// demand on the output with the stored index — confirm in the
        /// partition handling code.
        pending: Option<(usize, DatumValue)>,
        upstream_closed: bool,
        /// One flag per output (`output_count` flags), all `false` initially.
        demand: Vec<bool>,
        /// One flag per output (`output_count` flags), all `false` initially.
        cancelled: Vec<bool>,
        /// Copied from the stage kind's `output_count`.
        output_count: usize,
        completed: bool,
    },
}
4448
/// Fast-path descriptor stored in `StageState::Broadcast::fast_path`.
///
/// Computed by `broadcast_zip_fast_path` when the executor is constructed.
/// NOTE(review): presumably describes a Broadcast stage feeding a Zip stage
/// directly; the detection logic is outside this section.
#[derive(Clone, Copy, Debug)]
struct BroadcastZipFastPath {
    /// Index of the target Zip stage (presumably into the blueprint's stages).
    zip_stage_index: usize,
    /// NOTE(review): presumably whether the Zip's left side receives the
    /// cloned copy of each element — confirm at the use site.
    left_uses_clone: bool,
}
4454
/// Fast-path descriptor stored in `StageState::Balance::fast_path`.
///
/// Computed by `balance_merge_fast_path` when the executor is constructed.
/// NOTE(review): presumably describes a Balance stage feeding a Merge stage
/// directly; the detection logic is outside this section.
#[derive(Clone, Copy, Debug)]
struct BalanceMergeFastPath {
    /// Index of the target Merge stage (presumably into the blueprint's
    /// stages).
    merge_stage_index: usize,
}
4459
/// Fast-path descriptor stored in `StageState::Unzip::fast_path`.
///
/// Computed by `unzip_fan_in_fast_path` when the executor is constructed.
/// NOTE(review): presumably describes an Unzip stage whose two outlets feed
/// the same fan-in stage; the detection logic is outside this section.
#[derive(Clone, Copy, Debug)]
struct UnzipFanInFastPath {
    /// Index of the fan-in stage (presumably into the blueprint's stages).
    fan_in_stage_index: usize,
    /// Which inlet indices of the fan-in stage the two Unzip outlets connect to.
    /// `target_inlet_indices[0]` is the index of the inlet connected to `out0`,
    /// and `target_inlet_indices[1]` is the index connected to `out1`.
    target_inlet_indices: [usize; 2],
}
4468
/// Fast-path descriptor stored in `StageState::Unzip::zip_fast_path`.
///
/// Computed by `unzip_zip_fast_path` when the executor is constructed.
/// NOTE(review): presumably describes an Unzip stage feeding a Zip stage
/// directly; the detection logic is outside this section.
#[derive(Clone, Copy, Debug)]
struct UnzipZipFastPath {
    /// Index of the target Zip stage (presumably into the blueprint's stages).
    zip_stage_index: usize,
}
4473
/// Fast-path descriptor stored in `StageState::Partition::fast_path`.
///
/// Computed by `partition_merge_fast_path` when the executor is constructed.
/// NOTE(review): presumably describes a Partition stage feeding a Merge stage
/// directly; the detection logic is outside this section.
#[derive(Clone, Copy, Debug)]
struct PartitionMergeFastPath {
    /// Index of the target Merge stage (presumably into the blueprint's
    /// stages).
    merge_stage_index: usize,
}
4478
/// Values a stage produced in one step, each tagged with the outlet it leaves
/// through.
///
/// `FusedExecutor::emit_all` forwards them in order.
enum StageEmissions {
    /// Nothing was produced.
    None,
    /// Exactly one value on the given outlet.
    One(PortId, DatumValue),
    /// Two values; the first pair is forwarded before the second.
    Two((PortId, DatumValue), (PortId, DatumValue)),
    /// Any number of values, forwarded in vector order.
    Many(Vec<(PortId, DatumValue)>),
}
4485
/// Outcome of processing one stage step.
///
/// The executor entry points (`deliver`, `complete`, `request`,
/// `downstream_finish`) apply the three parts in field order: emissions
/// first, then outlet completions, then inlet cancellations.
struct StageTransition {
    /// Values to forward downstream (handled by `emit_all`).
    emissions: StageEmissions,
    /// Outlets to complete (handled by `complete_all`).
    completed_outlets: Vec<PortId>,
    /// Inlets to cancel (handled by `cancel_all`).
    cancelled_inlets: Vec<PortId>,
}
4491
4492impl StageTransition {
4493    fn none() -> Self {
4494        Self {
4495            emissions: StageEmissions::None,
4496            completed_outlets: Vec::new(),
4497            cancelled_inlets: Vec::new(),
4498        }
4499    }
4500
4501    fn emit(emissions: StageEmissions) -> Self {
4502        Self {
4503            emissions,
4504            completed_outlets: Vec::new(),
4505            cancelled_inlets: Vec::new(),
4506        }
4507    }
4508
4509    fn with_completion(mut self, completed_outlets: Vec<PortId>) -> Self {
4510        self.completed_outlets = completed_outlets;
4511        self
4512    }
4513
4514    fn with_cancellations(mut self, cancelled_inlets: Vec<PortId>) -> Self {
4515        self.cancelled_inlets = cancelled_inlets;
4516        self
4517    }
4518}
4519
4520impl<'a, S: Shape> FusedExecutor<'a, S> {
4521    fn is_outlet_cancelled(&self, outlet: PortId) -> bool {
4522        !self.cancelled_outlets.is_empty() && self.cancelled_outlets.contains(&outlet)
4523    }
4524
4525    fn new(graph: &'a GraphBlueprint<S>, config: FusedExecutionConfig) -> Self {
4526        let edge_by_outlet = graph
4527            .edges
4528            .iter()
4529            .map(|edge| (edge.outlet, edge.inlet))
4530            .collect();
4531        let edge_by_inlet = graph
4532            .edges
4533            .iter()
4534            .map(|edge| (edge.inlet, edge.outlet))
4535            .collect();
4536        let mut stage_by_inlet = HashMap::new();
4537        let mut stage_by_outlet = HashMap::new();
4538        for (index, stage) in graph.stages.iter().enumerate() {
4539            for inlet in &stage.spec.inlets {
4540                stage_by_inlet.insert(inlet.id(), index);
4541            }
4542            for outlet in &stage.spec.outlets {
4543                stage_by_outlet.insert(outlet.id(), index);
4544            }
4545        }
4546
4547        let opaque_logics: Vec<_> = graph
4548            .stages
4549            .iter()
4550            .map(|stage| stage.logic_factory.as_ref().map(|factory| factory()))
4551            .collect();
4552
4553        let stage_states = graph
4554            .stages
4555            .iter()
4556            .map(|stage| match stage.spec.kind {
4557                StageKind::Broadcast => StageState::Broadcast {
4558                    fast_path: broadcast_zip_fast_path(
4559                        stage,
4560                        graph,
4561                        &edge_by_outlet,
4562                        &stage_by_inlet,
4563                    ),
4564                    cancelled_outlets: vec![false; stage.spec.outlets.len()],
4565                    live_outlets: stage.spec.outlets.len(),
4566                },
4567                StageKind::Balance => StageState::Balance {
4568                    fast_path: balance_merge_fast_path(
4569                        stage,
4570                        graph,
4571                        &edge_by_outlet,
4572                        &stage_by_inlet,
4573                    ),
4574                    next: 0,
4575                    cancelled_outlets: vec![false; stage.spec.outlets.len()],
4576                    live_outlets: stage.spec.outlets.len(),
4577                },
4578                StageKind::Merge => StageState::Merge {
4579                    open_inputs: stage.spec.inlets.len(),
4580                    eager_complete: false,
4581                    completed: false,
4582                },
4583                StageKind::MergePreferred => StageState::Merge {
4584                    open_inputs: stage.spec.inlets.len(),
4585                    eager_complete: false,
4586                    completed: false,
4587                },
4588                StageKind::MergePrioritized { .. } => StageState::Merge {
4589                    open_inputs: stage.spec.inlets.len(),
4590                    eager_complete: false,
4591                    completed: false,
4592                },
4593                StageKind::Concat => StageState::Merge {
4594                    open_inputs: stage.spec.inlets.len(),
4595                    eager_complete: false,
4596                    completed: false,
4597                },
4598                StageKind::Interleave { eager_close, .. } => StageState::Merge {
4599                    open_inputs: stage.spec.inlets.len(),
4600                    eager_complete: eager_close,
4601                    completed: false,
4602                },
4603                StageKind::OrElse { primary_inlet: _ } => StageState::OrElse {
4604                    primary_emitted: false,
4605                    buffer: VecDeque::new(),
4606                    primary_closed: false,
4607                    secondary_closed: false,
4608                    completed: false,
4609                },
4610                StageKind::Zip(_) => {
4611                    if let [left, right] = stage.spec.inlets.as_slice() {
4612                        StageState::Zip {
4613                            left_inlet: left.id(),
4614                            right_inlet: right.id(),
4615                            left: VecDeque::new(),
4616                            right: VecDeque::new(),
4617                            left_pending_complete: false,
4618                            right_pending_complete: false,
4619                            completed: false,
4620                        }
4621                    } else {
4622                        StageState::Stateless
4623                    }
4624                }
4625                StageKind::Unzip { .. } => StageState::Unzip {
4626                    fast_path: unzip_fan_in_fast_path(
4627                        stage,
4628                        graph,
4629                        &edge_by_outlet,
4630                        &stage_by_inlet,
4631                    ),
4632                    zip_fast_path: unzip_zip_fast_path(
4633                        stage,
4634                        graph,
4635                        &edge_by_outlet,
4636                        &stage_by_inlet,
4637                    ),
4638                    demand: [false; 2],
4639                    cancelled: [false; 2],
4640                    upstream_closed: false,
4641                },
4642                StageKind::MergeSorted(_) => StageState::MergeSorted {
4643                    left: VecDeque::new(),
4644                    right: VecDeque::new(),
4645                    left_closed: false,
4646                    right_closed: false,
4647                    pending: VecDeque::new(),
4648                    completed: false,
4649                },
4650                StageKind::MergeSequence { .. } => StageState::MergeSequence {
4651                    next_sequence: 0,
4652                    pending: Vec::new(),
4653                    completed_count: 0,
4654                    output_buffer: VecDeque::new(),
4655                    completed: false,
4656                },
4657                StageKind::MergeLatest { input_count, .. } => {
4658                    let mut latest = Vec::with_capacity(input_count);
4659                    for _ in 0..input_count {
4660                        latest.push(None);
4661                    }
4662                    StageState::MergeLatest {
4663                        latest,
4664                        seen_count: 0,
4665                        completed_count: 0,
4666                        pending: VecDeque::new(),
4667                        completed: false,
4668                    }
4669                }
4670                StageKind::Partition { output_count, .. } => StageState::Partition {
4671                    fast_path: partition_merge_fast_path(
4672                        stage,
4673                        graph,
4674                        &edge_by_outlet,
4675                        &stage_by_inlet,
4676                    ),
4677                    pending: None,
4678                    upstream_closed: false,
4679                    demand: vec![false; output_count],
4680                    cancelled: vec![false; output_count],
4681                    output_count,
4682                    completed: false,
4683                },
4684                _ => StageState::Stateless,
4685            })
4686            .collect();
4687
4688        let mut executor = Self {
4689            graph,
4690            edge_by_outlet,
4691            edge_by_inlet,
4692            stage_by_inlet,
4693            stage_by_outlet,
4694            stage_states,
4695            opaque_logics,
4696            config,
4697            events: 0,
4698            async_boundary_crossings: 0,
4699            cancelled_outlets: HashSet::new(),
4700        };
4701        executor.prime_connected_demands();
4702        executor
4703    }
4704
4705    fn deliver<Out>(
4706        &mut self,
4707        inlet: PortId,
4708        value: DatumValue,
4709        graph_outlet: PortId,
4710        output: &mut impl FusedOutputSink<Out>,
4711    ) -> StreamResult<()>
4712    where
4713        Out: Send + 'static,
4714    {
4715        self.bump_event()?;
4716        let stage_index = *self.stage_by_inlet.get(&inlet).ok_or_else(|| {
4717            StreamError::GraphValidation(format!("inlet {} has no owning stage", inlet.as_usize()))
4718        })?;
4719
4720        let transition = self.process_stage(stage_index, inlet, value)?;
4721        self.emit_all(transition.emissions, graph_outlet, output)?;
4722        self.complete_all(transition.completed_outlets, graph_outlet, output)?;
4723        self.cancel_all(transition.cancelled_inlets, graph_outlet, output)
4724    }
4725
4726    fn complete<Out>(
4727        &mut self,
4728        inlet: PortId,
4729        graph_outlet: PortId,
4730        output: &mut impl FusedOutputSink<Out>,
4731    ) -> StreamResult<()>
4732    where
4733        Out: Send + 'static,
4734    {
4735        self.bump_event()?;
4736        let stage_index = *self.stage_by_inlet.get(&inlet).ok_or_else(|| {
4737            StreamError::GraphValidation(format!("inlet {} has no owning stage", inlet.as_usize()))
4738        })?;
4739        let transition = self.process_completion(stage_index, inlet)?;
4740        self.emit_all(transition.emissions, graph_outlet, output)?;
4741        self.complete_all(transition.completed_outlets, graph_outlet, output)?;
4742        self.cancel_all(transition.cancelled_inlets, graph_outlet, output)
4743    }
4744
4745    fn request<Out>(
4746        &mut self,
4747        outlet: PortId,
4748        graph_outlet: PortId,
4749        output: &mut impl FusedOutputSink<Out>,
4750    ) -> StreamResult<()>
4751    where
4752        Out: Send + 'static,
4753    {
4754        if self.is_outlet_cancelled(outlet) {
4755            return Ok(());
4756        }
4757        self.bump_event()?;
4758        let Some(stage_index) = self.stage_by_outlet.get(&outlet).copied() else {
4759            return Ok(());
4760        };
4761        let transition = self.process_pull(stage_index, outlet)?;
4762        self.emit_all(transition.emissions, graph_outlet, output)?;
4763        self.complete_all(transition.completed_outlets, graph_outlet, output)?;
4764        self.cancel_all(transition.cancelled_inlets, graph_outlet, output)
4765    }
4766
4767    fn downstream_finish<Out>(
4768        &mut self,
4769        outlet: PortId,
4770        graph_outlet: PortId,
4771        output: &mut impl FusedOutputSink<Out>,
4772    ) -> StreamResult<()>
4773    where
4774        Out: Send + 'static,
4775    {
4776        if !self.cancelled_outlets.insert(outlet) {
4777            return Ok(());
4778        }
4779        self.bump_event()?;
4780        let stage_index = *self.stage_by_outlet.get(&outlet).ok_or_else(|| {
4781            StreamError::GraphValidation(format!(
4782                "outlet {} has no owning stage",
4783                outlet.as_usize()
4784            ))
4785        })?;
4786        let transition = self.process_downstream_finish(stage_index, outlet)?;
4787        self.emit_all(transition.emissions, graph_outlet, output)?;
4788        self.complete_all(transition.completed_outlets, graph_outlet, output)?;
4789        self.cancel_all(transition.cancelled_inlets, graph_outlet, output)
4790    }
4791
4792    fn emit_all<Out>(
4793        &mut self,
4794        emitted: StageEmissions,
4795        graph_outlet: PortId,
4796        output: &mut impl FusedOutputSink<Out>,
4797    ) -> StreamResult<()>
4798    where
4799        Out: Send + 'static,
4800    {
4801        match emitted {
4802            StageEmissions::None => Ok(()),
4803            StageEmissions::One(outlet, value) => self.emit(outlet, value, graph_outlet, output),
4804            StageEmissions::Two(first, second) => {
4805                self.emit(first.0, first.1, graph_outlet, output)?;
4806                self.emit(second.0, second.1, graph_outlet, output)
4807            }
4808            StageEmissions::Many(emitted) => {
4809                for (outlet, value) in emitted {
4810                    self.emit(outlet, value, graph_outlet, output)?;
4811                }
4812                Ok(())
4813            }
4814        }
4815    }
4816
4817    fn complete_all<Out>(
4818        &mut self,
4819        completed_outlets: Vec<PortId>,
4820        graph_outlet: PortId,
4821        output: &mut impl FusedOutputSink<Out>,
4822    ) -> StreamResult<()>
4823    where
4824        Out: Send + 'static,
4825    {
4826        for outlet in completed_outlets {
4827            if self.is_outlet_cancelled(outlet) {
4828                continue;
4829            }
4830            self.complete_outlet(outlet, graph_outlet, output)?;
4831        }
4832        Ok(())
4833    }
4834
4835    fn cancel_all<Out>(
4836        &mut self,
4837        cancelled_inlets: Vec<PortId>,
4838        graph_outlet: PortId,
4839        output: &mut impl FusedOutputSink<Out>,
4840    ) -> StreamResult<()>
4841    where
4842        Out: Send + 'static,
4843    {
4844        for inlet in cancelled_inlets {
4845            if let Some(outlet) = self.edge_by_inlet.get(&inlet).copied() {
4846                self.downstream_finish(outlet, graph_outlet, output)?;
4847            }
4848        }
4849        Ok(())
4850    }
4851
4852    fn emit<Out>(
4853        &mut self,
4854        outlet: PortId,
4855        value: DatumValue,
4856        graph_outlet: PortId,
4857        output: &mut impl FusedOutputSink<Out>,
4858    ) -> StreamResult<()>
4859    where
4860        Out: Send + 'static,
4861    {
4862        self.bump_event()?;
4863        if self.is_outlet_cancelled(outlet) {
4864            return Ok(());
4865        }
4866        if outlet == graph_outlet {
4867            output.emit(downcast_datum(value, "emit", || {
4868                format!("outlet#{}", outlet.as_usize())
4869            })?)?;
4870            self.request(outlet, graph_outlet, output)?;
4871            return Ok(());
4872        }
4873
4874        let Some(inlet) = self.edge_by_outlet.get(&outlet).copied() else {
4875            return Err(StreamError::GraphValidation(format!(
4876                "outlet {} is neither connected nor graph output",
4877                outlet.as_usize()
4878            )));
4879        };
4880        self.deliver(inlet, value, graph_outlet, output)?;
4881        if self
4882            .stage_by_outlet
4883            .get(&outlet)
4884            .copied()
4885            .is_some_and(|stage_index| {
4886                matches!(
4887                    self.graph.stages[stage_index].spec.kind,
4888                    StageKind::Opaque | StageKind::Unzip { .. } | StageKind::Partition { .. }
4889                )
4890            })
4891        {
4892            self.request(outlet, graph_outlet, output)?;
4893        }
4894        Ok(())
4895    }
4896
4897    fn complete_outlet<Out>(
4898        &mut self,
4899        outlet: PortId,
4900        graph_outlet: PortId,
4901        output: &mut impl FusedOutputSink<Out>,
4902    ) -> StreamResult<()>
4903    where
4904        Out: Send + 'static,
4905    {
4906        self.bump_event()?;
4907        if outlet == graph_outlet {
4908            return Ok(());
4909        }
4910        let Some(inlet) = self.edge_by_outlet.get(&outlet).copied() else {
4911            return Err(StreamError::GraphValidation(format!(
4912                "outlet {} is neither connected nor graph output",
4913                outlet.as_usize()
4914            )));
4915        };
4916        self.complete(inlet, graph_outlet, output)
4917    }
4918
4919    fn process_stage(
4920        &mut self,
4921        stage_index: usize,
4922        inlet: PortId,
4923        value: DatumValue,
4924    ) -> StreamResult<StageTransition> {
4925        let stage = &self.graph.stages[stage_index];
4926        match &stage.spec.kind {
4927            StageKind::Identity => Ok(StageTransition::emit(StageEmissions::One(
4928                single_outlet(stage)?,
4929                value,
4930            ))),
4931            StageKind::Opaque => {
4932                if let Some(logic) = self
4933                    .opaque_logics
4934                    .get_mut(stage_index)
4935                    .and_then(|l| l.as_mut())
4936                {
4937                    logic.drain_async_callbacks();
4938                    let inlet_ref = stage.spec.inlets.iter().find(|i| i.id() == inlet).cloned();
4939                    if let Some(inlet_ref) = inlet_ref {
4940                        logic.offer_datum(inlet, value)?;
4941                        let mut handler = logic.take_in_handler(inlet);
4942                        let result = if let Some(ref mut handler) = handler {
4943                            let inlet_any = inlet_ref;
4944                            handler.on_push(logic, inlet_any)
4945                        } else {
4946                            Ok(())
4947                        };
4948                        if let Some(handler) = handler {
4949                            logic.restore_in_handler(inlet, handler);
4950                        }
4951                        result?;
4952                    }
4953                    self.collect_opaque_emissions(stage, stage_index)
4954                } else {
4955                    Ok(StageTransition::emit(StageEmissions::One(
4956                        single_outlet(stage)?,
4957                        value,
4958                    )))
4959                }
4960            }
4961            StageKind::Map(map) => Ok(StageTransition::emit(StageEmissions::One(
4962                single_outlet(stage)?,
4963                (map.erased)(value)?,
4964            ))),
4965            StageKind::AsyncBoundary => {
4966                self.async_boundary_crossings += 1;
4967                Ok(StageTransition::emit(StageEmissions::One(
4968                    single_outlet(stage)?,
4969                    value,
4970                )))
4971            }
4972            StageKind::Broadcast => {
4973                let fast_path = match &self.stage_states[stage_index] {
4974                    StageState::Broadcast { fast_path, .. } => *fast_path,
4975                    _ => None,
4976                };
4977                if let Some(fast_path) = fast_path {
4978                    self.broadcast_zip_emissions(fast_path, value)
4979                        .map(StageTransition::emit)
4980                } else {
4981                    broadcast_emissions(&stage.spec.outlets, value).map(StageTransition::emit)
4982                }
4983            }
4984            StageKind::Balance => {
4985                let outlets = &stage.spec.outlets;
4986                if outlets.is_empty() {
4987                    return Err(StreamError::GraphValidation(
4988                        "balance has no outlets".into(),
4989                    ));
4990                }
4991                let StageState::Balance {
4992                    fast_path,
4993                    next,
4994                    cancelled_outlets,
4995                    live_outlets,
4996                } = &mut self.stage_states[stage_index]
4997                else {
4998                    return Err(StreamError::GraphValidation(
4999                        "balance state is missing".into(),
5000                    ));
5001                };
5002                if *live_outlets == 0 {
5003                    return Ok(StageTransition::none());
5004                }
5005                if let Some(fast_path) = *fast_path {
5006                    return self
5007                        .balance_merge_emissions(fast_path, value)
5008                        .map(StageTransition::emit);
5009                }
5010                let mut selected = None;
5011                for offset in 0..outlets.len() {
5012                    let index = (*next + offset) % outlets.len();
5013                    if !cancelled_outlets[index] {
5014                        selected = Some(index);
5015                        break;
5016                    }
5017                }
5018                let index = selected.ok_or_else(|| {
5019                    StreamError::GraphValidation("balance has no live outlets".into())
5020                })?;
5021                let outlet = outlets[index].id();
5022                *next = (index + 1) % outlets.len();
5023                Ok(StageTransition::emit(StageEmissions::One(outlet, value)))
5024            }
5025            StageKind::Merge | StageKind::MergePreferred | StageKind::MergePrioritized { .. } => {
5026                let StageState::Merge { completed, .. } = &self.stage_states[stage_index] else {
5027                    return Err(StreamError::GraphValidation(
5028                        "merge state is missing".into(),
5029                    ));
5030                };
5031                if *completed {
5032                    return Ok(StageTransition::none());
5033                }
5034                Ok(StageTransition::emit(StageEmissions::One(
5035                    single_outlet(stage)?,
5036                    value,
5037                )))
5038            }
5039            StageKind::Concat | StageKind::Interleave { .. } => {
5040                let StageState::Merge { completed, .. } = &self.stage_states[stage_index] else {
5041                    return Err(StreamError::GraphValidation(
5042                        "fan-in state is missing".into(),
5043                    ));
5044                };
5045                if *completed {
5046                    return Ok(StageTransition::none());
5047                }
5048                Ok(StageTransition::emit(StageEmissions::One(
5049                    single_outlet(stage)?,
5050                    value,
5051                )))
5052            }
5053            StageKind::OrElse { primary_inlet } => {
5054                let StageState::OrElse {
5055                    primary_emitted,
5056                    buffer,
5057                    primary_closed,
5058                    completed,
5059                    ..
5060                } = &mut self.stage_states[stage_index]
5061                else {
5062                    return Err(StreamError::GraphValidation(
5063                        "or-else state is missing".into(),
5064                    ));
5065                };
5066                if *completed {
5067                    return Ok(StageTransition::none());
5068                }
5069                if inlet == *primary_inlet {
5070                    *primary_emitted = true;
5071                    buffer.clear();
5072                    Ok(StageTransition::emit(StageEmissions::One(
5073                        single_outlet(stage)?,
5074                        value,
5075                    )))
5076                } else if *primary_emitted {
5077                    Ok(StageTransition::none())
5078                } else if *primary_closed {
5079                    Ok(StageTransition::emit(StageEmissions::One(
5080                        single_outlet(stage)?,
5081                        value,
5082                    )))
5083                } else {
5084                    buffer.push_back(value);
5085                    Ok(StageTransition::none())
5086                }
5087            }
5088            StageKind::Zip(zip) => {
5089                if stage.spec.inlets.len() != 2 {
5090                    return Err(StreamError::GraphValidation(format!(
5091                        "zip stage {} expected 2 inlets, got {}",
5092                        stage.spec.name(),
5093                        stage.spec.inlets.len()
5094                    )));
5095                }
5096
5097                let ready = {
5098                    let StageState::Zip {
5099                        left_inlet,
5100                        right_inlet,
5101                        left,
5102                        right,
5103                        left_pending_complete,
5104                        right_pending_complete,
5105                        completed,
5106                    } = &mut self.stage_states[stage_index]
5107                    else {
5108                        return Err(StreamError::GraphValidation("zip state is missing".into()));
5109                    };
5110
5111                    if *completed {
5112                        return Ok(StageTransition::none());
5113                    }
5114
5115                    // Bounded per-inlet buffering: an inlet may receive several
5116                    // elements before its pair arrives (e.g. an asymmetric upstream
5117                    // topology), so queue them and pair in arrival order.
5118                    if inlet == *left_inlet {
5119                        left.push_back(value);
5120                    } else if inlet == *right_inlet {
5121                        right.push_back(value);
5122                    } else {
5123                        return Err(StreamError::GraphValidation(format!(
5124                            "zip inlet {} is not part of the stage",
5125                            inlet.as_usize()
5126                        )));
5127                    }
5128
5129                    match (left.front().is_some(), right.front().is_some()) {
5130                        (true, true) => {
5131                            let left_item =
5132                                left.pop_front().expect("zip left buffer had an element");
5133                            let right_item =
5134                                right.pop_front().expect("zip right buffer had an element");
5135                            let should_complete = (*left_pending_complete && left.is_empty())
5136                                || (*right_pending_complete && right.is_empty());
5137                            Some((left_item, right_item, should_complete))
5138                        }
5139                        _ => None,
5140                    }
5141                };
5142
5143                if let Some((left, right, should_complete)) = ready {
5144                    let outlet = single_outlet(stage)?;
5145                    if should_complete {
5146                        let StageState::Zip { completed, .. } = &mut self.stage_states[stage_index]
5147                        else {
5148                            return Err(StreamError::GraphValidation(
5149                                "zip state is missing".into(),
5150                            ));
5151                        };
5152                        *completed = true;
5153                    }
5154                    let mut transition =
5155                        StageTransition::emit(StageEmissions::One(outlet, zip(left, right)?));
5156                    if should_complete {
5157                        transition.completed_outlets.push(outlet);
5158                    }
5159                    Ok(transition)
5160                } else {
5161                    Ok(StageTransition::none())
5162                }
5163            }
5164            StageKind::Unzip { split, .. } => {
5165                let (fan_in, zip_fast) = match &self.stage_states[stage_index] {
5166                    StageState::Unzip {
5167                        fast_path,
5168                        zip_fast_path,
5169                        ..
5170                    } => (*fast_path, *zip_fast_path),
5171                    _ => (None, None),
5172                };
5173                if let Some(zip_fast) = zip_fast {
5174                    let (out0_val, out1_val) = split(value);
5175                    let zip_stage = &self.graph.stages[zip_fast.zip_stage_index];
5176                    let StageKind::Zip(zip) = &zip_stage.spec.kind else {
5177                        return Err(StreamError::GraphValidation(
5178                            "unzip-zip fast path references a non-zip stage".into(),
5179                        ));
5180                    };
5181                    let outlet = single_outlet(zip_stage)?;
5182                    let zipped = zip(out0_val, out1_val)?;
5183                    return Ok(StageTransition::emit(StageEmissions::One(outlet, zipped)));
5184                }
5185                if let Some(fast_path) = fan_in {
5186                    let (out0_val, out1_val) = split(value);
5187                    let target = &self.graph.stages[fast_path.fan_in_stage_index];
5188                    match &target.spec.kind {
5189                        StageKind::MergeSorted(compare) => {
5190                            let result = {
5191                                let StageState::MergeSorted {
5192                                    left,
5193                                    right,
5194                                    left_closed,
5195                                    right_closed,
5196                                    pending,
5197                                    completed,
5198                                } = &mut self.stage_states[fast_path.fan_in_stage_index]
5199                                else {
5200                                    return Err(StreamError::GraphValidation(
5201                                        "merge-sorted state is missing".into(),
5202                                    ));
5203                                };
5204                                if *completed {
5205                                    return Ok(StageTransition::none());
5206                                }
5207                                // Route each Unzip output to the correct left/right queue
5208                                // based on which inlet index each outlet connects to.
5209                                if fast_path.target_inlet_indices[0] == 0 {
5210                                    left.push_back(out0_val);
5211                                    right.push_back(out1_val);
5212                                } else {
5213                                    left.push_back(out1_val);
5214                                    right.push_back(out0_val);
5215                                }
5216
5217                                loop {
5218                                    let next = match (left.front(), right.front()) {
5219                                        (Some(l), Some(r)) => {
5220                                            if compare(l, r) != std::cmp::Ordering::Greater {
5221                                                left.pop_front()
5222                                            } else {
5223                                                right.pop_front()
5224                                            }
5225                                        }
5226                                        (Some(_), None) if *right_closed => left.pop_front(),
5227                                        (None, Some(_)) if *left_closed => right.pop_front(),
5228                                        _ => break,
5229                                    };
5230                                    if let Some(val) = next {
5231                                        pending.push_back(val);
5232                                    } else {
5233                                        break;
5234                                    }
5235                                }
5236
5237                                if let Some(output) = pending.pop_front() {
5238                                    let outlet = single_outlet(target)?;
5239                                    let all_done = *left_closed
5240                                        && *right_closed
5241                                        && left.is_empty()
5242                                        && right.is_empty()
5243                                        && pending.is_empty();
5244                                    if all_done {
5245                                        *completed = true;
5246                                        StageTransition::emit(StageEmissions::One(outlet, output))
5247                                            .with_completion(vec![outlet])
5248                                    } else {
5249                                        StageTransition::emit(StageEmissions::One(outlet, output))
5250                                    }
5251                                } else {
5252                                    StageTransition::none()
5253                                }
5254                            };
5255                            Ok(result)
5256                        }
5257                        StageKind::MergeSequence {
5258                            extract_sequence,
5259                            input_count,
5260                            ..
5261                        } => {
5262                            let result = {
5263                                let StageState::MergeSequence {
5264                                    next_sequence,
5265                                    pending,
5266                                    completed_count,
5267                                    output_buffer,
5268                                    completed,
5269                                } = &mut self.stage_states[fast_path.fan_in_stage_index]
5270                                else {
5271                                    return Err(StreamError::GraphValidation(
5272                                        "merge-sequence state is missing".into(),
5273                                    ));
5274                                };
5275                                if *completed {
5276                                    return Ok(StageTransition::none());
5277                                }
5278                                for val in [out0_val, out1_val] {
5279                                    let seq = extract_sequence(&val);
5280                                    if seq == *next_sequence {
5281                                        output_buffer.push_back(val);
5282                                        *next_sequence += 1;
5283                                        while let Some(index) =
5284                                            pending.iter().position(|(s, _)| *s == *next_sequence)
5285                                        {
5286                                            let (_, item) = pending.remove(index);
5287                                            output_buffer.push_back(item);
5288                                            *next_sequence += 1;
5289                                        }
5290                                    } else {
5291                                        if pending.iter().any(|(s, _)| *s == seq) {
5292                                            return Err(StreamError::Failed(format!(
5293                                                "duplicate sequence {seq} on merge sequence"
5294                                            )));
5295                                        }
5296                                        pending.push((seq, val));
5297                                        pending.sort_by_key(|(s, _)| *s);
5298                                        while let Some(index) =
5299                                            pending.iter().position(|(s, _)| *s == *next_sequence)
5300                                        {
5301                                            let (_, item) = pending.remove(index);
5302                                            output_buffer.push_back(item);
5303                                            *next_sequence += 1;
5304                                        }
5305                                    }
5306                                }
5307
5308                                if !output_buffer.is_empty() {
5309                                    let outlet = single_outlet(target)?;
5310                                    let all_done = *completed_count >= *input_count;
5311                                    let emissions: Vec<_> =
5312                                        output_buffer.drain(..).map(|v| (outlet, v)).collect();
5313                                    if all_done {
5314                                        *completed = true;
5315                                        StageTransition::emit(StageEmissions::Many(emissions))
5316                                            .with_completion(vec![outlet])
5317                                    } else {
5318                                        StageTransition::emit(StageEmissions::Many(emissions))
5319                                    }
5320                                } else {
5321                                    StageTransition::none()
5322                                }
5323                            };
5324                            Ok(result)
5325                        }
5326                        StageKind::MergeLatest {
5327                            input_count,
5328                            build_snapshot,
5329                            ..
5330                        } => {
5331                            let result = {
5332                                let StageState::MergeLatest {
5333                                    latest,
5334                                    seen_count,
5335                                    completed_count,
5336                                    pending,
5337                                    completed,
5338                                } = &mut self.stage_states[fast_path.fan_in_stage_index]
5339                                else {
5340                                    return Err(StreamError::GraphValidation(
5341                                        "merge-latest state is missing".into(),
5342                                    ));
5343                                };
5344                                if *completed {
5345                                    return Ok(StageTransition::none());
5346                                }
5347                                let inlets = &target.spec.inlets;
5348                                for (idx, val) in fast_path
5349                                    .target_inlet_indices
5350                                    .into_iter()
5351                                    .zip([out0_val, out1_val])
5352                                {
5353                                    if idx < inlets.len() && latest[idx].is_none() {
5354                                        *seen_count += 1;
5355                                    }
5356                                    latest[idx] = Some(val);
5357                                    if *seen_count >= *input_count {
5358                                        let values: Vec<&DatumValue> =
5359                                            latest.iter().filter_map(|v| v.as_ref()).collect();
5360                                        let snapshot = build_snapshot(&values);
5361                                        pending.push_back(snapshot);
5362                                    }
5363                                }
5364
5365                                if !pending.is_empty() {
5366                                    let outlet = single_outlet(target)?;
5367                                    let all_done = *completed_count >= *input_count;
5368                                    let emissions: Vec<_> =
5369                                        pending.drain(..).map(|v| (outlet, v)).collect();
5370                                    if all_done {
5371                                        *completed = true;
5372                                        StageTransition::emit(StageEmissions::Many(emissions))
5373                                            .with_completion(vec![outlet])
5374                                    } else {
5375                                        StageTransition::emit(StageEmissions::Many(emissions))
5376                                    }
5377                                } else {
5378                                    StageTransition::none()
5379                                }
5380                            };
5381                            Ok(result)
5382                        }
5383                        _ => {
5384                            let transition = {
5385                                let StageState::Unzip { cancelled, .. } =
5386                                    &self.stage_states[stage_index]
5387                                else {
5388                                    return Err(StreamError::GraphValidation(
5389                                        "unzip state is missing".into(),
5390                                    ));
5391                                };
5392                                let out0 = stage.spec.outlets.first().map(AnyOutlet::id);
5393                                let out1 = stage.spec.outlets.get(1).map(AnyOutlet::id);
5394                                let live0 = out0.is_some() && !cancelled[0];
5395                                let live1 = out1.is_some() && !cancelled[1];
5396                                if live0 && live1 {
5397                                    StageTransition::emit(StageEmissions::Two(
5398                                        (out0.unwrap(), out0_val),
5399                                        (out1.unwrap(), out1_val),
5400                                    ))
5401                                } else if live0 {
5402                                    StageTransition::emit(StageEmissions::One(
5403                                        out0.unwrap(),
5404                                        out0_val,
5405                                    ))
5406                                } else if live1 {
5407                                    StageTransition::emit(StageEmissions::One(
5408                                        out1.unwrap(),
5409                                        out1_val,
5410                                    ))
5411                                } else {
5412                                    StageTransition::none()
5413                                }
5414                            };
5415                            Ok(transition)
5416                        }
5417                    }
5418                } else {
5419                    let transition = {
5420                        let StageState::Unzip { cancelled, .. } = &self.stage_states[stage_index]
5421                        else {
5422                            return Err(StreamError::GraphValidation(
5423                                "unzip state is missing".into(),
5424                            ));
5425                        };
5426                        let (out0_val, out1_val) = split(value);
5427                        let out0 = stage.spec.outlets.first().map(AnyOutlet::id);
5428                        let out1 = stage.spec.outlets.get(1).map(AnyOutlet::id);
5429                        let live0 = out0.is_some() && !cancelled[0];
5430                        let live1 = out1.is_some() && !cancelled[1];
5431                        if live0 && live1 {
5432                            StageTransition::emit(StageEmissions::Two(
5433                                (out0.unwrap(), out0_val),
5434                                (out1.unwrap(), out1_val),
5435                            ))
5436                        } else if live0 {
5437                            StageTransition::emit(StageEmissions::One(out0.unwrap(), out0_val))
5438                        } else if live1 {
5439                            StageTransition::emit(StageEmissions::One(out1.unwrap(), out1_val))
5440                        } else {
5441                            StageTransition::none()
5442                        }
5443                    };
5444                    Ok(transition)
5445                }
5446            }
5447            StageKind::MergeSorted(compare) => {
5448                let result = {
5449                    let StageState::MergeSorted {
5450                        left,
5451                        right,
5452                        left_closed,
5453                        right_closed,
5454                        pending,
5455                        completed,
5456                    } = &mut self.stage_states[stage_index]
5457                    else {
5458                        return Err(StreamError::GraphValidation(
5459                            "merge-sorted state is missing".into(),
5460                        ));
5461                    };
5462                    if *completed {
5463                        return Ok(StageTransition::none());
5464                    }
5465                    let is_left = stage.spec.inlets.first().is_some_and(|i| i.id() == inlet);
5466                    if is_left {
5467                        left.push_back(value);
5468                    } else {
5469                        right.push_back(value);
5470                    }
5471
5472                    loop {
5473                        let next = match (left.front(), right.front()) {
5474                            (Some(l), Some(r)) => {
5475                                if compare(l, r) != std::cmp::Ordering::Greater {
5476                                    left.pop_front()
5477                                } else {
5478                                    right.pop_front()
5479                                }
5480                            }
5481                            (Some(_), None) if *right_closed => left.pop_front(),
5482                            (None, Some(_)) if *left_closed => right.pop_front(),
5483                            _ => break,
5484                        };
5485                        if let Some(val) = next {
5486                            pending.push_back(val);
5487                        } else {
5488                            break;
5489                        }
5490                    }
5491
5492                    if let Some(output) = pending.pop_front() {
5493                        let outlet = single_outlet(stage)?;
5494                        let all_done = *left_closed
5495                            && *right_closed
5496                            && left.is_empty()
5497                            && right.is_empty()
5498                            && pending.is_empty();
5499                        if all_done {
5500                            *completed = true;
5501                            StageTransition::emit(StageEmissions::One(outlet, output))
5502                                .with_completion(vec![outlet])
5503                        } else {
5504                            StageTransition::emit(StageEmissions::One(outlet, output))
5505                        }
5506                    } else {
5507                        StageTransition::none()
5508                    }
5509                };
5510                Ok(result)
5511            }
5512            StageKind::MergeSequence {
5513                input_count,
5514                extract_sequence,
5515                ..
5516            } => {
5517                let result = {
5518                    let StageState::MergeSequence {
5519                        next_sequence,
5520                        pending,
5521                        completed_count,
5522                        output_buffer,
5523                        completed,
5524                    } = &mut self.stage_states[stage_index]
5525                    else {
5526                        return Err(StreamError::GraphValidation(
5527                            "merge-sequence state is missing".into(),
5528                        ));
5529                    };
5530                    if *completed {
5531                        return Ok(StageTransition::none());
5532                    }
5533                    let seq = extract_sequence(&value);
5534                    if seq == *next_sequence {
5535                        output_buffer.push_back(value);
5536                        *next_sequence += 1;
5537                        while let Some(index) =
5538                            pending.iter().position(|(s, _)| *s == *next_sequence)
5539                        {
5540                            let (_, item) = pending.remove(index);
5541                            output_buffer.push_back(item);
5542                            *next_sequence += 1;
5543                        }
5544                    } else {
5545                        if pending.iter().any(|(s, _)| *s == seq) {
5546                            return Err(StreamError::Failed(format!(
5547                                "duplicate sequence {seq} on merge sequence"
5548                            )));
5549                        }
5550                        pending.push((seq, value));
5551                        pending.sort_by_key(|(s, _)| *s);
5552                        while let Some(index) =
5553                            pending.iter().position(|(s, _)| *s == *next_sequence)
5554                        {
5555                            let (_, item) = pending.remove(index);
5556                            output_buffer.push_back(item);
5557                            *next_sequence += 1;
5558                        }
5559                    }
5560
5561                    if !output_buffer.is_empty() {
5562                        let outlet = single_outlet(stage)?;
5563                        let all_done = *completed_count >= *input_count;
5564                        let emissions: Vec<_> =
5565                            output_buffer.drain(..).map(|v| (outlet, v)).collect();
5566                        if all_done {
5567                            *completed = true;
5568                            StageTransition::emit(StageEmissions::Many(emissions))
5569                                .with_completion(vec![outlet])
5570                        } else {
5571                            StageTransition::emit(StageEmissions::Many(emissions))
5572                        }
5573                    } else {
5574                        StageTransition::none()
5575                    }
5576                };
5577                Ok(result)
5578            }
5579            StageKind::MergeLatest {
5580                input_count,
5581                build_snapshot,
5582                ..
5583            } => {
5584                let result = {
5585                    let StageState::MergeLatest {
5586                        latest,
5587                        seen_count,
5588                        completed_count,
5589                        pending,
5590                        completed,
5591                    } = &mut self.stage_states[stage_index]
5592                    else {
5593                        return Err(StreamError::GraphValidation(
5594                            "merge-latest state is missing".into(),
5595                        ));
5596                    };
5597                    if *completed {
5598                        return Ok(StageTransition::none());
5599                    }
5600                    let inlet_index = stage
5601                        .spec
5602                        .inlets
5603                        .iter()
5604                        .position(|i| i.id() == inlet)
5605                        .ok_or_else(|| {
5606                            StreamError::GraphValidation(format!(
5607                                "merge-latest inlet {} not part of stage",
5608                                inlet.as_usize()
5609                            ))
5610                        })?;
5611                    if latest[inlet_index].is_none() {
5612                        *seen_count += 1;
5613                    }
5614                    latest[inlet_index] = Some(value);
5615                    if *seen_count >= *input_count {
5616                        let values: Vec<&DatumValue> =
5617                            latest.iter().filter_map(|v| v.as_ref()).collect();
5618                        let snapshot = build_snapshot(&values);
5619                        pending.push_back(snapshot);
5620                    }
5621
5622                    if !pending.is_empty() {
5623                        let outlet = single_outlet(stage)?;
5624                        let all_done = *completed_count >= *input_count;
5625                        let emissions: Vec<_> = pending.drain(..).map(|v| (outlet, v)).collect();
5626                        if all_done {
5627                            *completed = true;
5628                            StageTransition::emit(StageEmissions::Many(emissions))
5629                                .with_completion(vec![outlet])
5630                        } else {
5631                            StageTransition::emit(StageEmissions::Many(emissions))
5632                        }
5633                    } else {
5634                        StageTransition::none()
5635                    }
5636                };
5637                Ok(result)
5638            }
5639            StageKind::Partition {
5640                output_count,
5641                partitioner,
5642                ..
5643            } => {
5644                let fast_path = match &self.stage_states[stage_index] {
5645                    StageState::Partition { fast_path, .. } => *fast_path,
5646                    _ => None,
5647                };
5648                if let Some(fast_path) = fast_path {
5649                    let idx = partitioner(&value);
5650                    if idx >= *output_count {
5651                        return Err(StreamError::Failed(format!(
5652                            "partitioner returned out-of-bounds index {idx} for {output_count} outputs"
5653                        )));
5654                    }
5655                    let merge_stage = &self.graph.stages[fast_path.merge_stage_index];
5656                    let StageState::Merge { completed, .. } =
5657                        &self.stage_states[fast_path.merge_stage_index]
5658                    else {
5659                        return Err(StreamError::GraphValidation(
5660                            "partition-merge fast path: merge state missing".into(),
5661                        ));
5662                    };
5663                    if *completed {
5664                        return Ok(StageTransition::none());
5665                    }
5666                    return Ok(StageTransition::emit(StageEmissions::One(
5667                        single_outlet(merge_stage)?,
5668                        value,
5669                    )));
5670                }
5671                let result = {
5672                    let StageState::Partition {
5673                        pending,
5674                        upstream_closed: _,
5675                        demand,
5676                        cancelled,
5677                        output_count: _,
5678                        completed,
5679                        ..
5680                    } = &mut self.stage_states[stage_index]
5681                    else {
5682                        return Err(StreamError::GraphValidation(
5683                            "partition state is missing".into(),
5684                        ));
5685                    };
5686                    if *completed {
5687                        return Ok(StageTransition::none());
5688                    }
5689                    let idx = partitioner(&value);
5690                    if idx >= *output_count {
5691                        return Err(StreamError::Failed(format!(
5692                            "partitioner returned out-of-bounds index {idx} for {output_count} outputs"
5693                        )));
5694                    }
5695                    if cancelled[idx] {
5696                        return Ok(StageTransition::none());
5697                    }
5698                    if demand[idx] {
5699                        demand[idx] = false;
5700                        let outlet = stage.spec.outlets[idx].id();
5701                        StageTransition::emit(StageEmissions::One(outlet, value))
5702                    } else {
5703                        *pending = Some((idx, value));
5704                        StageTransition::none()
5705                    }
5706                };
5707                Ok(result)
5708            }
5709        }
5710    }
5711
5712    fn process_completion(
5713        &mut self,
5714        stage_index: usize,
5715        inlet: PortId,
5716    ) -> StreamResult<StageTransition> {
            // Handles completion of the upstream feeding `inlet` of the stage at
            // `stage_index`.
            //
            // Dispatches on the stage's `StageKind`, updates the matching entry in
            // `self.stage_states`, and returns a `StageTransition` carrying any values
            // flushed by the completion plus the outlet ids handed to `with_completion`
            // (presumably the outlets the caller should complete downstream — confirm
            // against the code that consumes `completed_outlets`).
            //
            // Errors:
            // * `StreamError::GraphValidation` when the stored `StageState` variant does
            //   not match the stage kind.
            // * `StreamError::Failed` when a merge-sequence stage ends with a sequence gap.
            // * Whatever `single_outlet`, the opaque logic calls, or the recursive call in
            //   the unzip fan-in path return.
            //
            // Arms that track a `completed` flag return `StageTransition::none()` once the
            // flag is set, so a repeated completion for such a stage produces no transition.
5717        let stage = &self.graph.stages[stage_index];
5718        match &stage.spec.kind {
                // No per-stage state is consulted for these kinds: emit nothing and hand
                // the stage's single outlet to `with_completion`.
5719            StageKind::Identity | StageKind::Map(_) | StageKind::AsyncBoundary => {
5720                Ok(StageTransition::emit(StageEmissions::None)
5721                    .with_completion(vec![single_outlet(stage)?]))
5722            }
5723            StageKind::Opaque => {
                    // Delegate to the logic object stored for this stage, when there is one.
5724                if let Some(logic) = self
5725                    .opaque_logics
5726                    .get_mut(stage_index)
5727                    .and_then(|l| l.as_mut())
5728                {
5729                    logic.drain_async_callbacks();
5730                    logic.complete_inlet_by_id(inlet)?;
                        // Look up the inlet handle in the stage spec; the finish handler is
                        // only invoked when an inlet with this id is found.
5731                    let inlet_ref = stage.spec.inlets.iter().find(|i| i.id() == inlet).cloned();
5732                    if let Some(inlet_ref) = inlet_ref {
                            // The handler is taken out of `logic` for the duration of the call
                            // (it receives `logic` itself as an argument) and is put back
                            // before any error from the callback is propagated by `result?`.
5733                        let mut handler = logic.take_in_handler(inlet);
5734                        let result = if let Some(ref mut handler) = handler {
5735                            let inlet_any = inlet_ref;
5736                            handler.on_upstream_finish(logic, inlet_any)
5737                        } else {
5738                            Ok(())
5739                        };
5740                        if let Some(handler) = handler {
5741                            logic.restore_in_handler(inlet, handler);
5742                        }
5743                        result?;
5744                    }
                        // The transition is built from whatever the logic produced.
5745                    self.collect_opaque_emissions(stage, stage_index)
5746                } else {
                        // No logic object for this stage: complete the single outlet directly.
5747                    Ok(StageTransition::emit(StageEmissions::None)
5748                        .with_completion(vec![single_outlet(stage)?]))
5749                }
5750            }
                // Complete every outlet declared in the stage spec.
5751            StageKind::Broadcast | StageKind::Balance => {
5752                Ok(StageTransition::emit(StageEmissions::None)
5753                    .with_completion(stage.spec.outlets.iter().map(AnyOutlet::id).collect()))
5754            }
5755            StageKind::Merge | StageKind::MergePreferred | StageKind::MergePrioritized { .. } => {
5756                let StageState::Merge {
5757                    open_inputs,
5758                    eager_complete,
5759                    completed,
5760                } = &mut self.stage_states[stage_index]
5761                else {
5762                    return Err(StreamError::GraphValidation(
5763                        "merge state is missing".into(),
5764                    ));
5765                };
5766
5767                if *completed {
5768                    return Ok(StageTransition::none());
5769                }
                    // Guard the decrement below: with no open inputs left there is nothing
                    // to count down.
5770                if *open_inputs == 0 {
5771                    return Ok(StageTransition::none());
5772                }
5773                *open_inputs -= 1;
                    // Complete on the first closed input when `eager_complete` is set,
                    // otherwise only once the last open input has closed.
5774                if *eager_complete || *open_inputs == 0 {
5775                    *completed = true;
5776                    Ok(StageTransition::emit(StageEmissions::None)
5777                        .with_completion(vec![single_outlet(stage)?]))
5778                } else {
5779                    Ok(StageTransition::none())
5780                }
5781            }
                // Same bookkeeping as the merge arm above, also against `StageState::Merge`;
                // only the error message for a mismatched state differs.
5782            StageKind::Concat | StageKind::Interleave { .. } => {
5783                let StageState::Merge {
5784                    open_inputs,
5785                    eager_complete,
5786                    completed,
5787                } = &mut self.stage_states[stage_index]
5788                else {
5789                    return Err(StreamError::GraphValidation(
5790                        "fan-in state is missing".into(),
5791                    ));
5792                };
5793
5794                if *completed {
5795                    return Ok(StageTransition::none());
5796                }
5797                if *open_inputs == 0 {
5798                    return Ok(StageTransition::none());
5799                }
5800                *open_inputs -= 1;
5801                if *eager_complete || *open_inputs == 0 {
5802                    *completed = true;
5803                    Ok(StageTransition::emit(StageEmissions::None)
5804                        .with_completion(vec![single_outlet(stage)?]))
5805                } else {
5806                    Ok(StageTransition::none())
5807                }
5808            }
5809            StageKind::OrElse { primary_inlet } => {
5810                let StageState::OrElse {
5811                    primary_emitted,
5812                    buffer,
5813                    primary_closed,
5814                    secondary_closed,
5815                    completed,
5816                    ..
5817                } = &mut self.stage_states[stage_index]
5818                else {
5819                    return Err(StreamError::GraphValidation(
5820                        "or-else state is missing".into(),
5821                    ));
5822                };
5823                if *completed {
5824                    return Ok(StageTransition::none());
5825                }
5826                if inlet == *primary_inlet {
5827                    *primary_closed = true;
5828                    if *primary_emitted {
                            // `primary_emitted` is set: finish now and discard whatever is
                            // held in `buffer`.
5829                        *completed = true;
5830                        buffer.clear();
5831                        Ok(StageTransition::emit(StageEmissions::None)
5832                            .with_completion(vec![single_outlet(stage)?]))
5833                    } else {
                            // `primary_emitted` is not set: flush everything held in `buffer`
                            // to the single outlet (presumably values buffered from the
                            // secondary input — confirm against the push path).
5834                        let outlet = single_outlet(stage)?;
5835                        let emissions: Vec<_> = buffer.drain(..).map(|v| (outlet, v)).collect();
5836                        if *secondary_closed {
                                // Both inputs are closed: the flush is also the final transition.
5837                            *completed = true;
5838                            let transition = if emissions.is_empty() {
5839                                StageTransition::emit(StageEmissions::None)
5840                            } else {
5841                                StageTransition::emit(StageEmissions::Many(emissions))
5842                            };
5843                            Ok(transition.with_completion(vec![outlet]))
5844                        } else {
                                // The other input is still open: emit the flushed values (if
                                // any) without completing the outlet.
5845                            if emissions.is_empty() {
5846                                Ok(StageTransition::none())
5847                            } else {
5848                                Ok(StageTransition::emit(StageEmissions::Many(emissions)))
5849                            }
5850                        }
5851                    }
5852                } else {
                        // Any inlet other than `primary_inlet` is treated as the secondary.
5853                    *secondary_closed = true;
5854                    if *primary_closed && !*primary_emitted {
                            // The primary already closed with `primary_emitted` unset: flush
                            // the buffer and complete.
5855                        let outlet = single_outlet(stage)?;
5856                        let emissions: Vec<_> = buffer.drain(..).map(|v| (outlet, v)).collect();
5857                        *completed = true;
5858                        if emissions.is_empty() {
5859                            Ok(StageTransition::emit(StageEmissions::None)
5860                                .with_completion(vec![outlet]))
5861                        } else {
5862                            Ok(StageTransition::emit(StageEmissions::Many(emissions))
5863                                .with_completion(vec![outlet]))
5864                        }
5865                    } else {
                            // Primary still open (or it has emitted): only the
                            // `secondary_closed` flag is recorded.
5866                        Ok(StageTransition::none())
5867                    }
5868                }
5869            }
5870            StageKind::Zip(_) => {
5871                let StageState::Zip {
5872                    left_inlet,
5873                    right_inlet,
5874                    left,
5875                    right,
5876                    left_pending_complete,
5877                    right_pending_complete,
5878                    completed,
5879                } = &mut self.stage_states[stage_index]
5880                else {
5881                    return Err(StreamError::GraphValidation("zip state is missing".into()));
5882                };
5883                if *completed {
5884                    return Ok(StageTransition::none());
5885                }
5886                let finishes_left = inlet == *left_inlet;
5887                let finishes_right = inlet == *right_inlet;
                    // If the finishing side has nothing queued, complete the stage right away.
5888                if (finishes_left && left.is_empty()) || (finishes_right && right.is_empty()) {
5889                    *completed = true;
5890                    Ok(StageTransition::emit(StageEmissions::None)
5891                        .with_completion(vec![single_outlet(stage)?]))
5892                } else {
                        // The finishing side still has queued elements: only record the
                        // pending-completion flag. Nothing in this function acts on the flag
                        // afterwards (presumably the push path does — TODO confirm).
5893                    if finishes_left {
5894                        *left_pending_complete = true;
5895                    }
5896                    if finishes_right {
5897                        *right_pending_complete = true;
5898                    }
5899                    Ok(StageTransition::none())
5900                }
5901            }
5902            StageKind::Unzip { .. } => {
                    // Copy both optional fast-path descriptors out first so the shared borrow
                    // of the state ends before the state is borrowed mutably below.
5903                let (fan_in, zip_fast) = match &self.stage_states[stage_index] {
5904                    StageState::Unzip {
5905                        fast_path,
5906                        zip_fast_path,
5907                        ..
5908                    } => (*fast_path, *zip_fast_path),
5909                    _ => (None, None),
5910                };
5911                if let Some(zip_fast) = zip_fast {
5912                    let StageState::Unzip {
5913                        upstream_closed, ..
5914                    } = &mut self.stage_states[stage_index]
5915                    else {
5916                        return Err(StreamError::GraphValidation(
5917                            "unzip state is missing".into(),
5918                        ));
5919                    };
5920                    *upstream_closed = true;
                        // With the zip fast path, completion is reported on the single outlet
                        // of the stage at `zip_fast.zip_stage_index`, not on this stage's own
                        // outlets.
5921                    let zip_stage = &self.graph.stages[zip_fast.zip_stage_index];
5922                    return Ok(StageTransition::emit(StageEmissions::None)
5923                        .with_completion(vec![single_outlet(zip_stage)?]));
5924                }
5925                if let Some(fast_path) = fan_in {
5926                    let StageState::Unzip {
5927                        upstream_closed, ..
5928                    } = &mut self.stage_states[stage_index]
5929                    else {
5930                        return Err(StreamError::GraphValidation(
5931                            "unzip state is missing".into(),
5932                        ));
5933                    };
5934                    *upstream_closed = true;
5935                    let target_stage = &self.graph.stages[fast_path.fan_in_stage_index];
5936                    // Only notify the two inlets that the Unzip outlets are wired to —
5937                    // do not call process_completion on unrelated inlets of the fan-in
5938                    // stage (e.g. a third inlet fed by a separate source).
5939                    let target_inlets = [
5940                        target_stage.spec.inlets[fast_path.target_inlet_indices[0]].id(),
5941                        target_stage.spec.inlets[fast_path.target_inlet_indices[1]].id(),
5942                    ];
                        // Complete each target inlet on the fan-in stage via a recursive call
                        // and fold the resulting transitions into one.
5943                    let mut combined = StageTransition::none();
5944                    for target_inlet in target_inlets {
5945                        let t =
5946                            self.process_completion(fast_path.fan_in_stage_index, target_inlet)?;
5947                        combined.emissions = merge_emissions(combined.emissions, t.emissions);
5948                        combined.completed_outlets.extend(t.completed_outlets);
5949                        combined.cancelled_inlets.extend(t.cancelled_inlets);
5950                    }
5951                    Ok(combined)
5952                } else {
                        // No fast path: mark the upstream closed and complete all of this
                        // stage's own outlets.
5953                    let StageState::Unzip {
5954                        upstream_closed, ..
5955                    } = &mut self.stage_states[stage_index]
5956                    else {
5957                        return Err(StreamError::GraphValidation(
5958                            "unzip state is missing".into(),
5959                        ));
5960                    };
5961                    *upstream_closed = true;
5962                    Ok(StageTransition::emit(StageEmissions::None)
5963                        .with_completion(stage.spec.outlets.iter().map(AnyOutlet::id).collect()))
5964                }
5965            }
5966            StageKind::MergeSorted(compare) => {
5967                let result = {
5968                    let StageState::MergeSorted {
5969                        left,
5970                        right,
5971                        left_closed,
5972                        right_closed,
5973                        pending,
5974                        completed,
5975                    } = &mut self.stage_states[stage_index]
5976                    else {
5977                        return Err(StreamError::GraphValidation(
5978                            "merge-sorted state is missing".into(),
5979                        ));
5980                    };
5981                    if *completed {
5982                        return Ok(StageTransition::none());
5983                    }
                        // The first inlet in the spec is the left side; any other inlet id is
                        // treated as the right side.
5984                    let is_left = stage.spec.inlets.first().is_some_and(|i| i.id() == inlet);
5985                    if is_left {
5986                        *left_closed = true;
5987                    } else {
5988                        *right_closed = true;
5989                    }
5990
                        // Move every element that can now be ordered into `pending`:
                        // * both queues non-empty: take the left front unless `compare` says it
                        //   is greater than the right front (ties go to the left);
                        // * only one queue non-empty: take from it only if the other side has
                        //   closed;
                        // * otherwise stop.
5991                    loop {
5992                        let next = match (left.front(), right.front()) {
5993                            (Some(l), Some(r)) => {
5994                                if compare(l, r) != std::cmp::Ordering::Greater {
5995                                    left.pop_front()
5996                                } else {
5997                                    right.pop_front()
5998                                }
5999                            }
6000                            (Some(_), None) if *right_closed => left.pop_front(),
6001                            (None, Some(_)) if *left_closed => right.pop_front(),
6002                            _ => break,
6003                        };
6004                        if let Some(val) = next {
6005                            pending.push_back(val);
6006                        } else {
6007                            break;
6008                        }
6009                    }
6010
                        // In both branches below the stage completes only when both sides are
                        // closed and both queues are empty.
6011                    if pending.is_empty() {
6012                        let all_done =
6013                            *left_closed && *right_closed && left.is_empty() && right.is_empty();
6014                        if all_done {
6015                            *completed = true;
6016                            StageTransition::emit(StageEmissions::None)
6017                                .with_completion(vec![single_outlet(stage)?])
6018                        } else {
6019                            StageTransition::none()
6020                        }
6021                    } else {
                            // Emit everything collected in `pending` in one transition.
6022                        let outlet = single_outlet(stage)?;
6023                        let emissions: Vec<_> = pending.drain(..).map(|v| (outlet, v)).collect();
6024                        let all_done =
6025                            *left_closed && *right_closed && left.is_empty() && right.is_empty();
6026                        if all_done {
6027                            *completed = true;
6028                            StageTransition::emit(StageEmissions::Many(emissions))
6029                                .with_completion(vec![outlet])
6030                        } else {
6031                            StageTransition::emit(StageEmissions::Many(emissions))
6032                        }
6033                    }
6034                };
6035                Ok(result)
6036            }
6037            StageKind::MergeSequence { input_count, .. } => {
6038                let result = {
6039                    let StageState::MergeSequence {
6040                        next_sequence,
6041                        pending,
6042                        completed_count,
6043                        output_buffer,
6044                        completed,
6045                    } = &mut self.stage_states[stage_index]
6046                    else {
6047                        return Err(StreamError::GraphValidation(
6048                            "merge-sequence state is missing".into(),
6049                        ));
6050                    };
6051                    if *completed {
6052                        return Ok(StageTransition::none());
6053                    }
                        // Count this input as completed; the stage can only finish once
                        // `completed_count` reaches `input_count` and `output_buffer` is empty.
6054                    *completed_count += 1;
6055                    if *completed_count >= *input_count && output_buffer.is_empty() {
6056                        if !pending.is_empty() {
6057                            // All inputs have completed but there are buffered elements
6058                            // whose sequence numbers do not include `next_sequence`.
6059                            // This is a gap — fail exactly as the GraphStage logic does.
6060                            return Err(StreamError::Failed(format!(
6061                                "expected sequence {next_sequence}, but all input ports have pushed or are complete",
6062                            )));
6063                        }
6064                        *completed = true;
6065                        StageTransition::emit(StageEmissions::None)
6066                            .with_completion(vec![single_outlet(stage)?])
6067                    } else {
6068                        StageTransition::none()
6069                    }
6070                };
6071                Ok(result)
6072            }
6073            StageKind::MergeLatest {
6074                input_count,
6075                eager_complete,
6076                ..
6077            } => {
6078                let result = {
6079                    let StageState::MergeLatest {
6080                        completed_count,
6081                        pending,
6082                        completed,
6083                        ..
6084                    } = &mut self.stage_states[stage_index]
6085                    else {
6086                        return Err(StreamError::GraphValidation(
6087                            "merge-latest state is missing".into(),
6088                        ));
6089                    };
6090                    if *completed {
6091                        return Ok(StageTransition::none());
6092                    }
6093                    *completed_count += 1;
6094                    // Complete when all inputs are done, OR when eager_complete is set
6095                    // and there is no pending output (matches Akka ZipLatestWith semantics:
6096                    // complete as soon as any upstream completes if eager is true and the
6097                    // pending queue is drained).
6098                    let all_done = *completed_count >= *input_count;
6099                    let eager_done = *eager_complete && pending.is_empty();
6100                    if all_done || eager_done {
6101                        *completed = true;
6102                        StageTransition::emit(StageEmissions::None)
6103                            .with_completion(vec![single_outlet(stage)?])
6104                    } else {
6105                        StageTransition::none()
6106                    }
6107                };
6108                Ok(result)
6109            }
6110            StageKind::Partition { .. } => {
                    // Copy the optional fast-path descriptor out before taking the mutable
                    // borrow of the state below.
6111                let fast_path = match &self.stage_states[stage_index] {
6112                    StageState::Partition { fast_path, .. } => *fast_path,
6113                    _ => None,
6114                };
6115                let result = {
6116                    let StageState::Partition {
6117                        pending,
6118                        upstream_closed,
6119                        completed,
6120                        ..
6121                    } = &mut self.stage_states[stage_index]
6122                    else {
6123                        return Err(StreamError::GraphValidation(
6124                            "partition state is missing".into(),
6125                        ));
6126                    };
6127                    if *completed {
6128                        return Ok(StageTransition::none());
6129                    }
6130                    *upstream_closed = true;
6131                    if pending.is_none() {
6132                        *completed = true;
6133                        if let Some(fast_path) = fast_path {
                                // Fast path: completion is reported on the single outlet of the
                                // stage at `fast_path.merge_stage_index`.
6134                            let merge_stage = &self.graph.stages[fast_path.merge_stage_index];
6135                            StageTransition::emit(StageEmissions::None)
6136                                .with_completion(vec![single_outlet(merge_stage)?])
6137                        } else {
                                // Regular path: complete every outlet of this stage.
6138                            StageTransition::emit(StageEmissions::None).with_completion(
6139                                stage.spec.outlets.iter().map(AnyOutlet::id).collect(),
6140                            )
6141                        }
6142                    } else {
                            // An element is still parked in `pending`: only `upstream_closed`
                            // is recorded and `completed` stays unset. Completion is not issued
                            // here (presumably once the parked element is pulled — TODO confirm
                            // against `process_pull`).
6143                        StageTransition::none()
6144                    }
6145                };
6146                Ok(result)
6147            }
6148        }
6149    }
6150
6151    fn process_pull(
6152        &mut self,
6153        stage_index: usize,
6154        outlet: PortId,
6155    ) -> StreamResult<StageTransition> {
6156        let stage = &self.graph.stages[stage_index];
6157        match &stage.spec.kind {
6158            StageKind::Opaque => {
6159                if let Some(logic) = self
6160                    .opaque_logics
6161                    .get_mut(stage_index)
6162                    .and_then(|l| l.as_mut())
6163                {
6164                    logic.drain_async_callbacks();
6165                    logic.set_demand_by_id(outlet)?;
6166                    let outlet_ref = stage
6167                        .spec
6168                        .outlets
6169                        .iter()
6170                        .find(|o| o.id() == outlet)
6171                        .cloned();
6172                    if let Some(outlet_ref) = outlet_ref {
6173                        let mut handler = logic.take_out_handler(outlet);
6174                        let result = if let Some(ref mut handler) = handler {
6175                            handler.on_pull(logic, outlet_ref)
6176                        } else {
6177                            Ok(())
6178                        };
6179                        if let Some(handler) = handler
6180                            && handler.keep_handler()
6181                            && logic.get_out_handler_mut(outlet).is_none()
6182                        {
6183                            logic.restore_out_handler(outlet, handler);
6184                        }
6185                        result?;
6186                    }
6187                    self.collect_opaque_emissions(stage, stage_index)
6188                } else {
6189                    Ok(StageTransition::none())
6190                }
6191            }
6192            StageKind::Unzip { .. } => {
6193                let StageState::Unzip {
6194                    demand, cancelled, ..
6195                } = &mut self.stage_states[stage_index]
6196                else {
6197                    return Ok(StageTransition::none());
6198                };
6199                let Some(idx) = stage.spec.outlets.iter().position(|o| o.id() == outlet) else {
6200                    return Ok(StageTransition::none());
6201                };
6202                if idx < 2 && !cancelled[idx] {
6203                    demand[idx] = true;
6204                }
6205                Ok(StageTransition::none())
6206            }
6207            StageKind::Partition { .. } => {
6208                let result = {
6209                    let StageState::Partition {
6210                        pending,
6211                        upstream_closed,
6212                        demand,
6213                        cancelled,
6214                        completed,
6215                        ..
6216                    } = &mut self.stage_states[stage_index]
6217                    else {
6218                        return Ok(StageTransition::none());
6219                    };
6220                    if *completed {
6221                        return Ok(StageTransition::none());
6222                    }
6223                    let Some(idx) = stage.spec.outlets.iter().position(|o| o.id() == outlet) else {
6224                        return Ok(StageTransition::none());
6225                    };
6226                    if cancelled[idx] {
6227                        return Ok(StageTransition::none());
6228                    }
6229
6230                    if let Some((p_idx, p_val)) = pending.take() {
6231                        if p_idx == idx {
6232                            let out = stage.spec.outlets[idx].id();
6233                            if *upstream_closed {
6234                                *completed = true;
6235                                StageTransition::emit(StageEmissions::One(out, p_val))
6236                                    .with_completion(
6237                                        stage.spec.outlets.iter().map(AnyOutlet::id).collect(),
6238                                    )
6239                            } else {
6240                                StageTransition::emit(StageEmissions::One(out, p_val))
6241                            }
6242                        } else {
6243                            *pending = Some((p_idx, p_val));
6244                            demand[idx] = true;
6245                            StageTransition::none()
6246                        }
6247                    } else {
6248                        demand[idx] = true;
6249                        StageTransition::none()
6250                    }
6251                };
6252                Ok(result)
6253            }
6254            _ => Ok(StageTransition::none()),
6255        }
6256    }
6257
6258    fn process_downstream_finish(
6259        &mut self,
6260        stage_index: usize,
6261        outlet: PortId,
6262    ) -> StreamResult<StageTransition> {
6263        let stage = &self.graph.stages[stage_index];
6264        match &stage.spec.kind {
6265            StageKind::Broadcast => {
6266                let StageState::Broadcast {
6267                    cancelled_outlets,
6268                    live_outlets,
6269                    ..
6270                } = &mut self.stage_states[stage_index]
6271                else {
6272                    return Err(StreamError::GraphValidation(
6273                        "broadcast state is missing".into(),
6274                    ));
6275                };
6276                let index = stage
6277                    .spec
6278                    .outlets
6279                    .iter()
6280                    .position(|candidate| candidate.id() == outlet)
6281                    .ok_or_else(|| {
6282                        StreamError::GraphValidation(format!(
6283                            "broadcast outlet {} is not part of the stage",
6284                            outlet.as_usize()
6285                        ))
6286                    })?;
6287                if cancelled_outlets[index] {
6288                    return Ok(StageTransition::none());
6289                }
6290                cancelled_outlets[index] = true;
6291                *live_outlets -= 1;
6292                if *live_outlets == 0 {
6293                    Ok(StageTransition::none()
6294                        .with_cancellations(stage.spec.inlets.iter().map(AnyInlet::id).collect()))
6295                } else {
6296                    Ok(StageTransition::none())
6297                }
6298            }
6299            StageKind::Balance => {
6300                let StageState::Balance {
6301                    cancelled_outlets,
6302                    live_outlets,
6303                    ..
6304                } = &mut self.stage_states[stage_index]
6305                else {
6306                    return Err(StreamError::GraphValidation(
6307                        "balance state is missing".into(),
6308                    ));
6309                };
6310                let index = stage
6311                    .spec
6312                    .outlets
6313                    .iter()
6314                    .position(|candidate| candidate.id() == outlet)
6315                    .ok_or_else(|| {
6316                        StreamError::GraphValidation(format!(
6317                            "balance outlet {} is not part of the stage",
6318                            outlet.as_usize()
6319                        ))
6320                    })?;
6321                if cancelled_outlets[index] {
6322                    return Ok(StageTransition::none());
6323                }
6324                cancelled_outlets[index] = true;
6325                *live_outlets -= 1;
6326                if *live_outlets == 0 {
6327                    Ok(StageTransition::none()
6328                        .with_cancellations(stage.spec.inlets.iter().map(AnyInlet::id).collect()))
6329                } else {
6330                    Ok(StageTransition::none())
6331                }
6332            }
6333            StageKind::Unzip { .. } => {
6334                let StageState::Unzip { cancelled, .. } = &mut self.stage_states[stage_index]
6335                else {
6336                    return Err(StreamError::GraphValidation(
6337                        "unzip state is missing".into(),
6338                    ));
6339                };
6340                let idx = stage
6341                    .spec
6342                    .outlets
6343                    .iter()
6344                    .position(|o| o.id() == outlet)
6345                    .unwrap_or(0);
6346                if idx < 2 && !cancelled[idx] {
6347                    cancelled[idx] = true;
6348                    let all_cancelled = cancelled.iter().all(|c| *c);
6349                    if all_cancelled {
6350                        Ok(StageTransition::none().with_cancellations(
6351                            stage.spec.inlets.iter().map(AnyInlet::id).collect(),
6352                        ))
6353                    } else {
6354                        Ok(StageTransition::none())
6355                    }
6356                } else {
6357                    Ok(StageTransition::none())
6358                }
6359            }
6360            StageKind::MergeSorted(_)
6361            | StageKind::MergeSequence { .. }
6362            | StageKind::MergeLatest { .. } => Ok(StageTransition::none()
6363                .with_cancellations(stage.spec.inlets.iter().map(AnyInlet::id).collect())),
6364            StageKind::Partition { eager_cancel, .. } => {
6365                let result = {
6366                    let StageState::Partition {
6367                        pending,
6368                        cancelled,
6369                        completed,
6370                        ..
6371                    } = &mut self.stage_states[stage_index]
6372                    else {
6373                        return Err(StreamError::GraphValidation(
6374                            "partition state is missing".into(),
6375                        ));
6376                    };
6377                    if *completed {
6378                        return Ok(StageTransition::none());
6379                    }
6380                    let Some(idx) = stage.spec.outlets.iter().position(|o| o.id() == outlet) else {
6381                        return Ok(StageTransition::none());
6382                    };
6383                    if cancelled[idx] {
6384                        return Ok(StageTransition::none());
6385                    }
6386                    cancelled[idx] = true;
6387                    // If the pending element was routed to this outlet, discard it
6388                    if let Some((p_idx, _)) = pending
6389                        && *p_idx == idx
6390                    {
6391                        *pending = None;
6392                    }
6393                    let all_cancelled = cancelled.iter().all(|c| *c);
6394                    if all_cancelled || *eager_cancel {
6395                        *completed = true;
6396                        StageTransition::none().with_cancellations(
6397                            stage.spec.inlets.iter().map(AnyInlet::id).collect(),
6398                        )
6399                    } else {
6400                        StageTransition::none()
6401                    }
6402                };
6403                Ok(result)
6404            }
6405            StageKind::Opaque => {
6406                let no_cancelled_outlets = self.cancelled_outlets.is_empty();
6407                if let Some(logic) = self
6408                    .opaque_logics
6409                    .get_mut(stage_index)
6410                    .and_then(|l| l.as_mut())
6411                {
6412                    logic.drain_async_callbacks();
6413                    logic.downstream_finish_by_id(outlet, "downstream_finish")?;
6414                    let outlet_ref = stage
6415                        .spec
6416                        .outlets
6417                        .iter()
6418                        .find(|o| o.id() == outlet)
6419                        .cloned();
6420                    if let Some(outlet_ref) = outlet_ref {
6421                        let mut handler = logic.take_out_handler(outlet);
6422                        let result = if let Some(ref mut handler) = handler {
6423                            handler.on_downstream_finish(logic, outlet_ref)
6424                        } else {
6425                            Ok(())
6426                        };
6427                        if let Some(handler) = handler
6428                            && handler.keep_handler()
6429                            && logic.get_out_handler_mut(outlet).is_none()
6430                        {
6431                            logic.restore_out_handler(outlet, handler);
6432                        }
6433                        result?;
6434                    }
6435                    let all_outlets_closed = stage.spec.outlets.iter().all(|candidate| {
6436                        logic.is_closed_by_id(candidate.id())
6437                            || (!no_cancelled_outlets
6438                                && self.cancelled_outlets.contains(&candidate.id()))
6439                    });
6440                    let mut transition = self.collect_opaque_emissions(stage, stage_index)?;
6441                    if all_outlets_closed {
6442                        transition.cancelled_inlets =
6443                            stage.spec.inlets.iter().map(AnyInlet::id).collect();
6444                    }
6445                    Ok(transition)
6446                } else {
6447                    Ok(StageTransition::none()
6448                        .with_cancellations(stage.spec.inlets.iter().map(AnyInlet::id).collect()))
6449                }
6450            }
6451            _ => Ok(StageTransition::none()
6452                .with_cancellations(stage.spec.inlets.iter().map(AnyInlet::id).collect())),
6453        }
6454    }
6455
6456    fn collect_opaque_emissions(
6457        &mut self,
6458        stage: &StageRecord,
6459        stage_index: usize,
6460    ) -> StreamResult<StageTransition> {
6461        if let Some(logic) = self
6462            .opaque_logics
6463            .get_mut(stage_index)
6464            .and_then(|l| l.as_mut())
6465        {
6466            let emissions_slots = std::mem::take(&mut logic.pending_emissions);
6467            let completions = std::mem::take(&mut logic.pending_completions);
6468            let has_stage_failed = logic.stage_error().is_some();
6469
6470            let emissions = if emissions_slots.is_empty() {
6471                StageEmissions::None
6472            } else if emissions_slots.len() == 1 {
6473                let (port, val) = emissions_slots.into_iter().next().unwrap();
6474                StageEmissions::One(port, val)
6475            } else {
6476                StageEmissions::Many(emissions_slots)
6477            };
6478
6479            if has_stage_failed {
6480                let _ = has_stage_failed;
6481            }
6482
6483            Ok(StageTransition {
6484                emissions,
6485                completed_outlets: completions,
6486                cancelled_inlets: Vec::new(),
6487            })
6488        } else {
6489            Ok(StageTransition::emit(StageEmissions::None)
6490                .with_completion(vec![single_outlet(stage)?]))
6491        }
6492    }
6493
6494    fn bump_event(&mut self) -> StreamResult<()> {
6495        bump_fused_event(&mut self.events, self.config)
6496    }
6497
6498    fn broadcast_zip_emissions(
6499        &mut self,
6500        fast_path: BroadcastZipFastPath,
6501        value: DatumValue,
6502    ) -> StreamResult<StageEmissions> {
6503        // 4 = broadcast fan-out (2) + zip pair (~2) stage events the unfused path would bump.
6504        for _ in 0..4 {
6505            self.bump_event()?;
6506        }
6507
6508        let zip_stage = &self.graph.stages[fast_path.zip_stage_index];
6509        let StageKind::Zip(zip) = &zip_stage.spec.kind else {
6510            return Err(StreamError::GraphValidation(
6511                "broadcast zip fast path references a non-zip stage".into(),
6512            ));
6513        };
6514
6515        let cloned = value.clone_box();
6516        let (left, right) = if fast_path.left_uses_clone {
6517            (cloned, value)
6518        } else {
6519            (value, cloned)
6520        };
6521
6522        Ok(StageEmissions::One(
6523            single_outlet(zip_stage)?,
6524            zip(left, right)?,
6525        ))
6526    }
6527
6528    fn balance_merge_emissions(
6529        &mut self,
6530        fast_path: BalanceMergeFastPath,
6531        value: DatumValue,
6532    ) -> StreamResult<StageEmissions> {
6533        // 4 = balance route (~2) + merge handoff (~2) stage events the unfused path would bump.
6534        for _ in 0..4 {
6535            self.bump_event()?;
6536        }
6537
6538        let merge_stage = &self.graph.stages[fast_path.merge_stage_index];
6539        let StageKind::Merge = merge_stage.spec.kind else {
6540            return Err(StreamError::GraphValidation(
6541                "balance merge fast path references a non-merge stage".into(),
6542            ));
6543        };
6544
6545        Ok(StageEmissions::One(single_outlet(merge_stage)?, value))
6546    }
6547
6548    fn prime_connected_demands(&mut self) {
6549        for (stage_index, stage) in self.graph.stages.iter().enumerate() {
6550            match &stage.spec.kind {
6551                StageKind::Opaque => {
6552                    let Some(logic) = self
6553                        .opaque_logics
6554                        .get_mut(stage_index)
6555                        .and_then(|logic| logic.as_mut())
6556                    else {
6557                        continue;
6558                    };
6559                    for outlet in &stage.spec.outlets {
6560                        if self.edge_by_outlet.contains_key(&outlet.id()) {
6561                            let _ = logic.set_demand_by_id(outlet.id());
6562                        }
6563                    }
6564                }
6565                StageKind::Unzip { .. } => {
6566                    let StageState::Unzip { demand, .. } = &mut self.stage_states[stage_index]
6567                    else {
6568                        continue;
6569                    };
6570                    for (idx, outlet) in stage.spec.outlets.iter().enumerate() {
6571                        if self.edge_by_outlet.contains_key(&outlet.id()) {
6572                            demand[idx] = true;
6573                        }
6574                    }
6575                }
6576                StageKind::Partition { .. } => {
6577                    let StageState::Partition {
6578                        demand,
6579                        output_count,
6580                        ..
6581                    } = &mut self.stage_states[stage_index]
6582                    else {
6583                        continue;
6584                    };
6585                    for (idx, demand_slot) in demand.iter_mut().enumerate().take(*output_count) {
6586                        if idx < stage.spec.outlets.len()
6587                            && self
6588                                .edge_by_outlet
6589                                .contains_key(&stage.spec.outlets[idx].id())
6590                        {
6591                            *demand_slot = true;
6592                        }
6593                    }
6594                }
6595                _ => {}
6596            }
6597        }
6598    }
6599}
6600
6601impl<Left, Right> GraphBlueprint<ZipShape<Left, Right>>
6602where
6603    Left: Clone + Send + 'static,
6604    Right: Clone + Send + 'static,
6605{
6606    pub fn run_zip(&self, left: Vec<Left>, right: Vec<Right>) -> StreamResult<Vec<(Left, Right)>> {
6607        Ok(self
6608            .run_zip_report(left, right, FusedExecutionConfig::default())?
6609            .output)
6610    }
6611
6612    pub fn run_zip_report(
6613        &self,
6614        left: Vec<Left>,
6615        right: Vec<Right>,
6616        config: FusedExecutionConfig,
6617    ) -> StreamResult<FusedExecutionReport<(Left, Right)>> {
6618        let mut left = left.into_iter();
6619        let mut right = right.into_iter();
6620        let left_inlet = self.shape.in0().id();
6621        let right_inlet = self.shape.in1().id();
6622        let outlet = self.shape.outlet().id();
6623        let mut executor = FusedExecutor::new(self, config);
6624        let mut output = Vec::with_capacity(left.len().min(right.len()));
6625        let mut left_completed = false;
6626        let mut right_completed = false;
6627
6628        {
6629            let mut output_sink = VecOutputSink {
6630                output: &mut output,
6631            };
6632            if left.len() == 0 {
6633                executor.complete(left_inlet, outlet, &mut output_sink)?;
6634                left_completed = true;
6635            }
6636            if right.len() == 0 {
6637                executor.complete(right_inlet, outlet, &mut output_sink)?;
6638                right_completed = true;
6639            }
6640
6641            while left.len() > 0 || right.len() > 0 {
6642                if let Some(item) = left.next() {
6643                    executor.deliver(left_inlet, datum(item), outlet, &mut output_sink)?;
6644                    if left.len() == 0 && !left_completed {
6645                        executor.complete(left_inlet, outlet, &mut output_sink)?;
6646                        left_completed = true;
6647                    }
6648                }
6649                if let Some(item) = right.next() {
6650                    executor.deliver(right_inlet, datum(item), outlet, &mut output_sink)?;
6651                    if right.len() == 0 && !right_completed {
6652                        executor.complete(right_inlet, outlet, &mut output_sink)?;
6653                        right_completed = true;
6654                    }
6655                }
6656            }
6657        }
6658
6659        Ok(FusedExecutionReport {
6660            output,
6661            events: executor.events,
6662            async_boundary_crossings: executor.async_boundary_crossings,
6663        })
6664    }
6665}
6666
6667fn broadcast_zip_fast_path<S: Shape>(
6668    stage: &StageRecord,
6669    graph: &GraphBlueprint<S>,
6670    edge_by_outlet: &HashMap<PortId, PortId>,
6671    stage_by_inlet: &HashMap<PortId, usize>,
6672) -> Option<BroadcastZipFastPath> {
6673    let [first_outlet, second_outlet] = stage.spec.outlets.as_slice() else {
6674        return None;
6675    };
6676
6677    let first_inlet = edge_by_outlet.get(&first_outlet.id()).copied()?;
6678    let second_inlet = edge_by_outlet.get(&second_outlet.id()).copied()?;
6679    let first_stage = stage_by_inlet.get(&first_inlet).copied()?;
6680    let second_stage = stage_by_inlet.get(&second_inlet).copied()?;
6681    if first_stage != second_stage {
6682        return None;
6683    }
6684
6685    let zip_stage = &graph.stages[first_stage];
6686    if !matches!(zip_stage.spec.kind, StageKind::Zip(_)) {
6687        return None;
6688    }
6689    let [left_inlet, right_inlet] = zip_stage.spec.inlets.as_slice() else {
6690        return None;
6691    };
6692
6693    if first_inlet == left_inlet.id() && second_inlet == right_inlet.id() {
6694        Some(BroadcastZipFastPath {
6695            zip_stage_index: first_stage,
6696            left_uses_clone: true,
6697        })
6698    } else if first_inlet == right_inlet.id() && second_inlet == left_inlet.id() {
6699        Some(BroadcastZipFastPath {
6700            zip_stage_index: first_stage,
6701            left_uses_clone: false,
6702        })
6703    } else {
6704        None
6705    }
6706}
6707
6708fn balance_merge_fast_path<S: Shape>(
6709    stage: &StageRecord,
6710    graph: &GraphBlueprint<S>,
6711    edge_by_outlet: &HashMap<PortId, PortId>,
6712    stage_by_inlet: &HashMap<PortId, usize>,
6713) -> Option<BalanceMergeFastPath> {
6714    let outlets = &stage.spec.outlets;
6715    let first_inlet = edge_by_outlet.get(&outlets.first()?.id()).copied()?;
6716    let merge_stage_index = *stage_by_inlet.get(&first_inlet)?;
6717    let merge_stage = graph.stages.get(merge_stage_index)?;
6718    if !matches!(merge_stage.spec.kind, StageKind::Merge) {
6719        return None;
6720    }
6721    if merge_stage.spec.inlets.len() != outlets.len() {
6722        return None;
6723    }
6724    for (outlet, merge_inlet) in outlets.iter().zip(merge_stage.spec.inlets.iter()) {
6725        if edge_by_outlet.get(&outlet.id()).copied() != Some(merge_inlet.id()) {
6726            return None;
6727        }
6728    }
6729    Some(BalanceMergeFastPath { merge_stage_index })
6730}
6731
6732fn unzip_fan_in_fast_path<S: Shape>(
6733    stage: &StageRecord,
6734    graph: &GraphBlueprint<S>,
6735    edge_by_outlet: &HashMap<PortId, PortId>,
6736    stage_by_inlet: &HashMap<PortId, usize>,
6737) -> Option<UnzipFanInFastPath> {
6738    let outlets = &stage.spec.outlets;
6739    if outlets.len() != 2 {
6740        return None;
6741    }
6742    let inlet0 = edge_by_outlet.get(&outlets[0].id()).copied()?;
6743    let inlet1 = edge_by_outlet.get(&outlets[1].id()).copied()?;
6744    let stage0 = *stage_by_inlet.get(&inlet0)?;
6745    let stage1 = *stage_by_inlet.get(&inlet1)?;
6746    if stage0 != stage1 {
6747        return None;
6748    }
6749    let target = graph.stages.get(stage0)?;
6750    if !matches!(
6751        target.spec.kind,
6752        StageKind::MergeSorted(_) | StageKind::MergeSequence { .. } | StageKind::MergeLatest { .. }
6753    ) {
6754        return None;
6755    }
6756    // Resolve the exact inlet indices so the fast path routes each Unzip output
6757    // to the correct slot in the fan-in stage, regardless of wiring order.
6758    let idx0 = target.spec.inlets.iter().position(|i| i.id() == inlet0)?;
6759    let idx1 = target.spec.inlets.iter().position(|i| i.id() == inlet1)?;
6760    Some(UnzipFanInFastPath {
6761        fan_in_stage_index: stage0,
6762        target_inlet_indices: [idx0, idx1],
6763    })
6764}
6765
6766fn unzip_zip_fast_path<S: Shape>(
6767    stage: &StageRecord,
6768    graph: &GraphBlueprint<S>,
6769    edge_by_outlet: &HashMap<PortId, PortId>,
6770    stage_by_inlet: &HashMap<PortId, usize>,
6771) -> Option<UnzipZipFastPath> {
6772    let outlets = &stage.spec.outlets;
6773    if outlets.len() != 2 {
6774        return None;
6775    }
6776    let inlet0 = edge_by_outlet.get(&outlets[0].id()).copied()?;
6777    let inlet1 = edge_by_outlet.get(&outlets[1].id()).copied()?;
6778    let stage0 = *stage_by_inlet.get(&inlet0)?;
6779    let stage1 = *stage_by_inlet.get(&inlet1)?;
6780    if stage0 != stage1 {
6781        return None;
6782    }
6783    let target = graph.stages.get(stage0)?;
6784    if !matches!(target.spec.kind, StageKind::Zip(_)) {
6785        return None;
6786    }
6787    Some(UnzipZipFastPath {
6788        zip_stage_index: stage0,
6789    })
6790}
6791
6792fn partition_merge_fast_path<S: Shape>(
6793    stage: &StageRecord,
6794    graph: &GraphBlueprint<S>,
6795    edge_by_outlet: &HashMap<PortId, PortId>,
6796    stage_by_inlet: &HashMap<PortId, usize>,
6797) -> Option<PartitionMergeFastPath> {
6798    let outlets = &stage.spec.outlets;
6799    let first_inlet = edge_by_outlet.get(&outlets.first()?.id()).copied()?;
6800    let merge_stage_index = *stage_by_inlet.get(&first_inlet)?;
6801    let merge_stage = graph.stages.get(merge_stage_index)?;
6802    if !matches!(merge_stage.spec.kind, StageKind::Merge) {
6803        return None;
6804    }
6805    if merge_stage.spec.inlets.len() != outlets.len() {
6806        return None;
6807    }
6808    for (outlet, merge_inlet) in outlets.iter().zip(merge_stage.spec.inlets.iter()) {
6809        if edge_by_outlet.get(&outlet.id()).copied() != Some(merge_inlet.id()) {
6810            return None;
6811        }
6812    }
6813    Some(PartitionMergeFastPath { merge_stage_index })
6814}
6815
6816/// Increments the event counter and returns an error if the configured limit
6817/// is exceeded.
6818///
6819/// `pub(crate)` so the typed-port executor (Phase 1+) can reuse the same
6820/// event-budget enforcement without duplicating the check.
6821pub(crate) fn bump_fused_event(
6822    events: &mut usize,
6823    config: FusedExecutionConfig,
6824) -> StreamResult<()> {
6825    *events += 1;
6826    if *events > config.event_limit {
6827        return Err(StreamError::EventLimitExceeded {
6828            limit: config.event_limit,
6829        });
6830    }
6831    Ok(())
6832}
6833
6834fn broadcast_emissions(outlets: &[AnyOutlet], value: DatumValue) -> StreamResult<StageEmissions> {
6835    match outlets {
6836        [] => Err(StreamError::GraphValidation(
6837            "broadcast has no outlets".into(),
6838        )),
6839        [outlet] => Ok(StageEmissions::One(outlet.id(), value)),
6840        [first, second] => Ok(StageEmissions::Two(
6841            (first.id(), value.clone_box()),
6842            (second.id(), value),
6843        )),
6844        outlets => {
6845            let mut emitted = Vec::with_capacity(outlets.len());
6846            for outlet in &outlets[..outlets.len() - 1] {
6847                emitted.push((outlet.id(), value.clone_box()));
6848            }
6849            emitted.push((outlets[outlets.len() - 1].id(), value));
6850            Ok(StageEmissions::Many(emitted))
6851        }
6852    }
6853}
6854
6855fn single_outlet(stage: &StageRecord) -> StreamResult<PortId> {
6856    stage
6857        .spec
6858        .outlets
6859        .first()
6860        .map(AnyOutlet::id)
6861        .ok_or_else(|| {
6862            StreamError::GraphValidation(format!("stage {} has no outlet", stage.spec.name()))
6863        })
6864}
6865
6866fn merge_emissions(first: StageEmissions, second: StageEmissions) -> StageEmissions {
6867    match (first, second) {
6868        (StageEmissions::None, other) | (other, StageEmissions::None) => other,
6869        (StageEmissions::One(p1, v1), StageEmissions::One(p2, v2)) => {
6870            StageEmissions::Many(vec![(p1, v1), (p2, v2)])
6871        }
6872        (StageEmissions::One(p, v), StageEmissions::Many(mut vec))
6873        | (StageEmissions::Many(mut vec), StageEmissions::One(p, v)) => {
6874            vec.push((p, v));
6875            StageEmissions::Many(vec)
6876        }
6877        (StageEmissions::Many(mut v1), StageEmissions::Many(v2)) => {
6878            v1.extend(v2);
6879            StageEmissions::Many(v1)
6880        }
6881        (a, b) => {
6882            let mut all = Vec::new();
6883            push_emissions(&mut all, a);
6884            push_emissions(&mut all, b);
6885            StageEmissions::Many(all)
6886        }
6887    }
6888}
6889
6890fn push_emissions(out: &mut Vec<(PortId, DatumValue)>, emissions: StageEmissions) {
6891    match emissions {
6892        StageEmissions::None => {}
6893        StageEmissions::One(p, v) => out.push((p, v)),
6894        StageEmissions::Two((p1, v1), (p2, v2)) => {
6895            out.push((p1, v1));
6896            out.push((p2, v2));
6897        }
6898        StageEmissions::Many(vec) => out.extend(vec),
6899    }
6900}