Struct SyncEngine

pub struct SyncEngine { /* private fields */ }

Main sync engine coordinator.

Manages the three-tier storage architecture:

  • L1: In-memory DashMap with pressure-based eviction
  • L2: Redis cache with batch writes
  • L3: MySQL/SQLite archive (ground truth)

§Thread Safety

The engine is Send + Sync and designed for concurrent access. Internal state uses atomic operations and concurrent data structures.

Implementations§


impl SyncEngine


pub async fn contains(&self, id: &str) -> bool

Check if an item exists across all tiers.

Checks in order: L1 cache → Redis EXISTS → L3 Cuckoo filter → SQL query. If the item is found in SQL and the Cuckoo filter was untrusted, the filter is updated.

§Returns
  • true → item definitely exists in at least one tier
  • false → item does not exist (authoritative)
§Example
if engine.contains("user.123").await {
    let item = engine.get("user.123").await?;
} else {
    println!("Not found");
}

pub fn definitely_missing(&self, id: &str) -> bool

Fast check: is this item definitely NOT in L3?

Uses the Cuckoo filter for a fast authoritative negative.

  • Returns true → item is definitely not in L3 (safe to skip)
  • Returns false → item might exist (need to check L3)

Only meaningful when the L3 filter is trusted. If untrusted, returns false (meaning “we don’t know, you should check”).

§Use Case

Fast early-exit in replication: if the item is definitely missing, apply the incoming write without further checks.

if engine.definitely_missing("patient.123") {
    // Fast path: definitely new, just insert
    println!("New item, inserting directly");
} else {
    // Slow path: might exist, check hash
    if !engine.is_current("patient.123", "abc123...").await {
        println!("Outdated, updating");
    }
}

pub fn might_exist(&self, id: &str) -> bool

Fast check: might this item exist somewhere?

Checks L1 cache (partial, evicts) and Cuckoo filter (probabilistic).

  • Returns true → item is in L1 OR might be in L3 (worth checking)
  • Returns false → item is definitely not in L1 or L3

Note: L1 is partial (items evict), so this can return false even if the item still exists in L2/L3. For an authoritative check, use contains().

§Use Case

Quick probabilistic check before expensive async lookup.
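
A minimal sketch (the ID is illustrative; get() is documented below and returns Result<Option<SyncItem>, StorageError>):

if engine.might_exist("user.123") {
    // Worth paying for the async lookup.
    if let Some(item) = engine.get("user.123").await? {
        println!("Found: {}", item.object_id);
    }
} else {
    // Definitely not in L1 or L3; skip the lookup entirely.
    println!("Skipping lookup");
}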


pub async fn is_current(&self, id: &str, content_hash: &str) -> bool

Check if the item with the given ID has the given content hash.

This is the semantic API for CDC deduplication in replication. Returns true if the item exists AND its content hash matches; returns false if the item doesn’t exist or the hash differs.

§Arguments
  • id - Object ID
  • content_hash - SHA256 hash of content (hex-encoded string)
§Example
// Skip replication if we already have this version
let incoming_hash = "abc123...";
if engine.is_current("patient.123", incoming_hash).await {
    println!("Already up to date, skipping");
    return;
}

pub fn len(&self) -> usize

Get the current count of items in L1 cache.


pub fn is_empty(&self) -> bool

Check if L1 cache is empty.


pub async fn status(&self, id: &str) -> ItemStatus

Get the sync status of an item.

Returns detailed state information about where an item exists and its sync status across tiers.

§Example
match engine.status("order.456").await {
    ItemStatus::Synced { in_l1, in_l2, in_l3 } => {
        println!("Synced: L1={}, L2={}, L3={}", in_l1, in_l2, in_l3);
    }
    ItemStatus::Pending => println!("Queued for sync"),
    ItemStatus::Missing => println!("Not found"),
}

pub async fn get_many(&self, ids: &[&str]) -> Vec<Option<SyncItem>>

Fetch multiple items in parallel.

Returns a vector of Option<SyncItem> in the same order as input IDs. Missing items are represented as None.

§Performance

This method fetches from L1 synchronously, then batches L2/L3 lookups for items not in L1. Much faster than sequential get() calls.

§Example
let ids = vec!["user.1", "user.2", "user.3"];
let items = engine.get_many(&ids).await;
for (id, item) in ids.iter().zip(items.iter()) {
    match item {
        Some(item) => println!("{}: found", id),
        None => println!("{}: missing", id),
    }
}

pub async fn submit_many(&self, items: Vec<SyncItem>) -> Result<BatchResult, StorageError>

Submit multiple items for sync atomically.

All items are added to L1 and queued for batch persistence. Returns a BatchResult with success/failure counts.

§Example
let items = vec![
    SyncItem::from_json("user.1".into(), json!({"name": "Alice"})),
    SyncItem::from_json("user.2".into(), json!({"name": "Bob"})),
];
let result = engine.submit_many(items).await.unwrap();
println!("Submitted: {}, Failed: {}", result.succeeded, result.failed);

pub async fn delete_many(&self, ids: &[&str]) -> Result<BatchResult, StorageError>

Delete multiple items atomically.

Removes items from all tiers (L1, L2, L3) and updates filters. Returns a BatchResult with counts.

§Example
let ids = vec!["user.1", "user.2", "user.3"];
let result = engine.delete_many(&ids).await.unwrap();
println!("Deleted: {}", result.succeeded);

pub async fn get_or_insert_with<F, Fut>(&self, id: &str, factory: F) -> Result<SyncItem, StorageError>
where F: FnOnce() -> Fut, Fut: Future<Output = SyncItem>,

Get an item, or compute and insert it if missing.

This is the classic “get or insert” pattern, useful for cache-aside:

  1. Check cache (L1 → L2 → L3)
  2. If missing, call the async factory function
  3. Insert the result and return it

The factory is only called if the item is not found.

§Example
let item = engine.get_or_insert_with("user.123", || async {
    // Expensive operation - only runs if not cached
    SyncItem::from_json("user.123".into(), json!({"name": "Fetched from DB"}))
}).await.unwrap();

pub async fn get_by_state(&self, state: &str, limit: usize) -> Result<Vec<SyncItem>, StorageError>

Get items by state from SQL (L3 ground truth).

Uses indexed query for fast retrieval.

§Example
// Get all delta items for CRDT merging
let deltas = engine.get_by_state("delta", 1000).await?;
for item in deltas {
    println!("Delta: {}", item.object_id);
}

pub async fn count_by_state(&self, state: &str) -> Result<u64, StorageError>

Count items in a given state (SQL ground truth).

§Example
let pending_count = engine.count_by_state("pending").await?;
println!("{} items pending", pending_count);

pub async fn list_state_ids(&self, state: &str, limit: usize) -> Result<Vec<String>, StorageError>

Get just the IDs of items in a given state (lightweight query).

Returns IDs from SQL. For Redis state SET, use list_state_ids_redis().
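
A sketch of a lightweight queue scan (the state name is illustrative):

// Fetch up to 500 IDs currently in the "pending" state.
let ids = engine.list_state_ids("pending", 500).await?;
println!("{} pending items", ids.len());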


pub async fn set_state(&self, id: &str, new_state: &str) -> Result<bool, StorageError>

Update the state of an item by ID.

Updates both SQL (ground truth) and Redis state SETs. The L1 cache is NOT updated; the caller should re-fetch if needed.

Returns true if the item was found and updated.
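
A sketch of a state transition (the ID and state names are illustrative):

// Promote an item once it has been processed.
let updated = engine.set_state("order.456", "processed").await?;
if updated {
    // L1 is not touched, so re-fetch if the cached copy matters.
    println!("order.456 marked as processed");
}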


pub async fn delete_by_state(&self, state: &str) -> Result<u64, StorageError>

Delete all items in a given state from SQL.

Also removes from L1 cache and Redis state SET. Returns the number of deleted items.

§Example
// Clean up all processed deltas
let deleted = engine.delete_by_state("delta").await?;
println!("Deleted {} delta items", deleted);

pub async fn scan_prefix(&self, prefix: &str, limit: usize) -> Result<Vec<SyncItem>, StorageError>

Scan items by ID prefix.

Retrieves all items whose ID starts with the given prefix. Queries SQL (ground truth) directly - does NOT check L1 cache.

Useful for a CRDT delta-first architecture where deltas are stored as delta:{object_id}:{op_id} and you need to fetch all deltas for an object.

§Example
// Get base state
let base = engine.get("base:user.123").await?;

// Get all pending deltas for this object
let deltas = engine.scan_prefix("delta:user.123:", 1000).await?;

// Merge on-the-fly for read-repair
for delta in deltas {
    println!("Delta: {} -> {:?}", delta.object_id, delta.content_as_json());
}

pub async fn count_prefix(&self, prefix: &str) -> Result<u64, StorageError>

Count items matching an ID prefix (SQL ground truth).


pub async fn delete_prefix(&self, prefix: &str) -> Result<u64, StorageError>

Delete all items matching an ID prefix.

Removes from L1 cache, SQL, and Redis. Returns the number of deleted items.

§Example
// After merging deltas into base, clean them up
let deleted = engine.delete_prefix("delta:user.123:").await?;
println!("Cleaned up {} deltas", deleted);

impl SyncEngine


pub async fn start(&mut self) -> Result<(), StorageError>

Start the engine - connect to backends with proper startup sequence.

Startup flow (trust hierarchy):

  1. Initialize WAL (SQLite) - always first, our durability lifeline
  2. Connect to SQL (L3) - ground truth, initialize SQL merkle store
  3. Drain any pending WAL entries to SQL (blocking)
  4. Get SQL merkle root (this is our trusted root)
  5. Load Cuckoo filter (CF) snapshots from SQLite:
    • If snapshot merkle root matches SQL root → CF is trusted
    • Otherwise → CF must be rebuilt from SQL scan
  6. Connect to Redis (L2) - cache layer
  7. Compare Redis merkle root with SQL merkle root
    • If match → Redis is synced
    • If mismatch → use branch diff to find stale regions and resync
  8. Ready!

pub async fn warm_up(&mut self) -> Result<(), StorageError>

Warm up the L1 cache from L2/L3 (call after start()).
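
A minimal startup sketch, assuming config and config_rx were built elsewhere in your application:

let mut engine = SyncEngine::new(config, config_rx);
engine.start().await?;    // runs the startup flow described above
engine.warm_up().await?;  // optionally pre-populate L1 from L2/L3
assert!(engine.is_ready());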


pub async fn tick(&self)

Perform one tick of maintenance (for manual control instead of run loop).


pub async fn force_flush(&self)

Force flush all pending L2 batches immediately.


pub async fn run(&self)

Run the main event loop.

This method takes &self (not &mut self) so the engine can be shared via Arc while the run loop executes in a background task.

§Example
let engine = Arc::new(engine);
let engine_clone = engine.clone();
tokio::spawn(async move {
    engine_clone.run().await;
});

pub async fn shutdown(&self)

Initiate a graceful shutdown.


impl SyncEngine


pub async fn get_merkle_root(&self) -> Result<Option<[u8; 32]>, StorageError>

Get the current merkle root hash.

Returns the root hash from SQL (ground truth), or from Redis if they match. This is the starting point for sync verification.

§Returns
  • Some([u8; 32]) - The root hash
  • None - No data has been written yet (empty tree)
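
A sketch that prints the root; hex encoding via the hex crate is an assumption, not part of this API:

if let Some(root) = engine.get_merkle_root().await? {
    println!("merkle root: {}", hex::encode(root));
} else {
    println!("empty tree, nothing written yet");
}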

pub async fn get_merkle_hash(&self, path: &str) -> Result<Option<[u8; 32]>, StorageError>

Get the hash for a specific merkle path.

Serves from Redis if redis_root == sql_root (fast path), otherwise falls back to SQL (slow path).

§Arguments
  • path - The merkle path (e.g., “uk.nhs.patient”)
§Returns
  • Some([u8; 32]) - The hash at this path
  • None - Path doesn’t exist in the tree

pub async fn get_merkle_node(&self, path: &str) -> Result<Option<MerkleNode>, StorageError>

Get a full merkle node (hash + children).

Use this for tree traversal during sync.

§Arguments
  • path - The merkle path (use “” for root)

pub async fn get_merkle_children(&self, path: &str) -> Result<BTreeMap<String, [u8; 32]>, StorageError>

Get children of a merkle path.

Returns a map of segment -> hash for all direct children. Use this to traverse the tree level by level.

§Arguments
  • path - Parent path (use “” for top-level children)
§Example
path="" → {"uk": 0xABC, "us": 0xDEF}
path="uk" → {"nhs": 0x123, "private": 0x456}

pub async fn find_divergent_paths(&self, remote_nodes: &[(String, [u8; 32])]) -> Result<MerkleDiff, StorageError>

Find paths where local and remote merkle trees diverge.

This is the main sync algorithm: given the remote node’s hashes, find the minimal set of paths that need to be synced.

§Arguments
  • remote_nodes - Pairs of (path, hash) from the remote node
§Returns

Paths where hashes differ (need to fetch/push data)

§Algorithm

For each remote (path, hash) pair:

  1. Get local hash for that path
  2. If hashes match → subtree is synced, skip
  3. If hashes differ → add to divergent list
  4. If local missing → add to remote_only
  5. If remote missing → add to local_only
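
A sketch of one comparison round; the remote pairs would normally arrive over the network, and the MerkleDiff field name divergent is assumed from the algorithm notes above:

// A zeroed hash stands in for a peer-provided value.
let remote_nodes = vec![("uk".to_string(), [0u8; 32])];
let diff = engine.find_divergent_paths(&remote_nodes).await?;
for path in &diff.divergent {
    println!("needs sync: {}", path);
}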

pub async fn is_merkle_synced(&self) -> Result<bool, StorageError>

Check if Redis merkle tree is in sync with SQL ground truth.

This is used to determine whether we can serve merkle queries from Redis (fast) or need to fall back to SQL (slow but authoritative).
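
A small sketch of the fast-path decision:

if engine.is_merkle_synced().await? {
    // Redis matches SQL, so merkle queries can be served from L2.
    println!("merkle fast path available");
} else {
    println!("falling back to SQL for merkle queries");
}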


pub async fn drill_down_divergence<F, Fut>(&self, start_path: &str, remote_children: F) -> Result<Vec<String>, StorageError>
where F: Fn(String) -> Fut, Fut: Future<Output = Result<BTreeMap<String, [u8; 32]>, StorageError>>,

Drill down the merkle tree to find all divergent leaf paths.

Starting from a known-divergent path, recursively descend until we find the actual leaves that need syncing.

§Arguments
  • start_path - A path known to have divergent hashes
  • remote_children - Function to get children from remote node
§Returns

List of leaf paths that need to be synced
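
A sketch with a stub in place of the network call (a real remote_children closure would query the peer; the starting path is illustrative):

use std::collections::BTreeMap;

let leaves = engine
    .drill_down_divergence("uk.nhs", |_path| async {
        // Stub: pretend the remote subtree is empty.
        Ok::<BTreeMap<String, [u8; 32]>, StorageError>(BTreeMap::new())
    })
    .await?;
println!("{} leaf paths need syncing", leaves.len());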


impl SyncEngine


pub async fn create_search_index(&self, index: SearchIndex) -> Result<(), StorageError>

Register a search index.

Creates the index in RediSearch using FT.CREATE. The index will automatically index all JSON documents with a matching key prefix.

§Example
// Define index schema
let index = SearchIndex::new("users", "crdt:users:")
    .text_sortable("name")
    .text("email")
    .numeric_sortable("age")
    .tag("roles");

// Create in RediSearch
engine.create_search_index(index).await?;

pub async fn drop_search_index(&self, name: &str) -> Result<(), StorageError>

Drop a search index.

Removes the index from RediSearch. Does not delete the indexed documents.


pub async fn search(&self, index_name: &str, query: &Query) -> Result<SearchResult, StorageError>

Search for items using RediSearch query syntax.

Searches the specified index using FT.SEARCH. For durable data (crdt: prefix), falls back to SQL if Redis returns no results.

§Arguments
  • index_name - Name of the search index (without “idx:” prefix)
  • query - Query AST built with Query:: constructors
§Example
// Simple field query
let results = engine.search("users", &Query::field_eq("name", "Alice")).await?;

// Complex query with AND/OR
let query = Query::field_eq("status", "active")
    .and(Query::numeric_range("age", Some(25.0), Some(40.0)));
let results = engine.search("users", &query).await?;

for item in results.items {
    println!("Found: {}", item.object_id);
}

pub async fn search_with_options(&self, index_name: &str, query: &Query, tier: SearchTier, limit: usize) -> Result<SearchResult, StorageError>

Search with explicit tier and limit options.


pub async fn search_raw(&self, index_name: &str, query_str: &str, limit: usize) -> Result<Vec<SyncItem>, StorageError>

Search using raw RediSearch query string (Redis-only, no SQL fallback).

Use this when you need the full power of RediSearch syntax without the Query AST. This is an advanced API with caveats:

  • No SQL fallback: If Redis is unavailable, this will fail
  • No search cache: Results are not cached via the merkle system
  • Manual paths: You must use $.payload.{field} paths in your query
  • No translation: The query string is passed directly to FT.SEARCH

Prefer search() or search_with_options() for most use cases.

§Example
// Raw RediSearch query with explicit payload paths
let results = engine.search_raw(
    "users",
    "@name:(Alice Smith) @age:[25 35]",
    100
).await?;

pub async fn search_sql(&self, query: &Query, limit: usize) -> Result<Vec<SyncItem>, StorageError>

Direct SQL search (bypasses Redis, SQL-only).

Queries the SQL archive directly using JSON_EXTRACT. This is an advanced API with specific use cases:

  • No Redis: Bypasses L2 cache entirely
  • No caching: Results are not cached via the merkle system
  • Ground truth: Queries the durable SQL archive

Useful for:

  • Analytics queries that need complete data
  • When Redis is unavailable or not trusted
  • Bulk operations on archived data

Prefer search() or search_with_options() for most use cases.
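
A sketch of an analytics-style query against the archive (field and value are illustrative):

let active = engine
    .search_sql(&Query::field_eq("status", "active"), 10_000)
    .await?;
println!("{} matching items in SQL", active.len());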


pub fn search_cache_stats(&self) -> Option<SearchCacheStats>

Get search cache statistics.


impl SyncEngine


pub fn new(config: SyncEngineConfig, config_rx: Receiver<SyncEngineConfig>) -> Self

Create a new sync engine.

The engine starts in the Created state. Call start() to connect to backends and transition to Ready.


pub fn state(&self) -> EngineState

Get current engine state.


pub fn state_receiver(&self) -> Receiver<EngineState>

Get a receiver to watch state changes.


pub fn is_ready(&self) -> bool

Check if engine is ready to accept requests.


pub fn memory_pressure(&self) -> f64

Get current memory pressure (0.0 - 1.0+).


pub fn pressure(&self) -> BackpressureLevel

Get current backpressure level.


pub fn should_accept_writes(&self) -> bool

Check if the engine should accept writes (based on pressure).
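
A sketch of a write-path guard; item is assumed to be a SyncItem built by the caller:

if engine.should_accept_writes() {
    engine.submit(item).await?;
} else {
    // Shed load instead of making memory pressure worse.
    println!("rejecting write, pressure = {:.2}", engine.memory_pressure());
}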


pub async fn health_check(&self) -> HealthCheck

Perform a comprehensive health check.

This probes backend connectivity (Redis PING, SQL SELECT 1) and collects internal state into a HealthCheck struct suitable for /ready and /health endpoints.

§Performance
  • Cached fields: Instant (no I/O)
  • Live probes: Redis PING + SQL SELECT 1 (parallel, ~1-10ms each)
§Example
let health = engine.health_check().await;

// For /ready endpoint (load balancer)
if health.healthy {
    HttpResponse::Ok().body("ready")
} else {
    HttpResponse::ServiceUnavailable().body("not ready")
}

// For /health endpoint (diagnostics)
HttpResponse::Ok().json(health)

pub async fn get(&self, id: &str) -> Result<Option<SyncItem>, StorageError>

Get an item by ID.

Checks storage tiers in order: L1 → L2 → L3. Updates access count and promotes to L1 on hit.


pub async fn get_verified(&self, id: &str) -> Result<Option<SyncItem>, StorageError>

Get an item with hash verification.

If the item has a non-empty content_hash, the content is verified against it. Returns StorageError::Corruption if the hash doesn’t match.
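
A sketch of a verified read; a hash mismatch surfaces as an Err rather than Some(..):

if let Some(item) = engine.get_verified("user.123").await? {
    println!("verified: {}", item.object_id);
}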


pub async fn submit(&self, item: SyncItem) -> Result<(), StorageError>

Submit an item for sync.

The item is immediately stored in L1 and queued for batch write to L2/L3. Uses default options: Redis + SQL (both enabled). Filters are updated only on successful writes in flush_batch_internal().

For custom routing, use submit_with.
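
A minimal sketch using the default Redis + SQL routing:

let item = SyncItem::from_json("user.123".into(), json!({"name": "Alice"}));
engine.submit(item).await?;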


pub async fn submit_with(&self, item: SyncItem, options: SubmitOptions) -> Result<(), StorageError>

Submit an item with custom routing options.

The item is immediately stored in L1 and queued for batch write. Items are batched by compatible options for efficient pipelined writes.

§Example
// Cache-only with 1 minute TTL (no SQL write)
let item = SyncItem::new("cache.key".into(), b"data".to_vec());
engine.submit_with(item, SubmitOptions::cache(CacheTtl::Minute)).await?;

// SQL-only durable storage (no Redis)
let item = SyncItem::new("archive.key".into(), b"data".to_vec());
engine.submit_with(item, SubmitOptions::durable()).await?;

pub async fn delete(&self, id: &str) -> Result<bool, StorageError>

Delete an item from all storage tiers.

Deletes are more complex than writes because the item may exist in:

  • L1 (DashMap) - immediate removal
  • L2 (Redis) - async removal
  • L3 (MySQL) - async removal
  • Cuckoo filters (L2/L3) - remove from both
  • Merkle trees - update with deletion marker
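
A small sketch of a full-tier delete (the meaning of the returned bool, "item was found", is assumed from the Result<bool, _> signature):

let removed = engine.delete("user.123").await?;
println!("removed: {}", removed);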

pub fn l1_stats(&self) -> (usize, usize)

Get L1 cache stats.


pub fn l3_filter_stats(&self) -> (usize, usize, FilterTrust)

Get L3 filter stats (entries, capacity, trust_state).


pub fn l3_filter(&self) -> &Arc<FilterManager>

Get access to the L3 filter (for warmup/verification).


pub async fn merkle_roots(&self) -> (Option<String>, Option<String>)

Get merkle root hashes from Redis (L2) and SQL (L3).

Returns (redis_root, sql_root) as hex strings. Returns None for backends that aren’t connected or have empty trees.


pub async fn verify_filter(&self) -> bool

Verify and trust the L3 cuckoo filter.

Compares the filter’s merkle root against L3’s merkle root. If they match, marks the filter as trusted.

Returns true if the filter is now trusted, false otherwise.
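
A sketch that re-checks trust and inspects both roots on a mismatch:

if engine.verify_filter().await {
    println!("L3 filter trusted");
} else {
    let (redis_root, sql_root) = engine.merkle_roots().await;
    println!("filter untrusted; redis={:?} sql={:?}", redis_root, sql_root);
}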


pub fn update_gauge_metrics(&self)

Update all gauge metrics with current engine state.

Call this before snapshotting metrics to ensure gauges reflect current state. Useful for OTEL export or monitoring dashboards.
