aimdb_core/builder.rs
1//! Database builder with type-safe record registration
2//!
3//! Provides `AimDb` and `AimDbBuilder` for constructing databases with
4//! type-safe, self-registering records using the producer-consumer pattern.
5
6use core::any::TypeId;
7use core::fmt::Debug;
8use core::marker::PhantomData;
9
10extern crate alloc;
11
12use alloc::vec::Vec;
13use hashbrown::HashMap;
14
15#[cfg(not(feature = "std"))]
16use alloc::{boxed::Box, sync::Arc};
17
18#[cfg(all(not(feature = "std"), feature = "alloc"))]
19use alloc::string::String;
20
21#[cfg(feature = "std")]
22use std::{boxed::Box, sync::Arc};
23
24use crate::record_id::{RecordId, RecordKey};
25use crate::typed_api::{RecordRegistrar, RecordT};
26use crate::typed_record::{AnyRecord, AnyRecordExt, TypedRecord};
27use crate::{DbError, DbResult};
28
/// Outbound route description, as returned by `collect_outbound_routes`.
///
/// The tuple fields are, in order:
/// 1. `String` - topic/key taken from the URL path
/// 2. `Box<dyn ConsumerTrait>` - callback used to create a consumer for the record
/// 3. `SerializerFn` - user-provided serializer for the record type
/// 4. `Vec<(String, String)>` - configuration options taken from the URL query
///
/// Only available with the `alloc` feature because every field is heap-backed.
#[cfg(feature = "alloc")]
type OutboundRoute = (
    String,
    Box<dyn crate::connector::ConsumerTrait>,
    crate::connector::SerializerFn,
    Vec<(String, String)>,
);
43
/// Marker type for the untyped builder state (before a runtime is set).
///
/// It is the default value of the `R` parameter of [`AimDbBuilder`]; calling
/// `AimDbBuilder::runtime()` replaces it with a concrete runtime adapter type.
pub struct NoRuntime;
46
/// Internal database state
///
/// Holds the registry of typed records together with several index structures,
/// each serving a different access pattern:
///
/// - **`storages`**: Vec indexed by `RecordId` (hot path)
/// - **`by_key`**: HashMap from stable `RecordKey` to `RecordId`
/// - **`by_type`**: HashMap from `TypeId` to all `RecordId`s of that type (introspection)
/// - **`types`**: Vec indexed by `RecordId`, used to validate downcasts at runtime
/// - **`keys`**: Vec indexed by `RecordId`, the reverse of `by_key`
///
/// `AimDbBuilder::build()` fills `storages`, `types` and `keys` in lockstep, so an
/// index that is valid for one of them is valid for all three.
pub struct AimDbInner {
    /// Record storage (hot path - indexed by RecordId)
    ///
    /// Order matches registration order; `build()` assigns `RecordId`s from the
    /// position in this vector.
    storages: Vec<Box<dyn AnyRecord>>,

    /// Name → RecordId lookup (control plane)
    ///
    /// Backs `resolve()` / `resolve_str()`, e.g. for remote access by record name.
    by_key: HashMap<RecordKey, RecordId>,

    /// TypeId → RecordIds lookup (introspection)
    ///
    /// Enables "find all Temperature records" queries via `records_of_type()`.
    by_type: HashMap<TypeId, Vec<RecordId>>,

    /// RecordId → TypeId lookup (type safety assertions)
    ///
    /// Compared against `TypeId::of::<T>()` before a record is downcast.
    types: Vec<TypeId>,

    /// RecordId → RecordKey lookup (reverse mapping)
    ///
    /// Used to get the key for a given record ID (see `key_for()`).
    keys: Vec<RecordKey>,
}
82
83impl AimDbInner {
84 /// Resolve RecordKey to RecordId (control plane - O(1) average)
85 #[inline]
86 pub fn resolve(&self, key: &RecordKey) -> Option<RecordId> {
87 self.by_key.get(key.as_str()).copied()
88 }
89
90 /// Resolve string to RecordId (convenience for remote access)
91 ///
92 /// O(1) average thanks to `Borrow<str>` implementation on `RecordKey`.
93 #[inline]
94 pub fn resolve_str(&self, name: &str) -> Option<RecordId> {
95 self.by_key.get(name).copied()
96 }
97
98 /// Get storage by RecordId (hot path - O(1))
99 #[inline]
100 pub fn storage(&self, id: RecordId) -> Option<&dyn AnyRecord> {
101 self.storages.get(id.index()).map(|b| b.as_ref())
102 }
103
104 /// Get the RecordKey for a given RecordId
105 #[inline]
106 pub fn key_for(&self, id: RecordId) -> Option<&RecordKey> {
107 self.keys.get(id.index())
108 }
109
110 /// Get all RecordIds for a type (introspection)
111 pub fn records_of_type<T: 'static>(&self) -> &[RecordId] {
112 self.by_type
113 .get(&TypeId::of::<T>())
114 .map(|v| v.as_slice())
115 .unwrap_or(&[])
116 }
117
118 /// Get the number of registered records
119 #[inline]
120 pub fn record_count(&self) -> usize {
121 self.storages.len()
122 }
123
124 /// Helper to get a typed record by RecordKey
125 ///
126 /// This encapsulates the common pattern of:
127 /// 1. Resolving key to RecordId
128 /// 2. Validating TypeId matches
129 /// 3. Downcasting to the typed record
130 pub fn get_typed_record_by_key<T, R>(
131 &self,
132 key: impl AsRef<str>,
133 ) -> DbResult<&TypedRecord<T, R>>
134 where
135 T: Send + 'static + Debug + Clone,
136 R: aimdb_executor::Spawn + 'static,
137 {
138 let key_str = key.as_ref();
139
140 // Resolve key to RecordId
141 let id = self.resolve_str(key_str).ok_or({
142 #[cfg(feature = "std")]
143 {
144 DbError::RecordKeyNotFound {
145 key: key_str.to_string(),
146 }
147 }
148 #[cfg(not(feature = "std"))]
149 {
150 DbError::RecordKeyNotFound { _key: () }
151 }
152 })?;
153
154 self.get_typed_record_by_id::<T, R>(id)
155 }
156
157 /// Helper to get a typed record by RecordId with type validation
158 pub fn get_typed_record_by_id<T, R>(&self, id: RecordId) -> DbResult<&TypedRecord<T, R>>
159 where
160 T: Send + 'static + Debug + Clone,
161 R: aimdb_executor::Spawn + 'static,
162 {
163 use crate::typed_record::AnyRecordExt;
164
165 // Validate RecordId is in bounds
166 if id.index() >= self.storages.len() {
167 return Err(DbError::InvalidRecordId { id: id.raw() });
168 }
169
170 // Validate TypeId matches
171 let expected = TypeId::of::<T>();
172 let actual = self.types[id.index()];
173 if expected != actual {
174 #[cfg(feature = "std")]
175 return Err(DbError::TypeMismatch {
176 record_id: id.raw(),
177 expected_type: core::any::type_name::<T>().to_string(),
178 });
179 #[cfg(not(feature = "std"))]
180 return Err(DbError::TypeMismatch {
181 record_id: id.raw(),
182 _expected_type: (),
183 });
184 }
185
186 // Safe to downcast (type validated above)
187 let record = &self.storages[id.index()];
188
189 #[cfg(feature = "std")]
190 let typed_record = record.as_typed::<T, R>().ok_or(DbError::InvalidOperation {
191 operation: "get_typed_record_by_id".to_string(),
192 reason: "type mismatch during downcast".to_string(),
193 })?;
194
195 #[cfg(not(feature = "std"))]
196 let typed_record = record.as_typed::<T, R>().ok_or(DbError::InvalidOperation {
197 _operation: (),
198 _reason: (),
199 })?;
200
201 Ok(typed_record)
202 }
203
204 /// Helper to get a typed record from the registry (legacy API)
205 ///
206 /// **Deprecated**: Use `get_typed_record_by_key()` instead.
207 ///
208 /// This method only works when exactly one record of type T exists.
209 /// Returns `AmbiguousType` error if multiple records of the same type exist.
210 pub fn get_typed_record<T, R>(&self) -> DbResult<&TypedRecord<T, R>>
211 where
212 T: Send + 'static + Debug + Clone,
213 R: aimdb_executor::Spawn + 'static,
214 {
215 let type_id = TypeId::of::<T>();
216 let ids = self.by_type.get(&type_id);
217
218 match ids.map(|v| v.as_slice()) {
219 None | Some([]) => {
220 #[cfg(feature = "std")]
221 return Err(DbError::RecordNotFound {
222 record_name: core::any::type_name::<T>().to_string(),
223 });
224 #[cfg(not(feature = "std"))]
225 return Err(DbError::RecordNotFound { _record_name: () });
226 }
227 Some([single_id]) => self.get_typed_record_by_id::<T, R>(*single_id),
228 Some(multiple) => {
229 #[cfg(feature = "std")]
230 return Err(DbError::AmbiguousType {
231 count: multiple.len() as u32,
232 type_name: core::any::type_name::<T>().to_string(),
233 });
234 #[cfg(not(feature = "std"))]
235 return Err(DbError::AmbiguousType {
236 count: multiple.len() as u32,
237 _type_name: (),
238 });
239 }
240 }
241 }
242
243 /// Collects metadata for all registered records (std only)
244 ///
245 /// Returns a vector of `RecordMetadata` for remote access introspection.
246 /// Available only when the `std` feature is enabled.
247 #[cfg(feature = "std")]
248 pub fn list_records(&self) -> Vec<crate::remote::RecordMetadata> {
249 self.storages
250 .iter()
251 .enumerate()
252 .map(|(i, record)| {
253 let id = RecordId::new(i as u32);
254 let type_id = self.types[i];
255 let key = &self.keys[i];
256 record.collect_metadata(type_id, key.clone(), id)
257 })
258 .collect()
259 }
260
261 /// Try to get record's latest value as JSON by record key (std only)
262 ///
263 /// O(1) lookup using the key-based index.
264 ///
265 /// # Arguments
266 /// * `record_key` - The record key (e.g., "sensors.temperature")
267 ///
268 /// # Returns
269 /// `Some(JsonValue)` with the current record value, or `None`
270 #[cfg(feature = "std")]
271 pub fn try_latest_as_json(&self, record_key: &str) -> Option<serde_json::Value> {
272 let id = self.resolve_str(record_key)?;
273 self.storages.get(id.index())?.latest_json()
274 }
275
276 /// Sets a record value from JSON (remote access API)
277 ///
278 /// Deserializes the JSON value and writes it to the record's buffer.
279 ///
280 /// **SAFETY:** Enforces the "No Producer Override" rule:
281 /// - Only works for records with `producer_count == 0`
282 /// - Returns error if the record has active producers
283 ///
284 /// # Arguments
285 /// * `record_key` - The record key (e.g., "config.app")
286 /// * `json_value` - JSON representation of the value
287 ///
288 /// # Returns
289 /// - `Ok(())` - Successfully set the value
290 /// - `Err(DbError)` - If record not found, has producers, or deserialization fails
291 #[cfg(feature = "std")]
292 pub fn set_record_from_json(
293 &self,
294 record_key: &str,
295 json_value: serde_json::Value,
296 ) -> DbResult<()> {
297 let id = self
298 .resolve_str(record_key)
299 .ok_or_else(|| DbError::RecordKeyNotFound {
300 key: record_key.to_string(),
301 })?;
302
303 self.storages[id.index()].set_from_json(json_value)
304 }
305}
306
/// Database builder for producer-consumer pattern
///
/// Collects record registrations, connector builders and (with `std`) the remote
/// access configuration, and turns them into an [`AimDb`] in `build()`.
/// Use `.runtime()` to set the runtime and transition from `AimDbBuilder<NoRuntime>`
/// to a typed `AimDbBuilder<R>`.
pub struct AimDbBuilder<R = NoRuntime> {
    /// Registered records with their keys and value `TypeId`s
    /// (order matters: `build()` assigns `RecordId`s by position)
    records: Vec<(RecordKey, TypeId, Box<dyn AnyRecord>)>,

    /// Runtime adapter (`None` until `.runtime()` has been called)
    runtime: Option<Arc<R>>,

    /// Connector builders that will be invoked during build()
    connector_builders: Vec<Box<dyn crate::connector::ConnectorBuilder<R>>>,

    /// Spawn functions with their keys
    ///
    /// Each entry is a boxed `FnOnce(&Arc<R>, &Arc<AimDb<R>>, RecordId) -> DbResult<()>`
    /// stored type-erased as `dyn Any`; `build()` downcasts and runs it.
    spawn_fns: Vec<(RecordKey, Box<dyn core::any::Any + Send>)>,

    /// Remote access configuration (std only)
    #[cfg(feature = "std")]
    remote_config: Option<crate::remote::AimxConfig>,

    /// PhantomData to track the runtime type parameter
    _phantom: PhantomData<R>,
}
331
332impl AimDbBuilder<NoRuntime> {
333 /// Creates a new database builder without a runtime
334 ///
335 /// Call `.runtime()` to set the runtime adapter.
336 pub fn new() -> Self {
337 Self {
338 records: Vec::new(),
339 runtime: None,
340 connector_builders: Vec::new(),
341 spawn_fns: Vec::new(),
342 #[cfg(feature = "std")]
343 remote_config: None,
344 _phantom: PhantomData,
345 }
346 }
347
348 /// Sets the runtime adapter
349 ///
350 /// This transitions the builder from untyped to typed with concrete runtime `R`.
351 ///
352 /// # Type Safety Note
353 ///
354 /// The `connector_builders` field is intentionally reset to `Vec::new()` during this
355 /// transition because connectors are parameterized by the runtime type:
356 ///
357 /// - Before: `Vec<Box<dyn ConnectorBuilder<NoRuntime>>>`
358 /// - After: `Vec<Box<dyn ConnectorBuilder<R>>>`
359 ///
360 /// These types are incompatible and cannot be transferred. However, this is not a bug
361 /// because `.with_connector()` is only available AFTER calling `.runtime()` (it's defined
362 /// in the `impl<R> where R: Spawn` block, not in `impl AimDbBuilder<NoRuntime>`).
363 ///
364 /// This means the type system **enforces** the correct call order:
365 /// ```rust,ignore
366 /// AimDbBuilder::new()
367 /// .runtime(runtime) // ← Must be called first
368 /// .with_connector(connector) // ← Now available
369 /// ```
370 ///
371 /// The `records` and `remote_config` are preserved across the transition since they
372 /// are not parameterized by the runtime type.
373 pub fn runtime<R>(self, rt: Arc<R>) -> AimDbBuilder<R>
374 where
375 R: aimdb_executor::Spawn + 'static,
376 {
377 AimDbBuilder {
378 records: self.records,
379 runtime: Some(rt),
380 connector_builders: Vec::new(),
381 spawn_fns: Vec::new(),
382 #[cfg(feature = "std")]
383 remote_config: self.remote_config,
384 _phantom: PhantomData,
385 }
386 }
387}
388
389impl<R> AimDbBuilder<R>
390where
391 R: aimdb_executor::Spawn + 'static,
392{
393 /// Registers a connector builder that will be invoked during `build()`
394 ///
395 /// The connector builder will be called after the database is constructed,
396 /// allowing it to collect routes and initialize the connector properly.
397 ///
398 /// # Arguments
399 /// * `builder` - A connector builder that implements `ConnectorBuilder<R>`
400 ///
401 /// # Example
402 ///
403 /// ```rust,ignore
404 /// use aimdb_mqtt_connector::MqttConnector;
405 ///
406 /// let db = AimDbBuilder::new()
407 /// .runtime(runtime)
408 /// .with_connector(MqttConnector::new("mqtt://broker.local:1883"))
409 /// .configure::<Temperature>(|reg| {
410 /// reg.link_from("mqtt://commands/temp")...
411 /// })
412 /// .build().await?;
413 /// ```
414 pub fn with_connector(
415 mut self,
416 builder: impl crate::connector::ConnectorBuilder<R> + 'static,
417 ) -> Self {
418 self.connector_builders.push(Box::new(builder));
419 self
420 }
421
422 /// Enables remote access via AimX protocol (std only)
423 ///
424 /// Configures the database to accept remote connections over a Unix domain socket,
425 /// allowing external clients to introspect records, subscribe to updates, and
426 /// (optionally) write data.
427 ///
428 /// The remote access supervisor will be spawned automatically during `build()`.
429 ///
430 /// # Arguments
431 /// * `config` - Remote access configuration (socket path, security policy, etc.)
432 ///
433 /// # Example
434 ///
435 /// ```rust,ignore
436 /// use aimdb_core::remote::{AimxConfig, SecurityPolicy};
437 ///
438 /// let config = AimxConfig::new("/tmp/aimdb.sock")
439 /// .with_security(SecurityPolicy::read_only());
440 ///
441 /// let db = AimDbBuilder::new()
442 /// .runtime(runtime)
443 /// .with_remote_access(config)
444 /// .build()?;
445 /// ```
446 #[cfg(feature = "std")]
447 pub fn with_remote_access(mut self, config: crate::remote::AimxConfig) -> Self {
448 self.remote_config = Some(config);
449 self
450 }
451
452 /// Configures a record type manually with a unique key
453 ///
454 /// The key uniquely identifies this record instance. Multiple records of the same
455 /// type can exist with different keys (e.g., "sensor.temperature.room1" and
456 /// "sensor.temperature.room2").
457 ///
458 /// # Arguments
459 /// * `key` - A unique string identifier for this record (e.g., "sensor.temperature.room1")
460 /// * `f` - Configuration closure
461 ///
462 /// # Example
463 /// ```rust,ignore
464 /// builder.configure::<Temperature>("sensor.temp.room1", |reg| {
465 /// reg.with_buffer(BufferCfg::SingleLatest)
466 /// .with_serialization();
467 /// });
468 /// ```
469 pub fn configure<T>(
470 &mut self,
471 key: impl Into<RecordKey>,
472 f: impl for<'a> FnOnce(&'a mut RecordRegistrar<'a, T, R>),
473 ) -> &mut Self
474 where
475 T: Send + Sync + 'static + Debug + Clone,
476 {
477 let record_key: RecordKey = key.into();
478 let type_id = TypeId::of::<T>();
479
480 // Find existing record with this key, or create new one
481 let record_index = self.records.iter().position(|(k, _, _)| k == &record_key);
482
483 let (rec, is_new_record) = match record_index {
484 Some(idx) => {
485 // Use existing record
486 let (_, existing_type, record) = &mut self.records[idx];
487 assert!(
488 *existing_type == type_id,
489 "RecordKey '{}' already registered with different type",
490 record_key.as_str()
491 );
492 (
493 record
494 .as_typed_mut::<T, R>()
495 .expect("type mismatch in record registry"),
496 false,
497 )
498 }
499 None => {
500 // Create new record
501 self.records.push((
502 record_key.clone(),
503 type_id,
504 Box::new(TypedRecord::<T, R>::new()),
505 ));
506 let (_, _, record) = self.records.last_mut().unwrap();
507 (
508 record
509 .as_typed_mut::<T, R>()
510 .expect("type mismatch in record registry"),
511 true,
512 )
513 }
514 };
515
516 let mut reg = RecordRegistrar {
517 rec,
518 connector_builders: &self.connector_builders,
519 };
520 f(&mut reg);
521
522 // Only store spawn function for new records to avoid duplicates
523 if is_new_record {
524 let spawn_key = record_key.clone();
525
526 #[allow(clippy::type_complexity)]
527 let spawn_fn: Box<
528 dyn FnOnce(&Arc<R>, &Arc<AimDb<R>>, RecordId) -> DbResult<()> + Send,
529 > = Box::new(move |runtime: &Arc<R>, db: &Arc<AimDb<R>>, id: RecordId| {
530 // Use RecordSpawner to spawn tasks for this record type
531 use crate::typed_record::RecordSpawner;
532
533 let typed_record = db.inner().get_typed_record_by_id::<T, R>(id)?;
534 RecordSpawner::<T>::spawn_all_tasks(typed_record, runtime, db)
535 });
536
537 // Store the spawn function (type-erased in Box<dyn Any>)
538 self.spawn_fns.push((spawn_key, Box::new(spawn_fn)));
539 }
540
541 self
542 }
543
544 /// Registers a self-registering record type
545 ///
546 /// The record type must implement `RecordT<R>`.
547 ///
548 /// Uses the type name as the default key. For custom keys, use `configure()` directly.
549 pub fn register_record<T>(&mut self, cfg: &T::Config) -> &mut Self
550 where
551 T: RecordT<R>,
552 {
553 // Default key is the full type name for backward compatibility
554 let key = RecordKey::new(core::any::type_name::<T>());
555 self.configure::<T>(key, |reg| T::register(reg, cfg))
556 }
557
558 /// Registers a self-registering record type with a custom key
559 ///
560 /// The record type must implement `RecordT<R>`.
561 pub fn register_record_with_key<T>(
562 &mut self,
563 key: impl Into<RecordKey>,
564 cfg: &T::Config,
565 ) -> &mut Self
566 where
567 T: RecordT<R>,
568 {
569 self.configure::<T>(key, |reg| T::register(reg, cfg))
570 }
571
572 /// Runs the database indefinitely (never returns)
573 ///
574 /// This method builds the database, spawns all producer and consumer tasks, and then
575 /// parks the current task indefinitely. This is the primary way to run AimDB services.
576 ///
577 /// All logic runs in background tasks via producers, consumers, and connectors. The
578 /// application continues until interrupted (e.g., Ctrl+C).
579 ///
580 /// # Returns
581 /// `DbResult<()>` - Ok when database starts successfully, then parks forever
582 ///
583 /// # Example
584 ///
585 /// ```rust,ignore
586 /// #[tokio::main]
587 /// async fn main() -> DbResult<()> {
588 /// AimDbBuilder::new()
589 /// .runtime(Arc::new(TokioAdapter::new()?))
590 /// .configure::<MyData>(|reg| {
591 /// reg.with_buffer(BufferCfg::SpmcRing { capacity: 100 })
592 /// .with_source(my_producer)
593 /// .with_tap(my_consumer);
594 /// })
595 /// .run().await // Runs forever
596 /// }
597 /// ```
598 pub async fn run(self) -> DbResult<()> {
599 #[cfg(feature = "tracing")]
600 tracing::info!("Building database and spawning background tasks...");
601
602 let _db = self.build().await?;
603
604 #[cfg(feature = "tracing")]
605 tracing::info!("Database running, background tasks active. Press Ctrl+C to stop.");
606
607 // Park indefinitely - the background tasks will continue running
608 // The database handle is kept alive here to prevent dropping it
609 core::future::pending::<()>().await;
610
611 Ok(())
612 }
613
614 /// Builds the database and returns the handle (async)
615 ///
616 /// Use this when you need programmatic access to the database handle for
617 /// manual subscriptions or production. For typical services, use `.run().await` instead.
618 ///
619 /// **Automatic Task Spawning:** This method spawns all producer services and
620 /// `.tap()` observer tasks that were registered during configuration.
621 ///
622 /// **Connector Setup:** Connectors must be created manually before calling `build()`:
623 ///
624 /// ```rust,ignore
625 /// use aimdb_mqtt_connector::{MqttConnector, router::RouterBuilder};
626 ///
627 /// // Configure records with connector links
628 /// let builder = AimDbBuilder::new()
629 /// .runtime(runtime)
630 /// .configure::<Temp>("temp", |reg| {
631 /// reg.link_from("mqtt://commands/temp")
632 /// .with_buffer(BufferCfg::SingleLatest)
633 /// .with_serialization();
634 /// });
635 ///
636 /// // Create MQTT connector with router
637 /// let router = RouterBuilder::new()
638 /// .route("commands/temp", /* deserializer */)
639 /// .build();
640 /// let connector = MqttConnector::new("mqtt://localhost:1883", router).await?;
641 ///
642 /// // Register connector and build
643 /// let db = builder
644 /// .with_connector("mqtt", Arc::new(connector))
645 /// .build().await?;
646 /// ```
647 ///
648 /// # Returns
649 /// `DbResult<AimDb<R>>` - The database instance
650 #[cfg_attr(not(feature = "std"), allow(unused_mut))]
651 pub async fn build(self) -> DbResult<AimDb<R>> {
652 use crate::DbError;
653
654 // Validate all records
655 for (key, _, record) in &self.records {
656 record.validate().map_err(|_msg| {
657 // Suppress unused warning for key in no_std
658 let _ = &key;
659 #[cfg(feature = "std")]
660 {
661 DbError::RuntimeError {
662 message: format!("Record '{}' validation failed: {}", key.as_str(), _msg),
663 }
664 }
665 #[cfg(not(feature = "std"))]
666 {
667 DbError::RuntimeError { _message: () }
668 }
669 })?;
670 }
671
672 // Ensure runtime is set
673 let runtime = self.runtime.ok_or({
674 #[cfg(feature = "std")]
675 {
676 DbError::RuntimeError {
677 message: "runtime not set (use .runtime())".into(),
678 }
679 }
680 #[cfg(not(feature = "std"))]
681 {
682 DbError::RuntimeError { _message: () }
683 }
684 })?;
685
686 // Build the new index structures
687 let record_count = self.records.len();
688 let mut storages: Vec<Box<dyn AnyRecord>> = Vec::with_capacity(record_count);
689 let mut by_key: HashMap<RecordKey, RecordId> = HashMap::with_capacity(record_count);
690 let mut by_type: HashMap<TypeId, Vec<RecordId>> = HashMap::new();
691 let mut types: Vec<TypeId> = Vec::with_capacity(record_count);
692 let mut keys: Vec<RecordKey> = Vec::with_capacity(record_count);
693
694 for (i, (key, type_id, record)) in self.records.into_iter().enumerate() {
695 let id = RecordId::new(i as u32);
696
697 // Check for duplicate keys (should not happen if configure() is used correctly)
698 if by_key.contains_key(&key) {
699 #[cfg(feature = "std")]
700 return Err(DbError::DuplicateRecordKey {
701 key: key.as_str().to_string(),
702 });
703 #[cfg(not(feature = "std"))]
704 return Err(DbError::DuplicateRecordKey { _key: () });
705 }
706
707 // Build index structures
708 storages.push(record);
709 by_key.insert(key.clone(), id);
710 by_type.entry(type_id).or_default().push(id);
711 types.push(type_id);
712 keys.push(key);
713 }
714
715 let inner = Arc::new(AimDbInner {
716 storages,
717 by_key,
718 by_type,
719 types,
720 keys,
721 });
722
723 let db = Arc::new(AimDb {
724 inner: inner.clone(),
725 runtime: runtime.clone(),
726 });
727
728 #[cfg(feature = "tracing")]
729 tracing::info!(
730 "Spawning producer services and tap observers for {} records",
731 self.spawn_fns.len()
732 );
733
734 // Execute spawn functions for each record
735 for (key, spawn_fn_any) in self.spawn_fns {
736 // Resolve key to RecordId
737 let id = inner.resolve(&key).ok_or({
738 #[cfg(feature = "std")]
739 {
740 DbError::RecordKeyNotFound {
741 key: key.as_str().to_string(),
742 }
743 }
744 #[cfg(not(feature = "std"))]
745 {
746 DbError::RecordKeyNotFound { _key: () }
747 }
748 })?;
749
750 // Downcast from Box<dyn Any> back to the concrete spawn function type
751 type SpawnFnType<R> =
752 Box<dyn FnOnce(&Arc<R>, &Arc<AimDb<R>>, RecordId) -> DbResult<()> + Send>;
753
754 let spawn_fn = spawn_fn_any
755 .downcast::<SpawnFnType<R>>()
756 .expect("spawn function type mismatch");
757
758 // Execute the spawn function
759 (*spawn_fn)(&runtime, &db, id)?;
760 }
761
762 #[cfg(feature = "tracing")]
763 tracing::info!("Automatic spawning complete");
764
765 // Spawn remote access supervisor if configured (std only)
766 #[cfg(feature = "std")]
767 if let Some(remote_cfg) = self.remote_config {
768 #[cfg(feature = "tracing")]
769 tracing::info!(
770 "Spawning remote access supervisor on socket: {}",
771 remote_cfg.socket_path.display()
772 );
773
774 // Apply security policy to mark writable records
775 let writable_keys = remote_cfg.security_policy.writable_records();
776 for key_str in writable_keys {
777 if let Some(id) = inner.resolve_str(&key_str) {
778 #[cfg(feature = "tracing")]
779 tracing::debug!("Marking record '{}' as writable", key_str);
780
781 // Mark the record as writable (type-erased call)
782 inner.storages[id.index()].set_writable_erased(true);
783 }
784 }
785
786 // Spawn the remote supervisor task
787 crate::remote::supervisor::spawn_supervisor(db.clone(), runtime.clone(), remote_cfg)?;
788
789 #[cfg(feature = "tracing")]
790 tracing::info!("Remote access supervisor spawned successfully");
791 }
792
793 // Build connectors from builders (after database is fully constructed)
794 // This allows connectors to use collect_inbound_routes() which creates
795 // producers tied to this specific database instance
796 for builder in self.connector_builders {
797 #[cfg(feature = "tracing")]
798 let scheme = {
799 #[cfg(feature = "std")]
800 {
801 builder.scheme().to_string()
802 }
803 #[cfg(not(feature = "std"))]
804 {
805 alloc::string::String::from(builder.scheme())
806 }
807 };
808
809 #[cfg(feature = "tracing")]
810 tracing::debug!("Building connector for scheme: {}", scheme);
811
812 // Build the connector (this spawns tasks as a side effect)
813 let _connector = builder.build(&db).await?;
814
815 #[cfg(feature = "tracing")]
816 tracing::info!("Connector built and spawned successfully: {}", scheme);
817 }
818
819 // Unwrap the Arc to return the owned AimDb
820 // This is safe because we just created it and hold the only reference
821 let db_owned = Arc::try_unwrap(db).unwrap_or_else(|arc| (*arc).clone());
822
823 Ok(db_owned)
824 }
825}
826
827impl Default for AimDbBuilder<NoRuntime> {
828 fn default() -> Self {
829 Self::new()
830 }
831}
832
/// Producer-consumer database
///
/// A database handle with type-safe record access. The type parameter `R` is
/// the runtime adapter (e.g., TokioAdapter, EmbassyAdapter).
///
/// The handle consists of two `Arc`s, so cloning it is cheap and every clone refers
/// to the same records.
///
/// See `examples/` for usage.
///
/// # Examples
///
/// ```rust,ignore
/// use aimdb_tokio_adapter::TokioAdapter;
///
/// let runtime = Arc::new(TokioAdapter);
/// let mut builder = AimDbBuilder::new().runtime(runtime);
/// builder.register_record::<Temperature>(&TemperatureConfig);
/// let db: AimDb<TokioAdapter> = builder.build().await?;
/// ```
pub struct AimDb<R: aimdb_executor::Spawn + 'static> {
    /// Internal state (record storage and lookup indexes), shared between clones
    inner: Arc<AimDbInner>,

    /// Runtime adapter with concrete type
    runtime: Arc<R>,
}
859
860impl<R: aimdb_executor::Spawn + 'static> Clone for AimDb<R> {
861 fn clone(&self) -> Self {
862 Self {
863 inner: self.inner.clone(),
864 runtime: self.runtime.clone(),
865 }
866 }
867}
868
869impl<R: aimdb_executor::Spawn + 'static> AimDb<R> {
870 /// Internal accessor for the inner state
871 ///
872 /// Used by adapter crates and internal spawning logic.
873 #[doc(hidden)]
874 pub fn inner(&self) -> &Arc<AimDbInner> {
875 &self.inner
876 }
877
878 /// Builds a database with a closure-based builder pattern
879 pub async fn build_with(rt: Arc<R>, f: impl FnOnce(&mut AimDbBuilder<R>)) -> DbResult<()> {
880 let mut b = AimDbBuilder::new().runtime(rt);
881 f(&mut b);
882 b.run().await
883 }
884
885 /// Spawns a task using the database's runtime adapter
886 ///
887 /// This method provides direct access to the runtime's spawn capability.
888 ///
889 /// # Arguments
890 /// * `future` - The future to spawn
891 ///
892 /// # Returns
893 /// `DbResult<()>` - Ok if the task was spawned successfully
894 pub fn spawn_task<F>(&self, future: F) -> DbResult<()>
895 where
896 F: core::future::Future<Output = ()> + Send + 'static,
897 {
898 self.runtime.spawn(future).map_err(DbError::from)?;
899 Ok(())
900 }
901
902 /// Produces a value for a record type
903 ///
904 /// Writes the value to the record's buffer and triggers all consumers.
905 pub async fn produce<T>(&self, value: T) -> DbResult<()>
906 where
907 T: Send + 'static + Debug + Clone,
908 {
909 // Get the typed record using the helper
910 let typed_rec = self.inner.get_typed_record::<T, R>()?;
911
912 // Produce the value directly to the buffer
913 typed_rec.produce(value).await;
914 Ok(())
915 }
916
917 /// Subscribes to a record type's buffer
918 ///
919 /// Creates a subscription to the configured buffer for the given record type.
920 /// Returns a boxed reader for receiving values asynchronously.
921 ///
922 /// # Example
923 ///
924 /// ```rust,ignore
925 /// let mut reader = db.subscribe::<Temperature>()?;
926 ///
927 /// loop {
928 /// match reader.recv().await {
929 /// Ok(temp) => println!("Temperature: {:.1}°C", temp.celsius),
930 /// Err(_) => break,
931 /// }
932 /// }
933 /// ```
934 pub fn subscribe<T>(&self) -> DbResult<Box<dyn crate::buffer::BufferReader<T> + Send>>
935 where
936 T: Send + Sync + 'static + Debug + Clone,
937 {
938 // Get the typed record using the helper
939 let typed_rec = self.inner.get_typed_record::<T, R>()?;
940
941 // Subscribe to the buffer
942 typed_rec.subscribe()
943 }
944
945 /// Creates a type-safe producer for a specific record type
946 ///
947 /// Returns a `Producer<T, R>` that can only produce values of type `T`.
948 /// This is the recommended way to pass database access to producer services,
949 /// following the principle of least privilege.
950 ///
951 /// # Example
952 ///
953 /// ```rust,ignore
954 /// let db = builder.build()?;
955 /// let temp_producer = db.producer::<Temperature>();
956 ///
957 /// // Pass to service - it can only produce Temperature values
958 /// runtime.spawn(temperature_service(ctx, temp_producer)).unwrap();
959 /// ```
960 pub fn producer<T>(&self) -> crate::typed_api::Producer<T, R>
961 where
962 T: Send + 'static + Debug + Clone,
963 {
964 crate::typed_api::Producer::new(Arc::new(self.clone()))
965 }
966
967 /// Creates a type-safe consumer for a specific record type
968 ///
969 /// Returns a `Consumer<T, R>` that can only subscribe to values of type `T`.
970 /// This is the recommended way to pass database access to consumer services,
971 /// following the principle of least privilege.
972 ///
973 /// # Example
974 ///
975 /// ```rust,ignore
976 /// let db = builder.build()?;
977 /// let temp_consumer = db.consumer::<Temperature>();
978 ///
979 /// // Pass to service - it can only consume Temperature values
980 /// runtime.spawn(temperature_monitor(ctx, temp_consumer)).unwrap();
981 /// ```
982 pub fn consumer<T>(&self) -> crate::typed_api::Consumer<T, R>
983 where
984 T: Send + Sync + 'static + Debug + Clone,
985 {
986 crate::typed_api::Consumer::new(Arc::new(self.clone()))
987 }
988
989 // ========================================================================
990 // Key-based API (recommended for multi-instance records)
991 // ========================================================================
992
993 /// Produces a value to a specific record by key
994 ///
995 /// This is the recommended method when multiple records of the same type exist.
996 /// Uses O(1) key-based lookup to find the correct record.
997 ///
998 /// # Arguments
999 /// * `key` - The record key (e.g., "sensor.temperature")
1000 /// * `value` - The value to produce
1001 ///
1002 /// # Example
1003 ///
1004 /// ```rust,ignore
1005 /// // With multiple Temperature records
1006 /// db.produce_by_key::<Temperature>("sensors.indoor", indoor_temp).await?;
1007 /// db.produce_by_key::<Temperature>("sensors.outdoor", outdoor_temp).await?;
1008 /// ```
1009 pub async fn produce_by_key<T>(&self, key: impl AsRef<str>, value: T) -> DbResult<()>
1010 where
1011 T: Send + 'static + Debug + Clone,
1012 {
1013 let typed_rec = self.inner.get_typed_record_by_key::<T, R>(key)?;
1014 typed_rec.produce(value).await;
1015 Ok(())
1016 }
1017
1018 /// Subscribes to a specific record by key
1019 ///
1020 /// This is the recommended method when multiple records of the same type exist.
1021 /// Uses O(1) key-based lookup to find the correct record.
1022 ///
1023 /// # Arguments
1024 /// * `key` - The record key (e.g., "sensor.temperature")
1025 ///
1026 /// # Example
1027 ///
1028 /// ```rust,ignore
1029 /// let mut reader = db.subscribe_by_key::<Temperature>("sensors.indoor")?;
1030 /// while let Ok(temp) = reader.recv().await {
1031 /// println!("Indoor: {:.1}°C", temp.celsius);
1032 /// }
1033 /// ```
1034 pub fn subscribe_by_key<T>(
1035 &self,
1036 key: impl AsRef<str>,
1037 ) -> DbResult<Box<dyn crate::buffer::BufferReader<T> + Send>>
1038 where
1039 T: Send + Sync + 'static + Debug + Clone,
1040 {
1041 let typed_rec = self.inner.get_typed_record_by_key::<T, R>(key)?;
1042 typed_rec.subscribe()
1043 }
1044
/// Builds a producer handle bound to one record key
///
/// Returns a `ProducerByKey<T, R>` that carries the given key, which is the
/// handle to use when several records of type `T` are registered.
///
/// This method only converts the key into an owned `String` and packages it
/// with a clone of the database handle; it performs no lookup itself.
/// NOTE(review): presumably the key is resolved when the producer is used —
/// confirm in `ProducerByKey`.
///
/// # Arguments
/// * `key` - The record key (e.g., "sensors.indoor")
///
/// # Example
///
/// ```rust,ignore
/// let indoor_producer = db.producer_by_key::<Temperature>("sensors.indoor");
/// let outdoor_producer = db.producer_by_key::<Temperature>("sensors.outdoor");
///
/// // Each producer writes to its own record
/// indoor_producer.produce(indoor_temp).await?;
/// outdoor_producer.produce(outdoor_temp).await?;
/// ```
#[cfg(feature = "alloc")]
pub fn producer_by_key<T>(
    &self,
    key: impl Into<alloc::string::String>,
) -> crate::typed_api::ProducerByKey<T, R>
where
    T: Send + 'static + Debug + Clone,
{
    let db_handle = Arc::new(self.clone());
    let owned_key = key.into();
    crate::typed_api::ProducerByKey::new(db_handle, owned_key)
}
1073
/// Builds a consumer handle bound to one record key
///
/// Returns a `ConsumerByKey<T, R>` that carries the given key, which is the
/// handle to use when several records of type `T` are registered.
///
/// This method only converts the key into an owned `String` and packages it
/// with a clone of the database handle; it performs no lookup itself.
/// NOTE(review): presumably the key is resolved when the consumer
/// subscribes — confirm in `ConsumerByKey`.
///
/// # Arguments
/// * `key` - The record key (e.g., "sensors.indoor")
///
/// # Example
///
/// ```rust,ignore
/// let indoor_consumer = db.consumer_by_key::<Temperature>("sensors.indoor");
/// let outdoor_consumer = db.consumer_by_key::<Temperature>("sensors.outdoor");
///
/// // Each consumer reads from its own record
/// let mut rx = indoor_consumer.subscribe()?;
/// ```
#[cfg(feature = "alloc")]
pub fn consumer_by_key<T>(
    &self,
    key: impl Into<alloc::string::String>,
) -> crate::typed_api::ConsumerByKey<T, R>
where
    T: Send + Sync + 'static + Debug + Clone,
{
    let db_handle = Arc::new(self.clone());
    let owned_key = key.into();
    crate::typed_api::ConsumerByKey::new(db_handle, owned_key)
}
1101
1102 /// Resolve a record key to its RecordId
1103 ///
1104 /// Useful for checking if a record exists before operations.
1105 ///
1106 /// # Example
1107 ///
1108 /// ```rust,ignore
1109 /// if let Some(id) = db.resolve_key("sensors.temperature") {
1110 /// println!("Record exists with ID: {}", id);
1111 /// }
1112 /// ```
1113 pub fn resolve_key(&self, key: &str) -> Option<crate::record_id::RecordId> {
1114 self.inner.resolve_str(key)
1115 }
1116
1117 /// Get all record IDs for a specific type
1118 ///
1119 /// Returns a slice of RecordIds for all records of type T.
1120 /// Useful for introspection when multiple records of the same type exist.
1121 ///
1122 /// # Example
1123 ///
1124 /// ```rust,ignore
1125 /// let temp_ids = db.records_of_type::<Temperature>();
1126 /// println!("Found {} temperature records", temp_ids.len());
1127 /// ```
1128 pub fn records_of_type<T: 'static>(&self) -> &[crate::record_id::RecordId] {
1129 self.inner.records_of_type::<T>()
1130 }
1131
1132 /// Returns a reference to the runtime adapter
1133 ///
1134 /// Provides direct access to the concrete runtime type.
1135 pub fn runtime(&self) -> &R {
1136 &self.runtime
1137 }
1138
/// Returns metadata for the registered records (std only)
///
/// Intended for remote-access introspection. Compiled only when the `std`
/// feature is enabled; the metadata itself is gathered by the inner
/// database state, to which this call delegates.
///
/// # Example
/// ```rust,ignore
/// for record in db.list_records() {
///     println!("Record: {} ({})", record.name, record.type_id);
/// }
/// ```
#[cfg(feature = "std")]
pub fn list_records(&self) -> Vec<crate::remote::RecordMetadata> {
    self.inner.list_records()
}
1155
/// Fetches a record's most recent value rendered as JSON (std only)
///
/// Thin convenience wrapper that forwards to
/// `AimDbInner::try_latest_as_json()`.
///
/// # Arguments
/// * `record_name` - The full Rust type name (e.g., "server::Temperature")
///
/// # Returns
/// `Some(JsonValue)` holding the current value, or `None` when no value is
/// available
#[cfg(feature = "std")]
pub fn try_latest_as_json(&self, record_name: &str) -> Option<serde_json::Value> {
    self.inner.try_latest_as_json(record_name)
}
1169
/// Writes a record's value from a JSON payload (remote access API, std only)
///
/// The JSON is deserialized and the resulting value is produced into the
/// record's buffer. The work is delegated to the inner database state.
///
/// **SAFETY:** The "No Producer Override" rule is enforced - the call only
/// succeeds for configuration records that have no active producers.
///
/// # Arguments
/// * `record_name` - Full Rust type name
/// * `json_value` - JSON value to set
///
/// # Returns
/// `Ok(())` on success; an error when the record is not found, has
/// producers, or the JSON fails to deserialize
///
/// # Example (internal use)
/// ```rust,ignore
/// db.set_record_from_json("AppConfig", json!({"debug": true}))?;
/// ```
#[cfg(feature = "std")]
pub fn set_record_from_json(
    &self,
    record_name: &str,
    json_value: serde_json::Value,
) -> DbResult<()> {
    self.inner.set_record_from_json(record_name, json_value)
}
1196
/// Subscribe to record updates as JSON stream (std only)
///
/// Creates a subscription to a record's buffer and forwards updates as JSON
/// to a bounded channel. This is used internally by the remote access protocol
/// for implementing `record.subscribe`.
///
/// # Architecture
///
/// Spawns a consumer task on the runtime adapter that:
/// 1. Reads JSON values from the record's buffer as they arrive
/// 2. Sends each value to a bounded channel, waiting while the channel is full
/// 3. Logs and skips over lag notifications (`DbError::BufferLagged`)
/// 4. Terminates when any of the following happens:
///    - The cancel signal is received (unsubscribe), including while the
///      task is waiting for space in a full channel
///    - The returned `cancel_tx` is dropped without sending (the closed
///      one-shot channel is treated exactly like a cancel signal, so callers
///      must keep `cancel_tx` alive for as long as they want updates)
///    - The channel receiver is dropped (client disconnected)
///    - The buffer reports `DbError::BufferClosed` or any other error
///
/// # Arguments
/// * `record_key` - Key of the record to subscribe to
/// * `queue_size` - Capacity of the bounded channel for this subscription.
///   A value of `0` is treated as `1`, because tokio's bounded channel
///   panics when created with zero capacity.
///
/// # Returns
/// `Ok((receiver, cancel_tx))` where:
/// - `receiver`: Bounded channel receiver for JSON values
/// - `cancel_tx`: One-shot sender to cancel the subscription
///
/// # Errors
/// - `DbError::RecordKeyNotFound` if `record_key` does not resolve to a record
/// - `DbError::InvalidRecordId` if the resolved ID has no storage entry
/// - The error from `subscribe_json()` (e.g. record not configured with
///   `.with_serialization()`, or the buffer subscription failed)
/// - The runtime's spawn error, converted via `DbError::from`
///
/// # Example (internal use)
///
/// ```rust,ignore
/// let (mut rx, cancel_tx) = db.subscribe_record_updates("sensor.temp", 100)?;
///
/// // Read events
/// while let Some(json_value) = rx.recv().await {
///     // Forward to client...
/// }
///
/// // Cancel subscription
/// let _ = cancel_tx.send(());
/// ```
#[cfg(feature = "std")]
#[allow(unused_variables)] // Variables used only in tracing feature
pub fn subscribe_record_updates(
    &self,
    record_key: &str,
    queue_size: usize,
) -> DbResult<(
    tokio::sync::mpsc::Receiver<serde_json::Value>,
    tokio::sync::oneshot::Sender<()>,
)> {
    use tokio::sync::{mpsc, oneshot};

    // Find the record by key
    let id = self
        .inner
        .resolve_str(record_key)
        .ok_or_else(|| DbError::RecordKeyNotFound {
            key: record_key.to_string(),
        })?;

    let record = self
        .inner
        .storage(id)
        .ok_or_else(|| DbError::InvalidRecordId { id: id.raw() })?;

    // Subscribe to the record's buffer as JSON stream
    // This will fail if record not configured with .with_serialization()
    let mut json_reader = record.subscribe_json()?;

    // Create channels for the subscription.
    // `mpsc::channel` panics on a capacity of 0, so clamp the caller-supplied
    // size to the smallest valid queue instead of crashing the caller.
    let (value_tx, value_rx) = mpsc::channel(queue_size.max(1));
    let (cancel_tx, mut cancel_rx) = oneshot::channel();

    // Get metadata for logging
    let type_id = self.inner.types[id.index()];
    let key = self.inner.keys[id.index()].clone();
    let record_metadata = record.collect_metadata(type_id, key, id);
    let runtime = self.runtime.clone();

    // Spawn consumer task that forwards JSON values from buffer to channel
    let spawn_result = runtime.spawn(async move {
        #[cfg(feature = "tracing")]
        tracing::debug!(
            "Subscription consumer task started for {}",
            record_metadata.name
        );

        // Main event loop: read from buffer and forward to channel
        loop {
            tokio::select! {
                // Handle cancellation signal. The `_` pattern also matches
                // the error produced when `cancel_tx` is dropped unsent.
                _ = &mut cancel_rx => {
                    #[cfg(feature = "tracing")]
                    tracing::debug!("Subscription cancelled");
                    break;
                }
                // Read next JSON value from buffer
                result = json_reader.recv_json() => {
                    match result {
                        Ok(json_val) => {
                            // Forward the value, but keep honouring the
                            // cancel signal while waiting for queue space so
                            // a stalled receiver cannot pin this task.
                            tokio::select! {
                                _ = &mut cancel_rx => {
                                    #[cfg(feature = "tracing")]
                                    tracing::debug!("Subscription cancelled");
                                    break;
                                }
                                sent = value_tx.send(json_val) => {
                                    if sent.is_err() {
                                        #[cfg(feature = "tracing")]
                                        tracing::debug!("Subscription receiver dropped");
                                        break;
                                    }
                                }
                            }
                        }
                        Err(DbError::BufferLagged { lag_count, .. }) => {
                            // Consumer fell behind - log warning but continue
                            #[cfg(feature = "tracing")]
                            tracing::warn!(
                                "Subscription for {} lagged by {} messages",
                                record_metadata.name,
                                lag_count
                            );
                            // Continue reading - next recv will get latest
                        }
                        Err(DbError::BufferClosed { .. }) => {
                            // Buffer closed (shutdown) - exit gracefully
                            #[cfg(feature = "tracing")]
                            tracing::debug!("Buffer closed for {}", record_metadata.name);
                            break;
                        }
                        Err(e) => {
                            // Other error (shouldn't happen in practice)
                            #[cfg(feature = "tracing")]
                            tracing::error!(
                                "Subscription error for {}: {:?}",
                                record_metadata.name,
                                e
                            );
                            break;
                        }
                    }
                }
            }
        }

        #[cfg(feature = "tracing")]
        tracing::debug!("Subscription consumer task terminated");
    });

    spawn_result.map_err(DbError::from)?;

    Ok((value_rx, cancel_tx))
}
1347
/// Gathers the inbound connector routes registered for one URL scheme
///
/// Compiled when the `alloc` feature is enabled. Walks every record in
/// registration order, keeps the inbound connector links whose URL scheme
/// equals `scheme`, and asks each remaining link's stored factory for a
/// producer. Links whose factory yields no producer are left out.
///
/// # Arguments
/// * `scheme` - URL scheme to filter by (e.g., "mqtt", "kafka")
///
/// # Returns
/// Vector of tuples: (topic, producer_trait, deserializer), where the topic
/// is the link URL's resource ID
///
/// # Example
/// ```rust,ignore
/// // In MqttConnector after db.build()
/// let routes = db.collect_inbound_routes("mqtt");
/// let router = RouterBuilder::from_routes(routes).build();
/// connector.set_router(router).await?;
/// ```
#[cfg(feature = "alloc")]
pub fn collect_inbound_routes(
    &self,
    scheme: &str,
) -> Vec<(
    String,
    Box<dyn crate::connector::ProducerTrait>,
    crate::connector::DeserializerFn,
)> {
    // The producer factories take a type-erased database handle, so erase
    // a clone of this handle once and share it across all links.
    let db_any: Arc<dyn core::any::Any + Send + Sync> = Arc::new(self.clone());

    // Every inbound link of every record, narrowed to the requested scheme.
    let scheme_links = self
        .inner
        .storages
        .iter()
        .flat_map(|record| record.inbound_connectors())
        .filter(|link| link.url.scheme() == scheme);

    let mut routes = Vec::new();
    for link in scheme_links {
        let topic = link.url.resource_id();

        // A factory that declines to build a producer drops the route.
        let Some(producer) = link.create_producer(Arc::clone(&db_any)) else {
            continue;
        };

        routes.push((topic, producer, link.deserializer.clone()));
    }

    #[cfg(feature = "tracing")]
    if !routes.is_empty() {
        tracing::debug!(
            "Collected {} inbound routes for scheme '{}'",
            routes.len(),
            scheme
        );
    }

    routes
}
1409
/// Gathers the outbound connector routes registered for one URL scheme
///
/// The counterpart of `collect_inbound_routes()`. Walks every record in
/// registration order, keeps the outbound connector links whose URL scheme
/// equals `scheme`, skips links that carry no serializer, and asks each
/// remaining link's stored factory for a consumer. Links whose factory
/// yields no consumer are left out.
///
/// Connectors call this during their `build()` phase to discover the
/// configured outbound routes and spawn publisher tasks for them.
///
/// # Arguments
/// * `scheme` - URL scheme to filter by (e.g., "mqtt", "kafka")
///
/// # Returns
/// Vector of tuples: (destination, consumer_trait, serializer, config)
///
/// The destination is the link URL's resource ID; the config Vec contains
/// protocol-specific options (e.g., qos, retain).
///
/// # Example
/// ```rust,ignore
/// // In MqttConnector::build()
/// let routes = db.collect_outbound_routes("mqtt");
/// for (topic, consumer, serializer, config) in routes {
///     connector.spawn_publisher(topic, consumer, serializer, config)?;
/// }
/// ```
#[cfg(feature = "alloc")]
pub fn collect_outbound_routes(&self, scheme: &str) -> Vec<OutboundRoute> {
    // The consumer factories accept `Arc<dyn Any>` so they need not know the
    // runtime type `R` where they are defined; erase a clone of this handle
    // once and share it across all links.
    let db_any: Arc<dyn core::any::Any + Send + Sync> = Arc::new(self.clone());

    // Every outbound link of every record, narrowed to the requested scheme.
    let scheme_links = self
        .inner
        .storages
        .iter()
        .flat_map(|record| record.outbound_connectors())
        .filter(|link| link.url.scheme() == scheme);

    let mut routes: Vec<OutboundRoute> = Vec::new();
    for link in scheme_links {
        let destination = link.url.resource_id();

        // A route without a serializer cannot publish anything: skip it.
        let serializer = match link.serializer.clone() {
            Some(serializer) => serializer,
            None => {
                #[cfg(feature = "tracing")]
                tracing::warn!("Outbound link '{}' has no serializer, skipping", link.url);
                continue;
            }
        };

        // A factory that declines to build a consumer drops the route.
        let Some(consumer) = link.create_consumer(Arc::clone(&db_any)) else {
            continue;
        };

        routes.push((destination, consumer, serializer, link.config.clone()));
    }

    #[cfg(feature = "tracing")]
    if !routes.is_empty() {
        tracing::debug!(
            "Collected {} outbound routes for scheme '{}'",
            routes.len(),
            scheme
        );
    }

    routes
}
1480}