use anyhow::{Context, Result};
use std::path::Path;
use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt};
use tokio::process::{Child, ChildStdin, Command};
use tokio::sync::Mutex;
use tokio::time::{Duration, timeout};
type OutputBuffer = Arc<Mutex<Vec<u8>>>;
pub struct HeadlessHandle {
stdin: Arc<Mutex<Option<ChildStdin>>>,
output: OutputBuffer,
child: Arc<Mutex<Option<Child>>>,
}
impl HeadlessHandle {
pub async fn spawn_shell<'a>(
shell: &str,
working_dir: &Path,
env: impl IntoIterator<Item = (&'a String, &'a String)>,
) -> Result<Self> {
let mut command = Command::new(shell);
command
.current_dir(working_dir)
.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped());
for (key, value) in env {
command.env(key, value);
}
let mut child = command.spawn().context("Failed to spawn headless shell")?;
let stdin = child
.stdin
.take()
.context("Missing stdin for headless shell")?;
let stdout = child
.stdout
.take()
.context("Missing stdout for headless shell")?;
let stderr = child
.stderr
.take()
.context("Missing stderr for headless shell")?;
let output = Arc::new(Mutex::new(Vec::new()));
let handle = Self {
stdin: Arc::new(Mutex::new(Some(stdin))),
output: output.clone(),
child: Arc::new(Mutex::new(Some(child))),
};
spawn_output_task(stdout, output.clone());
spawn_output_task(stderr, output);
Ok(handle)
}
pub async fn write(&self, data: &[u8]) -> Result<()> {
let mut stdin_guard = self.stdin.lock().await;
if let Some(stdin) = stdin_guard.as_mut() {
stdin.write_all(data).await?;
stdin.flush().await?;
Ok(())
} else {
Err(anyhow::anyhow!("Headless shell stdin unavailable"))
}
}
pub async fn read(&self) -> Result<Vec<u8>> {
let mut buffer = self.output.lock().await;
if buffer.is_empty() {
return Ok(Vec::new());
}
let data = buffer.clone();
buffer.clear();
Ok(data)
}
pub async fn read_with_timeout(&self, timeout_ms: u64) -> Result<Vec<u8>> {
match timeout(Duration::from_millis(timeout_ms), self.read()).await {
Ok(result) => result,
Err(_) => Ok(Vec::new()),
}
}
pub async fn is_running(&self) -> bool {
let mut guard = self.child.lock().await;
if let Some(child) = guard.as_mut() {
matches!(child.try_wait(), Ok(None))
} else {
false
}
}
pub async fn shutdown(self) -> Result<()> {
if let Some(mut child) = self.child.lock().await.take() {
let _ = child.kill().await;
}
Ok(())
}
}
fn spawn_output_task<R>(mut reader: R, output: OutputBuffer)
where
R: AsyncRead + Unpin + Send + 'static,
{
tokio::spawn(async move {
let mut buffer = vec![0u8; 4096];
loop {
match reader.read(&mut buffer).await {
Ok(0) => break,
Ok(n) => {
let mut out = output.lock().await;
out.extend_from_slice(&buffer[..n]);
if out.len() > 1_048_576 {
let drain = out.len() - 1_048_576;
out.drain(..drain);
}
}
Err(err) => {
tracing::debug!("Headless shell read error: {}", err);
break;
}
}
}
});
}