aimdb_sync/consumer.rs
1//! Synchronous consumer for typed records.
2
3use aimdb_core::{DbError, DbResult};
4use std::fmt::Debug;
5use std::sync::mpsc;
6use std::sync::{Arc, Mutex};
7use std::time::Duration;
8
/// Synchronous consumer for records of type `T`.
///
/// Values are read from a `std::sync::mpsc` channel whose receiving end is kept
/// behind an `Arc<Mutex<_>>`.
///
/// # Thread Safety
///
/// A `SyncConsumer<T>` can be cloned and used concurrently from several threads,
/// but clones do **not** get independent streams: every clone shares the same
/// underlying receiver, so each value is handed to exactly one caller and only one
/// thread can be receiving at a time. For an independent subscription, request a
/// separate consumer (`handle.consumer()`) instead of cloning this one.
///
/// No trait bounds are declared on the struct itself; each `impl` block states the
/// bounds it needs.
///
/// # Example
///
/// ```rust,ignore
/// # use aimdb_sync::*;
/// # use serde::{Serialize, Deserialize};
/// # #[derive(Debug, Clone, Serialize, Deserialize)]
/// # struct Temperature { celsius: f32 }
/// # fn example(consumer: &SyncConsumer<Temperature>) -> Result<(), Box<dyn std::error::Error>> {
/// // Get value (blocks until available)
/// let temp = consumer.get()?;
/// println!("Temperature: {}°C", temp.celsius);
///
/// // Get with timeout
/// use std::time::Duration;
/// match consumer.get_with_timeout(Duration::from_millis(100)) {
///     Ok(temp) => println!("Got: {}°C", temp.celsius),
///     Err(_) => println!("No data available"),
/// }
///
/// // Try to get (non-blocking)
/// match consumer.try_get() {
///     Ok(temp) => println!("Got: {}°C", temp.celsius),
///     Err(_) => println!("No data yet"),
/// }
/// # Ok(())
/// # }
/// ```
pub struct SyncConsumer<T> {
    /// Receiving end of the channel that delivers records.
    ///
    /// `Arc` lets clones share the receiver; `Mutex` ensures only one thread
    /// receives from it at a time.
    rx: Arc<Mutex<mpsc::Receiver<T>>>,
}
55
56impl<T> SyncConsumer<T>
57where
58 T: Send + Sync + 'static + Debug + Clone,
59{
60 /// Create a new sync consumer (internal use only)
61 pub(crate) fn new(rx: mpsc::Receiver<T>) -> Self {
62 Self {
63 rx: Arc::new(Mutex::new(rx)),
64 }
65 }
66
67 /// Get a value, blocking until one is available.
68 ///
69 /// Blocks indefinitely until a value is available from the
70 /// runtime thread.
71 ///
72 /// # Returns
73 ///
74 /// The next available record of type `T`.
75 ///
76 /// # Errors
77 ///
78 /// - `DbError::RuntimeShutdown` if the runtime thread has stopped
79 ///
80 /// # Example
81 ///
82 /// ```no_run
83 /// use aimdb_core::AimDbBuilder;
84 /// use aimdb_sync::AimDbBuilderSyncExt;
85 /// use aimdb_tokio_adapter::TokioAdapter;
86 /// use std::sync::Arc;
87 ///
88 /// # #[derive(Debug, Clone)]
89 /// # struct MyData { value: i32 }
90 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
91 /// let handle = AimDbBuilder::new()
92 /// .runtime(Arc::new(TokioAdapter))
93 /// .attach()?;
94 /// let consumer = handle.consumer::<MyData>()?;
95 /// let data = consumer.get()?; // blocks until value available
96 /// println!("Got: {:?}", data);
97 /// # Ok(())
98 /// # }
99 /// ```
100 pub fn get(&self) -> DbResult<T> {
101 let rx = self.rx.lock().unwrap();
102 rx.recv().map_err(|_| DbError::RuntimeShutdown)
103 }
104
105 /// Get a value with a timeout.
106 ///
107 /// Blocks until a value is available or the timeout expires.
108 ///
109 /// # Arguments
110 ///
111 /// - `timeout`: Maximum time to wait
112 ///
113 /// # Errors
114 ///
115 /// - `DbError::GetTimeout` if the timeout expires
116 /// - `DbError::RuntimeShutdown` if the runtime thread has stopped
117 ///
118 /// # Example
119 ///
120 /// ```no_run
121 /// use aimdb_core::AimDbBuilder;
122 /// use aimdb_sync::AimDbBuilderSyncExt;
123 /// use aimdb_tokio_adapter::TokioAdapter;
124 /// use std::sync::Arc;
125 /// use std::time::Duration;
126 ///
127 /// # #[derive(Debug, Clone)]
128 /// # struct MyData { value: i32 }
129 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
130 /// let handle = AimDbBuilder::new()
131 /// .runtime(Arc::new(TokioAdapter))
132 /// .attach()?;
133 /// let consumer = handle.consumer::<MyData>()?;
134 /// match consumer.get_with_timeout(Duration::from_millis(100)) {
135 /// Ok(data) => println!("Got: {:?}", data),
136 /// Err(_) => println!("No data available"),
137 /// }
138 /// # Ok(())
139 /// # }
140 /// ```
141 pub fn get_with_timeout(&self, timeout: Duration) -> DbResult<T> {
142 let rx = self.rx.lock().unwrap();
143 rx.recv_timeout(timeout).map_err(|e| match e {
144 mpsc::RecvTimeoutError::Timeout => DbError::GetTimeout,
145 mpsc::RecvTimeoutError::Disconnected => DbError::RuntimeShutdown,
146 })
147 }
148
149 /// Try to get a value without blocking.
150 ///
151 /// Returns immediately with either a value or an error if
152 /// no data is available.
153 ///
154 /// # Errors
155 ///
156 /// - `DbError::GetTimeout` if no data is available (non-blocking)
157 /// - `DbError::RuntimeShutdown` if the runtime thread has stopped
158 ///
159 /// # Example
160 ///
161 /// ```no_run
162 /// use aimdb_core::AimDbBuilder;
163 /// use aimdb_sync::AimDbBuilderSyncExt;
164 /// use aimdb_tokio_adapter::TokioAdapter;
165 /// use std::sync::Arc;
166 ///
167 /// # #[derive(Debug, Clone)]
168 /// # struct MyData { value: i32 }
169 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
170 /// let handle = AimDbBuilder::new()
171 /// .runtime(Arc::new(TokioAdapter))
172 /// .attach()?;
173 /// let consumer = handle.consumer::<MyData>()?;
174 /// match consumer.try_get() {
175 /// Ok(data) => println!("Got: {:?}", data),
176 /// Err(_) => println!("No data yet"),
177 /// }
178 /// # Ok(())
179 /// # }
180 /// ```
181 pub fn try_get(&self) -> DbResult<T> {
182 let rx = self.rx.lock().unwrap();
183 rx.try_recv().map_err(|e| match e {
184 mpsc::TryRecvError::Empty => DbError::GetTimeout,
185 mpsc::TryRecvError::Disconnected => DbError::RuntimeShutdown,
186 })
187 }
188
189 /// Get the latest value by draining all queued values.
190 ///
191 /// This method drains the internal channel to get the most recent value,
192 /// discarding any intermediate values. This is useful for SingleLatest-like
193 /// semantics where you only care about the most recent data.
194 ///
195 /// Blocks until at least one value is available, then drains all queued
196 /// values and returns the last one.
197 ///
198 /// # Returns
199 ///
200 /// The most recent available record of type `T`.
201 ///
202 /// # Errors
203 ///
204 /// - `DbError::RuntimeShutdown` if the runtime thread has stopped
205 ///
206 /// # Example
207 ///
208 /// ```no_run
209 /// use aimdb_core::AimDbBuilder;
210 /// use aimdb_sync::AimDbBuilderSyncExt;
211 /// use aimdb_tokio_adapter::TokioAdapter;
212 /// use std::sync::Arc;
213 ///
214 /// # #[derive(Debug, Clone)]
215 /// # struct MyData { value: i32 }
216 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
217 /// let handle = AimDbBuilder::new()
218 /// .runtime(Arc::new(TokioAdapter))
219 /// .attach()?;
220 /// let consumer = handle.consumer::<MyData>()?;
221 ///
222 /// // Get the latest value, skipping any queued intermediate values
223 /// let latest = consumer.get_latest()?;
224 /// println!("Latest: {:?}", latest);
225 /// # Ok(())
226 /// # }
227 /// ```
228 pub fn get_latest(&self) -> DbResult<T> {
229 let rx = self.rx.lock().unwrap();
230
231 // First, block until we have at least one value
232 let mut latest = rx.recv().map_err(|_| DbError::RuntimeShutdown)?;
233
234 // Then drain all remaining values to get the most recent
235 while let Ok(value) = rx.try_recv() {
236 latest = value;
237 }
238
239 Ok(latest)
240 }
241
242 /// Get the latest value with a timeout, draining all queued values.
243 ///
244 /// Like `get_latest()`, but with a timeout. Blocks until at least one
245 /// value is available or the timeout expires, then drains all queued
246 /// values and returns the last one.
247 ///
248 /// # Arguments
249 ///
250 /// - `timeout`: Maximum time to wait for the first value
251 ///
252 /// # Errors
253 ///
254 /// - `DbError::GetTimeout` if the timeout expires before any value arrives
255 /// - `DbError::RuntimeShutdown` if the runtime thread has stopped
256 ///
257 /// # Example
258 ///
259 /// ```no_run
260 /// use aimdb_core::AimDbBuilder;
261 /// use aimdb_sync::AimDbBuilderSyncExt;
262 /// use aimdb_tokio_adapter::TokioAdapter;
263 /// use std::sync::Arc;
264 /// use std::time::Duration;
265 ///
266 /// # #[derive(Debug, Clone)]
267 /// # struct MyData { value: i32 }
268 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
269 /// let handle = AimDbBuilder::new()
270 /// .runtime(Arc::new(TokioAdapter))
271 /// .attach()?;
272 /// let consumer = handle.consumer::<MyData>()?;
273 ///
274 /// // Get the latest value within 100ms
275 /// match consumer.get_latest_with_timeout(Duration::from_millis(100)) {
276 /// Ok(latest) => println!("Latest: {:?}", latest),
277 /// Err(_) => println!("No data available"),
278 /// }
279 /// # Ok(())
280 /// # }
281 /// ```
282 pub fn get_latest_with_timeout(&self, timeout: Duration) -> DbResult<T> {
283 let rx = self.rx.lock().unwrap();
284
285 // First, block with timeout until we have at least one value
286 let mut latest = rx.recv_timeout(timeout).map_err(|e| match e {
287 mpsc::RecvTimeoutError::Timeout => DbError::GetTimeout,
288 mpsc::RecvTimeoutError::Disconnected => DbError::RuntimeShutdown,
289 })?;
290
291 // Then drain all remaining values to get the most recent
292 while let Ok(value) = rx.try_recv() {
293 latest = value;
294 }
295
296 Ok(latest)
297 }
298}
299
300impl<T> Clone for SyncConsumer<T>
301where
302 T: Send + Sync + 'static + Debug + Clone,
303{
304 /// Clone the consumer to share across threads.
305 ///
306 /// Note: All clones share the same receiver, so only one thread
307 /// will receive each value. For independent subscriptions, call
308 /// `handle.consumer()` multiple times instead.
309 fn clone(&self) -> Self {
310 Self {
311 rx: self.rx.clone(),
312 }
313 }
314}
315
316// Safety: SyncConsumer uses Arc internally and is safe to send/share
317unsafe impl<T> Send for SyncConsumer<T> where T: Send + Sync + 'static + Debug + Clone {}
318unsafe impl<T> Sync for SyncConsumer<T> where T: Send + Sync + 'static + Debug + Clone {}
319
#[cfg(test)]
mod tests {
    /// Compiles only when `S` is both `Send` and `Sync`.
    fn assert_send_sync<S: Send + Sync>() {}

    #[test]
    fn test_sync_consumer_is_send_sync() {
        // Purely a compile-time check: if `SyncConsumer` ever stops being
        // `Send + Sync`, this test no longer builds.
        // Functional tests of the getters are still to be added.
        assert_send_sync::<super::SyncConsumer<i32>>();
        assert_send_sync::<super::SyncConsumer<String>>();
    }
}