pub struct AimDb<R: Spawn + 'static> { /* private fields */ }
Producer-consumer database
A database instance with type-safe record registration and cross-record
communication via the Emitter pattern. The type parameter R represents
the runtime adapter (e.g., TokioAdapter, EmbassyAdapter).
See examples/ for usage.
§Examples
use std::sync::Arc;
use aimdb_tokio_adapter::TokioAdapter;

let runtime = Arc::new(TokioAdapter);
let db: AimDb<TokioAdapter> = AimDbBuilder::new()
    .runtime(runtime)
    .register_record::<Temperature>(&TemperatureConfig)
    .build()?;
§Implementations
impl<R: Spawn + 'static> AimDb<R>
pub async fn build_with(
    rt: Arc<R>,
    f: impl FnOnce(&mut AimDbBuilder<R>),
) -> DbResult<()>
Builds a database with a closure-based builder pattern
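The closure-based builder pattern can be illustrated with a small standalone sketch. It does not use AimDB itself: `SketchBuilder`, `register`, and this `build_with` are hypothetical stand-ins, and the "at least one record" check is an assumption made only to give the build step something to validate.

```rust
/// Hypothetical stand-in for a builder; not the AimDB type.
#[derive(Default)]
struct SketchBuilder {
    records: Vec<String>,
}

impl SketchBuilder {
    /// Registers a record by name and returns the builder for chaining.
    fn register(&mut self, name: &str) -> &mut Self {
        self.records.push(name.to_string());
        self
    }
}

/// Creates the builder, hands it to the caller's closure for configuration,
/// then validates and finalizes it in one place.
fn build_with(configure: impl FnOnce(&mut SketchBuilder)) -> Result<Vec<String>, String> {
    let mut builder = SketchBuilder::default();
    configure(&mut builder);
    if builder.records.is_empty() {
        return Err("no records registered".to_string());
    }
    Ok(builder.records)
}

fn main() {
    let records = build_with(|b| {
        b.register("Temperature").register("Humidity");
    });
    println!("{:?}", records);
}
```

The caller never owns the builder, so construction and finalization stay under the control of `build_with`.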
pub fn spawn_task<F>(&self, future: F) -> DbResult<()>
pub async fn produce<T>(&self, value: T) -> DbResult<()>
Produces a value for a record type
Writes the value to the record’s buffer and triggers all consumers.
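As a rough illustration of "write once, notify every consumer", the sketch below fans a value out to all subscribers over standard-library channels. `FanOut` and its methods are hypothetical; AimDB's actual buffer types and delivery semantics are not shown here and may differ.

```rust
use std::sync::mpsc::{self, Receiver, Sender};

/// Hypothetical fan-out buffer: every subscriber receives a clone of each value.
struct FanOut<T: Clone> {
    subscribers: Vec<Sender<T>>,
}

impl<T: Clone> FanOut<T> {
    fn new() -> Self {
        Self { subscribers: Vec::new() }
    }

    /// Adds a subscriber and returns its receiving end.
    fn subscribe(&mut self) -> Receiver<T> {
        let (tx, rx) = mpsc::channel();
        self.subscribers.push(tx);
        rx
    }

    /// Sends `value` to every live subscriber and returns how many received it.
    /// Subscribers whose receiver was dropped are removed.
    fn produce(&mut self, value: T) -> usize {
        self.subscribers.retain(|tx| tx.send(value.clone()).is_ok());
        self.subscribers.len()
    }
}

fn main() {
    let mut buffer = FanOut::new();
    let first = buffer.subscribe();
    let second = buffer.subscribe();
    let delivered = buffer.produce(21.5_f32);
    println!("delivered to {} consumers", delivered);
    println!("{:?} {:?}", first.recv(), second.recv());
}
```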
pub fn subscribe<T>(&self) -> DbResult<Box<dyn BufferReader<T> + Send>>
Subscribes to a record type’s buffer
Creates a subscription to the configured buffer for the given record type. Returns a boxed reader for receiving values asynchronously.
§Example
let mut reader = db.subscribe::<Temperature>()?;
loop {
    match reader.recv().await {
        Ok(temp) => println!("Temperature: {:.1}°C", temp.celsius),
        Err(_) => break,
    }
}
pub fn producer<T>(&self) -> Producer<T, R>
Creates a type-safe producer for a specific record type
Returns a Producer<T, R> that can only produce values of type T.
This is the recommended way to pass database access to producer services,
following the principle of least privilege.
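The least-privilege idea can be sketched without AimDB: a handle that carries the record type as a type parameter can only write that one type, and misuse is rejected at compile time. `SketchDb`, `TypedProducer`, and the `latest` accessor are hypothetical names for this illustration, not the library's implementation.

```rust
use std::any::{Any, TypeId};
use std::collections::HashMap;
use std::marker::PhantomData;
use std::sync::{Arc, Mutex};

/// Hypothetical shared store keyed by record type.
#[derive(Default)]
struct SketchDb {
    slots: Mutex<HashMap<TypeId, Box<dyn Any + Send>>>,
}

impl SketchDb {
    /// Returns a clone of the most recent value stored for `T`, if any.
    fn latest<T: Clone + 'static>(&self) -> Option<T> {
        let slots = self.slots.lock().unwrap();
        slots
            .get(&TypeId::of::<T>())
            .and_then(|boxed| boxed.downcast_ref::<T>())
            .cloned()
    }
}

/// Handle that can only write values of type `T`.
struct TypedProducer<T> {
    db: Arc<SketchDb>,
    _record: PhantomData<fn(T)>,
}

impl<T: Send + 'static> TypedProducer<T> {
    fn new(db: Arc<SketchDb>) -> Self {
        Self { db, _record: PhantomData }
    }

    fn produce(&self, value: T) {
        let mut slots = self.db.slots.lock().unwrap();
        slots.insert(TypeId::of::<T>(), Box::new(value));
    }
}

#[derive(Clone, Debug, PartialEq)]
struct Temperature {
    celsius: f32,
}

fn main() {
    let db = Arc::new(SketchDb::default());
    let producer = TypedProducer::<Temperature>::new(db.clone());
    producer.produce(Temperature { celsius: 21.5 });
    // producer.produce(42u32); // would not compile: the handle is bound to Temperature
    println!("{:?}", db.latest::<Temperature>());
}
```

A service that receives only the `TypedProducer<Temperature>` has no way to read or write other records.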
§Example
let db = builder.build()?;
let temp_producer = db.producer::<Temperature>();
// Pass to service - it can only produce Temperature values
runtime.spawn(temperature_service(ctx, temp_producer)).unwrap();
pub fn consumer<T>(&self) -> Consumer<T, R>
Creates a type-safe consumer for a specific record type
Returns a Consumer<T, R> that can only subscribe to values of type T.
This is the recommended way to pass database access to consumer services,
following the principle of least privilege.
§Example
let db = builder.build()?;
let temp_consumer = db.consumer::<Temperature>();
// Pass to service - it can only consume Temperature values
runtime.spawn(temperature_monitor(ctx, temp_consumer)).unwrap();
pub fn runtime(&self) -> &R
Returns a reference to the runtime adapter
Provides direct access to the concrete runtime type.
pub fn list_records(&self) -> Vec<RecordMetadata>
pub fn try_latest_as_json(&self, record_name: &str) -> Option<Value>
pub fn set_record_from_json(
    &self,
    record_name: &str,
    json_value: Value,
) -> DbResult<()>
Sets a record value from JSON (remote access API)
Deserializes JSON and produces the value to the record’s buffer.
SAFETY: Enforces the “No Producer Override” rule: this only works for configuration records that have no active producers.
§Arguments
- record_name - Full Rust type name
- json_value - JSON value to set
§Returns
Ok(()) on success; an error if the record is not found, the record has active producers, or deserialization fails
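The guard described above might look roughly like the following standalone sketch. `Registry`, `RecordSlot`, `SetError`, and `set_from_remote` are hypothetical names, and the value is kept as a plain string so the deserialization step (and its failure case) is left out.

```rust
use std::collections::HashMap;

/// Hypothetical error type covering two of the documented failure cases.
#[derive(Debug, PartialEq)]
enum SetError {
    RecordNotFound,
    HasActiveProducers,
}

/// Hypothetical per-record state.
struct RecordSlot {
    active_producers: usize,
    latest: Option<String>,
}

struct Registry {
    records: HashMap<String, RecordSlot>,
}

impl Registry {
    /// Accepts a remote write only when no producer owns the record.
    fn set_from_remote(&mut self, name: &str, value: &str) -> Result<(), SetError> {
        let slot = self.records.get_mut(name).ok_or(SetError::RecordNotFound)?;
        if slot.active_producers > 0 {
            return Err(SetError::HasActiveProducers);
        }
        slot.latest = Some(value.to_string());
        Ok(())
    }
}

fn main() {
    let mut registry = Registry { records: HashMap::new() };
    registry.records.insert(
        "AppConfig".to_string(),
        RecordSlot { active_producers: 0, latest: None },
    );
    registry.records.insert(
        "Temperature".to_string(),
        RecordSlot { active_producers: 1, latest: None },
    );
    println!("{:?}", registry.set_from_remote("AppConfig", "{\"debug\":true}"));
    println!("{:?}", registry.set_from_remote("Temperature", "{\"celsius\":99}"));
}
```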
§Example (internal use)
db.set_record_from_json("AppConfig", json!({"debug": true}))?;
pub fn subscribe_record_updates(
    &self,
    type_id: TypeId,
    queue_size: usize,
) -> DbResult<(Receiver<Value>, Sender<()>)>
Subscribes to record updates as a JSON stream (std only)
Creates a subscription to a record’s buffer and forwards updates as JSON
to a bounded channel. This is used internally by the remote access protocol
for implementing record.subscribe.
§Architecture
Spawns a consumer task that:
- Subscribes to the record’s buffer using the existing buffer API
- Reads values as they arrive
- Serializes each value to JSON
- Sends JSON values to a bounded channel (with backpressure handling)
- Terminates when either:
  - The cancel signal is received (unsubscribe)
  - The channel receiver is dropped (client disconnected)
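The steps above can be approximated with standard-library threads and channels. This is only a sketch under several assumptions: the real method runs on the async runtime, while `forward_as_json` here is a hypothetical function that uses an OS thread, formats JSON by hand, applies backpressure by blocking on a full bounded channel, and checks for cancellation only when a new value arrives.

```rust
use std::sync::mpsc::{self, Receiver, Sender};
use std::thread::{self, JoinHandle};

/// Forwards temperature readings as JSON strings over a bounded channel.
/// Returns the JSON receiver, a cancel sender, and the worker's join handle.
fn forward_as_json(
    source: Receiver<f32>,
    queue_size: usize,
) -> (Receiver<String>, Sender<()>, JoinHandle<()>) {
    let (json_tx, json_rx) = mpsc::sync_channel::<String>(queue_size);
    let (cancel_tx, cancel_rx) = mpsc::channel::<()>();

    let worker = thread::spawn(move || {
        // Read values as they arrive; the loop ends when the source closes.
        for celsius in source {
            // Stop if the subscriber asked to unsubscribe.
            if cancel_rx.try_recv().is_ok() {
                break;
            }
            // Serialize the value to JSON (hand-written for this sketch).
            let json = format!("{{\"celsius\":{:.1}}}", celsius);
            // A full queue blocks here; a dropped receiver ends the task.
            if json_tx.send(json).is_err() {
                break;
            }
        }
    });

    (json_rx, cancel_tx, worker)
}

fn main() {
    let (src_tx, src_rx) = mpsc::channel();
    let (json_rx, _cancel_tx, worker) = forward_as_json(src_rx, 2);
    for celsius in [21.5, 22.0, 22.5] {
        src_tx.send(celsius).unwrap();
    }
    drop(src_tx); // closing the source lets the worker finish
    for json in json_rx {
        println!("{}", json);
    }
    worker.join().unwrap();
}
```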
§Arguments
- type_id - TypeId of the record to subscribe to
- queue_size - Size of the bounded channel for this subscription
§Returns
Ok((receiver, cancel_tx)) where:
- receiver: Bounded channel receiver for JSON values
- cancel_tx: One-shot sender to cancel the subscription
Err if:
- Record not found for the given TypeId
- Record not configured with .with_serialization()
- Failed to subscribe to buffer
§Example (internal use)
let type_id = TypeId::of::<Temperature>();
let (mut rx, cancel_tx) = db.subscribe_record_updates(type_id, 100)?;
// Read events
while let Some(json_value) = rx.recv().await {
    // Forward to client...
}
// Cancel subscription
let _ = cancel_tx.send(());
pub fn collect_inbound_routes(
    &self,
    scheme: &str,
) -> Vec<(String, Box<dyn ProducerTrait>, DeserializerFn)>
Collects inbound connector routes for automatic router construction (std only)
Iterates all records, filters their inbound_connectors by scheme, and returns routes with producer creation callbacks.
§Arguments
- scheme - URL scheme to filter by (e.g., “mqtt”, “kafka”)
§Returns
Vector of tuples: (topic, producer_trait, deserializer)
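Filtering by scheme can be pictured with a small standalone function. The connector URLs, the `routes_for_scheme` helper, and the assumption that the topic is everything after `scheme://` are all hypothetical; the producer and deserializer parts of the real tuple are left out.

```rust
/// Returns `(topic, record_name)` pairs for connector URLs that use `scheme`.
/// Input entries are `(record_name, connector_url)`.
fn routes_for_scheme(connectors: &[(&str, &str)], scheme: &str) -> Vec<(String, String)> {
    let prefix = format!("{}://", scheme);
    connectors
        .iter()
        .filter_map(|(record, url)| {
            // Keep only URLs with the requested scheme; the rest of the URL is the topic.
            url.strip_prefix(prefix.as_str())
                .map(|topic| (topic.to_string(), record.to_string()))
        })
        .collect()
}

fn main() {
    let connectors = [
        ("Temperature", "mqtt://sensors/temp"),
        ("Events", "kafka://events"),
        ("Humidity", "mqtt://sensors/humidity"),
    ];
    for (topic, record) in routes_for_scheme(&connectors, "mqtt") {
        println!("{} -> {}", topic, record);
    }
}
```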
§Example
// In MqttConnector after db.build()
let routes = db.collect_inbound_routes("mqtt");
let router = RouterBuilder::from_routes(routes).build();
connector.set_router(router).await?;
pub fn collect_outbound_routes(
    &self,
    scheme: &str,
) -> Vec<(String, Box<dyn ConsumerTrait>, SerializerFn, Vec<(String, String)>)>
Collects outbound routes for a specific protocol scheme
Mirrors collect_inbound_routes() for symmetry. Iterates all records,
filters their outbound_connectors by scheme, and returns routes with
consumer creation callbacks.
This method is called by connectors during their build() phase to
collect all configured outbound routes and spawn publisher tasks.
§Arguments
- scheme - URL scheme to filter by (e.g., “mqtt”, “kafka”)
§Returns
Vector of tuples: (destination, consumer_trait, serializer, config)
The config Vec contains protocol-specific options (e.g., qos, retain).
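A connector would typically turn the string pairs in config into typed settings before publishing. The sketch below shows one way to do that; `PublishOptions`, `parse_publish_options`, the defaults, and the exact key names and value formats are assumptions for illustration, not part of the documented API.

```rust
/// Hypothetical typed view of the protocol-specific options.
#[derive(Debug, PartialEq)]
struct PublishOptions {
    qos: u8,
    retain: bool,
}

/// Parses `(key, value)` pairs into typed options.
/// Unknown keys are ignored; malformed values produce an error.
fn parse_publish_options(config: &[(String, String)]) -> Result<PublishOptions, String> {
    // Assumed defaults when a key is absent.
    let mut options = PublishOptions { qos: 0, retain: false };
    for (key, value) in config {
        match key.as_str() {
            "qos" => {
                options.qos = value
                    .parse::<u8>()
                    .map_err(|e| format!("invalid qos {:?}: {}", value, e))?;
            }
            "retain" => {
                options.retain = value
                    .parse::<bool>()
                    .map_err(|e| format!("invalid retain {:?}: {}", value, e))?;
            }
            _ => {}
        }
    }
    Ok(options)
}

fn main() {
    let config = vec![
        ("qos".to_string(), "1".to_string()),
        ("retain".to_string(), "true".to_string()),
    ];
    println!("{:?}", parse_publish_options(&config));
}
```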
§Example
// In MqttConnector::build()
let routes = db.collect_outbound_routes("mqtt");
for (topic, consumer, serializer, config) in routes {
    connector.spawn_publisher(topic, consumer, serializer, config)?;
}