// laminar_core/dag/executor.rs

1//! Ring 0 DAG executor for event processing.
2//!
3//! [`DagExecutor`] processes events through a finalized [`StreamingDag`] in
4//! topological order. It uses the pre-computed [`RoutingTable`] (F-DAG-002)
5//! for O(1) dispatch and integrates with the [`Operator`] trait for operator
6//! invocation.
7//!
8//! # Architecture
9//!
10//! ```text
11//! ┌──────────────────────────────────────────────────────────────────┐
12//! │                      RING 0: HOT PATH                            │
13//! │                                                                  │
14//! │  process_event(source, event)                                    │
15//! │       │                                                          │
16//! │       ▼                                                          │
17//! │  ┌──────────┐   topological   ┌───────────┐   route_output()    │
18//! │  │  enqueue  │──────order────▶│  operator  │──────────────────┐  │
19//! │  │  (input   │                │  .process()│                  │  │
20//! │  │   queue)  │                └───────────┘                  │  │
21//! │  └──────────┘                                                │  │
22//! │       ▲                                                      │  │
23//! │       │                  ┌─────────────────┐                 │  │
24//! │       └──────────────────│  RoutingTable   │◀────────────────┘  │
25//! │         enqueue targets  │  O(1) lookup    │                    │
26//! │                          └─────────────────┘                    │
27//! └──────────────────────────────────────────────────────────────────┘
28//! ```
29//!
30//! # Latency Budget
31//!
32//! | Component | Budget |
33//! |-----------|--------|
34//! | Routing table lookup | < 50ns |
35//! | Operator dispatch | < 200ns |
36//! | Multicast to N consumers | < 100ns |
37//! | State access | < 200ns |
38//! | **Total** | **< 500ns** |
39
40use std::collections::VecDeque;
41
42use fxhash::FxHashMap;
43use smallvec::SmallVec;
44
45use crate::alloc::HotPathGuard;
46use crate::operator::{Event, Operator, OperatorContext, OperatorState, Output, OutputVec};
47use crate::state::InMemoryStore;
48use crate::time::{BoundedOutOfOrdernessGenerator, TimerService};
49
50use super::checkpoint::CheckpointBarrier;
51use super::error::DagError;
52use super::routing::RoutingTable;
53use super::topology::{DagNodeType, NodeId, StreamingDag};
54
/// Per-node runtime state (timer service, state store, watermark generator).
///
/// Created during executor construction (Ring 2) and used during event
/// processing (Ring 0). Temporarily moved out of the executor (via
/// `Option::take`) during operator dispatch to satisfy Rust's borrow
/// checker, then put back after the node's input queue is drained.
struct NodeRuntime {
    /// Timer service for this node.
    timer_service: TimerService,
    /// State store for this node (boxed trait object; defaults to in-memory).
    state_store: Box<dyn crate::state::StateStore>,
    /// Watermark generator for this node.
    watermark_generator: Box<dyn crate::time::WatermarkGenerator>,
}
68
69impl Default for NodeRuntime {
70    fn default() -> Self {
71        Self {
72            timer_service: TimerService::new(),
73            state_store: Box::new(InMemoryStore::new()),
74            watermark_generator: Box::new(BoundedOutOfOrdernessGenerator::new(0)),
75        }
76    }
77}
78
/// Metrics tracked by the DAG executor.
///
/// Counters are updated during event processing and can be read
/// at any time for observability. All counters are monotonically
/// increasing until [`DagExecutor::reset_metrics`] is called.
#[derive(Debug, Clone, Default)]
pub struct DagExecutorMetrics {
    /// Total events processed through operator dispatch.
    pub events_processed: u64,
    /// Total events routed to downstream nodes (excludes sink collection).
    pub events_routed: u64,
    /// Total multicast dispatches (fan-out to > 1 target).
    pub multicast_publishes: u64,
    /// Total backpressure stalls encountered.
    pub backpressure_stalls: u64,
    /// Total nodes skipped during a pass (empty input queue).
    pub nodes_skipped: u64,
}
96
/// Ring 0 DAG executor for event processing.
///
/// Processes events through a finalized [`StreamingDag`] in topological order
/// using the pre-computed [`RoutingTable`] for O(1) dispatch.
///
/// All per-node collections are indexed by `NodeId.0`, so node ids are
/// expected to be dense; `slot_count` is `max(NodeId.0) + 1`.
///
/// # Construction
///
/// ```rust,ignore
/// let dag = DagBuilder::new()
///     .source("src", schema.clone())
///     .operator("transform", schema.clone())
///     .connect("src", "transform")
///     .sink_for("transform", "out", schema.clone())
///     .build()?;
///
/// let mut executor = DagExecutor::from_dag(&dag);
/// executor.register_operator(transform_id, Box::new(my_operator));
/// executor.process_event(src_id, event)?;
/// let outputs = executor.take_sink_outputs(out_id);
/// ```
pub struct DagExecutor {
    /// Registered operators, indexed by `NodeId.0`. `None` = passthrough.
    operators: Vec<Option<Box<dyn Operator>>>,
    /// Per-node runtime state (timer, state store, watermark generator).
    /// `Option` so the runtime can be moved out during dispatch.
    runtimes: Vec<Option<NodeRuntime>>,
    /// Pre-allocated input queues per node, indexed by `NodeId.0`.
    input_queues: Vec<VecDeque<Event>>,
    /// Collected sink outputs, indexed by `NodeId.0`.
    sink_outputs: Vec<Vec<Event>>,
    /// Pre-computed routing table for O(1) dispatch.
    routing: RoutingTable,
    /// Topological execution order (from the finalized DAG).
    execution_order: Vec<NodeId>,
    /// Source node IDs.
    source_nodes: Vec<NodeId>,
    /// Sink node IDs.
    sink_nodes: Vec<NodeId>,
    /// Node types, indexed by `NodeId.0`.
    node_types: Vec<DagNodeType>,
    /// Total number of node slots allocated (bound for all index checks).
    slot_count: usize,
    /// Number of incoming edges per node, indexed by `NodeId.0`.
    input_counts: Vec<usize>,
    /// Temporary buffer for draining input queues (avoids allocation
    /// on the hot path; swapped in/out of `process_node`).
    temp_events: Vec<Event>,
    /// Executor metrics.
    metrics: DagExecutorMetrics,
}
145
146impl DagExecutor {
147    /// Creates a new executor from a finalized [`StreamingDag`].
148    ///
149    /// Allocates all per-node state (input queues, runtimes, sink buffers)
150    /// up front in Ring 2. The hot path (`process_event`) is allocation-free.
151    ///
152    /// # Arguments
153    ///
154    /// * `dag` - A finalized `StreamingDag` topology
155    #[must_use]
156    pub fn from_dag(dag: &StreamingDag) -> Self {
157        let slot_count = dag.nodes().keys().map(|n| n.0).max().map_or(0, |n| n + 1) as usize;
158
159        let routing = RoutingTable::from_dag(dag);
160
161        let mut operators = Vec::with_capacity(slot_count);
162        let mut runtimes = Vec::with_capacity(slot_count);
163        let mut input_queues = Vec::with_capacity(slot_count);
164        let mut sink_outputs = Vec::with_capacity(slot_count);
165        let mut node_types = Vec::with_capacity(slot_count);
166        let mut input_counts = vec![0usize; slot_count];
167
168        for _ in 0..slot_count {
169            operators.push(None);
170            runtimes.push(Some(NodeRuntime::default()));
171            input_queues.push(VecDeque::with_capacity(16));
172            sink_outputs.push(Vec::new());
173            node_types.push(DagNodeType::StatelessOperator);
174        }
175
176        // Populate node types and input counts from the DAG.
177        for node in dag.nodes().values() {
178            let idx = node.id.0 as usize;
179            if idx < slot_count {
180                node_types[idx] = node.node_type;
181                input_counts[idx] = dag.incoming_edge_count(node.id);
182            }
183        }
184
185        Self {
186            operators,
187            runtimes,
188            input_queues,
189            sink_outputs,
190            routing,
191            execution_order: dag.execution_order().to_vec(),
192            source_nodes: dag.sources().to_vec(),
193            sink_nodes: dag.sinks().to_vec(),
194            node_types,
195            slot_count,
196            input_counts,
197            temp_events: Vec::with_capacity(64),
198            metrics: DagExecutorMetrics::default(),
199        }
200    }
201
202    /// Registers an operator for a node.
203    ///
204    /// Nodes without registered operators act as passthrough: events are
205    /// forwarded to downstream nodes unchanged. This is the default for
206    /// source and sink nodes.
207    ///
208    /// # Arguments
209    ///
210    /// * `node` - The node ID to register the operator for
211    /// * `operator` - The operator implementation
212    pub fn register_operator(&mut self, node: NodeId, operator: Box<dyn Operator>) {
213        let idx = node.0 as usize;
214        if idx < self.slot_count {
215            self.operators[idx] = Some(operator);
216        }
217    }
218
219    /// Processes an event from a source node through the entire DAG.
220    ///
221    /// The event is enqueued at the source node, then all nodes are processed
222    /// in topological order. Events produced by operators are routed to
223    /// downstream nodes via the [`RoutingTable`]. Sink outputs are collected
224    /// and can be retrieved via [`take_sink_outputs()`](Self::take_sink_outputs).
225    ///
226    /// # Arguments
227    ///
228    /// * `source_node` - The source node to inject the event
229    /// * `event` - The event to process
230    ///
231    /// # Errors
232    ///
233    /// Returns [`DagError::NodeNotFound`] if the source node is out of bounds.
234    pub fn process_event(&mut self, source_node: NodeId, event: Event) -> Result<(), DagError> {
235        let idx = source_node.0 as usize;
236        if idx >= self.slot_count {
237            return Err(DagError::NodeNotFound(format!("{source_node}")));
238        }
239
240        self.input_queues[idx].push_back(event);
241        self.process_dag();
242        Ok(())
243    }
244
245    /// Takes collected sink outputs for a given sink node.
246    ///
247    /// Returns all events that reached this sink during prior
248    /// `process_event` calls, draining the internal buffer.
249    #[must_use]
250    pub fn take_sink_outputs(&mut self, sink_node: NodeId) -> Vec<Event> {
251        let idx = sink_node.0 as usize;
252        if idx < self.slot_count {
253            std::mem::take(&mut self.sink_outputs[idx])
254        } else {
255            Vec::new()
256        }
257    }
258
259    /// Takes all sink outputs across all sink nodes.
260    #[must_use]
261    pub fn take_all_sink_outputs(&mut self) -> FxHashMap<NodeId, Vec<Event>> {
262        let mut outputs = FxHashMap::default();
263        let sink_ids: SmallVec<[NodeId; 8]> = self.sink_nodes.iter().copied().collect();
264        for sink_id in sink_ids {
265            let events = self.take_sink_outputs(sink_id);
266            if !events.is_empty() {
267                outputs.insert(sink_id, events);
268            }
269        }
270        outputs
271    }
272
273    /// Returns a reference to the executor metrics.
274    #[must_use]
275    pub fn metrics(&self) -> &DagExecutorMetrics {
276        &self.metrics
277    }
278
279    /// Resets all executor metrics to zero.
280    pub fn reset_metrics(&mut self) {
281        self.metrics = DagExecutorMetrics::default();
282    }
283
284    /// Returns the source node IDs.
285    #[must_use]
286    pub fn source_nodes(&self) -> &[NodeId] {
287        &self.source_nodes
288    }
289
290    /// Returns the sink node IDs.
291    #[must_use]
292    pub fn sink_nodes(&self) -> &[NodeId] {
293        &self.sink_nodes
294    }
295
296    /// Returns the node type for a given node ID.
297    #[must_use]
298    pub fn node_type(&self, node: NodeId) -> Option<DagNodeType> {
299        let idx = node.0 as usize;
300        if idx < self.slot_count {
301            Some(self.node_types[idx])
302        } else {
303            None
304        }
305    }
306
307    /// Checkpoints all registered operators.
308    ///
309    /// Returns a map of `NodeId` to `OperatorState` for all nodes
310    /// that have registered operators.
311    #[must_use]
312    pub fn checkpoint(&self) -> FxHashMap<NodeId, OperatorState> {
313        let mut states = FxHashMap::default();
314        for (idx, op) in self.operators.iter().enumerate() {
315            if let Some(operator) = op {
316                #[allow(clippy::cast_possible_truncation)]
317                // DAG node count bounded by topology (< u32::MAX)
318                let node_id = NodeId(idx as u32);
319                states.insert(node_id, operator.checkpoint());
320            }
321        }
322        states
323    }
324
325    /// Restores operator state from a checkpoint snapshot.
326    ///
327    /// Iterates the provided states and calls `operator.restore()` on each
328    /// registered operator.
329    ///
330    /// # Errors
331    ///
332    /// Returns [`DagError::RestoreFailed`] if any operator fails to restore.
333    pub fn restore(&mut self, states: &FxHashMap<NodeId, OperatorState>) -> Result<(), DagError> {
334        for (node_id, state) in states {
335            let idx = node_id.0 as usize;
336            if idx < self.slot_count {
337                if let Some(ref mut operator) = self.operators[idx] {
338                    operator
339                        .restore(state.clone())
340                        .map_err(|e| DagError::RestoreFailed {
341                            node_id: format!("{node_id}"),
342                            reason: e.to_string(),
343                        })?;
344                }
345            }
346        }
347        Ok(())
348    }
349
350    /// Injects events into a node's input queue.
351    ///
352    /// Used during recovery to repopulate queues with buffered events.
353    pub fn inject_events(&mut self, node_id: NodeId, events: Vec<Event>) {
354        let idx = node_id.0 as usize;
355        if idx < self.slot_count {
356            self.input_queues[idx].extend(events);
357        }
358    }
359
360    /// Returns the number of incoming edges for a node.
361    #[must_use]
362    pub fn input_count(&self, node_id: NodeId) -> usize {
363        let idx = node_id.0 as usize;
364        if idx < self.slot_count {
365            self.input_counts[idx]
366        } else {
367            0
368        }
369    }
370
371    /// Snapshots all registered operators in topological order.
372    ///
373    /// Takes the barrier for consistency (future use with epoch tracking).
374    /// In the synchronous single-threaded executor, topological ordering
375    /// guarantees upstream-first snapshots.
376    #[must_use]
377    pub fn process_checkpoint_barrier(
378        &mut self,
379        _barrier: &CheckpointBarrier,
380    ) -> FxHashMap<NodeId, OperatorState> {
381        let mut states = FxHashMap::default();
382        for &node_id in &self.execution_order {
383            let idx = node_id.0 as usize;
384            if idx < self.slot_count {
385                if let Some(ref operator) = self.operators[idx] {
386                    states.insert(node_id, operator.checkpoint());
387                }
388            }
389        }
390        states
391    }
392
393    /// Processes all nodes in topological order.
394    ///
395    /// Drains input queues, dispatches to operators, and routes outputs
396    /// to downstream nodes. Uses [`HotPathGuard`] (F071) for zero-allocation
397    /// enforcement in debug builds.
398    fn process_dag(&mut self) {
399        let _guard = HotPathGuard::enter("dag_executor");
400
401        let order_len = self.execution_order.len();
402        for i in 0..order_len {
403            let node_id = self.execution_order[i];
404            self.process_node(node_id);
405        }
406    }
407
408    /// Processes a single node: drains its input queue, dispatches each event
409    /// to the operator, and routes outputs downstream.
410    fn process_node(&mut self, node_id: NodeId) {
411        let idx = node_id.0 as usize;
412
413        if self.input_queues[idx].is_empty() {
414            self.metrics.nodes_skipped += 1;
415            return;
416        }
417
418        // Swap temp buffer out of self so the borrow checker allows
419        // mutable access to other fields during the loop.
420        let mut events = std::mem::take(&mut self.temp_events);
421        events.clear();
422        events.extend(self.input_queues[idx].drain(..));
423
424        // Take operator and runtime out temporarily.
425        // This lets us mutably access the rest of `self` for routing.
426        let mut operator = self.operators[idx].take();
427        let mut runtime = self.runtimes[idx].take();
428
429        for event in events.drain(..) {
430            self.metrics.events_processed += 1;
431
432            let outputs = if let Some(op) = &mut operator {
433                if let Some(rt) = &mut runtime {
434                    let mut ctx = OperatorContext {
435                        event_time: event.timestamp,
436                        processing_time: 0,
437                        timers: &mut rt.timer_service,
438                        state: rt.state_store.as_mut(),
439                        watermark_generator: rt.watermark_generator.as_mut(),
440                        operator_index: idx,
441                    };
442                    op.process(&event, &mut ctx)
443                } else {
444                    passthrough_output(event)
445                }
446            } else {
447                passthrough_output(event)
448            };
449
450            // Route outputs to downstream nodes.
451            for output in outputs {
452                if let Output::Event(out_event) = output {
453                    self.route_output(node_id, out_event);
454                }
455            }
456        }
457
458        // Put operator and runtime back.
459        self.operators[idx] = operator;
460        self.runtimes[idx] = runtime;
461        self.temp_events = events;
462    }
463
464    /// Routes an output event from a source node to its downstream targets.
465    ///
466    /// - **Terminal (sink)**: event is collected in `sink_outputs`.
467    /// - **Single target**: event is enqueued directly (no clone).
468    /// - **Multicast**: event is cloned to N-1 targets, moved to the last.
469    fn route_output(&mut self, source: NodeId, event: Event) {
470        let entry = self.routing.node_targets(source);
471
472        if entry.is_terminal() {
473            // Sink node: collect output.
474            self.sink_outputs[source.0 as usize].push(event);
475            return;
476        }
477
478        self.metrics.events_routed += 1;
479
480        if entry.is_multicast {
481            self.metrics.multicast_publishes += 1;
482            let targets = entry.target_ids();
483
484            // Clone to all targets except the last, which gets the moved value.
485            for &target_id in &targets[..targets.len() - 1] {
486                self.input_queues[target_id as usize].push_back(event.clone());
487            }
488            self.input_queues[targets[targets.len() - 1] as usize].push_back(event);
489        } else {
490            // Single target: enqueue directly (zero-copy move).
491            self.input_queues[entry.targets[0] as usize].push_back(event);
492        }
493    }
494}
495
496impl std::fmt::Debug for DagExecutor {
497    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
498        f.debug_struct("DagExecutor")
499            .field("slot_count", &self.slot_count)
500            .field("source_nodes", &self.source_nodes)
501            .field("sink_nodes", &self.sink_nodes)
502            .field("execution_order", &self.execution_order)
503            .field("metrics", &self.metrics)
504            .finish_non_exhaustive()
505    }
506}
507
508/// Creates a passthrough output (forwards the event unchanged).
509#[inline]
510fn passthrough_output(event: Event) -> OutputVec {
511    let mut v = OutputVec::new();
512    v.push(Output::Event(event));
513    v
514}