use core::future::Future;
use core::pin::Pin;
#[cfg(not(feature = "std"))]
extern crate alloc;
#[cfg(not(feature = "std"))]
use alloc::boxed::Box;
#[cfg(feature = "std")]
use std::boxed::Box;
use super::BufferCfg;
use crate::DbError;
/// Statically dispatched buffer interface for values of type `T`.
///
/// `T` must be `Clone + Send`; implementors must be `Send + Sync + 'static`.
pub trait Buffer<T: Clone + Send>: Send + Sync + 'static {
/// Reader type returned by [`Buffer::subscribe`]; it must implement
/// [`BufferReader<T>`] and be `'static`.
type Reader: BufferReader<T> + 'static;
/// Builds a buffer from `cfg`.
///
/// The `Self: Sized` bound means this constructor is not callable through
/// a trait object.
fn new(cfg: &BufferCfg) -> Self
where
Self: Sized;
/// Hands `value` to the buffer.
///
/// Takes `&self`, so an implementation that stores the value has to rely on
/// interior mutability. NOTE(review): what happens when the buffer is full
/// is not specified by this trait — confirm per implementation.
fn push(&self, value: T);
/// Returns a [`Buffer::Reader`] for this buffer.
///
/// NOTE(review): presumably every call yields an independent reader — confirm.
fn subscribe(&self) -> Self::Reader;
}
/// Buffer interface without an associated reader type: readers are returned
/// as boxed [`BufferReader<T>`] trait objects instead.
///
/// `T` must be `Clone + Send`; implementors must be `Send + Sync`.
pub trait DynBuffer<T: Clone + Send>: Send + Sync {
/// Hands `value` to the buffer through a shared reference.
fn push(&self, value: T);
/// Returns a reader for this buffer as a boxed `BufferReader<T> + Send`
/// trait object.
fn subscribe_boxed(&self) -> Box<dyn BufferReader<T> + Send>;
/// Exposes the buffer as `&dyn Any` so callers can attempt a downcast to
/// the concrete type.
fn as_any(&self) -> &dyn core::any::Any;
/// Returns a metrics snapshot if the implementation provides one.
///
/// Only compiled with the `metrics` feature. The default implementation
/// returns `None`.
#[cfg(feature = "metrics")]
fn metrics_snapshot(&self) -> Option<BufferMetricsSnapshot> {
None
}
}
/// Receiving side of a buffer carrying values of type `T`.
///
/// `T` must be `Clone + Send`; implementors must be `Send`.
pub trait BufferReader<T: Clone + Send>: Send {
/// Returns a boxed, pinned, `Send` future that resolves to the next value
/// or a [`DbError`].
///
/// The future borrows the reader mutably for as long as it lives (`'_`).
fn recv(&mut self) -> Pin<Box<dyn Future<Output = Result<T, DbError>> + Send + '_>>;
/// Synchronous counterpart of [`BufferReader::recv`].
///
/// NOTE(review): presumably returns an error immediately when no value is
/// available instead of waiting — confirm against the implementations.
fn try_recv(&mut self) -> Result<T, DbError>;
}
/// Reader that yields values as `serde_json::Value` rather than a typed `T`.
///
/// Only compiled with the `std` feature. Implementors must be `Send`.
#[cfg(feature = "std")]
pub trait JsonBufferReader: Send {
/// Returns a boxed, pinned, `Send` future that resolves to the next value
/// as JSON, or a [`DbError`].
///
/// The future borrows the reader mutably for as long as it lives (`'_`).
fn recv_json(
&mut self,
) -> Pin<Box<dyn Future<Output = Result<serde_json::Value, DbError>> + Send + '_>>;
/// Synchronous counterpart of [`JsonBufferReader::recv_json`].
///
/// NOTE(review): presumably non-blocking — confirm against the implementations.
fn try_recv_json(&mut self) -> Result<serde_json::Value, DbError>;
}
/// Plain-data snapshot of a buffer's counters.
///
/// Only compiled with the `metrics` feature. The derived `Default` yields
/// zero for every field.
#[cfg(feature = "metrics")]
#[derive(Debug, Clone, Default)]
pub struct BufferMetricsSnapshot {
/// NOTE(review): presumably the number of values pushed so far — confirm.
pub produced_count: u64,
/// NOTE(review): presumably the number of values received by readers — confirm.
pub consumed_count: u64,
/// NOTE(review): presumably the number of values discarded — confirm.
pub dropped_count: u64,
/// NOTE(review): pair of sizes, presumably `(current, capacity)` — the
/// order is not established in this file; confirm against producers.
pub occupancy: (usize, usize),
}
/// Access to a buffer's metrics.
///
/// Only compiled with the `metrics` feature.
#[cfg(feature = "metrics")]
pub trait BufferMetrics {
/// Returns the current metrics as an owned [`BufferMetricsSnapshot`].
fn metrics(&self) -> BufferMetricsSnapshot;
/// Resets the metrics through a shared reference.
///
/// NOTE(review): presumably zeroes the counters; which fields are affected
/// is not established in this file — confirm per implementation.
fn reset_metrics(&self);
}
#[cfg(test)]
mod tests {
    use super::*;
    use core::marker::PhantomData;

    /// Buffer stub: accepts pushes and drops them on the floor.
    struct MockBuffer<T>
    where
        T: Clone + Send + Sync,
    {
        _phantom: PhantomData<T>,
    }

    /// Reader stub: never produces a value.
    struct MockReader<T>
    where
        T: Clone + Send,
    {
        _phantom: PhantomData<T>,
    }

    impl<T> Buffer<T> for MockBuffer<T>
    where
        T: Clone + Send + Sync + 'static,
    {
        type Reader = MockReader<T>;

        fn new(_cfg: &BufferCfg) -> Self {
            // The configuration is irrelevant for the stub.
            MockBuffer {
                _phantom: PhantomData,
            }
        }

        fn push(&self, _value: T) {}

        fn subscribe(&self) -> Self::Reader {
            Self::Reader {
                _phantom: PhantomData,
            }
        }
    }

    impl<T> DynBuffer<T> for MockBuffer<T>
    where
        T: Clone + Send + Sync + 'static,
    {
        fn push(&self, value: T) {
            // Both traits declare `push`; name the typed one explicitly.
            Buffer::<T>::push(self, value)
        }

        fn subscribe_boxed(&self) -> Box<dyn BufferReader<T> + Send> {
            let reader = self.subscribe();
            Box::new(reader)
        }

        fn as_any(&self) -> &dyn core::any::Any {
            self
        }

        #[cfg(feature = "metrics")]
        fn metrics_snapshot(&self) -> Option<BufferMetricsSnapshot> {
            None
        }
    }

    impl<T> BufferReader<T> for MockReader<T>
    where
        T: Clone + Send,
    {
        fn recv(&mut self) -> Pin<Box<dyn Future<Output = Result<T, DbError>> + Send + '_>> {
            // The error is built inside the future, so nothing is captured.
            Box::pin(async {
                Err(DbError::BufferClosed {
                    #[cfg(feature = "std")]
                    buffer_name: "mock".to_string(),
                    #[cfg(not(feature = "std"))]
                    _buffer_name: (),
                })
            })
        }

        fn try_recv(&mut self) -> Result<T, DbError> {
            Err(DbError::BufferEmpty)
        }
    }

    /// Compile-time check that the stubs satisfy the auto-trait bounds the
    /// traits demand.
    #[test]
    fn test_buffer_trait_bounds() {
        fn require_send<S: Send>() {}
        fn require_sync<S: Sync>() {}

        require_send::<MockBuffer<i32>>();
        require_sync::<MockBuffer<i32>>();
        require_send::<MockReader<i32>>();
    }

    /// Compile-time check that a concrete buffer coerces to `&dyn DynBuffer`.
    #[test]
    fn test_dyn_buffer_impl() {
        fn accept_erased(_: &dyn DynBuffer<i32>) {}

        let mock: MockBuffer<i32> = MockBuffer {
            _phantom: PhantomData,
        };
        accept_erased(&mock);
    }
}