pub struct SyncEngine { /* private fields */ }
Main sync engine coordinator.
Manages the three-tier storage architecture:
- L1: In-memory DashMap with pressure-based eviction
- L2: Redis cache with batch writes
- L3: MySQL/SQLite archive (ground truth)
§Thread Safety
The engine is Send + Sync and designed for concurrent access.
Internal state uses atomic operations and concurrent data structures.
§Implementations
impl SyncEngine

pub async fn contains(&self, id: &str) -> bool
Check if an item exists across all tiers.
Checks in order: L1 cache → Redis EXISTS → L3 Cuckoo filter → SQL query. If found in SQL and Cuckoo filter was untrusted, updates the filter.
§Returns
- true → item definitely exists in at least one tier
- false → item does not exist (authoritative)
§Example
if engine.contains("user.123").await {
    let item = engine.get("user.123").await;
} else {
    println!("Not found");
}

pub fn definitely_missing(&self, id: &str) -> bool
Fast check: is this item definitely NOT in L3?
Uses the Cuckoo filter for a fast authoritative negative.
- Returns true → item is definitely not in L3 (safe to skip)
- Returns false → item might exist (need to check L3)
Only meaningful when the L3 filter is trusted. If untrusted, returns false
(meaning “we don’t know, you should check”).
§Use Case
Fast early-exit in replication: if definitely missing, apply without checking.
if engine.definitely_missing("patient.123") {
    // Fast path: definitely new, just insert
    println!("New item, inserting directly");
} else {
    // Slow path: might exist, check hash
    if !engine.is_current("patient.123", "abc123...").await {
        println!("Outdated, updating");
    }
}

pub fn might_exist(&self, id: &str) -> bool
Fast check: might this item exist somewhere?
Checks L1 cache (partial, evicts) and Cuckoo filter (probabilistic).
- Returns true → item is in L1 OR might be in L3 (worth checking)
- Returns false → item is definitely not in L1 or L3
Note: L1 is partial (items evict), so this can return false even if
the item exists in L2/L3. For authoritative check, use contains().
§Use Case
Quick probabilistic check before expensive async lookup.
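§Example
A minimal sketch of the early-exit this enables (the ID is illustrative):
if engine.might_exist("user.123") {
    // Worth the async lookup - in L1, or possibly in L3
    if let Ok(Some(item)) = engine.get("user.123").await {
        println!("found: {}", item.object_id);
    }
} else {
    // Definitely absent from L1 and L3 - skip the lookup
    println!("skipping expensive lookup");
}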
pub async fn is_current(&self, id: &str, content_hash: &str) -> bool
Check if the item at key has the given content hash.
This is the semantic API for CDC deduplication in replication.
Returns true if the item exists AND its content hash matches.
Returns false if item doesn’t exist OR hash differs.
§Arguments
- id - Object ID
- content_hash - SHA256 hash of content (hex-encoded string)
§Example
// Skip replication if we already have this version
let incoming_hash = "abc123...";
if engine.is_current("patient.123", incoming_hash).await {
    println!("Already up to date, skipping");
    return;
}

pub async fn status(&self, id: &str) -> ItemStatus
Get the sync status of an item.
Returns detailed state information about where an item exists and its sync status across tiers.
§Example
match engine.status("order.456").await {
    ItemStatus::Synced { in_l1, in_l2, in_l3 } => {
        println!("Synced: L1={}, L2={}, L3={}", in_l1, in_l2, in_l3);
    }
    ItemStatus::Pending => println!("Queued for sync"),
    ItemStatus::Missing => println!("Not found"),
}

pub async fn get_many(&self, ids: &[&str]) -> Vec<Option<SyncItem>>
Fetch multiple items in parallel.
Returns a vector of Option<SyncItem> in the same order as input IDs.
Missing items are represented as None.
§Performance
This method fetches from L1 synchronously, then batches L2/L3 lookups
for items not in L1. Much faster than sequential get() calls.
§Example
let ids = vec!["user.1", "user.2", "user.3"];
let items = engine.get_many(&ids).await;
for (id, item) in ids.iter().zip(items.iter()) {
    match item {
        Some(item) => println!("{}: found", id),
        None => println!("{}: missing", id),
    }
}

pub async fn submit_many(&self, items: Vec<SyncItem>) -> Result<BatchResult, StorageError>
Submit multiple items for sync atomically.
All items are added to L1 and queued for batch persistence.
Returns a BatchResult with success/failure counts.
§Example
let items = vec![
    SyncItem::from_json("user.1".into(), json!({"name": "Alice"})),
    SyncItem::from_json("user.2".into(), json!({"name": "Bob"})),
];
let result = engine.submit_many(items).await.unwrap();
println!("Submitted: {}, Failed: {}", result.succeeded, result.failed);

pub async fn delete_many(&self, ids: &[&str]) -> Result<BatchResult, StorageError>
Delete multiple items atomically.
Removes items from all tiers (L1, L2, L3) and updates filters.
Returns a BatchResult with counts.
§Example
let ids = vec!["user.1", "user.2", "user.3"];
let result = engine.delete_many(&ids).await.unwrap();
println!("Deleted: {}", result.succeeded);

pub async fn get_or_insert_with<F, Fut>(&self, id: &str, factory: F) -> Result<SyncItem, StorageError>
Get an item, or compute and insert it if missing.
This is the classic “get or insert” pattern, useful for cache-aside:
- Check cache (L1 → L2 → L3)
- If missing, call the async factory function
- Insert the result and return it
The factory is only called if the item is not found.
§Example
let item = engine.get_or_insert_with("user.123", || async {
    // Expensive operation - only runs if not cached
    SyncItem::from_json("user.123".into(), json!({"name": "Fetched from DB"}))
}).await.unwrap();

pub async fn get_by_state(&self, state: &str, limit: usize) -> Result<Vec<SyncItem>, StorageError>
Get items by state from SQL (L3 ground truth).
Uses indexed query for fast retrieval.
§Example
// Get all delta items for CRDT merging
let deltas = engine.get_by_state("delta", 1000).await?;
for item in deltas {
    println!("Delta: {}", item.object_id);
}

pub async fn count_by_state(&self, state: &str) -> Result<u64, StorageError>
Count items in a given state (SQL ground truth).
§Example
let pending_count = engine.count_by_state("pending").await?;
println!("{} items pending", pending_count);

pub async fn list_state_ids(&self, state: &str, limit: usize) -> Result<Vec<String>, StorageError>
Get just the IDs of items in a given state (lightweight query).
Returns IDs from SQL. For Redis state SET, use list_state_ids_redis().
pub async fn set_state(&self, id: &str, new_state: &str) -> Result<bool, StorageError>
Update the state of an item by ID.
Updates both SQL (ground truth) and Redis state SETs. L1 cache is NOT updated - caller should re-fetch if needed.
Returns true if the item was found and updated.
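§Example
A sketch of a typical transition; the state strings are application-defined (here "synced"):
// Mark an item as synced once replication completes
if engine.set_state("order.456", "synced").await? {
    // SQL and the Redis state SET now agree; L1 still holds the old
    // copy, so re-fetch if you need the updated state locally
    let _item = engine.get("order.456").await?;
} else {
    println!("order.456 not found");
}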
pub async fn delete_by_state(&self, state: &str) -> Result<u64, StorageError>
Delete all items in a given state from SQL.
Also removes from L1 cache and Redis state SET. Returns the number of deleted items.
§Example
// Clean up all processed deltas
let deleted = engine.delete_by_state("delta").await?;
println!("Deleted {} delta items", deleted);

pub async fn scan_prefix(&self, prefix: &str, limit: usize) -> Result<Vec<SyncItem>, StorageError>
Scan items by ID prefix.
Retrieves all items whose ID starts with the given prefix. Queries SQL (ground truth) directly - does NOT check L1 cache.
Useful for CRDT delta-first architecture where deltas are stored as:
delta:{object_id}:{op_id} and you need to fetch all deltas for an object.
§Example
// Get base state
let base = engine.get("base:user.123").await?;
// Get all pending deltas for this object
let deltas = engine.scan_prefix("delta:user.123:", 1000).await?;
// Merge on-the-fly for read-repair
for delta in deltas {
    println!("Delta: {} -> {:?}", delta.object_id, delta.content_as_json());
}

pub async fn count_prefix(&self, prefix: &str) -> Result<u64, StorageError>
Count items matching an ID prefix (SQL ground truth).
pub async fn delete_prefix(&self, prefix: &str) -> Result<u64, StorageError>
Delete all items matching an ID prefix.
Removes from L1 cache, SQL, and Redis. Returns the number of deleted items.
§Example
// After merging deltas into base, clean them up
let deleted = engine.delete_prefix("delta:user.123:").await?;
println!("Cleaned up {} deltas", deleted);

impl SyncEngine

pub async fn start(&mut self) -> Result<(), StorageError>
Start the engine - connect to backends with proper startup sequence.
Startup flow (trust hierarchy):
- Initialize WAL (SQLite) - always first, our durability lifeline
- Connect to SQL (L3) - ground truth, initialize SQL merkle store
- Drain any pending WAL entries to SQL (blocking)
- Get SQL merkle root (this is our trusted root)
- Load CF snapshots from SQLite:
  - If snapshot merkle root matches SQL root → CF is trusted
  - Otherwise → CF must be rebuilt from SQL scan
- Connect to Redis (L2) - cache layer
- Compare Redis merkle root with SQL merkle root:
  - If match → Redis is synced
  - If mismatch → use branch diff to find stale regions and resync
- Ready!
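§Example
A minimal sketch of the intended call order (error handling elided):
engine.start().await?;     // runs the WAL → SQL → filter → Redis sequence above
engine.warm_up().await?;   // optional: pre-populate L1 from L2/L3
// engine.state() should now report EngineState::Ready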
pub async fn warm_up(&mut self) -> Result<(), StorageError>
Warm up the L1 cache from L2/L3 (call after start()).
pub async fn tick(&self)
Perform one tick of maintenance (for manual control instead of run loop).
pub async fn force_flush(&self)
Force flush all pending L2 batches immediately.
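§Example
Together with tick(), this supports driving maintenance manually instead of via a background run loop. A sketch, assuming a tokio runtime and an application-defined shutting_down() flag (both assumptions):
let mut interval = tokio::time::interval(std::time::Duration::from_millis(100));
loop {
    interval.tick().await;
    engine.tick().await;              // one round of engine maintenance
    if shutting_down() {
        engine.force_flush().await;   // drain pending L2 batches before exit
        break;
    }
}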
impl SyncEngine

pub async fn get_merkle_root(&self) -> Result<Option<[u8; 32]>, StorageError>
Get the current merkle root hash.
Returns the root hash from SQL (ground truth), or from Redis if they match. This is the starting point for sync verification.
§Returns
- Some([u8; 32]) - The root hash
- None - No data has been written yet (empty tree)
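§Example
A small sketch; the hex formatting is only for display:
match engine.get_merkle_root().await? {
    Some(root) => {
        let hex: String = root.iter().map(|b| format!("{b:02x}")).collect();
        println!("merkle root: {hex}");
    }
    None => println!("empty tree - nothing written yet"),
}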
pub async fn get_merkle_hash(&self, path: &str) -> Result<Option<[u8; 32]>, StorageError>
Get the merkle hash at a path (use "" for root).
pub async fn get_merkle_node(&self, path: &str) -> Result<Option<MerkleNode>, StorageError>
Get a full merkle node (hash + children).
Use this for tree traversal during sync.
§Arguments
- path - The merkle path (use "" for root)
pub async fn get_merkle_children(&self, path: &str) -> Result<BTreeMap<String, [u8; 32]>, StorageError>
Get a merkle node's children as a map of child path → hash.

pub async fn find_divergent_paths(&self, remote_nodes: &[(String, [u8; 32])]) -> Result<MerkleDiff, StorageError>
Find paths where local and remote merkle trees diverge.
This is the main sync algorithm: given the remote node’s hashes, find the minimal set of paths that need to be synced.
§Arguments
- remote_nodes - Pairs of (path, hash) from the remote node
§Returns
Paths where hashes differ (need to fetch/push data)
§Algorithm
For each remote (path, hash) pair:
- Get local hash for that path
- If hashes match → subtree is synced, skip
- If hashes differ → add to divergent list
- If local missing → add to remote_only
- If remote missing → add to local_only
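§Example
A sketch of one anti-entropy round. fetch_peer_hashes and sync_path stand in for your transport, and the MerkleDiff field name (divergent) is an assumption following the algorithm notes above:
// Hashes received from the peer (transport not part of this API)
let remote_nodes: Vec<(String, [u8; 32])> = fetch_peer_hashes().await;
let diff = engine.find_divergent_paths(&remote_nodes).await?;
// Only touch subtrees whose hashes actually differ
for path in &diff.divergent {
    sync_path(path).await;
}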
pub async fn is_merkle_synced(&self) -> Result<bool, StorageError>
Check if Redis merkle tree is in sync with SQL ground truth.
This is used to determine whether we can serve merkle queries from Redis (fast) or need to fall back to SQL (slow but authoritative).
pub async fn drill_down_divergence<F, Fut>(&self, start_path: &str, remote_children: F) -> Result<Vec<String>, StorageError>
Drill down the merkle tree to find all divergent leaf paths.
Starting from a known-divergent path, recursively descend until we find the actual leaves that need syncing.
§Arguments
- start_path - A path known to have divergent hashes
- remote_children - Function to get children from remote node
§Returns
List of leaf paths that need to be synced
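§Example
A sketch; the exact closure signature isn’t shown above, so this assumes it takes a path and returns the peer’s children (rpc_get_children is a hypothetical remote call):
let leaves = engine.drill_down_divergence("users/4a", |path| async move {
    // Ask the remote node for the children of `path`
    rpc_get_children(&path).await
}).await?;
for leaf in &leaves {
    println!("needs sync: {leaf}");
}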
impl SyncEngine

pub async fn create_search_index(&self, index: SearchIndex) -> Result<(), StorageError>
Register a search index.
Creates the index in RediSearch using FT.CREATE. The index will automatically index all JSON documents with matching key prefix.
§Example
// Define index schema
let index = SearchIndex::new("users", "crdt:users:")
    .text_sortable("name")
    .text("email")
    .numeric_sortable("age")
    .tag("roles");
// Create in RediSearch
engine.create_search_index(index).await?;

pub async fn drop_search_index(&self, name: &str) -> Result<(), StorageError>
Drop a search index.
Removes the index from RediSearch. Does not delete the indexed documents.
pub async fn search(&self, index_name: &str, query: &Query) -> Result<SearchResult, StorageError>
Search for items using RediSearch query syntax.
Searches the specified index using FT.SEARCH. For durable data (crdt: prefix), falls back to SQL if Redis returns no results.
§Arguments
- index_name - Name of the search index (without “idx:” prefix)
- query - Query AST built with Query:: constructors
§Example
// Simple field query
let results = engine.search("users", &Query::field_eq("name", "Alice")).await?;

// Complex query with AND/OR
let query = Query::field_eq("status", "active")
    .and(Query::numeric_range("age", Some(25.0), Some(40.0)));
let results = engine.search("users", &query).await?;
for item in results.items {
    println!("Found: {}", item.object_id);
}

pub async fn search_with_options(&self, index_name: &str, query: &Query, tier: SearchTier, limit: usize) -> Result<SearchResult, StorageError>
Search with explicit tier and limit options.
pub async fn search_raw(&self, index_name: &str, query_str: &str, limit: usize) -> Result<Vec<SyncItem>, StorageError>
Search using raw RediSearch query string (Redis-only, no SQL fallback).
Use this when you need the full power of RediSearch syntax without the Query AST. This is an advanced API with caveats:
- No SQL fallback: If Redis is unavailable, this will fail
- No search cache: Results are not cached via the merkle system
- Manual paths: You must use $.payload.{field} paths in your query
- No translation: The query string is passed directly to FT.SEARCH
Prefer search() or search_with_options() for most use cases.
§Example
// Raw RediSearch query with explicit payload paths
let results = engine.search_raw(
    "users",
    "@name:(Alice Smith) @age:[25 35]",
    100
).await?;

pub async fn search_sql(&self, query: &Query, limit: usize) -> Result<Vec<SyncItem>, StorageError>
Direct SQL search (bypasses Redis, SQL-only).
Queries the SQL archive directly using JSON_EXTRACT. This is an advanced API with specific use cases:
- No Redis: Bypasses L2 cache entirely
- No caching: Results are not cached via the merkle system
- Ground truth: Queries the durable SQL archive
Useful for:
- Analytics queries that need complete data
- When Redis is unavailable or not trusted
- Bulk operations on archived data
Prefer search() or search_with_options() for most use cases.
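§Example
A sketch of an analytics-style query against ground truth (the field and value are illustrative):
// Scan the durable archive directly, bypassing Redis
let active = engine.search_sql(&Query::field_eq("status", "active"), 10_000).await?;
println!("{} active items in L3", active.len());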
pub fn search_cache_stats(&self) -> Option<SearchCacheStats>
Get search cache statistics.
impl SyncEngine

pub fn new(config: SyncEngineConfig, config_rx: Receiver<SyncEngineConfig>) -> Self
Create a new sync engine.
The engine starts in Created state. Call start()
to connect to backends and transition to Ready.
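§Example
A construction sketch, assuming Receiver here is tokio::sync::watch::Receiver and that SyncEngineConfig implements Default and Clone (both assumptions):
let config = SyncEngineConfig::default();
let (_config_tx, config_rx) = tokio::sync::watch::channel(config.clone());
let mut engine = SyncEngine::new(config, config_rx);
// Engine is in EngineState::Created until start() succeeds
engine.start().await?;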
pub fn state(&self) -> EngineState
Get current engine state.
pub fn state_receiver(&self) -> Receiver<EngineState>
Get a receiver to watch state changes.
pub fn memory_pressure(&self) -> f64
Get current memory pressure (0.0 - 1.0+).
pub fn pressure(&self) -> BackpressureLevel
Get current backpressure level.
pub fn should_accept_writes(&self) -> bool
Check if the engine should accept writes (based on pressure).
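§Example
A sketch of an admission-control gate at a write entry point, assuming BackpressureLevel implements Debug (an assumption):
if engine.should_accept_writes() {
    engine.submit(item).await?;
} else {
    // Shed load; memory_pressure() and pressure() say why
    eprintln!(
        "rejecting write: pressure={:.2} level={:?}",
        engine.memory_pressure(),
        engine.pressure()
    );
}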
pub async fn health_check(&self) -> HealthCheck
Perform a comprehensive health check.
This probes backend connectivity (Redis PING, SQL SELECT 1) and
collects internal state into a HealthCheck struct suitable for
/ready and /health endpoints.
§Performance
- Cached fields: Instant (no I/O)
- Live probes: Redis PING + SQL SELECT 1 (parallel, ~1-10ms each)
§Example
let health = engine.health_check().await;

// For /ready endpoint (load balancer)
if health.healthy {
    HttpResponse::Ok().body("ready")
} else {
    HttpResponse::ServiceUnavailable().body("not ready")
}

// For /health endpoint (diagnostics)
HttpResponse::Ok().json(health)

pub async fn get(&self, id: &str) -> Result<Option<SyncItem>, StorageError>
Get an item by ID.
Checks storage tiers in order: L1 → L2 → L3. Updates access count and promotes to L1 on hit.
pub async fn get_verified(&self, id: &str) -> Result<Option<SyncItem>, StorageError>
Get an item with hash verification.
If the item has a non-empty content_hash, the content hash is verified.
Returns StorageError::Corruption if the hash doesn’t match.
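§Example
A sketch of corruption handling on the read path, assuming StorageError implements Display (an assumption):
match engine.get_verified("user.123").await {
    Ok(Some(item)) => println!("verified: {}", item.object_id),
    Ok(None) => println!("not found"),
    // A Corruption error here means the stored hash didn't match the content
    Err(err) => eprintln!("read failed: {err}"),
}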
pub async fn submit(&self, item: SyncItem) -> Result<(), StorageError>
Submit an item for sync.
The item is immediately stored in L1 and queued for batch write to L2/L3. Uses default options: Redis + SQL (both enabled). Filters are updated only on successful writes in flush_batch_internal().
For custom routing, use submit_with.
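§Example
A minimal sketch of the default write path (the payload is illustrative):
let item = SyncItem::from_json("user.123".into(), json!({"name": "Alice"}));
engine.submit(item).await?;   // in L1 immediately, batched to L2/L3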
pub async fn submit_with(&self, item: SyncItem, options: SubmitOptions) -> Result<(), StorageError>
Submit an item with custom routing options.
The item is immediately stored in L1 and queued for batch write. Items are batched by compatible options for efficient pipelined writes.
§Example
// Cache-only with 1 minute TTL (no SQL write)
let item = SyncItem::new("cache.key".into(), b"data".to_vec());
engine.submit_with(item, SubmitOptions::cache(CacheTtl::Minute)).await?;

// SQL-only durable storage (no Redis)
let item = SyncItem::new("archive.key".into(), b"data".to_vec());
engine.submit_with(item, SubmitOptions::durable()).await?;

pub async fn delete(&self, id: &str) -> Result<bool, StorageError>
Delete an item from all storage tiers.
Deletes are more complex than writes because the item may exist in:
- L1 (DashMap) - immediate removal
- L2 (Redis) - async removal
- L3 (MySQL) - async removal
- Cuckoo filters (L2/L3) - remove from both
- Merkle trees - update with deletion marker
pub fn l3_filter_stats(&self) -> (usize, usize, FilterTrust)
Get L3 filter stats as (entries, capacity, trust_state).
pub fn l3_filter(&self) -> &Arc<FilterManager>
Get access to the L3 filter (for warmup/verification).
pub async fn merkle_roots(&self) -> (Option<String>, Option<String>)
Get merkle root hashes from Redis (L2) and SQL (L3).
Returns (redis_root, sql_root) as hex strings.
Returns None for backends that aren’t connected or have empty trees.
pub async fn verify_filter(&self) -> bool
Verify and trust the L3 cuckoo filter.
Compares the filter’s merkle root against L3’s merkle root. If they match, marks the filter as trusted.
Returns true if the filter is now trusted, false otherwise.
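§Example
A sketch of a post-startup trust check, assuming FilterTrust implements Debug (an assumption):
if engine.verify_filter().await {
    let (entries, capacity, trust) = engine.l3_filter_stats();
    println!("filter trusted: {entries}/{capacity} entries ({trust:?})");
} else {
    // Roots didn't match - negatives can't be trusted yet
    println!("filter untrusted; definitely_missing() will return false");
}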
pub fn update_gauge_metrics(&self)
Update all gauge metrics with current engine state.
Call this before snapshotting metrics to ensure gauges reflect current state. Useful for OTEL export or monitoring dashboards.
§Auto Trait Implementations
impl !Freeze for SyncEngine
impl !RefUnwindSafe for SyncEngine
impl Send for SyncEngine
impl Sync for SyncEngine
impl Unpin for SyncEngine
impl !UnwindSafe for SyncEngine
§Blanket Implementations

impl<T> BorrowMut<T> for T where T: ?Sized
fn borrow_mut(&mut self) -> &mut T

impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>

impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left is true, otherwise into a Right variant.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true, otherwise into a Right variant.