aimdb_sync/
handle.rs

1//! AimDB handle for managing the sync API runtime thread.
2
3use aimdb_core::{AimDb, AimDbBuilder, DbError, DbResult};
4use aimdb_tokio_adapter::TokioAdapter;
5use std::fmt::Debug;
6use std::sync::Arc;
7use std::thread::{self, JoinHandle};
8use std::time::Duration;
9use tokio::sync::mpsc;
10
/// Default channel capacity for sync producers and consumers.
///
/// This is the buffer size that `producer()` and `consumer()` pass on to
/// `producer_with_capacity()` and `consumer_with_capacity()`.
/// A capacity of 100 is intended as a balance between:
/// - Memory usage (up to 100 buffered values per channel)
/// - Latency (small bursts don't block)
/// - Backpressure (prevents unbounded growth)
///
/// Use `producer_with_capacity()` or `consumer_with_capacity()` if you need
/// different buffering for specific record types.
pub const DEFAULT_SYNC_CHANNEL_CAPACITY: usize = 100;
22
/// Extension trait to add `attach()` method to `AimDbBuilder`.
///
/// This trait provides the entry point to the sync API by allowing
/// an `AimDbBuilder` instance to build the database and attach it to
/// a background runtime thread in one step.
pub trait AimDbBuilderSyncExt {
    /// Build the database inside a runtime thread and attach for sync API.
    ///
    /// This method takes a configured builder (WITH `.runtime(TokioAdapter)` set),
    /// spawns a background thread with a Tokio runtime, builds the database
    /// inside that context, and returns a sync handle.
    ///
    /// **Important**: Call `.runtime(Arc::new(TokioAdapter))` before `.attach()`.
    /// Even though TokioAdapter is created in sync context, the actual building
    /// happens in the async context where it can be used.
    ///
    /// # Errors
    ///
    /// - `DbError::AttachFailed` if the runtime thread cannot be spawned, exits
    ///   before handing back its runtime handle, or fails to build the database
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// use aimdb_core::AimDbBuilder;
    /// use aimdb_tokio_adapter::TokioAdapter;
    /// use aimdb_sync::AimDbBuilderSyncExt;
    /// use std::sync::Arc;
    ///
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let mut builder = AimDbBuilder::new()
    ///     .runtime(Arc::new(TokioAdapter));  // Create adapter (it's just a marker)
    /// builder.configure::<MyData>(|reg| {
    ///     // Configure buffer, sources, taps, etc.
    /// });
    /// let handle = builder.attach()?;  // Build happens in runtime thread
    /// # Ok(())
    /// # }
    /// ```
    fn attach(self) -> DbResult<AimDbHandle>;
}
64
65impl AimDbBuilderSyncExt for AimDbBuilder<TokioAdapter> {
66    fn attach(self) -> DbResult<AimDbHandle> {
67        AimDbHandle::new_from_builder(self)
68    }
69}
70
/// Extension trait to add `attach()` method to `AimDb`.
///
/// This trait provides an alternative entry point to the sync API by allowing
/// an already-built `AimDb` instance to be attached to a background runtime thread.
pub trait AimDbSyncExt {
    /// Attach the database to a background runtime thread.
    ///
    /// Takes ownership of the database and spawns a dedicated thread running
    /// a Tokio runtime. Returns a handle for sync API access.
    ///
    /// # Errors
    ///
    /// - `DbError::AttachFailed` if the runtime thread fails to start
    ///
    /// # Example
    ///
    /// NOTE(review): elsewhere in this file `build()` is awaited, so obtaining
    /// `db` probably requires an async context; the adapter construction below
    /// also differs from the `AimDbBuilderSyncExt` example. Confirm against the
    /// current `aimdb_core` / `aimdb_tokio_adapter` APIs.
    ///
    /// ```rust,ignore
    /// use aimdb_core::AimDbBuilder;
    /// use aimdb_tokio_adapter::TokioAdapter;
    /// use aimdb_sync::AimDbSyncExt;
    /// use std::sync::Arc;
    ///
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let db = AimDbBuilder::new()
    ///     .runtime(Arc::new(TokioAdapter::new()?))
    ///     .build()?;
    ///
    /// let handle = db.attach()?;
    /// # Ok(())
    /// # }
    /// ```
    fn attach(self) -> DbResult<AimDbHandle>;
}
104
105impl AimDbSyncExt for AimDb<aimdb_tokio_adapter::TokioAdapter> {
106    fn attach(self) -> DbResult<AimDbHandle> {
107        AimDbHandle::new(self)
108    }
109}
110
/// Handle to the AimDB runtime thread.
///
/// Created by calling `attach()` on an `AimDbBuilder` or an `AimDb` (via the
/// extension traits in this module). Provides factory methods for creating
/// typed producers and consumers.
///
/// # Thread Safety
///
/// `AimDbHandle` is `Send + Sync` and can be shared across threads.
/// However, it should typically be owned by one thread, with only
/// the producers/consumers being cloned and shared.
///
/// # Resource Management
///
/// Call `detach()` explicitly to ensure clean shutdown. If the handle
/// is dropped without calling `detach()`, a warning is printed to stderr
/// and an emergency shutdown with a 5 second timeout is attempted.
pub struct AimDbHandle {
    /// Join handle for the runtime thread; taken (left as `None`) once a
    /// detach has started joining it.
    thread_handle: Option<JoinHandle<()>>,

    /// Shutdown signal sender; taken (left as `None`) once the signal has
    /// been sent.
    shutdown_tx: Option<mpsc::Sender<ShutdownSignal>>,

    /// Tokio runtime handle used to spawn the forwarding tasks for
    /// producers and consumers.
    runtime_handle: tokio::runtime::Handle,

    /// Shared reference to the database; cloned into every forwarding task.
    db: Arc<AimDb<TokioAdapter>>,
}
140
/// Signal to shut down the runtime thread.
///
/// Carries no data: the runtime thread only waits for a `recv()` on the
/// shutdown channel to return and ignores what it returned.
#[derive(Debug, Clone, Copy)]
struct ShutdownSignal;
144
145impl AimDbHandle {
146    /// Create a new handle by spawning the runtime thread and building the database inside it.
147    pub(crate) fn new_from_builder(builder: AimDbBuilder<TokioAdapter>) -> DbResult<Self> {
148        // Create shutdown channel
149        let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<ShutdownSignal>(1);
150
151        // Create channels for passing the built database and runtime handle back
152        let (db_tx, mut db_rx) = mpsc::channel::<Arc<AimDb<TokioAdapter>>>(1);
153        let (handle_tx, mut handle_rx) = mpsc::channel::<tokio::runtime::Handle>(1);
154
155        // Spawn the runtime thread
156        let thread_handle = thread::Builder::new()
157            .name("aimdb-sync-runtime".to_string())
158            .spawn(move || {
159                // Create a new Tokio runtime for this thread
160                let runtime = match tokio::runtime::Runtime::new() {
161                    Ok(rt) => rt,
162                    Err(e) => {
163                        eprintln!("Failed to create Tokio runtime: {}", e);
164                        return;
165                    }
166                };
167
168                // Get the runtime handle before moving into block_on
169                let rt_handle = runtime.handle().clone();
170
171                // Send the runtime handle to the main thread
172                if handle_tx.blocking_send(rt_handle).is_err() {
173                    eprintln!("Failed to send runtime handle to main thread");
174                    return;
175                }
176
177                // Build the database inside the async context
178                runtime.block_on(async move {
179                    // Build the database (now we're in Tokio context where it can spawn tasks!)
180                    let db = match builder.build().await {
181                        Ok(d) => Arc::new(d),
182                        Err(e) => {
183                            eprintln!("Failed to build database: {}", e);
184                            return;
185                        }
186                    };
187
188                    // Send the database to the main thread
189                    if db_tx.send(db.clone()).await.is_err() {
190                        eprintln!("Failed to send database to main thread");
191                        return;
192                    }
193
194                    // Wait for shutdown signal, keeping database alive
195                    let _ = shutdown_rx.recv().await;
196                });
197            })
198            .map_err(|e| DbError::AttachFailed {
199                message: format!("Failed to spawn runtime thread: {}", e),
200            })?;
201
202        // Wait for runtime handle to be available
203        let runtime_handle = handle_rx
204            .blocking_recv()
205            .ok_or_else(|| DbError::AttachFailed {
206                message: "Runtime thread failed to send handle".to_string(),
207            })?;
208
209        // Wait for database to be built
210        let db = db_rx.blocking_recv().ok_or_else(|| DbError::AttachFailed {
211            message: "Runtime thread failed to build database".to_string(),
212        })?;
213
214        Ok(Self {
215            thread_handle: Some(thread_handle),
216            shutdown_tx: Some(shutdown_tx),
217            runtime_handle,
218            db,
219        })
220    }
221
222    /// Create a new handle from an already-built database (legacy method).
223    #[allow(dead_code)]
224    pub(crate) fn new(db: AimDb<TokioAdapter>) -> DbResult<Self> {
225        // Create shutdown channel
226        let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<ShutdownSignal>(1);
227
228        // Wrap database in Arc for sharing
229        let db = Arc::new(db);
230
231        // Spawn the runtime thread
232        let runtime_handle_result = Arc::new(std::sync::Mutex::new(None));
233        let runtime_handle_clone = runtime_handle_result.clone();
234
235        let thread_handle = thread::Builder::new()
236            .name("aimdb-sync-runtime".to_string())
237            .spawn(move || {
238                // Create a new Tokio runtime for this thread
239                let runtime = match tokio::runtime::Runtime::new() {
240                    Ok(rt) => rt,
241                    Err(e) => {
242                        eprintln!("Failed to create Tokio runtime: {}", e);
243                        return;
244                    }
245                };
246
247                // Store the runtime handle so the main thread can access it
248                {
249                    let mut handle = runtime_handle_clone.lock().unwrap();
250                    *handle = Some(runtime.handle().clone());
251                }
252
253                // Wait for shutdown signal
254                runtime.block_on(async move {
255                    let _ = shutdown_rx.recv().await;
256                    // When shutdown signal is received, we exit and drop the database
257                });
258            })
259            .map_err(|e| DbError::AttachFailed {
260                message: format!("Failed to spawn runtime thread: {}", e),
261            })?;
262
263        // Wait for runtime handle to be available
264        let runtime_handle = loop {
265            let handle_opt = runtime_handle_result.lock().unwrap().clone();
266            if let Some(handle) = handle_opt {
267                break handle;
268            }
269            thread::sleep(Duration::from_millis(1));
270        };
271
272        Ok(Self {
273            thread_handle: Some(thread_handle),
274            shutdown_tx: Some(shutdown_tx),
275            runtime_handle,
276            db,
277        })
278    }
279
280    /// Create a synchronous producer for type `T`.
281    ///
282    /// # Arguments
283    ///
284    /// - `key`: The record key identifying this record instance
285    ///
286    /// # Type Parameters
287    ///
288    /// - `T`: The record type, must implement `TypedRecord`
289    ///
290    /// # Errors
291    ///
292    /// - `DbError::RecordNotFound` if type `T` was not registered
293    /// - `DbError::RuntimeShutdown` if the runtime thread has stopped
294    ///
295    /// # Example
296    ///
297    /// ```rust,ignore
298    /// # use aimdb_sync::*;
299    /// # use serde::{Serialize, Deserialize};
300    /// # #[derive(Debug, Clone, Serialize, Deserialize)]
301    /// # struct Temperature { celsius: f32 }
302    /// # fn example(handle: &AimDbHandle) -> Result<(), Box<dyn std::error::Error>> {
303    /// let producer = handle.producer::<Temperature>("sensor::temp")?;
304    /// producer.set(Temperature { celsius: 25.0 })?;
305    /// # Ok(())
306    /// # }
307    /// ```
308    pub fn producer<T>(&self, key: impl AsRef<str>) -> DbResult<crate::SyncProducer<T>>
309    where
310        T: Send + 'static + Debug + Clone,
311    {
312        self.producer_with_capacity(key, DEFAULT_SYNC_CHANNEL_CAPACITY)
313    }
314
315    /// Create a synchronous consumer for type `T`.
316    ///
317    /// # Arguments
318    ///
319    /// - `key`: The record key identifying this record instance
320    ///
321    /// # Type Parameters
322    ///
323    /// - `T`: The record type, must implement `TypedRecord`
324    ///
325    /// # Errors
326    ///
327    /// - `DbError::RecordNotFound` if type `T` was not registered
328    /// - `DbError::RuntimeShutdown` if the runtime thread has stopped
329    ///
330    /// # Example
331    ///
332    /// ```rust,no_run
333    /// # use aimdb_sync::*;
334    /// # use serde::{Serialize, Deserialize};
335    /// # #[derive(Clone, Debug, Serialize, Deserialize)]
336    /// # struct Temperature { celsius: f32 }
337    /// # fn example(handle: &AimDbHandle) -> Result<(), Box<dyn std::error::Error>> {
338    /// let consumer = handle.consumer::<Temperature>("sensor::temp")?;
339    /// let temp = consumer.get()?;
340    /// # Ok(())
341    /// # }
342    /// ```
343    pub fn consumer<T>(&self, key: impl AsRef<str>) -> DbResult<crate::SyncConsumer<T>>
344    where
345        T: Send + Sync + 'static + Debug + Clone,
346    {
347        self.consumer_with_capacity(key, DEFAULT_SYNC_CHANNEL_CAPACITY)
348    }
349
350    /// Create a synchronous producer with custom channel capacity.
351    ///
352    /// Like `producer()` but allows specifying the channel buffer size.
353    /// Use this when you need different buffering characteristics for specific record types.
354    ///
355    /// # Arguments
356    ///
357    /// - `key`: The record key identifying this record instance
358    /// - `capacity`: Channel buffer size (number of items that can be buffered)
359    ///
360    /// # Type Parameters
361    ///
362    /// - `T`: The record type, must implement `TypedRecord`
363    ///
364    /// # Errors
365    ///
366    /// - `DbError::RecordNotFound` if type `T` was not registered
367    /// - `DbError::RuntimeShutdown` if the runtime thread has stopped
368    ///
369    /// # Example
370    ///
371    /// ```rust,ignore
372    /// # use aimdb_sync::*;
373    /// # use serde::{Serialize, Deserialize};
374    /// # #[derive(Debug, Clone, Serialize, Deserialize)]
375    /// # struct HighFrequencySensor { value: f32 }
376    /// # fn example(handle: &AimDbHandle) -> Result<(), Box<dyn std::error::Error>> {
377    /// // High-frequency sensor needs larger buffer
378    /// let producer = handle.producer_with_capacity::<HighFrequencySensor>("sensor::high_freq", 1000)?;
379    /// producer.set(HighFrequencySensor { value: 42.0 })?;
380    /// # Ok(())
381    /// # }
382    /// ```
383    pub fn producer_with_capacity<T>(
384        &self,
385        key: impl AsRef<str>,
386        capacity: usize,
387    ) -> DbResult<crate::SyncProducer<T>>
388    where
389        T: Send + 'static + Debug + Clone,
390    {
391        // Create a bounded tokio channel for async/sync bridging
392        // Channel carries (value, result_sender) tuples to propagate errors back
393        let (tx, mut rx) =
394            mpsc::channel::<(T, tokio::sync::oneshot::Sender<DbResult<()>>)>(capacity);
395
396        // Spawn a task on the runtime to forward values to the database
397        let db = self.db.clone();
398        let record_key = key.as_ref().to_string();
399        self.runtime_handle.spawn(async move {
400            while let Some((value, result_tx)) = rx.recv().await {
401                // Forward the value to the database's produce pipeline
402                let result = db.produce(&record_key, value).await;
403
404                // Send the result back to the caller (may fail if caller dropped)
405                let _ = result_tx.send(result);
406            }
407        });
408
409        Ok(crate::SyncProducer::new(tx, self.runtime_handle.clone()))
410    }
411
412    /// Create a synchronous consumer with custom channel capacity.
413    ///
414    /// Like `consumer()` but allows specifying the channel buffer size.
415    /// Use this when you need different buffering characteristics for specific record types.
416    ///
417    /// # Arguments
418    ///
419    /// - `key`: The record key identifying this record instance
420    /// - `capacity`: Channel buffer size (number of items that can be buffered)
421    ///
422    /// # Type Parameters
423    ///
424    /// - `T`: The record type, must implement `TypedRecord`
425    ///
426    /// # Errors
427    ///
428    /// - `DbError::RecordNotFound` if type `T` was not registered
429    /// - `DbError::RuntimeShutdown` if the runtime thread has stopped
430    ///
431    /// # Example
432    ///
433    /// ```rust,no_run
434    /// # use aimdb_sync::*;
435    /// # use serde::{Serialize, Deserialize};
436    /// # #[derive(Clone, Debug, Serialize, Deserialize)]
437    /// # struct RareEvent { id: u32 }
438    /// # fn example(handle: &AimDbHandle) -> Result<(), Box<dyn std::error::Error>> {
439    /// // Rare events need smaller buffer
440    /// let consumer = handle.consumer_with_capacity::<RareEvent>("events::rare", 10)?;
441    /// let event = consumer.get()?;
442    /// # Ok(())
443    /// # }
444    /// ```
445    pub fn consumer_with_capacity<T>(
446        &self,
447        key: impl AsRef<str>,
448        capacity: usize,
449    ) -> DbResult<crate::SyncConsumer<T>>
450    where
451        T: Send + Sync + 'static + Debug + Clone,
452    {
453        // Create std::sync::mpsc channel for sync API
454        let (std_tx, std_rx) = std::sync::mpsc::sync_channel::<T>(capacity);
455
456        // Create a oneshot channel to confirm subscription succeeded
457        let (ready_tx, ready_rx) = tokio::sync::oneshot::channel();
458
459        // Spawn a task on the runtime to forward buffer data to the std channel
460        let db = self.db.clone();
461        let record_key = key.as_ref().to_string();
462        self.runtime_handle.spawn(async move {
463            // Subscribe to the database buffer for type T
464            match db.subscribe::<T>(&record_key) {
465                Ok(mut reader) => {
466                    // Signal that subscription succeeded
467                    let _ = ready_tx.send(());
468
469                    // Forward all values from the buffer reader to the std channel
470                    loop {
471                        match reader.recv().await {
472                            Ok(value) => {
473                                // Send to std channel (non-async operation)
474                                // If the receiver is dropped, send() will fail
475                                if std_tx.send(value).is_err() {
476                                    break;
477                                }
478                            }
479                            Err(DbError::BufferLagged { lag_count, .. }) => {
480                                // Consumer fell behind - this is not fatal
481                                // Log warning but continue receiving
482                                eprintln!(
483                                    "Warning: Consumer for {} lagged by {} messages",
484                                    std::any::type_name::<T>(),
485                                    lag_count
486                                );
487                                // Don't break - next recv() will get latest data
488                            }
489                            Err(DbError::BufferClosed { .. }) => {
490                                // Buffer closed (shutdown) - exit gracefully
491                                break;
492                            }
493                            Err(e) => {
494                                // Other unexpected errors - log and stop
495                                eprintln!(
496                                    "Error reading from buffer for {}: {}",
497                                    std::any::type_name::<T>(),
498                                    e
499                                );
500                                break;
501                            }
502                        }
503                    }
504                }
505                Err(e) => {
506                    eprintln!(
507                        "Failed to subscribe to record type {}: {}",
508                        std::any::type_name::<T>(),
509                        e
510                    );
511                    // Signal failure (will be ignored if receiver dropped)
512                    let _ = ready_tx.send(());
513                }
514            }
515        });
516
517        // Wait for subscription to complete (with timeout)
518        ready_rx
519            .blocking_recv()
520            .map_err(|_| DbError::AttachFailed {
521                message: format!("Failed to subscribe to {}", std::any::type_name::<T>()),
522            })?;
523
524        Ok(crate::SyncConsumer::new(std_rx))
525    }
526
527    /// Gracefully shut down the runtime thread.
528    ///
529    /// Signals the runtime to stop, waits for all pending operations
530    /// to complete, then joins the thread. This is the preferred way
531    /// to shut down.
532    ///
533    /// # Errors
534    ///
535    /// - `DbError::DetachFailed` if shutdown fails or times out
536    ///
537    /// # Example
538    ///
539    /// ```rust,no_run
540    /// # use aimdb_sync::*;
541    /// # fn example(handle: AimDbHandle) -> Result<(), Box<dyn std::error::Error>> {
542    /// handle.detach()?;
543    /// # Ok(())
544    /// # }
545    /// ```
546    pub fn detach(mut self) -> DbResult<()> {
547        self.detach_internal(None)
548    }
549
550    /// Gracefully shut down with a timeout.
551    ///
552    /// Like `detach()`, but fails if shutdown takes longer than
553    /// the specified duration.
554    ///
555    /// # Arguments
556    ///
557    /// - `timeout`: Maximum time to wait for shutdown
558    ///
559    /// # Errors
560    ///
561    /// - `DbError::DetachFailed` if shutdown fails or times out
562    ///
563    /// # Example
564    ///
565    /// ```rust,no_run
566    /// # use aimdb_sync::*;
567    /// # use std::time::Duration;
568    /// # fn example(handle: AimDbHandle) -> Result<(), Box<dyn std::error::Error>> {
569    /// handle.detach_timeout(Duration::from_secs(5))?;
570    /// # Ok(())
571    /// # }
572    /// ```
573    pub fn detach_timeout(mut self, timeout: Duration) -> DbResult<()> {
574        self.detach_internal(Some(timeout))
575    }
576
577    /// Internal detach implementation.
578    fn detach_internal(&mut self, timeout: Option<Duration>) -> DbResult<()> {
579        // Send shutdown signal
580        if let Some(shutdown_tx) = self.shutdown_tx.take() {
581            // Try to send shutdown signal (non-blocking)
582            // If it fails, the runtime may have already stopped
583            let _ = shutdown_tx.try_send(ShutdownSignal);
584        }
585
586        // Join the runtime thread
587        if let Some(thread_handle) = self.thread_handle.take() {
588            match timeout {
589                Some(duration) => {
590                    // Join with timeout using a different approach since JoinHandle
591                    // doesn't directly support timeouts
592                    let handle_thread = thread::spawn(move || thread_handle.join());
593
594                    // Wait for the thread to complete with timeout
595                    let start = std::time::Instant::now();
596                    loop {
597                        if handle_thread.is_finished() {
598                            break;
599                        }
600                        if start.elapsed() > duration {
601                            return Err(DbError::DetachFailed {
602                                message: format!(
603                                    "Runtime thread did not shut down within {:?}",
604                                    duration
605                                ),
606                            });
607                        }
608                        thread::sleep(Duration::from_millis(10));
609                    }
610
611                    // Retrieve the result
612                    handle_thread
613                        .join()
614                        .map_err(|_| DbError::DetachFailed {
615                            message: "Failed to join helper thread".to_string(),
616                        })?
617                        .map_err(|_| DbError::DetachFailed {
618                            message: "Runtime thread panicked".to_string(),
619                        })?;
620                }
621                None => {
622                    // Join without timeout
623                    thread_handle.join().map_err(|_| DbError::DetachFailed {
624                        message: "Runtime thread panicked during shutdown".to_string(),
625                    })?;
626                }
627            }
628        }
629
630        Ok(())
631    }
632}
633
634impl Drop for AimDbHandle {
635    /// Attempts graceful shutdown if `detach()` was not called.
636    ///
637    /// Logs a warning and attempts shutdown with a 5-second timeout.
638    /// If shutdown fails, the runtime thread may be left running.
639    fn drop(&mut self) {
640        if self.thread_handle.is_some() {
641            eprintln!("Warning: AimDbHandle dropped without calling detach()");
642            eprintln!("Attempting emergency shutdown with 5 second timeout");
643
644            let timeout = Duration::from_secs(5);
645            if let Err(e) = self.detach_internal(Some(timeout)) {
646                eprintln!("Error during emergency shutdown: {}", e);
647            }
648        }
649    }
650}
651
652// Safety: AimDbHandle owns the runtime thread and channels are Send + Sync
653unsafe impl Send for AimDbHandle {}
654unsafe impl Sync for AimDbHandle {}
655
#[cfg(test)]
mod tests {
    /// Placeholder test: it has no assertions and passes as long as this
    /// module compiles.
    #[test]
    fn test_extension_trait_exists() {
        // Just ensure the module compiles
        // Actual functionality tests will come later
    }
}