

Struct StreamingSession 

pub struct StreamingSession { /* private fields */ }

Unified streaming session tracker.

Provides a single source of truth for streaming state across all parsers. Tracks:

  • Current streaming state (Idle/Streaming/Finalized)
  • Which content types have been streamed
  • Accumulated content by content type and index
  • Whether prefix should be shown on next delta
  • Delta size patterns for detecting snapshot-as-delta violations
  • Persistent “output started” tracking independent of accumulated content
  • Verbosity-aware warning emission

§Lifecycle

  1. Start: on_message_start() - resets per-message streaming state
  2. Stream: on_text_delta() / on_thinking_delta() - accumulate content
  3. Stop: on_message_stop() - finalize the message
  4. Repeat: return to step 1 for the next message
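The lifecycle above can be modeled as a small state machine. This is a minimal sketch, not the real StreamingSession: LifecycleSketch and State are hypothetical names, and the sketch assumes the prefix is shown only for the first delta of each block index and that on_message_stop simply reports whether anything was streamed.

```rust
use std::collections::HashMap;

/// Hypothetical mirror of the Idle/Streaming/Finalized states listed above.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum State {
    Idle,
    Streaming,
    Finalized,
}

/// Hypothetical model of the lifecycle; not the real StreamingSession.
#[derive(Debug)]
struct LifecycleSketch {
    state: State,
    /// Accumulated text per content block index.
    accumulated: HashMap<u64, String>,
}

impl LifecycleSketch {
    fn new() -> Self {
        Self { state: State::Idle, accumulated: HashMap::new() }
    }

    /// Step 1: drop the previous message's content and start streaming.
    fn on_message_start(&mut self) {
        self.accumulated.clear();
        self.state = State::Streaming;
    }

    /// Step 2: accumulate a delta; returns true (show prefix) only for the
    /// first delta seen for this block index.
    fn on_text_delta(&mut self, index: u64, delta: &str) -> bool {
        let first_chunk = !self.accumulated.contains_key(&index);
        self.accumulated.entry(index).or_default().push_str(delta);
        first_chunk
    }

    /// Step 3: finalize. Simplification (assumption): reports whether any
    /// block received content, standing in for "was in a content block".
    fn on_message_stop(&mut self) -> bool {
        self.state = State::Finalized;
        !self.accumulated.is_empty()
    }
}
```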

Implementations§


impl StreamingSession


pub fn new() -> Self

Create a new streaming session.


pub const fn with_verbose_warnings(self, enabled: bool) -> Self

Configure whether to emit verbose warnings about streaming anomalies.

When enabled, diagnostic warnings are printed for:

  • Repeated MessageStart events (GLM protocol violations)
  • Large deltas that may indicate snapshot-as-delta bugs
  • Pattern detection of repeated large content

When disabled (default), these warnings are suppressed to avoid noise in production output.

§Arguments
  • enabled - Whether to enable verbose warnings
§Returns

The modified session for builder chaining.

§Example
let mut session = StreamingSession::new().with_verbose_warnings(true);

pub fn on_message_start(&mut self)

Reset the session on new message start.

This should be called when:

  • Claude: MessageStart event
  • Codex: TurnStarted event
  • Gemini: init event or new message
  • OpenCode: New part starts
§Arguments
  • None. To record a message identifier for deduplication, call set_current_message_id() separately.
§Note on Repeated MessageStart Events

Some agents (notably GLM/ccs-glm) send repeated MessageStart events during a single logical streaming session. When this happens while state is Streaming, we preserve output_started_for_key to prevent prefix spam on each delta that follows the repeated MessageStart. This is a defensive measure to handle non-standard agent protocols while maintaining correct behavior for legitimate multi-message scenarios.
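A minimal sketch of the repeated-MessageStart guard, assuming output-started state is tracked as a set of content keys. RepeatedStartSketch and its fields are hypothetical. A start that arrives while already streaming leaves the set untouched, so the next delta does not print the prefix again; a start after a stop clears it.

```rust
use std::collections::HashSet;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum State {
    Idle,
    Streaming,
    Finalized,
}

/// Hypothetical model of the repeated-MessageStart guard only.
struct RepeatedStartSketch {
    state: State,
    /// Content keys (e.g. "main") whose prefix has already been printed.
    output_started_for_key: HashSet<String>,
}

impl RepeatedStartSketch {
    fn new() -> Self {
        Self { state: State::Idle, output_started_for_key: HashSet::new() }
    }

    fn on_message_start(&mut self) {
        // A start that arrives mid-stream is treated as a protocol repeat:
        // keep the output-started keys so the prefix is not printed again.
        if self.state != State::Streaming {
            self.output_started_for_key.clear();
        }
        self.state = State::Streaming;
    }

    /// Returns true (show prefix) the first time output starts for `key`.
    fn on_text_delta_key(&mut self, key: &str) -> bool {
        // HashSet::insert returns true only if the key was not present.
        self.output_started_for_key.insert(key.to_string())
    }

    fn on_message_stop(&mut self) {
        self.state = State::Finalized;
    }
}
```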


pub fn set_current_message_id(&mut self, message_id: Option<String>)

Set the current message ID for tracking.

This should be called when processing a MessageStart event that contains a message identifier. Used to prevent duplicate display of final messages.

§Arguments
  • message_id - The unique identifier for this message (or None to clear)

pub fn get_current_message_id(&self) -> Option<&str>

Get the current message ID.

§Returns
  • Some(id) - The current message ID
  • None - No message ID is set

pub fn is_duplicate_final_message(&self, message_id: &str) -> bool

Check if a message ID represents a duplicate final message.

This prevents displaying the same message twice - once after streaming completes and again when the final “Assistant” event arrives.

§Arguments
  • message_id - The message ID to check
§Returns
  • true - This message has already been displayed (is a duplicate)
  • false - This is a new message

pub fn mark_message_displayed(&mut self, message_id: &str)

Mark a message as displayed to prevent duplicate display.

This should be called after displaying a message’s final content.

§Arguments
  • message_id - The message ID to mark as displayed
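The message-ID methods above suggest a current ID plus a set of IDs that have already been displayed. This is a hypothetical sketch (DisplayTracker is not part of the crate) assuming a HashSet<String> backs the displayed set:

```rust
use std::collections::HashSet;

/// Hypothetical stand-in for the message-ID bookkeeping inside the session.
#[derive(Default)]
struct DisplayTracker {
    current_message_id: Option<String>,
    displayed: HashSet<String>,
}

impl DisplayTracker {
    fn set_current_message_id(&mut self, message_id: Option<String>) {
        self.current_message_id = message_id;
    }

    fn get_current_message_id(&self) -> Option<&str> {
        self.current_message_id.as_deref()
    }

    /// True if the final content for this ID was already shown once.
    fn is_duplicate_final_message(&self, message_id: &str) -> bool {
        self.displayed.contains(message_id)
    }

    fn mark_message_displayed(&mut self, message_id: &str) {
        self.displayed.insert(message_id.to_string());
    }
}
```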

pub fn mark_message_pre_rendered(&mut self, message_id: &str)

Mark that an assistant event has pre-rendered content BEFORE streaming started.

This is used to handle the case where an assistant event arrives with full content BEFORE any streaming deltas. When this happens, we render the assistant event content and mark the message_id as pre-rendered. ALL subsequent streaming deltas for the same message_id should be suppressed to prevent duplication.

§Arguments
  • message_id - The message ID that was pre-rendered

pub fn is_message_pre_rendered(&self, message_id: &str) -> bool

Check if a message was pre-rendered from an assistant event.

This checks if the given message_id was previously rendered from an assistant event (before streaming started). If so, ALL subsequent streaming deltas for this message should be suppressed.

§Arguments
  • message_id - The message ID to check
§Returns
  • true - This message was pre-rendered, suppress all streaming output
  • false - This message was not pre-rendered, allow streaming output
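One way to apply the pre-rendered rule is to gate every delta on a set of pre-rendered IDs. PreRenderGate and its filter_delta helper are hypothetical names used only for illustration:

```rust
use std::collections::HashSet;

/// Hypothetical holder for IDs rendered from an assistant event before streaming.
#[derive(Default)]
struct PreRenderGate {
    pre_rendered: HashSet<String>,
}

impl PreRenderGate {
    fn mark_message_pre_rendered(&mut self, message_id: &str) {
        self.pre_rendered.insert(message_id.to_owned());
    }

    fn is_message_pre_rendered(&self, message_id: &str) -> bool {
        self.pre_rendered.contains(message_id)
    }

    /// Hypothetical helper: returns the delta to print, or None when the
    /// message was already rendered in full, so ALL its deltas are dropped.
    fn filter_delta<'a>(&self, message_id: &str, delta: &'a str) -> Option<&'a str> {
        if self.is_message_pre_rendered(message_id) {
            None
        } else {
            Some(delta)
        }
    }
}
```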

pub fn is_assistant_content_rendered(&self, content_hash: u64) -> bool

Check if assistant event content has already been rendered.

This prevents duplicate assistant events with the same content from being rendered multiple times. GLM/CCS may send multiple assistant events during streaming with the same content but different message_ids.

§Arguments
  • content_hash - The hash of the assistant event content
§Returns
  • true - This content was already rendered, suppress rendering
  • false - This content was not rendered, allow rendering

pub fn mark_assistant_content_rendered(&mut self, content_hash: u64)

Mark assistant event content as having been rendered.

This should be called after rendering an assistant event to prevent duplicate rendering of the same content.

§Arguments
  • content_hash - The hash of the assistant event content that was rendered
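The content_hash argument is a plain u64, so the caller has to hash the assistant content first. A sketch assuming std's DefaultHasher is used; the crate's actual hashing scheme is not documented on this page, and AssistantDedup and content_hash are hypothetical:

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashSet;
use std::hash::{Hash, Hasher};

/// Hypothetical helper: hash assistant content to the u64 the API expects.
fn content_hash(content: &str) -> u64 {
    let mut hasher = DefaultHasher::new();
    content.hash(&mut hasher);
    hasher.finish()
}

/// Hypothetical stand-in for the rendered-content set inside the session.
#[derive(Default)]
struct AssistantDedup {
    rendered: HashSet<u64>,
}

impl AssistantDedup {
    fn is_assistant_content_rendered(&self, hash: u64) -> bool {
        self.rendered.contains(&hash)
    }

    fn mark_assistant_content_rendered(&mut self, hash: u64) {
        self.rendered.insert(hash);
    }
}
```

DefaultHasher::new() is deterministic within one process, which is all in-session deduplication needs, but its output is not guaranteed to stay the same across Rust releases, so such hashes should not be persisted.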

pub fn on_content_block_start(&mut self, index: u64)

Mark the start of a content block.

This should be called when:

  • Claude: ContentBlockStart event
  • Codex: ItemStarted with relevant type
  • Gemini: Content section begins
  • OpenCode: Part with content starts

If we’re already in a block, this method finalizes the previous block by emitting a newline if output had started.

§Arguments
  • index - The content block index (for multi-block messages)
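The block-transition rule above (emit a newline only if the previous block produced output) can be sketched as follows. BlockTracker is hypothetical and writes to a String instead of stdout so the behavior is easy to check:

```rust
/// Hypothetical model of block transitions; output is captured in `out`.
#[derive(Default)]
struct BlockTracker {
    current_block: Option<u64>,
    output_started: bool,
    out: String,
}

impl BlockTracker {
    fn on_content_block_start(&mut self, index: u64) {
        // Finalize the previous block only if it actually produced output.
        if self.current_block.is_some() && self.output_started {
            self.out.push('\n');
        }
        self.current_block = Some(index);
        self.output_started = false;
    }

    /// Hypothetical helper standing in for rendering a delta.
    fn write_delta(&mut self, delta: &str) {
        self.out.push_str(delta);
        self.output_started = true;
    }
}
```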

pub fn on_text_delta(&mut self, index: u64, delta: &str) -> bool

Process a text delta and return whether prefix should be shown.

§Arguments
  • index - The content block index
  • delta - The text delta to accumulate
§Returns
  • true - Show prefix with this delta (first chunk)
  • false - Don’t show prefix (subsequent chunks)

pub fn on_text_delta_key(&mut self, key: &str, delta: &str) -> bool

Process a text delta with a string key and return whether prefix should be shown.

This variant is for parsers that use string keys instead of numeric indices (e.g., Codex uses agent_msg and reasoning; Gemini and OpenCode both use main).

§Delta Validation

This method validates that incoming content appears to be a genuine delta (small chunk) rather than a snapshot (full accumulated content). Large “deltas” that exceed snapshot_threshold() may indicate a contract violation and trigger a warning when verbose warnings are enabled.

Additionally, we track patterns of delta sizes to detect repeated large content being sent as if it were incremental (a common snapshot-as-delta bug).

§Arguments
  • key - The content key (e.g., main, agent_msg, reasoning)
  • delta - The text delta to accumulate
§Returns
  • true - Show prefix with this delta (first chunk)
  • false - Don’t show prefix (subsequent chunks)
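A sketch of the delta validation described above, assuming a fixed byte threshold and that a run of consecutive oversized deltas for one key counts as a pattern. SNAPSHOT_THRESHOLD and PATTERN_RUN are hypothetical stand-ins for snapshot_threshold() and the crate's real pattern rule. Warnings are collected rather than printed and, as with with_verbose_warnings(), only emitted when verbose mode is on.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for snapshot_threshold(), in bytes.
const SNAPSHOT_THRESHOLD: usize = 200;
/// Hypothetical: consecutive oversized deltas for one key that count as a pattern.
const PATTERN_RUN: usize = 3;

#[derive(Default)]
struct DeltaValidator {
    verbose: bool,
    /// Current run of oversized deltas, per content key.
    consecutive_large: HashMap<String, usize>,
    /// Collected instead of printed so the behavior can be inspected.
    warnings: Vec<String>,
}

impl DeltaValidator {
    fn check(&mut self, key: &str, delta: &str) {
        let run = self.consecutive_large.entry(key.to_string()).or_insert(0);
        if delta.len() <= SNAPSHOT_THRESHOLD {
            *run = 0; // a normal-sized delta breaks the run
            return;
        }
        *run += 1;
        let run = *run;
        if !self.verbose {
            return; // warnings are suppressed unless verbose mode is on
        }
        self.warnings.push(format!("large delta ({} bytes) for key {key}", delta.len()));
        if run == PATTERN_RUN {
            self.warnings
                .push(format!("repeated large deltas for key {key}: possible snapshot-as-delta"));
        }
    }
}
```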

pub fn on_thinking_delta(&mut self, index: u64, delta: &str) -> bool

Process a thinking delta and return whether prefix should be shown.

§Arguments
  • index - The content block index
  • delta - The thinking delta to accumulate
§Returns
  • true - Show prefix with this delta (first chunk)
  • false - Don’t show prefix (subsequent chunks)

pub fn on_thinking_delta_key(&mut self, key: &str, delta: &str) -> bool

Process a thinking delta with a string key and return whether prefix should be shown.

This variant is for parsers that use string keys instead of numeric indices.

§Arguments
  • key - The content key (e.g., “reasoning”)
  • delta - The thinking delta to accumulate
§Returns
  • true - Show prefix with this delta (first chunk)
  • false - Don’t show prefix (subsequent chunks)

pub fn on_tool_input_delta(&mut self, index: u64, delta: &str)

Process a tool input delta.

§Arguments
  • index - The content block index
  • delta - The tool input delta to accumulate

pub fn set_tool_name(&mut self, index: u64, name: Option<String>)

Record the tool name for a specific content block index.

This is used for GLM/CCS deduplication where assistant events contain tool_use blocks (name + input) but streaming only accumulates the input. By tracking the name separately, we can reconstruct the normalized representation for proper hash-based deduplication.

§Arguments
  • index - The content block index
  • name - The tool name (if available)
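The following sketch shows how tracking the name separately lets the streamed input be turned back into something comparable with an assistant event's tool_use block. The TOOL_USE:{name}:{input} format and the ToolAccumulator type are hypothetical; the crate's real normalized representation is not shown on this page.

```rust
use std::collections::HashMap;

/// Hypothetical normalized form of a tool_use block (name + input).
fn normalize_tool_use(name: &str, input: &str) -> String {
    format!("TOOL_USE:{name}:{input}")
}

#[derive(Default)]
struct ToolAccumulator {
    names: HashMap<u64, String>,
    inputs: HashMap<u64, String>,
}

impl ToolAccumulator {
    fn set_tool_name(&mut self, index: u64, name: Option<String>) {
        // Assumption: None means "name unknown", so an existing name is kept.
        if let Some(name) = name {
            self.names.insert(index, name);
        }
    }

    fn on_tool_input_delta(&mut self, index: u64, delta: &str) {
        self.inputs.entry(index).or_default().push_str(delta);
    }

    /// Rebuilds the normalized representation for hashing, if any input was streamed.
    fn normalized(&self, index: u64) -> Option<String> {
        let input = self.inputs.get(&index)?;
        let name = self.names.get(&index).map(String::as_str).unwrap_or("");
        Some(normalize_tool_use(name, input))
    }
}
```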

pub fn on_message_stop(&mut self) -> bool

Finalize the message on stop event.

This should be called when:

  • Claude: MessageStop event
  • Codex: TurnCompleted or ItemCompleted with text
  • Gemini: Message completion
  • OpenCode: Part completion
§Returns
  • true - A completion newline should be emitted (a content block was active)
  • false - No completion needed (no content block active)

pub fn has_any_streamed_content(&self) -> bool

Check if ANY content has been streamed for this message.

This is a broader check that returns true if ANY content type has been streamed. Used to skip entire message display when all content was already streamed.


pub fn is_duplicate_by_hash(&self, content: &str, tool_name_hints: Option<&HashMap<usize, String>>) -> bool

Check if content matches the previously streamed content by hash.

This is a more precise alternative to has_any_streamed_content() for deduplication. Instead of checking if ANY content was streamed, this checks if the EXACT content was streamed by comparing hashes.

This method looks at ALL accumulated content across all content types and indices. If the combined accumulated content matches the input, it returns true.

§Arguments
  • content - The content to check (typically normalized content from assistant events)
  • tool_name_hints - Optional tool names from assistant event (by content block index)
§Returns
  • true - The content hash matches the previously streamed content
  • false - The content is different or no content was streamed
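A minimal sketch of hash-based duplicate detection over all accumulated content, assuming the pieces are concatenated in (content type, index) order before hashing. Kind, the BTreeMap layout, and the free function is_duplicate_by_hash are hypothetical; the real method also accepts tool_name_hints, which this sketch omits.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::BTreeMap;
use std::hash::{Hash, Hasher};

/// Hypothetical content kinds; Ord gives a deterministic concatenation order.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum Kind {
    Text,
    Thinking,
}

fn hash_str(s: &str) -> u64 {
    let mut hasher = DefaultHasher::new();
    s.hash(&mut hasher);
    hasher.finish()
}

/// Hypothetical free-function version: true only if something was streamed
/// and the concatenation of every accumulated piece hashes like `content`.
fn is_duplicate_by_hash(accumulated: &BTreeMap<(Kind, u64), String>, content: &str) -> bool {
    if accumulated.is_empty() {
        return false;
    }
    // BTreeMap iterates in key order: (Kind, index) ascending.
    let combined: String = accumulated.values().map(String::as_str).collect();
    hash_str(&combined) == hash_str(content)
}
```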

pub fn get_accumulated(&self, content_type: ContentType, index: &str) -> Option<&str>

Get accumulated content for a specific type and index.

§Arguments
  • content_type - The type of content
  • index - The content index (as string for flexibility)
§Returns
  • Some(text) - Accumulated content
  • None - No content accumulated for this key

pub fn mark_rendered(&mut self, content_type: ContentType, index: &str)

Mark content as having been rendered (HashMap-based tracking).

This should be called after rendering to update the per-key tracking.

§Arguments
  • content_type - The type of content
  • index - The content index (as string for flexibility)

pub fn has_rendered_prefix(&self, content_type: ContentType, index: &str) -> bool

Check whether output has already started for this content key.

This method detects when new content extends previously rendered content, indicating that an in-place update should be performed (e.g., using a carriage return).

Under the KMP + Rolling Hash approach, this reduces to checking whether output has started for this key; if it has, the caller is in an in-place update scenario.

§Arguments
  • content_type - The type of content
  • index - The content index (as string for flexibility)
§Returns
  • true - Output has started for this key (do in-place update)
  • false - Output has not started for this key (show new content)
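A sketch of how a renderer might act on has_rendered_prefix(): once output has started for a key, it returns to column 0 with a carriage return and redraws the full accumulated line. InPlaceRenderer and the prefix string are hypothetical:

```rust
use std::collections::HashSet;

/// Hypothetical renderer; returns the bytes it would write to a terminal.
#[derive(Default)]
struct InPlaceRenderer {
    output_started: HashSet<String>,
}

impl InPlaceRenderer {
    fn has_rendered_prefix(&self, key: &str) -> bool {
        self.output_started.contains(key)
    }

    fn render(&mut self, key: &str, prefix: &str, accumulated: &str) -> String {
        let line = if self.has_rendered_prefix(key) {
            // Already on screen: return to column 0 and redraw the whole line.
            format!("\r{prefix}{accumulated}")
        } else {
            format!("{prefix}{accumulated}")
        };
        // Plays the role of mark_rendered() for this key.
        self.output_started.insert(key.to_owned());
        line
    }
}
```

A carriage return only redraws the current terminal line, so this approach assumes the accumulated text contains no newlines.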

pub fn mark_content_hash_rendered(&mut self, content_type: ContentType, index: &str, content: &str)

Mark content as rendered using pre-sanitized content.

This method uses the sanitized content (with whitespace normalized) for hash-based deduplication, which prevents duplicates when the accumulated content differs only by whitespace.

§Arguments
  • content_type - The type of content
  • index - The content index (as string for flexibility)
  • content - The content to hash

pub fn is_content_hash_rendered(&self, _content_type: ContentType, _index: &str, content: &str) -> bool

Check if sanitized content has already been rendered.

This method checks the hash of the sanitized content against the rendered set to prevent duplicate rendering.

§Arguments
  • _content_type - The type of content (kept for API consistency)
  • _index - The content index (kept for API consistency)
  • content - The sanitized content to check
§Returns
  • true - This exact content has been rendered before
  • false - This exact content has not been rendered
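To illustrate whitespace-insensitive deduplication, this sketch collapses whitespace runs to single spaces before hashing. The sanitize rule and the RenderedHashes type are assumptions; the real methods also take a content type and index, which are omitted here.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashSet;
use std::hash::{Hash, Hasher};

/// Hypothetical sanitizer: collapse every whitespace run to one space and trim the ends.
fn sanitize(content: &str) -> String {
    content.split_whitespace().collect::<Vec<_>>().join(" ")
}

fn hash_sanitized(content: &str) -> u64 {
    let mut hasher = DefaultHasher::new();
    sanitize(content).hash(&mut hasher);
    hasher.finish()
}

/// Hypothetical stand-in for the session's rendered-hash set.
#[derive(Default)]
struct RenderedHashes {
    set: HashSet<u64>,
}

impl RenderedHashes {
    fn mark_content_hash_rendered(&mut self, content: &str) {
        self.set.insert(hash_sanitized(content));
    }

    fn is_content_hash_rendered(&self, content: &str) -> bool {
        self.set.contains(&hash_sanitized(content))
    }
}
```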

pub fn is_likely_snapshot(&self, text: &str, key: &str) -> bool

Check if incoming text is likely a snapshot (full accumulated content) rather than a delta.

This uses a KMP + Rolling Hash algorithm for O(n+m) snapshot detection. The two-phase approach keeps the check cheap: the rolling hash quickly rejects most non-snapshots, and only candidates that pass it are verified exactly.

  1. Rolling hash for fast O(n) filtering
  2. KMP for exact O(n+m) verification
§Arguments
  • text - The incoming text to check
  • key - The content key to compare against
§Returns
  • true - The text appears to be a snapshot (starts with the previously accumulated content)
  • false - The text appears to be a genuine delta
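The two-phase check can be sketched as follows, assuming a snapshot means the incoming text begins with the accumulated content for the key. Phase 1 compares a polynomial (Rabin-Karp style) hash of the text's prefix against the accumulated content's hash; phase 2 confirms with a KMP search that the first exact match sits at offset 0. The function names and hash parameters (BASE, MODULUS) are hypothetical.

```rust
/// Hypothetical hash parameters: a small base and a large prime modulus.
const BASE: u64 = 257;
const MODULUS: u64 = 1_000_000_007;

/// Polynomial hash of `bytes` modulo MODULUS (values stay well inside u64).
fn poly_hash(bytes: &[u8]) -> u64 {
    bytes.iter().fold(0u64, |h, &b| (h * BASE + b as u64) % MODULUS)
}

/// KMP failure table: fail[i] = length of the longest proper border of pattern[..=i].
fn failure_table(pattern: &[u8]) -> Vec<usize> {
    let mut fail = vec![0; pattern.len()];
    let mut k = 0;
    for i in 1..pattern.len() {
        while k > 0 && pattern[i] != pattern[k] {
            k = fail[k - 1];
        }
        if pattern[i] == pattern[k] {
            k += 1;
        }
        fail[i] = k;
    }
    fail
}

/// Returns the start of the first occurrence of `pattern` in `text`, if any.
fn kmp_find(text: &[u8], pattern: &[u8]) -> Option<usize> {
    if pattern.is_empty() {
        return Some(0);
    }
    let fail = failure_table(pattern);
    let mut k = 0;
    for (i, &c) in text.iter().enumerate() {
        while k > 0 && c != pattern[k] {
            k = fail[k - 1];
        }
        if c == pattern[k] {
            k += 1;
        }
        if k == pattern.len() {
            return Some(i + 1 - pattern.len());
        }
    }
    None
}

/// Hypothetical free-function version of the snapshot check.
fn is_likely_snapshot(text: &str, accumulated: &str) -> bool {
    let (t, p) = (text.as_bytes(), accumulated.as_bytes());
    if p.is_empty() || t.len() < p.len() {
        return false; // nothing accumulated yet, or text too short to contain it
    }
    // Phase 1: cheap filter; differing hashes mean the prefix cannot match.
    if poly_hash(&t[..p.len()]) != poly_hash(p) {
        return false;
    }
    // Phase 2: exact verification; a snapshot must match at offset 0.
    kmp_find(t, p) == Some(0)
}
```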

pub fn extract_delta_from_snapshot(&self, text: &str, key: &str) -> Result<usize, String>

Extract the delta portion from a snapshot.

When a snapshot is detected (full accumulated content sent as a “delta”), this method extracts only the new portion that hasn’t been accumulated yet.

§Arguments
  • text - The snapshot text (full accumulated content + new content)
  • key - The content key to compare against
§Returns
  • Ok(usize) - The length of the delta portion (new content only)
  • Err - If the text is not actually a snapshot (doesn’t start with accumulated content)
§Note

Returns the length of the delta portion rather than a slice of text. Because the delta is the new content at the end of text, callers can slice it with &text[text.len() - delta_len..], or call get_delta_from_snapshot(), which returns the slice directly.


pub fn get_delta_from_snapshot<'a>(&self, text: &'a str, key: &str) -> Result<&'a str, String>

Get the delta portion as a string slice from a snapshot.

This is a convenience wrapper that returns the actual substring instead of just the length.

§Returns
  • Ok(&str) - The delta portion (new content only)
  • Err - If the text is not actually a snapshot
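A sketch of the extraction, assuming the accumulated content for the key is already known and passed in directly rather than looked up. str::strip_prefix does the comparison and returns the remaining slice; the length variant mirrors the Result<usize, String> shape of extract_delta_from_snapshot. Both function names here are hypothetical stand-ins:

```rust
/// Hypothetical stand-in for get_delta_from_snapshot: `accumulated` is passed
/// explicitly instead of being looked up by key.
fn delta_from_snapshot<'a>(text: &'a str, accumulated: &str) -> Result<&'a str, String> {
    text.strip_prefix(accumulated)
        .ok_or_else(|| "text does not start with accumulated content".to_string())
}

/// Hypothetical length-only variant: the byte length of the new content.
fn delta_len_from_snapshot(text: &str, accumulated: &str) -> Result<usize, String> {
    delta_from_snapshot(text, accumulated).map(str::len)
}
```

Note that with an empty accumulated string every text strips cleanly, so a real implementation would likely pair this with a check such as is_likely_snapshot() first.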

pub fn get_streaming_quality_metrics(&self) -> StreamingQualityMetrics

Get streaming quality metrics for the current session.

Returns aggregated metrics about delta sizes and streaming patterns during the session. This is useful for debugging and analyzing streaming behavior.

§Returns

Aggregated metrics across all content types and keys.
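A sketch of the kind of aggregation described above. The real StreamingQualityMetrics fields are not listed on this page, so QualityMetricsSketch, its fields, the large_threshold parameter, and the aggregate function are all hypothetical:

```rust
/// Hypothetical metrics shape; not the real StreamingQualityMetrics.
#[derive(Debug, Default, Clone, Copy, PartialEq)]
struct QualityMetricsSketch {
    total_deltas: usize,
    total_bytes: usize,
    max_delta: usize,
    large_deltas: usize,
}

impl QualityMetricsSketch {
    /// Records one delta; `large_threshold` is a hypothetical size cutoff in bytes.
    fn record(&mut self, delta: &str, large_threshold: usize) {
        self.total_deltas += 1;
        self.total_bytes += delta.len();
        self.max_delta = self.max_delta.max(delta.len());
        if delta.len() > large_threshold {
            self.large_deltas += 1;
        }
    }

    fn average_delta(&self) -> f64 {
        if self.total_deltas == 0 {
            0.0
        } else {
            self.total_bytes as f64 / self.total_deltas as f64
        }
    }
}

/// Combines per-key metrics into one session-wide summary.
fn aggregate<'a>(parts: impl IntoIterator<Item = &'a QualityMetricsSketch>) -> QualityMetricsSketch {
    parts.into_iter().fold(QualityMetricsSketch::default(), |acc, m| QualityMetricsSketch {
        total_deltas: acc.total_deltas + m.total_deltas,
        total_bytes: acc.total_bytes + m.total_bytes,
        max_delta: acc.max_delta.max(m.max_delta),
        large_deltas: acc.large_deltas + m.large_deltas,
    })
}
```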

Trait Implementations§


impl Clone for StreamingSession


fn clone(&self) -> StreamingSession

Returns a duplicate of the value.
Stable since Rust 1.0.0.

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source.

impl Debug for StreamingSession


fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.

impl Default for StreamingSession


fn default() -> StreamingSession

Returns the “default value” for a type.

Auto Trait Implementations§

Blanket Implementations§


impl<T> Any for T
where T: 'static + ?Sized,


fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,


fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,


fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> CloneToUninit for T
where T: Clone,


unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬 This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dest.

impl<T> From<T> for T


fn from(t: T) -> T

Returns the argument unchanged.


impl<T, U> Into<U> for T
where U: From<T>,


fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.


impl<T> IntoEither for T


fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.

impl<T> Same for T


type Output = T

Should always be Self

impl<T> ToOwned for T
where T: Clone,


type Owned = T

The resulting type after obtaining ownership.

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning.

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning.

impl<T, U> TryFrom<U> for T
where U: Into<T>,


type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,


type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.