# Crema
A strongly consistent distributed cache built on Raft consensus and Moka local cache.
## Features

### Core Features

- Strong consistency for writes via Raft consensus (linearizable)
- Linearizable reads via Read-Index protocol (`consistent_get()`)
- Fast local reads from Moka cache with TinyLFU eviction
- Automatic TTL and eviction policies
- Request forwarding: writes are automatically forwarded to the Raft leader
- Graceful shutdown with proper state cleanup

### Cluster Management

- Two-tier cluster membership (discovery + manual Raft control)
- Memberlist integration for gossip-based node discovery and health monitoring
- Automatic peer discovery via seed nodes

### Multi-Raft Scaling

- Multi-Raft architecture for horizontal scalability
- Automatic shard routing with consistent hashing
- Shard migration with zero-downtime rebalancing
- Per-shard leadership for distributed write load

### Persistence & Recovery

- Pluggable storage backends: Memory (default) or RocksDB (persistent)
- Checkpointing with LZ4 compression for fast recovery
- Crash recovery from snapshots and Raft log replay
- CRC32 checksums for data integrity

### Observability

- Prometheus-style metrics (counters, gauges, histograms)
- Detailed shard statistics (entries, ops/sec, leadership)
- Migration progress tracking

### Testing

- Chaos testing framework for resilience testing
- Comprehensive test suite with 471+ tests
## Requirements

- Rust 1.75.0 or later
- Tokio async runtime
- (Optional) RocksDB for persistent storage
## Installation

Add to your `Cargo.toml`:

```toml
[dependencies]
crema = "0.1.0"

# For persistent storage (optional)
crema = { version = "0.1.0", features = ["rocksdb"] }
```
## Quick Start

### Single-Raft Mode (Default)

```rust
use ;
use std::time::Duration;
async
```
### Multi-Raft Mode (Sharded)

For high-throughput workloads, enable Multi-Raft mode to distribute writes across multiple shards:

```rust
use ;
use std::time::Duration;
async
```
## Architecture

```
┌────────────────────────────────────────────────────────────────┐
│                         Client Layer                           │
├────────────────────────────────────────────────────────────────┤
│                        Request Router                          │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐          │
│  │ Key → Shard  │  │   Forward    │  │    MOVED     │          │
│  │   Mapping    │  │  to Leader   │  │   Response   │          │
│  │ (versioned)  │  │              │  │              │          │
│  └──────────────┘  └──────────────┘  └──────────────┘          │
├────────────────────────────────────────────────────────────────┤
│             Multi-Raft Coordinator (Control Plane)             │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐          │
│  │    Shard     │  │  Migration   │  │     Raft     │          │
│  │   Registry   │  │ Orchestrator │  │  Lifecycle   │          │
│  │ (shard→raft) │  │(ownership Δ) │  │ (start/stop) │          │
│  └──────────────┘  └──────────────┘  └──────────────┘          │
│  Note: Does NOT participate in normal read/write data path     │
├────────────────────────────────────────────────────────────────┤
│                    Raft Groups (per Shard)                     │
│  ┌─────────────────────────────────────────────────────────┐   │
│  │                  Shard N Raft Group                     │   │
│  │  ┌─────────────┐  ┌─────────────────────────────────┐   │   │
│  │  │  Raft Core  │  │         Apply Pipeline          │   │   │
│  │  │ - HardState │  │  ┌─────────────────────────┐    │   │   │
│  │  │ - Log/WAL   │  │  │  State Machine (Moka)   │    │   │   │
│  │  │ - Conf      │  │  │  + Checkpoint Snapshots │    │   │   │
│  │  └─────────────┘  │  └─────────────────────────┘    │   │   │
│  │                   └─────────────────────────────────┘   │   │
│  └─────────────────────────────────────────────────────────┘   │
├────────────────────────────────────────────────────────────────┤
│                         Storage Layer                          │
│  ┌─────────────────────────────────┐  ┌────────────────────┐   │
│  │     Raft Consensus Storage      │  │   State Machine    │   │
│  │  ┌───────────┐  ┌─────────────┐ │  │  ┌──────────────┐  │   │
│  │  │ HardState │  │   Log/WAL   │ │  │  │  Moka Cache  │  │   │
│  │  │ ConfState │  │  (entries)  │ │  │  │  (KV data)   │  │   │
│  │  └───────────┘  └─────────────┘ │  │  └──────────────┘  │   │
│  │  ┌─────────────────────────────┐│  │  ┌──────────────┐  │   │
│  │  │ RocksDB (optional feature)  ││  │  │  Checkpoint  │  │   │
│  │  │ OR MemStorage (default)     ││  │  │ (LZ4+CRC32)  │  │   │
│  │  └─────────────────────────────┘│  │  └──────────────┘  │   │
│  └─────────────────────────────────┘  └────────────────────┘   │
├────────────────────────────────────────────────────────────────┤
│                         Network Layer                          │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐          │
│  │     TCP      │  │ Raft Messages│  │  Memberlist  │          │
│  │   (Client)   │  │   (Peers)    │  │   (Gossip)   │          │
│  └──────────────┘  └──────────────┘  └──────────────┘          │
└────────────────────────────────────────────────────────────────┘
```
## Module Structure

```
src/
├── cache/          # Main DistributedCache API, CacheRouter, and Moka storage
├── consensus/      # Raft node, state machine, storage, transport
├── cluster/        # Two-tier membership (discovery + manual Raft control)
├── network/        # TCP server, RPC messages, wire protocol
├── checkpoint/     # Snapshot writer/reader with LZ4 compression
├── partitioning/   # Consistent hash ring for key distribution
├── multiraft/      # Horizontal scaling via multiple Raft groups + migration
│   ├── coordinator.rs              # MultiRaftCoordinator - manages all shards
│   ├── shard.rs                    # Shard - single partition with its own storage
│   ├── router.rs                   # ShardRouter - routes keys to shards
│   ├── migration.rs                # Shard migration for rebalancing
│   └── memberlist_integration.rs   # Gossip-based shard leader tracking
├── metrics/        # Prometheus-style counters, gauges, histograms
└── testing/        # Chaos testing framework
```
## Storage Configuration

Crema supports two storage backends for Raft log persistence:

### Memory Storage (Default)

In-memory storage is the default and requires no additional configuration. Data is lost on restart, but this backend provides the fastest performance.

```rust
use ;
let config = new
    .with_raft_storage_type; // Default
```
Use cases:
- Development and testing
- Ephemeral caching where data loss is acceptable
- Maximum performance scenarios
### RocksDB Storage (Persistent)

For production deployments requiring durability, enable RocksDB storage:

```toml
# Cargo.toml
[dependencies]
crema = { version = "0.1.0", features = ["rocksdb"] }
```

```rust
use ;
let config = new
    .with_raft_storage_type
    .with_data_dir; // Required for RocksDB
```
Use cases:
- Production deployments requiring durability
- Crash recovery without full cluster rebuild
- Compliance requirements for data persistence
### Storage Comparison
| Feature | Memory | RocksDB |
|---|---|---|
| Persistence | No | Yes |
| Recovery on restart | Full replay required | Instant |
| Write latency | ~1ms | ~2-5ms |
| Feature flag | None | rocksdb |
| Disk usage | None | Proportional to data |
## Consistency Model

### Read Operations

| API | Consistency | Performance | Use Case |
|---|---|---|---|
| `get()` | Eventually consistent | Fastest (local) | High-throughput reads where staleness is acceptable |
| `consistent_get()` | Linearizable | Slower (leader roundtrip) | When freshness is critical |
### Write Operations

All writes (`put()`, `delete()`) go through Raft consensus and are linearizable.

```rust
// Fast local read - may return stale data on followers
let value = cache.get.await;

// Strongly consistent read - always reads latest committed value
let value = cache.consistent_get.await?;

// Linearizable write
cache.put.await?;
```
### Request Forwarding

When a write request arrives at a follower node, it is automatically forwarded to the current Raft leader:

```
Client → Follower → Leader (Raft Consensus) → Response → Client
```
This provides transparent handling of writes regardless of which node receives the request.
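The forwarding decision can be sketched as follows. The types and strings here are illustrative, not Crema's internal API: a node that is the shard leader proposes the write locally, while a follower forwards it to the known leader address.

```rust
// Hypothetical sketch of the per-request routing decision described above.
enum Role {
    Leader,
    Follower { leader_addr: &'static str },
}

fn route_write(role: &Role, key: &str) -> String {
    match role {
        // Leaders apply the write through their local Raft group.
        Role::Leader => format!("propose({key}) via local Raft"),
        // Followers transparently forward to the current leader.
        Role::Follower { leader_addr } => format!("forward({key}) to {leader_addr}"),
    }
}

fn main() {
    let follower = Role::Follower { leader_addr: "10.0.0.1:9000" };
    assert_eq!(route_write(&follower, "k"), "forward(k) to 10.0.0.1:9000");
    assert_eq!(route_write(&Role::Leader, "k"), "propose(k) via local Raft");
}
```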
### Consistency Summary

| Operation | Single-Raft | Multi-Raft |
|---|---|---|
| Writes | Strongly consistent (linearizable) | Per-shard consistent |
| `get()` | Locally consistent | Per-shard locally consistent |
| `consistent_get()` | Strongly consistent | Per-shard leader reads |
| Cross-shard | N/A | No transaction support |
| Shard routing | N/A | Eventually consistent (gossip) |
## Multi-Raft Scaling

### Why Multi-Raft?
A single Raft group has practical limits of ~10-20K writes/sec due to:
- Network latency for replication
- Disk fsync for durability
- Leader bottleneck (all writes go through one node)
Multi-Raft solves this by partitioning the keyspace into N independent shards:
| Shards | Theoretical Max | Practical Max |
|---|---|---|
| 1 | 50K/sec | 10-20K/sec |
| 4 | 200K/sec | 40-80K/sec |
| 16 | 800K/sec | 160-320K/sec |
| 64 | 3.2M/sec | 640K-1.3M/sec |
### Shard Assignment

Keys are assigned to shards using consistent hashing:

```
shard_id = xxhash64(key) % num_shards
```
This ensures:
- Same key always goes to same shard
- Keys are evenly distributed across shards
- Different shards can have different leaders (load distribution)
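The mapping above can be sketched in a few lines. Crema uses xxhash64; the standard library's `DefaultHasher` stands in here so the example is self-contained (the stability and distribution properties relevant to the example are analogous).

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// shard_id = hash(key) % num_shards (DefaultHasher standing in for xxhash64)
fn shard_for_key(key: &str, num_shards: u32) -> u32 {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    (hasher.finish() % num_shards as u64) as u32
}

fn main() {
    let num_shards = 16;
    // The same key always maps to the same shard...
    assert_eq!(
        shard_for_key("user:42", num_shards),
        shard_for_key("user:42", num_shards)
    );
    // ...and every resulting shard id is in range.
    for key in ["a", "b", "c"] {
        assert!(shard_for_key(key, num_shards) < num_shards);
    }
}
```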
### Gossip-Based Routing
The implementation uses memberlist gossip for shard leader discovery:
- Eventually consistent - leader info propagates via gossip
- Best-effort routing - shard leader info is a routing hint
- Epoch-based versioning - handles out-of-order gossip updates
- Debounced broadcasts - batches rapid leader changes (200ms default)
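The debouncing behaviour in the last bullet can be sketched with a plain channel: rapid leader-change events are coalesced, and one batched broadcast is emitted once the channel has been quiet for a full window (200 ms by default in Crema). The function name and types here are illustrative, not Crema's API.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

// Wait for the first event, then keep absorbing events until none
// arrive for `window`; everything collected becomes one broadcast.
fn drain_debounced<T>(rx: &mpsc::Receiver<T>, window: Duration) -> Vec<T> {
    let mut batch = Vec::new();
    if let Ok(first) = rx.recv() {
        batch.push(first);
        while let Ok(next) = rx.recv_timeout(window) {
            batch.push(next);
        }
    }
    batch
}

fn main() {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        for shard in 0u32..3 {
            tx.send((shard, "node-1")).unwrap(); // three rapid leader changes
        }
    });
    let batch = drain_debounced(&rx, Duration::from_millis(200));
    assert_eq!(batch.len(), 3); // coalesced into a single broadcast
}
```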
```rust
// Shard leader info is encoded in memberlist metadata
// Format: "shard_id:leader_id:epoch,..."

// When a node becomes leader of a shard:
coordinator.set_local_shard_leader; // Queues broadcast

// When receiving gossip about shard leaders:
coordinator.set_shard_leader_if_newer; // Epoch check
```
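A decoder for that metadata format, together with the epoch check, could look like the following sketch. The table layout and function name are illustrative; only the `"shard_id:leader_id:epoch,..."` wire format and the newer-epoch-wins rule come from the description above.

```rust
use std::collections::HashMap;

// shard_id -> (leader_id, epoch); an update only wins if its epoch is newer.
fn apply_gossip(table: &mut HashMap<u32, (u64, u64)>, metadata: &str) {
    for entry in metadata.split(',') {
        let fields: Vec<&str> = entry.split(':').collect();
        if fields.len() != 3 {
            continue; // skip malformed entries
        }
        let parsed = (
            fields[0].parse::<u32>(),
            fields[1].parse::<u64>(),
            fields[2].parse::<u64>(),
        );
        let (shard, leader, epoch) = match parsed {
            (Ok(s), Ok(l), Ok(e)) => (s, l, e),
            _ => continue,
        };
        // Epoch check: only accept strictly newer information,
        // which makes out-of-order gossip updates harmless.
        match table.get(&shard) {
            Some(&(_, current)) if current >= epoch => {}
            _ => {
                table.insert(shard, (leader, epoch));
            }
        }
    }
}

fn main() {
    let mut table = HashMap::new();
    apply_gossip(&mut table, "0:1:5,1:2:3");
    apply_gossip(&mut table, "0:9:4"); // epoch 4 < 5: stale, ignored
    assert_eq!(table[&0], (1, 5));
    assert_eq!(table[&1], (2, 3));
}
```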
### Multi-Raft API

```rust
use ;

// Direct coordinator usage (for advanced use cases)
let coordinator = new
    .num_shards
    .replica_factor
    .shard_capacity
    .default_ttl
    .build_and_init
    .await?;

// Cache operations
coordinator.put.await?;
let value = coordinator.get.await?;
coordinator.delete.await?;

// Shard introspection
let shard_id = coordinator.shard_for_key;
let shard_info = coordinator.shard_info;
let stats: MultiRaftStats = coordinator.stats;

// Leader management
coordinator.set_shard_leader;
let leaders = coordinator.shard_leaders; // HashMap<ShardId, Option<NodeId>>

// Invalidate leaders when a node fails
let invalidated = coordinator.invalidate_leader_for_node;
```
### Shard Types

```rust
use ;

// ShardId is just u32
let shard_id: ShardId = 0;

// ShardConfig for creating shards
let config = new
    .with_replicas
    .with_max_capacity
    .with_default_ttl;

// ShardState tracks lifecycle
// ShardInfo provides runtime statistics

// ShardRange represents key ownership
let range = ShardRange ;
assert!; // key_hash % 16 == 0
```
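As a self-contained illustration of key ownership, a modulo-based range could be modelled like this. The struct shape and `contains` method are hypothetical, inferred only from the `key_hash % 16 == 0` comment above:

```rust
// Hypothetical: a shard "owns" every key whose hash is congruent
// to its shard id modulo the shard count.
struct ShardRange {
    shard_id: u32,
    num_shards: u32,
}

impl ShardRange {
    fn contains(&self, key_hash: u64) -> bool {
        key_hash % self.num_shards as u64 == self.shard_id as u64
    }
}

fn main() {
    let range = ShardRange { shard_id: 0, num_shards: 16 };
    assert!(range.contains(32)); // 32 % 16 == 0
    assert!(!range.contains(33)); // 33 % 16 == 1
}
```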
## Configuration

### CacheConfig

```rust
use ;
use std::time::Duration;

let node_id = 1;
let raft_addr = "127.0.0.1:9000".parse?;

// Configure memberlist for gossip-based discovery
let memberlist_config = MemberlistConfig ;

// Create MemberlistDiscovery - users create their own discovery implementation
let raft_peers = vec!;
let discovery = new;

let config = new
    // Cache settings
    .with_max_capacity
    .with_default_ttl
    // Storage backend
    .with_raft_storage_type // or Memory (default)
    .with_data_dir // Required for RocksDB
    // Raft settings
    .with_raft_config
    // Cluster discovery - pass your own ClusterDiscovery implementation
    .with_cluster_discovery
    // Multi-Raft for sharding (requires cluster discovery)
    .with_multiraft_config;
```
### MultiRaftCacheConfig

| Field | Default | Description |
|---|---|---|
| `enabled` | `false` | Enable Multi-Raft mode |
| `num_shards` | `16` | Number of shards (max 64) |
| `shard_capacity` | `100_000` | Max entries per shard |
| `auto_init_shards` | `true` | Auto-create shards on startup |
| `leader_broadcast_debounce_ms` | `200` | Debounce interval for leader broadcasts |
**Note:** Multi-Raft requires a `ClusterDiscovery` implementation (like `MemberlistDiscovery`). The cache will fail to start if Multi-Raft is enabled without cluster discovery.
### MemberlistConfig

| Field | Default | Description |
|---|---|---|
| `enabled` | `false` | Enable memberlist gossip |
| `bind_addr` | `None` | Address for gossip protocol (UDP + TCP) |
| `seed_addrs` | `[]` | Initial nodes to contact for discovery |
| `node_name` | `None` | Human-readable node name |
| `peer_management.auto_add_peers` | `false` | Automatically add discovered nodes as Raft peers |
| `peer_management.auto_add_voters` | `false` | Automatically add discovered nodes as Raft voters |
## Checkpointing

The cache supports periodic checkpointing for fast recovery:

```rust
use ;
let config = new
    .with_log_threshold // Checkpoint after 10K log entries
    .with_compression; // Enable LZ4 compression

// Snapshots are automatically created based on configured triggers
```
### Checkpoint Format
- Binary format with magic bytes "MCRS"
- Version and flags header
- Raft index/term metadata
- LZ4-compressed entries
- CRC32 checksum for integrity verification
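The integrity step can be illustrated with a bitwise CRC-32 (IEEE polynomial), the same checksum family named above. Crema's exact on-disk layout is not reproduced here; the sketch only shows how a reader would verify payload bytes against a stored checksum.

```rust
// Bitwise CRC-32 (IEEE 802.3 polynomial, reflected form).
fn crc32(data: &[u8]) -> u32 {
    let mut crc = 0xFFFF_FFFFu32;
    for &byte in data {
        crc ^= byte as u32;
        for _ in 0..8 {
            let mask = if crc & 1 != 0 { 0xEDB8_8320 } else { 0 };
            crc = (crc >> 1) ^ mask;
        }
    }
    !crc
}

fn main() {
    // Standard CRC-32 check value for the ASCII digits "123456789".
    assert_eq!(crc32(b"123456789"), 0xCBF4_3926);

    // On read, the checksum is recomputed over the payload and compared
    // with the stored value; a mismatch means the checkpoint is corrupt.
    let payload = b"compressed entries";
    let stored = crc32(payload);
    assert_eq!(crc32(payload), stored); // integrity OK
}
```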
### Recovery Process

On startup, the cache:

1. Loads the latest valid checkpoint (if available)
2. Verifies the CRC32 checksum
3. Restores the state machine to the checkpoint state
4. Replays Raft log entries after the checkpoint index
5. Joins the cluster and catches up with the leader
## Metrics

Crema provides Prometheus-style metrics for monitoring:

```rust
use ;

// Access the metrics registry
let metrics = cache.metrics;

// Available metrics:
// - cache_hits_total (Counter)
// - cache_misses_total (Counter)
// - cache_entries (Gauge)
// - cache_size_bytes (Gauge)
// - raft_proposals_total (Counter)
// - raft_proposal_latency_seconds (Histogram)
// - raft_leader_changes_total (Counter)
// - memberlist_nodes (Gauge)
// - shard_entries (Gauge, per-shard)
// - shard_leader_changes_total (Counter)
// - migration_duration_seconds (Histogram)
```
### Shard Statistics

```rust
if let Some = cache.multiraft_coordinator
```
## Running a Cluster

### Single Node

### Multi-Node Cluster

```sh
# Terminal 1
RUST_LOG=info

# Terminal 2
RUST_LOG=info

# Terminal 3
RUST_LOG=info
```
### Production Deployment

For production deployments, consider:

- Use RocksDB storage for durability (`.with_raft_storage_type`, `.with_data_dir`)
- Configure appropriate timeouts for your network (`.with_raft_config`)
- Enable cluster discovery for automatic peer discovery (`MemberlistConfig` plus `.with_cluster_discovery`)
- Set an appropriate shard count based on expected write throughput:
  - Start with 16 shards for most workloads
  - Scale to 64 shards for high-throughput scenarios
- Monitor metrics for cluster health and performance
## Wire Protocol
- 4-byte length prefix (big-endian u32)
- Bincode-encoded messages
- Protobuf for internal Raft messages
- Max message size: 16MB
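The framing described above can be sketched with plain byte handling: a 4-byte big-endian `u32` length followed by the encoded payload (bincode in Crema; opaque bytes here so the example stays self-contained). Function names are illustrative, not Crema's API.

```rust
const MAX_MESSAGE_SIZE: u32 = 16 * 1024 * 1024; // 16MB cap from the protocol

// Prepend the big-endian length prefix to a payload.
fn frame(payload: &[u8]) -> Vec<u8> {
    let mut buf = Vec::with_capacity(4 + payload.len());
    buf.extend_from_slice(&(payload.len() as u32).to_be_bytes());
    buf.extend_from_slice(payload);
    buf
}

// Parse one frame, rejecting truncated or oversized messages.
fn unframe(buf: &[u8]) -> Option<&[u8]> {
    let mut len_bytes = [0u8; 4];
    len_bytes.copy_from_slice(buf.get(..4)?);
    let len = u32::from_be_bytes(len_bytes);
    if len > MAX_MESSAGE_SIZE {
        return None; // enforce the 16MB limit
    }
    buf.get(4..4 + len as usize)
}

fn main() {
    let framed = frame(b"hello");
    assert_eq!(&framed[..4], &[0, 0, 0, 5]); // big-endian length prefix
    assert_eq!(unframe(&framed), Some(&b"hello"[..]));
    assert_eq!(unframe(&[0u8, 0, 0]), None); // truncated header
}
```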
## Feature Flags

| Feature | Description |
|---|---|
| `default` | Memory storage + memberlist gossip |
| `memberlist` | Enable memberlist-based cluster discovery (default) |
| `rocksdb` | Enable RocksDB persistent storage |
| `full` | All features including RocksDB and memberlist |
```toml
# Default (with memberlist)
crema = "0.1.0"

# Without memberlist (minimal dependencies)
crema = { version = "0.1.0", default-features = false, features = ["tokio"] }

# With RocksDB
crema = { version = "0.1.0", features = ["rocksdb"] }

# All features
crema = { version = "0.1.0", features = ["full"] }
```
## Cluster Discovery Abstraction

Crema uses a `ClusterDiscovery` trait to abstract cluster membership protocols. Users create their own discovery implementation and pass it to the config via `with_cluster_discovery()`:

```rust
use ;

// Available implementations:

// 1. MemberlistDiscovery - Gossip-based discovery using SWIM protocol
let discovery = new;

// 2. StaticClusterDiscovery - Fixed IP list with health checks
let discovery = new;

// 3. NoOpClusterDiscovery - No-op for single-node or manual management
let discovery = new;

// Pass to config
let config = new
    .with_cluster_discovery;
```
When no discovery is provided, the cache defaults to NoOpClusterDiscovery. This is useful for:
- Single-node deployments
- Manual peer management
- Custom discovery implementations
## Development

```sh
# Build
cargo build

# Build with all features
cargo build --all-features

# Run tests
cargo test

# Run tests with all features
cargo test --all-features

# Run tests with output
cargo test -- --nocapture

# Check code
cargo check

# Lint
cargo clippy

# Format
cargo fmt
```
## License
Licensed under either of:
- Apache License, Version 2.0 (LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0)
- MIT License (LICENSE-MIT or http://opensource.org/licenses/MIT)
at your option.