venus_core/execute/
process.rs

//! Process-based executor for isolated cell execution.
//!
//! Provides true interruption capability by running cells in separate
//! worker processes that can be killed at any time.
5
6use std::collections::HashMap;
7use std::path::Path;
8use std::sync::{Arc, Mutex};
9
10use crate::compile::CompiledCell;
11use crate::error::{Error, Result};
12use crate::graph::CellId;
13use crate::ipc::{WorkerKillHandle, WorkerPool};
14use crate::state::{BoxedOutput, StateManager};
15
16use super::context::{AbortHandle, ExecutionCallback};
17
18/// Process-based executor that runs cells in isolated worker processes.
19///
20/// Unlike `LinearExecutor`, this executor can truly interrupt cell execution
21/// by killing the worker process. This provides:
22/// - Immediate interruption (no need for cooperative checks)
23/// - Crash isolation (panics don't affect the server)
24/// - Memory isolation (runaway cells can't OOM the server)
25pub struct ProcessExecutor {
26    /// Compiled cells (we don't load them here, workers do)
27    cells: HashMap<CellId, CompiledCellInfo>,
28    /// State manager for inputs/outputs
29    state: StateManager,
30    /// Execution callback for progress reporting
31    callback: Option<Box<dyn ExecutionCallback>>,
32    /// Abort handle for interruption
33    abort_handle: Option<AbortHandle>,
34    /// Worker pool for process reuse
35    worker_pool: WorkerPool,
36    /// Currently executing worker kill handle (thread-safe for external kill).
37    /// This is wrapped in Arc<Mutex<>> so it can be cloned and killed from
38    /// another thread while execute_cell is running.
39    current_worker_kill: Arc<Mutex<Option<WorkerKillHandle>>>,
40}
41
42/// Info about a compiled cell (without the loaded library)
43struct CompiledCellInfo {
44    compiled: CompiledCell,
45    dep_count: usize,
46}
47
48/// Thread-safe handle for killing an executor's current cell from another thread.
49///
50/// This can be cloned and passed to another thread, then used to kill
51/// whatever cell is currently executing.
52#[derive(Clone)]
53pub struct ExecutorKillHandle {
54    inner: Arc<Mutex<Option<WorkerKillHandle>>>,
55}
56
57impl ExecutorKillHandle {
58    /// Kill the currently executing cell.
59    ///
60    /// If no cell is executing, this is a no-op.
61    pub fn kill(&self) {
62        match self.inner.lock() {
63            Ok(guard) => {
64                if let Some(ref kill_handle) = *guard {
65                    tracing::info!("ExecutorKillHandle: found worker kill handle, calling kill()");
66                    kill_handle.kill();
67                } else {
68                    tracing::warn!("ExecutorKillHandle: inner is None (worker not spawned or already finished)");
69                }
70            }
71            Err(e) => {
72                tracing::error!("ExecutorKillHandle: failed to lock mutex: {}", e);
73            }
74        }
75    }
76}
77
78impl ProcessExecutor {
79    /// Create a new process executor.
80    pub fn new(state_dir: impl AsRef<Path>) -> Result<Self> {
81        Ok(Self {
82            cells: HashMap::new(),
83            state: StateManager::new(state_dir)?,
84            callback: None,
85            abort_handle: None,
86            worker_pool: WorkerPool::new(4), // Pool of up to 4 workers
87            current_worker_kill: Arc::new(Mutex::new(None)),
88        })
89    }
90
91    /// Create with an existing state manager.
92    pub fn with_state(state: StateManager) -> Self {
93        Self {
94            cells: HashMap::new(),
95            state,
96            callback: None,
97            abort_handle: None,
98            worker_pool: WorkerPool::new(4),
99            current_worker_kill: Arc::new(Mutex::new(None)),
100        }
101    }
102
103    /// Create with a pre-warmed worker pool.
104    pub fn with_warm_pool(state_dir: impl AsRef<Path>, pool_size: usize) -> Result<Self> {
105        Ok(Self {
106            cells: HashMap::new(),
107            state: StateManager::new(state_dir)?,
108            callback: None,
109            abort_handle: None,
110            worker_pool: WorkerPool::with_warm_workers(pool_size, pool_size.min(2))?,
111            current_worker_kill: Arc::new(Mutex::new(None)),
112        })
113    }
114
115    /// Set the execution callback for progress reporting.
116    pub fn set_callback(&mut self, callback: impl ExecutionCallback + 'static) {
117        self.callback = Some(Box::new(callback));
118    }
119
120    /// Set the abort handle for interruption.
121    pub fn set_abort_handle(&mut self, handle: AbortHandle) {
122        self.abort_handle = Some(handle);
123    }
124
125    /// Get the current abort handle.
126    pub fn abort_handle(&self) -> Option<&AbortHandle> {
127        self.abort_handle.as_ref()
128    }
129
130    /// Check if execution has been aborted.
131    fn is_aborted(&self) -> bool {
132        self.abort_handle
133            .as_ref()
134            .is_some_and(|h| h.is_aborted())
135    }
136
137    /// Register a compiled cell for execution.
138    ///
139    /// Unlike `LinearExecutor::load_cell`, this doesn't actually load the dylib.
140    /// The worker process will load it when executing.
141    pub fn register_cell(&mut self, compiled: CompiledCell, dep_count: usize) {
142        let cell_id = compiled.cell_id;
143        self.cells.insert(cell_id, CompiledCellInfo {
144            compiled,
145            dep_count,
146        });
147    }
148
149    /// Unregister a cell.
150    pub fn unregister_cell(&mut self, cell_id: CellId) -> Option<CompiledCell> {
151        self.cells.remove(&cell_id).map(|info| info.compiled)
152    }
153
154    /// Check if a cell is registered.
155    pub fn is_registered(&self, cell_id: CellId) -> bool {
156        self.cells.contains_key(&cell_id)
157    }
158
159    /// Execute a single cell with the given inputs.
160    ///
161    /// This runs the cell in a worker process that can be killed for interruption.
162    pub fn execute_cell(
163        &mut self,
164        cell_id: CellId,
165        inputs: &[Arc<BoxedOutput>],
166    ) -> Result<BoxedOutput> {
167        self.execute_cell_with_widgets(cell_id, inputs, Vec::new())
168            .map(|(output, _widgets_json)| output)
169    }
170
171    /// Execute a single cell with the given inputs and widget values.
172    ///
173    /// This runs the cell in a worker process that can be killed for interruption.
174    /// Returns the cell output and any registered widget definitions as JSON.
175    pub fn execute_cell_with_widgets(
176        &mut self,
177        cell_id: CellId,
178        inputs: &[Arc<BoxedOutput>],
179        widget_values_json: Vec<u8>,
180    ) -> Result<(BoxedOutput, Vec<u8>)> {
181        // Check for abort before starting
182        if self.is_aborted() {
183            return Err(Error::Aborted);
184        }
185
186        let info = self
187            .cells
188            .get(&cell_id)
189            .ok_or_else(|| Error::CellNotFound(format!("Cell {:?} not registered", cell_id)))?;
190
191        let compiled = &info.compiled;
192        let dep_count = info.dep_count;
193
194        // Notify callback
195        if let Some(ref callback) = self.callback {
196            callback.on_cell_started(cell_id, &compiled.name);
197        }
198
199        // Get a worker from the pool
200        let mut worker = self.worker_pool.get()?;
201
202        // Store kill handle for potential interruption (thread-safe)
203        {
204            let mut kill_guard = self.current_worker_kill.lock().unwrap();
205            *kill_guard = Some(WorkerKillHandle::new(&worker));
206        }
207
208        // Load the cell in the worker
209        worker.load_cell(
210            compiled.dylib_path.clone(),
211            dep_count,
212            compiled.entry_symbol.clone(),
213            compiled.name.clone(),
214        )?;
215
216        // Prepare inputs as raw bytes
217        let input_bytes: Vec<Vec<u8>> = inputs
218            .iter()
219            .map(|output| output.bytes().to_vec())
220            .collect();
221
222        // Check for abort after load
223        if self.is_aborted() {
224            // Kill the worker and return abort error
225            let _ = worker.kill();
226            {
227                let mut kill_guard = self.current_worker_kill.lock().unwrap();
228                *kill_guard = None;
229            }
230            if let Some(ref callback) = self.callback {
231                callback.on_cell_error(cell_id, &compiled.name, &Error::Aborted);
232            }
233            return Err(Error::Aborted);
234        }
235
236        // Execute the cell with widget values
237        let result = worker.execute_with_widgets(input_bytes, widget_values_json);
238
239        // Clear kill handle
240        {
241            let mut kill_guard = self.current_worker_kill.lock().unwrap();
242            *kill_guard = None;
243        }
244
245        // Return worker to pool (if still alive)
246        self.worker_pool.put(worker);
247
248        // Check for abort after execution
249        if self.is_aborted() {
250            if let Some(ref callback) = self.callback {
251                callback.on_cell_error(cell_id, &compiled.name, &Error::Aborted);
252            }
253            return Err(Error::Aborted);
254        }
255
256        // Process result
257        match result {
258            Ok((bytes, widgets_json)) => {
259                // Parse the output bytes into BoxedOutput
260                let output = self.parse_output_bytes(&bytes, &compiled.name)?;
261
262                if let Some(ref callback) = self.callback {
263                    callback.on_cell_completed(cell_id, &compiled.name);
264                }
265
266                Ok((output, widgets_json))
267            }
268            Err(e) => {
269                if let Some(ref callback) = self.callback {
270                    callback.on_cell_error(cell_id, &compiled.name, &e);
271                }
272                Err(e)
273            }
274        }
275    }
276
277    /// Parse output bytes from worker into BoxedOutput.
278    ///
279    /// Output format (after worker strips widget data):
280    /// - display_len (8 bytes, u64 LE): length of display string
281    /// - display_bytes (N bytes): display string (UTF-8)
282    /// - rkyv_data (remaining bytes): rkyv-serialized data
283    fn parse_output_bytes(&self, bytes: &[u8], cell_name: &str) -> Result<BoxedOutput> {
284        if bytes.len() < 8 {
285            return Err(Error::Execution(format!(
286                "Cell {} output too short: {} bytes (need at least 8 for header)",
287                cell_name,
288                bytes.len()
289            )));
290        }
291
292        // Read display_len
293        let display_len = u64::from_le_bytes(bytes[0..8].try_into().unwrap()) as usize;
294        let display_end = 8 + display_len;
295
296        if bytes.len() < display_end {
297            return Err(Error::Execution(format!(
298                "Cell {} output too short for display data",
299                cell_name
300            )));
301        }
302
303        // Worker already stripped widgets_len and widgets_json
304        // Format is: display_len | display_bytes | rkyv_data
305        let display_text = String::from_utf8_lossy(&bytes[8..display_end]).to_string();
306        let rkyv_data = bytes[display_end..].to_vec();
307
308        Ok(BoxedOutput::from_raw_bytes_with_display(rkyv_data, display_text))
309    }
310
311    /// Execute a cell and store the output in the state manager.
312    pub fn execute_and_store(
313        &mut self,
314        cell_id: CellId,
315        inputs: &[Arc<BoxedOutput>],
316    ) -> Result<()> {
317        let output = self.execute_cell(cell_id, inputs)?;
318        self.state.store_output(cell_id, output);
319        Ok(())
320    }
321
322    /// Execute cells in the given order, resolving dependencies from state.
323    pub fn execute_in_order(
324        &mut self,
325        order: &[CellId],
326        deps: &HashMap<CellId, Vec<CellId>>,
327    ) -> Result<()> {
328        for &cell_id in order {
329            // Check for abort before each cell
330            if self.is_aborted() {
331                return Err(Error::Aborted);
332            }
333
334            // Gather inputs from dependencies
335            let dep_ids = deps.get(&cell_id).cloned().unwrap_or_default();
336            let inputs: Vec<Arc<BoxedOutput>> = dep_ids
337                .iter()
338                .filter_map(|&dep_id| self.state.get_output(dep_id))
339                .collect();
340
341            // Check we have all required inputs
342            if inputs.len() != dep_ids.len() {
343                return Err(Error::Execution(format!(
344                    "Missing dependencies for cell {:?}: expected {}, got {}",
345                    cell_id,
346                    dep_ids.len(),
347                    inputs.len()
348                )));
349            }
350
351            self.execute_and_store(cell_id, &inputs)?;
352        }
353
354        Ok(())
355    }
356
357    /// Kill the currently executing cell immediately.
358    ///
359    /// This is the key feature - we can terminate the worker process
360    /// mid-computation without any cooperation from the cell.
361    /// This method is thread-safe and can be called from any thread.
362    pub fn kill_current(&self) {
363        if let Ok(guard) = self.current_worker_kill.lock()
364            && let Some(ref kill_handle) = *guard {
365                kill_handle.kill();
366            }
367    }
368
369    /// Get a handle that can be used to kill the current execution from another thread.
370    ///
371    /// Returns `None` if no execution is in progress.
372    /// The returned handle is safe to clone and use from any thread.
373    pub fn get_kill_handle(&self) -> Option<ExecutorKillHandle> {
374        Some(ExecutorKillHandle {
375            inner: self.current_worker_kill.clone(),
376        })
377    }
378
379    /// Abort execution and kill any running cell.
380    ///
381    /// Sets the abort flag and kills the current worker.
382    pub fn abort(&mut self) {
383        if let Some(ref handle) = self.abort_handle {
384            handle.abort();
385        }
386        self.kill_current();
387    }
388
389    /// Get a reference to the state manager.
390    pub fn state(&self) -> &StateManager {
391        &self.state
392    }
393
394    /// Get a mutable reference to the state manager.
395    pub fn state_mut(&mut self) -> &mut StateManager {
396        &mut self.state
397    }
398
399    /// Shutdown the executor and all workers.
400    pub fn shutdown(&mut self) {
401        self.worker_pool.shutdown();
402    }
403}
404
405impl Drop for ProcessExecutor {
406    fn drop(&mut self) {
407        self.shutdown();
408    }
409}
410
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly created executor has no registered cells.
    #[test]
    fn test_process_executor_creation() {
        let state_dir = tempfile::TempDir::new().unwrap();
        let fresh = ProcessExecutor::new(state_dir.path()).unwrap();
        assert!(fresh.cells.is_empty());
    }

    /// A pool warmed with size 2 reports two available workers.
    #[test]
    #[ignore = "Requires venus-worker binary"]
    fn test_process_executor_worker_pool() {
        let state_dir = tempfile::TempDir::new().unwrap();
        let warmed = ProcessExecutor::with_warm_pool(state_dir.path(), 2).unwrap();
        assert_eq!(warmed.worker_pool.available_count(), 2);
    }
}
429}