Skip to main content

Module streaming

Module streaming 

Source
Expand description

Streaming session for programmatic stdin/stdout interaction with agents.

A StreamingSession wraps a running agent subprocess with piped stdin and stdout, allowing callers to send NDJSON messages to the agent and read unified events back.

§Event lifecycle

In bidirectional streaming mode (Claude only), StreamingSession::next_event yields unified Event values converted from Claude’s native stream-json output. A Event::Result is emitted at the end of every agent turn — not only at final session end. After a Result, the session remains open and accepts another StreamingSession::send_user_message for the next turn. next_event returns Ok(None) only when the subprocess exits (e.g. after StreamingSession::close_input and EOF).

Consumers should use the Result event as the authoritative turn-boundary signal. Do not rely on replayed user_message events for this purpose; those only appear when --replay-user-messages is set.

§Mid-turn input semantics

send_user_message writes a user message to the agent’s stdin. What the agent does when the message arrives while it is still producing a response on the current turn is provider-specific. Callers that need to reason about mid-turn behavior should branch on ProviderCapability::features.streaming_input.semantics, which is one of:

  • "queue" — the message is buffered and delivered at the next turn boundary. The current turn runs to completion; the new message becomes the next user turn. Currently Claude.
  • "interrupt" — the message cancels the current turn and starts a new one with the new input.
  • "between-turns-only" — mid-turn sends are an error or no-op; callers must wait for the current turn to finish before sending.

Providers with streaming_input.supported == false (codex, gemini, copilot, ollama) do not expose a StreamingSession at all — exec_streaming is unavailable for them.

§Examples

use zag_agent::builder::AgentBuilder;
use zag_agent::output::Event;

let mut session = AgentBuilder::new()
    .provider("claude")
    .exec_streaming("initial prompt")
    .await?;

// First turn: drain events until the per-turn Result.
while let Some(event) = session.next_event().await? {
    println!("{:?}", event);
    if matches!(event, Event::Result { .. }) {
        break; // turn complete
    }
}

// Send a follow-up user message for the next turn.
session.send_user_message("do something else").await?;

// Drain the second turn, then close the session.
while let Some(event) = session.next_event().await? {
    if matches!(event, Event::Result { .. }) {
        break;
    }
}

session.close_input();
session.wait().await?;

Structs§

StreamingSession
A live streaming session connected to an agent subprocess.