pub struct StreamingSession { /* private fields */ }
Unified streaming session tracker.
Provides a single source of truth for streaming state across all parsers. Tracks:
- Current streaming state (Idle/Streaming/Finalized)
- Which content types have been streamed
- Accumulated content by content type and index
- Whether prefix should be shown on next delta
- Delta size patterns for detecting snapshot-as-delta violations
- Persistent “output started” tracking independent of accumulated content
- Verbosity-aware warning emission
§Lifecycle
1. Start: on_message_start() resets all state
2. Stream: on_text_delta() / on_thinking_delta() accumulate content
3. Stop: on_message_stop() finalizes the message
4. Repeat: return to step 1 for the next message
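The lifecycle above can be modeled as a small state machine. The sketch below is a hypothetical stand-in (ToySession and its State enum are not part of this API). It assumes that on_message_start() returns the session to Idle, that the first delta moves it to Streaming, and that deltas arriving after on_message_stop() are ignored.

```rust
/// Hypothetical lifecycle states; the real session keeps its state private.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum State {
    Idle,
    Streaming,
    Finalized,
}

/// Toy stand-in for StreamingSession that tracks only state and text.
#[derive(Debug)]
struct ToySession {
    state: State,
    text: String,
}

impl ToySession {
    fn new() -> Self {
        ToySession { state: State::Idle, text: String::new() }
    }

    /// Start: reset all per-message state.
    fn on_message_start(&mut self) {
        self.state = State::Idle;
        self.text.clear();
    }

    /// Stream: accumulate deltas; the first delta moves Idle -> Streaming.
    /// Deltas arriving after the message was finalized are ignored (assumption).
    fn on_text_delta(&mut self, delta: &str) {
        if self.state == State::Finalized {
            return;
        }
        self.state = State::Streaming;
        self.text.push_str(delta);
    }

    /// Stop: finalize; the text stays readable until the next start.
    fn on_message_stop(&mut self) {
        self.state = State::Finalized;
    }
}
```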
Implementations§
impl StreamingSession
pub const fn with_verbose_warnings(self, enabled: bool) -> Self
Configure whether to emit verbose warnings about streaming anomalies.
When enabled, diagnostic warnings are printed for:
- Repeated MessageStart events (GLM protocol violations)
- Large deltas that may indicate snapshot-as-delta bugs
- Pattern detection of repeated large content
When disabled (default), these warnings are suppressed to avoid noise in production output.
§Arguments
- enabled: Whether to enable verbose warnings
§Returns
The modified session for builder chaining.
§Example
let mut session = StreamingSession::default().with_verbose_warnings(true);
pub fn on_message_start(&mut self)
Reset the session on new message start.
This should be called when:
- Claude: MessageStart event
- Codex: TurnStarted event
- Gemini: init event or new message
- OpenCode: New part starts
§Arguments
This method takes no arguments. To associate a message identifier with the new message (for deduplication), call set_current_message_id() as well.
§Note on Repeated MessageStart Events
Some agents (notably GLM/ccs-glm) send repeated MessageStart events during
a single logical streaming session. When this happens while state is Streaming,
we preserve output_started_for_key to prevent prefix spam on each delta that
follows the repeated MessageStart. This is a defensive measure to handle
non-standard agent protocols while maintaining correct behavior for legitimate
multi-message scenarios.
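A minimal sketch of the repeated-MessageStart guard, assuming the "output started" markers live in a set of keys. Session, State and on_text_delta_key here are hypothetical simplifications; only output_started_for_key is named in the text above, and its real type is not documented on this page.

```rust
use std::collections::HashSet;

/// Hypothetical lifecycle states (the real state type is private).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum State {
    Idle,
    Streaming,
    Finalized,
}

/// Reduced session holding only what the guard needs.
struct Session {
    state: State,
    /// Keys whose prefix has already been printed.
    output_started_for_key: HashSet<String>,
}

impl Session {
    fn new() -> Self {
        Session { state: State::Idle, output_started_for_key: HashSet::new() }
    }

    fn on_message_start(&mut self) {
        if self.state == State::Streaming {
            // Repeated MessageStart mid-stream (e.g. GLM): keep the markers so
            // deltas for keys that already printed get no second prefix.
            return;
        }
        // Legitimate new message: reset per-message tracking.
        self.output_started_for_key.clear();
        self.state = State::Idle;
    }

    /// Returns true when the prefix should be shown (first chunk for `key`).
    fn on_text_delta_key(&mut self, key: &str, _delta: &str) -> bool {
        self.state = State::Streaming;
        // HashSet::insert returns true only if the key was not present yet.
        self.output_started_for_key.insert(key.to_owned())
    }

    fn on_message_stop(&mut self) {
        self.state = State::Finalized;
    }
}
```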
pub fn set_current_message_id(&mut self, message_id: Option<String>)
Set the current message ID for tracking.
This should be called when processing a MessageStart event that contains
a message identifier. Used to prevent duplicate display of final messages.
§Arguments
- message_id: The unique identifier for this message (or None to clear)
pub fn get_current_message_id(&self) -> Option<&str>
pub fn is_duplicate_final_message(&self, message_id: &str) -> bool
Check if a message ID represents a duplicate final message.
This prevents displaying the same message twice - once after streaming completes and again when the final “Assistant” event arrives.
§Arguments
- message_id: The message ID to check
§Returns
- true: This message has already been displayed (is a duplicate)
- false: This is a new message
pub fn mark_message_displayed(&mut self, message_id: &str)
Mark a message as displayed to prevent duplicate display.
This should be called after displaying a message’s final content.
§Arguments
- message_id: The message ID to mark as displayed
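The pairing of is_duplicate_final_message() and mark_message_displayed() suggests a set of displayed message IDs. The following sketch assumes a HashSet<String>; DisplayedMessages and render_final are hypothetical names, and the session's real storage is private.

```rust
use std::collections::HashSet;

/// Hypothetical tracker of message IDs whose final content was shown.
#[derive(Default)]
struct DisplayedMessages {
    displayed: HashSet<String>,
}

impl DisplayedMessages {
    fn is_duplicate_final_message(&self, message_id: &str) -> bool {
        self.displayed.contains(message_id)
    }

    fn mark_message_displayed(&mut self, message_id: &str) {
        self.displayed.insert(message_id.to_owned());
    }
}

/// Renders a final message at most once per ID; returns whether it rendered.
fn render_final(
    tracker: &mut DisplayedMessages,
    message_id: &str,
    text: &str,
    out: &mut Vec<String>,
) -> bool {
    if tracker.is_duplicate_final_message(message_id) {
        return false;
    }
    out.push(text.to_owned());
    // Mark only after the content was actually emitted.
    tracker.mark_message_displayed(message_id);
    true
}
```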
pub fn mark_message_pre_rendered(&mut self, message_id: &str)
Mark that an assistant event has pre-rendered content BEFORE streaming started.
This is used to handle the case where an assistant event arrives with full content BEFORE any streaming deltas. When this happens, we render the assistant event content and mark the message_id as pre-rendered. ALL subsequent streaming deltas for the same message_id should be suppressed to prevent duplication.
§Arguments
- message_id: The message ID that was pre-rendered
pub fn is_message_pre_rendered(&self, message_id: &str) -> bool
Check if a message was pre-rendered from an assistant event.
This checks if the given message_id was previously rendered from an assistant event (before streaming started). If so, ALL subsequent streaming deltas for this message should be suppressed.
§Arguments
- message_id: The message ID to check
§Returns
- true: This message was pre-rendered; suppress all streaming output
- false: This message was not pre-rendered; allow streaming output
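A sketch of the pre-render flow under simple assumptions: the hypothetical Event enum carries a message ID and text, an assistant event counts as pre-rendered only if no delta for that ID arrived first, and every later delta for a pre-rendered ID is dropped. The streamed-first case is left to the other deduplication checks on this page.

```rust
use std::collections::HashSet;

/// Hypothetical, simplified event shape.
enum Event<'a> {
    Assistant { id: &'a str, text: &'a str },
    Delta { id: &'a str, text: &'a str },
}

/// Concatenates what would be shown, suppressing deltas for pre-rendered IDs.
fn render(events: &[Event<'_>]) -> String {
    let mut pre_rendered: HashSet<&str> = HashSet::new();
    let mut streaming_started: HashSet<&str> = HashSet::new();
    let mut out = String::new();
    for event in events {
        match *event {
            Event::Assistant { id, text } => {
                // Pre-rendered only if no streaming delta for this ID came first;
                // otherwise this sketch leaves it to the other dedup checks.
                if !streaming_started.contains(id) {
                    out.push_str(text);
                    pre_rendered.insert(id);
                }
            }
            Event::Delta { id, text } => {
                if pre_rendered.contains(id) {
                    continue; // the full content is already on screen
                }
                streaming_started.insert(id);
                out.push_str(text);
            }
        }
    }
    out
}
```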
pub fn is_assistant_content_rendered(&self, content_hash: u64) -> bool
Check if assistant event content has already been rendered.
This prevents duplicate assistant events with the same content from being rendered multiple times. GLM/CCS may send multiple assistant events during streaming with the same content but different message_ids.
§Arguments
- content_hash: The hash of the assistant event content
§Returns
- true: This content was already rendered; suppress rendering
- false: This content was not rendered; allow rendering
pub fn mark_assistant_content_rendered(&mut self, content_hash: u64)
Mark assistant event content as having been rendered.
This should be called after rendering an assistant event to prevent duplicate rendering of the same content.
§Arguments
- content_hash: The hash of the assistant event content that was rendered
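Content-hash deduplication can be sketched with a set of u64 hashes, so that two assistant events with different message_ids but identical content collapse to one render. Using std's DefaultHasher is an assumption (the hash function is not documented here), and content_hash and RenderedContent are hypothetical names.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashSet;
use std::hash::{Hash, Hasher};

/// Hashes content to a u64 (DefaultHasher is an assumption of this sketch).
fn content_hash(content: &str) -> u64 {
    let mut hasher = DefaultHasher::new();
    content.hash(&mut hasher);
    hasher.finish()
}

/// Hypothetical set of hashes for assistant content already rendered.
#[derive(Default)]
struct RenderedContent {
    hashes: HashSet<u64>,
}

impl RenderedContent {
    fn is_assistant_content_rendered(&self, hash: u64) -> bool {
        self.hashes.contains(&hash)
    }

    fn mark_assistant_content_rendered(&mut self, hash: u64) {
        self.hashes.insert(hash);
    }
}
```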
pub fn on_content_block_start(&mut self, index: u64)
Mark the start of a content block.
This should be called when:
- Claude: ContentBlockStart event
- Codex: ItemStarted with relevant type
- Gemini: Content section begins
- OpenCode: Part with content starts
If we’re already in a block, this method finalizes the previous block by emitting a newline if output had started.
§Arguments
- index: The content block index (for multi-block messages)
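The block-transition rule (emit a newline only if the previous block actually produced output) can be sketched as follows. This hypothetical Blocks model returns the separator instead of printing it, and assumes the output-started flag resets at every block start.

```rust
/// Hypothetical reduced model: which block is open and whether it printed.
#[derive(Default)]
struct Blocks {
    current: Option<u64>,
    output_started: bool,
}

impl Blocks {
    /// Returns the separator to emit before the new block ("\n" or "").
    fn on_content_block_start(&mut self, index: u64) -> &'static str {
        // Finalize the previous block only if one was open and it printed.
        let sep = if self.current.is_some() && self.output_started { "\n" } else { "" };
        self.current = Some(index);
        self.output_started = false;
        sep
    }

    /// Appends a delta for the current block and records that output began.
    fn on_delta(&mut self, out: &mut String, delta: &str) {
        out.push_str(delta);
        self.output_started = true;
    }
}
```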
pub fn on_text_delta(&mut self, index: u64, delta: &str) -> bool
pub fn on_text_delta_key(&mut self, key: &str, delta: &str) -> bool
Process a text delta with a string key and return whether prefix should be shown.
This variant is for parsers that use string keys instead of numeric indices
(e.g., Codex uses agent_msg, reasoning; Gemini uses main; OpenCode uses main).
§Delta Validation
This method validates that incoming content appears to be a genuine delta
(small chunk) rather than a snapshot (full accumulated content). Large “deltas”
that exceed snapshot_threshold() trigger a warning as they may indicate a
contract violation.
Additionally, we track patterns of delta sizes to detect repeated large content being sent as if it were incremental (a common snapshot-as-delta bug).
§Arguments
- key: The content key (e.g., main, agent_msg, reasoning)
- delta: The text delta to accumulate
§Returns
- true: Show prefix with this delta (first chunk)
- false: Don’t show prefix (subsequent chunks)
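A sketch of the prefix decision combined with the size-based warning. The 200-byte threshold, the DeltaTracker type and its fields are assumptions for illustration: the real threshold comes from snapshot_threshold(), and the real method also tracks delta-size patterns, which this sketch omits.

```rust
use std::collections::HashMap;

/// Hypothetical threshold; the real value comes from snapshot_threshold().
const SNAPSHOT_THRESHOLD: usize = 200;

/// Hypothetical per-key accumulator with optional warnings.
#[derive(Default)]
struct DeltaTracker {
    accumulated: HashMap<String, String>,
    verbose_warnings: bool,
    warnings: Vec<String>,
}

impl DeltaTracker {
    /// Returns true when the prefix should be shown (first chunk for `key`).
    fn on_text_delta_key(&mut self, key: &str, delta: &str) -> bool {
        // A "delta" this large may really be a full snapshot.
        if self.verbose_warnings && delta.len() > SNAPSHOT_THRESHOLD {
            self.warnings.push(format!(
                "large delta for '{key}' ({} bytes): possible snapshot-as-delta",
                delta.len()
            ));
        }
        let first_chunk = !self.accumulated.contains_key(key);
        self.accumulated.entry(key.to_owned()).or_default().push_str(delta);
        first_chunk
    }
}
```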
pub fn on_thinking_delta(&mut self, index: u64, delta: &str) -> bool
pub fn on_thinking_delta_key(&mut self, key: &str, delta: &str) -> bool
Process a thinking delta with a string key and return whether prefix should be shown.
This variant is for parsers that use string keys instead of numeric indices.
§Arguments
- key: The content key (e.g., “reasoning”)
- delta: The thinking delta to accumulate
§Returns
- true: Show prefix with this delta (first chunk)
- false: Don’t show prefix (subsequent chunks)
pub fn on_tool_input_delta(&mut self, index: u64, delta: &str)
Process a tool input delta.
§Arguments
- index: The content block index
- delta: The tool input delta to accumulate
pub fn set_tool_name(&mut self, index: u64, name: Option<String>)
Record the tool name for a specific content block index.
This is used for GLM/CCS deduplication where assistant events contain tool_use blocks (name + input) but streaming only accumulates the input. By tracking the name separately, we can reconstruct the normalized representation for proper hash-based deduplication.
§Arguments
- index: The content block index
- name: The tool name (if available)
pub fn on_message_stop(&mut self) -> bool
Finalize the message on stop event.
This should be called when:
- Claude: MessageStop event
- Codex: TurnCompleted or ItemCompleted with text
- Gemini: Message completion
- OpenCode: Part completion
§Returns
- true: A completion newline should be emitted (was in a content block)
- false: No completion needed (no content block active)
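How a caller might consume the returned flag, shown on a hypothetical reduced model. Clearing the flag on stop, so that a duplicate stop event does not emit a second newline, is an assumption of this sketch rather than documented behavior.

```rust
/// Hypothetical reduced model of the stop handshake.
#[derive(Default)]
struct StopModel {
    in_content_block: bool,
}

impl StopModel {
    fn on_content_block_start(&mut self) {
        self.in_content_block = true;
    }

    /// True when a completion newline should be emitted.
    fn on_message_stop(&mut self) -> bool {
        // mem::take returns the old flag and leaves `false` behind.
        std::mem::take(&mut self.in_content_block)
    }
}
```

A caller would then write something like `if model.on_message_stop() { println!(); }`.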
pub fn has_any_streamed_content(&self) -> bool
Check if ANY content has been streamed for this message.
This is a broader check that returns true if ANY content type has been streamed. Used to skip entire message display when all content was already streamed.
pub fn is_duplicate_by_hash(
    &self,
    content: &str,
    tool_name_hints: Option<&HashMap<usize, String>>,
) -> bool
Check if content matches the previously streamed content by hash.
This is a more precise alternative to has_any_streamed_content() for
deduplication. Instead of checking if ANY content was streamed, this checks
if the EXACT content was streamed by comparing hashes.
This method looks at ALL accumulated content across all content types and indices. If the combined accumulated content matches the input, it returns true.
§Arguments
- content: The content to check (typically normalized content from assistant events)
- tool_name_hints: Optional tool names from the assistant event (keyed by content block index)
§Returns
- true: The content hash matches the previously streamed content
- false: The content is different or no content was streamed
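A sketch of the combined-content comparison, assuming accumulated content is keyed by (content type, index) and concatenated in key order. The Kind enum, the BTreeMap layout and the use of DefaultHasher are assumptions; tool_name_hints is omitted for brevity.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::BTreeMap;
use std::hash::{Hash, Hasher};

/// Hypothetical content kinds; Ord makes BTreeMap keys sort by kind first.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum Kind {
    Text,
    Thinking,
}

fn hash_str(s: &str) -> u64 {
    let mut hasher = DefaultHasher::new();
    s.hash(&mut hasher);
    hasher.finish()
}

/// True if all accumulated content, concatenated, hashes the same as `content`.
fn is_duplicate_by_hash(accumulated: &BTreeMap<(Kind, String), String>, content: &str) -> bool {
    if accumulated.is_empty() {
        return false; // nothing streamed, so nothing to be a duplicate of
    }
    // Concatenate in key order (kind, then index string).
    let combined: String = accumulated.values().map(String::as_str).collect();
    hash_str(&combined) == hash_str(content)
}
```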
pub fn get_accumulated(
    &self,
    content_type: ContentType,
    index: &str,
) -> Option<&str>
pub fn mark_rendered(&mut self, content_type: ContentType, index: &str)
Mark content as having been rendered (HashMap-based tracking).
This should be called after rendering to update the per-key tracking.
§Arguments
- content_type: The type of content
- index: The content index (as string for flexibility)
pub fn has_rendered_prefix(
    &self,
    content_type: ContentType,
    index: &str,
) -> bool
Check if content has been rendered before and starts with previously rendered content.
This method detects when new content extends previously rendered content, indicating an in-place update should be performed (e.g., using carriage return).
With the new KMP + Rolling Hash approach, this checks if output has started for this key, which indicates we’re in an in-place update scenario.
§Arguments
- content_type: The type of content
- index: The content index (as string for flexibility)
§Returns
- true: Output has started for this key (do in-place update)
- false: Output has not started for this key (show new content)
pub fn mark_content_hash_rendered(
    &mut self,
    content_type: ContentType,
    index: &str,
    content: &str,
)
Mark content as rendered using pre-sanitized content.
This method uses the sanitized content (with whitespace normalized) for hash-based deduplication, which prevents duplicates when the accumulated content differs only by whitespace.
§Arguments
- content_type: The type of content
- index: The content index (as string for flexibility)
- content: The content to hash
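Whitespace-insensitive deduplication can be sketched as: sanitize, then hash. The sanitize() rule here (collapse whitespace runs to one space and trim) is a hypothetical stand-in for whatever sanitization the caller performs; like the methods above, the tracker expects already-sanitized input. RenderedHashes and DefaultHasher are assumptions.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashSet;
use std::hash::{Hash, Hasher};

/// Hypothetical sanitizer: collapse whitespace runs to one space and trim.
fn sanitize(content: &str) -> String {
    content.split_whitespace().collect::<Vec<_>>().join(" ")
}

fn content_hash(content: &str) -> u64 {
    let mut hasher = DefaultHasher::new();
    content.hash(&mut hasher);
    hasher.finish()
}

/// Hypothetical set of hashes of sanitized content already rendered.
#[derive(Default)]
struct RenderedHashes {
    hashes: HashSet<u64>,
}

impl RenderedHashes {
    /// Expects content that the caller has already sanitized.
    fn mark_content_hash_rendered(&mut self, sanitized: &str) {
        self.hashes.insert(content_hash(sanitized));
    }

    fn is_content_hash_rendered(&self, sanitized: &str) -> bool {
        self.hashes.contains(&content_hash(sanitized))
    }
}
```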
pub fn is_content_hash_rendered(
    &self,
    _content_type: ContentType,
    _index: &str,
    content: &str,
) -> bool
Check if sanitized content has already been rendered.
This method checks the hash of the sanitized content against the rendered set to prevent duplicate rendering.
§Arguments
- _content_type: The type of content (kept for API consistency)
- _index: The content index (kept for API consistency)
- content: The sanitized content to check
§Returns
- true: This exact content has been rendered before
- false: This exact content has not been rendered
pub fn is_likely_snapshot(&self, text: &str, key: &str) -> bool
Check if incoming text is likely a snapshot (full accumulated content) rather than a delta.
This uses the KMP + Rolling Hash algorithm for efficient O(n+m) snapshot detection. The two-phase approach ensures optimal performance:
- Rolling hash for fast O(n) filtering
- KMP for exact O(n+m) verification
§Arguments
- text: The incoming text to check
- key: The content key to compare against
§Returns
- true: The text appears to be a snapshot (starts with previous accumulated content)
- false: The text appears to be a genuine delta
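The two-phase check can be sketched with a polynomial (Rabin-Karp style) hash that is extended incrementally as deltas accumulate, followed by exact verification. Because this is a pure prefix test, the sketch swaps KMP for a direct byte comparison, which is also linear; Accumulator, BASE and MODULUS are illustrative assumptions, not the session's internals.

```rust
/// Illustrative constants for a polynomial hash modulo a prime (assumptions).
const BASE: u64 = 257;
const MODULUS: u64 = 1_000_000_007;

/// Polynomial hash of a byte slice: h = (h * BASE + byte) mod MODULUS.
fn poly_hash(bytes: &[u8]) -> u64 {
    bytes.iter().fold(0u64, |h, &b| (h * BASE + u64::from(b)) % MODULUS)
}

/// Accumulated content plus its hash, updated incrementally per delta.
struct Accumulator {
    text: String,
    hash: u64,
}

impl Accumulator {
    fn new() -> Self {
        Accumulator { text: String::new(), hash: 0 }
    }

    fn push(&mut self, delta: &str) {
        // Extending the hash byte by byte avoids rehashing the whole text.
        for &b in delta.as_bytes() {
            self.hash = (self.hash * BASE + u64::from(b)) % MODULUS;
        }
        self.text.push_str(delta);
    }

    /// True if `incoming` starts with the (non-empty) accumulated content.
    fn is_likely_snapshot(&self, incoming: &str) -> bool {
        let acc = self.text.as_bytes();
        let inc = incoming.as_bytes();
        if acc.is_empty() || inc.len() < acc.len() {
            return false;
        }
        let prefix = &inc[..acc.len()];
        // Phase 1: a hash mismatch proves the prefix differs.
        if poly_hash(prefix) != self.hash {
            return false;
        }
        // Phase 2: exact comparison rules out hash collisions.
        prefix == acc
    }
}
```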
pub fn extract_delta_from_snapshot(
    &self,
    text: &str,
    key: &str,
) -> Result<usize, String>
Extract the delta portion from a snapshot.
When a snapshot is detected (full accumulated content sent as a “delta”), this method extracts only the new portion that hasn’t been accumulated yet.
§Arguments
- text: The snapshot text (full accumulated content + new content)
- key: The content key to compare against
§Returns
- Ok(usize): The length of the delta portion (new content only)
- Err: If the text is not actually a snapshot (doesn’t start with accumulated content)
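§Note
Returns the length of the delta portion as usize because a reference into text cannot be returned with the correct lifetime here. Since the delta is the trailing part of text, callers can slice it as &text[text.len() - delta_len..], or call get_delta_from_snapshot() to get the slice directly.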
pub fn get_delta_from_snapshot<'a>(
    &self,
    text: &'a str,
    key: &str,
) -> Result<&'a str, String>
Get the delta portion as a string slice from a snapshot.
This is a convenience wrapper that returns the actual substring instead of just the length.
§Returns
- Ok(&str): The delta portion (new content only)
- Err: If the text is not actually a snapshot
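Extracting the delta reduces to stripping the accumulated prefix. This sketch takes the accumulated content as a parameter (an assumption; the real method looks it up by key) and treats empty accumulated content as "not a snapshot", which is also an assumption.

```rust
/// Sketch of snapshot-to-delta extraction using str::strip_prefix.
fn get_delta_from_snapshot<'a>(accumulated: &str, text: &'a str) -> Result<&'a str, String> {
    if accumulated.is_empty() {
        return Err("no accumulated content to compare against".to_string());
    }
    // strip_prefix yields the remainder only if `text` starts with `accumulated`.
    text.strip_prefix(accumulated)
        .ok_or_else(|| "text does not start with accumulated content".to_string())
}
```

Because str::strip_prefix matches a whole string, the returned slice always begins on a UTF-8 character boundary, so no manual byte-offset slicing is needed.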
pub fn get_streaming_quality_metrics(&self) -> StreamingQualityMetrics
Get streaming quality metrics for the current session.
Returns aggregated metrics about delta sizes and streaming patterns during the session. This is useful for debugging and analyzing streaming behavior.
§Returns
Aggregated metrics across all content types and keys.
Trait Implementations§
impl Clone for StreamingSession
fn clone(&self) -> StreamingSession
fn clone_from(&mut self, source: &Self)
Performs copy-assignment from source.
impl Debug for StreamingSession
impl Default for StreamingSession
fn default() -> StreamingSession
Auto Trait Implementations§
impl Freeze for StreamingSession
impl RefUnwindSafe for StreamingSession
impl Send for StreamingSession
impl Sync for StreamingSession
impl Unpin for StreamingSession
impl UnwindSafe for StreamingSession
Blanket Implementations§
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> CloneToUninit for T
where
    T: Clone,
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.