use std::sync::Arc;
use std::{fmt, time::Duration};
use parking_lot::Mutex;
use re_chunk::ChunkBatcherConfig;
use re_grpc_client::write::{Client as MessageProxyClient, GrpcFlushError, Options};
use re_log_encoding::{EncodeError, Encoder};
use re_log_types::{BlueprintActivationCommand, LogMsg, StoreId};
use crate::RecordingStream;
/// Error returned by `LogSink::flush_blocking`.
#[derive(Debug, thiserror::Error)]
pub enum SinkFlushError {
/// The flush did not finish in time; some messages may not have been sent.
#[error("Flush timed out - not all log messages were sent")]
Timeout,
/// The flush failed; `message` is used verbatim as the error's display text.
#[error("{message}")]
Failed {
/// Human-readable description of the failure.
message: String,
},
}
impl SinkFlushError {
    /// Builds a [`SinkFlushError::Failed`] from anything convertible into a `String`.
    pub fn failed(message: impl Into<String>) -> Self {
        let message: String = message.into();
        Self::Failed { message }
    }
}
/// A destination that [`LogMsg`]s can be sent to.
///
/// Implementors must be `Send + Sync + 'static` so they can be shared between threads.
pub trait LogSink: Send + Sync + 'static {
    /// Sends a single message to the sink.
    fn send(&self, msg: LogMsg);

    /// Sends a batch of messages.
    ///
    /// The default implementation forwards every message to [`Self::send`], in order.
    #[inline]
    fn send_all(&self, messages: Vec<LogMsg>) {
        messages.into_iter().for_each(|msg| self.send(msg));
    }

    /// Removes and returns the messages the sink is currently holding on to.
    ///
    /// The default implementation holds nothing and therefore returns an empty `Vec`.
    #[inline]
    fn drain_backlog(&self) -> Vec<LogMsg> {
        Vec::new()
    }

    /// Blocks until the sink has flushed, or until `timeout` has elapsed.
    fn flush_blocking(&self, timeout: Duration) -> Result<(), SinkFlushError>;

    /// Sends all `blueprint` messages, followed by `activation_cmd`.
    ///
    /// The activation command is only sent when its `blueprint_id` equals the store ID of
    /// the first blueprint message. On a mismatch a warning is logged and the activation is
    /// skipped. An empty `blueprint` sends nothing at all.
    fn send_blueprint(&self, blueprint: Vec<LogMsg>, activation_cmd: BlueprintActivationCommand) {
        // Only the first message determines which blueprint this batch belongs to.
        let first_store_id = blueprint.first().map(|msg| msg.store_id().clone());

        for msg in blueprint {
            self.send(msg);
        }

        // No messages means no ID to compare against, and nothing to activate.
        let Some(first_store_id) = first_store_id else {
            return;
        };

        if first_store_id != activation_cmd.blueprint_id {
            re_log::warn!(
                "Blueprint ID mismatch when sending blueprint: {:?} != {:?}. Ignoring activation.",
                first_store_id,
                activation_cmd.blueprint_id
            );
            return;
        }

        self.send(activation_cmd.into());
    }

    /// The batcher configuration this sink would like to be used by default.
    ///
    /// Defaults to [`ChunkBatcherConfig::DEFAULT`].
    fn default_batcher_config(&self) -> ChunkBatcherConfig {
        ChunkBatcherConfig::DEFAULT
    }

    /// Returns `self` as [`std::any::Any`], which allows downcasting to the concrete sink type.
    fn as_any(&self) -> &dyn std::any::Any;
}
/// A sink that wraps a list of boxed [`LogSink`]s behind a mutex.
pub struct MultiSink(parking_lot::Mutex<Vec<Box<dyn LogSink>>>);

impl MultiSink {
    /// Creates a `MultiSink` that owns the given sinks.
    #[inline]
    pub fn new(sinks: Vec<Box<dyn LogSink>>) -> Self {
        let guarded = parking_lot::Mutex::new(sinks);
        Self(guarded)
    }
}
impl LogSink for MultiSink {
    /// Hands a clone of `msg` to every wrapped sink, in list order.
    #[inline]
    fn send(&self, msg: LogMsg) {
        let sinks = self.0.lock();
        sinks.iter().for_each(|sink| sink.send(msg.clone()));
    }

    /// Hands a clone of the whole batch to every wrapped sink, in list order.
    #[inline]
    fn send_all(&self, messages: Vec<LogMsg>) {
        let sinks = self.0.lock();
        sinks
            .iter()
            .for_each(|sink| sink.send_all(messages.clone()));
    }

    /// Flushes every wrapped sink one after the other and reports the worst outcome.
    ///
    /// Every sink is flushed even if an earlier one failed. Each sink is given the full
    /// `timeout`, so the total wait can be up to `timeout` times the number of sinks.
    ///
    /// A [`SinkFlushError::Failed`] takes precedence over a [`SinkFlushError::Timeout`];
    /// among several `Failed` errors, the first one is the one returned.
    #[inline]
    fn flush_blocking(&self, timeout: Duration) -> Result<(), SinkFlushError> {
        let sinks = self.0.lock();
        let mut outcome: Result<(), SinkFlushError> = Ok(());

        for sink in sinks.iter() {
            let Err(err) = sink.flush_blocking(timeout) else {
                continue;
            };

            match outcome {
                // A hard failure has already been recorded: keep it.
                Err(SinkFlushError::Failed { .. }) => {}
                // Nothing worse than a timeout so far: the new error replaces it.
                Ok(()) | Err(SinkFlushError::Timeout) => outcome = Err(err),
            }
        }

        outcome
    }

    /// Always returns an empty `Vec`; the wrapped sinks are not drained.
    #[inline]
    fn drain_backlog(&self) -> Vec<LogMsg> {
        Vec::new()
    }

    /// Merges the preferred batcher configurations of all wrapped sinks.
    ///
    /// Starts from [`ChunkBatcherConfig::DEFAULT`] and, field by field, keeps the largest
    /// value reported by any sink.
    fn default_batcher_config(&self) -> ChunkBatcherConfig {
        // Destructure exhaustively so that adding a field to `ChunkBatcherConfig`
        // becomes a compile error here rather than a silently unmerged setting.
        let ChunkBatcherConfig {
            mut flush_tick,
            mut flush_num_bytes,
            mut flush_num_rows,
            mut chunk_max_rows_if_unsorted,
            mut max_commands_in_flight,
            mut max_chunks_in_flight,
        } = ChunkBatcherConfig::DEFAULT;

        // NOTE(review): because every field is merged with `max` starting from `DEFAULT`,
        // the result is never lower than `DEFAULT`. A wrapped sink that asks for a smaller
        // value than the default therefore has no influence on the merged config.
        // Confirm that this is intended.
        {
            let sinks = self.0.lock();
            for theirs in sinks.iter().map(|sink| sink.default_batcher_config()) {
                flush_tick = flush_tick.max(theirs.flush_tick);
                flush_num_bytes = flush_num_bytes.max(theirs.flush_num_bytes);
                flush_num_rows = flush_num_rows.max(theirs.flush_num_rows);
                chunk_max_rows_if_unsorted =
                    chunk_max_rows_if_unsorted.max(theirs.chunk_max_rows_if_unsorted);
                max_commands_in_flight = max_commands_in_flight.max(theirs.max_commands_in_flight);
                max_chunks_in_flight = max_chunks_in_flight.max(theirs.max_chunks_in_flight);
            }
        }

        ChunkBatcherConfig {
            flush_tick,
            flush_num_bytes,
            flush_num_rows,
            chunk_max_rows_if_unsorted,
            max_commands_in_flight,
            max_chunks_in_flight,
        }
    }

    fn as_any(&self) -> &dyn std::any::Any {
        self
    }
}
// `private` is not `pub`, so code outside this module's visibility cannot name
// `private::Sealed` and hence cannot implement traits that require it.
mod private {
/// Supertrait used to restrict who may implement [`super::MultiSinkCompatible`].
pub trait Sealed {}
}
/// Marker trait for sink types that may be used as elements of the tuples
/// accepted by [`IntoMultiSink`].
///
/// Requires `private::Sealed`, which is only implemented in this file.
pub trait MultiSinkCompatible: private::Sealed {}
/// Conversion of a collection of sinks into a single [`MultiSink`].
pub trait IntoMultiSink {
/// Consumes `self` and returns a [`MultiSink`] that owns the contained sinks.
fn into_multi_sink(self) -> MultiSink;
}
// Implements `IntoMultiSink` for a tuple whose element types are the given identifiers.
// Every element type must implement both `LogSink` and `MultiSinkCompatible`.
macro_rules! impl_multi_sink_tuple {
($($T:ident),*) => {
impl<$($T),*> IntoMultiSink for ($($T,)*)
where
$($T: LogSink + MultiSinkCompatible,)*
{
// The type-parameter identifiers are reused below as local variable names,
// hence the `non_snake_case` expectation.
#[expect(non_snake_case)] #[inline]
fn into_multi_sink(self) -> MultiSink {
// Destructure the tuple, then box each element in its original order.
let ($($T,)*) = self;
MultiSink::new(vec![$(Box::new($T)),*])
}
}
};
}
// Tuples of one to six sinks are supported.
impl_multi_sink_tuple!(A);
impl_multi_sink_tuple!(A, B);
impl_multi_sink_tuple!(A, B, C);
impl_multi_sink_tuple!(A, B, C, D);
impl_multi_sink_tuple!(A, B, C, D, E);
impl_multi_sink_tuple!(A, B, C, D, E, F);
/// An already-boxed list of sinks converts directly, without any further wrapping.
impl IntoMultiSink for Vec<Box<dyn LogSink>> {
    fn into_multi_sink(self) -> MultiSink {
        let sinks = self;
        MultiSink::new(sinks)
    }
}
// Sink types that may be used as elements of the tuples accepted by `IntoMultiSink`:
// each one needs both the sealing supertrait and the marker trait.
impl private::Sealed for crate::sink::FileSink {}
impl MultiSinkCompatible for crate::sink::FileSink {}
impl private::Sealed for crate::sink::GrpcSink {}
impl MultiSinkCompatible for crate::sink::GrpcSink {}
/// A sink that stores every message it receives in an in-memory list, in arrival order,
/// until the list is taken out with `drain_backlog`.
#[derive(Default)]
pub struct BufferedSink(parking_lot::Mutex<Vec<LogMsg>>);
impl Drop for BufferedSink {
    /// Logs a single warning if the sink still holds anything other than
    /// `LogMsg::SetStoreInfo` messages when it is dropped.
    fn drop(&mut self) {
        let pending = self.0.lock();

        let holds_data = pending
            .iter()
            .any(|msg| !matches!(msg, LogMsg::SetStoreInfo(_)));

        if holds_data {
            re_log::warn!("Dropping data in BufferedSink");
        }
    }
}
impl BufferedSink {
    /// Creates an empty `BufferedSink`; equivalent to its `Default` implementation.
    #[inline]
    pub fn new() -> Self {
        Default::default()
    }
}
impl LogSink for BufferedSink {
    /// Appends `msg` to the end of the buffer.
    #[inline]
    fn send(&self, msg: LogMsg) {
        let mut backlog = self.0.lock();
        backlog.push(msg);
    }

    /// Appends all `messages` to the end of the buffer, preserving their order.
    #[inline]
    fn send_all(&self, messages: Vec<LogMsg>) {
        let mut backlog = self.0.lock();
        backlog.extend(messages);
    }

    /// Returns everything buffered so far and leaves the buffer empty.
    #[inline]
    fn drain_backlog(&self) -> Vec<LogMsg> {
        let mut backlog = self.0.lock();
        std::mem::take(&mut *backlog)
    }

    /// Nothing to flush: always succeeds immediately.
    #[inline]
    fn flush_blocking(&self, _timeout: Duration) -> Result<(), SinkFlushError> {
        Ok(())
    }

    fn as_any(&self) -> &dyn std::any::Any {
        self
    }
}
impl fmt::Debug for BufferedSink {
    /// Formats the sink as `BufferedSink { N messages }`, where `N` is the current buffer length.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let backlog = self.0.lock();
        let count = backlog.len();
        write!(f, "BufferedSink {{ {} messages }}", count)
    }
}
/// A sink that stores every message it receives in a [`MemorySinkStorage`].
pub struct MemorySink(MemorySinkStorage);

impl MemorySink {
    /// Creates a sink with fresh, empty storage associated with `rec`.
    #[inline]
    pub fn new(rec: RecordingStream) -> Self {
        let storage = MemorySinkStorage::new(rec);
        Self(storage)
    }

    /// Returns a handle to the storage this sink writes into.
    ///
    /// The returned handle is a clone that shares the underlying message list with the sink.
    #[inline]
    pub fn buffer(&self) -> MemorySinkStorage {
        MemorySinkStorage::clone(&self.0)
    }
}
impl LogSink for MemorySink {
    /// Appends `msg` to the storage.
    #[inline]
    fn send(&self, msg: LogMsg) {
        let mut stored = self.0.write();
        stored.push(msg);
    }

    /// Appends all `messages` to the storage, preserving their order.
    #[inline]
    fn send_all(&self, messages: Vec<LogMsg>) {
        let mut stored = self.0.write();
        stored.extend(messages);
    }

    /// Nothing to flush: always succeeds immediately.
    #[inline]
    fn flush_blocking(&self, _timeout: Duration) -> Result<(), SinkFlushError> {
        Ok(())
    }

    /// Returns everything stored so far and leaves the storage empty.
    #[inline]
    fn drain_backlog(&self) -> Vec<LogMsg> {
        let mut stored = self.0.write();
        std::mem::take(&mut *stored)
    }

    fn as_any(&self) -> &dyn std::any::Any {
        self
    }
}
impl fmt::Debug for MemorySink {
    /// Formats the sink as `MemorySink { N messages }`.
    ///
    /// `N` comes from [`MemorySinkStorage::num_msgs`], which calls `flush_blocking` on the
    /// storage's recording stream before counting.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Ask the storage we already own instead of going through `self.buffer()`.
        // `buffer()` returns a clone, and dropping that temporary would run
        // `MemorySinkStorage`'s `Drop`, which logs "Dropping data in MemorySink" whenever
        // unconsumed data is present. Debug-printing must not trigger that warning.
        write!(f, "MemorySink {{ {} messages }}", self.0.num_msgs())
    }
}
/// The state shared by all clones of a [`MemorySinkStorage`].
#[derive(Default)]
struct MemorySinkStorageInner {
/// The stored messages, in arrival order.
msgs: Vec<LogMsg>,
/// Set to `true` when the messages are encoded to bytes (`drain_as_bytes`,
/// `concat_memory_sinks_as_bytes`) and reset to `false` on every `write()`.
/// While `true`, dropping a storage handle does not warn about remaining messages.
has_been_used: bool,
}
/// Handle to the messages collected by a [`MemorySink`].
///
/// Clones share the same underlying message list.
#[derive(Clone)]
pub struct MemorySinkStorage {
/// The message list and its bookkeeping, shared between all clones.
inner: Arc<Mutex<MemorySinkStorageInner>>,
/// The recording stream associated with this storage; it is flushed before
/// messages are counted, taken or encoded.
pub(crate) rec: RecordingStream,
}
impl Drop for MemorySinkStorage {
    /// Logs a single warning if this handle is dropped while the storage holds anything
    /// other than `LogMsg::SetStoreInfo` messages and `has_been_used` is not set.
    ///
    /// This runs for every handle that is dropped, including clones.
    fn drop(&mut self) {
        let state = self.inner.lock();

        if state.has_been_used {
            return;
        }

        let holds_data = state
            .msgs
            .iter()
            .any(|msg| !matches!(msg, LogMsg::SetStoreInfo(_)));

        if holds_data {
            re_log::warn!("Dropping data in MemorySink");
        }
    }
}
impl MemorySinkStorage {
    /// Creates empty storage associated with the recording stream `rec`.
    fn new(rec: RecordingStream) -> Self {
        let inner = Default::default();
        Self { inner, rec }
    }

    /// Locks the storage and returns mutable access to the message list.
    ///
    /// Resets `has_been_used` to `false`, so that messages left behind after this
    /// access are reported again when a handle is dropped.
    #[inline]
    fn write(&self) -> parking_lot::MappedMutexGuard<'_, Vec<LogMsg>> {
        let mut state = self.inner.lock();
        state.has_been_used = false;
        parking_lot::MutexGuard::map(state, |state| &mut state.msgs)
    }

    /// Number of messages currently stored.
    ///
    /// Flushes the recording stream first; a flush error is ignored.
    #[inline]
    pub fn num_msgs(&self) -> usize {
        self.rec.flush_blocking().ok();
        let state = self.inner.lock();
        state.msgs.len()
    }

    /// Removes and returns all stored messages, leaving the storage empty.
    ///
    /// Flushes the recording stream first; a flush error is ignored.
    #[inline]
    pub fn take(&self) -> Vec<LogMsg> {
        self.rec.flush_blocking().ok();
        let mut stored = self.write();
        std::mem::take(&mut *stored)
    }

    /// Encodes the messages of all given storages, in slice order, into one byte buffer.
    ///
    /// Each storage's recording stream is flushed first (flush errors are ignored).
    /// The messages are left in place, and each storage is marked as used.
    ///
    /// # Errors
    /// Returns an [`EncodeError`] if creating the encoder, appending a message, or
    /// finishing the stream fails.
    #[inline]
    pub fn concat_memory_sinks_as_bytes(sinks: &[&Self]) -> Result<Vec<u8>, EncodeError> {
        let mut encoder = Encoder::local()?;

        for storage in sinks.iter() {
            // Flush before taking the lock, exactly once per storage.
            storage.rec.flush_blocking().ok();

            let mut state = storage.inner.lock();
            state.has_been_used = true;
            for msg in state.msgs.iter() {
                encoder.append(msg)?;
            }
        }

        encoder.finish()?;
        encoder.into_inner()
    }

    /// Removes all stored messages and returns them encoded as bytes.
    ///
    /// Flushes the recording stream first (a flush error is ignored) and marks the
    /// storage as used.
    ///
    /// # Errors
    /// Returns an [`EncodeError`] if encoding fails. The messages have already been
    /// removed from the storage at that point.
    #[inline]
    pub fn drain_as_bytes(&self) -> Result<Vec<u8>, EncodeError> {
        self.rec.flush_blocking().ok();

        let mut state = self.inner.lock();
        state.has_been_used = true;

        let drained = std::mem::take(&mut state.msgs);
        Encoder::encode(drained.into_iter().map(Ok))
    }

    /// The store ID of the associated recording stream, or `None` if the stream has
    /// no store info.
    #[inline]
    pub fn store_id(&self) -> Option<StoreId> {
        let info = self.rec.store_info()?;
        Some(info.store_id.clone())
    }
}
/// Boxed, thread-safe callback that receives a slice of messages.
type LogMsgCallback = Box<dyn Fn(&[LogMsg]) + Send + Sync>;
/// A sink that passes every message it receives to a user-provided callback.
pub struct CallbackSink {
/// Invoked once per `send` (with a one-element slice) and once per `send_all`.
callback: LogMsgCallback,
}
impl CallbackSink {
    /// Creates a sink that invokes `callback` with the messages it receives.
    #[inline]
    pub fn new<F>(callback: F) -> Self
    where
        F: Fn(&[LogMsg]) + Send + Sync + 'static,
    {
        let boxed = Box::new(callback);
        Self { callback: boxed }
    }
}
impl LogSink for CallbackSink {
    /// Invokes the callback with a one-element slice containing `msg`.
    #[inline]
    fn send(&self, msg: LogMsg) {
        let single = [msg];
        (self.callback)(&single);
    }

    /// Invokes the callback once with the whole batch.
    #[inline]
    fn send_all(&self, messages: Vec<LogMsg>) {
        (self.callback)(messages.as_slice());
    }

    /// Nothing to flush: always succeeds immediately.
    #[inline]
    fn flush_blocking(&self, _timeout: Duration) -> Result<(), SinkFlushError> {
        Ok(())
    }

    fn as_any(&self) -> &dyn std::any::Any {
        self
    }
}
/// A sink that forwards messages to a `re_grpc_client` write client.
pub struct GrpcSink {
/// The client that all messages and flush requests are delegated to.
client: MessageProxyClient,
}
/// Connection state reported by [`GrpcSink::status`].
pub type GrpcSinkConnectionState = re_grpc_client::write::ClientConnectionState;
/// Re-export of the client's connection-failure type under a sink-specific name.
pub type GrpcSinkConnectionFailure = re_grpc_client::write::ClientConnectionFailure;
impl GrpcSink {
    /// Creates a sink whose client targets `uri`, using the client's default [`Options`].
    #[inline]
    pub fn new(uri: re_uri::ProxyUri) -> Self {
        let client = MessageProxyClient::new(uri, Options::default());
        Self { client }
    }

    /// The connection state currently reported by the underlying client.
    pub fn status(&self) -> GrpcSinkConnectionState {
        let client = &self.client;
        client.status()
    }
}
impl Default for GrpcSink {
    /// Creates a sink that targets `crate::DEFAULT_CONNECT_URL`.
    ///
    /// # Panics
    /// Panics if `crate::DEFAULT_CONNECT_URL` does not parse as a `re_uri::ProxyUri`.
    fn default() -> Self {
        use std::str::FromStr as _;

        let uri = re_uri::ProxyUri::from_str(crate::DEFAULT_CONNECT_URL)
            .expect("failed to parse uri");
        Self::new(uri)
    }
}
impl LogSink for GrpcSink {
    /// Hands `msg` to the underlying client.
    fn send(&self, msg: LogMsg) {
        let client = &self.client;
        client.send(msg);
    }

    /// Flushes the underlying client, waiting at most `timeout`.
    ///
    /// # Errors
    /// A client-side timeout becomes [`SinkFlushError::Timeout`]. Any other client error
    /// becomes [`SinkFlushError::Failed`], carrying the error's display text.
    fn flush_blocking(&self, timeout: Duration) -> Result<(), SinkFlushError> {
        match self.client.flush_blocking(timeout) {
            Ok(()) => Ok(()),
            Err(GrpcFlushError::Timeout { .. }) => Err(SinkFlushError::Timeout),
            Err(other) => Err(SinkFlushError::failed(other.to_string())),
        }
    }

    /// This sink prefers [`ChunkBatcherConfig::LOW_LATENCY`] over the trait's default.
    fn default_batcher_config(&self) -> ChunkBatcherConfig {
        ChunkBatcherConfig::LOW_LATENCY
    }

    fn as_any(&self) -> &dyn std::any::Any {
        self
    }
}