rivven-client
Native Rust client library for the Rivven event streaming platform.
Overview
rivven-client is a production-grade async client with connection pooling, automatic failover, circuit breakers, and exactly-once semantics.
Features
| Category | Features |
|---|---|
| Connectivity | Connection pooling, request pipelining, automatic failover |
| Resilience | Circuit breaker, exponential backoff, health monitoring |
| Security | TLS/mTLS, SCRAM-SHA-256 authentication |
| Semantics | Transactions, idempotent producer, exactly-once delivery |
Installation
[dependencies]
rivven-client = "0.2"
# With TLS support
rivven-client = { version = "0.2", features = ["tls"] }
Usage
Basic Client
For simple use cases, use the basic Client:
use Client;
async
Authentication
Rivven supports multiple authentication methods:
use Client;
async
Production-Grade Resilient Client
For production deployments, use ResilientClient, which provides:
- Connection pooling across multiple servers
- Automatic retry with exponential backoff and jitter
- Circuit breaker pattern for fault isolation
- Real-time health monitoring
use ;
use Duration;
async
Circuit Breaker Behavior
The circuit breaker protects against cascading failures:
- Closed (Normal): Requests flow normally. Failures are counted.
- Open (Failing): After threshold failures, the circuit opens. All requests fail fast without attempting connection.
- Half-Open (Recovery): After recovery timeout, one request is allowed through. If successful, circuit closes; if failed, circuit reopens.
// Circuit breaker configuration
let config = ResilientClientConfig::builder()
    .servers(/* server addresses */)
    .circuit_breaker_failure_threshold(5)                       // Open after 5 failures
    .circuit_breaker_recovery_timeout(Duration::from_secs(30))  // Try recovery after 30s
    .build();
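
The three states above can be modeled as a small state machine. The following is a minimal standalone sketch using only the standard library; the `CircuitBreaker` and `State` types are hypothetical and are not the rivven-client implementation, which may differ in how it counts failures and admits probe requests.

```rust
use std::time::{Duration, Instant};

/// The three states described above.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum State {
    Closed,
    Open { since: Instant },
    HalfOpen,
}

/// Illustrative breaker (hypothetical type, not part of rivven-client).
pub struct CircuitBreaker {
    state: State,
    failures: u32,
    failure_threshold: u32,
    recovery_timeout: Duration,
}

impl CircuitBreaker {
    pub fn new(failure_threshold: u32, recovery_timeout: Duration) -> Self {
        Self { state: State::Closed, failures: 0, failure_threshold, recovery_timeout }
    }

    /// Returns true if a request may be attempted at time `now`.
    pub fn allow(&mut self, now: Instant) -> bool {
        match self.state {
            State::Closed => true,
            // A probe is already in flight; everything else fails fast.
            State::HalfOpen => false,
            State::Open { since } => {
                if now.duration_since(since) >= self.recovery_timeout {
                    // Recovery timeout elapsed: let exactly one probe through.
                    self.state = State::HalfOpen;
                    true
                } else {
                    false
                }
            }
        }
    }

    /// A success closes the circuit and resets the failure count.
    pub fn record_success(&mut self) {
        self.failures = 0;
        self.state = State::Closed;
    }

    /// A failed probe reopens the circuit; otherwise failures are counted.
    pub fn record_failure(&mut self, now: Instant) {
        match self.state {
            State::HalfOpen => self.state = State::Open { since: now },
            _ => {
                self.failures += 1;
                if self.failures >= self.failure_threshold {
                    self.state = State::Open { since: now };
                }
            }
        }
    }

    pub fn state(&self) -> State {
        self.state
    }
}
```

Passing `now` explicitly keeps the sketch deterministic: a caller checks `allow(Instant::now())` before each request and reports the outcome with `record_success()` or `record_failure(Instant::now())`.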
Retry with Exponential Backoff
Failed operations are automatically retried with exponential backoff and jitter:
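let config = ResilientClientConfig::builder()
    .servers(/* server addresses */)
    .max_retries(3)                                 // Retry up to 3 times
    .retry_initial_delay(Duration::from_millis(100)) // Start with 100ms delay
    .retry_max_delay(Duration::from_secs(5))        // Cap at 5 seconds
    .retry_multiplier(2.0)                          // Double delay each retry
    .build();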
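
To make the retry schedule concrete, here is a minimal sketch of how a capped exponential delay is commonly computed. The functions `backoff_delay` and `with_jitter` are hypothetical helpers and the exact formula and jitter strategy used by rivven-client are assumptions. The random fraction is supplied by the caller so the example needs no external crates.

```rust
use std::time::Duration;

/// Delay before retry number `attempt` (0-based), without jitter:
/// min(initial * multiplier^attempt, max).
/// Hypothetical helper; the client's actual formula may differ.
pub fn backoff_delay(attempt: u32, initial: Duration, max: Duration, multiplier: f64) -> Duration {
    let scaled = initial.as_secs_f64() * multiplier.powi(attempt as i32);
    // Clamp to the maximum so the delay never grows without bound.
    Duration::from_secs_f64(scaled.min(max.as_secs_f64()))
}

/// Applies "full jitter": scales the delay by a fraction in [0, 1].
/// `unit_random` would normally come from a random number generator.
pub fn with_jitter(delay: Duration, unit_random: f64) -> Duration {
    delay.mul_f64(unit_random.clamp(0.0, 1.0))
}
```

With the values shown above (100 ms initial delay, multiplier 2.0, 5 s cap), the un-jittered delays are 100 ms, 200 ms, 400 ms, and so on, until the cap takes over at 5 s. Jitter spreads retries from many clients so they do not hit a recovering server at the same instant.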
High-Throughput Pipelined Client
For maximum throughput, use PipelinedClient, which allows multiple in-flight requests over a single connection. It supports optional TLS and authentication:
use ;
async
Pipeline Configuration
| Config | Default | High-Throughput | Low-Latency |
|---|---|---|---|
| max_in_flight | 100 | 1000 | 32 |
| batch_linger_us | 1000 | 5000 | 0 |
| max_batch_size | 64 | 256 | 1 |
| request_timeout | 30s | 60s | 10s |
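
The interaction between max_batch_size and batch_linger_us can be illustrated with a small sketch. It assumes the two settings carry their conventional meaning: a batch is sent as soon as it is full, or once the linger window has elapsed since its first queued request. The `Batcher` type is hypothetical and is not the PipelinedClient implementation.

```rust
use std::time::{Duration, Instant};

/// Hypothetical batcher illustrating `max_batch_size` and `batch_linger_us`.
pub struct Batcher<T> {
    items: Vec<T>,
    first_queued: Option<Instant>,
    max_batch_size: usize,
    linger: Duration,
}

impl<T> Batcher<T> {
    pub fn new(max_batch_size: usize, batch_linger_us: u64) -> Self {
        Self {
            items: Vec::new(),
            first_queued: None,
            max_batch_size,
            linger: Duration::from_micros(batch_linger_us),
        }
    }

    /// Queues an item; returns a full batch if the size limit was reached.
    pub fn push(&mut self, item: T, now: Instant) -> Option<Vec<T>> {
        if self.first_queued.is_none() {
            // The linger window starts with the first item of a batch.
            self.first_queued = Some(now);
        }
        self.items.push(item);
        if self.items.len() >= self.max_batch_size {
            self.take()
        } else {
            None
        }
    }

    /// Returns a partial batch once the linger window has elapsed.
    pub fn poll(&mut self, now: Instant) -> Option<Vec<T>> {
        match self.first_queued {
            Some(t) if now.duration_since(t) >= self.linger => self.take(),
            _ => None,
        }
    }

    /// Hands out the current batch and resets the linger window.
    fn take(&mut self) -> Option<Vec<T>> {
        self.first_queued = None;
        Some(std::mem::take(&mut self.items))
    }
}
```

Under this model the Low-Latency column (batch size 1, linger 0) sends every request immediately, while the High-Throughput column trades up to 5 ms of added latency for larger batches.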
High-Performance Producer
For maximum throughput with all best practices, use Producer:
use ;
use Arc;
async
Producer Features
| Feature | Description |
|---|---|
| Authentication | SCRAM-SHA-256 auto-auth via ProducerAuthConfig |
| Auto-Handshake | Protocol version negotiated on connect |
| Compression | LZ4/Snappy/Zstd batch compression (feature-gated) |
| Idempotency | Sequence tracking + IdempotentPublish wire type; is_idempotent() detects silent degradation |
| Metadata Cache | TTL-based caching with persistent metadata client (avoids per-topic connection churn) |
| Sticky Partitioning | Batches keyless messages to same partition |
| Backpressure | Memory-bounded buffers prevent OOM; applies to standard, idempotent, and transactional publish paths |
| Murmur2 Hashing | Kafka-compatible key partitioning (optimized) |
| Batched I/O | Single flush per batch minimizes syscalls |
| Pipelined Responses | Write-all, then read-all for throughput |
| Multi-Server Failover | Tries all bootstrap servers on connect |
| Flush Safety | pending_records correctly decremented on batch failure; flush() always terminates |
| Completion Tracking | flush() waits for all pending records |
| Metadata Refresh | refresh_metadata() fetches partition info |
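
As background for the Murmur2 Hashing row, the sketch below ports the 32-bit MurmurHash2 variant used by Kafka's default partitioner and maps a key to a partition the same way (mask the sign bit, then take the remainder). It is an illustration of the technique only; the function names are hypothetical and the optimized implementation inside rivven-client is not shown in this document.

```rust
/// Kafka's variant of 32-bit MurmurHash2 (seed 0x9747b28c).
/// Standalone illustration; not the rivven-client source.
pub fn murmur2(data: &[u8]) -> u32 {
    const SEED: u32 = 0x9747_b28c;
    const M: u32 = 0x5bd1_e995;
    const R: u32 = 24;

    let mut h = SEED ^ (data.len() as u32);

    // Mix the input four bytes at a time, read as little-endian words.
    let mut chunks = data.chunks_exact(4);
    for chunk in &mut chunks {
        let mut k = u32::from_le_bytes([chunk[0], chunk[1], chunk[2], chunk[3]]);
        k = k.wrapping_mul(M);
        k ^= k >> R;
        k = k.wrapping_mul(M);
        h = h.wrapping_mul(M);
        h ^= k;
    }

    // Fold in the remaining 0 to 3 bytes.
    let tail = chunks.remainder();
    if tail.len() >= 3 {
        h ^= (tail[2] as u32) << 16;
    }
    if tail.len() >= 2 {
        h ^= (tail[1] as u32) << 8;
    }
    if !tail.is_empty() {
        h ^= tail[0] as u32;
        h = h.wrapping_mul(M);
    }

    // Final avalanche.
    h ^= h >> 13;
    h = h.wrapping_mul(M);
    h ^= h >> 15;
    h
}

/// Maps a key to a partition: clear the sign bit, then take the remainder.
/// Panics if `num_partitions` is zero.
pub fn partition_for_key(key: &[u8], num_partitions: u32) -> u32 {
    assert!(num_partitions > 0, "a topic needs at least one partition");
    (murmur2(key) & 0x7fff_ffff) % num_partitions
}
```

Because the hash is deterministic, every record with the same key lands on the same partition as long as the partition count does not change, which is what preserves per-key ordering.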
Producer Configuration
| Config | Default | High-Throughput | Low-Latency | Exactly-Once |
|---|---|---|---|---|
| batch_size | 16KB | 64KB | 1 | 16KB |
| linger_ms | 0 | 10 | 0 | 0 |
| compression_type | None | Lz4 | None | None |
| max_in_flight_requests | 5 | 10 | 1 | 5 |
| enable_idempotence | false | false | false | true |
| acks | 1 | 1 | 1 | -1 (all) |
| auth | None | - | - | - |
Health Monitoring
Monitor client and server health in real-time:
let stats = client.stats.await;
println!;
println!;
println!;
println!;
println!;
for server in &stats.servers
Admin Operations
use Client;
async
Advanced Admin API
Rivven supports advanced admin operations for topic configuration management:
use Client;
async
Supported Topic Configurations
| Configuration | Description | Example |
|---|---|---|
| retention.ms | Message retention time | 86400000 (1 day) |
| retention.bytes | Max partition size | 1073741824 (1 GB) |
| max.message.bytes | Max message size | 2097152 (2 MB) |
| segment.bytes | Segment file size | 536870912 (512 MB) |
| segment.ms | Segment rotation time | 604800000 (7 days) |
| cleanup.policy | delete or compact | compact |
| min.insync.replicas | Min ISR for acks=all | 2 |
| compression.type | lz4, zstd, snappy, gzip | lz4 |
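
The numeric examples in the table are easier to maintain when derived from named units instead of typed as raw literals. The sketch below builds the same example values as plain key/value strings. The constants and the `example_topic_config` function are hypothetical, and how such pairs are passed to the admin API is not shown in this document, so no client call is made here.

```rust
/// Milliseconds in one day.
pub const MS_PER_DAY: u64 = 24 * 60 * 60 * 1000;
/// Binary megabyte and gigabyte, matching the byte counts in the table.
pub const MIB: u64 = 1024 * 1024;
pub const GIB: u64 = 1024 * MIB;

/// Builds the example values from the table as (key, value) string pairs.
/// Hypothetical helper for illustration only.
pub fn example_topic_config() -> Vec<(&'static str, String)> {
    vec![
        ("retention.ms", MS_PER_DAY.to_string()),      // 1 day
        ("retention.bytes", GIB.to_string()),          // 1 GB
        ("max.message.bytes", (2 * MIB).to_string()),  // 2 MB
        ("segment.bytes", (512 * MIB).to_string()),    // 512 MB
        ("segment.ms", (7 * MS_PER_DAY).to_string()),  // 7 days
        ("cleanup.policy", "compact".to_string()),
        ("min.insync.replicas", "2".to_string()),
        ("compression.type", "lz4".to_string()),
    ]
}
```

Note that the byte sizes in the table are powers of two, so "1 GB" there means 1024 * 1024 * 1024 bytes.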
### Schema Registration
Register schemas with the Rivven Schema Registry (`rivven-schema`) directly from the client using a lightweight HTTP/1.1 call — no external HTTP dependencies required:
```rust
use rivven_client::Client;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut client = Client::connect("localhost:9092").await?;

    // Register an Avro schema with the schema registry
    let schema_id = client.register_schema(
        "http://localhost:8081", // Schema registry URL
        "users-value",           // Subject name
        "AVRO",                  // Schema type: AVRO, PROTOBUF, or JSON
        r#"{"type":"record","name":"User","fields":[{"name":"id","type":"long"},{"name":"name","type":"string"}]}"#,
    ).await?;

    println!("Registered schema with ID: {}", schema_id);
    Ok(())
}
```
Note: For advanced schema registry operations (compatibility checks, Glue integration, codec management), use rivven-connect's SchemaRegistryClient. The Client::register_schema() method is designed for quick schema bootstrapping without additional dependencies.
Security: HTTP responses from the registry are bounded by MAX_CHUNK_SIZE (16 MB per chunk) and MAX_RESPONSE_SIZE (16 MB total) to prevent memory exhaustion from malicious or misconfigured registries.
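
The idea behind a response size bound can be sketched with the standard library alone. The `read_bounded` function below is a hypothetical helper, not the client's internal code: it reads from any `Read` source but refuses bodies larger than a caller-supplied limit, so a misbehaving server cannot make the client allocate without bound.

```rust
use std::io::{self, Read};

/// Reads the whole body, but fails if it is larger than `limit` bytes.
/// Hypothetical helper illustrating a total-size bound.
pub fn read_bounded<R: Read>(reader: R, limit: u64) -> io::Result<Vec<u8>> {
    let mut buf = Vec::new();
    // Read at most one byte past the limit so an oversized body is detectable
    // without ever buffering more than `limit + 1` bytes.
    reader.take(limit.saturating_add(1)).read_to_end(&mut buf)?;
    if buf.len() as u64 > limit {
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            "response exceeds size limit",
        ));
    }
    Ok(buf)
}
```

With a 16 MB limit this would be called as `read_bounded(stream, 16 * 1024 * 1024)`, where `stream` is whatever reader carries the response body.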
Transactions & Idempotent Producer
Rivven supports native transactions and idempotent producers for exactly-once semantics:
Idempotent Producer
Automatic message deduplication using producer IDs and sequence numbers:
use Client;
async
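
Deduplication by producer ID and sequence number can be illustrated from the receiving side. The sketch below is a simplified model, not Rivven's broker or wire protocol: the `DedupTable` type, its key layout, and the assumption that sequences start at 0 per partition are all hypothetical.

```rust
use std::cmp::Ordering;
use std::collections::HashMap;

/// Result of checking one incoming record.
#[derive(Debug, PartialEq)]
pub enum Outcome {
    Accepted,
    Duplicate,
    OutOfOrder,
}

/// Tracks the next expected sequence per (producer_id, partition).
/// Hypothetical type for illustration only.
#[derive(Default)]
pub struct DedupTable {
    next_seq: HashMap<(u64, u32), u64>,
}

impl DedupTable {
    /// Accepts a record only if it carries exactly the next expected sequence.
    pub fn check(&mut self, producer_id: u64, partition: u32, seq: u64) -> Outcome {
        let expected = self.next_seq.entry((producer_id, partition)).or_insert(0);
        match seq.cmp(expected) {
            // Already seen: a retry of a record that was in fact written.
            Ordering::Less => Outcome::Duplicate,
            // A gap: an earlier record is missing, so reject.
            Ordering::Greater => Outcome::OutOfOrder,
            Ordering::Equal => {
                *expected += 1;
                Outcome::Accepted
            }
        }
    }
}
```

This is why a retried publish is safe under idempotence: the retry carries the same sequence number as the original attempt and is recognized as a duplicate instead of being appended twice.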
Transactions
Atomic, all-or-nothing message delivery across partitions and topics:
use Client;
async
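
The all-or-nothing property can be pictured with a toy in-memory model: records published inside a transaction are staged and become visible only on commit, while an abort discards them. The `TxnLog` type and its methods are hypothetical and do not mirror the rivven-client API or the broker's actual mechanism.

```rust
/// Hypothetical in-memory log; names are illustrative, not rivven-client API.
#[derive(Default)]
pub struct TxnLog {
    committed: Vec<(String, Vec<u8>)>,
    staged: Option<Vec<(String, Vec<u8>)>>,
}

impl TxnLog {
    /// Starts a transaction; records published from now on are staged.
    pub fn begin(&mut self) {
        self.staged = Some(Vec::new());
    }

    /// Stages a record for a topic. Fails if no transaction is open.
    pub fn publish(&mut self, topic: &str, payload: &[u8]) -> Result<(), &'static str> {
        let staged = self.staged.as_mut().ok_or("no open transaction")?;
        staged.push((topic.to_string(), payload.to_vec()));
        Ok(())
    }

    /// Makes every staged record visible at once; returns how many were committed.
    pub fn commit(&mut self) -> Result<usize, &'static str> {
        let staged = self.staged.take().ok_or("no open transaction")?;
        let count = staged.len();
        self.committed.extend(staged);
        Ok(count)
    }

    /// Discards every staged record.
    pub fn abort(&mut self) {
        self.staged = None;
    }

    /// Records visible to consumers (committed only).
    pub fn visible(&self) -> &[(String, Vec<u8>)] {
        &self.committed
    }
}
```

In this model a consumer never observes a state where the record for one topic is visible and the matching record for another topic is not, which is the guarantee the section describes.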
Exactly-Once Consume-Transform-Produce
For stream processing with exactly-once semantics:
use Client;
async
Transaction Error Handling
use ;
// On error, abort the transaction
match client.commit_transaction.await
Configuration Reference
ResilientClientConfig
| Option | Default | Description |
|---|---|---|
| servers | Required | List of server addresses |
| pool_size_per_server | 10 | Maximum connections per server |
| connection_timeout | 10s | Timeout for establishing connections |
| request_timeout | 30s | Timeout for individual requests |
| max_retries | 3 | Maximum retry attempts |
| retry_initial_delay | 100ms | Initial retry delay |
| retry_max_delay | 5s | Maximum retry delay |
| retry_multiplier | 2.0 | Delay multiplier between retries |
| circuit_breaker_failure_threshold | 5 | Failures before circuit opens |
| circuit_breaker_recovery_timeout | 30s | Time before attempting recovery |
| max_connection_lifetime | 300s | Maximum time a pooled connection can be reused before recycling |
Error Handling
The client provides typed errors for different failure modes:
use ;
match client.publish.await
TLS Configuration
Enable TLS for secure connections:
rivven-client = { version = "0.2", features = ["tls"] }
use ;
let tls_config = builder
.ca_cert_path
.client_cert_path
.client_key_path
.build?;
let client = connect_with_tls.await?;
Documentation
License
Apache-2.0. See LICENSE.