aimdb-sync
Synchronous API wrapper for AimDB - blocking operations for async database.
Overview
aimdb-sync provides a synchronous interface to AimDB, enabling blocking operations on the async database. Perfect for FFI bindings, legacy codebases, simple scripts, and situations where async is impractical.
Key Features:
- Pure Sync Context: Works in plain
fn main() - no #[tokio::main] required
- Blocking Operations: Familiar sync API (set, get, try_get, etc.)
- Thread-Safe: All types are
Send + Sync, shareable across threads
- Type-Safe: Full compile-time type safety with generics
- Timeout Support: All operations support configurable timeouts
Architecture
┌──────────────────────────────┐
│ Synchronous Context │
│ (User Code) │
└──────────────┬───────────────┘
│ SyncProducer<T>
│ SyncConsumer<T>
▼
┌──────────────────────────────┐
│ Channel Bridge │
│ (tokio::sync::mpsc + │
│ std::sync::mpsc) │
└──────────────┬───────────────┘
│
▼
┌──────────────────────────────┐
│ Async Context │
│ (AimDB + Tokio Runtime) │
│ (Background Thread) │
└──────────────────────────────┘
Quick Start
Add to your Cargo.toml:
[dependencies]
aimdb-sync = "0.1"
aimdb-core = "0.1"
aimdb-tokio-adapter = "0.1"
Basic Example
use aimdb_core::{AimDbBuilder, buffer::BufferCfg};
use aimdb_sync::AimDbBuilderSyncExt;
use aimdb_tokio_adapter::{TokioAdapter, TokioRecordRegistrarExt};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::time::Duration;
#[derive(Debug, Clone, Serialize, Deserialize)]
struct Temperature {
celsius: f32,
sensor_id: String,
}
fn main() -> Result<(), Box<dyn std::error::Error>> {
let adapter = Arc::new(TokioAdapter);
let mut builder = AimDbBuilder::new().runtime(adapter);
builder.configure::<Temperature>(|reg| {
reg.buffer(BufferCfg::SingleLatest);
});
let handle = builder.attach()?;
let producer = handle.producer::<Temperature>()?;
let consumer = handle.consumer::<Temperature>()?;
let prod_handle = std::thread::spawn(move || {
for i in 0..10 {
let temp = Temperature {
celsius: 20.0 + i as f32,
sensor_id: format!("sensor-{}", i),
};
producer.set(temp).unwrap();
std::thread::sleep(Duration::from_millis(100));
}
});
let cons_handle = std::thread::spawn(move || {
for _ in 0..10 {
let temp = consumer.get().unwrap();
println!("Temperature: {}°C from {}", temp.celsius, temp.sensor_id);
}
});
prod_handle.join().unwrap();
cons_handle.join().unwrap();
handle.detach()?;
Ok(())
}
Producer Operations
SyncProducer<T> provides blocking send operations:
Blocking Send
let producer = handle.producer::<Temperature>()?;
let temp = Temperature {
celsius: 23.5,
sensor_id: "sensor-001".to_string()
};
producer.set(temp)?;
Send with Timeout
use std::time::Duration;
match producer.set_with_timeout(temp, Duration::from_secs(1)) {
Ok(_) => println!("Sent successfully"),
Err(e) => eprintln!("Timeout or error: {}", e),
}
Non-Blocking Send
match producer.try_set(temp) {
Ok(_) => println!("Sent immediately"),
Err(e) => eprintln!("Channel full or error: {}", e),
}
Consumer Operations
SyncConsumer<T> provides blocking receive operations:
Blocking Receive
let consumer = handle.consumer::<Temperature>()?;
let temp = consumer.get()?;
println!("Received: {}°C", temp.celsius);
Receive with Timeout
use std::time::Duration;
match consumer.get_with_timeout(Duration::from_secs(5)) {
Ok(temp) => println!("Got: {}°C", temp.celsius),
Err(e) => eprintln!("Timeout or error: {}", e),
}
Non-Blocking Receive
match consumer.try_get() {
Ok(temp) => println!("Got: {}°C", temp.celsius),
Err(e) => eprintln!("No value available: {}", e),
}
Multi-Consumer Pattern
Multiple consumers can receive from the same record:
use aimdb_core::{AimDbBuilder, buffer::BufferCfg};
use aimdb_sync::AimDbBuilderSyncExt;
use aimdb_tokio_adapter::{TokioAdapter, TokioRecordRegistrarExt};
use std::sync::Arc;
use std::time::Duration;
fn main() -> Result<(), Box<dyn std::error::Error>> {
let adapter = Arc::new(TokioAdapter);
let mut builder = AimDbBuilder::new().runtime(adapter);
builder.configure::<Temperature>(|reg| {
reg.buffer(BufferCfg::SpmcRing { capacity: 16 });
});
let handle = builder.attach()?;
let producer = handle.producer::<Temperature>()?;
let mut handles = vec![];
for id in 0..3 {
let consumer = handle.consumer::<Temperature>()?;
let handle = std::thread::spawn(move || {
loop {
match consumer.get_with_timeout(Duration::from_secs(1)) {
Ok(temp) => println!("Consumer {}: {}°C", id, temp.celsius),
Err(_) => break,
}
}
});
handles.push(handle);
}
for i in 0..10 {
producer.set(Temperature {
celsius: 20.0 + i as f32,
sensor_id: "main".to_string(),
})?;
std::thread::sleep(Duration::from_millis(100));
}
for handle in handles {
handle.join().unwrap();
}
handle.detach()?;
Ok(())
}
Thread Safety
All sync types are Send + Sync:
use std::sync::Arc;
let producer = Arc::new(handle.producer::<Temperature>()?);
let consumer = Arc::new(handle.consumer::<Temperature>()?);
let prod_clone = producer.clone();
std::thread::spawn(move || {
prod_clone.set(Temperature { celsius: 25.0, sensor_id: "s1".to_string() }).ok();
});
let cons_clone = consumer.clone();
std::thread::spawn(move || {
let value = cons_clone.get().ok();
});
Error Handling
use aimdb_core::DbError;
match producer.set(temp) {
Ok(_) => println!("Success"),
Err(DbError::SetTimeout) => {
eprintln!("Operation timed out");
}
Err(DbError::RuntimeShutdown) => {
eprintln!("Runtime thread has stopped");
}
Err(e) => {
eprintln!("Error: {}", e);
}
}
Common error types:
DbError::SetTimeout / DbError::GetTimeout: Operation exceeded timeout
DbError::RuntimeShutdown: Runtime thread stopped or channel closed
DbError::RecordNotFound: Type not registered in database
DbError::AttachFailed: Failed to start runtime thread
Configuration Options
Buffer Types
Choose buffer based on use case:
use aimdb_core::buffer::BufferCfg;
builder.configure::<MyData>(|reg| {
reg.buffer(BufferCfg::SpmcRing { capacity: 100 });
});
builder.configure::<MyData>(|reg| {
reg.buffer(BufferCfg::SingleLatest);
});
builder.configure::<MyData>(|reg| {
reg.buffer(BufferCfg::Mailbox);
});
Channel Capacity
Control the sync bridge channel size:
let producer = handle.producer::<Temperature>()?;
let consumer = handle.consumer::<Temperature>()?;
let producer = handle.producer_with_capacity::<Temperature>(1000)?;
let consumer = handle.consumer_with_capacity::<Temperature>(1000)?;
Shutdown
Database automatically shuts down when dropped:
fn main() -> Result<(), Box<dyn std::error::Error>> {
let handle = builder.attach()?;
handle.detach()?;
Ok(())
}
Use Cases
Legacy Integration
use aimdb_core::{AimDbBuilder, buffer::BufferCfg};
use aimdb_sync::{AimDbBuilderSyncExt, AimDbHandle};
use aimdb_tokio_adapter::{TokioAdapter, TokioRecordRegistrarExt};
use std::sync::Arc;
use std::time::Duration;
pub struct LegacyAdapter {
handle: AimDbHandle,
}
impl LegacyAdapter {
pub fn new() -> Result<Self, Box<dyn std::error::Error>> {
let adapter = Arc::new(TokioAdapter);
let mut builder = AimDbBuilder::new().runtime(adapter);
builder.configure::<SensorData>(|reg| {
reg.buffer(BufferCfg::SpmcRing { capacity: 100 });
});
let handle = builder.attach()?;
Ok(Self { handle })
}
pub fn send_sensor_data(&self, data: SensorData) -> Result<(), String> {
let producer = self.handle.producer::<SensorData>()
.map_err(|e| e.to_string())?;
producer.set_with_timeout(data, Duration::from_secs(1))
.map_err(|e| e.to_string())
}
pub fn read_sensor_data(&self) -> Result<SensorData, String> {
let consumer = self.handle.consumer::<SensorData>()
.map_err(|e| e.to_string())?;
consumer.get_with_timeout(Duration::from_secs(1))
.map_err(|e| e.to_string())
}
pub fn shutdown(self) -> Result<(), String> {
self.handle.detach().map_err(|e| e.to_string())
}
}
Simple Scripts
use aimdb_core::{AimDbBuilder, buffer::BufferCfg};
use aimdb_sync::AimDbBuilderSyncExt;
use aimdb_tokio_adapter::{TokioAdapter, TokioRecordRegistrarExt};
use std::sync::Arc;
use std::time::Duration;
fn main() -> Result<(), Box<dyn std::error::Error>> {
let adapter = Arc::new(TokioAdapter);
let mut builder = AimDbBuilder::new().runtime(adapter);
builder.configure::<LogMessage>(|reg| {
reg.buffer(BufferCfg::SpmcRing { capacity: 100 });
});
let handle = builder.attach()?;
let producer = handle.producer::<LogMessage>()?;
let consumer = handle.consumer::<LogMessage>()?;
loop {
let log = read_log_from_file();
producer.set(log)?;
if let Ok(msg) = consumer.try_get() {
print_to_console(msg);
}
std::thread::sleep(Duration::from_millis(100));
}
}
Performance Considerations
Overhead
- Channel crossing adds ~1-10μs latency
- Background runtime uses dedicated threads
- Memory: One tokio::mpsc channel per producer, one std::mpsc channel per consumer
Optimization Tips
- Batch Operations: Group multiple sets/gets when possible
- Avoid Blocking: Use
try_* methods in latency-sensitive paths
- Channel Capacity: Tune for expected throughput
- Thread Count: Match runtime_threads to workload
Testing
cargo test -p aimdb-sync
RUST_LOG=debug cargo test -p aimdb-sync -- --nocapture
cargo bench -p aimdb-sync
Complete Examples
See repository examples:
examples/sync-api-demo - Full synchronous integration
Documentation
Generate API docs:
cargo doc -p aimdb-sync --open
License
See LICENSE file.