ralph_workflow/json_parser/streaming_state/contract.rs
1// Streaming configuration constants and contract types.
2//
3// This file contains the delta contract validation constants, thresholds,
4// and the state enums (`StreamingState`, `ContentBlockState`) that define
5// the streaming protocol.
6
7use std::sync::OnceLock;
8
9// Streaming configuration constants
10
11/// Default threshold for detecting snapshot-as-delta violations (in characters).
12///
13/// Deltas exceeding this size are flagged as potential snapshots. The value of 200
14/// characters was chosen because:
15/// - Normal deltas are typically < 100 characters (a few tokens)
16/// - Snapshots often contain the full accumulated content (200+ chars)
17/// - This threshold catches most violations while minimizing false positives
18pub(super) const DEFAULT_SNAPSHOT_THRESHOLD: usize = 200;
19
/// Minimum allowed snapshot threshold (in characters).
///
/// Thresholds below 50 would cause excessive false positives. Even a small,
/// legitimate delta of one or two sentences can easily exceed a few dozen
/// characters and would then be misreported as a snapshot.
const MIN_SNAPSHOT_THRESHOLD: usize = 50;

/// Maximum allowed snapshot threshold (in characters).
///
/// Thresholds above 1000 would let genuine snapshot-as-delta violations from
/// misbehaving parsers pass undetected, potentially causing exponential
/// duplication bugs.
const MAX_SNAPSHOT_THRESHOLD: usize = 1000;
31
32/// Minimum number of consecutive large deltas required to trigger pattern detection warning.
33///
34/// This threshold prevents false positives from occasional large deltas.
35/// Three consecutive large deltas indicate a pattern (not a one-off event).
36pub(super) const DEFAULT_PATTERN_DETECTION_MIN_DELTAS: usize = 3;
37
38/// Maximum number of delta sizes to track per content key for pattern detection.
39///
40/// Tracking recent delta sizes allows us to detect patterns of repeated large
41/// content (a sign of snapshot-as-delta bugs). Ten entries provide sufficient
42/// history without excessive memory usage.
43pub(super) const DEFAULT_MAX_DELTA_HISTORY: usize = 10;
44
45/// Ralph enforces a **delta contract** for all streaming content.
46///
47/// Every streaming event must contain only the newly generated text (delta),
48/// never the full accumulated content (snapshot).
49///
50/// # Contract Violations
51///
52/// If a parser emits snapshot-style content when deltas are expected, it will
53/// cause exponential duplication bugs. The `StreamingSession` validates that
54/// incoming content is delta-sized and logs warnings when violations are detected.
55///
56/// # Validation Threshold
57///
58/// Deltas are expected to be small chunks (typically < 100 chars). If a single
59/// "delta" exceeds `snapshot_threshold()` characters, it may indicate a snapshot
60/// being treated as a delta.
61///
62/// # Pattern Detection
63///
64/// In addition to size threshold, we track patterns of repeated large content
65/// which may indicate a snapshot-as-delta bug where the same content is being
66/// sent repeatedly as if it were incremental.
67///
68/// # Environment Variables
69///
70/// The following environment variables can be set to configure streaming behavior:
71///
72/// - `RALPH_STREAMING_SNAPSHOT_THRESHOLD`: Threshold for detecting snapshot-as-delta
73/// violations (default: 200). Deltas exceeding this size trigger warnings.
74///
75/// Get the snapshot threshold from an injected environment accessor.
76///
77/// Reads `RALPH_STREAMING_SNAPSHOT_THRESHOLD`.
78/// Valid range: 50-1000 characters.
79/// Falls back to default of 200 if not set, not parseable, or out of range.
80pub(super) fn snapshot_threshold_from_env_fn(get: impl Fn(&str) -> Option<String>) -> usize {
81 get("RALPH_STREAMING_SNAPSHOT_THRESHOLD")
82 .and_then(|s| s.parse::<usize>().ok())
83 .and_then(|v| {
84 if (MIN_SNAPSHOT_THRESHOLD..=MAX_SNAPSHOT_THRESHOLD).contains(&v) {
85 Some(v)
86 } else {
87 None
88 }
89 })
90 .unwrap_or(DEFAULT_SNAPSHOT_THRESHOLD)
91}
92
93/// Get the snapshot threshold from environment variable or use default.
94///
95/// Reads `RALPH_STREAMING_SNAPSHOT_THRESHOLD` env var.
96/// Valid range: 50-1000 characters.
97/// Falls back to default of 200 if not set, not parseable, or out of range.
98pub(super) fn snapshot_threshold() -> usize {
99 static THRESHOLD: OnceLock<usize> = OnceLock::new();
100 *THRESHOLD.get_or_init(|| snapshot_threshold_from_env_fn(|k| std::env::var(k).ok()))
101}
102
/// Lifecycle of the message currently being streamed.
///
/// Records whether content deltas are in flight and whether the message has
/// been finalized. A freshly defaulted value starts out as [`StreamingState::Idle`].
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum StreamingState {
    /// Nothing is being streamed right now.
    #[default]
    Idle,
    /// Content deltas are actively arriving.
    Streaming,
    /// The content is complete (after `MessageStop` or equivalent).
    Finalized,
}
117
/// Tracks which content block, if any, is currently open during streaming.
///
/// This replaces a plain `in_content_block` boolean. It remembers the active
/// block's identifier and whether any output has been produced for it yet.
/// That extra state helps prevent "glued text" bugs, where block boundaries
/// are crossed without proper finalization.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub enum ContentBlockState {
    /// Outside of any content block.
    #[default]
    NotInBlock,
    /// Inside a content block.
    InBlock {
        /// Identifier (index) of the active block.
        index: String,
        /// `true` once any content has been emitted for this block.
        started_output: bool,
    },
}