Skip to main content

datum/graph/
mod.rs

1//! Runtime-checked graph, shape, port, and junction primitives.
2//!
3//! This module contains Datum's Akka-like graph layer: typed public ports,
4//! runtime-validated GraphDSL wiring, fused acyclic fast paths, and a queued
5//! fused interpreter for cyclic feedback topologies. Invalid wiring and illegal
6//! stage operations fail deterministically.
7
8use std::{
9    any::{Any, TypeId, type_name},
10    collections::{HashMap, HashSet, VecDeque},
11    fmt,
12    marker::PhantomData,
13    sync::{
14        Arc, Mutex, OnceLock,
15        atomic::{AtomicUsize, Ordering},
16    },
17    thread,
18};
19
20#[cfg(test)]
21use std::sync::mpsc;
22
23use crate::{
24    actor::{Actor, ActorProcessingErr, ActorRef},
25    stream::{OverflowStrategy, StreamError, StreamResult},
26};
27
28type DatumValue = Box<dyn DatumElement>;
29type StageMapFn = dyn Fn(DatumValue) -> StreamResult<DatumValue> + Send + Sync;
30type StageTypedMapFn = dyn Any + Send + Sync;
31type StageZipFn = dyn Fn(DatumValue, DatumValue) -> StreamResult<DatumValue> + Send + Sync;
32type StageUnzipFn = dyn Fn(DatumValue) -> (DatumValue, DatumValue) + Send + Sync;
33/// Opaque container for a typed split function `Arc<dyn Fn(In) -> (Out0, Out1)>`.
34/// Down-cast at plan time — same pattern as [`StageTypedMapFn`].
35type StageTypedUnzipFn = dyn Any + Send + Sync;
36type StageCompareFn = dyn Fn(&DatumValue, &DatumValue) -> std::cmp::Ordering + Send + Sync;
37type StageSequenceFn = dyn Fn(&DatumValue) -> u64 + Send + Sync;
38/// Opaque container for a typed sequence-extractor `Arc<dyn Fn(&T) -> u64>`.
39/// Down-cast at plan time.
40type StageTypedSequenceFn = dyn Any + Send + Sync;
41type StageSnapshotFn = dyn Fn(&[&DatumValue]) -> DatumValue + Send + Sync;
42/// Opaque container for a typed snapshot builder `Arc<dyn Fn(&[Option<T>]) -> Vec<T>>`.
43/// Down-cast at plan time.
44type StageTypedSnapshotFn = dyn Any + Send + Sync;
45type StagePartitionFn = dyn Fn(&DatumValue) -> usize + Send + Sync;
46/// Opaque container for a typed partitioner `Arc<dyn Fn(&T) -> usize>`.
47/// Down-cast at plan time.
48type StageTypedPartitionFn = dyn Any + Send + Sync;
49
50#[derive(Clone)]
51struct StageMapFns {
52    erased: Arc<StageMapFn>,
53    typed: Arc<StageTypedMapFn>,
54}
55
56/// Optional typed metadata attached to an `Opaque` stage so the typed **cyclic**
57/// executor can run it without `DatumValue` boxing or `GraphStageLogic`.
58///
59/// Built-in `Buffer`/`TakeWhile` stages surface this through
60/// [`GraphStage::typed_cyclic_op`]; every other stage returns `None` and the
61/// cyclic typed planner falls back to the erased queued interpreter. The erased
62/// executor never reads this — it keeps running the stage's `GraphStageLogic`.
63#[derive(Clone)]
64pub(super) enum TypedCyclicOp {
65    /// A `Buffer` stage. In the synchronous fused executor a buffer drains one
66    /// element per step and therefore never overflows — it is a pass-through.
67    BufferPassthrough,
68    /// A `TakeWhile` stage carrying its typed predicate
69    /// `Arc<dyn Fn(&T) -> bool + Send + Sync>` (down-cast at plan time, same
70    /// pattern as [`StageTypedMapFn`]).
71    TakeWhile(Arc<dyn Any + Send + Sync>),
72}
73
74impl fmt::Debug for TypedCyclicOp {
75    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
76        match self {
77            Self::BufferPassthrough => f.write_str("BufferPassthrough"),
78            Self::TakeWhile(_) => f.write_str("TakeWhile(..)"),
79        }
80    }
81}
82
83pub(crate) trait DatumElement: Any + Send {
84    fn clone_box(&self) -> DatumValue;
85    fn into_any(self: Box<Self>) -> Box<dyn Any + Send>;
86    fn as_any_ref(&self) -> &dyn Any;
87}
88
89impl<T> DatumElement for T
90where
91    T: Any + Clone + Send,
92{
93    fn clone_box(&self) -> DatumValue {
94        Box::new(self.clone())
95    }
96
97    fn into_any(self: Box<Self>) -> Box<dyn Any + Send> {
98        self
99    }
100
101    fn as_any_ref(&self) -> &dyn Any {
102        self
103    }
104}
105
106fn datum<T>(value: T) -> DatumValue
107where
108    T: Clone + Send + 'static,
109{
110    Box::new(value)
111}
112
113fn downcast_datum<T, S>(
114    value: DatumValue,
115    operation: &'static str,
116    port: impl FnOnce() -> S,
117) -> StreamResult<T>
118where
119    T: Send + 'static,
120    S: Into<String>,
121{
122    // `port` is only evaluated on the cold error path so the success path
123    // (the per-element fused hot path) allocates nothing for diagnostics. It
124    // returns `impl Into<String>` so callers can hand back a `&'static str`
125    // without an eager `to_owned()`.
126    value
127        .into_any()
128        .downcast::<T>()
129        .map(|value| *value)
130        .map_err(|_| StreamError::InvalidPortOperation {
131            operation,
132            port: port().into(),
133            reason: format!("element type did not match {}", type_name::<T>()),
134        })
135}
136
137static NEXT_PORT_ID: AtomicUsize = AtomicUsize::new(1);
138
139fn next_port_id() -> PortId {
140    PortId(NEXT_PORT_ID.fetch_add(1, Ordering::Relaxed))
141}
142
143fn next_port_id_block(count: usize) -> PortId {
144    debug_assert!(count > 0);
145    PortId(NEXT_PORT_ID.fetch_add(count, Ordering::Relaxed))
146}
147
148/// A process-global unique identifier for a port, drawn from a single atomic
149/// counter. Stages allocate contiguous id blocks; ids are not graph-local and
150/// do not start at zero.
151#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
152pub struct PortId(usize);
153
154impl PortId {
155    #[must_use]
156    pub const fn as_usize(self) -> usize {
157        self.0
158    }
159
160    const fn offset(self, offset: usize) -> Self {
161        Self(self.0 + offset)
162    }
163}
164
165/// Whether a port is an inlet (accepts elements) or an outlet (emits them).
166#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
167pub enum PortKind {
168    Inlet,
169    Outlet,
170}
171
172macro_rules! shared_name {
173    ($fn_name:ident, $value:literal) => {
174        fn $fn_name() -> Arc<str> {
175            static NAME: OnceLock<Arc<str>> = OnceLock::new();
176            Arc::clone(NAME.get_or_init(|| Arc::from($value)))
177        }
178    };
179}
180
181shared_name!(identity_stage_name, "Identity");
182shared_name!(identity_inlet_name, "Identity.in");
183shared_name!(identity_outlet_name, "Identity.out");
184shared_name!(map_stage_name, "Map");
185shared_name!(map_inlet_name, "Map.in");
186shared_name!(map_outlet_name, "Map.out");
187shared_name!(broadcast_stage_name, "Broadcast");
188shared_name!(broadcast_inlet_name, "Broadcast.in");
189shared_name!(balance_stage_name, "Balance");
190shared_name!(balance_inlet_name, "Balance.in");
191shared_name!(merge_stage_name, "Merge");
192shared_name!(merge_outlet_name, "Merge.out");
193shared_name!(merge_preferred_stage_name, "MergePreferred");
194shared_name!(merge_preferred_preferred_name, "MergePreferred.preferred");
195shared_name!(merge_preferred_outlet_name, "MergePreferred.out");
196shared_name!(merge_prioritized_stage_name, "MergePrioritized");
197shared_name!(merge_prioritized_outlet_name, "MergePrioritized.out");
198shared_name!(concat_stage_name, "Concat");
199shared_name!(concat_outlet_name, "Concat.out");
200shared_name!(or_else_stage_name, "OrElse");
201shared_name!(or_else_primary_name, "OrElse.primary");
202shared_name!(or_else_secondary_name, "OrElse.secondary");
203shared_name!(or_else_outlet_name, "OrElse.out");
204shared_name!(interleave_stage_name, "Interleave");
205shared_name!(interleave_outlet_name, "Interleave.out");
206shared_name!(zip_stage_name, "Zip");
207shared_name!(zip_in0_name, "Zip.in0");
208shared_name!(zip_in1_name, "Zip.in1");
209shared_name!(zip_outlet_name, "Zip.out");
210shared_name!(async_boundary_stage_name, "AsyncBoundary");
211shared_name!(async_boundary_inlet_name, "AsyncBoundary.in");
212shared_name!(async_boundary_outlet_name, "AsyncBoundary.out");
213
214mod builder;
215mod executor;
216mod junctions;
217mod ports;
218mod shapes;
219mod stage;
220mod wire;
221
222#[cfg(test)]
223use self::executor::BoundaryCountExecutor;
224use self::{
225    builder::StageRecord,
226    stage::{StageKind, StageTimerMailbox, StageTimerRuntime},
227};
228
229pub use self::{
230    builder::{
231        AsyncBoundaryExecutionConfig, FusedExecutionConfig, FusedExecutionReport, FusedSegment,
232        FusedTerminalReport, Graph, GraphBlueprint, GraphBuilder, GraphDsl, ImportedGraph,
233        PartialGraph,
234    },
235    junctions::{
236        AsyncBoundary, Balance, Broadcast, Buffer, Concat, Identity, Interleave, MapStage, Merge,
237        MergeLatest, MergePreferred, MergePrioritized, MergeSequence, MergeSorted, OrElse,
238        Partition, TakeWhile, Unzip, UnzipWith, Zip,
239    },
240    ports::{AnyInlet, AnyOutlet, Inlet, Outlet, PortRef},
241    shapes::{
242        BidiShape, FanInShape, FanOutShape, FanOutShape2, FlowShape, MergePreferredShape,
243        PortAllocator, Shape, SinkShape, SourceShape, ZipShape,
244    },
245    stage::{
246        AsyncCallback, GraphStage, GraphStageLogic, InHandler, OutHandler, StageSpec, TimerHandler,
247    },
248    wire::{
249        AutoInletEndpoint, AutoOutletEndpoint, InletCursor, OutletCursor, WireDsl, WirePair,
250        WireSpec,
251    },
252};
253
254#[cfg(test)]
255mod tests {
256    use super::*;
257    use crate::Attributes;
258
259    #[test]
260    fn graph_dsl_builds_broadcast_zip_flow() {
261        let graph = GraphDsl::try_create(|builder| {
262            let broadcast = builder.add(Broadcast::<i32>::new(2));
263            let zip = builder.add(Zip::<i32, i32>::new());
264
265            builder.connect(broadcast.outlet(0)?, zip.in0())?;
266            builder.connect(broadcast.outlet(1)?, zip.in1())?;
267
268            Ok(FlowShape::new(broadcast.inlet(), zip.outlet()))
269        })
270        .unwrap();
271
272        assert_eq!(graph.stage_count(), 2);
273        assert_eq!(graph.edge_count(), 2);
274        assert_eq!(graph.shape().inlets().len(), 1);
275        assert_eq!(graph.shape().outlets().len(), 1);
276        assert_eq!(
277            graph.run_with_input([1, 2, 3]).unwrap(),
278            vec![(1, 1), (2, 2), (3, 3)]
279        );
280    }
281
282    #[test]
283    fn graph_dsl_zip_slots_follow_inlet_ids() {
284        let graph = GraphDsl::try_create(|builder| {
285            let broadcast = builder.add(Broadcast::<i32>::new(2));
286            let identity = builder.add(Identity::<i32>::new());
287            let zip = builder.add(Zip::<i32, i32>::new());
288
289            builder.connect(broadcast.outlet(0)?, identity.inlet())?;
290            builder.connect(identity.outlet(), zip.in1())?;
291            builder.connect(broadcast.outlet(1)?, zip.in0())?;
292
293            Ok(FlowShape::new(broadcast.inlet(), zip.outlet()))
294        })
295        .unwrap();
296
297        assert_eq!(
298            graph.run_with_input([1, 2, 3]).unwrap(),
299            vec![(1, 1), (2, 2), (3, 3)]
300        );
301    }
302
303    #[test]
304    fn graph_dsl_zip_buffers_skewed_inlet_arrivals() {
305        // Regression test for per-inlet Zip buffering. This asymmetric topology
306        // delivers TWO elements to zip.in0 for every ONE that reaches zip.in1
307        // within a single fused cycle:
308        //
309        //   in -> Broadcast A(2)
310        //          A.out0 -> Broadcast B(2) -> Merge(2).out -> zip.in0   (two arrivals)
311        //          A.out1 ----------------------------------> zip.in1   (one arrival)
312        //
313        // The fused executor drains A.out0 depth-first, so both Merge emissions
314        // hit zip.in0 before A.out1 ever feeds zip.in1. With single-slot Zip
315        // state this errors ("second element before its pair"); bounded
316        // per-inlet buffering must instead queue the surplus and pair in FIFO
317        // arrival order without error. Neither Broadcast matches the fused
318        // Broadcast(2)->Zip fast path (A feeds two different stages; B feeds a
319        // Merge), so this exercises the general buffered Zip path.
320        let graph = GraphDsl::try_create(|builder| {
321            let fan = builder.add(Broadcast::<i32>::new(2));
322            let doubler = builder.add(Broadcast::<i32>::new(2));
323            let merge = builder.add(Merge::<i32>::new(2));
324            let zip = builder.add(Zip::<i32, i32>::new());
325
326            builder.connect(fan.outlet(0)?, doubler.inlet())?;
327            builder.connect(doubler.outlet(0)?, merge.inlet(0)?)?;
328            builder.connect(doubler.outlet(1)?, merge.inlet(1)?)?;
329            builder.connect(merge.outlet(), zip.in0())?;
330            builder.connect(fan.outlet(1)?, zip.in1())?;
331
332            Ok(FlowShape::new(fan.inlet(), zip.outlet()))
333        })
334        .unwrap();
335
336        // in0 accumulates two per item while in1 gets one, so FIFO pairing runs
337        // in0 one item ahead: 10 pairs with 10, then the leftover 10 pairs with
338        // 20. The point is that this succeeds (no error) and pairs in order.
339        assert_eq!(
340            graph.run_with_input([10, 20]).unwrap(),
341            vec![(10, 10), (10, 20)]
342        );
343    }
344
345    #[test]
346    fn graph_dsl_runs_merge_preferred_buffered_feedback_cycle() {
347        let graph = GraphDsl::try_create(|builder| {
348            let merge = builder.add(MergePreferred::<i32>::new(1));
349            let broadcast = builder.add(Broadcast::<i32>::new(2));
350            let buffer = builder.add(Buffer::<i32>::new(8, OverflowStrategy::Backpressure));
351            let positive = builder.add(TakeWhile::<i32>::new(|item| *item > 0));
352            let decrement = builder.add(MapStage::new(|item: i32| item - 1));
353
354            builder.connect(merge.outlet(), broadcast.inlet())?;
355            builder.connect(broadcast.outlet(1)?, buffer.inlet())?;
356            builder.connect(buffer.outlet(), positive.inlet())?;
357            builder.connect(positive.outlet(), decrement.inlet())?;
358            builder.connect(decrement.outlet(), merge.preferred())?;
359
360            Ok(FlowShape::new(merge.secondary(0)?, broadcast.outlet(0)?))
361        })
362        .unwrap();
363
364        assert_eq!(graph.run_with_input([3]).unwrap(), vec![3, 2, 1, 0]);
365    }
366
367    #[test]
368    fn graph_dsl_runs_buffered_merge_feedback_cycle() {
369        let graph = GraphDsl::try_create(|builder| {
370            let merge = builder.add(Merge::<i32>::new(2));
371            let broadcast = builder.add(Broadcast::<i32>::new(2));
372            let buffer = builder.add(Buffer::<i32>::new(8, OverflowStrategy::Backpressure));
373            let positive = builder.add(TakeWhile::<i32>::new(|item| *item > 0));
374            let decrement = builder.add(MapStage::new(|item: i32| item - 1));
375
376            builder.connect(merge.outlet(), broadcast.inlet())?;
377            builder.connect(broadcast.outlet(1)?, buffer.inlet())?;
378            builder.connect(buffer.outlet(), positive.inlet())?;
379            builder.connect(positive.outlet(), decrement.inlet())?;
380            builder.connect(decrement.outlet(), merge.inlet(1)?)?;
381
382            Ok(FlowShape::new(merge.inlet(0)?, broadcast.outlet(0)?))
383        })
384        .unwrap();
385
386        assert_eq!(graph.run_with_input([3]).unwrap(), vec![3, 2, 1, 0]);
387    }
388
389    #[test]
390    fn graph_dsl_unbuffered_merge_cycle_surfaces_event_limit() {
391        let graph = GraphDsl::try_create(|builder| {
392            let merge = builder.add(Merge::<i32>::new(2));
393            let broadcast = builder.add(Broadcast::<i32>::new(2));
394
395            builder.connect(merge.outlet(), broadcast.inlet())?;
396            builder.connect(broadcast.outlet(1)?, merge.inlet(1)?)?;
397
398            Ok(FlowShape::new(merge.inlet(0)?, broadcast.outlet(0)?))
399        })
400        .unwrap();
401
402        let result = graph.run_with_input_report([1], FusedExecutionConfig { event_limit: 256 });
403
404        assert_eq!(result, Err(StreamError::EventLimitExceeded { limit: 256 }));
405    }
406
407    #[test]
408    fn partial_graph_can_be_imported_with_its_shape() {
409        let partial = GraphDsl::partial(|builder| {
410            let first = builder.add(Identity::<i32>::new());
411            let second = builder.add_named(Identity::<i32>::new(), "partial.tail");
412            builder.connect(first.outlet(), second.inlet())?;
413            Ok(FlowShape::new(first.inlet(), second.outlet()))
414        })
415        .named("partial.identity");
416
417        let graph = GraphDsl::try_create(|builder| {
418            let imported = builder.import(&partial)?;
419            let after = builder.add(Identity::<i32>::new());
420            builder.connect(imported.outlet(), after.inlet())?;
421            Ok(FlowShape::new(imported.inlet(), after.outlet()))
422        })
423        .unwrap()
424        .named("outer.graph");
425
426        assert_eq!(graph.run_with_input([1, 2, 3]).unwrap(), vec![1, 2, 3]);
427        assert_eq!(graph.attributes().name(), Some("outer.graph"));
428    }
429
430    #[test]
431    fn graph_attributes_follow_innermost_wins_order() {
432        let graph = GraphDsl::create(|builder| {
433            builder.add_with_attributes(
434                Identity::<i32>::new(),
435                Attributes::named("stage-outer").and(Attributes::named("stage-inner")),
436            )
437        })
438        .unwrap()
439        .add_attributes(Attributes::dispatcher("graph-outer"))
440        .add_attributes(Attributes::dispatcher("graph-inner"));
441
442        assert_eq!(graph.attributes().dispatcher_hint(), Some("graph-inner"));
443        assert_eq!(
444            graph.stages[0].spec.attributes().name(),
445            Some("stage-inner")
446        );
447    }
448
449    #[test]
450    fn graph_dsl_rejects_invalid_and_incomplete_wiring() {
451        let graph = GraphDsl::try_create(|builder| {
452            let first = builder.add(Identity::<i32>::new());
453            let second = builder.add(Identity::<i32>::new());
454            let third = builder.add(Identity::<i32>::new());
455
456            builder.connect(first.outlet(), second.inlet())?;
457            let duplicate = builder.connect(first.outlet(), third.inlet());
458            assert!(matches!(duplicate, Err(StreamError::GraphValidation(_))));
459
460            Ok(FlowShape::new(first.inlet(), second.outlet()))
461        });
462
463        assert!(matches!(graph, Err(StreamError::GraphValidation(_))));
464
465        let graph = GraphDsl::create(|builder| {
466            let broadcast = builder.add(Broadcast::<i32>::new(2));
467            SourceShape::new(broadcast.outlet(0).unwrap())
468        });
469        assert!(matches!(graph, Err(StreamError::GraphValidation(_))));
470    }
471
472    #[test]
473    fn graph_dsl_rejects_erased_type_mismatch() {
474        let graph = GraphDsl::try_create(|builder| {
475            let left = builder.add(Identity::<i32>::new());
476            let right = builder.add(Identity::<u64>::new());
477            let mismatch = builder.connect_any(left.outlet().erase(), right.inlet().erase());
478            assert!(matches!(mismatch, Err(StreamError::GraphValidation(_))));
479            Ok(FlowShape::new(left.inlet(), right.outlet()))
480        });
481
482        assert!(matches!(graph, Err(StreamError::GraphValidation(_))));
483    }
484
485    #[test]
486    fn graph_dsl_rejects_result_ports_with_spoofed_metadata() {
487        let graph = GraphDsl::create(|builder| {
488            let shape = builder.add(Identity::<i32>::new());
489            FlowShape::new(
490                Inlet::<u64>::with_id(shape.inlet().id(), "Identity.in"),
491                shape.outlet(),
492            )
493        });
494
495        assert!(matches!(graph, Err(StreamError::GraphValidation(_))));
496    }
497
498    #[test]
499    fn allocated_ports_do_not_collide_with_manual_ports() {
500        let manual = Inlet::<i32>::new("manual");
501        let graph = GraphDsl::try_create(|builder| {
502            let shape = builder.add(Identity::<i32>::new());
503            assert_ne!(manual.id(), shape.inlet().id());
504            Ok(shape)
505        });
506
507        assert!(graph.is_ok());
508    }
509
510    #[test]
511    fn identity_ports_keep_static_names_and_unique_ids() {
512        let graph = GraphDsl::try_create(|builder| {
513            let first = builder.add(Identity::<i32>::new());
514            let second = builder.add(Identity::<i32>::new());
515
516            assert_eq!(first.inlet().name(), "Identity.in");
517            assert_eq!(first.outlet().name(), "Identity.out");
518            assert_eq!(second.inlet().name(), "Identity.in");
519            assert_eq!(second.outlet().name(), "Identity.out");
520            assert_ne!(first.inlet().id(), first.outlet().id());
521            assert_ne!(first.outlet().id(), second.inlet().id());
522            assert_ne!(second.inlet().id(), second.outlet().id());
523
524            builder.connect(first.outlet(), second.inlet())?;
525            Ok(FlowShape::new(first.inlet(), second.outlet()))
526        });
527
528        assert!(graph.is_ok());
529    }
530
531    #[test]
532    fn graph_stage_logic_checks_port_operations() {
533        let shape = FlowShape::new(Inlet::<i32>::new("in"), Outlet::<i32>::new("out"));
534        let inlet = shape.inlet();
535        let outlet = shape.outlet();
536        let mut logic = GraphStageLogic::new(&shape);
537
538        assert!(matches!(
539            logic.grab(&inlet),
540            Err(StreamError::InvalidPortOperation { .. })
541        ));
542        logic.pull(&inlet).unwrap();
543        assert!(logic.has_been_pulled(&inlet));
544        assert!(matches!(
545            logic.pull(&inlet),
546            Err(StreamError::InvalidPortOperation { .. })
547        ));
548        logic.offer(&inlet, 41).unwrap();
549        assert!(!logic.has_been_pulled(&inlet));
550        assert_eq!(logic.grab(&inlet).unwrap(), 41);
551
552        assert!(matches!(
553            logic.push(&outlet, 1),
554            Err(StreamError::InvalidPortOperation { .. })
555        ));
556        logic.request(&outlet).unwrap();
557        assert!(logic.is_available(&outlet));
558        logic.push(&outlet, 42).unwrap();
559        assert!(!logic.is_available(&outlet));
560        logic.complete(&outlet).unwrap();
561        assert!(logic.is_closed(&outlet));
562        assert!(matches!(
563            logic.request(&outlet),
564            Err(StreamError::InvalidPortOperation { .. })
565        ));
566    }
567
568    #[test]
569    fn fused_execution_enforces_event_limit() {
570        let graph = GraphDsl::create(|builder| builder.add(Identity::<i32>::new())).unwrap();
571
572        let result = graph.run_with_input_report([1, 2], FusedExecutionConfig { event_limit: 1 });
573
574        assert_eq!(result, Err(StreamError::EventLimitExceeded { limit: 1 }));
575    }
576
577    #[test]
578    fn async_boundary_splits_fused_segments() {
579        let graph = GraphDsl::try_create(|builder| {
580            let first = builder.add(Identity::<i32>::new());
581            let boundary = builder.add(AsyncBoundary::<i32>::new());
582            let second = builder.add(MapStage::new(|item: i32| item + 1));
583
584            builder.connect(first.outlet(), boundary.inlet())?;
585            builder.connect(boundary.outlet(), second.inlet())?;
586
587            Ok(FlowShape::new(first.inlet(), second.outlet()))
588        })
589        .unwrap();
590
591        assert_eq!(graph.segments().len(), 3);
592        let report = graph
593            .run_with_input_report([1, 2], FusedExecutionConfig::default())
594            .unwrap();
595        assert_eq!(report.output, vec![2, 3]);
596        assert_eq!(report.async_boundary_crossings, 2);
597    }
598
599    #[test]
600    fn async_boundary_count_path_uses_ractor_handoff_segments() {
601        let graph = GraphDsl::try_create(|builder| {
602            let first = builder.add(MapStage::new(|item: i32| item + 1));
603            let boundary = builder.add(AsyncBoundary::<i32>::new());
604            let second = builder.add(MapStage::new(|item: i32| item * 2));
605
606            builder.connect(first.outlet(), boundary.inlet())?;
607            builder.connect(boundary.outlet(), second.inlet())?;
608
609            Ok(FlowShape::new(first.inlet(), second.outlet()))
610        })
611        .unwrap();
612
613        let report = graph
614            .run_async_boundary_count_with_input_report(
615                [1, 2, 3],
616                AsyncBoundaryExecutionConfig {
617                    fused: FusedExecutionConfig::default(),
618                    buffer_size: 2,
619                },
620            )
621            .unwrap();
622
623        assert_eq!(report.result, 3);
624        assert_eq!(report.async_boundary_crossings, 3);
625        assert_eq!(report.events, 18);
626    }
627
628    #[test]
629    fn threaded_async_boundary_baseline_matches_ractor_count_path() {
630        let graph = GraphDsl::try_create(|builder| {
631            let first = builder.add(MapStage::new(|item: i32| item + 1));
632            let boundary = builder.add(AsyncBoundary::<i32>::new());
633            let second = builder.add(MapStage::new(|item: i32| item * 2));
634
635            builder.connect(first.outlet(), boundary.inlet())?;
636            builder.connect(boundary.outlet(), second.inlet())?;
637
638            Ok(FlowShape::new(first.inlet(), second.outlet()))
639        })
640        .unwrap();
641
642        let config = AsyncBoundaryExecutionConfig {
643            fused: FusedExecutionConfig::default(),
644            buffer_size: 2,
645        };
646        let ractor_report = graph
647            .run_async_boundary_count_with_input_report([1, 2, 3], config)
648            .unwrap();
649        let threaded_report = BoundaryCountExecutor::Threaded
650            .run_count(
651                [1, 2, 3],
652                graph.typed_linear_async_segments().unwrap(),
653                config,
654            )
655            .unwrap();
656
657        assert_eq!(threaded_report, ractor_report);
658    }
659
660    #[test]
661    fn ractor_async_boundary_rejects_zero_buffer_size() {
662        let graph = GraphDsl::try_create(|builder| {
663            let first = builder.add(MapStage::new(|item: i32| item + 1));
664            let boundary = builder.add(AsyncBoundary::<i32>::new());
665            let second = builder.add(MapStage::new(|item: i32| item * 2));
666
667            builder.connect(first.outlet(), boundary.inlet())?;
668            builder.connect(boundary.outlet(), second.inlet())?;
669
670            Ok(FlowShape::new(first.inlet(), second.outlet()))
671        })
672        .unwrap();
673
674        let result = BoundaryCountExecutor::Ractor.run_count(
675            [1, 2, 3],
676            graph.typed_linear_async_segments().unwrap(),
677            AsyncBoundaryExecutionConfig {
678                fused: FusedExecutionConfig::default(),
679                buffer_size: 0,
680            },
681        );
682
683        assert!(matches!(
684            result,
685            Err(StreamError::GraphValidation(message)) if message.contains("buffer_size")
686        ));
687    }
688
689    #[test]
690    fn ractor_async_boundary_streams_input_without_eager_collection() {
691        let graph = GraphDsl::try_create(|builder| {
692            let first = builder.add(MapStage::new(|item: i32| item + 1));
693            let boundary = builder.add(AsyncBoundary::<i32>::new());
694            let second = builder.add(MapStage::new(|item: i32| item * 2));
695
696            builder.connect(first.outlet(), boundary.inlet())?;
697            builder.connect(boundary.outlet(), second.inlet())?;
698
699            Ok(FlowShape::new(first.inlet(), second.outlet()))
700        })
701        .unwrap();
702
703        let result = BoundaryCountExecutor::Ractor.run_count(
704            std::iter::repeat(1_i32),
705            graph.typed_linear_async_segments().unwrap(),
706            AsyncBoundaryExecutionConfig {
707                fused: FusedExecutionConfig { event_limit: 4 },
708                buffer_size: 1,
709            },
710        );
711
712        assert_eq!(result, Err(StreamError::EventLimitExceeded { limit: 4 }));
713    }
714
715    #[test]
716    fn ractor_async_boundary_runs_inside_existing_tokio_runtime() {
717        let graph = GraphDsl::try_create(|builder| {
718            let first = builder.add(MapStage::new(|item: i32| item + 1));
719            let boundary = builder.add(AsyncBoundary::<i32>::new());
720            let second = builder.add(MapStage::new(|item: i32| item * 2));
721
722            builder.connect(first.outlet(), boundary.inlet())?;
723            builder.connect(boundary.outlet(), second.inlet())?;
724
725            Ok(FlowShape::new(first.inlet(), second.outlet()))
726        })
727        .unwrap();
728
729        let runtime = tokio::runtime::Builder::new_current_thread()
730            .build()
731            .unwrap();
732        let report = runtime
733            .block_on(async {
734                BoundaryCountExecutor::Ractor.run_count(
735                    [1, 2, 3],
736                    graph.typed_linear_async_segments().unwrap(),
737                    AsyncBoundaryExecutionConfig {
738                        fused: FusedExecutionConfig::default(),
739                        buffer_size: 2,
740                    },
741                )
742            })
743            .unwrap();
744
745        assert_eq!(report.result, 3);
746        assert_eq!(report.async_boundary_crossings, 3);
747        assert_eq!(report.events, 18);
748    }
749
750    #[test]
751    fn balance_merge_round_robins_through_junctions() {
752        let graph = GraphDsl::try_create(|builder| {
753            let balance = builder.add(Balance::<i32>::new(2));
754            let merge = builder.add(Merge::<i32>::new(2));
755
756            builder.connect(balance.outlet(0)?, merge.inlet(0)?)?;
757            builder.connect(balance.outlet(1)?, merge.inlet(1)?)?;
758
759            Ok(FlowShape::new(balance.inlet(), merge.outlet()))
760        })
761        .unwrap();
762
763        assert_eq!(graph.run_with_input(0..6).unwrap(), vec![0, 1, 2, 3, 4, 5]);
764    }
765
766    #[test]
767    fn prioritized_merge_uses_weighted_schedule() {
768        let graph =
769            GraphDsl::create(|builder| builder.add(MergePrioritized::<i32>::new(vec![2, 1])))
770                .unwrap();
771
772        assert_eq!(
773            graph
774                .run_fan_in(vec![vec![1, 2, 3, 4], vec![100, 101]])
775                .unwrap(),
776            vec![1, 2, 100, 3, 4, 101]
777        );
778    }
779
780    #[test]
781    fn fused_execution_supports_count_and_fold_sinks() {
782        let graph = GraphDsl::try_create(|builder| {
783            let first = builder.add(MapStage::new(|item: u64| item + 1));
784            let second = builder.add(MapStage::new(|item: u64| item * 2));
785
786            builder.connect(first.outlet(), second.inlet())?;
787
788            Ok(FlowShape::new(first.inlet(), second.outlet()))
789        })
790        .unwrap();
791
792        assert_eq!(graph.run_count_with_input(0..4).unwrap(), 4);
793        assert_eq!(
794            graph
795                .run_fold_with_input(0..4, 0, |acc, item| acc + item)
796                .unwrap(),
797            20
798        );
799    }
800
801    #[test]
802    fn typed_linear_fast_path_runs_same_type_chains() {
803        let graph = GraphDsl::try_create(|builder| {
804            let first = builder.add(MapStage::new(|item: u64| item + 1));
805            let second = builder.add(MapStage::new(|item: u64| item * 2));
806
807            builder.connect(first.outlet(), second.inlet())?;
808
809            Ok(FlowShape::new(first.inlet(), second.outlet()))
810        })
811        .unwrap();
812
813        let report = graph
814            .run_typed_linear_with_input_report([1, 2, 3], FusedExecutionConfig::default())
815            .unwrap();
816        assert_eq!(report.output, vec![4, 6, 8]);
817        assert_eq!(report.events, 12);
818
819        assert_eq!(graph.run_typed_linear_count_with_input(0..4).unwrap(), 4);
820        assert_eq!(
821            graph
822                .run_typed_linear_fold_with_input(0..4, 0, |acc, item| acc + item)
823                .unwrap(),
824            20
825        );
826    }
827
828    #[test]
829    fn typed_linear_fast_path_rejects_junction_graphs() {
830        let graph = GraphDsl::try_create(|builder| {
831            let balance = builder.add(Balance::<i32>::new(2));
832            let merge = builder.add(Merge::<i32>::new(2));
833
834            builder.connect(balance.outlet(0)?, merge.inlet(0)?)?;
835            builder.connect(balance.outlet(1)?, merge.inlet(1)?)?;
836
837            Ok(FlowShape::new(balance.inlet(), merge.outlet()))
838        })
839        .unwrap();
840
841        assert!(matches!(
842            graph.run_typed_linear_count_with_input([1, 2, 3]),
843            Err(StreamError::GraphValidation(_))
844        ));
845    }
846
847    #[test]
848    fn merge_preferred_drains_preferred_before_secondaries() {
849        let graph = GraphDsl::create(|builder| builder.add(MergePreferred::<i32>::new(2))).unwrap();
850
851        assert_eq!(
852            graph
853                .run_merge_preferred(vec![1, 2, 3], vec![vec![100, 101], vec![200]])
854                .unwrap(),
855            vec![1, 2, 3, 100, 200, 101]
856        );
857    }
858
859    #[test]
860    fn merge_waits_for_all_inputs_to_complete() {
861        let graph = GraphDsl::create(|builder| builder.add(Merge::<i32>::new(2))).unwrap();
862
863        assert_eq!(
864            graph.run_fan_in(vec![vec![], vec![10, 20]]).unwrap(),
865            vec![10, 20]
866        );
867        assert_eq!(graph.run_fan_in(vec![vec![1], vec![]]).unwrap(), vec![1]);
868    }
869
870    #[test]
871    fn merge_sorted_drains_remaining_input_after_peer_completes() {
872        let graph = GraphDsl::try_create(|builder| {
873            let unzip = builder.add(Unzip::<i32, i32>::new());
874            let merge = builder.add(MergeSorted::<i32>::new());
875
876            builder.connect(unzip.out0(), merge.inlet(0)?)?;
877            builder.connect(unzip.out1(), merge.inlet(1)?)?;
878
879            Ok(FlowShape::new(unzip.inlet(), merge.outlet()))
880        })
881        .unwrap();
882
883        assert_eq!(
884            graph.run_with_input([(1, 2), (4, 3), (6, 5)]).unwrap(),
885            vec![1, 2, 3, 4, 5, 6]
886        );
887    }
888
889    #[test]
890    fn merge_sequence_reorders_adversarial_arrivals_by_sequence_number() {
891        let graph = GraphDsl::try_create(|builder| {
892            let unzip = builder.add(Unzip::<u64, u64>::new());
893            let merge = builder.add(MergeSequence::<u64>::new(2, |item| *item));
894
895            builder.connect(unzip.out0(), merge.inlet(0)?)?;
896            builder.connect(unzip.out1(), merge.inlet(1)?)?;
897
898            Ok(FlowShape::new(unzip.inlet(), merge.outlet()))
899        })
900        .unwrap();
901
902        assert_eq!(
903            graph.run_with_input([(0, 1), (2, 3), (4, 5)]).unwrap(),
904            vec![0, 1, 2, 3, 4, 5]
905        );
906    }
907
908    #[test]
909    fn merge_latest_emits_with_last_seen_peer_and_honors_eager_complete() {
910        let graph = GraphDsl::try_create(|builder| {
911            let unzip = builder.add(Unzip::<i32, i32>::new());
912            let merge = builder.add(MergeLatest::<i32>::new(2, false));
913
914            builder.connect(unzip.out0(), merge.inlet(0)?)?;
915            builder.connect(unzip.out1(), merge.inlet(1)?)?;
916
917            Ok(FlowShape::new(unzip.inlet(), merge.outlet()))
918        })
919        .unwrap();
920
921        assert_eq!(
922            graph.run_with_input([(1, 10), (2, 11)]).unwrap(),
923            vec![vec![1, 10], vec![2, 10], vec![2, 11]]
924        );
925    }
926
927    #[test]
928    fn zip_completes_when_any_input_completes() {
929        let graph = GraphDsl::create(|builder| builder.add(Zip::<i32, i32>::new())).unwrap();
930
931        assert_eq!(
932            graph.run_zip(vec![1, 2, 3], vec![10]).unwrap(),
933            vec![(1, 10)]
934        );
935        assert_eq!(graph.run_zip(vec![1], vec![10, 20]).unwrap(), vec![(1, 10)]);
936        assert_eq!(
937            graph.run_zip(vec![], vec![10, 20]).unwrap(),
938            Vec::<(i32, i32)>::new()
939        );
940    }
941
942    #[test]
943    fn concat_drains_inputs_in_declared_order() {
944        let graph = GraphDsl::create(|builder| builder.add(Concat::<i32>::new(3))).unwrap();
945
946        assert_eq!(
947            graph
948                .run_concat(vec![vec![1, 2], vec![], vec![3, 4]])
949                .unwrap(),
950            vec![1, 2, 3, 4]
951        );
952    }
953
954    #[test]
955    fn or_else_switches_only_if_primary_is_empty() {
956        let graph = GraphDsl::create(|builder| builder.add(OrElse::<i32>::new())).unwrap();
957
958        assert_eq!(
959            graph.run_or_else(vec![], vec![10, 20]).unwrap(),
960            vec![10, 20]
961        );
962        assert_eq!(
963            graph.run_or_else(vec![1, 2], vec![10, 20]).unwrap(),
964            vec![1, 2]
965        );
966    }
967
968    #[test]
969    fn or_else_secondary_first_dropped_when_primary_emits() {
970        let graph = GraphDsl::create(|builder| builder.add(OrElse::<i32>::new())).unwrap();
971
972        assert_eq!(
973            graph
974                .run_or_else_secondary_first(vec![1, 2], vec![10, 20])
975                .unwrap(),
976            vec![1, 2]
977        );
978    }
979
980    #[test]
981    fn or_else_secondary_first_flushed_when_primary_empty() {
982        let graph = GraphDsl::create(|builder| builder.add(OrElse::<i32>::new())).unwrap();
983
984        assert_eq!(
985            graph
986                .run_or_else_secondary_first(vec![], vec![10, 20])
987                .unwrap(),
988            vec![10, 20]
989        );
990    }
991
992    #[test]
993    fn or_else_secondary_closed_then_primary_empty_drains_buffer() {
994        let graph = GraphDsl::create(|builder| builder.add(OrElse::<i32>::new())).unwrap();
995
996        assert_eq!(
997            graph
998                .run_or_else_secondary_closed_first(vec![10, 20])
999                .unwrap(),
1000            vec![10, 20]
1001        );
1002    }
1003
1004    #[test]
1005    fn interleave_cycles_segment_sized_chunks() {
1006        let graph = GraphDsl::create(|builder| builder.add(Interleave::<i32>::new(3, 2))).unwrap();
1007
1008        assert_eq!(
1009            graph
1010                .run_interleave(vec![vec![1, 2, 3], vec![10, 11, 12], vec![20]], 2, false)
1011                .unwrap(),
1012            vec![1, 2, 10, 11, 20, 3, 12]
1013        );
1014    }
1015
1016    #[test]
1017    fn interleave_eager_close_stops_when_any_input_completes() {
1018        let graph = GraphDsl::create(|builder| {
1019            builder.add(Interleave::<i32>::new_with_eager_close(2, 1, true))
1020        })
1021        .unwrap();
1022
1023        assert_eq!(
1024            graph
1025                .run_interleave(vec![vec![1, 2], vec![]], 1, true)
1026                .unwrap(),
1027            Vec::<i32>::new()
1028        );
1029        assert_eq!(
1030            graph
1031                .run_interleave(vec![vec![1], vec![10, 11]], 1, true)
1032                .unwrap(),
1033            vec![1, 10]
1034        );
1035    }
1036
1037    #[test]
1038    fn partition_routes_only_live_outlets_after_peer_cancels() {
1039        let graph = GraphDsl::try_create(|builder| {
1040            let partition = builder.add(Partition::<i32>::new(2, |item| (*item % 2) as usize));
1041            let merge = builder.add(Merge::<i32>::new(2));
1042
1043            builder.connect(partition.outlet(0)?, merge.inlet(0)?)?;
1044            builder.connect(partition.outlet(1)?, merge.inlet(1)?)?;
1045
1046            Ok(FlowShape::new(partition.inlet(), merge.outlet()))
1047        })
1048        .unwrap();
1049
1050        assert_eq!(
1051            graph.run_with_input([0, 1, 2, 3]).unwrap(),
1052            vec![0, 1, 2, 3]
1053        );
1054    }
1055
1056    #[test]
1057    fn unzip_with_keeps_live_outlet_running_after_peer_finishes() {
1058        let graph = GraphDsl::try_create(|builder| {
1059            let unzip = builder.add(UnzipWith::<i32, i32, i32>::new(|item| (item, item * 10)));
1060            let zip = builder.add(Zip::<i32, i32>::new());
1061
1062            builder.connect(unzip.out0(), zip.in0())?;
1063            builder.connect(unzip.out1(), zip.in1())?;
1064
1065            Ok(FlowShape::new(unzip.inlet(), zip.outlet()))
1066        })
1067        .unwrap();
1068
1069        assert_eq!(
1070            graph.run_with_input([1, 2, 3]).unwrap(),
1071            vec![(1, 10), (2, 20), (3, 30)]
1072        );
1073    }
1074
1075    // --- Regression tests pinning WP-17a review-comment fixes ---
1076
1077    // Fix 1a: UnzipFanInFastPath preserves target inlet order for MergeSorted.
1078    // out0 is wired to inlet(1) and out1 to inlet(0) — values must still be
1079    // sorted correctly regardless of swapped wiring.
1080    #[test]
1081    fn unzip_merge_sorted_swapped_inlets_still_sorted() {
1082        let graph_normal = GraphDsl::try_create(|builder| {
1083            let unzip = builder.add(Unzip::<i32, i32>::new());
1084            let merge = builder.add(MergeSorted::<i32>::new());
1085            builder.connect(unzip.out0(), merge.inlet(0)?)?;
1086            builder.connect(unzip.out1(), merge.inlet(1)?)?;
1087            Ok(FlowShape::new(unzip.inlet(), merge.outlet()))
1088        })
1089        .unwrap();
1090
1091        // Swapped: out0 → inlet(1), out1 → inlet(0).
1092        let graph_swapped = GraphDsl::try_create(|builder| {
1093            let unzip = builder.add(Unzip::<i32, i32>::new());
1094            let merge = builder.add(MergeSorted::<i32>::new());
1095            builder.connect(unzip.out0(), merge.inlet(1)?)?;
1096            builder.connect(unzip.out1(), merge.inlet(0)?)?;
1097            Ok(FlowShape::new(unzip.inlet(), merge.outlet()))
1098        })
1099        .unwrap();
1100
1101        let input = vec![(1, 2), (4, 3), (6, 5)];
1102        let expected = vec![1, 2, 3, 4, 5, 6];
1103        assert_eq!(
1104            graph_normal.run_with_input(input.clone()).unwrap(),
1105            expected
1106        );
1107        assert_eq!(graph_swapped.run_with_input(input).unwrap(), expected);
1108    }
1109
1110    // Fix 1b: UnzipFanInFastPath preserves target inlet order for MergeLatest.
1111    // With swapped wiring the snapshot elements must reflect which inlet each
1112    // value was routed to, producing the same result as the non-swapped graph.
1113    #[test]
1114    fn unzip_merge_latest_swapped_inlets_correct_snapshot_order() {
1115        // Normal wiring: out0 → inlet(0), out1 → inlet(1).
1116        // Input (1, 10): latest = [Some(1), Some(10)] → snapshot [1, 10].
1117        let graph_normal = GraphDsl::try_create(|builder| {
1118            let unzip = builder.add(Unzip::<i32, i32>::new());
1119            let merge = builder.add(MergeLatest::<i32>::new(2, false));
1120            builder.connect(unzip.out0(), merge.inlet(0)?)?;
1121            builder.connect(unzip.out1(), merge.inlet(1)?)?;
1122            Ok(FlowShape::new(unzip.inlet(), merge.outlet()))
1123        })
1124        .unwrap();
1125
1126        // Swapped: out0 → inlet(1), out1 → inlet(0).
1127        // Input (1, 10): out0=1 goes to inlet(1), out1=10 goes to inlet(0).
1128        // latest = [Some(10), Some(1)] → snapshot [10, 1].
1129        let graph_swapped = GraphDsl::try_create(|builder| {
1130            let unzip = builder.add(Unzip::<i32, i32>::new());
1131            let merge = builder.add(MergeLatest::<i32>::new(2, false));
1132            builder.connect(unzip.out0(), merge.inlet(1)?)?;
1133            builder.connect(unzip.out1(), merge.inlet(0)?)?;
1134            Ok(FlowShape::new(unzip.inlet(), merge.outlet()))
1135        })
1136        .unwrap();
1137
1138        assert_eq!(
1139            graph_normal.run_with_input([(1, 10)]).unwrap(),
1140            vec![vec![1, 10]]
1141        );
1142        // Swapped wiring routes values to opposite slots; snapshot order differs.
1143        assert_eq!(
1144            graph_swapped.run_with_input([(1, 10)]).unwrap(),
1145            vec![vec![10, 1]]
1146        );
1147    }
1148
1149    // Fix 2: MergeSequence fails on a sequence gap when all inputs complete.
1150    // The GraphStage logic errors if `pending` holds items but next_sequence
1151    // is not among them — the fused executor must mirror that behavior.
1152    #[test]
1153    fn merge_sequence_fails_on_gap_at_completion() {
1154        // Sequences 1 and 2 arrive (via the unzip split of pair (1, 2)), but
1155        // sequence 0 never does.  When the upstream completes both inlets,
1156        // next_sequence is still 0, there is no sequence 0 in pending → error.
1157        let graph = GraphDsl::try_create(|builder| {
1158            let unzip = builder.add(Unzip::<u64, u64>::new());
1159            let merge = builder.add(MergeSequence::<u64>::new(2, |item| *item));
1160            builder.connect(unzip.out0(), merge.inlet(0)?)?;
1161            builder.connect(unzip.out1(), merge.inlet(1)?)?;
1162            Ok(FlowShape::new(unzip.inlet(), merge.outlet()))
1163        })
1164        .unwrap();
1165
1166        let result = graph.run_with_input([(1u64, 2u64)]);
1167        assert!(
1168            matches!(result, Err(StreamError::Failed(ref msg)) if msg.contains("expected sequence")),
1169            "expected a sequence-gap error, got: {result:?}"
1170        );
1171    }
1172
1173    // Fix 3: MergeLatest honors eager_complete — stream ends as soon as any
1174    // inlet completes when eager_complete = true.
1175    #[test]
1176    fn merge_latest_eager_complete_closes_on_first_inlet_done() {
1177        // With eager_complete = true, the stage should complete when the first
1178        // input stream finishes (here the single-item upstream drains first).
1179        // The non-eager variant of the same graph would keep running until both
1180        // inlets complete.
1181        let graph_eager = GraphDsl::try_create(|builder| {
1182            let unzip = builder.add(Unzip::<i32, i32>::new());
1183            let merge = builder.add(MergeLatest::<i32>::new(2, true));
1184            builder.connect(unzip.out0(), merge.inlet(0)?)?;
1185            builder.connect(unzip.out1(), merge.inlet(1)?)?;
1186            Ok(FlowShape::new(unzip.inlet(), merge.outlet()))
1187        })
1188        .unwrap();
1189
1190        let graph_non_eager = GraphDsl::try_create(|builder| {
1191            let unzip = builder.add(Unzip::<i32, i32>::new());
1192            let merge = builder.add(MergeLatest::<i32>::new(2, false));
1193            builder.connect(unzip.out0(), merge.inlet(0)?)?;
1194            builder.connect(unzip.out1(), merge.inlet(1)?)?;
1195            Ok(FlowShape::new(unzip.inlet(), merge.outlet()))
1196        })
1197        .unwrap();
1198
1199        // Single item: after this item both outlets close simultaneously (the
1200        // Unzip upstream completes), so both eager and non-eager complete after
1201        // emitting the first snapshot.
1202        let result_eager = graph_eager.run_with_input([(1i32, 10i32)]).unwrap();
1203        let result_non_eager = graph_non_eager.run_with_input([(1i32, 10i32)]).unwrap();
1204
1205        // Both should produce at least one snapshot.
1206        assert!(!result_eager.is_empty(), "eager graph produced no output");
1207        assert!(
1208            !result_non_eager.is_empty(),
1209            "non-eager graph produced no output"
1210        );
1211
1212        // Produce the same result for a balanced input — the eager flag only
1213        // changes behavior when ONE inlet finishes before the other.
1214        assert_eq!(result_eager, result_non_eager);
1215    }
1216}