aimdb_sync/
handle.rs

1//! AimDB handle for managing the sync API runtime thread.
2
3use aimdb_core::{AimDb, AimDbBuilder, DbError, DbResult};
4use aimdb_tokio_adapter::TokioAdapter;
5use std::fmt::Debug;
6use std::sync::Arc;
7use std::thread::{self, JoinHandle};
8use std::time::Duration;
9use tokio::sync::mpsc;
10
11/// Default channel capacity for sync producers and consumers.
12///
13/// This is the buffer size used by `producer()` and `consumer()` methods.
14/// A capacity of 100 provides a good balance between:
15/// - Memory usage (100 × sizeof(T) per channel)
16/// - Latency (small bursts don't block)
17/// - Backpressure (prevents unbounded growth)
18///
19/// Use `producer_with_capacity()` or `consumer_with_capacity()` if you need
20/// different buffering for specific record types.
21pub const DEFAULT_SYNC_CHANNEL_CAPACITY: usize = 100;
22
23/// Extension trait to add `attach()` method to `AimDbBuilder`.
24///
25/// This trait provides the entry point to the sync API by allowing
26/// an `AimDbBuilder` instance to build the database and attach it to
27/// a background runtime thread in one step.
28pub trait AimDbBuilderSyncExt {
29    /// Build the database inside a runtime thread and attach for sync API.
30    ///
31    /// This method takes a configured builder (WITH `.runtime(TokioAdapter)` set),
32    /// spawns a background thread with a Tokio runtime, builds the database
33    /// inside that context, and returns a sync handle.
34    ///
35    /// **Important**: Call `.runtime(Arc::new(TokioAdapter))` before `.attach()`.
36    /// Even though TokioAdapter is created in sync context, the actual building
37    /// happens in the async context where it can be used.
38    ///
39    /// # Errors
40    ///
41    /// - `DbError::RuntimeError` if the database fails to build
42    /// - `DbError::AttachFailed` if the runtime thread fails to start
43    ///
44    /// # Example
45    ///
46    /// ```rust,ignore
47    /// use aimdb_core::AimDbBuilder;
48    /// use aimdb_tokio_adapter::TokioAdapter;
49    /// use aimdb_sync::AimDbBuilderSyncExt;
50    /// use std::sync::Arc;
51    ///
52    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
53    /// let mut builder = AimDbBuilder::new()
54    ///     .runtime(Arc::new(TokioAdapter));  // Create adapter (it's just a marker)
55    /// builder.configure::<MyData>(|reg| {
56    ///     // Configure buffer, sources, taps, etc.
57    /// });
58    /// let handle = builder.attach()?;  // Build happens in runtime thread
59    /// # Ok(())
60    /// # }
61    /// ```
62    fn attach(self) -> DbResult<AimDbHandle>;
63}
64
65impl AimDbBuilderSyncExt for AimDbBuilder<TokioAdapter> {
66    fn attach(self) -> DbResult<AimDbHandle> {
67        AimDbHandle::new_from_builder(self)
68    }
69}
70
71/// Extension trait to add `attach()` method to `AimDb`.
72///
73/// This trait provides an alternative entry point to the sync API by allowing
74/// an already-built `AimDb` instance to be attached to a background runtime thread.
75pub trait AimDbSyncExt {
76    /// Attach the database to a background runtime thread.
77    ///
78    /// Takes ownership of the database and spawns a dedicated thread running
79    /// a Tokio runtime. Returns a handle for sync API access.
80    ///
81    /// # Errors
82    ///
83    /// - `DbError::AttachFailed` if the runtime thread fails to start
84    ///
85    /// # Example
86    ///
87    /// ```rust,ignore
88    /// use aimdb_core::AimDbBuilder;
89    /// use aimdb_tokio_adapter::TokioAdapter;
90    /// use aimdb_sync::AimDbSyncExt;
91    /// use std::sync::Arc;
92    ///
93    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
94    /// let db = AimDbBuilder::new()
95    ///     .runtime(Arc::new(TokioAdapter::new()?))
96    ///     .build()?;
97    ///
98    /// let handle = db.attach()?;
99    /// # Ok(())
100    /// # }
101    /// ```
102    fn attach(self) -> DbResult<AimDbHandle>;
103}
104
105impl AimDbSyncExt for AimDb<aimdb_tokio_adapter::TokioAdapter> {
106    fn attach(self) -> DbResult<AimDbHandle> {
107        AimDbHandle::new(self)
108    }
109}
110
111/// Handle to the AimDB runtime thread.
112///
113/// Created by calling `AimDb::attach()`. Provides factory methods
114/// for creating typed producers and consumers.
115///
116/// # Thread Safety
117///
118/// `AimDbHandle` is `Send + Sync` and can be shared across threads.
119/// However, it should typically be owned by one thread, with only
120/// the producers/consumers being cloned and shared.
121///
122/// # Resource Management
123///
124/// Call `detach()` explicitly to ensure clean shutdown. If the handle
125/// is dropped without calling `detach()`, a warning will be logged
126/// and an emergency shutdown will be attempted.
127pub struct AimDbHandle {
128    /// Thread handle for the runtime thread
129    thread_handle: Option<JoinHandle<()>>,
130
131    /// Shutdown signal sender
132    shutdown_tx: Option<mpsc::Sender<ShutdownSignal>>,
133
134    /// Tokio runtime handle for submitting async work
135    runtime_handle: tokio::runtime::Handle,
136
137    /// Shared reference to the database (protected by Arc for thread safety)
138    db: Arc<AimDb<TokioAdapter>>,
139}
140
141/// Signal to shut down the runtime thread.
142#[derive(Debug, Clone, Copy)]
143struct ShutdownSignal;
144
145impl AimDbHandle {
146    /// Create a new handle by spawning the runtime thread and building the database inside it.
147    pub(crate) fn new_from_builder(builder: AimDbBuilder<TokioAdapter>) -> DbResult<Self> {
148        // Create shutdown channel
149        let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<ShutdownSignal>(1);
150
151        // Create channels for passing the built database and runtime handle back
152        let (db_tx, mut db_rx) = mpsc::channel::<Arc<AimDb<TokioAdapter>>>(1);
153        let (handle_tx, mut handle_rx) = mpsc::channel::<tokio::runtime::Handle>(1);
154
155        // Spawn the runtime thread
156        let thread_handle = thread::Builder::new()
157            .name("aimdb-sync-runtime".to_string())
158            .spawn(move || {
159                // Create a new Tokio runtime for this thread
160                let runtime = match tokio::runtime::Runtime::new() {
161                    Ok(rt) => rt,
162                    Err(e) => {
163                        eprintln!("Failed to create Tokio runtime: {}", e);
164                        return;
165                    }
166                };
167
168                // Get the runtime handle before moving into block_on
169                let rt_handle = runtime.handle().clone();
170
171                // Send the runtime handle to the main thread
172                if handle_tx.blocking_send(rt_handle).is_err() {
173                    eprintln!("Failed to send runtime handle to main thread");
174                    return;
175                }
176
177                // Build the database inside the async context
178                runtime.block_on(async move {
179                    // Build the database (now we're in Tokio context where it can spawn tasks!)
180                    let db = match builder.build().await {
181                        Ok(d) => Arc::new(d),
182                        Err(e) => {
183                            eprintln!("Failed to build database: {}", e);
184                            return;
185                        }
186                    };
187
188                    // Send the database to the main thread
189                    if db_tx.send(db.clone()).await.is_err() {
190                        eprintln!("Failed to send database to main thread");
191                        return;
192                    }
193
194                    // Wait for shutdown signal, keeping database alive
195                    let _ = shutdown_rx.recv().await;
196                });
197            })
198            .map_err(|e| DbError::AttachFailed {
199                message: format!("Failed to spawn runtime thread: {}", e),
200            })?;
201
202        // Wait for runtime handle to be available
203        let runtime_handle = handle_rx
204            .blocking_recv()
205            .ok_or_else(|| DbError::AttachFailed {
206                message: "Runtime thread failed to send handle".to_string(),
207            })?;
208
209        // Wait for database to be built
210        let db = db_rx.blocking_recv().ok_or_else(|| DbError::AttachFailed {
211            message: "Runtime thread failed to build database".to_string(),
212        })?;
213
214        Ok(Self {
215            thread_handle: Some(thread_handle),
216            shutdown_tx: Some(shutdown_tx),
217            runtime_handle,
218            db,
219        })
220    }
221
222    /// Create a new handle from an already-built database (legacy method).
223    #[allow(dead_code)]
224    pub(crate) fn new(db: AimDb<TokioAdapter>) -> DbResult<Self> {
225        // Create shutdown channel
226        let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<ShutdownSignal>(1);
227
228        // Wrap database in Arc for sharing
229        let db = Arc::new(db);
230
231        // Spawn the runtime thread
232        let runtime_handle_result = Arc::new(std::sync::Mutex::new(None));
233        let runtime_handle_clone = runtime_handle_result.clone();
234
235        let thread_handle = thread::Builder::new()
236            .name("aimdb-sync-runtime".to_string())
237            .spawn(move || {
238                // Create a new Tokio runtime for this thread
239                let runtime = match tokio::runtime::Runtime::new() {
240                    Ok(rt) => rt,
241                    Err(e) => {
242                        eprintln!("Failed to create Tokio runtime: {}", e);
243                        return;
244                    }
245                };
246
247                // Store the runtime handle so the main thread can access it
248                {
249                    let mut handle = runtime_handle_clone.lock().unwrap();
250                    *handle = Some(runtime.handle().clone());
251                }
252
253                // Wait for shutdown signal
254                runtime.block_on(async move {
255                    let _ = shutdown_rx.recv().await;
256                    // When shutdown signal is received, we exit and drop the database
257                });
258            })
259            .map_err(|e| DbError::AttachFailed {
260                message: format!("Failed to spawn runtime thread: {}", e),
261            })?;
262
263        // Wait for runtime handle to be available
264        let runtime_handle = loop {
265            let handle_opt = runtime_handle_result.lock().unwrap().clone();
266            if let Some(handle) = handle_opt {
267                break handle;
268            }
269            thread::sleep(Duration::from_millis(1));
270        };
271
272        Ok(Self {
273            thread_handle: Some(thread_handle),
274            shutdown_tx: Some(shutdown_tx),
275            runtime_handle,
276            db,
277        })
278    }
279
280    /// Create a synchronous producer for type `T`.
281    ///
282    /// # Type Parameters
283    ///
284    /// - `T`: The record type, must implement `TypedRecord`
285    ///
286    /// # Errors
287    ///
288    /// - `DbError::RecordNotFound` if type `T` was not registered
289    /// - `DbError::RuntimeShutdown` if the runtime thread has stopped
290    ///
291    /// # Example
292    ///
293    /// ```rust,ignore
294    /// # use aimdb_sync::*;
295    /// # use serde::{Serialize, Deserialize};
296    /// # #[derive(Debug, Clone, Serialize, Deserialize)]
297    /// # struct Temperature { celsius: f32 }
298    /// # fn example(handle: &AimDbHandle) -> Result<(), Box<dyn std::error::Error>> {
299    /// let producer = handle.producer::<Temperature>()?;
300    /// producer.set(Temperature { celsius: 25.0 })?;
301    /// # Ok(())
302    /// # }
303    /// ```
304    pub fn producer<T>(&self) -> DbResult<crate::SyncProducer<T>>
305    where
306        T: Send + 'static + Debug + Clone,
307    {
308        self.producer_with_capacity(DEFAULT_SYNC_CHANNEL_CAPACITY)
309    }
310
311    /// Create a synchronous consumer for type `T`.
312    ///
313    /// # Type Parameters
314    ///
315    /// - `T`: The record type, must implement `TypedRecord`
316    ///
317    /// # Errors
318    ///
319    /// - `DbError::RecordNotFound` if type `T` was not registered
320    /// - `DbError::RuntimeShutdown` if the runtime thread has stopped
321    ///
322    /// # Example
323    ///
324    /// ```rust,no_run
325    /// # use aimdb_sync::*;
326    /// # use serde::{Serialize, Deserialize};
327    /// # #[derive(Clone, Debug, Serialize, Deserialize)]
328    /// # struct Temperature { celsius: f32 }
329    /// # fn example(handle: &AimDbHandle) -> Result<(), Box<dyn std::error::Error>> {
330    /// let consumer = handle.consumer::<Temperature>()?;
331    /// let temp = consumer.get()?;
332    /// # Ok(())
333    /// # }
334    /// ```
335    pub fn consumer<T>(&self) -> DbResult<crate::SyncConsumer<T>>
336    where
337        T: Send + Sync + 'static + Debug + Clone,
338    {
339        self.consumer_with_capacity(DEFAULT_SYNC_CHANNEL_CAPACITY)
340    }
341
342    /// Create a synchronous producer with custom channel capacity.
343    ///
344    /// Like `producer()` but allows specifying the channel buffer size.
345    /// Use this when you need different buffering characteristics for specific record types.
346    ///
347    /// # Arguments
348    ///
349    /// - `capacity`: Channel buffer size (number of items that can be buffered)
350    ///
351    /// # Type Parameters
352    ///
353    /// - `T`: The record type, must implement `TypedRecord`
354    ///
355    /// # Errors
356    ///
357    /// - `DbError::RecordNotFound` if type `T` was not registered
358    /// - `DbError::RuntimeShutdown` if the runtime thread has stopped
359    ///
360    /// # Example
361    ///
362    /// ```rust,ignore
363    /// # use aimdb_sync::*;
364    /// # use serde::{Serialize, Deserialize};
365    /// # #[derive(Debug, Clone, Serialize, Deserialize)]
366    /// # struct HighFrequencySensor { value: f32 }
367    /// # fn example(handle: &AimDbHandle) -> Result<(), Box<dyn std::error::Error>> {
368    /// // High-frequency sensor needs larger buffer
369    /// let producer = handle.producer_with_capacity::<HighFrequencySensor>(1000)?;
370    /// producer.set(HighFrequencySensor { value: 42.0 })?;
371    /// # Ok(())
372    /// # }
373    /// ```
374    pub fn producer_with_capacity<T>(&self, capacity: usize) -> DbResult<crate::SyncProducer<T>>
375    where
376        T: Send + 'static + Debug + Clone,
377    {
378        // Create a bounded tokio channel for async/sync bridging
379        // Channel carries (value, result_sender) tuples to propagate errors back
380        let (tx, mut rx) =
381            mpsc::channel::<(T, tokio::sync::oneshot::Sender<DbResult<()>>)>(capacity);
382
383        // Spawn a task on the runtime to forward values to the database
384        let db = self.db.clone();
385        self.runtime_handle.spawn(async move {
386            while let Some((value, result_tx)) = rx.recv().await {
387                // Forward the value to the database's produce pipeline
388                let result = db.produce(value).await;
389
390                // Send the result back to the caller (may fail if caller dropped)
391                let _ = result_tx.send(result);
392            }
393        });
394
395        Ok(crate::SyncProducer::new(tx, self.runtime_handle.clone()))
396    }
397
398    /// Create a synchronous consumer with custom channel capacity.
399    ///
400    /// Like `consumer()` but allows specifying the channel buffer size.
401    /// Use this when you need different buffering characteristics for specific record types.
402    ///
403    /// # Arguments
404    ///
405    /// - `capacity`: Channel buffer size (number of items that can be buffered)
406    ///
407    /// # Type Parameters
408    ///
409    /// - `T`: The record type, must implement `TypedRecord`
410    ///
411    /// # Errors
412    ///
413    /// - `DbError::RecordNotFound` if type `T` was not registered
414    /// - `DbError::RuntimeShutdown` if the runtime thread has stopped
415    ///
416    /// # Example
417    ///
418    /// ```rust,no_run
419    /// # use aimdb_sync::*;
420    /// # use serde::{Serialize, Deserialize};
421    /// # #[derive(Clone, Debug, Serialize, Deserialize)]
422    /// # struct RareEvent { id: u32 }
423    /// # fn example(handle: &AimDbHandle) -> Result<(), Box<dyn std::error::Error>> {
424    /// // Rare events need smaller buffer
425    /// let consumer = handle.consumer_with_capacity::<RareEvent>(10)?;
426    /// let event = consumer.get()?;
427    /// # Ok(())
428    /// # }
429    /// ```
430    pub fn consumer_with_capacity<T>(&self, capacity: usize) -> DbResult<crate::SyncConsumer<T>>
431    where
432        T: Send + Sync + 'static + Debug + Clone,
433    {
434        // Create std::sync::mpsc channel for sync API
435        let (std_tx, std_rx) = std::sync::mpsc::sync_channel::<T>(capacity);
436
437        // Create a oneshot channel to confirm subscription succeeded
438        let (ready_tx, ready_rx) = tokio::sync::oneshot::channel();
439
440        // Spawn a task on the runtime to forward buffer data to the std channel
441        let db = self.db.clone();
442        self.runtime_handle.spawn(async move {
443            // Subscribe to the database buffer for type T
444            match db.subscribe::<T>() {
445                Ok(mut reader) => {
446                    // Signal that subscription succeeded
447                    let _ = ready_tx.send(());
448
449                    // Forward all values from the buffer reader to the std channel
450                    loop {
451                        match reader.recv().await {
452                            Ok(value) => {
453                                // Send to std channel (non-async operation)
454                                // If the receiver is dropped, send() will fail
455                                if std_tx.send(value).is_err() {
456                                    break;
457                                }
458                            }
459                            Err(DbError::BufferLagged { lag_count, .. }) => {
460                                // Consumer fell behind - this is not fatal
461                                // Log warning but continue receiving
462                                eprintln!(
463                                    "Warning: Consumer for {} lagged by {} messages",
464                                    std::any::type_name::<T>(),
465                                    lag_count
466                                );
467                                // Don't break - next recv() will get latest data
468                            }
469                            Err(DbError::BufferClosed { .. }) => {
470                                // Buffer closed (shutdown) - exit gracefully
471                                break;
472                            }
473                            Err(e) => {
474                                // Other unexpected errors - log and stop
475                                eprintln!(
476                                    "Error reading from buffer for {}: {}",
477                                    std::any::type_name::<T>(),
478                                    e
479                                );
480                                break;
481                            }
482                        }
483                    }
484                }
485                Err(e) => {
486                    eprintln!(
487                        "Failed to subscribe to record type {}: {}",
488                        std::any::type_name::<T>(),
489                        e
490                    );
491                    // Signal failure (will be ignored if receiver dropped)
492                    let _ = ready_tx.send(());
493                }
494            }
495        });
496
497        // Wait for subscription to complete (with timeout)
498        ready_rx
499            .blocking_recv()
500            .map_err(|_| DbError::AttachFailed {
501                message: format!("Failed to subscribe to {}", std::any::type_name::<T>()),
502            })?;
503
504        Ok(crate::SyncConsumer::new(std_rx))
505    }
506
507    /// Gracefully shut down the runtime thread.
508    ///
509    /// Signals the runtime to stop, waits for all pending operations
510    /// to complete, then joins the thread. This is the preferred way
511    /// to shut down.
512    ///
513    /// # Errors
514    ///
515    /// - `DbError::DetachFailed` if shutdown fails or times out
516    ///
517    /// # Example
518    ///
519    /// ```rust,no_run
520    /// # use aimdb_sync::*;
521    /// # fn example(handle: AimDbHandle) -> Result<(), Box<dyn std::error::Error>> {
522    /// handle.detach()?;
523    /// # Ok(())
524    /// # }
525    /// ```
526    pub fn detach(mut self) -> DbResult<()> {
527        self.detach_internal(None)
528    }
529
530    /// Gracefully shut down with a timeout.
531    ///
532    /// Like `detach()`, but fails if shutdown takes longer than
533    /// the specified duration.
534    ///
535    /// # Arguments
536    ///
537    /// - `timeout`: Maximum time to wait for shutdown
538    ///
539    /// # Errors
540    ///
541    /// - `DbError::DetachFailed` if shutdown fails or times out
542    ///
543    /// # Example
544    ///
545    /// ```rust,no_run
546    /// # use aimdb_sync::*;
547    /// # use std::time::Duration;
548    /// # fn example(handle: AimDbHandle) -> Result<(), Box<dyn std::error::Error>> {
549    /// handle.detach_timeout(Duration::from_secs(5))?;
550    /// # Ok(())
551    /// # }
552    /// ```
553    pub fn detach_timeout(mut self, timeout: Duration) -> DbResult<()> {
554        self.detach_internal(Some(timeout))
555    }
556
557    /// Internal detach implementation.
558    fn detach_internal(&mut self, timeout: Option<Duration>) -> DbResult<()> {
559        // Send shutdown signal
560        if let Some(shutdown_tx) = self.shutdown_tx.take() {
561            // Try to send shutdown signal (non-blocking)
562            // If it fails, the runtime may have already stopped
563            let _ = shutdown_tx.try_send(ShutdownSignal);
564        }
565
566        // Join the runtime thread
567        if let Some(thread_handle) = self.thread_handle.take() {
568            match timeout {
569                Some(duration) => {
570                    // Join with timeout using a different approach since JoinHandle
571                    // doesn't directly support timeouts
572                    let handle_thread = thread::spawn(move || thread_handle.join());
573
574                    // Wait for the thread to complete with timeout
575                    let start = std::time::Instant::now();
576                    loop {
577                        if handle_thread.is_finished() {
578                            break;
579                        }
580                        if start.elapsed() > duration {
581                            return Err(DbError::DetachFailed {
582                                message: format!(
583                                    "Runtime thread did not shut down within {:?}",
584                                    duration
585                                ),
586                            });
587                        }
588                        thread::sleep(Duration::from_millis(10));
589                    }
590
591                    // Retrieve the result
592                    handle_thread
593                        .join()
594                        .map_err(|_| DbError::DetachFailed {
595                            message: "Failed to join helper thread".to_string(),
596                        })?
597                        .map_err(|_| DbError::DetachFailed {
598                            message: "Runtime thread panicked".to_string(),
599                        })?;
600                }
601                None => {
602                    // Join without timeout
603                    thread_handle.join().map_err(|_| DbError::DetachFailed {
604                        message: "Runtime thread panicked during shutdown".to_string(),
605                    })?;
606                }
607            }
608        }
609
610        Ok(())
611    }
612}
613
614impl Drop for AimDbHandle {
615    /// Attempts graceful shutdown if `detach()` was not called.
616    ///
617    /// Logs a warning and attempts shutdown with a 5-second timeout.
618    /// If shutdown fails, the runtime thread may be left running.
619    fn drop(&mut self) {
620        if self.thread_handle.is_some() {
621            eprintln!("Warning: AimDbHandle dropped without calling detach()");
622            eprintln!("Attempting emergency shutdown with 5 second timeout");
623
624            let timeout = Duration::from_secs(5);
625            if let Err(e) = self.detach_internal(Some(timeout)) {
626                eprintln!("Error during emergency shutdown: {}", e);
627            }
628        }
629    }
630}
631
632// Safety: AimDbHandle owns the runtime thread and channels are Send + Sync
633unsafe impl Send for AimDbHandle {}
634unsafe impl Sync for AimDbHandle {}
635
636#[cfg(test)]
637mod tests {
638    #[test]
639    fn test_extension_trait_exists() {
640        // Just ensure the module compiles
641        // Actual functionality tests will come later
642    }
643}