axon_frontend/stream_effect.rs
1//! `Stream<T>` — temporal algebraic effect with mandatory backpressure.
2//!
//! §λ-L-E Fase 11.a — every `Stream<T>` declaration MUST carry a
//! [`BackpressurePolicy`] annotation. The checker rejects any flow
//! that declares a stream-valued parameter or return without one.
//! The rationale is operational: a reactive stream without an
//! explicit backpressure strategy silently drops or fails under
//! load, and that's exactly the class of incident the type system
//! exists to prevent.
10//!
11//! The catalogue of policies is **closed** at the compiler level.
12//! Adding a new one requires a compiler patch — we don't want an
13//! adopter to invent "retry_forever" and starve the rest of the
14//! runtime. Custom *composition* of the four primitives is fine and
15//! lives outside this module (runtime combinators).
16
17use std::fmt;
18
19// ── Closed catalogue of backpressure policies ────────────────────────
20
/// Closed set of strategies applied when a producer outpaces its
/// consumer. The runtime side of each strategy lives in
/// [`crate::stream_runtime`].
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum BackpressurePolicy {
    /// Evict the oldest buffered item so a fresh one fits. Meant for
    /// telemetry-style streams where only the most recent values
    /// matter.
    DropOldest,
    /// Run a pure degradation function (for example, resampling audio
    /// to a lower bitrate) so the downstream consumer still receives
    /// every frame, at reduced fidelity. The degrader is declared with
    /// the `degrade_quality(resample_to=8000)` syntax.
    DegradeQuality,
    /// Stall the producer until the buffer has drained. Fine for
    /// request/response flows; MUST NOT be used on real-time ingest
    /// paths (microphones, market data), where it hangs the source.
    PauseUpstream,
    /// Raise an error and cancel the stream, making saturation an
    /// explicit failure mode the caller has to handle. This is never
    /// an implicit default: a `Stream<T>` with no declared policy does
    /// not fall back to `Fail`, it fails to compile.
    Fail,
}
44
45impl BackpressurePolicy {
46 /// Every variant. Explicit slice so adding a policy without
47 /// updating consumers is a compile error.
48 pub const ALL: &'static [BackpressurePolicy] = &[
49 BackpressurePolicy::DropOldest,
50 BackpressurePolicy::DegradeQuality,
51 BackpressurePolicy::PauseUpstream,
52 BackpressurePolicy::Fail,
53 ];
54
55 pub fn slug(self) -> &'static str {
56 match self {
57 BackpressurePolicy::DropOldest => "drop_oldest",
58 BackpressurePolicy::DegradeQuality => "degrade_quality",
59 BackpressurePolicy::PauseUpstream => "pause_upstream",
60 BackpressurePolicy::Fail => "fail",
61 }
62 }
63
64 pub fn from_slug(slug: &str) -> Option<BackpressurePolicy> {
65 Self::ALL.iter().copied().find(|p| p.slug() == slug)
66 }
67}
68
69impl fmt::Display for BackpressurePolicy {
70 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
71 f.write_str(self.slug())
72 }
73}
74
/// Slugs of the backpressure policies, for checker diagnostics.
///
/// Written out by hand rather than derived from the enum, so it has to
/// be kept in step with the policy slugs manually.
pub const BACKPRESSURE_CATALOG: &[&str] = &[
    "drop_oldest",
    "degrade_quality",
    "pause_upstream",
    "fail",
];
78
79// ── Stream type constructor ──────────────────────────────────────────
80
/// Name of the stream type constructor exactly as written in source,
/// i.e. the `Stream` in `Stream<Bytes>` or `Stream<AudioFrame>`.
pub const STREAM_TYPE_CTOR: &str = "Stream";
84
85/// True when the given type name denotes a stream.
86pub fn is_stream_type(name: &str) -> bool {
87 name == STREAM_TYPE_CTOR
88}
89
90// ── Annotation parsing ───────────────────────────────────────────────
91
92/// A `@backpressure(policy, ...options)` annotation attached to the
93/// flow or tool that owns the stream. `options` forwards to the
94/// policy runtime (e.g. `buffer_size=128`, `degrade_quality(resample
95/// _to=8000)` → `options=[(resample_to, 8000)]`).
96#[derive(Debug, Clone, PartialEq, Eq)]
97pub struct BackpressureAnnotation {
98 pub policy: BackpressurePolicy,
99 pub options: Vec<(String, String)>,
100}
101
102/// Parse a backpressure annotation body. Returns `None` if the policy
103/// slug is unknown so the checker emits a targeted diagnostic.
104pub fn parse_backpressure_annotation(body: &str) -> Option<BackpressureAnnotation> {
105 let mut parts = body.split(',').map(|p| p.trim());
106 let policy_slug = parts.next()?.trim();
107 let policy = BackpressurePolicy::from_slug(policy_slug)?;
108
109 let mut options = Vec::new();
110 for raw in parts {
111 if raw.is_empty() {
112 continue;
113 }
114 let (k, v) = raw.split_once('=')?;
115 options.push((k.trim().to_string(), v.trim().to_string()));
116 }
117 Some(BackpressureAnnotation { policy, options })
118}
119
120// ── Effect surface integration ───────────────────────────────────────
121
/// Slug under which the stream effect appears in the existing
/// `VALID_EFFECTS` catalogue. A tool that declares `effects: [stream]`
/// states that it produces or consumes a `Stream<T>`, which makes a
/// backpressure handler mandatory on every flow wired through it.
pub const STREAM_EFFECT_SLUG: &str = "stream";
127
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an owned `(key, value)` option pair for comparisons.
    fn pair(key: &str, value: &str) -> (String, String) {
        (key.to_string(), value.to_string())
    }

    /// Each policy maps to a slug that maps back to the same policy and
    /// is listed in the diagnostics catalogue; both lists have the
    /// same length.
    #[test]
    fn slug_roundtrip_covers_closed_catalog() {
        assert_eq!(BackpressurePolicy::ALL.len(), BACKPRESSURE_CATALOG.len());
        for &policy in BackpressurePolicy::ALL {
            let slug = policy.slug();
            assert_eq!(Some(policy), BackpressurePolicy::from_slug(slug));
            assert!(BACKPRESSURE_CATALOG.contains(&slug));
        }
    }

    /// Slugs outside the catalogue, including the empty string, do not
    /// resolve to a policy.
    #[test]
    fn unknown_policy_slug_rejected() {
        for bad in &["retry_forever", ""] {
            assert_eq!(BackpressurePolicy::from_slug(bad), None);
        }
    }

    /// Only the exact, case-sensitive constructor name is a stream type.
    #[test]
    fn stream_type_recognised() {
        assert!(is_stream_type("Stream"));
        for other in &["stream", "Iterator"] {
            assert!(!is_stream_type(other));
        }
    }

    /// A bare policy slug parses to that policy with no options.
    #[test]
    fn parse_annotation_minimal() {
        let BackpressureAnnotation { policy, options } =
            parse_backpressure_annotation("drop_oldest").unwrap();
        assert_eq!(policy, BackpressurePolicy::DropOldest);
        assert!(options.is_empty());
    }

    /// Options after the policy are returned as trimmed pairs in
    /// source order.
    #[test]
    fn parse_annotation_with_options() {
        let BackpressureAnnotation { policy, options } =
            parse_backpressure_annotation("degrade_quality, resample_to=8000, codec=mulaw")
                .unwrap();
        assert_eq!(policy, BackpressurePolicy::DegradeQuality);
        assert_eq!(
            options,
            vec![pair("resample_to", "8000"), pair("codec", "mulaw")]
        );
    }

    /// An option without `=` and an unknown policy slug both fail to
    /// parse.
    #[test]
    fn parse_annotation_rejects_malformed() {
        for body in &["drop_oldest, no_equals", "bogus_policy"] {
            assert_eq!(parse_backpressure_annotation(body), None);
        }
    }

    /// Pins the effect slug value.
    #[test]
    fn stream_effect_slug_is_stable() {
        assert_eq!("stream", STREAM_EFFECT_SLUG);
    }
}