1use std::collections::HashMap;
7use std::path::Path;
8use std::sync::{Arc, Mutex};
9
10use crate::compile::CompiledCell;
11use crate::error::{Error, Result};
12use crate::graph::CellId;
13use crate::ipc::{WorkerKillHandle, WorkerPool};
14use crate::state::{BoxedOutput, StateManager};
15
16use super::context::{AbortHandle, ExecutionCallback};
17
/// Executes compiled cells on workers taken from a [`WorkerPool`].
///
/// Cells must be registered before they can be executed. Outputs produced by
/// the `execute_and_store` / `execute_in_order` entry points are kept in the
/// embedded [`StateManager`].
pub struct ProcessExecutor {
    /// Registered cells, keyed by their id.
    cells: HashMap<CellId, CompiledCellInfo>,
    /// Storage for cell outputs (written by `execute_and_store`).
    state: StateManager,
    /// Optional observer notified when a cell starts, completes or fails.
    callback: Option<Box<dyn ExecutionCallback>>,
    /// Optional abort flag consulted before and after running a cell.
    abort_handle: Option<AbortHandle>,
    /// Pool that workers are taken from and returned to.
    worker_pool: WorkerPool,
    /// Kill handle of the worker that is currently executing a cell, if any.
    /// The slot is shared (via `Arc`) with every `ExecutorKillHandle` handed out.
    current_worker_kill: Arc<Mutex<Option<WorkerKillHandle>>>,
}
41
/// Bookkeeping entry stored for each registered cell.
struct CompiledCellInfo {
    /// The compiled artifact (library path, entry symbol, name, id).
    compiled: CompiledCell,
    /// Dependency count supplied at registration; forwarded to the worker
    /// when the cell is loaded.
    dep_count: usize,
}
47
48#[derive(Clone)]
53pub struct ExecutorKillHandle {
54 inner: Arc<Mutex<Option<WorkerKillHandle>>>,
55}
56
57impl ExecutorKillHandle {
58 pub fn kill(&self) {
62 match self.inner.lock() {
63 Ok(guard) => {
64 if let Some(ref kill_handle) = *guard {
65 tracing::info!("ExecutorKillHandle: found worker kill handle, calling kill()");
66 kill_handle.kill();
67 } else {
68 tracing::warn!("ExecutorKillHandle: inner is None (worker not spawned or already finished)");
69 }
70 }
71 Err(e) => {
72 tracing::error!("ExecutorKillHandle: failed to lock mutex: {}", e);
73 }
74 }
75 }
76}
77
78impl ProcessExecutor {
79 pub fn new(state_dir: impl AsRef<Path>) -> Result<Self> {
81 Ok(Self {
82 cells: HashMap::new(),
83 state: StateManager::new(state_dir)?,
84 callback: None,
85 abort_handle: None,
86 worker_pool: WorkerPool::new(4), current_worker_kill: Arc::new(Mutex::new(None)),
88 })
89 }
90
91 pub fn with_state(state: StateManager) -> Self {
93 Self {
94 cells: HashMap::new(),
95 state,
96 callback: None,
97 abort_handle: None,
98 worker_pool: WorkerPool::new(4),
99 current_worker_kill: Arc::new(Mutex::new(None)),
100 }
101 }
102
103 pub fn with_warm_pool(state_dir: impl AsRef<Path>, pool_size: usize) -> Result<Self> {
105 Ok(Self {
106 cells: HashMap::new(),
107 state: StateManager::new(state_dir)?,
108 callback: None,
109 abort_handle: None,
110 worker_pool: WorkerPool::with_warm_workers(pool_size, pool_size.min(2))?,
111 current_worker_kill: Arc::new(Mutex::new(None)),
112 })
113 }
114
115 pub fn set_callback(&mut self, callback: impl ExecutionCallback + 'static) {
117 self.callback = Some(Box::new(callback));
118 }
119
120 pub fn set_abort_handle(&mut self, handle: AbortHandle) {
122 self.abort_handle = Some(handle);
123 }
124
125 pub fn abort_handle(&self) -> Option<&AbortHandle> {
127 self.abort_handle.as_ref()
128 }
129
130 fn is_aborted(&self) -> bool {
132 self.abort_handle
133 .as_ref()
134 .is_some_and(|h| h.is_aborted())
135 }
136
137 pub fn register_cell(&mut self, compiled: CompiledCell, dep_count: usize) {
142 let cell_id = compiled.cell_id;
143 self.cells.insert(cell_id, CompiledCellInfo {
144 compiled,
145 dep_count,
146 });
147 }
148
149 pub fn unregister_cell(&mut self, cell_id: CellId) -> Option<CompiledCell> {
151 self.cells.remove(&cell_id).map(|info| info.compiled)
152 }
153
154 pub fn is_registered(&self, cell_id: CellId) -> bool {
156 self.cells.contains_key(&cell_id)
157 }
158
159 pub fn execute_cell(
163 &mut self,
164 cell_id: CellId,
165 inputs: &[Arc<BoxedOutput>],
166 ) -> Result<BoxedOutput> {
167 self.execute_cell_with_widgets(cell_id, inputs, Vec::new())
168 .map(|(output, _widgets_json)| output)
169 }
170
171 pub fn execute_cell_with_widgets(
176 &mut self,
177 cell_id: CellId,
178 inputs: &[Arc<BoxedOutput>],
179 widget_values_json: Vec<u8>,
180 ) -> Result<(BoxedOutput, Vec<u8>)> {
181 if self.is_aborted() {
183 return Err(Error::Aborted);
184 }
185
186 let info = self
187 .cells
188 .get(&cell_id)
189 .ok_or_else(|| Error::CellNotFound(format!("Cell {:?} not registered", cell_id)))?;
190
191 let compiled = &info.compiled;
192 let dep_count = info.dep_count;
193
194 if let Some(ref callback) = self.callback {
196 callback.on_cell_started(cell_id, &compiled.name);
197 }
198
199 let mut worker = self.worker_pool.get()?;
201
202 {
204 let mut kill_guard = self.current_worker_kill.lock().unwrap();
205 *kill_guard = Some(WorkerKillHandle::new(&worker));
206 }
207
208 worker.load_cell(
210 compiled.dylib_path.clone(),
211 dep_count,
212 compiled.entry_symbol.clone(),
213 compiled.name.clone(),
214 )?;
215
216 let input_bytes: Vec<Vec<u8>> = inputs
218 .iter()
219 .map(|output| output.bytes().to_vec())
220 .collect();
221
222 if self.is_aborted() {
224 let _ = worker.kill();
226 {
227 let mut kill_guard = self.current_worker_kill.lock().unwrap();
228 *kill_guard = None;
229 }
230 if let Some(ref callback) = self.callback {
231 callback.on_cell_error(cell_id, &compiled.name, &Error::Aborted);
232 }
233 return Err(Error::Aborted);
234 }
235
236 let result = worker.execute_with_widgets(input_bytes, widget_values_json);
238
239 {
241 let mut kill_guard = self.current_worker_kill.lock().unwrap();
242 *kill_guard = None;
243 }
244
245 self.worker_pool.put(worker);
247
248 if self.is_aborted() {
250 if let Some(ref callback) = self.callback {
251 callback.on_cell_error(cell_id, &compiled.name, &Error::Aborted);
252 }
253 return Err(Error::Aborted);
254 }
255
256 match result {
258 Ok((bytes, widgets_json)) => {
259 let output = self.parse_output_bytes(&bytes, &compiled.name)?;
261
262 if let Some(ref callback) = self.callback {
263 callback.on_cell_completed(cell_id, &compiled.name);
264 }
265
266 Ok((output, widgets_json))
267 }
268 Err(e) => {
269 if let Some(ref callback) = self.callback {
270 callback.on_cell_error(cell_id, &compiled.name, &e);
271 }
272 Err(e)
273 }
274 }
275 }
276
277 fn parse_output_bytes(&self, bytes: &[u8], cell_name: &str) -> Result<BoxedOutput> {
284 if bytes.len() < 8 {
285 return Err(Error::Execution(format!(
286 "Cell {} output too short: {} bytes (need at least 8 for header)",
287 cell_name,
288 bytes.len()
289 )));
290 }
291
292 let display_len = u64::from_le_bytes(bytes[0..8].try_into().unwrap()) as usize;
294 let display_end = 8 + display_len;
295
296 if bytes.len() < display_end {
297 return Err(Error::Execution(format!(
298 "Cell {} output too short for display data",
299 cell_name
300 )));
301 }
302
303 let display_text = String::from_utf8_lossy(&bytes[8..display_end]).to_string();
306 let rkyv_data = bytes[display_end..].to_vec();
307
308 Ok(BoxedOutput::from_raw_bytes_with_display(rkyv_data, display_text))
309 }
310
311 pub fn execute_and_store(
313 &mut self,
314 cell_id: CellId,
315 inputs: &[Arc<BoxedOutput>],
316 ) -> Result<()> {
317 let output = self.execute_cell(cell_id, inputs)?;
318 self.state.store_output(cell_id, output);
319 Ok(())
320 }
321
322 pub fn execute_in_order(
324 &mut self,
325 order: &[CellId],
326 deps: &HashMap<CellId, Vec<CellId>>,
327 ) -> Result<()> {
328 for &cell_id in order {
329 if self.is_aborted() {
331 return Err(Error::Aborted);
332 }
333
334 let dep_ids = deps.get(&cell_id).cloned().unwrap_or_default();
336 let inputs: Vec<Arc<BoxedOutput>> = dep_ids
337 .iter()
338 .filter_map(|&dep_id| self.state.get_output(dep_id))
339 .collect();
340
341 if inputs.len() != dep_ids.len() {
343 return Err(Error::Execution(format!(
344 "Missing dependencies for cell {:?}: expected {}, got {}",
345 cell_id,
346 dep_ids.len(),
347 inputs.len()
348 )));
349 }
350
351 self.execute_and_store(cell_id, &inputs)?;
352 }
353
354 Ok(())
355 }
356
357 pub fn kill_current(&self) {
363 if let Ok(guard) = self.current_worker_kill.lock()
364 && let Some(ref kill_handle) = *guard {
365 kill_handle.kill();
366 }
367 }
368
369 pub fn get_kill_handle(&self) -> Option<ExecutorKillHandle> {
374 Some(ExecutorKillHandle {
375 inner: self.current_worker_kill.clone(),
376 })
377 }
378
379 pub fn abort(&mut self) {
383 if let Some(ref handle) = self.abort_handle {
384 handle.abort();
385 }
386 self.kill_current();
387 }
388
389 pub fn state(&self) -> &StateManager {
391 &self.state
392 }
393
394 pub fn state_mut(&mut self) -> &mut StateManager {
396 &mut self.state
397 }
398
399 pub fn shutdown(&mut self) {
401 self.worker_pool.shutdown();
402 }
403}
404
/// Shuts the executor down when it goes out of scope.
impl Drop for ProcessExecutor {
    fn drop(&mut self) {
        // Delegates to `shutdown`, which shuts down the worker pool.
        self.shutdown();
    }
}
410
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly constructed executor has no registered cells.
    #[test]
    fn test_process_executor_creation() {
        let state_dir = tempfile::TempDir::new().unwrap();
        let fresh = ProcessExecutor::new(state_dir.path()).unwrap();

        assert!(fresh.cells.is_empty());
    }

    /// A warm pool of size two reports two available workers.
    #[test]
    #[ignore = "Requires venus-worker binary"]
    fn test_process_executor_worker_pool() {
        let state_dir = tempfile::TempDir::new().unwrap();
        let warmed = ProcessExecutor::with_warm_pool(state_dir.path(), 2).unwrap();

        assert_eq!(warmed.worker_pool.available_count(), 2);
    }
}