aimdb_core/builder.rs
//! Database builder with type-safe record registration
//!
//! Provides `AimDb` and `AimDbBuilder` for constructing databases with
//! type-safe, self-registering records using the producer-consumer pattern.
5
6use core::any::TypeId;
7use core::fmt::Debug;
8use core::marker::PhantomData;
9
10extern crate alloc;
11
12use alloc::collections::BTreeMap;
13
14#[cfg(not(feature = "std"))]
15use alloc::{boxed::Box, sync::Arc, vec::Vec};
16
17#[cfg(all(not(feature = "std"), feature = "alloc"))]
18use alloc::string::String;
19
20#[cfg(feature = "std")]
21use std::{boxed::Box, sync::Arc, vec::Vec};
22
23use crate::typed_api::{RecordRegistrar, RecordT};
24use crate::typed_record::{AnyRecord, AnyRecordExt, TypedRecord};
25use crate::{DbError, DbResult};
26
/// Type alias for the outbound route tuples returned by `collect_outbound_routes`.
///
/// Each tuple contains, in order:
/// - `String` - Destination (topic/key) taken from the link URL's resource id
/// - `Box<dyn ConsumerTrait>` - Type-erased consumer produced by the link's
///   consumer factory
/// - `SerializerFn` - Serializer attached to the link for the record type
/// - `Vec<(String, String)>` - Configuration options stored on the link
///
/// Only available when the `alloc` feature is enabled.
#[cfg(feature = "alloc")]
type OutboundRoute = (
    String,
    Box<dyn crate::connector::ConsumerTrait>,
    crate::connector::SerializerFn,
    Vec<(String, String)>,
);
41
/// Marker type for an untyped builder (before a runtime is set).
///
/// It is the default value of the runtime type parameter of `AimDbBuilder`;
/// calling `.runtime()` replaces it with the concrete runtime adapter type.
pub struct NoRuntime;
44
/// Internal database state
///
/// Holds the registry of typed records, indexed by `TypeId`. Records are
/// stored type-erased (`Box<dyn AnyRecord>`) and are downcast back to their
/// typed form on access.
pub struct AimDbInner {
    /// Map from the record value type's `TypeId` to its type-erased record
    /// (SPMC buffers for internal data flow)
    pub records: BTreeMap<TypeId, Box<dyn AnyRecord>>,
}
52
53impl AimDbInner {
54 /// Helper to get a typed record from the registry
55 ///
56 /// This encapsulates the common pattern of:
57 /// 1. Getting TypeId for type T
58 /// 2. Looking up the record in the map
59 /// 3. Downcasting to the typed record
60 pub fn get_typed_record<T, R>(&self) -> DbResult<&TypedRecord<T, R>>
61 where
62 T: Send + 'static + Debug + Clone,
63 R: aimdb_executor::Spawn + 'static,
64 {
65 use crate::typed_record::AnyRecordExt;
66
67 let type_id = TypeId::of::<T>();
68
69 #[cfg(feature = "std")]
70 let record = self.records.get(&type_id).ok_or(DbError::RecordNotFound {
71 record_name: core::any::type_name::<T>().to_string(),
72 })?;
73
74 #[cfg(not(feature = "std"))]
75 let record = self
76 .records
77 .get(&type_id)
78 .ok_or(DbError::RecordNotFound { _record_name: () })?;
79
80 #[cfg(feature = "std")]
81 let typed_record = record.as_typed::<T, R>().ok_or(DbError::InvalidOperation {
82 operation: "get_typed_record".to_string(),
83 reason: "type mismatch during downcast".to_string(),
84 })?;
85
86 #[cfg(not(feature = "std"))]
87 let typed_record = record.as_typed::<T, R>().ok_or(DbError::InvalidOperation {
88 _operation: (),
89 _reason: (),
90 })?;
91
92 Ok(typed_record)
93 }
94
95 /// Collects metadata for all registered records (std only)
96 ///
97 /// Returns a vector of `RecordMetadata` for remote access introspection.
98 /// Available only when the `std` feature is enabled.
99 #[cfg(feature = "std")]
100 pub fn list_records(&self) -> Vec<crate::remote::RecordMetadata> {
101 self.records
102 .iter()
103 .map(|(type_id, record)| record.collect_metadata(*type_id))
104 .collect()
105 }
106
107 /// Try to get record's latest value as JSON by record name (std only)
108 ///
109 /// Searches for a record with the given name and returns its current value
110 /// serialized to JSON. Returns `None` if:
111 /// - Record not found
112 /// - Record not configured with `.with_serialization()`
113 /// - No value available in the atomic snapshot
114 ///
115 /// # Arguments
116 /// * `record_name` - The full Rust type name (e.g., "server::Temperature")
117 ///
118 /// # Returns
119 /// `Some(JsonValue)` with the current record value, or `None`
120 #[cfg(feature = "std")]
121 pub fn try_latest_as_json(&self, record_name: &str) -> Option<serde_json::Value> {
122 for (type_id, record) in &self.records {
123 let metadata = record.collect_metadata(*type_id);
124 if metadata.name == record_name {
125 return record.latest_json();
126 }
127 }
128 None
129 }
130
131 /// Sets a record value from JSON (remote access API)
132 ///
133 /// Deserializes the JSON value and writes it to the record's buffer.
134 ///
135 /// **SAFETY:** Enforces the "No Producer Override" rule:
136 /// - Only works for records with `producer_count == 0`
137 /// - Returns error if the record has active producers
138 ///
139 /// # Arguments
140 /// * `record_name` - The full Rust type name (e.g., "server::AppConfig")
141 /// * `json_value` - JSON representation of the value
142 ///
143 /// # Returns
144 /// - `Ok(())` - Successfully set the value
145 /// - `Err(DbError)` - If record not found, has producers, or deserialization fails
146 ///
147 /// # Errors
148 /// - `RecordNotFound` - Record with given name doesn't exist
149 /// - `PermissionDenied` - Record has active producers (safety check)
150 /// - `RuntimeError` - Record not configured with `.with_serialization()`
151 /// - `JsonWithContext` - JSON deserialization failed (schema mismatch)
152 ///
153 /// # Example (internal use - called by remote access protocol)
154 /// ```rust,ignore
155 /// let json_val = serde_json::json!({"log_level": "debug", "version": "1.0"});
156 /// db.set_record_from_json("server::AppConfig", json_val)?;
157 /// ```
158 #[cfg(feature = "std")]
159 pub fn set_record_from_json(
160 &self,
161 record_name: &str,
162 json_value: serde_json::Value,
163 ) -> DbResult<()> {
164 // Find the record by name
165 for (type_id, record) in &self.records {
166 let metadata = record.collect_metadata(*type_id);
167 if metadata.name == record_name {
168 // Delegate to the type-erased set_from_json method
169 // which will enforce the "no producer override" rule
170 return record.set_from_json(json_value);
171 }
172 }
173
174 // Record not found
175 Err(DbError::RecordNotFound {
176 record_name: record_name.to_string(),
177 })
178 }
179}
180
/// Database builder for producer-consumer pattern
///
/// Provides a fluent API for constructing databases with type-safe record registration.
/// Use `.runtime()` to set the runtime and transition to a typed builder.
///
/// The type parameter `R` defaults to `NoRuntime`, the state of a freshly
/// created builder.
pub struct AimDbBuilder<R = NoRuntime> {
    /// Registry of typed records, keyed by the record value type's `TypeId`
    records: BTreeMap<TypeId, Box<dyn AnyRecord>>,

    /// Runtime adapter (`None` until `.runtime()` has been called)
    runtime: Option<Arc<R>>,

    /// Connector builders that will be invoked during build()
    connector_builders: Vec<Box<dyn crate::connector::ConnectorBuilder<R>>>,

    /// Spawn functions indexed by TypeId
    ///
    /// Each value is a boxed `FnOnce` stored type-erased as `dyn Any`; `build()`
    /// downcasts it back to the concrete function type before calling it.
    spawn_fns: BTreeMap<TypeId, Box<dyn core::any::Any + Send>>,

    /// Remote access configuration (std only)
    #[cfg(feature = "std")]
    remote_config: Option<crate::remote::AimxConfig>,

    /// PhantomData to track the runtime type parameter
    _phantom: PhantomData<R>,
}
205
206impl AimDbBuilder<NoRuntime> {
207 /// Creates a new database builder without a runtime
208 ///
209 /// Call `.runtime()` to set the runtime adapter.
210 pub fn new() -> Self {
211 Self {
212 records: BTreeMap::new(),
213 runtime: None,
214 connector_builders: Vec::new(),
215 spawn_fns: BTreeMap::new(),
216 #[cfg(feature = "std")]
217 remote_config: None,
218 _phantom: PhantomData,
219 }
220 }
221
222 /// Sets the runtime adapter
223 ///
224 /// This transitions the builder from untyped to typed with concrete runtime `R`.
225 ///
226 /// # Type Safety Note
227 ///
228 /// The `connector_builders` field is intentionally reset to `Vec::new()` during this
229 /// transition because connectors are parameterized by the runtime type:
230 ///
231 /// - Before: `Vec<Box<dyn ConnectorBuilder<NoRuntime>>>`
232 /// - After: `Vec<Box<dyn ConnectorBuilder<R>>>`
233 ///
234 /// These types are incompatible and cannot be transferred. However, this is not a bug
235 /// because `.with_connector()` is only available AFTER calling `.runtime()` (it's defined
236 /// in the `impl<R> where R: Spawn` block, not in `impl AimDbBuilder<NoRuntime>`).
237 ///
238 /// This means the type system **enforces** the correct call order:
239 /// ```rust,ignore
240 /// AimDbBuilder::new()
241 /// .runtime(runtime) // ← Must be called first
242 /// .with_connector(connector) // ← Now available
243 /// ```
244 ///
245 /// The `records` and `remote_config` are preserved across the transition since they
246 /// are not parameterized by the runtime type.
247 pub fn runtime<R>(self, rt: Arc<R>) -> AimDbBuilder<R>
248 where
249 R: aimdb_executor::Spawn + 'static,
250 {
251 AimDbBuilder {
252 records: self.records,
253 runtime: Some(rt),
254 connector_builders: Vec::new(),
255 spawn_fns: BTreeMap::new(),
256 #[cfg(feature = "std")]
257 remote_config: self.remote_config,
258 _phantom: PhantomData,
259 }
260 }
261}
262
263impl<R> AimDbBuilder<R>
264where
265 R: aimdb_executor::Spawn + 'static,
266{
267 /// Registers a connector builder that will be invoked during `build()`
268 ///
269 /// The connector builder will be called after the database is constructed,
270 /// allowing it to collect routes and initialize the connector properly.
271 ///
272 /// # Arguments
273 /// * `builder` - A connector builder that implements `ConnectorBuilder<R>`
274 ///
275 /// # Example
276 ///
277 /// ```rust,ignore
278 /// use aimdb_mqtt_connector::MqttConnector;
279 ///
280 /// let db = AimDbBuilder::new()
281 /// .runtime(runtime)
282 /// .with_connector(MqttConnector::new("mqtt://broker.local:1883"))
283 /// .configure::<Temperature>(|reg| {
284 /// reg.link_from("mqtt://commands/temp")...
285 /// })
286 /// .build().await?;
287 /// ```
288 pub fn with_connector(
289 mut self,
290 builder: impl crate::connector::ConnectorBuilder<R> + 'static,
291 ) -> Self {
292 self.connector_builders.push(Box::new(builder));
293 self
294 }
295
296 /// Enables remote access via AimX protocol (std only)
297 ///
298 /// Configures the database to accept remote connections over a Unix domain socket,
299 /// allowing external clients to introspect records, subscribe to updates, and
300 /// (optionally) write data.
301 ///
302 /// The remote access supervisor will be spawned automatically during `build()`.
303 ///
304 /// # Arguments
305 /// * `config` - Remote access configuration (socket path, security policy, etc.)
306 ///
307 /// # Example
308 ///
309 /// ```rust,ignore
310 /// use aimdb_core::remote::{AimxConfig, SecurityPolicy};
311 ///
312 /// let config = AimxConfig::new("/tmp/aimdb.sock")
313 /// .with_security(SecurityPolicy::read_only());
314 ///
315 /// let db = AimDbBuilder::new()
316 /// .runtime(runtime)
317 /// .with_remote_access(config)
318 /// .build()?;
319 /// ```
320 #[cfg(feature = "std")]
321 pub fn with_remote_access(mut self, config: crate::remote::AimxConfig) -> Self {
322 self.remote_config = Some(config);
323 self
324 }
325
326 /// Configures a record type manually
327 ///
328 /// Low-level method for advanced use cases. Most users should use `register_record` instead.
329 pub fn configure<T>(
330 &mut self,
331 f: impl for<'a> FnOnce(&'a mut RecordRegistrar<'a, T, R>),
332 ) -> &mut Self
333 where
334 T: Send + Sync + 'static + Debug + Clone,
335 {
336 let entry = self
337 .records
338 .entry(TypeId::of::<T>())
339 .or_insert_with(|| Box::new(TypedRecord::<T, R>::new()));
340
341 let rec = entry
342 .as_typed_mut::<T, R>()
343 .expect("type mismatch in record registry");
344
345 let mut reg = RecordRegistrar {
346 rec,
347 connector_builders: &self.connector_builders,
348 };
349 f(&mut reg);
350
351 // Store a spawn function that captures the concrete type T and connectors
352 let type_id = TypeId::of::<T>();
353
354 #[allow(clippy::type_complexity)]
355 let spawn_fn: Box<dyn FnOnce(&Arc<R>, &Arc<AimDb<R>>) -> DbResult<()> + Send> =
356 Box::new(move |runtime: &Arc<R>, db: &Arc<AimDb<R>>| {
357 // Use RecordSpawner to spawn tasks for this record type
358 use crate::typed_record::RecordSpawner;
359
360 let typed_record = db.inner().get_typed_record::<T, R>()?;
361 RecordSpawner::<T>::spawn_all_tasks(typed_record, runtime, db)
362 });
363
364 // Store the spawn function (type-erased in Box<dyn Any>)
365 self.spawn_fns.insert(type_id, Box::new(spawn_fn));
366
367 self
368 }
369
370 /// Registers a self-registering record type
371 ///
372 /// The record type must implement `RecordT<R>`.
373 pub fn register_record<T>(&mut self, cfg: &T::Config) -> &mut Self
374 where
375 T: RecordT<R>,
376 {
377 self.configure::<T>(|reg| T::register(reg, cfg))
378 }
379
380 /// Runs the database indefinitely (never returns)
381 ///
382 /// This method builds the database, spawns all producer and consumer tasks, and then
383 /// parks the current task indefinitely. This is the primary way to run AimDB services.
384 ///
385 /// All logic runs in background tasks via producers, consumers, and connectors. The
386 /// application continues until interrupted (e.g., Ctrl+C).
387 ///
388 /// # Returns
389 /// `DbResult<()>` - Ok when database starts successfully, then parks forever
390 ///
391 /// # Example
392 ///
393 /// ```rust,ignore
394 /// #[tokio::main]
395 /// async fn main() -> DbResult<()> {
396 /// AimDbBuilder::new()
397 /// .runtime(Arc::new(TokioAdapter::new()?))
398 /// .configure::<MyData>(|reg| {
399 /// reg.with_buffer(BufferCfg::SpmcRing { capacity: 100 })
400 /// .with_source(my_producer)
401 /// .with_tap(my_consumer);
402 /// })
403 /// .run().await // Runs forever
404 /// }
405 /// ```
406 pub async fn run(self) -> DbResult<()> {
407 #[cfg(feature = "tracing")]
408 tracing::info!("Building database and spawning background tasks...");
409
410 let _db = self.build().await?;
411
412 #[cfg(feature = "tracing")]
413 tracing::info!("Database running, background tasks active. Press Ctrl+C to stop.");
414
415 // Park indefinitely - the background tasks will continue running
416 // The database handle is kept alive here to prevent dropping it
417 core::future::pending::<()>().await;
418
419 Ok(())
420 }
421
422 /// Builds the database and returns the handle (async)
423 ///
424 /// Use this when you need programmatic access to the database handle for
425 /// manual subscriptions or production. For typical services, use `.run().await` instead.
426 ///
427 /// **Automatic Task Spawning:** This method spawns all producer services and
428 /// `.tap()` observer tasks that were registered during configuration.
429 ///
430 /// **Connector Setup:** Connectors must be created manually before calling `build()`:
431 ///
432 /// ```rust,ignore
433 /// use aimdb_mqtt_connector::{MqttConnector, router::RouterBuilder};
434 ///
435 /// // Configure records with connector links
436 /// let builder = AimDbBuilder::new()
437 /// .runtime(runtime)
438 /// .configure::<Temp>(|reg| {
439 /// reg.link_from("mqtt://commands/temp")
440 /// .with_buffer(BufferCfg::SingleLatest)
441 /// .with_serialization();
442 /// });
443 ///
444 /// // Create MQTT connector with router
445 /// let router = RouterBuilder::new()
446 /// .route("commands/temp", /* deserializer */)
447 /// .build();
448 /// let connector = MqttConnector::new("mqtt://localhost:1883", router).await?;
449 ///
450 /// // Register connector and build
451 /// let db = builder
452 /// .with_connector("mqtt", Arc::new(connector))
453 /// .build().await?;
454 /// ```
455 ///
456 /// # Returns
457 /// `DbResult<AimDb<R>>` - The database instance
458 #[cfg_attr(not(feature = "std"), allow(unused_mut))]
459 pub async fn build(self) -> DbResult<AimDb<R>> {
460 use crate::DbError;
461
462 // Validate all records
463 for record in self.records.values() {
464 record.validate().map_err(|_msg| {
465 #[cfg(feature = "std")]
466 {
467 DbError::RuntimeError {
468 message: format!("Record validation failed: {}", _msg),
469 }
470 }
471 #[cfg(not(feature = "std"))]
472 {
473 DbError::RuntimeError { _message: () }
474 }
475 })?;
476 }
477
478 // Ensure runtime is set
479 let runtime = self.runtime.ok_or({
480 #[cfg(feature = "std")]
481 {
482 DbError::RuntimeError {
483 message: "runtime not set (use .runtime())".into(),
484 }
485 }
486 #[cfg(not(feature = "std"))]
487 {
488 DbError::RuntimeError { _message: () }
489 }
490 })?;
491
492 let inner = Arc::new(AimDbInner {
493 records: self.records,
494 });
495
496 let db = Arc::new(AimDb {
497 inner: inner.clone(),
498 runtime: runtime.clone(),
499 });
500
501 #[cfg(feature = "tracing")]
502 tracing::info!(
503 "Spawning producer services and tap observers for {} record types",
504 self.spawn_fns.len()
505 );
506
507 // Execute spawn functions for each record type
508 for (_type_id, spawn_fn_any) in self.spawn_fns {
509 // Downcast from Box<dyn Any> back to the concrete spawn function type
510 type SpawnFnType<R> = Box<dyn FnOnce(&Arc<R>, &Arc<AimDb<R>>) -> DbResult<()> + Send>;
511
512 let spawn_fn = spawn_fn_any
513 .downcast::<SpawnFnType<R>>()
514 .expect("spawn function type mismatch");
515
516 // Execute the spawn function
517 (*spawn_fn)(&runtime, &db)?;
518 }
519
520 #[cfg(feature = "tracing")]
521 tracing::info!("Automatic spawning complete");
522
523 // Spawn remote access supervisor if configured (std only)
524 #[cfg(feature = "std")]
525 if let Some(remote_cfg) = self.remote_config {
526 #[cfg(feature = "tracing")]
527 tracing::info!(
528 "Spawning remote access supervisor on socket: {}",
529 remote_cfg.socket_path.display()
530 );
531
532 // Apply security policy to mark writable records
533 let writable_type_ids = remote_cfg.security_policy.writable_records();
534 for (type_id, record) in inner.records.iter() {
535 if writable_type_ids.contains(type_id) {
536 #[cfg(feature = "tracing")]
537 tracing::debug!("Marking record {:?} as writable", type_id);
538
539 // Mark the record as writable (type-erased call)
540 record.set_writable_erased(true);
541 }
542 }
543
544 // Spawn the remote supervisor task
545 // This will be implemented in Task 6
546 crate::remote::supervisor::spawn_supervisor(db.clone(), runtime.clone(), remote_cfg)?;
547
548 #[cfg(feature = "tracing")]
549 tracing::info!("Remote access supervisor spawned successfully");
550 }
551
552 // Build connectors from builders (after database is fully constructed)
553 // This allows connectors to use collect_inbound_routes() which creates
554 // producers tied to this specific database instance
555 let mut built_connectors = BTreeMap::new();
556 for builder in self.connector_builders {
557 #[cfg(feature = "std")]
558 let scheme = builder.scheme().to_string();
559
560 #[cfg(not(feature = "std"))]
561 let scheme = alloc::string::String::from(builder.scheme());
562
563 #[cfg(feature = "tracing")]
564 tracing::debug!("Building connector for scheme: {}", scheme);
565
566 let connector = builder.build(&db).await?;
567 built_connectors.insert(scheme.clone(), connector);
568
569 #[cfg(feature = "tracing")]
570 tracing::info!("Connector built and spawned successfully: {}", scheme);
571 }
572
573 // Unwrap the Arc to return the owned AimDb
574 // This is safe because we just created it and hold the only reference
575 let db_owned = Arc::try_unwrap(db).unwrap_or_else(|arc| (*arc).clone());
576
577 Ok(db_owned)
578 }
579}
580
581impl Default for AimDbBuilder<NoRuntime> {
582 fn default() -> Self {
583 Self::new()
584 }
585}
586
/// Producer-consumer database
///
/// A database instance with type-safe record registration and cross-record
/// communication via the Emitter pattern. The type parameter `R` represents
/// the runtime adapter (e.g., TokioAdapter, EmbassyAdapter).
///
/// The handle only holds two `Arc`s, so cloning it is cheap and every clone
/// refers to the same records and runtime.
///
/// See `examples/` for usage.
///
/// # Examples
///
/// ```rust,ignore
/// use aimdb_tokio_adapter::TokioAdapter;
///
/// let runtime = Arc::new(TokioAdapter);
/// let mut builder = AimDbBuilder::new().runtime(runtime);
/// builder.register_record::<Temperature>(&TemperatureConfig);
/// let db: AimDb<TokioAdapter> = builder.build().await?;
/// ```
pub struct AimDb<R: aimdb_executor::Spawn + 'static> {
    /// Internal state (record registry), shared between all clones
    inner: Arc<AimDbInner>,

    /// Runtime adapter with concrete type
    runtime: Arc<R>,
}
613
614impl<R: aimdb_executor::Spawn + 'static> Clone for AimDb<R> {
615 fn clone(&self) -> Self {
616 Self {
617 inner: self.inner.clone(),
618 runtime: self.runtime.clone(),
619 }
620 }
621}
622
623impl<R: aimdb_executor::Spawn + 'static> AimDb<R> {
624 /// Internal accessor for the inner state
625 ///
626 /// Used by adapter crates and internal spawning logic.
627 #[doc(hidden)]
628 pub fn inner(&self) -> &Arc<AimDbInner> {
629 &self.inner
630 }
631
632 /// Builds a database with a closure-based builder pattern
633 pub async fn build_with(rt: Arc<R>, f: impl FnOnce(&mut AimDbBuilder<R>)) -> DbResult<()> {
634 let mut b = AimDbBuilder::new().runtime(rt);
635 f(&mut b);
636 b.run().await
637 }
638
639 /// Spawns a task using the database's runtime adapter
640 ///
641 /// This method provides direct access to the runtime's spawn capability.
642 ///
643 /// # Arguments
644 /// * `future` - The future to spawn
645 ///
646 /// # Returns
647 /// `DbResult<()>` - Ok if the task was spawned successfully
648 pub fn spawn_task<F>(&self, future: F) -> DbResult<()>
649 where
650 F: core::future::Future<Output = ()> + Send + 'static,
651 {
652 self.runtime.spawn(future).map_err(DbError::from)?;
653 Ok(())
654 }
655
656 /// Produces a value for a record type
657 ///
658 /// Writes the value to the record's buffer and triggers all consumers.
659 pub async fn produce<T>(&self, value: T) -> DbResult<()>
660 where
661 T: Send + 'static + Debug + Clone,
662 {
663 // Get the typed record using the helper
664 let typed_rec = self.inner.get_typed_record::<T, R>()?;
665
666 // Produce the value directly to the buffer
667 typed_rec.produce(value).await;
668 Ok(())
669 }
670
671 /// Subscribes to a record type's buffer
672 ///
673 /// Creates a subscription to the configured buffer for the given record type.
674 /// Returns a boxed reader for receiving values asynchronously.
675 ///
676 /// # Example
677 ///
678 /// ```rust,ignore
679 /// let mut reader = db.subscribe::<Temperature>()?;
680 ///
681 /// loop {
682 /// match reader.recv().await {
683 /// Ok(temp) => println!("Temperature: {:.1}°C", temp.celsius),
684 /// Err(_) => break,
685 /// }
686 /// }
687 /// ```
688 pub fn subscribe<T>(&self) -> DbResult<Box<dyn crate::buffer::BufferReader<T> + Send>>
689 where
690 T: Send + Sync + 'static + Debug + Clone,
691 {
692 // Get the typed record using the helper
693 let typed_rec = self.inner.get_typed_record::<T, R>()?;
694
695 // Subscribe to the buffer
696 typed_rec.subscribe()
697 }
698
699 /// Creates a type-safe producer for a specific record type
700 ///
701 /// Returns a `Producer<T, R>` that can only produce values of type `T`.
702 /// This is the recommended way to pass database access to producer services,
703 /// following the principle of least privilege.
704 ///
705 /// # Example
706 ///
707 /// ```rust,ignore
708 /// let db = builder.build()?;
709 /// let temp_producer = db.producer::<Temperature>();
710 ///
711 /// // Pass to service - it can only produce Temperature values
712 /// runtime.spawn(temperature_service(ctx, temp_producer)).unwrap();
713 /// ```
714 pub fn producer<T>(&self) -> crate::typed_api::Producer<T, R>
715 where
716 T: Send + 'static + Debug + Clone,
717 {
718 crate::typed_api::Producer::new(Arc::new(self.clone()))
719 }
720
721 /// Creates a type-safe consumer for a specific record type
722 ///
723 /// Returns a `Consumer<T, R>` that can only subscribe to values of type `T`.
724 /// This is the recommended way to pass database access to consumer services,
725 /// following the principle of least privilege.
726 ///
727 /// # Example
728 ///
729 /// ```rust,ignore
730 /// let db = builder.build()?;
731 /// let temp_consumer = db.consumer::<Temperature>();
732 ///
733 /// // Pass to service - it can only consume Temperature values
734 /// runtime.spawn(temperature_monitor(ctx, temp_consumer)).unwrap();
735 /// ```
736 pub fn consumer<T>(&self) -> crate::typed_api::Consumer<T, R>
737 where
738 T: Send + Sync + 'static + Debug + Clone,
739 {
740 crate::typed_api::Consumer::new(Arc::new(self.clone()))
741 }
742
743 /// Returns a reference to the runtime adapter
744 ///
745 /// Provides direct access to the concrete runtime type.
746 pub fn runtime(&self) -> &R {
747 &self.runtime
748 }
749
750 /// Lists all registered records (std only)
751 ///
752 /// Returns metadata for all registered records, useful for remote access introspection.
753 /// Available only when the `std` feature is enabled.
754 ///
755 /// # Example
756 /// ```rust,ignore
757 /// let records = db.list_records();
758 /// for record in records {
759 /// println!("Record: {} ({})", record.name, record.type_id);
760 /// }
761 /// ```
762 #[cfg(feature = "std")]
763 pub fn list_records(&self) -> Vec<crate::remote::RecordMetadata> {
764 self.inner.list_records()
765 }
766
767 /// Try to get record's latest value as JSON by name (std only)
768 ///
769 /// Convenience wrapper around `AimDbInner::try_latest_as_json()`.
770 ///
771 /// # Arguments
772 /// * `record_name` - The full Rust type name (e.g., "server::Temperature")
773 ///
774 /// # Returns
775 /// `Some(JsonValue)` with current value, or `None` if unavailable
776 #[cfg(feature = "std")]
777 pub fn try_latest_as_json(&self, record_name: &str) -> Option<serde_json::Value> {
778 self.inner.try_latest_as_json(record_name)
779 }
780
781 /// Sets a record value from JSON (remote access API)
782 ///
783 /// Deserializes JSON and produces the value to the record's buffer.
784 ///
785 /// **SAFETY:** Enforces "No Producer Override" rule - only works for configuration
786 /// records without active producers.
787 ///
788 /// # Arguments
789 /// * `record_name` - Full Rust type name
790 /// * `json_value` - JSON value to set
791 ///
792 /// # Returns
793 /// `Ok(())` on success, error if record not found, has producers, or deserialization fails
794 ///
795 /// # Example (internal use)
796 /// ```rust,ignore
797 /// db.set_record_from_json("AppConfig", json!({"debug": true}))?;
798 /// ```
799 #[cfg(feature = "std")]
800 pub fn set_record_from_json(
801 &self,
802 record_name: &str,
803 json_value: serde_json::Value,
804 ) -> DbResult<()> {
805 self.inner.set_record_from_json(record_name, json_value)
806 }
807
808 /// Subscribe to record updates as JSON stream (std only)
809 ///
810 /// Creates a subscription to a record's buffer and forwards updates as JSON
811 /// to a bounded channel. This is used internally by the remote access protocol
812 /// for implementing `record.subscribe`.
813 ///
814 /// # Architecture
815 ///
816 /// Spawns a consumer task that:
817 /// 1. Subscribes to the record's buffer using the existing buffer API
818 /// 2. Reads values as they arrive
819 /// 3. Serializes each value to JSON
820 /// 4. Sends JSON values to a bounded channel (with backpressure handling)
821 /// 5. Terminates when either:
822 /// - The cancel signal is received (unsubscribe)
823 /// - The channel receiver is dropped (client disconnected)
824 ///
825 /// # Arguments
826 /// * `type_id` - TypeId of the record to subscribe to
827 /// * `queue_size` - Size of the bounded channel for this subscription
828 ///
829 /// # Returns
830 /// `Ok((receiver, cancel_tx))` where:
831 /// - `receiver`: Bounded channel receiver for JSON values
832 /// - `cancel_tx`: One-shot sender to cancel the subscription
833 ///
834 /// `Err` if:
835 /// - Record not found for the given TypeId
836 /// - Record not configured with `.with_serialization()`
837 /// - Failed to subscribe to buffer
838 ///
839 /// # Example (internal use)
840 ///
841 /// ```rust,ignore
842 /// let type_id = TypeId::of::<Temperature>();
843 /// let (mut rx, cancel_tx) = db.subscribe_record_updates(type_id, 100)?;
844 ///
845 /// // Read events
846 /// while let Some(json_value) = rx.recv().await {
847 /// // Forward to client...
848 /// }
849 ///
850 /// // Cancel subscription
851 /// let _ = cancel_tx.send(());
852 /// ```
853 #[cfg(feature = "std")]
854 #[allow(unused_variables)] // Variables used only in tracing feature
855 pub fn subscribe_record_updates(
856 &self,
857 type_id: TypeId,
858 queue_size: usize,
859 ) -> DbResult<(
860 tokio::sync::mpsc::Receiver<serde_json::Value>,
861 tokio::sync::oneshot::Sender<()>,
862 )> {
863 use tokio::sync::{mpsc, oneshot};
864
865 // Find the record by TypeId
866 let record = self
867 .inner
868 .records
869 .get(&type_id)
870 .ok_or(DbError::RecordNotFound {
871 record_name: format!("TypeId({:?})", type_id),
872 })?;
873
874 // Subscribe to the record's buffer as JSON stream
875 // This will fail if record not configured with .with_serialization()
876 let mut json_reader = record.subscribe_json()?;
877
878 // Create channels for the subscription
879 let (value_tx, value_rx) = mpsc::channel(queue_size);
880 let (cancel_tx, mut cancel_rx) = oneshot::channel();
881
882 // Get metadata for logging
883 let record_metadata = record.collect_metadata(type_id);
884 let runtime = self.runtime.clone();
885
886 // Spawn consumer task that forwards JSON values from buffer to channel
887 let spawn_result = runtime.spawn(async move {
888 #[cfg(feature = "tracing")]
889 tracing::debug!(
890 "Subscription consumer task started for {}",
891 record_metadata.name
892 );
893
894 // Main event loop: read from buffer and forward to channel
895 loop {
896 tokio::select! {
897 // Handle cancellation signal
898 _ = &mut cancel_rx => {
899 #[cfg(feature = "tracing")]
900 tracing::debug!("Subscription cancelled");
901 break;
902 }
903 // Read next JSON value from buffer
904 result = json_reader.recv_json() => {
905 match result {
906 Ok(json_val) => {
907 // Send JSON value to subscription channel
908 if value_tx.send(json_val).await.is_err() {
909 #[cfg(feature = "tracing")]
910 tracing::debug!("Subscription receiver dropped");
911 break;
912 }
913 }
914 Err(DbError::BufferLagged { lag_count, .. }) => {
915 // Consumer fell behind - log warning but continue
916 #[cfg(feature = "tracing")]
917 tracing::warn!(
918 "Subscription for {} lagged by {} messages",
919 record_metadata.name,
920 lag_count
921 );
922 // Continue reading - next recv will get latest
923 }
924 Err(DbError::BufferClosed { .. }) => {
925 // Buffer closed (shutdown) - exit gracefully
926 #[cfg(feature = "tracing")]
927 tracing::debug!("Buffer closed for {}", record_metadata.name);
928 break;
929 }
930 Err(e) => {
931 // Other error (shouldn't happen in practice)
932 #[cfg(feature = "tracing")]
933 tracing::error!(
934 "Subscription error for {}: {:?}",
935 record_metadata.name,
936 e
937 );
938 break;
939 }
940 }
941 }
942 }
943 }
944
945 #[cfg(feature = "tracing")]
946 tracing::debug!("Subscription consumer task terminated");
947 });
948
949 spawn_result.map_err(DbError::from)?;
950
951 Ok((value_rx, cancel_tx))
952 }
953
954 /// Collects inbound connector routes for automatic router construction (std only)
955 ///
956 /// Iterates all records, filters their inbound_connectors by scheme,
957 /// and returns routes with producer creation callbacks.
958 ///
959 /// # Arguments
960 /// * `scheme` - URL scheme to filter by (e.g., "mqtt", "kafka")
961 ///
962 /// # Returns
963 /// Vector of tuples: (topic, producer_trait, deserializer)
964 ///
965 /// # Example
966 /// ```rust,ignore
967 /// // In MqttConnector after db.build()
968 /// let routes = db.collect_inbound_routes("mqtt");
969 /// let router = RouterBuilder::from_routes(routes).build();
970 /// connector.set_router(router).await?;
971 /// ```
972 #[cfg(feature = "alloc")]
973 pub fn collect_inbound_routes(
974 &self,
975 scheme: &str,
976 ) -> Vec<(
977 String,
978 Box<dyn crate::connector::ProducerTrait>,
979 crate::connector::DeserializerFn,
980 )> {
981 let mut routes = Vec::new();
982
983 // Convert self to Arc<dyn Any> for producer factory
984 let db_any: Arc<dyn core::any::Any + Send + Sync> = Arc::new(self.clone());
985
986 for record in self.inner.records.values() {
987 let inbound_links = record.inbound_connectors();
988
989 for link in inbound_links {
990 // Filter by scheme
991 if link.url.scheme() != scheme {
992 continue;
993 }
994
995 let topic = link.url.resource_id();
996
997 // Create producer using the stored factory
998 if let Some(producer) = link.create_producer(db_any.clone()) {
999 routes.push((topic, producer, link.deserializer.clone()));
1000 }
1001 }
1002 }
1003
1004 #[cfg(feature = "tracing")]
1005 if !routes.is_empty() {
1006 tracing::debug!(
1007 "Collected {} inbound routes for scheme '{}'",
1008 routes.len(),
1009 scheme
1010 );
1011 }
1012
1013 routes
1014 }
1015
1016 /// Collects outbound routes for a specific protocol scheme
1017 ///
1018 /// Mirrors `collect_inbound_routes()` for symmetry. Iterates all records,
1019 /// filters their outbound_connectors by scheme, and returns routes with
1020 /// consumer creation callbacks.
1021 ///
1022 /// This method is called by connectors during their `build()` phase to
1023 /// collect all configured outbound routes and spawn publisher tasks.
1024 ///
1025 /// # Arguments
1026 /// * `scheme` - URL scheme to filter by (e.g., "mqtt", "kafka")
1027 ///
1028 /// # Returns
1029 /// Vector of tuples: (destination, consumer_trait, serializer, config)
1030 ///
1031 /// The config Vec contains protocol-specific options (e.g., qos, retain).
1032 ///
1033 /// # Example
1034 /// ```rust,ignore
1035 /// // In MqttConnector::build()
1036 /// let routes = db.collect_outbound_routes("mqtt");
1037 /// for (topic, consumer, serializer, config) in routes {
1038 /// connector.spawn_publisher(topic, consumer, serializer, config)?;
1039 /// }
1040 /// ```
1041 #[cfg(feature = "alloc")]
1042 pub fn collect_outbound_routes(&self, scheme: &str) -> Vec<OutboundRoute> {
1043 let mut routes = Vec::new();
1044
1045 // Convert self to Arc<dyn Any> for consumer factory
1046 // This is necessary because the factory takes Arc<dyn Any> to avoid
1047 // needing to know the runtime type R at the factory definition site
1048 let db_any: Arc<dyn core::any::Any + Send + Sync> = Arc::new(self.clone());
1049
1050 for record in self.inner.records.values() {
1051 let outbound_links = record.outbound_connectors();
1052
1053 for link in outbound_links {
1054 // Filter by scheme
1055 if link.url.scheme() != scheme {
1056 continue;
1057 }
1058
1059 let destination = link.url.resource_id();
1060
1061 // Skip links without serializer
1062 let Some(serializer) = link.serializer.clone() else {
1063 #[cfg(feature = "tracing")]
1064 tracing::warn!("Outbound link '{}' has no serializer, skipping", link.url);
1065 continue;
1066 };
1067
1068 // Create consumer using the stored factory
1069 if let Some(consumer) = link.create_consumer(db_any.clone()) {
1070 routes.push((destination, consumer, serializer, link.config.clone()));
1071 }
1072 }
1073 }
1074
1075 #[cfg(feature = "tracing")]
1076 if !routes.is_empty() {
1077 tracing::debug!(
1078 "Collected {} outbound routes for scheme '{}'",
1079 routes.len(),
1080 scheme
1081 );
1082 }
1083
1084 routes
1085 }
1086}