Skip to main content

DagExecutor

Struct DagExecutor 

Source
pub struct DagExecutor { /* private fields */ }
Expand description

Ring 0 DAG executor for event processing.

Processes events through a finalized StreamingDag in topological order using the pre-computed RoutingTable for O(1) dispatch.

§Construction

let dag = DagBuilder::new()
    .source("src", schema.clone())
    .operator("transform", schema.clone())
    .connect("src", "transform")
    .sink_for("transform", "out", schema.clone())
    .build()?;

let mut executor = DagExecutor::from_dag(&dag);
executor.register_operator(transform_id, Box::new(my_operator));
executor.process_event(src_id, event)?;
let outputs = executor.take_sink_outputs(out_id);

Implementations§

Source§

impl DagExecutor

Source

pub fn from_dag(dag: &StreamingDag) -> Self

Creates a new executor from a finalized StreamingDag.

Allocates all per-node state (input queues, runtimes, sink buffers) up front in Ring 2. The hot path (process_event) is allocation-free.

§Arguments
  • dag - A finalized StreamingDag topology
Source

pub fn register_operator(&mut self, node: NodeId, operator: Box<dyn Operator>)

Registers an operator for a node.

Nodes without registered operators act as passthrough: events are forwarded to downstream nodes unchanged. This is the default for source and sink nodes.

§Arguments
  • node - The node ID to register the operator for
  • operator - The operator implementation
Source

pub fn process_event( &mut self, source_node: NodeId, event: Event, ) -> Result<(), DagError>

Processes an event from a source node through the entire DAG.

The event is enqueued at the source node, then all nodes are processed in topological order. Events produced by operators are routed to downstream nodes via the RoutingTable. Sink outputs are collected and can be retrieved via take_sink_outputs().

§Arguments
  • source_node - The source node to inject the event
  • event - The event to process
§Errors

Returns DagError::NodeNotFound if the source node is out of bounds.

Source

pub fn take_sink_outputs(&mut self, sink_node: NodeId) -> Vec<Event>

Takes collected sink outputs for a given sink node.

Returns all events that reached this sink during prior process_event calls, draining the internal buffer.

Source

pub fn take_all_sink_outputs(&mut self) -> FxHashMap<NodeId, Vec<Event>>

Takes all sink outputs across all sink nodes.

Source

pub fn metrics(&self) -> &DagExecutorMetrics

Returns a reference to the executor metrics.

Source

pub fn reset_metrics(&mut self)

Resets all executor metrics to zero.

Source

pub fn source_nodes(&self) -> &[NodeId]

Returns the source node IDs.

Source

pub fn sink_nodes(&self) -> &[NodeId]

Returns the sink node IDs.

Source

pub fn node_type(&self, node: NodeId) -> Option<DagNodeType>

Returns the node type for a given node ID.

Source

pub fn checkpoint(&self) -> FxHashMap<NodeId, OperatorState>

Checkpoints all registered operators.

Returns a map of NodeId to OperatorState for all nodes that have registered operators.

Source

pub fn restore( &mut self, states: &FxHashMap<NodeId, OperatorState>, ) -> Result<(), DagError>

Restores operator state from a checkpoint snapshot.

Iterates the provided states and calls operator.restore() on each registered operator.

§Errors

Returns DagError::RestoreFailed if any operator fails to restore.

Source

pub fn inject_events(&mut self, node_id: NodeId, events: Vec<Event>)

Injects events into a node’s input queue.

Used during recovery to repopulate queues with buffered events.

Source

pub fn input_count(&self, node_id: NodeId) -> usize

Returns the number of incoming edges for a node.

Source

pub fn process_checkpoint_barrier( &mut self, _barrier: &CheckpointBarrier, ) -> FxHashMap<NodeId, OperatorState>

Snapshots all registered operators in topological order.

Takes the barrier for consistency (future use with epoch tracking). In the synchronous single-threaded executor, topological ordering guarantees upstream-first snapshots.

Trait Implementations§

Source§

impl Debug for DagExecutor

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> ArchivePointee for T

Source§

type ArchivedMetadata = ()

The archived version of the pointer metadata for this type.
Source§

fn pointer_metadata( _: &<T as ArchivePointee>::ArchivedMetadata, ) -> <T as Pointee>::Metadata

Converts some archived metadata to the pointer metadata for itself.
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> LayoutRaw for T

Source§

fn layout_raw(_: <T as Pointee>::Metadata) -> Result<Layout, LayoutError>

Returns the layout of the type.
Source§

impl<T, N1, N2> Niching<NichedOption<T, N1>> for N2
where T: SharedNiching<N1, N2>, N1: Niching<T>, N2: Niching<T>,

Source§

unsafe fn is_niched(niched: *const NichedOption<T, N1>) -> bool

Returns whether the given value has been niched. Read more
Source§

fn resolve_niched(out: Place<NichedOption<T, N1>>)

Writes data to out indicating that a T is niched.
Source§

impl<T> Pointee for T

Source§

type Metadata = ()

The metadata type for pointers and references to this type.
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more