ralph_workflow/json_parser/
streaming_state.rs

1//! Unified streaming state tracking module.
2//!
3//! This module provides a single source of truth for streaming state across
4//! all parsers (`Claude`, `Codex`, `Gemini`, `OpenCode`). It implements the streaming
5//! contract:
6//!
7//! # Streaming Contract
8//!
9//! 1. **Delta contract**: Each streaming event contains only newly generated text
10//! 2. **Message lifecycle**: `MessageStart` → (`ContentBlockStart` + deltas)* → `MessageStop`
11//! 3. **Deduplication rule**: Content displayed during streaming is never re-displayed
12//! 4. **State reset**: Streaming state resets on `MessageStart`/Init events
13//!
14//! # Message Lifecycle
15//!
16//! The streaming message lifecycle follows this sequence:
17//!
18//! 1. **`MessageStart`** (or equivalent init event):
19//!    - Resets all streaming state to `Idle`
20//!    - Clears accumulated content
21//!    - Resets content block state
22//!
23//! 2. **`ContentBlockStart`** (optional, for each content block):
24//!    - If already in a block with `started_output=true`, finalizes the previous block
25//!    - Initializes tracking for the new block index
//!    - If the new index differs from the active block, clears the accumulated
//!      content and per-key tracking of the *previous* block index
27//!
28//! 3. **Text/Thinking deltas** (zero or more per block):
29//!    - First delta for each `(content_type, index)` shows prefix
30//!    - Subsequent deltas update in-place (with prefix, using carriage return)
31//!    - Sets `started_output=true` for the current block
32//!
33//! 4. **`MessageStop`**:
34//!    - Finalizes the current content block
35//!    - Marks message as displayed to prevent duplicate final output
36//!    - Returns whether content was streamed (for emitting completion newline)
37//!
38//! # Content Block Transitions
39//!
40//! When transitioning between content blocks (e.g., block 0 → block 1):
41//!
42//! ```ignore
43//! // Streaming "Hello" in block 0
44//! session.on_text_delta(0, "Hello");  // started_output = true
45//!
46//! // Transition to block 1
47//! session.on_content_block_start(1);  // Finalizes block 0, started_output was true
48//!
49//! // Stream "World" in block 1
50//! session.on_text_delta(1, "World");  // New block, shows prefix again
51//! ```
52//!
53//! The `ContentBlockState::InBlock { index, started_output }` tracks:
54//! - `index`: Which block is currently active
55//! - `started_output`: Whether any content was output for this block
56//!
57//! This state enables proper finalization of previous blocks when new ones start.
58//!
59//! # Delta Contract
60//!
61//! This module enforces a strict **delta contract** - all streaming events must
62//! contain only the newly generated text (deltas), not the full accumulated content.
63//!
64//! Treating snapshots as deltas causes exponential duplication bugs. The session
65//! validates that incoming content is genuinely delta-sized and rejects likely
66//! snapshot-as-delta violations.
67//!
68//! # Example
69//!
70//! ```ignore
71//! use crate::json_parser::streaming_state::{StreamingSession, StreamingState};
72//!
73//! let mut session = StreamingSession::new();
74//!
75//! // Message starts - reset state
76//! session.on_message_start();
77//!
78//! // Content block starts
79//! session.on_content_block_start(0);
80//!
81//! // Text deltas arrive - accumulate and display
82//! let should_show_prefix = session.on_text_delta(0, "Hello");
83//! assert!(should_show_prefix); // First chunk shows prefix
84//!
85//! let should_show_prefix = session.on_text_delta(0, " World");
86//! assert!(!should_show_prefix); // Subsequent chunks don't show prefix
87//!
88//! // Check if content was already streamed (for deduplication)
89//! assert!(session.has_any_streamed_content());
90//!
91//! // Message stops - finalize
92//! session.on_message_stop();
93//! ```
94
95use crate::json_parser::deduplication::RollingHashWindow;
96use crate::json_parser::deduplication::{get_overlap_thresholds, DeltaDeduplicator};
97use crate::json_parser::health::StreamingQualityMetrics;
98use crate::json_parser::types::ContentType;
99use std::collections::hash_map::DefaultHasher;
100use std::collections::{HashMap, HashSet};
101use std::hash::{Hash, Hasher};
102use std::sync::OnceLock;
103
// Streaming configuration constants
//
// NOTE: delta sizes are measured with `str::len()` where they are compared
// against the snapshot threshold, i.e. in bytes. For ASCII text this equals the
// number of characters; multi-byte UTF-8 text reaches the threshold sooner.

/// Default threshold for detecting snapshot-as-delta violations (in characters).
///
/// Deltas exceeding this size are flagged as potential snapshots. The value of 200
/// characters was chosen because:
/// - Normal deltas are typically < 100 characters (a few tokens)
/// - Snapshots often contain the full accumulated content (200+ chars)
/// - This threshold catches most violations while minimizing false positives
const DEFAULT_SNAPSHOT_THRESHOLD: usize = 200;

/// Minimum allowed snapshot threshold (in characters).
///
/// Lower values would cause excessive false positives for normal deltas,
/// because even small text chunks (1-2 sentences) can easily exceed a few
/// dozen characters.
const MIN_SNAPSHOT_THRESHOLD: usize = 50;

/// Maximum allowed snapshot threshold (in characters).
///
/// Values above 1000 would allow malicious snapshots to pass undetected,
/// potentially causing exponential duplication bugs.
const MAX_SNAPSHOT_THRESHOLD: usize = 1000;

/// Minimum number of consecutive large deltas required to trigger pattern detection warning.
///
/// This threshold prevents false positives from occasional large deltas.
/// Three consecutive large deltas indicate a pattern (not a one-off event).
const DEFAULT_PATTERN_DETECTION_MIN_DELTAS: usize = 3;

/// Maximum number of delta sizes to track per content key for pattern detection.
///
/// Tracking recent delta sizes allows us to detect patterns of repeated large
/// content (a sign of snapshot-as-delta bugs). Ten entries provide sufficient
/// history without excessive memory usage.
const DEFAULT_MAX_DELTA_HISTORY: usize = 10;
139
140/// Ralph enforces a **delta contract** for all streaming content.
141///
142/// Every streaming event must contain only the newly generated text (delta),
143/// never the full accumulated content (snapshot).
144///
145/// # Contract Violations
146///
147/// If a parser emits snapshot-style content when deltas are expected, it will
148/// cause exponential duplication bugs. The `StreamingSession` validates that
149/// incoming content is delta-sized and logs warnings when violations are detected.
150///
151/// # Validation Threshold
152///
153/// Deltas are expected to be small chunks (typically < 100 chars). If a single
154/// "delta" exceeds `snapshot_threshold()` characters, it may indicate a snapshot
155/// being treated as a delta.
156///
157/// # Pattern Detection
158///
159/// In addition to size threshold, we track patterns of repeated large content
160/// which may indicate a snapshot-as-delta bug where the same content is being
161/// sent repeatedly as if it were incremental.
162///
163/// # Environment Variables
164///
165/// The following environment variables can be set to configure streaming behavior:
166///
167/// - `RALPH_STREAMING_SNAPSHOT_THRESHOLD`: Threshold for detecting snapshot-as-delta
168///   violations (default: 200). Deltas exceeding this size trigger warnings.
169///
170/// Get the snapshot threshold from environment variable or use default.
171///
172/// Reads `RALPH_STREAMING_SNAPSHOT_THRESHOLD` env var.
173/// Valid range: 50-1000 characters.
174/// Falls back to default of 200 if not set or out of range.
175fn snapshot_threshold() -> usize {
176    static THRESHOLD: OnceLock<usize> = OnceLock::new();
177    *THRESHOLD.get_or_init(|| {
178        std::env::var("RALPH_STREAMING_SNAPSHOT_THRESHOLD")
179            .ok()
180            .and_then(|s| s.parse::<usize>().ok())
181            .and_then(|v| {
182                if (MIN_SNAPSHOT_THRESHOLD..=MAX_SNAPSHOT_THRESHOLD).contains(&v) {
183                    Some(v)
184                } else {
185                    None
186                }
187            })
188            .unwrap_or(DEFAULT_SNAPSHOT_THRESHOLD)
189    })
190}
191
/// Streaming state for the current message lifecycle.
///
/// Tracks whether we're in the middle of streaming content and whether
/// that content has been displayed to the user.
///
/// The default value is [`StreamingState::Idle`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum StreamingState {
    /// No active streaming - idle state.
    ///
    /// `StreamingSession::on_message_start` always returns the session to this state.
    #[default]
    Idle,
    /// Currently streaming content deltas
    Streaming,
    /// Content has been finalized (after `MessageStop` or equivalent)
    Finalized,
}
206
/// State tracking for content blocks during streaming.
///
/// Replaces the boolean `in_content_block` with richer state that tracks
/// which block is active and whether output has started for that block.
/// This prevents "glued text" bugs where block boundaries are crossed
/// without proper finalization.
///
/// The default value is [`ContentBlockState::NotInBlock`].
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum ContentBlockState {
    /// Not currently inside a content block
    #[default]
    NotInBlock,
    /// Inside a content block with tracking for output state
    InBlock {
        /// The block index/identifier.
        ///
        /// Numeric block indices are stored in their decimal string form.
        index: String,
        /// Whether any content has been output for this block
        started_output: bool,
    },
}
226
/// Unified streaming session tracker.
///
/// Provides a single source of truth for streaming state across all parsers.
/// Tracks:
/// - Current streaming state (`Idle`/`Streaming`/`Finalized`)
/// - Which content types have been streamed
/// - Accumulated content by content type and index
/// - Whether prefix should be shown on next delta
/// - Delta size patterns for detecting snapshot-as-delta violations
/// - Persistent "output started" tracking independent of accumulated content
/// - Verbosity-aware warning emission
///
/// # Lifecycle
///
/// 1. **Start**: `on_message_start()` - resets all state
/// 2. **Stream**: `on_text_delta()` / `on_thinking_delta()` - accumulate content
/// 3. **Stop**: `on_message_stop()` - finalize the message
/// 4. **Repeat**: Back to step 1 for next message
///
/// # Construction
///
/// Prefer `StreamingSession::new()`. The derived `Default` leaves
/// `max_delta_history` at `0`, which means the per-key delta-size history is
/// trimmed back to empty after every delta; `new()` sets it to
/// `DEFAULT_MAX_DELTA_HISTORY`.
#[derive(Debug, Default, Clone)]
pub struct StreamingSession {
    /// Current streaming state
    state: StreamingState,
    /// Track which content types have been streamed (for deduplication)
    /// Maps `ContentType` → whether it has been streamed
    streamed_types: HashMap<ContentType, bool>,
    /// Track the current content block state
    current_block: ContentBlockState,
    /// Accumulated content by (`content_type`, index) for display
    /// This mirrors `DeltaAccumulator` but adds deduplication tracking
    accumulated: HashMap<(ContentType, String), String>,
    /// Track the order of keys for `most_recent` operations
    key_order: Vec<(ContentType, String)>,
    /// Track recent delta sizes for pattern detection
    /// Maps `(content_type, key)` → vec of recent delta sizes
    delta_sizes: HashMap<(ContentType, String), Vec<usize>>,
    /// Maximum number of delta sizes to track per key
    max_delta_history: usize,
    /// Track the current message ID for duplicate detection.
    /// Not touched by `on_message_start`; managed via `set_current_message_id`.
    current_message_id: Option<String>,
    /// Track the last finalized message ID to detect duplicates
    last_finalized_message_id: Option<String>,
    /// Track which messages have been displayed to prevent duplicate final output
    displayed_final_messages: HashSet<String>,
    /// Track which (`content_type`, key) pairs have had output started.
    /// This is independent of `accumulated` to handle cases where accumulated
    /// content may be cleared (e.g., repeated `ContentBlockStart` for same index).
    /// Cleared on `on_message_start` to ensure fresh state for each message
    /// (except when `MessageStart` arrives while the state is `Streaming`).
    output_started_for_key: HashSet<(ContentType, String)>,
    /// Whether to emit verbose warnings about streaming anomalies.
    /// When false, suppresses diagnostic warnings that are useful for debugging
    /// but noisy in production (e.g., GLM protocol violations, snapshot detection).
    verbose_warnings: bool,
    /// Count of snapshot repairs performed during this session
    snapshot_repairs_count: usize,
    /// Count of deltas that exceeded the size threshold
    large_delta_count: usize,
    /// Count of protocol violations detected (e.g., `MessageStart` during streaming)
    protocol_violations: usize,
    /// Hash of the final streamed content (for deduplication)
    /// Computed at `message_stop` using all accumulated content
    final_content_hash: Option<u64>,
    /// Track the last rendered content for each key to detect when rendering
    /// would produce identical output (prevents visual repetition).
    /// Maps `(content_type, key)` → the last accumulated content that was rendered.
    last_rendered: HashMap<(ContentType, String), String>,
    /// Track all rendered content hashes for duplicate detection.
    /// This is preserved across mid-stream `MessageStart` boundaries to prevent
    /// duplicate rendering.
    rendered_content_hashes: HashSet<u64>,
    /// Track the last delta for each key to detect exact duplicate deltas.
    /// This is preserved across mid-stream `MessageStart` boundaries to prevent
    /// duplicate processing.
    /// Maps `(content_type, key)` → the last delta that was processed.
    last_delta: HashMap<(ContentType, String), String>,
    /// Track consecutive duplicates for resend glitch detection ("3 strikes" heuristic).
    /// Maps `(content_type, key)` → (count, `delta_hash`) where count tracks how many
    /// times the exact same delta has arrived consecutively. When count reaches
    /// the threshold, the delta is dropped as a resend glitch.
    consecutive_duplicates: HashMap<(ContentType, String), (usize, u64)>,
    /// Delta deduplicator using KMP and rolling hash for snapshot detection.
    /// Provides O(n+m) guaranteed complexity for detecting snapshot-as-delta violations.
    /// Cleared on message boundaries to prevent false positives.
    deduplicator: DeltaDeduplicator,
}
309
310impl StreamingSession {
311    /// Create a new streaming session.
312    pub fn new() -> Self {
313        Self {
314            max_delta_history: DEFAULT_MAX_DELTA_HISTORY,
315            verbose_warnings: false,
316            ..Default::default()
317        }
318    }
319
    /// Configure whether to emit verbose warnings about streaming anomalies.
    ///
    /// When enabled, diagnostic warnings are printed to stderr for:
    /// - Repeated `MessageStart` events (GLM protocol violations)
    /// - Large deltas that may indicate snapshot-as-delta bugs
    /// - Pattern detection of repeated large content
    ///
    /// When disabled (default), these warnings are suppressed to avoid
    /// noise in production output. Only the printing is affected by this flag.
    ///
    /// # Arguments
    /// * `enabled` - Whether to enable verbose warnings
    ///
    /// # Returns
    /// The modified session for builder chaining (the session is consumed and
    /// returned by value).
    ///
    /// # Example
    ///
    /// ```ignore
    /// let mut session = StreamingSession::new().with_verbose_warnings(true);
    /// ```
    pub const fn with_verbose_warnings(mut self, enabled: bool) -> Self {
        self.verbose_warnings = enabled;
        self
    }
345
346    /// Reset the session on new message start.
347    ///
348    /// This should be called when:
349    /// - Claude: `MessageStart` event
350    /// - Codex: `TurnStarted` event
351    /// - Gemini: `init` event or new message
352    /// - `OpenCode`: New part starts
353    ///
354    /// # Arguments
355    /// * `message_id` - Optional unique identifier for this message (for deduplication)
356    ///
357    /// # Note on Repeated `MessageStart` Events
358    ///
359    /// Some agents (notably GLM/ccs-glm) send repeated `MessageStart` events during
360    /// a single logical streaming session. When this happens while state is `Streaming`,
361    /// we preserve `output_started_for_key` to prevent prefix spam on each delta that
362    /// follows the repeated `MessageStart`. This is a defensive measure to handle
363    /// non-standard agent protocols while maintaining correct behavior for legitimate
364    /// multi-message scenarios.
365    pub fn on_message_start(&mut self) {
366        // Detect repeated MessageStart during active streaming
367        let is_mid_stream_restart = self.state == StreamingState::Streaming;
368
369        if is_mid_stream_restart {
370            // Track protocol violation
371            self.protocol_violations += 1;
372            // Log the contract violation for debugging (only if verbose warnings enabled)
373            if self.verbose_warnings {
374                eprintln!(
375                    "Warning: Received MessageStart while state is Streaming. \
376                    This indicates a non-standard agent protocol (e.g., GLM sending \
377                    repeated MessageStart events). Preserving output_started_for_key \
378                    to prevent prefix spam. File: streaming_state.rs, Line: {}",
379                    line!()
380                );
381            }
382
383            // Preserve output_started_for_key to prevent prefix spam.
384            // std::mem::take replaces the HashSet with an empty one and returns the old values,
385            // which we restore after clearing other state. This ensures repeated MessageStart
386            // events don't reset output tracking, preventing duplicate prefix display.
387            let preserved_output_started = std::mem::take(&mut self.output_started_for_key);
388
389            // Also preserve last_delta to detect duplicate deltas across MessageStart boundaries
390            let preserved_last_delta = std::mem::take(&mut self.last_delta);
391
392            // Also preserve rendered_content_hashes to detect duplicate rendering across MessageStart
393            let preserved_rendered_hashes = std::mem::take(&mut self.rendered_content_hashes);
394
395            // Also preserve consecutive_duplicates to detect resend glitches across MessageStart
396            let preserved_consecutive_duplicates = std::mem::take(&mut self.consecutive_duplicates);
397
398            self.state = StreamingState::Idle;
399            self.streamed_types.clear();
400            self.current_block = ContentBlockState::NotInBlock;
401            self.accumulated.clear();
402            self.key_order.clear();
403            self.delta_sizes.clear();
404            self.last_rendered.clear();
405            self.deduplicator.clear();
406
407            // Restore preserved state
408            self.output_started_for_key = preserved_output_started;
409            self.last_delta = preserved_last_delta;
410            self.rendered_content_hashes = preserved_rendered_hashes;
411            self.consecutive_duplicates = preserved_consecutive_duplicates;
412        } else {
413            // Normal reset for new message
414            self.state = StreamingState::Idle;
415            self.streamed_types.clear();
416            self.current_block = ContentBlockState::NotInBlock;
417            self.accumulated.clear();
418            self.key_order.clear();
419            self.delta_sizes.clear();
420            self.output_started_for_key.clear();
421            self.last_rendered.clear();
422            self.last_delta.clear();
423            self.rendered_content_hashes.clear();
424            self.consecutive_duplicates.clear();
425            self.deduplicator.clear();
426        }
427        // Note: We don't reset current_message_id here - it's set by a separate method
428        // This allows for more flexible message ID handling
429    }
430
    /// Set the current message ID for tracking.
    ///
    /// This should be called when processing a `MessageStart` event that contains
    /// a message identifier. Used to prevent duplicate display of final messages.
    /// Any previously stored ID is replaced.
    ///
    /// # Arguments
    /// * `message_id` - The unique identifier for this message (or `None` to clear)
    pub fn set_current_message_id(&mut self, message_id: Option<String>) {
        self.current_message_id = message_id;
    }
441
    /// Get the current message ID.
    ///
    /// The ID is borrowed from the session; no allocation takes place.
    ///
    /// # Returns
    /// * `Some(id)` - The current message ID
    /// * `None` - No message ID is set
    pub fn get_current_message_id(&self) -> Option<&str> {
        self.current_message_id.as_deref()
    }
450
    /// Check if a message ID represents a duplicate final message.
    ///
    /// This prevents displaying the same message twice - once after streaming
    /// completes and again when the final "Assistant" event arrives.
    ///
    /// A message counts as displayed once its ID has been recorded with
    /// `mark_message_displayed`.
    ///
    /// # Arguments
    /// * `message_id` - The message ID to check
    ///
    /// # Returns
    /// * `true` - This message has already been displayed (is a duplicate)
    /// * `false` - This is a new message
    pub fn is_duplicate_final_message(&self, message_id: &str) -> bool {
        self.displayed_final_messages.contains(message_id)
    }
465
466    /// Mark a message as displayed to prevent duplicate display.
467    ///
468    /// This should be called after displaying a message's final content.
469    ///
470    /// # Arguments
471    /// * `message_id` - The message ID to mark as displayed
472    pub fn mark_message_displayed(&mut self, message_id: &str) {
473        self.displayed_final_messages.insert(message_id.to_string());
474        self.last_finalized_message_id = Some(message_id.to_string());
475    }
476
477    /// Mark the start of a content block.
478    ///
479    /// This should be called when:
480    /// - Claude: `ContentBlockStart` event
481    /// - Codex: `ItemStarted` with relevant type
482    /// - Gemini: Content section begins
483    /// - `OpenCode`: Part with content starts
484    ///
485    /// If we're already in a block, this method finalizes the previous block
486    /// by emitting a newline if output had started.
487    ///
488    /// # Arguments
489    /// * `index` - The content block index (for multi-block messages)
490    pub fn on_content_block_start(&mut self, index: u64) {
491        let index_str = index.to_string();
492
493        // Check if we're transitioning to a different index BEFORE finalizing.
494        // This is important because some agents (e.g., GLM) may send ContentBlockStart
495        // repeatedly for the same index, and we should NOT clear accumulated content
496        // in that case (which would cause the next delta to show prefix again).
497        let (is_same_index, old_index) = match &self.current_block {
498            ContentBlockState::NotInBlock => (false, None),
499            ContentBlockState::InBlock {
500                index: current_index,
501                ..
502            } => (current_index == &index_str, Some(current_index.clone())),
503        };
504
505        // Finalize previous block if we're in one
506        self.ensure_content_block_finalized();
507
508        // Only clear accumulated content if transitioning to a DIFFERENT index.
509        // We clear the OLD index's content, not the new one.
510        if !is_same_index {
511            if let Some(old) = old_index {
512                for content_type in [
513                    ContentType::Text,
514                    ContentType::Thinking,
515                    ContentType::ToolInput,
516                ] {
517                    let key = (content_type, old.clone());
518                    self.accumulated.remove(&key);
519                    self.key_order.retain(|k| k != &key);
520                    // Also clear output_started tracking to ensure prefix shows when switching back
521                    self.output_started_for_key.remove(&key);
522                    // Clear delta sizes for the old index to prevent incorrect pattern detection
523                    self.delta_sizes.remove(&key);
524                    self.last_rendered.remove(&key);
525                    // Clear consecutive duplicates for the old index
526                    self.consecutive_duplicates.remove(&key);
527                }
528            }
529        }
530
531        // Initialize the new content block
532        self.current_block = ContentBlockState::InBlock {
533            index: index_str,
534            started_output: false,
535        };
536    }
537
538    /// Ensure the current content block is finalized.
539    ///
540    /// If we're in a block and output has started, this returns true to indicate
541    /// that a newline should be emitted. This prevents "glued text" bugs where
542    /// content from different blocks is concatenated without separation.
543    ///
544    /// # Returns
545    /// * `true` - A newline should be emitted (output had started)
546    /// * `false` - No newline needed (no output or not in a block)
547    fn ensure_content_block_finalized(&mut self) -> bool {
548        if let ContentBlockState::InBlock { started_output, .. } = &self.current_block {
549            let had_output = *started_output;
550            self.current_block = ContentBlockState::NotInBlock;
551            had_output
552        } else {
553            false
554        }
555    }
556
557    /// Assert that the session is in a valid lifecycle state.
558    ///
559    /// In debug builds, this will panic if the current state doesn't match
560    /// any of the expected states. In release builds, this does nothing.
561    ///
562    /// # Arguments
563    /// * `expected` - Slice of acceptable states
564    fn assert_lifecycle_state(&self, expected: &[StreamingState]) {
565        #[cfg(debug_assertions)]
566        assert!(
567            expected.contains(&self.state),
568            "Invalid lifecycle state: expected {:?}, got {:?}. \
569            This indicates a bug in the parser's event handling.",
570            expected,
571            self.state
572        );
573        #[cfg(not(debug_assertions))]
574        let _ = expected;
575    }
576
577    /// Process a text delta and return whether prefix should be shown.
578    ///
579    /// # Arguments
580    /// * `index` - The content block index
581    /// * `delta` - The text delta to accumulate
582    ///
583    /// # Returns
584    /// * `true` - Show prefix with this delta (first chunk)
585    /// * `false` - Don't show prefix (subsequent chunks)
586    pub fn on_text_delta(&mut self, index: u64, delta: &str) -> bool {
587        self.on_text_delta_key(&index.to_string(), delta)
588    }
589
590    /// Check for consecutive duplicate delta using the "3 strikes" heuristic.
591    ///
592    /// Detects resend glitches where the exact same delta arrives repeatedly.
593    /// Returns true if the delta should be dropped (exceeded threshold), false otherwise.
594    ///
595    /// # Arguments
596    /// * `content_key` - The content key to check
597    /// * `delta` - The delta to check
598    /// * `key_str` - The string key for logging
599    ///
600    /// # Returns
601    /// * `true` - The delta should be dropped (consecutive duplicate exceeded threshold)
602    /// * `false` - The delta should be processed
603    fn check_consecutive_duplicate(
604        &mut self,
605        content_key: &(ContentType, String),
606        delta: &str,
607        key_str: &str,
608    ) -> bool {
609        let delta_hash = RollingHashWindow::compute_hash(delta);
610        let thresholds = get_overlap_thresholds();
611
612        if let Some((count, prev_hash)) = self.consecutive_duplicates.get_mut(content_key) {
613            if *prev_hash == delta_hash {
614                *count += 1;
615                // Check if we've exceeded the consecutive duplicate threshold
616                if *count >= thresholds.consecutive_duplicate_threshold {
617                    // This is a resend glitch - drop the delta entirely
618                    if self.verbose_warnings {
619                        eprintln!(
620                            "Warning: Dropping consecutive duplicate delta (count={count}, threshold={}). \
621                            This appears to be a resend glitch. Key: '{key_str}', Delta: {delta:?}",
622                            thresholds.consecutive_duplicate_threshold
623                        );
624                    }
625                    // Don't update last_delta - preserve previous for comparison
626                    return true;
627                }
628            } else {
629                // Different delta - reset count and update hash
630                *count = 1;
631                *prev_hash = delta_hash;
632            }
633        } else {
634            // First occurrence of this delta
635            self.consecutive_duplicates
636                .insert(content_key.clone(), (1, delta_hash));
637        }
638
639        false
640    }
641
642    /// Process a text delta with a string key and return whether prefix should be shown.
643    ///
644    /// This variant is for parsers that use string keys instead of numeric indices
645    /// (e.g., Codex uses `agent_msg`, `reasoning`; Gemini uses `main`; `OpenCode` uses `main`).
646    ///
647    /// # Delta Validation
648    ///
649    /// This method validates that incoming content appears to be a genuine delta
650    /// (small chunk) rather than a snapshot (full accumulated content). Large "deltas"
651    /// that exceed `snapshot_threshold()` trigger a warning as they may indicate a
652    /// contract violation.
653    ///
654    /// Additionally, we track patterns of delta sizes to detect repeated large
655    /// content being sent as if it were incremental (a common snapshot-as-delta bug).
656    ///
657    /// # Arguments
658    /// * `key` - The content key (e.g., `main`, `agent_msg`, `reasoning`)
659    /// * `delta` - The text delta to accumulate
660    ///
661    /// # Returns
662    /// * `true` - Show prefix with this delta (first chunk)
663    /// * `false` - Don't show prefix (subsequent chunks)
664    pub fn on_text_delta_key(&mut self, key: &str, delta: &str) -> bool {
665        // Lifecycle enforcement: deltas should only arrive during streaming
666        // or idle (first delta starts streaming), never after finalization
667        self.assert_lifecycle_state(&[StreamingState::Idle, StreamingState::Streaming]);
668
669        let content_key = (ContentType::Text, key.to_string());
670        let delta_size = delta.len();
671
672        // Track delta size and warn on large deltas BEFORE duplicate check
673        // This ensures we track all received deltas even if they're duplicates
674        if delta_size > snapshot_threshold() {
675            self.large_delta_count += 1;
676            if self.verbose_warnings {
677                eprintln!(
678                    "Warning: Large delta ({delta_size} chars) for key '{key}'. \
679                    This may indicate unusual streaming behavior or a snapshot being sent as a delta."
680                );
681            }
682        }
683
684        // Track delta size for pattern detection
685        {
686            let sizes = self.delta_sizes.entry(content_key.clone()).or_default();
687            sizes.push(delta_size);
688
689            // Keep only the most recent delta sizes
690            if sizes.len() > self.max_delta_history {
691                sizes.remove(0);
692            }
693        }
694
695        // Check for exact duplicate delta (same delta sent twice)
696        // This handles the ccs-glm repeated MessageStart scenario where the same
697        // delta is sent multiple times. We skip processing exact duplicates ONLY when
698        // the accumulated content is empty (indicating we just had a MessageStart and
699        // this is a true duplicate, not just a repeated token in normal streaming).
700        if let Some(last) = self.last_delta.get(&content_key) {
701            if delta == last {
702                // Check if accumulated content is empty (just after MessageStart)
703                if let Some(current_accumulated) = self.accumulated.get(&content_key) {
704                    // If accumulated content is empty, this is likely a ccs-glm duplicate
705                    if current_accumulated.is_empty() {
706                        // Skip without updating last_delta (to preserve previous delta for comparison)
707                        return false;
708                    }
709                } else {
710                    // No accumulated content yet, definitely after MessageStart
711                    // Skip without updating last_delta
712                    return false;
713                }
714            }
715        }
716
717        // Consecutive duplicate detection ("3 strikes" heuristic)
718        // Detects resend glitches where the exact same delta arrives repeatedly.
719        // This is different from the above check - it tracks HOW MANY TIMES
720        // the same delta has arrived consecutively, not just if it matches once.
721        if self.check_consecutive_duplicate(&content_key, delta, key) {
722            return false;
723        }
724
725        // Auto-repair: Check if this is a snapshot being sent as a delta
726        // Do this BEFORE any mutable borrows so we can use immutable methods.
727        // Use content-based detection which is more reliable than size-based alone.
728        let is_snapshot = self.is_likely_snapshot(delta, key);
729        let actual_delta = if is_snapshot {
730            // Extract only the new portion to prevent exponential duplication
731            match self.get_delta_from_snapshot(delta, key) {
732                Ok(extracted) => {
733                    // Track successful snapshot repair
734                    self.snapshot_repairs_count += 1;
735                    extracted.to_string()
736                }
737                Err(e) => {
738                    // Snapshot detection had a false positive - use the original delta
739                    if self.verbose_warnings {
740                        eprintln!(
741                            "Warning: Snapshot extraction failed: {e}. Using original delta."
742                        );
743                    }
744                    delta.to_string()
745                }
746            }
747        } else {
748            // Genuine delta - use as-is
749            delta.to_string()
750        };
751
752        // Pattern detection: Check if we're seeing repeated large deltas
753        // This indicates the same content is being sent repeatedly (snapshot-as-delta)
754        let sizes = self.delta_sizes.get(&content_key);
755        if let Some(sizes) = sizes {
756            if sizes.len() >= DEFAULT_PATTERN_DETECTION_MIN_DELTAS && self.verbose_warnings {
757                // Check if at least 3 of the last N deltas were large
758                let large_count = sizes.iter().filter(|&&s| s > snapshot_threshold()).count();
759                if large_count >= DEFAULT_PATTERN_DETECTION_MIN_DELTAS {
760                    eprintln!(
761                        "Warning: Detected pattern of {large_count} large deltas for key '{key}'. \
762                        This strongly suggests a snapshot-as-delta bug where the same \
763                        large content is being sent repeatedly. File: streaming_state.rs, Line: {}",
764                        line!()
765                    );
766                }
767            }
768        }
769
770        // If the actual delta is empty (identical content detected), skip processing
771        if actual_delta.is_empty() {
772            // Return false to indicate no prefix should be shown (content unchanged)
773            return false;
774        }
775
776        // Mark that we're streaming text content
777        self.streamed_types.insert(ContentType::Text, true);
778        self.state = StreamingState::Streaming;
779
780        // Update block state to track this block and mark output as started
781        self.current_block = ContentBlockState::InBlock {
782            index: key.to_string(),
783            started_output: true,
784        };
785
786        // Check if this is the first delta for this key using output_started_for_key
787        // This is independent of accumulated content to handle cases where accumulated
788        // content may be cleared (e.g., repeated ContentBlockStart for same index)
789        let is_first = !self.output_started_for_key.contains(&content_key);
790
791        // Mark that output has started for this key
792        self.output_started_for_key.insert(content_key.clone());
793
794        // Accumulate the delta (using auto-repaired delta if snapshot was detected)
795        self.accumulated
796            .entry(content_key.clone())
797            .and_modify(|buf| buf.push_str(&actual_delta))
798            .or_insert_with(|| actual_delta);
799
800        // Track the last delta for duplicate detection
801        // Use the original delta for tracking (not the auto-repaired version)
802        self.last_delta
803            .insert(content_key.clone(), delta.to_string());
804
805        // Track order
806        if is_first {
807            self.key_order.push(content_key);
808        }
809
810        // Show prefix only on the very first delta
811        is_first
812    }
813
814    /// Process a thinking delta and return whether prefix should be shown.
815    ///
816    /// # Arguments
817    /// * `index` - The content block index
818    /// * `delta` - The thinking delta to accumulate
819    ///
820    /// # Returns
821    /// * `true` - Show prefix with this delta (first chunk)
822    /// * `false` - Don't show prefix (subsequent chunks)
823    pub fn on_thinking_delta(&mut self, index: u64, delta: &str) -> bool {
824        self.on_thinking_delta_key(&index.to_string(), delta)
825    }
826
827    /// Process a thinking delta with a string key and return whether prefix should be shown.
828    ///
829    /// This variant is for parsers that use string keys instead of numeric indices.
830    ///
831    /// # Arguments
832    /// * `key` - The content key (e.g., "reasoning")
833    /// * `delta` - The thinking delta to accumulate
834    ///
835    /// # Returns
836    /// * `true` - Show prefix with this delta (first chunk)
837    /// * `false` - Don't show prefix (subsequent chunks)
838    pub fn on_thinking_delta_key(&mut self, key: &str, delta: &str) -> bool {
839        // Mark that we're streaming thinking content
840        self.streamed_types.insert(ContentType::Thinking, true);
841        self.state = StreamingState::Streaming;
842
843        // Get the key for this content
844        let content_key = (ContentType::Thinking, key.to_string());
845
846        // Check if this is the first delta for this key using output_started_for_key
847        let is_first = !self.output_started_for_key.contains(&content_key);
848
849        // Mark that output has started for this key
850        self.output_started_for_key.insert(content_key.clone());
851
852        // Accumulate the delta
853        self.accumulated
854            .entry(content_key.clone())
855            .and_modify(|buf| buf.push_str(delta))
856            .or_insert_with(|| delta.to_string());
857
858        // Track order
859        if is_first {
860            self.key_order.push(content_key);
861        }
862
863        is_first
864    }
865
866    /// Process a tool input delta.
867    ///
868    /// # Arguments
869    /// * `index` - The content block index
870    /// * `delta` - The tool input delta to accumulate
871    pub fn on_tool_input_delta(&mut self, index: u64, delta: &str) {
872        // Mark that we're streaming tool input
873        self.streamed_types.insert(ContentType::ToolInput, true);
874        self.state = StreamingState::Streaming;
875
876        // Get the key for this content
877        let key = (ContentType::ToolInput, index.to_string());
878
879        // Accumulate the delta
880        self.accumulated
881            .entry(key.clone())
882            .and_modify(|buf| buf.push_str(delta))
883            .or_insert_with(|| delta.to_string());
884
885        // Track order
886        if !self.key_order.contains(&key) {
887            self.key_order.push(key);
888        }
889    }
890
891    /// Finalize the message on stop event.
892    ///
893    /// This should be called when:
894    /// - Claude: `MessageStop` event
895    /// - Codex: `TurnCompleted` or `ItemCompleted` with text
896    /// - Gemini: Message completion
897    /// - `OpenCode`: Part completion
898    ///
899    /// # Returns
900    /// * `true` - A completion newline should be emitted (was in a content block)
901    /// * `false` - No completion needed (no content block active)
902    pub fn on_message_stop(&mut self) -> bool {
903        let was_in_block = self.ensure_content_block_finalized();
904        self.state = StreamingState::Finalized;
905
906        // Compute content hash for deduplication
907        self.final_content_hash = self.compute_content_hash();
908
909        // Mark the current message as displayed to prevent duplicate display
910        // when the final "Assistant" event arrives
911        if let Some(message_id) = self.current_message_id.clone() {
912            self.mark_message_displayed(&message_id);
913        }
914
915        was_in_block
916    }
917
918    /// Check if ANY content has been streamed for this message.
919    ///
920    /// This is a broader check that returns true if ANY content type
921    /// has been streamed. Used to skip entire message display when
922    /// all content was already streamed.
923    pub fn has_any_streamed_content(&self) -> bool {
924        !self.streamed_types.is_empty()
925    }
926
927    /// Compute hash of all accumulated content for deduplication.
928    ///
929    /// This computes a hash of ALL accumulated content across all content types
930    /// and indices. This is used to detect if a final message contains the same
931    /// content that was already streamed.
932    ///
933    /// # Returns
934    /// * `Some(hash)` - Hash of all accumulated content, or None if no content
935    fn compute_content_hash(&self) -> Option<u64> {
936        if self.accumulated.is_empty() {
937            return None;
938        }
939
940        let mut hasher = DefaultHasher::new();
941
942        // Collect and sort keys for consistent hashing
943        // Note: We can't sort ContentType directly, so we convert to tuple representation
944        let mut keys: Vec<_> = self.accumulated.keys().collect();
945        // Sort by string representation for consistency (not perfect but good enough for deduplication)
946        keys.sort_by_key(|k| format!("{:?}-{}", k.0, k.1));
947
948        for key in keys {
949            if let Some(content) = self.accumulated.get(key) {
950                // Hash the key and content together
951                format!("{:?}-{}", key.0, key.1).hash(&mut hasher);
952                content.hash(&mut hasher);
953            }
954        }
955
956        Some(hasher.finish())
957    }
958
959    /// Check if content matches the previously streamed content by hash.
960    ///
961    /// This is a more precise alternative to `has_any_streamed_content()` for
962    /// deduplication. Instead of checking if ANY content was streamed, this checks
963    /// if the EXACT content was streamed by comparing hashes.
964    ///
965    /// This method looks at ALL accumulated content across all content types and indices.
966    /// If the combined accumulated content matches the input, it returns true.
967    ///
968    /// # Arguments
969    /// * `content` - The content to check (typically text content from final message)
970    ///
971    /// # Returns
972    /// * `true` - The content hash matches the previously streamed content
973    /// * `false` - The content is different or no content was streamed
974    pub fn is_duplicate_by_hash(&self, content: &str) -> bool {
975        // Check if the input content matches ALL accumulated text content combined
976        // This handles the case where assistant events arrive during streaming (before message_stop)
977        // We collect and combine all text content in index order
978        let mut text_keys: Vec<_> = self
979            .accumulated
980            .keys()
981            .filter(|(ct, _)| *ct == ContentType::Text)
982            .collect();
983        text_keys.sort_by_key(|k| format!("{:?}-{}", k.0, k.1));
984
985        // Combine all accumulated text content in sorted order
986        let combined_content: String = text_keys
987            .iter()
988            .filter_map(|key| self.accumulated.get(key))
989            .cloned()
990            .collect();
991
992        // Direct string comparison is more reliable than hashing
993        // because hashing can have collisions and we want exact match
994        combined_content == content
995    }
996
997    /// Get accumulated content for a specific type and index.
998    ///
999    /// # Arguments
1000    /// * `content_type` - The type of content
1001    /// * `index` - The content index (as string for flexibility)
1002    ///
1003    /// # Returns
1004    /// * `Some(text)` - Accumulated content
1005    /// * `None` - No content accumulated for this key
1006    pub fn get_accumulated(&self, content_type: ContentType, index: &str) -> Option<&str> {
1007        self.accumulated
1008            .get(&(content_type, index.to_string()))
1009            .map(std::string::String::as_str)
1010    }
1011
1012    /// Mark content as having been rendered (HashMap-based tracking).
1013    ///
1014    /// This should be called after rendering to update the per-key tracking.
1015    ///
1016    /// # Arguments
1017    /// * `content_type` - The type of content
1018    /// * `index` - The content index (as string for flexibility)
1019    pub fn mark_rendered(&mut self, content_type: ContentType, index: &str) {
1020        let content_key = (content_type, index.to_string());
1021
1022        // Store the current accumulated content as last rendered
1023        if let Some(current) = self.accumulated.get(&content_key) {
1024            self.last_rendered.insert(content_key, current.clone());
1025        }
1026    }
1027
1028    /// Check if content has been rendered before using hash-based tracking.
1029    ///
1030    /// This provides global duplicate detection across all content by computing
1031    /// a hash of the accumulated content and checking if it's in the rendered set.
1032    /// This is preserved across `MessageStart` boundaries to prevent duplicate rendering.
1033    ///
1034    /// # Arguments
1035    /// * `content_type` - The type of content
1036    /// * `index` - The content index (as string for flexibility)
1037    ///
1038    /// # Returns
1039    /// * `true` - This exact content has been rendered before
1040    /// * `false` - This exact content has not been rendered
1041    #[cfg(test)]
1042    pub fn is_content_rendered(&self, content_type: ContentType, index: &str) -> bool {
1043        let content_key = (content_type, index.to_string());
1044
1045        // Check if we have accumulated content for this key
1046        if let Some(current) = self.accumulated.get(&content_key) {
1047            // Compute hash of current accumulated content
1048            let mut hasher = DefaultHasher::new();
1049            current.hash(&mut hasher);
1050            let hash = hasher.finish();
1051
1052            // Check if this hash has been rendered before
1053            return self.rendered_content_hashes.contains(&hash);
1054        }
1055
1056        false
1057    }
1058
1059    /// Check if content has been rendered before and starts with previously rendered content.
1060    ///
1061    /// This method detects when new content extends previously rendered content,
1062    /// indicating an in-place update should be performed (e.g., using carriage return).
1063    ///
1064    /// With the new KMP + Rolling Hash approach, this checks if output has started
1065    /// for this key, which indicates we're in an in-place update scenario.
1066    ///
1067    /// # Arguments
1068    /// * `content_type` - The type of content
1069    /// * `index` - The content index (as string for flexibility)
1070    ///
1071    /// # Returns
1072    /// * `true` - Output has started for this key (do in-place update)
1073    /// * `false` - Output has not started for this key (show new content)
1074    pub fn has_rendered_prefix(&self, content_type: ContentType, index: &str) -> bool {
1075        let content_key = (content_type, index.to_string());
1076        self.output_started_for_key.contains(&content_key)
1077    }
1078
1079    /// Mark content as rendered using hash-based tracking.
1080    ///
1081    /// This method updates the `rendered_content_hashes` set to track all
1082    /// content that has been rendered for deduplication.
1083    ///
1084    /// # Arguments
1085    /// * `content_type` - The type of content
1086    /// * `index` - The content index (as string for flexibility)
1087    #[cfg(test)]
1088    pub fn mark_content_rendered(&mut self, content_type: ContentType, index: &str) {
1089        // Also update last_rendered for compatibility
1090        self.mark_rendered(content_type, index);
1091
1092        // Add the hash of the accumulated content to the rendered set
1093        let content_key = (content_type, index.to_string());
1094        if let Some(current) = self.accumulated.get(&content_key) {
1095            let mut hasher = DefaultHasher::new();
1096            current.hash(&mut hasher);
1097            let hash = hasher.finish();
1098            self.rendered_content_hashes.insert(hash);
1099        }
1100    }
1101
1102    /// Mark content as rendered using pre-sanitized content.
1103    ///
1104    /// This method uses the sanitized content (with whitespace normalized)
1105    /// for hash-based deduplication, which prevents duplicates when the
1106    /// accumulated content differs only by whitespace.
1107    ///
1108    /// # Arguments
1109    /// * `content_type` - The type of content
1110    /// * `index` - The content index (as string for flexibility)
1111    /// * `content` - The content to hash
1112    pub fn mark_content_hash_rendered(
1113        &mut self,
1114        content_type: ContentType,
1115        index: &str,
1116        content: &str,
1117    ) {
1118        // Also update last_rendered for compatibility
1119        self.mark_rendered(content_type, index);
1120
1121        // Add the hash of the content to the rendered set
1122        let mut hasher = DefaultHasher::new();
1123        content.hash(&mut hasher);
1124        let hash = hasher.finish();
1125        self.rendered_content_hashes.insert(hash);
1126    }
1127
1128    /// Check if sanitized content has already been rendered.
1129    ///
1130    /// This method checks the hash of the sanitized content against the
1131    /// rendered set to prevent duplicate rendering.
1132    ///
1133    /// # Arguments
1134    /// * `_content_type` - The type of content (kept for API consistency)
1135    /// * `_index` - The content index (kept for API consistency)
1136    /// * `sanitized_content` - The sanitized content to check
1137    ///
1138    /// # Returns
1139    /// * `true` - This exact content has been rendered before
1140    /// * `false` - This exact content has not been rendered
1141    pub fn is_content_hash_rendered(
1142        &self,
1143        _content_type: ContentType,
1144        _index: &str,
1145        content: &str,
1146    ) -> bool {
1147        // Compute hash of exact content
1148        let mut hasher = DefaultHasher::new();
1149        content.hash(&mut hasher);
1150        let hash = hasher.finish();
1151
1152        // Check if this hash has been rendered before
1153        self.rendered_content_hashes.contains(&hash)
1154    }
1155
1156    /// Check if incoming text is likely a snapshot (full accumulated content) rather than a delta.
1157    ///
1158    /// This uses the KMP + Rolling Hash algorithm for efficient O(n+m) snapshot detection.
1159    /// The two-phase approach ensures optimal performance:
1160    /// 1. Rolling hash for fast O(n) filtering
1161    /// 2. KMP for exact O(n+m) verification
1162    ///
1163    /// # Arguments
1164    /// * `text` - The incoming text to check
1165    /// * `key` - The content key to compare against
1166    ///
1167    /// # Returns
1168    /// * `true` - The text appears to be a snapshot (starts with previous accumulated content)
1169    /// * `false` - The text appears to be a genuine delta
1170    pub fn is_likely_snapshot(&self, text: &str, key: &str) -> bool {
1171        let content_key = (ContentType::Text, key.to_string());
1172
1173        // Check if we have accumulated content for this key
1174        if let Some(previous) = self.accumulated.get(&content_key) {
1175            // Use DeltaDeduplicator with threshold-aware snapshot detection
1176            // This prevents false positives by requiring strong overlap (>=30 chars, >=50% ratio)
1177            return DeltaDeduplicator::is_likely_snapshot_with_thresholds(text, previous);
1178        }
1179
1180        false
1181    }
1182
1183    /// Extract the delta portion from a snapshot.
1184    ///
1185    /// When a snapshot is detected (full accumulated content sent as a "delta"),
1186    /// this method extracts only the new portion that hasn't been accumulated yet.
1187    ///
1188    /// # Arguments
1189    /// * `text` - The snapshot text (full accumulated content + new content)
1190    /// * `key` - The content key to compare against
1191    ///
1192    /// # Returns
1193    /// * `Ok(usize)` - The length of the delta portion (new content only)
1194    /// * `Err` - If the text is not actually a snapshot (doesn't start with accumulated content)
1195    ///
1196    /// # Note
1197    /// Returns the length of the delta portion as `usize` since we can't return
1198    /// a reference to `text` with the correct lifetime. Callers can slice `text`
1199    /// themselves using `&text[delta_len..]`.
1200    pub fn extract_delta_from_snapshot(&self, text: &str, key: &str) -> Result<usize, String> {
1201        let content_key = (ContentType::Text, key.to_string());
1202
1203        if let Some(previous) = self.accumulated.get(&content_key) {
1204            // Use DeltaDeduplicator with threshold-aware delta extraction
1205            // This ensures we only extract when overlap meets strong criteria
1206            if let Some(new_content) =
1207                DeltaDeduplicator::extract_new_content_with_thresholds(text, previous)
1208            {
1209                // Calculate the position where new content starts
1210                let delta_start = text.len() - new_content.len();
1211                return Ok(delta_start);
1212            }
1213        }
1214
1215        // If we get here, the text wasn't actually a snapshot
1216        // This could indicate a false positive from is_likely_snapshot
1217        Err(format!(
1218            "extract_delta_from_snapshot called on non-snapshot text. \
1219            key={key:?}, text={text:?}. Snapshot detection may have had a false positive."
1220        ))
1221    }
1222
1223    /// Get the delta portion as a string slice from a snapshot.
1224    ///
1225    /// This is a convenience wrapper that returns the actual substring
1226    /// instead of just the length.
1227    ///
1228    /// # Returns
1229    /// * `Ok(&str)` - The delta portion (new content only)
1230    /// * `Err` - If the text is not actually a snapshot
1231    pub fn get_delta_from_snapshot<'a>(&self, text: &'a str, key: &str) -> Result<&'a str, String> {
1232        let delta_len = self.extract_delta_from_snapshot(text, key)?;
1233        Ok(&text[delta_len..])
1234    }
1235
1236    /// Get streaming quality metrics for the current session.
1237    ///
1238    /// Returns aggregated metrics about delta sizes and streaming patterns
1239    /// during the session. This is useful for debugging and analyzing
1240    /// streaming behavior.
1241    ///
1242    /// # Returns
1243    /// Aggregated metrics across all content types and keys.
1244    pub fn get_streaming_quality_metrics(&self) -> StreamingQualityMetrics {
1245        // Flatten all delta sizes across all content types and keys
1246        let all_sizes = self.delta_sizes.values().flat_map(|v| v.iter().copied());
1247        let mut metrics = StreamingQualityMetrics::from_sizes(all_sizes);
1248
1249        // Add session-level metrics
1250        metrics.snapshot_repairs_count = self.snapshot_repairs_count;
1251        metrics.large_delta_count = self.large_delta_count;
1252        metrics.protocol_violations = self.protocol_violations;
1253
1254        metrics
1255    }
1256}
1257
1258#[cfg(test)]
1259mod tests {
1260    use super::*;
1261
1262    #[test]
1263    fn test_session_lifecycle() {
1264        let mut session = StreamingSession::new();
1265
1266        // Initially no content streamed
1267        assert!(!session.has_any_streamed_content());
1268
1269        // Message start
1270        session.on_message_start();
1271        assert!(!session.has_any_streamed_content());
1272
1273        // Text delta
1274        let show_prefix = session.on_text_delta(0, "Hello");
1275        assert!(show_prefix);
1276        assert!(session.has_any_streamed_content());
1277
1278        // Another delta
1279        let show_prefix = session.on_text_delta(0, " World");
1280        assert!(!show_prefix);
1281
1282        // Message stop
1283        let was_in_block = session.on_message_stop();
1284        assert!(was_in_block);
1285    }
1286
1287    #[test]
1288    fn test_accumulated_content() {
1289        let mut session = StreamingSession::new();
1290        session.on_message_start();
1291
1292        session.on_text_delta(0, "Hello");
1293        session.on_text_delta(0, " World");
1294
1295        let accumulated = session.get_accumulated(ContentType::Text, "0");
1296        assert_eq!(accumulated, Some("Hello World"));
1297    }
1298
1299    #[test]
1300    fn test_reset_between_messages() {
1301        let mut session = StreamingSession::new();
1302
1303        // First message
1304        session.on_message_start();
1305        session.on_text_delta(0, "First");
1306        assert!(session.has_any_streamed_content());
1307        session.on_message_stop();
1308
1309        // Second message - state should be reset
1310        session.on_message_start();
1311        assert!(!session.has_any_streamed_content());
1312    }
1313
1314    #[test]
1315    fn test_multiple_indices() {
1316        let mut session = StreamingSession::new();
1317        session.on_message_start();
1318
1319        session.on_text_delta(0, "First block");
1320        session.on_text_delta(1, "Second block");
1321
1322        assert_eq!(
1323            session.get_accumulated(ContentType::Text, "0"),
1324            Some("First block")
1325        );
1326        assert_eq!(
1327            session.get_accumulated(ContentType::Text, "1"),
1328            Some("Second block")
1329        );
1330    }
1331
1332    #[test]
1333    fn test_clear_index() {
1334        // Behavioral test: verify that creating a new session gives clean state
1335        // instead of testing clear_index() which is now removed
1336        let mut session = StreamingSession::new();
1337        session.on_message_start();
1338
1339        session.on_text_delta(0, "Before");
1340        // Instead of clearing, verify that a new session starts fresh
1341        let mut fresh_session = StreamingSession::new();
1342        fresh_session.on_message_start();
1343        fresh_session.on_text_delta(0, "After");
1344
1345        assert_eq!(
1346            fresh_session.get_accumulated(ContentType::Text, "0"),
1347            Some("After")
1348        );
1349        // Original session should still have "Before"
1350        assert_eq!(
1351            session.get_accumulated(ContentType::Text, "0"),
1352            Some("Before")
1353        );
1354    }
1355
1356    #[test]
1357    fn test_delta_validation_warns_on_large_delta() {
1358        let mut session = StreamingSession::new();
1359        session.on_message_start();
1360
1361        // Create a delta larger than snapshot_threshold()
1362        let large_delta = "x".repeat(snapshot_threshold() + 1);
1363
1364        // This should trigger a warning but still work
1365        let show_prefix = session.on_text_delta(0, &large_delta);
1366        assert!(show_prefix);
1367
1368        // Content should still be accumulated correctly
1369        assert_eq!(
1370            session.get_accumulated(ContentType::Text, "0"),
1371            Some(large_delta.as_str())
1372        );
1373    }
1374
1375    #[test]
1376    fn test_delta_validation_no_warning_for_small_delta() {
1377        let mut session = StreamingSession::new();
1378        session.on_message_start();
1379
1380        // Small delta should not trigger warning
1381        let small_delta = "Hello, world!";
1382        let show_prefix = session.on_text_delta(0, small_delta);
1383        assert!(show_prefix);
1384
1385        assert_eq!(
1386            session.get_accumulated(ContentType::Text, "0"),
1387            Some(small_delta)
1388        );
1389    }
1390
1391    // Tests for snapshot-as-delta detection methods
1392
1393    #[test]
1394    fn test_is_likely_snapshot_detects_snapshot() {
1395        let mut session = StreamingSession::new();
1396        session.on_message_start();
1397        session.on_content_block_start(0);
1398
1399        // First delta is long enough to meet threshold requirements
1400        // Using 40 chars to ensure it exceeds the 30 char minimum
1401        let initial = "This is a long message that exceeds threshold";
1402        session.on_text_delta(0, initial);
1403
1404        // Simulate GLM sending full accumulated content as next "delta"
1405        // The overlap is 45 chars (100% of initial), meeting both thresholds:
1406        // - char_count = 45 >= 30 ✓
1407        // - ratio = 45/48 ≈ 94% >= 50% ✓
1408        // - ends at safe boundary (space) ✓
1409        let snapshot = format!("{initial} plus new content");
1410        let is_snapshot = session.is_likely_snapshot(&snapshot, "0");
1411        assert!(
1412            is_snapshot,
1413            "Should detect snapshot-as-delta with strong overlap"
1414        );
1415    }
1416
1417    #[test]
1418    fn test_is_likely_snapshot_returns_false_for_genuine_delta() {
1419        let mut session = StreamingSession::new();
1420        session.on_message_start();
1421        session.on_content_block_start(0);
1422
1423        // First delta is "Hello"
1424        session.on_text_delta(0, "Hello");
1425
1426        // Genuine delta " World" doesn't start with previous content
1427        let is_snapshot = session.is_likely_snapshot(" World", "0");
1428        assert!(
1429            !is_snapshot,
1430            "Genuine delta should not be flagged as snapshot"
1431        );
1432    }
1433
1434    #[test]
1435    fn test_is_likely_snapshot_returns_false_when_no_previous_content() {
1436        let mut session = StreamingSession::new();
1437        session.on_message_start();
1438        session.on_content_block_start(0);
1439
1440        // No previous content, so anything is a genuine first delta
1441        let is_snapshot = session.is_likely_snapshot("Hello", "0");
1442        assert!(
1443            !is_snapshot,
1444            "First delta should not be flagged as snapshot"
1445        );
1446    }
1447
1448    #[test]
1449    fn test_extract_delta_from_snapshot() {
1450        let mut session = StreamingSession::new();
1451        session.on_message_start();
1452        session.on_content_block_start(0);
1453
1454        // First delta is long enough to meet threshold requirements
1455        let initial = "This is a long message that exceeds threshold";
1456        session.on_text_delta(0, initial);
1457
1458        // Snapshot should extract new portion
1459        let snapshot = format!("{initial} plus new content");
1460        let delta = session.get_delta_from_snapshot(&snapshot, "0").unwrap();
1461        assert_eq!(delta, " plus new content");
1462    }
1463
1464    #[test]
1465    fn test_extract_delta_from_snapshot_empty_delta() {
1466        let mut session = StreamingSession::new();
1467        session.on_message_start();
1468        session.on_content_block_start(0);
1469
1470        // First delta is "Hello"
1471        session.on_text_delta(0, "Hello");
1472
1473        // Snapshot "Hello" (identical to previous) should extract "" as delta
1474        let delta = session.get_delta_from_snapshot("Hello", "0").unwrap();
1475        assert_eq!(delta, "");
1476    }
1477
1478    #[test]
1479    fn test_extract_delta_from_snapshot_returns_error_on_non_snapshot() {
1480        let mut session = StreamingSession::new();
1481        session.on_message_start();
1482        session.on_content_block_start(0);
1483
1484        // First delta is "Hello"
1485        session.on_text_delta(0, "Hello");
1486
1487        // Calling on non-snapshot should return error (not panic)
1488        let result = session.get_delta_from_snapshot("World", "0");
1489        assert!(result.is_err());
1490        assert!(result
1491            .unwrap_err()
1492            .contains("extract_delta_from_snapshot called on non-snapshot text"));
1493    }
1494
1495    #[test]
1496    fn test_snapshot_detection_with_string_keys() {
1497        let mut session = StreamingSession::new();
1498        session.on_message_start();
1499
1500        // Test with string keys (like Codex/Gemini use)
1501        // Use content long enough to meet threshold requirements
1502        let initial = "This is a long message that exceeds threshold";
1503        session.on_text_delta_key("main", initial);
1504
1505        // Should detect snapshot for string key with strong overlap
1506        let snapshot = format!("{initial} plus new content");
1507        let is_snapshot = session.is_likely_snapshot(&snapshot, "main");
1508        assert!(
1509            is_snapshot,
1510            "Should detect snapshot with string keys when thresholds are met"
1511        );
1512
1513        // Should extract delta correctly
1514        let delta = session.get_delta_from_snapshot(&snapshot, "main").unwrap();
1515        assert_eq!(delta, " plus new content");
1516    }
1517
1518    #[test]
1519    fn test_snapshot_extraction_exact_match() {
1520        let mut session = StreamingSession::new();
1521        session.on_message_start();
1522
1523        // First delta is long enough to meet threshold requirements
1524        let initial = "This is a long message that exceeds threshold";
1525        session.on_text_delta(0, initial);
1526
1527        // Exact match with additional content (strong overlap)
1528        let exact_match = format!("{initial} World");
1529        let delta1 = session.get_delta_from_snapshot(&exact_match, "0").unwrap();
1530        assert_eq!(delta1, " World");
1531    }
1532
1533    #[test]
1534    fn test_token_by_token_streaming_scenario() {
1535        let mut session = StreamingSession::new();
1536        session.on_message_start();
1537
1538        // Simulate token-by-token streaming
1539        let tokens = ["H", "e", "l", "l", "o", " ", "W", "o", "r", "l", "d", "!"];
1540
1541        for token in tokens {
1542            let show_prefix = session.on_text_delta(0, token);
1543
1544            // Only first token should show prefix
1545            if token == "H" {
1546                assert!(show_prefix, "First token should show prefix");
1547            } else {
1548                assert!(!show_prefix, "Subsequent tokens should not show prefix");
1549            }
1550        }
1551
1552        // Verify accumulated content
1553        assert_eq!(
1554            session.get_accumulated(ContentType::Text, "0"),
1555            Some("Hello World!")
1556        );
1557    }
1558
1559    #[test]
1560    fn test_snapshot_in_token_stream() {
1561        let mut session = StreamingSession::new();
1562        session.on_message_start();
1563
1564        // First few tokens as genuine deltas - use longer content to meet thresholds
1565        let initial = "This is a long message that exceeds threshold";
1566        session.on_text_delta(0, initial);
1567        session.on_text_delta(0, " with more content");
1568
1569        // Now GLM sends a snapshot instead of delta
1570        // The accumulated content plus new content should meet thresholds:
1571        // Accumulated: "This is a long message that exceeds threshold with more content" (62 chars)
1572        // Snapshot: accumulated + "! This is additional content" (88 chars total)
1573        // Overlap: 62 chars
1574        // Ratio: 62/88 ≈ 70% >= 50% ✓
1575        let accumulated = session
1576            .get_accumulated(ContentType::Text, "0")
1577            .unwrap()
1578            .to_string();
1579        let snapshot = format!("{accumulated}! This is additional content");
1580        assert!(
1581            session.is_likely_snapshot(&snapshot, "0"),
1582            "Should detect snapshot in token stream with strong overlap"
1583        );
1584
1585        // Extract delta and continue
1586        let delta = session.get_delta_from_snapshot(&snapshot, "0").unwrap();
1587        assert!(delta.contains("! This is additional content"));
1588
1589        // Apply the delta
1590        session.on_text_delta(0, delta);
1591
1592        // Verify final accumulated content
1593        let expected = format!("{accumulated}! This is additional content");
1594        assert_eq!(
1595            session.get_accumulated(ContentType::Text, "0"),
1596            Some(expected.as_str())
1597        );
1598    }
1599
1600    // Tests for message identity tracking
1601
1602    #[test]
1603    fn test_set_and_get_current_message_id() {
1604        let mut session = StreamingSession::new();
1605
1606        // Initially no message ID
1607        assert!(session.get_current_message_id().is_none());
1608
1609        // Set a message ID
1610        session.set_current_message_id(Some("msg-123".to_string()));
1611        assert_eq!(session.get_current_message_id(), Some("msg-123"));
1612
1613        // Clear the message ID
1614        session.set_current_message_id(None);
1615        assert!(session.get_current_message_id().is_none());
1616    }
1617
1618    #[test]
1619    fn test_mark_message_displayed() {
1620        let mut session = StreamingSession::new();
1621
1622        // Initially not marked as displayed
1623        assert!(!session.is_duplicate_final_message("msg-123"));
1624
1625        // Mark as displayed
1626        session.mark_message_displayed("msg-123");
1627        assert!(session.is_duplicate_final_message("msg-123"));
1628
1629        // Different message ID is not a duplicate
1630        assert!(!session.is_duplicate_final_message("msg-456"));
1631    }
1632
1633    #[test]
1634    fn test_message_stop_marks_displayed() {
1635        let mut session = StreamingSession::new();
1636
1637        // Set a message ID
1638        session.set_current_message_id(Some("msg-123".to_string()));
1639
1640        // Start a message with content
1641        session.on_message_start();
1642        session.on_text_delta(0, "Hello");
1643
1644        // Stop should mark as displayed
1645        session.on_message_stop();
1646        assert!(session.is_duplicate_final_message("msg-123"));
1647    }
1648
1649    #[test]
1650    fn test_multiple_messages_tracking() {
1651        let mut session = StreamingSession::new();
1652
1653        // First message
1654        session.set_current_message_id(Some("msg-1".to_string()));
1655        session.on_message_start();
1656        session.on_text_delta(0, "First");
1657        session.on_message_stop();
1658        assert!(session.is_duplicate_final_message("msg-1"));
1659
1660        // Second message
1661        session.set_current_message_id(Some("msg-2".to_string()));
1662        session.on_message_start();
1663        session.on_text_delta(0, "Second");
1664        session.on_message_stop();
1665        assert!(session.is_duplicate_final_message("msg-1"));
1666        assert!(session.is_duplicate_final_message("msg-2"));
1667    }
1668
1669    // Tests for repeated MessageStart handling (GLM/ccs-glm protocol quirk)
1670
1671    #[test]
1672    fn test_repeated_message_start_preserves_output_started() {
1673        let mut session = StreamingSession::new();
1674
1675        // First message start
1676        session.on_message_start();
1677
1678        // First delta should show prefix
1679        let show_prefix = session.on_text_delta(0, "Hello");
1680        assert!(show_prefix, "First delta should show prefix");
1681
1682        // Second delta should NOT show prefix
1683        let show_prefix = session.on_text_delta(0, " World");
1684        assert!(!show_prefix, "Second delta should not show prefix");
1685
1686        // Simulate GLM sending repeated MessageStart during streaming
1687        // This should preserve output_started_for_key to prevent prefix spam
1688        session.on_message_start();
1689
1690        // After repeated MessageStart, delta should NOT show prefix
1691        // because output_started_for_key was preserved
1692        let show_prefix = session.on_text_delta(0, "!");
1693        assert!(
1694            !show_prefix,
1695            "After repeated MessageStart, delta should not show prefix"
1696        );
1697
1698        // Verify accumulated content was cleared (as expected for mid-stream restart)
1699        assert_eq!(
1700            session.get_accumulated(ContentType::Text, "0"),
1701            Some("!"),
1702            "Accumulated content should start fresh after repeated MessageStart"
1703        );
1704    }
1705
1706    #[test]
1707    fn test_repeated_message_start_with_normal_reset_between_messages() {
1708        let mut session = StreamingSession::new();
1709
1710        // First message
1711        session.on_message_start();
1712        session.on_text_delta(0, "First");
1713        session.on_message_stop();
1714
1715        // Second message - normal reset should clear output_started_for_key
1716        session.on_message_start();
1717
1718        // First delta of second message SHOULD show prefix
1719        let show_prefix = session.on_text_delta(0, "Second");
1720        assert!(
1721            show_prefix,
1722            "First delta of new message should show prefix after normal reset"
1723        );
1724    }
1725
1726    #[test]
1727    fn test_repeated_message_start_with_multiple_indices() {
1728        let mut session = StreamingSession::new();
1729
1730        // First message start
1731        session.on_message_start();
1732
1733        // First delta for index 0
1734        let show_prefix = session.on_text_delta(0, "Index0");
1735        assert!(show_prefix, "First delta for index 0 should show prefix");
1736
1737        // First delta for index 1
1738        let show_prefix = session.on_text_delta(1, "Index1");
1739        assert!(show_prefix, "First delta for index 1 should show prefix");
1740
1741        // Simulate repeated MessageStart
1742        session.on_message_start();
1743
1744        // After repeated MessageStart, deltas should NOT show prefix
1745        // because output_started_for_key was preserved for both indices
1746        let show_prefix = session.on_text_delta(0, " more");
1747        assert!(
1748            !show_prefix,
1749            "Delta for index 0 should not show prefix after repeated MessageStart"
1750        );
1751
1752        let show_prefix = session.on_text_delta(1, " more");
1753        assert!(
1754            !show_prefix,
1755            "Delta for index 1 should not show prefix after repeated MessageStart"
1756        );
1757    }
1758
1759    #[test]
1760    fn test_repeated_message_start_during_thinking_stream() {
1761        let mut session = StreamingSession::new();
1762
1763        // First message start
1764        session.on_message_start();
1765
1766        // First thinking delta should show prefix
1767        let show_prefix = session.on_thinking_delta(0, "Thinking...");
1768        assert!(show_prefix, "First thinking delta should show prefix");
1769
1770        // Simulate repeated MessageStart
1771        session.on_message_start();
1772
1773        // After repeated MessageStart, thinking delta should NOT show prefix
1774        let show_prefix = session.on_thinking_delta(0, " more");
1775        assert!(
1776            !show_prefix,
1777            "Thinking delta after repeated MessageStart should not show prefix"
1778        );
1779    }
1780
1781    #[test]
1782    fn test_message_stop_then_message_start_resets_normally() {
1783        let mut session = StreamingSession::new();
1784
1785        // First message
1786        session.on_message_start();
1787        session.on_text_delta(0, "First");
1788
1789        // Message stop finalizes the message
1790        session.on_message_stop();
1791
1792        // New message start should reset normally (not preserve output_started)
1793        session.on_message_start();
1794
1795        // First delta of new message SHOULD show prefix
1796        let show_prefix = session.on_text_delta(0, "Second");
1797        assert!(
1798            show_prefix,
1799            "First delta after MessageStop should show prefix (normal reset)"
1800        );
1801    }
1802
1803    #[test]
1804    fn test_repeated_content_block_start_same_index() {
1805        let mut session = StreamingSession::new();
1806
1807        // Message start
1808        session.on_message_start();
1809
1810        // First delta for index 0
1811        let show_prefix = session.on_text_delta(0, "Hello");
1812        assert!(show_prefix, "First delta should show prefix");
1813
1814        // Simulate repeated ContentBlockStart for same index
1815        // (Some agents send this, and we should NOT clear accumulated content)
1816        session.on_content_block_start(0);
1817
1818        // Delta after repeated ContentBlockStart should NOT show prefix
1819        let show_prefix = session.on_text_delta(0, " World");
1820        assert!(
1821            !show_prefix,
1822            "Delta after repeated ContentBlockStart should not show prefix"
1823        );
1824
1825        // Verify accumulated content was preserved
1826        assert_eq!(
1827            session.get_accumulated(ContentType::Text, "0"),
1828            Some("Hello World"),
1829            "Accumulated content should be preserved across repeated ContentBlockStart"
1830        );
1831    }
1832
1833    // Tests for verbose_warnings feature
1834
1835    #[test]
1836    fn test_verbose_warnings_default_is_disabled() {
1837        let session = StreamingSession::new();
1838        assert!(
1839            !session.verbose_warnings,
1840            "Default should have verbose_warnings disabled"
1841        );
1842    }
1843
1844    #[test]
1845    fn test_with_verbose_warnings_enables_flag() {
1846        let session = StreamingSession::new().with_verbose_warnings(true);
1847        assert!(
1848            session.verbose_warnings,
1849            "Should have verbose_warnings enabled"
1850        );
1851    }
1852
1853    #[test]
1854    fn test_with_verbose_warnings_disabled_explicitly() {
1855        let session = StreamingSession::new().with_verbose_warnings(false);
1856        assert!(
1857            !session.verbose_warnings,
1858            "Should have verbose_warnings disabled"
1859        );
1860    }
1861
1862    #[test]
1863    fn test_large_delta_warning_respects_verbose_flag() {
1864        // Test with verbose warnings enabled
1865        let mut session_verbose = StreamingSession::new().with_verbose_warnings(true);
1866        session_verbose.on_message_start();
1867
1868        let large_delta = "x".repeat(snapshot_threshold() + 1);
1869        // This would emit a warning to stderr if verbose_warnings is enabled
1870        let _show_prefix = session_verbose.on_text_delta(0, &large_delta);
1871
1872        // Test with verbose warnings disabled (default)
1873        let mut session_quiet = StreamingSession::new();
1874        session_quiet.on_message_start();
1875
1876        let large_delta = "x".repeat(snapshot_threshold() + 1);
1877        // This should NOT emit a warning
1878        let _show_prefix = session_quiet.on_text_delta(0, &large_delta);
1879
1880        // Both sessions should accumulate content correctly
1881        assert_eq!(
1882            session_verbose.get_accumulated(ContentType::Text, "0"),
1883            Some(large_delta.as_str())
1884        );
1885        assert_eq!(
1886            session_quiet.get_accumulated(ContentType::Text, "0"),
1887            Some(large_delta.as_str())
1888        );
1889    }
1890
1891    #[test]
1892    fn test_repeated_message_start_warning_respects_verbose_flag() {
1893        // Test with verbose warnings enabled
1894        let mut session_verbose = StreamingSession::new().with_verbose_warnings(true);
1895        session_verbose.on_message_start();
1896        session_verbose.on_text_delta(0, "Hello");
1897        // This would emit a warning about repeated MessageStart
1898        session_verbose.on_message_start();
1899
1900        // Test with verbose warnings disabled (default)
1901        let mut session_quiet = StreamingSession::new();
1902        session_quiet.on_message_start();
1903        session_quiet.on_text_delta(0, "Hello");
1904        // This should NOT emit a warning
1905        session_quiet.on_message_start();
1906
1907        // Both sessions should handle the restart correctly
1908        assert_eq!(
1909            session_verbose.get_accumulated(ContentType::Text, "0"),
1910            None,
1911            "Accumulated content should be cleared after repeated MessageStart"
1912        );
1913        assert_eq!(
1914            session_quiet.get_accumulated(ContentType::Text, "0"),
1915            None,
1916            "Accumulated content should be cleared after repeated MessageStart"
1917        );
1918    }
1919
1920    #[test]
1921    fn test_pattern_detection_warning_respects_verbose_flag() {
1922        // Test with verbose warnings enabled
1923        let mut session_verbose = StreamingSession::new().with_verbose_warnings(true);
1924        session_verbose.on_message_start();
1925
1926        // Send 3 large deltas to trigger pattern detection
1927        // Use different content to avoid consecutive duplicate detection
1928        for i in 0..3 {
1929            let large_delta = format!("{}{i}", "x".repeat(snapshot_threshold() + 1));
1930            let _ = session_verbose.on_text_delta(0, &large_delta);
1931        }
1932
1933        // Test with verbose warnings disabled (default)
1934        let mut session_quiet = StreamingSession::new();
1935        session_quiet.on_message_start();
1936
1937        // Send 3 large deltas (different content to avoid consecutive duplicate detection)
1938        for i in 0..3 {
1939            let large_delta = format!("{}{i}", "x".repeat(snapshot_threshold() + 1));
1940            let _ = session_quiet.on_text_delta(0, &large_delta);
1941        }
1942
1943        // Verify that large_delta_count still tracks all 3 large deltas for both sessions
1944        assert_eq!(
1945            session_verbose
1946                .get_streaming_quality_metrics()
1947                .large_delta_count,
1948            3
1949        );
1950        assert_eq!(
1951            session_quiet
1952                .get_streaming_quality_metrics()
1953                .large_delta_count,
1954            3
1955        );
1956    }
1957
1958    #[test]
1959    fn test_snapshot_extraction_error_warning_respects_verbose_flag() {
1960        // Create a session where we'll trigger a snapshot extraction error
1961        // by manually manipulating accumulated content
1962        let mut session_verbose = StreamingSession::new().with_verbose_warnings(true);
1963        session_verbose.on_message_start();
1964        session_verbose.on_content_block_start(0);
1965
1966        // First delta
1967        session_verbose.on_text_delta(0, "Hello");
1968
1969        // Manually clear accumulated to simulate a state mismatch
1970        session_verbose.accumulated.clear();
1971
1972        // Now try to process a snapshot - extraction will fail
1973        // This would emit a warning if verbose_warnings is enabled
1974        let _show_prefix = session_verbose.on_text_delta(0, "Hello World");
1975
1976        // Test with verbose warnings disabled (default)
1977        let mut session_quiet = StreamingSession::new();
1978        session_quiet.on_message_start();
1979        session_quiet.on_content_block_start(0);
1980
1981        session_quiet.on_text_delta(0, "Hello");
1982        session_quiet.accumulated.clear();
1983
1984        // This should NOT emit a warning
1985        let _show_prefix = session_quiet.on_text_delta(0, "Hello World");
1986
1987        // The quiet session should handle the error gracefully
1988        assert!(session_quiet
1989            .get_accumulated(ContentType::Text, "0")
1990            .is_some());
1991    }
1992
1993    // Tests for enhanced streaming metrics
1994
1995    #[test]
1996    fn test_streaming_quality_metrics_includes_snapshot_repairs() {
1997        let mut session = StreamingSession::new();
1998        session.on_message_start();
1999        session.on_content_block_start(0);
2000
2001        // First delta - long enough to meet threshold requirements
2002        let initial = "This is a long message that exceeds threshold";
2003        session.on_text_delta(0, initial);
2004
2005        // GLM sends snapshot instead of delta (with strong overlap)
2006        let snapshot = format!("{initial} World!");
2007        let _ = session.on_text_delta(0, &snapshot);
2008
2009        let metrics = session.get_streaming_quality_metrics();
2010        assert_eq!(
2011            metrics.snapshot_repairs_count, 1,
2012            "Should track one snapshot repair"
2013        );
2014    }
2015
2016    #[test]
2017    fn test_streaming_quality_metrics_includes_large_delta_count() {
2018        let mut session = StreamingSession::new();
2019        session.on_message_start();
2020
2021        // Send 3 large deltas
2022        for _ in 0..3 {
2023            let large_delta = "x".repeat(snapshot_threshold() + 1);
2024            let _ = session.on_text_delta(0, &large_delta);
2025        }
2026
2027        let metrics = session.get_streaming_quality_metrics();
2028        assert_eq!(
2029            metrics.large_delta_count, 3,
2030            "Should track three large deltas"
2031        );
2032    }
2033
2034    #[test]
2035    fn test_streaming_quality_metrics_includes_protocol_violations() {
2036        let mut session = StreamingSession::new();
2037        session.on_message_start();
2038        session.on_text_delta(0, "Hello");
2039
2040        // Simulate GLM sending repeated MessageStart during streaming
2041        session.on_message_start();
2042        session.on_text_delta(0, " World");
2043
2044        // Another violation
2045        session.on_message_start();
2046
2047        let metrics = session.get_streaming_quality_metrics();
2048        assert_eq!(
2049            metrics.protocol_violations, 2,
2050            "Should track two protocol violations"
2051        );
2052    }
2053
2054    #[test]
2055    fn test_streaming_quality_metrics_all_new_fields_zero_by_default() {
2056        let session = StreamingSession::new();
2057        let metrics = session.get_streaming_quality_metrics();
2058
2059        assert_eq!(metrics.snapshot_repairs_count, 0);
2060        assert_eq!(metrics.large_delta_count, 0);
2061        assert_eq!(metrics.protocol_violations, 0);
2062    }
2063
2064    #[test]
2065    fn test_streaming_quality_metrics_comprehensive_tracking() {
2066        let mut session = StreamingSession::new();
2067        session.on_message_start();
2068
2069        // Normal delta
2070        session.on_text_delta(0, "Hello");
2071
2072        // Large delta
2073        let large_delta = "x".repeat(snapshot_threshold() + 1);
2074        let _ = session.on_text_delta(0, &large_delta);
2075
2076        // Snapshot repair (note: the snapshot is also large, so it counts as another large delta)
2077        let snapshot = format!("Hello{large_delta} World");
2078        let _ = session.on_text_delta(0, &snapshot);
2079
2080        // Check metrics BEFORE the protocol violation (which clears delta_sizes)
2081        let metrics = session.get_streaming_quality_metrics();
2082        assert_eq!(metrics.snapshot_repairs_count, 1);
2083        assert_eq!(
2084            metrics.large_delta_count, 2,
2085            "Both the large delta and the snapshot are large"
2086        );
2087        assert_eq!(metrics.total_deltas, 3);
2088        assert_eq!(metrics.protocol_violations, 0, "No violation yet");
2089
2090        // Protocol violation
2091        session.on_message_start();
2092
2093        // After violation, protocol_violations is incremented but delta_sizes is cleared
2094        let metrics_after = session.get_streaming_quality_metrics();
2095        assert_eq!(metrics_after.protocol_violations, 1);
2096        assert_eq!(
2097            metrics_after.total_deltas, 0,
2098            "Delta sizes cleared after violation"
2099        );
2100    }
2101
2102    // Tests for hash-based deduplication
2103
2104    #[test]
2105    fn test_content_hash_computed_on_message_stop() {
2106        let mut session = StreamingSession::new();
2107        session.on_message_start();
2108        session.on_text_delta(0, "Hello");
2109        session.on_text_delta(0, " World");
2110
2111        // Hash should be None before message_stop
2112        assert_eq!(session.final_content_hash, None);
2113
2114        // Hash should be computed after message_stop
2115        session.on_message_stop();
2116        assert!(session.final_content_hash.is_some());
2117    }
2118
2119    #[test]
2120    fn test_content_hash_none_when_no_content() {
2121        let mut session = StreamingSession::new();
2122        session.on_message_start();
2123
2124        // No content streamed
2125        session.on_message_stop();
2126        assert_eq!(session.final_content_hash, None);
2127    }
2128
2129    #[test]
2130    fn test_is_duplicate_by_hash_returns_true_for_matching_content() {
2131        let mut session = StreamingSession::new();
2132        session.on_message_start();
2133        session.on_text_delta(0, "Hello World");
2134        session.on_message_stop();
2135
2136        // Same content should be detected as duplicate
2137        assert!(session.is_duplicate_by_hash("Hello World"));
2138    }
2139
2140    #[test]
2141    fn test_is_duplicate_by_hash_returns_false_for_different_content() {
2142        let mut session = StreamingSession::new();
2143        session.on_message_start();
2144        session.on_text_delta(0, "Hello World");
2145        session.on_message_stop();
2146
2147        // Different content should NOT be detected as duplicate
2148        assert!(!session.is_duplicate_by_hash("Different content"));
2149    }
2150
2151    #[test]
2152    fn test_is_duplicate_by_hash_returns_false_when_no_content_streamed() {
2153        let session = StreamingSession::new();
2154
2155        // No content streamed, so no hash
2156        assert!(!session.is_duplicate_by_hash("Hello World"));
2157    }
2158
2159    #[test]
2160    fn test_content_hash_multiple_content_blocks() {
2161        let mut session = StreamingSession::new();
2162        session.on_message_start();
2163        session.on_text_delta(0, "First block");
2164        session.on_text_delta(1, "Second block");
2165        session.on_message_stop();
2166
2167        // Hash should be computed from all blocks
2168        assert!(session.final_content_hash.is_some());
2169        // Individual content shouldn't match the combined hash
2170        assert!(!session.is_duplicate_by_hash("First block"));
2171        assert!(!session.is_duplicate_by_hash("Second block"));
2172    }
2173
2174    #[test]
2175    fn test_content_hash_consistent_for_same_content() {
2176        let mut session1 = StreamingSession::new();
2177        session1.on_message_start();
2178        session1.on_text_delta(0, "Hello");
2179        session1.on_text_delta(0, " World");
2180        session1.on_message_stop();
2181
2182        let mut session2 = StreamingSession::new();
2183        session2.on_message_start();
2184        session2.on_text_delta(0, "Hello World");
2185        session2.on_message_stop();
2186
2187        // Same content should produce the same hash
2188        assert_eq!(session1.final_content_hash, session2.final_content_hash);
2189    }
2190
2191    // Tests for rapid index switching edge case (RFC-003)
2192
2193    #[test]
2194    fn test_rapid_index_switch_with_clear() {
2195        let mut session = StreamingSession::new();
2196        session.on_message_start();
2197
2198        // Start block 0 and accumulate content
2199        session.on_content_block_start(0);
2200        let show_prefix = session.on_text_delta(0, "X");
2201        assert!(show_prefix, "First delta for index 0 should show prefix");
2202        assert_eq!(session.get_accumulated(ContentType::Text, "0"), Some("X"));
2203
2204        // Switch to block 1 - this should clear accumulated content for index 0
2205        session.on_content_block_start(1);
2206
2207        // Verify accumulated for index 0 was cleared
2208        assert_eq!(
2209            session.get_accumulated(ContentType::Text, "0"),
2210            None,
2211            "Accumulated content for index 0 should be cleared when switching to index 1"
2212        );
2213
2214        // Switch back to index 0
2215        session.on_content_block_start(0);
2216
2217        // Since output_started_for_key was also cleared, prefix should show again
2218        let show_prefix = session.on_text_delta(0, "Y");
2219        assert!(
2220            show_prefix,
2221            "Prefix should show when switching back to a previously cleared index"
2222        );
2223
2224        // Verify new content is accumulated fresh
2225        assert_eq!(
2226            session.get_accumulated(ContentType::Text, "0"),
2227            Some("Y"),
2228            "New content should be accumulated fresh after clear"
2229        );
2230    }
2231
2232    #[test]
2233    fn test_delta_sizes_cleared_on_index_switch() {
2234        let mut session = StreamingSession::new();
2235        session.on_message_start();
2236
2237        // Track some delta sizes for index 0
2238        session.on_text_delta(0, "Hello");
2239        session.on_text_delta(0, " World");
2240
2241        let content_key = (ContentType::Text, "0".to_string());
2242        assert!(
2243            session.delta_sizes.contains_key(&content_key),
2244            "Delta sizes should be tracked for index 0"
2245        );
2246        let sizes_before = session.delta_sizes.get(&content_key).unwrap();
2247        assert_eq!(sizes_before.len(), 2, "Should have 2 delta sizes tracked");
2248
2249        // Switch to index 1 - this should clear delta_sizes for index 0
2250        session.on_content_block_start(1);
2251
2252        assert!(
2253            !session.delta_sizes.contains_key(&content_key),
2254            "Delta sizes for index 0 should be cleared when switching to index 1"
2255        );
2256
2257        // Add deltas for index 1
2258        session.on_text_delta(1, "New");
2259
2260        let content_key_1 = (ContentType::Text, "1".to_string());
2261        let sizes_after = session.delta_sizes.get(&content_key_1).unwrap();
2262        assert_eq!(
2263            sizes_after.len(),
2264            1,
2265            "Should have fresh size tracking for index 1"
2266        );
2267    }
2268
2269    #[test]
2270    fn test_rapid_index_switch_with_thinking_content() {
2271        let mut session = StreamingSession::new();
2272        session.on_message_start();
2273
2274        // Start thinking content in index 0
2275        session.on_content_block_start(0);
2276        let show_prefix = session.on_thinking_delta(0, "Thinking...");
2277        assert!(show_prefix, "First thinking delta should show prefix");
2278        assert_eq!(
2279            session.get_accumulated(ContentType::Thinking, "0"),
2280            Some("Thinking...")
2281        );
2282
2283        // Switch to text content in index 1 - this should clear index 0's accumulated
2284        session.on_content_block_start(1);
2285
2286        // Verify index 0's accumulated thinking was cleared
2287        assert_eq!(
2288            session.get_accumulated(ContentType::Thinking, "0"),
2289            None,
2290            "Thinking content for index 0 should be cleared when switching to index 1"
2291        );
2292
2293        let show_prefix = session.on_text_delta(1, "Text");
2294        assert!(
2295            show_prefix,
2296            "First text delta for index 1 should show prefix"
2297        );
2298
2299        // Switch back to index 0 for thinking
2300        session.on_content_block_start(0);
2301
2302        // Since output_started_for_key for (Thinking, "0") was cleared when switching to index 1,
2303        // the prefix should show again
2304        let show_prefix = session.on_thinking_delta(0, " more");
2305        assert!(
2306            show_prefix,
2307            "Thinking prefix should show when switching back to cleared index 0"
2308        );
2309
2310        // Verify thinking content was accumulated fresh (only the new content)
2311        assert_eq!(
2312            session.get_accumulated(ContentType::Thinking, "0"),
2313            Some(" more"),
2314            "Thinking content should be accumulated fresh after clear"
2315        );
2316    }
2317
2318    #[test]
2319    fn test_output_started_for_key_cleared_across_all_content_types() {
2320        let mut session = StreamingSession::new();
2321        session.on_message_start();
2322
2323        // Start block 0 with text and thinking
2324        // Note: ToolInput does not use output_started_for_key tracking
2325        session.on_content_block_start(0);
2326        session.on_text_delta(0, "Text");
2327        session.on_thinking_delta(0, "Thinking");
2328
2329        // Verify text and thinking have started output
2330        let text_key = (ContentType::Text, "0".to_string());
2331        let thinking_key = (ContentType::Thinking, "0".to_string());
2332
2333        assert!(session.output_started_for_key.contains(&text_key));
2334        assert!(session.output_started_for_key.contains(&thinking_key));
2335
2336        // Switch to index 1 - should clear output_started_for_key for all content types
2337        session.on_content_block_start(1);
2338
2339        assert!(
2340            !session.output_started_for_key.contains(&text_key),
2341            "Text output_started should be cleared for index 0"
2342        );
2343        assert!(
2344            !session.output_started_for_key.contains(&thinking_key),
2345            "Thinking output_started should be cleared for index 0"
2346        );
2347    }
2348
2349    // Tests for environment variable configuration
2350
2351    #[test]
2352    fn test_snapshot_threshold_default() {
2353        // Ensure no env var is set for this test
2354        std::env::remove_var("RALPH_STREAMING_SNAPSHOT_THRESHOLD");
2355        // Note: Since we use OnceLock, we can't reset the value in tests.
2356        // This test documents the default behavior.
2357        let threshold = snapshot_threshold();
2358        assert_eq!(
2359            threshold, DEFAULT_SNAPSHOT_THRESHOLD,
2360            "Default threshold should be 200"
2361        );
2362    }
2363}