zag_agent/streaming.rs
1//! Streaming session for programmatic stdin/stdout interaction with agents.
2//!
3//! A `StreamingSession` wraps a running agent subprocess with piped stdin and
4//! stdout, allowing callers to send NDJSON messages to the agent and read
5//! unified events back.
6//!
7//! # Event lifecycle
8//!
9//! In bidirectional streaming mode (Claude only), [`StreamingSession::next_event`]
10//! yields unified [`Event`](crate::output::Event) values converted from Claude's
11//! native `stream-json` output. A [`Event::Result`](crate::output::Event::Result)
12//! is emitted at the **end of every agent turn** — not only at final session
13//! end. After a `Result`, the session remains open and accepts another
14//! [`StreamingSession::send_user_message`] for the next turn. `next_event`
15//! returns `Ok(None)` only when the subprocess exits (e.g. after
16//! [`StreamingSession::close_input`] and EOF).
17//!
18//! Consumers should use the `Result` event as the authoritative turn-boundary
19//! signal. Do **not** rely on replayed `user_message` events for this purpose;
20//! those only appear when `--replay-user-messages` is set.
21//!
22//! # Mid-turn input semantics
23//!
24//! `send_user_message` writes a user message to the agent's stdin. What the
25//! agent does when the message arrives *while it is still producing a response
26//! on the current turn* is provider-specific. Callers that need to reason about
27//! mid-turn behavior should branch on
28//! `ProviderCapability::features.streaming_input.semantics`, which is one of:
29//!
30//! - `"queue"` — the message is buffered and delivered at the next turn
31//! boundary. The current turn runs to completion; the new message becomes
32//! the next user turn. **Currently Claude.**
33//! - `"interrupt"` — the message cancels the current turn and starts a new one
34//! with the new input.
35//! - `"between-turns-only"` — mid-turn sends are an error or no-op; callers
36//! must wait for the current turn to finish before sending.
37//!
38//! Providers with `streaming_input.supported == false` (codex, gemini, copilot,
39//! ollama) do not expose a `StreamingSession` at all — `exec_streaming` is
40//! unavailable for them.
41//!
42//! # Examples
43//!
44//! ```no_run
45//! use zag_agent::builder::AgentBuilder;
46//! use zag_agent::output::Event;
47//!
48//! # async fn example() -> anyhow::Result<()> {
49//! let mut session = AgentBuilder::new()
50//! .provider("claude")
51//! .exec_streaming("initial prompt")
52//! .await?;
53//!
54//! // First turn: drain events until the per-turn Result.
55//! while let Some(event) = session.next_event().await? {
56//! println!("{:?}", event);
57//! if matches!(event, Event::Result { .. }) {
58//! break; // turn complete
59//! }
60//! }
61//!
62//! // Send a follow-up user message for the next turn.
63//! session.send_user_message("do something else").await?;
64//!
65//! // Drain the second turn, then close the session.
66//! while let Some(event) = session.next_event().await? {
67//! if matches!(event, Event::Result { .. }) {
68//! break;
69//! }
70//! }
71//!
72//! session.close_input();
73//! session.wait().await?;
74//! # Ok(())
75//! # }
76//! ```
77
78use crate::output::Event;
79use anyhow::{Result, bail};
80use serde_json;
81use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, Lines};
82use tokio::process::{Child, ChildStdin, ChildStdout};
83
/// A live streaming session connected to an agent subprocess.
///
/// stdin is piped for sending NDJSON messages, stdout is piped for reading
/// NDJSON events. The session owns the child process.
pub struct StreamingSession {
    /// Handle to the agent subprocess; awaited (and its stderr collected) in `wait`.
    child: Child,
    /// Write end of the agent's stdin. `None` once input has been closed via
    /// `close_input`/`wait`, or if the child was spawned without a piped stdin.
    stdin: Option<ChildStdin>,
    /// Buffered line reader over the agent's stdout; each line is expected to
    /// be one NDJSON event.
    lines: Lines<BufReader<ChildStdout>>,
}
93
94impl StreamingSession {
95 /// Create a new `StreamingSession` from a spawned child process.
96 ///
97 /// The child must have been spawned with piped stdin and stdout.
98 pub(crate) fn new(mut child: Child) -> Result<Self> {
99 let stdout = child
100 .stdout
101 .take()
102 .ok_or_else(|| anyhow::anyhow!("Child process stdout not piped"))?;
103 let stdin = child.stdin.take();
104 let reader = BufReader::new(stdout);
105 let lines = reader.lines();
106
107 Ok(Self {
108 child,
109 stdin,
110 lines,
111 })
112 }
113
114 /// Send a raw NDJSON line to the agent's stdin.
115 ///
116 /// The message should be a single JSON object (no trailing newline needed).
117 pub async fn send(&mut self, message: &str) -> Result<()> {
118 let stdin = self
119 .stdin
120 .as_mut()
121 .ok_or_else(|| anyhow::anyhow!("stdin already closed"))?;
122 stdin.write_all(message.as_bytes()).await?;
123 stdin.write_all(b"\n").await?;
124 stdin.flush().await?;
125 Ok(())
126 }
127
128 /// Send a user message to the agent.
129 ///
130 /// Formats the content as a `{"type":"user_message","content":"..."}` NDJSON line.
131 ///
132 /// # Mid-turn semantics
133 ///
134 /// The effect of calling this while the agent is still producing a
135 /// response on the current turn is provider-specific. Check
136 /// `ProviderCapability::features.streaming_input.semantics` at runtime
137 /// to branch on behavior. The possible values are:
138 ///
139 /// - `"queue"` — buffered and delivered at the next turn boundary; the
140 /// current turn runs to completion. **This is Claude's behavior**, which
141 /// is the only provider currently exposing a `StreamingSession`.
142 /// - `"interrupt"` — cancels the current turn and starts a new one with
143 /// the new input.
144 /// - `"between-turns-only"` — mid-turn sends are an error or no-op; wait
145 /// for the current turn to finish before sending.
146 ///
147 /// See the module-level documentation for the full matrix.
148 pub async fn send_user_message(&mut self, content: &str) -> Result<()> {
149 let msg = serde_json::json!({
150 "type": "user_message",
151 "content": content,
152 });
153 self.send(&serde_json::to_string(&msg)?).await
154 }
155
156 /// Read the next unified event from the agent's stdout.
157 ///
158 /// Lines are parsed as Claude's native `stream-json` schema and then
159 /// converted into the unified [`Event`] enum. Events that don't map to a
160 /// user-visible unified event (e.g. `thinking` blocks) are skipped
161 /// transparently, as are blank and unparseable lines.
162 ///
163 /// A unified `Result` event is returned at the end of each agent turn;
164 /// callers can use it as a turn boundary. `Ok(None)` is returned only
165 /// when the subprocess closes its stdout (EOF).
166 pub async fn next_event(&mut self) -> Result<Option<Event>> {
167 use crate::providers::claude::{convert_claude_event_to_unified, models::ClaudeEvent};
168
169 loop {
170 match self.lines.next_line().await? {
171 None => return Ok(None),
172 Some(line) => {
173 let trimmed = line.trim();
174 if trimmed.is_empty() {
175 continue;
176 }
177 match serde_json::from_str::<ClaudeEvent>(trimmed) {
178 Ok(claude_event) => {
179 if let Some(event) = convert_claude_event_to_unified(&claude_event) {
180 return Ok(Some(event));
181 }
182 // Converter filtered this event (e.g. thinking block
183 // or ClaudeEvent::Other); read the next line.
184 continue;
185 }
186 Err(e) => {
187 log::debug!(
188 "Skipping unparseable streaming event: {}. Line: {}",
189 e,
190 crate::truncate_str(trimmed, 200)
191 );
192 continue;
193 }
194 }
195 }
196 }
197 }
198 }
199
200 /// Close the stdin pipe, signaling no more input to the agent.
201 pub fn close_input(&mut self) {
202 self.stdin.take();
203 }
204
205 /// Wait for the agent process to exit.
206 ///
207 /// Consumes the session. Returns an error if the process exits with a
208 /// non-zero status.
209 pub async fn wait(mut self) -> Result<()> {
210 // Drop stdin to ensure the agent sees EOF
211 self.stdin.take();
212
213 let stderr_handle = self.child.stderr.take();
214 let status = self.child.wait().await?;
215
216 let stderr_text = if let Some(stderr) = stderr_handle {
217 let mut buf = Vec::new();
218 let mut reader = tokio::io::BufReader::new(stderr);
219 let _ = tokio::io::AsyncReadExt::read_to_end(&mut reader, &mut buf).await;
220 String::from_utf8_lossy(&buf).trim().to_string()
221 } else {
222 String::new()
223 };
224
225 crate::process::log_stderr_text(&stderr_text);
226
227 if !status.success() {
228 if stderr_text.is_empty() {
229 bail!("Agent process failed with status: {}", status);
230 } else {
231 bail!("{}", stderr_text);
232 }
233 }
234
235 Ok(())
236 }
237}
238
239#[cfg(test)]
240#[path = "streaming_tests.rs"]
241mod tests;