sync_engine/
lib.rs

1// Copyright (c) 2025-2026 Adrian Robinson. Licensed under the AGPL-3.0.
2// See LICENSE file in the project root for full license text.
3
4//! # Sync Engine
5//!
6//! A high-performance, tiered storage engine for distributed data synchronization.
7//!
8//! ## Philosophy: Dumb Byte Router
9//!
10//! sync-engine stores opaque `Vec<u8>` payloads and routes them to the L1/L2/L3 tiers
11//! according to caller-supplied options. Compression, serialization, and data interpretation are the caller's responsibility.
12//!
13//! ## Architecture
14//!
15//! ```text
16//! ┌─────────────────────────────────────────────────────────────┐
17//! │                        Ingest Layer                         │
18//! │  • Accepts SyncItems via submit() / submit_with()          │
19//! │  • Backpressure control based on memory usage              │
20//! └─────────────────────────────────────────────────────────────┘
21//!                              │
22//!                              ▼
23//! ┌─────────────────────────────────────────────────────────────┐
24//! │                    L1: In-Memory Cache                      │
25//! │  • Moka cache for concurrent access                        │
26//! │  • Tan-curve eviction under memory pressure                │
27//! └─────────────────────────────────────────────────────────────┘
28//!                              │
29//!                    (Batch flush via HybridBatcher)
30//!                              ▼
31//! ┌─────────────────────────────────────────────────────────────┐
32//! │                     L2: Redis Cache                         │
33//! │  • Pipelined batch writes for throughput                   │
34//! │  • Optional TTL per-item (via SubmitOptions)               │
35//! │  • EXISTS command for fast existence checks                │
36//! └─────────────────────────────────────────────────────────────┘
37//!                              │
38//!                    (Batch persist to ground truth)
39//!                              ▼
40//! ┌─────────────────────────────────────────────────────────────┐
41//! │                   L3: MySQL/SQLite Archive                  │
42//! │  • Ground truth storage (BLOB column)                      │
43//! │  • Cuckoo filter for fast existence checks                 │
44//! │  • WAL fallback during outages                             │
45//! └─────────────────────────────────────────────────────────────┘
46//! ```
47//!
48//! ## Quick Start
49//!
50//! ```rust,no_run
51//! use sync_engine::{SyncEngine, SyncEngineConfig, SyncItem};
52//! use serde_json::json;
53//! use tokio::sync::watch;
54//!
55//! #[tokio::main]
56//! async fn main() {
57//!     let config = SyncEngineConfig {
58//!         redis_url: Some("redis://localhost:6379".into()),
59//!         sql_url: Some("mysql://user:pass@localhost/db".into()),
60//!         ..Default::default()
61//!     };
62//!
63//!     let (_tx, rx) = watch::channel(config.clone());
64//!     let mut engine = SyncEngine::new(config, rx);
65//!     
66//!     // Start the engine (connects to backends)
67//!     engine.start().await.expect("Failed to start");
68//!
69//!     // Submit items for sync
70//!     let item = SyncItem::from_json(
71//!         "uk.nhs.patient.record.12345".into(),
72//!         json!({"name": "John Doe", "nhs_number": "1234567890"})
73//!     );
74//!     engine.submit(item).await.expect("Failed to submit");
75//!
76//!     // Retrieve items (L1 → L2 → L3 fallback)
77//!     if let Some(item) = engine.get("uk.nhs.patient.record.12345").await.unwrap() {
78//!         println!("Found: {:?}", item.content_as_json());
79//!     }
80//!
81//!     engine.shutdown().await;
82//! }
83//! ```
84//!
85//! ## Features
86//!
87//! - **Tiered Caching**: L1 (memory) → L2 (Redis) → L3 (SQL) with automatic fallback
88//! - **Binary Storage**: Store raw `Vec<u8>`; the caller handles serialization and compression
89//! - **Flexible Routing**: `SubmitOptions` controls which tiers receive data
90//! - **TTL Support**: Per-item TTL for Redis cache entries
91//! - **Batch Writes**: Flushes triggered by configurable item-count, total-size, or time thresholds
92//! - **Cuckoo Filters**: Skip SQL queries when data definitely doesn't exist
93//! - **WAL Durability**: Local SQLite WAL during MySQL outages
94//! - **Backpressure**: Graceful degradation under memory pressure
95//! - **Circuit Breakers**: Prevent cascade failures to backends
96//! - **Retry Logic**: Configurable retry policies for transient failures
97//!
98//! ## Configuration
99//!
100//! See [`SyncEngineConfig`] for all configuration options.
101//!
102//! ## Modules
103//!
104//! - [`config`]: Engine configuration ([`SyncEngineConfig`])
105//! - [`coordinator`]: The main [`SyncEngine`] orchestrating all components
106//! - [`sync_item`]: Core data structure ([`SyncItem`], [`ContentType`])
107//! - [`submit_options`]: Per-item routing options ([`SubmitOptions`], [`CacheTtl`])
108//! - [`storage`]: Storage backends (Redis, SQL, Memory)
109//! - [`batching`]: Hybrid batcher for efficient writes
110//! - [`cuckoo`]: Probabilistic existence filters
111//! - [`merkle`]: Merkle tree for sync verification
112//! - [`resilience`]: Circuit breakers, retry logic, WAL
113//! - [`eviction`]: Tan-curve eviction policy
114//! - [`backpressure`]: Memory pressure handling
115//! - [`search`]: RediSearch and SQL query translation
116//! - [`cdc`]: Change Data Capture stream output
117//! - [`metrics`]: OpenTelemetry (OTel)-compatible metrics instrumentation
118
119pub mod config;
120pub mod sync_item;
121pub mod submit_options;
122pub mod storage;
123pub mod batching;
124pub mod resilience;
125pub mod eviction;
126pub mod cuckoo;
127pub mod merkle;
128pub mod backpressure;
129pub mod coordinator;
130pub mod metrics;
131pub mod search;
132pub mod cdc;
133
134// Note: this crate deliberately declares no `tracing` module, so that name stays free for the external `tracing` crate
135
136pub use config::SyncEngineConfig;
137pub use coordinator::{SyncEngine, EngineState, ItemStatus, BatchResult, MerkleDiff, HealthCheck};
138pub use backpressure::BackpressureLevel;
139pub use sync_item::{SyncItem, ContentType};
140pub use submit_options::{CacheTtl, OptionsKey, SubmitOptions};
141pub use storage::traits::{CacheStore, ArchiveStore, StorageError};
142pub use cuckoo::filter_manager::{FilterManager, FilterTrust};
143pub use batching::hybrid_batcher::{HybridBatcher, BatchConfig, FlushReason, Batch, FlushBatch, SizedItem, BatchableItem};
144pub use merkle::{PathMerkle, MerkleBatch, MerkleNode, RedisMerkleStore};
145pub use resilience::wal::{WriteAheadLog, MysqlHealthChecker, WalStats};
146pub use resilience::circuit_breaker::{CircuitBreaker, CircuitConfig, CircuitError, BackendCircuits};
147pub use resilience::retry::RetryConfig;
148pub use metrics::LatencyTimer;
149pub use cdc::{CdcEntry, CdcOp, CdcMeta, CdcFieldValue, maybe_compress, maybe_decompress, is_zstd_compressed, cdc_stream_key, CDC_STREAM_SUFFIX};