aimdb_core/
builder.rs

1//! Database builder with type-safe record registration
2//!
3//! Provides `AimDb` and `AimDbBuilder` for constructing databases with
4//! type-safe, self-registering records using the producer-consumer pattern.
5
6use core::any::TypeId;
7use core::fmt::Debug;
8use core::marker::PhantomData;
9
10extern crate alloc;
11
12use alloc::vec::Vec;
13use hashbrown::HashMap;
14
15#[cfg(not(feature = "std"))]
16use alloc::{boxed::Box, sync::Arc};
17
18#[cfg(all(not(feature = "std"), feature = "alloc"))]
19use alloc::string::{String, ToString};
20
21#[cfg(feature = "std")]
22use std::{boxed::Box, sync::Arc};
23
24use crate::record_id::{RecordId, RecordKey, StringKey};
25use crate::typed_api::{RecordRegistrar, RecordT};
26use crate::typed_record::{AnyRecord, AnyRecordExt, TypedRecord};
27use crate::{DbError, DbResult};
28
29/// Type alias for outbound route tuples returned by `collect_outbound_routes`
30///
31/// Each tuple contains:
32/// - `String` - Topic/key from the URL path
33/// - `Box<dyn ConsumerTrait>` - Callback to create a consumer for this record
34/// - `SerializerFn` - User-provided serializer for the record type
35/// - `Vec<(String, String)>` - Configuration options from the URL query
36#[cfg(feature = "alloc")]
37type OutboundRoute = (
38    String,
39    Box<dyn crate::connector::ConsumerTrait>,
40    crate::connector::SerializerFn,
41    Vec<(String, String)>,
42);
43
44/// Marker type for untyped builder (before runtime is set)
45pub struct NoRuntime;
46
47/// Internal database state
48///
49/// Holds the registry of typed records with multiple index structures for
50/// efficient access patterns:
51///
52/// - **`storages`**: Vec for O(1) hot-path access by RecordId
53/// - **`by_key`**: HashMap for O(1) lookup by stable RecordKey
54/// - **`by_type`**: HashMap for introspection (find all records of type T)
55/// - **`types`**: Vec for runtime type validation during downcasts
56pub struct AimDbInner {
57    /// Record storage (hot path - indexed by RecordId)
58    ///
59    /// Order matches registration order. Immutable after build().
60    storages: Vec<Box<dyn AnyRecord>>,
61
62    /// Name → RecordId lookup (control plane)
63    ///
64    /// Used by remote access, CLI, MCP for O(1) name resolution.
65    by_key: HashMap<StringKey, RecordId>,
66
67    /// TypeId → RecordIds lookup (introspection)
68    ///
69    /// Enables "find all Temperature records" queries.
70    by_type: HashMap<TypeId, Vec<RecordId>>,
71
72    /// RecordId → TypeId lookup (type safety assertions)
73    ///
74    /// Used to validate downcasts at runtime.
75    types: Vec<TypeId>,
76
77    /// RecordId → StringKey lookup (reverse mapping)
78    ///
79    /// Used to get the key for a given record ID.
80    keys: Vec<StringKey>,
81}
82
83impl AimDbInner {
84    /// Resolve RecordKey to RecordId (control plane - O(1) average)
85    #[inline]
86    pub fn resolve<K: RecordKey>(&self, key: &K) -> Option<RecordId> {
87        self.by_key.get(key.as_str()).copied()
88    }
89
90    /// Resolve string to RecordId (convenience for remote access)
91    ///
92    /// O(1) average thanks to `Borrow<str>` implementation on `RecordKey`.
93    #[inline]
94    pub fn resolve_str(&self, name: &str) -> Option<RecordId> {
95        self.by_key.get(name).copied()
96    }
97
98    /// Get storage by RecordId (hot path - O(1))
99    #[inline]
100    pub fn storage(&self, id: RecordId) -> Option<&dyn AnyRecord> {
101        self.storages.get(id.index()).map(|b| b.as_ref())
102    }
103
104    /// Get the StringKey for a given RecordId
105    #[inline]
106    pub fn key_for(&self, id: RecordId) -> Option<&StringKey> {
107        self.keys.get(id.index())
108    }
109
110    /// Get all RecordIds for a type (introspection)
111    pub fn records_of_type<T: 'static>(&self) -> &[RecordId] {
112        self.by_type
113            .get(&TypeId::of::<T>())
114            .map(|v| v.as_slice())
115            .unwrap_or(&[])
116    }
117
118    /// Get the number of registered records
119    #[inline]
120    pub fn record_count(&self) -> usize {
121        self.storages.len()
122    }
123
124    /// Helper to get a typed record by RecordKey
125    ///
126    /// This encapsulates the common pattern of:
127    /// 1. Resolving key to RecordId
128    /// 2. Validating TypeId matches
129    /// 3. Downcasting to the typed record
130    pub fn get_typed_record_by_key<T, R>(
131        &self,
132        key: impl AsRef<str>,
133    ) -> DbResult<&TypedRecord<T, R>>
134    where
135        T: Send + 'static + Debug + Clone,
136        R: aimdb_executor::Spawn + 'static,
137    {
138        let key_str = key.as_ref();
139
140        // Resolve key to RecordId
141        let id = self.resolve_str(key_str).ok_or({
142            #[cfg(feature = "std")]
143            {
144                DbError::RecordKeyNotFound {
145                    key: key_str.to_string(),
146                }
147            }
148            #[cfg(not(feature = "std"))]
149            {
150                DbError::RecordKeyNotFound { _key: () }
151            }
152        })?;
153
154        self.get_typed_record_by_id::<T, R>(id)
155    }
156
157    /// Helper to get a typed record by RecordId with type validation
158    pub fn get_typed_record_by_id<T, R>(&self, id: RecordId) -> DbResult<&TypedRecord<T, R>>
159    where
160        T: Send + 'static + Debug + Clone,
161        R: aimdb_executor::Spawn + 'static,
162    {
163        use crate::typed_record::AnyRecordExt;
164
165        // Validate RecordId is in bounds
166        if id.index() >= self.storages.len() {
167            return Err(DbError::InvalidRecordId { id: id.raw() });
168        }
169
170        // Validate TypeId matches
171        let expected = TypeId::of::<T>();
172        let actual = self.types[id.index()];
173        if expected != actual {
174            #[cfg(feature = "std")]
175            return Err(DbError::TypeMismatch {
176                record_id: id.raw(),
177                expected_type: core::any::type_name::<T>().to_string(),
178            });
179            #[cfg(not(feature = "std"))]
180            return Err(DbError::TypeMismatch {
181                record_id: id.raw(),
182                _expected_type: (),
183            });
184        }
185
186        // Safe to downcast (type validated above)
187        let record = &self.storages[id.index()];
188
189        #[cfg(feature = "std")]
190        let typed_record = record.as_typed::<T, R>().ok_or(DbError::InvalidOperation {
191            operation: "get_typed_record_by_id".to_string(),
192            reason: "type mismatch during downcast".to_string(),
193        })?;
194
195        #[cfg(not(feature = "std"))]
196        let typed_record = record.as_typed::<T, R>().ok_or(DbError::InvalidOperation {
197            _operation: (),
198            _reason: (),
199        })?;
200
201        Ok(typed_record)
202    }
203
204    /// Collects metadata for all registered records (std only)
205    ///
206    /// Returns a vector of `RecordMetadata` for remote access introspection.
207    /// Available only when the `std` feature is enabled.
208    #[cfg(feature = "std")]
209    pub fn list_records(&self) -> Vec<crate::remote::RecordMetadata> {
210        self.storages
211            .iter()
212            .enumerate()
213            .map(|(i, record)| {
214                let id = RecordId::new(i as u32);
215                let type_id = self.types[i];
216                let key = self.keys[i];
217                record.collect_metadata(type_id, key, id)
218            })
219            .collect()
220    }
221
222    /// Try to get record's latest value as JSON by record key (std only)
223    ///
224    /// O(1) lookup using the key-based index.
225    ///
226    /// # Arguments
227    /// * `record_key` - The record key (e.g., "sensors.temperature")
228    ///
229    /// # Returns
230    /// `Some(JsonValue)` with the current record value, or `None`
231    #[cfg(feature = "std")]
232    pub fn try_latest_as_json(&self, record_key: &str) -> Option<serde_json::Value> {
233        let id = self.resolve_str(record_key)?;
234        self.storages.get(id.index())?.latest_json()
235    }
236
237    /// Sets a record value from JSON (remote access API)
238    ///
239    /// Deserializes the JSON value and writes it to the record's buffer.
240    ///
241    /// **SAFETY:** Enforces the "No Producer Override" rule:
242    /// - Only works for records with `producer_count == 0`
243    /// - Returns error if the record has active producers
244    ///
245    /// # Arguments
246    /// * `record_key` - The record key (e.g., "config.app")
247    /// * `json_value` - JSON representation of the value
248    ///
249    /// # Returns
250    /// - `Ok(())` - Successfully set the value
251    /// - `Err(DbError)` - If record not found, has producers, or deserialization fails
252    #[cfg(feature = "std")]
253    pub fn set_record_from_json(
254        &self,
255        record_key: &str,
256        json_value: serde_json::Value,
257    ) -> DbResult<()> {
258        let id = self
259            .resolve_str(record_key)
260            .ok_or_else(|| DbError::RecordKeyNotFound {
261                key: record_key.to_string(),
262            })?;
263
264        self.storages[id.index()].set_from_json(json_value)
265    }
266}
267
268/// Database builder for producer-consumer pattern
269///
270/// Provides a fluent API for constructing databases with type-safe record registration.
271/// Use `.runtime()` to set the runtime and transition to a typed builder.
272pub struct AimDbBuilder<R = NoRuntime> {
273    /// Registered records with their keys (order matters for RecordId assignment)
274    records: Vec<(StringKey, TypeId, Box<dyn AnyRecord>)>,
275
276    /// Runtime adapter
277    runtime: Option<Arc<R>>,
278
279    /// Connector builders that will be invoked during build()
280    connector_builders: Vec<Box<dyn crate::connector::ConnectorBuilder<R>>>,
281
282    /// Spawn functions with their keys
283    spawn_fns: Vec<(StringKey, Box<dyn core::any::Any + Send>)>,
284
285    /// Remote access configuration (std only)
286    #[cfg(feature = "std")]
287    remote_config: Option<crate::remote::AimxConfig>,
288
289    /// PhantomData to track the runtime type parameter
290    _phantom: PhantomData<R>,
291}
292
293impl AimDbBuilder<NoRuntime> {
294    /// Creates a new database builder without a runtime
295    ///
296    /// Call `.runtime()` to set the runtime adapter.
297    pub fn new() -> Self {
298        Self {
299            records: Vec::new(),
300            runtime: None,
301            connector_builders: Vec::new(),
302            spawn_fns: Vec::new(),
303            #[cfg(feature = "std")]
304            remote_config: None,
305            _phantom: PhantomData,
306        }
307    }
308
309    /// Sets the runtime adapter
310    ///
311    /// This transitions the builder from untyped to typed with concrete runtime `R`.
312    ///
313    /// # Type Safety Note
314    ///
315    /// The `connector_builders` field is intentionally reset to `Vec::new()` during this
316    /// transition because connectors are parameterized by the runtime type:
317    ///
318    /// - Before: `Vec<Box<dyn ConnectorBuilder<NoRuntime>>>`
319    /// - After: `Vec<Box<dyn ConnectorBuilder<R>>>`
320    ///
321    /// These types are incompatible and cannot be transferred. However, this is not a bug
322    /// because `.with_connector()` is only available AFTER calling `.runtime()` (it's defined
323    /// in the `impl<R> where R: Spawn` block, not in `impl AimDbBuilder<NoRuntime>`).
324    ///
325    /// This means the type system **enforces** the correct call order:
326    /// ```rust,ignore
327    /// AimDbBuilder::new()
328    ///     .runtime(runtime)           // ← Must be called first
329    ///     .with_connector(connector)  // ← Now available
330    /// ```
331    ///
332    /// The `records` and `remote_config` are preserved across the transition since they
333    /// are not parameterized by the runtime type.
334    pub fn runtime<R>(self, rt: Arc<R>) -> AimDbBuilder<R>
335    where
336        R: aimdb_executor::Spawn + 'static,
337    {
338        AimDbBuilder {
339            records: self.records,
340            runtime: Some(rt),
341            connector_builders: Vec::new(),
342            spawn_fns: Vec::new(),
343            #[cfg(feature = "std")]
344            remote_config: self.remote_config,
345            _phantom: PhantomData,
346        }
347    }
348}
349
350impl<R> AimDbBuilder<R>
351where
352    R: aimdb_executor::Spawn + 'static,
353{
354    /// Registers a connector builder that will be invoked during `build()`
355    ///
356    /// The connector builder will be called after the database is constructed,
357    /// allowing it to collect routes and initialize the connector properly.
358    ///
359    /// # Arguments
360    /// * `builder` - A connector builder that implements `ConnectorBuilder<R>`
361    ///
362    /// # Example
363    ///
364    /// ```rust,ignore
365    /// use aimdb_mqtt_connector::MqttConnector;
366    ///
367    /// let db = AimDbBuilder::new()
368    ///     .runtime(runtime)
369    ///     .with_connector(MqttConnector::new("mqtt://broker.local:1883"))
370    ///     .configure::<Temperature>(|reg| {
371    ///         reg.link_from("mqtt://commands/temp")...
372    ///     })
373    ///     .build().await?;
374    /// ```
375    pub fn with_connector(
376        mut self,
377        builder: impl crate::connector::ConnectorBuilder<R> + 'static,
378    ) -> Self {
379        self.connector_builders.push(Box::new(builder));
380        self
381    }
382
383    /// Enables remote access via AimX protocol (std only)
384    ///
385    /// Configures the database to accept remote connections over a Unix domain socket,
386    /// allowing external clients to introspect records, subscribe to updates, and
387    /// (optionally) write data.
388    ///
389    /// The remote access supervisor will be spawned automatically during `build()`.
390    ///
391    /// # Arguments
392    /// * `config` - Remote access configuration (socket path, security policy, etc.)
393    ///
394    /// # Example
395    ///
396    /// ```rust,ignore
397    /// use aimdb_core::remote::{AimxConfig, SecurityPolicy};
398    ///
399    /// let config = AimxConfig::new("/tmp/aimdb.sock")
400    ///     .with_security(SecurityPolicy::read_only());
401    ///
402    /// let db = AimDbBuilder::new()
403    ///     .runtime(runtime)
404    ///     .with_remote_access(config)
405    ///     .build()?;
406    /// ```
407    #[cfg(feature = "std")]
408    pub fn with_remote_access(mut self, config: crate::remote::AimxConfig) -> Self {
409        self.remote_config = Some(config);
410        self
411    }
412
413    /// Configures a record type manually with a unique key
414    ///
415    /// The key uniquely identifies this record instance. Multiple records of the same
416    /// type can exist with different keys (e.g., "sensor.temperature.room1" and
417    /// "sensor.temperature.room2").
418    ///
419    /// # Arguments
420    /// * `key` - A unique identifier for this record. Can be a string literal, `StringKey`,
421    ///   or any type implementing `RecordKey` (including user-defined enum keys).
422    /// * `f` - Configuration closure
423    ///
424    /// # Example
425    /// ```rust,ignore
426    /// // Using string literal
427    /// builder.configure::<Temperature>("sensor.temp.room1", |reg| { ... });
428    ///
429    /// // Using compile-time safe enum key
430    /// builder.configure::<Temperature>(SensorKey::TempRoom1, |reg| { ... });
431    /// ```
432    pub fn configure<T>(
433        &mut self,
434        key: impl RecordKey,
435        f: impl for<'a> FnOnce(&'a mut RecordRegistrar<'a, T, R>),
436    ) -> &mut Self
437    where
438        T: Send + Sync + 'static + Debug + Clone,
439    {
440        // Convert any RecordKey to StringKey for internal storage
441        let record_key: StringKey = StringKey::from_dynamic(key.as_str());
442        let type_id = TypeId::of::<T>();
443
444        // Find existing record with this key, or create new one
445        let record_index = self.records.iter().position(|(k, _, _)| k == &record_key);
446
447        let (rec, is_new_record) = match record_index {
448            Some(idx) => {
449                // Use existing record
450                let (_, existing_type, record) = &mut self.records[idx];
451                assert!(
452                    *existing_type == type_id,
453                    "StringKey '{}' already registered with different type",
454                    record_key.as_str()
455                );
456                (
457                    record
458                        .as_typed_mut::<T, R>()
459                        .expect("type mismatch in record registry"),
460                    false,
461                )
462            }
463            None => {
464                // Create new record
465                self.records
466                    .push((record_key, type_id, Box::new(TypedRecord::<T, R>::new())));
467                let (_, _, record) = self.records.last_mut().unwrap();
468                (
469                    record
470                        .as_typed_mut::<T, R>()
471                        .expect("type mismatch in record registry"),
472                    true,
473                )
474            }
475        };
476
477        let mut reg = RecordRegistrar {
478            rec,
479            connector_builders: &self.connector_builders,
480            #[cfg(feature = "alloc")]
481            record_key: record_key.as_str().to_string(),
482        };
483        f(&mut reg);
484
485        // Only store spawn function for new records to avoid duplicates
486        if is_new_record {
487            let spawn_key = record_key;
488
489            #[allow(clippy::type_complexity)]
490            let spawn_fn: Box<
491                dyn FnOnce(&Arc<R>, &Arc<AimDb<R>>, RecordId) -> DbResult<()> + Send,
492            > = Box::new(move |runtime: &Arc<R>, db: &Arc<AimDb<R>>, id: RecordId| {
493                // Use RecordSpawner to spawn tasks for this record type
494                use crate::typed_record::RecordSpawner;
495
496                let typed_record = db.inner().get_typed_record_by_id::<T, R>(id)?;
497                // Get the record key from the database to enable key-based producer/consumer
498                #[cfg(feature = "alloc")]
499                let key = db
500                    .inner()
501                    .key_for(id)
502                    .map(|k| k.as_str().to_string())
503                    .unwrap_or_else(|| alloc::format!("__record_{}", id.index()));
504                #[cfg(not(feature = "alloc"))]
505                let key = "";
506                #[cfg(feature = "alloc")]
507                let key = key.as_str();
508                RecordSpawner::<T>::spawn_all_tasks(typed_record, runtime, db, key)
509            });
510
511            // Store the spawn function (type-erased in Box<dyn Any>)
512            self.spawn_fns.push((spawn_key, Box::new(spawn_fn)));
513        }
514
515        self
516    }
517
518    /// Registers a self-registering record type
519    ///
520    /// The record type must implement `RecordT<R>`.
521    ///
522    /// Uses the type name as the default key. For custom keys, use `configure()` directly.
523    pub fn register_record<T>(&mut self, cfg: &T::Config) -> &mut Self
524    where
525        T: RecordT<R>,
526    {
527        // Default key is the full type name for backward compatibility
528        let key = StringKey::new(core::any::type_name::<T>());
529        self.configure::<T>(key, |reg| T::register(reg, cfg))
530    }
531
532    /// Registers a self-registering record type with a custom key
533    ///
534    /// The record type must implement `RecordT<R>`.
535    pub fn register_record_with_key<T>(&mut self, key: impl RecordKey, cfg: &T::Config) -> &mut Self
536    where
537        T: RecordT<R>,
538    {
539        self.configure::<T>(key, |reg| T::register(reg, cfg))
540    }
541
542    /// Runs the database indefinitely (never returns)
543    ///
544    /// This method builds the database, spawns all producer and consumer tasks, and then
545    /// parks the current task indefinitely. This is the primary way to run AimDB services.
546    ///
547    /// All logic runs in background tasks via producers, consumers, and connectors. The
548    /// application continues until interrupted (e.g., Ctrl+C).
549    ///
550    /// # Returns
551    /// `DbResult<()>` - Ok when database starts successfully, then parks forever
552    ///
553    /// # Example
554    ///
555    /// ```rust,ignore
556    /// #[tokio::main]
557    /// async fn main() -> DbResult<()> {
558    ///     AimDbBuilder::new()
559    ///         .runtime(Arc::new(TokioAdapter::new()?))
560    ///         .configure::<MyData>(|reg| {
561    ///             reg.with_buffer(BufferCfg::SpmcRing { capacity: 100 })
562    ///                .with_source(my_producer)
563    ///                .with_tap(my_consumer);
564    ///         })
565    ///         .run().await  // Runs forever
566    /// }
567    /// ```
568    pub async fn run(self) -> DbResult<()> {
569        #[cfg(feature = "tracing")]
570        tracing::info!("Building database and spawning background tasks...");
571
572        let _db = self.build().await?;
573
574        #[cfg(feature = "tracing")]
575        tracing::info!("Database running, background tasks active. Press Ctrl+C to stop.");
576
577        // Park indefinitely - the background tasks will continue running
578        // The database handle is kept alive here to prevent dropping it
579        core::future::pending::<()>().await;
580
581        Ok(())
582    }
583
584    /// Builds the database and returns the handle (async)
585    ///
586    /// Use this when you need programmatic access to the database handle for
587    /// manual subscriptions or production. For typical services, use `.run().await` instead.
588    ///
589    /// **Automatic Task Spawning:** This method spawns all producer services and
590    /// `.tap()` observer tasks that were registered during configuration.
591    ///
592    /// **Connector Setup:** Connectors must be created manually before calling `build()`:
593    ///
594    /// ```rust,ignore
595    /// use aimdb_mqtt_connector::{MqttConnector, router::RouterBuilder};
596    ///
597    /// // Configure records with connector links
598    /// let builder = AimDbBuilder::new()
599    ///     .runtime(runtime)
600    ///     .configure::<Temp>("temp", |reg| {
601    ///         reg.link_from("mqtt://commands/temp")
602    ///            .with_buffer(BufferCfg::SingleLatest)
603    ///            .with_serialization();
604    ///     });
605    ///
606    /// // Create MQTT connector with router
607    /// let router = RouterBuilder::new()
608    ///     .route("commands/temp", /* deserializer */)
609    ///     .build();
610    /// let connector = MqttConnector::new("mqtt://localhost:1883", router).await?;
611    ///
612    /// // Register connector and build
613    /// let db = builder
614    ///     .with_connector("mqtt", Arc::new(connector))
615    ///     .build().await?;
616    /// ```
617    ///
618    /// # Returns
619    /// `DbResult<AimDb<R>>` - The database instance
620    #[cfg_attr(not(feature = "std"), allow(unused_mut))]
621    pub async fn build(self) -> DbResult<AimDb<R>> {
622        use crate::DbError;
623
624        // Validate all records
625        for (key, _, record) in &self.records {
626            record.validate().map_err(|_msg| {
627                // Suppress unused warning for key in no_std
628                let _ = &key;
629                #[cfg(feature = "std")]
630                {
631                    DbError::RuntimeError {
632                        message: format!("Record '{}' validation failed: {}", key.as_str(), _msg),
633                    }
634                }
635                #[cfg(not(feature = "std"))]
636                {
637                    DbError::RuntimeError { _message: () }
638                }
639            })?;
640        }
641
642        // Ensure runtime is set
643        let runtime = self.runtime.ok_or({
644            #[cfg(feature = "std")]
645            {
646                DbError::RuntimeError {
647                    message: "runtime not set (use .runtime())".into(),
648                }
649            }
650            #[cfg(not(feature = "std"))]
651            {
652                DbError::RuntimeError { _message: () }
653            }
654        })?;
655
656        // Build the new index structures
657        let record_count = self.records.len();
658        let mut storages: Vec<Box<dyn AnyRecord>> = Vec::with_capacity(record_count);
659        let mut by_key: HashMap<StringKey, RecordId> = HashMap::with_capacity(record_count);
660        let mut by_type: HashMap<TypeId, Vec<RecordId>> = HashMap::new();
661        let mut types: Vec<TypeId> = Vec::with_capacity(record_count);
662        let mut keys: Vec<StringKey> = Vec::with_capacity(record_count);
663
664        for (i, (key, type_id, record)) in self.records.into_iter().enumerate() {
665            let id = RecordId::new(i as u32);
666
667            // Check for duplicate keys (should not happen if configure() is used correctly)
668            if by_key.contains_key(&key) {
669                #[cfg(feature = "std")]
670                return Err(DbError::DuplicateRecordKey {
671                    key: key.as_str().to_string(),
672                });
673                #[cfg(not(feature = "std"))]
674                return Err(DbError::DuplicateRecordKey { _key: () });
675            }
676
677            // Build index structures
678            storages.push(record);
679            by_key.insert(key, id);
680            by_type.entry(type_id).or_default().push(id);
681            types.push(type_id);
682            keys.push(key);
683        }
684
685        let inner = Arc::new(AimDbInner {
686            storages,
687            by_key,
688            by_type,
689            types,
690            keys,
691        });
692
693        let db = Arc::new(AimDb {
694            inner: inner.clone(),
695            runtime: runtime.clone(),
696        });
697
698        #[cfg(feature = "tracing")]
699        tracing::info!(
700            "Spawning producer services and tap observers for {} records",
701            self.spawn_fns.len()
702        );
703
704        // Execute spawn functions for each record
705        for (key, spawn_fn_any) in self.spawn_fns {
706            // Resolve key to RecordId
707            let id = inner.resolve(&key).ok_or({
708                #[cfg(feature = "std")]
709                {
710                    DbError::RecordKeyNotFound {
711                        key: key.as_str().to_string(),
712                    }
713                }
714                #[cfg(not(feature = "std"))]
715                {
716                    DbError::RecordKeyNotFound { _key: () }
717                }
718            })?;
719
720            // Downcast from Box<dyn Any> back to the concrete spawn function type
721            type SpawnFnType<R> =
722                Box<dyn FnOnce(&Arc<R>, &Arc<AimDb<R>>, RecordId) -> DbResult<()> + Send>;
723
724            let spawn_fn = spawn_fn_any
725                .downcast::<SpawnFnType<R>>()
726                .expect("spawn function type mismatch");
727
728            // Execute the spawn function
729            (*spawn_fn)(&runtime, &db, id)?;
730        }
731
732        #[cfg(feature = "tracing")]
733        tracing::info!("Automatic spawning complete");
734
735        // Spawn remote access supervisor if configured (std only)
736        #[cfg(feature = "std")]
737        if let Some(remote_cfg) = self.remote_config {
738            #[cfg(feature = "tracing")]
739            tracing::info!(
740                "Spawning remote access supervisor on socket: {}",
741                remote_cfg.socket_path.display()
742            );
743
744            // Apply security policy to mark writable records
745            let writable_keys = remote_cfg.security_policy.writable_records();
746            for key_str in writable_keys {
747                if let Some(id) = inner.resolve_str(&key_str) {
748                    #[cfg(feature = "tracing")]
749                    tracing::debug!("Marking record '{}' as writable", key_str);
750
751                    // Mark the record as writable (type-erased call)
752                    inner.storages[id.index()].set_writable_erased(true);
753                }
754            }
755
756            // Spawn the remote supervisor task
757            crate::remote::supervisor::spawn_supervisor(db.clone(), runtime.clone(), remote_cfg)?;
758
759            #[cfg(feature = "tracing")]
760            tracing::info!("Remote access supervisor spawned successfully");
761        }
762
763        // Build connectors from builders (after database is fully constructed)
764        // This allows connectors to use collect_inbound_routes() which creates
765        // producers tied to this specific database instance
766        for builder in self.connector_builders {
767            #[cfg(feature = "tracing")]
768            let scheme = {
769                #[cfg(feature = "std")]
770                {
771                    builder.scheme().to_string()
772                }
773                #[cfg(not(feature = "std"))]
774                {
775                    alloc::string::String::from(builder.scheme())
776                }
777            };
778
779            #[cfg(feature = "tracing")]
780            tracing::debug!("Building connector for scheme: {}", scheme);
781
782            // Build the connector (this spawns tasks as a side effect)
783            let _connector = builder.build(&db).await?;
784
785            #[cfg(feature = "tracing")]
786            tracing::info!("Connector built and spawned successfully: {}", scheme);
787        }
788
789        // Unwrap the Arc to return the owned AimDb
790        // This is safe because we just created it and hold the only reference
791        let db_owned = Arc::try_unwrap(db).unwrap_or_else(|arc| (*arc).clone());
792
793        Ok(db_owned)
794    }
795}
796
797impl Default for AimDbBuilder<NoRuntime> {
798    fn default() -> Self {
799        Self::new()
800    }
801}
802
803/// Producer-consumer database
804///
805/// A database instance with type-safe record registration and cross-record
806/// communication via the Emitter pattern. The type parameter `R` represents
807/// the runtime adapter (e.g., TokioAdapter, EmbassyAdapter).
808///
809/// See `examples/` for usage.
810///
811/// # Examples
812///
813/// ```rust,ignore
814/// use aimdb_tokio_adapter::TokioAdapter;
815///
816/// let runtime = Arc::new(TokioAdapter);
817/// let db: AimDb<TokioAdapter> = AimDbBuilder::new()
818///     .runtime(runtime)
819///     .register_record::<Temperature>(&TemperatureConfig)
820///     .build()?;
821/// ```
822pub struct AimDb<R: aimdb_executor::Spawn + 'static> {
823    /// Internal state
824    inner: Arc<AimDbInner>,
825
826    /// Runtime adapter with concrete type
827    runtime: Arc<R>,
828}
829
830impl<R: aimdb_executor::Spawn + 'static> Clone for AimDb<R> {
831    fn clone(&self) -> Self {
832        Self {
833            inner: self.inner.clone(),
834            runtime: self.runtime.clone(),
835        }
836    }
837}
838
839impl<R: aimdb_executor::Spawn + 'static> AimDb<R> {
840    /// Internal accessor for the inner state
841    ///
842    /// Used by adapter crates and internal spawning logic.
843    #[doc(hidden)]
844    pub fn inner(&self) -> &Arc<AimDbInner> {
845        &self.inner
846    }
847
848    /// Builds a database with a closure-based builder pattern
849    pub async fn build_with(rt: Arc<R>, f: impl FnOnce(&mut AimDbBuilder<R>)) -> DbResult<()> {
850        let mut b = AimDbBuilder::new().runtime(rt);
851        f(&mut b);
852        b.run().await
853    }
854
855    /// Spawns a task using the database's runtime adapter
856    ///
857    /// This method provides direct access to the runtime's spawn capability.
858    ///
859    /// # Arguments
860    /// * `future` - The future to spawn
861    ///
862    /// # Returns
863    /// `DbResult<()>` - Ok if the task was spawned successfully
864    pub fn spawn_task<F>(&self, future: F) -> DbResult<()>
865    where
866        F: core::future::Future<Output = ()> + Send + 'static,
867    {
868        self.runtime.spawn(future).map_err(DbError::from)?;
869        Ok(())
870    }
871
872    /// Produces a value to a specific record by key
873    ///
874    /// Uses O(1) key-based lookup to find the correct record.
875    ///
876    /// # Arguments
877    /// * `key` - The record key (e.g., "sensor.temperature")
878    /// * `value` - The value to produce
879    ///
880    /// # Example
881    ///
882    /// ```rust,ignore
883    /// db.produce::<Temperature>("sensors.indoor", indoor_temp).await?;
884    /// db.produce::<Temperature>("sensors.outdoor", outdoor_temp).await?;
885    /// ```
886    pub async fn produce<T>(&self, key: impl AsRef<str>, value: T) -> DbResult<()>
887    where
888        T: Send + 'static + Debug + Clone,
889    {
890        let typed_rec = self.inner.get_typed_record_by_key::<T, R>(key)?;
891        typed_rec.produce(value).await;
892        Ok(())
893    }
894
895    /// Subscribes to a specific record by key
896    ///
897    /// Uses O(1) key-based lookup to find the correct record.
898    ///
899    /// # Arguments
900    /// * `key` - The record key (e.g., "sensor.temperature")
901    ///
902    /// # Example
903    ///
904    /// ```rust,ignore
905    /// let mut reader = db.subscribe::<Temperature>("sensors.indoor")?;
906    /// while let Ok(temp) = reader.recv().await {
907    ///     println!("Indoor: {:.1}°C", temp.celsius);
908    /// }
909    /// ```
910    pub fn subscribe<T>(
911        &self,
912        key: impl AsRef<str>,
913    ) -> DbResult<Box<dyn crate::buffer::BufferReader<T> + Send>>
914    where
915        T: Send + Sync + 'static + Debug + Clone,
916    {
917        let typed_rec = self.inner.get_typed_record_by_key::<T, R>(key)?;
918        typed_rec.subscribe()
919    }
920
921    /// Creates a type-safe producer for a specific record by key
922    ///
923    /// Returns a `Producer<T, R>` bound to a specific record key.
924    ///
925    /// # Arguments
926    /// * `key` - The record key (e.g., "sensor.temperature")
927    ///
928    /// # Example
929    ///
930    /// ```rust,ignore
931    /// let indoor_producer = db.producer::<Temperature>("sensors.indoor");
932    /// let outdoor_producer = db.producer::<Temperature>("sensors.outdoor");
933    ///
934    /// // Each producer writes to its own record
935    /// indoor_producer.produce(indoor_temp).await?;
936    /// outdoor_producer.produce(outdoor_temp).await?;
937    /// ```
938    pub fn producer<T>(
939        &self,
940        key: impl Into<alloc::string::String>,
941    ) -> crate::typed_api::Producer<T, R>
942    where
943        T: Send + 'static + Debug + Clone,
944    {
945        crate::typed_api::Producer::new(Arc::new(self.clone()), key.into())
946    }
947
948    /// Creates a type-safe consumer for a specific record by key
949    ///
950    /// Returns a `Consumer<T, R>` bound to a specific record key.
951    ///
952    /// # Arguments
953    /// * `key` - The record key (e.g., "sensor.temperature")
954    ///
955    /// # Example
956    ///
957    /// ```rust,ignore
958    /// let indoor_consumer = db.consumer::<Temperature>("sensors.indoor");
959    /// let outdoor_consumer = db.consumer::<Temperature>("sensors.outdoor");
960    ///
961    /// // Each consumer reads from its own record
962    /// let mut rx = indoor_consumer.subscribe()?;
963    /// ```
964    pub fn consumer<T>(
965        &self,
966        key: impl Into<alloc::string::String>,
967    ) -> crate::typed_api::Consumer<T, R>
968    where
969        T: Send + Sync + 'static + Debug + Clone,
970    {
971        crate::typed_api::Consumer::new(Arc::new(self.clone()), key.into())
972    }
973
974    /// Resolve a record key to its RecordId
975    ///
976    /// Useful for checking if a record exists before operations.
977    ///
978    /// # Example
979    ///
980    /// ```rust,ignore
981    /// if let Some(id) = db.resolve_key("sensors.temperature") {
982    ///     println!("Record exists with ID: {}", id);
983    /// }
984    /// ```
985    pub fn resolve_key(&self, key: &str) -> Option<crate::record_id::RecordId> {
986        self.inner.resolve_str(key)
987    }
988
989    /// Get all record IDs for a specific type
990    ///
991    /// Returns a slice of RecordIds for all records of type T.
992    /// Useful for introspection when multiple records of the same type exist.
993    ///
994    /// # Example
995    ///
996    /// ```rust,ignore
997    /// let temp_ids = db.records_of_type::<Temperature>();
998    /// println!("Found {} temperature records", temp_ids.len());
999    /// ```
1000    pub fn records_of_type<T: 'static>(&self) -> &[crate::record_id::RecordId] {
1001        self.inner.records_of_type::<T>()
1002    }
1003
1004    /// Returns a reference to the runtime adapter
1005    ///
1006    /// Provides direct access to the concrete runtime type.
1007    pub fn runtime(&self) -> &R {
1008        &self.runtime
1009    }
1010
1011    /// Lists all registered records (std only)
1012    ///
1013    /// Returns metadata for all registered records, useful for remote access introspection.
1014    /// Available only when the `std` feature is enabled.
1015    ///
1016    /// # Example
1017    /// ```rust,ignore
1018    /// let records = db.list_records();
1019    /// for record in records {
1020    ///     println!("Record: {} ({})", record.name, record.type_id);
1021    /// }
1022    /// ```
1023    #[cfg(feature = "std")]
1024    pub fn list_records(&self) -> Vec<crate::remote::RecordMetadata> {
1025        self.inner.list_records()
1026    }
1027
1028    /// Try to get record's latest value as JSON by name (std only)
1029    ///
1030    /// Convenience wrapper around `AimDbInner::try_latest_as_json()`.
1031    ///
1032    /// # Arguments
1033    /// * `record_name` - The full Rust type name (e.g., "server::Temperature")
1034    ///
1035    /// # Returns
1036    /// `Some(JsonValue)` with current value, or `None` if unavailable
1037    #[cfg(feature = "std")]
1038    pub fn try_latest_as_json(&self, record_name: &str) -> Option<serde_json::Value> {
1039        self.inner.try_latest_as_json(record_name)
1040    }
1041
1042    /// Sets a record value from JSON (remote access API)
1043    ///
1044    /// Deserializes JSON and produces the value to the record's buffer.
1045    ///
1046    /// **SAFETY:** Enforces "No Producer Override" rule - only works for configuration
1047    /// records without active producers.
1048    ///
1049    /// # Arguments
1050    /// * `record_name` - Full Rust type name
1051    /// * `json_value` - JSON value to set
1052    ///
1053    /// # Returns
1054    /// `Ok(())` on success, error if record not found, has producers, or deserialization fails
1055    ///
1056    /// # Example (internal use)
1057    /// ```rust,ignore
1058    /// db.set_record_from_json("AppConfig", json!({"debug": true}))?;
1059    /// ```
1060    #[cfg(feature = "std")]
1061    pub fn set_record_from_json(
1062        &self,
1063        record_name: &str,
1064        json_value: serde_json::Value,
1065    ) -> DbResult<()> {
1066        self.inner.set_record_from_json(record_name, json_value)
1067    }
1068
1069    /// Subscribe to record updates as JSON stream (std only)
1070    ///
1071    /// Creates a subscription to a record's buffer and forwards updates as JSON
1072    /// to a bounded channel. This is used internally by the remote access protocol
1073    /// for implementing `record.subscribe`.
1074    ///
1075    /// # Architecture
1076    ///
1077    /// Spawns a consumer task that:
1078    /// 1. Subscribes to the record's buffer using the existing buffer API
1079    /// 2. Reads values as they arrive
1080    /// 3. Serializes each value to JSON
1081    /// 4. Sends JSON values to a bounded channel (with backpressure handling)
1082    /// 5. Terminates when either:
1083    ///    - The cancel signal is received (unsubscribe)
1084    ///    - The channel receiver is dropped (client disconnected)
1085    ///
1086    /// # Arguments
1087    /// * `record_key` - Key of the record to subscribe to
1088    /// * `queue_size` - Size of the bounded channel for this subscription
1089    ///
1090    /// # Returns
1091    /// `Ok((receiver, cancel_tx))` where:
1092    /// - `receiver`: Bounded channel receiver for JSON values
1093    /// - `cancel_tx`: One-shot sender to cancel the subscription
1094    ///
1095    /// `Err` if:
1096    /// - Record not found for the given key
1097    /// - Record not configured with `.with_serialization()`
1098    /// - Failed to subscribe to buffer
1099    ///
1100    /// # Example (internal use)
1101    ///
1102    /// ```rust,ignore
1103    /// let (mut rx, cancel_tx) = db.subscribe_record_updates("sensor.temp", 100)?;
1104    ///
1105    /// // Read events
1106    /// while let Some(json_value) = rx.recv().await {
1107    ///     // Forward to client...
1108    /// }
1109    ///
1110    /// // Cancel subscription
1111    /// let _ = cancel_tx.send(());
1112    /// ```
1113    #[cfg(feature = "std")]
1114    #[allow(unused_variables)] // Variables used only in tracing feature
1115    pub fn subscribe_record_updates(
1116        &self,
1117        record_key: &str,
1118        queue_size: usize,
1119    ) -> DbResult<(
1120        tokio::sync::mpsc::Receiver<serde_json::Value>,
1121        tokio::sync::oneshot::Sender<()>,
1122    )> {
1123        use tokio::sync::{mpsc, oneshot};
1124
1125        // Find the record by key
1126        let id = self
1127            .inner
1128            .resolve_str(record_key)
1129            .ok_or_else(|| DbError::RecordKeyNotFound {
1130                key: record_key.to_string(),
1131            })?;
1132
1133        let record = self
1134            .inner
1135            .storage(id)
1136            .ok_or_else(|| DbError::InvalidRecordId { id: id.raw() })?;
1137
1138        // Subscribe to the record's buffer as JSON stream
1139        // This will fail if record not configured with .with_serialization()
1140        let mut json_reader = record.subscribe_json()?;
1141
1142        // Create channels for the subscription
1143        let (value_tx, value_rx) = mpsc::channel(queue_size);
1144        let (cancel_tx, mut cancel_rx) = oneshot::channel();
1145
1146        // Get metadata for logging
1147        let type_id = self.inner.types[id.index()];
1148        let key = self.inner.keys[id.index()];
1149        let record_metadata = record.collect_metadata(type_id, key, id);
1150        let runtime = self.runtime.clone();
1151
1152        // Spawn consumer task that forwards JSON values from buffer to channel
1153        let spawn_result = runtime.spawn(async move {
1154            #[cfg(feature = "tracing")]
1155            tracing::debug!(
1156                "Subscription consumer task started for {}",
1157                record_metadata.name
1158            );
1159
1160            // Main event loop: read from buffer and forward to channel
1161            loop {
1162                tokio::select! {
1163                    // Handle cancellation signal
1164                    _ = &mut cancel_rx => {
1165                        #[cfg(feature = "tracing")]
1166                        tracing::debug!("Subscription cancelled");
1167                        break;
1168                    }
1169                    // Read next JSON value from buffer
1170                    result = json_reader.recv_json() => {
1171                        match result {
1172                            Ok(json_val) => {
1173                                // Send JSON value to subscription channel
1174                                if value_tx.send(json_val).await.is_err() {
1175                                    #[cfg(feature = "tracing")]
1176                                    tracing::debug!("Subscription receiver dropped");
1177                                    break;
1178                                }
1179                            }
1180                            Err(DbError::BufferLagged { lag_count, .. }) => {
1181                                // Consumer fell behind - log warning but continue
1182                                #[cfg(feature = "tracing")]
1183                                tracing::warn!(
1184                                    "Subscription for {} lagged by {} messages",
1185                                    record_metadata.name,
1186                                    lag_count
1187                                );
1188                                // Continue reading - next recv will get latest
1189                            }
1190                            Err(DbError::BufferClosed { .. }) => {
1191                                // Buffer closed (shutdown) - exit gracefully
1192                                #[cfg(feature = "tracing")]
1193                                tracing::debug!("Buffer closed for {}", record_metadata.name);
1194                                break;
1195                            }
1196                            Err(e) => {
1197                                // Other error (shouldn't happen in practice)
1198                                #[cfg(feature = "tracing")]
1199                                tracing::error!(
1200                                    "Subscription error for {}: {:?}",
1201                                    record_metadata.name,
1202                                    e
1203                                );
1204                                break;
1205                            }
1206                        }
1207                    }
1208                }
1209            }
1210
1211            #[cfg(feature = "tracing")]
1212            tracing::debug!("Subscription consumer task terminated");
1213        });
1214
1215        spawn_result.map_err(DbError::from)?;
1216
1217        Ok((value_rx, cancel_tx))
1218    }
1219
1220    /// Collects inbound connector routes for automatic router construction (std only)
1221    ///
1222    /// Iterates all records, filters their inbound_connectors by scheme,
1223    /// and returns routes with producer creation callbacks.
1224    ///
1225    /// # Arguments
1226    /// * `scheme` - URL scheme to filter by (e.g., "mqtt", "kafka")
1227    ///
1228    /// # Returns
1229    /// Vector of tuples: (topic, producer_trait, deserializer)
1230    ///
1231    /// # Example
1232    /// ```rust,ignore
1233    /// // In MqttConnector after db.build()
1234    /// let routes = db.collect_inbound_routes("mqtt");
1235    /// let router = RouterBuilder::from_routes(routes).build();
1236    /// connector.set_router(router).await?;
1237    /// ```
1238    #[cfg(feature = "alloc")]
1239    pub fn collect_inbound_routes(
1240        &self,
1241        scheme: &str,
1242    ) -> Vec<(
1243        String,
1244        Box<dyn crate::connector::ProducerTrait>,
1245        crate::connector::DeserializerFn,
1246    )> {
1247        let mut routes = Vec::new();
1248
1249        // Convert self to Arc<dyn Any> for producer factory
1250        let db_any: Arc<dyn core::any::Any + Send + Sync> = Arc::new(self.clone());
1251
1252        for record in &self.inner.storages {
1253            let inbound_links = record.inbound_connectors();
1254
1255            for link in inbound_links {
1256                // Filter by scheme
1257                if link.url.scheme() != scheme {
1258                    continue;
1259                }
1260
1261                let topic = link.url.resource_id();
1262
1263                // Create producer using the stored factory
1264                if let Some(producer) = link.create_producer(db_any.clone()) {
1265                    routes.push((topic, producer, link.deserializer.clone()));
1266                }
1267            }
1268        }
1269
1270        #[cfg(feature = "tracing")]
1271        if !routes.is_empty() {
1272            tracing::debug!(
1273                "Collected {} inbound routes for scheme '{}'",
1274                routes.len(),
1275                scheme
1276            );
1277        }
1278
1279        routes
1280    }
1281
1282    /// Collects outbound routes for a specific protocol scheme
1283    ///
1284    /// Mirrors `collect_inbound_routes()` for symmetry. Iterates all records,
1285    /// filters their outbound_connectors by scheme, and returns routes with
1286    /// consumer creation callbacks.
1287    ///
1288    /// This method is called by connectors during their `build()` phase to
1289    /// collect all configured outbound routes and spawn publisher tasks.
1290    ///
1291    /// # Arguments
1292    /// * `scheme` - URL scheme to filter by (e.g., "mqtt", "kafka")
1293    ///
1294    /// # Returns
1295    /// Vector of tuples: (destination, consumer_trait, serializer, config)
1296    ///
1297    /// The config Vec contains protocol-specific options (e.g., qos, retain).
1298    ///
1299    /// # Example
1300    /// ```rust,ignore
1301    /// // In MqttConnector::build()
1302    /// let routes = db.collect_outbound_routes("mqtt");
1303    /// for (topic, consumer, serializer, config) in routes {
1304    ///     connector.spawn_publisher(topic, consumer, serializer, config)?;
1305    /// }
1306    /// ```
1307    #[cfg(feature = "alloc")]
1308    pub fn collect_outbound_routes(&self, scheme: &str) -> Vec<OutboundRoute> {
1309        let mut routes = Vec::new();
1310
1311        // Convert self to Arc<dyn Any> for consumer factory
1312        // This is necessary because the factory takes Arc<dyn Any> to avoid
1313        // needing to know the runtime type R at the factory definition site
1314        let db_any: Arc<dyn core::any::Any + Send + Sync> = Arc::new(self.clone());
1315
1316        for record in &self.inner.storages {
1317            let outbound_links = record.outbound_connectors();
1318
1319            for link in outbound_links {
1320                // Filter by scheme
1321                if link.url.scheme() != scheme {
1322                    continue;
1323                }
1324
1325                let destination = link.url.resource_id();
1326
1327                // Skip links without serializer
1328                let Some(serializer) = link.serializer.clone() else {
1329                    #[cfg(feature = "tracing")]
1330                    tracing::warn!("Outbound link '{}' has no serializer, skipping", link.url);
1331                    continue;
1332                };
1333
1334                // Create consumer using the stored factory
1335                if let Some(consumer) = link.create_consumer(db_any.clone()) {
1336                    routes.push((destination, consumer, serializer, link.config.clone()));
1337                }
1338            }
1339        }
1340
1341        #[cfg(feature = "tracing")]
1342        if !routes.is_empty() {
1343            tracing::debug!(
1344                "Collected {} outbound routes for scheme '{}'",
1345                routes.len(),
1346                scheme
1347            );
1348        }
1349
1350        routes
1351    }
1352}