aimdb_core/buffer/traits.rs
1//! Runtime-agnostic buffer traits
2//!
3//! Defines `Buffer<T>` (static trait) and `DynBuffer<T>` (trait object) for
4//! buffer implementations. Adapters (tokio, embassy) provide concrete types.
5//!
6//! See `aimdb-tokio-adapter` and `aimdb-embassy-adapter` for implementations.
7
8use core::future::Future;
9use core::pin::Pin;
10
11#[cfg(not(feature = "std"))]
12extern crate alloc;
13
14#[cfg(not(feature = "std"))]
15use alloc::boxed::Box;
16
17#[cfg(feature = "std")]
18use std::boxed::Box;
19
20use super::BufferCfg;
21use crate::DbError;
22
23/// Static buffer trait for concrete implementations
24///
25/// Provides push/subscribe operations for typed buffers. Readers are owned
26/// and can outlive the subscription call (required for spawned tasks).
27///
28/// Trait bounds ensure thread-safety and `'static` lifetime for async runtimes.
29///
30/// See `aimdb_tokio_adapter::TokioRingBuffer` for implementation example.
31pub trait Buffer<T: Clone + Send>: Send + Sync + 'static {
32 /// Reader type for consuming values
33 ///
34 /// Each `subscribe()` call returns an independent owned reader.
35 type Reader: BufferReader<T> + 'static;
36
37 /// Creates a new buffer with the given configuration
38 ///
39 /// # Panics
40 /// May panic if configuration is invalid (call `cfg.validate()` first)
41 fn new(cfg: &BufferCfg) -> Self
42 where
43 Self: Sized;
44
45 /// Push a value into the buffer (non-blocking)
46 ///
47 /// Behavior depends on buffer type:
48 /// - **SPMC Ring**: Overwrites oldest value if full
49 /// - **SingleLatest**: Overwrites previous value
50 /// - **Mailbox**: Overwrites pending value if not consumed
51 fn push(&self, value: T);
52
53 /// Create a new independent reader for a consumer
54 ///
55 /// Each reader maintains its own position and can consume at its own pace.
56 /// The returned reader is owned and can outlive this reference.
57 fn subscribe(&self) -> Self::Reader;
58}
59
60/// Dynamic buffer trait for trait objects (object-safe)
61///
62/// Type-erased interface for buffers that can be stored as trait objects.
63/// Automatically implemented for all `Buffer<T>` types via blanket impl.
64///
65/// Used when storing heterogeneous buffer types (e.g., in `TypedRecord`).
66pub trait DynBuffer<T: Clone + Send>: Send + Sync {
67 /// Push a value into the buffer (non-blocking)
68 fn push(&self, value: T);
69
70 /// Create a boxed reader for consuming values
71 ///
72 /// Returns a type-erased reader. Each reader maintains its own position.
73 fn subscribe_boxed(&self) -> Box<dyn BufferReader<T> + Send>;
74
75 /// Returns self as Any for downcasting to concrete buffer types
76 fn as_any(&self) -> &dyn core::any::Any;
77
78 /// Get buffer metrics snapshot (metrics feature only)
79 ///
80 /// Returns `Some(snapshot)` if the buffer implementation supports metrics,
81 /// `None` otherwise. Default implementation returns `None`.
82 #[cfg(feature = "metrics")]
83 fn metrics_snapshot(&self) -> Option<BufferMetricsSnapshot> {
84 None
85 }
86}
87
88/// Reader trait for consuming values from a buffer
89///
90/// All read operations are async. Each reader is independent with its own state.
91///
92/// # Error Handling
93/// - `Ok(value)` - Successfully received a value
94/// - `Err(BufferLagged)` - Missed messages (SPMC ring only, can continue)
95/// - `Err(BufferClosed)` - Buffer closed (graceful shutdown)
96pub trait BufferReader<T: Clone + Send>: Send {
97 /// Receive the next value (async)
98 ///
99 /// Waits for the next available value. Returns immediately if buffered.
100 ///
101 /// # Behavior by Buffer Type
102 /// - **SPMC Ring**: Returns next value, or `Lagged(n)` if fell behind
103 /// - **SingleLatest**: Waits for value change, returns most recent
104 /// - **Mailbox**: Waits for slot value, takes and clears it
105 fn recv(&mut self) -> Pin<Box<dyn Future<Output = Result<T, DbError>> + Send + '_>>;
106
107 /// Non-blocking receive — returns immediately.
108 ///
109 /// Returns `Err(DbError::BufferEmpty)` if no pending values.
110 ///
111 /// # Behavior by Buffer Type
112 /// - **SPMC Ring**: Returns next buffered value, or `BufferEmpty` if caught up
113 /// - **SingleLatest**: Returns value if changed since last read, or `BufferEmpty`
114 /// - **Mailbox**: Takes and returns slot value, or `BufferEmpty` if empty
115 fn try_recv(&mut self) -> Result<T, DbError>;
116}
117
118/// Reader trait for consuming JSON-serialized values from a buffer (std only)
119///
120/// Type-erased reader that subscribes to a typed buffer and emits values as
121/// `serde_json::Value`. Used by remote access protocol for subscriptions.
122///
123/// This trait enables subscribing to a buffer without knowing the concrete type `T`
124/// at compile time, by serializing values to JSON on each `recv_json()` call.
125///
126/// # Requirements
127/// - Record must be configured with `.with_remote_access()`
128/// - Only available with `std` feature (requires serde_json)
129///
130/// # Example
131/// ```rust,ignore
132/// // Internal use in remote access handler
133/// let json_reader: Box<dyn JsonBufferReader> = record.subscribe_json()?;
134/// while let Ok(json_val) = json_reader.recv_json().await {
135/// // Forward JSON value to remote client...
136/// }
137/// ```
138#[cfg(feature = "std")]
139pub trait JsonBufferReader: Send {
140 /// Receive the next value as JSON (async)
141 ///
142 /// Waits for the next value from the underlying buffer and serializes it to JSON.
143 ///
144 /// # Returns
145 /// - `Ok(JsonValue)` - Successfully received and serialized value
146 /// - `Err(BufferLagged)` - Missed messages (can continue reading)
147 /// - `Err(BufferClosed)` - Buffer closed (graceful shutdown)
148 /// - `Err(SerializationFailed)` - Failed to serialize value to JSON
149 fn recv_json(
150 &mut self,
151 ) -> Pin<Box<dyn Future<Output = Result<serde_json::Value, DbError>> + Send + '_>>;
152
153 /// Non-blocking receive as JSON — returns immediately.
154 ///
155 /// Returns `Err(DbError::BufferEmpty)` if no pending values.
156 fn try_recv_json(&mut self) -> Result<serde_json::Value, DbError>;
157}
158
159/// Snapshot of buffer metrics at a point in time
160///
161/// Used for introspection and diagnostics. All counters are monotonically
162/// increasing (except after reset).
163#[cfg(feature = "metrics")]
164#[derive(Debug, Clone, Default)]
165pub struct BufferMetricsSnapshot {
166 /// Total items pushed to this buffer since creation
167 pub produced_count: u64,
168
169 /// Total items successfully consumed from this buffer (aggregate across all readers)
170 pub consumed_count: u64,
171
172 /// Total items dropped due to overflow/lag (SPMC ring only)
173 ///
174 /// **Note**: When multiple readers lag simultaneously on a broadcast buffer,
175 /// each reader reports its own dropped count independently. This means the
176 /// aggregate dropped_count may exceed the actual number of unique items that
177 /// overflowed from the ring buffer (each lagged reader adds its own lag count).
178 /// This is intentional: it reflects total "missed reads" across all consumers,
179 /// which is useful for diagnosing per-consumer backpressure issues.
180 pub dropped_count: u64,
181
182 /// Current buffer occupancy: (items_in_buffer, capacity)
183 /// Returns (0, 0) for SingleLatest/Mailbox where occupancy is not meaningful
184 pub occupancy: (usize, usize),
185}
186
187/// Optional buffer metrics for introspection (std only, feature-gated)
188///
189/// Implemented by buffer types when the `metrics` feature is enabled.
190/// Provides counters for diagnosing producer-consumer imbalances.
191///
192/// # Example
193/// ```rust,ignore
194/// use aimdb_core::buffer::BufferMetrics;
195///
196/// // After enabling `metrics` feature
197/// let metrics = buffer.metrics();
198/// if metrics.produced_count > metrics.consumed_count + 1000 {
199/// println!("Warning: consumer is {} items behind",
200/// metrics.produced_count - metrics.consumed_count);
201/// }
202/// if metrics.dropped_count > 0 {
203/// println!("Warning: {} items dropped due to overflow", metrics.dropped_count);
204/// }
205/// ```
206#[cfg(feature = "metrics")]
207pub trait BufferMetrics {
208 /// Get a snapshot of current buffer metrics
209 ///
210 /// Returns counters for produced, consumed, and dropped items,
211 /// plus current buffer occupancy.
212 fn metrics(&self) -> BufferMetricsSnapshot;
213
214 /// Reset all metrics counters to zero
215 ///
216 /// Useful for windowed metrics collection. Note that this affects
217 /// all observers of this buffer's metrics.
218 fn reset_metrics(&self);
219}
220
221#[cfg(test)]
222mod tests {
223 use super::*;
224
225 // Mock implementation for testing trait bounds
226 struct MockBuffer<T: Clone + Send + Sync> {
227 _phantom: core::marker::PhantomData<T>,
228 }
229
230 struct MockReader<T: Clone + Send> {
231 _phantom: core::marker::PhantomData<T>,
232 }
233
234 impl<T: Clone + Send + Sync + 'static> Buffer<T> for MockBuffer<T> {
235 type Reader = MockReader<T>;
236
237 fn new(_cfg: &BufferCfg) -> Self {
238 Self {
239 _phantom: core::marker::PhantomData,
240 }
241 }
242
243 fn push(&self, _value: T) {
244 // No-op for testing
245 }
246
247 fn subscribe(&self) -> Self::Reader {
248 MockReader {
249 _phantom: core::marker::PhantomData,
250 }
251 }
252 }
253
254 // Explicit DynBuffer implementation for MockBuffer
255 // (no blanket impl - adapters provide their own)
256 impl<T: Clone + Send + Sync + 'static> DynBuffer<T> for MockBuffer<T> {
257 fn push(&self, value: T) {
258 <Self as Buffer<T>>::push(self, value)
259 }
260
261 fn subscribe_boxed(&self) -> Box<dyn BufferReader<T> + Send> {
262 Box::new(self.subscribe())
263 }
264
265 fn as_any(&self) -> &dyn core::any::Any {
266 self
267 }
268
269 #[cfg(feature = "metrics")]
270 fn metrics_snapshot(&self) -> Option<BufferMetricsSnapshot> {
271 None // Mock doesn't track metrics
272 }
273 }
274
275 impl<T: Clone + Send> BufferReader<T> for MockReader<T> {
276 fn recv(&mut self) -> Pin<Box<dyn Future<Output = Result<T, DbError>> + Send + '_>> {
277 Box::pin(async {
278 // Return closed for testing
279 Err(DbError::BufferClosed {
280 #[cfg(feature = "std")]
281 buffer_name: "mock".to_string(),
282 #[cfg(not(feature = "std"))]
283 _buffer_name: (),
284 })
285 })
286 }
287
288 fn try_recv(&mut self) -> Result<T, DbError> {
289 Err(DbError::BufferEmpty)
290 }
291 }
292
293 #[test]
294 fn test_buffer_trait_bounds() {
295 // Verify trait bounds compile
296 fn assert_send<T: Send>() {}
297 fn assert_sync<T: Sync>() {}
298
299 assert_send::<MockBuffer<i32>>();
300 assert_sync::<MockBuffer<i32>>();
301 assert_send::<MockReader<i32>>();
302 }
303
304 #[test]
305 fn test_dyn_buffer_impl() {
306 // Verify DynBuffer can be used as trait object
307 let buffer = MockBuffer::<i32> {
308 _phantom: core::marker::PhantomData,
309 };
310
311 // Should be able to use as DynBuffer
312 let _: &dyn DynBuffer<i32> = &buffer;
313 }
314}