sync_engine/
lib.rs

1//! # Sync Engine
2//!
3//! A high-performance, tiered storage engine for distributed data synchronization.
4//!
5//! ## Philosophy: Dumb Byte Router
6//!
7//! sync-engine stores `Vec<u8>` and routes to L1/L2/L3 based on caller options.
8//! Compression, serialization, and data interpretation are the caller's responsibility.
9//!
10//! ## Architecture
11//!
12//! ```text
13//! ┌─────────────────────────────────────────────────────────────┐
14//! │                        Ingest Layer                         │
15//! │  • Accepts SyncItems via submit() / submit_with()          │
16//! │  • Backpressure control based on memory usage              │
17//! └─────────────────────────────────────────────────────────────┘
18//!                              │
19//!                              ▼
20//! ┌─────────────────────────────────────────────────────────────┐
21//! │                    L1: In-Memory Cache                      │
22//! │  • Moka cache for concurrent access                        │
23//! │  • Tan-curve eviction under memory pressure                │
24//! └─────────────────────────────────────────────────────────────┘
25//!                              │
26//!                    (Batch flush via HybridBatcher)
27//!                              ▼
28//! ┌─────────────────────────────────────────────────────────────┐
29//! │                     L2: Redis Cache                         │
30//! │  • Pipelined batch writes for throughput                   │
31//! │  • Optional TTL per-item (via SubmitOptions)               │
32//! │  • EXISTS command for fast existence checks                │
33//! └─────────────────────────────────────────────────────────────┘
34//!                              │
35//!                    (Batch persist to ground truth)
36//!                              ▼
37//! ┌─────────────────────────────────────────────────────────────┐
38//! │                   L3: MySQL/SQLite Archive                  │
39//! │  • Ground truth storage (BLOB column)                      │
40//! │  • Cuckoo filter for fast existence checks                 │
41//! │  • WAL fallback during outages                             │
42//! └─────────────────────────────────────────────────────────────┘
43//! ```
44//!
45//! ## Quick Start
46//!
47//! ```rust,no_run
48//! use sync_engine::{SyncEngine, SyncEngineConfig, SyncItem};
49//! use serde_json::json;
50//! use tokio::sync::watch;
51//!
52//! #[tokio::main]
53//! async fn main() {
54//!     let config = SyncEngineConfig {
55//!         redis_url: Some("redis://localhost:6379".into()),
56//!         sql_url: Some("mysql://user:pass@localhost/db".into()),
57//!         ..Default::default()
58//!     };
59//!
60//!     let (_tx, rx) = watch::channel(config.clone());
61//!     let mut engine = SyncEngine::new(config, rx);
62//!     
63//!     // Start the engine (connects to backends)
64//!     engine.start().await.expect("Failed to start");
65//!
66//!     // Submit items for sync
67//!     let item = SyncItem::from_json(
68//!         "uk.nhs.patient.record.12345".into(),
69//!         json!({"name": "John Doe", "nhs_number": "1234567890"})
70//!     );
71//!     engine.submit(item).await.expect("Failed to submit");
72//!
73//!     // Retrieve items (L1 → L2 → L3 fallback)
74//!     if let Some(item) = engine.get("uk.nhs.patient.record.12345").await.unwrap() {
75//!         println!("Found: {:?}", item.content_as_json());
76//!     }
77//!
78//!     engine.shutdown().await;
79//! }
80//! ```
81//!
82//! ## Features
83//!
84//! - **Tiered Caching**: L1 (memory) → L2 (Redis) → L3 (SQL) with automatic fallback
85//! - **Binary Storage**: Store raw `Vec<u8>` - caller handles serialization/compression
86//! - **Flexible Routing**: `SubmitOptions` controls which tiers receive data
87//! - **TTL Support**: Per-item TTL for Redis cache entries
88//! - **Batch Writes**: Configurable flush by count, size, or time
89//! - **Cuckoo Filters**: Skip SQL queries when data definitely doesn't exist
90//! - **WAL Durability**: Local SQLite WAL during MySQL outages
91//! - **Backpressure**: Graceful degradation under memory pressure
92//! - **Circuit Breakers**: Prevent cascade failures to backends
93//! - **Retry Logic**: Configurable retry policies for transient failures
94//!
95//! ## Configuration
96//!
97//! See [`SyncEngineConfig`] for all configuration options.
98//!
99//! ## Modules
100//!
//! - [`coordinator`]: The main [`SyncEngine`] orchestrating all components
//! - [`config`]: Engine configuration ([`SyncEngineConfig`])
//! - [`sync_item`]: The [`SyncItem`] payload type and its content typing
//! - [`submit_options`]: Per-item routing and TTL options ([`SubmitOptions`])
//! - [`storage`]: Storage backends (Redis, SQL, Memory)
//! - [`batching`]: Hybrid batcher for efficient writes
//! - [`cuckoo`]: Probabilistic existence filters
//! - [`merkle`]: Merkle tree for sync verification
//! - [`resilience`]: Circuit breakers, retry logic, WAL
//! - [`eviction`]: Tan-curve eviction policy
//! - [`backpressure`]: Memory pressure handling
//! - [`metrics`]: Instrumentation helpers (e.g. [`LatencyTimer`])
//! - [`search`]: Search support (see module docs)
//! - [`cdc`]: Change-data-capture entries and stream helpers
109
110pub mod config;
111pub mod sync_item;
112pub mod submit_options;
113pub mod storage;
114pub mod batching;
115pub mod resilience;
116pub mod eviction;
117pub mod cuckoo;
118pub mod merkle;
119pub mod backpressure;
120pub mod coordinator;
121pub mod metrics;
122pub mod search;
123pub mod cdc;
124
125// Note: We don't expose a `tracing` module to avoid conflict with the tracing crate
126
127pub use config::SyncEngineConfig;
128pub use coordinator::{SyncEngine, EngineState, ItemStatus, BatchResult, MerkleDiff};
129pub use backpressure::BackpressureLevel;
130pub use sync_item::{SyncItem, ContentType};
131pub use submit_options::{CacheTtl, OptionsKey, SubmitOptions};
132pub use storage::traits::{CacheStore, ArchiveStore, StorageError};
133pub use cuckoo::filter_manager::{FilterManager, FilterTrust};
134pub use batching::hybrid_batcher::{HybridBatcher, BatchConfig, FlushReason, Batch, FlushBatch, SizedItem, BatchableItem};
135pub use merkle::{PathMerkle, MerkleBatch, MerkleNode, RedisMerkleStore};
136pub use resilience::wal::{WriteAheadLog, MysqlHealthChecker, WalStats};
137pub use resilience::circuit_breaker::{CircuitBreaker, CircuitConfig, CircuitError, BackendCircuits};
138pub use resilience::retry::RetryConfig;
139pub use metrics::LatencyTimer;
140pub use cdc::{CdcEntry, CdcOp, CdcMeta, CdcFieldValue, maybe_compress, maybe_decompress, is_zstd_compressed, cdc_stream_key, CDC_STREAM_SUFFIX};