aimdb_core/builder.rs
1//! Database builder with type-safe record registration
2//!
3//! Provides `AimDb` and `AimDbBuilder` for constructing databases with
4//! type-safe, self-registering records using the producer-consumer pattern.
5
6use core::any::TypeId;
7use core::fmt::Debug;
8use core::marker::PhantomData;
9
10extern crate alloc;
11
12use alloc::vec::Vec;
13use hashbrown::HashMap;
14
15#[cfg(not(feature = "std"))]
16use alloc::{boxed::Box, sync::Arc};
17
18#[cfg(all(not(feature = "std"), feature = "alloc"))]
19use alloc::string::{String, ToString};
20
21#[cfg(feature = "std")]
22use std::{boxed::Box, sync::Arc};
23
24use crate::record_id::{RecordId, RecordKey, StringKey};
25use crate::typed_api::{RecordRegistrar, RecordT};
26use crate::typed_record::{AnyRecord, AnyRecordExt, TypedRecord};
27use crate::{DbError, DbResult};
28
/// Outbound route description, as returned by `collect_outbound_routes`.
///
/// Tuple fields, in order:
/// 1. `String` - topic/key taken from the URL path
/// 2. `Box<dyn ConsumerTrait>` - callback that creates a consumer for the record
/// 3. `SerializerFn` - user-provided serializer for the record type
/// 4. `Vec<(String, String)>` - configuration options taken from the URL query
#[cfg(feature = "alloc")]
type OutboundRoute = (
    String,
    Box<dyn crate::connector::ConsumerTrait>,
    crate::connector::SerializerFn,
    Vec<(String, String)>,
);
43
/// Placeholder runtime type: the default `R` of `AimDbBuilder` until `.runtime()`
/// supplies a concrete runtime adapter.
pub struct NoRuntime;
46
47/// Internal database state
48///
49/// Holds the registry of typed records with multiple index structures for
50/// efficient access patterns:
51///
52/// - **`storages`**: Vec for O(1) hot-path access by RecordId
53/// - **`by_key`**: HashMap for O(1) lookup by stable RecordKey
54/// - **`by_type`**: HashMap for introspection (find all records of type T)
55/// - **`types`**: Vec for runtime type validation during downcasts
56pub struct AimDbInner {
57 /// Record storage (hot path - indexed by RecordId)
58 ///
59 /// Order matches registration order. Immutable after build().
60 storages: Vec<Box<dyn AnyRecord>>,
61
62 /// Name → RecordId lookup (control plane)
63 ///
64 /// Used by remote access, CLI, MCP for O(1) name resolution.
65 by_key: HashMap<StringKey, RecordId>,
66
67 /// TypeId → RecordIds lookup (introspection)
68 ///
69 /// Enables "find all Temperature records" queries.
70 by_type: HashMap<TypeId, Vec<RecordId>>,
71
72 /// RecordId → TypeId lookup (type safety assertions)
73 ///
74 /// Used to validate downcasts at runtime.
75 types: Vec<TypeId>,
76
77 /// RecordId → StringKey lookup (reverse mapping)
78 ///
79 /// Used to get the key for a given record ID.
80 keys: Vec<StringKey>,
81}
82
83impl AimDbInner {
84 /// Resolve RecordKey to RecordId (control plane - O(1) average)
85 #[inline]
86 pub fn resolve<K: RecordKey>(&self, key: &K) -> Option<RecordId> {
87 self.by_key.get(key.as_str()).copied()
88 }
89
90 /// Resolve string to RecordId (convenience for remote access)
91 ///
92 /// O(1) average thanks to `Borrow<str>` implementation on `RecordKey`.
93 #[inline]
94 pub fn resolve_str(&self, name: &str) -> Option<RecordId> {
95 self.by_key.get(name).copied()
96 }
97
98 /// Get storage by RecordId (hot path - O(1))
99 #[inline]
100 pub fn storage(&self, id: RecordId) -> Option<&dyn AnyRecord> {
101 self.storages.get(id.index()).map(|b| b.as_ref())
102 }
103
104 /// Get the StringKey for a given RecordId
105 #[inline]
106 pub fn key_for(&self, id: RecordId) -> Option<&StringKey> {
107 self.keys.get(id.index())
108 }
109
110 /// Get all RecordIds for a type (introspection)
111 pub fn records_of_type<T: 'static>(&self) -> &[RecordId] {
112 self.by_type
113 .get(&TypeId::of::<T>())
114 .map(|v| v.as_slice())
115 .unwrap_or(&[])
116 }
117
118 /// Get the number of registered records
119 #[inline]
120 pub fn record_count(&self) -> usize {
121 self.storages.len()
122 }
123
124 /// Helper to get a typed record by RecordKey
125 ///
126 /// This encapsulates the common pattern of:
127 /// 1. Resolving key to RecordId
128 /// 2. Validating TypeId matches
129 /// 3. Downcasting to the typed record
130 pub fn get_typed_record_by_key<T, R>(
131 &self,
132 key: impl AsRef<str>,
133 ) -> DbResult<&TypedRecord<T, R>>
134 where
135 T: Send + 'static + Debug + Clone,
136 R: aimdb_executor::Spawn + 'static,
137 {
138 let key_str = key.as_ref();
139
140 // Resolve key to RecordId
141 let id = self.resolve_str(key_str).ok_or({
142 #[cfg(feature = "std")]
143 {
144 DbError::RecordKeyNotFound {
145 key: key_str.to_string(),
146 }
147 }
148 #[cfg(not(feature = "std"))]
149 {
150 DbError::RecordKeyNotFound { _key: () }
151 }
152 })?;
153
154 self.get_typed_record_by_id::<T, R>(id)
155 }
156
157 /// Helper to get a typed record by RecordId with type validation
158 pub fn get_typed_record_by_id<T, R>(&self, id: RecordId) -> DbResult<&TypedRecord<T, R>>
159 where
160 T: Send + 'static + Debug + Clone,
161 R: aimdb_executor::Spawn + 'static,
162 {
163 use crate::typed_record::AnyRecordExt;
164
165 // Validate RecordId is in bounds
166 if id.index() >= self.storages.len() {
167 return Err(DbError::InvalidRecordId { id: id.raw() });
168 }
169
170 // Validate TypeId matches
171 let expected = TypeId::of::<T>();
172 let actual = self.types[id.index()];
173 if expected != actual {
174 #[cfg(feature = "std")]
175 return Err(DbError::TypeMismatch {
176 record_id: id.raw(),
177 expected_type: core::any::type_name::<T>().to_string(),
178 });
179 #[cfg(not(feature = "std"))]
180 return Err(DbError::TypeMismatch {
181 record_id: id.raw(),
182 _expected_type: (),
183 });
184 }
185
186 // Safe to downcast (type validated above)
187 let record = &self.storages[id.index()];
188
189 #[cfg(feature = "std")]
190 let typed_record = record.as_typed::<T, R>().ok_or(DbError::InvalidOperation {
191 operation: "get_typed_record_by_id".to_string(),
192 reason: "type mismatch during downcast".to_string(),
193 })?;
194
195 #[cfg(not(feature = "std"))]
196 let typed_record = record.as_typed::<T, R>().ok_or(DbError::InvalidOperation {
197 _operation: (),
198 _reason: (),
199 })?;
200
201 Ok(typed_record)
202 }
203
204 /// Collects metadata for all registered records (std only)
205 ///
206 /// Returns a vector of `RecordMetadata` for remote access introspection.
207 /// Available only when the `std` feature is enabled.
208 #[cfg(feature = "std")]
209 pub fn list_records(&self) -> Vec<crate::remote::RecordMetadata> {
210 self.storages
211 .iter()
212 .enumerate()
213 .map(|(i, record)| {
214 let id = RecordId::new(i as u32);
215 let type_id = self.types[i];
216 let key = self.keys[i];
217 record.collect_metadata(type_id, key, id)
218 })
219 .collect()
220 }
221
222 /// Try to get record's latest value as JSON by record key (std only)
223 ///
224 /// O(1) lookup using the key-based index.
225 ///
226 /// # Arguments
227 /// * `record_key` - The record key (e.g., "sensors.temperature")
228 ///
229 /// # Returns
230 /// `Some(JsonValue)` with the current record value, or `None`
231 #[cfg(feature = "std")]
232 pub fn try_latest_as_json(&self, record_key: &str) -> Option<serde_json::Value> {
233 let id = self.resolve_str(record_key)?;
234 self.storages.get(id.index())?.latest_json()
235 }
236
237 /// Sets a record value from JSON (remote access API)
238 ///
239 /// Deserializes the JSON value and writes it to the record's buffer.
240 ///
241 /// **SAFETY:** Enforces the "No Producer Override" rule:
242 /// - Only works for records with `producer_count == 0`
243 /// - Returns error if the record has active producers
244 ///
245 /// # Arguments
246 /// * `record_key` - The record key (e.g., "config.app")
247 /// * `json_value` - JSON representation of the value
248 ///
249 /// # Returns
250 /// - `Ok(())` - Successfully set the value
251 /// - `Err(DbError)` - If record not found, has producers, or deserialization fails
252 #[cfg(feature = "std")]
253 pub fn set_record_from_json(
254 &self,
255 record_key: &str,
256 json_value: serde_json::Value,
257 ) -> DbResult<()> {
258 let id = self
259 .resolve_str(record_key)
260 .ok_or_else(|| DbError::RecordKeyNotFound {
261 key: record_key.to_string(),
262 })?;
263
264 self.storages[id.index()].set_from_json(json_value)
265 }
266}
267
268/// Database builder for producer-consumer pattern
269///
270/// Provides a fluent API for constructing databases with type-safe record registration.
271/// Use `.runtime()` to set the runtime and transition to a typed builder.
272pub struct AimDbBuilder<R = NoRuntime> {
273 /// Registered records with their keys (order matters for RecordId assignment)
274 records: Vec<(StringKey, TypeId, Box<dyn AnyRecord>)>,
275
276 /// Runtime adapter
277 runtime: Option<Arc<R>>,
278
279 /// Connector builders that will be invoked during build()
280 connector_builders: Vec<Box<dyn crate::connector::ConnectorBuilder<R>>>,
281
282 /// Spawn functions with their keys
283 spawn_fns: Vec<(StringKey, Box<dyn core::any::Any + Send>)>,
284
285 /// Remote access configuration (std only)
286 #[cfg(feature = "std")]
287 remote_config: Option<crate::remote::AimxConfig>,
288
289 /// PhantomData to track the runtime type parameter
290 _phantom: PhantomData<R>,
291}
292
293impl AimDbBuilder<NoRuntime> {
294 /// Creates a new database builder without a runtime
295 ///
296 /// Call `.runtime()` to set the runtime adapter.
297 pub fn new() -> Self {
298 Self {
299 records: Vec::new(),
300 runtime: None,
301 connector_builders: Vec::new(),
302 spawn_fns: Vec::new(),
303 #[cfg(feature = "std")]
304 remote_config: None,
305 _phantom: PhantomData,
306 }
307 }
308
309 /// Sets the runtime adapter
310 ///
311 /// This transitions the builder from untyped to typed with concrete runtime `R`.
312 ///
313 /// # Type Safety Note
314 ///
315 /// The `connector_builders` field is intentionally reset to `Vec::new()` during this
316 /// transition because connectors are parameterized by the runtime type:
317 ///
318 /// - Before: `Vec<Box<dyn ConnectorBuilder<NoRuntime>>>`
319 /// - After: `Vec<Box<dyn ConnectorBuilder<R>>>`
320 ///
321 /// These types are incompatible and cannot be transferred. However, this is not a bug
322 /// because `.with_connector()` is only available AFTER calling `.runtime()` (it's defined
323 /// in the `impl<R> where R: Spawn` block, not in `impl AimDbBuilder<NoRuntime>`).
324 ///
325 /// This means the type system **enforces** the correct call order:
326 /// ```rust,ignore
327 /// AimDbBuilder::new()
328 /// .runtime(runtime) // ← Must be called first
329 /// .with_connector(connector) // ← Now available
330 /// ```
331 ///
332 /// The `records` and `remote_config` are preserved across the transition since they
333 /// are not parameterized by the runtime type.
334 pub fn runtime<R>(self, rt: Arc<R>) -> AimDbBuilder<R>
335 where
336 R: aimdb_executor::Spawn + 'static,
337 {
338 AimDbBuilder {
339 records: self.records,
340 runtime: Some(rt),
341 connector_builders: Vec::new(),
342 spawn_fns: Vec::new(),
343 #[cfg(feature = "std")]
344 remote_config: self.remote_config,
345 _phantom: PhantomData,
346 }
347 }
348}
349
350impl<R> AimDbBuilder<R>
351where
352 R: aimdb_executor::Spawn + 'static,
353{
354 /// Registers a connector builder that will be invoked during `build()`
355 ///
356 /// The connector builder will be called after the database is constructed,
357 /// allowing it to collect routes and initialize the connector properly.
358 ///
359 /// # Arguments
360 /// * `builder` - A connector builder that implements `ConnectorBuilder<R>`
361 ///
362 /// # Example
363 ///
364 /// ```rust,ignore
365 /// use aimdb_mqtt_connector::MqttConnector;
366 ///
367 /// let db = AimDbBuilder::new()
368 /// .runtime(runtime)
369 /// .with_connector(MqttConnector::new("mqtt://broker.local:1883"))
370 /// .configure::<Temperature>(|reg| {
371 /// reg.link_from("mqtt://commands/temp")...
372 /// })
373 /// .build().await?;
374 /// ```
375 pub fn with_connector(
376 mut self,
377 builder: impl crate::connector::ConnectorBuilder<R> + 'static,
378 ) -> Self {
379 self.connector_builders.push(Box::new(builder));
380 self
381 }
382
383 /// Enables remote access via AimX protocol (std only)
384 ///
385 /// Configures the database to accept remote connections over a Unix domain socket,
386 /// allowing external clients to introspect records, subscribe to updates, and
387 /// (optionally) write data.
388 ///
389 /// The remote access supervisor will be spawned automatically during `build()`.
390 ///
391 /// # Arguments
392 /// * `config` - Remote access configuration (socket path, security policy, etc.)
393 ///
394 /// # Example
395 ///
396 /// ```rust,ignore
397 /// use aimdb_core::remote::{AimxConfig, SecurityPolicy};
398 ///
399 /// let config = AimxConfig::new("/tmp/aimdb.sock")
400 /// .with_security(SecurityPolicy::read_only());
401 ///
402 /// let db = AimDbBuilder::new()
403 /// .runtime(runtime)
404 /// .with_remote_access(config)
405 /// .build()?;
406 /// ```
407 #[cfg(feature = "std")]
408 pub fn with_remote_access(mut self, config: crate::remote::AimxConfig) -> Self {
409 self.remote_config = Some(config);
410 self
411 }
412
413 /// Configures a record type manually with a unique key
414 ///
415 /// The key uniquely identifies this record instance. Multiple records of the same
416 /// type can exist with different keys (e.g., "sensor.temperature.room1" and
417 /// "sensor.temperature.room2").
418 ///
419 /// # Arguments
420 /// * `key` - A unique identifier for this record. Can be a string literal, `StringKey`,
421 /// or any type implementing `RecordKey` (including user-defined enum keys).
422 /// * `f` - Configuration closure
423 ///
424 /// # Example
425 /// ```rust,ignore
426 /// // Using string literal
427 /// builder.configure::<Temperature>("sensor.temp.room1", |reg| { ... });
428 ///
429 /// // Using compile-time safe enum key
430 /// builder.configure::<Temperature>(SensorKey::TempRoom1, |reg| { ... });
431 /// ```
432 pub fn configure<T>(
433 &mut self,
434 key: impl RecordKey,
435 f: impl for<'a> FnOnce(&'a mut RecordRegistrar<'a, T, R>),
436 ) -> &mut Self
437 where
438 T: Send + Sync + 'static + Debug + Clone,
439 {
440 // Convert any RecordKey to StringKey for internal storage
441 let record_key: StringKey = StringKey::from_dynamic(key.as_str());
442 let type_id = TypeId::of::<T>();
443
444 // Find existing record with this key, or create new one
445 let record_index = self.records.iter().position(|(k, _, _)| k == &record_key);
446
447 let (rec, is_new_record) = match record_index {
448 Some(idx) => {
449 // Use existing record
450 let (_, existing_type, record) = &mut self.records[idx];
451 assert!(
452 *existing_type == type_id,
453 "StringKey '{}' already registered with different type",
454 record_key.as_str()
455 );
456 (
457 record
458 .as_typed_mut::<T, R>()
459 .expect("type mismatch in record registry"),
460 false,
461 )
462 }
463 None => {
464 // Create new record
465 self.records
466 .push((record_key, type_id, Box::new(TypedRecord::<T, R>::new())));
467 let (_, _, record) = self.records.last_mut().unwrap();
468 (
469 record
470 .as_typed_mut::<T, R>()
471 .expect("type mismatch in record registry"),
472 true,
473 )
474 }
475 };
476
477 let mut reg = RecordRegistrar {
478 rec,
479 connector_builders: &self.connector_builders,
480 #[cfg(feature = "alloc")]
481 record_key: record_key.as_str().to_string(),
482 };
483 f(&mut reg);
484
485 // Only store spawn function for new records to avoid duplicates
486 if is_new_record {
487 let spawn_key = record_key;
488
489 #[allow(clippy::type_complexity)]
490 let spawn_fn: Box<
491 dyn FnOnce(&Arc<R>, &Arc<AimDb<R>>, RecordId) -> DbResult<()> + Send,
492 > = Box::new(move |runtime: &Arc<R>, db: &Arc<AimDb<R>>, id: RecordId| {
493 // Use RecordSpawner to spawn tasks for this record type
494 use crate::typed_record::RecordSpawner;
495
496 let typed_record = db.inner().get_typed_record_by_id::<T, R>(id)?;
497 // Get the record key from the database to enable key-based producer/consumer
498 #[cfg(feature = "alloc")]
499 let key = db
500 .inner()
501 .key_for(id)
502 .map(|k| k.as_str().to_string())
503 .unwrap_or_else(|| alloc::format!("__record_{}", id.index()));
504 #[cfg(not(feature = "alloc"))]
505 let key = "";
506 #[cfg(feature = "alloc")]
507 let key = key.as_str();
508 RecordSpawner::<T>::spawn_all_tasks(typed_record, runtime, db, key)
509 });
510
511 // Store the spawn function (type-erased in Box<dyn Any>)
512 self.spawn_fns.push((spawn_key, Box::new(spawn_fn)));
513 }
514
515 self
516 }
517
518 /// Registers a self-registering record type
519 ///
520 /// The record type must implement `RecordT<R>`.
521 ///
522 /// Uses the type name as the default key. For custom keys, use `configure()` directly.
523 pub fn register_record<T>(&mut self, cfg: &T::Config) -> &mut Self
524 where
525 T: RecordT<R>,
526 {
527 // Default key is the full type name for backward compatibility
528 let key = StringKey::new(core::any::type_name::<T>());
529 self.configure::<T>(key, |reg| T::register(reg, cfg))
530 }
531
532 /// Registers a self-registering record type with a custom key
533 ///
534 /// The record type must implement `RecordT<R>`.
535 pub fn register_record_with_key<T>(&mut self, key: impl RecordKey, cfg: &T::Config) -> &mut Self
536 where
537 T: RecordT<R>,
538 {
539 self.configure::<T>(key, |reg| T::register(reg, cfg))
540 }
541
542 /// Runs the database indefinitely (never returns)
543 ///
544 /// This method builds the database, spawns all producer and consumer tasks, and then
545 /// parks the current task indefinitely. This is the primary way to run AimDB services.
546 ///
547 /// All logic runs in background tasks via producers, consumers, and connectors. The
548 /// application continues until interrupted (e.g., Ctrl+C).
549 ///
550 /// # Returns
551 /// `DbResult<()>` - Ok when database starts successfully, then parks forever
552 ///
553 /// # Example
554 ///
555 /// ```rust,ignore
556 /// #[tokio::main]
557 /// async fn main() -> DbResult<()> {
558 /// AimDbBuilder::new()
559 /// .runtime(Arc::new(TokioAdapter::new()?))
560 /// .configure::<MyData>(|reg| {
561 /// reg.with_buffer(BufferCfg::SpmcRing { capacity: 100 })
562 /// .with_source(my_producer)
563 /// .with_tap(my_consumer);
564 /// })
565 /// .run().await // Runs forever
566 /// }
567 /// ```
568 pub async fn run(self) -> DbResult<()> {
569 #[cfg(feature = "tracing")]
570 tracing::info!("Building database and spawning background tasks...");
571
572 let _db = self.build().await?;
573
574 #[cfg(feature = "tracing")]
575 tracing::info!("Database running, background tasks active. Press Ctrl+C to stop.");
576
577 // Park indefinitely - the background tasks will continue running
578 // The database handle is kept alive here to prevent dropping it
579 core::future::pending::<()>().await;
580
581 Ok(())
582 }
583
584 /// Builds the database and returns the handle (async)
585 ///
586 /// Use this when you need programmatic access to the database handle for
587 /// manual subscriptions or production. For typical services, use `.run().await` instead.
588 ///
589 /// **Automatic Task Spawning:** This method spawns all producer services and
590 /// `.tap()` observer tasks that were registered during configuration.
591 ///
592 /// **Connector Setup:** Connectors must be created manually before calling `build()`:
593 ///
594 /// ```rust,ignore
595 /// use aimdb_mqtt_connector::{MqttConnector, router::RouterBuilder};
596 ///
597 /// // Configure records with connector links
598 /// let builder = AimDbBuilder::new()
599 /// .runtime(runtime)
600 /// .configure::<Temp>("temp", |reg| {
601 /// reg.link_from("mqtt://commands/temp")
602 /// .with_buffer(BufferCfg::SingleLatest)
603 /// .with_serialization();
604 /// });
605 ///
606 /// // Create MQTT connector with router
607 /// let router = RouterBuilder::new()
608 /// .route("commands/temp", /* deserializer */)
609 /// .build();
610 /// let connector = MqttConnector::new("mqtt://localhost:1883", router).await?;
611 ///
612 /// // Register connector and build
613 /// let db = builder
614 /// .with_connector("mqtt", Arc::new(connector))
615 /// .build().await?;
616 /// ```
617 ///
618 /// # Returns
619 /// `DbResult<AimDb<R>>` - The database instance
620 #[cfg_attr(not(feature = "std"), allow(unused_mut))]
621 pub async fn build(self) -> DbResult<AimDb<R>> {
622 use crate::DbError;
623
624 // Validate all records
625 for (key, _, record) in &self.records {
626 record.validate().map_err(|_msg| {
627 // Suppress unused warning for key in no_std
628 let _ = &key;
629 #[cfg(feature = "std")]
630 {
631 DbError::RuntimeError {
632 message: format!("Record '{}' validation failed: {}", key.as_str(), _msg),
633 }
634 }
635 #[cfg(not(feature = "std"))]
636 {
637 DbError::RuntimeError { _message: () }
638 }
639 })?;
640 }
641
642 // Ensure runtime is set
643 let runtime = self.runtime.ok_or({
644 #[cfg(feature = "std")]
645 {
646 DbError::RuntimeError {
647 message: "runtime not set (use .runtime())".into(),
648 }
649 }
650 #[cfg(not(feature = "std"))]
651 {
652 DbError::RuntimeError { _message: () }
653 }
654 })?;
655
656 // Build the new index structures
657 let record_count = self.records.len();
658 let mut storages: Vec<Box<dyn AnyRecord>> = Vec::with_capacity(record_count);
659 let mut by_key: HashMap<StringKey, RecordId> = HashMap::with_capacity(record_count);
660 let mut by_type: HashMap<TypeId, Vec<RecordId>> = HashMap::new();
661 let mut types: Vec<TypeId> = Vec::with_capacity(record_count);
662 let mut keys: Vec<StringKey> = Vec::with_capacity(record_count);
663
664 for (i, (key, type_id, record)) in self.records.into_iter().enumerate() {
665 let id = RecordId::new(i as u32);
666
667 // Check for duplicate keys (should not happen if configure() is used correctly)
668 if by_key.contains_key(&key) {
669 #[cfg(feature = "std")]
670 return Err(DbError::DuplicateRecordKey {
671 key: key.as_str().to_string(),
672 });
673 #[cfg(not(feature = "std"))]
674 return Err(DbError::DuplicateRecordKey { _key: () });
675 }
676
677 // Build index structures
678 storages.push(record);
679 by_key.insert(key, id);
680 by_type.entry(type_id).or_default().push(id);
681 types.push(type_id);
682 keys.push(key);
683 }
684
685 let inner = Arc::new(AimDbInner {
686 storages,
687 by_key,
688 by_type,
689 types,
690 keys,
691 });
692
693 let db = Arc::new(AimDb {
694 inner: inner.clone(),
695 runtime: runtime.clone(),
696 });
697
698 #[cfg(feature = "tracing")]
699 tracing::info!(
700 "Spawning producer services and tap observers for {} records",
701 self.spawn_fns.len()
702 );
703
704 // Execute spawn functions for each record
705 for (key, spawn_fn_any) in self.spawn_fns {
706 // Resolve key to RecordId
707 let id = inner.resolve(&key).ok_or({
708 #[cfg(feature = "std")]
709 {
710 DbError::RecordKeyNotFound {
711 key: key.as_str().to_string(),
712 }
713 }
714 #[cfg(not(feature = "std"))]
715 {
716 DbError::RecordKeyNotFound { _key: () }
717 }
718 })?;
719
720 // Downcast from Box<dyn Any> back to the concrete spawn function type
721 type SpawnFnType<R> =
722 Box<dyn FnOnce(&Arc<R>, &Arc<AimDb<R>>, RecordId) -> DbResult<()> + Send>;
723
724 let spawn_fn = spawn_fn_any
725 .downcast::<SpawnFnType<R>>()
726 .expect("spawn function type mismatch");
727
728 // Execute the spawn function
729 (*spawn_fn)(&runtime, &db, id)?;
730 }
731
732 #[cfg(feature = "tracing")]
733 tracing::info!("Automatic spawning complete");
734
735 // Spawn remote access supervisor if configured (std only)
736 #[cfg(feature = "std")]
737 if let Some(remote_cfg) = self.remote_config {
738 #[cfg(feature = "tracing")]
739 tracing::info!(
740 "Spawning remote access supervisor on socket: {}",
741 remote_cfg.socket_path.display()
742 );
743
744 // Apply security policy to mark writable records
745 let writable_keys = remote_cfg.security_policy.writable_records();
746 for key_str in writable_keys {
747 if let Some(id) = inner.resolve_str(&key_str) {
748 #[cfg(feature = "tracing")]
749 tracing::debug!("Marking record '{}' as writable", key_str);
750
751 // Mark the record as writable (type-erased call)
752 inner.storages[id.index()].set_writable_erased(true);
753 }
754 }
755
756 // Spawn the remote supervisor task
757 crate::remote::supervisor::spawn_supervisor(db.clone(), runtime.clone(), remote_cfg)?;
758
759 #[cfg(feature = "tracing")]
760 tracing::info!("Remote access supervisor spawned successfully");
761 }
762
763 // Build connectors from builders (after database is fully constructed)
764 // This allows connectors to use collect_inbound_routes() which creates
765 // producers tied to this specific database instance
766 for builder in self.connector_builders {
767 #[cfg(feature = "tracing")]
768 let scheme = {
769 #[cfg(feature = "std")]
770 {
771 builder.scheme().to_string()
772 }
773 #[cfg(not(feature = "std"))]
774 {
775 alloc::string::String::from(builder.scheme())
776 }
777 };
778
779 #[cfg(feature = "tracing")]
780 tracing::debug!("Building connector for scheme: {}", scheme);
781
782 // Build the connector (this spawns tasks as a side effect)
783 let _connector = builder.build(&db).await?;
784
785 #[cfg(feature = "tracing")]
786 tracing::info!("Connector built and spawned successfully: {}", scheme);
787 }
788
789 // Unwrap the Arc to return the owned AimDb
790 // This is safe because we just created it and hold the only reference
791 let db_owned = Arc::try_unwrap(db).unwrap_or_else(|arc| (*arc).clone());
792
793 Ok(db_owned)
794 }
795}
796
797impl Default for AimDbBuilder<NoRuntime> {
798 fn default() -> Self {
799 Self::new()
800 }
801}
802
803/// Producer-consumer database
804///
805/// A database instance with type-safe record registration and cross-record
806/// communication via the Emitter pattern. The type parameter `R` represents
807/// the runtime adapter (e.g., TokioAdapter, EmbassyAdapter).
808///
809/// See `examples/` for usage.
810///
811/// # Examples
812///
813/// ```rust,ignore
814/// use aimdb_tokio_adapter::TokioAdapter;
815///
816/// let runtime = Arc::new(TokioAdapter);
817/// let db: AimDb<TokioAdapter> = AimDbBuilder::new()
818/// .runtime(runtime)
819/// .register_record::<Temperature>(&TemperatureConfig)
820/// .build()?;
821/// ```
822pub struct AimDb<R: aimdb_executor::Spawn + 'static> {
823 /// Internal state
824 inner: Arc<AimDbInner>,
825
826 /// Runtime adapter with concrete type
827 runtime: Arc<R>,
828}
829
830impl<R: aimdb_executor::Spawn + 'static> Clone for AimDb<R> {
831 fn clone(&self) -> Self {
832 Self {
833 inner: self.inner.clone(),
834 runtime: self.runtime.clone(),
835 }
836 }
837}
838
839impl<R: aimdb_executor::Spawn + 'static> AimDb<R> {
840 /// Internal accessor for the inner state
841 ///
842 /// Used by adapter crates and internal spawning logic.
843 #[doc(hidden)]
844 pub fn inner(&self) -> &Arc<AimDbInner> {
845 &self.inner
846 }
847
848 /// Builds a database with a closure-based builder pattern
849 pub async fn build_with(rt: Arc<R>, f: impl FnOnce(&mut AimDbBuilder<R>)) -> DbResult<()> {
850 let mut b = AimDbBuilder::new().runtime(rt);
851 f(&mut b);
852 b.run().await
853 }
854
855 /// Spawns a task using the database's runtime adapter
856 ///
857 /// This method provides direct access to the runtime's spawn capability.
858 ///
859 /// # Arguments
860 /// * `future` - The future to spawn
861 ///
862 /// # Returns
863 /// `DbResult<()>` - Ok if the task was spawned successfully
864 pub fn spawn_task<F>(&self, future: F) -> DbResult<()>
865 where
866 F: core::future::Future<Output = ()> + Send + 'static,
867 {
868 self.runtime.spawn(future).map_err(DbError::from)?;
869 Ok(())
870 }
871
872 /// Produces a value to a specific record by key
873 ///
874 /// Uses O(1) key-based lookup to find the correct record.
875 ///
876 /// # Arguments
877 /// * `key` - The record key (e.g., "sensor.temperature")
878 /// * `value` - The value to produce
879 ///
880 /// # Example
881 ///
882 /// ```rust,ignore
883 /// db.produce::<Temperature>("sensors.indoor", indoor_temp).await?;
884 /// db.produce::<Temperature>("sensors.outdoor", outdoor_temp).await?;
885 /// ```
886 pub async fn produce<T>(&self, key: impl AsRef<str>, value: T) -> DbResult<()>
887 where
888 T: Send + 'static + Debug + Clone,
889 {
890 let typed_rec = self.inner.get_typed_record_by_key::<T, R>(key)?;
891 typed_rec.produce(value).await;
892 Ok(())
893 }
894
895 /// Subscribes to a specific record by key
896 ///
897 /// Uses O(1) key-based lookup to find the correct record.
898 ///
899 /// # Arguments
900 /// * `key` - The record key (e.g., "sensor.temperature")
901 ///
902 /// # Example
903 ///
904 /// ```rust,ignore
905 /// let mut reader = db.subscribe::<Temperature>("sensors.indoor")?;
906 /// while let Ok(temp) = reader.recv().await {
907 /// println!("Indoor: {:.1}°C", temp.celsius);
908 /// }
909 /// ```
910 pub fn subscribe<T>(
911 &self,
912 key: impl AsRef<str>,
913 ) -> DbResult<Box<dyn crate::buffer::BufferReader<T> + Send>>
914 where
915 T: Send + Sync + 'static + Debug + Clone,
916 {
917 let typed_rec = self.inner.get_typed_record_by_key::<T, R>(key)?;
918 typed_rec.subscribe()
919 }
920
921 /// Creates a type-safe producer for a specific record by key
922 ///
923 /// Returns a `Producer<T, R>` bound to a specific record key.
924 ///
925 /// # Arguments
926 /// * `key` - The record key (e.g., "sensor.temperature")
927 ///
928 /// # Example
929 ///
930 /// ```rust,ignore
931 /// let indoor_producer = db.producer::<Temperature>("sensors.indoor");
932 /// let outdoor_producer = db.producer::<Temperature>("sensors.outdoor");
933 ///
934 /// // Each producer writes to its own record
935 /// indoor_producer.produce(indoor_temp).await?;
936 /// outdoor_producer.produce(outdoor_temp).await?;
937 /// ```
938 pub fn producer<T>(
939 &self,
940 key: impl Into<alloc::string::String>,
941 ) -> crate::typed_api::Producer<T, R>
942 where
943 T: Send + 'static + Debug + Clone,
944 {
945 crate::typed_api::Producer::new(Arc::new(self.clone()), key.into())
946 }
947
948 /// Creates a type-safe consumer for a specific record by key
949 ///
950 /// Returns a `Consumer<T, R>` bound to a specific record key.
951 ///
952 /// # Arguments
953 /// * `key` - The record key (e.g., "sensor.temperature")
954 ///
955 /// # Example
956 ///
957 /// ```rust,ignore
958 /// let indoor_consumer = db.consumer::<Temperature>("sensors.indoor");
959 /// let outdoor_consumer = db.consumer::<Temperature>("sensors.outdoor");
960 ///
961 /// // Each consumer reads from its own record
962 /// let mut rx = indoor_consumer.subscribe()?;
963 /// ```
964 pub fn consumer<T>(
965 &self,
966 key: impl Into<alloc::string::String>,
967 ) -> crate::typed_api::Consumer<T, R>
968 where
969 T: Send + Sync + 'static + Debug + Clone,
970 {
971 crate::typed_api::Consumer::new(Arc::new(self.clone()), key.into())
972 }
973
974 /// Resolve a record key to its RecordId
975 ///
976 /// Useful for checking if a record exists before operations.
977 ///
978 /// # Example
979 ///
980 /// ```rust,ignore
981 /// if let Some(id) = db.resolve_key("sensors.temperature") {
982 /// println!("Record exists with ID: {}", id);
983 /// }
984 /// ```
985 pub fn resolve_key(&self, key: &str) -> Option<crate::record_id::RecordId> {
986 self.inner.resolve_str(key)
987 }
988
989 /// Get all record IDs for a specific type
990 ///
991 /// Returns a slice of RecordIds for all records of type T.
992 /// Useful for introspection when multiple records of the same type exist.
993 ///
994 /// # Example
995 ///
996 /// ```rust,ignore
997 /// let temp_ids = db.records_of_type::<Temperature>();
998 /// println!("Found {} temperature records", temp_ids.len());
999 /// ```
1000 pub fn records_of_type<T: 'static>(&self) -> &[crate::record_id::RecordId] {
1001 self.inner.records_of_type::<T>()
1002 }
1003
1004 /// Returns a reference to the runtime adapter
1005 ///
1006 /// Provides direct access to the concrete runtime type.
1007 pub fn runtime(&self) -> &R {
1008 &self.runtime
1009 }
1010
/// Returns metadata for every registered record (std only)
///
/// Delegates to `AimDbInner::list_records()`. Mainly useful for remote
/// access introspection. Compiled only when the `std` feature is enabled.
///
/// # Example
/// ```rust,ignore
/// let records = db.list_records();
/// for record in records {
///     println!("Record: {} ({})", record.name, record.type_id);
/// }
/// ```
#[cfg(feature = "std")]
pub fn list_records(&self) -> Vec<crate::remote::RecordMetadata> {
    let registry = &self.inner;
    registry.list_records()
}
1027
/// Fetches a record's most recent value rendered as JSON (std only)
///
/// Thin convenience wrapper that forwards to
/// `AimDbInner::try_latest_as_json()`.
///
/// # Arguments
/// * `record_name` - The full Rust type name (e.g., "server::Temperature"),
///   passed through unchanged to the inner registry
///
/// # Returns
/// `Some(JsonValue)` with the current value, or `None` if unavailable
#[cfg(feature = "std")]
pub fn try_latest_as_json(&self, record_name: &str) -> Option<serde_json::Value> {
    let registry = &self.inner;
    registry.try_latest_as_json(record_name)
}
1041
/// Writes a record value supplied as JSON (remote access API, std only)
///
/// Forwards to `AimDbInner::set_record_from_json()`, which deserializes the
/// JSON and produces the value to the record's buffer.
///
/// **SAFETY:** Enforces the "No Producer Override" rule - only works for
/// configuration records without active producers.
///
/// # Arguments
/// * `record_name` - Full Rust type name
/// * `json_value` - JSON value to set (moved into the inner call)
///
/// # Returns
/// `Ok(())` on success; an error if the record is not found, has producers,
/// or deserialization fails
///
/// # Example (internal use)
/// ```rust,ignore
/// db.set_record_from_json("AppConfig", json!({"debug": true}))?;
/// ```
#[cfg(feature = "std")]
pub fn set_record_from_json(
    &self,
    record_name: &str,
    json_value: serde_json::Value,
) -> DbResult<()> {
    let registry = &self.inner;
    registry.set_record_from_json(record_name, json_value)
}
1068
/// Subscribe to record updates as JSON stream (std only)
///
/// Creates a subscription to a record's buffer and forwards updates as JSON
/// to a bounded channel. This is used internally by the remote access protocol
/// for implementing `record.subscribe`.
///
/// # Architecture
///
/// Spawns a consumer task on the runtime adapter that:
/// 1. Reads JSON values from the record's JSON reader as they arrive
/// 2. Sends each value to a bounded channel (waiting when the channel is full)
/// 3. Terminates when any of the following happens:
///    - The cancel signal is received, or the cancel sender is dropped
///      (unsubscribe) - this is honoured even while the task is waiting
///      for space in a full channel
///    - The channel receiver is dropped (client disconnected)
///    - The buffer reports it is closed, or any other non-lag error occurs
///
/// A lagged buffer is not fatal: the lag is logged (with the `tracing`
/// feature) and reading continues.
///
/// # Arguments
/// * `record_key` - Key of the record to subscribe to
/// * `queue_size` - Capacity of the bounded channel for this subscription.
///   A value of `0` is treated as `1`, because tokio's bounded channel
///   panics when created with zero capacity.
///
/// # Returns
/// `Ok((receiver, cancel_tx))` where:
/// - `receiver`: Bounded channel receiver for JSON values
/// - `cancel_tx`: One-shot sender to cancel the subscription
///
/// # Errors
/// - `DbError::RecordKeyNotFound` if no record is registered under `record_key`
/// - `DbError::InvalidRecordId` if the resolved ID has no storage entry
/// - The error from `subscribe_json()` (e.g. record not configured with
///   `.with_serialization()`, or the buffer subscription failed)
/// - The converted spawn error if the runtime could not spawn the task
///
/// # Example (internal use)
///
/// ```rust,ignore
/// let (mut rx, cancel_tx) = db.subscribe_record_updates("sensor.temp", 100)?;
///
/// // Read events
/// while let Some(json_value) = rx.recv().await {
///     // Forward to client...
/// }
///
/// // Cancel subscription
/// let _ = cancel_tx.send(());
/// ```
#[cfg(feature = "std")]
#[allow(unused_variables)] // Variables used only in tracing feature
pub fn subscribe_record_updates(
    &self,
    record_key: &str,
    queue_size: usize,
) -> DbResult<(
    tokio::sync::mpsc::Receiver<serde_json::Value>,
    tokio::sync::oneshot::Sender<()>,
)> {
    use tokio::sync::{mpsc, oneshot};

    // Find the record by key
    let id = self
        .inner
        .resolve_str(record_key)
        .ok_or_else(|| DbError::RecordKeyNotFound {
            key: record_key.to_string(),
        })?;

    let record = self
        .inner
        .storage(id)
        .ok_or_else(|| DbError::InvalidRecordId { id: id.raw() })?;

    // Subscribe to the record's buffer as JSON stream
    // This will fail if record not configured with .with_serialization()
    let mut json_reader = record.subscribe_json()?;

    // Create channels for the subscription.
    // `mpsc::channel` panics on a zero capacity, so clamp to at least one slot
    // instead of letting a caller-supplied 0 bring down the calling task.
    let (value_tx, value_rx) = mpsc::channel(queue_size.max(1));
    let (cancel_tx, mut cancel_rx) = oneshot::channel();

    // Get metadata for logging
    let type_id = self.inner.types[id.index()];
    let key = self.inner.keys[id.index()];
    let record_metadata = record.collect_metadata(type_id, key, id);
    let runtime = self.runtime.clone();

    // Spawn consumer task that forwards JSON values from buffer to channel
    let spawn_result = runtime.spawn(async move {
        #[cfg(feature = "tracing")]
        tracing::debug!(
            "Subscription consumer task started for {}",
            record_metadata.name
        );

        // Main event loop: read from buffer and forward to channel
        loop {
            tokio::select! {
                // Handle cancellation signal (also fires if cancel_tx is dropped)
                _ = &mut cancel_rx => {
                    #[cfg(feature = "tracing")]
                    tracing::debug!("Subscription cancelled");
                    break;
                }
                // Read next JSON value from buffer
                result = json_reader.recv_json() => {
                    match result {
                        Ok(json_val) => {
                            // The send can wait indefinitely while the queue is
                            // full. Race it against the cancel signal so that an
                            // unsubscribe is not ignored under backpressure.
                            tokio::select! {
                                _ = &mut cancel_rx => {
                                    #[cfg(feature = "tracing")]
                                    tracing::debug!("Subscription cancelled");
                                    break;
                                }
                                sent = value_tx.send(json_val) => {
                                    if sent.is_err() {
                                        #[cfg(feature = "tracing")]
                                        tracing::debug!("Subscription receiver dropped");
                                        break;
                                    }
                                }
                            }
                        }
                        Err(DbError::BufferLagged { lag_count, .. }) => {
                            // Consumer fell behind - log warning but continue
                            #[cfg(feature = "tracing")]
                            tracing::warn!(
                                "Subscription for {} lagged by {} messages",
                                record_metadata.name,
                                lag_count
                            );
                            // Continue reading - next recv will get latest
                        }
                        Err(DbError::BufferClosed { .. }) => {
                            // Buffer closed (shutdown) - exit gracefully
                            #[cfg(feature = "tracing")]
                            tracing::debug!("Buffer closed for {}", record_metadata.name);
                            break;
                        }
                        Err(e) => {
                            // Other error (shouldn't happen in practice)
                            #[cfg(feature = "tracing")]
                            tracing::error!(
                                "Subscription error for {}: {:?}",
                                record_metadata.name,
                                e
                            );
                            break;
                        }
                    }
                }
            }
        }

        #[cfg(feature = "tracing")]
        tracing::debug!("Subscription consumer task terminated");
    });

    spawn_result.map_err(DbError::from)?;

    Ok((value_rx, cancel_tx))
}
1219
/// Gathers inbound connector routes for automatic router construction
///
/// Available when the `alloc` feature is enabled. Walks every record's
/// inbound connector links, keeps those whose URL scheme equals `scheme`,
/// and asks each link's stored factory for a producer. Links whose factory
/// yields no producer are skipped.
///
/// # Arguments
/// * `scheme` - URL scheme to filter by (e.g., "mqtt", "kafka")
///
/// # Returns
/// Vector of tuples: (topic, producer_trait, deserializer), where the topic
/// is the link URL's `resource_id()`
///
/// # Example
/// ```rust,ignore
/// // In MqttConnector after db.build()
/// let routes = db.collect_inbound_routes("mqtt");
/// let router = RouterBuilder::from_routes(routes).build();
/// connector.set_router(router).await?;
/// ```
#[cfg(feature = "alloc")]
pub fn collect_inbound_routes(
    &self,
    scheme: &str,
) -> Vec<(
    String,
    Box<dyn crate::connector::ProducerTrait>,
    crate::connector::DeserializerFn,
)> {
    // Type-erased handle to this database, shared with every producer factory
    let erased_db: Arc<dyn core::any::Any + Send + Sync> = Arc::new(self.clone());

    let mut collected = Vec::new();

    for storage in &self.inner.storages {
        for inbound in storage.inbound_connectors() {
            // Only links for the requested protocol are of interest
            if inbound.url.scheme() == scheme {
                let topic = inbound.url.resource_id();

                // Ask the stored factory for a producer; skip the link if it declines
                let Some(producer) = inbound.create_producer(erased_db.clone()) else {
                    continue;
                };

                collected.push((topic, producer, inbound.deserializer.clone()));
            }
        }
    }

    #[cfg(feature = "tracing")]
    if !collected.is_empty() {
        tracing::debug!(
            "Collected {} inbound routes for scheme '{}'",
            collected.len(),
            scheme
        );
    }

    collected
}
1281
/// Gathers outbound routes for a specific protocol scheme
///
/// Counterpart of `collect_inbound_routes()`. Walks every record's outbound
/// connector links, keeps those whose URL scheme equals `scheme`, and asks
/// each link's stored factory for a consumer.
///
/// Connectors call this during their `build()` phase to collect all
/// configured outbound routes and spawn publisher tasks.
///
/// A link is skipped when it has no serializer (a warning is logged with the
/// `tracing` feature) or when its factory yields no consumer.
///
/// # Arguments
/// * `scheme` - URL scheme to filter by (e.g., "mqtt", "kafka")
///
/// # Returns
/// Vector of tuples: (destination, consumer_trait, serializer, config)
///
/// The destination is the link URL's `resource_id()`; the config Vec contains
/// protocol-specific options (e.g., qos, retain).
///
/// # Example
/// ```rust,ignore
/// // In MqttConnector::build()
/// let routes = db.collect_outbound_routes("mqtt");
/// for (topic, consumer, serializer, config) in routes {
///     connector.spawn_publisher(topic, consumer, serializer, config)?;
/// }
/// ```
#[cfg(feature = "alloc")]
pub fn collect_outbound_routes(&self, scheme: &str) -> Vec<OutboundRoute> {
    // The consumer factory accepts Arc<dyn Any> so that it does not have to
    // name the runtime type R where the factory is defined; erase the type here.
    let erased_db: Arc<dyn core::any::Any + Send + Sync> = Arc::new(self.clone());

    let mut collected = Vec::new();

    for storage in &self.inner.storages {
        for outbound in storage.outbound_connectors() {
            // Only links for the requested protocol are of interest
            let scheme_matches = outbound.url.scheme() == scheme;
            if !scheme_matches {
                continue;
            }

            let destination = outbound.url.resource_id();

            // A route cannot publish without a serializer
            let serializer = match outbound.serializer.clone() {
                Some(serializer) => serializer,
                None => {
                    #[cfg(feature = "tracing")]
                    tracing::warn!("Outbound link '{}' has no serializer, skipping", outbound.url);
                    continue;
                }
            };

            // Ask the stored factory for a consumer; skip the link if it declines
            if let Some(consumer) = outbound.create_consumer(erased_db.clone()) {
                collected.push((destination, consumer, serializer, outbound.config.clone()));
            }
        }
    }

    #[cfg(feature = "tracing")]
    if !collected.is_empty() {
        tracing::debug!(
            "Collected {} outbound routes for scheme '{}'",
            collected.len(),
            scheme
        );
    }

    collected
}
1352}