aimdb_core/builder.rs
1//! Database builder with type-safe record registration
2//!
3//! Provides `AimDb` and `AimDbBuilder` for constructing databases with
4//! type-safe, self-registering records using the producer-consumer pattern.
5
6use core::any::TypeId;
7use core::fmt::Debug;
8use core::marker::PhantomData;
9
10extern crate alloc;
11
12use alloc::vec::Vec;
13use hashbrown::HashMap;
14
15#[cfg(not(feature = "std"))]
16use alloc::{boxed::Box, sync::Arc};
17
18#[cfg(all(not(feature = "std"), feature = "alloc"))]
19use alloc::string::{String, ToString};
20
21#[cfg(feature = "std")]
22use std::{boxed::Box, sync::Arc};
23
24use crate::extensions::Extensions;
25use crate::graph::DependencyGraph;
26
/// Type-erased `on_start` function stored in `AimDbBuilder::start_fns`
/// (selected when the `std` feature is enabled).
///
/// The boxed closure is invoked once with the runtime handle (`Arc<R>`) and
/// returns a pinned, boxed `Send + 'static` future whose output is `()`.
/// The closure itself must be `Send`.
///
/// Defined once here so `on_start()` (which stores) and `build()` (which
/// downcasts) share the *exact same* type and a silent type mismatch cannot
/// cause a runtime panic.
#[cfg(feature = "std")]
type StartFnType<R> = Box<
    dyn FnOnce(
            Arc<R>,
        )
            -> core::pin::Pin<Box<dyn core::future::Future<Output = ()> + Send + 'static>>
        + Send,
>;
/// Type-erased `on_start` function used when the `std` feature is disabled.
///
/// Structurally identical to `StartFnType<R>`: a `Send` closure taking the
/// runtime handle (`Arc<R>`) and returning a pinned, boxed `Send + 'static`
/// future with output `()`. `on_start()` stores values of this type and
/// `build()` downcasts back to it.
#[cfg(not(feature = "std"))]
type NoStdStartFnType<R> = Box<
    dyn FnOnce(
            Arc<R>,
        )
            -> core::pin::Pin<Box<dyn core::future::Future<Output = ()> + Send + 'static>>
        + Send,
>;
48use crate::record_id::{RecordId, RecordKey, StringKey};
49use crate::typed_api::{RecordRegistrar, RecordT};
50use crate::typed_record::{AnyRecord, AnyRecordExt, TypedRecord};
51use crate::{DbError, DbResult};
52
/// Type alias for outbound route tuples returned by `collect_outbound_routes`
///
/// Each tuple contains, in order:
/// 1. `String` - Default topic/destination from the URL path
/// 2. `Box<dyn ConsumerTrait>` - Consumer for subscribing to record values
/// 3. `SerializerKind` - User-provided serializer for the record type (raw or context-aware)
/// 4. `Vec<(String, String)>` - Configuration options from the URL query
/// 5. `Option<TopicProviderFn>` - Optional dynamic topic provider
///
/// Only available when the `alloc` feature is enabled.
#[cfg(feature = "alloc")]
pub type OutboundRoute = (
    String,
    Box<dyn crate::connector::ConsumerTrait>,
    crate::connector::SerializerKind,
    Vec<(String, String)>,
    Option<crate::connector::TopicProviderFn>,
);
69
/// Marker type for untyped builder (before runtime is set)
///
/// This is the default runtime parameter of `AimDbBuilder`; calling
/// `AimDbBuilder::runtime()` replaces it with a concrete runtime type.
pub struct NoRuntime;
72
/// Internal database state
///
/// Holds the registry of typed records with multiple index structures for
/// efficient access patterns:
///
/// - **`storages`**: Vec for O(1) hot-path access by RecordId
/// - **`by_key`**: HashMap for O(1) lookup by stable RecordKey
/// - **`by_type`**: HashMap for introspection (find all records of type T)
/// - **`types`**: Vec for runtime type validation during downcasts
/// - **`keys`**: Vec for the reverse RecordId → StringKey mapping
/// - **`dependency_graph`**: DAG of record relationships (immutable after build)
/// - **`extensions`**: typed state contributed by external crates
///
/// `storages`, `types` and `keys` are filled in lock-step by
/// `AimDbBuilder::build()`, so the same index addresses the same record in
/// all three.
pub struct AimDbInner {
    /// Record storage (hot path - indexed by RecordId)
    ///
    /// Order matches registration order. Immutable after build().
    storages: Vec<Box<dyn AnyRecord>>,

    /// Name → RecordId lookup (control plane)
    ///
    /// Used by remote access, CLI, MCP for O(1) name resolution.
    by_key: HashMap<StringKey, RecordId>,

    /// TypeId → RecordIds lookup (introspection)
    ///
    /// Enables "find all Temperature records" queries.
    by_type: HashMap<TypeId, Vec<RecordId>>,

    /// RecordId → TypeId lookup (type safety assertions)
    ///
    /// Used to validate downcasts at runtime.
    types: Vec<TypeId>,

    /// RecordId → StringKey lookup (reverse mapping)
    ///
    /// Used to get the key for a given record ID.
    keys: Vec<StringKey>,

    /// Dependency graph (immutable after build)
    ///
    /// Contains all record nodes, edges, and topological order.
    /// Used for introspection, spawn ordering, and graph queries.
    dependency_graph: crate::graph::DependencyGraph,

    /// Generic extension storage (frozen after build)
    ///
    /// External crates (e.g. `aimdb-persistence`) store typed state here
    /// during builder configuration and retrieve it at query time via
    /// `AimDb::extensions()`.
    pub(crate) extensions: Extensions,
}
122
123impl AimDbInner {
124 /// Resolve RecordKey to RecordId (control plane - O(1) average)
125 #[inline]
126 pub fn resolve<K: RecordKey>(&self, key: &K) -> Option<RecordId> {
127 self.by_key.get(key.as_str()).copied()
128 }
129
130 /// Resolve string to RecordId (convenience for remote access)
131 ///
132 /// O(1) average thanks to `Borrow<str>` implementation on `RecordKey`.
133 #[inline]
134 pub fn resolve_str(&self, name: &str) -> Option<RecordId> {
135 self.by_key.get(name).copied()
136 }
137
138 /// Get storage by RecordId (hot path - O(1))
139 #[inline]
140 pub fn storage(&self, id: RecordId) -> Option<&dyn AnyRecord> {
141 self.storages.get(id.index()).map(|b| b.as_ref())
142 }
143
144 /// Get the StringKey for a given RecordId
145 #[inline]
146 pub fn key_for(&self, id: RecordId) -> Option<&StringKey> {
147 self.keys.get(id.index())
148 }
149
150 /// Get all RecordIds for a type (introspection)
151 pub fn records_of_type<T: 'static>(&self) -> &[RecordId] {
152 self.by_type
153 .get(&TypeId::of::<T>())
154 .map(|v| v.as_slice())
155 .unwrap_or(&[])
156 }
157
    /// Get the number of registered records
    ///
    /// This is the length of the record storage vector, i.e. one past the
    /// largest valid `RecordId` index.
    #[inline]
    pub fn record_count(&self) -> usize {
        self.storages.len()
    }
163
    /// Get the dependency graph (immutable reference)
    ///
    /// The graph is constructed once during `build()` and never changes.
    /// Contains all record nodes, edges, and topological ordering.
    #[inline]
    pub fn dependency_graph(&self) -> &crate::graph::DependencyGraph {
        &self.dependency_graph
    }
172
173 /// Helper to get a typed record by RecordKey
174 ///
175 /// This encapsulates the common pattern of:
176 /// 1. Resolving key to RecordId
177 /// 2. Validating TypeId matches
178 /// 3. Downcasting to the typed record
179 pub fn get_typed_record_by_key<T, R>(
180 &self,
181 key: impl AsRef<str>,
182 ) -> DbResult<&TypedRecord<T, R>>
183 where
184 T: Send + 'static + Debug + Clone,
185 R: aimdb_executor::Spawn + 'static,
186 {
187 let key_str = key.as_ref();
188
189 // Resolve key to RecordId
190 let id = self.resolve_str(key_str).ok_or({
191 #[cfg(feature = "std")]
192 {
193 DbError::RecordKeyNotFound {
194 key: key_str.to_string(),
195 }
196 }
197 #[cfg(not(feature = "std"))]
198 {
199 DbError::RecordKeyNotFound { _key: () }
200 }
201 })?;
202
203 self.get_typed_record_by_id::<T, R>(id)
204 }
205
206 /// Helper to get a typed record by RecordId with type validation
207 pub fn get_typed_record_by_id<T, R>(&self, id: RecordId) -> DbResult<&TypedRecord<T, R>>
208 where
209 T: Send + 'static + Debug + Clone,
210 R: aimdb_executor::Spawn + 'static,
211 {
212 use crate::typed_record::AnyRecordExt;
213
214 // Validate RecordId is in bounds
215 if id.index() >= self.storages.len() {
216 return Err(DbError::InvalidRecordId { id: id.raw() });
217 }
218
219 // Validate TypeId matches
220 let expected = TypeId::of::<T>();
221 let actual = self.types[id.index()];
222 if expected != actual {
223 #[cfg(feature = "std")]
224 return Err(DbError::TypeMismatch {
225 record_id: id.raw(),
226 expected_type: core::any::type_name::<T>().to_string(),
227 });
228 #[cfg(not(feature = "std"))]
229 return Err(DbError::TypeMismatch {
230 record_id: id.raw(),
231 _expected_type: (),
232 });
233 }
234
235 // Safe to downcast (type validated above)
236 let record = &self.storages[id.index()];
237
238 #[cfg(feature = "std")]
239 let typed_record = record.as_typed::<T, R>().ok_or(DbError::InvalidOperation {
240 operation: "get_typed_record_by_id".to_string(),
241 reason: "type mismatch during downcast".to_string(),
242 })?;
243
244 #[cfg(not(feature = "std"))]
245 let typed_record = record.as_typed::<T, R>().ok_or(DbError::InvalidOperation {
246 _operation: (),
247 _reason: (),
248 })?;
249
250 Ok(typed_record)
251 }
252
253 /// Collects metadata for all registered records (std only)
254 ///
255 /// Returns a vector of `RecordMetadata` for remote access introspection.
256 /// Available only when the `std` feature is enabled.
257 #[cfg(feature = "std")]
258 pub fn list_records(&self) -> Vec<crate::remote::RecordMetadata> {
259 self.storages
260 .iter()
261 .enumerate()
262 .map(|(i, record)| {
263 let id = RecordId::new(i as u32);
264 let type_id = self.types[i];
265 let key = self.keys[i];
266 record.collect_metadata(type_id, key, id)
267 })
268 .collect()
269 }
270
271 /// Try to get record's latest value as JSON by record key (std only)
272 ///
273 /// O(1) lookup using the key-based index.
274 ///
275 /// # Arguments
276 /// * `record_key` - The record key (e.g., "sensors.temperature")
277 ///
278 /// # Returns
279 /// `Some(JsonValue)` with the current record value, or `None`
280 #[cfg(feature = "std")]
281 pub fn try_latest_as_json(&self, record_key: &str) -> Option<serde_json::Value> {
282 let id = self.resolve_str(record_key)?;
283 self.storages.get(id.index())?.latest_json()
284 }
285
286 /// Sets a record value from JSON (remote access API)
287 ///
288 /// Deserializes the JSON value and writes it to the record's buffer.
289 ///
290 /// **SAFETY:** Enforces the "No Producer Override" rule:
291 /// - Only works for records with `producer_count == 0`
292 /// - Returns error if the record has active producers
293 ///
294 /// # Arguments
295 /// * `record_key` - The record key (e.g., "config.app")
296 /// * `json_value` - JSON representation of the value
297 ///
298 /// # Returns
299 /// - `Ok(())` - Successfully set the value
300 /// - `Err(DbError)` - If record not found, has producers, or deserialization fails
301 #[cfg(feature = "std")]
302 pub fn set_record_from_json(
303 &self,
304 record_key: &str,
305 json_value: serde_json::Value,
306 ) -> DbResult<()> {
307 let id = self
308 .resolve_str(record_key)
309 .ok_or_else(|| DbError::RecordKeyNotFound {
310 key: record_key.to_string(),
311 })?;
312
313 self.storages[id.index()].set_from_json(json_value)
314 }
315}
316
/// Database builder for producer-consumer pattern
///
/// Provides a fluent API for constructing databases with type-safe record registration.
/// Use `.runtime()` to set the runtime and transition to a typed builder.
///
/// The runtime parameter defaults to `NoRuntime`; record registration,
/// connectors and `build()` are only available once a concrete runtime
/// type has been supplied.
pub struct AimDbBuilder<R = NoRuntime> {
    /// Registered records with their keys (order matters for RecordId assignment)
    records: Vec<(StringKey, TypeId, Box<dyn AnyRecord>)>,

    /// Runtime adapter
    runtime: Option<Arc<R>>,

    /// Connector builders that will be invoked during build()
    connector_builders: Vec<Box<dyn crate::connector::ConnectorBuilder<R>>>,

    /// Spawn functions with their keys
    ///
    /// One type-erased closure per newly registered record, stored by
    /// `configure()` and executed by `build()`.
    spawn_fns: Vec<(StringKey, Box<dyn core::any::Any + Send>)>,

    /// Startup tasks registered via on_start() — spawned after build() completes.
    /// Stored type-erased (Box<Box<dyn FnOnce(Arc<R>) -> BoxFuture<…>>>) to allow
    /// the field to exist on the unparameterised NoRuntime builder too.
    start_fns: Vec<Box<dyn core::any::Any + Send>>,

    /// Generic extension storage for external crates (e.g., persistence, metrics).
    /// Moved into AimDbInner during build() so it can be read on the live AimDb handle.
    extensions: Extensions,

    /// Remote access configuration (std only)
    #[cfg(feature = "std")]
    remote_config: Option<crate::remote::AimxConfig>,

    /// PhantomData to track the runtime type parameter
    _phantom: PhantomData<R>,
}
350
351impl AimDbBuilder<NoRuntime> {
    /// Creates a new database builder without a runtime
    ///
    /// Every collection starts empty and no runtime or remote-access
    /// configuration is set. Call `.runtime()` to set the runtime adapter.
    pub fn new() -> Self {
        Self {
            records: Vec::new(),
            runtime: None,
            connector_builders: Vec::new(),
            spawn_fns: Vec::new(),
            start_fns: Vec::new(),
            extensions: Extensions::new(),
            #[cfg(feature = "std")]
            remote_config: None,
            _phantom: PhantomData,
        }
    }
368
369 /// Sets the runtime adapter
370 ///
371 /// This transitions the builder from untyped to typed with concrete runtime `R`.
372 ///
373 /// # Type Safety Note
374 ///
375 /// The `connector_builders` field is intentionally reset to `Vec::new()` during this
376 /// transition because connectors are parameterized by the runtime type:
377 ///
378 /// - Before: `Vec<Box<dyn ConnectorBuilder<NoRuntime>>>`
379 /// - After: `Vec<Box<dyn ConnectorBuilder<R>>>`
380 ///
381 /// These types are incompatible and cannot be transferred. However, this is not a bug
382 /// because `.with_connector()` is only available AFTER calling `.runtime()` (it's defined
383 /// in the `impl<R> where R: Spawn` block, not in `impl AimDbBuilder<NoRuntime>`).
384 ///
385 /// This means the type system **enforces** the correct call order:
386 /// ```rust,ignore
387 /// AimDbBuilder::new()
388 /// .runtime(runtime) // ← Must be called first
389 /// .with_connector(connector) // ← Now available
390 /// ```
391 ///
392 /// The `records` and `remote_config` are preserved across the transition since they
393 /// are not parameterized by the runtime type.
394 pub fn runtime<R>(self, rt: Arc<R>) -> AimDbBuilder<R>
395 where
396 R: aimdb_executor::Spawn + 'static,
397 {
398 AimDbBuilder {
399 records: self.records,
400 runtime: Some(rt),
401 connector_builders: Vec::new(),
402 spawn_fns: Vec::new(),
403 start_fns: self.start_fns,
404 extensions: self.extensions,
405 #[cfg(feature = "std")]
406 remote_config: self.remote_config,
407 _phantom: PhantomData,
408 }
409 }
410}
411
412impl<R> AimDbBuilder<R>
413where
414 R: aimdb_executor::Spawn + 'static,
415{
    /// Returns a shared reference to the extension storage.
    ///
    /// External crates use this to retrieve state stored via `extensions_mut()`.
    pub fn extensions(&self) -> &Extensions {
        &self.extensions
    }
422
    /// Returns a mutable reference to the extension storage.
    ///
    /// Use this inside `AimDbBuilderPersistExt` (or similar) to store typed state
    /// that `.persist()` and `AimDbQueryExt` will later retrieve. The storage is
    /// moved into the database during `build()`.
    pub fn extensions_mut(&mut self) -> &mut Extensions {
        &mut self.extensions
    }
430
431 /// Registers a task to be spawned after `build()` completes.
432 ///
433 /// The closure receives an `Arc<R>` (the runtime adapter) and must return a
434 /// future that runs for as long as needed (e.g. an infinite cleanup loop).
435 /// Tasks are spawned in registration order, after all record tasks and
436 /// connectors have been started.
437 ///
438 /// # Example
439 /// ```rust,ignore
440 /// builder.on_start(|runtime| async move {
441 /// loop {
442 /// do_cleanup().await;
443 /// runtime.sleep(Duration::from_secs(3600)).await;
444 /// }
445 /// });
446 /// ```
447 pub fn on_start<F, Fut>(&mut self, f: F) -> &mut Self
448 where
449 F: FnOnce(Arc<R>) -> Fut + Send + 'static,
450 Fut: core::future::Future<Output = ()> + Send + 'static,
451 {
452 // Type-erase so the field can be shared with the `NoRuntime` builder struct.
453 // Uses the module-level `StartFnType<R>` alias — must stay in sync with
454 // the downcast in `build()`.
455 #[cfg(feature = "std")]
456 let boxed: StartFnType<R> = Box::new(move |runtime| Box::pin(f(runtime)));
457 #[cfg(not(feature = "std"))]
458 let boxed: NoStdStartFnType<R> = Box::new(move |runtime| Box::pin(f(runtime)));
459 self.start_fns.push(Box::new(boxed));
460 self
461 }
462
    /// Registers a connector builder that will be invoked during `build()`
    ///
    /// The connector builder will be called after the database is constructed,
    /// allowing it to collect routes and initialize the connector properly.
    ///
    /// # Arguments
    /// * `builder` - A connector builder that implements `ConnectorBuilder<R>`
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// use aimdb_mqtt_connector::MqttConnector;
    ///
    /// let mut builder = AimDbBuilder::new()
    ///     .runtime(runtime)
    ///     .with_connector(MqttConnector::new("mqtt://broker.local:1883"));
    ///
    /// // `configure` takes a key and borrows the builder mutably
    /// builder.configure::<Temperature>("sensor.temp", |reg| {
    ///     reg.link_from("mqtt://commands/temp")...
    /// });
    ///
    /// let db = builder.build().await?;
    /// ```
    pub fn with_connector(
        mut self,
        builder: impl crate::connector::ConnectorBuilder<R> + 'static,
    ) -> Self {
        self.connector_builders.push(Box::new(builder));
        self
    }
491
    /// Enables remote access via AimX protocol (std only)
    ///
    /// Configures the database to accept remote connections over a Unix domain socket,
    /// allowing external clients to introspect records, subscribe to updates, and
    /// (optionally) write data.
    ///
    /// The remote access supervisor will be spawned automatically during `build()`.
    /// Calling this again replaces the previously stored configuration.
    ///
    /// # Arguments
    /// * `config` - Remote access configuration (socket path, security policy, etc.)
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// use aimdb_core::remote::{AimxConfig, SecurityPolicy};
    ///
    /// let config = AimxConfig::new("/tmp/aimdb.sock")
    ///     .with_security(SecurityPolicy::read_only());
    ///
    /// let db = AimDbBuilder::new()
    ///     .runtime(runtime)
    ///     .with_remote_access(config)
    ///     .build()
    ///     .await?;
    /// ```
    #[cfg(feature = "std")]
    pub fn with_remote_access(mut self, config: crate::remote::AimxConfig) -> Self {
        self.remote_config = Some(config);
        self
    }
521
    /// Configures a record type manually with a unique key
    ///
    /// The key uniquely identifies this record instance. Multiple records of the same
    /// type can exist with different keys (e.g., "sensor.temperature.room1" and
    /// "sensor.temperature.room2").
    ///
    /// Calling `configure` again with a key that is already registered reuses
    /// the existing record: the closure is applied to it, and no additional
    /// spawn function is stored.
    ///
    /// # Arguments
    /// * `key` - A unique identifier for this record. Can be a string literal, `StringKey`,
    ///   or any type implementing `RecordKey` (including user-defined enum keys).
    /// * `f` - Configuration closure
    ///
    /// # Panics
    /// Panics if `key` is already registered with a value type other than `T`.
    ///
    /// # Example
    /// ```rust,ignore
    /// // Using string literal
    /// builder.configure::<Temperature>("sensor.temp.room1", |reg| { ... });
    ///
    /// // Using compile-time safe enum key
    /// builder.configure::<Temperature>(SensorKey::TempRoom1, |reg| { ... });
    /// ```
    pub fn configure<T>(
        &mut self,
        key: impl RecordKey,
        f: impl for<'a> FnOnce(&'a mut RecordRegistrar<'a, T, R>),
    ) -> &mut Self
    where
        T: Send + Sync + 'static + Debug + Clone,
    {
        // Convert any RecordKey to StringKey for internal storage
        let record_key: StringKey = StringKey::from_dynamic(key.as_str());
        let type_id = TypeId::of::<T>();

        // Find existing record with this key, or create new one
        // (linear scan over the registration list)
        let record_index = self.records.iter().position(|(k, _, _)| k == &record_key);

        let (rec, is_new_record) = match record_index {
            Some(idx) => {
                // Use existing record
                let (_, existing_type, record) = &mut self.records[idx];
                // Re-registering a key with another value type is a programming error
                assert!(
                    *existing_type == type_id,
                    "StringKey '{}' already registered with different type",
                    record_key.as_str()
                );
                (
                    record
                        .as_typed_mut::<T, R>()
                        .expect("type mismatch in record registry"),
                    false,
                )
            }
            None => {
                // Create new record
                self.records
                    .push((record_key, type_id, Box::new(TypedRecord::<T, R>::new())));
                let (_, _, record) = self.records.last_mut().unwrap();
                (
                    record
                        .as_typed_mut::<T, R>()
                        .expect("type mismatch in record registry"),
                    true,
                )
            }
        };

        // `rec` mutably borrows `self.records`; the registrar additionally takes
        // shared borrows of two other fields, which is fine because the borrows
        // are of disjoint fields.
        let mut reg = RecordRegistrar {
            rec,
            connector_builders: &self.connector_builders,
            #[cfg(feature = "alloc")]
            record_key: record_key.as_str().to_string(),
            extensions: &self.extensions,
            last_stage: None,
        };
        f(&mut reg);

        // Only store spawn function for new records to avoid duplicates
        if is_new_record {
            let spawn_key = record_key;

            #[allow(clippy::type_complexity)]
            let spawn_fn: Box<
                dyn FnOnce(&Arc<R>, &Arc<AimDb<R>>, RecordId) -> DbResult<()> + Send,
            > = Box::new(move |runtime: &Arc<R>, db: &Arc<AimDb<R>>, id: RecordId| {
                // Use RecordSpawner to spawn tasks for this record type
                use crate::typed_record::RecordSpawner;

                let typed_record = db.inner().get_typed_record_by_id::<T, R>(id)?;
                // Get the record key from the database to enable key-based producer/consumer
                // (falls back to a synthetic "__record_<index>" name if the id has no key)
                #[cfg(feature = "alloc")]
                let key = db
                    .inner()
                    .key_for(id)
                    .map(|k| k.as_str().to_string())
                    .unwrap_or_else(|| alloc::format!("__record_{}", id.index()));
                #[cfg(not(feature = "alloc"))]
                let key = "";
                #[cfg(feature = "alloc")]
                let key = key.as_str();
                RecordSpawner::<T>::spawn_all_tasks(typed_record, runtime, db, key)
            });

            // Store the spawn function (type-erased in Box<dyn Any>)
            self.spawn_fns.push((spawn_key, Box::new(spawn_fn)));
        }

        self
    }
628
629 /// Registers a self-registering record type
630 ///
631 /// The record type must implement `RecordT<R>`.
632 ///
633 /// Uses the type name as the default key. For custom keys, use `configure()` directly.
634 pub fn register_record<T>(&mut self, cfg: &T::Config) -> &mut Self
635 where
636 T: RecordT<R>,
637 {
638 // Default key is the full type name for backward compatibility
639 let key = StringKey::new(core::any::type_name::<T>());
640 self.configure::<T>(key, |reg| T::register(reg, cfg))
641 }
642
643 /// Registers a self-registering record type with a custom key
644 ///
645 /// The record type must implement `RecordT<R>`.
646 pub fn register_record_with_key<T>(&mut self, key: impl RecordKey, cfg: &T::Config) -> &mut Self
647 where
648 T: RecordT<R>,
649 {
650 self.configure::<T>(key, |reg| T::register(reg, cfg))
651 }
652
653 /// Runs the database indefinitely (never returns)
654 ///
655 /// This method builds the database, spawns all producer and consumer tasks, and then
656 /// parks the current task indefinitely. This is the primary way to run AimDB services.
657 ///
658 /// All logic runs in background tasks via producers, consumers, and connectors. The
659 /// application continues until interrupted (e.g., Ctrl+C).
660 ///
661 /// # Returns
662 /// `DbResult<()>` - Ok when database starts successfully, then parks forever
663 ///
664 /// # Example
665 ///
666 /// ```rust,ignore
667 /// #[tokio::main]
668 /// async fn main() -> DbResult<()> {
669 /// AimDbBuilder::new()
670 /// .runtime(Arc::new(TokioAdapter::new()?))
671 /// .configure::<MyData>(|reg| {
672 /// reg.with_buffer(BufferCfg::SpmcRing { capacity: 100 })
673 /// .with_source(my_producer)
674 /// .with_tap(my_consumer);
675 /// })
676 /// .run().await // Runs forever
677 /// }
678 /// ```
679 pub async fn run(self) -> DbResult<()>
680 where
681 R: crate::RuntimeForProfiling,
682 {
683 #[cfg(feature = "tracing")]
684 tracing::info!("Building database and spawning background tasks...");
685
686 let _db = self.build().await?;
687
688 #[cfg(feature = "tracing")]
689 tracing::info!("Database running, background tasks active. Press Ctrl+C to stop.");
690
691 // Park indefinitely - the background tasks will continue running
692 // The database handle is kept alive here to prevent dropping it
693 core::future::pending::<()>().await;
694
695 Ok(())
696 }
697
698 /// Builds the database and returns the handle (async)
699 ///
700 /// Use this when you need programmatic access to the database handle for
701 /// manual subscriptions or production. For typical services, use `.run().await` instead.
702 ///
703 /// **Automatic Task Spawning:** This method spawns all producer services and
704 /// `.tap()` observer tasks that were registered during configuration.
705 ///
706 /// **Connector Setup:** Connectors must be created manually before calling `build()`:
707 ///
708 /// ```rust,ignore
709 /// use aimdb_mqtt_connector::{MqttConnector, router::RouterBuilder};
710 ///
711 /// // Configure records with connector links
712 /// let builder = AimDbBuilder::new()
713 /// .runtime(runtime)
714 /// .configure::<Temp>("temp", |reg| {
715 /// reg.link_from("mqtt://commands/temp")
716 /// .with_buffer(BufferCfg::SingleLatest)
717 /// .with_remote_access();
718 /// });
719 ///
720 /// // Create MQTT connector with router
721 /// let router = RouterBuilder::new()
722 /// .route("commands/temp", /* deserializer */)
723 /// .build();
724 /// let connector = MqttConnector::new("mqtt://localhost:1883", router).await?;
725 ///
726 /// // Register connector and build
727 /// let db = builder
728 /// .with_connector("mqtt", Arc::new(connector))
729 /// .build().await?;
730 /// ```
731 ///
732 /// # Returns
733 /// `DbResult<AimDb<R>>` - The database instance
734 #[cfg_attr(not(feature = "std"), allow(unused_mut))]
735 pub async fn build(self) -> DbResult<AimDb<R>>
736 where
737 R: crate::RuntimeForProfiling,
738 {
739 use crate::DbError;
740
741 // Validate all records
742 for (key, _, record) in &self.records {
743 record.validate().map_err(|_msg| {
744 // Suppress unused warning for key in no_std
745 let _ = &key;
746 #[cfg(feature = "std")]
747 {
748 DbError::RuntimeError {
749 message: format!("Record '{}' validation failed: {}", key.as_str(), _msg),
750 }
751 }
752 #[cfg(not(feature = "std"))]
753 {
754 DbError::RuntimeError { _message: () }
755 }
756 })?;
757 }
758
759 // Ensure runtime is set
760 let runtime = self.runtime.ok_or({
761 #[cfg(feature = "std")]
762 {
763 DbError::RuntimeError {
764 message: "runtime not set (use .runtime())".into(),
765 }
766 }
767 #[cfg(not(feature = "std"))]
768 {
769 DbError::RuntimeError { _message: () }
770 }
771 })?;
772
773 // Build the new index structures
774 let record_count = self.records.len();
775 let mut storages: Vec<Box<dyn AnyRecord>> = Vec::with_capacity(record_count);
776 let mut by_key: HashMap<StringKey, RecordId> = HashMap::with_capacity(record_count);
777 let mut by_type: HashMap<TypeId, Vec<RecordId>> = HashMap::new();
778 let mut types: Vec<TypeId> = Vec::with_capacity(record_count);
779 let mut keys: Vec<StringKey> = Vec::with_capacity(record_count);
780
781 for (i, (key, type_id, record)) in self.records.into_iter().enumerate() {
782 let id = RecordId::new(i as u32);
783
784 // Check for duplicate keys (should not happen if configure() is used correctly)
785 if by_key.contains_key(&key) {
786 #[cfg(feature = "std")]
787 return Err(DbError::DuplicateRecordKey {
788 key: key.as_str().to_string(),
789 });
790 #[cfg(not(feature = "std"))]
791 return Err(DbError::DuplicateRecordKey { _key: () });
792 }
793
794 // Build index structures
795 storages.push(record);
796 by_key.insert(key, id);
797 by_type.entry(type_id).or_default().push(id);
798 types.push(type_id);
799 keys.push(key);
800 }
801
802 // Build dependency graph from record information
803 // Collect RecordGraphInfo for each record
804 let record_infos: Vec<crate::graph::RecordGraphInfo> = storages
805 .iter()
806 .enumerate()
807 .map(|(idx, record)| {
808 let key = keys[idx].as_str().to_string();
809 let origin = record.record_origin();
810
811 // Get buffer type and capacity from the record
812 let (buffer_type, buffer_capacity) = record.buffer_info();
813
814 crate::graph::RecordGraphInfo {
815 key,
816 origin,
817 buffer_type,
818 buffer_capacity,
819 tap_count: record.consumer_count(),
820 has_outbound_link: record.outbound_connector_count() > 0,
821 }
822 })
823 .collect();
824
825 // Build and validate the dependency graph
826 let dependency_graph = DependencyGraph::build_and_validate(&record_infos)?;
827
828 #[cfg(feature = "tracing")]
829 tracing::debug!(
830 "Dependency graph built successfully ({} nodes, {} edges, topo order: {:?})",
831 dependency_graph.nodes.len(),
832 dependency_graph.edges.len(),
833 dependency_graph.topo_order
834 );
835
836 let inner = Arc::new(AimDbInner {
837 storages,
838 by_key,
839 by_type,
840 types,
841 keys,
842 dependency_graph,
843 extensions: self.extensions,
844 });
845
846 #[cfg(feature = "profiling")]
847 let profiling_clock = crate::profiling::make_clock(runtime.clone());
848
849 let db = Arc::new(AimDb {
850 inner: inner.clone(),
851 runtime: runtime.clone(),
852 #[cfg(feature = "profiling")]
853 profiling_clock,
854 });
855
856 #[cfg(feature = "tracing")]
857 tracing::info!(
858 "Spawning producer services and tap observers for {} records",
859 self.spawn_fns.len()
860 );
861
862 // Build a lookup map from spawn_fns for topological ordering
863 let mut spawn_fn_map: HashMap<StringKey, Box<dyn core::any::Any + Send>> =
864 self.spawn_fns.into_iter().collect();
865
866 // Execute spawn functions in topological order
867 // This ensures transforms are spawned after their input records
868 for key_str in inner.dependency_graph.topo_order() {
869 // Find the StringKey that matches this key string
870 let key = match inner.by_key.keys().find(|k| k.as_str() == key_str) {
871 Some(k) => *k,
872 None => continue, // Key not in spawn_fns, skip
873 };
874
875 // Take the spawn function (if any) for this key
876 let spawn_fn_any = match spawn_fn_map.remove(&key) {
877 Some(f) => f,
878 None => continue, // No spawn function for this key
879 };
880
881 // Resolve key to RecordId
882 let id = inner.resolve(&key).ok_or({
883 #[cfg(feature = "std")]
884 {
885 DbError::RecordKeyNotFound {
886 key: key.as_str().to_string(),
887 }
888 }
889 #[cfg(not(feature = "std"))]
890 {
891 DbError::RecordKeyNotFound { _key: () }
892 }
893 })?;
894
895 // Downcast from Box<dyn Any> back to the concrete spawn function type
896 type SpawnFnType<R> =
897 Box<dyn FnOnce(&Arc<R>, &Arc<AimDb<R>>, RecordId) -> DbResult<()> + Send>;
898
899 let spawn_fn = spawn_fn_any
900 .downcast::<SpawnFnType<R>>()
901 .expect("spawn function type mismatch");
902
903 // Execute the spawn function
904 (*spawn_fn)(&runtime, &db, id)?;
905 }
906
907 #[cfg(feature = "tracing")]
908 tracing::info!("Automatic spawning complete");
909
910 // Spawn remote access supervisor if configured (std only)
911 #[cfg(feature = "std")]
912 if let Some(remote_cfg) = self.remote_config {
913 #[cfg(feature = "tracing")]
914 tracing::info!(
915 "Spawning remote access supervisor on socket: {}",
916 remote_cfg.socket_path.display()
917 );
918
919 // Apply security policy to mark writable records
920 let writable_keys = remote_cfg.security_policy.writable_records();
921 for key_str in writable_keys {
922 if let Some(id) = inner.resolve_str(&key_str) {
923 #[cfg(feature = "tracing")]
924 tracing::debug!("Marking record '{}' as writable", key_str);
925
926 // Mark the record as writable (type-erased call)
927 inner.storages[id.index()].set_writable_erased(true);
928 }
929 }
930
931 // Spawn the remote supervisor task
932 crate::remote::supervisor::spawn_supervisor(db.clone(), runtime.clone(), remote_cfg)?;
933
934 #[cfg(feature = "tracing")]
935 tracing::info!("Remote access supervisor spawned successfully");
936 }
937
938 // Build connectors from builders (after database is fully constructed)
939 // This allows connectors to use collect_inbound_routes() which creates
940 // producers tied to this specific database instance
941 for builder in self.connector_builders {
942 #[cfg(feature = "tracing")]
943 let scheme = {
944 #[cfg(feature = "std")]
945 {
946 builder.scheme().to_string()
947 }
948 #[cfg(not(feature = "std"))]
949 {
950 alloc::string::String::from(builder.scheme())
951 }
952 };
953
954 #[cfg(feature = "tracing")]
955 tracing::debug!("Building connector for scheme: {}", scheme);
956
957 // Build the connector (this spawns tasks as a side effect)
958 let _connector = builder.build(&db).await?;
959
960 #[cfg(feature = "tracing")]
961 tracing::info!("Connector built and spawned successfully: {}", scheme);
962 }
963
964 // Spawn on_start tasks (registered by external crates like aimdb-persistence)
965 if !self.start_fns.is_empty() {
966 #[cfg(feature = "tracing")]
967 tracing::debug!("Spawning {} on_start task(s)", self.start_fns.len());
968
969 #[cfg(feature = "std")]
970 for (idx, start_fn_any) in self.start_fns.into_iter().enumerate() {
971 let start_fn = start_fn_any
972 .downcast::<StartFnType<R>>()
973 .unwrap_or_else(|_| {
974 panic!("on_start fn[{idx}] type mismatch — this is a bug in aimdb-core")
975 });
976 let future = (*start_fn)(runtime.clone());
977 runtime.spawn(future).map_err(DbError::from)?;
978 }
979
980 #[cfg(not(feature = "std"))]
981 for (idx, start_fn_any) in self.start_fns.into_iter().enumerate() {
982 let start_fn = start_fn_any
983 .downcast::<NoStdStartFnType<R>>()
984 .unwrap_or_else(|_| {
985 panic!("on_start fn[{idx}] type mismatch — this is a bug in aimdb-core")
986 });
987 let future = (*start_fn)(runtime.clone());
988 runtime.spawn(future).map_err(DbError::from)?;
989 }
990 }
991
992 // Unwrap the Arc to return the owned AimDb
993 // This is safe because we just created it and hold the only reference
994 let db_owned = Arc::try_unwrap(db).unwrap_or_else(|arc| (*arc).clone());
995
996 Ok(db_owned)
997 }
998}
999
1000impl Default for AimDbBuilder<NoRuntime> {
1001 fn default() -> Self {
1002 Self::new()
1003 }
1004}
1005
1006/// Producer-consumer database
1007///
1008/// A database instance with type-safe record registration and cross-record
1009/// communication via the Emitter pattern. The type parameter `R` represents
1010/// the runtime adapter (e.g., TokioAdapter, EmbassyAdapter).
1011///
1012/// See `examples/` for usage.
1013///
1014/// # Examples
1015///
1016/// ```rust,ignore
1017/// use aimdb_tokio_adapter::TokioAdapter;
1018///
1019/// let runtime = Arc::new(TokioAdapter);
1020/// let db: AimDb<TokioAdapter> = AimDbBuilder::new()
1021/// .runtime(runtime)
1022/// .register_record::<Temperature>(&TemperatureConfig)
1023/// .build()?;
1024/// ```
1025pub struct AimDb<R: aimdb_executor::Spawn + 'static> {
1026 /// Internal state
1027 inner: Arc<AimDbInner>,
1028
1029 /// Runtime adapter with concrete type
1030 runtime: Arc<R>,
1031
1032 /// Shared wall clock for stage profiling, built from the runtime at `build()` time.
1033 #[cfg(feature = "profiling")]
1034 profiling_clock: crate::profiling::Clock,
1035}
1036
1037impl<R: aimdb_executor::Spawn + 'static> Clone for AimDb<R> {
1038 fn clone(&self) -> Self {
1039 Self {
1040 inner: self.inner.clone(),
1041 runtime: self.runtime.clone(),
1042 #[cfg(feature = "profiling")]
1043 profiling_clock: self.profiling_clock.clone(),
1044 }
1045 }
1046}
1047
1048impl<R: aimdb_executor::Spawn + 'static> AimDb<R> {
1049 /// Internal accessor for the inner state
1050 ///
1051 /// Used by adapter crates and internal spawning logic.
1052 #[doc(hidden)]
1053 pub fn inner(&self) -> &Arc<AimDbInner> {
1054 &self.inner
1055 }
1056
1057 /// Returns the extension storage frozen at `build()` time.
1058 ///
1059 /// External crates (e.g. `aimdb-persistence`) retrieve their typed state here
1060 /// to service query calls. The extensions are read-only on the live handle.
1061 ///
1062 /// # Example
1063 /// ```rust,ignore
1064 /// use aimdb_persistence::PersistenceState;
1065 /// let state = db.extensions().get::<PersistenceState>().unwrap();
1066 /// ```
1067 pub fn extensions(&self) -> &Extensions {
1068 &self.inner.extensions
1069 }
1070
/// Borrows the shared wall clock used by stage profiling
/// (nanoseconds since an arbitrary epoch).
///
/// Only compiled with the `profiling` feature.
#[cfg(feature = "profiling")]
pub(crate) fn profiling_clock(&self) -> &crate::profiling::Clock {
    let Self {
        profiling_clock, ..
    } = self;
    profiling_clock
}
1076
1077 /// Builds a database with a closure-based builder pattern
1078 pub async fn build_with(rt: Arc<R>, f: impl FnOnce(&mut AimDbBuilder<R>)) -> DbResult<()>
1079 where
1080 R: crate::RuntimeForProfiling,
1081 {
1082 let mut b = AimDbBuilder::new().runtime(rt);
1083 f(&mut b);
1084 b.run().await
1085 }
1086
1087 /// Spawns a task using the database's runtime adapter
1088 ///
1089 /// This method provides direct access to the runtime's spawn capability.
1090 ///
1091 /// # Arguments
1092 /// * `future` - The future to spawn
1093 ///
1094 /// # Returns
1095 /// `DbResult<()>` - Ok if the task was spawned successfully
1096 pub fn spawn_task<F>(&self, future: F) -> DbResult<()>
1097 where
1098 F: core::future::Future<Output = ()> + Send + 'static,
1099 {
1100 self.runtime.spawn(future).map_err(DbError::from)?;
1101 Ok(())
1102 }
1103
1104 /// Produces a value to a specific record by key
1105 ///
1106 /// Uses O(1) key-based lookup to find the correct record.
1107 ///
1108 /// # Arguments
1109 /// * `key` - The record key (e.g., "sensor.temperature")
1110 /// * `value` - The value to produce
1111 ///
1112 /// # Example
1113 ///
1114 /// ```rust,ignore
1115 /// db.produce::<Temperature>("sensors.indoor", indoor_temp).await?;
1116 /// db.produce::<Temperature>("sensors.outdoor", outdoor_temp).await?;
1117 /// ```
1118 pub async fn produce<T>(&self, key: impl AsRef<str>, value: T) -> DbResult<()>
1119 where
1120 T: Send + 'static + Debug + Clone,
1121 {
1122 let typed_rec = self.inner.get_typed_record_by_key::<T, R>(key)?;
1123 typed_rec.produce(value).await;
1124 Ok(())
1125 }
1126
1127 /// Subscribes to a specific record by key
1128 ///
1129 /// Uses O(1) key-based lookup to find the correct record.
1130 ///
1131 /// # Arguments
1132 /// * `key` - The record key (e.g., "sensor.temperature")
1133 ///
1134 /// # Example
1135 ///
1136 /// ```rust,ignore
1137 /// let mut reader = db.subscribe::<Temperature>("sensors.indoor")?;
1138 /// while let Ok(temp) = reader.recv().await {
1139 /// println!("Indoor: {:.1}°C", temp.celsius);
1140 /// }
1141 /// ```
1142 pub fn subscribe<T>(
1143 &self,
1144 key: impl AsRef<str>,
1145 ) -> DbResult<Box<dyn crate::buffer::BufferReader<T> + Send>>
1146 where
1147 T: Send + Sync + 'static + Debug + Clone,
1148 {
1149 let typed_rec = self.inner.get_typed_record_by_key::<T, R>(key)?;
1150 typed_rec.subscribe()
1151 }
1152
1153 /// Creates a type-safe producer for a specific record by key
1154 ///
1155 /// Returns a `Producer<T, R>` bound to a specific record key.
1156 ///
1157 /// # Arguments
1158 /// * `key` - The record key (e.g., "sensor.temperature")
1159 ///
1160 /// # Example
1161 ///
1162 /// ```rust,ignore
1163 /// let indoor_producer = db.producer::<Temperature>("sensors.indoor");
1164 /// let outdoor_producer = db.producer::<Temperature>("sensors.outdoor");
1165 ///
1166 /// // Each producer writes to its own record
1167 /// indoor_producer.produce(indoor_temp).await?;
1168 /// outdoor_producer.produce(outdoor_temp).await?;
1169 /// ```
1170 pub fn producer<T>(
1171 &self,
1172 key: impl Into<alloc::string::String>,
1173 ) -> crate::typed_api::Producer<T, R>
1174 where
1175 T: Send + 'static + Debug + Clone,
1176 {
1177 crate::typed_api::Producer::new(Arc::new(self.clone()), key.into())
1178 }
1179
1180 /// Creates a type-safe consumer for a specific record by key
1181 ///
1182 /// Returns a `Consumer<T, R>` bound to a specific record key.
1183 ///
1184 /// # Arguments
1185 /// * `key` - The record key (e.g., "sensor.temperature")
1186 ///
1187 /// # Example
1188 ///
1189 /// ```rust,ignore
1190 /// let indoor_consumer = db.consumer::<Temperature>("sensors.indoor");
1191 /// let outdoor_consumer = db.consumer::<Temperature>("sensors.outdoor");
1192 ///
1193 /// // Each consumer reads from its own record
1194 /// let mut rx = indoor_consumer.subscribe()?;
1195 /// ```
1196 pub fn consumer<T>(
1197 &self,
1198 key: impl Into<alloc::string::String>,
1199 ) -> crate::typed_api::Consumer<T, R>
1200 where
1201 T: Send + Sync + 'static + Debug + Clone,
1202 {
1203 crate::typed_api::Consumer::new(Arc::new(self.clone()), key.into())
1204 }
1205
1206 /// Resolve a record key to its RecordId
1207 ///
1208 /// Useful for checking if a record exists before operations.
1209 ///
1210 /// # Example
1211 ///
1212 /// ```rust,ignore
1213 /// if let Some(id) = db.resolve_key("sensors.temperature") {
1214 /// println!("Record exists with ID: {}", id);
1215 /// }
1216 /// ```
1217 pub fn resolve_key(&self, key: &str) -> Option<crate::record_id::RecordId> {
1218 self.inner.resolve_str(key)
1219 }
1220
1221 /// Get all record IDs for a specific type
1222 ///
1223 /// Returns a slice of RecordIds for all records of type T.
1224 /// Useful for introspection when multiple records of the same type exist.
1225 ///
1226 /// # Example
1227 ///
1228 /// ```rust,ignore
1229 /// let temp_ids = db.records_of_type::<Temperature>();
1230 /// println!("Found {} temperature records", temp_ids.len());
1231 /// ```
1232 pub fn records_of_type<T: 'static>(&self) -> &[crate::record_id::RecordId] {
1233 self.inner.records_of_type::<T>()
1234 }
1235
1236 /// Returns a reference to the runtime adapter
1237 ///
1238 /// Provides direct access to the concrete runtime type.
1239 pub fn runtime(&self) -> &R {
1240 &self.runtime
1241 }
1242
1243 /// Returns the runtime as a type-erased `Arc<dyn Any + Send + Sync>`
1244 ///
1245 /// Used by connectors to provide `RuntimeContext` to context-aware
1246 /// deserializers during inbound message routing.
1247 pub fn runtime_any(&self) -> Arc<dyn core::any::Any + Send + Sync> {
1248 self.runtime.clone()
1249 }
1250
/// Lists metadata for all registered records (std only)
///
/// Delegates to the inner state's `list_records()`. Useful for remote access
/// introspection. Available only when the `std` feature is enabled.
///
/// # Example
/// ```rust,ignore
/// let records = db.list_records();
/// for record in records {
///     println!("Record: {} ({})", record.name, record.type_id);
/// }
/// ```
#[cfg(feature = "std")]
pub fn list_records(&self) -> Vec<crate::remote::RecordMetadata> {
    let state = &self.inner;
    state.list_records()
}
1267
/// Resets the stage profiling counters of every record (feature `profiling`).
///
/// Walks all record storages and calls `reset_profiling()` on each one.
#[cfg(feature = "profiling")]
pub fn reset_stage_profiling(&self) {
    self.inner.storages.iter().for_each(|storage| {
        storage.reset_profiling();
    });
}
1275
/// Resets the buffer introspection counters of every record (feature `metrics`).
///
/// Walks all record storages and calls `reset_buffer_metrics()` on each one.
#[cfg(feature = "metrics")]
pub fn reset_buffer_metrics(&self) {
    self.inner.storages.iter().for_each(|storage| {
        storage.reset_buffer_metrics();
    });
}
1283
/// Tries to fetch a record's latest value as JSON by name (std only)
///
/// Convenience wrapper around `AimDbInner::try_latest_as_json()`.
///
/// # Arguments
/// * `record_name` - The full Rust type name (e.g., "server::Temperature")
///
/// # Returns
/// `Some(JsonValue)` with the current value, or `None` if unavailable
#[cfg(feature = "std")]
pub fn try_latest_as_json(&self, record_name: &str) -> Option<serde_json::Value> {
    let state = &self.inner;
    state.try_latest_as_json(record_name)
}
1297
/// Sets a record value from JSON (remote access API)
///
/// Forwards to the inner state's `set_record_from_json`, which deserializes
/// the JSON and produces the value to the record's buffer.
///
/// **SAFETY:** Enforces the "No Producer Override" rule - only works for
/// configuration records without active producers.
///
/// # Arguments
/// * `record_name` - Full Rust type name
/// * `json_value` - JSON value to set
///
/// # Returns
/// `Ok(())` on success; an error if the record is not found, has producers,
/// or deserialization fails
///
/// # Example (internal use)
/// ```rust,ignore
/// db.set_record_from_json("AppConfig", json!({"debug": true}))?;
/// ```
#[cfg(feature = "std")]
pub fn set_record_from_json(
    &self,
    record_name: &str,
    json_value: serde_json::Value,
) -> DbResult<()> {
    let state = &self.inner;
    state.set_record_from_json(record_name, json_value)
}
1324
/// Subscribe to record updates as JSON stream (std only)
///
/// Creates a subscription to a record's buffer and forwards updates as JSON
/// to a bounded channel. This is used internally by the remote access protocol
/// for implementing `record.subscribe`.
///
/// # Architecture
///
/// Spawns a consumer task on the database runtime that:
/// 1. Reads JSON values from the record's buffer as they arrive
/// 2. Sends each value to a bounded channel (awaiting when the channel is full)
/// 3. Terminates when any of the following happens:
///    - The cancel signal is received (unsubscribe)
///    - The returned cancel sender is dropped without sending
///    - The channel receiver is dropped (client disconnected)
///    - The buffer is closed or reports an unexpected error
///
/// A lagging reader (`DbError::BufferLagged`) is logged and the task keeps
/// reading.
///
/// # Arguments
/// * `record_key` - Key of the record to subscribe to
/// * `queue_size` - Size of the bounded channel for this subscription.
///   A value of `0` is treated as `1`, because a zero-capacity
///   `tokio::sync::mpsc` channel cannot be created (it panics).
///
/// # Returns
/// `Ok((receiver, cancel_tx))` where:
/// - `receiver`: Bounded channel receiver for JSON values
/// - `cancel_tx`: One-shot sender to cancel the subscription. Keep it alive
///   for as long as the subscription should run: dropping it also stops the
///   forwarding task.
///
/// `Err` if:
/// - Record not found for the given key
/// - Record not configured with `.with_remote_access()`
/// - Failed to subscribe to buffer
/// - The runtime failed to spawn the forwarding task
///
/// # Example (internal use)
///
/// ```rust,ignore
/// let (mut rx, cancel_tx) = db.subscribe_record_updates("sensor.temp", 100)?;
///
/// // Read events
/// while let Some(json_value) = rx.recv().await {
///     // Forward to client...
/// }
///
/// // Cancel subscription
/// let _ = cancel_tx.send(());
/// ```
#[cfg(feature = "std")]
#[allow(unused_variables)] // Variables used only in tracing feature
pub fn subscribe_record_updates(
    &self,
    record_key: &str,
    queue_size: usize,
) -> DbResult<(
    tokio::sync::mpsc::Receiver<serde_json::Value>,
    tokio::sync::oneshot::Sender<()>,
)> {
    use tokio::sync::{mpsc, oneshot};

    // Find the record by key
    let id = self
        .inner
        .resolve_str(record_key)
        .ok_or_else(|| DbError::RecordKeyNotFound {
            key: record_key.to_string(),
        })?;

    let record = self
        .inner
        .storage(id)
        .ok_or_else(|| DbError::InvalidRecordId { id: id.raw() })?;

    // Subscribe to the record's buffer as JSON stream
    // This will fail if record not configured with .with_remote_access()
    let mut json_reader = record.subscribe_json()?;

    // Create channels for the subscription.
    // `mpsc::channel` panics on a capacity of 0, so clamp to at least one slot
    // instead of letting a caller-supplied size take the process down.
    let (value_tx, value_rx) = mpsc::channel(queue_size.max(1));
    let (cancel_tx, mut cancel_rx) = oneshot::channel();

    // Get metadata for logging
    let type_id = self.inner.types[id.index()];
    let key = self.inner.keys[id.index()];
    let record_metadata = record.collect_metadata(type_id, key, id);
    let runtime = self.runtime.clone();

    // Spawn consumer task that forwards JSON values from buffer to channel
    let spawn_result = runtime.spawn(async move {
        #[cfg(feature = "tracing")]
        tracing::debug!(
            "Subscription consumer task started for {}",
            record_metadata.name
        );

        // Main event loop: read from buffer and forward to channel
        loop {
            tokio::select! {
                // Handle cancellation signal (also fires when the sender is dropped)
                _ = &mut cancel_rx => {
                    #[cfg(feature = "tracing")]
                    tracing::debug!("Subscription cancelled");
                    break;
                }
                // Read next JSON value from buffer
                result = json_reader.recv_json() => {
                    match result {
                        Ok(json_val) => {
                            // Send JSON value to subscription channel
                            if value_tx.send(json_val).await.is_err() {
                                #[cfg(feature = "tracing")]
                                tracing::debug!("Subscription receiver dropped");
                                break;
                            }
                        }
                        Err(DbError::BufferLagged { lag_count, .. }) => {
                            // Consumer fell behind - log warning but continue
                            #[cfg(feature = "tracing")]
                            tracing::warn!(
                                "Subscription for {} lagged by {} messages",
                                record_metadata.name,
                                lag_count
                            );
                            // Continue reading - next recv will get latest
                        }
                        Err(DbError::BufferClosed { .. }) => {
                            // Buffer closed (shutdown) - exit gracefully
                            #[cfg(feature = "tracing")]
                            tracing::debug!("Buffer closed for {}", record_metadata.name);
                            break;
                        }
                        Err(e) => {
                            // Other error (shouldn't happen in practice)
                            #[cfg(feature = "tracing")]
                            tracing::error!(
                                "Subscription error for {}: {:?}",
                                record_metadata.name,
                                e
                            );
                            break;
                        }
                    }
                }
            }
        }

        #[cfg(feature = "tracing")]
        tracing::debug!("Subscription consumer task terminated");
    });

    spawn_result.map_err(DbError::from)?;

    Ok((value_rx, cancel_tx))
}
1475
/// Collects inbound connector routes for automatic router construction
/// (feature `alloc`)
///
/// Walks every record, keeps the inbound connector links whose URL scheme
/// equals `scheme`, and builds one route per link using the producer factory
/// stored on the link. Links whose factory yields no producer are skipped.
///
/// # Arguments
/// * `scheme` - URL scheme to filter by (e.g., "mqtt", "kafka")
///
/// # Returns
/// Vector of tuples: (topic, producer_trait, deserializer)
///
/// The topic comes from the link's `resolve_topic()`: resolved dynamically if
/// a `TopicResolverFn` is configured, otherwise the static topic from the URL.
///
/// # Example
/// ```rust,ignore
/// // In MqttConnector after db.build()
/// let routes = db.collect_inbound_routes("mqtt");
/// let router = RouterBuilder::from_routes(routes).build();
/// connector.set_router(router).await?;
/// ```
#[cfg(feature = "alloc")]
pub fn collect_inbound_routes(
    &self,
    scheme: &str,
) -> Vec<(
    String,
    Box<dyn crate::connector::ProducerTrait>,
    crate::connector::DeserializerKind,
)> {
    // The producer factory takes a type-erased database handle.
    let db_any: Arc<dyn core::any::Any + Send + Sync> = Arc::new(self.clone());

    let routes: Vec<_> = self
        .inner
        .storages
        .iter()
        .flat_map(|record| record.inbound_connectors())
        .filter(|link| link.url.scheme() == scheme)
        .filter_map(|link| {
            // Resolve topic first: dynamic (from resolver) or static (from URL)
            let topic = link.resolve_topic();
            link.create_producer(db_any.clone())
                .map(|producer| (topic, producer, link.deserializer.clone()))
        })
        .collect();

    #[cfg(feature = "tracing")]
    if !routes.is_empty() {
        tracing::debug!(
            "Collected {} inbound routes for scheme '{}'",
            routes.len(),
            scheme
        );
    }

    routes
}
1541
/// Collect `(topic, TypeId)` pairs for all outbound routes matching `scheme`.
///
/// Complements [`collect_outbound_routes`](Self::collect_outbound_routes) when
/// callers need to know the concrete record type behind each outbound topic
/// (e.g. to resolve a schema name for discovery responses).
///
/// The topic is the link URL's `resource_id()`. The returned TypeId is the one
/// stored for the record that owns the link, i.e. the `TypeId::of::<T>()` for
/// the record type `T` that was used in the corresponding `configure::<T>()`
/// call.
///
/// # Arguments
/// * `scheme` - URL scheme to filter by (e.g., "mqtt", "kafka")
#[cfg(feature = "alloc")]
pub fn collect_outbound_topic_type_ids(&self, scheme: &str) -> Vec<(String, TypeId)> {
    let mut result = Vec::new();

    for (idx, record) in self.inner.storages.iter().enumerate() {
        // `types` is indexed in parallel with `storages`
        let type_id = self.inner.types[idx];

        for link in record.outbound_connectors() {
            if link.url.scheme() != scheme {
                continue;
            }
            result.push((link.url.resource_id(), type_id));
        }
    }

    result
}

/// Collects outbound routes for a specific protocol scheme
///
/// Mirrors `collect_inbound_routes()` for symmetry. Iterates all records,
/// filters their outbound_connectors by scheme, and returns routes with
/// consumers created by the factory stored on each link.
///
/// This method is called by connectors during their `build()` phase to
/// collect all configured outbound routes and spawn publisher tasks.
///
/// Links without a serializer are skipped (with a warning when the `tracing`
/// feature is enabled), as are links whose factory yields no consumer.
///
/// # Arguments
/// * `scheme` - URL scheme to filter by (e.g., "mqtt", "kafka")
///
/// # Returns
/// Vector of tuples:
/// (destination, consumer_trait, serializer, config, topic_provider)
///
/// The config Vec contains protocol-specific options (e.g., qos, retain);
/// `topic_provider` is the link's optional dynamic topic provider.
///
/// # Example
/// ```rust,ignore
/// // In MqttConnector::build()
/// let routes = db.collect_outbound_routes("mqtt");
/// for (topic, consumer, serializer, config, _topic_provider) in routes {
///     connector.spawn_publisher(topic, consumer, serializer, config)?;
/// }
/// ```
#[cfg(feature = "alloc")]
pub fn collect_outbound_routes(&self, scheme: &str) -> Vec<OutboundRoute> {
    let mut routes = Vec::new();

    // Convert self to Arc<dyn Any> for consumer factory
    // This is necessary because the factory takes Arc<dyn Any> to avoid
    // needing to know the runtime type R at the factory definition site
    let db_any: Arc<dyn core::any::Any + Send + Sync> = Arc::new(self.clone());

    for record in &self.inner.storages {
        let outbound_links = record.outbound_connectors();

        for link in outbound_links {
            // Filter by scheme
            if link.url.scheme() != scheme {
                continue;
            }

            let destination = link.url.resource_id();

            // Skip links without serializer
            let Some(serializer) = link.serializer.clone() else {
                #[cfg(feature = "tracing")]
                tracing::warn!("Outbound link '{}' has no serializer, skipping", link.url);
                continue;
            };

            // Create consumer using the stored factory
            if let Some(consumer) = link.create_consumer(db_any.clone()) {
                routes.push((
                    destination,
                    consumer,
                    serializer,
                    link.config.clone(),
                    link.topic_provider.clone(),
                ));
            }
        }
    }

    #[cfg(feature = "tracing")]
    if !routes.is_empty() {
        tracing::debug!(
            "Collected {} outbound routes for scheme '{}'",
            routes.len(),
            scheme
        );
    }

    routes
}
1644}