§Sync Engine
A high-performance, tiered storage engine for distributed data synchronization.
§Philosophy: Dumb Byte Router
sync-engine stores opaque Vec<u8> payloads and routes them to L1, L2, and L3 according to caller-supplied options.
Compression, serialization, and data interpretation are the caller’s responsibility.
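Because the engine only sees bytes, the caller chooses the wire format. The following is a minimal sketch of caller-side encoding using only the standard library. The `Reading` type, its fields, and the length-prefixed layout are hypothetical and are not part of sync-engine; any format that produces a Vec<u8> would work equally well.

```rust
/// Hypothetical caller-side type; not part of sync-engine.
#[derive(Debug, PartialEq)]
struct Reading {
    sensor_id: u32,
    label: String,
}

/// Read a little-endian u32 starting at `at`, or None if out of bounds.
fn read_u32(bytes: &[u8], at: usize) -> Option<u32> {
    let b = bytes.get(at..at + 4)?;
    Some(u32::from_le_bytes([b[0], b[1], b[2], b[3]]))
}

impl Reading {
    /// Layout: 4-byte LE id, 4-byte LE label length, then the UTF-8 label.
    fn to_bytes(&self) -> Vec<u8> {
        let label = self.label.as_bytes();
        let mut out = Vec::with_capacity(8 + label.len());
        out.extend_from_slice(&self.sensor_id.to_le_bytes());
        out.extend_from_slice(&(label.len() as u32).to_le_bytes());
        out.extend_from_slice(label);
        out
    }

    /// Decode the layout written by `to_bytes`; None on malformed input.
    fn from_bytes(bytes: &[u8]) -> Option<Reading> {
        let sensor_id = read_u32(bytes, 0)?;
        let len = read_u32(bytes, 4)? as usize;
        let raw = bytes.get(8..8 + len)?;
        let label = std::str::from_utf8(raw).ok()?.to_owned();
        Some(Reading { sensor_id, label })
    }
}
```

The bytes returned by `to_bytes` are what the caller would hand to the engine, and `from_bytes` is what the caller would run on whatever the engine returns.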
§Architecture
┌─────────────────────────────────────────────────────────────┐
│ Ingest Layer │
│ • Accepts SyncItems via submit() / submit_with() │
│ • Backpressure control based on memory usage │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ L1: In-Memory Cache │
│ • Moka cache for concurrent access │
│ • Tan-curve eviction under memory pressure │
└─────────────────────────────────────────────────────────────┘
│
(Batch flush via HybridBatcher)
▼
┌─────────────────────────────────────────────────────────────┐
│ L2: Redis Cache │
│ • Pipelined batch writes for throughput │
│ • Optional TTL per-item (via SubmitOptions) │
│ • EXISTS command for fast existence checks │
└─────────────────────────────────────────────────────────────┘
│
(Batch persist to ground truth)
▼
┌─────────────────────────────────────────────────────────────┐
│ L3: MySQL/SQLite Archive │
│ • Ground truth storage (BLOB column) │
│ • Cuckoo filter for fast existence checks │
│ • WAL fallback during outages │
└─────────────────────────────────────────────────────────────┘
§Quick Start
use sync_engine::{SyncEngine, SyncEngineConfig, SyncItem};
use serde_json::json;
use tokio::sync::watch;

#[tokio::main]
async fn main() {
    let config = SyncEngineConfig {
        redis_url: Some("redis://localhost:6379".into()),
        sql_url: Some("mysql://user:pass@localhost/db".into()),
        ..Default::default()
    };
    let (_tx, rx) = watch::channel(config.clone());
    let mut engine = SyncEngine::new(config, rx);

    // Start the engine (connects to backends)
    engine.start().await.expect("Failed to start");

    // Submit items for sync
    let item = SyncItem::from_json(
        "uk.nhs.patient.record.12345".into(),
        json!({"name": "John Doe", "nhs_number": "1234567890"})
    );
    engine.submit(item).await.expect("Failed to submit");

    // Retrieve items (L1 → L2 → L3 fallback)
    if let Some(item) = engine.get("uk.nhs.patient.record.12345").await.unwrap() {
        println!("Found: {:?}", item.content_as_json());
    }

    engine.shutdown().await;
}
§Features
- Tiered Caching: L1 (memory) → L2 (Redis) → L3 (SQL) with automatic fallback
- Binary Storage: Store raw Vec<u8> - caller handles serialization/compression
- Flexible Routing: SubmitOptions controls which tiers receive data
- TTL Support: Per-item TTL for Redis cache entries
- Batch Writes: Configurable flush by count, size, or time
- Cuckoo Filters: Skip SQL queries when data definitely doesn’t exist
- WAL Durability: Local SQLite WAL during MySQL outages
- Backpressure: Graceful degradation under memory pressure
- Circuit Breakers: Prevent cascade failures to backends
- Retry Logic: Configurable retry policies for transient failures
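To make the tiered read path and the role of the existence filter concrete, here is a toy model. It is a sketch only: plain HashMaps stand in for the memory cache, Redis, and SQL, and an exact HashSet stands in for the probabilistic cuckoo filter. The `Tiers` and `Tier` names are hypothetical and do not reflect the engine's internals.

```rust
use std::collections::{HashMap, HashSet};

/// Which stand-in tier answered a read.
#[derive(Debug, PartialEq, Clone, Copy)]
enum Tier {
    L1,
    L2,
    L3,
}

/// Toy model: plain maps stand in for the real backends.
struct Tiers {
    l1: HashMap<String, Vec<u8>>,
    l2: HashMap<String, Vec<u8>>,
    l3: HashMap<String, Vec<u8>>,
    /// Exact set standing in for the probabilistic cuckoo filter.
    l3_filter: HashSet<String>,
}

impl Tiers {
    fn new() -> Self {
        Tiers {
            l1: HashMap::new(),
            l2: HashMap::new(),
            l3: HashMap::new(),
            l3_filter: HashSet::new(),
        }
    }

    /// Write to the L3 stand-in and record the key in the filter.
    fn archive(&mut self, key: &str, bytes: Vec<u8>) {
        self.l3_filter.insert(key.to_string());
        self.l3.insert(key.to_string(), bytes);
    }

    /// Try L1, then L2, then L3; consult the filter before touching L3.
    fn get(&self, key: &str) -> Option<(Tier, &[u8])> {
        if let Some(v) = self.l1.get(key) {
            return Some((Tier::L1, v.as_slice()));
        }
        if let Some(v) = self.l2.get(key) {
            return Some((Tier::L2, v.as_slice()));
        }
        // Filter says "definitely absent": skip the SQL stand-in entirely.
        if !self.l3_filter.contains(key) {
            return None;
        }
        self.l3.get(key).map(|v| (Tier::L3, v.as_slice()))
    }
}
```

A real cuckoo filter can report false positives, so a positive answer would still need the L3 query; only a negative answer lets the lookup stop early.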
§Configuration
See SyncEngineConfig for all configuration options.
§Modules
- config: Engine configuration (SyncEngineConfig)
- coordinator: The main SyncEngine orchestrating all components
- sync_item: Core data structure (SyncItem, ContentType)
- submit_options: Per-item routing options (SubmitOptions, CacheTtl)
- storage: Storage backends (Redis, SQL, Memory)
- batching: Hybrid batcher for efficient writes
- cuckoo: Probabilistic existence filters
- merkle: Merkle tree for sync verification
- resilience: Circuit breakers, retry logic, WAL
- eviction: Tan-curve eviction policy
- backpressure: Memory pressure handling
- search: RediSearch and SQL query translation
- cdc: Change Data Capture stream output
- metrics: OTEL-compatible metrics instrumentation
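The Features list describes batch flushing by count, size, or time. The sketch below shows one way such a three-trigger batcher could behave. `ToyBatcher`, `Flush`, and the threshold fields are hypothetical names chosen for illustration; they are not the API of HybridBatcher or BatchConfig.

```rust
use std::time::{Duration, Instant};

/// Which threshold caused a flush.
#[derive(Debug, PartialEq, Clone, Copy)]
enum Flush {
    Count,
    Size,
    Time,
}

/// Hypothetical batcher with three flush triggers.
struct ToyBatcher {
    max_items: usize,
    max_bytes: usize,
    max_age: Duration,
    items: Vec<Vec<u8>>,
    bytes: usize,
    opened_at: Option<Instant>, // when the current batch got its first item
}

impl ToyBatcher {
    fn new(max_items: usize, max_bytes: usize, max_age: Duration) -> Self {
        ToyBatcher { max_items, max_bytes, max_age, items: Vec::new(), bytes: 0, opened_at: None }
    }

    /// Buffer one payload, then flush if any threshold has been crossed.
    fn push(&mut self, payload: Vec<u8>, now: Instant) -> Option<(Flush, Vec<Vec<u8>>)> {
        if self.opened_at.is_none() {
            self.opened_at = Some(now);
        }
        self.bytes += payload.len();
        self.items.push(payload);
        self.tick(now)
    }

    /// Check thresholds without adding data (e.g. from a periodic timer).
    fn tick(&mut self, now: Instant) -> Option<(Flush, Vec<Vec<u8>>)> {
        let reason = self.due(now)?;
        Some((reason, self.drain()))
    }

    /// First threshold that is met, if any; None while the batch is empty.
    fn due(&self, now: Instant) -> Option<Flush> {
        let opened = self.opened_at?;
        if self.items.len() >= self.max_items {
            Some(Flush::Count)
        } else if self.bytes >= self.max_bytes {
            Some(Flush::Size)
        } else if now.duration_since(opened) >= self.max_age {
            Some(Flush::Time)
        } else {
            None
        }
    }

    /// Hand back the buffered items and reset the batch state.
    fn drain(&mut self) -> Vec<Vec<u8>> {
        self.bytes = 0;
        self.opened_at = None;
        std::mem::replace(&mut self.items, Vec::new())
    }
}
```

Passing `now` in explicitly keeps the time trigger deterministic in tests; a production batcher would more likely drive `tick` from an async timer.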
Re-exports§
pub use config::SyncEngineConfig;
pub use coordinator::SyncEngine;
pub use coordinator::EngineState;
pub use coordinator::ItemStatus;
pub use coordinator::BatchResult;
pub use coordinator::MerkleDiff;
pub use coordinator::HealthCheck;
pub use backpressure::BackpressureLevel;
pub use sync_item::SyncItem;
pub use sync_item::ContentType;
pub use submit_options::CacheTtl;
pub use submit_options::OptionsKey;
pub use submit_options::SubmitOptions;
pub use storage::traits::CacheStore;
pub use storage::traits::ArchiveStore;
pub use storage::traits::StorageError;
pub use cuckoo::filter_manager::FilterManager;
pub use cuckoo::filter_manager::FilterTrust;
pub use batching::hybrid_batcher::HybridBatcher;
pub use batching::hybrid_batcher::BatchConfig;
pub use batching::hybrid_batcher::FlushReason;
pub use batching::hybrid_batcher::Batch;
pub use batching::hybrid_batcher::FlushBatch;
pub use batching::hybrid_batcher::SizedItem;
pub use batching::hybrid_batcher::BatchableItem;
pub use merkle::PathMerkle;
pub use merkle::MerkleBatch;
pub use merkle::MerkleNode;
pub use merkle::RedisMerkleStore;
pub use resilience::wal::WriteAheadLog;
pub use resilience::wal::MysqlHealthChecker;
pub use resilience::wal::WalStats;
pub use resilience::circuit_breaker::CircuitBreaker;
pub use resilience::circuit_breaker::CircuitConfig;
pub use resilience::circuit_breaker::CircuitError;
pub use resilience::circuit_breaker::BackendCircuits;
pub use resilience::retry::RetryConfig;
pub use metrics::LatencyTimer;
pub use cdc::CdcEntry;
pub use cdc::CdcOp;
pub use cdc::CdcMeta;
pub use cdc::CdcFieldValue;
pub use cdc::maybe_compress;
pub use cdc::maybe_decompress;
pub use cdc::is_zstd_compressed;
pub use cdc::cdc_stream_key;
pub use cdc::CDC_STREAM_SUFFIX;
Modules§
- backpressure
- Backpressure handling for graceful degradation under load.
- batching
- cdc
- Change Data Capture (CDC) Stream support.
- config
- Configuration for the sync engine.
- coordinator
- Sync engine coordinator.
- cuckoo
- eviction
- Eviction policies for tiered cache management.
- merkle
- Path-based Merkle tree for efficient sync verification.
- metrics
- Metrics instrumentation for sync-engine.
- resilience
- search
- Search Infrastructure
- storage
- submit_options
- Submit options for caller-controlled storage routing.
- sync_item
- Sync item data structure.
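The resilience module is listed here without a summary; the overview describes it as covering circuit breakers, retry logic, and the WAL. As a rough illustration of the circuit-breaker idea, the sketch below trips after a number of consecutive failures and allows a probe call once a cooldown has passed. `ToyBreaker`, `State`, and both parameters are hypothetical and are not the crate's CircuitBreaker or CircuitConfig.

```rust
use std::time::{Duration, Instant};

#[derive(Debug, PartialEq, Clone, Copy)]
enum State {
    Closed,   // calls flow normally
    Open,     // calls are rejected until the cooldown elapses
    HalfOpen, // cooldown elapsed; a probe call may go through
}

/// Hypothetical consecutive-failure breaker.
struct ToyBreaker {
    failure_threshold: u32,
    cooldown: Duration,
    failures: u32,
    opened_at: Option<Instant>,
}

impl ToyBreaker {
    fn new(failure_threshold: u32, cooldown: Duration) -> Self {
        ToyBreaker { failure_threshold, cooldown, failures: 0, opened_at: None }
    }

    /// Derive the state from when the breaker last tripped.
    fn state(&self, now: Instant) -> State {
        match self.opened_at {
            None => State::Closed,
            Some(t) if now.duration_since(t) >= self.cooldown => State::HalfOpen,
            Some(_) => State::Open,
        }
    }

    /// Whether a backend call may proceed right now.
    fn allow(&self, now: Instant) -> bool {
        self.state(now) != State::Open
    }

    /// Record the outcome of a call that was allowed through.
    fn record(&mut self, ok: bool, now: Instant) {
        if ok {
            // Any success closes the breaker and clears the failure streak.
            self.failures = 0;
            self.opened_at = None;
        } else {
            self.failures += 1;
            if self.failures >= self.failure_threshold {
                // Trip, or re-trip after a failed half-open probe.
                self.opened_at = Some(now);
            }
        }
    }
}
```

While the breaker is open, callers skip the backend instead of queueing more requests against it, which is the cascade-failure protection the Features list refers to.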
Macros§
- time_operation
- Convenience macro for timing operations
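The signature of time_operation is not shown on this page. As a generic illustration of what a timing macro can look like, the sketch below defines a hypothetical `time_block!` macro that evaluates an expression and returns its result together with the elapsed time. It is not the crate's macro and makes no claim about how time_operation reports its measurements.

```rust
/// Hypothetical helper: evaluates `$body` and yields `(result, elapsed)`.
macro_rules! time_block {
    ($body:expr) => {{
        let start = std::time::Instant::now();
        let result = $body;
        (result, start.elapsed())
    }};
}

/// Small workload to time: the sum 1 + 2 + ... + n.
fn sum_to(n: u64) -> u64 {
    (1..=n).sum()
}
```

A call such as `let (total, elapsed) = time_block!(sum_to(1_000));` binds the computed sum and a `std::time::Duration` that could then be fed to a metrics recorder.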