Crate sync_engine

§Sync Engine

A high-performance, tiered storage engine for distributed data synchronization.

§Philosophy: Dumb Byte Router

sync-engine stores opaque Vec<u8> payloads and routes them to L1, L2, and/or L3 according to the options supplied by the caller. Compression, serialization, and interpretation of the data are the caller’s responsibility.
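
Here is a minimal std-only sketch of the byte-router idea. It assumes plain HashMaps in place of the real tiers. The Route flags and the ByteRouter type are hypothetical and are not the crate's SubmitOptions or SyncEngine API. The sketch only shows that the router copies opaque bytes into whichever tiers the caller selects and never looks inside them.

```rust
use std::collections::HashMap;

/// Hypothetical routing flags standing in for `SubmitOptions` (whose real
/// fields are not shown in this document): one switch per tier.
#[derive(Clone, Copy)]
struct Route {
    l1: bool,
    l2: bool,
    l3: bool,
}

/// Toy "dumb byte router": payloads are opaque bytes and are never inspected.
/// Plain maps stand in for the memory cache, Redis, and SQL.
#[derive(Default)]
struct ByteRouter {
    l1: HashMap<String, Vec<u8>>,
    l2: HashMap<String, Vec<u8>>,
    l3: HashMap<String, Vec<u8>>,
}

impl ByteRouter {
    /// Copy the payload into every tier the caller selected.
    fn submit(&mut self, id: &str, bytes: Vec<u8>, route: Route) {
        if route.l1 {
            self.l1.insert(id.to_owned(), bytes.clone());
        }
        if route.l2 {
            self.l2.insert(id.to_owned(), bytes.clone());
        }
        if route.l3 {
            self.l3.insert(id.to_owned(), bytes);
        }
    }
}

fn main() {
    let mut router = ByteRouter::default();
    // Serialization is the caller's job: here a JSON string is simply turned into UTF-8 bytes.
    let payload = r#"{"name":"John Doe"}"#.as_bytes().to_vec();
    router.submit("uk.nhs.patient.record.12345", payload, Route { l1: true, l2: true, l3: false });
    println!("L1={} L2={} L3={}", router.l1.len(), router.l2.len(), router.l3.len());
}
```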

§Architecture

┌─────────────────────────────────────────────────────────────┐
│                        Ingest Layer                         │
│  • Accepts SyncItems via submit() / submit_with()           │
│  • Backpressure control based on memory usage               │
└─────────────────────────────────────────────────────────────┘
                             │
                             ▼
┌─────────────────────────────────────────────────────────────┐
│                    L1: In-Memory Cache                      │
│  • Moka cache for concurrent access                         │
│  • Tan-curve eviction under memory pressure                 │
└─────────────────────────────────────────────────────────────┘
                             │
                   (Batch flush via HybridBatcher)
                             ▼
┌─────────────────────────────────────────────────────────────┐
│                     L2: Redis Cache                         │
│  • Pipelined batch writes for throughput                    │
│  • Optional TTL per-item (via SubmitOptions)                │
│  • EXISTS command for fast existence checks                 │
└─────────────────────────────────────────────────────────────┘
                             │
                   (Batch persist to ground truth)
                             ▼
┌─────────────────────────────────────────────────────────────┐
│                   L3: MySQL/SQLite Archive                  │
│  • Ground truth storage (BLOB column)                       │
│  • Cuckoo filter for fast existence checks                  │
│  • WAL fallback during outages                              │
└─────────────────────────────────────────────────────────────┘
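
In the diagram, writes move from L1 to L2 and L3 through batch flushes. The feature list adds that a flush can be triggered by item count, total size, or elapsed time. Below is a minimal sketch of that triggering logic, assuming a single-threaded buffer. Batcher, Limits, and Reason are hypothetical names and are not the API of HybridBatcher, BatchConfig, or FlushReason.

```rust
use std::time::{Duration, Instant};

/// Why a batch was flushed (hypothetical; compare the crate's `FlushReason`).
#[derive(Debug, PartialEq)]
enum Reason {
    Count,
    Size,
    Time,
}

/// Hypothetical thresholds; the real `BatchConfig` fields may differ.
struct Limits {
    max_items: usize,
    max_bytes: usize,
    max_age: Duration,
}

/// Accumulates payloads until a count, size, or age threshold is hit.
struct Batcher {
    limits: Limits,
    items: Vec<Vec<u8>>,
    bytes: usize,
    started: Option<Instant>, // when the first item of the current batch arrived
}

impl Batcher {
    fn new(limits: Limits) -> Self {
        Batcher { limits, items: Vec::new(), bytes: 0, started: None }
    }

    /// Add a payload; returns the batch if a count or size threshold is reached.
    fn push(&mut self, item: Vec<u8>) -> Option<(Reason, Vec<Vec<u8>>)> {
        self.started.get_or_insert_with(Instant::now);
        self.bytes += item.len();
        self.items.push(item);
        if self.items.len() >= self.limits.max_items {
            Some((Reason::Count, self.take()))
        } else if self.bytes >= self.limits.max_bytes {
            Some((Reason::Size, self.take()))
        } else {
            None
        }
    }

    /// Meant to be called from a timer: flushes a non-empty batch that is old enough.
    fn poll_time(&mut self, now: Instant) -> Option<(Reason, Vec<Vec<u8>>)> {
        let started = self.started?;
        if now.duration_since(started) >= self.limits.max_age {
            Some((Reason::Time, self.take()))
        } else {
            None
        }
    }

    /// Hand over the buffered items and reset the counters.
    fn take(&mut self) -> Vec<Vec<u8>> {
        self.bytes = 0;
        self.started = None;
        std::mem::take(&mut self.items)
    }
}

fn main() {
    let limits = Limits { max_items: 2, max_bytes: 1 << 20, max_age: Duration::from_millis(50) };
    let mut batcher = Batcher::new(limits);
    batcher.push(b"a".to_vec());
    if let Some((reason, batch)) = batcher.push(b"b".to_vec()) {
        println!("flushed {} items ({:?})", batch.len(), reason);
    }
}
```

The sketch checks count before size. That order is an arbitrary choice: if both thresholds trip on the same push, the batch is reported as a count flush.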

§Quick Start

use sync_engine::{SyncEngine, SyncEngineConfig, SyncItem};
use serde_json::json;
use tokio::sync::watch;

#[tokio::main]
async fn main() {
    let config = SyncEngineConfig {
        redis_url: Some("redis://localhost:6379".into()),
        sql_url: Some("mysql://user:pass@localhost/db".into()),
        ..Default::default()
    };

    let (_tx, rx) = watch::channel(config.clone());
    let mut engine = SyncEngine::new(config, rx);
     
    // Start the engine (connects to backends)
    engine.start().await.expect("Failed to start");

    // Submit items for sync
    let item = SyncItem::from_json(
        "uk.nhs.patient.record.12345".into(),
        json!({"name": "John Doe", "nhs_number": "1234567890"})
    );
    engine.submit(item).await.expect("Failed to submit");

    // Retrieve items (L1 → L2 → L3 fallback)
    if let Some(item) = engine.get("uk.nhs.patient.record.12345").await.unwrap() {
        println!("Found: {:?}", item.content_as_json());
    }

    engine.shutdown().await;
}
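
The get call above falls back from L1 to L2 to L3. Here is a std-only sketch of that lookup order, assuming three in-process maps in place of Moka, Redis, and SQL. Tier and Tiers are hypothetical types. This document does not describe what the real engine does after a lower-tier hit (for example, whether it repopulates the upper tiers), so the sketch leaves that out.

```rust
use std::collections::HashMap;

/// Which tier served a read (hypothetical type for this sketch).
#[derive(Debug, PartialEq)]
enum Tier {
    L1,
    L2,
    L3,
}

/// Plain maps standing in for the memory cache, Redis, and SQL.
#[derive(Default)]
struct Tiers {
    l1: HashMap<String, Vec<u8>>,
    l2: HashMap<String, Vec<u8>>,
    l3: HashMap<String, Vec<u8>>,
}

impl Tiers {
    /// Try L1, then L2, then L3, and stop at the first hit.
    fn get(&self, id: &str) -> Option<(Tier, &Vec<u8>)> {
        if let Some(bytes) = self.l1.get(id) {
            return Some((Tier::L1, bytes));
        }
        if let Some(bytes) = self.l2.get(id) {
            return Some((Tier::L2, bytes));
        }
        self.l3.get(id).map(|bytes| (Tier::L3, bytes))
    }
}

fn main() {
    let mut tiers = Tiers::default();
    tiers.l3.insert("uk.nhs.patient.record.12345".to_string(), b"{}".to_vec());
    match tiers.get("uk.nhs.patient.record.12345") {
        Some((tier, bytes)) => println!("hit in {:?}: {} bytes", tier, bytes.len()),
        None => println!("miss"),
    }
}
```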

§Features

  • Tiered Caching: L1 (memory) → L2 (Redis) → L3 (SQL) with automatic fallback
  • Binary Storage: Store raw Vec<u8>; the caller handles serialization and compression
  • Flexible Routing: SubmitOptions controls which tiers receive data
  • TTL Support: Per-item TTL for Redis cache entries
  • Batch Writes: Configurable flush by count, size, or time
  • Cuckoo Filters: Skip SQL queries when data definitely doesn’t exist
  • WAL Durability: Local SQLite WAL during MySQL outages
  • Backpressure: Graceful degradation under memory pressure
  • Circuit Breakers: Prevent cascade failures to backends
  • Retry Logic: Configurable retry policies for transient failures
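
The crate re-exports CircuitBreaker, CircuitConfig, CircuitError, and BackendCircuits, but this page does not describe their policy. The classic approach is a closed/open/half-open state machine, sketched below. The Breaker type, its methods, and its thresholds are hypothetical. The sketch also lets every request through while half-open, whereas production breakers usually cap the number of trial requests.

```rust
use std::time::{Duration, Instant};

/// States of a classic circuit breaker.
#[derive(Debug, Clone, Copy, PartialEq)]
enum State {
    Closed,
    Open,
    HalfOpen,
}

/// Minimal breaker; thresholds are hypothetical, not `CircuitConfig`'s fields.
struct Breaker {
    state: State,
    failures: u32,
    failure_threshold: u32,
    cooldown: Duration,
    opened_at: Option<Instant>,
}

impl Breaker {
    fn new(failure_threshold: u32, cooldown: Duration) -> Self {
        Breaker { state: State::Closed, failures: 0, failure_threshold, cooldown, opened_at: None }
    }

    /// Should a call to the backend be attempted at `now`?
    fn allow(&mut self, now: Instant) -> bool {
        match self.state {
            State::Closed | State::HalfOpen => true,
            State::Open => {
                let opened = self.opened_at.expect("an open breaker records when it opened");
                if now.duration_since(opened) >= self.cooldown {
                    // Cooldown elapsed: let trial requests through.
                    self.state = State::HalfOpen;
                    true
                } else {
                    false // fail fast instead of hammering a sick backend
                }
            }
        }
    }

    fn on_success(&mut self) {
        self.failures = 0;
        self.state = State::Closed;
        self.opened_at = None;
    }

    fn on_failure(&mut self, now: Instant) {
        self.failures += 1;
        // A failed trial, or too many consecutive failures, (re)opens the circuit.
        if self.state == State::HalfOpen || self.failures >= self.failure_threshold {
            self.state = State::Open;
            self.opened_at = Some(now);
        }
    }
}

fn main() {
    let mut breaker = Breaker::new(3, Duration::from_millis(500));
    let now = Instant::now();
    for _ in 0..3 {
        if breaker.allow(now) {
            breaker.on_failure(now); // pretend the backend call failed
        }
    }
    println!("state after 3 failures: {:?}", breaker.state);
}
```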

§Configuration

See SyncEngineConfig for all configuration options.

§Modules

Re-exports§

pub use config::SyncEngineConfig;
pub use coordinator::SyncEngine;
pub use coordinator::EngineState;
pub use coordinator::ItemStatus;
pub use coordinator::BatchResult;
pub use coordinator::MerkleDiff;
pub use coordinator::HealthCheck;
pub use backpressure::BackpressureLevel;
pub use sync_item::SyncItem;
pub use sync_item::ContentType;
pub use submit_options::CacheTtl;
pub use submit_options::OptionsKey;
pub use submit_options::SubmitOptions;
pub use storage::traits::CacheStore;
pub use storage::traits::ArchiveStore;
pub use storage::traits::StorageError;
pub use cuckoo::filter_manager::FilterManager;
pub use cuckoo::filter_manager::FilterTrust;
pub use batching::hybrid_batcher::HybridBatcher;
pub use batching::hybrid_batcher::BatchConfig;
pub use batching::hybrid_batcher::FlushReason;
pub use batching::hybrid_batcher::Batch;
pub use batching::hybrid_batcher::FlushBatch;
pub use batching::hybrid_batcher::SizedItem;
pub use batching::hybrid_batcher::BatchableItem;
pub use merkle::PathMerkle;
pub use merkle::MerkleBatch;
pub use merkle::MerkleNode;
pub use merkle::RedisMerkleStore;
pub use resilience::wal::WriteAheadLog;
pub use resilience::wal::MysqlHealthChecker;
pub use resilience::wal::WalStats;
pub use resilience::circuit_breaker::CircuitBreaker;
pub use resilience::circuit_breaker::CircuitConfig;
pub use resilience::circuit_breaker::CircuitError;
pub use resilience::circuit_breaker::BackendCircuits;
pub use resilience::retry::RetryConfig;
pub use metrics::LatencyTimer;
pub use cdc::CdcEntry;
pub use cdc::CdcOp;
pub use cdc::CdcMeta;
pub use cdc::CdcFieldValue;
pub use cdc::maybe_compress;
pub use cdc::maybe_decompress;
pub use cdc::is_zstd_compressed;
pub use cdc::cdc_stream_key;
pub use cdc::CDC_STREAM_SUFFIX;
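
The cdc re-exports include maybe_compress, maybe_decompress, and is_zstd_compressed, but their signatures are not shown here. One plausible way to detect zstd data is to check for the zstd frame magic number 0xFD2FB528, which RFC 8878 stores little-endian at the start of every frame. The sketch below illustrates that idea with hypothetical names (looks_like_zstd, maybe_decode). It is not the crate's implementation, and it leaves the actual decompression to a caller-supplied closure. Note that an uncompressed payload that happens to begin with the same four bytes would be misclassified.

```rust
/// zstd frame magic number 0xFD2FB528, as it appears on the wire (little-endian bytes).
const ZSTD_MAGIC: [u8; 4] = [0x28, 0xB5, 0x2F, 0xFD];

/// Hypothetical stand-in for an `is_zstd_compressed`-style check:
/// does the payload start with a zstd frame header?
fn looks_like_zstd(data: &[u8]) -> bool {
    data.starts_with(&ZSTD_MAGIC)
}

/// Hypothetical `maybe_decompress`-style helper: decompress only when the magic
/// number is present, otherwise pass the bytes through untouched. The actual
/// decompressor (for example from the `zstd` crate) is supplied by the caller.
fn maybe_decode<F>(data: &[u8], decompress: F) -> Vec<u8>
where
    F: FnOnce(&[u8]) -> Vec<u8>,
{
    if looks_like_zstd(data) {
        decompress(data)
    } else {
        data.to_vec()
    }
}

fn main() {
    let json = br#"{"name":"John Doe"}"#;
    // '{' (0x7B) is not the first magic byte, so this is treated as plain data.
    println!("{}", looks_like_zstd(json));
    let out = maybe_decode(json, |_| unreachable!("plain JSON is never decompressed"));
    println!("{}", String::from_utf8_lossy(&out));
}
```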

Modules§

backpressure
Backpressure handling for graceful degradation under load.
batching
cdc
Change Data Capture (CDC) Stream support.
config
Configuration for the sync engine.
coordinator
Sync engine coordinator.
cuckoo
eviction
Eviction policies for tiered cache management.
merkle
Path-based Merkle tree for efficient sync verification.
metrics
Metrics instrumentation for sync-engine.
resilience
search
Search Infrastructure
storage
submit_options
Submit options for caller-controlled storage routing.
sync_item
Sync item data structure.
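
The merkle module is described as a path-based Merkle tree for sync verification. One reading of "path-based" treats each dot-separated segment of an item id as a tree level. Two replicas can then compare root hashes and descend only into the subtrees that differ. The sketch below assumes that interpretation. Node and diff are hypothetical and are not the PathMerkle or MerkleDiff API. The sketch uses std's DefaultHasher, which is not cryptographic and whose algorithm is not guaranteed stable across Rust releases, so it only suits an in-process demo. Replicas comparing hashes over the network would need a fixed hash such as SHA-256.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::{BTreeMap, BTreeSet};
use std::hash::{Hash, Hasher};

/// One node of a path-keyed tree; children are keyed by the next
/// dot-separated segment of an item id.
#[derive(Default)]
struct Node {
    children: BTreeMap<String, Node>,
    leaf: Option<u64>, // hash of the payload stored at exactly this path, if any
}

fn hash_of<T: Hash + ?Sized>(value: &T) -> u64 {
    let mut h = DefaultHasher::new();
    value.hash(&mut h);
    h.finish()
}

impl Node {
    /// Insert an id such as "uk.nhs.patient.record.12345" with its payload.
    fn insert(&mut self, id: &str, bytes: &[u8]) {
        let mut node = self;
        for segment in id.split('.') {
            node = node.children.entry(segment.to_string()).or_default();
        }
        node.leaf = Some(hash_of(bytes));
    }

    /// Subtree hash: the leaf hash plus every (segment, child hash) pair in sorted order.
    fn digest(&self) -> u64 {
        let mut h = DefaultHasher::new();
        self.leaf.hash(&mut h);
        for (segment, child) in &self.children {
            segment.hash(&mut h);
            child.digest().hash(&mut h);
        }
        h.finish()
    }
}

/// Collect ids whose payloads differ, descending only into subtrees whose hashes disagree.
fn diff(a: &Node, b: &Node, path: &str, out: &mut Vec<String>) {
    if a.digest() == b.digest() {
        return; // identical subtree: nothing below here needs syncing
    }
    if a.leaf != b.leaf {
        out.push(path.to_string());
    }
    let empty = Node::default();
    let keys: BTreeSet<&String> = a.children.keys().chain(b.children.keys()).collect();
    for key in keys {
        let child_a = a.children.get(key).unwrap_or(&empty);
        let child_b = b.children.get(key).unwrap_or(&empty);
        let child_path = if path.is_empty() { key.clone() } else { format!("{path}.{key}") };
        diff(child_a, child_b, &child_path, out);
    }
}

fn main() {
    let mut local = Node::default();
    let mut remote = Node::default();
    local.insert("uk.nhs.patient.record.12345", b"v1");
    remote.insert("uk.nhs.patient.record.12345", b"v2");
    let mut changed = Vec::new();
    diff(&local, &remote, "", &mut changed);
    println!("{changed:?}");
}
```

For simplicity, diff recomputes subtree digests at every level, which is quadratic in the worst case. A real implementation would cache each node's hash and update only the nodes along the path on insert.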

Macros§

time_operation
Convenience macro for timing operations
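
The crate exports a time_operation macro and a LatencyTimer type, but their arguments are not documented on this page. A timing macro in Rust usually takes this shape: a macro_rules! that records Instant::now(), evaluates the wrapped expression once, and reads elapsed(). The sketch below uses the hypothetical name timed!. The real macro may take a metric name or record the duration to a metrics backend instead of returning it.

```rust
use std::time::Duration;

/// Hypothetical timing macro (not the crate's `time_operation!`, whose arguments
/// are not documented here): evaluates the expression once and returns
/// `(value, elapsed)`.
macro_rules! timed {
    ($body:expr) => {{
        let start = std::time::Instant::now();
        let value = $body;
        (value, start.elapsed())
    }};
}

fn main() {
    let (sum, elapsed): (u64, Duration) = timed!((1..=1_000u64).sum::<u64>());
    println!("sum = {sum}, took {elapsed:?}");
}
```

Because the macro expands inline, a `?` inside the timed expression still returns from the surrounding function. In that case no duration is produced for the early return.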