Skip to main content

ralph_workflow/json_parser/streaming_state/
contract.rs

1// Streaming configuration constants and contract types.
2//
3// This file contains the delta contract validation constants, thresholds,
4// and the state enums (`StreamingState`, `ContentBlockState`) that define
5// the streaming protocol.
6
7use std::sync::OnceLock;
8
9// Streaming configuration constants
10
11/// Default threshold for detecting snapshot-as-delta violations (in characters).
12///
13/// Deltas exceeding this size are flagged as potential snapshots. The value of 200
14/// characters was chosen because:
15/// - Normal deltas are typically < 100 characters (a few tokens)
16/// - Snapshots often contain the full accumulated content (200+ chars)
17/// - This threshold catches most violations while minimizing false positives
18pub(super) const DEFAULT_SNAPSHOT_THRESHOLD: usize = 200;
19
20/// Minimum allowed snapshot threshold (in characters).
21///
22/// Values below 50 would cause excessive false positives for normal deltas,
23/// as even small text chunks (1-2 sentences) can exceed 30 characters.
24const MIN_SNAPSHOT_THRESHOLD: usize = 50;
25
26/// Maximum allowed snapshot threshold (in characters).
27///
28/// Values above 1000 would allow malicious snapshots to pass undetected,
29/// potentially causing exponential duplication bugs.
30const MAX_SNAPSHOT_THRESHOLD: usize = 1000;
31
32/// Minimum number of consecutive large deltas required to trigger pattern detection warning.
33///
34/// This threshold prevents false positives from occasional large deltas.
35/// Three consecutive large deltas indicate a pattern (not a one-off event).
36pub(super) const DEFAULT_PATTERN_DETECTION_MIN_DELTAS: usize = 3;
37
38/// Maximum number of delta sizes to track per content key for pattern detection.
39///
40/// Tracking recent delta sizes allows us to detect patterns of repeated large
41/// content (a sign of snapshot-as-delta bugs). Ten entries provide sufficient
42/// history without excessive memory usage.
43pub(super) const DEFAULT_MAX_DELTA_HISTORY: usize = 10;
44
45/// Ralph enforces a **delta contract** for all streaming content.
46///
47/// Every streaming event must contain only the newly generated text (delta),
48/// never the full accumulated content (snapshot).
49///
50/// # Contract Violations
51///
52/// If a parser emits snapshot-style content when deltas are expected, it will
53/// cause exponential duplication bugs. The `StreamingSession` validates that
54/// incoming content is delta-sized and logs warnings when violations are detected.
55///
56/// # Validation Threshold
57///
58/// Deltas are expected to be small chunks (typically < 100 chars). If a single
59/// "delta" exceeds `snapshot_threshold()` characters, it may indicate a snapshot
60/// being treated as a delta.
61///
62/// # Pattern Detection
63///
64/// In addition to size threshold, we track patterns of repeated large content
65/// which may indicate a snapshot-as-delta bug where the same content is being
66/// sent repeatedly as if it were incremental.
67///
68/// # Environment Variables
69///
70/// The following environment variables can be set to configure streaming behavior:
71///
72/// - `RALPH_STREAMING_SNAPSHOT_THRESHOLD`: Threshold for detecting snapshot-as-delta
73///   violations (default: 200). Deltas exceeding this size trigger warnings.
74///
75/// Get the snapshot threshold from an injected environment accessor.
76///
77/// Reads `RALPH_STREAMING_SNAPSHOT_THRESHOLD`.
78/// Valid range: 50-1000 characters.
79/// Falls back to default of 200 if not set, not parseable, or out of range.
80pub(super) fn snapshot_threshold_from_env_fn(get: impl Fn(&str) -> Option<String>) -> usize {
81    get("RALPH_STREAMING_SNAPSHOT_THRESHOLD")
82        .and_then(|s| s.parse::<usize>().ok())
83        .and_then(|v| {
84            if (MIN_SNAPSHOT_THRESHOLD..=MAX_SNAPSHOT_THRESHOLD).contains(&v) {
85                Some(v)
86            } else {
87                None
88            }
89        })
90        .unwrap_or(DEFAULT_SNAPSHOT_THRESHOLD)
91}
92
93/// Get the snapshot threshold from environment variable or use default.
94///
95/// Reads `RALPH_STREAMING_SNAPSHOT_THRESHOLD` env var.
96/// Valid range: 50-1000 characters.
97/// Falls back to default of 200 if not set, not parseable, or out of range.
98pub(super) fn snapshot_threshold() -> usize {
99    static THRESHOLD: OnceLock<usize> = OnceLock::new();
100    *THRESHOLD.get_or_init(|| snapshot_threshold_from_env_fn(|k| std::env::var(k).ok()))
101}
102
/// Lifecycle phase of the message that is currently being streamed.
///
/// Records whether content is in the middle of being streamed and whether
/// that content has been displayed to the user.
///
/// The default value is [`StreamingState::Idle`].
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum StreamingState {
    /// Idle: no streaming is active.
    #[default]
    Idle,
    /// Content deltas are currently being streamed.
    Streaming,
    /// The content has been finalized (after `MessageStop` or an equivalent event).
    Finalized,
}
117
/// Tracks where the stream currently is relative to content blocks.
///
/// This is a richer replacement for the boolean `in_content_block`: besides
/// "inside a block or not", it records which block is active and whether any
/// output has started for it. That extra state prevents "glued text" bugs, where
/// a block boundary is crossed without proper finalization.
///
/// The default value is [`ContentBlockState::NotInBlock`].
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub enum ContentBlockState {
    /// No content block is currently open.
    #[default]
    NotInBlock,
    /// A content block is open; its output state is tracked in the fields.
    InBlock {
        /// Index/identifier of the open block.
        index: String,
        /// `true` once any content has been output for this block.
        started_output: bool,
    },
}
136}