aimdb_core/builder.rs
1//! Database builder with type-safe record registration
2//!
3//! Provides `AimDb` and `AimDbBuilder` for constructing databases with
4//! type-safe, self-registering records using the producer-consumer pattern.
5
6use core::any::TypeId;
7use core::fmt::Debug;
8use core::marker::PhantomData;
9
10extern crate alloc;
11
12use alloc::collections::BTreeMap;
13
14#[cfg(not(feature = "std"))]
15use alloc::{boxed::Box, string::String, sync::Arc};
16
17#[cfg(feature = "std")]
18use std::{boxed::Box, string::String, sync::Arc};
19
20use crate::typed_api::{RecordRegistrar, RecordT};
21use crate::typed_record::{AnyRecord, AnyRecordExt, TypedRecord};
22use crate::{DbError, DbResult};
23
24/// Marker type for untyped builder (before runtime is set)
25pub struct NoRuntime;
26
27/// Internal database state
28///
29/// Holds the registry of typed records, indexed by `TypeId`.
30pub struct AimDbInner {
31 /// Map from TypeId to type-erased records (SPMC buffers for internal data flow)
32 pub records: BTreeMap<TypeId, Box<dyn AnyRecord>>,
33}
34
35impl AimDbInner {
36 /// Helper to get a typed record from the registry
37 ///
38 /// This encapsulates the common pattern of:
39 /// 1. Getting TypeId for type T
40 /// 2. Looking up the record in the map
41 /// 3. Downcasting to the typed record
42 pub fn get_typed_record<T, R>(&self) -> DbResult<&TypedRecord<T, R>>
43 where
44 T: Send + 'static + Debug + Clone,
45 R: aimdb_executor::Spawn + 'static,
46 {
47 use crate::typed_record::AnyRecordExt;
48
49 let type_id = TypeId::of::<T>();
50
51 #[cfg(feature = "std")]
52 let record = self.records.get(&type_id).ok_or(DbError::RecordNotFound {
53 record_name: core::any::type_name::<T>().to_string(),
54 })?;
55
56 #[cfg(not(feature = "std"))]
57 let record = self
58 .records
59 .get(&type_id)
60 .ok_or(DbError::RecordNotFound { _record_name: () })?;
61
62 #[cfg(feature = "std")]
63 let typed_record = record.as_typed::<T, R>().ok_or(DbError::InvalidOperation {
64 operation: "get_typed_record".to_string(),
65 reason: "type mismatch during downcast".to_string(),
66 })?;
67
68 #[cfg(not(feature = "std"))]
69 let typed_record = record.as_typed::<T, R>().ok_or(DbError::InvalidOperation {
70 _operation: (),
71 _reason: (),
72 })?;
73
74 Ok(typed_record)
75 }
76
77 /// Collects metadata for all registered records (std only)
78 ///
79 /// Returns a vector of `RecordMetadata` for remote access introspection.
80 /// Available only when the `std` feature is enabled.
81 #[cfg(feature = "std")]
82 pub fn list_records(&self) -> Vec<crate::remote::RecordMetadata> {
83 self.records
84 .iter()
85 .map(|(type_id, record)| record.collect_metadata(*type_id))
86 .collect()
87 }
88
89 /// Try to get record's latest value as JSON by record name (std only)
90 ///
91 /// Searches for a record with the given name and returns its current value
92 /// serialized to JSON. Returns `None` if:
93 /// - Record not found
94 /// - Record not configured with `.with_serialization()`
95 /// - No value available in the atomic snapshot
96 ///
97 /// # Arguments
98 /// * `record_name` - The full Rust type name (e.g., "server::Temperature")
99 ///
100 /// # Returns
101 /// `Some(JsonValue)` with the current record value, or `None`
102 #[cfg(feature = "std")]
103 pub fn try_latest_as_json(&self, record_name: &str) -> Option<serde_json::Value> {
104 for (type_id, record) in &self.records {
105 let metadata = record.collect_metadata(*type_id);
106 if metadata.name == record_name {
107 return record.latest_json();
108 }
109 }
110 None
111 }
112
113 /// Sets a record value from JSON (remote access API)
114 ///
115 /// Deserializes the JSON value and writes it to the record's buffer.
116 ///
117 /// **SAFETY:** Enforces the "No Producer Override" rule:
118 /// - Only works for records with `producer_count == 0`
119 /// - Returns error if the record has active producers
120 ///
121 /// # Arguments
122 /// * `record_name` - The full Rust type name (e.g., "server::AppConfig")
123 /// * `json_value` - JSON representation of the value
124 ///
125 /// # Returns
126 /// - `Ok(())` - Successfully set the value
127 /// - `Err(DbError)` - If record not found, has producers, or deserialization fails
128 ///
129 /// # Errors
130 /// - `RecordNotFound` - Record with given name doesn't exist
131 /// - `PermissionDenied` - Record has active producers (safety check)
132 /// - `RuntimeError` - Record not configured with `.with_serialization()`
133 /// - `JsonWithContext` - JSON deserialization failed (schema mismatch)
134 ///
135 /// # Example (internal use - called by remote access protocol)
136 /// ```rust,ignore
137 /// let json_val = serde_json::json!({"log_level": "debug", "version": "1.0"});
138 /// db.set_record_from_json("server::AppConfig", json_val)?;
139 /// ```
140 #[cfg(feature = "std")]
141 pub fn set_record_from_json(
142 &self,
143 record_name: &str,
144 json_value: serde_json::Value,
145 ) -> DbResult<()> {
146 // Find the record by name
147 for (type_id, record) in &self.records {
148 let metadata = record.collect_metadata(*type_id);
149 if metadata.name == record_name {
150 // Delegate to the type-erased set_from_json method
151 // which will enforce the "no producer override" rule
152 return record.set_from_json(json_value);
153 }
154 }
155
156 // Record not found
157 Err(DbError::RecordNotFound {
158 record_name: record_name.to_string(),
159 })
160 }
161}
162
163/// Database builder for producer-consumer pattern
164///
165/// Provides a fluent API for constructing databases with type-safe record registration.
166/// Use `.runtime()` to set the runtime and transition to a typed builder.
167pub struct AimDbBuilder<R = NoRuntime> {
168 /// Registry of typed records
169 records: BTreeMap<TypeId, Box<dyn AnyRecord>>,
170
171 /// Runtime adapter
172 runtime: Option<Arc<R>>,
173
174 /// Connectors indexed by scheme (mqtt, shmem, kafka, etc.)
175 pub(crate) connectors: BTreeMap<String, Arc<dyn crate::transport::Connector>>,
176
177 /// Spawn functions indexed by TypeId
178 spawn_fns: BTreeMap<TypeId, Box<dyn core::any::Any + Send>>,
179
180 /// Remote access configuration (std only)
181 #[cfg(feature = "std")]
182 remote_config: Option<crate::remote::AimxConfig>,
183
184 /// PhantomData to track the runtime type parameter
185 _phantom: PhantomData<R>,
186}
187
188impl AimDbBuilder<NoRuntime> {
189 /// Creates a new database builder without a runtime
190 ///
191 /// Call `.runtime()` to set the runtime adapter.
192 pub fn new() -> Self {
193 Self {
194 records: BTreeMap::new(),
195 runtime: None,
196 connectors: BTreeMap::new(),
197 spawn_fns: BTreeMap::new(),
198 #[cfg(feature = "std")]
199 remote_config: None,
200 _phantom: PhantomData,
201 }
202 }
203
204 /// Sets the runtime adapter
205 ///
206 /// This transitions the builder from untyped to typed with concrete runtime `R`.
207 pub fn runtime<R>(self, rt: Arc<R>) -> AimDbBuilder<R>
208 where
209 R: aimdb_executor::Spawn + 'static,
210 {
211 AimDbBuilder {
212 records: self.records,
213 runtime: Some(rt),
214 connectors: self.connectors,
215 spawn_fns: BTreeMap::new(),
216 #[cfg(feature = "std")]
217 remote_config: None,
218 _phantom: PhantomData,
219 }
220 }
221}
222
223impl<R> AimDbBuilder<R>
224where
225 R: aimdb_executor::Spawn + 'static,
226{
227 /// Registers a connector for a specific URL scheme
228 ///
229 /// The scheme (e.g., "mqtt", "shmem", "kafka") determines how `.link()` URLs
230 /// are routed. Each scheme can have ONE connector, which manages connections to
231 /// a specific endpoint (broker, segment, cluster, etc.).
232 ///
233 /// # Arguments
234 /// * `scheme` - URL scheme without "://" (e.g., "mqtt", "shmem", "kafka")
235 /// * `connector` - Connector implementing the Connector trait
236 ///
237 /// # Example
238 ///
239 /// ```rust,ignore
240 /// use aimdb_mqtt_connector::MqttConnector;
241 /// use std::sync::Arc;
242 ///
243 /// let mqtt_connector = MqttConnector::new("mqtt://broker.local:1883").await?;
244 /// let shmem_connector = ShmemConnector::new("/dev/shm/aimdb");
245 ///
246 /// let builder = AimDbBuilder::new()
247 /// .runtime(runtime)
248 /// .with_connector("mqtt", Arc::new(mqtt_connector))
249 /// .with_connector("shmem", Arc::new(shmem_connector));
250 ///
251 /// // Now .link() can route to either:
252 /// // .link("mqtt://sensors/temp") → mqtt_connector
253 /// // .link("shmem://temp_data") → shmem_connector
254 /// ```
255 pub fn with_connector(
256 mut self,
257 scheme: impl Into<String>,
258 connector: Arc<dyn crate::transport::Connector>,
259 ) -> Self {
260 self.connectors.insert(scheme.into(), connector);
261 self
262 }
263
264 /// Enables remote access via AimX protocol (std only)
265 ///
266 /// Configures the database to accept remote connections over a Unix domain socket,
267 /// allowing external clients to introspect records, subscribe to updates, and
268 /// (optionally) write data.
269 ///
270 /// The remote access supervisor will be spawned automatically during `build()`.
271 ///
272 /// # Arguments
273 /// * `config` - Remote access configuration (socket path, security policy, etc.)
274 ///
275 /// # Example
276 ///
277 /// ```rust,ignore
278 /// use aimdb_core::remote::{AimxConfig, SecurityPolicy};
279 ///
280 /// let config = AimxConfig::new("/tmp/aimdb.sock")
281 /// .with_security(SecurityPolicy::read_only());
282 ///
283 /// let db = AimDbBuilder::new()
284 /// .runtime(runtime)
285 /// .with_remote_access(config)
286 /// .build()?;
287 /// ```
288 #[cfg(feature = "std")]
289 pub fn with_remote_access(mut self, config: crate::remote::AimxConfig) -> Self {
290 self.remote_config = Some(config);
291 self
292 }
293
294 /// Configures a record type manually
295 ///
296 /// Low-level method for advanced use cases. Most users should use `register_record` instead.
297 pub fn configure<T>(
298 &mut self,
299 f: impl for<'a> FnOnce(&'a mut RecordRegistrar<'a, T, R>),
300 ) -> &mut Self
301 where
302 T: Send + Sync + 'static + Debug + Clone,
303 {
304 let entry = self
305 .records
306 .entry(TypeId::of::<T>())
307 .or_insert_with(|| Box::new(TypedRecord::<T, R>::new()));
308
309 let rec = entry
310 .as_typed_mut::<T, R>()
311 .expect("type mismatch in record registry");
312
313 let mut reg = RecordRegistrar {
314 rec,
315 connectors: &self.connectors,
316 };
317 f(&mut reg);
318
319 // Store a spawn function that captures the concrete type T
320 let type_id = TypeId::of::<T>();
321 #[allow(clippy::type_complexity)]
322 let spawn_fn: Box<dyn FnOnce(&Arc<R>, &Arc<AimDb<R>>) -> DbResult<()> + Send> =
323 Box::new(move |runtime: &Arc<R>, db: &Arc<AimDb<R>>| {
324 // Use RecordSpawner to spawn tasks for this record type
325 use crate::typed_record::RecordSpawner;
326
327 let typed_record = db.inner().get_typed_record::<T, R>()?;
328 RecordSpawner::<T>::spawn_all_tasks(typed_record, runtime, db)
329 });
330
331 // Store the spawn function (type-erased in Box<dyn Any>)
332 self.spawn_fns.insert(type_id, Box::new(spawn_fn));
333
334 self
335 }
336
337 /// Registers a self-registering record type
338 ///
339 /// The record type must implement `RecordT<R>`.
340 pub fn register_record<T>(&mut self, cfg: &T::Config) -> &mut Self
341 where
342 T: RecordT<R>,
343 {
344 self.configure::<T>(|reg| T::register(reg, cfg))
345 }
346
347 /// Runs the database indefinitely (never returns)
348 ///
349 /// This method builds the database, spawns all producer and consumer tasks, and then
350 /// parks the current task indefinitely. This is the primary way to run AimDB services.
351 ///
352 /// All logic runs in background tasks via producers, consumers, and connectors. The
353 /// application continues until interrupted (e.g., Ctrl+C).
354 ///
355 /// # Returns
356 /// `DbResult<()>` - Ok when database starts successfully, then parks forever
357 ///
358 /// # Example
359 ///
360 /// ```rust,ignore
361 /// #[tokio::main]
362 /// async fn main() -> DbResult<()> {
363 /// AimDbBuilder::new()
364 /// .runtime(Arc::new(TokioAdapter::new()?))
365 /// .configure::<MyData>(|reg| {
366 /// reg.with_buffer(BufferCfg::SpmcRing { capacity: 100 })
367 /// .with_source(my_producer)
368 /// .with_tap(my_consumer);
369 /// })
370 /// .run().await // Runs forever
371 /// }
372 /// ```
373 pub async fn run(self) -> DbResult<()> {
374 // Build the database and spawn all tasks
375 let _db = self.build()?;
376
377 #[cfg(feature = "tracing")]
378 tracing::info!("Database running, background tasks active. Press Ctrl+C to stop.");
379
380 // Park indefinitely - the background tasks will continue running
381 // The database handle is kept alive here to prevent dropping it
382 core::future::pending::<()>().await;
383
384 Ok(())
385 }
386
387 /// Builds the database and returns the handle (advanced use)
388 ///
389 /// Use this when you need programmatic access to the database handle for
390 /// manual subscriptions or production. For typical services, use `.run().await` instead.
391 ///
392 /// **Automatic Task Spawning:** This method spawns all producer services and
393 /// `.tap()` observer tasks that were registered during configuration.
394 ///
395 /// # Returns
396 /// `DbResult<AimDb<R>>` - The database instance
397 ///
398 /// # Example
399 ///
400 /// ```rust,ignore
401 /// let db = AimDbBuilder::new()
402 /// .runtime(Arc::new(TokioAdapter::new()?))
403 /// .configure::<MyData>(|reg| { /* ... */ })
404 /// .build()?;
405 ///
406 /// // Manually subscribe or produce
407 /// let mut reader = db.subscribe::<MyData>()?;
408 /// ```
409 pub fn build(self) -> DbResult<AimDb<R>> {
410 use crate::DbError;
411
412 // Validate all records
413 for record in self.records.values() {
414 record.validate().map_err(|_msg| {
415 #[cfg(feature = "std")]
416 {
417 DbError::RuntimeError {
418 message: format!("Record validation failed: {}", _msg),
419 }
420 }
421 #[cfg(not(feature = "std"))]
422 {
423 DbError::RuntimeError { _message: () }
424 }
425 })?;
426 }
427
428 // Ensure runtime is set
429 let runtime = self.runtime.ok_or({
430 #[cfg(feature = "std")]
431 {
432 DbError::RuntimeError {
433 message: "runtime not set (use .runtime())".into(),
434 }
435 }
436 #[cfg(not(feature = "std"))]
437 {
438 DbError::RuntimeError { _message: () }
439 }
440 })?;
441
442 let inner = Arc::new(AimDbInner {
443 records: self.records,
444 });
445
446 let db = Arc::new(AimDb {
447 inner: inner.clone(),
448 runtime: runtime.clone(),
449 });
450
451 #[cfg(feature = "tracing")]
452 tracing::info!(
453 "Spawning producer services and tap observers for {} record types",
454 self.spawn_fns.len()
455 );
456
457 // Execute spawn functions for each record type
458 for (_type_id, spawn_fn_any) in self.spawn_fns {
459 // Downcast from Box<dyn Any> back to the concrete spawn function type
460 type SpawnFnType<R> = Box<dyn FnOnce(&Arc<R>, &Arc<AimDb<R>>) -> DbResult<()> + Send>;
461
462 let spawn_fn = spawn_fn_any
463 .downcast::<SpawnFnType<R>>()
464 .expect("spawn function type mismatch");
465
466 // Execute the spawn function
467 (*spawn_fn)(&runtime, &db)?;
468 }
469
470 #[cfg(feature = "tracing")]
471 tracing::info!("Automatic spawning complete");
472
473 // Spawn remote access supervisor if configured (std only)
474 #[cfg(feature = "std")]
475 if let Some(remote_cfg) = self.remote_config {
476 #[cfg(feature = "tracing")]
477 tracing::info!(
478 "Spawning remote access supervisor on socket: {}",
479 remote_cfg.socket_path.display()
480 );
481
482 // Apply security policy to mark writable records
483 let writable_type_ids = remote_cfg.security_policy.writable_records();
484 for (type_id, record) in inner.records.iter() {
485 if writable_type_ids.contains(type_id) {
486 #[cfg(feature = "tracing")]
487 tracing::debug!("Marking record {:?} as writable", type_id);
488
489 // Mark the record as writable (type-erased call)
490 record.set_writable_erased(true);
491 }
492 }
493
494 // Spawn the remote supervisor task
495 // This will be implemented in Task 6
496 crate::remote::supervisor::spawn_supervisor(db.clone(), runtime.clone(), remote_cfg)?;
497
498 #[cfg(feature = "tracing")]
499 tracing::info!("Remote access supervisor spawned successfully");
500 }
501
502 // Unwrap the Arc to return the owned AimDb
503 // This is safe because we just created it and hold the only reference
504 let db_owned = Arc::try_unwrap(db).unwrap_or_else(|arc| (*arc).clone());
505
506 Ok(db_owned)
507 }
508}
509
510impl Default for AimDbBuilder<NoRuntime> {
511 fn default() -> Self {
512 Self::new()
513 }
514}
515
516/// Producer-consumer database
517///
518/// A database instance with type-safe record registration and cross-record
519/// communication via the Emitter pattern. The type parameter `R` represents
520/// the runtime adapter (e.g., TokioAdapter, EmbassyAdapter).
521///
522/// See `examples/` for usage.
523///
524/// # Examples
525///
526/// ```rust,ignore
527/// use aimdb_tokio_adapter::TokioAdapter;
528///
529/// let runtime = Arc::new(TokioAdapter);
530/// let db: AimDb<TokioAdapter> = AimDbBuilder::new()
531/// .runtime(runtime)
532/// .register_record::<Temperature>(&TemperatureConfig)
533/// .build()?;
534/// ```
535pub struct AimDb<R: aimdb_executor::Spawn + 'static> {
536 /// Internal state
537 inner: Arc<AimDbInner>,
538
539 /// Runtime adapter with concrete type
540 runtime: Arc<R>,
541}
542
543impl<R: aimdb_executor::Spawn + 'static> Clone for AimDb<R> {
544 fn clone(&self) -> Self {
545 Self {
546 inner: self.inner.clone(),
547 runtime: self.runtime.clone(),
548 }
549 }
550}
551
552impl<R: aimdb_executor::Spawn + 'static> AimDb<R> {
553 /// Internal accessor for the inner state
554 ///
555 /// Used by adapter crates and internal spawning logic.
556 #[doc(hidden)]
557 pub fn inner(&self) -> &Arc<AimDbInner> {
558 &self.inner
559 }
560
561 /// Builds a database with a closure-based builder pattern
562 pub async fn build_with(rt: Arc<R>, f: impl FnOnce(&mut AimDbBuilder<R>)) -> DbResult<()> {
563 let mut b = AimDbBuilder::new().runtime(rt);
564 f(&mut b);
565 b.run().await
566 }
567
568 /// Spawns a task using the database's runtime adapter
569 ///
570 /// This method provides direct access to the runtime's spawn capability.
571 ///
572 /// # Arguments
573 /// * `future` - The future to spawn
574 ///
575 /// # Returns
576 /// `DbResult<()>` - Ok if the task was spawned successfully
577 pub fn spawn_task<F>(&self, future: F) -> DbResult<()>
578 where
579 F: core::future::Future<Output = ()> + Send + 'static,
580 {
581 self.runtime.spawn(future).map_err(DbError::from)?;
582 Ok(())
583 }
584
585 /// Produces a value for a record type
586 ///
587 /// Writes the value to the record's buffer and triggers all consumers.
588 pub async fn produce<T>(&self, value: T) -> DbResult<()>
589 where
590 T: Send + 'static + Debug + Clone,
591 {
592 // Get the typed record using the helper
593 let typed_rec = self.inner.get_typed_record::<T, R>()?;
594
595 // Produce the value directly to the buffer
596 typed_rec.produce(value).await;
597 Ok(())
598 }
599
600 /// Subscribes to a record type's buffer
601 ///
602 /// Creates a subscription to the configured buffer for the given record type.
603 /// Returns a boxed reader for receiving values asynchronously.
604 ///
605 /// # Example
606 ///
607 /// ```rust,ignore
608 /// let mut reader = db.subscribe::<Temperature>()?;
609 ///
610 /// loop {
611 /// match reader.recv().await {
612 /// Ok(temp) => println!("Temperature: {:.1}°C", temp.celsius),
613 /// Err(_) => break,
614 /// }
615 /// }
616 /// ```
617 pub fn subscribe<T>(&self) -> DbResult<Box<dyn crate::buffer::BufferReader<T> + Send>>
618 where
619 T: Send + Sync + 'static + Debug + Clone,
620 {
621 // Get the typed record using the helper
622 let typed_rec = self.inner.get_typed_record::<T, R>()?;
623
624 // Subscribe to the buffer
625 typed_rec.subscribe()
626 }
627
628 /// Creates a type-safe producer for a specific record type
629 ///
630 /// Returns a `Producer<T, R>` that can only produce values of type `T`.
631 /// This is the recommended way to pass database access to producer services,
632 /// following the principle of least privilege.
633 ///
634 /// # Example
635 ///
636 /// ```rust,ignore
637 /// let db = builder.build()?;
638 /// let temp_producer = db.producer::<Temperature>();
639 ///
640 /// // Pass to service - it can only produce Temperature values
641 /// runtime.spawn(temperature_service(ctx, temp_producer)).unwrap();
642 /// ```
643 pub fn producer<T>(&self) -> crate::typed_api::Producer<T, R>
644 where
645 T: Send + 'static + Debug + Clone,
646 {
647 crate::typed_api::Producer::new(Arc::new(self.clone()))
648 }
649
650 /// Creates a type-safe consumer for a specific record type
651 ///
652 /// Returns a `Consumer<T, R>` that can only subscribe to values of type `T`.
653 /// This is the recommended way to pass database access to consumer services,
654 /// following the principle of least privilege.
655 ///
656 /// # Example
657 ///
658 /// ```rust,ignore
659 /// let db = builder.build()?;
660 /// let temp_consumer = db.consumer::<Temperature>();
661 ///
662 /// // Pass to service - it can only consume Temperature values
663 /// runtime.spawn(temperature_monitor(ctx, temp_consumer)).unwrap();
664 /// ```
665 pub fn consumer<T>(&self) -> crate::typed_api::Consumer<T, R>
666 where
667 T: Send + Sync + 'static + Debug + Clone,
668 {
669 crate::typed_api::Consumer::new(Arc::new(self.clone()))
670 }
671
672 /// Returns a reference to the runtime adapter
673 ///
674 /// Provides direct access to the concrete runtime type.
675 pub fn runtime(&self) -> &R {
676 &self.runtime
677 }
678
679 /// Lists all registered records (std only)
680 ///
681 /// Returns metadata for all registered records, useful for remote access introspection.
682 /// Available only when the `std` feature is enabled.
683 ///
684 /// # Example
685 /// ```rust,ignore
686 /// let records = db.list_records();
687 /// for record in records {
688 /// println!("Record: {} ({})", record.name, record.type_id);
689 /// }
690 /// ```
691 #[cfg(feature = "std")]
692 pub fn list_records(&self) -> Vec<crate::remote::RecordMetadata> {
693 self.inner.list_records()
694 }
695
696 /// Try to get record's latest value as JSON by name (std only)
697 ///
698 /// Convenience wrapper around `AimDbInner::try_latest_as_json()`.
699 ///
700 /// # Arguments
701 /// * `record_name` - The full Rust type name (e.g., "server::Temperature")
702 ///
703 /// # Returns
704 /// `Some(JsonValue)` with current value, or `None` if unavailable
705 #[cfg(feature = "std")]
706 pub fn try_latest_as_json(&self, record_name: &str) -> Option<serde_json::Value> {
707 self.inner.try_latest_as_json(record_name)
708 }
709
710 /// Sets a record value from JSON (remote access API)
711 ///
712 /// Deserializes JSON and produces the value to the record's buffer.
713 ///
714 /// **SAFETY:** Enforces "No Producer Override" rule - only works for configuration
715 /// records without active producers.
716 ///
717 /// # Arguments
718 /// * `record_name` - Full Rust type name
719 /// * `json_value` - JSON value to set
720 ///
721 /// # Returns
722 /// `Ok(())` on success, error if record not found, has producers, or deserialization fails
723 ///
724 /// # Example (internal use)
725 /// ```rust,ignore
726 /// db.set_record_from_json("AppConfig", json!({"debug": true}))?;
727 /// ```
728 #[cfg(feature = "std")]
729 pub fn set_record_from_json(
730 &self,
731 record_name: &str,
732 json_value: serde_json::Value,
733 ) -> DbResult<()> {
734 self.inner.set_record_from_json(record_name, json_value)
735 }
736
737 /// Subscribe to record updates as JSON stream (std only)
738 ///
739 /// Creates a subscription to a record's buffer and forwards updates as JSON
740 /// to a bounded channel. This is used internally by the remote access protocol
741 /// for implementing `record.subscribe`.
742 ///
743 /// # Architecture
744 ///
745 /// Spawns a consumer task that:
746 /// 1. Subscribes to the record's buffer using the existing buffer API
747 /// 2. Reads values as they arrive
748 /// 3. Serializes each value to JSON
749 /// 4. Sends JSON values to a bounded channel (with backpressure handling)
750 /// 5. Terminates when either:
751 /// - The cancel signal is received (unsubscribe)
752 /// - The channel receiver is dropped (client disconnected)
753 ///
754 /// # Arguments
755 /// * `type_id` - TypeId of the record to subscribe to
756 /// * `queue_size` - Size of the bounded channel for this subscription
757 ///
758 /// # Returns
759 /// `Ok((receiver, cancel_tx))` where:
760 /// - `receiver`: Bounded channel receiver for JSON values
761 /// - `cancel_tx`: One-shot sender to cancel the subscription
762 ///
763 /// `Err` if:
764 /// - Record not found for the given TypeId
765 /// - Record not configured with `.with_serialization()`
766 /// - Failed to subscribe to buffer
767 ///
768 /// # Example (internal use)
769 ///
770 /// ```rust,ignore
771 /// let type_id = TypeId::of::<Temperature>();
772 /// let (mut rx, cancel_tx) = db.subscribe_record_updates(type_id, 100)?;
773 ///
774 /// // Read events
775 /// while let Some(json_value) = rx.recv().await {
776 /// // Forward to client...
777 /// }
778 ///
779 /// // Cancel subscription
780 /// let _ = cancel_tx.send(());
781 /// ```
782 #[cfg(feature = "std")]
783 #[allow(unused_variables)] // Variables used only in tracing feature
784 pub fn subscribe_record_updates(
785 &self,
786 type_id: TypeId,
787 queue_size: usize,
788 ) -> DbResult<(
789 tokio::sync::mpsc::Receiver<serde_json::Value>,
790 tokio::sync::oneshot::Sender<()>,
791 )> {
792 use tokio::sync::{mpsc, oneshot};
793
794 // Find the record by TypeId
795 let record = self
796 .inner
797 .records
798 .get(&type_id)
799 .ok_or(DbError::RecordNotFound {
800 record_name: format!("TypeId({:?})", type_id),
801 })?;
802
803 // Subscribe to the record's buffer as JSON stream
804 // This will fail if record not configured with .with_serialization()
805 let mut json_reader = record.subscribe_json()?;
806
807 // Create channels for the subscription
808 let (value_tx, value_rx) = mpsc::channel(queue_size);
809 let (cancel_tx, mut cancel_rx) = oneshot::channel();
810
811 // Get metadata for logging
812 let record_metadata = record.collect_metadata(type_id);
813 let runtime = self.runtime.clone();
814
815 // Spawn consumer task that forwards JSON values from buffer to channel
816 let spawn_result = runtime.spawn(async move {
817 #[cfg(feature = "tracing")]
818 tracing::debug!(
819 "Subscription consumer task started for {}",
820 record_metadata.name
821 );
822
823 // Main event loop: read from buffer and forward to channel
824 loop {
825 tokio::select! {
826 // Handle cancellation signal
827 _ = &mut cancel_rx => {
828 #[cfg(feature = "tracing")]
829 tracing::debug!("Subscription cancelled");
830 break;
831 }
832 // Read next JSON value from buffer
833 result = json_reader.recv_json() => {
834 match result {
835 Ok(json_val) => {
836 // Send JSON value to subscription channel
837 if value_tx.send(json_val).await.is_err() {
838 #[cfg(feature = "tracing")]
839 tracing::debug!("Subscription receiver dropped");
840 break;
841 }
842 }
843 Err(DbError::BufferLagged { lag_count, .. }) => {
844 // Consumer fell behind - log warning but continue
845 #[cfg(feature = "tracing")]
846 tracing::warn!(
847 "Subscription for {} lagged by {} messages",
848 record_metadata.name,
849 lag_count
850 );
851 // Continue reading - next recv will get latest
852 }
853 Err(DbError::BufferClosed { .. }) => {
854 // Buffer closed (shutdown) - exit gracefully
855 #[cfg(feature = "tracing")]
856 tracing::debug!("Buffer closed for {}", record_metadata.name);
857 break;
858 }
859 Err(e) => {
860 // Other error (shouldn't happen in practice)
861 #[cfg(feature = "tracing")]
862 tracing::error!(
863 "Subscription error for {}: {:?}",
864 record_metadata.name,
865 e
866 );
867 break;
868 }
869 }
870 }
871 }
872 }
873
874 #[cfg(feature = "tracing")]
875 tracing::debug!("Subscription consumer task terminated");
876 });
877
878 spawn_result.map_err(DbError::from)?;
879
880 Ok((value_rx, cancel_tx))
881 }
882}
883
884#[cfg(test)]
885mod tests {
886 // NOTE: Tests commented out because RecordT<R> now requires R: aimdb_executor::Spawn,
887 // and we can't use () as a dummy runtime. See examples/ for working tests with real runtimes.
888
889 /*
890 use super::*;
891
892 #[derive(Debug, Clone, PartialEq)]
893 struct TestData {
894 value: i32,
895 }
896
897 struct TestConfig;
898
899 impl RecordT<R> for TestData {
900 type Config = TestConfig;
901
902 fn register<'a>(reg: &'a mut RecordRegistrar<'a, Self, R>, _cfg: &Self::Config) {
903 reg.source(|_prod, _ctx| async {}).tap(|_consumer| async {});
904 }
905 }
906
907 #[test]
908 fn test_builder_basic() {
909 let mut builder = AimDbBuilder::new();
910 builder.register_record::<TestData>(&TestConfig);
911
912 // Should have one record registered
913 assert_eq!(builder.records.len(), 1);
914 }
915
916 #[test]
917 fn test_builder_configure() {
918 let mut builder = AimDbBuilder::new();
919 builder.configure::<TestData>(|reg| {
920 reg.source(|_prod, _ctx| async {}).tap(|_consumer| async {});
921 });
922
923 assert_eq!(builder.records.len(), 1);
924 }
925 */
926}