daedalus_engine/engine.rs

#[cfg_attr(not(feature = "gpu"), allow(unused_imports))]
use std::sync::Arc;

use daedalus_planner::{Graph, PlannerConfig, PlannerInput, build_plan};
use daedalus_registry::store::Registry;
use daedalus_runtime::ExecutionTelemetry;
use daedalus_runtime::executor::{Executor, NodeHandler};
use daedalus_runtime::{HostBridgeManager, RuntimePlan, SchedulerConfig, build_runtime};

use crate::config::{EngineConfig, GpuBackend, RuntimeMode};
use crate::error::EngineError;

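/// Artifacts produced by a full [`Engine::run`] pass: the lowered runtime
/// plan and the telemetry collected while executing it.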
pub struct RunResult {
    pub runtime_plan: RuntimePlan,
    pub telemetry: ExecutionTelemetry,
}

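/// Facade over the planner, the runtime executor, and the optional GPU
/// backend. Holds the validated [`EngineConfig`] and, when the `gpu` feature
/// is enabled, a lazily initialised GPU context shared across executions.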
pub struct Engine {
    config: EngineConfig,
    #[cfg(feature = "gpu")]
    gpu_handle: std::sync::Mutex<Option<Arc<daedalus_gpu::GpuContextHandle>>>,
}

impl Engine {
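    /// Validates the configuration and checks that any requested optional
    /// features (`gpu`, `lockfree-queues`) were compiled in, returning
    /// [`EngineError::FeatureDisabled`] otherwise.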
    pub fn new(config: EngineConfig) -> Result<Self, EngineError> {
        config.validate().map_err(EngineError::Config)?;
        if matches!(config.gpu, GpuBackend::Device | GpuBackend::Mock) && !cfg!(feature = "gpu") {
            return Err(EngineError::FeatureDisabled("gpu"));
        }
        if config.runtime.lockfree_queues && !cfg!(feature = "lockfree-queues") {
            return Err(EngineError::FeatureDisabled("lockfree-queues"));
        }
        Ok(Self {
            config,
            #[cfg(feature = "gpu")]
            gpu_handle: std::sync::Mutex::new(None),
        })
    }

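    /// Returns the configuration the engine was constructed with.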
    pub fn config(&self) -> &EngineConfig {
        &self.config
    }

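    /// Runs the planner over `graph` against `registry`. Any diagnostic other
    /// than a lint warning is treated as an error and surfaced as
    /// [`EngineError::Planner`].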
    pub fn plan(
        &self,
        registry: &Registry,
        graph: Graph,
    ) -> Result<daedalus_planner::PlannerOutput, EngineError> {
        let planner_cfg = self.planner_config()?;
        let output = build_plan(PlannerInput { graph, registry }, planner_cfg);
        let has_errors = output
            .diagnostics
            .iter()
            .any(|d| !matches!(d.code, daedalus_planner::DiagnosticCode::LintWarning));
        if has_errors {
            return Err(EngineError::Planner(output.diagnostics));
        }
        Ok(output)
    }

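    /// Lowers a planner `ExecutionPlan` into a [`RuntimePlan`] using the
    /// scheduler settings from the runtime configuration.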
    pub fn build_runtime_plan(
        &self,
        plan: &daedalus_planner::ExecutionPlan,
    ) -> Result<RuntimePlan, EngineError> {
        let sched = SchedulerConfig {
            default_policy: self.config.runtime.default_policy.clone(),
            backpressure: self.config.runtime.backpressure.clone(),
            lockfree_queues: self.config.runtime.lockfree_queues,
        };
        Ok(build_runtime(plan, &sched))
    }

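    /// Executes a prepared [`RuntimePlan`] with the given node handler.
    /// Attaches the GPU context when one is configured and compiled in, and
    /// runs serially or in parallel depending on `runtime.mode`.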
    pub fn execute<H: NodeHandler + Send + Sync + 'static>(
        &self,
        runtime_plan: RuntimePlan,
        handler: H,
    ) -> Result<ExecutionTelemetry, EngineError> {
        let mut exec = Executor::new(&runtime_plan, handler)
            .with_metrics_level(self.config.runtime.metrics_level);
        #[cfg(feature = "gpu")]
        {
            if let Some(gpu) = self.get_gpu_handle()? {
                exec = exec.with_gpu((*gpu).clone());
            }
        }
        #[cfg(not(feature = "gpu"))]
        {
            if !matches!(self.config.gpu, GpuBackend::Cpu) {
                return Err(EngineError::FeatureDisabled("gpu"));
            }
            let _ = exec;
        }

        if matches!(self.config.runtime.mode, RuntimeMode::Parallel) {
            exec = exec.with_pool_size(self.config.runtime.pool_size);
        }

        let telemetry = match self.config.runtime.mode {
            RuntimeMode::Serial => exec.run(),
            RuntimeMode::Parallel => exec.run_parallel(),
        }?;
        Ok(telemetry)
    }

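    /// Same as [`Engine::execute`], but attaches the provided
    /// [`HostBridgeManager`] to the executor before running.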
    pub fn execute_with_host<H: NodeHandler + Send + Sync + 'static>(
        &self,
        runtime_plan: RuntimePlan,
        host: HostBridgeManager,
        handler: H,
    ) -> Result<ExecutionTelemetry, EngineError> {
        let mut exec = Executor::new(&runtime_plan, handler)
            .with_host_bridges(host)
            .with_metrics_level(self.config.runtime.metrics_level);
        #[cfg(feature = "gpu")]
        {
            if let Some(gpu) = self.get_gpu_handle()? {
                exec = exec.with_gpu((*gpu).clone());
            }
        }
        #[cfg(not(feature = "gpu"))]
        {
            if !matches!(self.config.gpu, GpuBackend::Cpu) {
                return Err(EngineError::FeatureDisabled("gpu"));
            }
            let _ = exec;
        }

        if matches!(self.config.runtime.mode, RuntimeMode::Parallel) {
            exec = exec.with_pool_size(self.config.runtime.pool_size);
        }

        let telemetry = match self.config.runtime.mode {
            RuntimeMode::Serial => exec.run(),
            RuntimeMode::Parallel => exec.run_parallel(),
        }?;
        Ok(telemetry)
    }

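    /// Convenience wrapper: plans the graph, lowers it to a runtime plan, and
    /// executes it, returning both the plan and the collected telemetry.
    ///
    /// A minimal usage sketch; how `config`, `registry`, `graph`, and the
    /// handler are constructed is application-specific and assumed here:
    ///
    /// ```ignore
    /// let engine = Engine::new(config)?;
    /// let RunResult { runtime_plan, telemetry } = engine.run(&registry, graph, handler)?;
    /// ```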
    pub fn run<H: NodeHandler + Send + Sync + 'static>(
        &self,
        registry: &Registry,
        graph: Graph,
        handler: H,
    ) -> Result<RunResult, EngineError> {
        let planner_output = self.plan(registry, graph)?;
        let runtime_plan = self.build_runtime_plan(&planner_output.plan)?;
        let telemetry = self.execute(runtime_plan.clone(), handler)?;
        Ok(RunResult {
            runtime_plan,
            telemetry,
        })
    }

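    /// Builds the [`PlannerConfig`] from the engine configuration, rejecting
    /// GPU planning when the `gpu` feature is not compiled in and filling in
    /// GPU capabilities when it is.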
    fn planner_config(&self) -> Result<PlannerConfig, EngineError> {
        if self.config.planner.enable_gpu
            && !cfg!(feature = "gpu")
            && matches!(self.config.gpu, GpuBackend::Device | GpuBackend::Mock)
        {
            return Err(EngineError::FeatureDisabled("gpu"));
        }
        #[allow(unused_mut)]
        let mut cfg = PlannerConfig {
            enable_gpu: self.config.planner.enable_gpu,
            enable_lints: self.config.planner.enable_lints,
            active_features: self.config.planner.active_features.clone(),
            #[cfg(feature = "gpu")]
            gpu_caps: None,
        };
        #[cfg(feature = "gpu")]
        {
            if self.config.planner.enable_gpu {
                let ctx = self.get_gpu_handle()?;
                cfg.gpu_caps = ctx.as_ref().map(|c| c.capabilities());
            }
        }
        #[cfg(not(feature = "gpu"))]
        {
            let _ = cfg.enable_gpu;
        }
        Ok(cfg)
    }

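    /// Lazily selects the GPU backend and caches the handle behind a mutex.
    /// Returns `Ok(None)` for the CPU backend; subsequent calls reuse the
    /// cached handle.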
    #[cfg(feature = "gpu")]
    fn get_gpu_handle(&self) -> Result<Option<Arc<daedalus_gpu::GpuContextHandle>>, EngineError> {
        use daedalus_gpu::{GpuBackendKind, GpuOptions, select_backend};
        if matches!(self.config.gpu, GpuBackend::Cpu) {
            return Ok(None);
        }
        let mut guard = self.gpu_handle.lock().unwrap();
        if let Some(handle) = guard.as_ref() {
            return Ok(Some(handle.clone()));
        }
        let opts = match self.config.gpu {
            GpuBackend::Cpu => return Ok(None),
            GpuBackend::Mock => GpuOptions {
                preferred_backend: Some(GpuBackendKind::Mock),
                adapter_label: None,
                allow_software: true,
            },
            GpuBackend::Device => GpuOptions {
                preferred_backend: Some(GpuBackendKind::Wgpu),
                adapter_label: None,
                allow_software: false,
            },
        };
        let handle = Arc::new(select_backend(&opts)?);
        *guard = Some(handle.clone());
        Ok(Some(handle))
    }
}