aimdb_core/
builder.rs

1//! Database builder with type-safe record registration
2//!
3//! Provides `AimDb` and `AimDbBuilder` for constructing databases with
4//! type-safe, self-registering records using the producer-consumer pattern.
5
6use core::any::TypeId;
7use core::fmt::Debug;
8use core::marker::PhantomData;
9
10extern crate alloc;
11
12use alloc::collections::BTreeMap;
13
14#[cfg(not(feature = "std"))]
15use alloc::{boxed::Box, sync::Arc, vec::Vec};
16
17#[cfg(all(not(feature = "std"), feature = "alloc"))]
18use alloc::string::String;
19
20#[cfg(feature = "std")]
21use std::{boxed::Box, sync::Arc, vec::Vec};
22
23use crate::typed_api::{RecordRegistrar, RecordT};
24use crate::typed_record::{AnyRecord, AnyRecordExt, TypedRecord};
25use crate::{DbError, DbResult};
26
27/// Type alias for outbound route tuples returned by `collect_outbound_routes`
28///
29/// Each tuple contains:
30/// - `String` - Topic/key from the URL path
31/// - `Box<dyn ConsumerTrait>` - Callback to create a consumer for this record
32/// - `SerializerFn` - User-provided serializer for the record type
33/// - `Vec<(String, String)>` - Configuration options from the URL query
34#[cfg(feature = "alloc")]
35type OutboundRoute = (
36    String,
37    Box<dyn crate::connector::ConsumerTrait>,
38    crate::connector::SerializerFn,
39    Vec<(String, String)>,
40);
41
42/// Marker type for untyped builder (before runtime is set)
43pub struct NoRuntime;
44
45/// Internal database state
46///
47/// Holds the registry of typed records, indexed by `TypeId`.
48pub struct AimDbInner {
49    /// Map from TypeId to type-erased records (SPMC buffers for internal data flow)
50    pub records: BTreeMap<TypeId, Box<dyn AnyRecord>>,
51}
52
53impl AimDbInner {
54    /// Helper to get a typed record from the registry
55    ///
56    /// This encapsulates the common pattern of:
57    /// 1. Getting TypeId for type T
58    /// 2. Looking up the record in the map
59    /// 3. Downcasting to the typed record
60    pub fn get_typed_record<T, R>(&self) -> DbResult<&TypedRecord<T, R>>
61    where
62        T: Send + 'static + Debug + Clone,
63        R: aimdb_executor::Spawn + 'static,
64    {
65        use crate::typed_record::AnyRecordExt;
66
67        let type_id = TypeId::of::<T>();
68
69        #[cfg(feature = "std")]
70        let record = self.records.get(&type_id).ok_or(DbError::RecordNotFound {
71            record_name: core::any::type_name::<T>().to_string(),
72        })?;
73
74        #[cfg(not(feature = "std"))]
75        let record = self
76            .records
77            .get(&type_id)
78            .ok_or(DbError::RecordNotFound { _record_name: () })?;
79
80        #[cfg(feature = "std")]
81        let typed_record = record.as_typed::<T, R>().ok_or(DbError::InvalidOperation {
82            operation: "get_typed_record".to_string(),
83            reason: "type mismatch during downcast".to_string(),
84        })?;
85
86        #[cfg(not(feature = "std"))]
87        let typed_record = record.as_typed::<T, R>().ok_or(DbError::InvalidOperation {
88            _operation: (),
89            _reason: (),
90        })?;
91
92        Ok(typed_record)
93    }
94
95    /// Collects metadata for all registered records (std only)
96    ///
97    /// Returns a vector of `RecordMetadata` for remote access introspection.
98    /// Available only when the `std` feature is enabled.
99    #[cfg(feature = "std")]
100    pub fn list_records(&self) -> Vec<crate::remote::RecordMetadata> {
101        self.records
102            .iter()
103            .map(|(type_id, record)| record.collect_metadata(*type_id))
104            .collect()
105    }
106
107    /// Try to get record's latest value as JSON by record name (std only)
108    ///
109    /// Searches for a record with the given name and returns its current value
110    /// serialized to JSON. Returns `None` if:
111    /// - Record not found
112    /// - Record not configured with `.with_serialization()`
113    /// - No value available in the atomic snapshot
114    ///
115    /// # Arguments
116    /// * `record_name` - The full Rust type name (e.g., "server::Temperature")
117    ///
118    /// # Returns
119    /// `Some(JsonValue)` with the current record value, or `None`
120    #[cfg(feature = "std")]
121    pub fn try_latest_as_json(&self, record_name: &str) -> Option<serde_json::Value> {
122        for (type_id, record) in &self.records {
123            let metadata = record.collect_metadata(*type_id);
124            if metadata.name == record_name {
125                return record.latest_json();
126            }
127        }
128        None
129    }
130
131    /// Sets a record value from JSON (remote access API)
132    ///
133    /// Deserializes the JSON value and writes it to the record's buffer.
134    ///
135    /// **SAFETY:** Enforces the "No Producer Override" rule:
136    /// - Only works for records with `producer_count == 0`
137    /// - Returns error if the record has active producers
138    ///
139    /// # Arguments
140    /// * `record_name` - The full Rust type name (e.g., "server::AppConfig")
141    /// * `json_value` - JSON representation of the value
142    ///
143    /// # Returns
144    /// - `Ok(())` - Successfully set the value
145    /// - `Err(DbError)` - If record not found, has producers, or deserialization fails
146    ///
147    /// # Errors
148    /// - `RecordNotFound` - Record with given name doesn't exist
149    /// - `PermissionDenied` - Record has active producers (safety check)
150    /// - `RuntimeError` - Record not configured with `.with_serialization()`
151    /// - `JsonWithContext` - JSON deserialization failed (schema mismatch)
152    ///
153    /// # Example (internal use - called by remote access protocol)
154    /// ```rust,ignore
155    /// let json_val = serde_json::json!({"log_level": "debug", "version": "1.0"});
156    /// db.set_record_from_json("server::AppConfig", json_val)?;
157    /// ```
158    #[cfg(feature = "std")]
159    pub fn set_record_from_json(
160        &self,
161        record_name: &str,
162        json_value: serde_json::Value,
163    ) -> DbResult<()> {
164        // Find the record by name
165        for (type_id, record) in &self.records {
166            let metadata = record.collect_metadata(*type_id);
167            if metadata.name == record_name {
168                // Delegate to the type-erased set_from_json method
169                // which will enforce the "no producer override" rule
170                return record.set_from_json(json_value);
171            }
172        }
173
174        // Record not found
175        Err(DbError::RecordNotFound {
176            record_name: record_name.to_string(),
177        })
178    }
179}
180
181/// Database builder for producer-consumer pattern
182///
183/// Provides a fluent API for constructing databases with type-safe record registration.
184/// Use `.runtime()` to set the runtime and transition to a typed builder.
185pub struct AimDbBuilder<R = NoRuntime> {
186    /// Registry of typed records
187    records: BTreeMap<TypeId, Box<dyn AnyRecord>>,
188
189    /// Runtime adapter
190    runtime: Option<Arc<R>>,
191
192    /// Connector builders that will be invoked during build()
193    connector_builders: Vec<Box<dyn crate::connector::ConnectorBuilder<R>>>,
194
195    /// Spawn functions indexed by TypeId
196    spawn_fns: BTreeMap<TypeId, Box<dyn core::any::Any + Send>>,
197
198    /// Remote access configuration (std only)
199    #[cfg(feature = "std")]
200    remote_config: Option<crate::remote::AimxConfig>,
201
202    /// PhantomData to track the runtime type parameter
203    _phantom: PhantomData<R>,
204}
205
206impl AimDbBuilder<NoRuntime> {
207    /// Creates a new database builder without a runtime
208    ///
209    /// Call `.runtime()` to set the runtime adapter.
210    pub fn new() -> Self {
211        Self {
212            records: BTreeMap::new(),
213            runtime: None,
214            connector_builders: Vec::new(),
215            spawn_fns: BTreeMap::new(),
216            #[cfg(feature = "std")]
217            remote_config: None,
218            _phantom: PhantomData,
219        }
220    }
221
222    /// Sets the runtime adapter
223    ///
224    /// This transitions the builder from untyped to typed with concrete runtime `R`.
225    ///
226    /// # Type Safety Note
227    ///
228    /// The `connector_builders` field is intentionally reset to `Vec::new()` during this
229    /// transition because connectors are parameterized by the runtime type:
230    ///
231    /// - Before: `Vec<Box<dyn ConnectorBuilder<NoRuntime>>>`
232    /// - After: `Vec<Box<dyn ConnectorBuilder<R>>>`
233    ///
234    /// These types are incompatible and cannot be transferred. However, this is not a bug
235    /// because `.with_connector()` is only available AFTER calling `.runtime()` (it's defined
236    /// in the `impl<R> where R: Spawn` block, not in `impl AimDbBuilder<NoRuntime>`).
237    ///
238    /// This means the type system **enforces** the correct call order:
239    /// ```rust,ignore
240    /// AimDbBuilder::new()
241    ///     .runtime(runtime)           // ← Must be called first
242    ///     .with_connector(connector)  // ← Now available
243    /// ```
244    ///
245    /// The `records` and `remote_config` are preserved across the transition since they
246    /// are not parameterized by the runtime type.
247    pub fn runtime<R>(self, rt: Arc<R>) -> AimDbBuilder<R>
248    where
249        R: aimdb_executor::Spawn + 'static,
250    {
251        AimDbBuilder {
252            records: self.records,
253            runtime: Some(rt),
254            connector_builders: Vec::new(),
255            spawn_fns: BTreeMap::new(),
256            #[cfg(feature = "std")]
257            remote_config: self.remote_config,
258            _phantom: PhantomData,
259        }
260    }
261}
262
263impl<R> AimDbBuilder<R>
264where
265    R: aimdb_executor::Spawn + 'static,
266{
267    /// Registers a connector builder that will be invoked during `build()`
268    ///
269    /// The connector builder will be called after the database is constructed,
270    /// allowing it to collect routes and initialize the connector properly.
271    ///
272    /// # Arguments
273    /// * `builder` - A connector builder that implements `ConnectorBuilder<R>`
274    ///
275    /// # Example
276    ///
277    /// ```rust,ignore
278    /// use aimdb_mqtt_connector::MqttConnector;
279    ///
280    /// let db = AimDbBuilder::new()
281    ///     .runtime(runtime)
282    ///     .with_connector(MqttConnector::new("mqtt://broker.local:1883"))
283    ///     .configure::<Temperature>(|reg| {
284    ///         reg.link_from("mqtt://commands/temp")...
285    ///     })
286    ///     .build().await?;
287    /// ```
288    pub fn with_connector(
289        mut self,
290        builder: impl crate::connector::ConnectorBuilder<R> + 'static,
291    ) -> Self {
292        self.connector_builders.push(Box::new(builder));
293        self
294    }
295
296    /// Enables remote access via AimX protocol (std only)
297    ///
298    /// Configures the database to accept remote connections over a Unix domain socket,
299    /// allowing external clients to introspect records, subscribe to updates, and
300    /// (optionally) write data.
301    ///
302    /// The remote access supervisor will be spawned automatically during `build()`.
303    ///
304    /// # Arguments
305    /// * `config` - Remote access configuration (socket path, security policy, etc.)
306    ///
307    /// # Example
308    ///
309    /// ```rust,ignore
310    /// use aimdb_core::remote::{AimxConfig, SecurityPolicy};
311    ///
312    /// let config = AimxConfig::new("/tmp/aimdb.sock")
313    ///     .with_security(SecurityPolicy::read_only());
314    ///
315    /// let db = AimDbBuilder::new()
316    ///     .runtime(runtime)
317    ///     .with_remote_access(config)
318    ///     .build()?;
319    /// ```
320    #[cfg(feature = "std")]
321    pub fn with_remote_access(mut self, config: crate::remote::AimxConfig) -> Self {
322        self.remote_config = Some(config);
323        self
324    }
325
326    /// Configures a record type manually
327    ///
328    /// Low-level method for advanced use cases. Most users should use `register_record` instead.
329    pub fn configure<T>(
330        &mut self,
331        f: impl for<'a> FnOnce(&'a mut RecordRegistrar<'a, T, R>),
332    ) -> &mut Self
333    where
334        T: Send + Sync + 'static + Debug + Clone,
335    {
336        let entry = self
337            .records
338            .entry(TypeId::of::<T>())
339            .or_insert_with(|| Box::new(TypedRecord::<T, R>::new()));
340
341        let rec = entry
342            .as_typed_mut::<T, R>()
343            .expect("type mismatch in record registry");
344
345        let mut reg = RecordRegistrar {
346            rec,
347            connector_builders: &self.connector_builders,
348        };
349        f(&mut reg);
350
351        // Store a spawn function that captures the concrete type T and connectors
352        let type_id = TypeId::of::<T>();
353
354        #[allow(clippy::type_complexity)]
355        let spawn_fn: Box<dyn FnOnce(&Arc<R>, &Arc<AimDb<R>>) -> DbResult<()> + Send> =
356            Box::new(move |runtime: &Arc<R>, db: &Arc<AimDb<R>>| {
357                // Use RecordSpawner to spawn tasks for this record type
358                use crate::typed_record::RecordSpawner;
359
360                let typed_record = db.inner().get_typed_record::<T, R>()?;
361                RecordSpawner::<T>::spawn_all_tasks(typed_record, runtime, db)
362            });
363
364        // Store the spawn function (type-erased in Box<dyn Any>)
365        self.spawn_fns.insert(type_id, Box::new(spawn_fn));
366
367        self
368    }
369
370    /// Registers a self-registering record type
371    ///
372    /// The record type must implement `RecordT<R>`.
373    pub fn register_record<T>(&mut self, cfg: &T::Config) -> &mut Self
374    where
375        T: RecordT<R>,
376    {
377        self.configure::<T>(|reg| T::register(reg, cfg))
378    }
379
380    /// Runs the database indefinitely (never returns)
381    ///
382    /// This method builds the database, spawns all producer and consumer tasks, and then
383    /// parks the current task indefinitely. This is the primary way to run AimDB services.
384    ///
385    /// All logic runs in background tasks via producers, consumers, and connectors. The
386    /// application continues until interrupted (e.g., Ctrl+C).
387    ///
388    /// # Returns
389    /// `DbResult<()>` - Ok when database starts successfully, then parks forever
390    ///
391    /// # Example
392    ///
393    /// ```rust,ignore
394    /// #[tokio::main]
395    /// async fn main() -> DbResult<()> {
396    ///     AimDbBuilder::new()
397    ///         .runtime(Arc::new(TokioAdapter::new()?))
398    ///         .configure::<MyData>(|reg| {
399    ///             reg.with_buffer(BufferCfg::SpmcRing { capacity: 100 })
400    ///                .with_source(my_producer)
401    ///                .with_tap(my_consumer);
402    ///         })
403    ///         .run().await  // Runs forever
404    /// }
405    /// ```
406    pub async fn run(self) -> DbResult<()> {
407        #[cfg(feature = "tracing")]
408        tracing::info!("Building database and spawning background tasks...");
409
410        let _db = self.build().await?;
411
412        #[cfg(feature = "tracing")]
413        tracing::info!("Database running, background tasks active. Press Ctrl+C to stop.");
414
415        // Park indefinitely - the background tasks will continue running
416        // The database handle is kept alive here to prevent dropping it
417        core::future::pending::<()>().await;
418
419        Ok(())
420    }
421
422    /// Builds the database and returns the handle (async)
423    ///
424    /// Use this when you need programmatic access to the database handle for
425    /// manual subscriptions or production. For typical services, use `.run().await` instead.
426    ///
427    /// **Automatic Task Spawning:** This method spawns all producer services and
428    /// `.tap()` observer tasks that were registered during configuration.
429    ///
430    /// **Connector Setup:** Connectors must be created manually before calling `build()`:
431    ///
432    /// ```rust,ignore
433    /// use aimdb_mqtt_connector::{MqttConnector, router::RouterBuilder};
434    ///
435    /// // Configure records with connector links
436    /// let builder = AimDbBuilder::new()
437    ///     .runtime(runtime)
438    ///     .configure::<Temp>(|reg| {
439    ///         reg.link_from("mqtt://commands/temp")
440    ///            .with_buffer(BufferCfg::SingleLatest)
441    ///            .with_serialization();
442    ///     });
443    ///
444    /// // Create MQTT connector with router
445    /// let router = RouterBuilder::new()
446    ///     .route("commands/temp", /* deserializer */)
447    ///     .build();
448    /// let connector = MqttConnector::new("mqtt://localhost:1883", router).await?;
449    ///
450    /// // Register connector and build
451    /// let db = builder
452    ///     .with_connector("mqtt", Arc::new(connector))
453    ///     .build().await?;
454    /// ```
455    ///
456    /// # Returns
457    /// `DbResult<AimDb<R>>` - The database instance
458    #[cfg_attr(not(feature = "std"), allow(unused_mut))]
459    pub async fn build(self) -> DbResult<AimDb<R>> {
460        use crate::DbError;
461
462        // Validate all records
463        for record in self.records.values() {
464            record.validate().map_err(|_msg| {
465                #[cfg(feature = "std")]
466                {
467                    DbError::RuntimeError {
468                        message: format!("Record validation failed: {}", _msg),
469                    }
470                }
471                #[cfg(not(feature = "std"))]
472                {
473                    DbError::RuntimeError { _message: () }
474                }
475            })?;
476        }
477
478        // Ensure runtime is set
479        let runtime = self.runtime.ok_or({
480            #[cfg(feature = "std")]
481            {
482                DbError::RuntimeError {
483                    message: "runtime not set (use .runtime())".into(),
484                }
485            }
486            #[cfg(not(feature = "std"))]
487            {
488                DbError::RuntimeError { _message: () }
489            }
490        })?;
491
492        let inner = Arc::new(AimDbInner {
493            records: self.records,
494        });
495
496        let db = Arc::new(AimDb {
497            inner: inner.clone(),
498            runtime: runtime.clone(),
499        });
500
501        #[cfg(feature = "tracing")]
502        tracing::info!(
503            "Spawning producer services and tap observers for {} record types",
504            self.spawn_fns.len()
505        );
506
507        // Execute spawn functions for each record type
508        for (_type_id, spawn_fn_any) in self.spawn_fns {
509            // Downcast from Box<dyn Any> back to the concrete spawn function type
510            type SpawnFnType<R> = Box<dyn FnOnce(&Arc<R>, &Arc<AimDb<R>>) -> DbResult<()> + Send>;
511
512            let spawn_fn = spawn_fn_any
513                .downcast::<SpawnFnType<R>>()
514                .expect("spawn function type mismatch");
515
516            // Execute the spawn function
517            (*spawn_fn)(&runtime, &db)?;
518        }
519
520        #[cfg(feature = "tracing")]
521        tracing::info!("Automatic spawning complete");
522
523        // Spawn remote access supervisor if configured (std only)
524        #[cfg(feature = "std")]
525        if let Some(remote_cfg) = self.remote_config {
526            #[cfg(feature = "tracing")]
527            tracing::info!(
528                "Spawning remote access supervisor on socket: {}",
529                remote_cfg.socket_path.display()
530            );
531
532            // Apply security policy to mark writable records
533            let writable_type_ids = remote_cfg.security_policy.writable_records();
534            for (type_id, record) in inner.records.iter() {
535                if writable_type_ids.contains(type_id) {
536                    #[cfg(feature = "tracing")]
537                    tracing::debug!("Marking record {:?} as writable", type_id);
538
539                    // Mark the record as writable (type-erased call)
540                    record.set_writable_erased(true);
541                }
542            }
543
544            // Spawn the remote supervisor task
545            // This will be implemented in Task 6
546            crate::remote::supervisor::spawn_supervisor(db.clone(), runtime.clone(), remote_cfg)?;
547
548            #[cfg(feature = "tracing")]
549            tracing::info!("Remote access supervisor spawned successfully");
550        }
551
552        // Build connectors from builders (after database is fully constructed)
553        // This allows connectors to use collect_inbound_routes() which creates
554        // producers tied to this specific database instance
555        let mut built_connectors = BTreeMap::new();
556        for builder in self.connector_builders {
557            #[cfg(feature = "std")]
558            let scheme = builder.scheme().to_string();
559
560            #[cfg(not(feature = "std"))]
561            let scheme = alloc::string::String::from(builder.scheme());
562
563            #[cfg(feature = "tracing")]
564            tracing::debug!("Building connector for scheme: {}", scheme);
565
566            let connector = builder.build(&db).await?;
567            built_connectors.insert(scheme.clone(), connector);
568
569            #[cfg(feature = "tracing")]
570            tracing::info!("Connector built and spawned successfully: {}", scheme);
571        }
572
573        // Unwrap the Arc to return the owned AimDb
574        // This is safe because we just created it and hold the only reference
575        let db_owned = Arc::try_unwrap(db).unwrap_or_else(|arc| (*arc).clone());
576
577        Ok(db_owned)
578    }
579}
580
581impl Default for AimDbBuilder<NoRuntime> {
582    fn default() -> Self {
583        Self::new()
584    }
585}
586
587/// Producer-consumer database
588///
589/// A database instance with type-safe record registration and cross-record
590/// communication via the Emitter pattern. The type parameter `R` represents
591/// the runtime adapter (e.g., TokioAdapter, EmbassyAdapter).
592///
593/// See `examples/` for usage.
594///
595/// # Examples
596///
597/// ```rust,ignore
598/// use aimdb_tokio_adapter::TokioAdapter;
599///
600/// let runtime = Arc::new(TokioAdapter);
601/// let db: AimDb<TokioAdapter> = AimDbBuilder::new()
602///     .runtime(runtime)
603///     .register_record::<Temperature>(&TemperatureConfig)
604///     .build()?;
605/// ```
606pub struct AimDb<R: aimdb_executor::Spawn + 'static> {
607    /// Internal state
608    inner: Arc<AimDbInner>,
609
610    /// Runtime adapter with concrete type
611    runtime: Arc<R>,
612}
613
614impl<R: aimdb_executor::Spawn + 'static> Clone for AimDb<R> {
615    fn clone(&self) -> Self {
616        Self {
617            inner: self.inner.clone(),
618            runtime: self.runtime.clone(),
619        }
620    }
621}
622
623impl<R: aimdb_executor::Spawn + 'static> AimDb<R> {
624    /// Internal accessor for the inner state
625    ///
626    /// Used by adapter crates and internal spawning logic.
627    #[doc(hidden)]
628    pub fn inner(&self) -> &Arc<AimDbInner> {
629        &self.inner
630    }
631
632    /// Builds a database with a closure-based builder pattern
633    pub async fn build_with(rt: Arc<R>, f: impl FnOnce(&mut AimDbBuilder<R>)) -> DbResult<()> {
634        let mut b = AimDbBuilder::new().runtime(rt);
635        f(&mut b);
636        b.run().await
637    }
638
639    /// Spawns a task using the database's runtime adapter
640    ///
641    /// This method provides direct access to the runtime's spawn capability.
642    ///
643    /// # Arguments
644    /// * `future` - The future to spawn
645    ///
646    /// # Returns
647    /// `DbResult<()>` - Ok if the task was spawned successfully
648    pub fn spawn_task<F>(&self, future: F) -> DbResult<()>
649    where
650        F: core::future::Future<Output = ()> + Send + 'static,
651    {
652        self.runtime.spawn(future).map_err(DbError::from)?;
653        Ok(())
654    }
655
656    /// Produces a value for a record type
657    ///
658    /// Writes the value to the record's buffer and triggers all consumers.
659    pub async fn produce<T>(&self, value: T) -> DbResult<()>
660    where
661        T: Send + 'static + Debug + Clone,
662    {
663        // Get the typed record using the helper
664        let typed_rec = self.inner.get_typed_record::<T, R>()?;
665
666        // Produce the value directly to the buffer
667        typed_rec.produce(value).await;
668        Ok(())
669    }
670
671    /// Subscribes to a record type's buffer
672    ///
673    /// Creates a subscription to the configured buffer for the given record type.
674    /// Returns a boxed reader for receiving values asynchronously.
675    ///
676    /// # Example
677    ///
678    /// ```rust,ignore
679    /// let mut reader = db.subscribe::<Temperature>()?;
680    ///
681    /// loop {
682    ///     match reader.recv().await {
683    ///         Ok(temp) => println!("Temperature: {:.1}°C", temp.celsius),
684    ///         Err(_) => break,
685    ///     }
686    /// }
687    /// ```
688    pub fn subscribe<T>(&self) -> DbResult<Box<dyn crate::buffer::BufferReader<T> + Send>>
689    where
690        T: Send + Sync + 'static + Debug + Clone,
691    {
692        // Get the typed record using the helper
693        let typed_rec = self.inner.get_typed_record::<T, R>()?;
694
695        // Subscribe to the buffer
696        typed_rec.subscribe()
697    }
698
699    /// Creates a type-safe producer for a specific record type
700    ///
701    /// Returns a `Producer<T, R>` that can only produce values of type `T`.
702    /// This is the recommended way to pass database access to producer services,
703    /// following the principle of least privilege.
704    ///
705    /// # Example
706    ///
707    /// ```rust,ignore
708    /// let db = builder.build()?;
709    /// let temp_producer = db.producer::<Temperature>();
710    ///
711    /// // Pass to service - it can only produce Temperature values
712    /// runtime.spawn(temperature_service(ctx, temp_producer)).unwrap();
713    /// ```
714    pub fn producer<T>(&self) -> crate::typed_api::Producer<T, R>
715    where
716        T: Send + 'static + Debug + Clone,
717    {
718        crate::typed_api::Producer::new(Arc::new(self.clone()))
719    }
720
721    /// Creates a type-safe consumer for a specific record type
722    ///
723    /// Returns a `Consumer<T, R>` that can only subscribe to values of type `T`.
724    /// This is the recommended way to pass database access to consumer services,
725    /// following the principle of least privilege.
726    ///
727    /// # Example
728    ///
729    /// ```rust,ignore
730    /// let db = builder.build()?;
731    /// let temp_consumer = db.consumer::<Temperature>();
732    ///
733    /// // Pass to service - it can only consume Temperature values
734    /// runtime.spawn(temperature_monitor(ctx, temp_consumer)).unwrap();
735    /// ```
736    pub fn consumer<T>(&self) -> crate::typed_api::Consumer<T, R>
737    where
738        T: Send + Sync + 'static + Debug + Clone,
739    {
740        crate::typed_api::Consumer::new(Arc::new(self.clone()))
741    }
742
743    /// Returns a reference to the runtime adapter
744    ///
745    /// Provides direct access to the concrete runtime type.
746    pub fn runtime(&self) -> &R {
747        &self.runtime
748    }
749
750    /// Lists all registered records (std only)
751    ///
752    /// Returns metadata for all registered records, useful for remote access introspection.
753    /// Available only when the `std` feature is enabled.
754    ///
755    /// # Example
756    /// ```rust,ignore
757    /// let records = db.list_records();
758    /// for record in records {
759    ///     println!("Record: {} ({})", record.name, record.type_id);
760    /// }
761    /// ```
762    #[cfg(feature = "std")]
763    pub fn list_records(&self) -> Vec<crate::remote::RecordMetadata> {
764        self.inner.list_records()
765    }
766
767    /// Try to get record's latest value as JSON by name (std only)
768    ///
769    /// Convenience wrapper around `AimDbInner::try_latest_as_json()`.
770    ///
771    /// # Arguments
772    /// * `record_name` - The full Rust type name (e.g., "server::Temperature")
773    ///
774    /// # Returns
775    /// `Some(JsonValue)` with current value, or `None` if unavailable
776    #[cfg(feature = "std")]
777    pub fn try_latest_as_json(&self, record_name: &str) -> Option<serde_json::Value> {
778        self.inner.try_latest_as_json(record_name)
779    }
780
781    /// Sets a record value from JSON (remote access API)
782    ///
783    /// Deserializes JSON and produces the value to the record's buffer.
784    ///
785    /// **SAFETY:** Enforces "No Producer Override" rule - only works for configuration
786    /// records without active producers.
787    ///
788    /// # Arguments
789    /// * `record_name` - Full Rust type name
790    /// * `json_value` - JSON value to set
791    ///
792    /// # Returns
793    /// `Ok(())` on success, error if record not found, has producers, or deserialization fails
794    ///
795    /// # Example (internal use)
796    /// ```rust,ignore
797    /// db.set_record_from_json("AppConfig", json!({"debug": true}))?;
798    /// ```
799    #[cfg(feature = "std")]
800    pub fn set_record_from_json(
801        &self,
802        record_name: &str,
803        json_value: serde_json::Value,
804    ) -> DbResult<()> {
805        self.inner.set_record_from_json(record_name, json_value)
806    }
807
808    /// Subscribe to record updates as JSON stream (std only)
809    ///
810    /// Creates a subscription to a record's buffer and forwards updates as JSON
811    /// to a bounded channel. This is used internally by the remote access protocol
812    /// for implementing `record.subscribe`.
813    ///
814    /// # Architecture
815    ///
816    /// Spawns a consumer task that:
817    /// 1. Subscribes to the record's buffer using the existing buffer API
818    /// 2. Reads values as they arrive
819    /// 3. Serializes each value to JSON
820    /// 4. Sends JSON values to a bounded channel (with backpressure handling)
821    /// 5. Terminates when either:
822    ///    - The cancel signal is received (unsubscribe)
823    ///    - The channel receiver is dropped (client disconnected)
824    ///
825    /// # Arguments
826    /// * `type_id` - TypeId of the record to subscribe to
827    /// * `queue_size` - Size of the bounded channel for this subscription
828    ///
829    /// # Returns
830    /// `Ok((receiver, cancel_tx))` where:
831    /// - `receiver`: Bounded channel receiver for JSON values
832    /// - `cancel_tx`: One-shot sender to cancel the subscription
833    ///
834    /// `Err` if:
835    /// - Record not found for the given TypeId
836    /// - Record not configured with `.with_serialization()`
837    /// - Failed to subscribe to buffer
838    ///
839    /// # Example (internal use)
840    ///
841    /// ```rust,ignore
842    /// let type_id = TypeId::of::<Temperature>();
843    /// let (mut rx, cancel_tx) = db.subscribe_record_updates(type_id, 100)?;
844    ///
845    /// // Read events
846    /// while let Some(json_value) = rx.recv().await {
847    ///     // Forward to client...
848    /// }
849    ///
850    /// // Cancel subscription
851    /// let _ = cancel_tx.send(());
852    /// ```
853    #[cfg(feature = "std")]
854    #[allow(unused_variables)] // Variables used only in tracing feature
855    pub fn subscribe_record_updates(
856        &self,
857        type_id: TypeId,
858        queue_size: usize,
859    ) -> DbResult<(
860        tokio::sync::mpsc::Receiver<serde_json::Value>,
861        tokio::sync::oneshot::Sender<()>,
862    )> {
863        use tokio::sync::{mpsc, oneshot};
864
865        // Find the record by TypeId
866        let record = self
867            .inner
868            .records
869            .get(&type_id)
870            .ok_or(DbError::RecordNotFound {
871                record_name: format!("TypeId({:?})", type_id),
872            })?;
873
874        // Subscribe to the record's buffer as JSON stream
875        // This will fail if record not configured with .with_serialization()
876        let mut json_reader = record.subscribe_json()?;
877
878        // Create channels for the subscription
879        let (value_tx, value_rx) = mpsc::channel(queue_size);
880        let (cancel_tx, mut cancel_rx) = oneshot::channel();
881
882        // Get metadata for logging
883        let record_metadata = record.collect_metadata(type_id);
884        let runtime = self.runtime.clone();
885
886        // Spawn consumer task that forwards JSON values from buffer to channel
887        let spawn_result = runtime.spawn(async move {
888            #[cfg(feature = "tracing")]
889            tracing::debug!(
890                "Subscription consumer task started for {}",
891                record_metadata.name
892            );
893
894            // Main event loop: read from buffer and forward to channel
895            loop {
896                tokio::select! {
897                    // Handle cancellation signal
898                    _ = &mut cancel_rx => {
899                        #[cfg(feature = "tracing")]
900                        tracing::debug!("Subscription cancelled");
901                        break;
902                    }
903                    // Read next JSON value from buffer
904                    result = json_reader.recv_json() => {
905                        match result {
906                            Ok(json_val) => {
907                                // Send JSON value to subscription channel
908                                if value_tx.send(json_val).await.is_err() {
909                                    #[cfg(feature = "tracing")]
910                                    tracing::debug!("Subscription receiver dropped");
911                                    break;
912                                }
913                            }
914                            Err(DbError::BufferLagged { lag_count, .. }) => {
915                                // Consumer fell behind - log warning but continue
916                                #[cfg(feature = "tracing")]
917                                tracing::warn!(
918                                    "Subscription for {} lagged by {} messages",
919                                    record_metadata.name,
920                                    lag_count
921                                );
922                                // Continue reading - next recv will get latest
923                            }
924                            Err(DbError::BufferClosed { .. }) => {
925                                // Buffer closed (shutdown) - exit gracefully
926                                #[cfg(feature = "tracing")]
927                                tracing::debug!("Buffer closed for {}", record_metadata.name);
928                                break;
929                            }
930                            Err(e) => {
931                                // Other error (shouldn't happen in practice)
932                                #[cfg(feature = "tracing")]
933                                tracing::error!(
934                                    "Subscription error for {}: {:?}",
935                                    record_metadata.name,
936                                    e
937                                );
938                                break;
939                            }
940                        }
941                    }
942                }
943            }
944
945            #[cfg(feature = "tracing")]
946            tracing::debug!("Subscription consumer task terminated");
947        });
948
949        spawn_result.map_err(DbError::from)?;
950
951        Ok((value_rx, cancel_tx))
952    }
953
954    /// Collects inbound connector routes for automatic router construction (std only)
955    ///
956    /// Iterates all records, filters their inbound_connectors by scheme,
957    /// and returns routes with producer creation callbacks.
958    ///
959    /// # Arguments
960    /// * `scheme` - URL scheme to filter by (e.g., "mqtt", "kafka")
961    ///
962    /// # Returns
963    /// Vector of tuples: (topic, producer_trait, deserializer)
964    ///
965    /// # Example
966    /// ```rust,ignore
967    /// // In MqttConnector after db.build()
968    /// let routes = db.collect_inbound_routes("mqtt");
969    /// let router = RouterBuilder::from_routes(routes).build();
970    /// connector.set_router(router).await?;
971    /// ```
972    #[cfg(feature = "alloc")]
973    pub fn collect_inbound_routes(
974        &self,
975        scheme: &str,
976    ) -> Vec<(
977        String,
978        Box<dyn crate::connector::ProducerTrait>,
979        crate::connector::DeserializerFn,
980    )> {
981        let mut routes = Vec::new();
982
983        // Convert self to Arc<dyn Any> for producer factory
984        let db_any: Arc<dyn core::any::Any + Send + Sync> = Arc::new(self.clone());
985
986        for record in self.inner.records.values() {
987            let inbound_links = record.inbound_connectors();
988
989            for link in inbound_links {
990                // Filter by scheme
991                if link.url.scheme() != scheme {
992                    continue;
993                }
994
995                let topic = link.url.resource_id();
996
997                // Create producer using the stored factory
998                if let Some(producer) = link.create_producer(db_any.clone()) {
999                    routes.push((topic, producer, link.deserializer.clone()));
1000                }
1001            }
1002        }
1003
1004        #[cfg(feature = "tracing")]
1005        if !routes.is_empty() {
1006            tracing::debug!(
1007                "Collected {} inbound routes for scheme '{}'",
1008                routes.len(),
1009                scheme
1010            );
1011        }
1012
1013        routes
1014    }
1015
1016    /// Collects outbound routes for a specific protocol scheme
1017    ///
1018    /// Mirrors `collect_inbound_routes()` for symmetry. Iterates all records,
1019    /// filters their outbound_connectors by scheme, and returns routes with
1020    /// consumer creation callbacks.
1021    ///
1022    /// This method is called by connectors during their `build()` phase to
1023    /// collect all configured outbound routes and spawn publisher tasks.
1024    ///
1025    /// # Arguments
1026    /// * `scheme` - URL scheme to filter by (e.g., "mqtt", "kafka")
1027    ///
1028    /// # Returns
1029    /// Vector of tuples: (destination, consumer_trait, serializer, config)
1030    ///
1031    /// The config Vec contains protocol-specific options (e.g., qos, retain).
1032    ///
1033    /// # Example
1034    /// ```rust,ignore
1035    /// // In MqttConnector::build()
1036    /// let routes = db.collect_outbound_routes("mqtt");
1037    /// for (topic, consumer, serializer, config) in routes {
1038    ///     connector.spawn_publisher(topic, consumer, serializer, config)?;
1039    /// }
1040    /// ```
1041    #[cfg(feature = "alloc")]
1042    pub fn collect_outbound_routes(&self, scheme: &str) -> Vec<OutboundRoute> {
1043        let mut routes = Vec::new();
1044
1045        // Convert self to Arc<dyn Any> for consumer factory
1046        // This is necessary because the factory takes Arc<dyn Any> to avoid
1047        // needing to know the runtime type R at the factory definition site
1048        let db_any: Arc<dyn core::any::Any + Send + Sync> = Arc::new(self.clone());
1049
1050        for record in self.inner.records.values() {
1051            let outbound_links = record.outbound_connectors();
1052
1053            for link in outbound_links {
1054                // Filter by scheme
1055                if link.url.scheme() != scheme {
1056                    continue;
1057                }
1058
1059                let destination = link.url.resource_id();
1060
1061                // Skip links without serializer
1062                let Some(serializer) = link.serializer.clone() else {
1063                    #[cfg(feature = "tracing")]
1064                    tracing::warn!("Outbound link '{}' has no serializer, skipping", link.url);
1065                    continue;
1066                };
1067
1068                // Create consumer using the stored factory
1069                if let Some(consumer) = link.create_consumer(db_any.clone()) {
1070                    routes.push((destination, consumer, serializer, link.config.clone()));
1071                }
1072            }
1073        }
1074
1075        #[cfg(feature = "tracing")]
1076        if !routes.is_empty() {
1077            tracing::debug!(
1078                "Collected {} outbound routes for scheme '{}'",
1079                routes.len(),
1080                scheme
1081            );
1082        }
1083
1084        routes
1085    }
1086}