aimdb_sync/handle.rs
1//! AimDB handle for managing the sync API runtime thread.
2
3use aimdb_core::{AimDb, AimDbBuilder, DbError, DbResult};
4use aimdb_tokio_adapter::TokioAdapter;
5use std::fmt::Debug;
6use std::sync::Arc;
7use std::thread::{self, JoinHandle};
8use std::time::Duration;
9use tokio::sync::mpsc;
10
/// Default channel capacity for sync producers and consumers.
///
/// `producer()` and `consumer()` pass this value to
/// `producer_with_capacity()` / `consumer_with_capacity()` respectively.
/// A capacity of 100 is a compromise between:
/// - Memory usage (each channel can hold up to 100 queued items)
/// - Latency (small bursts don't block)
/// - Backpressure (prevents unbounded growth)
///
/// Use `producer_with_capacity()` or `consumer_with_capacity()` if you need
/// different buffering for specific record types.
pub const DEFAULT_SYNC_CHANNEL_CAPACITY: usize = 100;
22
/// Extension trait to add `attach()` method to `AimDbBuilder`.
///
/// This trait provides the entry point to the sync API by allowing
/// an `AimDbBuilder` instance to build the database and attach it to
/// a background runtime thread in one step.
pub trait AimDbBuilderSyncExt {
    /// Build the database inside a runtime thread and attach for sync API.
    ///
    /// This method takes a configured builder (WITH `.runtime(TokioAdapter)` set),
    /// spawns a background thread with a Tokio runtime, builds the database
    /// inside that context, and returns a sync handle.
    ///
    /// **Important**: Call `.runtime(Arc::new(TokioAdapter))` before `.attach()`.
    /// Even though TokioAdapter is created in sync context, the actual building
    /// happens in the async context where it can be used.
    ///
    /// # Errors
    ///
    /// The implementation for `AimDbBuilder<TokioAdapter>` in this file
    /// delegates to `AimDbHandle::new_from_builder`, which reports every
    /// startup failure as `DbError::AttachFailed`:
    ///
    /// - the runtime thread could not be spawned
    /// - the runtime thread exited before handing back its runtime handle
    /// - the database failed to build inside the runtime thread
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// use aimdb_core::AimDbBuilder;
    /// use aimdb_tokio_adapter::TokioAdapter;
    /// use aimdb_sync::AimDbBuilderSyncExt;
    /// use std::sync::Arc;
    ///
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let mut builder = AimDbBuilder::new()
    ///     .runtime(Arc::new(TokioAdapter));  // Create adapter (it's just a marker)
    /// builder.configure::<MyData>(|reg| {
    ///     // Configure buffer, sources, taps, etc.
    /// });
    /// let handle = builder.attach()?;  // Build happens in runtime thread
    /// # Ok(())
    /// # }
    /// ```
    fn attach(self) -> DbResult<AimDbHandle>;
}
64
65impl AimDbBuilderSyncExt for AimDbBuilder<TokioAdapter> {
66 fn attach(self) -> DbResult<AimDbHandle> {
67 AimDbHandle::new_from_builder(self)
68 }
69}
70
/// Extension trait to add `attach()` method to `AimDb`.
///
/// This trait provides an alternative entry point to the sync API by allowing
/// an already-built `AimDb` instance to be attached to a background runtime thread.
pub trait AimDbSyncExt {
    /// Attach the database to a background runtime thread.
    ///
    /// Takes ownership of the database and spawns a dedicated thread running
    /// a Tokio runtime. Returns a handle for sync API access.
    ///
    /// # Errors
    ///
    /// - `DbError::AttachFailed` if the runtime thread fails to start
    ///
    /// # Example
    ///
    /// NOTE(review): elsewhere in this file the builder is driven with
    /// `builder.build().await` and `TokioAdapter` is used as a plain marker
    /// value, so the synchronous `.build()?` and `TokioAdapter::new()?` calls
    /// below may be out of date — confirm against the current `aimdb_core` API.
    ///
    /// ```rust,ignore
    /// use aimdb_core::AimDbBuilder;
    /// use aimdb_tokio_adapter::TokioAdapter;
    /// use aimdb_sync::AimDbSyncExt;
    /// use std::sync::Arc;
    ///
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let db = AimDbBuilder::new()
    ///     .runtime(Arc::new(TokioAdapter::new()?))
    ///     .build()?;
    ///
    /// let handle = db.attach()?;
    /// # Ok(())
    /// # }
    /// ```
    fn attach(self) -> DbResult<AimDbHandle>;
}
104
105impl AimDbSyncExt for AimDb<aimdb_tokio_adapter::TokioAdapter> {
106 fn attach(self) -> DbResult<AimDbHandle> {
107 AimDbHandle::new(self)
108 }
109}
110
/// Handle to the AimDB runtime thread.
///
/// Created by calling `attach()` on either an `AimDbBuilder<TokioAdapter>`
/// (via `AimDbBuilderSyncExt`) or an already-built `AimDb<TokioAdapter>`
/// (via `AimDbSyncExt`). Provides factory methods for creating typed
/// producers and consumers.
///
/// # Thread Safety
///
/// `AimDbHandle` is `Send + Sync` and can be shared across threads.
/// However, it should typically be owned by one thread, with only
/// the producers/consumers being cloned and shared.
///
/// # Resource Management
///
/// Call `detach()` explicitly to ensure clean shutdown. If the handle
/// is dropped without calling `detach()`, a warning will be logged
/// and an emergency shutdown will be attempted.
pub struct AimDbHandle {
    /// Thread handle for the runtime thread.
    ///
    /// `None` once shutdown has taken the handle in order to join it.
    thread_handle: Option<JoinHandle<()>>,

    /// Shutdown signal sender.
    ///
    /// `None` once the shutdown signal has been sent.
    shutdown_tx: Option<mpsc::Sender<ShutdownSignal>>,

    /// Tokio runtime handle for submitting async work
    runtime_handle: tokio::runtime::Handle,

    /// Shared reference to the database (protected by Arc for thread safety).
    ///
    /// Cloned into every forwarding task spawned for producers and consumers.
    db: Arc<AimDb<TokioAdapter>>,
}
140
/// Signal to shut down the runtime thread.
///
/// Carries no data: the runtime thread stops waiting as soon as a value
/// arrives on the shutdown channel or the channel is closed.
#[derive(Debug, Clone, Copy)]
struct ShutdownSignal;
144
145impl AimDbHandle {
146 /// Create a new handle by spawning the runtime thread and building the database inside it.
147 pub(crate) fn new_from_builder(builder: AimDbBuilder<TokioAdapter>) -> DbResult<Self> {
148 // Create shutdown channel
149 let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<ShutdownSignal>(1);
150
151 // Create channels for passing the built database and runtime handle back
152 let (db_tx, mut db_rx) = mpsc::channel::<Arc<AimDb<TokioAdapter>>>(1);
153 let (handle_tx, mut handle_rx) = mpsc::channel::<tokio::runtime::Handle>(1);
154
155 // Spawn the runtime thread
156 let thread_handle = thread::Builder::new()
157 .name("aimdb-sync-runtime".to_string())
158 .spawn(move || {
159 // Create a new Tokio runtime for this thread
160 let runtime = match tokio::runtime::Runtime::new() {
161 Ok(rt) => rt,
162 Err(e) => {
163 eprintln!("Failed to create Tokio runtime: {}", e);
164 return;
165 }
166 };
167
168 // Get the runtime handle before moving into block_on
169 let rt_handle = runtime.handle().clone();
170
171 // Send the runtime handle to the main thread
172 if handle_tx.blocking_send(rt_handle).is_err() {
173 eprintln!("Failed to send runtime handle to main thread");
174 return;
175 }
176
177 // Build the database inside the async context
178 runtime.block_on(async move {
179 // Build the database (now we're in Tokio context where it can spawn tasks!)
180 let db = match builder.build().await {
181 Ok(d) => Arc::new(d),
182 Err(e) => {
183 eprintln!("Failed to build database: {}", e);
184 return;
185 }
186 };
187
188 // Send the database to the main thread
189 if db_tx.send(db.clone()).await.is_err() {
190 eprintln!("Failed to send database to main thread");
191 return;
192 }
193
194 // Wait for shutdown signal, keeping database alive
195 let _ = shutdown_rx.recv().await;
196 });
197 })
198 .map_err(|e| DbError::AttachFailed {
199 message: format!("Failed to spawn runtime thread: {}", e),
200 })?;
201
202 // Wait for runtime handle to be available
203 let runtime_handle = handle_rx
204 .blocking_recv()
205 .ok_or_else(|| DbError::AttachFailed {
206 message: "Runtime thread failed to send handle".to_string(),
207 })?;
208
209 // Wait for database to be built
210 let db = db_rx.blocking_recv().ok_or_else(|| DbError::AttachFailed {
211 message: "Runtime thread failed to build database".to_string(),
212 })?;
213
214 Ok(Self {
215 thread_handle: Some(thread_handle),
216 shutdown_tx: Some(shutdown_tx),
217 runtime_handle,
218 db,
219 })
220 }
221
222 /// Create a new handle from an already-built database (legacy method).
223 #[allow(dead_code)]
224 pub(crate) fn new(db: AimDb<TokioAdapter>) -> DbResult<Self> {
225 // Create shutdown channel
226 let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<ShutdownSignal>(1);
227
228 // Wrap database in Arc for sharing
229 let db = Arc::new(db);
230
231 // Spawn the runtime thread
232 let runtime_handle_result = Arc::new(std::sync::Mutex::new(None));
233 let runtime_handle_clone = runtime_handle_result.clone();
234
235 let thread_handle = thread::Builder::new()
236 .name("aimdb-sync-runtime".to_string())
237 .spawn(move || {
238 // Create a new Tokio runtime for this thread
239 let runtime = match tokio::runtime::Runtime::new() {
240 Ok(rt) => rt,
241 Err(e) => {
242 eprintln!("Failed to create Tokio runtime: {}", e);
243 return;
244 }
245 };
246
247 // Store the runtime handle so the main thread can access it
248 {
249 let mut handle = runtime_handle_clone.lock().unwrap();
250 *handle = Some(runtime.handle().clone());
251 }
252
253 // Wait for shutdown signal
254 runtime.block_on(async move {
255 let _ = shutdown_rx.recv().await;
256 // When shutdown signal is received, we exit and drop the database
257 });
258 })
259 .map_err(|e| DbError::AttachFailed {
260 message: format!("Failed to spawn runtime thread: {}", e),
261 })?;
262
263 // Wait for runtime handle to be available
264 let runtime_handle = loop {
265 let handle_opt = runtime_handle_result.lock().unwrap().clone();
266 if let Some(handle) = handle_opt {
267 break handle;
268 }
269 thread::sleep(Duration::from_millis(1));
270 };
271
272 Ok(Self {
273 thread_handle: Some(thread_handle),
274 shutdown_tx: Some(shutdown_tx),
275 runtime_handle,
276 db,
277 })
278 }
279
280 /// Create a synchronous producer for type `T`.
281 ///
282 /// # Type Parameters
283 ///
284 /// - `T`: The record type, must implement `TypedRecord`
285 ///
286 /// # Errors
287 ///
288 /// - `DbError::RecordNotFound` if type `T` was not registered
289 /// - `DbError::RuntimeShutdown` if the runtime thread has stopped
290 ///
291 /// # Example
292 ///
293 /// ```rust,ignore
294 /// # use aimdb_sync::*;
295 /// # use serde::{Serialize, Deserialize};
296 /// # #[derive(Debug, Clone, Serialize, Deserialize)]
297 /// # struct Temperature { celsius: f32 }
298 /// # fn example(handle: &AimDbHandle) -> Result<(), Box<dyn std::error::Error>> {
299 /// let producer = handle.producer::<Temperature>()?;
300 /// producer.set(Temperature { celsius: 25.0 })?;
301 /// # Ok(())
302 /// # }
303 /// ```
304 pub fn producer<T>(&self) -> DbResult<crate::SyncProducer<T>>
305 where
306 T: Send + 'static + Debug + Clone,
307 {
308 self.producer_with_capacity(DEFAULT_SYNC_CHANNEL_CAPACITY)
309 }
310
311 /// Create a synchronous consumer for type `T`.
312 ///
313 /// # Type Parameters
314 ///
315 /// - `T`: The record type, must implement `TypedRecord`
316 ///
317 /// # Errors
318 ///
319 /// - `DbError::RecordNotFound` if type `T` was not registered
320 /// - `DbError::RuntimeShutdown` if the runtime thread has stopped
321 ///
322 /// # Example
323 ///
324 /// ```rust,no_run
325 /// # use aimdb_sync::*;
326 /// # use serde::{Serialize, Deserialize};
327 /// # #[derive(Clone, Debug, Serialize, Deserialize)]
328 /// # struct Temperature { celsius: f32 }
329 /// # fn example(handle: &AimDbHandle) -> Result<(), Box<dyn std::error::Error>> {
330 /// let consumer = handle.consumer::<Temperature>()?;
331 /// let temp = consumer.get()?;
332 /// # Ok(())
333 /// # }
334 /// ```
335 pub fn consumer<T>(&self) -> DbResult<crate::SyncConsumer<T>>
336 where
337 T: Send + Sync + 'static + Debug + Clone,
338 {
339 self.consumer_with_capacity(DEFAULT_SYNC_CHANNEL_CAPACITY)
340 }
341
342 /// Create a synchronous producer with custom channel capacity.
343 ///
344 /// Like `producer()` but allows specifying the channel buffer size.
345 /// Use this when you need different buffering characteristics for specific record types.
346 ///
347 /// # Arguments
348 ///
349 /// - `capacity`: Channel buffer size (number of items that can be buffered)
350 ///
351 /// # Type Parameters
352 ///
353 /// - `T`: The record type, must implement `TypedRecord`
354 ///
355 /// # Errors
356 ///
357 /// - `DbError::RecordNotFound` if type `T` was not registered
358 /// - `DbError::RuntimeShutdown` if the runtime thread has stopped
359 ///
360 /// # Example
361 ///
362 /// ```rust,ignore
363 /// # use aimdb_sync::*;
364 /// # use serde::{Serialize, Deserialize};
365 /// # #[derive(Debug, Clone, Serialize, Deserialize)]
366 /// # struct HighFrequencySensor { value: f32 }
367 /// # fn example(handle: &AimDbHandle) -> Result<(), Box<dyn std::error::Error>> {
368 /// // High-frequency sensor needs larger buffer
369 /// let producer = handle.producer_with_capacity::<HighFrequencySensor>(1000)?;
370 /// producer.set(HighFrequencySensor { value: 42.0 })?;
371 /// # Ok(())
372 /// # }
373 /// ```
374 pub fn producer_with_capacity<T>(&self, capacity: usize) -> DbResult<crate::SyncProducer<T>>
375 where
376 T: Send + 'static + Debug + Clone,
377 {
378 // Create a bounded tokio channel for async/sync bridging
379 // Channel carries (value, result_sender) tuples to propagate errors back
380 let (tx, mut rx) =
381 mpsc::channel::<(T, tokio::sync::oneshot::Sender<DbResult<()>>)>(capacity);
382
383 // Spawn a task on the runtime to forward values to the database
384 let db = self.db.clone();
385 self.runtime_handle.spawn(async move {
386 while let Some((value, result_tx)) = rx.recv().await {
387 // Forward the value to the database's produce pipeline
388 let result = db.produce(value).await;
389
390 // Send the result back to the caller (may fail if caller dropped)
391 let _ = result_tx.send(result);
392 }
393 });
394
395 Ok(crate::SyncProducer::new(tx, self.runtime_handle.clone()))
396 }
397
398 /// Create a synchronous consumer with custom channel capacity.
399 ///
400 /// Like `consumer()` but allows specifying the channel buffer size.
401 /// Use this when you need different buffering characteristics for specific record types.
402 ///
403 /// # Arguments
404 ///
405 /// - `capacity`: Channel buffer size (number of items that can be buffered)
406 ///
407 /// # Type Parameters
408 ///
409 /// - `T`: The record type, must implement `TypedRecord`
410 ///
411 /// # Errors
412 ///
413 /// - `DbError::RecordNotFound` if type `T` was not registered
414 /// - `DbError::RuntimeShutdown` if the runtime thread has stopped
415 ///
416 /// # Example
417 ///
418 /// ```rust,no_run
419 /// # use aimdb_sync::*;
420 /// # use serde::{Serialize, Deserialize};
421 /// # #[derive(Clone, Debug, Serialize, Deserialize)]
422 /// # struct RareEvent { id: u32 }
423 /// # fn example(handle: &AimDbHandle) -> Result<(), Box<dyn std::error::Error>> {
424 /// // Rare events need smaller buffer
425 /// let consumer = handle.consumer_with_capacity::<RareEvent>(10)?;
426 /// let event = consumer.get()?;
427 /// # Ok(())
428 /// # }
429 /// ```
430 pub fn consumer_with_capacity<T>(&self, capacity: usize) -> DbResult<crate::SyncConsumer<T>>
431 where
432 T: Send + Sync + 'static + Debug + Clone,
433 {
434 // Create std::sync::mpsc channel for sync API
435 let (std_tx, std_rx) = std::sync::mpsc::sync_channel::<T>(capacity);
436
437 // Create a oneshot channel to confirm subscription succeeded
438 let (ready_tx, ready_rx) = tokio::sync::oneshot::channel();
439
440 // Spawn a task on the runtime to forward buffer data to the std channel
441 let db = self.db.clone();
442 self.runtime_handle.spawn(async move {
443 // Subscribe to the database buffer for type T
444 match db.subscribe::<T>() {
445 Ok(mut reader) => {
446 // Signal that subscription succeeded
447 let _ = ready_tx.send(());
448
449 // Forward all values from the buffer reader to the std channel
450 loop {
451 match reader.recv().await {
452 Ok(value) => {
453 // Send to std channel (non-async operation)
454 // If the receiver is dropped, send() will fail
455 if std_tx.send(value).is_err() {
456 break;
457 }
458 }
459 Err(DbError::BufferLagged { lag_count, .. }) => {
460 // Consumer fell behind - this is not fatal
461 // Log warning but continue receiving
462 eprintln!(
463 "Warning: Consumer for {} lagged by {} messages",
464 std::any::type_name::<T>(),
465 lag_count
466 );
467 // Don't break - next recv() will get latest data
468 }
469 Err(DbError::BufferClosed { .. }) => {
470 // Buffer closed (shutdown) - exit gracefully
471 break;
472 }
473 Err(e) => {
474 // Other unexpected errors - log and stop
475 eprintln!(
476 "Error reading from buffer for {}: {}",
477 std::any::type_name::<T>(),
478 e
479 );
480 break;
481 }
482 }
483 }
484 }
485 Err(e) => {
486 eprintln!(
487 "Failed to subscribe to record type {}: {}",
488 std::any::type_name::<T>(),
489 e
490 );
491 // Signal failure (will be ignored if receiver dropped)
492 let _ = ready_tx.send(());
493 }
494 }
495 });
496
497 // Wait for subscription to complete (with timeout)
498 ready_rx
499 .blocking_recv()
500 .map_err(|_| DbError::AttachFailed {
501 message: format!("Failed to subscribe to {}", std::any::type_name::<T>()),
502 })?;
503
504 Ok(crate::SyncConsumer::new(std_rx))
505 }
506
507 /// Gracefully shut down the runtime thread.
508 ///
509 /// Signals the runtime to stop, waits for all pending operations
510 /// to complete, then joins the thread. This is the preferred way
511 /// to shut down.
512 ///
513 /// # Errors
514 ///
515 /// - `DbError::DetachFailed` if shutdown fails or times out
516 ///
517 /// # Example
518 ///
519 /// ```rust,no_run
520 /// # use aimdb_sync::*;
521 /// # fn example(handle: AimDbHandle) -> Result<(), Box<dyn std::error::Error>> {
522 /// handle.detach()?;
523 /// # Ok(())
524 /// # }
525 /// ```
526 pub fn detach(mut self) -> DbResult<()> {
527 self.detach_internal(None)
528 }
529
530 /// Gracefully shut down with a timeout.
531 ///
532 /// Like `detach()`, but fails if shutdown takes longer than
533 /// the specified duration.
534 ///
535 /// # Arguments
536 ///
537 /// - `timeout`: Maximum time to wait for shutdown
538 ///
539 /// # Errors
540 ///
541 /// - `DbError::DetachFailed` if shutdown fails or times out
542 ///
543 /// # Example
544 ///
545 /// ```rust,no_run
546 /// # use aimdb_sync::*;
547 /// # use std::time::Duration;
548 /// # fn example(handle: AimDbHandle) -> Result<(), Box<dyn std::error::Error>> {
549 /// handle.detach_timeout(Duration::from_secs(5))?;
550 /// # Ok(())
551 /// # }
552 /// ```
553 pub fn detach_timeout(mut self, timeout: Duration) -> DbResult<()> {
554 self.detach_internal(Some(timeout))
555 }
556
557 /// Internal detach implementation.
558 fn detach_internal(&mut self, timeout: Option<Duration>) -> DbResult<()> {
559 // Send shutdown signal
560 if let Some(shutdown_tx) = self.shutdown_tx.take() {
561 // Try to send shutdown signal (non-blocking)
562 // If it fails, the runtime may have already stopped
563 let _ = shutdown_tx.try_send(ShutdownSignal);
564 }
565
566 // Join the runtime thread
567 if let Some(thread_handle) = self.thread_handle.take() {
568 match timeout {
569 Some(duration) => {
570 // Join with timeout using a different approach since JoinHandle
571 // doesn't directly support timeouts
572 let handle_thread = thread::spawn(move || thread_handle.join());
573
574 // Wait for the thread to complete with timeout
575 let start = std::time::Instant::now();
576 loop {
577 if handle_thread.is_finished() {
578 break;
579 }
580 if start.elapsed() > duration {
581 return Err(DbError::DetachFailed {
582 message: format!(
583 "Runtime thread did not shut down within {:?}",
584 duration
585 ),
586 });
587 }
588 thread::sleep(Duration::from_millis(10));
589 }
590
591 // Retrieve the result
592 handle_thread
593 .join()
594 .map_err(|_| DbError::DetachFailed {
595 message: "Failed to join helper thread".to_string(),
596 })?
597 .map_err(|_| DbError::DetachFailed {
598 message: "Runtime thread panicked".to_string(),
599 })?;
600 }
601 None => {
602 // Join without timeout
603 thread_handle.join().map_err(|_| DbError::DetachFailed {
604 message: "Runtime thread panicked during shutdown".to_string(),
605 })?;
606 }
607 }
608 }
609
610 Ok(())
611 }
612}
613
614impl Drop for AimDbHandle {
615 /// Attempts graceful shutdown if `detach()` was not called.
616 ///
617 /// Logs a warning and attempts shutdown with a 5-second timeout.
618 /// If shutdown fails, the runtime thread may be left running.
619 fn drop(&mut self) {
620 if self.thread_handle.is_some() {
621 eprintln!("Warning: AimDbHandle dropped without calling detach()");
622 eprintln!("Attempting emergency shutdown with 5 second timeout");
623
624 let timeout = Duration::from_secs(5);
625 if let Err(e) = self.detach_internal(Some(timeout)) {
626 eprintln!("Error during emergency shutdown: {}", e);
627 }
628 }
629 }
630}
631
632// Safety: AimDbHandle owns the runtime thread and channels are Send + Sync
633unsafe impl Send for AimDbHandle {}
634unsafe impl Sync for AimDbHandle {}
635
#[cfg(test)]
mod tests {
    /// Placeholder test: the body is intentionally empty, so it passes
    /// whenever this module compiles.
    #[test]
    fn test_extension_trait_exists() {
        // Just ensure the module compiles
        // Actual functionality tests will come later
    }
}