Skip to main content

ralph_workflow/json_parser/streaming_state/session/
session_struct.rs

// StreamingSession struct definition and basic methods.
//
// This file contains the struct definition for `StreamingSession` and its
// basic construction methods.
5
6/// Unified streaming session tracker.
7///
8/// Provides a single source of truth for streaming state across all parsers.
9/// Tracks:
10/// - Current streaming state (`Idle`/`Streaming`/`Finalized`)
11/// - Which content types have been streamed
12/// - Accumulated content by content type and index
13/// - Whether prefix should be shown on next delta
14/// - Delta size patterns for detecting snapshot-as-delta violations
15/// - Persistent "output started" tracking independent of accumulated content
16/// - Verbosity-aware warning emission
17///
18/// # Lifecycle
19///
20/// 1. **Start**: `on_message_start()` - resets all state
21/// 2. **Stream**: `on_text_delta()` / `on_thinking_delta()` - accumulate content
22/// 3. **Stop**: `on_message_stop()` - finalize the message
23/// 4. **Repeat**: Back to step 1 for next message
24#[derive(Debug, Default, Clone)]
25pub struct StreamingSession {
26    /// Current streaming state
27    pub(super) state: StreamingState,
28    /// Track which content types have been streamed (for deduplication)
29    /// Maps `ContentType` → whether it has been streamed
30    pub(super) streamed_types: HashMap<ContentType, bool>,
31    /// Track the current content block state
32    pub(super) current_block: ContentBlockState,
33    /// Accumulated content by (`content_type`, index) for display
34    /// This mirrors `DeltaAccumulator` but adds deduplication tracking
35    pub(super) accumulated: HashMap<(ContentType, String), String>,
36    /// Track the order of keys for `most_recent` operations
37    pub(super) key_order: Vec<(ContentType, String)>,
38    /// Track recent delta sizes for pattern detection
39    /// Maps `(content_type, key)` → vec of recent delta sizes
40    pub(super) delta_sizes: HashMap<(ContentType, String), Vec<usize>>,
41    /// Maximum number of delta sizes to track per key
42    pub(super) max_delta_history: usize,
43    /// Track the current message ID for duplicate detection
44    pub(super) current_message_id: Option<String>,
45    /// Track which messages have been displayed to prevent duplicate final output
46    pub(super) displayed_final_messages: HashSet<String>,
47    /// Track which (`content_type`, key) pairs have had output started.
48    /// This is independent of `accumulated` to handle cases where accumulated
49    /// content may be cleared (e.g., repeated `ContentBlockStart` for same index).
50    /// Cleared on `on_message_start` to ensure fresh state for each message.
51    pub(super) output_started_for_key: HashSet<(ContentType, String)>,
52    /// Whether to emit verbose warnings about streaming anomalies.
53    /// When false, suppresses diagnostic warnings that are useful for debugging
54    /// but noisy in production (e.g., GLM protocol violations, snapshot detection).
55    pub(super) verbose_warnings: bool,
56    /// Count of snapshot repairs performed during this session
57    pub(super) snapshot_repairs_count: usize,
58    /// Count of deltas that exceeded the size threshold
59    pub(super) large_delta_count: usize,
60    /// Count of protocol violations detected (e.g., `MessageStart` during streaming)
61    pub(super) protocol_violations: usize,
62    /// Hash of the final streamed content (for deduplication)
63    /// Computed at `message_stop` using all accumulated content
64    pub(super) final_content_hash: Option<u64>,
65    /// Track the last rendered content for each key to detect when rendering
66    /// would produce identical output (prevents visual repetition).
67    /// Maps `(content_type, key)` → the last accumulated content that was rendered.
68    pub(super) last_rendered: HashMap<(ContentType, String), String>,
69    /// Track rendered content hashes for duplicate detection.
70    ///
71    /// This stores a hash of the *sanitized content* together with the `(content_type, key)`
72    /// it was rendered for. This is preserved across repeated `MessageStart` boundaries.
73    ///
74    /// Keying by `(content_type, key)` is important because some parsers reuse keys within the
75    /// same turn (e.g., Codex can reuse `reasoning` for multiple items). When that happens,
76    /// we need `clear_key()` to fully reset per-key deduplication state.
77    pub(super) rendered_content_hashes: HashSet<(ContentType, String, u64)>,
78    /// Track the last delta for each key to detect exact duplicate deltas.
79    /// This is preserved across `MessageStart` boundaries to prevent duplicate processing.
80    /// Maps `(content_type, key)` → the last delta that was processed.
81    pub(super) last_delta: HashMap<(ContentType, String), String>,
82    /// Track consecutive duplicates for resend glitch detection ("3 strikes" heuristic).
83    /// Maps `(content_type, key)` → (count, `delta_hash`) where count tracks how many
84    /// times the exact same delta has arrived consecutively. When count exceeds
85    /// the threshold, the delta is dropped as a resend glitch.
86    pub(super) consecutive_duplicates: HashMap<(ContentType, String), (usize, u64)>,
87    /// Delta deduplicator using KMP and rolling hash for snapshot detection.
88    /// Provides O(n+m) guaranteed complexity for detecting snapshot-as-delta violations.
89    /// Cleared on message boundaries to prevent false positives.
90    pub(super) deduplicator: DeltaDeduplicator,
91    /// Track message IDs that have been fully rendered from an assistant event BEFORE streaming.
92    /// When an assistant event arrives before streaming deltas, we render it and record
93    /// the message_id. ALL subsequent streaming deltas for that message_id should be
94    /// suppressed to prevent duplication.
95    pub(super) pre_rendered_message_ids: HashSet<String>,
96    /// Track content hashes of assistant events that have been rendered during streaming.
97    /// This prevents duplicate assistant events with the same content from being rendered
98    /// multiple times. GLM/CCS may send multiple assistant events during streaming with
99    /// the same content but different message_ids.
100    /// This is preserved across `MessageStart` boundaries to handle mid-stream assistant events.
101    pub(super) rendered_assistant_content_hashes: HashSet<u64>,
102    /// Track tool names by index for GLM/CCS deduplication.
103    /// GLM sends assistant events with tool_use blocks (name + input) during streaming,
104    /// but only the input is accumulated via deltas. We track the tool name to properly
105    /// reconstruct the normalized representation for deduplication.
106    /// Maps the content block index to the tool name.
107    pub(super) tool_names: HashMap<u64, Option<String>>,
108}
109
110impl StreamingSession {
111    /// Create a new streaming session.
112    pub fn new() -> Self {
113        Self {
114            max_delta_history: DEFAULT_MAX_DELTA_HISTORY,
115            verbose_warnings: false,
116            ..Default::default()
117        }
118    }
119
120    /// Configure whether to emit verbose warnings about streaming anomalies.
121    ///
122    /// When enabled, diagnostic warnings are printed for:
123    /// - Repeated `MessageStart` events (GLM protocol violations)
124    /// - Large deltas that may indicate snapshot-as-delta bugs
125    /// - Pattern detection of repeated large content
126    ///
127    /// When disabled (default), these warnings are suppressed to avoid
128    /// noise in production output.
129    ///
130    /// # Arguments
131    /// * `enabled` - Whether to enable verbose warnings
132    ///
133    /// # Returns
134    /// The modified session for builder chaining.
135    ///
136    /// # Example
137    ///
138    /// ```ignore
139    /// let mut session = StreamingSession::new().with_verbose_warnings(true);
140    /// ```
141    pub const fn with_verbose_warnings(mut self, enabled: bool) -> Self {
142        self.verbose_warnings = enabled;
143        self
144    }
145}