1use std::{
9 any::{Any, TypeId, type_name},
10 collections::{HashMap, HashSet, VecDeque},
11 fmt,
12 marker::PhantomData,
13 sync::{
14 Arc, Mutex, OnceLock,
15 atomic::{AtomicUsize, Ordering},
16 },
17 thread,
18};
19
20#[cfg(test)]
21use std::sync::mpsc;
22
23use crate::{
24 actor::{Actor, ActorProcessingErr, ActorRef},
25 stream::{OverflowStrategy, StreamError, StreamResult},
26};
27
28type DatumValue = Box<dyn DatumElement>;
29type StageMapFn = dyn Fn(DatumValue) -> StreamResult<DatumValue> + Send + Sync;
30type StageTypedMapFn = dyn Any + Send + Sync;
31type StageZipFn = dyn Fn(DatumValue, DatumValue) -> StreamResult<DatumValue> + Send + Sync;
32type StageUnzipFn = dyn Fn(DatumValue) -> (DatumValue, DatumValue) + Send + Sync;
33type StageTypedUnzipFn = dyn Any + Send + Sync;
36type StageCompareFn = dyn Fn(&DatumValue, &DatumValue) -> std::cmp::Ordering + Send + Sync;
37type StageSequenceFn = dyn Fn(&DatumValue) -> u64 + Send + Sync;
38type StageTypedSequenceFn = dyn Any + Send + Sync;
41type StageSnapshotFn = dyn Fn(&[&DatumValue]) -> DatumValue + Send + Sync;
42type StageTypedSnapshotFn = dyn Any + Send + Sync;
45type StagePartitionFn = dyn Fn(&DatumValue) -> usize + Send + Sync;
46type StageTypedPartitionFn = dyn Any + Send + Sync;
49
50#[derive(Clone)]
51struct StageMapFns {
52 erased: Arc<StageMapFn>,
53 typed: Arc<StageTypedMapFn>,
54}
55
56#[derive(Clone)]
64pub(super) enum TypedCyclicOp {
65 BufferPassthrough,
68 TakeWhile(Arc<dyn Any + Send + Sync>),
72}
73
74impl fmt::Debug for TypedCyclicOp {
75 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
76 match self {
77 Self::BufferPassthrough => f.write_str("BufferPassthrough"),
78 Self::TakeWhile(_) => f.write_str("TakeWhile(..)"),
79 }
80 }
81}
82
83pub(crate) trait DatumElement: Any + Send {
84 fn clone_box(&self) -> DatumValue;
85 fn into_any(self: Box<Self>) -> Box<dyn Any + Send>;
86 fn as_any_ref(&self) -> &dyn Any;
87}
88
89impl<T> DatumElement for T
90where
91 T: Any + Clone + Send,
92{
93 fn clone_box(&self) -> DatumValue {
94 Box::new(self.clone())
95 }
96
97 fn into_any(self: Box<Self>) -> Box<dyn Any + Send> {
98 self
99 }
100
101 fn as_any_ref(&self) -> &dyn Any {
102 self
103 }
104}
105
106fn datum<T>(value: T) -> DatumValue
107where
108 T: Clone + Send + 'static,
109{
110 Box::new(value)
111}
112
113fn downcast_datum<T, S>(
114 value: DatumValue,
115 operation: &'static str,
116 port: impl FnOnce() -> S,
117) -> StreamResult<T>
118where
119 T: Send + 'static,
120 S: Into<String>,
121{
122 value
127 .into_any()
128 .downcast::<T>()
129 .map(|value| *value)
130 .map_err(|_| StreamError::InvalidPortOperation {
131 operation,
132 port: port().into(),
133 reason: format!("element type did not match {}", type_name::<T>()),
134 })
135}
136
137static NEXT_PORT_ID: AtomicUsize = AtomicUsize::new(1);
138
139fn next_port_id() -> PortId {
140 PortId(NEXT_PORT_ID.fetch_add(1, Ordering::Relaxed))
141}
142
143fn next_port_id_block(count: usize) -> PortId {
144 debug_assert!(count > 0);
145 PortId(NEXT_PORT_ID.fetch_add(count, Ordering::Relaxed))
146}
147
148#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
149pub struct PortId(usize);
150
151impl PortId {
152 #[must_use]
153 pub const fn as_usize(self) -> usize {
154 self.0
155 }
156
157 const fn offset(self, offset: usize) -> Self {
158 Self(self.0 + offset)
159 }
160}
161
162#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
163pub enum PortKind {
164 Inlet,
165 Outlet,
166}
167
168macro_rules! shared_name {
169 ($fn_name:ident, $value:literal) => {
170 fn $fn_name() -> Arc<str> {
171 static NAME: OnceLock<Arc<str>> = OnceLock::new();
172 Arc::clone(NAME.get_or_init(|| Arc::from($value)))
173 }
174 };
175}
176
177shared_name!(identity_stage_name, "Identity");
178shared_name!(identity_inlet_name, "Identity.in");
179shared_name!(identity_outlet_name, "Identity.out");
180shared_name!(map_stage_name, "Map");
181shared_name!(map_inlet_name, "Map.in");
182shared_name!(map_outlet_name, "Map.out");
183shared_name!(broadcast_stage_name, "Broadcast");
184shared_name!(broadcast_inlet_name, "Broadcast.in");
185shared_name!(balance_stage_name, "Balance");
186shared_name!(balance_inlet_name, "Balance.in");
187shared_name!(merge_stage_name, "Merge");
188shared_name!(merge_outlet_name, "Merge.out");
189shared_name!(merge_preferred_stage_name, "MergePreferred");
190shared_name!(merge_preferred_preferred_name, "MergePreferred.preferred");
191shared_name!(merge_preferred_outlet_name, "MergePreferred.out");
192shared_name!(merge_prioritized_stage_name, "MergePrioritized");
193shared_name!(merge_prioritized_outlet_name, "MergePrioritized.out");
194shared_name!(concat_stage_name, "Concat");
195shared_name!(concat_outlet_name, "Concat.out");
196shared_name!(or_else_stage_name, "OrElse");
197shared_name!(or_else_primary_name, "OrElse.primary");
198shared_name!(or_else_secondary_name, "OrElse.secondary");
199shared_name!(or_else_outlet_name, "OrElse.out");
200shared_name!(interleave_stage_name, "Interleave");
201shared_name!(interleave_outlet_name, "Interleave.out");
202shared_name!(zip_stage_name, "Zip");
203shared_name!(zip_in0_name, "Zip.in0");
204shared_name!(zip_in1_name, "Zip.in1");
205shared_name!(zip_outlet_name, "Zip.out");
206shared_name!(async_boundary_stage_name, "AsyncBoundary");
207shared_name!(async_boundary_inlet_name, "AsyncBoundary.in");
208shared_name!(async_boundary_outlet_name, "AsyncBoundary.out");
209
210mod builder;
211mod executor;
212mod junctions;
213mod ports;
214mod shapes;
215mod stage;
216
217#[cfg(test)]
218use self::executor::BoundaryCountExecutor;
219use self::{builder::StageRecord, shapes::PortAllocator, stage::StageKind};
220
221pub use self::{
222 builder::{
223 AsyncBoundaryExecutionConfig, FusedExecutionConfig, FusedExecutionReport, FusedSegment,
224 FusedTerminalReport, Graph, GraphBlueprint, GraphBuilder, GraphDsl, ImportedGraph,
225 PartialGraph,
226 },
227 junctions::{
228 AsyncBoundary, Balance, Broadcast, Buffer, Concat, Identity, Interleave, MapStage, Merge,
229 MergeLatest, MergePreferred, MergePrioritized, MergeSequence, MergeSorted, OrElse,
230 Partition, TakeWhile, Unzip, UnzipWith, Zip,
231 },
232 ports::{AnyInlet, AnyOutlet, Inlet, Outlet, PortRef},
233 shapes::{
234 BidiShape, FanInShape, FanOutShape, FanOutShape2, FlowShape, MergePreferredShape, Shape,
235 SinkShape, SourceShape, ZipShape,
236 },
237 stage::{AsyncCallback, GraphStage, GraphStageLogic, InHandler, OutHandler, StageSpec},
238};
239
240#[cfg(test)]
241mod tests {
242 use super::*;
243 use crate::Attributes;
244
245 #[test]
246 fn graph_dsl_builds_broadcast_zip_flow() {
247 let graph = GraphDsl::try_create(|builder| {
248 let broadcast = builder.add(Broadcast::<i32>::new(2));
249 let zip = builder.add(Zip::<i32, i32>::new());
250
251 builder.connect(broadcast.outlet(0)?, zip.in0())?;
252 builder.connect(broadcast.outlet(1)?, zip.in1())?;
253
254 Ok(FlowShape::new(broadcast.inlet(), zip.outlet()))
255 })
256 .unwrap();
257
258 assert_eq!(graph.stage_count(), 2);
259 assert_eq!(graph.edge_count(), 2);
260 assert_eq!(graph.shape().inlets().len(), 1);
261 assert_eq!(graph.shape().outlets().len(), 1);
262 assert_eq!(
263 graph.run_with_input([1, 2, 3]).unwrap(),
264 vec![(1, 1), (2, 2), (3, 3)]
265 );
266 }
267
268 #[test]
269 fn graph_dsl_zip_slots_follow_inlet_ids() {
270 let graph = GraphDsl::try_create(|builder| {
271 let broadcast = builder.add(Broadcast::<i32>::new(2));
272 let identity = builder.add(Identity::<i32>::new());
273 let zip = builder.add(Zip::<i32, i32>::new());
274
275 builder.connect(broadcast.outlet(0)?, identity.inlet())?;
276 builder.connect(identity.outlet(), zip.in1())?;
277 builder.connect(broadcast.outlet(1)?, zip.in0())?;
278
279 Ok(FlowShape::new(broadcast.inlet(), zip.outlet()))
280 })
281 .unwrap();
282
283 assert_eq!(
284 graph.run_with_input([1, 2, 3]).unwrap(),
285 vec![(1, 1), (2, 2), (3, 3)]
286 );
287 }
288
289 #[test]
290 fn graph_dsl_zip_buffers_skewed_inlet_arrivals() {
291 let graph = GraphDsl::try_create(|builder| {
307 let fan = builder.add(Broadcast::<i32>::new(2));
308 let doubler = builder.add(Broadcast::<i32>::new(2));
309 let merge = builder.add(Merge::<i32>::new(2));
310 let zip = builder.add(Zip::<i32, i32>::new());
311
312 builder.connect(fan.outlet(0)?, doubler.inlet())?;
313 builder.connect(doubler.outlet(0)?, merge.inlet(0)?)?;
314 builder.connect(doubler.outlet(1)?, merge.inlet(1)?)?;
315 builder.connect(merge.outlet(), zip.in0())?;
316 builder.connect(fan.outlet(1)?, zip.in1())?;
317
318 Ok(FlowShape::new(fan.inlet(), zip.outlet()))
319 })
320 .unwrap();
321
322 assert_eq!(
326 graph.run_with_input([10, 20]).unwrap(),
327 vec![(10, 10), (10, 20)]
328 );
329 }
330
331 #[test]
332 fn graph_dsl_runs_merge_preferred_buffered_feedback_cycle() {
333 let graph = GraphDsl::try_create(|builder| {
334 let merge = builder.add(MergePreferred::<i32>::new(1));
335 let broadcast = builder.add(Broadcast::<i32>::new(2));
336 let buffer = builder.add(Buffer::<i32>::new(8, OverflowStrategy::Backpressure));
337 let positive = builder.add(TakeWhile::<i32>::new(|item| *item > 0));
338 let decrement = builder.add(MapStage::new(|item: i32| item - 1));
339
340 builder.connect(merge.outlet(), broadcast.inlet())?;
341 builder.connect(broadcast.outlet(1)?, buffer.inlet())?;
342 builder.connect(buffer.outlet(), positive.inlet())?;
343 builder.connect(positive.outlet(), decrement.inlet())?;
344 builder.connect(decrement.outlet(), merge.preferred())?;
345
346 Ok(FlowShape::new(merge.secondary(0)?, broadcast.outlet(0)?))
347 })
348 .unwrap();
349
350 assert_eq!(graph.run_with_input([3]).unwrap(), vec![3, 2, 1, 0]);
351 }
352
353 #[test]
354 fn graph_dsl_runs_buffered_merge_feedback_cycle() {
355 let graph = GraphDsl::try_create(|builder| {
356 let merge = builder.add(Merge::<i32>::new(2));
357 let broadcast = builder.add(Broadcast::<i32>::new(2));
358 let buffer = builder.add(Buffer::<i32>::new(8, OverflowStrategy::Backpressure));
359 let positive = builder.add(TakeWhile::<i32>::new(|item| *item > 0));
360 let decrement = builder.add(MapStage::new(|item: i32| item - 1));
361
362 builder.connect(merge.outlet(), broadcast.inlet())?;
363 builder.connect(broadcast.outlet(1)?, buffer.inlet())?;
364 builder.connect(buffer.outlet(), positive.inlet())?;
365 builder.connect(positive.outlet(), decrement.inlet())?;
366 builder.connect(decrement.outlet(), merge.inlet(1)?)?;
367
368 Ok(FlowShape::new(merge.inlet(0)?, broadcast.outlet(0)?))
369 })
370 .unwrap();
371
372 assert_eq!(graph.run_with_input([3]).unwrap(), vec![3, 2, 1, 0]);
373 }
374
375 #[test]
376 fn graph_dsl_unbuffered_merge_cycle_surfaces_event_limit() {
377 let graph = GraphDsl::try_create(|builder| {
378 let merge = builder.add(Merge::<i32>::new(2));
379 let broadcast = builder.add(Broadcast::<i32>::new(2));
380
381 builder.connect(merge.outlet(), broadcast.inlet())?;
382 builder.connect(broadcast.outlet(1)?, merge.inlet(1)?)?;
383
384 Ok(FlowShape::new(merge.inlet(0)?, broadcast.outlet(0)?))
385 })
386 .unwrap();
387
388 let result = graph.run_with_input_report([1], FusedExecutionConfig { event_limit: 256 });
389
390 assert_eq!(result, Err(StreamError::EventLimitExceeded { limit: 256 }));
391 }
392
393 #[test]
394 fn partial_graph_can_be_imported_with_its_shape() {
395 let partial = GraphDsl::partial(|builder| {
396 let first = builder.add(Identity::<i32>::new());
397 let second = builder.add_named(Identity::<i32>::new(), "partial.tail");
398 builder.connect(first.outlet(), second.inlet())?;
399 Ok(FlowShape::new(first.inlet(), second.outlet()))
400 })
401 .named("partial.identity");
402
403 let graph = GraphDsl::try_create(|builder| {
404 let imported = builder.import(&partial)?;
405 let after = builder.add(Identity::<i32>::new());
406 builder.connect(imported.outlet(), after.inlet())?;
407 Ok(FlowShape::new(imported.inlet(), after.outlet()))
408 })
409 .unwrap()
410 .named("outer.graph");
411
412 assert_eq!(graph.run_with_input([1, 2, 3]).unwrap(), vec![1, 2, 3]);
413 assert_eq!(graph.attributes().name(), Some("outer.graph"));
414 }
415
416 #[test]
417 fn graph_attributes_follow_innermost_wins_order() {
418 let graph = GraphDsl::create(|builder| {
419 builder.add_with_attributes(
420 Identity::<i32>::new(),
421 Attributes::named("stage-outer").and(Attributes::named("stage-inner")),
422 )
423 })
424 .unwrap()
425 .add_attributes(Attributes::dispatcher("graph-outer"))
426 .add_attributes(Attributes::dispatcher("graph-inner"));
427
428 assert_eq!(graph.attributes().dispatcher_hint(), Some("graph-inner"));
429 assert_eq!(
430 graph.stages[0].spec.attributes().name(),
431 Some("stage-inner")
432 );
433 }
434
435 #[test]
436 fn graph_dsl_rejects_invalid_and_incomplete_wiring() {
437 let graph = GraphDsl::try_create(|builder| {
438 let first = builder.add(Identity::<i32>::new());
439 let second = builder.add(Identity::<i32>::new());
440 let third = builder.add(Identity::<i32>::new());
441
442 builder.connect(first.outlet(), second.inlet())?;
443 let duplicate = builder.connect(first.outlet(), third.inlet());
444 assert!(matches!(duplicate, Err(StreamError::GraphValidation(_))));
445
446 Ok(FlowShape::new(first.inlet(), second.outlet()))
447 });
448
449 assert!(matches!(graph, Err(StreamError::GraphValidation(_))));
450
451 let graph = GraphDsl::create(|builder| {
452 let broadcast = builder.add(Broadcast::<i32>::new(2));
453 SourceShape::new(broadcast.outlet(0).unwrap())
454 });
455 assert!(matches!(graph, Err(StreamError::GraphValidation(_))));
456 }
457
458 #[test]
459 fn graph_dsl_rejects_erased_type_mismatch() {
460 let graph = GraphDsl::try_create(|builder| {
461 let left = builder.add(Identity::<i32>::new());
462 let right = builder.add(Identity::<u64>::new());
463 let mismatch = builder.connect_any(left.outlet().erase(), right.inlet().erase());
464 assert!(matches!(mismatch, Err(StreamError::GraphValidation(_))));
465 Ok(FlowShape::new(left.inlet(), right.outlet()))
466 });
467
468 assert!(matches!(graph, Err(StreamError::GraphValidation(_))));
469 }
470
471 #[test]
472 fn graph_dsl_rejects_result_ports_with_spoofed_metadata() {
473 let graph = GraphDsl::create(|builder| {
474 let shape = builder.add(Identity::<i32>::new());
475 FlowShape::new(
476 Inlet::<u64>::with_id(shape.inlet().id(), "Identity.in"),
477 shape.outlet(),
478 )
479 });
480
481 assert!(matches!(graph, Err(StreamError::GraphValidation(_))));
482 }
483
484 #[test]
485 fn allocated_ports_do_not_collide_with_manual_ports() {
486 let manual = Inlet::<i32>::new("manual");
487 let graph = GraphDsl::try_create(|builder| {
488 let shape = builder.add(Identity::<i32>::new());
489 assert_ne!(manual.id(), shape.inlet().id());
490 Ok(shape)
491 });
492
493 assert!(graph.is_ok());
494 }
495
496 #[test]
497 fn identity_ports_keep_static_names_and_unique_ids() {
498 let graph = GraphDsl::try_create(|builder| {
499 let first = builder.add(Identity::<i32>::new());
500 let second = builder.add(Identity::<i32>::new());
501
502 assert_eq!(first.inlet().name(), "Identity.in");
503 assert_eq!(first.outlet().name(), "Identity.out");
504 assert_eq!(second.inlet().name(), "Identity.in");
505 assert_eq!(second.outlet().name(), "Identity.out");
506 assert_ne!(first.inlet().id(), first.outlet().id());
507 assert_ne!(first.outlet().id(), second.inlet().id());
508 assert_ne!(second.inlet().id(), second.outlet().id());
509
510 builder.connect(first.outlet(), second.inlet())?;
511 Ok(FlowShape::new(first.inlet(), second.outlet()))
512 });
513
514 assert!(graph.is_ok());
515 }
516
517 #[test]
518 fn graph_stage_logic_checks_port_operations() {
519 let shape = FlowShape::new(Inlet::<i32>::new("in"), Outlet::<i32>::new("out"));
520 let inlet = shape.inlet();
521 let outlet = shape.outlet();
522 let mut logic = GraphStageLogic::new(&shape);
523
524 assert!(matches!(
525 logic.grab(&inlet),
526 Err(StreamError::InvalidPortOperation { .. })
527 ));
528 logic.pull(&inlet).unwrap();
529 assert!(logic.has_been_pulled(&inlet));
530 assert!(matches!(
531 logic.pull(&inlet),
532 Err(StreamError::InvalidPortOperation { .. })
533 ));
534 logic.offer(&inlet, 41).unwrap();
535 assert!(!logic.has_been_pulled(&inlet));
536 assert_eq!(logic.grab(&inlet).unwrap(), 41);
537
538 assert!(matches!(
539 logic.push(&outlet, 1),
540 Err(StreamError::InvalidPortOperation { .. })
541 ));
542 logic.request(&outlet).unwrap();
543 assert!(logic.is_available(&outlet));
544 logic.push(&outlet, 42).unwrap();
545 assert!(!logic.is_available(&outlet));
546 logic.complete(&outlet).unwrap();
547 assert!(logic.is_closed(&outlet));
548 assert!(matches!(
549 logic.request(&outlet),
550 Err(StreamError::InvalidPortOperation { .. })
551 ));
552 }
553
554 #[test]
555 fn fused_execution_enforces_event_limit() {
556 let graph = GraphDsl::create(|builder| builder.add(Identity::<i32>::new())).unwrap();
557
558 let result = graph.run_with_input_report([1, 2], FusedExecutionConfig { event_limit: 1 });
559
560 assert_eq!(result, Err(StreamError::EventLimitExceeded { limit: 1 }));
561 }
562
563 #[test]
564 fn async_boundary_splits_fused_segments() {
565 let graph = GraphDsl::try_create(|builder| {
566 let first = builder.add(Identity::<i32>::new());
567 let boundary = builder.add(AsyncBoundary::<i32>::new());
568 let second = builder.add(MapStage::new(|item: i32| item + 1));
569
570 builder.connect(first.outlet(), boundary.inlet())?;
571 builder.connect(boundary.outlet(), second.inlet())?;
572
573 Ok(FlowShape::new(first.inlet(), second.outlet()))
574 })
575 .unwrap();
576
577 assert_eq!(graph.segments().len(), 3);
578 let report = graph
579 .run_with_input_report([1, 2], FusedExecutionConfig::default())
580 .unwrap();
581 assert_eq!(report.output, vec![2, 3]);
582 assert_eq!(report.async_boundary_crossings, 2);
583 }
584
585 #[test]
586 fn async_boundary_count_path_uses_ractor_handoff_segments() {
587 let graph = GraphDsl::try_create(|builder| {
588 let first = builder.add(MapStage::new(|item: i32| item + 1));
589 let boundary = builder.add(AsyncBoundary::<i32>::new());
590 let second = builder.add(MapStage::new(|item: i32| item * 2));
591
592 builder.connect(first.outlet(), boundary.inlet())?;
593 builder.connect(boundary.outlet(), second.inlet())?;
594
595 Ok(FlowShape::new(first.inlet(), second.outlet()))
596 })
597 .unwrap();
598
599 let report = graph
600 .run_async_boundary_count_with_input_report(
601 [1, 2, 3],
602 AsyncBoundaryExecutionConfig {
603 fused: FusedExecutionConfig::default(),
604 buffer_size: 2,
605 },
606 )
607 .unwrap();
608
609 assert_eq!(report.result, 3);
610 assert_eq!(report.async_boundary_crossings, 3);
611 assert_eq!(report.events, 18);
612 }
613
614 #[test]
615 fn threaded_async_boundary_baseline_matches_ractor_count_path() {
616 let graph = GraphDsl::try_create(|builder| {
617 let first = builder.add(MapStage::new(|item: i32| item + 1));
618 let boundary = builder.add(AsyncBoundary::<i32>::new());
619 let second = builder.add(MapStage::new(|item: i32| item * 2));
620
621 builder.connect(first.outlet(), boundary.inlet())?;
622 builder.connect(boundary.outlet(), second.inlet())?;
623
624 Ok(FlowShape::new(first.inlet(), second.outlet()))
625 })
626 .unwrap();
627
628 let config = AsyncBoundaryExecutionConfig {
629 fused: FusedExecutionConfig::default(),
630 buffer_size: 2,
631 };
632 let ractor_report = graph
633 .run_async_boundary_count_with_input_report([1, 2, 3], config)
634 .unwrap();
635 let threaded_report = BoundaryCountExecutor::Threaded
636 .run_count(
637 [1, 2, 3],
638 graph.typed_linear_async_segments().unwrap(),
639 config,
640 )
641 .unwrap();
642
643 assert_eq!(threaded_report, ractor_report);
644 }
645
646 #[test]
647 fn ractor_async_boundary_rejects_zero_buffer_size() {
648 let graph = GraphDsl::try_create(|builder| {
649 let first = builder.add(MapStage::new(|item: i32| item + 1));
650 let boundary = builder.add(AsyncBoundary::<i32>::new());
651 let second = builder.add(MapStage::new(|item: i32| item * 2));
652
653 builder.connect(first.outlet(), boundary.inlet())?;
654 builder.connect(boundary.outlet(), second.inlet())?;
655
656 Ok(FlowShape::new(first.inlet(), second.outlet()))
657 })
658 .unwrap();
659
660 let result = BoundaryCountExecutor::Ractor.run_count(
661 [1, 2, 3],
662 graph.typed_linear_async_segments().unwrap(),
663 AsyncBoundaryExecutionConfig {
664 fused: FusedExecutionConfig::default(),
665 buffer_size: 0,
666 },
667 );
668
669 assert!(matches!(
670 result,
671 Err(StreamError::GraphValidation(message)) if message.contains("buffer_size")
672 ));
673 }
674
675 #[test]
676 fn ractor_async_boundary_streams_input_without_eager_collection() {
677 let graph = GraphDsl::try_create(|builder| {
678 let first = builder.add(MapStage::new(|item: i32| item + 1));
679 let boundary = builder.add(AsyncBoundary::<i32>::new());
680 let second = builder.add(MapStage::new(|item: i32| item * 2));
681
682 builder.connect(first.outlet(), boundary.inlet())?;
683 builder.connect(boundary.outlet(), second.inlet())?;
684
685 Ok(FlowShape::new(first.inlet(), second.outlet()))
686 })
687 .unwrap();
688
689 let result = BoundaryCountExecutor::Ractor.run_count(
690 std::iter::repeat(1_i32),
691 graph.typed_linear_async_segments().unwrap(),
692 AsyncBoundaryExecutionConfig {
693 fused: FusedExecutionConfig { event_limit: 4 },
694 buffer_size: 1,
695 },
696 );
697
698 assert_eq!(result, Err(StreamError::EventLimitExceeded { limit: 4 }));
699 }
700
701 #[test]
702 fn ractor_async_boundary_runs_inside_existing_tokio_runtime() {
703 let graph = GraphDsl::try_create(|builder| {
704 let first = builder.add(MapStage::new(|item: i32| item + 1));
705 let boundary = builder.add(AsyncBoundary::<i32>::new());
706 let second = builder.add(MapStage::new(|item: i32| item * 2));
707
708 builder.connect(first.outlet(), boundary.inlet())?;
709 builder.connect(boundary.outlet(), second.inlet())?;
710
711 Ok(FlowShape::new(first.inlet(), second.outlet()))
712 })
713 .unwrap();
714
715 let runtime = tokio::runtime::Builder::new_current_thread()
716 .build()
717 .unwrap();
718 let report = runtime
719 .block_on(async {
720 BoundaryCountExecutor::Ractor.run_count(
721 [1, 2, 3],
722 graph.typed_linear_async_segments().unwrap(),
723 AsyncBoundaryExecutionConfig {
724 fused: FusedExecutionConfig::default(),
725 buffer_size: 2,
726 },
727 )
728 })
729 .unwrap();
730
731 assert_eq!(report.result, 3);
732 assert_eq!(report.async_boundary_crossings, 3);
733 assert_eq!(report.events, 18);
734 }
735
736 #[test]
737 fn balance_merge_round_robins_through_junctions() {
738 let graph = GraphDsl::try_create(|builder| {
739 let balance = builder.add(Balance::<i32>::new(2));
740 let merge = builder.add(Merge::<i32>::new(2));
741
742 builder.connect(balance.outlet(0)?, merge.inlet(0)?)?;
743 builder.connect(balance.outlet(1)?, merge.inlet(1)?)?;
744
745 Ok(FlowShape::new(balance.inlet(), merge.outlet()))
746 })
747 .unwrap();
748
749 assert_eq!(graph.run_with_input(0..6).unwrap(), vec![0, 1, 2, 3, 4, 5]);
750 }
751
752 #[test]
753 fn prioritized_merge_uses_weighted_schedule() {
754 let graph =
755 GraphDsl::create(|builder| builder.add(MergePrioritized::<i32>::new(vec![2, 1])))
756 .unwrap();
757
758 assert_eq!(
759 graph
760 .run_fan_in(vec![vec![1, 2, 3, 4], vec![100, 101]])
761 .unwrap(),
762 vec![1, 2, 100, 3, 4, 101]
763 );
764 }
765
766 #[test]
767 fn fused_execution_supports_count_and_fold_sinks() {
768 let graph = GraphDsl::try_create(|builder| {
769 let first = builder.add(MapStage::new(|item: u64| item + 1));
770 let second = builder.add(MapStage::new(|item: u64| item * 2));
771
772 builder.connect(first.outlet(), second.inlet())?;
773
774 Ok(FlowShape::new(first.inlet(), second.outlet()))
775 })
776 .unwrap();
777
778 assert_eq!(graph.run_count_with_input(0..4).unwrap(), 4);
779 assert_eq!(
780 graph
781 .run_fold_with_input(0..4, 0, |acc, item| acc + item)
782 .unwrap(),
783 20
784 );
785 }
786
787 #[test]
788 fn typed_linear_fast_path_runs_same_type_chains() {
789 let graph = GraphDsl::try_create(|builder| {
790 let first = builder.add(MapStage::new(|item: u64| item + 1));
791 let second = builder.add(MapStage::new(|item: u64| item * 2));
792
793 builder.connect(first.outlet(), second.inlet())?;
794
795 Ok(FlowShape::new(first.inlet(), second.outlet()))
796 })
797 .unwrap();
798
799 let report = graph
800 .run_typed_linear_with_input_report([1, 2, 3], FusedExecutionConfig::default())
801 .unwrap();
802 assert_eq!(report.output, vec![4, 6, 8]);
803 assert_eq!(report.events, 12);
804
805 assert_eq!(graph.run_typed_linear_count_with_input(0..4).unwrap(), 4);
806 assert_eq!(
807 graph
808 .run_typed_linear_fold_with_input(0..4, 0, |acc, item| acc + item)
809 .unwrap(),
810 20
811 );
812 }
813
814 #[test]
815 fn typed_linear_fast_path_rejects_junction_graphs() {
816 let graph = GraphDsl::try_create(|builder| {
817 let balance = builder.add(Balance::<i32>::new(2));
818 let merge = builder.add(Merge::<i32>::new(2));
819
820 builder.connect(balance.outlet(0)?, merge.inlet(0)?)?;
821 builder.connect(balance.outlet(1)?, merge.inlet(1)?)?;
822
823 Ok(FlowShape::new(balance.inlet(), merge.outlet()))
824 })
825 .unwrap();
826
827 assert!(matches!(
828 graph.run_typed_linear_count_with_input([1, 2, 3]),
829 Err(StreamError::GraphValidation(_))
830 ));
831 }
832
833 #[test]
834 fn merge_preferred_drains_preferred_before_secondaries() {
835 let graph = GraphDsl::create(|builder| builder.add(MergePreferred::<i32>::new(2))).unwrap();
836
837 assert_eq!(
838 graph
839 .run_merge_preferred(vec![1, 2, 3], vec![vec![100, 101], vec![200]])
840 .unwrap(),
841 vec![1, 2, 3, 100, 200, 101]
842 );
843 }
844
845 #[test]
846 fn merge_waits_for_all_inputs_to_complete() {
847 let graph = GraphDsl::create(|builder| builder.add(Merge::<i32>::new(2))).unwrap();
848
849 assert_eq!(
850 graph.run_fan_in(vec![vec![], vec![10, 20]]).unwrap(),
851 vec![10, 20]
852 );
853 assert_eq!(graph.run_fan_in(vec![vec![1], vec![]]).unwrap(), vec![1]);
854 }
855
856 #[test]
857 fn merge_sorted_drains_remaining_input_after_peer_completes() {
858 let graph = GraphDsl::try_create(|builder| {
859 let unzip = builder.add(Unzip::<i32, i32>::new());
860 let merge = builder.add(MergeSorted::<i32>::new());
861
862 builder.connect(unzip.out0(), merge.inlet(0)?)?;
863 builder.connect(unzip.out1(), merge.inlet(1)?)?;
864
865 Ok(FlowShape::new(unzip.inlet(), merge.outlet()))
866 })
867 .unwrap();
868
869 assert_eq!(
870 graph.run_with_input([(1, 2), (4, 3), (6, 5)]).unwrap(),
871 vec![1, 2, 3, 4, 5, 6]
872 );
873 }
874
875 #[test]
876 fn merge_sequence_reorders_adversarial_arrivals_by_sequence_number() {
877 let graph = GraphDsl::try_create(|builder| {
878 let unzip = builder.add(Unzip::<u64, u64>::new());
879 let merge = builder.add(MergeSequence::<u64>::new(2, |item| *item));
880
881 builder.connect(unzip.out0(), merge.inlet(0)?)?;
882 builder.connect(unzip.out1(), merge.inlet(1)?)?;
883
884 Ok(FlowShape::new(unzip.inlet(), merge.outlet()))
885 })
886 .unwrap();
887
888 assert_eq!(
889 graph.run_with_input([(0, 1), (2, 3), (4, 5)]).unwrap(),
890 vec![0, 1, 2, 3, 4, 5]
891 );
892 }
893
894 #[test]
895 fn merge_latest_emits_with_last_seen_peer_and_honors_eager_complete() {
896 let graph = GraphDsl::try_create(|builder| {
897 let unzip = builder.add(Unzip::<i32, i32>::new());
898 let merge = builder.add(MergeLatest::<i32>::new(2, false));
899
900 builder.connect(unzip.out0(), merge.inlet(0)?)?;
901 builder.connect(unzip.out1(), merge.inlet(1)?)?;
902
903 Ok(FlowShape::new(unzip.inlet(), merge.outlet()))
904 })
905 .unwrap();
906
907 assert_eq!(
908 graph.run_with_input([(1, 10), (2, 11)]).unwrap(),
909 vec![vec![1, 10], vec![2, 10], vec![2, 11]]
910 );
911 }
912
913 #[test]
914 fn zip_completes_when_any_input_completes() {
915 let graph = GraphDsl::create(|builder| builder.add(Zip::<i32, i32>::new())).unwrap();
916
917 assert_eq!(
918 graph.run_zip(vec![1, 2, 3], vec![10]).unwrap(),
919 vec![(1, 10)]
920 );
921 assert_eq!(graph.run_zip(vec![1], vec![10, 20]).unwrap(), vec![(1, 10)]);
922 assert_eq!(
923 graph.run_zip(vec![], vec![10, 20]).unwrap(),
924 Vec::<(i32, i32)>::new()
925 );
926 }
927
928 #[test]
929 fn concat_drains_inputs_in_declared_order() {
930 let graph = GraphDsl::create(|builder| builder.add(Concat::<i32>::new(3))).unwrap();
931
932 assert_eq!(
933 graph
934 .run_concat(vec![vec![1, 2], vec![], vec![3, 4]])
935 .unwrap(),
936 vec![1, 2, 3, 4]
937 );
938 }
939
940 #[test]
941 fn or_else_switches_only_if_primary_is_empty() {
942 let graph = GraphDsl::create(|builder| builder.add(OrElse::<i32>::new())).unwrap();
943
944 assert_eq!(
945 graph.run_or_else(vec![], vec![10, 20]).unwrap(),
946 vec![10, 20]
947 );
948 assert_eq!(
949 graph.run_or_else(vec![1, 2], vec![10, 20]).unwrap(),
950 vec![1, 2]
951 );
952 }
953
954 #[test]
955 fn or_else_secondary_first_dropped_when_primary_emits() {
956 let graph = GraphDsl::create(|builder| builder.add(OrElse::<i32>::new())).unwrap();
957
958 assert_eq!(
959 graph
960 .run_or_else_secondary_first(vec![1, 2], vec![10, 20])
961 .unwrap(),
962 vec![1, 2]
963 );
964 }
965
966 #[test]
967 fn or_else_secondary_first_flushed_when_primary_empty() {
968 let graph = GraphDsl::create(|builder| builder.add(OrElse::<i32>::new())).unwrap();
969
970 assert_eq!(
971 graph
972 .run_or_else_secondary_first(vec![], vec![10, 20])
973 .unwrap(),
974 vec![10, 20]
975 );
976 }
977
978 #[test]
979 fn or_else_secondary_closed_then_primary_empty_drains_buffer() {
980 let graph = GraphDsl::create(|builder| builder.add(OrElse::<i32>::new())).unwrap();
981
982 assert_eq!(
983 graph
984 .run_or_else_secondary_closed_first(vec![10, 20])
985 .unwrap(),
986 vec![10, 20]
987 );
988 }
989
990 #[test]
991 fn interleave_cycles_segment_sized_chunks() {
992 let graph = GraphDsl::create(|builder| builder.add(Interleave::<i32>::new(3, 2))).unwrap();
993
994 assert_eq!(
995 graph
996 .run_interleave(vec![vec![1, 2, 3], vec![10, 11, 12], vec![20]], 2, false)
997 .unwrap(),
998 vec![1, 2, 10, 11, 20, 3, 12]
999 );
1000 }
1001
1002 #[test]
1003 fn interleave_eager_close_stops_when_any_input_completes() {
1004 let graph = GraphDsl::create(|builder| {
1005 builder.add(Interleave::<i32>::new_with_eager_close(2, 1, true))
1006 })
1007 .unwrap();
1008
1009 assert_eq!(
1010 graph
1011 .run_interleave(vec![vec![1, 2], vec![]], 1, true)
1012 .unwrap(),
1013 Vec::<i32>::new()
1014 );
1015 assert_eq!(
1016 graph
1017 .run_interleave(vec![vec![1], vec![10, 11]], 1, true)
1018 .unwrap(),
1019 vec![1, 10]
1020 );
1021 }
1022
1023 #[test]
1024 fn partition_routes_only_live_outlets_after_peer_cancels() {
1025 let graph = GraphDsl::try_create(|builder| {
1026 let partition = builder.add(Partition::<i32>::new(2, |item| (*item % 2) as usize));
1027 let merge = builder.add(Merge::<i32>::new(2));
1028
1029 builder.connect(partition.outlet(0)?, merge.inlet(0)?)?;
1030 builder.connect(partition.outlet(1)?, merge.inlet(1)?)?;
1031
1032 Ok(FlowShape::new(partition.inlet(), merge.outlet()))
1033 })
1034 .unwrap();
1035
1036 assert_eq!(
1037 graph.run_with_input([0, 1, 2, 3]).unwrap(),
1038 vec![0, 1, 2, 3]
1039 );
1040 }
1041
1042 #[test]
1043 fn unzip_with_keeps_live_outlet_running_after_peer_finishes() {
1044 let graph = GraphDsl::try_create(|builder| {
1045 let unzip = builder.add(UnzipWith::<i32, i32, i32>::new(|item| (item, item * 10)));
1046 let zip = builder.add(Zip::<i32, i32>::new());
1047
1048 builder.connect(unzip.out0(), zip.in0())?;
1049 builder.connect(unzip.out1(), zip.in1())?;
1050
1051 Ok(FlowShape::new(unzip.inlet(), zip.outlet()))
1052 })
1053 .unwrap();
1054
1055 assert_eq!(
1056 graph.run_with_input([1, 2, 3]).unwrap(),
1057 vec![(1, 10), (2, 20), (3, 30)]
1058 );
1059 }
1060
1061 #[test]
1067 fn unzip_merge_sorted_swapped_inlets_still_sorted() {
1068 let graph_normal = GraphDsl::try_create(|builder| {
1069 let unzip = builder.add(Unzip::<i32, i32>::new());
1070 let merge = builder.add(MergeSorted::<i32>::new());
1071 builder.connect(unzip.out0(), merge.inlet(0)?)?;
1072 builder.connect(unzip.out1(), merge.inlet(1)?)?;
1073 Ok(FlowShape::new(unzip.inlet(), merge.outlet()))
1074 })
1075 .unwrap();
1076
1077 let graph_swapped = GraphDsl::try_create(|builder| {
1079 let unzip = builder.add(Unzip::<i32, i32>::new());
1080 let merge = builder.add(MergeSorted::<i32>::new());
1081 builder.connect(unzip.out0(), merge.inlet(1)?)?;
1082 builder.connect(unzip.out1(), merge.inlet(0)?)?;
1083 Ok(FlowShape::new(unzip.inlet(), merge.outlet()))
1084 })
1085 .unwrap();
1086
1087 let input = vec![(1, 2), (4, 3), (6, 5)];
1088 let expected = vec![1, 2, 3, 4, 5, 6];
1089 assert_eq!(
1090 graph_normal.run_with_input(input.clone()).unwrap(),
1091 expected
1092 );
1093 assert_eq!(graph_swapped.run_with_input(input).unwrap(), expected);
1094 }
1095
1096 #[test]
1100 fn unzip_merge_latest_swapped_inlets_correct_snapshot_order() {
1101 let graph_normal = GraphDsl::try_create(|builder| {
1104 let unzip = builder.add(Unzip::<i32, i32>::new());
1105 let merge = builder.add(MergeLatest::<i32>::new(2, false));
1106 builder.connect(unzip.out0(), merge.inlet(0)?)?;
1107 builder.connect(unzip.out1(), merge.inlet(1)?)?;
1108 Ok(FlowShape::new(unzip.inlet(), merge.outlet()))
1109 })
1110 .unwrap();
1111
1112 let graph_swapped = GraphDsl::try_create(|builder| {
1116 let unzip = builder.add(Unzip::<i32, i32>::new());
1117 let merge = builder.add(MergeLatest::<i32>::new(2, false));
1118 builder.connect(unzip.out0(), merge.inlet(1)?)?;
1119 builder.connect(unzip.out1(), merge.inlet(0)?)?;
1120 Ok(FlowShape::new(unzip.inlet(), merge.outlet()))
1121 })
1122 .unwrap();
1123
1124 assert_eq!(
1125 graph_normal.run_with_input([(1, 10)]).unwrap(),
1126 vec![vec![1, 10]]
1127 );
1128 assert_eq!(
1130 graph_swapped.run_with_input([(1, 10)]).unwrap(),
1131 vec![vec![10, 1]]
1132 );
1133 }
1134
1135 #[test]
1139 fn merge_sequence_fails_on_gap_at_completion() {
1140 let graph = GraphDsl::try_create(|builder| {
1144 let unzip = builder.add(Unzip::<u64, u64>::new());
1145 let merge = builder.add(MergeSequence::<u64>::new(2, |item| *item));
1146 builder.connect(unzip.out0(), merge.inlet(0)?)?;
1147 builder.connect(unzip.out1(), merge.inlet(1)?)?;
1148 Ok(FlowShape::new(unzip.inlet(), merge.outlet()))
1149 })
1150 .unwrap();
1151
1152 let result = graph.run_with_input([(1u64, 2u64)]);
1153 assert!(
1154 matches!(result, Err(StreamError::Failed(ref msg)) if msg.contains("expected sequence")),
1155 "expected a sequence-gap error, got: {result:?}"
1156 );
1157 }
1158
1159 #[test]
1162 fn merge_latest_eager_complete_closes_on_first_inlet_done() {
1163 let graph_eager = GraphDsl::try_create(|builder| {
1168 let unzip = builder.add(Unzip::<i32, i32>::new());
1169 let merge = builder.add(MergeLatest::<i32>::new(2, true));
1170 builder.connect(unzip.out0(), merge.inlet(0)?)?;
1171 builder.connect(unzip.out1(), merge.inlet(1)?)?;
1172 Ok(FlowShape::new(unzip.inlet(), merge.outlet()))
1173 })
1174 .unwrap();
1175
1176 let graph_non_eager = GraphDsl::try_create(|builder| {
1177 let unzip = builder.add(Unzip::<i32, i32>::new());
1178 let merge = builder.add(MergeLatest::<i32>::new(2, false));
1179 builder.connect(unzip.out0(), merge.inlet(0)?)?;
1180 builder.connect(unzip.out1(), merge.inlet(1)?)?;
1181 Ok(FlowShape::new(unzip.inlet(), merge.outlet()))
1182 })
1183 .unwrap();
1184
1185 let result_eager = graph_eager.run_with_input([(1i32, 10i32)]).unwrap();
1189 let result_non_eager = graph_non_eager.run_with_input([(1i32, 10i32)]).unwrap();
1190
1191 assert!(!result_eager.is_empty(), "eager graph produced no output");
1193 assert!(
1194 !result_non_eager.is_empty(),
1195 "non-eager graph produced no output"
1196 );
1197
1198 assert_eq!(result_eager, result_non_eager);
1201 }
1202}