aimdb_sync/handle.rs
1//! AimDB handle for managing the sync API runtime thread.
2
3use aimdb_core::{AimDb, AimDbBuilder, DbError, DbResult};
4use aimdb_tokio_adapter::TokioAdapter;
5use std::fmt::Debug;
6use std::sync::Arc;
7use std::thread::{self, JoinHandle};
8use std::time::Duration;
9use tokio::sync::mpsc;
10
/// Default channel capacity for sync producers and consumers.
///
/// This is the buffer size that `producer()` and `consumer()` pass on to
/// `producer_with_capacity()` and `consumer_with_capacity()`.
/// A capacity of 100 is intended as a balance between:
/// - Memory usage (at most 100 buffered items per channel)
/// - Latency (small bursts don't block)
/// - Backpressure (prevents unbounded growth)
///
/// Use `producer_with_capacity()` or `consumer_with_capacity()` if you need
/// different buffering for specific record types.
pub const DEFAULT_SYNC_CHANNEL_CAPACITY: usize = 100;
22
/// Extension trait that adds an `attach()` method to `AimDbBuilder`.
///
/// This trait provides the entry point to the sync API by allowing
/// an `AimDbBuilder` instance to build the database and attach it to
/// a background runtime thread in one step.
pub trait AimDbBuilderSyncExt {
    /// Build the database inside a runtime thread and attach for sync API.
    ///
    /// This method takes a configured builder (WITH `.runtime(TokioAdapter)` set),
    /// spawns a background thread with a Tokio runtime, builds the database
    /// inside that context, and returns a sync handle.
    ///
    /// **Important**: Call `.runtime(Arc::new(TokioAdapter))` before `.attach()`.
    /// Even though TokioAdapter is created in sync context, the actual building
    /// happens in the async context where it can be used.
    ///
    /// # Errors
    ///
    /// Returns an error (for example `DbError::AttachFailed`) if the runtime
    /// thread cannot be started or if the database fails to build.
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// use aimdb_core::AimDbBuilder;
    /// use aimdb_tokio_adapter::TokioAdapter;
    /// use aimdb_sync::AimDbBuilderSyncExt;
    /// use std::sync::Arc;
    ///
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let mut builder = AimDbBuilder::new()
    ///     .runtime(Arc::new(TokioAdapter)); // Create adapter (it's just a marker)
    /// builder.configure::<MyData>(|reg| {
    ///     // Configure buffer, sources, taps, etc.
    /// });
    /// let handle = builder.attach()?; // Build happens in runtime thread
    /// # Ok(())
    /// # }
    /// ```
    fn attach(self) -> DbResult<AimDbHandle>;
}
64
/// Sync attach support for builders configured with the Tokio adapter.
impl AimDbBuilderSyncExt for AimDbBuilder<TokioAdapter> {
    /// Delegates to `AimDbHandle::new_from_builder`, which spawns the runtime
    /// thread and builds the database inside it.
    fn attach(self) -> DbResult<AimDbHandle> {
        AimDbHandle::new_from_builder(self)
    }
}
70
/// Extension trait that adds an `attach()` method to `AimDb`.
///
/// This trait provides an alternative entry point to the sync API by allowing
/// an already-built `AimDb` instance to be attached to a background runtime thread.
pub trait AimDbSyncExt {
    /// Attach the database to a background runtime thread.
    ///
    /// Takes ownership of the database and spawns a dedicated thread running
    /// a Tokio runtime. Returns a handle for sync API access.
    ///
    /// # Errors
    ///
    /// - `DbError::AttachFailed` if the runtime thread fails to start
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// use aimdb_core::AimDbBuilder;
    /// use aimdb_tokio_adapter::TokioAdapter;
    /// use aimdb_sync::AimDbSyncExt;
    /// use std::sync::Arc;
    ///
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let db = AimDbBuilder::new()
    ///     .runtime(Arc::new(TokioAdapter::new()?))
    ///     .build()?;
    ///
    /// let handle = db.attach()?;
    /// # Ok(())
    /// # }
    /// ```
    // NOTE(review): the example above is not compiled (`ignore`). It calls
    // `TokioAdapter::new()?` and a synchronous `.build()?`, while other code in
    // this file uses `TokioAdapter` as a plain value and awaits `build()` —
    // confirm the snippet against the current `aimdb_core` API.
    fn attach(self) -> DbResult<AimDbHandle>;
}
104
105impl AimDbSyncExt for AimDb<aimdb_tokio_adapter::TokioAdapter> {
106 fn attach(self) -> DbResult<AimDbHandle> {
107 AimDbHandle::new(self)
108 }
109}
110
/// Handle to the AimDB runtime thread.
///
/// Created by calling `attach()` on an `AimDbBuilder` (via
/// `AimDbBuilderSyncExt`) or on an `AimDb` (via `AimDbSyncExt`). Provides
/// factory methods for creating typed producers and consumers.
///
/// # Thread Safety
///
/// `AimDbHandle` is `Send + Sync` and can be shared across threads.
/// However, it should typically be owned by one thread, with only
/// the producers/consumers being cloned and shared.
///
/// # Resource Management
///
/// Call `detach()` explicitly to ensure clean shutdown. If the handle
/// is dropped without calling `detach()`, a warning will be logged
/// and an emergency shutdown will be attempted.
pub struct AimDbHandle {
    /// Join handle of the runtime thread; `None` once the thread has been
    /// handed over to the shutdown logic.
    thread_handle: Option<JoinHandle<()>>,

    /// Shutdown signal sender; `None` once the signal has been sent.
    shutdown_tx: Option<mpsc::Sender<ShutdownSignal>>,

    /// Tokio runtime handle for submitting async work
    runtime_handle: tokio::runtime::Handle,

    /// Shared reference to the database; cloned into every forwarding task
    /// spawned for producers and consumers.
    db: Arc<AimDb<TokioAdapter>>,
}
140
/// Signal to shut down the runtime thread.
///
/// Zero-sized marker sent over the shutdown channel; the runtime thread
/// awaits it and exits once it arrives.
#[derive(Debug, Clone, Copy)]
struct ShutdownSignal;
144
145impl AimDbHandle {
146 /// Create a new handle by spawning the runtime thread and building the database inside it.
147 pub(crate) fn new_from_builder(builder: AimDbBuilder<TokioAdapter>) -> DbResult<Self> {
148 // Create shutdown channel
149 let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<ShutdownSignal>(1);
150
151 // Create channels for passing the built database and runtime handle back
152 let (db_tx, mut db_rx) = mpsc::channel::<Arc<AimDb<TokioAdapter>>>(1);
153 let (handle_tx, mut handle_rx) = mpsc::channel::<tokio::runtime::Handle>(1);
154
155 // Spawn the runtime thread
156 let thread_handle = thread::Builder::new()
157 .name("aimdb-sync-runtime".to_string())
158 .spawn(move || {
159 // Create a new Tokio runtime for this thread
160 let runtime = match tokio::runtime::Runtime::new() {
161 Ok(rt) => rt,
162 Err(e) => {
163 eprintln!("Failed to create Tokio runtime: {}", e);
164 return;
165 }
166 };
167
168 // Get the runtime handle before moving into block_on
169 let rt_handle = runtime.handle().clone();
170
171 // Send the runtime handle to the main thread
172 if handle_tx.blocking_send(rt_handle).is_err() {
173 eprintln!("Failed to send runtime handle to main thread");
174 return;
175 }
176
177 // Build the database inside the async context
178 runtime.block_on(async move {
179 // Build the database (now we're in Tokio context where it can spawn tasks!)
180 let db = match builder.build().await {
181 Ok(d) => Arc::new(d),
182 Err(e) => {
183 eprintln!("Failed to build database: {}", e);
184 return;
185 }
186 };
187
188 // Send the database to the main thread
189 if db_tx.send(db.clone()).await.is_err() {
190 eprintln!("Failed to send database to main thread");
191 return;
192 }
193
194 // Wait for shutdown signal, keeping database alive
195 let _ = shutdown_rx.recv().await;
196 });
197 })
198 .map_err(|e| DbError::AttachFailed {
199 message: format!("Failed to spawn runtime thread: {}", e),
200 })?;
201
202 // Wait for runtime handle to be available
203 let runtime_handle = handle_rx
204 .blocking_recv()
205 .ok_or_else(|| DbError::AttachFailed {
206 message: "Runtime thread failed to send handle".to_string(),
207 })?;
208
209 // Wait for database to be built
210 let db = db_rx.blocking_recv().ok_or_else(|| DbError::AttachFailed {
211 message: "Runtime thread failed to build database".to_string(),
212 })?;
213
214 Ok(Self {
215 thread_handle: Some(thread_handle),
216 shutdown_tx: Some(shutdown_tx),
217 runtime_handle,
218 db,
219 })
220 }
221
222 /// Create a new handle from an already-built database (legacy method).
223 #[allow(dead_code)]
224 pub(crate) fn new(db: AimDb<TokioAdapter>) -> DbResult<Self> {
225 // Create shutdown channel
226 let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<ShutdownSignal>(1);
227
228 // Wrap database in Arc for sharing
229 let db = Arc::new(db);
230
231 // Spawn the runtime thread
232 let runtime_handle_result = Arc::new(std::sync::Mutex::new(None));
233 let runtime_handle_clone = runtime_handle_result.clone();
234
235 let thread_handle = thread::Builder::new()
236 .name("aimdb-sync-runtime".to_string())
237 .spawn(move || {
238 // Create a new Tokio runtime for this thread
239 let runtime = match tokio::runtime::Runtime::new() {
240 Ok(rt) => rt,
241 Err(e) => {
242 eprintln!("Failed to create Tokio runtime: {}", e);
243 return;
244 }
245 };
246
247 // Store the runtime handle so the main thread can access it
248 {
249 let mut handle = runtime_handle_clone.lock().unwrap();
250 *handle = Some(runtime.handle().clone());
251 }
252
253 // Wait for shutdown signal
254 runtime.block_on(async move {
255 let _ = shutdown_rx.recv().await;
256 // When shutdown signal is received, we exit and drop the database
257 });
258 })
259 .map_err(|e| DbError::AttachFailed {
260 message: format!("Failed to spawn runtime thread: {}", e),
261 })?;
262
263 // Wait for runtime handle to be available
264 let runtime_handle = loop {
265 let handle_opt = runtime_handle_result.lock().unwrap().clone();
266 if let Some(handle) = handle_opt {
267 break handle;
268 }
269 thread::sleep(Duration::from_millis(1));
270 };
271
272 Ok(Self {
273 thread_handle: Some(thread_handle),
274 shutdown_tx: Some(shutdown_tx),
275 runtime_handle,
276 db,
277 })
278 }
279
    /// Create a synchronous producer for type `T` with the default capacity.
    ///
    /// Equivalent to calling `producer_with_capacity()` with
    /// `DEFAULT_SYNC_CHANNEL_CAPACITY`.
    ///
    /// # Arguments
    ///
    /// - `key`: The record key identifying this record instance
    ///
    /// # Type Parameters
    ///
    /// - `T`: The record type; must be `Send + 'static + Debug + Clone`
    ///
    /// # Errors
    ///
    /// Returns whatever error `producer_with_capacity()` reports.
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// # use aimdb_sync::*;
    /// # use serde::{Serialize, Deserialize};
    /// # #[derive(Debug, Clone, Serialize, Deserialize)]
    /// # struct Temperature { celsius: f32 }
    /// # fn example(handle: &AimDbHandle) -> Result<(), Box<dyn std::error::Error>> {
    /// let producer = handle.producer::<Temperature>("sensor::temp")?;
    /// producer.set(Temperature { celsius: 25.0 })?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn producer<T>(&self, key: impl AsRef<str>) -> DbResult<crate::SyncProducer<T>>
    where
        T: Send + 'static + Debug + Clone,
    {
        self.producer_with_capacity(key, DEFAULT_SYNC_CHANNEL_CAPACITY)
    }
314
    /// Create a synchronous consumer for type `T` with the default capacity.
    ///
    /// Equivalent to calling `consumer_with_capacity()` with
    /// `DEFAULT_SYNC_CHANNEL_CAPACITY`.
    ///
    /// # Arguments
    ///
    /// - `key`: The record key identifying this record instance
    ///
    /// # Type Parameters
    ///
    /// - `T`: The record type; must be `Send + Sync + 'static + Debug + Clone`
    ///
    /// # Errors
    ///
    /// Returns whatever error `consumer_with_capacity()` reports.
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// # use aimdb_sync::*;
    /// # use serde::{Serialize, Deserialize};
    /// # #[derive(Clone, Debug, Serialize, Deserialize)]
    /// # struct Temperature { celsius: f32 }
    /// # fn example(handle: &AimDbHandle) -> Result<(), Box<dyn std::error::Error>> {
    /// let consumer = handle.consumer::<Temperature>("sensor::temp")?;
    /// let temp = consumer.get()?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn consumer<T>(&self, key: impl AsRef<str>) -> DbResult<crate::SyncConsumer<T>>
    where
        T: Send + Sync + 'static + Debug + Clone,
    {
        self.consumer_with_capacity(key, DEFAULT_SYNC_CHANNEL_CAPACITY)
    }
349
350 /// Create a synchronous producer with custom channel capacity.
351 ///
352 /// Like `producer()` but allows specifying the channel buffer size.
353 /// Use this when you need different buffering characteristics for specific record types.
354 ///
355 /// # Arguments
356 ///
357 /// - `key`: The record key identifying this record instance
358 /// - `capacity`: Channel buffer size (number of items that can be buffered)
359 ///
360 /// # Type Parameters
361 ///
362 /// - `T`: The record type, must implement `TypedRecord`
363 ///
364 /// # Errors
365 ///
366 /// - `DbError::RecordNotFound` if type `T` was not registered
367 /// - `DbError::RuntimeShutdown` if the runtime thread has stopped
368 ///
369 /// # Example
370 ///
371 /// ```rust,ignore
372 /// # use aimdb_sync::*;
373 /// # use serde::{Serialize, Deserialize};
374 /// # #[derive(Debug, Clone, Serialize, Deserialize)]
375 /// # struct HighFrequencySensor { value: f32 }
376 /// # fn example(handle: &AimDbHandle) -> Result<(), Box<dyn std::error::Error>> {
377 /// // High-frequency sensor needs larger buffer
378 /// let producer = handle.producer_with_capacity::<HighFrequencySensor>("sensor::high_freq", 1000)?;
379 /// producer.set(HighFrequencySensor { value: 42.0 })?;
380 /// # Ok(())
381 /// # }
382 /// ```
383 pub fn producer_with_capacity<T>(
384 &self,
385 key: impl AsRef<str>,
386 capacity: usize,
387 ) -> DbResult<crate::SyncProducer<T>>
388 where
389 T: Send + 'static + Debug + Clone,
390 {
391 // Create a bounded tokio channel for async/sync bridging
392 // Channel carries (value, result_sender) tuples to propagate errors back
393 let (tx, mut rx) =
394 mpsc::channel::<(T, tokio::sync::oneshot::Sender<DbResult<()>>)>(capacity);
395
396 // Spawn a task on the runtime to forward values to the database
397 let db = self.db.clone();
398 let record_key = key.as_ref().to_string();
399 self.runtime_handle.spawn(async move {
400 while let Some((value, result_tx)) = rx.recv().await {
401 // Forward the value to the database's produce pipeline
402 let result = db.produce(&record_key, value).await;
403
404 // Send the result back to the caller (may fail if caller dropped)
405 let _ = result_tx.send(result);
406 }
407 });
408
409 Ok(crate::SyncProducer::new(tx, self.runtime_handle.clone()))
410 }
411
412 /// Create a synchronous consumer with custom channel capacity.
413 ///
414 /// Like `consumer()` but allows specifying the channel buffer size.
415 /// Use this when you need different buffering characteristics for specific record types.
416 ///
417 /// # Arguments
418 ///
419 /// - `key`: The record key identifying this record instance
420 /// - `capacity`: Channel buffer size (number of items that can be buffered)
421 ///
422 /// # Type Parameters
423 ///
424 /// - `T`: The record type, must implement `TypedRecord`
425 ///
426 /// # Errors
427 ///
428 /// - `DbError::RecordNotFound` if type `T` was not registered
429 /// - `DbError::RuntimeShutdown` if the runtime thread has stopped
430 ///
431 /// # Example
432 ///
433 /// ```rust,no_run
434 /// # use aimdb_sync::*;
435 /// # use serde::{Serialize, Deserialize};
436 /// # #[derive(Clone, Debug, Serialize, Deserialize)]
437 /// # struct RareEvent { id: u32 }
438 /// # fn example(handle: &AimDbHandle) -> Result<(), Box<dyn std::error::Error>> {
439 /// // Rare events need smaller buffer
440 /// let consumer = handle.consumer_with_capacity::<RareEvent>("events::rare", 10)?;
441 /// let event = consumer.get()?;
442 /// # Ok(())
443 /// # }
444 /// ```
445 pub fn consumer_with_capacity<T>(
446 &self,
447 key: impl AsRef<str>,
448 capacity: usize,
449 ) -> DbResult<crate::SyncConsumer<T>>
450 where
451 T: Send + Sync + 'static + Debug + Clone,
452 {
453 // Create std::sync::mpsc channel for sync API
454 let (std_tx, std_rx) = std::sync::mpsc::sync_channel::<T>(capacity);
455
456 // Create a oneshot channel to confirm subscription succeeded
457 let (ready_tx, ready_rx) = tokio::sync::oneshot::channel();
458
459 // Spawn a task on the runtime to forward buffer data to the std channel
460 let db = self.db.clone();
461 let record_key = key.as_ref().to_string();
462 self.runtime_handle.spawn(async move {
463 // Subscribe to the database buffer for type T
464 match db.subscribe::<T>(&record_key) {
465 Ok(mut reader) => {
466 // Signal that subscription succeeded
467 let _ = ready_tx.send(());
468
469 // Forward all values from the buffer reader to the std channel
470 loop {
471 match reader.recv().await {
472 Ok(value) => {
473 // Send to std channel (non-async operation)
474 // If the receiver is dropped, send() will fail
475 if std_tx.send(value).is_err() {
476 break;
477 }
478 }
479 Err(DbError::BufferLagged { lag_count, .. }) => {
480 // Consumer fell behind - this is not fatal
481 // Log warning but continue receiving
482 eprintln!(
483 "Warning: Consumer for {} lagged by {} messages",
484 std::any::type_name::<T>(),
485 lag_count
486 );
487 // Don't break - next recv() will get latest data
488 }
489 Err(DbError::BufferClosed { .. }) => {
490 // Buffer closed (shutdown) - exit gracefully
491 break;
492 }
493 Err(e) => {
494 // Other unexpected errors - log and stop
495 eprintln!(
496 "Error reading from buffer for {}: {}",
497 std::any::type_name::<T>(),
498 e
499 );
500 break;
501 }
502 }
503 }
504 }
505 Err(e) => {
506 eprintln!(
507 "Failed to subscribe to record type {}: {}",
508 std::any::type_name::<T>(),
509 e
510 );
511 // Signal failure (will be ignored if receiver dropped)
512 let _ = ready_tx.send(());
513 }
514 }
515 });
516
517 // Wait for subscription to complete (with timeout)
518 ready_rx
519 .blocking_recv()
520 .map_err(|_| DbError::AttachFailed {
521 message: format!("Failed to subscribe to {}", std::any::type_name::<T>()),
522 })?;
523
524 Ok(crate::SyncConsumer::new(std_rx))
525 }
526
    /// Gracefully shut down the runtime thread.
    ///
    /// Signals the runtime thread to stop and then joins it, blocking
    /// without a time limit until the thread has exited. This is the
    /// preferred way to shut down.
    ///
    /// NOTE(review): whether in-flight producer/consumer work is completed
    /// before the runtime stops is not visible from this method — confirm
    /// before relying on it.
    ///
    /// # Errors
    ///
    /// - `DbError::DetachFailed` if the runtime thread panicked
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// # use aimdb_sync::*;
    /// # fn example(handle: AimDbHandle) -> Result<(), Box<dyn std::error::Error>> {
    /// handle.detach()?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn detach(mut self) -> DbResult<()> {
        // `None` = join the runtime thread without a timeout.
        self.detach_internal(None)
    }
549
    /// Gracefully shut down with a timeout.
    ///
    /// Like `detach()`, but fails if shutdown takes longer than
    /// the specified duration.
    ///
    /// # Arguments
    ///
    /// - `timeout`: Maximum time to wait for the runtime thread to exit
    ///
    /// # Errors
    ///
    /// - `DbError::DetachFailed` if the runtime thread panicked or did not
    ///   exit within `timeout`
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// # use aimdb_sync::*;
    /// # use std::time::Duration;
    /// # fn example(handle: AimDbHandle) -> Result<(), Box<dyn std::error::Error>> {
    /// handle.detach_timeout(Duration::from_secs(5))?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn detach_timeout(mut self, timeout: Duration) -> DbResult<()> {
        self.detach_internal(Some(timeout))
    }
576
577 /// Internal detach implementation.
578 fn detach_internal(&mut self, timeout: Option<Duration>) -> DbResult<()> {
579 // Send shutdown signal
580 if let Some(shutdown_tx) = self.shutdown_tx.take() {
581 // Try to send shutdown signal (non-blocking)
582 // If it fails, the runtime may have already stopped
583 let _ = shutdown_tx.try_send(ShutdownSignal);
584 }
585
586 // Join the runtime thread
587 if let Some(thread_handle) = self.thread_handle.take() {
588 match timeout {
589 Some(duration) => {
590 // Join with timeout using a different approach since JoinHandle
591 // doesn't directly support timeouts
592 let handle_thread = thread::spawn(move || thread_handle.join());
593
594 // Wait for the thread to complete with timeout
595 let start = std::time::Instant::now();
596 loop {
597 if handle_thread.is_finished() {
598 break;
599 }
600 if start.elapsed() > duration {
601 return Err(DbError::DetachFailed {
602 message: format!(
603 "Runtime thread did not shut down within {:?}",
604 duration
605 ),
606 });
607 }
608 thread::sleep(Duration::from_millis(10));
609 }
610
611 // Retrieve the result
612 handle_thread
613 .join()
614 .map_err(|_| DbError::DetachFailed {
615 message: "Failed to join helper thread".to_string(),
616 })?
617 .map_err(|_| DbError::DetachFailed {
618 message: "Runtime thread panicked".to_string(),
619 })?;
620 }
621 None => {
622 // Join without timeout
623 thread_handle.join().map_err(|_| DbError::DetachFailed {
624 message: "Runtime thread panicked during shutdown".to_string(),
625 })?;
626 }
627 }
628 }
629
630 Ok(())
631 }
632}
633
634impl Drop for AimDbHandle {
635 /// Attempts graceful shutdown if `detach()` was not called.
636 ///
637 /// Logs a warning and attempts shutdown with a 5-second timeout.
638 /// If shutdown fails, the runtime thread may be left running.
639 fn drop(&mut self) {
640 if self.thread_handle.is_some() {
641 eprintln!("Warning: AimDbHandle dropped without calling detach()");
642 eprintln!("Attempting emergency shutdown with 5 second timeout");
643
644 let timeout = Duration::from_secs(5);
645 if let Err(e) = self.detach_internal(Some(timeout)) {
646 eprintln!("Error during emergency shutdown: {}", e);
647 }
648 }
649 }
650}
651
652// Safety: AimDbHandle owns the runtime thread and channels are Send + Sync
653unsafe impl Send for AimDbHandle {}
654unsafe impl Sync for AimDbHandle {}
655
// Placeholder test module: currently only verifies that this file compiles
// under `cargo test`.
#[cfg(test)]
mod tests {
    /// Intentionally empty; passes as long as the module builds.
    #[test]
    fn test_extension_trait_exists() {
        // Just ensure the module compiles
        // Actual functionality tests will come later
    }
}