//! Unified streaming state tracking module.
//!
//! This module provides a single source of truth for streaming state across
//! all parsers (`Claude`, `Codex`, `Gemini`, `OpenCode`). It implements the streaming
//! contract:
//!
//! # Streaming Contract
//!
//! 1. **Delta contract**: Each streaming event contains only newly generated text
//! 2. **Message lifecycle**: `MessageStart` → (`ContentBlockStart` + deltas)* → `MessageStop`
//! 3. **Deduplication rule**: Content displayed during streaming is never re-displayed
//! 4. **State reset**: Streaming state resets on `MessageStart`/Init events
//!
//! # Message Lifecycle
//!
//! The streaming message lifecycle follows this sequence:
//!
//! 1. **`MessageStart`** (or equivalent init event):
//!    - Resets all streaming state to `Idle`
//!    - Clears accumulated content
//!    - Resets content block state
//!
//! 2. **`ContentBlockStart`** (optional, for each content block):
//!    - If already in a block with `started_output=true`, finalizes the previous block
//!    - Initializes tracking for the new block index
//!    - Clears accumulated content for the previous block when the index changes
//!
//! 3. **Text/Thinking deltas** (zero or more per block):
//!    - First delta for each `(content_type, index)` shows prefix
//!    - Subsequent deltas update in-place (with prefix, using carriage return)
//!    - Sets `started_output=true` for the current block
//!
//! 4. **`MessageStop`**:
//!    - Finalizes the current content block
//!    - Marks message as displayed to prevent duplicate final output
//!    - Returns whether content was streamed (for emitting completion newline)
//!
//! # Content Block Transitions
//!
//! When transitioning between content blocks (e.g., block 0 → block 1):
//!
//! ```ignore
//! // Streaming "Hello" in block 0
//! session.on_text_delta(0, "Hello");  // started_output = true
//!
//! // Transition to block 1
//! session.on_content_block_start(1);  // Finalizes block 0, started_output was true
//!
//! // Stream "World" in block 1
//! session.on_text_delta(1, "World");  // New block, shows prefix again
//! ```
//!
//! The `ContentBlockState::InBlock { index, started_output }` tracks:
//! - `index`: Which block is currently active
//! - `started_output`: Whether any content was output for this block
//!
//! This state enables proper finalization of previous blocks when new ones start.
//!
//! # Delta Contract
//!
//! This module enforces a strict **delta contract** - all streaming events must
//! contain only the newly generated text (deltas), not the full accumulated content.
//!
//! Treating snapshots as deltas causes exponential duplication bugs. The session
//! validates that incoming content is genuinely delta-sized and rejects likely
//! snapshot-as-delta violations.
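//!
//! For example (illustrative only), after "Hello" has streamed, the next event
//! must carry just the new text, never the full accumulated string:
//!
//! ```ignore
//! session.on_text_delta(0, "Hello");   // ok: a genuine delta
//! session.on_text_delta(0, " world");  // ok: only the newly generated text
//! // A violation would look like this - the accumulated content resent as a "delta":
//! // session.on_text_delta(0, "Hello world");
//! // The session flags such oversized or overlapping deltas and attempts to
//! // repair them rather than appending the duplicated text.
//! ```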
//!
//! # Example
//!
//! ```ignore
//! use crate::json_parser::streaming_state::{StreamingSession, StreamingState};
//!
//! let mut session = StreamingSession::new();
//!
//! // Message starts - reset state
//! session.on_message_start();
//!
//! // Content block starts
//! session.on_content_block_start(0);
//!
//! // Text deltas arrive - accumulate and display
//! let should_show_prefix = session.on_text_delta(0, "Hello");
//! assert!(should_show_prefix); // First chunk shows prefix
//!
//! let should_show_prefix = session.on_text_delta(0, " World");
//! assert!(!should_show_prefix); // Subsequent chunks don't show prefix
//!
//! // Check if content was already streamed (for deduplication)
//! assert!(session.has_any_streamed_content());
//!
//! // Message stops - finalize
//! session.on_message_stop();
//! ```

use crate::json_parser::deduplication::RollingHashWindow;
use crate::json_parser::deduplication::{get_overlap_thresholds, DeltaDeduplicator};
use crate::json_parser::health::StreamingQualityMetrics;
use crate::json_parser::types::ContentType;
use std::collections::hash_map::DefaultHasher;
use std::collections::{HashMap, HashSet};
use std::hash::{Hash, Hasher};
use std::sync::OnceLock;

// Streaming configuration constants

/// Default threshold for detecting snapshot-as-delta violations (in characters).
///
/// Deltas exceeding this size are flagged as potential snapshots. The value of 200
/// characters was chosen because:
/// - Normal deltas are typically < 100 characters (a few tokens)
/// - Snapshots often contain the full accumulated content (200+ chars)
/// - This threshold catches most violations while minimizing false positives
const DEFAULT_SNAPSHOT_THRESHOLD: usize = 200;

/// Minimum allowed snapshot threshold (in characters).
///
/// Values below 50 would cause excessive false positives for normal deltas,
/// as even small text chunks (a sentence or two) routinely exceed 50 characters.
const MIN_SNAPSHOT_THRESHOLD: usize = 50;

/// Maximum allowed snapshot threshold (in characters).
///
/// Values above 1000 would allow malicious snapshots to pass undetected,
/// potentially causing exponential duplication bugs.
const MAX_SNAPSHOT_THRESHOLD: usize = 1000;

/// Minimum number of consecutive large deltas required to trigger pattern detection warning.
///
/// This threshold prevents false positives from occasional large deltas.
/// Three consecutive large deltas indicate a pattern (not a one-off event).
const DEFAULT_PATTERN_DETECTION_MIN_DELTAS: usize = 3;

/// Maximum number of delta sizes to track per content key for pattern detection.
///
/// Tracking recent delta sizes allows us to detect patterns of repeated large
/// content (a sign of snapshot-as-delta bugs). Ten entries provide sufficient
/// history without excessive memory usage.
const DEFAULT_MAX_DELTA_HISTORY: usize = 10;

/// Ralph enforces a **delta contract** for all streaming content.
///
/// Every streaming event must contain only the newly generated text (delta),
/// never the full accumulated content (snapshot).
///
/// # Contract Violations
///
/// If a parser emits snapshot-style content when deltas are expected, it will
/// cause exponential duplication bugs. The `StreamingSession` validates that
/// incoming content is delta-sized and logs warnings when violations are detected.
///
/// # Validation Threshold
///
/// Deltas are expected to be small chunks (typically < 100 chars). If a single
/// "delta" exceeds `snapshot_threshold()` characters, it may indicate a snapshot
/// being treated as a delta.
///
/// # Pattern Detection
///
/// In addition to size threshold, we track patterns of repeated large content
/// which may indicate a snapshot-as-delta bug where the same content is being
/// sent repeatedly as if it were incremental.
///
/// # Environment Variables
///
/// The following environment variables can be set to configure streaming behavior:
///
/// - `RALPH_STREAMING_SNAPSHOT_THRESHOLD`: Threshold for detecting snapshot-as-delta
///   violations (default: 200). Deltas exceeding this size trigger warnings.
///
/// Get the snapshot threshold from environment variable or use default.
///
/// Reads `RALPH_STREAMING_SNAPSHOT_THRESHOLD` env var.
/// Valid range: 50-1000 characters.
/// Falls back to default of 200 if not set or out of range.
fn snapshot_threshold() -> usize {
    static THRESHOLD: OnceLock<usize> = OnceLock::new();
    *THRESHOLD.get_or_init(|| {
        std::env::var("RALPH_STREAMING_SNAPSHOT_THRESHOLD")
            .ok()
            .and_then(|s| s.parse::<usize>().ok())
            .and_then(|v| {
                if (MIN_SNAPSHOT_THRESHOLD..=MAX_SNAPSHOT_THRESHOLD).contains(&v) {
                    Some(v)
                } else {
                    None
                }
            })
            .unwrap_or(DEFAULT_SNAPSHOT_THRESHOLD)
    })
}

/// Streaming state for the current message lifecycle.
///
/// Tracks whether we're in the middle of streaming content and whether
/// that content has been displayed to the user.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum StreamingState {
    /// No active streaming - idle state
    #[default]
    Idle,
    /// Currently streaming content deltas
    Streaming,
    /// Content has been finalized (after `MessageStop` or equivalent)
    Finalized,
}

/// State tracking for content blocks during streaming.
///
/// Replaces the boolean `in_content_block` with richer state that tracks
/// which block is active and whether output has started for that block.
/// This prevents "glued text" bugs where block boundaries are crossed
/// without proper finalization.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum ContentBlockState {
    /// Not currently inside a content block
    #[default]
    NotInBlock,
    /// Inside a content block with tracking for output state
    InBlock {
        /// The block index/identifier
        index: String,
        /// Whether any content has been output for this block
        started_output: bool,
    },
}

/// Unified streaming session tracker.
///
/// Provides a single source of truth for streaming state across all parsers.
/// Tracks:
/// - Current streaming state (`Idle`/`Streaming`/`Finalized`)
/// - Which content types have been streamed
/// - Accumulated content by content type and index
/// - Whether prefix should be shown on next delta
/// - Delta size patterns for detecting snapshot-as-delta violations
/// - Persistent "output started" tracking independent of accumulated content
/// - Verbosity-aware warning emission
///
/// # Lifecycle
///
/// 1. **Start**: `on_message_start()` - resets all state
/// 2. **Stream**: `on_text_delta()` / `on_thinking_delta()` - accumulate content
/// 3. **Stop**: `on_message_stop()` - finalize the message
/// 4. **Repeat**: Back to step 1 for next message
#[derive(Debug, Default, Clone)]
pub struct StreamingSession {
    /// Current streaming state
    state: StreamingState,
    /// Track which content types have been streamed (for deduplication)
    /// Maps `ContentType` → whether it has been streamed
    streamed_types: HashMap<ContentType, bool>,
    /// Track the current content block state
    current_block: ContentBlockState,
    /// Accumulated content by (`content_type`, index) for display
    /// This mirrors `DeltaAccumulator` but adds deduplication tracking
    accumulated: HashMap<(ContentType, String), String>,
    /// Track the order of keys for `most_recent` operations
    key_order: Vec<(ContentType, String)>,
    /// Track recent delta sizes for pattern detection
    /// Maps `(content_type, key)` → vec of recent delta sizes
    delta_sizes: HashMap<(ContentType, String), Vec<usize>>,
    /// Maximum number of delta sizes to track per key
    max_delta_history: usize,
    /// Track the current message ID for duplicate detection
    current_message_id: Option<String>,
    /// Track the last finalized message ID to detect duplicates
    last_finalized_message_id: Option<String>,
    /// Track which messages have been displayed to prevent duplicate final output
    displayed_final_messages: HashSet<String>,
    /// Track which (`content_type`, key) pairs have had output started.
    /// This is independent of `accumulated` to handle cases where accumulated
    /// content may be cleared (e.g., repeated `ContentBlockStart` for same index).
    /// Cleared on `on_message_start` to ensure fresh state for each message.
    output_started_for_key: HashSet<(ContentType, String)>,
    /// Whether to emit verbose warnings about streaming anomalies.
    /// When false, suppresses diagnostic warnings that are useful for debugging
    /// but noisy in production (e.g., GLM protocol violations, snapshot detection).
    verbose_warnings: bool,
    /// Count of snapshot repairs performed during this session
    snapshot_repairs_count: usize,
    /// Count of deltas that exceeded the size threshold
    large_delta_count: usize,
    /// Count of protocol violations detected (e.g., `MessageStart` during streaming)
    protocol_violations: usize,
    /// Hash of the final streamed content (for deduplication)
    /// Computed at `message_stop` using all accumulated content
    final_content_hash: Option<u64>,
    /// Track the last rendered content for each key to detect when rendering
    /// would produce identical output (prevents visual repetition).
    /// Maps `(content_type, key)` → the last accumulated content that was rendered.
    last_rendered: HashMap<(ContentType, String), String>,
    /// Track all rendered content hashes for duplicate detection.
    /// This is preserved across `MessageStart` boundaries to prevent duplicate rendering.
    rendered_content_hashes: HashSet<u64>,
    /// Track the last delta for each key to detect exact duplicate deltas.
    /// This is preserved across `MessageStart` boundaries to prevent duplicate processing.
    /// Maps `(content_type, key)` → the last delta that was processed.
    last_delta: HashMap<(ContentType, String), String>,
    /// Track consecutive duplicates for resend glitch detection ("3 strikes" heuristic).
    /// Maps `(content_type, key)` → (count, `delta_hash`) where count tracks how many
    /// times the exact same delta has arrived consecutively. When count exceeds
    /// the threshold, the delta is dropped as a resend glitch.
    consecutive_duplicates: HashMap<(ContentType, String), (usize, u64)>,
    /// Delta deduplicator using KMP and rolling hash for snapshot detection.
    /// Provides O(n+m) guaranteed complexity for detecting snapshot-as-delta violations.
    /// Cleared on message boundaries to prevent false positives.
    deduplicator: DeltaDeduplicator,
    /// Track message IDs that have been fully rendered from an assistant event BEFORE streaming.
    /// When an assistant event arrives before streaming deltas, we render it and record
    /// the message_id. ALL subsequent streaming deltas for that message_id should be
    /// suppressed to prevent duplication.
    pre_rendered_message_ids: HashSet<String>,
    /// Track content hashes of assistant events that have been rendered during streaming.
    /// This prevents duplicate assistant events with the same content from being rendered
    /// multiple times. GLM/CCS may send multiple assistant events during streaming with
    /// the same content but different message_ids.
    /// This is preserved across `MessageStart` boundaries to handle mid-stream assistant events.
    rendered_assistant_content_hashes: HashSet<u64>,
    /// Track tool names by index for GLM/CCS deduplication.
    /// GLM sends assistant events with tool_use blocks (name + input) during streaming,
    /// but only the input is accumulated via deltas. We track the tool name to properly
    /// reconstruct the normalized representation for deduplication.
    /// Maps the content block index to the tool name.
    tool_names: HashMap<u64, Option<String>>,
}

impl StreamingSession {
    /// Create a new streaming session.
    pub fn new() -> Self {
        Self {
            max_delta_history: DEFAULT_MAX_DELTA_HISTORY,
            verbose_warnings: false,
            ..Default::default()
        }
    }

    /// Configure whether to emit verbose warnings about streaming anomalies.
    ///
    /// When enabled, diagnostic warnings are printed for:
    /// - Repeated `MessageStart` events (GLM protocol violations)
    /// - Large deltas that may indicate snapshot-as-delta bugs
    /// - Pattern detection of repeated large content
    ///
    /// When disabled (default), these warnings are suppressed to avoid
    /// noise in production output.
    ///
    /// # Arguments
    /// * `enabled` - Whether to enable verbose warnings
    ///
    /// # Returns
    /// The modified session for builder chaining.
    ///
    /// # Example
    ///
    /// ```ignore
    /// let mut session = StreamingSession::new().with_verbose_warnings(true);
    /// ```
    pub const fn with_verbose_warnings(mut self, enabled: bool) -> Self {
        self.verbose_warnings = enabled;
        self
    }

    /// Reset the session on new message start.
    ///
    /// This should be called when:
    /// - Claude: `MessageStart` event
    /// - Codex: `TurnStarted` event
    /// - Gemini: `init` event or new message
    /// - `OpenCode`: New part starts
    ///
    /// This method takes no arguments; the message ID, if any, is recorded
    /// separately via `set_current_message_id`.
    ///
    /// # Note on Repeated `MessageStart` Events
    ///
    /// Some agents (notably GLM/ccs-glm) send repeated `MessageStart` events during
    /// a single logical streaming session. When this happens while state is `Streaming`,
    /// we preserve `output_started_for_key` to prevent prefix spam on each delta that
    /// follows the repeated `MessageStart`. This is a defensive measure to handle
    /// non-standard agent protocols while maintaining correct behavior for legitimate
    /// multi-message scenarios.
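    ///
    /// # Example
    ///
    /// A rough sketch of the mid-stream restart handling described above:
    ///
    /// ```ignore
    /// let mut session = StreamingSession::new();
    /// session.on_message_start();
    /// assert!(session.on_text_delta(0, "Hello"));   // first delta shows prefix
    ///
    /// // A non-standard agent repeats MessageStart while we are still streaming.
    /// session.on_message_start();
    ///
    /// // Output tracking was preserved, so the next delta does not re-show the prefix.
    /// assert!(!session.on_text_delta(0, " world"));
    /// ```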
    pub fn on_message_start(&mut self) {
        // Detect repeated MessageStart during active streaming
        let is_mid_stream_restart = self.state == StreamingState::Streaming;

        if is_mid_stream_restart {
            // Track protocol violation
            self.protocol_violations += 1;
            // Log the contract violation for debugging (only if verbose warnings enabled)
            if self.verbose_warnings {
                eprintln!(
                    "Warning: Received MessageStart while state is Streaming. \
                    This indicates a non-standard agent protocol (e.g., GLM sending \
                    repeated MessageStart events). Preserving output_started_for_key \
                    to prevent prefix spam. File: streaming_state.rs, Line: {}",
                    line!()
                );
            }

            // Preserve output_started_for_key to prevent prefix spam.
            // std::mem::take replaces the HashSet with an empty one and returns the old values,
            // which we restore after clearing other state. This ensures repeated MessageStart
            // events don't reset output tracking, preventing duplicate prefix display.
            let preserved_output_started = std::mem::take(&mut self.output_started_for_key);

            // Also preserve last_delta to detect duplicate deltas across MessageStart boundaries
            let preserved_last_delta = std::mem::take(&mut self.last_delta);

            // Also preserve rendered_content_hashes to detect duplicate rendering across MessageStart
            let preserved_rendered_hashes = std::mem::take(&mut self.rendered_content_hashes);

            // Also preserve consecutive_duplicates to detect resend glitches across MessageStart
            let preserved_consecutive_duplicates = std::mem::take(&mut self.consecutive_duplicates);

            self.state = StreamingState::Idle;
            self.streamed_types.clear();
            self.current_block = ContentBlockState::NotInBlock;
            self.accumulated.clear();
            self.key_order.clear();
            self.delta_sizes.clear();
            self.last_rendered.clear();
            self.deduplicator.clear();
            self.tool_names.clear();

            // Restore preserved state
            self.output_started_for_key = preserved_output_started;
            self.last_delta = preserved_last_delta;
            self.rendered_content_hashes = preserved_rendered_hashes;
            self.consecutive_duplicates = preserved_consecutive_duplicates;
        } else {
            // Normal reset for new message
            self.state = StreamingState::Idle;
            self.streamed_types.clear();
            self.current_block = ContentBlockState::NotInBlock;
            self.accumulated.clear();
            self.key_order.clear();
            self.delta_sizes.clear();
            self.output_started_for_key.clear();
            self.last_rendered.clear();
            self.last_delta.clear();
            self.rendered_content_hashes.clear();
            self.consecutive_duplicates.clear();
            self.deduplicator.clear();
            self.tool_names.clear();
        }
        // Note: We don't reset current_message_id here - it's set by a separate method
        // This allows for more flexible message ID handling
    }

    /// Set the current message ID for tracking.
    ///
    /// This should be called when processing a `MessageStart` event that contains
    /// a message identifier. Used to prevent duplicate display of final messages.
    ///
    /// # Arguments
    /// * `message_id` - The unique identifier for this message (or None to clear)
    pub fn set_current_message_id(&mut self, message_id: Option<String>) {
        self.current_message_id = message_id;
    }

    /// Get the current message ID.
    ///
    /// # Returns
    /// * `Some(id)` - The current message ID
    /// * `None` - No message ID is set
    pub fn get_current_message_id(&self) -> Option<&str> {
        self.current_message_id.as_deref()
    }

    /// Check if a message ID represents a duplicate final message.
    ///
    /// This prevents displaying the same message twice - once after streaming
    /// completes and again when the final "Assistant" event arrives.
    ///
    /// # Arguments
    /// * `message_id` - The message ID to check
    ///
    /// # Returns
    /// * `true` - This message has already been displayed (is a duplicate)
    /// * `false` - This is a new message
    pub fn is_duplicate_final_message(&self, message_id: &str) -> bool {
        self.displayed_final_messages.contains(message_id)
    }

    /// Mark a message as displayed to prevent duplicate display.
    ///
    /// This should be called after displaying a message's final content.
    ///
    /// # Arguments
    /// * `message_id` - The message ID to mark as displayed
    pub fn mark_message_displayed(&mut self, message_id: &str) {
        self.displayed_final_messages.insert(message_id.to_string());
        self.last_finalized_message_id = Some(message_id.to_string());
    }

    /// Mark that an assistant event has pre-rendered content BEFORE streaming started.
    ///
    /// This is used to handle the case where an assistant event arrives with full content
    /// BEFORE any streaming deltas. When this happens, we render the assistant event content
    /// and mark the message_id as pre-rendered. ALL subsequent streaming deltas for the
    /// same message_id should be suppressed to prevent duplication.
    ///
    /// # Arguments
    /// * `message_id` - The message ID that was pre-rendered
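    ///
    /// # Example
    ///
    /// A minimal sketch of the intended call pattern (the message ID is illustrative):
    ///
    /// ```ignore
    /// // An assistant event with full content arrives before any deltas:
    /// session.mark_message_pre_rendered("msg_123");
    ///
    /// // Later, when streaming deltas for the same message arrive, the caller checks:
    /// if session.is_message_pre_rendered("msg_123") {
    ///     // suppress the streamed output to avoid duplicating what was already rendered
    /// }
    /// ```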
    pub fn mark_message_pre_rendered(&mut self, message_id: &str) {
        self.pre_rendered_message_ids.insert(message_id.to_string());
    }

    /// Check if a message was pre-rendered from an assistant event.
    ///
    /// This checks if the given message_id was previously rendered from an assistant
    /// event (before streaming started). If so, ALL subsequent streaming deltas for
    /// this message should be suppressed.
    ///
    /// # Arguments
    /// * `message_id` - The message ID to check
    ///
    /// # Returns
    /// * `true` - This message was pre-rendered, suppress all streaming output
    /// * `false` - This message was not pre-rendered, allow streaming output
    pub fn is_message_pre_rendered(&self, message_id: &str) -> bool {
        self.pre_rendered_message_ids.contains(message_id)
    }

    /// Check if assistant event content has already been rendered.
    ///
    /// This prevents duplicate assistant events with the same content from being rendered
    /// multiple times. GLM/CCS may send multiple assistant events during streaming with
    /// the same content but different message_ids.
    ///
    /// # Arguments
    /// * `content_hash` - The hash of the assistant event content
    ///
    /// # Returns
    /// * `true` - This content was already rendered, suppress rendering
    /// * `false` - This content was not rendered, allow rendering
    pub fn is_assistant_content_rendered(&self, content_hash: u64) -> bool {
        self.rendered_assistant_content_hashes
            .contains(&content_hash)
    }

    /// Mark assistant event content as having been rendered.
    ///
    /// This should be called after rendering an assistant event to prevent
    /// duplicate rendering of the same content.
    ///
    /// # Arguments
    /// * `content_hash` - The hash of the assistant event content that was rendered
    pub fn mark_assistant_content_rendered(&mut self, content_hash: u64) {
        self.rendered_assistant_content_hashes.insert(content_hash);
    }

    /// Mark the start of a content block.
    ///
    /// This should be called when:
    /// - Claude: `ContentBlockStart` event
    /// - Codex: `ItemStarted` with relevant type
    /// - Gemini: Content section begins
    /// - `OpenCode`: Part with content starts
    ///
    /// If we're already in a block, this method finalizes the previous block
    /// by emitting a newline if output had started.
    ///
    /// # Arguments
    /// * `index` - The content block index (for multi-block messages)
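    ///
    /// # Example
    ///
    /// A sketch of the repeated-start behavior (same index vs. a new index):
    ///
    /// ```ignore
    /// session.on_content_block_start(0);
    /// assert!(session.on_text_delta(0, "Hi"));      // first delta for block 0
    ///
    /// // A repeated start for the SAME index keeps accumulated content and
    /// // output tracking, so the prefix is not shown again:
    /// session.on_content_block_start(0);
    /// assert!(!session.on_text_delta(0, " there"));
    ///
    /// // Starting a DIFFERENT index finalizes block 0 and clears its state:
    /// session.on_content_block_start(1);
    /// assert!(session.on_text_delta(1, "Next"));    // new block shows the prefix
    /// ```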
    pub fn on_content_block_start(&mut self, index: u64) {
        let index_str = index.to_string();

        // Check if we're transitioning to a different index BEFORE finalizing.
        // This is important because some agents (e.g., GLM) may send ContentBlockStart
        // repeatedly for the same index, and we should NOT clear accumulated content
        // in that case (which would cause the next delta to show prefix again).
        let (is_same_index, old_index) = match &self.current_block {
            ContentBlockState::NotInBlock => (false, None),
            ContentBlockState::InBlock {
                index: current_index,
                ..
            } => (current_index == &index_str, Some(current_index.clone())),
        };

        // Finalize previous block if we're in one
        self.ensure_content_block_finalized();

        // Only clear accumulated content if transitioning to a DIFFERENT index.
        // We clear the OLD index's content, not the new one.
        if !is_same_index {
            if let Some(old) = old_index {
                for content_type in [
                    ContentType::Text,
                    ContentType::Thinking,
                    ContentType::ToolInput,
                ] {
                    let key = (content_type, old.clone());
                    self.accumulated.remove(&key);
                    self.key_order.retain(|k| k != &key);
                    // Also clear output_started tracking to ensure prefix shows when switching back
                    self.output_started_for_key.remove(&key);
                    // Clear delta sizes for the old index to prevent incorrect pattern detection
                    self.delta_sizes.remove(&key);
                    self.last_rendered.remove(&key);
                    // Clear consecutive duplicates for the old index
                    self.consecutive_duplicates.remove(&key);
                }
            }
        }

        // Initialize the new content block
        self.current_block = ContentBlockState::InBlock {
            index: index_str,
            started_output: false,
        };
    }

    /// Ensure the current content block is finalized.
    ///
    /// If we're in a block and output has started, this returns true to indicate
    /// that a newline should be emitted. This prevents "glued text" bugs where
    /// content from different blocks is concatenated without separation.
    ///
    /// # Returns
    /// * `true` - A newline should be emitted (output had started)
    /// * `false` - No newline needed (no output or not in a block)
    fn ensure_content_block_finalized(&mut self) -> bool {
        if let ContentBlockState::InBlock { started_output, .. } = &self.current_block {
            let had_output = *started_output;
            self.current_block = ContentBlockState::NotInBlock;
            had_output
        } else {
            false
        }
    }

    /// Assert that the session is in a valid lifecycle state.
    ///
    /// In debug builds, this will panic if the current state doesn't match
    /// any of the expected states. In release builds, this does nothing.
    ///
    /// # Arguments
    /// * `expected` - Slice of acceptable states
    fn assert_lifecycle_state(&self, expected: &[StreamingState]) {
        #[cfg(debug_assertions)]
        assert!(
            expected.contains(&self.state),
            "Invalid lifecycle state: expected {:?}, got {:?}. \
            This indicates a bug in the parser's event handling.",
            expected,
            self.state
        );
        #[cfg(not(debug_assertions))]
        let _ = expected;
    }

    /// Process a text delta and return whether prefix should be shown.
    ///
    /// # Arguments
    /// * `index` - The content block index
    /// * `delta` - The text delta to accumulate
    ///
    /// # Returns
    /// * `true` - Show prefix with this delta (first chunk)
    /// * `false` - Don't show prefix (subsequent chunks)
    pub fn on_text_delta(&mut self, index: u64, delta: &str) -> bool {
        self.on_text_delta_key(&index.to_string(), delta)
    }

    /// Check for consecutive duplicate delta using the "3 strikes" heuristic.
    ///
    /// Detects resend glitches where the exact same delta arrives repeatedly.
    /// Returns true if the delta should be dropped (exceeded threshold), false otherwise.
    ///
    /// # Arguments
    /// * `content_key` - The content key to check
    /// * `delta` - The delta to check
    /// * `key_str` - The string key for logging
    ///
    /// # Returns
    /// * `true` - The delta should be dropped (consecutive duplicate exceeded threshold)
    /// * `false` - The delta should be processed
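    ///
    /// # Example
    ///
    /// Conceptually (exact counts depend on the configured thresholds):
    ///
    /// ```ignore
    /// session.on_text_delta(0, "same chunk"); // accumulated normally
    /// session.on_text_delta(0, "same chunk"); // counted as a consecutive duplicate
    /// session.on_text_delta(0, "same chunk"); // once the threshold is reached,
    ///                                         // the repeated delta is dropped
    /// ```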
    fn check_consecutive_duplicate(
        &mut self,
        content_key: &(ContentType, String),
        delta: &str,
        key_str: &str,
    ) -> bool {
        let delta_hash = RollingHashWindow::compute_hash(delta);
        let thresholds = get_overlap_thresholds();

        if let Some((count, prev_hash)) = self.consecutive_duplicates.get_mut(content_key) {
            if *prev_hash == delta_hash {
                *count += 1;
                // Check if we've exceeded the consecutive duplicate threshold
                if *count >= thresholds.consecutive_duplicate_threshold {
                    // This is a resend glitch - drop the delta entirely
                    if self.verbose_warnings {
                        eprintln!(
                            "Warning: Dropping consecutive duplicate delta (count={count}, threshold={}). \
                            This appears to be a resend glitch. Key: '{key_str}', Delta: {delta:?}",
                            thresholds.consecutive_duplicate_threshold
                        );
                    }
                    // Don't update last_delta - preserve previous for comparison
                    return true;
                }
            } else {
                // Different delta - reset count and update hash
                *count = 1;
                *prev_hash = delta_hash;
            }
        } else {
            // First occurrence of this delta
            self.consecutive_duplicates
                .insert(content_key.clone(), (1, delta_hash));
        }

        false
    }

    /// Process a text delta with a string key and return whether prefix should be shown.
    ///
    /// This variant is for parsers that use string keys instead of numeric indices
    /// (e.g., Codex uses `agent_msg`, `reasoning`; Gemini uses `main`; `OpenCode` uses `main`).
    ///
    /// # Delta Validation
    ///
    /// This method validates that incoming content appears to be a genuine delta
    /// (small chunk) rather than a snapshot (full accumulated content). Large "deltas"
    /// that exceed `snapshot_threshold()` trigger a warning as they may indicate a
    /// contract violation.
    ///
    /// Additionally, we track patterns of delta sizes to detect repeated large
    /// content being sent as if it were incremental (a common snapshot-as-delta bug).
    ///
    /// # Arguments
    /// * `key` - The content key (e.g., `main`, `agent_msg`, `reasoning`)
    /// * `delta` - The text delta to accumulate
    ///
    /// # Returns
    /// * `true` - Show prefix with this delta (first chunk)
    /// * `false` - Don't show prefix (subsequent chunks)
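    ///
    /// # Example
    ///
    /// A minimal sketch using a string key (as the Codex parser does):
    ///
    /// ```ignore
    /// let mut session = StreamingSession::new();
    /// session.on_message_start();
    ///
    /// assert!(session.on_text_delta_key("agent_msg", "Hello"));   // prefix on first chunk
    /// assert!(!session.on_text_delta_key("agent_msg", " world")); // no prefix afterwards
    /// assert_eq!(
    ///     session.get_accumulated(ContentType::Text, "agent_msg"),
    ///     Some("Hello world"),
    /// );
    /// ```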
    pub fn on_text_delta_key(&mut self, key: &str, delta: &str) -> bool {
        // Lifecycle enforcement: deltas should only arrive during streaming
        // or idle (first delta starts streaming), never after finalization
        self.assert_lifecycle_state(&[StreamingState::Idle, StreamingState::Streaming]);

        let content_key = (ContentType::Text, key.to_string());
        let delta_size = delta.len();

        // Track delta size and warn on large deltas BEFORE duplicate check
        // This ensures we track all received deltas even if they're duplicates
        if delta_size > snapshot_threshold() {
            self.large_delta_count += 1;
            if self.verbose_warnings {
                eprintln!(
                    "Warning: Large delta ({delta_size} chars) for key '{key}'. \
                    This may indicate unusual streaming behavior or a snapshot being sent as a delta."
                );
            }
        }

        // Track delta size for pattern detection
        {
            let sizes = self.delta_sizes.entry(content_key.clone()).or_default();
            sizes.push(delta_size);

            // Keep only the most recent delta sizes
            if sizes.len() > self.max_delta_history {
                sizes.remove(0);
            }
        }

        // Check for exact duplicate delta (same delta sent twice)
        // This handles the ccs-glm repeated MessageStart scenario where the same
        // delta is sent multiple times. We skip processing exact duplicates ONLY when
        // the accumulated content is empty (indicating we just had a MessageStart and
        // this is a true duplicate, not just a repeated token in normal streaming).
        if let Some(last) = self.last_delta.get(&content_key) {
            if delta == last {
                // Check if accumulated content is empty (just after MessageStart)
                if let Some(current_accumulated) = self.accumulated.get(&content_key) {
                    // If accumulated content is empty, this is likely a ccs-glm duplicate
                    if current_accumulated.is_empty() {
                        // Skip without updating last_delta (to preserve previous delta for comparison)
                        return false;
                    }
                } else {
                    // No accumulated content yet, definitely after MessageStart
                    // Skip without updating last_delta
                    return false;
                }
            }
        }

        // Consecutive duplicate detection ("3 strikes" heuristic)
        // Detects resend glitches where the exact same delta arrives repeatedly.
        // This is different from the above check - it tracks HOW MANY TIMES
        // the same delta has arrived consecutively, not just if it matches once.
        if self.check_consecutive_duplicate(&content_key, delta, key) {
            return false;
        }

        // Auto-repair: Check if this is a snapshot being sent as a delta
        // Do this BEFORE any mutable borrows so we can use immutable methods.
        // Use content-based detection which is more reliable than size-based alone.
        let is_snapshot = self.is_likely_snapshot(delta, key);
        let actual_delta = if is_snapshot {
            // Extract only the new portion to prevent exponential duplication
            match self.get_delta_from_snapshot(delta, key) {
                Ok(extracted) => {
                    // Track successful snapshot repair
                    self.snapshot_repairs_count += 1;
                    extracted.to_string()
                }
                Err(e) => {
                    // Snapshot detection had a false positive - use the original delta
                    if self.verbose_warnings {
                        eprintln!(
                            "Warning: Snapshot extraction failed: {e}. Using original delta."
                        );
                    }
                    delta.to_string()
                }
            }
        } else {
            // Genuine delta - use as-is
            delta.to_string()
        };

        // Pattern detection: Check if we're seeing repeated large deltas
        // This indicates the same content is being sent repeatedly (snapshot-as-delta)
        let sizes = self.delta_sizes.get(&content_key);
        if let Some(sizes) = sizes {
            if sizes.len() >= DEFAULT_PATTERN_DETECTION_MIN_DELTAS && self.verbose_warnings {
                // Check if at least 3 of the last N deltas were large
                let large_count = sizes.iter().filter(|&&s| s > snapshot_threshold()).count();
                if large_count >= DEFAULT_PATTERN_DETECTION_MIN_DELTAS {
                    eprintln!(
                        "Warning: Detected pattern of {large_count} large deltas for key '{key}'. \
                        This strongly suggests a snapshot-as-delta bug where the same \
                        large content is being sent repeatedly. File: streaming_state.rs, Line: {}",
                        line!()
                    );
                }
            }
        }

        // If the actual delta is empty (identical content detected), skip processing
        if actual_delta.is_empty() {
            // Return false to indicate no prefix should be shown (content unchanged)
            return false;
        }

        // Mark that we're streaming text content
        self.streamed_types.insert(ContentType::Text, true);
        self.state = StreamingState::Streaming;

        // Update block state to track this block and mark output as started
        self.current_block = ContentBlockState::InBlock {
            index: key.to_string(),
            started_output: true,
        };

        // Check if this is the first delta for this key using output_started_for_key
        // This is independent of accumulated content to handle cases where accumulated
        // content may be cleared (e.g., repeated ContentBlockStart for same index)
        let is_first = !self.output_started_for_key.contains(&content_key);

        // Mark that output has started for this key
        self.output_started_for_key.insert(content_key.clone());

        // Accumulate the delta (using auto-repaired delta if snapshot was detected)
        self.accumulated
            .entry(content_key.clone())
            .and_modify(|buf| buf.push_str(&actual_delta))
            .or_insert_with(|| actual_delta);

        // Track the last delta for duplicate detection
        // Use the original delta for tracking (not the auto-repaired version)
        self.last_delta
            .insert(content_key.clone(), delta.to_string());

        // Track order
        if is_first {
            self.key_order.push(content_key);
        }

        // Show prefix only on the very first delta
        is_first
    }

    /// Process a thinking delta and return whether prefix should be shown.
    ///
    /// # Arguments
    /// * `index` - The content block index
    /// * `delta` - The thinking delta to accumulate
    ///
    /// # Returns
    /// * `true` - Show prefix with this delta (first chunk)
    /// * `false` - Don't show prefix (subsequent chunks)
    pub fn on_thinking_delta(&mut self, index: u64, delta: &str) -> bool {
        self.on_thinking_delta_key(&index.to_string(), delta)
    }

    /// Process a thinking delta with a string key and return whether prefix should be shown.
    ///
    /// This variant is for parsers that use string keys instead of numeric indices.
    ///
    /// # Arguments
    /// * `key` - The content key (e.g., "reasoning")
    /// * `delta` - The thinking delta to accumulate
    ///
    /// # Returns
    /// * `true` - Show prefix with this delta (first chunk)
    /// * `false` - Don't show prefix (subsequent chunks)
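    ///
    /// # Example
    ///
    /// A minimal sketch, mirroring the text variant:
    ///
    /// ```ignore
    /// assert!(session.on_thinking_delta_key("reasoning", "Considering"));  // prefix
    /// assert!(!session.on_thinking_delta_key("reasoning", " options"));    // no prefix
    /// ```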
    pub fn on_thinking_delta_key(&mut self, key: &str, delta: &str) -> bool {
        // Mark that we're streaming thinking content
        self.streamed_types.insert(ContentType::Thinking, true);
        self.state = StreamingState::Streaming;

        // Get the key for this content
        let content_key = (ContentType::Thinking, key.to_string());

        // Check if this is the first delta for this key using output_started_for_key
        let is_first = !self.output_started_for_key.contains(&content_key);

        // Mark that output has started for this key
        self.output_started_for_key.insert(content_key.clone());

        // Accumulate the delta
        self.accumulated
            .entry(content_key.clone())
            .and_modify(|buf| buf.push_str(delta))
            .or_insert_with(|| delta.to_string());

        // Track order
        if is_first {
            self.key_order.push(content_key);
        }

        is_first
    }

    /// Process a tool input delta.
    ///
    /// # Arguments
    /// * `index` - The content block index
    /// * `delta` - The tool input delta to accumulate
    pub fn on_tool_input_delta(&mut self, index: u64, delta: &str) {
        // Mark that we're streaming tool input
        self.streamed_types.insert(ContentType::ToolInput, true);
        self.state = StreamingState::Streaming;

        // Get the key for this content
        let key = (ContentType::ToolInput, index.to_string());

        // Accumulate the delta
        self.accumulated
            .entry(key.clone())
            .and_modify(|buf| buf.push_str(delta))
            .or_insert_with(|| delta.to_string());

        // Track order
        if !self.key_order.contains(&key) {
            self.key_order.push(key);
        }
    }

    /// Record the tool name for a specific content block index.
    ///
    /// This is used for GLM/CCS deduplication where assistant events contain
    /// tool_use blocks (name + input) but streaming only accumulates the input.
    /// By tracking the name separately, we can reconstruct the normalized
    /// representation for proper hash-based deduplication.
    ///
    /// # Arguments
    /// * `index` - The content block index
    /// * `name` - The tool name (if available)
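    ///
    /// # Example
    ///
    /// A sketch of how the recorded name combines with accumulated tool input for
    /// deduplication; the tool name and input are illustrative:
    ///
    /// ```ignore
    /// session.set_tool_name(0, Some("Bash".to_string()));
    /// session.on_tool_input_delta(0, r#"{"command":"ls"}"#);
    ///
    /// // Assuming only tool input was streamed, an assistant event normalized as
    /// // "TOOL_USE:{name}:{input}" is now recognized as a duplicate:
    /// assert!(session.is_duplicate_by_hash(r#"TOOL_USE:Bash:{"command":"ls"}"#, None));
    /// ```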
    pub fn set_tool_name(&mut self, index: u64, name: Option<String>) {
        self.tool_names.insert(index, name);
    }

    /// Finalize the message on stop event.
    ///
    /// This should be called when:
    /// - Claude: `MessageStop` event
    /// - Codex: `TurnCompleted` or `ItemCompleted` with text
    /// - Gemini: Message completion
    /// - `OpenCode`: Part completion
    ///
    /// # Returns
    /// * `true` - A completion newline should be emitted (was in a content block)
    /// * `false` - No completion needed (no content block active)
    pub fn on_message_stop(&mut self) -> bool {
        let was_in_block = self.ensure_content_block_finalized();
        self.state = StreamingState::Finalized;

        // Compute content hash for deduplication
        self.final_content_hash = self.compute_content_hash();

        // Mark the current message as displayed to prevent duplicate display
        // when the final "Assistant" event arrives
        if let Some(message_id) = self.current_message_id.clone() {
            self.mark_message_displayed(&message_id);
        }

        was_in_block
    }

    /// Check if ANY content has been streamed for this message.
    ///
    /// This is a broader check that returns true if ANY content type
    /// has been streamed. Used to skip entire message display when
    /// all content was already streamed.
    pub fn has_any_streamed_content(&self) -> bool {
        !self.streamed_types.is_empty()
    }

    /// Compute hash of all accumulated content for deduplication.
    ///
    /// This computes a hash of ALL accumulated content across all content types
    /// and indices. This is used to detect if a final message contains the same
    /// content that was already streamed.
    ///
    /// # Returns
    /// * `Some(hash)` - Hash of all accumulated content, or None if no content
    fn compute_content_hash(&self) -> Option<u64> {
        if self.accumulated.is_empty() {
            return None;
        }

        let mut hasher = DefaultHasher::new();

        // Collect and sort keys for consistent hashing
        // Sort by numeric index (not lexicographic) to preserve actual message order.
        // This is critical for correctness when indices don't sort lexicographically
        // (e.g., 0, 1, 10, 2 should sort as 0, 1, 2, 10).
        let mut keys: Vec<_> = self.accumulated.keys().collect();
        keys.sort_by_key(|k| {
            let type_order = match k.0 {
                ContentType::Text => 0,
                ContentType::ToolInput => 1,
                ContentType::Thinking => 2,
            };
            let index = k.1.parse::<u64>().unwrap_or(u64::MAX);
            (index, type_order)
        });

        for key in keys {
            if let Some(content) = self.accumulated.get(key) {
                // Hash the key and content together
                format!("{:?}-{}", key.0, key.1).hash(&mut hasher);
                content.hash(&mut hasher);
            }
        }

        Some(hasher.finish())
    }
    /// Check if content matches the previously streamed content.
    ///
    /// This is a more precise alternative to `has_any_streamed_content()` for
    /// deduplication. Instead of checking if ANY content was streamed, this checks
    /// whether the EXACT content was streamed by comparing it against the
    /// reconstructed accumulated content.
    ///
    /// This method looks at ALL accumulated content across all content types and indices.
    /// If the combined accumulated content matches the input, it returns true.
    ///
    /// # Arguments
    /// * `content` - The content to check (typically normalized content from assistant events)
    /// * `tool_name_hints` - Optional tool names from assistant event (by content block index)
    ///
    /// # Returns
    /// * `true` - The content matches the previously streamed content
    /// * `false` - The content is different or no content was streamed
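    ///
    /// # Example
    ///
    /// A minimal sketch for the plain-text case:
    ///
    /// ```ignore
    /// session.on_text_delta(0, "Hello");
    /// session.on_text_delta(0, " World");
    ///
    /// // A later assistant event carrying the same combined text is a duplicate:
    /// assert!(session.is_duplicate_by_hash("Hello World", None));
    /// assert!(!session.is_duplicate_by_hash("Different content", None));
    /// ```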
    pub fn is_duplicate_by_hash(
        &self,
        content: &str,
        tool_name_hints: Option<&std::collections::HashMap<usize, String>>,
    ) -> bool {
        // Determine whether the incoming content has tool_use markers and whether
        // any plain text was accumulated during streaming (mixed content check)
1080        let has_tool_use = content.contains("TOOL_USE:");
1081        let has_text = self
1082            .accumulated
1083            .iter()
1084            .any(|((ct, _), v)| *ct == ContentType::Text && !v.is_empty());
1085
1086        if has_tool_use && has_text {
1087            // Mixed content: both text and tool_use need to be checked
1088            // Reconstruct the full normalized content from both text and tool_use
1089            return self.is_duplicate_mixed_content(content, tool_name_hints);
1090        } else if has_tool_use {
1091            // Only tool_use content
1092            return self.is_duplicate_tool_use(content, tool_name_hints);
1093        }
1094
1095        // Only text content - check if the input content matches ALL accumulated text content combined
1096        // This handles the case where assistant events arrive during streaming (before message_stop)
1097        // We collect and combine all text content in index order
1098        let mut text_keys: Vec<_> = self
1099            .accumulated
1100            .keys()
1101            .filter(|(ct, _)| *ct == ContentType::Text)
1102            .collect();
1103        // Sort by numeric index (not lexicographic) to preserve actual message order
1104        // This is critical for correctness when indices don't sort lexicographically (e.g., 0, 1, 10, 2)
1105        // unwrap_or(u64::MAX) handles non-numeric indices by sorting them last; this should
1106        // never happen in practice since GLM always sends numeric string indices, but if
1107        // it does occur, it indicates a protocol violation and will be visible in output
1108        text_keys.sort_by_key(|k| k.1.parse::<u64>().unwrap_or(u64::MAX));
1109
1110        // Combine all accumulated text content in sorted order
1111        let combined_content: String = text_keys
1112            .iter()
1113            .filter_map(|key| self.accumulated.get(key))
1114            .cloned()
1115            .collect();
1116
1117        // Direct string comparison is more reliable than hashing
1118        // because hashing can have collisions and we want exact match
1119        combined_content == content
1120    }
1121
1122    /// Check if mixed content (text + tool_use) from an assistant event matches accumulated content.
1123    ///
1124    /// This handles the case where assistant events contain both text and tool_use blocks.
1125    /// We reconstruct the full normalized content from both text and tool_use accumulated content
1126    /// and compare it against the assistant event content.
1127    ///
1128    /// # Arguments
1129    /// * `normalized_content` - Content potentially containing both text and "TOOL_USE:{name}:{input}" markers
1130    /// * `tool_name_hints` - Optional tool names from assistant event (by content block index)
1131    ///
1132    /// # Returns
1133    /// * `true` - All content (text + tool_use) matches accumulated content
1134    /// * `false` - Content differs or not accumulated yet
1135    fn is_duplicate_mixed_content(
1136        &self,
1137        normalized_content: &str,
1138        tool_name_hints: Option<&std::collections::HashMap<usize, String>>,
1139    ) -> bool {
1140        // Collect all content blocks in order (text and tool_use interleaved)
1141        // We need to reconstruct the exact order as it appears in the assistant event
1142        let mut reconstructed = String::new();
1143
1144        // Get all accumulated content keys and sort by index
1145        let mut all_keys: Vec<_> = self.accumulated.keys().collect();
1146        all_keys.sort_by_key(|k| {
1147            // Sort by index first (to preserve actual order), then by content type as tiebreaker
1148            let index = k.1.parse::<u64>().unwrap_or(u64::MAX);
1149            let type_order = match k.0 {
1150                ContentType::Text => 0,
1151                ContentType::ToolInput => 1,
1152                ContentType::Thinking => 2,
1153            };
1154            (index, type_order)
1155        });
1156
1157        for (ct, index_str) in all_keys {
1158            if let Some(accumulated_content) = self.accumulated.get(&(*ct, index_str.clone())) {
1159                match ct {
1160                    ContentType::Text => {
1161                        // Text content is added as-is
1162                        reconstructed.push_str(accumulated_content);
1163                    }
1164                    ContentType::ToolInput => {
1165                        // Tool_use content needs normalization with tool name
1166                        let index_num = index_str.parse::<u64>().unwrap_or(0);
1167                        let tool_name = tool_name_hints
1168                            .and_then(|hints| hints.get(&(index_num as usize)).map(|s| s.as_str()))
1169                            .or_else(|| self.tool_names.get(&index_num).and_then(|n| n.as_deref()))
1170                            .unwrap_or("");
1171
1172                        // Normalize: "TOOL_USE:{name}:{input}"
1173                        reconstructed
1174                            .push_str(&format!("TOOL_USE:{}:{}", tool_name, accumulated_content));
1175                    }
1176                    ContentType::Thinking => {
1177                        // Thinking content - not currently used in assistant events
1178                        // but included for completeness
1179                    }
1180                }
1181            }
1182        }
1183
1184        // Check if the reconstructed content matches the input
1185        normalized_content == reconstructed
1186    }
1187
1188    /// Check if tool_use content from an assistant event matches accumulated ToolInput.
1189    ///
1190    /// Assistant events may contain normalized tool_use blocks (with "TOOL_USE:" prefix).
1191    /// This method reconstructs the normalized representation from accumulated content
1192    /// and checks if it matches the assistant event content.
1193    ///
1194    /// # Arguments
1195    /// * `normalized_content` - Content potentially containing "TOOL_USE:{name}:{input}" markers
1196    /// * `tool_name_hints` - Optional tool names from assistant event (by content block index)
1197    ///
1198    /// # Returns
1199    /// * `true` - All tool_use blocks match accumulated content
1200    /// * `false` - Tool_use content differs or not accumulated yet
1201    fn is_duplicate_tool_use(
1202        &self,
1203        normalized_content: &str,
1204        tool_name_hints: Option<&std::collections::HashMap<usize, String>>,
1205    ) -> bool {
1206        // Reconstruct the normalized representation from accumulated content
1207        // For each tool_use index, create "TOOL_USE:{name}:{input}" and check if it's in the content
1208        let mut reconstructed = String::new();
1209
1210        // Get all ToolInput keys and sort by index
1211        let mut tool_keys: Vec<_> = self
1212            .accumulated
1213            .keys()
1214            .filter(|(ct, _)| *ct == ContentType::ToolInput)
1215            .collect();
1216        tool_keys.sort_by_key(|k| k.1.parse::<u64>().unwrap_or(u64::MAX));
1217
1218        for (ct, index_str) in tool_keys {
1219            if let Some(accumulated_input) = self.accumulated.get(&(*ct, index_str.clone())) {
1220                // Get the tool name from hints first (from assistant event), then from tracking
1221                let index_num = index_str.parse::<u64>().unwrap_or(0);
1222                let tool_name = tool_name_hints
1223                    .and_then(|hints| hints.get(&(index_num as usize)).map(|s| s.as_str()))
1224                    .or_else(|| self.tool_names.get(&index_num).and_then(|n| n.as_deref()))
1225                    .unwrap_or("");
1226
1227                // Normalize: "TOOL_USE:{name}:{input}"
1228                reconstructed.push_str(&format!("TOOL_USE:{}:{}", tool_name, accumulated_input));
1229            }
1230        }
1231
1232        // Check if the reconstructed content matches the input
1233        if reconstructed.is_empty() {
1234            return false;
1235        }
1236
1237        normalized_content == reconstructed
1238    }
1239
1240    /// Get accumulated content for a specific type and index.
1241    ///
1242    /// # Arguments
1243    /// * `content_type` - The type of content
1244    /// * `index` - The content index (as string for flexibility)
1245    ///
1246    /// # Returns
1247    /// * `Some(text)` - Accumulated content
1248    /// * `None` - No content accumulated for this key
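    ///
    /// # Example
    ///
    /// A minimal usage sketch (illustrative only; mirrors the unit tests below):
    ///
    /// ```ignore
    /// let mut session = StreamingSession::new();
    /// session.on_message_start();
    /// session.on_text_delta(0, "Hello");
    /// session.on_text_delta(0, " World");
    ///
    /// assert_eq!(
    ///     session.get_accumulated(ContentType::Text, "0"),
    ///     Some("Hello World")
    /// );
    /// assert_eq!(session.get_accumulated(ContentType::Text, "1"), None);
    /// ```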
1249    pub fn get_accumulated(&self, content_type: ContentType, index: &str) -> Option<&str> {
1250        self.accumulated
1251            .get(&(content_type, index.to_string()))
1252            .map(std::string::String::as_str)
1253    }
1254
1255    /// Mark content as having been rendered (HashMap-based tracking).
1256    ///
1257    /// This should be called after rendering to update the per-key tracking.
1258    ///
1259    /// # Arguments
1260    /// * `content_type` - The type of content
1261    /// * `index` - The content index (as string for flexibility)
1262    pub fn mark_rendered(&mut self, content_type: ContentType, index: &str) {
1263        let content_key = (content_type, index.to_string());
1264
1265        // Store the current accumulated content as last rendered
1266        if let Some(current) = self.accumulated.get(&content_key) {
1267            self.last_rendered.insert(content_key, current.clone());
1268        }
1269    }
1270
1271    /// Check if content has been rendered before, using hash-based tracking.
1272    ///
1273    /// This provides global duplicate detection across all content by computing
1274    /// a hash of the accumulated content and checking if it's in the rendered set.
1275    /// This is preserved across `MessageStart` boundaries to prevent duplicate rendering.
1276    ///
1277    /// # Arguments
1278    /// * `content_type` - The type of content
1279    /// * `index` - The content index (as string for flexibility)
1280    ///
1281    /// # Returns
1282    /// * `true` - This exact content has been rendered before
1283    /// * `false` - This exact content has not been rendered
1284    #[cfg(test)]
1285    pub fn is_content_rendered(&self, content_type: ContentType, index: &str) -> bool {
1286        let content_key = (content_type, index.to_string());
1287
1288        // Check if we have accumulated content for this key
1289        if let Some(current) = self.accumulated.get(&content_key) {
1290            // Compute hash of current accumulated content
1291            let mut hasher = DefaultHasher::new();
1292            current.hash(&mut hasher);
1293            let hash = hasher.finish();
1294
1295            // Check if this hash has been rendered before
1296            return self.rendered_content_hashes.contains(&hash);
1297        }
1298
1299        false
1300    }
1301
1302    /// Check whether output has already started for this content key.
1303    ///
1304    /// A `true` result means new deltas extend content that is already on screen,
1305    /// so the caller should perform an in-place update (e.g., using carriage return).
1306    ///
1307    /// With the KMP + Rolling Hash approach this no longer compares rendered prefixes
1308    /// directly; it simply checks whether output has started for this key.
1309    ///
1310    /// # Arguments
1311    /// * `content_type` - The type of content
1312    /// * `index` - The content index (as string for flexibility)
1313    ///
1314    /// # Returns
1315    /// * `true` - Output has started for this key (do in-place update)
1316    /// * `false` - Output has not started for this key (show new content)
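    ///
    /// # Example
    ///
    /// An illustrative sketch of how a renderer might decide between printing a
    /// fresh prefixed line and updating in place:
    ///
    /// ```ignore
    /// session.on_message_start();
    /// assert!(!session.has_rendered_prefix(ContentType::Text, "0"));
    ///
    /// session.on_text_delta(0, "Hello");
    /// // Output has started for (Text, "0"), so later chunks update in place.
    /// assert!(session.has_rendered_prefix(ContentType::Text, "0"));
    /// ```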
1317    pub fn has_rendered_prefix(&self, content_type: ContentType, index: &str) -> bool {
1318        let content_key = (content_type, index.to_string());
1319        self.output_started_for_key.contains(&content_key)
1320    }
1321
1322    /// Mark content as rendered using hash-based tracking.
1323    ///
1324    /// This method updates the `rendered_content_hashes` set to track all
1325    /// content that has been rendered for deduplication.
1326    ///
1327    /// # Arguments
1328    /// * `content_type` - The type of content
1329    /// * `index` - The content index (as string for flexibility)
1330    #[cfg(test)]
1331    pub fn mark_content_rendered(&mut self, content_type: ContentType, index: &str) {
1332        // Also update last_rendered for compatibility
1333        self.mark_rendered(content_type, index);
1334
1335        // Add the hash of the accumulated content to the rendered set
1336        let content_key = (content_type, index.to_string());
1337        if let Some(current) = self.accumulated.get(&content_key) {
1338            let mut hasher = DefaultHasher::new();
1339            current.hash(&mut hasher);
1340            let hash = hasher.finish();
1341            self.rendered_content_hashes.insert(hash);
1342        }
1343    }
1344
1345    /// Mark content as rendered using pre-sanitized content.
1346    ///
1347    /// This method uses the sanitized content (with whitespace normalized)
1348    /// for hash-based deduplication, which prevents duplicates when the
1349    /// accumulated content differs only by whitespace.
1350    ///
1351    /// # Arguments
1352    /// * `content_type` - The type of content
1353    /// * `index` - The content index (as string for flexibility)
1354    /// * `content` - The content to hash
1355    pub fn mark_content_hash_rendered(
1356        &mut self,
1357        content_type: ContentType,
1358        index: &str,
1359        content: &str,
1360    ) {
1361        // Also update last_rendered for compatibility
1362        self.mark_rendered(content_type, index);
1363
1364        // Add the hash of the content to the rendered set
1365        let mut hasher = DefaultHasher::new();
1366        content.hash(&mut hasher);
1367        let hash = hasher.finish();
1368        self.rendered_content_hashes.insert(hash);
1369    }
1370
1371    /// Check if sanitized content has already been rendered.
1372    ///
1373    /// This method checks the hash of the sanitized content against the
1374    /// rendered set to prevent duplicate rendering.
1375    ///
1376    /// # Arguments
1377    /// * `_content_type` - The type of content (kept for API consistency)
1378    /// * `_index` - The content index (kept for API consistency)
1379    /// * `content` - The sanitized content to check
1380    ///
1381    /// # Returns
1382    /// * `true` - This exact content has been rendered before
1383    /// * `false` - This exact content has not been rendered
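    ///
    /// # Example
    ///
    /// A sketch of the typical pairing with `mark_content_hash_rendered`
    /// (illustrative only; the sanitization step itself lives in the caller):
    ///
    /// ```ignore
    /// let sanitized = "Hello World"; // already whitespace-normalized by the caller
    /// if !session.is_content_hash_rendered(ContentType::Text, "0", sanitized) {
    ///     // ... render `sanitized` ...
    ///     session.mark_content_hash_rendered(ContentType::Text, "0", sanitized);
    /// }
    /// assert!(session.is_content_hash_rendered(ContentType::Text, "0", sanitized));
    /// ```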
1384    pub fn is_content_hash_rendered(
1385        &self,
1386        _content_type: ContentType,
1387        _index: &str,
1388        content: &str,
1389    ) -> bool {
1390        // Compute hash of exact content
1391        let mut hasher = DefaultHasher::new();
1392        content.hash(&mut hasher);
1393        let hash = hasher.finish();
1394
1395        // Check if this hash has been rendered before
1396        self.rendered_content_hashes.contains(&hash)
1397    }
1398
1399    /// Check if incoming text is likely a snapshot (full accumulated content) rather than a delta.
1400    ///
1401    /// This uses the KMP + Rolling Hash algorithm for efficient O(n+m) snapshot detection.
1402    /// The two-phase approach keeps the check linear in the input lengths:
1403    /// 1. Rolling hash for fast O(n) filtering
1404    /// 2. KMP for exact O(n+m) verification
1405    ///
1406    /// # Arguments
1407    /// * `text` - The incoming text to check
1408    /// * `key` - The content key to compare against
1409    ///
1410    /// # Returns
1411    /// * `true` - The text appears to be a snapshot (starts with previous accumulated content)
1412    /// * `false` - The text appears to be a genuine delta
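    ///
    /// # Example
    ///
    /// A sketch based on the unit tests below (illustrative only):
    ///
    /// ```ignore
    /// let initial = "This is a long message that exceeds threshold";
    /// session.on_text_delta(0, initial);
    ///
    /// // Full accumulated content re-sent as a "delta" is flagged as a snapshot...
    /// assert!(session.is_likely_snapshot(&format!("{initial} plus new content"), "0"));
    /// // ...while a genuine delta is not.
    /// assert!(!session.is_likely_snapshot(" plus new content", "0"));
    /// ```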
1413    pub fn is_likely_snapshot(&self, text: &str, key: &str) -> bool {
1414        let content_key = (ContentType::Text, key.to_string());
1415
1416        // Check if we have accumulated content for this key
1417        if let Some(previous) = self.accumulated.get(&content_key) {
1418            // Use DeltaDeduplicator with threshold-aware snapshot detection
1419            // This prevents false positives by requiring strong overlap (>=30 chars, >=50% ratio)
1420            return DeltaDeduplicator::is_likely_snapshot_with_thresholds(text, previous);
1421        }
1422
1423        false
1424    }
1425
1426    /// Extract the delta portion from a snapshot.
1427    ///
1428    /// When a snapshot is detected (full accumulated content sent as a "delta"),
1429    /// this method extracts only the new portion that hasn't been accumulated yet.
1430    ///
1431    /// # Arguments
1432    /// * `text` - The snapshot text (full accumulated content + new content)
1433    /// * `key` - The content key to compare against
1434    ///
1435    /// # Returns
1436    /// * `Ok(usize)` - Byte offset where the new content starts (the length of the matched accumulated prefix)
1437    /// * `Err` - If the text is not actually a snapshot (doesn't start with accumulated content)
1438    ///
1439    /// # Note
1440    /// Returns a byte offset rather than a slice. Callers can slice `text`
1441    /// themselves using `&text[offset..]`, or call `get_delta_from_snapshot`
1442    /// to get the delta directly as a `&str`.
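    ///
    /// # Example
    ///
    /// A sketch of the offset semantics (illustrative only):
    ///
    /// ```ignore
    /// let initial = "This is a long message that exceeds threshold";
    /// session.on_text_delta(0, initial);
    ///
    /// let snapshot = format!("{initial} plus new content");
    /// let offset = session.extract_delta_from_snapshot(&snapshot, "0").unwrap();
    /// assert_eq!(offset, initial.len());
    /// assert_eq!(&snapshot[offset..], " plus new content");
    /// ```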
1443    pub fn extract_delta_from_snapshot(&self, text: &str, key: &str) -> Result<usize, String> {
1444        let content_key = (ContentType::Text, key.to_string());
1445
1446        if let Some(previous) = self.accumulated.get(&content_key) {
1447            // Use DeltaDeduplicator with threshold-aware delta extraction
1448            // This ensures we only extract when overlap meets strong criteria
1449            if let Some(new_content) =
1450                DeltaDeduplicator::extract_new_content_with_thresholds(text, previous)
1451            {
1452                // Calculate the position where new content starts
1453                let delta_start = text.len() - new_content.len();
1454                return Ok(delta_start);
1455            }
1456        }
1457
1458        // If we get here, the text wasn't actually a snapshot
1459        // This could indicate a false positive from is_likely_snapshot
1460        Err(format!(
1461            "extract_delta_from_snapshot called on non-snapshot text. \
1462            key={key:?}, text={text:?}. Snapshot detection may have had a false positive."
1463        ))
1464    }
1465
1466    /// Get the delta portion as a string slice from a snapshot.
1467    ///
1468    /// This is a convenience wrapper that returns the actual substring
1469    /// instead of just the byte offset.
1470    ///
1471    /// # Returns
1472    /// * `Ok(&str)` - The delta portion (new content only)
1473    /// * `Err` - If the text is not actually a snapshot
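    ///
    /// # Example
    ///
    /// A sketch of repairing a snapshot-as-delta in place (illustrative only;
    /// `snapshot` is the full accumulated content plus new text, as in the
    /// `is_likely_snapshot` example above):
    ///
    /// ```ignore
    /// if session.is_likely_snapshot(&snapshot, "0") {
    ///     let delta = session.get_delta_from_snapshot(&snapshot, "0").unwrap();
    ///     session.on_text_delta(0, delta); // apply only the new portion
    /// }
    /// ```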
1474    pub fn get_delta_from_snapshot<'a>(&self, text: &'a str, key: &str) -> Result<&'a str, String> {
1475        let delta_start = self.extract_delta_from_snapshot(text, key)?;
1476        Ok(&text[delta_start..])
1477    }
1478
1479    /// Get streaming quality metrics for the current session.
1480    ///
1481    /// Returns aggregated metrics about delta sizes and streaming patterns
1482    /// during the session. This is useful for debugging and analyzing
1483    /// streaming behavior.
1484    ///
1485    /// # Returns
1486    /// Aggregated metrics across all content types and keys.
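    ///
    /// # Example
    ///
    /// A sketch of inspecting the metrics after a stream finishes (illustrative only):
    ///
    /// ```ignore
    /// let metrics = session.get_streaming_quality_metrics();
    /// if metrics.snapshot_repairs_count > 0 || metrics.protocol_violations > 0 {
    ///     eprintln!(
    ///         "streaming quirks: {} snapshot repairs, {} large deltas, {} protocol violations",
    ///         metrics.snapshot_repairs_count,
    ///         metrics.large_delta_count,
    ///         metrics.protocol_violations,
    ///     );
    /// }
    /// ```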
1487    pub fn get_streaming_quality_metrics(&self) -> StreamingQualityMetrics {
1488        // Flatten all delta sizes across all content types and keys
1489        let all_sizes = self.delta_sizes.values().flat_map(|v| v.iter().copied());
1490        let mut metrics = StreamingQualityMetrics::from_sizes(all_sizes);
1491
1492        // Add session-level metrics
1493        metrics.snapshot_repairs_count = self.snapshot_repairs_count;
1494        metrics.large_delta_count = self.large_delta_count;
1495        metrics.protocol_violations = self.protocol_violations;
1496
1497        metrics
1498    }
1499}
1500
1501#[cfg(test)]
1502mod tests {
1503    use super::*;
1504
1505    #[test]
1506    fn test_session_lifecycle() {
1507        let mut session = StreamingSession::new();
1508
1509        // Initially no content streamed
1510        assert!(!session.has_any_streamed_content());
1511
1512        // Message start
1513        session.on_message_start();
1514        assert!(!session.has_any_streamed_content());
1515
1516        // Text delta
1517        let show_prefix = session.on_text_delta(0, "Hello");
1518        assert!(show_prefix);
1519        assert!(session.has_any_streamed_content());
1520
1521        // Another delta
1522        let show_prefix = session.on_text_delta(0, " World");
1523        assert!(!show_prefix);
1524
1525        // Message stop
1526        let was_in_block = session.on_message_stop();
1527        assert!(was_in_block);
1528    }
1529
1530    #[test]
1531    fn test_accumulated_content() {
1532        let mut session = StreamingSession::new();
1533        session.on_message_start();
1534
1535        session.on_text_delta(0, "Hello");
1536        session.on_text_delta(0, " World");
1537
1538        let accumulated = session.get_accumulated(ContentType::Text, "0");
1539        assert_eq!(accumulated, Some("Hello World"));
1540    }
1541
1542    #[test]
1543    fn test_reset_between_messages() {
1544        let mut session = StreamingSession::new();
1545
1546        // First message
1547        session.on_message_start();
1548        session.on_text_delta(0, "First");
1549        assert!(session.has_any_streamed_content());
1550        session.on_message_stop();
1551
1552        // Second message - state should be reset
1553        session.on_message_start();
1554        assert!(!session.has_any_streamed_content());
1555    }
1556
1557    #[test]
1558    fn test_multiple_indices() {
1559        let mut session = StreamingSession::new();
1560        session.on_message_start();
1561
1562        session.on_text_delta(0, "First block");
1563        session.on_text_delta(1, "Second block");
1564
1565        assert_eq!(
1566            session.get_accumulated(ContentType::Text, "0"),
1567            Some("First block")
1568        );
1569        assert_eq!(
1570            session.get_accumulated(ContentType::Text, "1"),
1571            Some("Second block")
1572        );
1573    }
1574
1575    #[test]
1576    fn test_new_session_starts_fresh() {
1577        // Behavioral test: verify that creating a new session gives clean state.
1578        // This replaces the old test for clear_index(), which has been removed.
1579        let mut session = StreamingSession::new();
1580        session.on_message_start();
1581
1582        session.on_text_delta(0, "Before");
1583        // Instead of clearing, verify that a new session starts fresh
1584        let mut fresh_session = StreamingSession::new();
1585        fresh_session.on_message_start();
1586        fresh_session.on_text_delta(0, "After");
1587
1588        assert_eq!(
1589            fresh_session.get_accumulated(ContentType::Text, "0"),
1590            Some("After")
1591        );
1592        // Original session should still have "Before"
1593        assert_eq!(
1594            session.get_accumulated(ContentType::Text, "0"),
1595            Some("Before")
1596        );
1597    }
1598
1599    #[test]
1600    fn test_delta_validation_warns_on_large_delta() {
1601        let mut session = StreamingSession::new();
1602        session.on_message_start();
1603
1604        // Create a delta larger than snapshot_threshold()
1605        let large_delta = "x".repeat(snapshot_threshold() + 1);
1606
1607        // This should trigger a warning but still work
1608        let show_prefix = session.on_text_delta(0, &large_delta);
1609        assert!(show_prefix);
1610
1611        // Content should still be accumulated correctly
1612        assert_eq!(
1613            session.get_accumulated(ContentType::Text, "0"),
1614            Some(large_delta.as_str())
1615        );
1616    }
1617
1618    #[test]
1619    fn test_delta_validation_no_warning_for_small_delta() {
1620        let mut session = StreamingSession::new();
1621        session.on_message_start();
1622
1623        // Small delta should not trigger warning
1624        let small_delta = "Hello, world!";
1625        let show_prefix = session.on_text_delta(0, small_delta);
1626        assert!(show_prefix);
1627
1628        assert_eq!(
1629            session.get_accumulated(ContentType::Text, "0"),
1630            Some(small_delta)
1631        );
1632    }
1633
1634    // Tests for snapshot-as-delta detection methods
1635
1636    #[test]
1637    fn test_is_likely_snapshot_detects_snapshot() {
1638        let mut session = StreamingSession::new();
1639        session.on_message_start();
1640        session.on_content_block_start(0);
1641
1642        // First delta is long enough to meet threshold requirements
1643        // Using 45 chars to ensure it exceeds the 30 char minimum
1644        let initial = "This is a long message that exceeds threshold";
1645        session.on_text_delta(0, initial);
1646
1647        // Simulate GLM sending full accumulated content as next "delta"
1648        // The overlap is 45 chars (100% of initial), meeting both thresholds:
1649        // - char_count = 45 >= 30 ✓
1650        // - ratio = 45/62 ≈ 73% >= 50% ✓
1651        // - ends at safe boundary (space) ✓
1652        let snapshot = format!("{initial} plus new content");
1653        let is_snapshot = session.is_likely_snapshot(&snapshot, "0");
1654        assert!(
1655            is_snapshot,
1656            "Should detect snapshot-as-delta with strong overlap"
1657        );
1658    }
1659
1660    #[test]
1661    fn test_is_likely_snapshot_returns_false_for_genuine_delta() {
1662        let mut session = StreamingSession::new();
1663        session.on_message_start();
1664        session.on_content_block_start(0);
1665
1666        // First delta is "Hello"
1667        session.on_text_delta(0, "Hello");
1668
1669        // Genuine delta " World" doesn't start with previous content
1670        let is_snapshot = session.is_likely_snapshot(" World", "0");
1671        assert!(
1672            !is_snapshot,
1673            "Genuine delta should not be flagged as snapshot"
1674        );
1675    }
1676
1677    #[test]
1678    fn test_is_likely_snapshot_returns_false_when_no_previous_content() {
1679        let mut session = StreamingSession::new();
1680        session.on_message_start();
1681        session.on_content_block_start(0);
1682
1683        // No previous content, so anything is a genuine first delta
1684        let is_snapshot = session.is_likely_snapshot("Hello", "0");
1685        assert!(
1686            !is_snapshot,
1687            "First delta should not be flagged as snapshot"
1688        );
1689    }
1690
1691    #[test]
1692    fn test_extract_delta_from_snapshot() {
1693        let mut session = StreamingSession::new();
1694        session.on_message_start();
1695        session.on_content_block_start(0);
1696
1697        // First delta is long enough to meet threshold requirements
1698        let initial = "This is a long message that exceeds threshold";
1699        session.on_text_delta(0, initial);
1700
1701        // Snapshot should extract new portion
1702        let snapshot = format!("{initial} plus new content");
1703        let delta = session.get_delta_from_snapshot(&snapshot, "0").unwrap();
1704        assert_eq!(delta, " plus new content");
1705    }
1706
1707    #[test]
1708    fn test_extract_delta_from_snapshot_empty_delta() {
1709        let mut session = StreamingSession::new();
1710        session.on_message_start();
1711        session.on_content_block_start(0);
1712
1713        // First delta is "Hello"
1714        session.on_text_delta(0, "Hello");
1715
1716        // Snapshot "Hello" (identical to previous) should extract "" as delta
1717        let delta = session.get_delta_from_snapshot("Hello", "0").unwrap();
1718        assert_eq!(delta, "");
1719    }
1720
1721    #[test]
1722    fn test_extract_delta_from_snapshot_returns_error_on_non_snapshot() {
1723        let mut session = StreamingSession::new();
1724        session.on_message_start();
1725        session.on_content_block_start(0);
1726
1727        // First delta is "Hello"
1728        session.on_text_delta(0, "Hello");
1729
1730        // Calling on non-snapshot should return error (not panic)
1731        let result = session.get_delta_from_snapshot("World", "0");
1732        assert!(result.is_err());
1733        assert!(result
1734            .unwrap_err()
1735            .contains("extract_delta_from_snapshot called on non-snapshot text"));
1736    }
1737
1738    #[test]
1739    fn test_snapshot_detection_with_string_keys() {
1740        let mut session = StreamingSession::new();
1741        session.on_message_start();
1742
1743        // Test with string keys (like Codex/Gemini use)
1744        // Use content long enough to meet threshold requirements
1745        let initial = "This is a long message that exceeds threshold";
1746        session.on_text_delta_key("main", initial);
1747
1748        // Should detect snapshot for string key with strong overlap
1749        let snapshot = format!("{initial} plus new content");
1750        let is_snapshot = session.is_likely_snapshot(&snapshot, "main");
1751        assert!(
1752            is_snapshot,
1753            "Should detect snapshot with string keys when thresholds are met"
1754        );
1755
1756        // Should extract delta correctly
1757        let delta = session.get_delta_from_snapshot(&snapshot, "main").unwrap();
1758        assert_eq!(delta, " plus new content");
1759    }
1760
1761    #[test]
1762    fn test_snapshot_extraction_exact_match() {
1763        let mut session = StreamingSession::new();
1764        session.on_message_start();
1765
1766        // First delta is long enough to meet threshold requirements
1767        let initial = "This is a long message that exceeds threshold";
1768        session.on_text_delta(0, initial);
1769
1770        // Exact match with additional content (strong overlap)
1771        let exact_match = format!("{initial} World");
1772        let delta1 = session.get_delta_from_snapshot(&exact_match, "0").unwrap();
1773        assert_eq!(delta1, " World");
1774    }
1775
1776    #[test]
1777    fn test_token_by_token_streaming_scenario() {
1778        let mut session = StreamingSession::new();
1779        session.on_message_start();
1780
1781        // Simulate token-by-token streaming
1782        let tokens = ["H", "e", "l", "l", "o", " ", "W", "o", "r", "l", "d", "!"];
1783
1784        for token in tokens {
1785            let show_prefix = session.on_text_delta(0, token);
1786
1787            // Only first token should show prefix
1788            if token == "H" {
1789                assert!(show_prefix, "First token should show prefix");
1790            } else {
1791                assert!(!show_prefix, "Subsequent tokens should not show prefix");
1792            }
1793        }
1794
1795        // Verify accumulated content
1796        assert_eq!(
1797            session.get_accumulated(ContentType::Text, "0"),
1798            Some("Hello World!")
1799        );
1800    }
1801
1802    #[test]
1803    fn test_snapshot_in_token_stream() {
1804        let mut session = StreamingSession::new();
1805        session.on_message_start();
1806
1807        // First few tokens as genuine deltas - use longer content to meet thresholds
1808        let initial = "This is a long message that exceeds threshold";
1809        session.on_text_delta(0, initial);
1810        session.on_text_delta(0, " with more content");
1811
1812        // Now GLM sends a snapshot instead of delta
1813        // The accumulated content plus new content should meet thresholds:
1814        // Accumulated: "This is a long message that exceeds threshold with more content" (63 chars)
1815        // Snapshot: accumulated + "! This is additional content" (91 chars total)
1816        // Overlap: 63 chars
1817        // Ratio: 63/91 ≈ 69% >= 50% ✓
1818        let accumulated = session
1819            .get_accumulated(ContentType::Text, "0")
1820            .unwrap()
1821            .to_string();
1822        let snapshot = format!("{accumulated}! This is additional content");
1823        assert!(
1824            session.is_likely_snapshot(&snapshot, "0"),
1825            "Should detect snapshot in token stream with strong overlap"
1826        );
1827
1828        // Extract delta and continue
1829        let delta = session.get_delta_from_snapshot(&snapshot, "0").unwrap();
1830        assert!(delta.contains("! This is additional content"));
1831
1832        // Apply the delta
1833        session.on_text_delta(0, delta);
1834
1835        // Verify final accumulated content
1836        let expected = format!("{accumulated}! This is additional content");
1837        assert_eq!(
1838            session.get_accumulated(ContentType::Text, "0"),
1839            Some(expected.as_str())
1840        );
1841    }
1842
1843    // Tests for message identity tracking
1844
1845    #[test]
1846    fn test_set_and_get_current_message_id() {
1847        let mut session = StreamingSession::new();
1848
1849        // Initially no message ID
1850        assert!(session.get_current_message_id().is_none());
1851
1852        // Set a message ID
1853        session.set_current_message_id(Some("msg-123".to_string()));
1854        assert_eq!(session.get_current_message_id(), Some("msg-123"));
1855
1856        // Clear the message ID
1857        session.set_current_message_id(None);
1858        assert!(session.get_current_message_id().is_none());
1859    }
1860
1861    #[test]
1862    fn test_mark_message_displayed() {
1863        let mut session = StreamingSession::new();
1864
1865        // Initially not marked as displayed
1866        assert!(!session.is_duplicate_final_message("msg-123"));
1867
1868        // Mark as displayed
1869        session.mark_message_displayed("msg-123");
1870        assert!(session.is_duplicate_final_message("msg-123"));
1871
1872        // Different message ID is not a duplicate
1873        assert!(!session.is_duplicate_final_message("msg-456"));
1874    }
1875
1876    #[test]
1877    fn test_message_stop_marks_displayed() {
1878        let mut session = StreamingSession::new();
1879
1880        // Set a message ID
1881        session.set_current_message_id(Some("msg-123".to_string()));
1882
1883        // Start a message with content
1884        session.on_message_start();
1885        session.on_text_delta(0, "Hello");
1886
1887        // Stop should mark as displayed
1888        session.on_message_stop();
1889        assert!(session.is_duplicate_final_message("msg-123"));
1890    }
1891
1892    #[test]
1893    fn test_multiple_messages_tracking() {
1894        let mut session = StreamingSession::new();
1895
1896        // First message
1897        session.set_current_message_id(Some("msg-1".to_string()));
1898        session.on_message_start();
1899        session.on_text_delta(0, "First");
1900        session.on_message_stop();
1901        assert!(session.is_duplicate_final_message("msg-1"));
1902
1903        // Second message
1904        session.set_current_message_id(Some("msg-2".to_string()));
1905        session.on_message_start();
1906        session.on_text_delta(0, "Second");
1907        session.on_message_stop();
1908        assert!(session.is_duplicate_final_message("msg-1"));
1909        assert!(session.is_duplicate_final_message("msg-2"));
1910    }
1911
1912    // Tests for repeated MessageStart handling (GLM/ccs-glm protocol quirk)
1913
1914    #[test]
1915    fn test_repeated_message_start_preserves_output_started() {
1916        let mut session = StreamingSession::new();
1917
1918        // First message start
1919        session.on_message_start();
1920
1921        // First delta should show prefix
1922        let show_prefix = session.on_text_delta(0, "Hello");
1923        assert!(show_prefix, "First delta should show prefix");
1924
1925        // Second delta should NOT show prefix
1926        let show_prefix = session.on_text_delta(0, " World");
1927        assert!(!show_prefix, "Second delta should not show prefix");
1928
1929        // Simulate GLM sending repeated MessageStart during streaming
1930        // This should preserve output_started_for_key to prevent prefix spam
1931        session.on_message_start();
1932
1933        // After repeated MessageStart, delta should NOT show prefix
1934        // because output_started_for_key was preserved
1935        let show_prefix = session.on_text_delta(0, "!");
1936        assert!(
1937            !show_prefix,
1938            "After repeated MessageStart, delta should not show prefix"
1939        );
1940
1941        // Verify accumulated content was cleared (as expected for mid-stream restart)
1942        assert_eq!(
1943            session.get_accumulated(ContentType::Text, "0"),
1944            Some("!"),
1945            "Accumulated content should start fresh after repeated MessageStart"
1946        );
1947    }
1948
1949    #[test]
1950    fn test_repeated_message_start_with_normal_reset_between_messages() {
1951        let mut session = StreamingSession::new();
1952
1953        // First message
1954        session.on_message_start();
1955        session.on_text_delta(0, "First");
1956        session.on_message_stop();
1957
1958        // Second message - normal reset should clear output_started_for_key
1959        session.on_message_start();
1960
1961        // First delta of second message SHOULD show prefix
1962        let show_prefix = session.on_text_delta(0, "Second");
1963        assert!(
1964            show_prefix,
1965            "First delta of new message should show prefix after normal reset"
1966        );
1967    }
1968
1969    #[test]
1970    fn test_repeated_message_start_with_multiple_indices() {
1971        let mut session = StreamingSession::new();
1972
1973        // First message start
1974        session.on_message_start();
1975
1976        // First delta for index 0
1977        let show_prefix = session.on_text_delta(0, "Index0");
1978        assert!(show_prefix, "First delta for index 0 should show prefix");
1979
1980        // First delta for index 1
1981        let show_prefix = session.on_text_delta(1, "Index1");
1982        assert!(show_prefix, "First delta for index 1 should show prefix");
1983
1984        // Simulate repeated MessageStart
1985        session.on_message_start();
1986
1987        // After repeated MessageStart, deltas should NOT show prefix
1988        // because output_started_for_key was preserved for both indices
1989        let show_prefix = session.on_text_delta(0, " more");
1990        assert!(
1991            !show_prefix,
1992            "Delta for index 0 should not show prefix after repeated MessageStart"
1993        );
1994
1995        let show_prefix = session.on_text_delta(1, " more");
1996        assert!(
1997            !show_prefix,
1998            "Delta for index 1 should not show prefix after repeated MessageStart"
1999        );
2000    }
2001
2002    #[test]
2003    fn test_repeated_message_start_during_thinking_stream() {
2004        let mut session = StreamingSession::new();
2005
2006        // First message start
2007        session.on_message_start();
2008
2009        // First thinking delta should show prefix
2010        let show_prefix = session.on_thinking_delta(0, "Thinking...");
2011        assert!(show_prefix, "First thinking delta should show prefix");
2012
2013        // Simulate repeated MessageStart
2014        session.on_message_start();
2015
2016        // After repeated MessageStart, thinking delta should NOT show prefix
2017        let show_prefix = session.on_thinking_delta(0, " more");
2018        assert!(
2019            !show_prefix,
2020            "Thinking delta after repeated MessageStart should not show prefix"
2021        );
2022    }
2023
2024    #[test]
2025    fn test_message_stop_then_message_start_resets_normally() {
2026        let mut session = StreamingSession::new();
2027
2028        // First message
2029        session.on_message_start();
2030        session.on_text_delta(0, "First");
2031
2032        // Message stop finalizes the message
2033        session.on_message_stop();
2034
2035        // New message start should reset normally (not preserve output_started)
2036        session.on_message_start();
2037
2038        // First delta of new message SHOULD show prefix
2039        let show_prefix = session.on_text_delta(0, "Second");
2040        assert!(
2041            show_prefix,
2042            "First delta after MessageStop should show prefix (normal reset)"
2043        );
2044    }
2045
2046    #[test]
2047    fn test_repeated_content_block_start_same_index() {
2048        let mut session = StreamingSession::new();
2049
2050        // Message start
2051        session.on_message_start();
2052
2053        // First delta for index 0
2054        let show_prefix = session.on_text_delta(0, "Hello");
2055        assert!(show_prefix, "First delta should show prefix");
2056
2057        // Simulate repeated ContentBlockStart for same index
2058        // (Some agents send this, and we should NOT clear accumulated content)
2059        session.on_content_block_start(0);
2060
2061        // Delta after repeated ContentBlockStart should NOT show prefix
2062        let show_prefix = session.on_text_delta(0, " World");
2063        assert!(
2064            !show_prefix,
2065            "Delta after repeated ContentBlockStart should not show prefix"
2066        );
2067
2068        // Verify accumulated content was preserved
2069        assert_eq!(
2070            session.get_accumulated(ContentType::Text, "0"),
2071            Some("Hello World"),
2072            "Accumulated content should be preserved across repeated ContentBlockStart"
2073        );
2074    }
2075
2076    // Tests for verbose_warnings feature
2077
2078    #[test]
2079    fn test_verbose_warnings_default_is_disabled() {
2080        let session = StreamingSession::new();
2081        assert!(
2082            !session.verbose_warnings,
2083            "Default should have verbose_warnings disabled"
2084        );
2085    }
2086
2087    #[test]
2088    fn test_with_verbose_warnings_enables_flag() {
2089        let session = StreamingSession::new().with_verbose_warnings(true);
2090        assert!(
2091            session.verbose_warnings,
2092            "Should have verbose_warnings enabled"
2093        );
2094    }
2095
2096    #[test]
2097    fn test_with_verbose_warnings_disabled_explicitly() {
2098        let session = StreamingSession::new().with_verbose_warnings(false);
2099        assert!(
2100            !session.verbose_warnings,
2101            "Should have verbose_warnings disabled"
2102        );
2103    }
2104
2105    #[test]
2106    fn test_large_delta_warning_respects_verbose_flag() {
2107        // Test with verbose warnings enabled
2108        let mut session_verbose = StreamingSession::new().with_verbose_warnings(true);
2109        session_verbose.on_message_start();
2110
2111        let large_delta = "x".repeat(snapshot_threshold() + 1);
2112        // This would emit a warning to stderr if verbose_warnings is enabled
2113        let _show_prefix = session_verbose.on_text_delta(0, &large_delta);
2114
2115        // Test with verbose warnings disabled (default)
2116        let mut session_quiet = StreamingSession::new();
2117        session_quiet.on_message_start();
2118
2119        let large_delta = "x".repeat(snapshot_threshold() + 1);
2120        // This should NOT emit a warning
2121        let _show_prefix = session_quiet.on_text_delta(0, &large_delta);
2122
2123        // Both sessions should accumulate content correctly
2124        assert_eq!(
2125            session_verbose.get_accumulated(ContentType::Text, "0"),
2126            Some(large_delta.as_str())
2127        );
2128        assert_eq!(
2129            session_quiet.get_accumulated(ContentType::Text, "0"),
2130            Some(large_delta.as_str())
2131        );
2132    }
2133
2134    #[test]
2135    fn test_repeated_message_start_warning_respects_verbose_flag() {
2136        // Test with verbose warnings enabled
2137        let mut session_verbose = StreamingSession::new().with_verbose_warnings(true);
2138        session_verbose.on_message_start();
2139        session_verbose.on_text_delta(0, "Hello");
2140        // This would emit a warning about repeated MessageStart
2141        session_verbose.on_message_start();
2142
2143        // Test with verbose warnings disabled (default)
2144        let mut session_quiet = StreamingSession::new();
2145        session_quiet.on_message_start();
2146        session_quiet.on_text_delta(0, "Hello");
2147        // This should NOT emit a warning
2148        session_quiet.on_message_start();
2149
2150        // Both sessions should handle the restart correctly
2151        assert_eq!(
2152            session_verbose.get_accumulated(ContentType::Text, "0"),
2153            None,
2154            "Accumulated content should be cleared after repeated MessageStart"
2155        );
2156        assert_eq!(
2157            session_quiet.get_accumulated(ContentType::Text, "0"),
2158            None,
2159            "Accumulated content should be cleared after repeated MessageStart"
2160        );
2161    }
2162
2163    #[test]
2164    fn test_pattern_detection_warning_respects_verbose_flag() {
2165        // Test with verbose warnings enabled
2166        let mut session_verbose = StreamingSession::new().with_verbose_warnings(true);
2167        session_verbose.on_message_start();
2168
2169        // Send 3 large deltas to trigger pattern detection
2170        // Use different content to avoid consecutive duplicate detection
2171        for i in 0..3 {
2172            let large_delta = format!("{}{i}", "x".repeat(snapshot_threshold() + 1));
2173            let _ = session_verbose.on_text_delta(0, &large_delta);
2174        }
2175
2176        // Test with verbose warnings disabled (default)
2177        let mut session_quiet = StreamingSession::new();
2178        session_quiet.on_message_start();
2179
2180        // Send 3 large deltas (different content to avoid consecutive duplicate detection)
2181        for i in 0..3 {
2182            let large_delta = format!("{}{i}", "x".repeat(snapshot_threshold() + 1));
2183            let _ = session_quiet.on_text_delta(0, &large_delta);
2184        }
2185
2186        // Verify that large_delta_count still tracks all 3 large deltas for both sessions
2187        assert_eq!(
2188            session_verbose
2189                .get_streaming_quality_metrics()
2190                .large_delta_count,
2191            3
2192        );
2193        assert_eq!(
2194            session_quiet
2195                .get_streaming_quality_metrics()
2196                .large_delta_count,
2197            3
2198        );
2199    }
2200
2201    #[test]
2202    fn test_snapshot_extraction_error_warning_respects_verbose_flag() {
2203        // Create a session where we'll trigger a snapshot extraction error
2204        // by manually manipulating accumulated content
2205        let mut session_verbose = StreamingSession::new().with_verbose_warnings(true);
2206        session_verbose.on_message_start();
2207        session_verbose.on_content_block_start(0);
2208
2209        // First delta
2210        session_verbose.on_text_delta(0, "Hello");
2211
2212        // Manually clear accumulated to simulate a state mismatch
2213        session_verbose.accumulated.clear();
2214
2215        // Now try to process a snapshot - extraction will fail
2216        // This would emit a warning if verbose_warnings is enabled
2217        let _show_prefix = session_verbose.on_text_delta(0, "Hello World");
2218
2219        // Test with verbose warnings disabled (default)
2220        let mut session_quiet = StreamingSession::new();
2221        session_quiet.on_message_start();
2222        session_quiet.on_content_block_start(0);
2223
2224        session_quiet.on_text_delta(0, "Hello");
2225        session_quiet.accumulated.clear();
2226
2227        // This should NOT emit a warning
2228        let _show_prefix = session_quiet.on_text_delta(0, "Hello World");
2229
2230        // The quiet session should handle the error gracefully
2231        assert!(session_quiet
2232            .get_accumulated(ContentType::Text, "0")
2233            .is_some());
2234    }
2235
2236    // Tests for enhanced streaming metrics
2237
2238    #[test]
2239    fn test_streaming_quality_metrics_includes_snapshot_repairs() {
2240        let mut session = StreamingSession::new();
2241        session.on_message_start();
2242        session.on_content_block_start(0);
2243
2244        // First delta - long enough to meet threshold requirements
2245        let initial = "This is a long message that exceeds threshold";
2246        session.on_text_delta(0, initial);
2247
2248        // GLM sends snapshot instead of delta (with strong overlap)
2249        let snapshot = format!("{initial} World!");
2250        let _ = session.on_text_delta(0, &snapshot);
2251
2252        let metrics = session.get_streaming_quality_metrics();
2253        assert_eq!(
2254            metrics.snapshot_repairs_count, 1,
2255            "Should track one snapshot repair"
2256        );
2257    }
2258
2259    #[test]
2260    fn test_streaming_quality_metrics_includes_large_delta_count() {
2261        let mut session = StreamingSession::new();
2262        session.on_message_start();
2263
2264        // Send 3 large deltas
2265        for _ in 0..3 {
2266            let large_delta = "x".repeat(snapshot_threshold() + 1);
2267            let _ = session.on_text_delta(0, &large_delta);
2268        }
2269
2270        let metrics = session.get_streaming_quality_metrics();
2271        assert_eq!(
2272            metrics.large_delta_count, 3,
2273            "Should track three large deltas"
2274        );
2275    }
2276
2277    #[test]
2278    fn test_streaming_quality_metrics_includes_protocol_violations() {
2279        let mut session = StreamingSession::new();
2280        session.on_message_start();
2281        session.on_text_delta(0, "Hello");
2282
2283        // Simulate GLM sending repeated MessageStart during streaming
2284        session.on_message_start();
2285        session.on_text_delta(0, " World");
2286
2287        // Another violation
2288        session.on_message_start();
2289
2290        let metrics = session.get_streaming_quality_metrics();
2291        assert_eq!(
2292            metrics.protocol_violations, 2,
2293            "Should track two protocol violations"
2294        );
2295    }
2296
2297    #[test]
2298    fn test_streaming_quality_metrics_all_new_fields_zero_by_default() {
2299        let session = StreamingSession::new();
2300        let metrics = session.get_streaming_quality_metrics();
2301
2302        assert_eq!(metrics.snapshot_repairs_count, 0);
2303        assert_eq!(metrics.large_delta_count, 0);
2304        assert_eq!(metrics.protocol_violations, 0);
2305    }
2306
2307    #[test]
2308    fn test_streaming_quality_metrics_comprehensive_tracking() {
2309        let mut session = StreamingSession::new();
2310        session.on_message_start();
2311
2312        // Normal delta
2313        session.on_text_delta(0, "Hello");
2314
2315        // Large delta
2316        let large_delta = "x".repeat(snapshot_threshold() + 1);
2317        let _ = session.on_text_delta(0, &large_delta);
2318
2319        // Snapshot repair (note: the snapshot is also large, so it counts as another large delta)
2320        let snapshot = format!("Hello{large_delta} World");
2321        let _ = session.on_text_delta(0, &snapshot);
2322
2323        // Check metrics BEFORE the protocol violation (which clears delta_sizes)
2324        let metrics = session.get_streaming_quality_metrics();
2325        assert_eq!(metrics.snapshot_repairs_count, 1);
2326        assert_eq!(
2327            metrics.large_delta_count, 2,
2328            "Both the large delta and the snapshot are large"
2329        );
2330        assert_eq!(metrics.total_deltas, 3);
2331        assert_eq!(metrics.protocol_violations, 0, "No violation yet");
2332
2333        // Protocol violation
2334        session.on_message_start();
2335
2336        // After violation, protocol_violations is incremented but delta_sizes is cleared
2337        let metrics_after = session.get_streaming_quality_metrics();
2338        assert_eq!(metrics_after.protocol_violations, 1);
2339        assert_eq!(
2340            metrics_after.total_deltas, 0,
2341            "Delta sizes cleared after violation"
2342        );
2343    }
2344
2345    // Tests for hash-based deduplication
2346
2347    #[test]
2348    fn test_content_hash_computed_on_message_stop() {
2349        let mut session = StreamingSession::new();
2350        session.on_message_start();
2351        session.on_text_delta(0, "Hello");
2352        session.on_text_delta(0, " World");
2353
2354        // Hash should be None before message_stop
2355        assert_eq!(session.final_content_hash, None);
2356
2357        // Hash should be computed after message_stop
2358        session.on_message_stop();
2359        assert!(session.final_content_hash.is_some());
2360    }
2361
2362    #[test]
2363    fn test_content_hash_none_when_no_content() {
2364        let mut session = StreamingSession::new();
2365        session.on_message_start();
2366
2367        // No content streamed
2368        session.on_message_stop();
2369        assert_eq!(session.final_content_hash, None);
2370    }
2371
2372    #[test]
2373    fn test_is_duplicate_by_hash_returns_true_for_matching_content() {
2374        let mut session = StreamingSession::new();
2375        session.on_message_start();
2376        session.on_text_delta(0, "Hello World");
2377        session.on_message_stop();
2378
2379        // Same content should be detected as duplicate
2380        assert!(session.is_duplicate_by_hash("Hello World", None));
2381    }
2382
2383    #[test]
2384    fn test_is_duplicate_by_hash_returns_false_for_different_content() {
2385        let mut session = StreamingSession::new();
2386        session.on_message_start();
2387        session.on_text_delta(0, "Hello World");
2388        session.on_message_stop();
2389
2390        // Different content should NOT be detected as duplicate
2391        assert!(!session.is_duplicate_by_hash("Different content", None));
2392    }
2393
2394    #[test]
2395    fn test_is_duplicate_by_hash_returns_false_when_no_content_streamed() {
2396        let session = StreamingSession::new();
2397
2398        // No content streamed, so no hash
2399        assert!(!session.is_duplicate_by_hash("Hello World", None));
2400    }
2401
2402    #[test]
2403    fn test_content_hash_multiple_content_blocks() {
2404        let mut session = StreamingSession::new();
2405        session.on_message_start();
2406        session.on_text_delta(0, "First block");
2407        session.on_text_delta(1, "Second block");
2408        session.on_message_stop();
2409
2410        // Hash should be computed from all blocks
2411        assert!(session.final_content_hash.is_some());
2412        // Individual content shouldn't match the combined hash
2413        assert!(!session.is_duplicate_by_hash("First block", None));
2414        assert!(!session.is_duplicate_by_hash("Second block", None));
2415    }
2416
2417    #[test]
2418    fn test_content_hash_consistent_for_same_content() {
2419        let mut session1 = StreamingSession::new();
2420        session1.on_message_start();
2421        session1.on_text_delta(0, "Hello");
2422        session1.on_text_delta(0, " World");
2423        session1.on_message_stop();
2424
2425        let mut session2 = StreamingSession::new();
2426        session2.on_message_start();
2427        session2.on_text_delta(0, "Hello World");
2428        session2.on_message_stop();
2429
2430        // Same content should produce the same hash
2431        assert_eq!(session1.final_content_hash, session2.final_content_hash);
2432    }
2433
2434    #[test]
2435    fn test_content_hash_multiple_content_blocks_non_sequential_indices() {
2436        // Test that content hash uses numeric sorting, not lexicographic sorting.
2437        // This prevents bugs when GLM sends non-sequential indices like 0, 1, 10, 2.
2438        // With lexicographic sorting, indices would be ordered as 0, 1, 10, 2,
2439        // but with numeric sorting they should be ordered as 0, 1, 2, 10.
2440        let mut session1 = StreamingSession::new();
2441        session1.on_message_start();
2442        // Add content in non-sequential order: 0, 1, 10, 2
2443        session1.on_text_delta(0, "Block 0");
2444        session1.on_text_delta(1, "Block 1");
2445        session1.on_text_delta(10, "Block 10");
2446        session1.on_text_delta(2, "Block 2");
2447        session1.on_message_stop();
2448
2449        let mut session2 = StreamingSession::new();
2450        session2.on_message_start();
2451        // Add the same content but in numeric order: 0, 1, 2, 10
2452        session2.on_text_delta(0, "Block 0");
2453        session2.on_text_delta(1, "Block 1");
2454        session2.on_text_delta(2, "Block 2");
2455        session2.on_text_delta(10, "Block 10");
2456        session2.on_message_stop();
2457
2458        // Both sessions should produce the same hash because they contain the same content
2459        // in the same logical order (sorted by numeric index, not insertion order)
2460        assert_eq!(
2461            session1.final_content_hash,
2462            session2.final_content_hash,
2463            "Content hash should be consistent regardless of insertion order when using numeric sorting"
2464        );
2465
2466        // Verify that is_duplicate_by_hash also works correctly
2467        let combined_content = "Block 0Block 1Block 2Block 10";
2468        assert!(
2469            session1.is_duplicate_by_hash(combined_content, None),
2470            "is_duplicate_by_hash should match content in numeric order"
2471        );
2472    }
2473
2474    // Tests for rapid index switching edge case (RFC-003)
2475
2476    #[test]
2477    fn test_rapid_index_switch_with_clear() {
2478        let mut session = StreamingSession::new();
2479        session.on_message_start();
2480
2481        // Start block 0 and accumulate content
2482        session.on_content_block_start(0);
2483        let show_prefix = session.on_text_delta(0, "X");
2484        assert!(show_prefix, "First delta for index 0 should show prefix");
2485        assert_eq!(session.get_accumulated(ContentType::Text, "0"), Some("X"));
2486
2487        // Switch to block 1 - this should clear accumulated content for index 0
2488        session.on_content_block_start(1);
2489
2490        // Verify accumulated for index 0 was cleared
2491        assert_eq!(
2492            session.get_accumulated(ContentType::Text, "0"),
2493            None,
2494            "Accumulated content for index 0 should be cleared when switching to index 1"
2495        );
2496
2497        // Switch back to index 0
2498        session.on_content_block_start(0);
2499
2500        // Since output_started_for_key was also cleared, prefix should show again
2501        let show_prefix = session.on_text_delta(0, "Y");
2502        assert!(
2503            show_prefix,
2504            "Prefix should show when switching back to a previously cleared index"
2505        );
2506
2507        // Verify new content is accumulated fresh
2508        assert_eq!(
2509            session.get_accumulated(ContentType::Text, "0"),
2510            Some("Y"),
2511            "New content should be accumulated fresh after clear"
2512        );
2513    }
2514
2515    #[test]
2516    fn test_delta_sizes_cleared_on_index_switch() {
2517        let mut session = StreamingSession::new();
2518        session.on_message_start();
2519
2520        // Track some delta sizes for index 0
2521        session.on_text_delta(0, "Hello");
2522        session.on_text_delta(0, " World");
2523
2524        let content_key = (ContentType::Text, "0".to_string());
2525        assert!(
2526            session.delta_sizes.contains_key(&content_key),
2527            "Delta sizes should be tracked for index 0"
2528        );
2529        let sizes_before = session.delta_sizes.get(&content_key).unwrap();
2530        assert_eq!(sizes_before.len(), 2, "Should have 2 delta sizes tracked");
2531
2532        // Switch to index 1 - this should clear delta_sizes for index 0
2533        session.on_content_block_start(1);
2534
2535        assert!(
2536            !session.delta_sizes.contains_key(&content_key),
2537            "Delta sizes for index 0 should be cleared when switching to index 1"
2538        );
2539
2540        // Add deltas for index 1
2541        session.on_text_delta(1, "New");
2542
2543        let content_key_1 = (ContentType::Text, "1".to_string());
2544        let sizes_after = session.delta_sizes.get(&content_key_1).unwrap();
2545        assert_eq!(
2546            sizes_after.len(),
2547            1,
2548            "Should have fresh size tracking for index 1"
2549        );
2550    }
2551
2552    #[test]
2553    fn test_rapid_index_switch_with_thinking_content() {
2554        let mut session = StreamingSession::new();
2555        session.on_message_start();
2556
2557        // Start thinking content in index 0
2558        session.on_content_block_start(0);
2559        let show_prefix = session.on_thinking_delta(0, "Thinking...");
2560        assert!(show_prefix, "First thinking delta should show prefix");
2561        assert_eq!(
2562            session.get_accumulated(ContentType::Thinking, "0"),
2563            Some("Thinking...")
2564        );
2565
2566        // Switch to text content in index 1 - this should clear index 0's accumulated
2567        session.on_content_block_start(1);
2568
2569        // Verify index 0's accumulated thinking was cleared
2570        assert_eq!(
2571            session.get_accumulated(ContentType::Thinking, "0"),
2572            None,
2573            "Thinking content for index 0 should be cleared when switching to index 1"
2574        );
2575
2576        let show_prefix = session.on_text_delta(1, "Text");
2577        assert!(
2578            show_prefix,
2579            "First text delta for index 1 should show prefix"
2580        );
2581
2582        // Switch back to index 0 for thinking
2583        session.on_content_block_start(0);
2584
2585        // Since output_started_for_key for (Thinking, "0") was cleared when switching to index 1,
2586        // the prefix should show again
2587        let show_prefix = session.on_thinking_delta(0, " more");
2588        assert!(
2589            show_prefix,
2590            "Thinking prefix should show when switching back to cleared index 0"
2591        );
2592
2593        // Verify thinking content was accumulated fresh (only the new content)
2594        assert_eq!(
2595            session.get_accumulated(ContentType::Thinking, "0"),
2596            Some(" more"),
2597            "Thinking content should be accumulated fresh after clear"
2598        );
2599    }
2600
2601    #[test]
2602    fn test_output_started_for_key_cleared_across_all_content_types() {
2603        let mut session = StreamingSession::new();
2604        session.on_message_start();
2605
2606        // Start block 0 with text and thinking
2607        // Note: ToolInput does not use output_started_for_key tracking
2608        session.on_content_block_start(0);
2609        session.on_text_delta(0, "Text");
2610        session.on_thinking_delta(0, "Thinking");
2611
2612        // Verify text and thinking have started output
2613        let text_key = (ContentType::Text, "0".to_string());
2614        let thinking_key = (ContentType::Thinking, "0".to_string());
2615
2616        assert!(session.output_started_for_key.contains(&text_key));
2617        assert!(session.output_started_for_key.contains(&thinking_key));
2618
2619        // Switch to index 1 - should clear output_started_for_key for all content types
2620        session.on_content_block_start(1);
2621
2622        assert!(
2623            !session.output_started_for_key.contains(&text_key),
2624            "Text output_started should be cleared for index 0"
2625        );
2626        assert!(
2627            !session.output_started_for_key.contains(&thinking_key),
2628            "Thinking output_started should be cleared for index 0"
2629        );
2630    }
2631
2632    // Tests for environment variable configuration
2633
2634    #[test]
2635    fn test_snapshot_threshold_default() {
2636        // Ensure no env var is set for this test
2637        std::env::remove_var("RALPH_STREAMING_SNAPSHOT_THRESHOLD");
2638        // Note: Since we use OnceLock, we can't reset the value in tests.
2639        // This test documents the default behavior.
2640        let threshold = snapshot_threshold();
2641        assert_eq!(
2642            threshold, DEFAULT_SNAPSHOT_THRESHOLD,
2643            "Default threshold should be 200"
2644        );
2645    }
2646}