Skip to main content

axon/
stream_runtime.rs

1//! Runtime implementation of `Stream<T>` with the four closed
2//! backpressure policies from [`crate::stream_effect`].
3//!
4//! §λ-L-E Fase 11.a — Temporal Algebraic Effects.
5//!
6//! A [`Stream<T>`] is a bounded async channel with an explicit
7//! backpressure handler. Every producer push goes through the
8//! policy: the policy is not a fallback but the primary control
9//! point, which is why the compiler makes it mandatory.
10//!
11//! Naming is intentionally generic. The existing [`crate::runtime`]
12//! tree keeps LLM-token streaming (§semantic streaming with
13//! epistemic gradient) separate — that's higher-level, cognitive.
14//! `Stream<T>` is the byte/frame/event-level primitive that
15//! websocket ingress, microphone taps, and file uploads land into.
16
17use std::collections::VecDeque;
18use std::sync::atomic::{AtomicU64, Ordering};
19use std::sync::Arc;
20use tokio::sync::{Mutex, Notify};
21
22use crate::stream_effect::{BackpressureAnnotation, BackpressurePolicy};
23
24// ── Policy-specific inputs ───────────────────────────────────────────
25
26/// Pure (effect-free) degradation function the runtime applies when
27/// the [`BackpressurePolicy::DegradeQuality`] policy fires. The
28/// function MUST be deterministic — the checker rejects impure
29/// degraders at compile time.
30pub type DegradationFn<T> = Arc<dyn Fn(T) -> T + Send + Sync>;
31
32/// Error surfaced to a producer whose push was dropped by [`Fail`]
33/// or which timed out waiting for [`PauseUpstream`]. Also the error
34/// a consumer sees when the stream was cancelled.
35#[derive(Debug)]
36pub enum StreamError {
37    /// [`BackpressurePolicy::Fail`] triggered — buffer was full.
38    Overflow {
39        policy: BackpressurePolicy,
40        buffer_capacity: usize,
41    },
42    /// The stream was cancelled (producer dropped, consumer gave up).
43    Cancelled,
44    /// Policy is `DegradeQuality` but no degrader was attached.
45    MissingDegrader,
46}
47
48impl std::fmt::Display for StreamError {
49    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
50        match self {
51            Self::Overflow {
52                policy,
53                buffer_capacity,
54            } => write!(
55                f,
56                "stream overflow under policy {policy} (capacity={buffer_capacity})"
57            ),
58            Self::Cancelled => write!(f, "stream cancelled"),
59            Self::MissingDegrader => write!(
60                f,
61                "DegradeQuality policy requires a degrader function; none attached"
62            ),
63        }
64    }
65}
66
67impl std::error::Error for StreamError {}
68
69// ── Metrics (counters only — tenant_id tags are out of scope for
70//                  the primitive; adopters wrap and tag per-tenant) ──
71
72#[derive(Debug, Default)]
73pub struct StreamMetrics {
74    pub items_pushed: AtomicU64,
75    pub items_delivered: AtomicU64,
76    pub drop_oldest_hits: AtomicU64,
77    pub degrade_quality_hits: AtomicU64,
78    pub pause_upstream_blocks: AtomicU64,
79    pub fail_overflows: AtomicU64,
80}
81
82impl StreamMetrics {
83    pub fn snapshot(&self) -> StreamMetricsSnapshot {
84        StreamMetricsSnapshot {
85            items_pushed: self.items_pushed.load(Ordering::Relaxed),
86            items_delivered: self.items_delivered.load(Ordering::Relaxed),
87            drop_oldest_hits: self.drop_oldest_hits.load(Ordering::Relaxed),
88            degrade_quality_hits: self
89                .degrade_quality_hits
90                .load(Ordering::Relaxed),
91            pause_upstream_blocks: self
92                .pause_upstream_blocks
93                .load(Ordering::Relaxed),
94            fail_overflows: self.fail_overflows.load(Ordering::Relaxed),
95        }
96    }
97}
98
99#[derive(Debug, Clone, PartialEq, Eq)]
100pub struct StreamMetricsSnapshot {
101    pub items_pushed: u64,
102    pub items_delivered: u64,
103    pub drop_oldest_hits: u64,
104    pub degrade_quality_hits: u64,
105    pub pause_upstream_blocks: u64,
106    pub fail_overflows: u64,
107}
108
109// ── The stream itself ────────────────────────────────────────────────
110
111struct Inner<T> {
112    buffer: VecDeque<T>,
113    capacity: usize,
114    closed: bool,
115}
116
117/// Bounded async stream with a declared backpressure policy.
118/// Clone-able handle — producer and consumer hold the same `Arc`.
119pub struct Stream<T> {
120    inner: Arc<Mutex<Inner<T>>>,
121    not_empty: Arc<Notify>,
122    not_full: Arc<Notify>,
123    policy: BackpressurePolicy,
124    annotation: BackpressureAnnotation,
125    degrader: Option<DegradationFn<T>>,
126    pub metrics: Arc<StreamMetrics>,
127}
128
129impl<T> Clone for Stream<T> {
130    fn clone(&self) -> Self {
131        Stream {
132            inner: Arc::clone(&self.inner),
133            not_empty: Arc::clone(&self.not_empty),
134            not_full: Arc::clone(&self.not_full),
135            policy: self.policy,
136            annotation: self.annotation.clone(),
137            degrader: self.degrader.clone(),
138            metrics: Arc::clone(&self.metrics),
139        }
140    }
141}
142
143impl<T: Send + 'static> Stream<T> {
144    /// Build a stream whose policy is `DropOldest` / `PauseUpstream` /
145    /// `Fail`. Use [`Stream::with_degrader`] for `DegradeQuality`.
146    pub fn new(capacity: usize, annotation: BackpressureAnnotation) -> Self {
147        Self {
148            inner: Arc::new(Mutex::new(Inner {
149                buffer: VecDeque::with_capacity(capacity),
150                capacity,
151                closed: false,
152            })),
153            not_empty: Arc::new(Notify::new()),
154            not_full: Arc::new(Notify::new()),
155            policy: annotation.policy,
156            annotation,
157            degrader: None,
158            metrics: Arc::new(StreamMetrics::default()),
159        }
160    }
161
162    /// Build a `DegradeQuality` stream with the mandatory degrader.
163    pub fn with_degrader(
164        capacity: usize,
165        annotation: BackpressureAnnotation,
166        degrader: DegradationFn<T>,
167    ) -> Self {
168        let mut s = Self::new(capacity, annotation);
169        s.degrader = Some(degrader);
170        s
171    }
172
173    pub fn policy(&self) -> BackpressurePolicy {
174        self.policy
175    }
176
177    pub fn annotation(&self) -> &BackpressureAnnotation {
178        &self.annotation
179    }
180
181    /// Push an item. Dispatches to the declared policy.
182    pub async fn push(&self, item: T) -> Result<(), StreamError> {
183        self.metrics.items_pushed.fetch_add(1, Ordering::Relaxed);
184        match self.policy {
185            BackpressurePolicy::DropOldest => self.push_drop_oldest(item).await,
186            BackpressurePolicy::DegradeQuality => {
187                self.push_degrade_quality(item).await
188            }
189            BackpressurePolicy::PauseUpstream => {
190                self.push_pause_upstream(item).await
191            }
192            BackpressurePolicy::Fail => self.push_fail(item).await,
193        }
194    }
195
196    async fn push_drop_oldest(&self, item: T) -> Result<(), StreamError> {
197        let mut g = self.inner.lock().await;
198        if g.closed {
199            return Err(StreamError::Cancelled);
200        }
201        if g.buffer.len() >= g.capacity {
202            g.buffer.pop_front();
203            self.metrics
204                .drop_oldest_hits
205                .fetch_add(1, Ordering::Relaxed);
206        }
207        g.buffer.push_back(item);
208        self.not_empty.notify_one();
209        Ok(())
210    }
211
212    async fn push_degrade_quality(&self, item: T) -> Result<(), StreamError> {
213        let mut g = self.inner.lock().await;
214        if g.closed {
215            return Err(StreamError::Cancelled);
216        }
217        let value = if g.buffer.len() >= g.capacity {
218            // §Fase 12.c — the degrader lookup belongs in the overflow
219            // branch, not at function entry. A `Stream::new` built
220            // without `with_degrader` must still accept pushes while
221            // the buffer has room; only when we actually need to
222            // degrade an incoming item is the absence of a degrader
223            // a policy violation. Failing closed early prevented
224            // even the first push from succeeding.
225            let degrader = self
226                .degrader
227                .as_ref()
228                .ok_or(StreamError::MissingDegrader)?
229                .clone();
230            self.metrics
231                .degrade_quality_hits
232                .fetch_add(1, Ordering::Relaxed);
233            // Drop oldest degraded + push new item at degraded
234            // quality — caller decides whether "degraded" means
235            // lower bitrate, coarser resolution, etc.
236            g.buffer.pop_front();
237            degrader(item)
238        } else {
239            item
240        };
241        g.buffer.push_back(value);
242        self.not_empty.notify_one();
243        Ok(())
244    }
245
246    async fn push_pause_upstream(&self, item: T) -> Result<(), StreamError> {
247        loop {
248            {
249                let mut g = self.inner.lock().await;
250                if g.closed {
251                    return Err(StreamError::Cancelled);
252                }
253                if g.buffer.len() < g.capacity {
254                    g.buffer.push_back(item);
255                    self.not_empty.notify_one();
256                    return Ok(());
257                }
258            }
259            self.metrics
260                .pause_upstream_blocks
261                .fetch_add(1, Ordering::Relaxed);
262            self.not_full.notified().await;
263        }
264    }
265
266    async fn push_fail(&self, item: T) -> Result<(), StreamError> {
267        let mut g = self.inner.lock().await;
268        if g.closed {
269            return Err(StreamError::Cancelled);
270        }
271        if g.buffer.len() >= g.capacity {
272            self.metrics
273                .fail_overflows
274                .fetch_add(1, Ordering::Relaxed);
275            return Err(StreamError::Overflow {
276                policy: BackpressurePolicy::Fail,
277                buffer_capacity: g.capacity,
278            });
279        }
280        g.buffer.push_back(item);
281        self.not_empty.notify_one();
282        Ok(())
283    }
284
285    /// Pull the next item. Returns `None` when the stream is closed
286    /// AND the buffer drained — analogous to a closed channel.
287    pub async fn pop(&self) -> Option<T> {
288        loop {
289            {
290                let mut g = self.inner.lock().await;
291                if let Some(item) = g.buffer.pop_front() {
292                    self.not_full.notify_one();
293                    self.metrics
294                        .items_delivered
295                        .fetch_add(1, Ordering::Relaxed);
296                    return Some(item);
297                }
298                if g.closed {
299                    return None;
300                }
301            }
302            self.not_empty.notified().await;
303        }
304    }
305
306    /// Signal end-of-stream. Wakes pending consumers; idempotent.
307    pub async fn close(&self) {
308        let mut g = self.inner.lock().await;
309        g.closed = true;
310        drop(g);
311        self.not_empty.notify_waiters();
312        self.not_full.notify_waiters();
313    }
314
315    /// Current buffer depth. Snapshot only — use for dashboards,
316    /// never for flow-control (race with any concurrent push).
317    pub async fn depth(&self) -> usize {
318        self.inner.lock().await.buffer.len()
319    }
320}
321
322#[cfg(test)]
323mod tests {
324    use super::*;
325    use crate::stream_effect::parse_backpressure_annotation;
326
327    fn annotation(slug: &str) -> BackpressureAnnotation {
328        parse_backpressure_annotation(slug).expect("valid slug")
329    }
330
331    #[tokio::test]
332    async fn drop_oldest_replaces_oldest_under_pressure() {
333        let s: Stream<i32> = Stream::new(2, annotation("drop_oldest"));
334        s.push(1).await.unwrap();
335        s.push(2).await.unwrap();
336        s.push(3).await.unwrap(); // triggers drop_oldest -> removes 1
337        assert_eq!(s.pop().await, Some(2));
338        assert_eq!(s.pop().await, Some(3));
339        let m = s.metrics.snapshot();
340        assert_eq!(m.drop_oldest_hits, 1);
341        assert_eq!(m.items_pushed, 3);
342        assert_eq!(m.items_delivered, 2);
343    }
344
345    #[tokio::test]
346    async fn degrade_quality_applies_degrader() {
347        let degrader: DegradationFn<i32> = Arc::new(|x| x / 2);
348        let s: Stream<i32> = Stream::with_degrader(
349            2,
350            annotation("degrade_quality"),
351            degrader,
352        );
353        s.push(100).await.unwrap();
354        s.push(200).await.unwrap();
355        s.push(300).await.unwrap(); // overflow → 300/2 pushed
356        assert_eq!(s.pop().await, Some(200));
357        assert_eq!(s.pop().await, Some(150));
358        let m = s.metrics.snapshot();
359        assert_eq!(m.degrade_quality_hits, 1);
360    }
361
362    #[tokio::test]
363    async fn degrade_quality_without_degrader_errors() {
364        let s: Stream<i32> = Stream::new(1, annotation("degrade_quality"));
365        s.push(1).await.unwrap();
366        // Second push overflows; without a degrader the policy fails
367        // closed with MissingDegrader.
368        let err = s.push(2).await.unwrap_err();
369        matches!(err, StreamError::MissingDegrader);
370    }
371
372    #[tokio::test]
373    async fn pause_upstream_blocks_until_consumer_drains() {
374        let s: Stream<i32> = Stream::new(1, annotation("pause_upstream"));
375        s.push(1).await.unwrap();
376
377        // Spawn a consumer that pops after a short delay.
378        let consumer = {
379            let s = s.clone();
380            tokio::spawn(async move {
381                tokio::time::sleep(std::time::Duration::from_millis(50)).await;
382                s.pop().await
383            })
384        };
385
386        // Producer push blocks until consumer drains.
387        s.push(2).await.unwrap();
388        assert_eq!(consumer.await.unwrap(), Some(1));
389        assert_eq!(s.pop().await, Some(2));
390        let m = s.metrics.snapshot();
391        assert!(m.pause_upstream_blocks >= 1);
392    }
393
394    #[tokio::test]
395    async fn fail_policy_errors_on_overflow() {
396        let s: Stream<i32> = Stream::new(1, annotation("fail"));
397        s.push(1).await.unwrap();
398        let err = s.push(2).await.unwrap_err();
399        match err {
400            StreamError::Overflow {
401                policy,
402                buffer_capacity,
403            } => {
404                assert_eq!(policy, BackpressurePolicy::Fail);
405                assert_eq!(buffer_capacity, 1);
406            }
407            other => panic!("expected Overflow, got {other:?}"),
408        }
409        let m = s.metrics.snapshot();
410        assert_eq!(m.fail_overflows, 1);
411    }
412
413    #[tokio::test]
414    async fn close_drains_buffer_then_signals_end() {
415        let s: Stream<i32> = Stream::new(4, annotation("fail"));
416        s.push(1).await.unwrap();
417        s.push(2).await.unwrap();
418        s.close().await;
419        assert_eq!(s.pop().await, Some(1));
420        assert_eq!(s.pop().await, Some(2));
421        assert_eq!(s.pop().await, None); // closed + drained
422    }
423
424    #[tokio::test]
425    async fn push_after_close_errors() {
426        let s: Stream<i32> = Stream::new(4, annotation("fail"));
427        s.close().await;
428        let err = s.push(99).await.unwrap_err();
429        matches!(err, StreamError::Cancelled);
430    }
431}