aimdb_sync/producer.rs
1//! Synchronous producer for typed records.
2
3use aimdb_core::{DbError, DbResult};
4use std::fmt::Debug;
5use std::sync::Arc;
6use std::time::Duration;
7use tokio::sync::{mpsc, oneshot};
8
9/// Synchronous producer for records of type `T`.
10///
11/// Thread-safe, can be cloned and shared across threads.
12/// Values are moved (not cloned) through channels for zero-copy performance.
13///
14/// # Thread Safety
15///
16/// Multiple clones of `SyncProducer<T>` can be used concurrently from
17/// different threads. Each `set()` operation is independent and thread-safe.
18///
19/// # Example
20///
21/// ```rust,no_run
22/// # use aimdb_sync::*;
23/// # use serde::{Serialize, Deserialize};
24/// # #[derive(Clone, Debug, Serialize, Deserialize)]
25/// # struct Temperature { celsius: f32 }
26/// # fn example(producer: &SyncProducer<Temperature>) -> Result<(), Box<dyn std::error::Error>> {
27/// // Set value (blocks until sent)
28/// producer.set(Temperature { celsius: 25.0 })?;
29///
30/// // Set with timeout
31/// use std::time::Duration;
32/// producer.set_with_timeout(
33/// Temperature { celsius: 26.0 },
34/// Duration::from_millis(100)
35/// )?;
36///
37/// // Try to set (non-blocking)
38/// match producer.try_set(Temperature { celsius: 27.0 }) {
39/// Ok(()) => println!("Success"),
40/// Err(_) => println!("Channel full, try later"),
41/// }
42/// # Ok(())
43/// # }
44/// ```
45pub struct SyncProducer<T>
46where
47 T: Send + 'static + Debug + Clone,
48{
49 /// Channel sender for producer commands
50 /// Wrapped in Arc so it can be cloned across threads
51 /// Sends (value, result_sender) tuples to propagate produce errors back to caller
52 tx: Arc<mpsc::Sender<(T, oneshot::Sender<DbResult<()>>)>>,
53
54 /// Runtime handle for executing async operations with timeout
55 runtime_handle: tokio::runtime::Handle,
56}
57
58impl<T> SyncProducer<T>
59where
60 T: Send + 'static + Debug + Clone,
61{
62 /// Create a new sync producer (internal use only)
63 pub(crate) fn new(
64 tx: mpsc::Sender<(T, oneshot::Sender<DbResult<()>>)>,
65 runtime_handle: tokio::runtime::Handle,
66 ) -> Self {
67 Self {
68 tx: Arc::new(tx),
69 runtime_handle,
70 }
71 }
72
73 /// Internal helper: send value and wait for result with optional timeout
74 fn send_internal(&self, value: T, timeout: Option<Duration>) -> DbResult<()> {
75 let (result_tx, result_rx) = oneshot::channel();
76 let tx = self.tx.clone();
77
78 self.runtime_handle.block_on(async move {
79 // Send with optional timeout
80 let send_result = match timeout {
81 Some(duration) => tokio::time::timeout(duration, tx.send((value, result_tx))).await,
82 None => Ok(tx.send((value, result_tx)).await),
83 };
84
85 match send_result {
86 Ok(Ok(())) => {
87 // Successfully sent, now wait for produce result
88 let recv_result = match timeout {
89 Some(duration) => tokio::time::timeout(duration, result_rx).await,
90 None => Ok(result_rx.await),
91 };
92
93 match recv_result {
94 Ok(Ok(result)) => result,
95 Ok(Err(_)) => Err(DbError::RuntimeShutdown),
96 Err(_) => Err(DbError::SetTimeout),
97 }
98 }
99 Ok(Err(_)) => Err(DbError::RuntimeShutdown),
100 Err(_) => Err(DbError::SetTimeout),
101 }
102 })
103 }
104
105 /// Set the value, blocking until it can be sent.
106 ///
107 /// This call will block the current thread until the value can be sent to the runtime thread.
108 /// It's guaranteed to deliver the value eventually unless the runtime thread has shut down.
109 ///
110 /// # Errors
111 ///
112 /// Returns `DbError::RuntimeShutdown` if the runtime thread has been detached.
113 /// Returns any error from the underlying `produce()` operation (e.g., record not registered,
114 /// buffer full, etc.).
115 ///
116 /// # Example
117 ///
118 /// ```no_run
119 /// use aimdb_core::AimDbBuilder;
120 /// use aimdb_sync::AimDbBuilderSyncExt;
121 /// use aimdb_tokio_adapter::TokioAdapter;
122 /// use std::sync::Arc;
123 ///
124 /// # #[derive(Debug, Clone)]
125 /// # struct MyData { value: i32 }
126 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
127 /// let handle = AimDbBuilder::new()
128 /// .runtime(Arc::new(TokioAdapter))
129 /// .attach()?;
130 /// let producer = handle.producer::<MyData>("my_data")?;
131 /// producer.set(MyData { value: 42 })?; // blocks until value is sent and produced
132 /// # Ok(())
133 /// # }
134 /// ```
135 pub fn set(&self, value: T) -> DbResult<()> {
136 self.send_internal(value, None)
137 }
138
139 /// Set the value with a timeout.
140 ///
141 /// Attempts to send the value to the runtime thread and wait for produce completion,
142 /// blocking for at most `timeout` duration.
143 ///
144 /// # Errors
145 ///
146 /// Returns `DbError::SetTimeout` if the timeout expires before the value can be sent
147 /// or if waiting for the produce result exceeds the timeout.
148 /// Returns `DbError::RuntimeShutdown` if the runtime thread has been detached.
149 /// Returns any error from the underlying `produce()` operation.
150 ///
151 /// # Example
152 ///
153 /// ```no_run
154 /// use aimdb_core::AimDbBuilder;
155 /// use aimdb_sync::AimDbBuilderSyncExt;
156 /// use aimdb_tokio_adapter::TokioAdapter;
157 /// use std::sync::Arc;
158 /// use std::time::Duration;
159 ///
160 /// # #[derive(Debug, Clone)]
161 /// # struct MyData { value: i32 }
162 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
163 /// let handle = AimDbBuilder::new()
164 /// .runtime(Arc::new(TokioAdapter))
165 /// .attach()?;
166 /// let producer = handle.producer::<MyData>("my_data")?;
167 /// producer.set_with_timeout(MyData { value: 42 }, Duration::from_millis(100))?;
168 /// # Ok(())
169 /// # }
170 /// ```
171 pub fn set_with_timeout(&self, value: T, timeout: Duration) -> DbResult<()> {
172 self.send_internal(value, Some(timeout))
173 }
174
175 /// Try to set the value without blocking.
176 ///
177 /// Attempts to send the value immediately. Returns an error if the channel is full
178 /// or the runtime thread has shut down.
179 ///
180 /// **Note**: This method returns immediately after sending to the channel, but does NOT
181 /// wait for the produce operation to complete. Use `set()` or `set_with_timeout()` if
182 /// you need to know whether the produce operation succeeded.
183 ///
184 /// # Errors
185 ///
186 /// Returns `DbError::SetTimeout` if the channel is full.
187 /// Returns `DbError::RuntimeShutdown` if the runtime thread has been detached.
188 ///
189 /// # Example
190 ///
191 /// ```no_run
192 /// use aimdb_core::AimDbBuilder;
193 /// use aimdb_sync::AimDbBuilderSyncExt;
194 /// use aimdb_tokio_adapter::TokioAdapter;
195 /// use std::sync::Arc;
196 ///
197 /// # #[derive(Debug, Clone)]
198 /// # struct MyData { value: i32 }
199 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
200 /// let handle = AimDbBuilder::new()
201 /// .runtime(Arc::new(TokioAdapter))
202 /// .attach()?;
203 /// let producer = handle.producer::<MyData>("my_data")?;
204 /// match producer.try_set(MyData { value: 42 }) {
205 /// Ok(()) => println!("Sent immediately"),
206 /// Err(_) => println!("Channel full or runtime shutdown"),
207 /// }
208 /// # Ok(())
209 /// # }
210 /// ```
211 pub fn try_set(&self, value: T) -> DbResult<()> {
212 // Create a oneshot channel but don't wait for the result
213 let (result_tx, _result_rx) = oneshot::channel();
214
215 self.tx.try_send((value, result_tx)).map_err(|e| match e {
216 mpsc::error::TrySendError::Full(_) => DbError::SetTimeout,
217 mpsc::error::TrySendError::Closed(_) => DbError::RuntimeShutdown,
218 })
219 }
220}
221
222impl<T> Clone for SyncProducer<T>
223where
224 T: Send + 'static + Debug + Clone,
225{
226 /// Clone the producer to share across threads.
227 ///
228 /// Multiple clones can set values concurrently.
229 fn clone(&self) -> Self {
230 Self {
231 tx: self.tx.clone(),
232 runtime_handle: self.runtime_handle.clone(),
233 }
234 }
235}
236
237// Safety: SyncProducer uses Arc internally and is safe to send/share
238unsafe impl<T> Send for SyncProducer<T> where T: Send + 'static + Debug + Clone {}
239unsafe impl<T> Sync for SyncProducer<T> where T: Send + 'static + Debug + Clone {}
240
#[cfg(test)]
mod tests {
    use super::SyncProducer;

    /// Compiles only when `P` is both `Send` and `Sync`.
    fn assert_send_sync<P: Send + Sync>() {}

    #[test]
    fn test_sync_producer_is_send_sync() {
        // Purely a compile-time check: nothing is constructed at runtime, but the
        // test fails to build if `SyncProducer` stops being `Send + Sync`.
        assert_send_sync::<SyncProducer<i32>>();
        assert_send_sync::<SyncProducer<String>>();
    }
}