ralph_workflow/json_parser/streaming_state/session/session_struct.rs
1// StreamingSession struct definition and basic methods.
2//
3// This file contains the struct definition for StreamingSession and its
4// basic construction methods.
5
/// Unified streaming session tracker.
///
/// Provides a single source of truth for streaming state across all parsers.
/// Tracks:
/// - Current streaming state (`Idle`/`Streaming`/`Finalized`)
/// - Which content types have been streamed
/// - Accumulated content by content type and index
/// - Whether prefix should be shown on next delta
/// - Delta size patterns for detecting snapshot-as-delta violations
/// - Persistent "output started" tracking independent of accumulated content
/// - Verbosity-aware warning emission
///
/// # Construction
///
/// Prefer `StreamingSession::new()`. The derived `Default` zero-initializes every
/// field, so `max_delta_history` is `0` for a value built with `Default::default()`,
/// whereas `new()` sets it to `DEFAULT_MAX_DELTA_HISTORY`.
///
/// # Lifecycle
///
/// 1. **Start**: `on_message_start()` - resets per-message state. Fields documented
///    below as "preserved across `MessageStart` boundaries" are the exception.
/// 2. **Stream**: `on_text_delta()` / `on_thinking_delta()` - accumulate content
/// 3. **Stop**: `on_message_stop()` - finalize the message
/// 4. **Repeat**: Back to step 1 for next message
#[derive(Debug, Default, Clone)]
pub struct StreamingSession {
    /// Current streaming state.
    pub(super) state: StreamingState,
    /// Track which content types have been streamed (for deduplication).
    /// Maps `ContentType` → whether it has been streamed.
    pub(super) streamed_types: HashMap<ContentType, bool>,
    /// Track the current content block state.
    pub(super) current_block: ContentBlockState,
    /// Accumulated content by (`content_type`, index) for display.
    /// This mirrors `DeltaAccumulator` but adds deduplication tracking.
    pub(super) accumulated: HashMap<(ContentType, String), String>,
    /// Track the order of keys for `most_recent` operations.
    pub(super) key_order: Vec<(ContentType, String)>,
    /// Track recent delta sizes for pattern detection.
    /// Maps `(content_type, key)` → vec of recent delta sizes.
    pub(super) delta_sizes: HashMap<(ContentType, String), Vec<usize>>,
    /// Maximum number of delta sizes to track per key.
    /// `new()` sets this to `DEFAULT_MAX_DELTA_HISTORY`; the derived `Default` leaves it at `0`.
    pub(super) max_delta_history: usize,
    /// Track the current message ID for duplicate detection.
    pub(super) current_message_id: Option<String>,
    /// Track which messages have been displayed to prevent duplicate final output.
    pub(super) displayed_final_messages: HashSet<String>,
    /// Track which (`content_type`, key) pairs have had output started.
    /// This is independent of `accumulated` to handle cases where accumulated
    /// content may be cleared (e.g., repeated `ContentBlockStart` for same index).
    /// Cleared on `on_message_start` to ensure fresh state for each message.
    pub(super) output_started_for_key: HashSet<(ContentType, String)>,
    /// Whether to emit verbose warnings about streaming anomalies.
    /// When false, suppresses diagnostic warnings that are useful for debugging
    /// but noisy in production (e.g., GLM protocol violations, snapshot detection).
    pub(super) verbose_warnings: bool,
    /// Count of snapshot repairs performed during this session.
    pub(super) snapshot_repairs_count: usize,
    /// Count of deltas that exceeded the size threshold.
    pub(super) large_delta_count: usize,
    /// Count of protocol violations detected (e.g., `MessageStart` during streaming).
    pub(super) protocol_violations: usize,
    /// Hash of the final streamed content (for deduplication).
    /// Computed at `message_stop` using all accumulated content.
    pub(super) final_content_hash: Option<u64>,
    /// Track the last rendered content for each key to detect when rendering
    /// would produce identical output (prevents visual repetition).
    /// Maps `(content_type, key)` → the last accumulated content that was rendered.
    pub(super) last_rendered: HashMap<(ContentType, String), String>,
    /// Track rendered content hashes for duplicate detection.
    ///
    /// This stores a hash of the *sanitized content* together with the `(content_type, key)`
    /// it was rendered for. This is preserved across repeated `MessageStart` boundaries.
    ///
    /// Keying by `(content_type, key)` is important because some parsers reuse keys within the
    /// same turn (e.g., Codex can reuse `reasoning` for multiple items). When that happens,
    /// we need `clear_key()` to fully reset per-key deduplication state.
    pub(super) rendered_content_hashes: HashSet<(ContentType, String, u64)>,
    /// Track the last delta for each key to detect exact duplicate deltas.
    /// This is preserved across `MessageStart` boundaries to prevent duplicate processing.
    /// Maps `(content_type, key)` → the last delta that was processed.
    pub(super) last_delta: HashMap<(ContentType, String), String>,
    /// Track consecutive duplicates for resend glitch detection ("3 strikes" heuristic).
    /// Maps `(content_type, key)` → (count, `delta_hash`) where count tracks how many
    /// times the exact same delta has arrived consecutively. When count exceeds
    /// the threshold, the delta is dropped as a resend glitch.
    pub(super) consecutive_duplicates: HashMap<(ContentType, String), (usize, u64)>,
    /// Delta deduplicator using KMP and rolling hash for snapshot detection.
    /// Provides O(n+m) guaranteed complexity for detecting snapshot-as-delta violations.
    /// Cleared on message boundaries to prevent false positives.
    pub(super) deduplicator: DeltaDeduplicator,
    /// Track message IDs that have been fully rendered from an assistant event BEFORE streaming.
    /// When an assistant event arrives before streaming deltas, we render it and record
    /// the `message_id`. ALL subsequent streaming deltas for that `message_id` should be
    /// suppressed to prevent duplication.
    pub(super) pre_rendered_message_ids: HashSet<String>,
    /// Track content hashes of assistant events that have been rendered during streaming.
    /// This prevents duplicate assistant events with the same content from being rendered
    /// multiple times. GLM/CCS may send multiple assistant events during streaming with
    /// the same content but different `message_id`s.
    /// This is preserved across `MessageStart` boundaries to handle mid-stream assistant events.
    pub(super) rendered_assistant_content_hashes: HashSet<u64>,
    /// Track tool names by index for GLM/CCS deduplication.
    /// GLM sends assistant events with `tool_use` blocks (name + input) during streaming,
    /// but only the input is accumulated via deltas. We track the tool name to properly
    /// reconstruct the normalized representation for deduplication.
    /// Maps the content block index to the tool name (`None` when no name is recorded).
    pub(super) tool_names: HashMap<u64, Option<String>>,
}
109
110impl StreamingSession {
111 /// Create a new streaming session.
112 pub fn new() -> Self {
113 Self {
114 max_delta_history: DEFAULT_MAX_DELTA_HISTORY,
115 verbose_warnings: false,
116 ..Default::default()
117 }
118 }
119
120 /// Configure whether to emit verbose warnings about streaming anomalies.
121 ///
122 /// When enabled, diagnostic warnings are printed for:
123 /// - Repeated `MessageStart` events (GLM protocol violations)
124 /// - Large deltas that may indicate snapshot-as-delta bugs
125 /// - Pattern detection of repeated large content
126 ///
127 /// When disabled (default), these warnings are suppressed to avoid
128 /// noise in production output.
129 ///
130 /// # Arguments
131 /// * `enabled` - Whether to enable verbose warnings
132 ///
133 /// # Returns
134 /// The modified session for builder chaining.
135 ///
136 /// # Example
137 ///
138 /// ```ignore
139 /// let mut session = StreamingSession::new().with_verbose_warnings(true);
140 /// ```
141 pub const fn with_verbose_warnings(mut self, enabled: bool) -> Self {
142 self.verbose_warnings = enabled;
143 self
144 }
145}