1#![allow(dead_code)]
38
39use std::collections::{HashMap, VecDeque};
40use std::ffi::OsStr;
41use std::fs::{File, OpenOptions};
42use std::io::{BufRead, BufReader, Read, Write};
43use std::path::{Path, PathBuf};
44use std::process::{Child, ChildStdout, Command, ExitStatus, Stdio};
45use std::sync::{Arc, Mutex};
46use std::thread::{self, JoinHandle};
47use std::time::Duration;
48
49use thiserror::Error;
50use wait_timeout::ChildExt;
51
52pub const DEFAULT_BIN: &str = "claude";
54
55pub const PREFLIGHT_TIMEOUT: Duration = Duration::from_secs(10);
61
62#[derive(Debug, Error)]
64pub enum Error {
65 #[error(
69 "claude binary not found on PATH\n\
70 hint: install Claude Code and ensure `claude` is on PATH"
71 )]
72 HostMissing,
73
74 #[error("failed to launch claude: {source}")]
78 Spawn {
79 #[source]
80 source: std::io::Error,
81 },
82
83 #[error("claude did not exit within {elapsed:?}")]
87 Timeout { elapsed: Duration },
88
89 #[error("claude exited with {status}\nstderr: {stderr}")]
93 ExitedNonZero { status: ExitStatus, stderr: String },
94
95 #[error("I/O error on claude stream: {source}")]
97 Io {
98 #[source]
99 source: std::io::Error,
100 },
101
102 #[error("capture I/O on {path}: {source}")]
104 Capture {
105 path: PathBuf,
106 #[source]
107 source: std::io::Error,
108 },
109}
110
111#[derive(Debug, Clone, Eq, PartialEq)]
119pub enum Session {
120 New(String),
123 Resume(String),
125}
126
127#[derive(Debug, Clone)]
129pub struct SpawnOpts {
130 pub prompt: String,
132 pub cwd: PathBuf,
135 pub model: Option<String>,
137 pub allowed_tools: Vec<String>,
140 pub session: Option<Session>,
142 pub extra_args: Vec<String>,
146 pub bin: Option<PathBuf>,
148}
149
150impl SpawnOpts {
151 pub fn new(prompt: impl Into<String>, cwd: impl Into<PathBuf>) -> Self {
154 Self {
155 prompt: prompt.into(),
156 cwd: cwd.into(),
157 model: None,
158 allowed_tools: Vec::new(),
159 session: None,
160 extra_args: Vec::new(),
161 bin: None,
162 }
163 }
164}
165
166#[derive(Debug, Clone, Copy, Eq, PartialEq, Default)]
174pub enum CaptureMode {
175 #[default]
176 Truncate,
177 Append,
178}
179
180#[derive(Debug, Clone, Eq, PartialEq)]
188pub struct AssistantLine {
189 pub text: String,
190 pub message_id: String,
191}
192
193pub fn preflight(bin: Option<&Path>) -> Result<String, Error> {
199 let bin_os: &OsStr = bin
200 .map(|b| b.as_os_str())
201 .unwrap_or_else(|| OsStr::new(DEFAULT_BIN));
202
203 if bin.is_none() && which::which(DEFAULT_BIN).is_err() {
206 return Err(Error::HostMissing);
207 }
208
209 let mut child = Command::new(bin_os)
210 .arg("--version")
211 .stdin(Stdio::null())
212 .stdout(Stdio::piped())
213 .stderr(Stdio::piped())
214 .spawn()
215 .map_err(map_spawn_error)?;
216
217 let status = match child
218 .wait_timeout(PREFLIGHT_TIMEOUT)
219 .map_err(|source| Error::Io { source })?
220 {
221 Some(status) => status,
222 None => {
223 let _ = child.kill();
224 let _ = child.wait();
225 return Err(Error::Timeout {
226 elapsed: PREFLIGHT_TIMEOUT,
227 });
228 }
229 };
230
231 let mut stdout = String::new();
232 if let Some(mut s) = child.stdout.take() {
233 s.read_to_string(&mut stdout)
234 .map_err(|source| Error::Io { source })?;
235 }
236 let mut stderr = String::new();
237 if let Some(mut s) = child.stderr.take() {
238 s.read_to_string(&mut stderr)
239 .map_err(|source| Error::Io { source })?;
240 }
241
242 if !status.success() {
243 return Err(Error::ExitedNonZero { status, stderr });
244 }
245 Ok(stdout.trim().to_string())
246}
247
248pub fn build_command(opts: &SpawnOpts) -> Command {
252 let bin_os: &OsStr = opts
253 .bin
254 .as_deref()
255 .map(|p| p.as_os_str())
256 .unwrap_or_else(|| OsStr::new(DEFAULT_BIN));
257
258 let mut cmd = Command::new(bin_os);
259 cmd.current_dir(&opts.cwd);
260 cmd.args(["-p", "--output-format", "stream-json", "--verbose"]);
261
262 if let Some(model) = &opts.model {
263 cmd.args(["--model", model]);
264 }
265 if !opts.allowed_tools.is_empty() {
266 cmd.arg("--allowed-tools").arg(opts.allowed_tools.join(","));
267 }
268 match &opts.session {
269 Some(Session::New(uuid)) => {
270 cmd.args(["--session-id", uuid]);
271 }
272 Some(Session::Resume(uuid)) => {
273 cmd.args(["--resume", uuid]);
274 }
275 None => {}
276 }
277 for extra in &opts.extra_args {
278 cmd.arg(extra);
279 }
280
281 cmd.arg(&opts.prompt);
284
285 cmd.stdin(Stdio::null());
286 cmd.stdout(Stdio::piped());
287 cmd.stderr(Stdio::piped());
288 cmd
289}
290
291pub fn spawn(opts: &SpawnOpts) -> Result<Child, Error> {
297 build_command(opts).spawn().map_err(map_spawn_error)
298}
299
300pub fn stream(child: Child, capture_path: &Path) -> Result<ClaudeProcess, Error> {
304 ClaudeProcess::from_child(child, capture_path)
305}
306
307fn map_spawn_error(source: std::io::Error) -> Error {
311 if source.kind() == std::io::ErrorKind::NotFound {
312 Error::HostMissing
313 } else {
314 Error::Spawn { source }
315 }
316}
317
318pub struct StreamParser<R: BufRead> {
327 reader: R,
328 capture: File,
329 capture_path: PathBuf,
330 buffers: HashMap<String, String>,
334 order: VecDeque<String>,
338 current_id: Option<String>,
341 pending: VecDeque<AssistantLine>,
343 finished: bool,
344}
345
346impl<R: BufRead> StreamParser<R> {
347 pub fn new(reader: R, capture_path: &Path) -> Result<Self, Error> {
352 Self::with_mode(reader, capture_path, CaptureMode::Truncate)
353 }
354
355 pub fn with_mode(reader: R, capture_path: &Path, mode: CaptureMode) -> Result<Self, Error> {
361 if let Some(parent) = capture_path.parent() {
362 if !parent.as_os_str().is_empty() {
363 std::fs::create_dir_all(parent).map_err(|source| Error::Capture {
364 path: parent.to_path_buf(),
365 source,
366 })?;
367 }
368 }
369 let mut open = OpenOptions::new();
370 open.create(true).write(true);
371 match mode {
372 CaptureMode::Truncate => {
373 open.truncate(true);
374 }
375 CaptureMode::Append => {
376 open.append(true);
377 }
378 }
379 let capture = open.open(capture_path).map_err(|source| Error::Capture {
380 path: capture_path.to_path_buf(),
381 source,
382 })?;
383 Ok(Self {
384 reader,
385 capture,
386 capture_path: capture_path.to_path_buf(),
387 buffers: HashMap::new(),
388 order: VecDeque::new(),
389 current_id: None,
390 pending: VecDeque::new(),
391 finished: false,
392 })
393 }
394
395 fn ingest_line(&mut self, raw: &str) -> Result<(), Error> {
400 let trimmed = raw.trim();
401 if trimmed.is_empty() {
402 return Ok(());
403 }
404 let env: serde_json::Value = match serde_json::from_str(trimmed) {
405 Ok(v) => v,
406 Err(err) => {
407 eprintln!("claude_proc: skipping malformed stream-json line: {err}");
408 return Ok(());
409 }
410 };
411 let ty = env.get("type").and_then(|t| t.as_str()).unwrap_or("");
412 if ty != "assistant" {
413 return Ok(());
418 }
419
420 let Some(message) = env.get("message") else {
421 return Ok(());
422 };
423 let id = message
424 .get("id")
425 .and_then(|v| v.as_str())
426 .unwrap_or("")
427 .to_string();
428
429 if let Some(prev) = self.current_id.as_ref() {
433 if prev != &id {
434 let prev = prev.clone();
435 self.flush_message(&prev);
436 }
437 }
438 if !self.buffers.contains_key(&id) {
439 self.order.push_back(id.clone());
440 }
441 self.current_id = Some(id.clone());
442
443 let empty = Vec::new();
444 let content = message
445 .get("content")
446 .and_then(|c| c.as_array())
447 .unwrap_or(&empty);
448 let mut appended = String::new();
449 for block in content {
450 let block_type = block.get("type").and_then(|t| t.as_str()).unwrap_or("");
451 if block_type != "text" {
452 continue;
453 }
454 if let Some(text) = block.get("text").and_then(|v| v.as_str()) {
455 appended.push_str(text);
456 }
457 }
458 if !appended.is_empty() {
459 self.buffers.entry(id).or_default().push_str(&appended);
460 self.capture
461 .write_all(appended.as_bytes())
462 .map_err(|source| Error::Capture {
463 path: self.capture_path.clone(),
464 source,
465 })?;
466 }
467 Ok(())
468 }
469
470 fn flush_message(&mut self, id: &str) {
474 let Some(buf) = self.buffers.remove(id) else {
475 return;
476 };
477 if let Some(pos) = self.order.iter().position(|x| x == id) {
479 self.order.remove(pos);
480 }
481 for piece in buf.split('\n') {
482 let trimmed = piece.trim();
483 if trimmed.is_empty() {
484 continue;
485 }
486 self.pending.push_back(AssistantLine {
487 text: trimmed.to_string(),
488 message_id: id.to_string(),
489 });
490 }
491 }
492
493 fn flush_all(&mut self) {
497 self.current_id = None;
498 while let Some(id) = self.order.pop_front() {
499 let buf = match self.buffers.remove(&id) {
504 Some(b) => b,
505 None => continue,
506 };
507 for piece in buf.split('\n') {
508 let trimmed = piece.trim();
509 if trimmed.is_empty() {
510 continue;
511 }
512 self.pending.push_back(AssistantLine {
513 text: trimmed.to_string(),
514 message_id: id.clone(),
515 });
516 }
517 }
518 }
519
520 fn flush_capture(&mut self) -> Result<(), Error> {
524 self.capture.flush().map_err(|source| Error::Capture {
525 path: self.capture_path.clone(),
526 source,
527 })
528 }
529}
530
531impl<R: BufRead> Iterator for StreamParser<R> {
532 type Item = Result<AssistantLine, Error>;
533
534 fn next(&mut self) -> Option<Self::Item> {
535 loop {
536 if let Some(line) = self.pending.pop_front() {
537 return Some(Ok(line));
538 }
539 if self.finished {
540 return None;
541 }
542 let mut raw = String::new();
543 match self.reader.read_line(&mut raw) {
544 Ok(0) => {
545 self.finished = true;
546 self.flush_all();
547 if let Err(e) = self.flush_capture() {
548 return Some(Err(e));
549 }
550 }
551 Ok(_) => {
552 if let Err(e) = self.ingest_line(&raw) {
553 return Some(Err(e));
554 }
555 }
556 Err(source) => {
557 self.finished = true;
558 return Some(Err(Error::Io { source }));
559 }
560 }
561 }
562 }
563}
564
565pub struct ClaudeProcess {
575 parser: StreamParser<BufReader<ChildStdout>>,
576 child: Arc<Mutex<Option<Child>>>,
577 stderr_thread: Option<JoinHandle<Vec<u8>>>,
578}
579
580#[derive(Clone)]
588pub struct ChildKiller {
589 child: Arc<Mutex<Option<Child>>>,
590}
591
592impl ChildKiller {
593 pub fn kill(&self) -> std::io::Result<()> {
597 let mut guard = match self.child.lock() {
598 Ok(g) => g,
599 Err(poisoned) => poisoned.into_inner(),
600 };
601 if let Some(c) = guard.as_mut() {
602 return c.kill();
603 }
604 Ok(())
605 }
606}
607
608impl ClaudeProcess {
609 pub fn from_child(child: Child, capture_path: &Path) -> Result<Self, Error> {
618 Self::from_child_with_mode(child, capture_path, CaptureMode::Truncate)
619 }
620
621 pub fn from_child_with_mode(
625 mut child: Child,
626 capture_path: &Path,
627 mode: CaptureMode,
628 ) -> Result<Self, Error> {
629 let stdout = child.stdout.take().ok_or_else(|| Error::Io {
630 source: std::io::Error::new(
631 std::io::ErrorKind::BrokenPipe,
632 "claude child spawned without piped stdout",
633 ),
634 })?;
635 let stderr_thread = child.stderr.take().map(|mut s| {
636 thread::spawn(move || {
637 let mut buf = Vec::new();
638 let _ = s.read_to_end(&mut buf);
639 buf
640 })
641 });
642 let parser = StreamParser::with_mode(BufReader::new(stdout), capture_path, mode)?;
643 Ok(Self {
644 parser,
645 child: Arc::new(Mutex::new(Some(child))),
646 stderr_thread,
647 })
648 }
649
650 pub fn killer(&self) -> ChildKiller {
652 ChildKiller {
653 child: Arc::clone(&self.child),
654 }
655 }
656
657 pub fn finish(mut self) -> Result<(ExitStatus, String), Error> {
665 for item in self.parser.by_ref() {
668 item?;
669 }
670 self.parser.flush_capture()?;
671
672 let child_opt = {
673 let mut guard = match self.child.lock() {
674 Ok(g) => g,
675 Err(poisoned) => poisoned.into_inner(),
676 };
677 guard.take()
678 };
679 let mut child = child_opt.ok_or_else(|| Error::Io {
680 source: std::io::Error::new(
681 std::io::ErrorKind::BrokenPipe,
682 "ClaudeProcess::finish called with no live child — \
683 finish already called or kill-drop left the Option None",
684 ),
685 })?;
686 let status = child.wait().map_err(|source| Error::Io { source })?;
687 let stderr_bytes = self
688 .stderr_thread
689 .take()
690 .map(|h| h.join().unwrap_or_default())
691 .unwrap_or_default();
692 let stderr = String::from_utf8_lossy(&stderr_bytes).into_owned();
693 Ok((status, stderr))
694 }
695}
696
697impl Iterator for ClaudeProcess {
698 type Item = Result<AssistantLine, Error>;
699
700 fn next(&mut self) -> Option<Self::Item> {
701 self.parser.next()
702 }
703}
704
705#[cfg(test)]
706mod tests {
707 use super::*;
708 use std::io::Cursor;
709 use tempfile::TempDir;
710
711 fn collect_args(cmd: &Command) -> Vec<String> {
714 cmd.get_args()
715 .map(|a| a.to_string_lossy().into_owned())
716 .collect()
717 }
718
719 #[test]
720 fn build_command_sets_required_streaming_flags() {
721 let opts = SpawnOpts::new("go", "/tmp");
722 let cmd = build_command(&opts);
723 let args = collect_args(&cmd);
724 assert!(args
725 .windows(4)
726 .any(|w| w == ["-p", "--output-format", "stream-json", "--verbose"]));
727 assert_eq!(args.last().unwrap(), "go");
728 }
729
730 #[test]
731 fn build_command_omits_unset_model_and_tools() {
732 let args = collect_args(&build_command(&SpawnOpts::new("x", "/tmp")));
733 assert!(!args.iter().any(|a| a == "--model"));
734 assert!(!args.iter().any(|a| a == "--allowed-tools"));
735 assert!(!args.iter().any(|a| a == "--session-id"));
736 assert!(!args.iter().any(|a| a == "--resume"));
737 }
738
739 #[test]
740 fn build_command_joins_allowed_tools_with_commas() {
741 let mut opts = SpawnOpts::new("x", "/tmp");
742 opts.allowed_tools = vec!["Read".into(), "Bash".into()];
743 let args = collect_args(&build_command(&opts));
744 let tools_idx = args.iter().position(|a| a == "--allowed-tools").unwrap();
745 assert_eq!(args[tools_idx + 1], "Read,Bash");
746 }
747
748 #[test]
749 fn build_command_uses_session_id_for_new_session() {
750 let mut opts = SpawnOpts::new("x", "/tmp");
751 opts.session = Some(Session::New("abc-123".into()));
752 let args = collect_args(&build_command(&opts));
753 let idx = args.iter().position(|a| a == "--session-id").unwrap();
754 assert_eq!(args[idx + 1], "abc-123");
755 assert!(!args.iter().any(|a| a == "--resume"));
756 }
757
758 #[test]
759 fn build_command_uses_resume_for_continued_session() {
760 let mut opts = SpawnOpts::new("x", "/tmp");
761 opts.session = Some(Session::Resume("abc-123".into()));
762 let args = collect_args(&build_command(&opts));
763 let idx = args.iter().position(|a| a == "--resume").unwrap();
764 assert_eq!(args[idx + 1], "abc-123");
765 assert!(!args.iter().any(|a| a == "--session-id"));
766 }
767
768 #[test]
769 fn build_command_puts_prompt_last_even_with_extra_args() {
770 let mut opts = SpawnOpts::new("go do it", "/tmp");
771 opts.extra_args = vec!["--dangerously-skip-permissions".into()];
772 let args = collect_args(&build_command(&opts));
773 assert_eq!(args.last().unwrap(), "go do it");
774 let danger_idx = args
775 .iter()
776 .position(|a| a == "--dangerously-skip-permissions")
777 .unwrap();
778 let prompt_idx = args.iter().position(|a| a == "go do it").unwrap();
779 assert!(danger_idx < prompt_idx, "extra args precede the prompt");
780 }
781
782 fn asst(id: &str, text: &str) -> String {
785 serde_json::json!({
786 "type": "assistant",
787 "message": {
788 "id": id,
789 "content": [ { "type": "text", "text": text } ]
790 }
791 })
792 .to_string()
793 }
794
795 fn asst_multi(id: &str, texts: &[&str]) -> String {
796 let content: Vec<_> = texts
797 .iter()
798 .map(|t| serde_json::json!({ "type": "text", "text": t }))
799 .collect();
800 serde_json::json!({
801 "type": "assistant",
802 "message": { "id": id, "content": content }
803 })
804 .to_string()
805 }
806
807 fn run_parser(lines: &[String], capture: &Path) -> Vec<AssistantLine> {
808 let body = lines.join("\n") + "\n";
809 let parser = StreamParser::new(Cursor::new(body.into_bytes()), capture).expect("open");
810 parser.map(|r| r.expect("parse ok")).collect()
811 }
812
813 #[test]
814 fn parses_three_messages_totaling_five_lines() {
815 let tmp = TempDir::new().unwrap();
816 let cap = tmp.path().join("node.out");
817 let lines = vec![
818 asst("m1", "first\nsecond"),
819 asst("m2", "third"),
820 asst("m3", "fourth\nfifth"),
821 ];
822 let yielded = run_parser(&lines, &cap);
823 assert_eq!(
824 yielded.iter().map(|l| &l.text).collect::<Vec<_>>(),
825 vec!["first", "second", "third", "fourth", "fifth"]
826 );
827 assert_eq!(yielded[0].message_id, "m1");
828 assert_eq!(yielded[4].message_id, "m3");
829 }
830
831 #[test]
832 fn joins_multiple_text_blocks_within_one_message() {
833 let tmp = TempDir::new().unwrap();
834 let cap = tmp.path().join("node.out");
835 let line = asst_multi(
838 "m1",
839 &["first part ", "of line\nsecond ", "line split ", "too"],
840 );
841 let yielded = run_parser(&[line], &cap);
842 assert_eq!(
843 yielded.iter().map(|l| &l.text).collect::<Vec<_>>(),
844 vec!["first part of line", "second line split too"]
845 );
846 }
847
848 #[test]
849 fn joins_partial_message_across_envelopes_sharing_id() {
850 let tmp = TempDir::new().unwrap();
853 let cap = tmp.path().join("node.out");
854 let lines = vec![asst("msame", "hello "), asst("msame", "world\ngoodbye")];
855 let yielded = run_parser(&lines, &cap);
856 assert_eq!(
857 yielded.iter().map(|l| &l.text).collect::<Vec<_>>(),
858 vec!["hello world", "goodbye"]
859 );
860 }
861
862 #[test]
863 fn skips_malformed_stream_json_line_and_continues() {
864 let tmp = TempDir::new().unwrap();
865 let cap = tmp.path().join("node.out");
866 let lines = vec![
867 asst("m1", "ok"),
868 "{not valid json".to_string(),
869 asst("m2", "still ok"),
870 ];
871 let yielded = run_parser(&lines, &cap);
872 assert_eq!(
873 yielded.iter().map(|l| &l.text).collect::<Vec<_>>(),
874 vec!["ok", "still ok"]
875 );
876 }
877
878 #[test]
879 fn skips_unknown_top_level_type() {
880 let tmp = TempDir::new().unwrap();
881 let cap = tmp.path().join("node.out");
882 let lines = vec![
883 serde_json::json!({ "type": "thinking", "content": "pondering" }).to_string(),
884 asst("m1", "visible"),
885 ];
886 let yielded = run_parser(&lines, &cap);
887 assert_eq!(
888 yielded.iter().map(|l| &l.text).collect::<Vec<_>>(),
889 vec!["visible"]
890 );
891 }
892
893 #[test]
894 fn skips_system_hook_response_envelope() {
895 let tmp = TempDir::new().unwrap();
897 let cap = tmp.path().join("node.out");
898 let lines = vec![
899 serde_json::json!({
900 "type": "system",
901 "subtype": "hook_response",
902 "hook_name": "SessionStart",
903 "output": "CAVEMAN MODE ACTIVE"
904 })
905 .to_string(),
906 asst("m1", "real content"),
907 ];
908 let yielded = run_parser(&lines, &cap);
909 assert_eq!(
910 yielded.iter().map(|l| &l.text).collect::<Vec<_>>(),
911 vec!["real content"]
912 );
913 }
914
915 #[test]
916 fn skips_non_text_blocks_inside_assistant_message() {
917 let tmp = TempDir::new().unwrap();
918 let cap = tmp.path().join("node.out");
919 let line = serde_json::json!({
922 "type": "assistant",
923 "message": {
924 "id": "m1",
925 "content": [
926 { "type": "thinking", "thinking": "...internal..." },
927 { "type": "text", "text": "visible line" },
928 { "type": "tool_use", "name": "Read", "input": {} }
929 ]
930 }
931 })
932 .to_string();
933 let yielded = run_parser(&[line], &cap);
934 assert_eq!(
935 yielded.iter().map(|l| &l.text).collect::<Vec<_>>(),
936 vec!["visible line"]
937 );
938 }
939
940 #[test]
941 fn trims_whitespace_around_reconstructed_lines() {
942 let tmp = TempDir::new().unwrap();
943 let cap = tmp.path().join("node.out");
944 let line = asst("m1", " BLOCKED \n trailing\n");
945 let yielded = run_parser(&[line], &cap);
946 assert_eq!(
947 yielded.iter().map(|l| &l.text).collect::<Vec<_>>(),
948 vec!["BLOCKED", "trailing"]
949 );
950 }
951
952 #[test]
953 fn capture_file_mirrors_concatenated_assistant_text() {
954 let tmp = TempDir::new().unwrap();
955 let cap = tmp.path().join("deep").join("node.out");
956 let lines = vec![
957 asst("m1", "alpha\n"),
958 asst_multi("m2", ["beta ", "gamma"].as_ref()),
959 ];
960 let _ = run_parser(&lines, &cap);
961 let mut written = String::new();
962 File::open(&cap)
963 .unwrap()
964 .read_to_string(&mut written)
965 .unwrap();
966 assert_eq!(written, "alpha\nbeta gamma");
967 }
968
969 #[test]
970 fn empty_stream_yields_no_lines_and_creates_empty_capture() {
971 let tmp = TempDir::new().unwrap();
972 let cap = tmp.path().join("node.out");
973 let parser = StreamParser::new(Cursor::new(Vec::<u8>::new()), &cap).unwrap();
974 assert_eq!(parser.count(), 0);
975 assert!(cap.exists());
976 assert_eq!(std::fs::metadata(&cap).unwrap().len(), 0);
977 }
978
979 #[test]
980 fn preflight_missing_binary_is_host_missing() {
981 let err = preflight(Some(Path::new("/definitely/not/a/binary/omne-xyz"))).unwrap_err();
983 assert!(
984 matches!(err, Error::HostMissing),
985 "expected HostMissing, got {err:?}"
986 );
987 }
988}