Skip to main content

axon_frontend/
stream_effect.rs

1//! `Stream<T>` — temporal algebraic effect with mandatory backpressure.
2//!
3//! §λ-L-E Fase 11.a — every `Stream<T>` declaration MUST carry a
4//! [`BackpressurePolicy`] annotation. The checker rejects any flow
5//! that declares a stream-valued parameter or return without one.
//! The rationale is operational: a reactive stream without an
//! explicit backpressure strategy silently drops or fails under
//! load, and that's exactly the class of incident the type system
//! exists to prevent.
10//!
11//! The catalogue of policies is **closed** at the compiler level.
12//! Adding a new one requires a compiler patch — we don't want an
13//! adopter to invent "retry_forever" and starve the rest of the
14//! runtime. Custom *composition* of the four primitives is fine and
15//! lives outside this module (runtime combinators).
16
17use std::fmt;
18
19// ── Closed catalogue of backpressure policies ────────────────────────
20
/// Closed set of reactions available when a stream's producer outpaces
/// its consumer. The runtime implementations live in
/// [`crate::stream_runtime`].
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum BackpressurePolicy {
    /// Evict the oldest buffered item so the newest one fits. Suited to
    /// telemetry-style streams where only the most recent value matters.
    DropOldest,
    /// Run items through a pure degradation function (for example,
    /// resampling audio to a lower bitrate) so the downstream consumer
    /// still receives every frame, at reduced fidelity. Declared with
    /// the `degrade_quality(resample_to=8000)` syntax.
    DegradeQuality,
    /// Stall the producer until the buffer has drained. Fine for
    /// request/response flows; MUST NOT be applied to real-time ingest
    /// paths (microphones, market data), where it hangs the source.
    PauseUpstream,
    /// Raise an error and cancel the stream, turning saturation into an
    /// explicit failure the caller has to handle. This is never an
    /// implicit default: a `Stream<T>` declared without a policy does
    /// not fall back to `Fail`; it fails to compile.
    Fail,
}
44
45impl BackpressurePolicy {
46    /// Every variant. Explicit slice so adding a policy without
47    /// updating consumers is a compile error.
48    pub const ALL: &'static [BackpressurePolicy] = &[
49        BackpressurePolicy::DropOldest,
50        BackpressurePolicy::DegradeQuality,
51        BackpressurePolicy::PauseUpstream,
52        BackpressurePolicy::Fail,
53    ];
54
55    pub fn slug(self) -> &'static str {
56        match self {
57            BackpressurePolicy::DropOldest => "drop_oldest",
58            BackpressurePolicy::DegradeQuality => "degrade_quality",
59            BackpressurePolicy::PauseUpstream => "pause_upstream",
60            BackpressurePolicy::Fail => "fail",
61        }
62    }
63
64    pub fn from_slug(slug: &str) -> Option<BackpressurePolicy> {
65        Self::ALL.iter().copied().find(|p| p.slug() == slug)
66    }
67}
68
69impl fmt::Display for BackpressurePolicy {
70    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
71        f.write_str(self.slug())
72    }
73}
74
/// Slugs of the backpressure policies, for lookup by checker
/// diagnostics.
///
/// The entries mirror the strings returned by `BackpressurePolicy::slug`
/// and are written out by hand, so the two must be updated together.
pub const BACKPRESSURE_CATALOG: &[&str] = &[
    "drop_oldest",
    "degrade_quality",
    "pause_upstream",
    "fail",
];
78
79// ── Stream type constructor ──────────────────────────────────────────
80
/// Name of the stream type constructor exactly as it is written in
/// source — the `Stream` of `Stream<Bytes>` or `Stream<AudioFrame>`.
pub const STREAM_TYPE_CTOR: &str = "Stream";

/// Reports whether `name` denotes the stream type constructor.
///
/// This is an exact, case-sensitive comparison against
/// [`STREAM_TYPE_CTOR`]; the name is not normalised and generic
/// arguments are not stripped, so pass the bare constructor name.
pub fn is_stream_type(name: &str) -> bool {
    matches!(name, STREAM_TYPE_CTOR)
}
89
90// ── Annotation parsing ───────────────────────────────────────────────
91
92/// A `@backpressure(policy, ...options)` annotation attached to the
93/// flow or tool that owns the stream. `options` forwards to the
94/// policy runtime (e.g. `buffer_size=128`, `degrade_quality(resample
95/// _to=8000)` → `options=[(resample_to, 8000)]`).
96#[derive(Debug, Clone, PartialEq, Eq)]
97pub struct BackpressureAnnotation {
98    pub policy: BackpressurePolicy,
99    pub options: Vec<(String, String)>,
100}
101
102/// Parse a backpressure annotation body. Returns `None` if the policy
103/// slug is unknown so the checker emits a targeted diagnostic.
104pub fn parse_backpressure_annotation(body: &str) -> Option<BackpressureAnnotation> {
105    let mut parts = body.split(',').map(|p| p.trim());
106    let policy_slug = parts.next()?.trim();
107    let policy = BackpressurePolicy::from_slug(policy_slug)?;
108
109    let mut options = Vec::new();
110    for raw in parts {
111        if raw.is_empty() {
112            continue;
113        }
114        let (k, v) = raw.split_once('=')?;
115        options.push((k.trim().to_string(), v.trim().to_string()));
116    }
117    Some(BackpressureAnnotation { policy, options })
118}
119
120// ── Effect surface integration ───────────────────────────────────────
121
/// Slug under which the stream effect is listed in the existing
/// `VALID_EFFECTS` catalogue.
///
/// A tool that declares `effects: [stream]` is stating that it
/// produces or consumes a `Stream<T>`; every flow wired through such a
/// tool is therefore required to carry a backpressure handler.
pub const STREAM_EFFECT_SLUG: &str = "stream";
127
#[cfg(test)]
mod tests {
    use super::*;

    /// Every policy round-trips through its slug and is listed in the
    /// diagnostic catalogue, which holds no extra entries.
    #[test]
    fn slug_roundtrip_covers_closed_catalog() {
        assert_eq!(BACKPRESSURE_CATALOG.len(), BackpressurePolicy::ALL.len());
        for &policy in BackpressurePolicy::ALL {
            let slug = policy.slug();
            assert!(BACKPRESSURE_CATALOG.contains(&slug));
            assert_eq!(BackpressurePolicy::from_slug(slug), Some(policy));
        }
    }

    /// Slugs outside the closed catalogue, including the empty string,
    /// do not resolve to a policy.
    #[test]
    fn unknown_policy_slug_rejected() {
        assert_eq!(BackpressurePolicy::from_slug("retry_forever"), None);
        assert_eq!(BackpressurePolicy::from_slug(""), None);
    }

    /// Only the exact, case-sensitive constructor name counts as a stream.
    #[test]
    fn stream_type_recognised() {
        let cases = [("Stream", true), ("stream", false), ("Iterator", false)];
        for (name, expected) in &cases {
            assert_eq!(is_stream_type(name), *expected, "type name: {:?}", name);
        }
    }

    /// A bare policy slug parses to that policy with no options.
    #[test]
    fn parse_annotation_minimal() {
        let parsed = parse_backpressure_annotation("drop_oldest");
        let expected = BackpressureAnnotation {
            policy: BackpressurePolicy::DropOldest,
            options: Vec::new(),
        };
        assert_eq!(parsed, Some(expected));
    }

    /// Options are returned as trimmed string pairs in source order.
    #[test]
    fn parse_annotation_with_options() {
        let parsed =
            parse_backpressure_annotation("degrade_quality, resample_to=8000, codec=mulaw")
                .expect("well-formed annotation should parse");
        assert_eq!(parsed.policy, BackpressurePolicy::DegradeQuality);

        let pairs: Vec<(&str, &str)> = parsed
            .options
            .iter()
            .map(|(key, value)| (key.as_str(), value.as_str()))
            .collect();
        assert_eq!(pairs, vec![("resample_to", "8000"), ("codec", "mulaw")]);
    }

    /// An option without `=` and an unknown policy slug are both rejected.
    #[test]
    fn parse_annotation_rejects_malformed() {
        assert_eq!(parse_backpressure_annotation("drop_oldest, no_equals"), None);
        assert_eq!(parse_backpressure_annotation("bogus_policy"), None);
    }

    /// The effect slug is a fixed string; this pins its value.
    #[test]
    fn stream_effect_slug_is_stable() {
        assert_eq!("stream", STREAM_EFFECT_SLUG);
    }
}