daedalus_runtime/executor/mod.rs

use crate::plan::{BackpressureStrategy, EdgePolicyKind, RuntimeNode, RuntimePlan, RuntimeSegment};
use crate::state::StateStore;
use daedalus_planner::NodeRef;
use std::collections::{BTreeMap, HashSet};
use std::sync::{Arc, Mutex};
use std::time::Duration;

mod crash_diag;
mod errors;
mod handler;
mod parallel;
mod payload;
#[cfg(feature = "executor-pool")]
mod pool;
pub mod queue;
mod serial;
mod telemetry;

pub use errors::{ExecuteError, NodeError};
pub use handler::NodeHandler;
pub use payload::{CorrelatedPayload, EdgePayload, next_correlation_id};
pub use queue::EdgeStorage;
pub use telemetry::{EdgeMetrics, ExecutionTelemetry, MetricsLevel, NodeMetrics};
pub(crate) use telemetry::payload_size_bytes;

/// Runtime executor for planner-generated runtime plans.
///
/// ```no_run
/// use daedalus_runtime::executor::Executor;
/// use daedalus_planner::{ExecutionPlan, Graph};
/// use daedalus_runtime::RuntimePlan;
///
/// fn handler(
///     _node: &daedalus_runtime::RuntimeNode,
///     _ctx: &daedalus_runtime::state::ExecutionContext,
///     _io: &mut daedalus_runtime::io::NodeIo,
/// ) -> Result<(), daedalus_runtime::executor::NodeError> {
///     Ok(())
/// }
///
/// let plan = RuntimePlan::from_execution(&ExecutionPlan::new(Graph::default(), vec![]));
/// let _exec = Executor::new(&plan, handler);
/// ```
pub struct Executor<'a, H: NodeHandler> {
    pub(crate) nodes: Arc<[RuntimeNode]>,
    pub(crate) edges: &'a [EdgeSpec],
    #[cfg(feature = "gpu")]
    #[allow(dead_code)]
    pub(crate) gpu_edges: &'a [daedalus_planner::EdgeBufferInfo],
    #[cfg(feature = "gpu")]
    pub(crate) _gpu_entries: &'a [usize],
    #[cfg(feature = "gpu")]
    pub(crate) _gpu_exits: &'a [usize],
    #[cfg(feature = "gpu")]
    pub(crate) gpu_entry_set: Arc<HashSet<usize>>,
    #[cfg(feature = "gpu")]
    pub(crate) gpu_exit_set: Arc<HashSet<usize>>,
    #[cfg(feature = "gpu")]
    pub(crate) payload_edges: Arc<HashSet<usize>>,
    #[cfg(not(feature = "gpu"))]
    #[allow(dead_code)]
    pub(crate) gpu_edges: &'a [()],
    pub(crate) segments: &'a [RuntimeSegment],
    pub(crate) schedule_order: &'a [NodeRef],
    pub(crate) const_inputs: Arc<Vec<Vec<(String, daedalus_data::model::Value)>>>,
    pub(crate) backpressure: BackpressureStrategy,
    pub(crate) handler: Arc<H>,
    pub(crate) state: StateStore,
    pub(crate) gpu_available: bool,
    pub(crate) gpu: MaybeGpu,
    pub(crate) queues: Arc<Vec<EdgeStorage>>,
    pub(crate) warnings_seen: Arc<Mutex<HashSet<String>>>,
    pub(crate) telemetry: ExecutionTelemetry,
    pub(crate) metrics_level: MetricsLevel,
    pub(crate) pool_size: Option<usize>,
    pub(crate) host_bridges: Option<crate::host_bridge::HostBridgeManager>,
    pub(crate) const_coercers: Option<crate::io::ConstCoercerMap>,
    pub(crate) output_movers: Option<crate::io::OutputMoverMap>,
    pub(crate) graph_metadata: Arc<BTreeMap<String, daedalus_data::model::Value>>,
}

#[cfg(feature = "gpu")]
type MaybeGpu = Option<daedalus_gpu::GpuContextHandle>;
#[cfg(not(feature = "gpu"))]
type MaybeGpu = Option<()>;

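/// Edge tuple: (source node, source port, destination node, destination port,
/// edge policy), matching the destructuring in `edge_maps` below.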
type EdgeSpec = (NodeRef, String, NodeRef, String, EdgePolicyKind);

impl<'a, H: NodeHandler> Executor<'a, H> {
    /// Build an executor from a runtime plan and handler.
    pub fn new(plan: &'a RuntimePlan, handler: H) -> Self {
        let nodes: Arc<[RuntimeNode]> = plan.nodes.clone().into();
        let queues = queue::build_queues(plan);
        #[cfg(feature = "gpu")]
        let payload_edges = Arc::new(collect_payload_edges(&nodes, &plan.edges));
        Self {
            nodes,
            edges: &plan.edges,
            #[cfg(feature = "gpu")]
            gpu_edges: &plan.gpu_edges,
            #[cfg(feature = "gpu")]
            _gpu_entries: &plan.gpu_entries,
            #[cfg(feature = "gpu")]
            _gpu_exits: &plan.gpu_exits,
            #[cfg(feature = "gpu")]
            gpu_entry_set: Arc::new(plan.gpu_entries.iter().cloned().collect()),
            #[cfg(feature = "gpu")]
            gpu_exit_set: Arc::new(plan.gpu_exits.iter().cloned().collect()),
            #[cfg(feature = "gpu")]
            payload_edges,
            #[cfg(not(feature = "gpu"))]
            gpu_edges: &[],
            segments: &plan.segments,
            schedule_order: &plan.schedule_order,
            const_inputs: Arc::new(plan.nodes.iter().map(|n| n.const_inputs.clone()).collect()),
            backpressure: plan.backpressure.clone(),
            handler: Arc::new(handler),
            state: StateStore::default(),
            gpu_available: false,
            gpu: None,
            queues: Arc::new(queues),
            warnings_seen: Arc::new(Mutex::new(HashSet::new())),
            telemetry: ExecutionTelemetry::with_level(MetricsLevel::default()),
            metrics_level: MetricsLevel::default(),
            pool_size: None,
            host_bridges: None,
            const_coercers: None,
            output_movers: None,
            graph_metadata: Arc::new(plan.graph_metadata.clone()),
        }
    }

    /// Provide a shared constant coercer registry (used by dynamic plugins).
    pub fn with_const_coercers(mut self, coercers: crate::io::ConstCoercerMap) -> Self {
        self.const_coercers = Some(coercers);
        self
    }

    /// Provide a shared output mover registry (used by dynamic plugins).
    pub fn with_output_movers(mut self, movers: crate::io::OutputMoverMap) -> Self {
        self.output_movers = Some(movers);
        self
    }

    /// Inject a shared state store (optional).
    pub fn with_state(mut self, state: StateStore) -> Self {
        self.state = state;
        self
    }

    /// Provide a GPU handle when available.
    #[cfg(feature = "gpu")]
    pub fn with_gpu(mut self, gpu: daedalus_gpu::GpuContextHandle) -> Self {
        self.gpu_available = true;
        self.gpu = Some(gpu.clone());
        if let Some(ref mgr) = self.host_bridges {
            mgr.attach_gpu(gpu);
        }
        self
    }

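    /// Mark the GPU as unavailable (builds without the `gpu` feature).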
    #[cfg(not(feature = "gpu"))]
    pub fn without_gpu(mut self) -> Self {
        self.gpu_available = false;
        self
    }

    /// Override pool size when using the pool-based parallel executor.
    pub fn with_pool_size(mut self, size: Option<usize>) -> Self {
        self.pool_size = size;
        self
    }

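    /// Set the telemetry detail level used for subsequent runs.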
    pub fn with_metrics_level(mut self, level: MetricsLevel) -> Self {
        self.metrics_level = level;
        self.telemetry.metrics_level = level;
        self
    }

    /// Attach a host bridge manager to enable implicit host I/O nodes.
    pub fn with_host_bridges(mut self, mgr: crate::host_bridge::HostBridgeManager) -> Self {
        #[cfg(feature = "gpu")]
        if let Some(gpu) = self.gpu.clone() {
            mgr.attach_gpu(gpu);
        }
        self.host_bridges = Some(mgr);
        self
    }

    /// Reset per-run state (queues, telemetry, warnings) so this executor can be reused.
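    ///
    /// Locked queues are replaced with fresh defaults (re-applying each
    /// edge's policy); lock-free bounded queues are drained in place.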
    pub fn reset(&mut self) {
        self.telemetry = ExecutionTelemetry::with_level(self.metrics_level);
        if let Ok(mut warnings) = self.warnings_seen.lock() {
            warnings.clear();
        }

        for (idx, storage) in self.queues.iter().enumerate() {
            match storage {
                EdgeStorage::Locked(queue) => {
                    if let Ok(mut q) = queue.lock() {
                        *q = queue::EdgeQueue::default();
                        if let Some((_, _, _, _, policy)) = self.edges.get(idx) {
                            q.ensure_policy(policy);
                        }
                    }
                }
                #[cfg(feature = "lockfree-queues")]
                EdgeStorage::BoundedLf(queue) => while queue.pop().is_some() {},
            }
        }
    }

    /// Build a lightweight snapshot for a single run without re-planning.
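    ///
    /// Borrowed plan data is reused and shared handles (`nodes`, `queues`,
    /// `handler`, `state`) are cloned; telemetry starts fresh at the
    /// configured metrics level.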
    fn snapshot(&self) -> Self {
        Self {
            nodes: self.nodes.clone(),
            edges: self.edges,
            #[cfg(feature = "gpu")]
            gpu_edges: self.gpu_edges,
            #[cfg(feature = "gpu")]
            _gpu_entries: self._gpu_entries,
            #[cfg(feature = "gpu")]
            _gpu_exits: self._gpu_exits,
            #[cfg(feature = "gpu")]
            gpu_entry_set: self.gpu_entry_set.clone(),
            #[cfg(feature = "gpu")]
            gpu_exit_set: self.gpu_exit_set.clone(),
            #[cfg(feature = "gpu")]
            payload_edges: self.payload_edges.clone(),
            #[cfg(not(feature = "gpu"))]
            gpu_edges: self.gpu_edges,
            segments: self.segments,
            schedule_order: self.schedule_order,
            const_inputs: self.const_inputs.clone(),
            backpressure: self.backpressure.clone(),
            handler: self.handler.clone(),
            state: self.state.clone(),
            gpu_available: self.gpu_available,
            gpu: self.gpu.clone(),
            queues: self.queues.clone(),
            warnings_seen: self.warnings_seen.clone(),
            telemetry: ExecutionTelemetry::with_level(self.metrics_level),
            metrics_level: self.metrics_level,
            pool_size: self.pool_size,
            host_bridges: self.host_bridges.clone(),
            const_coercers: self.const_coercers.clone(),
            output_movers: self.output_movers.clone(),
            graph_metadata: self.graph_metadata.clone(),
        }
    }

    /// Execute the runtime plan serially in segment order.
    pub fn run(self) -> Result<ExecutionTelemetry, ExecuteError> {
        serial::run(self)
    }

    /// Execute the runtime plan serially without rebuilding the executor.
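    ///
    /// A minimal reuse sketch (same stub plan and handler as the example on
    /// [`Executor`] above):
    ///
    /// ```no_run
    /// # use daedalus_runtime::executor::Executor;
    /// # use daedalus_runtime::RuntimePlan;
    /// # use daedalus_planner::{ExecutionPlan, Graph};
    /// # fn handler(
    /// #     _node: &daedalus_runtime::RuntimeNode,
    /// #     _ctx: &daedalus_runtime::state::ExecutionContext,
    /// #     _io: &mut daedalus_runtime::io::NodeIo,
    /// # ) -> Result<(), daedalus_runtime::executor::NodeError> {
    /// #     Ok(())
    /// # }
    /// # let plan = RuntimePlan::from_execution(&ExecutionPlan::new(Graph::default(), vec![]));
    /// let mut exec = Executor::new(&plan, handler);
    /// let _first = exec.run_in_place()?;  // resets queues/telemetry around the run
    /// let _second = exec.run_in_place()?; // the same executor can run again
    /// # Ok::<(), daedalus_runtime::executor::ExecuteError>(())
    /// ```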
    pub fn run_in_place(&mut self) -> Result<ExecutionTelemetry, ExecuteError> {
        self.reset();
        let exec = self.snapshot();
        let result = serial::run(exec);
        self.reset();
        result
    }

    /// Execute the runtime plan allowing independent segments to run in parallel.
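    ///
    /// Consumes the executor; `H` must be `Send + Sync` so independent
    /// segments can execute on worker threads. Use
    /// [`run_parallel_in_place`](Self::run_parallel_in_place) to keep the
    /// executor for later runs.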
    pub fn run_parallel(self) -> Result<ExecutionTelemetry, ExecuteError>
    where
        H: Send + Sync + 'static,
    {
        #[cfg(feature = "executor-pool")]
        {
            pool::run(self)
        }
        #[cfg(not(feature = "executor-pool"))]
        {
            parallel::run(self)
        }
    }

    /// Execute the runtime plan in parallel without rebuilding the executor.
    pub fn run_parallel_in_place(&mut self) -> Result<ExecutionTelemetry, ExecuteError>
    where
        H: Send + Sync + 'static,
    {
        self.reset();
        let exec = self.snapshot();
        let result = self.run_parallel_from_snapshot(exec);
        self.reset();
        result
    }

    fn run_parallel_from_snapshot(
        &self,
        exec: Executor<'a, H>,
    ) -> Result<ExecutionTelemetry, ExecuteError>
    where
        H: Send + Sync + 'static,
    {
        #[cfg(feature = "executor-pool")]
        {
            pool::run(exec)
        }
        #[cfg(not(feature = "executor-pool"))]
        {
            parallel::run(exec)
        }
    }
}

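/// Indices of edges whose destination node id ends with `io.host_output`;
/// the GPU path treats these as payload-carrying edges.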
316#[cfg(feature = "gpu")]
317fn collect_payload_edges(nodes: &[RuntimeNode], edges: &[EdgeSpec]) -> HashSet<usize> {
318    let mut out = HashSet::new();
319    for (idx, (_from, _from_port, to, _to_port, _policy)) in edges.iter().enumerate() {
320        if let Some(node) = nodes.get(to.0)
321            && node.id.ends_with("io.host_output")
322        {
323            out.insert(idx);
324        }
325    }
326    out
327}
328
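/// Per-thread CPU time via `CLOCK_THREAD_CPUTIME_ID` on Linux; returns
/// `None` on other targets or when the clock is unavailable.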
pub(crate) fn thread_cpu_time() -> Option<Duration> {
    #[cfg(target_os = "linux")]
    unsafe {
        let mut ts = libc::timespec {
            tv_sec: 0,
            tv_nsec: 0,
        };
        if libc::clock_gettime(libc::CLOCK_THREAD_CPUTIME_ID, &mut ts) == 0 {
            return Some(Duration::new(ts.tv_sec as u64, ts.tv_nsec as u32));
        }
    }
    None
}

/// Owned executor that can be reused across runs without leaking the plan.
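///
/// A minimal sketch, mirroring the [`Executor`] example above:
///
/// ```no_run
/// # use std::sync::Arc;
/// # use daedalus_runtime::executor::OwnedExecutor;
/// # use daedalus_runtime::RuntimePlan;
/// # use daedalus_planner::{ExecutionPlan, Graph};
/// # fn handler(
/// #     _node: &daedalus_runtime::RuntimeNode,
/// #     _ctx: &daedalus_runtime::state::ExecutionContext,
/// #     _io: &mut daedalus_runtime::io::NodeIo,
/// # ) -> Result<(), daedalus_runtime::executor::NodeError> {
/// #     Ok(())
/// # }
/// let plan = Arc::new(RuntimePlan::from_execution(&ExecutionPlan::new(Graph::default(), vec![])));
/// let mut exec = OwnedExecutor::new(plan, handler);
/// let _telemetry = exec.run_in_place()?;
/// # Ok::<(), daedalus_runtime::executor::ExecuteError>(())
/// ```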
pub struct OwnedExecutor<H: NodeHandler> {
    pub(crate) nodes: Arc<[RuntimeNode]>,
    pub(crate) edges: Arc<Vec<EdgeSpec>>,
    #[cfg(feature = "gpu")]
    #[allow(dead_code)]
    pub(crate) gpu_edges: Arc<Vec<daedalus_planner::EdgeBufferInfo>>,
    #[cfg(feature = "gpu")]
    pub(crate) _gpu_entries: Arc<Vec<usize>>,
    #[cfg(feature = "gpu")]
    pub(crate) _gpu_exits: Arc<Vec<usize>>,
    #[cfg(feature = "gpu")]
    pub(crate) gpu_entry_set: Arc<HashSet<usize>>,
    #[cfg(feature = "gpu")]
    pub(crate) gpu_exit_set: Arc<HashSet<usize>>,
    #[cfg(feature = "gpu")]
    pub(crate) payload_edges: Arc<HashSet<usize>>,
    #[cfg(not(feature = "gpu"))]
    #[allow(dead_code)]
    pub(crate) gpu_edges: Arc<Vec<()>>,
    pub(crate) segments: Arc<Vec<RuntimeSegment>>,
    pub(crate) schedule_order: Arc<Vec<NodeRef>>,
    pub(crate) const_inputs: Arc<Vec<Vec<(String, daedalus_data::model::Value)>>>,
    pub(crate) backpressure: BackpressureStrategy,
    pub(crate) handler: Arc<H>,
    pub(crate) state: StateStore,
    pub(crate) gpu_available: bool,
    pub(crate) gpu: MaybeGpu,
    pub(crate) queues: Arc<Vec<EdgeStorage>>,
    pub(crate) warnings_seen: Arc<Mutex<HashSet<String>>>,
    pub(crate) telemetry: ExecutionTelemetry,
    pub(crate) metrics_level: MetricsLevel,
    pub(crate) pool_size: Option<usize>,
    pub(crate) host_bridges: Option<crate::host_bridge::HostBridgeManager>,
    pub(crate) const_coercers: Option<crate::io::ConstCoercerMap>,
    pub(crate) output_movers: Option<crate::io::OutputMoverMap>,
    pub(crate) graph_metadata: Arc<BTreeMap<String, daedalus_data::model::Value>>,
}

impl<H: NodeHandler> OwnedExecutor<H> {
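    /// Build an owned executor from a shared runtime plan and handler.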
    pub fn new(plan: Arc<RuntimePlan>, handler: H) -> Self {
        let nodes: Arc<[RuntimeNode]> = plan.nodes.clone().into();
        let queues = queue::build_queues(&plan);
        #[cfg(feature = "gpu")]
        let payload_edges = Arc::new(collect_payload_edges(&nodes, &plan.edges));
        Self {
            nodes,
            edges: Arc::new(plan.edges.clone()),
            #[cfg(feature = "gpu")]
            gpu_edges: Arc::new(plan.gpu_edges.clone()),
            #[cfg(feature = "gpu")]
            _gpu_entries: Arc::new(plan.gpu_entries.clone()),
            #[cfg(feature = "gpu")]
            _gpu_exits: Arc::new(plan.gpu_exits.clone()),
            #[cfg(feature = "gpu")]
            gpu_entry_set: Arc::new(plan.gpu_entries.iter().cloned().collect()),
            #[cfg(feature = "gpu")]
            gpu_exit_set: Arc::new(plan.gpu_exits.iter().cloned().collect()),
            #[cfg(feature = "gpu")]
            payload_edges,
            #[cfg(not(feature = "gpu"))]
            gpu_edges: Arc::new(Vec::new()),
            segments: Arc::new(plan.segments.clone()),
            schedule_order: Arc::new(plan.schedule_order.clone()),
            const_inputs: Arc::new(plan.nodes.iter().map(|n| n.const_inputs.clone()).collect()),
            backpressure: plan.backpressure.clone(),
            handler: Arc::new(handler),
            state: StateStore::default(),
            gpu_available: false,
            gpu: None,
            queues: Arc::new(queues),
            warnings_seen: Arc::new(Mutex::new(HashSet::new())),
            telemetry: ExecutionTelemetry::with_level(MetricsLevel::default()),
            metrics_level: MetricsLevel::default(),
            pool_size: None,
            host_bridges: None,
            const_coercers: None,
            output_movers: None,
            graph_metadata: Arc::new(plan.graph_metadata.clone()),
        }
    }

    /// Provide a shared constant coercer registry (used by dynamic plugins).
    pub fn with_const_coercers(mut self, coercers: crate::io::ConstCoercerMap) -> Self {
        self.const_coercers = Some(coercers);
        self
    }

    /// Provide a shared output mover registry (used by dynamic plugins).
    pub fn with_output_movers(mut self, movers: crate::io::OutputMoverMap) -> Self {
        self.output_movers = Some(movers);
        self
    }

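    /// Inject a shared state store (optional).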
    pub fn with_state(mut self, state: StateStore) -> Self {
        self.state = state;
        self
    }

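    /// Provide a GPU handle when available.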
445    #[cfg(feature = "gpu")]
446    pub fn with_gpu(mut self, gpu: daedalus_gpu::GpuContextHandle) -> Self {
447        self.gpu_available = true;
448        self.gpu = Some(gpu.clone());
449        if let Some(ref mgr) = self.host_bridges {
450            mgr.attach_gpu(gpu);
451        }
452        self
453    }
454
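    /// Mark the GPU as unavailable (builds without the `gpu` feature).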
    #[cfg(not(feature = "gpu"))]
    pub fn without_gpu(mut self) -> Self {
        self.gpu_available = false;
        self
    }

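    /// Override pool size when using the pool-based parallel executor.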
    pub fn with_pool_size(mut self, size: Option<usize>) -> Self {
        self.pool_size = size;
        self
    }

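    /// Attach a host bridge manager to enable implicit host I/O nodes.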
    pub fn with_host_bridges(mut self, mgr: crate::host_bridge::HostBridgeManager) -> Self {
        #[cfg(feature = "gpu")]
        if let Some(gpu) = self.gpu.clone() {
            mgr.attach_gpu(gpu);
        }
        self.host_bridges = Some(mgr);
        self
    }

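    /// Reset per-run state (queues, telemetry, warnings) so this executor can be reused.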
    pub fn reset(&mut self) {
        self.telemetry = ExecutionTelemetry::with_level(self.metrics_level);
        if let Ok(mut warnings) = self.warnings_seen.lock() {
            warnings.clear();
        }
        for (idx, storage) in self.queues.iter().enumerate() {
            match storage {
                EdgeStorage::Locked(queue) => {
                    if let Ok(mut q) = queue.lock() {
                        *q = queue::EdgeQueue::default();
                        if let Some((_, _, _, _, policy)) = self.edges.get(idx) {
                            q.ensure_policy(policy);
                        }
                    }
                }
                #[cfg(feature = "lockfree-queues")]
                EdgeStorage::BoundedLf(queue) => while queue.pop().is_some() {},
            }
        }
    }

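    /// Build a borrowed [`Executor`] snapshot for a single run without re-planning.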
    fn snapshot(&self) -> Executor<'_, H> {
        Executor {
            nodes: self.nodes.clone(),
            edges: self.edges.as_slice(),
            #[cfg(feature = "gpu")]
            gpu_edges: self.gpu_edges.as_slice(),
            #[cfg(feature = "gpu")]
            _gpu_entries: self._gpu_entries.as_slice(),
            #[cfg(feature = "gpu")]
            _gpu_exits: self._gpu_exits.as_slice(),
            #[cfg(feature = "gpu")]
            gpu_entry_set: self.gpu_entry_set.clone(),
            #[cfg(feature = "gpu")]
            gpu_exit_set: self.gpu_exit_set.clone(),
            #[cfg(feature = "gpu")]
            payload_edges: self.payload_edges.clone(),
            #[cfg(not(feature = "gpu"))]
            gpu_edges: self.gpu_edges.as_slice(),
            segments: self.segments.as_slice(),
            schedule_order: self.schedule_order.as_slice(),
            const_inputs: self.const_inputs.clone(),
            backpressure: self.backpressure.clone(),
            handler: self.handler.clone(),
            state: self.state.clone(),
            gpu_available: self.gpu_available,
            gpu: self.gpu.clone(),
            queues: self.queues.clone(),
            warnings_seen: self.warnings_seen.clone(),
            telemetry: ExecutionTelemetry::with_level(self.metrics_level),
            metrics_level: self.metrics_level,
            pool_size: self.pool_size,
            host_bridges: self.host_bridges.clone(),
            const_coercers: self.const_coercers.clone(),
            output_movers: self.output_movers.clone(),
            graph_metadata: self.graph_metadata.clone(),
        }
    }

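    /// Execute the runtime plan serially without rebuilding the executor.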
    pub fn run_in_place(&mut self) -> Result<ExecutionTelemetry, ExecuteError> {
        self.reset();
        let exec = self.snapshot();
        let res = serial::run(exec);
        self.reset();
        res
    }

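    /// Execute the runtime plan in parallel without rebuilding the executor.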
    pub fn run_parallel_in_place(&mut self) -> Result<ExecutionTelemetry, ExecuteError>
    where
        H: Send + Sync + 'static,
    {
        self.reset();
        let exec = self.snapshot();
        let res = {
            #[cfg(feature = "executor-pool")]
            {
                pool::run(exec)
            }
            #[cfg(not(feature = "executor-pool"))]
            {
                parallel::run(exec)
            }
        };
        self.reset();
        res
    }
}

/// Build adjacency maps of incoming/outgoing edge indices per node.
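///
/// For a two-edge chain `0 -> 1 -> 2` (edges indexed `0` and `1`),
/// `incoming[1] == [0]` and `outgoing[1] == [1]`.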
pub(crate) fn edge_maps(edges: &[EdgeSpec]) -> (Vec<Vec<usize>>, Vec<Vec<usize>>) {
    let mut incoming: Vec<Vec<usize>> = Vec::new();
    let mut outgoing: Vec<Vec<usize>> = Vec::new();
    let grow = |v: &mut Vec<Vec<usize>>, idx: usize| {
        while v.len() <= idx {
            v.push(Vec::new());
        }
    };
    for (idx, (from, _, to, _, _)) in edges.iter().enumerate() {
        let f = from.0;
        let t = to.0;
        grow(&mut incoming, f.max(t));
        grow(&mut outgoing, f.max(t));
        outgoing[f].push(idx);
        incoming[t].push(idx);
    }
    (incoming, outgoing)
}