pub struct AimDb<R: Spawn + 'static> { /* private fields */ }
Producer-consumer database
A database instance with type-safe record registration and cross-record
communication via the Emitter pattern. The type parameter R represents
the runtime adapter (e.g., TokioAdapter, EmbassyAdapter).
See examples/ for usage.
§Examples
use std::sync::Arc;
use aimdb_tokio_adapter::TokioAdapter;

let runtime = Arc::new(TokioAdapter);
let db: AimDb<TokioAdapter> = AimDbBuilder::new()
    .runtime(runtime)
    .register_record::<Temperature>(&TemperatureConfig)
    .build()?;
§Implementations
impl<R: Spawn + 'static> AimDb<R>
pub async fn build_with(
    rt: Arc<R>,
    f: impl FnOnce(&mut AimDbBuilder<R>),
) -> DbResult<()>
Builds a database with a closure-based builder pattern
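The closure-based builder pattern can be illustrated with a minimal, std-only sketch. `SketchBuilder`, `SketchDb`, and the free function below are hypothetical stand-ins, not AimDB types. Unlike the signature above, the sketch is synchronous and returns the built value so that the effect of the closure is observable.

```rust
/// Hypothetical stand-in for a builder; not AimDB's `AimDbBuilder`.
#[derive(Debug, Default)]
pub struct SketchBuilder {
    records: Vec<String>,
}

impl SketchBuilder {
    /// Registers a record key and returns the builder for chaining.
    pub fn register(&mut self, key: &str) -> &mut Self {
        self.records.push(key.to_string());
        self
    }
}

/// Hypothetical stand-in for the built database.
#[derive(Debug)]
pub struct SketchDb {
    pub records: Vec<String>,
}

/// Creates the builder, lends it to the caller's closure for configuration,
/// then finishes construction. Rejecting an empty builder is an assumption
/// made only so the sketch has an error path.
pub fn build_with(f: impl FnOnce(&mut SketchBuilder)) -> Result<SketchDb, String> {
    let mut builder = SketchBuilder::default();
    f(&mut builder);
    if builder.records.is_empty() {
        return Err("no records registered".to_string());
    }
    Ok(SketchDb { records: builder.records })
}
```

The caller never owns the builder: it only receives `&mut` access inside the closure, so construction always starts and finishes inside `build_with`.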
pub fn spawn_task<F>(&self, future: F) -> DbResult<()>
pub async fn produce<T>(&self, key: impl AsRef<str>, value: T) -> DbResult<()>
Produces a value to a specific record by key
Uses O(1) key-based lookup to find the correct record.
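One common way to get expected constant-time lookup by key is a hash map from key to a slot index. The sketch below assumes that layout purely for illustration; `KeyIndex` and `SketchRecordId` are hypothetical names, and the actual internal storage is not described on this page.

```rust
use std::collections::HashMap;

/// Hypothetical index type standing in for `RecordId`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SketchRecordId(pub usize);

/// Records live in a Vec; a HashMap maps each key to its slot.
#[derive(Default)]
pub struct KeyIndex {
    by_key: HashMap<String, SketchRecordId>,
    latest: Vec<Option<f32>>,
}

impl KeyIndex {
    /// Allocates a new slot and remembers which key points at it.
    pub fn register(&mut self, key: &str) -> SketchRecordId {
        let id = SketchRecordId(self.latest.len());
        self.latest.push(None);
        self.by_key.insert(key.to_string(), id);
        id
    }

    /// Hash lookup: expected O(1), independent of the number of records.
    pub fn resolve_key(&self, key: &str) -> Option<SketchRecordId> {
        self.by_key.get(key).copied()
    }

    /// Resolves the key once, then writes straight into the slot.
    pub fn produce(&mut self, key: impl AsRef<str>, value: f32) -> Result<(), String> {
        let id = self
            .resolve_key(key.as_ref())
            .ok_or_else(|| format!("record not found: {}", key.as_ref()))?;
        self.latest[id.0] = Some(value);
        Ok(())
    }

    /// Returns the most recent value written to the record, if any.
    pub fn latest(&self, key: &str) -> Option<f32> {
        self.resolve_key(key).and_then(|id| self.latest[id.0])
    }
}
```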
§Arguments
- key - The record key (e.g., “sensor.temperature”)
- value - The value to produce
§Example
db.produce::<Temperature>("sensors.indoor", indoor_temp).await?;
db.produce::<Temperature>("sensors.outdoor", outdoor_temp).await?;
pub fn subscribe<T>(
    &self,
    key: impl AsRef<str>,
) -> DbResult<Box<dyn BufferReader<T> + Send>>
Subscribes to a specific record by key
Uses O(1) key-based lookup to find the correct record.
§Arguments
- key - The record key (e.g., “sensor.temperature”)
§Example
let mut reader = db.subscribe::<Temperature>("sensors.indoor")?;
while let Ok(temp) = reader.recv().await {
    println!("Indoor: {:.1}°C", temp.celsius);
}
pub fn producer<T>(&self, key: impl Into<String>) -> Producer<T, R>
Creates a type-safe producer for a specific record by key
Returns a Producer<T, R> bound to a specific record key.
§Arguments
- key - The record key (e.g., “sensor.temperature”)
§Example
let indoor_producer = db.producer::<Temperature>("sensors.indoor");
let outdoor_producer = db.producer::<Temperature>("sensors.outdoor");
// Each producer writes to its own record
indoor_producer.produce(indoor_temp).await?;
outdoor_producer.produce(outdoor_temp).await?;
pub fn consumer<T>(&self, key: impl Into<String>) -> Consumer<T, R>
Creates a type-safe consumer for a specific record by key
Returns a Consumer<T, R> bound to a specific record key.
§Arguments
- key - The record key (e.g., “sensor.temperature”)
§Example
let indoor_consumer = db.consumer::<Temperature>("sensors.indoor");
let outdoor_consumer = db.consumer::<Temperature>("sensors.outdoor");
// Each consumer reads from its own record
let mut rx = indoor_consumer.subscribe()?;
pub fn resolve_key(&self, key: &str) -> Option<RecordId>
pub fn records_of_type<T: 'static>(&self) -> &[RecordId]
pub fn runtime(&self) -> &R
Returns a reference to the runtime adapter
Provides direct access to the concrete runtime type.
pub fn list_records(&self) -> Vec<RecordMetadata>
pub fn try_latest_as_json(&self, record_name: &str) -> Option<Value>
pub fn set_record_from_json(
    &self,
    record_name: &str,
    json_value: Value,
) -> DbResult<()>
Sets a record value from JSON (remote access API)
Deserializes JSON and produces the value to the record’s buffer.
SAFETY: Enforces “No Producer Override” rule - only works for configuration records without active producers.
§Arguments
- record_name - Full Rust type name
- json_value - JSON value to set
§Returns
Ok(()) on success; an error if the record is not found, has active producers, or deserialization fails.
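The three failure cases can be modeled with a small, std-only sketch. Everything here is hypothetical: the `Registry`, `SketchRecord`, and `SetError` names, the use of a plain string parsed as `u32` in place of a JSON value, and the order in which the checks run are assumptions made for illustration.

```rust
use std::collections::HashMap;

/// Hypothetical error type mirroring the three documented failure cases.
#[derive(Debug, PartialEq)]
pub enum SetError {
    NotFound,
    HasProducers,
    Deserialize(String),
}

/// Hypothetical record: tracks how many producers write to it.
#[derive(Debug, Default)]
pub struct SketchRecord {
    producer_count: usize,
    value: Option<u32>,
}

#[derive(Debug, Default)]
pub struct Registry {
    records: HashMap<String, SketchRecord>,
}

impl Registry {
    pub fn add(&mut self, name: &str, producer_count: usize) {
        let record = SketchRecord { producer_count, value: None };
        self.records.insert(name.to_string(), record);
    }

    pub fn value(&self, name: &str) -> Option<u32> {
        self.records.get(name).and_then(|r| r.value)
    }

    /// Externally supplied values are accepted only for records that no
    /// producer writes to, so a remote caller cannot override a producer.
    pub fn set_from_text(&mut self, name: &str, raw: &str) -> Result<(), SetError> {
        let record = self.records.get_mut(name).ok_or(SetError::NotFound)?;
        if record.producer_count > 0 {
            return Err(SetError::HasProducers);
        }
        let parsed = raw
            .trim()
            .parse::<u32>()
            .map_err(|e| SetError::Deserialize(e.to_string()))?;
        record.value = Some(parsed);
        Ok(())
    }
}
```

A failed call leaves the stored value untouched, because the write happens only after every check has passed.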
§Example (internal use)
db.set_record_from_json("AppConfig", json!({"debug": true}))?;
pub fn subscribe_record_updates(
    &self,
    record_key: &str,
    queue_size: usize,
) -> DbResult<(Receiver<Value>, Sender<()>)>
Subscribe to record updates as JSON stream (std only)
Creates a subscription to a record’s buffer and forwards updates as JSON
to a bounded channel. This is used internally by the remote access protocol
for implementing record.subscribe.
§Architecture
Spawns a consumer task that:
- Subscribes to the record’s buffer using the existing buffer API
- Reads values as they arrive
- Serializes each value to JSON
- Sends JSON values to a bounded channel (with backpressure handling)
- Terminates when either:
  - The cancel signal is received (unsubscribe)
  - The channel receiver is dropped (client disconnected)
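The steps above can be sketched with standard-library threads and channels. This is a simplified model under stated assumptions, not the crate's implementation: the source buffer is a plain `mpsc::Receiver<f32>`, serialization is a hand-written `format!` rather than a JSON library, the cancel signal is an ordinary channel, and a full queue drops the update (the backpressure policy is not specified on this page). `forward_as_text` is a hypothetical name.

```rust
use std::sync::mpsc::{self, Receiver, RecvTimeoutError, Sender, TrySendError};
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Spawns a forwarder that reads values from `source`, serializes each one,
/// and pushes it into a bounded channel of capacity `queue_size`.
pub fn forward_as_text(
    source: Receiver<f32>,
    queue_size: usize,
) -> (Receiver<String>, Sender<()>, JoinHandle<()>) {
    let (out_tx, out_rx) = mpsc::sync_channel::<String>(queue_size);
    let (cancel_tx, cancel_rx) = mpsc::channel::<()>();

    let handle = thread::spawn(move || loop {
        // Terminate when the cancel signal has been received.
        if cancel_rx.try_recv().is_ok() {
            break;
        }
        // Wait briefly for the next value so cancellation is noticed promptly.
        match source.recv_timeout(Duration::from_millis(10)) {
            Ok(value) => {
                let text = format!("{{\"celsius\":{}}}", value);
                match out_tx.try_send(text) {
                    Ok(()) => {}
                    // Assumed policy: drop the update when the queue is full.
                    Err(TrySendError::Full(_)) => {}
                    // Terminate when the receiving side has been dropped.
                    Err(TrySendError::Disconnected(_)) => break,
                }
            }
            Err(RecvTimeoutError::Timeout) => continue,
            // The source itself is gone; nothing left to forward.
            Err(RecvTimeoutError::Disconnected) => break,
        }
    });

    (out_rx, cancel_tx, handle)
}
```

Returning the `JoinHandle` is a convenience of the sketch so a caller can wait for the forwarder to stop after sending the cancel signal.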
§Arguments
- record_key - Key of the record to subscribe to
- queue_size - Size of the bounded channel for this subscription
§Returns
Ok((receiver, cancel_tx)) where:
- receiver: Bounded channel receiver for JSON values
- cancel_tx: One-shot sender to cancel the subscription
Err if:
- Record not found for the given key
- Record not configured with .with_serialization()
- Failed to subscribe to buffer
§Example (internal use)
let (mut rx, cancel_tx) = db.subscribe_record_updates("sensor.temp", 100)?;
// Read events
while let Some(json_value) = rx.recv().await {
    // Forward to client...
}
// Cancel subscription
let _ = cancel_tx.send(());
pub fn collect_inbound_routes(
    &self,
    scheme: &str,
) -> Vec<(String, Box<dyn ProducerTrait>, DeserializerFn)>
Collects inbound connector routes for automatic router construction (std only)
Iterates all records, filters their inbound_connectors by scheme, and returns routes with producer creation callbacks.
§Arguments
- scheme - URL scheme to filter by (e.g., “mqtt”, “kafka”)
§Returns
Vector of tuples: (topic, producer_trait, deserializer)
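The filtering step can be sketched in isolation. The example assumes connector URLs of the form `scheme://topic` and pairs each topic with the owning record's key instead of a producer and deserializer; `SketchRecord` and `collect_inbound_topics` are hypothetical names, and the real URL layout may differ.

```rust
/// Hypothetical record description: a key plus its inbound connector URLs.
pub struct SketchRecord {
    pub key: String,
    pub inbound_connectors: Vec<String>,
}

/// Walks every record, keeps the connectors whose scheme matches, and
/// returns (topic, record_key) pairs in registration order.
pub fn collect_inbound_topics(records: &[SketchRecord], scheme: &str) -> Vec<(String, String)> {
    let mut routes = Vec::new();
    for record in records {
        for url in &record.inbound_connectors {
            // Split "mqtt://sensors/indoor" into ("mqtt", "sensors/indoor").
            if let Some((url_scheme, topic)) = url.split_once("://") {
                if url_scheme == scheme {
                    routes.push((topic.to_string(), record.key.clone()));
                }
            }
        }
    }
    routes
}
```

URLs without a `://` separator are skipped rather than reported, which is a choice of the sketch.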
§Example
// In MqttConnector after db.build()
let routes = db.collect_inbound_routes("mqtt");
let router = RouterBuilder::from_routes(routes).build();
connector.set_router(router).await?;
pub fn collect_outbound_routes(
    &self,
    scheme: &str,
) -> Vec<(String, Box<dyn ConsumerTrait>, SerializerFn, Vec<(String, String)>)>
Collects outbound routes for a specific protocol scheme
Mirrors collect_inbound_routes() for symmetry. Iterates all records,
filters their outbound_connectors by scheme, and returns routes with
consumer creation callbacks.
This method is called by connectors during their build() phase to
collect all configured outbound routes and spawn publisher tasks.
§Arguments
- scheme - URL scheme to filter by (e.g., “mqtt”, “kafka”)
§Returns
Vector of tuples: (destination, consumer_trait, serializer, config)
The config Vec contains protocol-specific options (e.g., qos, retain).
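A connector might turn the string pairs into typed options along these lines. The sketch is hedged: `PublishOptions` and `parse_publish_options` are hypothetical, the defaults (qos 0, retain false) and the decision to ignore unknown option names are assumptions, and only the 0 to 2 range for MQTT QoS comes from the MQTT protocol itself.

```rust
/// Hypothetical typed view of the `qos` and `retain` options.
#[derive(Debug, PartialEq)]
pub struct PublishOptions {
    pub qos: u8,
    pub retain: bool,
}

/// Parses protocol options from (name, value) string pairs.
pub fn parse_publish_options(config: &[(String, String)]) -> Result<PublishOptions, String> {
    // Assumed defaults when an option is absent.
    let mut opts = PublishOptions { qos: 0, retain: false };
    for (name, value) in config {
        match name.as_str() {
            "qos" => {
                let qos: u8 = value
                    .parse()
                    .map_err(|_| format!("invalid qos: {}", value))?;
                // MQTT defines QoS levels 0, 1, and 2 only.
                if qos > 2 {
                    return Err(format!("qos out of range: {}", qos));
                }
                opts.qos = qos;
            }
            "retain" => {
                opts.retain = value
                    .parse::<bool>()
                    .map_err(|_| format!("invalid retain: {}", value))?;
            }
            // Unknown options are ignored in this sketch.
            _ => {}
        }
    }
    Ok(opts)
}
```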
§Example
// In MqttConnector::build()
let routes = db.collect_outbound_routes("mqtt");
for (topic, consumer, serializer, config) in routes {
    connector.spawn_publisher(topic, consumer, serializer, config)?;
}