aimdb_sync/
producer.rs

1//! Synchronous producer for typed records.
2
3use aimdb_core::{DbError, DbResult};
4use std::fmt::Debug;
5use std::sync::Arc;
6use std::time::Duration;
7use tokio::sync::{mpsc, oneshot};
8
9/// Synchronous producer for records of type `T`.
10///
11/// Thread-safe, can be cloned and shared across threads.
12/// Values are moved (not cloned) through channels for zero-copy performance.
13///
14/// # Thread Safety
15///
16/// Multiple clones of `SyncProducer<T>` can be used concurrently from
17/// different threads. Each `set()` operation is independent and thread-safe.
18///
19/// # Example
20///
21/// ```rust,no_run
22/// # use aimdb_sync::*;
23/// # use serde::{Serialize, Deserialize};
24/// # #[derive(Clone, Debug, Serialize, Deserialize)]
25/// # struct Temperature { celsius: f32 }
26/// # fn example(producer: &SyncProducer<Temperature>) -> Result<(), Box<dyn std::error::Error>> {
27/// // Set value (blocks until sent)
28/// producer.set(Temperature { celsius: 25.0 })?;
29///
30/// // Set with timeout
31/// use std::time::Duration;
32/// producer.set_with_timeout(
33///     Temperature { celsius: 26.0 },
34///     Duration::from_millis(100)
35/// )?;
36///
37/// // Try to set (non-blocking)
38/// match producer.try_set(Temperature { celsius: 27.0 }) {
39///     Ok(()) => println!("Success"),
40///     Err(_) => println!("Channel full, try later"),
41/// }
42/// # Ok(())
43/// # }
44/// ```
45pub struct SyncProducer<T>
46where
47    T: Send + 'static + Debug + Clone,
48{
49    /// Channel sender for producer commands
50    /// Wrapped in Arc so it can be cloned across threads
51    /// Sends (value, result_sender) tuples to propagate produce errors back to caller
52    tx: Arc<mpsc::Sender<(T, oneshot::Sender<DbResult<()>>)>>,
53
54    /// Runtime handle for executing async operations with timeout
55    runtime_handle: tokio::runtime::Handle,
56}
57
58impl<T> SyncProducer<T>
59where
60    T: Send + 'static + Debug + Clone,
61{
62    /// Create a new sync producer (internal use only)
63    pub(crate) fn new(
64        tx: mpsc::Sender<(T, oneshot::Sender<DbResult<()>>)>,
65        runtime_handle: tokio::runtime::Handle,
66    ) -> Self {
67        Self {
68            tx: Arc::new(tx),
69            runtime_handle,
70        }
71    }
72
73    /// Internal helper: send value and wait for result with optional timeout
74    fn send_internal(&self, value: T, timeout: Option<Duration>) -> DbResult<()> {
75        let (result_tx, result_rx) = oneshot::channel();
76        let tx = self.tx.clone();
77
78        self.runtime_handle.block_on(async move {
79            // Send with optional timeout
80            let send_result = match timeout {
81                Some(duration) => tokio::time::timeout(duration, tx.send((value, result_tx))).await,
82                None => Ok(tx.send((value, result_tx)).await),
83            };
84
85            match send_result {
86                Ok(Ok(())) => {
87                    // Successfully sent, now wait for produce result
88                    let recv_result = match timeout {
89                        Some(duration) => tokio::time::timeout(duration, result_rx).await,
90                        None => Ok(result_rx.await),
91                    };
92
93                    match recv_result {
94                        Ok(Ok(result)) => result,
95                        Ok(Err(_)) => Err(DbError::RuntimeShutdown),
96                        Err(_) => Err(DbError::SetTimeout),
97                    }
98                }
99                Ok(Err(_)) => Err(DbError::RuntimeShutdown),
100                Err(_) => Err(DbError::SetTimeout),
101            }
102        })
103    }
104
105    /// Set the value, blocking until it can be sent.
106    ///
107    /// This call will block the current thread until the value can be sent to the runtime thread.
108    /// It's guaranteed to deliver the value eventually unless the runtime thread has shut down.
109    ///
110    /// # Errors
111    ///
112    /// Returns `DbError::RuntimeShutdown` if the runtime thread has been detached.
113    /// Returns any error from the underlying `produce()` operation (e.g., record not registered,
114    /// buffer full, etc.).
115    ///
116    /// # Example
117    ///
118    /// ```no_run
119    /// use aimdb_core::AimDbBuilder;
120    /// use aimdb_sync::AimDbBuilderSyncExt;
121    /// use aimdb_tokio_adapter::TokioAdapter;
122    /// use std::sync::Arc;
123    ///
124    /// # #[derive(Debug, Clone)]
125    /// # struct MyData { value: i32 }
126    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
127    /// let handle = AimDbBuilder::new()
128    ///     .runtime(Arc::new(TokioAdapter))
129    ///     .attach()?;
130    /// let producer = handle.producer::<MyData>("my_data")?;
131    /// producer.set(MyData { value: 42 })?; // blocks until value is sent and produced
132    /// # Ok(())
133    /// # }
134    /// ```
135    pub fn set(&self, value: T) -> DbResult<()> {
136        self.send_internal(value, None)
137    }
138
139    /// Set the value with a timeout.
140    ///
141    /// Attempts to send the value to the runtime thread and wait for produce completion,
142    /// blocking for at most `timeout` duration.
143    ///
144    /// # Errors
145    ///
146    /// Returns `DbError::SetTimeout` if the timeout expires before the value can be sent
147    /// or if waiting for the produce result exceeds the timeout.
148    /// Returns `DbError::RuntimeShutdown` if the runtime thread has been detached.
149    /// Returns any error from the underlying `produce()` operation.
150    ///
151    /// # Example
152    ///
153    /// ```no_run
154    /// use aimdb_core::AimDbBuilder;
155    /// use aimdb_sync::AimDbBuilderSyncExt;
156    /// use aimdb_tokio_adapter::TokioAdapter;
157    /// use std::sync::Arc;
158    /// use std::time::Duration;
159    ///
160    /// # #[derive(Debug, Clone)]
161    /// # struct MyData { value: i32 }
162    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
163    /// let handle = AimDbBuilder::new()
164    ///     .runtime(Arc::new(TokioAdapter))
165    ///     .attach()?;
166    /// let producer = handle.producer::<MyData>("my_data")?;
167    /// producer.set_with_timeout(MyData { value: 42 }, Duration::from_millis(100))?;
168    /// # Ok(())
169    /// # }
170    /// ```
171    pub fn set_with_timeout(&self, value: T, timeout: Duration) -> DbResult<()> {
172        self.send_internal(value, Some(timeout))
173    }
174
175    /// Try to set the value without blocking.
176    ///
177    /// Attempts to send the value immediately. Returns an error if the channel is full
178    /// or the runtime thread has shut down.
179    ///
180    /// **Note**: This method returns immediately after sending to the channel, but does NOT
181    /// wait for the produce operation to complete. Use `set()` or `set_with_timeout()` if
182    /// you need to know whether the produce operation succeeded.
183    ///
184    /// # Errors
185    ///
186    /// Returns `DbError::SetTimeout` if the channel is full.
187    /// Returns `DbError::RuntimeShutdown` if the runtime thread has been detached.
188    ///
189    /// # Example
190    ///
191    /// ```no_run
192    /// use aimdb_core::AimDbBuilder;
193    /// use aimdb_sync::AimDbBuilderSyncExt;
194    /// use aimdb_tokio_adapter::TokioAdapter;
195    /// use std::sync::Arc;
196    ///
197    /// # #[derive(Debug, Clone)]
198    /// # struct MyData { value: i32 }
199    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
200    /// let handle = AimDbBuilder::new()
201    ///     .runtime(Arc::new(TokioAdapter))
202    ///     .attach()?;
203    /// let producer = handle.producer::<MyData>("my_data")?;
204    /// match producer.try_set(MyData { value: 42 }) {
205    ///     Ok(()) => println!("Sent immediately"),
206    ///     Err(_) => println!("Channel full or runtime shutdown"),
207    /// }
208    /// # Ok(())
209    /// # }
210    /// ```
211    pub fn try_set(&self, value: T) -> DbResult<()> {
212        // Create a oneshot channel but don't wait for the result
213        let (result_tx, _result_rx) = oneshot::channel();
214
215        self.tx.try_send((value, result_tx)).map_err(|e| match e {
216            mpsc::error::TrySendError::Full(_) => DbError::SetTimeout,
217            mpsc::error::TrySendError::Closed(_) => DbError::RuntimeShutdown,
218        })
219    }
220}
221
222impl<T> Clone for SyncProducer<T>
223where
224    T: Send + 'static + Debug + Clone,
225{
226    /// Clone the producer to share across threads.
227    ///
228    /// Multiple clones can set values concurrently.
229    fn clone(&self) -> Self {
230        Self {
231            tx: self.tx.clone(),
232            runtime_handle: self.runtime_handle.clone(),
233        }
234    }
235}
236
237// Safety: SyncProducer uses Arc internally and is safe to send/share
238unsafe impl<T> Send for SyncProducer<T> where T: Send + 'static + Debug + Clone {}
239unsafe impl<T> Sync for SyncProducer<T> where T: Send + 'static + Debug + Clone {}
240
#[cfg(test)]
mod tests {
    use super::SyncProducer;

    /// Compiles only when `P` is both `Send` and `Sync`.
    fn assert_send_sync<P: Send + Sync>() {}

    #[test]
    fn test_sync_producer_is_send_sync() {
        // Compile-time check: if `SyncProducer` ever loses `Send` or `Sync`
        // for ordinary payload types, this test stops building.
        // Functional tests (actual sends through a runtime) are still to come.
        assert_send_sync::<SyncProducer<u32>>();
        assert_send_sync::<SyncProducer<String>>();
    }
}