Skip to main content

aimdb_core/
builder.rs

1//! Database builder with type-safe record registration
2//!
3//! Provides `AimDb` and `AimDbBuilder` for constructing databases with
4//! type-safe, self-registering records using the producer-consumer pattern.
5
6use core::any::TypeId;
7use core::fmt::Debug;
8use core::marker::PhantomData;
9
10extern crate alloc;
11
12use alloc::vec::Vec;
13use hashbrown::HashMap;
14
15#[cfg(not(feature = "std"))]
16use alloc::{boxed::Box, sync::Arc};
17
18#[cfg(all(not(feature = "std"), feature = "alloc"))]
19use alloc::string::{String, ToString};
20
21#[cfg(feature = "std")]
22use std::{boxed::Box, sync::Arc};
23
24use crate::extensions::Extensions;
25use crate::graph::DependencyGraph;
26
27/// Type-erased on_start function stored in `AimDbBuilder::start_fns`.
28///
29/// Defined once here so `on_start()` (which stores) and `build()` (which
30/// downcasts) share the *exact same* type and a silent type mismatch cannot
31/// cause a runtime panic.
32#[cfg(feature = "std")]
33type StartFnType<R> = Box<
34    dyn FnOnce(
35            Arc<R>,
36        )
37            -> core::pin::Pin<Box<dyn core::future::Future<Output = ()> + Send + 'static>>
38        + Send,
39>;
40#[cfg(not(feature = "std"))]
41type NoStdStartFnType<R> = Box<
42    dyn FnOnce(
43            Arc<R>,
44        )
45            -> core::pin::Pin<Box<dyn core::future::Future<Output = ()> + Send + 'static>>
46        + Send,
47>;
48use crate::record_id::{RecordId, RecordKey, StringKey};
49use crate::typed_api::{RecordRegistrar, RecordT};
50use crate::typed_record::{AnyRecord, AnyRecordExt, TypedRecord};
51use crate::{DbError, DbResult};
52
53/// Type alias for outbound route tuples returned by `collect_outbound_routes`
54///
55/// Each tuple contains:
56/// - `String` - Default topic/destination from the URL path
57/// - `Box<dyn ConsumerTrait>` - Consumer for subscribing to record values
58/// - `SerializerFn` - User-provided serializer for the record type
59/// - `Vec<(String, String)>` - Configuration options from the URL query
60/// - `Option<TopicProviderFn>` - Optional dynamic topic provider
61#[cfg(feature = "alloc")]
62pub type OutboundRoute = (
63    String,
64    Box<dyn crate::connector::ConsumerTrait>,
65    crate::connector::SerializerFn,
66    Vec<(String, String)>,
67    Option<crate::connector::TopicProviderFn>,
68);
69
70/// Marker type for untyped builder (before runtime is set)
71pub struct NoRuntime;
72
73/// Internal database state
74///
75/// Holds the registry of typed records with multiple index structures for
76/// efficient access patterns:
77///
78/// - **`storages`**: Vec for O(1) hot-path access by RecordId
79/// - **`by_key`**: HashMap for O(1) lookup by stable RecordKey
80/// - **`by_type`**: HashMap for introspection (find all records of type T)
81/// - **`types`**: Vec for runtime type validation during downcasts
82/// - **`dependency_graph`**: DAG of record relationships (immutable after build)
83pub struct AimDbInner {
84    /// Record storage (hot path - indexed by RecordId)
85    ///
86    /// Order matches registration order. Immutable after build().
87    storages: Vec<Box<dyn AnyRecord>>,
88
89    /// Name → RecordId lookup (control plane)
90    ///
91    /// Used by remote access, CLI, MCP for O(1) name resolution.
92    by_key: HashMap<StringKey, RecordId>,
93
94    /// TypeId → RecordIds lookup (introspection)
95    ///
96    /// Enables "find all Temperature records" queries.
97    by_type: HashMap<TypeId, Vec<RecordId>>,
98
99    /// RecordId → TypeId lookup (type safety assertions)
100    ///
101    /// Used to validate downcasts at runtime.
102    types: Vec<TypeId>,
103
104    /// RecordId → StringKey lookup (reverse mapping)
105    ///
106    /// Used to get the key for a given record ID.
107    keys: Vec<StringKey>,
108
109    /// Dependency graph (immutable after build)
110    ///
111    /// Contains all record nodes, edges, and topological order.
112    /// Used for introspection, spawn ordering, and graph queries.
113    dependency_graph: crate::graph::DependencyGraph,
114
115    /// Generic extension storage (frozen after build)
116    ///
117    /// External crates (e.g. `aimdb-persistence`) store typed state here
118    /// during builder configuration and retrieve it at query time via
119    /// `AimDb::extensions()`.
120    pub(crate) extensions: Extensions,
121}
122
123impl AimDbInner {
124    /// Resolve RecordKey to RecordId (control plane - O(1) average)
125    #[inline]
126    pub fn resolve<K: RecordKey>(&self, key: &K) -> Option<RecordId> {
127        self.by_key.get(key.as_str()).copied()
128    }
129
130    /// Resolve string to RecordId (convenience for remote access)
131    ///
132    /// O(1) average thanks to `Borrow<str>` implementation on `RecordKey`.
133    #[inline]
134    pub fn resolve_str(&self, name: &str) -> Option<RecordId> {
135        self.by_key.get(name).copied()
136    }
137
138    /// Get storage by RecordId (hot path - O(1))
139    #[inline]
140    pub fn storage(&self, id: RecordId) -> Option<&dyn AnyRecord> {
141        self.storages.get(id.index()).map(|b| b.as_ref())
142    }
143
144    /// Get the StringKey for a given RecordId
145    #[inline]
146    pub fn key_for(&self, id: RecordId) -> Option<&StringKey> {
147        self.keys.get(id.index())
148    }
149
150    /// Get all RecordIds for a type (introspection)
151    pub fn records_of_type<T: 'static>(&self) -> &[RecordId] {
152        self.by_type
153            .get(&TypeId::of::<T>())
154            .map(|v| v.as_slice())
155            .unwrap_or(&[])
156    }
157
158    /// Get the number of registered records
159    #[inline]
160    pub fn record_count(&self) -> usize {
161        self.storages.len()
162    }
163
164    /// Get the dependency graph (immutable reference)
165    ///
166    /// The graph is constructed once during `build()` and never changes.
167    /// Contains all record nodes, edges, and topological ordering.
168    #[inline]
169    pub fn dependency_graph(&self) -> &crate::graph::DependencyGraph {
170        &self.dependency_graph
171    }
172
173    /// Helper to get a typed record by RecordKey
174    ///
175    /// This encapsulates the common pattern of:
176    /// 1. Resolving key to RecordId
177    /// 2. Validating TypeId matches
178    /// 3. Downcasting to the typed record
179    pub fn get_typed_record_by_key<T, R>(
180        &self,
181        key: impl AsRef<str>,
182    ) -> DbResult<&TypedRecord<T, R>>
183    where
184        T: Send + 'static + Debug + Clone,
185        R: aimdb_executor::Spawn + 'static,
186    {
187        let key_str = key.as_ref();
188
189        // Resolve key to RecordId
190        let id = self.resolve_str(key_str).ok_or({
191            #[cfg(feature = "std")]
192            {
193                DbError::RecordKeyNotFound {
194                    key: key_str.to_string(),
195                }
196            }
197            #[cfg(not(feature = "std"))]
198            {
199                DbError::RecordKeyNotFound { _key: () }
200            }
201        })?;
202
203        self.get_typed_record_by_id::<T, R>(id)
204    }
205
206    /// Helper to get a typed record by RecordId with type validation
207    pub fn get_typed_record_by_id<T, R>(&self, id: RecordId) -> DbResult<&TypedRecord<T, R>>
208    where
209        T: Send + 'static + Debug + Clone,
210        R: aimdb_executor::Spawn + 'static,
211    {
212        use crate::typed_record::AnyRecordExt;
213
214        // Validate RecordId is in bounds
215        if id.index() >= self.storages.len() {
216            return Err(DbError::InvalidRecordId { id: id.raw() });
217        }
218
219        // Validate TypeId matches
220        let expected = TypeId::of::<T>();
221        let actual = self.types[id.index()];
222        if expected != actual {
223            #[cfg(feature = "std")]
224            return Err(DbError::TypeMismatch {
225                record_id: id.raw(),
226                expected_type: core::any::type_name::<T>().to_string(),
227            });
228            #[cfg(not(feature = "std"))]
229            return Err(DbError::TypeMismatch {
230                record_id: id.raw(),
231                _expected_type: (),
232            });
233        }
234
235        // Safe to downcast (type validated above)
236        let record = &self.storages[id.index()];
237
238        #[cfg(feature = "std")]
239        let typed_record = record.as_typed::<T, R>().ok_or(DbError::InvalidOperation {
240            operation: "get_typed_record_by_id".to_string(),
241            reason: "type mismatch during downcast".to_string(),
242        })?;
243
244        #[cfg(not(feature = "std"))]
245        let typed_record = record.as_typed::<T, R>().ok_or(DbError::InvalidOperation {
246            _operation: (),
247            _reason: (),
248        })?;
249
250        Ok(typed_record)
251    }
252
253    /// Collects metadata for all registered records (std only)
254    ///
255    /// Returns a vector of `RecordMetadata` for remote access introspection.
256    /// Available only when the `std` feature is enabled.
257    #[cfg(feature = "std")]
258    pub fn list_records(&self) -> Vec<crate::remote::RecordMetadata> {
259        self.storages
260            .iter()
261            .enumerate()
262            .map(|(i, record)| {
263                let id = RecordId::new(i as u32);
264                let type_id = self.types[i];
265                let key = self.keys[i];
266                record.collect_metadata(type_id, key, id)
267            })
268            .collect()
269    }
270
271    /// Try to get record's latest value as JSON by record key (std only)
272    ///
273    /// O(1) lookup using the key-based index.
274    ///
275    /// # Arguments
276    /// * `record_key` - The record key (e.g., "sensors.temperature")
277    ///
278    /// # Returns
279    /// `Some(JsonValue)` with the current record value, or `None`
280    #[cfg(feature = "std")]
281    pub fn try_latest_as_json(&self, record_key: &str) -> Option<serde_json::Value> {
282        let id = self.resolve_str(record_key)?;
283        self.storages.get(id.index())?.latest_json()
284    }
285
286    /// Sets a record value from JSON (remote access API)
287    ///
288    /// Deserializes the JSON value and writes it to the record's buffer.
289    ///
290    /// **SAFETY:** Enforces the "No Producer Override" rule:
291    /// - Only works for records with `producer_count == 0`
292    /// - Returns error if the record has active producers
293    ///
294    /// # Arguments
295    /// * `record_key` - The record key (e.g., "config.app")
296    /// * `json_value` - JSON representation of the value
297    ///
298    /// # Returns
299    /// - `Ok(())` - Successfully set the value
300    /// - `Err(DbError)` - If record not found, has producers, or deserialization fails
301    #[cfg(feature = "std")]
302    pub fn set_record_from_json(
303        &self,
304        record_key: &str,
305        json_value: serde_json::Value,
306    ) -> DbResult<()> {
307        let id = self
308            .resolve_str(record_key)
309            .ok_or_else(|| DbError::RecordKeyNotFound {
310                key: record_key.to_string(),
311            })?;
312
313        self.storages[id.index()].set_from_json(json_value)
314    }
315}
316
317/// Database builder for producer-consumer pattern
318///
319/// Provides a fluent API for constructing databases with type-safe record registration.
320/// Use `.runtime()` to set the runtime and transition to a typed builder.
321pub struct AimDbBuilder<R = NoRuntime> {
322    /// Registered records with their keys (order matters for RecordId assignment)
323    records: Vec<(StringKey, TypeId, Box<dyn AnyRecord>)>,
324
325    /// Runtime adapter
326    runtime: Option<Arc<R>>,
327
328    /// Connector builders that will be invoked during build()
329    connector_builders: Vec<Box<dyn crate::connector::ConnectorBuilder<R>>>,
330
331    /// Spawn functions with their keys
332    spawn_fns: Vec<(StringKey, Box<dyn core::any::Any + Send>)>,
333
334    /// Startup tasks registered via on_start() — spawned after build() completes.
335    /// Stored type-erased (Box<Box<dyn FnOnce(Arc<R>) -> BoxFuture<…>>>) to allow
336    /// the field to exist on the unparameterised NoRuntime builder too.
337    start_fns: Vec<Box<dyn core::any::Any + Send>>,
338
339    /// Generic extension storage for external crates (e.g., persistence, metrics).
340    /// Moved into AimDbInner during build() so it can be read on the live AimDb handle.
341    extensions: Extensions,
342
343    /// Remote access configuration (std only)
344    #[cfg(feature = "std")]
345    remote_config: Option<crate::remote::AimxConfig>,
346
347    /// PhantomData to track the runtime type parameter
348    _phantom: PhantomData<R>,
349}
350
351impl AimDbBuilder<NoRuntime> {
352    /// Creates a new database builder without a runtime
353    ///
354    /// Call `.runtime()` to set the runtime adapter.
355    pub fn new() -> Self {
356        Self {
357            records: Vec::new(),
358            runtime: None,
359            connector_builders: Vec::new(),
360            spawn_fns: Vec::new(),
361            start_fns: Vec::new(),
362            extensions: Extensions::new(),
363            #[cfg(feature = "std")]
364            remote_config: None,
365            _phantom: PhantomData,
366        }
367    }
368
369    /// Sets the runtime adapter
370    ///
371    /// This transitions the builder from untyped to typed with concrete runtime `R`.
372    ///
373    /// # Type Safety Note
374    ///
375    /// The `connector_builders` field is intentionally reset to `Vec::new()` during this
376    /// transition because connectors are parameterized by the runtime type:
377    ///
378    /// - Before: `Vec<Box<dyn ConnectorBuilder<NoRuntime>>>`
379    /// - After: `Vec<Box<dyn ConnectorBuilder<R>>>`
380    ///
381    /// These types are incompatible and cannot be transferred. However, this is not a bug
382    /// because `.with_connector()` is only available AFTER calling `.runtime()` (it's defined
383    /// in the `impl<R> where R: Spawn` block, not in `impl AimDbBuilder<NoRuntime>`).
384    ///
385    /// This means the type system **enforces** the correct call order:
386    /// ```rust,ignore
387    /// AimDbBuilder::new()
388    ///     .runtime(runtime)           // ← Must be called first
389    ///     .with_connector(connector)  // ← Now available
390    /// ```
391    ///
392    /// The `records` and `remote_config` are preserved across the transition since they
393    /// are not parameterized by the runtime type.
394    pub fn runtime<R>(self, rt: Arc<R>) -> AimDbBuilder<R>
395    where
396        R: aimdb_executor::Spawn + 'static,
397    {
398        AimDbBuilder {
399            records: self.records,
400            runtime: Some(rt),
401            connector_builders: Vec::new(),
402            spawn_fns: Vec::new(),
403            start_fns: self.start_fns,
404            extensions: self.extensions,
405            #[cfg(feature = "std")]
406            remote_config: self.remote_config,
407            _phantom: PhantomData,
408        }
409    }
410}
411
412impl<R> AimDbBuilder<R>
413where
414    R: aimdb_executor::Spawn + 'static,
415{
416    /// Returns a shared reference to the extension storage.
417    ///
418    /// External crates use this to retrieve state stored via `extensions_mut()`.
419    pub fn extensions(&self) -> &Extensions {
420        &self.extensions
421    }
422
423    /// Returns a mutable reference to the extension storage.
424    ///
425    /// Use this inside `AimDbBuilderPersistExt` (or similar) to store typed state
426    /// that `.persist()` and `AimDbQueryExt` will later retrieve.
427    pub fn extensions_mut(&mut self) -> &mut Extensions {
428        &mut self.extensions
429    }
430
431    /// Registers a task to be spawned after `build()` completes.
432    ///
433    /// The closure receives an `Arc<R>` (the runtime adapter) and must return a
434    /// future that runs for as long as needed (e.g. an infinite cleanup loop).
435    /// Tasks are spawned in registration order, after all record tasks and
436    /// connectors have been started.
437    ///
438    /// # Example
439    /// ```rust,ignore
440    /// builder.on_start(|runtime| async move {
441    ///     loop {
442    ///         do_cleanup().await;
443    ///         runtime.sleep(Duration::from_secs(3600)).await;
444    ///     }
445    /// });
446    /// ```
447    pub fn on_start<F, Fut>(&mut self, f: F) -> &mut Self
448    where
449        F: FnOnce(Arc<R>) -> Fut + Send + 'static,
450        Fut: core::future::Future<Output = ()> + Send + 'static,
451    {
452        // Type-erase so the field can be shared with the `NoRuntime` builder struct.
453        // Uses the module-level `StartFnType<R>` alias — must stay in sync with
454        // the downcast in `build()`.
455        #[cfg(feature = "std")]
456        let boxed: StartFnType<R> = Box::new(move |runtime| Box::pin(f(runtime)));
457        #[cfg(not(feature = "std"))]
458        let boxed: NoStdStartFnType<R> = Box::new(move |runtime| Box::pin(f(runtime)));
459        self.start_fns.push(Box::new(boxed));
460        self
461    }
462
463    /// Registers a connector builder that will be invoked during `build()`
464    ///
465    /// The connector builder will be called after the database is constructed,
466    /// allowing it to collect routes and initialize the connector properly.
467    ///
468    /// # Arguments
469    /// * `builder` - A connector builder that implements `ConnectorBuilder<R>`
470    ///
471    /// # Example
472    ///
473    /// ```rust,ignore
474    /// use aimdb_mqtt_connector::MqttConnector;
475    ///
476    /// let db = AimDbBuilder::new()
477    ///     .runtime(runtime)
478    ///     .with_connector(MqttConnector::new("mqtt://broker.local:1883"))
479    ///     .configure::<Temperature>(|reg| {
480    ///         reg.link_from("mqtt://commands/temp")...
481    ///     })
482    ///     .build().await?;
483    /// ```
484    pub fn with_connector(
485        mut self,
486        builder: impl crate::connector::ConnectorBuilder<R> + 'static,
487    ) -> Self {
488        self.connector_builders.push(Box::new(builder));
489        self
490    }
491
492    /// Enables remote access via AimX protocol (std only)
493    ///
494    /// Configures the database to accept remote connections over a Unix domain socket,
495    /// allowing external clients to introspect records, subscribe to updates, and
496    /// (optionally) write data.
497    ///
498    /// The remote access supervisor will be spawned automatically during `build()`.
499    ///
500    /// # Arguments
501    /// * `config` - Remote access configuration (socket path, security policy, etc.)
502    ///
503    /// # Example
504    ///
505    /// ```rust,ignore
506    /// use aimdb_core::remote::{AimxConfig, SecurityPolicy};
507    ///
508    /// let config = AimxConfig::new("/tmp/aimdb.sock")
509    ///     .with_security(SecurityPolicy::read_only());
510    ///
511    /// let db = AimDbBuilder::new()
512    ///     .runtime(runtime)
513    ///     .with_remote_access(config)
514    ///     .build()?;
515    /// ```
516    #[cfg(feature = "std")]
517    pub fn with_remote_access(mut self, config: crate::remote::AimxConfig) -> Self {
518        self.remote_config = Some(config);
519        self
520    }
521
522    /// Configures a record type manually with a unique key
523    ///
524    /// The key uniquely identifies this record instance. Multiple records of the same
525    /// type can exist with different keys (e.g., "sensor.temperature.room1" and
526    /// "sensor.temperature.room2").
527    ///
528    /// # Arguments
529    /// * `key` - A unique identifier for this record. Can be a string literal, `StringKey`,
530    ///   or any type implementing `RecordKey` (including user-defined enum keys).
531    /// * `f` - Configuration closure
532    ///
533    /// # Example
534    /// ```rust,ignore
535    /// // Using string literal
536    /// builder.configure::<Temperature>("sensor.temp.room1", |reg| { ... });
537    ///
538    /// // Using compile-time safe enum key
539    /// builder.configure::<Temperature>(SensorKey::TempRoom1, |reg| { ... });
540    /// ```
541    pub fn configure<T>(
542        &mut self,
543        key: impl RecordKey,
544        f: impl for<'a> FnOnce(&'a mut RecordRegistrar<'a, T, R>),
545    ) -> &mut Self
546    where
547        T: Send + Sync + 'static + Debug + Clone,
548    {
549        // Convert any RecordKey to StringKey for internal storage
550        let record_key: StringKey = StringKey::from_dynamic(key.as_str());
551        let type_id = TypeId::of::<T>();
552
553        // Find existing record with this key, or create new one
554        let record_index = self.records.iter().position(|(k, _, _)| k == &record_key);
555
556        let (rec, is_new_record) = match record_index {
557            Some(idx) => {
558                // Use existing record
559                let (_, existing_type, record) = &mut self.records[idx];
560                assert!(
561                    *existing_type == type_id,
562                    "StringKey '{}' already registered with different type",
563                    record_key.as_str()
564                );
565                (
566                    record
567                        .as_typed_mut::<T, R>()
568                        .expect("type mismatch in record registry"),
569                    false,
570                )
571            }
572            None => {
573                // Create new record
574                self.records
575                    .push((record_key, type_id, Box::new(TypedRecord::<T, R>::new())));
576                let (_, _, record) = self.records.last_mut().unwrap();
577                (
578                    record
579                        .as_typed_mut::<T, R>()
580                        .expect("type mismatch in record registry"),
581                    true,
582                )
583            }
584        };
585
586        let mut reg = RecordRegistrar {
587            rec,
588            connector_builders: &self.connector_builders,
589            #[cfg(feature = "alloc")]
590            record_key: record_key.as_str().to_string(),
591            extensions: &self.extensions,
592        };
593        f(&mut reg);
594
595        // Only store spawn function for new records to avoid duplicates
596        if is_new_record {
597            let spawn_key = record_key;
598
599            #[allow(clippy::type_complexity)]
600            let spawn_fn: Box<
601                dyn FnOnce(&Arc<R>, &Arc<AimDb<R>>, RecordId) -> DbResult<()> + Send,
602            > = Box::new(move |runtime: &Arc<R>, db: &Arc<AimDb<R>>, id: RecordId| {
603                // Use RecordSpawner to spawn tasks for this record type
604                use crate::typed_record::RecordSpawner;
605
606                let typed_record = db.inner().get_typed_record_by_id::<T, R>(id)?;
607                // Get the record key from the database to enable key-based producer/consumer
608                #[cfg(feature = "alloc")]
609                let key = db
610                    .inner()
611                    .key_for(id)
612                    .map(|k| k.as_str().to_string())
613                    .unwrap_or_else(|| alloc::format!("__record_{}", id.index()));
614                #[cfg(not(feature = "alloc"))]
615                let key = "";
616                #[cfg(feature = "alloc")]
617                let key = key.as_str();
618                RecordSpawner::<T>::spawn_all_tasks(typed_record, runtime, db, key)
619            });
620
621            // Store the spawn function (type-erased in Box<dyn Any>)
622            self.spawn_fns.push((spawn_key, Box::new(spawn_fn)));
623        }
624
625        self
626    }
627
628    /// Registers a self-registering record type
629    ///
630    /// The record type must implement `RecordT<R>`.
631    ///
632    /// Uses the type name as the default key. For custom keys, use `configure()` directly.
633    pub fn register_record<T>(&mut self, cfg: &T::Config) -> &mut Self
634    where
635        T: RecordT<R>,
636    {
637        // Default key is the full type name for backward compatibility
638        let key = StringKey::new(core::any::type_name::<T>());
639        self.configure::<T>(key, |reg| T::register(reg, cfg))
640    }
641
642    /// Registers a self-registering record type with a custom key
643    ///
644    /// The record type must implement `RecordT<R>`.
645    pub fn register_record_with_key<T>(&mut self, key: impl RecordKey, cfg: &T::Config) -> &mut Self
646    where
647        T: RecordT<R>,
648    {
649        self.configure::<T>(key, |reg| T::register(reg, cfg))
650    }
651
652    /// Runs the database indefinitely (never returns)
653    ///
654    /// This method builds the database, spawns all producer and consumer tasks, and then
655    /// parks the current task indefinitely. This is the primary way to run AimDB services.
656    ///
657    /// All logic runs in background tasks via producers, consumers, and connectors. The
658    /// application continues until interrupted (e.g., Ctrl+C).
659    ///
660    /// # Returns
661    /// `DbResult<()>` - Ok when database starts successfully, then parks forever
662    ///
663    /// # Example
664    ///
665    /// ```rust,ignore
666    /// #[tokio::main]
667    /// async fn main() -> DbResult<()> {
668    ///     AimDbBuilder::new()
669    ///         .runtime(Arc::new(TokioAdapter::new()?))
670    ///         .configure::<MyData>(|reg| {
671    ///             reg.with_buffer(BufferCfg::SpmcRing { capacity: 100 })
672    ///                .with_source(my_producer)
673    ///                .with_tap(my_consumer);
674    ///         })
675    ///         .run().await  // Runs forever
676    /// }
677    /// ```
678    pub async fn run(self) -> DbResult<()> {
679        #[cfg(feature = "tracing")]
680        tracing::info!("Building database and spawning background tasks...");
681
682        let _db = self.build().await?;
683
684        #[cfg(feature = "tracing")]
685        tracing::info!("Database running, background tasks active. Press Ctrl+C to stop.");
686
687        // Park indefinitely - the background tasks will continue running
688        // The database handle is kept alive here to prevent dropping it
689        core::future::pending::<()>().await;
690
691        Ok(())
692    }
693
694    /// Builds the database and returns the handle (async)
695    ///
696    /// Use this when you need programmatic access to the database handle for
697    /// manual subscriptions or production. For typical services, use `.run().await` instead.
698    ///
699    /// **Automatic Task Spawning:** This method spawns all producer services and
700    /// `.tap()` observer tasks that were registered during configuration.
701    ///
702    /// **Connector Setup:** Connectors must be created manually before calling `build()`:
703    ///
704    /// ```rust,ignore
705    /// use aimdb_mqtt_connector::{MqttConnector, router::RouterBuilder};
706    ///
707    /// // Configure records with connector links
708    /// let builder = AimDbBuilder::new()
709    ///     .runtime(runtime)
710    ///     .configure::<Temp>("temp", |reg| {
711    ///         reg.link_from("mqtt://commands/temp")
712    ///            .with_buffer(BufferCfg::SingleLatest)
713    ///            .with_remote_access();
714    ///     });
715    ///
716    /// // Create MQTT connector with router
717    /// let router = RouterBuilder::new()
718    ///     .route("commands/temp", /* deserializer */)
719    ///     .build();
720    /// let connector = MqttConnector::new("mqtt://localhost:1883", router).await?;
721    ///
722    /// // Register connector and build
723    /// let db = builder
724    ///     .with_connector("mqtt", Arc::new(connector))
725    ///     .build().await?;
726    /// ```
727    ///
728    /// # Returns
729    /// `DbResult<AimDb<R>>` - The database instance
730    #[cfg_attr(not(feature = "std"), allow(unused_mut))]
731    pub async fn build(self) -> DbResult<AimDb<R>> {
732        use crate::DbError;
733
734        // Validate all records
735        for (key, _, record) in &self.records {
736            record.validate().map_err(|_msg| {
737                // Suppress unused warning for key in no_std
738                let _ = &key;
739                #[cfg(feature = "std")]
740                {
741                    DbError::RuntimeError {
742                        message: format!("Record '{}' validation failed: {}", key.as_str(), _msg),
743                    }
744                }
745                #[cfg(not(feature = "std"))]
746                {
747                    DbError::RuntimeError { _message: () }
748                }
749            })?;
750        }
751
752        // Ensure runtime is set
753        let runtime = self.runtime.ok_or({
754            #[cfg(feature = "std")]
755            {
756                DbError::RuntimeError {
757                    message: "runtime not set (use .runtime())".into(),
758                }
759            }
760            #[cfg(not(feature = "std"))]
761            {
762                DbError::RuntimeError { _message: () }
763            }
764        })?;
765
766        // Build the new index structures
767        let record_count = self.records.len();
768        let mut storages: Vec<Box<dyn AnyRecord>> = Vec::with_capacity(record_count);
769        let mut by_key: HashMap<StringKey, RecordId> = HashMap::with_capacity(record_count);
770        let mut by_type: HashMap<TypeId, Vec<RecordId>> = HashMap::new();
771        let mut types: Vec<TypeId> = Vec::with_capacity(record_count);
772        let mut keys: Vec<StringKey> = Vec::with_capacity(record_count);
773
774        for (i, (key, type_id, record)) in self.records.into_iter().enumerate() {
775            let id = RecordId::new(i as u32);
776
777            // Check for duplicate keys (should not happen if configure() is used correctly)
778            if by_key.contains_key(&key) {
779                #[cfg(feature = "std")]
780                return Err(DbError::DuplicateRecordKey {
781                    key: key.as_str().to_string(),
782                });
783                #[cfg(not(feature = "std"))]
784                return Err(DbError::DuplicateRecordKey { _key: () });
785            }
786
787            // Build index structures
788            storages.push(record);
789            by_key.insert(key, id);
790            by_type.entry(type_id).or_default().push(id);
791            types.push(type_id);
792            keys.push(key);
793        }
794
795        // Build dependency graph from record information
796        // Collect RecordGraphInfo for each record
797        let record_infos: Vec<crate::graph::RecordGraphInfo> = storages
798            .iter()
799            .enumerate()
800            .map(|(idx, record)| {
801                let key = keys[idx].as_str().to_string();
802                let origin = record.record_origin();
803
804                // Get buffer type and capacity from the record
805                let (buffer_type, buffer_capacity) = record.buffer_info();
806
807                crate::graph::RecordGraphInfo {
808                    key,
809                    origin,
810                    buffer_type,
811                    buffer_capacity,
812                    tap_count: record.consumer_count(),
813                    has_outbound_link: record.outbound_connector_count() > 0,
814                }
815            })
816            .collect();
817
818        // Build and validate the dependency graph
819        let dependency_graph = DependencyGraph::build_and_validate(&record_infos)?;
820
821        #[cfg(feature = "tracing")]
822        tracing::debug!(
823            "Dependency graph built successfully ({} nodes, {} edges, topo order: {:?})",
824            dependency_graph.nodes.len(),
825            dependency_graph.edges.len(),
826            dependency_graph.topo_order
827        );
828
829        let inner = Arc::new(AimDbInner {
830            storages,
831            by_key,
832            by_type,
833            types,
834            keys,
835            dependency_graph,
836            extensions: self.extensions,
837        });
838
839        let db = Arc::new(AimDb {
840            inner: inner.clone(),
841            runtime: runtime.clone(),
842        });
843
844        #[cfg(feature = "tracing")]
845        tracing::info!(
846            "Spawning producer services and tap observers for {} records",
847            self.spawn_fns.len()
848        );
849
850        // Build a lookup map from spawn_fns for topological ordering
851        let mut spawn_fn_map: HashMap<StringKey, Box<dyn core::any::Any + Send>> =
852            self.spawn_fns.into_iter().collect();
853
854        // Execute spawn functions in topological order
855        // This ensures transforms are spawned after their input records
856        for key_str in inner.dependency_graph.topo_order() {
857            // Find the StringKey that matches this key string
858            let key = match inner.by_key.keys().find(|k| k.as_str() == key_str) {
859                Some(k) => *k,
860                None => continue, // Key not in spawn_fns, skip
861            };
862
863            // Take the spawn function (if any) for this key
864            let spawn_fn_any = match spawn_fn_map.remove(&key) {
865                Some(f) => f,
866                None => continue, // No spawn function for this key
867            };
868
869            // Resolve key to RecordId
870            let id = inner.resolve(&key).ok_or({
871                #[cfg(feature = "std")]
872                {
873                    DbError::RecordKeyNotFound {
874                        key: key.as_str().to_string(),
875                    }
876                }
877                #[cfg(not(feature = "std"))]
878                {
879                    DbError::RecordKeyNotFound { _key: () }
880                }
881            })?;
882
883            // Downcast from Box<dyn Any> back to the concrete spawn function type
884            type SpawnFnType<R> =
885                Box<dyn FnOnce(&Arc<R>, &Arc<AimDb<R>>, RecordId) -> DbResult<()> + Send>;
886
887            let spawn_fn = spawn_fn_any
888                .downcast::<SpawnFnType<R>>()
889                .expect("spawn function type mismatch");
890
891            // Execute the spawn function
892            (*spawn_fn)(&runtime, &db, id)?;
893        }
894
895        #[cfg(feature = "tracing")]
896        tracing::info!("Automatic spawning complete");
897
898        // Spawn remote access supervisor if configured (std only)
899        #[cfg(feature = "std")]
900        if let Some(remote_cfg) = self.remote_config {
901            #[cfg(feature = "tracing")]
902            tracing::info!(
903                "Spawning remote access supervisor on socket: {}",
904                remote_cfg.socket_path.display()
905            );
906
907            // Apply security policy to mark writable records
908            let writable_keys = remote_cfg.security_policy.writable_records();
909            for key_str in writable_keys {
910                if let Some(id) = inner.resolve_str(&key_str) {
911                    #[cfg(feature = "tracing")]
912                    tracing::debug!("Marking record '{}' as writable", key_str);
913
914                    // Mark the record as writable (type-erased call)
915                    inner.storages[id.index()].set_writable_erased(true);
916                }
917            }
918
919            // Spawn the remote supervisor task
920            crate::remote::supervisor::spawn_supervisor(db.clone(), runtime.clone(), remote_cfg)?;
921
922            #[cfg(feature = "tracing")]
923            tracing::info!("Remote access supervisor spawned successfully");
924        }
925
926        // Build connectors from builders (after database is fully constructed)
927        // This allows connectors to use collect_inbound_routes() which creates
928        // producers tied to this specific database instance
929        for builder in self.connector_builders {
930            #[cfg(feature = "tracing")]
931            let scheme = {
932                #[cfg(feature = "std")]
933                {
934                    builder.scheme().to_string()
935                }
936                #[cfg(not(feature = "std"))]
937                {
938                    alloc::string::String::from(builder.scheme())
939                }
940            };
941
942            #[cfg(feature = "tracing")]
943            tracing::debug!("Building connector for scheme: {}", scheme);
944
945            // Build the connector (this spawns tasks as a side effect)
946            let _connector = builder.build(&db).await?;
947
948            #[cfg(feature = "tracing")]
949            tracing::info!("Connector built and spawned successfully: {}", scheme);
950        }
951
952        // Spawn on_start tasks (registered by external crates like aimdb-persistence)
953        if !self.start_fns.is_empty() {
954            #[cfg(feature = "tracing")]
955            tracing::debug!("Spawning {} on_start task(s)", self.start_fns.len());
956
957            #[cfg(feature = "std")]
958            for (idx, start_fn_any) in self.start_fns.into_iter().enumerate() {
959                let start_fn = start_fn_any
960                    .downcast::<StartFnType<R>>()
961                    .unwrap_or_else(|_| {
962                        panic!("on_start fn[{idx}] type mismatch — this is a bug in aimdb-core")
963                    });
964                let future = (*start_fn)(runtime.clone());
965                runtime.spawn(future).map_err(DbError::from)?;
966            }
967
968            #[cfg(not(feature = "std"))]
969            for (idx, start_fn_any) in self.start_fns.into_iter().enumerate() {
970                let start_fn = start_fn_any
971                    .downcast::<NoStdStartFnType<R>>()
972                    .unwrap_or_else(|_| {
973                        panic!("on_start fn[{idx}] type mismatch — this is a bug in aimdb-core")
974                    });
975                let future = (*start_fn)(runtime.clone());
976                runtime.spawn(future).map_err(DbError::from)?;
977            }
978        }
979
980        // Unwrap the Arc to return the owned AimDb
981        // This is safe because we just created it and hold the only reference
982        let db_owned = Arc::try_unwrap(db).unwrap_or_else(|arc| (*arc).clone());
983
984        Ok(db_owned)
985    }
986}
987
988impl Default for AimDbBuilder<NoRuntime> {
989    fn default() -> Self {
990        Self::new()
991    }
992}
993
994/// Producer-consumer database
995///
996/// A database instance with type-safe record registration and cross-record
997/// communication via the Emitter pattern. The type parameter `R` represents
998/// the runtime adapter (e.g., TokioAdapter, EmbassyAdapter).
999///
1000/// See `examples/` for usage.
1001///
1002/// # Examples
1003///
1004/// ```rust,ignore
1005/// use aimdb_tokio_adapter::TokioAdapter;
1006///
1007/// let runtime = Arc::new(TokioAdapter);
1008/// let db: AimDb<TokioAdapter> = AimDbBuilder::new()
1009///     .runtime(runtime)
1010///     .register_record::<Temperature>(&TemperatureConfig)
1011///     .build()?;
1012/// ```
1013pub struct AimDb<R: aimdb_executor::Spawn + 'static> {
1014    /// Internal state
1015    inner: Arc<AimDbInner>,
1016
1017    /// Runtime adapter with concrete type
1018    runtime: Arc<R>,
1019}
1020
1021impl<R: aimdb_executor::Spawn + 'static> Clone for AimDb<R> {
1022    fn clone(&self) -> Self {
1023        Self {
1024            inner: self.inner.clone(),
1025            runtime: self.runtime.clone(),
1026        }
1027    }
1028}
1029
1030impl<R: aimdb_executor::Spawn + 'static> AimDb<R> {
1031    /// Internal accessor for the inner state
1032    ///
1033    /// Used by adapter crates and internal spawning logic.
1034    #[doc(hidden)]
1035    pub fn inner(&self) -> &Arc<AimDbInner> {
1036        &self.inner
1037    }
1038
1039    /// Returns the extension storage frozen at `build()` time.
1040    ///
1041    /// External crates (e.g. `aimdb-persistence`) retrieve their typed state here
1042    /// to service query calls. The extensions are read-only on the live handle.
1043    ///
1044    /// # Example
1045    /// ```rust,ignore
1046    /// use aimdb_persistence::PersistenceState;
1047    /// let state = db.extensions().get::<PersistenceState>().unwrap();
1048    /// ```
1049    pub fn extensions(&self) -> &Extensions {
1050        &self.inner.extensions
1051    }
1052
1053    /// Builds a database with a closure-based builder pattern
1054    pub async fn build_with(rt: Arc<R>, f: impl FnOnce(&mut AimDbBuilder<R>)) -> DbResult<()> {
1055        let mut b = AimDbBuilder::new().runtime(rt);
1056        f(&mut b);
1057        b.run().await
1058    }
1059
1060    /// Spawns a task using the database's runtime adapter
1061    ///
1062    /// This method provides direct access to the runtime's spawn capability.
1063    ///
1064    /// # Arguments
1065    /// * `future` - The future to spawn
1066    ///
1067    /// # Returns
1068    /// `DbResult<()>` - Ok if the task was spawned successfully
1069    pub fn spawn_task<F>(&self, future: F) -> DbResult<()>
1070    where
1071        F: core::future::Future<Output = ()> + Send + 'static,
1072    {
1073        self.runtime.spawn(future).map_err(DbError::from)?;
1074        Ok(())
1075    }
1076
1077    /// Produces a value to a specific record by key
1078    ///
1079    /// Uses O(1) key-based lookup to find the correct record.
1080    ///
1081    /// # Arguments
1082    /// * `key` - The record key (e.g., "sensor.temperature")
1083    /// * `value` - The value to produce
1084    ///
1085    /// # Example
1086    ///
1087    /// ```rust,ignore
1088    /// db.produce::<Temperature>("sensors.indoor", indoor_temp).await?;
1089    /// db.produce::<Temperature>("sensors.outdoor", outdoor_temp).await?;
1090    /// ```
1091    pub async fn produce<T>(&self, key: impl AsRef<str>, value: T) -> DbResult<()>
1092    where
1093        T: Send + 'static + Debug + Clone,
1094    {
1095        let typed_rec = self.inner.get_typed_record_by_key::<T, R>(key)?;
1096        typed_rec.produce(value).await;
1097        Ok(())
1098    }
1099
1100    /// Subscribes to a specific record by key
1101    ///
1102    /// Uses O(1) key-based lookup to find the correct record.
1103    ///
1104    /// # Arguments
1105    /// * `key` - The record key (e.g., "sensor.temperature")
1106    ///
1107    /// # Example
1108    ///
1109    /// ```rust,ignore
1110    /// let mut reader = db.subscribe::<Temperature>("sensors.indoor")?;
1111    /// while let Ok(temp) = reader.recv().await {
1112    ///     println!("Indoor: {:.1}°C", temp.celsius);
1113    /// }
1114    /// ```
1115    pub fn subscribe<T>(
1116        &self,
1117        key: impl AsRef<str>,
1118    ) -> DbResult<Box<dyn crate::buffer::BufferReader<T> + Send>>
1119    where
1120        T: Send + Sync + 'static + Debug + Clone,
1121    {
1122        let typed_rec = self.inner.get_typed_record_by_key::<T, R>(key)?;
1123        typed_rec.subscribe()
1124    }
1125
1126    /// Creates a type-safe producer for a specific record by key
1127    ///
1128    /// Returns a `Producer<T, R>` bound to a specific record key.
1129    ///
1130    /// # Arguments
1131    /// * `key` - The record key (e.g., "sensor.temperature")
1132    ///
1133    /// # Example
1134    ///
1135    /// ```rust,ignore
1136    /// let indoor_producer = db.producer::<Temperature>("sensors.indoor");
1137    /// let outdoor_producer = db.producer::<Temperature>("sensors.outdoor");
1138    ///
1139    /// // Each producer writes to its own record
1140    /// indoor_producer.produce(indoor_temp).await?;
1141    /// outdoor_producer.produce(outdoor_temp).await?;
1142    /// ```
1143    pub fn producer<T>(
1144        &self,
1145        key: impl Into<alloc::string::String>,
1146    ) -> crate::typed_api::Producer<T, R>
1147    where
1148        T: Send + 'static + Debug + Clone,
1149    {
1150        crate::typed_api::Producer::new(Arc::new(self.clone()), key.into())
1151    }
1152
1153    /// Creates a type-safe consumer for a specific record by key
1154    ///
1155    /// Returns a `Consumer<T, R>` bound to a specific record key.
1156    ///
1157    /// # Arguments
1158    /// * `key` - The record key (e.g., "sensor.temperature")
1159    ///
1160    /// # Example
1161    ///
1162    /// ```rust,ignore
1163    /// let indoor_consumer = db.consumer::<Temperature>("sensors.indoor");
1164    /// let outdoor_consumer = db.consumer::<Temperature>("sensors.outdoor");
1165    ///
1166    /// // Each consumer reads from its own record
1167    /// let mut rx = indoor_consumer.subscribe()?;
1168    /// ```
1169    pub fn consumer<T>(
1170        &self,
1171        key: impl Into<alloc::string::String>,
1172    ) -> crate::typed_api::Consumer<T, R>
1173    where
1174        T: Send + Sync + 'static + Debug + Clone,
1175    {
1176        crate::typed_api::Consumer::new(Arc::new(self.clone()), key.into())
1177    }
1178
1179    /// Resolve a record key to its RecordId
1180    ///
1181    /// Useful for checking if a record exists before operations.
1182    ///
1183    /// # Example
1184    ///
1185    /// ```rust,ignore
1186    /// if let Some(id) = db.resolve_key("sensors.temperature") {
1187    ///     println!("Record exists with ID: {}", id);
1188    /// }
1189    /// ```
1190    pub fn resolve_key(&self, key: &str) -> Option<crate::record_id::RecordId> {
1191        self.inner.resolve_str(key)
1192    }
1193
1194    /// Get all record IDs for a specific type
1195    ///
1196    /// Returns a slice of RecordIds for all records of type T.
1197    /// Useful for introspection when multiple records of the same type exist.
1198    ///
1199    /// # Example
1200    ///
1201    /// ```rust,ignore
1202    /// let temp_ids = db.records_of_type::<Temperature>();
1203    /// println!("Found {} temperature records", temp_ids.len());
1204    /// ```
1205    pub fn records_of_type<T: 'static>(&self) -> &[crate::record_id::RecordId] {
1206        self.inner.records_of_type::<T>()
1207    }
1208
1209    /// Returns a reference to the runtime adapter
1210    ///
1211    /// Provides direct access to the concrete runtime type.
1212    pub fn runtime(&self) -> &R {
1213        &self.runtime
1214    }
1215
1216    /// Lists all registered records (std only)
1217    ///
1218    /// Returns metadata for all registered records, useful for remote access introspection.
1219    /// Available only when the `std` feature is enabled.
1220    ///
1221    /// # Example
1222    /// ```rust,ignore
1223    /// let records = db.list_records();
1224    /// for record in records {
1225    ///     println!("Record: {} ({})", record.name, record.type_id);
1226    /// }
1227    /// ```
1228    #[cfg(feature = "std")]
1229    pub fn list_records(&self) -> Vec<crate::remote::RecordMetadata> {
1230        self.inner.list_records()
1231    }
1232
1233    /// Try to get record's latest value as JSON by name (std only)
1234    ///
1235    /// Convenience wrapper around `AimDbInner::try_latest_as_json()`.
1236    ///
1237    /// # Arguments
1238    /// * `record_name` - The full Rust type name (e.g., "server::Temperature")
1239    ///
1240    /// # Returns
1241    /// `Some(JsonValue)` with current value, or `None` if unavailable
1242    #[cfg(feature = "std")]
1243    pub fn try_latest_as_json(&self, record_name: &str) -> Option<serde_json::Value> {
1244        self.inner.try_latest_as_json(record_name)
1245    }
1246
1247    /// Sets a record value from JSON (remote access API)
1248    ///
1249    /// Deserializes JSON and produces the value to the record's buffer.
1250    ///
1251    /// **SAFETY:** Enforces "No Producer Override" rule - only works for configuration
1252    /// records without active producers.
1253    ///
1254    /// # Arguments
1255    /// * `record_name` - Full Rust type name
1256    /// * `json_value` - JSON value to set
1257    ///
1258    /// # Returns
1259    /// `Ok(())` on success, error if record not found, has producers, or deserialization fails
1260    ///
1261    /// # Example (internal use)
1262    /// ```rust,ignore
1263    /// db.set_record_from_json("AppConfig", json!({"debug": true}))?;
1264    /// ```
1265    #[cfg(feature = "std")]
1266    pub fn set_record_from_json(
1267        &self,
1268        record_name: &str,
1269        json_value: serde_json::Value,
1270    ) -> DbResult<()> {
1271        self.inner.set_record_from_json(record_name, json_value)
1272    }
1273
1274    /// Subscribe to record updates as JSON stream (std only)
1275    ///
1276    /// Creates a subscription to a record's buffer and forwards updates as JSON
1277    /// to a bounded channel. This is used internally by the remote access protocol
1278    /// for implementing `record.subscribe`.
1279    ///
1280    /// # Architecture
1281    ///
1282    /// Spawns a consumer task that:
1283    /// 1. Subscribes to the record's buffer using the existing buffer API
1284    /// 2. Reads values as they arrive
1285    /// 3. Serializes each value to JSON
1286    /// 4. Sends JSON values to a bounded channel (with backpressure handling)
1287    /// 5. Terminates when either:
1288    ///    - The cancel signal is received (unsubscribe)
1289    ///    - The channel receiver is dropped (client disconnected)
1290    ///
1291    /// # Arguments
1292    /// * `record_key` - Key of the record to subscribe to
1293    /// * `queue_size` - Size of the bounded channel for this subscription
1294    ///
1295    /// # Returns
1296    /// `Ok((receiver, cancel_tx))` where:
1297    /// - `receiver`: Bounded channel receiver for JSON values
1298    /// - `cancel_tx`: One-shot sender to cancel the subscription
1299    ///
1300    /// `Err` if:
1301    /// - Record not found for the given key
1302    /// - Record not configured with `.with_remote_access()`
1303    /// - Failed to subscribe to buffer
1304    ///
1305    /// # Example (internal use)
1306    ///
1307    /// ```rust,ignore
1308    /// let (mut rx, cancel_tx) = db.subscribe_record_updates("sensor.temp", 100)?;
1309    ///
1310    /// // Read events
1311    /// while let Some(json_value) = rx.recv().await {
1312    ///     // Forward to client...
1313    /// }
1314    ///
1315    /// // Cancel subscription
1316    /// let _ = cancel_tx.send(());
1317    /// ```
1318    #[cfg(feature = "std")]
1319    #[allow(unused_variables)] // Variables used only in tracing feature
1320    pub fn subscribe_record_updates(
1321        &self,
1322        record_key: &str,
1323        queue_size: usize,
1324    ) -> DbResult<(
1325        tokio::sync::mpsc::Receiver<serde_json::Value>,
1326        tokio::sync::oneshot::Sender<()>,
1327    )> {
1328        use tokio::sync::{mpsc, oneshot};
1329
1330        // Find the record by key
1331        let id = self
1332            .inner
1333            .resolve_str(record_key)
1334            .ok_or_else(|| DbError::RecordKeyNotFound {
1335                key: record_key.to_string(),
1336            })?;
1337
1338        let record = self
1339            .inner
1340            .storage(id)
1341            .ok_or_else(|| DbError::InvalidRecordId { id: id.raw() })?;
1342
1343        // Subscribe to the record's buffer as JSON stream
1344        // This will fail if record not configured with .with_remote_access()
1345        let mut json_reader = record.subscribe_json()?;
1346
1347        // Create channels for the subscription
1348        let (value_tx, value_rx) = mpsc::channel(queue_size);
1349        let (cancel_tx, mut cancel_rx) = oneshot::channel();
1350
1351        // Get metadata for logging
1352        let type_id = self.inner.types[id.index()];
1353        let key = self.inner.keys[id.index()];
1354        let record_metadata = record.collect_metadata(type_id, key, id);
1355        let runtime = self.runtime.clone();
1356
1357        // Spawn consumer task that forwards JSON values from buffer to channel
1358        let spawn_result = runtime.spawn(async move {
1359            #[cfg(feature = "tracing")]
1360            tracing::debug!(
1361                "Subscription consumer task started for {}",
1362                record_metadata.name
1363            );
1364
1365            // Main event loop: read from buffer and forward to channel
1366            loop {
1367                tokio::select! {
1368                    // Handle cancellation signal
1369                    _ = &mut cancel_rx => {
1370                        #[cfg(feature = "tracing")]
1371                        tracing::debug!("Subscription cancelled");
1372                        break;
1373                    }
1374                    // Read next JSON value from buffer
1375                    result = json_reader.recv_json() => {
1376                        match result {
1377                            Ok(json_val) => {
1378                                // Send JSON value to subscription channel
1379                                if value_tx.send(json_val).await.is_err() {
1380                                    #[cfg(feature = "tracing")]
1381                                    tracing::debug!("Subscription receiver dropped");
1382                                    break;
1383                                }
1384                            }
1385                            Err(DbError::BufferLagged { lag_count, .. }) => {
1386                                // Consumer fell behind - log warning but continue
1387                                #[cfg(feature = "tracing")]
1388                                tracing::warn!(
1389                                    "Subscription for {} lagged by {} messages",
1390                                    record_metadata.name,
1391                                    lag_count
1392                                );
1393                                // Continue reading - next recv will get latest
1394                            }
1395                            Err(DbError::BufferClosed { .. }) => {
1396                                // Buffer closed (shutdown) - exit gracefully
1397                                #[cfg(feature = "tracing")]
1398                                tracing::debug!("Buffer closed for {}", record_metadata.name);
1399                                break;
1400                            }
1401                            Err(e) => {
1402                                // Other error (shouldn't happen in practice)
1403                                #[cfg(feature = "tracing")]
1404                                tracing::error!(
1405                                    "Subscription error for {}: {:?}",
1406                                    record_metadata.name,
1407                                    e
1408                                );
1409                                break;
1410                            }
1411                        }
1412                    }
1413                }
1414            }
1415
1416            #[cfg(feature = "tracing")]
1417            tracing::debug!("Subscription consumer task terminated");
1418        });
1419
1420        spawn_result.map_err(DbError::from)?;
1421
1422        Ok((value_rx, cancel_tx))
1423    }
1424
1425    /// Collects inbound connector routes for automatic router construction (std only)
1426    ///
1427    /// Iterates all records, filters their inbound_connectors by scheme,
1428    /// and returns routes with producer creation callbacks.
1429    ///
1430    /// # Arguments
1431    /// * `scheme` - URL scheme to filter by (e.g., "mqtt", "kafka")
1432    ///
1433    /// # Returns
1434    /// Vector of tuples: (topic, producer_trait, deserializer)
1435    ///
1436    /// The topic is resolved dynamically if a `TopicResolverFn` is configured,
1437    /// otherwise the static topic from the URL is used.
1438    ///
1439    /// # Example
1440    /// ```rust,ignore
1441    /// // In MqttConnector after db.build()
1442    /// let routes = db.collect_inbound_routes("mqtt");
1443    /// let router = RouterBuilder::from_routes(routes).build();
1444    /// connector.set_router(router).await?;
1445    /// ```
1446    #[cfg(feature = "alloc")]
1447    pub fn collect_inbound_routes(
1448        &self,
1449        scheme: &str,
1450    ) -> Vec<(
1451        String,
1452        Box<dyn crate::connector::ProducerTrait>,
1453        crate::connector::DeserializerFn,
1454    )> {
1455        let mut routes = Vec::new();
1456
1457        // Convert self to Arc<dyn Any> for producer factory
1458        let db_any: Arc<dyn core::any::Any + Send + Sync> = Arc::new(self.clone());
1459
1460        for record in &self.inner.storages {
1461            let inbound_links = record.inbound_connectors();
1462
1463            for link in inbound_links {
1464                // Filter by scheme
1465                if link.url.scheme() != scheme {
1466                    continue;
1467                }
1468
1469                // Resolve topic: dynamic (from resolver) or static (from URL)
1470                let topic = link.resolve_topic();
1471
1472                // Create producer using the stored factory
1473                if let Some(producer) = link.create_producer(db_any.clone()) {
1474                    routes.push((topic, producer, link.deserializer.clone()));
1475                }
1476            }
1477        }
1478
1479        #[cfg(feature = "tracing")]
1480        if !routes.is_empty() {
1481            tracing::debug!(
1482                "Collected {} inbound routes for scheme '{}'",
1483                routes.len(),
1484                scheme
1485            );
1486        }
1487
1488        routes
1489    }
1490
1491    /// Collects outbound routes for a specific protocol scheme
1492    ///
1493    /// Mirrors `collect_inbound_routes()` for symmetry. Iterates all records,
1494    /// filters their outbound_connectors by scheme, and returns routes with
1495    /// consumer creation callbacks.
1496    ///
1497    /// This method is called by connectors during their `build()` phase to
1498    /// collect all configured outbound routes and spawn publisher tasks.
1499    ///
1500    /// # Arguments
1501    /// * `scheme` - URL scheme to filter by (e.g., "mqtt", "kafka")
1502    ///
1503    /// # Returns
1504    /// Vector of tuples: (destination, consumer_trait, serializer, config)
1505    ///
1506    /// The config Vec contains protocol-specific options (e.g., qos, retain).
1507    ///
1508    /// # Example
1509    /// ```rust,ignore
1510    /// // In MqttConnector::build()
1511    /// let routes = db.collect_outbound_routes("mqtt");
1512    /// for (topic, consumer, serializer, config) in routes {
1513    ///     connector.spawn_publisher(topic, consumer, serializer, config)?;
1514    /// }
1515    /// ```
1516    #[cfg(feature = "alloc")]
1517    pub fn collect_outbound_routes(&self, scheme: &str) -> Vec<OutboundRoute> {
1518        let mut routes = Vec::new();
1519
1520        // Convert self to Arc<dyn Any> for consumer factory
1521        // This is necessary because the factory takes Arc<dyn Any> to avoid
1522        // needing to know the runtime type R at the factory definition site
1523        let db_any: Arc<dyn core::any::Any + Send + Sync> = Arc::new(self.clone());
1524
1525        for record in &self.inner.storages {
1526            let outbound_links = record.outbound_connectors();
1527
1528            for link in outbound_links {
1529                // Filter by scheme
1530                if link.url.scheme() != scheme {
1531                    continue;
1532                }
1533
1534                let destination = link.url.resource_id();
1535
1536                // Skip links without serializer
1537                let Some(serializer) = link.serializer.clone() else {
1538                    #[cfg(feature = "tracing")]
1539                    tracing::warn!("Outbound link '{}' has no serializer, skipping", link.url);
1540                    continue;
1541                };
1542
1543                // Create consumer using the stored factory
1544                if let Some(consumer) = link.create_consumer(db_any.clone()) {
1545                    routes.push((
1546                        destination,
1547                        consumer,
1548                        serializer,
1549                        link.config.clone(),
1550                        link.topic_provider.clone(),
1551                    ));
1552                }
1553            }
1554        }
1555
1556        #[cfg(feature = "tracing")]
1557        if !routes.is_empty() {
1558            tracing::debug!(
1559                "Collected {} outbound routes for scheme '{}'",
1560                routes.len(),
1561                scheme
1562            );
1563        }
1564
1565        routes
1566    }
1567}