ralph-workflow 0.7.18

PROMPT-driven multi-agent orchestrator for git repos
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
// Streaming session tracker implementation.
//
// This file contains the `StreamingSession` struct and all its implementation
// methods for tracking streaming state across all parsers.
//
// FLATTENED: All content from the session/ subdirectory has been inlined into this file.
// This module was restructured to ensure all session logic is in a single file
// for better code organization and simpler module structure.

use crate::json_parser::deduplication::get_overlap_thresholds;
use crate::json_parser::deduplication::rolling_hash::RollingHashWindow;
use std::io::Write as IoWrite;

// ============================================================================
// StreamingSession struct definition
// ============================================================================

/// Unified streaming session tracker.
///
/// Provides a single source of truth for streaming state across all parsers.
/// Tracks:
/// - Current streaming state (`Idle`/`Streaming`/`Finalized`)
/// - Which content types have been streamed
/// - Accumulated content by content type and key (stringified index or parser key)
/// - Whether prefix should be shown on next delta
/// - Delta size patterns for detecting snapshot-as-delta violations
/// - Persistent "output started" tracking independent of accumulated content
/// - Verbosity-aware warning emission
///
/// # Construction
///
/// Prefer `StreamingSession::new()`, which sets `max_delta_history` to
/// `DEFAULT_MAX_DELTA_HISTORY`. The derived `Default` leaves `max_delta_history`
/// at `0`, in which case no delta-size history is retained.
///
/// # Lifecycle
///
/// 1. **Start**: `on_message_start()` - resets per-message state. When it is called
///    while still `Streaming`, output-started and duplicate-tracking state is kept.
/// 2. **Stream**: `on_text_delta()` / `on_thinking_delta()` - accumulate content
/// 3. **Stop**: `on_message_stop()` - finalize the message
/// 4. **Repeat**: Back to step 1 for next message
///
/// `on_message_start()` does not reset cross-message tracking: the current message
/// ID, displayed / pre-rendered message IDs, rendered assistant content hashes,
/// the final content hash, and the diagnostic counters.
#[derive(Debug, Default, Clone)]
pub struct StreamingSession {
    /// Current streaming state
    pub(super) state: StreamingState,
    /// Track which content types have been streamed (for deduplication)
    /// Maps `ContentType` → whether it has been streamed (the delta handlers in
    /// this file only ever insert `true`; an absent key means "not streamed").
    pub(super) streamed_types: HashMap<ContentType, bool>,
    /// Track the current content block state
    pub(super) current_block: ContentBlockState,
    /// Accumulated content by (`content_type`, key) for display
    /// This mirrors `DeltaAccumulator` but adds deduplication tracking
    pub(super) accumulated: HashMap<(ContentType, String), String>,
    /// Track the order of keys for `most_recent` operations
    pub(super) key_order: Vec<(ContentType, String)>,
    /// Track recent delta sizes for pattern detection
    /// Maps `(content_type, key)` → vec of recent delta sizes, oldest first,
    /// bounded by `max_delta_history`.
    pub(super) delta_sizes: HashMap<(ContentType, String), Vec<usize>>,
    /// Maximum number of delta sizes to track per key
    /// (`new()` uses `DEFAULT_MAX_DELTA_HISTORY`; the derived `Default` gives 0).
    pub(super) max_delta_history: usize,
    /// Track the current message ID for duplicate detection
    pub(super) current_message_id: Option<String>,
    /// Track which messages have been displayed to prevent duplicate final output
    pub(super) displayed_final_messages: HashSet<String>,
    /// Track which (`content_type`, key) pairs have had output started.
    /// This is independent of `accumulated` to handle cases where accumulated
    /// content may be cleared (e.g., repeated `ContentBlockStart` for same index).
    /// Cleared on a normal `on_message_start`; preserved when `MessageStart` arrives
    /// while still `Streaming` (to avoid prefix spam). `clear_key` removes single keys.
    pub(super) output_started_for_key: HashSet<(ContentType, String)>,
    /// Whether to emit verbose warnings about streaming anomalies.
    /// When false, suppresses diagnostic warnings that are useful for debugging
    /// but noisy in production (e.g., GLM protocol violations, snapshot detection).
    pub(super) verbose_warnings: bool,
    /// Count of snapshot repairs performed during this session
    pub(super) snapshot_repairs_count: usize,
    /// Count of deltas that exceeded the size threshold
    pub(super) large_delta_count: usize,
    /// Count of protocol violations detected (e.g., `MessageStart` during streaming)
    pub(super) protocol_violations: usize,
    /// Hash of the final streamed content (for deduplication)
    /// Computed at `message_stop` using all accumulated content
    pub(super) final_content_hash: Option<u64>,
    /// Track the last rendered content for each key to detect when rendering
    /// would produce identical output (prevents visual repetition).
    /// Maps `(content_type, key)` → the last accumulated content that was rendered.
    pub(super) last_rendered: HashMap<(ContentType, String), String>,
    /// Track rendered content hashes for duplicate detection.
    ///
    /// This stores a hash of the *sanitized content* together with the `(content_type, key)`
    /// it was rendered for. It is preserved when a `MessageStart` arrives while still
    /// `Streaming`, and cleared on a normal message start.
    ///
    /// Keying by `(content_type, key)` is important because some parsers reuse keys within the
    /// same turn (e.g., Codex can reuse `reasoning` for multiple items). When that happens,
    /// we need `clear_key()` to fully reset per-key deduplication state.
    pub(super) rendered_content_hashes: HashSet<(ContentType, String, u64)>,
    /// Track the last delta for each key to detect exact duplicate deltas.
    /// Preserved when a `MessageStart` arrives while still `Streaming` (so a resent
    /// delta right after the restart can be dropped); cleared on a normal message start.
    /// Maps `(content_type, key)` → the last delta that was processed.
    pub(super) last_delta: HashMap<(ContentType, String), String>,
    /// Track consecutive duplicates for resend glitch detection ("3 strikes" heuristic).
    /// Maps `(content_type, key)` → (count, `delta_hash`) where count tracks how many
    /// times the exact same delta has arrived consecutively. When count reaches
    /// the threshold (`count >= threshold`), the delta is dropped as a resend glitch.
    pub(super) consecutive_duplicates: HashMap<(ContentType, String), (usize, u64)>,
    /// Delta deduplicator using KMP and rolling hash for snapshot detection.
    /// Provides O(n+m) guaranteed complexity for detecting snapshot-as-delta violations.
    /// Cleared on every `on_message_start` to prevent false positives.
    pub(super) deduplicator: DeltaDeduplicator,
    /// Track message IDs that have been fully rendered from an assistant event BEFORE streaming.
    /// When an assistant event arrives before streaming deltas, we render it and record
    /// the `message_id`. ALL subsequent streaming deltas for that `message_id` should be
    /// suppressed to prevent duplication.
    pub(super) pre_rendered_message_ids: HashSet<String>,
    /// Track content hashes of assistant events that have been rendered during streaming.
    /// This prevents duplicate assistant events with the same content from being rendered
    /// multiple times. GLM/CCS may send multiple assistant events during streaming with
    /// the same content but different `message_ids`.
    /// This is preserved across `MessageStart` boundaries to handle mid-stream assistant events.
    pub(super) rendered_assistant_content_hashes: HashSet<u64>,
    /// Track tool names by index for GLM/CCS deduplication.
    /// GLM sends assistant events with `tool_use` blocks (name + input) during streaming,
    /// but only the input is accumulated via deltas. We track the tool name to properly
    /// reconstruct the normalized representation for deduplication.
    /// Maps the content block index to the tool name (presumably `None` when the name
    /// is not known — TODO confirm). Cleared on every `on_message_start`.
    pub(super) tool_names: HashMap<u64, Option<String>>,
}

// ============================================================================
// StreamingSession impl block: constructors and basic methods
// ============================================================================

impl StreamingSession {
    /// Build a fresh session ready for its first message.
    ///
    /// All tracking collections start empty and verbose warnings start disabled.
    /// Unlike the derived `Default`, the per-key delta-size history is sized to
    /// `DEFAULT_MAX_DELTA_HISTORY` so snapshot-pattern detection has data to work with.
    #[must_use]
    pub fn new() -> Self {
        let base = Self::default();
        Self {
            max_delta_history: DEFAULT_MAX_DELTA_HISTORY,
            verbose_warnings: false,
            ..base
        }
    }

    /// Builder-style toggle for diagnostic warnings about streaming anomalies.
    ///
    /// With `enabled == true`, stderr diagnostics are produced for:
    /// - Repeated `MessageStart` events (GLM protocol violations)
    /// - Large deltas that may indicate snapshot-as-delta bugs
    /// - Pattern detection of repeated large content
    ///
    /// Warnings are off unless this is called with `true`, which keeps
    /// production output quiet.
    ///
    /// # Arguments
    /// * `enabled` - `true` to emit the diagnostics, `false` to suppress them
    ///
    /// # Returns
    /// The same session, by value, so calls can be chained.
    ///
    /// # Example
    ///
    /// ```ignore
    /// let mut session = StreamingSession::new().with_verbose_warnings(true);
    /// ```
    #[must_use]
    pub const fn with_verbose_warnings(self, enabled: bool) -> Self {
        let mut configured = self;
        configured.verbose_warnings = enabled;
        configured
    }
}

// ============================================================================
// StreamingSession impl block: state management methods
// ============================================================================

impl StreamingSession {
    /// Clear the per-message state shared by both `on_message_start` paths.
    ///
    /// Resets the lifecycle state to `Idle`, forgets which content types were
    /// streamed, leaves any open content block, and clears accumulated content,
    /// key order, delta-size history, last-rendered content, the snapshot
    /// deduplicator, and tracked tool names.
    ///
    /// Output-started and duplicate-tracking state (`output_started_for_key`,
    /// `last_delta`, `rendered_content_hashes`, `consecutive_duplicates`) is NOT
    /// touched here; each caller decides whether to keep or clear it.
    fn reset_streaming_state_base(&mut self) {
        self.state = StreamingState::Idle;
        self.streamed_types.clear();
        self.current_block = ContentBlockState::NotInBlock;
        self.accumulated.clear();
        self.key_order.clear();
        self.delta_sizes.clear();
        self.last_rendered.clear();
        self.deduplicator.clear();
        self.tool_names.clear();
    }

    /// Handle a `MessageStart` received while the session is still `Streaming`.
    ///
    /// Counts the protocol violation and, when verbose, warns on stderr. It then resets
    /// per-message state but carries over `output_started_for_key`, `last_delta`,
    /// `rendered_content_hashes` and `consecutive_duplicates`. This lets prefix tracking
    /// and duplicate detection continue across the spurious restart.
    fn on_mid_stream_restart(&mut self) {
        self.protocol_violations = self.protocol_violations.saturating_add(1);
        if self.verbose_warnings {
            // `file!()` keeps the reported location accurate; the former
            // `state_management.rs` has been flattened into this file.
            let _ = writeln!(
                std::io::stderr(),
                "Warning: Received MessageStart while state is Streaming. \
                This indicates a non-standard agent protocol (e.g., GLM sending \
                repeated MessageStart events). Preserving output_started_for_key \
                to prevent prefix spam. File: {}, Line: {}",
                file!(),
                line!()
            );
        }
        // Move the preserved state out so the base reset cannot touch it, then restore it.
        let preserved_output_started = std::mem::take(&mut self.output_started_for_key);
        let preserved_last_delta = std::mem::take(&mut self.last_delta);
        let preserved_rendered_hashes = std::mem::take(&mut self.rendered_content_hashes);
        let preserved_consecutive_duplicates = std::mem::take(&mut self.consecutive_duplicates);
        self.reset_streaming_state_base();
        self.output_started_for_key = preserved_output_started;
        self.last_delta = preserved_last_delta;
        self.rendered_content_hashes = preserved_rendered_hashes;
        self.consecutive_duplicates = preserved_consecutive_duplicates;
    }

    /// Handle a regular `MessageStart`.
    ///
    /// Performs the full per-message reset, including output-started and
    /// duplicate-tracking state.
    fn on_normal_message_start(&mut self) {
        self.reset_streaming_state_base();
        self.output_started_for_key.clear();
        self.last_delta.clear();
        self.rendered_content_hashes.clear();
        self.consecutive_duplicates.clear();
    }

    /// Reset the session on new message start.
    ///
    /// This should be called when:
    /// - Claude: `MessageStart` event
    /// - Codex: `TurnStarted` event
    /// - Gemini: `init` event or new message
    /// - `OpenCode`: New part starts
    ///
    /// The message identifier is not passed here; it is tracked separately via
    /// `set_current_message_id`.
    ///
    /// # Note on Repeated `MessageStart` Events
    ///
    /// Some agents (notably GLM/ccs-glm) send repeated `MessageStart` events during
    /// a single logical streaming session. When this happens while state is `Streaming`,
    /// we preserve `output_started_for_key` (plus `last_delta`, `rendered_content_hashes`
    /// and `consecutive_duplicates`). This prevents prefix spam and duplicate output on the
    /// deltas that follow the repeated `MessageStart`. It is a defensive measure for
    /// non-standard agent protocols, and it keeps legitimate multi-message scenarios correct.
    pub fn on_message_start(&mut self) {
        if self.state == StreamingState::Streaming {
            self.on_mid_stream_restart();
        } else {
            self.on_normal_message_start();
        }
    }

    /// Set the current message ID for tracking.
    ///
    /// This should be called when processing a `MessageStart` event that contains
    /// a message identifier. Used to prevent duplicate display of final messages.
    ///
    /// # Arguments
    /// * `message_id` - The unique identifier for this message (or None to clear)
    pub fn set_current_message_id(&mut self, message_id: Option<String>) {
        self.current_message_id = message_id;
    }

    /// Get the current message ID.
    ///
    /// # Returns
    /// * `Some(id)` - The current message ID
    /// * `None` - No message ID is set
    #[must_use]
    pub fn get_current_message_id(&self) -> Option<&str> {
        self.current_message_id.as_deref()
    }

    /// Check if a message ID represents a duplicate final message.
    ///
    /// This prevents displaying the same message twice - once after streaming
    /// completes and again when the final "Assistant" event arrives.
    ///
    /// # Arguments
    /// * `message_id` - The message ID to check
    ///
    /// # Returns
    /// * `true` - This message has already been displayed (is a duplicate)
    /// * `false` - This is a new message
    #[must_use]
    pub fn is_duplicate_final_message(&self, message_id: &str) -> bool {
        self.displayed_final_messages.contains(message_id)
    }

    /// Mark a message as displayed to prevent duplicate display.
    ///
    /// This should be called after displaying a message's final content.
    ///
    /// # Arguments
    /// * `message_id` - The message ID to mark as displayed
    pub fn mark_message_displayed(&mut self, message_id: &str) {
        self.displayed_final_messages.insert(message_id.to_string());
    }

    /// Mark that an assistant event has pre-rendered content BEFORE streaming started.
    ///
    /// This is used to handle the case where an assistant event arrives with full content
    /// BEFORE any streaming deltas. When this happens, we render the assistant event content
    /// and mark the `message_id` as pre-rendered. ALL subsequent streaming deltas for the
    /// same `message_id` should be suppressed to prevent duplication.
    ///
    /// # Arguments
    /// * `message_id` - The message ID that was pre-rendered
    pub fn mark_message_pre_rendered(&mut self, message_id: &str) {
        self.pre_rendered_message_ids.insert(message_id.to_string());
    }

    /// Check if a message was pre-rendered from an assistant event.
    ///
    /// This checks if the given `message_id` was previously rendered from an assistant
    /// event (before streaming started). If so, ALL subsequent streaming deltas for
    /// this message should be suppressed.
    ///
    /// # Arguments
    /// * `message_id` - The message ID to check
    ///
    /// # Returns
    /// * `true` - This message was pre-rendered, suppress all streaming output
    /// * `false` - This message was not pre-rendered, allow streaming output
    #[must_use]
    pub fn is_message_pre_rendered(&self, message_id: &str) -> bool {
        self.pre_rendered_message_ids.contains(message_id)
    }

    /// Check if assistant event content has already been rendered.
    ///
    /// This prevents duplicate assistant events with the same content from being rendered
    /// multiple times. GLM/CCS may send multiple assistant events during streaming with
    /// the same content but different `message_ids`.
    ///
    /// # Arguments
    /// * `content_hash` - The hash of the assistant event content
    ///
    /// # Returns
    /// * `true` - This content was already rendered, suppress rendering
    /// * `false` - This content was not rendered, allow rendering
    #[must_use]
    pub fn is_assistant_content_rendered(&self, content_hash: u64) -> bool {
        self.rendered_assistant_content_hashes.contains(&content_hash)
    }

    /// Mark assistant event content as having been rendered.
    ///
    /// This should be called after rendering an assistant event to prevent
    /// duplicate rendering of the same content.
    ///
    /// # Arguments
    /// * `content_hash` - The hash of the assistant event content that was rendered
    pub fn mark_assistant_content_rendered(&mut self, content_hash: u64) {
        self.rendered_assistant_content_hashes.insert(content_hash);
    }

    /// Mark the start of a content block.
    ///
    /// This should be called when:
    /// - Claude: `ContentBlockStart` event
    /// - Codex: `ItemStarted` with relevant type
    /// - Gemini: Content section begins
    /// - `OpenCode`: Part with content starts
    ///
    /// If a block is currently open, it is closed first. This method writes no
    /// output itself. The "output had started" signal from finalizing the previous
    /// block is discarded here, so any block separator must be emitted by the caller.
    ///
    /// # Arguments
    /// * `index` - The content block index (for multi-block messages)
    pub fn on_content_block_start(&mut self, index: u64) {
        let index_str = index.to_string();

        // Close the previous block, if any. The newline hint is intentionally unused here.
        let _ = self.ensure_content_block_finalized();

        // DO NOT clear accumulated content when transitioning blocks.
        //
        // RATIONALE:
        // In non-TTY modes (Basic/None), per-delta output is suppressed and accumulated
        // content is flushed ONCE at message_stop for ALL blocks. Clearing accumulated
        // content when transitioning to a new block would lose earlier blocks' content,
        // causing only the LAST block to be output (Bug: CCS renderer repeats streamed
        // lines across deltas - wt-24-ccs-repeat-2).
        //
        // In Full TTY mode, accumulated content is unused (deltas rendered in-place), so
        // letting it persist until message_stop has negligible memory impact.
        //
        // Accumulated content is properly cleared at message_start for the next message.
        //
        // This fix ensures multi-block messages are correctly flushed in non-TTY modes:
        // - Message with blocks [0, 1, 2]: ALL blocks' content is preserved until
        //   message_stop, then flushed via accumulated_keys() iteration.
        // - No per-delta spam (suppression already implemented in renderers).
        // - Content from ALL blocks appears in final output.
        //
        // EVIDENCE from baseline testing (wt-24-ccs-repeat-2 continuation #1):
        // When accumulated content IS cleared on block transition (baseline behavior):
        // - test_ccs_glm_architecture_verification_none_mode FAILS: only tool input
        //   (c0...c99) present, thinking (t0...t99) and text (w0...w99) MISSING
        // - test_ccs_glm_interleaved_blocks_with_many_deltas_none_mode FAILS: thinking
        //   block 0 (t0_) MISSING, only later blocks appear
        // Root cause confirmed: Clearing accumulated content on block transition loses
        // earlier blocks, violating the suppress-accumulate-flush architecture.

        // Initialize the new content block
        self.current_block = ContentBlockState::InBlock {
            index: index_str,
            started_output: false,
        };
    }

    /// Ensure the current content block is finalized.
    ///
    /// If we're in a block, the block state is reset to `NotInBlock` and this
    /// reports whether that block had started output (i.e. whether a newline
    /// should be emitted). This prevents "glued text" bugs where content from
    /// different blocks is concatenated without separation.
    ///
    /// # Returns
    /// * `true` - A newline should be emitted (output had started)
    /// * `false` - No newline needed (no output or not in a block)
    fn ensure_content_block_finalized(&mut self) -> bool {
        if let ContentBlockState::InBlock { started_output, .. } = &self.current_block {
            let had_output = *started_output;
            self.current_block = ContentBlockState::NotInBlock;
            had_output
        } else {
            false
        }
    }

    /// Assert that the session is in a valid lifecycle state.
    ///
    /// In debug builds, this will panic if the current state doesn't match
    /// any of the expected states. In release builds, this does nothing.
    ///
    /// # Arguments
    /// * `expected` - Slice of acceptable states
    ///
    /// # Panics
    /// Debug builds only: when `self.state` is not contained in `expected`.
    fn assert_lifecycle_state(&self, expected: &[StreamingState]) {
        #[cfg(debug_assertions)]
        assert!(
            expected.contains(&self.state),
            "Invalid lifecycle state: expected {:?}, got {:?}. \
            This indicates a bug in the parser's event handling.",
            expected,
            self.state
        );
        #[cfg(not(debug_assertions))]
        let _ = expected;
    }

    /// Finalize the message on stop event.
    ///
    /// This should be called when:
    /// - Claude: `MessageStop` event
    /// - Codex: `TurnCompleted` or `ItemCompleted` with text
    /// - Gemini: Message completion
    /// - `OpenCode`: Part completion
    ///
    /// Also computes `final_content_hash` and records the current message ID (if
    /// any) as displayed, so a later final "Assistant" event is treated as a duplicate.
    ///
    /// # Returns
    /// * `true` - A completion newline should be emitted (the open block had started output)
    /// * `false` - No completion needed (no block open, or it produced no output)
    pub fn on_message_stop(&mut self) -> bool {
        let was_in_block = self.ensure_content_block_finalized();
        self.state = StreamingState::Finalized;

        // Compute content hash for deduplication
        self.final_content_hash = self.compute_content_hash();

        // Mark the current message as displayed to prevent duplicate display
        // when the final "Assistant" event arrives. Disjoint field borrows let us
        // insert directly without cloning the ID twice.
        if let Some(message_id) = &self.current_message_id {
            self.displayed_final_messages.insert(message_id.clone());
        }

        was_in_block
    }

    /// Clear all state for a specific (`content_type`, key) pair.
    ///
    /// This is used when a logical sub-stream completes (e.g., Codex `reasoning`
    /// item completion) but the overall turn/message continues.
    pub fn clear_key(&mut self, content_type: ContentType, key: &str) {
        let content_key = (content_type, key.to_string());
        self.accumulated.remove(&content_key);
        self.key_order.retain(|k| k != &content_key);
        self.output_started_for_key.remove(&content_key);
        self.delta_sizes.remove(&content_key);
        self.last_rendered.remove(&content_key);
        self.last_delta.remove(&content_key);
        self.consecutive_duplicates.remove(&content_key);

        // Clear any per-key rendered-hash entries so subsequent sub-streams reusing the
        // same key (e.g., Codex `reasoning`) won't be incorrectly suppressed as duplicates.
        self.rendered_content_hashes
            .retain(|(ct, k, _hash)| !(*ct == content_type && k == key));
    }

    /// Check if ANY content has been streamed for this message.
    ///
    /// This is a broader check that returns true if ANY content type
    /// has been streamed. Used to skip entire message display when
    /// all content was already streamed.
    #[must_use]
    pub fn has_any_streamed_content(&self) -> bool {
        !self.streamed_types.is_empty()
    }
}

// ============================================================================
// StreamingSession impl block: text delta handling
// ============================================================================

/// Advance the consecutive-duplicate run for a single key.
///
/// When `delta_hash` differs from `prev_hash`, a new run of length one begins
/// (both `count` and `prev_hash` are overwritten) and `false` is returned.
/// Otherwise the run grows by one (saturating) and the result says whether it
/// has reached `threshold`.
fn update_consecutive_dup_entry(
    count: &mut usize,
    prev_hash: &mut u64,
    delta_hash: u64,
    threshold: usize,
) -> bool {
    if *prev_hash != delta_hash {
        *prev_hash = delta_hash;
        *count = 1;
        return false;
    }
    *count = count.saturating_add(1);
    *count >= threshold
}

/// Report a dropped consecutive-duplicate delta on stderr, but only in verbose mode.
///
/// Write failures are deliberately ignored: diagnostics must never break streaming.
fn warn_if_verbose_consecutive_dup(
    verbose: bool,
    count: usize,
    threshold: usize,
    key_str: &str,
    delta: &str,
) {
    if !verbose {
        return;
    }
    let _ = writeln!(
        std::io::stderr(),
        "Warning: Dropping consecutive duplicate delta (count={count}, threshold={threshold}). \
        This appears to be a resend glitch. Key: '{key_str}', Delta: {delta:?}",
    );
}

/// Detect a delta that exactly repeats the last processed delta for `content_key`
/// while nothing is accumulated for that key.
///
/// That combination happens after a reset that clears `accumulated` but keeps
/// `last_delta` (a mid-stream `MessageStart`), i.e. a resend of content that
/// has already been shown.
///
/// Returns `true` only when a previous delta exists, equals `delta`, and the
/// accumulated buffer for the key is missing or empty.
fn is_exact_duplicate_after_message_start(
    last_delta: &HashMap<(ContentType, String), String>,
    accumulated: &HashMap<(ContentType, String), String>,
    content_key: &(ContentType, String),
    delta: &str,
) -> bool {
    let repeats_last = last_delta
        .get(content_key)
        .is_some_and(|previous| previous.as_str() == delta);
    repeats_last && accumulated.get(content_key).is_none_or(String::is_empty)
}

/// Warn (verbose mode only) when the recent delta history for `key` contains at
/// least `DEFAULT_PATTERN_DETECTION_MIN_DELTAS` deltas above `snapshot_threshold()`.
///
/// A run of large "deltas" usually means the agent is sending full snapshots as
/// if they were increments. Write failures are ignored.
///
/// # Arguments
/// * `verbose` - whether diagnostics are enabled; nothing is computed otherwise
/// * `sizes` - recent delta sizes for this key, oldest first
/// * `key` - the content key, used only in the message
fn warn_large_delta_pattern(verbose: bool, sizes: &[usize], key: &str) {
    // The result is only ever used for a diagnostic, so skip the scan entirely when quiet.
    if !verbose {
        return;
    }
    // Hoisted: the threshold is loop-invariant and need not be recomputed per element.
    let threshold = snapshot_threshold();
    let large_count = sizes.iter().filter(|&&size| size > threshold).count();
    // `large_count <= sizes.len()`, so this one check also guarantees enough history.
    if large_count >= DEFAULT_PATTERN_DETECTION_MIN_DELTAS {
        let _ = writeln!(std::io::stderr(), "Warning: Detected pattern of {large_count} large deltas for key '{key}'. This strongly suggests a snapshot-as-delta bug.");
    }
}

impl StreamingSession {
    /// Process a text delta for a numeric content-block index.
    ///
    /// Convenience wrapper that stringifies `index` and delegates to
    /// `on_text_delta_key`, returning its "show prefix" flag.
    pub fn on_text_delta(&mut self, index: u64, delta: &str) -> bool {
        self.on_text_delta_key(&index.to_string(), delta)
    }

    /// Check for consecutive duplicate delta using the "3 strikes" heuristic.
    ///
    /// Detects resend glitches where the exact same delta arrives repeatedly.
    /// Returns true if the delta should be dropped (the consecutive count reached
    /// the configured threshold), false otherwise.
    ///
    /// # Arguments
    /// * `content_key` - The content key to check
    /// * `delta` - The delta to check
    /// * `key_str` - The string key for logging
    ///
    /// # Returns
    /// * `true` - The delta should be dropped (consecutive duplicate reached threshold)
    /// * `false` - The delta should be processed
    fn check_consecutive_duplicate(
        &mut self,
        content_key: &(ContentType, String),
        delta: &str,
        key_str: &str,
    ) -> bool {
        let delta_hash = RollingHashWindow::compute_hash(delta);
        let threshold = get_overlap_thresholds().consecutive_duplicate_threshold;
        // Only clone the key on first sight; existing entries are updated in place.
        let Some((count, prev_hash)) = self.consecutive_duplicates.get_mut(content_key) else {
            self.consecutive_duplicates
                .insert(content_key.clone(), (1, delta_hash));
            return false;
        };
        let exceeded = update_consecutive_dup_entry(count, prev_hash, delta_hash, threshold);
        if exceeded {
            warn_if_verbose_consecutive_dup(self.verbose_warnings, *count, threshold, key_str, delta);
        }
        exceeded
    }

    /// Count a delta as "large" when it exceeds `snapshot_threshold()`, warning in verbose mode.
    fn track_large_delta_warning(&mut self, delta_size: usize, key: &str) {
        if delta_size <= snapshot_threshold() {
            return;
        }
        self.large_delta_count = self.large_delta_count.saturating_add(1);
        if self.verbose_warnings {
            let _ = writeln!(std::io::stderr(), "Warning: Large delta ({delta_size} chars) for key '{key}'. This may indicate unusual streaming behavior.");
        }
    }

    /// Append `delta_size` to the key's history, keeping only the newest `max_delta_history` entries.
    fn track_delta_size(&mut self, content_key: &(ContentType, String), delta_size: usize) {
        let max_history = self.max_delta_history;
        let sizes = self.delta_sizes.entry(content_key.clone()).or_default();
        sizes.push(delta_size);
        // Trim from the front by however much we are over, so the bound holds even if
        // `max_delta_history` was lowered after entries were recorded.
        if sizes.len() > max_history {
            let excess = sizes.len() - max_history;
            sizes.drain(..excess);
        }
    }

    /// Record size statistics (large-delta counter and size history) for an incoming delta.
    fn track_delta_metrics(&mut self, content_key: &(ContentType, String), delta: &str, key: &str) {
        self.track_large_delta_warning(delta.len(), key);
        self.track_delta_size(content_key, delta.len());
    }

    /// Report (verbose mode only) that snapshot extraction failed and the raw delta is used.
    fn warn_snapshot_fallback(&self, e: &SnapshotDeltaError) {
        if self.verbose_warnings {
            let _ = writeln!(std::io::stderr(), "Warning: Snapshot extraction failed: {e}. Using original delta.");
        }
    }

    /// Return the genuinely new text for `delta`.
    ///
    /// If `delta` looks like a full snapshot, the new suffix is extracted and the
    /// repair is counted; on extraction failure the original delta is used.
    fn resolve_snapshot_or_delta(&mut self, delta: &str, key: &str) -> String {
        if !self.is_likely_snapshot(delta, key) {
            return delta.to_string();
        }
        match self.get_delta_from_snapshot(delta, key) {
            Ok(extracted) => {
                self.snapshot_repairs_count = self.snapshot_repairs_count.saturating_add(1);
                extracted.to_string()
            }
            Err(e) => {
                self.warn_snapshot_fallback(&e);
                delta.to_string()
            }
        }
    }

    /// Commit an accepted text delta and report whether this is the key's first output.
    ///
    /// Marks text as streamed, sets the state to `Streaming`, and marks the current block
    /// as having started output. It appends `actual_delta` to the accumulated buffer and
    /// records the raw `delta` as the key's last delta.
    fn commit_text_delta(
        &mut self,
        content_key: (ContentType, String),
        actual_delta: String,
        delta: &str,
        key: &str,
    ) -> bool {
        self.streamed_types.insert(ContentType::Text, true);
        self.state = StreamingState::Streaming;
        self.current_block = ContentBlockState::InBlock {
            index: key.to_string(),
            started_output: true,
        };
        // `HashSet::insert` returns `true` only when the key was not yet present,
        // i.e. this is the first output for the key and a prefix should be shown.
        let is_first = self.output_started_for_key.insert(content_key.clone());
        // `key_order` must stay in step with `accumulated`. After a mid-stream
        // `MessageStart` both are cleared while `output_started_for_key` is kept.
        // Keying the push on `is_first` would therefore leave restarted content
        // out of `key_order`; keying it on a new `accumulated` entry does not.
        match self.accumulated.get_mut(&content_key) {
            Some(buf) => buf.push_str(&actual_delta),
            None => {
                self.accumulated.insert(content_key.clone(), actual_delta);
                self.key_order.push(content_key.clone());
            }
        }
        self.last_delta.insert(content_key, delta.to_string());
        is_first
    }

    /// Process a text delta with a string key and return whether prefix should be shown.
    ///
    /// This variant is for parsers that use string keys instead of numeric indices
    /// (e.g., Codex uses `agent_msg`, `reasoning`; Gemini uses `main`; `OpenCode` uses `main`).
    ///
    /// # Delta Validation
    ///
    /// This method checks that incoming content appears to be a genuine delta
    /// (small chunk) rather than a snapshot (full accumulated content). "Deltas" larger
    /// than `snapshot_threshold()` are counted and, in verbose mode, warned about,
    /// since they may indicate a contract violation. Content that looks like a snapshot
    /// is reduced to its new suffix where possible.
    ///
    /// Additionally, we track patterns of delta sizes to detect repeated large
    /// content being sent as if it were incremental (a common snapshot-as-delta bug).
    /// Exact resends right after a mid-stream restart, and repeated identical deltas
    /// (the "3 strikes" heuristic), are dropped.
    ///
    /// # Arguments
    /// * `key` - The content key (e.g., `main`, `agent_msg`, `reasoning`)
    /// * `delta` - The text delta to accumulate
    ///
    /// # Returns
    /// * `true` - Show prefix with this delta (first chunk)
    /// * `false` - Don't show prefix (subsequent chunks, or the delta was dropped/empty)
    ///
    /// # Panics
    /// In debug builds, panics if the session is not `Idle` or `Streaming`.
    pub fn on_text_delta_key(&mut self, key: &str, delta: &str) -> bool {
        self.assert_lifecycle_state(&[StreamingState::Idle, StreamingState::Streaming]);
        let content_key = (ContentType::Text, key.to_string());
        self.track_delta_metrics(&content_key, delta, key);
        if is_exact_duplicate_after_message_start(&self.last_delta, &self.accumulated, &content_key, delta) {
            return false;
        }
        if self.check_consecutive_duplicate(&content_key, delta, key) {
            return false;
        }
        let actual_delta = self.resolve_snapshot_or_delta(delta, key);
        let recent_sizes = self
            .delta_sizes
            .get(&content_key)
            .map(Vec::as_slice)
            .unwrap_or_default();
        warn_large_delta_pattern(self.verbose_warnings, recent_sizes, key);
        if actual_delta.is_empty() {
            return false;
        }
        self.commit_text_delta(content_key, actual_delta, delta, key)
    }
}

// ============================================================================
// StreamingSession impl block: thinking delta handling
// ============================================================================

impl StreamingSession {
    /// Numeric-index convenience wrapper around [`Self::on_thinking_delta_key`].
    ///
    /// `index` is converted with `to_string()` and used as the string key.
    pub fn on_thinking_delta(&mut self, index: u64, delta: &str) -> bool {
        let key = index.to_string();
        self.on_thinking_delta_key(&key, delta)
    }

    /// Accumulate a thinking delta under a string key and report whether to show a prefix.
    ///
    /// Intended for parsers that identify content blocks by string keys (e.g.
    /// "reasoning") rather than numeric indices. Marks thinking content as
    /// streamed, switches the session to `Streaming`, and delegates the merge to
    /// `merge_delta`. Unlike the text path, this method performs no lifecycle
    /// assertion and no duplicate filtering.
    ///
    /// # Arguments
    /// * `key` - The content key (e.g., "reasoning")
    /// * `delta` - The thinking delta to accumulate
    ///
    /// # Returns
    /// * `true` - Show prefix with this delta (first chunk)
    /// * `false` - Don't show prefix (subsequent chunks)
    pub fn on_thinking_delta_key(&mut self, key: &str, delta: &str) -> bool {
        let kind = ContentType::Thinking;
        self.streamed_types.insert(kind, true);
        self.state = StreamingState::Streaming;

        // Split the borrow so merge_delta receives only the fields it mutates.
        let Self {
            accumulated,
            key_order,
            output_started_for_key,
            ..
        } = self;
        merge_delta(accumulated, key_order, output_started_for_key, kind, key, delta)
    }
}

// ============================================================================
// StreamingSession impl block: tool input delta handling
// ============================================================================

impl StreamingSession {
    /// Accumulate a chunk of streamed tool input for content block `index`.
    ///
    /// Flags tool input as streamed, switches the session to `Streaming`, and
    /// merges `delta` into the buffer keyed by `(ContentType::ToolInput, index)`.
    /// The first-chunk flag returned by `merge_delta` is discarded.
    pub fn on_tool_input_delta(&mut self, index: u64, delta: &str) {
        let kind = ContentType::ToolInput;
        self.streamed_types.insert(kind, true);
        self.state = StreamingState::Streaming;

        let block_key = index.to_string();
        // Split the borrow so merge_delta receives only the fields it mutates.
        let Self {
            accumulated,
            key_order,
            output_started_for_key,
            ..
        } = self;
        let _ = merge_delta(
            accumulated,
            key_order,
            output_started_for_key,
            kind,
            &block_key,
            delta,
        );
    }

    /// Remember the tool name associated with content block `index`.
    ///
    /// For GLM/CCS deduplication, assistant events carry `tool_use` blocks with
    /// both name and input, but streaming only accumulates the input. Keeping the
    /// name on the side lets the normalized representation be rebuilt for
    /// hash-based deduplication.
    ///
    /// # Arguments
    /// * `index` - The content block index
    /// * `name` - The tool name, if known; stored as given (including `None`)
    pub fn set_tool_name(&mut self, index: u64, name: Option<String>) {
        self.tool_names.insert(index, name);
    }
}

// ============================================================================
// StreamingSession impl block: rendering and accumulation tracking
// ============================================================================

impl StreamingSession {
    /// Borrow the accumulated content for `(content_type, index)`.
    ///
    /// Returns `None` if nothing has been accumulated under that key.
    pub fn get_accumulated(&self, content_type: ContentType, index: &str) -> Option<&str> {
        self.accumulated
            .get(&(content_type, index.to_string()))
            .map(std::string::String::as_str)
    }

    /// Return the set of accumulated keys for a given content type.
    ///
    /// Non-TTY flush logic uses this to render the final accumulated content once
    /// at a completion boundary (e.g., `message_stop`), without relying on
    /// arbitrary index bounds.
    #[must_use]
    pub fn accumulated_keys(&self, content_type: ContentType) -> Vec<String> {
        sorted_content_keys(&self.accumulated, content_type)
    }

    /// Mark content as rendered (HashMap-based tracking).
    ///
    /// Copies the current accumulated content for the key into `last_rendered`.
    /// Does nothing if nothing has been accumulated for the key.
    ///
    /// # Arguments
    /// * `content_type` - The type of content
    /// * `index` - The content index (as string for flexibility)
    pub fn mark_rendered(&mut self, content_type: ContentType, index: &str) {
        let content_key = (content_type, index.to_string());

        // Store the current accumulated content as last rendered
        if let Some(current) = self.accumulated.get(&content_key) {
            self.last_rendered.insert(content_key, current.clone());
        }
    }

    /// Check whether the current accumulated content for a key has already been rendered.
    ///
    /// Hashes the accumulated content for `(content_type, index)` and looks the
    /// hash up in `rendered_content_hashes`. Entries in that set are keyed per
    /// `(content_type, index)`, so this detects repeats within the same substream
    /// only; it is not a global check across all keys.
    ///
    /// # Arguments
    /// * `content_type` - The type of content
    /// * `index` - The content index (as string for flexibility)
    ///
    /// # Returns
    /// * `true` - This exact content has been rendered before for this key
    /// * `false` - Not rendered yet, or nothing has been accumulated for this key
    #[must_use]
    pub fn is_content_rendered(&self, content_type: ContentType, index: &str) -> bool {
        let content_key = (content_type, index.to_string());

        if let Some(current) = self.accumulated.get(&content_key) {
            let hash = compute_hash(current);
            // Reuse the already-allocated key string rather than allocating another.
            let (content_type, owned_index) = content_key;
            return self
                .rendered_content_hashes
                .contains(&(content_type, owned_index, hash));
        }

        false
    }

    /// Report whether output has already started for a key.
    ///
    /// Callers use this to decide between an in-place update (e.g., carriage
    /// return) and showing new content. The only check performed is whether the
    /// key is present in `output_started_for_key`. No comparison against
    /// previously rendered text is done.
    ///
    /// # Arguments
    /// * `content_type` - The type of content
    /// * `index` - The content index (as string for flexibility)
    ///
    /// # Returns
    /// * `true` - Output has started for this key (do in-place update)
    /// * `false` - Output has not started for this key (show new content)
    #[must_use]
    pub fn has_rendered_prefix(&self, content_type: ContentType, index: &str) -> bool {
        let content_key = (content_type, index.to_string());
        self.output_started_for_key.contains(&content_key)
    }

    /// Mark the current accumulated content for a key as rendered (hash-based).
    ///
    /// Also updates `last_rendered` via [`Self::mark_rendered`]. If content has
    /// been accumulated for the key, its hash is added to
    /// `rendered_content_hashes` under `(content_type, index)`.
    ///
    /// # Arguments
    /// * `content_type` - The type of content
    /// * `index` - The content index (as string for flexibility)
    pub fn mark_content_rendered(&mut self, content_type: ContentType, index: &str) {
        // Also update last_rendered for compatibility
        self.mark_rendered(content_type, index);

        let content_key = (content_type, index.to_string());
        if let Some(current) = self.accumulated.get(&content_key) {
            let hash = compute_hash(current);
            // Reuse the already-allocated key string rather than allocating another.
            let (content_type, owned_index) = content_key;
            self.rendered_content_hashes
                .insert((content_type, owned_index, hash));
        }
    }

    /// Mark caller-supplied content as rendered for a key (hash-based).
    ///
    /// Intended to be called with pre-sanitized content (e.g., whitespace
    /// normalized). Hashing that content, rather than the raw accumulated buffer,
    /// avoids duplicates when accumulated content differs only by whitespace.
    /// Also updates `last_rendered` via [`Self::mark_rendered`].
    ///
    /// # Arguments
    /// * `content_type` - The type of content
    /// * `index` - The content index (as string for flexibility)
    /// * `content` - The content to hash
    pub fn mark_content_hash_rendered(
        &mut self,
        content_type: ContentType,
        index: &str,
        content: &str,
    ) {
        // Also update last_rendered for compatibility
        self.mark_rendered(content_type, index);

        // Add the hash of the content to the rendered set.
        //
        // NOTE: We key by (content_type, index) so `clear_key()` can fully reset
        // per-substream deduplication.
        let hash = compute_hash(content);
        self.rendered_content_hashes
            .insert((content_type, index.to_string(), hash));
    }

    /// Check whether caller-supplied content has already been rendered for a key.
    ///
    /// Hashes `content` and looks up `(content_type, index, hash)` in
    /// `rendered_content_hashes`. All three arguments contribute to the lookup
    /// key, so identical content under a different key is not reported as
    /// rendered.
    ///
    /// # Arguments
    /// * `content_type` - The type of content
    /// * `index` - The content index (as string for flexibility)
    /// * `content` - The (typically sanitized) content to check
    ///
    /// # Returns
    /// * `true` - This exact content has been rendered before for this key
    /// * `false` - This exact content has not been rendered for this key
    #[must_use]
    pub fn is_content_hash_rendered(
        &self,
        content_type: ContentType,
        index: &str,
        content: &str,
    ) -> bool {
        let hash = compute_hash(content);

        // Check if this hash has been rendered before for this (content_type, index)
        self.rendered_content_hashes
            .contains(&(content_type, index.to_string(), hash))
    }
}