1#![allow(dead_code)]
38
39use std::collections::{HashMap, VecDeque};
40use std::ffi::OsStr;
41use std::fs::{File, OpenOptions};
42use std::io::{BufRead, BufReader, Read, Write};
43use std::path::{Path, PathBuf};
44use std::process::{Child, ChildStdout, Command, ExitStatus, Stdio};
45use std::sync::{Arc, Mutex};
46use std::thread::{self, JoinHandle};
47use std::time::Duration;
48
49use thiserror::Error;
50use wait_timeout::ChildExt;
51
/// Executable name used whenever the caller does not supply an explicit binary path.
pub const DEFAULT_BIN: &str = "claude";

/// Longest time [`preflight`] waits for the `--version` probe to exit before killing it.
pub const PREFLIGHT_TIMEOUT: Duration = Duration::from_secs(10);
61
62#[derive(Debug, Error)]
64pub enum Error {
65 #[error(
69 "claude binary not found on PATH\n\
70 hint: install Claude Code and ensure `claude` is on PATH"
71 )]
72 HostMissing,
73
74 #[error("failed to launch claude: {source}")]
78 Spawn {
79 #[source]
80 source: std::io::Error,
81 },
82
83 #[error("claude did not exit within {elapsed:?}")]
87 Timeout { elapsed: Duration },
88
89 #[error("claude exited with {status}\nstderr: {stderr}")]
93 ExitedNonZero { status: ExitStatus, stderr: String },
94
95 #[error("I/O error on claude stream: {source}")]
97 Io {
98 #[source]
99 source: std::io::Error,
100 },
101
102 #[error("capture I/O on {path}: {source}")]
104 Capture {
105 path: PathBuf,
106 #[source]
107 source: std::io::Error,
108 },
109}
110
/// Selects how the spawned CLI attaches to a conversation session.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Session {
    /// Start a session under the given id (forwarded as `--session-id <id>`).
    New(String),
    /// Continue the session with the given id (forwarded as `--resume <id>`).
    Resume(String),
}
126
127#[derive(Debug, Clone)]
129pub struct SpawnOpts {
130 pub prompt: String,
132 pub cwd: PathBuf,
135 pub model: Option<String>,
137 pub allowed_tools: Vec<String>,
140 pub session: Option<Session>,
142 pub extra_args: Vec<String>,
146 pub env_vars: Vec<(String, String)>,
152 pub bin: Option<PathBuf>,
154}
155
156impl SpawnOpts {
157 pub fn new(prompt: impl Into<String>, cwd: impl Into<PathBuf>) -> Self {
160 Self {
161 prompt: prompt.into(),
162 cwd: cwd.into(),
163 model: None,
164 allowed_tools: Vec::new(),
165 session: None,
166 extra_args: Vec::new(),
167 env_vars: Vec::new(),
168 bin: None,
169 }
170 }
171}
172
/// How an existing capture file is treated when a parser opens it.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum CaptureMode {
    /// Discard any previous contents (the default).
    #[default]
    Truncate,
    /// Keep previous contents and write after them.
    Append,
}
186
/// One non-empty, whitespace-trimmed line of assistant text.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct AssistantLine {
    /// The line's text with surrounding whitespace removed.
    pub text: String,
    /// Id of the assistant message the line was reconstructed from.
    pub message_id: String,
}
199
200pub fn preflight(bin: Option<&Path>) -> Result<String, Error> {
206 let bin_os: &OsStr = bin
207 .map(|b| b.as_os_str())
208 .unwrap_or_else(|| OsStr::new(DEFAULT_BIN));
209
210 if bin.is_none() && which::which(DEFAULT_BIN).is_err() {
213 return Err(Error::HostMissing);
214 }
215
216 let mut child = Command::new(bin_os)
217 .arg("--version")
218 .stdin(Stdio::null())
219 .stdout(Stdio::piped())
220 .stderr(Stdio::piped())
221 .spawn()
222 .map_err(map_spawn_error)?;
223
224 let status = match child
225 .wait_timeout(PREFLIGHT_TIMEOUT)
226 .map_err(|source| Error::Io { source })?
227 {
228 Some(status) => status,
229 None => {
230 let _ = child.kill();
231 let _ = child.wait();
232 return Err(Error::Timeout {
233 elapsed: PREFLIGHT_TIMEOUT,
234 });
235 }
236 };
237
238 let mut stdout = String::new();
239 if let Some(mut s) = child.stdout.take() {
240 s.read_to_string(&mut stdout)
241 .map_err(|source| Error::Io { source })?;
242 }
243 let mut stderr = String::new();
244 if let Some(mut s) = child.stderr.take() {
245 s.read_to_string(&mut stderr)
246 .map_err(|source| Error::Io { source })?;
247 }
248
249 if !status.success() {
250 return Err(Error::ExitedNonZero { status, stderr });
251 }
252 Ok(stdout.trim().to_string())
253}
254
255pub fn build_command(opts: &SpawnOpts) -> Command {
259 let bin_os: &OsStr = opts
260 .bin
261 .as_deref()
262 .map(|p| p.as_os_str())
263 .unwrap_or_else(|| OsStr::new(DEFAULT_BIN));
264
265 let mut cmd = Command::new(bin_os);
266 cmd.current_dir(&opts.cwd);
267 for (k, v) in &opts.env_vars {
268 cmd.env(k, v);
269 }
270 cmd.args(["-p", "--output-format", "stream-json", "--verbose"]);
271
272 if let Some(model) = &opts.model {
273 cmd.args(["--model", model]);
274 }
275 if !opts.allowed_tools.is_empty() {
276 cmd.arg("--allowed-tools").arg(opts.allowed_tools.join(","));
277 }
278 match &opts.session {
279 Some(Session::New(uuid)) => {
280 cmd.args(["--session-id", uuid]);
281 }
282 Some(Session::Resume(uuid)) => {
283 cmd.args(["--resume", uuid]);
284 }
285 None => {}
286 }
287 for extra in &opts.extra_args {
288 cmd.arg(extra);
289 }
290
291 cmd.arg(&opts.prompt);
294
295 cmd.stdin(Stdio::null());
296 cmd.stdout(Stdio::piped());
297 cmd.stderr(Stdio::piped());
298 cmd
299}
300
301pub fn spawn(opts: &SpawnOpts) -> Result<Child, Error> {
307 build_command(opts).spawn().map_err(map_spawn_error)
308}
309
310pub fn stream(child: Child, capture_path: &Path) -> Result<ClaudeProcess, Error> {
314 ClaudeProcess::from_child(child, capture_path)
315}
316
317fn map_spawn_error(source: std::io::Error) -> Error {
321 if source.kind() == std::io::ErrorKind::NotFound {
322 Error::HostMissing
323 } else {
324 Error::Spawn { source }
325 }
326}
327
328pub struct StreamParser<R: BufRead> {
337 reader: R,
338 capture: File,
339 capture_path: PathBuf,
340 buffers: HashMap<String, String>,
344 order: VecDeque<String>,
348 current_id: Option<String>,
351 pending: VecDeque<AssistantLine>,
353 finished: bool,
354}
355
356impl<R: BufRead> StreamParser<R> {
357 pub fn new(reader: R, capture_path: &Path) -> Result<Self, Error> {
362 Self::with_mode(reader, capture_path, CaptureMode::Truncate)
363 }
364
365 pub fn with_mode(reader: R, capture_path: &Path, mode: CaptureMode) -> Result<Self, Error> {
371 if let Some(parent) = capture_path.parent() {
372 if !parent.as_os_str().is_empty() {
373 std::fs::create_dir_all(parent).map_err(|source| Error::Capture {
374 path: parent.to_path_buf(),
375 source,
376 })?;
377 }
378 }
379 let mut open = OpenOptions::new();
380 open.create(true).write(true);
381 match mode {
382 CaptureMode::Truncate => {
383 open.truncate(true);
384 }
385 CaptureMode::Append => {
386 open.append(true);
387 }
388 }
389 let capture = open.open(capture_path).map_err(|source| Error::Capture {
390 path: capture_path.to_path_buf(),
391 source,
392 })?;
393 Ok(Self {
394 reader,
395 capture,
396 capture_path: capture_path.to_path_buf(),
397 buffers: HashMap::new(),
398 order: VecDeque::new(),
399 current_id: None,
400 pending: VecDeque::new(),
401 finished: false,
402 })
403 }
404
405 fn ingest_line(&mut self, raw: &str) -> Result<(), Error> {
410 let trimmed = raw.trim();
411 if trimmed.is_empty() {
412 return Ok(());
413 }
414 let env: serde_json::Value = match serde_json::from_str(trimmed) {
415 Ok(v) => v,
416 Err(err) => {
417 eprintln!("claude_proc: skipping malformed stream-json line: {err}");
418 return Ok(());
419 }
420 };
421 let ty = env.get("type").and_then(|t| t.as_str()).unwrap_or("");
422 if ty != "assistant" {
423 return Ok(());
428 }
429
430 let Some(message) = env.get("message") else {
431 return Ok(());
432 };
433 let id = message
434 .get("id")
435 .and_then(|v| v.as_str())
436 .unwrap_or("")
437 .to_string();
438
439 if let Some(prev) = self.current_id.as_ref() {
443 if prev != &id {
444 let prev = prev.clone();
445 self.flush_message(&prev);
446 }
447 }
448 if !self.buffers.contains_key(&id) {
449 self.order.push_back(id.clone());
450 }
451 self.current_id = Some(id.clone());
452
453 let empty = Vec::new();
454 let content = message
455 .get("content")
456 .and_then(|c| c.as_array())
457 .unwrap_or(&empty);
458 let mut appended = String::new();
459 for block in content {
460 let block_type = block.get("type").and_then(|t| t.as_str()).unwrap_or("");
461 if block_type != "text" {
462 continue;
463 }
464 if let Some(text) = block.get("text").and_then(|v| v.as_str()) {
465 appended.push_str(text);
466 }
467 }
468 if !appended.is_empty() {
469 self.buffers.entry(id).or_default().push_str(&appended);
470 self.capture
471 .write_all(appended.as_bytes())
472 .map_err(|source| Error::Capture {
473 path: self.capture_path.clone(),
474 source,
475 })?;
476 }
477 Ok(())
478 }
479
480 fn flush_message(&mut self, id: &str) {
484 let Some(buf) = self.buffers.remove(id) else {
485 return;
486 };
487 if let Some(pos) = self.order.iter().position(|x| x == id) {
489 self.order.remove(pos);
490 }
491 for piece in buf.split('\n') {
492 let trimmed = piece.trim();
493 if trimmed.is_empty() {
494 continue;
495 }
496 self.pending.push_back(AssistantLine {
497 text: trimmed.to_string(),
498 message_id: id.to_string(),
499 });
500 }
501 }
502
503 fn flush_all(&mut self) {
507 self.current_id = None;
508 while let Some(id) = self.order.pop_front() {
509 let buf = match self.buffers.remove(&id) {
514 Some(b) => b,
515 None => continue,
516 };
517 for piece in buf.split('\n') {
518 let trimmed = piece.trim();
519 if trimmed.is_empty() {
520 continue;
521 }
522 self.pending.push_back(AssistantLine {
523 text: trimmed.to_string(),
524 message_id: id.clone(),
525 });
526 }
527 }
528 }
529
530 fn flush_capture(&mut self) -> Result<(), Error> {
534 self.capture.flush().map_err(|source| Error::Capture {
535 path: self.capture_path.clone(),
536 source,
537 })
538 }
539}
540
541impl<R: BufRead> Iterator for StreamParser<R> {
542 type Item = Result<AssistantLine, Error>;
543
544 fn next(&mut self) -> Option<Self::Item> {
545 loop {
546 if let Some(line) = self.pending.pop_front() {
547 return Some(Ok(line));
548 }
549 if self.finished {
550 return None;
551 }
552 let mut raw = String::new();
553 match self.reader.read_line(&mut raw) {
554 Ok(0) => {
555 self.finished = true;
556 self.flush_all();
557 if let Err(e) = self.flush_capture() {
558 return Some(Err(e));
559 }
560 }
561 Ok(_) => {
562 if let Err(e) = self.ingest_line(&raw) {
563 return Some(Err(e));
564 }
565 }
566 Err(source) => {
567 self.finished = true;
568 return Some(Err(Error::Io { source }));
569 }
570 }
571 }
572 }
573}
574
575pub struct ClaudeProcess {
585 parser: StreamParser<BufReader<ChildStdout>>,
586 child: Arc<Mutex<Option<Child>>>,
587 stderr_thread: Option<JoinHandle<Vec<u8>>>,
588}
589
/// Cloneable handle that can terminate the child owned by a [`ClaudeProcess`].
#[derive(Clone)]
pub struct ChildKiller {
    /// Shared slot holding the child handle; `None` once it has been taken.
    child: Arc<Mutex<Option<Child>>>,
}
601
602impl ChildKiller {
603 pub fn kill(&self) -> std::io::Result<()> {
607 let mut guard = match self.child.lock() {
608 Ok(g) => g,
609 Err(poisoned) => poisoned.into_inner(),
610 };
611 if let Some(c) = guard.as_mut() {
612 return c.kill();
613 }
614 Ok(())
615 }
616}
617
618impl ClaudeProcess {
619 pub fn from_child(child: Child, capture_path: &Path) -> Result<Self, Error> {
628 Self::from_child_with_mode(child, capture_path, CaptureMode::Truncate)
629 }
630
631 pub fn from_child_with_mode(
635 mut child: Child,
636 capture_path: &Path,
637 mode: CaptureMode,
638 ) -> Result<Self, Error> {
639 let stdout = child.stdout.take().ok_or_else(|| Error::Io {
640 source: std::io::Error::new(
641 std::io::ErrorKind::BrokenPipe,
642 "claude child spawned without piped stdout",
643 ),
644 })?;
645 let stderr_thread = child.stderr.take().map(|mut s| {
646 thread::spawn(move || {
647 let mut buf = Vec::new();
648 let _ = s.read_to_end(&mut buf);
649 buf
650 })
651 });
652 let parser = StreamParser::with_mode(BufReader::new(stdout), capture_path, mode)?;
653 Ok(Self {
654 parser,
655 child: Arc::new(Mutex::new(Some(child))),
656 stderr_thread,
657 })
658 }
659
660 pub fn killer(&self) -> ChildKiller {
662 ChildKiller {
663 child: Arc::clone(&self.child),
664 }
665 }
666
667 pub fn finish(mut self) -> Result<(ExitStatus, String), Error> {
675 for item in self.parser.by_ref() {
678 item?;
679 }
680 self.parser.flush_capture()?;
681
682 let child_opt = {
683 let mut guard = match self.child.lock() {
684 Ok(g) => g,
685 Err(poisoned) => poisoned.into_inner(),
686 };
687 guard.take()
688 };
689 let mut child = child_opt.ok_or_else(|| Error::Io {
690 source: std::io::Error::new(
691 std::io::ErrorKind::BrokenPipe,
692 "ClaudeProcess::finish called with no live child — \
693 finish already called or kill-drop left the Option None",
694 ),
695 })?;
696 let status = child.wait().map_err(|source| Error::Io { source })?;
697 let stderr_bytes = self
698 .stderr_thread
699 .take()
700 .map(|h| h.join().unwrap_or_default())
701 .unwrap_or_default();
702 let stderr = String::from_utf8_lossy(&stderr_bytes).into_owned();
703 Ok((status, stderr))
704 }
705}
706
707impl Iterator for ClaudeProcess {
708 type Item = Result<AssistantLine, Error>;
709
710 fn next(&mut self) -> Option<Self::Item> {
711 self.parser.next()
712 }
713}
714
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;
    use tempfile::TempDir;

    /// Snapshot of the argument vector a command would be launched with.
    fn collect_args(cmd: &Command) -> Vec<String> {
        let mut args = Vec::new();
        for arg in cmd.get_args() {
            args.push(arg.to_string_lossy().into_owned());
        }
        args
    }

    /// Position of `flag` within `args`, if it occurs.
    fn flag_index(args: &[String], flag: &str) -> Option<usize> {
        args.iter().position(|arg| arg == flag)
    }

    /// Borrowed view of the text of every yielded line.
    fn texts(lines: &[AssistantLine]) -> Vec<&str> {
        lines.iter().map(|line| line.text.as_str()).collect()
    }

    /// Fresh temporary directory plus a capture path inside it. The directory
    /// guard must stay alive for as long as the path is used.
    fn scratch_capture() -> (TempDir, PathBuf) {
        let dir = TempDir::new().unwrap();
        let file = dir.path().join("node.out");
        (dir, file)
    }

    #[test]
    fn build_command_sets_required_streaming_flags() {
        let args = collect_args(&build_command(&SpawnOpts::new("go", "/tmp")));
        let required = ["-p", "--output-format", "stream-json", "--verbose"];
        assert!(args
            .windows(required.len())
            .any(|window| window == required));
        assert_eq!(args.last().unwrap(), "go");
    }

    #[test]
    fn build_command_omits_unset_model_and_tools() {
        let args = collect_args(&build_command(&SpawnOpts::new("x", "/tmp")));
        for flag in ["--model", "--allowed-tools", "--session-id", "--resume"] {
            assert!(flag_index(&args, flag).is_none());
        }
    }

    #[test]
    fn build_command_forwards_env_vars() {
        let mut opts = SpawnOpts::new("x", "/tmp");
        opts.env_vars = vec![
            ("OMNE_RUN_ID".into(), "feature-abc".into()),
            ("OMNE_INPUT_FEATURE_NAME".into(), "hello".into()),
        ];
        let cmd = build_command(&opts);
        // Look a variable up among the explicitly set (not removed) entries.
        let lookup = |name: &str| -> Option<String> {
            cmd.get_envs()
                .find(|(key, _)| *key == OsStr::new(name))
                .and_then(|(_, value)| value)
                .map(|value| value.to_string_lossy().into_owned())
        };
        assert_eq!(lookup("OMNE_RUN_ID"), Some("feature-abc".to_string()));
        assert_eq!(
            lookup("OMNE_INPUT_FEATURE_NAME"),
            Some("hello".to_string())
        );
    }

    #[test]
    fn build_command_with_empty_env_vars_adds_none() {
        let cmd = build_command(&SpawnOpts::new("x", "/tmp"));
        let omne_count = cmd
            .get_envs()
            .filter(|(key, _)| key.to_string_lossy().starts_with("OMNE_"))
            .count();
        assert!(omne_count == 0, "unexpected OMNE_* env on bare SpawnOpts");
    }

    #[test]
    fn build_command_joins_allowed_tools_with_commas() {
        let mut opts = SpawnOpts::new("x", "/tmp");
        opts.allowed_tools = vec!["Read".into(), "Bash".into()];
        let args = collect_args(&build_command(&opts));
        let at = flag_index(&args, "--allowed-tools").unwrap();
        assert_eq!(args[at + 1], "Read,Bash");
    }

    #[test]
    fn build_command_uses_session_id_for_new_session() {
        let mut opts = SpawnOpts::new("x", "/tmp");
        opts.session = Some(Session::New("abc-123".into()));
        let args = collect_args(&build_command(&opts));
        let at = flag_index(&args, "--session-id").unwrap();
        assert_eq!(args[at + 1], "abc-123");
        assert!(flag_index(&args, "--resume").is_none());
    }

    #[test]
    fn build_command_uses_resume_for_continued_session() {
        let mut opts = SpawnOpts::new("x", "/tmp");
        opts.session = Some(Session::Resume("abc-123".into()));
        let args = collect_args(&build_command(&opts));
        let at = flag_index(&args, "--resume").unwrap();
        assert_eq!(args[at + 1], "abc-123");
        assert!(flag_index(&args, "--session-id").is_none());
    }

    #[test]
    fn build_command_puts_prompt_last_even_with_extra_args() {
        let mut opts = SpawnOpts::new("go do it", "/tmp");
        opts.extra_args = vec!["--dangerously-skip-permissions".into()];
        let args = collect_args(&build_command(&opts));
        assert_eq!(args.last().unwrap(), "go do it");
        let danger_idx = flag_index(&args, "--dangerously-skip-permissions").unwrap();
        let prompt_idx = flag_index(&args, "go do it").unwrap();
        assert!(danger_idx < prompt_idx, "extra args precede the prompt");
    }

    /// Assistant envelope carrying a single text block.
    fn asst(id: &str, text: &str) -> String {
        asst_multi(id, &[text])
    }

    /// Assistant envelope carrying one text block per entry of `texts`.
    fn asst_multi(id: &str, texts: &[&str]) -> String {
        let mut content = Vec::with_capacity(texts.len());
        for text in texts {
            content.push(serde_json::json!({ "type": "text", "text": text }));
        }
        let envelope = serde_json::json!({
            "type": "assistant",
            "message": { "id": id, "content": content }
        });
        envelope.to_string()
    }

    /// Feeds `lines` (newline-terminated) through a fresh parser and collects
    /// everything it yields, panicking on any error.
    fn run_parser(lines: &[String], capture: &Path) -> Vec<AssistantLine> {
        let mut body = lines.join("\n");
        body.push('\n');
        StreamParser::new(Cursor::new(body.into_bytes()), capture)
            .expect("open")
            .map(|item| item.expect("parse ok"))
            .collect()
    }

    #[test]
    fn parses_three_messages_totaling_five_lines() {
        let (_tmp, cap) = scratch_capture();
        let input = vec![
            asst("m1", "first\nsecond"),
            asst("m2", "third"),
            asst("m3", "fourth\nfifth"),
        ];
        let yielded = run_parser(&input, &cap);
        assert_eq!(
            texts(&yielded),
            ["first", "second", "third", "fourth", "fifth"]
        );
        assert_eq!(yielded[0].message_id, "m1");
        assert_eq!(yielded[4].message_id, "m3");
    }

    #[test]
    fn joins_multiple_text_blocks_within_one_message() {
        let (_tmp, cap) = scratch_capture();
        let envelope = asst_multi(
            "m1",
            &["first part ", "of line\nsecond ", "line split ", "too"],
        );
        let yielded = run_parser(&[envelope], &cap);
        assert_eq!(
            texts(&yielded),
            ["first part of line", "second line split too"]
        );
    }

    #[test]
    fn joins_partial_message_across_envelopes_sharing_id() {
        let (_tmp, cap) = scratch_capture();
        let input = vec![asst("msame", "hello "), asst("msame", "world\ngoodbye")];
        let yielded = run_parser(&input, &cap);
        assert_eq!(texts(&yielded), ["hello world", "goodbye"]);
    }

    #[test]
    fn skips_malformed_stream_json_line_and_continues() {
        let (_tmp, cap) = scratch_capture();
        let input = vec![
            asst("m1", "ok"),
            "{not valid json".to_string(),
            asst("m2", "still ok"),
        ];
        let yielded = run_parser(&input, &cap);
        assert_eq!(texts(&yielded), ["ok", "still ok"]);
    }

    #[test]
    fn skips_unknown_top_level_type() {
        let (_tmp, cap) = scratch_capture();
        let thinking = serde_json::json!({ "type": "thinking", "content": "pondering" });
        let input = vec![thinking.to_string(), asst("m1", "visible")];
        let yielded = run_parser(&input, &cap);
        assert_eq!(texts(&yielded), ["visible"]);
    }

    #[test]
    fn skips_system_hook_response_envelope() {
        let (_tmp, cap) = scratch_capture();
        let hook = serde_json::json!({
            "type": "system",
            "subtype": "hook_response",
            "hook_name": "SessionStart",
            "output": "CAVEMAN MODE ACTIVE"
        });
        let input = vec![hook.to_string(), asst("m1", "real content")];
        let yielded = run_parser(&input, &cap);
        assert_eq!(texts(&yielded), ["real content"]);
    }

    #[test]
    fn skips_non_text_blocks_inside_assistant_message() {
        let (_tmp, cap) = scratch_capture();
        let envelope = serde_json::json!({
            "type": "assistant",
            "message": {
                "id": "m1",
                "content": [
                    { "type": "thinking", "thinking": "...internal..." },
                    { "type": "text", "text": "visible line" },
                    { "type": "tool_use", "name": "Read", "input": {} }
                ]
            }
        });
        let yielded = run_parser(&[envelope.to_string()], &cap);
        assert_eq!(texts(&yielded), ["visible line"]);
    }

    #[test]
    fn trims_whitespace_around_reconstructed_lines() {
        let (_tmp, cap) = scratch_capture();
        let envelope = asst("m1", "   BLOCKED   \n  trailing\n");
        let yielded = run_parser(&[envelope], &cap);
        assert_eq!(texts(&yielded), ["BLOCKED", "trailing"]);
    }

    #[test]
    fn capture_file_mirrors_concatenated_assistant_text() {
        let tmp = TempDir::new().unwrap();
        // The nested directory does not exist yet; the parser must create it.
        let cap = tmp.path().join("deep").join("node.out");
        let input = vec![
            asst("m1", "alpha\n"),
            asst_multi("m2", &["beta ", "gamma"]),
        ];
        let _ = run_parser(&input, &cap);
        let written = std::fs::read_to_string(&cap).unwrap();
        assert_eq!(written, "alpha\nbeta gamma");
    }

    #[test]
    fn empty_stream_yields_no_lines_and_creates_empty_capture() {
        let (_tmp, cap) = scratch_capture();
        let parser = StreamParser::new(Cursor::new(Vec::<u8>::new()), &cap).unwrap();
        assert_eq!(parser.count(), 0);
        assert!(cap.exists());
        assert_eq!(std::fs::metadata(&cap).unwrap().len(), 0);
    }

    #[test]
    fn preflight_missing_binary_is_host_missing() {
        let bogus = Path::new("/definitely/not/a/binary/omne-xyz");
        let err = preflight(Some(bogus)).unwrap_err();
        assert!(
            matches!(err, Error::HostMissing),
            "expected HostMissing, got {err:?}"
        );
    }
}