use std::{
any::{Any, TypeId, type_name},
collections::{HashMap, HashSet, VecDeque},
fmt,
marker::PhantomData,
sync::{
Arc, Mutex, OnceLock,
atomic::{AtomicUsize, Ordering},
},
thread,
};
#[cfg(test)]
use std::sync::mpsc;
use crate::{
actor::{Actor, ActorProcessingErr, ActorRef},
stream::{StreamError, StreamResult},
};
/// A type-erased stream element: any `DatumElement` behind a `Box`.
type DatumValue = Box<dyn DatumElement>;
/// Erased element-to-element transformation; returns a `StreamResult`, so it may fail.
type StageMapFn = dyn Fn(DatumValue) -> StreamResult<DatumValue> + Send + Sync;
/// Opaque `Any` slot stored next to a `StageMapFn` in `StageMapFns`.
// NOTE(review): presumably the statically typed closure, recovered by downcasting
// in the executor — confirm where it is consumed.
type StageTypedMapFn = dyn Any + Send + Sync;
/// Erased function combining two elements into one; returns a `StreamResult`, so it may fail.
type StageZipFn = dyn Fn(DatumValue, DatumValue) -> StreamResult<DatumValue> + Send + Sync;
/// Erased function splitting one element into a pair; the signature has no error path.
type StageUnzipFn = dyn Fn(DatumValue) -> (DatumValue, DatumValue) + Send + Sync;
/// Opaque `Any` slot for an unzip function.
// NOTE(review): presumably the typed counterpart of `StageUnzipFn` — confirm.
type StageTypedUnzipFn = dyn Any + Send + Sync;
/// Erased comparator over two borrowed elements.
type StageCompareFn = dyn Fn(&DatumValue, &DatumValue) -> std::cmp::Ordering + Send + Sync;
/// Erased function deriving a `u64` from a borrowed element.
// NOTE(review): judging by the name this is the sequence number used by
// `MergeSequence` — confirm against the junction implementation.
type StageSequenceFn = dyn Fn(&DatumValue) -> u64 + Send + Sync;
/// Opaque `Any` slot for a sequence function.
// NOTE(review): presumably the typed counterpart of `StageSequenceFn` — confirm.
type StageTypedSequenceFn = dyn Any + Send + Sync;
/// Erased function building one element from a slice of borrowed elements.
type StageSnapshotFn = dyn Fn(&[&DatumValue]) -> DatumValue + Send + Sync;
/// Opaque `Any` slot for a snapshot function.
// NOTE(review): presumably the typed counterpart of `StageSnapshotFn` — confirm.
type StageTypedSnapshotFn = dyn Any + Send + Sync;
/// Erased function mapping a borrowed element to a `usize`.
// NOTE(review): presumably the index of the outlet the element is routed to — confirm.
type StagePartitionFn = dyn Fn(&DatumValue) -> usize + Send + Sync;
/// Pairs the erased and the opaque "typed" form of one map function.
///
/// Both fields are `Arc`s, so the derived `Clone` only bumps reference counts
/// and every clone shares the same underlying functions.
#[derive(Clone)]
struct StageMapFns {
/// Map function operating on boxed, type-erased elements.
erased: Arc<StageMapFn>,
/// `Any`-erased companion value.
// NOTE(review): presumably the original typed closure — confirm at the use site.
typed: Arc<StageTypedMapFn>,
}
/// Object-safe interface for values carried through the graph as `DatumValue`.
///
/// It exposes cloning and `Any` conversion on a trait object, so erased
/// elements can be duplicated and later downcast to a concrete type.
pub(crate) trait DatumElement: Any + Send {
/// Returns a boxed copy of this element.
fn clone_box(&self) -> DatumValue;
/// Converts the boxed element into `Box<dyn Any + Send>` so it can be downcast by value.
fn into_any(self: Box<Self>) -> Box<dyn Any + Send>;
/// Borrows the element as `&dyn Any`.
fn as_any_ref(&self) -> &dyn Any;
}
/// Blanket implementation: every `Any + Clone + Send` type is usable as an
/// erased graph element without any per-type boilerplate.
impl<T: Any + Clone + Send> DatumElement for T {
    /// Clones the concrete value and boxes the copy as an erased element.
    fn clone_box(&self) -> DatumValue {
        let copy: T = self.clone();
        Box::new(copy)
    }

    /// Unsizes the box to `dyn Any + Send`; the allocation is reused as-is.
    fn into_any(self: Box<Self>) -> Box<dyn Any + Send> {
        let erased: Box<dyn Any + Send> = self;
        erased
    }

    /// Borrows the concrete value as `&dyn Any`.
    fn as_any_ref(&self) -> &dyn Any {
        let erased: &dyn Any = self;
        erased
    }
}
fn datum<T>(value: T) -> DatumValue
where
T: Clone + Send + 'static,
{
Box::new(value)
}
/// Recovers a concrete `T` from a type-erased element.
///
/// # Parameters
/// - `value`: the erased element to unwrap.
/// - `operation`: name of the operation being performed, copied into the error.
/// - `port`: lazily produces the port description; it is only invoked when the
///   downcast fails, so the success path does not pay for building it.
///
/// # Errors
/// Returns `StreamError::InvalidPortOperation` when the element is not a `T`;
/// the `reason` field names the expected type.
fn downcast_datum<T, S>(
    value: DatumValue,
    operation: &'static str,
    port: impl FnOnce() -> S,
) -> StreamResult<T>
where
    T: Send + 'static,
    S: Into<String>,
{
    match value.into_any().downcast::<T>() {
        Ok(concrete) => Ok(*concrete),
        // The mismatched element is discarded; only the expected type is reported.
        Err(_mismatched) => Err(StreamError::InvalidPortOperation {
            operation,
            port: port().into(),
            reason: format!("element type did not match {}", type_name::<T>()),
        }),
    }
}
/// Process-wide counter from which all port ids are handed out; the first id is 1.
static NEXT_PORT_ID: AtomicUsize = AtomicUsize::new(1);

/// Reserves a single fresh port id.
fn next_port_id() -> PortId {
    // `fetch_add` returns the value held before the increment, which becomes the
    // caller's id. Uniqueness only needs the increment to be atomic, hence `Relaxed`.
    let raw = NEXT_PORT_ID.fetch_add(1, Ordering::Relaxed);
    PortId(raw)
}
/// Reserves `count` consecutive port ids in one atomic step and returns the first.
///
/// The reserved ids are the returned id plus the offsets `0..count`.
/// `count` must be non-zero; this is only checked in debug builds.
fn next_port_id_block(count: usize) -> PortId {
    debug_assert!(count > 0);
    // `fetch_add` yields the counter value before the bump, i.e. the block's first id.
    let first = NEXT_PORT_ID.fetch_add(count, Ordering::Relaxed);
    PortId(first)
}
/// Opaque numeric identifier of a port.
///
/// The wrapped integer is private; use [`PortId::as_usize`] to read it.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct PortId(usize);

impl PortId {
    /// Returns the raw integer behind this id.
    #[must_use]
    pub const fn as_usize(self) -> usize {
        let Self(raw) = self;
        raw
    }

    /// Returns the id `offset` positions after this one.
    const fn offset(self, offset: usize) -> Self {
        Self(self.as_usize() + offset)
    }
}
/// Distinguishes the two kinds of port a stage can expose.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum PortKind {
/// An inlet: the receiving end of a connection.
Inlet,
/// An outlet: the emitting end of a connection.
Outlet,
}
// Generates `fn $fn_name() -> Arc<str>` returning a shared, lazily created name.
//
// The string is allocated once, on first call, and cached in a function-local
// `OnceLock`; every call afterwards only clones the `Arc`.
macro_rules! shared_name {
    ($fn_name:ident, $value:literal) => {
        fn $fn_name() -> Arc<str> {
            static NAME: OnceLock<Arc<str>> = OnceLock::new();
            let shared: &Arc<str> = NAME.get_or_init(|| Arc::<str>::from($value));
            Arc::clone(shared)
        }
    };
}
// Interned stage and port names, one accessor function per name.
// NOTE(review): some stages list only part of their ports here (e.g. Broadcast
// has no outlet name, Merge has no inlet name); presumably names for
// variable-count ports are built elsewhere — confirm in the junctions module.

// Identity
shared_name!(identity_stage_name, "Identity");
shared_name!(identity_inlet_name, "Identity.in");
shared_name!(identity_outlet_name, "Identity.out");
// Map
shared_name!(map_stage_name, "Map");
shared_name!(map_inlet_name, "Map.in");
shared_name!(map_outlet_name, "Map.out");
// Fan-out stages
shared_name!(broadcast_stage_name, "Broadcast");
shared_name!(broadcast_inlet_name, "Broadcast.in");
shared_name!(balance_stage_name, "Balance");
shared_name!(balance_inlet_name, "Balance.in");
// Merge family
shared_name!(merge_stage_name, "Merge");
shared_name!(merge_outlet_name, "Merge.out");
shared_name!(merge_preferred_stage_name, "MergePreferred");
shared_name!(merge_preferred_preferred_name, "MergePreferred.preferred");
shared_name!(merge_preferred_outlet_name, "MergePreferred.out");
shared_name!(merge_prioritized_stage_name, "MergePrioritized");
shared_name!(merge_prioritized_outlet_name, "MergePrioritized.out");
// Concat
shared_name!(concat_stage_name, "Concat");
shared_name!(concat_outlet_name, "Concat.out");
// OrElse
shared_name!(or_else_stage_name, "OrElse");
shared_name!(or_else_primary_name, "OrElse.primary");
shared_name!(or_else_secondary_name, "OrElse.secondary");
shared_name!(or_else_outlet_name, "OrElse.out");
// Interleave
shared_name!(interleave_stage_name, "Interleave");
shared_name!(interleave_outlet_name, "Interleave.out");
// Zip
shared_name!(zip_stage_name, "Zip");
shared_name!(zip_in0_name, "Zip.in0");
shared_name!(zip_in1_name, "Zip.in1");
shared_name!(zip_outlet_name, "Zip.out");
// AsyncBoundary
shared_name!(async_boundary_stage_name, "AsyncBoundary");
shared_name!(async_boundary_inlet_name, "AsyncBoundary.in");
shared_name!(async_boundary_outlet_name, "AsyncBoundary.out");
mod builder;
mod executor;
mod junctions;
mod ports;
mod shapes;
mod stage;
#[cfg(test)]
use self::executor::BoundaryCountExecutor;
use self::{builder::StageRecord, shapes::PortAllocator, stage::StageKind};
pub use self::{
builder::{
AsyncBoundaryExecutionConfig, FusedExecutionConfig, FusedExecutionReport, FusedSegment,
FusedTerminalReport, Graph, GraphBlueprint, GraphBuilder, GraphDsl, ImportedGraph,
PartialGraph,
},
junctions::{
AsyncBoundary, Balance, Broadcast, Concat, Identity, Interleave, MapStage, Merge,
MergeLatest, MergePreferred, MergePrioritized, MergeSequence, MergeSorted, OrElse,
Partition, Unzip, UnzipWith, Zip,
},
ports::{AnyInlet, AnyOutlet, Inlet, Outlet, PortRef},
shapes::{
BidiShape, FanInShape, FanOutShape, FanOutShape2, FlowShape, MergePreferredShape, Shape,
SinkShape, SourceShape, ZipShape,
},
stage::{AsyncCallback, GraphStage, GraphStageLogic, InHandler, OutHandler, StageSpec},
};
#[cfg(test)]
mod tests {
use super::*;
use crate::Attributes;
/// Wires a two-way `Broadcast` into both inlets of a `Zip` and checks the
/// resulting graph's stage, edge and port counts, then that every input
/// element comes out paired with itself.
#[test]
fn graph_dsl_builds_broadcast_zip_flow() {
let graph = GraphDsl::try_create(|builder| {
let broadcast = builder.add(Broadcast::<i32>::new(2));
let zip = builder.add(Zip::<i32, i32>::new());
builder.connect(broadcast.outlet(0)?, zip.in0())?;
builder.connect(broadcast.outlet(1)?, zip.in1())?;
Ok(FlowShape::new(broadcast.inlet(), zip.outlet()))
})
.unwrap();
assert_eq!(graph.stage_count(), 2);
assert_eq!(graph.edge_count(), 2);
assert_eq!(graph.shape().inlets().len(), 1);
assert_eq!(graph.shape().outlets().len(), 1);
assert_eq!(
graph.run_with_input([1, 2, 3]).unwrap(),
vec![(1, 1), (2, 2), (3, 3)]
);
}
/// Connects `Zip`'s `in1` before its `in0` (with an extra `Identity` on the
/// `in1` path) and expects the same pairs as with the natural wiring order.
// NOTE(review): both branches carry the same value, so the assertion cannot
// tell the two tuple slots apart; a distinguishing transform on one branch
// would make this test pin the slot assignment.
#[test]
fn graph_dsl_zip_slots_follow_inlet_ids() {
let graph = GraphDsl::try_create(|builder| {
let broadcast = builder.add(Broadcast::<i32>::new(2));
let identity = builder.add(Identity::<i32>::new());
let zip = builder.add(Zip::<i32, i32>::new());
builder.connect(broadcast.outlet(0)?, identity.inlet())?;
builder.connect(identity.outlet(), zip.in1())?;
builder.connect(broadcast.outlet(1)?, zip.in0())?;
Ok(FlowShape::new(broadcast.inlet(), zip.outlet()))
})
.unwrap();
assert_eq!(
graph.run_with_input([1, 2, 3]).unwrap(),
vec![(1, 1), (2, 2), (3, 3)]
);
}
/// Feeds `Zip` unevenly: the `in0` path duplicates every element (a second
/// `Broadcast` whose two outlets are merged back together) while `in1` gets
/// each element once. The expected output pairs the first two `in0` arrivals
/// (both `10`) with the two `in1` elements.
#[test]
fn graph_dsl_zip_buffers_skewed_inlet_arrivals() {
let graph = GraphDsl::try_create(|builder| {
let fan = builder.add(Broadcast::<i32>::new(2));
let doubler = builder.add(Broadcast::<i32>::new(2));
let merge = builder.add(Merge::<i32>::new(2));
let zip = builder.add(Zip::<i32, i32>::new());
builder.connect(fan.outlet(0)?, doubler.inlet())?;
builder.connect(doubler.outlet(0)?, merge.inlet(0)?)?;
builder.connect(doubler.outlet(1)?, merge.inlet(1)?)?;
builder.connect(merge.outlet(), zip.in0())?;
builder.connect(fan.outlet(1)?, zip.in1())?;
Ok(FlowShape::new(fan.inlet(), zip.outlet()))
})
.unwrap();
assert_eq!(
graph.run_with_input([10, 20]).unwrap(),
vec![(10, 10), (10, 20)]
);
}
/// Builds a feedback loop (`Merge` -> `Broadcast` -> back into `Merge`) and
/// expects graph creation to fail with a `GraphValidation` error whose message
/// mentions "WP-16" and the "demand-aware graph interpreter".
#[test]
fn graph_dsl_rejects_cycles() {
let graph = GraphDsl::try_create(|builder| {
let merge = builder.add(Merge::<i32>::new(2));
let broadcast = builder.add(Broadcast::<i32>::new(2));
builder.connect(merge.outlet(), broadcast.inlet())?;
builder.connect(broadcast.outlet(1)?, merge.inlet(1)?)?;
Ok(FlowShape::new(merge.inlet(0)?, broadcast.outlet(0)?))
});
let Err(StreamError::GraphValidation(message)) = graph else {
panic!("cyclic graph should be rejected");
};
assert!(message.contains("WP-16"));
assert!(message.contains("demand-aware graph interpreter"));
}
/// Defines a named partial graph of two chained `Identity` stages, imports it
/// into an outer graph, extends it with one more `Identity`, and checks that
/// elements pass through unchanged and the outer graph keeps its own name.
#[test]
fn partial_graph_can_be_imported_with_its_shape() {
let partial = GraphDsl::partial(|builder| {
let first = builder.add(Identity::<i32>::new());
let second = builder.add_named(Identity::<i32>::new(), "partial.tail");
builder.connect(first.outlet(), second.inlet())?;
Ok(FlowShape::new(first.inlet(), second.outlet()))
})
.named("partial.identity");
let graph = GraphDsl::try_create(|builder| {
let imported = builder.import(&partial)?;
let after = builder.add(Identity::<i32>::new());
builder.connect(imported.outlet(), after.inlet())?;
Ok(FlowShape::new(imported.inlet(), after.outlet()))
})
.unwrap()
.named("outer.graph");
assert_eq!(graph.run_with_input([1, 2, 3]).unwrap(), vec![1, 2, 3]);
assert_eq!(graph.attributes().name(), Some("outer.graph"));
}
/// Checks attribute precedence: when two values of the same attribute are
/// combined, the one added last is the one reported, both for graph-level
/// dispatcher hints and for a stage's name.
#[test]
fn graph_attributes_follow_innermost_wins_order() {
let graph = GraphDsl::create(|builder| {
builder.add_with_attributes(
Identity::<i32>::new(),
Attributes::named("stage-outer").and(Attributes::named("stage-inner")),
)
})
.unwrap()
.add_attributes(Attributes::dispatcher("graph-outer"))
.add_attributes(Attributes::dispatcher("graph-inner"));
assert_eq!(graph.attributes().dispatcher_hint(), Some("graph-inner"));
assert_eq!(
graph.stages[0].spec.attributes().name(),
Some("stage-inner")
);
}
/// Covers two wiring errors that must surface as `GraphValidation`:
/// connecting the same outlet twice, and exposing a graph that leaves ports
/// unconnected.
#[test]
fn graph_dsl_rejects_invalid_and_incomplete_wiring() {
let graph = GraphDsl::try_create(|builder| {
let first = builder.add(Identity::<i32>::new());
let second = builder.add(Identity::<i32>::new());
let third = builder.add(Identity::<i32>::new());
builder.connect(first.outlet(), second.inlet())?;
// `first.outlet()` is already connected, so a second connection must fail.
let duplicate = builder.connect(first.outlet(), third.inlet());
assert!(matches!(duplicate, Err(StreamError::GraphValidation(_))));
Ok(FlowShape::new(first.inlet(), second.outlet()))
});
// NOTE(review): the whole build is rejected as well, presumably because
// `third` is left unwired — confirm against the builder's validation.
assert!(matches!(graph, Err(StreamError::GraphValidation(_))));
// Only outlet 0 of the broadcast is exposed; its inlet and outlet 1 are
// neither connected nor part of the result shape.
let graph = GraphDsl::create(|builder| {
let broadcast = builder.add(Broadcast::<i32>::new(2));
SourceShape::new(broadcast.outlet(0).unwrap())
});
assert!(matches!(graph, Err(StreamError::GraphValidation(_))));
}
/// Connecting type-erased ports with different element types (`i32` outlet to
/// `u64` inlet) via `connect_any` must fail with `GraphValidation`, and the
/// graph build as a whole is rejected too.
#[test]
fn graph_dsl_rejects_erased_type_mismatch() {
let graph = GraphDsl::try_create(|builder| {
let left = builder.add(Identity::<i32>::new());
let right = builder.add(Identity::<u64>::new());
let mismatch = builder.connect_any(left.outlet().erase(), right.inlet().erase());
assert!(matches!(mismatch, Err(StreamError::GraphValidation(_))));
Ok(FlowShape::new(left.inlet(), right.outlet()))
});
assert!(matches!(graph, Err(StreamError::GraphValidation(_))));
}
/// Returns a result shape whose inlet reuses a real port's id and name but
/// claims a different element type (`u64` instead of `i32`); graph creation
/// must reject it with `GraphValidation`.
#[test]
fn graph_dsl_rejects_result_ports_with_spoofed_metadata() {
let graph = GraphDsl::create(|builder| {
let shape = builder.add(Identity::<i32>::new());
FlowShape::new(
Inlet::<u64>::with_id(shape.inlet().id(), "Identity.in"),
shape.outlet(),
)
});
assert!(matches!(graph, Err(StreamError::GraphValidation(_))));
}
/// A port created directly with `Inlet::new` and a port allocated by the
/// builder must receive different ids.
#[test]
fn allocated_ports_do_not_collide_with_manual_ports() {
let manual = Inlet::<i32>::new("manual");
let graph = GraphDsl::try_create(|builder| {
let shape = builder.add(Identity::<i32>::new());
assert_ne!(manual.id(), shape.inlet().id());
Ok(shape)
});
assert!(graph.is_ok());
}
/// Two `Identity` stages share the same port names ("Identity.in" /
/// "Identity.out") while their port ids stay distinct.
#[test]
fn identity_ports_keep_static_names_and_unique_ids() {
let graph = GraphDsl::try_create(|builder| {
let first = builder.add(Identity::<i32>::new());
let second = builder.add(Identity::<i32>::new());
assert_eq!(first.inlet().name(), "Identity.in");
assert_eq!(first.outlet().name(), "Identity.out");
assert_eq!(second.inlet().name(), "Identity.in");
assert_eq!(second.outlet().name(), "Identity.out");
assert_ne!(first.inlet().id(), first.outlet().id());
assert_ne!(first.outlet().id(), second.inlet().id());
assert_ne!(second.inlet().id(), second.outlet().id());
builder.connect(first.outlet(), second.inlet())?;
Ok(FlowShape::new(first.inlet(), second.outlet()))
});
assert!(graph.is_ok());
}
/// Walks `GraphStageLogic` through its port protocol and checks that every
/// out-of-order operation is refused with `InvalidPortOperation`.
#[test]
fn graph_stage_logic_checks_port_operations() {
let shape = FlowShape::new(Inlet::<i32>::new("in"), Outlet::<i32>::new("out"));
let inlet = shape.inlet();
let outlet = shape.outlet();
let mut logic = GraphStageLogic::new(&shape);
// Nothing has been offered yet, so there is nothing to grab.
assert!(matches!(
logic.grab(&inlet),
Err(StreamError::InvalidPortOperation { .. })
));
logic.pull(&inlet).unwrap();
assert!(logic.has_been_pulled(&inlet));
// A second pull while the first is outstanding is refused.
assert!(matches!(
logic.pull(&inlet),
Err(StreamError::InvalidPortOperation { .. })
));
// Offering an element clears the pulled state and makes it grabbable.
logic.offer(&inlet, 41).unwrap();
assert!(!logic.has_been_pulled(&inlet));
assert_eq!(logic.grab(&inlet).unwrap(), 41);
// Pushing before the outlet has been requested is refused.
assert!(matches!(
logic.push(&outlet, 1),
Err(StreamError::InvalidPortOperation { .. })
));
logic.request(&outlet).unwrap();
assert!(logic.is_available(&outlet));
// A push uses up the request, so the outlet is no longer available.
logic.push(&outlet, 42).unwrap();
assert!(!logic.is_available(&outlet));
logic.complete(&outlet).unwrap();
assert!(logic.is_closed(&outlet));
// A closed outlet cannot be requested again.
assert!(matches!(
logic.request(&outlet),
Err(StreamError::InvalidPortOperation { .. })
));
}
/// Running two elements with `event_limit: 1` must abort with
/// `EventLimitExceeded` carrying that same limit.
#[test]
fn fused_execution_enforces_event_limit() {
let graph = GraphDsl::create(|builder| builder.add(Identity::<i32>::new())).unwrap();
let result = graph.run_with_input_report([1, 2], FusedExecutionConfig { event_limit: 1 });
assert_eq!(result, Err(StreamError::EventLimitExceeded { limit: 1 }));
}
/// An `AsyncBoundary` between an `Identity` and a `+1` map yields three
/// segments; the run report shows the mapped output and two boundary
/// crossings for the two input elements.
#[test]
fn async_boundary_splits_fused_segments() {
let graph = GraphDsl::try_create(|builder| {
let first = builder.add(Identity::<i32>::new());
let boundary = builder.add(AsyncBoundary::<i32>::new());
let second = builder.add(MapStage::new(|item: i32| item + 1));
builder.connect(first.outlet(), boundary.inlet())?;
builder.connect(boundary.outlet(), second.inlet())?;
Ok(FlowShape::new(first.inlet(), second.outlet()))
})
.unwrap();
assert_eq!(graph.segments().len(), 3);
let report = graph
.run_with_input_report([1, 2], FusedExecutionConfig::default())
.unwrap();
assert_eq!(report.output, vec![2, 3]);
assert_eq!(report.async_boundary_crossings, 2);
}
/// Runs the counting path over a map -> boundary -> map chain with a buffer of
/// two and pins the report: three elements counted, three boundary crossings,
/// eighteen events.
#[test]
fn async_boundary_count_path_uses_ractor_handoff_segments() {
let graph = GraphDsl::try_create(|builder| {
let first = builder.add(MapStage::new(|item: i32| item + 1));
let boundary = builder.add(AsyncBoundary::<i32>::new());
let second = builder.add(MapStage::new(|item: i32| item * 2));
builder.connect(first.outlet(), boundary.inlet())?;
builder.connect(boundary.outlet(), second.inlet())?;
Ok(FlowShape::new(first.inlet(), second.outlet()))
})
.unwrap();
let report = graph
.run_async_boundary_count_with_input_report(
[1, 2, 3],
AsyncBoundaryExecutionConfig {
fused: FusedExecutionConfig::default(),
buffer_size: 2,
},
)
.unwrap();
assert_eq!(report.result, 3);
assert_eq!(report.async_boundary_crossings, 3);
assert_eq!(report.events, 18);
}
/// The `Threaded` boundary executor must produce a report equal to the one
/// from `run_async_boundary_count_with_input_report` for the same graph,
/// input and configuration.
#[test]
fn threaded_async_boundary_baseline_matches_ractor_count_path() {
let graph = GraphDsl::try_create(|builder| {
let first = builder.add(MapStage::new(|item: i32| item + 1));
let boundary = builder.add(AsyncBoundary::<i32>::new());
let second = builder.add(MapStage::new(|item: i32| item * 2));
builder.connect(first.outlet(), boundary.inlet())?;
builder.connect(boundary.outlet(), second.inlet())?;
Ok(FlowShape::new(first.inlet(), second.outlet()))
})
.unwrap();
let config = AsyncBoundaryExecutionConfig {
fused: FusedExecutionConfig::default(),
buffer_size: 2,
};
let ractor_report = graph
.run_async_boundary_count_with_input_report([1, 2, 3], config)
.unwrap();
let threaded_report = BoundaryCountExecutor::Threaded
.run_count(
[1, 2, 3],
graph.typed_linear_async_segments().unwrap(),
config,
)
.unwrap();
assert_eq!(threaded_report, ractor_report);
}
/// A `buffer_size` of zero must be rejected by the `Ractor` executor with a
/// `GraphValidation` error that names `buffer_size`.
#[test]
fn ractor_async_boundary_rejects_zero_buffer_size() {
let graph = GraphDsl::try_create(|builder| {
let first = builder.add(MapStage::new(|item: i32| item + 1));
let boundary = builder.add(AsyncBoundary::<i32>::new());
let second = builder.add(MapStage::new(|item: i32| item * 2));
builder.connect(first.outlet(), boundary.inlet())?;
builder.connect(boundary.outlet(), second.inlet())?;
Ok(FlowShape::new(first.inlet(), second.outlet()))
})
.unwrap();
let result = BoundaryCountExecutor::Ractor.run_count(
[1, 2, 3],
graph.typed_linear_async_segments().unwrap(),
AsyncBoundaryExecutionConfig {
fused: FusedExecutionConfig::default(),
buffer_size: 0,
},
);
assert!(matches!(
result,
Err(StreamError::GraphValidation(message)) if message.contains("buffer_size")
));
}
/// Feeds an endless iterator into the `Ractor` executor with a small event
/// limit. The run can only return `EventLimitExceeded` if the input is
/// consumed lazily; collecting it up front would never terminate.
#[test]
fn ractor_async_boundary_streams_input_without_eager_collection() {
let graph = GraphDsl::try_create(|builder| {
let first = builder.add(MapStage::new(|item: i32| item + 1));
let boundary = builder.add(AsyncBoundary::<i32>::new());
let second = builder.add(MapStage::new(|item: i32| item * 2));
builder.connect(first.outlet(), boundary.inlet())?;
builder.connect(boundary.outlet(), second.inlet())?;
Ok(FlowShape::new(first.inlet(), second.outlet()))
})
.unwrap();
let result = BoundaryCountExecutor::Ractor.run_count(
std::iter::repeat(1_i32),
graph.typed_linear_async_segments().unwrap(),
AsyncBoundaryExecutionConfig {
fused: FusedExecutionConfig { event_limit: 4 },
buffer_size: 1,
},
);
assert_eq!(result, Err(StreamError::EventLimitExceeded { limit: 4 }));
}
/// Calls the `Ractor` executor from inside `block_on` of a current-thread
/// Tokio runtime and expects the same report values as when it is called
/// outside any runtime (3 counted, 3 crossings, 18 events).
#[test]
fn ractor_async_boundary_runs_inside_existing_tokio_runtime() {
let graph = GraphDsl::try_create(|builder| {
let first = builder.add(MapStage::new(|item: i32| item + 1));
let boundary = builder.add(AsyncBoundary::<i32>::new());
let second = builder.add(MapStage::new(|item: i32| item * 2));
builder.connect(first.outlet(), boundary.inlet())?;
builder.connect(boundary.outlet(), second.inlet())?;
Ok(FlowShape::new(first.inlet(), second.outlet()))
})
.unwrap();
let runtime = tokio::runtime::Builder::new_current_thread()
.build()
.unwrap();
let report = runtime
.block_on(async {
BoundaryCountExecutor::Ractor.run_count(
[1, 2, 3],
graph.typed_linear_async_segments().unwrap(),
AsyncBoundaryExecutionConfig {
fused: FusedExecutionConfig::default(),
buffer_size: 2,
},
)
})
.unwrap();
assert_eq!(report.result, 3);
assert_eq!(report.async_boundary_crossings, 3);
assert_eq!(report.events, 18);
}
/// Splits input with a two-way `Balance`, merges the branches back, and
/// expects the elements to come out in their original order.
#[test]
fn balance_merge_round_robins_through_junctions() {
let graph = GraphDsl::try_create(|builder| {
let balance = builder.add(Balance::<i32>::new(2));
let merge = builder.add(Merge::<i32>::new(2));
builder.connect(balance.outlet(0)?, merge.inlet(0)?)?;
builder.connect(balance.outlet(1)?, merge.inlet(1)?)?;
Ok(FlowShape::new(balance.inlet(), merge.outlet()))
})
.unwrap();
assert_eq!(graph.run_with_input(0..6).unwrap(), vec![0, 1, 2, 3, 4, 5]);
}
/// With weights `[2, 1]`, `MergePrioritized` is expected to emit two elements
/// from the first input for every one from the second.
#[test]
fn prioritized_merge_uses_weighted_schedule() {
let graph =
GraphDsl::create(|builder| builder.add(MergePrioritized::<i32>::new(vec![2, 1])))
.unwrap();
assert_eq!(
graph
.run_fan_in(vec![vec![1, 2, 3, 4], vec![100, 101]])
.unwrap(),
vec![1, 2, 100, 3, 4, 101]
);
}
/// Runs a `+1` then `*2` chain through the count and fold entry points:
/// four inputs are counted, and folding `0..4` sums 2 + 4 + 6 + 8 = 20.
#[test]
fn fused_execution_supports_count_and_fold_sinks() {
let graph = GraphDsl::try_create(|builder| {
let first = builder.add(MapStage::new(|item: u64| item + 1));
let second = builder.add(MapStage::new(|item: u64| item * 2));
builder.connect(first.outlet(), second.inlet())?;
Ok(FlowShape::new(first.inlet(), second.outlet()))
})
.unwrap();
assert_eq!(graph.run_count_with_input(0..4).unwrap(), 4);
assert_eq!(
graph
.run_fold_with_input(0..4, 0, |acc, item| acc + item)
.unwrap(),
20
);
}
/// Exercises the typed-linear entry points on a `+1` then `*2` chain:
/// collected output, the reported event count, element count and fold.
#[test]
fn typed_linear_fast_path_runs_same_type_chains() {
let graph = GraphDsl::try_create(|builder| {
let first = builder.add(MapStage::new(|item: u64| item + 1));
let second = builder.add(MapStage::new(|item: u64| item * 2));
builder.connect(first.outlet(), second.inlet())?;
Ok(FlowShape::new(first.inlet(), second.outlet()))
})
.unwrap();
let report = graph
.run_typed_linear_with_input_report([1, 2, 3], FusedExecutionConfig::default())
.unwrap();
assert_eq!(report.output, vec![4, 6, 8]);
assert_eq!(report.events, 12);
assert_eq!(graph.run_typed_linear_count_with_input(0..4).unwrap(), 4);
assert_eq!(
graph
.run_typed_linear_fold_with_input(0..4, 0, |acc, item| acc + item)
.unwrap(),
20
);
}
/// A graph containing `Balance` and `Merge` junctions is not a linear chain,
/// so the typed-linear entry point must refuse it with `GraphValidation`.
#[test]
fn typed_linear_fast_path_rejects_junction_graphs() {
let graph = GraphDsl::try_create(|builder| {
let balance = builder.add(Balance::<i32>::new(2));
let merge = builder.add(Merge::<i32>::new(2));
builder.connect(balance.outlet(0)?, merge.inlet(0)?)?;
builder.connect(balance.outlet(1)?, merge.inlet(1)?)?;
Ok(FlowShape::new(balance.inlet(), merge.outlet()))
})
.unwrap();
assert!(matches!(
graph.run_typed_linear_count_with_input([1, 2, 3]),
Err(StreamError::GraphValidation(_))
));
}
/// `MergePreferred` is expected to emit every preferred element first and
/// then alternate between the secondary inputs.
#[test]
fn merge_preferred_drains_preferred_before_secondaries() {
let graph = GraphDsl::create(|builder| builder.add(MergePreferred::<i32>::new(2))).unwrap();
assert_eq!(
graph
.run_merge_preferred(vec![1, 2, 3], vec![vec![100, 101], vec![200]])
.unwrap(),
vec![1, 2, 3, 100, 200, 101]
);
}
/// An empty input on either side must not cut `Merge` short: all elements of
/// the other input are still emitted.
#[test]
fn merge_waits_for_all_inputs_to_complete() {
let graph = GraphDsl::create(|builder| builder.add(Merge::<i32>::new(2))).unwrap();
assert_eq!(
graph.run_fan_in(vec![vec![], vec![10, 20]]).unwrap(),
vec![10, 20]
);
assert_eq!(graph.run_fan_in(vec![vec![1], vec![]]).unwrap(), vec![1]);
}
/// Splits pairs with `Unzip` and recombines both halves through
/// `MergeSorted`; the output must contain all six values in ascending order.
#[test]
fn merge_sorted_drains_remaining_input_after_peer_completes() {
let graph = GraphDsl::try_create(|builder| {
let unzip = builder.add(Unzip::<i32, i32>::new());
let merge = builder.add(MergeSorted::<i32>::new());
builder.connect(unzip.out0(), merge.inlet(0)?)?;
builder.connect(unzip.out1(), merge.inlet(1)?)?;
Ok(FlowShape::new(unzip.inlet(), merge.outlet()))
})
.unwrap();
assert_eq!(
graph.run_with_input([(1, 2), (4, 3), (6, 5)]).unwrap(),
vec![1, 2, 3, 4, 5, 6]
);
}
/// Uses each element's own value as its sequence number and expects
/// `MergeSequence` to emit 0 through 5 in order, although consecutive numbers
/// arrive on alternating inlets.
#[test]
fn merge_sequence_reorders_adversarial_arrivals_by_sequence_number() {
let graph = GraphDsl::try_create(|builder| {
let unzip = builder.add(Unzip::<u64, u64>::new());
let merge = builder.add(MergeSequence::<u64>::new(2, |item| *item));
builder.connect(unzip.out0(), merge.inlet(0)?)?;
builder.connect(unzip.out1(), merge.inlet(1)?)?;
Ok(FlowShape::new(unzip.inlet(), merge.outlet()))
})
.unwrap();
assert_eq!(
graph.run_with_input([(0, 1), (2, 3), (4, 5)]).unwrap(),
vec![0, 1, 2, 3, 4, 5]
);
}
/// `MergeLatest` emits a snapshot of the latest value per inlet; each later
/// arrival re-emits with the other inlet's last-seen value.
// NOTE(review): the test name mentions eager completion, but the stage is
// built with `false` as its second argument (presumably the eager-complete
// flag) and nothing here observes completion behaviour — confirm or rename.
#[test]
fn merge_latest_emits_with_last_seen_peer_and_honors_eager_complete() {
let graph = GraphDsl::try_create(|builder| {
let unzip = builder.add(Unzip::<i32, i32>::new());
let merge = builder.add(MergeLatest::<i32>::new(2, false));
builder.connect(unzip.out0(), merge.inlet(0)?)?;
builder.connect(unzip.out1(), merge.inlet(1)?)?;
Ok(FlowShape::new(unzip.inlet(), merge.outlet()))
})
.unwrap();
assert_eq!(
graph.run_with_input([(1, 10), (2, 11)]).unwrap(),
vec![vec![1, 10], vec![2, 10], vec![2, 11]]
);
}
/// `Zip` emits only as many pairs as the shorter input provides, whichever
/// side that is, and nothing at all when one side is empty.
#[test]
fn zip_completes_when_any_input_completes() {
let graph = GraphDsl::create(|builder| builder.add(Zip::<i32, i32>::new())).unwrap();
assert_eq!(
graph.run_zip(vec![1, 2, 3], vec![10]).unwrap(),
vec![(1, 10)]
);
assert_eq!(graph.run_zip(vec![1], vec![10, 20]).unwrap(), vec![(1, 10)]);
assert_eq!(
graph.run_zip(vec![], vec![10, 20]).unwrap(),
Vec::<(i32, i32)>::new()
);
}
/// `Concat` emits its inputs one after another in inlet order; an empty
/// input in the middle is simply skipped.
#[test]
fn concat_drains_inputs_in_declared_order() {
let graph = GraphDsl::create(|builder| builder.add(Concat::<i32>::new(3))).unwrap();
assert_eq!(
graph
.run_concat(vec![vec![1, 2], vec![], vec![3, 4]])
.unwrap(),
vec![1, 2, 3, 4]
);
}
/// `OrElse` emits the secondary input only when the primary is empty;
/// otherwise only the primary's elements appear.
#[test]
fn or_else_switches_only_if_primary_is_empty() {
let graph = GraphDsl::create(|builder| builder.add(OrElse::<i32>::new())).unwrap();
assert_eq!(
graph.run_or_else(vec![], vec![10, 20]).unwrap(),
vec![10, 20]
);
assert_eq!(
graph.run_or_else(vec![1, 2], vec![10, 20]).unwrap(),
vec![1, 2]
);
}
/// Uses the `run_or_else_secondary_first` driver with a non-empty primary and
/// expects only the primary's elements in the output.
// NOTE(review): judging by its name the driver delivers secondary elements
// before primary ones — confirm in the executor.
#[test]
fn or_else_secondary_first_dropped_when_primary_emits() {
let graph = GraphDsl::create(|builder| builder.add(OrElse::<i32>::new())).unwrap();
assert_eq!(
graph
.run_or_else_secondary_first(vec![1, 2], vec![10, 20])
.unwrap(),
vec![1, 2]
);
}
/// Uses the `run_or_else_secondary_first` driver with an empty primary and
/// expects every secondary element in the output.
// NOTE(review): judging by its name the driver delivers secondary elements
// before the primary completes — confirm in the executor.
#[test]
fn or_else_secondary_first_flushed_when_primary_empty() {
let graph = GraphDsl::create(|builder| builder.add(OrElse::<i32>::new())).unwrap();
assert_eq!(
graph
.run_or_else_secondary_first(vec![], vec![10, 20])
.unwrap(),
vec![10, 20]
);
}
/// Uses the `run_or_else_secondary_closed_first` driver, which takes only the
/// secondary elements, and expects all of them in the output.
// NOTE(review): judging by the names, the secondary completes before an
// empty primary does — confirm in the executor.
#[test]
fn or_else_secondary_closed_then_primary_empty_drains_buffer() {
let graph = GraphDsl::create(|builder| builder.add(OrElse::<i32>::new())).unwrap();
assert_eq!(
graph
.run_or_else_secondary_closed_first(vec![10, 20])
.unwrap(),
vec![10, 20]
);
}
/// With three inputs and a segment size of two, `Interleave` takes up to two
/// elements from each input in turn and keeps cycling over the inputs that
/// still have elements.
#[test]
fn interleave_cycles_segment_sized_chunks() {
let graph = GraphDsl::create(|builder| builder.add(Interleave::<i32>::new(3, 2))).unwrap();
assert_eq!(
graph
.run_interleave(vec![vec![1, 2, 3], vec![10, 11, 12], vec![20]], 2, false)
.unwrap(),
vec![1, 2, 10, 11, 20, 3, 12]
);
}
/// With eager close enabled, `Interleave` stops as soon as one input is
/// exhausted: an empty input yields no output at all, and a one-element
/// input ends the stream after the first full round.
#[test]
fn interleave_eager_close_stops_when_any_input_completes() {
let graph = GraphDsl::create(|builder| {
builder.add(Interleave::<i32>::new_with_eager_close(2, 1, true))
})
.unwrap();
assert_eq!(
graph
.run_interleave(vec![vec![1, 2], vec![]], 1, true)
.unwrap(),
Vec::<i32>::new()
);
assert_eq!(
graph
.run_interleave(vec![vec![1], vec![10, 11]], 1, true)
.unwrap(),
vec![1, 10]
);
}
/// Routes elements to one of two outlets by parity, merges the outlets back,
/// and expects the original order to be preserved.
// NOTE(review): the test name mentions a cancelling peer, but nothing visible
// here cancels an outlet — confirm the name still matches the scenario.
#[test]
fn partition_routes_only_live_outlets_after_peer_cancels() {
let graph = GraphDsl::try_create(|builder| {
let partition = builder.add(Partition::<i32>::new(2, |item| (*item % 2) as usize));
let merge = builder.add(Merge::<i32>::new(2));
builder.connect(partition.outlet(0)?, merge.inlet(0)?)?;
builder.connect(partition.outlet(1)?, merge.inlet(1)?)?;
Ok(FlowShape::new(partition.inlet(), merge.outlet()))
})
.unwrap();
assert_eq!(
graph.run_with_input([0, 1, 2, 3]).unwrap(),
vec![0, 1, 2, 3]
);
}
/// Splits each element into `(item, item * 10)` with `UnzipWith` and zips the
/// two outlets back together; every input must yield exactly that pair.
#[test]
fn unzip_with_keeps_live_outlet_running_after_peer_finishes() {
let graph = GraphDsl::try_create(|builder| {
let unzip = builder.add(UnzipWith::<i32, i32, i32>::new(|item| (item, item * 10)));
let zip = builder.add(Zip::<i32, i32>::new());
builder.connect(unzip.out0(), zip.in0())?;
builder.connect(unzip.out1(), zip.in1())?;
Ok(FlowShape::new(unzip.inlet(), zip.outlet()))
})
.unwrap();
assert_eq!(
graph.run_with_input([1, 2, 3]).unwrap(),
vec![(1, 10), (2, 20), (3, 30)]
);
}
/// `MergeSorted` must produce the same ascending output no matter which
/// `Unzip` outlet feeds which of its inlets.
#[test]
fn unzip_merge_sorted_swapped_inlets_still_sorted() {
let graph_normal = GraphDsl::try_create(|builder| {
let unzip = builder.add(Unzip::<i32, i32>::new());
let merge = builder.add(MergeSorted::<i32>::new());
builder.connect(unzip.out0(), merge.inlet(0)?)?;
builder.connect(unzip.out1(), merge.inlet(1)?)?;
Ok(FlowShape::new(unzip.inlet(), merge.outlet()))
})
.unwrap();
// Same stages, with the two connections crossed.
let graph_swapped = GraphDsl::try_create(|builder| {
let unzip = builder.add(Unzip::<i32, i32>::new());
let merge = builder.add(MergeSorted::<i32>::new());
builder.connect(unzip.out0(), merge.inlet(1)?)?;
builder.connect(unzip.out1(), merge.inlet(0)?)?;
Ok(FlowShape::new(unzip.inlet(), merge.outlet()))
})
.unwrap();
let input = vec![(1, 2), (4, 3), (6, 5)];
let expected = vec![1, 2, 3, 4, 5, 6];
assert_eq!(
graph_normal.run_with_input(input.clone()).unwrap(),
expected
);
assert_eq!(graph_swapped.run_with_input(input).unwrap(), expected);
}
/// `MergeLatest` snapshots are ordered by inlet index: crossing the two
/// connections swaps the positions of the values in the emitted snapshot.
#[test]
fn unzip_merge_latest_swapped_inlets_correct_snapshot_order() {
let graph_normal = GraphDsl::try_create(|builder| {
let unzip = builder.add(Unzip::<i32, i32>::new());
let merge = builder.add(MergeLatest::<i32>::new(2, false));
builder.connect(unzip.out0(), merge.inlet(0)?)?;
builder.connect(unzip.out1(), merge.inlet(1)?)?;
Ok(FlowShape::new(unzip.inlet(), merge.outlet()))
})
.unwrap();
// Same stages, with the two connections crossed.
let graph_swapped = GraphDsl::try_create(|builder| {
let unzip = builder.add(Unzip::<i32, i32>::new());
let merge = builder.add(MergeLatest::<i32>::new(2, false));
builder.connect(unzip.out0(), merge.inlet(1)?)?;
builder.connect(unzip.out1(), merge.inlet(0)?)?;
Ok(FlowShape::new(unzip.inlet(), merge.outlet()))
})
.unwrap();
assert_eq!(
graph_normal.run_with_input([(1, 10)]).unwrap(),
vec![vec![1, 10]]
);
assert_eq!(
graph_swapped.run_with_input([(1, 10)]).unwrap(),
vec![vec![10, 1]]
);
}
/// Feeds `MergeSequence` the sequence numbers 1 and 2 only and expects the run
/// to fail with a `Failed` error mentioning "expected sequence".
// NOTE(review): presumably the failure is caused by sequence number 0 never
// arriving (the passing test above starts at 0) — confirm the start value.
#[test]
fn merge_sequence_fails_on_gap_at_completion() {
let graph = GraphDsl::try_create(|builder| {
let unzip = builder.add(Unzip::<u64, u64>::new());
let merge = builder.add(MergeSequence::<u64>::new(2, |item| *item));
builder.connect(unzip.out0(), merge.inlet(0)?)?;
builder.connect(unzip.out1(), merge.inlet(1)?)?;
Ok(FlowShape::new(unzip.inlet(), merge.outlet()))
})
.unwrap();
let result = graph.run_with_input([(1u64, 2u64)]);
assert!(
matches!(result, Err(StreamError::Failed(ref msg)) if msg.contains("expected sequence")),
"expected a sequence-gap error, got: {result:?}"
);
}
/// Builds two otherwise identical `MergeLatest` graphs that differ only in the
/// boolean constructor argument, and checks that for a single input pair both
/// produce the same non-empty output.
// NOTE(review): because the outputs are asserted equal, this test cannot
// distinguish eager from non-eager completion; it only guards against the
// eager variant dropping output for this input.
#[test]
fn merge_latest_eager_complete_closes_on_first_inlet_done() {
let graph_eager = GraphDsl::try_create(|builder| {
let unzip = builder.add(Unzip::<i32, i32>::new());
let merge = builder.add(MergeLatest::<i32>::new(2, true));
builder.connect(unzip.out0(), merge.inlet(0)?)?;
builder.connect(unzip.out1(), merge.inlet(1)?)?;
Ok(FlowShape::new(unzip.inlet(), merge.outlet()))
})
.unwrap();
let graph_non_eager = GraphDsl::try_create(|builder| {
let unzip = builder.add(Unzip::<i32, i32>::new());
let merge = builder.add(MergeLatest::<i32>::new(2, false));
builder.connect(unzip.out0(), merge.inlet(0)?)?;
builder.connect(unzip.out1(), merge.inlet(1)?)?;
Ok(FlowShape::new(unzip.inlet(), merge.outlet()))
})
.unwrap();
let result_eager = graph_eager.run_with_input([(1i32, 10i32)]).unwrap();
let result_non_eager = graph_non_eager.run_with_input([(1i32, 10i32)]).unwrap();
assert!(!result_eager.is_empty(), "eager graph produced no output");
assert!(
!result_non_eager.is_empty(),
"non-eager graph produced no output"
);
assert_eq!(result_eager, result_non_eager);
}
}