Skip to main content

Module streaming

Module streaming 

Source
Expand description

Streaming session for programmatic stdin/stdout interaction with agents.

A StreamingSession wraps a running agent subprocess with piped stdin and stdout, allowing callers to send NDJSON messages to the agent and read unified events back.

§Event lifecycle

In bidirectional streaming mode (Claude only), StreamingSession::next_event yields unified Event values converted from Claude’s native stream-json output. At the end of every agent turn the session emits a Event::TurnComplete with the provider’s stop_reason, a zero-based turn_index, and the turn’s usage, followed immediately by a per-turn Event::Result. After that pair the session remains open and accepts another StreamingSession::send_user_message for the next turn. next_event returns Ok(None) only when the subprocess exits (e.g. after StreamingSession::close_input and EOF).

New consumers should use TurnComplete as the authoritative turn-boundary signal. Result continues to fire per-turn for backward compatibility, but TurnComplete is what carries stop_reason and turn_index. Do not rely on replayed user_message events as a turn delimiter; those only appear when --replay-user-messages is set and only fire after the next user message is sent.

§Mid-turn input semantics

send_user_message writes a user message to the agent’s stdin. What the agent does when the message arrives while it is still producing a response on the current turn is provider-specific. Callers that need to reason about mid-turn behavior should branch on ProviderCapability::features.streaming_input.semantics, which is one of:

  • "queue" — the message is buffered and delivered at the next turn boundary. The current turn runs to completion; the new message becomes the next user turn. Currently Claude.
  • "interrupt" — the message cancels the current turn and starts a new one with the new input.
  • "between-turns-only" — mid-turn sends are an error or no-op; callers must wait for the current turn to finish before sending.

Providers with streaming_input.supported == false (codex, gemini, copilot, ollama) do not expose a StreamingSession at all — exec_streaming is unavailable for them.

§Examples

use zag_agent::builder::AgentBuilder;
use zag_agent::output::Event;

let mut session = AgentBuilder::new()
    .provider("claude")
    .exec_streaming("initial prompt")
    .await?;

// First turn: drain events until TurnComplete.
while let Some(event) = session.next_event().await? {
    println!("{:?}", event);
    if matches!(event, Event::TurnComplete { .. }) {
        break; // turn complete — the per-turn Result follows next
    }
}

// Send a follow-up user message for the next turn.
session.send_user_message("do something else").await?;

// Drain the second turn, then close the session.
while let Some(event) = session.next_event().await? {
    if matches!(event, Event::TurnComplete { .. }) {
        break;
    }
}

session.close_input();
session.wait().await?;

Structs§

StreamingSession
A live streaming session connected to an agent subprocess.