1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
//! Streaming session for programmatic stdin/stdout interaction with agents.
//!
//! A `StreamingSession` wraps a running agent subprocess with piped stdin and
//! stdout, allowing callers to send NDJSON messages to the agent and read
//! unified events back.
//!
//! # Examples
//!
//! ```no_run
//! use zag_agent::builder::AgentBuilder;
//!
//! # async fn example() -> anyhow::Result<()> {
//! let mut session = AgentBuilder::new()
//!     .provider("claude")
//!     .exec_streaming("initial prompt")
//!     .await?;
//!
//! // Send a user message
//! session.send_user_message("do something").await?;
//!
//! // Read events
//! while let Some(event) = session.next_event().await? {
//!     println!("{:?}", event);
//! }
//!
//! session.wait().await?;
//! # Ok(())
//! # }
//! ```
use crate::output::Event;
use anyhow::{Result, bail};
use serde_json;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, Lines};
use tokio::process::{Child, ChildStdin, ChildStdout};
/// A live streaming session connected to an agent subprocess.
///
/// stdin is piped for sending NDJSON messages, stdout is piped for reading
/// NDJSON events. The session owns the child process.
pub struct StreamingSession {
/// Handle to the spawned agent process; awaited by `wait`.
child: Child,
/// Write end of the agent's stdin. `None` once input has been closed via
/// `close_input`/`wait`, or if the child had no stdin handle to take.
stdin: Option<ChildStdin>,
/// Line-oriented reader over the agent's stdout, consumed by `next_event`.
lines: Lines<BufReader<ChildStdout>>,
}
impl StreamingSession {
    /// Create a new `StreamingSession` from a spawned child process.
    ///
    /// Takes ownership of the child's stdout handle (wrapped in a buffered
    /// line reader) and of its stdin handle, if one is present.
    ///
    /// # Errors
    ///
    /// Returns an error if the child's stdout was not piped.
    pub(crate) fn new(mut child: Child) -> Result<Self> {
        let stdout = child
            .stdout
            .take()
            .ok_or_else(|| anyhow::anyhow!("Child process stdout not piped"))?;
        // A missing stdin handle is tolerated here; it only surfaces as an
        // error once `send` is called.
        let stdin = child.stdin.take();
        let lines = BufReader::new(stdout).lines();
        Ok(Self {
            child,
            stdin,
            lines,
        })
    }

    /// Send a raw NDJSON line to the agent's stdin.
    ///
    /// The message should be a single JSON object (no trailing newline needed);
    /// a `\n` terminator is appended and the pipe is flushed.
    ///
    /// # Errors
    ///
    /// Returns an error if stdin is no longer available (for example after
    /// `close_input`) or if writing to / flushing the pipe fails.
    pub async fn send(&mut self, message: &str) -> Result<()> {
        let stdin = self
            .stdin
            .as_mut()
            .ok_or_else(|| anyhow::anyhow!("stdin already closed"))?;
        // Frame the payload and its terminator in one buffer so the line is
        // handed to the pipe with a single write instead of two.
        let mut line = String::with_capacity(message.len() + 1);
        line.push_str(message);
        line.push('\n');
        stdin.write_all(line.as_bytes()).await?;
        stdin.flush().await?;
        Ok(())
    }

    /// Send a user message to the agent.
    ///
    /// Formats the content as a `{"type":"user_message","content":"..."}` NDJSON line
    /// and forwards it through [`send`](Self::send).
    ///
    /// # Errors
    ///
    /// Returns an error if serialization fails or if `send` fails.
    pub async fn send_user_message(&mut self, content: &str) -> Result<()> {
        let msg = serde_json::json!({
            "type": "user_message",
            "content": content,
        });
        self.send(&serde_json::to_string(&msg)?).await
    }

    /// Read the next event from the agent's stdout.
    ///
    /// Returns `None` when stdout is closed (agent exited).
    /// Blank lines are ignored, and lines that fail to parse as JSON events
    /// are skipped after being logged at debug level.
    ///
    /// # Errors
    ///
    /// Returns an error if reading a line from stdout fails.
    pub async fn next_event(&mut self) -> Result<Option<Event>> {
        while let Some(line) = self.lines.next_line().await? {
            let trimmed = line.trim();
            if trimmed.is_empty() {
                continue;
            }
            match serde_json::from_str::<Event>(trimmed) {
                Ok(event) => return Ok(Some(event)),
                Err(e) => {
                    log::debug!(
                        "Skipping unparseable streaming event: {}. Line: {}",
                        e,
                        crate::truncate_str(trimmed, 200)
                    );
                }
            }
        }
        // End of stream: no more lines will arrive.
        Ok(None)
    }

    /// Close the stdin pipe, signaling no more input to the agent.
    ///
    /// Calling this more than once is harmless; later `send` calls fail with
    /// "stdin already closed".
    pub fn close_input(&mut self) {
        self.stdin.take();
    }

    /// Wait for the agent process to exit.
    ///
    /// Consumes the session. stdin is closed first, then any piped stderr is
    /// read to the end, and finally the process is reaped. stdout is not
    /// drained here.
    ///
    /// # Errors
    ///
    /// Returns an error if waiting on the process fails, or if the process
    /// exits with a non-zero status. In the latter case the error message is
    /// the captured stderr text when there is any, otherwise the exit status.
    pub async fn wait(mut self) -> Result<()> {
        // Drop stdin to ensure the agent sees EOF
        self.stdin.take();

        // Drain stderr *before* waiting for exit. If we waited first, a child
        // that writes more than the pipe buffer holds would block on the
        // write and never exit, hanging this call.
        let stderr_text = match self.child.stderr.take() {
            Some(mut stderr) => {
                let mut buf = Vec::new();
                // Best-effort capture: a read failure must not mask the exit status.
                if let Err(e) =
                    tokio::io::AsyncReadExt::read_to_end(&mut stderr, &mut buf).await
                {
                    log::debug!("Failed to read agent stderr: {}", e);
                }
                String::from_utf8_lossy(&buf).trim().to_string()
            }
            None => String::new(),
        };

        let status = self.child.wait().await?;
        crate::process::log_stderr_text(&stderr_text);

        if status.success() {
            return Ok(());
        }
        if stderr_text.is_empty() {
            bail!("Agent process failed with status: {}", status);
        }
        bail!("{}", stderr_text);
    }
}
#[cfg(test)]
#[path = "streaming_tests.rs"]
mod tests;