Skip to main content

aimdb_core/buffer/
traits.rs

1//! Runtime-agnostic buffer traits
2//!
3//! Defines `Buffer<T>` (static trait) and `DynBuffer<T>` (trait object) for
4//! buffer implementations. Adapters (tokio, embassy) provide concrete types.
5//!
6//! See `aimdb-tokio-adapter` and `aimdb-embassy-adapter` for implementations.
7
8use core::future::Future;
9use core::pin::Pin;
10
11#[cfg(not(feature = "std"))]
12extern crate alloc;
13
14#[cfg(not(feature = "std"))]
15use alloc::boxed::Box;
16
17#[cfg(feature = "std")]
18use std::boxed::Box;
19
20use super::BufferCfg;
21use crate::DbError;
22
23/// Static buffer trait for concrete implementations
24///
25/// Provides push/subscribe operations for typed buffers. Readers are owned
26/// and can outlive the subscription call (required for spawned tasks).
27///
28/// Trait bounds ensure thread-safety and `'static` lifetime for async runtimes.
29///
30/// See `aimdb_tokio_adapter::TokioRingBuffer` for implementation example.
31pub trait Buffer<T: Clone + Send>: Send + Sync + 'static {
32    /// Reader type for consuming values
33    ///
34    /// Each `subscribe()` call returns an independent owned reader.
35    type Reader: BufferReader<T> + 'static;
36
37    /// Creates a new buffer with the given configuration
38    ///
39    /// # Panics
40    /// May panic if configuration is invalid (call `cfg.validate()` first)
41    fn new(cfg: &BufferCfg) -> Self
42    where
43        Self: Sized;
44
45    /// Push a value into the buffer (non-blocking)
46    ///
47    /// Behavior depends on buffer type:
48    /// - **SPMC Ring**: Overwrites oldest value if full
49    /// - **SingleLatest**: Overwrites previous value
50    /// - **Mailbox**: Overwrites pending value if not consumed
51    fn push(&self, value: T);
52
53    /// Create a new independent reader for a consumer
54    ///
55    /// Each reader maintains its own position and can consume at its own pace.
56    /// The returned reader is owned and can outlive this reference.
57    fn subscribe(&self) -> Self::Reader;
58}
59
60/// Dynamic buffer trait for trait objects (object-safe)
61///
62/// Type-erased interface for buffers that can be stored as trait objects.
63/// Automatically implemented for all `Buffer<T>` types via blanket impl.
64///
65/// Used when storing heterogeneous buffer types (e.g., in `TypedRecord`).
66pub trait DynBuffer<T: Clone + Send>: Send + Sync {
67    /// Push a value into the buffer (non-blocking)
68    fn push(&self, value: T);
69
70    /// Create a boxed reader for consuming values
71    ///
72    /// Returns a type-erased reader. Each reader maintains its own position.
73    fn subscribe_boxed(&self) -> Box<dyn BufferReader<T> + Send>;
74
75    /// Returns self as Any for downcasting to concrete buffer types
76    fn as_any(&self) -> &dyn core::any::Any;
77
78    /// Get buffer metrics snapshot (metrics feature only)
79    ///
80    /// Returns `Some(snapshot)` if the buffer implementation supports metrics,
81    /// `None` otherwise. Default implementation returns `None`.
82    #[cfg(feature = "metrics")]
83    fn metrics_snapshot(&self) -> Option<BufferMetricsSnapshot> {
84        None
85    }
86
87    /// Reset buffer metrics counters (metrics feature only)
88    ///
89    /// Default implementation is a no-op so buffers without metrics support are
90    /// safe to call. Implementations that track counters should override this
91    /// to zero them.
92    #[cfg(feature = "metrics")]
93    fn reset_metrics(&self) {}
94}
95
96/// Reader trait for consuming values from a buffer
97///
98/// All read operations are async. Each reader is independent with its own state.
99///
100/// # Error Handling
101/// - `Ok(value)` - Successfully received a value
102/// - `Err(BufferLagged)` - Missed messages (SPMC ring only, can continue)
103/// - `Err(BufferClosed)` - Buffer closed (graceful shutdown)
104pub trait BufferReader<T: Clone + Send>: Send {
105    /// Receive the next value (async)
106    ///
107    /// Waits for the next available value. Returns immediately if buffered.
108    ///
109    /// # Behavior by Buffer Type
110    /// - **SPMC Ring**: Returns next value, or `Lagged(n)` if fell behind
111    /// - **SingleLatest**: Waits for value change, returns most recent
112    /// - **Mailbox**: Waits for slot value, takes and clears it
113    fn recv(&mut self) -> Pin<Box<dyn Future<Output = Result<T, DbError>> + Send + '_>>;
114
115    /// Non-blocking receive — returns immediately.
116    ///
117    /// Returns `Err(DbError::BufferEmpty)` if no pending values.
118    ///
119    /// # Behavior by Buffer Type
120    /// - **SPMC Ring**: Returns next buffered value, or `BufferEmpty` if caught up
121    /// - **SingleLatest**: Returns value if changed since last read, or `BufferEmpty`
122    /// - **Mailbox**: Takes and returns slot value, or `BufferEmpty` if empty
123    fn try_recv(&mut self) -> Result<T, DbError>;
124}
125
126/// Reader trait for consuming JSON-serialized values from a buffer (std only)
127///
128/// Type-erased reader that subscribes to a typed buffer and emits values as
129/// `serde_json::Value`. Used by remote access protocol for subscriptions.
130///
131/// This trait enables subscribing to a buffer without knowing the concrete type `T`
132/// at compile time, by serializing values to JSON on each `recv_json()` call.
133///
134/// # Requirements
135/// - Record must be configured with `.with_remote_access()`
136/// - Only available with `std` feature (requires serde_json)
137///
138/// # Example
139/// ```rust,ignore
140/// // Internal use in remote access handler
141/// let json_reader: Box<dyn JsonBufferReader> = record.subscribe_json()?;
142/// while let Ok(json_val) = json_reader.recv_json().await {
143///     // Forward JSON value to remote client...
144/// }
145/// ```
146#[cfg(feature = "std")]
147pub trait JsonBufferReader: Send {
148    /// Receive the next value as JSON (async)
149    ///
150    /// Waits for the next value from the underlying buffer and serializes it to JSON.
151    ///
152    /// # Returns
153    /// - `Ok(JsonValue)` - Successfully received and serialized value
154    /// - `Err(BufferLagged)` - Missed messages (can continue reading)
155    /// - `Err(BufferClosed)` - Buffer closed (graceful shutdown)
156    /// - `Err(SerializationFailed)` - Failed to serialize value to JSON
157    fn recv_json(
158        &mut self,
159    ) -> Pin<Box<dyn Future<Output = Result<serde_json::Value, DbError>> + Send + '_>>;
160
161    /// Non-blocking receive as JSON — returns immediately.
162    ///
163    /// Returns `Err(DbError::BufferEmpty)` if no pending values.
164    fn try_recv_json(&mut self) -> Result<serde_json::Value, DbError>;
165}
166
167/// Snapshot of buffer metrics at a point in time
168///
169/// Used for introspection and diagnostics. All counters are monotonically
170/// increasing (except after reset).
171#[cfg(feature = "metrics")]
172#[derive(Debug, Clone, Default)]
173pub struct BufferMetricsSnapshot {
174    /// Total items pushed to this buffer since creation
175    pub produced_count: u64,
176
177    /// Total items successfully consumed from this buffer (aggregate across all readers)
178    pub consumed_count: u64,
179
180    /// Total items dropped due to overflow/lag (SPMC ring only)
181    ///
182    /// **Note**: When multiple readers lag simultaneously on a broadcast buffer,
183    /// each reader reports its own dropped count independently. This means the
184    /// aggregate dropped_count may exceed the actual number of unique items that
185    /// overflowed from the ring buffer (each lagged reader adds its own lag count).
186    /// This is intentional: it reflects total "missed reads" across all consumers,
187    /// which is useful for diagnosing per-consumer backpressure issues.
188    pub dropped_count: u64,
189
190    /// Current buffer occupancy: (items_in_buffer, capacity)
191    /// Returns (0, 0) for SingleLatest/Mailbox where occupancy is not meaningful
192    pub occupancy: (usize, usize),
193}
194
195/// Optional buffer metrics for introspection (std only, feature-gated)
196///
197/// Implemented by buffer types when the `metrics` feature is enabled.
198/// Provides counters for diagnosing producer-consumer imbalances.
199///
200/// # Example
201/// ```rust,ignore
202/// use aimdb_core::buffer::BufferMetrics;
203///
204/// // After enabling `metrics` feature
205/// let metrics = buffer.metrics();
206/// if metrics.produced_count > metrics.consumed_count + 1000 {
207///     println!("Warning: consumer is {} items behind",
208///              metrics.produced_count - metrics.consumed_count);
209/// }
210/// if metrics.dropped_count > 0 {
211///     println!("Warning: {} items dropped due to overflow", metrics.dropped_count);
212/// }
213/// ```
214#[cfg(feature = "metrics")]
215pub trait BufferMetrics {
216    /// Get a snapshot of current buffer metrics
217    ///
218    /// Returns counters for produced, consumed, and dropped items,
219    /// plus current buffer occupancy.
220    fn metrics(&self) -> BufferMetricsSnapshot;
221
222    /// Reset all metrics counters to zero
223    ///
224    /// Useful for windowed metrics collection. Note that this affects
225    /// all observers of this buffer's metrics.
226    fn reset_metrics(&self);
227}
228
229#[cfg(test)]
230mod tests {
231    use super::*;
232
233    // Mock implementation for testing trait bounds
234    struct MockBuffer<T: Clone + Send + Sync> {
235        _phantom: core::marker::PhantomData<T>,
236    }
237
238    struct MockReader<T: Clone + Send> {
239        _phantom: core::marker::PhantomData<T>,
240    }
241
242    impl<T: Clone + Send + Sync + 'static> Buffer<T> for MockBuffer<T> {
243        type Reader = MockReader<T>;
244
245        fn new(_cfg: &BufferCfg) -> Self {
246            Self {
247                _phantom: core::marker::PhantomData,
248            }
249        }
250
251        fn push(&self, _value: T) {
252            // No-op for testing
253        }
254
255        fn subscribe(&self) -> Self::Reader {
256            MockReader {
257                _phantom: core::marker::PhantomData,
258            }
259        }
260    }
261
262    // Explicit DynBuffer implementation for MockBuffer
263    // (no blanket impl - adapters provide their own)
264    impl<T: Clone + Send + Sync + 'static> DynBuffer<T> for MockBuffer<T> {
265        fn push(&self, value: T) {
266            <Self as Buffer<T>>::push(self, value)
267        }
268
269        fn subscribe_boxed(&self) -> Box<dyn BufferReader<T> + Send> {
270            Box::new(self.subscribe())
271        }
272
273        fn as_any(&self) -> &dyn core::any::Any {
274            self
275        }
276
277        #[cfg(feature = "metrics")]
278        fn metrics_snapshot(&self) -> Option<BufferMetricsSnapshot> {
279            None // Mock doesn't track metrics
280        }
281    }
282
283    impl<T: Clone + Send> BufferReader<T> for MockReader<T> {
284        fn recv(&mut self) -> Pin<Box<dyn Future<Output = Result<T, DbError>> + Send + '_>> {
285            Box::pin(async {
286                // Return closed for testing
287                Err(DbError::BufferClosed {
288                    #[cfg(feature = "std")]
289                    buffer_name: "mock".to_string(),
290                    #[cfg(not(feature = "std"))]
291                    _buffer_name: (),
292                })
293            })
294        }
295
296        fn try_recv(&mut self) -> Result<T, DbError> {
297            Err(DbError::BufferEmpty)
298        }
299    }
300
301    #[test]
302    fn test_buffer_trait_bounds() {
303        // Verify trait bounds compile
304        fn assert_send<T: Send>() {}
305        fn assert_sync<T: Sync>() {}
306
307        assert_send::<MockBuffer<i32>>();
308        assert_sync::<MockBuffer<i32>>();
309        assert_send::<MockReader<i32>>();
310    }
311
312    #[test]
313    fn test_dyn_buffer_impl() {
314        // Verify DynBuffer can be used as trait object
315        let buffer = MockBuffer::<i32> {
316            _phantom: core::marker::PhantomData,
317        };
318
319        // Should be able to use as DynBuffer
320        let _: &dyn DynBuffer<i32> = &buffer;
321    }
322}