laminar_core/dag/executor.rs
1//! Ring 0 DAG executor for event processing.
2//!
3//! [`DagExecutor`] processes events through a finalized [`StreamingDag`] in
4//! topological order. It uses the pre-computed [`RoutingTable`] (F-DAG-002)
5//! for O(1) dispatch and integrates with the [`Operator`] trait for operator
6//! invocation.
7//!
8//! # Architecture
9//!
10//! ```text
11//! ┌──────────────────────────────────────────────────────────────────┐
12//! │ RING 0: HOT PATH │
13//! │ │
14//! │ process_event(source, event) │
15//! │ │ │
16//! │ ▼ │
17//! │ ┌──────────┐ topological ┌───────────┐ route_output() │
18//! │ │ enqueue │──────order────▶│ operator │──────────────────┐ │
19//! │ │ (input │ │ .process()│ │ │
20//! │ │ queue) │ └───────────┘ │ │
21//! │ └──────────┘ │ │
22//! │ ▲ │ │
23//! │ │ ┌─────────────────┐ │ │
24//! │ └──────────────────│ RoutingTable │◀────────────────┘ │
25//! │ enqueue targets │ O(1) lookup │ │
26//! │ └─────────────────┘ │
27//! └──────────────────────────────────────────────────────────────────┘
28//! ```
29//!
30//! # Latency Budget
31//!
32//! | Component | Budget |
33//! |-----------|--------|
34//! | Routing table lookup | < 50ns |
35//! | Operator dispatch | < 200ns |
36//! | Multicast to N consumers | < 100ns |
37//! | State access | < 200ns |
38//! | **Total** | **< 500ns** |
39
40use std::collections::VecDeque;
41
42use fxhash::FxHashMap;
43use smallvec::SmallVec;
44
45use crate::alloc::HotPathGuard;
46use crate::operator::{Event, Operator, OperatorContext, OperatorState, Output, OutputVec};
47use crate::state::InMemoryStore;
48use crate::time::{BoundedOutOfOrdernessGenerator, TimerService};
49
50use super::checkpoint::CheckpointBarrier;
51use super::error::DagError;
52use super::routing::RoutingTable;
53use super::topology::{DagNodeType, NodeId, StreamingDag};
54
55/// Per-node runtime state (timer service, state store, watermark generator).
56///
57/// Created during executor construction (Ring 2) and used during event
58/// processing (Ring 0). Temporarily moved out of the executor during
59/// operator dispatch to satisfy Rust's borrow checker.
60struct NodeRuntime {
61 /// Timer service for this node.
62 timer_service: TimerService,
63 /// State store for this node.
64 state_store: Box<dyn crate::state::StateStore>,
65 /// Watermark generator for this node.
66 watermark_generator: Box<dyn crate::time::WatermarkGenerator>,
67}
68
69impl Default for NodeRuntime {
70 fn default() -> Self {
71 Self {
72 timer_service: TimerService::new(),
73 state_store: Box::new(InMemoryStore::new()),
74 watermark_generator: Box::new(BoundedOutOfOrdernessGenerator::new(0)),
75 }
76 }
77}
78
/// Counters maintained by the DAG executor while it processes events.
///
/// Every counter is a plain `u64` that starts at zero (`Default`) and is
/// incremented as events flow through the DAG. Clone the value returned by
/// `DagExecutor::metrics` to take a snapshot. Snapshots can be compared with
/// `==`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct DagExecutorMetrics {
    /// Events handed to a node for dispatch, whether to a registered
    /// operator or to the passthrough path.
    pub events_processed: u64,
    /// Output events forwarded from a non-sink node to downstream input
    /// queues. A multicast is counted once, not once per target.
    pub events_routed: u64,
    /// Routed events that were fanned out to more than one target.
    pub multicast_publishes: u64,
    /// Backpressure stalls encountered.
    ///
    /// NOTE(review): the synchronous executor in this module uses unbounded
    /// input queues and never increments this counter. It appears to be
    /// reserved for executors that apply backpressure.
    pub backpressure_stalls: u64,
    /// Node visits skipped because the node's input queue was empty.
    pub nodes_skipped: u64,
}
96
97/// Ring 0 DAG executor for event processing.
98///
99/// Processes events through a finalized [`StreamingDag`] in topological order
100/// using the pre-computed [`RoutingTable`] for O(1) dispatch.
101///
102/// # Construction
103///
104/// ```rust,ignore
105/// let dag = DagBuilder::new()
106/// .source("src", schema.clone())
107/// .operator("transform", schema.clone())
108/// .connect("src", "transform")
109/// .sink_for("transform", "out", schema.clone())
110/// .build()?;
111///
112/// let mut executor = DagExecutor::from_dag(&dag);
113/// executor.register_operator(transform_id, Box::new(my_operator));
114/// executor.process_event(src_id, event)?;
115/// let outputs = executor.take_sink_outputs(out_id);
116/// ```
117pub struct DagExecutor {
118 /// Registered operators, indexed by `NodeId.0`. `None` = passthrough.
119 operators: Vec<Option<Box<dyn Operator>>>,
120 /// Per-node runtime state (timer, state store, watermark generator).
121 runtimes: Vec<Option<NodeRuntime>>,
122 /// Pre-allocated input queues per node, indexed by `NodeId.0`.
123 input_queues: Vec<VecDeque<Event>>,
124 /// Collected sink outputs, indexed by `NodeId.0`.
125 sink_outputs: Vec<Vec<Event>>,
126 /// Pre-computed routing table for O(1) dispatch.
127 routing: RoutingTable,
128 /// Topological execution order (from the finalized DAG).
129 execution_order: Vec<NodeId>,
130 /// Source node IDs.
131 source_nodes: Vec<NodeId>,
132 /// Sink node IDs.
133 sink_nodes: Vec<NodeId>,
134 /// Node types, indexed by `NodeId.0`.
135 node_types: Vec<DagNodeType>,
136 /// Total number of node slots allocated.
137 slot_count: usize,
138 /// Number of incoming edges per node, indexed by `NodeId.0`.
139 input_counts: Vec<usize>,
140 /// Temporary buffer for draining input queues (avoids allocation).
141 temp_events: Vec<Event>,
142 /// Executor metrics.
143 metrics: DagExecutorMetrics,
144}
145
146impl DagExecutor {
147 /// Creates a new executor from a finalized [`StreamingDag`].
148 ///
149 /// Allocates all per-node state (input queues, runtimes, sink buffers)
150 /// up front in Ring 2. The hot path (`process_event`) is allocation-free.
151 ///
152 /// # Arguments
153 ///
154 /// * `dag` - A finalized `StreamingDag` topology
155 #[must_use]
156 pub fn from_dag(dag: &StreamingDag) -> Self {
157 let slot_count = dag.nodes().keys().map(|n| n.0).max().map_or(0, |n| n + 1) as usize;
158
159 let routing = RoutingTable::from_dag(dag);
160
161 let mut operators = Vec::with_capacity(slot_count);
162 let mut runtimes = Vec::with_capacity(slot_count);
163 let mut input_queues = Vec::with_capacity(slot_count);
164 let mut sink_outputs = Vec::with_capacity(slot_count);
165 let mut node_types = Vec::with_capacity(slot_count);
166 let mut input_counts = vec![0usize; slot_count];
167
168 for _ in 0..slot_count {
169 operators.push(None);
170 runtimes.push(Some(NodeRuntime::default()));
171 input_queues.push(VecDeque::with_capacity(16));
172 sink_outputs.push(Vec::new());
173 node_types.push(DagNodeType::StatelessOperator);
174 }
175
176 // Populate node types and input counts from the DAG.
177 for node in dag.nodes().values() {
178 let idx = node.id.0 as usize;
179 if idx < slot_count {
180 node_types[idx] = node.node_type;
181 input_counts[idx] = dag.incoming_edge_count(node.id);
182 }
183 }
184
185 Self {
186 operators,
187 runtimes,
188 input_queues,
189 sink_outputs,
190 routing,
191 execution_order: dag.execution_order().to_vec(),
192 source_nodes: dag.sources().to_vec(),
193 sink_nodes: dag.sinks().to_vec(),
194 node_types,
195 slot_count,
196 input_counts,
197 temp_events: Vec::with_capacity(64),
198 metrics: DagExecutorMetrics::default(),
199 }
200 }
201
202 /// Registers an operator for a node.
203 ///
204 /// Nodes without registered operators act as passthrough: events are
205 /// forwarded to downstream nodes unchanged. This is the default for
206 /// source and sink nodes.
207 ///
208 /// # Arguments
209 ///
210 /// * `node` - The node ID to register the operator for
211 /// * `operator` - The operator implementation
212 pub fn register_operator(&mut self, node: NodeId, operator: Box<dyn Operator>) {
213 let idx = node.0 as usize;
214 if idx < self.slot_count {
215 self.operators[idx] = Some(operator);
216 }
217 }
218
219 /// Processes an event from a source node through the entire DAG.
220 ///
221 /// The event is enqueued at the source node, then all nodes are processed
222 /// in topological order. Events produced by operators are routed to
223 /// downstream nodes via the [`RoutingTable`]. Sink outputs are collected
224 /// and can be retrieved via [`take_sink_outputs()`](Self::take_sink_outputs).
225 ///
226 /// # Arguments
227 ///
228 /// * `source_node` - The source node to inject the event
229 /// * `event` - The event to process
230 ///
231 /// # Errors
232 ///
233 /// Returns [`DagError::NodeNotFound`] if the source node is out of bounds.
234 pub fn process_event(&mut self, source_node: NodeId, event: Event) -> Result<(), DagError> {
235 let idx = source_node.0 as usize;
236 if idx >= self.slot_count {
237 return Err(DagError::NodeNotFound(format!("{source_node}")));
238 }
239
240 self.input_queues[idx].push_back(event);
241 self.process_dag();
242 Ok(())
243 }
244
245 /// Takes collected sink outputs for a given sink node.
246 ///
247 /// Returns all events that reached this sink during prior
248 /// `process_event` calls, draining the internal buffer.
249 #[must_use]
250 pub fn take_sink_outputs(&mut self, sink_node: NodeId) -> Vec<Event> {
251 let idx = sink_node.0 as usize;
252 if idx < self.slot_count {
253 std::mem::take(&mut self.sink_outputs[idx])
254 } else {
255 Vec::new()
256 }
257 }
258
259 /// Takes all sink outputs across all sink nodes.
260 #[must_use]
261 pub fn take_all_sink_outputs(&mut self) -> FxHashMap<NodeId, Vec<Event>> {
262 let mut outputs = FxHashMap::default();
263 let sink_ids: SmallVec<[NodeId; 8]> = self.sink_nodes.iter().copied().collect();
264 for sink_id in sink_ids {
265 let events = self.take_sink_outputs(sink_id);
266 if !events.is_empty() {
267 outputs.insert(sink_id, events);
268 }
269 }
270 outputs
271 }
272
273 /// Returns a reference to the executor metrics.
274 #[must_use]
275 pub fn metrics(&self) -> &DagExecutorMetrics {
276 &self.metrics
277 }
278
279 /// Resets all executor metrics to zero.
280 pub fn reset_metrics(&mut self) {
281 self.metrics = DagExecutorMetrics::default();
282 }
283
284 /// Returns the source node IDs.
285 #[must_use]
286 pub fn source_nodes(&self) -> &[NodeId] {
287 &self.source_nodes
288 }
289
290 /// Returns the sink node IDs.
291 #[must_use]
292 pub fn sink_nodes(&self) -> &[NodeId] {
293 &self.sink_nodes
294 }
295
296 /// Returns the node type for a given node ID.
297 #[must_use]
298 pub fn node_type(&self, node: NodeId) -> Option<DagNodeType> {
299 let idx = node.0 as usize;
300 if idx < self.slot_count {
301 Some(self.node_types[idx])
302 } else {
303 None
304 }
305 }
306
307 /// Checkpoints all registered operators.
308 ///
309 /// Returns a map of `NodeId` to `OperatorState` for all nodes
310 /// that have registered operators.
311 #[must_use]
312 pub fn checkpoint(&self) -> FxHashMap<NodeId, OperatorState> {
313 let mut states = FxHashMap::default();
314 for (idx, op) in self.operators.iter().enumerate() {
315 if let Some(operator) = op {
316 #[allow(clippy::cast_possible_truncation)]
317 // DAG node count bounded by topology (< u32::MAX)
318 let node_id = NodeId(idx as u32);
319 states.insert(node_id, operator.checkpoint());
320 }
321 }
322 states
323 }
324
325 /// Restores operator state from a checkpoint snapshot.
326 ///
327 /// Iterates the provided states and calls `operator.restore()` on each
328 /// registered operator.
329 ///
330 /// # Errors
331 ///
332 /// Returns [`DagError::RestoreFailed`] if any operator fails to restore.
333 pub fn restore(&mut self, states: &FxHashMap<NodeId, OperatorState>) -> Result<(), DagError> {
334 for (node_id, state) in states {
335 let idx = node_id.0 as usize;
336 if idx < self.slot_count {
337 if let Some(ref mut operator) = self.operators[idx] {
338 operator
339 .restore(state.clone())
340 .map_err(|e| DagError::RestoreFailed {
341 node_id: format!("{node_id}"),
342 reason: e.to_string(),
343 })?;
344 }
345 }
346 }
347 Ok(())
348 }
349
350 /// Injects events into a node's input queue.
351 ///
352 /// Used during recovery to repopulate queues with buffered events.
353 pub fn inject_events(&mut self, node_id: NodeId, events: Vec<Event>) {
354 let idx = node_id.0 as usize;
355 if idx < self.slot_count {
356 self.input_queues[idx].extend(events);
357 }
358 }
359
360 /// Returns the number of incoming edges for a node.
361 #[must_use]
362 pub fn input_count(&self, node_id: NodeId) -> usize {
363 let idx = node_id.0 as usize;
364 if idx < self.slot_count {
365 self.input_counts[idx]
366 } else {
367 0
368 }
369 }
370
371 /// Snapshots all registered operators in topological order.
372 ///
373 /// Takes the barrier for consistency (future use with epoch tracking).
374 /// In the synchronous single-threaded executor, topological ordering
375 /// guarantees upstream-first snapshots.
376 #[must_use]
377 pub fn process_checkpoint_barrier(
378 &mut self,
379 _barrier: &CheckpointBarrier,
380 ) -> FxHashMap<NodeId, OperatorState> {
381 let mut states = FxHashMap::default();
382 for &node_id in &self.execution_order {
383 let idx = node_id.0 as usize;
384 if idx < self.slot_count {
385 if let Some(ref operator) = self.operators[idx] {
386 states.insert(node_id, operator.checkpoint());
387 }
388 }
389 }
390 states
391 }
392
393 /// Processes all nodes in topological order.
394 ///
395 /// Drains input queues, dispatches to operators, and routes outputs
396 /// to downstream nodes. Uses [`HotPathGuard`] (F071) for zero-allocation
397 /// enforcement in debug builds.
398 fn process_dag(&mut self) {
399 let _guard = HotPathGuard::enter("dag_executor");
400
401 let order_len = self.execution_order.len();
402 for i in 0..order_len {
403 let node_id = self.execution_order[i];
404 self.process_node(node_id);
405 }
406 }
407
408 /// Processes a single node: drains its input queue, dispatches each event
409 /// to the operator, and routes outputs downstream.
410 fn process_node(&mut self, node_id: NodeId) {
411 let idx = node_id.0 as usize;
412
413 if self.input_queues[idx].is_empty() {
414 self.metrics.nodes_skipped += 1;
415 return;
416 }
417
418 // Swap temp buffer out of self so the borrow checker allows
419 // mutable access to other fields during the loop.
420 let mut events = std::mem::take(&mut self.temp_events);
421 events.clear();
422 events.extend(self.input_queues[idx].drain(..));
423
424 // Take operator and runtime out temporarily.
425 // This lets us mutably access the rest of `self` for routing.
426 let mut operator = self.operators[idx].take();
427 let mut runtime = self.runtimes[idx].take();
428
429 for event in events.drain(..) {
430 self.metrics.events_processed += 1;
431
432 let outputs = if let Some(op) = &mut operator {
433 if let Some(rt) = &mut runtime {
434 let mut ctx = OperatorContext {
435 event_time: event.timestamp,
436 processing_time: 0,
437 timers: &mut rt.timer_service,
438 state: rt.state_store.as_mut(),
439 watermark_generator: rt.watermark_generator.as_mut(),
440 operator_index: idx,
441 };
442 op.process(&event, &mut ctx)
443 } else {
444 passthrough_output(event)
445 }
446 } else {
447 passthrough_output(event)
448 };
449
450 // Route outputs to downstream nodes.
451 for output in outputs {
452 if let Output::Event(out_event) = output {
453 self.route_output(node_id, out_event);
454 }
455 }
456 }
457
458 // Put operator and runtime back.
459 self.operators[idx] = operator;
460 self.runtimes[idx] = runtime;
461 self.temp_events = events;
462 }
463
464 /// Routes an output event from a source node to its downstream targets.
465 ///
466 /// - **Terminal (sink)**: event is collected in `sink_outputs`.
467 /// - **Single target**: event is enqueued directly (no clone).
468 /// - **Multicast**: event is cloned to N-1 targets, moved to the last.
469 fn route_output(&mut self, source: NodeId, event: Event) {
470 let entry = self.routing.node_targets(source);
471
472 if entry.is_terminal() {
473 // Sink node: collect output.
474 self.sink_outputs[source.0 as usize].push(event);
475 return;
476 }
477
478 self.metrics.events_routed += 1;
479
480 if entry.is_multicast {
481 self.metrics.multicast_publishes += 1;
482 let targets = entry.target_ids();
483
484 // Clone to all targets except the last, which gets the moved value.
485 for &target_id in &targets[..targets.len() - 1] {
486 self.input_queues[target_id as usize].push_back(event.clone());
487 }
488 self.input_queues[targets[targets.len() - 1] as usize].push_back(event);
489 } else {
490 // Single target: enqueue directly (zero-copy move).
491 self.input_queues[entry.targets[0] as usize].push_back(event);
492 }
493 }
494}
495
496impl std::fmt::Debug for DagExecutor {
497 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
498 f.debug_struct("DagExecutor")
499 .field("slot_count", &self.slot_count)
500 .field("source_nodes", &self.source_nodes)
501 .field("sink_nodes", &self.sink_nodes)
502 .field("execution_order", &self.execution_order)
503 .field("metrics", &self.metrics)
504 .finish_non_exhaustive()
505 }
506}
507
508/// Creates a passthrough output (forwards the event unchanged).
509#[inline]
510fn passthrough_output(event: Event) -> OutputVec {
511 let mut v = OutputVec::new();
512 v.push(Output::Event(event));
513 v
514}