Skip to main content

zag_agent/
streaming.rs

1//! Streaming session for programmatic stdin/stdout interaction with agents.
2//!
3//! A `StreamingSession` wraps a running agent subprocess with piped stdin and
4//! stdout, allowing callers to send NDJSON messages to the agent and read
5//! unified events back.
6//!
7//! # Examples
8//!
9//! ```no_run
10//! use zag_agent::builder::AgentBuilder;
11//!
12//! # async fn example() -> anyhow::Result<()> {
13//! let mut session = AgentBuilder::new()
14//!     .provider("claude")
15//!     .exec_streaming("initial prompt")
16//!     .await?;
17//!
18//! // Send a user message
19//! session.send_user_message("do something").await?;
20//!
21//! // Read events
22//! while let Some(event) = session.next_event().await? {
23//!     println!("{:?}", event);
24//! }
25//!
26//! session.wait().await?;
27//! # Ok(())
28//! # }
29//! ```
30
31use crate::output::Event;
32use anyhow::{Result, bail};
33use serde_json;
34use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, Lines};
35use tokio::process::{Child, ChildStdin, ChildStdout};
36
37/// A live streaming session connected to an agent subprocess.
38///
39/// stdin is piped for sending NDJSON messages, stdout is piped for reading
40/// NDJSON events. The session owns the child process.
41pub struct StreamingSession {
42    child: Child,
43    stdin: Option<ChildStdin>,
44    lines: Lines<BufReader<ChildStdout>>,
45}
46
47impl StreamingSession {
48    /// Create a new `StreamingSession` from a spawned child process.
49    ///
50    /// The child must have been spawned with piped stdin and stdout.
51    pub(crate) fn new(mut child: Child) -> Result<Self> {
52        let stdout = child
53            .stdout
54            .take()
55            .ok_or_else(|| anyhow::anyhow!("Child process stdout not piped"))?;
56        let stdin = child.stdin.take();
57        let reader = BufReader::new(stdout);
58        let lines = reader.lines();
59
60        Ok(Self {
61            child,
62            stdin,
63            lines,
64        })
65    }
66
67    /// Send a raw NDJSON line to the agent's stdin.
68    ///
69    /// The message should be a single JSON object (no trailing newline needed).
70    pub async fn send(&mut self, message: &str) -> Result<()> {
71        let stdin = self
72            .stdin
73            .as_mut()
74            .ok_or_else(|| anyhow::anyhow!("stdin already closed"))?;
75        stdin.write_all(message.as_bytes()).await?;
76        stdin.write_all(b"\n").await?;
77        stdin.flush().await?;
78        Ok(())
79    }
80
81    /// Send a user message to the agent.
82    ///
83    /// Formats the content as a `{"type":"user_message","content":"..."}` NDJSON line.
84    pub async fn send_user_message(&mut self, content: &str) -> Result<()> {
85        let msg = serde_json::json!({
86            "type": "user_message",
87            "content": content,
88        });
89        self.send(&serde_json::to_string(&msg)?).await
90    }
91
92    /// Read the next event from the agent's stdout.
93    ///
94    /// Returns `None` when stdout is closed (agent exited).
95    /// Skips lines that fail to parse as JSON events.
96    pub async fn next_event(&mut self) -> Result<Option<Event>> {
97        loop {
98            match self.lines.next_line().await? {
99                None => return Ok(None),
100                Some(line) => {
101                    let trimmed = line.trim();
102                    if trimmed.is_empty() {
103                        continue;
104                    }
105                    match serde_json::from_str::<Event>(trimmed) {
106                        Ok(event) => return Ok(Some(event)),
107                        Err(e) => {
108                            log::debug!(
109                                "Skipping unparseable streaming event: {}. Line: {}",
110                                e,
111                                crate::truncate_str(trimmed, 200)
112                            );
113                            continue;
114                        }
115                    }
116                }
117            }
118        }
119    }
120
121    /// Close the stdin pipe, signaling no more input to the agent.
122    pub fn close_input(&mut self) {
123        self.stdin.take();
124    }
125
126    /// Wait for the agent process to exit.
127    ///
128    /// Consumes the session. Returns an error if the process exits with a
129    /// non-zero status.
130    pub async fn wait(mut self) -> Result<()> {
131        // Drop stdin to ensure the agent sees EOF
132        self.stdin.take();
133
134        let stderr_handle = self.child.stderr.take();
135        let status = self.child.wait().await?;
136
137        let stderr_text = if let Some(stderr) = stderr_handle {
138            let mut buf = Vec::new();
139            let mut reader = tokio::io::BufReader::new(stderr);
140            let _ = tokio::io::AsyncReadExt::read_to_end(&mut reader, &mut buf).await;
141            String::from_utf8_lossy(&buf).trim().to_string()
142        } else {
143            String::new()
144        };
145
146        crate::process::log_stderr_text(&stderr_text);
147
148        if !status.success() {
149            if stderr_text.is_empty() {
150                bail!("Agent process failed with status: {}", status);
151            } else {
152                bail!("{}", stderr_text);
153            }
154        }
155
156        Ok(())
157    }
158}
159
160#[cfg(test)]
161#[path = "streaming_tests.rs"]
162mod tests;