Streaming session for programmatic stdin/stdout interaction with agents.
A StreamingSession wraps a running agent subprocess with piped stdin and
stdout, allowing callers to send NDJSON messages to the agent and read
unified events back.
§Event lifecycle
In bidirectional streaming mode (Claude only), StreamingSession::next_event
yields unified Event values converted from Claude’s
native stream-json output. At the end of every agent turn the session
emits an Event::TurnComplete with
the provider’s stop_reason, a zero-based turn_index, and the turn’s
usage, followed immediately by a per-turn
Event::Result. After that pair the
session remains open and accepts another
StreamingSession::send_user_message for the next turn. next_event
returns Ok(None) only when the subprocess exits (e.g. after
StreamingSession::close_input and EOF).
New consumers should use TurnComplete as the authoritative
turn-boundary signal. Result continues to fire per-turn for backward
compatibility, but TurnComplete is what carries stop_reason and
turn_index. Do not rely on replayed user_message events as a
turn delimiter; those only appear when --replay-user-messages is set
and only fire after the next user message is sent.
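The turn-boundary rule above can be sketched without a live subprocess. The `Event` enum below is a hypothetical, simplified stand-in for the crate's unified event type: the `Text` variant, the field types, and the omission of `usage` are assumptions made for illustration, and `TurnSummary` and `split_turns` are invented names. The sketch closes a turn only when it sees `TurnComplete` and treats the trailing per-turn `Result` as informational.

```rust
/// Hypothetical stand-in for the crate's unified event type.
/// Only the variants this sketch needs are modeled; the real type differs.
#[derive(Debug, Clone, PartialEq)]
enum Event {
    /// Assumed variant carrying a chunk of assistant output.
    Text(String),
    /// Turn boundary; `usage` is omitted and the field types are assumed.
    TurnComplete { stop_reason: Option<String>, turn_index: u32 },
    /// Per-turn result that follows `TurnComplete`.
    Result,
}

/// What a consumer might retain for each completed turn.
#[derive(Debug, PartialEq)]
struct TurnSummary {
    turn_index: u32,
    stop_reason: Option<String>,
    text: String,
}

/// Groups a flat event sequence into turns, closing a turn only on
/// `TurnComplete`. `Result` never opens or closes a turn here.
fn split_turns(events: impl IntoIterator<Item = Event>) -> Vec<TurnSummary> {
    let mut turns = Vec::new();
    let mut text = String::new();
    for event in events {
        match event {
            Event::Text(chunk) => text.push_str(&chunk),
            Event::TurnComplete { stop_reason, turn_index } => {
                turns.push(TurnSummary {
                    turn_index,
                    stop_reason,
                    // Move the accumulated text out and reset the buffer.
                    text: std::mem::take(&mut text),
                });
            }
            Event::Result => {}
        }
    }
    turns
}
```

Against a real session, the same `match` would presumably sit inside the `next_event` loop, with `turn_index` available to correlate output with the message that triggered it.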
§Mid-turn input semantics
send_user_message writes a user message to the agent’s stdin. What the
agent does when the message arrives while it is still producing a response
on the current turn is provider-specific. Callers that need to reason about
mid-turn behavior should branch on
ProviderCapability::features.streaming_input.semantics, which is one of:
- "queue": the message is buffered and delivered at the next turn boundary. The current turn runs to completion; the new message becomes the next user turn. Currently Claude.
- "interrupt": the message cancels the current turn and starts a new one with the new input.
- "between-turns-only": mid-turn sends are an error or no-op; callers must wait for the current turn to finish before sending.
Providers with streaming_input.supported == false (codex, gemini, copilot,
ollama) do not expose a StreamingSession at all — exec_streaming is
unavailable for them.
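One way a caller might branch on the semantics string is to parse it into a local enum and decide up front whether a send is safe. This is a minimal sketch: `MidTurnSemantics`, `SendDecision`, and `decide_send` are hypothetical names that are not part of the crate, and how the string is read from ProviderCapability is left out.

```rust
/// Hypothetical local mirror of the three documented semantics values.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum MidTurnSemantics {
    Queue,
    Interrupt,
    BetweenTurnsOnly,
}

impl MidTurnSemantics {
    /// Parses the documented string values; anything else yields `None`.
    fn parse(value: &str) -> Option<Self> {
        match value {
            "queue" => Some(Self::Queue),
            "interrupt" => Some(Self::Interrupt),
            "between-turns-only" => Some(Self::BetweenTurnsOnly),
            _ => None,
        }
    }
}

/// What the caller should do with a message it wants to send now.
#[derive(Debug, PartialEq, Eq)]
enum SendDecision {
    /// Safe to send; the current turn (if any) is unaffected.
    SendNow,
    /// Sending is allowed but cancels the in-flight turn.
    SendNowCancelsTurn,
    /// Hold the message until the current turn has completed.
    WaitForTurnComplete,
}

/// Chooses an action from the provider semantics and whether a turn
/// is currently in progress.
fn decide_send(semantics: MidTurnSemantics, turn_in_progress: bool) -> SendDecision {
    if !turn_in_progress {
        return SendDecision::SendNow;
    }
    match semantics {
        // Buffered by the agent and delivered at the next turn boundary.
        MidTurnSemantics::Queue => SendDecision::SendNow,
        MidTurnSemantics::Interrupt => SendDecision::SendNowCancelsTurn,
        MidTurnSemantics::BetweenTurnsOnly => SendDecision::WaitForTurnComplete,
    }
}
```

Returning `None` for unknown strings lets the caller fall back to the most conservative behavior (waiting for the turn to finish) if a provider reports a value not listed here.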
§Examples
use zag_agent::builder::AgentBuilder;
use zag_agent::output::Event;

let mut session = AgentBuilder::new()
    .provider("claude")
    .exec_streaming("initial prompt")
    .await?;

// First turn: drain events until TurnComplete.
while let Some(event) = session.next_event().await? {
    println!("{:?}", event);
    if matches!(event, Event::TurnComplete { .. }) {
        break; // turn complete; the per-turn Result follows next
    }
}

// Send a follow-up user message for the next turn.
session.send_user_message("do something else").await?;

// Drain the second turn, then close the session.
while let Some(event) = session.next_event().await? {
    if matches!(event, Event::TurnComplete { .. }) {
        break;
    }
}

session.close_input();
session.wait().await?;
§Structs
- StreamingSession: A live streaming session connected to an agent subprocess.