use anyhow::{Context, Result, anyhow};
use serde_json::Value;
use std::process::Stdio;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::process::{Child, ChildStdin, ChildStdout, Command};
use tokio::sync::Mutex;
use super::{Transport, is_notification};
/// Transport that talks to a spawned subprocess over its stdin and stdout,
/// exchanging one JSON document per line.
///
/// Each field sits behind its own async mutex so the transport can be used
/// through a shared `&self`.
pub struct StdioTransport {
/// Handle to the spawned subprocess. Nothing in the code visible here reads
/// it; it is stored so the handle lives as long as the transport does.
_child: Mutex<Child>,
/// Write side of the subprocess's stdin; requests are written here.
stdin: Mutex<ChildStdin>,
/// Buffered read side of the subprocess's stdout; replies are read from
/// here one line at a time.
stdout: Mutex<BufReader<ChildStdout>>,
}
impl StdioTransport {
pub async fn new(cmd: &str) -> Result<Self> {
let parts: Vec<&str> = cmd.split_whitespace().collect();
if parts.is_empty() {
anyhow::bail!("--cmd is empty");
}
Self::spawn_argv(parts[0], &parts[1..]).await
}
pub async fn spawn_argv(program: &str, args: &[&str]) -> Result<Self> {
let mut child = Command::new(program)
.args(args)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::inherit())
.spawn()
.with_context(|| format!("failed to spawn subprocess: {program}"))?;
let stdin = child
.stdin
.take()
.ok_or_else(|| anyhow!("failed to capture subprocess stdin"))?;
let stdout = child
.stdout
.take()
.ok_or_else(|| anyhow!("failed to capture subprocess stdout"))?;
Ok(Self {
_child: Mutex::new(child),
stdin: Mutex::new(stdin),
stdout: Mutex::new(BufReader::new(stdout)),
})
}
}
#[async_trait::async_trait]
impl Transport for StdioTransport {
    /// Writes `request` to the subprocess as a single line of JSON and, unless
    /// it is a notification, waits for a reply line.
    ///
    /// Messages that `is_notification` classifies as notifications are written
    /// and answered immediately with `Value::Null`; nothing is read back. For
    /// every other message, lines are read from the subprocess's stdout until
    /// one starts with `{`. Blank lines are skipped silently and other
    /// non-JSON lines are skipped with a debug log.
    ///
    /// Concurrent callers are serialized per request: the stdout lock is taken
    /// before the request is written and held until the reply has been read,
    /// so replies are consumed in the same order the requests were written.
    ///
    /// # Errors
    ///
    /// Fails if the request cannot be serialized, if writing to or flushing
    /// stdin fails, if reading stdout fails, if stdout reaches end-of-file
    /// before a reply arrives, or if a line starting with `{` is not valid
    /// JSON.
    async fn send(&self, request: Value) -> Result<Value> {
        let notif = is_notification(&request);
        let mut line = serde_json::to_string(&request)?;
        line.push('\n');

        // Take the reader lock *before* writing a request. If it were taken
        // only after the stdin lock is released, a second caller could write
        // its request and then win the reader lock first, receiving the reply
        // meant for this call. Notifications never read, so they skip it.
        // Lock order is always stdout -> stdin, so this cannot deadlock.
        let reader_guard = if notif {
            None
        } else {
            Some(self.stdout.lock().await)
        };

        {
            let mut stdin = self.stdin.lock().await;
            stdin
                .write_all(line.as_bytes())
                .await
                .context("writing request to subprocess stdin")?;
            stdin.flush().await.context("flushing subprocess stdin")?;
        }

        let Some(mut stdout) = reader_guard else {
            return Ok(Value::Null);
        };

        // NOTE(review): the first JSON object line is returned as-is; it is
        // not matched against `request`. Confirm the subprocess never emits
        // unsolicited JSON lines on stdout.
        let mut buf = String::new();
        loop {
            // `read_line` appends, so the reused buffer is emptied first.
            buf.clear();
            let n = stdout
                .read_line(&mut buf)
                .await
                .context("reading subprocess stdout")?;
            if n == 0 {
                anyhow::bail!("subprocess closed stdout before responding");
            }

            let trimmed = buf.trim();
            if trimmed.is_empty() {
                continue;
            }
            if !trimmed.starts_with('{') {
                tracing::debug!(line = %trimmed, "skipping non-JSON line from subprocess");
                continue;
            }

            return serde_json::from_str(trimmed)
                .with_context(|| format!("parsing JSON response: {trimmed}"));
        }
    }
}