1use std::{
9 any::{Any, TypeId, type_name},
10 collections::{HashMap, HashSet, VecDeque},
11 fmt,
12 marker::PhantomData,
13 sync::{
14 Arc, Mutex, OnceLock,
15 atomic::{AtomicUsize, Ordering},
16 },
17 thread,
18};
19
20#[cfg(test)]
21use std::sync::mpsc;
22
23use crate::{
24 actor::{Actor, ActorProcessingErr, ActorRef},
25 stream::{OverflowStrategy, StreamError, StreamResult},
26};
27
/// Owned, type-erased stream element: any value implementing [`DatumElement`], boxed.
type DatumValue = Box<dyn DatumElement>;
/// Erased, fallible transform from one element to one element.
type StageMapFn = dyn Fn(DatumValue) -> StreamResult<DatumValue> + Send + Sync;
/// `Any`-erased companion slot for a map function.
/// NOTE(review): presumably holds the statically typed closure so a typed path
/// can downcast it back — confirm at the use sites.
type StageTypedMapFn = dyn Any + Send + Sync;
/// Erased, fallible combiner turning two elements into one.
type StageZipFn = dyn Fn(DatumValue, DatumValue) -> StreamResult<DatumValue> + Send + Sync;
/// Erased, infallible splitter turning one element into a pair of elements.
type StageUnzipFn = dyn Fn(DatumValue) -> (DatumValue, DatumValue) + Send + Sync;
/// `Any`-erased companion slot for an unzip function (same caveat as `StageTypedMapFn`).
type StageTypedUnzipFn = dyn Any + Send + Sync;
/// Erased comparator over two borrowed elements.
type StageCompareFn = dyn Fn(&DatumValue, &DatumValue) -> std::cmp::Ordering + Send + Sync;
/// Erased function deriving a `u64` from a borrowed element.
/// NOTE(review): presumably the sequence number consumed by `MergeSequence` — confirm.
type StageSequenceFn = dyn Fn(&DatumValue) -> u64 + Send + Sync;
/// `Any`-erased companion slot for a sequence function (same caveat as `StageTypedMapFn`).
type StageTypedSequenceFn = dyn Any + Send + Sync;
/// Erased function building a single element from a slice of borrowed elements.
type StageSnapshotFn = dyn Fn(&[&DatumValue]) -> DatumValue + Send + Sync;
/// `Any`-erased companion slot for a snapshot function (same caveat as `StageTypedMapFn`).
type StageTypedSnapshotFn = dyn Any + Send + Sync;
/// Erased function mapping a borrowed element to a `usize`.
/// NOTE(review): presumably the outlet index chosen by `Partition` — confirm.
type StagePartitionFn = dyn Fn(&DatumValue) -> usize + Send + Sync;
/// `Any`-erased companion slot for a partition function (same caveat as `StageTypedMapFn`).
type StageTypedPartitionFn = dyn Any + Send + Sync;
49
/// Bundles the erased form of a map function with its `Any`-erased counterpart.
///
/// Both fields are `Arc`s, so the derived `Clone` only bumps reference counts.
#[derive(Clone)]
struct StageMapFns {
    /// Callable form operating on boxed, type-erased elements.
    erased: Arc<StageMapFn>,
    /// `Any`-erased slot. NOTE(review): presumably the typed closure, recoverable
    /// by downcast — confirm at the use sites.
    typed: Arc<StageTypedMapFn>,
}
55
56#[derive(Clone)]
64pub(super) enum TypedCyclicOp {
65 BufferPassthrough,
68 TakeWhile(Arc<dyn Any + Send + Sync>),
72}
73
74impl fmt::Debug for TypedCyclicOp {
75 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
76 match self {
77 Self::BufferPassthrough => f.write_str("BufferPassthrough"),
78 Self::TakeWhile(_) => f.write_str("TakeWhile(..)"),
79 }
80 }
81}
82
83pub(crate) trait DatumElement: Any + Send {
84 fn clone_box(&self) -> DatumValue;
85 fn into_any(self: Box<Self>) -> Box<dyn Any + Send>;
86 fn as_any_ref(&self) -> &dyn Any;
87}
88
89impl<T> DatumElement for T
90where
91 T: Any + Clone + Send,
92{
93 fn clone_box(&self) -> DatumValue {
94 Box::new(self.clone())
95 }
96
97 fn into_any(self: Box<Self>) -> Box<dyn Any + Send> {
98 self
99 }
100
101 fn as_any_ref(&self) -> &dyn Any {
102 self
103 }
104}
105
106fn datum<T>(value: T) -> DatumValue
107where
108 T: Clone + Send + 'static,
109{
110 Box::new(value)
111}
112
113fn downcast_datum<T, S>(
114 value: DatumValue,
115 operation: &'static str,
116 port: impl FnOnce() -> S,
117) -> StreamResult<T>
118where
119 T: Send + 'static,
120 S: Into<String>,
121{
122 value
127 .into_any()
128 .downcast::<T>()
129 .map(|value| *value)
130 .map_err(|_| StreamError::InvalidPortOperation {
131 operation,
132 port: port().into(),
133 reason: format!("element type did not match {}", type_name::<T>()),
134 })
135}
136
137static NEXT_PORT_ID: AtomicUsize = AtomicUsize::new(1);
138
139fn next_port_id() -> PortId {
140 PortId(NEXT_PORT_ID.fetch_add(1, Ordering::Relaxed))
141}
142
143fn next_port_id_block(count: usize) -> PortId {
144 debug_assert!(count > 0);
145 PortId(NEXT_PORT_ID.fetch_add(count, Ordering::Relaxed))
146}
147
/// Opaque numeric identifier of a port; ordered and hashable by its raw value.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct PortId(usize);

impl PortId {
    /// Returns the raw numeric value of this id.
    #[must_use]
    pub const fn as_usize(self) -> usize {
        let Self(raw) = self;
        raw
    }

    /// Returns the id lying `offset` positions after this one.
    const fn offset(self, offset: usize) -> Self {
        let Self(base) = self;
        Self(base + offset)
    }
}
164
/// Distinguishes the two kinds of port: inlets and outlets.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum PortKind {
    /// An inlet port.
    Inlet,
    /// An outlet port.
    Outlet,
}
171
/// Defines `fn $fn_name() -> Arc<str>` returning a shared handle to `$value`.
///
/// The string is allocated once, on first call, inside a function-local
/// `OnceLock`; every call returns a clone of that same `Arc`.
macro_rules! shared_name {
    ($fn_name:ident, $value:literal) => {
        fn $fn_name() -> Arc<str> {
            static NAME: OnceLock<Arc<str>> = OnceLock::new();
            let interned: &Arc<str> = NAME.get_or_init(|| Arc::from($value));
            Arc::clone(interned)
        }
    };
}
180
// Interned stage and port names, one accessor function per name.

// Identity
shared_name!(identity_stage_name, "Identity");
shared_name!(identity_inlet_name, "Identity.in");
shared_name!(identity_outlet_name, "Identity.out");
// Map
shared_name!(map_stage_name, "Map");
shared_name!(map_inlet_name, "Map.in");
shared_name!(map_outlet_name, "Map.out");
// Broadcast
shared_name!(broadcast_stage_name, "Broadcast");
shared_name!(broadcast_inlet_name, "Broadcast.in");
// Balance
shared_name!(balance_stage_name, "Balance");
shared_name!(balance_inlet_name, "Balance.in");
// Merge
shared_name!(merge_stage_name, "Merge");
shared_name!(merge_outlet_name, "Merge.out");
// MergePreferred
shared_name!(merge_preferred_stage_name, "MergePreferred");
shared_name!(merge_preferred_preferred_name, "MergePreferred.preferred");
shared_name!(merge_preferred_outlet_name, "MergePreferred.out");
// MergePrioritized
shared_name!(merge_prioritized_stage_name, "MergePrioritized");
shared_name!(merge_prioritized_outlet_name, "MergePrioritized.out");
// Concat
shared_name!(concat_stage_name, "Concat");
shared_name!(concat_outlet_name, "Concat.out");
// OrElse
shared_name!(or_else_stage_name, "OrElse");
shared_name!(or_else_primary_name, "OrElse.primary");
shared_name!(or_else_secondary_name, "OrElse.secondary");
shared_name!(or_else_outlet_name, "OrElse.out");
// Interleave
shared_name!(interleave_stage_name, "Interleave");
shared_name!(interleave_outlet_name, "Interleave.out");
// Zip
shared_name!(zip_stage_name, "Zip");
shared_name!(zip_in0_name, "Zip.in0");
shared_name!(zip_in1_name, "Zip.in1");
shared_name!(zip_outlet_name, "Zip.out");
// AsyncBoundary
shared_name!(async_boundary_stage_name, "AsyncBoundary");
shared_name!(async_boundary_inlet_name, "AsyncBoundary.in");
shared_name!(async_boundary_outlet_name, "AsyncBoundary.out");
213
214mod builder;
215mod executor;
216mod junctions;
217mod ports;
218mod shapes;
219mod stage;
220mod wire;
221
222#[cfg(test)]
223use self::executor::BoundaryCountExecutor;
224use self::{
225 builder::StageRecord,
226 stage::{StageKind, StageTimerMailbox, StageTimerRuntime},
227};
228
229pub use self::{
230 builder::{
231 AsyncBoundaryExecutionConfig, FusedExecutionConfig, FusedExecutionReport, FusedSegment,
232 FusedTerminalReport, Graph, GraphBlueprint, GraphBuilder, GraphDsl, ImportedGraph,
233 PartialGraph,
234 },
235 junctions::{
236 AsyncBoundary, Balance, Broadcast, Buffer, Concat, Identity, Interleave, MapStage, Merge,
237 MergeLatest, MergePreferred, MergePrioritized, MergeSequence, MergeSorted, OrElse,
238 Partition, TakeWhile, Unzip, UnzipWith, Zip,
239 },
240 ports::{AnyInlet, AnyOutlet, Inlet, Outlet, PortRef},
241 shapes::{
242 BidiShape, FanInShape, FanOutShape, FanOutShape2, FlowShape, MergePreferredShape,
243 PortAllocator, Shape, SinkShape, SourceShape, ZipShape,
244 },
245 stage::{
246 AsyncCallback, GraphStage, GraphStageLogic, InHandler, OutHandler, StageSpec, TimerHandler,
247 },
248 wire::{
249 AutoInletEndpoint, AutoOutletEndpoint, InletCursor, OutletCursor, WireDsl, WirePair,
250 WireSpec,
251 },
252};
253
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Attributes;

    /// Broadcast feeds both `Zip` inlets, so each input is expected to pair with itself.
    #[test]
    fn graph_dsl_builds_broadcast_zip_flow() {
        let graph = GraphDsl::try_create(|b| {
            let fan_out = b.add(Broadcast::<i32>::new(2));
            let pair = b.add(Zip::<i32, i32>::new());

            b.connect(fan_out.outlet(0)?, pair.in0())?;
            b.connect(fan_out.outlet(1)?, pair.in1())?;

            Ok(FlowShape::new(fan_out.inlet(), pair.outlet()))
        })
        .unwrap();

        // Structural checks first, then the end-to-end run.
        assert_eq!(graph.stage_count(), 2);
        assert_eq!(graph.edge_count(), 2);
        assert_eq!(graph.shape().inlets().len(), 1);
        assert_eq!(graph.shape().outlets().len(), 1);

        let output = graph.run_with_input([1, 2, 3]).unwrap();
        assert_eq!(output, vec![(1, 1), (2, 2), (3, 3)]);
    }

    /// Wires `in1` before `in0` (through an extra Identity) and expects the same pairs.
    #[test]
    fn graph_dsl_zip_slots_follow_inlet_ids() {
        let graph = GraphDsl::try_create(|b| {
            let fan_out = b.add(Broadcast::<i32>::new(2));
            let relay = b.add(Identity::<i32>::new());
            let pair = b.add(Zip::<i32, i32>::new());

            b.connect(fan_out.outlet(0)?, relay.inlet())?;
            b.connect(relay.outlet(), pair.in1())?;
            b.connect(fan_out.outlet(1)?, pair.in0())?;

            Ok(FlowShape::new(fan_out.inlet(), pair.outlet()))
        })
        .unwrap();

        let output = graph.run_with_input([1, 2, 3]).unwrap();
        assert_eq!(output, vec![(1, 1), (2, 2), (3, 3)]);
    }

    /// `in0` is fed through a second Broadcast merged back together, `in1` directly;
    /// the expected pairs for `[10, 20]` are `(10, 10)` and `(10, 20)`.
    #[test]
    fn graph_dsl_zip_buffers_skewed_inlet_arrivals() {
        let graph = GraphDsl::try_create(|b| {
            let source_fan = b.add(Broadcast::<i32>::new(2));
            let duplicator = b.add(Broadcast::<i32>::new(2));
            let rejoin = b.add(Merge::<i32>::new(2));
            let pair = b.add(Zip::<i32, i32>::new());

            b.connect(source_fan.outlet(0)?, duplicator.inlet())?;
            b.connect(duplicator.outlet(0)?, rejoin.inlet(0)?)?;
            b.connect(duplicator.outlet(1)?, rejoin.inlet(1)?)?;
            b.connect(rejoin.outlet(), pair.in0())?;
            b.connect(source_fan.outlet(1)?, pair.in1())?;

            Ok(FlowShape::new(source_fan.inlet(), pair.outlet()))
        })
        .unwrap();

        let output = graph.run_with_input([10, 20]).unwrap();
        assert_eq!(output, vec![(10, 10), (10, 20)]);
    }

    /// Feedback loop through a buffer into the preferred inlet of `MergePreferred`;
    /// an input of 3 is expected to count down to 0.
    #[test]
    fn graph_dsl_runs_merge_preferred_buffered_feedback_cycle() {
        let graph = GraphDsl::try_create(|b| {
            let entry = b.add(MergePreferred::<i32>::new(1));
            let split = b.add(Broadcast::<i32>::new(2));
            let loop_buffer = b.add(Buffer::<i32>::new(8, OverflowStrategy::Backpressure));
            let while_positive = b.add(TakeWhile::<i32>::new(|item| *item > 0));
            let step_down = b.add(MapStage::new(|item: i32| item - 1));

            b.connect(entry.outlet(), split.inlet())?;
            b.connect(split.outlet(1)?, loop_buffer.inlet())?;
            b.connect(loop_buffer.outlet(), while_positive.inlet())?;
            b.connect(while_positive.outlet(), step_down.inlet())?;
            b.connect(step_down.outlet(), entry.preferred())?;

            Ok(FlowShape::new(entry.secondary(0)?, split.outlet(0)?))
        })
        .unwrap();

        let output = graph.run_with_input([3]).unwrap();
        assert_eq!(output, vec![3, 2, 1, 0]);
    }

    /// Same countdown loop as above, but closed through a plain `Merge` inlet.
    #[test]
    fn graph_dsl_runs_buffered_merge_feedback_cycle() {
        let graph = GraphDsl::try_create(|b| {
            let entry = b.add(Merge::<i32>::new(2));
            let split = b.add(Broadcast::<i32>::new(2));
            let loop_buffer = b.add(Buffer::<i32>::new(8, OverflowStrategy::Backpressure));
            let while_positive = b.add(TakeWhile::<i32>::new(|item| *item > 0));
            let step_down = b.add(MapStage::new(|item: i32| item - 1));

            b.connect(entry.outlet(), split.inlet())?;
            b.connect(split.outlet(1)?, loop_buffer.inlet())?;
            b.connect(loop_buffer.outlet(), while_positive.inlet())?;
            b.connect(while_positive.outlet(), step_down.inlet())?;
            b.connect(step_down.outlet(), entry.inlet(1)?)?;

            Ok(FlowShape::new(entry.inlet(0)?, split.outlet(0)?))
        })
        .unwrap();

        let output = graph.run_with_input([3]).unwrap();
        assert_eq!(output, vec![3, 2, 1, 0]);
    }

    /// A Merge/Broadcast loop with nothing to stop it must end with the event-limit error.
    #[test]
    fn graph_dsl_unbuffered_merge_cycle_surfaces_event_limit() {
        let graph = GraphDsl::try_create(|b| {
            let entry = b.add(Merge::<i32>::new(2));
            let split = b.add(Broadcast::<i32>::new(2));

            b.connect(entry.outlet(), split.inlet())?;
            b.connect(split.outlet(1)?, entry.inlet(1)?)?;

            Ok(FlowShape::new(entry.inlet(0)?, split.outlet(0)?))
        })
        .unwrap();

        let outcome = graph.run_with_input_report([1], FusedExecutionConfig { event_limit: 256 });

        assert_eq!(outcome, Err(StreamError::EventLimitExceeded { limit: 256 }));
    }

    /// Imports a named partial graph into an outer graph and runs the combination.
    #[test]
    fn partial_graph_can_be_imported_with_its_shape() {
        let partial = GraphDsl::partial(|b| {
            let head = b.add(Identity::<i32>::new());
            let tail = b.add_named(Identity::<i32>::new(), "partial.tail");
            b.connect(head.outlet(), tail.inlet())?;
            Ok(FlowShape::new(head.inlet(), tail.outlet()))
        })
        .named("partial.identity");

        let graph = GraphDsl::try_create(|b| {
            let inner = b.import(&partial)?;
            let trailing = b.add(Identity::<i32>::new());
            b.connect(inner.outlet(), trailing.inlet())?;
            Ok(FlowShape::new(inner.inlet(), trailing.outlet()))
        })
        .unwrap()
        .named("outer.graph");

        let output = graph.run_with_input([1, 2, 3]).unwrap();
        assert_eq!(output, vec![1, 2, 3]);
        assert_eq!(graph.attributes().name(), Some("outer.graph"));
    }

    /// When the same attribute is supplied twice, the one added last is reported.
    #[test]
    fn graph_attributes_follow_innermost_wins_order() {
        let graph = GraphDsl::create(|b| {
            b.add_with_attributes(
                Identity::<i32>::new(),
                Attributes::named("stage-outer").and(Attributes::named("stage-inner")),
            )
        })
        .unwrap();
        let graph = graph
            .add_attributes(Attributes::dispatcher("graph-outer"))
            .add_attributes(Attributes::dispatcher("graph-inner"));

        assert_eq!(graph.attributes().dispatcher_hint(), Some("graph-inner"));

        let stage_name = graph.stages[0].spec.attributes().name();
        assert_eq!(stage_name, Some("stage-inner"));
    }

    /// Reusing an outlet fails at `connect`, and both graphs below are rejected at build time.
    #[test]
    fn graph_dsl_rejects_invalid_and_incomplete_wiring() {
        let reused_outlet = GraphDsl::try_create(|b| {
            let first = b.add(Identity::<i32>::new());
            let second = b.add(Identity::<i32>::new());
            let third = b.add(Identity::<i32>::new());

            b.connect(first.outlet(), second.inlet())?;
            // Second use of the same outlet is refused immediately.
            let second_attempt = b.connect(first.outlet(), third.inlet());
            assert!(matches!(
                second_attempt,
                Err(StreamError::GraphValidation(_))
            ));

            Ok(FlowShape::new(first.inlet(), second.outlet()))
        });
        assert!(matches!(
            reused_outlet,
            Err(StreamError::GraphValidation(_))
        ));

        // Only one of the two Broadcast outlets is exposed; nothing else is wired.
        let partially_exposed = GraphDsl::create(|b| {
            let fan_out = b.add(Broadcast::<i32>::new(2));
            SourceShape::new(fan_out.outlet(0).unwrap())
        });
        assert!(matches!(
            partially_exposed,
            Err(StreamError::GraphValidation(_))
        ));
    }

    /// Connecting erased ports of different element types is a validation error.
    #[test]
    fn graph_dsl_rejects_erased_type_mismatch() {
        let outcome = GraphDsl::try_create(|b| {
            let ints = b.add(Identity::<i32>::new());
            let longs = b.add(Identity::<u64>::new());
            let attempt = b.connect_any(ints.outlet().erase(), longs.inlet().erase());
            assert!(matches!(attempt, Err(StreamError::GraphValidation(_))));
            Ok(FlowShape::new(ints.inlet(), longs.outlet()))
        });

        assert!(matches!(outcome, Err(StreamError::GraphValidation(_))));
    }

    /// A hand-built inlet that reuses a real id with another element type is rejected.
    #[test]
    fn graph_dsl_rejects_result_ports_with_spoofed_metadata() {
        let outcome = GraphDsl::create(|b| {
            let real = b.add(Identity::<i32>::new());
            FlowShape::new(
                Inlet::<u64>::with_id(real.inlet().id(), "Identity.in"),
                real.outlet(),
            )
        });

        assert!(matches!(outcome, Err(StreamError::GraphValidation(_))));
    }

    /// A manually created inlet and a builder-allocated inlet get different ids.
    #[test]
    fn allocated_ports_do_not_collide_with_manual_ports() {
        let manual = Inlet::<i32>::new("manual");
        let outcome = GraphDsl::try_create(|b| {
            let allocated = b.add(Identity::<i32>::new());
            assert_ne!(manual.id(), allocated.inlet().id());
            Ok(allocated)
        });

        assert!(outcome.is_ok());
    }

    /// Two Identity stages share port names but never port ids.
    #[test]
    fn identity_ports_keep_static_names_and_unique_ids() {
        let outcome = GraphDsl::try_create(|b| {
            let head = b.add(Identity::<i32>::new());
            let tail = b.add(Identity::<i32>::new());

            assert_eq!(head.inlet().name(), "Identity.in");
            assert_eq!(head.outlet().name(), "Identity.out");
            assert_eq!(tail.inlet().name(), "Identity.in");
            assert_eq!(tail.outlet().name(), "Identity.out");
            assert_ne!(head.inlet().id(), head.outlet().id());
            assert_ne!(head.outlet().id(), tail.inlet().id());
            assert_ne!(tail.inlet().id(), tail.outlet().id());

            b.connect(head.outlet(), tail.inlet())?;
            Ok(FlowShape::new(head.inlet(), tail.outlet()))
        });

        assert!(outcome.is_ok());
    }

    /// Walks one inlet and one outlet through their legal and illegal operations in order.
    #[test]
    fn graph_stage_logic_checks_port_operations() {
        let shape = FlowShape::new(Inlet::<i32>::new("in"), Outlet::<i32>::new("out"));
        let inlet = shape.inlet();
        let outlet = shape.outlet();
        let mut logic = GraphStageLogic::new(&shape);

        // Inlet side: grab before pull fails, as does a second pull.
        let premature_grab = logic.grab(&inlet);
        assert!(matches!(
            premature_grab,
            Err(StreamError::InvalidPortOperation { .. })
        ));
        logic.pull(&inlet).unwrap();
        assert!(logic.has_been_pulled(&inlet));
        let double_pull = logic.pull(&inlet);
        assert!(matches!(
            double_pull,
            Err(StreamError::InvalidPortOperation { .. })
        ));
        logic.offer(&inlet, 41).unwrap();
        assert!(!logic.has_been_pulled(&inlet));
        assert_eq!(logic.grab(&inlet).unwrap(), 41);

        // Outlet side: push before request fails, as does request after complete.
        let premature_push = logic.push(&outlet, 1);
        assert!(matches!(
            premature_push,
            Err(StreamError::InvalidPortOperation { .. })
        ));
        logic.request(&outlet).unwrap();
        assert!(logic.is_available(&outlet));
        logic.push(&outlet, 42).unwrap();
        assert!(!logic.is_available(&outlet));
        logic.complete(&outlet).unwrap();
        assert!(logic.is_closed(&outlet));
        let request_after_close = logic.request(&outlet);
        assert!(matches!(
            request_after_close,
            Err(StreamError::InvalidPortOperation { .. })
        ));
    }

    /// An event limit of 1 is too small for two inputs through a single Identity.
    #[test]
    fn fused_execution_enforces_event_limit() {
        let graph = GraphDsl::create(|b| b.add(Identity::<i32>::new())).unwrap();

        let outcome = graph.run_with_input_report([1, 2], FusedExecutionConfig { event_limit: 1 });

        assert_eq!(outcome, Err(StreamError::EventLimitExceeded { limit: 1 }));
    }

    /// Identity, AsyncBoundary and Map are reported as three segments with two crossings.
    #[test]
    fn async_boundary_splits_fused_segments() {
        let graph = GraphDsl::try_create(|b| {
            let upstream = b.add(Identity::<i32>::new());
            let boundary = b.add(AsyncBoundary::<i32>::new());
            let downstream = b.add(MapStage::new(|item: i32| item + 1));

            b.connect(upstream.outlet(), boundary.inlet())?;
            b.connect(boundary.outlet(), downstream.inlet())?;

            Ok(FlowShape::new(upstream.inlet(), downstream.outlet()))
        })
        .unwrap();

        assert_eq!(graph.segments().len(), 3);

        let report = graph
            .run_with_input_report([1, 2], FusedExecutionConfig::default())
            .unwrap();
        assert_eq!(report.output, vec![2, 3]);
        assert_eq!(report.async_boundary_crossings, 2);
    }

    /// Count run across a Map/AsyncBoundary/Map chain with a boundary buffer of 2.
    #[test]
    fn async_boundary_count_path_uses_ractor_handoff_segments() {
        let graph = GraphDsl::try_create(|b| {
            let upstream = b.add(MapStage::new(|item: i32| item + 1));
            let boundary = b.add(AsyncBoundary::<i32>::new());
            let downstream = b.add(MapStage::new(|item: i32| item * 2));

            b.connect(upstream.outlet(), boundary.inlet())?;
            b.connect(boundary.outlet(), downstream.inlet())?;

            Ok(FlowShape::new(upstream.inlet(), downstream.outlet()))
        })
        .unwrap();

        let config = AsyncBoundaryExecutionConfig {
            fused: FusedExecutionConfig::default(),
            buffer_size: 2,
        };
        let report = graph
            .run_async_boundary_count_with_input_report([1, 2, 3], config)
            .unwrap();

        assert_eq!(report.result, 3);
        assert_eq!(report.async_boundary_crossings, 3);
        assert_eq!(report.events, 18);
    }

    /// The threaded executor must produce the same report as the default count path.
    #[test]
    fn threaded_async_boundary_baseline_matches_ractor_count_path() {
        let graph = GraphDsl::try_create(|b| {
            let upstream = b.add(MapStage::new(|item: i32| item + 1));
            let boundary = b.add(AsyncBoundary::<i32>::new());
            let downstream = b.add(MapStage::new(|item: i32| item * 2));

            b.connect(upstream.outlet(), boundary.inlet())?;
            b.connect(boundary.outlet(), downstream.inlet())?;

            Ok(FlowShape::new(upstream.inlet(), downstream.outlet()))
        })
        .unwrap();

        let config = AsyncBoundaryExecutionConfig {
            fused: FusedExecutionConfig::default(),
            buffer_size: 2,
        };
        let ractor_report = graph
            .run_async_boundary_count_with_input_report([1, 2, 3], config)
            .unwrap();
        let threaded_report = BoundaryCountExecutor::Threaded
            .run_count(
                [1, 2, 3],
                graph.typed_linear_async_segments().unwrap(),
                config,
            )
            .unwrap();

        assert_eq!(threaded_report, ractor_report);
    }

    /// A boundary buffer size of 0 is rejected with a message naming `buffer_size`.
    #[test]
    fn ractor_async_boundary_rejects_zero_buffer_size() {
        let graph = GraphDsl::try_create(|b| {
            let upstream = b.add(MapStage::new(|item: i32| item + 1));
            let boundary = b.add(AsyncBoundary::<i32>::new());
            let downstream = b.add(MapStage::new(|item: i32| item * 2));

            b.connect(upstream.outlet(), boundary.inlet())?;
            b.connect(boundary.outlet(), downstream.inlet())?;

            Ok(FlowShape::new(upstream.inlet(), downstream.outlet()))
        })
        .unwrap();

        let result = BoundaryCountExecutor::Ractor.run_count(
            [1, 2, 3],
            graph.typed_linear_async_segments().unwrap(),
            AsyncBoundaryExecutionConfig {
                fused: FusedExecutionConfig::default(),
                buffer_size: 0,
            },
        );

        assert!(matches!(
            result,
            Err(StreamError::GraphValidation(message)) if message.contains("buffer_size")
        ));
    }

    /// An endless input must terminate via the event limit rather than never returning.
    #[test]
    fn ractor_async_boundary_streams_input_without_eager_collection() {
        let graph = GraphDsl::try_create(|b| {
            let upstream = b.add(MapStage::new(|item: i32| item + 1));
            let boundary = b.add(AsyncBoundary::<i32>::new());
            let downstream = b.add(MapStage::new(|item: i32| item * 2));

            b.connect(upstream.outlet(), boundary.inlet())?;
            b.connect(boundary.outlet(), downstream.inlet())?;

            Ok(FlowShape::new(upstream.inlet(), downstream.outlet()))
        })
        .unwrap();

        let outcome = BoundaryCountExecutor::Ractor.run_count(
            std::iter::repeat(1_i32),
            graph.typed_linear_async_segments().unwrap(),
            AsyncBoundaryExecutionConfig {
                fused: FusedExecutionConfig { event_limit: 4 },
                buffer_size: 1,
            },
        );

        assert_eq!(outcome, Err(StreamError::EventLimitExceeded { limit: 4 }));
    }

    /// The count run is invoked from inside `block_on` of a current-thread Tokio runtime.
    #[test]
    fn ractor_async_boundary_runs_inside_existing_tokio_runtime() {
        let graph = GraphDsl::try_create(|b| {
            let upstream = b.add(MapStage::new(|item: i32| item + 1));
            let boundary = b.add(AsyncBoundary::<i32>::new());
            let downstream = b.add(MapStage::new(|item: i32| item * 2));

            b.connect(upstream.outlet(), boundary.inlet())?;
            b.connect(boundary.outlet(), downstream.inlet())?;

            Ok(FlowShape::new(upstream.inlet(), downstream.outlet()))
        })
        .unwrap();

        let runtime = tokio::runtime::Builder::new_current_thread()
            .build()
            .unwrap();
        let outcome = runtime.block_on(async {
            BoundaryCountExecutor::Ractor.run_count(
                [1, 2, 3],
                graph.typed_linear_async_segments().unwrap(),
                AsyncBoundaryExecutionConfig {
                    fused: FusedExecutionConfig::default(),
                    buffer_size: 2,
                },
            )
        });
        let report = outcome.unwrap();

        assert_eq!(report.result, 3);
        assert_eq!(report.async_boundary_crossings, 3);
        assert_eq!(report.events, 18);
    }

    /// Balance into Merge is expected to hand `0..6` back in the original order.
    #[test]
    fn balance_merge_round_robins_through_junctions() {
        let graph = GraphDsl::try_create(|b| {
            let spread = b.add(Balance::<i32>::new(2));
            let rejoin = b.add(Merge::<i32>::new(2));

            b.connect(spread.outlet(0)?, rejoin.inlet(0)?)?;
            b.connect(spread.outlet(1)?, rejoin.inlet(1)?)?;

            Ok(FlowShape::new(spread.inlet(), rejoin.outlet()))
        })
        .unwrap();

        let output = graph.run_with_input(0..6).unwrap();
        assert_eq!(output, vec![0, 1, 2, 3, 4, 5]);
    }

    /// Weights `[2, 1]` are expected to yield two items from the first input per one from the second.
    #[test]
    fn prioritized_merge_uses_weighted_schedule() {
        let graph =
            GraphDsl::create(|b| b.add(MergePrioritized::<i32>::new(vec![2, 1]))).unwrap();

        let output = graph
            .run_fan_in(vec![vec![1, 2, 3, 4], vec![100, 101]])
            .unwrap();
        assert_eq!(output, vec![1, 2, 100, 3, 4, 101]);
    }

    /// Count and fold runs over a two-stage map chain (`+1`, then `*2`).
    #[test]
    fn fused_execution_supports_count_and_fold_sinks() {
        let graph = GraphDsl::try_create(|b| {
            let increment = b.add(MapStage::new(|item: u64| item + 1));
            let double = b.add(MapStage::new(|item: u64| item * 2));

            b.connect(increment.outlet(), double.inlet())?;

            Ok(FlowShape::new(increment.inlet(), double.outlet()))
        })
        .unwrap();

        let counted = graph.run_count_with_input(0..4).unwrap();
        assert_eq!(counted, 4);

        let folded = graph
            .run_fold_with_input(0..4, 0, |acc, item| acc + item)
            .unwrap();
        assert_eq!(folded, 20);
    }

    /// The typed-linear entry points run the same map chain for collect, count and fold.
    #[test]
    fn typed_linear_fast_path_runs_same_type_chains() {
        let graph = GraphDsl::try_create(|b| {
            let increment = b.add(MapStage::new(|item: u64| item + 1));
            let double = b.add(MapStage::new(|item: u64| item * 2));

            b.connect(increment.outlet(), double.inlet())?;

            Ok(FlowShape::new(increment.inlet(), double.outlet()))
        })
        .unwrap();

        let report = graph
            .run_typed_linear_with_input_report([1, 2, 3], FusedExecutionConfig::default())
            .unwrap();
        assert_eq!(report.output, vec![4, 6, 8]);
        assert_eq!(report.events, 12);

        let counted = graph.run_typed_linear_count_with_input(0..4).unwrap();
        assert_eq!(counted, 4);

        let folded = graph
            .run_typed_linear_fold_with_input(0..4, 0, |acc, item| acc + item)
            .unwrap();
        assert_eq!(folded, 20);
    }

    /// A graph containing junctions is refused by the typed-linear count entry point.
    #[test]
    fn typed_linear_fast_path_rejects_junction_graphs() {
        let graph = GraphDsl::try_create(|b| {
            let spread = b.add(Balance::<i32>::new(2));
            let rejoin = b.add(Merge::<i32>::new(2));

            b.connect(spread.outlet(0)?, rejoin.inlet(0)?)?;
            b.connect(spread.outlet(1)?, rejoin.inlet(1)?)?;

            Ok(FlowShape::new(spread.inlet(), rejoin.outlet()))
        })
        .unwrap();

        let outcome = graph.run_typed_linear_count_with_input([1, 2, 3]);
        assert!(matches!(outcome, Err(StreamError::GraphValidation(_))));
    }

    /// All preferred items are expected ahead of any secondary item.
    #[test]
    fn merge_preferred_drains_preferred_before_secondaries() {
        let graph = GraphDsl::create(|b| b.add(MergePreferred::<i32>::new(2))).unwrap();

        let output = graph
            .run_merge_preferred(vec![1, 2, 3], vec![vec![100, 101], vec![200]])
            .unwrap();
        assert_eq!(output, vec![1, 2, 3, 100, 200, 101]);
    }

    /// An empty input on either side must not cut the other side short.
    #[test]
    fn merge_waits_for_all_inputs_to_complete() {
        let graph = GraphDsl::create(|b| b.add(Merge::<i32>::new(2))).unwrap();

        let left_empty = graph.run_fan_in(vec![vec![], vec![10, 20]]).unwrap();
        assert_eq!(left_empty, vec![10, 20]);

        let right_empty = graph.run_fan_in(vec![vec![1], vec![]]).unwrap();
        assert_eq!(right_empty, vec![1]);
    }

    /// Unzipped pairs fed into `MergeSorted` are expected to come out fully sorted.
    #[test]
    fn merge_sorted_drains_remaining_input_after_peer_completes() {
        let graph = GraphDsl::try_create(|b| {
            let split = b.add(Unzip::<i32, i32>::new());
            let sorted = b.add(MergeSorted::<i32>::new());

            b.connect(split.out0(), sorted.inlet(0)?)?;
            b.connect(split.out1(), sorted.inlet(1)?)?;

            Ok(FlowShape::new(split.inlet(), sorted.outlet()))
        })
        .unwrap();

        let output = graph.run_with_input([(1, 2), (4, 3), (6, 5)]).unwrap();
        assert_eq!(output, vec![1, 2, 3, 4, 5, 6]);
    }

    /// Items keyed by their own value are expected in ascending sequence order.
    #[test]
    fn merge_sequence_reorders_adversarial_arrivals_by_sequence_number() {
        let graph = GraphDsl::try_create(|b| {
            let split = b.add(Unzip::<u64, u64>::new());
            let sequenced = b.add(MergeSequence::<u64>::new(2, |item| *item));

            b.connect(split.out0(), sequenced.inlet(0)?)?;
            b.connect(split.out1(), sequenced.inlet(1)?)?;

            Ok(FlowShape::new(split.inlet(), sequenced.outlet()))
        })
        .unwrap();

        let output = graph.run_with_input([(0, 1), (2, 3), (4, 5)]).unwrap();
        assert_eq!(output, vec![0, 1, 2, 3, 4, 5]);
    }

    /// Pins the exact snapshot sequence of a non-eager `MergeLatest` for two input pairs.
    #[test]
    fn merge_latest_emits_with_last_seen_peer_and_honors_eager_complete() {
        let graph = GraphDsl::try_create(|b| {
            let split = b.add(Unzip::<i32, i32>::new());
            let latest = b.add(MergeLatest::<i32>::new(2, false));

            b.connect(split.out0(), latest.inlet(0)?)?;
            b.connect(split.out1(), latest.inlet(1)?)?;

            Ok(FlowShape::new(split.inlet(), latest.outlet()))
        })
        .unwrap();

        let output = graph.run_with_input([(1, 10), (2, 11)]).unwrap();
        assert_eq!(output, vec![vec![1, 10], vec![2, 10], vec![2, 11]]);
    }

    /// The shorter input bounds the number of pairs; an empty input yields none.
    #[test]
    fn zip_completes_when_any_input_completes() {
        let graph = GraphDsl::create(|b| b.add(Zip::<i32, i32>::new())).unwrap();

        let right_short = graph.run_zip(vec![1, 2, 3], vec![10]).unwrap();
        assert_eq!(right_short, vec![(1, 10)]);

        let left_short = graph.run_zip(vec![1], vec![10, 20]).unwrap();
        assert_eq!(left_short, vec![(1, 10)]);

        let left_empty = graph.run_zip(vec![], vec![10, 20]).unwrap();
        assert_eq!(left_empty, Vec::<(i32, i32)>::new());
    }

    /// Inputs are expected back to back in declaration order; the empty one adds nothing.
    #[test]
    fn concat_drains_inputs_in_declared_order() {
        let graph = GraphDsl::create(|b| b.add(Concat::<i32>::new(3))).unwrap();

        let output = graph
            .run_concat(vec![vec![1, 2], vec![], vec![3, 4]])
            .unwrap();
        assert_eq!(output, vec![1, 2, 3, 4]);
    }

    /// The secondary input is used only when the primary is empty.
    #[test]
    fn or_else_switches_only_if_primary_is_empty() {
        let graph = GraphDsl::create(|b| b.add(OrElse::<i32>::new())).unwrap();

        let primary_empty = graph.run_or_else(vec![], vec![10, 20]).unwrap();
        assert_eq!(primary_empty, vec![10, 20]);

        let primary_present = graph.run_or_else(vec![1, 2], vec![10, 20]).unwrap();
        assert_eq!(primary_present, vec![1, 2]);
    }

    /// Secondary-first run with a non-empty primary: only primary items are expected.
    #[test]
    fn or_else_secondary_first_dropped_when_primary_emits() {
        let graph = GraphDsl::create(|b| b.add(OrElse::<i32>::new())).unwrap();

        let output = graph
            .run_or_else_secondary_first(vec![1, 2], vec![10, 20])
            .unwrap();
        assert_eq!(output, vec![1, 2]);
    }

    /// Secondary-first run with an empty primary: the secondary items are expected.
    #[test]
    fn or_else_secondary_first_flushed_when_primary_empty() {
        let graph = GraphDsl::create(|b| b.add(OrElse::<i32>::new())).unwrap();

        let output = graph
            .run_or_else_secondary_first(vec![], vec![10, 20])
            .unwrap();
        assert_eq!(output, vec![10, 20]);
    }

    /// Secondary-closed-first run: the secondary items are still expected in full.
    #[test]
    fn or_else_secondary_closed_then_primary_empty_drains_buffer() {
        let graph = GraphDsl::create(|b| b.add(OrElse::<i32>::new())).unwrap();

        let output = graph
            .run_or_else_secondary_closed_first(vec![10, 20])
            .unwrap();
        assert_eq!(output, vec![10, 20]);
    }

    /// Three inputs interleaved in chunks of two.
    #[test]
    fn interleave_cycles_segment_sized_chunks() {
        let graph = GraphDsl::create(|b| b.add(Interleave::<i32>::new(3, 2))).unwrap();

        let output = graph
            .run_interleave(vec![vec![1, 2, 3], vec![10, 11, 12], vec![20]], 2, false)
            .unwrap();
        assert_eq!(output, vec![1, 2, 10, 11, 20, 3, 12]);
    }

    /// With eager close enabled, output stops once an input has run out.
    #[test]
    fn interleave_eager_close_stops_when_any_input_completes() {
        let graph =
            GraphDsl::create(|b| b.add(Interleave::<i32>::new_with_eager_close(2, 1, true)))
                .unwrap();

        let second_empty = graph
            .run_interleave(vec![vec![1, 2], vec![]], 1, true)
            .unwrap();
        assert_eq!(second_empty, Vec::<i32>::new());

        let first_shorter = graph
            .run_interleave(vec![vec![1], vec![10, 11]], 1, true)
            .unwrap();
        assert_eq!(first_shorter, vec![1, 10]);
    }

    /// Even/odd partition merged back together is expected to preserve input order.
    #[test]
    fn partition_routes_only_live_outlets_after_peer_cancels() {
        let graph = GraphDsl::try_create(|b| {
            let by_parity = b.add(Partition::<i32>::new(2, |item| (*item % 2) as usize));
            let rejoin = b.add(Merge::<i32>::new(2));

            b.connect(by_parity.outlet(0)?, rejoin.inlet(0)?)?;
            b.connect(by_parity.outlet(1)?, rejoin.inlet(1)?)?;

            Ok(FlowShape::new(by_parity.inlet(), rejoin.outlet()))
        })
        .unwrap();

        let output = graph.run_with_input([0, 1, 2, 3]).unwrap();
        assert_eq!(output, vec![0, 1, 2, 3]);
    }

    /// `UnzipWith` splits each item into `(item, item * 10)` and `Zip` pairs the halves again.
    #[test]
    fn unzip_with_keeps_live_outlet_running_after_peer_finishes() {
        let graph = GraphDsl::try_create(|b| {
            let split = b.add(UnzipWith::<i32, i32, i32>::new(|item| (item, item * 10)));
            let pair = b.add(Zip::<i32, i32>::new());

            b.connect(split.out0(), pair.in0())?;
            b.connect(split.out1(), pair.in1())?;

            Ok(FlowShape::new(split.inlet(), pair.outlet()))
        })
        .unwrap();

        let output = graph.run_with_input([1, 2, 3]).unwrap();
        assert_eq!(output, vec![(1, 10), (2, 20), (3, 30)]);
    }

    /// Swapping which Unzip outlet feeds which `MergeSorted` inlet must not change the result.
    #[test]
    fn unzip_merge_sorted_swapped_inlets_still_sorted() {
        let graph_normal = GraphDsl::try_create(|b| {
            let split = b.add(Unzip::<i32, i32>::new());
            let sorted = b.add(MergeSorted::<i32>::new());
            b.connect(split.out0(), sorted.inlet(0)?)?;
            b.connect(split.out1(), sorted.inlet(1)?)?;
            Ok(FlowShape::new(split.inlet(), sorted.outlet()))
        })
        .unwrap();

        let graph_swapped = GraphDsl::try_create(|b| {
            let split = b.add(Unzip::<i32, i32>::new());
            let sorted = b.add(MergeSorted::<i32>::new());
            b.connect(split.out0(), sorted.inlet(1)?)?;
            b.connect(split.out1(), sorted.inlet(0)?)?;
            Ok(FlowShape::new(split.inlet(), sorted.outlet()))
        })
        .unwrap();

        let input = vec![(1, 2), (4, 3), (6, 5)];
        let expected = vec![1, 2, 3, 4, 5, 6];

        let normal_output = graph_normal.run_with_input(input.clone()).unwrap();
        assert_eq!(normal_output, expected);

        let swapped_output = graph_swapped.run_with_input(input).unwrap();
        assert_eq!(swapped_output, expected);
    }

    /// Swapping the `MergeLatest` inlets is expected to swap the snapshot order as well.
    #[test]
    fn unzip_merge_latest_swapped_inlets_correct_snapshot_order() {
        let graph_normal = GraphDsl::try_create(|b| {
            let split = b.add(Unzip::<i32, i32>::new());
            let latest = b.add(MergeLatest::<i32>::new(2, false));
            b.connect(split.out0(), latest.inlet(0)?)?;
            b.connect(split.out1(), latest.inlet(1)?)?;
            Ok(FlowShape::new(split.inlet(), latest.outlet()))
        })
        .unwrap();

        let graph_swapped = GraphDsl::try_create(|b| {
            let split = b.add(Unzip::<i32, i32>::new());
            let latest = b.add(MergeLatest::<i32>::new(2, false));
            b.connect(split.out0(), latest.inlet(1)?)?;
            b.connect(split.out1(), latest.inlet(0)?)?;
            Ok(FlowShape::new(split.inlet(), latest.outlet()))
        })
        .unwrap();

        let normal_output = graph_normal.run_with_input([(1, 10)]).unwrap();
        assert_eq!(normal_output, vec![vec![1, 10]]);

        let swapped_output = graph_swapped.run_with_input([(1, 10)]).unwrap();
        assert_eq!(swapped_output, vec![vec![10, 1]]);
    }

    /// Input `(1, 2)` must fail with an error mentioning "expected sequence".
    #[test]
    fn merge_sequence_fails_on_gap_at_completion() {
        let graph = GraphDsl::try_create(|b| {
            let split = b.add(Unzip::<u64, u64>::new());
            let sequenced = b.add(MergeSequence::<u64>::new(2, |item| *item));
            b.connect(split.out0(), sequenced.inlet(0)?)?;
            b.connect(split.out1(), sequenced.inlet(1)?)?;
            Ok(FlowShape::new(split.inlet(), sequenced.outlet()))
        })
        .unwrap();

        let result = graph.run_with_input([(1u64, 2u64)]);
        let is_gap_error =
            matches!(result, Err(StreamError::Failed(ref msg)) if msg.contains("expected sequence"));
        assert!(
            is_gap_error,
            "expected a sequence-gap error, got: {result:?}"
        );
    }

    /// Eager and non-eager `MergeLatest` must both emit, and emit the same, for a single pair.
    #[test]
    fn merge_latest_eager_complete_closes_on_first_inlet_done() {
        let graph_eager = GraphDsl::try_create(|b| {
            let split = b.add(Unzip::<i32, i32>::new());
            let latest = b.add(MergeLatest::<i32>::new(2, true));
            b.connect(split.out0(), latest.inlet(0)?)?;
            b.connect(split.out1(), latest.inlet(1)?)?;
            Ok(FlowShape::new(split.inlet(), latest.outlet()))
        })
        .unwrap();

        let graph_non_eager = GraphDsl::try_create(|b| {
            let split = b.add(Unzip::<i32, i32>::new());
            let latest = b.add(MergeLatest::<i32>::new(2, false));
            b.connect(split.out0(), latest.inlet(0)?)?;
            b.connect(split.out1(), latest.inlet(1)?)?;
            Ok(FlowShape::new(split.inlet(), latest.outlet()))
        })
        .unwrap();

        let eager_output = graph_eager.run_with_input([(1i32, 10i32)]).unwrap();
        let non_eager_output = graph_non_eager.run_with_input([(1i32, 10i32)]).unwrap();

        assert!(!eager_output.is_empty(), "eager graph produced no output");
        assert!(
            !non_eager_output.is_empty(),
            "non-eager graph produced no output"
        );

        assert_eq!(eager_output, non_eager_output);
    }
}