zag_agent/streaming.rs
1//! Streaming session for programmatic stdin/stdout interaction with agents.
2//!
3//! A `StreamingSession` wraps a running agent subprocess with piped stdin and
4//! stdout, allowing callers to send NDJSON messages to the agent and read
5//! unified events back.
6//!
7//! # Examples
8//!
9//! ```no_run
10//! use zag_agent::builder::AgentBuilder;
11//!
12//! # async fn example() -> anyhow::Result<()> {
13//! let mut session = AgentBuilder::new()
14//! .provider("claude")
15//! .exec_streaming("initial prompt")
16//! .await?;
17//!
18//! // Send a user message
19//! session.send_user_message("do something").await?;
20//!
21//! // Read events
22//! while let Some(event) = session.next_event().await? {
23//! println!("{:?}", event);
24//! }
25//!
26//! session.wait().await?;
27//! # Ok(())
28//! # }
29//! ```
30
31use crate::output::Event;
32use anyhow::{Result, bail};
33use serde_json;
34use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, Lines};
35use tokio::process::{Child, ChildStdin, ChildStdout};
36
37/// A live streaming session connected to an agent subprocess.
38///
39/// stdin is piped for sending NDJSON messages, stdout is piped for reading
40/// NDJSON events. The session owns the child process.
41pub struct StreamingSession {
42 child: Child,
43 stdin: Option<ChildStdin>,
44 lines: Lines<BufReader<ChildStdout>>,
45}
46
47impl StreamingSession {
48 /// Create a new `StreamingSession` from a spawned child process.
49 ///
50 /// The child must have been spawned with piped stdin and stdout.
51 pub(crate) fn new(mut child: Child) -> Result<Self> {
52 let stdout = child
53 .stdout
54 .take()
55 .ok_or_else(|| anyhow::anyhow!("Child process stdout not piped"))?;
56 let stdin = child.stdin.take();
57 let reader = BufReader::new(stdout);
58 let lines = reader.lines();
59
60 Ok(Self {
61 child,
62 stdin,
63 lines,
64 })
65 }
66
67 /// Send a raw NDJSON line to the agent's stdin.
68 ///
69 /// The message should be a single JSON object (no trailing newline needed).
70 pub async fn send(&mut self, message: &str) -> Result<()> {
71 let stdin = self
72 .stdin
73 .as_mut()
74 .ok_or_else(|| anyhow::anyhow!("stdin already closed"))?;
75 stdin.write_all(message.as_bytes()).await?;
76 stdin.write_all(b"\n").await?;
77 stdin.flush().await?;
78 Ok(())
79 }
80
81 /// Send a user message to the agent.
82 ///
83 /// Formats the content as a `{"type":"user_message","content":"..."}` NDJSON line.
84 pub async fn send_user_message(&mut self, content: &str) -> Result<()> {
85 let msg = serde_json::json!({
86 "type": "user_message",
87 "content": content,
88 });
89 self.send(&serde_json::to_string(&msg)?).await
90 }
91
92 /// Read the next event from the agent's stdout.
93 ///
94 /// Returns `None` when stdout is closed (agent exited).
95 /// Skips lines that fail to parse as JSON events.
96 pub async fn next_event(&mut self) -> Result<Option<Event>> {
97 loop {
98 match self.lines.next_line().await? {
99 None => return Ok(None),
100 Some(line) => {
101 let trimmed = line.trim();
102 if trimmed.is_empty() {
103 continue;
104 }
105 match serde_json::from_str::<Event>(trimmed) {
106 Ok(event) => return Ok(Some(event)),
107 Err(e) => {
108 log::debug!(
109 "Skipping unparseable streaming event: {}. Line: {}",
110 e,
111 crate::truncate_str(trimmed, 200)
112 );
113 continue;
114 }
115 }
116 }
117 }
118 }
119 }
120
121 /// Close the stdin pipe, signaling no more input to the agent.
122 pub fn close_input(&mut self) {
123 self.stdin.take();
124 }
125
126 /// Wait for the agent process to exit.
127 ///
128 /// Consumes the session. Returns an error if the process exits with a
129 /// non-zero status.
130 pub async fn wait(mut self) -> Result<()> {
131 // Drop stdin to ensure the agent sees EOF
132 self.stdin.take();
133
134 let stderr_handle = self.child.stderr.take();
135 let status = self.child.wait().await?;
136
137 let stderr_text = if let Some(stderr) = stderr_handle {
138 let mut buf = Vec::new();
139 let mut reader = tokio::io::BufReader::new(stderr);
140 let _ = tokio::io::AsyncReadExt::read_to_end(&mut reader, &mut buf).await;
141 String::from_utf8_lossy(&buf).trim().to_string()
142 } else {
143 String::new()
144 };
145
146 crate::process::log_stderr_text(&stderr_text);
147
148 if !status.success() {
149 if stderr_text.is_empty() {
150 bail!("Agent process failed with status: {}", status);
151 } else {
152 bail!("{}", stderr_text);
153 }
154 }
155
156 Ok(())
157 }
158}
159
160#[cfg(test)]
161#[path = "streaming_tests.rs"]
162mod tests;