aimdb_core/buffer/traits.rs
1//! Runtime-agnostic buffer traits
2//!
3//! Defines `Buffer<T>` (static trait) and `DynBuffer<T>` (trait object) for
4//! buffer implementations. Adapters (tokio, embassy) provide concrete types.
5//!
6//! See `aimdb-tokio-adapter` and `aimdb-embassy-adapter` for implementations.
7
8use core::future::Future;
9use core::pin::Pin;
10
11#[cfg(not(feature = "std"))]
12extern crate alloc;
13
14#[cfg(not(feature = "std"))]
15use alloc::boxed::Box;
16
17#[cfg(feature = "std")]
18use std::boxed::Box;
19
20use super::BufferCfg;
21use crate::DbError;
22
23/// Static buffer trait for concrete implementations
24///
25/// Provides push/subscribe operations for typed buffers. Readers are owned
26/// and can outlive the subscription call (required for spawned tasks).
27///
28/// Trait bounds ensure thread-safety and `'static` lifetime for async runtimes.
29///
30/// See `aimdb_tokio_adapter::TokioRingBuffer` for implementation example.
31pub trait Buffer<T: Clone + Send>: Send + Sync + 'static {
32 /// Reader type for consuming values
33 ///
34 /// Each `subscribe()` call returns an independent owned reader.
35 type Reader: BufferReader<T> + 'static;
36
37 /// Creates a new buffer with the given configuration
38 ///
39 /// # Panics
40 /// May panic if configuration is invalid (call `cfg.validate()` first)
41 fn new(cfg: &BufferCfg) -> Self
42 where
43 Self: Sized;
44
45 /// Push a value into the buffer (non-blocking)
46 ///
47 /// Behavior depends on buffer type:
48 /// - **SPMC Ring**: Overwrites oldest value if full
49 /// - **SingleLatest**: Overwrites previous value
50 /// - **Mailbox**: Overwrites pending value if not consumed
51 fn push(&self, value: T);
52
53 /// Create a new independent reader for a consumer
54 ///
55 /// Each reader maintains its own position and can consume at its own pace.
56 /// The returned reader is owned and can outlive this reference.
57 fn subscribe(&self) -> Self::Reader;
58}
59
60/// Dynamic buffer trait for trait objects (object-safe)
61///
62/// Type-erased interface for buffers that can be stored as trait objects.
63/// Automatically implemented for all `Buffer<T>` types via blanket impl.
64///
65/// Used when storing heterogeneous buffer types (e.g., in `TypedRecord`).
66pub trait DynBuffer<T: Clone + Send>: Send + Sync {
67 /// Push a value into the buffer (non-blocking)
68 fn push(&self, value: T);
69
70 /// Create a boxed reader for consuming values
71 ///
72 /// Returns a type-erased reader. Each reader maintains its own position.
73 fn subscribe_boxed(&self) -> Box<dyn BufferReader<T> + Send>;
74
75 /// Returns self as Any for downcasting to concrete buffer types
76 fn as_any(&self) -> &dyn core::any::Any;
77}
78
79/// Reader trait for consuming values from a buffer
80///
81/// All read operations are async. Each reader is independent with its own state.
82///
83/// # Error Handling
84/// - `Ok(value)` - Successfully received a value
85/// - `Err(BufferLagged)` - Missed messages (SPMC ring only, can continue)
86/// - `Err(BufferClosed)` - Buffer closed (graceful shutdown)
87pub trait BufferReader<T: Clone + Send>: Send {
88 /// Receive the next value (async)
89 ///
90 /// Waits for the next available value. Returns immediately if buffered.
91 ///
92 /// # Behavior by Buffer Type
93 /// - **SPMC Ring**: Returns next value, or `Lagged(n)` if fell behind
94 /// - **SingleLatest**: Waits for value change, returns most recent
95 /// - **Mailbox**: Waits for slot value, takes and clears it
96 fn recv(&mut self) -> Pin<Box<dyn Future<Output = Result<T, DbError>> + Send + '_>>;
97}
98
99/// Reader trait for consuming JSON-serialized values from a buffer (std only)
100///
101/// Type-erased reader that subscribes to a typed buffer and emits values as
102/// `serde_json::Value`. Used by remote access protocol for subscriptions.
103///
104/// This trait enables subscribing to a buffer without knowing the concrete type `T`
105/// at compile time, by serializing values to JSON on each `recv_json()` call.
106///
107/// # Requirements
108/// - Record must be configured with `.with_serialization()`
109/// - Only available with `std` feature (requires serde_json)
110///
111/// # Example
112/// ```rust,ignore
113/// // Internal use in remote access handler
114/// let json_reader: Box<dyn JsonBufferReader> = record.subscribe_json()?;
115/// while let Ok(json_val) = json_reader.recv_json().await {
116/// // Forward JSON value to remote client...
117/// }
118/// ```
119#[cfg(feature = "std")]
120pub trait JsonBufferReader: Send {
121 /// Receive the next value as JSON (async)
122 ///
123 /// Waits for the next value from the underlying buffer and serializes it to JSON.
124 ///
125 /// # Returns
126 /// - `Ok(JsonValue)` - Successfully received and serialized value
127 /// - `Err(BufferLagged)` - Missed messages (can continue reading)
128 /// - `Err(BufferClosed)` - Buffer closed (graceful shutdown)
129 /// - `Err(SerializationFailed)` - Failed to serialize value to JSON
130 fn recv_json(
131 &mut self,
132 ) -> Pin<Box<dyn Future<Output = Result<serde_json::Value, DbError>> + Send + '_>>;
133}
134
135/// Blanket implementation of DynBuffer for all Buffer types
136impl<T, B> DynBuffer<T> for B
137where
138 T: Clone + Send + 'static,
139 B: Buffer<T>,
140{
141 fn push(&self, value: T) {
142 <Self as Buffer<T>>::push(self, value)
143 }
144
145 fn subscribe_boxed(&self) -> Box<dyn BufferReader<T> + Send> {
146 Box::new(self.subscribe())
147 }
148
149 fn as_any(&self) -> &dyn core::any::Any {
150 self
151 }
152}
153
154#[cfg(test)]
155mod tests {
156 use super::*;
157
158 // Mock implementation for testing trait bounds
159 struct MockBuffer<T: Clone + Send + Sync> {
160 _phantom: core::marker::PhantomData<T>,
161 }
162
163 struct MockReader<T: Clone + Send> {
164 _phantom: core::marker::PhantomData<T>,
165 }
166
167 impl<T: Clone + Send + Sync + 'static> Buffer<T> for MockBuffer<T> {
168 type Reader = MockReader<T>;
169
170 fn new(_cfg: &BufferCfg) -> Self {
171 Self {
172 _phantom: core::marker::PhantomData,
173 }
174 }
175
176 fn push(&self, _value: T) {
177 // No-op for testing
178 }
179
180 fn subscribe(&self) -> Self::Reader {
181 MockReader {
182 _phantom: core::marker::PhantomData,
183 }
184 }
185 }
186
187 impl<T: Clone + Send> BufferReader<T> for MockReader<T> {
188 fn recv(&mut self) -> Pin<Box<dyn Future<Output = Result<T, DbError>> + Send + '_>> {
189 Box::pin(async {
190 // Return closed for testing
191 Err(DbError::BufferClosed {
192 #[cfg(feature = "std")]
193 buffer_name: "mock".to_string(),
194 #[cfg(not(feature = "std"))]
195 _buffer_name: (),
196 })
197 })
198 }
199 }
200
201 #[test]
202 fn test_buffer_trait_bounds() {
203 // Verify trait bounds compile
204 fn assert_send<T: Send>() {}
205 fn assert_sync<T: Sync>() {}
206
207 assert_send::<MockBuffer<i32>>();
208 assert_sync::<MockBuffer<i32>>();
209 assert_send::<MockReader<i32>>();
210 }
211
212 #[test]
213 fn test_dyn_buffer_blanket_impl() {
214 // Verify DynBuffer is automatically implemented
215 let buffer = MockBuffer::<i32> {
216 _phantom: core::marker::PhantomData,
217 };
218
219 // Should be able to use as DynBuffer
220 let _: &dyn DynBuffer<i32> = &buffer;
221 }
222}