ralph_workflow/json_parser/streaming_state/contract.rs
1// Streaming configuration constants and contract types.
2//
3// This file contains the delta contract validation constants, thresholds,
4// and the state enums (`StreamingState`, `ContentBlockState`) that define
5// the streaming protocol.
6
7use std::sync::OnceLock;
8
9// Streaming configuration constants
10
11/// Default threshold for detecting snapshot-as-delta violations (in characters).
12///
13/// Deltas exceeding this size are flagged as potential snapshots. The value of 200
14/// characters was chosen because:
15/// - Normal deltas are typically < 100 characters (a few tokens)
16/// - Snapshots often contain the full accumulated content (200+ chars)
17/// - This threshold catches most violations while minimizing false positives
18pub(super) const DEFAULT_SNAPSHOT_THRESHOLD: usize = 200;
19
/// Minimum allowed snapshot threshold (in characters).
///
/// Values below 50 would cause excessive false positives for normal deltas,
/// since even small text chunks (one or two sentences) can easily exceed
/// 50 characters.
const MIN_SNAPSHOT_THRESHOLD: usize = 50;
25
/// Largest snapshot threshold (in characters) that may be configured.
///
/// Accepting anything above 1000 would let genuine snapshots slip past the
/// size check undetected, potentially causing exponential duplication bugs.
const MAX_SNAPSHOT_THRESHOLD: usize = 1_000;
31
32/// Minimum number of consecutive large deltas required to trigger pattern detection warning.
33///
34/// This threshold prevents false positives from occasional large deltas.
35/// Three consecutive large deltas indicate a pattern (not a one-off event).
36pub(super) const DEFAULT_PATTERN_DETECTION_MIN_DELTAS: usize = 3;
37
38/// Maximum number of delta sizes to track per content key for pattern detection.
39///
40/// Tracking recent delta sizes allows us to detect patterns of repeated large
41/// content (a sign of snapshot-as-delta bugs). Ten entries provide sufficient
42/// history without excessive memory usage.
43pub(super) const DEFAULT_MAX_DELTA_HISTORY: usize = 10;
44
45/// Ralph enforces a **delta contract** for all streaming content.
46///
47/// Every streaming event must contain only the newly generated text (delta),
48/// never the full accumulated content (snapshot).
49///
50/// # Contract Violations
51///
52/// If a parser emits snapshot-style content when deltas are expected, it will
53/// cause exponential duplication bugs. The `StreamingSession` validates that
54/// incoming content is delta-sized and logs warnings when violations are detected.
55///
56/// # Validation Threshold
57///
58/// Deltas are expected to be small chunks (typically < 100 chars). If a single
59/// "delta" exceeds `snapshot_threshold()` characters, it may indicate a snapshot
60/// being treated as a delta.
61///
62/// # Pattern Detection
63///
64/// In addition to size threshold, we track patterns of repeated large content
65/// which may indicate a snapshot-as-delta bug where the same content is being
66/// sent repeatedly as if it were incremental.
67///
68/// # Environment Variables
69///
70/// The following environment variables can be set to configure streaming behavior:
71///
72/// - `RALPH_STREAMING_SNAPSHOT_THRESHOLD`: Threshold for detecting snapshot-as-delta
73/// violations (default: 200). Deltas exceeding this size trigger warnings.
74///
75/// Get the snapshot threshold from environment variable or use default.
76///
77/// Reads `RALPH_STREAMING_SNAPSHOT_THRESHOLD` env var.
78/// Valid range: 50-1000 characters.
79/// Falls back to default of 200 if not set or out of range.
80pub(super) fn snapshot_threshold() -> usize {
81 static THRESHOLD: OnceLock<usize> = OnceLock::new();
82 *THRESHOLD.get_or_init(|| {
83 std::env::var("RALPH_STREAMING_SNAPSHOT_THRESHOLD")
84 .ok()
85 .and_then(|s| s.parse::<usize>().ok())
86 .and_then(|v| {
87 if (MIN_SNAPSHOT_THRESHOLD..=MAX_SNAPSHOT_THRESHOLD).contains(&v) {
88 Some(v)
89 } else {
90 None
91 }
92 })
93 .unwrap_or(DEFAULT_SNAPSHOT_THRESHOLD)
94 })
95}
96
/// Lifecycle phase of the message currently being streamed.
///
/// Records whether content deltas are still arriving for the current message
/// and whether that content has been displayed to the user.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum StreamingState {
    /// Nothing is being streamed; this is the initial state.
    #[default]
    Idle,
    /// Content deltas are currently being received.
    Streaming,
    /// The content is complete (after `MessageStop` or an equivalent event).
    Finalized,
}
111
/// Per-block state tracked while content is being streamed.
///
/// Used in place of a plain `in_content_block` boolean: besides whether a
/// block is open, it records which block is active and whether any output has
/// been produced for it. This prevents "glued text" bugs, where block
/// boundaries are crossed without proper finalization.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub enum ContentBlockState {
    /// No content block is currently open.
    #[default]
    NotInBlock,
    /// A content block is open, along with its output-tracking state.
    InBlock {
        /// Index/identifier of the active block.
        index: String,
        /// Set once any content has been output for this block.
        started_output: bool,
    },
}