aimdb_core/
builder.rs

1//! Database builder with type-safe record registration
2//!
3//! Provides `AimDb` and `AimDbBuilder` for constructing databases with
4//! type-safe, self-registering records using the producer-consumer pattern.
5
6use core::any::TypeId;
7use core::fmt::Debug;
8use core::marker::PhantomData;
9
10extern crate alloc;
11
12use alloc::vec::Vec;
13use hashbrown::HashMap;
14
15#[cfg(not(feature = "std"))]
16use alloc::{boxed::Box, sync::Arc};
17
18#[cfg(all(not(feature = "std"), feature = "alloc"))]
19use alloc::string::String;
20
21#[cfg(feature = "std")]
22use std::{boxed::Box, sync::Arc};
23
24use crate::record_id::{RecordId, RecordKey};
25use crate::typed_api::{RecordRegistrar, RecordT};
26use crate::typed_record::{AnyRecord, AnyRecordExt, TypedRecord};
27use crate::{DbError, DbResult};
28
29/// Type alias for outbound route tuples returned by `collect_outbound_routes`
30///
31/// Each tuple contains:
32/// - `String` - Topic/key from the URL path
33/// - `Box<dyn ConsumerTrait>` - Callback to create a consumer for this record
34/// - `SerializerFn` - User-provided serializer for the record type
35/// - `Vec<(String, String)>` - Configuration options from the URL query
36#[cfg(feature = "alloc")]
37type OutboundRoute = (
38    String,
39    Box<dyn crate::connector::ConsumerTrait>,
40    crate::connector::SerializerFn,
41    Vec<(String, String)>,
42);
43
44/// Marker type for untyped builder (before runtime is set)
45pub struct NoRuntime;
46
47/// Internal database state
48///
49/// Holds the registry of typed records with multiple index structures for
50/// efficient access patterns:
51///
52/// - **`storages`**: Vec for O(1) hot-path access by RecordId
53/// - **`by_key`**: HashMap for O(1) lookup by stable RecordKey
54/// - **`by_type`**: HashMap for introspection (find all records of type T)
55/// - **`types`**: Vec for runtime type validation during downcasts
56pub struct AimDbInner {
57    /// Record storage (hot path - indexed by RecordId)
58    ///
59    /// Order matches registration order. Immutable after build().
60    storages: Vec<Box<dyn AnyRecord>>,
61
62    /// Name → RecordId lookup (control plane)
63    ///
64    /// Used by remote access, CLI, MCP for O(1) name resolution.
65    by_key: HashMap<RecordKey, RecordId>,
66
67    /// TypeId → RecordIds lookup (introspection)
68    ///
69    /// Enables "find all Temperature records" queries.
70    by_type: HashMap<TypeId, Vec<RecordId>>,
71
72    /// RecordId → TypeId lookup (type safety assertions)
73    ///
74    /// Used to validate downcasts at runtime.
75    types: Vec<TypeId>,
76
77    /// RecordId → RecordKey lookup (reverse mapping)
78    ///
79    /// Used to get the key for a given record ID.
80    keys: Vec<RecordKey>,
81}
82
83impl AimDbInner {
84    /// Resolve RecordKey to RecordId (control plane - O(1) average)
85    #[inline]
86    pub fn resolve(&self, key: &RecordKey) -> Option<RecordId> {
87        self.by_key.get(key.as_str()).copied()
88    }
89
90    /// Resolve string to RecordId (convenience for remote access)
91    ///
92    /// O(1) average thanks to `Borrow<str>` implementation on `RecordKey`.
93    #[inline]
94    pub fn resolve_str(&self, name: &str) -> Option<RecordId> {
95        self.by_key.get(name).copied()
96    }
97
98    /// Get storage by RecordId (hot path - O(1))
99    #[inline]
100    pub fn storage(&self, id: RecordId) -> Option<&dyn AnyRecord> {
101        self.storages.get(id.index()).map(|b| b.as_ref())
102    }
103
104    /// Get the RecordKey for a given RecordId
105    #[inline]
106    pub fn key_for(&self, id: RecordId) -> Option<&RecordKey> {
107        self.keys.get(id.index())
108    }
109
110    /// Get all RecordIds for a type (introspection)
111    pub fn records_of_type<T: 'static>(&self) -> &[RecordId] {
112        self.by_type
113            .get(&TypeId::of::<T>())
114            .map(|v| v.as_slice())
115            .unwrap_or(&[])
116    }
117
118    /// Get the number of registered records
119    #[inline]
120    pub fn record_count(&self) -> usize {
121        self.storages.len()
122    }
123
124    /// Helper to get a typed record by RecordKey
125    ///
126    /// This encapsulates the common pattern of:
127    /// 1. Resolving key to RecordId
128    /// 2. Validating TypeId matches
129    /// 3. Downcasting to the typed record
130    pub fn get_typed_record_by_key<T, R>(
131        &self,
132        key: impl AsRef<str>,
133    ) -> DbResult<&TypedRecord<T, R>>
134    where
135        T: Send + 'static + Debug + Clone,
136        R: aimdb_executor::Spawn + 'static,
137    {
138        let key_str = key.as_ref();
139
140        // Resolve key to RecordId
141        let id = self.resolve_str(key_str).ok_or({
142            #[cfg(feature = "std")]
143            {
144                DbError::RecordKeyNotFound {
145                    key: key_str.to_string(),
146                }
147            }
148            #[cfg(not(feature = "std"))]
149            {
150                DbError::RecordKeyNotFound { _key: () }
151            }
152        })?;
153
154        self.get_typed_record_by_id::<T, R>(id)
155    }
156
157    /// Helper to get a typed record by RecordId with type validation
158    pub fn get_typed_record_by_id<T, R>(&self, id: RecordId) -> DbResult<&TypedRecord<T, R>>
159    where
160        T: Send + 'static + Debug + Clone,
161        R: aimdb_executor::Spawn + 'static,
162    {
163        use crate::typed_record::AnyRecordExt;
164
165        // Validate RecordId is in bounds
166        if id.index() >= self.storages.len() {
167            return Err(DbError::InvalidRecordId { id: id.raw() });
168        }
169
170        // Validate TypeId matches
171        let expected = TypeId::of::<T>();
172        let actual = self.types[id.index()];
173        if expected != actual {
174            #[cfg(feature = "std")]
175            return Err(DbError::TypeMismatch {
176                record_id: id.raw(),
177                expected_type: core::any::type_name::<T>().to_string(),
178            });
179            #[cfg(not(feature = "std"))]
180            return Err(DbError::TypeMismatch {
181                record_id: id.raw(),
182                _expected_type: (),
183            });
184        }
185
186        // Safe to downcast (type validated above)
187        let record = &self.storages[id.index()];
188
189        #[cfg(feature = "std")]
190        let typed_record = record.as_typed::<T, R>().ok_or(DbError::InvalidOperation {
191            operation: "get_typed_record_by_id".to_string(),
192            reason: "type mismatch during downcast".to_string(),
193        })?;
194
195        #[cfg(not(feature = "std"))]
196        let typed_record = record.as_typed::<T, R>().ok_or(DbError::InvalidOperation {
197            _operation: (),
198            _reason: (),
199        })?;
200
201        Ok(typed_record)
202    }
203
204    /// Helper to get a typed record from the registry (legacy API)
205    ///
206    /// **Deprecated**: Use `get_typed_record_by_key()` instead.
207    ///
208    /// This method only works when exactly one record of type T exists.
209    /// Returns `AmbiguousType` error if multiple records of the same type exist.
210    pub fn get_typed_record<T, R>(&self) -> DbResult<&TypedRecord<T, R>>
211    where
212        T: Send + 'static + Debug + Clone,
213        R: aimdb_executor::Spawn + 'static,
214    {
215        let type_id = TypeId::of::<T>();
216        let ids = self.by_type.get(&type_id);
217
218        match ids.map(|v| v.as_slice()) {
219            None | Some([]) => {
220                #[cfg(feature = "std")]
221                return Err(DbError::RecordNotFound {
222                    record_name: core::any::type_name::<T>().to_string(),
223                });
224                #[cfg(not(feature = "std"))]
225                return Err(DbError::RecordNotFound { _record_name: () });
226            }
227            Some([single_id]) => self.get_typed_record_by_id::<T, R>(*single_id),
228            Some(multiple) => {
229                #[cfg(feature = "std")]
230                return Err(DbError::AmbiguousType {
231                    count: multiple.len() as u32,
232                    type_name: core::any::type_name::<T>().to_string(),
233                });
234                #[cfg(not(feature = "std"))]
235                return Err(DbError::AmbiguousType {
236                    count: multiple.len() as u32,
237                    _type_name: (),
238                });
239            }
240        }
241    }
242
243    /// Collects metadata for all registered records (std only)
244    ///
245    /// Returns a vector of `RecordMetadata` for remote access introspection.
246    /// Available only when the `std` feature is enabled.
247    #[cfg(feature = "std")]
248    pub fn list_records(&self) -> Vec<crate::remote::RecordMetadata> {
249        self.storages
250            .iter()
251            .enumerate()
252            .map(|(i, record)| {
253                let id = RecordId::new(i as u32);
254                let type_id = self.types[i];
255                let key = &self.keys[i];
256                record.collect_metadata(type_id, key.clone(), id)
257            })
258            .collect()
259    }
260
261    /// Try to get record's latest value as JSON by record key (std only)
262    ///
263    /// O(1) lookup using the key-based index.
264    ///
265    /// # Arguments
266    /// * `record_key` - The record key (e.g., "sensors.temperature")
267    ///
268    /// # Returns
269    /// `Some(JsonValue)` with the current record value, or `None`
270    #[cfg(feature = "std")]
271    pub fn try_latest_as_json(&self, record_key: &str) -> Option<serde_json::Value> {
272        let id = self.resolve_str(record_key)?;
273        self.storages.get(id.index())?.latest_json()
274    }
275
276    /// Sets a record value from JSON (remote access API)
277    ///
278    /// Deserializes the JSON value and writes it to the record's buffer.
279    ///
280    /// **SAFETY:** Enforces the "No Producer Override" rule:
281    /// - Only works for records with `producer_count == 0`
282    /// - Returns error if the record has active producers
283    ///
284    /// # Arguments
285    /// * `record_key` - The record key (e.g., "config.app")
286    /// * `json_value` - JSON representation of the value
287    ///
288    /// # Returns
289    /// - `Ok(())` - Successfully set the value
290    /// - `Err(DbError)` - If record not found, has producers, or deserialization fails
291    #[cfg(feature = "std")]
292    pub fn set_record_from_json(
293        &self,
294        record_key: &str,
295        json_value: serde_json::Value,
296    ) -> DbResult<()> {
297        let id = self
298            .resolve_str(record_key)
299            .ok_or_else(|| DbError::RecordKeyNotFound {
300                key: record_key.to_string(),
301            })?;
302
303        self.storages[id.index()].set_from_json(json_value)
304    }
305}
306
307/// Database builder for producer-consumer pattern
308///
309/// Provides a fluent API for constructing databases with type-safe record registration.
310/// Use `.runtime()` to set the runtime and transition to a typed builder.
311pub struct AimDbBuilder<R = NoRuntime> {
312    /// Registered records with their keys (order matters for RecordId assignment)
313    records: Vec<(RecordKey, TypeId, Box<dyn AnyRecord>)>,
314
315    /// Runtime adapter
316    runtime: Option<Arc<R>>,
317
318    /// Connector builders that will be invoked during build()
319    connector_builders: Vec<Box<dyn crate::connector::ConnectorBuilder<R>>>,
320
321    /// Spawn functions with their keys
322    spawn_fns: Vec<(RecordKey, Box<dyn core::any::Any + Send>)>,
323
324    /// Remote access configuration (std only)
325    #[cfg(feature = "std")]
326    remote_config: Option<crate::remote::AimxConfig>,
327
328    /// PhantomData to track the runtime type parameter
329    _phantom: PhantomData<R>,
330}
331
332impl AimDbBuilder<NoRuntime> {
333    /// Creates a new database builder without a runtime
334    ///
335    /// Call `.runtime()` to set the runtime adapter.
336    pub fn new() -> Self {
337        Self {
338            records: Vec::new(),
339            runtime: None,
340            connector_builders: Vec::new(),
341            spawn_fns: Vec::new(),
342            #[cfg(feature = "std")]
343            remote_config: None,
344            _phantom: PhantomData,
345        }
346    }
347
348    /// Sets the runtime adapter
349    ///
350    /// This transitions the builder from untyped to typed with concrete runtime `R`.
351    ///
352    /// # Type Safety Note
353    ///
354    /// The `connector_builders` field is intentionally reset to `Vec::new()` during this
355    /// transition because connectors are parameterized by the runtime type:
356    ///
357    /// - Before: `Vec<Box<dyn ConnectorBuilder<NoRuntime>>>`
358    /// - After: `Vec<Box<dyn ConnectorBuilder<R>>>`
359    ///
360    /// These types are incompatible and cannot be transferred. However, this is not a bug
361    /// because `.with_connector()` is only available AFTER calling `.runtime()` (it's defined
362    /// in the `impl<R> where R: Spawn` block, not in `impl AimDbBuilder<NoRuntime>`).
363    ///
364    /// This means the type system **enforces** the correct call order:
365    /// ```rust,ignore
366    /// AimDbBuilder::new()
367    ///     .runtime(runtime)           // ← Must be called first
368    ///     .with_connector(connector)  // ← Now available
369    /// ```
370    ///
371    /// The `records` and `remote_config` are preserved across the transition since they
372    /// are not parameterized by the runtime type.
373    pub fn runtime<R>(self, rt: Arc<R>) -> AimDbBuilder<R>
374    where
375        R: aimdb_executor::Spawn + 'static,
376    {
377        AimDbBuilder {
378            records: self.records,
379            runtime: Some(rt),
380            connector_builders: Vec::new(),
381            spawn_fns: Vec::new(),
382            #[cfg(feature = "std")]
383            remote_config: self.remote_config,
384            _phantom: PhantomData,
385        }
386    }
387}
388
389impl<R> AimDbBuilder<R>
390where
391    R: aimdb_executor::Spawn + 'static,
392{
393    /// Registers a connector builder that will be invoked during `build()`
394    ///
395    /// The connector builder will be called after the database is constructed,
396    /// allowing it to collect routes and initialize the connector properly.
397    ///
398    /// # Arguments
399    /// * `builder` - A connector builder that implements `ConnectorBuilder<R>`
400    ///
401    /// # Example
402    ///
403    /// ```rust,ignore
404    /// use aimdb_mqtt_connector::MqttConnector;
405    ///
406    /// let db = AimDbBuilder::new()
407    ///     .runtime(runtime)
408    ///     .with_connector(MqttConnector::new("mqtt://broker.local:1883"))
409    ///     .configure::<Temperature>(|reg| {
410    ///         reg.link_from("mqtt://commands/temp")...
411    ///     })
412    ///     .build().await?;
413    /// ```
414    pub fn with_connector(
415        mut self,
416        builder: impl crate::connector::ConnectorBuilder<R> + 'static,
417    ) -> Self {
418        self.connector_builders.push(Box::new(builder));
419        self
420    }
421
422    /// Enables remote access via AimX protocol (std only)
423    ///
424    /// Configures the database to accept remote connections over a Unix domain socket,
425    /// allowing external clients to introspect records, subscribe to updates, and
426    /// (optionally) write data.
427    ///
428    /// The remote access supervisor will be spawned automatically during `build()`.
429    ///
430    /// # Arguments
431    /// * `config` - Remote access configuration (socket path, security policy, etc.)
432    ///
433    /// # Example
434    ///
435    /// ```rust,ignore
436    /// use aimdb_core::remote::{AimxConfig, SecurityPolicy};
437    ///
438    /// let config = AimxConfig::new("/tmp/aimdb.sock")
439    ///     .with_security(SecurityPolicy::read_only());
440    ///
441    /// let db = AimDbBuilder::new()
442    ///     .runtime(runtime)
443    ///     .with_remote_access(config)
444    ///     .build()?;
445    /// ```
446    #[cfg(feature = "std")]
447    pub fn with_remote_access(mut self, config: crate::remote::AimxConfig) -> Self {
448        self.remote_config = Some(config);
449        self
450    }
451
452    /// Configures a record type manually with a unique key
453    ///
454    /// The key uniquely identifies this record instance. Multiple records of the same
455    /// type can exist with different keys (e.g., "sensor.temperature.room1" and
456    /// "sensor.temperature.room2").
457    ///
458    /// # Arguments
459    /// * `key` - A unique string identifier for this record (e.g., "sensor.temperature.room1")
460    /// * `f` - Configuration closure
461    ///
462    /// # Example
463    /// ```rust,ignore
464    /// builder.configure::<Temperature>("sensor.temp.room1", |reg| {
465    ///     reg.with_buffer(BufferCfg::SingleLatest)
466    ///        .with_serialization();
467    /// });
468    /// ```
469    pub fn configure<T>(
470        &mut self,
471        key: impl Into<RecordKey>,
472        f: impl for<'a> FnOnce(&'a mut RecordRegistrar<'a, T, R>),
473    ) -> &mut Self
474    where
475        T: Send + Sync + 'static + Debug + Clone,
476    {
477        let record_key: RecordKey = key.into();
478        let type_id = TypeId::of::<T>();
479
480        // Find existing record with this key, or create new one
481        let record_index = self.records.iter().position(|(k, _, _)| k == &record_key);
482
483        let (rec, is_new_record) = match record_index {
484            Some(idx) => {
485                // Use existing record
486                let (_, existing_type, record) = &mut self.records[idx];
487                assert!(
488                    *existing_type == type_id,
489                    "RecordKey '{}' already registered with different type",
490                    record_key.as_str()
491                );
492                (
493                    record
494                        .as_typed_mut::<T, R>()
495                        .expect("type mismatch in record registry"),
496                    false,
497                )
498            }
499            None => {
500                // Create new record
501                self.records.push((
502                    record_key.clone(),
503                    type_id,
504                    Box::new(TypedRecord::<T, R>::new()),
505                ));
506                let (_, _, record) = self.records.last_mut().unwrap();
507                (
508                    record
509                        .as_typed_mut::<T, R>()
510                        .expect("type mismatch in record registry"),
511                    true,
512                )
513            }
514        };
515
516        let mut reg = RecordRegistrar {
517            rec,
518            connector_builders: &self.connector_builders,
519        };
520        f(&mut reg);
521
522        // Only store spawn function for new records to avoid duplicates
523        if is_new_record {
524            let spawn_key = record_key.clone();
525
526            #[allow(clippy::type_complexity)]
527            let spawn_fn: Box<
528                dyn FnOnce(&Arc<R>, &Arc<AimDb<R>>, RecordId) -> DbResult<()> + Send,
529            > = Box::new(move |runtime: &Arc<R>, db: &Arc<AimDb<R>>, id: RecordId| {
530                // Use RecordSpawner to spawn tasks for this record type
531                use crate::typed_record::RecordSpawner;
532
533                let typed_record = db.inner().get_typed_record_by_id::<T, R>(id)?;
534                RecordSpawner::<T>::spawn_all_tasks(typed_record, runtime, db)
535            });
536
537            // Store the spawn function (type-erased in Box<dyn Any>)
538            self.spawn_fns.push((spawn_key, Box::new(spawn_fn)));
539        }
540
541        self
542    }
543
544    /// Registers a self-registering record type
545    ///
546    /// The record type must implement `RecordT<R>`.
547    ///
548    /// Uses the type name as the default key. For custom keys, use `configure()` directly.
549    pub fn register_record<T>(&mut self, cfg: &T::Config) -> &mut Self
550    where
551        T: RecordT<R>,
552    {
553        // Default key is the full type name for backward compatibility
554        let key = RecordKey::new(core::any::type_name::<T>());
555        self.configure::<T>(key, |reg| T::register(reg, cfg))
556    }
557
558    /// Registers a self-registering record type with a custom key
559    ///
560    /// The record type must implement `RecordT<R>`.
561    pub fn register_record_with_key<T>(
562        &mut self,
563        key: impl Into<RecordKey>,
564        cfg: &T::Config,
565    ) -> &mut Self
566    where
567        T: RecordT<R>,
568    {
569        self.configure::<T>(key, |reg| T::register(reg, cfg))
570    }
571
572    /// Runs the database indefinitely (never returns)
573    ///
574    /// This method builds the database, spawns all producer and consumer tasks, and then
575    /// parks the current task indefinitely. This is the primary way to run AimDB services.
576    ///
577    /// All logic runs in background tasks via producers, consumers, and connectors. The
578    /// application continues until interrupted (e.g., Ctrl+C).
579    ///
580    /// # Returns
581    /// `DbResult<()>` - Ok when database starts successfully, then parks forever
582    ///
583    /// # Example
584    ///
585    /// ```rust,ignore
586    /// #[tokio::main]
587    /// async fn main() -> DbResult<()> {
588    ///     AimDbBuilder::new()
589    ///         .runtime(Arc::new(TokioAdapter::new()?))
590    ///         .configure::<MyData>(|reg| {
591    ///             reg.with_buffer(BufferCfg::SpmcRing { capacity: 100 })
592    ///                .with_source(my_producer)
593    ///                .with_tap(my_consumer);
594    ///         })
595    ///         .run().await  // Runs forever
596    /// }
597    /// ```
598    pub async fn run(self) -> DbResult<()> {
599        #[cfg(feature = "tracing")]
600        tracing::info!("Building database and spawning background tasks...");
601
602        let _db = self.build().await?;
603
604        #[cfg(feature = "tracing")]
605        tracing::info!("Database running, background tasks active. Press Ctrl+C to stop.");
606
607        // Park indefinitely - the background tasks will continue running
608        // The database handle is kept alive here to prevent dropping it
609        core::future::pending::<()>().await;
610
611        Ok(())
612    }
613
614    /// Builds the database and returns the handle (async)
615    ///
616    /// Use this when you need programmatic access to the database handle for
617    /// manual subscriptions or production. For typical services, use `.run().await` instead.
618    ///
619    /// **Automatic Task Spawning:** This method spawns all producer services and
620    /// `.tap()` observer tasks that were registered during configuration.
621    ///
622    /// **Connector Setup:** Connectors must be created manually before calling `build()`:
623    ///
624    /// ```rust,ignore
625    /// use aimdb_mqtt_connector::{MqttConnector, router::RouterBuilder};
626    ///
627    /// // Configure records with connector links
628    /// let builder = AimDbBuilder::new()
629    ///     .runtime(runtime)
630    ///     .configure::<Temp>("temp", |reg| {
631    ///         reg.link_from("mqtt://commands/temp")
632    ///            .with_buffer(BufferCfg::SingleLatest)
633    ///            .with_serialization();
634    ///     });
635    ///
636    /// // Create MQTT connector with router
637    /// let router = RouterBuilder::new()
638    ///     .route("commands/temp", /* deserializer */)
639    ///     .build();
640    /// let connector = MqttConnector::new("mqtt://localhost:1883", router).await?;
641    ///
642    /// // Register connector and build
643    /// let db = builder
644    ///     .with_connector("mqtt", Arc::new(connector))
645    ///     .build().await?;
646    /// ```
647    ///
648    /// # Returns
649    /// `DbResult<AimDb<R>>` - The database instance
650    #[cfg_attr(not(feature = "std"), allow(unused_mut))]
651    pub async fn build(self) -> DbResult<AimDb<R>> {
652        use crate::DbError;
653
654        // Validate all records
655        for (key, _, record) in &self.records {
656            record.validate().map_err(|_msg| {
657                // Suppress unused warning for key in no_std
658                let _ = &key;
659                #[cfg(feature = "std")]
660                {
661                    DbError::RuntimeError {
662                        message: format!("Record '{}' validation failed: {}", key.as_str(), _msg),
663                    }
664                }
665                #[cfg(not(feature = "std"))]
666                {
667                    DbError::RuntimeError { _message: () }
668                }
669            })?;
670        }
671
672        // Ensure runtime is set
673        let runtime = self.runtime.ok_or({
674            #[cfg(feature = "std")]
675            {
676                DbError::RuntimeError {
677                    message: "runtime not set (use .runtime())".into(),
678                }
679            }
680            #[cfg(not(feature = "std"))]
681            {
682                DbError::RuntimeError { _message: () }
683            }
684        })?;
685
686        // Build the new index structures
687        let record_count = self.records.len();
688        let mut storages: Vec<Box<dyn AnyRecord>> = Vec::with_capacity(record_count);
689        let mut by_key: HashMap<RecordKey, RecordId> = HashMap::with_capacity(record_count);
690        let mut by_type: HashMap<TypeId, Vec<RecordId>> = HashMap::new();
691        let mut types: Vec<TypeId> = Vec::with_capacity(record_count);
692        let mut keys: Vec<RecordKey> = Vec::with_capacity(record_count);
693
694        for (i, (key, type_id, record)) in self.records.into_iter().enumerate() {
695            let id = RecordId::new(i as u32);
696
697            // Check for duplicate keys (should not happen if configure() is used correctly)
698            if by_key.contains_key(&key) {
699                #[cfg(feature = "std")]
700                return Err(DbError::DuplicateRecordKey {
701                    key: key.as_str().to_string(),
702                });
703                #[cfg(not(feature = "std"))]
704                return Err(DbError::DuplicateRecordKey { _key: () });
705            }
706
707            // Build index structures
708            storages.push(record);
709            by_key.insert(key.clone(), id);
710            by_type.entry(type_id).or_default().push(id);
711            types.push(type_id);
712            keys.push(key);
713        }
714
715        let inner = Arc::new(AimDbInner {
716            storages,
717            by_key,
718            by_type,
719            types,
720            keys,
721        });
722
723        let db = Arc::new(AimDb {
724            inner: inner.clone(),
725            runtime: runtime.clone(),
726        });
727
728        #[cfg(feature = "tracing")]
729        tracing::info!(
730            "Spawning producer services and tap observers for {} records",
731            self.spawn_fns.len()
732        );
733
734        // Execute spawn functions for each record
735        for (key, spawn_fn_any) in self.spawn_fns {
736            // Resolve key to RecordId
737            let id = inner.resolve(&key).ok_or({
738                #[cfg(feature = "std")]
739                {
740                    DbError::RecordKeyNotFound {
741                        key: key.as_str().to_string(),
742                    }
743                }
744                #[cfg(not(feature = "std"))]
745                {
746                    DbError::RecordKeyNotFound { _key: () }
747                }
748            })?;
749
750            // Downcast from Box<dyn Any> back to the concrete spawn function type
751            type SpawnFnType<R> =
752                Box<dyn FnOnce(&Arc<R>, &Arc<AimDb<R>>, RecordId) -> DbResult<()> + Send>;
753
754            let spawn_fn = spawn_fn_any
755                .downcast::<SpawnFnType<R>>()
756                .expect("spawn function type mismatch");
757
758            // Execute the spawn function
759            (*spawn_fn)(&runtime, &db, id)?;
760        }
761
762        #[cfg(feature = "tracing")]
763        tracing::info!("Automatic spawning complete");
764
765        // Spawn remote access supervisor if configured (std only)
766        #[cfg(feature = "std")]
767        if let Some(remote_cfg) = self.remote_config {
768            #[cfg(feature = "tracing")]
769            tracing::info!(
770                "Spawning remote access supervisor on socket: {}",
771                remote_cfg.socket_path.display()
772            );
773
774            // Apply security policy to mark writable records
775            let writable_keys = remote_cfg.security_policy.writable_records();
776            for key_str in writable_keys {
777                if let Some(id) = inner.resolve_str(&key_str) {
778                    #[cfg(feature = "tracing")]
779                    tracing::debug!("Marking record '{}' as writable", key_str);
780
781                    // Mark the record as writable (type-erased call)
782                    inner.storages[id.index()].set_writable_erased(true);
783                }
784            }
785
786            // Spawn the remote supervisor task
787            crate::remote::supervisor::spawn_supervisor(db.clone(), runtime.clone(), remote_cfg)?;
788
789            #[cfg(feature = "tracing")]
790            tracing::info!("Remote access supervisor spawned successfully");
791        }
792
793        // Build connectors from builders (after database is fully constructed)
794        // This allows connectors to use collect_inbound_routes() which creates
795        // producers tied to this specific database instance
796        for builder in self.connector_builders {
797            #[cfg(feature = "tracing")]
798            let scheme = {
799                #[cfg(feature = "std")]
800                {
801                    builder.scheme().to_string()
802                }
803                #[cfg(not(feature = "std"))]
804                {
805                    alloc::string::String::from(builder.scheme())
806                }
807            };
808
809            #[cfg(feature = "tracing")]
810            tracing::debug!("Building connector for scheme: {}", scheme);
811
812            // Build the connector (this spawns tasks as a side effect)
813            let _connector = builder.build(&db).await?;
814
815            #[cfg(feature = "tracing")]
816            tracing::info!("Connector built and spawned successfully: {}", scheme);
817        }
818
819        // Unwrap the Arc to return the owned AimDb
820        // This is safe because we just created it and hold the only reference
821        let db_owned = Arc::try_unwrap(db).unwrap_or_else(|arc| (*arc).clone());
822
823        Ok(db_owned)
824    }
825}
826
827impl Default for AimDbBuilder<NoRuntime> {
828    fn default() -> Self {
829        Self::new()
830    }
831}
832
833/// Producer-consumer database
834///
835/// A database instance with type-safe record registration and cross-record
836/// communication via the Emitter pattern. The type parameter `R` represents
837/// the runtime adapter (e.g., TokioAdapter, EmbassyAdapter).
838///
839/// See `examples/` for usage.
840///
841/// # Examples
842///
843/// ```rust,ignore
844/// use aimdb_tokio_adapter::TokioAdapter;
845///
846/// let runtime = Arc::new(TokioAdapter);
847/// let db: AimDb<TokioAdapter> = AimDbBuilder::new()
848///     .runtime(runtime)
849///     .register_record::<Temperature>(&TemperatureConfig)
850///     .build()?;
851/// ```
852pub struct AimDb<R: aimdb_executor::Spawn + 'static> {
853    /// Internal state
854    inner: Arc<AimDbInner>,
855
856    /// Runtime adapter with concrete type
857    runtime: Arc<R>,
858}
859
860impl<R: aimdb_executor::Spawn + 'static> Clone for AimDb<R> {
861    fn clone(&self) -> Self {
862        Self {
863            inner: self.inner.clone(),
864            runtime: self.runtime.clone(),
865        }
866    }
867}
868
869impl<R: aimdb_executor::Spawn + 'static> AimDb<R> {
870    /// Internal accessor for the inner state
871    ///
872    /// Used by adapter crates and internal spawning logic.
873    #[doc(hidden)]
874    pub fn inner(&self) -> &Arc<AimDbInner> {
875        &self.inner
876    }
877
878    /// Builds a database with a closure-based builder pattern
879    pub async fn build_with(rt: Arc<R>, f: impl FnOnce(&mut AimDbBuilder<R>)) -> DbResult<()> {
880        let mut b = AimDbBuilder::new().runtime(rt);
881        f(&mut b);
882        b.run().await
883    }
884
885    /// Spawns a task using the database's runtime adapter
886    ///
887    /// This method provides direct access to the runtime's spawn capability.
888    ///
889    /// # Arguments
890    /// * `future` - The future to spawn
891    ///
892    /// # Returns
893    /// `DbResult<()>` - Ok if the task was spawned successfully
894    pub fn spawn_task<F>(&self, future: F) -> DbResult<()>
895    where
896        F: core::future::Future<Output = ()> + Send + 'static,
897    {
898        self.runtime.spawn(future).map_err(DbError::from)?;
899        Ok(())
900    }
901
902    /// Produces a value for a record type
903    ///
904    /// Writes the value to the record's buffer and triggers all consumers.
905    pub async fn produce<T>(&self, value: T) -> DbResult<()>
906    where
907        T: Send + 'static + Debug + Clone,
908    {
909        // Get the typed record using the helper
910        let typed_rec = self.inner.get_typed_record::<T, R>()?;
911
912        // Produce the value directly to the buffer
913        typed_rec.produce(value).await;
914        Ok(())
915    }
916
917    /// Subscribes to a record type's buffer
918    ///
919    /// Creates a subscription to the configured buffer for the given record type.
920    /// Returns a boxed reader for receiving values asynchronously.
921    ///
922    /// # Example
923    ///
924    /// ```rust,ignore
925    /// let mut reader = db.subscribe::<Temperature>()?;
926    ///
927    /// loop {
928    ///     match reader.recv().await {
929    ///         Ok(temp) => println!("Temperature: {:.1}°C", temp.celsius),
930    ///         Err(_) => break,
931    ///     }
932    /// }
933    /// ```
934    pub fn subscribe<T>(&self) -> DbResult<Box<dyn crate::buffer::BufferReader<T> + Send>>
935    where
936        T: Send + Sync + 'static + Debug + Clone,
937    {
938        // Get the typed record using the helper
939        let typed_rec = self.inner.get_typed_record::<T, R>()?;
940
941        // Subscribe to the buffer
942        typed_rec.subscribe()
943    }
944
945    /// Creates a type-safe producer for a specific record type
946    ///
947    /// Returns a `Producer<T, R>` that can only produce values of type `T`.
948    /// This is the recommended way to pass database access to producer services,
949    /// following the principle of least privilege.
950    ///
951    /// # Example
952    ///
953    /// ```rust,ignore
954    /// let db = builder.build()?;
955    /// let temp_producer = db.producer::<Temperature>();
956    ///
957    /// // Pass to service - it can only produce Temperature values
958    /// runtime.spawn(temperature_service(ctx, temp_producer)).unwrap();
959    /// ```
960    pub fn producer<T>(&self) -> crate::typed_api::Producer<T, R>
961    where
962        T: Send + 'static + Debug + Clone,
963    {
964        crate::typed_api::Producer::new(Arc::new(self.clone()))
965    }
966
967    /// Creates a type-safe consumer for a specific record type
968    ///
969    /// Returns a `Consumer<T, R>` that can only subscribe to values of type `T`.
970    /// This is the recommended way to pass database access to consumer services,
971    /// following the principle of least privilege.
972    ///
973    /// # Example
974    ///
975    /// ```rust,ignore
976    /// let db = builder.build()?;
977    /// let temp_consumer = db.consumer::<Temperature>();
978    ///
979    /// // Pass to service - it can only consume Temperature values
980    /// runtime.spawn(temperature_monitor(ctx, temp_consumer)).unwrap();
981    /// ```
982    pub fn consumer<T>(&self) -> crate::typed_api::Consumer<T, R>
983    where
984        T: Send + Sync + 'static + Debug + Clone,
985    {
986        crate::typed_api::Consumer::new(Arc::new(self.clone()))
987    }
988
989    // ========================================================================
990    // Key-based API (recommended for multi-instance records)
991    // ========================================================================
992
993    /// Produces a value to a specific record by key
994    ///
995    /// This is the recommended method when multiple records of the same type exist.
996    /// Uses O(1) key-based lookup to find the correct record.
997    ///
998    /// # Arguments
999    /// * `key` - The record key (e.g., "sensor.temperature")
1000    /// * `value` - The value to produce
1001    ///
1002    /// # Example
1003    ///
1004    /// ```rust,ignore
1005    /// // With multiple Temperature records
1006    /// db.produce_by_key::<Temperature>("sensors.indoor", indoor_temp).await?;
1007    /// db.produce_by_key::<Temperature>("sensors.outdoor", outdoor_temp).await?;
1008    /// ```
1009    pub async fn produce_by_key<T>(&self, key: impl AsRef<str>, value: T) -> DbResult<()>
1010    where
1011        T: Send + 'static + Debug + Clone,
1012    {
1013        let typed_rec = self.inner.get_typed_record_by_key::<T, R>(key)?;
1014        typed_rec.produce(value).await;
1015        Ok(())
1016    }
1017
1018    /// Subscribes to a specific record by key
1019    ///
1020    /// This is the recommended method when multiple records of the same type exist.
1021    /// Uses O(1) key-based lookup to find the correct record.
1022    ///
1023    /// # Arguments
1024    /// * `key` - The record key (e.g., "sensor.temperature")
1025    ///
1026    /// # Example
1027    ///
1028    /// ```rust,ignore
1029    /// let mut reader = db.subscribe_by_key::<Temperature>("sensors.indoor")?;
1030    /// while let Ok(temp) = reader.recv().await {
1031    ///     println!("Indoor: {:.1}°C", temp.celsius);
1032    /// }
1033    /// ```
1034    pub fn subscribe_by_key<T>(
1035        &self,
1036        key: impl AsRef<str>,
1037    ) -> DbResult<Box<dyn crate::buffer::BufferReader<T> + Send>>
1038    where
1039        T: Send + Sync + 'static + Debug + Clone,
1040    {
1041        let typed_rec = self.inner.get_typed_record_by_key::<T, R>(key)?;
1042        typed_rec.subscribe()
1043    }
1044
1045    /// Creates a type-safe producer for a specific record by key
1046    ///
1047    /// Returns a `ProducerByKey<T, R>` bound to a specific record key.
1048    /// Use this when multiple records of the same type exist.
1049    ///
1050    /// # Arguments
1051    /// * `key` - The record key (e.g., "sensor.temperature")
1052    ///
1053    /// # Example
1054    ///
1055    /// ```rust,ignore
1056    /// let indoor_producer = db.producer_by_key::<Temperature>("sensors.indoor");
1057    /// let outdoor_producer = db.producer_by_key::<Temperature>("sensors.outdoor");
1058    ///
1059    /// // Each producer writes to its own record
1060    /// indoor_producer.produce(indoor_temp).await?;
1061    /// outdoor_producer.produce(outdoor_temp).await?;
1062    /// ```
1063    #[cfg(feature = "alloc")]
1064    pub fn producer_by_key<T>(
1065        &self,
1066        key: impl Into<alloc::string::String>,
1067    ) -> crate::typed_api::ProducerByKey<T, R>
1068    where
1069        T: Send + 'static + Debug + Clone,
1070    {
1071        crate::typed_api::ProducerByKey::new(Arc::new(self.clone()), key.into())
1072    }
1073
1074    /// Creates a type-safe consumer for a specific record by key
1075    ///
1076    /// Returns a `ConsumerByKey<T, R>` bound to a specific record key.
1077    /// Use this when multiple records of the same type exist.
1078    ///
1079    /// # Arguments
1080    /// * `key` - The record key (e.g., "sensor.temperature")
1081    ///
1082    /// # Example
1083    ///
1084    /// ```rust,ignore
1085    /// let indoor_consumer = db.consumer_by_key::<Temperature>("sensors.indoor");
1086    /// let outdoor_consumer = db.consumer_by_key::<Temperature>("sensors.outdoor");
1087    ///
1088    /// // Each consumer reads from its own record
1089    /// let mut rx = indoor_consumer.subscribe()?;
1090    /// ```
1091    #[cfg(feature = "alloc")]
1092    pub fn consumer_by_key<T>(
1093        &self,
1094        key: impl Into<alloc::string::String>,
1095    ) -> crate::typed_api::ConsumerByKey<T, R>
1096    where
1097        T: Send + Sync + 'static + Debug + Clone,
1098    {
1099        crate::typed_api::ConsumerByKey::new(Arc::new(self.clone()), key.into())
1100    }
1101
1102    /// Resolve a record key to its RecordId
1103    ///
1104    /// Useful for checking if a record exists before operations.
1105    ///
1106    /// # Example
1107    ///
1108    /// ```rust,ignore
1109    /// if let Some(id) = db.resolve_key("sensors.temperature") {
1110    ///     println!("Record exists with ID: {}", id);
1111    /// }
1112    /// ```
1113    pub fn resolve_key(&self, key: &str) -> Option<crate::record_id::RecordId> {
1114        self.inner.resolve_str(key)
1115    }
1116
1117    /// Get all record IDs for a specific type
1118    ///
1119    /// Returns a slice of RecordIds for all records of type T.
1120    /// Useful for introspection when multiple records of the same type exist.
1121    ///
1122    /// # Example
1123    ///
1124    /// ```rust,ignore
1125    /// let temp_ids = db.records_of_type::<Temperature>();
1126    /// println!("Found {} temperature records", temp_ids.len());
1127    /// ```
1128    pub fn records_of_type<T: 'static>(&self) -> &[crate::record_id::RecordId] {
1129        self.inner.records_of_type::<T>()
1130    }
1131
1132    /// Returns a reference to the runtime adapter
1133    ///
1134    /// Provides direct access to the concrete runtime type.
1135    pub fn runtime(&self) -> &R {
1136        &self.runtime
1137    }
1138
1139    /// Lists all registered records (std only)
1140    ///
1141    /// Returns metadata for all registered records, useful for remote access introspection.
1142    /// Available only when the `std` feature is enabled.
1143    ///
1144    /// # Example
1145    /// ```rust,ignore
1146    /// let records = db.list_records();
1147    /// for record in records {
1148    ///     println!("Record: {} ({})", record.name, record.type_id);
1149    /// }
1150    /// ```
1151    #[cfg(feature = "std")]
1152    pub fn list_records(&self) -> Vec<crate::remote::RecordMetadata> {
1153        self.inner.list_records()
1154    }
1155
1156    /// Try to get record's latest value as JSON by name (std only)
1157    ///
1158    /// Convenience wrapper around `AimDbInner::try_latest_as_json()`.
1159    ///
1160    /// # Arguments
1161    /// * `record_name` - The full Rust type name (e.g., "server::Temperature")
1162    ///
1163    /// # Returns
1164    /// `Some(JsonValue)` with current value, or `None` if unavailable
1165    #[cfg(feature = "std")]
1166    pub fn try_latest_as_json(&self, record_name: &str) -> Option<serde_json::Value> {
1167        self.inner.try_latest_as_json(record_name)
1168    }
1169
1170    /// Sets a record value from JSON (remote access API)
1171    ///
1172    /// Deserializes JSON and produces the value to the record's buffer.
1173    ///
1174    /// **SAFETY:** Enforces "No Producer Override" rule - only works for configuration
1175    /// records without active producers.
1176    ///
1177    /// # Arguments
1178    /// * `record_name` - Full Rust type name
1179    /// * `json_value` - JSON value to set
1180    ///
1181    /// # Returns
1182    /// `Ok(())` on success, error if record not found, has producers, or deserialization fails
1183    ///
1184    /// # Example (internal use)
1185    /// ```rust,ignore
1186    /// db.set_record_from_json("AppConfig", json!({"debug": true}))?;
1187    /// ```
1188    #[cfg(feature = "std")]
1189    pub fn set_record_from_json(
1190        &self,
1191        record_name: &str,
1192        json_value: serde_json::Value,
1193    ) -> DbResult<()> {
1194        self.inner.set_record_from_json(record_name, json_value)
1195    }
1196
1197    /// Subscribe to record updates as JSON stream (std only)
1198    ///
1199    /// Creates a subscription to a record's buffer and forwards updates as JSON
1200    /// to a bounded channel. This is used internally by the remote access protocol
1201    /// for implementing `record.subscribe`.
1202    ///
1203    /// # Architecture
1204    ///
1205    /// Spawns a consumer task that:
1206    /// 1. Subscribes to the record's buffer using the existing buffer API
1207    /// 2. Reads values as they arrive
1208    /// 3. Serializes each value to JSON
1209    /// 4. Sends JSON values to a bounded channel (with backpressure handling)
1210    /// 5. Terminates when either:
1211    ///    - The cancel signal is received (unsubscribe)
1212    ///    - The channel receiver is dropped (client disconnected)
1213    ///
1214    /// # Arguments
1215    /// * `record_key` - Key of the record to subscribe to
1216    /// * `queue_size` - Size of the bounded channel for this subscription
1217    ///
1218    /// # Returns
1219    /// `Ok((receiver, cancel_tx))` where:
1220    /// - `receiver`: Bounded channel receiver for JSON values
1221    /// - `cancel_tx`: One-shot sender to cancel the subscription
1222    ///
1223    /// `Err` if:
1224    /// - Record not found for the given key
1225    /// - Record not configured with `.with_serialization()`
1226    /// - Failed to subscribe to buffer
1227    ///
1228    /// # Example (internal use)
1229    ///
1230    /// ```rust,ignore
1231    /// let (mut rx, cancel_tx) = db.subscribe_record_updates("sensor.temp", 100)?;
1232    ///
1233    /// // Read events
1234    /// while let Some(json_value) = rx.recv().await {
1235    ///     // Forward to client...
1236    /// }
1237    ///
1238    /// // Cancel subscription
1239    /// let _ = cancel_tx.send(());
1240    /// ```
1241    #[cfg(feature = "std")]
1242    #[allow(unused_variables)] // Variables used only in tracing feature
1243    pub fn subscribe_record_updates(
1244        &self,
1245        record_key: &str,
1246        queue_size: usize,
1247    ) -> DbResult<(
1248        tokio::sync::mpsc::Receiver<serde_json::Value>,
1249        tokio::sync::oneshot::Sender<()>,
1250    )> {
1251        use tokio::sync::{mpsc, oneshot};
1252
1253        // Find the record by key
1254        let id = self
1255            .inner
1256            .resolve_str(record_key)
1257            .ok_or_else(|| DbError::RecordKeyNotFound {
1258                key: record_key.to_string(),
1259            })?;
1260
1261        let record = self
1262            .inner
1263            .storage(id)
1264            .ok_or_else(|| DbError::InvalidRecordId { id: id.raw() })?;
1265
1266        // Subscribe to the record's buffer as JSON stream
1267        // This will fail if record not configured with .with_serialization()
1268        let mut json_reader = record.subscribe_json()?;
1269
1270        // Create channels for the subscription
1271        let (value_tx, value_rx) = mpsc::channel(queue_size);
1272        let (cancel_tx, mut cancel_rx) = oneshot::channel();
1273
1274        // Get metadata for logging
1275        let type_id = self.inner.types[id.index()];
1276        let key = self.inner.keys[id.index()].clone();
1277        let record_metadata = record.collect_metadata(type_id, key, id);
1278        let runtime = self.runtime.clone();
1279
1280        // Spawn consumer task that forwards JSON values from buffer to channel
1281        let spawn_result = runtime.spawn(async move {
1282            #[cfg(feature = "tracing")]
1283            tracing::debug!(
1284                "Subscription consumer task started for {}",
1285                record_metadata.name
1286            );
1287
1288            // Main event loop: read from buffer and forward to channel
1289            loop {
1290                tokio::select! {
1291                    // Handle cancellation signal
1292                    _ = &mut cancel_rx => {
1293                        #[cfg(feature = "tracing")]
1294                        tracing::debug!("Subscription cancelled");
1295                        break;
1296                    }
1297                    // Read next JSON value from buffer
1298                    result = json_reader.recv_json() => {
1299                        match result {
1300                            Ok(json_val) => {
1301                                // Send JSON value to subscription channel
1302                                if value_tx.send(json_val).await.is_err() {
1303                                    #[cfg(feature = "tracing")]
1304                                    tracing::debug!("Subscription receiver dropped");
1305                                    break;
1306                                }
1307                            }
1308                            Err(DbError::BufferLagged { lag_count, .. }) => {
1309                                // Consumer fell behind - log warning but continue
1310                                #[cfg(feature = "tracing")]
1311                                tracing::warn!(
1312                                    "Subscription for {} lagged by {} messages",
1313                                    record_metadata.name,
1314                                    lag_count
1315                                );
1316                                // Continue reading - next recv will get latest
1317                            }
1318                            Err(DbError::BufferClosed { .. }) => {
1319                                // Buffer closed (shutdown) - exit gracefully
1320                                #[cfg(feature = "tracing")]
1321                                tracing::debug!("Buffer closed for {}", record_metadata.name);
1322                                break;
1323                            }
1324                            Err(e) => {
1325                                // Other error (shouldn't happen in practice)
1326                                #[cfg(feature = "tracing")]
1327                                tracing::error!(
1328                                    "Subscription error for {}: {:?}",
1329                                    record_metadata.name,
1330                                    e
1331                                );
1332                                break;
1333                            }
1334                        }
1335                    }
1336                }
1337            }
1338
1339            #[cfg(feature = "tracing")]
1340            tracing::debug!("Subscription consumer task terminated");
1341        });
1342
1343        spawn_result.map_err(DbError::from)?;
1344
1345        Ok((value_rx, cancel_tx))
1346    }
1347
1348    /// Collects inbound connector routes for automatic router construction (std only)
1349    ///
1350    /// Iterates all records, filters their inbound_connectors by scheme,
1351    /// and returns routes with producer creation callbacks.
1352    ///
1353    /// # Arguments
1354    /// * `scheme` - URL scheme to filter by (e.g., "mqtt", "kafka")
1355    ///
1356    /// # Returns
1357    /// Vector of tuples: (topic, producer_trait, deserializer)
1358    ///
1359    /// # Example
1360    /// ```rust,ignore
1361    /// // In MqttConnector after db.build()
1362    /// let routes = db.collect_inbound_routes("mqtt");
1363    /// let router = RouterBuilder::from_routes(routes).build();
1364    /// connector.set_router(router).await?;
1365    /// ```
1366    #[cfg(feature = "alloc")]
1367    pub fn collect_inbound_routes(
1368        &self,
1369        scheme: &str,
1370    ) -> Vec<(
1371        String,
1372        Box<dyn crate::connector::ProducerTrait>,
1373        crate::connector::DeserializerFn,
1374    )> {
1375        let mut routes = Vec::new();
1376
1377        // Convert self to Arc<dyn Any> for producer factory
1378        let db_any: Arc<dyn core::any::Any + Send + Sync> = Arc::new(self.clone());
1379
1380        for record in &self.inner.storages {
1381            let inbound_links = record.inbound_connectors();
1382
1383            for link in inbound_links {
1384                // Filter by scheme
1385                if link.url.scheme() != scheme {
1386                    continue;
1387                }
1388
1389                let topic = link.url.resource_id();
1390
1391                // Create producer using the stored factory
1392                if let Some(producer) = link.create_producer(db_any.clone()) {
1393                    routes.push((topic, producer, link.deserializer.clone()));
1394                }
1395            }
1396        }
1397
1398        #[cfg(feature = "tracing")]
1399        if !routes.is_empty() {
1400            tracing::debug!(
1401                "Collected {} inbound routes for scheme '{}'",
1402                routes.len(),
1403                scheme
1404            );
1405        }
1406
1407        routes
1408    }
1409
1410    /// Collects outbound routes for a specific protocol scheme
1411    ///
1412    /// Mirrors `collect_inbound_routes()` for symmetry. Iterates all records,
1413    /// filters their outbound_connectors by scheme, and returns routes with
1414    /// consumer creation callbacks.
1415    ///
1416    /// This method is called by connectors during their `build()` phase to
1417    /// collect all configured outbound routes and spawn publisher tasks.
1418    ///
1419    /// # Arguments
1420    /// * `scheme` - URL scheme to filter by (e.g., "mqtt", "kafka")
1421    ///
1422    /// # Returns
1423    /// Vector of tuples: (destination, consumer_trait, serializer, config)
1424    ///
1425    /// The config Vec contains protocol-specific options (e.g., qos, retain).
1426    ///
1427    /// # Example
1428    /// ```rust,ignore
1429    /// // In MqttConnector::build()
1430    /// let routes = db.collect_outbound_routes("mqtt");
1431    /// for (topic, consumer, serializer, config) in routes {
1432    ///     connector.spawn_publisher(topic, consumer, serializer, config)?;
1433    /// }
1434    /// ```
1435    #[cfg(feature = "alloc")]
1436    pub fn collect_outbound_routes(&self, scheme: &str) -> Vec<OutboundRoute> {
1437        let mut routes = Vec::new();
1438
1439        // Convert self to Arc<dyn Any> for consumer factory
1440        // This is necessary because the factory takes Arc<dyn Any> to avoid
1441        // needing to know the runtime type R at the factory definition site
1442        let db_any: Arc<dyn core::any::Any + Send + Sync> = Arc::new(self.clone());
1443
1444        for record in &self.inner.storages {
1445            let outbound_links = record.outbound_connectors();
1446
1447            for link in outbound_links {
1448                // Filter by scheme
1449                if link.url.scheme() != scheme {
1450                    continue;
1451                }
1452
1453                let destination = link.url.resource_id();
1454
1455                // Skip links without serializer
1456                let Some(serializer) = link.serializer.clone() else {
1457                    #[cfg(feature = "tracing")]
1458                    tracing::warn!("Outbound link '{}' has no serializer, skipping", link.url);
1459                    continue;
1460                };
1461
1462                // Create consumer using the stored factory
1463                if let Some(consumer) = link.create_consumer(db_any.clone()) {
1464                    routes.push((destination, consumer, serializer, link.config.clone()));
1465                }
1466            }
1467        }
1468
1469        #[cfg(feature = "tracing")]
1470        if !routes.is_empty() {
1471            tracing::debug!(
1472                "Collected {} outbound routes for scheme '{}'",
1473                routes.len(),
1474                scheme
1475            );
1476        }
1477
1478        routes
1479    }
1480}