Skip to main content

par_term_scripting/
process.rs

//! Single script subprocess management.
//!
//! [`ScriptProcess`] manages the lifecycle of a single script subprocess, providing
//! piped stdin/stdout/stderr communication. Stdout lines are parsed as JSON
//! [`ScriptCommand`] objects, and stderr lines are collected for error reporting.
6
7use std::collections::HashMap;
8use std::io::{BufRead, BufReader, Write};
9use std::process::{Child, Command, Stdio};
10use std::sync::{Arc, Mutex};
11use std::thread::JoinHandle;
12
13use super::protocol::{ScriptCommand, ScriptEvent};
14
15/// Manages a single script subprocess with JSON-line communication.
16///
17/// The subprocess receives [`ScriptEvent`] objects serialized as JSON lines on stdin,
18/// and emits [`ScriptCommand`] objects as JSON lines on stdout. Stderr is captured
19/// separately for error reporting.
20pub struct ScriptProcess {
21    /// The child process handle, if still alive.
22    child: Option<Child>,
23    /// Writer to the child's stdin, if still open.
24    stdin_writer: Option<std::process::ChildStdin>,
25    /// Buffer of parsed commands read from the child's stdout.
26    command_buffer: Arc<Mutex<Vec<ScriptCommand>>>,
27    /// Buffer of error lines read from the child's stderr.
28    error_buffer: Arc<Mutex<Vec<String>>>,
29    /// Handle to the background thread reading stdout.
30    _stdout_thread: Option<JoinHandle<()>>,
31    /// Handle to the background thread reading stderr.
32    _stderr_thread: Option<JoinHandle<()>>,
33}
34
35impl ScriptProcess {
36    /// Spawn a script subprocess with piped stdin/stdout/stderr.
37    ///
38    /// Starts background threads to read stdout (parsing JSON into [`ScriptCommand`])
39    /// and stderr (collecting error lines).
40    ///
41    /// # Arguments
42    /// * `command` - The command to execute (e.g., "python3").
43    /// * `args` - Arguments to pass to the command.
44    /// * `env_vars` - Additional environment variables to set for the subprocess.
45    ///
46    /// # Errors
47    /// Returns an error string if the subprocess cannot be spawned.
48    pub fn spawn(
49        command: &str,
50        args: &[&str],
51        env_vars: &HashMap<String, String>,
52    ) -> Result<Self, String> {
53        let mut cmd = Command::new(command);
54        cmd.args(args)
55            .stdin(Stdio::piped())
56            .stdout(Stdio::piped())
57            .stderr(Stdio::piped())
58            .envs(env_vars);
59
60        let mut child = cmd
61            .spawn()
62            .map_err(|e| format!("Failed to spawn '{}': {}", command, e))?;
63
64        let stdin_writer = child.stdin.take();
65        let stdout = child
66            .stdout
67            .take()
68            .ok_or_else(|| "Failed to capture stdout".to_string())?;
69        let stderr = child
70            .stderr
71            .take()
72            .ok_or_else(|| "Failed to capture stderr".to_string())?;
73
74        let command_buffer: Arc<Mutex<Vec<ScriptCommand>>> = Arc::new(Mutex::new(Vec::new()));
75        let error_buffer: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
76
77        // Stdout reader thread: parse JSON lines into ScriptCommand
78        let cmd_buf = Arc::clone(&command_buffer);
79        let stdout_thread = std::thread::spawn(move || {
80            let reader = BufReader::new(stdout);
81            for line in reader.lines() {
82                match line {
83                    Ok(text) => {
84                        if text.is_empty() {
85                            continue;
86                        }
87                        match serde_json::from_str::<ScriptCommand>(&text) {
88                            Ok(cmd) => {
89                                let mut buf = cmd_buf.lock().unwrap_or_else(|e| {
90                                    log::warn!("command_buffer mutex poisoned, recovering");
91                                    e.into_inner()
92                                });
93                                buf.push(cmd);
94                            }
95                            Err(e) => {
96                                log::warn!(
97                                    "ScriptProcess: failed to parse stdout line as ScriptCommand: {}: {:?}",
98                                    e,
99                                    text
100                                );
101                            }
102                        }
103                    }
104                    Err(e) => {
105                        log::warn!("ScriptProcess: error reading stdout: {}", e);
106                        break;
107                    }
108                }
109            }
110        });
111
112        // Stderr reader thread: collect error lines
113        let err_buf = Arc::clone(&error_buffer);
114        let stderr_thread = std::thread::spawn(move || {
115            let reader = BufReader::new(stderr);
116            for line in reader.lines() {
117                match line {
118                    Ok(text) => {
119                        if text.is_empty() {
120                            continue;
121                        }
122                        let mut buf = err_buf.lock().unwrap_or_else(|e| {
123                            log::warn!("error_buffer mutex poisoned, recovering");
124                            e.into_inner()
125                        });
126                        buf.push(text);
127                    }
128                    Err(e) => {
129                        log::warn!("ScriptProcess: error reading stderr: {}", e);
130                        break;
131                    }
132                }
133            }
134        });
135
136        Ok(Self {
137            child: Some(child),
138            stdin_writer,
139            command_buffer,
140            error_buffer,
141            _stdout_thread: Some(stdout_thread),
142            _stderr_thread: Some(stderr_thread),
143        })
144    }
145
146    /// Check if the child process is still alive.
147    ///
148    /// Uses `try_wait()` to check without blocking. Returns `false` if the process
149    /// has exited or if there is no child process.
150    pub fn is_running(&mut self) -> bool {
151        match self.child.as_mut() {
152            Some(child) => match child.try_wait() {
153                Ok(Some(_status)) => false, // Process has exited
154                Ok(None) => true,           // Process still running
155                Err(_) => false,            // Error checking status
156            },
157            None => false,
158        }
159    }
160
161    /// Serialize a [`ScriptEvent`] to JSON and write it to the child's stdin as a line.
162    ///
163    /// # Errors
164    /// Returns an error if the stdin writer is not available or if the write fails.
165    pub fn send_event(&mut self, event: &ScriptEvent) -> Result<(), String> {
166        let stdin = self
167            .stdin_writer
168            .as_mut()
169            .ok_or_else(|| "stdin writer is not available".to_string())?;
170
171        let json = serde_json::to_string(event)
172            .map_err(|e| format!("Failed to serialize event: {}", e))?;
173
174        writeln!(stdin, "{}", json).map_err(|e| format!("Failed to write to stdin: {}", e))?;
175
176        stdin
177            .flush()
178            .map_err(|e| format!("Failed to flush stdin: {}", e))?;
179
180        Ok(())
181    }
182
183    /// Drain pending commands from the command buffer.
184    ///
185    /// Returns all commands that have been parsed from the child's stdout since the
186    /// last call to this method.
187    pub fn read_commands(&self) -> Vec<ScriptCommand> {
188        let mut buf = self.command_buffer.lock().unwrap_or_else(|e| {
189            log::warn!("command_buffer mutex poisoned, recovering");
190            e.into_inner()
191        });
192        buf.drain(..).collect()
193    }
194
195    /// Drain pending error lines from the error buffer.
196    ///
197    /// Returns all lines that have been read from the child's stderr since the
198    /// last call to this method.
199    pub fn read_errors(&self) -> Vec<String> {
200        let mut buf = self.error_buffer.lock().unwrap_or_else(|e| {
201            log::warn!("error_buffer mutex poisoned, recovering");
202            e.into_inner()
203        });
204        buf.drain(..).collect()
205    }
206
207    /// Stop the subprocess.
208    ///
209    /// Drops the stdin writer (sending EOF to the child), kills the child process
210    /// if it's still running, and waits for it to exit.
211    pub fn stop(&mut self) {
212        // Drop stdin to send EOF
213        self.stdin_writer.take();
214
215        if let Some(ref mut child) = self.child {
216            // Try to kill the child process
217            let _ = child.kill();
218            // Wait for the child to exit
219            let _ = child.wait();
220        }
221        self.child.take();
222    }
223}
224
225impl Drop for ScriptProcess {
226    fn drop(&mut self) {
227        self.stop();
228    }
229}