pub struct AimDb<R: Spawn + 'static> { /* private fields */ }
Expand description
Producer-consumer database
A database instance with type-safe record registration and cross-record
communication via the Emitter pattern. The type parameter R represents
the runtime adapter (e.g., TokioAdapter, EmbassyAdapter).
See examples/ for usage.
§Examples
use aimdb_tokio_adapter::TokioAdapter;
let runtime = Arc::new(TokioAdapter);
let db: AimDb<TokioAdapter> = AimDbBuilder::new()
.runtime(runtime)
.register_record::<Temperature>(&TemperatureConfig)
.build()?;
Implementations§
Source§impl<R: Spawn + 'static> AimDb<R>
impl<R: Spawn + 'static> AimDb<R>
Sourcepub async fn build_with(
rt: Arc<R>,
f: impl FnOnce(&mut AimDbBuilder<R>),
) -> DbResult<()>
pub async fn build_with( rt: Arc<R>, f: impl FnOnce(&mut AimDbBuilder<R>), ) -> DbResult<()>
Builds a database with a closure-based builder pattern
Sourcepub fn spawn_task<F>(&self, future: F) -> DbResult<()>
pub fn spawn_task<F>(&self, future: F) -> DbResult<()>
Sourcepub async fn produce<T>(&self, value: T) -> DbResult<()>
pub async fn produce<T>(&self, value: T) -> DbResult<()>
Produces a value for a record type
Writes the value to the record’s buffer and triggers all consumers.
Sourcepub fn subscribe<T>(&self) -> DbResult<Box<dyn BufferReader<T> + Send>>
pub fn subscribe<T>(&self) -> DbResult<Box<dyn BufferReader<T> + Send>>
Subscribes to a record type’s buffer
Creates a subscription to the configured buffer for the given record type. Returns a boxed reader for receiving values asynchronously.
§Example
let mut reader = db.subscribe::<Temperature>()?;
loop {
match reader.recv().await {
Ok(temp) => println!("Temperature: {:.1}°C", temp.celsius),
Err(_) => break,
}
}
Sourcepub fn producer<T>(&self) -> Producer<T, R>
pub fn producer<T>(&self) -> Producer<T, R>
Creates a type-safe producer for a specific record type
Returns a Producer<T, R> that can only produce values of type T.
This is the recommended way to pass database access to producer services,
following the principle of least privilege.
§Example
let db = builder.build()?;
let temp_producer = db.producer::<Temperature>();
// Pass to service - it can only produce Temperature values
runtime.spawn(temperature_service(ctx, temp_producer)).unwrap();
Sourcepub fn consumer<T>(&self) -> Consumer<T, R>
pub fn consumer<T>(&self) -> Consumer<T, R>
Creates a type-safe consumer for a specific record type
Returns a Consumer<T, R> that can only subscribe to values of type T.
This is the recommended way to pass database access to consumer services,
following the principle of least privilege.
§Example
let db = builder.build()?;
let temp_consumer = db.consumer::<Temperature>();
// Pass to service - it can only consume Temperature values
runtime.spawn(temperature_monitor(ctx, temp_consumer)).unwrap();
Sourcepub async fn produce_by_key<T>(
&self,
key: impl AsRef<str>,
value: T,
) -> DbResult<()>
pub async fn produce_by_key<T>( &self, key: impl AsRef<str>, value: T, ) -> DbResult<()>
Produces a value to a specific record by key
This is the recommended method when multiple records of the same type exist. Uses O(1) key-based lookup to find the correct record.
§Arguments
- `key` - The record key (e.g., “sensor.temperature”)
- `value` - The value to produce
§Example
// With multiple Temperature records
db.produce_by_key::<Temperature>("sensors.indoor", indoor_temp).await?;
db.produce_by_key::<Temperature>("sensors.outdoor", outdoor_temp).await?;
Sourcepub fn subscribe_by_key<T>(
&self,
key: impl AsRef<str>,
) -> DbResult<Box<dyn BufferReader<T> + Send>>
pub fn subscribe_by_key<T>( &self, key: impl AsRef<str>, ) -> DbResult<Box<dyn BufferReader<T> + Send>>
Subscribes to a specific record by key
This is the recommended method when multiple records of the same type exist. Uses O(1) key-based lookup to find the correct record.
§Arguments
- `key` - The record key (e.g., “sensor.temperature”)
§Example
let mut reader = db.subscribe_by_key::<Temperature>("sensors.indoor")?;
while let Ok(temp) = reader.recv().await {
println!("Indoor: {:.1}°C", temp.celsius);
}
Sourcepub fn producer_by_key<T>(&self, key: impl Into<String>) -> ProducerByKey<T, R>
pub fn producer_by_key<T>(&self, key: impl Into<String>) -> ProducerByKey<T, R>
Creates a type-safe producer for a specific record by key
Returns a ProducerByKey<T, R> bound to a specific record key.
Use this when multiple records of the same type exist.
§Arguments
- `key` - The record key (e.g., “sensor.temperature”)
§Example
let indoor_producer = db.producer_by_key::<Temperature>("sensors.indoor");
let outdoor_producer = db.producer_by_key::<Temperature>("sensors.outdoor");
// Each producer writes to its own record
indoor_producer.produce(indoor_temp).await?;
outdoor_producer.produce(outdoor_temp).await?;
Sourcepub fn consumer_by_key<T>(&self, key: impl Into<String>) -> ConsumerByKey<T, R>
pub fn consumer_by_key<T>(&self, key: impl Into<String>) -> ConsumerByKey<T, R>
Creates a type-safe consumer for a specific record by key
Returns a ConsumerByKey<T, R> bound to a specific record key.
Use this when multiple records of the same type exist.
§Arguments
- `key` - The record key (e.g., “sensor.temperature”)
§Example
let indoor_consumer = db.consumer_by_key::<Temperature>("sensors.indoor");
let outdoor_consumer = db.consumer_by_key::<Temperature>("sensors.outdoor");
// Each consumer reads from its own record
let mut rx = indoor_consumer.subscribe()?;
Sourcepub fn resolve_key(&self, key: &str) -> Option<RecordId>
pub fn resolve_key(&self, key: &str) -> Option<RecordId>
Sourcepub fn records_of_type<T: 'static>(&self) -> &[RecordId]
pub fn records_of_type<T: 'static>(&self) -> &[RecordId]
Sourcepub fn runtime(&self) -> &R
pub fn runtime(&self) -> &R
Returns a reference to the runtime adapter
Provides direct access to the concrete runtime type.
Sourcepub fn list_records(&self) -> Vec<RecordMetadata>
pub fn list_records(&self) -> Vec<RecordMetadata>
Sourcepub fn try_latest_as_json(&self, record_name: &str) -> Option<Value>
pub fn try_latest_as_json(&self, record_name: &str) -> Option<Value>
Sourcepub fn set_record_from_json(
&self,
record_name: &str,
json_value: Value,
) -> DbResult<()>
pub fn set_record_from_json( &self, record_name: &str, json_value: Value, ) -> DbResult<()>
Sets a record value from JSON (remote access API)
Deserializes JSON and produces the value to the record’s buffer.
SAFETY: Enforces “No Producer Override” rule - only works for configuration records without active producers.
§Arguments
- `record_name` - Full Rust type name
- `json_value` - JSON value to set
§Returns
Ok(()) on success; an error if the record is not found, has active producers, or deserialization fails
§Example (internal use)
db.set_record_from_json("AppConfig", json!({"debug": true}))?;
Sourcepub fn subscribe_record_updates(
&self,
record_key: &str,
queue_size: usize,
) -> DbResult<(Receiver<Value>, Sender<()>)>
pub fn subscribe_record_updates( &self, record_key: &str, queue_size: usize, ) -> DbResult<(Receiver<Value>, Sender<()>)>
Subscribes to record updates as a JSON stream (std only)
Creates a subscription to a record’s buffer and forwards updates as JSON
to a bounded channel. This is used internally by the remote access protocol
for implementing record.subscribe.
§Architecture
Spawns a consumer task that:
- Subscribes to the record’s buffer using the existing buffer API
- Reads values as they arrive
- Serializes each value to JSON
- Sends JSON values to a bounded channel (with backpressure handling)
- Terminates when either:
- The cancel signal is received (unsubscribe)
- The channel receiver is dropped (client disconnected)
§Arguments
- `record_key` - Key of the record to subscribe to
- `queue_size` - Size of the bounded channel for this subscription
§Returns
Ok((receiver, cancel_tx)) where:
- `receiver`: Bounded channel receiver for JSON values
- `cancel_tx`: One-shot sender to cancel the subscription
Err if:
- Record not found for the given key
- Record not configured with `.with_serialization()`
- Failed to subscribe to buffer
§Example (internal use)
let (mut rx, cancel_tx) = db.subscribe_record_updates("sensor.temp", 100)?;
// Read events
while let Some(json_value) = rx.recv().await {
// Forward to client...
}
// Cancel subscription
let _ = cancel_tx.send(());
Sourcepub fn collect_inbound_routes(
&self,
scheme: &str,
) -> Vec<(String, Box<dyn ProducerTrait>, DeserializerFn)>
pub fn collect_inbound_routes( &self, scheme: &str, ) -> Vec<(String, Box<dyn ProducerTrait>, DeserializerFn)>
Collects inbound connector routes for automatic router construction (std only)
Iterates all records, filters their inbound_connectors by scheme, and returns routes with producer creation callbacks.
§Arguments
- `scheme` - URL scheme to filter by (e.g., “mqtt”, “kafka”)
§Returns
Vector of tuples: (topic, producer_trait, deserializer)
§Example
// In MqttConnector after db.build()
let routes = db.collect_inbound_routes("mqtt");
let router = RouterBuilder::from_routes(routes).build();
connector.set_router(router).await?;
Sourcepub fn collect_outbound_routes(
&self,
scheme: &str,
) -> Vec<(String, Box<dyn ConsumerTrait>, SerializerFn, Vec<(String, String)>)>
pub fn collect_outbound_routes( &self, scheme: &str, ) -> Vec<(String, Box<dyn ConsumerTrait>, SerializerFn, Vec<(String, String)>)>
Collects outbound routes for a specific protocol scheme
Mirrors collect_inbound_routes() for symmetry. Iterates all records,
filters their outbound_connectors by scheme, and returns routes with
consumer creation callbacks.
This method is called by connectors during their build() phase to
collect all configured outbound routes and spawn publisher tasks.
§Arguments
- `scheme` - URL scheme to filter by (e.g., “mqtt”, “kafka”)
§Returns
Vector of tuples: (destination, consumer_trait, serializer, config)
The config Vec contains protocol-specific options (e.g., qos, retain).
§Example
// In MqttConnector::build()
let routes = db.collect_outbound_routes("mqtt");
for (topic, consumer, serializer, config) in routes {
connector.spawn_publisher(topic, consumer, serializer, config)?;
}