rivven-cluster 0.0.16

Distributed clustering for Rivven - SWIM membership, Raft consensus, ISR replication

rivven-cluster

Distributed cluster coordination for the Rivven event streaming platform.

Overview

rivven-cluster provides the distributed coordination layer for Rivven, implementing consensus, membership, and partition management with hot paths optimized for streaming workloads.

Features

| Feature | Description |
| --- | --- |
| Raft Consensus | Authenticated leader election and log replication using OpenRaft |
| redb Storage | Pure Rust persistent storage (zero C dependencies) |
| SWIM Gossip | HMAC-authenticated failure detection and membership management |
| ISR Replication | In-Sync Replica tracking with high watermark |
| Partitioning | Consistent hashing with rack awareness |
| QUIC Transport | 0-RTT, multiplexed streams, BBR congestion control |
| Consumer Coordination | Consumer group management with Raft persistence |

Why redb?

Rivven uses redb instead of RocksDB for Raft log storage:

| Aspect | redb | RocksDB |
| --- | --- | --- |
| Build time | ~10s | 2-5 min |
| Binary size | Minimal | +10-15 MB |
| Cross-compile | ✅ Works everywhere | ❌ Needs C++ toolchain |
| Docker musl | ✅ Works | ❌ Needs musl-g++ |
| ACID | ✅ Full | ✅ Full |
| Memory usage | Lower | Higher (bloom filters) |

Architecture

┌───────────────────────────────────────────────────────────────────────────┐
│                           rivven-cluster                                   │
├───────────────────────────────────────────────────────────────────────────┤
│                                                                            │
│  ┌────────────────┐  ┌────────────────┐  ┌────────────────────────────┐   │
│  │  ClusterCoord  │  │  Replication   │  │    Consumer Coordinator    │   │
│  │  (orchestrate) │  │   Manager      │  │    (group management)      │   │
│  └───────┬────────┘  └───────┬────────┘  └─────────────┬──────────────┘   │
│          │                   │                         │                   │
│  ┌───────┴────────┐  ┌───────┴────────┐  ┌─────────────┴──────────────┐   │
│  │  Raft Consensus│  │ ISR Tracking   │  │   Offset Management        │   │
│  │  (metadata)    │  │ (replication)  │  │   (commit/fetch)           │   │
│  └───────┬────────┘  └───────┬────────┘  └─────────────┬──────────────┘   │
│          │                   │                         │                   │
│  ┌───────┴────────┐  ┌───────┴────────┐  ┌─────────────┴──────────────┐   │
│  │  SWIM Gossip   │  │  Partition     │  │   Metadata Store           │   │
│  │  (membership)  │  │  Placer        │  │   (state machine)          │   │
│  └───────┬────────┘  └───────┬────────┘  └─────────────┬──────────────┘   │
│          │                   │                         │                   │
│          └───────────────────┼─────────────────────────┘                   │
│                              │                                             │
│                   ┌──────────┴──────────┐                                  │
│                   │  Transport Layer    │                                  │
│                   │  (TCP / QUIC)       │                                  │
│                   └─────────────────────┘                                  │
│                                                                            │
└───────────────────────────────────────────────────────────────────────────┘

Components

Raft Consensus

Metadata replication with OpenRaft and redb storage:

use rivven_cluster::{RaftNode, RaftNodeConfig};

let config = RaftNodeConfig {
    node_id: "node-1".to_string(),
    standalone: false,
    data_dir: "/var/lib/rivven/raft".into(),
    heartbeat_interval_ms: 50,
    election_timeout_min_ms: 150,
    election_timeout_max_ms: 300,
    snapshot_threshold: 10000,
    initial_members: vec![],
};

let mut node = RaftNode::with_config(config).await?;
node.start().await?;
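
The three timing fields above constrain each other: heartbeats have to arrive well inside the election timeout, and the timeout range must be non-empty so that each node can pick a randomized value from it. The following is a minimal sketch of that sanity check plus the majority-quorum arithmetic. `RaftTimings`, `TimingError`, and `quorum` are hypothetical names used for illustration only; they are not part of the rivven-cluster or OpenRaft API.

```rust
/// Hypothetical mirror of the timing fields shown in `RaftNodeConfig`.
#[derive(Debug, Clone, Copy)]
pub struct RaftTimings {
    pub heartbeat_interval_ms: u64,
    pub election_timeout_min_ms: u64,
    pub election_timeout_max_ms: u64,
}

#[derive(Debug, PartialEq, Eq)]
pub enum TimingError {
    /// A zero heartbeat interval would mean the leader never paces heartbeats.
    ZeroHeartbeat,
    /// min >= max leaves no room for a randomized election timeout.
    EmptyElectionRange,
    /// Followers would time out before the next heartbeat can arrive.
    HeartbeatTooSlow,
}

impl RaftTimings {
    /// Checks the usual Raft ordering: heartbeat < election min < election max.
    pub fn validate(&self) -> Result<(), TimingError> {
        if self.heartbeat_interval_ms == 0 {
            return Err(TimingError::ZeroHeartbeat);
        }
        if self.election_timeout_min_ms >= self.election_timeout_max_ms {
            return Err(TimingError::EmptyElectionRange);
        }
        if self.heartbeat_interval_ms >= self.election_timeout_min_ms {
            return Err(TimingError::HeartbeatTooSlow);
        }
        Ok(())
    }
}

/// Number of voters that must agree for a majority decision.
pub fn quorum(voters: usize) -> usize {
    voters / 2 + 1
}
```

With the values from the example (50 / 150 / 300 ms) `validate()` passes, and a three-node cluster needs `quorum(3) == 2` voters to commit an entry or elect a leader.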

SWIM Gossip

Decentralized failure detection with O(log N) protocol-period dissemination:

use std::time::Duration;
use rivven_cluster::{Membership, SwimConfig, NodeInfo};

let config = SwimConfig {
    ping_interval: Duration::from_millis(100),
    ping_timeout: Duration::from_millis(50),
    indirect_probes: 3,
    suspicion_multiplier: 3,
    ..Default::default()
};

// `local_node`, `shutdown_rx`, and `seeds` are assumed to be in scope.
let membership = Membership::new(local_node, config, shutdown_rx).await?;
membership.join(&seeds).await?;
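
To make the roles of `indirect_probes` and `suspicion_multiplier` concrete, here is a minimal sketch of how a SWIM-style detector typically classifies a probed member. It is not the rivven-cluster implementation: `MemberState`, `probe_outcome`, `suspicion_timeout`, and `advance` are hypothetical names, and the rule "suspicion timeout = multiplier × ping interval" is an assumption about how the multiplier is applied.

```rust
use std::time::Duration;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MemberState {
    Alive,
    Suspect,
    Dead,
}

/// Outcome of one protocol period for a single target.
/// `direct_ack` is the result of the direct ping; `indirect_acks` holds one
/// entry per helper node that was asked to ping the target on our behalf.
pub fn probe_outcome(direct_ack: bool, indirect_acks: &[bool]) -> MemberState {
    if direct_ack || indirect_acks.iter().any(|&ack| ack) {
        MemberState::Alive
    } else {
        // No path reached the target: suspect it rather than declaring it dead.
        MemberState::Suspect
    }
}

/// Assumed rule: a suspect gets `suspicion_multiplier` ping intervals to refute.
pub fn suspicion_timeout(ping_interval: Duration, suspicion_multiplier: u32) -> Duration {
    ping_interval * suspicion_multiplier
}

/// Promotes a suspect to dead once it has been suspected for the full timeout.
pub fn advance(state: MemberState, suspected_for: Duration, timeout: Duration) -> MemberState {
    match state {
        MemberState::Suspect if suspected_for >= timeout => MemberState::Dead,
        other => other,
    }
}
```

Under these assumptions, the configuration above (100 ms interval, multiplier 3) gives a suspected node 300 ms to prove it is still alive before it is removed from the membership list.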

ISR Replication

Kafka-style ISR tracking with high watermark:

use std::time::Duration;
use rivven_cluster::{ReplicationManager, ReplicationConfig, PartitionReplication};

let config = ReplicationConfig {
    min_isr: 2,
    replica_lag_max_messages: 1000,
    replica_lag_max_time: Duration::from_secs(10),
    ..Default::default()
};

let manager = ReplicationManager::new(node_id, config);
let partition = manager.get_or_create(partition_id, is_leader);

// Handle follower fetch and update ISR
manager.handle_replica_fetch(&partition_id, &replica_id, fetch_offset).await?;
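
The relationship between follower fetches, ISR membership, and the high watermark can be sketched in a few lines. The tracker below is a simplified illustration, not the `ReplicationManager` internals: `IsrTracker` and its methods are hypothetical, only the message-count lag rule (`replica_lag_max_messages`) is modelled, and the time-based rule is left out. It follows the Kafka convention that a follower's fetch offset equals its log end offset, and that the high watermark is the minimum log end offset across the leader and all in-sync followers.

```rust
use std::collections::BTreeMap;

/// Hypothetical per-partition tracker kept on the leader.
pub struct IsrTracker {
    leader_end_offset: u64,
    max_lag_messages: u64,
    follower_offsets: BTreeMap<String, u64>,
}

impl IsrTracker {
    pub fn new(max_lag_messages: u64) -> Self {
        Self {
            leader_end_offset: 0,
            max_lag_messages,
            follower_offsets: BTreeMap::new(),
        }
    }

    /// The leader appended `count` messages to its local log.
    pub fn append(&mut self, count: u64) {
        self.leader_end_offset += count;
    }

    /// A follower fetched from `fetch_offset`, so it holds everything below it.
    pub fn record_fetch(&mut self, replica: &str, fetch_offset: u64) {
        // A follower can never be ahead of the leader; clamp defensively.
        let offset = fetch_offset.min(self.leader_end_offset);
        self.follower_offsets.insert(replica.to_string(), offset);
    }

    fn is_in_sync(&self, offset: u64) -> bool {
        self.leader_end_offset - offset <= self.max_lag_messages
    }

    /// Followers currently within the allowed lag, in sorted order.
    pub fn in_sync_followers(&self) -> Vec<&str> {
        self.follower_offsets
            .iter()
            .filter(|(_, offset)| self.is_in_sync(**offset))
            .map(|(id, _)| id.as_str())
            .collect()
    }

    /// Highest offset replicated to every in-sync replica.
    pub fn high_watermark(&self) -> u64 {
        self.follower_offsets
            .values()
            .copied()
            .filter(|&offset| self.is_in_sync(offset))
            .min()
            .unwrap_or(self.leader_end_offset)
    }
}
```

A follower that lags by more than the limit drops out of the ISR and stops holding the high watermark back; once it catches up it rejoins and is counted in the minimum again.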

Partition Placement

Consistent hashing with rack awareness:

use rivven_cluster::{PartitionPlacer, PlacementConfig, PlacementStrategy};

let config = PlacementConfig {
    strategy: PlacementStrategy::ConsistentHash,
    rack_aware: true,
    virtual_nodes: 150,
    max_partitions_per_node: 0,
};

let mut placer = PartitionPlacer::new(config);
// `node` is assumed to be in scope; call `add_node` once per cluster member.
placer.add_node(&node);

let replicas = placer.assign_partition("orders", 0, 3)?;
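
For readers unfamiliar with the technique, the following is a minimal, self-contained sketch of consistent hashing with virtual nodes and a rack-aware replica walk. It is not the `PartitionPlacer` implementation: `Ring`, `add_node`, and `assign` are hypothetical, the hash is the standard library's `DefaultHasher`, and the two-pass rack preference is one plausible policy among several.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::{BTreeMap, HashSet};
use std::hash::{Hash, Hasher};

fn hash_of<T: Hash>(value: &T) -> u64 {
    let mut hasher = DefaultHasher::new();
    value.hash(&mut hasher);
    hasher.finish()
}

/// Hypothetical hash ring: ring position -> (node id, rack).
pub struct Ring {
    virtual_nodes: u32,
    points: BTreeMap<u64, (String, String)>,
}

impl Ring {
    pub fn new(virtual_nodes: u32) -> Self {
        Self { virtual_nodes, points: BTreeMap::new() }
    }

    /// Places `virtual_nodes` points for this node around the ring.
    pub fn add_node(&mut self, node: &str, rack: &str) {
        for v in 0..self.virtual_nodes {
            self.points
                .insert(hash_of(&(node, v)), (node.to_string(), rack.to_string()));
        }
    }

    /// Walks clockwise from the partition's hash and picks replica owners.
    pub fn assign(&self, topic: &str, partition: u32, replicas: usize) -> Vec<String> {
        let start = hash_of(&(topic, partition));
        // Clockwise order starting at `start`, wrapping around the ring.
        let walk: Vec<&(String, String)> = self
            .points
            .range(start..)
            .chain(self.points.range(..start))
            .map(|(_, owner)| owner)
            .collect();

        let mut chosen: Vec<String> = Vec::new();
        let mut racks: HashSet<&str> = HashSet::new();
        // Pass 1: prefer distinct nodes on distinct racks.
        for owner in &walk {
            if chosen.len() == replicas {
                break;
            }
            if !chosen.contains(&owner.0) && racks.insert(owner.1.as_str()) {
                chosen.push(owner.0.clone());
            }
        }
        // Pass 2: if racks ran out, fill up with any remaining distinct node.
        for owner in &walk {
            if chosen.len() == replicas {
                break;
            }
            if !chosen.contains(&owner.0) {
                chosen.push(owner.0.clone());
            }
        }
        chosen
    }
}
```

The first entry of the returned list would naturally serve as the preferred leader. Because only the ring points of an added or removed node change, most partitions keep their assignment when membership changes, which is the main reason to prefer a ring over `hash % node_count`.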

QUIC Transport (Optional)

High-performance transport with 0-RTT and multiplexing:

use rivven_cluster::quic_transport::{QuicTransport, QuicConfig, TlsConfig};

let config = QuicConfig::high_throughput();
let tls = TlsConfig::self_signed("rivven-cluster")?;

// `bind_addr`, `peer_id`, and `request` are assumed to be in scope.
let transport = QuicTransport::new(bind_addr, config, tls).await?;
transport.start().await?;

let response = transport.send(&peer_id, request).await?;

Feature Flags

[features]
default = ["raft", "swim", "metrics-prometheus", "compression"]
raft = ["openraft", "redb", "reqwest"]
swim = []
quic = ["quinn", "rustls", "rcgen"]
full = ["raft", "swim", "metrics-prometheus", "compression", "quic"]

Configuration

cluster:
  node_id: "node-1"
  mode: cluster  # or "standalone"
  rack: "rack-1"
  
  client_addr: "0.0.0.0:9092"
  cluster_addr: "0.0.0.0:9093"
  
  seeds:
    - "node-2:9093"
    - "node-3:9093"
  
  swim:
    ping_interval_ms: 100
    ping_timeout_ms: 50
    indirect_probes: 3
    suspicion_multiplier: 3
    
  raft:
    heartbeat_interval_ms: 50
    election_timeout_min_ms: 150
    election_timeout_max_ms: 300
    snapshot_threshold: 10000
    
  replication:
    min_isr: 2
    replica_lag_max_messages: 1000
    replica_lag_max_time_secs: 10
    fetch_interval_ms: 50
    
  topic_defaults:
    partitions: 6
    replication_factor: 3
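
The `replication.min_isr` and `topic_defaults.replication_factor` values are best read together. As a rough sketch, assuming the Kafka-style rule that a partition accepts writes only while at least `min_isr` replicas are in sync, the difference between the two is the number of replicas that can fall behind or fail before writes stop. `check_replication` and `ReplicationCheck` are hypothetical helpers, not part of the crate.

```rust
#[derive(Debug, PartialEq, Eq)]
pub enum ReplicationCheck {
    /// Settings are consistent; this many replicas may drop out of the ISR
    /// before the partition stops accepting writes (under the stated assumption).
    Ok { tolerated_failures: u32 },
    /// `min_isr` can never be satisfied with this replication factor.
    MinIsrExceedsReplicas,
    /// A `min_isr` of zero would not require any replica to hold the data.
    ZeroMinIsr,
}

pub fn check_replication(replication_factor: u32, min_isr: u32) -> ReplicationCheck {
    if min_isr == 0 {
        ReplicationCheck::ZeroMinIsr
    } else if min_isr > replication_factor {
        ReplicationCheck::MinIsrExceedsReplicas
    } else {
        ReplicationCheck::Ok {
            tolerated_failures: replication_factor - min_isr,
        }
    }
}
```

With the defaults shown above (`replication_factor: 3`, `min_isr: 2`), one replica can be lost per partition without interrupting writes.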

Testing

# Unit tests (68 tests)
cargo test -p rivven-cluster --lib

# Integration tests (36 tests, 4 ignored for chaos testing)
cargo test -p rivven-cluster --test '*'

# Standalone stress tests
cargo test -p rivven-cluster --test three_node_cluster -- --nocapture

License

Apache-2.0. See LICENSE.