use std::{collections::VecDeque, sync::Arc};
use obs_proto::obs::v1::ObsEnvelope;
use parking_lot::Mutex;
use super::Sink;
use crate::registry::ScrubbedEnvelope;
const DEFAULT_CAPACITY: usize = 1024;
#[derive(Debug, Clone)]
pub struct InMemorySink {
inner: Arc<Inner>,
}
#[derive(Debug)]
struct Inner {
capacity: usize,
buffer: Mutex<VecDeque<ObsEnvelope>>,
}
impl InMemorySink {
#[must_use]
pub fn new() -> Self {
Self::with_capacity(DEFAULT_CAPACITY)
}
#[must_use]
pub fn with_capacity(capacity: usize) -> Self {
Self {
inner: Arc::new(Inner {
capacity,
buffer: Mutex::new(VecDeque::with_capacity(capacity)),
}),
}
}
#[must_use]
pub fn handle(&self) -> InMemoryHandle {
InMemoryHandle {
inner: Arc::clone(&self.inner),
}
}
}
impl Default for InMemorySink {
fn default() -> Self {
Self::new()
}
}
impl Sink for InMemorySink {
fn deliver(&self, env: ScrubbedEnvelope<'_>) {
let cloned = env.envelope().clone();
let mut buf = self.inner.buffer.lock();
if buf.len() >= self.inner.capacity {
buf.pop_front();
}
buf.push_back(cloned);
}
}
#[derive(Debug, Clone)]
pub struct InMemoryHandle {
inner: Arc<Inner>,
}
impl InMemoryHandle {
#[must_use]
pub fn drain(&self) -> Vec<ObsEnvelope> {
let mut buf = self.inner.buffer.lock();
buf.drain(..).collect()
}
#[must_use]
pub fn count(&self) -> usize {
self.inner.buffer.lock().len()
}
#[must_use]
pub fn snapshot(&self) -> Vec<ObsEnvelope> {
self.inner.buffer.lock().iter().cloned().collect()
}
#[must_use]
pub fn wait_for(
&self,
min_count: usize,
timeout: std::time::Duration,
) -> Option<Vec<ObsEnvelope>> {
let deadline = std::time::Instant::now() + timeout;
loop {
if self.count() >= min_count {
return Some(self.drain());
}
if std::time::Instant::now() >= deadline {
return None;
}
std::thread::sleep(std::time::Duration::from_millis(2));
}
}
}