pub struct SyncEngine { /* private fields */ }
Main sync engine coordinator.
Manages the three-tier storage architecture:
- L1: In-memory DashMap with pressure-based eviction
- L2: Redis cache with batch writes
- L3: MySQL/SQLite archive (ground truth)
§Thread Safety
The engine is Send + Sync and designed for concurrent access.
Internal state uses atomic operations and concurrent data structures.
Implementations§
impl SyncEngine
pub async fn contains(&self, id: &str) -> bool
Check if an item exists across all tiers.
Checks in order: L1 cache → Redis EXISTS → L3 Cuckoo filter → SQL query. If found in SQL and Cuckoo filter was untrusted, updates the filter.
§Returns
- true → item definitely exists in at least one tier
- false → item does not exist (authoritative)
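The lookup order described above can be sketched with plain booleans standing in for each tier. This is a minimal illustration, not the engine's implementation: `Tiers`, `Trust`, and `contains_sketch` are hypothetical names, and the early exit on a trusted filter miss is an assumption (the page only states that an untrusted filter is updated after a SQL hit).

```rust
/// Hypothetical stand-in for the filter's trust state.
#[derive(Clone, Copy, Debug, PartialEq)]
enum Trust {
    Trusted,
    Untrusted,
}

/// Hypothetical per-tier answers for a single ID (not the crate's real types).
struct Tiers {
    in_l1: bool,
    in_redis: bool,
    in_filter: bool,
    in_sql: bool,
    trust: Trust,
}

/// Returns (exists, sql_was_queried), checking L1 -> Redis -> filter -> SQL.
fn contains_sketch(t: &mut Tiers) -> (bool, bool) {
    if t.in_l1 || t.in_redis {
        return (true, false);
    }
    // Assumption: a trusted filter miss is treated as "definitely absent".
    if t.trust == Trust::Trusted && !t.in_filter {
        return (false, false);
    }
    // Otherwise SQL is the ground truth.
    let found = t.in_sql;
    if found && t.trust == Trust::Untrusted {
        // Repair the untrusted filter after a SQL hit.
        t.in_filter = true;
    }
    (found, true)
}
```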
§Example
if engine.contains("user.123").await {
let item = engine.get("user.123").await;
} else {
println!("Not found");
}
pub fn contains_fast(&self, id: &str) -> bool
Fast synchronous check for whether an item might exist (L1 + Cuckoo filter only).
Use this when you need a quick probabilistic check without awaiting.
For an authoritative check, use contains() instead.
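To illustrate what "might exist" means here, the sketch below uses a toy fingerprint set in place of a cuckoo filter. It is not a real cuckoo filter and not the engine's code; `TinyFilter` and `contains_fast_sketch` are hypothetical. It only reproduces the contract: a `false` answer is reliable, while a `true` answer may be a false positive.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashSet;
use std::hash::{Hash, Hasher};

/// Toy approximate-membership filter that keeps one-byte fingerprints only.
/// Distinct IDs can share a fingerprint, which is where false positives come from.
#[derive(Default)]
struct TinyFilter {
    fingerprints: HashSet<u8>,
}

impl TinyFilter {
    fn fingerprint(id: &str) -> u8 {
        let mut h = DefaultHasher::new();
        id.hash(&mut h);
        (h.finish() & 0xff) as u8
    }

    fn insert(&mut self, id: &str) {
        self.fingerprints.insert(Self::fingerprint(id));
    }

    /// Never false for an inserted ID; may be true for an ID never inserted.
    fn might_contain(&self, id: &str) -> bool {
        self.fingerprints.contains(&Self::fingerprint(id))
    }
}

/// Hypothetical fast path: an L1 hit is certain, otherwise ask the filter.
fn contains_fast_sketch(l1: &HashSet<String>, filter: &TinyFilter, id: &str) -> bool {
    l1.contains(id) || filter.might_contain(id)
}
```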
pub async fn status(&self, id: &str) -> ItemStatus
Get the sync status of an item.
Returns detailed state information about where an item exists and its sync status across tiers.
§Example
match engine.status("order.456").await {
ItemStatus::Synced { in_l1, in_l2, in_l3 } => {
println!("Synced: L1={}, L2={}, L3={}", in_l1, in_l2, in_l3);
}
ItemStatus::Pending => println!("Queued for sync"),
ItemStatus::Missing => println!("Not found"),
}
pub async fn get_many(&self, ids: &[&str]) -> Vec<Option<SyncItem>>
Fetch multiple items in parallel.
Returns a vector of Option<SyncItem> in the same order as input IDs.
Missing items are represented as None.
§Performance
This method fetches from L1 synchronously, then batches L2/L3 lookups
for items not in L1. Much faster than sequential get() calls.
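The two-phase shape described above (serve L1 hits first, then resolve every miss in one batch) might look roughly like the following. `HashMap`s stand in for the tiers, and `fetch_batch` and `get_many_sketch` are hypothetical helpers; the real method is async and returns `SyncItem` values.

```rust
use std::collections::HashMap;

/// Hypothetical batched lookup standing in for one L2/L3 round trip.
fn fetch_batch(backend: &HashMap<String, String>, ids: &[&str]) -> HashMap<String, String> {
    ids.iter()
        .filter_map(|id| backend.get(*id).map(|v| (id.to_string(), v.clone())))
        .collect()
}

/// Serve what L1 has, then resolve all misses in a single batch.
/// The output keeps the same order as `ids`.
fn get_many_sketch(
    l1: &HashMap<String, String>,
    backend: &HashMap<String, String>,
    ids: &[&str],
) -> Vec<Option<String>> {
    // Phase 1: synchronous L1 pass.
    let mut out: Vec<Option<String>> = ids.iter().map(|id| l1.get(*id).cloned()).collect();

    // Phase 2: collect the misses and fetch them together.
    let misses: Vec<&str> = ids
        .iter()
        .zip(&out)
        .filter(|(_, slot)| slot.is_none())
        .map(|(id, _)| *id)
        .collect();
    if !misses.is_empty() {
        let fetched = fetch_batch(backend, &misses);
        for (id, slot) in ids.iter().zip(out.iter_mut()) {
            if slot.is_none() {
                *slot = fetched.get(*id).cloned();
            }
        }
    }
    out
}
```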
§Example
let ids = vec!["user.1", "user.2", "user.3"];
let items = engine.get_many(&ids).await;
for (id, item) in ids.iter().zip(items.iter()) {
match item {
Some(item) => println!("{}: found", id),
None => println!("{}: missing", id),
}
}
pub async fn submit_many( &self, items: Vec<SyncItem>, ) -> Result<BatchResult, StorageError>
Submit multiple items for sync atomically.
All items are added to L1 and queued for batch persistence.
Returns a BatchResult with success/failure counts.
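As a rough picture of how success/failure counts could be accumulated, the sketch below tallies per-item outcomes. The field names `succeeded` and `failed` come from the example on this page; the struct definition, `submit_one`, and its empty-ID rule are hypothetical.

```rust
/// Stand-in for the crate's `BatchResult`; only the two fields used on this page.
#[derive(Debug, Default, PartialEq)]
struct BatchResult {
    succeeded: usize,
    failed: usize,
}

/// Hypothetical per-item step: rejects empty IDs, accepts everything else.
fn submit_one(id: &str) -> Result<(), String> {
    if id.is_empty() {
        Err("empty id".to_string())
    } else {
        Ok(())
    }
}

/// Tally one outcome per item instead of stopping at the first failure.
fn submit_many_sketch(ids: &[&str]) -> BatchResult {
    let mut result = BatchResult::default();
    for id in ids {
        match submit_one(id) {
            Ok(()) => result.succeeded += 1,
            Err(_) => result.failed += 1,
        }
    }
    result
}
```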
§Example
let items = vec![
SyncItem::from_json("user.1".into(), json!({"name": "Alice"})),
SyncItem::from_json("user.2".into(), json!({"name": "Bob"})),
];
let result = engine.submit_many(items).await.unwrap();
println!("Submitted: {}, Failed: {}", result.succeeded, result.failed);
pub async fn delete_many( &self, ids: &[&str], ) -> Result<BatchResult, StorageError>
Delete multiple items atomically.
Removes items from all tiers (L1, L2, L3) and updates filters.
Returns a BatchResult with counts.
§Example
let ids = vec!["user.1", "user.2", "user.3"];
let result = engine.delete_many(&ids).await.unwrap();
println!("Deleted: {}", result.succeeded);
pub async fn get_or_insert_with<F, Fut>( &self, id: &str, factory: F, ) -> Result<SyncItem, StorageError>
Get an item, or compute and insert it if missing.
This is the classic “get or insert” pattern, useful for cache-aside:
- Check cache (L1 → L2 → L3)
- If missing, call the async factory function
- Insert the result and return it
The factory is only called if the item is not found.
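The steps above reduce to the following synchronous sketch, with a single `HashMap` standing in for all three tiers. `get_or_insert_with_sketch` is a hypothetical name, and the real method takes an async factory and returns a `Result`.

```rust
use std::collections::HashMap;

/// Return the cached value, or build it with `factory` and cache it.
/// The factory runs only on a miss.
fn get_or_insert_with_sketch<F>(
    cache: &mut HashMap<String, String>,
    id: &str,
    factory: F,
) -> String
where
    F: FnOnce() -> String,
{
    if let Some(hit) = cache.get(id) {
        return hit.clone();
    }
    let value = factory();
    cache.insert(id.to_string(), value.clone());
    value
}
```

Taking the factory as `FnOnce` lets callers move expensive state into the closure, since it is invoked at most once.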
§Example
let item = engine.get_or_insert_with("user.123", || async {
// Expensive operation - only runs if not cached
SyncItem::from_json("user.123".into(), json!({"name": "Fetched from DB"}))
}).await.unwrap();
impl SyncEngine
pub async fn start(&mut self) -> Result<(), StorageError>
Start the engine and connect to backends in the required startup sequence.
Startup flow (trust hierarchy):
- Initialize WAL (SQLite): always first, our durability lifeline
- Connect to SQL (L3): ground truth; initialize the SQL merkle store
- Drain any pending WAL entries to SQL (blocking)
- Get the SQL merkle root (this is our trusted root)
- Load cuckoo filter (CF) snapshots from SQLite:
  - If the snapshot merkle root matches the SQL root → CF is trusted
  - Otherwise → CF must be rebuilt from a SQL scan
- Connect to Redis (L2): cache layer
- Compare the Redis merkle root with the SQL merkle root:
  - If they match → Redis is synced
  - If they mismatch → use branch diff to find stale regions and resync
- Ready!
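The Redis comparison step in the flow above can be pictured as a root check followed by a per-branch diff. The sketch below is an assumption-laden illustration: a flat `BTreeMap` of branch path to hash stands in for one level of a merkle tree, and `Branches`, `stale_branches`, and `resync_plan` are hypothetical names.

```rust
use std::collections::BTreeMap;

/// Branch path -> hash. A flat map stands in for one level of a merkle tree.
type Branches = BTreeMap<String, u64>;

/// Branches whose hash in the cache is missing or differs from ground truth.
/// Branches that exist only in the cache are not reported by this sketch.
fn stale_branches(truth: &Branches, cache: &Branches) -> Vec<String> {
    truth
        .iter()
        .filter(|(path, hash)| cache.get(*path) != Some(*hash))
        .map(|(path, _)| path.clone())
        .collect()
}

/// If the roots match, nothing needs resyncing; otherwise diff the branches.
fn resync_plan(
    truth_root: &str,
    cache_root: &str,
    truth: &Branches,
    cache: &Branches,
) -> Vec<String> {
    if truth_root == cache_root {
        return Vec::new();
    }
    stale_branches(truth, cache)
}
```

Comparing roots first keeps the common case cheap: a full branch walk is only needed after a mismatch.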
pub async fn warm_up(&mut self) -> Result<(), StorageError>
Warm up the L1 cache from L2/L3. Call this after start().
pub async fn tick(&self)
Perform one tick of maintenance (for manual control instead of run loop).
pub async fn force_flush(&self)
Force flush all pending L2 batches immediately.
impl SyncEngine
pub fn new( config: SyncEngineConfig, config_rx: Receiver<SyncEngineConfig>, ) -> Self
Create a new sync engine.
The engine starts in Created state. Call start()
to connect to backends and transition to Ready.
pub fn state(&self) -> EngineState
Get current engine state.
pub fn state_receiver(&self) -> Receiver<EngineState>
Get a receiver to watch state changes.
pub fn memory_pressure(&self) -> f64
Get current memory pressure (0.0 - 1.0+).
pub fn pressure(&self) -> BackpressureLevel
Get current backpressure level.
pub fn should_accept_writes(&self) -> bool
Check if the engine should accept writes (based on pressure).
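One plausible way the pressure value, the backpressure level, and the write gate relate is sketched below. The variants of `BackpressureLevel` and the thresholds are not shown on this page, so `Level`, the 0.75 and 0.95 cut-offs, and both function names are purely hypothetical.

```rust
/// Hypothetical levels; the crate's `BackpressureLevel` variants may differ.
#[derive(Clone, Copy, Debug, PartialEq)]
enum Level {
    Normal,
    Throttle,
    Critical,
}

/// Map a pressure ratio (0.0 - 1.0+) to a level using assumed thresholds.
fn level_for(pressure: f64) -> Level {
    if pressure >= 0.95 {
        Level::Critical
    } else if pressure >= 0.75 {
        Level::Throttle
    } else {
        Level::Normal
    }
}

/// Assumed policy: refuse writes only at the highest level.
fn should_accept_writes_sketch(pressure: f64) -> bool {
    level_for(pressure) != Level::Critical
}
```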
pub async fn get(&self, id: &str) -> Result<Option<SyncItem>, StorageError>
Get an item by ID.
Checks storage tiers in order: L1 → L2 → L3. Updates access count and promotes to L1 on hit.
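A simplified picture of the tiered read with promotion follows, using three `HashMap`s in place of DashMap, Redis, and SQL. `Item`, `Tiers`, and the `access_count` field are hypothetical stand-ins; the real method is async and returns `Result<Option<SyncItem>, StorageError>`.

```rust
use std::collections::HashMap;

#[derive(Clone, Debug, PartialEq)]
struct Item {
    value: String,
    access_count: u64,
}

/// Three maps standing in for L1, L2, and L3.
struct Tiers {
    l1: HashMap<String, Item>,
    l2: HashMap<String, Item>,
    l3: HashMap<String, Item>,
}

impl Tiers {
    /// Look in L1, then L2, then L3; bump the access count and promote to L1 on a hit.
    fn get(&mut self, id: &str) -> Option<Item> {
        if let Some(item) = self.l1.get_mut(id) {
            item.access_count += 1;
            return Some(item.clone());
        }
        // Fall through to the slower tiers; `?` returns None if both miss.
        let mut item = self.l2.get(id).or_else(|| self.l3.get(id))?.clone();
        item.access_count += 1;
        // Promote so the next read is served from L1.
        self.l1.insert(id.to_string(), item.clone());
        Some(item)
    }
}
```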
pub async fn get_verified( &self, id: &str, ) -> Result<Option<SyncItem>, StorageError>
Get an item with hash verification.
If the item has a non-empty merkle_root, the content hash is verified.
Returns StorageError::Corruption if the hash doesn’t match.
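The verification rule can be sketched as below. The hash function is a stand-in: `DefaultHasher` is used only because it ships with the standard library, whereas a real merkle store would use a cryptographic hash. `VerifyError`, `digest_hex`, and `verify` are hypothetical names, not the crate's API.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical error mirroring the idea of `StorageError::Corruption`.
#[derive(Debug, PartialEq)]
enum VerifyError {
    Corruption { expected: String, actual: String },
}

/// Stand-in digest, for illustration only (not cryptographic).
fn digest_hex(content: &[u8]) -> String {
    let mut h = DefaultHasher::new();
    content.hash(&mut h);
    format!("{:016x}", h.finish())
}

/// Skip verification when no hash is recorded; otherwise compare digests.
fn verify(content: &[u8], merkle_root: &str) -> Result<(), VerifyError> {
    if merkle_root.is_empty() {
        return Ok(());
    }
    let actual = digest_hex(content);
    if actual == merkle_root {
        Ok(())
    } else {
        Err(VerifyError::Corruption {
            expected: merkle_root.to_string(),
            actual,
        })
    }
}
```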
pub async fn submit(&self, item: SyncItem) -> Result<(), StorageError>
Submit an item for sync.
The item is immediately stored in L1 and queued for batch write to L2/L3. Uses default options: Redis + SQL (both enabled). Filters are updated only on successful writes in flush_batch_internal().
For custom routing, use submit_with.
pub async fn submit_with( &self, item: SyncItem, options: SubmitOptions, ) -> Result<(), StorageError>
Submit an item with custom routing options.
The item is immediately stored in L1 and queued for batch write. Items are batched by compatible options for efficient pipelined writes.
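Batching by compatible options can be pictured as grouping the queue by a routing key, so each batch shares one write path. The fields of `SubmitOptions` are not shown on this page; `Route`, its fields, and `group_by_route` are hypothetical.

```rust
use std::collections::HashMap;

/// Hypothetical routing key standing in for `SubmitOptions`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
struct Route {
    redis: bool,
    sql: bool,
    ttl_secs: Option<u64>,
}

/// Group queued item IDs so that each batch shares one routing decision.
/// Items keep their queue order within a batch.
fn group_by_route(queue: &[(String, Route)]) -> HashMap<Route, Vec<String>> {
    let mut batches: HashMap<Route, Vec<String>> = HashMap::new();
    for (id, route) in queue {
        batches.entry(*route).or_default().push(id.clone());
    }
    batches
}
```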
§Example
// Cache-only with 1 minute TTL (no SQL write)
let item = SyncItem::new("cache.key".into(), b"data".to_vec());
engine.submit_with(item, SubmitOptions::cache(CacheTtl::Minute)).await?;
// SQL-only durable storage (no Redis)
let item = SyncItem::new("archive.key".into(), b"data".to_vec());
engine.submit_with(item, SubmitOptions::durable()).await?;
pub async fn delete(&self, id: &str) -> Result<bool, StorageError>
Delete an item from all storage tiers.
Deletes are more complex than writes because the item may exist in:
- L1 (DashMap) - immediate removal
- L2 (Redis) - async removal
- L3 (MySQL/SQLite) - async removal
- Cuckoo filters (L2/L3) - remove from both
- Merkle trees - update with deletion marker
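The places listed above can be modelled with in-memory collections to show why a delete touches more state than a write. Everything in this sketch is a stand-in: `Store` and its fields are hypothetical, and the meaning of the returned `bool` (true if any tier held the item) is an assumption, since the page does not spell it out.

```rust
use std::collections::{HashMap, HashSet};

struct Store {
    l1: HashMap<String, String>,
    l2: HashMap<String, String>,
    l3: HashMap<String, String>,
    filter: HashSet<String>,     // stand-in for the cuckoo filters
    tombstones: HashSet<String>, // stand-in for merkle deletion markers
}

impl Store {
    /// Remove `id` everywhere; returns true if any tier actually held it.
    fn delete(&mut self, id: &str) -> bool {
        // Evaluate each removal separately so no tier is skipped.
        let in_l1 = self.l1.remove(id).is_some();
        let in_l2 = self.l2.remove(id).is_some();
        let in_l3 = self.l3.remove(id).is_some();
        self.filter.remove(id);
        let existed = in_l1 || in_l2 || in_l3;
        if existed {
            self.tombstones.insert(id.to_string());
        }
        existed
    }
}
```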
pub fn l3_filter_stats(&self) -> (usize, usize, FilterTrust)
Get L3 filter stats (entries, capacity, trust_state)
pub fn l3_filter(&self) -> &Arc<FilterManager>
Get access to the L3 filter (for warmup/verification)
pub async fn merkle_roots(&self) -> (Option<String>, Option<String>)
Get merkle root hashes from Redis (L2) and SQL (L3).
Returns (redis_root, sql_root) as hex strings.
Returns None for backends that aren’t connected or have empty trees.
pub async fn verify_filter(&self) -> bool
Verify and trust the L3 cuckoo filter.
Compares the filter’s merkle root against L3’s merkle root. If they match, marks the filter as trusted.
Returns true if the filter is now trusted, false otherwise.
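The trust decision amounts to a root comparison, sketched below. `Trust`, `Filter`, and `verify_filter_sketch` are hypothetical names (the variants of the crate's `FilterTrust` are not shown on this page), and treating a missing root as a mismatch is an assumption.

```rust
/// Hypothetical trust state; the crate's `FilterTrust` may differ.
#[derive(Clone, Copy, Debug, PartialEq)]
enum Trust {
    Trusted,
    Untrusted,
}

struct Filter {
    /// Merkle root recorded when the filter snapshot was taken, if any.
    snapshot_root: Option<String>,
    trust: Trust,
}

/// Mark the filter trusted only when both roots are present and equal.
/// Returns whether the filter is trusted after the check.
fn verify_filter_sketch(filter: &mut Filter, l3_root: Option<&str>) -> bool {
    let matches = match (filter.snapshot_root.as_deref(), l3_root) {
        (Some(a), Some(b)) => a == b,
        _ => false,
    };
    if matches {
        filter.trust = Trust::Trusted;
    }
    filter.trust == Trust::Trusted
}
```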
pub fn update_gauge_metrics(&self)
Update all gauge metrics with current engine state.
Call this before snapshotting metrics to ensure gauges reflect current state. Useful for OTEL export or monitoring dashboards.
Auto Trait Implementations§
impl !Freeze for SyncEngine
impl !RefUnwindSafe for SyncEngine
impl Send for SyncEngine
impl Sync for SyncEngine
impl Unpin for SyncEngine
impl !UnwindSafe for SyncEngine
Blanket Implementations§
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.