rivven-client 0.0.3

High-performance async client for the Rivven event streaming platform

rivven-client

Native Rust client library for Rivven.

Features

  • Async/Await - Built on Tokio for high-performance async I/O
  • Connection Pooling - Efficient connection management with configurable pool sizes
  • Automatic Failover - Bootstrap server failover and reconnection
  • Circuit Breaker - Fault tolerance with automatic recovery
  • Retry with Exponential Backoff - Automatic retry with jitter for transient failures
  • Health Monitoring - Real-time statistics and health checks
  • TLS Support - Secure connections with rustls
  • SCRAM-SHA-256 Authentication - Secure challenge-response authentication

Installation

[dependencies]
rivven-client = "0.2"
# Or, with TLS support (use this line instead of the one above)
rivven-client = { version = "0.2", features = ["tls"] }

Usage

Basic Client

For simple use cases, use the basic Client:

use rivven_client::Client;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut client = Client::connect("localhost:9092").await?;
    
    // Publish a message
    client.publish("my-topic", b"value").await?;
    
    // Consume messages
    let messages = client.consume("my-topic", 0, 0, 100).await?;
    
    Ok(())
}

Authentication

Rivven supports multiple authentication methods:

use rivven_client::Client;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut client = Client::connect("localhost:9092").await?;
    
    // Simple authentication (use with TLS in production)
    let session = client.authenticate("alice", "password123").await?;
    println!("Session ID: {}", session.session_id);
    
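    // SCRAM-SHA-256 authentication (recommended).
    // The password is never sent over the wire, and client and server authenticate each other.
    // In practice, call only one of the two methods; both appear here for illustration.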
    let session = client.authenticate_scram("alice", "password123").await?;
    println!("Authenticated! Expires in {}s", session.expires_in);
    
    // Now use the authenticated session for operations
    client.publish("my-topic", b"secure message").await?;
    
    Ok(())
}

Production-Grade Resilient Client

For production deployments, use ResilientClient which provides:

  • Connection pooling across multiple servers
  • Automatic retry with exponential backoff and jitter
  • Circuit breaker pattern for fault isolation
  • Real-time health monitoring

use rivven_client::{ResilientClient, ResilientClientConfig};
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Configure the resilient client
    let config = ResilientClientConfig::builder()
        .servers(vec![
            "node1:9092".to_string(),
            "node2:9092".to_string(),
            "node3:9092".to_string(),
        ])
        .pool_size_per_server(5)
        .max_retries(3)
        .retry_initial_delay(Duration::from_millis(100))
        .retry_max_delay(Duration::from_secs(5))
        .circuit_breaker_failure_threshold(5)
        .circuit_breaker_recovery_timeout(Duration::from_secs(30))
        .build();

    // Create the resilient client
    let client = ResilientClient::new(config);
    
    // All operations automatically use connection pooling, 
    // retries, and circuit breakers
    client.publish("my-topic", Some(b"key"), b"value").await?;
    
    // Check client health
    let stats = client.stats().await;
    println!("Active connections: {}", stats.active_connections);
    println!("Healthy servers: {}", stats.healthy_servers);
    
    Ok(())
}

Circuit Breaker Behavior

The circuit breaker protects against cascading failures:

  1. Closed (Normal): Requests flow normally. Failures are counted.
  2. Open (Failing): Once the failure threshold is reached, the circuit opens. All requests fail fast without attempting a connection.
  3. Half-Open (Recovery): After the recovery timeout, one request is allowed through. If it succeeds, the circuit closes; if it fails, the circuit reopens.

// Circuit breaker configuration
let config = ResilientClientConfig::builder()
    .servers(vec!["localhost:9092".to_string()])
    .circuit_breaker_failure_threshold(5)     // Open after 5 failures
    .circuit_breaker_recovery_timeout(Duration::from_secs(30))  // Try recovery after 30s
    .build();
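
The three states above can be sketched as a small state machine. This is only an illustration of the pattern using the standard library; the type and method names (CircuitBreaker, allow_request, record_success, record_failure) are hypothetical and are not taken from rivven-client, and the choice to count consecutive failures (resetting on success) is an assumption.

```rust
use std::time::{Duration, Instant};

/// The three states described above.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CircuitState {
    Closed,
    Open,
    HalfOpen,
}

/// Hypothetical stand-alone breaker, not the type used inside rivven-client.
#[derive(Debug)]
pub struct CircuitBreaker {
    state: CircuitState,
    failures: u32,
    failure_threshold: u32,
    recovery_timeout: Duration,
    opened_at: Option<Instant>,
}

impl CircuitBreaker {
    pub fn new(failure_threshold: u32, recovery_timeout: Duration) -> Self {
        Self {
            state: CircuitState::Closed,
            failures: 0,
            failure_threshold,
            recovery_timeout,
            opened_at: None,
        }
    }

    pub fn state(&self) -> CircuitState {
        self.state
    }

    /// Returns true if a request may be attempted at time `now`.
    pub fn allow_request(&mut self, now: Instant) -> bool {
        match self.state {
            CircuitState::Closed => true,
            // A probe request is already in flight; reject everything else.
            CircuitState::HalfOpen => false,
            CircuitState::Open => {
                let waited = self
                    .opened_at
                    .map(|t| now.saturating_duration_since(t))
                    .unwrap_or_default();
                if waited >= self.recovery_timeout {
                    // Let exactly one probe request through.
                    self.state = CircuitState::HalfOpen;
                    true
                } else {
                    false
                }
            }
        }
    }

    /// A success closes the circuit and resets the failure count.
    pub fn record_success(&mut self) {
        self.state = CircuitState::Closed;
        self.failures = 0;
        self.opened_at = None;
    }

    /// A failure counts toward the threshold, or reopens a half-open circuit.
    pub fn record_failure(&mut self, now: Instant) {
        match self.state {
            CircuitState::Closed => {
                self.failures += 1;
                if self.failures >= self.failure_threshold {
                    self.open(now);
                }
            }
            CircuitState::HalfOpen => self.open(now),
            CircuitState::Open => {}
        }
    }

    fn open(&mut self, now: Instant) {
        self.state = CircuitState::Open;
        self.opened_at = Some(now);
    }
}
```

Passing the current time in as an argument, rather than reading the clock inside the breaker, keeps the sketch easy to test without sleeping.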

Retry with Exponential Backoff

Failed operations are automatically retried with exponential backoff and jitter:

let config = ResilientClientConfig::builder()
    .servers(vec!["localhost:9092".to_string()])
    .max_retries(3)                              // Retry up to 3 times
    .retry_initial_delay(Duration::from_millis(100))  // Start with 100ms delay
    .retry_max_delay(Duration::from_secs(5))     // Cap at 5 seconds
    .retry_multiplier(2.0)                       // Double delay each retry
    .build();
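
To make the effect of these settings concrete, here is a minimal sketch of how such a delay schedule could be computed. It assumes the delay for retry n (counting from 0) is retry_initial_delay × retry_multiplier^n, capped at retry_max_delay, and it uses "full jitter" (scaling the delay by a random factor in [0, 1]). The function names backoff_delay and apply_jitter are hypothetical, and the jitter strategy the crate actually uses may differ.

```rust
use std::time::Duration;

/// Un-jittered delay before retry number `retry` (0-based), capped at `max`.
/// Hypothetical helper; assumes delay = initial * multiplier^retry.
pub fn backoff_delay(retry: u32, initial: Duration, max: Duration, multiplier: f64) -> Duration {
    let factor = multiplier.powi(retry as i32);
    let secs = initial.as_secs_f64() * factor;
    // Clamp before converting so very large values cannot overflow Duration.
    Duration::from_secs_f64(secs.min(max.as_secs_f64()))
}

/// Full jitter: scale the delay by `unit`, a random value in [0, 1]
/// supplied by the caller (for example from a random number generator).
pub fn apply_jitter(delay: Duration, unit: f64) -> Duration {
    delay.mul_f64(unit.clamp(0.0, 1.0))
}
```

Under these assumptions, the configuration above yields un-jittered delays of roughly 100 ms, 200 ms, and 400 ms for the three retries. Jitter spreads retries from many clients over time so they do not all hit a recovering server at the same instant.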

Health Monitoring

Monitor client and server health in real time:

let stats = client.stats().await;

println!("Client Statistics:");
println!("  Total servers: {}", stats.total_servers);
println!("  Healthy servers: {}", stats.healthy_servers);
println!("  Active connections: {}", stats.active_connections);
println!("  Available connections: {}", stats.available_connections);

for server in &stats.servers {
    println!("\n  Server: {}", server.address);
    println!("    Circuit state: {:?}", server.circuit_state);
    println!("    Active connections: {}", server.active_connections);
    println!("    Available connections: {}", server.available_connections);
}

Admin Operations

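use rivven_client::{ResilientClient, ResilientClientConfig};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Build a configuration as in the resilient client example above
    let config = ResilientClientConfig::builder()
        .servers(vec!["localhost:9092".to_string()])
        .build();
    let client = ResilientClient::new(config);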
    
    // Create topic
    client.create_topic("my-topic", 3, 2).await?;
    
    // List topics
    let topics = client.list_topics().await?;
    for topic in topics {
        println!("Topic: {}", topic);
    }
    
    // Get topic info
    let info = client.get_topic_info("my-topic").await?;
    
    // Delete topic
    client.delete_topic("my-topic").await?;
    
    Ok(())
}

Configuration Reference

ResilientClientConfig

Option                              Default    Description
servers                             Required   List of server addresses
pool_size_per_server                10         Maximum connections per server
connection_timeout                  10s        Timeout for establishing connections
request_timeout                     30s        Timeout for individual requests
max_retries                         3          Maximum retry attempts
retry_initial_delay                 100ms      Initial retry delay
retry_max_delay                     5s         Maximum retry delay
retry_multiplier                    2.0        Delay multiplier between retries
circuit_breaker_failure_threshold   5          Failures before circuit opens
circuit_breaker_recovery_timeout    30s        Time before attempting recovery

Error Handling

The client provides typed errors for different failure modes:

use rivven_client::{ResilientClient, Error};

// `client` is a ResilientClient constructed as in the examples above
match client.publish("topic", None, b"data").await {
    Ok(offset) => println!("Published at offset {}", offset),
    Err(Error::CircuitBreakerOpen(server)) => {
        println!("Server {} is unhealthy, circuit breaker open", server);
    }
    Err(Error::AllServersUnavailable) => {
        println!("All servers are unavailable");
    }
    Err(Error::ConnectionError(msg)) => {
        println!("Connection failed: {}", msg);
    }
    Err(e) => println!("Other error: {}", e),
}

TLS Configuration

Enable TLS for secure connections by turning on the tls feature in Cargo.toml:

rivven-client = { version = "0.2", features = ["tls"] }

Then pass a TlsConfig when connecting:

use rivven_client::{Client, TlsConfig};

let tls_config = TlsConfig::builder()
    .ca_cert_path("/path/to/ca.crt")
    .client_cert_path("/path/to/client.crt")
    .client_key_path("/path/to/client.key")
    .build()?;

let client = Client::connect_with_tls("localhost:9093", tls_config).await?;

License

Licensed under either of Apache License, Version 2.0 or MIT license at your option.

See the LICENSE file in the repository root.