ralph_workflow/json_parser/
streaming_state.rs

1//! Unified streaming state tracking module.
2//!
3//! This module provides a single source of truth for streaming state across
4//! all parsers (`Claude`, `Codex`, `Gemini`, `OpenCode`). It implements the streaming
5//! contract:
6//!
7//! # Streaming Contract
8//!
9//! 1. **Delta contract**: Each streaming event contains only newly generated text
10//! 2. **Message lifecycle**: `MessageStart` → (`ContentBlockStart` + deltas)* → `MessageStop`
11//! 3. **Deduplication rule**: Content displayed during streaming is never re-displayed
12//! 4. **State reset**: Streaming state resets on `MessageStart`/Init events
13//!
14//! # Message Lifecycle
15//!
16//! The streaming message lifecycle follows this sequence:
17//!
18//! 1. **`MessageStart`** (or equivalent init event):
19//!    - Resets all streaming state to `Idle`
20//!    - Clears accumulated content
21//!    - Resets content block state
22//!
23//! 2. **`ContentBlockStart`** (optional, for each content block):
24//!    - If already in a block with `started_output=true`, finalizes the previous block
25//!    - Initializes tracking for the new block index
//!    - Clears accumulated content and tracking for the *previous* block index (only when the index changes)
27//!
28//! 3. **Text/Thinking deltas** (zero or more per block):
29//!    - First delta for each `(content_type, index)` shows prefix
30//!    - Subsequent deltas update in-place (with prefix, using carriage return)
31//!    - Sets `started_output=true` for the current block
32//!
33//! 4. **`MessageStop`**:
34//!    - Finalizes the current content block
35//!    - Marks message as displayed to prevent duplicate final output
36//!    - Returns whether content was streamed (for emitting completion newline)
37//!
38//! # Content Block Transitions
39//!
40//! When transitioning between content blocks (e.g., block 0 → block 1):
41//!
42//! ```ignore
43//! // Streaming "Hello" in block 0
44//! session.on_text_delta(0, "Hello");  // started_output = true
45//!
46//! // Transition to block 1
47//! session.on_content_block_start(1);  // Finalizes block 0, started_output was true
48//!
49//! // Stream "World" in block 1
50//! session.on_text_delta(1, "World");  // New block, shows prefix again
51//! ```
52//!
53//! The `ContentBlockState::InBlock { index, started_output }` tracks:
54//! - `index`: Which block is currently active
55//! - `started_output`: Whether any content was output for this block
56//!
57//! This state enables proper finalization of previous blocks when new ones start.
58//!
59//! # Delta Contract
60//!
61//! This module enforces a strict **delta contract** - all streaming events must
62//! contain only the newly generated text (deltas), not the full accumulated content.
63//!
64//! Treating snapshots as deltas causes exponential duplication bugs. The session
65//! validates that incoming content is genuinely delta-sized and rejects likely
66//! snapshot-as-delta violations.
67//!
68//! # Example
69//!
70//! ```ignore
71//! use crate::json_parser::streaming_state::{StreamingSession, StreamingState};
72//!
73//! let mut session = StreamingSession::new();
74//!
75//! // Message starts - reset state
76//! session.on_message_start();
77//!
78//! // Content block starts
79//! session.on_content_block_start(0);
80//!
81//! // Text deltas arrive - accumulate and display
82//! let should_show_prefix = session.on_text_delta(0, "Hello");
83//! assert!(should_show_prefix); // First chunk shows prefix
84//!
85//! let should_show_prefix = session.on_text_delta(0, " World");
86//! assert!(!should_show_prefix); // Subsequent chunks don't show prefix
87//!
88//! // Check if content was already streamed (for deduplication)
89//! assert!(session.has_any_streamed_content());
90//!
91//! // Message stops - finalize
92//! session.on_message_stop();
93//! ```
94
95use crate::json_parser::deduplication::RollingHashWindow;
96use crate::json_parser::deduplication::{get_overlap_thresholds, DeltaDeduplicator};
97use crate::json_parser::health::StreamingQualityMetrics;
98use crate::json_parser::types::ContentType;
99use std::collections::hash_map::DefaultHasher;
100use std::collections::{HashMap, HashSet};
101use std::hash::{Hash, Hasher};
102use std::sync::OnceLock;
103
104// Streaming configuration constants
105
/// Default threshold for detecting snapshot-as-delta violations.
///
/// Deltas exceeding this size are flagged as potential snapshots. The value of 200
/// was chosen because:
/// - Normal deltas are typically < 100 characters (a few tokens)
/// - Snapshots often contain the full accumulated content (200+ chars)
/// - This threshold catches most violations while minimizing false positives
///
/// NOTE(review): the visible call site compares this against `str::len()`, which
/// counts UTF-8 bytes rather than characters, so non-ASCII deltas reach the
/// threshold sooner than the "characters" wording suggests.
const DEFAULT_SNAPSHOT_THRESHOLD: usize = 200;

/// Minimum allowed snapshot threshold.
///
/// Lower bound of the range accepted from `RALPH_STREAMING_SNAPSHOT_THRESHOLD`.
/// Values below 50 would cause excessive false positives for normal deltas,
/// as even small text chunks (1-2 sentences) can exceed 50 characters.
const MIN_SNAPSHOT_THRESHOLD: usize = 50;

/// Maximum allowed snapshot threshold.
///
/// Upper bound of the range accepted from `RALPH_STREAMING_SNAPSHOT_THRESHOLD`.
/// Values above 1000 would allow snapshots to pass undetected,
/// potentially causing exponential duplication bugs.
const MAX_SNAPSHOT_THRESHOLD: usize = 1000;

/// Minimum number of consecutive large deltas required to trigger pattern detection warning.
///
/// This threshold prevents false positives from occasional large deltas.
/// Three consecutive large deltas indicate a pattern (not a one-off event).
const DEFAULT_PATTERN_DETECTION_MIN_DELTAS: usize = 3;

/// Maximum number of delta sizes to track per content key for pattern detection.
///
/// Tracking recent delta sizes allows us to detect patterns of repeated large
/// content (a sign of snapshot-as-delta bugs). Ten entries provide sufficient
/// history without excessive memory usage. `StreamingSession::new` copies this
/// value into the session's `max_delta_history` field.
const DEFAULT_MAX_DELTA_HISTORY: usize = 10;
139
140/// Ralph enforces a **delta contract** for all streaming content.
141///
142/// Every streaming event must contain only the newly generated text (delta),
143/// never the full accumulated content (snapshot).
144///
145/// # Contract Violations
146///
147/// If a parser emits snapshot-style content when deltas are expected, it will
148/// cause exponential duplication bugs. The `StreamingSession` validates that
149/// incoming content is delta-sized and logs warnings when violations are detected.
150///
151/// # Validation Threshold
152///
153/// Deltas are expected to be small chunks (typically < 100 chars). If a single
154/// "delta" exceeds `snapshot_threshold()` characters, it may indicate a snapshot
155/// being treated as a delta.
156///
157/// # Pattern Detection
158///
159/// In addition to size threshold, we track patterns of repeated large content
160/// which may indicate a snapshot-as-delta bug where the same content is being
161/// sent repeatedly as if it were incremental.
162///
163/// # Environment Variables
164///
165/// The following environment variables can be set to configure streaming behavior:
166///
167/// - `RALPH_STREAMING_SNAPSHOT_THRESHOLD`: Threshold for detecting snapshot-as-delta
168///   violations (default: 200). Deltas exceeding this size trigger warnings.
169///
170/// Get the snapshot threshold from environment variable or use default.
171///
172/// Reads `RALPH_STREAMING_SNAPSHOT_THRESHOLD` env var.
173/// Valid range: 50-1000 characters.
174/// Falls back to default of 200 if not set or out of range.
175fn snapshot_threshold() -> usize {
176    static THRESHOLD: OnceLock<usize> = OnceLock::new();
177    *THRESHOLD.get_or_init(|| {
178        std::env::var("RALPH_STREAMING_SNAPSHOT_THRESHOLD")
179            .ok()
180            .and_then(|s| s.parse::<usize>().ok())
181            .and_then(|v| {
182                if (MIN_SNAPSHOT_THRESHOLD..=MAX_SNAPSHOT_THRESHOLD).contains(&v) {
183                    Some(v)
184                } else {
185                    None
186                }
187            })
188            .unwrap_or(DEFAULT_SNAPSHOT_THRESHOLD)
189    })
190}
191
/// Phase of the current message within the streaming lifecycle.
///
/// A session starts out `Idle`; `on_message_start` returns it to `Idle`
/// again at the beginning of every message.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum StreamingState {
    /// Nothing is being streamed right now (the default).
    #[default]
    Idle,
    /// Content deltas are currently arriving.
    Streaming,
    /// The message has been finalized (after `MessageStop` or equivalent).
    Finalized,
}
206
/// Which content block, if any, the session is currently inside.
///
/// This is richer than a plain `in_content_block` flag: it records the active
/// block's identifier and whether anything has been output for it yet, so a
/// block boundary can be finalized properly instead of gluing the text of two
/// blocks together.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub enum ContentBlockState {
    /// No content block is active (the default).
    #[default]
    NotInBlock,
    /// A content block is active.
    InBlock {
        /// Identifier of the active block (the block index rendered as a string).
        index: String,
        /// `true` once any content has been output for this block.
        started_output: bool,
    },
}
226
227/// Unified streaming session tracker.
228///
229/// Provides a single source of truth for streaming state across all parsers.
230/// Tracks:
231/// - Current streaming state (`Idle`/`Streaming`/`Finalized`)
232/// - Which content types have been streamed
233/// - Accumulated content by content type and index
234/// - Whether prefix should be shown on next delta
235/// - Delta size patterns for detecting snapshot-as-delta violations
236/// - Persistent "output started" tracking independent of accumulated content
237/// - Verbosity-aware warning emission
238///
239/// # Lifecycle
240///
241/// 1. **Start**: `on_message_start()` - resets all state
242/// 2. **Stream**: `on_text_delta()` / `on_thinking_delta()` - accumulate content
243/// 3. **Stop**: `on_message_stop()` - finalize the message
244/// 4. **Repeat**: Back to step 1 for next message
245#[derive(Debug, Default, Clone)]
246pub struct StreamingSession {
247    /// Current streaming state
248    state: StreamingState,
249    /// Track which content types have been streamed (for deduplication)
250    /// Maps `ContentType` → whether it has been streamed
251    streamed_types: HashMap<ContentType, bool>,
252    /// Track the current content block state
253    current_block: ContentBlockState,
254    /// Accumulated content by (`content_type`, index) for display
255    /// This mirrors `DeltaAccumulator` but adds deduplication tracking
256    accumulated: HashMap<(ContentType, String), String>,
257    /// Track the order of keys for `most_recent` operations
258    key_order: Vec<(ContentType, String)>,
259    /// Track recent delta sizes for pattern detection
260    /// Maps `(content_type, key)` → vec of recent delta sizes
261    delta_sizes: HashMap<(ContentType, String), Vec<usize>>,
262    /// Maximum number of delta sizes to track per key
263    max_delta_history: usize,
264    /// Track the current message ID for duplicate detection
265    current_message_id: Option<String>,
266    /// Track the last finalized message ID to detect duplicates
267    last_finalized_message_id: Option<String>,
268    /// Track which messages have been displayed to prevent duplicate final output
269    displayed_final_messages: HashSet<String>,
270    /// Track which (`content_type`, key) pairs have had output started.
271    /// This is independent of `accumulated` to handle cases where accumulated
272    /// content may be cleared (e.g., repeated `ContentBlockStart` for same index).
273    /// Cleared on `on_message_start` to ensure fresh state for each message.
274    output_started_for_key: HashSet<(ContentType, String)>,
275    /// Whether to emit verbose warnings about streaming anomalies.
276    /// When false, suppresses diagnostic warnings that are useful for debugging
277    /// but noisy in production (e.g., GLM protocol violations, snapshot detection).
278    verbose_warnings: bool,
279    /// Count of snapshot repairs performed during this session
280    snapshot_repairs_count: usize,
281    /// Count of deltas that exceeded the size threshold
282    large_delta_count: usize,
283    /// Count of protocol violations detected (e.g., `MessageStart` during streaming)
284    protocol_violations: usize,
285    /// Hash of the final streamed content (for deduplication)
286    /// Computed at `message_stop` using all accumulated content
287    final_content_hash: Option<u64>,
288    /// Track the last rendered content for each key to detect when rendering
289    /// would produce identical output (prevents visual repetition).
290    /// Maps `(content_type, key)` → the last accumulated content that was rendered.
291    last_rendered: HashMap<(ContentType, String), String>,
292    /// Track all rendered content hashes for duplicate detection.
293    /// This is preserved across `MessageStart` boundaries to prevent duplicate rendering.
294    rendered_content_hashes: HashSet<u64>,
295    /// Track the last delta for each key to detect exact duplicate deltas.
296    /// This is preserved across `MessageStart` boundaries to prevent duplicate processing.
297    /// Maps `(content_type, key)` → the last delta that was processed.
298    last_delta: HashMap<(ContentType, String), String>,
299    /// Track consecutive duplicates for resend glitch detection ("3 strikes" heuristic).
300    /// Maps `(content_type, key)` → (count, `delta_hash`) where count tracks how many
301    /// times the exact same delta has arrived consecutively. When count exceeds
302    /// the threshold, the delta is dropped as a resend glitch.
303    consecutive_duplicates: HashMap<(ContentType, String), (usize, u64)>,
304    /// Delta deduplicator using KMP and rolling hash for snapshot detection.
305    /// Provides O(n+m) guaranteed complexity for detecting snapshot-as-delta violations.
306    /// Cleared on message boundaries to prevent false positives.
307    deduplicator: DeltaDeduplicator,
308    /// Track message IDs that have been fully rendered from an assistant event BEFORE streaming.
309    /// When an assistant event arrives before streaming deltas, we render it and record
310    /// the message_id. ALL subsequent streaming deltas for that message_id should be
311    /// suppressed to prevent duplication.
312    pre_rendered_message_ids: HashSet<String>,
313}
314
315impl StreamingSession {
316    /// Create a new streaming session.
317    pub fn new() -> Self {
318        Self {
319            max_delta_history: DEFAULT_MAX_DELTA_HISTORY,
320            verbose_warnings: false,
321            ..Default::default()
322        }
323    }
324
    /// Configure whether to emit verbose warnings about streaming anomalies.
    ///
    /// When enabled, diagnostic warnings are written to stderr, including:
    /// - Repeated `MessageStart` events while streaming (GLM protocol violations)
    /// - Large deltas that may indicate snapshot-as-delta bugs
    /// - Consecutive duplicate deltas dropped as resend glitches
    ///
    /// When disabled (default), these warnings are suppressed to avoid
    /// noise in production output. Anomaly counters are still updated either way.
    ///
    /// # Arguments
    /// * `enabled` - Whether to enable verbose warnings
    ///
    /// # Returns
    /// The modified session for builder chaining.
    ///
    /// # Example
    ///
    /// ```ignore
    /// let mut session = StreamingSession::new().with_verbose_warnings(true);
    /// ```
    pub const fn with_verbose_warnings(mut self, enabled: bool) -> Self {
        // Consumes and returns `self` so the call can be chained onto `new()`.
        self.verbose_warnings = enabled;
        self
    }
350
351    /// Reset the session on new message start.
352    ///
353    /// This should be called when:
354    /// - Claude: `MessageStart` event
355    /// - Codex: `TurnStarted` event
356    /// - Gemini: `init` event or new message
357    /// - `OpenCode`: New part starts
358    ///
359    /// # Arguments
360    /// * `message_id` - Optional unique identifier for this message (for deduplication)
361    ///
362    /// # Note on Repeated `MessageStart` Events
363    ///
364    /// Some agents (notably GLM/ccs-glm) send repeated `MessageStart` events during
365    /// a single logical streaming session. When this happens while state is `Streaming`,
366    /// we preserve `output_started_for_key` to prevent prefix spam on each delta that
367    /// follows the repeated `MessageStart`. This is a defensive measure to handle
368    /// non-standard agent protocols while maintaining correct behavior for legitimate
369    /// multi-message scenarios.
370    pub fn on_message_start(&mut self) {
371        // Detect repeated MessageStart during active streaming
372        let is_mid_stream_restart = self.state == StreamingState::Streaming;
373
374        if is_mid_stream_restart {
375            // Track protocol violation
376            self.protocol_violations += 1;
377            // Log the contract violation for debugging (only if verbose warnings enabled)
378            if self.verbose_warnings {
379                eprintln!(
380                    "Warning: Received MessageStart while state is Streaming. \
381                    This indicates a non-standard agent protocol (e.g., GLM sending \
382                    repeated MessageStart events). Preserving output_started_for_key \
383                    to prevent prefix spam. File: streaming_state.rs, Line: {}",
384                    line!()
385                );
386            }
387
388            // Preserve output_started_for_key to prevent prefix spam.
389            // std::mem::take replaces the HashSet with an empty one and returns the old values,
390            // which we restore after clearing other state. This ensures repeated MessageStart
391            // events don't reset output tracking, preventing duplicate prefix display.
392            let preserved_output_started = std::mem::take(&mut self.output_started_for_key);
393
394            // Also preserve last_delta to detect duplicate deltas across MessageStart boundaries
395            let preserved_last_delta = std::mem::take(&mut self.last_delta);
396
397            // Also preserve rendered_content_hashes to detect duplicate rendering across MessageStart
398            let preserved_rendered_hashes = std::mem::take(&mut self.rendered_content_hashes);
399
400            // Also preserve consecutive_duplicates to detect resend glitches across MessageStart
401            let preserved_consecutive_duplicates = std::mem::take(&mut self.consecutive_duplicates);
402
403            self.state = StreamingState::Idle;
404            self.streamed_types.clear();
405            self.current_block = ContentBlockState::NotInBlock;
406            self.accumulated.clear();
407            self.key_order.clear();
408            self.delta_sizes.clear();
409            self.last_rendered.clear();
410            self.deduplicator.clear();
411
412            // Restore preserved state
413            self.output_started_for_key = preserved_output_started;
414            self.last_delta = preserved_last_delta;
415            self.rendered_content_hashes = preserved_rendered_hashes;
416            self.consecutive_duplicates = preserved_consecutive_duplicates;
417        } else {
418            // Normal reset for new message
419            self.state = StreamingState::Idle;
420            self.streamed_types.clear();
421            self.current_block = ContentBlockState::NotInBlock;
422            self.accumulated.clear();
423            self.key_order.clear();
424            self.delta_sizes.clear();
425            self.output_started_for_key.clear();
426            self.last_rendered.clear();
427            self.last_delta.clear();
428            self.rendered_content_hashes.clear();
429            self.consecutive_duplicates.clear();
430            self.deduplicator.clear();
431        }
432        // Note: We don't reset current_message_id here - it's set by a separate method
433        // This allows for more flexible message ID handling
434    }
435
    /// Set the current message ID for tracking.
    ///
    /// This should be called when processing a `MessageStart` event that contains
    /// a message identifier. Used to prevent duplicate display of final messages.
    /// `on_message_start` does not reset this value, so callers are responsible
    /// for updating or clearing it.
    ///
    /// # Arguments
    /// * `message_id` - The unique identifier for this message (or `None` to clear)
    pub fn set_current_message_id(&mut self, message_id: Option<String>) {
        self.current_message_id = message_id;
    }
446
    /// Get the current message ID.
    ///
    /// The ID is borrowed from the session; no allocation takes place.
    ///
    /// # Returns
    /// * `Some(id)` - The current message ID
    /// * `None` - No message ID is set
    pub fn get_current_message_id(&self) -> Option<&str> {
        self.current_message_id.as_deref()
    }
455
    /// Check if a message ID represents a duplicate final message.
    ///
    /// This prevents displaying the same message twice - once after streaming
    /// completes and again when the final "Assistant" event arrives. A message
    /// counts as a duplicate once it has been recorded via
    /// `mark_message_displayed`.
    ///
    /// # Arguments
    /// * `message_id` - The message ID to check
    ///
    /// # Returns
    /// * `true` - This message has already been displayed (is a duplicate)
    /// * `false` - This is a new message
    pub fn is_duplicate_final_message(&self, message_id: &str) -> bool {
        self.displayed_final_messages.contains(message_id)
    }
470
471    /// Mark a message as displayed to prevent duplicate display.
472    ///
473    /// This should be called after displaying a message's final content.
474    ///
475    /// # Arguments
476    /// * `message_id` - The message ID to mark as displayed
477    pub fn mark_message_displayed(&mut self, message_id: &str) {
478        self.displayed_final_messages.insert(message_id.to_string());
479        self.last_finalized_message_id = Some(message_id.to_string());
480    }
481
482    /// Mark that an assistant event has pre-rendered content BEFORE streaming started.
483    ///
484    /// This is used to handle the case where an assistant event arrives with full content
485    /// BEFORE any streaming deltas. When this happens, we render the assistant event content
486    /// and mark the message_id as pre-rendered. ALL subsequent streaming deltas for the
487    /// same message_id should be suppressed to prevent duplication.
488    ///
489    /// # Arguments
490    /// * `message_id` - The message ID that was pre-rendered
491    pub fn mark_message_pre_rendered(&mut self, message_id: &str) {
492        self.pre_rendered_message_ids.insert(message_id.to_string());
493    }
494
    /// Check if a message was pre-rendered from an assistant event.
    ///
    /// This checks if the given `message_id` was previously recorded via
    /// `mark_message_pre_rendered` (i.e., rendered from an assistant event
    /// before streaming started). If so, ALL subsequent streaming deltas for
    /// this message should be suppressed.
    ///
    /// # Arguments
    /// * `message_id` - The message ID to check
    ///
    /// # Returns
    /// * `true` - This message was pre-rendered, suppress all streaming output
    /// * `false` - This message was not pre-rendered, allow streaming output
    pub fn is_message_pre_rendered(&self, message_id: &str) -> bool {
        self.pre_rendered_message_ids.contains(message_id)
    }
510
511    /// Mark the start of a content block.
512    ///
513    /// This should be called when:
514    /// - Claude: `ContentBlockStart` event
515    /// - Codex: `ItemStarted` with relevant type
516    /// - Gemini: Content section begins
517    /// - `OpenCode`: Part with content starts
518    ///
519    /// If we're already in a block, this method finalizes the previous block
520    /// by emitting a newline if output had started.
521    ///
522    /// # Arguments
523    /// * `index` - The content block index (for multi-block messages)
524    pub fn on_content_block_start(&mut self, index: u64) {
525        let index_str = index.to_string();
526
527        // Check if we're transitioning to a different index BEFORE finalizing.
528        // This is important because some agents (e.g., GLM) may send ContentBlockStart
529        // repeatedly for the same index, and we should NOT clear accumulated content
530        // in that case (which would cause the next delta to show prefix again).
531        let (is_same_index, old_index) = match &self.current_block {
532            ContentBlockState::NotInBlock => (false, None),
533            ContentBlockState::InBlock {
534                index: current_index,
535                ..
536            } => (current_index == &index_str, Some(current_index.clone())),
537        };
538
539        // Finalize previous block if we're in one
540        self.ensure_content_block_finalized();
541
542        // Only clear accumulated content if transitioning to a DIFFERENT index.
543        // We clear the OLD index's content, not the new one.
544        if !is_same_index {
545            if let Some(old) = old_index {
546                for content_type in [
547                    ContentType::Text,
548                    ContentType::Thinking,
549                    ContentType::ToolInput,
550                ] {
551                    let key = (content_type, old.clone());
552                    self.accumulated.remove(&key);
553                    self.key_order.retain(|k| k != &key);
554                    // Also clear output_started tracking to ensure prefix shows when switching back
555                    self.output_started_for_key.remove(&key);
556                    // Clear delta sizes for the old index to prevent incorrect pattern detection
557                    self.delta_sizes.remove(&key);
558                    self.last_rendered.remove(&key);
559                    // Clear consecutive duplicates for the old index
560                    self.consecutive_duplicates.remove(&key);
561                }
562            }
563        }
564
565        // Initialize the new content block
566        self.current_block = ContentBlockState::InBlock {
567            index: index_str,
568            started_output: false,
569        };
570    }
571
572    /// Ensure the current content block is finalized.
573    ///
574    /// If we're in a block and output has started, this returns true to indicate
575    /// that a newline should be emitted. This prevents "glued text" bugs where
576    /// content from different blocks is concatenated without separation.
577    ///
578    /// # Returns
579    /// * `true` - A newline should be emitted (output had started)
580    /// * `false` - No newline needed (no output or not in a block)
581    fn ensure_content_block_finalized(&mut self) -> bool {
582        if let ContentBlockState::InBlock { started_output, .. } = &self.current_block {
583            let had_output = *started_output;
584            self.current_block = ContentBlockState::NotInBlock;
585            had_output
586        } else {
587            false
588        }
589    }
590
591    /// Assert that the session is in a valid lifecycle state.
592    ///
593    /// In debug builds, this will panic if the current state doesn't match
594    /// any of the expected states. In release builds, this does nothing.
595    ///
596    /// # Arguments
597    /// * `expected` - Slice of acceptable states
598    fn assert_lifecycle_state(&self, expected: &[StreamingState]) {
599        #[cfg(debug_assertions)]
600        assert!(
601            expected.contains(&self.state),
602            "Invalid lifecycle state: expected {:?}, got {:?}. \
603            This indicates a bug in the parser's event handling.",
604            expected,
605            self.state
606        );
607        #[cfg(not(debug_assertions))]
608        let _ = expected;
609    }
610
611    /// Process a text delta and return whether prefix should be shown.
612    ///
613    /// # Arguments
614    /// * `index` - The content block index
615    /// * `delta` - The text delta to accumulate
616    ///
617    /// # Returns
618    /// * `true` - Show prefix with this delta (first chunk)
619    /// * `false` - Don't show prefix (subsequent chunks)
620    pub fn on_text_delta(&mut self, index: u64, delta: &str) -> bool {
621        self.on_text_delta_key(&index.to_string(), delta)
622    }
623
624    /// Check for consecutive duplicate delta using the "3 strikes" heuristic.
625    ///
626    /// Detects resend glitches where the exact same delta arrives repeatedly.
627    /// Returns true if the delta should be dropped (exceeded threshold), false otherwise.
628    ///
629    /// # Arguments
630    /// * `content_key` - The content key to check
631    /// * `delta` - The delta to check
632    /// * `key_str` - The string key for logging
633    ///
634    /// # Returns
635    /// * `true` - The delta should be dropped (consecutive duplicate exceeded threshold)
636    /// * `false` - The delta should be processed
637    fn check_consecutive_duplicate(
638        &mut self,
639        content_key: &(ContentType, String),
640        delta: &str,
641        key_str: &str,
642    ) -> bool {
643        let delta_hash = RollingHashWindow::compute_hash(delta);
644        let thresholds = get_overlap_thresholds();
645
646        if let Some((count, prev_hash)) = self.consecutive_duplicates.get_mut(content_key) {
647            if *prev_hash == delta_hash {
648                *count += 1;
649                // Check if we've exceeded the consecutive duplicate threshold
650                if *count >= thresholds.consecutive_duplicate_threshold {
651                    // This is a resend glitch - drop the delta entirely
652                    if self.verbose_warnings {
653                        eprintln!(
654                            "Warning: Dropping consecutive duplicate delta (count={count}, threshold={}). \
655                            This appears to be a resend glitch. Key: '{key_str}', Delta: {delta:?}",
656                            thresholds.consecutive_duplicate_threshold
657                        );
658                    }
659                    // Don't update last_delta - preserve previous for comparison
660                    return true;
661                }
662            } else {
663                // Different delta - reset count and update hash
664                *count = 1;
665                *prev_hash = delta_hash;
666            }
667        } else {
668            // First occurrence of this delta
669            self.consecutive_duplicates
670                .insert(content_key.clone(), (1, delta_hash));
671        }
672
673        false
674    }
675
676    /// Process a text delta with a string key and return whether prefix should be shown.
677    ///
678    /// This variant is for parsers that use string keys instead of numeric indices
679    /// (e.g., Codex uses `agent_msg`, `reasoning`; Gemini uses `main`; `OpenCode` uses `main`).
680    ///
681    /// # Delta Validation
682    ///
683    /// This method validates that incoming content appears to be a genuine delta
684    /// (small chunk) rather than a snapshot (full accumulated content). Large "deltas"
685    /// that exceed `snapshot_threshold()` trigger a warning as they may indicate a
686    /// contract violation.
687    ///
688    /// Additionally, we track patterns of delta sizes to detect repeated large
689    /// content being sent as if it were incremental (a common snapshot-as-delta bug).
690    ///
691    /// # Arguments
692    /// * `key` - The content key (e.g., `main`, `agent_msg`, `reasoning`)
693    /// * `delta` - The text delta to accumulate
694    ///
695    /// # Returns
696    /// * `true` - Show prefix with this delta (first chunk)
697    /// * `false` - Don't show prefix (subsequent chunks)
698    pub fn on_text_delta_key(&mut self, key: &str, delta: &str) -> bool {
699        // Lifecycle enforcement: deltas should only arrive during streaming
700        // or idle (first delta starts streaming), never after finalization
701        self.assert_lifecycle_state(&[StreamingState::Idle, StreamingState::Streaming]);
702
703        let content_key = (ContentType::Text, key.to_string());
704        let delta_size = delta.len();
705
706        // Track delta size and warn on large deltas BEFORE duplicate check
707        // This ensures we track all received deltas even if they're duplicates
708        if delta_size > snapshot_threshold() {
709            self.large_delta_count += 1;
710            if self.verbose_warnings {
711                eprintln!(
712                    "Warning: Large delta ({delta_size} chars) for key '{key}'. \
713                    This may indicate unusual streaming behavior or a snapshot being sent as a delta."
714                );
715            }
716        }
717
718        // Track delta size for pattern detection
719        {
720            let sizes = self.delta_sizes.entry(content_key.clone()).or_default();
721            sizes.push(delta_size);
722
723            // Keep only the most recent delta sizes
724            if sizes.len() > self.max_delta_history {
725                sizes.remove(0);
726            }
727        }
728
729        // Check for exact duplicate delta (same delta sent twice)
730        // This handles the ccs-glm repeated MessageStart scenario where the same
731        // delta is sent multiple times. We skip processing exact duplicates ONLY when
732        // the accumulated content is empty (indicating we just had a MessageStart and
733        // this is a true duplicate, not just a repeated token in normal streaming).
734        if let Some(last) = self.last_delta.get(&content_key) {
735            if delta == last {
736                // Check if accumulated content is empty (just after MessageStart)
737                if let Some(current_accumulated) = self.accumulated.get(&content_key) {
738                    // If accumulated content is empty, this is likely a ccs-glm duplicate
739                    if current_accumulated.is_empty() {
740                        // Skip without updating last_delta (to preserve previous delta for comparison)
741                        return false;
742                    }
743                } else {
744                    // No accumulated content yet, definitely after MessageStart
745                    // Skip without updating last_delta
746                    return false;
747                }
748            }
749        }
750
751        // Consecutive duplicate detection ("3 strikes" heuristic)
752        // Detects resend glitches where the exact same delta arrives repeatedly.
753        // This is different from the above check - it tracks HOW MANY TIMES
754        // the same delta has arrived consecutively, not just if it matches once.
755        if self.check_consecutive_duplicate(&content_key, delta, key) {
756            return false;
757        }
758
759        // Auto-repair: Check if this is a snapshot being sent as a delta
760        // Do this BEFORE any mutable borrows so we can use immutable methods.
761        // Use content-based detection which is more reliable than size-based alone.
762        let is_snapshot = self.is_likely_snapshot(delta, key);
763        let actual_delta = if is_snapshot {
764            // Extract only the new portion to prevent exponential duplication
765            match self.get_delta_from_snapshot(delta, key) {
766                Ok(extracted) => {
767                    // Track successful snapshot repair
768                    self.snapshot_repairs_count += 1;
769                    extracted.to_string()
770                }
771                Err(e) => {
772                    // Snapshot detection had a false positive - use the original delta
773                    if self.verbose_warnings {
774                        eprintln!(
775                            "Warning: Snapshot extraction failed: {e}. Using original delta."
776                        );
777                    }
778                    delta.to_string()
779                }
780            }
781        } else {
782            // Genuine delta - use as-is
783            delta.to_string()
784        };
785
786        // Pattern detection: Check if we're seeing repeated large deltas
787        // This indicates the same content is being sent repeatedly (snapshot-as-delta)
788        let sizes = self.delta_sizes.get(&content_key);
789        if let Some(sizes) = sizes {
790            if sizes.len() >= DEFAULT_PATTERN_DETECTION_MIN_DELTAS && self.verbose_warnings {
791                // Check if at least 3 of the last N deltas were large
792                let large_count = sizes.iter().filter(|&&s| s > snapshot_threshold()).count();
793                if large_count >= DEFAULT_PATTERN_DETECTION_MIN_DELTAS {
794                    eprintln!(
795                        "Warning: Detected pattern of {large_count} large deltas for key '{key}'. \
796                        This strongly suggests a snapshot-as-delta bug where the same \
797                        large content is being sent repeatedly. File: streaming_state.rs, Line: {}",
798                        line!()
799                    );
800                }
801            }
802        }
803
804        // If the actual delta is empty (identical content detected), skip processing
805        if actual_delta.is_empty() {
806            // Return false to indicate no prefix should be shown (content unchanged)
807            return false;
808        }
809
810        // Mark that we're streaming text content
811        self.streamed_types.insert(ContentType::Text, true);
812        self.state = StreamingState::Streaming;
813
814        // Update block state to track this block and mark output as started
815        self.current_block = ContentBlockState::InBlock {
816            index: key.to_string(),
817            started_output: true,
818        };
819
820        // Check if this is the first delta for this key using output_started_for_key
821        // This is independent of accumulated content to handle cases where accumulated
822        // content may be cleared (e.g., repeated ContentBlockStart for same index)
823        let is_first = !self.output_started_for_key.contains(&content_key);
824
825        // Mark that output has started for this key
826        self.output_started_for_key.insert(content_key.clone());
827
828        // Accumulate the delta (using auto-repaired delta if snapshot was detected)
829        self.accumulated
830            .entry(content_key.clone())
831            .and_modify(|buf| buf.push_str(&actual_delta))
832            .or_insert_with(|| actual_delta);
833
834        // Track the last delta for duplicate detection
835        // Use the original delta for tracking (not the auto-repaired version)
836        self.last_delta
837            .insert(content_key.clone(), delta.to_string());
838
839        // Track order
840        if is_first {
841            self.key_order.push(content_key);
842        }
843
844        // Show prefix only on the very first delta
845        is_first
846    }
847
848    /// Process a thinking delta and return whether prefix should be shown.
849    ///
850    /// # Arguments
851    /// * `index` - The content block index
852    /// * `delta` - The thinking delta to accumulate
853    ///
854    /// # Returns
855    /// * `true` - Show prefix with this delta (first chunk)
856    /// * `false` - Don't show prefix (subsequent chunks)
857    pub fn on_thinking_delta(&mut self, index: u64, delta: &str) -> bool {
858        self.on_thinking_delta_key(&index.to_string(), delta)
859    }
860
861    /// Process a thinking delta with a string key and return whether prefix should be shown.
862    ///
863    /// This variant is for parsers that use string keys instead of numeric indices.
864    ///
865    /// # Arguments
866    /// * `key` - The content key (e.g., "reasoning")
867    /// * `delta` - The thinking delta to accumulate
868    ///
869    /// # Returns
870    /// * `true` - Show prefix with this delta (first chunk)
871    /// * `false` - Don't show prefix (subsequent chunks)
872    pub fn on_thinking_delta_key(&mut self, key: &str, delta: &str) -> bool {
873        // Mark that we're streaming thinking content
874        self.streamed_types.insert(ContentType::Thinking, true);
875        self.state = StreamingState::Streaming;
876
877        // Get the key for this content
878        let content_key = (ContentType::Thinking, key.to_string());
879
880        // Check if this is the first delta for this key using output_started_for_key
881        let is_first = !self.output_started_for_key.contains(&content_key);
882
883        // Mark that output has started for this key
884        self.output_started_for_key.insert(content_key.clone());
885
886        // Accumulate the delta
887        self.accumulated
888            .entry(content_key.clone())
889            .and_modify(|buf| buf.push_str(delta))
890            .or_insert_with(|| delta.to_string());
891
892        // Track order
893        if is_first {
894            self.key_order.push(content_key);
895        }
896
897        is_first
898    }
899
900    /// Process a tool input delta.
901    ///
902    /// # Arguments
903    /// * `index` - The content block index
904    /// * `delta` - The tool input delta to accumulate
905    pub fn on_tool_input_delta(&mut self, index: u64, delta: &str) {
906        // Mark that we're streaming tool input
907        self.streamed_types.insert(ContentType::ToolInput, true);
908        self.state = StreamingState::Streaming;
909
910        // Get the key for this content
911        let key = (ContentType::ToolInput, index.to_string());
912
913        // Accumulate the delta
914        self.accumulated
915            .entry(key.clone())
916            .and_modify(|buf| buf.push_str(delta))
917            .or_insert_with(|| delta.to_string());
918
919        // Track order
920        if !self.key_order.contains(&key) {
921            self.key_order.push(key);
922        }
923    }
924
925    /// Finalize the message on stop event.
926    ///
927    /// This should be called when:
928    /// - Claude: `MessageStop` event
929    /// - Codex: `TurnCompleted` or `ItemCompleted` with text
930    /// - Gemini: Message completion
931    /// - `OpenCode`: Part completion
932    ///
933    /// # Returns
934    /// * `true` - A completion newline should be emitted (was in a content block)
935    /// * `false` - No completion needed (no content block active)
936    pub fn on_message_stop(&mut self) -> bool {
937        let was_in_block = self.ensure_content_block_finalized();
938        self.state = StreamingState::Finalized;
939
940        // Compute content hash for deduplication
941        self.final_content_hash = self.compute_content_hash();
942
943        // Mark the current message as displayed to prevent duplicate display
944        // when the final "Assistant" event arrives
945        if let Some(message_id) = self.current_message_id.clone() {
946            self.mark_message_displayed(&message_id);
947        }
948
949        was_in_block
950    }
951
952    /// Check if ANY content has been streamed for this message.
953    ///
954    /// This is a broader check that returns true if ANY content type
955    /// has been streamed. Used to skip entire message display when
956    /// all content was already streamed.
957    pub fn has_any_streamed_content(&self) -> bool {
958        !self.streamed_types.is_empty()
959    }
960
961    /// Compute hash of all accumulated content for deduplication.
962    ///
963    /// This computes a hash of ALL accumulated content across all content types
964    /// and indices. This is used to detect if a final message contains the same
965    /// content that was already streamed.
966    ///
967    /// # Returns
968    /// * `Some(hash)` - Hash of all accumulated content, or None if no content
969    fn compute_content_hash(&self) -> Option<u64> {
970        if self.accumulated.is_empty() {
971            return None;
972        }
973
974        let mut hasher = DefaultHasher::new();
975
976        // Collect and sort keys for consistent hashing
977        // Note: We can't sort ContentType directly, so we convert to tuple representation
978        let mut keys: Vec<_> = self.accumulated.keys().collect();
979        // Sort by string representation for consistency (not perfect but good enough for deduplication)
980        keys.sort_by_key(|k| format!("{:?}-{}", k.0, k.1));
981
982        for key in keys {
983            if let Some(content) = self.accumulated.get(key) {
984                // Hash the key and content together
985                format!("{:?}-{}", key.0, key.1).hash(&mut hasher);
986                content.hash(&mut hasher);
987            }
988        }
989
990        Some(hasher.finish())
991    }
992
993    /// Check if content matches the previously streamed content by hash.
994    ///
995    /// This is a more precise alternative to `has_any_streamed_content()` for
996    /// deduplication. Instead of checking if ANY content was streamed, this checks
997    /// if the EXACT content was streamed by comparing hashes.
998    ///
999    /// This method looks at ALL accumulated content across all content types and indices.
1000    /// If the combined accumulated content matches the input, it returns true.
1001    ///
1002    /// # Arguments
1003    /// * `content` - The content to check (typically text content from final message)
1004    ///
1005    /// # Returns
1006    /// * `true` - The content hash matches the previously streamed content
1007    /// * `false` - The content is different or no content was streamed
1008    pub fn is_duplicate_by_hash(&self, content: &str) -> bool {
1009        // Check if the input content matches ALL accumulated text content combined
1010        // This handles the case where assistant events arrive during streaming (before message_stop)
1011        // We collect and combine all text content in index order
1012        let mut text_keys: Vec<_> = self
1013            .accumulated
1014            .keys()
1015            .filter(|(ct, _)| *ct == ContentType::Text)
1016            .collect();
1017        text_keys.sort_by_key(|k| format!("{:?}-{}", k.0, k.1));
1018
1019        // Combine all accumulated text content in sorted order
1020        let combined_content: String = text_keys
1021            .iter()
1022            .filter_map(|key| self.accumulated.get(key))
1023            .cloned()
1024            .collect();
1025
1026        // Direct string comparison is more reliable than hashing
1027        // because hashing can have collisions and we want exact match
1028        combined_content == content
1029    }
1030
1031    /// Get accumulated content for a specific type and index.
1032    ///
1033    /// # Arguments
1034    /// * `content_type` - The type of content
1035    /// * `index` - The content index (as string for flexibility)
1036    ///
1037    /// # Returns
1038    /// * `Some(text)` - Accumulated content
1039    /// * `None` - No content accumulated for this key
1040    pub fn get_accumulated(&self, content_type: ContentType, index: &str) -> Option<&str> {
1041        self.accumulated
1042            .get(&(content_type, index.to_string()))
1043            .map(std::string::String::as_str)
1044    }
1045
1046    /// Mark content as having been rendered (HashMap-based tracking).
1047    ///
1048    /// This should be called after rendering to update the per-key tracking.
1049    ///
1050    /// # Arguments
1051    /// * `content_type` - The type of content
1052    /// * `index` - The content index (as string for flexibility)
1053    pub fn mark_rendered(&mut self, content_type: ContentType, index: &str) {
1054        let content_key = (content_type, index.to_string());
1055
1056        // Store the current accumulated content as last rendered
1057        if let Some(current) = self.accumulated.get(&content_key) {
1058            self.last_rendered.insert(content_key, current.clone());
1059        }
1060    }
1061
1062    /// Check if content has been rendered before using hash-based tracking.
1063    ///
1064    /// This provides global duplicate detection across all content by computing
1065    /// a hash of the accumulated content and checking if it's in the rendered set.
1066    /// This is preserved across `MessageStart` boundaries to prevent duplicate rendering.
1067    ///
1068    /// # Arguments
1069    /// * `content_type` - The type of content
1070    /// * `index` - The content index (as string for flexibility)
1071    ///
1072    /// # Returns
1073    /// * `true` - This exact content has been rendered before
1074    /// * `false` - This exact content has not been rendered
1075    #[cfg(test)]
1076    pub fn is_content_rendered(&self, content_type: ContentType, index: &str) -> bool {
1077        let content_key = (content_type, index.to_string());
1078
1079        // Check if we have accumulated content for this key
1080        if let Some(current) = self.accumulated.get(&content_key) {
1081            // Compute hash of current accumulated content
1082            let mut hasher = DefaultHasher::new();
1083            current.hash(&mut hasher);
1084            let hash = hasher.finish();
1085
1086            // Check if this hash has been rendered before
1087            return self.rendered_content_hashes.contains(&hash);
1088        }
1089
1090        false
1091    }
1092
1093    /// Check if content has been rendered before and starts with previously rendered content.
1094    ///
1095    /// This method detects when new content extends previously rendered content,
1096    /// indicating an in-place update should be performed (e.g., using carriage return).
1097    ///
1098    /// With the new KMP + Rolling Hash approach, this checks if output has started
1099    /// for this key, which indicates we're in an in-place update scenario.
1100    ///
1101    /// # Arguments
1102    /// * `content_type` - The type of content
1103    /// * `index` - The content index (as string for flexibility)
1104    ///
1105    /// # Returns
1106    /// * `true` - Output has started for this key (do in-place update)
1107    /// * `false` - Output has not started for this key (show new content)
1108    pub fn has_rendered_prefix(&self, content_type: ContentType, index: &str) -> bool {
1109        let content_key = (content_type, index.to_string());
1110        self.output_started_for_key.contains(&content_key)
1111    }
1112
1113    /// Mark content as rendered using hash-based tracking.
1114    ///
1115    /// This method updates the `rendered_content_hashes` set to track all
1116    /// content that has been rendered for deduplication.
1117    ///
1118    /// # Arguments
1119    /// * `content_type` - The type of content
1120    /// * `index` - The content index (as string for flexibility)
1121    #[cfg(test)]
1122    pub fn mark_content_rendered(&mut self, content_type: ContentType, index: &str) {
1123        // Also update last_rendered for compatibility
1124        self.mark_rendered(content_type, index);
1125
1126        // Add the hash of the accumulated content to the rendered set
1127        let content_key = (content_type, index.to_string());
1128        if let Some(current) = self.accumulated.get(&content_key) {
1129            let mut hasher = DefaultHasher::new();
1130            current.hash(&mut hasher);
1131            let hash = hasher.finish();
1132            self.rendered_content_hashes.insert(hash);
1133        }
1134    }
1135
1136    /// Mark content as rendered using pre-sanitized content.
1137    ///
1138    /// This method uses the sanitized content (with whitespace normalized)
1139    /// for hash-based deduplication, which prevents duplicates when the
1140    /// accumulated content differs only by whitespace.
1141    ///
1142    /// # Arguments
1143    /// * `content_type` - The type of content
1144    /// * `index` - The content index (as string for flexibility)
1145    /// * `content` - The content to hash
1146    pub fn mark_content_hash_rendered(
1147        &mut self,
1148        content_type: ContentType,
1149        index: &str,
1150        content: &str,
1151    ) {
1152        // Also update last_rendered for compatibility
1153        self.mark_rendered(content_type, index);
1154
1155        // Add the hash of the content to the rendered set
1156        let mut hasher = DefaultHasher::new();
1157        content.hash(&mut hasher);
1158        let hash = hasher.finish();
1159        self.rendered_content_hashes.insert(hash);
1160    }
1161
1162    /// Check if sanitized content has already been rendered.
1163    ///
1164    /// This method checks the hash of the sanitized content against the
1165    /// rendered set to prevent duplicate rendering.
1166    ///
1167    /// # Arguments
1168    /// * `_content_type` - The type of content (kept for API consistency)
1169    /// * `_index` - The content index (kept for API consistency)
1170    /// * `sanitized_content` - The sanitized content to check
1171    ///
1172    /// # Returns
1173    /// * `true` - This exact content has been rendered before
1174    /// * `false` - This exact content has not been rendered
1175    pub fn is_content_hash_rendered(
1176        &self,
1177        _content_type: ContentType,
1178        _index: &str,
1179        content: &str,
1180    ) -> bool {
1181        // Compute hash of exact content
1182        let mut hasher = DefaultHasher::new();
1183        content.hash(&mut hasher);
1184        let hash = hasher.finish();
1185
1186        // Check if this hash has been rendered before
1187        self.rendered_content_hashes.contains(&hash)
1188    }
1189
1190    /// Check if incoming text is likely a snapshot (full accumulated content) rather than a delta.
1191    ///
1192    /// This uses the KMP + Rolling Hash algorithm for efficient O(n+m) snapshot detection.
1193    /// The two-phase approach ensures optimal performance:
1194    /// 1. Rolling hash for fast O(n) filtering
1195    /// 2. KMP for exact O(n+m) verification
1196    ///
1197    /// # Arguments
1198    /// * `text` - The incoming text to check
1199    /// * `key` - The content key to compare against
1200    ///
1201    /// # Returns
1202    /// * `true` - The text appears to be a snapshot (starts with previous accumulated content)
1203    /// * `false` - The text appears to be a genuine delta
1204    pub fn is_likely_snapshot(&self, text: &str, key: &str) -> bool {
1205        let content_key = (ContentType::Text, key.to_string());
1206
1207        // Check if we have accumulated content for this key
1208        if let Some(previous) = self.accumulated.get(&content_key) {
1209            // Use DeltaDeduplicator with threshold-aware snapshot detection
1210            // This prevents false positives by requiring strong overlap (>=30 chars, >=50% ratio)
1211            return DeltaDeduplicator::is_likely_snapshot_with_thresholds(text, previous);
1212        }
1213
1214        false
1215    }
1216
1217    /// Extract the delta portion from a snapshot.
1218    ///
1219    /// When a snapshot is detected (full accumulated content sent as a "delta"),
1220    /// this method extracts only the new portion that hasn't been accumulated yet.
1221    ///
1222    /// # Arguments
1223    /// * `text` - The snapshot text (full accumulated content + new content)
1224    /// * `key` - The content key to compare against
1225    ///
1226    /// # Returns
1227    /// * `Ok(usize)` - The length of the delta portion (new content only)
1228    /// * `Err` - If the text is not actually a snapshot (doesn't start with accumulated content)
1229    ///
1230    /// # Note
1231    /// Returns the length of the delta portion as `usize` since we can't return
1232    /// a reference to `text` with the correct lifetime. Callers can slice `text`
1233    /// themselves using `&text[delta_len..]`.
1234    pub fn extract_delta_from_snapshot(&self, text: &str, key: &str) -> Result<usize, String> {
1235        let content_key = (ContentType::Text, key.to_string());
1236
1237        if let Some(previous) = self.accumulated.get(&content_key) {
1238            // Use DeltaDeduplicator with threshold-aware delta extraction
1239            // This ensures we only extract when overlap meets strong criteria
1240            if let Some(new_content) =
1241                DeltaDeduplicator::extract_new_content_with_thresholds(text, previous)
1242            {
1243                // Calculate the position where new content starts
1244                let delta_start = text.len() - new_content.len();
1245                return Ok(delta_start);
1246            }
1247        }
1248
1249        // If we get here, the text wasn't actually a snapshot
1250        // This could indicate a false positive from is_likely_snapshot
1251        Err(format!(
1252            "extract_delta_from_snapshot called on non-snapshot text. \
1253            key={key:?}, text={text:?}. Snapshot detection may have had a false positive."
1254        ))
1255    }
1256
1257    /// Get the delta portion as a string slice from a snapshot.
1258    ///
1259    /// This is a convenience wrapper that returns the actual substring
1260    /// instead of just the length.
1261    ///
1262    /// # Returns
1263    /// * `Ok(&str)` - The delta portion (new content only)
1264    /// * `Err` - If the text is not actually a snapshot
1265    pub fn get_delta_from_snapshot<'a>(&self, text: &'a str, key: &str) -> Result<&'a str, String> {
1266        let delta_len = self.extract_delta_from_snapshot(text, key)?;
1267        Ok(&text[delta_len..])
1268    }
1269
1270    /// Get streaming quality metrics for the current session.
1271    ///
1272    /// Returns aggregated metrics about delta sizes and streaming patterns
1273    /// during the session. This is useful for debugging and analyzing
1274    /// streaming behavior.
1275    ///
1276    /// # Returns
1277    /// Aggregated metrics across all content types and keys.
1278    pub fn get_streaming_quality_metrics(&self) -> StreamingQualityMetrics {
1279        // Flatten all delta sizes across all content types and keys
1280        let all_sizes = self.delta_sizes.values().flat_map(|v| v.iter().copied());
1281        let mut metrics = StreamingQualityMetrics::from_sizes(all_sizes);
1282
1283        // Add session-level metrics
1284        metrics.snapshot_repairs_count = self.snapshot_repairs_count;
1285        metrics.large_delta_count = self.large_delta_count;
1286        metrics.protocol_violations = self.protocol_violations;
1287
1288        metrics
1289    }
1290}
1291
1292#[cfg(test)]
1293mod tests {
1294    use super::*;
1295
1296    #[test]
1297    fn test_session_lifecycle() {
1298        let mut session = StreamingSession::new();
1299
1300        // Initially no content streamed
1301        assert!(!session.has_any_streamed_content());
1302
1303        // Message start
1304        session.on_message_start();
1305        assert!(!session.has_any_streamed_content());
1306
1307        // Text delta
1308        let show_prefix = session.on_text_delta(0, "Hello");
1309        assert!(show_prefix);
1310        assert!(session.has_any_streamed_content());
1311
1312        // Another delta
1313        let show_prefix = session.on_text_delta(0, " World");
1314        assert!(!show_prefix);
1315
1316        // Message stop
1317        let was_in_block = session.on_message_stop();
1318        assert!(was_in_block);
1319    }
1320
1321    #[test]
1322    fn test_accumulated_content() {
1323        let mut session = StreamingSession::new();
1324        session.on_message_start();
1325
1326        session.on_text_delta(0, "Hello");
1327        session.on_text_delta(0, " World");
1328
1329        let accumulated = session.get_accumulated(ContentType::Text, "0");
1330        assert_eq!(accumulated, Some("Hello World"));
1331    }
1332
1333    #[test]
1334    fn test_reset_between_messages() {
1335        let mut session = StreamingSession::new();
1336
1337        // First message
1338        session.on_message_start();
1339        session.on_text_delta(0, "First");
1340        assert!(session.has_any_streamed_content());
1341        session.on_message_stop();
1342
1343        // Second message - state should be reset
1344        session.on_message_start();
1345        assert!(!session.has_any_streamed_content());
1346    }
1347
1348    #[test]
1349    fn test_multiple_indices() {
1350        let mut session = StreamingSession::new();
1351        session.on_message_start();
1352
1353        session.on_text_delta(0, "First block");
1354        session.on_text_delta(1, "Second block");
1355
1356        assert_eq!(
1357            session.get_accumulated(ContentType::Text, "0"),
1358            Some("First block")
1359        );
1360        assert_eq!(
1361            session.get_accumulated(ContentType::Text, "1"),
1362            Some("Second block")
1363        );
1364    }
1365
1366    #[test]
1367    fn test_clear_index() {
1368        // Behavioral test: verify that creating a new session gives clean state
1369        // instead of testing clear_index() which is now removed
1370        let mut session = StreamingSession::new();
1371        session.on_message_start();
1372
1373        session.on_text_delta(0, "Before");
1374        // Instead of clearing, verify that a new session starts fresh
1375        let mut fresh_session = StreamingSession::new();
1376        fresh_session.on_message_start();
1377        fresh_session.on_text_delta(0, "After");
1378
1379        assert_eq!(
1380            fresh_session.get_accumulated(ContentType::Text, "0"),
1381            Some("After")
1382        );
1383        // Original session should still have "Before"
1384        assert_eq!(
1385            session.get_accumulated(ContentType::Text, "0"),
1386            Some("Before")
1387        );
1388    }
1389
1390    #[test]
1391    fn test_delta_validation_warns_on_large_delta() {
1392        let mut session = StreamingSession::new();
1393        session.on_message_start();
1394
1395        // Create a delta larger than snapshot_threshold()
1396        let large_delta = "x".repeat(snapshot_threshold() + 1);
1397
1398        // This should trigger a warning but still work
1399        let show_prefix = session.on_text_delta(0, &large_delta);
1400        assert!(show_prefix);
1401
1402        // Content should still be accumulated correctly
1403        assert_eq!(
1404            session.get_accumulated(ContentType::Text, "0"),
1405            Some(large_delta.as_str())
1406        );
1407    }
1408
1409    #[test]
1410    fn test_delta_validation_no_warning_for_small_delta() {
1411        let mut session = StreamingSession::new();
1412        session.on_message_start();
1413
1414        // Small delta should not trigger warning
1415        let small_delta = "Hello, world!";
1416        let show_prefix = session.on_text_delta(0, small_delta);
1417        assert!(show_prefix);
1418
1419        assert_eq!(
1420            session.get_accumulated(ContentType::Text, "0"),
1421            Some(small_delta)
1422        );
1423    }
1424
1425    // Tests for snapshot-as-delta detection methods
1426
1427    #[test]
1428    fn test_is_likely_snapshot_detects_snapshot() {
1429        let mut session = StreamingSession::new();
1430        session.on_message_start();
1431        session.on_content_block_start(0);
1432
1433        // First delta is long enough to meet threshold requirements
1434        // Using 40 chars to ensure it exceeds the 30 char minimum
1435        let initial = "This is a long message that exceeds threshold";
1436        session.on_text_delta(0, initial);
1437
1438        // Simulate GLM sending full accumulated content as next "delta"
1439        // The overlap is 45 chars (100% of initial), meeting both thresholds:
1440        // - char_count = 45 >= 30 ✓
1441        // - ratio = 45/48 ≈ 94% >= 50% ✓
1442        // - ends at safe boundary (space) ✓
1443        let snapshot = format!("{initial} plus new content");
1444        let is_snapshot = session.is_likely_snapshot(&snapshot, "0");
1445        assert!(
1446            is_snapshot,
1447            "Should detect snapshot-as-delta with strong overlap"
1448        );
1449    }
1450
1451    #[test]
1452    fn test_is_likely_snapshot_returns_false_for_genuine_delta() {
1453        let mut session = StreamingSession::new();
1454        session.on_message_start();
1455        session.on_content_block_start(0);
1456
1457        // First delta is "Hello"
1458        session.on_text_delta(0, "Hello");
1459
1460        // Genuine delta " World" doesn't start with previous content
1461        let is_snapshot = session.is_likely_snapshot(" World", "0");
1462        assert!(
1463            !is_snapshot,
1464            "Genuine delta should not be flagged as snapshot"
1465        );
1466    }
1467
1468    #[test]
1469    fn test_is_likely_snapshot_returns_false_when_no_previous_content() {
1470        let mut session = StreamingSession::new();
1471        session.on_message_start();
1472        session.on_content_block_start(0);
1473
1474        // No previous content, so anything is a genuine first delta
1475        let is_snapshot = session.is_likely_snapshot("Hello", "0");
1476        assert!(
1477            !is_snapshot,
1478            "First delta should not be flagged as snapshot"
1479        );
1480    }
1481
1482    #[test]
1483    fn test_extract_delta_from_snapshot() {
1484        let mut session = StreamingSession::new();
1485        session.on_message_start();
1486        session.on_content_block_start(0);
1487
1488        // First delta is long enough to meet threshold requirements
1489        let initial = "This is a long message that exceeds threshold";
1490        session.on_text_delta(0, initial);
1491
1492        // Snapshot should extract new portion
1493        let snapshot = format!("{initial} plus new content");
1494        let delta = session.get_delta_from_snapshot(&snapshot, "0").unwrap();
1495        assert_eq!(delta, " plus new content");
1496    }
1497
1498    #[test]
1499    fn test_extract_delta_from_snapshot_empty_delta() {
1500        let mut session = StreamingSession::new();
1501        session.on_message_start();
1502        session.on_content_block_start(0);
1503
1504        // First delta is "Hello"
1505        session.on_text_delta(0, "Hello");
1506
1507        // Snapshot "Hello" (identical to previous) should extract "" as delta
1508        let delta = session.get_delta_from_snapshot("Hello", "0").unwrap();
1509        assert_eq!(delta, "");
1510    }
1511
1512    #[test]
1513    fn test_extract_delta_from_snapshot_returns_error_on_non_snapshot() {
1514        let mut session = StreamingSession::new();
1515        session.on_message_start();
1516        session.on_content_block_start(0);
1517
1518        // First delta is "Hello"
1519        session.on_text_delta(0, "Hello");
1520
1521        // Calling on non-snapshot should return error (not panic)
1522        let result = session.get_delta_from_snapshot("World", "0");
1523        assert!(result.is_err());
1524        assert!(result
1525            .unwrap_err()
1526            .contains("extract_delta_from_snapshot called on non-snapshot text"));
1527    }
1528
1529    #[test]
1530    fn test_snapshot_detection_with_string_keys() {
1531        let mut session = StreamingSession::new();
1532        session.on_message_start();
1533
1534        // Test with string keys (like Codex/Gemini use)
1535        // Use content long enough to meet threshold requirements
1536        let initial = "This is a long message that exceeds threshold";
1537        session.on_text_delta_key("main", initial);
1538
1539        // Should detect snapshot for string key with strong overlap
1540        let snapshot = format!("{initial} plus new content");
1541        let is_snapshot = session.is_likely_snapshot(&snapshot, "main");
1542        assert!(
1543            is_snapshot,
1544            "Should detect snapshot with string keys when thresholds are met"
1545        );
1546
1547        // Should extract delta correctly
1548        let delta = session.get_delta_from_snapshot(&snapshot, "main").unwrap();
1549        assert_eq!(delta, " plus new content");
1550    }
1551
1552    #[test]
1553    fn test_snapshot_extraction_exact_match() {
1554        let mut session = StreamingSession::new();
1555        session.on_message_start();
1556
1557        // First delta is long enough to meet threshold requirements
1558        let initial = "This is a long message that exceeds threshold";
1559        session.on_text_delta(0, initial);
1560
1561        // Exact match with additional content (strong overlap)
1562        let exact_match = format!("{initial} World");
1563        let delta1 = session.get_delta_from_snapshot(&exact_match, "0").unwrap();
1564        assert_eq!(delta1, " World");
1565    }
1566
1567    #[test]
1568    fn test_token_by_token_streaming_scenario() {
1569        let mut session = StreamingSession::new();
1570        session.on_message_start();
1571
1572        // Simulate token-by-token streaming
1573        let tokens = ["H", "e", "l", "l", "o", " ", "W", "o", "r", "l", "d", "!"];
1574
1575        for token in tokens {
1576            let show_prefix = session.on_text_delta(0, token);
1577
1578            // Only first token should show prefix
1579            if token == "H" {
1580                assert!(show_prefix, "First token should show prefix");
1581            } else {
1582                assert!(!show_prefix, "Subsequent tokens should not show prefix");
1583            }
1584        }
1585
1586        // Verify accumulated content
1587        assert_eq!(
1588            session.get_accumulated(ContentType::Text, "0"),
1589            Some("Hello World!")
1590        );
1591    }
1592
1593    #[test]
1594    fn test_snapshot_in_token_stream() {
1595        let mut session = StreamingSession::new();
1596        session.on_message_start();
1597
1598        // First few tokens as genuine deltas - use longer content to meet thresholds
1599        let initial = "This is a long message that exceeds threshold";
1600        session.on_text_delta(0, initial);
1601        session.on_text_delta(0, " with more content");
1602
1603        // Now GLM sends a snapshot instead of delta
1604        // The accumulated content plus new content should meet thresholds:
1605        // Accumulated: "This is a long message that exceeds threshold with more content" (62 chars)
1606        // Snapshot: accumulated + "! This is additional content" (88 chars total)
1607        // Overlap: 62 chars
1608        // Ratio: 62/88 ≈ 70% >= 50% ✓
1609        let accumulated = session
1610            .get_accumulated(ContentType::Text, "0")
1611            .unwrap()
1612            .to_string();
1613        let snapshot = format!("{accumulated}! This is additional content");
1614        assert!(
1615            session.is_likely_snapshot(&snapshot, "0"),
1616            "Should detect snapshot in token stream with strong overlap"
1617        );
1618
1619        // Extract delta and continue
1620        let delta = session.get_delta_from_snapshot(&snapshot, "0").unwrap();
1621        assert!(delta.contains("! This is additional content"));
1622
1623        // Apply the delta
1624        session.on_text_delta(0, delta);
1625
1626        // Verify final accumulated content
1627        let expected = format!("{accumulated}! This is additional content");
1628        assert_eq!(
1629            session.get_accumulated(ContentType::Text, "0"),
1630            Some(expected.as_str())
1631        );
1632    }
1633
1634    // Tests for message identity tracking
1635
1636    #[test]
1637    fn test_set_and_get_current_message_id() {
1638        let mut session = StreamingSession::new();
1639
1640        // Initially no message ID
1641        assert!(session.get_current_message_id().is_none());
1642
1643        // Set a message ID
1644        session.set_current_message_id(Some("msg-123".to_string()));
1645        assert_eq!(session.get_current_message_id(), Some("msg-123"));
1646
1647        // Clear the message ID
1648        session.set_current_message_id(None);
1649        assert!(session.get_current_message_id().is_none());
1650    }
1651
1652    #[test]
1653    fn test_mark_message_displayed() {
1654        let mut session = StreamingSession::new();
1655
1656        // Initially not marked as displayed
1657        assert!(!session.is_duplicate_final_message("msg-123"));
1658
1659        // Mark as displayed
1660        session.mark_message_displayed("msg-123");
1661        assert!(session.is_duplicate_final_message("msg-123"));
1662
1663        // Different message ID is not a duplicate
1664        assert!(!session.is_duplicate_final_message("msg-456"));
1665    }
1666
1667    #[test]
1668    fn test_message_stop_marks_displayed() {
1669        let mut session = StreamingSession::new();
1670
1671        // Set a message ID
1672        session.set_current_message_id(Some("msg-123".to_string()));
1673
1674        // Start a message with content
1675        session.on_message_start();
1676        session.on_text_delta(0, "Hello");
1677
1678        // Stop should mark as displayed
1679        session.on_message_stop();
1680        assert!(session.is_duplicate_final_message("msg-123"));
1681    }
1682
1683    #[test]
1684    fn test_multiple_messages_tracking() {
1685        let mut session = StreamingSession::new();
1686
1687        // First message
1688        session.set_current_message_id(Some("msg-1".to_string()));
1689        session.on_message_start();
1690        session.on_text_delta(0, "First");
1691        session.on_message_stop();
1692        assert!(session.is_duplicate_final_message("msg-1"));
1693
1694        // Second message
1695        session.set_current_message_id(Some("msg-2".to_string()));
1696        session.on_message_start();
1697        session.on_text_delta(0, "Second");
1698        session.on_message_stop();
1699        assert!(session.is_duplicate_final_message("msg-1"));
1700        assert!(session.is_duplicate_final_message("msg-2"));
1701    }
1702
1703    // Tests for repeated MessageStart handling (GLM/ccs-glm protocol quirk)
1704
1705    #[test]
1706    fn test_repeated_message_start_preserves_output_started() {
1707        let mut session = StreamingSession::new();
1708
1709        // First message start
1710        session.on_message_start();
1711
1712        // First delta should show prefix
1713        let show_prefix = session.on_text_delta(0, "Hello");
1714        assert!(show_prefix, "First delta should show prefix");
1715
1716        // Second delta should NOT show prefix
1717        let show_prefix = session.on_text_delta(0, " World");
1718        assert!(!show_prefix, "Second delta should not show prefix");
1719
1720        // Simulate GLM sending repeated MessageStart during streaming
1721        // This should preserve output_started_for_key to prevent prefix spam
1722        session.on_message_start();
1723
1724        // After repeated MessageStart, delta should NOT show prefix
1725        // because output_started_for_key was preserved
1726        let show_prefix = session.on_text_delta(0, "!");
1727        assert!(
1728            !show_prefix,
1729            "After repeated MessageStart, delta should not show prefix"
1730        );
1731
1732        // Verify accumulated content was cleared (as expected for mid-stream restart)
1733        assert_eq!(
1734            session.get_accumulated(ContentType::Text, "0"),
1735            Some("!"),
1736            "Accumulated content should start fresh after repeated MessageStart"
1737        );
1738    }
1739
1740    #[test]
1741    fn test_repeated_message_start_with_normal_reset_between_messages() {
1742        let mut session = StreamingSession::new();
1743
1744        // First message
1745        session.on_message_start();
1746        session.on_text_delta(0, "First");
1747        session.on_message_stop();
1748
1749        // Second message - normal reset should clear output_started_for_key
1750        session.on_message_start();
1751
1752        // First delta of second message SHOULD show prefix
1753        let show_prefix = session.on_text_delta(0, "Second");
1754        assert!(
1755            show_prefix,
1756            "First delta of new message should show prefix after normal reset"
1757        );
1758    }
1759
1760    #[test]
1761    fn test_repeated_message_start_with_multiple_indices() {
1762        let mut session = StreamingSession::new();
1763
1764        // First message start
1765        session.on_message_start();
1766
1767        // First delta for index 0
1768        let show_prefix = session.on_text_delta(0, "Index0");
1769        assert!(show_prefix, "First delta for index 0 should show prefix");
1770
1771        // First delta for index 1
1772        let show_prefix = session.on_text_delta(1, "Index1");
1773        assert!(show_prefix, "First delta for index 1 should show prefix");
1774
1775        // Simulate repeated MessageStart
1776        session.on_message_start();
1777
1778        // After repeated MessageStart, deltas should NOT show prefix
1779        // because output_started_for_key was preserved for both indices
1780        let show_prefix = session.on_text_delta(0, " more");
1781        assert!(
1782            !show_prefix,
1783            "Delta for index 0 should not show prefix after repeated MessageStart"
1784        );
1785
1786        let show_prefix = session.on_text_delta(1, " more");
1787        assert!(
1788            !show_prefix,
1789            "Delta for index 1 should not show prefix after repeated MessageStart"
1790        );
1791    }
1792
1793    #[test]
1794    fn test_repeated_message_start_during_thinking_stream() {
1795        let mut session = StreamingSession::new();
1796
1797        // First message start
1798        session.on_message_start();
1799
1800        // First thinking delta should show prefix
1801        let show_prefix = session.on_thinking_delta(0, "Thinking...");
1802        assert!(show_prefix, "First thinking delta should show prefix");
1803
1804        // Simulate repeated MessageStart
1805        session.on_message_start();
1806
1807        // After repeated MessageStart, thinking delta should NOT show prefix
1808        let show_prefix = session.on_thinking_delta(0, " more");
1809        assert!(
1810            !show_prefix,
1811            "Thinking delta after repeated MessageStart should not show prefix"
1812        );
1813    }
1814
1815    #[test]
1816    fn test_message_stop_then_message_start_resets_normally() {
1817        let mut session = StreamingSession::new();
1818
1819        // First message
1820        session.on_message_start();
1821        session.on_text_delta(0, "First");
1822
1823        // Message stop finalizes the message
1824        session.on_message_stop();
1825
1826        // New message start should reset normally (not preserve output_started)
1827        session.on_message_start();
1828
1829        // First delta of new message SHOULD show prefix
1830        let show_prefix = session.on_text_delta(0, "Second");
1831        assert!(
1832            show_prefix,
1833            "First delta after MessageStop should show prefix (normal reset)"
1834        );
1835    }
1836
1837    #[test]
1838    fn test_repeated_content_block_start_same_index() {
1839        let mut session = StreamingSession::new();
1840
1841        // Message start
1842        session.on_message_start();
1843
1844        // First delta for index 0
1845        let show_prefix = session.on_text_delta(0, "Hello");
1846        assert!(show_prefix, "First delta should show prefix");
1847
1848        // Simulate repeated ContentBlockStart for same index
1849        // (Some agents send this, and we should NOT clear accumulated content)
1850        session.on_content_block_start(0);
1851
1852        // Delta after repeated ContentBlockStart should NOT show prefix
1853        let show_prefix = session.on_text_delta(0, " World");
1854        assert!(
1855            !show_prefix,
1856            "Delta after repeated ContentBlockStart should not show prefix"
1857        );
1858
1859        // Verify accumulated content was preserved
1860        assert_eq!(
1861            session.get_accumulated(ContentType::Text, "0"),
1862            Some("Hello World"),
1863            "Accumulated content should be preserved across repeated ContentBlockStart"
1864        );
1865    }
1866
1867    // Tests for verbose_warnings feature
1868
1869    #[test]
1870    fn test_verbose_warnings_default_is_disabled() {
1871        let session = StreamingSession::new();
1872        assert!(
1873            !session.verbose_warnings,
1874            "Default should have verbose_warnings disabled"
1875        );
1876    }
1877
1878    #[test]
1879    fn test_with_verbose_warnings_enables_flag() {
1880        let session = StreamingSession::new().with_verbose_warnings(true);
1881        assert!(
1882            session.verbose_warnings,
1883            "Should have verbose_warnings enabled"
1884        );
1885    }
1886
1887    #[test]
1888    fn test_with_verbose_warnings_disabled_explicitly() {
1889        let session = StreamingSession::new().with_verbose_warnings(false);
1890        assert!(
1891            !session.verbose_warnings,
1892            "Should have verbose_warnings disabled"
1893        );
1894    }
1895
1896    #[test]
1897    fn test_large_delta_warning_respects_verbose_flag() {
1898        // Test with verbose warnings enabled
1899        let mut session_verbose = StreamingSession::new().with_verbose_warnings(true);
1900        session_verbose.on_message_start();
1901
1902        let large_delta = "x".repeat(snapshot_threshold() + 1);
1903        // This would emit a warning to stderr if verbose_warnings is enabled
1904        let _show_prefix = session_verbose.on_text_delta(0, &large_delta);
1905
1906        // Test with verbose warnings disabled (default)
1907        let mut session_quiet = StreamingSession::new();
1908        session_quiet.on_message_start();
1909
1910        let large_delta = "x".repeat(snapshot_threshold() + 1);
1911        // This should NOT emit a warning
1912        let _show_prefix = session_quiet.on_text_delta(0, &large_delta);
1913
1914        // Both sessions should accumulate content correctly
1915        assert_eq!(
1916            session_verbose.get_accumulated(ContentType::Text, "0"),
1917            Some(large_delta.as_str())
1918        );
1919        assert_eq!(
1920            session_quiet.get_accumulated(ContentType::Text, "0"),
1921            Some(large_delta.as_str())
1922        );
1923    }
1924
1925    #[test]
1926    fn test_repeated_message_start_warning_respects_verbose_flag() {
1927        // Test with verbose warnings enabled
1928        let mut session_verbose = StreamingSession::new().with_verbose_warnings(true);
1929        session_verbose.on_message_start();
1930        session_verbose.on_text_delta(0, "Hello");
1931        // This would emit a warning about repeated MessageStart
1932        session_verbose.on_message_start();
1933
1934        // Test with verbose warnings disabled (default)
1935        let mut session_quiet = StreamingSession::new();
1936        session_quiet.on_message_start();
1937        session_quiet.on_text_delta(0, "Hello");
1938        // This should NOT emit a warning
1939        session_quiet.on_message_start();
1940
1941        // Both sessions should handle the restart correctly
1942        assert_eq!(
1943            session_verbose.get_accumulated(ContentType::Text, "0"),
1944            None,
1945            "Accumulated content should be cleared after repeated MessageStart"
1946        );
1947        assert_eq!(
1948            session_quiet.get_accumulated(ContentType::Text, "0"),
1949            None,
1950            "Accumulated content should be cleared after repeated MessageStart"
1951        );
1952    }
1953
1954    #[test]
1955    fn test_pattern_detection_warning_respects_verbose_flag() {
1956        // Test with verbose warnings enabled
1957        let mut session_verbose = StreamingSession::new().with_verbose_warnings(true);
1958        session_verbose.on_message_start();
1959
1960        // Send 3 large deltas to trigger pattern detection
1961        // Use different content to avoid consecutive duplicate detection
1962        for i in 0..3 {
1963            let large_delta = format!("{}{i}", "x".repeat(snapshot_threshold() + 1));
1964            let _ = session_verbose.on_text_delta(0, &large_delta);
1965        }
1966
1967        // Test with verbose warnings disabled (default)
1968        let mut session_quiet = StreamingSession::new();
1969        session_quiet.on_message_start();
1970
1971        // Send 3 large deltas (different content to avoid consecutive duplicate detection)
1972        for i in 0..3 {
1973            let large_delta = format!("{}{i}", "x".repeat(snapshot_threshold() + 1));
1974            let _ = session_quiet.on_text_delta(0, &large_delta);
1975        }
1976
1977        // Verify that large_delta_count still tracks all 3 large deltas for both sessions
1978        assert_eq!(
1979            session_verbose
1980                .get_streaming_quality_metrics()
1981                .large_delta_count,
1982            3
1983        );
1984        assert_eq!(
1985            session_quiet
1986                .get_streaming_quality_metrics()
1987                .large_delta_count,
1988            3
1989        );
1990    }
1991
1992    #[test]
1993    fn test_snapshot_extraction_error_warning_respects_verbose_flag() {
1994        // Create a session where we'll trigger a snapshot extraction error
1995        // by manually manipulating accumulated content
1996        let mut session_verbose = StreamingSession::new().with_verbose_warnings(true);
1997        session_verbose.on_message_start();
1998        session_verbose.on_content_block_start(0);
1999
2000        // First delta
2001        session_verbose.on_text_delta(0, "Hello");
2002
2003        // Manually clear accumulated to simulate a state mismatch
2004        session_verbose.accumulated.clear();
2005
2006        // Now try to process a snapshot - extraction will fail
2007        // This would emit a warning if verbose_warnings is enabled
2008        let _show_prefix = session_verbose.on_text_delta(0, "Hello World");
2009
2010        // Test with verbose warnings disabled (default)
2011        let mut session_quiet = StreamingSession::new();
2012        session_quiet.on_message_start();
2013        session_quiet.on_content_block_start(0);
2014
2015        session_quiet.on_text_delta(0, "Hello");
2016        session_quiet.accumulated.clear();
2017
2018        // This should NOT emit a warning
2019        let _show_prefix = session_quiet.on_text_delta(0, "Hello World");
2020
2021        // The quiet session should handle the error gracefully
2022        assert!(session_quiet
2023            .get_accumulated(ContentType::Text, "0")
2024            .is_some());
2025    }
2026
2027    // Tests for enhanced streaming metrics
2028
2029    #[test]
2030    fn test_streaming_quality_metrics_includes_snapshot_repairs() {
2031        let mut session = StreamingSession::new();
2032        session.on_message_start();
2033        session.on_content_block_start(0);
2034
2035        // First delta - long enough to meet threshold requirements
2036        let initial = "This is a long message that exceeds threshold";
2037        session.on_text_delta(0, initial);
2038
2039        // GLM sends snapshot instead of delta (with strong overlap)
2040        let snapshot = format!("{initial} World!");
2041        let _ = session.on_text_delta(0, &snapshot);
2042
2043        let metrics = session.get_streaming_quality_metrics();
2044        assert_eq!(
2045            metrics.snapshot_repairs_count, 1,
2046            "Should track one snapshot repair"
2047        );
2048    }
2049
2050    #[test]
2051    fn test_streaming_quality_metrics_includes_large_delta_count() {
2052        let mut session = StreamingSession::new();
2053        session.on_message_start();
2054
2055        // Send 3 large deltas
2056        for _ in 0..3 {
2057            let large_delta = "x".repeat(snapshot_threshold() + 1);
2058            let _ = session.on_text_delta(0, &large_delta);
2059        }
2060
2061        let metrics = session.get_streaming_quality_metrics();
2062        assert_eq!(
2063            metrics.large_delta_count, 3,
2064            "Should track three large deltas"
2065        );
2066    }
2067
2068    #[test]
2069    fn test_streaming_quality_metrics_includes_protocol_violations() {
2070        let mut session = StreamingSession::new();
2071        session.on_message_start();
2072        session.on_text_delta(0, "Hello");
2073
2074        // Simulate GLM sending repeated MessageStart during streaming
2075        session.on_message_start();
2076        session.on_text_delta(0, " World");
2077
2078        // Another violation
2079        session.on_message_start();
2080
2081        let metrics = session.get_streaming_quality_metrics();
2082        assert_eq!(
2083            metrics.protocol_violations, 2,
2084            "Should track two protocol violations"
2085        );
2086    }
2087
2088    #[test]
2089    fn test_streaming_quality_metrics_all_new_fields_zero_by_default() {
2090        let session = StreamingSession::new();
2091        let metrics = session.get_streaming_quality_metrics();
2092
2093        assert_eq!(metrics.snapshot_repairs_count, 0);
2094        assert_eq!(metrics.large_delta_count, 0);
2095        assert_eq!(metrics.protocol_violations, 0);
2096    }
2097
2098    #[test]
2099    fn test_streaming_quality_metrics_comprehensive_tracking() {
2100        let mut session = StreamingSession::new();
2101        session.on_message_start();
2102
2103        // Normal delta
2104        session.on_text_delta(0, "Hello");
2105
2106        // Large delta
2107        let large_delta = "x".repeat(snapshot_threshold() + 1);
2108        let _ = session.on_text_delta(0, &large_delta);
2109
2110        // Snapshot repair (note: the snapshot is also large, so it counts as another large delta)
2111        let snapshot = format!("Hello{large_delta} World");
2112        let _ = session.on_text_delta(0, &snapshot);
2113
2114        // Check metrics BEFORE the protocol violation (which clears delta_sizes)
2115        let metrics = session.get_streaming_quality_metrics();
2116        assert_eq!(metrics.snapshot_repairs_count, 1);
2117        assert_eq!(
2118            metrics.large_delta_count, 2,
2119            "Both the large delta and the snapshot are large"
2120        );
2121        assert_eq!(metrics.total_deltas, 3);
2122        assert_eq!(metrics.protocol_violations, 0, "No violation yet");
2123
2124        // Protocol violation
2125        session.on_message_start();
2126
2127        // After violation, protocol_violations is incremented but delta_sizes is cleared
2128        let metrics_after = session.get_streaming_quality_metrics();
2129        assert_eq!(metrics_after.protocol_violations, 1);
2130        assert_eq!(
2131            metrics_after.total_deltas, 0,
2132            "Delta sizes cleared after violation"
2133        );
2134    }
2135
2136    // Tests for hash-based deduplication
2137
2138    #[test]
2139    fn test_content_hash_computed_on_message_stop() {
2140        let mut session = StreamingSession::new();
2141        session.on_message_start();
2142        session.on_text_delta(0, "Hello");
2143        session.on_text_delta(0, " World");
2144
2145        // Hash should be None before message_stop
2146        assert_eq!(session.final_content_hash, None);
2147
2148        // Hash should be computed after message_stop
2149        session.on_message_stop();
2150        assert!(session.final_content_hash.is_some());
2151    }
2152
2153    #[test]
2154    fn test_content_hash_none_when_no_content() {
2155        let mut session = StreamingSession::new();
2156        session.on_message_start();
2157
2158        // No content streamed
2159        session.on_message_stop();
2160        assert_eq!(session.final_content_hash, None);
2161    }
2162
2163    #[test]
2164    fn test_is_duplicate_by_hash_returns_true_for_matching_content() {
2165        let mut session = StreamingSession::new();
2166        session.on_message_start();
2167        session.on_text_delta(0, "Hello World");
2168        session.on_message_stop();
2169
2170        // Same content should be detected as duplicate
2171        assert!(session.is_duplicate_by_hash("Hello World"));
2172    }
2173
2174    #[test]
2175    fn test_is_duplicate_by_hash_returns_false_for_different_content() {
2176        let mut session = StreamingSession::new();
2177        session.on_message_start();
2178        session.on_text_delta(0, "Hello World");
2179        session.on_message_stop();
2180
2181        // Different content should NOT be detected as duplicate
2182        assert!(!session.is_duplicate_by_hash("Different content"));
2183    }
2184
2185    #[test]
2186    fn test_is_duplicate_by_hash_returns_false_when_no_content_streamed() {
2187        let session = StreamingSession::new();
2188
2189        // No content streamed, so no hash
2190        assert!(!session.is_duplicate_by_hash("Hello World"));
2191    }
2192
2193    #[test]
2194    fn test_content_hash_multiple_content_blocks() {
2195        let mut session = StreamingSession::new();
2196        session.on_message_start();
2197        session.on_text_delta(0, "First block");
2198        session.on_text_delta(1, "Second block");
2199        session.on_message_stop();
2200
2201        // Hash should be computed from all blocks
2202        assert!(session.final_content_hash.is_some());
2203        // Individual content shouldn't match the combined hash
2204        assert!(!session.is_duplicate_by_hash("First block"));
2205        assert!(!session.is_duplicate_by_hash("Second block"));
2206    }
2207
2208    #[test]
2209    fn test_content_hash_consistent_for_same_content() {
2210        let mut session1 = StreamingSession::new();
2211        session1.on_message_start();
2212        session1.on_text_delta(0, "Hello");
2213        session1.on_text_delta(0, " World");
2214        session1.on_message_stop();
2215
2216        let mut session2 = StreamingSession::new();
2217        session2.on_message_start();
2218        session2.on_text_delta(0, "Hello World");
2219        session2.on_message_stop();
2220
2221        // Same content should produce the same hash
2222        assert_eq!(session1.final_content_hash, session2.final_content_hash);
2223    }
2224
2225    // Tests for rapid index switching edge case (RFC-003)
2226
2227    #[test]
2228    fn test_rapid_index_switch_with_clear() {
2229        let mut session = StreamingSession::new();
2230        session.on_message_start();
2231
2232        // Start block 0 and accumulate content
2233        session.on_content_block_start(0);
2234        let show_prefix = session.on_text_delta(0, "X");
2235        assert!(show_prefix, "First delta for index 0 should show prefix");
2236        assert_eq!(session.get_accumulated(ContentType::Text, "0"), Some("X"));
2237
2238        // Switch to block 1 - this should clear accumulated content for index 0
2239        session.on_content_block_start(1);
2240
2241        // Verify accumulated for index 0 was cleared
2242        assert_eq!(
2243            session.get_accumulated(ContentType::Text, "0"),
2244            None,
2245            "Accumulated content for index 0 should be cleared when switching to index 1"
2246        );
2247
2248        // Switch back to index 0
2249        session.on_content_block_start(0);
2250
2251        // Since output_started_for_key was also cleared, prefix should show again
2252        let show_prefix = session.on_text_delta(0, "Y");
2253        assert!(
2254            show_prefix,
2255            "Prefix should show when switching back to a previously cleared index"
2256        );
2257
2258        // Verify new content is accumulated fresh
2259        assert_eq!(
2260            session.get_accumulated(ContentType::Text, "0"),
2261            Some("Y"),
2262            "New content should be accumulated fresh after clear"
2263        );
2264    }
2265
2266    #[test]
2267    fn test_delta_sizes_cleared_on_index_switch() {
2268        let mut session = StreamingSession::new();
2269        session.on_message_start();
2270
2271        // Track some delta sizes for index 0
2272        session.on_text_delta(0, "Hello");
2273        session.on_text_delta(0, " World");
2274
2275        let content_key = (ContentType::Text, "0".to_string());
2276        assert!(
2277            session.delta_sizes.contains_key(&content_key),
2278            "Delta sizes should be tracked for index 0"
2279        );
2280        let sizes_before = session.delta_sizes.get(&content_key).unwrap();
2281        assert_eq!(sizes_before.len(), 2, "Should have 2 delta sizes tracked");
2282
2283        // Switch to index 1 - this should clear delta_sizes for index 0
2284        session.on_content_block_start(1);
2285
2286        assert!(
2287            !session.delta_sizes.contains_key(&content_key),
2288            "Delta sizes for index 0 should be cleared when switching to index 1"
2289        );
2290
2291        // Add deltas for index 1
2292        session.on_text_delta(1, "New");
2293
2294        let content_key_1 = (ContentType::Text, "1".to_string());
2295        let sizes_after = session.delta_sizes.get(&content_key_1).unwrap();
2296        assert_eq!(
2297            sizes_after.len(),
2298            1,
2299            "Should have fresh size tracking for index 1"
2300        );
2301    }
2302
2303    #[test]
2304    fn test_rapid_index_switch_with_thinking_content() {
2305        let mut session = StreamingSession::new();
2306        session.on_message_start();
2307
2308        // Start thinking content in index 0
2309        session.on_content_block_start(0);
2310        let show_prefix = session.on_thinking_delta(0, "Thinking...");
2311        assert!(show_prefix, "First thinking delta should show prefix");
2312        assert_eq!(
2313            session.get_accumulated(ContentType::Thinking, "0"),
2314            Some("Thinking...")
2315        );
2316
2317        // Switch to text content in index 1 - this should clear index 0's accumulated
2318        session.on_content_block_start(1);
2319
2320        // Verify index 0's accumulated thinking was cleared
2321        assert_eq!(
2322            session.get_accumulated(ContentType::Thinking, "0"),
2323            None,
2324            "Thinking content for index 0 should be cleared when switching to index 1"
2325        );
2326
2327        let show_prefix = session.on_text_delta(1, "Text");
2328        assert!(
2329            show_prefix,
2330            "First text delta for index 1 should show prefix"
2331        );
2332
2333        // Switch back to index 0 for thinking
2334        session.on_content_block_start(0);
2335
2336        // Since output_started_for_key for (Thinking, "0") was cleared when switching to index 1,
2337        // the prefix should show again
2338        let show_prefix = session.on_thinking_delta(0, " more");
2339        assert!(
2340            show_prefix,
2341            "Thinking prefix should show when switching back to cleared index 0"
2342        );
2343
2344        // Verify thinking content was accumulated fresh (only the new content)
2345        assert_eq!(
2346            session.get_accumulated(ContentType::Thinking, "0"),
2347            Some(" more"),
2348            "Thinking content should be accumulated fresh after clear"
2349        );
2350    }
2351
2352    #[test]
2353    fn test_output_started_for_key_cleared_across_all_content_types() {
2354        let mut session = StreamingSession::new();
2355        session.on_message_start();
2356
2357        // Start block 0 with text and thinking
2358        // Note: ToolInput does not use output_started_for_key tracking
2359        session.on_content_block_start(0);
2360        session.on_text_delta(0, "Text");
2361        session.on_thinking_delta(0, "Thinking");
2362
2363        // Verify text and thinking have started output
2364        let text_key = (ContentType::Text, "0".to_string());
2365        let thinking_key = (ContentType::Thinking, "0".to_string());
2366
2367        assert!(session.output_started_for_key.contains(&text_key));
2368        assert!(session.output_started_for_key.contains(&thinking_key));
2369
2370        // Switch to index 1 - should clear output_started_for_key for all content types
2371        session.on_content_block_start(1);
2372
2373        assert!(
2374            !session.output_started_for_key.contains(&text_key),
2375            "Text output_started should be cleared for index 0"
2376        );
2377        assert!(
2378            !session.output_started_for_key.contains(&thinking_key),
2379            "Thinking output_started should be cleared for index 0"
2380        );
2381    }
2382
2383    // Tests for environment variable configuration
2384
2385    #[test]
2386    fn test_snapshot_threshold_default() {
2387        // Ensure no env var is set for this test
2388        std::env::remove_var("RALPH_STREAMING_SNAPSHOT_THRESHOLD");
2389        // Note: Since we use OnceLock, we can't reset the value in tests.
2390        // This test documents the default behavior.
2391        let threshold = snapshot_threshold();
2392        assert_eq!(
2393            threshold, DEFAULT_SNAPSHOT_THRESHOLD,
2394            "Default threshold should be 200"
2395        );
2396    }
2397}