1use std::{
10 any::{Any, TypeId, type_name},
11 collections::{HashMap, HashSet, VecDeque},
12 fmt,
13 marker::PhantomData,
14 sync::{
15 Arc, Mutex, OnceLock,
16 atomic::{AtomicUsize, Ordering},
17 },
18 thread,
19};
20
21#[cfg(test)]
22use std::sync::mpsc;
23
24use crate::{
25 actor::{Actor, ActorProcessingErr, ActorRef},
26 stream::{StreamError, StreamResult},
27};
28
29type DatumValue = Box<dyn DatumElement>;
30type StageMapFn = dyn Fn(DatumValue) -> StreamResult<DatumValue> + Send + Sync;
31type StageTypedMapFn = dyn Any + Send + Sync;
32type StageZipFn = dyn Fn(DatumValue, DatumValue) -> StreamResult<DatumValue> + Send + Sync;
33type StageUnzipFn = dyn Fn(DatumValue) -> (DatumValue, DatumValue) + Send + Sync;
34type StageTypedUnzipFn = dyn Any + Send + Sync;
37type StageCompareFn = dyn Fn(&DatumValue, &DatumValue) -> std::cmp::Ordering + Send + Sync;
38type StageSequenceFn = dyn Fn(&DatumValue) -> u64 + Send + Sync;
39type StageTypedSequenceFn = dyn Any + Send + Sync;
42type StageSnapshotFn = dyn Fn(&[&DatumValue]) -> DatumValue + Send + Sync;
43type StageTypedSnapshotFn = dyn Any + Send + Sync;
46type StagePartitionFn = dyn Fn(&DatumValue) -> usize + Send + Sync;
47
48#[derive(Clone)]
49struct StageMapFns {
50 erased: Arc<StageMapFn>,
51 typed: Arc<StageTypedMapFn>,
52}
53
54pub(crate) trait DatumElement: Any + Send {
55 fn clone_box(&self) -> DatumValue;
56 fn into_any(self: Box<Self>) -> Box<dyn Any + Send>;
57 fn as_any_ref(&self) -> &dyn Any;
58}
59
60impl<T> DatumElement for T
61where
62 T: Any + Clone + Send,
63{
64 fn clone_box(&self) -> DatumValue {
65 Box::new(self.clone())
66 }
67
68 fn into_any(self: Box<Self>) -> Box<dyn Any + Send> {
69 self
70 }
71
72 fn as_any_ref(&self) -> &dyn Any {
73 self
74 }
75}
76
77fn datum<T>(value: T) -> DatumValue
78where
79 T: Clone + Send + 'static,
80{
81 Box::new(value)
82}
83
84fn downcast_datum<T, S>(
85 value: DatumValue,
86 operation: &'static str,
87 port: impl FnOnce() -> S,
88) -> StreamResult<T>
89where
90 T: Send + 'static,
91 S: Into<String>,
92{
93 value
98 .into_any()
99 .downcast::<T>()
100 .map(|value| *value)
101 .map_err(|_| StreamError::InvalidPortOperation {
102 operation,
103 port: port().into(),
104 reason: format!("element type did not match {}", type_name::<T>()),
105 })
106}
107
108static NEXT_PORT_ID: AtomicUsize = AtomicUsize::new(1);
109
110fn next_port_id() -> PortId {
111 PortId(NEXT_PORT_ID.fetch_add(1, Ordering::Relaxed))
112}
113
114fn next_port_id_block(count: usize) -> PortId {
115 debug_assert!(count > 0);
116 PortId(NEXT_PORT_ID.fetch_add(count, Ordering::Relaxed))
117}
118
/// Opaque numeric identifier of a port.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct PortId(usize);

impl PortId {
    /// Returns the underlying numeric value.
    #[must_use]
    pub const fn as_usize(self) -> usize {
        let Self(raw) = self;
        raw
    }

    /// Returns the id that lies `offset` positions after this one.
    const fn offset(self, offset: usize) -> Self {
        let Self(base) = self;
        Self(base + offset)
    }
}
132
/// Direction of a port.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum PortKind {
    /// Input side of a port pair.
    Inlet,
    /// Output side of a port pair.
    Outlet,
}
138
/// Generates `fn <name>() -> Arc<str>` returning a lazily created, shared string.
///
/// The string is built once per generated function, on its first call, and
/// stored in a function-local `OnceLock`. Every call returns a clone of that
/// `Arc`, so all callers share one allocation.
macro_rules! shared_name {
    ($accessor:ident, $text:literal) => {
        fn $accessor() -> Arc<str> {
            static CACHED: OnceLock<Arc<str>> = OnceLock::new();
            let shared = CACHED.get_or_init(|| Arc::<str>::from($text));
            Arc::clone(shared)
        }
    };
}
147
148shared_name!(identity_stage_name, "Identity");
149shared_name!(identity_inlet_name, "Identity.in");
150shared_name!(identity_outlet_name, "Identity.out");
151shared_name!(map_stage_name, "Map");
152shared_name!(map_inlet_name, "Map.in");
153shared_name!(map_outlet_name, "Map.out");
154shared_name!(broadcast_stage_name, "Broadcast");
155shared_name!(broadcast_inlet_name, "Broadcast.in");
156shared_name!(balance_stage_name, "Balance");
157shared_name!(balance_inlet_name, "Balance.in");
158shared_name!(merge_stage_name, "Merge");
159shared_name!(merge_outlet_name, "Merge.out");
160shared_name!(merge_preferred_stage_name, "MergePreferred");
161shared_name!(merge_preferred_preferred_name, "MergePreferred.preferred");
162shared_name!(merge_preferred_outlet_name, "MergePreferred.out");
163shared_name!(merge_prioritized_stage_name, "MergePrioritized");
164shared_name!(merge_prioritized_outlet_name, "MergePrioritized.out");
165shared_name!(concat_stage_name, "Concat");
166shared_name!(concat_outlet_name, "Concat.out");
167shared_name!(or_else_stage_name, "OrElse");
168shared_name!(or_else_primary_name, "OrElse.primary");
169shared_name!(or_else_secondary_name, "OrElse.secondary");
170shared_name!(or_else_outlet_name, "OrElse.out");
171shared_name!(interleave_stage_name, "Interleave");
172shared_name!(interleave_outlet_name, "Interleave.out");
173shared_name!(zip_stage_name, "Zip");
174shared_name!(zip_in0_name, "Zip.in0");
175shared_name!(zip_in1_name, "Zip.in1");
176shared_name!(zip_outlet_name, "Zip.out");
177shared_name!(async_boundary_stage_name, "AsyncBoundary");
178shared_name!(async_boundary_inlet_name, "AsyncBoundary.in");
179shared_name!(async_boundary_outlet_name, "AsyncBoundary.out");
180
181mod builder;
182mod executor;
183mod junctions;
184mod ports;
185mod shapes;
186mod stage;
187
188#[cfg(test)]
189use self::executor::BoundaryCountExecutor;
190use self::{builder::StageRecord, shapes::PortAllocator, stage::StageKind};
191
192pub use self::{
193 builder::{
194 AsyncBoundaryExecutionConfig, FusedExecutionConfig, FusedExecutionReport, FusedSegment,
195 FusedTerminalReport, Graph, GraphBlueprint, GraphBuilder, GraphDsl, ImportedGraph,
196 PartialGraph,
197 },
198 junctions::{
199 AsyncBoundary, Balance, Broadcast, Concat, Identity, Interleave, MapStage, Merge,
200 MergeLatest, MergePreferred, MergePrioritized, MergeSequence, MergeSorted, OrElse,
201 Partition, Unzip, UnzipWith, Zip,
202 },
203 ports::{AnyInlet, AnyOutlet, Inlet, Outlet, PortRef},
204 shapes::{
205 BidiShape, FanInShape, FanOutShape, FanOutShape2, FlowShape, MergePreferredShape, Shape,
206 SinkShape, SourceShape, ZipShape,
207 },
208 stage::{AsyncCallback, GraphStage, GraphStageLogic, InHandler, OutHandler, StageSpec},
209};
210
#[cfg(test)]
mod tests {
    //! Unit tests for the graph DSL: wiring validation, port bookkeeping,
    //! fused and async-boundary execution, and the built-in junction stages.

    use super::*;
    use crate::Attributes;

    /// A broadcast wired into both zip inlets pairs every input with itself.
    #[test]
    fn graph_dsl_builds_broadcast_zip_flow() {
        let flow = GraphDsl::try_create(|b| {
            let fan_out = b.add(Broadcast::<i32>::new(2));
            let pairer = b.add(Zip::<i32, i32>::new());

            b.connect(fan_out.outlet(0)?, pairer.in0())?;
            b.connect(fan_out.outlet(1)?, pairer.in1())?;

            Ok(FlowShape::new(fan_out.inlet(), pairer.outlet()))
        })
        .unwrap();

        assert_eq!(flow.stage_count(), 2);
        assert_eq!(flow.edge_count(), 2);
        assert_eq!(flow.shape().inlets().len(), 1);
        assert_eq!(flow.shape().outlets().len(), 1);

        let pairs = flow.run_with_input([1, 2, 3]).unwrap();
        assert_eq!(pairs, vec![(1, 1), (2, 2), (3, 3)]);
    }

    /// Wires `in1` (through an identity) before `in0` and still expects
    /// element-wise pairs.
    #[test]
    fn graph_dsl_zip_slots_follow_inlet_ids() {
        let flow = GraphDsl::try_create(|b| {
            let fan_out = b.add(Broadcast::<i32>::new(2));
            let passthrough = b.add(Identity::<i32>::new());
            let pairer = b.add(Zip::<i32, i32>::new());

            b.connect(fan_out.outlet(0)?, passthrough.inlet())?;
            b.connect(passthrough.outlet(), pairer.in1())?;
            b.connect(fan_out.outlet(1)?, pairer.in0())?;

            Ok(FlowShape::new(fan_out.inlet(), pairer.outlet()))
        })
        .unwrap();

        let pairs = flow.run_with_input([1, 2, 3]).unwrap();
        assert_eq!(pairs, vec![(1, 1), (2, 2), (3, 3)]);
    }

    /// `zip.in0` is fed by a merge of both outlets of a second broadcast, while
    /// `zip.in1` is fed directly from the first broadcast, so the two zip
    /// inlets are supplied through paths of different fan-out.
    #[test]
    fn graph_dsl_zip_buffers_skewed_inlet_arrivals() {
        let flow = GraphDsl::try_create(|b| {
            let fan = b.add(Broadcast::<i32>::new(2));
            let doubler = b.add(Broadcast::<i32>::new(2));
            let funnel = b.add(Merge::<i32>::new(2));
            let pairer = b.add(Zip::<i32, i32>::new());

            b.connect(fan.outlet(0)?, doubler.inlet())?;
            b.connect(doubler.outlet(0)?, funnel.inlet(0)?)?;
            b.connect(doubler.outlet(1)?, funnel.inlet(1)?)?;
            b.connect(funnel.outlet(), pairer.in0())?;
            b.connect(fan.outlet(1)?, pairer.in1())?;

            Ok(FlowShape::new(fan.inlet(), pairer.outlet()))
        })
        .unwrap();

        let pairs = flow.run_with_input([10, 20]).unwrap();
        assert_eq!(pairs, vec![(10, 10), (10, 20)]);
    }

    /// A broadcast outlet looped back into the upstream merge must fail
    /// validation with the expected diagnostic text.
    #[test]
    fn graph_dsl_rejects_cycles() {
        let outcome = GraphDsl::try_create(|b| {
            let funnel = b.add(Merge::<i32>::new(2));
            let fan_out = b.add(Broadcast::<i32>::new(2));
            b.connect(funnel.outlet(), fan_out.inlet())?;
            b.connect(fan_out.outlet(1)?, funnel.inlet(1)?)?;
            Ok(FlowShape::new(funnel.inlet(0)?, fan_out.outlet(0)?))
        });

        let Err(StreamError::GraphValidation(message)) = outcome else {
            panic!("cyclic graph should be rejected");
        };
        assert!(message.contains("WP-16"));
        assert!(message.contains("demand-aware graph interpreter"));
    }

    /// A named partial graph can be imported into another graph and wired
    /// through the shape returned by the import.
    #[test]
    fn partial_graph_can_be_imported_with_its_shape() {
        let fragment = GraphDsl::partial(|b| {
            let head = b.add(Identity::<i32>::new());
            let tail = b.add_named(Identity::<i32>::new(), "partial.tail");
            b.connect(head.outlet(), tail.inlet())?;
            Ok(FlowShape::new(head.inlet(), tail.outlet()))
        })
        .named("partial.identity");

        let outer = GraphDsl::try_create(|b| {
            let imported = b.import(&fragment)?;
            let trailer = b.add(Identity::<i32>::new());
            b.connect(imported.outlet(), trailer.inlet())?;
            Ok(FlowShape::new(imported.inlet(), trailer.outlet()))
        })
        .unwrap()
        .named("outer.graph");

        let echoed = outer.run_with_input([1, 2, 3]).unwrap();
        assert_eq!(echoed, vec![1, 2, 3]);
        assert_eq!(outer.attributes().name(), Some("outer.graph"));
    }

    /// When the same attribute is supplied twice, the value added last is the
    /// one reported, both for stage attributes and for graph attributes.
    #[test]
    fn graph_attributes_follow_innermost_wins_order() {
        let graph = GraphDsl::create(|b| {
            let stage = Identity::<i32>::new();
            let stage_attributes =
                Attributes::named("stage-outer").and(Attributes::named("stage-inner"));
            b.add_with_attributes(stage, stage_attributes)
        })
        .unwrap()
        .add_attributes(Attributes::dispatcher("graph-outer"))
        .add_attributes(Attributes::dispatcher("graph-inner"));

        assert_eq!(graph.attributes().dispatcher_hint(), Some("graph-inner"));

        let stage_name = graph.stages[0].spec.attributes().name();
        assert_eq!(stage_name, Some("stage-inner"));
    }

    /// Connecting an already-connected outlet a second time is rejected, and
    /// both graphs built here fail validation as a whole.
    #[test]
    fn graph_dsl_rejects_invalid_and_incomplete_wiring() {
        let double_wired = GraphDsl::try_create(|b| {
            let first = b.add(Identity::<i32>::new());
            let second = b.add(Identity::<i32>::new());
            let third = b.add(Identity::<i32>::new());

            b.connect(first.outlet(), second.inlet())?;
            let duplicate = b.connect(first.outlet(), third.inlet());
            assert!(matches!(duplicate, Err(StreamError::GraphValidation(_))));

            Ok(FlowShape::new(first.inlet(), second.outlet()))
        });
        assert!(matches!(
            double_wired,
            Err(StreamError::GraphValidation(_))
        ));

        // Only one broadcast outlet is exposed; nothing else is wired.
        let partially_exposed = GraphDsl::create(|b| {
            let fan_out = b.add(Broadcast::<i32>::new(2));
            SourceShape::new(fan_out.outlet(0).unwrap())
        });
        assert!(matches!(
            partially_exposed,
            Err(StreamError::GraphValidation(_))
        ));
    }

    /// Erased ports keep their element type: an `i32` outlet cannot be
    /// connected to a `u64` inlet, and the resulting graph is rejected.
    #[test]
    fn graph_dsl_rejects_erased_type_mismatch() {
        let outcome = GraphDsl::try_create(|b| {
            let narrow = b.add(Identity::<i32>::new());
            let wide = b.add(Identity::<u64>::new());

            let mismatch = b.connect_any(narrow.outlet().erase(), wide.inlet().erase());
            assert!(matches!(mismatch, Err(StreamError::GraphValidation(_))));

            Ok(FlowShape::new(narrow.inlet(), wide.outlet()))
        });

        assert!(matches!(outcome, Err(StreamError::GraphValidation(_))));
    }

    /// A hand-built inlet that reuses a real port's id and name but claims a
    /// different element type is rejected as part of the result shape.
    #[test]
    fn graph_dsl_rejects_result_ports_with_spoofed_metadata() {
        let outcome = GraphDsl::create(|b| {
            let genuine = b.add(Identity::<i32>::new());
            let forged = Inlet::<u64>::with_id(genuine.inlet().id(), "Identity.in");
            FlowShape::new(forged, genuine.outlet())
        });

        assert!(matches!(outcome, Err(StreamError::GraphValidation(_))));
    }

    /// A port created outside the builder does not share its id with a port
    /// allocated by the builder afterwards.
    #[test]
    fn allocated_ports_do_not_collide_with_manual_ports() {
        let manual = Inlet::<i32>::new("manual");

        let outcome = GraphDsl::try_create(|b| {
            let allocated = b.add(Identity::<i32>::new());
            assert_ne!(manual.id(), allocated.inlet().id());
            Ok(allocated)
        });

        assert!(outcome.is_ok());
    }

    /// Two identity stages report the same port names but distinct port ids.
    #[test]
    fn identity_ports_keep_static_names_and_unique_ids() {
        let outcome = GraphDsl::try_create(|b| {
            let first = b.add(Identity::<i32>::new());
            let second = b.add(Identity::<i32>::new());

            assert_eq!(first.inlet().name(), "Identity.in");
            assert_eq!(first.outlet().name(), "Identity.out");
            assert_eq!(second.inlet().name(), "Identity.in");
            assert_eq!(second.outlet().name(), "Identity.out");
            assert_ne!(first.inlet().id(), first.outlet().id());
            assert_ne!(first.outlet().id(), second.inlet().id());
            assert_ne!(second.inlet().id(), second.outlet().id());

            b.connect(first.outlet(), second.inlet())?;
            Ok(FlowShape::new(first.inlet(), second.outlet()))
        });

        assert!(outcome.is_ok());
    }

    /// Walks one inlet and one outlet through their legal state transitions
    /// and checks that out-of-order operations are reported as
    /// `InvalidPortOperation`.
    #[test]
    fn graph_stage_logic_checks_port_operations() {
        let shape = FlowShape::new(Inlet::<i32>::new("in"), Outlet::<i32>::new("out"));
        let inlet = shape.inlet();
        let outlet = shape.outlet();
        let mut logic = GraphStageLogic::new(&shape);

        // Inlet: grabbing before any pull fails; a second pull while one is pending fails.
        let premature_grab = logic.grab(&inlet);
        assert!(matches!(
            premature_grab,
            Err(StreamError::InvalidPortOperation { .. })
        ));
        logic.pull(&inlet).unwrap();
        assert!(logic.has_been_pulled(&inlet));
        let repeated_pull = logic.pull(&inlet);
        assert!(matches!(
            repeated_pull,
            Err(StreamError::InvalidPortOperation { .. })
        ));
        logic.offer(&inlet, 41).unwrap();
        assert!(!logic.has_been_pulled(&inlet));
        assert_eq!(logic.grab(&inlet).unwrap(), 41);

        // Outlet: pushing before a request fails; requesting after completion fails.
        let premature_push = logic.push(&outlet, 1);
        assert!(matches!(
            premature_push,
            Err(StreamError::InvalidPortOperation { .. })
        ));
        logic.request(&outlet).unwrap();
        assert!(logic.is_available(&outlet));
        logic.push(&outlet, 42).unwrap();
        assert!(!logic.is_available(&outlet));
        logic.complete(&outlet).unwrap();
        assert!(logic.is_closed(&outlet));
        let late_request = logic.request(&outlet);
        assert!(matches!(
            late_request,
            Err(StreamError::InvalidPortOperation { .. })
        ));
    }

    /// Running two elements with an event limit of one reports the limit as exceeded.
    #[test]
    fn fused_execution_enforces_event_limit() {
        let graph = GraphDsl::create(|b| b.add(Identity::<i32>::new())).unwrap();
        let tight = FusedExecutionConfig { event_limit: 1 };

        let outcome = graph.run_with_input_report([1, 2], tight);

        assert_eq!(outcome, Err(StreamError::EventLimitExceeded { limit: 1 }));
    }

    /// identity -> async boundary -> map(+1) yields three segments, and the
    /// report counts one boundary crossing per element.
    #[test]
    fn async_boundary_splits_fused_segments() {
        let graph = GraphDsl::try_create(|b| {
            let upstream = b.add(Identity::<i32>::new());
            let boundary = b.add(AsyncBoundary::<i32>::new());
            let downstream = b.add(MapStage::new(|item: i32| item + 1));

            b.connect(upstream.outlet(), boundary.inlet())?;
            b.connect(boundary.outlet(), downstream.inlet())?;

            Ok(FlowShape::new(upstream.inlet(), downstream.outlet()))
        })
        .unwrap();

        assert_eq!(graph.segments().len(), 3);

        let outcome = graph.run_with_input_report([1, 2], FusedExecutionConfig::default());
        let report = outcome.unwrap();
        assert_eq!(report.output, vec![2, 3]);
        assert_eq!(report.async_boundary_crossings, 2);
    }

    /// map(+1) -> async boundary -> map(*2) over three inputs: checks the
    /// count, crossing and event totals reported by the count path.
    #[test]
    fn async_boundary_count_path_uses_ractor_handoff_segments() {
        let graph = GraphDsl::try_create(|b| {
            let upstream = b.add(MapStage::new(|item: i32| item + 1));
            let boundary = b.add(AsyncBoundary::<i32>::new());
            let downstream = b.add(MapStage::new(|item: i32| item * 2));

            b.connect(upstream.outlet(), boundary.inlet())?;
            b.connect(boundary.outlet(), downstream.inlet())?;

            Ok(FlowShape::new(upstream.inlet(), downstream.outlet()))
        })
        .unwrap();

        let config = AsyncBoundaryExecutionConfig {
            fused: FusedExecutionConfig::default(),
            buffer_size: 2,
        };
        let report = graph
            .run_async_boundary_count_with_input_report([1, 2, 3], config)
            .unwrap();

        assert_eq!(report.result, 3);
        assert_eq!(report.async_boundary_crossings, 3);
        assert_eq!(report.events, 18);
    }

    /// The threaded executor produces the same report as the graph's own
    /// async-boundary count path for the same graph, input and configuration.
    #[test]
    fn threaded_async_boundary_baseline_matches_ractor_count_path() {
        let graph = GraphDsl::try_create(|b| {
            let upstream = b.add(MapStage::new(|item: i32| item + 1));
            let boundary = b.add(AsyncBoundary::<i32>::new());
            let downstream = b.add(MapStage::new(|item: i32| item * 2));

            b.connect(upstream.outlet(), boundary.inlet())?;
            b.connect(boundary.outlet(), downstream.inlet())?;

            Ok(FlowShape::new(upstream.inlet(), downstream.outlet()))
        })
        .unwrap();

        let config = AsyncBoundaryExecutionConfig {
            fused: FusedExecutionConfig::default(),
            buffer_size: 2,
        };

        let ractor_report = graph
            .run_async_boundary_count_with_input_report([1, 2, 3], config)
            .unwrap();

        let segments = graph.typed_linear_async_segments().unwrap();
        let threaded_report = BoundaryCountExecutor::Threaded
            .run_count([1, 2, 3], segments, config)
            .unwrap();

        assert_eq!(threaded_report, ractor_report);
    }

    /// A `buffer_size` of zero is rejected with a validation error that names the field.
    #[test]
    fn ractor_async_boundary_rejects_zero_buffer_size() {
        let graph = GraphDsl::try_create(|b| {
            let upstream = b.add(MapStage::new(|item: i32| item + 1));
            let boundary = b.add(AsyncBoundary::<i32>::new());
            let downstream = b.add(MapStage::new(|item: i32| item * 2));

            b.connect(upstream.outlet(), boundary.inlet())?;
            b.connect(boundary.outlet(), downstream.inlet())?;

            Ok(FlowShape::new(upstream.inlet(), downstream.outlet()))
        })
        .unwrap();

        let segments = graph.typed_linear_async_segments().unwrap();
        let unbuffered = AsyncBoundaryExecutionConfig {
            fused: FusedExecutionConfig::default(),
            buffer_size: 0,
        };
        let outcome = BoundaryCountExecutor::Ractor.run_count([1, 2, 3], segments, unbuffered);

        assert!(matches!(
            outcome,
            Err(StreamError::GraphValidation(message)) if message.contains("buffer_size")
        ));
    }

    /// An endless input iterator still terminates, with the event limit
    /// reported as exceeded.
    #[test]
    fn ractor_async_boundary_streams_input_without_eager_collection() {
        let graph = GraphDsl::try_create(|b| {
            let upstream = b.add(MapStage::new(|item: i32| item + 1));
            let boundary = b.add(AsyncBoundary::<i32>::new());
            let downstream = b.add(MapStage::new(|item: i32| item * 2));

            b.connect(upstream.outlet(), boundary.inlet())?;
            b.connect(boundary.outlet(), downstream.inlet())?;

            Ok(FlowShape::new(upstream.inlet(), downstream.outlet()))
        })
        .unwrap();

        let endless = std::iter::repeat(1_i32);
        let segments = graph.typed_linear_async_segments().unwrap();
        let limited = AsyncBoundaryExecutionConfig {
            fused: FusedExecutionConfig { event_limit: 4 },
            buffer_size: 1,
        };
        let outcome = BoundaryCountExecutor::Ractor.run_count(endless, segments, limited);

        assert_eq!(outcome, Err(StreamError::EventLimitExceeded { limit: 4 }));
    }

    /// The Ractor executor can be invoked from inside a future driven by a
    /// current-thread Tokio runtime and reports the same totals as elsewhere.
    #[test]
    fn ractor_async_boundary_runs_inside_existing_tokio_runtime() {
        let graph = GraphDsl::try_create(|b| {
            let upstream = b.add(MapStage::new(|item: i32| item + 1));
            let boundary = b.add(AsyncBoundary::<i32>::new());
            let downstream = b.add(MapStage::new(|item: i32| item * 2));

            b.connect(upstream.outlet(), boundary.inlet())?;
            b.connect(boundary.outlet(), downstream.inlet())?;

            Ok(FlowShape::new(upstream.inlet(), downstream.outlet()))
        })
        .unwrap();

        let runtime = tokio::runtime::Builder::new_current_thread()
            .build()
            .unwrap();
        let outcome = runtime.block_on(async {
            BoundaryCountExecutor::Ractor.run_count(
                [1, 2, 3],
                graph.typed_linear_async_segments().unwrap(),
                AsyncBoundaryExecutionConfig {
                    fused: FusedExecutionConfig::default(),
                    buffer_size: 2,
                },
            )
        });
        let report = outcome.unwrap();

        assert_eq!(report.result, 3);
        assert_eq!(report.async_boundary_crossings, 3);
        assert_eq!(report.events, 18);
    }

    /// Balance feeding a two-input merge returns the input in its original order.
    #[test]
    fn balance_merge_round_robins_through_junctions() {
        let graph = GraphDsl::try_create(|b| {
            let splitter = b.add(Balance::<i32>::new(2));
            let funnel = b.add(Merge::<i32>::new(2));

            b.connect(splitter.outlet(0)?, funnel.inlet(0)?)?;
            b.connect(splitter.outlet(1)?, funnel.inlet(1)?)?;

            Ok(FlowShape::new(splitter.inlet(), funnel.outlet()))
        })
        .unwrap();

        let merged = graph.run_with_input(0..6).unwrap();
        assert_eq!(merged, vec![0, 1, 2, 3, 4, 5]);
    }

    /// With weights `[2, 1]`, two elements are taken from the first input for
    /// each element taken from the second.
    #[test]
    fn prioritized_merge_uses_weighted_schedule() {
        let graph =
            GraphDsl::create(|b| b.add(MergePrioritized::<i32>::new(vec![2, 1]))).unwrap();

        let merged = graph
            .run_fan_in(vec![vec![1, 2, 3, 4], vec![100, 101]])
            .unwrap();

        assert_eq!(merged, vec![1, 2, 100, 3, 4, 101]);
    }

    /// A map(+1) -> map(*2) chain can be drained by counting and by folding.
    #[test]
    fn fused_execution_supports_count_and_fold_sinks() {
        let graph = GraphDsl::try_create(|b| {
            let increment = b.add(MapStage::new(|item: u64| item + 1));
            let double = b.add(MapStage::new(|item: u64| item * 2));

            b.connect(increment.outlet(), double.inlet())?;

            Ok(FlowShape::new(increment.inlet(), double.outlet()))
        })
        .unwrap();

        let counted = graph.run_count_with_input(0..4).unwrap();
        assert_eq!(counted, 4);

        let folded = graph
            .run_fold_with_input(0..4, 0, |acc, item| acc + item)
            .unwrap();
        assert_eq!(folded, 20);
    }

    /// The typed linear path yields the expected output, event total, count
    /// and fold for a map(+1) -> map(*2) chain.
    #[test]
    fn typed_linear_fast_path_runs_same_type_chains() {
        let graph = GraphDsl::try_create(|b| {
            let increment = b.add(MapStage::new(|item: u64| item + 1));
            let double = b.add(MapStage::new(|item: u64| item * 2));

            b.connect(increment.outlet(), double.inlet())?;

            Ok(FlowShape::new(increment.inlet(), double.outlet()))
        })
        .unwrap();

        let report = graph
            .run_typed_linear_with_input_report([1, 2, 3], FusedExecutionConfig::default())
            .unwrap();
        assert_eq!(report.output, vec![4, 6, 8]);
        assert_eq!(report.events, 12);

        let counted = graph.run_typed_linear_count_with_input(0..4).unwrap();
        assert_eq!(counted, 4);

        let folded = graph
            .run_typed_linear_fold_with_input(0..4, 0, |acc, item| acc + item)
            .unwrap();
        assert_eq!(folded, 20);
    }

    /// Graphs that contain junctions are refused by the typed linear path.
    #[test]
    fn typed_linear_fast_path_rejects_junction_graphs() {
        let graph = GraphDsl::try_create(|b| {
            let splitter = b.add(Balance::<i32>::new(2));
            let funnel = b.add(Merge::<i32>::new(2));

            b.connect(splitter.outlet(0)?, funnel.inlet(0)?)?;
            b.connect(splitter.outlet(1)?, funnel.inlet(1)?)?;

            Ok(FlowShape::new(splitter.inlet(), funnel.outlet()))
        })
        .unwrap();

        let outcome = graph.run_typed_linear_count_with_input([1, 2, 3]);
        assert!(matches!(outcome, Err(StreamError::GraphValidation(_))));
    }

    /// All preferred elements are emitted first; the secondaries then alternate.
    #[test]
    fn merge_preferred_drains_preferred_before_secondaries() {
        let graph = GraphDsl::create(|b| b.add(MergePreferred::<i32>::new(2))).unwrap();

        let merged = graph
            .run_merge_preferred(vec![1, 2, 3], vec![vec![100, 101], vec![200]])
            .unwrap();

        assert_eq!(merged, vec![1, 2, 3, 100, 200, 101]);
    }

    /// An empty input on either side does not stop the other input from being delivered.
    #[test]
    fn merge_waits_for_all_inputs_to_complete() {
        let graph = GraphDsl::create(|b| b.add(Merge::<i32>::new(2))).unwrap();

        let right_only = graph.run_fan_in(vec![vec![], vec![10, 20]]).unwrap();
        assert_eq!(right_only, vec![10, 20]);

        let left_only = graph.run_fan_in(vec![vec![1], vec![]]).unwrap();
        assert_eq!(left_only, vec![1]);
    }

    /// Two sorted streams obtained by unzipping pairs are merged into one sorted stream.
    #[test]
    fn merge_sorted_drains_remaining_input_after_peer_completes() {
        let graph = GraphDsl::try_create(|b| {
            let splitter = b.add(Unzip::<i32, i32>::new());
            let sorter = b.add(MergeSorted::<i32>::new());

            b.connect(splitter.out0(), sorter.inlet(0)?)?;
            b.connect(splitter.out1(), sorter.inlet(1)?)?;

            Ok(FlowShape::new(splitter.inlet(), sorter.outlet()))
        })
        .unwrap();

        let sorted = graph.run_with_input([(1, 2), (4, 3), (6, 5)]).unwrap();
        assert_eq!(sorted, vec![1, 2, 3, 4, 5, 6]);
    }

    /// Elements are emitted in the order of the number returned by the
    /// sequence function, which here is the element itself.
    #[test]
    fn merge_sequence_reorders_adversarial_arrivals_by_sequence_number() {
        let graph = GraphDsl::try_create(|b| {
            let splitter = b.add(Unzip::<u64, u64>::new());
            let sequencer = b.add(MergeSequence::<u64>::new(2, |item| *item));

            b.connect(splitter.out0(), sequencer.inlet(0)?)?;
            b.connect(splitter.out1(), sequencer.inlet(1)?)?;

            Ok(FlowShape::new(splitter.inlet(), sequencer.outlet()))
        })
        .unwrap();

        let ordered = graph.run_with_input([(0, 1), (2, 3), (4, 5)]).unwrap();
        assert_eq!(ordered, vec![0, 1, 2, 3, 4, 5]);
    }

    /// With the second constructor argument set to `false`, each emitted
    /// snapshot combines the newest element of one inlet with the last
    /// element seen on the other.
    #[test]
    fn merge_latest_emits_with_last_seen_peer_and_honors_eager_complete() {
        let graph = GraphDsl::try_create(|b| {
            let splitter = b.add(Unzip::<i32, i32>::new());
            let latest = b.add(MergeLatest::<i32>::new(2, false));

            b.connect(splitter.out0(), latest.inlet(0)?)?;
            b.connect(splitter.out1(), latest.inlet(1)?)?;

            Ok(FlowShape::new(splitter.inlet(), latest.outlet()))
        })
        .unwrap();

        let snapshots = graph.run_with_input([(1, 10), (2, 11)]).unwrap();
        assert_eq!(snapshots, vec![vec![1, 10], vec![2, 10], vec![2, 11]]);
    }

    /// The zipped output is as long as the shorter input, including when one input is empty.
    #[test]
    fn zip_completes_when_any_input_completes() {
        let graph = GraphDsl::create(|b| b.add(Zip::<i32, i32>::new())).unwrap();

        let short_right = graph.run_zip(vec![1, 2, 3], vec![10]).unwrap();
        assert_eq!(short_right, vec![(1, 10)]);

        let short_left = graph.run_zip(vec![1], vec![10, 20]).unwrap();
        assert_eq!(short_left, vec![(1, 10)]);

        let empty_left = graph.run_zip(vec![], vec![10, 20]).unwrap();
        assert_eq!(empty_left, Vec::<(i32, i32)>::new());
    }

    /// Inputs are emitted one after another in declaration order; an empty
    /// input in the middle is skipped.
    #[test]
    fn concat_drains_inputs_in_declared_order() {
        let graph = GraphDsl::create(|b| b.add(Concat::<i32>::new(3))).unwrap();

        let joined = graph
            .run_concat(vec![vec![1, 2], vec![], vec![3, 4]])
            .unwrap();

        assert_eq!(joined, vec![1, 2, 3, 4]);
    }

    /// The secondary input is used only when the primary input is empty.
    #[test]
    fn or_else_switches_only_if_primary_is_empty() {
        let graph = GraphDsl::create(|b| b.add(OrElse::<i32>::new())).unwrap();

        let fallback = graph.run_or_else(vec![], vec![10, 20]).unwrap();
        assert_eq!(fallback, vec![10, 20]);

        let primary = graph.run_or_else(vec![1, 2], vec![10, 20]).unwrap();
        assert_eq!(primary, vec![1, 2]);
    }

    /// With the secondary-first runner and a non-empty primary, only the
    /// primary elements reach the output.
    #[test]
    fn or_else_secondary_first_dropped_when_primary_emits() {
        let graph = GraphDsl::create(|b| b.add(OrElse::<i32>::new())).unwrap();

        let emitted = graph
            .run_or_else_secondary_first(vec![1, 2], vec![10, 20])
            .unwrap();

        assert_eq!(emitted, vec![1, 2]);
    }

    /// With the secondary-first runner and an empty primary, the secondary
    /// elements reach the output.
    #[test]
    fn or_else_secondary_first_flushed_when_primary_empty() {
        let graph = GraphDsl::create(|b| b.add(OrElse::<i32>::new())).unwrap();

        let emitted = graph
            .run_or_else_secondary_first(vec![], vec![10, 20])
            .unwrap();

        assert_eq!(emitted, vec![10, 20]);
    }

    /// The secondary-closed-first runner still delivers all secondary elements.
    #[test]
    fn or_else_secondary_closed_then_primary_empty_drains_buffer() {
        let graph = GraphDsl::create(|b| b.add(OrElse::<i32>::new())).unwrap();

        let emitted = graph
            .run_or_else_secondary_closed_first(vec![10, 20])
            .unwrap();

        assert_eq!(emitted, vec![10, 20]);
    }

    /// With three inputs and a segment size of two, up to two elements are
    /// taken from each input in turn until all inputs are drained.
    #[test]
    fn interleave_cycles_segment_sized_chunks() {
        let graph = GraphDsl::create(|b| b.add(Interleave::<i32>::new(3, 2))).unwrap();

        let woven = graph
            .run_interleave(vec![vec![1, 2, 3], vec![10, 11, 12], vec![20]], 2, false)
            .unwrap();

        assert_eq!(woven, vec![1, 2, 10, 11, 20, 3, 12]);
    }

    /// With eager close enabled, output stops once any one input has completed.
    #[test]
    fn interleave_eager_close_stops_when_any_input_completes() {
        let graph = GraphDsl::create(|b| {
            b.add(Interleave::<i32>::new_with_eager_close(2, 1, true))
        })
        .unwrap();

        let with_empty_peer = graph
            .run_interleave(vec![vec![1, 2], vec![]], 1, true)
            .unwrap();
        assert_eq!(with_empty_peer, Vec::<i32>::new());

        let with_short_peer = graph
            .run_interleave(vec![vec![1], vec![10, 11]], 1, true)
            .unwrap();
        assert_eq!(with_short_peer, vec![1, 10]);
    }

    /// Elements are routed by parity into two outlets and merged back; the
    /// merged output equals the input.
    #[test]
    fn partition_routes_only_live_outlets_after_peer_cancels() {
        let graph = GraphDsl::try_create(|b| {
            let router = b.add(Partition::<i32>::new(2, |item| (*item % 2) as usize));
            let funnel = b.add(Merge::<i32>::new(2));

            b.connect(router.outlet(0)?, funnel.inlet(0)?)?;
            b.connect(router.outlet(1)?, funnel.inlet(1)?)?;

            Ok(FlowShape::new(router.inlet(), funnel.outlet()))
        })
        .unwrap();

        let merged = graph.run_with_input([0, 1, 2, 3]).unwrap();
        assert_eq!(merged, vec![0, 1, 2, 3]);
    }

    /// `UnzipWith` splits each element into `(item, item * 10)`; zipping both
    /// outlets reproduces those pairs.
    #[test]
    fn unzip_with_keeps_live_outlet_running_after_peer_finishes() {
        let graph = GraphDsl::try_create(|b| {
            let splitter = b.add(UnzipWith::<i32, i32, i32>::new(|item| (item, item * 10)));
            let pairer = b.add(Zip::<i32, i32>::new());

            b.connect(splitter.out0(), pairer.in0())?;
            b.connect(splitter.out1(), pairer.in1())?;

            Ok(FlowShape::new(splitter.inlet(), pairer.outlet()))
        })
        .unwrap();

        let pairs = graph.run_with_input([1, 2, 3]).unwrap();
        assert_eq!(pairs, vec![(1, 10), (2, 20), (3, 30)]);
    }

    /// The sorted output is the same whichever merge inlet each unzip outlet is wired to.
    #[test]
    fn unzip_merge_sorted_swapped_inlets_still_sorted() {
        let straight = GraphDsl::try_create(|b| {
            let splitter = b.add(Unzip::<i32, i32>::new());
            let sorter = b.add(MergeSorted::<i32>::new());
            b.connect(splitter.out0(), sorter.inlet(0)?)?;
            b.connect(splitter.out1(), sorter.inlet(1)?)?;
            Ok(FlowShape::new(splitter.inlet(), sorter.outlet()))
        })
        .unwrap();

        let crossed = GraphDsl::try_create(|b| {
            let splitter = b.add(Unzip::<i32, i32>::new());
            let sorter = b.add(MergeSorted::<i32>::new());
            b.connect(splitter.out0(), sorter.inlet(1)?)?;
            b.connect(splitter.out1(), sorter.inlet(0)?)?;
            Ok(FlowShape::new(splitter.inlet(), sorter.outlet()))
        })
        .unwrap();

        let input = vec![(1, 2), (4, 3), (6, 5)];
        let expected = vec![1, 2, 3, 4, 5, 6];

        let straight_output = straight.run_with_input(input.clone()).unwrap();
        assert_eq!(straight_output, expected);

        let crossed_output = crossed.run_with_input(input).unwrap();
        assert_eq!(crossed_output, expected);
    }

    /// Snapshot element order follows the merge inlet index, so crossing the
    /// wiring swaps the two values inside the snapshot.
    #[test]
    fn unzip_merge_latest_swapped_inlets_correct_snapshot_order() {
        let straight = GraphDsl::try_create(|b| {
            let splitter = b.add(Unzip::<i32, i32>::new());
            let latest = b.add(MergeLatest::<i32>::new(2, false));
            b.connect(splitter.out0(), latest.inlet(0)?)?;
            b.connect(splitter.out1(), latest.inlet(1)?)?;
            Ok(FlowShape::new(splitter.inlet(), latest.outlet()))
        })
        .unwrap();

        let crossed = GraphDsl::try_create(|b| {
            let splitter = b.add(Unzip::<i32, i32>::new());
            let latest = b.add(MergeLatest::<i32>::new(2, false));
            b.connect(splitter.out0(), latest.inlet(1)?)?;
            b.connect(splitter.out1(), latest.inlet(0)?)?;
            Ok(FlowShape::new(splitter.inlet(), latest.outlet()))
        })
        .unwrap();

        let straight_output = straight.run_with_input([(1, 10)]).unwrap();
        assert_eq!(straight_output, vec![vec![1, 10]]);

        let crossed_output = crossed.run_with_input([(1, 10)]).unwrap();
        assert_eq!(crossed_output, vec![vec![10, 1]]);
    }

    /// Feeding only the values 1 and 2 must fail with a `Failed` error whose
    /// message mentions the expected sequence.
    #[test]
    fn merge_sequence_fails_on_gap_at_completion() {
        let graph = GraphDsl::try_create(|b| {
            let splitter = b.add(Unzip::<u64, u64>::new());
            let sequencer = b.add(MergeSequence::<u64>::new(2, |item| *item));
            b.connect(splitter.out0(), sequencer.inlet(0)?)?;
            b.connect(splitter.out1(), sequencer.inlet(1)?)?;
            Ok(FlowShape::new(splitter.inlet(), sequencer.outlet()))
        })
        .unwrap();

        let result = graph.run_with_input([(1u64, 2u64)]);

        // `ref` keeps `result` intact so it can still be printed in the failure message.
        let is_gap_error = matches!(
            result,
            Err(StreamError::Failed(ref msg)) if msg.contains("expected sequence")
        );
        assert!(
            is_gap_error,
            "expected a sequence-gap error, got: {result:?}"
        );
    }

    /// For a single input pair, the eager and non-eager variants both produce
    /// output, and that output is identical.
    #[test]
    fn merge_latest_eager_complete_closes_on_first_inlet_done() {
        let eager = GraphDsl::try_create(|b| {
            let splitter = b.add(Unzip::<i32, i32>::new());
            let latest = b.add(MergeLatest::<i32>::new(2, true));
            b.connect(splitter.out0(), latest.inlet(0)?)?;
            b.connect(splitter.out1(), latest.inlet(1)?)?;
            Ok(FlowShape::new(splitter.inlet(), latest.outlet()))
        })
        .unwrap();

        let non_eager = GraphDsl::try_create(|b| {
            let splitter = b.add(Unzip::<i32, i32>::new());
            let latest = b.add(MergeLatest::<i32>::new(2, false));
            b.connect(splitter.out0(), latest.inlet(0)?)?;
            b.connect(splitter.out1(), latest.inlet(1)?)?;
            Ok(FlowShape::new(splitter.inlet(), latest.outlet()))
        })
        .unwrap();

        let eager_output = eager.run_with_input([(1i32, 10i32)]).unwrap();
        let non_eager_output = non_eager.run_with_input([(1i32, 10i32)]).unwrap();

        assert!(!eager_output.is_empty(), "eager graph produced no output");
        assert!(
            !non_eager_output.is_empty(),
            "non-eager graph produced no output"
        );

        assert_eq!(eager_output, non_eager_output);
    }
}