1use std::{
9 any::{Any, TypeId, type_name},
10 collections::{HashMap, HashSet, VecDeque},
11 fmt,
12 marker::PhantomData,
13 sync::{
14 Arc, Mutex, OnceLock,
15 atomic::{AtomicUsize, Ordering},
16 },
17 thread,
18};
19
20#[cfg(test)]
21use std::sync::mpsc;
22
23use crate::{
24 actor::{Actor, ActorProcessingErr, ActorRef},
25 stream::{OverflowStrategy, StreamError, StreamResult},
26};
27
28type DatumValue = Box<dyn DatumElement>;
29type StageMapFn = dyn Fn(DatumValue) -> StreamResult<DatumValue> + Send + Sync;
30type StageTypedMapFn = dyn Any + Send + Sync;
31type StageZipFn = dyn Fn(DatumValue, DatumValue) -> StreamResult<DatumValue> + Send + Sync;
32type StageUnzipFn = dyn Fn(DatumValue) -> (DatumValue, DatumValue) + Send + Sync;
33type StageTypedUnzipFn = dyn Any + Send + Sync;
36type StageCompareFn = dyn Fn(&DatumValue, &DatumValue) -> std::cmp::Ordering + Send + Sync;
37type StageSequenceFn = dyn Fn(&DatumValue) -> u64 + Send + Sync;
38type StageTypedSequenceFn = dyn Any + Send + Sync;
41type StageSnapshotFn = dyn Fn(&[&DatumValue]) -> DatumValue + Send + Sync;
42type StageTypedSnapshotFn = dyn Any + Send + Sync;
45type StagePartitionFn = dyn Fn(&DatumValue) -> usize + Send + Sync;
46type StageTypedPartitionFn = dyn Any + Send + Sync;
49
50#[derive(Clone)]
51struct StageMapFns {
52 erased: Arc<StageMapFn>,
53 typed: Arc<StageTypedMapFn>,
54}
55
56#[derive(Clone)]
64pub(super) enum TypedCyclicOp {
65 BufferPassthrough,
68 TakeWhile(Arc<dyn Any + Send + Sync>),
72}
73
74impl fmt::Debug for TypedCyclicOp {
75 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
76 match self {
77 Self::BufferPassthrough => f.write_str("BufferPassthrough"),
78 Self::TakeWhile(_) => f.write_str("TakeWhile(..)"),
79 }
80 }
81}
82
83pub(crate) trait DatumElement: Any + Send {
84 fn clone_box(&self) -> DatumValue;
85 fn into_any(self: Box<Self>) -> Box<dyn Any + Send>;
86 fn as_any_ref(&self) -> &dyn Any;
87}
88
89impl<T> DatumElement for T
90where
91 T: Any + Clone + Send,
92{
93 fn clone_box(&self) -> DatumValue {
94 Box::new(self.clone())
95 }
96
97 fn into_any(self: Box<Self>) -> Box<dyn Any + Send> {
98 self
99 }
100
101 fn as_any_ref(&self) -> &dyn Any {
102 self
103 }
104}
105
106fn datum<T>(value: T) -> DatumValue
107where
108 T: Clone + Send + 'static,
109{
110 Box::new(value)
111}
112
113fn downcast_datum<T, S>(
114 value: DatumValue,
115 operation: &'static str,
116 port: impl FnOnce() -> S,
117) -> StreamResult<T>
118where
119 T: Send + 'static,
120 S: Into<String>,
121{
122 value
127 .into_any()
128 .downcast::<T>()
129 .map(|value| *value)
130 .map_err(|_| StreamError::InvalidPortOperation {
131 operation,
132 port: port().into(),
133 reason: format!("element type did not match {}", type_name::<T>()),
134 })
135}
136
/// Process-wide source of port identifiers; the first identifier handed out is 1.
static NEXT_PORT_ID: AtomicUsize = AtomicUsize::new(1);

/// Reserves one fresh port identifier.
fn next_port_id() -> PortId {
    let raw = NEXT_PORT_ID.fetch_add(1, Ordering::Relaxed);
    PortId(raw)
}

/// Reserves `count` consecutive identifiers and returns the first of them.
///
/// `count` is expected to be non-zero; this is only checked in debug builds.
fn next_port_id_block(count: usize) -> PortId {
    debug_assert!(count > 0);
    let first = NEXT_PORT_ID.fetch_add(count, Ordering::Relaxed);
    PortId(first)
}

/// Opaque numeric identifier of a port.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct PortId(usize);

impl PortId {
    /// Returns the raw numeric value of this identifier.
    #[must_use]
    pub const fn as_usize(self) -> usize {
        let PortId(raw) = self;
        raw
    }

    /// Returns the identifier `offset` positions after this one.
    const fn offset(self, offset: usize) -> Self {
        PortId(self.0 + offset)
    }
}
161
/// Which side of a stage a port sits on.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum PortKind {
    /// An input port.
    Inlet,
    /// An output port.
    Outlet,
}
167
/// Generates a function returning a lazily initialised, shared `Arc<str>` for
/// a literal name. The string is allocated on first call; later calls clone
/// the same `Arc`.
macro_rules! shared_name {
    ($fn_name:ident, $value:literal) => {
        fn $fn_name() -> Arc<str> {
            static NAME: OnceLock<Arc<str>> = OnceLock::new();
            let cached = NAME.get_or_init(|| Arc::<str>::from($value));
            Arc::clone(cached)
        }
    };
}

// Identity
shared_name!(identity_stage_name, "Identity");
shared_name!(identity_inlet_name, "Identity.in");
shared_name!(identity_outlet_name, "Identity.out");
// Map
shared_name!(map_stage_name, "Map");
shared_name!(map_inlet_name, "Map.in");
shared_name!(map_outlet_name, "Map.out");
// Broadcast / Balance
shared_name!(broadcast_stage_name, "Broadcast");
shared_name!(broadcast_inlet_name, "Broadcast.in");
shared_name!(balance_stage_name, "Balance");
shared_name!(balance_inlet_name, "Balance.in");
// Merge family
shared_name!(merge_stage_name, "Merge");
shared_name!(merge_outlet_name, "Merge.out");
shared_name!(merge_preferred_stage_name, "MergePreferred");
shared_name!(merge_preferred_preferred_name, "MergePreferred.preferred");
shared_name!(merge_preferred_outlet_name, "MergePreferred.out");
shared_name!(merge_prioritized_stage_name, "MergePrioritized");
shared_name!(merge_prioritized_outlet_name, "MergePrioritized.out");
// Concat / OrElse / Interleave
shared_name!(concat_stage_name, "Concat");
shared_name!(concat_outlet_name, "Concat.out");
shared_name!(or_else_stage_name, "OrElse");
shared_name!(or_else_primary_name, "OrElse.primary");
shared_name!(or_else_secondary_name, "OrElse.secondary");
shared_name!(or_else_outlet_name, "OrElse.out");
shared_name!(interleave_stage_name, "Interleave");
shared_name!(interleave_outlet_name, "Interleave.out");
// Zip
shared_name!(zip_stage_name, "Zip");
shared_name!(zip_in0_name, "Zip.in0");
shared_name!(zip_in1_name, "Zip.in1");
shared_name!(zip_outlet_name, "Zip.out");
// AsyncBoundary
shared_name!(async_boundary_stage_name, "AsyncBoundary");
shared_name!(async_boundary_inlet_name, "AsyncBoundary.in");
shared_name!(async_boundary_outlet_name, "AsyncBoundary.out");
209
210mod builder;
211mod executor;
212mod junctions;
213mod ports;
214mod shapes;
215mod stage;
216
217#[cfg(test)]
218use self::executor::BoundaryCountExecutor;
219use self::{
220 builder::StageRecord,
221 stage::{StageKind, StageTimerMailbox, StageTimerRuntime},
222};
223
224pub use self::{
225 builder::{
226 AsyncBoundaryExecutionConfig, FusedExecutionConfig, FusedExecutionReport, FusedSegment,
227 FusedTerminalReport, Graph, GraphBlueprint, GraphBuilder, GraphDsl, ImportedGraph,
228 PartialGraph,
229 },
230 junctions::{
231 AsyncBoundary, Balance, Broadcast, Buffer, Concat, Identity, Interleave, MapStage, Merge,
232 MergeLatest, MergePreferred, MergePrioritized, MergeSequence, MergeSorted, OrElse,
233 Partition, TakeWhile, Unzip, UnzipWith, Zip,
234 },
235 ports::{AnyInlet, AnyOutlet, Inlet, Outlet, PortRef},
236 shapes::{
237 BidiShape, FanInShape, FanOutShape, FanOutShape2, FlowShape, MergePreferredShape,
238 PortAllocator, Shape, SinkShape, SourceShape, ZipShape,
239 },
240 stage::{
241 AsyncCallback, GraphStage, GraphStageLogic, InHandler, OutHandler, StageSpec, TimerHandler,
242 },
243};
244
245#[cfg(test)]
246mod tests {
247 use super::*;
248 use crate::Attributes;
249
/// Wires a two-way `Broadcast` into both inlets of a `Zip` and checks the
/// stage, edge and port counts plus the zipped output for `[1, 2, 3]`.
#[test]
fn graph_dsl_builds_broadcast_zip_flow() {
    let flow = GraphDsl::try_create(|dsl| {
        let fan_out = dsl.add(Broadcast::<i32>::new(2));
        let pairer = dsl.add(Zip::<i32, i32>::new());

        dsl.connect(fan_out.outlet(0)?, pairer.in0())?;
        dsl.connect(fan_out.outlet(1)?, pairer.in1())?;

        Ok(FlowShape::new(fan_out.inlet(), pairer.outlet()))
    })
    .unwrap();

    assert_eq!(flow.stage_count(), 2);
    assert_eq!(flow.edge_count(), 2);
    assert_eq!(flow.shape().inlets().len(), 1);
    assert_eq!(flow.shape().outlets().len(), 1);

    let pairs = flow.run_with_input([1, 2, 3]).unwrap();
    assert_eq!(pairs, vec![(1, 1), (2, 2), (3, 3)]);
}
272
/// Connects `Zip.in1` (via an `Identity`) before `Zip.in0` and checks the
/// output for `[1, 2, 3]` is still `(n, n)` pairs.
#[test]
fn graph_dsl_zip_slots_follow_inlet_ids() {
    let flow = GraphDsl::try_create(|dsl| {
        let fan_out = dsl.add(Broadcast::<i32>::new(2));
        let pass_through = dsl.add(Identity::<i32>::new());
        let pairer = dsl.add(Zip::<i32, i32>::new());

        dsl.connect(fan_out.outlet(0)?, pass_through.inlet())?;
        dsl.connect(pass_through.outlet(), pairer.in1())?;
        dsl.connect(fan_out.outlet(1)?, pairer.in0())?;

        Ok(FlowShape::new(fan_out.inlet(), pairer.outlet()))
    })
    .unwrap();

    let pairs = flow.run_with_input([1, 2, 3]).unwrap();
    assert_eq!(pairs, vec![(1, 1), (2, 2), (3, 3)]);
}
293
/// Feeds `Zip.in0` through a broadcast-then-merge path and `Zip.in1` directly
/// from the outer broadcast, then checks the pairs produced for `[10, 20]`.
#[test]
fn graph_dsl_zip_buffers_skewed_inlet_arrivals() {
    let flow = GraphDsl::try_create(|dsl| {
        let splitter = dsl.add(Broadcast::<i32>::new(2));
        let duplicator = dsl.add(Broadcast::<i32>::new(2));
        let joiner = dsl.add(Merge::<i32>::new(2));
        let pairer = dsl.add(Zip::<i32, i32>::new());

        dsl.connect(splitter.outlet(0)?, duplicator.inlet())?;
        dsl.connect(duplicator.outlet(0)?, joiner.inlet(0)?)?;
        dsl.connect(duplicator.outlet(1)?, joiner.inlet(1)?)?;
        dsl.connect(joiner.outlet(), pairer.in0())?;
        dsl.connect(splitter.outlet(1)?, pairer.in1())?;

        Ok(FlowShape::new(splitter.inlet(), pairer.outlet()))
    })
    .unwrap();

    let pairs = flow.run_with_input([10, 20]).unwrap();
    assert_eq!(pairs, vec![(10, 10), (10, 20)]);
}
335
/// Builds a countdown loop whose feedback edge (buffer → take-while-positive →
/// decrement) re-enters through `MergePreferred`'s preferred inlet, and checks
/// that input `[3]` yields `[3, 2, 1, 0]`.
#[test]
fn graph_dsl_runs_merge_preferred_buffered_feedback_cycle() {
    let flow = GraphDsl::try_create(|dsl| {
        let entry = dsl.add(MergePreferred::<i32>::new(1));
        let fan_out = dsl.add(Broadcast::<i32>::new(2));
        let loop_buffer = dsl.add(Buffer::<i32>::new(8, OverflowStrategy::Backpressure));
        let while_positive = dsl.add(TakeWhile::<i32>::new(|n| *n > 0));
        let minus_one = dsl.add(MapStage::new(|n: i32| n - 1));

        dsl.connect(entry.outlet(), fan_out.inlet())?;
        dsl.connect(fan_out.outlet(1)?, loop_buffer.inlet())?;
        dsl.connect(loop_buffer.outlet(), while_positive.inlet())?;
        dsl.connect(while_positive.outlet(), minus_one.inlet())?;
        dsl.connect(minus_one.outlet(), entry.preferred())?;

        Ok(FlowShape::new(entry.secondary(0)?, fan_out.outlet(0)?))
    })
    .unwrap();

    let countdown = flow.run_with_input([3]).unwrap();
    assert_eq!(countdown, vec![3, 2, 1, 0]);
}
357
/// Same countdown loop as the `MergePreferred` variant, but the feedback edge
/// re-enters through inlet 1 of a plain two-way `Merge`.
#[test]
fn graph_dsl_runs_buffered_merge_feedback_cycle() {
    let flow = GraphDsl::try_create(|dsl| {
        let entry = dsl.add(Merge::<i32>::new(2));
        let fan_out = dsl.add(Broadcast::<i32>::new(2));
        let loop_buffer = dsl.add(Buffer::<i32>::new(8, OverflowStrategy::Backpressure));
        let while_positive = dsl.add(TakeWhile::<i32>::new(|n| *n > 0));
        let minus_one = dsl.add(MapStage::new(|n: i32| n - 1));

        dsl.connect(entry.outlet(), fan_out.inlet())?;
        dsl.connect(fan_out.outlet(1)?, loop_buffer.inlet())?;
        dsl.connect(loop_buffer.outlet(), while_positive.inlet())?;
        dsl.connect(while_positive.outlet(), minus_one.inlet())?;
        dsl.connect(minus_one.outlet(), entry.inlet(1)?)?;

        Ok(FlowShape::new(entry.inlet(0)?, fan_out.outlet(0)?))
    })
    .unwrap();

    let countdown = flow.run_with_input([3]).unwrap();
    assert_eq!(countdown, vec![3, 2, 1, 0]);
}
379
/// Loops a `Broadcast` outlet straight back into a `Merge` inlet and checks
/// that running with an event limit of 256 reports `EventLimitExceeded`.
#[test]
fn graph_dsl_unbuffered_merge_cycle_surfaces_event_limit() {
    let flow = GraphDsl::try_create(|dsl| {
        let entry = dsl.add(Merge::<i32>::new(2));
        let fan_out = dsl.add(Broadcast::<i32>::new(2));

        dsl.connect(entry.outlet(), fan_out.inlet())?;
        dsl.connect(fan_out.outlet(1)?, entry.inlet(1)?)?;

        Ok(FlowShape::new(entry.inlet(0)?, fan_out.outlet(0)?))
    })
    .unwrap();

    let outcome = flow.run_with_input_report([1], FusedExecutionConfig { event_limit: 256 });

    assert_eq!(outcome, Err(StreamError::EventLimitExceeded { limit: 256 }));
}
397
/// Imports a named partial graph of two identities into an outer graph,
/// appends another identity, and checks both the pass-through output and the
/// outer graph's name attribute.
#[test]
fn partial_graph_can_be_imported_with_its_shape() {
    let inner = GraphDsl::partial(|dsl| {
        let head = dsl.add(Identity::<i32>::new());
        let tail = dsl.add_named(Identity::<i32>::new(), "partial.tail");
        dsl.connect(head.outlet(), tail.inlet())?;
        Ok(FlowShape::new(head.inlet(), tail.outlet()))
    })
    .named("partial.identity");

    let outer = GraphDsl::try_create(|dsl| {
        let embedded = dsl.import(&inner)?;
        let trailing = dsl.add(Identity::<i32>::new());
        dsl.connect(embedded.outlet(), trailing.inlet())?;
        Ok(FlowShape::new(embedded.inlet(), trailing.outlet()))
    })
    .unwrap()
    .named("outer.graph");

    let passed = outer.run_with_input([1, 2, 3]).unwrap();
    assert_eq!(passed, vec![1, 2, 3]);
    assert_eq!(outer.attributes().name(), Some("outer.graph"));
}
420
/// Applies two names to a stage and two dispatcher hints to the graph, then
/// checks that the later-applied value is the one reported in each case.
#[test]
fn graph_attributes_follow_innermost_wins_order() {
    let flow = GraphDsl::create(|dsl| {
        let stage_attributes =
            Attributes::named("stage-outer").and(Attributes::named("stage-inner"));
        dsl.add_with_attributes(Identity::<i32>::new(), stage_attributes)
    })
    .unwrap()
    .add_attributes(Attributes::dispatcher("graph-outer"))
    .add_attributes(Attributes::dispatcher("graph-inner"));

    assert_eq!(flow.attributes().dispatcher_hint(), Some("graph-inner"));
    assert_eq!(
        flow.stages[0].spec.attributes().name(),
        Some("stage-inner")
    );
}
439
/// Checks three rejections: connecting an already-connected outlet a second
/// time, building the graph that contains that attempt, and building a graph
/// that exposes only one outlet of a two-way `Broadcast`.
#[test]
fn graph_dsl_rejects_invalid_and_incomplete_wiring() {
    let rewired = GraphDsl::try_create(|dsl| {
        let head = dsl.add(Identity::<i32>::new());
        let middle = dsl.add(Identity::<i32>::new());
        let spare = dsl.add(Identity::<i32>::new());

        dsl.connect(head.outlet(), middle.inlet())?;
        let second_attempt = dsl.connect(head.outlet(), spare.inlet());
        assert!(matches!(second_attempt, Err(StreamError::GraphValidation(_))));

        Ok(FlowShape::new(head.inlet(), middle.outlet()))
    });

    assert!(matches!(rewired, Err(StreamError::GraphValidation(_))));

    let dangling = GraphDsl::create(|dsl| {
        let fan_out = dsl.add(Broadcast::<i32>::new(2));
        SourceShape::new(fan_out.outlet(0).unwrap())
    });
    assert!(matches!(dangling, Err(StreamError::GraphValidation(_))));
}
462
/// Connects an erased `i32` outlet to an erased `u64` inlet and checks that
/// both the connection and the resulting graph build fail validation.
#[test]
fn graph_dsl_rejects_erased_type_mismatch() {
    let outcome = GraphDsl::try_create(|dsl| {
        let narrow = dsl.add(Identity::<i32>::new());
        let wide = dsl.add(Identity::<u64>::new());
        let attempt = dsl.connect_any(narrow.outlet().erase(), wide.inlet().erase());
        assert!(matches!(attempt, Err(StreamError::GraphValidation(_))));
        Ok(FlowShape::new(narrow.inlet(), wide.outlet()))
    });

    assert!(matches!(outcome, Err(StreamError::GraphValidation(_))));
}
475
/// Returns a shape whose inlet reuses a real port id but claims element type
/// `u64`, and checks that the graph build fails validation.
#[test]
fn graph_dsl_rejects_result_ports_with_spoofed_metadata() {
    let outcome = GraphDsl::create(|dsl| {
        let real = dsl.add(Identity::<i32>::new());
        let forged_inlet = Inlet::<u64>::with_id(real.inlet().id(), "Identity.in");
        FlowShape::new(forged_inlet, real.outlet())
    });

    assert!(matches!(outcome, Err(StreamError::GraphValidation(_))));
}
488
/// Creates an inlet by hand, then adds a stage through the builder and checks
/// the two inlet ids differ.
#[test]
fn allocated_ports_do_not_collide_with_manual_ports() {
    let hand_made = Inlet::<i32>::new("manual");
    let outcome = GraphDsl::try_create(|dsl| {
        let allocated = dsl.add(Identity::<i32>::new());
        assert_ne!(hand_made.id(), allocated.inlet().id());
        Ok(allocated)
    });

    assert!(outcome.is_ok());
}
500
/// Adds two `Identity` stages and checks that their ports carry the fixed
/// `Identity.in` / `Identity.out` names while their ids differ.
#[test]
fn identity_ports_keep_static_names_and_unique_ids() {
    let outcome = GraphDsl::try_create(|dsl| {
        let head = dsl.add(Identity::<i32>::new());
        let tail = dsl.add(Identity::<i32>::new());

        assert_eq!(head.inlet().name(), "Identity.in");
        assert_eq!(head.outlet().name(), "Identity.out");
        assert_eq!(tail.inlet().name(), "Identity.in");
        assert_eq!(tail.outlet().name(), "Identity.out");
        assert_ne!(head.inlet().id(), head.outlet().id());
        assert_ne!(head.outlet().id(), tail.inlet().id());
        assert_ne!(tail.inlet().id(), tail.outlet().id());

        dsl.connect(head.outlet(), tail.inlet())?;
        Ok(FlowShape::new(head.inlet(), tail.outlet()))
    });

    assert!(outcome.is_ok());
}
521
/// Walks a `GraphStageLogic` through its port operations in order, asserting
/// which calls are accepted and which return `InvalidPortOperation`.
#[test]
fn graph_stage_logic_checks_port_operations() {
    let shape = FlowShape::new(Inlet::<i32>::new("in"), Outlet::<i32>::new("out"));
    let input = shape.inlet();
    let output = shape.outlet();
    let mut logic = GraphStageLogic::new(&shape);

    // Inlet side: grab before pull fails, a second pull fails, offer clears the pull.
    assert!(matches!(
        logic.grab(&input),
        Err(StreamError::InvalidPortOperation { .. })
    ));
    logic.pull(&input).unwrap();
    assert!(logic.has_been_pulled(&input));
    assert!(matches!(
        logic.pull(&input),
        Err(StreamError::InvalidPortOperation { .. })
    ));
    logic.offer(&input, 41).unwrap();
    assert!(!logic.has_been_pulled(&input));
    assert_eq!(logic.grab(&input).unwrap(), 41);

    // Outlet side: push before request fails, request after complete fails.
    assert!(matches!(
        logic.push(&output, 1),
        Err(StreamError::InvalidPortOperation { .. })
    ));
    logic.request(&output).unwrap();
    assert!(logic.is_available(&output));
    logic.push(&output, 42).unwrap();
    assert!(!logic.is_available(&output));
    logic.complete(&output).unwrap();
    assert!(logic.is_closed(&output));
    assert!(matches!(
        logic.request(&output),
        Err(StreamError::InvalidPortOperation { .. })
    ));
}
558
/// Runs a single `Identity` over two elements with an event limit of 1 and
/// checks that `EventLimitExceeded` is returned.
#[test]
fn fused_execution_enforces_event_limit() {
    let flow = GraphDsl::create(|dsl| dsl.add(Identity::<i32>::new())).unwrap();

    let outcome = flow.run_with_input_report([1, 2], FusedExecutionConfig { event_limit: 1 });

    assert_eq!(outcome, Err(StreamError::EventLimitExceeded { limit: 1 }));
}
567
/// Places an `AsyncBoundary` between an identity and a `+1` map, then checks
/// the segment count, the output, and the reported boundary crossings.
#[test]
fn async_boundary_splits_fused_segments() {
    let flow = GraphDsl::try_create(|dsl| {
        let upstream = dsl.add(Identity::<i32>::new());
        let fence = dsl.add(AsyncBoundary::<i32>::new());
        let downstream = dsl.add(MapStage::new(|n: i32| n + 1));

        dsl.connect(upstream.outlet(), fence.inlet())?;
        dsl.connect(fence.outlet(), downstream.inlet())?;

        Ok(FlowShape::new(upstream.inlet(), downstream.outlet()))
    })
    .unwrap();

    assert_eq!(flow.segments().len(), 3);
    let summary = flow
        .run_with_input_report([1, 2], FusedExecutionConfig::default())
        .unwrap();
    assert_eq!(summary.output, vec![2, 3]);
    assert_eq!(summary.async_boundary_crossings, 2);
}
589
/// Runs the async-boundary count path over `[1, 2, 3]` with buffer size 2 and
/// checks the reported result, boundary crossings and event count.
#[test]
fn async_boundary_count_path_uses_ractor_handoff_segments() {
    let flow = GraphDsl::try_create(|dsl| {
        let upstream = dsl.add(MapStage::new(|n: i32| n + 1));
        let fence = dsl.add(AsyncBoundary::<i32>::new());
        let downstream = dsl.add(MapStage::new(|n: i32| n * 2));

        dsl.connect(upstream.outlet(), fence.inlet())?;
        dsl.connect(fence.outlet(), downstream.inlet())?;

        Ok(FlowShape::new(upstream.inlet(), downstream.outlet()))
    })
    .unwrap();

    let summary = flow
        .run_async_boundary_count_with_input_report(
            [1, 2, 3],
            AsyncBoundaryExecutionConfig {
                fused: FusedExecutionConfig::default(),
                buffer_size: 2,
            },
        )
        .unwrap();

    assert_eq!(summary.result, 3);
    assert_eq!(summary.async_boundary_crossings, 3);
    assert_eq!(summary.events, 18);
}
618
/// Runs the same graph through the graph-level count path and through
/// `BoundaryCountExecutor::Threaded`, and checks the two reports are equal.
#[test]
fn threaded_async_boundary_baseline_matches_ractor_count_path() {
    let flow = GraphDsl::try_create(|dsl| {
        let upstream = dsl.add(MapStage::new(|n: i32| n + 1));
        let fence = dsl.add(AsyncBoundary::<i32>::new());
        let downstream = dsl.add(MapStage::new(|n: i32| n * 2));

        dsl.connect(upstream.outlet(), fence.inlet())?;
        dsl.connect(fence.outlet(), downstream.inlet())?;

        Ok(FlowShape::new(upstream.inlet(), downstream.outlet()))
    })
    .unwrap();

    let config = AsyncBoundaryExecutionConfig {
        fused: FusedExecutionConfig::default(),
        buffer_size: 2,
    };
    let from_graph = flow
        .run_async_boundary_count_with_input_report([1, 2, 3], config)
        .unwrap();
    let from_threads = BoundaryCountExecutor::Threaded
        .run_count(
            [1, 2, 3],
            flow.typed_linear_async_segments().unwrap(),
            config,
        )
        .unwrap();

    assert_eq!(from_threads, from_graph);
}
650
/// Runs `BoundaryCountExecutor::Ractor` with `buffer_size: 0` and checks for a
/// `GraphValidation` error whose message mentions `buffer_size`.
#[test]
fn ractor_async_boundary_rejects_zero_buffer_size() {
    let flow = GraphDsl::try_create(|dsl| {
        let upstream = dsl.add(MapStage::new(|n: i32| n + 1));
        let fence = dsl.add(AsyncBoundary::<i32>::new());
        let downstream = dsl.add(MapStage::new(|n: i32| n * 2));

        dsl.connect(upstream.outlet(), fence.inlet())?;
        dsl.connect(fence.outlet(), downstream.inlet())?;

        Ok(FlowShape::new(upstream.inlet(), downstream.outlet()))
    })
    .unwrap();

    let outcome = BoundaryCountExecutor::Ractor.run_count(
        [1, 2, 3],
        flow.typed_linear_async_segments().unwrap(),
        AsyncBoundaryExecutionConfig {
            fused: FusedExecutionConfig::default(),
            buffer_size: 0,
        },
    );

    assert!(matches!(
        outcome,
        Err(StreamError::GraphValidation(text)) if text.contains("buffer_size")
    ));
}
679
/// Feeds an endless `repeat(1)` iterator to the Ractor executor with an event
/// limit of 4 and checks the run returns `EventLimitExceeded`.
#[test]
fn ractor_async_boundary_streams_input_without_eager_collection() {
    let flow = GraphDsl::try_create(|dsl| {
        let upstream = dsl.add(MapStage::new(|n: i32| n + 1));
        let fence = dsl.add(AsyncBoundary::<i32>::new());
        let downstream = dsl.add(MapStage::new(|n: i32| n * 2));

        dsl.connect(upstream.outlet(), fence.inlet())?;
        dsl.connect(fence.outlet(), downstream.inlet())?;

        Ok(FlowShape::new(upstream.inlet(), downstream.outlet()))
    })
    .unwrap();

    let outcome = BoundaryCountExecutor::Ractor.run_count(
        std::iter::repeat(1_i32),
        flow.typed_linear_async_segments().unwrap(),
        AsyncBoundaryExecutionConfig {
            fused: FusedExecutionConfig { event_limit: 4 },
            buffer_size: 1,
        },
    );

    assert_eq!(outcome, Err(StreamError::EventLimitExceeded { limit: 4 }));
}
705
/// Calls the Ractor executor from inside `block_on` of a current-thread Tokio
/// runtime and checks the same report values as the plain count-path test.
#[test]
fn ractor_async_boundary_runs_inside_existing_tokio_runtime() {
    let flow = GraphDsl::try_create(|dsl| {
        let upstream = dsl.add(MapStage::new(|n: i32| n + 1));
        let fence = dsl.add(AsyncBoundary::<i32>::new());
        let downstream = dsl.add(MapStage::new(|n: i32| n * 2));

        dsl.connect(upstream.outlet(), fence.inlet())?;
        dsl.connect(fence.outlet(), downstream.inlet())?;

        Ok(FlowShape::new(upstream.inlet(), downstream.outlet()))
    })
    .unwrap();

    let host_runtime = tokio::runtime::Builder::new_current_thread()
        .build()
        .unwrap();
    let summary = host_runtime
        .block_on(async {
            BoundaryCountExecutor::Ractor.run_count(
                [1, 2, 3],
                flow.typed_linear_async_segments().unwrap(),
                AsyncBoundaryExecutionConfig {
                    fused: FusedExecutionConfig::default(),
                    buffer_size: 2,
                },
            )
        })
        .unwrap();

    assert_eq!(summary.result, 3);
    assert_eq!(summary.async_boundary_crossings, 3);
    assert_eq!(summary.events, 18);
}
740
/// Routes `0..6` through a two-way `Balance` into a two-way `Merge` and checks
/// the elements come out in their original order.
#[test]
fn balance_merge_round_robins_through_junctions() {
    let flow = GraphDsl::try_create(|dsl| {
        let spreader = dsl.add(Balance::<i32>::new(2));
        let joiner = dsl.add(Merge::<i32>::new(2));

        dsl.connect(spreader.outlet(0)?, joiner.inlet(0)?)?;
        dsl.connect(spreader.outlet(1)?, joiner.inlet(1)?)?;

        Ok(FlowShape::new(spreader.inlet(), joiner.outlet()))
    })
    .unwrap();

    let merged = flow.run_with_input(0..6).unwrap();
    assert_eq!(merged, vec![0, 1, 2, 3, 4, 5]);
}
756
/// Runs `MergePrioritized` with weights `[2, 1]` over two inputs and checks
/// the expected interleaving of the output.
#[test]
fn prioritized_merge_uses_weighted_schedule() {
    let flow = GraphDsl::create(|dsl| dsl.add(MergePrioritized::<i32>::new(vec![2, 1])))
        .unwrap();

    let merged = flow
        .run_fan_in(vec![vec![1, 2, 3, 4], vec![100, 101]])
        .unwrap();
    assert_eq!(merged, vec![1, 2, 100, 3, 4, 101]);
}
770
/// Chains `+1` and `*2` maps and checks the count and the additive fold of the
/// output for input `0..4`.
#[test]
fn fused_execution_supports_count_and_fold_sinks() {
    let flow = GraphDsl::try_create(|dsl| {
        let plus_one = dsl.add(MapStage::new(|n: u64| n + 1));
        let doubled = dsl.add(MapStage::new(|n: u64| n * 2));

        dsl.connect(plus_one.outlet(), doubled.inlet())?;

        Ok(FlowShape::new(plus_one.inlet(), doubled.outlet()))
    })
    .unwrap();

    assert_eq!(flow.run_count_with_input(0..4).unwrap(), 4);

    let total = flow
        .run_fold_with_input(0..4, 0, |sum, next| sum + next)
        .unwrap();
    assert_eq!(total, 20);
}
791
/// Runs the typed-linear entry points on a `+1` then `*2` chain and checks the
/// output, the event count, the element count and the additive fold.
#[test]
fn typed_linear_fast_path_runs_same_type_chains() {
    let flow = GraphDsl::try_create(|dsl| {
        let plus_one = dsl.add(MapStage::new(|n: u64| n + 1));
        let doubled = dsl.add(MapStage::new(|n: u64| n * 2));

        dsl.connect(plus_one.outlet(), doubled.inlet())?;

        Ok(FlowShape::new(plus_one.inlet(), doubled.outlet()))
    })
    .unwrap();

    let summary = flow
        .run_typed_linear_with_input_report([1, 2, 3], FusedExecutionConfig::default())
        .unwrap();
    assert_eq!(summary.output, vec![4, 6, 8]);
    assert_eq!(summary.events, 12);

    assert_eq!(flow.run_typed_linear_count_with_input(0..4).unwrap(), 4);

    let total = flow
        .run_typed_linear_fold_with_input(0..4, 0, |sum, next| sum + next)
        .unwrap();
    assert_eq!(total, 20);
}
818
/// Checks that the typed-linear count entry point returns `GraphValidation`
/// for a `Balance` → `Merge` graph.
#[test]
fn typed_linear_fast_path_rejects_junction_graphs() {
    let flow = GraphDsl::try_create(|dsl| {
        let spreader = dsl.add(Balance::<i32>::new(2));
        let joiner = dsl.add(Merge::<i32>::new(2));

        dsl.connect(spreader.outlet(0)?, joiner.inlet(0)?)?;
        dsl.connect(spreader.outlet(1)?, joiner.inlet(1)?)?;

        Ok(FlowShape::new(spreader.inlet(), joiner.outlet()))
    })
    .unwrap();

    assert!(matches!(
        flow.run_typed_linear_count_with_input([1, 2, 3]),
        Err(StreamError::GraphValidation(_))
    ));
}
837
/// Runs `MergePreferred` with one preferred and two secondary inputs and
/// checks that every preferred element precedes the secondary ones.
#[test]
fn merge_preferred_drains_preferred_before_secondaries() {
    let flow = GraphDsl::create(|dsl| dsl.add(MergePreferred::<i32>::new(2))).unwrap();

    let merged = flow
        .run_merge_preferred(vec![1, 2, 3], vec![vec![100, 101], vec![200]])
        .unwrap();
    assert_eq!(merged, vec![1, 2, 3, 100, 200, 101]);
}
849
/// Runs a two-way `Merge` with one empty input on either side and checks that
/// the other input's elements are all emitted.
#[test]
fn merge_waits_for_all_inputs_to_complete() {
    let flow = GraphDsl::create(|dsl| dsl.add(Merge::<i32>::new(2))).unwrap();

    let first_empty = flow.run_fan_in(vec![vec![], vec![10, 20]]).unwrap();
    assert_eq!(first_empty, vec![10, 20]);

    let second_empty = flow.run_fan_in(vec![vec![1], vec![]]).unwrap();
    assert_eq!(second_empty, vec![1]);
}
860
/// Splits pairs with `Unzip`, feeds both halves into `MergeSorted`, and checks
/// the output is the fully sorted sequence `1..=6`.
#[test]
fn merge_sorted_drains_remaining_input_after_peer_completes() {
    let flow = GraphDsl::try_create(|dsl| {
        let splitter = dsl.add(Unzip::<i32, i32>::new());
        let sorter = dsl.add(MergeSorted::<i32>::new());

        dsl.connect(splitter.out0(), sorter.inlet(0)?)?;
        dsl.connect(splitter.out1(), sorter.inlet(1)?)?;

        Ok(FlowShape::new(splitter.inlet(), sorter.outlet()))
    })
    .unwrap();

    let sorted = flow.run_with_input([(1, 2), (4, 3), (6, 5)]).unwrap();
    assert_eq!(sorted, vec![1, 2, 3, 4, 5, 6]);
}
879
/// Splits pairs with `Unzip`, feeds both halves into a `MergeSequence` keyed
/// by the element value, and checks the output is `0..=5` in order.
#[test]
fn merge_sequence_reorders_adversarial_arrivals_by_sequence_number() {
    let flow = GraphDsl::try_create(|dsl| {
        let splitter = dsl.add(Unzip::<u64, u64>::new());
        let sequencer = dsl.add(MergeSequence::<u64>::new(2, |n| *n));

        dsl.connect(splitter.out0(), sequencer.inlet(0)?)?;
        dsl.connect(splitter.out1(), sequencer.inlet(1)?)?;

        Ok(FlowShape::new(splitter.inlet(), sequencer.outlet()))
    })
    .unwrap();

    let ordered = flow.run_with_input([(0, 1), (2, 3), (4, 5)]).unwrap();
    assert_eq!(ordered, vec![0, 1, 2, 3, 4, 5]);
}
898
/// Feeds both halves of an `Unzip` into a non-eager `MergeLatest` and checks
/// the snapshots emitted for `[(1, 10), (2, 11)]`.
#[test]
fn merge_latest_emits_with_last_seen_peer_and_honors_eager_complete() {
    let flow = GraphDsl::try_create(|dsl| {
        let splitter = dsl.add(Unzip::<i32, i32>::new());
        let latest = dsl.add(MergeLatest::<i32>::new(2, false));

        dsl.connect(splitter.out0(), latest.inlet(0)?)?;
        dsl.connect(splitter.out1(), latest.inlet(1)?)?;

        Ok(FlowShape::new(splitter.inlet(), latest.outlet()))
    })
    .unwrap();

    let snapshots = flow.run_with_input([(1, 10), (2, 11)]).unwrap();
    assert_eq!(snapshots, vec![vec![1, 10], vec![2, 10], vec![2, 11]]);
}
917
/// Runs `Zip` with inputs of unequal length and checks the output length
/// matches the shorter side, including the empty case.
#[test]
fn zip_completes_when_any_input_completes() {
    let flow = GraphDsl::create(|dsl| dsl.add(Zip::<i32, i32>::new())).unwrap();

    let short_right = flow.run_zip(vec![1, 2, 3], vec![10]).unwrap();
    assert_eq!(short_right, vec![(1, 10)]);

    let short_left = flow.run_zip(vec![1], vec![10, 20]).unwrap();
    assert_eq!(short_left, vec![(1, 10)]);

    let empty_left = flow.run_zip(vec![], vec![10, 20]).unwrap();
    assert_eq!(empty_left, Vec::<(i32, i32)>::new());
}
932
/// Runs a three-way `Concat` (middle input empty) and checks the inputs are
/// emitted one after another in declaration order.
#[test]
fn concat_drains_inputs_in_declared_order() {
    let flow = GraphDsl::create(|dsl| dsl.add(Concat::<i32>::new(3))).unwrap();

    let joined = flow
        .run_concat(vec![vec![1, 2], vec![], vec![3, 4]])
        .unwrap();
    assert_eq!(joined, vec![1, 2, 3, 4]);
}
944
/// Checks `OrElse` emits the secondary input when the primary is empty and
/// only the primary input otherwise.
#[test]
fn or_else_switches_only_if_primary_is_empty() {
    let flow = GraphDsl::create(|dsl| dsl.add(OrElse::<i32>::new())).unwrap();

    let fallback = flow.run_or_else(vec![], vec![10, 20]).unwrap();
    assert_eq!(fallback, vec![10, 20]);

    let primary_only = flow.run_or_else(vec![1, 2], vec![10, 20]).unwrap();
    assert_eq!(primary_only, vec![1, 2]);
}
958
/// Uses the `run_or_else_secondary_first` driver with a non-empty primary and
/// checks that only the primary elements are emitted.
#[test]
fn or_else_secondary_first_dropped_when_primary_emits() {
    let flow = GraphDsl::create(|dsl| dsl.add(OrElse::<i32>::new())).unwrap();

    let emitted = flow
        .run_or_else_secondary_first(vec![1, 2], vec![10, 20])
        .unwrap();
    assert_eq!(emitted, vec![1, 2]);
}
970
/// Uses the `run_or_else_secondary_first` driver with an empty primary and
/// checks that the secondary elements are emitted.
#[test]
fn or_else_secondary_first_flushed_when_primary_empty() {
    let flow = GraphDsl::create(|dsl| dsl.add(OrElse::<i32>::new())).unwrap();

    let emitted = flow
        .run_or_else_secondary_first(vec![], vec![10, 20])
        .unwrap();
    assert_eq!(emitted, vec![10, 20]);
}
982
/// Uses the `run_or_else_secondary_closed_first` driver and checks that the
/// secondary elements `[10, 20]` are emitted.
#[test]
fn or_else_secondary_closed_then_primary_empty_drains_buffer() {
    let flow = GraphDsl::create(|dsl| dsl.add(OrElse::<i32>::new())).unwrap();

    let emitted = flow
        .run_or_else_secondary_closed_first(vec![10, 20])
        .unwrap();
    assert_eq!(emitted, vec![10, 20]);
}
994
/// Runs a three-way `Interleave` with segment size 2 and checks the emitted
/// order across inputs of different lengths.
#[test]
fn interleave_cycles_segment_sized_chunks() {
    let flow = GraphDsl::create(|dsl| dsl.add(Interleave::<i32>::new(3, 2))).unwrap();

    let woven = flow
        .run_interleave(vec![vec![1, 2, 3], vec![10, 11, 12], vec![20]], 2, false)
        .unwrap();
    assert_eq!(woven, vec![1, 2, 10, 11, 20, 3, 12]);
}
1006
/// Runs an eager-close `Interleave` and checks the output for two input
/// pairs: one with an empty second input, one with a shorter first input.
#[test]
fn interleave_eager_close_stops_when_any_input_completes() {
    let flow = GraphDsl::create(|dsl| {
        dsl.add(Interleave::<i32>::new_with_eager_close(2, 1, true))
    })
    .unwrap();

    let with_empty_peer = flow
        .run_interleave(vec![vec![1, 2], vec![]], 1, true)
        .unwrap();
    assert_eq!(with_empty_peer, Vec::<i32>::new());

    let with_short_peer = flow
        .run_interleave(vec![vec![1], vec![10, 11]], 1, true)
        .unwrap();
    assert_eq!(with_short_peer, vec![1, 10]);
}
1027
/// Partitions by parity into two outlets, merges them back, and checks that
/// `[0, 1, 2, 3]` comes out unchanged.
#[test]
fn partition_routes_only_live_outlets_after_peer_cancels() {
    let flow = GraphDsl::try_create(|dsl| {
        let by_parity = dsl.add(Partition::<i32>::new(2, |n| (*n % 2) as usize));
        let joiner = dsl.add(Merge::<i32>::new(2));

        dsl.connect(by_parity.outlet(0)?, joiner.inlet(0)?)?;
        dsl.connect(by_parity.outlet(1)?, joiner.inlet(1)?)?;

        Ok(FlowShape::new(by_parity.inlet(), joiner.outlet()))
    })
    .unwrap();

    let merged = flow.run_with_input([0, 1, 2, 3]).unwrap();
    assert_eq!(merged, vec![0, 1, 2, 3]);
}
1046
/// Splits each element into `(n, n * 10)` with `UnzipWith`, zips the halves
/// back together, and checks the resulting pairs.
#[test]
fn unzip_with_keeps_live_outlet_running_after_peer_finishes() {
    let flow = GraphDsl::try_create(|dsl| {
        let splitter = dsl.add(UnzipWith::<i32, i32, i32>::new(|n| (n, n * 10)));
        let pairer = dsl.add(Zip::<i32, i32>::new());

        dsl.connect(splitter.out0(), pairer.in0())?;
        dsl.connect(splitter.out1(), pairer.in1())?;

        Ok(FlowShape::new(splitter.inlet(), pairer.outlet()))
    })
    .unwrap();

    let pairs = flow.run_with_input([1, 2, 3]).unwrap();
    assert_eq!(pairs, vec![(1, 10), (2, 20), (3, 30)]);
}
1065
/// Builds `Unzip` → `MergeSorted` twice, once with the inlets swapped, and
/// checks both graphs produce the same sorted output.
#[test]
fn unzip_merge_sorted_swapped_inlets_still_sorted() {
    let straight = GraphDsl::try_create(|dsl| {
        let splitter = dsl.add(Unzip::<i32, i32>::new());
        let sorter = dsl.add(MergeSorted::<i32>::new());
        dsl.connect(splitter.out0(), sorter.inlet(0)?)?;
        dsl.connect(splitter.out1(), sorter.inlet(1)?)?;
        Ok(FlowShape::new(splitter.inlet(), sorter.outlet()))
    })
    .unwrap();

    let crossed = GraphDsl::try_create(|dsl| {
        let splitter = dsl.add(Unzip::<i32, i32>::new());
        let sorter = dsl.add(MergeSorted::<i32>::new());
        dsl.connect(splitter.out0(), sorter.inlet(1)?)?;
        dsl.connect(splitter.out1(), sorter.inlet(0)?)?;
        Ok(FlowShape::new(splitter.inlet(), sorter.outlet()))
    })
    .unwrap();

    let pairs = vec![(1, 2), (4, 3), (6, 5)];
    let sorted = vec![1, 2, 3, 4, 5, 6];
    assert_eq!(straight.run_with_input(pairs.clone()).unwrap(), sorted);
    assert_eq!(crossed.run_with_input(pairs).unwrap(), sorted);
}
1100
/// Builds `Unzip` → `MergeLatest` twice, once with the inlets swapped, and
/// checks the snapshot for `(1, 10)` is `[1, 10]` and `[10, 1]` respectively.
#[test]
fn unzip_merge_latest_swapped_inlets_correct_snapshot_order() {
    let straight = GraphDsl::try_create(|dsl| {
        let splitter = dsl.add(Unzip::<i32, i32>::new());
        let latest = dsl.add(MergeLatest::<i32>::new(2, false));
        dsl.connect(splitter.out0(), latest.inlet(0)?)?;
        dsl.connect(splitter.out1(), latest.inlet(1)?)?;
        Ok(FlowShape::new(splitter.inlet(), latest.outlet()))
    })
    .unwrap();

    let crossed = GraphDsl::try_create(|dsl| {
        let splitter = dsl.add(Unzip::<i32, i32>::new());
        let latest = dsl.add(MergeLatest::<i32>::new(2, false));
        dsl.connect(splitter.out0(), latest.inlet(1)?)?;
        dsl.connect(splitter.out1(), latest.inlet(0)?)?;
        Ok(FlowShape::new(splitter.inlet(), latest.outlet()))
    })
    .unwrap();

    let straight_snapshots = straight.run_with_input([(1, 10)]).unwrap();
    assert_eq!(straight_snapshots, vec![vec![1, 10]]);

    let crossed_snapshots = crossed.run_with_input([(1, 10)]).unwrap();
    assert_eq!(crossed_snapshots, vec![vec![10, 1]]);
}
1139
/// Feeds `(1, 2)` into `Unzip` → `MergeSequence` and checks the run fails with
/// a `Failed` error whose message contains `expected sequence`.
#[test]
fn merge_sequence_fails_on_gap_at_completion() {
    let flow = GraphDsl::try_create(|dsl| {
        let splitter = dsl.add(Unzip::<u64, u64>::new());
        let sequencer = dsl.add(MergeSequence::<u64>::new(2, |n| *n));
        dsl.connect(splitter.out0(), sequencer.inlet(0)?)?;
        dsl.connect(splitter.out1(), sequencer.inlet(1)?)?;
        Ok(FlowShape::new(splitter.inlet(), sequencer.outlet()))
    })
    .unwrap();

    // The binding must stay named `result`: the failure message captures it inline.
    let result = flow.run_with_input([(1u64, 2u64)]);
    assert!(
        matches!(result, Err(StreamError::Failed(ref msg)) if msg.contains("expected sequence")),
        "expected a sequence-gap error, got: {result:?}"
    );
}
1163
/// Builds `Unzip` → `MergeLatest` with and without eager completion, runs both
/// on `(1, 10)`, and checks each produces output and that the outputs match.
#[test]
fn merge_latest_eager_complete_closes_on_first_inlet_done() {
    let eager_flow = GraphDsl::try_create(|dsl| {
        let splitter = dsl.add(Unzip::<i32, i32>::new());
        let latest = dsl.add(MergeLatest::<i32>::new(2, true));
        dsl.connect(splitter.out0(), latest.inlet(0)?)?;
        dsl.connect(splitter.out1(), latest.inlet(1)?)?;
        Ok(FlowShape::new(splitter.inlet(), latest.outlet()))
    })
    .unwrap();

    let lazy_flow = GraphDsl::try_create(|dsl| {
        let splitter = dsl.add(Unzip::<i32, i32>::new());
        let latest = dsl.add(MergeLatest::<i32>::new(2, false));
        dsl.connect(splitter.out0(), latest.inlet(0)?)?;
        dsl.connect(splitter.out1(), latest.inlet(1)?)?;
        Ok(FlowShape::new(splitter.inlet(), latest.outlet()))
    })
    .unwrap();

    let eager_output = eager_flow.run_with_input([(1i32, 10i32)]).unwrap();
    let lazy_output = lazy_flow.run_with_input([(1i32, 10i32)]).unwrap();

    assert!(!eager_output.is_empty(), "eager graph produced no output");
    assert!(
        !lazy_output.is_empty(),
        "non-eager graph produced no output"
    );

    assert_eq!(eager_output, lazy_output);
}
1207}