#![allow(dead_code)]
use std::collections::{HashMap, VecDeque};
use std::ffi::OsStr;
use std::fs::{File, OpenOptions};
use std::io::{BufRead, BufReader, Read, Write};
use std::path::{Path, PathBuf};
use std::process::{Child, ChildStdout, Command, ExitStatus, Stdio};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
use std::time::Duration;
use thiserror::Error;
use wait_timeout::ChildExt;
/// Executable name used when no explicit binary path is supplied.
pub const DEFAULT_BIN: &str = "claude";
/// How long `preflight` waits for the `--version` probe to exit before killing it.
pub const PREFLIGHT_TIMEOUT: Duration = Duration::from_secs(10);
/// Failures reported while probing, launching, or streaming from the `claude` CLI.
#[derive(Debug, Error)]
pub enum Error {
/// The binary could not be found: either the default name did not resolve on
/// `PATH`, or spawning failed with `ErrorKind::NotFound`.
#[error(
"claude binary not found on PATH\n\
hint: install Claude Code and ensure `claude` is on PATH"
)]
HostMissing,
/// Spawning the process failed for any reason other than `NotFound`.
#[error("failed to launch claude: {source}")]
Spawn {
#[source]
source: std::io::Error,
},
/// The child was still running after `elapsed`; produced by the preflight probe.
#[error("claude did not exit within {elapsed:?}")]
Timeout { elapsed: Duration },
/// The child exited unsuccessfully; `stderr` carries what it wrote to stderr.
#[error("claude exited with {status}\nstderr: {stderr}")]
ExitedNonZero { status: ExitStatus, stderr: String },
/// An I/O failure around the child: reading its pipes, waiting on it, or a
/// missing stdout pipe / child handle.
#[error("I/O error on claude stream: {source}")]
Io {
#[source]
source: std::io::Error,
},
/// Creating, opening, writing, or flushing the capture file (or creating its
/// parent directory) failed; `path` names the file or directory involved.
#[error("capture I/O on {path}: {source}")]
Capture {
path: PathBuf,
#[source]
source: std::io::Error,
},
}
/// Session selector that `build_command` turns into a CLI flag.
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum Session {
/// Emitted as `--session-id <value>`.
New(String),
/// Emitted as `--resume <value>`.
Resume(String),
}
/// Everything needed to build one `claude` invocation.
#[derive(Debug, Clone)]
pub struct SpawnOpts {
/// Prompt text, passed as the final positional argument.
pub prompt: String,
/// Working directory the child is started in.
pub cwd: PathBuf,
/// Value for `--model`; the flag is omitted when `None`.
pub model: Option<String>,
/// Tool names joined with commas into one `--allowed-tools` value; omitted when empty.
pub allowed_tools: Vec<String>,
/// Session flag to emit (`--session-id` or `--resume`); no flag when `None`.
pub session: Option<Session>,
/// Additional arguments inserted verbatim just before the prompt.
pub extra_args: Vec<String>,
/// Executable to run instead of the default binary name.
pub bin: Option<PathBuf>,
}
impl SpawnOpts {
    /// Creates options for running `prompt` from `cwd`.
    ///
    /// Every optional setting (model, allowed tools, session, extra
    /// arguments, binary override) starts out unset or empty.
    pub fn new(prompt: impl Into<String>, cwd: impl Into<PathBuf>) -> Self {
        let prompt: String = prompt.into();
        let cwd: PathBuf = cwd.into();
        Self {
            prompt,
            cwd,
            bin: None,
            model: None,
            session: None,
            allowed_tools: Vec::default(),
            extra_args: Vec::default(),
        }
    }
}
/// How an existing capture file is treated when a parser opens it.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Default)]
pub enum CaptureMode {
/// Open with truncation, discarding previous contents (the default).
#[default]
Truncate,
/// Open in append mode, keeping previous contents.
Append,
}
/// One reconstructed line of assistant text yielded by the stream parser.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct AssistantLine {
/// The line's text, trimmed of surrounding whitespace and never empty.
pub text: String,
/// Id of the message the line came from (empty when the envelope carried no id).
pub message_id: String,
}
pub fn preflight(bin: Option<&Path>) -> Result<String, Error> {
let bin_os: &OsStr = bin
.map(|b| b.as_os_str())
.unwrap_or_else(|| OsStr::new(DEFAULT_BIN));
if bin.is_none() && which::which(DEFAULT_BIN).is_err() {
return Err(Error::HostMissing);
}
let mut child = Command::new(bin_os)
.arg("--version")
.stdin(Stdio::null())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.map_err(map_spawn_error)?;
let status = match child
.wait_timeout(PREFLIGHT_TIMEOUT)
.map_err(|source| Error::Io { source })?
{
Some(status) => status,
None => {
let _ = child.kill();
let _ = child.wait();
return Err(Error::Timeout {
elapsed: PREFLIGHT_TIMEOUT,
});
}
};
let mut stdout = String::new();
if let Some(mut s) = child.stdout.take() {
s.read_to_string(&mut stdout)
.map_err(|source| Error::Io { source })?;
}
let mut stderr = String::new();
if let Some(mut s) = child.stderr.take() {
s.read_to_string(&mut stderr)
.map_err(|source| Error::Io { source })?;
}
if !status.success() {
return Err(Error::ExitedNonZero { status, stderr });
}
Ok(stdout.trim().to_string())
}
/// Assembles the `claude` command line described by `opts` without running it.
///
/// Argument order: the fixed streaming flags, then the optional `--model`,
/// `--allowed-tools` and session flag, then `extra_args`, and finally the
/// prompt. Stdin is closed; stdout and stderr are piped.
pub fn build_command(opts: &SpawnOpts) -> Command {
    let program = match opts.bin.as_deref() {
        Some(path) => path.as_os_str(),
        None => OsStr::new(DEFAULT_BIN),
    };
    let mut command = Command::new(program);
    command
        .current_dir(&opts.cwd)
        .stdin(Stdio::null())
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .arg("-p")
        .arg("--output-format")
        .arg("stream-json")
        .arg("--verbose");

    if let Some(model) = opts.model.as_deref() {
        command.arg("--model").arg(model);
    }
    if !opts.allowed_tools.is_empty() {
        let joined = opts.allowed_tools.join(",");
        command.arg("--allowed-tools").arg(joined);
    }

    // At most one session flag is emitted, chosen by the variant.
    let session_flag = opts.session.as_ref().map(|session| match session {
        Session::New(uuid) => ("--session-id", uuid),
        Session::Resume(uuid) => ("--resume", uuid),
    });
    if let Some((flag, uuid)) = session_flag {
        command.arg(flag).arg(uuid);
    }

    // The prompt always comes last, after any caller-supplied extras.
    command.args(&opts.extra_args).arg(&opts.prompt);
    command
}
pub fn spawn(opts: &SpawnOpts) -> Result<Child, Error> {
build_command(opts).spawn().map_err(map_spawn_error)
}
/// Wraps an already-spawned `child` in a `ClaudeProcess` that mirrors
/// assistant text into `capture_path`.
pub fn stream(child: Child, capture_path: &Path) -> Result<ClaudeProcess, Error> {
    let process = ClaudeProcess::from_child(child, capture_path)?;
    Ok(process)
}
fn map_spawn_error(source: std::io::Error) -> Error {
if source.kind() == std::io::ErrorKind::NotFound {
Error::HostMissing
} else {
Error::Spawn { source }
}
}
/// Iterator that reads stream-json envelopes line by line, mirrors assistant
/// text into a capture file, and yields that text as trimmed, non-empty lines.
pub struct StreamParser<R: BufRead> {
/// Line-oriented source of stream-json envelopes.
reader: R,
/// File that receives the raw assistant text as it arrives.
capture: File,
/// Path of `capture`, kept for error reporting.
capture_path: PathBuf,
/// Text accumulated per message id, not yet split into lines.
buffers: HashMap<String, String>,
/// Message ids queued for flushing, oldest first.
order: VecDeque<String>,
/// Id of the most recently ingested assistant message, if any.
current_id: Option<String>,
/// Lines ready to be handed out by the iterator.
pending: VecDeque<AssistantLine>,
/// Set once the reader reached EOF or returned a read error.
finished: bool,
}
impl<R: BufRead> StreamParser<R> {
/// Creates a parser over `reader` whose capture file at `capture_path` is
/// truncated on open.
pub fn new(reader: R, capture_path: &Path) -> Result<Self, Error> {
    let mode = CaptureMode::Truncate;
    Self::with_mode(reader, capture_path, mode)
}
pub fn with_mode(reader: R, capture_path: &Path, mode: CaptureMode) -> Result<Self, Error> {
if let Some(parent) = capture_path.parent() {
if !parent.as_os_str().is_empty() {
std::fs::create_dir_all(parent).map_err(|source| Error::Capture {
path: parent.to_path_buf(),
source,
})?;
}
}
let mut open = OpenOptions::new();
open.create(true).write(true);
match mode {
CaptureMode::Truncate => {
open.truncate(true);
}
CaptureMode::Append => {
open.append(true);
}
}
let capture = open.open(capture_path).map_err(|source| Error::Capture {
path: capture_path.to_path_buf(),
source,
})?;
Ok(Self {
reader,
capture,
capture_path: capture_path.to_path_buf(),
buffers: HashMap::new(),
order: VecDeque::new(),
current_id: None,
pending: VecDeque::new(),
finished: false,
})
}
fn ingest_line(&mut self, raw: &str) -> Result<(), Error> {
let trimmed = raw.trim();
if trimmed.is_empty() {
return Ok(());
}
let env: serde_json::Value = match serde_json::from_str(trimmed) {
Ok(v) => v,
Err(err) => {
eprintln!("claude_proc: skipping malformed stream-json line: {err}");
return Ok(());
}
};
let ty = env.get("type").and_then(|t| t.as_str()).unwrap_or("");
if ty != "assistant" {
return Ok(());
}
let Some(message) = env.get("message") else {
return Ok(());
};
let id = message
.get("id")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
if let Some(prev) = self.current_id.as_ref() {
if prev != &id {
let prev = prev.clone();
self.flush_message(&prev);
}
}
if !self.buffers.contains_key(&id) {
self.order.push_back(id.clone());
}
self.current_id = Some(id.clone());
let empty = Vec::new();
let content = message
.get("content")
.and_then(|c| c.as_array())
.unwrap_or(&empty);
let mut appended = String::new();
for block in content {
let block_type = block.get("type").and_then(|t| t.as_str()).unwrap_or("");
if block_type != "text" {
continue;
}
if let Some(text) = block.get("text").and_then(|v| v.as_str()) {
appended.push_str(text);
}
}
if !appended.is_empty() {
self.buffers.entry(id).or_default().push_str(&appended);
self.capture
.write_all(appended.as_bytes())
.map_err(|source| Error::Capture {
path: self.capture_path.clone(),
source,
})?;
}
Ok(())
}
/// Moves the buffered text of message `id` into `pending`, one entry per
/// trimmed, non-empty line, and forgets the message.
///
/// Does nothing beyond queue cleanup when no text was buffered for `id`.
fn flush_message(&mut self, id: &str) {
    // Prune the queue before the early return below: an id whose message had
    // no text has no buffer, and skipping the cleanup in that case left stale
    // (possibly repeated) entries in `order` that accumulated until EOF.
    self.order.retain(|queued| queued.as_str() != id);

    let Some(buf) = self.buffers.remove(id) else {
        return;
    };
    let lines = buf
        .split('\n')
        .map(str::trim)
        .filter(|piece| !piece.is_empty())
        .map(|piece| AssistantLine {
            text: piece.to_string(),
            message_id: id.to_string(),
        });
    self.pending.extend(lines);
}
fn flush_all(&mut self) {
self.current_id = None;
while let Some(id) = self.order.pop_front() {
let buf = match self.buffers.remove(&id) {
Some(b) => b,
None => continue,
};
for piece in buf.split('\n') {
let trimmed = piece.trim();
if trimmed.is_empty() {
continue;
}
self.pending.push_back(AssistantLine {
text: trimmed.to_string(),
message_id: id.clone(),
});
}
}
}
fn flush_capture(&mut self) -> Result<(), Error> {
self.capture.flush().map_err(|source| Error::Capture {
path: self.capture_path.clone(),
source,
})
}
}
impl<R: BufRead> Iterator for StreamParser<R> {
type Item = Result<AssistantLine, Error>;
fn next(&mut self) -> Option<Self::Item> {
loop {
if let Some(line) = self.pending.pop_front() {
return Some(Ok(line));
}
if self.finished {
return None;
}
let mut raw = String::new();
match self.reader.read_line(&mut raw) {
Ok(0) => {
self.finished = true;
self.flush_all();
if let Err(e) = self.flush_capture() {
return Some(Err(e));
}
}
Ok(_) => {
if let Err(e) = self.ingest_line(&raw) {
return Some(Err(e));
}
}
Err(source) => {
self.finished = true;
return Some(Err(Error::Io { source }));
}
}
}
}
}
/// A running `claude` child together with the parser over its stdout.
pub struct ClaudeProcess {
/// Parser reading the child's buffered stdout.
parser: StreamParser<BufReader<ChildStdout>>,
/// Shared slot holding the child; emptied when `finish` takes it.
child: Arc<Mutex<Option<Child>>>,
/// Background thread collecting the child's stderr bytes, when stderr was piped.
stderr_thread: Option<JoinHandle<Vec<u8>>>,
}
/// Cloneable handle that can kill the child owned by a `ClaudeProcess`.
#[derive(Clone)]
pub struct ChildKiller {
/// The same shared child slot the originating `ClaudeProcess` holds.
child: Arc<Mutex<Option<Child>>>,
}
impl ChildKiller {
    /// Kills the child if it is still held in the shared slot.
    ///
    /// Returns `Ok(())` without doing anything when the slot is already
    /// empty. A poisoned lock is recovered rather than treated as an error.
    pub fn kill(&self) -> std::io::Result<()> {
        let mut slot = self
            .child
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        match slot.as_mut() {
            Some(child) => child.kill(),
            None => Ok(()),
        }
    }
}
impl ClaudeProcess {
/// Wraps `child`, truncating the capture file at `capture_path` on open.
pub fn from_child(child: Child, capture_path: &Path) -> Result<Self, Error> {
    let mode = CaptureMode::Truncate;
    Self::from_child_with_mode(child, capture_path, mode)
}
pub fn from_child_with_mode(
mut child: Child,
capture_path: &Path,
mode: CaptureMode,
) -> Result<Self, Error> {
let stdout = child.stdout.take().ok_or_else(|| Error::Io {
source: std::io::Error::new(
std::io::ErrorKind::BrokenPipe,
"claude child spawned without piped stdout",
),
})?;
let stderr_thread = child.stderr.take().map(|mut s| {
thread::spawn(move || {
let mut buf = Vec::new();
let _ = s.read_to_end(&mut buf);
buf
})
});
let parser = StreamParser::with_mode(BufReader::new(stdout), capture_path, mode)?;
Ok(Self {
parser,
child: Arc::new(Mutex::new(Some(child))),
stderr_thread,
})
}
/// Returns a handle sharing this process's child slot, usable to kill the child.
pub fn killer(&self) -> ChildKiller {
    let child = Arc::clone(&self.child);
    ChildKiller { child }
}
pub fn finish(mut self) -> Result<(ExitStatus, String), Error> {
for item in self.parser.by_ref() {
item?;
}
self.parser.flush_capture()?;
let child_opt = {
let mut guard = match self.child.lock() {
Ok(g) => g,
Err(poisoned) => poisoned.into_inner(),
};
guard.take()
};
let mut child = child_opt.ok_or_else(|| Error::Io {
source: std::io::Error::new(
std::io::ErrorKind::BrokenPipe,
"ClaudeProcess::finish called with no live child — \
finish already called or kill-drop left the Option None",
),
})?;
let status = child.wait().map_err(|source| Error::Io { source })?;
let stderr_bytes = self
.stderr_thread
.take()
.map(|h| h.join().unwrap_or_default())
.unwrap_or_default();
let stderr = String::from_utf8_lossy(&stderr_bytes).into_owned();
Ok((status, stderr))
}
}
impl Iterator for ClaudeProcess {
    type Item = Result<AssistantLine, Error>;

    /// Forwards to the underlying stream parser.
    fn next(&mut self) -> Option<Self::Item> {
        let parser = &mut self.parser;
        parser.next()
    }
}
#[cfg(test)]
mod tests {
use super::*;
use std::io::Cursor;
use tempfile::TempDir;
/// Renders the arguments of `cmd` (program name excluded) as lossy UTF-8
/// strings, in order.
fn collect_args(cmd: &Command) -> Vec<String> {
    let mut rendered = Vec::new();
    for arg in cmd.get_args() {
        rendered.push(arg.to_string_lossy().into_owned());
    }
    rendered
}
/// The fixed streaming flags appear contiguously and the prompt is last.
#[test]
fn build_command_sets_required_streaming_flags() {
    let args = collect_args(&build_command(&SpawnOpts::new("go", "/tmp")));
    let required = ["-p", "--output-format", "stream-json", "--verbose"];
    let has_required_run = args.windows(required.len()).any(|run| run == required);
    assert!(has_required_run);
    assert_eq!(args.last().unwrap(), "go");
}
/// With default options none of the optional flags are emitted.
#[test]
fn build_command_omits_unset_model_and_tools() {
    let args = collect_args(&build_command(&SpawnOpts::new("x", "/tmp")));
    for flag in ["--model", "--allowed-tools", "--session-id", "--resume"] {
        assert!(!args.iter().any(|a| a == flag));
    }
}
/// Allowed tools are passed as one comma-joined value after the flag.
#[test]
fn build_command_joins_allowed_tools_with_commas() {
    let opts = SpawnOpts {
        allowed_tools: vec!["Read".to_string(), "Bash".to_string()],
        ..SpawnOpts::new("x", "/tmp")
    };
    let args = collect_args(&build_command(&opts));
    let flag_at = args.iter().position(|a| a == "--allowed-tools").unwrap();
    assert_eq!(args[flag_at + 1], "Read,Bash");
}
/// A new session is passed via `--session-id` and never via `--resume`.
#[test]
fn build_command_uses_session_id_for_new_session() {
    let opts = SpawnOpts {
        session: Some(Session::New("abc-123".to_string())),
        ..SpawnOpts::new("x", "/tmp")
    };
    let args = collect_args(&build_command(&opts));
    let flag_at = args.iter().position(|a| a == "--session-id").unwrap();
    assert_eq!(args[flag_at + 1], "abc-123");
    assert!(!args.iter().any(|a| a == "--resume"));
}
/// A resumed session is passed via `--resume` and never via `--session-id`.
#[test]
fn build_command_uses_resume_for_continued_session() {
    let opts = SpawnOpts {
        session: Some(Session::Resume("abc-123".to_string())),
        ..SpawnOpts::new("x", "/tmp")
    };
    let args = collect_args(&build_command(&opts));
    let flag_at = args.iter().position(|a| a == "--resume").unwrap();
    assert_eq!(args[flag_at + 1], "abc-123");
    assert!(!args.iter().any(|a| a == "--session-id"));
}
/// Extra arguments are placed before the prompt, which stays last.
#[test]
fn build_command_puts_prompt_last_even_with_extra_args() {
    let opts = SpawnOpts {
        extra_args: vec!["--dangerously-skip-permissions".to_string()],
        ..SpawnOpts::new("go do it", "/tmp")
    };
    let args = collect_args(&build_command(&opts));
    assert_eq!(args.last().unwrap(), "go do it");

    let index_of = |needle: &str| args.iter().position(|a| a == needle).unwrap();
    let danger_idx = index_of("--dangerously-skip-permissions");
    let prompt_idx = index_of("go do it");
    assert!(danger_idx < prompt_idx, "extra args precede the prompt");
}
/// Serializes an assistant envelope for message `id` holding a single text block.
fn asst(id: &str, text: &str) -> String {
    let envelope = serde_json::json!({
        "type": "assistant",
        "message": {
            "id": id,
            "content": [ { "type": "text", "text": text } ]
        }
    });
    envelope.to_string()
}
/// Serializes an assistant envelope for message `id` with one text block per
/// entry of `texts`.
fn asst_multi(id: &str, texts: &[&str]) -> String {
    let mut content = Vec::with_capacity(texts.len());
    for text in texts {
        content.push(serde_json::json!({ "type": "text", "text": text }));
    }
    let envelope = serde_json::json!({
        "type": "assistant",
        "message": { "id": id, "content": content }
    });
    envelope.to_string()
}
/// Feeds `lines` (newline-terminated) through a fresh parser capturing into
/// `capture` and returns every yielded line, panicking on any error.
fn run_parser(lines: &[String], capture: &Path) -> Vec<AssistantLine> {
    let mut body = lines.join("\n");
    body.push('\n');
    let reader = Cursor::new(body.into_bytes());
    let parser = StreamParser::new(reader, capture).expect("open");
    let mut yielded = Vec::new();
    for item in parser {
        yielded.push(item.expect("parse ok"));
    }
    yielded
}
/// Three messages are split on newlines into five lines tagged with their ids.
#[test]
fn parses_three_messages_totaling_five_lines() {
    let dir = TempDir::new().unwrap();
    let capture = dir.path().join("node.out");
    let envelopes = [
        asst("m1", "first\nsecond"),
        asst("m2", "third"),
        asst("m3", "fourth\nfifth"),
    ];
    let yielded = run_parser(&envelopes, &capture);
    let texts: Vec<&str> = yielded.iter().map(|line| line.text.as_str()).collect();
    assert_eq!(texts, ["first", "second", "third", "fourth", "fifth"]);
    assert_eq!(yielded[0].message_id, "m1");
    assert_eq!(yielded[4].message_id, "m3");
}
/// Text blocks of one envelope are concatenated before splitting into lines.
#[test]
fn joins_multiple_text_blocks_within_one_message() {
    let dir = TempDir::new().unwrap();
    let capture = dir.path().join("node.out");
    let blocks = ["first part ", "of line\nsecond ", "line split ", "too"];
    let envelope = asst_multi("m1", &blocks);
    let yielded = run_parser(&[envelope], &capture);
    let texts: Vec<&str> = yielded.iter().map(|line| line.text.as_str()).collect();
    assert_eq!(texts, ["first part of line", "second line split too"]);
}
/// Envelopes sharing a message id are stitched together before splitting.
#[test]
fn joins_partial_message_across_envelopes_sharing_id() {
    let dir = TempDir::new().unwrap();
    let capture = dir.path().join("node.out");
    let envelopes = [asst("msame", "hello "), asst("msame", "world\ngoodbye")];
    let yielded = run_parser(&envelopes, &capture);
    let texts: Vec<&str> = yielded.iter().map(|line| line.text.as_str()).collect();
    assert_eq!(texts, ["hello world", "goodbye"]);
}
/// A line that is not valid JSON is skipped without ending the stream.
#[test]
fn skips_malformed_stream_json_line_and_continues() {
    let dir = TempDir::new().unwrap();
    let capture = dir.path().join("node.out");
    let envelopes = [
        asst("m1", "ok"),
        String::from("{not valid json"),
        asst("m2", "still ok"),
    ];
    let yielded = run_parser(&envelopes, &capture);
    let texts: Vec<&str> = yielded.iter().map(|line| line.text.as_str()).collect();
    assert_eq!(texts, ["ok", "still ok"]);
}
/// Envelopes whose top-level type is not `assistant` yield nothing.
#[test]
fn skips_unknown_top_level_type() {
    let dir = TempDir::new().unwrap();
    let capture = dir.path().join("node.out");
    let thinking = serde_json::json!({ "type": "thinking", "content": "pondering" }).to_string();
    let envelopes = [thinking, asst("m1", "visible")];
    let yielded = run_parser(&envelopes, &capture);
    let texts: Vec<&str> = yielded.iter().map(|line| line.text.as_str()).collect();
    assert_eq!(texts, ["visible"]);
}
/// A `system` hook-response envelope does not leak its output into the lines.
#[test]
fn skips_system_hook_response_envelope() {
    let dir = TempDir::new().unwrap();
    let capture = dir.path().join("node.out");
    let hook = serde_json::json!({
        "type": "system",
        "subtype": "hook_response",
        "hook_name": "SessionStart",
        "output": "CAVEMAN MODE ACTIVE"
    })
    .to_string();
    let envelopes = [hook, asst("m1", "real content")];
    let yielded = run_parser(&envelopes, &capture);
    let texts: Vec<&str> = yielded.iter().map(|line| line.text.as_str()).collect();
    assert_eq!(texts, ["real content"]);
}
/// Only `text` content blocks of an assistant message contribute lines.
#[test]
fn skips_non_text_blocks_inside_assistant_message() {
    let dir = TempDir::new().unwrap();
    let capture = dir.path().join("node.out");
    let envelope = serde_json::json!({
        "type": "assistant",
        "message": {
            "id": "m1",
            "content": [
                { "type": "thinking", "thinking": "...internal..." },
                { "type": "text", "text": "visible line" },
                { "type": "tool_use", "name": "Read", "input": {} }
            ]
        }
    })
    .to_string();
    let yielded = run_parser(&[envelope], &capture);
    let texts: Vec<&str> = yielded.iter().map(|line| line.text.as_str()).collect();
    assert_eq!(texts, ["visible line"]);
}
/// Yielded lines are trimmed and blank lines are dropped.
#[test]
fn trims_whitespace_around_reconstructed_lines() {
    let dir = TempDir::new().unwrap();
    let capture = dir.path().join("node.out");
    let envelope = asst("m1", " BLOCKED \n trailing\n");
    let yielded = run_parser(&[envelope], &capture);
    let texts: Vec<&str> = yielded.iter().map(|line| line.text.as_str()).collect();
    assert_eq!(texts, ["BLOCKED", "trailing"]);
}
/// The capture file holds the raw assistant text in arrival order, and its
/// missing parent directory is created on demand.
#[test]
fn capture_file_mirrors_concatenated_assistant_text() {
    let dir = TempDir::new().unwrap();
    let capture = dir.path().join("deep").join("node.out");
    let envelopes = [
        asst("m1", "alpha\n"),
        asst_multi("m2", &["beta ", "gamma"]),
    ];
    let _ = run_parser(&envelopes, &capture);
    let written = std::fs::read_to_string(&capture).unwrap();
    assert_eq!(written, "alpha\nbeta gamma");
}
/// An empty input yields nothing but still creates a zero-length capture file.
#[test]
fn empty_stream_yields_no_lines_and_creates_empty_capture() {
    let dir = TempDir::new().unwrap();
    let capture = dir.path().join("node.out");
    let empty_input = Cursor::new(Vec::<u8>::new());
    let parser = StreamParser::new(empty_input, &capture).unwrap();
    let yielded = parser.count();
    assert_eq!(yielded, 0);
    assert!(capture.exists());
    let size = std::fs::metadata(&capture).unwrap().len();
    assert_eq!(size, 0);
}
/// Probing an explicit path that does not exist reports `HostMissing`.
#[test]
fn preflight_missing_binary_is_host_missing() {
    let bogus = Path::new("/definitely/not/a/binary/omne-xyz");
    match preflight(Some(bogus)).unwrap_err() {
        Error::HostMissing => {}
        err => panic!("expected HostMissing, got {err:?}"),
    }
}
}