pub struct DagExecutor { /* private fields */ }Expand description
Ring 0 DAG executor for event processing.
Processes events through a finalized StreamingDag in topological order
using the pre-computed RoutingTable for O(1) dispatch.
§Construction
let dag = DagBuilder::new()
.source("src", schema.clone())
.operator("transform", schema.clone())
.connect("src", "transform")
.sink_for("transform", "out", schema.clone())
.build()?;
let mut executor = DagExecutor::from_dag(&dag);
executor.register_operator(transform_id, Box::new(my_operator));
executor.process_event(src_id, event)?;
let outputs = executor.take_sink_outputs(out_id);Implementations§
Source§impl DagExecutor
impl DagExecutor
Sourcepub fn from_dag(dag: &StreamingDag) -> Self
pub fn from_dag(dag: &StreamingDag) -> Self
Creates a new executor from a finalized StreamingDag.
Allocates all per-node state (input queues, runtimes, sink buffers)
up front in Ring 2. The hot path (process_event) is allocation-free.
§Arguments
dag- A finalizedStreamingDagtopology
Sourcepub fn register_operator(&mut self, node: NodeId, operator: Box<dyn Operator>)
pub fn register_operator(&mut self, node: NodeId, operator: Box<dyn Operator>)
Registers an operator for a node.
Nodes without registered operators act as passthrough: events are forwarded to downstream nodes unchanged. This is the default for source and sink nodes.
§Arguments
node- The node ID to register the operator foroperator- The operator implementation
Sourcepub fn process_event(
&mut self,
source_node: NodeId,
event: Event,
) -> Result<(), DagError>
pub fn process_event( &mut self, source_node: NodeId, event: Event, ) -> Result<(), DagError>
Processes an event from a source node through the entire DAG.
The event is enqueued at the source node, then all nodes are processed
in topological order. Events produced by operators are routed to
downstream nodes via the RoutingTable. Sink outputs are collected
and can be retrieved via take_sink_outputs().
§Arguments
source_node- The source node to inject the eventevent- The event to process
§Errors
Returns DagError::NodeNotFound if the source node is out of bounds.
Sourcepub fn take_sink_outputs(&mut self, sink_node: NodeId) -> Vec<Event>
pub fn take_sink_outputs(&mut self, sink_node: NodeId) -> Vec<Event>
Takes collected sink outputs for a given sink node.
Returns all events that reached this sink during prior
process_event calls, draining the internal buffer.
Sourcepub fn take_all_sink_outputs(&mut self) -> FxHashMap<NodeId, Vec<Event>>
pub fn take_all_sink_outputs(&mut self) -> FxHashMap<NodeId, Vec<Event>>
Takes all sink outputs across all sink nodes.
Sourcepub fn metrics(&self) -> &DagExecutorMetrics
pub fn metrics(&self) -> &DagExecutorMetrics
Returns a reference to the executor metrics.
Sourcepub fn reset_metrics(&mut self)
pub fn reset_metrics(&mut self)
Resets all executor metrics to zero.
Sourcepub fn source_nodes(&self) -> &[NodeId]
pub fn source_nodes(&self) -> &[NodeId]
Returns the source node IDs.
Sourcepub fn sink_nodes(&self) -> &[NodeId]
pub fn sink_nodes(&self) -> &[NodeId]
Returns the sink node IDs.
Sourcepub fn node_type(&self, node: NodeId) -> Option<DagNodeType>
pub fn node_type(&self, node: NodeId) -> Option<DagNodeType>
Returns the node type for a given node ID.
Sourcepub fn checkpoint(&self) -> FxHashMap<NodeId, OperatorState>
pub fn checkpoint(&self) -> FxHashMap<NodeId, OperatorState>
Checkpoints all registered operators.
Returns a map of NodeId to OperatorState for all nodes
that have registered operators.
Sourcepub fn restore(
&mut self,
states: &FxHashMap<NodeId, OperatorState>,
) -> Result<(), DagError>
pub fn restore( &mut self, states: &FxHashMap<NodeId, OperatorState>, ) -> Result<(), DagError>
Restores operator state from a checkpoint snapshot.
Iterates the provided states and calls operator.restore() on each
registered operator.
§Errors
Returns DagError::RestoreFailed if any operator fails to restore.
Sourcepub fn inject_events(&mut self, node_id: NodeId, events: Vec<Event>)
pub fn inject_events(&mut self, node_id: NodeId, events: Vec<Event>)
Injects events into a node’s input queue.
Used during recovery to repopulate queues with buffered events.
Sourcepub fn input_count(&self, node_id: NodeId) -> usize
pub fn input_count(&self, node_id: NodeId) -> usize
Returns the number of incoming edges for a node.
Sourcepub fn process_checkpoint_barrier(
&mut self,
_barrier: &CheckpointBarrier,
) -> FxHashMap<NodeId, OperatorState>
pub fn process_checkpoint_barrier( &mut self, _barrier: &CheckpointBarrier, ) -> FxHashMap<NodeId, OperatorState>
Snapshots all registered operators in topological order.
Takes the barrier for consistency (future use with epoch tracking). In the synchronous single-threaded executor, topological ordering guarantees upstream-first snapshots.
Trait Implementations§
Auto Trait Implementations§
impl Freeze for DagExecutor
impl !RefUnwindSafe for DagExecutor
impl Send for DagExecutor
impl !Sync for DagExecutor
impl Unpin for DagExecutor
impl !UnwindSafe for DagExecutor
Blanket Implementations§
Source§impl<T> ArchivePointee for T
impl<T> ArchivePointee for T
Source§type ArchivedMetadata = ()
type ArchivedMetadata = ()
Source§fn pointer_metadata(
_: &<T as ArchivePointee>::ArchivedMetadata,
) -> <T as Pointee>::Metadata
fn pointer_metadata( _: &<T as ArchivePointee>::ArchivedMetadata, ) -> <T as Pointee>::Metadata
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Source§impl<T> Instrument for T
impl<T> Instrument for T
Source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
Source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> LayoutRaw for T
impl<T> LayoutRaw for T
Source§fn layout_raw(_: <T as Pointee>::Metadata) -> Result<Layout, LayoutError>
fn layout_raw(_: <T as Pointee>::Metadata) -> Result<Layout, LayoutError>
Source§impl<T, N1, N2> Niching<NichedOption<T, N1>> for N2
impl<T, N1, N2> Niching<NichedOption<T, N1>> for N2
Source§unsafe fn is_niched(niched: *const NichedOption<T, N1>) -> bool
unsafe fn is_niched(niched: *const NichedOption<T, N1>) -> bool
Source§fn resolve_niched(out: Place<NichedOption<T, N1>>)
fn resolve_niched(out: Place<NichedOption<T, N1>>)
out indicating that a T is niched.