Skip to main content

obs_core/sink/
in_memory.rs

1//! `InMemorySink` — bounded ring buffer for tests.
2//!
3//! Spec 61 § 2.4 + spec 72 § 2: the test harness observer collects
4//! envelopes into this sink, and tests `drain()` to assert what was
5//! emitted. Bounded so a runaway test cannot OOM the test binary.
6
7use std::{collections::VecDeque, sync::Arc};
8
9use obs_proto::obs::v1::ObsEnvelope;
10use parking_lot::Mutex;
11
12use super::Sink;
13use crate::registry::ScrubbedEnvelope;
14
15const DEFAULT_CAPACITY: usize = 1024;
16
17/// Test sink: collects envelopes into a bounded ring buffer.
18#[derive(Debug, Clone)]
19pub struct InMemorySink {
20    inner: Arc<Inner>,
21}
22
23#[derive(Debug)]
24struct Inner {
25    capacity: usize,
26    buffer: Mutex<VecDeque<ObsEnvelope>>,
27}
28
29impl InMemorySink {
30    /// Create a sink with the default capacity (1024).
31    #[must_use]
32    pub fn new() -> Self {
33        Self::with_capacity(DEFAULT_CAPACITY)
34    }
35
36    /// Create a sink with a specific ring buffer capacity.
37    #[must_use]
38    pub fn with_capacity(capacity: usize) -> Self {
39        Self {
40            inner: Arc::new(Inner {
41                capacity,
42                buffer: Mutex::new(VecDeque::with_capacity(capacity)),
43            }),
44        }
45    }
46
47    /// Stable handle for `drain()` / `wait_for()` / `count()`. Cheap
48    /// to clone — internally one `Arc` ref-count bump.
49    #[must_use]
50    pub fn handle(&self) -> InMemoryHandle {
51        InMemoryHandle {
52            inner: Arc::clone(&self.inner),
53        }
54    }
55}
56
57impl Default for InMemorySink {
58    fn default() -> Self {
59        Self::new()
60    }
61}
62
63impl Sink for InMemorySink {
64    fn deliver(&self, env: ScrubbedEnvelope<'_>) {
65        let cloned = env.envelope().clone();
66        let mut buf = self.inner.buffer.lock();
67        if buf.len() >= self.inner.capacity {
68            // Drop oldest — bounded buffer.
69            buf.pop_front();
70        }
71        buf.push_back(cloned);
72    }
73}
74
75/// Stable handle to an [`InMemorySink`]. Clone-safe; share across
76/// threads.
77#[derive(Debug, Clone)]
78pub struct InMemoryHandle {
79    inner: Arc<Inner>,
80}
81
82impl InMemoryHandle {
83    /// Drain all collected envelopes (clears the buffer). Order
84    /// is FIFO — oldest first.
85    #[must_use]
86    pub fn drain(&self) -> Vec<ObsEnvelope> {
87        let mut buf = self.inner.buffer.lock();
88        buf.drain(..).collect()
89    }
90
91    /// Number of envelopes currently buffered.
92    #[must_use]
93    pub fn count(&self) -> usize {
94        self.inner.buffer.lock().len()
95    }
96
97    /// Snapshot the buffer without draining.
98    #[must_use]
99    pub fn snapshot(&self) -> Vec<ObsEnvelope> {
100        self.inner.buffer.lock().iter().cloned().collect()
101    }
102
103    /// Block the current thread until the buffer holds at least
104    /// `min_count` envelopes, or `timeout` elapses. Returns the
105    /// drained envelopes on success, `None` on timeout. Used by
106    /// tests that emit on a background task.
107    #[must_use]
108    pub fn wait_for(
109        &self,
110        min_count: usize,
111        timeout: std::time::Duration,
112    ) -> Option<Vec<ObsEnvelope>> {
113        let deadline = std::time::Instant::now() + timeout;
114        loop {
115            if self.count() >= min_count {
116                return Some(self.drain());
117            }
118            if std::time::Instant::now() >= deadline {
119                return None;
120            }
121            std::thread::sleep(std::time::Duration::from_millis(2));
122        }
123    }
124}