oxur_repl/executor/
subprocess.rs1use crate::metrics::{RestartReason, SubprocessMetrics};
9use std::collections::HashSet;
10use std::io::{BufRead, BufReader, Write};
11use std::path::{Path, PathBuf};
12use std::process::{Child, ChildStdin, ChildStdout, Command, ExitStatus, Stdio};
13use thiserror::Error;
14
15#[derive(Debug, Error)]
17pub enum ExecutorError {
18 #[error("Failed to spawn subprocess: {0}")]
19 SpawnFailed(#[from] std::io::Error),
20
21 #[error("Subprocess not running")]
22 NotRunning,
23
24 #[error("Failed to send command to subprocess: {0}")]
25 CommandFailed(String),
26
27 #[error("Failed to read response from subprocess: {0}")]
28 ResponseFailed(String),
29
30 #[error("Library load failed: {0}")]
31 LoadFailed(String),
32
33 #[error("Execution failed: {0}")]
34 ExecutionFailed(String),
35
36 #[error("Runtime error in subprocess: {0}")]
37 RuntimeError(String),
38
39 #[error("Panic in subprocess: {0}")]
40 Panic(String),
41}
42
43pub type Result<T> = std::result::Result<T, ExecutorError>;
44
45#[derive(Debug, Clone, PartialEq)]
47pub enum ExecutionResult {
48 Success { output: String },
50
51 RuntimeError { message: String },
53
54 Panic { location: String, message: String },
56}
57
58#[derive(Debug)]
110pub struct SubprocessExecutor {
111 child: Option<Child>,
113
114 stdin: Option<ChildStdin>,
116
117 stdout: Option<BufReader<ChildStdout>>,
119
120 loaded_libraries: HashSet<String>,
122
123 subprocess_binary: PathBuf,
125
126 metrics: SubprocessMetrics,
128}
129
130impl SubprocessExecutor {
131 pub fn new() -> Result<Self> {
139 let subprocess_binary = Self::find_subprocess_binary()?;
140
141 let mut executor = Self {
142 child: None,
143 stdin: None,
144 stdout: None,
145 loaded_libraries: HashSet::new(),
146 subprocess_binary,
147 metrics: SubprocessMetrics::new(),
148 };
149
150 executor.spawn()?;
151 Ok(executor)
152 }
153
154 fn find_subprocess_binary() -> Result<PathBuf> {
161 if let Ok(exe) = std::env::current_exe() {
163 if let Some(parent) = exe.parent() {
164 let subprocess = parent.join("oxur-repl-subprocess");
165 if subprocess.exists() {
166 return Ok(subprocess);
167 }
168 }
169 }
170
171 if let Ok(path) = which::which("oxur-repl-subprocess") {
173 return Ok(path);
174 }
175
176 let target_debug = PathBuf::from("target/debug/oxur-repl-subprocess");
178 if target_debug.exists() {
179 return Ok(target_debug);
180 }
181
182 let target_release = PathBuf::from("target/release/oxur-repl-subprocess");
183 if target_release.exists() {
184 return Ok(target_release);
185 }
186
187 let workspace_debug = PathBuf::from("../../target/debug/oxur-repl-subprocess");
189 if workspace_debug.exists() {
190 return Ok(workspace_debug);
191 }
192
193 let workspace_release = PathBuf::from("../../target/release/oxur-repl-subprocess");
194 if workspace_release.exists() {
195 return Ok(workspace_release);
196 }
197
198 let deep_workspace_debug = PathBuf::from("../../../target/debug/oxur-repl-subprocess");
200 if deep_workspace_debug.exists() {
201 return Ok(deep_workspace_debug);
202 }
203
204 Ok(PathBuf::from("oxur-repl-subprocess"))
206 }
207
208 fn spawn(&mut self) -> Result<()> {
210 let mut child = Command::new(&self.subprocess_binary)
211 .stdin(Stdio::piped())
212 .stdout(Stdio::piped())
213 .stderr(Stdio::inherit()) .spawn()?;
215
216 let stdin = child.stdin.take().expect("Failed to take stdin");
217 let stdout = child.stdout.take().expect("Failed to take stdout");
218
219 self.child = Some(child);
220 self.stdin = Some(stdin);
221 self.stdout = Some(BufReader::new(stdout));
222
223 self.metrics.process_started();
225
226 Ok(())
227 }
228
229 pub fn is_loaded(&self, cache_key: impl AsRef<str>) -> bool {
231 self.loaded_libraries.contains(cache_key.as_ref())
232 }
233
234 pub fn load_library(&mut self, path: &Path, cache_key: impl AsRef<str>) -> Result<()> {
251 let cache_key = cache_key.as_ref();
252
253 if self.is_loaded(cache_key) {
255 return Ok(());
256 }
257
258 let command = format!("LOAD {} {}\n", cache_key, path.display());
260 self.send_command(&command)?;
261
262 let response = loop {
264 let line = self.read_line()?;
265 if !line.trim().is_empty() {
266 break line;
267 }
268 };
269
270 if response == "LOADED" {
271 self.loaded_libraries.insert(cache_key.to_string());
273 Ok(())
274 } else if let Some(error) = response.strip_prefix("LOAD_ERROR ") {
275 Err(ExecutorError::LoadFailed(error.to_string()))
276 } else {
277 Err(ExecutorError::LoadFailed(format!("Unexpected response: {}", response)))
278 }
279 }
280
281 pub fn execute(&mut self, cache_key: impl AsRef<str>) -> Result<ExecutionResult> {
300 let cache_key = cache_key.as_ref();
301
302 let command = format!("RUN {}\n", cache_key);
304 self.send_command(&command)?;
305
306 let mut result_value = String::new();
308
309 loop {
310 let response = self.read_line()?;
311
312 if response.trim().is_empty() {
314 continue;
315 }
316
317 match response.as_str() {
318 "OXUR_EXECUTION_COMPLETE" => {
319 return Ok(ExecutionResult::Success { output: result_value });
320 }
321 line if line.starts_with("OXUR_RESULT ") => {
322 result_value = line.strip_prefix("OXUR_RESULT ").unwrap_or("").to_string();
323 }
325 line if line.starts_with("OXUR_RUNTIME_ERROR ") => {
326 let message =
327 line.strip_prefix("OXUR_RUNTIME_ERROR ").unwrap_or("").to_string();
328 return Ok(ExecutionResult::RuntimeError { message });
329 }
330 line if line.starts_with("OXUR_PANIC_LOCATION ") => {
331 let location =
332 line.strip_prefix("OXUR_PANIC_LOCATION ").unwrap_or("unknown").to_string();
333
334 let message_line = self.read_line()?;
336 let message = message_line
337 .strip_prefix("OXUR_PANIC_MESSAGE ")
338 .unwrap_or("Unknown panic")
339 .to_string();
340
341 return Ok(ExecutionResult::Panic { location, message });
342 }
343 _ => {
344 return Err(ExecutorError::ExecutionFailed(format!(
345 "Unexpected response: {}",
346 response
347 )));
348 }
349 }
350 }
351 }
352
353 fn send_command(&mut self, command: &str) -> Result<()> {
355 let stdin = self.stdin.as_mut().ok_or(ExecutorError::NotRunning)?;
356
357 stdin
358 .write_all(command.as_bytes())
359 .map_err(|e| ExecutorError::CommandFailed(e.to_string()))?;
360
361 stdin.flush().map_err(|e| ExecutorError::CommandFailed(format!("flush failed: {}", e)))?;
362
363 Ok(())
364 }
365
366 fn read_line(&mut self) -> Result<String> {
368 let stdout = self.stdout.as_mut().ok_or(ExecutorError::NotRunning)?;
369
370 let mut line = String::new();
371 stdout.read_line(&mut line).map_err(|e| ExecutorError::ResponseFailed(e.to_string()))?;
372
373 Ok(line.trim().to_string())
374 }
375
376 pub fn shutdown(&mut self) -> Result<Option<ExitStatus>> {
382 self.stdin.take();
384
385 let exit_status =
387 if let Some(mut child) = self.child.take() { child.wait().ok() } else { None };
388
389 self.stdout.take();
390 self.loaded_libraries.clear();
391
392 Ok(exit_status)
393 }
394
395 pub fn restart(&mut self) -> Result<RestartReason> {
402 let exit_status = self.shutdown()?;
403 let reason =
404 exit_status.map(RestartReason::from_exit_status).unwrap_or(RestartReason::Unknown);
405
406 self.metrics.record_restart(reason);
407 self.spawn()?;
408 Ok(reason)
409 }
410
411 pub fn restart_user_requested(&mut self) -> Result<()> {
415 let _ = self.shutdown()?;
416 self.metrics.record_restart(RestartReason::UserRequested);
417 self.spawn()?;
418 Ok(())
419 }
420
421 pub fn loaded_count(&self) -> usize {
423 self.loaded_libraries.len()
424 }
425
426 pub fn is_running(&self) -> bool {
428 self.child.is_some()
429 }
430
431 pub fn uptime_seconds(&self) -> f64 {
433 self.metrics.uptime_seconds()
434 }
435
436 pub fn restart_count(&self) -> u64 {
438 self.metrics.restart_count()
439 }
440
441 pub fn update_metrics(&self) {
445 self.metrics.update_uptime_gauge();
446 }
447
448 pub fn metrics(&self) -> &SubprocessMetrics {
450 &self.metrics
451 }
452}
453
454impl Drop for SubprocessExecutor {
455 fn drop(&mut self) {
456 let _ = self.shutdown();
457 }
458}
459
460#[cfg(test)]
461mod tests {
462 use super::*;
463
464 #[test]
465 fn test_executor_creation() {
466 let executor = SubprocessExecutor::new();
468
469 match executor {
471 Ok(exec) => {
472 assert!(exec.is_running());
473 }
474 Err(e) => {
475 eprintln!(
477 "Note: Subprocess binary not found (expected in some test environments): {}",
478 e
479 );
480 }
481 }
482 }
483
484 #[test]
485 fn test_executor_is_loaded() {
486 if let Ok(executor) = SubprocessExecutor::new() {
487 assert!(!executor.is_loaded("test_key"));
489 assert!(!executor.is_loaded("another_key"));
490
491 }
494 }
495
496 #[test]
497 fn test_executor_loaded_count() {
498 if let Ok(executor) = SubprocessExecutor::new() {
499 assert_eq!(executor.loaded_count(), 0);
501
502 }
505 }
506
507 #[test]
508 fn test_executor_shutdown() {
509 if let Ok(mut executor) = SubprocessExecutor::new() {
510 assert!(executor.is_running());
511
512 executor.shutdown().expect("Shutdown failed");
513 assert!(!executor.is_running());
514 }
515 }
516
517 #[test]
518 fn test_executor_restart() {
519 if let Ok(mut executor) = SubprocessExecutor::new() {
520 assert_eq!(executor.loaded_count(), 0);
522 assert!(executor.is_running());
523 assert_eq!(executor.restart_count(), 0);
524
525 let reason = executor.restart().expect("Restart failed");
527 assert!(reason.is_clean(), "Expected clean restart, got {:?}", reason);
529
530 assert_eq!(executor.loaded_count(), 0);
532 assert!(executor.is_running());
533 assert_eq!(executor.restart_count(), 1);
534 }
535 }
536
537 #[test]
538 fn test_executor_restart_user_requested() {
539 if let Ok(mut executor) = SubprocessExecutor::new() {
540 assert_eq!(executor.restart_count(), 0);
541
542 executor.restart_user_requested().expect("Restart failed");
544
545 assert!(executor.is_running());
546 assert_eq!(executor.restart_count(), 1);
547 }
548 }
549
550 #[test]
551 fn test_executor_uptime() {
552 if let Ok(executor) = SubprocessExecutor::new() {
553 let uptime = executor.uptime_seconds();
555 assert!(uptime >= 0.0, "Uptime should be non-negative");
556 }
557 }
558
559 #[test]
560 fn test_executor_execute_unloaded() {
561 if let Ok(mut executor) = SubprocessExecutor::new() {
562 let result = executor.execute("nonexistent_key");
565
566 match result {
569 Ok(ExecutionResult::RuntimeError { .. }) => {
570 }
572 Err(_) => {
573 }
575 Ok(ExecutionResult::Success { .. }) => {
576 panic!("Should not succeed when executing unloaded code");
577 }
578 Ok(ExecutionResult::Panic { .. }) => {
579 }
581 }
582 }
583 }
584}