aimdb_sync/
consumer.rs

1//! Synchronous consumer for typed records.
2
3use aimdb_core::{DbError, DbResult};
4use std::fmt::Debug;
5use std::sync::mpsc;
6use std::sync::{Arc, Mutex};
7use std::time::Duration;
8
/// Synchronous consumer for records of type `T`.
///
/// Wraps the receiving end of a `std::sync::mpsc` channel so that records
/// can be pulled with ordinary blocking calls.
///
/// # Thread Safety
///
/// `SyncConsumer<T>` can be cloned and used concurrently from different
/// threads. All clones share the *same* underlying receiver behind a mutex:
///
/// - each value is delivered to exactly one of the clones, never to all of
///   them, and
/// - only one thread at a time can wait on the channel; the others wait for
///   the mutex first.
///
/// For independent subscriptions, request a separate consumer from the
/// handle (`handle.consumer()`) instead of cloning an existing one.
///
/// # Example
///
/// ```rust,ignore
/// # use aimdb_sync::*;
/// # use serde::{Serialize, Deserialize};
/// # #[derive(Debug, Clone, Serialize, Deserialize)]
/// # struct Temperature { celsius: f32 }
/// # fn example(consumer: &SyncConsumer<Temperature>) -> Result<(), Box<dyn std::error::Error>> {
/// // Get value (blocks until available)
/// let temp = consumer.get()?;
/// println!("Temperature: {}°C", temp.celsius);
///
/// // Get with timeout
/// use std::time::Duration;
/// match consumer.get_with_timeout(Duration::from_millis(100)) {
///     Ok(temp) => println!("Got: {}°C", temp.celsius),
///     Err(_) => println!("No data available"),
/// }
///
/// // Try to get (non-blocking)
/// match consumer.try_get() {
///     Ok(temp) => println!("Got: {}°C", temp.celsius),
///     Err(_) => println!("No data yet"),
/// }
/// # Ok(())
/// # }
/// ```
pub struct SyncConsumer<T>
where
    T: Send + Sync + 'static + Debug + Clone,
{
    /// Receiving end of the channel that carries records to this consumer.
    ///
    /// `mpsc::Receiver` is not `Sync`, so it is wrapped in `Arc<Mutex<..>>`:
    /// clones share the one receiver and the mutex ensures only one thread
    /// receives at a time.
    rx: Arc<Mutex<mpsc::Receiver<T>>>,
}
55
56impl<T> SyncConsumer<T>
57where
58    T: Send + Sync + 'static + Debug + Clone,
59{
60    /// Create a new sync consumer (internal use only)
61    pub(crate) fn new(rx: mpsc::Receiver<T>) -> Self {
62        Self {
63            rx: Arc::new(Mutex::new(rx)),
64        }
65    }
66
67    /// Get a value, blocking until one is available.
68    ///
69    /// Blocks indefinitely until a value is available from the
70    /// runtime thread.
71    ///
72    /// # Returns
73    ///
74    /// The next available record of type `T`.
75    ///
76    /// # Errors
77    ///
78    /// - `DbError::RuntimeShutdown` if the runtime thread has stopped
79    ///
80    /// # Example
81    ///
82    /// ```no_run
83    /// use aimdb_core::AimDbBuilder;
84    /// use aimdb_sync::AimDbBuilderSyncExt;
85    /// use aimdb_tokio_adapter::TokioAdapter;
86    /// use std::sync::Arc;
87    ///
88    /// # #[derive(Debug, Clone)]
89    /// # struct MyData { value: i32 }
90    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
91    /// let handle = AimDbBuilder::new()
92    ///     .runtime(Arc::new(TokioAdapter))
93    ///     .attach()?;
94    /// let consumer = handle.consumer::<MyData>()?;
95    /// let data = consumer.get()?; // blocks until value available
96    /// println!("Got: {:?}", data);
97    /// # Ok(())
98    /// # }
99    /// ```
100    pub fn get(&self) -> DbResult<T> {
101        let rx = self.rx.lock().unwrap();
102        rx.recv().map_err(|_| DbError::RuntimeShutdown)
103    }
104
105    /// Get a value with a timeout.
106    ///
107    /// Blocks until a value is available or the timeout expires.
108    ///
109    /// # Arguments
110    ///
111    /// - `timeout`: Maximum time to wait
112    ///
113    /// # Errors
114    ///
115    /// - `DbError::GetTimeout` if the timeout expires
116    /// - `DbError::RuntimeShutdown` if the runtime thread has stopped
117    ///
118    /// # Example
119    ///
120    /// ```no_run
121    /// use aimdb_core::AimDbBuilder;
122    /// use aimdb_sync::AimDbBuilderSyncExt;
123    /// use aimdb_tokio_adapter::TokioAdapter;
124    /// use std::sync::Arc;
125    /// use std::time::Duration;
126    ///
127    /// # #[derive(Debug, Clone)]
128    /// # struct MyData { value: i32 }
129    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
130    /// let handle = AimDbBuilder::new()
131    ///     .runtime(Arc::new(TokioAdapter))
132    ///     .attach()?;
133    /// let consumer = handle.consumer::<MyData>()?;
134    /// match consumer.get_with_timeout(Duration::from_millis(100)) {
135    ///     Ok(data) => println!("Got: {:?}", data),
136    ///     Err(_) => println!("No data available"),
137    /// }
138    /// # Ok(())
139    /// # }
140    /// ```
141    pub fn get_with_timeout(&self, timeout: Duration) -> DbResult<T> {
142        let rx = self.rx.lock().unwrap();
143        rx.recv_timeout(timeout).map_err(|e| match e {
144            mpsc::RecvTimeoutError::Timeout => DbError::GetTimeout,
145            mpsc::RecvTimeoutError::Disconnected => DbError::RuntimeShutdown,
146        })
147    }
148
149    /// Try to get a value without blocking.
150    ///
151    /// Returns immediately with either a value or an error if
152    /// no data is available.
153    ///
154    /// # Errors
155    ///
156    /// - `DbError::GetTimeout` if no data is available (non-blocking)
157    /// - `DbError::RuntimeShutdown` if the runtime thread has stopped
158    ///
159    /// # Example
160    ///
161    /// ```no_run
162    /// use aimdb_core::AimDbBuilder;
163    /// use aimdb_sync::AimDbBuilderSyncExt;
164    /// use aimdb_tokio_adapter::TokioAdapter;
165    /// use std::sync::Arc;
166    ///
167    /// # #[derive(Debug, Clone)]
168    /// # struct MyData { value: i32 }
169    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
170    /// let handle = AimDbBuilder::new()
171    ///     .runtime(Arc::new(TokioAdapter))
172    ///     .attach()?;
173    /// let consumer = handle.consumer::<MyData>()?;
174    /// match consumer.try_get() {
175    ///     Ok(data) => println!("Got: {:?}", data),
176    ///     Err(_) => println!("No data yet"),
177    /// }
178    /// # Ok(())
179    /// # }
180    /// ```
181    pub fn try_get(&self) -> DbResult<T> {
182        let rx = self.rx.lock().unwrap();
183        rx.try_recv().map_err(|e| match e {
184            mpsc::TryRecvError::Empty => DbError::GetTimeout,
185            mpsc::TryRecvError::Disconnected => DbError::RuntimeShutdown,
186        })
187    }
188
189    /// Get the latest value by draining all queued values.
190    ///
191    /// This method drains the internal channel to get the most recent value,
192    /// discarding any intermediate values. This is useful for SingleLatest-like
193    /// semantics where you only care about the most recent data.
194    ///
195    /// Blocks until at least one value is available, then drains all queued
196    /// values and returns the last one.
197    ///
198    /// # Returns
199    ///
200    /// The most recent available record of type `T`.
201    ///
202    /// # Errors
203    ///
204    /// - `DbError::RuntimeShutdown` if the runtime thread has stopped
205    ///
206    /// # Example
207    ///
208    /// ```no_run
209    /// use aimdb_core::AimDbBuilder;
210    /// use aimdb_sync::AimDbBuilderSyncExt;
211    /// use aimdb_tokio_adapter::TokioAdapter;
212    /// use std::sync::Arc;
213    ///
214    /// # #[derive(Debug, Clone)]
215    /// # struct MyData { value: i32 }
216    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
217    /// let handle = AimDbBuilder::new()
218    ///     .runtime(Arc::new(TokioAdapter))
219    ///     .attach()?;
220    /// let consumer = handle.consumer::<MyData>()?;
221    ///
222    /// // Get the latest value, skipping any queued intermediate values
223    /// let latest = consumer.get_latest()?;
224    /// println!("Latest: {:?}", latest);
225    /// # Ok(())
226    /// # }
227    /// ```
228    pub fn get_latest(&self) -> DbResult<T> {
229        let rx = self.rx.lock().unwrap();
230
231        // First, block until we have at least one value
232        let mut latest = rx.recv().map_err(|_| DbError::RuntimeShutdown)?;
233
234        // Then drain all remaining values to get the most recent
235        while let Ok(value) = rx.try_recv() {
236            latest = value;
237        }
238
239        Ok(latest)
240    }
241
242    /// Get the latest value with a timeout, draining all queued values.
243    ///
244    /// Like `get_latest()`, but with a timeout. Blocks until at least one
245    /// value is available or the timeout expires, then drains all queued
246    /// values and returns the last one.
247    ///
248    /// # Arguments
249    ///
250    /// - `timeout`: Maximum time to wait for the first value
251    ///
252    /// # Errors
253    ///
254    /// - `DbError::GetTimeout` if the timeout expires before any value arrives
255    /// - `DbError::RuntimeShutdown` if the runtime thread has stopped
256    ///
257    /// # Example
258    ///
259    /// ```no_run
260    /// use aimdb_core::AimDbBuilder;
261    /// use aimdb_sync::AimDbBuilderSyncExt;
262    /// use aimdb_tokio_adapter::TokioAdapter;
263    /// use std::sync::Arc;
264    /// use std::time::Duration;
265    ///
266    /// # #[derive(Debug, Clone)]
267    /// # struct MyData { value: i32 }
268    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
269    /// let handle = AimDbBuilder::new()
270    ///     .runtime(Arc::new(TokioAdapter))
271    ///     .attach()?;
272    /// let consumer = handle.consumer::<MyData>()?;
273    ///
274    /// // Get the latest value within 100ms
275    /// match consumer.get_latest_with_timeout(Duration::from_millis(100)) {
276    ///     Ok(latest) => println!("Latest: {:?}", latest),
277    ///     Err(_) => println!("No data available"),
278    /// }
279    /// # Ok(())
280    /// # }
281    /// ```
282    pub fn get_latest_with_timeout(&self, timeout: Duration) -> DbResult<T> {
283        let rx = self.rx.lock().unwrap();
284
285        // First, block with timeout until we have at least one value
286        let mut latest = rx.recv_timeout(timeout).map_err(|e| match e {
287            mpsc::RecvTimeoutError::Timeout => DbError::GetTimeout,
288            mpsc::RecvTimeoutError::Disconnected => DbError::RuntimeShutdown,
289        })?;
290
291        // Then drain all remaining values to get the most recent
292        while let Ok(value) = rx.try_recv() {
293            latest = value;
294        }
295
296        Ok(latest)
297    }
298}
299
300impl<T> Clone for SyncConsumer<T>
301where
302    T: Send + Sync + 'static + Debug + Clone,
303{
304    /// Clone the consumer to share across threads.
305    ///
306    /// Note: All clones share the same receiver, so only one thread
307    /// will receive each value. For independent subscriptions, call
308    /// `handle.consumer()` multiple times instead.
309    fn clone(&self) -> Self {
310        Self {
311            rx: self.rx.clone(),
312        }
313    }
314}
315
316// Safety: SyncConsumer uses Arc internally and is safe to send/share
317unsafe impl<T> Send for SyncConsumer<T> where T: Send + Sync + 'static + Debug + Clone {}
318unsafe impl<T> Sync for SyncConsumer<T> where T: Send + Sync + 'static + Debug + Clone {}
319
#[cfg(test)]
mod tests {
    use super::SyncConsumer;

    /// Compiles only when `C` is both `Send` and `Sync`.
    fn assert_send_sync<C: Send + Sync>() {}

    #[test]
    fn test_sync_consumer_is_send_sync() {
        // The check happens at compile time: if `SyncConsumer` ever stops
        // being `Send + Sync`, this test no longer builds.
        assert_send_sync::<SyncConsumer<i32>>();
        assert_send_sync::<SyncConsumer<String>>();
    }
}
327}