use std::sync::Arc;
use parking_lot::Mutex;
use re_log::ResultExt as _;
use re_log_encoding::Encoder;
use re_log_types::LogMsg;
use crate::sink::LogSink;
use crate::{RecordingStream, log_sink::SinkFlushError};
/// Storage half of a [`BinaryStreamSink`] / [`BinaryStreamStorage`] pair.
///
/// Holds the log messages pushed by the sink until they are consumed with
/// [`BinaryStreamStorage::read`], which drains them and returns them encoded as bytes.
pub struct BinaryStreamStorage {
/// Pending log messages; the same `Arc` is held by the paired [`BinaryStreamSink`].
inner: Arc<Mutex<Vec<LogMsg>>>,
/// Recording stream that [`BinaryStreamStorage::flush`] delegates to.
rec: RecordingStream,
}
impl BinaryStreamStorage {
    /// Creates an empty storage bound to the given recording stream.
    fn new(rec: RecordingStream) -> Self {
        Self {
            inner: Default::default(),
            rec,
        }
    }

    /// Reads and consumes the messages currently held in the buffer, returning them encoded.
    ///
    /// Returns `None` when there are no pending messages (so no encoded output is produced
    /// for an empty buffer), or when encoding fails; in the latter case the error is handed to
    /// `ok_or_log_error` rather than being returned to the caller.
    ///
    /// This does not call [`BinaryStreamStorage::flush`]; call it first if everything logged
    /// so far must be part of the returned bytes.
    #[inline]
    pub fn read(&self) -> Option<Vec<u8>> {
        // Swap the pending messages out and release the lock right away (the guard is a
        // temporary dropped at the end of this statement), so that concurrent
        // `BinaryStreamSink::send` calls are not blocked for the duration of the encoding.
        let messages = std::mem::take(&mut *self.inner.lock());

        if messages.is_empty() {
            return None;
        }

        Encoder::encode(messages.into_iter().map(Ok)).ok_or_log_error()
    }

    /// Flushes the associated recording stream, waiting at most `timeout`.
    ///
    /// This simply forwards to `RecordingStream::flush_with_timeout`.
    ///
    /// # Errors
    ///
    /// Returns whatever [`SinkFlushError`] the recording stream reports.
    #[inline]
    pub fn flush(&self, timeout: std::time::Duration) -> Result<(), SinkFlushError> {
        self.rec.flush_with_timeout(timeout)
    }
}
impl Drop for BinaryStreamStorage {
    /// Flushes the recording stream (without a practical time limit) and then drains the
    /// buffer, emitting a warning if any encoded data was still pending and is now discarded.
    fn drop(&mut self) {
        match self.flush(std::time::Duration::MAX) {
            Ok(()) => {}
            Err(err) => {
                re_log::error!("Failed to flush BinaryStreamStorage: {err}");
            }
        }

        // Whatever is still readable at this point has not been consumed by anyone.
        if let Some(leftover) = self.read() {
            let num_bytes = leftover.len();
            re_log::warn!(
                "Dropping data in BinaryStreamStorage ({} bytes)",
                num_bytes
            );
        }
    }
}
/// A [`LogSink`] that appends every received message to an in-memory buffer.
///
/// The buffer is shared with the [`BinaryStreamStorage`] returned alongside the sink by
/// [`BinaryStreamSink::new`].
pub struct BinaryStreamSink {
/// Shared message buffer; [`LogSink::send`] pushes into it.
buffer: Arc<Mutex<Vec<LogMsg>>>,
}
impl BinaryStreamSink {
pub fn new(rec: RecordingStream) -> (Self, BinaryStreamStorage) {
let storage = BinaryStreamStorage::new(rec);
(
Self {
buffer: storage.inner.clone(),
},
storage,
)
}
}
impl LogSink for BinaryStreamSink {
    /// Appends `msg` to the shared buffer.
    #[inline]
    fn send(&self, msg: re_log_types::LogMsg) {
        let mut pending = self.buffer.lock();
        pending.push(msg);
    }

    /// Returns `Ok(())` immediately; the timeout is ignored because `send` already stores
    /// each message in the buffer synchronously.
    #[inline]
    fn flush_blocking(&self, _timeout: std::time::Duration) -> Result<(), SinkFlushError> {
        Ok(())
    }

    /// Exposes the sink as `&dyn Any`.
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }
}