// File: datum/graph/mod.rs

//! Runtime-checked graph, shape, port, and junction primitives.
//!
//! This module starts Datum's Akka-like graph layer. It deliberately keeps the
//! first execution slice small: acyclic fused regions with linear stages and a
//! few fan-in/fan-out junctions. Ports are typed at the public API boundary and
//! still validated at runtime so invalid GraphDSL wiring and illegal stage
//! operations fail deterministically.
8
9use std::{
10    any::{Any, TypeId, type_name},
11    collections::{HashMap, HashSet, VecDeque},
12    fmt,
13    marker::PhantomData,
14    sync::{
15        Arc, Mutex, OnceLock,
16        atomic::{AtomicUsize, Ordering},
17    },
18    thread,
19};
20
21#[cfg(test)]
22use std::sync::mpsc;
23
24use crate::{
25    actor::{Actor, ActorProcessingErr, ActorRef},
26    stream::{StreamError, StreamResult},
27};
28
29type DatumValue = Box<dyn DatumElement>;
30type StageMapFn = dyn Fn(DatumValue) -> StreamResult<DatumValue> + Send + Sync;
31type StageTypedMapFn = dyn Any + Send + Sync;
32type StageZipFn = dyn Fn(DatumValue, DatumValue) -> StreamResult<DatumValue> + Send + Sync;
33type StageUnzipFn = dyn Fn(DatumValue) -> (DatumValue, DatumValue) + Send + Sync;
34/// Opaque container for a typed split function `Arc<dyn Fn(In) -> (Out0, Out1)>`.
35/// Down-cast at plan time — same pattern as [`StageTypedMapFn`].
36type StageTypedUnzipFn = dyn Any + Send + Sync;
37type StageCompareFn = dyn Fn(&DatumValue, &DatumValue) -> std::cmp::Ordering + Send + Sync;
38type StageSequenceFn = dyn Fn(&DatumValue) -> u64 + Send + Sync;
39/// Opaque container for a typed sequence-extractor `Arc<dyn Fn(&T) -> u64>`.
40/// Down-cast at plan time.
41type StageTypedSequenceFn = dyn Any + Send + Sync;
42type StageSnapshotFn = dyn Fn(&[&DatumValue]) -> DatumValue + Send + Sync;
43/// Opaque container for a typed snapshot builder `Arc<dyn Fn(&[Option<T>]) -> Vec<T>>`.
44/// Down-cast at plan time.
45type StageTypedSnapshotFn = dyn Any + Send + Sync;
46type StagePartitionFn = dyn Fn(&DatumValue) -> usize + Send + Sync;
47
48#[derive(Clone)]
49struct StageMapFns {
50    erased: Arc<StageMapFn>,
51    typed: Arc<StageTypedMapFn>,
52}
53
54pub(crate) trait DatumElement: Any + Send {
55    fn clone_box(&self) -> DatumValue;
56    fn into_any(self: Box<Self>) -> Box<dyn Any + Send>;
57    fn as_any_ref(&self) -> &dyn Any;
58}
59
60impl<T> DatumElement for T
61where
62    T: Any + Clone + Send,
63{
64    fn clone_box(&self) -> DatumValue {
65        Box::new(self.clone())
66    }
67
68    fn into_any(self: Box<Self>) -> Box<dyn Any + Send> {
69        self
70    }
71
72    fn as_any_ref(&self) -> &dyn Any {
73        self
74    }
75}
76
77fn datum<T>(value: T) -> DatumValue
78where
79    T: Clone + Send + 'static,
80{
81    Box::new(value)
82}
83
84fn downcast_datum<T, S>(
85    value: DatumValue,
86    operation: &'static str,
87    port: impl FnOnce() -> S,
88) -> StreamResult<T>
89where
90    T: Send + 'static,
91    S: Into<String>,
92{
93    // `port` is only evaluated on the cold error path so the success path
94    // (the per-element fused hot path) allocates nothing for diagnostics. It
95    // returns `impl Into<String>` so callers can hand back a `&'static str`
96    // without an eager `to_owned()`.
97    value
98        .into_any()
99        .downcast::<T>()
100        .map(|value| *value)
101        .map_err(|_| StreamError::InvalidPortOperation {
102            operation,
103            port: port().into(),
104            reason: format!("element type did not match {}", type_name::<T>()),
105        })
106}
107
/// Process-wide counter that hands out port ids; starts at 1.
static NEXT_PORT_ID: AtomicUsize = AtomicUsize::new(1);

/// Allocates one fresh [`PortId`].
///
/// `Relaxed` ordering is enough here: only the uniqueness of the returned
/// value matters, not ordering relative to other memory operations.
fn next_port_id() -> PortId {
    PortId(NEXT_PORT_ID.fetch_add(1, Ordering::Relaxed))
}

/// Reserves `count` consecutive ids in one atomic step and returns the first.
///
/// The remaining ids of the block are `base.offset(1)` through
/// `base.offset(count - 1)`. `count` must be non-zero; this is only checked in
/// debug builds.
fn next_port_id_block(count: usize) -> PortId {
    debug_assert!(count > 0);
    PortId(NEXT_PORT_ID.fetch_add(count, Ordering::Relaxed))
}

/// Opaque numeric identity of a port.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct PortId(usize);

impl PortId {
    /// Returns the raw numeric value of this id.
    #[must_use]
    pub const fn as_usize(self) -> usize {
        self.0
    }

    /// Returns the id `offset` positions after `self`.
    const fn offset(self, offset: usize) -> Self {
        Self(self.0 + offset)
    }
}
132
/// Direction of a port: an `Inlet` or an `Outlet`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum PortKind {
    /// Input side of a stage.
    Inlet,
    /// Output side of a stage.
    Outlet,
}
138
// Generates `fn $fn_name() -> Arc<str>` returning a process-wide shared copy
// of the literal `$value`. The string is allocated once, on first call, inside
// a function-local `OnceLock`; every later call only clones the `Arc`.
macro_rules! shared_name {
    ($fn_name:ident, $value:literal) => {
        fn $fn_name() -> Arc<str> {
            static NAME: OnceLock<Arc<str>> = OnceLock::new();
            Arc::clone(NAME.get_or_init(|| Arc::from($value)))
        }
    };
}

// Shared stage and port names for the built-in stages.
shared_name!(identity_stage_name, "Identity");
shared_name!(identity_inlet_name, "Identity.in");
shared_name!(identity_outlet_name, "Identity.out");
shared_name!(map_stage_name, "Map");
shared_name!(map_inlet_name, "Map.in");
shared_name!(map_outlet_name, "Map.out");
shared_name!(broadcast_stage_name, "Broadcast");
shared_name!(broadcast_inlet_name, "Broadcast.in");
shared_name!(balance_stage_name, "Balance");
shared_name!(balance_inlet_name, "Balance.in");
shared_name!(merge_stage_name, "Merge");
shared_name!(merge_outlet_name, "Merge.out");
shared_name!(merge_preferred_stage_name, "MergePreferred");
shared_name!(merge_preferred_preferred_name, "MergePreferred.preferred");
shared_name!(merge_preferred_outlet_name, "MergePreferred.out");
shared_name!(merge_prioritized_stage_name, "MergePrioritized");
shared_name!(merge_prioritized_outlet_name, "MergePrioritized.out");
shared_name!(concat_stage_name, "Concat");
shared_name!(concat_outlet_name, "Concat.out");
shared_name!(or_else_stage_name, "OrElse");
shared_name!(or_else_primary_name, "OrElse.primary");
shared_name!(or_else_secondary_name, "OrElse.secondary");
shared_name!(or_else_outlet_name, "OrElse.out");
shared_name!(interleave_stage_name, "Interleave");
shared_name!(interleave_outlet_name, "Interleave.out");
shared_name!(zip_stage_name, "Zip");
shared_name!(zip_in0_name, "Zip.in0");
shared_name!(zip_in1_name, "Zip.in1");
shared_name!(zip_outlet_name, "Zip.out");
shared_name!(async_boundary_stage_name, "AsyncBoundary");
shared_name!(async_boundary_inlet_name, "AsyncBoundary.in");
shared_name!(async_boundary_outlet_name, "AsyncBoundary.out");
180
181mod builder;
182mod executor;
183mod junctions;
184mod ports;
185mod shapes;
186mod stage;
187
188#[cfg(test)]
189use self::executor::BoundaryCountExecutor;
190use self::{builder::StageRecord, shapes::PortAllocator, stage::StageKind};
191
192pub use self::{
193    builder::{
194        AsyncBoundaryExecutionConfig, FusedExecutionConfig, FusedExecutionReport, FusedSegment,
195        FusedTerminalReport, Graph, GraphBlueprint, GraphBuilder, GraphDsl, ImportedGraph,
196        PartialGraph,
197    },
198    junctions::{
199        AsyncBoundary, Balance, Broadcast, Concat, Identity, Interleave, MapStage, Merge,
200        MergeLatest, MergePreferred, MergePrioritized, MergeSequence, MergeSorted, OrElse,
201        Partition, Unzip, UnzipWith, Zip,
202    },
203    ports::{AnyInlet, AnyOutlet, Inlet, Outlet, PortRef},
204    shapes::{
205        BidiShape, FanInShape, FanOutShape, FanOutShape2, FlowShape, MergePreferredShape, Shape,
206        SinkShape, SourceShape, ZipShape,
207    },
208    stage::{AsyncCallback, GraphStage, GraphStageLogic, InHandler, OutHandler, StageSpec},
209};
210
211#[cfg(test)]
212mod tests {
213    use super::*;
214    use crate::Attributes;
215
216    #[test]
217    fn graph_dsl_builds_broadcast_zip_flow() {
218        let graph = GraphDsl::try_create(|builder| {
219            let broadcast = builder.add(Broadcast::<i32>::new(2));
220            let zip = builder.add(Zip::<i32, i32>::new());
221
222            builder.connect(broadcast.outlet(0)?, zip.in0())?;
223            builder.connect(broadcast.outlet(1)?, zip.in1())?;
224
225            Ok(FlowShape::new(broadcast.inlet(), zip.outlet()))
226        })
227        .unwrap();
228
229        assert_eq!(graph.stage_count(), 2);
230        assert_eq!(graph.edge_count(), 2);
231        assert_eq!(graph.shape().inlets().len(), 1);
232        assert_eq!(graph.shape().outlets().len(), 1);
233        assert_eq!(
234            graph.run_with_input([1, 2, 3]).unwrap(),
235            vec![(1, 1), (2, 2), (3, 3)]
236        );
237    }
238
239    #[test]
240    fn graph_dsl_zip_slots_follow_inlet_ids() {
241        let graph = GraphDsl::try_create(|builder| {
242            let broadcast = builder.add(Broadcast::<i32>::new(2));
243            let identity = builder.add(Identity::<i32>::new());
244            let zip = builder.add(Zip::<i32, i32>::new());
245
246            builder.connect(broadcast.outlet(0)?, identity.inlet())?;
247            builder.connect(identity.outlet(), zip.in1())?;
248            builder.connect(broadcast.outlet(1)?, zip.in0())?;
249
250            Ok(FlowShape::new(broadcast.inlet(), zip.outlet()))
251        })
252        .unwrap();
253
254        assert_eq!(
255            graph.run_with_input([1, 2, 3]).unwrap(),
256            vec![(1, 1), (2, 2), (3, 3)]
257        );
258    }
259
260    #[test]
261    fn graph_dsl_zip_buffers_skewed_inlet_arrivals() {
262        // Regression test for per-inlet Zip buffering. This asymmetric topology
263        // delivers TWO elements to zip.in0 for every ONE that reaches zip.in1
264        // within a single fused cycle:
265        //
266        //   in -> Broadcast A(2)
267        //          A.out0 -> Broadcast B(2) -> Merge(2).out -> zip.in0   (two arrivals)
268        //          A.out1 ----------------------------------> zip.in1   (one arrival)
269        //
270        // The fused executor drains A.out0 depth-first, so both Merge emissions
271        // hit zip.in0 before A.out1 ever feeds zip.in1. With single-slot Zip
272        // state this errors ("second element before its pair"); bounded
273        // per-inlet buffering must instead queue the surplus and pair in FIFO
274        // arrival order without error. Neither Broadcast matches the fused
275        // Broadcast(2)->Zip fast path (A feeds two different stages; B feeds a
276        // Merge), so this exercises the general buffered Zip path.
277        let graph = GraphDsl::try_create(|builder| {
278            let fan = builder.add(Broadcast::<i32>::new(2));
279            let doubler = builder.add(Broadcast::<i32>::new(2));
280            let merge = builder.add(Merge::<i32>::new(2));
281            let zip = builder.add(Zip::<i32, i32>::new());
282
283            builder.connect(fan.outlet(0)?, doubler.inlet())?;
284            builder.connect(doubler.outlet(0)?, merge.inlet(0)?)?;
285            builder.connect(doubler.outlet(1)?, merge.inlet(1)?)?;
286            builder.connect(merge.outlet(), zip.in0())?;
287            builder.connect(fan.outlet(1)?, zip.in1())?;
288
289            Ok(FlowShape::new(fan.inlet(), zip.outlet()))
290        })
291        .unwrap();
292
293        // in0 accumulates two per item while in1 gets one, so FIFO pairing runs
294        // in0 one item ahead: 10 pairs with 10, then the leftover 10 pairs with
295        // 20. The point is that this succeeds (no error) and pairs in order.
296        assert_eq!(
297            graph.run_with_input([10, 20]).unwrap(),
298            vec![(10, 10), (10, 20)]
299        );
300    }
301
302    #[test]
303    fn graph_dsl_rejects_cycles() {
304        // Cycles stay deferred until WP-16: the current fused executor is
305        // still recursive and push-driven rather than demand-aware.
306        let graph = GraphDsl::try_create(|builder| {
307            let merge = builder.add(Merge::<i32>::new(2));
308            let broadcast = builder.add(Broadcast::<i32>::new(2));
309            builder.connect(merge.outlet(), broadcast.inlet())?;
310            builder.connect(broadcast.outlet(1)?, merge.inlet(1)?)?;
311            Ok(FlowShape::new(merge.inlet(0)?, broadcast.outlet(0)?))
312        });
313        let Err(StreamError::GraphValidation(message)) = graph else {
314            panic!("cyclic graph should be rejected");
315        };
316        assert!(message.contains("WP-16"));
317        assert!(message.contains("demand-aware graph interpreter"));
318    }
319
320    #[test]
321    fn partial_graph_can_be_imported_with_its_shape() {
322        let partial = GraphDsl::partial(|builder| {
323            let first = builder.add(Identity::<i32>::new());
324            let second = builder.add_named(Identity::<i32>::new(), "partial.tail");
325            builder.connect(first.outlet(), second.inlet())?;
326            Ok(FlowShape::new(first.inlet(), second.outlet()))
327        })
328        .named("partial.identity");
329
330        let graph = GraphDsl::try_create(|builder| {
331            let imported = builder.import(&partial)?;
332            let after = builder.add(Identity::<i32>::new());
333            builder.connect(imported.outlet(), after.inlet())?;
334            Ok(FlowShape::new(imported.inlet(), after.outlet()))
335        })
336        .unwrap()
337        .named("outer.graph");
338
339        assert_eq!(graph.run_with_input([1, 2, 3]).unwrap(), vec![1, 2, 3]);
340        assert_eq!(graph.attributes().name(), Some("outer.graph"));
341    }
342
343    #[test]
344    fn graph_attributes_follow_innermost_wins_order() {
345        let graph = GraphDsl::create(|builder| {
346            builder.add_with_attributes(
347                Identity::<i32>::new(),
348                Attributes::named("stage-outer").and(Attributes::named("stage-inner")),
349            )
350        })
351        .unwrap()
352        .add_attributes(Attributes::dispatcher("graph-outer"))
353        .add_attributes(Attributes::dispatcher("graph-inner"));
354
355        assert_eq!(graph.attributes().dispatcher_hint(), Some("graph-inner"));
356        assert_eq!(
357            graph.stages[0].spec.attributes().name(),
358            Some("stage-inner")
359        );
360    }
361
362    #[test]
363    fn graph_dsl_rejects_invalid_and_incomplete_wiring() {
364        let graph = GraphDsl::try_create(|builder| {
365            let first = builder.add(Identity::<i32>::new());
366            let second = builder.add(Identity::<i32>::new());
367            let third = builder.add(Identity::<i32>::new());
368
369            builder.connect(first.outlet(), second.inlet())?;
370            let duplicate = builder.connect(first.outlet(), third.inlet());
371            assert!(matches!(duplicate, Err(StreamError::GraphValidation(_))));
372
373            Ok(FlowShape::new(first.inlet(), second.outlet()))
374        });
375
376        assert!(matches!(graph, Err(StreamError::GraphValidation(_))));
377
378        let graph = GraphDsl::create(|builder| {
379            let broadcast = builder.add(Broadcast::<i32>::new(2));
380            SourceShape::new(broadcast.outlet(0).unwrap())
381        });
382        assert!(matches!(graph, Err(StreamError::GraphValidation(_))));
383    }
384
385    #[test]
386    fn graph_dsl_rejects_erased_type_mismatch() {
387        let graph = GraphDsl::try_create(|builder| {
388            let left = builder.add(Identity::<i32>::new());
389            let right = builder.add(Identity::<u64>::new());
390            let mismatch = builder.connect_any(left.outlet().erase(), right.inlet().erase());
391            assert!(matches!(mismatch, Err(StreamError::GraphValidation(_))));
392            Ok(FlowShape::new(left.inlet(), right.outlet()))
393        });
394
395        assert!(matches!(graph, Err(StreamError::GraphValidation(_))));
396    }
397
398    #[test]
399    fn graph_dsl_rejects_result_ports_with_spoofed_metadata() {
400        let graph = GraphDsl::create(|builder| {
401            let shape = builder.add(Identity::<i32>::new());
402            FlowShape::new(
403                Inlet::<u64>::with_id(shape.inlet().id(), "Identity.in"),
404                shape.outlet(),
405            )
406        });
407
408        assert!(matches!(graph, Err(StreamError::GraphValidation(_))));
409    }
410
411    #[test]
412    fn allocated_ports_do_not_collide_with_manual_ports() {
413        let manual = Inlet::<i32>::new("manual");
414        let graph = GraphDsl::try_create(|builder| {
415            let shape = builder.add(Identity::<i32>::new());
416            assert_ne!(manual.id(), shape.inlet().id());
417            Ok(shape)
418        });
419
420        assert!(graph.is_ok());
421    }
422
423    #[test]
424    fn identity_ports_keep_static_names_and_unique_ids() {
425        let graph = GraphDsl::try_create(|builder| {
426            let first = builder.add(Identity::<i32>::new());
427            let second = builder.add(Identity::<i32>::new());
428
429            assert_eq!(first.inlet().name(), "Identity.in");
430            assert_eq!(first.outlet().name(), "Identity.out");
431            assert_eq!(second.inlet().name(), "Identity.in");
432            assert_eq!(second.outlet().name(), "Identity.out");
433            assert_ne!(first.inlet().id(), first.outlet().id());
434            assert_ne!(first.outlet().id(), second.inlet().id());
435            assert_ne!(second.inlet().id(), second.outlet().id());
436
437            builder.connect(first.outlet(), second.inlet())?;
438            Ok(FlowShape::new(first.inlet(), second.outlet()))
439        });
440
441        assert!(graph.is_ok());
442    }
443
444    #[test]
445    fn graph_stage_logic_checks_port_operations() {
446        let shape = FlowShape::new(Inlet::<i32>::new("in"), Outlet::<i32>::new("out"));
447        let inlet = shape.inlet();
448        let outlet = shape.outlet();
449        let mut logic = GraphStageLogic::new(&shape);
450
451        assert!(matches!(
452            logic.grab(&inlet),
453            Err(StreamError::InvalidPortOperation { .. })
454        ));
455        logic.pull(&inlet).unwrap();
456        assert!(logic.has_been_pulled(&inlet));
457        assert!(matches!(
458            logic.pull(&inlet),
459            Err(StreamError::InvalidPortOperation { .. })
460        ));
461        logic.offer(&inlet, 41).unwrap();
462        assert!(!logic.has_been_pulled(&inlet));
463        assert_eq!(logic.grab(&inlet).unwrap(), 41);
464
465        assert!(matches!(
466            logic.push(&outlet, 1),
467            Err(StreamError::InvalidPortOperation { .. })
468        ));
469        logic.request(&outlet).unwrap();
470        assert!(logic.is_available(&outlet));
471        logic.push(&outlet, 42).unwrap();
472        assert!(!logic.is_available(&outlet));
473        logic.complete(&outlet).unwrap();
474        assert!(logic.is_closed(&outlet));
475        assert!(matches!(
476            logic.request(&outlet),
477            Err(StreamError::InvalidPortOperation { .. })
478        ));
479    }
480
481    #[test]
482    fn fused_execution_enforces_event_limit() {
483        let graph = GraphDsl::create(|builder| builder.add(Identity::<i32>::new())).unwrap();
484
485        let result = graph.run_with_input_report([1, 2], FusedExecutionConfig { event_limit: 1 });
486
487        assert_eq!(result, Err(StreamError::EventLimitExceeded { limit: 1 }));
488    }
489
490    #[test]
491    fn async_boundary_splits_fused_segments() {
492        let graph = GraphDsl::try_create(|builder| {
493            let first = builder.add(Identity::<i32>::new());
494            let boundary = builder.add(AsyncBoundary::<i32>::new());
495            let second = builder.add(MapStage::new(|item: i32| item + 1));
496
497            builder.connect(first.outlet(), boundary.inlet())?;
498            builder.connect(boundary.outlet(), second.inlet())?;
499
500            Ok(FlowShape::new(first.inlet(), second.outlet()))
501        })
502        .unwrap();
503
504        assert_eq!(graph.segments().len(), 3);
505        let report = graph
506            .run_with_input_report([1, 2], FusedExecutionConfig::default())
507            .unwrap();
508        assert_eq!(report.output, vec![2, 3]);
509        assert_eq!(report.async_boundary_crossings, 2);
510    }
511
512    #[test]
513    fn async_boundary_count_path_uses_ractor_handoff_segments() {
514        let graph = GraphDsl::try_create(|builder| {
515            let first = builder.add(MapStage::new(|item: i32| item + 1));
516            let boundary = builder.add(AsyncBoundary::<i32>::new());
517            let second = builder.add(MapStage::new(|item: i32| item * 2));
518
519            builder.connect(first.outlet(), boundary.inlet())?;
520            builder.connect(boundary.outlet(), second.inlet())?;
521
522            Ok(FlowShape::new(first.inlet(), second.outlet()))
523        })
524        .unwrap();
525
526        let report = graph
527            .run_async_boundary_count_with_input_report(
528                [1, 2, 3],
529                AsyncBoundaryExecutionConfig {
530                    fused: FusedExecutionConfig::default(),
531                    buffer_size: 2,
532                },
533            )
534            .unwrap();
535
536        assert_eq!(report.result, 3);
537        assert_eq!(report.async_boundary_crossings, 3);
538        assert_eq!(report.events, 18);
539    }
540
541    #[test]
542    fn threaded_async_boundary_baseline_matches_ractor_count_path() {
543        let graph = GraphDsl::try_create(|builder| {
544            let first = builder.add(MapStage::new(|item: i32| item + 1));
545            let boundary = builder.add(AsyncBoundary::<i32>::new());
546            let second = builder.add(MapStage::new(|item: i32| item * 2));
547
548            builder.connect(first.outlet(), boundary.inlet())?;
549            builder.connect(boundary.outlet(), second.inlet())?;
550
551            Ok(FlowShape::new(first.inlet(), second.outlet()))
552        })
553        .unwrap();
554
555        let config = AsyncBoundaryExecutionConfig {
556            fused: FusedExecutionConfig::default(),
557            buffer_size: 2,
558        };
559        let ractor_report = graph
560            .run_async_boundary_count_with_input_report([1, 2, 3], config)
561            .unwrap();
562        let threaded_report = BoundaryCountExecutor::Threaded
563            .run_count(
564                [1, 2, 3],
565                graph.typed_linear_async_segments().unwrap(),
566                config,
567            )
568            .unwrap();
569
570        assert_eq!(threaded_report, ractor_report);
571    }
572
573    #[test]
574    fn ractor_async_boundary_rejects_zero_buffer_size() {
575        let graph = GraphDsl::try_create(|builder| {
576            let first = builder.add(MapStage::new(|item: i32| item + 1));
577            let boundary = builder.add(AsyncBoundary::<i32>::new());
578            let second = builder.add(MapStage::new(|item: i32| item * 2));
579
580            builder.connect(first.outlet(), boundary.inlet())?;
581            builder.connect(boundary.outlet(), second.inlet())?;
582
583            Ok(FlowShape::new(first.inlet(), second.outlet()))
584        })
585        .unwrap();
586
587        let result = BoundaryCountExecutor::Ractor.run_count(
588            [1, 2, 3],
589            graph.typed_linear_async_segments().unwrap(),
590            AsyncBoundaryExecutionConfig {
591                fused: FusedExecutionConfig::default(),
592                buffer_size: 0,
593            },
594        );
595
596        assert!(matches!(
597            result,
598            Err(StreamError::GraphValidation(message)) if message.contains("buffer_size")
599        ));
600    }
601
602    #[test]
603    fn ractor_async_boundary_streams_input_without_eager_collection() {
604        let graph = GraphDsl::try_create(|builder| {
605            let first = builder.add(MapStage::new(|item: i32| item + 1));
606            let boundary = builder.add(AsyncBoundary::<i32>::new());
607            let second = builder.add(MapStage::new(|item: i32| item * 2));
608
609            builder.connect(first.outlet(), boundary.inlet())?;
610            builder.connect(boundary.outlet(), second.inlet())?;
611
612            Ok(FlowShape::new(first.inlet(), second.outlet()))
613        })
614        .unwrap();
615
616        let result = BoundaryCountExecutor::Ractor.run_count(
617            std::iter::repeat(1_i32),
618            graph.typed_linear_async_segments().unwrap(),
619            AsyncBoundaryExecutionConfig {
620                fused: FusedExecutionConfig { event_limit: 4 },
621                buffer_size: 1,
622            },
623        );
624
625        assert_eq!(result, Err(StreamError::EventLimitExceeded { limit: 4 }));
626    }
627
628    #[test]
629    fn ractor_async_boundary_runs_inside_existing_tokio_runtime() {
630        let graph = GraphDsl::try_create(|builder| {
631            let first = builder.add(MapStage::new(|item: i32| item + 1));
632            let boundary = builder.add(AsyncBoundary::<i32>::new());
633            let second = builder.add(MapStage::new(|item: i32| item * 2));
634
635            builder.connect(first.outlet(), boundary.inlet())?;
636            builder.connect(boundary.outlet(), second.inlet())?;
637
638            Ok(FlowShape::new(first.inlet(), second.outlet()))
639        })
640        .unwrap();
641
642        let runtime = tokio::runtime::Builder::new_current_thread()
643            .build()
644            .unwrap();
645        let report = runtime
646            .block_on(async {
647                BoundaryCountExecutor::Ractor.run_count(
648                    [1, 2, 3],
649                    graph.typed_linear_async_segments().unwrap(),
650                    AsyncBoundaryExecutionConfig {
651                        fused: FusedExecutionConfig::default(),
652                        buffer_size: 2,
653                    },
654                )
655            })
656            .unwrap();
657
658        assert_eq!(report.result, 3);
659        assert_eq!(report.async_boundary_crossings, 3);
660        assert_eq!(report.events, 18);
661    }
662
663    #[test]
664    fn balance_merge_round_robins_through_junctions() {
665        let graph = GraphDsl::try_create(|builder| {
666            let balance = builder.add(Balance::<i32>::new(2));
667            let merge = builder.add(Merge::<i32>::new(2));
668
669            builder.connect(balance.outlet(0)?, merge.inlet(0)?)?;
670            builder.connect(balance.outlet(1)?, merge.inlet(1)?)?;
671
672            Ok(FlowShape::new(balance.inlet(), merge.outlet()))
673        })
674        .unwrap();
675
676        assert_eq!(graph.run_with_input(0..6).unwrap(), vec![0, 1, 2, 3, 4, 5]);
677    }
678
679    #[test]
680    fn prioritized_merge_uses_weighted_schedule() {
681        let graph =
682            GraphDsl::create(|builder| builder.add(MergePrioritized::<i32>::new(vec![2, 1])))
683                .unwrap();
684
685        assert_eq!(
686            graph
687                .run_fan_in(vec![vec![1, 2, 3, 4], vec![100, 101]])
688                .unwrap(),
689            vec![1, 2, 100, 3, 4, 101]
690        );
691    }
692
693    #[test]
694    fn fused_execution_supports_count_and_fold_sinks() {
695        let graph = GraphDsl::try_create(|builder| {
696            let first = builder.add(MapStage::new(|item: u64| item + 1));
697            let second = builder.add(MapStage::new(|item: u64| item * 2));
698
699            builder.connect(first.outlet(), second.inlet())?;
700
701            Ok(FlowShape::new(first.inlet(), second.outlet()))
702        })
703        .unwrap();
704
705        assert_eq!(graph.run_count_with_input(0..4).unwrap(), 4);
706        assert_eq!(
707            graph
708                .run_fold_with_input(0..4, 0, |acc, item| acc + item)
709                .unwrap(),
710            20
711        );
712    }
713
714    #[test]
715    fn typed_linear_fast_path_runs_same_type_chains() {
716        let graph = GraphDsl::try_create(|builder| {
717            let first = builder.add(MapStage::new(|item: u64| item + 1));
718            let second = builder.add(MapStage::new(|item: u64| item * 2));
719
720            builder.connect(first.outlet(), second.inlet())?;
721
722            Ok(FlowShape::new(first.inlet(), second.outlet()))
723        })
724        .unwrap();
725
726        let report = graph
727            .run_typed_linear_with_input_report([1, 2, 3], FusedExecutionConfig::default())
728            .unwrap();
729        assert_eq!(report.output, vec![4, 6, 8]);
730        assert_eq!(report.events, 12);
731
732        assert_eq!(graph.run_typed_linear_count_with_input(0..4).unwrap(), 4);
733        assert_eq!(
734            graph
735                .run_typed_linear_fold_with_input(0..4, 0, |acc, item| acc + item)
736                .unwrap(),
737            20
738        );
739    }
740
741    #[test]
742    fn typed_linear_fast_path_rejects_junction_graphs() {
743        let graph = GraphDsl::try_create(|builder| {
744            let balance = builder.add(Balance::<i32>::new(2));
745            let merge = builder.add(Merge::<i32>::new(2));
746
747            builder.connect(balance.outlet(0)?, merge.inlet(0)?)?;
748            builder.connect(balance.outlet(1)?, merge.inlet(1)?)?;
749
750            Ok(FlowShape::new(balance.inlet(), merge.outlet()))
751        })
752        .unwrap();
753
754        assert!(matches!(
755            graph.run_typed_linear_count_with_input([1, 2, 3]),
756            Err(StreamError::GraphValidation(_))
757        ));
758    }
759
760    #[test]
761    fn merge_preferred_drains_preferred_before_secondaries() {
762        let graph = GraphDsl::create(|builder| builder.add(MergePreferred::<i32>::new(2))).unwrap();
763
764        assert_eq!(
765            graph
766                .run_merge_preferred(vec![1, 2, 3], vec![vec![100, 101], vec![200]])
767                .unwrap(),
768            vec![1, 2, 3, 100, 200, 101]
769        );
770    }
771
772    #[test]
773    fn merge_waits_for_all_inputs_to_complete() {
774        let graph = GraphDsl::create(|builder| builder.add(Merge::<i32>::new(2))).unwrap();
775
776        assert_eq!(
777            graph.run_fan_in(vec![vec![], vec![10, 20]]).unwrap(),
778            vec![10, 20]
779        );
780        assert_eq!(graph.run_fan_in(vec![vec![1], vec![]]).unwrap(), vec![1]);
781    }
782
783    #[test]
784    fn merge_sorted_drains_remaining_input_after_peer_completes() {
785        let graph = GraphDsl::try_create(|builder| {
786            let unzip = builder.add(Unzip::<i32, i32>::new());
787            let merge = builder.add(MergeSorted::<i32>::new());
788
789            builder.connect(unzip.out0(), merge.inlet(0)?)?;
790            builder.connect(unzip.out1(), merge.inlet(1)?)?;
791
792            Ok(FlowShape::new(unzip.inlet(), merge.outlet()))
793        })
794        .unwrap();
795
796        assert_eq!(
797            graph.run_with_input([(1, 2), (4, 3), (6, 5)]).unwrap(),
798            vec![1, 2, 3, 4, 5, 6]
799        );
800    }
801
802    #[test]
803    fn merge_sequence_reorders_adversarial_arrivals_by_sequence_number() {
804        let graph = GraphDsl::try_create(|builder| {
805            let unzip = builder.add(Unzip::<u64, u64>::new());
806            let merge = builder.add(MergeSequence::<u64>::new(2, |item| *item));
807
808            builder.connect(unzip.out0(), merge.inlet(0)?)?;
809            builder.connect(unzip.out1(), merge.inlet(1)?)?;
810
811            Ok(FlowShape::new(unzip.inlet(), merge.outlet()))
812        })
813        .unwrap();
814
815        assert_eq!(
816            graph.run_with_input([(0, 1), (2, 3), (4, 5)]).unwrap(),
817            vec![0, 1, 2, 3, 4, 5]
818        );
819    }
820
// Wires `Unzip` into a two-inlet `MergeLatest` and checks the emitted
// snapshots: one per arriving element once both inlets have produced a value,
// each pairing the new element with the other inlet's most recent one.
//
// NOTE(review): the test name mentions eager completion, but only the
// `false` flag is exercised here.
#[test]
fn merge_latest_emits_with_last_seen_peer_and_honors_eager_complete() {
    let graph = GraphDsl::try_create(|b| {
        let splitter = b.add(Unzip::<i32, i32>::new());
        let latest = b.add(MergeLatest::<i32>::new(2, false));

        let first_half = splitter.out0();
        let first_slot = latest.inlet(0)?;
        b.connect(first_half, first_slot)?;

        let second_half = splitter.out1();
        let second_slot = latest.inlet(1)?;
        b.connect(second_half, second_slot)?;

        Ok(FlowShape::new(splitter.inlet(), latest.outlet()))
    })
    .unwrap();

    let snapshots = graph.run_with_input([(1, 10), (2, 11)]).unwrap();
    assert_eq!(snapshots, vec![vec![1, 10], vec![2, 10], vec![2, 11]]);
}
839
// Runs `Zip` with inputs of unequal length and checks that only as many
// pairs are produced as the shorter side has elements.
#[test]
fn zip_completes_when_any_input_completes() {
    let graph = GraphDsl::create(|b| b.add(Zip::<i32, i32>::new())).unwrap();

    // Right side is the shorter one.
    let right_short = graph.run_zip(vec![1, 2, 3], vec![10]).unwrap();
    assert_eq!(right_short, vec![(1, 10)]);

    // Left side is the shorter one.
    let left_short = graph.run_zip(vec![1], vec![10, 20]).unwrap();
    assert_eq!(left_short, vec![(1, 10)]);

    // Left side is empty, so no pair can be formed at all.
    let left_empty = graph.run_zip(vec![], vec![10, 20]).unwrap();
    assert_eq!(left_empty, Vec::<(i32, i32)>::new());
}
854
// Runs a three-input `Concat` (middle input empty) and checks that the
// output is the inputs' elements back to back, in input order.
#[test]
fn concat_drains_inputs_in_declared_order() {
    let graph = GraphDsl::create(|b| b.add(Concat::<i32>::new(3))).unwrap();

    let inputs = vec![vec![1, 2], vec![], vec![3, 4]];
    let joined = graph.run_concat(inputs).unwrap();

    assert_eq!(joined, vec![1, 2, 3, 4]);
}
866
// Runs `OrElse` with an empty and a non-empty primary input: the secondary
// elements are expected only when the primary produced nothing.
#[test]
fn or_else_switches_only_if_primary_is_empty() {
    let graph = GraphDsl::create(|b| b.add(OrElse::<i32>::new())).unwrap();

    // Empty primary: output is the secondary input.
    let fallback = graph.run_or_else(vec![], vec![10, 20]).unwrap();
    assert_eq!(fallback, vec![10, 20]);

    // Non-empty primary: output is the primary input only.
    let primary_wins = graph.run_or_else(vec![1, 2], vec![10, 20]).unwrap();
    assert_eq!(primary_wins, vec![1, 2]);
}
880
// Drives `OrElse` through `run_or_else_secondary_first` with a non-empty
// primary and checks that only the primary elements appear in the output.
// NOTE(review): presumably the helper delivers the secondary input ahead of
// the primary — confirm against its definition.
#[test]
fn or_else_secondary_first_dropped_when_primary_emits() {
    let graph = GraphDsl::create(|b| b.add(OrElse::<i32>::new())).unwrap();

    let emitted = graph
        .run_or_else_secondary_first(vec![1, 2], vec![10, 20])
        .unwrap();

    assert_eq!(emitted, vec![1, 2]);
}
892
// Drives `OrElse` through `run_or_else_secondary_first` with an empty primary
// and checks that the secondary elements make it to the output.
// NOTE(review): presumably the helper delivers the secondary input ahead of
// the primary — confirm against its definition.
#[test]
fn or_else_secondary_first_flushed_when_primary_empty() {
    let graph = GraphDsl::create(|b| b.add(OrElse::<i32>::new())).unwrap();

    let emitted = graph
        .run_or_else_secondary_first(vec![], vec![10, 20])
        .unwrap();

    assert_eq!(emitted, vec![10, 20]);
}
904
// Drives `OrElse` through `run_or_else_secondary_closed_first` with only
// secondary elements supplied and checks that they all reach the output.
// NOTE(review): presumably the helper completes the secondary before an empty
// primary — confirm against its definition.
#[test]
fn or_else_secondary_closed_then_primary_empty_drains_buffer() {
    let graph = GraphDsl::create(|b| b.add(OrElse::<i32>::new())).unwrap();

    let secondary = vec![10, 20];
    let emitted = graph.run_or_else_secondary_closed_first(secondary).unwrap();

    assert_eq!(emitted, vec![10, 20]);
}
916
// Runs a three-input `Interleave` with segment size 2 and checks the
// round-robin output: up to two elements from each input per turn, with
// exhausted inputs dropping out of the rotation.
#[test]
fn interleave_cycles_segment_sized_chunks() {
    let graph = GraphDsl::create(|b| b.add(Interleave::<i32>::new(3, 2))).unwrap();

    let inputs = vec![vec![1, 2, 3], vec![10, 11, 12], vec![20]];
    let woven = graph.run_interleave(inputs, 2, false).unwrap();

    assert_eq!(woven, vec![1, 2, 10, 11, 20, 3, 12]);
}
928
// Runs a two-input `Interleave` (segment size 1) built with the eager-close
// flag set and checks that output stops short instead of draining the longer
// input.
#[test]
fn interleave_eager_close_stops_when_any_input_completes() {
    let graph =
        GraphDsl::create(|b| b.add(Interleave::<i32>::new_with_eager_close(2, 1, true))).unwrap();

    // Second input is empty from the start: nothing is expected.
    let with_empty_peer = graph
        .run_interleave(vec![vec![1, 2], vec![]], 1, true)
        .unwrap();
    assert_eq!(with_empty_peer, Vec::<i32>::new());

    // First input has a single element: the trailing 11 is never emitted.
    let with_short_peer = graph
        .run_interleave(vec![vec![1], vec![10, 11]], 1, true)
        .unwrap();
    assert_eq!(with_short_peer, vec![1, 10]);
}
949
// Routes integers through a two-way `Partition` keyed on parity, merges both
// outlets back together, and checks that every element arrives in the
// original order.
#[test]
fn partition_routes_only_live_outlets_after_peer_cancels() {
    let graph = GraphDsl::try_create(|b| {
        // Even values pick outlet 0, odd values pick outlet 1.
        let router = b.add(Partition::<i32>::new(2, |value| (*value % 2) as usize));
        let joiner = b.add(Merge::<i32>::new(2));

        let even_out = router.outlet(0)?;
        let even_in = joiner.inlet(0)?;
        b.connect(even_out, even_in)?;

        let odd_out = router.outlet(1)?;
        let odd_in = joiner.inlet(1)?;
        b.connect(odd_out, odd_in)?;

        Ok(FlowShape::new(router.inlet(), joiner.outlet()))
    })
    .unwrap();

    let rejoined = graph.run_with_input([0, 1, 2, 3]).unwrap();
    assert_eq!(rejoined, vec![0, 1, 2, 3]);
}
968
// Splits each integer into `(n, n * 10)` with `UnzipWith`, zips the two
// outlets back together, and checks that every pair is reconstructed.
#[test]
fn unzip_with_keeps_live_outlet_running_after_peer_finishes() {
    let graph = GraphDsl::try_create(|b| {
        let splitter = b.add(UnzipWith::<i32, i32, i32>::new(|n| (n, n * 10)));
        let pairer = b.add(Zip::<i32, i32>::new());

        // Identity half goes to the left zip input.
        let plain = splitter.out0();
        let left = pairer.in0();
        b.connect(plain, left)?;

        // Scaled half goes to the right zip input.
        let scaled = splitter.out1();
        let right = pairer.in1();
        b.connect(scaled, right)?;

        Ok(FlowShape::new(splitter.inlet(), pairer.outlet()))
    })
    .unwrap();

    let pairs = graph.run_with_input([1, 2, 3]).unwrap();
    assert_eq!(pairs, vec![(1, 10), (2, 20), (3, 30)]);
}
987
988    // --- Regression tests pinning WP-17a review-comment fixes ---
989
// Fix 1a: the Unzip -> MergeSorted path must respect which inlet each outlet
// is wired to. Two graphs are built, one with straight wiring and one with
// the outlets crossed over; both must yield the same sorted output.
#[test]
fn unzip_merge_sorted_swapped_inlets_still_sorted() {
    // Straight: out0 -> inlet(0), out1 -> inlet(1).
    let straight = GraphDsl::try_create(|b| {
        let splitter = b.add(Unzip::<i32, i32>::new());
        let sorter = b.add(MergeSorted::<i32>::new());
        b.connect(splitter.out0(), sorter.inlet(0)?)?;
        b.connect(splitter.out1(), sorter.inlet(1)?)?;
        Ok(FlowShape::new(splitter.inlet(), sorter.outlet()))
    })
    .unwrap();

    // Crossed: out0 -> inlet(1), out1 -> inlet(0).
    let crossed = GraphDsl::try_create(|b| {
        let splitter = b.add(Unzip::<i32, i32>::new());
        let sorter = b.add(MergeSorted::<i32>::new());
        b.connect(splitter.out0(), sorter.inlet(1)?)?;
        b.connect(splitter.out1(), sorter.inlet(0)?)?;
        Ok(FlowShape::new(splitter.inlet(), sorter.outlet()))
    })
    .unwrap();

    let pairs = vec![(1, 2), (4, 3), (6, 5)];
    let expected = vec![1, 2, 3, 4, 5, 6];

    let straight_out = straight.run_with_input(pairs.clone()).unwrap();
    assert_eq!(straight_out, expected);

    let crossed_out = crossed.run_with_input(pairs).unwrap();
    assert_eq!(crossed_out, expected);
}
1022
// Fix 1b: the Unzip -> MergeLatest path must respect which inlet each outlet
// is wired to. Snapshot positions follow the inlet index, so crossing the
// wiring is expected to reverse the element order inside the snapshot.
#[test]
fn unzip_merge_latest_swapped_inlets_correct_snapshot_order() {
    // Straight: out0 -> inlet(0), out1 -> inlet(1).
    // For input (1, 10) the snapshot is expected to read [1, 10].
    let straight = GraphDsl::try_create(|b| {
        let splitter = b.add(Unzip::<i32, i32>::new());
        let latest = b.add(MergeLatest::<i32>::new(2, false));
        b.connect(splitter.out0(), latest.inlet(0)?)?;
        b.connect(splitter.out1(), latest.inlet(1)?)?;
        Ok(FlowShape::new(splitter.inlet(), latest.outlet()))
    })
    .unwrap();

    // Crossed: out0 -> inlet(1), out1 -> inlet(0).
    // For input (1, 10) the value 1 lands in slot 1 and 10 in slot 0, so the
    // snapshot is expected to read [10, 1].
    let crossed = GraphDsl::try_create(|b| {
        let splitter = b.add(Unzip::<i32, i32>::new());
        let latest = b.add(MergeLatest::<i32>::new(2, false));
        b.connect(splitter.out0(), latest.inlet(1)?)?;
        b.connect(splitter.out1(), latest.inlet(0)?)?;
        Ok(FlowShape::new(splitter.inlet(), latest.outlet()))
    })
    .unwrap();

    let straight_out = straight.run_with_input([(1, 10)]).unwrap();
    assert_eq!(straight_out, vec![vec![1, 10]]);

    // Same input, opposite slots: the snapshot order flips.
    let crossed_out = crossed.run_with_input([(1, 10)]).unwrap();
    assert_eq!(crossed_out, vec![vec![10, 1]]);
}
1061
// Fix 2: MergeSequence must report an error when its inputs complete while a
// sequence number is still missing. The GraphStage logic fails if buffered
// items remain but the next expected sequence is not among them; the fused
// executor is expected to behave the same way.
#[test]
fn merge_sequence_fails_on_gap_at_completion() {
    // The single pair (1, 2) delivers sequences 1 and 2 but never sequence 0,
    // so the stage is still waiting for 0 when both inlets complete.
    let graph = GraphDsl::try_create(|b| {
        let splitter = b.add(Unzip::<u64, u64>::new());
        let sequencer = b.add(MergeSequence::<u64>::new(2, |value| *value));
        b.connect(splitter.out0(), sequencer.inlet(0)?)?;
        b.connect(splitter.out1(), sequencer.inlet(1)?)?;
        Ok(FlowShape::new(splitter.inlet(), sequencer.outlet()))
    })
    .unwrap();

    let outcome = graph.run_with_input([(1u64, 2u64)]);

    // Borrow the outcome so it stays available for the failure message.
    let is_gap_error = matches!(
        &outcome,
        Err(StreamError::Failed(msg)) if msg.contains("expected sequence")
    );
    assert!(
        is_gap_error,
        "expected a sequence-gap error, got: {outcome:?}"
    );
}
1085
// Fix 3: MergeLatest and its eager-complete flag (second constructor
// argument). With the flag set, the stage is meant to finish as soon as any
// inlet completes; without it, only once all inlets have completed.
//
// NOTE(review): with a single input pair both inlets are fed by the same
// Unzip and finish together, so this test only pins that the two variants
// agree on balanced input. It does not exercise one inlet finishing early.
#[test]
fn merge_latest_eager_complete_closes_on_first_inlet_done() {
    // Identical wiring in both graphs; only the eager flag differs.
    let eager = GraphDsl::try_create(|b| {
        let splitter = b.add(Unzip::<i32, i32>::new());
        let latest = b.add(MergeLatest::<i32>::new(2, true));
        b.connect(splitter.out0(), latest.inlet(0)?)?;
        b.connect(splitter.out1(), latest.inlet(1)?)?;
        Ok(FlowShape::new(splitter.inlet(), latest.outlet()))
    })
    .unwrap();

    let lazy = GraphDsl::try_create(|b| {
        let splitter = b.add(Unzip::<i32, i32>::new());
        let latest = b.add(MergeLatest::<i32>::new(2, false));
        b.connect(splitter.out0(), latest.inlet(0)?)?;
        b.connect(splitter.out1(), latest.inlet(1)?)?;
        Ok(FlowShape::new(splitter.inlet(), latest.outlet()))
    })
    .unwrap();

    let eager_out = eager.run_with_input([(1i32, 10i32)]).unwrap();
    let lazy_out = lazy.run_with_input([(1i32, 10i32)]).unwrap();

    // Each variant must emit at least one snapshot.
    assert!(!eager_out.is_empty(), "eager graph produced no output");
    assert!(!lazy_out.is_empty(), "non-eager graph produced no output");

    // On balanced input the flag must not change what is emitted.
    assert_eq!(eager_out, lazy_out);
}
1129}