aimdb_core/buffer/
traits.rs

//! Runtime-agnostic buffer traits
//!
//! Defines `Buffer<T>` (static trait) and `DynBuffer<T>` (object-safe trait
//! for trait objects), together with the reader traits `BufferReader<T>` and,
//! when the `std` feature is enabled, `JsonBufferReader`. Adapters (tokio,
//! embassy) provide the concrete types.
//!
//! See `aimdb-tokio-adapter` and `aimdb-embassy-adapter` for implementations.
7
8use core::future::Future;
9use core::pin::Pin;
10
11#[cfg(not(feature = "std"))]
12extern crate alloc;
13
14#[cfg(not(feature = "std"))]
15use alloc::boxed::Box;
16
17#[cfg(feature = "std")]
18use std::boxed::Box;
19
20use super::BufferCfg;
21use crate::DbError;
22
23/// Static buffer trait for concrete implementations
24///
25/// Provides push/subscribe operations for typed buffers. Readers are owned
26/// and can outlive the subscription call (required for spawned tasks).
27///
28/// Trait bounds ensure thread-safety and `'static` lifetime for async runtimes.
29///
30/// See `aimdb_tokio_adapter::TokioRingBuffer` for implementation example.
31pub trait Buffer<T: Clone + Send>: Send + Sync + 'static {
32    /// Reader type for consuming values
33    ///
34    /// Each `subscribe()` call returns an independent owned reader.
35    type Reader: BufferReader<T> + 'static;
36
37    /// Creates a new buffer with the given configuration
38    ///
39    /// # Panics
40    /// May panic if configuration is invalid (call `cfg.validate()` first)
41    fn new(cfg: &BufferCfg) -> Self
42    where
43        Self: Sized;
44
45    /// Push a value into the buffer (non-blocking)
46    ///
47    /// Behavior depends on buffer type:
48    /// - **SPMC Ring**: Overwrites oldest value if full
49    /// - **SingleLatest**: Overwrites previous value
50    /// - **Mailbox**: Overwrites pending value if not consumed
51    fn push(&self, value: T);
52
53    /// Create a new independent reader for a consumer
54    ///
55    /// Each reader maintains its own position and can consume at its own pace.
56    /// The returned reader is owned and can outlive this reference.
57    fn subscribe(&self) -> Self::Reader;
58}
59
60/// Dynamic buffer trait for trait objects (object-safe)
61///
62/// Type-erased interface for buffers that can be stored as trait objects.
63/// Automatically implemented for all `Buffer<T>` types via blanket impl.
64///
65/// Used when storing heterogeneous buffer types (e.g., in `TypedRecord`).
66pub trait DynBuffer<T: Clone + Send>: Send + Sync {
67    /// Push a value into the buffer (non-blocking)
68    fn push(&self, value: T);
69
70    /// Create a boxed reader for consuming values
71    ///
72    /// Returns a type-erased reader. Each reader maintains its own position.
73    fn subscribe_boxed(&self) -> Box<dyn BufferReader<T> + Send>;
74
75    /// Returns self as Any for downcasting to concrete buffer types
76    fn as_any(&self) -> &dyn core::any::Any;
77}
78
79/// Reader trait for consuming values from a buffer
80///
81/// All read operations are async. Each reader is independent with its own state.
82///
83/// # Error Handling
84/// - `Ok(value)` - Successfully received a value
85/// - `Err(BufferLagged)` - Missed messages (SPMC ring only, can continue)
86/// - `Err(BufferClosed)` - Buffer closed (graceful shutdown)
87pub trait BufferReader<T: Clone + Send>: Send {
88    /// Receive the next value (async)
89    ///
90    /// Waits for the next available value. Returns immediately if buffered.
91    ///
92    /// # Behavior by Buffer Type
93    /// - **SPMC Ring**: Returns next value, or `Lagged(n)` if fell behind
94    /// - **SingleLatest**: Waits for value change, returns most recent
95    /// - **Mailbox**: Waits for slot value, takes and clears it
96    fn recv(&mut self) -> Pin<Box<dyn Future<Output = Result<T, DbError>> + Send + '_>>;
97}
98
/// Reader trait for consuming JSON-serialized values from a buffer (std only)
///
/// Type-erased reader that subscribes to a typed buffer and emits values as
/// `serde_json::Value`. Used by the remote access protocol for subscriptions.
///
/// This trait makes it possible to subscribe to a buffer without knowing the
/// concrete type `T` at compile time, by serializing values to JSON on each
/// `recv_json()` call.
///
/// # Requirements
/// - The record must be configured with `.with_serialization()`
/// - Only available with the `std` feature (requires `serde_json`)
///
/// # Example
/// ```rust,ignore
/// // Internal use in remote access handler.
/// // The binding must be `mut`: `recv_json` takes `&mut self`.
/// let mut json_reader: Box<dyn JsonBufferReader> = record.subscribe_json()?;
/// // Note: `while let Ok(..)` leaves the loop on the first error of any
/// // kind, including a lag error after which reading could continue.
/// while let Ok(json_val) = json_reader.recv_json().await {
///     // Forward JSON value to remote client...
/// }
/// ```
#[cfg(feature = "std")]
pub trait JsonBufferReader: Send {
    /// Receive the next value as JSON (async)
    ///
    /// Waits for the next value from the underlying buffer and serializes it
    /// to JSON. The returned future is boxed, `Send`, and borrows the reader
    /// mutably for as long as the future is alive.
    ///
    /// # Returns
    /// - `Ok(JsonValue)` - Successfully received and serialized value
    /// - `Err(BufferLagged)` - Missed messages (can continue reading)
    /// - `Err(BufferClosed)` - Buffer closed (graceful shutdown)
    /// - `Err(SerializationFailed)` - Failed to serialize value to JSON
    fn recv_json(
        &mut self,
    ) -> Pin<Box<dyn Future<Output = Result<serde_json::Value, DbError>> + Send + '_>>;
}
134
135/// Blanket implementation of DynBuffer for all Buffer types
136impl<T, B> DynBuffer<T> for B
137where
138    T: Clone + Send + 'static,
139    B: Buffer<T>,
140{
141    fn push(&self, value: T) {
142        <Self as Buffer<T>>::push(self, value)
143    }
144
145    fn subscribe_boxed(&self) -> Box<dyn BufferReader<T> + Send> {
146        Box::new(self.subscribe())
147    }
148
149    fn as_any(&self) -> &dyn core::any::Any {
150        self
151    }
152}
153
#[cfg(test)]
mod tests {
    use super::*;
    use core::marker::PhantomData;

    /// Zero-sized stand-in buffer used only to check trait bounds.
    struct MockBuffer<T: Clone + Send + Sync> {
        _phantom: PhantomData<T>,
    }

    /// Zero-sized stand-in reader handed out by `MockBuffer`.
    struct MockReader<T: Clone + Send> {
        _phantom: PhantomData<T>,
    }

    impl<T> Buffer<T> for MockBuffer<T>
    where
        T: Clone + Send + Sync + 'static,
    {
        type Reader = MockReader<T>;

        fn new(_cfg: &BufferCfg) -> Self {
            MockBuffer {
                _phantom: PhantomData,
            }
        }

        fn push(&self, _value: T) {
            // Values are intentionally discarded by the mock.
        }

        fn subscribe(&self) -> Self::Reader {
            MockReader {
                _phantom: PhantomData,
            }
        }
    }

    impl<T> BufferReader<T> for MockReader<T>
    where
        T: Clone + Send,
    {
        fn recv(&mut self) -> Pin<Box<dyn Future<Output = Result<T, DbError>> + Send + '_>> {
            // The mock never produces data: every receive reports "closed".
            Box::pin(async {
                Err(DbError::BufferClosed {
                    #[cfg(feature = "std")]
                    buffer_name: "mock".to_string(),
                    #[cfg(not(feature = "std"))]
                    _buffer_name: (),
                })
            })
        }
    }

    #[test]
    fn test_buffer_trait_bounds() {
        // Compile-time checks only: these instantiations fail to build if a
        // required auto trait is missing.
        fn require_send<S: Send>() {}
        fn require_sync<S: Sync>() {}

        require_send::<MockBuffer<i32>>();
        require_sync::<MockBuffer<i32>>();
        require_send::<MockReader<i32>>();
    }

    #[test]
    fn test_dyn_buffer_blanket_impl() {
        let buffer: MockBuffer<i32> = MockBuffer {
            _phantom: PhantomData,
        };

        // This coercion compiles only if the blanket impl covers the mock.
        let _erased: &dyn DynBuffer<i32> = &buffer;
    }
}
222}