venus_core/ipc/
worker.rs

1//! Worker process management for Venus cell execution.
2//!
3//! Provides `WorkerHandle` for spawning and communicating with isolated
4//! worker processes, and `WorkerPool` for efficient worker reuse.
5
6use std::io::{BufReader, BufWriter};
7use std::path::PathBuf;
8use std::process::{Child, Command, Stdio};
9use std::sync::Arc;
10use std::time::Duration;
11
12use crate::error::{Error, Result};
13
14use super::protocol::{WorkerCommand, WorkerResponse, read_message, write_message};
15
/// Handle to a worker process.
///
/// Owns the spawned child together with buffered ends of its stdin and
/// stdout pipes. Commands are written to the child's stdin and responses are
/// read back from its stdout. Dropping the handle kills the worker.
pub struct WorkerHandle {
    /// The child process.
    child: Child,
    /// Buffered writer over the child's stdin; commands are written here.
    stdin: BufWriter<std::process::ChildStdin>,
    /// Buffered reader over the child's stdout; responses are read from here.
    stdout: BufReader<std::process::ChildStdout>,
    /// Whether the worker has been killed. Once set, `send_command` and
    /// `recv_response` return an error instead of touching the pipes.
    killed: bool,
}
29
30impl WorkerHandle {
31    /// Spawn a new worker process.
32    ///
33    /// Looks for the `venus-worker` binary in the following order:
34    /// 1. `VENUS_WORKER_PATH` environment variable
35    /// 2. Same directory as the current executable
36    /// 3. System PATH
37    pub fn spawn() -> Result<Self> {
38        let worker_path = Self::find_worker_binary()?;
39
40        let mut child = Command::new(&worker_path)
41            .stdin(Stdio::piped())
42            .stdout(Stdio::piped())
43            .stderr(Stdio::inherit()) // Let worker stderr pass through for debugging
44            .spawn()
45            .map_err(|e| {
46                Error::Ipc(format!(
47                    "Failed to spawn worker process '{}': {}",
48                    worker_path.display(),
49                    e
50                ))
51            })?;
52
53        let stdin = child.stdin.take().ok_or_else(|| {
54            Error::Ipc("Failed to get worker stdin".to_string())
55        })?;
56        let stdout = child.stdout.take().ok_or_else(|| {
57            Error::Ipc("Failed to get worker stdout".to_string())
58        })?;
59
60        let mut handle = Self {
61            child,
62            stdin: BufWriter::new(stdin),
63            stdout: BufReader::new(stdout),
64            killed: false,
65        };
66
67        // Verify worker is alive with a ping
68        handle.send_command(&WorkerCommand::Ping)?;
69        match handle.recv_response()? {
70            WorkerResponse::Pong => Ok(handle),
71            other => Err(Error::Ipc(format!(
72                "Unexpected response from worker: {:?}",
73                other
74            ))),
75        }
76    }
77
78    /// Find the venus-worker binary path.
79    fn find_worker_binary() -> Result<PathBuf> {
80        // 1. Check environment variable
81        if let Ok(path) = std::env::var("VENUS_WORKER_PATH") {
82            let path = PathBuf::from(path);
83            if path.exists() {
84                return Ok(path);
85            }
86        }
87
88        // 2. Look next to current executable
89        if let Ok(exe_path) = std::env::current_exe()
90            && let Some(exe_dir) = exe_path.parent() {
91                let worker_name = if cfg!(windows) {
92                    "venus-worker.exe"
93                } else {
94                    "venus-worker"
95                };
96                let worker_path = exe_dir.join(worker_name);
97                if worker_path.exists() {
98                    return Ok(worker_path);
99                }
100            }
101
102        // 3. Try system PATH via which
103        let worker_name = if cfg!(windows) {
104            "venus-worker.exe"
105        } else {
106            "venus-worker"
107        };
108        if let Ok(path) = which::which(worker_name) {
109            return Ok(path);
110        }
111
112        // 4. For development: try target/debug or target/release
113        if let Ok(manifest_dir) = std::env::var("CARGO_MANIFEST_DIR") {
114            for profile in &["debug", "release"] {
115                let worker_name = if cfg!(windows) {
116                    "venus-worker.exe"
117                } else {
118                    "venus-worker"
119                };
120                let path = PathBuf::from(&manifest_dir)
121                    .join("..")
122                    .join("..")
123                    .join("target")
124                    .join(profile)
125                    .join(worker_name);
126                if path.exists() {
127                    return Ok(path.canonicalize().unwrap_or(path));
128                }
129            }
130        }
131
132        Err(Error::Ipc(
133            "Could not find venus-worker binary. Set VENUS_WORKER_PATH or ensure it's in PATH."
134                .to_string(),
135        ))
136    }
137
138    /// Send a command to the worker.
139    pub fn send_command(&mut self, cmd: &WorkerCommand) -> Result<()> {
140        if self.killed {
141            return Err(Error::Ipc("Worker has been killed".to_string()));
142        }
143        write_message(&mut self.stdin, cmd)
144    }
145
146    /// Receive a response from the worker.
147    pub fn recv_response(&mut self) -> Result<WorkerResponse> {
148        if self.killed {
149            return Err(Error::Ipc("Worker has been killed".to_string()));
150        }
151        read_message(&mut self.stdout)
152    }
153
154    /// Load a cell in the worker.
155    pub fn load_cell(
156        &mut self,
157        dylib_path: PathBuf,
158        dep_count: usize,
159        entry_symbol: String,
160        name: String,
161    ) -> Result<()> {
162        self.send_command(&WorkerCommand::LoadCell {
163            dylib_path: dylib_path.to_string_lossy().to_string(),
164            dep_count,
165            entry_symbol,
166            name,
167        })?;
168
169        match self.recv_response()? {
170            WorkerResponse::Loaded => Ok(()),
171            WorkerResponse::Error { message } => {
172                Err(Error::Execution(format!("Failed to load cell: {}", message)))
173            }
174            other => Err(Error::Ipc(format!(
175                "Unexpected response when loading cell: {:?}",
176                other
177            ))),
178        }
179    }
180
181    /// Execute the loaded cell with given inputs.
182    ///
183    /// Returns the raw output bytes on success.
184    pub fn execute(&mut self, inputs: Vec<Vec<u8>>) -> Result<Vec<u8>> {
185        self.execute_with_widgets(inputs, Vec::new()).map(|(bytes, _)| bytes)
186    }
187
188    /// Execute the loaded cell with given inputs and widget values.
189    ///
190    /// Returns the raw output bytes and widget definitions JSON on success.
191    pub fn execute_with_widgets(
192        &mut self,
193        inputs: Vec<Vec<u8>>,
194        widget_values_json: Vec<u8>,
195    ) -> Result<(Vec<u8>, Vec<u8>)> {
196        self.send_command(&WorkerCommand::Execute { inputs, widget_values_json })?;
197
198        match self.recv_response()? {
199            WorkerResponse::Output { bytes, widgets_json } => Ok((bytes, widgets_json)),
200            WorkerResponse::Error { message } => {
201                Err(Error::Execution(message))
202            }
203            WorkerResponse::Panic { message } => {
204                Err(Error::Execution(format!(
205                    "Cell panicked: {}. Check for unwrap() on None/Err, out-of-bounds access, or other panic sources.",
206                    message
207                )))
208            }
209            other => Err(Error::Ipc(format!(
210                "Unexpected response when executing: {:?}",
211                other
212            ))),
213        }
214    }
215
216    /// Kill the worker process immediately.
217    ///
218    /// This is the key feature for interruption - we can terminate
219    /// the worker mid-computation without any cooperation from the cell.
220    pub fn kill(&mut self) -> Result<()> {
221        if self.killed {
222            return Ok(());
223        }
224
225        self.killed = true;
226
227        // Try graceful shutdown first (with short timeout)
228        // This allows any cleanup to happen
229        let _ = self.send_command(&WorkerCommand::Shutdown);
230
231        // Give it a moment to shutdown gracefully
232        std::thread::sleep(Duration::from_millis(10));
233
234        // Force kill if still running
235        if let Err(e) = self.child.kill() {
236            // ESRCH (No such process) means process already exited, which is fine
237            // Check raw OS error: 3 on Unix (ESRCH), 87 on Windows (ERROR_INVALID_PARAMETER)
238            let is_already_dead = e.raw_os_error().map_or(false, |code| {
239                cfg!(unix) && code == 3 || cfg!(windows) && code == 87
240            });
241
242            if !is_already_dead {
243                tracing::warn!("Failed to kill worker: {}", e);
244            }
245        }
246
247        // Wait to reap zombie
248        let _ = self.child.wait();
249
250        Ok(())
251    }
252
253    /// Check if the worker process is still running.
254    pub fn is_alive(&mut self) -> bool {
255        if self.killed {
256            return false;
257        }
258        matches!(self.child.try_wait(), Ok(None))
259    }
260
    /// Get the process ID of the worker.
    ///
    /// This is the identifier reported by the underlying `Child`; it is read
    /// from the handle and does not check whether the process is still running.
    pub fn pid(&self) -> u32 {
        self.child.id()
    }
265
266    /// Graceful shutdown - ask worker to exit cleanly.
267    pub fn shutdown(mut self) -> Result<()> {
268        if self.killed {
269            return Ok(());
270        }
271
272        let _ = self.send_command(&WorkerCommand::Shutdown);
273
274        // Wait for acknowledgement with timeout
275        // Note: We can't easily do timeout on blocking read,
276        // so we just wait for the process to exit
277        match self.child.wait() {
278            Ok(status) => {
279                if status.success() {
280                    Ok(())
281                } else {
282                    Err(Error::Ipc(format!(
283                        "Worker exited with status: {}",
284                        status
285                    )))
286                }
287            }
288            Err(e) => Err(Error::Ipc(format!("Failed to wait for worker: {}", e))),
289        }
290    }
291}
292
/// Kills the worker when its handle goes out of scope.
impl Drop for WorkerHandle {
    /// Calls `kill` and ignores its result, since `drop` cannot report errors.
    fn drop(&mut self) {
        // Ensure worker is killed when handle is dropped.
        // `kill` returns early if the handle was already killed.
        let _ = self.kill();
    }
}
299
/// Pool of reusable worker processes.
///
/// Keeps idle workers around so callers can avoid spawn overhead: `get` hands
/// out an idle worker (or spawns one) and `put` returns it for reuse.
pub struct WorkerPool {
    /// Idle workers ready to be handed out by `get`.
    available: Vec<WorkerHandle>,
    /// Upper bound on how many idle workers `available` may hold. Workers
    /// currently handed out by `get` are not counted against it.
    max_size: usize,
}
310
311impl WorkerPool {
312    /// Create a new worker pool.
313    pub fn new(max_size: usize) -> Self {
314        Self {
315            available: Vec::with_capacity(max_size),
316            max_size,
317        }
318    }
319
320    /// Create a pool and pre-warm with N workers.
321    pub fn with_warm_workers(max_size: usize, warm_count: usize) -> Result<Self> {
322        let mut pool = Self::new(max_size);
323        for _ in 0..warm_count.min(max_size) {
324            let worker = WorkerHandle::spawn()?;
325            pool.available.push(worker);
326        }
327        Ok(pool)
328    }
329
330    /// Get a worker from the pool, spawning if necessary.
331    pub fn get(&mut self) -> Result<WorkerHandle> {
332        // Try to reuse an existing worker
333        while let Some(mut worker) = self.available.pop() {
334            if worker.is_alive() {
335                return Ok(worker);
336            }
337            // Worker died, try next one
338        }
339
340        // No available workers, spawn a new one
341        WorkerHandle::spawn()
342    }
343
344    /// Return a worker to the pool for reuse.
345    ///
346    /// If the pool is full, the worker is dropped (killed).
347    pub fn put(&mut self, mut worker: WorkerHandle) {
348        if !worker.is_alive() {
349            return;
350        }
351
352        if self.available.len() < self.max_size {
353            self.available.push(worker);
354        }
355        // Otherwise worker is dropped and killed
356    }
357
358    /// Kill all workers in the pool.
359    pub fn shutdown(&mut self) {
360        for mut worker in self.available.drain(..) {
361            let _ = worker.kill();
362        }
363    }
364
    /// Get the number of available workers.
    ///
    /// Counts the idle handles currently stored in the pool. Their liveness is
    /// not re-checked here, so the count may include workers that have since
    /// exited.
    pub fn available_count(&self) -> usize {
        self.available.len()
    }
369}
370
/// Kills all idle workers when the pool goes out of scope.
impl Drop for WorkerPool {
    /// Delegates to `shutdown`, which kills every worker still held by the pool.
    fn drop(&mut self) {
        self.shutdown();
    }
}
376
/// Thread-safe handle for killing a worker from another thread.
///
/// Used for interrupt handling in async contexts.
///
/// The handle stores only the worker's process ID and a shared flag, so it
/// can be cloned freely; all clones share the same flag.
#[derive(Clone)]
pub struct WorkerKillHandle {
    /// Process ID of the worker, captured when the handle was created.
    pid: u32,
    /// Whether the kill has been requested through this handle or one of its
    /// clones. This flag is separate from the `killed` field of `WorkerHandle`.
    killed: Arc<std::sync::atomic::AtomicBool>,
}
387
388impl WorkerKillHandle {
389    /// Create a kill handle for a worker.
390    pub fn new(worker: &WorkerHandle) -> Self {
391        Self {
392            pid: worker.pid(),
393            killed: Arc::new(std::sync::atomic::AtomicBool::new(false)),
394        }
395    }
396
397    /// Kill the worker process.
398    ///
399    /// This can be called from any thread and will immediately
400    /// terminate the worker process.
401    pub fn kill(&self) {
402        if self.killed.swap(true, std::sync::atomic::Ordering::SeqCst) {
403            tracing::debug!("Worker {} already killed, skipping", self.pid);
404            return; // Already killed
405        }
406
407        tracing::info!("Sending SIGKILL to worker process {}", self.pid);
408
409        #[cfg(unix)]
410        {
411            // SIGKILL for immediate termination
412            unsafe {
413                let result = libc::kill(self.pid as i32, libc::SIGKILL);
414                if result != 0 {
415                    tracing::warn!("Failed to kill worker {}: errno={}", self.pid, *libc::__errno_location());
416                } else {
417                    tracing::info!("SIGKILL sent successfully to worker {}", self.pid);
418                }
419            }
420        }
421
422        #[cfg(windows)]
423        {
424            use windows::Win32::Foundation::CloseHandle;
425            use windows::Win32::System::Threading::{OpenProcess, TerminateProcess, PROCESS_TERMINATE};
426
427            unsafe {
428                if let Ok(handle) = OpenProcess(PROCESS_TERMINATE, false, self.pid) {
429                    let _ = TerminateProcess(handle, 1);
430                    let _ = CloseHandle(handle);
431                }
432            }
433        }
434    }
435
    /// Check if kill has been requested.
    ///
    /// Returns `true` once `kill` has been called on this handle or any of its
    /// clones. It reports that the request was made, not that the process has
    /// actually exited.
    pub fn is_killed(&self) -> bool {
        self.killed.load(std::sync::atomic::Ordering::SeqCst)
    }
440}
441
#[cfg(test)]
mod tests {
    use super::*;

    // Note: These tests spawn a real worker process, so the venus-worker
    // binary must be built first (`cargo build -p venus-worker`). They are
    // marked `#[ignore]` and only run when requested explicitly.

    /// Spawning performs the ping handshake; a successful spawn must yield a
    /// non-zero process ID.
    #[test]
    #[ignore = "Requires venus-worker binary"]
    fn test_worker_spawn_and_ping() {
        let handle = WorkerHandle::spawn().unwrap();
        assert!(handle.pid() > 0);
    }

    /// A worker returned to the pool is the one handed out by the next `get`.
    #[test]
    #[ignore = "Requires venus-worker binary"]
    fn test_worker_pool() {
        let mut pool = WorkerPool::new(4);

        let first_pid = {
            let first = pool.get().unwrap();
            let pid = first.pid();
            pool.put(first);
            pid
        };

        let second = pool.get().unwrap();
        assert_eq!(second.pid(), first_pid); // Same worker reused
    }
}
467}