obs_core/sink/
in_memory.rs1use std::{collections::VecDeque, sync::Arc};
8
9use obs_proto::obs::v1::ObsEnvelope;
10use parking_lot::Mutex;
11
12use super::Sink;
13use crate::registry::ScrubbedEnvelope;
14
15const DEFAULT_CAPACITY: usize = 1024;
16
17#[derive(Debug, Clone)]
19pub struct InMemorySink {
20 inner: Arc<Inner>,
21}
22
23#[derive(Debug)]
24struct Inner {
25 capacity: usize,
26 buffer: Mutex<VecDeque<ObsEnvelope>>,
27}
28
29impl InMemorySink {
30 #[must_use]
32 pub fn new() -> Self {
33 Self::with_capacity(DEFAULT_CAPACITY)
34 }
35
36 #[must_use]
38 pub fn with_capacity(capacity: usize) -> Self {
39 Self {
40 inner: Arc::new(Inner {
41 capacity,
42 buffer: Mutex::new(VecDeque::with_capacity(capacity)),
43 }),
44 }
45 }
46
47 #[must_use]
50 pub fn handle(&self) -> InMemoryHandle {
51 InMemoryHandle {
52 inner: Arc::clone(&self.inner),
53 }
54 }
55}
56
57impl Default for InMemorySink {
58 fn default() -> Self {
59 Self::new()
60 }
61}
62
63impl Sink for InMemorySink {
64 fn deliver(&self, env: ScrubbedEnvelope<'_>) {
65 let cloned = env.envelope().clone();
66 let mut buf = self.inner.buffer.lock();
67 if buf.len() >= self.inner.capacity {
68 buf.pop_front();
70 }
71 buf.push_back(cloned);
72 }
73}
74
75#[derive(Debug, Clone)]
78pub struct InMemoryHandle {
79 inner: Arc<Inner>,
80}
81
82impl InMemoryHandle {
83 #[must_use]
86 pub fn drain(&self) -> Vec<ObsEnvelope> {
87 let mut buf = self.inner.buffer.lock();
88 buf.drain(..).collect()
89 }
90
91 #[must_use]
93 pub fn count(&self) -> usize {
94 self.inner.buffer.lock().len()
95 }
96
97 #[must_use]
99 pub fn snapshot(&self) -> Vec<ObsEnvelope> {
100 self.inner.buffer.lock().iter().cloned().collect()
101 }
102
103 #[must_use]
108 pub fn wait_for(
109 &self,
110 min_count: usize,
111 timeout: std::time::Duration,
112 ) -> Option<Vec<ObsEnvelope>> {
113 let deadline = std::time::Instant::now() + timeout;
114 loop {
115 if self.count() >= min_count {
116 return Some(self.drain());
117 }
118 if std::time::Instant::now() >= deadline {
119 return None;
120 }
121 std::thread::sleep(std::time::Duration::from_millis(2));
122 }
123 }
124}