Skip to main content

datum/graph/
mod.rs

1//! Runtime-checked graph, shape, port, and junction primitives.
2//!
3//! This module contains Datum's Akka-like graph layer: typed public ports,
4//! runtime-validated GraphDSL wiring, fused acyclic fast paths, and a queued
5//! fused interpreter for cyclic feedback topologies. Invalid wiring and illegal
6//! stage operations fail deterministically.
7
8use std::{
9    any::{Any, TypeId, type_name},
10    collections::{HashMap, HashSet, VecDeque},
11    fmt,
12    marker::PhantomData,
13    sync::{
14        Arc, Mutex, OnceLock,
15        atomic::{AtomicUsize, Ordering},
16    },
17    thread,
18};
19
20#[cfg(test)]
21use std::sync::mpsc;
22
23use crate::{
24    actor::{Actor, ActorProcessingErr, ActorRef},
25    stream::{OverflowStrategy, StreamError, StreamResult},
26};
27
28type DatumValue = Box<dyn DatumElement>;
29type StageMapFn = dyn Fn(DatumValue) -> StreamResult<DatumValue> + Send + Sync;
30type StageTypedMapFn = dyn Any + Send + Sync;
31type StageZipFn = dyn Fn(DatumValue, DatumValue) -> StreamResult<DatumValue> + Send + Sync;
32type StageUnzipFn = dyn Fn(DatumValue) -> (DatumValue, DatumValue) + Send + Sync;
33/// Opaque container for a typed split function `Arc<dyn Fn(In) -> (Out0, Out1)>`.
34/// Down-cast at plan time — same pattern as [`StageTypedMapFn`].
35type StageTypedUnzipFn = dyn Any + Send + Sync;
36type StageCompareFn = dyn Fn(&DatumValue, &DatumValue) -> std::cmp::Ordering + Send + Sync;
37type StageSequenceFn = dyn Fn(&DatumValue) -> u64 + Send + Sync;
38/// Opaque container for a typed sequence-extractor `Arc<dyn Fn(&T) -> u64>`.
39/// Down-cast at plan time.
40type StageTypedSequenceFn = dyn Any + Send + Sync;
41type StageSnapshotFn = dyn Fn(&[&DatumValue]) -> DatumValue + Send + Sync;
42/// Opaque container for a typed snapshot builder `Arc<dyn Fn(&[Option<T>]) -> Vec<T>>`.
43/// Down-cast at plan time.
44type StageTypedSnapshotFn = dyn Any + Send + Sync;
45type StagePartitionFn = dyn Fn(&DatumValue) -> usize + Send + Sync;
46/// Opaque container for a typed partitioner `Arc<dyn Fn(&T) -> usize>`.
47/// Down-cast at plan time.
48type StageTypedPartitionFn = dyn Any + Send + Sync;
49
50#[derive(Clone)]
51struct StageMapFns {
52    erased: Arc<StageMapFn>,
53    typed: Arc<StageTypedMapFn>,
54}
55
56/// Optional typed metadata attached to an `Opaque` stage so the typed **cyclic**
57/// executor can run it without `DatumValue` boxing or `GraphStageLogic`.
58///
59/// Built-in `Buffer`/`TakeWhile` stages surface this through
60/// [`GraphStage::typed_cyclic_op`]; every other stage returns `None` and the
61/// cyclic typed planner falls back to the erased queued interpreter. The erased
62/// executor never reads this — it keeps running the stage's `GraphStageLogic`.
63#[derive(Clone)]
64pub(super) enum TypedCyclicOp {
65    /// A `Buffer` stage. In the synchronous fused executor a buffer drains one
66    /// element per step and therefore never overflows — it is a pass-through.
67    BufferPassthrough,
68    /// A `TakeWhile` stage carrying its typed predicate
69    /// `Arc<dyn Fn(&T) -> bool + Send + Sync>` (down-cast at plan time, same
70    /// pattern as [`StageTypedMapFn`]).
71    TakeWhile(Arc<dyn Any + Send + Sync>),
72}
73
74impl fmt::Debug for TypedCyclicOp {
75    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
76        match self {
77            Self::BufferPassthrough => f.write_str("BufferPassthrough"),
78            Self::TakeWhile(_) => f.write_str("TakeWhile(..)"),
79        }
80    }
81}
82
83pub(crate) trait DatumElement: Any + Send {
84    fn clone_box(&self) -> DatumValue;
85    fn into_any(self: Box<Self>) -> Box<dyn Any + Send>;
86    fn as_any_ref(&self) -> &dyn Any;
87}
88
89impl<T> DatumElement for T
90where
91    T: Any + Clone + Send,
92{
93    fn clone_box(&self) -> DatumValue {
94        Box::new(self.clone())
95    }
96
97    fn into_any(self: Box<Self>) -> Box<dyn Any + Send> {
98        self
99    }
100
101    fn as_any_ref(&self) -> &dyn Any {
102        self
103    }
104}
105
106fn datum<T>(value: T) -> DatumValue
107where
108    T: Clone + Send + 'static,
109{
110    Box::new(value)
111}
112
113fn downcast_datum<T, S>(
114    value: DatumValue,
115    operation: &'static str,
116    port: impl FnOnce() -> S,
117) -> StreamResult<T>
118where
119    T: Send + 'static,
120    S: Into<String>,
121{
122    // `port` is only evaluated on the cold error path so the success path
123    // (the per-element fused hot path) allocates nothing for diagnostics. It
124    // returns `impl Into<String>` so callers can hand back a `&'static str`
125    // without an eager `to_owned()`.
126    value
127        .into_any()
128        .downcast::<T>()
129        .map(|value| *value)
130        .map_err(|_| StreamError::InvalidPortOperation {
131            operation,
132            port: port().into(),
133            reason: format!("element type did not match {}", type_name::<T>()),
134        })
135}
136
137static NEXT_PORT_ID: AtomicUsize = AtomicUsize::new(1);
138
139fn next_port_id() -> PortId {
140    PortId(NEXT_PORT_ID.fetch_add(1, Ordering::Relaxed))
141}
142
143fn next_port_id_block(count: usize) -> PortId {
144    debug_assert!(count > 0);
145    PortId(NEXT_PORT_ID.fetch_add(count, Ordering::Relaxed))
146}
147
148#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
149pub struct PortId(usize);
150
151impl PortId {
152    #[must_use]
153    pub const fn as_usize(self) -> usize {
154        self.0
155    }
156
157    const fn offset(self, offset: usize) -> Self {
158        Self(self.0 + offset)
159    }
160}
161
162#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
163pub enum PortKind {
164    Inlet,
165    Outlet,
166}
167
168macro_rules! shared_name {
169    ($fn_name:ident, $value:literal) => {
170        fn $fn_name() -> Arc<str> {
171            static NAME: OnceLock<Arc<str>> = OnceLock::new();
172            Arc::clone(NAME.get_or_init(|| Arc::from($value)))
173        }
174    };
175}
176
177shared_name!(identity_stage_name, "Identity");
178shared_name!(identity_inlet_name, "Identity.in");
179shared_name!(identity_outlet_name, "Identity.out");
180shared_name!(map_stage_name, "Map");
181shared_name!(map_inlet_name, "Map.in");
182shared_name!(map_outlet_name, "Map.out");
183shared_name!(broadcast_stage_name, "Broadcast");
184shared_name!(broadcast_inlet_name, "Broadcast.in");
185shared_name!(balance_stage_name, "Balance");
186shared_name!(balance_inlet_name, "Balance.in");
187shared_name!(merge_stage_name, "Merge");
188shared_name!(merge_outlet_name, "Merge.out");
189shared_name!(merge_preferred_stage_name, "MergePreferred");
190shared_name!(merge_preferred_preferred_name, "MergePreferred.preferred");
191shared_name!(merge_preferred_outlet_name, "MergePreferred.out");
192shared_name!(merge_prioritized_stage_name, "MergePrioritized");
193shared_name!(merge_prioritized_outlet_name, "MergePrioritized.out");
194shared_name!(concat_stage_name, "Concat");
195shared_name!(concat_outlet_name, "Concat.out");
196shared_name!(or_else_stage_name, "OrElse");
197shared_name!(or_else_primary_name, "OrElse.primary");
198shared_name!(or_else_secondary_name, "OrElse.secondary");
199shared_name!(or_else_outlet_name, "OrElse.out");
200shared_name!(interleave_stage_name, "Interleave");
201shared_name!(interleave_outlet_name, "Interleave.out");
202shared_name!(zip_stage_name, "Zip");
203shared_name!(zip_in0_name, "Zip.in0");
204shared_name!(zip_in1_name, "Zip.in1");
205shared_name!(zip_outlet_name, "Zip.out");
206shared_name!(async_boundary_stage_name, "AsyncBoundary");
207shared_name!(async_boundary_inlet_name, "AsyncBoundary.in");
208shared_name!(async_boundary_outlet_name, "AsyncBoundary.out");
209
210mod builder;
211mod executor;
212mod junctions;
213mod ports;
214mod shapes;
215mod stage;
216
217#[cfg(test)]
218use self::executor::BoundaryCountExecutor;
219use self::{
220    builder::StageRecord,
221    stage::{StageKind, StageTimerMailbox, StageTimerRuntime},
222};
223
224pub use self::{
225    builder::{
226        AsyncBoundaryExecutionConfig, FusedExecutionConfig, FusedExecutionReport, FusedSegment,
227        FusedTerminalReport, Graph, GraphBlueprint, GraphBuilder, GraphDsl, ImportedGraph,
228        PartialGraph,
229    },
230    junctions::{
231        AsyncBoundary, Balance, Broadcast, Buffer, Concat, Identity, Interleave, MapStage, Merge,
232        MergeLatest, MergePreferred, MergePrioritized, MergeSequence, MergeSorted, OrElse,
233        Partition, TakeWhile, Unzip, UnzipWith, Zip,
234    },
235    ports::{AnyInlet, AnyOutlet, Inlet, Outlet, PortRef},
236    shapes::{
237        BidiShape, FanInShape, FanOutShape, FanOutShape2, FlowShape, MergePreferredShape,
238        PortAllocator, Shape, SinkShape, SourceShape, ZipShape,
239    },
240    stage::{
241        AsyncCallback, GraphStage, GraphStageLogic, InHandler, OutHandler, StageSpec, TimerHandler,
242    },
243};
244
245#[cfg(test)]
246mod tests {
247    use super::*;
248    use crate::Attributes;
249
250    #[test]
251    fn graph_dsl_builds_broadcast_zip_flow() {
252        let graph = GraphDsl::try_create(|builder| {
253            let broadcast = builder.add(Broadcast::<i32>::new(2));
254            let zip = builder.add(Zip::<i32, i32>::new());
255
256            builder.connect(broadcast.outlet(0)?, zip.in0())?;
257            builder.connect(broadcast.outlet(1)?, zip.in1())?;
258
259            Ok(FlowShape::new(broadcast.inlet(), zip.outlet()))
260        })
261        .unwrap();
262
263        assert_eq!(graph.stage_count(), 2);
264        assert_eq!(graph.edge_count(), 2);
265        assert_eq!(graph.shape().inlets().len(), 1);
266        assert_eq!(graph.shape().outlets().len(), 1);
267        assert_eq!(
268            graph.run_with_input([1, 2, 3]).unwrap(),
269            vec![(1, 1), (2, 2), (3, 3)]
270        );
271    }
272
273    #[test]
274    fn graph_dsl_zip_slots_follow_inlet_ids() {
275        let graph = GraphDsl::try_create(|builder| {
276            let broadcast = builder.add(Broadcast::<i32>::new(2));
277            let identity = builder.add(Identity::<i32>::new());
278            let zip = builder.add(Zip::<i32, i32>::new());
279
280            builder.connect(broadcast.outlet(0)?, identity.inlet())?;
281            builder.connect(identity.outlet(), zip.in1())?;
282            builder.connect(broadcast.outlet(1)?, zip.in0())?;
283
284            Ok(FlowShape::new(broadcast.inlet(), zip.outlet()))
285        })
286        .unwrap();
287
288        assert_eq!(
289            graph.run_with_input([1, 2, 3]).unwrap(),
290            vec![(1, 1), (2, 2), (3, 3)]
291        );
292    }
293
294    #[test]
295    fn graph_dsl_zip_buffers_skewed_inlet_arrivals() {
296        // Regression test for per-inlet Zip buffering. This asymmetric topology
297        // delivers TWO elements to zip.in0 for every ONE that reaches zip.in1
298        // within a single fused cycle:
299        //
300        //   in -> Broadcast A(2)
301        //          A.out0 -> Broadcast B(2) -> Merge(2).out -> zip.in0   (two arrivals)
302        //          A.out1 ----------------------------------> zip.in1   (one arrival)
303        //
304        // The fused executor drains A.out0 depth-first, so both Merge emissions
305        // hit zip.in0 before A.out1 ever feeds zip.in1. With single-slot Zip
306        // state this errors ("second element before its pair"); bounded
307        // per-inlet buffering must instead queue the surplus and pair in FIFO
308        // arrival order without error. Neither Broadcast matches the fused
309        // Broadcast(2)->Zip fast path (A feeds two different stages; B feeds a
310        // Merge), so this exercises the general buffered Zip path.
311        let graph = GraphDsl::try_create(|builder| {
312            let fan = builder.add(Broadcast::<i32>::new(2));
313            let doubler = builder.add(Broadcast::<i32>::new(2));
314            let merge = builder.add(Merge::<i32>::new(2));
315            let zip = builder.add(Zip::<i32, i32>::new());
316
317            builder.connect(fan.outlet(0)?, doubler.inlet())?;
318            builder.connect(doubler.outlet(0)?, merge.inlet(0)?)?;
319            builder.connect(doubler.outlet(1)?, merge.inlet(1)?)?;
320            builder.connect(merge.outlet(), zip.in0())?;
321            builder.connect(fan.outlet(1)?, zip.in1())?;
322
323            Ok(FlowShape::new(fan.inlet(), zip.outlet()))
324        })
325        .unwrap();
326
327        // in0 accumulates two per item while in1 gets one, so FIFO pairing runs
328        // in0 one item ahead: 10 pairs with 10, then the leftover 10 pairs with
329        // 20. The point is that this succeeds (no error) and pairs in order.
330        assert_eq!(
331            graph.run_with_input([10, 20]).unwrap(),
332            vec![(10, 10), (10, 20)]
333        );
334    }
335
336    #[test]
337    fn graph_dsl_runs_merge_preferred_buffered_feedback_cycle() {
338        let graph = GraphDsl::try_create(|builder| {
339            let merge = builder.add(MergePreferred::<i32>::new(1));
340            let broadcast = builder.add(Broadcast::<i32>::new(2));
341            let buffer = builder.add(Buffer::<i32>::new(8, OverflowStrategy::Backpressure));
342            let positive = builder.add(TakeWhile::<i32>::new(|item| *item > 0));
343            let decrement = builder.add(MapStage::new(|item: i32| item - 1));
344
345            builder.connect(merge.outlet(), broadcast.inlet())?;
346            builder.connect(broadcast.outlet(1)?, buffer.inlet())?;
347            builder.connect(buffer.outlet(), positive.inlet())?;
348            builder.connect(positive.outlet(), decrement.inlet())?;
349            builder.connect(decrement.outlet(), merge.preferred())?;
350
351            Ok(FlowShape::new(merge.secondary(0)?, broadcast.outlet(0)?))
352        })
353        .unwrap();
354
355        assert_eq!(graph.run_with_input([3]).unwrap(), vec![3, 2, 1, 0]);
356    }
357
358    #[test]
359    fn graph_dsl_runs_buffered_merge_feedback_cycle() {
360        let graph = GraphDsl::try_create(|builder| {
361            let merge = builder.add(Merge::<i32>::new(2));
362            let broadcast = builder.add(Broadcast::<i32>::new(2));
363            let buffer = builder.add(Buffer::<i32>::new(8, OverflowStrategy::Backpressure));
364            let positive = builder.add(TakeWhile::<i32>::new(|item| *item > 0));
365            let decrement = builder.add(MapStage::new(|item: i32| item - 1));
366
367            builder.connect(merge.outlet(), broadcast.inlet())?;
368            builder.connect(broadcast.outlet(1)?, buffer.inlet())?;
369            builder.connect(buffer.outlet(), positive.inlet())?;
370            builder.connect(positive.outlet(), decrement.inlet())?;
371            builder.connect(decrement.outlet(), merge.inlet(1)?)?;
372
373            Ok(FlowShape::new(merge.inlet(0)?, broadcast.outlet(0)?))
374        })
375        .unwrap();
376
377        assert_eq!(graph.run_with_input([3]).unwrap(), vec![3, 2, 1, 0]);
378    }
379
380    #[test]
381    fn graph_dsl_unbuffered_merge_cycle_surfaces_event_limit() {
382        let graph = GraphDsl::try_create(|builder| {
383            let merge = builder.add(Merge::<i32>::new(2));
384            let broadcast = builder.add(Broadcast::<i32>::new(2));
385
386            builder.connect(merge.outlet(), broadcast.inlet())?;
387            builder.connect(broadcast.outlet(1)?, merge.inlet(1)?)?;
388
389            Ok(FlowShape::new(merge.inlet(0)?, broadcast.outlet(0)?))
390        })
391        .unwrap();
392
393        let result = graph.run_with_input_report([1], FusedExecutionConfig { event_limit: 256 });
394
395        assert_eq!(result, Err(StreamError::EventLimitExceeded { limit: 256 }));
396    }
397
398    #[test]
399    fn partial_graph_can_be_imported_with_its_shape() {
400        let partial = GraphDsl::partial(|builder| {
401            let first = builder.add(Identity::<i32>::new());
402            let second = builder.add_named(Identity::<i32>::new(), "partial.tail");
403            builder.connect(first.outlet(), second.inlet())?;
404            Ok(FlowShape::new(first.inlet(), second.outlet()))
405        })
406        .named("partial.identity");
407
408        let graph = GraphDsl::try_create(|builder| {
409            let imported = builder.import(&partial)?;
410            let after = builder.add(Identity::<i32>::new());
411            builder.connect(imported.outlet(), after.inlet())?;
412            Ok(FlowShape::new(imported.inlet(), after.outlet()))
413        })
414        .unwrap()
415        .named("outer.graph");
416
417        assert_eq!(graph.run_with_input([1, 2, 3]).unwrap(), vec![1, 2, 3]);
418        assert_eq!(graph.attributes().name(), Some("outer.graph"));
419    }
420
421    #[test]
422    fn graph_attributes_follow_innermost_wins_order() {
423        let graph = GraphDsl::create(|builder| {
424            builder.add_with_attributes(
425                Identity::<i32>::new(),
426                Attributes::named("stage-outer").and(Attributes::named("stage-inner")),
427            )
428        })
429        .unwrap()
430        .add_attributes(Attributes::dispatcher("graph-outer"))
431        .add_attributes(Attributes::dispatcher("graph-inner"));
432
433        assert_eq!(graph.attributes().dispatcher_hint(), Some("graph-inner"));
434        assert_eq!(
435            graph.stages[0].spec.attributes().name(),
436            Some("stage-inner")
437        );
438    }
439
440    #[test]
441    fn graph_dsl_rejects_invalid_and_incomplete_wiring() {
442        let graph = GraphDsl::try_create(|builder| {
443            let first = builder.add(Identity::<i32>::new());
444            let second = builder.add(Identity::<i32>::new());
445            let third = builder.add(Identity::<i32>::new());
446
447            builder.connect(first.outlet(), second.inlet())?;
448            let duplicate = builder.connect(first.outlet(), third.inlet());
449            assert!(matches!(duplicate, Err(StreamError::GraphValidation(_))));
450
451            Ok(FlowShape::new(first.inlet(), second.outlet()))
452        });
453
454        assert!(matches!(graph, Err(StreamError::GraphValidation(_))));
455
456        let graph = GraphDsl::create(|builder| {
457            let broadcast = builder.add(Broadcast::<i32>::new(2));
458            SourceShape::new(broadcast.outlet(0).unwrap())
459        });
460        assert!(matches!(graph, Err(StreamError::GraphValidation(_))));
461    }
462
463    #[test]
464    fn graph_dsl_rejects_erased_type_mismatch() {
465        let graph = GraphDsl::try_create(|builder| {
466            let left = builder.add(Identity::<i32>::new());
467            let right = builder.add(Identity::<u64>::new());
468            let mismatch = builder.connect_any(left.outlet().erase(), right.inlet().erase());
469            assert!(matches!(mismatch, Err(StreamError::GraphValidation(_))));
470            Ok(FlowShape::new(left.inlet(), right.outlet()))
471        });
472
473        assert!(matches!(graph, Err(StreamError::GraphValidation(_))));
474    }
475
476    #[test]
477    fn graph_dsl_rejects_result_ports_with_spoofed_metadata() {
478        let graph = GraphDsl::create(|builder| {
479            let shape = builder.add(Identity::<i32>::new());
480            FlowShape::new(
481                Inlet::<u64>::with_id(shape.inlet().id(), "Identity.in"),
482                shape.outlet(),
483            )
484        });
485
486        assert!(matches!(graph, Err(StreamError::GraphValidation(_))));
487    }
488
489    #[test]
490    fn allocated_ports_do_not_collide_with_manual_ports() {
491        let manual = Inlet::<i32>::new("manual");
492        let graph = GraphDsl::try_create(|builder| {
493            let shape = builder.add(Identity::<i32>::new());
494            assert_ne!(manual.id(), shape.inlet().id());
495            Ok(shape)
496        });
497
498        assert!(graph.is_ok());
499    }
500
501    #[test]
502    fn identity_ports_keep_static_names_and_unique_ids() {
503        let graph = GraphDsl::try_create(|builder| {
504            let first = builder.add(Identity::<i32>::new());
505            let second = builder.add(Identity::<i32>::new());
506
507            assert_eq!(first.inlet().name(), "Identity.in");
508            assert_eq!(first.outlet().name(), "Identity.out");
509            assert_eq!(second.inlet().name(), "Identity.in");
510            assert_eq!(second.outlet().name(), "Identity.out");
511            assert_ne!(first.inlet().id(), first.outlet().id());
512            assert_ne!(first.outlet().id(), second.inlet().id());
513            assert_ne!(second.inlet().id(), second.outlet().id());
514
515            builder.connect(first.outlet(), second.inlet())?;
516            Ok(FlowShape::new(first.inlet(), second.outlet()))
517        });
518
519        assert!(graph.is_ok());
520    }
521
522    #[test]
523    fn graph_stage_logic_checks_port_operations() {
524        let shape = FlowShape::new(Inlet::<i32>::new("in"), Outlet::<i32>::new("out"));
525        let inlet = shape.inlet();
526        let outlet = shape.outlet();
527        let mut logic = GraphStageLogic::new(&shape);
528
529        assert!(matches!(
530            logic.grab(&inlet),
531            Err(StreamError::InvalidPortOperation { .. })
532        ));
533        logic.pull(&inlet).unwrap();
534        assert!(logic.has_been_pulled(&inlet));
535        assert!(matches!(
536            logic.pull(&inlet),
537            Err(StreamError::InvalidPortOperation { .. })
538        ));
539        logic.offer(&inlet, 41).unwrap();
540        assert!(!logic.has_been_pulled(&inlet));
541        assert_eq!(logic.grab(&inlet).unwrap(), 41);
542
543        assert!(matches!(
544            logic.push(&outlet, 1),
545            Err(StreamError::InvalidPortOperation { .. })
546        ));
547        logic.request(&outlet).unwrap();
548        assert!(logic.is_available(&outlet));
549        logic.push(&outlet, 42).unwrap();
550        assert!(!logic.is_available(&outlet));
551        logic.complete(&outlet).unwrap();
552        assert!(logic.is_closed(&outlet));
553        assert!(matches!(
554            logic.request(&outlet),
555            Err(StreamError::InvalidPortOperation { .. })
556        ));
557    }
558
559    #[test]
560    fn fused_execution_enforces_event_limit() {
561        let graph = GraphDsl::create(|builder| builder.add(Identity::<i32>::new())).unwrap();
562
563        let result = graph.run_with_input_report([1, 2], FusedExecutionConfig { event_limit: 1 });
564
565        assert_eq!(result, Err(StreamError::EventLimitExceeded { limit: 1 }));
566    }
567
568    #[test]
569    fn async_boundary_splits_fused_segments() {
570        let graph = GraphDsl::try_create(|builder| {
571            let first = builder.add(Identity::<i32>::new());
572            let boundary = builder.add(AsyncBoundary::<i32>::new());
573            let second = builder.add(MapStage::new(|item: i32| item + 1));
574
575            builder.connect(first.outlet(), boundary.inlet())?;
576            builder.connect(boundary.outlet(), second.inlet())?;
577
578            Ok(FlowShape::new(first.inlet(), second.outlet()))
579        })
580        .unwrap();
581
582        assert_eq!(graph.segments().len(), 3);
583        let report = graph
584            .run_with_input_report([1, 2], FusedExecutionConfig::default())
585            .unwrap();
586        assert_eq!(report.output, vec![2, 3]);
587        assert_eq!(report.async_boundary_crossings, 2);
588    }
589
590    #[test]
591    fn async_boundary_count_path_uses_ractor_handoff_segments() {
592        let graph = GraphDsl::try_create(|builder| {
593            let first = builder.add(MapStage::new(|item: i32| item + 1));
594            let boundary = builder.add(AsyncBoundary::<i32>::new());
595            let second = builder.add(MapStage::new(|item: i32| item * 2));
596
597            builder.connect(first.outlet(), boundary.inlet())?;
598            builder.connect(boundary.outlet(), second.inlet())?;
599
600            Ok(FlowShape::new(first.inlet(), second.outlet()))
601        })
602        .unwrap();
603
604        let report = graph
605            .run_async_boundary_count_with_input_report(
606                [1, 2, 3],
607                AsyncBoundaryExecutionConfig {
608                    fused: FusedExecutionConfig::default(),
609                    buffer_size: 2,
610                },
611            )
612            .unwrap();
613
614        assert_eq!(report.result, 3);
615        assert_eq!(report.async_boundary_crossings, 3);
616        assert_eq!(report.events, 18);
617    }
618
619    #[test]
620    fn threaded_async_boundary_baseline_matches_ractor_count_path() {
621        let graph = GraphDsl::try_create(|builder| {
622            let first = builder.add(MapStage::new(|item: i32| item + 1));
623            let boundary = builder.add(AsyncBoundary::<i32>::new());
624            let second = builder.add(MapStage::new(|item: i32| item * 2));
625
626            builder.connect(first.outlet(), boundary.inlet())?;
627            builder.connect(boundary.outlet(), second.inlet())?;
628
629            Ok(FlowShape::new(first.inlet(), second.outlet()))
630        })
631        .unwrap();
632
633        let config = AsyncBoundaryExecutionConfig {
634            fused: FusedExecutionConfig::default(),
635            buffer_size: 2,
636        };
637        let ractor_report = graph
638            .run_async_boundary_count_with_input_report([1, 2, 3], config)
639            .unwrap();
640        let threaded_report = BoundaryCountExecutor::Threaded
641            .run_count(
642                [1, 2, 3],
643                graph.typed_linear_async_segments().unwrap(),
644                config,
645            )
646            .unwrap();
647
648        assert_eq!(threaded_report, ractor_report);
649    }
650
651    #[test]
652    fn ractor_async_boundary_rejects_zero_buffer_size() {
653        let graph = GraphDsl::try_create(|builder| {
654            let first = builder.add(MapStage::new(|item: i32| item + 1));
655            let boundary = builder.add(AsyncBoundary::<i32>::new());
656            let second = builder.add(MapStage::new(|item: i32| item * 2));
657
658            builder.connect(first.outlet(), boundary.inlet())?;
659            builder.connect(boundary.outlet(), second.inlet())?;
660
661            Ok(FlowShape::new(first.inlet(), second.outlet()))
662        })
663        .unwrap();
664
665        let result = BoundaryCountExecutor::Ractor.run_count(
666            [1, 2, 3],
667            graph.typed_linear_async_segments().unwrap(),
668            AsyncBoundaryExecutionConfig {
669                fused: FusedExecutionConfig::default(),
670                buffer_size: 0,
671            },
672        );
673
674        assert!(matches!(
675            result,
676            Err(StreamError::GraphValidation(message)) if message.contains("buffer_size")
677        ));
678    }
679
680    #[test]
681    fn ractor_async_boundary_streams_input_without_eager_collection() {
682        let graph = GraphDsl::try_create(|builder| {
683            let first = builder.add(MapStage::new(|item: i32| item + 1));
684            let boundary = builder.add(AsyncBoundary::<i32>::new());
685            let second = builder.add(MapStage::new(|item: i32| item * 2));
686
687            builder.connect(first.outlet(), boundary.inlet())?;
688            builder.connect(boundary.outlet(), second.inlet())?;
689
690            Ok(FlowShape::new(first.inlet(), second.outlet()))
691        })
692        .unwrap();
693
694        let result = BoundaryCountExecutor::Ractor.run_count(
695            std::iter::repeat(1_i32),
696            graph.typed_linear_async_segments().unwrap(),
697            AsyncBoundaryExecutionConfig {
698                fused: FusedExecutionConfig { event_limit: 4 },
699                buffer_size: 1,
700            },
701        );
702
703        assert_eq!(result, Err(StreamError::EventLimitExceeded { limit: 4 }));
704    }
705
706    #[test]
707    fn ractor_async_boundary_runs_inside_existing_tokio_runtime() {
708        let graph = GraphDsl::try_create(|builder| {
709            let first = builder.add(MapStage::new(|item: i32| item + 1));
710            let boundary = builder.add(AsyncBoundary::<i32>::new());
711            let second = builder.add(MapStage::new(|item: i32| item * 2));
712
713            builder.connect(first.outlet(), boundary.inlet())?;
714            builder.connect(boundary.outlet(), second.inlet())?;
715
716            Ok(FlowShape::new(first.inlet(), second.outlet()))
717        })
718        .unwrap();
719
720        let runtime = tokio::runtime::Builder::new_current_thread()
721            .build()
722            .unwrap();
723        let report = runtime
724            .block_on(async {
725                BoundaryCountExecutor::Ractor.run_count(
726                    [1, 2, 3],
727                    graph.typed_linear_async_segments().unwrap(),
728                    AsyncBoundaryExecutionConfig {
729                        fused: FusedExecutionConfig::default(),
730                        buffer_size: 2,
731                    },
732                )
733            })
734            .unwrap();
735
736        assert_eq!(report.result, 3);
737        assert_eq!(report.async_boundary_crossings, 3);
738        assert_eq!(report.events, 18);
739    }
740
741    #[test]
742    fn balance_merge_round_robins_through_junctions() {
743        let graph = GraphDsl::try_create(|builder| {
744            let balance = builder.add(Balance::<i32>::new(2));
745            let merge = builder.add(Merge::<i32>::new(2));
746
747            builder.connect(balance.outlet(0)?, merge.inlet(0)?)?;
748            builder.connect(balance.outlet(1)?, merge.inlet(1)?)?;
749
750            Ok(FlowShape::new(balance.inlet(), merge.outlet()))
751        })
752        .unwrap();
753
754        assert_eq!(graph.run_with_input(0..6).unwrap(), vec![0, 1, 2, 3, 4, 5]);
755    }
756
757    #[test]
758    fn prioritized_merge_uses_weighted_schedule() {
759        let graph =
760            GraphDsl::create(|builder| builder.add(MergePrioritized::<i32>::new(vec![2, 1])))
761                .unwrap();
762
763        assert_eq!(
764            graph
765                .run_fan_in(vec![vec![1, 2, 3, 4], vec![100, 101]])
766                .unwrap(),
767            vec![1, 2, 100, 3, 4, 101]
768        );
769    }
770
771    #[test]
772    fn fused_execution_supports_count_and_fold_sinks() {
773        let graph = GraphDsl::try_create(|builder| {
774            let first = builder.add(MapStage::new(|item: u64| item + 1));
775            let second = builder.add(MapStage::new(|item: u64| item * 2));
776
777            builder.connect(first.outlet(), second.inlet())?;
778
779            Ok(FlowShape::new(first.inlet(), second.outlet()))
780        })
781        .unwrap();
782
783        assert_eq!(graph.run_count_with_input(0..4).unwrap(), 4);
784        assert_eq!(
785            graph
786                .run_fold_with_input(0..4, 0, |acc, item| acc + item)
787                .unwrap(),
788            20
789        );
790    }
791
792    #[test]
793    fn typed_linear_fast_path_runs_same_type_chains() {
794        let graph = GraphDsl::try_create(|builder| {
795            let first = builder.add(MapStage::new(|item: u64| item + 1));
796            let second = builder.add(MapStage::new(|item: u64| item * 2));
797
798            builder.connect(first.outlet(), second.inlet())?;
799
800            Ok(FlowShape::new(first.inlet(), second.outlet()))
801        })
802        .unwrap();
803
804        let report = graph
805            .run_typed_linear_with_input_report([1, 2, 3], FusedExecutionConfig::default())
806            .unwrap();
807        assert_eq!(report.output, vec![4, 6, 8]);
808        assert_eq!(report.events, 12);
809
810        assert_eq!(graph.run_typed_linear_count_with_input(0..4).unwrap(), 4);
811        assert_eq!(
812            graph
813                .run_typed_linear_fold_with_input(0..4, 0, |acc, item| acc + item)
814                .unwrap(),
815            20
816        );
817    }
818
819    #[test]
820    fn typed_linear_fast_path_rejects_junction_graphs() {
821        let graph = GraphDsl::try_create(|builder| {
822            let balance = builder.add(Balance::<i32>::new(2));
823            let merge = builder.add(Merge::<i32>::new(2));
824
825            builder.connect(balance.outlet(0)?, merge.inlet(0)?)?;
826            builder.connect(balance.outlet(1)?, merge.inlet(1)?)?;
827
828            Ok(FlowShape::new(balance.inlet(), merge.outlet()))
829        })
830        .unwrap();
831
832        assert!(matches!(
833            graph.run_typed_linear_count_with_input([1, 2, 3]),
834            Err(StreamError::GraphValidation(_))
835        ));
836    }
837
838    #[test]
839    fn merge_preferred_drains_preferred_before_secondaries() {
840        let graph = GraphDsl::create(|builder| builder.add(MergePreferred::<i32>::new(2))).unwrap();
841
842        assert_eq!(
843            graph
844                .run_merge_preferred(vec![1, 2, 3], vec![vec![100, 101], vec![200]])
845                .unwrap(),
846            vec![1, 2, 3, 100, 200, 101]
847        );
848    }
849
850    #[test]
851    fn merge_waits_for_all_inputs_to_complete() {
852        let graph = GraphDsl::create(|builder| builder.add(Merge::<i32>::new(2))).unwrap();
853
854        assert_eq!(
855            graph.run_fan_in(vec![vec![], vec![10, 20]]).unwrap(),
856            vec![10, 20]
857        );
858        assert_eq!(graph.run_fan_in(vec![vec![1], vec![]]).unwrap(), vec![1]);
859    }
860
861    #[test]
862    fn merge_sorted_drains_remaining_input_after_peer_completes() {
863        let graph = GraphDsl::try_create(|builder| {
864            let unzip = builder.add(Unzip::<i32, i32>::new());
865            let merge = builder.add(MergeSorted::<i32>::new());
866
867            builder.connect(unzip.out0(), merge.inlet(0)?)?;
868            builder.connect(unzip.out1(), merge.inlet(1)?)?;
869
870            Ok(FlowShape::new(unzip.inlet(), merge.outlet()))
871        })
872        .unwrap();
873
874        assert_eq!(
875            graph.run_with_input([(1, 2), (4, 3), (6, 5)]).unwrap(),
876            vec![1, 2, 3, 4, 5, 6]
877        );
878    }
879
880    #[test]
881    fn merge_sequence_reorders_adversarial_arrivals_by_sequence_number() {
882        let graph = GraphDsl::try_create(|builder| {
883            let unzip = builder.add(Unzip::<u64, u64>::new());
884            let merge = builder.add(MergeSequence::<u64>::new(2, |item| *item));
885
886            builder.connect(unzip.out0(), merge.inlet(0)?)?;
887            builder.connect(unzip.out1(), merge.inlet(1)?)?;
888
889            Ok(FlowShape::new(unzip.inlet(), merge.outlet()))
890        })
891        .unwrap();
892
893        assert_eq!(
894            graph.run_with_input([(0, 1), (2, 3), (4, 5)]).unwrap(),
895            vec![0, 1, 2, 3, 4, 5]
896        );
897    }
898
899    #[test]
900    fn merge_latest_emits_with_last_seen_peer_and_honors_eager_complete() {
901        let graph = GraphDsl::try_create(|builder| {
902            let unzip = builder.add(Unzip::<i32, i32>::new());
903            let merge = builder.add(MergeLatest::<i32>::new(2, false));
904
905            builder.connect(unzip.out0(), merge.inlet(0)?)?;
906            builder.connect(unzip.out1(), merge.inlet(1)?)?;
907
908            Ok(FlowShape::new(unzip.inlet(), merge.outlet()))
909        })
910        .unwrap();
911
912        assert_eq!(
913            graph.run_with_input([(1, 10), (2, 11)]).unwrap(),
914            vec![vec![1, 10], vec![2, 10], vec![2, 11]]
915        );
916    }
917
918    #[test]
919    fn zip_completes_when_any_input_completes() {
920        let graph = GraphDsl::create(|builder| builder.add(Zip::<i32, i32>::new())).unwrap();
921
922        assert_eq!(
923            graph.run_zip(vec![1, 2, 3], vec![10]).unwrap(),
924            vec![(1, 10)]
925        );
926        assert_eq!(graph.run_zip(vec![1], vec![10, 20]).unwrap(), vec![(1, 10)]);
927        assert_eq!(
928            graph.run_zip(vec![], vec![10, 20]).unwrap(),
929            Vec::<(i32, i32)>::new()
930        );
931    }
932
933    #[test]
934    fn concat_drains_inputs_in_declared_order() {
935        let graph = GraphDsl::create(|builder| builder.add(Concat::<i32>::new(3))).unwrap();
936
937        assert_eq!(
938            graph
939                .run_concat(vec![vec![1, 2], vec![], vec![3, 4]])
940                .unwrap(),
941            vec![1, 2, 3, 4]
942        );
943    }
944
945    #[test]
946    fn or_else_switches_only_if_primary_is_empty() {
947        let graph = GraphDsl::create(|builder| builder.add(OrElse::<i32>::new())).unwrap();
948
949        assert_eq!(
950            graph.run_or_else(vec![], vec![10, 20]).unwrap(),
951            vec![10, 20]
952        );
953        assert_eq!(
954            graph.run_or_else(vec![1, 2], vec![10, 20]).unwrap(),
955            vec![1, 2]
956        );
957    }
958
959    #[test]
960    fn or_else_secondary_first_dropped_when_primary_emits() {
961        let graph = GraphDsl::create(|builder| builder.add(OrElse::<i32>::new())).unwrap();
962
963        assert_eq!(
964            graph
965                .run_or_else_secondary_first(vec![1, 2], vec![10, 20])
966                .unwrap(),
967            vec![1, 2]
968        );
969    }
970
971    #[test]
972    fn or_else_secondary_first_flushed_when_primary_empty() {
973        let graph = GraphDsl::create(|builder| builder.add(OrElse::<i32>::new())).unwrap();
974
975        assert_eq!(
976            graph
977                .run_or_else_secondary_first(vec![], vec![10, 20])
978                .unwrap(),
979            vec![10, 20]
980        );
981    }
982
983    #[test]
984    fn or_else_secondary_closed_then_primary_empty_drains_buffer() {
985        let graph = GraphDsl::create(|builder| builder.add(OrElse::<i32>::new())).unwrap();
986
987        assert_eq!(
988            graph
989                .run_or_else_secondary_closed_first(vec![10, 20])
990                .unwrap(),
991            vec![10, 20]
992        );
993    }
994
995    #[test]
996    fn interleave_cycles_segment_sized_chunks() {
997        let graph = GraphDsl::create(|builder| builder.add(Interleave::<i32>::new(3, 2))).unwrap();
998
999        assert_eq!(
1000            graph
1001                .run_interleave(vec![vec![1, 2, 3], vec![10, 11, 12], vec![20]], 2, false)
1002                .unwrap(),
1003            vec![1, 2, 10, 11, 20, 3, 12]
1004        );
1005    }
1006
1007    #[test]
1008    fn interleave_eager_close_stops_when_any_input_completes() {
1009        let graph = GraphDsl::create(|builder| {
1010            builder.add(Interleave::<i32>::new_with_eager_close(2, 1, true))
1011        })
1012        .unwrap();
1013
1014        assert_eq!(
1015            graph
1016                .run_interleave(vec![vec![1, 2], vec![]], 1, true)
1017                .unwrap(),
1018            Vec::<i32>::new()
1019        );
1020        assert_eq!(
1021            graph
1022                .run_interleave(vec![vec![1], vec![10, 11]], 1, true)
1023                .unwrap(),
1024            vec![1, 10]
1025        );
1026    }
1027
1028    #[test]
1029    fn partition_routes_only_live_outlets_after_peer_cancels() {
1030        let graph = GraphDsl::try_create(|builder| {
1031            let partition = builder.add(Partition::<i32>::new(2, |item| (*item % 2) as usize));
1032            let merge = builder.add(Merge::<i32>::new(2));
1033
1034            builder.connect(partition.outlet(0)?, merge.inlet(0)?)?;
1035            builder.connect(partition.outlet(1)?, merge.inlet(1)?)?;
1036
1037            Ok(FlowShape::new(partition.inlet(), merge.outlet()))
1038        })
1039        .unwrap();
1040
1041        assert_eq!(
1042            graph.run_with_input([0, 1, 2, 3]).unwrap(),
1043            vec![0, 1, 2, 3]
1044        );
1045    }
1046
1047    #[test]
1048    fn unzip_with_keeps_live_outlet_running_after_peer_finishes() {
1049        let graph = GraphDsl::try_create(|builder| {
1050            let unzip = builder.add(UnzipWith::<i32, i32, i32>::new(|item| (item, item * 10)));
1051            let zip = builder.add(Zip::<i32, i32>::new());
1052
1053            builder.connect(unzip.out0(), zip.in0())?;
1054            builder.connect(unzip.out1(), zip.in1())?;
1055
1056            Ok(FlowShape::new(unzip.inlet(), zip.outlet()))
1057        })
1058        .unwrap();
1059
1060        assert_eq!(
1061            graph.run_with_input([1, 2, 3]).unwrap(),
1062            vec![(1, 10), (2, 20), (3, 30)]
1063        );
1064    }
1065
1066    // --- Regression tests pinning WP-17a review-comment fixes ---
1067
1068    // Fix 1a: UnzipFanInFastPath preserves target inlet order for MergeSorted.
1069    // out0 is wired to inlet(1) and out1 to inlet(0) — values must still be
1070    // sorted correctly regardless of swapped wiring.
1071    #[test]
1072    fn unzip_merge_sorted_swapped_inlets_still_sorted() {
1073        let graph_normal = GraphDsl::try_create(|builder| {
1074            let unzip = builder.add(Unzip::<i32, i32>::new());
1075            let merge = builder.add(MergeSorted::<i32>::new());
1076            builder.connect(unzip.out0(), merge.inlet(0)?)?;
1077            builder.connect(unzip.out1(), merge.inlet(1)?)?;
1078            Ok(FlowShape::new(unzip.inlet(), merge.outlet()))
1079        })
1080        .unwrap();
1081
1082        // Swapped: out0 → inlet(1), out1 → inlet(0).
1083        let graph_swapped = GraphDsl::try_create(|builder| {
1084            let unzip = builder.add(Unzip::<i32, i32>::new());
1085            let merge = builder.add(MergeSorted::<i32>::new());
1086            builder.connect(unzip.out0(), merge.inlet(1)?)?;
1087            builder.connect(unzip.out1(), merge.inlet(0)?)?;
1088            Ok(FlowShape::new(unzip.inlet(), merge.outlet()))
1089        })
1090        .unwrap();
1091
1092        let input = vec![(1, 2), (4, 3), (6, 5)];
1093        let expected = vec![1, 2, 3, 4, 5, 6];
1094        assert_eq!(
1095            graph_normal.run_with_input(input.clone()).unwrap(),
1096            expected
1097        );
1098        assert_eq!(graph_swapped.run_with_input(input).unwrap(), expected);
1099    }
1100
1101    // Fix 1b: UnzipFanInFastPath preserves target inlet order for MergeLatest.
1102    // With swapped wiring the snapshot elements must reflect which inlet each
1103    // value was routed to, producing the same result as the non-swapped graph.
1104    #[test]
1105    fn unzip_merge_latest_swapped_inlets_correct_snapshot_order() {
1106        // Normal wiring: out0 → inlet(0), out1 → inlet(1).
1107        // Input (1, 10): latest = [Some(1), Some(10)] → snapshot [1, 10].
1108        let graph_normal = GraphDsl::try_create(|builder| {
1109            let unzip = builder.add(Unzip::<i32, i32>::new());
1110            let merge = builder.add(MergeLatest::<i32>::new(2, false));
1111            builder.connect(unzip.out0(), merge.inlet(0)?)?;
1112            builder.connect(unzip.out1(), merge.inlet(1)?)?;
1113            Ok(FlowShape::new(unzip.inlet(), merge.outlet()))
1114        })
1115        .unwrap();
1116
1117        // Swapped: out0 → inlet(1), out1 → inlet(0).
1118        // Input (1, 10): out0=1 goes to inlet(1), out1=10 goes to inlet(0).
1119        // latest = [Some(10), Some(1)] → snapshot [10, 1].
1120        let graph_swapped = GraphDsl::try_create(|builder| {
1121            let unzip = builder.add(Unzip::<i32, i32>::new());
1122            let merge = builder.add(MergeLatest::<i32>::new(2, false));
1123            builder.connect(unzip.out0(), merge.inlet(1)?)?;
1124            builder.connect(unzip.out1(), merge.inlet(0)?)?;
1125            Ok(FlowShape::new(unzip.inlet(), merge.outlet()))
1126        })
1127        .unwrap();
1128
1129        assert_eq!(
1130            graph_normal.run_with_input([(1, 10)]).unwrap(),
1131            vec![vec![1, 10]]
1132        );
1133        // Swapped wiring routes values to opposite slots; snapshot order differs.
1134        assert_eq!(
1135            graph_swapped.run_with_input([(1, 10)]).unwrap(),
1136            vec![vec![10, 1]]
1137        );
1138    }
1139
1140    // Fix 2: MergeSequence fails on a sequence gap when all inputs complete.
1141    // The GraphStage logic errors if `pending` holds items but next_sequence
1142    // is not among them — the fused executor must mirror that behavior.
1143    #[test]
1144    fn merge_sequence_fails_on_gap_at_completion() {
1145        // Sequences 1 and 2 arrive (via the unzip split of pair (1, 2)), but
1146        // sequence 0 never does.  When the upstream completes both inlets,
1147        // next_sequence is still 0, there is no sequence 0 in pending → error.
1148        let graph = GraphDsl::try_create(|builder| {
1149            let unzip = builder.add(Unzip::<u64, u64>::new());
1150            let merge = builder.add(MergeSequence::<u64>::new(2, |item| *item));
1151            builder.connect(unzip.out0(), merge.inlet(0)?)?;
1152            builder.connect(unzip.out1(), merge.inlet(1)?)?;
1153            Ok(FlowShape::new(unzip.inlet(), merge.outlet()))
1154        })
1155        .unwrap();
1156
1157        let result = graph.run_with_input([(1u64, 2u64)]);
1158        assert!(
1159            matches!(result, Err(StreamError::Failed(ref msg)) if msg.contains("expected sequence")),
1160            "expected a sequence-gap error, got: {result:?}"
1161        );
1162    }
1163
1164    // Fix 3: MergeLatest honors eager_complete — stream ends as soon as any
1165    // inlet completes when eager_complete = true.
1166    #[test]
1167    fn merge_latest_eager_complete_closes_on_first_inlet_done() {
1168        // With eager_complete = true, the stage should complete when the first
1169        // input stream finishes (here the single-item upstream drains first).
1170        // The non-eager variant of the same graph would keep running until both
1171        // inlets complete.
1172        let graph_eager = GraphDsl::try_create(|builder| {
1173            let unzip = builder.add(Unzip::<i32, i32>::new());
1174            let merge = builder.add(MergeLatest::<i32>::new(2, true));
1175            builder.connect(unzip.out0(), merge.inlet(0)?)?;
1176            builder.connect(unzip.out1(), merge.inlet(1)?)?;
1177            Ok(FlowShape::new(unzip.inlet(), merge.outlet()))
1178        })
1179        .unwrap();
1180
1181        let graph_non_eager = GraphDsl::try_create(|builder| {
1182            let unzip = builder.add(Unzip::<i32, i32>::new());
1183            let merge = builder.add(MergeLatest::<i32>::new(2, false));
1184            builder.connect(unzip.out0(), merge.inlet(0)?)?;
1185            builder.connect(unzip.out1(), merge.inlet(1)?)?;
1186            Ok(FlowShape::new(unzip.inlet(), merge.outlet()))
1187        })
1188        .unwrap();
1189
1190        // Single item: after this item both outlets close simultaneously (the
1191        // Unzip upstream completes), so both eager and non-eager complete after
1192        // emitting the first snapshot.
1193        let result_eager = graph_eager.run_with_input([(1i32, 10i32)]).unwrap();
1194        let result_non_eager = graph_non_eager.run_with_input([(1i32, 10i32)]).unwrap();
1195
1196        // Both should produce at least one snapshot.
1197        assert!(!result_eager.is_empty(), "eager graph produced no output");
1198        assert!(
1199            !result_non_eager.is_empty(),
1200            "non-eager graph produced no output"
1201        );
1202
1203        // Produce the same result for a balanced input — the eager flag only
1204        // changes behavior when ONE inlet finishes before the other.
1205        assert_eq!(result_eager, result_non_eager);
1206    }
1207}