use super::*;
/// Selects which execution strategy the `run_*_mode` entry points may use.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub(crate) enum ExecutorMode {
/// Try the typed fast paths first and fall back to the erased
/// `FusedExecutor` when none of them accepts the graph.
#[default]
Auto,
/// Skip every typed fast path and always run the erased `FusedExecutor`.
ErasedOnly,
/// Only run a typed fast path; if none accepts the graph the run fails with
/// a `StreamError::GraphValidation` instead of falling back.
TypedOnly,
}
/// Synchronous entry points that push an input iterator through a flow-shaped
/// blueprint and collect, count, or fold whatever reaches the graph outlet.
///
/// The public wrappers always run with [`ExecutorMode::Auto`]; the
/// crate-private `*_mode` variants let callers pin the executor.
impl<In, Out> GraphBlueprint<FlowShape<In, Out>>
where
    In: Clone + Send + 'static,
    Out: Send + 'static,
{
    /// Runs the graph over `input` with the default execution config and
    /// returns only the collected output.
    pub fn run_with_input<I>(&self, input: I) -> StreamResult<Vec<Out>>
    where
        I: IntoIterator<Item = In>,
    {
        self.run_with_input_report(input, FusedExecutionConfig::default())
            .map(|report| report.output)
    }

    /// Runs the graph over `input` with `config` and returns the output
    /// together with the event and async-boundary counters.
    pub fn run_with_input_report<I>(
        &self,
        input: I,
        config: FusedExecutionConfig,
    ) -> StreamResult<FusedExecutionReport<Out>>
    where
        I: IntoIterator<Item = In>,
    {
        self.run_with_input_report_mode(input, config, ExecutorMode::Auto)
    }

    /// Collecting run with an explicit executor choice.
    ///
    /// Unless `mode` is `ErasedOnly`, the typed strategies are tried in this
    /// order: linear flow plan, acyclic junction dispatch, merge-sequence
    /// plan, merge-latest dispatch, cyclic feedback dispatch. The first one
    /// that accepts the graph handles the whole run. If none accepts and
    /// `mode` is `TypedOnly`, a `GraphValidation` error is returned;
    /// otherwise the erased `FusedExecutor` runs the graph.
    ///
    /// The junction, merge-sequence and merge-latest paths report zero events
    /// and zero async-boundary crossings.
    pub(crate) fn run_with_input_report_mode<I>(
        &self,
        input: I,
        config: FusedExecutionConfig,
        mode: ExecutorMode,
    ) -> StreamResult<FusedExecutionReport<Out>>
    where
        I: IntoIterator<Item = In>,
    {
        let graph_inlet = self.shape.inlet().id();
        let graph_outlet = self.shape.outlet().id();

        if !matches!(mode, ExecutorMode::ErasedOnly) {
            // Report wrapper for the typed runners that do not meter their work.
            let unmetered = |output: Vec<Out>| FusedExecutionReport {
                output,
                events: 0,
                async_boundary_crossings: 0,
            };

            if let Some(plan) = try_typed_flow_plan::<In, Out>(
                &self.stages,
                &self.edges,
                graph_inlet,
                graph_outlet,
            ) {
                let items = input.into_iter();
                let mut output = Vec::with_capacity(items.size_hint().0);
                let mut events = 0usize;
                let mut async_boundary_crossings = 0usize;
                for item in items {
                    let produced =
                        plan.run_item(item, config, &mut events, &mut async_boundary_crossings)?;
                    output.push(produced);
                }
                return Ok(FusedExecutionReport {
                    output,
                    events,
                    async_boundary_crossings,
                });
            }

            if let Some(runner) = try_build_typed_acyclic_junction_dispatch::<In, Out>(
                &self.stages,
                &self.edges,
                graph_inlet,
                graph_outlet,
            ) {
                let mut items = input.into_iter();
                let output = runner(&mut items)?;
                return Ok(unmetered(output));
            }

            if let Some(mut plan) = try_typed_merge_sequence_plan::<In, Out>(
                &self.stages,
                &self.edges,
                graph_inlet,
                graph_outlet,
            ) {
                let output = run_typed_merge_sequence(&mut plan, input)?;
                return Ok(unmetered(output));
            }

            if let Some(runner) = try_build_typed_merge_latest_dispatch::<In, Out>(
                &self.stages,
                &self.edges,
                graph_inlet,
                graph_outlet,
            ) {
                let mut items = input.into_iter();
                let output = runner(&mut items)?;
                return Ok(unmetered(output));
            }

            if let Some(runner) = try_build_typed_cyclic_feedback_dispatch::<In, Out>(
                &self.stages,
                &self.edges,
                graph_inlet,
                graph_outlet,
            ) {
                let mut items = input.into_iter();
                return runner(&mut items, config);
            }

            if matches!(mode, ExecutorMode::TypedOnly) {
                return Err(StreamError::GraphValidation(
                    "typed executor does not support this graph shape".into(),
                ));
            }
        }

        // Erased fallback: request demand at the outlet, deliver every input
        // at the inlet, then signal completion.
        let items = input.into_iter();
        let mut executor = FusedExecutor::new(self, config);
        let mut output = Vec::with_capacity(items.size_hint().0);
        {
            let mut sink = VecOutputSink {
                output: &mut output,
            };
            executor.request(graph_outlet, graph_outlet, &mut sink)?;
            for item in items {
                executor.deliver(graph_inlet, datum(item), graph_outlet, &mut sink)?;
            }
            executor.complete(graph_inlet, graph_outlet, &mut sink)?;
        }
        Ok(FusedExecutionReport {
            output,
            events: executor.events,
            async_boundary_crossings: executor.async_boundary_crossings,
        })
    }

    /// Runs the graph over `input` with the default execution config and
    /// returns only the element count.
    pub fn run_count_with_input<I>(&self, input: I) -> StreamResult<usize>
    where
        I: IntoIterator<Item = In>,
    {
        self.run_count_with_input_report(input, FusedExecutionConfig::default())
            .map(|report| report.result)
    }

    /// Counting run with `config`, returning the count plus the counters.
    pub fn run_count_with_input_report<I>(
        &self,
        input: I,
        config: FusedExecutionConfig,
    ) -> StreamResult<FusedTerminalReport<usize>>
    where
        I: IntoIterator<Item = In>,
    {
        self.run_count_with_input_report_mode(input, config, ExecutorMode::Auto)
    }

    /// Counting run with an explicit executor choice.
    ///
    /// Only the linear typed flow plan is attempted here. On that path the
    /// count is incremented once per input item. When the plan is unavailable,
    /// `TypedOnly` fails with `GraphValidation` and every other mode uses the
    /// erased executor with a counting sink.
    pub(crate) fn run_count_with_input_report_mode<I>(
        &self,
        input: I,
        config: FusedExecutionConfig,
        mode: ExecutorMode,
    ) -> StreamResult<FusedTerminalReport<usize>>
    where
        I: IntoIterator<Item = In>,
    {
        let graph_inlet = self.shape.inlet().id();
        let graph_outlet = self.shape.outlet().id();

        if mode != ExecutorMode::ErasedOnly {
            let typed = try_typed_flow_plan::<In, Out>(
                &self.stages,
                &self.edges,
                graph_inlet,
                graph_outlet,
            );
            match typed {
                Some(plan) => {
                    let mut processed = 0usize;
                    let mut events = 0usize;
                    let mut async_boundary_crossings = 0usize;
                    for item in input {
                        plan.run_item_count(
                            item,
                            config,
                            &mut events,
                            &mut async_boundary_crossings,
                        )?;
                        processed += 1;
                    }
                    return Ok(FusedTerminalReport {
                        result: processed,
                        events,
                        async_boundary_crossings,
                    });
                }
                None if mode == ExecutorMode::TypedOnly => {
                    return Err(StreamError::GraphValidation(
                        "typed executor does not support this graph shape".into(),
                    ));
                }
                None => {}
            }
        }

        let mut executor = FusedExecutor::new(self, config);
        let mut sink = CountOutputSink { count: 0 };
        executor.request::<Out>(graph_outlet, graph_outlet, &mut sink)?;
        for item in input {
            executor.deliver::<Out>(graph_inlet, datum(item), graph_outlet, &mut sink)?;
        }
        executor.complete::<Out>(graph_inlet, graph_outlet, &mut sink)?;
        Ok(FusedTerminalReport {
            result: sink.count,
            events: executor.events,
            async_boundary_crossings: executor.async_boundary_crossings,
        })
    }

    /// Folds the graph output into an accumulator using the default config
    /// and returns only the final accumulator.
    pub fn run_fold_with_input<I, Acc, F>(&self, input: I, zero: Acc, fold: F) -> StreamResult<Acc>
    where
        I: IntoIterator<Item = In>,
        F: FnMut(Acc, Out) -> Acc,
    {
        self.run_fold_with_input_report(input, zero, fold, FusedExecutionConfig::default())
            .map(|report| report.result)
    }

    /// Folding run with `config`, returning the accumulator plus the counters.
    pub fn run_fold_with_input_report<I, Acc, F>(
        &self,
        input: I,
        zero: Acc,
        fold: F,
        config: FusedExecutionConfig,
    ) -> StreamResult<FusedTerminalReport<Acc>>
    where
        I: IntoIterator<Item = In>,
        F: FnMut(Acc, Out) -> Acc,
    {
        self.run_fold_with_input_report_mode(input, zero, fold, config, ExecutorMode::Auto)
    }

    /// Folding run with an explicit executor choice.
    ///
    /// Only the linear typed flow plan is attempted here. When it is
    /// unavailable, `TypedOnly` fails with `GraphValidation` and every other
    /// mode uses the erased executor with a folding sink.
    pub(crate) fn run_fold_with_input_report_mode<I, Acc, F>(
        &self,
        input: I,
        zero: Acc,
        mut fold: F,
        config: FusedExecutionConfig,
        mode: ExecutorMode,
    ) -> StreamResult<FusedTerminalReport<Acc>>
    where
        I: IntoIterator<Item = In>,
        F: FnMut(Acc, Out) -> Acc,
    {
        let graph_inlet = self.shape.inlet().id();
        let graph_outlet = self.shape.outlet().id();

        if mode != ExecutorMode::ErasedOnly {
            let typed = try_typed_flow_plan::<In, Out>(
                &self.stages,
                &self.edges,
                graph_inlet,
                graph_outlet,
            );
            match typed {
                Some(plan) => {
                    let mut result = zero;
                    let mut events = 0usize;
                    let mut async_boundary_crossings = 0usize;
                    for item in input {
                        let produced = plan.run_item(
                            item,
                            config,
                            &mut events,
                            &mut async_boundary_crossings,
                        )?;
                        result = fold(result, produced);
                    }
                    return Ok(FusedTerminalReport {
                        result,
                        events,
                        async_boundary_crossings,
                    });
                }
                None if mode == ExecutorMode::TypedOnly => {
                    return Err(StreamError::GraphValidation(
                        "typed executor does not support this graph shape".into(),
                    ));
                }
                None => {}
            }
        }

        let mut executor = FusedExecutor::new(self, config);
        let mut sink = FoldOutputSink {
            accumulator: Some(zero),
            fold,
        };
        executor.request(graph_outlet, graph_outlet, &mut sink)?;
        for item in input {
            executor.deliver(graph_inlet, datum(item), graph_outlet, &mut sink)?;
        }
        executor.complete(graph_inlet, graph_outlet, &mut sink)?;
        Ok(FusedTerminalReport {
            result: sink.finish(),
            events: executor.events,
            async_boundary_crossings: executor.async_boundary_crossings,
        })
    }

    /// Collecting run with the default config and an explicit executor choice.
    #[cfg_attr(not(test), allow(dead_code))]
    pub(crate) fn run_with_input_mode<I>(
        &self,
        input: I,
        mode: ExecutorMode,
    ) -> StreamResult<Vec<Out>>
    where
        I: IntoIterator<Item = In>,
    {
        self.run_with_input_report_mode(input, FusedExecutionConfig::default(), mode)
            .map(|report| report.output)
    }

    /// Counting run with the default config and an explicit executor choice.
    #[allow(dead_code)]
    pub(crate) fn run_count_with_input_mode<I>(
        &self,
        input: I,
        mode: ExecutorMode,
    ) -> StreamResult<usize>
    where
        I: IntoIterator<Item = In>,
    {
        self.run_count_with_input_report_mode(input, FusedExecutionConfig::default(), mode)
            .map(|report| report.result)
    }

    /// Folding run with the default config and an explicit executor choice.
    #[allow(dead_code)]
    pub(crate) fn run_fold_with_input_mode<I, Acc, F>(
        &self,
        input: I,
        zero: Acc,
        fold: F,
        mode: ExecutorMode,
    ) -> StreamResult<Acc>
    where
        I: IntoIterator<Item = In>,
        F: FnMut(Acc, Out) -> Acc,
    {
        let config = FusedExecutionConfig::default();
        self.run_fold_with_input_report_mode(input, zero, fold, config, mode)
            .map(|report| report.result)
    }
}
/// Entry points for the "typed linear" fast path: graphs that are a single
/// chain of single-inlet/single-outlet stages where every port carries `T`.
impl<T> GraphBlueprint<FlowShape<T, T>>
where
    T: Send + 'static,
{
    /// Runs the typed linear plan with the default config and returns only
    /// the collected output.
    pub fn run_typed_linear_with_input<I>(&self, input: I) -> StreamResult<Vec<T>>
    where
        I: IntoIterator<Item = T>,
    {
        self.run_typed_linear_with_input_report(input, FusedExecutionConfig::default())
            .map(|report| report.output)
    }

    /// Runs the typed linear plan with `config`, collecting one output per
    /// input item together with the event and async-boundary counters.
    ///
    /// # Errors
    /// Fails with `GraphValidation` when the graph is not a supported linear
    /// chain, and propagates any error from `bump_fused_event`.
    pub fn run_typed_linear_with_input_report<I>(
        &self,
        input: I,
        config: FusedExecutionConfig,
    ) -> StreamResult<FusedExecutionReport<T>>
    where
        I: IntoIterator<Item = T>,
    {
        let items = input.into_iter();
        let plan = self.typed_linear_plan()?;
        let mut events = 0;
        let mut async_boundary_crossings = 0;
        let mut output = Vec::with_capacity(items.size_hint().0);
        for item in items {
            let produced =
                plan.run_item(item, config, &mut events, &mut async_boundary_crossings)?;
            output.push(produced);
        }
        Ok(FusedExecutionReport {
            output,
            events,
            async_boundary_crossings,
        })
    }

    /// Runs the typed linear plan with the default config and returns only
    /// the number of items processed.
    pub fn run_typed_linear_count_with_input<I>(&self, input: I) -> StreamResult<usize>
    where
        I: IntoIterator<Item = T>,
    {
        self.run_typed_linear_count_with_input_report(input, FusedExecutionConfig::default())
            .map(|report| report.result)
    }

    /// Runs every item through the typed linear plan, discards the results,
    /// and reports how many items were processed.
    pub fn run_typed_linear_count_with_input_report<I>(
        &self,
        input: I,
        config: FusedExecutionConfig,
    ) -> StreamResult<FusedTerminalReport<usize>>
    where
        I: IntoIterator<Item = T>,
    {
        // Counting is a fold that ignores the produced value.
        self.run_typed_linear_fold_with_input_report(
            input,
            0usize,
            |seen: usize, _produced: T| seen + 1,
            config,
        )
    }

    /// Folds the typed linear plan's output with the default config and
    /// returns only the final accumulator.
    pub fn run_typed_linear_fold_with_input<I, Acc, F>(
        &self,
        input: I,
        zero: Acc,
        fold: F,
    ) -> StreamResult<Acc>
    where
        I: IntoIterator<Item = T>,
        F: FnMut(Acc, T) -> Acc,
    {
        let config = FusedExecutionConfig::default();
        self.run_typed_linear_fold_with_input_report(input, zero, fold, config)
            .map(|report| report.result)
    }

    /// Runs every item through the typed linear plan and folds the results
    /// into an accumulator starting from `zero`.
    pub fn run_typed_linear_fold_with_input_report<I, Acc, F>(
        &self,
        input: I,
        zero: Acc,
        mut fold: F,
        config: FusedExecutionConfig,
    ) -> StreamResult<FusedTerminalReport<Acc>>
    where
        I: IntoIterator<Item = T>,
        F: FnMut(Acc, T) -> Acc,
    {
        let plan = self.typed_linear_plan()?;
        let mut result = zero;
        let mut events = 0;
        let mut async_boundary_crossings = 0;
        for item in input {
            let produced =
                plan.run_item(item, config, &mut events, &mut async_boundary_crossings)?;
            result = fold(result, produced);
        }
        Ok(FusedTerminalReport {
            result,
            events,
            async_boundary_crossings,
        })
    }

    /// Splits the linear plan at its `AsyncBoundary` stages and hands the
    /// segments to `BoundaryCountExecutor::Ractor` to count the input.
    ///
    /// # Errors
    /// Fails with `GraphValidation` when the graph is not a supported linear
    /// chain or contains no `AsyncBoundary` stage.
    pub fn run_async_boundary_count_with_input_report<I>(
        &self,
        input: I,
        config: AsyncBoundaryExecutionConfig,
    ) -> StreamResult<FusedTerminalReport<usize>>
    where
        I: IntoIterator<Item = T> + Send,
        I::IntoIter: Send + 'static,
    {
        let segments = self.typed_linear_async_segments()?;
        BoundaryCountExecutor::Ractor.run_count(input, segments, config)
    }

    /// Walks the graph from its inlet to its outlet and lowers each stage to
    /// a [`TypedLinearStep`].
    ///
    /// The walk rejects (with `GraphValidation`) a dangling inlet or outlet, a
    /// revisited stage, any stage that is not single-inlet/single-outlet, any
    /// port whose type is not `T`, any stage kind other than `Identity`,
    /// `Opaque`, `AsyncBoundary` or `Map`, a `Map` whose typed closure is not
    /// `Fn(T) -> T`, and graphs with stages that are not on the walked path.
    /// `Identity` and `Opaque` both lower to a pass-through step.
    fn typed_linear_plan(&self) -> StreamResult<TypedLinearPlan<T>> {
        let graph_outlet = self.shape.outlet().id();
        let expected_type = TypeId::of::<T>();
        let mut visited = HashSet::new();
        let mut steps = Vec::new();
        let mut cursor = self.shape.inlet().id();

        loop {
            let located = self.stages.iter().position(|candidate| {
                candidate
                    .spec
                    .inlets
                    .iter()
                    .any(|port| port.id() == cursor)
            });
            let stage_index = match located {
                Some(index) => index,
                None => {
                    return Err(StreamError::GraphValidation(format!(
                        "typed linear fast path could not find inlet {}",
                        cursor.as_usize()
                    )));
                }
            };

            // A stage seen twice means the chain loops back on itself.
            if !visited.insert(stage_index) {
                return Err(StreamError::GraphValidation(
                    "typed linear fast path does not support cycles".into(),
                ));
            }

            let stage = &self.stages[stage_index];
            let inlet_count = stage.spec.inlets.len();
            let outlet_count = stage.spec.outlets.len();
            if inlet_count != 1 || outlet_count != 1 {
                return Err(StreamError::GraphValidation(format!(
                    "typed linear fast path requires single-inlet single-outlet stages; {} has {} inlet(s) and {} outlet(s)",
                    stage.spec.name(),
                    inlet_count,
                    outlet_count
                )));
            }

            let inlet = &stage.spec.inlets[0];
            let outlet = &stage.spec.outlets[0];
            let uniformly_typed =
                inlet.type_id() == expected_type && outlet.type_id() == expected_type;
            if !uniformly_typed {
                return Err(StreamError::GraphValidation(format!(
                    "typed linear fast path requires every port to use {}",
                    type_name::<T>()
                )));
            }

            let lowered = match &stage.spec.kind {
                StageKind::Identity | StageKind::Opaque => TypedLinearStep::Pass,
                StageKind::AsyncBoundary => TypedLinearStep::AsyncBoundary,
                StageKind::Map(map) => {
                    let recovered = map
                        .typed
                        .as_ref()
                        .downcast_ref::<Arc<dyn Fn(T) -> T + Send + Sync>>();
                    match recovered {
                        Some(mapper) => TypedLinearStep::Map(Arc::clone(mapper)),
                        None => {
                            return Err(StreamError::GraphValidation(format!(
                                "typed linear fast path could not downcast map stage {}",
                                stage.spec.name()
                            )));
                        }
                    }
                }
                _ => {
                    return Err(StreamError::GraphValidation(format!(
                        "typed linear fast path does not support {}",
                        stage.spec.name()
                    )));
                }
            };
            steps.push(lowered);

            if outlet.id() == graph_outlet {
                break;
            }

            // Follow the first edge leaving this stage's outlet.
            let downstream = self
                .edges
                .iter()
                .find(|edge| edge.outlet == outlet.id())
                .map(|edge| edge.inlet);
            cursor = match downstream {
                Some(next_inlet) => next_inlet,
                None => {
                    return Err(StreamError::GraphValidation(format!(
                        "typed linear fast path could not follow outlet {}",
                        outlet.id().as_usize()
                    )));
                }
            };
        }

        if visited.len() != self.stages.len() {
            return Err(StreamError::GraphValidation(
                "typed linear fast path requires all stages to be on the result path".into(),
            ));
        }
        Ok(TypedLinearPlan { steps })
    }

    /// Builds the linear plan and cuts it into segments at every
    /// `AsyncBoundary` step; the boundary steps themselves are dropped.
    ///
    /// # Errors
    /// Fails with `GraphValidation` when the plan cannot be built or when it
    /// contains no `AsyncBoundary` (i.e. only one segment results).
    pub(super) fn typed_linear_async_segments(&self) -> StreamResult<TypedLinearSegments<T>> {
        let plan = self.typed_linear_plan()?;
        let mut segments = Vec::new();
        let mut open_segment = Vec::new();
        for step in plan.steps {
            if matches!(step, TypedLinearStep::AsyncBoundary) {
                // Close the segment collected so far and start a fresh one.
                segments.push(std::mem::take(&mut open_segment));
            } else {
                open_segment.push(step);
            }
        }
        segments.push(open_segment);

        // There is always at least one segment; exactly one means no boundary.
        if segments.len() < 2 {
            return Err(StreamError::GraphValidation(
                "async boundary execution requires at least one AsyncBoundary stage".into(),
            ));
        }
        Ok(TypedLinearSegments { segments })
    }
}
/// Ordered list of steps produced by `typed_linear_plan`, one per stage on the
/// path from the graph inlet to the graph outlet.
pub(super) struct TypedLinearPlan<T> {
/// Steps in execution order.
steps: Vec<TypedLinearStep<T>>,
}
/// A linear plan cut into pieces at its `AsyncBoundary` steps, as built by
/// `typed_linear_async_segments`.
pub(super) struct TypedLinearSegments<T> {
/// One inner vector per segment; the boundary steps themselves are not stored.
segments: Vec<Vec<TypedLinearStep<T>>>,
}
/// One stage of a typed linear plan, operating on values of type `T`.
pub(super) enum TypedLinearStep<T> {
/// Forwards the item unchanged.
Pass,
/// Applies the shared mapping closure to the item.
Map(Arc<dyn Fn(T) -> T + Send + Sync>),
/// Forwards the item unchanged; `TypedLinearPlan::run_item` counts it as an
/// async-boundary crossing.
AsyncBoundary,
}
impl<T> Clone for TypedLinearStep<T> {
fn clone(&self) -> Self {
match self {
Self::Pass => Self::Pass,
Self::Map(mapper) => Self::Map(Arc::clone(mapper)),
Self::AsyncBoundary => Self::AsyncBoundary,
}
}
}
impl<T> TypedLinearPlan<T> {
    /// Threads one item through every step of the plan.
    ///
    /// `bump_fused_event` is called once before and once after each step, and
    /// `async_boundary_crossings` is incremented for every `AsyncBoundary`
    /// step. The first error from `bump_fused_event` aborts the item.
    fn run_item(
        &self,
        item: T,
        config: FusedExecutionConfig,
        events: &mut usize,
        async_boundary_crossings: &mut usize,
    ) -> StreamResult<T>
    where
        T: Send + 'static,
    {
        self.steps
            .iter()
            .try_fold(item, |current, step| -> StreamResult<T> {
                bump_fused_event(events, config)?;
                let next = match step {
                    TypedLinearStep::Map(mapper) => mapper(current),
                    TypedLinearStep::AsyncBoundary => {
                        *async_boundary_crossings += 1;
                        current
                    }
                    TypedLinearStep::Pass => current,
                };
                bump_fused_event(events, config)?;
                Ok(next)
            })
    }
}
/// A holder for at most one value of type `T`.
#[allow(dead_code)]
pub(crate) struct TypedSlot<T>(Option<T>);

#[allow(dead_code)]
impl<T> TypedSlot<T> {
    /// Creates a slot that holds nothing.
    pub(crate) fn empty() -> Self {
        TypedSlot(None)
    }

    /// Stores `value`, discarding whatever the slot held before.
    pub(crate) fn put(&mut self, value: T) {
        drop(self.0.replace(value));
    }

    /// Removes and returns the stored value, leaving the slot empty.
    pub(crate) fn take(&mut self) -> Option<T> {
        std::mem::take(&mut self.0)
    }

    /// Reports whether the slot currently holds a value.
    pub(crate) fn is_some(&self) -> bool {
        matches!(self.0, Some(_))
    }
}
/// Maps each registered port id to a type-erased [`TypedSlot`].
#[allow(dead_code)]
pub(crate) struct TypedPortRegistry {
    /// Every value is a boxed `TypedSlot<T>` for the `T` chosen at registration.
    slots: HashMap<PortId, Box<dyn Any + Send>>,
}

#[allow(dead_code)]
impl TypedPortRegistry {
    /// Creates a registry with no ports.
    pub(crate) fn new() -> Self {
        let slots = HashMap::new();
        Self { slots }
    }

    /// Registers `port_id` with an empty `TypedSlot<T>`.
    ///
    /// # Panics
    /// Panics if `port_id` was already registered. The new empty slot has
    /// already replaced the old one by the time the panic is raised.
    pub(crate) fn register<T: Any + Send>(&mut self, port_id: PortId) {
        let fresh: Box<dyn Any + Send> = Box::new(TypedSlot::<T>::empty());
        if self.slots.insert(port_id, fresh).is_some() {
            panic!("port {port_id:?} registered twice");
        }
    }

    /// Returns the slot for `port_id`, or `None` when the port is unknown or
    /// was registered with a type other than `T`.
    pub(crate) fn get_mut<T: Any + Send>(&mut self, port_id: PortId) -> Option<&mut TypedSlot<T>> {
        self.slots
            .get_mut(&port_id)
            .and_then(|erased| erased.downcast_mut::<TypedSlot<T>>())
    }
}
/// A shareable (`Send + Sync`) transformation from `In` to `Out`.
///
/// NOTE(review): no implementor or caller is visible in this part of the file;
/// the item is marked `dead_code`.
#[allow(dead_code)]
pub(crate) trait TypedKernel<In, Out>: Send + Sync {
/// Consumes `input` and produces the corresponding output value.
fn run(&self, input: In) -> Out;
}
/// Builds a [`TypedKernel`] for a stage specification when it can.
///
/// NOTE(review): no implementor or caller is visible in this part of the file;
/// the item is marked `dead_code`.
#[allow(dead_code)]
pub(crate) trait TypedStageFactory<In, Out>: Send + Sync {
/// Returns a kernel for `spec`, or `None` when no kernel is produced for it.
fn try_build(&self, spec: &StageSpec) -> Option<Box<dyn TypedKernel<In, Out>>>;
}
/// A non-final stage of a [`TypedFlowPlan`]; its input and output are both `T`.
enum TypedMiddleStep<T: 'static> {
/// Forwards the item unchanged.
Pass,
/// Applies the shared mapping closure to the item.
Map(Arc<dyn Fn(T) -> T + Send + Sync>),
/// Forwards the item unchanged and is counted as an async-boundary crossing
/// by `TypedFlowPlan::run_item` / `run_item_count`.
AsyncBoundary,
}
/// The final stage of a [`TypedFlowPlan`], converting `In` into `Out`.
enum TypedLastStep<In: 'static, Out: 'static> {
/// A user mapping closure recovered from a `Map` stage.
Map(Arc<dyn Fn(In) -> Out + Send + Sync>),
/// A conversion closure built by `try_typed_flow_plan` for a trailing
/// `Identity` or `AsyncBoundary` stage; `run_item_count` skips calling it
/// and simply drops the value.
Identity(Arc<dyn Fn(In) -> Out + Send + Sync>),
}
/// Executable form of a linear graph: zero or more `In -> In` middle steps
/// followed by one `In -> Out` last step.
pub(crate) struct TypedFlowPlan<In: 'static, Out: 'static> {
/// Steps applied, in order, before the last step.
middle_steps: Vec<TypedMiddleStep<In>>,
/// Final step that produces the output value.
last_step: TypedLastStep<In, Out>,
/// Total number of stages in the plan (middle steps plus the last step), as
/// recorded by `try_typed_flow_plan`.
#[allow(dead_code)]
stage_count: usize,
}
impl<In: Send + 'static, Out: Send + 'static> TypedFlowPlan<In, Out> {
    /// Runs `item` through every middle step and returns the value that
    /// should be handed to the last step.
    ///
    /// `bump_fused_event` is called before and after each step, and each
    /// `AsyncBoundary` step increments `async_boundary_crossings`.
    fn apply_middle_steps(
        &self,
        item: In,
        config: FusedExecutionConfig,
        events: &mut usize,
        async_boundary_crossings: &mut usize,
    ) -> StreamResult<In> {
        let mut current = item;
        for step in &self.middle_steps {
            bump_fused_event(events, config)?;
            match step {
                TypedMiddleStep::Pass => {}
                TypedMiddleStep::Map(transform) => current = transform(current),
                TypedMiddleStep::AsyncBoundary => *async_boundary_crossings += 1,
            }
            bump_fused_event(events, config)?;
        }
        Ok(current)
    }

    /// Runs one item through the whole plan and returns the produced output.
    ///
    /// The last step is bracketed by two more `bump_fused_event` calls. Any
    /// error from `bump_fused_event` aborts the item.
    pub(crate) fn run_item(
        &self,
        item: In,
        config: FusedExecutionConfig,
        events: &mut usize,
        async_boundary_crossings: &mut usize,
    ) -> StreamResult<Out> {
        let staged = self.apply_middle_steps(item, config, events, async_boundary_crossings)?;
        bump_fused_event(events, config)?;
        let produced = match &self.last_step {
            TypedLastStep::Map(finish) | TypedLastStep::Identity(finish) => finish(staged),
        };
        bump_fused_event(events, config)?;
        Ok(produced)
    }

    /// Runs one item through the plan for counting purposes only.
    ///
    /// Event and crossing accounting matches [`Self::run_item`]. A `Map` last
    /// step is still invoked (its result is discarded); an `Identity` last
    /// step is skipped and the value is dropped directly.
    pub(crate) fn run_item_count(
        &self,
        item: In,
        config: FusedExecutionConfig,
        events: &mut usize,
        async_boundary_crossings: &mut usize,
    ) -> StreamResult<()> {
        let staged = self.apply_middle_steps(item, config, events, async_boundary_crossings)?;
        bump_fused_event(events, config)?;
        match &self.last_step {
            TypedLastStep::Identity(_) => drop(staged),
            TypedLastStep::Map(finish) => drop(finish(staged)),
        }
        bump_fused_event(events, config)
    }
}
/// Tries to lower a graph into a [`TypedFlowPlan`].
///
/// The graph qualifies when it is a single chain from `graph_inlet` to
/// `graph_outlet` that visits every stage exactly once, every stage has one
/// inlet and one outlet, every inlet and every non-final outlet carries `In`,
/// and the final outlet carries `Out`. Non-final stages must be `Identity`,
/// `AsyncBoundary`, or a `Map` whose typed closure is `Fn(In) -> In`. The
/// final stage must be a `Map` with an `Fn(In) -> Out` closure, or an
/// `Identity`/`AsyncBoundary` with `In == Out`. Anything else — including
/// `Opaque` stages — yields `None`.
///
/// A trailing `AsyncBoundary` is lowered to the identity last step, so it does
/// not add to `async_boundary_crossings` when the plan runs.
/// NOTE(review): `TypedLinearPlan::run_item` does count a trailing boundary —
/// confirm which accounting is intended.
pub(crate) fn try_typed_flow_plan<In, Out>(
    stages: &[super::builder::StageRecord],
    edges: &[super::builder::Edge],
    graph_inlet: PortId,
    graph_outlet: PortId,
) -> Option<TypedFlowPlan<In, Out>>
where
    In: Clone + Send + 'static,
    Out: Send + 'static,
{
    let in_type_id = TypeId::of::<In>();
    let out_type_id = TypeId::of::<Out>();

    // Phase 1: follow the chain, remembering each stage's kind and outlet type.
    let mut visited = HashSet::new();
    let mut chain: Vec<(&StageKind, TypeId)> = Vec::new();
    let mut cursor = graph_inlet;
    loop {
        let stage_index = stages.iter().position(|candidate| {
            candidate
                .spec
                .inlets
                .iter()
                .any(|port| port.id() == cursor)
        })?;
        if !visited.insert(stage_index) {
            return None;
        }

        let spec = &stages[stage_index].spec;
        if spec.inlets.len() != 1 || spec.outlets.len() != 1 {
            return None;
        }
        let inlet = &spec.inlets[0];
        let outlet = &spec.outlets[0];
        if inlet.type_id() != in_type_id {
            return None;
        }
        chain.push((&spec.kind, outlet.type_id()));

        if outlet.id() == graph_outlet {
            break;
        }
        cursor = edges
            .iter()
            .find(|edge| edge.outlet == outlet.id())
            .map(|edge| edge.inlet)?;
    }
    if visited.len() != stages.len() {
        return None;
    }

    // Phase 2: split off the final stage and type-check it.
    let stage_count = chain.len();
    let (&(last_kind, last_outlet_type), upstream) = chain.split_last()?;
    if last_outlet_type != out_type_id {
        return None;
    }

    // Phase 3: lower every upstream stage to an `In -> In` step.
    let middle_steps = upstream
        .iter()
        .map(|&(kind, outlet_type)| {
            if outlet_type != in_type_id {
                return None;
            }
            match kind {
                StageKind::Identity => Some(TypedMiddleStep::Pass),
                StageKind::AsyncBoundary => Some(TypedMiddleStep::AsyncBoundary),
                StageKind::Map(map) => {
                    let mapper = map
                        .typed
                        .downcast_ref::<Arc<dyn Fn(In) -> In + Send + Sync>>()?;
                    Some(TypedMiddleStep::Map(Arc::clone(mapper)))
                }
                // `Opaque` and every other stage kind are unsupported.
                _ => None,
            }
        })
        .collect::<Option<Vec<TypedMiddleStep<In>>>>()?;

    // Phase 4: lower the final stage to an `In -> Out` step.
    let last_step: TypedLastStep<In, Out> = match last_kind {
        StageKind::Identity | StageKind::AsyncBoundary => {
            if in_type_id != out_type_id {
                return None;
            }
            // `In` and `Out` are the same type here; round-trip through
            // `dyn Any` to convert between the two generic names.
            TypedLastStep::Identity(Arc::new(|x: In| -> Out {
                let boxed: Box<dyn Any + Send> = Box::new(x);
                *boxed
                    .downcast::<Out>()
                    .expect("TypeId equality verified at plan time")
            }))
        }
        StageKind::Map(map) => {
            let mapper = map
                .typed
                .downcast_ref::<Arc<dyn Fn(In) -> Out + Send + Sync>>()?;
            TypedLastStep::Map(Arc::clone(mapper))
        }
        // `Opaque` and every other stage kind are unsupported.
        _ => return None,
    };

    Some(TypedFlowPlan {
        middle_steps,
        last_step,
        stage_count,
    })
}
/// Reordering buffer that releases items strictly in ascending sequence
/// order, starting from sequence `0`.
pub(crate) struct MergeSequenceCore<T> {
    /// Sequence number the next released item must carry.
    next_sequence: u64,
    /// Items that arrived out of turn, kept sorted by sequence number and
    /// free of duplicates.
    pending: Vec<(u64, T)>,
    /// Items already released in order and waiting to be drained.
    output_buffer: VecDeque<T>,
    /// Number of `on_inlet_complete` calls since construction or reset.
    completed_count: usize,
    /// Number of inlets that must complete before the core can complete.
    input_count: usize,
    /// Set once `on_inlet_complete` reports completion.
    completed: bool,
}

impl<T> MergeSequenceCore<T> {
    /// Creates an empty core expecting `input_count` inlets.
    pub(crate) fn new(input_count: usize) -> Self {
        Self {
            next_sequence: 0,
            pending: Vec::new(),
            output_buffer: VecDeque::new(),
            completed_count: 0,
            input_count,
            completed: false,
        }
    }

    /// Restores the freshly constructed state while keeping `input_count`.
    fn reset(&mut self) {
        self.next_sequence = 0;
        self.pending.clear();
        self.output_buffer.clear();
        self.completed_count = 0;
        self.completed = false;
    }

    /// Accepts one item tagged with sequence number `seq`.
    ///
    /// An item carrying the expected sequence number is released immediately,
    /// followed by any parked items that now form a contiguous run. Any other
    /// item is parked in `pending`.
    ///
    /// # Errors
    /// Returns `StreamError::Failed` when an item with the same sequence
    /// number is already parked.
    fn push_item(&mut self, seq: u64, val: T) -> StreamResult<()> {
        if seq == self.next_sequence {
            self.output_buffer.push_back(val);
            self.next_sequence += 1;
        } else {
            // `pending` is sorted and duplicate-free, so a single binary
            // search both detects duplicates and yields the insertion point;
            // no rescan or re-sort is required.
            match self.pending.binary_search_by_key(&seq, |(s, _)| *s) {
                Ok(_) => {
                    return Err(StreamError::Failed(format!(
                        "duplicate sequence {seq} on merge sequence"
                    )));
                }
                Err(slot) => self.pending.insert(slot, (seq, val)),
            }
        }
        self.release_ready();
        Ok(())
    }

    /// Moves parked items into the output buffer for as long as the parked
    /// set contains the next expected sequence number.
    fn release_ready(&mut self) {
        while let Ok(index) = self
            .pending
            .binary_search_by_key(&self.next_sequence, |(s, _)| *s)
        {
            let (_, item) = self.pending.remove(index);
            self.output_buffer.push_back(item);
            self.next_sequence += 1;
        }
    }

    /// Records that one inlet has completed.
    ///
    /// Returns `Ok(true)` once at least `input_count` completions have been
    /// recorded and the output buffer is empty; otherwise `Ok(false)`.
    ///
    /// # Errors
    /// Returns `StreamError::Failed` when completion is reached while items
    /// are still parked, i.e. the expected sequence number never arrived.
    fn on_inlet_complete(&mut self) -> StreamResult<bool> {
        self.completed_count += 1;
        if self.completed_count >= self.input_count && self.output_buffer.is_empty() {
            if !self.pending.is_empty() {
                return Err(StreamError::Failed(format!(
                    "expected sequence {}, but all input ports have pushed or are complete",
                    self.next_sequence,
                )));
            }
            self.completed = true;
            Ok(true)
        } else {
            Ok(false)
        }
    }

    /// Appends every released item to `out`, emptying the output buffer.
    fn drain_into(&mut self, out: &mut Vec<T>) {
        out.extend(self.output_buffer.drain(..));
    }
}
/// Typed plan for an unzip stage feeding a merge-sequence stage, as built by
/// `try_typed_merge_sequence_plan`.
pub(crate) struct TypedMergeSequencePlan<In, T> {
/// One projection per merge inlet; each turns an input item into the value
/// delivered to that inlet.
splits: Vec<Arc<dyn Fn(In) -> T + Send + Sync>>,
/// Reads the sequence number out of a projected value.
extract_sequence: Arc<dyn Fn(&T) -> u64 + Send + Sync>,
/// Reordering state shared by all inlets.
core: MergeSequenceCore<T>,
}
impl<In: Clone + Send + 'static, T: Send + 'static> TypedMergeSequencePlan<In, T> {
    /// Projects `item` once per split (cloning it each time), feeds every
    /// projection to the core under its extracted sequence number, and then
    /// drains whatever the core has released into `out`.
    ///
    /// An error from the core is returned immediately, before draining.
    fn push_item(&mut self, item: In, out: &mut Vec<T>) -> StreamResult<()> {
        for project in &self.splits {
            let projected = project(item.clone());
            let sequence = (self.extract_sequence)(&projected);
            self.core.push_item(sequence, projected)?;
        }
        self.core.drain_into(out);
        Ok(())
    }

    /// Signals completion to the core once per split, then drains any
    /// remaining released items into `out`.
    fn finish(&mut self, out: &mut Vec<T>) -> StreamResult<()> {
        let inlet_count = self.splits.len();
        (0..inlet_count).try_for_each(|_| self.core.on_inlet_complete().map(|_| ()))?;
        self.core.drain_into(out);
        Ok(())
    }

    /// Clears the core's state so the plan can be reused for another run.
    fn reset(&mut self) {
        self.core.reset();
    }
}
/// Tries to build a [`TypedMergeSequencePlan`] for a two-stage graph made of
/// an `Unzip` stage wired into a `MergeSequence` stage.
///
/// Returns `None` unless all of the following hold:
/// * there are exactly two stages;
/// * the stage owning `graph_inlet` is a single-inlet `Unzip` taking `In`
///   with exactly two outlets of type `Out`;
/// * the other stage is a `MergeSequence` declaring two inputs, with two
///   `Out` inlets and a single `Out` outlet that is `graph_outlet`;
/// * every edge leaving an unzip outlet lands on a merge inlet, each unzip
///   outlet is connected, and the two outlets reach different merge inlets;
/// * the stored split and extract closures downcast to
///   `Fn(In) -> (Out, Out)` and `Fn(&Out) -> u64` respectively.
pub(crate) fn try_typed_merge_sequence_plan<In, Out>(
    stages: &[super::builder::StageRecord],
    edges: &[super::builder::Edge],
    graph_inlet: PortId,
    graph_outlet: PortId,
) -> Option<TypedMergeSequencePlan<In, Out>>
where
    In: Clone + Send + 'static,
    Out: Send + 'static,
{
    if stages.len() != 2 {
        return None;
    }
    let in_type_id = TypeId::of::<In>();
    let out_type_id = TypeId::of::<Out>();

    // Identify the two stages: the one fed by the graph inlet and the other.
    let unzip_idx = stages.iter().position(|candidate| {
        let inlets = &candidate.spec.inlets;
        inlets.len() == 1 && inlets[0].id() == graph_inlet
    })?;
    let unzip_spec = &stages[unzip_idx].spec;
    let merge_spec = &stages[1 - unzip_idx].spec;

    let typed_split_any = match &unzip_spec.kind {
        StageKind::Unzip { typed_split, .. } => Arc::clone(typed_split),
        _ => return None,
    };
    let (declared_inputs, typed_extract_any) = match &merge_spec.kind {
        StageKind::MergeSequence {
            input_count,
            typed_extract,
            ..
        } => (*input_count, Arc::clone(typed_extract)),
        _ => return None,
    };

    // Shape and type checks on the unzip side. Only two-way unzips are
    // supported because the split closure yields a pair.
    if unzip_spec.inlets[0].type_id() != in_type_id {
        return None;
    }
    let fan_out = unzip_spec.outlets.len();
    if fan_out != 2 {
        return None;
    }
    if unzip_spec
        .outlets
        .iter()
        .any(|port| port.type_id() != out_type_id)
    {
        return None;
    }

    // Shape and type checks on the merge side.
    if merge_spec.inlets.len() != fan_out
        || merge_spec.outlets.len() != 1
        || declared_inputs != fan_out
    {
        return None;
    }
    if merge_spec
        .inlets
        .iter()
        .any(|port| port.type_id() != out_type_id)
    {
        return None;
    }
    let merge_outlet = &merge_spec.outlets[0];
    if merge_outlet.type_id() != out_type_id || merge_outlet.id() != graph_outlet {
        return None;
    }

    // Work out which merge inlet each unzip outlet is wired to.
    let unzip_outlet_ids: Vec<PortId> = unzip_spec.outlets.iter().map(AnyOutlet::id).collect();
    let merge_inlet_ids: Vec<PortId> = merge_spec.inlets.iter().map(AnyInlet::id).collect();
    let mut wiring: Vec<Option<usize>> = vec![None; fan_out];
    for edge in edges {
        let source = match unzip_outlet_ids.iter().position(|&id| id == edge.outlet) {
            Some(source) => source,
            None => continue,
        };
        let target = merge_inlet_ids.iter().position(|&id| id == edge.inlet)?;
        wiring[source] = Some(target);
    }
    let target_for_first = wiring[0]?;
    let target_for_second = wiring[1]?;

    // Recover the concrete closures.
    let typed_split = Arc::clone(
        typed_split_any.downcast_ref::<Arc<dyn Fn(In) -> (Out, Out) + Send + Sync>>()?,
    );
    let typed_extract =
        Arc::clone(typed_extract_any.downcast_ref::<Arc<dyn Fn(&Out) -> u64 + Send + Sync>>()?);

    // Build one projection per merge inlet; a slot left empty means both
    // unzip outlets were wired to the same inlet.
    #[allow(clippy::type_complexity)]
    let mut slots: Vec<Option<Arc<dyn Fn(In) -> Out + Send + Sync>>> = vec![None; fan_out];
    let take_first = Arc::clone(&typed_split);
    let take_second = typed_split;
    slots[target_for_first] = Some(Arc::new(move |input: In| take_first(input).0));
    slots[target_for_second] = Some(Arc::new(move |input: In| take_second(input).1));
    let splits: Vec<Arc<dyn Fn(In) -> Out + Send + Sync>> =
        slots.into_iter().collect::<Option<Vec<_>>>()?;

    Some(TypedMergeSequencePlan {
        splits,
        extract_sequence: typed_extract,
        core: MergeSequenceCore::new(fan_out),
    })
}
/// Resets `plan`, feeds it every item of `input`, signals completion, and
/// returns everything the plan emitted, in emission order.
///
/// The output vector is pre-sized from the iterator's lower size bound times
/// the number of splits. The first error from the plan aborts the run.
pub(crate) fn run_typed_merge_sequence<In, T, I>(
    plan: &mut TypedMergeSequencePlan<In, T>,
    input: I,
) -> StreamResult<Vec<T>>
where
    In: Clone + Send + 'static,
    T: Send + 'static,
    I: IntoIterator<Item = In>,
{
    plan.reset();
    let mut items = input.into_iter();
    let capacity = items.size_hint().0 * plan.splits.len();
    let mut output: Vec<T> = Vec::with_capacity(capacity);
    items.try_for_each(|item| plan.push_item(item, &mut output))?;
    plan.finish(&mut output)?;
    Ok(output)
}
/// Tracks the most recent value seen on each inlet and, once every inlet has
/// produced at least one value, queues a snapshot of all latest values after
/// every push.
pub(crate) struct MergeLatestCore<T> {
    /// Latest value per inlet; `None` until that inlet has pushed.
    latest: Vec<Option<T>>,
    /// Number of inlets that have pushed at least once.
    seen_count: usize,
    /// Number of `on_inlet_complete` calls since construction or reset.
    completed_count: usize,
    /// Number of inlets.
    input_count: usize,
    /// Snapshots waiting to be drained.
    pending: VecDeque<Vec<T>>,
    /// Set once `on_inlet_complete` reports completion.
    completed: bool,
    /// When set, one completed inlet is enough to complete as long as no
    /// snapshot is waiting.
    eager_complete: bool,
}

impl<T: Clone> MergeLatestCore<T> {
    /// Creates a core for `input_count` inlets with every slot empty.
    pub(crate) fn new(input_count: usize, eager_complete: bool) -> Self {
        Self {
            latest: vec![None; input_count],
            seen_count: 0,
            completed_count: 0,
            input_count,
            pending: VecDeque::new(),
            completed: false,
            eager_complete,
        }
    }

    /// Restores the freshly constructed state, keeping the inlet count and the
    /// eager-completion flag.
    fn reset(&mut self) {
        self.latest.fill_with(|| None);
        self.pending.clear();
        self.seen_count = 0;
        self.completed_count = 0;
        self.completed = false;
    }

    /// Records `val` as the newest value of inlet `inlet_index` and queues a
    /// snapshot if every inlet has a value.
    ///
    /// # Panics
    /// Panics if `inlet_index` is not a valid inlet index.
    fn push_item(&mut self, inlet_index: usize, val: T) {
        let first_value_for_inlet = self.latest[inlet_index].replace(val).is_none();
        if first_value_for_inlet {
            self.seen_count += 1;
        }
        if self.seen_count < self.input_count {
            return;
        }
        let snapshot: Vec<T> = self
            .latest
            .iter()
            .map(|slot| {
                slot.as_ref()
                    .expect("merge-latest typed: slot seen but None")
                    .clone()
            })
            .collect();
        self.pending.push_back(snapshot);
    }

    /// Records that one inlet has completed and reports whether the core is
    /// now complete: either all inlets have completed, or eager completion is
    /// enabled and no snapshot is waiting.
    fn on_inlet_complete(&mut self) -> bool {
        self.completed_count += 1;
        let finished = self.completed_count >= self.input_count
            || (self.eager_complete && self.pending.is_empty());
        if finished {
            self.completed = true;
        }
        finished
    }

    /// Appends every queued snapshot to `out`, emptying the queue.
    fn drain_into(&mut self, out: &mut Vec<Vec<T>>) {
        out.extend(self.pending.drain(..));
    }
}
/// Typed plan for an unzip stage feeding a merge-latest stage, as built by
/// `try_typed_merge_latest_plan`.
pub(crate) struct TypedMergeLatestPlan<In, T> {
/// One projection per merge inlet, indexed by inlet position.
splits: Vec<Arc<dyn Fn(In) -> T + Send + Sync>>,
/// Latest-value tracking state shared by all inlets.
core: MergeLatestCore<T>,
}
impl<In: Clone + Send + 'static, T: Clone + Send + 'static> TypedMergeLatestPlan<In, T> {
    /// Projects `item` once per split (cloning it each time), pushes each
    /// projection to the inlet with the same index, and drains the snapshots
    /// the core has queued into `out`.
    fn push_item(&mut self, item: In, out: &mut Vec<Vec<T>>) {
        for (inlet_index, project) in self.splits.iter().enumerate() {
            self.core.push_item(inlet_index, project(item.clone()));
        }
        self.core.drain_into(out);
    }

    /// Signals inlet completion to the core, stopping at the first call that
    /// reports completion. Always returns `true`.
    fn finish(&mut self) -> bool {
        let inlet_count = self.splits.len();
        let _completed_early = (0..inlet_count).any(|_| self.core.on_inlet_complete());
        true
    }

    /// Clears the core's state so the plan can be reused for another run.
    fn reset(&mut self) {
        self.core.reset();
    }
}
/// Tries to build a [`TypedMergeLatestPlan`] for a two-stage graph made of an
/// `Unzip` stage wired into a `MergeLatest` stage.
///
/// Returns `None` unless all of the following hold:
/// * there are exactly two stages;
/// * the stage owning `graph_inlet` is a single-inlet `Unzip` taking `In`
///   with exactly two outlets of type `T`;
/// * the other stage is a `MergeLatest` declaring two inputs, with two `T`
///   inlets and a single `Vec<T>` outlet that is `graph_outlet`;
/// * every edge leaving an unzip outlet lands on a merge inlet, each unzip
///   outlet is connected, and the two outlets reach different merge inlets;
/// * the stored split closure downcasts to `Fn(In) -> (T, T)` and the stored
///   snapshot closure downcasts to `Fn(&[Option<T>]) -> Vec<T>` (the snapshot
///   closure is only type-checked here, not kept).
pub(crate) fn try_typed_merge_latest_plan<In, T>(
    stages: &[super::builder::StageRecord],
    edges: &[super::builder::Edge],
    graph_inlet: PortId,
    graph_outlet: PortId,
) -> Option<TypedMergeLatestPlan<In, T>>
where
    In: Clone + Send + 'static,
    T: Clone + Send + 'static,
{
    type SplitFn<A, B> = Arc<dyn Fn(A) -> (B, B) + Send + Sync>;
    type SnapshotFn<U> = Arc<dyn Fn(&[Option<U>]) -> Vec<U> + Send + Sync>;

    if stages.len() != 2 {
        return None;
    }
    let in_type_id = TypeId::of::<In>();
    let elem_type_id = TypeId::of::<T>();
    let vec_type_id = TypeId::of::<Vec<T>>();

    // Identify the two stages: the one fed by the graph inlet and the other.
    let unzip_idx = stages.iter().position(|candidate| {
        let inlets = &candidate.spec.inlets;
        inlets.len() == 1 && inlets[0].id() == graph_inlet
    })?;
    let unzip_spec = &stages[unzip_idx].spec;
    let merge_spec = &stages[1 - unzip_idx].spec;

    let typed_split_any = match &unzip_spec.kind {
        StageKind::Unzip { typed_split, .. } => Arc::clone(typed_split),
        _ => return None,
    };
    let (declared_inputs, typed_snapshot_any, eager_complete) = match &merge_spec.kind {
        StageKind::MergeLatest {
            input_count,
            typed_snapshot,
            eager_complete,
            ..
        } => (*input_count, Arc::clone(typed_snapshot), *eager_complete),
        _ => return None,
    };

    // Shape and type checks on the unzip side. Only two-way unzips are
    // supported because the split closure yields a pair.
    if unzip_spec.inlets[0].type_id() != in_type_id {
        return None;
    }
    let fan_out = unzip_spec.outlets.len();
    if fan_out != 2 {
        return None;
    }
    if unzip_spec
        .outlets
        .iter()
        .any(|port| port.type_id() != elem_type_id)
    {
        return None;
    }

    // Shape and type checks on the merge side.
    if merge_spec.inlets.len() != fan_out
        || merge_spec.outlets.len() != 1
        || declared_inputs != fan_out
    {
        return None;
    }
    if merge_spec
        .inlets
        .iter()
        .any(|port| port.type_id() != elem_type_id)
    {
        return None;
    }
    let merge_outlet = &merge_spec.outlets[0];
    if merge_outlet.type_id() != vec_type_id || merge_outlet.id() != graph_outlet {
        return None;
    }

    // Work out which merge inlet each unzip outlet is wired to.
    let unzip_outlet_ids: Vec<PortId> = unzip_spec.outlets.iter().map(AnyOutlet::id).collect();
    let merge_inlet_ids: Vec<PortId> = merge_spec.inlets.iter().map(AnyInlet::id).collect();
    let mut wiring: Vec<Option<usize>> = vec![None; fan_out];
    for edge in edges {
        let source = match unzip_outlet_ids.iter().position(|&id| id == edge.outlet) {
            Some(source) => source,
            None => continue,
        };
        let target = merge_inlet_ids.iter().position(|&id| id == edge.inlet)?;
        wiring[source] = Some(target);
    }
    let target_for_first = wiring[0]?;
    let target_for_second = wiring[1]?;

    // Recover the split closure; the snapshot closure only has to type-check.
    let typed_split = Arc::clone(typed_split_any.downcast_ref::<SplitFn<In, T>>()?);
    typed_snapshot_any.downcast_ref::<SnapshotFn<T>>()?;

    // Build one projection per merge inlet; a slot left empty means both
    // unzip outlets were wired to the same inlet.
    #[allow(clippy::type_complexity)]
    let mut slots: Vec<Option<Arc<dyn Fn(In) -> T + Send + Sync>>> = vec![None; fan_out];
    let take_first = Arc::clone(&typed_split);
    let take_second = typed_split;
    slots[target_for_first] = Some(Arc::new(move |input: In| take_first(input).0));
    slots[target_for_second] = Some(Arc::new(move |input: In| take_second(input).1));
    let splits: Vec<Arc<dyn Fn(In) -> T + Send + Sync>> =
        slots.into_iter().collect::<Option<Vec<_>>>()?;

    Some(TypedMergeLatestPlan {
        splits,
        core: MergeLatestCore::new(fan_out, eager_complete),
    })
}
/// One-shot runner returned by `try_build_typed_merge_latest_dispatch`: it
/// consumes the given input iterator and yields the collected outputs.
type MergeLatestRunner<In, Out> =
Box<dyn FnOnce(&mut dyn Iterator<Item = In>) -> StreamResult<Vec<Out>>>;
/// Tries to build a typed runner for an unzip -> merge-latest graph.
///
/// The element type is not known statically here, so it is read from the
/// first inlet of the first `MergeLatest` stage and compared against a fixed
/// list of primitive types plus `String`. For the matching type a
/// `TypedMergeLatestPlan` is built and wrapped in a boxed closure.
///
/// Returns `None` when there is no `MergeLatest` stage (or it has no inlet),
/// when the element type is not in the supported list, or when
/// `try_typed_merge_latest_plan` rejects the graph.
///
/// The returned runner fails with `StreamError::Failed` if `Out` is not
/// `Vec<elem>` for the detected element type.
pub(crate) fn try_build_typed_merge_latest_dispatch<In, Out>(
stages: &[super::builder::StageRecord],
edges: &[super::builder::Edge],
inlet: PortId,
outlet: PortId,
) -> Option<MergeLatestRunner<In, Out>>
where
In: Clone + Send + 'static,
Out: Send + 'static,
{
// Runtime type of the values flowing into the merge-latest stage.
let elem_type_id: TypeId = stages.iter().find_map(|s| {
if let StageKind::MergeLatest { .. } = &s.spec.kind {
s.spec.inlets.first().map(|i| i.type_id())
} else {
None
}
})?;
// Expands to one `if elem_type_id == TypeId::of::<T>()` check per listed
// type; the first match returns from the enclosing function.
macro_rules! try_elem {
($($T:ty),*) => {
$(
if elem_type_id == TypeId::of::<$T>() {
// A plan failure for the matching type ends the whole dispatch with `None`.
let mut plan = try_typed_merge_latest_plan::<In, $T>(stages, edges, inlet, outlet)?;
let runner: MergeLatestRunner<In, Out> = Box::new(
move |iter: &mut dyn Iterator<Item = In>| {
plan.reset();
let hint = iter.size_hint().0;
let mut output: Vec<Vec<$T>> = Vec::with_capacity(hint);
for item in iter {
plan.push_item(item, &mut output);
}
plan.finish();
// Convert `Vec<Vec<$T>>` into the caller's `Vec<Out>` via `dyn Any`;
// this only succeeds when `Out` is exactly `Vec<$T>`.
let boxed: Box<dyn Any + Send> = Box::new(output);
boxed
.downcast::<Vec<Out>>()
.map(|b| *b)
.map_err(|_| StreamError::Failed(
"merge-latest typed runner: Out type mismatch".into()
))
}
);
return Some(runner);
}
)*
};
}
// Element types with a typed merge-latest runner.
try_elem!(
u8, u16, u32, u64, u128, usize, i8, i16, i32, i64, i128, isize, f32, f64, bool, String
);
// No listed type matched.
None
}
/// One stage on the feedback arc of a typed cyclic graph, in the form applied
/// by `run_cyclic_feedback_chain`.
enum CyclicFeedbackStep<T> {
/// Forwards the value unchanged.
Pass,
/// Replaces the value with the result of the function.
Map(Arc<dyn Fn(T) -> T + Send + Sync>),
/// Forwards the value while the predicate holds; the first rejected value
/// closes the feedback arc.
TakeWhile(Arc<dyn Fn(&T) -> bool + Send + Sync>),
}
/// Boxed, single-use runner for the typed cyclic-feedback fast path: it takes
/// the graph input as a type-erased iterator together with the execution
/// config and returns a full execution report, or a stream error.
type CyclicFeedbackRunner<In, Out> = Box<
dyn FnOnce(
&mut dyn Iterator<Item = In>,
FusedExecutionConfig,
) -> StreamResult<FusedExecutionReport<Out>>,
>;
/// Threads `value` through every feedback step in order.
///
/// Yields the transformed value when all steps accept it. As soon as a
/// `TakeWhile` predicate rejects the current value, `feedback_open` is set to
/// `false`, the remaining steps are skipped, and `None` is returned.
fn run_cyclic_feedback_chain<T>(
    steps: &[CyclicFeedbackStep<T>],
    value: T,
    feedback_open: &mut bool,
) -> Option<T> {
    steps.iter().try_fold(value, |current, step| match step {
        CyclicFeedbackStep::Pass => Some(current),
        CyclicFeedbackStep::Map(transform) => Some(transform(current)),
        CyclicFeedbackStep::TakeWhile(keep) => {
            if keep(&current) {
                Some(current)
            } else {
                // The rejected value is dropped and the arc stays closed from now on.
                *feedback_open = false;
                None
            }
        }
    })
}
/// Tries to build a typed runner for a single-loop feedback graph.
///
/// The accepted shape is `MergePreferred -> Broadcast`, where broadcast outlet 0
/// is the graph outlet and broadcast outlet 1 feeds a linear chain of
/// one-inlet/one-outlet stages that ends at the merge's inlet 0. The graph
/// inlet must be the merge's inlet 1.
///
/// Returns `None` whenever the graph does not match that shape: `In` and `Out`
/// differ, a port on the loop is not of type `In`, a chain stage is not
/// `Identity`, `Map`, or an opaque stage carrying a `typed_cyclic` operation,
/// a stage is reached twice, an expected edge is missing, or some stage of the
/// graph is not on the loop.
///
/// The returned runner emits each queued value, then (while the feedback arc is
/// still open) sends it through the chain and re-queues the result. It fails
/// with `StreamError::EventLimitExceeded` once the event counter passes
/// `config.event_limit`.
pub(crate) fn try_build_typed_cyclic_feedback_dispatch<In, Out>(
stages: &[super::builder::StageRecord],
edges: &[super::builder::Edge],
graph_inlet: PortId,
graph_outlet: PortId,
) -> Option<CyclicFeedbackRunner<In, Out>>
where
In: Clone + Send + 'static,
Out: Send + 'static,
{
// The loop carries one element type, so input and output types must coincide.
let in_type_id = TypeId::of::<In>();
if in_type_id != TypeId::of::<Out>() {
return None;
}
// Inlet fed by the first edge that leaves `outlet`, if any.
let inlet_for_outlet = |outlet: PortId| {
edges
.iter()
.find_map(|e| (e.outlet == outlet).then_some(e.inlet))
};
// Stage (with its index) that owns `inlet`, if any.
let stage_owning_inlet = |inlet: PortId| {
stages
.iter()
.enumerate()
.find(|(_, s)| s.spec.inlets.iter().any(|i| i.id() == inlet))
};
let (merge_index, merge) = stage_owning_inlet(graph_inlet)?;
if !matches!(merge.spec.kind, StageKind::MergePreferred) {
return None;
}
if merge.spec.inlets.len() != 2 || merge.spec.outlets.len() != 1 {
return None;
}
// Inlet 0 is taken as the feedback (preferred) inlet; the graph input must
// arrive on inlet 1.
let preferred_inlet = merge.spec.inlets[0].id();
if preferred_inlet == graph_inlet || merge.spec.inlets[1].id() != graph_inlet {
return None;
}
if merge.spec.inlets.iter().any(|i| i.type_id() != in_type_id)
|| merge.spec.outlets[0].type_id() != in_type_id
{
return None;
}
let broadcast_inlet = inlet_for_outlet(merge.spec.outlets[0].id())?;
let (broadcast_index, broadcast) = stage_owning_inlet(broadcast_inlet)?;
if !matches!(broadcast.spec.kind, StageKind::Broadcast) {
return None;
}
if broadcast.spec.inlets.len() != 1 || broadcast.spec.outlets.len() != 2 {
return None;
}
if broadcast.spec.inlets[0].type_id() != in_type_id
|| broadcast
.spec
.outlets
.iter()
.any(|o| o.type_id() != in_type_id)
{
return None;
}
// Broadcast outlet 0 must be the graph outlet; outlet 1 starts the feedback arc.
if broadcast.spec.outlets[0].id() != graph_outlet
|| broadcast.spec.outlets[1].id() == graph_outlet
{
return None;
}
let feedback_outlet = broadcast.spec.outlets[1].id();
// Indices of stages already placed on the loop.
let mut visited: HashSet<usize> = HashSet::new();
visited.insert(merge_index);
visited.insert(broadcast_index);
let mut steps: Vec<CyclicFeedbackStep<In>> = Vec::new();
let mut current_outlet = feedback_outlet;
// Walk the feedback arc from the broadcast back to the merge's preferred
// inlet, translating each stage on the way into a step.
loop {
let inlet = inlet_for_outlet(current_outlet)?;
if inlet == preferred_inlet {
break; }
let (stage_index, stage) = stage_owning_inlet(inlet)?;
if stage.spec.inlets.len() != 1 || stage.spec.outlets.len() != 1 {
return None;
}
if stage.spec.inlets[0].type_id() != in_type_id
|| stage.spec.outlets[0].type_id() != in_type_id
{
return None;
}
// Reaching any stage twice (including the merge or broadcast) is rejected.
if !visited.insert(stage_index) {
return None; }
let step = match &stage.spec.kind {
StageKind::Identity => CyclicFeedbackStep::Pass,
StageKind::Map(map) => {
let f = map
.typed
.downcast_ref::<Arc<dyn Fn(In) -> In + Send + Sync>>()?;
CyclicFeedbackStep::Map(Arc::clone(f))
}
StageKind::Opaque => match stage.spec.typed_cyclic.as_ref()? {
TypedCyclicOp::BufferPassthrough => CyclicFeedbackStep::Pass,
TypedCyclicOp::TakeWhile(predicate) => {
let p = predicate.downcast_ref::<Arc<dyn Fn(&In) -> bool + Send + Sync>>()?;
CyclicFeedbackStep::TakeWhile(Arc::clone(p))
}
},
_ => return None,
};
steps.push(step);
current_outlet = stage.spec.outlets[0].id();
}
// Every stage of the graph must lie on the loop.
if visited.len() != stages.len() {
return None;
}
let runner: CyclicFeedbackRunner<In, Out> = Box::new(
move |iter: &mut dyn Iterator<Item = In>, config: FusedExecutionConfig| {
let limit = config.event_limit;
let mut output: Vec<In> = Vec::with_capacity(iter.size_hint().0);
let mut pending: VecDeque<In> = VecDeque::new();
// Cleared by `run_cyclic_feedback_chain` when a `TakeWhile` step rejects a value.
let mut feedback_open = true;
let mut events: usize = 0;
for item in iter {
pending.push_back(item);
// Drain the loop completely before pulling the next input item.
while let Some(value) = pending.pop_front() {
// One event per emitted value.
events += 1;
if events > limit {
return Err(StreamError::EventLimitExceeded { limit });
}
output.push(value.clone());
if feedback_open {
// One further event per feedback step, charged before the chain runs.
events += steps.len();
if events > limit {
return Err(StreamError::EventLimitExceeded { limit });
}
if let Some(next) =
run_cyclic_feedback_chain(&steps, value, &mut feedback_open)
{
pending.push_back(next);
}
}
}
}
let output = downcast_output_vec::<In, Out>(output, "cyclic feedback")?;
Ok(FusedExecutionReport {
output,
events,
async_boundary_crossings: 0,
})
},
);
Some(runner)
}
/// Boxed, single-use runner for the typed acyclic junction fast paths: it takes
/// the graph input as a type-erased iterator and returns every output item, or
/// a stream error.
type AcyclicJunctionRunner<In, Out> =
Box<dyn FnOnce(&mut dyn Iterator<Item = In>) -> StreamResult<Vec<Out>>>;
/// Reinterprets a `Vec<T>` as a `Vec<Out>` when both are the same concrete type.
///
/// The vector is moved, not copied. When the types differ, a
/// `StreamError::Failed` is returned whose message starts with `context`.
fn downcast_output_vec<T, Out>(output: Vec<T>, context: &'static str) -> StreamResult<Vec<Out>>
where
    T: Send + 'static,
    Out: Send + 'static,
{
    let erased: Box<dyn Any + Send> = Box::new(output);
    match erased.downcast::<Vec<Out>>() {
        Ok(typed) => Ok(*typed),
        Err(_) => Err(StreamError::Failed(format!(
            "{context} typed runner: output type mismatch"
        ))),
    }
}
/// Finds the first stage that owns the inlet `graph_inlet` and returns it
/// together with its position in `stages`.
fn stage_with_graph_inlet(
    stages: &[super::builder::StageRecord],
    graph_inlet: PortId,
) -> Option<(usize, &super::builder::StageRecord)> {
    let index = stages.iter().position(|candidate| {
        candidate
            .spec
            .inlets
            .iter()
            .any(|port| port.id() == graph_inlet)
    })?;
    Some((index, &stages[index]))
}
/// In a graph of exactly two stages, returns the stage that is *not* at
/// `index`, together with its own index. Any other stage count, or an index
/// other than 0 or 1, yields `None`.
fn other_stage(
    stages: &[super::builder::StageRecord],
    index: usize,
) -> Option<(usize, &super::builder::StageRecord)> {
    match (stages, index) {
        ([_, second], 0) => Some((1, second)),
        ([first, _], 1) => Some((0, first)),
        _ => None,
    }
}
/// Follows the first edge that leaves `outlet` and returns the position of the
/// inlet it feeds within `inlets`. Returns `None` when no edge leaves `outlet`
/// or the target inlet is not one of `inlets`.
fn edge_target_index(
    edges: &[super::builder::Edge],
    outlet: PortId,
    inlets: &[AnyInlet],
) -> Option<usize> {
    let wire = edges.iter().find(|candidate| candidate.outlet == outlet)?;
    inlets.iter().position(|port| port.id() == wire.inlet)
}
/// Checks that `outlets` are wired one-to-one onto `inlets`.
///
/// On success the result holds, for each outlet in order, the index of the
/// inlet it feeds. Returns `None` when the counts differ, an outlet has no
/// edge into `inlets`, two outlets feed the same inlet, or an inlet is left
/// unfed.
fn outlets_cover_inlets(
    edges: &[super::builder::Edge],
    outlets: &[AnyOutlet],
    inlets: &[AnyInlet],
) -> Option<Vec<usize>> {
    let port_count = inlets.len();
    if outlets.len() != port_count {
        return None;
    }
    let mut claimed = vec![false; port_count];
    let mut targets = Vec::with_capacity(port_count);
    for source in outlets {
        let target = edge_target_index(edges, source.id(), inlets)?;
        // `replace` hands back the previous flag: `true` means the inlet was already taken.
        if std::mem::replace(&mut claimed[target], true) {
            return None;
        }
        targets.push(target);
    }
    if claimed.contains(&false) {
        None
    } else {
        Some(targets)
    }
}
/// Tries each typed acyclic junction fast path in a fixed order and returns the
/// first runner that matches the graph: broadcast-zip, balance-merge,
/// partition-merge, unzip-zip, then unzip-merge-sorted. Returns `None` when no
/// path recognises the graph.
pub(crate) fn try_build_typed_acyclic_junction_dispatch<In, Out>(
    stages: &[super::builder::StageRecord],
    edges: &[super::builder::Edge],
    graph_inlet: PortId,
    graph_outlet: PortId,
) -> Option<AcyclicJunctionRunner<In, Out>>
where
    In: Clone + Send + 'static,
    Out: Send + 'static,
{
    // `or_else` keeps the probing lazy: a later matcher only runs if every
    // earlier one declined.
    try_typed_broadcast_zip_runner::<In, Out>(stages, edges, graph_inlet, graph_outlet)
        .or_else(|| {
            try_typed_balance_merge_runner::<In, Out>(stages, edges, graph_inlet, graph_outlet)
        })
        .or_else(|| {
            try_typed_partition_merge_runner::<In, Out>(stages, edges, graph_inlet, graph_outlet)
        })
        .or_else(|| {
            try_build_typed_unzip_zip_dispatch::<In, Out>(stages, edges, graph_inlet, graph_outlet)
        })
        .or_else(|| {
            try_build_typed_merge_sorted_dispatch::<In, Out>(
                stages,
                edges,
                graph_inlet,
                graph_outlet,
            )
        })
}
/// Recognises the two-stage graph `Broadcast(2) -> Zip` over a single element
/// type and builds a runner that pairs every input item with a clone of itself.
///
/// Returns `None` unless there are exactly two stages and two edges, `Out` is
/// `(In, In)`, the broadcast owns the graph inlet, the zip's only outlet is the
/// graph outlet with type `(In, In)`, every other port carries `In`, and the
/// broadcast outlets are wired one-to-one onto the zip inlets.
fn try_typed_broadcast_zip_runner<In, Out>(
    stages: &[super::builder::StageRecord],
    edges: &[super::builder::Edge],
    graph_inlet: PortId,
    graph_outlet: PortId,
) -> Option<AcyclicJunctionRunner<In, Out>>
where
    In: Clone + Send + 'static,
    Out: Send + 'static,
{
    let in_type = TypeId::of::<In>();
    let pair_type = TypeId::of::<(In, In)>();
    if stages.len() != 2 || edges.len() != 2 || TypeId::of::<Out>() != pair_type {
        return None;
    }
    let (broadcast_idx, broadcast_stage) = stage_with_graph_inlet(stages, graph_inlet)?;
    if !matches!(broadcast_stage.spec.kind, StageKind::Broadcast) {
        return None;
    }
    let (_, zip_stage) = other_stage(stages, broadcast_idx)?;
    if !matches!(zip_stage.spec.kind, StageKind::Zip(_)) {
        return None;
    }
    let fan_out = &broadcast_stage.spec;
    let fan_in = &zip_stage.spec;
    // Arity first, so the indexed accesses below cannot go out of bounds.
    if fan_out.inlets.len() != 1
        || fan_out.outlets.len() != 2
        || fan_in.inlets.len() != 2
        || fan_in.outlets.len() != 1
    {
        return None;
    }
    let zipped = &fan_in.outlets[0];
    if zipped.id() != graph_outlet || zipped.type_id() != pair_type {
        return None;
    }
    let carries_in = fan_out.inlets[0].type_id() == in_type
        && fan_out.outlets.iter().all(|port| port.type_id() == in_type)
        && fan_in.inlets.iter().all(|port| port.type_id() == in_type);
    if !carries_in {
        return None;
    }
    outlets_cover_inlets(edges, &fan_out.outlets, &fan_in.inlets)?;
    Some(Box::new(|iter| {
        let pairs: Vec<(In, In)> = iter.map(|item| (item.clone(), item)).collect();
        downcast_output_vec(pairs, "broadcast-zip")
    }))
}
/// Recognises the two-stage graph `Balance -> Merge` over a single element type
/// and builds a runner that forwards every input item unchanged, in input order.
///
/// Returns `None` unless there are exactly two stages, `In` and `Out` are the
/// same type, the balance owns the graph inlet, the merge's only outlet is the
/// graph outlet, there is one edge per balance outlet, every port carries `In`,
/// and the balance outlets are wired one-to-one onto the merge inlets.
fn try_typed_balance_merge_runner<In, Out>(
    stages: &[super::builder::StageRecord],
    edges: &[super::builder::Edge],
    graph_inlet: PortId,
    graph_outlet: PortId,
) -> Option<AcyclicJunctionRunner<In, Out>>
where
    In: Clone + Send + 'static,
    Out: Send + 'static,
{
    let in_type = TypeId::of::<In>();
    if stages.len() != 2 || in_type != TypeId::of::<Out>() {
        return None;
    }
    let (balance_idx, balance_stage) = stage_with_graph_inlet(stages, graph_inlet)?;
    if !matches!(balance_stage.spec.kind, StageKind::Balance) {
        return None;
    }
    let (_, merge_stage) = other_stage(stages, balance_idx)?;
    if !matches!(merge_stage.spec.kind, StageKind::Merge) {
        return None;
    }
    let fan_out = &balance_stage.spec;
    let fan_in = &merge_stage.spec;
    // Arity first, so the indexed accesses below cannot go out of bounds.
    if fan_out.inlets.len() != 1 || fan_out.outlets.is_empty() || fan_in.outlets.len() != 1 {
        return None;
    }
    if fan_in.outlets[0].id() != graph_outlet || edges.len() != fan_out.outlets.len() {
        return None;
    }
    let carries_in = fan_out.inlets[0].type_id() == in_type
        && fan_in.outlets[0].type_id() == in_type
        && fan_out.outlets.iter().all(|port| port.type_id() == in_type)
        && fan_in.inlets.iter().all(|port| port.type_id() == in_type);
    if !carries_in {
        return None;
    }
    outlets_cover_inlets(edges, &fan_out.outlets, &fan_in.inlets)?;
    Some(Box::new(|iter| {
        let forwarded: Vec<In> = iter.collect();
        downcast_output_vec(forwarded, "balance-merge")
    }))
}
fn try_typed_partition_merge_runner<In, Out>(
stages: &[super::builder::StageRecord],
edges: &[super::builder::Edge],
graph_inlet: PortId,
graph_outlet: PortId,
) -> Option<AcyclicJunctionRunner<In, Out>>
where
In: Clone + Send + 'static,
Out: Send + 'static,
{
if stages.len() != 2 || TypeId::of::<In>() != TypeId::of::<Out>() {
return None;
}
let in_type = TypeId::of::<In>();
let (partition_idx, partition_stage) = stage_with_graph_inlet(stages, graph_inlet)?;
let (output_count, typed_partitioner) = match &partition_stage.spec.kind {
StageKind::Partition {
output_count,
typed_partitioner,
..
} => (*output_count, Arc::clone(typed_partitioner)),
_ => return None,
};
let (_, merge_stage) = other_stage(stages, partition_idx)?;
if !matches!(merge_stage.spec.kind, StageKind::Merge) {
return None;
}
if partition_stage.spec.inlets.len() != 1
|| partition_stage.spec.outlets.len() != output_count
|| merge_stage.spec.outlets.len() != 1
|| merge_stage.spec.outlets[0].id() != graph_outlet
|| merge_stage.spec.inlets.len() != output_count
|| edges.len() != output_count
|| partition_stage.spec.inlets[0].type_id() != in_type
|| merge_stage.spec.outlets[0].type_id() != in_type
|| partition_stage
.spec
.outlets
.iter()
.any(|outlet| outlet.type_id() != in_type)
|| merge_stage
.spec
.inlets
.iter()
.any(|inlet| inlet.type_id() != in_type)
{
return None;
}
outlets_cover_inlets(
edges,
&partition_stage.spec.outlets,
&merge_stage.spec.inlets,
)?;
let partitioner =
typed_partitioner.downcast_ref::<Arc<dyn Fn(&In) -> usize + Send + Sync>>()?;
let partitioner = Arc::clone(partitioner);
Some(Box::new(move |iter| {
let mut output = Vec::with_capacity(iter.size_hint().0);
for item in iter {
let idx = partitioner(&item);
if idx >= output_count {
return Err(StreamError::Failed(format!(
"partitioner returned out-of-bounds index {idx} for {output_count} outputs"
)));
}
output.push(item);
}
downcast_output_vec(output, "partition-merge")
}))
}
fn try_build_typed_unzip_zip_dispatch<In, Out>(
stages: &[super::builder::StageRecord],
edges: &[super::builder::Edge],
graph_inlet: PortId,
graph_outlet: PortId,
) -> Option<AcyclicJunctionRunner<In, Out>>
where
In: Clone + Send + 'static,
Out: Send + 'static,
{
let elem_type_id = stages.iter().find_map(|stage| {
if let StageKind::Zip(_) = stage.spec.kind {
let [left, right] = stage.spec.inlets.as_slice() else {
return None;
};
(left.type_id() == right.type_id()).then_some(left.type_id())
} else {
None
}
})?;
macro_rules! try_elem {
($($T:ty),*) => {
$(
if elem_type_id == TypeId::of::<$T>() {
return try_typed_unzip_zip_runner_same::<In, $T, Out>(
stages,
edges,
graph_inlet,
graph_outlet,
);
}
)*
};
}
try_elem!(
u8, u16, u32, u64, u128, usize, i8, i16, i32, i64, i128, isize, f32, f64, bool, char,
String
);
None
}
fn try_typed_unzip_zip_runner_same<In, T, Out>(
stages: &[super::builder::StageRecord],
edges: &[super::builder::Edge],
graph_inlet: PortId,
graph_outlet: PortId,
) -> Option<AcyclicJunctionRunner<In, Out>>
where
In: Clone + Send + 'static,
T: Send + 'static,
Out: Send + 'static,
{
if stages.len() != 2 || edges.len() != 2 || TypeId::of::<Out>() != TypeId::of::<(T, T)>() {
return None;
}
let in_type = TypeId::of::<In>();
let elem_type = TypeId::of::<T>();
let pair_type = TypeId::of::<(T, T)>();
let (unzip_idx, unzip_stage) = stage_with_graph_inlet(stages, graph_inlet)?;
let typed_split = match &unzip_stage.spec.kind {
StageKind::Unzip { typed_split, .. } => Arc::clone(typed_split),
_ => return None,
};
let (_, zip_stage) = other_stage(stages, unzip_idx)?;
if !matches!(zip_stage.spec.kind, StageKind::Zip(_)) {
return None;
}
if unzip_stage.spec.inlets.len() != 1
|| unzip_stage.spec.outlets.len() != 2
|| zip_stage.spec.inlets.len() != 2
|| zip_stage.spec.outlets.len() != 1
|| zip_stage.spec.outlets[0].id() != graph_outlet
|| unzip_stage.spec.inlets[0].type_id() != in_type
|| zip_stage.spec.outlets[0].type_id() != pair_type
|| unzip_stage
.spec
.outlets
.iter()
.any(|outlet| outlet.type_id() != elem_type)
|| zip_stage
.spec
.inlets
.iter()
.any(|inlet| inlet.type_id() != elem_type)
{
return None;
}
let mapping = outlets_cover_inlets(edges, &unzip_stage.spec.outlets, &zip_stage.spec.inlets)?;
let out0_to_left = mapping.first().copied()? == 0;
let split = typed_split.downcast_ref::<Arc<dyn Fn(In) -> (T, T) + Send + Sync>>()?;
let split = Arc::clone(split);
Some(Box::new(move |iter| {
let mut output = Vec::with_capacity(iter.size_hint().0);
for item in iter {
let (left, right) = split(item);
if out0_to_left {
output.push((left, right));
} else {
output.push((right, left));
}
}
downcast_output_vec(output, "unzip-zip")
}))
}
fn try_build_typed_merge_sorted_dispatch<In, Out>(
stages: &[super::builder::StageRecord],
edges: &[super::builder::Edge],
graph_inlet: PortId,
graph_outlet: PortId,
) -> Option<AcyclicJunctionRunner<In, Out>>
where
In: Clone + Send + 'static,
Out: Send + 'static,
{
let elem_type_id = stages.iter().find_map(|stage| {
if let StageKind::MergeSorted(_) = stage.spec.kind {
let [left, right] = stage.spec.inlets.as_slice() else {
return None;
};
(left.type_id() == right.type_id()).then_some(left.type_id())
} else {
None
}
})?;
macro_rules! try_elem {
($($T:ty),*) => {
$(
if elem_type_id == TypeId::of::<$T>() {
return try_typed_merge_sorted_runner::<In, $T, Out>(
stages,
edges,
graph_inlet,
graph_outlet,
);
}
)*
};
}
try_elem!(
u8, u16, u32, u64, u128, usize, i8, i16, i32, i64, i128, isize, bool, char, String
);
None
}
fn try_typed_merge_sorted_runner<In, T, Out>(
stages: &[super::builder::StageRecord],
edges: &[super::builder::Edge],
graph_inlet: PortId,
graph_outlet: PortId,
) -> Option<AcyclicJunctionRunner<In, Out>>
where
In: Clone + Send + 'static,
T: Ord + Send + 'static,
Out: Send + 'static,
{
if stages.len() != 2 || edges.len() != 2 || TypeId::of::<Out>() != TypeId::of::<T>() {
return None;
}
let in_type = TypeId::of::<In>();
let elem_type = TypeId::of::<T>();
let (unzip_idx, unzip_stage) = stage_with_graph_inlet(stages, graph_inlet)?;
let typed_split = match &unzip_stage.spec.kind {
StageKind::Unzip { typed_split, .. } => Arc::clone(typed_split),
_ => return None,
};
let (_, merge_stage) = other_stage(stages, unzip_idx)?;
if !matches!(merge_stage.spec.kind, StageKind::MergeSorted(_)) {
return None;
}
if unzip_stage.spec.inlets.len() != 1
|| unzip_stage.spec.outlets.len() != 2
|| merge_stage.spec.inlets.len() != 2
|| merge_stage.spec.outlets.len() != 1
|| merge_stage.spec.outlets[0].id() != graph_outlet
|| unzip_stage.spec.inlets[0].type_id() != in_type
|| merge_stage.spec.outlets[0].type_id() != elem_type
|| unzip_stage
.spec
.outlets
.iter()
.any(|outlet| outlet.type_id() != elem_type)
|| merge_stage
.spec
.inlets
.iter()
.any(|inlet| inlet.type_id() != elem_type)
{
return None;
}
let mapping = outlets_cover_inlets(edges, &unzip_stage.spec.outlets, &merge_stage.spec.inlets)?;
let out0_to_left = mapping.first().copied()? == 0;
let split = typed_split.downcast_ref::<Arc<dyn Fn(In) -> (T, T) + Send + Sync>>()?;
let split = Arc::clone(split);
Some(Box::new(move |iter| {
let mut left = VecDeque::new();
let mut right = VecDeque::new();
let mut output = Vec::with_capacity(iter.size_hint().0.saturating_mul(2));
for item in iter {
let (first, second) = split(item);
if out0_to_left {
left.push_back(first);
right.push_back(second);
} else {
left.push_back(second);
right.push_back(first);
}
drain_merge_sorted(&mut left, &mut right, false, false, &mut output);
}
drain_merge_sorted(&mut left, &mut right, true, true, &mut output);
downcast_output_vec(output, "merge-sorted")
}))
}
/// Moves elements from the two queues into `output` for as long as the next
/// element in merge order can be decided.
///
/// With both fronts present the smaller one is taken, the left one on ties.
/// With only one front present it is taken only if the *other* side is marked
/// closed. Otherwise draining stops and the queued elements stay put.
fn drain_merge_sorted<T: Ord>(
    left: &mut VecDeque<T>,
    right: &mut VecDeque<T>,
    left_closed: bool,
    right_closed: bool,
    output: &mut Vec<T>,
) {
    loop {
        let take_left = match (left.front(), right.front()) {
            (Some(l), Some(r)) => l <= r,
            (Some(_), None) if right_closed => true,
            (None, Some(_)) if left_closed => false,
            _ => return,
        };
        let source = if take_left { &mut *left } else { &mut *right };
        // The chosen queue was just observed to be non-empty.
        output.extend(source.pop_front());
    }
}
/// Selects which backend `run_count` uses to execute a linear pipeline with
/// async boundaries.
pub(super) enum BoundaryCountExecutor {
/// Dispatches to `run_threaded_async_linear_count`; only compiled in test builds.
#[cfg(test)]
Threaded,
/// Dispatches to `run_ractor_async_linear_count`.
Ractor,
}
impl BoundaryCountExecutor {
/// Runs `segments` over `input` on the backend selected by `self` and returns
/// that backend's terminal report carrying a `usize` result.
///
/// The call is a plain dispatch: all arguments are forwarded unchanged to the
/// backend function for the chosen variant.
pub(super) fn run_count<I, T>(
&self,
input: I,
segments: TypedLinearSegments<T>,
config: AsyncBoundaryExecutionConfig,
) -> StreamResult<FusedTerminalReport<usize>>
where
I: IntoIterator<Item = T> + Send,
I::IntoIter: Send + 'static,
T: Send + 'static,
{
match self {
#[cfg(test)]
Self::Threaded => run_threaded_async_linear_count(input, segments, config),
Self::Ractor => run_ractor_async_linear_count(input, segments, config),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
/// Shared, observable state of the `BufferedFlowOnPull` test stage.
#[derive(Default)]
struct BufferedFlowState {
// Values received by `on_push` and not yet handed out by `on_pull`.
queued: VecDeque<i32>,
// Set once `on_upstream_finish` has been called.
upstream_closed: bool,
// Number of `on_pull` invocations.
pull_calls: usize,
// Number of `on_downstream_finish` invocations.
finish_calls: usize,
}
/// Test stage that queues pushed values and only emits them when pulled; its
/// state is shared so tests can inspect the handler call counts.
struct BufferedFlowOnPull {
state: Arc<Mutex<BufferedFlowState>>,
}
// Opaque one-in/one-out stage: pushes are buffered in the shared state and
// released one value per pull.
impl GraphStage for BufferedFlowOnPull {
type Shape = FlowShape<i32, i32>;
fn name(&self) -> &str {
"BufferedFlowOnPull"
}
fn allocate_shape(&self, _allocator: &mut PortAllocator) -> Self::Shape {
// Takes a block of two port ids: the first for the inlet, the next for the outlet.
let first_id = next_port_id_block(2);
FlowShape::new(
Inlet::with_id(first_id, "buffered-flow.in"),
Outlet::with_id(first_id.offset(1), "buffered-flow.out"),
)
}
fn stage_spec(&self, shape: &Self::Shape) -> StageSpec {
StageSpec::opaque(self.name(), shape.inlets(), shape.outlets())
}
fn create_logic(&self, shape: &Self::Shape) -> GraphStageLogic {
// Inlet handler: records pushed values and upstream completion.
struct In {
state: Arc<Mutex<BufferedFlowState>>,
}
impl InHandler for In {
fn on_push(
&mut self,
logic: &mut GraphStageLogic,
inlet: AnyInlet,
) -> StreamResult<()> {
let value: i32 = logic.grab_datum(inlet.id()).and_then(|value| {
downcast_datum(value, "grab", || format!("inlet#{}", inlet.id().as_usize()))
})?;
self.state.lock().unwrap().queued.push_back(value);
Ok(())
}
fn on_upstream_finish(
&mut self,
_logic: &mut GraphStageLogic,
_inlet: AnyInlet,
) -> StreamResult<()> {
self.state.lock().unwrap().upstream_closed = true;
Ok(())
}
}
// Outlet handler: serves queued values on demand and counts its invocations.
struct Out {
outlet: Outlet<i32>,
state: Arc<Mutex<BufferedFlowState>>,
}
impl OutHandler for Out {
fn on_pull(
&mut self,
logic: &mut GraphStageLogic,
_outlet: AnyOutlet,
) -> StreamResult<()> {
// The lock is released before calling back into `logic`.
let (next, upstream_closed) = {
let mut state = self.state.lock().unwrap();
state.pull_calls += 1;
(state.queued.pop_front(), state.upstream_closed)
};
if let Some(value) = next {
logic.emit(&self.outlet, value)
} else if upstream_closed {
logic.complete(&self.outlet)
} else {
Ok(())
}
}
fn on_downstream_finish(
&mut self,
logic: &mut GraphStageLogic,
_outlet: AnyOutlet,
) -> StreamResult<()> {
self.state.lock().unwrap().finish_calls += 1;
logic.complete_stage()
}
}
let mut logic = GraphStageLogic::new(shape);
logic
.set_handler(
&shape.inlet(),
Box::new(In {
state: Arc::clone(&self.state),
}),
)
.unwrap();
logic
.set_out_handler(
&shape.outlet(),
Box::new(Out {
outlet: shape.outlet(),
state: Arc::clone(&self.state),
}),
)
.unwrap();
logic
}
}
/// Test stage whose push handler calls `emit_multiple` with `[1, 2]` and then
/// fails with "emit_multiple boom".
struct EmitMultipleThenFailOnPush;
impl GraphStage for EmitMultipleThenFailOnPush {
type Shape = FlowShape<i32, i32>;
fn name(&self) -> &str {
"EmitMultipleThenFailOnPush"
}
fn allocate_shape(&self, _allocator: &mut PortAllocator) -> Self::Shape {
// Takes a block of two port ids: the first for the inlet, the next for the outlet.
let first_id = next_port_id_block(2);
FlowShape::new(
Inlet::with_id(first_id, "emit-fail.in"),
Outlet::with_id(first_id.offset(1), "emit-fail.out"),
)
}
fn stage_spec(&self, shape: &Self::Shape) -> StageSpec {
StageSpec::opaque(self.name(), shape.inlets(), shape.outlets())
}
fn create_logic(&self, shape: &Self::Shape) -> GraphStageLogic {
struct Handler {
outlet: Outlet<i32>,
}
impl InHandler for Handler {
fn on_push(
&mut self,
logic: &mut GraphStageLogic,
_inlet: AnyInlet,
) -> StreamResult<()> {
// The emit call itself is propagated with `?`; the handler then always fails.
logic.emit_multiple(&self.outlet, [1, 2])?;
Err(StreamError::Failed("emit_multiple boom".into()))
}
}
let mut logic = GraphStageLogic::new(shape);
logic
.set_handler(
&shape.inlet(),
Box::new(Handler {
outlet: shape.outlet(),
}),
)
.unwrap();
logic
}
}
/// Test stage that starts a `read_n` of two elements on its first push and
/// fails with "read_n finish boom" when upstream finishes.
struct ReadNThenFailOnFinish;
/// Test stage whose push handler calls `emit_multiple` with `[1, 2]` and
/// returns that call's result.
struct EmitMultipleOnPush;
impl GraphStage for EmitMultipleOnPush {
type Shape = FlowShape<i32, i32>;
fn name(&self) -> &str {
"EmitMultipleOnPush"
}
fn allocate_shape(&self, _allocator: &mut PortAllocator) -> Self::Shape {
// Takes a block of two port ids: the first for the inlet, the next for the outlet.
let first_id = next_port_id_block(2);
FlowShape::new(
Inlet::with_id(first_id, "emit-multiple.in"),
Outlet::with_id(first_id.offset(1), "emit-multiple.out"),
)
}
fn stage_spec(&self, shape: &Self::Shape) -> StageSpec {
StageSpec::opaque(self.name(), shape.inlets(), shape.outlets())
}
fn create_logic(&self, shape: &Self::Shape) -> GraphStageLogic {
struct Handler {
outlet: Outlet<i32>,
}
impl InHandler for Handler {
fn on_push(
&mut self,
logic: &mut GraphStageLogic,
_inlet: AnyInlet,
) -> StreamResult<()> {
logic.emit_multiple(&self.outlet, [1, 2])
}
}
let mut logic = GraphStageLogic::new(shape);
logic
.set_handler(
&shape.inlet(),
Box::new(Handler {
outlet: shape.outlet(),
}),
)
.unwrap();
logic
}
}
// Opaque one-in/one-out stage used to check handler restoration when
// `on_upstream_finish` fails while a `read_n` has been started.
impl GraphStage for ReadNThenFailOnFinish {
type Shape = FlowShape<i32, i32>;
fn name(&self) -> &str {
"ReadNThenFailOnFinish"
}
fn allocate_shape(&self, _allocator: &mut PortAllocator) -> Self::Shape {
// Takes a block of two port ids: the first for the inlet, the next for the outlet.
let first_id = next_port_id_block(2);
FlowShape::new(
Inlet::with_id(first_id, "read-n.in"),
Outlet::with_id(first_id.offset(1), "read-n.out"),
)
}
fn stage_spec(&self, shape: &Self::Shape) -> StageSpec {
StageSpec::opaque(self.name(), shape.inlets(), shape.outlets())
}
fn create_logic(&self, shape: &Self::Shape) -> GraphStageLogic {
struct Handler {
inlet: Inlet<i32>,
// True once the first push has started the `read_n`.
armed: bool,
}
impl InHandler for Handler {
fn on_push(
&mut self,
logic: &mut GraphStageLogic,
_inlet: AnyInlet,
) -> StreamResult<()> {
// Only the first push starts the read; later pushes are no-ops here.
if !self.armed {
self.armed = true;
logic.read_n(&self.inlet, 2, |_values| {}, |_values| {})
} else {
Ok(())
}
}
fn on_upstream_finish(
&mut self,
_logic: &mut GraphStageLogic,
_inlet: AnyInlet,
) -> StreamResult<()> {
Err(StreamError::Failed("read_n finish boom".into()))
}
}
let mut logic = GraphStageLogic::new(shape);
logic
.set_handler(
&shape.inlet(),
Box::new(Handler {
inlet: shape.inlet(),
armed: false,
}),
)
.unwrap();
logic
}
}
/// Builds a blueprint whose only stage is `stage`, using the shape returned by
/// `builder.add` as the graph shape. Panics if graph creation fails.
fn single_opaque_stage_graph<G>(stage: G) -> GraphBlueprint<FlowShape<i32, i32>>
where
G: GraphStage<Shape = FlowShape<i32, i32>>,
{
GraphDsl::create(|builder| builder.add(stage)).unwrap()
}
/// A push handler that fails after `emit_multiple` must surface its error, and
/// the stage's inlet handler must still be installed afterwards.
#[test]
fn process_push_restores_handler_before_emit_multiple_error_propagates() {
let graph = single_opaque_stage_graph(EmitMultipleThenFailOnPush);
let inlet = graph.shape.inlet().id();
let mut executor = FusedExecutor::new(&graph, FusedExecutionConfig::default());
let result = executor.process_stage(0, inlet, datum(10));
assert!(matches!(
result,
Err(StreamError::Failed(message)) if message == "emit_multiple boom"
));
// The handler must be back in place even though the push failed.
assert!(
executor.opaque_logics[0]
.as_mut()
.unwrap()
.get_in_handler_mut(inlet)
.is_some()
);
}
/// An `on_upstream_finish` failure after a `read_n` was started must surface
/// its error, and the stage's inlet handler must still be installed afterwards.
#[test]
fn process_completion_restores_handler_before_read_n_finish_error_propagates() {
let graph = single_opaque_stage_graph(ReadNThenFailOnFinish);
let inlet = graph.shape.inlet().id();
let mut executor = FusedExecutor::new(&graph, FusedExecutionConfig::default());
executor.process_stage(0, inlet, datum(1)).unwrap();
executor.process_stage(0, inlet, datum(2)).unwrap();
let result = executor.process_completion(0, inlet);
assert!(matches!(
result,
Err(StreamError::Failed(message)) if message == "read_n finish boom"
));
// The handler must be back in place even though completion failed.
assert!(
executor.opaque_logics[0]
.as_mut()
.unwrap()
.get_in_handler_mut(inlet)
.is_some()
);
}
/// A value buffered by an opaque stage must not reach the sink on delivery; it
/// appears only after an explicit request drives the stage's out-handler.
#[test]
fn opaque_request_drives_out_handler_for_buffered_output() {
let state = Arc::new(Mutex::new(BufferedFlowState::default()));
let graph = single_opaque_stage_graph(BufferedFlowOnPull {
state: Arc::clone(&state),
});
let inlet = graph.shape.inlet().id();
let outlet = graph.shape.outlet().id();
let mut executor = FusedExecutor::new(&graph, FusedExecutionConfig::default());
let mut output = Vec::<i32>::new();
let mut output_sink = VecOutputSink {
output: &mut output,
};
executor
.deliver(inlet, datum(7_i32), outlet, &mut output_sink)
.unwrap();
// Delivered but not yet requested: nothing may have been emitted.
assert!(output_sink.output.is_empty());
executor.request(outlet, outlet, &mut output_sink).unwrap();
assert_eq!(&*output_sink.output, &[7]);
let state = state.lock().unwrap();
assert_eq!(state.pull_calls, 2);
assert_eq!(state.finish_calls, 0);
}
/// Finishing downstream before any demand must call `on_downstream_finish`
/// exactly once, and a later request must neither pull nor emit.
#[test]
fn opaque_downstream_finish_before_first_demand_invokes_out_handler() {
let state = Arc::new(Mutex::new(BufferedFlowState::default()));
let graph = single_opaque_stage_graph(BufferedFlowOnPull {
state: Arc::clone(&state),
});
let outlet = graph.shape.outlet().id();
let mut executor = FusedExecutor::new(&graph, FusedExecutionConfig::default());
let mut output = Vec::<i32>::new();
let mut output_sink = VecOutputSink {
output: &mut output,
};
executor
.downstream_finish(outlet, outlet, &mut output_sink)
.unwrap();
executor.request(outlet, outlet, &mut output_sink).unwrap();
assert!(output_sink.output.is_empty());
let state = state.lock().unwrap();
assert_eq!(state.pull_calls, 0);
assert_eq!(state.finish_calls, 1);
}
/// After upstream completion, a downstream finish must discard the value still
/// buffered in the stage: a later request emits nothing.
#[test]
fn opaque_downstream_finish_drops_buffered_output_after_upstream_complete() {
let state = Arc::new(Mutex::new(BufferedFlowState::default()));
let graph = single_opaque_stage_graph(BufferedFlowOnPull {
state: Arc::clone(&state),
});
let inlet = graph.shape.inlet().id();
let outlet = graph.shape.outlet().id();
let mut executor = FusedExecutor::new(&graph, FusedExecutionConfig::default());
let mut output = Vec::<i32>::new();
let mut output_sink = VecOutputSink {
output: &mut output,
};
executor
.deliver(inlet, datum(11_i32), outlet, &mut output_sink)
.unwrap();
executor.complete(inlet, outlet, &mut output_sink).unwrap();
executor
.downstream_finish(outlet, outlet, &mut output_sink)
.unwrap();
executor.request(outlet, outlet, &mut output_sink).unwrap();
assert!(output_sink.output.is_empty());
let state = state.lock().unwrap();
assert_eq!(state.finish_calls, 1);
}
/// A two-outlet broadcast must report its inlet as cancelled only once both
/// outlets have finished downstream, not after the first.
#[test]
fn broadcast_cancels_upstream_only_after_all_outlets_cancel() {
let graph = GraphDsl::try_create(|builder| {
let broadcast = builder.add(Broadcast::<i32>::new(2));
let merge = builder.add(Merge::<i32>::new(2));
builder.connect(broadcast.outlet(0)?, merge.inlet(0)?)?;
builder.connect(broadcast.outlet(1)?, merge.inlet(1)?)?;
Ok(FlowShape::new(broadcast.inlet(), merge.outlet()))
})
.unwrap();
let mut executor = FusedExecutor::new(&graph, FusedExecutionConfig::default());
// The broadcast is the stage that owns the graph inlet.
let broadcast_index = *executor
.stage_by_inlet
.get(&graph.shape.inlet().id())
.unwrap();
let first = graph.stages[broadcast_index].spec.outlets[0].id();
let second = graph.stages[broadcast_index].spec.outlets[1].id();
let first_transition = executor
.process_downstream_finish(broadcast_index, first)
.unwrap();
assert!(first_transition.cancelled_inlets.is_empty());
let second_transition = executor
.process_downstream_finish(broadcast_index, second)
.unwrap();
assert_eq!(
second_transition.cancelled_inlets,
vec![graph.stages[broadcast_index].spec.inlets[0].id()]
);
}
/// Finishing the graph outlet of `Broadcast(2) -> Merge(2)` must propagate back
/// to the broadcast, leaving it with no live outlets and both marked cancelled.
#[test]
fn downstream_finish_propagates_through_merge_and_broadcast() {
let graph = GraphDsl::try_create(|builder| {
let broadcast = builder.add(Broadcast::<i32>::new(2));
let merge = builder.add(Merge::<i32>::new(2));
builder.connect(broadcast.outlet(0)?, merge.inlet(0)?)?;
builder.connect(broadcast.outlet(1)?, merge.inlet(1)?)?;
Ok(FlowShape::new(broadcast.inlet(), merge.outlet()))
})
.unwrap();
let outlet = graph.shape.outlet().id();
let mut executor = FusedExecutor::new(&graph, FusedExecutionConfig::default());
let mut output = Vec::<i32>::new();
let mut output_sink = VecOutputSink {
output: &mut output,
};
executor
.downstream_finish(outlet, outlet, &mut output_sink)
.unwrap();
// The broadcast is the stage that owns the graph inlet.
let broadcast_index = *executor
.stage_by_inlet
.get(&graph.shape.inlet().id())
.unwrap();
let StageState::Broadcast {
live_outlets,
cancelled_outlets,
..
} = &executor.stage_states[broadcast_index]
else {
panic!("expected broadcast state");
};
assert_eq!(*live_outlets, 0);
assert_eq!(cancelled_outlets, &vec![true, true]);
}
/// When the output sink cancels on the first emitted value of a cyclic graph,
/// delivery must return `Cancelled`, emit exactly once, and leave the
/// executor's event stack empty.
#[test]
fn cyclic_graph_clears_pending_events_when_output_cancels() {
// Sink that counts emissions and rejects every one with `Cancelled`.
struct CancelAfterFirst {
emitted: usize,
}
impl FusedOutputSink<i32> for CancelAfterFirst {
fn emit(&mut self, _value: i32) -> StreamResult<()> {
self.emitted += 1;
Err(StreamError::Cancelled)
}
}
// Loop: merge -> broadcast; broadcast outlet 1 -> buffer -> take-while(>0)
// -> decrement -> merge.preferred. Broadcast outlet 0 is the graph outlet.
let graph = GraphDsl::try_create(|builder| {
let merge = builder.add(MergePreferred::<i32>::new(1));
let broadcast = builder.add(Broadcast::<i32>::new(2));
let buffer = builder.add(Buffer::<i32>::new(8, OverflowStrategy::Backpressure));
let positive = builder.add(TakeWhile::<i32>::new(|item| *item > 0));
let decrement = builder.add(MapStage::new(|item: i32| item - 1));
builder.connect(merge.outlet(), broadcast.inlet())?;
builder.connect(broadcast.outlet(1)?, buffer.inlet())?;
builder.connect(buffer.outlet(), positive.inlet())?;
builder.connect(positive.outlet(), decrement.inlet())?;
builder.connect(decrement.outlet(), merge.preferred())?;
Ok(FlowShape::new(merge.secondary(0)?, broadcast.outlet(0)?))
})
.unwrap();
let graph_outlet = graph.shape.outlet().id();
let graph_inlet = graph.shape.inlet().id();
let mut executor = FusedExecutor::new(&graph, FusedExecutionConfig::default());
let mut output = CancelAfterFirst { emitted: 0 };
executor
.request(graph_outlet, graph_outlet, &mut output)
.unwrap();
let result = executor.deliver(graph_inlet, datum(1_i32), graph_outlet, &mut output);
assert_eq!(result, Err(StreamError::Cancelled));
assert_eq!(output.emitted, 1);
assert!(executor.event_stack.is_empty());
}
/// Execution config (5,000,000-event budget) passed to the cyclic-graph
/// equivalence tests in this file.
const CYCLE_LIMIT: FusedExecutionConfig = FusedExecutionConfig {
event_limit: 5_000_000,
};
/// Builds the feedback-loop blueprint shared by the cyclic equivalence tests.
///
/// Topology: `MergePreferred(1)` feeds a two-way `Broadcast`. Broadcast outlet 0
/// is the graph outlet; outlet 1 runs through a `Buffer` (capacity and overflow
/// strategy taken from the arguments), a `TakeWhile(> 0)` and a decrementing map
/// back into the merge's preferred inlet. The graph inlet is the merge's
/// secondary inlet 0.
///
/// Panics if graph construction fails.
fn cyclic_feedback_i32(
    buffer_cap: usize,
    strategy: OverflowStrategy,
) -> GraphBlueprint<FlowShape<i32, i32>> {
    let built = GraphDsl::try_create(|b| {
        let merge = b.add(MergePreferred::<i32>::new(1));
        let fan_out = b.add(Broadcast::<i32>::new(2));
        let queue = b.add(Buffer::<i32>::new(buffer_cap, strategy));
        let while_positive = b.add(TakeWhile::<i32>::new(|value| *value > 0));
        let minus_one = b.add(MapStage::new(|value: i32| value - 1));
        b.connect(merge.outlet(), fan_out.inlet())?;
        b.connect(fan_out.outlet(1)?, queue.inlet())?;
        b.connect(queue.outlet(), while_positive.inlet())?;
        b.connect(while_positive.outlet(), minus_one.inlet())?;
        b.connect(minus_one.outlet(), merge.preferred())?;
        Ok(FlowShape::new(merge.secondary(0)?, fan_out.outlet(0)?))
    });
    built.unwrap()
}
fn assert_cyclic_equiv_i32(graph: &GraphBlueprint<FlowShape<i32, i32>>, input: Vec<i32>) {
let erased = graph
.run_with_input_report_mode(input.clone(), CYCLE_LIMIT, ExecutorMode::ErasedOnly)
.map(|r| r.output);
let typed = graph
.run_with_input_report_mode(input.clone(), CYCLE_LIMIT, ExecutorMode::TypedOnly)
.map(|r| r.output);
let auto = graph
.run_with_input_report_mode(input.clone(), CYCLE_LIMIT, ExecutorMode::Auto)
.map(|r| r.output);
assert!(
typed.is_ok(),
"typed cyclic path was not selected for {input:?}: {typed:?}"
);
assert_eq!(erased, typed, "typed != erased for input {input:?}");
assert_eq!(erased, auto, "auto != erased for input {input:?}");
}
/// Checks typed/erased/auto agreement on the backpressure feedback graph for
/// empty, single-element and multi-element inputs, including zero and
/// negative values.
#[test]
fn cyclic_typed_matches_erased_single_and_multi_input() {
    let graph = cyclic_feedback_i32(16, OverflowStrategy::Backpressure);
    let cases: Vec<Vec<i32>> = vec![
        vec![],
        vec![0],
        vec![1],
        vec![5],
        vec![100],
        vec![2, 5],
        vec![5, 2],
        vec![0, 3],
        vec![3, 0, 2],
        vec![1, 1, 1],
        vec![-1],
        vec![4, -3, 7],
    ];
    for input in cases {
        assert_cyclic_equiv_i32(&graph, input);
    }
}
/// Sweeps buffer capacities and every listed overflow strategy, asserting
/// typed/erased/auto agreement on a few inputs for each combination.
#[test]
fn cyclic_typed_matches_erased_across_buffer_configs() {
    for cap in [1usize, 2, 8, 64] {
        let strategies = [
            OverflowStrategy::Backpressure,
            OverflowStrategy::DropHead,
            OverflowStrategy::DropTail,
            OverflowStrategy::DropBuffer,
            OverflowStrategy::DropNew,
            OverflowStrategy::Fail,
        ];
        for strategy in strategies {
            let graph = cyclic_feedback_i32(cap, strategy);
            let inputs = [vec![6], vec![3, 0, 4], vec![1, 1]];
            for input in inputs {
                assert_cyclic_equiv_i32(&graph, input);
            }
        }
    }
}
/// Same feedback loop as the other cyclic tests, but with the broadcast outlets
/// swapped: outlet 0 feeds the loop and outlet 1 is the graph outlet.
/// `TypedOnly` must reject it with `GraphValidation`, and `Auto` must produce
/// the same result as `ErasedOnly`.
#[test]
fn cyclic_typed_falls_back_for_feedback_first_broadcast_orientation() {
    let graph = GraphDsl::try_create(|b| {
        let merge = b.add(MergePreferred::<i32>::new(1));
        let fan_out = b.add(Broadcast::<i32>::new(2));
        let queue = b.add(Buffer::<i32>::new(4, OverflowStrategy::Backpressure));
        let while_positive = b.add(TakeWhile::<i32>::new(|value| *value > 0));
        let minus_one = b.add(MapStage::new(|value: i32| value - 1));
        b.connect(merge.outlet(), fan_out.inlet())?;
        b.connect(fan_out.outlet(0)?, queue.inlet())?;
        b.connect(queue.outlet(), while_positive.inlet())?;
        b.connect(while_positive.outlet(), minus_one.inlet())?;
        b.connect(minus_one.outlet(), merge.preferred())?;
        Ok(FlowShape::new(merge.secondary(0)?, fan_out.outlet(1)?))
    })
    .unwrap();

    for input in [vec![5], vec![2, 4], vec![0, 3]] {
        let run = |mode: ExecutorMode| {
            graph.run_with_input_report_mode(input.clone(), CYCLE_LIMIT, mode)
        };

        let typed = run(ExecutorMode::TypedOnly);
        assert!(
            matches!(typed, Err(StreamError::GraphValidation(_))),
            "feedback-first orientation should not be typed-supported: {typed:?}"
        );

        let erased = run(ExecutorMode::ErasedOnly).map(|report| report.output);
        let auto = run(ExecutorMode::Auto).map(|report| report.output);
        assert_eq!(
            erased, auto,
            "auto must match erased on fallback for {input:?}"
        );
    }
}
/// Feedback loop variant without a buffer: broadcast outlet 1 goes through a
/// decrementing map, a `TakeWhile(>= 0)` and an `Identity` before re-entering
/// the merge's preferred inlet. Asserts typed/erased/auto agreement.
#[test]
fn cyclic_typed_matches_erased_map_before_takewhile_and_identity() {
    let graph = GraphDsl::try_create(|b| {
        let merge = b.add(MergePreferred::<i32>::new(1));
        let fan_out = b.add(Broadcast::<i32>::new(2));
        let minus_one = b.add(MapStage::new(|value: i32| value - 1));
        let while_nonneg = b.add(TakeWhile::<i32>::new(|value| *value >= 0));
        let pass = b.add(Identity::<i32>::new());
        b.connect(merge.outlet(), fan_out.inlet())?;
        b.connect(fan_out.outlet(1)?, minus_one.inlet())?;
        b.connect(minus_one.outlet(), while_nonneg.inlet())?;
        b.connect(while_nonneg.outlet(), pass.inlet())?;
        b.connect(pass.outlet(), merge.preferred())?;
        Ok(FlowShape::new(merge.secondary(0)?, fan_out.outlet(0)?))
    })
    .unwrap();

    let cases = [vec![5], vec![0], vec![3, 1, 4], vec![10, 0]];
    for input in cases {
        assert_cyclic_equiv_i32(&graph, input);
    }
}
/// Rebuilds the buffered feedback loop with `u64` elements and asserts, for
/// each input, that the typed run succeeds and that typed and auto outputs
/// equal the erased output.
#[test]
fn cyclic_typed_matches_erased_u64_elements() {
    let graph = GraphDsl::try_create(|b| {
        let merge = b.add(MergePreferred::<u64>::new(1));
        let fan_out = b.add(Broadcast::<u64>::new(2));
        let queue = b.add(Buffer::<u64>::new(16, OverflowStrategy::Backpressure));
        let while_positive = b.add(TakeWhile::<u64>::new(|value| *value > 0));
        let minus_one = b.add(MapStage::new(|value: u64| value - 1));
        b.connect(merge.outlet(), fan_out.inlet())?;
        b.connect(fan_out.outlet(1)?, queue.inlet())?;
        b.connect(queue.outlet(), while_positive.inlet())?;
        b.connect(while_positive.outlet(), minus_one.inlet())?;
        b.connect(minus_one.outlet(), merge.preferred())?;
        Ok(FlowShape::new(merge.secondary(0)?, fan_out.outlet(0)?))
    })
    .unwrap();

    let cases = [vec![0u64], vec![7u64], vec![3u64, 5], vec![10_000u64]];
    for input in cases {
        let run = |mode: ExecutorMode| {
            graph
                .run_with_input_report_mode(input.clone(), CYCLE_LIMIT, mode)
                .map(|report| report.output)
        };
        let erased = run(ExecutorMode::ErasedOnly);
        let typed = run(ExecutorMode::TypedOnly);
        let auto = run(ExecutorMode::Auto);

        assert!(typed.is_ok(), "typed not selected for u64 input {input:?}");
        assert_eq!(erased, typed, "u64 typed != erased for {input:?}");
        assert_eq!(erased, auto, "u64 auto != erased for {input:?}");
    }
}
/// Feedback loop whose map increments instead of decrementing and that has no
/// `TakeWhile`. With an event limit of 512, every executor mode must report
/// `EventLimitExceeded { limit: 512 }`.
#[test]
fn cyclic_typed_unproductive_cycle_surfaces_event_limit_like_erased() {
    let graph = GraphDsl::try_create(|b| {
        let merge = b.add(MergePreferred::<i32>::new(1));
        let fan_out = b.add(Broadcast::<i32>::new(2));
        let queue = b.add(Buffer::<i32>::new(8, OverflowStrategy::Backpressure));
        let plus_one = b.add(MapStage::new(|value: i32| value + 1));
        b.connect(merge.outlet(), fan_out.inlet())?;
        b.connect(fan_out.outlet(1)?, queue.inlet())?;
        b.connect(queue.outlet(), plus_one.inlet())?;
        b.connect(plus_one.outlet(), merge.preferred())?;
        Ok(FlowShape::new(merge.secondary(0)?, fan_out.outlet(0)?))
    })
    .unwrap();

    let config = FusedExecutionConfig { event_limit: 512 };
    let run = |mode: ExecutorMode| {
        graph
            .run_with_input_report_mode(vec![1], config, mode)
            .map(|report| report.output)
    };
    // Run all three modes first (erased, typed, auto), then check each outcome.
    let outcomes = [
        run(ExecutorMode::ErasedOnly),
        run(ExecutorMode::TypedOnly),
        run(ExecutorMode::Auto),
    ];
    for outcome in outcomes {
        assert_eq!(
            outcome,
            Err(StreamError::EventLimitExceeded { limit: 512 })
        );
    }
}
/// Feedback loop closed through a plain `Merge(2)` instead of `MergePreferred`.
/// `TypedOnly` must reject it with `GraphValidation`, and `Auto` must produce
/// the same result as `ErasedOnly`.
#[test]
fn cyclic_typed_falls_back_for_plain_merge() {
    let graph = GraphDsl::try_create(|b| {
        let merge = b.add(Merge::<i32>::new(2));
        let fan_out = b.add(Broadcast::<i32>::new(2));
        let queue = b.add(Buffer::<i32>::new(8, OverflowStrategy::Backpressure));
        let while_positive = b.add(TakeWhile::<i32>::new(|value| *value > 0));
        let minus_one = b.add(MapStage::new(|value: i32| value - 1));
        b.connect(merge.outlet(), fan_out.inlet())?;
        b.connect(fan_out.outlet(1)?, queue.inlet())?;
        b.connect(queue.outlet(), while_positive.inlet())?;
        b.connect(while_positive.outlet(), minus_one.inlet())?;
        b.connect(minus_one.outlet(), merge.inlet(1)?)?;
        Ok(FlowShape::new(merge.inlet(0)?, fan_out.outlet(0)?))
    })
    .unwrap();

    let run = |mode: ExecutorMode| graph.run_with_input_report_mode(vec![3], CYCLE_LIMIT, mode);

    let typed = run(ExecutorMode::TypedOnly);
    assert!(
        matches!(typed, Err(StreamError::GraphValidation(_))),
        "plain Merge cycle should not be typed-supported: {typed:?}"
    );

    let erased = run(ExecutorMode::ErasedOnly).map(|report| report.output);
    let auto = run(ExecutorMode::Auto).map(|report| report.output);
    assert_eq!(erased, auto, "auto must match erased on fallback");
}
/// Feedback loop that contains a custom `BufferedFlowOnPull` stage.
/// `TypedOnly` must reject it with `GraphValidation`, and `Auto` must produce
/// the same result as `ErasedOnly`.
#[test]
fn cyclic_typed_falls_back_for_custom_opaque_in_feedback() {
    let graph = GraphDsl::try_create(|b| {
        let merge = b.add(MergePreferred::<i32>::new(1));
        let fan_out = b.add(Broadcast::<i32>::new(2));
        let while_positive = b.add(TakeWhile::<i32>::new(|value| *value > 0));
        let minus_one = b.add(MapStage::new(|value: i32| value - 1));
        let opaque = b.add(BufferedFlowOnPull {
            state: Arc::new(Mutex::new(BufferedFlowState::default())),
        });
        b.connect(merge.outlet(), fan_out.inlet())?;
        b.connect(fan_out.outlet(1)?, while_positive.inlet())?;
        b.connect(while_positive.outlet(), minus_one.inlet())?;
        b.connect(minus_one.outlet(), opaque.inlet())?;
        b.connect(opaque.outlet(), merge.preferred())?;
        Ok(FlowShape::new(merge.secondary(0)?, fan_out.outlet(0)?))
    })
    .unwrap();

    let run = |mode: ExecutorMode| graph.run_with_input_report_mode(vec![4], CYCLE_LIMIT, mode);

    // Run order (typed, erased, auto) is kept as-is: the custom stage holds
    // shared state behind an `Arc<Mutex<_>>`.
    let typed = run(ExecutorMode::TypedOnly);
    assert!(
        matches!(typed, Err(StreamError::GraphValidation(_))),
        "custom opaque feedback stage should fall back: {typed:?}"
    );

    let erased = run(ExecutorMode::ErasedOnly).map(|report| report.output);
    let auto = run(ExecutorMode::Auto).map(|report| report.output);
    assert_eq!(erased, auto, "auto must match erased on fallback");
}
/// Feeds 200 pseudo-random inputs (length 0..=5, values -5..=14) from a
/// fixed-seed xorshift generator through the backpressure feedback graph and
/// asserts typed/erased/auto agreement on each.
#[test]
fn cyclic_typed_matches_erased_randomized() {
    // One xorshift step (shifts 13, 7, 17); returns the updated state.
    fn xorshift(state: &mut u64) -> u64 {
        *state ^= *state << 13;
        *state ^= *state >> 7;
        *state ^= *state << 17;
        *state
    }

    let graph = cyclic_feedback_i32(16, OverflowStrategy::Backpressure);
    let mut seed: u64 = 0x9E37_79B9_7F4A_7C15;
    for _ in 0..200 {
        let len = (xorshift(&mut seed) % 6) as usize;
        let input: Vec<i32> = (0..len)
            .map(|_| (xorshift(&mut seed) % 20) as i32 - 5)
            .collect();
        assert_cyclic_equiv_i32(&graph, input);
    }
}
/// Pulls on outlet 0, then pushes `11` (which the partitioner routes to
/// outlet 1). The push must emit nothing; a subsequent pull on outlet 1 must
/// emit the held `11` on that outlet.
#[test]
fn partition_holds_routed_element_until_target_outlet_pulls() {
    let graph = GraphDsl::create(|b| {
        b.add(Partition::<i32>::new(2, |value| usize::from(*value >= 10)))
    })
    .unwrap();
    let stage = 0usize;
    let out0 = graph.shape.outlet(0).unwrap().id();
    let out1 = graph.shape.outlet(1).unwrap().id();
    let inlet = graph.shape.inlet().id();
    let mut executor = FusedExecutor::new(&graph, FusedExecutionConfig::default());

    executor.process_pull(stage, out0).unwrap();
    let pushed = executor
        .process_stage(stage, inlet, datum(11_i32))
        .unwrap();
    assert!(matches!(pushed.emissions, StageEmissions::None));

    let pulled = executor.process_pull(stage, out1).unwrap();
    let StageEmissions::One(port, value) = pulled.emissions else {
        panic!("expected one pending partition emission");
    };
    assert_eq!(port, out1);
    assert_eq!(
        downcast_datum::<i32, _>(value, "emit", || "Partition.out1").unwrap(),
        11
    );
}
/// Finishing outlet 0 alone must not cancel any inlet; finishing outlet 1 as
/// well must cancel the partition's inlet.
#[test]
fn partition_cancels_upstream_only_after_all_outlets_cancel_when_not_eager() {
    let graph = GraphDsl::create(|b| {
        b.add(Partition::<i32>::new(2, |value| usize::from(*value >= 10)))
    })
    .unwrap();
    let stage = 0usize;
    let out0 = graph.shape.outlet(0).unwrap().id();
    let out1 = graph.shape.outlet(1).unwrap().id();
    let mut executor = FusedExecutor::new(&graph, FusedExecutionConfig::default());

    let after_first = executor.process_downstream_finish(stage, out0).unwrap();
    assert!(after_first.cancelled_inlets.is_empty());

    let after_second = executor.process_downstream_finish(stage, out1).unwrap();
    assert_eq!(
        after_second.cancelled_inlets,
        vec![graph.shape.inlet().id()]
    );
}
/// Pulls both unzip outlets, cancels outlet 1, then pushes a pair. The cancel
/// must not cancel the inlet, and the push must emit only to outlet 0.
#[test]
fn unzip_continues_emitting_to_live_outlet_after_peer_cancels() {
    let graph =
        GraphDsl::create(|b| b.add(Unzip::<i32, &'static str>::new())).unwrap();
    let stage = 0usize;
    let out0 = graph.shape.out0().id();
    let out1 = graph.shape.out1().id();
    let inlet = graph.shape.inlet().id();
    let mut executor = FusedExecutor::new(&graph, FusedExecutionConfig::default());

    executor.process_pull(stage, out0).unwrap();
    executor.process_pull(stage, out1).unwrap();
    let cancelled = executor.process_downstream_finish(stage, out1).unwrap();
    assert!(cancelled.cancelled_inlets.is_empty());

    let pushed = executor
        .process_stage(stage, inlet, datum((7_i32, "seven")))
        .unwrap();
    match pushed.emissions {
        // Single emission: check both the port and the payload.
        StageEmissions::One(port, payload) => {
            assert_eq!(port, out0);
            assert_eq!(
                downcast_datum::<i32, _>(payload, "emit", || "Unzip.out0").unwrap(),
                7
            );
        }
        // Batched emission: exactly one entry, addressed to outlet 0.
        StageEmissions::Many(batch) => {
            assert_eq!(batch.len(), 1);
            assert_eq!(batch[0].0, out0);
        }
        _ => panic!("expected emission to the remaining live unzip outlet"),
    }
}
/// Finishing unzip outlet 0 alone must not cancel any inlet; finishing
/// outlet 1 as well must cancel the unzip's inlet.
#[test]
fn unzip_cancels_upstream_only_after_both_outlets_cancel() {
    let graph =
        GraphDsl::create(|b| b.add(Unzip::<i32, &'static str>::new())).unwrap();
    let stage = 0usize;
    let out0 = graph.shape.out0().id();
    let out1 = graph.shape.out1().id();
    let mut executor = FusedExecutor::new(&graph, FusedExecutionConfig::default());

    let after_first = executor.process_downstream_finish(stage, out0).unwrap();
    assert!(after_first.cancelled_inlets.is_empty());

    let after_second = executor.process_downstream_finish(stage, out1).unwrap();
    assert_eq!(
        after_second.cancelled_inlets,
        vec![graph.shape.inlet().id()]
    );
}
/// Connects an `EmitMultipleOnPush` stage to an `Identity` and expects the
/// single input `10` to produce `[1, 2]` at the graph outlet.
#[test]
fn opaque_internal_outlet_repulls_after_first_emission() {
    let graph = GraphDsl::try_create(|b| {
        let emitter = b.add(EmitMultipleOnPush);
        let pass = b.add(Identity::<i32>::new());
        b.connect(emitter.outlet(), pass.inlet())?;
        Ok(FlowShape::new(emitter.inlet(), pass.outlet()))
    })
    .unwrap();

    let produced = graph.run_with_input([10]).unwrap();
    assert_eq!(produced, vec![1, 2]);
}
/// On a `Broadcast(2) -> Merge(2)` graph, `Auto` and `ErasedOnly` must produce
/// the same output (twice as many elements as the input), while `TypedOnly`
/// must fail with the "unsupported graph shape" validation error.
#[test]
fn executor_mode_auto_erased_identical_typed_errors() {
    let graph = GraphDsl::try_create(|b| {
        let fan_out = b.add(Broadcast::<i32>::new(2));
        let fan_in = b.add(Merge::<i32>::new(2));
        b.connect(fan_out.outlet(0)?, fan_in.inlet(0)?)?;
        b.connect(fan_out.outlet(1)?, fan_in.inlet(1)?)?;
        Ok(FlowShape::new(fan_out.inlet(), fan_in.outlet()))
    })
    .unwrap();

    let input: Vec<i32> = vec![1, 2, 3];
    let run = |mode: ExecutorMode| graph.run_with_input_mode(input.clone(), mode);

    let auto_result = run(ExecutorMode::Auto).unwrap();
    let erased_result = run(ExecutorMode::ErasedOnly).unwrap();
    assert_eq!(auto_result, erased_result);
    assert_eq!(auto_result.len(), input.len() * 2);

    let typed_result = run(ExecutorMode::TypedOnly);
    assert!(
        matches!(
            typed_result,
            Err(StreamError::GraphValidation(ref msg))
                if msg.contains("typed executor does not support this graph shape")
        ),
        "expected TypedOnly to error for junction graph, got: {typed_result:?}"
    );
}
/// Builds a linear blueprint of `n` chained `Identity<i64>` stages.
///
/// Panics if `n` is zero or if graph construction fails.
fn identity_chain_bp(n: usize) -> GraphBlueprint<FlowShape<i64, i64>> {
    assert!(n >= 1);
    let chain = GraphDsl::try_create(|b| {
        let head = b.add(Identity::<i64>::new());
        let entry = head.inlet();
        // `tail` always holds the outlet of the most recently added stage.
        let mut tail = head.outlet();
        for _ in 1..n {
            let stage = b.add(Identity::<i64>::new());
            b.connect(tail, stage.inlet())?;
            tail = stage.outlet();
        }
        Ok(FlowShape::new(entry, tail))
    });
    chain.unwrap()
}
/// Builds a linear blueprint of `n` chained map stages, each doubling its
/// `i64` input with wrapping multiplication.
///
/// Panics if `n` is zero or if graph construction fails.
fn map_chain_bp(n: usize) -> GraphBlueprint<FlowShape<i64, i64>> {
    assert!(n >= 1);
    let chain = GraphDsl::try_create(|b| {
        let head = b.add(MapStage::new(|value: i64| value.wrapping_mul(2)));
        let entry = head.inlet();
        // `tail` always holds the outlet of the most recently added stage.
        let mut tail = head.outlet();
        for _ in 1..n {
            let stage = b.add(MapStage::new(|value: i64| value.wrapping_mul(2)));
            b.connect(tail, stage.inlet())?;
            tail = stage.outlet();
        }
        Ok(FlowShape::new(entry, tail))
    });
    chain.unwrap()
}
/// Collecting through a five-stage identity chain must give the same output
/// in `ErasedOnly` and `TypedOnly` mode.
#[test]
fn typed_erased_equivalence_identity_collect() {
    let graph = identity_chain_bp(5);
    let input: Vec<i64> = (0..20).collect();
    let collect_with =
        |mode: ExecutorMode| graph.run_with_input_mode(input.clone(), mode).unwrap();

    let erased = collect_with(ExecutorMode::ErasedOnly);
    let typed = collect_with(ExecutorMode::TypedOnly);
    assert_eq!(
        typed, erased,
        "typed and erased paths disagree on identity×5"
    );
}
/// Counting through a five-stage identity chain must give the same result in
/// `ErasedOnly` and `TypedOnly` mode.
#[test]
fn typed_erased_equivalence_identity_count() {
    let graph = identity_chain_bp(5);
    let input: Vec<i64> = (0..20).collect();
    let config = FusedExecutionConfig::default();
    let count_with = |mode: ExecutorMode| {
        graph
            .run_count_with_input_report_mode(input.clone(), config, mode)
            .unwrap()
            .result
    };

    let erased = count_with(ExecutorMode::ErasedOnly);
    let typed = count_with(ExecutorMode::TypedOnly);
    assert_eq!(typed, erased, "typed and erased count differ on identity×5");
}
/// Folding (wrapping sum from 0) through a five-stage doubling chain must give
/// the same result in `ErasedOnly` and `TypedOnly` mode.
#[test]
fn typed_erased_equivalence_map_fold() {
    let graph = map_chain_bp(5);
    let input: Vec<i64> = (1..=10).collect();
    let config = FusedExecutionConfig::default();
    let fold_with = |mode: ExecutorMode| {
        graph
            .run_fold_with_input_report_mode(
                input.clone(),
                0i64,
                |acc, x| acc.wrapping_add(x),
                config,
                mode,
            )
            .unwrap()
            .result
    };

    let erased = fold_with(ExecutorMode::ErasedOnly);
    let typed = fold_with(ExecutorMode::TypedOnly);
    assert_eq!(typed, erased, "typed and erased fold differ on map×5");
}
/// Collecting through a five-stage doubling chain must give the same output
/// in `ErasedOnly` and `TypedOnly` mode.
#[test]
fn typed_erased_equivalence_map_collect() {
    let graph = map_chain_bp(5);
    let input: Vec<i64> = (0..20).collect();
    let collect_with =
        |mode: ExecutorMode| graph.run_with_input_mode(input.clone(), mode).unwrap();

    let erased = collect_with(ExecutorMode::ErasedOnly);
    let typed = collect_with(ExecutorMode::TypedOnly);
    assert_eq!(typed, erased, "typed and erased paths disagree on map×5");
}
/// `TypedOnly` on a `Broadcast(2) -> Merge(2)` graph must fail with the
/// "unsupported graph shape" validation error.
#[test]
fn typed_only_errors_on_junction_graph() {
    let graph = GraphDsl::try_create(|b| {
        let fan_out = b.add(Broadcast::<i32>::new(2));
        let fan_in = b.add(Merge::<i32>::new(2));
        b.connect(fan_out.outlet(0)?, fan_in.inlet(0)?)?;
        b.connect(fan_out.outlet(1)?, fan_in.inlet(1)?)?;
        Ok(FlowShape::new(fan_out.inlet(), fan_in.outlet()))
    })
    .unwrap();

    let result = graph.run_with_input_mode(vec![1], ExecutorMode::TypedOnly);
    let rejected = matches!(
        result,
        Err(StreamError::GraphValidation(ref msg))
            if msg.contains("typed executor does not support this graph shape")
    );
    assert!(
        rejected,
        "expected TypedOnly to error on junction, got: {result:?}"
    );
}
/// `Auto` on a `Broadcast(2) -> Merge(2)` graph must succeed and yield six
/// elements for a three-element input.
#[test]
fn auto_falls_back_silently_for_junction_graph() {
    let graph = GraphDsl::try_create(|b| {
        let fan_out = b.add(Broadcast::<i32>::new(2));
        let fan_in = b.add(Merge::<i32>::new(2));
        b.connect(fan_out.outlet(0)?, fan_in.inlet(0)?)?;
        b.connect(fan_out.outlet(1)?, fan_in.inlet(1)?)?;
        Ok(FlowShape::new(fan_out.inlet(), fan_in.outlet()))
    })
    .unwrap();

    let result = graph.run_with_input_mode(vec![1, 2, 3], ExecutorMode::Auto);
    assert!(result.is_ok(), "Auto should succeed (fallback to erased)");
    let output = result.unwrap();
    assert_eq!(output.len(), 6);
}
/// Builds `Unzip<u64, u64>` feeding both inlets of a two-input
/// `MergeSequence<u64>` whose sequence key is the element itself.
///
/// Panics if graph construction fails.
fn merge_sequence_graph() -> GraphBlueprint<FlowShape<(u64, u64), u64>> {
    let built = GraphDsl::try_create(|b| {
        let splitter = b.add(Unzip::<u64, u64>::new());
        let sequencer = b.add(MergeSequence::<u64>::new(2, |seq| *seq));
        b.connect(splitter.out0(), sequencer.inlet(0)?)?;
        b.connect(splitter.out1(), sequencer.inlet(1)?)?;
        Ok(FlowShape::new(splitter.inlet(), sequencer.outlet()))
    });
    built.unwrap()
}
/// `TypedOnly` must run the Unzip→MergeSequence graph without error.
#[test]
fn typed_only_accepts_merge_sequence_topology() {
    let graph = merge_sequence_graph();
    let pairs = vec![(0u64, 1u64), (2u64, 3u64)];
    let result = graph.run_with_input_mode(pairs, ExecutorMode::TypedOnly);
    assert!(
        result.is_ok(),
        "TypedOnly should accept Unzip→MergeSequence topology, got: {result:?}"
    );
}
/// Pairs `(0,1), (2,3), …, (8,9)` must come out as `0..10` in both
/// `ErasedOnly` and `TypedOnly` mode.
#[test]
fn typed_erased_equivalence_merge_sequence_in_order() {
    let graph = merge_sequence_graph();
    let input: Vec<(u64, u64)> = (0..10).step_by(2).map(|even| (even, even + 1)).collect();
    let run = |mode: ExecutorMode| graph.run_with_input_mode(input.clone(), mode).unwrap();

    let erased = run(ExecutorMode::ErasedOnly);
    let typed = run(ExecutorMode::TypedOnly);
    assert_eq!(
        typed, erased,
        "typed and erased disagree on in-order merge_sequence"
    );

    let expected: Vec<u64> = (0..10).collect();
    assert_eq!(typed, expected);
}
/// Pairs whose halves arrive swapped (`(1,0), (3,2), (5,4)`) must still come
/// out as `0..=5` in both `ErasedOnly` and `TypedOnly` mode.
#[test]
fn typed_erased_equivalence_merge_sequence_out_of_order() {
    let graph = merge_sequence_graph();
    let input: Vec<(u64, u64)> = vec![(1, 0), (3, 2), (5, 4)];
    let run = |mode: ExecutorMode| graph.run_with_input_mode(input.clone(), mode).unwrap();

    let erased = run(ExecutorMode::ErasedOnly);
    let typed = run(ExecutorMode::TypedOnly);
    assert_eq!(
        typed, erased,
        "typed and erased disagree on out-of-order merge_sequence"
    );
    assert_eq!(typed, vec![0u64, 1, 2, 3, 4, 5]);
}
/// Input `(1, 2)` must make both `ErasedOnly` and `TypedOnly` fail with a
/// `Failed` error mentioning "expected sequence".
#[test]
fn typed_erased_equivalence_merge_sequence_gap_failure() {
    let graph = merge_sequence_graph();
    let input = vec![(1u64, 2u64)];
    let run = |mode: ExecutorMode| graph.run_with_input_mode(input.clone(), mode);

    let erased = run(ExecutorMode::ErasedOnly);
    let typed = run(ExecutorMode::TypedOnly);
    assert!(
        matches!(&erased, Err(StreamError::Failed(msg)) if msg.contains("expected sequence")),
        "ErasedOnly should fail on gap: {erased:?}"
    );
    assert!(
        matches!(&typed, Err(StreamError::Failed(msg)) if msg.contains("expected sequence")),
        "TypedOnly should fail on gap: {typed:?}"
    );
}
/// A single pair `(0, 1)` must produce `[0, 1]` in both `ErasedOnly` and
/// `TypedOnly` mode.
#[test]
fn typed_erased_equivalence_merge_sequence_completion() {
    let graph = merge_sequence_graph();
    let input = vec![(0u64, 1u64)];
    let run = |mode: ExecutorMode| graph.run_with_input_mode(input.clone(), mode).unwrap();

    let erased = run(ExecutorMode::ErasedOnly);
    let typed = run(ExecutorMode::TypedOnly);
    assert_eq!(typed, erased, "typed and erased disagree on completion");
    assert_eq!(typed, vec![0u64, 1u64]);
}
/// `Auto` and `ErasedOnly` must produce the same output on the
/// Unzip→MergeSequence graph.
#[test]
fn auto_selects_typed_for_merge_sequence_topology() {
    let graph = merge_sequence_graph();
    let input: Vec<(u64, u64)> = (0..20).step_by(2).map(|even| (even, even + 1)).collect();
    let run = |mode: ExecutorMode| graph.run_with_input_mode(input.clone(), mode).unwrap();

    let auto_result = run(ExecutorMode::Auto);
    let erased_result = run(ExecutorMode::ErasedOnly);
    assert_eq!(
        auto_result, erased_result,
        "Auto and ErasedOnly disagree on merge_sequence topology"
    );
}
/// Builds `Unzip<u64, u64>` feeding both inlets of
/// `MergeLatest::<u64>::new(2, false)`.
///
/// Panics if graph construction fails.
fn merge_latest_graph_exec() -> GraphBlueprint<FlowShape<(u64, u64), Vec<u64>>> {
    let built = GraphDsl::try_create(|b| {
        let splitter = b.add(Unzip::<u64, u64>::new());
        let latest = b.add(MergeLatest::<u64>::new(2, false));
        b.connect(splitter.out0(), latest.inlet(0)?)?;
        b.connect(splitter.out1(), latest.inlet(1)?)?;
        Ok(FlowShape::new(splitter.inlet(), latest.outlet()))
    });
    built.unwrap()
}
/// Builds `Unzip<i32, i32>` feeding both inlets of
/// `MergeLatest::<i32>::new(2, true)`.
///
/// Panics if graph construction fails.
fn merge_latest_eager_graph_exec() -> GraphBlueprint<FlowShape<(i32, i32), Vec<i32>>> {
    let built = GraphDsl::try_create(|b| {
        let splitter = b.add(Unzip::<i32, i32>::new());
        let latest = b.add(MergeLatest::<i32>::new(2, true));
        b.connect(splitter.out0(), latest.inlet(0)?)?;
        b.connect(splitter.out1(), latest.inlet(1)?)?;
        Ok(FlowShape::new(splitter.inlet(), latest.outlet()))
    });
    built.unwrap()
}
/// `TypedOnly` must run the Unzip→MergeLatest graph without error.
#[test]
fn typed_only_accepts_merge_latest_topology() {
    let graph = merge_latest_graph_exec();
    let pairs = vec![(0u64, 1u64), (2u64, 3u64)];
    let result = graph.run_with_input_mode(pairs, ExecutorMode::TypedOnly);
    assert!(
        result.is_ok(),
        "TypedOnly should accept Unzip→MergeLatest topology, got: {result:?}"
    );
}
/// `ErasedOnly` and `TypedOnly` must emit the same snapshots, and every typed
/// snapshot must have length 2.
#[test]
fn typed_erased_equivalence_merge_latest_snapshot_ordering() {
    let graph = merge_latest_graph_exec();
    let input: Vec<(u64, u64)> = (0..10).map(|left| (left, left + 100)).collect();
    let run = |mode: ExecutorMode| graph.run_with_input_mode(input.clone(), mode).unwrap();

    let erased = run(ExecutorMode::ErasedOnly);
    let typed = run(ExecutorMode::TypedOnly);
    assert_eq!(
        typed, erased,
        "typed and erased disagree on snapshot ordering"
    );
    assert!(
        typed.iter().all(|snapshot| snapshot.len() == 2),
        "snapshots must have len 2"
    );
}
/// A single pair must yield exactly one snapshot, identical in `ErasedOnly`
/// and `TypedOnly` mode.
#[test]
fn typed_erased_equivalence_merge_latest_partial_fill() {
    let graph = merge_latest_graph_exec();
    let input = vec![(5u64, 42u64)];
    let run = |mode: ExecutorMode| graph.run_with_input_mode(input.clone(), mode).unwrap();

    let erased = run(ExecutorMode::ErasedOnly);
    let typed = run(ExecutorMode::TypedOnly);
    assert_eq!(typed, erased, "typed and erased disagree on partial-fill");
    assert_eq!(typed.len(), 1, "expected exactly one snapshot");
}
/// On the `MergeLatest::new(2, true)` graph, `ErasedOnly` and `TypedOnly`
/// must agree and must emit at least one snapshot for a single input pair.
#[test]
fn typed_erased_equivalence_merge_latest_eager_complete() {
    let graph_eager = merge_latest_eager_graph_exec();
    let input = vec![(1i32, 10i32)];
    let run =
        |mode: ExecutorMode| graph_eager.run_with_input_mode(input.clone(), mode).unwrap();

    let erased = run(ExecutorMode::ErasedOnly);
    let typed = run(ExecutorMode::TypedOnly);
    assert_eq!(
        typed, erased,
        "typed and erased disagree on eager-complete behavior"
    );
    assert!(
        !typed.is_empty(),
        "eager-complete graph should produce at least one snapshot"
    );
}
/// A single pair `(0, 1)` must give the same output in `ErasedOnly` and
/// `TypedOnly` mode on the Unzip→MergeLatest graph.
#[test]
fn typed_erased_equivalence_merge_latest_completion() {
    let graph = merge_latest_graph_exec();
    let input = vec![(0u64, 1u64)];
    let run = |mode: ExecutorMode| graph.run_with_input_mode(input.clone(), mode).unwrap();

    let erased = run(ExecutorMode::ErasedOnly);
    let typed = run(ExecutorMode::TypedOnly);
    assert_eq!(typed, erased, "typed and erased disagree on completion");
}
/// `Auto` and `ErasedOnly` must produce the same output on the
/// Unzip→MergeLatest graph.
#[test]
fn auto_selects_typed_for_merge_latest_topology() {
    let graph = merge_latest_graph_exec();
    let input: Vec<(u64, u64)> = (0..20).map(|left| (left, left + 1_000)).collect();
    let run = |mode: ExecutorMode| graph.run_with_input_mode(input.clone(), mode).unwrap();

    let auto_result = run(ExecutorMode::Auto);
    let erased_result = run(ExecutorMode::ErasedOnly);
    assert_eq!(
        auto_result, erased_result,
        "Auto and ErasedOnly disagree on merge_latest topology"
    );
}
/// Builds `Broadcast<i64>(2)` with both outlets feeding a `Zip<i64, i64>`.
///
/// Panics if graph construction fails.
fn broadcast_zip_graph_exec() -> GraphBlueprint<FlowShape<i64, (i64, i64)>> {
    let built = GraphDsl::try_create(|b| {
        let fan_out = b.add(Broadcast::<i64>::new(2));
        let pairer = b.add(Zip::<i64, i64>::new());
        b.connect(fan_out.outlet(0)?, pairer.in0())?;
        b.connect(fan_out.outlet(1)?, pairer.in1())?;
        Ok(FlowShape::new(fan_out.inlet(), pairer.outlet()))
    });
    built.unwrap()
}
/// Builds `Balance<i64>(2)` with both outlets feeding a `Merge<i64>(2)`.
///
/// Panics if graph construction fails.
fn balance_merge_graph_exec() -> GraphBlueprint<FlowShape<i64, i64>> {
    let built = GraphDsl::try_create(|b| {
        let balancer = b.add(Balance::<i64>::new(2));
        let fan_in = b.add(Merge::<i64>::new(2));
        b.connect(balancer.outlet(0)?, fan_in.inlet(0)?)?;
        b.connect(balancer.outlet(1)?, fan_in.inlet(1)?)?;
        Ok(FlowShape::new(balancer.inlet(), fan_in.outlet()))
    });
    built.unwrap()
}
/// Builds a two-way `Partition<i64>` (routing by absolute value modulo 2)
/// with both outlets feeding a `Merge<i64>(2)`.
///
/// Panics if graph construction fails.
fn partition_merge_graph_exec() -> GraphBlueprint<FlowShape<i64, i64>> {
    let built = GraphDsl::try_create(|b| {
        let router = b.add(Partition::<i64>::new(2, |value| {
            (value.unsigned_abs() as usize) % 2
        }));
        let fan_in = b.add(Merge::<i64>::new(2));
        b.connect(router.outlet(0)?, fan_in.inlet(0)?)?;
        b.connect(router.outlet(1)?, fan_in.inlet(1)?)?;
        Ok(FlowShape::new(router.inlet(), fan_in.outlet()))
    });
    built.unwrap()
}
/// Builds an `UnzipWith` that maps `x` to `(x, x + 10)` and feeds both halves
/// into a `Zip<i64, i64>`.
///
/// Panics if graph construction fails.
fn unzip_zip_graph_exec() -> GraphBlueprint<FlowShape<i64, (i64, i64)>> {
    let built = GraphDsl::try_create(|b| {
        let splitter = b.add(UnzipWith::<i64, i64, i64>::new(|value| (value, value + 10)));
        let pairer = b.add(Zip::<i64, i64>::new());
        b.connect(splitter.out0(), pairer.in0())?;
        b.connect(splitter.out1(), pairer.in1())?;
        Ok(FlowShape::new(splitter.inlet(), pairer.outlet()))
    });
    built.unwrap()
}
/// Builds `Unzip<u64, u64>` feeding inlets 0 and 1 of a `MergeSorted<u64>`.
///
/// Panics if graph construction fails.
fn merge_sorted_graph_exec() -> GraphBlueprint<FlowShape<(u64, u64), u64>> {
    let built = GraphDsl::try_create(|b| {
        let splitter = b.add(Unzip::<u64, u64>::new());
        let sorter = b.add(MergeSorted::<u64>::new());
        b.connect(splitter.out0(), sorter.inlet(0)?)?;
        b.connect(splitter.out1(), sorter.inlet(1)?)?;
        Ok(FlowShape::new(splitter.inlet(), sorter.outlet()))
    });
    built.unwrap()
}
/// Typed and auto outputs must equal the erased output on Broadcast->Zip, and
/// the first typed element must be `(-3, -3)`.
#[test]
fn typed_erased_equivalence_broadcast_zip() {
    let graph = broadcast_zip_graph_exec();
    let input: Vec<i64> = (-3..=3).collect();
    let run = |mode: ExecutorMode| graph.run_with_input_mode(input.clone(), mode).unwrap();

    let erased = run(ExecutorMode::ErasedOnly);
    let typed = run(ExecutorMode::TypedOnly);
    let auto = run(ExecutorMode::Auto);
    assert_eq!(typed, erased, "typed and erased disagree on Broadcast->Zip");
    assert_eq!(
        auto, erased,
        "Auto and ErasedOnly disagree on Broadcast->Zip"
    );
    assert_eq!(typed.first().copied(), Some((-3, -3)));
}
/// Typed and auto outputs must equal the erased output on Balance->Merge, and
/// all 32 input elements must come out.
#[test]
fn typed_erased_equivalence_balance_merge() {
    let graph = balance_merge_graph_exec();
    let input: Vec<i64> = (0..32).collect();
    let run = |mode: ExecutorMode| graph.run_with_input_mode(input.clone(), mode).unwrap();

    let erased = run(ExecutorMode::ErasedOnly);
    let typed = run(ExecutorMode::TypedOnly);
    let auto = run(ExecutorMode::Auto);
    assert_eq!(typed, erased, "typed and erased disagree on Balance->Merge");
    assert_eq!(
        auto, erased,
        "Auto and ErasedOnly disagree on Balance->Merge"
    );
    assert_eq!(typed.len(), 32);
}
/// Typed and auto outputs must equal the erased output on Partition->Merge
/// for inputs `-12..12`.
#[test]
fn typed_erased_equivalence_partition_merge() {
    let graph = partition_merge_graph_exec();
    let input: Vec<i64> = (-12..12).collect();
    let run = |mode: ExecutorMode| graph.run_with_input_mode(input.clone(), mode).unwrap();

    let erased = run(ExecutorMode::ErasedOnly);
    let typed = run(ExecutorMode::TypedOnly);
    let auto = run(ExecutorMode::Auto);
    assert_eq!(
        typed, erased,
        "typed and erased disagree on Partition->Merge"
    );
    assert_eq!(
        auto, erased,
        "Auto and ErasedOnly disagree on Partition->Merge"
    );
}
/// A partitioner that always returns index 2 on a two-outlet `Partition` must
/// make both `ErasedOnly` and `TypedOnly` fail with a `Failed` error
/// mentioning "out-of-bounds".
#[test]
fn typed_erased_equivalence_partition_merge_error() {
    let graph = GraphDsl::try_create(|b| {
        let router = b.add(Partition::<i64>::new(2, |_| 2));
        let fan_in = b.add(Merge::<i64>::new(2));
        b.connect(router.outlet(0)?, fan_in.inlet(0)?)?;
        b.connect(router.outlet(1)?, fan_in.inlet(1)?)?;
        Ok(FlowShape::new(router.inlet(), fan_in.outlet()))
    })
    .unwrap();

    let run = |mode: ExecutorMode| graph.run_with_input_mode(vec![7], mode);
    let erased = run(ExecutorMode::ErasedOnly);
    let typed = run(ExecutorMode::TypedOnly);
    assert!(
        matches!(&erased, Err(StreamError::Failed(msg)) if msg.contains("out-of-bounds")),
        "ErasedOnly should fail on bad partitioner: {erased:?}"
    );
    assert!(
        matches!(&typed, Err(StreamError::Failed(msg)) if msg.contains("out-of-bounds")),
        "TypedOnly should fail on bad partitioner: {typed:?}"
    );
}
/// Typed and auto outputs must equal the erased output on UnzipWith->Zip, and
/// the first typed element must be `(0, 10)`.
#[test]
fn typed_erased_equivalence_unzip_zip() {
    let graph = unzip_zip_graph_exec();
    let input: Vec<i64> = (0..16).collect();
    let run = |mode: ExecutorMode| graph.run_with_input_mode(input.clone(), mode).unwrap();

    let erased = run(ExecutorMode::ErasedOnly);
    let typed = run(ExecutorMode::TypedOnly);
    let auto = run(ExecutorMode::Auto);
    assert_eq!(typed, erased, "typed and erased disagree on UnzipWith->Zip");
    assert_eq!(
        auto, erased,
        "Auto and ErasedOnly disagree on UnzipWith->Zip"
    );
    assert_eq!(typed[0], (0, 10));
}
/// Typed and auto outputs must equal the erased output on Unzip->MergeSorted,
/// and pairs `(0,1), (2,3), …, (18,19)` must come out as `0..20`.
#[test]
fn typed_erased_equivalence_merge_sorted() {
    let graph = merge_sorted_graph_exec();
    let input: Vec<(u64, u64)> = (0..20).step_by(2).map(|even| (even, even + 1)).collect();
    let run = |mode: ExecutorMode| graph.run_with_input_mode(input.clone(), mode).unwrap();

    let erased = run(ExecutorMode::ErasedOnly);
    let typed = run(ExecutorMode::TypedOnly);
    let auto = run(ExecutorMode::Auto);
    assert_eq!(
        typed, erased,
        "typed and erased disagree on Unzip->MergeSorted"
    );
    assert_eq!(
        auto, erased,
        "Auto and ErasedOnly disagree on Unzip->MergeSorted"
    );
    assert_eq!(typed, (0..20).collect::<Vec<_>>());
}
/// The fan-in helper on `MergePrioritized` with priorities `[2, 1]` must return
/// equal reports in all three modes, with typed output
/// `[1, 2, 100, 3, 4, 101]`.
#[test]
fn typed_erased_equivalence_prioritized_merge_helper() {
    let graph =
        GraphDsl::create(|b| b.add(MergePrioritized::<i64>::new(vec![2, 1]))).unwrap();
    let inputs = vec![vec![1, 2, 3, 4], vec![100, 101]];
    let run = |mode: ExecutorMode| {
        graph
            .run_fan_in_report_mode(inputs.clone(), FusedExecutionConfig::default(), mode)
            .unwrap()
    };

    let erased = run(ExecutorMode::ErasedOnly);
    let typed = run(ExecutorMode::TypedOnly);
    let auto = run(ExecutorMode::Auto);
    assert_eq!(
        typed, erased,
        "typed and erased disagree on MergePrioritized"
    );
    assert_eq!(
        auto, erased,
        "Auto and ErasedOnly disagree on MergePrioritized"
    );
    assert_eq!(typed.output, vec![1, 2, 100, 3, 4, 101]);
}
/// A lone `MergePreferred` stage must report identically under every executor mode.
#[test]
fn typed_erased_equivalence_merge_preferred_helper() {
    let graph = GraphDsl::create(|builder| builder.add(MergePreferred::<i64>::new(2))).unwrap();
    let preferred_items = vec![1, 2, 3];
    let secondary_items = vec![vec![100, 101], vec![200, 201]];
    let run = |mode: ExecutorMode| {
        graph
            .run_merge_preferred_report_mode(
                preferred_items.clone(),
                secondary_items.clone(),
                FusedExecutionConfig::default(),
                mode,
            )
            .unwrap()
    };

    let erased = run(ExecutorMode::ErasedOnly);
    let typed = run(ExecutorMode::TypedOnly);
    let auto = run(ExecutorMode::Auto);

    assert_eq!(typed, erased, "typed and erased disagree on MergePreferred");
    assert_eq!(
        auto, erased,
        "Auto and ErasedOnly disagree on MergePreferred"
    );
    assert_eq!(typed.output, vec![1, 2, 3, 100, 200, 101, 201]);
}
/// A lone `Concat` stage (including an empty middle input) must report identically in every mode.
#[test]
fn typed_erased_equivalence_concat_helper() {
    let graph = GraphDsl::create(|builder| builder.add(Concat::<i64>::new(3))).unwrap();
    let streams = vec![vec![1, 2], vec![], vec![3, 4]];
    let run = |mode: ExecutorMode| {
        graph
            .run_concat_report_mode(streams.clone(), FusedExecutionConfig::default(), mode)
            .unwrap()
    };

    let erased = run(ExecutorMode::ErasedOnly);
    let typed = run(ExecutorMode::TypedOnly);
    let auto = run(ExecutorMode::Auto);

    assert_eq!(typed, erased, "typed and erased disagree on Concat");
    assert_eq!(auto, erased, "Auto and ErasedOnly disagree on Concat");
    assert_eq!(typed.output, vec![1, 2, 3, 4]);
}
/// A lone `Interleave` stage (segment size 2, no eager close) must report identically in every mode.
#[test]
fn typed_erased_equivalence_interleave_helper() {
    let graph = GraphDsl::create(|builder| builder.add(Interleave::<i64>::new(3, 2))).unwrap();
    let streams = vec![vec![1, 2, 3], vec![10, 11, 12], vec![20]];
    let run = |mode: ExecutorMode| {
        graph
            .run_interleave_report_mode(
                streams.clone(),
                2,
                false,
                FusedExecutionConfig::default(),
                mode,
            )
            .unwrap()
    };

    let erased = run(ExecutorMode::ErasedOnly);
    let typed = run(ExecutorMode::TypedOnly);
    let auto = run(ExecutorMode::Auto);

    assert_eq!(typed, erased, "typed and erased disagree on Interleave");
    assert_eq!(auto, erased, "Auto and ErasedOnly disagree on Interleave");
    assert_eq!(typed.output, vec![1, 2, 10, 11, 20, 3, 12]);
}
/// An eager-close `Interleave` stage must stop at the first exhausted input in every mode.
#[test]
fn typed_erased_equivalence_interleave_eager_close_helper() {
    let graph = GraphDsl::create(|builder| {
        builder.add(Interleave::<i64>::new_with_eager_close(2, 1, true))
    })
    .unwrap();
    let streams = vec![vec![1], vec![10, 11]];
    let run = |mode: ExecutorMode| {
        graph
            .run_interleave_report_mode(
                streams.clone(),
                1,
                true,
                FusedExecutionConfig::default(),
                mode,
            )
            .unwrap()
    };

    let erased = run(ExecutorMode::ErasedOnly);
    let typed = run(ExecutorMode::TypedOnly);
    let auto = run(ExecutorMode::Auto);

    assert_eq!(
        typed, erased,
        "typed and erased disagree on Interleave eager close"
    );
    assert_eq!(
        auto, erased,
        "Auto and ErasedOnly disagree on Interleave eager close"
    );
    assert_eq!(typed.output, vec![1, 10]);
}
/// With an event limit of one, each fan-in helper must return the same result in every mode.
#[test]
fn typed_erased_equivalence_helper_event_limit_failures() {
    let config = FusedExecutionConfig { event_limit: 1 };

    // MergePrioritized
    {
        let graph =
            GraphDsl::create(|builder| builder.add(MergePrioritized::<i64>::new(vec![2, 1])))
                .unwrap();
        let run = |mode: ExecutorMode| {
            graph.run_fan_in_report_mode(vec![vec![1], vec![10]], config, mode)
        };
        let erased = run(ExecutorMode::ErasedOnly);
        let typed = run(ExecutorMode::TypedOnly);
        let auto = run(ExecutorMode::Auto);
        assert_eq!(typed, erased);
        assert_eq!(auto, erased);
    }

    // MergePreferred
    {
        let graph =
            GraphDsl::create(|builder| builder.add(MergePreferred::<i64>::new(1))).unwrap();
        let run = |mode: ExecutorMode| {
            graph.run_merge_preferred_report_mode(vec![1], vec![vec![10]], config, mode)
        };
        let erased = run(ExecutorMode::ErasedOnly);
        let typed = run(ExecutorMode::TypedOnly);
        let auto = run(ExecutorMode::Auto);
        assert_eq!(typed, erased);
        assert_eq!(auto, erased);
    }

    // Concat
    {
        let graph = GraphDsl::create(|builder| builder.add(Concat::<i64>::new(2))).unwrap();
        let run = |mode: ExecutorMode| {
            graph.run_concat_report_mode(vec![vec![1], vec![10]], config, mode)
        };
        let erased = run(ExecutorMode::ErasedOnly);
        let typed = run(ExecutorMode::TypedOnly);
        let auto = run(ExecutorMode::Auto);
        assert_eq!(typed, erased);
        assert_eq!(auto, erased);
    }

    // Interleave
    {
        let graph =
            GraphDsl::create(|builder| builder.add(Interleave::<i64>::new(2, 1))).unwrap();
        let run = |mode: ExecutorMode| {
            graph.run_interleave_report_mode(vec![vec![1], vec![10]], 1, false, config, mode)
        };
        let erased = run(ExecutorMode::ErasedOnly);
        let typed = run(ExecutorMode::TypedOnly);
        let auto = run(ExecutorMode::Auto);
        assert_eq!(typed, erased);
        assert_eq!(auto, erased);
    }
}
/// Running one merge-latest blueprint twice in a row must not leak state between runs.
#[test]
fn merge_latest_blueprint_sequential_reuse_is_independent() {
    let graph = merge_latest_graph_exec();
    let first: Vec<(u64, u64)> = (0..5).map(|n| (n, n + 100)).collect();
    let second: Vec<(u64, u64)> = (10..15).map(|n| (n, n + 200)).collect();
    let run = |items: &[(u64, u64)], mode: ExecutorMode| {
        graph.run_with_input_mode(items.to_vec(), mode).unwrap()
    };

    // Typed runs back to back first, then the erased reference runs.
    let first_typed = run(&first, ExecutorMode::TypedOnly);
    let second_typed = run(&second, ExecutorMode::TypedOnly);
    let first_erased = run(&first, ExecutorMode::ErasedOnly);
    let second_erased = run(&second, ExecutorMode::ErasedOnly);

    assert_eq!(
        first_typed, first_erased,
        "sequential run A: typed and erased disagree"
    );
    assert_eq!(
        second_typed, second_erased,
        "sequential run B: typed and erased disagree"
    );
    assert_ne!(
        first_typed, second_typed,
        "runs A and B should differ (different inputs)"
    );
}
/// Two threads sharing one merge-latest blueprint must each get the result of their own input.
#[test]
fn merge_latest_blueprint_concurrent_reuse_is_independent() {
    use std::sync::Arc as StdArc;

    let shared = StdArc::new(merge_latest_graph_exec());
    let first: Vec<(u64, u64)> = (0..50).map(|n| (n, n + 1_000)).collect();
    let second: Vec<(u64, u64)> = (100..150).map(|n| (n, n + 2_000)).collect();

    let worker_a = {
        let graph = StdArc::clone(&shared);
        let items = first.clone();
        std::thread::spawn(move || graph.run_with_input_mode(items, ExecutorMode::TypedOnly))
    };
    let worker_b = {
        let graph = StdArc::clone(&shared);
        let items = second.clone();
        std::thread::spawn(move || graph.run_with_input_mode(items, ExecutorMode::TypedOnly))
    };
    let typed_a = worker_a.join().expect("thread A panicked").unwrap();
    let typed_b = worker_b.join().expect("thread B panicked").unwrap();

    // Single-threaded erased runs serve as the reference results.
    let erased_a = shared
        .run_with_input_mode(first, ExecutorMode::ErasedOnly)
        .unwrap();
    let erased_b = shared
        .run_with_input_mode(second, ExecutorMode::ErasedOnly)
        .unwrap();

    assert_eq!(
        typed_a, erased_a,
        "concurrent run A: typed and erased disagree"
    );
    assert_eq!(
        typed_b, erased_b,
        "concurrent run B: typed and erased disagree"
    );
    assert_ne!(typed_a, typed_b, "concurrent runs must be independent");
}
}
/// Message exchanged between neighbouring stages of an async-boundary linear pipeline.
enum AsyncLinearMessage<T> {
/// One stream element travelling downstream.
Item(T),
/// End-of-stream marker; segments forward it and then finish successfully.
Done,
/// A failure being forwarded downstream; segments forward it and then return the error.
Failed(StreamError),
}
/// Command understood by the ractor boundary actors in this file.
enum RactorBoundaryCommand {
/// Tells the receiving actor to perform its work (feed the input or run its segment).
Run,
}
// Only needed when the `cluster` feature is enabled.
// NOTE(review): presumably ractor's blanket `Message` impl is unavailable under that feature — confirm.
#[cfg(feature = "cluster")]
impl ractor::Message for RactorBoundaryCommand {}
/// Test-only threaded executor: pushes `input` through the pre-split linear `segments`
/// and counts the items that reach the end of the pipeline.
///
/// One scoped thread feeds the input and one scoped thread runs each segment; neighbouring
/// threads are linked by `mpsc::sync_channel`s bounded by `config.buffer_size`.
///
/// Returns the item count together with the shared event and boundary-crossing counters.
///
/// # Errors
/// A terminal error other than `StreamError::AbruptTermination` is returned first; otherwise
/// the first worker error; otherwise the terminal `AbruptTermination`. A panicked worker is
/// reported as `StreamError::Failed` by `join_threaded_async_linear_worker`.
#[cfg(test)]
fn run_threaded_async_linear_count<I, T>(
input: I,
segments: TypedLinearSegments<T>,
config: AsyncBoundaryExecutionConfig,
) -> StreamResult<FusedTerminalReport<usize>>
where
I: IntoIterator<Item = T> + Send,
I::IntoIter: Send,
T: Send + 'static,
{
// One channel in front of every segment, plus a final one that this function drains.
let channels = segments.segments.len() + 1;
let mut senders = Vec::with_capacity(channels);
let mut receivers = Vec::with_capacity(channels);
for _ in 0..channels {
let (sender, receiver) = mpsc::sync_channel(config.buffer_size);
senders.push(sender);
receivers.push(Some(receiver));
}
let first_sender = senders
.first()
.expect("at least one async segment channel")
.clone();
let mut final_receiver = Some(
receivers
.last_mut()
.expect("at least one async segment channel")
.take()
.expect("final receiver is present"),
);
let events = AtomicUsize::new(0);
let async_boundary_crossings = AtomicUsize::new(0);
let result = thread::scope(|scope| {
let source = scope.spawn(move || feed_threaded_async_linear_input(input, first_sender));
let mut workers = Vec::with_capacity(segments.segments.len());
for (index, steps) in segments.segments.iter().enumerate() {
// Segment `index` reads channel `index` and writes channel `index + 1`.
let input = receivers[index].take().expect("worker receiver is present");
let output = senders[index + 1].clone();
let has_boundary_after = index + 1 < segments.segments.len();
let events = &events;
let async_boundary_crossings = &async_boundary_crossings;
workers.push(scope.spawn(move || {
run_threaded_async_linear_segment(
steps,
input,
output,
has_boundary_after,
events,
async_boundary_crossings,
config,
)
}));
}
// Drop the original senders: from here on only the clones held by the source and
// the workers keep each channel open.
drop(senders);
let final_rx = final_receiver.take().expect("final receiver present");
let mut count = 0;
let mut terminal_error = None;
loop {
match final_rx.recv() {
Ok(AsyncLinearMessage::Item(_)) => count += 1,
Ok(AsyncLinearMessage::Done) => break,
Ok(AsyncLinearMessage::Failed(error)) => {
terminal_error = Some(error);
break;
}
// Every sender disappeared without a `Done` or `Failed` message.
Err(_) => {
terminal_error = Some(StreamError::AbruptTermination);
break;
}
}
}
// NOTE(review): presumably dropped before joining so a worker blocked on a full final
// channel sees a send error instead of waiting forever — confirm.
drop(final_rx);
let mut worker_error = join_threaded_async_linear_worker(source)?;
for worker in workers {
// Keep only the first worker error, but still join every remaining worker.
if worker_error.is_none() {
worker_error = join_threaded_async_linear_worker(worker)?;
} else {
let _ = join_threaded_async_linear_worker(worker);
}
}
match (terminal_error, worker_error) {
(Some(error), _) if error != StreamError::AbruptTermination => return Err(error),
(_, Some(error)) => return Err(error),
(Some(error), None) => return Err(error),
(None, None) => {}
}
Ok(count)
});
Ok(FusedTerminalReport {
result: result?,
events: events.load(Ordering::Relaxed),
async_boundary_crossings: async_boundary_crossings.load(Ordering::Relaxed),
})
}
/// Synchronous entry point for the ractor-backed async-boundary executor.
///
/// Rejects a zero `buffer_size`, then drives `run_ractor_async_linear_count_on_runtime`
/// to completion on the shared boundary runtime. When the caller is already inside a
/// tokio runtime the `block_on` call is made from a freshly spawned scoped thread
/// instead of the calling thread.
///
/// # Errors
/// `StreamError::GraphValidation` for a zero buffer size, `StreamError::Failed` when the
/// runtime cannot be built or the helper thread panics, plus any error from the pipeline.
fn run_ractor_async_linear_count<I, T>(
    input: I,
    segments: TypedLinearSegments<T>,
    config: AsyncBoundaryExecutionConfig,
) -> StreamResult<FusedTerminalReport<usize>>
where
    I: IntoIterator<Item = T> + Send,
    I::IntoIter: Send + 'static,
    T: Send + 'static,
{
    if config.buffer_size == 0 {
        return Err(StreamError::GraphValidation(
            "ractor async boundary execution requires buffer_size greater than zero".into(),
        ));
    }
    let items = input.into_iter();
    let runtime = ractor_boundary_runtime()?;
    // The future is created inside the closure so it is built on whichever thread runs it.
    let drive = move || {
        runtime.block_on(run_ractor_async_linear_count_on_runtime(
            items, segments, config,
        ))
    };

    if tokio::runtime::Handle::try_current().is_err() {
        // Not inside a runtime: blocking the current thread is fine.
        return drive();
    }
    thread::scope(|scope| match scope.spawn(drive).join() {
        Ok(report) => report,
        Err(_) => Err(StreamError::Failed(
            "ractor async boundary runtime thread panicked".into(),
        )),
    })
}
/// Returns the process-wide multi-thread tokio runtime used for ractor boundary execution.
///
/// The runtime is built on first use and cached; a build failure is cached as well and
/// reported as `StreamError::Failed` on every call.
fn ractor_boundary_runtime() -> StreamResult<&'static tokio::runtime::Runtime> {
    static RUNTIME: OnceLock<Result<tokio::runtime::Runtime, String>> = OnceLock::new();
    let build = || {
        tokio::runtime::Builder::new_multi_thread()
            .build()
            .map_err(|error| format!("ractor async boundary runtime failed to start: {error}"))
    };
    RUNTIME
        .get_or_init(build)
        .as_ref()
        .map_err(|message| StreamError::Failed(message.clone()))
}
/// Actor-based executor: pushes `input` through the pre-split linear `segments` and counts
/// the items that reach the end of the pipeline.
///
/// Spawns one source actor and one actor per segment, links neighbours with bounded ractor
/// mpsc channels of `config.buffer_size`, sends every actor `RactorBoundaryCommand::Run`,
/// and then drains the final channel.
///
/// # Errors
/// Spawn failures are reported through `ractor_spawn_error`; a failed `Run` send yields
/// `StreamError::AbruptTermination`. After draining, a terminal error other than
/// `AbruptTermination` is returned first, then any actor join error, then the terminal
/// `AbruptTermination`.
async fn run_ractor_async_linear_count_on_runtime<I, T>(
input: I,
segments: TypedLinearSegments<T>,
config: AsyncBoundaryExecutionConfig,
) -> StreamResult<FusedTerminalReport<usize>>
where
I: Iterator<Item = T> + Send + 'static,
T: Send + 'static,
{
// One channel in front of every segment, plus a final one that this function drains.
let channels = segments.segments.len() + 1;
let mut senders = Vec::with_capacity(channels);
let mut receivers = Vec::with_capacity(channels);
for _ in 0..channels {
let (sender, receiver) = ractor::concurrency::mpsc_bounded(config.buffer_size);
senders.push(sender);
receivers.push(Some(receiver));
}
let first_sender = senders
.first()
.expect("at least one async segment channel")
.clone();
let mut final_receiver = receivers
.last_mut()
.expect("at least one async segment channel")
.take()
.expect("final receiver is present");
let events = Arc::new(AtomicUsize::new(0));
let async_boundary_crossings = Arc::new(AtomicUsize::new(0));
let (source_ref, source_handle) = Actor::spawn(
None,
RactorLinearSourceActor::<I, T>::new(),
RactorLinearSourceState {
input: Some(input),
output: first_sender,
},
)
.await
.map_err(ractor_spawn_error)?;
let mut actors = Vec::with_capacity(segments.segments.len() + 1);
actors.push((source_ref, source_handle));
for (index, steps) in segments.segments.into_iter().enumerate() {
// Segment `index` reads channel `index` and writes channel `index + 1`.
let input = receivers[index].take().expect("worker receiver is present");
let output = senders[index + 1].clone();
let has_boundary_after = index + 1 < channels - 1;
let (worker_ref, worker_handle) = match Actor::spawn(
None,
RactorLinearSegmentActor::<T>::new(),
RactorLinearSegmentState {
steps,
input,
output,
has_boundary_after,
events: Arc::clone(&events),
async_boundary_crossings: Arc::clone(&async_boundary_crossings),
config,
},
)
.await
{
Ok(actor) => actor,
Err(error) => {
// Tear down the actors spawned so far before reporting the spawn failure.
let error = ractor_spawn_error(error);
stop_ractor_async_linear_actors(&actors);
let _ = join_ractor_async_linear_actors(actors).await;
return Err(error);
}
};
actors.push((worker_ref, worker_handle));
}
// Drop the original senders: from here on only the clones held by the actors keep
// each channel open.
drop(senders);
let mut start_error = None;
for (actor, _) in &actors {
if actor.send_message(RactorBoundaryCommand::Run).is_err() {
start_error = Some(StreamError::AbruptTermination);
break;
}
}
if let Some(error) = start_error {
stop_ractor_async_linear_actors(&actors);
let _ = join_ractor_async_linear_actors(actors).await;
return Err(error);
}
let mut count = 0;
let mut terminal_error = None;
loop {
match final_receiver.recv().await {
Some(AsyncLinearMessage::Item(_)) => count += 1,
Some(AsyncLinearMessage::Done) => break,
Some(AsyncLinearMessage::Failed(error)) => {
terminal_error = Some(error);
break;
}
// Every sender disappeared without a `Done` or `Failed` message.
None => {
terminal_error = Some(StreamError::AbruptTermination);
break;
}
}
}
// NOTE(review): presumably dropped before stopping/joining so an actor waiting on a full
// final channel sees a send error — confirm.
drop(final_receiver);
stop_ractor_async_linear_actors(&actors);
let actor_error = join_ractor_async_linear_actors(actors).await;
match (terminal_error, actor_error) {
(Some(error), _) if error != StreamError::AbruptTermination => return Err(error),
(_, Some(error)) => return Err(error),
(Some(error), None) => return Err(error),
(None, None) => {}
}
Ok(FusedTerminalReport {
result: count,
events: events.load(Ordering::Relaxed),
async_boundary_crossings: async_boundary_crossings.load(Ordering::Relaxed),
})
}
/// Actor that feeds the pipeline input into the first channel.
///
/// The actor value stores no `I` or `T`; all runtime data lives in
/// `RactorLinearSourceState`.
struct RactorLinearSourceActor<I, T> {
// Zero-sized marker tying the actor to its iterator and item types.
_marker: PhantomData<fn() -> (I, T)>,
}
impl<I, T> RactorLinearSourceActor<I, T> {
/// Creates the (stateless) source actor value.
fn new() -> Self {
Self {
_marker: PhantomData,
}
}
}
/// State (and spawn arguments) of the source actor.
struct RactorLinearSourceState<I, T> {
/// Input iterator; taken on the first `Run`, so a second `Run` finds `None`.
input: Option<I>,
/// Sending side of the first pipeline channel.
output: ractor::concurrency::MpscSender<AsyncLinearMessage<T>>,
}
impl<I, T> Actor for RactorLinearSourceActor<I, T>
where
I: Iterator<Item = T> + Send + 'static,
T: Send + 'static,
{
type Msg = RactorBoundaryCommand;
type State = RactorLinearSourceState<I, T>;
type Arguments = RactorLinearSourceState<I, T>;
async fn pre_start(
&self,
_myself: ActorRef<Self::Msg>,
args: Self::Arguments,
) -> Result<Self::State, ActorProcessingErr> {
Ok(args)
}
async fn handle(
&self,
myself: ActorRef<Self::Msg>,
message: Self::Msg,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
match message {
RactorBoundaryCommand::Run => {
let input = state.input.take().ok_or_else(|| {
actor_processing_error(StreamError::GraphValidation(
"ractor async boundary source actor was run more than once".into(),
))
})?;
feed_ractor_async_linear_input(input, &state.output)
.await
.map_err(actor_processing_error)?;
myself.stop(None);
}
}
Ok(())
}
}
/// Actor that runs one linear segment of the pipeline.
///
/// The actor value stores no `T`; all runtime data lives in `RactorLinearSegmentState`.
struct RactorLinearSegmentActor<T> {
// Zero-sized marker tying the actor to its item type.
_marker: PhantomData<fn() -> T>,
}
impl<T> RactorLinearSegmentActor<T> {
/// Creates the (stateless) segment actor value.
fn new() -> Self {
Self {
_marker: PhantomData,
}
}
}
/// State (and spawn arguments) of a segment actor.
struct RactorLinearSegmentState<T> {
/// Steps applied to every item by this segment.
steps: Vec<TypedLinearStep<T>>,
/// Channel this segment reads from.
input: ractor::concurrency::MpscReceiver<AsyncLinearMessage<T>>,
/// Channel this segment writes to.
output: ractor::concurrency::MpscSender<AsyncLinearMessage<T>>,
/// Whether another segment follows; if so each item also counts a boundary crossing.
has_boundary_after: bool,
/// Event counter shared by all actors of one run.
events: Arc<AtomicUsize>,
/// Boundary-crossing counter shared by all actors of one run.
async_boundary_crossings: Arc<AtomicUsize>,
/// Execution settings; `config.fused` supplies the event limit checks.
config: AsyncBoundaryExecutionConfig,
}
impl<T> Actor for RactorLinearSegmentActor<T>
where
T: Send + 'static,
{
type Msg = RactorBoundaryCommand;
type State = RactorLinearSegmentState<T>;
type Arguments = RactorLinearSegmentState<T>;
async fn pre_start(
&self,
_myself: ActorRef<Self::Msg>,
args: Self::Arguments,
) -> Result<Self::State, ActorProcessingErr> {
Ok(args)
}
async fn handle(
&self,
myself: ActorRef<Self::Msg>,
message: Self::Msg,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
match message {
RactorBoundaryCommand::Run => {
run_ractor_async_linear_segment(state)
.await
.map_err(actor_processing_error)?;
myself.stop(None);
}
}
Ok(())
}
}
/// Sends every element of `input` as an `Item` message, followed by a final `Done`.
///
/// # Errors
/// Returns `StreamError::AbruptTermination` as soon as a send fails.
async fn feed_ractor_async_linear_input<I, T>(
    input: I,
    output: &ractor::concurrency::MpscSender<AsyncLinearMessage<T>>,
) -> StreamResult<()>
where
    I: Iterator<Item = T>,
{
    for item in input {
        if output.send(AsyncLinearMessage::Item(item)).await.is_err() {
            return Err(StreamError::AbruptTermination);
        }
    }
    match output.send(AsyncLinearMessage::Done).await {
        Ok(_) => Ok(()),
        Err(_) => Err(StreamError::AbruptTermination),
    }
}
async fn run_ractor_async_linear_segment<T>(
state: &mut RactorLinearSegmentState<T>,
) -> StreamResult<()>
where
T: Send + 'static,
{
loop {
match state.input.recv().await {
Some(AsyncLinearMessage::Item(item)) => {
let result =
run_async_linear_item(item, &state.steps, &state.events, state.config.fused)
.and_then(|item| {
if state.has_boundary_after {
bump_fused_event_atomic(&state.events, state.config.fused)?;
state
.async_boundary_crossings
.fetch_add(1, Ordering::Relaxed);
bump_fused_event_atomic(&state.events, state.config.fused)?;
}
Ok(item)
});
match result {
Ok(item) => state
.output
.send(AsyncLinearMessage::Item(item))
.await
.map_err(|_| StreamError::AbruptTermination)?,
Err(error) => {
let _ = state
.output
.send(AsyncLinearMessage::Failed(error.clone()))
.await;
return Err(error);
}
}
}
Some(AsyncLinearMessage::Done) => {
state
.output
.send(AsyncLinearMessage::Done)
.await
.map_err(|_| StreamError::AbruptTermination)?;
return Ok(());
}
Some(AsyncLinearMessage::Failed(error)) => {
let _ = state
.output
.send(AsyncLinearMessage::Failed(error.clone()))
.await;
return Err(error);
}
None => return Err(StreamError::AbruptTermination),
}
}
}
async fn join_ractor_async_linear_actor(
handle: ractor::concurrency::JoinHandle<()>,
) -> StreamResult<()> {
handle.await.map_err(|error| {
StreamError::Failed(format!("ractor async boundary actor task failed: {error}"))
})
}
/// Requests a stop (with no reason) on every actor in `actors`; the join handles are untouched.
fn stop_ractor_async_linear_actors(
    actors: &[(
        ActorRef<RactorBoundaryCommand>,
        ractor::concurrency::JoinHandle<()>,
    )],
) {
    actors.iter().for_each(|(actor_ref, _)| actor_ref.stop(None));
}
/// Awaits every actor task in order and returns the first join error, if any.
///
/// All handles are awaited even after an error has been recorded.
#[cfg_attr(not(test), allow(dead_code))]
async fn join_ractor_async_linear_actors(
    actors: Vec<(
        ActorRef<RactorBoundaryCommand>,
        ractor::concurrency::JoinHandle<()>,
    )>,
) -> Option<StreamError> {
    let mut first_failure = None;
    for (_, handle) in actors {
        let outcome = join_ractor_async_linear_actor(handle).await;
        // Later errors are discarded once one has been captured.
        if first_failure.is_none() {
            first_failure = outcome.err();
        }
    }
    first_failure
}
/// Converts a ractor spawn failure into `StreamError::Failed` with a descriptive message.
fn ractor_spawn_error(error: ractor::SpawnErr) -> StreamError {
    let message = format!("ractor async boundary actor failed to spawn: {error}");
    StreamError::Failed(message)
}
/// Boxes a `StreamError` so it can be returned from an actor handler as `ActorProcessingErr`.
fn actor_processing_error(error: StreamError) -> ActorProcessingErr {
Box::new(error)
}
/// Test-only source for the threaded executor: sends every element of `input` as an
/// `Item` message, followed by a final `Done`.
///
/// # Errors
/// Returns `StreamError::AbruptTermination` as soon as a send fails.
#[cfg(test)]
fn feed_threaded_async_linear_input<I, T>(
    input: I,
    output: mpsc::SyncSender<AsyncLinearMessage<T>>,
) -> StreamResult<()>
where
    I: IntoIterator<Item = T>,
{
    for item in input {
        if output.send(AsyncLinearMessage::Item(item)).is_err() {
            return Err(StreamError::AbruptTermination);
        }
    }
    match output.send(AsyncLinearMessage::Done) {
        Ok(()) => Ok(()),
        Err(_) => Err(StreamError::AbruptTermination),
    }
}
/// Test-only message loop of one threaded pipeline segment.
///
/// Each `Item` is run through `steps` (plus the boundary-crossing accounting when another
/// segment follows) and forwarded; `Done` is forwarded and ends the loop successfully;
/// `Failed` is forwarded and returned as the error.
///
/// # Errors
/// Any processing or forwarding error (after a best-effort `Failed` notification
/// downstream), forwarded upstream failures, and `StreamError::AbruptTermination` when
/// the input channel closes early or `Done` cannot be forwarded.
#[cfg(test)]
fn run_threaded_async_linear_segment<T>(
    steps: &[TypedLinearStep<T>],
    input: mpsc::Receiver<AsyncLinearMessage<T>>,
    output: mpsc::SyncSender<AsyncLinearMessage<T>>,
    has_boundary_after: bool,
    events: &AtomicUsize,
    async_boundary_crossings: &AtomicUsize,
    config: AsyncBoundaryExecutionConfig,
) -> StreamResult<()>
where
    T: Send + 'static,
{
    // Processes one item and forwards it downstream.
    let process = |item: T| -> StreamResult<()> {
        let item = run_async_linear_item(item, steps, events, config.fused)?;
        if has_boundary_after {
            // One event before and one after recording the crossing.
            bump_fused_event_atomic(events, config.fused)?;
            async_boundary_crossings.fetch_add(1, Ordering::Relaxed);
            bump_fused_event_atomic(events, config.fused)?;
        }
        output
            .send(AsyncLinearMessage::Item(item))
            .map_err(|_| StreamError::AbruptTermination)
    };

    loop {
        let message = input.recv().map_err(|_| StreamError::AbruptTermination)?;
        let failure = match message {
            AsyncLinearMessage::Item(item) => match process(item) {
                Ok(()) => continue,
                Err(error) => error,
            },
            AsyncLinearMessage::Done => {
                return output
                    .send(AsyncLinearMessage::Done)
                    .map_err(|_| StreamError::AbruptTermination);
            }
            AsyncLinearMessage::Failed(error) => error,
        };
        // Best effort: downstream may already be gone.
        let _ = output.send(AsyncLinearMessage::Failed(failure.clone()));
        return Err(failure);
    }
}
/// Runs one item through the steps of a single pre-split segment.
///
/// Every step counts one event before and one after it is applied. `Pass` leaves the item
/// untouched and `Map` replaces it with the mapper's result.
///
/// # Errors
/// `StreamError::GraphValidation` if an `AsyncBoundary` step is encountered, or the
/// event-limit error from `bump_fused_event_atomic`.
fn run_async_linear_item<T>(
    mut item: T,
    steps: &[TypedLinearStep<T>],
    events: &AtomicUsize,
    config: FusedExecutionConfig,
) -> StreamResult<T>
where
    T: Send + 'static,
{
    for step in steps {
        bump_fused_event_atomic(events, config)?;
        item = match step {
            TypedLinearStep::Pass => item,
            TypedLinearStep::Map(mapper) => mapper(item),
            TypedLinearStep::AsyncBoundary => {
                return Err(StreamError::GraphValidation(
                    "async boundary execution expects pre-split linear segments".into(),
                ));
            }
        };
        bump_fused_event_atomic(events, config)?;
    }
    Ok(item)
}
/// Test-only helper that joins one scoped worker thread.
///
/// Returns `Ok(None)` when the worker succeeded, `Ok(Some(error))` when it returned an
/// error, and `Err(StreamError::Failed)` when it panicked.
#[cfg(test)]
fn join_threaded_async_linear_worker(
    handle: thread::ScopedJoinHandle<'_, StreamResult<()>>,
) -> StreamResult<Option<StreamError>> {
    let Ok(outcome) = handle.join() else {
        return Err(StreamError::Failed("async boundary worker panicked".into()));
    };
    Ok(outcome.err())
}
/// Increments the shared event counter and checks it against `config.event_limit`.
///
/// The counter is incremented even when the limit is exceeded.
///
/// # Errors
/// `StreamError::EventLimitExceeded` when the new count is greater than the limit.
fn bump_fused_event_atomic(events: &AtomicUsize, config: FusedExecutionConfig) -> StreamResult<()> {
    let limit = config.event_limit;
    // `fetch_add` yields the previous value; add one to get the count after this event.
    let observed = events.fetch_add(1, Ordering::Relaxed) + 1;
    if observed <= limit {
        Ok(())
    } else {
        Err(StreamError::EventLimitExceeded { limit })
    }
}
/// Adds `count` to `*events` if the sum neither overflows nor exceeds `config.event_limit`.
///
/// On error `*events` is left unchanged.
///
/// # Errors
/// `StreamError::Failed` on arithmetic overflow, `StreamError::EventLimitExceeded` when the
/// sum is greater than the limit.
fn add_typed_helper_events(
    events: &mut usize,
    config: FusedExecutionConfig,
    count: usize,
) -> StreamResult<()> {
    let Some(total) = events.checked_add(count) else {
        return Err(StreamError::Failed(
            "typed helper event count overflow".into(),
        ));
    };
    if total > config.event_limit {
        return Err(StreamError::EventLimitExceeded {
            limit: config.event_limit,
        });
    }
    *events = total;
    Ok(())
}
/// Computes `output_len * 3 + input_count + 1` with overflow checking.
///
/// # Errors
/// `StreamError::Failed` if any intermediate result overflows `usize`.
fn typed_fan_in_success_events(output_len: usize, input_count: usize) -> StreamResult<usize> {
    let overflow = || StreamError::Failed("typed helper event count overflow".into());
    let tripled = output_len.checked_mul(3).ok_or_else(overflow)?;
    let with_inputs = tripled.checked_add(input_count).ok_or_else(overflow)?;
    with_inputs.checked_add(1).ok_or_else(overflow)
}
/// Returns the only stage of the graph when that stage *is* the whole fan-in shape.
///
/// The graph must contain exactly one stage and no edges; the stage must have a single
/// outlet whose id matches `shape_outlet`, inlets whose ids match `shape_inlets` in order,
/// and every port (stage and shape) must carry element type `T`. Otherwise `None`.
fn direct_single_fan_in_stage<'a, T>(
    stages: &'a [super::builder::StageRecord],
    edges: &[super::builder::Edge],
    shape_inlets: &[AnyInlet],
    shape_outlet: &AnyOutlet,
) -> Option<&'a super::builder::StageRecord>
where
    T: 'static,
{
    let [stage] = stages else {
        return None;
    };
    if !edges.is_empty() {
        return None;
    }
    let element_type = TypeId::of::<T>();

    // Port counts first, so the outlet index below is known to be valid.
    if stage.spec.inlets.len() != shape_inlets.len() || stage.spec.outlets.len() != 1 {
        return None;
    }
    // The single outlet must be the shape's outlet and carry `T`.
    if stage.spec.outlets[0].id() != shape_outlet.id()
        || stage.spec.outlets[0].type_id() != element_type
        || shape_outlet.type_id() != element_type
    {
        return None;
    }
    // Inlets must line up with the shape's inlets, in order.
    let same_inlet_ids = stage
        .spec
        .inlets
        .iter()
        .map(AnyInlet::id)
        .eq(shape_inlets.iter().map(AnyInlet::id));
    if !same_inlet_ids {
        return None;
    }
    // Every inlet, on the stage and on the shape, must carry `T`.
    let stage_inlets_typed = stage
        .spec
        .inlets
        .iter()
        .all(|inlet| inlet.type_id() == element_type);
    if !stage_inlets_typed {
        return None;
    }
    let shape_inlets_typed = shape_inlets
        .iter()
        .all(|inlet| inlet.type_id() == element_type);
    if !shape_inlets_typed {
        return None;
    }
    Some(stage)
}
/// Typed fan-in that merges `inputs` by repeatedly asking `next_scheduled_input` which
/// input to pull from next.
///
/// The event total is computed from the input sizes and checked against the limit before
/// any item is moved.
///
/// # Errors
/// Overflow or event-limit errors from the event helpers, or
/// `StreamError::GraphValidation` when no input can be scheduled while items remain.
fn run_typed_scheduled_fan_in<T>(
    inputs: Vec<Vec<T>>,
    schedule: &[usize],
    config: FusedExecutionConfig,
) -> StreamResult<FusedExecutionReport<T>>
where
    T: Send + 'static,
{
    let total_items: usize = inputs.iter().map(Vec::len).sum();
    let events = typed_fan_in_success_events(total_items, inputs.len())?;
    // Fail before touching any input if the run would exceed the event limit.
    let mut budget_check = 0;
    add_typed_helper_events(&mut budget_check, config, events)?;

    let mut queues: Vec<_> = inputs.into_iter().map(Vec::into_iter).collect();
    let mut cursor = 0;
    let mut output = Vec::with_capacity(total_items);
    // Exactly one item is emitted per iteration.
    for _ in 0..total_items {
        let Some(source) = next_scheduled_input(&queues, schedule, &mut cursor) else {
            return Err(StreamError::GraphValidation(
                "no runnable fan-in input".into(),
            ));
        };
        let item = queues[source]
            .next()
            .expect("scheduled input had an item");
        output.push(item);
    }
    Ok(FusedExecutionReport {
        output,
        events,
        async_boundary_crossings: 0,
    })
}
/// Typed concat: emits all items of each input, one input after another, in input order.
///
/// The event total is computed from the input sizes and checked against the limit before
/// any item is moved.
///
/// # Errors
/// Overflow or event-limit errors from the event helpers.
fn run_typed_concat<T>(
    inputs: Vec<Vec<T>>,
    config: FusedExecutionConfig,
) -> StreamResult<FusedExecutionReport<T>>
where
    T: Send + 'static,
{
    let total_items: usize = inputs.iter().map(Vec::len).sum();
    let events = typed_fan_in_success_events(total_items, inputs.len())?;
    // Fail before touching any input if the run would exceed the event limit.
    let mut budget_check = 0;
    add_typed_helper_events(&mut budget_check, config, events)?;

    let mut output = Vec::with_capacity(total_items);
    output.extend(inputs.into_iter().flatten());
    Ok(FusedExecutionReport {
        output,
        events,
        async_boundary_crossings: 0,
    })
}
/// Typed interleave: takes up to `segment_size` items from each open input in turn.
///
/// Event accounting depends on `eager_close`:
/// - `false`: the total is computed up front with `typed_fan_in_success_events` and checked
///   against the limit before any item is moved; exhausted inputs are skipped until all
///   inputs are done.
/// - `true`: 3 events are added per emitted item and 2 when the first exhausted input is
///   found, at which point the function returns immediately with the output so far.
///
/// # Errors
/// Overflow or event-limit errors from the event helpers, or
/// `StreamError::GraphValidation` when `next_open_index` finds no open input.
///
/// NOTE(review): with `segment_size == 0` and a non-empty input the inner loop never pulls
/// an item, so the outer loop appears unable to make progress — confirm callers reject it.
fn run_typed_interleave<T>(
inputs: Vec<Vec<T>>,
segment_size: usize,
eager_close: bool,
config: FusedExecutionConfig,
) -> StreamResult<FusedExecutionReport<T>>
where
T: Send + 'static,
{
let output_capacity = inputs.iter().map(Vec::len).sum();
let mut events = 0;
if !eager_close {
let total_events = typed_fan_in_success_events(output_capacity, inputs.len())?;
add_typed_helper_events(&mut events, config, total_events)?;
}
let mut queues: Vec<_> = inputs.into_iter().map(Vec::into_iter).collect();
let mut completed = vec![false; queues.len()];
let mut output = Vec::with_capacity(output_capacity);
// Inputs that start out empty are complete immediately.
for (index, queue) in queues.iter().enumerate() {
if queue.len() == 0 {
completed[index] = true;
if eager_close {
add_typed_helper_events(&mut events, config, 2)?;
return Ok(FusedExecutionReport {
output,
events,
async_boundary_crossings: 0,
});
}
}
}
let mut current = 0usize;
while completed.iter().any(|done| !done) {
if completed[current] {
current = next_open_index(&completed, current)
.ok_or_else(|| StreamError::GraphValidation("no open interleave input".into()))?;
continue;
}
// Emit one segment from the current input.
let mut emitted = 0usize;
while emitted < segment_size {
match queues[current].next() {
Some(item) => {
if eager_close {
add_typed_helper_events(&mut events, config, 3)?;
}
output.push(item);
emitted += 1;
}
None => {
completed[current] = true;
if eager_close {
add_typed_helper_events(&mut events, config, 2)?;
return Ok(FusedExecutionReport {
output,
events,
async_boundary_crossings: 0,
});
}
break;
}
}
}
if completed.iter().all(|done| *done) {
break;
}
current = next_open_index(&completed, current)
.ok_or_else(|| StreamError::GraphValidation("no open interleave input".into()))?;
}
Ok(FusedExecutionReport {
output,
events,
async_boundary_crossings: 0,
})
}
impl<T> GraphBlueprint<FanInShape<T, T>>
where
T: Clone + Send + 'static,
{
/// Runs the fan-in graph with the default execution config and returns only the output items.
pub fn run_fan_in(&self, inputs: Vec<Vec<T>>) -> StreamResult<Vec<T>> {
    let report = self.run_fan_in_report(inputs, FusedExecutionConfig::default())?;
    Ok(report.output)
}
/// Runs the fan-in graph with `config` in `ExecutorMode::Auto` and returns the full report.
pub fn run_fan_in_report(
    &self,
    inputs: Vec<Vec<T>>,
    config: FusedExecutionConfig,
) -> StreamResult<FusedExecutionReport<T>> {
    let mode = ExecutorMode::Auto;
    self.run_fan_in_report_mode(inputs, config, mode)
}
/// Runs the fan-in graph with an explicit executor mode.
///
/// `ErasedOnly` always uses the erased executor. `Auto` and `TypedOnly` use the typed
/// scheduled fan-in when the graph qualifies; otherwise `Auto` falls back to the erased
/// executor and `TypedOnly` fails.
///
/// # Errors
/// `StreamError::GraphValidation` when the number of input streams does not match the
/// shape's inlet count or when `TypedOnly` is requested for an unsupported graph, plus any
/// error from the selected executor.
pub(crate) fn run_fan_in_report_mode(
    &self,
    inputs: Vec<Vec<T>>,
    config: FusedExecutionConfig,
    mode: ExecutorMode,
) -> StreamResult<FusedExecutionReport<T>> {
    let expected = self.shape.inlet_count();
    if inputs.len() != expected {
        return Err(StreamError::GraphValidation(format!(
            "expected {} input streams, got {}",
            expected,
            inputs.len()
        )));
    }
    match mode {
        ExecutorMode::ErasedOnly => self.run_fan_in_report_erased(inputs, config),
        ExecutorMode::Auto | ExecutorMode::TypedOnly => {
            match self.typed_prioritized_fan_in_schedule() {
                Some(schedule) => run_typed_scheduled_fan_in(inputs, &schedule, config),
                None if mode == ExecutorMode::TypedOnly => Err(StreamError::GraphValidation(
                    "typed executor does not support this graph shape".into(),
                )),
                None => self.run_fan_in_report_erased(inputs, config),
            }
        }
    }
}
/// Builds the typed pull schedule when the graph is a lone `MergePrioritized` stage.
///
/// Returns `None` unless the graph is a direct single fan-in stage of kind
/// `MergePrioritized` with one non-zero weight per inlet. In the schedule each input
/// index is repeated `weight` times, in inlet order.
fn typed_prioritized_fan_in_schedule(&self) -> Option<Vec<usize>> {
    let inlets = self.shape.inlets();
    let outlet = self.shape.outlet().erase();
    let stage = direct_single_fan_in_stage::<T>(&self.stages, &self.edges, &inlets, &outlet)?;
    match &stage.spec.kind {
        StageKind::MergePrioritized { weights }
            if weights.len() == self.shape.inlet_count() && !weights.contains(&0) =>
        {
            let mut schedule = Vec::new();
            for (index, weight) in weights.iter().enumerate() {
                schedule.extend(std::iter::repeat_n(index, *weight));
            }
            Some(schedule)
        }
        _ => None,
    }
}
/// Runs the fan-in through the erased `FusedExecutor`.
///
/// Inputs that start out empty are completed first. Remaining items are then delivered one
/// at a time in the order chosen by `next_scheduled_input` over `fan_in_schedule()`, and
/// each inlet is completed as soon as its queue is drained.
///
/// # Errors
/// `StreamError::GraphValidation` when no input can be scheduled while items remain, plus
/// any error from inlet lookup or from the executor's `deliver`/`complete` calls.
fn run_fan_in_report_erased(
&self,
inputs: Vec<Vec<T>>,
config: FusedExecutionConfig,
) -> StreamResult<FusedExecutionReport<T>> {
let output_capacity = inputs.iter().map(Vec::len).sum();
let mut queues: Vec<_> = inputs.into_iter().map(Vec::into_iter).collect();
let schedule = self.fan_in_schedule();
let mut schedule_index = 0;
let outlet = self.shape.outlet().id();
let mut executor = FusedExecutor::new(self, config);
let mut output = Vec::with_capacity(output_capacity);
// Tracks which inlets have already been completed so none is completed twice.
let mut completed = vec![false; queues.len()];
{
// Scoped so the mutable borrow of `output` ends before the report is built.
let mut output_sink = VecOutputSink {
output: &mut output,
};
for (index, queue) in queues.iter().enumerate() {
if queue.len() == 0 {
executor.complete(self.shape.inlet(index)?.id(), outlet, &mut output_sink)?;
completed[index] = true;
}
}
while queues.iter().any(|queue| queue.len() > 0) {
let input_index = next_scheduled_input(&queues, &schedule, &mut schedule_index)
.ok_or_else(|| {
StreamError::GraphValidation("no runnable fan-in input".into())
})?;
let item = queues[input_index]
.next()
.expect("scheduled input had an item");
executor.deliver(
self.shape.inlet(input_index)?.id(),
datum(item),
outlet,
&mut output_sink,
)?;
// Complete the inlet right after its last item has been delivered.
if queues[input_index].len() == 0 && !completed[input_index] {
executor.complete(
self.shape.inlet(input_index)?.id(),
outlet,
&mut output_sink,
)?;
completed[input_index] = true;
}
}
}
Ok(FusedExecutionReport {
output,
events: executor.events,
async_boundary_crossings: executor.async_boundary_crossings,
})
}
/// Returns the pull schedule used by the erased fan-in executor.
///
/// The first `MergePrioritized` stage whose weights, single outlet and inlets all line up
/// with the shape yields a schedule in which each input index is repeated `weight` times.
/// Without such a stage the schedule is simply `0..inlet_count`.
fn fan_in_schedule(&self) -> Vec<usize> {
    for stage in self.stages.iter() {
        let StageKind::MergePrioritized { weights } = &stage.spec.kind else {
            continue;
        };
        let matches_shape = weights.len() == self.shape.inlet_count()
            && stage.spec.outlets.len() == 1
            && stage.spec.outlets[0].id() == self.shape.outlet().id()
            && stage
                .spec
                .inlets
                .iter()
                .map(AnyInlet::id)
                .eq(self.shape.inlets().iter().map(|inlet| inlet.id()));
        if matches_shape {
            return weights
                .iter()
                .enumerate()
                .flat_map(|(index, weight)| std::iter::repeat_n(index, *weight))
                .collect();
        }
    }
    // No matching prioritized merge: visit the inputs round-robin.
    (0..self.shape.inlet_count()).collect()
}
/// Runs the concat graph with the default execution config and returns only the output items.
pub fn run_concat(&self, inputs: Vec<Vec<T>>) -> StreamResult<Vec<T>> {
    let report = self.run_concat_report(inputs, FusedExecutionConfig::default())?;
    Ok(report.output)
}
/// Runs the concat graph with `config` in `ExecutorMode::Auto` and returns the full report.
pub fn run_concat_report(
    &self,
    inputs: Vec<Vec<T>>,
    config: FusedExecutionConfig,
) -> StreamResult<FusedExecutionReport<T>> {
    let mode = ExecutorMode::Auto;
    self.run_concat_report_mode(inputs, config, mode)
}
/// Runs the concat graph with an explicit executor mode.
///
/// `ErasedOnly` always uses the erased executor. `Auto` and `TypedOnly` use the typed
/// concat when the graph qualifies; otherwise `Auto` falls back to the erased executor and
/// `TypedOnly` fails.
///
/// # Errors
/// `StreamError::GraphValidation` when the number of input streams does not match the
/// shape's inlet count or when `TypedOnly` is requested for an unsupported graph, plus any
/// error from the selected executor.
pub(crate) fn run_concat_report_mode(
    &self,
    inputs: Vec<Vec<T>>,
    config: FusedExecutionConfig,
    mode: ExecutorMode,
) -> StreamResult<FusedExecutionReport<T>> {
    let expected = self.shape.inlet_count();
    if inputs.len() != expected {
        return Err(StreamError::GraphValidation(format!(
            "expected {} input streams, got {}",
            expected,
            inputs.len()
        )));
    }
    match mode {
        ExecutorMode::ErasedOnly => self.run_concat_report_erased(inputs, config),
        ExecutorMode::Auto | ExecutorMode::TypedOnly => {
            if self.typed_concat_supported() {
                run_typed_concat(inputs, config)
            } else if mode == ExecutorMode::TypedOnly {
                Err(StreamError::GraphValidation(
                    "typed executor does not support this graph shape".into(),
                ))
            } else {
                self.run_concat_report_erased(inputs, config)
            }
        }
    }
}
/// Reports whether `direct_single_fan_in_stage` finds a stage for this shape and that stage is
/// a `Concat`.
fn typed_concat_supported(&self) -> bool {
    let inlets = self.shape.inlets();
    let outlet = self.shape.outlet().erase();
    match direct_single_fan_in_stage::<T>(&self.stages, &self.edges, &inlets, &outlet) {
        Some(stage) => matches!(&stage.spec.kind, StageKind::Concat),
        None => false,
    }
}
/// Runs the concat graph through the type-erased `FusedExecutor`.
///
/// Input streams are handled strictly in inlet order: every item of stream `i` is delivered to
/// inlet `i`, then that inlet is completed, before stream `i + 1` is touched.
fn run_concat_report_erased(
    &self,
    inputs: Vec<Vec<T>>,
    config: FusedExecutionConfig,
) -> StreamResult<FusedExecutionReport<T>> {
    let total_items: usize = inputs.iter().map(Vec::len).sum();
    let graph_outlet = self.shape.outlet().id();
    let mut executor = FusedExecutor::new(self, config);
    let mut output = Vec::with_capacity(total_items);
    {
        let mut sink = VecOutputSink {
            output: &mut output,
        };
        for (index, stream) in inputs.into_iter().enumerate() {
            let inlet = self.shape.inlet(index)?.id();
            for item in stream {
                executor.deliver(inlet, datum(item), graph_outlet, &mut sink)?;
            }
            executor.complete(inlet, graph_outlet, &mut sink)?;
        }
    }
    Ok(FusedExecutionReport {
        output,
        events: executor.events,
        async_boundary_crossings: executor.async_boundary_crossings,
    })
}
/// Runs the or-else graph with the default execution config and returns only the emitted items.
pub fn run_or_else(&self, primary: Vec<T>, secondary: Vec<T>) -> StreamResult<Vec<T>> {
    let report = self.run_or_else_report(primary, secondary, FusedExecutionConfig::default())?;
    Ok(report.output)
}
/// Runs the or-else graph: the primary stream is delivered and completed first, and the
/// secondary stream is delivered only when the primary stream was empty. The secondary inlet is
/// completed in either case.
///
/// # Errors
///
/// Returns `StreamError::GraphValidation` when the shape does not have exactly two inlets;
/// executor errors are propagated.
pub fn run_or_else_report(
    &self,
    primary: Vec<T>,
    secondary: Vec<T>,
    config: FusedExecutionConfig,
) -> StreamResult<FusedExecutionReport<T>> {
    let inlet_count = self.shape.inlet_count();
    if inlet_count != 2 {
        return Err(StreamError::GraphValidation(format!(
            "or-else helper expected 2 inlets, got {}",
            inlet_count
        )));
    }
    let primary_inlet = self.shape.inlet(0)?.id();
    let secondary_inlet = self.shape.inlet(1)?.id();
    let graph_outlet = self.shape.outlet().id();
    let mut executor = FusedExecutor::new(self, config);
    let mut output = Vec::new();
    // The fallback stream is replayed only if the primary stream has nothing to deliver.
    let use_secondary = primary.is_empty();
    {
        let mut sink = VecOutputSink {
            output: &mut output,
        };
        for item in primary {
            executor.deliver(primary_inlet, datum(item), graph_outlet, &mut sink)?;
        }
        executor.complete(primary_inlet, graph_outlet, &mut sink)?;
        if use_secondary {
            for item in secondary {
                executor.deliver(secondary_inlet, datum(item), graph_outlet, &mut sink)?;
            }
        }
        executor.complete(secondary_inlet, graph_outlet, &mut sink)?;
    }
    Ok(FusedExecutionReport {
        output,
        events: executor.events,
        async_boundary_crossings: executor.async_boundary_crossings,
    })
}
/// Like `run_or_else_secondary_first_report` with the default execution config, returning only
/// the emitted items.
pub fn run_or_else_secondary_first(
    &self,
    primary: Vec<T>,
    secondary: Vec<T>,
) -> StreamResult<Vec<T>> {
    let config = FusedExecutionConfig::default();
    let report = self.run_or_else_secondary_first_report(primary, secondary, config)?;
    Ok(report.output)
}
/// Runs the or-else graph with the secondary stream arriving first: all secondary items are
/// delivered, then all primary items, then the primary inlet and finally the secondary inlet are
/// completed.
///
/// # Errors
///
/// Returns `StreamError::GraphValidation` when the shape does not have exactly two inlets;
/// executor errors are propagated.
pub fn run_or_else_secondary_first_report(
    &self,
    primary: Vec<T>,
    secondary: Vec<T>,
    config: FusedExecutionConfig,
) -> StreamResult<FusedExecutionReport<T>> {
    let inlet_count = self.shape.inlet_count();
    if inlet_count != 2 {
        return Err(StreamError::GraphValidation(format!(
            "or-else helper expected 2 inlets, got {}",
            inlet_count
        )));
    }
    let primary_inlet = self.shape.inlet(0)?.id();
    let secondary_inlet = self.shape.inlet(1)?.id();
    let graph_outlet = self.shape.outlet().id();
    let mut executor = FusedExecutor::new(self, config);
    let mut output = Vec::new();
    {
        let mut sink = VecOutputSink {
            output: &mut output,
        };
        for item in secondary {
            executor.deliver(secondary_inlet, datum(item), graph_outlet, &mut sink)?;
        }
        for item in primary {
            executor.deliver(primary_inlet, datum(item), graph_outlet, &mut sink)?;
        }
        executor.complete(primary_inlet, graph_outlet, &mut sink)?;
        executor.complete(secondary_inlet, graph_outlet, &mut sink)?;
    }
    Ok(FusedExecutionReport {
        output,
        events: executor.events,
        async_boundary_crossings: executor.async_boundary_crossings,
    })
}
/// Runs the or-else graph with an empty primary stream whose completion arrives last: all
/// secondary items are delivered, the secondary inlet is completed, and only then the primary
/// inlet is completed. Uses the default execution config and returns only the emitted items.
///
/// # Errors
///
/// Returns `StreamError::GraphValidation` when the shape does not have exactly two inlets;
/// executor errors are propagated.
pub fn run_or_else_secondary_closed_first(&self, secondary: Vec<T>) -> StreamResult<Vec<T>> {
    let inlet_count = self.shape.inlet_count();
    if inlet_count != 2 {
        return Err(StreamError::GraphValidation(format!(
            "or-else helper expected 2 inlets, got {}",
            inlet_count
        )));
    }
    let primary_inlet = self.shape.inlet(0)?.id();
    let secondary_inlet = self.shape.inlet(1)?.id();
    let graph_outlet = self.shape.outlet().id();
    let mut executor = FusedExecutor::new(self, FusedExecutionConfig::default());
    let mut collected = Vec::new();
    {
        let mut sink = VecOutputSink {
            output: &mut collected,
        };
        for item in secondary {
            executor.deliver(secondary_inlet, datum(item), graph_outlet, &mut sink)?;
        }
        executor.complete(secondary_inlet, graph_outlet, &mut sink)?;
        executor.complete(primary_inlet, graph_outlet, &mut sink)?;
    }
    Ok(collected)
}
/// Runs the interleave graph with the default execution config and returns only the emitted
/// items.
pub fn run_interleave(
    &self,
    inputs: Vec<Vec<T>>,
    segment_size: usize,
    eager_close: bool,
) -> StreamResult<Vec<T>> {
    let config = FusedExecutionConfig::default();
    let report = self.run_interleave_report(inputs, segment_size, eager_close, config)?;
    Ok(report.output)
}
/// Runs the interleave graph with `config` and returns the full execution report.
///
/// The executor is chosen automatically (`ExecutorMode::Auto`).
pub fn run_interleave_report(
    &self,
    inputs: Vec<Vec<T>>,
    segment_size: usize,
    eager_close: bool,
    config: FusedExecutionConfig,
) -> StreamResult<FusedExecutionReport<T>> {
    let mode = ExecutorMode::Auto;
    self.run_interleave_report_mode(inputs, segment_size, eager_close, config, mode)
}
/// Runs the interleave graph using the executor selected by `mode`.
///
/// # Errors
///
/// Returns `StreamError::GraphValidation` when the number of input streams differs from the
/// shape's inlet count, when `segment_size` is zero, or when `mode` is `TypedOnly` and the typed
/// path does not support this graph. Errors from the chosen executor are propagated.
pub(crate) fn run_interleave_report_mode(
    &self,
    inputs: Vec<Vec<T>>,
    segment_size: usize,
    eager_close: bool,
    config: FusedExecutionConfig,
    mode: ExecutorMode,
) -> StreamResult<FusedExecutionReport<T>> {
    let expected = self.shape.inlet_count();
    if inputs.len() != expected {
        return Err(StreamError::GraphValidation(format!(
            "expected {} input streams, got {}",
            expected,
            inputs.len()
        )));
    }
    if segment_size == 0 {
        return Err(StreamError::GraphValidation(
            "interleave segment size must be greater than zero".into(),
        ));
    }
    match mode {
        ExecutorMode::ErasedOnly => {
            self.run_interleave_report_erased(inputs, segment_size, eager_close, config)
        }
        // Both remaining modes prefer the typed path whenever it applies.
        ExecutorMode::Auto | ExecutorMode::TypedOnly
            if self.typed_interleave_supported(segment_size, eager_close) =>
        {
            run_typed_interleave(inputs, segment_size, eager_close, config)
        }
        ExecutorMode::TypedOnly => Err(StreamError::GraphValidation(
            "typed executor does not support this graph shape".into(),
        )),
        ExecutorMode::Auto => {
            self.run_interleave_report_erased(inputs, segment_size, eager_close, config)
        }
    }
}
/// Reports whether `direct_single_fan_in_stage` finds a stage for this shape and that stage is
/// an `Interleave` configured with exactly the requested `segment_size` and `eager_close`.
fn typed_interleave_supported(&self, segment_size: usize, eager_close: bool) -> bool {
    let inlets = self.shape.inlets();
    let outlet = self.shape.outlet().erase();
    let Some(stage) =
        direct_single_fan_in_stage::<T>(&self.stages, &self.edges, &inlets, &outlet)
    else {
        return false;
    };
    match &stage.spec.kind {
        StageKind::Interleave {
            segment_size: configured_segment,
            eager_close: configured_eager,
        } => *configured_segment == segment_size && *configured_eager == eager_close,
        _ => false,
    }
}
/// Runs the interleave graph through the type-erased `FusedExecutor`.
///
/// Inputs are visited round-robin starting at inlet 0. On its turn an input delivers up to
/// `segment_size` items; it is completed when it is found exhausted on its turn, or up front
/// if it starts empty. With `eager_close` set, the first completion ends the run and the items
/// emitted so far are returned.
fn run_interleave_report_erased(
&self,
inputs: Vec<Vec<T>>,
segment_size: usize,
eager_close: bool,
config: FusedExecutionConfig,
) -> StreamResult<FusedExecutionReport<T>> {
let output_capacity = inputs.iter().map(Vec::len).sum();
let mut queues: Vec<_> = inputs.into_iter().map(Vec::into_iter).collect();
// completed[i] records that inlet i has already received its completion signal.
let mut completed = vec![false; queues.len()];
let outlet = self.shape.outlet().id();
let mut executor = FusedExecutor::new(self, config);
let mut output = Vec::with_capacity(output_capacity);
{
let mut output_sink = VecOutputSink {
output: &mut output,
};
// Inputs that start empty are completed before any item is delivered.
for (index, queue) in queues.iter().enumerate() {
if queue.len() == 0 {
executor.complete(self.shape.inlet(index)?.id(), outlet, &mut output_sink)?;
completed[index] = true;
if eager_close {
return Ok(FusedExecutionReport {
output,
events: executor.events,
async_boundary_crossings: executor.async_boundary_crossings,
});
}
}
}
let mut current = 0usize;
while completed.iter().any(|done| !done) {
// Skip over an already-completed input without consuming a turn.
if completed[current] {
current = next_open_index(&completed, current).ok_or_else(|| {
StreamError::GraphValidation("no open interleave input".into())
})?;
continue;
}
// Deliver at most one segment from the current input.
let mut emitted = 0usize;
while emitted < segment_size {
match queues[current].next() {
Some(item) => {
executor.deliver(
self.shape.inlet(current)?.id(),
datum(item),
outlet,
&mut output_sink,
)?;
emitted += 1;
}
// Exhaustion is only noticed here, when the input is asked for another item.
None => {
executor.complete(
self.shape.inlet(current)?.id(),
outlet,
&mut output_sink,
)?;
completed[current] = true;
if eager_close {
return Ok(FusedExecutionReport {
output,
events: executor.events,
async_boundary_crossings: executor.async_boundary_crossings,
});
}
break;
}
}
}
if completed.iter().all(|done| *done) {
break;
}
// Hand the turn to the next input that is still open.
current = next_open_index(&completed, current).ok_or_else(|| {
StreamError::GraphValidation("no open interleave input".into())
})?;
}
}
Ok(FusedExecutionReport {
output,
events: executor.events,
async_boundary_crossings: executor.async_boundary_crossings,
})
}
}
/// Test/driver helpers for blueprints whose shape is a merge-preferred fan-in: one preferred
/// inlet plus a list of secondary inlets feeding a single outlet.
impl<T> GraphBlueprint<MergePreferredShape<T>>
where
T: Clone + Send + 'static,
{
/// Runs the graph with the default execution config and returns only the emitted items.
pub fn run_merge_preferred(
&self,
preferred: Vec<T>,
secondary_inputs: Vec<Vec<T>>,
) -> StreamResult<Vec<T>> {
Ok(self
.run_merge_preferred_report(
preferred,
secondary_inputs,
FusedExecutionConfig::default(),
)?
.output)
}
/// Runs the graph with `config`, letting the executor be chosen automatically
/// (`ExecutorMode::Auto`), and returns the full execution report.
pub fn run_merge_preferred_report(
&self,
preferred: Vec<T>,
secondary_inputs: Vec<Vec<T>>,
config: FusedExecutionConfig,
) -> StreamResult<FusedExecutionReport<T>> {
self.run_merge_preferred_report_mode(
preferred,
secondary_inputs,
config,
ExecutorMode::Auto,
)
}
/// Runs the graph using the executor selected by `mode`.
///
/// Returns `StreamError::GraphValidation` when the number of secondary streams differs from
/// the shape's secondary inlet count, or when `mode` is `TypedOnly` and the typed path does
/// not support this graph.
pub(crate) fn run_merge_preferred_report_mode(
&self,
preferred: Vec<T>,
secondary_inputs: Vec<Vec<T>>,
config: FusedExecutionConfig,
mode: ExecutorMode,
) -> StreamResult<FusedExecutionReport<T>> {
if secondary_inputs.len() != self.shape.secondary_count() {
return Err(StreamError::GraphValidation(format!(
"expected {} secondary input streams, got {}",
self.shape.secondary_count(),
secondary_inputs.len()
)));
}
if mode != ExecutorMode::ErasedOnly {
// Auto and TypedOnly both take the typed path when it applies.
if self.typed_merge_preferred_supported() {
return run_typed_merge_preferred(preferred, secondary_inputs, config);
}
if mode == ExecutorMode::TypedOnly {
return Err(StreamError::GraphValidation(
"typed executor does not support this graph shape".into(),
));
}
}
self.run_merge_preferred_report_erased(preferred, secondary_inputs, config)
}
/// Reports whether `direct_single_fan_in_stage` finds a stage for this shape and that stage
/// is a `MergePreferred`.
fn typed_merge_preferred_supported(&self) -> bool {
let shape_inlets = self.shape.inlets();
let shape_outlet = self.shape.outlet().erase();
direct_single_fan_in_stage::<T>(&self.stages, &self.edges, &shape_inlets, &shape_outlet)
.is_some_and(|stage| matches!(&stage.spec.kind, StageKind::MergePreferred))
}
/// Runs the graph through the type-erased `FusedExecutor`.
///
/// Every preferred item is delivered before any secondary item; secondary inputs are then
/// picked by `next_scheduled_input` over the schedule `0..secondary.len()`. Each inlet is
/// completed exactly once: up front if its input starts empty, otherwise right after its
/// last item is delivered.
fn run_merge_preferred_report_erased(
&self,
preferred: Vec<T>,
secondary_inputs: Vec<Vec<T>>,
config: FusedExecutionConfig,
) -> StreamResult<FusedExecutionReport<T>> {
let output_capacity =
preferred.len() + secondary_inputs.iter().map(Vec::len).sum::<usize>();
let mut preferred = preferred.into_iter();
let mut secondary: Vec<_> = secondary_inputs.into_iter().map(Vec::into_iter).collect();
let secondary_schedule = (0..secondary.len()).collect::<Vec<_>>();
let mut secondary_index = 0;
let outlet = self.shape.outlet().id();
let preferred_inlet = self.shape.preferred().id();
let mut executor = FusedExecutor::new(self, config);
let mut output = Vec::with_capacity(output_capacity);
// Completion flags prevent an inlet from being completed twice.
let mut preferred_completed = false;
let mut secondary_completed = vec![false; secondary.len()];
{
let mut output_sink = VecOutputSink {
output: &mut output,
};
// Inputs that start empty are completed before any item is delivered.
if preferred.len() == 0 {
executor.complete(preferred_inlet, outlet, &mut output_sink)?;
preferred_completed = true;
}
for (index, queue) in secondary.iter().enumerate() {
if queue.len() == 0 {
executor.complete(
self.shape.secondary(index)?.id(),
outlet,
&mut output_sink,
)?;
secondary_completed[index] = true;
}
}
while preferred.len() > 0 || secondary.iter().any(|queue| queue.len() > 0) {
// The preferred input always wins while it still has items.
if let Some(item) = preferred.next() {
executor.deliver(preferred_inlet, datum(item), outlet, &mut output_sink)?;
if preferred.len() == 0 && !preferred_completed {
executor.complete(preferred_inlet, outlet, &mut output_sink)?;
preferred_completed = true;
}
continue;
}
let input_index =
next_scheduled_input(&secondary, &secondary_schedule, &mut secondary_index)
.ok_or_else(|| {
StreamError::GraphValidation("no runnable secondary input".into())
})?;
// next_scheduled_input only returns indices whose queue reports a non-zero length.
let item = secondary[input_index]
.next()
.expect("scheduled secondary input had an item");
executor.deliver(
self.shape.secondary(input_index)?.id(),
datum(item),
outlet,
&mut output_sink,
)?;
if secondary[input_index].len() == 0 && !secondary_completed[input_index] {
executor.complete(
self.shape.secondary(input_index)?.id(),
outlet,
&mut output_sink,
)?;
secondary_completed[input_index] = true;
}
}
}
Ok(FusedExecutionReport {
output,
events: executor.events,
async_boundary_crossings: executor.async_boundary_crossings,
})
}
}
/// Typed fast path for a merge-preferred fan-in: emits every preferred item first, then the
/// secondary items in the order chosen by `next_scheduled_input` over the schedule
/// `0..secondary_inputs.len()`.
///
/// The event count comes from `typed_fan_in_success_events` and is checked against `config`
/// through `add_typed_helper_events` before any output is produced. The report always has zero
/// async boundary crossings.
fn run_typed_merge_preferred<T>(
    preferred: Vec<T>,
    secondary_inputs: Vec<Vec<T>>,
    config: FusedExecutionConfig,
) -> StreamResult<FusedExecutionReport<T>>
where
    T: Send + 'static,
{
    let secondary_total: usize = secondary_inputs.iter().map(Vec::len).sum();
    let total_items = preferred.len() + secondary_total;
    let events = typed_fan_in_success_events(total_items, secondary_inputs.len() + 1)?;
    let mut checked_events = 0;
    add_typed_helper_events(&mut checked_events, config, events)?;

    let mut output = Vec::with_capacity(total_items);
    output.extend(preferred);

    let mut secondary: Vec<_> = secondary_inputs.into_iter().map(Vec::into_iter).collect();
    let schedule: Vec<usize> = (0..secondary.len()).collect();
    let mut cursor = 0;
    // One iteration per secondary item still to be emitted.
    for _ in 0..secondary_total {
        let Some(input_index) = next_scheduled_input(&secondary, &schedule, &mut cursor) else {
            return Err(StreamError::GraphValidation(
                "no runnable secondary input".into(),
            ));
        };
        let item = secondary[input_index]
            .next()
            .expect("scheduled secondary input had an item");
        output.push(item);
    }
    Ok(FusedExecutionReport {
        output,
        events,
        async_boundary_crossings: 0,
    })
}
/// Picks the next input queue that still has items, following `schedule`.
///
/// Starting at `*schedule_index` (taken modulo the schedule length), at most one full pass over
/// the schedule is made; the cursor advances by one for every slot inspected, including the one
/// that is returned. If no scheduled slot points at a non-empty queue (or the schedule is
/// empty), the first non-empty queue by position is returned instead.
///
/// Returns `None` when every queue is empty.
fn next_scheduled_input<I>(
    queues: &[I],
    schedule: &[usize],
    schedule_index: &mut usize,
) -> Option<usize>
where
    I: ExactSizeIterator,
{
    let slots = schedule.len();
    let mut inspected = 0;
    while inspected < slots {
        let candidate = schedule[*schedule_index % slots];
        *schedule_index += 1;
        inspected += 1;
        match queues.get(candidate) {
            Some(queue) if queue.len() != 0 => return Some(candidate),
            _ => {}
        }
    }
    // Fallback: the schedule was empty or only pointed at drained/unknown queues.
    queues.iter().position(|queue| queue.len() != 0)
}
/// Finds the next index after `current` (wrapping around, and considering `current` itself
/// last) whose `completed` flag is `false`.
///
/// Returns `None` when `completed` is empty or every entry is `true`.
fn next_open_index(completed: &[bool], current: usize) -> Option<usize> {
    let len = completed.len();
    // For an empty slice the range `1..=0` is empty, so the modulo is never evaluated.
    (1..=len)
        .map(|offset| (current + offset) % len)
        .find(|&index| !completed[index])
}
/// Destination for values that the fused executor emits on the graph outlet.
pub(crate) trait FusedOutputSink<Out> {
/// Accepts one emitted value; an error aborts the executor call that produced it.
fn emit(&mut self, value: Out) -> StreamResult<()>;
}
/// Output sink that appends every emitted value to a borrowed `Vec`.
pub(crate) struct VecOutputSink<'a, Out> {
/// Vector that receives the emitted values, in emission order.
pub(crate) output: &'a mut Vec<Out>,
}
impl<Out> FusedOutputSink<Out> for VecOutputSink<'_, Out> {
/// Pushes `value` onto the borrowed vector; never fails.
fn emit(&mut self, value: Out) -> StreamResult<()> {
self.output.push(value);
Ok(())
}
}
/// Output sink that discards emitted values and only counts them.
pub(crate) struct CountOutputSink {
/// Number of values emitted so far.
pub(crate) count: usize,
}
impl<Out> FusedOutputSink<Out> for CountOutputSink {
/// Drops `_value` and increments the counter; never fails.
fn emit(&mut self, _value: Out) -> StreamResult<()> {
self.count += 1;
Ok(())
}
}
/// Output sink that folds every emitted value into a running accumulator.
pub(crate) struct FoldOutputSink<Acc, F> {
    /// Current accumulator; expected to be `Some` whenever `emit` or `finish` is called.
    pub(crate) accumulator: Option<Acc>,
    /// Folding function applied as `fold(accumulator, value)`.
    pub(crate) fold: F,
}
impl<Out, Acc, F> FusedOutputSink<Out> for FoldOutputSink<Acc, F>
where
    F: FnMut(Acc, Out) -> Acc,
{
    /// Replaces the accumulator with `fold(accumulator, value)`.
    ///
    /// # Panics
    ///
    /// Panics if the accumulator is absent.
    fn emit(&mut self, value: Out) -> StreamResult<()> {
        let previous = self
            .accumulator
            .take()
            .expect("fold accumulator is present while executor is running");
        let folded = (self.fold)(previous, value);
        self.accumulator = Some(folded);
        Ok(())
    }
}
impl<Acc, F> FoldOutputSink<Acc, F> {
    /// Consumes the sink and returns the final accumulator.
    ///
    /// # Panics
    ///
    /// Panics if the accumulator is absent.
    pub(crate) fn finish(self) -> Acc {
        let Self { accumulator, .. } = self;
        accumulator.expect("fold accumulator is present after executor finishes")
    }
}
/// Runtime state for executing one `GraphBlueprint` as a single fused unit.
///
/// Acyclic graphs are driven by direct recursive calls; graphs with a cycle are driven through
/// `event_stack` (see `deliver`, `complete`, `request`).
#[derive(Debug)]
pub(crate) struct FusedExecutor<'a, S: Shape> {
/// Blueprint being executed.
graph: &'a GraphBlueprint<S>,
/// For every edge: outlet id -> id of the inlet it feeds.
pub(crate) edge_by_outlet: HashMap<PortId, PortId>,
/// For every edge: inlet id -> id of the outlet that feeds it.
pub(crate) edge_by_inlet: HashMap<PortId, PortId>,
/// Inlet id -> index in `graph.stages` of the stage that owns the inlet.
pub(crate) stage_by_inlet: HashMap<PortId, usize>,
/// Outlet id -> index in `graph.stages` of the stage that owns the outlet.
pub(crate) stage_by_outlet: HashMap<PortId, usize>,
/// Per-stage mutable state, in the same order as `graph.stages`.
stage_states: Vec<StageState>,
/// Per-stage logic created from the stage's `logic_factory`; `None` when it has no factory.
pub(crate) opaque_logics: Vec<Option<GraphStageLogic>>,
/// Execution configuration supplied to `new`.
config: FusedExecutionConfig,
/// Event counter exposed in execution reports; starts at zero.
pub(crate) events: usize,
/// Async-boundary crossing counter exposed in execution reports; starts at zero.
pub(crate) async_boundary_crossings: usize,
/// Outlets for which a downstream-finish has already been processed.
cancelled_outlets: HashSet<PortId>,
/// LIFO stack of pending events, used only on the cyclic path.
event_stack: Vec<FusedEvent>,
/// Set while `drain_events` is running so nested drain calls return immediately.
draining: bool,
/// Whether the stage graph contains a cycle; selects the event-stack path over direct calls.
has_cycle: bool,
}
/// Mutable per-stage state kept by `FusedExecutor`, one value per stage.
///
/// `FusedExecutor::new` picks the variant from the stage's `StageKind`; kinds without a
/// dedicated variant get `Stateless`.
#[derive(Debug)]
enum StageState {
/// Stage that needs no executor-side bookkeeping.
Stateless,
/// State for a `Broadcast` stage: one cancellation flag per outlet plus the live-outlet count
/// (initially all outlets live).
Broadcast {
cancelled_outlets: Vec<bool>,
live_outlets: usize,
},
/// State for a `Balance` stage: `next` starts at 0 (presumably the next outlet to serve —
/// confirm against the stage logic), plus per-outlet cancellation tracking.
Balance {
next: usize,
cancelled_outlets: Vec<bool>,
live_outlets: usize,
},
/// Shared state for the `Merge`, `MergePreferred`, `MergePrioritized`, `Concat` and
/// `Interleave` kinds. `open_inputs` starts at the stage's inlet count; `eager_complete` is
/// only set for `Interleave` stages created with `eager_close`.
Merge {
open_inputs: usize,
eager_complete: bool,
completed: bool,
},
/// State for an `OrElse` stage; all flags start `false` and the buffer starts empty.
OrElse {
primary_emitted: bool,
buffer: VecDeque<DatumValue>,
primary_closed: bool,
secondary_closed: bool,
completed: bool,
},
/// State for a two-inlet `Zip` stage: the two inlet ids plus one queue per side.
Zip {
left_inlet: PortId,
right_inlet: PortId,
left: VecDeque<DatumValue>,
right: VecDeque<DatumValue>,
left_pending_complete: bool,
right_pending_complete: bool,
completed: bool,
},
/// State for an `Unzip` stage, including optional fast paths computed once in `new` and
/// per-outlet demand/cancellation flags for its two outlets.
Unzip {
fast_path: Option<UnzipFanInFastPath>,
zip_fast_path: Option<UnzipZipFastPath>,
demand: [bool; 2],
cancelled: [bool; 2],
upstream_closed: bool,
},
/// State for a `MergeSorted` stage: one queue per side, close flags and a pending queue.
MergeSorted {
left: VecDeque<DatumValue>,
right: VecDeque<DatumValue>,
left_closed: bool,
right_closed: bool,
pending: VecDeque<DatumValue>,
completed: bool,
},
/// State for a `MergeSequence` stage; `next_sequence` starts at 0 and `pending` holds
/// `(sequence, value)` pairs (presumably those that arrived out of order — confirm).
MergeSequence {
next_sequence: u64,
pending: Vec<(u64, DatumValue)>,
completed_count: usize,
output_buffer: VecDeque<DatumValue>,
completed: bool,
},
/// State for a `MergeLatest` stage; `latest` has one slot per input, all initially `None`.
MergeLatest {
latest: Vec<Option<DatumValue>>,
seen_count: usize,
completed_count: usize,
pending: VecDeque<DatumValue>,
completed: bool,
},
/// State for a `Partition` stage; `demand` and `cancelled` have `output_count` entries, all
/// initially `false`.
Partition {
pending: Option<(usize, DatumValue)>,
upstream_closed: bool,
demand: Vec<bool>,
cancelled: Vec<bool>,
output_count: usize,
completed: bool,
},
}
/// Precomputed shortcut stored in `StageState::Unzip::fast_path`; built by
/// `unzip_fan_in_fast_path` when the executor is created.
// NOTE(review): presumably describes an unzip whose two outlets feed the same fan-in stage —
// confirm against `unzip_fan_in_fast_path`.
#[derive(Clone, Copy, Debug)]
struct UnzipFanInFastPath {
fan_in_stage_index: usize,
target_inlet_indices: [usize; 2],
}
/// Precomputed shortcut stored in `StageState::Unzip::zip_fast_path`; built by
/// `unzip_zip_fast_path` when the executor is created.
// NOTE(review): presumably describes an unzip that feeds a zip stage directly — confirm against
// `unzip_zip_fast_path`.
#[derive(Clone, Copy, Debug)]
struct UnzipZipFastPath {
zip_stage_index: usize,
}
/// Values a stage emits in one transition, each paired with the outlet it leaves on.
///
/// `One` and `Two` hold their emissions inline; `Many` holds them in a `Vec`. Emissions are
/// processed in declaration order (first before second, vector order for `Many`).
enum StageEmissions {
None,
One(PortId, DatumValue),
Two((PortId, DatumValue), (PortId, DatumValue)),
Many(Vec<(PortId, DatumValue)>),
}
/// Outcome of handling one event in a stage: what it emits, which of its outlets complete, and
/// which of its inlets it cancels.
struct StageTransition {
    emissions: StageEmissions,
    completed_outlets: Vec<PortId>,
    cancelled_inlets: Vec<PortId>,
}
impl StageTransition {
    /// Transition with no emissions, completions or cancellations.
    fn none() -> Self {
        Self::emit(StageEmissions::None)
    }
    /// Transition that only carries `emissions`.
    fn emit(emissions: StageEmissions) -> Self {
        Self {
            emissions,
            completed_outlets: Vec::new(),
            cancelled_inlets: Vec::new(),
        }
    }
    /// Replaces the list of completed outlets.
    fn with_completion(self, completed_outlets: Vec<PortId>) -> Self {
        Self {
            completed_outlets,
            ..self
        }
    }
    /// Replaces the list of cancelled inlets.
    fn with_cancellations(self, cancelled_inlets: Vec<PortId>) -> Self {
        Self {
            cancelled_inlets,
            ..self
        }
    }
}
/// Unit of work on the executor's event stack (used when the graph has a cycle).
#[derive(Debug)]
enum FusedEvent {
/// Hand `value` to the stage that owns `inlet`.
Deliver { inlet: PortId, value: DatumValue },
/// Signal upstream completion on `inlet`.
CompleteInlet { inlet: PortId },
/// Pull from the stage that owns `outlet`.
Request { outlet: PortId },
/// Signal that whatever is downstream of `outlet` has finished.
DownstreamFinish { outlet: PortId },
/// Route `value`, emitted on `outlet`, to the graph output or along the outlet's edge.
Emit { outlet: PortId, value: DatumValue },
/// Propagate completion of `outlet`.
CompleteOutlet { outlet: PortId },
/// Cancel `inlet`; becomes a `DownstreamFinish` on the outlet feeding it, if any.
CancelInlet { inlet: PortId },
}
/// Reports whether the stage graph of `graph` contains a directed cycle.
///
/// Stages are nodes; every edge whose outlet and inlet both belong to a known stage contributes
/// a `from -> to` link (edges touching an unowned port are ignored). A self-loop counts as a
/// cycle.
///
/// The depth-first search keeps its own explicit stack, so the call-stack depth stays constant
/// even for very long stage chains.
fn graph_has_cycle<S: Shape>(
    graph: &GraphBlueprint<S>,
    stage_by_inlet: &HashMap<PortId, usize>,
    stage_by_outlet: &HashMap<PortId, usize>,
) -> bool {
    #[derive(Clone, Copy, PartialEq, Eq)]
    enum Visit {
        New,
        Active,
        Done,
    }

    let stage_count = graph.stages.len();
    let mut adjacency = vec![Vec::new(); stage_count];
    for edge in &graph.edges {
        let Some(from) = stage_by_outlet.get(&edge.outlet).copied() else {
            continue;
        };
        let Some(to) = stage_by_inlet.get(&edge.inlet).copied() else {
            continue;
        };
        adjacency[from].push(to);
    }

    let mut marks = vec![Visit::New; stage_count];
    // Each frame is (node, index of the next successor of that node to inspect).
    let mut stack: Vec<(usize, usize)> = Vec::new();
    for root in 0..stage_count {
        if marks[root] != Visit::New {
            continue;
        }
        marks[root] = Visit::Active;
        stack.push((root, 0));
        while let Some(frame) = stack.last_mut() {
            let (node, cursor) = *frame;
            match adjacency[node].get(cursor).copied() {
                Some(next) => {
                    frame.1 += 1;
                    match marks[next] {
                        // A successor that is still on the current DFS path closes a cycle.
                        Visit::Active => return true,
                        Visit::New => {
                            marks[next] = Visit::Active;
                            stack.push((next, 0));
                        }
                        Visit::Done => {}
                    }
                }
                None => {
                    // All successors explored without finding a back edge.
                    marks[node] = Visit::Done;
                    stack.pop();
                }
            }
        }
    }
    false
}
impl<'a, S: Shape> FusedExecutor<'a, S> {
/// Reports whether a downstream-finish has already been recorded for `outlet`.
fn is_outlet_cancelled(&self, outlet: PortId) -> bool {
    let cancelled = &self.cancelled_outlets;
    // Skip hashing entirely in the common case where nothing has been cancelled.
    if cancelled.is_empty() {
        return false;
    }
    cancelled.contains(&outlet)
}
/// Builds an executor for `graph`.
///
/// Indexes edges and port ownership, instantiates each stage's logic from its `logic_factory`,
/// creates the initial `StageState` for every stage, determines whether the graph has a cycle,
/// and finally calls `prime_connected_demands` on the new executor.
fn new(graph: &'a GraphBlueprint<S>, config: FusedExecutionConfig) -> Self {
    // Index every edge in both directions.
    let mut edge_by_outlet = HashMap::new();
    let mut edge_by_inlet = HashMap::new();
    for edge in graph.edges.iter() {
        edge_by_outlet.insert(edge.outlet, edge.inlet);
        edge_by_inlet.insert(edge.inlet, edge.outlet);
    }

    // Record which stage owns each port.
    let mut stage_by_inlet = HashMap::new();
    let mut stage_by_outlet = HashMap::new();
    for (stage_index, stage) in graph.stages.iter().enumerate() {
        let spec = &stage.spec;
        stage_by_inlet.extend(spec.inlets.iter().map(|inlet| (inlet.id(), stage_index)));
        stage_by_outlet.extend(spec.outlets.iter().map(|outlet| (outlet.id(), stage_index)));
    }

    let mut opaque_logics = Vec::with_capacity(graph.stages.len());
    for stage in graph.stages.iter() {
        opaque_logics.push(stage.logic_factory.as_ref().map(|factory| factory()));
    }

    // All merge-like kinds share one state shape and differ only in `eager_complete`.
    let merge_state = |open_inputs: usize, eager_complete: bool| StageState::Merge {
        open_inputs,
        eager_complete,
        completed: false,
    };
    let stage_states = graph
        .stages
        .iter()
        .map(|stage| {
            let spec = &stage.spec;
            match spec.kind {
                StageKind::Broadcast => StageState::Broadcast {
                    cancelled_outlets: vec![false; spec.outlets.len()],
                    live_outlets: spec.outlets.len(),
                },
                StageKind::Balance => StageState::Balance {
                    next: 0,
                    cancelled_outlets: vec![false; spec.outlets.len()],
                    live_outlets: spec.outlets.len(),
                },
                StageKind::Merge
                | StageKind::MergePreferred
                | StageKind::MergePrioritized { .. }
                | StageKind::Concat => merge_state(spec.inlets.len(), false),
                StageKind::Interleave { eager_close, .. } => {
                    merge_state(spec.inlets.len(), eager_close)
                }
                StageKind::OrElse { primary_inlet: _ } => StageState::OrElse {
                    primary_emitted: false,
                    buffer: VecDeque::new(),
                    primary_closed: false,
                    secondary_closed: false,
                    completed: false,
                },
                StageKind::Zip(_) => match spec.inlets.as_slice() {
                    [left, right] => StageState::Zip {
                        left_inlet: left.id(),
                        right_inlet: right.id(),
                        left: VecDeque::new(),
                        right: VecDeque::new(),
                        left_pending_complete: false,
                        right_pending_complete: false,
                        completed: false,
                    },
                    // A zip without exactly two inlets gets no dedicated state.
                    _ => StageState::Stateless,
                },
                StageKind::Unzip { .. } => StageState::Unzip {
                    fast_path: unzip_fan_in_fast_path(
                        stage,
                        graph,
                        &edge_by_outlet,
                        &stage_by_inlet,
                    ),
                    zip_fast_path: unzip_zip_fast_path(
                        stage,
                        graph,
                        &edge_by_outlet,
                        &stage_by_inlet,
                    ),
                    demand: [false; 2],
                    cancelled: [false; 2],
                    upstream_closed: false,
                },
                StageKind::MergeSorted(_) => StageState::MergeSorted {
                    left: VecDeque::new(),
                    right: VecDeque::new(),
                    left_closed: false,
                    right_closed: false,
                    pending: VecDeque::new(),
                    completed: false,
                },
                StageKind::MergeSequence { .. } => StageState::MergeSequence {
                    next_sequence: 0,
                    pending: Vec::new(),
                    completed_count: 0,
                    output_buffer: VecDeque::new(),
                    completed: false,
                },
                StageKind::MergeLatest { input_count, .. } => StageState::MergeLatest {
                    latest: (0..input_count).map(|_| None).collect(),
                    seen_count: 0,
                    completed_count: 0,
                    pending: VecDeque::new(),
                    completed: false,
                },
                StageKind::Partition { output_count, .. } => StageState::Partition {
                    pending: None,
                    upstream_closed: false,
                    demand: vec![false; output_count],
                    cancelled: vec![false; output_count],
                    output_count,
                    completed: false,
                },
                _ => StageState::Stateless,
            }
        })
        .collect();

    // A graph without edges cannot contain a cycle, so the search is skipped for it.
    let has_cycle =
        !graph.edges.is_empty() && graph_has_cycle(graph, &stage_by_inlet, &stage_by_outlet);

    let mut executor = Self {
        graph,
        edge_by_outlet,
        edge_by_inlet,
        stage_by_inlet,
        stage_by_outlet,
        stage_states,
        opaque_logics,
        config,
        events: 0,
        async_boundary_crossings: 0,
        cancelled_outlets: HashSet::new(),
        event_stack: Vec::new(),
        draining: false,
        has_cycle,
    };
    executor.prime_connected_demands();
    executor
}
/// Feeds `value` into `inlet` and runs the graph until that input is fully processed.
///
/// Graphs with a cycle queue a `Deliver` event and drain the event stack; acyclic graphs are
/// processed by direct calls. Values reaching `graph_outlet` are written to `output`.
fn deliver<Out>(
    &mut self,
    inlet: PortId,
    value: DatumValue,
    graph_outlet: PortId,
    output: &mut impl FusedOutputSink<Out>,
) -> StreamResult<()>
where
    Out: Send + 'static,
{
    if self.has_cycle {
        self.schedule_event(FusedEvent::Deliver { inlet, value });
        self.drain_events(graph_outlet, output)
    } else {
        self.process_deliver_direct(inlet, value, graph_outlet, output)
    }
}
/// Signals upstream completion on `inlet` and runs the graph until the signal is fully processed.
///
/// Graphs with a cycle queue a `CompleteInlet` event and drain the event stack; acyclic graphs
/// are processed by direct calls.
fn complete<Out>(
    &mut self,
    inlet: PortId,
    graph_outlet: PortId,
    output: &mut impl FusedOutputSink<Out>,
) -> StreamResult<()>
where
    Out: Send + 'static,
{
    if self.has_cycle {
        self.schedule_event(FusedEvent::CompleteInlet { inlet });
        self.drain_events(graph_outlet, output)
    } else {
        self.process_complete_inlet_direct(inlet, graph_outlet, output)
    }
}
/// Issues a pull on `outlet` and runs the graph until the request is fully processed.
///
/// Graphs with a cycle queue a `Request` event and drain the event stack; acyclic graphs are
/// processed by direct calls.
fn request<Out>(
    &mut self,
    outlet: PortId,
    graph_outlet: PortId,
    output: &mut impl FusedOutputSink<Out>,
) -> StreamResult<()>
where
    Out: Send + 'static,
{
    if self.has_cycle {
        self.schedule_event(FusedEvent::Request { outlet });
        self.drain_events(graph_outlet, output)
    } else {
        self.process_request_direct(outlet, graph_outlet, output)
    }
}
/// Signals that the consumer of `outlet` has finished and runs the graph until the signal is
/// fully processed.
///
/// Graphs with a cycle queue a `DownstreamFinish` event and drain the event stack; acyclic
/// graphs are processed by direct calls.
#[cfg_attr(not(test), allow(dead_code))]
fn downstream_finish<Out>(
    &mut self,
    outlet: PortId,
    graph_outlet: PortId,
    output: &mut impl FusedOutputSink<Out>,
) -> StreamResult<()>
where
    Out: Send + 'static,
{
    if self.has_cycle {
        self.schedule_event(FusedEvent::DownstreamFinish { outlet });
        self.drain_events(graph_outlet, output)
    } else {
        self.process_downstream_finish_direct(outlet, graph_outlet, output)
    }
}
/// Pushes `event` onto the event stack; the most recently scheduled event is processed first.
fn schedule_event(&mut self, event: FusedEvent) {
self.event_stack.push(event);
}
fn schedule_transition(&mut self, transition: StageTransition) {
for inlet in transition.cancelled_inlets.into_iter().rev() {
self.schedule_event(FusedEvent::CancelInlet { inlet });
}
for outlet in transition.completed_outlets.into_iter().rev() {
if !self.is_outlet_cancelled(outlet) {
self.schedule_event(FusedEvent::CompleteOutlet { outlet });
}
}
self.schedule_emissions_in_order(transition.emissions);
}
/// Schedules `Emit` events so that they are popped in the emissions' original order.
///
/// Because the event stack is LIFO, the emissions are pushed last-to-first.
fn schedule_emissions_in_order(&mut self, emissions: StageEmissions) {
    match emissions {
        StageEmissions::None => {}
        StageEmissions::One(outlet, value) => {
            self.schedule_event(FusedEvent::Emit { outlet, value });
        }
        StageEmissions::Two(first, second) => {
            // Push the second emission first so the first one is popped first.
            for (outlet, value) in [second, first] {
                self.schedule_event(FusedEvent::Emit { outlet, value });
            }
        }
        StageEmissions::Many(batch) => {
            let reversed = batch
                .into_iter()
                .rev()
                .map(|(outlet, value)| FusedEvent::Emit { outlet, value });
            self.event_stack.extend(reversed);
        }
    }
}
/// Processes events from the event stack until it is empty or an event fails.
///
/// A nested call made while a drain is already running returns `Ok(())` immediately and leaves
/// the work to the outer drain. On failure the remaining events are discarded and the error is
/// returned.
fn drain_events<Out>(
    &mut self,
    graph_outlet: PortId,
    output: &mut impl FusedOutputSink<Out>,
) -> StreamResult<()>
where
    Out: Send + 'static,
{
    if self.draining {
        return Ok(());
    }
    self.draining = true;
    let mut outcome: StreamResult<()> = Ok(());
    while let Some(event) = self.event_stack.pop() {
        if let Err(error) = self.process_event(event, graph_outlet, output) {
            outcome = Err(error);
            break;
        }
    }
    self.draining = false;
    if outcome.is_err() {
        self.event_stack.clear();
    }
    outcome
}
/// Handles one event popped from the event stack (cyclic path).
///
/// Stage-facing events (`Deliver`, `CompleteInlet`, `Request`, `DownstreamFinish`) run the
/// owning stage and schedule the resulting transition as further events; `Emit` and
/// `CompleteOutlet` are forwarded to their cyclic handlers; `CancelInlet` is translated into a
/// `DownstreamFinish` on the outlet feeding the inlet.
///
/// Returns `StreamError::GraphValidation` when a `Deliver`/`CompleteInlet` inlet or a
/// `DownstreamFinish` outlet has no owning stage; other errors are propagated.
fn process_event<Out>(
&mut self,
event: FusedEvent,
graph_outlet: PortId,
output: &mut impl FusedOutputSink<Out>,
) -> StreamResult<()>
where
Out: Send + 'static,
{
match event {
FusedEvent::Deliver { inlet, value } => {
self.bump_event()?;
let stage_index = *self.stage_by_inlet.get(&inlet).ok_or_else(|| {
StreamError::GraphValidation(format!(
"inlet {} has no owning stage",
inlet.as_usize()
))
})?;
let transition = self.process_stage(stage_index, inlet, value)?;
self.process_transition(transition);
Ok(())
}
FusedEvent::CompleteInlet { inlet } => {
self.bump_event()?;
let stage_index = *self.stage_by_inlet.get(&inlet).ok_or_else(|| {
StreamError::GraphValidation(format!(
"inlet {} has no owning stage",
inlet.as_usize()
))
})?;
let transition = self.process_completion(stage_index, inlet)?;
self.process_transition(transition);
Ok(())
}
FusedEvent::Request { outlet } => {
// Requests on cancelled outlets are dropped before bump_event is called.
if self.is_outlet_cancelled(outlet) {
return Ok(());
}
self.bump_event()?;
// An outlet without an owning stage is silently ignored (after bump_event has run).
let Some(stage_index) = self.stage_by_outlet.get(&outlet).copied() else {
return Ok(());
};
let transition = self.process_pull(stage_index, outlet)?;
self.process_transition(transition);
Ok(())
}
FusedEvent::DownstreamFinish { outlet } => {
// `insert` returns false when the outlet was already cancelled: handle it only once.
if !self.cancelled_outlets.insert(outlet) {
return Ok(());
}
self.bump_event()?;
let stage_index = *self.stage_by_outlet.get(&outlet).ok_or_else(|| {
StreamError::GraphValidation(format!(
"outlet {} has no owning stage",
outlet.as_usize()
))
})?;
let transition = self.process_downstream_finish(stage_index, outlet)?;
self.process_transition(transition);
Ok(())
}
FusedEvent::Emit { outlet, value } => {
self.process_emit_cyclic(outlet, value, graph_outlet, output)
}
FusedEvent::CompleteOutlet { outlet } => {
self.process_complete_outlet_cyclic(outlet, graph_outlet, output)
}
FusedEvent::CancelInlet { inlet } => {
// An inlet with no incoming edge has nothing upstream to notify.
if let Some(outlet) = self.edge_by_inlet.get(&inlet).copied() {
self.schedule_event(FusedEvent::DownstreamFinish { outlet });
}
Ok(())
}
}
}
/// Cyclic-path handling of a stage transition: its effects are only scheduled as events here.
fn process_transition(&mut self, transition: StageTransition) {
self.schedule_transition(transition);
}
fn process_transition_direct<Out>(
&mut self,
transition: StageTransition,
graph_outlet: PortId,
output: &mut impl FusedOutputSink<Out>,
) -> StreamResult<()>
where
Out: Send + 'static,
{
self.process_emissions_direct(transition.emissions, graph_outlet, output)?;
for outlet in transition.completed_outlets {
if !self.is_outlet_cancelled(outlet) {
self.process_complete_outlet_direct(outlet, graph_outlet, output)?;
}
}
for inlet in transition.cancelled_inlets {
self.process_cancel_inlet_direct(inlet, graph_outlet, output)?;
}
Ok(())
}
/// Acyclic-path handling of a set of emissions: each one is processed immediately, in order,
/// stopping at the first error.
fn process_emissions_direct<Out>(
    &mut self,
    emissions: StageEmissions,
    graph_outlet: PortId,
    output: &mut impl FusedOutputSink<Out>,
) -> StreamResult<()>
where
    Out: Send + 'static,
{
    match emissions {
        StageEmissions::None => {}
        StageEmissions::One(outlet, value) => {
            self.process_emit_direct(outlet, value, graph_outlet, output)?;
        }
        StageEmissions::Two(first, second) => {
            for (outlet, value) in [first, second] {
                self.process_emit_direct(outlet, value, graph_outlet, output)?;
            }
        }
        StageEmissions::Many(batch) => {
            for (outlet, value) in batch {
                self.process_emit_direct(outlet, value, graph_outlet, output)?;
            }
        }
    }
    Ok(())
}
/// Acyclic-path delivery: counts the event, runs the stage owning `inlet` on `value`, and
/// applies the resulting transition immediately.
///
/// Returns `StreamError::GraphValidation` when `inlet` has no owning stage.
fn process_deliver_direct<Out>(
    &mut self,
    inlet: PortId,
    value: DatumValue,
    graph_outlet: PortId,
    output: &mut impl FusedOutputSink<Out>,
) -> StreamResult<()>
where
    Out: Send + 'static,
{
    self.bump_event()?;
    let Some(&stage_index) = self.stage_by_inlet.get(&inlet) else {
        return Err(StreamError::GraphValidation(format!(
            "inlet {} has no owning stage",
            inlet.as_usize()
        )));
    };
    let transition = self.process_stage(stage_index, inlet, value)?;
    self.process_transition_direct(transition, graph_outlet, output)
}
/// Acyclic-path completion: counts the event, notifies the stage owning `inlet` that the inlet
/// has completed, and applies the resulting transition immediately.
///
/// Returns `StreamError::GraphValidation` when `inlet` has no owning stage.
fn process_complete_inlet_direct<Out>(
    &mut self,
    inlet: PortId,
    graph_outlet: PortId,
    output: &mut impl FusedOutputSink<Out>,
) -> StreamResult<()>
where
    Out: Send + 'static,
{
    self.bump_event()?;
    let Some(&stage_index) = self.stage_by_inlet.get(&inlet) else {
        return Err(StreamError::GraphValidation(format!(
            "inlet {} has no owning stage",
            inlet.as_usize()
        )));
    };
    let transition = self.process_completion(stage_index, inlet)?;
    self.process_transition_direct(transition, graph_outlet, output)
}
/// Acyclic-path pull: asks the stage owning `outlet` for more and applies the resulting
/// transition immediately.
///
/// Requests on cancelled outlets are dropped without counting an event. An outlet without an
/// owning stage is ignored after the event has been counted.
fn process_request_direct<Out>(
    &mut self,
    outlet: PortId,
    graph_outlet: PortId,
    output: &mut impl FusedOutputSink<Out>,
) -> StreamResult<()>
where
    Out: Send + 'static,
{
    if self.is_outlet_cancelled(outlet) {
        return Ok(());
    }
    self.bump_event()?;
    match self.stage_by_outlet.get(&outlet).copied() {
        Some(stage_index) => {
            let transition = self.process_pull(stage_index, outlet)?;
            self.process_transition_direct(transition, graph_outlet, output)
        }
        None => Ok(()),
    }
}
/// Acyclic-path downstream-finish: marks `outlet` as cancelled, notifies its owning stage, and
/// applies the resulting transition immediately.
///
/// An outlet that was already cancelled is ignored without counting an event. Returns
/// `StreamError::GraphValidation` when `outlet` has no owning stage.
fn process_downstream_finish_direct<Out>(
    &mut self,
    outlet: PortId,
    graph_outlet: PortId,
    output: &mut impl FusedOutputSink<Out>,
) -> StreamResult<()>
where
    Out: Send + 'static,
{
    let newly_cancelled = self.cancelled_outlets.insert(outlet);
    if !newly_cancelled {
        return Ok(());
    }
    self.bump_event()?;
    let Some(&stage_index) = self.stage_by_outlet.get(&outlet) else {
        return Err(StreamError::GraphValidation(format!(
            "outlet {} has no owning stage",
            outlet.as_usize()
        )));
    };
    let transition = self.process_downstream_finish(stage_index, outlet)?;
    self.process_transition_direct(transition, graph_outlet, output)
}
/// Cancels `inlet` by finishing the outlet that `edge_by_inlet` maps it to.
///
/// An inlet with no entry in `edge_by_inlet` is a no-op.
///
/// # Errors
///
/// Propagates any error from `process_downstream_finish_direct`.
fn process_cancel_inlet_direct<Out>(
    &mut self,
    inlet: PortId,
    graph_outlet: PortId,
    output: &mut impl FusedOutputSink<Out>,
) -> StreamResult<()>
where
    Out: Send + 'static,
{
    let Some(feeding_outlet) = self.edge_by_inlet.get(&inlet).copied() else {
        return Ok(());
    };
    self.process_downstream_finish_direct(feeding_outlet, graph_outlet, output)
}
/// Routes a value emitted on `outlet`, processing everything it triggers
/// before returning.
///
/// * A cancelled outlet drops the value (after `bump_event` has been called).
/// * The graph outlet downcasts the value, writes it to `output` and then
///   issues a request on the same outlet.
/// * Any other outlet delivers the value to the inlet that `edge_by_outlet`
///   maps it to. When the emitting stage is `Opaque`, `Unzip` or
///   `Partition`, a request is issued on the outlet once the delivery has
///   been processed.
///
/// # Errors
///
/// Returns `StreamError::GraphValidation` when `outlet` is neither the graph
/// outlet nor present in `edge_by_outlet`, and propagates any error from the
/// downcast, the sink or the nested processing calls.
fn process_emit_direct<Out>(
    &mut self,
    outlet: PortId,
    value: DatumValue,
    graph_outlet: PortId,
    output: &mut impl FusedOutputSink<Out>,
) -> StreamResult<()>
where
    Out: Send + 'static,
{
    self.bump_event()?;
    if self.is_outlet_cancelled(outlet) {
        return Ok(());
    }

    if outlet == graph_outlet {
        let typed = downcast_datum(value, "emit", || format!("outlet#{}", outlet.as_usize()))?;
        output.emit(typed)?;
        return self.process_request_direct(outlet, graph_outlet, output);
    }

    let target = self.edge_by_outlet.get(&outlet).copied();
    let Some(inlet) = target else {
        return Err(StreamError::GraphValidation(format!(
            "outlet {} is neither connected nor graph output",
            outlet.as_usize()
        )));
    };

    // Decide before delivering whether the emitting stage gets a follow-up request.
    let emitter = self.stage_by_outlet.get(&outlet).copied();
    let wants_repull = match emitter {
        Some(index) => matches!(
            self.graph.stages[index].spec.kind,
            StageKind::Opaque | StageKind::Unzip { .. } | StageKind::Partition { .. }
        ),
        None => false,
    };

    self.process_deliver_direct(inlet, value, graph_outlet, output)?;
    if wants_repull {
        return self.process_request_direct(outlet, graph_outlet, output);
    }
    Ok(())
}
/// Routes a value emitted on `outlet` by scheduling follow-up events instead
/// of processing them inline.
///
/// * A cancelled outlet drops the value (after `bump_event` has been called).
/// * The graph outlet downcasts the value, writes it to `output` and
///   schedules a `FusedEvent::Request` for the same outlet.
/// * Any other outlet schedules a `FusedEvent::Deliver` to the inlet that
///   `edge_by_outlet` maps it to. When the emitting stage is `Opaque`,
///   `Unzip` or `Partition`, a `FusedEvent::Request` is scheduled first.
///
/// # Errors
///
/// Returns `StreamError::GraphValidation` when `outlet` is neither the graph
/// outlet nor present in `edge_by_outlet`, and propagates any error from
/// `bump_event`, the downcast or the sink.
fn process_emit_cyclic<Out>(
    &mut self,
    outlet: PortId,
    value: DatumValue,
    graph_outlet: PortId,
    output: &mut impl FusedOutputSink<Out>,
) -> StreamResult<()>
where
    Out: Send + 'static,
{
    self.bump_event()?;
    if self.is_outlet_cancelled(outlet) {
        return Ok(());
    }

    if outlet == graph_outlet {
        let typed = downcast_datum(value, "emit", || format!("outlet#{}", outlet.as_usize()))?;
        output.emit(typed)?;
        self.schedule_event(FusedEvent::Request { outlet });
        return Ok(());
    }

    let target = self.edge_by_outlet.get(&outlet).copied();
    let Some(inlet) = target else {
        return Err(StreamError::GraphValidation(format!(
            "outlet {} is neither connected nor graph output",
            outlet.as_usize()
        )));
    };

    let emitter = self.stage_by_outlet.get(&outlet).copied();
    let wants_repull = match emitter {
        Some(index) => matches!(
            self.graph.stages[index].spec.kind,
            StageKind::Opaque | StageKind::Unzip { .. } | StageKind::Partition { .. }
        ),
        None => false,
    };

    // The request is scheduled ahead of the delivery; keep this order.
    if wants_repull {
        self.schedule_event(FusedEvent::Request { outlet });
    }
    self.schedule_event(FusedEvent::Deliver { inlet, value });
    Ok(())
}
/// Propagates completion of `outlet` to the inlet it is connected to,
/// processing the completion before returning.
///
/// Completion of the graph outlet itself needs no propagation and returns
/// `Ok(())` after `bump_event` has been called.
///
/// # Errors
///
/// Returns `StreamError::GraphValidation` when `outlet` is neither the graph
/// outlet nor present in `edge_by_outlet`, and propagates any error from
/// `bump_event` or `process_complete_inlet_direct`.
fn process_complete_outlet_direct<Out>(
    &mut self,
    outlet: PortId,
    graph_outlet: PortId,
    output: &mut impl FusedOutputSink<Out>,
) -> StreamResult<()>
where
    Out: Send + 'static,
{
    self.bump_event()?;
    if outlet == graph_outlet {
        return Ok(());
    }
    let target = self.edge_by_outlet.get(&outlet).copied();
    match target {
        Some(inlet) => self.process_complete_inlet_direct(inlet, graph_outlet, output),
        None => Err(StreamError::GraphValidation(format!(
            "outlet {} is neither connected nor graph output",
            outlet.as_usize()
        ))),
    }
}
/// Propagates completion of `outlet` by scheduling a
/// `FusedEvent::CompleteInlet` for the inlet it is connected to.
///
/// Completion of the graph outlet itself schedules nothing. The output sink
/// is accepted for signature parity with the direct variant and is not used.
///
/// # Errors
///
/// Returns `StreamError::GraphValidation` when `outlet` is neither the graph
/// outlet nor present in `edge_by_outlet`, and propagates any error from
/// `bump_event`.
fn process_complete_outlet_cyclic<Out>(
    &mut self,
    outlet: PortId,
    graph_outlet: PortId,
    _output: &mut impl FusedOutputSink<Out>,
) -> StreamResult<()>
where
    Out: Send + 'static,
{
    self.bump_event()?;
    if outlet == graph_outlet {
        return Ok(());
    }
    let target = self.edge_by_outlet.get(&outlet).copied();
    match target {
        Some(inlet) => {
            self.schedule_event(FusedEvent::CompleteInlet { inlet });
            Ok(())
        }
        None => Err(StreamError::GraphValidation(format!(
            "outlet {} is neither connected nor graph output",
            outlet.as_usize()
        ))),
    }
}
/// Feeds one element that arrived on `inlet` into the stage at `stage_index`
/// and returns the transition (emissions and outlet completions) it produces.
///
/// Behaviour is selected by `stage.spec.kind`; stateful kinds read and update
/// the matching entry of `self.stage_states`.
///
/// # Errors
///
/// * `StreamError::GraphValidation` when the stored state does not match the
///   stage kind, when a stage has an unexpected port layout, or when `inlet`
///   does not belong to the stage (zip, merge-latest).
/// * `StreamError::Failed` for a duplicate merge-sequence number or an
///   out-of-bounds partition index.
/// * Any error returned by the stage's own callbacks (map function, zip
///   function, opaque handlers) or by `single_outlet`.
fn process_stage(
&mut self,
stage_index: usize,
inlet: PortId,
value: DatumValue,
) -> StreamResult<StageTransition> {
let stage = &self.graph.stages[stage_index];
match &stage.spec.kind {
// Pass the value through unchanged to the stage's single outlet.
StageKind::Identity => Ok(StageTransition::emit(StageEmissions::One(
single_outlet(stage)?,
value,
))),
StageKind::Opaque => {
// Opaque stages delegate to a logic object when one is registered
// at this stage index.
if let Some(logic) = self
.opaque_logics
.get_mut(stage_index)
.and_then(|l| l.as_mut())
{
logic.drain_async_callbacks();
let inlet_ref = stage.spec.inlets.iter().find(|i| i.id() == inlet).cloned();
if let Some(inlet_ref) = inlet_ref {
logic.offer_datum(inlet, value)?;
// The handler is taken out of `logic` so it can be called with
// `logic` as an argument, and is put back afterwards even when
// the callback failed; the error is surfaced only after that.
let mut handler = logic.take_in_handler(inlet);
let result = if let Some(ref mut handler) = handler {
let inlet_any = inlet_ref;
handler.on_push(logic, inlet_any)
} else {
Ok(())
};
if let Some(handler) = handler {
logic.restore_in_handler(inlet, handler);
}
result?;
}
self.collect_opaque_emissions(stage, stage_index)
} else {
// No logic registered: behave like an identity stage.
Ok(StageTransition::emit(StageEmissions::One(
single_outlet(stage)?,
value,
)))
}
}
// Apply the type-erased map function and emit its result.
StageKind::Map(map) => Ok(StageTransition::emit(StageEmissions::One(
single_outlet(stage)?,
(map.erased)(value)?,
))),
// Pass-through that additionally counts the crossing.
StageKind::AsyncBoundary => {
self.async_boundary_crossings += 1;
Ok(StageTransition::emit(StageEmissions::One(
single_outlet(stage)?,
value,
)))
}
StageKind::Broadcast => {
broadcast_emissions(&stage.spec.outlets, value).map(StageTransition::emit)
}
StageKind::Balance => {
let outlets = &stage.spec.outlets;
if outlets.is_empty() {
return Err(StreamError::GraphValidation(
"balance has no outlets".into(),
));
}
let StageState::Balance {
next,
cancelled_outlets,
live_outlets,
} = &mut self.stage_states[stage_index]
else {
return Err(StreamError::GraphValidation(
"balance state is missing".into(),
));
};
// With no live outlet left the element is dropped.
if *live_outlets == 0 {
return Ok(StageTransition::none());
}
// Round-robin: starting at `next`, take the first outlet that has
// not been cancelled.
let mut selected = None;
for offset in 0..outlets.len() {
let index = (*next + offset) % outlets.len();
if !cancelled_outlets[index] {
selected = Some(index);
break;
}
}
let index = selected.ok_or_else(|| {
StreamError::GraphValidation("balance has no live outlets".into())
})?;
let outlet = outlets[index].id();
// The next element starts its search just past the chosen outlet.
*next = (index + 1) % outlets.len();
Ok(StageTransition::emit(StageEmissions::One(outlet, value)))
}
// Merge variants forward every element unless the stage has completed.
StageKind::Merge | StageKind::MergePreferred | StageKind::MergePrioritized { .. } => {
let StageState::Merge { completed, .. } = &self.stage_states[stage_index] else {
return Err(StreamError::GraphValidation(
"merge state is missing".into(),
));
};
if *completed {
return Ok(StageTransition::none());
}
Ok(StageTransition::emit(StageEmissions::One(
single_outlet(stage)?,
value,
)))
}
// Concat and Interleave share `StageState::Merge` and forward the same way.
StageKind::Concat | StageKind::Interleave { .. } => {
let StageState::Merge { completed, .. } = &self.stage_states[stage_index] else {
return Err(StreamError::GraphValidation(
"fan-in state is missing".into(),
));
};
if *completed {
return Ok(StageTransition::none());
}
Ok(StageTransition::emit(StageEmissions::One(
single_outlet(stage)?,
value,
)))
}
StageKind::OrElse { primary_inlet } => {
let StageState::OrElse {
primary_emitted,
buffer,
primary_closed,
completed,
..
} = &mut self.stage_states[stage_index]
else {
return Err(StreamError::GraphValidation(
"or-else state is missing".into(),
));
};
if *completed {
return Ok(StageTransition::none());
}
if inlet == *primary_inlet {
// A primary element wins: discard buffered secondary elements.
*primary_emitted = true;
buffer.clear();
Ok(StageTransition::emit(StageEmissions::One(
single_outlet(stage)?,
value,
)))
} else if *primary_emitted {
// Secondary element after the primary has produced output: drop it.
Ok(StageTransition::none())
} else if *primary_closed {
// Primary closed without emitting: the secondary is forwarded.
Ok(StageTransition::emit(StageEmissions::One(
single_outlet(stage)?,
value,
)))
} else {
// Primary still undecided: hold the secondary element back.
buffer.push_back(value);
Ok(StageTransition::none())
}
}
StageKind::Zip(zip) => {
if stage.spec.inlets.len() != 2 {
return Err(StreamError::GraphValidation(format!(
"zip stage {} expected 2 inlets, got {}",
stage.spec.name(),
stage.spec.inlets.len()
)));
}
// Scoped so the mutable borrow of the state ends before the state
// is borrowed again below.
let ready = {
let StageState::Zip {
left_inlet,
right_inlet,
left,
right,
left_pending_complete,
right_pending_complete,
completed,
} = &mut self.stage_states[stage_index]
else {
return Err(StreamError::GraphValidation("zip state is missing".into()));
};
if *completed {
return Ok(StageTransition::none());
}
if inlet == *left_inlet {
left.push_back(value);
} else if inlet == *right_inlet {
right.push_back(value);
} else {
return Err(StreamError::GraphValidation(format!(
"zip inlet {} is not part of the stage",
inlet.as_usize()
)));
}
match (left.front().is_some(), right.front().is_some()) {
(true, true) => {
let left_item =
left.pop_front().expect("zip left buffer had an element");
let right_item =
right.pop_front().expect("zip right buffer had an element");
// A side whose completion was deferred finishes the stage once
// its queue has been emptied by this pair.
let should_complete = (*left_pending_complete && left.is_empty())
|| (*right_pending_complete && right.is_empty());
Some((left_item, right_item, should_complete))
}
_ => None,
}
};
if let Some((left, right, should_complete)) = ready {
let outlet = single_outlet(stage)?;
if should_complete {
let StageState::Zip { completed, .. } = &mut self.stage_states[stage_index]
else {
return Err(StreamError::GraphValidation(
"zip state is missing".into(),
));
};
*completed = true;
}
let mut transition =
StageTransition::emit(StageEmissions::One(outlet, zip(left, right)?));
if should_complete {
transition.completed_outlets.push(outlet);
}
Ok(transition)
} else {
Ok(StageTransition::none())
}
}
StageKind::Unzip { split, .. } => {
// Copy the fast-path descriptors out so the state borrow is released.
let (fan_in, zip_fast) = match &self.stage_states[stage_index] {
StageState::Unzip {
fast_path,
zip_fast_path,
..
} => (*fast_path, *zip_fast_path),
_ => (None, None),
};
// Unzip-then-zip fast path: split and recombine here, emitting on
// the zip stage's outlet without touching the zip stage's queues.
if let Some(zip_fast) = zip_fast {
let (out0_val, out1_val) = split(value);
let zip_stage = &self.graph.stages[zip_fast.zip_stage_index];
let StageKind::Zip(zip) = &zip_stage.spec.kind else {
return Err(StreamError::GraphValidation(
"unzip-zip fast path references a non-zip stage".into(),
));
};
let outlet = single_outlet(zip_stage)?;
let zipped = zip(out0_val, out1_val)?;
return Ok(StageTransition::emit(StageEmissions::One(outlet, zipped)));
}
// Unzip-then-fan-in fast path: both halves are fed straight into the
// target stage's state and the result is emitted on the target's outlet.
if let Some(fast_path) = fan_in {
let (out0_val, out1_val) = split(value);
let target = &self.graph.stages[fast_path.fan_in_stage_index];
match &target.spec.kind {
StageKind::MergeSorted(compare) => {
let result = {
let StageState::MergeSorted {
left,
right,
left_closed,
right_closed,
pending,
completed,
} = &mut self.stage_states[fast_path.fan_in_stage_index]
else {
return Err(StreamError::GraphValidation(
"merge-sorted state is missing".into(),
));
};
if *completed {
return Ok(StageTransition::none());
}
// `target_inlet_indices[0]` says which target inlet the first
// half feeds; index 0 is treated as the left side.
if fast_path.target_inlet_indices[0] == 0 {
left.push_back(out0_val);
right.push_back(out1_val);
} else {
left.push_back(out1_val);
right.push_back(out0_val);
}
// Move every element whose order is already decided into `pending`;
// ties go to the left side.
loop {
let next = match (left.front(), right.front()) {
(Some(l), Some(r)) => {
if compare(l, r) != std::cmp::Ordering::Greater {
left.pop_front()
} else {
right.pop_front()
}
}
(Some(_), None) if *right_closed => left.pop_front(),
(None, Some(_)) if *left_closed => right.pop_front(),
_ => break,
};
if let Some(val) = next {
pending.push_back(val);
} else {
break;
}
}
// Only the head of `pending` is emitted per call; the rest stays queued.
if let Some(output) = pending.pop_front() {
let outlet = single_outlet(target)?;
let all_done = *left_closed
&& *right_closed
&& left.is_empty()
&& right.is_empty()
&& pending.is_empty();
if all_done {
*completed = true;
StageTransition::emit(StageEmissions::One(outlet, output))
.with_completion(vec![outlet])
} else {
StageTransition::emit(StageEmissions::One(outlet, output))
}
} else {
StageTransition::none()
}
};
Ok(result)
}
StageKind::MergeSequence {
extract_sequence,
input_count,
..
} => {
let result = {
let StageState::MergeSequence {
next_sequence,
pending,
completed_count,
output_buffer,
completed,
} = &mut self.stage_states[fast_path.fan_in_stage_index]
else {
return Err(StreamError::GraphValidation(
"merge-sequence state is missing".into(),
));
};
if *completed {
return Ok(StageTransition::none());
}
// Each half is sequenced independently, first half first.
for val in [out0_val, out1_val] {
let seq = extract_sequence(&val);
if seq == *next_sequence {
output_buffer.push_back(val);
*next_sequence += 1;
// Release any parked elements that are now in order.
while let Some(index) =
pending.iter().position(|(s, _)| *s == *next_sequence)
{
let (_, item) = pending.remove(index);
output_buffer.push_back(item);
*next_sequence += 1;
}
} else {
if pending.iter().any(|(s, _)| *s == seq) {
return Err(StreamError::Failed(format!(
"duplicate sequence {seq} on merge sequence"
)));
}
// Out of order: park it, keeping `pending` sorted by sequence.
pending.push((seq, val));
pending.sort_by_key(|(s, _)| *s);
while let Some(index) =
pending.iter().position(|(s, _)| *s == *next_sequence)
{
let (_, item) = pending.remove(index);
output_buffer.push_back(item);
*next_sequence += 1;
}
}
}
if !output_buffer.is_empty() {
let outlet = single_outlet(target)?;
let all_done = *completed_count >= *input_count;
let emissions: Vec<_> =
output_buffer.drain(..).map(|v| (outlet, v)).collect();
if all_done {
*completed = true;
StageTransition::emit(StageEmissions::Many(emissions))
.with_completion(vec![outlet])
} else {
StageTransition::emit(StageEmissions::Many(emissions))
}
} else {
StageTransition::none()
}
};
Ok(result)
}
StageKind::MergeLatest {
input_count,
build_snapshot,
..
} => {
let result = {
let StageState::MergeLatest {
latest,
seen_count,
completed_count,
pending,
completed,
} = &mut self.stage_states[fast_path.fan_in_stage_index]
else {
return Err(StreamError::GraphValidation(
"merge-latest state is missing".into(),
));
};
if *completed {
return Ok(StageTransition::none());
}
let inlets = &target.spec.inlets;
// Pair each half with the target inlet index it feeds.
for (idx, val) in fast_path
.target_inlet_indices
.into_iter()
.zip([out0_val, out1_val])
{
// NOTE(review): the `idx < inlets.len()` guard covers only the
// counter; `latest[idx]` below is indexed unconditionally.
if idx < inlets.len() && latest[idx].is_none() {
*seen_count += 1;
}
latest[idx] = Some(val);
// A snapshot is queued once every input has been seen at least once.
if *seen_count >= *input_count {
let values: Vec<&DatumValue> =
latest.iter().filter_map(|v| v.as_ref()).collect();
let snapshot = build_snapshot(&values);
pending.push_back(snapshot);
}
}
if !pending.is_empty() {
let outlet = single_outlet(target)?;
let all_done = *completed_count >= *input_count;
let emissions: Vec<_> =
pending.drain(..).map(|v| (outlet, v)).collect();
if all_done {
*completed = true;
StageTransition::emit(StageEmissions::Many(emissions))
.with_completion(vec![outlet])
} else {
StageTransition::emit(StageEmissions::Many(emissions))
}
} else {
StageTransition::none()
}
};
Ok(result)
}
// Any other target kind: fall back to the ordinary unzip emission,
// sending each half to its own outlet unless that outlet is cancelled.
_ => {
let transition = {
let StageState::Unzip { cancelled, .. } =
&self.stage_states[stage_index]
else {
return Err(StreamError::GraphValidation(
"unzip state is missing".into(),
));
};
let out0 = stage.spec.outlets.first().map(AnyOutlet::id);
let out1 = stage.spec.outlets.get(1).map(AnyOutlet::id);
let live0 = out0.is_some() && !cancelled[0];
let live1 = out1.is_some() && !cancelled[1];
if live0 && live1 {
StageTransition::emit(StageEmissions::Two(
(out0.unwrap(), out0_val),
(out1.unwrap(), out1_val),
))
} else if live0 {
StageTransition::emit(StageEmissions::One(
out0.unwrap(),
out0_val,
))
} else if live1 {
StageTransition::emit(StageEmissions::One(
out1.unwrap(),
out1_val,
))
} else {
StageTransition::none()
}
};
Ok(transition)
}
}
} else {
// No fast path: split and emit each half on its own outlet, skipping
// outlets that are missing or cancelled.
let transition = {
let StageState::Unzip { cancelled, .. } = &self.stage_states[stage_index]
else {
return Err(StreamError::GraphValidation(
"unzip state is missing".into(),
));
};
let (out0_val, out1_val) = split(value);
let out0 = stage.spec.outlets.first().map(AnyOutlet::id);
let out1 = stage.spec.outlets.get(1).map(AnyOutlet::id);
let live0 = out0.is_some() && !cancelled[0];
let live1 = out1.is_some() && !cancelled[1];
if live0 && live1 {
StageTransition::emit(StageEmissions::Two(
(out0.unwrap(), out0_val),
(out1.unwrap(), out1_val),
))
} else if live0 {
StageTransition::emit(StageEmissions::One(out0.unwrap(), out0_val))
} else if live1 {
StageTransition::emit(StageEmissions::One(out1.unwrap(), out1_val))
} else {
StageTransition::none()
}
};
Ok(transition)
}
}
StageKind::MergeSorted(compare) => {
let result = {
let StageState::MergeSorted {
left,
right,
left_closed,
right_closed,
pending,
completed,
} = &mut self.stage_states[stage_index]
else {
return Err(StreamError::GraphValidation(
"merge-sorted state is missing".into(),
));
};
if *completed {
return Ok(StageTransition::none());
}
// The first inlet is the left side; any other inlet counts as right.
let is_left = stage.spec.inlets.first().is_some_and(|i| i.id() == inlet);
if is_left {
left.push_back(value);
} else {
right.push_back(value);
}
// Move every element whose order is already decided into `pending`;
// ties go to the left side. A lone side is drained only once the
// other side has closed.
loop {
let next = match (left.front(), right.front()) {
(Some(l), Some(r)) => {
if compare(l, r) != std::cmp::Ordering::Greater {
left.pop_front()
} else {
right.pop_front()
}
}
(Some(_), None) if *right_closed => left.pop_front(),
(None, Some(_)) if *left_closed => right.pop_front(),
_ => break,
};
if let Some(val) = next {
pending.push_back(val);
} else {
break;
}
}
// Only the head of `pending` is emitted per call; the rest stays queued.
if let Some(output) = pending.pop_front() {
let outlet = single_outlet(stage)?;
let all_done = *left_closed
&& *right_closed
&& left.is_empty()
&& right.is_empty()
&& pending.is_empty();
if all_done {
*completed = true;
StageTransition::emit(StageEmissions::One(outlet, output))
.with_completion(vec![outlet])
} else {
StageTransition::emit(StageEmissions::One(outlet, output))
}
} else {
StageTransition::none()
}
};
Ok(result)
}
StageKind::MergeSequence {
input_count,
extract_sequence,
..
} => {
let result = {
let StageState::MergeSequence {
next_sequence,
pending,
completed_count,
output_buffer,
completed,
} = &mut self.stage_states[stage_index]
else {
return Err(StreamError::GraphValidation(
"merge-sequence state is missing".into(),
));
};
if *completed {
return Ok(StageTransition::none());
}
let seq = extract_sequence(&value);
if seq == *next_sequence {
// In order: emit it, then release any parked successors.
output_buffer.push_back(value);
*next_sequence += 1;
while let Some(index) =
pending.iter().position(|(s, _)| *s == *next_sequence)
{
let (_, item) = pending.remove(index);
output_buffer.push_back(item);
*next_sequence += 1;
}
} else {
if pending.iter().any(|(s, _)| *s == seq) {
return Err(StreamError::Failed(format!(
"duplicate sequence {seq} on merge sequence"
)));
}
// Out of order: park it, keeping `pending` sorted by sequence.
pending.push((seq, value));
pending.sort_by_key(|(s, _)| *s);
while let Some(index) =
pending.iter().position(|(s, _)| *s == *next_sequence)
{
let (_, item) = pending.remove(index);
output_buffer.push_back(item);
*next_sequence += 1;
}
}
if !output_buffer.is_empty() {
let outlet = single_outlet(stage)?;
// Completion is attached here when every input had already completed.
let all_done = *completed_count >= *input_count;
let emissions: Vec<_> =
output_buffer.drain(..).map(|v| (outlet, v)).collect();
if all_done {
*completed = true;
StageTransition::emit(StageEmissions::Many(emissions))
.with_completion(vec![outlet])
} else {
StageTransition::emit(StageEmissions::Many(emissions))
}
} else {
StageTransition::none()
}
};
Ok(result)
}
StageKind::MergeLatest {
input_count,
build_snapshot,
..
} => {
let result = {
let StageState::MergeLatest {
latest,
seen_count,
completed_count,
pending,
completed,
} = &mut self.stage_states[stage_index]
else {
return Err(StreamError::GraphValidation(
"merge-latest state is missing".into(),
));
};
if *completed {
return Ok(StageTransition::none());
}
let inlet_index = stage
.spec
.inlets
.iter()
.position(|i| i.id() == inlet)
.ok_or_else(|| {
StreamError::GraphValidation(format!(
"merge-latest inlet {} not part of stage",
inlet.as_usize()
))
})?;
// Count an input the first time it delivers a value.
if latest[inlet_index].is_none() {
*seen_count += 1;
}
latest[inlet_index] = Some(value);
// A snapshot is queued once every input has been seen at least once.
if *seen_count >= *input_count {
let values: Vec<&DatumValue> =
latest.iter().filter_map(|v| v.as_ref()).collect();
let snapshot = build_snapshot(&values);
pending.push_back(snapshot);
}
if !pending.is_empty() {
let outlet = single_outlet(stage)?;
let all_done = *completed_count >= *input_count;
let emissions: Vec<_> = pending.drain(..).map(|v| (outlet, v)).collect();
if all_done {
*completed = true;
StageTransition::emit(StageEmissions::Many(emissions))
.with_completion(vec![outlet])
} else {
StageTransition::emit(StageEmissions::Many(emissions))
}
} else {
StageTransition::none()
}
};
Ok(result)
}
StageKind::Partition {
output_count,
partitioner,
..
} => {
let result = {
let StageState::Partition {
pending,
upstream_closed: _,
demand,
cancelled,
output_count: _,
completed,
..
} = &mut self.stage_states[stage_index]
else {
return Err(StreamError::GraphValidation(
"partition state is missing".into(),
));
};
if *completed {
return Ok(StageTransition::none());
}
let idx = partitioner(&value);
if idx >= *output_count {
return Err(StreamError::Failed(format!(
"partitioner returned out-of-bounds index {idx} for {output_count} outputs"
)));
}
// Elements routed to a cancelled outlet are dropped.
if cancelled[idx] {
return Ok(StageTransition::none());
}
if demand[idx] {
// The outlet has outstanding demand: consume it and emit.
demand[idx] = false;
let outlet = stage.spec.outlets[idx].id();
StageTransition::emit(StageEmissions::One(outlet, value))
} else {
// No demand yet: park the element until the outlet is pulled.
*pending = Some((idx, value));
StageTransition::none()
}
};
Ok(result)
}
}
}
/// Handles upstream completion of `inlet` on the stage at `stage_index` and
/// returns the transition (emissions and outlet completions) it produces.
///
/// Behaviour is selected by `stage.spec.kind`; stateful kinds read and update
/// the matching entry of `self.stage_states`. Stages that are already marked
/// completed return an empty transition.
///
/// # Errors
///
/// * `StreamError::GraphValidation` when the stored state does not match the
///   stage kind.
/// * `StreamError::Failed` when a merge-sequence stage sees all inputs
///   complete while elements are still parked waiting for a missing sequence.
/// * Any error returned by opaque handlers or by `single_outlet`.
fn process_completion(
&mut self,
stage_index: usize,
inlet: PortId,
) -> StreamResult<StageTransition> {
let stage = &self.graph.stages[stage_index];
match &stage.spec.kind {
// Single-input pass-through kinds complete their single outlet.
StageKind::Identity | StageKind::Map(_) | StageKind::AsyncBoundary => {
Ok(StageTransition::emit(StageEmissions::None)
.with_completion(vec![single_outlet(stage)?]))
}
StageKind::Opaque => {
if let Some(logic) = self
.opaque_logics
.get_mut(stage_index)
.and_then(|l| l.as_mut())
{
logic.drain_async_callbacks();
logic.complete_inlet_by_id(inlet)?;
let inlet_ref = stage.spec.inlets.iter().find(|i| i.id() == inlet).cloned();
if let Some(inlet_ref) = inlet_ref {
// The handler is taken out of `logic` so it can be called with
// `logic` as an argument, and is put back afterwards even when
// the callback failed; the error is surfaced only after that.
let mut handler = logic.take_in_handler(inlet);
let result = if let Some(ref mut handler) = handler {
let inlet_any = inlet_ref;
handler.on_upstream_finish(logic, inlet_any)
} else {
Ok(())
};
if let Some(handler) = handler {
logic.restore_in_handler(inlet, handler);
}
result?;
}
self.collect_opaque_emissions(stage, stage_index)
} else {
// No logic registered: complete the single outlet like a pass-through.
Ok(StageTransition::emit(StageEmissions::None)
.with_completion(vec![single_outlet(stage)?]))
}
}
// Fan-out kinds complete every outlet.
StageKind::Broadcast | StageKind::Balance => {
Ok(StageTransition::emit(StageEmissions::None)
.with_completion(stage.spec.outlets.iter().map(AnyOutlet::id).collect()))
}
StageKind::Merge | StageKind::MergePreferred | StageKind::MergePrioritized { .. } => {
let StageState::Merge {
open_inputs,
eager_complete,
completed,
} = &mut self.stage_states[stage_index]
else {
return Err(StreamError::GraphValidation(
"merge state is missing".into(),
));
};
if *completed {
return Ok(StageTransition::none());
}
// Guard against underflow if more completions arrive than inputs.
if *open_inputs == 0 {
return Ok(StageTransition::none());
}
*open_inputs -= 1;
// Complete on the first closed input when eager, otherwise on the last.
if *eager_complete || *open_inputs == 0 {
*completed = true;
Ok(StageTransition::emit(StageEmissions::None)
.with_completion(vec![single_outlet(stage)?]))
} else {
Ok(StageTransition::none())
}
}
// Concat and Interleave share `StageState::Merge` and complete the same way.
StageKind::Concat | StageKind::Interleave { .. } => {
let StageState::Merge {
open_inputs,
eager_complete,
completed,
} = &mut self.stage_states[stage_index]
else {
return Err(StreamError::GraphValidation(
"fan-in state is missing".into(),
));
};
if *completed {
return Ok(StageTransition::none());
}
if *open_inputs == 0 {
return Ok(StageTransition::none());
}
*open_inputs -= 1;
if *eager_complete || *open_inputs == 0 {
*completed = true;
Ok(StageTransition::emit(StageEmissions::None)
.with_completion(vec![single_outlet(stage)?]))
} else {
Ok(StageTransition::none())
}
}
StageKind::OrElse { primary_inlet } => {
let StageState::OrElse {
primary_emitted,
buffer,
primary_closed,
secondary_closed,
completed,
..
} = &mut self.stage_states[stage_index]
else {
return Err(StreamError::GraphValidation(
"or-else state is missing".into(),
));
};
if *completed {
return Ok(StageTransition::none());
}
if inlet == *primary_inlet {
*primary_closed = true;
if *primary_emitted {
// The primary produced output, so the secondary is never used.
*completed = true;
buffer.clear();
Ok(StageTransition::emit(StageEmissions::None)
.with_completion(vec![single_outlet(stage)?]))
} else {
// The primary was empty: flush whatever the secondary buffered.
let outlet = single_outlet(stage)?;
let emissions: Vec<_> = buffer.drain(..).map(|v| (outlet, v)).collect();
if *secondary_closed {
*completed = true;
let transition = if emissions.is_empty() {
StageTransition::emit(StageEmissions::None)
} else {
StageTransition::emit(StageEmissions::Many(emissions))
};
Ok(transition.with_completion(vec![outlet]))
} else {
if emissions.is_empty() {
Ok(StageTransition::none())
} else {
Ok(StageTransition::emit(StageEmissions::Many(emissions)))
}
}
}
} else {
*secondary_closed = true;
// The stage finishes here only when the primary already closed
// without emitting; otherwise the primary decides completion.
if *primary_closed && !*primary_emitted {
let outlet = single_outlet(stage)?;
let emissions: Vec<_> = buffer.drain(..).map(|v| (outlet, v)).collect();
*completed = true;
if emissions.is_empty() {
Ok(StageTransition::emit(StageEmissions::None)
.with_completion(vec![outlet]))
} else {
Ok(StageTransition::emit(StageEmissions::Many(emissions))
.with_completion(vec![outlet]))
}
} else {
Ok(StageTransition::none())
}
}
}
StageKind::Zip(_) => {
let StageState::Zip {
left_inlet,
right_inlet,
left,
right,
left_pending_complete,
right_pending_complete,
completed,
} = &mut self.stage_states[stage_index]
else {
return Err(StreamError::GraphValidation("zip state is missing".into()));
};
if *completed {
return Ok(StageTransition::none());
}
let finishes_left = inlet == *left_inlet;
let finishes_right = inlet == *right_inlet;
// A side that closes with nothing queued can never form another
// pair, so the stage completes immediately.
if (finishes_left && left.is_empty()) || (finishes_right && right.is_empty()) {
*completed = true;
Ok(StageTransition::emit(StageEmissions::None)
.with_completion(vec![single_outlet(stage)?]))
} else {
// Queued elements remain: remember the completion so that
// `process_stage` can finish the stage once the queue drains.
if finishes_left {
*left_pending_complete = true;
}
if finishes_right {
*right_pending_complete = true;
}
Ok(StageTransition::none())
}
}
StageKind::Unzip { .. } => {
// Copy the fast-path descriptors out so the state borrow is released.
let (fan_in, zip_fast) = match &self.stage_states[stage_index] {
StageState::Unzip {
fast_path,
zip_fast_path,
..
} => (*fast_path, *zip_fast_path),
_ => (None, None),
};
// Unzip-then-zip fast path: completion is reported on the zip
// stage's outlet.
if let Some(zip_fast) = zip_fast {
let StageState::Unzip {
upstream_closed, ..
} = &mut self.stage_states[stage_index]
else {
return Err(StreamError::GraphValidation(
"unzip state is missing".into(),
));
};
*upstream_closed = true;
let zip_stage = &self.graph.stages[zip_fast.zip_stage_index];
return Ok(StageTransition::emit(StageEmissions::None)
.with_completion(vec![single_outlet(zip_stage)?]));
}
if let Some(fast_path) = fan_in {
let StageState::Unzip {
upstream_closed, ..
} = &mut self.stage_states[stage_index]
else {
return Err(StreamError::GraphValidation(
"unzip state is missing".into(),
));
};
*upstream_closed = true;
// Unzip-then-fan-in fast path: complete both target inlets on the
// fan-in stage and fold the two transitions into one.
let target_stage = &self.graph.stages[fast_path.fan_in_stage_index];
let target_inlets = [
target_stage.spec.inlets[fast_path.target_inlet_indices[0]].id(),
target_stage.spec.inlets[fast_path.target_inlet_indices[1]].id(),
];
let mut combined = StageTransition::none();
for target_inlet in target_inlets {
let t =
self.process_completion(fast_path.fan_in_stage_index, target_inlet)?;
combined.emissions = merge_emissions(combined.emissions, t.emissions);
combined.completed_outlets.extend(t.completed_outlets);
combined.cancelled_inlets.extend(t.cancelled_inlets);
}
Ok(combined)
} else {
// No fast path: mark upstream closed and complete every outlet.
let StageState::Unzip {
upstream_closed, ..
} = &mut self.stage_states[stage_index]
else {
return Err(StreamError::GraphValidation(
"unzip state is missing".into(),
));
};
*upstream_closed = true;
Ok(StageTransition::emit(StageEmissions::None)
.with_completion(stage.spec.outlets.iter().map(AnyOutlet::id).collect()))
}
}
StageKind::MergeSorted(compare) => {
let result = {
let StageState::MergeSorted {
left,
right,
left_closed,
right_closed,
pending,
completed,
} = &mut self.stage_states[stage_index]
else {
return Err(StreamError::GraphValidation(
"merge-sorted state is missing".into(),
));
};
if *completed {
return Ok(StageTransition::none());
}
// The first inlet is the left side; any other inlet counts as right.
let is_left = stage.spec.inlets.first().is_some_and(|i| i.id() == inlet);
if is_left {
*left_closed = true;
} else {
*right_closed = true;
}
// Closing a side can unblock elements waiting on the other side,
// so re-run the merge; ties go to the left side.
loop {
let next = match (left.front(), right.front()) {
(Some(l), Some(r)) => {
if compare(l, r) != std::cmp::Ordering::Greater {
left.pop_front()
} else {
right.pop_front()
}
}
(Some(_), None) if *right_closed => left.pop_front(),
(None, Some(_)) if *left_closed => right.pop_front(),
_ => break,
};
if let Some(val) = next {
pending.push_back(val);
} else {
break;
}
}
if pending.is_empty() {
let all_done =
*left_closed && *right_closed && left.is_empty() && right.is_empty();
if all_done {
*completed = true;
StageTransition::emit(StageEmissions::None)
.with_completion(vec![single_outlet(stage)?])
} else {
StageTransition::none()
}
} else {
// Unlike the push path, everything queued in `pending` is emitted at once.
let outlet = single_outlet(stage)?;
let emissions: Vec<_> = pending.drain(..).map(|v| (outlet, v)).collect();
let all_done =
*left_closed && *right_closed && left.is_empty() && right.is_empty();
if all_done {
*completed = true;
StageTransition::emit(StageEmissions::Many(emissions))
.with_completion(vec![outlet])
} else {
StageTransition::emit(StageEmissions::Many(emissions))
}
}
};
Ok(result)
}
StageKind::MergeSequence { input_count, .. } => {
let result = {
let StageState::MergeSequence {
next_sequence,
pending,
completed_count,
output_buffer,
completed,
} = &mut self.stage_states[stage_index]
else {
return Err(StreamError::GraphValidation(
"merge-sequence state is missing".into(),
));
};
if *completed {
return Ok(StageTransition::none());
}
*completed_count += 1;
if *completed_count >= *input_count && output_buffer.is_empty() {
// Every input is done; parked elements mean a sequence number
// is missing and can no longer arrive.
if !pending.is_empty() {
return Err(StreamError::Failed(format!(
"expected sequence {next_sequence}, but all input ports have pushed or are complete",
)));
}
*completed = true;
StageTransition::emit(StageEmissions::None)
.with_completion(vec![single_outlet(stage)?])
} else {
StageTransition::none()
}
};
Ok(result)
}
StageKind::MergeLatest {
input_count,
eager_complete,
..
} => {
let result = {
let StageState::MergeLatest {
completed_count,
pending,
completed,
..
} = &mut self.stage_states[stage_index]
else {
return Err(StreamError::GraphValidation(
"merge-latest state is missing".into(),
));
};
if *completed {
return Ok(StageTransition::none());
}
*completed_count += 1;
// Complete when every input is done, or on any completion when
// eager and no snapshot is waiting to be emitted.
let all_done = *completed_count >= *input_count;
let eager_done = *eager_complete && pending.is_empty();
if all_done || eager_done {
*completed = true;
StageTransition::emit(StageEmissions::None)
.with_completion(vec![single_outlet(stage)?])
} else {
StageTransition::none()
}
};
Ok(result)
}
StageKind::Partition { .. } => {
let result = {
let StageState::Partition {
pending,
upstream_closed,
completed,
..
} = &mut self.stage_states[stage_index]
else {
return Err(StreamError::GraphValidation(
"partition state is missing".into(),
));
};
if *completed {
return Ok(StageTransition::none());
}
*upstream_closed = true;
// With an element still parked, completion is deferred; `process_pull`
// completes the outlets when it emits that element.
if pending.is_none() {
*completed = true;
StageTransition::emit(StageEmissions::None)
.with_completion(stage.spec.outlets.iter().map(AnyOutlet::id).collect())
} else {
StageTransition::none()
}
};
Ok(result)
}
}
}
/// Handles demand arriving on `outlet` of the stage at `stage_index` and
/// returns the transition it produces.
///
/// Only `Opaque`, `Unzip` and `Partition` stages react to a pull; every other
/// kind returns an empty transition. A stage state that does not match the
/// kind, or an `outlet` that is not one of the stage's outlets, also yields
/// an empty transition here rather than an error.
///
/// # Errors
///
/// Propagates errors from the opaque logic (`set_demand_by_id`, the
/// `on_pull` handler) and from `collect_opaque_emissions`.
fn process_pull(
&mut self,
stage_index: usize,
outlet: PortId,
) -> StreamResult<StageTransition> {
let stage = &self.graph.stages[stage_index];
match &stage.spec.kind {
StageKind::Opaque => {
if let Some(logic) = self
.opaque_logics
.get_mut(stage_index)
.and_then(|l| l.as_mut())
{
logic.drain_async_callbacks();
logic.set_demand_by_id(outlet)?;
let outlet_ref = stage
.spec
.outlets
.iter()
.find(|o| o.id() == outlet)
.cloned();
if let Some(outlet_ref) = outlet_ref {
// The handler is taken out of `logic` so it can be called with
// `logic` as an argument.
let mut handler = logic.take_out_handler(outlet);
let result = if let Some(ref mut handler) = handler {
handler.on_pull(logic, outlet_ref)
} else {
Ok(())
};
// Put the handler back only if it asks to be kept and the callback
// did not install a different handler for this outlet meanwhile.
if let Some(handler) = handler
&& handler.keep_handler()
&& logic.get_out_handler_mut(outlet).is_none()
{
logic.restore_out_handler(outlet, handler);
}
// The callback's error is surfaced only after the restore step.
result?;
}
self.collect_opaque_emissions(stage, stage_index)
} else {
Ok(StageTransition::none())
}
}
StageKind::Unzip { .. } => {
let StageState::Unzip {
demand, cancelled, ..
} = &mut self.stage_states[stage_index]
else {
return Ok(StageTransition::none());
};
let Some(idx) = stage.spec.outlets.iter().position(|o| o.id() == outlet) else {
return Ok(StageTransition::none());
};
// Record demand for one of the two outlets; nothing is emitted here.
if idx < 2 && !cancelled[idx] {
demand[idx] = true;
}
Ok(StageTransition::none())
}
StageKind::Partition { .. } => {
let result = {
let StageState::Partition {
pending,
upstream_closed,
demand,
cancelled,
completed,
..
} = &mut self.stage_states[stage_index]
else {
return Ok(StageTransition::none());
};
if *completed {
return Ok(StageTransition::none());
}
let Some(idx) = stage.spec.outlets.iter().position(|o| o.id() == outlet) else {
return Ok(StageTransition::none());
};
if cancelled[idx] {
return Ok(StageTransition::none());
}
if let Some((p_idx, p_val)) = pending.take() {
if p_idx == idx {
// The parked element was waiting for exactly this outlet: emit it.
let out = stage.spec.outlets[idx].id();
if *upstream_closed {
// Upstream already finished and this was the last element, so
// every outlet completes together with the emission.
*completed = true;
StageTransition::emit(StageEmissions::One(out, p_val))
.with_completion(
stage.spec.outlets.iter().map(AnyOutlet::id).collect(),
)
} else {
StageTransition::emit(StageEmissions::One(out, p_val))
}
} else {
// The parked element targets another outlet: put it back and
// just remember the demand on this one.
*pending = Some((p_idx, p_val));
demand[idx] = true;
StageTransition::none()
}
} else {
demand[idx] = true;
StageTransition::none()
}
};
Ok(result)
}
_ => Ok(StageTransition::none()),
}
}
fn process_downstream_finish(
&mut self,
stage_index: usize,
outlet: PortId,
) -> StreamResult<StageTransition> {
let stage = &self.graph.stages[stage_index];
match &stage.spec.kind {
StageKind::Broadcast => {
let StageState::Broadcast {
cancelled_outlets,
live_outlets,
..
} = &mut self.stage_states[stage_index]
else {
return Err(StreamError::GraphValidation(
"broadcast state is missing".into(),
));
};
let index = stage
.spec
.outlets
.iter()
.position(|candidate| candidate.id() == outlet)
.ok_or_else(|| {
StreamError::GraphValidation(format!(
"broadcast outlet {} is not part of the stage",
outlet.as_usize()
))
})?;
if cancelled_outlets[index] {
return Ok(StageTransition::none());
}
cancelled_outlets[index] = true;
*live_outlets -= 1;
if *live_outlets == 0 {
Ok(StageTransition::none()
.with_cancellations(stage.spec.inlets.iter().map(AnyInlet::id).collect()))
} else {
Ok(StageTransition::none())
}
}
StageKind::Balance => {
let StageState::Balance {
cancelled_outlets,
live_outlets,
..
} = &mut self.stage_states[stage_index]
else {
return Err(StreamError::GraphValidation(
"balance state is missing".into(),
));
};
let index = stage
.spec
.outlets
.iter()
.position(|candidate| candidate.id() == outlet)
.ok_or_else(|| {
StreamError::GraphValidation(format!(
"balance outlet {} is not part of the stage",
outlet.as_usize()
))
})?;
if cancelled_outlets[index] {
return Ok(StageTransition::none());
}
cancelled_outlets[index] = true;
*live_outlets -= 1;
if *live_outlets == 0 {
Ok(StageTransition::none()
.with_cancellations(stage.spec.inlets.iter().map(AnyInlet::id).collect()))
} else {
Ok(StageTransition::none())
}
}
StageKind::Unzip { .. } => {
let StageState::Unzip { cancelled, .. } = &mut self.stage_states[stage_index]
else {
return Err(StreamError::GraphValidation(
"unzip state is missing".into(),
));
};
let idx = stage
.spec
.outlets
.iter()
.position(|o| o.id() == outlet)
.unwrap_or(0);
if idx < 2 && !cancelled[idx] {
cancelled[idx] = true;
let all_cancelled = cancelled.iter().all(|c| *c);
if all_cancelled {
Ok(StageTransition::none().with_cancellations(
stage.spec.inlets.iter().map(AnyInlet::id).collect(),
))
} else {
Ok(StageTransition::none())
}
} else {
Ok(StageTransition::none())
}
}
StageKind::MergeSorted(_)
| StageKind::MergeSequence { .. }
| StageKind::MergeLatest { .. } => Ok(StageTransition::none()
.with_cancellations(stage.spec.inlets.iter().map(AnyInlet::id).collect())),
StageKind::Partition { eager_cancel, .. } => {
let result = {
let StageState::Partition {
pending,
cancelled,
completed,
..
} = &mut self.stage_states[stage_index]
else {
return Err(StreamError::GraphValidation(
"partition state is missing".into(),
));
};
if *completed {
return Ok(StageTransition::none());
}
let Some(idx) = stage.spec.outlets.iter().position(|o| o.id() == outlet) else {
return Ok(StageTransition::none());
};
if cancelled[idx] {
return Ok(StageTransition::none());
}
cancelled[idx] = true;
if let Some((p_idx, _)) = pending
&& *p_idx == idx
{
*pending = None;
}
let all_cancelled = cancelled.iter().all(|c| *c);
if all_cancelled || *eager_cancel {
*completed = true;
StageTransition::none().with_cancellations(
stage.spec.inlets.iter().map(AnyInlet::id).collect(),
)
} else {
StageTransition::none()
}
};
Ok(result)
}
StageKind::Opaque => {
let no_cancelled_outlets = self.cancelled_outlets.is_empty();
if let Some(logic) = self
.opaque_logics
.get_mut(stage_index)
.and_then(|l| l.as_mut())
{
logic.drain_async_callbacks();
logic.downstream_finish_by_id(outlet, "downstream_finish")?;
let outlet_ref = stage
.spec
.outlets
.iter()
.find(|o| o.id() == outlet)
.cloned();
if let Some(outlet_ref) = outlet_ref {
let mut handler = logic.take_out_handler(outlet);
let result = if let Some(ref mut handler) = handler {
handler.on_downstream_finish(logic, outlet_ref)
} else {
Ok(())
};
if let Some(handler) = handler
&& handler.keep_handler()
&& logic.get_out_handler_mut(outlet).is_none()
{
logic.restore_out_handler(outlet, handler);
}
result?;
}
let all_outlets_closed = stage.spec.outlets.iter().all(|candidate| {
logic.is_closed_by_id(candidate.id())
|| (!no_cancelled_outlets
&& self.cancelled_outlets.contains(&candidate.id()))
});
let mut transition = self.collect_opaque_emissions(stage, stage_index)?;
if all_outlets_closed {
transition.cancelled_inlets =
stage.spec.inlets.iter().map(AnyInlet::id).collect();
}
Ok(transition)
} else {
Ok(StageTransition::none()
.with_cancellations(stage.spec.inlets.iter().map(AnyInlet::id).collect()))
}
}
_ => Ok(StageTransition::none()
.with_cancellations(stage.spec.inlets.iter().map(AnyInlet::id).collect())),
}
}
/// Drains the emissions and outlet completions buffered on the opaque logic
/// of the stage at `stage_index` into a `StageTransition`.
///
/// The buffered emissions are packed as `StageEmissions::None`, `One` or
/// `Many` depending on how many entries were pending. The returned transition
/// never carries inlet cancellations. A stage error recorded on the logic is
/// not inspected here.
///
/// When no logic is registered for the stage, the result is an empty
/// transition that completes the stage's first outlet.
///
/// # Errors
/// Propagates the `single_outlet` error when the stage has no logic and no
/// outlet.
fn collect_opaque_emissions(
    &mut self,
    stage: &StageRecord,
    stage_index: usize,
) -> StreamResult<StageTransition> {
    let Some(logic) = self
        .opaque_logics
        .get_mut(stage_index)
        .and_then(|l| l.as_mut())
    else {
        return Ok(StageTransition::emit(StageEmissions::None)
            .with_completion(vec![single_outlet(stage)?]));
    };
    // Taking the buffers leaves empty ones behind for the next callback round.
    let mut pending = std::mem::take(&mut logic.pending_emissions);
    let completed_outlets = std::mem::take(&mut logic.pending_completions);
    let emissions = match pending.len() {
        0 => StageEmissions::None,
        // Exactly one entry: hand it over without keeping the vector around.
        1 => pending
            .pop()
            .map_or(StageEmissions::None, |(port, value)| {
                StageEmissions::One(port, value)
            }),
        _ => StageEmissions::Many(pending),
    };
    Ok(StageTransition {
        emissions,
        completed_outlets,
        cancelled_inlets: Vec::new(),
    })
}
/// Records one executor event, delegating the counting and the limit check
/// to `bump_fused_event` with this executor's counter and configuration.
fn bump_event(&mut self) -> StreamResult<()> {
    let config = self.config;
    let counter = &mut self.events;
    bump_fused_event(counter, config)
}
/// Marks initial demand on every stage outlet that has an outgoing edge.
///
/// Only `Opaque`, `Unzip` and `Partition` stages are touched; all other
/// kinds are left alone. Stages whose logic or state entry is missing are
/// skipped.
fn prime_connected_demands(&mut self) {
    let wired = &self.edge_by_outlet;
    for (stage_index, stage) in self.graph.stages.iter().enumerate() {
        let outlets = &stage.spec.outlets;
        match &stage.spec.kind {
            StageKind::Opaque => {
                let slot = self.opaque_logics.get_mut(stage_index);
                if let Some(logic) = slot.and_then(|entry| entry.as_mut()) {
                    for port in outlets.iter().map(|candidate| candidate.id()) {
                        if wired.contains_key(&port) {
                            // The result is deliberately discarded: this
                            // function has no way to report a failure.
                            let _ = logic.set_demand_by_id(port);
                        }
                    }
                }
            }
            StageKind::Unzip { .. } => {
                if let StageState::Unzip { demand, .. } = &mut self.stage_states[stage_index] {
                    for (position, candidate) in outlets.iter().enumerate() {
                        if wired.contains_key(&candidate.id()) {
                            demand[position] = true;
                        }
                    }
                }
            }
            StageKind::Partition { .. } => {
                if let StageState::Partition {
                    demand,
                    output_count,
                    ..
                } = &mut self.stage_states[stage_index]
                {
                    // Pairing slots with outlets stops at the shorter of the
                    // two; `take` additionally caps it at `output_count`.
                    let slots = demand
                        .iter_mut()
                        .zip(outlets.iter())
                        .take(*output_count);
                    for (flag, candidate) in slots {
                        if wired.contains_key(&candidate.id()) {
                            *flag = true;
                        }
                    }
                }
            }
            _ => {}
        }
    }
}
}
impl<Left, Right> GraphBlueprint<ZipShape<Left, Right>>
where
    Left: Clone + Send + 'static,
    Right: Clone + Send + 'static,
{
    /// Runs the zip graph over `left` and `right` with the default execution
    /// configuration and returns only the collected output pairs.
    ///
    /// # Errors
    /// Propagates any error from [`Self::run_zip_report`].
    pub fn run_zip(&self, left: Vec<Left>, right: Vec<Right>) -> StreamResult<Vec<(Left, Right)>> {
        let report = self.run_zip_report(left, right, FusedExecutionConfig::default())?;
        Ok(report.output)
    }

    /// Runs the zip graph over `left` and `right` and returns the output
    /// together with the executor's event and async-boundary counters.
    ///
    /// Elements are fed in rounds: at most one element from `left`, then at
    /// most one from `right`. Each side is completed right after its last
    /// element has been delivered; a side that starts out empty is completed
    /// before the first round.
    ///
    /// # Errors
    /// Propagates the first error returned by the executor while delivering
    /// an element or completing an inlet.
    pub fn run_zip_report(
        &self,
        left: Vec<Left>,
        right: Vec<Right>,
        config: FusedExecutionConfig,
    ) -> StreamResult<FusedExecutionReport<(Left, Right)>> {
        let mut left_items = left.into_iter();
        let mut right_items = right.into_iter();
        let left_port = self.shape.in0().id();
        let right_port = self.shape.in1().id();
        let out_port = self.shape.outlet().id();
        let mut executor = FusedExecutor::new(self, config);
        let mut output = Vec::with_capacity(left_items.len().min(right_items.len()));
        // Each flag flips to `false` once completion was signalled for that
        // side, so completion is sent at most once per inlet.
        let mut left_open = true;
        let mut right_open = true;
        {
            let mut sink = VecOutputSink {
                output: &mut output,
            };
            if left_items.len() == 0 {
                executor.complete(left_port, out_port, &mut sink)?;
                left_open = false;
            }
            if right_items.len() == 0 {
                executor.complete(right_port, out_port, &mut sink)?;
                right_open = false;
            }
            loop {
                if left_items.len() == 0 && right_items.len() == 0 {
                    break;
                }
                if let Some(item) = left_items.next() {
                    executor.deliver(left_port, datum(item), out_port, &mut sink)?;
                    if left_open && left_items.len() == 0 {
                        executor.complete(left_port, out_port, &mut sink)?;
                        left_open = false;
                    }
                }
                if let Some(item) = right_items.next() {
                    executor.deliver(right_port, datum(item), out_port, &mut sink)?;
                    if right_open && right_items.len() == 0 {
                        executor.complete(right_port, out_port, &mut sink)?;
                        right_open = false;
                    }
                }
            }
        }
        let events = executor.events;
        let async_boundary_crossings = executor.async_boundary_crossings;
        Ok(FusedExecutionReport {
            output,
            events,
            async_boundary_crossings,
        })
    }
}
/// Detects the shape "two-outlet stage whose outlets both feed the same
/// merge-style stage" and describes it as an `UnzipFanInFastPath`.
///
/// Returns `None` unless `stage` has exactly two outlets, both outlets have
/// an edge, both edges end at inlets owned by one and the same stage, and
/// that stage is `MergeSorted`, `MergeSequence` or `MergeLatest`. On success
/// the result carries the target stage index and, per outlet, the position
/// of the connected inlet within the target's inlet list.
fn unzip_fan_in_fast_path<S: Shape>(
    stage: &StageRecord,
    graph: &GraphBlueprint<S>,
    edge_by_outlet: &HashMap<PortId, PortId>,
    stage_by_inlet: &HashMap<PortId, usize>,
) -> Option<UnzipFanInFastPath> {
    let outlets = &stage.spec.outlets;
    if outlets.len() != 2 {
        return None;
    }
    let first_inlet = *edge_by_outlet.get(&outlets[0].id())?;
    let second_inlet = *edge_by_outlet.get(&outlets[1].id())?;
    let fan_in_stage_index = *stage_by_inlet.get(&first_inlet)?;
    if *stage_by_inlet.get(&second_inlet)? != fan_in_stage_index {
        return None;
    }
    let fan_in = graph.stages.get(fan_in_stage_index)?;
    match fan_in.spec.kind {
        StageKind::MergeSorted(_)
        | StageKind::MergeSequence { .. }
        | StageKind::MergeLatest { .. } => {}
        _ => return None,
    }
    let slot_of = |inlet: PortId| {
        fan_in
            .spec
            .inlets
            .iter()
            .position(|candidate| candidate.id() == inlet)
    };
    let target_inlet_indices = [slot_of(first_inlet)?, slot_of(second_inlet)?];
    Some(UnzipFanInFastPath {
        fan_in_stage_index,
        target_inlet_indices,
    })
}
/// Detects the shape "two-outlet stage whose outlets both feed the same
/// `Zip` stage" and describes it as an `UnzipZipFastPath`.
///
/// Returns `None` unless `stage` has exactly two outlets, both outlets have
/// an edge, both edges end at inlets owned by one and the same stage, and
/// that stage's kind is `StageKind::Zip`.
fn unzip_zip_fast_path<S: Shape>(
    stage: &StageRecord,
    graph: &GraphBlueprint<S>,
    edge_by_outlet: &HashMap<PortId, PortId>,
    stage_by_inlet: &HashMap<PortId, usize>,
) -> Option<UnzipZipFastPath> {
    let outlets = &stage.spec.outlets;
    if outlets.len() != 2 {
        return None;
    }
    // Follows the edge leaving the outlet at `index` to the stage that owns
    // the inlet on the other end.
    let consumer_of = |index: usize| -> Option<usize> {
        let inlet = edge_by_outlet.get(&outlets[index].id())?;
        stage_by_inlet.get(inlet).copied()
    };
    let zip_stage_index = consumer_of(0)?;
    if consumer_of(1)? != zip_stage_index {
        return None;
    }
    let consumer = graph.stages.get(zip_stage_index)?;
    match consumer.spec.kind {
        StageKind::Zip(_) => Some(UnzipZipFastPath { zip_stage_index }),
        _ => None,
    }
}
/// Increments the event counter and checks it against `config.event_limit`.
///
/// The counter is incremented before the comparison, so it keeps the new
/// value even when the limit is exceeded.
///
/// # Errors
/// Returns `StreamError::EventLimitExceeded` carrying the configured limit
/// once the counter is strictly greater than that limit.
pub(crate) fn bump_fused_event(
    events: &mut usize,
    config: FusedExecutionConfig,
) -> StreamResult<()> {
    let limit = config.event_limit;
    *events += 1;
    if *events <= limit {
        return Ok(());
    }
    Err(StreamError::EventLimitExceeded { limit })
}
/// Builds the emissions that deliver `value` to every outlet in `outlets`.
///
/// Every outlet except the last receives a `clone_box()` copy; the last one
/// receives `value` itself. One outlet yields `StageEmissions::One`, two
/// yield `Two`, and three or more yield `Many`, always in outlet order.
///
/// # Errors
/// Returns `StreamError::GraphValidation` when `outlets` is empty.
fn broadcast_emissions(outlets: &[AnyOutlet], value: DatumValue) -> StreamResult<StageEmissions> {
    match outlets.split_last() {
        None => Err(StreamError::GraphValidation(
            "broadcast has no outlets".into(),
        )),
        Some((only, [])) => Ok(StageEmissions::One(only.id(), value)),
        Some((last, [first])) => Ok(StageEmissions::Two(
            (first.id(), value.clone_box()),
            (last.id(), value),
        )),
        Some((last, leading)) => {
            let mut fanned_out = Vec::with_capacity(outlets.len());
            fanned_out.extend(
                leading
                    .iter()
                    .map(|target| (target.id(), value.clone_box())),
            );
            // The original value is moved into the final slot, saving one clone.
            fanned_out.push((last.id(), value));
            Ok(StageEmissions::Many(fanned_out))
        }
    }
}
/// Returns the id of the first outlet declared on `stage`.
///
/// # Errors
/// Returns `StreamError::GraphValidation` naming the stage when it declares
/// no outlet at all.
fn single_outlet(stage: &StageRecord) -> StreamResult<PortId> {
    match stage.spec.outlets.first() {
        Some(outlet) => Ok(outlet.id()),
        None => Err(StreamError::GraphValidation(format!(
            "stage {} has no outlet",
            stage.spec.name()
        ))),
    }
}
/// Combines two emission sets into one, keeping every entry of `first`
/// ahead of every entry of `second`.
///
/// If either side is `StageEmissions::None` the other side is returned
/// unchanged. Every other combination is flattened into
/// `StageEmissions::Many`.
fn merge_emissions(first: StageEmissions, second: StageEmissions) -> StageEmissions {
    match (first, second) {
        (StageEmissions::None, other) | (other, StageEmissions::None) => other,
        (StageEmissions::One(p1, v1), StageEmissions::One(p2, v2)) => {
            StageEmissions::Many(vec![(p1, v1), (p2, v2)])
        }
        (StageEmissions::One(p, v), StageEmissions::Many(mut rest)) => {
            // `first` must stay in front; appending here would reorder the
            // merged stream relative to every other arm.
            rest.insert(0, (p, v));
            StageEmissions::Many(rest)
        }
        (StageEmissions::Many(mut lead), StageEmissions::One(p, v)) => {
            lead.push((p, v));
            StageEmissions::Many(lead)
        }
        (StageEmissions::Many(mut lead), StageEmissions::Many(rest)) => {
            lead.extend(rest);
            StageEmissions::Many(lead)
        }
        // Remaining combinations involve `Two`; flatten both sides in order.
        (a, b) => {
            let mut all = Vec::new();
            push_emissions(&mut all, a);
            push_emissions(&mut all, b);
            StageEmissions::Many(all)
        }
    }
}
/// Appends every `(port, value)` entry carried by `emissions` to `out`,
/// preserving the order in which the entries are stored.
fn push_emissions(out: &mut Vec<(PortId, DatumValue)>, emissions: StageEmissions) {
    match emissions {
        StageEmissions::None => {}
        StageEmissions::One(port, value) => out.push((port, value)),
        StageEmissions::Two(first, second) => out.extend([first, second]),
        StageEmissions::Many(mut batch) => out.append(&mut batch),
    }
}