# rivven-cluster

Distributed cluster coordination for the Rivven event streaming platform.

## Overview

`rivven-cluster` provides the distributed coordination layer for Rivven, implementing consensus, membership, and partition management, with hot paths optimized for streaming workloads.
## Features
| Feature | Description |
|---|---|
| Raft Consensus | Authenticated leader election and log replication using OpenRaft |
| redb Storage | Pure Rust persistent storage (zero C dependencies) |
| SWIM Gossip | HMAC-authenticated failure detection and membership management |
| ISR Replication | In-Sync Replica tracking with high watermark |
| Partitioning | Consistent hashing with rack awareness |
| QUIC Transport | 0-RTT, multiplexed streams, BBR congestion control |
| Consumer Coordination | Consumer group management with Raft persistence |
## Why redb?

Rivven uses redb instead of RocksDB for Raft log storage:
| Aspect | redb | RocksDB |
|---|---|---|
| Build time | ~10s | 2-5 min |
| Binary size | Minimal | +10-15 MB |
| Cross-compile | ✅ Works everywhere | ❌ Needs C++ toolchain |
| Docker musl | ✅ Works | ❌ Needs musl-g++ |
| ACID | ✅ Full | ✅ Full |
| Memory usage | Lower | Higher (bloom filters) |
## Architecture

```text
┌───────────────────────────────────────────────────────────────────────────┐
│ rivven-cluster                                                            │
├───────────────────────────────────────────────────────────────────────────┤
│                                                                           │
│ ┌────────────────┐ ┌────────────────┐ ┌────────────────────────────┐      │
│ │ ClusterCoord   │ │ Replication    │ │ Consumer Coordinator       │      │
│ │ (orchestrate)  │ │ Manager        │ │ (group management)         │      │
│ └───────┬────────┘ └───────┬────────┘ └─────────────┬──────────────┘      │
│         │                  │                        │                     │
│ ┌───────┴────────┐ ┌───────┴────────┐ ┌─────────────┴──────────────┐      │
│ │ Raft Consensus │ │ ISR Tracking   │ │ Offset Management          │      │
│ │ (metadata)     │ │ (replication)  │ │ (commit/fetch)             │      │
│ └───────┬────────┘ └───────┬────────┘ └─────────────┬──────────────┘      │
│         │                  │                        │                     │
│ ┌───────┴────────┐ ┌───────┴────────┐ ┌─────────────┴──────────────┐      │
│ │ SWIM Gossip    │ │ Partition      │ │ Metadata Store             │      │
│ │ (membership)   │ │ Placer         │ │ (state machine)            │      │
│ └───────┬────────┘ └───────┬────────┘ └─────────────┬──────────────┘      │
│         │                  │                        │                     │
│         └──────────────────┼────────────────────────┘                     │
│                            │                                              │
│                 ┌──────────┴──────────┐                                   │
│                 │ Transport Layer     │                                   │
│                 │ (TCP / QUIC)        │                                   │
│                 └─────────────────────┘                                   │
│                                                                           │
└───────────────────────────────────────────────────────────────────────────┘
```
## Components
### Raft Consensus

Metadata replication with OpenRaft and redb storage:

```rust
use rivven_cluster::raft::{RaftNode, RaftNodeConfig};

// Import path and config fields are illustrative.
let config = RaftNodeConfig {
    node_id: 1,
    ..Default::default()
};
let mut node = RaftNode::with_config(config).await?;
node.start().await?;
```
### SWIM Gossip

Decentralized failure detection with O(N) protocol-period dissemination:

```rust
use rivven_cluster::swim::{SwimConfig, SwimMembership};

// Import path, type names, and arguments are illustrative.
let config = SwimConfig::default();
let membership = SwimMembership::new(config).await?;
membership.join(seed_addrs).await?;
```
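To make the failure-detection side concrete, here is a minimal, self-contained sketch of the Alive → Suspect → Dead transition driven by probe acks and a suspicion multiplier. This is a toy model under assumed semantics, not the crate's actual state machine; `step` and its arguments are hypothetical names.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum MemberState {
    Alive,
    Suspect,
    Dead,
}

/// Advance one member's state after a single protocol period.
/// `acked` is true if the direct ping or any indirect probe succeeded.
/// A Suspect member is declared Dead once it has gone unacked for more
/// than `suspicion_multiplier` consecutive periods.
fn step(
    state: MemberState,
    acked: bool,
    suspect_periods: &mut u32,
    suspicion_multiplier: u32,
) -> MemberState {
    match (state, acked) {
        // Any ack clears suspicion (real SWIM also checks incarnation numbers).
        (_, true) => {
            *suspect_periods = 0;
            MemberState::Alive
        }
        (MemberState::Alive, false) => {
            *suspect_periods = 1;
            MemberState::Suspect
        }
        (MemberState::Suspect, false) => {
            *suspect_periods += 1;
            if *suspect_periods > suspicion_multiplier {
                MemberState::Dead
            } else {
                MemberState::Suspect
            }
        }
        (MemberState::Dead, false) => MemberState::Dead,
    }
}
```

The suspicion window is what lets a slow-but-alive node refute a false positive before the cluster declares it dead.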
### ISR Replication

Kafka-style ISR tracking with high watermark:

```rust
use rivven_cluster::replication::{ReplicationConfig, ReplicationManager};

// Import path, type names, and arguments are illustrative.
let config = ReplicationConfig::default();
let manager = ReplicationManager::new(config);
let partition = manager.get_or_create(topic, partition_id);

// Handle follower fetch and update ISR
manager.handle_replica_fetch(request).await?;
```
**Follower Persistence:** `FollowerFetcher` persists records via `Partition::append_replicated_batch()` before advancing the fetch offset. This ensures ISR followers hold real data: leader failure does not cause data loss. Replica state reporting tracks consecutive failures and logs warnings after 5+ failures for operator visibility.

**Leader Transitions:** `become_leader()` and `become_follower()` use `AtomicBool` for lock-free concurrent access through `Arc<PartitionReplication>`. `log_end_offset` updates use `fetch_max()` to prevent non-monotonic regression under concurrent appends.
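The monotonic-offset guarantee can be sketched with std atomics. This is a simplified stand-in for the internals described above, not the crate's code; the struct and method names here are hypothetical.

```rust
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};

/// Simplified stand-in for per-partition leader/offset state.
struct ReplicaState {
    is_leader: AtomicBool,
    log_end_offset: AtomicU64,
}

impl ReplicaState {
    fn new() -> Self {
        Self {
            is_leader: AtomicBool::new(false),
            log_end_offset: AtomicU64::new(0),
        }
    }

    /// Lock-free role flip, safe when shared through an Arc across tasks.
    fn become_leader(&self) {
        self.is_leader.store(true, Ordering::Release);
    }

    /// fetch_max ensures the offset never moves backwards, even if two
    /// concurrent appends report their end offsets out of order.
    /// Returns the (possibly unchanged) current log end offset.
    fn advance_leo(&self, offset: u64) -> u64 {
        self.log_end_offset.fetch_max(offset, Ordering::AcqRel).max(offset)
    }
}
```

A plain `store` here would let a stale append overwrite a newer offset; `fetch_max` makes the update order-insensitive.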
### Partition Placement

Consistent hashing with rack awareness:

```rust
use rivven_cluster::placement::{PartitionPlacer, PlacementConfig};

// Import path, type names, and arguments are illustrative.
let config = PlacementConfig::default();
let mut placer = PartitionPlacer::new(config);
placer.add_node(node);
let replicas = placer.assign_partition(topic, partition_id)?;
```
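Rack-aware consistent hashing can be illustrated with a toy hash ring (names and structure hypothetical, not the crate's actual algorithm): hash the partition onto the ring, walk clockwise taking one replica per rack, and only relax the rack constraint if there are fewer racks than requested replicas.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashSet;
use std::hash::{Hash, Hasher};

#[derive(Clone)]
struct Node {
    id: String,
    rack: String,
}

fn hash_of<T: Hash>(t: &T) -> u64 {
    let mut h = DefaultHasher::new();
    t.hash(&mut h);
    h.finish()
}

/// Pick `replicas` nodes for a partition, preferring distinct racks.
fn assign(nodes: &[Node], topic: &str, partition: u32, replicas: usize) -> Vec<String> {
    // Build the ring: one point per node (real systems use many virtual nodes).
    let mut ring: Vec<(u64, &Node)> = nodes.iter().map(|n| (hash_of(&n.id), n)).collect();
    ring.sort_by_key(|(h, _)| *h);

    let key = hash_of(&(topic, partition));
    let start = ring.partition_point(|(h, _)| *h < key);

    let mut chosen = Vec::new();
    let mut racks = HashSet::new();
    // First pass: walk clockwise, at most one replica per rack.
    for i in 0..ring.len() {
        let node = ring[(start + i) % ring.len()].1;
        if racks.insert(node.rack.clone()) {
            chosen.push(node.id.clone());
            if chosen.len() == replicas {
                return chosen;
            }
        }
    }
    // Second pass: relax the rack constraint if there aren't enough racks.
    for i in 0..ring.len() {
        let node = ring[(start + i) % ring.len()].1;
        if !chosen.contains(&node.id) {
            chosen.push(node.id.clone());
            if chosen.len() == replicas {
                break;
            }
        }
    }
    chosen
}
```

Because only the ring position of the partition key matters, adding or removing one node moves only the partitions adjacent to it on the ring.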
### QUIC Transport (Optional)

High-performance transport with 0-RTT and multiplexing:

```rust
use rivven_cluster::quic::{QuicTransport, QuicTransportConfig, TlsConfig};

// Import path, type names, and arguments are illustrative.
let config = QuicTransportConfig::high_throughput();
let tls = TlsConfig::self_signed()?;
let transport = QuicTransport::new(config, tls).await?;
transport.start().await?;
let response = transport.send(peer_addr, request).await?;
```
## Feature Flags

```toml
[features]
default = ["raft", "swim", "metrics-prometheus", "compression"]
raft = ["openraft", "redb", "reqwest"]
swim = []
quic = ["quinn", "rustls", "rcgen"]
full = ["raft", "swim", "metrics-prometheus", "compression", "quic"]
```
## Configuration

```yaml
cluster:
  node_id: "node-1"
  mode: cluster  # or "standalone"
  rack: "rack-1"
  client_addr: "0.0.0.0:9092"
  cluster_addr: "0.0.0.0:9093"
  seeds:
    - "node-2:9093"
    - "node-3:9093"

swim:
  ping_interval_ms: 100
  ping_timeout_ms: 50
  indirect_probes: 3
  suspicion_multiplier: 3

raft:
  heartbeat_interval_ms: 50
  election_timeout_min_ms: 150
  election_timeout_max_ms: 300
  snapshot_threshold: 10000

replication:
  min_isr: 2
  replica_lag_max_messages: 1000
  replica_lag_max_time_secs: 10
  fetch_interval_ms: 50

topic_defaults:
  partitions: 6
  replication_factor: 3
```
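The `min_isr` and lag settings interact with the high watermark roughly as follows (a simplified model, not the crate's code; all names here are hypothetical): the high watermark is the minimum log end offset across the leader and its in-sync replicas, and a follower is evicted from the ISR once it falls more than `replica_lag_max_messages` behind the leader.

```rust
/// One follower as the leader sees it.
struct Replica {
    id: u32,
    log_end_offset: u64,
    in_sync: bool,
}

/// High watermark: highest offset replicated to every in-sync replica.
fn high_watermark(leader_leo: u64, replicas: &[Replica]) -> u64 {
    replicas
        .iter()
        .filter(|r| r.in_sync)
        .map(|r| r.log_end_offset)
        .fold(leader_leo, u64::min)
}

/// Evict followers lagging more than `max_lag` messages behind the leader.
fn shrink_isr(leader_leo: u64, replicas: &mut [Replica], max_lag: u64) {
    for r in replicas.iter_mut() {
        if r.in_sync && leader_leo.saturating_sub(r.log_end_offset) > max_lag {
            r.in_sync = false;
        }
    }
}
```

Evicting a laggard lets the high watermark advance again, but if the ISR shrinks below `min_isr`, producers requiring full acknowledgement should be stalled rather than risk under-replicated commits.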
## Testing

```bash
# Unit tests (68 tests)
cargo test --lib

# Integration tests (36 tests, 4 ignored for chaos testing)
cargo test --tests

# Standalone stress tests
cargo test -- --ignored
```
## Documentation

## License

Apache-2.0. See LICENSE.