// Copyright (c) 2025-2026 Adrian Robinson. Licensed under the AGPL-3.0.
// See LICENSE file in the project root for full license text.

//! # Sync Engine
//!
//! A high-performance, tiered storage engine for distributed data synchronization.
//!
//! ## Philosophy: Dumb Byte Router
//!
//! sync-engine stores opaque `Vec<u8>` payloads and routes them to L1/L2/L3 based on
//! caller-supplied options. Compression, serialization, and data interpretation are the
//! caller's responsibility.
//!
//! ## Architecture
//!
//! ```text
//! ┌─────────────────────────────────────────────────────────────┐
//! │                        Ingest Layer                         │
//! │  • Accepts SyncItems via submit() / submit_with()          │
//! │  • Backpressure control based on memory usage              │
//! └─────────────────────────────────────────────────────────────┘
//!                              │
//!                              ▼
//! ┌─────────────────────────────────────────────────────────────┐
//! │                    L1: In-Memory Cache                      │
//! │  • Moka cache for concurrent access                        │
//! │  • Tan-curve eviction under memory pressure                │
//! └─────────────────────────────────────────────────────────────┘
//!                              │
//!                    (Batch flush via HybridBatcher)
//!                              ▼
//! ┌─────────────────────────────────────────────────────────────┐
//! │                     L2: Redis Cache                         │
//! │  • Pipelined batch writes for throughput                   │
//! │  • Optional per-item TTL (via SubmitOptions)               │
//! │  • EXISTS command for fast existence checks                │
//! └─────────────────────────────────────────────────────────────┘
//!                              │
//!                    (Batch persist to ground truth)
//!                              ▼
//! ┌─────────────────────────────────────────────────────────────┐
//! │                   L3: MySQL/SQLite Archive                  │
//! │  • Ground truth storage (BLOB column)                      │
//! │  • Cuckoo filter for fast existence checks                 │
//! │  • WAL fallback during outages                             │
//! └─────────────────────────────────────────────────────────────┘
//! ```
//!
//! ## Quick Start
//!
//! ```rust,no_run
//! use sync_engine::{SyncEngine, SyncEngineConfig, SyncItem};
//! use serde_json::json;
//! use tokio::sync::watch;
//!
//! #[tokio::main]
//! async fn main() {
//!     let config = SyncEngineConfig {
//!         redis_url: Some("redis://localhost:6379".into()),
//!         sql_url: Some("mysql://user:pass@localhost/db".into()),
//!         ..Default::default()
//!     };
//!
//!     let (_tx, rx) = watch::channel(config.clone());
//!     let mut engine = SyncEngine::new(config, rx);
//!
//!     // Start the engine (connects to backends)
//!     engine.start().await.expect("Failed to start");
//!
//!     // Submit items for sync
//!     let item = SyncItem::from_json(
//!         "uk.nhs.patient.record.12345".into(),
//!         json!({"name": "John Doe", "nhs_number": "1234567890"})
//!     );
//!     engine.submit(item).await.expect("Failed to submit");
//!
//!     // Retrieve items (L1 → L2 → L3 fallback)
//!     if let Some(item) = engine.get("uk.nhs.patient.record.12345").await.unwrap() {
//!         println!("Found: {:?}", item.content_as_json());
//!     }
//!
//!     engine.shutdown().await;
//! }
//! ```
//!
//! ## Features
//!
//! - **Tiered Caching**: L1 (memory) → L2 (Redis) → L3 (SQL) with automatic fallback
//! - **Binary Storage**: Store raw `Vec<u8>`; the caller handles serialization and compression
//! - **Flexible Routing**: [`SubmitOptions`] controls which tiers receive data
//! - **TTL Support**: Per-item TTL for Redis cache entries
//! - **Batch Writes**: Configurable flush by count, size, or time
//! - **Cuckoo Filters**: Skip SQL queries when data definitely doesn't exist
//! - **WAL Durability**: Local SQLite WAL during MySQL outages
//! - **Backpressure**: Graceful degradation under memory pressure
//! - **Circuit Breakers**: Prevent cascading failures across backends
//! - **Retry Logic**: Configurable retry policies for transient failures
//!
//! ## Configuration
//!
//! See [`SyncEngineConfig`] for all configuration options.
//!
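//! ## Modules
//!
//! - [`coordinator`]: The main [`SyncEngine`] orchestrating all components
//! - [`config`]: Engine configuration ([`SyncEngineConfig`])
//! - [`sync_item`]: The [`SyncItem`] type and [`ContentType`]
//! - [`submit_options`]: Per-submit routing options ([`SubmitOptions`], [`CacheTtl`])
//! - [`storage`]: Storage backends (Redis, SQL, Memory)
//! - [`batching`]: Hybrid batcher for efficient writes
//! - [`cuckoo`]: Probabilistic existence filters
//! - [`merkle`]: Merkle tree for sync verification
//! - [`resilience`]: Circuit breakers, retry logic, WAL
//! - [`eviction`]: Tan-curve eviction policy
//! - [`backpressure`]: Memory pressure handling
//! - [`metrics`]: Metrics helpers such as [`LatencyTimer`]
//! - [`cdc`]: CDC stream entries ([`CdcEntry`], [`CdcOp`]), stream-key helpers, and
//!   optional zstd compression helpers
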
113pub mod config;
114pub mod sync_item;
115pub mod submit_options;
116pub mod storage;
117pub mod batching;
118pub mod resilience;
119pub mod eviction;
120pub mod cuckoo;
121pub mod merkle;
122pub mod backpressure;
123pub mod coordinator;
124pub mod metrics;
125pub mod search;
126pub mod cdc;
127
128// Note: We don't expose a `tracing` module to avoid conflict with the tracing crate
129
130pub use config::SyncEngineConfig;
131pub use coordinator::{SyncEngine, EngineState, ItemStatus, BatchResult, MerkleDiff};
132pub use backpressure::BackpressureLevel;
133pub use sync_item::{SyncItem, ContentType};
134pub use submit_options::{CacheTtl, OptionsKey, SubmitOptions};
135pub use storage::traits::{CacheStore, ArchiveStore, StorageError};
136pub use cuckoo::filter_manager::{FilterManager, FilterTrust};
137pub use batching::hybrid_batcher::{HybridBatcher, BatchConfig, FlushReason, Batch, FlushBatch, SizedItem, BatchableItem};
138pub use merkle::{PathMerkle, MerkleBatch, MerkleNode, RedisMerkleStore};
139pub use resilience::wal::{WriteAheadLog, MysqlHealthChecker, WalStats};
140pub use resilience::circuit_breaker::{CircuitBreaker, CircuitConfig, CircuitError, BackendCircuits};
141pub use resilience::retry::RetryConfig;
142pub use metrics::LatencyTimer;
143pub use cdc::{CdcEntry, CdcOp, CdcMeta, CdcFieldValue, maybe_compress, maybe_decompress, is_zstd_compressed, cdc_stream_key, CDC_STREAM_SUFFIX};