# Sync Engine

High-performance tiered sync engine with L1/L2/L3 caching and Redis/SQL backends
## Philosophy: Content-Aware Tiered Storage

`sync-engine` intelligently routes data through L1/L2/L3 tiers based on content type:
- JSON payloads → Stored natively for full-text search (RediSearch, JSON_EXTRACT)
- Binary blobs → Stored as opaque bytes in dedicated blob columns
The caller submits a `SyncItem` containing raw bytes; the engine detects the content type and optimizes storage for queryability while preserving the original data, as sketched below.
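For example, both payload kinds go through the same `submit` path; a minimal sketch (the `from_bytes` constructor is an assumed counterpart to the `from_json` shown in later sections):

```rust
use serde_json::json;
use sync_engine::SyncItem;

// JSON payload: detected and stored natively (RedisJSON / TEXT column),
// so it stays searchable via RediSearch and JSON_EXTRACT.
let doc = SyncItem::from_json("user:1", json!({ "name": "Alice", "age": 34 }));
engine.submit(doc).await?;

// Binary payload: stored as opaque bytes (Redis binary fallback / BLOB column).
// `from_bytes` is an illustrative constructor name.
let raw: Vec<u8> = vec![0xFF, 0xD8, 0xFF, 0xE0]; // e.g., the start of a JPEG
let blob = SyncItem::from_bytes("thumb:1", raw);
engine.submit(blob).await?;
```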
## Architecture

```text
┌─────────────────────────────────────────────────────────────┐
│ Ingest Layer │
│ • Accepts SyncItems via submit() / submit_with() │
│ • Backpressure control based on memory usage │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ L1: In-Memory Cache │
│ • Moka cache for concurrent access │
│ • Tan-curve eviction under memory pressure │
└─────────────────────────────────────────────────────────────┘
│
(Batch flush via HybridBatcher)
▼
┌─────────────────────────────────────────────────────────────┐
│ L2: Redis Cache │
│ • RedisJSON for structured data (JSON.SET/JSON.GET) │
│ • RediSearch for full-text & field queries │
│ • Binary fallback for non-JSON content │
│ • Optional per-item TTL │
└─────────────────────────────────────────────────────────────┘
│
(Batch persist to ground truth)
▼
┌─────────────────────────────────────────────────────────────┐
│ L3: MySQL/SQLite Archive │
│ • JSON in TEXT column (queryable via JSON_EXTRACT) │
│ • Binary in BLOB column │
│ • Cuckoo filter for fast existence checks │
│ • WAL fallback during outages │
└─────────────────────────────────────────────────────────────┘
```
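Reads walk the same tiers top-down with automatic fallback; the sketch below assumes a point-lookup method (`get` is not shown elsewhere in this README and is illustrative):

```rust
// Conceptual read path (assumed `get` API):
// 1. L1 (Moka) hit -> returns in microseconds.
// 2. L1 miss -> try L2 (Redis); on a hit, repopulate L1.
// 3. L2 miss -> consult the cuckoo filter: if the key definitely does not
//    exist, skip SQL entirely; otherwise read L3 and repopulate L2/L1.
if let Some(_item) = engine.get("user:1").await? {
    println!("hit: user:1");
}
```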
## Features
- Tiered Caching: L1 (memory) → L2 (Redis) → L3 (SQL) with automatic fallback
- Content-Aware Storage: JSON payloads stored searchable, binaries as blobs
- RedisJSON + RediSearch: Full-text and field-based queries on Redis data
- MySQL JSON Queries: JSON_EXTRACT on payload column for SQL searches
- Flexible Routing: `SubmitOptions` controls which tiers receive data
- Item State: Tag items with caller-defined state for grouping and batch ops
- TTL Support: Per-item TTL for Redis cache entries
- Batch Writes: Configurable flush by count, size, or time
- Cuckoo Filters: Skip SQL queries when data definitely doesn't exist
- WAL Durability: Local SQLite WAL during MySQL outages
- Backpressure: Graceful degradation under memory pressure
- Circuit Breakers: Prevent cascade failures to backends
- Change Data Capture (CDC): Streams changes to a Redis Stream for replication
## Quick Start

Add to your `Cargo.toml`:

```toml
[dependencies]
sync-engine = "0.2.1"
tokio = { version = "1", features = ["full"] }
serde_json = "1"
```
Basic usage:
use ;
use json;
use watch;
async
## Submit Options

Control where data is stored per item (the constructor names below appear in the original; the TTL argument and struct fields are illustrative):

```rust
use std::time::Duration;
use sync_engine::SubmitOptions;

// Default: Redis + SQL
let default = SubmitOptions::default();

// Redis only with TTL (ephemeral cache); the TTL value is illustrative
let cache = SubmitOptions::cache(Duration::from_secs(300));

// SQL only (durable, skip Redis)
let durable = SubmitOptions::durable();

// Custom routing; field names are illustrative
let custom = SubmitOptions {
    write_redis: true,
    write_sql: false,
    ..SubmitOptions::default()
};
```
## Item State

Tag items with caller-defined state for grouping and batch operations (method names appear in the original; IDs, state labels, and argument shapes are illustrative):

```rust
use serde_json::json;
use sync_engine::{SubmitOptions, SyncItem};

// Create an item with state (e.g., CRDT deltas vs. base state)
let delta = SyncItem::from_json("delta:42", json!({ "op": "incr" }))
    .with_state("pending");

// Override state via SubmitOptions (assumed builder shape)
engine.submit_with(delta, SubmitOptions::default().with_state("pending")).await?;

// Query by state
let deltas = engine.get_by_state("pending").await?;
let count = engine.count_by_state("pending").await?;

// Update state (e.g., after processing)
engine.set_state("delta:42", "merged").await?;

// Bulk delete by state
engine.delete_by_state("merged").await?;
```
State is indexed in SQL for fast queries and tracked in Redis SETs for O(1) membership checks.
## Prefix Scan

Query items by ID prefix for CRDT delta-first patterns (method names appear in the original; IDs and arguments are illustrative):

```rust
use serde_json::json;
use sync_engine::SyncItem;

// Store deltas with hierarchical IDs: delta:{object_id}:{op_id}
let op = SyncItem::from_json("delta:user-7:op-001", json!({ "op": "set" }))
    .with_state("pending");
engine.submit(op).await?;

// Fetch ALL deltas for a specific object (for read-repair/merge)
let user_deltas = engine.scan_prefix("delta:user-7:").await?;

// Count pending deltas
let pending = engine.count_prefix("delta:user-7:").await?;

// After merging, clean up the deltas
engine.delete_prefix("delta:user-7:").await?;
```

Prefix scan queries SQL directly (the ground truth) and leverages the primary-key index for efficient `LIKE 'prefix%'` queries.
## Full-Text Search

Create search indices on your JSON data for RediSearch-powered queries with SQL fallback (method names appear in the original; index/field names, arguments, and the `SearchTier` variant are illustrative):

```rust
use serde_json::json;
use sync_engine::{field_eq, SearchIndex, SearchTier, SyncItem};

// Define a search index (the engine handles $.payload paths internally)
let index = SearchIndex::new("users")
    .text("name")            // Full-text searchable
    .text("bio")
    .numeric_sortable("age") // Numeric with range queries
    .tag("role");            // Exact tag matching

engine.create_search_index(index).await?;

// Insert documents as usual
let user = SyncItem::from_json(
    "user:1",
    json!({ "name": "Alice", "bio": "Rustacean", "age": 34, "role": "admin" }),
);
engine.submit(user).await?;

// Query with fluent builder API
let query = field_eq("name", "Alice")
    .and(field_eq("bio", "Rustacean"));
let results = engine.search(query).await?;

// Tag-based queries (exact match on a tag field)
let admins = engine.search(field_eq("role", "admin")).await?;

// Use SearchTier for control over Redis vs SQL (variant name is illustrative)
let redis_only = engine
    .search_with_options(field_eq("name", "Alice"), SearchTier::Redis)
    .await?;
```
Search Features:
- Text fields: Full-text search with phrase matching
- Numeric fields: Range queries with optional sorting
- Tag fields: Exact multi-value matching with OR semantics
- Compound queries: `.and()`, `.or()`, `.negate()` for complex filters (sketched below)
- Search cache: Merkle-invalidated SQL result caching for hybrid queries
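A short sketch combining these operators (`field_eq` appears in the example above; the field values here are illustrative):

```rust
use sync_engine::field_eq; // assumed import path

// Admins or moderators, excluding anyone named "Alice".
let query = field_eq("role", "admin")
    .or(field_eq("role", "moderator"))
    .and(field_eq("name", "Alice").negate());
let results = engine.search(query).await?;
```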
## Configuration

All configuration options with their defaults:

| Option | Default | Description |
|---|---|---|
| **Connection** | | |
| `redis_url` | `None` | Redis connection string (e.g., `"redis://localhost:6379"`) |
| `redis_prefix` | `None` | Key prefix for namespacing (e.g., `"sync:"`) |
| `sql_url` | `None` | MySQL/SQLite connection string |
| **Memory & Limits** | | |
| `l1_max_bytes` | 256 MB | Maximum L1 cache size |
| `max_payload_bytes` | 16 MB | Max single item size (prevents cache exhaustion) |
| **Batching** | | |
| `batch_flush_ms` | 100 | Flush batch after N milliseconds |
| `batch_flush_count` | 1000 | Flush batch after N items |
| `batch_flush_bytes` | 1 MB | Flush batch after N bytes |
| **Backpressure** | | |
| `backpressure_warn` | 0.7 | Memory pressure warning threshold (0.0-1.0) |
| `backpressure_critical` | 0.9 | Memory pressure critical threshold (0.0-1.0) |
| **WAL (Write-Ahead Log)** | | |
| `wal_path` | `None` | SQLite WAL path for durability during outages |
| `wal_max_items` | `None` | Max WAL items before backpressure |
| `wal_drain_batch_size` | 100 | Items per WAL drain batch |
| **Cuckoo Filter** | | |
| `cuckoo_warmup_batch_size` | 10000 | Batch size when warming filter from SQL |
| `cf_snapshot_interval_secs` | 30 | Snapshot filter to WAL every N seconds |
| `cf_snapshot_insert_threshold` | 10000 | Snapshot filter after N inserts |
| **Redis Eviction** | | |
| `redis_eviction_enabled` | `true` | Enable proactive eviction before Redis LRU |
| `redis_eviction_start` | 0.75 | Start evicting at this pressure (0.0-1.0) |
| `redis_eviction_target` | 0.60 | Target pressure after eviction (0.0-1.0) |
| **Merkle Tree** | | |
| `merkle_calc_enabled` | `true` | Enable Merkle updates (if sharing a SQL instance, disable on most nodes in the cluster) |
| `merkle_calc_jitter_ms` | 0 | Random delay to reduce cluster contention |
| **CDC Stream** | | |
| `enable_cdc_stream` | `false` | Enable Change Data Capture to a Redis Stream |
| `cdc_stream_maxlen` | 100000 | Max stream entries (`MAXLEN ~`, relies on Merkle repair) |
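These options map onto the engine's config struct; a minimal programmatic sketch (the `SyncEngineConfig` type name is assumed, and the field names follow the table above):

```rust
use sync_engine::SyncEngineConfig; // assumed type name

let config = SyncEngineConfig {
    redis_url: Some("redis://localhost:6379".to_string()),
    redis_prefix: Some("sync:".to_string()),
    sql_url: Some("mysql://root:secret@localhost:3306/sync".to_string()),
    l1_max_bytes: 256 * 1024 * 1024, // 256 MB
    batch_flush_ms: 100,
    wal_path: Some("/var/lib/sync-engine/wal.db".to_string()),
    ..Default::default()
};
```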
## Testing
Comprehensive test suite with 324 tests covering unit, property-based, integration, and chaos testing:
| Test Suite | Count | Description |
|---|---|---|
| Unit Tests | 241 ✅ | Fast, no external deps |
| Doc Tests | 31 ✅ | Example verification |
| Property Tests | 12 ✅ | Proptest fuzzing for invariants |
| Integration Tests | 30 ✅ | Real Redis Stack/MySQL via testcontainers |
| Chaos Tests | 10 ✅ | Failure injection, container killing |
| Total | 324 ✅ | ~76.6% code coverage |
### Running Tests

Run suites individually or all together (the exact cargo test targets are illustrative):

```bash
# Unit tests (fast, no Docker)
cargo test --lib

# Doc tests (not so fast, no Docker)
cargo test --doc

# Property-based fuzzing
cargo test --test proptest

# Integration tests (requires Docker)
cargo test --test integration

# Chaos tests (requires Docker)
cargo test --test chaos

# All tests
cargo test

# Coverage (slow, requires Docker for integration and chaos)
cargo llvm-cov
```
## Examples & CLI Tools

### Basic Usage Example

A complete working example that demonstrates all core functionality:

```bash
# Start the docker environment
docker compose up -d

# Run the example
cargo run --example basic_usage
```
This showcases:
- Connecting to Redis (L2) and MySQL (L3)
- Writing JSON entries with timing
- L1 cache hit performance (microseconds!)
- Merkle tree sync verification
- Cuckoo filter trust verification
- OTEL-compatible metrics export
### CLI Scripts

Utility scripts for inspecting and managing data in the docker environment:

| Script | Purpose |
|---|---|
| `./scripts/clear.sh` | Flush Redis + drop/recreate MySQL table |
| `./scripts/redisearch-index.sh` | Build RediSearch full-text index |
| `./scripts/redisearch-query.sh` | Query RediSearch with pretty output |
| `./scripts/redis-search.sh` | Manual Redis search (pattern or JSON) |
| `./scripts/sql-search.sh` | MySQL search with JSON_EXTRACT |
| `./scripts/show-records.sh` | Display full records from Redis + MySQL |
Examples:

```bash
# Clear everything and start fresh
./scripts/clear.sh

# Create data with the basic_usage example from above
cargo run --example basic_usage

# Build RediSearch index after populating data
./scripts/redisearch-index.sh

# Query by field, tag, or numeric range
./scripts/redisearch-query.sh

# Search MySQL with JSON path
./scripts/sql-search.sh

# Display all records from both backends
./scripts/show-records.sh
```
### Docker Environment

The `docker-compose.yml` provides:
| Service | Port | Description |
|---|---|---|
| Redis Stack | 6379 | Redis + RediSearch + RedisJSON |
| MySQL | 3306 | Ground truth storage |
| RedisInsight | 5540 | Redis GUI |
| Adminer | 8080 | MySQL GUI |
## Chaos Testing Scenarios
The chaos test suite validates resilience under real-world failure conditions:
- Container killing: Redis/MySQL death mid-operation
- Data corruption: Garbage JSON, truncated data in Redis
- Lifecycle edge cases: Double start, shutdown without start, ops after shutdown
- Concurrent failures: Many writers during Redis death
- Memory pressure: L1 overflow with backpressure
- Rapid cycles: Start/stop 5x without resource leaks
## License
GNU Affero General Public License v3.0 (AGPL-3.0)
For commercial licensing options, contact: adrian.j.robinson@gmail.com