Skip to main content

aimdb_core/
builder.rs

1//! Database builder with type-safe record registration
2//!
3//! Provides `AimDb` and `AimDbBuilder` for constructing databases with
4//! type-safe, self-registering records using the producer-consumer pattern.
5
6use core::any::TypeId;
7use core::fmt::Debug;
8use core::marker::PhantomData;
9
10extern crate alloc;
11
12use alloc::vec::Vec;
13use hashbrown::HashMap;
14
15#[cfg(not(feature = "std"))]
16use alloc::{boxed::Box, sync::Arc};
17
18#[cfg(all(not(feature = "std"), feature = "alloc"))]
19use alloc::string::{String, ToString};
20
21#[cfg(feature = "std")]
22use std::{boxed::Box, sync::Arc};
23
24use crate::extensions::Extensions;
25use crate::graph::DependencyGraph;
26
/// Boxed startup task as produced by `on_start()` before it is type-erased
/// into `AimDbBuilder::start_fns` (`std` builds).
///
/// Shape: a one-shot closure that receives the runtime handle (`Arc<R>`) and
/// returns a pinned, boxed, `Send + 'static` future with no output.
///
/// The alias is declared once at module level so that the code which stores
/// a startup task and the code which later downcasts it name the *exact same*
/// type; a silent mismatch between the two would make the downcast fail at
/// runtime.
#[cfg(feature = "std")]
type StartFnType<R> = Box<
    dyn FnOnce(Arc<R>) -> core::pin::Pin<Box<dyn core::future::Future<Output = ()> + Send + 'static>>
        + Send,
>;

/// `no_std` counterpart of the `std`-only startup-task alias.
///
/// Structurally identical (one-shot closure taking `Arc<R>` and returning a
/// pinned, boxed, `Send + 'static` unit future); only the name differs, and it
/// is compiled only when the `std` feature is disabled.
#[cfg(not(feature = "std"))]
type NoStdStartFnType<R> = Box<
    dyn FnOnce(Arc<R>) -> core::pin::Pin<Box<dyn core::future::Future<Output = ()> + Send + 'static>>
        + Send,
>;
48use crate::record_id::{RecordId, RecordKey, StringKey};
49use crate::typed_api::{RecordRegistrar, RecordT};
50use crate::typed_record::{AnyRecord, AnyRecordExt, TypedRecord};
51use crate::{DbError, DbResult};
52
/// One outbound route, in the tuple form returned by `collect_outbound_routes`.
///
/// Tuple fields, in order:
/// 1. `String` - default topic/destination (taken from the URL path)
/// 2. `Box<dyn ConsumerTrait>` - consumer used to subscribe to record values
/// 3. `SerializerKind` - user-supplied serializer for the record type
///    (raw or context-aware)
/// 4. `Vec<(String, String)>` - configuration options (from the URL query)
/// 5. `Option<TopicProviderFn>` - optional dynamic topic provider
///
/// Only available when the `alloc` feature is enabled.
#[cfg(feature = "alloc")]
pub type OutboundRoute = (
    String,
    Box<dyn crate::connector::ConsumerTrait>,
    crate::connector::SerializerKind,
    Vec<(String, String)>,
    Option<crate::connector::TopicProviderFn>,
);
69
/// Placeholder runtime type for a builder whose runtime has not been chosen yet.
///
/// It is the default type parameter of `AimDbBuilder`; calling `.runtime()`
/// replaces it with the concrete runtime adapter type.
pub struct NoRuntime;
72
73/// Internal database state
74///
75/// Holds the registry of typed records with multiple index structures for
76/// efficient access patterns:
77///
78/// - **`storages`**: Vec for O(1) hot-path access by RecordId
79/// - **`by_key`**: HashMap for O(1) lookup by stable RecordKey
80/// - **`by_type`**: HashMap for introspection (find all records of type T)
81/// - **`types`**: Vec for runtime type validation during downcasts
82/// - **`dependency_graph`**: DAG of record relationships (immutable after build)
83pub struct AimDbInner {
84    /// Record storage (hot path - indexed by RecordId)
85    ///
86    /// Order matches registration order. Immutable after build().
87    storages: Vec<Box<dyn AnyRecord>>,
88
89    /// Name → RecordId lookup (control plane)
90    ///
91    /// Used by remote access, CLI, MCP for O(1) name resolution.
92    by_key: HashMap<StringKey, RecordId>,
93
94    /// TypeId → RecordIds lookup (introspection)
95    ///
96    /// Enables "find all Temperature records" queries.
97    by_type: HashMap<TypeId, Vec<RecordId>>,
98
99    /// RecordId → TypeId lookup (type safety assertions)
100    ///
101    /// Used to validate downcasts at runtime.
102    types: Vec<TypeId>,
103
104    /// RecordId → StringKey lookup (reverse mapping)
105    ///
106    /// Used to get the key for a given record ID.
107    keys: Vec<StringKey>,
108
109    /// Dependency graph (immutable after build)
110    ///
111    /// Contains all record nodes, edges, and topological order.
112    /// Used for introspection, spawn ordering, and graph queries.
113    dependency_graph: crate::graph::DependencyGraph,
114
115    /// Generic extension storage (frozen after build)
116    ///
117    /// External crates (e.g. `aimdb-persistence`) store typed state here
118    /// during builder configuration and retrieve it at query time via
119    /// `AimDb::extensions()`.
120    pub(crate) extensions: Extensions,
121}
122
123impl AimDbInner {
124    /// Resolve RecordKey to RecordId (control plane - O(1) average)
125    #[inline]
126    pub fn resolve<K: RecordKey>(&self, key: &K) -> Option<RecordId> {
127        self.by_key.get(key.as_str()).copied()
128    }
129
130    /// Resolve string to RecordId (convenience for remote access)
131    ///
132    /// O(1) average thanks to `Borrow<str>` implementation on `RecordKey`.
133    #[inline]
134    pub fn resolve_str(&self, name: &str) -> Option<RecordId> {
135        self.by_key.get(name).copied()
136    }
137
138    /// Get storage by RecordId (hot path - O(1))
139    #[inline]
140    pub fn storage(&self, id: RecordId) -> Option<&dyn AnyRecord> {
141        self.storages.get(id.index()).map(|b| b.as_ref())
142    }
143
144    /// Get the StringKey for a given RecordId
145    #[inline]
146    pub fn key_for(&self, id: RecordId) -> Option<&StringKey> {
147        self.keys.get(id.index())
148    }
149
150    /// Get all RecordIds for a type (introspection)
151    pub fn records_of_type<T: 'static>(&self) -> &[RecordId] {
152        self.by_type
153            .get(&TypeId::of::<T>())
154            .map(|v| v.as_slice())
155            .unwrap_or(&[])
156    }
157
158    /// Get the number of registered records
159    #[inline]
160    pub fn record_count(&self) -> usize {
161        self.storages.len()
162    }
163
164    /// Get the dependency graph (immutable reference)
165    ///
166    /// The graph is constructed once during `build()` and never changes.
167    /// Contains all record nodes, edges, and topological ordering.
168    #[inline]
169    pub fn dependency_graph(&self) -> &crate::graph::DependencyGraph {
170        &self.dependency_graph
171    }
172
173    /// Helper to get a typed record by RecordKey
174    ///
175    /// This encapsulates the common pattern of:
176    /// 1. Resolving key to RecordId
177    /// 2. Validating TypeId matches
178    /// 3. Downcasting to the typed record
179    pub fn get_typed_record_by_key<T, R>(
180        &self,
181        key: impl AsRef<str>,
182    ) -> DbResult<&TypedRecord<T, R>>
183    where
184        T: Send + 'static + Debug + Clone,
185        R: aimdb_executor::Spawn + 'static,
186    {
187        let key_str = key.as_ref();
188
189        // Resolve key to RecordId
190        let id = self.resolve_str(key_str).ok_or({
191            #[cfg(feature = "std")]
192            {
193                DbError::RecordKeyNotFound {
194                    key: key_str.to_string(),
195                }
196            }
197            #[cfg(not(feature = "std"))]
198            {
199                DbError::RecordKeyNotFound { _key: () }
200            }
201        })?;
202
203        self.get_typed_record_by_id::<T, R>(id)
204    }
205
206    /// Helper to get a typed record by RecordId with type validation
207    pub fn get_typed_record_by_id<T, R>(&self, id: RecordId) -> DbResult<&TypedRecord<T, R>>
208    where
209        T: Send + 'static + Debug + Clone,
210        R: aimdb_executor::Spawn + 'static,
211    {
212        use crate::typed_record::AnyRecordExt;
213
214        // Validate RecordId is in bounds
215        if id.index() >= self.storages.len() {
216            return Err(DbError::InvalidRecordId { id: id.raw() });
217        }
218
219        // Validate TypeId matches
220        let expected = TypeId::of::<T>();
221        let actual = self.types[id.index()];
222        if expected != actual {
223            #[cfg(feature = "std")]
224            return Err(DbError::TypeMismatch {
225                record_id: id.raw(),
226                expected_type: core::any::type_name::<T>().to_string(),
227            });
228            #[cfg(not(feature = "std"))]
229            return Err(DbError::TypeMismatch {
230                record_id: id.raw(),
231                _expected_type: (),
232            });
233        }
234
235        // Safe to downcast (type validated above)
236        let record = &self.storages[id.index()];
237
238        #[cfg(feature = "std")]
239        let typed_record = record.as_typed::<T, R>().ok_or(DbError::InvalidOperation {
240            operation: "get_typed_record_by_id".to_string(),
241            reason: "type mismatch during downcast".to_string(),
242        })?;
243
244        #[cfg(not(feature = "std"))]
245        let typed_record = record.as_typed::<T, R>().ok_or(DbError::InvalidOperation {
246            _operation: (),
247            _reason: (),
248        })?;
249
250        Ok(typed_record)
251    }
252
253    /// Collects metadata for all registered records (std only)
254    ///
255    /// Returns a vector of `RecordMetadata` for remote access introspection.
256    /// Available only when the `std` feature is enabled.
257    #[cfg(feature = "std")]
258    pub fn list_records(&self) -> Vec<crate::remote::RecordMetadata> {
259        self.storages
260            .iter()
261            .enumerate()
262            .map(|(i, record)| {
263                let id = RecordId::new(i as u32);
264                let type_id = self.types[i];
265                let key = self.keys[i];
266                record.collect_metadata(type_id, key, id)
267            })
268            .collect()
269    }
270
271    /// Try to get record's latest value as JSON by record key (std only)
272    ///
273    /// O(1) lookup using the key-based index.
274    ///
275    /// # Arguments
276    /// * `record_key` - The record key (e.g., "sensors.temperature")
277    ///
278    /// # Returns
279    /// `Some(JsonValue)` with the current record value, or `None`
280    #[cfg(feature = "std")]
281    pub fn try_latest_as_json(&self, record_key: &str) -> Option<serde_json::Value> {
282        let id = self.resolve_str(record_key)?;
283        self.storages.get(id.index())?.latest_json()
284    }
285
286    /// Sets a record value from JSON (remote access API)
287    ///
288    /// Deserializes the JSON value and writes it to the record's buffer.
289    ///
290    /// **SAFETY:** Enforces the "No Producer Override" rule:
291    /// - Only works for records with `producer_count == 0`
292    /// - Returns error if the record has active producers
293    ///
294    /// # Arguments
295    /// * `record_key` - The record key (e.g., "config.app")
296    /// * `json_value` - JSON representation of the value
297    ///
298    /// # Returns
299    /// - `Ok(())` - Successfully set the value
300    /// - `Err(DbError)` - If record not found, has producers, or deserialization fails
301    #[cfg(feature = "std")]
302    pub fn set_record_from_json(
303        &self,
304        record_key: &str,
305        json_value: serde_json::Value,
306    ) -> DbResult<()> {
307        let id = self
308            .resolve_str(record_key)
309            .ok_or_else(|| DbError::RecordKeyNotFound {
310                key: record_key.to_string(),
311            })?;
312
313        self.storages[id.index()].set_from_json(json_value)
314    }
315}
316
317/// Database builder for producer-consumer pattern
318///
319/// Provides a fluent API for constructing databases with type-safe record registration.
320/// Use `.runtime()` to set the runtime and transition to a typed builder.
321pub struct AimDbBuilder<R = NoRuntime> {
322    /// Registered records with their keys (order matters for RecordId assignment)
323    records: Vec<(StringKey, TypeId, Box<dyn AnyRecord>)>,
324
325    /// Runtime adapter
326    runtime: Option<Arc<R>>,
327
328    /// Connector builders that will be invoked during build()
329    connector_builders: Vec<Box<dyn crate::connector::ConnectorBuilder<R>>>,
330
331    /// Spawn functions with their keys
332    spawn_fns: Vec<(StringKey, Box<dyn core::any::Any + Send>)>,
333
334    /// Startup tasks registered via on_start() — spawned after build() completes.
335    /// Stored type-erased (Box<Box<dyn FnOnce(Arc<R>) -> BoxFuture<…>>>) to allow
336    /// the field to exist on the unparameterised NoRuntime builder too.
337    start_fns: Vec<Box<dyn core::any::Any + Send>>,
338
339    /// Generic extension storage for external crates (e.g., persistence, metrics).
340    /// Moved into AimDbInner during build() so it can be read on the live AimDb handle.
341    extensions: Extensions,
342
343    /// Remote access configuration (std only)
344    #[cfg(feature = "std")]
345    remote_config: Option<crate::remote::AimxConfig>,
346
347    /// PhantomData to track the runtime type parameter
348    _phantom: PhantomData<R>,
349}
350
351impl AimDbBuilder<NoRuntime> {
352    /// Creates a new database builder without a runtime
353    ///
354    /// Call `.runtime()` to set the runtime adapter.
355    pub fn new() -> Self {
356        Self {
357            records: Vec::new(),
358            runtime: None,
359            connector_builders: Vec::new(),
360            spawn_fns: Vec::new(),
361            start_fns: Vec::new(),
362            extensions: Extensions::new(),
363            #[cfg(feature = "std")]
364            remote_config: None,
365            _phantom: PhantomData,
366        }
367    }
368
369    /// Sets the runtime adapter
370    ///
371    /// This transitions the builder from untyped to typed with concrete runtime `R`.
372    ///
373    /// # Type Safety Note
374    ///
375    /// The `connector_builders` field is intentionally reset to `Vec::new()` during this
376    /// transition because connectors are parameterized by the runtime type:
377    ///
378    /// - Before: `Vec<Box<dyn ConnectorBuilder<NoRuntime>>>`
379    /// - After: `Vec<Box<dyn ConnectorBuilder<R>>>`
380    ///
381    /// These types are incompatible and cannot be transferred. However, this is not a bug
382    /// because `.with_connector()` is only available AFTER calling `.runtime()` (it's defined
383    /// in the `impl<R> where R: Spawn` block, not in `impl AimDbBuilder<NoRuntime>`).
384    ///
385    /// This means the type system **enforces** the correct call order:
386    /// ```rust,ignore
387    /// AimDbBuilder::new()
388    ///     .runtime(runtime)           // ← Must be called first
389    ///     .with_connector(connector)  // ← Now available
390    /// ```
391    ///
392    /// The `records` and `remote_config` are preserved across the transition since they
393    /// are not parameterized by the runtime type.
394    pub fn runtime<R>(self, rt: Arc<R>) -> AimDbBuilder<R>
395    where
396        R: aimdb_executor::Spawn + 'static,
397    {
398        AimDbBuilder {
399            records: self.records,
400            runtime: Some(rt),
401            connector_builders: Vec::new(),
402            spawn_fns: Vec::new(),
403            start_fns: self.start_fns,
404            extensions: self.extensions,
405            #[cfg(feature = "std")]
406            remote_config: self.remote_config,
407            _phantom: PhantomData,
408        }
409    }
410}
411
412impl<R> AimDbBuilder<R>
413where
414    R: aimdb_executor::Spawn + 'static,
415{
416    /// Returns a shared reference to the extension storage.
417    ///
418    /// External crates use this to retrieve state stored via `extensions_mut()`.
419    pub fn extensions(&self) -> &Extensions {
420        &self.extensions
421    }
422
423    /// Returns a mutable reference to the extension storage.
424    ///
425    /// Use this inside `AimDbBuilderPersistExt` (or similar) to store typed state
426    /// that `.persist()` and `AimDbQueryExt` will later retrieve.
427    pub fn extensions_mut(&mut self) -> &mut Extensions {
428        &mut self.extensions
429    }
430
431    /// Registers a task to be spawned after `build()` completes.
432    ///
433    /// The closure receives an `Arc<R>` (the runtime adapter) and must return a
434    /// future that runs for as long as needed (e.g. an infinite cleanup loop).
435    /// Tasks are spawned in registration order, after all record tasks and
436    /// connectors have been started.
437    ///
438    /// # Example
439    /// ```rust,ignore
440    /// builder.on_start(|runtime| async move {
441    ///     loop {
442    ///         do_cleanup().await;
443    ///         runtime.sleep(Duration::from_secs(3600)).await;
444    ///     }
445    /// });
446    /// ```
447    pub fn on_start<F, Fut>(&mut self, f: F) -> &mut Self
448    where
449        F: FnOnce(Arc<R>) -> Fut + Send + 'static,
450        Fut: core::future::Future<Output = ()> + Send + 'static,
451    {
452        // Type-erase so the field can be shared with the `NoRuntime` builder struct.
453        // Uses the module-level `StartFnType<R>` alias — must stay in sync with
454        // the downcast in `build()`.
455        #[cfg(feature = "std")]
456        let boxed: StartFnType<R> = Box::new(move |runtime| Box::pin(f(runtime)));
457        #[cfg(not(feature = "std"))]
458        let boxed: NoStdStartFnType<R> = Box::new(move |runtime| Box::pin(f(runtime)));
459        self.start_fns.push(Box::new(boxed));
460        self
461    }
462
463    /// Registers a connector builder that will be invoked during `build()`
464    ///
465    /// The connector builder will be called after the database is constructed,
466    /// allowing it to collect routes and initialize the connector properly.
467    ///
468    /// # Arguments
469    /// * `builder` - A connector builder that implements `ConnectorBuilder<R>`
470    ///
471    /// # Example
472    ///
473    /// ```rust,ignore
474    /// use aimdb_mqtt_connector::MqttConnector;
475    ///
476    /// let db = AimDbBuilder::new()
477    ///     .runtime(runtime)
478    ///     .with_connector(MqttConnector::new("mqtt://broker.local:1883"))
479    ///     .configure::<Temperature>(|reg| {
480    ///         reg.link_from("mqtt://commands/temp")...
481    ///     })
482    ///     .build().await?;
483    /// ```
484    pub fn with_connector(
485        mut self,
486        builder: impl crate::connector::ConnectorBuilder<R> + 'static,
487    ) -> Self {
488        self.connector_builders.push(Box::new(builder));
489        self
490    }
491
492    /// Enables remote access via AimX protocol (std only)
493    ///
494    /// Configures the database to accept remote connections over a Unix domain socket,
495    /// allowing external clients to introspect records, subscribe to updates, and
496    /// (optionally) write data.
497    ///
498    /// The remote access supervisor will be spawned automatically during `build()`.
499    ///
500    /// # Arguments
501    /// * `config` - Remote access configuration (socket path, security policy, etc.)
502    ///
503    /// # Example
504    ///
505    /// ```rust,ignore
506    /// use aimdb_core::remote::{AimxConfig, SecurityPolicy};
507    ///
508    /// let config = AimxConfig::new("/tmp/aimdb.sock")
509    ///     .with_security(SecurityPolicy::read_only());
510    ///
511    /// let db = AimDbBuilder::new()
512    ///     .runtime(runtime)
513    ///     .with_remote_access(config)
514    ///     .build()?;
515    /// ```
516    #[cfg(feature = "std")]
517    pub fn with_remote_access(mut self, config: crate::remote::AimxConfig) -> Self {
518        self.remote_config = Some(config);
519        self
520    }
521
522    /// Configures a record type manually with a unique key
523    ///
524    /// The key uniquely identifies this record instance. Multiple records of the same
525    /// type can exist with different keys (e.g., "sensor.temperature.room1" and
526    /// "sensor.temperature.room2").
527    ///
528    /// # Arguments
529    /// * `key` - A unique identifier for this record. Can be a string literal, `StringKey`,
530    ///   or any type implementing `RecordKey` (including user-defined enum keys).
531    /// * `f` - Configuration closure
532    ///
533    /// # Example
534    /// ```rust,ignore
535    /// // Using string literal
536    /// builder.configure::<Temperature>("sensor.temp.room1", |reg| { ... });
537    ///
538    /// // Using compile-time safe enum key
539    /// builder.configure::<Temperature>(SensorKey::TempRoom1, |reg| { ... });
540    /// ```
541    pub fn configure<T>(
542        &mut self,
543        key: impl RecordKey,
544        f: impl for<'a> FnOnce(&'a mut RecordRegistrar<'a, T, R>),
545    ) -> &mut Self
546    where
547        T: Send + Sync + 'static + Debug + Clone,
548    {
549        // Convert any RecordKey to StringKey for internal storage
550        let record_key: StringKey = StringKey::from_dynamic(key.as_str());
551        let type_id = TypeId::of::<T>();
552
553        // Find existing record with this key, or create new one
554        let record_index = self.records.iter().position(|(k, _, _)| k == &record_key);
555
556        let (rec, is_new_record) = match record_index {
557            Some(idx) => {
558                // Use existing record
559                let (_, existing_type, record) = &mut self.records[idx];
560                assert!(
561                    *existing_type == type_id,
562                    "StringKey '{}' already registered with different type",
563                    record_key.as_str()
564                );
565                (
566                    record
567                        .as_typed_mut::<T, R>()
568                        .expect("type mismatch in record registry"),
569                    false,
570                )
571            }
572            None => {
573                // Create new record
574                self.records
575                    .push((record_key, type_id, Box::new(TypedRecord::<T, R>::new())));
576                let (_, _, record) = self.records.last_mut().unwrap();
577                (
578                    record
579                        .as_typed_mut::<T, R>()
580                        .expect("type mismatch in record registry"),
581                    true,
582                )
583            }
584        };
585
586        let mut reg = RecordRegistrar {
587            rec,
588            connector_builders: &self.connector_builders,
589            #[cfg(feature = "alloc")]
590            record_key: record_key.as_str().to_string(),
591            extensions: &self.extensions,
592            last_stage: None,
593        };
594        f(&mut reg);
595
596        // Only store spawn function for new records to avoid duplicates
597        if is_new_record {
598            let spawn_key = record_key;
599
600            #[allow(clippy::type_complexity)]
601            let spawn_fn: Box<
602                dyn FnOnce(&Arc<R>, &Arc<AimDb<R>>, RecordId) -> DbResult<()> + Send,
603            > = Box::new(move |runtime: &Arc<R>, db: &Arc<AimDb<R>>, id: RecordId| {
604                // Use RecordSpawner to spawn tasks for this record type
605                use crate::typed_record::RecordSpawner;
606
607                let typed_record = db.inner().get_typed_record_by_id::<T, R>(id)?;
608                // Get the record key from the database to enable key-based producer/consumer
609                #[cfg(feature = "alloc")]
610                let key = db
611                    .inner()
612                    .key_for(id)
613                    .map(|k| k.as_str().to_string())
614                    .unwrap_or_else(|| alloc::format!("__record_{}", id.index()));
615                #[cfg(not(feature = "alloc"))]
616                let key = "";
617                #[cfg(feature = "alloc")]
618                let key = key.as_str();
619                RecordSpawner::<T>::spawn_all_tasks(typed_record, runtime, db, key)
620            });
621
622            // Store the spawn function (type-erased in Box<dyn Any>)
623            self.spawn_fns.push((spawn_key, Box::new(spawn_fn)));
624        }
625
626        self
627    }
628
629    /// Registers a self-registering record type
630    ///
631    /// The record type must implement `RecordT<R>`.
632    ///
633    /// Uses the type name as the default key. For custom keys, use `configure()` directly.
634    pub fn register_record<T>(&mut self, cfg: &T::Config) -> &mut Self
635    where
636        T: RecordT<R>,
637    {
638        // Default key is the full type name for backward compatibility
639        let key = StringKey::new(core::any::type_name::<T>());
640        self.configure::<T>(key, |reg| T::register(reg, cfg))
641    }
642
643    /// Registers a self-registering record type with a custom key
644    ///
645    /// The record type must implement `RecordT<R>`.
646    pub fn register_record_with_key<T>(&mut self, key: impl RecordKey, cfg: &T::Config) -> &mut Self
647    where
648        T: RecordT<R>,
649    {
650        self.configure::<T>(key, |reg| T::register(reg, cfg))
651    }
652
653    /// Runs the database indefinitely (never returns)
654    ///
655    /// This method builds the database, spawns all producer and consumer tasks, and then
656    /// parks the current task indefinitely. This is the primary way to run AimDB services.
657    ///
658    /// All logic runs in background tasks via producers, consumers, and connectors. The
659    /// application continues until interrupted (e.g., Ctrl+C).
660    ///
661    /// # Returns
662    /// `DbResult<()>` - Ok when database starts successfully, then parks forever
663    ///
664    /// # Example
665    ///
666    /// ```rust,ignore
667    /// #[tokio::main]
668    /// async fn main() -> DbResult<()> {
669    ///     AimDbBuilder::new()
670    ///         .runtime(Arc::new(TokioAdapter::new()?))
671    ///         .configure::<MyData>(|reg| {
672    ///             reg.with_buffer(BufferCfg::SpmcRing { capacity: 100 })
673    ///                .with_source(my_producer)
674    ///                .with_tap(my_consumer);
675    ///         })
676    ///         .run().await  // Runs forever
677    /// }
678    /// ```
679    pub async fn run(self) -> DbResult<()>
680    where
681        R: crate::RuntimeForProfiling,
682    {
683        #[cfg(feature = "tracing")]
684        tracing::info!("Building database and spawning background tasks...");
685
686        let _db = self.build().await?;
687
688        #[cfg(feature = "tracing")]
689        tracing::info!("Database running, background tasks active. Press Ctrl+C to stop.");
690
691        // Park indefinitely - the background tasks will continue running
692        // The database handle is kept alive here to prevent dropping it
693        core::future::pending::<()>().await;
694
695        Ok(())
696    }
697
698    /// Builds the database and returns the handle (async)
699    ///
700    /// Use this when you need programmatic access to the database handle for
701    /// manual subscriptions or production. For typical services, use `.run().await` instead.
702    ///
703    /// **Automatic Task Spawning:** This method spawns all producer services and
704    /// `.tap()` observer tasks that were registered during configuration.
705    ///
706    /// **Connector Setup:** Connectors must be created manually before calling `build()`:
707    ///
708    /// ```rust,ignore
709    /// use aimdb_mqtt_connector::{MqttConnector, router::RouterBuilder};
710    ///
711    /// // Configure records with connector links
712    /// let builder = AimDbBuilder::new()
713    ///     .runtime(runtime)
714    ///     .configure::<Temp>("temp", |reg| {
715    ///         reg.link_from("mqtt://commands/temp")
716    ///            .with_buffer(BufferCfg::SingleLatest)
717    ///            .with_remote_access();
718    ///     });
719    ///
720    /// // Create MQTT connector with router
721    /// let router = RouterBuilder::new()
722    ///     .route("commands/temp", /* deserializer */)
723    ///     .build();
724    /// let connector = MqttConnector::new("mqtt://localhost:1883", router).await?;
725    ///
726    /// // Register connector and build
727    /// let db = builder
728    ///     .with_connector("mqtt", Arc::new(connector))
729    ///     .build().await?;
730    /// ```
731    ///
732    /// # Returns
733    /// `DbResult<AimDb<R>>` - The database instance
734    #[cfg_attr(not(feature = "std"), allow(unused_mut))]
735    pub async fn build(self) -> DbResult<AimDb<R>>
736    where
737        R: crate::RuntimeForProfiling,
738    {
739        use crate::DbError;
740
741        // Validate all records
742        for (key, _, record) in &self.records {
743            record.validate().map_err(|_msg| {
744                // Suppress unused warning for key in no_std
745                let _ = &key;
746                #[cfg(feature = "std")]
747                {
748                    DbError::RuntimeError {
749                        message: format!("Record '{}' validation failed: {}", key.as_str(), _msg),
750                    }
751                }
752                #[cfg(not(feature = "std"))]
753                {
754                    DbError::RuntimeError { _message: () }
755                }
756            })?;
757        }
758
759        // Ensure runtime is set
760        let runtime = self.runtime.ok_or({
761            #[cfg(feature = "std")]
762            {
763                DbError::RuntimeError {
764                    message: "runtime not set (use .runtime())".into(),
765                }
766            }
767            #[cfg(not(feature = "std"))]
768            {
769                DbError::RuntimeError { _message: () }
770            }
771        })?;
772
773        // Build the new index structures
774        let record_count = self.records.len();
775        let mut storages: Vec<Box<dyn AnyRecord>> = Vec::with_capacity(record_count);
776        let mut by_key: HashMap<StringKey, RecordId> = HashMap::with_capacity(record_count);
777        let mut by_type: HashMap<TypeId, Vec<RecordId>> = HashMap::new();
778        let mut types: Vec<TypeId> = Vec::with_capacity(record_count);
779        let mut keys: Vec<StringKey> = Vec::with_capacity(record_count);
780
781        for (i, (key, type_id, record)) in self.records.into_iter().enumerate() {
782            let id = RecordId::new(i as u32);
783
784            // Check for duplicate keys (should not happen if configure() is used correctly)
785            if by_key.contains_key(&key) {
786                #[cfg(feature = "std")]
787                return Err(DbError::DuplicateRecordKey {
788                    key: key.as_str().to_string(),
789                });
790                #[cfg(not(feature = "std"))]
791                return Err(DbError::DuplicateRecordKey { _key: () });
792            }
793
794            // Build index structures
795            storages.push(record);
796            by_key.insert(key, id);
797            by_type.entry(type_id).or_default().push(id);
798            types.push(type_id);
799            keys.push(key);
800        }
801
802        // Build dependency graph from record information
803        // Collect RecordGraphInfo for each record
804        let record_infos: Vec<crate::graph::RecordGraphInfo> = storages
805            .iter()
806            .enumerate()
807            .map(|(idx, record)| {
808                let key = keys[idx].as_str().to_string();
809                let origin = record.record_origin();
810
811                // Get buffer type and capacity from the record
812                let (buffer_type, buffer_capacity) = record.buffer_info();
813
814                crate::graph::RecordGraphInfo {
815                    key,
816                    origin,
817                    buffer_type,
818                    buffer_capacity,
819                    tap_count: record.consumer_count(),
820                    has_outbound_link: record.outbound_connector_count() > 0,
821                }
822            })
823            .collect();
824
825        // Build and validate the dependency graph
826        let dependency_graph = DependencyGraph::build_and_validate(&record_infos)?;
827
828        #[cfg(feature = "tracing")]
829        tracing::debug!(
830            "Dependency graph built successfully ({} nodes, {} edges, topo order: {:?})",
831            dependency_graph.nodes.len(),
832            dependency_graph.edges.len(),
833            dependency_graph.topo_order
834        );
835
836        let inner = Arc::new(AimDbInner {
837            storages,
838            by_key,
839            by_type,
840            types,
841            keys,
842            dependency_graph,
843            extensions: self.extensions,
844        });
845
846        #[cfg(feature = "profiling")]
847        let profiling_clock = crate::profiling::make_clock(runtime.clone());
848
849        let db = Arc::new(AimDb {
850            inner: inner.clone(),
851            runtime: runtime.clone(),
852            #[cfg(feature = "profiling")]
853            profiling_clock,
854        });
855
856        #[cfg(feature = "tracing")]
857        tracing::info!(
858            "Spawning producer services and tap observers for {} records",
859            self.spawn_fns.len()
860        );
861
862        // Build a lookup map from spawn_fns for topological ordering
863        let mut spawn_fn_map: HashMap<StringKey, Box<dyn core::any::Any + Send>> =
864            self.spawn_fns.into_iter().collect();
865
866        // Execute spawn functions in topological order
867        // This ensures transforms are spawned after their input records
868        for key_str in inner.dependency_graph.topo_order() {
869            // Find the StringKey that matches this key string
870            let key = match inner.by_key.keys().find(|k| k.as_str() == key_str) {
871                Some(k) => *k,
872                None => continue, // Key not in spawn_fns, skip
873            };
874
875            // Take the spawn function (if any) for this key
876            let spawn_fn_any = match spawn_fn_map.remove(&key) {
877                Some(f) => f,
878                None => continue, // No spawn function for this key
879            };
880
881            // Resolve key to RecordId
882            let id = inner.resolve(&key).ok_or({
883                #[cfg(feature = "std")]
884                {
885                    DbError::RecordKeyNotFound {
886                        key: key.as_str().to_string(),
887                    }
888                }
889                #[cfg(not(feature = "std"))]
890                {
891                    DbError::RecordKeyNotFound { _key: () }
892                }
893            })?;
894
895            // Downcast from Box<dyn Any> back to the concrete spawn function type
896            type SpawnFnType<R> =
897                Box<dyn FnOnce(&Arc<R>, &Arc<AimDb<R>>, RecordId) -> DbResult<()> + Send>;
898
899            let spawn_fn = spawn_fn_any
900                .downcast::<SpawnFnType<R>>()
901                .expect("spawn function type mismatch");
902
903            // Execute the spawn function
904            (*spawn_fn)(&runtime, &db, id)?;
905        }
906
907        #[cfg(feature = "tracing")]
908        tracing::info!("Automatic spawning complete");
909
910        // Spawn remote access supervisor if configured (std only)
911        #[cfg(feature = "std")]
912        if let Some(remote_cfg) = self.remote_config {
913            #[cfg(feature = "tracing")]
914            tracing::info!(
915                "Spawning remote access supervisor on socket: {}",
916                remote_cfg.socket_path.display()
917            );
918
919            // Apply security policy to mark writable records
920            let writable_keys = remote_cfg.security_policy.writable_records();
921            for key_str in writable_keys {
922                if let Some(id) = inner.resolve_str(&key_str) {
923                    #[cfg(feature = "tracing")]
924                    tracing::debug!("Marking record '{}' as writable", key_str);
925
926                    // Mark the record as writable (type-erased call)
927                    inner.storages[id.index()].set_writable_erased(true);
928                }
929            }
930
931            // Spawn the remote supervisor task
932            crate::remote::supervisor::spawn_supervisor(db.clone(), runtime.clone(), remote_cfg)?;
933
934            #[cfg(feature = "tracing")]
935            tracing::info!("Remote access supervisor spawned successfully");
936        }
937
938        // Build connectors from builders (after database is fully constructed)
939        // This allows connectors to use collect_inbound_routes() which creates
940        // producers tied to this specific database instance
941        for builder in self.connector_builders {
942            #[cfg(feature = "tracing")]
943            let scheme = {
944                #[cfg(feature = "std")]
945                {
946                    builder.scheme().to_string()
947                }
948                #[cfg(not(feature = "std"))]
949                {
950                    alloc::string::String::from(builder.scheme())
951                }
952            };
953
954            #[cfg(feature = "tracing")]
955            tracing::debug!("Building connector for scheme: {}", scheme);
956
957            // Build the connector (this spawns tasks as a side effect)
958            let _connector = builder.build(&db).await?;
959
960            #[cfg(feature = "tracing")]
961            tracing::info!("Connector built and spawned successfully: {}", scheme);
962        }
963
964        // Spawn on_start tasks (registered by external crates like aimdb-persistence)
965        if !self.start_fns.is_empty() {
966            #[cfg(feature = "tracing")]
967            tracing::debug!("Spawning {} on_start task(s)", self.start_fns.len());
968
969            #[cfg(feature = "std")]
970            for (idx, start_fn_any) in self.start_fns.into_iter().enumerate() {
971                let start_fn = start_fn_any
972                    .downcast::<StartFnType<R>>()
973                    .unwrap_or_else(|_| {
974                        panic!("on_start fn[{idx}] type mismatch — this is a bug in aimdb-core")
975                    });
976                let future = (*start_fn)(runtime.clone());
977                runtime.spawn(future).map_err(DbError::from)?;
978            }
979
980            #[cfg(not(feature = "std"))]
981            for (idx, start_fn_any) in self.start_fns.into_iter().enumerate() {
982                let start_fn = start_fn_any
983                    .downcast::<NoStdStartFnType<R>>()
984                    .unwrap_or_else(|_| {
985                        panic!("on_start fn[{idx}] type mismatch — this is a bug in aimdb-core")
986                    });
987                let future = (*start_fn)(runtime.clone());
988                runtime.spawn(future).map_err(DbError::from)?;
989            }
990        }
991
992        // Unwrap the Arc to return the owned AimDb
993        // This is safe because we just created it and hold the only reference
994        let db_owned = Arc::try_unwrap(db).unwrap_or_else(|arc| (*arc).clone());
995
996        Ok(db_owned)
997    }
998}
999
1000impl Default for AimDbBuilder<NoRuntime> {
1001    fn default() -> Self {
1002        Self::new()
1003    }
1004}
1005
1006/// Producer-consumer database
1007///
1008/// A database instance with type-safe record registration and cross-record
1009/// communication via the Emitter pattern. The type parameter `R` represents
1010/// the runtime adapter (e.g., TokioAdapter, EmbassyAdapter).
1011///
1012/// See `examples/` for usage.
1013///
1014/// # Examples
1015///
1016/// ```rust,ignore
1017/// use aimdb_tokio_adapter::TokioAdapter;
1018///
1019/// let runtime = Arc::new(TokioAdapter);
1020/// let db: AimDb<TokioAdapter> = AimDbBuilder::new()
1021///     .runtime(runtime)
1022///     .register_record::<Temperature>(&TemperatureConfig)
1023///     .build()?;
1024/// ```
1025pub struct AimDb<R: aimdb_executor::Spawn + 'static> {
1026    /// Internal state
1027    inner: Arc<AimDbInner>,
1028
1029    /// Runtime adapter with concrete type
1030    runtime: Arc<R>,
1031
1032    /// Shared wall clock for stage profiling, built from the runtime at `build()` time.
1033    #[cfg(feature = "profiling")]
1034    profiling_clock: crate::profiling::Clock,
1035}
1036
1037impl<R: aimdb_executor::Spawn + 'static> Clone for AimDb<R> {
1038    fn clone(&self) -> Self {
1039        Self {
1040            inner: self.inner.clone(),
1041            runtime: self.runtime.clone(),
1042            #[cfg(feature = "profiling")]
1043            profiling_clock: self.profiling_clock.clone(),
1044        }
1045    }
1046}
1047
1048impl<R: aimdb_executor::Spawn + 'static> AimDb<R> {
1049    /// Internal accessor for the inner state
1050    ///
1051    /// Used by adapter crates and internal spawning logic.
1052    #[doc(hidden)]
1053    pub fn inner(&self) -> &Arc<AimDbInner> {
1054        &self.inner
1055    }
1056
1057    /// Returns the extension storage frozen at `build()` time.
1058    ///
1059    /// External crates (e.g. `aimdb-persistence`) retrieve their typed state here
1060    /// to service query calls. The extensions are read-only on the live handle.
1061    ///
1062    /// # Example
1063    /// ```rust,ignore
1064    /// use aimdb_persistence::PersistenceState;
1065    /// let state = db.extensions().get::<PersistenceState>().unwrap();
1066    /// ```
1067    pub fn extensions(&self) -> &Extensions {
1068        &self.inner.extensions
1069    }
1070
1071    /// Shared wall clock used by stage profiling (nanoseconds since an arbitrary epoch).
1072    #[cfg(feature = "profiling")]
1073    pub(crate) fn profiling_clock(&self) -> &crate::profiling::Clock {
1074        &self.profiling_clock
1075    }
1076
1077    /// Builds a database with a closure-based builder pattern
1078    pub async fn build_with(rt: Arc<R>, f: impl FnOnce(&mut AimDbBuilder<R>)) -> DbResult<()>
1079    where
1080        R: crate::RuntimeForProfiling,
1081    {
1082        let mut b = AimDbBuilder::new().runtime(rt);
1083        f(&mut b);
1084        b.run().await
1085    }
1086
1087    /// Spawns a task using the database's runtime adapter
1088    ///
1089    /// This method provides direct access to the runtime's spawn capability.
1090    ///
1091    /// # Arguments
1092    /// * `future` - The future to spawn
1093    ///
1094    /// # Returns
1095    /// `DbResult<()>` - Ok if the task was spawned successfully
1096    pub fn spawn_task<F>(&self, future: F) -> DbResult<()>
1097    where
1098        F: core::future::Future<Output = ()> + Send + 'static,
1099    {
1100        self.runtime.spawn(future).map_err(DbError::from)?;
1101        Ok(())
1102    }
1103
1104    /// Produces a value to a specific record by key
1105    ///
1106    /// Uses O(1) key-based lookup to find the correct record.
1107    ///
1108    /// # Arguments
1109    /// * `key` - The record key (e.g., "sensor.temperature")
1110    /// * `value` - The value to produce
1111    ///
1112    /// # Example
1113    ///
1114    /// ```rust,ignore
1115    /// db.produce::<Temperature>("sensors.indoor", indoor_temp).await?;
1116    /// db.produce::<Temperature>("sensors.outdoor", outdoor_temp).await?;
1117    /// ```
1118    pub async fn produce<T>(&self, key: impl AsRef<str>, value: T) -> DbResult<()>
1119    where
1120        T: Send + 'static + Debug + Clone,
1121    {
1122        let typed_rec = self.inner.get_typed_record_by_key::<T, R>(key)?;
1123        typed_rec.produce(value).await;
1124        Ok(())
1125    }
1126
1127    /// Subscribes to a specific record by key
1128    ///
1129    /// Uses O(1) key-based lookup to find the correct record.
1130    ///
1131    /// # Arguments
1132    /// * `key` - The record key (e.g., "sensor.temperature")
1133    ///
1134    /// # Example
1135    ///
1136    /// ```rust,ignore
1137    /// let mut reader = db.subscribe::<Temperature>("sensors.indoor")?;
1138    /// while let Ok(temp) = reader.recv().await {
1139    ///     println!("Indoor: {:.1}°C", temp.celsius);
1140    /// }
1141    /// ```
1142    pub fn subscribe<T>(
1143        &self,
1144        key: impl AsRef<str>,
1145    ) -> DbResult<Box<dyn crate::buffer::BufferReader<T> + Send>>
1146    where
1147        T: Send + Sync + 'static + Debug + Clone,
1148    {
1149        let typed_rec = self.inner.get_typed_record_by_key::<T, R>(key)?;
1150        typed_rec.subscribe()
1151    }
1152
1153    /// Creates a type-safe producer for a specific record by key
1154    ///
1155    /// Returns a `Producer<T, R>` bound to a specific record key.
1156    ///
1157    /// # Arguments
1158    /// * `key` - The record key (e.g., "sensor.temperature")
1159    ///
1160    /// # Example
1161    ///
1162    /// ```rust,ignore
1163    /// let indoor_producer = db.producer::<Temperature>("sensors.indoor");
1164    /// let outdoor_producer = db.producer::<Temperature>("sensors.outdoor");
1165    ///
1166    /// // Each producer writes to its own record
1167    /// indoor_producer.produce(indoor_temp).await?;
1168    /// outdoor_producer.produce(outdoor_temp).await?;
1169    /// ```
1170    pub fn producer<T>(
1171        &self,
1172        key: impl Into<alloc::string::String>,
1173    ) -> crate::typed_api::Producer<T, R>
1174    where
1175        T: Send + 'static + Debug + Clone,
1176    {
1177        crate::typed_api::Producer::new(Arc::new(self.clone()), key.into())
1178    }
1179
1180    /// Creates a type-safe consumer for a specific record by key
1181    ///
1182    /// Returns a `Consumer<T, R>` bound to a specific record key.
1183    ///
1184    /// # Arguments
1185    /// * `key` - The record key (e.g., "sensor.temperature")
1186    ///
1187    /// # Example
1188    ///
1189    /// ```rust,ignore
1190    /// let indoor_consumer = db.consumer::<Temperature>("sensors.indoor");
1191    /// let outdoor_consumer = db.consumer::<Temperature>("sensors.outdoor");
1192    ///
1193    /// // Each consumer reads from its own record
1194    /// let mut rx = indoor_consumer.subscribe()?;
1195    /// ```
1196    pub fn consumer<T>(
1197        &self,
1198        key: impl Into<alloc::string::String>,
1199    ) -> crate::typed_api::Consumer<T, R>
1200    where
1201        T: Send + Sync + 'static + Debug + Clone,
1202    {
1203        crate::typed_api::Consumer::new(Arc::new(self.clone()), key.into())
1204    }
1205
1206    /// Resolve a record key to its RecordId
1207    ///
1208    /// Useful for checking if a record exists before operations.
1209    ///
1210    /// # Example
1211    ///
1212    /// ```rust,ignore
1213    /// if let Some(id) = db.resolve_key("sensors.temperature") {
1214    ///     println!("Record exists with ID: {}", id);
1215    /// }
1216    /// ```
1217    pub fn resolve_key(&self, key: &str) -> Option<crate::record_id::RecordId> {
1218        self.inner.resolve_str(key)
1219    }
1220
1221    /// Get all record IDs for a specific type
1222    ///
1223    /// Returns a slice of RecordIds for all records of type T.
1224    /// Useful for introspection when multiple records of the same type exist.
1225    ///
1226    /// # Example
1227    ///
1228    /// ```rust,ignore
1229    /// let temp_ids = db.records_of_type::<Temperature>();
1230    /// println!("Found {} temperature records", temp_ids.len());
1231    /// ```
1232    pub fn records_of_type<T: 'static>(&self) -> &[crate::record_id::RecordId] {
1233        self.inner.records_of_type::<T>()
1234    }
1235
1236    /// Returns a reference to the runtime adapter
1237    ///
1238    /// Provides direct access to the concrete runtime type.
1239    pub fn runtime(&self) -> &R {
1240        &self.runtime
1241    }
1242
1243    /// Returns the runtime as a type-erased `Arc<dyn Any + Send + Sync>`
1244    ///
1245    /// Used by connectors to provide `RuntimeContext` to context-aware
1246    /// deserializers during inbound message routing.
1247    pub fn runtime_any(&self) -> Arc<dyn core::any::Any + Send + Sync> {
1248        self.runtime.clone()
1249    }
1250
1251    /// Lists all registered records (std only)
1252    ///
1253    /// Returns metadata for all registered records, useful for remote access introspection.
1254    /// Available only when the `std` feature is enabled.
1255    ///
1256    /// # Example
1257    /// ```rust,ignore
1258    /// let records = db.list_records();
1259    /// for record in records {
1260    ///     println!("Record: {} ({})", record.name, record.type_id);
1261    /// }
1262    /// ```
1263    #[cfg(feature = "std")]
1264    pub fn list_records(&self) -> Vec<crate::remote::RecordMetadata> {
1265        self.inner.list_records()
1266    }
1267
1268    /// Resets stage profiling counters for every record (feature `profiling`).
1269    #[cfg(feature = "profiling")]
1270    pub fn reset_stage_profiling(&self) {
1271        for record in &self.inner.storages {
1272            record.reset_profiling();
1273        }
1274    }
1275
1276    /// Resets buffer introspection counters for every record (feature `metrics`).
1277    #[cfg(feature = "metrics")]
1278    pub fn reset_buffer_metrics(&self) {
1279        for record in &self.inner.storages {
1280            record.reset_buffer_metrics();
1281        }
1282    }
1283
1284    /// Try to get record's latest value as JSON by name (std only)
1285    ///
1286    /// Convenience wrapper around `AimDbInner::try_latest_as_json()`.
1287    ///
1288    /// # Arguments
1289    /// * `record_name` - The full Rust type name (e.g., "server::Temperature")
1290    ///
1291    /// # Returns
1292    /// `Some(JsonValue)` with current value, or `None` if unavailable
1293    #[cfg(feature = "std")]
1294    pub fn try_latest_as_json(&self, record_name: &str) -> Option<serde_json::Value> {
1295        self.inner.try_latest_as_json(record_name)
1296    }
1297
1298    /// Sets a record value from JSON (remote access API)
1299    ///
1300    /// Deserializes JSON and produces the value to the record's buffer.
1301    ///
1302    /// **SAFETY:** Enforces "No Producer Override" rule - only works for configuration
1303    /// records without active producers.
1304    ///
1305    /// # Arguments
1306    /// * `record_name` - Full Rust type name
1307    /// * `json_value` - JSON value to set
1308    ///
1309    /// # Returns
1310    /// `Ok(())` on success, error if record not found, has producers, or deserialization fails
1311    ///
1312    /// # Example (internal use)
1313    /// ```rust,ignore
1314    /// db.set_record_from_json("AppConfig", json!({"debug": true}))?;
1315    /// ```
1316    #[cfg(feature = "std")]
1317    pub fn set_record_from_json(
1318        &self,
1319        record_name: &str,
1320        json_value: serde_json::Value,
1321    ) -> DbResult<()> {
1322        self.inner.set_record_from_json(record_name, json_value)
1323    }
1324
1325    /// Subscribe to record updates as JSON stream (std only)
1326    ///
1327    /// Creates a subscription to a record's buffer and forwards updates as JSON
1328    /// to a bounded channel. This is used internally by the remote access protocol
1329    /// for implementing `record.subscribe`.
1330    ///
1331    /// # Architecture
1332    ///
1333    /// Spawns a consumer task that:
1334    /// 1. Subscribes to the record's buffer using the existing buffer API
1335    /// 2. Reads values as they arrive
1336    /// 3. Serializes each value to JSON
1337    /// 4. Sends JSON values to a bounded channel (with backpressure handling)
1338    /// 5. Terminates when either:
1339    ///    - The cancel signal is received (unsubscribe)
1340    ///    - The channel receiver is dropped (client disconnected)
1341    ///
1342    /// # Arguments
1343    /// * `record_key` - Key of the record to subscribe to
1344    /// * `queue_size` - Size of the bounded channel for this subscription
1345    ///
1346    /// # Returns
1347    /// `Ok((receiver, cancel_tx))` where:
1348    /// - `receiver`: Bounded channel receiver for JSON values
1349    /// - `cancel_tx`: One-shot sender to cancel the subscription
1350    ///
1351    /// `Err` if:
1352    /// - Record not found for the given key
1353    /// - Record not configured with `.with_remote_access()`
1354    /// - Failed to subscribe to buffer
1355    ///
1356    /// # Example (internal use)
1357    ///
1358    /// ```rust,ignore
1359    /// let (mut rx, cancel_tx) = db.subscribe_record_updates("sensor.temp", 100)?;
1360    ///
1361    /// // Read events
1362    /// while let Some(json_value) = rx.recv().await {
1363    ///     // Forward to client...
1364    /// }
1365    ///
1366    /// // Cancel subscription
1367    /// let _ = cancel_tx.send(());
1368    /// ```
1369    #[cfg(feature = "std")]
1370    #[allow(unused_variables)] // Variables used only in tracing feature
1371    pub fn subscribe_record_updates(
1372        &self,
1373        record_key: &str,
1374        queue_size: usize,
1375    ) -> DbResult<(
1376        tokio::sync::mpsc::Receiver<serde_json::Value>,
1377        tokio::sync::oneshot::Sender<()>,
1378    )> {
1379        use tokio::sync::{mpsc, oneshot};
1380
1381        // Find the record by key
1382        let id = self
1383            .inner
1384            .resolve_str(record_key)
1385            .ok_or_else(|| DbError::RecordKeyNotFound {
1386                key: record_key.to_string(),
1387            })?;
1388
1389        let record = self
1390            .inner
1391            .storage(id)
1392            .ok_or_else(|| DbError::InvalidRecordId { id: id.raw() })?;
1393
1394        // Subscribe to the record's buffer as JSON stream
1395        // This will fail if record not configured with .with_remote_access()
1396        let mut json_reader = record.subscribe_json()?;
1397
1398        // Create channels for the subscription
1399        let (value_tx, value_rx) = mpsc::channel(queue_size);
1400        let (cancel_tx, mut cancel_rx) = oneshot::channel();
1401
1402        // Get metadata for logging
1403        let type_id = self.inner.types[id.index()];
1404        let key = self.inner.keys[id.index()];
1405        let record_metadata = record.collect_metadata(type_id, key, id);
1406        let runtime = self.runtime.clone();
1407
1408        // Spawn consumer task that forwards JSON values from buffer to channel
1409        let spawn_result = runtime.spawn(async move {
1410            #[cfg(feature = "tracing")]
1411            tracing::debug!(
1412                "Subscription consumer task started for {}",
1413                record_metadata.name
1414            );
1415
1416            // Main event loop: read from buffer and forward to channel
1417            loop {
1418                tokio::select! {
1419                    // Handle cancellation signal
1420                    _ = &mut cancel_rx => {
1421                        #[cfg(feature = "tracing")]
1422                        tracing::debug!("Subscription cancelled");
1423                        break;
1424                    }
1425                    // Read next JSON value from buffer
1426                    result = json_reader.recv_json() => {
1427                        match result {
1428                            Ok(json_val) => {
1429                                // Send JSON value to subscription channel
1430                                if value_tx.send(json_val).await.is_err() {
1431                                    #[cfg(feature = "tracing")]
1432                                    tracing::debug!("Subscription receiver dropped");
1433                                    break;
1434                                }
1435                            }
1436                            Err(DbError::BufferLagged { lag_count, .. }) => {
1437                                // Consumer fell behind - log warning but continue
1438                                #[cfg(feature = "tracing")]
1439                                tracing::warn!(
1440                                    "Subscription for {} lagged by {} messages",
1441                                    record_metadata.name,
1442                                    lag_count
1443                                );
1444                                // Continue reading - next recv will get latest
1445                            }
1446                            Err(DbError::BufferClosed { .. }) => {
1447                                // Buffer closed (shutdown) - exit gracefully
1448                                #[cfg(feature = "tracing")]
1449                                tracing::debug!("Buffer closed for {}", record_metadata.name);
1450                                break;
1451                            }
1452                            Err(e) => {
1453                                // Other error (shouldn't happen in practice)
1454                                #[cfg(feature = "tracing")]
1455                                tracing::error!(
1456                                    "Subscription error for {}: {:?}",
1457                                    record_metadata.name,
1458                                    e
1459                                );
1460                                break;
1461                            }
1462                        }
1463                    }
1464                }
1465            }
1466
1467            #[cfg(feature = "tracing")]
1468            tracing::debug!("Subscription consumer task terminated");
1469        });
1470
1471        spawn_result.map_err(DbError::from)?;
1472
1473        Ok((value_rx, cancel_tx))
1474    }
1475
1476    /// Collects inbound connector routes for automatic router construction (std only)
1477    ///
1478    /// Iterates all records, filters their inbound_connectors by scheme,
1479    /// and returns routes with producer creation callbacks.
1480    ///
1481    /// # Arguments
1482    /// * `scheme` - URL scheme to filter by (e.g., "mqtt", "kafka")
1483    ///
1484    /// # Returns
1485    /// Vector of tuples: (topic, producer_trait, deserializer)
1486    ///
1487    /// The topic is resolved dynamically if a `TopicResolverFn` is configured,
1488    /// otherwise the static topic from the URL is used.
1489    ///
1490    /// # Example
1491    /// ```rust,ignore
1492    /// // In MqttConnector after db.build()
1493    /// let routes = db.collect_inbound_routes("mqtt");
1494    /// let router = RouterBuilder::from_routes(routes).build();
1495    /// connector.set_router(router).await?;
1496    /// ```
1497    #[cfg(feature = "alloc")]
1498    pub fn collect_inbound_routes(
1499        &self,
1500        scheme: &str,
1501    ) -> Vec<(
1502        String,
1503        Box<dyn crate::connector::ProducerTrait>,
1504        crate::connector::DeserializerKind,
1505    )> {
1506        let mut routes = Vec::new();
1507
1508        // Convert self to Arc<dyn Any> for producer factory
1509        let db_any: Arc<dyn core::any::Any + Send + Sync> = Arc::new(self.clone());
1510
1511        for record in &self.inner.storages {
1512            let inbound_links = record.inbound_connectors();
1513
1514            for link in inbound_links {
1515                // Filter by scheme
1516                if link.url.scheme() != scheme {
1517                    continue;
1518                }
1519
1520                // Resolve topic: dynamic (from resolver) or static (from URL)
1521                let topic = link.resolve_topic();
1522
1523                // Create producer using the stored factory
1524                if let Some(producer) = link.create_producer(db_any.clone()) {
1525                    routes.push((topic, producer, link.deserializer.clone()));
1526                }
1527            }
1528        }
1529
1530        #[cfg(feature = "tracing")]
1531        if !routes.is_empty() {
1532            tracing::debug!(
1533                "Collected {} inbound routes for scheme '{}'",
1534                routes.len(),
1535                scheme
1536            );
1537        }
1538
1539        routes
1540    }
1541
1542    /// Collects outbound routes for a specific protocol scheme
1543    ///
1544    /// Mirrors `collect_inbound_routes()` for symmetry. Iterates all records,
1545    /// filters their outbound_connectors by scheme, and returns routes with
1546    /// consumer creation callbacks.
1547    ///
1548    /// This method is called by connectors during their `build()` phase to
1549    /// collect all configured outbound routes and spawn publisher tasks.
1550    ///
1551    /// # Arguments
1552    /// * `scheme` - URL scheme to filter by (e.g., "mqtt", "kafka")
1553    ///
1554    /// # Returns
1555    /// Vector of tuples: (destination, consumer_trait, serializer, config)
1556    ///
1557    /// The config Vec contains protocol-specific options (e.g., qos, retain).
1558    ///
1559    /// # Example
1560    /// ```rust,ignore
1561    /// // In MqttConnector::build()
1562    /// let routes = db.collect_outbound_routes("mqtt");
1563    /// for (topic, consumer, serializer, config) in routes {
1564    ///     connector.spawn_publisher(topic, consumer, serializer, config)?;
1565    /// }
1566    /// ```
1567    /// Collect `(topic, TypeId)` pairs for all outbound routes matching `scheme`.
1568    ///
1569    /// Complements [`collect_outbound_routes`](Self::collect_outbound_routes) when
1570    /// callers need to know the concrete record type behind each outbound topic
1571    /// (e.g. to resolve a schema name for discovery responses).
1572    ///
1573    /// The returned TypeId is the `TypeId::of::<T>()` for the record type `T`
1574    /// that was used in the corresponding `configure::<T>()` call.
1575    #[cfg(feature = "alloc")]
1576    pub fn collect_outbound_topic_type_ids(&self, scheme: &str) -> Vec<(String, TypeId)> {
1577        let mut result = Vec::new();
1578
1579        for (idx, record) in self.inner.storages.iter().enumerate() {
1580            let type_id = self.inner.types[idx];
1581
1582            for link in record.outbound_connectors() {
1583                if link.url.scheme() != scheme {
1584                    continue;
1585                }
1586                result.push((link.url.resource_id(), type_id));
1587            }
1588        }
1589
1590        result
1591    }
1592
1593    #[cfg(feature = "alloc")]
1594    pub fn collect_outbound_routes(&self, scheme: &str) -> Vec<OutboundRoute> {
1595        let mut routes = Vec::new();
1596
1597        // Convert self to Arc<dyn Any> for consumer factory
1598        // This is necessary because the factory takes Arc<dyn Any> to avoid
1599        // needing to know the runtime type R at the factory definition site
1600        let db_any: Arc<dyn core::any::Any + Send + Sync> = Arc::new(self.clone());
1601
1602        for record in &self.inner.storages {
1603            let outbound_links = record.outbound_connectors();
1604
1605            for link in outbound_links {
1606                // Filter by scheme
1607                if link.url.scheme() != scheme {
1608                    continue;
1609                }
1610
1611                let destination = link.url.resource_id();
1612
1613                // Skip links without serializer
1614                let Some(serializer) = link.serializer.clone() else {
1615                    #[cfg(feature = "tracing")]
1616                    tracing::warn!("Outbound link '{}' has no serializer, skipping", link.url);
1617                    continue;
1618                };
1619
1620                // Create consumer using the stored factory
1621                if let Some(consumer) = link.create_consumer(db_any.clone()) {
1622                    routes.push((
1623                        destination,
1624                        consumer,
1625                        serializer,
1626                        link.config.clone(),
1627                        link.topic_provider.clone(),
1628                    ));
1629                }
1630            }
1631        }
1632
1633        #[cfg(feature = "tracing")]
1634        if !routes.is_empty() {
1635            tracing::debug!(
1636                "Collected {} outbound routes for scheme '{}'",
1637                routes.len(),
1638                scheme
1639            );
1640        }
1641
1642        routes
1643    }
1644}