1use std::collections::VecDeque;
18use std::sync::atomic::{AtomicU64, Ordering};
19use std::sync::Arc;
20use tokio::sync::{Mutex, Notify};
21
22use crate::stream_effect::{BackpressureAnnotation, BackpressurePolicy};
23
/// Shared, thread-safe callback that maps an item to a replacement of the same type.
///
/// A [`Stream`] running the `DegradeQuality` policy calls it on the incoming item
/// when the buffer is already at capacity. It is stored behind an `Arc`, so cloning
/// a stream shares the callback rather than copying it.
pub type DegradationFn<T> = Arc<dyn Fn(T) -> T + Send + Sync>;
31
/// Errors reported by [`Stream::push`].
#[derive(Debug)]
pub enum StreamError {
    /// The buffer was at capacity and the active policy rejects the item
    /// instead of making room for it.
    Overflow {
        /// Policy that was in effect when the overflow was reported.
        policy: BackpressurePolicy,
        /// Configured capacity of the buffer that overflowed.
        buffer_capacity: usize,
    },
    /// The stream had already been closed when the push was attempted.
    Cancelled,
    /// The `DegradeQuality` policy needed to degrade an item but the stream
    /// was built without a degrader function.
    MissingDegrader,
}
47
48impl std::fmt::Display for StreamError {
49 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
50 match self {
51 Self::Overflow {
52 policy,
53 buffer_capacity,
54 } => write!(
55 f,
56 "stream overflow under policy {policy} (capacity={buffer_capacity})"
57 ),
58 Self::Cancelled => write!(f, "stream cancelled"),
59 Self::MissingDegrader => write!(
60 f,
61 "DegradeQuality policy requires a degrader function; none attached"
62 ),
63 }
64 }
65}
66
// Marker impl: no trait methods are overridden, so the trait's defaults apply.
impl std::error::Error for StreamError {}
68
/// Atomic event counters shared by every clone of a stream.
///
/// All counters start at zero (via `Default`) and are only ever read here;
/// the stream's push/pop paths are responsible for incrementing them.
#[derive(Debug, Default)]
pub struct StreamMetrics {
    /// Number of push calls made.
    pub items_pushed: AtomicU64,
    /// Number of items handed out by pop.
    pub items_delivered: AtomicU64,
    /// Number of pushes that hit a full buffer under the drop-oldest policy.
    pub drop_oldest_hits: AtomicU64,
    /// Number of pushes that hit a full buffer under the degrade-quality policy.
    pub degrade_quality_hits: AtomicU64,
    /// Number of times a producer had to wait under the pause-upstream policy.
    pub pause_upstream_blocks: AtomicU64,
    /// Number of pushes rejected under the fail policy.
    pub fail_overflows: AtomicU64,
}

impl StreamMetrics {
    /// Copies the current counter values into a plain-data snapshot.
    ///
    /// Each counter is read separately with relaxed ordering, so while other
    /// tasks are updating the counters the fields of one snapshot are not
    /// guaranteed to describe a single instant.
    pub fn snapshot(&self) -> StreamMetricsSnapshot {
        let read = |counter: &AtomicU64| counter.load(Ordering::Relaxed);

        StreamMetricsSnapshot {
            items_pushed: read(&self.items_pushed),
            items_delivered: read(&self.items_delivered),
            drop_oldest_hits: read(&self.drop_oldest_hits),
            degrade_quality_hits: read(&self.degrade_quality_hits),
            pause_upstream_blocks: read(&self.pause_upstream_blocks),
            fail_overflows: read(&self.fail_overflows),
        }
    }
}

/// Point-in-time copy of [`StreamMetrics`] as ordinary integers, suitable for
/// comparison and logging.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StreamMetricsSnapshot {
    /// Value of `StreamMetrics::items_pushed` when the snapshot was taken.
    pub items_pushed: u64,
    /// Value of `StreamMetrics::items_delivered` when the snapshot was taken.
    pub items_delivered: u64,
    /// Value of `StreamMetrics::drop_oldest_hits` when the snapshot was taken.
    pub drop_oldest_hits: u64,
    /// Value of `StreamMetrics::degrade_quality_hits` when the snapshot was taken.
    pub degrade_quality_hits: u64,
    /// Value of `StreamMetrics::pause_upstream_blocks` when the snapshot was taken.
    pub pause_upstream_blocks: u64,
    /// Value of `StreamMetrics::fail_overflows` when the snapshot was taken.
    pub fail_overflows: u64,
}
108
/// Mutable queue state; `Stream` keeps it behind an `Arc<Mutex<..>>`.
struct Inner<T> {
    /// Queued items; producers append at the back, consumers take from the front.
    buffer: VecDeque<T>,
    /// Length at which the push paths consider the buffer full.
    capacity: usize,
    /// Set once the stream has been closed; pushes are rejected afterwards.
    closed: bool,
}
116
/// Bounded async queue whose behaviour on a full buffer is chosen by a
/// [`BackpressurePolicy`].
///
/// The handle is cheap to clone: clones share the buffer, the wake-up
/// primitives and the metrics.
pub struct Stream<T> {
    /// Shared queue state (buffer, capacity, closed flag).
    inner: Arc<Mutex<Inner<T>>>,
    /// Signalled after an item is enqueued and on close; `pop` waits on it.
    not_empty: Arc<Notify>,
    /// Signalled after an item is dequeued and on close; the pause-upstream
    /// push path waits on it.
    not_full: Arc<Notify>,
    /// Policy taken from `annotation.policy` at construction time.
    policy: BackpressurePolicy,
    /// The annotation this stream was built from.
    annotation: BackpressureAnnotation,
    /// Optional item degrader, consulted by the degrade-quality push path.
    degrader: Option<DegradationFn<T>>,
    /// Counters describing this stream's activity, shared across clones.
    pub metrics: Arc<StreamMetrics>,
}
128
129impl<T> Clone for Stream<T> {
130 fn clone(&self) -> Self {
131 Stream {
132 inner: Arc::clone(&self.inner),
133 not_empty: Arc::clone(&self.not_empty),
134 not_full: Arc::clone(&self.not_full),
135 policy: self.policy,
136 annotation: self.annotation.clone(),
137 degrader: self.degrader.clone(),
138 metrics: Arc::clone(&self.metrics),
139 }
140 }
141}
142
143impl<T: Send + 'static> Stream<T> {
144 pub fn new(capacity: usize, annotation: BackpressureAnnotation) -> Self {
147 Self {
148 inner: Arc::new(Mutex::new(Inner {
149 buffer: VecDeque::with_capacity(capacity),
150 capacity,
151 closed: false,
152 })),
153 not_empty: Arc::new(Notify::new()),
154 not_full: Arc::new(Notify::new()),
155 policy: annotation.policy,
156 annotation,
157 degrader: None,
158 metrics: Arc::new(StreamMetrics::default()),
159 }
160 }
161
162 pub fn with_degrader(
164 capacity: usize,
165 annotation: BackpressureAnnotation,
166 degrader: DegradationFn<T>,
167 ) -> Self {
168 let mut s = Self::new(capacity, annotation);
169 s.degrader = Some(degrader);
170 s
171 }
172
173 pub fn policy(&self) -> BackpressurePolicy {
174 self.policy
175 }
176
177 pub fn annotation(&self) -> &BackpressureAnnotation {
178 &self.annotation
179 }
180
181 pub async fn push(&self, item: T) -> Result<(), StreamError> {
183 self.metrics.items_pushed.fetch_add(1, Ordering::Relaxed);
184 match self.policy {
185 BackpressurePolicy::DropOldest => self.push_drop_oldest(item).await,
186 BackpressurePolicy::DegradeQuality => {
187 self.push_degrade_quality(item).await
188 }
189 BackpressurePolicy::PauseUpstream => {
190 self.push_pause_upstream(item).await
191 }
192 BackpressurePolicy::Fail => self.push_fail(item).await,
193 }
194 }
195
196 async fn push_drop_oldest(&self, item: T) -> Result<(), StreamError> {
197 let mut g = self.inner.lock().await;
198 if g.closed {
199 return Err(StreamError::Cancelled);
200 }
201 if g.buffer.len() >= g.capacity {
202 g.buffer.pop_front();
203 self.metrics
204 .drop_oldest_hits
205 .fetch_add(1, Ordering::Relaxed);
206 }
207 g.buffer.push_back(item);
208 self.not_empty.notify_one();
209 Ok(())
210 }
211
212 async fn push_degrade_quality(&self, item: T) -> Result<(), StreamError> {
213 let mut g = self.inner.lock().await;
214 if g.closed {
215 return Err(StreamError::Cancelled);
216 }
217 let value = if g.buffer.len() >= g.capacity {
218 let degrader = self
226 .degrader
227 .as_ref()
228 .ok_or(StreamError::MissingDegrader)?
229 .clone();
230 self.metrics
231 .degrade_quality_hits
232 .fetch_add(1, Ordering::Relaxed);
233 g.buffer.pop_front();
237 degrader(item)
238 } else {
239 item
240 };
241 g.buffer.push_back(value);
242 self.not_empty.notify_one();
243 Ok(())
244 }
245
246 async fn push_pause_upstream(&self, item: T) -> Result<(), StreamError> {
247 loop {
248 {
249 let mut g = self.inner.lock().await;
250 if g.closed {
251 return Err(StreamError::Cancelled);
252 }
253 if g.buffer.len() < g.capacity {
254 g.buffer.push_back(item);
255 self.not_empty.notify_one();
256 return Ok(());
257 }
258 }
259 self.metrics
260 .pause_upstream_blocks
261 .fetch_add(1, Ordering::Relaxed);
262 self.not_full.notified().await;
263 }
264 }
265
266 async fn push_fail(&self, item: T) -> Result<(), StreamError> {
267 let mut g = self.inner.lock().await;
268 if g.closed {
269 return Err(StreamError::Cancelled);
270 }
271 if g.buffer.len() >= g.capacity {
272 self.metrics
273 .fail_overflows
274 .fetch_add(1, Ordering::Relaxed);
275 return Err(StreamError::Overflow {
276 policy: BackpressurePolicy::Fail,
277 buffer_capacity: g.capacity,
278 });
279 }
280 g.buffer.push_back(item);
281 self.not_empty.notify_one();
282 Ok(())
283 }
284
285 pub async fn pop(&self) -> Option<T> {
288 loop {
289 {
290 let mut g = self.inner.lock().await;
291 if let Some(item) = g.buffer.pop_front() {
292 self.not_full.notify_one();
293 self.metrics
294 .items_delivered
295 .fetch_add(1, Ordering::Relaxed);
296 return Some(item);
297 }
298 if g.closed {
299 return None;
300 }
301 }
302 self.not_empty.notified().await;
303 }
304 }
305
306 pub async fn close(&self) {
308 let mut g = self.inner.lock().await;
309 g.closed = true;
310 drop(g);
311 self.not_empty.notify_waiters();
312 self.not_full.notify_waiters();
313 }
314
315 pub async fn depth(&self) -> usize {
318 self.inner.lock().await.buffer.len()
319 }
320}
321
#[cfg(test)]
mod tests {
    use super::*;
    use crate::stream_effect::parse_backpressure_annotation;

    /// Builds an annotation from a policy slug, panicking on an invalid slug.
    fn annotation(slug: &str) -> BackpressureAnnotation {
        parse_backpressure_annotation(slug).expect("valid slug")
    }

    #[tokio::test]
    async fn drop_oldest_replaces_oldest_under_pressure() {
        let s: Stream<i32> = Stream::new(2, annotation("drop_oldest"));
        s.push(1).await.unwrap();
        s.push(2).await.unwrap();
        // Buffer is full: this push evicts `1`.
        s.push(3).await.unwrap();
        assert_eq!(s.pop().await, Some(2));
        assert_eq!(s.pop().await, Some(3));
        let m = s.metrics.snapshot();
        assert_eq!(m.drop_oldest_hits, 1);
        assert_eq!(m.items_pushed, 3);
        assert_eq!(m.items_delivered, 2);
    }

    #[tokio::test]
    async fn degrade_quality_applies_degrader() {
        let degrader: DegradationFn<i32> = Arc::new(|x| x / 2);
        let s: Stream<i32> = Stream::with_degrader(2, annotation("degrade_quality"), degrader);
        s.push(100).await.unwrap();
        s.push(200).await.unwrap();
        // Buffer is full: `100` is evicted and `300` is stored degraded.
        s.push(300).await.unwrap();
        assert_eq!(s.pop().await, Some(200));
        assert_eq!(s.pop().await, Some(150));
        let m = s.metrics.snapshot();
        assert_eq!(m.degrade_quality_hits, 1);
    }

    #[tokio::test]
    async fn degrade_quality_without_degrader_errors() {
        let s: Stream<i32> = Stream::new(1, annotation("degrade_quality"));
        s.push(1).await.unwrap();
        let err = s.push(2).await.unwrap_err();
        // `matches!` only yields a bool; it must be asserted to check anything.
        assert!(
            matches!(err, StreamError::MissingDegrader),
            "expected MissingDegrader, got {err:?}"
        );
    }

    #[tokio::test]
    async fn pause_upstream_blocks_until_consumer_drains() {
        let s: Stream<i32> = Stream::new(1, annotation("pause_upstream"));
        s.push(1).await.unwrap();

        let consumer = {
            let s = s.clone();
            tokio::spawn(async move {
                tokio::time::sleep(std::time::Duration::from_millis(50)).await;
                s.pop().await
            })
        };

        // Cannot complete until the consumer above frees a slot.
        s.push(2).await.unwrap();
        assert_eq!(consumer.await.unwrap(), Some(1));
        assert_eq!(s.pop().await, Some(2));
        let m = s.metrics.snapshot();
        assert!(m.pause_upstream_blocks >= 1);
    }

    #[tokio::test]
    async fn fail_policy_errors_on_overflow() {
        let s: Stream<i32> = Stream::new(1, annotation("fail"));
        s.push(1).await.unwrap();
        let err = s.push(2).await.unwrap_err();
        match err {
            StreamError::Overflow {
                policy,
                buffer_capacity,
            } => {
                assert_eq!(policy, BackpressurePolicy::Fail);
                assert_eq!(buffer_capacity, 1);
            }
            other => panic!("expected Overflow, got {other:?}"),
        }
        let m = s.metrics.snapshot();
        assert_eq!(m.fail_overflows, 1);
    }

    #[tokio::test]
    async fn close_drains_buffer_then_signals_end() {
        let s: Stream<i32> = Stream::new(4, annotation("fail"));
        s.push(1).await.unwrap();
        s.push(2).await.unwrap();
        s.close().await;
        assert_eq!(s.pop().await, Some(1));
        assert_eq!(s.pop().await, Some(2));
        assert_eq!(s.pop().await, None);
    }

    #[tokio::test]
    async fn push_after_close_errors() {
        let s: Stream<i32> = Stream::new(4, annotation("fail"));
        s.close().await;
        let err = s.push(99).await.unwrap_err();
        // `matches!` only yields a bool; it must be asserted to check anything.
        assert!(
            matches!(err, StreamError::Cancelled),
            "expected Cancelled, got {err:?}"
        );
    }
}