Skip to main content

zag_agent/
streaming.rs

1//! Streaming session for programmatic stdin/stdout interaction with agents.
2//!
3//! A `StreamingSession` wraps a running agent subprocess with piped stdin and
4//! stdout, allowing callers to send NDJSON messages to the agent and read
5//! unified events back.
6//!
7//! # Event lifecycle
8//!
9//! In bidirectional streaming mode (Claude only), [`StreamingSession::next_event`]
10//! yields unified [`Event`](crate::output::Event) values converted from Claude's
11//! native `stream-json` output. At the end of every agent turn the session
12//! emits a [`Event::TurnComplete`](crate::output::Event::TurnComplete) with
13//! the provider's `stop_reason`, a zero-based `turn_index`, and the turn's
14//! `usage`, followed immediately by a per-turn
15//! [`Event::Result`](crate::output::Event::Result). After that pair the
16//! session remains open and accepts another
17//! [`StreamingSession::send_user_message`] for the next turn. `next_event`
//! returns `Ok(None)` only once the subprocess closes its stdout (EOF), which
//! normally happens when the process exits — e.g. after
//! [`StreamingSession::close_input`].
21//! New consumers should use `TurnComplete` as the authoritative
22//! turn-boundary signal. `Result` continues to fire per-turn for backward
23//! compatibility, but `TurnComplete` is what carries `stop_reason` and
24//! `turn_index`. Do **not** rely on replayed `user_message` events as a
25//! turn delimiter; those only appear when `--replay-user-messages` is set
26//! and only fire *after* the next user message is sent.
27//!
28//! # Mid-turn input semantics
29//!
30//! `send_user_message` writes a user message to the agent's stdin. What the
31//! agent does when the message arrives *while it is still producing a response
32//! on the current turn* is provider-specific. Callers that need to reason about
33//! mid-turn behavior should branch on
34//! `ProviderCapability::features.streaming_input.semantics`, which is one of:
35//!
36//! - `"queue"` — the message is buffered and delivered at the next turn
37//!   boundary. The current turn runs to completion; the new message becomes
38//!   the next user turn. **Currently Claude.**
39//! - `"interrupt"` — the message cancels the current turn and starts a new one
40//!   with the new input.
41//! - `"between-turns-only"` — mid-turn sends are an error or no-op; callers
42//!   must wait for the current turn to finish before sending.
43//!
44//! Providers with `streaming_input.supported == false` (codex, gemini, copilot,
45//! ollama) do not expose a `StreamingSession` at all — `exec_streaming` is
46//! unavailable for them.
47//!
48//! # Examples
49//!
50//! ```no_run
51//! use zag_agent::builder::AgentBuilder;
52//! use zag_agent::output::Event;
53//!
54//! # async fn example() -> anyhow::Result<()> {
55//! let mut session = AgentBuilder::new()
56//!     .provider("claude")
57//!     .exec_streaming("initial prompt")
58//!     .await?;
59//!
60//! // First turn: drain events until TurnComplete.
61//! while let Some(event) = session.next_event().await? {
62//!     println!("{:?}", event);
63//!     if matches!(event, Event::TurnComplete { .. }) {
64//!         break; // turn complete — the per-turn Result follows next
65//!     }
66//! }
67//!
68//! // Send a follow-up user message for the next turn.
69//! session.send_user_message("do something else").await?;
70//!
71//! // Drain the second turn, then close the session.
72//! while let Some(event) = session.next_event().await? {
73//!     if matches!(event, Event::TurnComplete { .. }) {
74//!         break;
75//!     }
76//! }
77//!
78//! session.close_input();
79//! session.wait().await?;
80//! # Ok(())
81//! # }
82//! ```
83
84use crate::output::Event;
85use anyhow::{Result, bail};
86use serde_json;
87use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, Lines};
88use tokio::process::{Child, ChildStdin, ChildStdout};
89
90/// A live streaming session connected to an agent subprocess.
91///
92/// stdin is piped for sending NDJSON messages, stdout is piped for reading
93/// NDJSON events. The session owns the child process.
94pub struct StreamingSession {
95    child: Child,
96    stdin: Option<ChildStdin>,
97    lines: Lines<BufReader<ChildStdout>>,
98    /// Stateful per-event translator. Required because some unified
99    /// events (e.g. `TurnComplete`) are synthesized from data carried on
100    /// earlier events in the same turn.
101    translator: crate::providers::claude::ClaudeEventTranslator,
102    /// FIFO buffer of unified events produced by the translator but not
103    /// yet returned to the caller. A single Claude `Result` expands into
104    /// `[TurnComplete, Result]`, so we need room for at least two.
105    pending: std::collections::VecDeque<Event>,
106}
107
108impl StreamingSession {
109    /// Create a new `StreamingSession` from a spawned child process.
110    ///
111    /// The child must have been spawned with piped stdin and stdout.
112    pub(crate) fn new(mut child: Child) -> Result<Self> {
113        let stdout = child
114            .stdout
115            .take()
116            .ok_or_else(|| anyhow::anyhow!("Child process stdout not piped"))?;
117        let stdin = child.stdin.take();
118        let reader = BufReader::new(stdout);
119        let lines = reader.lines();
120
121        Ok(Self {
122            child,
123            stdin,
124            lines,
125            translator: crate::providers::claude::ClaudeEventTranslator::new(),
126            pending: std::collections::VecDeque::new(),
127        })
128    }
129
130    /// Send a raw NDJSON line to the agent's stdin.
131    ///
132    /// The message should be a single JSON object (no trailing newline needed).
133    pub async fn send(&mut self, message: &str) -> Result<()> {
134        let stdin = self
135            .stdin
136            .as_mut()
137            .ok_or_else(|| anyhow::anyhow!("stdin already closed"))?;
138        stdin.write_all(message.as_bytes()).await?;
139        stdin.write_all(b"\n").await?;
140        stdin.flush().await?;
141        Ok(())
142    }
143
144    /// Send a user message to the agent.
145    ///
146    /// Formats the content as a `{"type":"user_message","content":"..."}` NDJSON line.
147    ///
148    /// # Mid-turn semantics
149    ///
150    /// The effect of calling this while the agent is still producing a
151    /// response on the current turn is provider-specific. Check
152    /// `ProviderCapability::features.streaming_input.semantics` at runtime
153    /// to branch on behavior. The possible values are:
154    ///
155    /// - `"queue"` — buffered and delivered at the next turn boundary; the
156    ///   current turn runs to completion. **This is Claude's behavior**, which
157    ///   is the only provider currently exposing a `StreamingSession`.
158    /// - `"interrupt"` — cancels the current turn and starts a new one with
159    ///   the new input.
160    /// - `"between-turns-only"` — mid-turn sends are an error or no-op; wait
161    ///   for the current turn to finish before sending.
162    ///
163    /// See the module-level documentation for the full matrix.
164    pub async fn send_user_message(&mut self, content: &str) -> Result<()> {
165        let msg = serde_json::json!({
166            "type": "user_message",
167            "content": content,
168        });
169        self.send(&serde_json::to_string(&msg)?).await
170    }
171
172    /// Read the next unified event from the agent's stdout.
173    ///
174    /// Lines are parsed as Claude's native `stream-json` schema and then
175    /// converted into the unified [`Event`] enum. Events that don't map to a
176    /// user-visible unified event (e.g. `thinking` blocks) are skipped
177    /// transparently, as are blank and unparseable lines.
178    ///
179    /// At the end of each agent turn the session emits a
180    /// [`Event::TurnComplete`](crate::output::Event::TurnComplete) followed
181    /// immediately by a per-turn [`Event::Result`](crate::output::Event::Result);
182    /// prefer `TurnComplete` as the turn boundary in new code. `Ok(None)`
183    /// is returned only when the subprocess closes its stdout (EOF).
184    pub async fn next_event(&mut self) -> Result<Option<Event>> {
185        use crate::providers::claude::models::ClaudeEvent;
186
187        loop {
188            if let Some(event) = self.pending.pop_front() {
189                return Ok(Some(event));
190            }
191
192            match self.lines.next_line().await? {
193                None => return Ok(None),
194                Some(line) => {
195                    let trimmed = line.trim();
196                    if trimmed.is_empty() {
197                        continue;
198                    }
199                    match serde_json::from_str::<ClaudeEvent>(trimmed) {
200                        Ok(claude_event) => {
201                            for event in self.translator.translate(&claude_event) {
202                                self.pending.push_back(event);
203                            }
204                            // Next loop iteration will drain `pending` (or
205                            // read another line if the translator emitted
206                            // nothing for this event).
207                            continue;
208                        }
209                        Err(e) => {
210                            log::debug!(
211                                "Skipping unparseable streaming event: {}. Line: {}",
212                                e,
213                                crate::truncate_str(trimmed, 200)
214                            );
215                            continue;
216                        }
217                    }
218                }
219            }
220        }
221    }
222
223    /// Close the stdin pipe, signaling no more input to the agent.
224    pub fn close_input(&mut self) {
225        self.stdin.take();
226    }
227
228    /// Wait for the agent process to exit.
229    ///
230    /// Consumes the session. Returns an error if the process exits with a
231    /// non-zero status.
232    pub async fn wait(mut self) -> Result<()> {
233        // Drop stdin to ensure the agent sees EOF
234        self.stdin.take();
235
236        let stderr_handle = self.child.stderr.take();
237        let status = self.child.wait().await?;
238
239        let stderr_text = if let Some(stderr) = stderr_handle {
240            let mut buf = Vec::new();
241            let mut reader = tokio::io::BufReader::new(stderr);
242            let _ = tokio::io::AsyncReadExt::read_to_end(&mut reader, &mut buf).await;
243            String::from_utf8_lossy(&buf).trim().to_string()
244        } else {
245            String::new()
246        };
247
248        crate::process::log_stderr_text(&stderr_text);
249
250        if !status.success() {
251            if stderr_text.is_empty() {
252                bail!("Agent process failed with status: {}", status);
253            } else {
254                bail!("{}", stderr_text);
255            }
256        }
257
258        Ok(())
259    }
260}
261
262#[cfg(test)]
263#[path = "streaming_tests.rs"]
264mod tests;