par_term_scripting/process.rs
1//! Single script subprocess management.
2//!
3//! [`ScriptProcess`] manages the lifecycle of a single script subprocess, providing
4//! piped stdin/stdout/stderr communication. Stdout lines are parsed as JSON
5//! [`ScriptCommand`] objects, and stderr lines are collected for error reporting.
6
7use std::collections::HashMap;
8use std::io::{BufRead, BufReader, Write};
9use std::process::{Child, Command, Stdio};
10use std::sync::{Arc, Mutex};
11use std::thread::JoinHandle;
12
13use super::protocol::{ScriptCommand, ScriptEvent};
14
15/// Manages a single script subprocess with JSON-line communication.
16///
17/// The subprocess receives [`ScriptEvent`] objects serialized as JSON lines on stdin,
18/// and emits [`ScriptCommand`] objects as JSON lines on stdout. Stderr is captured
19/// separately for error reporting.
20pub struct ScriptProcess {
21 /// The child process handle, if still alive.
22 child: Option<Child>,
23 /// Writer to the child's stdin, if still open.
24 stdin_writer: Option<std::process::ChildStdin>,
25 /// Buffer of parsed commands read from the child's stdout.
26 command_buffer: Arc<Mutex<Vec<ScriptCommand>>>,
27 /// Buffer of error lines read from the child's stderr.
28 error_buffer: Arc<Mutex<Vec<String>>>,
29 /// Handle to the background thread reading stdout.
30 _stdout_thread: Option<JoinHandle<()>>,
31 /// Handle to the background thread reading stderr.
32 _stderr_thread: Option<JoinHandle<()>>,
33}
34
35impl ScriptProcess {
36 /// Spawn a script subprocess with piped stdin/stdout/stderr.
37 ///
38 /// Starts background threads to read stdout (parsing JSON into [`ScriptCommand`])
39 /// and stderr (collecting error lines).
40 ///
41 /// # Arguments
42 /// * `command` - The command to execute (e.g., "python3").
43 /// * `args` - Arguments to pass to the command.
44 /// * `env_vars` - Additional environment variables to set for the subprocess.
45 ///
46 /// # Errors
47 /// Returns an error string if the subprocess cannot be spawned.
48 pub fn spawn(
49 command: &str,
50 args: &[&str],
51 env_vars: &HashMap<String, String>,
52 ) -> Result<Self, String> {
53 let mut cmd = Command::new(command);
54 cmd.args(args)
55 .stdin(Stdio::piped())
56 .stdout(Stdio::piped())
57 .stderr(Stdio::piped())
58 .envs(env_vars);
59
60 let mut child = cmd
61 .spawn()
62 .map_err(|e| format!("Failed to spawn '{}': {}", command, e))?;
63
64 let stdin_writer = child.stdin.take();
65 let stdout = child
66 .stdout
67 .take()
68 .ok_or_else(|| "Failed to capture stdout".to_string())?;
69 let stderr = child
70 .stderr
71 .take()
72 .ok_or_else(|| "Failed to capture stderr".to_string())?;
73
74 let command_buffer: Arc<Mutex<Vec<ScriptCommand>>> = Arc::new(Mutex::new(Vec::new()));
75 let error_buffer: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
76
77 // Stdout reader thread: parse JSON lines into ScriptCommand
78 let cmd_buf = Arc::clone(&command_buffer);
79 let stdout_thread = std::thread::spawn(move || {
80 let reader = BufReader::new(stdout);
81 for line in reader.lines() {
82 match line {
83 Ok(text) => {
84 if text.is_empty() {
85 continue;
86 }
87 match serde_json::from_str::<ScriptCommand>(&text) {
88 Ok(cmd) => {
89 let mut buf = cmd_buf.lock().unwrap_or_else(|e| {
90 log::warn!("command_buffer mutex poisoned, recovering");
91 e.into_inner()
92 });
93 buf.push(cmd);
94 }
95 Err(e) => {
96 log::warn!(
97 "ScriptProcess: failed to parse stdout line as ScriptCommand: {}: {:?}",
98 e,
99 text
100 );
101 }
102 }
103 }
104 Err(e) => {
105 log::warn!("ScriptProcess: error reading stdout: {}", e);
106 break;
107 }
108 }
109 }
110 });
111
112 // Stderr reader thread: collect error lines
113 let err_buf = Arc::clone(&error_buffer);
114 let stderr_thread = std::thread::spawn(move || {
115 let reader = BufReader::new(stderr);
116 for line in reader.lines() {
117 match line {
118 Ok(text) => {
119 if text.is_empty() {
120 continue;
121 }
122 let mut buf = err_buf.lock().unwrap_or_else(|e| {
123 log::warn!("error_buffer mutex poisoned, recovering");
124 e.into_inner()
125 });
126 buf.push(text);
127 }
128 Err(e) => {
129 log::warn!("ScriptProcess: error reading stderr: {}", e);
130 break;
131 }
132 }
133 }
134 });
135
136 Ok(Self {
137 child: Some(child),
138 stdin_writer,
139 command_buffer,
140 error_buffer,
141 _stdout_thread: Some(stdout_thread),
142 _stderr_thread: Some(stderr_thread),
143 })
144 }
145
146 /// Check if the child process is still alive.
147 ///
148 /// Uses `try_wait()` to check without blocking. Returns `false` if the process
149 /// has exited or if there is no child process.
150 pub fn is_running(&mut self) -> bool {
151 match self.child.as_mut() {
152 Some(child) => match child.try_wait() {
153 Ok(Some(_status)) => false, // Process has exited
154 Ok(None) => true, // Process still running
155 Err(_) => false, // Error checking status
156 },
157 None => false,
158 }
159 }
160
161 /// Serialize a [`ScriptEvent`] to JSON and write it to the child's stdin as a line.
162 ///
163 /// # Errors
164 /// Returns an error if the stdin writer is not available or if the write fails.
165 pub fn send_event(&mut self, event: &ScriptEvent) -> Result<(), String> {
166 let stdin = self
167 .stdin_writer
168 .as_mut()
169 .ok_or_else(|| "stdin writer is not available".to_string())?;
170
171 let json = serde_json::to_string(event)
172 .map_err(|e| format!("Failed to serialize event: {}", e))?;
173
174 writeln!(stdin, "{}", json).map_err(|e| format!("Failed to write to stdin: {}", e))?;
175
176 stdin
177 .flush()
178 .map_err(|e| format!("Failed to flush stdin: {}", e))?;
179
180 Ok(())
181 }
182
183 /// Drain pending commands from the command buffer.
184 ///
185 /// Returns all commands that have been parsed from the child's stdout since the
186 /// last call to this method.
187 pub fn read_commands(&self) -> Vec<ScriptCommand> {
188 let mut buf = self.command_buffer.lock().unwrap_or_else(|e| {
189 log::warn!("command_buffer mutex poisoned, recovering");
190 e.into_inner()
191 });
192 buf.drain(..).collect()
193 }
194
195 /// Drain pending error lines from the error buffer.
196 ///
197 /// Returns all lines that have been read from the child's stderr since the
198 /// last call to this method.
199 pub fn read_errors(&self) -> Vec<String> {
200 let mut buf = self.error_buffer.lock().unwrap_or_else(|e| {
201 log::warn!("error_buffer mutex poisoned, recovering");
202 e.into_inner()
203 });
204 buf.drain(..).collect()
205 }
206
207 /// Stop the subprocess.
208 ///
209 /// Drops the stdin writer (sending EOF to the child), kills the child process
210 /// if it's still running, and waits for it to exit.
211 pub fn stop(&mut self) {
212 // Drop stdin to send EOF
213 self.stdin_writer.take();
214
215 if let Some(ref mut child) = self.child {
216 // Try to kill the child process
217 let _ = child.kill();
218 // Wait for the child to exit
219 let _ = child.wait();
220 }
221 self.child.take();
222 }
223}
224
225impl Drop for ScriptProcess {
226 fn drop(&mut self) {
227 self.stop();
228 }
229}