aimdb_core/
builder.rs

1//! Database builder with type-safe record registration
2//!
3//! Provides `AimDb` and `AimDbBuilder` for constructing databases with
4//! type-safe, self-registering records using the producer-consumer pattern.
5
6use core::any::TypeId;
7use core::fmt::Debug;
8use core::marker::PhantomData;
9
10extern crate alloc;
11
12use alloc::collections::BTreeMap;
13
14#[cfg(not(feature = "std"))]
15use alloc::{boxed::Box, string::String, sync::Arc};
16
17#[cfg(feature = "std")]
18use std::{boxed::Box, string::String, sync::Arc};
19
20use crate::typed_api::{RecordRegistrar, RecordT};
21use crate::typed_record::{AnyRecord, AnyRecordExt, TypedRecord};
22use crate::{DbError, DbResult};
23
/// Marker type for untyped builder (before runtime is set)
///
/// This is the default type parameter of `AimDbBuilder`; calling
/// `AimDbBuilder::runtime()` replaces it with a concrete runtime type.
/// The common traits are derived so the marker can be printed, copied and
/// default-constructed like any other zero-sized public type.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct NoRuntime;
26
/// Internal database state
///
/// Holds the registry of typed records, indexed by `TypeId`. An instance is
/// created by `AimDbBuilder::build()` and shared behind an `Arc` by every
/// `AimDb` handle.
pub struct AimDbInner {
    /// Map from TypeId to type-erased records (SPMC buffers for internal data flow).
    ///
    /// Being a `BTreeMap`, iteration (used by the name-based lookups in this
    /// file) visits the records in `TypeId` order.
    pub records: BTreeMap<TypeId, Box<dyn AnyRecord>>,
}
34
35impl AimDbInner {
36    /// Helper to get a typed record from the registry
37    ///
38    /// This encapsulates the common pattern of:
39    /// 1. Getting TypeId for type T
40    /// 2. Looking up the record in the map
41    /// 3. Downcasting to the typed record
42    pub fn get_typed_record<T, R>(&self) -> DbResult<&TypedRecord<T, R>>
43    where
44        T: Send + 'static + Debug + Clone,
45        R: aimdb_executor::Spawn + 'static,
46    {
47        use crate::typed_record::AnyRecordExt;
48
49        let type_id = TypeId::of::<T>();
50
51        #[cfg(feature = "std")]
52        let record = self.records.get(&type_id).ok_or(DbError::RecordNotFound {
53            record_name: core::any::type_name::<T>().to_string(),
54        })?;
55
56        #[cfg(not(feature = "std"))]
57        let record = self
58            .records
59            .get(&type_id)
60            .ok_or(DbError::RecordNotFound { _record_name: () })?;
61
62        #[cfg(feature = "std")]
63        let typed_record = record.as_typed::<T, R>().ok_or(DbError::InvalidOperation {
64            operation: "get_typed_record".to_string(),
65            reason: "type mismatch during downcast".to_string(),
66        })?;
67
68        #[cfg(not(feature = "std"))]
69        let typed_record = record.as_typed::<T, R>().ok_or(DbError::InvalidOperation {
70            _operation: (),
71            _reason: (),
72        })?;
73
74        Ok(typed_record)
75    }
76
77    /// Collects metadata for all registered records (std only)
78    ///
79    /// Returns a vector of `RecordMetadata` for remote access introspection.
80    /// Available only when the `std` feature is enabled.
81    #[cfg(feature = "std")]
82    pub fn list_records(&self) -> Vec<crate::remote::RecordMetadata> {
83        self.records
84            .iter()
85            .map(|(type_id, record)| record.collect_metadata(*type_id))
86            .collect()
87    }
88
89    /// Try to get record's latest value as JSON by record name (std only)
90    ///
91    /// Searches for a record with the given name and returns its current value
92    /// serialized to JSON. Returns `None` if:
93    /// - Record not found
94    /// - Record not configured with `.with_serialization()`
95    /// - No value available in the atomic snapshot
96    ///
97    /// # Arguments
98    /// * `record_name` - The full Rust type name (e.g., "server::Temperature")
99    ///
100    /// # Returns
101    /// `Some(JsonValue)` with the current record value, or `None`
102    #[cfg(feature = "std")]
103    pub fn try_latest_as_json(&self, record_name: &str) -> Option<serde_json::Value> {
104        for (type_id, record) in &self.records {
105            let metadata = record.collect_metadata(*type_id);
106            if metadata.name == record_name {
107                return record.latest_json();
108            }
109        }
110        None
111    }
112
113    /// Sets a record value from JSON (remote access API)
114    ///
115    /// Deserializes the JSON value and writes it to the record's buffer.
116    ///
117    /// **SAFETY:** Enforces the "No Producer Override" rule:
118    /// - Only works for records with `producer_count == 0`
119    /// - Returns error if the record has active producers
120    ///
121    /// # Arguments
122    /// * `record_name` - The full Rust type name (e.g., "server::AppConfig")
123    /// * `json_value` - JSON representation of the value
124    ///
125    /// # Returns
126    /// - `Ok(())` - Successfully set the value
127    /// - `Err(DbError)` - If record not found, has producers, or deserialization fails
128    ///
129    /// # Errors
130    /// - `RecordNotFound` - Record with given name doesn't exist
131    /// - `PermissionDenied` - Record has active producers (safety check)
132    /// - `RuntimeError` - Record not configured with `.with_serialization()`
133    /// - `JsonWithContext` - JSON deserialization failed (schema mismatch)
134    ///
135    /// # Example (internal use - called by remote access protocol)
136    /// ```rust,ignore
137    /// let json_val = serde_json::json!({"log_level": "debug", "version": "1.0"});
138    /// db.set_record_from_json("server::AppConfig", json_val)?;
139    /// ```
140    #[cfg(feature = "std")]
141    pub fn set_record_from_json(
142        &self,
143        record_name: &str,
144        json_value: serde_json::Value,
145    ) -> DbResult<()> {
146        // Find the record by name
147        for (type_id, record) in &self.records {
148            let metadata = record.collect_metadata(*type_id);
149            if metadata.name == record_name {
150                // Delegate to the type-erased set_from_json method
151                // which will enforce the "no producer override" rule
152                return record.set_from_json(json_value);
153            }
154        }
155
156        // Record not found
157        Err(DbError::RecordNotFound {
158            record_name: record_name.to_string(),
159        })
160    }
161}
162
/// Database builder for producer-consumer pattern
///
/// Provides a fluent API for constructing databases with type-safe record registration.
/// Use `.runtime()` to set the runtime and transition to a typed builder.
///
/// Note that the API mixes two receiver styles: `with_connector` and
/// `with_remote_access` consume and return the builder, whereas `configure`
/// and `register_record` take `&mut self`. `build` and `run` consume the
/// builder, so they cannot be chained directly after `configure`.
pub struct AimDbBuilder<R = NoRuntime> {
    /// Registry of typed records
    records: BTreeMap<TypeId, Box<dyn AnyRecord>>,

    /// Runtime adapter (`None` until `.runtime()` has been called)
    runtime: Option<Arc<R>>,

    /// Connectors indexed by scheme (mqtt, shmem, kafka, etc.)
    pub(crate) connectors: BTreeMap<String, Arc<dyn crate::transport::Connector>>,

    /// Spawn functions indexed by TypeId
    ///
    /// Each value is a boxed spawn closure stored type-erased as `dyn Any`;
    /// `build()` downcasts it back to the concrete closure type and runs it.
    spawn_fns: BTreeMap<TypeId, Box<dyn core::any::Any + Send>>,

    /// Remote access configuration (std only)
    #[cfg(feature = "std")]
    remote_config: Option<crate::remote::AimxConfig>,

    /// PhantomData to track the runtime type parameter
    _phantom: PhantomData<R>,
}
187
188impl AimDbBuilder<NoRuntime> {
189    /// Creates a new database builder without a runtime
190    ///
191    /// Call `.runtime()` to set the runtime adapter.
192    pub fn new() -> Self {
193        Self {
194            records: BTreeMap::new(),
195            runtime: None,
196            connectors: BTreeMap::new(),
197            spawn_fns: BTreeMap::new(),
198            #[cfg(feature = "std")]
199            remote_config: None,
200            _phantom: PhantomData,
201        }
202    }
203
204    /// Sets the runtime adapter
205    ///
206    /// This transitions the builder from untyped to typed with concrete runtime `R`.
207    pub fn runtime<R>(self, rt: Arc<R>) -> AimDbBuilder<R>
208    where
209        R: aimdb_executor::Spawn + 'static,
210    {
211        AimDbBuilder {
212            records: self.records,
213            runtime: Some(rt),
214            connectors: self.connectors,
215            spawn_fns: BTreeMap::new(),
216            #[cfg(feature = "std")]
217            remote_config: None,
218            _phantom: PhantomData,
219        }
220    }
221}
222
223impl<R> AimDbBuilder<R>
224where
225    R: aimdb_executor::Spawn + 'static,
226{
227    /// Registers a connector for a specific URL scheme
228    ///
229    /// The scheme (e.g., "mqtt", "shmem", "kafka") determines how `.link()` URLs
230    /// are routed. Each scheme can have ONE connector, which manages connections to
231    /// a specific endpoint (broker, segment, cluster, etc.).
232    ///
233    /// # Arguments
234    /// * `scheme` - URL scheme without "://" (e.g., "mqtt", "shmem", "kafka")
235    /// * `connector` - Connector implementing the Connector trait
236    ///
237    /// # Example
238    ///
239    /// ```rust,ignore
240    /// use aimdb_mqtt_connector::MqttConnector;
241    /// use std::sync::Arc;
242    ///
243    /// let mqtt_connector = MqttConnector::new("mqtt://broker.local:1883").await?;
244    /// let shmem_connector = ShmemConnector::new("/dev/shm/aimdb");
245    ///
246    /// let builder = AimDbBuilder::new()
247    ///     .runtime(runtime)
248    ///     .with_connector("mqtt", Arc::new(mqtt_connector))
249    ///     .with_connector("shmem", Arc::new(shmem_connector));
250    ///
251    /// // Now .link() can route to either:
252    /// //   .link("mqtt://sensors/temp")  → mqtt_connector
253    /// //   .link("shmem://temp_data")    → shmem_connector
254    /// ```
255    pub fn with_connector(
256        mut self,
257        scheme: impl Into<String>,
258        connector: Arc<dyn crate::transport::Connector>,
259    ) -> Self {
260        self.connectors.insert(scheme.into(), connector);
261        self
262    }
263
264    /// Enables remote access via AimX protocol (std only)
265    ///
266    /// Configures the database to accept remote connections over a Unix domain socket,
267    /// allowing external clients to introspect records, subscribe to updates, and
268    /// (optionally) write data.
269    ///
270    /// The remote access supervisor will be spawned automatically during `build()`.
271    ///
272    /// # Arguments
273    /// * `config` - Remote access configuration (socket path, security policy, etc.)
274    ///
275    /// # Example
276    ///
277    /// ```rust,ignore
278    /// use aimdb_core::remote::{AimxConfig, SecurityPolicy};
279    ///
280    /// let config = AimxConfig::new("/tmp/aimdb.sock")
281    ///     .with_security(SecurityPolicy::read_only());
282    ///
283    /// let db = AimDbBuilder::new()
284    ///     .runtime(runtime)
285    ///     .with_remote_access(config)
286    ///     .build()?;
287    /// ```
288    #[cfg(feature = "std")]
289    pub fn with_remote_access(mut self, config: crate::remote::AimxConfig) -> Self {
290        self.remote_config = Some(config);
291        self
292    }
293
294    /// Configures a record type manually
295    ///
296    /// Low-level method for advanced use cases. Most users should use `register_record` instead.
297    pub fn configure<T>(
298        &mut self,
299        f: impl for<'a> FnOnce(&'a mut RecordRegistrar<'a, T, R>),
300    ) -> &mut Self
301    where
302        T: Send + Sync + 'static + Debug + Clone,
303    {
304        let entry = self
305            .records
306            .entry(TypeId::of::<T>())
307            .or_insert_with(|| Box::new(TypedRecord::<T, R>::new()));
308
309        let rec = entry
310            .as_typed_mut::<T, R>()
311            .expect("type mismatch in record registry");
312
313        let mut reg = RecordRegistrar {
314            rec,
315            connectors: &self.connectors,
316        };
317        f(&mut reg);
318
319        // Store a spawn function that captures the concrete type T
320        let type_id = TypeId::of::<T>();
321        #[allow(clippy::type_complexity)]
322        let spawn_fn: Box<dyn FnOnce(&Arc<R>, &Arc<AimDb<R>>) -> DbResult<()> + Send> =
323            Box::new(move |runtime: &Arc<R>, db: &Arc<AimDb<R>>| {
324                // Use RecordSpawner to spawn tasks for this record type
325                use crate::typed_record::RecordSpawner;
326
327                let typed_record = db.inner().get_typed_record::<T, R>()?;
328                RecordSpawner::<T>::spawn_all_tasks(typed_record, runtime, db)
329            });
330
331        // Store the spawn function (type-erased in Box<dyn Any>)
332        self.spawn_fns.insert(type_id, Box::new(spawn_fn));
333
334        self
335    }
336
337    /// Registers a self-registering record type
338    ///
339    /// The record type must implement `RecordT<R>`.
340    pub fn register_record<T>(&mut self, cfg: &T::Config) -> &mut Self
341    where
342        T: RecordT<R>,
343    {
344        self.configure::<T>(|reg| T::register(reg, cfg))
345    }
346
347    /// Runs the database indefinitely (never returns)
348    ///
349    /// This method builds the database, spawns all producer and consumer tasks, and then
350    /// parks the current task indefinitely. This is the primary way to run AimDB services.
351    ///
352    /// All logic runs in background tasks via producers, consumers, and connectors. The
353    /// application continues until interrupted (e.g., Ctrl+C).
354    ///
355    /// # Returns
356    /// `DbResult<()>` - Ok when database starts successfully, then parks forever
357    ///
358    /// # Example
359    ///
360    /// ```rust,ignore
361    /// #[tokio::main]
362    /// async fn main() -> DbResult<()> {
363    ///     AimDbBuilder::new()
364    ///         .runtime(Arc::new(TokioAdapter::new()?))
365    ///         .configure::<MyData>(|reg| {
366    ///             reg.with_buffer(BufferCfg::SpmcRing { capacity: 100 })
367    ///                .with_source(my_producer)
368    ///                .with_tap(my_consumer);
369    ///         })
370    ///         .run().await  // Runs forever
371    /// }
372    /// ```
373    pub async fn run(self) -> DbResult<()> {
374        // Build the database and spawn all tasks
375        let _db = self.build()?;
376
377        #[cfg(feature = "tracing")]
378        tracing::info!("Database running, background tasks active. Press Ctrl+C to stop.");
379
380        // Park indefinitely - the background tasks will continue running
381        // The database handle is kept alive here to prevent dropping it
382        core::future::pending::<()>().await;
383
384        Ok(())
385    }
386
387    /// Builds the database and returns the handle (advanced use)
388    ///
389    /// Use this when you need programmatic access to the database handle for
390    /// manual subscriptions or production. For typical services, use `.run().await` instead.
391    ///
392    /// **Automatic Task Spawning:** This method spawns all producer services and
393    /// `.tap()` observer tasks that were registered during configuration.
394    ///
395    /// # Returns
396    /// `DbResult<AimDb<R>>` - The database instance
397    ///
398    /// # Example
399    ///
400    /// ```rust,ignore
401    /// let db = AimDbBuilder::new()
402    ///     .runtime(Arc::new(TokioAdapter::new()?))
403    ///     .configure::<MyData>(|reg| { /* ... */ })
404    ///     .build()?;
405    ///
406    /// // Manually subscribe or produce
407    /// let mut reader = db.subscribe::<MyData>()?;
408    /// ```
409    pub fn build(self) -> DbResult<AimDb<R>> {
410        use crate::DbError;
411
412        // Validate all records
413        for record in self.records.values() {
414            record.validate().map_err(|_msg| {
415                #[cfg(feature = "std")]
416                {
417                    DbError::RuntimeError {
418                        message: format!("Record validation failed: {}", _msg),
419                    }
420                }
421                #[cfg(not(feature = "std"))]
422                {
423                    DbError::RuntimeError { _message: () }
424                }
425            })?;
426        }
427
428        // Ensure runtime is set
429        let runtime = self.runtime.ok_or({
430            #[cfg(feature = "std")]
431            {
432                DbError::RuntimeError {
433                    message: "runtime not set (use .runtime())".into(),
434                }
435            }
436            #[cfg(not(feature = "std"))]
437            {
438                DbError::RuntimeError { _message: () }
439            }
440        })?;
441
442        let inner = Arc::new(AimDbInner {
443            records: self.records,
444        });
445
446        let db = Arc::new(AimDb {
447            inner: inner.clone(),
448            runtime: runtime.clone(),
449        });
450
451        #[cfg(feature = "tracing")]
452        tracing::info!(
453            "Spawning producer services and tap observers for {} record types",
454            self.spawn_fns.len()
455        );
456
457        // Execute spawn functions for each record type
458        for (_type_id, spawn_fn_any) in self.spawn_fns {
459            // Downcast from Box<dyn Any> back to the concrete spawn function type
460            type SpawnFnType<R> = Box<dyn FnOnce(&Arc<R>, &Arc<AimDb<R>>) -> DbResult<()> + Send>;
461
462            let spawn_fn = spawn_fn_any
463                .downcast::<SpawnFnType<R>>()
464                .expect("spawn function type mismatch");
465
466            // Execute the spawn function
467            (*spawn_fn)(&runtime, &db)?;
468        }
469
470        #[cfg(feature = "tracing")]
471        tracing::info!("Automatic spawning complete");
472
473        // Spawn remote access supervisor if configured (std only)
474        #[cfg(feature = "std")]
475        if let Some(remote_cfg) = self.remote_config {
476            #[cfg(feature = "tracing")]
477            tracing::info!(
478                "Spawning remote access supervisor on socket: {}",
479                remote_cfg.socket_path.display()
480            );
481
482            // Apply security policy to mark writable records
483            let writable_type_ids = remote_cfg.security_policy.writable_records();
484            for (type_id, record) in inner.records.iter() {
485                if writable_type_ids.contains(type_id) {
486                    #[cfg(feature = "tracing")]
487                    tracing::debug!("Marking record {:?} as writable", type_id);
488
489                    // Mark the record as writable (type-erased call)
490                    record.set_writable_erased(true);
491                }
492            }
493
494            // Spawn the remote supervisor task
495            // This will be implemented in Task 6
496            crate::remote::supervisor::spawn_supervisor(db.clone(), runtime.clone(), remote_cfg)?;
497
498            #[cfg(feature = "tracing")]
499            tracing::info!("Remote access supervisor spawned successfully");
500        }
501
502        // Unwrap the Arc to return the owned AimDb
503        // This is safe because we just created it and hold the only reference
504        let db_owned = Arc::try_unwrap(db).unwrap_or_else(|arc| (*arc).clone());
505
506        Ok(db_owned)
507    }
508}
509
510impl Default for AimDbBuilder<NoRuntime> {
511    fn default() -> Self {
512        Self::new()
513    }
514}
515
/// Producer-consumer database
///
/// A database instance with type-safe record registration and cross-record
/// communication via the Emitter pattern. The type parameter `R` represents
/// the runtime adapter (e.g., TokioAdapter, EmbassyAdapter).
///
/// Both fields are `Arc`s, so cloning a handle is cheap and every clone
/// refers to the same records and runtime.
///
/// See `examples/` for usage.
///
/// # Examples
///
/// `register_record` borrows the builder mutably while `build` consumes it,
/// so the builder is bound to a variable first:
///
/// ```rust,ignore
/// use aimdb_tokio_adapter::TokioAdapter;
///
/// let runtime = Arc::new(TokioAdapter);
/// let mut builder = AimDbBuilder::new().runtime(runtime);
/// builder.register_record::<Temperature>(&TemperatureConfig);
/// let db: AimDb<TokioAdapter> = builder.build()?;
/// ```
pub struct AimDb<R: aimdb_executor::Spawn + 'static> {
    /// Internal state
    inner: Arc<AimDbInner>,

    /// Runtime adapter with concrete type
    runtime: Arc<R>,
}
542
543impl<R: aimdb_executor::Spawn + 'static> Clone for AimDb<R> {
544    fn clone(&self) -> Self {
545        Self {
546            inner: self.inner.clone(),
547            runtime: self.runtime.clone(),
548        }
549    }
550}
551
552impl<R: aimdb_executor::Spawn + 'static> AimDb<R> {
553    /// Internal accessor for the inner state
554    ///
555    /// Used by adapter crates and internal spawning logic.
556    #[doc(hidden)]
557    pub fn inner(&self) -> &Arc<AimDbInner> {
558        &self.inner
559    }
560
561    /// Builds a database with a closure-based builder pattern
562    pub async fn build_with(rt: Arc<R>, f: impl FnOnce(&mut AimDbBuilder<R>)) -> DbResult<()> {
563        let mut b = AimDbBuilder::new().runtime(rt);
564        f(&mut b);
565        b.run().await
566    }
567
568    /// Spawns a task using the database's runtime adapter
569    ///
570    /// This method provides direct access to the runtime's spawn capability.
571    ///
572    /// # Arguments
573    /// * `future` - The future to spawn
574    ///
575    /// # Returns
576    /// `DbResult<()>` - Ok if the task was spawned successfully
577    pub fn spawn_task<F>(&self, future: F) -> DbResult<()>
578    where
579        F: core::future::Future<Output = ()> + Send + 'static,
580    {
581        self.runtime.spawn(future).map_err(DbError::from)?;
582        Ok(())
583    }
584
585    /// Produces a value for a record type
586    ///
587    /// Writes the value to the record's buffer and triggers all consumers.
588    pub async fn produce<T>(&self, value: T) -> DbResult<()>
589    where
590        T: Send + 'static + Debug + Clone,
591    {
592        // Get the typed record using the helper
593        let typed_rec = self.inner.get_typed_record::<T, R>()?;
594
595        // Produce the value directly to the buffer
596        typed_rec.produce(value).await;
597        Ok(())
598    }
599
600    /// Subscribes to a record type's buffer
601    ///
602    /// Creates a subscription to the configured buffer for the given record type.
603    /// Returns a boxed reader for receiving values asynchronously.
604    ///
605    /// # Example
606    ///
607    /// ```rust,ignore
608    /// let mut reader = db.subscribe::<Temperature>()?;
609    ///
610    /// loop {
611    ///     match reader.recv().await {
612    ///         Ok(temp) => println!("Temperature: {:.1}°C", temp.celsius),
613    ///         Err(_) => break,
614    ///     }
615    /// }
616    /// ```
617    pub fn subscribe<T>(&self) -> DbResult<Box<dyn crate::buffer::BufferReader<T> + Send>>
618    where
619        T: Send + Sync + 'static + Debug + Clone,
620    {
621        // Get the typed record using the helper
622        let typed_rec = self.inner.get_typed_record::<T, R>()?;
623
624        // Subscribe to the buffer
625        typed_rec.subscribe()
626    }
627
628    /// Creates a type-safe producer for a specific record type
629    ///
630    /// Returns a `Producer<T, R>` that can only produce values of type `T`.
631    /// This is the recommended way to pass database access to producer services,
632    /// following the principle of least privilege.
633    ///
634    /// # Example
635    ///
636    /// ```rust,ignore
637    /// let db = builder.build()?;
638    /// let temp_producer = db.producer::<Temperature>();
639    ///
640    /// // Pass to service - it can only produce Temperature values
641    /// runtime.spawn(temperature_service(ctx, temp_producer)).unwrap();
642    /// ```
643    pub fn producer<T>(&self) -> crate::typed_api::Producer<T, R>
644    where
645        T: Send + 'static + Debug + Clone,
646    {
647        crate::typed_api::Producer::new(Arc::new(self.clone()))
648    }
649
650    /// Creates a type-safe consumer for a specific record type
651    ///
652    /// Returns a `Consumer<T, R>` that can only subscribe to values of type `T`.
653    /// This is the recommended way to pass database access to consumer services,
654    /// following the principle of least privilege.
655    ///
656    /// # Example
657    ///
658    /// ```rust,ignore
659    /// let db = builder.build()?;
660    /// let temp_consumer = db.consumer::<Temperature>();
661    ///
662    /// // Pass to service - it can only consume Temperature values
663    /// runtime.spawn(temperature_monitor(ctx, temp_consumer)).unwrap();
664    /// ```
665    pub fn consumer<T>(&self) -> crate::typed_api::Consumer<T, R>
666    where
667        T: Send + Sync + 'static + Debug + Clone,
668    {
669        crate::typed_api::Consumer::new(Arc::new(self.clone()))
670    }
671
672    /// Returns a reference to the runtime adapter
673    ///
674    /// Provides direct access to the concrete runtime type.
675    pub fn runtime(&self) -> &R {
676        &self.runtime
677    }
678
679    /// Lists all registered records (std only)
680    ///
681    /// Returns metadata for all registered records, useful for remote access introspection.
682    /// Available only when the `std` feature is enabled.
683    ///
684    /// # Example
685    /// ```rust,ignore
686    /// let records = db.list_records();
687    /// for record in records {
688    ///     println!("Record: {} ({})", record.name, record.type_id);
689    /// }
690    /// ```
691    #[cfg(feature = "std")]
692    pub fn list_records(&self) -> Vec<crate::remote::RecordMetadata> {
693        self.inner.list_records()
694    }
695
696    /// Try to get record's latest value as JSON by name (std only)
697    ///
698    /// Convenience wrapper around `AimDbInner::try_latest_as_json()`.
699    ///
700    /// # Arguments
701    /// * `record_name` - The full Rust type name (e.g., "server::Temperature")
702    ///
703    /// # Returns
704    /// `Some(JsonValue)` with current value, or `None` if unavailable
705    #[cfg(feature = "std")]
706    pub fn try_latest_as_json(&self, record_name: &str) -> Option<serde_json::Value> {
707        self.inner.try_latest_as_json(record_name)
708    }
709
710    /// Sets a record value from JSON (remote access API)
711    ///
712    /// Deserializes JSON and produces the value to the record's buffer.
713    ///
714    /// **SAFETY:** Enforces "No Producer Override" rule - only works for configuration
715    /// records without active producers.
716    ///
717    /// # Arguments
718    /// * `record_name` - Full Rust type name
719    /// * `json_value` - JSON value to set
720    ///
721    /// # Returns
722    /// `Ok(())` on success, error if record not found, has producers, or deserialization fails
723    ///
724    /// # Example (internal use)
725    /// ```rust,ignore
726    /// db.set_record_from_json("AppConfig", json!({"debug": true}))?;
727    /// ```
728    #[cfg(feature = "std")]
729    pub fn set_record_from_json(
730        &self,
731        record_name: &str,
732        json_value: serde_json::Value,
733    ) -> DbResult<()> {
734        self.inner.set_record_from_json(record_name, json_value)
735    }
736
737    /// Subscribe to record updates as JSON stream (std only)
738    ///
739    /// Creates a subscription to a record's buffer and forwards updates as JSON
740    /// to a bounded channel. This is used internally by the remote access protocol
741    /// for implementing `record.subscribe`.
742    ///
743    /// # Architecture
744    ///
745    /// Spawns a consumer task that:
746    /// 1. Subscribes to the record's buffer using the existing buffer API
747    /// 2. Reads values as they arrive
748    /// 3. Serializes each value to JSON
749    /// 4. Sends JSON values to a bounded channel (with backpressure handling)
750    /// 5. Terminates when either:
751    ///    - The cancel signal is received (unsubscribe)
752    ///    - The channel receiver is dropped (client disconnected)
753    ///
754    /// # Arguments
755    /// * `type_id` - TypeId of the record to subscribe to
756    /// * `queue_size` - Size of the bounded channel for this subscription
757    ///
758    /// # Returns
759    /// `Ok((receiver, cancel_tx))` where:
760    /// - `receiver`: Bounded channel receiver for JSON values
761    /// - `cancel_tx`: One-shot sender to cancel the subscription
762    ///
763    /// `Err` if:
764    /// - Record not found for the given TypeId
765    /// - Record not configured with `.with_serialization()`
766    /// - Failed to subscribe to buffer
767    ///
768    /// # Example (internal use)
769    ///
770    /// ```rust,ignore
771    /// let type_id = TypeId::of::<Temperature>();
772    /// let (mut rx, cancel_tx) = db.subscribe_record_updates(type_id, 100)?;
773    ///
774    /// // Read events
775    /// while let Some(json_value) = rx.recv().await {
776    ///     // Forward to client...
777    /// }
778    ///
779    /// // Cancel subscription
780    /// let _ = cancel_tx.send(());
781    /// ```
782    #[cfg(feature = "std")]
783    #[allow(unused_variables)] // Variables used only in tracing feature
784    pub fn subscribe_record_updates(
785        &self,
786        type_id: TypeId,
787        queue_size: usize,
788    ) -> DbResult<(
789        tokio::sync::mpsc::Receiver<serde_json::Value>,
790        tokio::sync::oneshot::Sender<()>,
791    )> {
792        use tokio::sync::{mpsc, oneshot};
793
794        // Find the record by TypeId
795        let record = self
796            .inner
797            .records
798            .get(&type_id)
799            .ok_or(DbError::RecordNotFound {
800                record_name: format!("TypeId({:?})", type_id),
801            })?;
802
803        // Subscribe to the record's buffer as JSON stream
804        // This will fail if record not configured with .with_serialization()
805        let mut json_reader = record.subscribe_json()?;
806
807        // Create channels for the subscription
808        let (value_tx, value_rx) = mpsc::channel(queue_size);
809        let (cancel_tx, mut cancel_rx) = oneshot::channel();
810
811        // Get metadata for logging
812        let record_metadata = record.collect_metadata(type_id);
813        let runtime = self.runtime.clone();
814
815        // Spawn consumer task that forwards JSON values from buffer to channel
816        let spawn_result = runtime.spawn(async move {
817            #[cfg(feature = "tracing")]
818            tracing::debug!(
819                "Subscription consumer task started for {}",
820                record_metadata.name
821            );
822
823            // Main event loop: read from buffer and forward to channel
824            loop {
825                tokio::select! {
826                    // Handle cancellation signal
827                    _ = &mut cancel_rx => {
828                        #[cfg(feature = "tracing")]
829                        tracing::debug!("Subscription cancelled");
830                        break;
831                    }
832                    // Read next JSON value from buffer
833                    result = json_reader.recv_json() => {
834                        match result {
835                            Ok(json_val) => {
836                                // Send JSON value to subscription channel
837                                if value_tx.send(json_val).await.is_err() {
838                                    #[cfg(feature = "tracing")]
839                                    tracing::debug!("Subscription receiver dropped");
840                                    break;
841                                }
842                            }
843                            Err(DbError::BufferLagged { lag_count, .. }) => {
844                                // Consumer fell behind - log warning but continue
845                                #[cfg(feature = "tracing")]
846                                tracing::warn!(
847                                    "Subscription for {} lagged by {} messages",
848                                    record_metadata.name,
849                                    lag_count
850                                );
851                                // Continue reading - next recv will get latest
852                            }
853                            Err(DbError::BufferClosed { .. }) => {
854                                // Buffer closed (shutdown) - exit gracefully
855                                #[cfg(feature = "tracing")]
856                                tracing::debug!("Buffer closed for {}", record_metadata.name);
857                                break;
858                            }
859                            Err(e) => {
860                                // Other error (shouldn't happen in practice)
861                                #[cfg(feature = "tracing")]
862                                tracing::error!(
863                                    "Subscription error for {}: {:?}",
864                                    record_metadata.name,
865                                    e
866                                );
867                                break;
868                            }
869                        }
870                    }
871                }
872            }
873
874            #[cfg(feature = "tracing")]
875            tracing::debug!("Subscription consumer task terminated");
876        });
877
878        spawn_result.map_err(DbError::from)?;
879
880        Ok((value_rx, cancel_tx))
881    }
882}
883
#[cfg(test)]
mod tests {
    // NOTE: The tests below are disabled (commented out) because RecordT<R> now requires
    // R: aimdb_executor::Spawn, and `()` cannot be used as a dummy runtime.
    // See examples/ for working tests with real runtimes.
    //
    // TODO: re-enable once a minimal test runtime implementing `aimdb_executor::Spawn`
    // is available. As written, the code also refers to an undeclared runtime type `R`
    // and calls `register_record`/`configure` on a builder that has no runtime set.

    /*
    use super::*;

    #[derive(Debug, Clone, PartialEq)]
    struct TestData {
        value: i32,
    }

    struct TestConfig;

    impl RecordT<R> for TestData {
        type Config = TestConfig;

        fn register<'a>(reg: &'a mut RecordRegistrar<'a, Self, R>, _cfg: &Self::Config) {
            reg.source(|_prod, _ctx| async {}).tap(|_consumer| async {});
        }
    }

    #[test]
    fn test_builder_basic() {
        let mut builder = AimDbBuilder::new();
        builder.register_record::<TestData>(&TestConfig);

        // Should have one record registered
        assert_eq!(builder.records.len(), 1);
    }

    #[test]
    fn test_builder_configure() {
        let mut builder = AimDbBuilder::new();
        builder.configure::<TestData>(|reg| {
            reg.source(|_prod, _ctx| async {}).tap(|_consumer| async {});
        });

        assert_eq!(builder.records.len(), 1);
    }
    */
}