rivven-client
Native Rust client library for the Rivven event streaming platform.
Overview
rivven-client is a production-grade async client with connection pooling, automatic failover, circuit breakers, and exactly-once semantics.
Features
| Category | Features |
|----------|----------|
| Connectivity | Connection pooling, request pipelining, automatic failover |
| Resilience | Circuit breaker, exponential backoff, health monitoring |
| Security | TLS/mTLS, SCRAM-SHA-256 authentication |
| Semantics | Transactions, idempotent producer, exactly-once delivery |
Installation
[dependencies]
rivven-client = "0.2"
# Alternatively, to enable TLS support, use this entry instead of the one above:
# rivven-client = { version = "0.2", features = ["tls"] }
Usage
Basic Client
For simple use cases, use the basic Client:
use rivven_client::Client;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut client = Client::connect("localhost:9092").await?;

    client.publish("my-topic", b"value").await?;

    let messages = client.consume("my-topic", 0, 0, 100).await?;

    Ok(())
}
Authentication
Rivven supports multiple authentication methods:
use rivven_client::Client;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut client = Client::connect("localhost:9092").await?;

    // Option 1: username/password authentication
    let session = client.authenticate("alice", "password123").await?;
    println!("Session ID: {}", session.session_id);

    // Option 2: SCRAM-SHA-256 authentication
    let session = client.authenticate_scram("alice", "password123").await?;
    println!("Authenticated! Expires in {}s", session.expires_in);

    // Publish over the authenticated connection
    client.publish("my-topic", b"secure message").await?;

    Ok(())
}
Production-Grade Resilient Client
For production deployments, use ResilientClient which provides:
- Connection pooling across multiple servers
- Automatic retry with exponential backoff and jitter
- Circuit breaker pattern for fault isolation
- Real-time health monitoring
use rivven_client::{ResilientClient, ResilientClientConfig};
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = ResilientClientConfig::builder()
        .servers(vec![
            "node1:9092".to_string(),
            "node2:9092".to_string(),
            "node3:9092".to_string(),
        ])
        .pool_size_per_server(5)
        .max_retries(3)
        .retry_initial_delay(Duration::from_millis(100))
        .retry_max_delay(Duration::from_secs(5))
        .circuit_breaker_failure_threshold(5)
        .circuit_breaker_recovery_timeout(Duration::from_secs(30))
        .build();

    let client = ResilientClient::new(config);

    client.publish("my-topic", Some(b"key"), b"value").await?;

    let stats = client.stats().await;
    println!("Active connections: {}", stats.active_connections);
    println!("Healthy servers: {}", stats.healthy_servers);

    Ok(())
}
Circuit Breaker Behavior
The circuit breaker protects against cascading failures:
- Closed (Normal): Requests flow normally. Failures are counted.
- Open (Failing): Once the number of counted failures reaches the configured threshold, the circuit opens. All requests fail fast without attempting a connection.
- Half-Open (Recovery): After the recovery timeout elapses, one request is allowed through. If it succeeds, the circuit closes; if it fails, the circuit reopens.
let config = ResilientClientConfig::builder()
    .servers(vec!["localhost:9092".to_string()])
    .circuit_breaker_failure_threshold(5)
    .circuit_breaker_recovery_timeout(Duration::from_secs(30))
    .build();
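The three states described above can be sketched as a small state machine. This is a minimal illustration using only the standard library, not the rivven-client implementation: the `CircuitBreaker` type and its methods are hypothetical names, and the rule that only one probe is admitted while half-open follows the description in this section.

```rust
use std::time::{Duration, Instant};

/// The three circuit states described above.
#[derive(Debug, Clone, Copy, PartialEq)]
enum CircuitState {
    Closed,
    Open { since: Instant },
    HalfOpen,
}

/// Hypothetical breaker for illustration; not the rivven-client implementation.
struct CircuitBreaker {
    state: CircuitState,
    failures: u32,
    failure_threshold: u32,
    recovery_timeout: Duration,
}

impl CircuitBreaker {
    fn new(failure_threshold: u32, recovery_timeout: Duration) -> Self {
        Self { state: CircuitState::Closed, failures: 0, failure_threshold, recovery_timeout }
    }

    /// Returns true if a request may be attempted at time `now`.
    fn allow_request(&mut self, now: Instant) -> bool {
        match self.state {
            CircuitState::Closed => true,
            CircuitState::Open { since } => {
                if now.duration_since(since) >= self.recovery_timeout {
                    // Recovery timeout elapsed: let a single probe through.
                    self.state = CircuitState::HalfOpen;
                    true
                } else {
                    false // fail fast without touching the network
                }
            }
            // A probe is already in flight; reject everything else.
            CircuitState::HalfOpen => false,
        }
    }

    fn record_success(&mut self) {
        self.failures = 0;
        self.state = CircuitState::Closed;
    }

    fn record_failure(&mut self, now: Instant) {
        match self.state {
            CircuitState::Closed => {
                self.failures += 1;
                if self.failures >= self.failure_threshold {
                    self.state = CircuitState::Open { since: now };
                }
            }
            // A failed probe reopens the circuit and restarts the timeout.
            CircuitState::HalfOpen => self.state = CircuitState::Open { since: now },
            CircuitState::Open { .. } => {}
        }
    }
}

fn main() {
    let start = Instant::now();
    let mut breaker = CircuitBreaker::new(5, Duration::from_secs(30));
    for _ in 0..5 {
        breaker.record_failure(start);
    }
    println!("allowed right after opening: {}", breaker.allow_request(start));
    let later = start + Duration::from_secs(30);
    println!("allowed after recovery timeout: {}", breaker.allow_request(later));
    breaker.record_success();
    println!("state after successful probe: {:?}", breaker.state);
}
```

Passing `now` in explicitly keeps the sketch deterministic and easy to test; a real client would more likely read the clock internally.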
Retry with Exponential Backoff
Failed operations are automatically retried with exponential backoff and jitter:
let config = ResilientClientConfig::builder()
    .servers(vec!["localhost:9092".to_string()])
    .max_retries(3)
    .retry_initial_delay(Duration::from_millis(100))
    .retry_max_delay(Duration::from_secs(5))
    .retry_multiplier(2.0)
    .build();
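To make the retry settings concrete, the sketch below computes the delay schedule implied by an initial delay of 100 ms, a multiplier of 2.0, and a 5 s cap. The helpers `backoff_delay` and `with_jitter` are hypothetical, and the "full jitter" strategy (scaling the delay by a random fraction between 0 and 1) is an assumption; the client's actual jitter formula is not documented here.

```rust
use std::time::Duration;

/// Hypothetical helper: un-jittered delay before retry number `attempt` (0-based).
fn backoff_delay(attempt: u32, initial: Duration, max: Duration, multiplier: f64) -> Duration {
    let scaled_ms = initial.as_millis() as f64 * multiplier.powi(attempt as i32);
    let capped_ms = scaled_ms.min(max.as_millis() as f64);
    Duration::from_millis(capped_ms as u64)
}

/// Assumed "full jitter": scale the delay by a caller-supplied fraction in [0, 1].
/// A real client would draw the fraction from a random number generator.
fn with_jitter(delay: Duration, fraction: f64) -> Duration {
    let fraction = fraction.clamp(0.0, 1.0);
    Duration::from_millis((delay.as_millis() as f64 * fraction) as u64)
}

fn main() {
    let initial = Duration::from_millis(100);
    let max = Duration::from_secs(5);
    for attempt in 0..7 {
        let delay = backoff_delay(attempt, initial, max, 2.0);
        println!("retry {}: wait up to {:?}", attempt + 1, delay);
    }
    let jittered = with_jitter(backoff_delay(2, initial, max, 2.0), 0.5);
    println!("third retry with a jitter fraction of 0.5: {:?}", jittered);
}
```

With these settings the un-jittered delays double from 100 ms until they reach the 5 s cap, so `retry_max_delay` only matters once `max_retries` is raised above the value of 3 shown here.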
High-Throughput Pipelined Client
For maximum throughput, use PipelinedClient which allows multiple in-flight requests over a single connection:
use rivven_client::{PipelinedClient, PipelineConfig};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = PipelineConfig::high_throughput();
    let client = PipelinedClient::connect("localhost:9092", config).await?;

    let handles: Vec<_> = (0..1000)
        .map(|i| {
            let client = client.clone();
            tokio::spawn(async move {
                client.publish("topic", format!("msg-{}", i)).await
            })
        })
        .collect();

    for handle in handles {
        handle.await??;
    }

    let stats = client.stats();
    println!("Requests sent: {}", stats.requests_sent);
    println!("Responses received: {}", stats.responses_received);
    println!("Success rate: {:.1}%", stats.success_rate() * 100.0);

    Ok(())
}
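The idea of multiple in-flight requests over a single connection can be sketched as follows. This is a simplified model under stated assumptions: each request is tagged with a correlation ID, responses are matched back by that ID, and a window limit applies backpressure. The `Pipeline` type is hypothetical, and whether Rivven correlates responses this way is not confirmed by this document.

```rust
use std::collections::HashMap;

/// Hypothetical model of a pipelined connection with a bounded in-flight window.
struct Pipeline {
    next_id: u64,
    max_in_flight: usize,
    in_flight: HashMap<u64, String>,
}

impl Pipeline {
    fn new(max_in_flight: usize) -> Self {
        Self { next_id: 0, max_in_flight, in_flight: HashMap::new() }
    }

    /// Registers a request and returns its correlation ID,
    /// or `None` when the in-flight window is full (backpressure).
    fn send(&mut self, request: &str) -> Option<u64> {
        if self.in_flight.len() >= self.max_in_flight {
            return None;
        }
        let id = self.next_id;
        self.next_id += 1;
        self.in_flight.insert(id, request.to_string());
        Some(id)
    }

    /// Matches a response to its request by correlation ID, freeing a window slot.
    fn complete(&mut self, id: u64) -> Option<String> {
        self.in_flight.remove(&id)
    }
}

fn main() {
    let mut pipeline = Pipeline::new(2);
    let first = pipeline.send("msg-0").expect("window has room");
    let second = pipeline.send("msg-1").expect("window has room");
    println!("third send accepted: {}", pipeline.send("msg-2").is_some());

    // Responses can be matched in any order because each carries its ID.
    println!("completed: {:?}", pipeline.complete(second));
    println!("completed: {:?}", pipeline.complete(first));
}
```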
Pipeline Configuration
| Config | Default | High-Throughput | Low-Latency |
|--------|---------|-----------------|-------------|
| max_in_flight | 100 | 1000 | 32 |
| batch_linger_us | 1000 | 5000 | 0 |
| max_batch_size | 64 | 256 | 1 |
| request_timeout | 30s | 60s | 10s |
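The batching options in the table can be read as a flush rule. The sketch below assumes `batch_linger_us` and `max_batch_size` behave like the linger and batch-size settings found in other streaming clients: a batch is sent when it is full or when the oldest buffered request has waited for the linger time. That interpretation is inferred from the option names, and `should_flush` is a hypothetical helper.

```rust
use std::time::Duration;

/// Hypothetical flush rule: send the batch when it is full, or when the
/// oldest buffered request has waited at least `linger`.
fn should_flush(buffered: usize, waited: Duration, max_batch_size: usize, linger: Duration) -> bool {
    buffered > 0 && (buffered >= max_batch_size || waited >= linger)
}

fn main() {
    // Default preset from the table: linger 1000 us, batch size 64.
    let linger = Duration::from_micros(1000);
    println!(
        "10 requests after 500 us: flush = {}",
        should_flush(10, Duration::from_micros(500), 64, linger)
    );
    println!(
        "10 requests after 1000 us: flush = {}",
        should_flush(10, Duration::from_micros(1000), 64, linger)
    );

    // Low-latency preset: linger 0, batch size 1, so every request is sent at once.
    println!(
        "low-latency, 1 request: flush = {}",
        should_flush(1, Duration::ZERO, 1, Duration::ZERO)
    );
}
```

Under this reading, the high-throughput preset trades up to 5 ms of added latency for larger batches, while the low-latency preset disables batching entirely.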
Health Monitoring
Monitor client and server health in real-time:
let stats = client.stats().await;

println!("Client Statistics:");
println!(" Total servers: {}", stats.total_servers);
println!(" Healthy servers: {}", stats.healthy_servers);
println!(" Active connections: {}", stats.active_connections);
println!(" Available connections: {}", stats.available_connections);

for server in &stats.servers {
    println!("\n Server: {}", server.address);
    println!(" Circuit state: {:?}", server.circuit_state);
    println!(" Active connections: {}", server.active_connections);
    println!(" Available connections: {}", server.available_connections);
}
Admin Operations
use rivven_client::Client;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut client = Client::connect("localhost:9092").await?;

    client.create_topic("my-topic", Some(3)).await?;

    let topics = client.list_topics().await?;
    for topic in topics {
        println!("Topic: {}", topic);
    }

    client.delete_topic("my-topic").await?;

    Ok(())
}
Advanced Admin API
Rivven supports advanced admin operations for topic configuration management:
use rivven_client::Client;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut client = Client::connect("localhost:9092").await?;

    client.create_topic("events", Some(3)).await?;

    let configs = client.describe_topic_configs(&["events"]).await?;
    for (topic, config) in &configs {
        println!("Topic '{}' configuration:", topic);
        for (key, value) in config {
            println!(" {}: {}", key, value);
        }
    }

    let result = client.alter_topic_config("events", &[
        ("retention.ms", Some("86400000")),
        ("cleanup.policy", Some("compact")),
        ("max.message.bytes", Some("2097152")),
    ]).await?;
    println!("Changed {} config entries", result.changed_count);

    client.alter_topic_config("events", &[
        ("retention.ms", None),
    ]).await?;

    let new_count = client.create_partitions("events", 6).await?;
    println!("Topic now has {} partitions", new_count);

    let results = client.delete_records("events", &[
        (0, 1000),
        (1, 500),
    ]).await?;
    for result in results {
        println!("Partition {}: low watermark now {}",
            result.partition, result.low_watermark);
    }

    Ok(())
}
Supported Topic Configurations
| Configuration | Description | Example |
|---------------|-------------|---------|
| retention.ms | Message retention time | 86400000 (1 day) |
| retention.bytes | Max partition size | 1073741824 (1 GB) |
| max.message.bytes | Max message size | 2097152 (2 MB) |
| segment.bytes | Segment file size | 536870912 (512 MB) |
| segment.ms | Segment rotation time | 604800000 (7 days) |
| cleanup.policy | delete or compact | compact |
| min.insync.replicas | Min ISR for acks=all | 2 |
| compression.type | lz4, zstd, snappy, gzip | lz4 |
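The numeric examples in the table are easier to audit when derived from named units. The sketch below recomputes them; the constant names are illustrative only. Note that the byte values correspond to binary multiples (1024-based), which is how the "GB" and "MB" labels in the table should be read.

```rust
// Illustrative unit constants; the names are not part of rivven-client.
const SECOND_MS: u64 = 1_000;
const DAY_MS: u64 = 24 * 60 * 60 * SECOND_MS;
const MIB: u64 = 1024 * 1024;
const GIB: u64 = 1024 * MIB;

fn main() {
    // Each value is passed to the broker as a string, as in alter_topic_config.
    let configs: [(&str, u64); 5] = [
        ("retention.ms", DAY_MS),
        ("retention.bytes", GIB),
        ("max.message.bytes", 2 * MIB),
        ("segment.bytes", 512 * MIB),
        ("segment.ms", 7 * DAY_MS),
    ];
    for (key, value) in configs.iter() {
        println!("{} = {}", key, value);
    }
}
```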
### Transactions & Idempotent Producer
Rivven supports native transactions and idempotent producers for exactly-once semantics:
#### Idempotent Producer
Automatic message deduplication using producer IDs and sequence numbers:
```rust
use rivven_client::Client;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut client = Client::connect("localhost:9092").await?;

    // Initialize idempotent producer (assigns producer_id and epoch)
    let mut producer = client.init_producer_id(None).await?;
    println!("Producer ID: {}, Epoch: {}", producer.producer_id, producer.producer_epoch);

    // Publish with deduplication
    let (offset, partition, was_duplicate) = client
        .publish_idempotent("orders", None::<Vec<u8>>, b"order-data".to_vec(), &mut producer)
        .await?;

    if was_duplicate {
        println!("Message was a duplicate (already delivered)");
    } else {
        println!("Published to partition {} at offset {}", partition, offset);
    }

    Ok(())
}
```
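How producer IDs and sequence numbers enable deduplication can be sketched from the broker's point of view. This is a simplified model, not Rivven's actual algorithm: the `PartitionLog` type, the `AppendResult` variants, and the rule that only the most recent sequence number is remembered are all assumptions made for illustration.

```rust
use std::collections::HashMap;

#[derive(Debug, PartialEq)]
enum AppendResult {
    Accepted { offset: u64 },
    Duplicate { offset: u64 },
    OutOfOrder,
    Fenced,
}

/// Last write seen from one producer (hypothetical bookkeeping).
struct ProducerState {
    epoch: u16,
    last_sequence: u32,
    last_offset: u64,
}

/// Hypothetical per-partition log that tracks producers by ID.
#[derive(Default)]
struct PartitionLog {
    next_offset: u64,
    producers: HashMap<u64, ProducerState>,
}

impl PartitionLog {
    fn append(&mut self, producer_id: u64, epoch: u16, sequence: u32) -> AppendResult {
        if let Some(state) = self.producers.get(&producer_id) {
            if epoch < state.epoch {
                // An older incarnation of this producer is rejected.
                return AppendResult::Fenced;
            }
            if epoch == state.epoch {
                if sequence == state.last_sequence {
                    // A retry of the last write: report the original offset.
                    return AppendResult::Duplicate { offset: state.last_offset };
                }
                if sequence != state.last_sequence.wrapping_add(1) {
                    return AppendResult::OutOfOrder;
                }
            }
        }
        let offset = self.next_offset;
        self.next_offset += 1;
        self.producers.insert(
            producer_id,
            ProducerState { epoch, last_sequence: sequence, last_offset: offset },
        );
        AppendResult::Accepted { offset }
    }
}

fn main() {
    let mut log = PartitionLog::default();
    println!("{:?}", log.append(7, 0, 0));
    println!("{:?}", log.append(7, 0, 1));
    // The client retries sequence 1 after a timeout; nothing new is written.
    println!("{:?}", log.append(7, 0, 1));
}
```

In this model a retried publish returns the offset of the original write, which matches the shape of the `was_duplicate` flag returned by `publish_idempotent` above.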
Transactions
Atomic, all-or-nothing message delivery across partitions and topics:
use rivven_client::Client;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut client = Client::connect("localhost:9092").await?;
    let mut producer = client.init_producer_id(None).await?;

    let txn_id = "payment-processor";
    client.begin_transaction(txn_id, &producer, None).await?;

    client.add_partitions_to_txn(txn_id, &producer, &[
        ("orders", 0),
        ("payments", 0),
    ]).await?;

    client.publish_transactional(txn_id, "orders", None::<Vec<u8>>, b"order-created".to_vec(), &mut producer).await?;
    client.publish_transactional(txn_id, "payments", None::<Vec<u8>>, b"payment-processed".to_vec(), &mut producer).await?;

    client.commit_transaction(txn_id, &producer).await?;
    println!("Transaction committed atomically!");

    Ok(())
}
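The all-or-nothing behavior can be illustrated with a small in-memory model: writes made inside a transaction stay invisible until commit, and an abort discards them. The `TxnLog` type below is hypothetical and ignores partitions, producers, and durability; it only demonstrates the visibility rule.

```rust
use std::collections::HashMap;

/// Hypothetical in-memory model of transactional visibility.
#[derive(Default)]
struct TxnLog {
    committed: HashMap<String, Vec<Vec<u8>>>,
    pending: Vec<(String, Vec<u8>)>,
}

impl TxnLog {
    /// Buffers a write; it is not visible to readers yet.
    fn publish_transactional(&mut self, topic: &str, payload: &[u8]) {
        self.pending.push((topic.to_string(), payload.to_vec()));
    }

    /// Makes every buffered write visible in one step.
    fn commit(&mut self) {
        for (topic, payload) in self.pending.drain(..) {
            self.committed.entry(topic).or_default().push(payload);
        }
    }

    /// Discards every buffered write.
    fn abort(&mut self) {
        self.pending.clear();
    }

    /// Number of messages a reader can see on `topic`.
    fn visible(&self, topic: &str) -> usize {
        self.committed.get(topic).map_or(0, |messages| messages.len())
    }
}

fn main() {
    let mut log = TxnLog::default();
    log.publish_transactional("orders", b"order-created");
    log.publish_transactional("payments", b"payment-processed");
    println!("visible before commit: {}", log.visible("orders"));

    log.commit();
    println!("visible after commit: {}", log.visible("orders"));

    log.publish_transactional("orders", b"order-cancelled");
    log.abort();
    println!("visible after abort: {}", log.visible("orders"));
}
```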
Exactly-Once Consume-Transform-Produce
For stream processing with exactly-once semantics:
use rivven_client::Client;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut client = Client::connect("localhost:9092").await?;
    let mut producer = client.init_producer_id(None).await?;

    let txn_id = "stream-processor";
    let consumer_group = "processor-group";

    client.begin_transaction(txn_id, &producer, None).await?;
    client.add_partitions_to_txn(txn_id, &producer, &[("output-topic", 0)]).await?;

    let messages = client.consume("input-topic", 0, 0, 100).await?;

    for msg in &messages {
        let transformed = format!("processed: {:?}", msg.value);
        client.publish_transactional(
            txn_id, "output-topic", None::<Vec<u8>>,
            transformed.into_bytes(), &mut producer
        ).await?;
    }

    client.add_offsets_to_txn(
        txn_id, &producer, consumer_group,
        &[("input-topic", 0, messages.len() as i64)]
    ).await?;

    client.commit_transaction(txn_id, &producer).await?;

    Ok(())
}
Transaction Error Handling
use rivven_client::{Client, Error};

match client.commit_transaction(txn_id, &producer).await {
    Ok(()) => println!("Committed successfully"),
    Err(e) => {
        eprintln!("Commit failed: {}", e);
        client.abort_transaction(txn_id, &producer).await?;
    }
}
Configuration Reference
ResilientClientConfig
| Option | Default | Description |
|--------|---------|-------------|
| servers | Required | List of server addresses |
| pool_size_per_server | 10 | Maximum connections per server |
| connection_timeout | 10s | Timeout for establishing connections |
| request_timeout | 30s | Timeout for individual requests |
| max_retries | 3 | Maximum retry attempts |
| retry_initial_delay | 100ms | Initial retry delay |
| retry_max_delay | 5s | Maximum retry delay |
| retry_multiplier | 2.0 | Delay multiplier between retries |
| circuit_breaker_failure_threshold | 5 | Failures before circuit opens |
| circuit_breaker_recovery_timeout | 30s | Time before attempting recovery |
Error Handling
The client provides typed errors for different failure modes:
use rivven_client::{ResilientClient, Error};

match client.publish("topic", None, b"data").await {
    Ok(offset) => println!("Published at offset {}", offset),
    Err(Error::CircuitBreakerOpen(server)) => {
        println!("Server {} is unhealthy, circuit breaker open", server);
    }
    Err(Error::AllServersUnavailable) => {
        println!("All servers are unavailable");
    }
    Err(Error::ConnectionError(msg)) => {
        println!("Connection failed: {}", msg);
    }
    Err(e) => println!("Other error: {}", e),
}
TLS Configuration
Enable TLS for secure connections:
rivven-client = { version = "0.2", features = ["tls"] }

use rivven_client::{Client, TlsConfig};

let tls_config = TlsConfig::builder()
    .ca_cert_path("/path/to/ca.crt")
    .client_cert_path("/path/to/client.crt")
    .client_key_path("/path/to/client.key")
    .build()?;

let client = Client::connect_with_tls("localhost:9093", tls_config).await?;
Documentation
License
Apache-2.0. See LICENSE.