# rivven-core
Core storage engine and types for the Rivven event streaming platform.
## Overview
rivven-core is the foundational storage engine that powers Rivven's high-throughput message persistence. It provides append-only log segments, tiered storage, consumer groups, and optimized I/O primitives.
## Features
| Category | Features |
|---|---|
| Storage | Log segments, offset/timestamp indexes, tiered storage |
| Compression | LZ4, Zstd, Snappy, Gzip codecs |
| I/O | Zero-copy reads, memory-mapped files, buffer pooling |
| Linux | io_uring backend for maximum throughput |
| Consumer | Consumer groups with offset tracking |
## Installation

```toml
[dependencies]
rivven-core = "0.2"
```
## Architecture

```text
rivven-core/
├── storage/          # Log-structured storage engine
│   ├── segment.rs    # Log segment files
│   ├── index.rs      # Offset and timestamp indexes
│   ├── tiered.rs     # Hot/warm/cold tiered storage
│   └── compaction.rs # Log compaction
├── io_uring.rs       # Linux io_uring async I/O
├── zero_copy.rs      # Zero-copy producer/consumer
├── consumer/         # Consumer group coordination
├── compression/      # Codec implementations
├── protocol/         # Wire protocol types
└── metrics/          # Observability
```
## Tiered Storage
Rivven supports automatic data tiering across hot (memory), warm (local disk), and cold (object storage) tiers:
```rust
use rivven_core::TieredStorageConfig;

// Enable tiered storage with the high-performance preset
let config = StorageConfig::new()
    .with_tiered_storage(TieredStorageConfig::high_performance());

// Or use the cost-optimized preset for archival workloads
let config = StorageConfig::new()
    .with_tiered_storage(TieredStorageConfig::cost_optimized());

// Custom configuration
let tiered_config = TieredStorageConfig {
    // custom tier thresholds, paths, etc.
    ..Default::default()
};
let config = StorageConfig::new().with_tiered_storage(tiered_config);
```
| Tier | Storage | Latency | Use Case |
|---|---|---|---|
| Hot | In-memory | < 1ms | Recent data, active consumers |
| Warm | Local disk | 1-10ms | Medium-aged data |
| Cold | S3/GCS/Azure | 100ms+ | Archival, compliance |
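The age-based tiering policy in the table can be sketched as a simple selector. This is a minimal illustration only: the `Tier` enum, `tier_for_age` function, and the threshold values are hypothetical, not rivven-core types or defaults.

```rust
#[derive(Debug, PartialEq)]
enum Tier {
    Hot,  // in-memory, recent data
    Warm, // local disk
    Cold, // object storage (S3/GCS/Azure)
}

// Map a record's age to a tier; the thresholds are illustrative.
fn tier_for_age(age_secs: u64, hot_max_secs: u64, warm_max_secs: u64) -> Tier {
    if age_secs <= hot_max_secs {
        Tier::Hot
    } else if age_secs <= warm_max_secs {
        Tier::Warm
    } else {
        Tier::Cold
    }
}

fn main() {
    // Example policy: 1-hour hot window, 24-hour warm window.
    assert_eq!(tier_for_age(30, 3_600, 86_400), Tier::Hot);
    assert_eq!(tier_for_age(7_200, 3_600, 86_400), Tier::Warm);
    assert_eq!(tier_for_age(200_000, 3_600, 86_400), Tier::Cold);
    println!("ok");
}
```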
## io_uring Async I/O
For Linux 5.6+, rivven-core provides an io_uring backend that eliminates syscall overhead:
```rust
use rivven_core::io_uring::IoUringWal;

// High-throughput WAL writer
let config = IoUringConfig::high_throughput();
let wal = IoUringWal::new(path, config)?;

// Direct write (immediate)
let offset = wal.append(&payload)?;

// Batched writes (queued until flush)
wal.append_batched(&payload_a)?;
wal.append_batched(&payload_b)?;
wal.flush_batch()?; // Execute all batched writes

// Append with checksum
let offset = wal.append_with_checksum(&payload)?;
wal.sync()?;

// Read segments
let reader = SegmentReader::open(path)?;
let messages = reader.read_messages(start_offset, max_messages)?;
```
## Batch Operations
Batched I/O reduces syscall overhead by queueing multiple operations:
```rust
use rivven_core::io_uring::{BatchStats, IoBatch};

// Create a batch of operations
let mut batch = IoBatch::new();
batch.write(offset_a, &data_a);
batch.write(offset_b, &data_b);
batch.read(offset_c, read_len);
batch.sync();

// Get batch statistics before execution
let stats: BatchStats = batch.stats();
println!("{} ops, {} bytes to write", stats.total_ops, stats.write_bytes);

// Execute batch
let writer = IoUringWal::new(path, config)?;
let executor = BatchExecutor::for_writer(&writer);
executor.execute(batch)?;
```
### Batch Statistics

The `BatchStats` struct provides insight into batch composition:
| Field | Type | Description |
|---|---|---|
| `total_ops` | u64 | Total operations in batch |
| `write_ops` | u64 | Number of write operations |
| `read_ops` | u64 | Number of read operations |
| `sync_ops` | u64 | Number of sync operations |
| `write_bytes` | u64 | Total bytes to be written |
| `read_bytes` | u64 | Total bytes to be read |
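The counters above can be derived with a single pass over the queued operations. The following is an illustrative sketch, not rivven-core's implementation; the `Op` enum and the standalone `stats` function are hypothetical stand-ins for the batch internals.

```rust
// Queued I/O operations, mirroring the batch API (illustrative).
enum Op {
    Write { bytes: u64 },
    Read { bytes: u64 },
    Sync,
}

#[derive(Default, Debug)]
struct BatchStats {
    total_ops: u64,
    write_ops: u64,
    read_ops: u64,
    sync_ops: u64,
    write_bytes: u64,
    read_bytes: u64,
}

// Aggregate per-kind counts and byte totals in one pass.
fn stats(ops: &[Op]) -> BatchStats {
    let mut s = BatchStats::default();
    for op in ops {
        s.total_ops += 1;
        match op {
            Op::Write { bytes } => { s.write_ops += 1; s.write_bytes += bytes; }
            Op::Read { bytes }  => { s.read_ops += 1; s.read_bytes += bytes; }
            Op::Sync            => s.sync_ops += 1,
        }
    }
    s
}

fn main() {
    let ops = [Op::Write { bytes: 4096 }, Op::Read { bytes: 1024 }, Op::Sync];
    let s = stats(&ops);
    assert_eq!(s.total_ops, 3);
    assert_eq!(s.write_bytes, 4096);
    assert_eq!(s.read_bytes, 1024);
    println!("{:?}", s);
}
```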
## io_uring Performance
| Backend | IOPS (4KB) | Latency p99 | CPU Usage |
|---|---|---|---|
| epoll | 200K | 1.5ms | 80% |
| io_uring | 800K | 0.3ms | 40% |
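As a back-of-envelope sanity check on the table (an illustrative calculation, not a measured result), the 4 KB IOPS figures translate into sequential bandwidth as follows:

```rust
fn main() {
    let block = 4 * 1024u64; // 4 KiB per operation
    let epoll_iops = 200_000u64;
    let uring_iops = 800_000u64;

    // bandwidth = IOPS * block size
    println!("epoll:    {} MB/s", epoll_iops * block / 1_000_000); // ~819 MB/s
    println!("io_uring: {} MB/s", uring_iops * block / 1_000_000); // ~3276 MB/s
}
```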
## Core Types
```rust
use rivven_core::Record;

// Create a record
let record = Record::builder()
    .key("order-key")
    .value(payload)
    .header("header-name", "header-value")
    .build();
```
## Storage Engine
The storage engine uses a log-structured design:
```text
data/
└── topics/
    └── orders/
        ├── partition-0/
        │   ├── 00000000000000000000.log  # Segment file
        │   ├── 00000000000000000000.idx  # Offset index
        │   └── 00000000000000001000.log  # Next segment
        └── partition-1/
```
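Because each segment file is named after its base offset, finding the segment that holds a given offset reduces to finding the last base offset that is less than or equal to the target. A minimal sketch of that lookup (illustrative, not the rivven-core API):

```rust
// Given the sorted base offsets of a partition's segments, return the
// base offset of the segment that contains `offset`, if any.
fn segment_for_offset(bases: &[u64], offset: u64) -> Option<u64> {
    match bases.binary_search(&offset) {
        Ok(i) => Some(bases[i]),      // offset is exactly a segment base
        Err(0) => None,               // offset precedes the first segment
        Err(i) => Some(bases[i - 1]), // last segment starting at or before offset
    }
}

fn main() {
    let bases = [0, 1000, 2000];
    assert_eq!(segment_for_offset(&bases, 1234), Some(1000));
    assert_eq!(segment_for_offset(&bases, 0), Some(0));
    // Zero-padded, 20-digit segment file name, matching the layout above.
    println!("{:020}.log", segment_for_offset(&bases, 1234).unwrap());
}
```

Running this prints `00000000000000001000.log`, i.e. offset 1234 lives in the segment whose base offset is 1000.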
## Documentation
## License
Apache-2.0. See LICENSE.