Skip to main content

zag_agent/
streaming.rs

1//! Streaming session for programmatic stdin/stdout interaction with agents.
2//!
3//! A `StreamingSession` wraps a running agent subprocess with piped stdin and
4//! stdout, allowing callers to send NDJSON messages to the agent and read
5//! unified events back.
6//!
7//! # Event lifecycle
8//!
9//! In bidirectional streaming mode (Claude only), [`StreamingSession::next_event`]
10//! yields unified [`Event`](crate::output::Event) values converted from Claude's
11//! native `stream-json` output. A [`Event::Result`](crate::output::Event::Result)
12//! is emitted at the **end of every agent turn** — not only at final session
13//! end. After a `Result`, the session remains open and accepts another
14//! [`StreamingSession::send_user_message`] for the next turn. `next_event`
15//! returns `Ok(None)` only when the subprocess exits (e.g. after
16//! [`StreamingSession::close_input`] and EOF).
17//!
18//! Consumers should use the `Result` event as the authoritative turn-boundary
19//! signal. Do **not** rely on replayed `user_message` events for this purpose;
20//! those only appear when `--replay-user-messages` is set.
21//!
22//! # Mid-turn input semantics
23//!
24//! `send_user_message` writes a user message to the agent's stdin. What the
25//! agent does when the message arrives *while it is still producing a response
26//! on the current turn* is provider-specific. Callers that need to reason about
27//! mid-turn behavior should branch on
28//! `ProviderCapability::features.streaming_input.semantics`, which is one of:
29//!
30//! - `"queue"` — the message is buffered and delivered at the next turn
31//!   boundary. The current turn runs to completion; the new message becomes
32//!   the next user turn. **Currently Claude.**
33//! - `"interrupt"` — the message cancels the current turn and starts a new one
34//!   with the new input.
35//! - `"between-turns-only"` — mid-turn sends are an error or no-op; callers
36//!   must wait for the current turn to finish before sending.
37//!
38//! Providers with `streaming_input.supported == false` (codex, gemini, copilot,
39//! ollama) do not expose a `StreamingSession` at all — `exec_streaming` is
40//! unavailable for them.
41//!
42//! # Examples
43//!
44//! ```no_run
45//! use zag_agent::builder::AgentBuilder;
46//! use zag_agent::output::Event;
47//!
48//! # async fn example() -> anyhow::Result<()> {
49//! let mut session = AgentBuilder::new()
50//!     .provider("claude")
51//!     .exec_streaming("initial prompt")
52//!     .await?;
53//!
54//! // First turn: drain events until the per-turn Result.
55//! while let Some(event) = session.next_event().await? {
56//!     println!("{:?}", event);
57//!     if matches!(event, Event::Result { .. }) {
58//!         break; // turn complete
59//!     }
60//! }
61//!
62//! // Send a follow-up user message for the next turn.
63//! session.send_user_message("do something else").await?;
64//!
65//! // Drain the second turn, then close the session.
66//! while let Some(event) = session.next_event().await? {
67//!     if matches!(event, Event::Result { .. }) {
68//!         break;
69//!     }
70//! }
71//!
72//! session.close_input();
73//! session.wait().await?;
74//! # Ok(())
75//! # }
76//! ```
77
78use crate::output::Event;
79use anyhow::{Result, bail};
80use serde_json;
81use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, Lines};
82use tokio::process::{Child, ChildStdin, ChildStdout};
83
84/// A live streaming session connected to an agent subprocess.
85///
86/// stdin is piped for sending NDJSON messages, stdout is piped for reading
87/// NDJSON events. The session owns the child process.
88pub struct StreamingSession {
89    child: Child,
90    stdin: Option<ChildStdin>,
91    lines: Lines<BufReader<ChildStdout>>,
92}
93
94impl StreamingSession {
95    /// Create a new `StreamingSession` from a spawned child process.
96    ///
97    /// The child must have been spawned with piped stdin and stdout.
98    pub(crate) fn new(mut child: Child) -> Result<Self> {
99        let stdout = child
100            .stdout
101            .take()
102            .ok_or_else(|| anyhow::anyhow!("Child process stdout not piped"))?;
103        let stdin = child.stdin.take();
104        let reader = BufReader::new(stdout);
105        let lines = reader.lines();
106
107        Ok(Self {
108            child,
109            stdin,
110            lines,
111        })
112    }
113
114    /// Send a raw NDJSON line to the agent's stdin.
115    ///
116    /// The message should be a single JSON object (no trailing newline needed).
117    pub async fn send(&mut self, message: &str) -> Result<()> {
118        let stdin = self
119            .stdin
120            .as_mut()
121            .ok_or_else(|| anyhow::anyhow!("stdin already closed"))?;
122        stdin.write_all(message.as_bytes()).await?;
123        stdin.write_all(b"\n").await?;
124        stdin.flush().await?;
125        Ok(())
126    }
127
128    /// Send a user message to the agent.
129    ///
130    /// Formats the content as a `{"type":"user_message","content":"..."}` NDJSON line.
131    ///
132    /// # Mid-turn semantics
133    ///
134    /// The effect of calling this while the agent is still producing a
135    /// response on the current turn is provider-specific. Check
136    /// `ProviderCapability::features.streaming_input.semantics` at runtime
137    /// to branch on behavior. The possible values are:
138    ///
139    /// - `"queue"` — buffered and delivered at the next turn boundary; the
140    ///   current turn runs to completion. **This is Claude's behavior**, which
141    ///   is the only provider currently exposing a `StreamingSession`.
142    /// - `"interrupt"` — cancels the current turn and starts a new one with
143    ///   the new input.
144    /// - `"between-turns-only"` — mid-turn sends are an error or no-op; wait
145    ///   for the current turn to finish before sending.
146    ///
147    /// See the module-level documentation for the full matrix.
148    pub async fn send_user_message(&mut self, content: &str) -> Result<()> {
149        let msg = serde_json::json!({
150            "type": "user_message",
151            "content": content,
152        });
153        self.send(&serde_json::to_string(&msg)?).await
154    }
155
156    /// Read the next unified event from the agent's stdout.
157    ///
158    /// Lines are parsed as Claude's native `stream-json` schema and then
159    /// converted into the unified [`Event`] enum. Events that don't map to a
160    /// user-visible unified event (e.g. `thinking` blocks) are skipped
161    /// transparently, as are blank and unparseable lines.
162    ///
163    /// A unified `Result` event is returned at the end of each agent turn;
164    /// callers can use it as a turn boundary. `Ok(None)` is returned only
165    /// when the subprocess closes its stdout (EOF).
166    pub async fn next_event(&mut self) -> Result<Option<Event>> {
167        use crate::providers::claude::{convert_claude_event_to_unified, models::ClaudeEvent};
168
169        loop {
170            match self.lines.next_line().await? {
171                None => return Ok(None),
172                Some(line) => {
173                    let trimmed = line.trim();
174                    if trimmed.is_empty() {
175                        continue;
176                    }
177                    match serde_json::from_str::<ClaudeEvent>(trimmed) {
178                        Ok(claude_event) => {
179                            if let Some(event) = convert_claude_event_to_unified(&claude_event) {
180                                return Ok(Some(event));
181                            }
182                            // Converter filtered this event (e.g. thinking block
183                            // or ClaudeEvent::Other); read the next line.
184                            continue;
185                        }
186                        Err(e) => {
187                            log::debug!(
188                                "Skipping unparseable streaming event: {}. Line: {}",
189                                e,
190                                crate::truncate_str(trimmed, 200)
191                            );
192                            continue;
193                        }
194                    }
195                }
196            }
197        }
198    }
199
200    /// Close the stdin pipe, signaling no more input to the agent.
201    pub fn close_input(&mut self) {
202        self.stdin.take();
203    }
204
205    /// Wait for the agent process to exit.
206    ///
207    /// Consumes the session. Returns an error if the process exits with a
208    /// non-zero status.
209    pub async fn wait(mut self) -> Result<()> {
210        // Drop stdin to ensure the agent sees EOF
211        self.stdin.take();
212
213        let stderr_handle = self.child.stderr.take();
214        let status = self.child.wait().await?;
215
216        let stderr_text = if let Some(stderr) = stderr_handle {
217            let mut buf = Vec::new();
218            let mut reader = tokio::io::BufReader::new(stderr);
219            let _ = tokio::io::AsyncReadExt::read_to_end(&mut reader, &mut buf).await;
220            String::from_utf8_lossy(&buf).trim().to_string()
221        } else {
222            String::new()
223        };
224
225        crate::process::log_stderr_text(&stderr_text);
226
227        if !status.success() {
228            if stderr_text.is_empty() {
229                bail!("Agent process failed with status: {}", status);
230            } else {
231                bail!("{}", stderr_text);
232            }
233        }
234
235        Ok(())
236    }
237}
238
239#[cfg(test)]
240#[path = "streaming_tests.rs"]
241mod tests;