aimdb_core/builder.rs
1//! Database builder with type-safe record registration
2//!
3//! Provides `AimDb` and `AimDbBuilder` for constructing databases with
4//! type-safe, self-registering records using the producer-consumer pattern.
5
6use core::any::TypeId;
7use core::fmt::Debug;
8use core::marker::PhantomData;
9
10extern crate alloc;
11
12use alloc::vec::Vec;
13use hashbrown::HashMap;
14
15#[cfg(not(feature = "std"))]
16use alloc::{boxed::Box, sync::Arc};
17
18#[cfg(all(not(feature = "std"), feature = "alloc"))]
19use alloc::string::{String, ToString};
20
21#[cfg(feature = "std")]
22use std::{boxed::Box, sync::Arc};
23
24use crate::extensions::Extensions;
25use crate::graph::DependencyGraph;
26
/// Type-erased `on_start` function stored in `AimDbBuilder::start_fns` (`std` builds).
///
/// A boxed one-shot closure that receives the runtime adapter (`Arc<R>`) and
/// returns a pinned, boxed, `Send + 'static` future resolving to `()`.
///
/// Defined at module level so `on_start()` (which stores) and `build()` (which
/// downcasts) name the *exact same* type; a silent mismatch between the two
/// would otherwise only surface as a runtime panic in the downcast.
///
/// The `not(feature = "std")` twin of this alias is `NoStdStartFnType`.
#[cfg(feature = "std")]
type StartFnType<R> = Box<
    dyn FnOnce(
            Arc<R>,
        )
            -> core::pin::Pin<Box<dyn core::future::Future<Output = ()> + Send + 'static>>
        + Send,
>;
/// Type-erased `on_start` function stored in `AimDbBuilder::start_fns`
/// (builds without the `std` feature).
///
/// Same shape as the `std` alias: a boxed one-shot closure taking `Arc<R>` and
/// returning a pinned, boxed, `Send + 'static` future resolving to `()`.
/// `on_start()` stores values of this type and `build()` downcasts back to it.
#[cfg(not(feature = "std"))]
type NoStdStartFnType<R> = Box<
    dyn FnOnce(
            Arc<R>,
        )
            -> core::pin::Pin<Box<dyn core::future::Future<Output = ()> + Send + 'static>>
        + Send,
>;
48use crate::record_id::{RecordId, RecordKey, StringKey};
49use crate::typed_api::{RecordRegistrar, RecordT};
50use crate::typed_record::{AnyRecord, AnyRecordExt, TypedRecord};
51use crate::{DbError, DbResult};
52
/// Type alias for outbound route tuples returned by `collect_outbound_routes`
///
/// Only available with the `alloc` feature. The tuple is positional; its
/// elements are, in order:
/// - `String` - Default topic/destination from the URL path
/// - `Box<dyn ConsumerTrait>` - Consumer for subscribing to record values
/// - `SerializerFn` - User-provided serializer for the record type
/// - `Vec<(String, String)>` - Configuration options from the URL query
/// - `Option<TopicProviderFn>` - Optional dynamic topic provider
#[cfg(feature = "alloc")]
pub type OutboundRoute = (
    String,
    Box<dyn crate::connector::ConsumerTrait>,
    crate::connector::SerializerFn,
    Vec<(String, String)>,
    Option<crate::connector::TopicProviderFn>,
);
69
/// Marker type for untyped builder (before runtime is set)
///
/// This is the default value of the `R` parameter of `AimDbBuilder`;
/// `AimDbBuilder::new()` returns `AimDbBuilder<NoRuntime>` and `.runtime()`
/// replaces it with the concrete runtime adapter type.
pub struct NoRuntime;
72
/// Internal database state
///
/// Holds the registry of typed records with multiple index structures for
/// efficient access patterns:
///
/// - **`storages`**: Vec for O(1) hot-path access by RecordId
/// - **`by_key`**: HashMap for O(1) lookup by stable RecordKey
/// - **`by_type`**: HashMap for introspection (find all records of type T)
/// - **`types`**: Vec for runtime type validation during downcasts
/// - **`keys`**: Vec for the reverse RecordId → StringKey mapping
/// - **`dependency_graph`**: DAG of record relationships (immutable after build)
/// - **`extensions`**: typed state supplied by external crates via the builder
///
/// `storages`, `types` and `keys` are parallel vectors: `build()` pushes to all
/// three once per record, so the same `RecordId` index is valid in each.
pub struct AimDbInner {
    /// Record storage (hot path - indexed by RecordId)
    ///
    /// Order matches registration order. Immutable after build().
    storages: Vec<Box<dyn AnyRecord>>,

    /// Name → RecordId lookup (control plane)
    ///
    /// Used by remote access, CLI, MCP for O(1) name resolution.
    by_key: HashMap<StringKey, RecordId>,

    /// TypeId → RecordIds lookup (introspection)
    ///
    /// Enables "find all Temperature records" queries.
    by_type: HashMap<TypeId, Vec<RecordId>>,

    /// RecordId → TypeId lookup (type safety assertions)
    ///
    /// Used to validate downcasts at runtime.
    types: Vec<TypeId>,

    /// RecordId → StringKey lookup (reverse mapping)
    ///
    /// Used to get the key for a given record ID.
    keys: Vec<StringKey>,

    /// Dependency graph (immutable after build)
    ///
    /// Contains all record nodes, edges, and topological order.
    /// Used for introspection, spawn ordering, and graph queries.
    dependency_graph: crate::graph::DependencyGraph,

    /// Generic extension storage (frozen after build)
    ///
    /// External crates (e.g. `aimdb-persistence`) store typed state here
    /// during builder configuration and retrieve it at query time via
    /// `AimDb::extensions()`.
    pub(crate) extensions: Extensions,
}
122
123impl AimDbInner {
124 /// Resolve RecordKey to RecordId (control plane - O(1) average)
125 #[inline]
126 pub fn resolve<K: RecordKey>(&self, key: &K) -> Option<RecordId> {
127 self.by_key.get(key.as_str()).copied()
128 }
129
130 /// Resolve string to RecordId (convenience for remote access)
131 ///
132 /// O(1) average thanks to `Borrow<str>` implementation on `RecordKey`.
133 #[inline]
134 pub fn resolve_str(&self, name: &str) -> Option<RecordId> {
135 self.by_key.get(name).copied()
136 }
137
138 /// Get storage by RecordId (hot path - O(1))
139 #[inline]
140 pub fn storage(&self, id: RecordId) -> Option<&dyn AnyRecord> {
141 self.storages.get(id.index()).map(|b| b.as_ref())
142 }
143
    /// Get the StringKey for a given RecordId
    ///
    /// Reverse of `resolve()`: a checked index into the `keys` vector.
    /// Returns `None` when `id` is out of range.
    #[inline]
    pub fn key_for(&self, id: RecordId) -> Option<&StringKey> {
        self.keys.get(id.index())
    }
149
150 /// Get all RecordIds for a type (introspection)
151 pub fn records_of_type<T: 'static>(&self) -> &[RecordId] {
152 self.by_type
153 .get(&TypeId::of::<T>())
154 .map(|v| v.as_slice())
155 .unwrap_or(&[])
156 }
157
    /// Get the number of registered records
    ///
    /// This is the length of the `storages` vector, i.e. one past the largest
    /// valid `RecordId` index.
    #[inline]
    pub fn record_count(&self) -> usize {
        self.storages.len()
    }
163
    /// Get the dependency graph (immutable reference)
    ///
    /// The graph is constructed once during `build()` and never changes.
    /// Contains all record nodes, edges, and topological ordering.
    /// `build()` uses the same graph to decide the order in which record
    /// tasks are spawned.
    #[inline]
    pub fn dependency_graph(&self) -> &crate::graph::DependencyGraph {
        &self.dependency_graph
    }
172
173 /// Helper to get a typed record by RecordKey
174 ///
175 /// This encapsulates the common pattern of:
176 /// 1. Resolving key to RecordId
177 /// 2. Validating TypeId matches
178 /// 3. Downcasting to the typed record
179 pub fn get_typed_record_by_key<T, R>(
180 &self,
181 key: impl AsRef<str>,
182 ) -> DbResult<&TypedRecord<T, R>>
183 where
184 T: Send + 'static + Debug + Clone,
185 R: aimdb_executor::Spawn + 'static,
186 {
187 let key_str = key.as_ref();
188
189 // Resolve key to RecordId
190 let id = self.resolve_str(key_str).ok_or({
191 #[cfg(feature = "std")]
192 {
193 DbError::RecordKeyNotFound {
194 key: key_str.to_string(),
195 }
196 }
197 #[cfg(not(feature = "std"))]
198 {
199 DbError::RecordKeyNotFound { _key: () }
200 }
201 })?;
202
203 self.get_typed_record_by_id::<T, R>(id)
204 }
205
206 /// Helper to get a typed record by RecordId with type validation
207 pub fn get_typed_record_by_id<T, R>(&self, id: RecordId) -> DbResult<&TypedRecord<T, R>>
208 where
209 T: Send + 'static + Debug + Clone,
210 R: aimdb_executor::Spawn + 'static,
211 {
212 use crate::typed_record::AnyRecordExt;
213
214 // Validate RecordId is in bounds
215 if id.index() >= self.storages.len() {
216 return Err(DbError::InvalidRecordId { id: id.raw() });
217 }
218
219 // Validate TypeId matches
220 let expected = TypeId::of::<T>();
221 let actual = self.types[id.index()];
222 if expected != actual {
223 #[cfg(feature = "std")]
224 return Err(DbError::TypeMismatch {
225 record_id: id.raw(),
226 expected_type: core::any::type_name::<T>().to_string(),
227 });
228 #[cfg(not(feature = "std"))]
229 return Err(DbError::TypeMismatch {
230 record_id: id.raw(),
231 _expected_type: (),
232 });
233 }
234
235 // Safe to downcast (type validated above)
236 let record = &self.storages[id.index()];
237
238 #[cfg(feature = "std")]
239 let typed_record = record.as_typed::<T, R>().ok_or(DbError::InvalidOperation {
240 operation: "get_typed_record_by_id".to_string(),
241 reason: "type mismatch during downcast".to_string(),
242 })?;
243
244 #[cfg(not(feature = "std"))]
245 let typed_record = record.as_typed::<T, R>().ok_or(DbError::InvalidOperation {
246 _operation: (),
247 _reason: (),
248 })?;
249
250 Ok(typed_record)
251 }
252
253 /// Collects metadata for all registered records (std only)
254 ///
255 /// Returns a vector of `RecordMetadata` for remote access introspection.
256 /// Available only when the `std` feature is enabled.
257 #[cfg(feature = "std")]
258 pub fn list_records(&self) -> Vec<crate::remote::RecordMetadata> {
259 self.storages
260 .iter()
261 .enumerate()
262 .map(|(i, record)| {
263 let id = RecordId::new(i as u32);
264 let type_id = self.types[i];
265 let key = self.keys[i];
266 record.collect_metadata(type_id, key, id)
267 })
268 .collect()
269 }
270
271 /// Try to get record's latest value as JSON by record key (std only)
272 ///
273 /// O(1) lookup using the key-based index.
274 ///
275 /// # Arguments
276 /// * `record_key` - The record key (e.g., "sensors.temperature")
277 ///
278 /// # Returns
279 /// `Some(JsonValue)` with the current record value, or `None`
280 #[cfg(feature = "std")]
281 pub fn try_latest_as_json(&self, record_key: &str) -> Option<serde_json::Value> {
282 let id = self.resolve_str(record_key)?;
283 self.storages.get(id.index())?.latest_json()
284 }
285
286 /// Sets a record value from JSON (remote access API)
287 ///
288 /// Deserializes the JSON value and writes it to the record's buffer.
289 ///
290 /// **SAFETY:** Enforces the "No Producer Override" rule:
291 /// - Only works for records with `producer_count == 0`
292 /// - Returns error if the record has active producers
293 ///
294 /// # Arguments
295 /// * `record_key` - The record key (e.g., "config.app")
296 /// * `json_value` - JSON representation of the value
297 ///
298 /// # Returns
299 /// - `Ok(())` - Successfully set the value
300 /// - `Err(DbError)` - If record not found, has producers, or deserialization fails
301 #[cfg(feature = "std")]
302 pub fn set_record_from_json(
303 &self,
304 record_key: &str,
305 json_value: serde_json::Value,
306 ) -> DbResult<()> {
307 let id = self
308 .resolve_str(record_key)
309 .ok_or_else(|| DbError::RecordKeyNotFound {
310 key: record_key.to_string(),
311 })?;
312
313 self.storages[id.index()].set_from_json(json_value)
314 }
315}
316
/// Database builder for producer-consumer pattern
///
/// Provides a fluent API for constructing databases with type-safe record registration.
/// Use `.runtime()` to set the runtime and transition to a typed builder.
///
/// The type parameter `R` is the runtime adapter type; it defaults to the
/// `NoRuntime` marker until `.runtime()` is called.
pub struct AimDbBuilder<R = NoRuntime> {
    /// Registered records with their keys (order matters for RecordId assignment)
    ///
    /// Each entry is `(key, TypeId of the value type, type-erased record)`.
    records: Vec<(StringKey, TypeId, Box<dyn AnyRecord>)>,

    /// Runtime adapter
    ///
    /// `None` until `.runtime()` is called; `build()` fails if it is still unset.
    runtime: Option<Arc<R>>,

    /// Connector builders that will be invoked during build()
    connector_builders: Vec<Box<dyn crate::connector::ConnectorBuilder<R>>>,

    /// Spawn functions with their keys
    ///
    /// One type-erased closure per newly configured record; `build()` downcasts
    /// and runs them in dependency order.
    spawn_fns: Vec<(StringKey, Box<dyn core::any::Any + Send>)>,

    /// Startup tasks registered via on_start() — spawned after build() completes.
    /// Stored type-erased (Box<Box<dyn FnOnce(Arc<R>) -> BoxFuture<…>>>) to allow
    /// the field to exist on the unparameterised NoRuntime builder too.
    start_fns: Vec<Box<dyn core::any::Any + Send>>,

    /// Generic extension storage for external crates (e.g., persistence, metrics).
    /// Moved into AimDbInner during build() so it can be read on the live AimDb handle.
    extensions: Extensions,

    /// Remote access configuration (std only)
    #[cfg(feature = "std")]
    remote_config: Option<crate::remote::AimxConfig>,

    /// PhantomData to track the runtime type parameter
    _phantom: PhantomData<R>,
}
350
351impl AimDbBuilder<NoRuntime> {
352 /// Creates a new database builder without a runtime
353 ///
354 /// Call `.runtime()` to set the runtime adapter.
355 pub fn new() -> Self {
356 Self {
357 records: Vec::new(),
358 runtime: None,
359 connector_builders: Vec::new(),
360 spawn_fns: Vec::new(),
361 start_fns: Vec::new(),
362 extensions: Extensions::new(),
363 #[cfg(feature = "std")]
364 remote_config: None,
365 _phantom: PhantomData,
366 }
367 }
368
369 /// Sets the runtime adapter
370 ///
371 /// This transitions the builder from untyped to typed with concrete runtime `R`.
372 ///
373 /// # Type Safety Note
374 ///
375 /// The `connector_builders` field is intentionally reset to `Vec::new()` during this
376 /// transition because connectors are parameterized by the runtime type:
377 ///
378 /// - Before: `Vec<Box<dyn ConnectorBuilder<NoRuntime>>>`
379 /// - After: `Vec<Box<dyn ConnectorBuilder<R>>>`
380 ///
381 /// These types are incompatible and cannot be transferred. However, this is not a bug
382 /// because `.with_connector()` is only available AFTER calling `.runtime()` (it's defined
383 /// in the `impl<R> where R: Spawn` block, not in `impl AimDbBuilder<NoRuntime>`).
384 ///
385 /// This means the type system **enforces** the correct call order:
386 /// ```rust,ignore
387 /// AimDbBuilder::new()
388 /// .runtime(runtime) // ← Must be called first
389 /// .with_connector(connector) // ← Now available
390 /// ```
391 ///
392 /// The `records` and `remote_config` are preserved across the transition since they
393 /// are not parameterized by the runtime type.
394 pub fn runtime<R>(self, rt: Arc<R>) -> AimDbBuilder<R>
395 where
396 R: aimdb_executor::Spawn + 'static,
397 {
398 AimDbBuilder {
399 records: self.records,
400 runtime: Some(rt),
401 connector_builders: Vec::new(),
402 spawn_fns: Vec::new(),
403 start_fns: self.start_fns,
404 extensions: self.extensions,
405 #[cfg(feature = "std")]
406 remote_config: self.remote_config,
407 _phantom: PhantomData,
408 }
409 }
410}
411
412impl<R> AimDbBuilder<R>
413where
414 R: aimdb_executor::Spawn + 'static,
415{
    /// Returns a shared reference to the extension storage.
    ///
    /// External crates use this to retrieve state stored via `extensions_mut()`.
    /// The same storage is moved into the database by `build()`.
    pub fn extensions(&self) -> &Extensions {
        &self.extensions
    }
422
    /// Returns a mutable reference to the extension storage.
    ///
    /// Use this inside `AimDbBuilderPersistExt` (or similar) to store typed state
    /// that `.persist()` and `AimDbQueryExt` will later retrieve.
    /// Only available while building; `build()` consumes the builder and moves
    /// the storage into the database.
    pub fn extensions_mut(&mut self) -> &mut Extensions {
        &mut self.extensions
    }
430
431 /// Registers a task to be spawned after `build()` completes.
432 ///
433 /// The closure receives an `Arc<R>` (the runtime adapter) and must return a
434 /// future that runs for as long as needed (e.g. an infinite cleanup loop).
435 /// Tasks are spawned in registration order, after all record tasks and
436 /// connectors have been started.
437 ///
438 /// # Example
439 /// ```rust,ignore
440 /// builder.on_start(|runtime| async move {
441 /// loop {
442 /// do_cleanup().await;
443 /// runtime.sleep(Duration::from_secs(3600)).await;
444 /// }
445 /// });
446 /// ```
447 pub fn on_start<F, Fut>(&mut self, f: F) -> &mut Self
448 where
449 F: FnOnce(Arc<R>) -> Fut + Send + 'static,
450 Fut: core::future::Future<Output = ()> + Send + 'static,
451 {
452 // Type-erase so the field can be shared with the `NoRuntime` builder struct.
453 // Uses the module-level `StartFnType<R>` alias — must stay in sync with
454 // the downcast in `build()`.
455 #[cfg(feature = "std")]
456 let boxed: StartFnType<R> = Box::new(move |runtime| Box::pin(f(runtime)));
457 #[cfg(not(feature = "std"))]
458 let boxed: NoStdStartFnType<R> = Box::new(move |runtime| Box::pin(f(runtime)));
459 self.start_fns.push(Box::new(boxed));
460 self
461 }
462
463 /// Registers a connector builder that will be invoked during `build()`
464 ///
465 /// The connector builder will be called after the database is constructed,
466 /// allowing it to collect routes and initialize the connector properly.
467 ///
468 /// # Arguments
469 /// * `builder` - A connector builder that implements `ConnectorBuilder<R>`
470 ///
471 /// # Example
472 ///
473 /// ```rust,ignore
474 /// use aimdb_mqtt_connector::MqttConnector;
475 ///
476 /// let db = AimDbBuilder::new()
477 /// .runtime(runtime)
478 /// .with_connector(MqttConnector::new("mqtt://broker.local:1883"))
479 /// .configure::<Temperature>(|reg| {
480 /// reg.link_from("mqtt://commands/temp")...
481 /// })
482 /// .build().await?;
483 /// ```
484 pub fn with_connector(
485 mut self,
486 builder: impl crate::connector::ConnectorBuilder<R> + 'static,
487 ) -> Self {
488 self.connector_builders.push(Box::new(builder));
489 self
490 }
491
492 /// Enables remote access via AimX protocol (std only)
493 ///
494 /// Configures the database to accept remote connections over a Unix domain socket,
495 /// allowing external clients to introspect records, subscribe to updates, and
496 /// (optionally) write data.
497 ///
498 /// The remote access supervisor will be spawned automatically during `build()`.
499 ///
500 /// # Arguments
501 /// * `config` - Remote access configuration (socket path, security policy, etc.)
502 ///
503 /// # Example
504 ///
505 /// ```rust,ignore
506 /// use aimdb_core::remote::{AimxConfig, SecurityPolicy};
507 ///
508 /// let config = AimxConfig::new("/tmp/aimdb.sock")
509 /// .with_security(SecurityPolicy::read_only());
510 ///
511 /// let db = AimDbBuilder::new()
512 /// .runtime(runtime)
513 /// .with_remote_access(config)
514 /// .build()?;
515 /// ```
516 #[cfg(feature = "std")]
517 pub fn with_remote_access(mut self, config: crate::remote::AimxConfig) -> Self {
518 self.remote_config = Some(config);
519 self
520 }
521
    /// Configures a record type manually with a unique key
    ///
    /// The key uniquely identifies this record instance. Multiple records of the same
    /// type can exist with different keys (e.g., "sensor.temperature.room1" and
    /// "sensor.temperature.room2").
    ///
    /// Calling `configure` again with a key that is already registered reuses the
    /// existing record: the closure runs against the same `TypedRecord`, and no
    /// second spawn function is stored.
    ///
    /// # Arguments
    /// * `key` - A unique identifier for this record. Can be a string literal, `StringKey`,
    ///   or any type implementing `RecordKey` (including user-defined enum keys).
    /// * `f` - Configuration closure
    ///
    /// # Panics
    /// Panics if `key` is already registered with a value type other than `T`,
    /// or if the stored record cannot be downcast to `TypedRecord<T, R>`.
    ///
    /// # Example
    /// ```rust,ignore
    /// // Using string literal
    /// builder.configure::<Temperature>("sensor.temp.room1", |reg| { ... });
    ///
    /// // Using compile-time safe enum key
    /// builder.configure::<Temperature>(SensorKey::TempRoom1, |reg| { ... });
    /// ```
    pub fn configure<T>(
        &mut self,
        key: impl RecordKey,
        f: impl for<'a> FnOnce(&'a mut RecordRegistrar<'a, T, R>),
    ) -> &mut Self
    where
        T: Send + Sync + 'static + Debug + Clone,
    {
        // Convert any RecordKey to StringKey for internal storage
        let record_key: StringKey = StringKey::from_dynamic(key.as_str());
        let type_id = TypeId::of::<T>();

        // Find existing record with this key, or create new one
        // (linear scan over the records registered so far)
        let record_index = self.records.iter().position(|(k, _, _)| k == &record_key);

        let (rec, is_new_record) = match record_index {
            Some(idx) => {
                // Use existing record; the key may only be reused with the same value type
                let (_, existing_type, record) = &mut self.records[idx];
                assert!(
                    *existing_type == type_id,
                    "StringKey '{}' already registered with different type",
                    record_key.as_str()
                );
                (
                    record
                        .as_typed_mut::<T, R>()
                        .expect("type mismatch in record registry"),
                    false,
                )
            }
            None => {
                // Create new record and borrow it back from the end of the list
                self.records
                    .push((record_key, type_id, Box::new(TypedRecord::<T, R>::new())));
                let (_, _, record) = self.records.last_mut().unwrap();
                (
                    record
                        .as_typed_mut::<T, R>()
                        .expect("type mismatch in record registry"),
                    true,
                )
            }
        };

        // The registrar borrows three distinct builder fields at once: the record
        // (mutably, via `rec`), the connector builders and the extensions (shared).
        let mut reg = RecordRegistrar {
            rec,
            connector_builders: &self.connector_builders,
            #[cfg(feature = "alloc")]
            record_key: record_key.as_str().to_string(),
            extensions: &self.extensions,
        };
        f(&mut reg);

        // Only store spawn function for new records to avoid duplicates
        if is_new_record {
            let spawn_key = record_key;

            // This boxed closure type must match the `SpawnFnType` that `build()`
            // downcasts to; a mismatch makes that downcast panic.
            #[allow(clippy::type_complexity)]
            let spawn_fn: Box<
                dyn FnOnce(&Arc<R>, &Arc<AimDb<R>>, RecordId) -> DbResult<()> + Send,
            > = Box::new(move |runtime: &Arc<R>, db: &Arc<AimDb<R>>, id: RecordId| {
                // Use RecordSpawner to spawn tasks for this record type
                use crate::typed_record::RecordSpawner;

                let typed_record = db.inner().get_typed_record_by_id::<T, R>(id)?;
                // Get the record key from the database to enable key-based producer/consumer
                // (falls back to a synthetic "__record_<index>" name if the id has no key)
                #[cfg(feature = "alloc")]
                let key = db
                    .inner()
                    .key_for(id)
                    .map(|k| k.as_str().to_string())
                    .unwrap_or_else(|| alloc::format!("__record_{}", id.index()));
                #[cfg(not(feature = "alloc"))]
                let key = "";
                // Shadow the owned String with a &str so both cfg branches pass a &str below
                #[cfg(feature = "alloc")]
                let key = key.as_str();
                RecordSpawner::<T>::spawn_all_tasks(typed_record, runtime, db, key)
            });

            // Store the spawn function (type-erased in Box<dyn Any>)
            self.spawn_fns.push((spawn_key, Box::new(spawn_fn)));
        }

        self
    }
627
628 /// Registers a self-registering record type
629 ///
630 /// The record type must implement `RecordT<R>`.
631 ///
632 /// Uses the type name as the default key. For custom keys, use `configure()` directly.
633 pub fn register_record<T>(&mut self, cfg: &T::Config) -> &mut Self
634 where
635 T: RecordT<R>,
636 {
637 // Default key is the full type name for backward compatibility
638 let key = StringKey::new(core::any::type_name::<T>());
639 self.configure::<T>(key, |reg| T::register(reg, cfg))
640 }
641
642 /// Registers a self-registering record type with a custom key
643 ///
644 /// The record type must implement `RecordT<R>`.
645 pub fn register_record_with_key<T>(&mut self, key: impl RecordKey, cfg: &T::Config) -> &mut Self
646 where
647 T: RecordT<R>,
648 {
649 self.configure::<T>(key, |reg| T::register(reg, cfg))
650 }
651
652 /// Runs the database indefinitely (never returns)
653 ///
654 /// This method builds the database, spawns all producer and consumer tasks, and then
655 /// parks the current task indefinitely. This is the primary way to run AimDB services.
656 ///
657 /// All logic runs in background tasks via producers, consumers, and connectors. The
658 /// application continues until interrupted (e.g., Ctrl+C).
659 ///
660 /// # Returns
661 /// `DbResult<()>` - Ok when database starts successfully, then parks forever
662 ///
663 /// # Example
664 ///
665 /// ```rust,ignore
666 /// #[tokio::main]
667 /// async fn main() -> DbResult<()> {
668 /// AimDbBuilder::new()
669 /// .runtime(Arc::new(TokioAdapter::new()?))
670 /// .configure::<MyData>(|reg| {
671 /// reg.with_buffer(BufferCfg::SpmcRing { capacity: 100 })
672 /// .with_source(my_producer)
673 /// .with_tap(my_consumer);
674 /// })
675 /// .run().await // Runs forever
676 /// }
677 /// ```
678 pub async fn run(self) -> DbResult<()> {
679 #[cfg(feature = "tracing")]
680 tracing::info!("Building database and spawning background tasks...");
681
682 let _db = self.build().await?;
683
684 #[cfg(feature = "tracing")]
685 tracing::info!("Database running, background tasks active. Press Ctrl+C to stop.");
686
687 // Park indefinitely - the background tasks will continue running
688 // The database handle is kept alive here to prevent dropping it
689 core::future::pending::<()>().await;
690
691 Ok(())
692 }
693
694 /// Builds the database and returns the handle (async)
695 ///
696 /// Use this when you need programmatic access to the database handle for
697 /// manual subscriptions or production. For typical services, use `.run().await` instead.
698 ///
699 /// **Automatic Task Spawning:** This method spawns all producer services and
700 /// `.tap()` observer tasks that were registered during configuration.
701 ///
702 /// **Connector Setup:** Connectors must be created manually before calling `build()`:
703 ///
704 /// ```rust,ignore
705 /// use aimdb_mqtt_connector::{MqttConnector, router::RouterBuilder};
706 ///
707 /// // Configure records with connector links
708 /// let builder = AimDbBuilder::new()
709 /// .runtime(runtime)
710 /// .configure::<Temp>("temp", |reg| {
711 /// reg.link_from("mqtt://commands/temp")
712 /// .with_buffer(BufferCfg::SingleLatest)
713 /// .with_remote_access();
714 /// });
715 ///
716 /// // Create MQTT connector with router
717 /// let router = RouterBuilder::new()
718 /// .route("commands/temp", /* deserializer */)
719 /// .build();
720 /// let connector = MqttConnector::new("mqtt://localhost:1883", router).await?;
721 ///
722 /// // Register connector and build
723 /// let db = builder
724 /// .with_connector("mqtt", Arc::new(connector))
725 /// .build().await?;
726 /// ```
727 ///
728 /// # Returns
729 /// `DbResult<AimDb<R>>` - The database instance
730 #[cfg_attr(not(feature = "std"), allow(unused_mut))]
731 pub async fn build(self) -> DbResult<AimDb<R>> {
732 use crate::DbError;
733
734 // Validate all records
735 for (key, _, record) in &self.records {
736 record.validate().map_err(|_msg| {
737 // Suppress unused warning for key in no_std
738 let _ = &key;
739 #[cfg(feature = "std")]
740 {
741 DbError::RuntimeError {
742 message: format!("Record '{}' validation failed: {}", key.as_str(), _msg),
743 }
744 }
745 #[cfg(not(feature = "std"))]
746 {
747 DbError::RuntimeError { _message: () }
748 }
749 })?;
750 }
751
752 // Ensure runtime is set
753 let runtime = self.runtime.ok_or({
754 #[cfg(feature = "std")]
755 {
756 DbError::RuntimeError {
757 message: "runtime not set (use .runtime())".into(),
758 }
759 }
760 #[cfg(not(feature = "std"))]
761 {
762 DbError::RuntimeError { _message: () }
763 }
764 })?;
765
766 // Build the new index structures
767 let record_count = self.records.len();
768 let mut storages: Vec<Box<dyn AnyRecord>> = Vec::with_capacity(record_count);
769 let mut by_key: HashMap<StringKey, RecordId> = HashMap::with_capacity(record_count);
770 let mut by_type: HashMap<TypeId, Vec<RecordId>> = HashMap::new();
771 let mut types: Vec<TypeId> = Vec::with_capacity(record_count);
772 let mut keys: Vec<StringKey> = Vec::with_capacity(record_count);
773
774 for (i, (key, type_id, record)) in self.records.into_iter().enumerate() {
775 let id = RecordId::new(i as u32);
776
777 // Check for duplicate keys (should not happen if configure() is used correctly)
778 if by_key.contains_key(&key) {
779 #[cfg(feature = "std")]
780 return Err(DbError::DuplicateRecordKey {
781 key: key.as_str().to_string(),
782 });
783 #[cfg(not(feature = "std"))]
784 return Err(DbError::DuplicateRecordKey { _key: () });
785 }
786
787 // Build index structures
788 storages.push(record);
789 by_key.insert(key, id);
790 by_type.entry(type_id).or_default().push(id);
791 types.push(type_id);
792 keys.push(key);
793 }
794
795 // Build dependency graph from record information
796 // Collect RecordGraphInfo for each record
797 let record_infos: Vec<crate::graph::RecordGraphInfo> = storages
798 .iter()
799 .enumerate()
800 .map(|(idx, record)| {
801 let key = keys[idx].as_str().to_string();
802 let origin = record.record_origin();
803
804 // Get buffer type and capacity from the record
805 let (buffer_type, buffer_capacity) = record.buffer_info();
806
807 crate::graph::RecordGraphInfo {
808 key,
809 origin,
810 buffer_type,
811 buffer_capacity,
812 tap_count: record.consumer_count(),
813 has_outbound_link: record.outbound_connector_count() > 0,
814 }
815 })
816 .collect();
817
818 // Build and validate the dependency graph
819 let dependency_graph = DependencyGraph::build_and_validate(&record_infos)?;
820
821 #[cfg(feature = "tracing")]
822 tracing::debug!(
823 "Dependency graph built successfully ({} nodes, {} edges, topo order: {:?})",
824 dependency_graph.nodes.len(),
825 dependency_graph.edges.len(),
826 dependency_graph.topo_order
827 );
828
829 let inner = Arc::new(AimDbInner {
830 storages,
831 by_key,
832 by_type,
833 types,
834 keys,
835 dependency_graph,
836 extensions: self.extensions,
837 });
838
839 let db = Arc::new(AimDb {
840 inner: inner.clone(),
841 runtime: runtime.clone(),
842 });
843
844 #[cfg(feature = "tracing")]
845 tracing::info!(
846 "Spawning producer services and tap observers for {} records",
847 self.spawn_fns.len()
848 );
849
850 // Build a lookup map from spawn_fns for topological ordering
851 let mut spawn_fn_map: HashMap<StringKey, Box<dyn core::any::Any + Send>> =
852 self.spawn_fns.into_iter().collect();
853
854 // Execute spawn functions in topological order
855 // This ensures transforms are spawned after their input records
856 for key_str in inner.dependency_graph.topo_order() {
857 // Find the StringKey that matches this key string
858 let key = match inner.by_key.keys().find(|k| k.as_str() == key_str) {
859 Some(k) => *k,
860 None => continue, // Key not in spawn_fns, skip
861 };
862
863 // Take the spawn function (if any) for this key
864 let spawn_fn_any = match spawn_fn_map.remove(&key) {
865 Some(f) => f,
866 None => continue, // No spawn function for this key
867 };
868
869 // Resolve key to RecordId
870 let id = inner.resolve(&key).ok_or({
871 #[cfg(feature = "std")]
872 {
873 DbError::RecordKeyNotFound {
874 key: key.as_str().to_string(),
875 }
876 }
877 #[cfg(not(feature = "std"))]
878 {
879 DbError::RecordKeyNotFound { _key: () }
880 }
881 })?;
882
883 // Downcast from Box<dyn Any> back to the concrete spawn function type
884 type SpawnFnType<R> =
885 Box<dyn FnOnce(&Arc<R>, &Arc<AimDb<R>>, RecordId) -> DbResult<()> + Send>;
886
887 let spawn_fn = spawn_fn_any
888 .downcast::<SpawnFnType<R>>()
889 .expect("spawn function type mismatch");
890
891 // Execute the spawn function
892 (*spawn_fn)(&runtime, &db, id)?;
893 }
894
895 #[cfg(feature = "tracing")]
896 tracing::info!("Automatic spawning complete");
897
898 // Spawn remote access supervisor if configured (std only)
899 #[cfg(feature = "std")]
900 if let Some(remote_cfg) = self.remote_config {
901 #[cfg(feature = "tracing")]
902 tracing::info!(
903 "Spawning remote access supervisor on socket: {}",
904 remote_cfg.socket_path.display()
905 );
906
907 // Apply security policy to mark writable records
908 let writable_keys = remote_cfg.security_policy.writable_records();
909 for key_str in writable_keys {
910 if let Some(id) = inner.resolve_str(&key_str) {
911 #[cfg(feature = "tracing")]
912 tracing::debug!("Marking record '{}' as writable", key_str);
913
914 // Mark the record as writable (type-erased call)
915 inner.storages[id.index()].set_writable_erased(true);
916 }
917 }
918
919 // Spawn the remote supervisor task
920 crate::remote::supervisor::spawn_supervisor(db.clone(), runtime.clone(), remote_cfg)?;
921
922 #[cfg(feature = "tracing")]
923 tracing::info!("Remote access supervisor spawned successfully");
924 }
925
926 // Build connectors from builders (after database is fully constructed)
927 // This allows connectors to use collect_inbound_routes() which creates
928 // producers tied to this specific database instance
929 for builder in self.connector_builders {
930 #[cfg(feature = "tracing")]
931 let scheme = {
932 #[cfg(feature = "std")]
933 {
934 builder.scheme().to_string()
935 }
936 #[cfg(not(feature = "std"))]
937 {
938 alloc::string::String::from(builder.scheme())
939 }
940 };
941
942 #[cfg(feature = "tracing")]
943 tracing::debug!("Building connector for scheme: {}", scheme);
944
945 // Build the connector (this spawns tasks as a side effect)
946 let _connector = builder.build(&db).await?;
947
948 #[cfg(feature = "tracing")]
949 tracing::info!("Connector built and spawned successfully: {}", scheme);
950 }
951
952 // Spawn on_start tasks (registered by external crates like aimdb-persistence)
953 if !self.start_fns.is_empty() {
954 #[cfg(feature = "tracing")]
955 tracing::debug!("Spawning {} on_start task(s)", self.start_fns.len());
956
957 #[cfg(feature = "std")]
958 for (idx, start_fn_any) in self.start_fns.into_iter().enumerate() {
959 let start_fn = start_fn_any
960 .downcast::<StartFnType<R>>()
961 .unwrap_or_else(|_| {
962 panic!("on_start fn[{idx}] type mismatch — this is a bug in aimdb-core")
963 });
964 let future = (*start_fn)(runtime.clone());
965 runtime.spawn(future).map_err(DbError::from)?;
966 }
967
968 #[cfg(not(feature = "std"))]
969 for (idx, start_fn_any) in self.start_fns.into_iter().enumerate() {
970 let start_fn = start_fn_any
971 .downcast::<NoStdStartFnType<R>>()
972 .unwrap_or_else(|_| {
973 panic!("on_start fn[{idx}] type mismatch — this is a bug in aimdb-core")
974 });
975 let future = (*start_fn)(runtime.clone());
976 runtime.spawn(future).map_err(DbError::from)?;
977 }
978 }
979
980 // Unwrap the Arc to return the owned AimDb
981 // This is safe because we just created it and hold the only reference
982 let db_owned = Arc::try_unwrap(db).unwrap_or_else(|arc| (*arc).clone());
983
984 Ok(db_owned)
985 }
986}
987
988impl Default for AimDbBuilder<NoRuntime> {
989 fn default() -> Self {
990 Self::new()
991 }
992}
993
994/// Producer-consumer database
995///
996/// A database instance with type-safe record registration and cross-record
997/// communication via the Emitter pattern. The type parameter `R` represents
998/// the runtime adapter (e.g., TokioAdapter, EmbassyAdapter).
999///
1000/// See `examples/` for usage.
1001///
1002/// # Examples
1003///
1004/// ```rust,ignore
1005/// use aimdb_tokio_adapter::TokioAdapter;
1006///
1007/// let runtime = Arc::new(TokioAdapter);
1008/// let db: AimDb<TokioAdapter> = AimDbBuilder::new()
1009/// .runtime(runtime)
1010/// .register_record::<Temperature>(&TemperatureConfig)
1011/// .build()?;
1012/// ```
1013pub struct AimDb<R: aimdb_executor::Spawn + 'static> {
1014 /// Internal state
1015 inner: Arc<AimDbInner>,
1016
1017 /// Runtime adapter with concrete type
1018 runtime: Arc<R>,
1019}
1020
1021impl<R: aimdb_executor::Spawn + 'static> Clone for AimDb<R> {
1022 fn clone(&self) -> Self {
1023 Self {
1024 inner: self.inner.clone(),
1025 runtime: self.runtime.clone(),
1026 }
1027 }
1028}
1029
1030impl<R: aimdb_executor::Spawn + 'static> AimDb<R> {
1031 /// Internal accessor for the inner state
1032 ///
1033 /// Used by adapter crates and internal spawning logic.
1034 #[doc(hidden)]
1035 pub fn inner(&self) -> &Arc<AimDbInner> {
1036 &self.inner
1037 }
1038
1039 /// Returns the extension storage frozen at `build()` time.
1040 ///
1041 /// External crates (e.g. `aimdb-persistence`) retrieve their typed state here
1042 /// to service query calls. The extensions are read-only on the live handle.
1043 ///
1044 /// # Example
1045 /// ```rust,ignore
1046 /// use aimdb_persistence::PersistenceState;
1047 /// let state = db.extensions().get::<PersistenceState>().unwrap();
1048 /// ```
1049 pub fn extensions(&self) -> &Extensions {
1050 &self.inner.extensions
1051 }
1052
1053 /// Builds a database with a closure-based builder pattern
1054 pub async fn build_with(rt: Arc<R>, f: impl FnOnce(&mut AimDbBuilder<R>)) -> DbResult<()> {
1055 let mut b = AimDbBuilder::new().runtime(rt);
1056 f(&mut b);
1057 b.run().await
1058 }
1059
1060 /// Spawns a task using the database's runtime adapter
1061 ///
1062 /// This method provides direct access to the runtime's spawn capability.
1063 ///
1064 /// # Arguments
1065 /// * `future` - The future to spawn
1066 ///
1067 /// # Returns
1068 /// `DbResult<()>` - Ok if the task was spawned successfully
1069 pub fn spawn_task<F>(&self, future: F) -> DbResult<()>
1070 where
1071 F: core::future::Future<Output = ()> + Send + 'static,
1072 {
1073 self.runtime.spawn(future).map_err(DbError::from)?;
1074 Ok(())
1075 }
1076
1077 /// Produces a value to a specific record by key
1078 ///
1079 /// Uses O(1) key-based lookup to find the correct record.
1080 ///
1081 /// # Arguments
1082 /// * `key` - The record key (e.g., "sensor.temperature")
1083 /// * `value` - The value to produce
1084 ///
1085 /// # Example
1086 ///
1087 /// ```rust,ignore
1088 /// db.produce::<Temperature>("sensors.indoor", indoor_temp).await?;
1089 /// db.produce::<Temperature>("sensors.outdoor", outdoor_temp).await?;
1090 /// ```
1091 pub async fn produce<T>(&self, key: impl AsRef<str>, value: T) -> DbResult<()>
1092 where
1093 T: Send + 'static + Debug + Clone,
1094 {
1095 let typed_rec = self.inner.get_typed_record_by_key::<T, R>(key)?;
1096 typed_rec.produce(value).await;
1097 Ok(())
1098 }
1099
1100 /// Subscribes to a specific record by key
1101 ///
1102 /// Uses O(1) key-based lookup to find the correct record.
1103 ///
1104 /// # Arguments
1105 /// * `key` - The record key (e.g., "sensor.temperature")
1106 ///
1107 /// # Example
1108 ///
1109 /// ```rust,ignore
1110 /// let mut reader = db.subscribe::<Temperature>("sensors.indoor")?;
1111 /// while let Ok(temp) = reader.recv().await {
1112 /// println!("Indoor: {:.1}°C", temp.celsius);
1113 /// }
1114 /// ```
1115 pub fn subscribe<T>(
1116 &self,
1117 key: impl AsRef<str>,
1118 ) -> DbResult<Box<dyn crate::buffer::BufferReader<T> + Send>>
1119 where
1120 T: Send + Sync + 'static + Debug + Clone,
1121 {
1122 let typed_rec = self.inner.get_typed_record_by_key::<T, R>(key)?;
1123 typed_rec.subscribe()
1124 }
1125
1126 /// Creates a type-safe producer for a specific record by key
1127 ///
1128 /// Returns a `Producer<T, R>` bound to a specific record key.
1129 ///
1130 /// # Arguments
1131 /// * `key` - The record key (e.g., "sensor.temperature")
1132 ///
1133 /// # Example
1134 ///
1135 /// ```rust,ignore
1136 /// let indoor_producer = db.producer::<Temperature>("sensors.indoor");
1137 /// let outdoor_producer = db.producer::<Temperature>("sensors.outdoor");
1138 ///
1139 /// // Each producer writes to its own record
1140 /// indoor_producer.produce(indoor_temp).await?;
1141 /// outdoor_producer.produce(outdoor_temp).await?;
1142 /// ```
1143 pub fn producer<T>(
1144 &self,
1145 key: impl Into<alloc::string::String>,
1146 ) -> crate::typed_api::Producer<T, R>
1147 where
1148 T: Send + 'static + Debug + Clone,
1149 {
1150 crate::typed_api::Producer::new(Arc::new(self.clone()), key.into())
1151 }
1152
1153 /// Creates a type-safe consumer for a specific record by key
1154 ///
1155 /// Returns a `Consumer<T, R>` bound to a specific record key.
1156 ///
1157 /// # Arguments
1158 /// * `key` - The record key (e.g., "sensor.temperature")
1159 ///
1160 /// # Example
1161 ///
1162 /// ```rust,ignore
1163 /// let indoor_consumer = db.consumer::<Temperature>("sensors.indoor");
1164 /// let outdoor_consumer = db.consumer::<Temperature>("sensors.outdoor");
1165 ///
1166 /// // Each consumer reads from its own record
1167 /// let mut rx = indoor_consumer.subscribe()?;
1168 /// ```
1169 pub fn consumer<T>(
1170 &self,
1171 key: impl Into<alloc::string::String>,
1172 ) -> crate::typed_api::Consumer<T, R>
1173 where
1174 T: Send + Sync + 'static + Debug + Clone,
1175 {
1176 crate::typed_api::Consumer::new(Arc::new(self.clone()), key.into())
1177 }
1178
1179 /// Resolve a record key to its RecordId
1180 ///
1181 /// Useful for checking if a record exists before operations.
1182 ///
1183 /// # Example
1184 ///
1185 /// ```rust,ignore
1186 /// if let Some(id) = db.resolve_key("sensors.temperature") {
1187 /// println!("Record exists with ID: {}", id);
1188 /// }
1189 /// ```
1190 pub fn resolve_key(&self, key: &str) -> Option<crate::record_id::RecordId> {
1191 self.inner.resolve_str(key)
1192 }
1193
1194 /// Get all record IDs for a specific type
1195 ///
1196 /// Returns a slice of RecordIds for all records of type T.
1197 /// Useful for introspection when multiple records of the same type exist.
1198 ///
1199 /// # Example
1200 ///
1201 /// ```rust,ignore
1202 /// let temp_ids = db.records_of_type::<Temperature>();
1203 /// println!("Found {} temperature records", temp_ids.len());
1204 /// ```
1205 pub fn records_of_type<T: 'static>(&self) -> &[crate::record_id::RecordId] {
1206 self.inner.records_of_type::<T>()
1207 }
1208
1209 /// Returns a reference to the runtime adapter
1210 ///
1211 /// Provides direct access to the concrete runtime type.
1212 pub fn runtime(&self) -> &R {
1213 &self.runtime
1214 }
1215
/// Returns metadata for every registered record (std only)
///
/// Delegates to the inner state's `list_records()`. Useful for remote access
/// introspection. Available only when the `std` feature is enabled.
///
/// # Example
/// ```rust,ignore
/// let records = db.list_records();
/// for record in records {
///     println!("Record: {} ({})", record.name, record.type_id);
/// }
/// ```
#[cfg(feature = "std")]
pub fn list_records(&self) -> Vec<crate::remote::RecordMetadata> {
    let Self { inner, .. } = self;
    inner.list_records()
}
1232
/// Fetches a record's latest value as JSON, looked up by name (std only)
///
/// Convenience wrapper around `AimDbInner::try_latest_as_json()`.
///
/// # Arguments
/// * `record_name` - The full Rust type name (e.g., "server::Temperature")
///
/// # Returns
/// `Some(JsonValue)` holding the current value, or `None` if unavailable
#[cfg(feature = "std")]
pub fn try_latest_as_json(&self, record_name: &str) -> Option<serde_json::Value> {
    let Self { inner, .. } = self;
    inner.try_latest_as_json(record_name)
}
1246
/// Writes a record value supplied as JSON (remote access API)
///
/// The JSON is deserialized and the resulting value is produced to the
/// record's buffer. The work is delegated to the inner state's
/// `set_record_from_json()`.
///
/// **SAFETY:** Enforces the "No Producer Override" rule - this only works for
/// configuration records that have no active producers.
///
/// # Arguments
/// * `record_name` - Full Rust type name
/// * `json_value` - JSON value to set
///
/// # Returns
/// `Ok(())` on success; an error if the record is not found, has producers,
/// or deserialization fails
///
/// # Example (internal use)
/// ```rust,ignore
/// db.set_record_from_json("AppConfig", json!({"debug": true}))?;
/// ```
#[cfg(feature = "std")]
pub fn set_record_from_json(
    &self,
    record_name: &str,
    json_value: serde_json::Value,
) -> DbResult<()> {
    let Self { inner, .. } = self;
    inner.set_record_from_json(record_name, json_value)
}
1273
/// Subscribe to record updates as JSON stream (std only)
///
/// Creates a subscription to a record's buffer and forwards updates as JSON
/// to a bounded channel. This is used internally by the remote access protocol
/// for implementing `record.subscribe`.
///
/// # Architecture
///
/// Spawns a consumer task on the runtime adapter that:
/// 1. Reads JSON values from the record's buffer as they arrive
/// 2. Sends each value to a bounded channel, waiting while the channel is full
/// 3. Logs and skips over `BufferLagged` errors, then keeps reading
/// 4. Terminates when any of the following happens:
///    - The cancel signal is received (unsubscribe)
///    - The `cancel_tx` sender is dropped without sending (the cancel branch
///      fires on any completion of the one-shot receiver)
///    - The channel receiver is dropped (client disconnected)
///    - The buffer reports `BufferClosed` or any other error
///
/// # Arguments
/// * `record_key` - Key of the record to subscribe to
/// * `queue_size` - Size of the bounded channel for this subscription. A value
///   of `0` is treated as `1`, because tokio's bounded channel panics when
///   asked for zero capacity.
///
/// # Returns
/// `Ok((receiver, cancel_tx))` where:
/// - `receiver`: Bounded channel receiver for JSON values
/// - `cancel_tx`: One-shot sender to cancel the subscription; keep it alive
///   for as long as the subscription should run
///
/// # Errors
/// - `DbError::RecordKeyNotFound` if no record is registered for the key
/// - `DbError::InvalidRecordId` if the resolved ID has no storage
/// - The error from `subscribe_json()` (e.g. record not configured with
///   `.with_remote_access()`)
/// - The runtime's spawn error, converted into `DbError`
///
/// # Example (internal use)
///
/// ```rust,ignore
/// let (mut rx, cancel_tx) = db.subscribe_record_updates("sensor.temp", 100)?;
///
/// // Read events
/// while let Some(json_value) = rx.recv().await {
///     // Forward to client...
/// }
///
/// // Cancel subscription
/// let _ = cancel_tx.send(());
/// ```
#[cfg(feature = "std")]
#[allow(unused_variables)] // Variables used only in tracing feature
pub fn subscribe_record_updates(
    &self,
    record_key: &str,
    queue_size: usize,
) -> DbResult<(
    tokio::sync::mpsc::Receiver<serde_json::Value>,
    tokio::sync::oneshot::Sender<()>,
)> {
    use tokio::sync::{mpsc, oneshot};

    // Find the record by key
    let id = self
        .inner
        .resolve_str(record_key)
        .ok_or_else(|| DbError::RecordKeyNotFound {
            key: record_key.to_string(),
        })?;

    let record = self
        .inner
        .storage(id)
        .ok_or_else(|| DbError::InvalidRecordId { id: id.raw() })?;

    // Subscribe to the record's buffer as JSON stream
    // This will fail if record not configured with .with_remote_access()
    let mut json_reader = record.subscribe_json()?;

    // Create channels for the subscription. `mpsc::channel` panics on a zero
    // capacity, so a caller-supplied 0 is raised to the smallest valid size
    // instead of crashing the caller.
    let (value_tx, value_rx) = mpsc::channel(queue_size.max(1));
    let (cancel_tx, mut cancel_rx) = oneshot::channel();

    // Get metadata for logging
    let type_id = self.inner.types[id.index()];
    let key = self.inner.keys[id.index()];
    let record_metadata = record.collect_metadata(type_id, key, id);

    // Spawn consumer task that forwards JSON values from buffer to channel.
    // The task owns everything it needs, so the runtime is used by reference.
    let spawn_result = self.runtime.spawn(async move {
        #[cfg(feature = "tracing")]
        tracing::debug!(
            "Subscription consumer task started for {}",
            record_metadata.name
        );

        // Main event loop: read from buffer and forward to channel
        loop {
            tokio::select! {
                // Handle cancellation signal (also fires if cancel_tx is dropped)
                _ = &mut cancel_rx => {
                    #[cfg(feature = "tracing")]
                    tracing::debug!("Subscription cancelled");
                    break;
                }
                // Read next JSON value from buffer
                result = json_reader.recv_json() => {
                    match result {
                        Ok(json_val) => {
                            // Send JSON value to subscription channel
                            if value_tx.send(json_val).await.is_err() {
                                #[cfg(feature = "tracing")]
                                tracing::debug!("Subscription receiver dropped");
                                break;
                            }
                        }
                        Err(DbError::BufferLagged { lag_count, .. }) => {
                            // Consumer fell behind - log warning but continue
                            #[cfg(feature = "tracing")]
                            tracing::warn!(
                                "Subscription for {} lagged by {} messages",
                                record_metadata.name,
                                lag_count
                            );
                            // Continue reading - next recv will get latest
                        }
                        Err(DbError::BufferClosed { .. }) => {
                            // Buffer closed (shutdown) - exit gracefully
                            #[cfg(feature = "tracing")]
                            tracing::debug!("Buffer closed for {}", record_metadata.name);
                            break;
                        }
                        Err(e) => {
                            // Other error (shouldn't happen in practice)
                            #[cfg(feature = "tracing")]
                            tracing::error!(
                                "Subscription error for {}: {:?}",
                                record_metadata.name,
                                e
                            );
                            break;
                        }
                    }
                }
            }
        }

        #[cfg(feature = "tracing")]
        tracing::debug!("Subscription consumer task terminated");
    });

    spawn_result.map_err(DbError::from)?;

    Ok((value_rx, cancel_tx))
}
1424
/// Collects inbound connector routes for automatic router construction
///
/// Available when the `alloc` feature is enabled (it is not restricted to
/// `std` builds).
///
/// Iterates all records, filters their inbound connectors by scheme, and
/// returns one route per matching link for which a producer could be created.
///
/// # Arguments
/// * `scheme` - URL scheme to filter by (e.g., "mqtt", "kafka")
///
/// # Returns
/// Vector of tuples: (topic, producer_trait, deserializer)
///
/// The topic comes from the link's `resolve_topic()`: it is resolved
/// dynamically if a `TopicResolverFn` is configured, otherwise the static
/// topic from the URL is used. Links whose producer factory yields `None` are
/// left out of the result.
///
/// # Example
/// ```rust,ignore
/// // In MqttConnector after db.build()
/// let routes = db.collect_inbound_routes("mqtt");
/// let router = RouterBuilder::from_routes(routes).build();
/// connector.set_router(router).await?;
/// ```
#[cfg(feature = "alloc")]
pub fn collect_inbound_routes(
    &self,
    scheme: &str,
) -> Vec<(
    String,
    Box<dyn crate::connector::ProducerTrait>,
    crate::connector::DeserializerFn,
)> {
    let mut routes = Vec::new();

    // The producer factory takes a type-erased handle, so wrap a clone of
    // this database once and share it across all links.
    let db_any: Arc<dyn core::any::Any + Send + Sync> = Arc::new(self.clone());

    for record in &self.inner.storages {
        for link in record.inbound_connectors() {
            // Filter by scheme
            if link.url.scheme() != scheme {
                continue;
            }

            // Resolve topic: dynamic (from resolver) or static (from URL)
            let topic = link.resolve_topic();

            // Create producer using the stored factory
            if let Some(producer) = link.create_producer(db_any.clone()) {
                routes.push((topic, producer, link.deserializer.clone()));
            }
        }
    }

    #[cfg(feature = "tracing")]
    if !routes.is_empty() {
        tracing::debug!(
            "Collected {} inbound routes for scheme '{}'",
            routes.len(),
            scheme
        );
    }

    routes
}
1490
/// Collects outbound routes for a specific protocol scheme
///
/// Mirrors `collect_inbound_routes()` for symmetry. Iterates all records,
/// filters their outbound connectors by scheme, and returns one route per
/// matching link that has a serializer and for which a consumer could be
/// created.
///
/// This method is called by connectors during their `build()` phase to
/// collect all configured outbound routes and spawn publisher tasks.
///
/// # Arguments
/// * `scheme` - URL scheme to filter by (e.g., "mqtt", "kafka")
///
/// # Returns
/// Vector of 5-element tuples:
/// (destination, consumer_trait, serializer, config, topic_provider)
///
/// - `destination` is the link URL's `resource_id()`.
/// - The config Vec contains protocol-specific options (e.g., qos, retain).
/// - `topic_provider` is the link's optional dynamic topic provider.
///
/// Links without a serializer are skipped (with a warning when the `tracing`
/// feature is enabled), as are links whose consumer factory yields `None`.
///
/// # Example
/// ```rust,ignore
/// // In MqttConnector::build()
/// let routes = db.collect_outbound_routes("mqtt");
/// for (topic, consumer, serializer, config, _topic_provider) in routes {
///     connector.spawn_publisher(topic, consumer, serializer, config)?;
/// }
/// ```
#[cfg(feature = "alloc")]
pub fn collect_outbound_routes(&self, scheme: &str) -> Vec<OutboundRoute> {
    let mut routes = Vec::new();

    // Convert self to Arc<dyn Any> for consumer factory
    // This is necessary because the factory takes Arc<dyn Any> to avoid
    // needing to know the runtime type R at the factory definition site
    let db_any: Arc<dyn core::any::Any + Send + Sync> = Arc::new(self.clone());

    for record in &self.inner.storages {
        for link in record.outbound_connectors() {
            // Filter by scheme
            if link.url.scheme() != scheme {
                continue;
            }

            let destination = link.url.resource_id();

            // Skip links without serializer
            let Some(serializer) = link.serializer.clone() else {
                #[cfg(feature = "tracing")]
                tracing::warn!("Outbound link '{}' has no serializer, skipping", link.url);
                continue;
            };

            // Create consumer using the stored factory
            if let Some(consumer) = link.create_consumer(db_any.clone()) {
                routes.push((
                    destination,
                    consumer,
                    serializer,
                    link.config.clone(),
                    link.topic_provider.clone(),
                ));
            }
        }
    }

    #[cfg(feature = "tracing")]
    if !routes.is_empty() {
        tracing::debug!(
            "Collected {} outbound routes for scheme '{}'",
            routes.len(),
            scheme
        );
    }

    routes
}
1567}