use anyhow::{Context, Result};
use std::io::{BufRead, BufReader, BufWriter, Write};
use std::path::Path;
use std::process::{Child, Command, Stdio};
use std::time::Duration;
/// A long-lived child process driven over piped stdin/stdout.
///
/// Requests are written to the child's stdin and its stdout is consumed by a
/// background thread that forwards each line over a channel.
pub struct SubprocessSession {
/// Handle to the spawned child process; killed on timeout and waited on drop.
child: Child,
/// Buffered writer to the child's stdin. Held in an `Option` so it can be
/// taken (and thereby closed) when the session is dropped.
stdin: Option<BufWriter<std::process::ChildStdin>>,
/// Receiving end of the channel fed by the stdout reader thread.
reader_rx: Option<std::sync::mpsc::Receiver<ReaderMsg>>,
/// Join handle of the stdout reader thread; stored but not joined by the
/// code in this file.
_reader_handle: Option<std::thread::JoinHandle<()>>,
/// Optional limit applied while waiting for the child's output; `None`
/// waits indefinitely.
timeout: Option<Duration>,
}
/// Messages sent from the stdout reader thread to the session.
enum ReaderMsg {
/// One line read from the child's stdout, exactly as returned by
/// `read_line` (any trailing line terminator is still attached).
Line(String),
/// The child's stdout reached end of stream (`read_line` returned 0 bytes).
Eof,
/// Reading from the child's stdout failed with this I/O error.
Error(std::io::Error),
}
impl SubprocessSession {
pub fn spawn(
program: &str,
args: &[&str],
env: &[(&str, &str)],
cwd: Option<&std::path::Path>,
timeout: Option<Duration>,
) -> Result<Self> {
let mut cmd = Command::new(program);
cmd.args(args)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::inherit());
for (k, v) in env {
cmd.env(k, v);
}
if let Some(dir) = cwd {
cmd.current_dir(dir);
}
let mut child = cmd.spawn().map_err(|e| {
if e.kind() == std::io::ErrorKind::NotFound {
anyhow::anyhow!("{} not found on PATH", program)
} else {
anyhow::anyhow!("Failed to spawn {}: {}", program, e)
}
})?;
let stdin = BufWriter::new(child.stdin.take().unwrap());
let stdout = child.stdout.take().unwrap();
let (tx, rx) = std::sync::mpsc::channel();
let handle = std::thread::spawn(move || {
let mut reader = BufReader::new(stdout);
loop {
let mut line = String::new();
match reader.read_line(&mut line) {
Ok(0) => {
let _ = tx.send(ReaderMsg::Eof);
break;
}
Ok(_) => {
if tx.send(ReaderMsg::Line(line)).is_err() {
break;
}
}
Err(e) => {
let _ = tx.send(ReaderMsg::Error(e));
break;
}
}
}
});
Ok(SubprocessSession {
child,
stdin: Some(stdin),
reader_rx: Some(rx),
_reader_handle: Some(handle),
timeout,
})
}
pub fn timeout(&self) -> Option<std::time::Duration> {
self.timeout
}
pub fn execute(&mut self, sentinel: &str, payload: &str) -> Result<String> {
let stdin = self.stdin.as_mut().context("Subprocess stdin closed")?;
write!(stdin, "{}_BEGIN\n{}\n{}_END\n", sentinel, payload, sentinel)
.context("Failed to send code to subprocess")?;
stdin.flush().context("Failed to flush stdin")?;
let done_marker = format!("{}_DONE", sentinel);
let mut output = String::new();
let timeout = &self.timeout;
let rx = self.reader_rx.as_ref().context("Reader channel closed")?;
loop {
let recv_result = match timeout {
Some(dur) => rx.recv_timeout(*dur),
None => rx
.recv()
.map_err(|_| std::sync::mpsc::RecvTimeoutError::Disconnected),
};
match recv_result {
Ok(ReaderMsg::Line(line)) => {
let trimmed = line.trim_end_matches('\n').trim_end_matches('\r');
if trimmed == done_marker {
break;
}
output.push_str(&line);
}
Ok(ReaderMsg::Eof) => {
anyhow::bail!("Subprocess exited unexpectedly");
}
Ok(ReaderMsg::Error(e)) => {
anyhow::bail!("Failed to read from subprocess: {}", e);
}
Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {
let _ = self.child.kill();
anyhow::bail!(
"Code chunk timed out after {}s (set timeout in sidecar config.toml or CALEPIN_TIMEOUT env var)",
timeout.unwrap().as_secs()
);
}
Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {
anyhow::bail!("Subprocess reader thread terminated unexpectedly");
}
}
}
if output.ends_with('\n') {
output.pop();
}
Ok(format!("{}\n{}", sentinel, output))
}
}
/// Writes `script` to a temporary file and starts `program` on it.
///
/// The child is launched as `program <args_before_script...> <script path>`
/// with no extra environment variables.
///
/// * `context_name` - human-readable name used in error messages.
/// * `cwd` - working directory for the child, if given.
/// * `timeout` - timeout handed to the new [`SubprocessSession`].
///
/// Returns the session together with the temp-file handle, so the caller
/// decides how long the script file is kept.
///
/// # Errors
///
/// Fails if the temp file cannot be created or written, if its path is not
/// valid UTF-8 (it could not be passed to the child faithfully), or if the
/// process cannot be started.
pub fn spawn_script(
    program: &str,
    args_before_script: &[&str],
    script: &str,
    context_name: &str,
    cwd: Option<&Path>,
    timeout: Option<Duration>,
) -> Result<(SubprocessSession, tempfile::NamedTempFile)> {
    let script_file = tempfile::NamedTempFile::new()
        .with_context(|| format!("Failed to create temp file for {context_name} bootstrap"))?;
    std::fs::write(script_file.path(), script)
        .with_context(|| format!("Failed to write {context_name} bootstrap"))?;

    // A lossy conversion would hand the child a path that does not exist;
    // report the problem here instead.
    let script_path = script_file.path().to_str().with_context(|| {
        format!(
            "Temp file path for {context_name} bootstrap is not valid UTF-8: {}",
            script_file.path().display()
        )
    })?;

    let args: Vec<&str> = args_before_script
        .iter()
        .copied()
        .chain(std::iter::once(script_path))
        .collect();

    let proc = SubprocessSession::spawn(program, &args, &[], cwd, timeout)
        .with_context(|| format!("Failed to start {context_name}"))?;
    Ok((proc, script_file))
}
impl Drop for SubprocessSession {
    /// Shuts the session down: closes the child's stdin, releases the reader
    /// channel, then blocks until the child has exited.
    fn drop(&mut self) {
        // Overwriting with `None` drops the writer, closing the child's stdin.
        self.stdin = None;
        // Release our end of the reader channel.
        self.reader_rx = None;
        // Reap the child; a failure here is deliberately ignored.
        let _ = self.child.wait();
    }
}