# rivven-core

High-performance storage engine for the Rivven event streaming platform.

## Overview

`rivven-core` is the foundational storage engine that powers Rivven's ultra-low-latency message persistence. It implements hot-path optimizations including zero-copy I/O, portable async I/O (an io_uring-style API), lock-free data structures, and cache-aligned memory layouts.
## Features
| Category | Features |
|---|---|
| Hot Path | Zero-copy buffers, cache-line alignment, lock-free queues, ArcSwap mmap cache |
| Storage | Log segments, tiered storage (hot/warm/cold), compaction, atomic write generation tracking |
| I/O | Portable async I/O (io_uring-style API, std::fs fallback), memory-mapped files |
| Compression | LZ4, Zstd, Snappy (streaming-optimized) |
| Transactions | Exactly-once semantics, 2PC protocol |
| Batching | Group commit WAL, vectorized encoding, fast checksums (delegates to crc32fast/memchr) |
| Security | TLS 1.3, Cedar authorization, indexed ACL lookups, AES-256-GCM / ChaCha20-Poly1305 encryption with key rotation, SCRAM-SHA-256 (600K PBKDF2) |
## Installation

```toml
[dependencies]
rivven-core = "0.2"

# Or enable optional features:
rivven-core = { version = "0.2", features = ["compression", "tls", "metrics"] }
```
## Feature Flags

| Feature | Description | Dependencies |
|---|---|---|
| `compression` | LZ4, Zstd, Snappy codecs | `lz4_flex`, `zstd`, `snap` |
| `encryption` | AES-256-GCM / ChaCha20-Poly1305 at-rest encryption with key rotation | `ring`, `rand` |
| `tls` | TLS 1.3 transport security | `rustls`, `webpki` |
| `metrics` | Prometheus-compatible metrics | `metrics`, `metrics-exporter-prometheus` |
| `cedar` | Cedar policy-based authorization | `cedar-policy` |
| `oidc` | OpenID Connect authentication | `openidconnect` |
| `cloud-storage` | S3/GCS/Azure tiered storage | `object_store` |
## Architecture

```text
rivven-core/
├── Hot Path (Ultra-Fast)
│   ├── zero_copy.rs       # Cache-aligned zero-copy buffers
│   ├── io_uring.rs        # Portable async I/O (io_uring-style API, std::fs fallback)
│   ├── concurrent.rs      # Lock-free MPMC queues, hashmaps
│   ├── buffer_pool.rs     # Slab-allocated buffer pooling
│   └── vectorized.rs      # Batch processing (delegates to crc32fast/memchr for SIMD)
│
├── Storage Engine
│   ├── storage/
│   │   ├── segment.rs     # Log segment files with indexes
│   │   ├── tiered.rs      # Hot/warm/cold tier management
│   │   ├── log_manager.rs # Segment lifecycle management
│   │   └── memory.rs      # In-memory hot tier cache
│   ├── wal.rs             # Group commit write-ahead log
│   └── compaction.rs      # Log compaction with tombstones
│
├── Transactions
│   └── transaction.rs     # Exactly-once semantics
│
├── Security
│   ├── auth.rs            # Authentication providers
│   ├── tls.rs             # TLS 1.3 configuration
│   └── encryption.rs      # At-rest encryption
│
└── Utilities
    ├── compression.rs     # Streaming codec implementations
    ├── bloom.rs           # Bloom filters for segment lookup
    └── metrics.rs         # Performance observability
```
## Tiered Storage

Rivven supports automatic data tiering across hot (memory), warm (local disk), and cold (object storage) tiers:

```rust
// NOTE: type and method names below are illustrative; see the crate docs
// for the exact API.
use rivven_core::{StorageConfig, TieredStorageConfig};

// Enable tiered storage with a high-performance preset
let config = StorageConfig::new()
    .with_tiered_storage(TieredStorageConfig::high_performance());

// Or use a cost-optimized preset for archival workloads
let config = StorageConfig::new()
    .with_tiered_storage(TieredStorageConfig::cost_optimized());

// Custom configuration
let tiered_config = TieredStorageConfig {
    // hot/warm/cold thresholds, object-store endpoint, ...
    ..Default::default()
};
let config = StorageConfig::new().with_tiered_storage(tiered_config);
```
Storage tiers:
- Hot: In-memory for recent data and active consumers
- Warm: Local disk for medium-aged data
- Cold: S3/GCS/Azure for archival and compliance
## Hot Path Optimizations

### Segment Append (Zero-Allocation Serialization)

The produce hot path (`Segment::append` / `append_batch`) uses `postcard::to_extend` to serialize messages directly into the output frame, avoiding intermediate allocations.

Single append — 1 allocation, 0 copies:

```text
Vec [CRC:4 placeholder | Len:4 placeholder] ──to_extend──▶ [CRC | Len | payload]
```

Batch append — 2 allocations for N messages:

```text
Reusable msg_buf Vec ──to_extend + clear()──▶ per-message payload
Single BytesMut      ◀── accumulates all framed records
```
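The placeholder-backfill framing can be sketched in plain Rust. This toy version uses `extend_from_slice` and a trivial additive checksum in place of `postcard::to_extend` and `crc32fast`; the `frame_record` helper is hypothetical, not part of the crate:

```rust
// Sketch of the [CRC | Len | payload] framing: reserve the header,
// serialize the payload directly into the output Vec, then backfill.
// A sum-based checksum stands in for crc32fast.
fn frame_record(out: &mut Vec<u8>, payload: &[u8]) {
    let start = out.len();
    out.extend_from_slice(&[0u8; 8]); // CRC:4 + Len:4 placeholders
    out.extend_from_slice(payload);   // serialize straight into `out`
    let len = (out.len() - start - 8) as u32;
    let crc: u32 = payload.iter().map(|&b| b as u32).sum(); // stand-in checksum
    out[start..start + 4].copy_from_slice(&crc.to_le_bytes());
    out[start + 4..start + 8].copy_from_slice(&len.to_le_bytes());
}

fn main() {
    let mut frame = Vec::new();
    frame_record(&mut frame, b"hello");
    assert_eq!(frame.len(), 8 + 5);
    assert_eq!(&frame[8..], b"hello");
}
```

The key property is that the payload is written exactly once, into its final position; only the fixed 8-byte header is touched twice.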
### Batch Append (Zero-Clone Ownership Transfer)

`LogManager::append_batch()` uses `split_off()` to partition batches across segment boundaries without cloning `Message` structs — eliminating per-message `String`/`Vec<u8>` header allocations.
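The trick rests on `Vec::split_off`, which moves the tail elements out without cloning them. A minimal sketch (the `Message` struct and `partition_at` helper are illustrative, not the crate's actual types):

```rust
// Partition a batch at a segment boundary without cloning any message.
// split_off moves the tail into a new Vec; the owned String/Vec<u8>
// fields inside each Message are never duplicated.
struct Message {
    key: String,
    value: Vec<u8>,
}

fn partition_at(mut batch: Vec<Message>, remaining_capacity: usize)
    -> (Vec<Message>, Vec<Message>)
{
    let split = remaining_capacity.min(batch.len());
    let overflow = batch.split_off(split); // tail goes to the next segment
    (batch, overflow)
}

fn main() {
    let batch: Vec<Message> = (0..5)
        .map(|i| Message { key: format!("k{i}"), value: vec![i as u8] })
        .collect();
    let (current, next) = partition_at(batch, 3);
    assert_eq!(current.len(), 3);
    assert_eq!(next.len(), 2);
    assert_eq!(next[0].key, "k3"); // moved, not cloned
}
```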
### Read Path (Dirty-Flag Lock Elision)

Segment reads check an atomic `write_dirty` flag before acquiring the write mutex. When no writes are pending (the common case for consumer-heavy workloads), the read path bypasses the mutex entirely — eliminating head-of-line blocking behind concurrent appends.
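The pattern can be sketched with a single `AtomicBool`. The struct and field names below are illustrative, not the crate's actual layout:

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;

// Dirty-flag read path sketch: readers consult an atomic flag and only
// fall back to the write mutex when an append may be in progress.
struct Segment {
    write_dirty: AtomicBool,
    tail: Mutex<Vec<u8>>, // data not yet published to the fast path
    stable: Vec<u8>,      // data already visible to readers
}

impl Segment {
    fn read_len(&self) -> usize {
        if !self.write_dirty.load(Ordering::Acquire) {
            // Common case: no pending writes, skip the mutex entirely.
            return self.stable.len();
        }
        // Slow path: a write is pending, take the lock to see the tail.
        self.stable.len() + self.tail.lock().unwrap().len()
    }
}

fn main() {
    let seg = Segment {
        write_dirty: AtomicBool::new(false),
        tail: Mutex::new(vec![1, 2]),
        stable: vec![0; 10],
    };
    assert_eq!(seg.read_len(), 10); // clean: mutex bypassed
    seg.write_dirty.store(true, Ordering::Release);
    assert_eq!(seg.read_len(), 12); // dirty: falls back to the lock
}
```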
### Lock-Free Mmap Cache (ArcSwap)

Read-only memory maps for segment files are cached in an `ArcSwap`, providing lock-free access for readers. The mmap is only re-created when the segment's write generation changes (tracked via an `AtomicU64`). Writers increment the atomic write generation after each append, and readers compare generations to detect staleness — a single `load()` + compare instead of a lock acquisition.
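The generation check can be sketched without the `arc-swap` dependency. This toy version keeps the cached mapping behind a `Mutex` (where the real code uses `ArcSwap` for lock-free reads) and uses a `Vec<u8>` in place of an mmap; the `MmapCache` name and `remap` closure are illustrative:

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};

// Generation-tracked cache sketch. Writers bump write_generation after
// each append; readers compare it against the generation recorded when
// the mapping was created and only re-map on mismatch.
struct MmapCache {
    write_generation: AtomicU64,        // bumped by writers
    cached: Mutex<(u64, Arc<Vec<u8>>)>, // (generation at map time, mapping)
}

impl MmapCache {
    fn get(&self, remap: impl Fn() -> Vec<u8>) -> Arc<Vec<u8>> {
        let current = self.write_generation.load(Ordering::Acquire);
        let mut slot = self.cached.lock().unwrap();
        if slot.0 != current {
            // Stale: the segment grew since we last mapped it.
            *slot = (current, Arc::new(remap()));
        }
        Arc::clone(&slot.1)
    }
}

fn main() {
    let cache = MmapCache {
        write_generation: AtomicU64::new(0),
        cached: Mutex::new((0, Arc::new(vec![1]))),
    };
    let a = cache.get(|| vec![1, 2]);
    assert_eq!(a.len(), 1); // generation unchanged: cached mapping reused
    cache.write_generation.fetch_add(1, Ordering::Release);
    let b = cache.get(|| vec![1, 2]);
    assert_eq!(b.len(), 2); // generation bumped: re-mapped
}
```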
### Zero-Copy Buffers

Cache-line aligned (64-byte) buffers eliminate unnecessary memory copies:

```rust
// NOTE: names below are illustrative; see the crate docs for the exact API.
use rivven_core::ZeroCopyBuffer;

// Create a producer-side buffer
let mut buffer = ZeroCopyBuffer::new(64 * 1024); // 64 KB

// Write directly into the buffer (no intermediate copies)
let slice = buffer.write_slice(payload.len());
slice.copy_from_slice(&payload);

// Transfer ownership to the consumer (true zero-copy via Bytes::from_owner())
let consumer_view = buffer.freeze();
```

Performance impact:

- True zero-copy `freeze()` via `Bytes::from_owner()` — no `memcpy` on buffer conversion
- 4x reduction in memory bandwidth for large messages
- Cache-friendly access patterns with 64-byte alignment
- Reference counting for safe shared access
- Bounded pool with a `max_buffers` limit to prevent unbounded memory growth
- Safe reset via `try_reset()` — verifies exclusive `Arc` ownership before recycling
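The exclusive-ownership check behind `try_reset()` maps directly onto `Arc::get_mut`, which only returns a mutable reference when the refcount is exactly one. A minimal sketch (the free-standing `try_reset` function is illustrative; in the crate it is a method on the pooled buffer):

```rust
use std::sync::Arc;

// A pooled buffer may only be recycled when the pool holds the sole
// reference; Arc::get_mut verifies that atomically.
fn try_reset(buf: &mut Arc<Vec<u8>>) -> bool {
    match Arc::get_mut(buf) {
        Some(inner) => {
            inner.clear(); // exclusive: safe to wipe and recycle
            true
        }
        None => false, // a consumer still holds a view; refuse to recycle
    }
}

fn main() {
    let mut buf = Arc::new(vec![1u8, 2, 3]);
    let view = Arc::clone(&buf);   // simulated consumer view
    assert!(!try_reset(&mut buf)); // shared: recycling refused
    drop(view);
    assert!(try_reset(&mut buf));  // exclusive again: cleared for reuse
    assert!(buf.is_empty());
}
```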
### Lock-Free Data Structures

High-performance concurrent primitives optimized for streaming:

```rust
// NOTE: names below are illustrative; see the crate docs for the exact API.
use rivven_core::concurrent::{LockFreeQueue, ConcurrentHashMap, AppendOnlyLog};

// MPMC queue with backpressure
let queue = LockFreeQueue::bounded(1024);
queue.push(msg)?; // non-blocking
let msg = queue.pop()?;

// Concurrent hashmap with sharded locks
let map = ConcurrentHashMap::new();
map.insert(key, value);
let val = map.get(&key);

// Append-only log for sequential writes
let log = AppendOnlyLog::new();
log.append(record);
```

| Data Structure | Operations | Contention Handling |
|---|---|---|
| `LockFreeQueue` | push/pop O(1) | Bounded backpressure |
| `ConcurrentHashMap` | get/insert O(1) | Sharded `RwLock`s |
| `AppendOnlyLog` | append O(1) | Single-writer optimized |
| `ConcurrentSkipList` | range O(log n) | Lock-free traversal |
### Buffer Pooling

Slab-allocated buffer pool with thread-local caching:

```rust
// NOTE: names below are illustrative; see the crate docs for the exact API.
use rivven_core::BufferPool;

// High-throughput configuration
let pool = BufferPool::with_config(PoolConfig::high_throughput());

// Acquire a buffer from the pool (fast path: thread-local cache)
let mut buffer = pool.acquire(4096);
buffer.extend_from_slice(&payload);

// Returned to the pool automatically on drop
drop(buffer);

// Pool statistics
let stats = pool.stats();
println!("hits: {}, misses: {}", stats.hits, stats.misses);
```

Size Classes:

| Class | Size Range | Use Case |
|---|---|---|
| Small | 64 B - 512 B | Headers, metadata |
| Medium | 512 B - 4 KB | Typical messages |
| Large | 4 KB - 64 KB | Batched records |
| Huge | 64 KB - 1 MB | Large payloads |

Deallocate Routing: returned buffers are classified against the pool's canonical sizes with a +12.5% tolerance to compensate for allocator rounding (e.g. `with_capacity(4096)` may allocate 4160 bytes). Buffers that grew beyond the tolerance are dropped instead of mis-pooled.
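The routing rule can be sketched as a lookup against the canonical class sizes, accepting anything within one-eighth (+12.5%) above a class. The `route` function and `CLASSES` table are illustrative, not the crate's actual code:

```rust
// Canonical size classes (upper bounds of Small/Medium/Large/Huge).
const CLASSES: [usize; 4] = [512, 4096, 65536, 1 << 20];

// Match a returned buffer's capacity to a canonical class, tolerating
// allocator rounding up to +12.5% (class/8). A capacity that fits no
// class within tolerance returns None and the buffer is dropped.
fn route(capacity: usize) -> Option<usize> {
    CLASSES
        .iter()
        .copied()
        .find(|&class| capacity >= class && capacity <= class + class / 8)
}

fn main() {
    assert_eq!(route(4096), Some(4096)); // exact canonical size
    assert_eq!(route(4160), Some(4096)); // allocator rounding within +12.5%
    assert_eq!(route(5000), None);       // grew past tolerance: dropped
}
```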
## Async I/O (io_uring-style API)

rivven-core provides a portable async I/O layer with an io_uring-style API. The current implementation uses `std::fs::File` behind a `parking_lot::Mutex` as a portable fallback. The API is designed so that a true io_uring backend can be swapped in on Linux 5.6+ without changing callers.

Concurrent Reads: on Unix, `AsyncFile::read_at()` uses `pread` (positioned read) via `std::os::unix::fs::FileExt`, eliminating the mutex for read operations, so concurrent segment fetches proceed without contention. Non-Unix platforms fall back to seek+read under a Tokio `Mutex`.
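The positioned-read property is standard library behavior and easy to demonstrate: `read_at` never moves the file cursor, so two reads at different offsets can share one `File` with no seek and no mutex (Unix-only sketch; the file path is a throwaway temp file):

```rust
use std::fs::{self, File};
use std::os::unix::fs::FileExt; // provides pread-style read_at on Unix

fn main() -> std::io::Result<()> {
    let path = std::env::temp_dir().join("rivven_pread_demo.log");
    fs::write(&path, b"0123456789")?;

    let file = File::open(&path)?;
    let mut a = [0u8; 4];
    let mut b = [0u8; 4];
    // Each read carries its own offset; neither depends on (or disturbs)
    // a shared cursor, which is what makes lock-free concurrent reads safe.
    file.read_at(&mut a, 0)?;
    file.read_at(&mut b, 6)?;
    assert_eq!(&a, b"0123");
    assert_eq!(&b, b"6789");

    fs::remove_file(&path)?;
    Ok(())
}
```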
> ⚠️ Note: `BlockingWriter` in `io_uring.rs` performs synchronous file I/O under a blocking mutex. Use `tokio::task::spawn_blocking`, or the async `AsyncFile` from `async_io.rs`, in async contexts.
```rust
// NOTE: names below are illustrative; see the crate docs for the exact API.
use rivven_core::io_uring::{IoUringConfig, WalWriter, SegmentReader};

// High-throughput WAL writer
let config = IoUringConfig::high_throughput();
let wal = WalWriter::new(config)?;

// Direct write (immediate)
let offset = wal.append(&record)?;

// Batched writes (queued until flush)
wal.append_batched(&record_a)?;
wal.append_batched(&record_b)?;
wal.flush_batch()?; // execute all batched writes

// Append with checksum
let offset = wal.append_with_checksum(&record)?;
wal.sync()?;

// Read segments
let reader = SegmentReader::open(path)?;
let messages = reader.read_messages(start_offset, max_bytes)?;
```
### Batch Operations

Batched I/O reduces syscall overhead by queueing multiple operations:

```rust
// NOTE: names below are illustrative; see the crate docs for the exact API.
use rivven_core::io_uring::{BatchBuilder, BatchExecutor, BatchStats};

// Create a batch of operations
let mut batch = BatchBuilder::new();
batch.write(offset_a, data_a);
batch.write(offset_b, data_b);
batch.read(offset_c, len);
batch.sync();

// Get batch statistics before execution
let stats: BatchStats = batch.stats();
println!("{} ops, {} bytes to write", stats.total_ops, stats.write_bytes);

// Execute the batch
let writer = WalWriter::new(config)?;
let executor = BatchExecutor::for_writer(&writer);
executor.execute(batch)?;
```
### Batch Statistics

The `BatchStats` struct provides insight into batch composition:

| Field | Type | Description |
|---|---|---|
| `total_ops` | `u64` | Total operations in batch |
| `write_ops` | `u64` | Number of write operations |
| `read_ops` | `u64` | Number of read operations |
| `sync_ops` | `u64` | Number of sync operations |
| `write_bytes` | `u64` | Total bytes to be written |
| `read_bytes` | `u64` | Total bytes to be read |
## Transactions

Native exactly-once semantics with two-phase commit:

```rust
// NOTE: names below are illustrative; see the crate docs for the exact API.
use rivven_core::transaction::TransactionCoordinator;

// Create the coordinator
let coordinator = TransactionCoordinator::new();

// Begin a transaction
let txn = coordinator.begin_transaction(producer_id)?;

// Add writes to the transaction
txn.add_write("orders", key_a, value_a)?;
txn.add_write("payments", key_b, value_b)?;

// Commit atomically (all-or-nothing)
coordinator.commit(&txn).await?;

// Or abort on failure
// coordinator.abort(&txn).await?;
```

Transaction Guarantees:

| Property | Implementation |
|---|---|
| Atomicity | Two-phase commit with coordinator |
| Isolation | Epoch-based producer fencing |
| Durability | WAL persistence before commit |
| Exactly-Once | Idempotent sequence numbers |
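Epoch-based producer fencing, the isolation mechanism in the table above, can be sketched with a single atomic counter. Each new producer session bumps the epoch, and writes carrying a stale epoch are rejected, so a zombie producer cannot commit after its successor takes over. The `ProducerState` type and method names are illustrative:

```rust
use std::sync::atomic::{AtomicU64, Ordering};

struct ProducerState {
    epoch: AtomicU64,
}

impl ProducerState {
    // A (re)connecting producer bumps the epoch and adopts the new value,
    // implicitly fencing any prior session with the same producer id.
    fn register(&self) -> u64 {
        self.epoch.fetch_add(1, Ordering::AcqRel) + 1
    }

    // Writes must present the current epoch; stale epochs are rejected.
    fn check_write(&self, epoch: u64) -> Result<(), &'static str> {
        if epoch == self.epoch.load(Ordering::Acquire) {
            Ok(())
        } else {
            Err("fenced: stale producer epoch")
        }
    }
}

fn main() {
    let state = ProducerState { epoch: AtomicU64::new(0) };
    let old = state.register(); // first producer instance
    let new = state.register(); // restarted producer fences the first
    assert!(state.check_write(new).is_ok());
    assert!(state.check_write(old).is_err()); // zombie write rejected
}
```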
## Vectorized Batch Processing

Batch processing operations for high-throughput workloads. CRC32 and memory search delegate to the `crc32fast` and `memchr` crates respectively, which use SIMD internally when available:

```rust
// NOTE: names below are illustrative; see the crate docs for the exact API.
use rivven_core::vectorized::{BatchEncoder, BatchDecoder, RecordBatch, crc32_fast};

// Batch encoding (2-4x faster than sequential)
let mut encoder = BatchEncoder::with_capacity(messages.len());
for msg in messages {
    encoder.push(msg);
}
let encoded = encoder.finish();

// Batch decoding
let decoder = BatchDecoder::new(&encoded);
let messages = decoder.decode_all()?;

// Fast CRC32 (delegates to crc32fast, which uses SSE4.2/AVX2 when available)
let checksum = crc32_fast(&payload);

// Columnar record batch for analytics
let mut batch = RecordBatch::new();
batch.add(record);
let filtered = batch.filter(|r| r.timestamp > cutoff);
```

Vectorization Benefits:

| Operation | Speedup | Acceleration |
|---|---|---|
| CRC32 | 4-8x | `crc32fast` (SSE4.2/AVX2/ARM CRC32 when available) |
| Batch encode | 2-4x | Cache-optimized sequential processing |
| Memory search | 3-5x | `memchr` crate (AVX2/SSE2/NEON when available) |
## Group Commit WAL

Write-ahead log with group commit optimization (10-100x throughput improvement):

```rust
// NOTE: names below are illustrative; see the crate docs for the exact API.
use rivven_core::wal::{Wal, WalConfig};

// Configure for maximum throughput
let config = WalConfig {
    // batch size, flush interval, fsync policy, ...
    ..Default::default()
};
let wal = Wal::new(config)?;

// Writes are batched and flushed together
let committed = wal.append(&record)?;
committed.await?; // wait for fsync
```

Group Commit Performance:

- Zero-alloc serialization: `WalRecord::write_to_buf()` serializes directly into the shared batch buffer — no per-record `BytesMut` intermediate allocation
- Buffer shrink: the batch buffer re-allocates back to its default capacity after burst traffic leaves it oversized (>2x max)
- CRC-validated recovery: both `find_actual_end()` and `scan_wal_file()` validate CRC32 for every record. Replayed records are written via `append_replicated` to preserve original offsets. `WalRecord::from_bytes()` rejects zero-length `Full`/`First`/`Last` records to prevent phantom records from the pre-allocated WAL tail. Transaction COMMIT/ABORT markers are replayed from `TxnWalPayload` records.
- File pre-allocation: a background `spawn_blocking` task pre-allocates the next WAL file during normal operation, enabling zero-latency rotation when the current file fills up
- Failure-safe stats: write statistics (`writes_total`, `bytes_written`, etc.) are only updated on successful writes — failed flushes are never counted
- LSN from filename: on recovery, the starting LSN is derived from the WAL segment filename rather than scanning the file; if the filename cannot be parsed, recovery falls back to a full scan. This eliminates an O(n) scan on every startup
- Graceful shutdown drain: on shutdown the write channel is closed and all remaining buffered writes are drained and flushed before the WAL file is closed, ensuring zero data loss for in-flight appends. The shutdown sequence awaits the background worker's `JoinHandle` to guarantee the task has fully terminated before returning

| Batch Size | fsync/sec | Throughput |
|---|---|---|
| 1 (no batching) | 10,000 | 10K msg/sec |
| 100 | 100 | 1M msg/sec |
| 1000 | 10 | 10M msg/sec |
## Core Types

```rust
// NOTE: names below are illustrative; see the crate docs for the exact API.
use rivven_core::Record;

// Create a record
let record = Record::builder()
    .key("order-123")
    .value(payload)
    .header("trace-id", trace_id)
    .build();
```

### Partition Append Optimization

When tiered storage is enabled, the partition pre-serializes the message once before moving it into the segment log. This eliminates both the `message.clone()` and the double serialization that would otherwise occur:

- A single `message.to_bytes()` produces the tiered-storage copy
- The owned `message` is moved into `log.append()` (zero-copy handoff)
- `LogManager::truncate_before()` physically deletes segments below the low-watermark (used by `DeleteRecords`)
## Storage Engine

The storage engine uses a log-structured design:

```text
data/
└── topics/
    └── orders/
        ├── partition-0/
        │   ├── 00000000000000000000.log   # Segment file
        │   ├── 00000000000000000000.idx   # Offset index
        │   └── 00000000000000001000.log   # Next segment
        └── partition-1/
```
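As the layout above suggests, each segment file is named after the base offset of its first record, zero-padded to 20 digits so that lexicographic order matches offset order. A sketch of that naming scheme (the `segment_file_name` helper is illustrative, not the crate's actual function):

```rust
// Segment files are named by base offset, zero-padded to 20 digits
// (enough for any u64), so a plain directory sort lists them in
// offset order.
fn segment_file_name(base_offset: u64) -> String {
    format!("{base_offset:020}.log")
}

fn main() {
    assert_eq!(segment_file_name(0), "00000000000000000000.log");
    assert_eq!(segment_file_name(1000), "00000000000000001000.log");

    let mut names = vec![segment_file_name(1000), segment_file_name(0)];
    names.sort(); // lexicographic sort yields offset order
    assert_eq!(names[0], "00000000000000000000.log");
}
```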
## Test Coverage

```shell
# Run all tests
cargo test

# Run with feature flags
cargo test --features "compression,tls,metrics"
```

Current Coverage: 306 tests (100% passing)
| Category | Tests | Description |
|---|---|---|
| Zero-copy | 12 | Buffer allocation, slicing, freeze |
| Concurrent | 18 | Lock-free queue, hashmap, skiplist |
| Storage | 45 | Segments, indexes, tiered storage |
| WAL | 22 | Group commit, recovery, checksums |
| Transactions | 28 | 2PC, abort, idempotence |
| Vectorized | 15 | Batch encoding, CRC32, SIMD |
| TLS | 34 | Certificate validation, handshake |
| Auth | 25 | RBAC, Cedar policies, indexed ACL lookups |
| Compression | 18 | LZ4, Zstd, Snappy codecs |
## Documentation

API documentation can be built locally with `cargo doc --open`.

## License

Apache-2.0. See LICENSE.