Skip to main content

datum/graph/
mod.rs

1//! Runtime-checked graph, shape, port, and junction primitives.
2//!
3//! This module contains Datum's Akka-like graph layer: typed public ports,
4//! runtime-validated GraphDSL wiring, fused acyclic fast paths, and a queued
5//! fused interpreter for cyclic feedback topologies. Invalid wiring and illegal
6//! stage operations fail deterministically.
7
8use std::{
9    any::{Any, TypeId, type_name},
10    collections::{HashMap, HashSet, VecDeque},
11    fmt,
12    marker::PhantomData,
13    sync::{
14        Arc, Mutex, OnceLock,
15        atomic::{AtomicUsize, Ordering},
16    },
17    thread,
18};
19
20#[cfg(test)]
21use std::sync::mpsc;
22
23use crate::{
24    actor::{Actor, ActorProcessingErr, ActorRef},
25    stream::{OverflowStrategy, StreamError, StreamResult},
26};
27
28type DatumValue = Box<dyn DatumElement>;
29type StageMapFn = dyn Fn(DatumValue) -> StreamResult<DatumValue> + Send + Sync;
30type StageTypedMapFn = dyn Any + Send + Sync;
31type StageZipFn = dyn Fn(DatumValue, DatumValue) -> StreamResult<DatumValue> + Send + Sync;
32type StageUnzipFn = dyn Fn(DatumValue) -> (DatumValue, DatumValue) + Send + Sync;
33/// Opaque container for a typed split function `Arc<dyn Fn(In) -> (Out0, Out1)>`.
34/// Down-cast at plan time — same pattern as [`StageTypedMapFn`].
35type StageTypedUnzipFn = dyn Any + Send + Sync;
36type StageCompareFn = dyn Fn(&DatumValue, &DatumValue) -> std::cmp::Ordering + Send + Sync;
37type StageSequenceFn = dyn Fn(&DatumValue) -> u64 + Send + Sync;
38/// Opaque container for a typed sequence-extractor `Arc<dyn Fn(&T) -> u64>`.
39/// Down-cast at plan time.
40type StageTypedSequenceFn = dyn Any + Send + Sync;
41type StageSnapshotFn = dyn Fn(&[&DatumValue]) -> DatumValue + Send + Sync;
42/// Opaque container for a typed snapshot builder `Arc<dyn Fn(&[Option<T>]) -> Vec<T>>`.
43/// Down-cast at plan time.
44type StageTypedSnapshotFn = dyn Any + Send + Sync;
45type StagePartitionFn = dyn Fn(&DatumValue) -> usize + Send + Sync;
46/// Opaque container for a typed partitioner `Arc<dyn Fn(&T) -> usize>`.
47/// Down-cast at plan time.
48type StageTypedPartitionFn = dyn Any + Send + Sync;
49
50#[derive(Clone)]
51struct StageMapFns {
52    erased: Arc<StageMapFn>,
53    typed: Arc<StageTypedMapFn>,
54}
55
56/// Optional typed metadata attached to an `Opaque` stage so the typed **cyclic**
57/// executor can run it without `DatumValue` boxing or `GraphStageLogic`.
58///
59/// Built-in `Buffer`/`TakeWhile` stages surface this through
60/// [`GraphStage::typed_cyclic_op`]; every other stage returns `None` and the
61/// cyclic typed planner falls back to the erased queued interpreter. The erased
62/// executor never reads this — it keeps running the stage's `GraphStageLogic`.
63#[derive(Clone)]
64pub(super) enum TypedCyclicOp {
65    /// A `Buffer` stage. In the synchronous fused executor a buffer drains one
66    /// element per step and therefore never overflows — it is a pass-through.
67    BufferPassthrough,
68    /// A `TakeWhile` stage carrying its typed predicate
69    /// `Arc<dyn Fn(&T) -> bool + Send + Sync>` (down-cast at plan time, same
70    /// pattern as [`StageTypedMapFn`]).
71    TakeWhile(Arc<dyn Any + Send + Sync>),
72}
73
74impl fmt::Debug for TypedCyclicOp {
75    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
76        match self {
77            Self::BufferPassthrough => f.write_str("BufferPassthrough"),
78            Self::TakeWhile(_) => f.write_str("TakeWhile(..)"),
79        }
80    }
81}
82
83pub(crate) trait DatumElement: Any + Send {
84    fn clone_box(&self) -> DatumValue;
85    fn into_any(self: Box<Self>) -> Box<dyn Any + Send>;
86    fn as_any_ref(&self) -> &dyn Any;
87}
88
89impl<T> DatumElement for T
90where
91    T: Any + Clone + Send,
92{
93    fn clone_box(&self) -> DatumValue {
94        Box::new(self.clone())
95    }
96
97    fn into_any(self: Box<Self>) -> Box<dyn Any + Send> {
98        self
99    }
100
101    fn as_any_ref(&self) -> &dyn Any {
102        self
103    }
104}
105
106fn datum<T>(value: T) -> DatumValue
107where
108    T: Clone + Send + 'static,
109{
110    Box::new(value)
111}
112
113fn downcast_datum<T, S>(
114    value: DatumValue,
115    operation: &'static str,
116    port: impl FnOnce() -> S,
117) -> StreamResult<T>
118where
119    T: Send + 'static,
120    S: Into<String>,
121{
122    // `port` is only evaluated on the cold error path so the success path
123    // (the per-element fused hot path) allocates nothing for diagnostics. It
124    // returns `impl Into<String>` so callers can hand back a `&'static str`
125    // without an eager `to_owned()`.
126    value
127        .into_any()
128        .downcast::<T>()
129        .map(|value| *value)
130        .map_err(|_| StreamError::InvalidPortOperation {
131            operation,
132            port: port().into(),
133            reason: format!("element type did not match {}", type_name::<T>()),
134        })
135}
136
137static NEXT_PORT_ID: AtomicUsize = AtomicUsize::new(1);
138
139fn next_port_id() -> PortId {
140    PortId(NEXT_PORT_ID.fetch_add(1, Ordering::Relaxed))
141}
142
143fn next_port_id_block(count: usize) -> PortId {
144    debug_assert!(count > 0);
145    PortId(NEXT_PORT_ID.fetch_add(count, Ordering::Relaxed))
146}
147
148#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
149pub struct PortId(usize);
150
151impl PortId {
152    #[must_use]
153    pub const fn as_usize(self) -> usize {
154        self.0
155    }
156
157    const fn offset(self, offset: usize) -> Self {
158        Self(self.0 + offset)
159    }
160}
161
162#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
163pub enum PortKind {
164    Inlet,
165    Outlet,
166}
167
168macro_rules! shared_name {
169    ($fn_name:ident, $value:literal) => {
170        fn $fn_name() -> Arc<str> {
171            static NAME: OnceLock<Arc<str>> = OnceLock::new();
172            Arc::clone(NAME.get_or_init(|| Arc::from($value)))
173        }
174    };
175}
176
177shared_name!(identity_stage_name, "Identity");
178shared_name!(identity_inlet_name, "Identity.in");
179shared_name!(identity_outlet_name, "Identity.out");
180shared_name!(map_stage_name, "Map");
181shared_name!(map_inlet_name, "Map.in");
182shared_name!(map_outlet_name, "Map.out");
183shared_name!(broadcast_stage_name, "Broadcast");
184shared_name!(broadcast_inlet_name, "Broadcast.in");
185shared_name!(balance_stage_name, "Balance");
186shared_name!(balance_inlet_name, "Balance.in");
187shared_name!(merge_stage_name, "Merge");
188shared_name!(merge_outlet_name, "Merge.out");
189shared_name!(merge_preferred_stage_name, "MergePreferred");
190shared_name!(merge_preferred_preferred_name, "MergePreferred.preferred");
191shared_name!(merge_preferred_outlet_name, "MergePreferred.out");
192shared_name!(merge_prioritized_stage_name, "MergePrioritized");
193shared_name!(merge_prioritized_outlet_name, "MergePrioritized.out");
194shared_name!(concat_stage_name, "Concat");
195shared_name!(concat_outlet_name, "Concat.out");
196shared_name!(or_else_stage_name, "OrElse");
197shared_name!(or_else_primary_name, "OrElse.primary");
198shared_name!(or_else_secondary_name, "OrElse.secondary");
199shared_name!(or_else_outlet_name, "OrElse.out");
200shared_name!(interleave_stage_name, "Interleave");
201shared_name!(interleave_outlet_name, "Interleave.out");
202shared_name!(zip_stage_name, "Zip");
203shared_name!(zip_in0_name, "Zip.in0");
204shared_name!(zip_in1_name, "Zip.in1");
205shared_name!(zip_outlet_name, "Zip.out");
206shared_name!(async_boundary_stage_name, "AsyncBoundary");
207shared_name!(async_boundary_inlet_name, "AsyncBoundary.in");
208shared_name!(async_boundary_outlet_name, "AsyncBoundary.out");
209
210mod builder;
211mod executor;
212mod junctions;
213mod ports;
214mod shapes;
215mod stage;
216
217#[cfg(test)]
218use self::executor::BoundaryCountExecutor;
219use self::{builder::StageRecord, shapes::PortAllocator, stage::StageKind};
220
221pub use self::{
222    builder::{
223        AsyncBoundaryExecutionConfig, FusedExecutionConfig, FusedExecutionReport, FusedSegment,
224        FusedTerminalReport, Graph, GraphBlueprint, GraphBuilder, GraphDsl, ImportedGraph,
225        PartialGraph,
226    },
227    junctions::{
228        AsyncBoundary, Balance, Broadcast, Buffer, Concat, Identity, Interleave, MapStage, Merge,
229        MergeLatest, MergePreferred, MergePrioritized, MergeSequence, MergeSorted, OrElse,
230        Partition, TakeWhile, Unzip, UnzipWith, Zip,
231    },
232    ports::{AnyInlet, AnyOutlet, Inlet, Outlet, PortRef},
233    shapes::{
234        BidiShape, FanInShape, FanOutShape, FanOutShape2, FlowShape, MergePreferredShape, Shape,
235        SinkShape, SourceShape, ZipShape,
236    },
237    stage::{AsyncCallback, GraphStage, GraphStageLogic, InHandler, OutHandler, StageSpec},
238};
239
240#[cfg(test)]
241mod tests {
242    use super::*;
243    use crate::Attributes;
244
245    #[test]
246    fn graph_dsl_builds_broadcast_zip_flow() {
247        let graph = GraphDsl::try_create(|builder| {
248            let broadcast = builder.add(Broadcast::<i32>::new(2));
249            let zip = builder.add(Zip::<i32, i32>::new());
250
251            builder.connect(broadcast.outlet(0)?, zip.in0())?;
252            builder.connect(broadcast.outlet(1)?, zip.in1())?;
253
254            Ok(FlowShape::new(broadcast.inlet(), zip.outlet()))
255        })
256        .unwrap();
257
258        assert_eq!(graph.stage_count(), 2);
259        assert_eq!(graph.edge_count(), 2);
260        assert_eq!(graph.shape().inlets().len(), 1);
261        assert_eq!(graph.shape().outlets().len(), 1);
262        assert_eq!(
263            graph.run_with_input([1, 2, 3]).unwrap(),
264            vec![(1, 1), (2, 2), (3, 3)]
265        );
266    }
267
268    #[test]
269    fn graph_dsl_zip_slots_follow_inlet_ids() {
270        let graph = GraphDsl::try_create(|builder| {
271            let broadcast = builder.add(Broadcast::<i32>::new(2));
272            let identity = builder.add(Identity::<i32>::new());
273            let zip = builder.add(Zip::<i32, i32>::new());
274
275            builder.connect(broadcast.outlet(0)?, identity.inlet())?;
276            builder.connect(identity.outlet(), zip.in1())?;
277            builder.connect(broadcast.outlet(1)?, zip.in0())?;
278
279            Ok(FlowShape::new(broadcast.inlet(), zip.outlet()))
280        })
281        .unwrap();
282
283        assert_eq!(
284            graph.run_with_input([1, 2, 3]).unwrap(),
285            vec![(1, 1), (2, 2), (3, 3)]
286        );
287    }
288
289    #[test]
290    fn graph_dsl_zip_buffers_skewed_inlet_arrivals() {
291        // Regression test for per-inlet Zip buffering. This asymmetric topology
292        // delivers TWO elements to zip.in0 for every ONE that reaches zip.in1
293        // within a single fused cycle:
294        //
295        //   in -> Broadcast A(2)
296        //          A.out0 -> Broadcast B(2) -> Merge(2).out -> zip.in0   (two arrivals)
297        //          A.out1 ----------------------------------> zip.in1   (one arrival)
298        //
299        // The fused executor drains A.out0 depth-first, so both Merge emissions
300        // hit zip.in0 before A.out1 ever feeds zip.in1. With single-slot Zip
301        // state this errors ("second element before its pair"); bounded
302        // per-inlet buffering must instead queue the surplus and pair in FIFO
303        // arrival order without error. Neither Broadcast matches the fused
304        // Broadcast(2)->Zip fast path (A feeds two different stages; B feeds a
305        // Merge), so this exercises the general buffered Zip path.
306        let graph = GraphDsl::try_create(|builder| {
307            let fan = builder.add(Broadcast::<i32>::new(2));
308            let doubler = builder.add(Broadcast::<i32>::new(2));
309            let merge = builder.add(Merge::<i32>::new(2));
310            let zip = builder.add(Zip::<i32, i32>::new());
311
312            builder.connect(fan.outlet(0)?, doubler.inlet())?;
313            builder.connect(doubler.outlet(0)?, merge.inlet(0)?)?;
314            builder.connect(doubler.outlet(1)?, merge.inlet(1)?)?;
315            builder.connect(merge.outlet(), zip.in0())?;
316            builder.connect(fan.outlet(1)?, zip.in1())?;
317
318            Ok(FlowShape::new(fan.inlet(), zip.outlet()))
319        })
320        .unwrap();
321
322        // in0 accumulates two per item while in1 gets one, so FIFO pairing runs
323        // in0 one item ahead: 10 pairs with 10, then the leftover 10 pairs with
324        // 20. The point is that this succeeds (no error) and pairs in order.
325        assert_eq!(
326            graph.run_with_input([10, 20]).unwrap(),
327            vec![(10, 10), (10, 20)]
328        );
329    }
330
331    #[test]
332    fn graph_dsl_runs_merge_preferred_buffered_feedback_cycle() {
333        let graph = GraphDsl::try_create(|builder| {
334            let merge = builder.add(MergePreferred::<i32>::new(1));
335            let broadcast = builder.add(Broadcast::<i32>::new(2));
336            let buffer = builder.add(Buffer::<i32>::new(8, OverflowStrategy::Backpressure));
337            let positive = builder.add(TakeWhile::<i32>::new(|item| *item > 0));
338            let decrement = builder.add(MapStage::new(|item: i32| item - 1));
339
340            builder.connect(merge.outlet(), broadcast.inlet())?;
341            builder.connect(broadcast.outlet(1)?, buffer.inlet())?;
342            builder.connect(buffer.outlet(), positive.inlet())?;
343            builder.connect(positive.outlet(), decrement.inlet())?;
344            builder.connect(decrement.outlet(), merge.preferred())?;
345
346            Ok(FlowShape::new(merge.secondary(0)?, broadcast.outlet(0)?))
347        })
348        .unwrap();
349
350        assert_eq!(graph.run_with_input([3]).unwrap(), vec![3, 2, 1, 0]);
351    }
352
353    #[test]
354    fn graph_dsl_runs_buffered_merge_feedback_cycle() {
355        let graph = GraphDsl::try_create(|builder| {
356            let merge = builder.add(Merge::<i32>::new(2));
357            let broadcast = builder.add(Broadcast::<i32>::new(2));
358            let buffer = builder.add(Buffer::<i32>::new(8, OverflowStrategy::Backpressure));
359            let positive = builder.add(TakeWhile::<i32>::new(|item| *item > 0));
360            let decrement = builder.add(MapStage::new(|item: i32| item - 1));
361
362            builder.connect(merge.outlet(), broadcast.inlet())?;
363            builder.connect(broadcast.outlet(1)?, buffer.inlet())?;
364            builder.connect(buffer.outlet(), positive.inlet())?;
365            builder.connect(positive.outlet(), decrement.inlet())?;
366            builder.connect(decrement.outlet(), merge.inlet(1)?)?;
367
368            Ok(FlowShape::new(merge.inlet(0)?, broadcast.outlet(0)?))
369        })
370        .unwrap();
371
372        assert_eq!(graph.run_with_input([3]).unwrap(), vec![3, 2, 1, 0]);
373    }
374
375    #[test]
376    fn graph_dsl_unbuffered_merge_cycle_surfaces_event_limit() {
377        let graph = GraphDsl::try_create(|builder| {
378            let merge = builder.add(Merge::<i32>::new(2));
379            let broadcast = builder.add(Broadcast::<i32>::new(2));
380
381            builder.connect(merge.outlet(), broadcast.inlet())?;
382            builder.connect(broadcast.outlet(1)?, merge.inlet(1)?)?;
383
384            Ok(FlowShape::new(merge.inlet(0)?, broadcast.outlet(0)?))
385        })
386        .unwrap();
387
388        let result = graph.run_with_input_report([1], FusedExecutionConfig { event_limit: 256 });
389
390        assert_eq!(result, Err(StreamError::EventLimitExceeded { limit: 256 }));
391    }
392
393    #[test]
394    fn partial_graph_can_be_imported_with_its_shape() {
395        let partial = GraphDsl::partial(|builder| {
396            let first = builder.add(Identity::<i32>::new());
397            let second = builder.add_named(Identity::<i32>::new(), "partial.tail");
398            builder.connect(first.outlet(), second.inlet())?;
399            Ok(FlowShape::new(first.inlet(), second.outlet()))
400        })
401        .named("partial.identity");
402
403        let graph = GraphDsl::try_create(|builder| {
404            let imported = builder.import(&partial)?;
405            let after = builder.add(Identity::<i32>::new());
406            builder.connect(imported.outlet(), after.inlet())?;
407            Ok(FlowShape::new(imported.inlet(), after.outlet()))
408        })
409        .unwrap()
410        .named("outer.graph");
411
412        assert_eq!(graph.run_with_input([1, 2, 3]).unwrap(), vec![1, 2, 3]);
413        assert_eq!(graph.attributes().name(), Some("outer.graph"));
414    }
415
416    #[test]
417    fn graph_attributes_follow_innermost_wins_order() {
418        let graph = GraphDsl::create(|builder| {
419            builder.add_with_attributes(
420                Identity::<i32>::new(),
421                Attributes::named("stage-outer").and(Attributes::named("stage-inner")),
422            )
423        })
424        .unwrap()
425        .add_attributes(Attributes::dispatcher("graph-outer"))
426        .add_attributes(Attributes::dispatcher("graph-inner"));
427
428        assert_eq!(graph.attributes().dispatcher_hint(), Some("graph-inner"));
429        assert_eq!(
430            graph.stages[0].spec.attributes().name(),
431            Some("stage-inner")
432        );
433    }
434
435    #[test]
436    fn graph_dsl_rejects_invalid_and_incomplete_wiring() {
437        let graph = GraphDsl::try_create(|builder| {
438            let first = builder.add(Identity::<i32>::new());
439            let second = builder.add(Identity::<i32>::new());
440            let third = builder.add(Identity::<i32>::new());
441
442            builder.connect(first.outlet(), second.inlet())?;
443            let duplicate = builder.connect(first.outlet(), third.inlet());
444            assert!(matches!(duplicate, Err(StreamError::GraphValidation(_))));
445
446            Ok(FlowShape::new(first.inlet(), second.outlet()))
447        });
448
449        assert!(matches!(graph, Err(StreamError::GraphValidation(_))));
450
451        let graph = GraphDsl::create(|builder| {
452            let broadcast = builder.add(Broadcast::<i32>::new(2));
453            SourceShape::new(broadcast.outlet(0).unwrap())
454        });
455        assert!(matches!(graph, Err(StreamError::GraphValidation(_))));
456    }
457
458    #[test]
459    fn graph_dsl_rejects_erased_type_mismatch() {
460        let graph = GraphDsl::try_create(|builder| {
461            let left = builder.add(Identity::<i32>::new());
462            let right = builder.add(Identity::<u64>::new());
463            let mismatch = builder.connect_any(left.outlet().erase(), right.inlet().erase());
464            assert!(matches!(mismatch, Err(StreamError::GraphValidation(_))));
465            Ok(FlowShape::new(left.inlet(), right.outlet()))
466        });
467
468        assert!(matches!(graph, Err(StreamError::GraphValidation(_))));
469    }
470
471    #[test]
472    fn graph_dsl_rejects_result_ports_with_spoofed_metadata() {
473        let graph = GraphDsl::create(|builder| {
474            let shape = builder.add(Identity::<i32>::new());
475            FlowShape::new(
476                Inlet::<u64>::with_id(shape.inlet().id(), "Identity.in"),
477                shape.outlet(),
478            )
479        });
480
481        assert!(matches!(graph, Err(StreamError::GraphValidation(_))));
482    }
483
484    #[test]
485    fn allocated_ports_do_not_collide_with_manual_ports() {
486        let manual = Inlet::<i32>::new("manual");
487        let graph = GraphDsl::try_create(|builder| {
488            let shape = builder.add(Identity::<i32>::new());
489            assert_ne!(manual.id(), shape.inlet().id());
490            Ok(shape)
491        });
492
493        assert!(graph.is_ok());
494    }
495
496    #[test]
497    fn identity_ports_keep_static_names_and_unique_ids() {
498        let graph = GraphDsl::try_create(|builder| {
499            let first = builder.add(Identity::<i32>::new());
500            let second = builder.add(Identity::<i32>::new());
501
502            assert_eq!(first.inlet().name(), "Identity.in");
503            assert_eq!(first.outlet().name(), "Identity.out");
504            assert_eq!(second.inlet().name(), "Identity.in");
505            assert_eq!(second.outlet().name(), "Identity.out");
506            assert_ne!(first.inlet().id(), first.outlet().id());
507            assert_ne!(first.outlet().id(), second.inlet().id());
508            assert_ne!(second.inlet().id(), second.outlet().id());
509
510            builder.connect(first.outlet(), second.inlet())?;
511            Ok(FlowShape::new(first.inlet(), second.outlet()))
512        });
513
514        assert!(graph.is_ok());
515    }
516
517    #[test]
518    fn graph_stage_logic_checks_port_operations() {
519        let shape = FlowShape::new(Inlet::<i32>::new("in"), Outlet::<i32>::new("out"));
520        let inlet = shape.inlet();
521        let outlet = shape.outlet();
522        let mut logic = GraphStageLogic::new(&shape);
523
524        assert!(matches!(
525            logic.grab(&inlet),
526            Err(StreamError::InvalidPortOperation { .. })
527        ));
528        logic.pull(&inlet).unwrap();
529        assert!(logic.has_been_pulled(&inlet));
530        assert!(matches!(
531            logic.pull(&inlet),
532            Err(StreamError::InvalidPortOperation { .. })
533        ));
534        logic.offer(&inlet, 41).unwrap();
535        assert!(!logic.has_been_pulled(&inlet));
536        assert_eq!(logic.grab(&inlet).unwrap(), 41);
537
538        assert!(matches!(
539            logic.push(&outlet, 1),
540            Err(StreamError::InvalidPortOperation { .. })
541        ));
542        logic.request(&outlet).unwrap();
543        assert!(logic.is_available(&outlet));
544        logic.push(&outlet, 42).unwrap();
545        assert!(!logic.is_available(&outlet));
546        logic.complete(&outlet).unwrap();
547        assert!(logic.is_closed(&outlet));
548        assert!(matches!(
549            logic.request(&outlet),
550            Err(StreamError::InvalidPortOperation { .. })
551        ));
552    }
553
554    #[test]
555    fn fused_execution_enforces_event_limit() {
556        let graph = GraphDsl::create(|builder| builder.add(Identity::<i32>::new())).unwrap();
557
558        let result = graph.run_with_input_report([1, 2], FusedExecutionConfig { event_limit: 1 });
559
560        assert_eq!(result, Err(StreamError::EventLimitExceeded { limit: 1 }));
561    }
562
563    #[test]
564    fn async_boundary_splits_fused_segments() {
565        let graph = GraphDsl::try_create(|builder| {
566            let first = builder.add(Identity::<i32>::new());
567            let boundary = builder.add(AsyncBoundary::<i32>::new());
568            let second = builder.add(MapStage::new(|item: i32| item + 1));
569
570            builder.connect(first.outlet(), boundary.inlet())?;
571            builder.connect(boundary.outlet(), second.inlet())?;
572
573            Ok(FlowShape::new(first.inlet(), second.outlet()))
574        })
575        .unwrap();
576
577        assert_eq!(graph.segments().len(), 3);
578        let report = graph
579            .run_with_input_report([1, 2], FusedExecutionConfig::default())
580            .unwrap();
581        assert_eq!(report.output, vec![2, 3]);
582        assert_eq!(report.async_boundary_crossings, 2);
583    }
584
585    #[test]
586    fn async_boundary_count_path_uses_ractor_handoff_segments() {
587        let graph = GraphDsl::try_create(|builder| {
588            let first = builder.add(MapStage::new(|item: i32| item + 1));
589            let boundary = builder.add(AsyncBoundary::<i32>::new());
590            let second = builder.add(MapStage::new(|item: i32| item * 2));
591
592            builder.connect(first.outlet(), boundary.inlet())?;
593            builder.connect(boundary.outlet(), second.inlet())?;
594
595            Ok(FlowShape::new(first.inlet(), second.outlet()))
596        })
597        .unwrap();
598
599        let report = graph
600            .run_async_boundary_count_with_input_report(
601                [1, 2, 3],
602                AsyncBoundaryExecutionConfig {
603                    fused: FusedExecutionConfig::default(),
604                    buffer_size: 2,
605                },
606            )
607            .unwrap();
608
609        assert_eq!(report.result, 3);
610        assert_eq!(report.async_boundary_crossings, 3);
611        assert_eq!(report.events, 18);
612    }
613
614    #[test]
615    fn threaded_async_boundary_baseline_matches_ractor_count_path() {
616        let graph = GraphDsl::try_create(|builder| {
617            let first = builder.add(MapStage::new(|item: i32| item + 1));
618            let boundary = builder.add(AsyncBoundary::<i32>::new());
619            let second = builder.add(MapStage::new(|item: i32| item * 2));
620
621            builder.connect(first.outlet(), boundary.inlet())?;
622            builder.connect(boundary.outlet(), second.inlet())?;
623
624            Ok(FlowShape::new(first.inlet(), second.outlet()))
625        })
626        .unwrap();
627
628        let config = AsyncBoundaryExecutionConfig {
629            fused: FusedExecutionConfig::default(),
630            buffer_size: 2,
631        };
632        let ractor_report = graph
633            .run_async_boundary_count_with_input_report([1, 2, 3], config)
634            .unwrap();
635        let threaded_report = BoundaryCountExecutor::Threaded
636            .run_count(
637                [1, 2, 3],
638                graph.typed_linear_async_segments().unwrap(),
639                config,
640            )
641            .unwrap();
642
643        assert_eq!(threaded_report, ractor_report);
644    }
645
646    #[test]
647    fn ractor_async_boundary_rejects_zero_buffer_size() {
648        let graph = GraphDsl::try_create(|builder| {
649            let first = builder.add(MapStage::new(|item: i32| item + 1));
650            let boundary = builder.add(AsyncBoundary::<i32>::new());
651            let second = builder.add(MapStage::new(|item: i32| item * 2));
652
653            builder.connect(first.outlet(), boundary.inlet())?;
654            builder.connect(boundary.outlet(), second.inlet())?;
655
656            Ok(FlowShape::new(first.inlet(), second.outlet()))
657        })
658        .unwrap();
659
660        let result = BoundaryCountExecutor::Ractor.run_count(
661            [1, 2, 3],
662            graph.typed_linear_async_segments().unwrap(),
663            AsyncBoundaryExecutionConfig {
664                fused: FusedExecutionConfig::default(),
665                buffer_size: 0,
666            },
667        );
668
669        assert!(matches!(
670            result,
671            Err(StreamError::GraphValidation(message)) if message.contains("buffer_size")
672        ));
673    }
674
675    #[test]
676    fn ractor_async_boundary_streams_input_without_eager_collection() {
677        let graph = GraphDsl::try_create(|builder| {
678            let first = builder.add(MapStage::new(|item: i32| item + 1));
679            let boundary = builder.add(AsyncBoundary::<i32>::new());
680            let second = builder.add(MapStage::new(|item: i32| item * 2));
681
682            builder.connect(first.outlet(), boundary.inlet())?;
683            builder.connect(boundary.outlet(), second.inlet())?;
684
685            Ok(FlowShape::new(first.inlet(), second.outlet()))
686        })
687        .unwrap();
688
689        let result = BoundaryCountExecutor::Ractor.run_count(
690            std::iter::repeat(1_i32),
691            graph.typed_linear_async_segments().unwrap(),
692            AsyncBoundaryExecutionConfig {
693                fused: FusedExecutionConfig { event_limit: 4 },
694                buffer_size: 1,
695            },
696        );
697
698        assert_eq!(result, Err(StreamError::EventLimitExceeded { limit: 4 }));
699    }
700
701    #[test]
702    fn ractor_async_boundary_runs_inside_existing_tokio_runtime() {
703        let graph = GraphDsl::try_create(|builder| {
704            let first = builder.add(MapStage::new(|item: i32| item + 1));
705            let boundary = builder.add(AsyncBoundary::<i32>::new());
706            let second = builder.add(MapStage::new(|item: i32| item * 2));
707
708            builder.connect(first.outlet(), boundary.inlet())?;
709            builder.connect(boundary.outlet(), second.inlet())?;
710
711            Ok(FlowShape::new(first.inlet(), second.outlet()))
712        })
713        .unwrap();
714
715        let runtime = tokio::runtime::Builder::new_current_thread()
716            .build()
717            .unwrap();
718        let report = runtime
719            .block_on(async {
720                BoundaryCountExecutor::Ractor.run_count(
721                    [1, 2, 3],
722                    graph.typed_linear_async_segments().unwrap(),
723                    AsyncBoundaryExecutionConfig {
724                        fused: FusedExecutionConfig::default(),
725                        buffer_size: 2,
726                    },
727                )
728            })
729            .unwrap();
730
731        assert_eq!(report.result, 3);
732        assert_eq!(report.async_boundary_crossings, 3);
733        assert_eq!(report.events, 18);
734    }
735
736    #[test]
737    fn balance_merge_round_robins_through_junctions() {
738        let graph = GraphDsl::try_create(|builder| {
739            let balance = builder.add(Balance::<i32>::new(2));
740            let merge = builder.add(Merge::<i32>::new(2));
741
742            builder.connect(balance.outlet(0)?, merge.inlet(0)?)?;
743            builder.connect(balance.outlet(1)?, merge.inlet(1)?)?;
744
745            Ok(FlowShape::new(balance.inlet(), merge.outlet()))
746        })
747        .unwrap();
748
749        assert_eq!(graph.run_with_input(0..6).unwrap(), vec![0, 1, 2, 3, 4, 5]);
750    }
751
752    #[test]
753    fn prioritized_merge_uses_weighted_schedule() {
754        let graph =
755            GraphDsl::create(|builder| builder.add(MergePrioritized::<i32>::new(vec![2, 1])))
756                .unwrap();
757
758        assert_eq!(
759            graph
760                .run_fan_in(vec![vec![1, 2, 3, 4], vec![100, 101]])
761                .unwrap(),
762            vec![1, 2, 100, 3, 4, 101]
763        );
764    }
765
766    #[test]
767    fn fused_execution_supports_count_and_fold_sinks() {
768        let graph = GraphDsl::try_create(|builder| {
769            let first = builder.add(MapStage::new(|item: u64| item + 1));
770            let second = builder.add(MapStage::new(|item: u64| item * 2));
771
772            builder.connect(first.outlet(), second.inlet())?;
773
774            Ok(FlowShape::new(first.inlet(), second.outlet()))
775        })
776        .unwrap();
777
778        assert_eq!(graph.run_count_with_input(0..4).unwrap(), 4);
779        assert_eq!(
780            graph
781                .run_fold_with_input(0..4, 0, |acc, item| acc + item)
782                .unwrap(),
783            20
784        );
785    }
786
787    #[test]
788    fn typed_linear_fast_path_runs_same_type_chains() {
789        let graph = GraphDsl::try_create(|builder| {
790            let first = builder.add(MapStage::new(|item: u64| item + 1));
791            let second = builder.add(MapStage::new(|item: u64| item * 2));
792
793            builder.connect(first.outlet(), second.inlet())?;
794
795            Ok(FlowShape::new(first.inlet(), second.outlet()))
796        })
797        .unwrap();
798
799        let report = graph
800            .run_typed_linear_with_input_report([1, 2, 3], FusedExecutionConfig::default())
801            .unwrap();
802        assert_eq!(report.output, vec![4, 6, 8]);
803        assert_eq!(report.events, 12);
804
805        assert_eq!(graph.run_typed_linear_count_with_input(0..4).unwrap(), 4);
806        assert_eq!(
807            graph
808                .run_typed_linear_fold_with_input(0..4, 0, |acc, item| acc + item)
809                .unwrap(),
810            20
811        );
812    }
813
814    #[test]
815    fn typed_linear_fast_path_rejects_junction_graphs() {
816        let graph = GraphDsl::try_create(|builder| {
817            let balance = builder.add(Balance::<i32>::new(2));
818            let merge = builder.add(Merge::<i32>::new(2));
819
820            builder.connect(balance.outlet(0)?, merge.inlet(0)?)?;
821            builder.connect(balance.outlet(1)?, merge.inlet(1)?)?;
822
823            Ok(FlowShape::new(balance.inlet(), merge.outlet()))
824        })
825        .unwrap();
826
827        assert!(matches!(
828            graph.run_typed_linear_count_with_input([1, 2, 3]),
829            Err(StreamError::GraphValidation(_))
830        ));
831    }
832
833    #[test]
834    fn merge_preferred_drains_preferred_before_secondaries() {
835        let graph = GraphDsl::create(|builder| builder.add(MergePreferred::<i32>::new(2))).unwrap();
836
837        assert_eq!(
838            graph
839                .run_merge_preferred(vec![1, 2, 3], vec![vec![100, 101], vec![200]])
840                .unwrap(),
841            vec![1, 2, 3, 100, 200, 101]
842        );
843    }
844
845    #[test]
846    fn merge_waits_for_all_inputs_to_complete() {
847        let graph = GraphDsl::create(|builder| builder.add(Merge::<i32>::new(2))).unwrap();
848
849        assert_eq!(
850            graph.run_fan_in(vec![vec![], vec![10, 20]]).unwrap(),
851            vec![10, 20]
852        );
853        assert_eq!(graph.run_fan_in(vec![vec![1], vec![]]).unwrap(), vec![1]);
854    }
855
856    #[test]
857    fn merge_sorted_drains_remaining_input_after_peer_completes() {
858        let graph = GraphDsl::try_create(|builder| {
859            let unzip = builder.add(Unzip::<i32, i32>::new());
860            let merge = builder.add(MergeSorted::<i32>::new());
861
862            builder.connect(unzip.out0(), merge.inlet(0)?)?;
863            builder.connect(unzip.out1(), merge.inlet(1)?)?;
864
865            Ok(FlowShape::new(unzip.inlet(), merge.outlet()))
866        })
867        .unwrap();
868
869        assert_eq!(
870            graph.run_with_input([(1, 2), (4, 3), (6, 5)]).unwrap(),
871            vec![1, 2, 3, 4, 5, 6]
872        );
873    }
874
875    #[test]
876    fn merge_sequence_reorders_adversarial_arrivals_by_sequence_number() {
877        let graph = GraphDsl::try_create(|builder| {
878            let unzip = builder.add(Unzip::<u64, u64>::new());
879            let merge = builder.add(MergeSequence::<u64>::new(2, |item| *item));
880
881            builder.connect(unzip.out0(), merge.inlet(0)?)?;
882            builder.connect(unzip.out1(), merge.inlet(1)?)?;
883
884            Ok(FlowShape::new(unzip.inlet(), merge.outlet()))
885        })
886        .unwrap();
887
888        assert_eq!(
889            graph.run_with_input([(0, 1), (2, 3), (4, 5)]).unwrap(),
890            vec![0, 1, 2, 3, 4, 5]
891        );
892    }
893
894    #[test]
895    fn merge_latest_emits_with_last_seen_peer_and_honors_eager_complete() {
896        let graph = GraphDsl::try_create(|builder| {
897            let unzip = builder.add(Unzip::<i32, i32>::new());
898            let merge = builder.add(MergeLatest::<i32>::new(2, false));
899
900            builder.connect(unzip.out0(), merge.inlet(0)?)?;
901            builder.connect(unzip.out1(), merge.inlet(1)?)?;
902
903            Ok(FlowShape::new(unzip.inlet(), merge.outlet()))
904        })
905        .unwrap();
906
907        assert_eq!(
908            graph.run_with_input([(1, 10), (2, 11)]).unwrap(),
909            vec![vec![1, 10], vec![2, 10], vec![2, 11]]
910        );
911    }
912
913    #[test]
914    fn zip_completes_when_any_input_completes() {
915        let graph = GraphDsl::create(|builder| builder.add(Zip::<i32, i32>::new())).unwrap();
916
917        assert_eq!(
918            graph.run_zip(vec![1, 2, 3], vec![10]).unwrap(),
919            vec![(1, 10)]
920        );
921        assert_eq!(graph.run_zip(vec![1], vec![10, 20]).unwrap(), vec![(1, 10)]);
922        assert_eq!(
923            graph.run_zip(vec![], vec![10, 20]).unwrap(),
924            Vec::<(i32, i32)>::new()
925        );
926    }
927
928    #[test]
929    fn concat_drains_inputs_in_declared_order() {
930        let graph = GraphDsl::create(|builder| builder.add(Concat::<i32>::new(3))).unwrap();
931
932        assert_eq!(
933            graph
934                .run_concat(vec![vec![1, 2], vec![], vec![3, 4]])
935                .unwrap(),
936            vec![1, 2, 3, 4]
937        );
938    }
939
940    #[test]
941    fn or_else_switches_only_if_primary_is_empty() {
942        let graph = GraphDsl::create(|builder| builder.add(OrElse::<i32>::new())).unwrap();
943
944        assert_eq!(
945            graph.run_or_else(vec![], vec![10, 20]).unwrap(),
946            vec![10, 20]
947        );
948        assert_eq!(
949            graph.run_or_else(vec![1, 2], vec![10, 20]).unwrap(),
950            vec![1, 2]
951        );
952    }
953
954    #[test]
955    fn or_else_secondary_first_dropped_when_primary_emits() {
956        let graph = GraphDsl::create(|builder| builder.add(OrElse::<i32>::new())).unwrap();
957
958        assert_eq!(
959            graph
960                .run_or_else_secondary_first(vec![1, 2], vec![10, 20])
961                .unwrap(),
962            vec![1, 2]
963        );
964    }
965
966    #[test]
967    fn or_else_secondary_first_flushed_when_primary_empty() {
968        let graph = GraphDsl::create(|builder| builder.add(OrElse::<i32>::new())).unwrap();
969
970        assert_eq!(
971            graph
972                .run_or_else_secondary_first(vec![], vec![10, 20])
973                .unwrap(),
974            vec![10, 20]
975        );
976    }
977
978    #[test]
979    fn or_else_secondary_closed_then_primary_empty_drains_buffer() {
980        let graph = GraphDsl::create(|builder| builder.add(OrElse::<i32>::new())).unwrap();
981
982        assert_eq!(
983            graph
984                .run_or_else_secondary_closed_first(vec![10, 20])
985                .unwrap(),
986            vec![10, 20]
987        );
988    }
989
990    #[test]
991    fn interleave_cycles_segment_sized_chunks() {
992        let graph = GraphDsl::create(|builder| builder.add(Interleave::<i32>::new(3, 2))).unwrap();
993
994        assert_eq!(
995            graph
996                .run_interleave(vec![vec![1, 2, 3], vec![10, 11, 12], vec![20]], 2, false)
997                .unwrap(),
998            vec![1, 2, 10, 11, 20, 3, 12]
999        );
1000    }
1001
1002    #[test]
1003    fn interleave_eager_close_stops_when_any_input_completes() {
1004        let graph = GraphDsl::create(|builder| {
1005            builder.add(Interleave::<i32>::new_with_eager_close(2, 1, true))
1006        })
1007        .unwrap();
1008
1009        assert_eq!(
1010            graph
1011                .run_interleave(vec![vec![1, 2], vec![]], 1, true)
1012                .unwrap(),
1013            Vec::<i32>::new()
1014        );
1015        assert_eq!(
1016            graph
1017                .run_interleave(vec![vec![1], vec![10, 11]], 1, true)
1018                .unwrap(),
1019            vec![1, 10]
1020        );
1021    }
1022
1023    #[test]
1024    fn partition_routes_only_live_outlets_after_peer_cancels() {
1025        let graph = GraphDsl::try_create(|builder| {
1026            let partition = builder.add(Partition::<i32>::new(2, |item| (*item % 2) as usize));
1027            let merge = builder.add(Merge::<i32>::new(2));
1028
1029            builder.connect(partition.outlet(0)?, merge.inlet(0)?)?;
1030            builder.connect(partition.outlet(1)?, merge.inlet(1)?)?;
1031
1032            Ok(FlowShape::new(partition.inlet(), merge.outlet()))
1033        })
1034        .unwrap();
1035
1036        assert_eq!(
1037            graph.run_with_input([0, 1, 2, 3]).unwrap(),
1038            vec![0, 1, 2, 3]
1039        );
1040    }
1041
1042    #[test]
1043    fn unzip_with_keeps_live_outlet_running_after_peer_finishes() {
1044        let graph = GraphDsl::try_create(|builder| {
1045            let unzip = builder.add(UnzipWith::<i32, i32, i32>::new(|item| (item, item * 10)));
1046            let zip = builder.add(Zip::<i32, i32>::new());
1047
1048            builder.connect(unzip.out0(), zip.in0())?;
1049            builder.connect(unzip.out1(), zip.in1())?;
1050
1051            Ok(FlowShape::new(unzip.inlet(), zip.outlet()))
1052        })
1053        .unwrap();
1054
1055        assert_eq!(
1056            graph.run_with_input([1, 2, 3]).unwrap(),
1057            vec![(1, 10), (2, 20), (3, 30)]
1058        );
1059    }
1060
1061    // --- Regression tests pinning WP-17a review-comment fixes ---
1062
1063    // Fix 1a: UnzipFanInFastPath preserves target inlet order for MergeSorted.
1064    // out0 is wired to inlet(1) and out1 to inlet(0) — values must still be
1065    // sorted correctly regardless of swapped wiring.
1066    #[test]
1067    fn unzip_merge_sorted_swapped_inlets_still_sorted() {
1068        let graph_normal = GraphDsl::try_create(|builder| {
1069            let unzip = builder.add(Unzip::<i32, i32>::new());
1070            let merge = builder.add(MergeSorted::<i32>::new());
1071            builder.connect(unzip.out0(), merge.inlet(0)?)?;
1072            builder.connect(unzip.out1(), merge.inlet(1)?)?;
1073            Ok(FlowShape::new(unzip.inlet(), merge.outlet()))
1074        })
1075        .unwrap();
1076
1077        // Swapped: out0 → inlet(1), out1 → inlet(0).
1078        let graph_swapped = GraphDsl::try_create(|builder| {
1079            let unzip = builder.add(Unzip::<i32, i32>::new());
1080            let merge = builder.add(MergeSorted::<i32>::new());
1081            builder.connect(unzip.out0(), merge.inlet(1)?)?;
1082            builder.connect(unzip.out1(), merge.inlet(0)?)?;
1083            Ok(FlowShape::new(unzip.inlet(), merge.outlet()))
1084        })
1085        .unwrap();
1086
1087        let input = vec![(1, 2), (4, 3), (6, 5)];
1088        let expected = vec![1, 2, 3, 4, 5, 6];
1089        assert_eq!(
1090            graph_normal.run_with_input(input.clone()).unwrap(),
1091            expected
1092        );
1093        assert_eq!(graph_swapped.run_with_input(input).unwrap(), expected);
1094    }
1095
1096    // Fix 1b: UnzipFanInFastPath preserves target inlet order for MergeLatest.
1097    // With swapped wiring the snapshot elements must reflect which inlet each
1098    // value was routed to, producing the same result as the non-swapped graph.
1099    #[test]
1100    fn unzip_merge_latest_swapped_inlets_correct_snapshot_order() {
1101        // Normal wiring: out0 → inlet(0), out1 → inlet(1).
1102        // Input (1, 10): latest = [Some(1), Some(10)] → snapshot [1, 10].
1103        let graph_normal = GraphDsl::try_create(|builder| {
1104            let unzip = builder.add(Unzip::<i32, i32>::new());
1105            let merge = builder.add(MergeLatest::<i32>::new(2, false));
1106            builder.connect(unzip.out0(), merge.inlet(0)?)?;
1107            builder.connect(unzip.out1(), merge.inlet(1)?)?;
1108            Ok(FlowShape::new(unzip.inlet(), merge.outlet()))
1109        })
1110        .unwrap();
1111
1112        // Swapped: out0 → inlet(1), out1 → inlet(0).
1113        // Input (1, 10): out0=1 goes to inlet(1), out1=10 goes to inlet(0).
1114        // latest = [Some(10), Some(1)] → snapshot [10, 1].
1115        let graph_swapped = GraphDsl::try_create(|builder| {
1116            let unzip = builder.add(Unzip::<i32, i32>::new());
1117            let merge = builder.add(MergeLatest::<i32>::new(2, false));
1118            builder.connect(unzip.out0(), merge.inlet(1)?)?;
1119            builder.connect(unzip.out1(), merge.inlet(0)?)?;
1120            Ok(FlowShape::new(unzip.inlet(), merge.outlet()))
1121        })
1122        .unwrap();
1123
1124        assert_eq!(
1125            graph_normal.run_with_input([(1, 10)]).unwrap(),
1126            vec![vec![1, 10]]
1127        );
1128        // Swapped wiring routes values to opposite slots; snapshot order differs.
1129        assert_eq!(
1130            graph_swapped.run_with_input([(1, 10)]).unwrap(),
1131            vec![vec![10, 1]]
1132        );
1133    }
1134
1135    // Fix 2: MergeSequence fails on a sequence gap when all inputs complete.
1136    // The GraphStage logic errors if `pending` holds items but next_sequence
1137    // is not among them — the fused executor must mirror that behavior.
1138    #[test]
1139    fn merge_sequence_fails_on_gap_at_completion() {
1140        // Sequences 1 and 2 arrive (via the unzip split of pair (1, 2)), but
1141        // sequence 0 never does.  When the upstream completes both inlets,
1142        // next_sequence is still 0, there is no sequence 0 in pending → error.
1143        let graph = GraphDsl::try_create(|builder| {
1144            let unzip = builder.add(Unzip::<u64, u64>::new());
1145            let merge = builder.add(MergeSequence::<u64>::new(2, |item| *item));
1146            builder.connect(unzip.out0(), merge.inlet(0)?)?;
1147            builder.connect(unzip.out1(), merge.inlet(1)?)?;
1148            Ok(FlowShape::new(unzip.inlet(), merge.outlet()))
1149        })
1150        .unwrap();
1151
1152        let result = graph.run_with_input([(1u64, 2u64)]);
1153        assert!(
1154            matches!(result, Err(StreamError::Failed(ref msg)) if msg.contains("expected sequence")),
1155            "expected a sequence-gap error, got: {result:?}"
1156        );
1157    }
1158
1159    // Fix 3: MergeLatest honors eager_complete — stream ends as soon as any
1160    // inlet completes when eager_complete = true.
1161    #[test]
1162    fn merge_latest_eager_complete_closes_on_first_inlet_done() {
1163        // With eager_complete = true, the stage should complete when the first
1164        // input stream finishes (here the single-item upstream drains first).
1165        // The non-eager variant of the same graph would keep running until both
1166        // inlets complete.
1167        let graph_eager = GraphDsl::try_create(|builder| {
1168            let unzip = builder.add(Unzip::<i32, i32>::new());
1169            let merge = builder.add(MergeLatest::<i32>::new(2, true));
1170            builder.connect(unzip.out0(), merge.inlet(0)?)?;
1171            builder.connect(unzip.out1(), merge.inlet(1)?)?;
1172            Ok(FlowShape::new(unzip.inlet(), merge.outlet()))
1173        })
1174        .unwrap();
1175
1176        let graph_non_eager = GraphDsl::try_create(|builder| {
1177            let unzip = builder.add(Unzip::<i32, i32>::new());
1178            let merge = builder.add(MergeLatest::<i32>::new(2, false));
1179            builder.connect(unzip.out0(), merge.inlet(0)?)?;
1180            builder.connect(unzip.out1(), merge.inlet(1)?)?;
1181            Ok(FlowShape::new(unzip.inlet(), merge.outlet()))
1182        })
1183        .unwrap();
1184
1185        // Single item: after this item both outlets close simultaneously (the
1186        // Unzip upstream completes), so both eager and non-eager complete after
1187        // emitting the first snapshot.
1188        let result_eager = graph_eager.run_with_input([(1i32, 10i32)]).unwrap();
1189        let result_non_eager = graph_non_eager.run_with_input([(1i32, 10i32)]).unwrap();
1190
1191        // Both should produce at least one snapshot.
1192        assert!(!result_eager.is_empty(), "eager graph produced no output");
1193        assert!(
1194            !result_non_eager.is_empty(),
1195            "non-eager graph produced no output"
1196        );
1197
1198        // Produce the same result for a balanced input — the eager flag only
1199        // changes behavior when ONE inlet finishes before the other.
1200        assert_eq!(result_eager, result_non_eager);
1201    }
1202}