use crate::plan::{BackpressureStrategy, EdgePolicyKind, RuntimeNode, RuntimePlan, RuntimeSegment};
use crate::state::StateStore;
use daedalus_planner::NodeRef;
use std::collections::{BTreeMap, HashSet};
use std::sync::{Arc, Mutex};
use std::time::Duration;

mod crash_diag;
mod errors;
mod handler;
mod parallel;
mod payload;
#[cfg(feature = "executor-pool")]
mod pool;
pub mod queue;
mod serial;
mod telemetry;

pub use errors::{ExecuteError, NodeError};
pub use handler::NodeHandler;
pub use payload::{CorrelatedPayload, EdgePayload, next_correlation_id};
pub use queue::EdgeStorage;
pub use telemetry::{EdgeMetrics, ExecutionTelemetry, MetricsLevel, NodeMetrics};
pub(crate) use telemetry::payload_size_bytes;
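
/// Borrowing executor: runs a [`RuntimePlan`] it references for the lifetime
/// `'a`, dispatching each node's work to the user-supplied [`NodeHandler`].
///
/// Build one with [`Executor::new`], configure it through the `with_*`
/// builder methods, then call [`Executor::run`] or [`Executor::run_parallel`].
/// A minimal usage sketch (`MyHandler` is a placeholder for any
/// [`NodeHandler`] implementation):
///
/// ```ignore
/// // `MyHandler` stands in for a concrete NodeHandler from the caller.
/// let telemetry = Executor::new(&plan, MyHandler::default())
///     .with_metrics_level(MetricsLevel::default())
///     .run()?;
/// ```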
pub struct Executor<'a, H: NodeHandler> {
    pub(crate) nodes: Arc<[RuntimeNode]>,
    pub(crate) edges: &'a [EdgeSpec],
    #[cfg(feature = "gpu")]
    #[allow(dead_code)]
    pub(crate) gpu_edges: &'a [daedalus_planner::EdgeBufferInfo],
    #[cfg(feature = "gpu")]
    pub(crate) _gpu_entries: &'a [usize],
    #[cfg(feature = "gpu")]
    pub(crate) _gpu_exits: &'a [usize],
    #[cfg(feature = "gpu")]
    pub(crate) gpu_entry_set: Arc<HashSet<usize>>,
    #[cfg(feature = "gpu")]
    pub(crate) gpu_exit_set: Arc<HashSet<usize>>,
    #[cfg(feature = "gpu")]
    pub(crate) payload_edges: Arc<HashSet<usize>>,
    #[cfg(not(feature = "gpu"))]
    #[allow(dead_code)]
    pub(crate) gpu_edges: &'a [()],
    pub(crate) segments: &'a [RuntimeSegment],
    pub(crate) schedule_order: &'a [NodeRef],
    pub(crate) const_inputs: Arc<Vec<Vec<(String, daedalus_data::model::Value)>>>,
    pub(crate) backpressure: BackpressureStrategy,
    pub(crate) handler: Arc<H>,
    pub(crate) state: StateStore,
    pub(crate) gpu_available: bool,
    pub(crate) gpu: MaybeGpu,
    pub(crate) queues: Arc<Vec<EdgeStorage>>,
    pub(crate) warnings_seen: Arc<Mutex<HashSet<String>>>,
    pub(crate) telemetry: ExecutionTelemetry,
    pub(crate) metrics_level: MetricsLevel,
    pub(crate) pool_size: Option<usize>,
    pub(crate) host_bridges: Option<crate::host_bridge::HostBridgeManager>,
    pub(crate) const_coercers: Option<crate::io::ConstCoercerMap>,
    pub(crate) output_movers: Option<crate::io::OutputMoverMap>,
    pub(crate) graph_metadata: Arc<BTreeMap<String, daedalus_data::model::Value>>,
}

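/// GPU context handle when the `gpu` feature is enabled; a unit placeholder
/// otherwise, so the field can be named unconditionally.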
#[cfg(feature = "gpu")]
type MaybeGpu = Option<daedalus_gpu::GpuContextHandle>;
#[cfg(not(feature = "gpu"))]
type MaybeGpu = Option<()>;

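/// Edge as stored in the plan: `(from node, from port, to node, to port, policy)`.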
type EdgeSpec = (NodeRef, String, NodeRef, String, EdgePolicyKind);

impl<'a, H: NodeHandler> Executor<'a, H> {
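    /// Builds an executor over `plan`, constructing one queue per edge and
    /// collecting each node's constant inputs up front.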
    pub fn new(plan: &'a RuntimePlan, handler: H) -> Self {
        let nodes: Arc<[RuntimeNode]> = plan.nodes.clone().into();
        let queues = queue::build_queues(plan);
        #[cfg(feature = "gpu")]
        let payload_edges = Arc::new(collect_payload_edges(&nodes, &plan.edges));
        Self {
            nodes,
            edges: &plan.edges,
            #[cfg(feature = "gpu")]
            gpu_edges: &plan.gpu_edges,
            #[cfg(feature = "gpu")]
            _gpu_entries: &plan.gpu_entries,
            #[cfg(feature = "gpu")]
            _gpu_exits: &plan.gpu_exits,
            #[cfg(feature = "gpu")]
            gpu_entry_set: Arc::new(plan.gpu_entries.iter().cloned().collect()),
            #[cfg(feature = "gpu")]
            gpu_exit_set: Arc::new(plan.gpu_exits.iter().cloned().collect()),
            #[cfg(feature = "gpu")]
            payload_edges,
            #[cfg(not(feature = "gpu"))]
            gpu_edges: &[],
            segments: &plan.segments,
            schedule_order: &plan.schedule_order,
            const_inputs: Arc::new(plan.nodes.iter().map(|n| n.const_inputs.clone()).collect()),
            backpressure: plan.backpressure.clone(),
            handler: Arc::new(handler),
            state: StateStore::default(),
            gpu_available: false,
            gpu: None,
            queues: Arc::new(queues),
            warnings_seen: Arc::new(Mutex::new(HashSet::new())),
            telemetry: ExecutionTelemetry::with_level(MetricsLevel::default()),
            metrics_level: MetricsLevel::default(),
            pool_size: None,
            host_bridges: None,
            const_coercers: None,
            output_movers: None,
            graph_metadata: Arc::new(plan.graph_metadata.clone()),
        }
    }

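    /// Installs the map used to coerce constant inputs.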
    pub fn with_const_coercers(mut self, coercers: crate::io::ConstCoercerMap) -> Self {
        self.const_coercers = Some(coercers);
        self
    }

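    /// Installs the map used to move node outputs.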
    pub fn with_output_movers(mut self, movers: crate::io::OutputMoverMap) -> Self {
        self.output_movers = Some(movers);
        self
    }

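    /// Supplies a pre-populated [`StateStore`] in place of the default one.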
    pub fn with_state(mut self, state: StateStore) -> Self {
        self.state = state;
        self
    }

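    /// Attaches a GPU context, marks the GPU as available, and forwards the
    /// handle to any already-registered host bridges.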
    #[cfg(feature = "gpu")]
    pub fn with_gpu(mut self, gpu: daedalus_gpu::GpuContextHandle) -> Self {
        self.gpu_available = true;
        self.gpu = Some(gpu.clone());
        if let Some(ref mgr) = self.host_bridges {
            mgr.attach_gpu(gpu);
        }
        self
    }

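    /// Explicitly marks the GPU as unavailable on builds without the `gpu` feature.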
    #[cfg(not(feature = "gpu"))]
    pub fn without_gpu(mut self) -> Self {
        self.gpu_available = false;
        self
    }

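    /// Sets the optional worker-pool size hint used by the parallel runners.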
    pub fn with_pool_size(mut self, size: Option<usize>) -> Self {
        self.pool_size = size;
        self
    }

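    /// Sets the metrics level on both the executor and its current telemetry.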
    pub fn with_metrics_level(mut self, level: MetricsLevel) -> Self {
        self.metrics_level = level;
        self.telemetry.metrics_level = level;
        self
    }

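    /// Registers host bridges; if a GPU context is already attached, it is
    /// forwarded to the manager.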
    pub fn with_host_bridges(mut self, mgr: crate::host_bridge::HostBridgeManager) -> Self {
        #[cfg(feature = "gpu")]
        if let Some(gpu) = self.gpu.clone() {
            mgr.attach_gpu(gpu);
        }
        self.host_bridges = Some(mgr);
        self
    }

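    /// Clears telemetry, deduplicated warnings, and every edge queue so the
    /// executor can run again from a clean slate.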
    pub fn reset(&mut self) {
        self.telemetry = ExecutionTelemetry::with_level(self.metrics_level);
        if let Ok(mut warnings) = self.warnings_seen.lock() {
            warnings.clear();
        }

        for (idx, storage) in self.queues.iter().enumerate() {
            match storage {
                EdgeStorage::Locked(queue) => {
                    if let Ok(mut q) = queue.lock() {
                        *q = queue::EdgeQueue::default();
                        if let Some((_, _, _, _, policy)) = self.edges.get(idx) {
                            q.ensure_policy(policy);
                        }
                    }
                }
                #[cfg(feature = "lockfree-queues")]
                EdgeStorage::BoundedLf(queue) => while queue.pop().is_some() {},
            }
        }
    }

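    /// Produces a shallow copy for a single run: shared structures are
    /// `Arc`-cloned, borrowed slices are re-borrowed, and telemetry starts
    /// fresh at the current metrics level.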
    fn snapshot(&self) -> Self {
        Self {
            nodes: self.nodes.clone(),
            edges: self.edges,
            #[cfg(feature = "gpu")]
            gpu_edges: self.gpu_edges,
            #[cfg(feature = "gpu")]
            _gpu_entries: self._gpu_entries,
            #[cfg(feature = "gpu")]
            _gpu_exits: self._gpu_exits,
            #[cfg(feature = "gpu")]
            gpu_entry_set: self.gpu_entry_set.clone(),
            #[cfg(feature = "gpu")]
            gpu_exit_set: self.gpu_exit_set.clone(),
            #[cfg(feature = "gpu")]
            payload_edges: self.payload_edges.clone(),
            #[cfg(not(feature = "gpu"))]
            gpu_edges: self.gpu_edges,
            segments: self.segments,
            schedule_order: self.schedule_order,
            const_inputs: self.const_inputs.clone(),
            backpressure: self.backpressure.clone(),
            handler: self.handler.clone(),
            state: self.state.clone(),
            gpu_available: self.gpu_available,
            #[cfg(feature = "gpu")]
            gpu: self.gpu.clone(),
            #[cfg(not(feature = "gpu"))]
            gpu: self.gpu,
            queues: self.queues.clone(),
            warnings_seen: self.warnings_seen.clone(),
            telemetry: ExecutionTelemetry::with_level(self.metrics_level),
            metrics_level: self.metrics_level,
            pool_size: self.pool_size,
            host_bridges: self.host_bridges.clone(),
            const_coercers: self.const_coercers.clone(),
            output_movers: self.output_movers.clone(),
            graph_metadata: self.graph_metadata.clone(),
        }
    }

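    /// Runs the plan serially, consuming the executor.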
    pub fn run(self) -> Result<ExecutionTelemetry, ExecuteError> {
        serial::run(self)
    }

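    /// Runs serially without consuming `self`, resetting shared state before
    /// and after the run.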
    pub fn run_in_place(&mut self) -> Result<ExecutionTelemetry, ExecuteError> {
        self.reset();
        let exec = self.snapshot();
        let result = serial::run(exec);
        self.reset();
        result
    }

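    /// Runs the plan with the parallel runner, consuming the executor; the
    /// `executor-pool` feature selects the pool-based implementation.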
    pub fn run_parallel(self) -> Result<ExecutionTelemetry, ExecuteError>
    where
        H: Send + Sync + 'static,
    {
        #[cfg(feature = "executor-pool")]
        {
            pool::run(self)
        }
        #[cfg(not(feature = "executor-pool"))]
        {
            parallel::run(self)
        }
    }

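    /// Parallel counterpart of [`Executor::run_in_place`].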
    pub fn run_parallel_in_place(&mut self) -> Result<ExecutionTelemetry, ExecuteError>
    where
        H: Send + Sync + 'static,
    {
        self.reset();
        let exec = self.snapshot();
        let result = self.run_parallel_from_snapshot(exec);
        self.reset();
        result
    }

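    /// Routes a prepared snapshot to the pool-based or plain parallel runner,
    /// depending on the `executor-pool` feature.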
    fn run_parallel_from_snapshot(
        &self,
        exec: Executor<'a, H>,
    ) -> Result<ExecutionTelemetry, ExecuteError>
    where
        H: Send + Sync + 'static,
    {
        #[cfg(feature = "executor-pool")]
        {
            pool::run(exec)
        }
        #[cfg(not(feature = "executor-pool"))]
        {
            parallel::run(exec)
        }
    }
}

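/// Collects the indexes of edges whose destination node id ends with
/// `io.host_output`.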
#[cfg(feature = "gpu")]
fn collect_payload_edges(nodes: &[RuntimeNode], edges: &[EdgeSpec]) -> HashSet<usize> {
    let mut out = HashSet::new();
    for (idx, (_from, _from_port, to, _to_port, _policy)) in edges.iter().enumerate() {
        if nodes.get(to.0).is_some_and(|node| node.id.ends_with("io.host_output")) {
            out.insert(idx);
        }
    }
    out
}

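/// Per-thread CPU time via `CLOCK_THREAD_CPUTIME_ID`; returns `None` on
/// platforms other than Linux, or if the clock read fails.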
pub(crate) fn thread_cpu_time() -> Option<Duration> {
    #[cfg(target_os = "linux")]
    unsafe {
        let mut ts = libc::timespec {
            tv_sec: 0,
            tv_nsec: 0,
        };
        if libc::clock_gettime(libc::CLOCK_THREAD_CPUTIME_ID, &mut ts) == 0 {
            return Some(Duration::new(ts.tv_sec as u64, ts.tv_nsec as u32));
        }
    }
    None
}

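/// Owning variant of [`Executor`]: holds `Arc`-shared copies of the plan data
/// so it can outlive the [`RuntimePlan`] it was built from. It mirrors the
/// borrowing executor's builder API and runs through the `*_in_place`
/// methods, which internally snapshot a borrowing [`Executor`].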
pub struct OwnedExecutor<H: NodeHandler> {
    pub(crate) nodes: Arc<[RuntimeNode]>,
    pub(crate) edges: Arc<Vec<EdgeSpec>>,
    #[cfg(feature = "gpu")]
    #[allow(dead_code)]
    pub(crate) gpu_edges: Arc<Vec<daedalus_planner::EdgeBufferInfo>>,
    #[cfg(feature = "gpu")]
    pub(crate) _gpu_entries: Arc<Vec<usize>>,
    #[cfg(feature = "gpu")]
    pub(crate) _gpu_exits: Arc<Vec<usize>>,
    #[cfg(feature = "gpu")]
    pub(crate) gpu_entry_set: Arc<HashSet<usize>>,
    #[cfg(feature = "gpu")]
    pub(crate) gpu_exit_set: Arc<HashSet<usize>>,
    #[cfg(feature = "gpu")]
    pub(crate) payload_edges: Arc<HashSet<usize>>,
    #[cfg(not(feature = "gpu"))]
    #[allow(dead_code)]
    pub(crate) gpu_edges: Arc<Vec<()>>,
    pub(crate) segments: Arc<Vec<RuntimeSegment>>,
    pub(crate) schedule_order: Arc<Vec<NodeRef>>,
    pub(crate) const_inputs: Arc<Vec<Vec<(String, daedalus_data::model::Value)>>>,
    pub(crate) backpressure: BackpressureStrategy,
    pub(crate) handler: Arc<H>,
    pub(crate) state: StateStore,
    pub(crate) gpu_available: bool,
    pub(crate) gpu: MaybeGpu,
    pub(crate) queues: Arc<Vec<EdgeStorage>>,
    pub(crate) warnings_seen: Arc<Mutex<HashSet<String>>>,
    pub(crate) telemetry: ExecutionTelemetry,
    pub(crate) metrics_level: MetricsLevel,
    pub(crate) pool_size: Option<usize>,
    pub(crate) host_bridges: Option<crate::host_bridge::HostBridgeManager>,
    pub(crate) const_coercers: Option<crate::io::ConstCoercerMap>,
    pub(crate) output_movers: Option<crate::io::OutputMoverMap>,
    pub(crate) graph_metadata: Arc<BTreeMap<String, daedalus_data::model::Value>>,
}

impl<H: NodeHandler> OwnedExecutor<H> {
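    /// Builds an owning executor by cloning the relevant plan data into `Arc`s.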
    pub fn new(plan: Arc<RuntimePlan>, handler: H) -> Self {
        let nodes: Arc<[RuntimeNode]> = plan.nodes.clone().into();
        let queues = queue::build_queues(&plan);
        #[cfg(feature = "gpu")]
        let payload_edges = Arc::new(collect_payload_edges(&nodes, &plan.edges));
        Self {
            nodes,
            edges: Arc::new(plan.edges.clone()),
            #[cfg(feature = "gpu")]
            gpu_edges: Arc::new(plan.gpu_edges.clone()),
            #[cfg(feature = "gpu")]
            _gpu_entries: Arc::new(plan.gpu_entries.clone()),
            #[cfg(feature = "gpu")]
            _gpu_exits: Arc::new(plan.gpu_exits.clone()),
            #[cfg(feature = "gpu")]
            gpu_entry_set: Arc::new(plan.gpu_entries.iter().cloned().collect()),
            #[cfg(feature = "gpu")]
            gpu_exit_set: Arc::new(plan.gpu_exits.iter().cloned().collect()),
            #[cfg(feature = "gpu")]
            payload_edges,
            #[cfg(not(feature = "gpu"))]
            gpu_edges: Arc::new(Vec::new()),
            segments: Arc::new(plan.segments.clone()),
            schedule_order: Arc::new(plan.schedule_order.clone()),
            const_inputs: Arc::new(plan.nodes.iter().map(|n| n.const_inputs.clone()).collect()),
            backpressure: plan.backpressure.clone(),
            handler: Arc::new(handler),
            state: StateStore::default(),
            gpu_available: false,
            gpu: None,
            queues: Arc::new(queues),
            warnings_seen: Arc::new(Mutex::new(HashSet::new())),
            telemetry: ExecutionTelemetry::with_level(MetricsLevel::default()),
            metrics_level: MetricsLevel::default(),
            pool_size: None,
            host_bridges: None,
            const_coercers: None,
            output_movers: None,
            graph_metadata: Arc::new(plan.graph_metadata.clone()),
        }
    }

    pub fn with_const_coercers(mut self, coercers: crate::io::ConstCoercerMap) -> Self {
        self.const_coercers = Some(coercers);
        self
    }

    pub fn with_output_movers(mut self, movers: crate::io::OutputMoverMap) -> Self {
        self.output_movers = Some(movers);
        self
    }

    pub fn with_state(mut self, state: StateStore) -> Self {
        self.state = state;
        self
    }

    #[cfg(feature = "gpu")]
    pub fn with_gpu(mut self, gpu: daedalus_gpu::GpuContextHandle) -> Self {
        self.gpu_available = true;
        self.gpu = Some(gpu.clone());
        if let Some(ref mgr) = self.host_bridges {
            mgr.attach_gpu(gpu);
        }
        self
    }

    #[cfg(not(feature = "gpu"))]
    pub fn without_gpu(mut self) -> Self {
        self.gpu_available = false;
        self
    }

    pub fn with_pool_size(mut self, size: Option<usize>) -> Self {
        self.pool_size = size;
        self
    }

    pub fn with_host_bridges(mut self, mgr: crate::host_bridge::HostBridgeManager) -> Self {
        #[cfg(feature = "gpu")]
        if let Some(gpu) = self.gpu.clone() {
            mgr.attach_gpu(gpu);
        }
        self.host_bridges = Some(mgr);
        self
    }

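    /// Clears telemetry, warnings, and edge queues, mirroring [`Executor::reset`].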
    pub fn reset(&mut self) {
        self.telemetry = ExecutionTelemetry::with_level(self.metrics_level);
        if let Ok(mut warnings) = self.warnings_seen.lock() {
            warnings.clear();
        }
        for (idx, storage) in self.queues.iter().enumerate() {
            match storage {
                EdgeStorage::Locked(queue) => {
                    if let Ok(mut q) = queue.lock() {
                        *q = queue::EdgeQueue::default();
                        if let Some((_, _, _, _, policy)) = self.edges.get(idx) {
                            q.ensure_policy(policy);
                        }
                    }
                }
                #[cfg(feature = "lockfree-queues")]
                EdgeStorage::BoundedLf(queue) => while queue.pop().is_some() {},
            }
        }
    }

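    /// Borrows `self` as a one-run [`Executor`] with fresh telemetry.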
    fn snapshot<'a>(&'a self) -> Executor<'a, H> {
        Executor {
            nodes: self.nodes.clone(),
            edges: self.edges.as_slice(),
            #[cfg(feature = "gpu")]
            gpu_edges: self.gpu_edges.as_slice(),
            #[cfg(feature = "gpu")]
            _gpu_entries: self._gpu_entries.as_slice(),
            #[cfg(feature = "gpu")]
            _gpu_exits: self._gpu_exits.as_slice(),
            #[cfg(feature = "gpu")]
            gpu_entry_set: self.gpu_entry_set.clone(),
            #[cfg(feature = "gpu")]
            gpu_exit_set: self.gpu_exit_set.clone(),
            #[cfg(feature = "gpu")]
            payload_edges: self.payload_edges.clone(),
            #[cfg(not(feature = "gpu"))]
            gpu_edges: self.gpu_edges.as_slice(),
            segments: self.segments.as_slice(),
            schedule_order: self.schedule_order.as_slice(),
            const_inputs: self.const_inputs.clone(),
            backpressure: self.backpressure.clone(),
            handler: self.handler.clone(),
            state: self.state.clone(),
            gpu_available: self.gpu_available,
            #[cfg(feature = "gpu")]
            gpu: self.gpu.clone(),
            #[cfg(not(feature = "gpu"))]
            gpu: self.gpu,
            queues: self.queues.clone(),
            warnings_seen: self.warnings_seen.clone(),
            telemetry: ExecutionTelemetry::with_level(self.metrics_level),
            metrics_level: self.metrics_level,
            pool_size: self.pool_size,
            host_bridges: self.host_bridges.clone(),
            const_coercers: self.const_coercers.clone(),
            output_movers: self.output_movers.clone(),
            graph_metadata: self.graph_metadata.clone(),
        }
    }

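    /// Runs serially via a snapshot, resetting shared state before and after.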
    pub fn run_in_place(&mut self) -> Result<ExecutionTelemetry, ExecuteError> {
        self.reset();
        let exec = self.snapshot();
        let res = serial::run(exec);
        self.reset();
        res
    }

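    /// Parallel counterpart of [`OwnedExecutor::run_in_place`]; the
    /// `executor-pool` feature selects the pool-based runner.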
    pub fn run_parallel_in_place(&mut self) -> Result<ExecutionTelemetry, ExecuteError>
    where
        H: Send + Sync + 'static,
    {
        self.reset();
        let exec = self.snapshot();
        let res = {
            #[cfg(feature = "executor-pool")]
            {
                pool::run(exec)
            }
            #[cfg(not(feature = "executor-pool"))]
            {
                parallel::run(exec)
            }
        };
        self.reset();
        res
    }
}

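/// Builds per-node edge-index maps from the flat edge list: `incoming[n]` and
/// `outgoing[n]` list the edges ending or starting at node index `n`. Both
/// vectors are grown to cover the highest node index seen.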
pub(crate) fn edge_maps(
    edges: &[(NodeRef, String, NodeRef, String, EdgePolicyKind)],
) -> (Vec<Vec<usize>>, Vec<Vec<usize>>) {
    let mut incoming: Vec<Vec<usize>> = Vec::new();
    let mut outgoing: Vec<Vec<usize>> = Vec::new();
    let grow = |v: &mut Vec<Vec<usize>>, idx: usize| {
        while v.len() <= idx {
            v.push(Vec::new());
        }
    };
    for (idx, (from, _, to, _, _)) in edges.iter().enumerate() {
        let f = from.0;
        let t = to.0;
        grow(&mut incoming, f.max(t));
        grow(&mut outgoing, f.max(t));
        outgoing[f].push(idx);
        incoming[t].push(idx);
    }
    (incoming, outgoing)
}