zag_agent/streaming.rs
1//! Streaming session for programmatic stdin/stdout interaction with agents.
2//!
3//! A `StreamingSession` wraps a running agent subprocess with piped stdin and
4//! stdout, allowing callers to send NDJSON messages to the agent and read
5//! unified events back.
6//!
7//! # Event lifecycle
8//!
9//! In bidirectional streaming mode (Claude only), [`StreamingSession::next_event`]
10//! yields unified [`Event`](crate::output::Event) values converted from Claude's
11//! native `stream-json` output. At the end of every agent turn the session
12//! emits a [`Event::TurnComplete`](crate::output::Event::TurnComplete) with
13//! the provider's `stop_reason`, a zero-based `turn_index`, and the turn's
14//! `usage`, followed immediately by a per-turn
15//! [`Event::Result`](crate::output::Event::Result). After that pair the
16//! session remains open and accepts another
17//! [`StreamingSession::send_user_message`] for the next turn. `next_event`
18//! returns `Ok(None)` only when the subprocess exits (e.g. after
19//! [`StreamingSession::close_input`] and EOF).
20//!
21//! New consumers should use `TurnComplete` as the authoritative
22//! turn-boundary signal. `Result` continues to fire per-turn for backward
23//! compatibility, but `TurnComplete` is what carries `stop_reason` and
24//! `turn_index`. Do **not** rely on replayed `user_message` events as a
25//! turn delimiter; those only appear when `--replay-user-messages` is set
26//! and only fire *after* the next user message is sent.
27//!
28//! # Mid-turn input semantics
29//!
30//! `send_user_message` writes a user message to the agent's stdin. What the
31//! agent does when the message arrives *while it is still producing a response
32//! on the current turn* is provider-specific. Callers that need to reason about
33//! mid-turn behavior should branch on
34//! `ProviderCapability::features.streaming_input.semantics`, which is one of:
35//!
36//! - `"queue"` — the message is buffered and delivered at the next turn
37//! boundary. The current turn runs to completion; the new message becomes
38//! the next user turn. **Currently Claude.**
39//! - `"interrupt"` — the message cancels the current turn and starts a new one
40//! with the new input.
41//! - `"between-turns-only"` — mid-turn sends are an error or no-op; callers
42//! must wait for the current turn to finish before sending.
43//!
44//! Providers with `streaming_input.supported == false` (codex, gemini, copilot,
45//! ollama) do not expose a `StreamingSession` at all — `exec_streaming` is
46//! unavailable for them.
47//!
48//! # Examples
49//!
50//! ```no_run
51//! use zag_agent::builder::AgentBuilder;
52//! use zag_agent::output::Event;
53//!
54//! # async fn example() -> anyhow::Result<()> {
55//! let mut session = AgentBuilder::new()
56//! .provider("claude")
57//! .exec_streaming("initial prompt")
58//! .await?;
59//!
60//! // First turn: drain events until TurnComplete.
61//! while let Some(event) = session.next_event().await? {
62//! println!("{:?}", event);
63//! if matches!(event, Event::TurnComplete { .. }) {
64//! break; // turn complete — the per-turn Result follows next
65//! }
66//! }
67//!
68//! // Send a follow-up user message for the next turn.
69//! session.send_user_message("do something else").await?;
70//!
71//! // Drain the second turn, then close the session.
72//! while let Some(event) = session.next_event().await? {
73//! if matches!(event, Event::TurnComplete { .. }) {
74//! break;
75//! }
76//! }
77//!
78//! session.close_input();
79//! session.wait().await?;
80//! # Ok(())
81//! # }
82//! ```
83
84use crate::output::Event;
85use anyhow::{Result, bail};
86use serde_json;
87use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, Lines};
88use tokio::process::{Child, ChildStdin, ChildStdout};
89
90/// A live streaming session connected to an agent subprocess.
91///
92/// stdin is piped for sending NDJSON messages, stdout is piped for reading
93/// NDJSON events. The session owns the child process.
94pub struct StreamingSession {
95 child: Child,
96 stdin: Option<ChildStdin>,
97 lines: Lines<BufReader<ChildStdout>>,
98 /// Stateful per-event translator. Required because some unified
99 /// events (e.g. `TurnComplete`) are synthesized from data carried on
100 /// earlier events in the same turn.
101 translator: crate::providers::claude::ClaudeEventTranslator,
102 /// FIFO buffer of unified events produced by the translator but not
103 /// yet returned to the caller. A single Claude `Result` expands into
104 /// `[TurnComplete, Result]`, so we need room for at least two.
105 pending: std::collections::VecDeque<Event>,
106}
107
108impl StreamingSession {
109 /// Create a new `StreamingSession` from a spawned child process.
110 ///
111 /// The child must have been spawned with piped stdin and stdout.
112 pub(crate) fn new(mut child: Child) -> Result<Self> {
113 let stdout = child
114 .stdout
115 .take()
116 .ok_or_else(|| anyhow::anyhow!("Child process stdout not piped"))?;
117 let stdin = child.stdin.take();
118 let reader = BufReader::new(stdout);
119 let lines = reader.lines();
120
121 Ok(Self {
122 child,
123 stdin,
124 lines,
125 translator: crate::providers::claude::ClaudeEventTranslator::new(),
126 pending: std::collections::VecDeque::new(),
127 })
128 }
129
130 /// Send a raw NDJSON line to the agent's stdin.
131 ///
132 /// The message should be a single JSON object (no trailing newline needed).
133 pub async fn send(&mut self, message: &str) -> Result<()> {
134 let stdin = self
135 .stdin
136 .as_mut()
137 .ok_or_else(|| anyhow::anyhow!("stdin already closed"))?;
138 stdin.write_all(message.as_bytes()).await?;
139 stdin.write_all(b"\n").await?;
140 stdin.flush().await?;
141 Ok(())
142 }
143
144 /// Send a user message to the agent.
145 ///
146 /// Formats the content as a `{"type":"user_message","content":"..."}` NDJSON line.
147 ///
148 /// # Mid-turn semantics
149 ///
150 /// The effect of calling this while the agent is still producing a
151 /// response on the current turn is provider-specific. Check
152 /// `ProviderCapability::features.streaming_input.semantics` at runtime
153 /// to branch on behavior. The possible values are:
154 ///
155 /// - `"queue"` — buffered and delivered at the next turn boundary; the
156 /// current turn runs to completion. **This is Claude's behavior**, which
157 /// is the only provider currently exposing a `StreamingSession`.
158 /// - `"interrupt"` — cancels the current turn and starts a new one with
159 /// the new input.
160 /// - `"between-turns-only"` — mid-turn sends are an error or no-op; wait
161 /// for the current turn to finish before sending.
162 ///
163 /// See the module-level documentation for the full matrix.
164 pub async fn send_user_message(&mut self, content: &str) -> Result<()> {
165 let msg = serde_json::json!({
166 "type": "user_message",
167 "content": content,
168 });
169 self.send(&serde_json::to_string(&msg)?).await
170 }
171
172 /// Read the next unified event from the agent's stdout.
173 ///
174 /// Lines are parsed as Claude's native `stream-json` schema and then
175 /// converted into the unified [`Event`] enum. Events that don't map to a
176 /// user-visible unified event (e.g. `thinking` blocks) are skipped
177 /// transparently, as are blank and unparseable lines.
178 ///
179 /// At the end of each agent turn the session emits a
180 /// [`Event::TurnComplete`](crate::output::Event::TurnComplete) followed
181 /// immediately by a per-turn [`Event::Result`](crate::output::Event::Result);
182 /// prefer `TurnComplete` as the turn boundary in new code. `Ok(None)`
183 /// is returned only when the subprocess closes its stdout (EOF).
184 pub async fn next_event(&mut self) -> Result<Option<Event>> {
185 use crate::providers::claude::models::ClaudeEvent;
186
187 loop {
188 if let Some(event) = self.pending.pop_front() {
189 return Ok(Some(event));
190 }
191
192 match self.lines.next_line().await? {
193 None => return Ok(None),
194 Some(line) => {
195 let trimmed = line.trim();
196 if trimmed.is_empty() {
197 continue;
198 }
199 match serde_json::from_str::<ClaudeEvent>(trimmed) {
200 Ok(claude_event) => {
201 for event in self.translator.translate(&claude_event) {
202 self.pending.push_back(event);
203 }
204 // Next loop iteration will drain `pending` (or
205 // read another line if the translator emitted
206 // nothing for this event).
207 continue;
208 }
209 Err(e) => {
210 log::debug!(
211 "Skipping unparseable streaming event: {}. Line: {}",
212 e,
213 crate::truncate_str(trimmed, 200)
214 );
215 continue;
216 }
217 }
218 }
219 }
220 }
221 }
222
223 /// Close the stdin pipe, signaling no more input to the agent.
224 pub fn close_input(&mut self) {
225 self.stdin.take();
226 }
227
228 /// Wait for the agent process to exit.
229 ///
230 /// Consumes the session. Returns an error if the process exits with a
231 /// non-zero status.
232 pub async fn wait(mut self) -> Result<()> {
233 // Drop stdin to ensure the agent sees EOF
234 self.stdin.take();
235
236 let stderr_handle = self.child.stderr.take();
237 let status = self.child.wait().await?;
238
239 let stderr_text = if let Some(stderr) = stderr_handle {
240 let mut buf = Vec::new();
241 let mut reader = tokio::io::BufReader::new(stderr);
242 let _ = tokio::io::AsyncReadExt::read_to_end(&mut reader, &mut buf).await;
243 String::from_utf8_lossy(&buf).trim().to_string()
244 } else {
245 String::new()
246 };
247
248 crate::process::log_stderr_text(&stderr_text);
249
250 if !status.success() {
251 if stderr_text.is_empty() {
252 bail!("Agent process failed with status: {}", status);
253 } else {
254 bail!("{}", stderr_text);
255 }
256 }
257
258 Ok(())
259 }
260}
261
262#[cfg(test)]
263#[path = "streaming_tests.rs"]
264mod tests;