daedalus_engine/
engine.rs

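//! High-level engine facade: configuration validation, planning, runtime-plan
//! construction, and execution.
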
#[cfg_attr(not(feature = "gpu"), allow(unused_imports))]
use std::sync::Arc;

use daedalus_planner::{Graph, PlannerConfig, PlannerInput, build_plan};
use daedalus_registry::store::Registry;
use daedalus_runtime::ExecutionTelemetry;
use daedalus_runtime::executor::{Executor, NodeHandler};
use daedalus_runtime::{HostBridgeManager, RuntimePlan, SchedulerConfig, build_runtime};

use crate::config::{EngineConfig, GpuBackend, RuntimeMode};
use crate::error::EngineError;

/// Result of a full engine run.
///
/// ```
/// use daedalus_engine::RunResult;
/// use daedalus_runtime::ExecutionTelemetry;
/// use daedalus_runtime::RuntimePlan;
///
/// let result = RunResult {
///     runtime_plan: RuntimePlan::from_execution(&daedalus_planner::ExecutionPlan::new(
///         daedalus_planner::Graph::default(),
///         vec![],
///     )),
///     telemetry: ExecutionTelemetry::default(),
/// };
/// assert!(result.runtime_plan.nodes.is_empty());
/// ```
pub struct RunResult {
    /// The runtime plan that was executed.
    pub runtime_plan: RuntimePlan,
    /// Telemetry collected during execution.
    pub telemetry: ExecutionTelemetry,
}

/// High-level engine facade for planning and execution.
///
/// ```no_run
/// use daedalus_engine::{Engine, EngineConfig};
/// let engine = Engine::new(EngineConfig::default()).unwrap();
/// let _ = engine.config();
/// ```
pub struct Engine {
    config: EngineConfig,
    /// Lazily created GPU context, shared between planning and execution.
    #[cfg(feature = "gpu")]
    gpu_handle: std::sync::Mutex<Option<Arc<daedalus_gpu::GpuContextHandle>>>,
}

impl Engine {
    /// Create a new engine from configuration.
    pub fn new(config: EngineConfig) -> Result<Self, EngineError> {
        config.validate().map_err(EngineError::Config)?;
        // Reject configurations that require features compiled out of this build.
        if matches!(config.gpu, GpuBackend::Device | GpuBackend::Mock) && !cfg!(feature = "gpu") {
            return Err(EngineError::FeatureDisabled("gpu"));
        }
        if config.runtime.lockfree_queues && !cfg!(feature = "lockfree-queues") {
            return Err(EngineError::FeatureDisabled("lockfree-queues"));
        }
        Ok(Self {
            config,
            #[cfg(feature = "gpu")]
            gpu_handle: std::sync::Mutex::new(None),
        })
    }

    /// Return a reference to the engine config.
    pub fn config(&self) -> &EngineConfig {
        &self.config
    }

    /// Run the planner on the provided graph and registry.
    ///
    /// ```no_run
    /// use daedalus_engine::{Engine, EngineConfig};
    /// use daedalus_registry::store::Registry;
    /// use daedalus_planner::Graph;
    /// let engine = Engine::new(EngineConfig::default()).unwrap();
    /// let registry = Registry::new();
    /// let _ = engine.plan(&registry, Graph::default());
    /// ```
    pub fn plan(
        &self,
        registry: &Registry,
        graph: Graph,
    ) -> Result<daedalus_planner::PlannerOutput, EngineError> {
        let planner_cfg = self.planner_config()?;
        let output = build_plan(PlannerInput { graph, registry }, planner_cfg);
        // Lint warnings are advisory; any other diagnostic code is fatal.
        let has_errors = output
            .diagnostics
            .iter()
            .any(|d| !matches!(d.code, daedalus_planner::DiagnosticCode::LintWarning));
        if has_errors {
            return Err(EngineError::Planner(output.diagnostics));
        }
        Ok(output)
    }

    /// Construct a runtime plan from a planner plan using configured policies.
    ///
    /// ```no_run
    /// use daedalus_engine::{Engine, EngineConfig};
    /// use daedalus_planner::{ExecutionPlan, Graph};
    /// let engine = Engine::new(EngineConfig::default()).unwrap();
    /// let plan = ExecutionPlan::new(Graph::default(), vec![]);
    /// let _ = engine.build_runtime_plan(&plan);
    /// ```
    pub fn build_runtime_plan(
        &self,
        plan: &daedalus_planner::ExecutionPlan,
    ) -> Result<RuntimePlan, EngineError> {
        let sched = SchedulerConfig {
            default_policy: self.config.runtime.default_policy.clone(),
            backpressure: self.config.runtime.backpressure.clone(),
            lockfree_queues: self.config.runtime.lockfree_queues,
        };
        Ok(build_runtime(plan, &sched))
    }

    /// Execute a runtime plan using the provided handler.
    ///
    /// ```no_run
    /// use daedalus_engine::{Engine, EngineConfig};
    /// use daedalus_runtime::{RuntimePlan, RuntimeNode};
    /// use daedalus_runtime::executor::NodeError;
    /// use daedalus_planner::{ExecutionPlan, Graph};
    ///
    /// let engine = Engine::new(EngineConfig::default()).unwrap();
    /// let plan = RuntimePlan::from_execution(&ExecutionPlan::new(Graph::default(), vec![]));
    /// let handler = |_node: &RuntimeNode,
    ///                _ctx: &daedalus_runtime::state::ExecutionContext,
    ///                _io: &mut daedalus_runtime::io::NodeIo|
    ///     -> Result<(), NodeError> { Ok(()) };
    /// let _ = engine.execute(plan, handler);
    /// ```
    pub fn execute<H: NodeHandler + Send + Sync + 'static>(
        &self,
        runtime_plan: RuntimePlan,
        handler: H,
    ) -> Result<ExecutionTelemetry, EngineError> {
        let mut exec = Executor::new(&runtime_plan, handler)
            .with_metrics_level(self.config.runtime.metrics_level);
        #[cfg(feature = "gpu")]
        {
            if let Some(gpu) = self.get_gpu_handle()? {
                exec = exec.with_gpu((*gpu).clone());
            }
        }
        #[cfg(not(feature = "gpu"))]
        {
            // Without the `gpu` feature, any non-CPU backend is a configuration error.
            if !matches!(self.config.gpu, GpuBackend::Cpu) {
                return Err(EngineError::FeatureDisabled("gpu"));
            }
            let _ = exec;
        }

        if matches!(self.config.runtime.mode, RuntimeMode::Parallel) {
            exec = exec.with_pool_size(self.config.runtime.pool_size);
        }

        let telemetry = match self.config.runtime.mode {
            RuntimeMode::Serial => exec.run(),
            RuntimeMode::Parallel => exec.run_parallel(),
        }?;
        Ok(telemetry)
    }

    /// Execute a runtime plan with host bridge support.
    ///
    /// ```no_run
    /// use daedalus_engine::{Engine, EngineConfig};
    /// use daedalus_runtime::{HostBridgeManager, RuntimePlan, RuntimeNode};
    /// use daedalus_runtime::executor::NodeError;
    /// use daedalus_planner::{ExecutionPlan, Graph};
    ///
    /// let engine = Engine::new(EngineConfig::default()).unwrap();
    /// let plan = RuntimePlan::from_execution(&ExecutionPlan::new(Graph::default(), vec![]));
    /// let host = HostBridgeManager::new();
    /// let handler = |_node: &RuntimeNode,
    ///                _ctx: &daedalus_runtime::state::ExecutionContext,
    ///                _io: &mut daedalus_runtime::io::NodeIo|
    ///     -> Result<(), NodeError> { Ok(()) };
    /// let _ = engine.execute_with_host(plan, host, handler);
    /// ```
    pub fn execute_with_host<H: NodeHandler + Send + Sync + 'static>(
        &self,
        runtime_plan: RuntimePlan,
        host: HostBridgeManager,
        handler: H,
    ) -> Result<ExecutionTelemetry, EngineError> {
        // Mirrors `execute`, with host bridges attached to the executor.
        let mut exec = Executor::new(&runtime_plan, handler)
            .with_host_bridges(host)
            .with_metrics_level(self.config.runtime.metrics_level);
        #[cfg(feature = "gpu")]
        {
            if let Some(gpu) = self.get_gpu_handle()? {
                exec = exec.with_gpu((*gpu).clone());
            }
        }
        #[cfg(not(feature = "gpu"))]
        {
            if !matches!(self.config.gpu, GpuBackend::Cpu) {
                return Err(EngineError::FeatureDisabled("gpu"));
            }
            let _ = exec;
        }

        if matches!(self.config.runtime.mode, RuntimeMode::Parallel) {
            exec = exec.with_pool_size(self.config.runtime.pool_size);
        }

        let telemetry = match self.config.runtime.mode {
            RuntimeMode::Serial => exec.run(),
            RuntimeMode::Parallel => exec.run_parallel(),
        }?;
        Ok(telemetry)
    }

    /// Full run: plan against the provided registry, build the runtime plan, and execute.
    ///
    /// ```no_run
    /// use daedalus_engine::{Engine, EngineConfig};
    /// use daedalus_registry::store::Registry;
    /// use daedalus_planner::Graph;
    /// use daedalus_runtime::executor::NodeError;
    ///
    /// let engine = Engine::new(EngineConfig::default()).unwrap();
    /// let registry = Registry::new();
    /// let handler = |_node: &daedalus_runtime::RuntimeNode,
    ///                _ctx: &daedalus_runtime::state::ExecutionContext,
    ///                _io: &mut daedalus_runtime::io::NodeIo|
    ///     -> Result<(), NodeError> { Ok(()) };
    /// let _ = engine.run(&registry, Graph::default(), handler);
    /// ```
    pub fn run<H: NodeHandler + Send + Sync + 'static>(
        &self,
        registry: &Registry,
        graph: Graph,
        handler: H,
    ) -> Result<RunResult, EngineError> {
        let planner_output = self.plan(registry, graph)?;
        let runtime_plan = self.build_runtime_plan(&planner_output.plan)?;
        // Clone so the plan can be returned alongside the telemetry.
        let telemetry = self.execute(runtime_plan.clone(), handler)?;
        Ok(RunResult {
            runtime_plan,
            telemetry,
        })
    }

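    /// Translate the engine-level planner settings into a `PlannerConfig`,
    /// attaching GPU capabilities when a GPU context is available.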
    fn planner_config(&self) -> Result<PlannerConfig, EngineError> {
        if self.config.planner.enable_gpu
            && !cfg!(feature = "gpu")
            && matches!(self.config.gpu, GpuBackend::Device | GpuBackend::Mock)
        {
            return Err(EngineError::FeatureDisabled("gpu"));
        }
        #[allow(unused_mut)]
        let mut cfg = PlannerConfig {
            enable_gpu: self.config.planner.enable_gpu,
            enable_lints: self.config.planner.enable_lints,
            active_features: self.config.planner.active_features.clone(),
            #[cfg(feature = "gpu")]
            gpu_caps: None,
        };
        #[cfg(feature = "gpu")]
        {
            if self.config.planner.enable_gpu {
                let ctx = self.get_gpu_handle()?;
                cfg.gpu_caps = ctx.as_ref().map(|c| c.capabilities());
            }
        }
        #[cfg(not(feature = "gpu"))]
        {
            let _ = cfg.enable_gpu;
        }
        Ok(cfg)
    }

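    /// Resolve the configured GPU backend to a context handle, creating it on
    /// first use and caching it for subsequent calls.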
    #[cfg(feature = "gpu")]
    fn get_gpu_handle(&self) -> Result<Option<Arc<daedalus_gpu::GpuContextHandle>>, EngineError> {
        use daedalus_gpu::{GpuBackendKind, GpuOptions, select_backend};
        if matches!(self.config.gpu, GpuBackend::Cpu) {
            return Ok(None);
        }
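        // Reuse a previously created handle when one is already cached.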
        let mut guard = self.gpu_handle.lock().unwrap();
        if let Some(handle) = guard.as_ref() {
            return Ok(Some(handle.clone()));
        }
        let opts = match self.config.gpu {
            // Already handled by the early return above; kept for exhaustiveness.
            GpuBackend::Cpu => return Ok(None),
            GpuBackend::Mock => GpuOptions {
                preferred_backend: Some(GpuBackendKind::Mock),
                adapter_label: None,
                allow_software: true,
            },
            GpuBackend::Device => GpuOptions {
                preferred_backend: Some(GpuBackendKind::Wgpu),
                adapter_label: None,
                allow_software: false,
            },
        };
        let handle = Arc::new(select_backend(&opts)?);
        *guard = Some(handle.clone());
        Ok(Some(handle))
    }
}