Crate rivven_client

§rivven-client

Native async Rust client library for Rivven, the high-performance, single-binary event streaming platform.

§Features

  • Async/Await: Built on Tokio for high-performance async I/O
  • Connection Pooling: Efficient connection management with ResilientClient
  • Circuit Breaker: Automatic failure detection and recovery
  • Automatic Retries: Exponential backoff with configurable limits
  • Multi-Server Failover: Bootstrap server list with automatic failover
  • SCRAM-SHA-256 Authentication: Secure password-based authentication
  • TLS Support: Optional TLS encryption (requires tls feature)

§Quick Start

use rivven_client::{ResilientClient, ResilientClientConfig};

// Create a client with connection pooling and circuit breaker
let config = ResilientClientConfig::builder()
    .servers(vec!["127.0.0.1:9092".to_string()])
    .max_connections(10)
    .build()?;

let client = ResilientClient::new(config).await?;

// Publish a message
client.publish("my-topic", b"Hello, Rivven!").await?;

// Consume messages
let messages = client.consume("my-topic", 0, 0, 100).await?;

§Authentication

The client supports SCRAM-SHA-256 authentication:

use rivven_client::{Client, AuthSession};

let mut client = Client::connect("127.0.0.1:9092").await?;

// Authenticate with SCRAM-SHA-256
client.authenticate_scram("username", "password").await?;

§Resilient Client

For production use, ResilientClient provides:

  • Connection Pooling: Reuses connections across requests
  • Circuit Breaker: Stops sending requests to failing servers
  • Retries: Automatic retries with exponential backoff
  • Failover: Tries multiple bootstrap servers

use rivven_client::{ResilientClient, ResilientClientConfig};

let config = ResilientClientConfig::builder()
    .servers(vec![
        "broker1:9092".to_string(),
        "broker2:9092".to_string(),
        "broker3:9092".to_string(),
    ])
    .max_connections(20)
    .max_retries(5)
    .circuit_breaker_threshold(3)
    .circuit_breaker_timeout(std::time::Duration::from_secs(30))
    .build()?;

let client = ResilientClient::new(config).await?;
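
To make the retry and circuit-breaker settings above more concrete, here is a minimal, std-only sketch of the two mechanisms. It is not the rivven_client implementation: `backoff_delay` and `CircuitBreaker` are hypothetical names, and the doubling schedule and the "allow a probe after the timeout" rule are assumptions about how such features commonly behave.

```rust
use std::time::{Duration, Instant};

/// Delay before retry number `attempt` (0-based): `base * 2^attempt`, capped at `max`.
/// Hypothetical helper, not part of rivven_client.
fn backoff_delay(attempt: u32, base: Duration, max: Duration) -> Duration {
    // checked_pow / checked_mul guard against overflow for large attempt numbers.
    let factor = 2u32.checked_pow(attempt).unwrap_or(u32::MAX);
    base.checked_mul(factor).unwrap_or(max).min(max)
}

/// Illustrative circuit breaker: opens after `threshold` consecutive failures
/// and lets traffic through again once `timeout` has elapsed.
struct CircuitBreaker {
    threshold: u32,
    timeout: Duration,
    failures: u32,
    opened_at: Option<Instant>,
}

impl CircuitBreaker {
    fn new(threshold: u32, timeout: Duration) -> Self {
        Self { threshold, timeout, failures: 0, opened_at: None }
    }

    /// Returns whether a request may be sent at time `now`.
    fn allow(&self, now: Instant) -> bool {
        match self.opened_at {
            None => true,
            // Once the timeout has passed, a probe request is let through.
            Some(opened) => now.duration_since(opened) >= self.timeout,
        }
    }

    /// A success closes the breaker and resets the failure count.
    fn record_success(&mut self) {
        self.failures = 0;
        self.opened_at = None;
    }

    /// A failure increments the count and opens the breaker at the threshold.
    fn record_failure(&mut self, now: Instant) {
        self.failures += 1;
        if self.failures >= self.threshold {
            self.opened_at = Some(now);
        }
    }
}
```

With a threshold of 3 and a timeout of 30 seconds, as in the configuration above, the third consecutive failure would stop requests to that server for 30 seconds in this model. Passing `now` explicitly keeps the logic deterministic and easy to test.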

§Request Pipelining

For maximum throughput, use PipelinedClient which allows multiple in-flight requests over a single connection:

use rivven_client::{PipelinedClient, PipelineConfig};

// High-throughput configuration
let config = PipelineConfig::high_throughput();
let client = PipelinedClient::connect("127.0.0.1:9092", config).await?;

// Send 1000 requests concurrently
let handles: Vec<_> = (0..1000)
    .map(|i| {
        let client = client.clone();
        tokio::spawn(async move {
            client.publish("topic", format!("msg-{}", i)).await
        })
    })
    .collect();

for handle in handles {
    handle.await??;
}
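
As a rough mental model of what "multiple in-flight requests over a single connection" involves, the sketch below tracks outstanding requests by a correlation ID so that responses can be matched even when they arrive out of order. This is a generic illustration of pipelining, not Rivven's wire protocol: the `InFlight` type, its methods, and the use of numeric correlation IDs are all assumptions.

```rust
use std::collections::HashMap;

/// Tracks requests that have been sent but not yet answered.
/// Hypothetical type for illustration only.
struct InFlight {
    next_id: u64,
    pending: HashMap<u64, String>,
}

impl InFlight {
    fn new() -> Self {
        Self { next_id: 0, pending: HashMap::new() }
    }

    /// Registers a request and returns the correlation ID to send with it.
    fn send(&mut self, request: &str) -> u64 {
        let id = self.next_id;
        self.next_id += 1;
        self.pending.insert(id, request.to_string());
        id
    }

    /// Matches a response to its request; returns `None` for unknown IDs.
    fn complete(&mut self, id: u64) -> Option<String> {
        self.pending.remove(&id)
    }

    /// Number of requests still waiting for a response.
    fn in_flight(&self) -> usize {
        self.pending.len()
    }
}
```

Because each response carries the ID of the request it answers, the sender does not have to wait for one reply before issuing the next request, which is where the throughput gain of pipelining comes from.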

§Feature Flags

  • tls - Enable TLS support via rustls

§High-Performance Producer

For high-throughput publishing, use Producer, which provides sticky partitioning, batching, and a metadata cache:

use rivven_client::{Producer, ProducerConfig};
use std::sync::Arc;

let config = ProducerConfig::builder()
    .bootstrap_servers(vec!["localhost:9092".to_string()])
    .batch_size(16384)
    .linger_ms(5)
    .enable_idempotence(true)
    .build()?;

let producer = Arc::new(Producer::new(config).await?);

// Thread-safe sharing
for i in 0..1000 {
    let producer = Arc::clone(&producer);
    tokio::spawn(async move {
        producer.send("topic", format!("msg-{}", i)).await
    });
}
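
The `batch_size` and `linger_ms` settings above suggest size-or-time batching. The following std-only sketch shows one common way such a policy works, assuming `batch_size` is a byte count and `linger_ms` is the maximum time the oldest buffered record may wait. The `Batcher` type is hypothetical and is not how `Producer` is necessarily implemented.

```rust
use std::time::{Duration, Instant};

/// Illustrative size-or-time batcher (hypothetical, not the Producer internals).
struct Batcher {
    batch_size: usize,        // flush once buffered bytes reach this value
    linger: Duration,         // or once the oldest record has waited this long
    buffered_bytes: usize,
    records: Vec<Vec<u8>>,
    first_at: Option<Instant>,
}

impl Batcher {
    fn new(batch_size: usize, linger: Duration) -> Self {
        Self { batch_size, linger, buffered_bytes: 0, records: Vec::new(), first_at: None }
    }

    /// Buffers a record; returns the batch if the size limit has been reached.
    fn push(&mut self, record: Vec<u8>, now: Instant) -> Option<Vec<Vec<u8>>> {
        if self.first_at.is_none() {
            self.first_at = Some(now);
        }
        self.buffered_bytes += record.len();
        self.records.push(record);
        if self.buffered_bytes >= self.batch_size {
            Some(self.take())
        } else {
            None
        }
    }

    /// Called periodically; returns the batch if the linger time has expired.
    fn poll(&mut self, now: Instant) -> Option<Vec<Vec<u8>>> {
        match self.first_at {
            Some(first) if now.duration_since(first) >= self.linger => Some(self.take()),
            _ => None,
        }
    }

    /// Drains the buffer and resets the counters.
    fn take(&mut self) -> Vec<Vec<u8>> {
        self.buffered_bytes = 0;
        self.first_at = None;
        std::mem::take(&mut self.records)
    }
}
```

Under this model, a larger `batch_size` favors throughput, while a smaller `linger_ms` bounds the latency added to records that arrive during quiet periods.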

Re-exports§

pub use client::AlterTopicConfigResult;
pub use client::AuthSession;
pub use client::Client;
pub use client::ProducerState;
pub use consumer::Consumer;
pub use consumer::ConsumerAuthConfig;
pub use consumer::ConsumerConfig;
pub use consumer::ConsumerConfigBuilder;
pub use consumer::ConsumerRecord;
pub use consumer::RebalanceListener;
pub use consumer::TopicPartition;
pub use error::Error;
pub use error::Result;
pub use pipeline::PipelineAuthConfig;
pub use pipeline::PipelineConfig;
pub use pipeline::PipelineConfigBuilder;
pub use pipeline::PipelineStatsSnapshot;
pub use pipeline::PipelinedClient;
pub use producer::CompressionType;
pub use producer::Producer;
pub use producer::ProducerAuthConfig;
pub use producer::ProducerConfig;
pub use producer::ProducerConfigBuilder;
pub use producer::ProducerStatsSnapshot;
pub use producer::RecordMetadata;
pub use resilient::ResilientAuthConfig;
pub use resilient::ResilientClient;
pub use resilient::ResilientClientConfig;
pub use resilient::ResilientClientConfigBuilder;

Modules§

client
consumer
High-level consumer API for Rivven
error
pipeline
Request pipelining for high-throughput client operations
producer
High-performance Producer with sticky partitioning, batching, and metadata cache
resilient
Production-grade resilient client with connection pooling, retries, and circuit breaker

Structs§

BrokerInfo
Broker/node information for metadata discovery
DeleteRecordsResult
Result of deleting records from a partition
MessageData
Serialized message data for transport
PartitionMetadata
Partition metadata for cluster discovery
TopicConfigEntry
Topic configuration entry for AlterTopicConfig
TopicMetadata
Topic metadata for cluster discovery

Enums§

Request
Protocol request messages
Response
Protocol response messages
SchemaType
Schema type (format) for schema registry
WireFormat
Wire format identifier

Constants§

MAX_MESSAGE_SIZE
Maximum message size (10 MiB)
PROTOCOL_VERSION
Protocol version for compatibility checking
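
A size limit like MAX_MESSAGE_SIZE is typically checked before a payload is sent. The sketch below uses a local stand-in constant with the documented value of 10 MiB. The `usize` type, the `check_size` helper, and treating the limit as inclusive are assumptions, not confirmed details of the crate.

```rust
/// Local stand-in mirroring the documented 10 MiB limit.
/// The real constant's type is assumed here; check the crate before relying on it.
const MAX_MESSAGE_SIZE: usize = 10 * 1024 * 1024;

/// Hypothetical pre-send check: rejects payloads larger than the limit.
fn check_size(payload: &[u8]) -> Result<(), String> {
    if payload.len() > MAX_MESSAGE_SIZE {
        Err(format!(
            "payload is {} bytes, limit is {} bytes",
            payload.len(),
            MAX_MESSAGE_SIZE
        ))
    } else {
        Ok(())
    }
}
```

In real code, importing `rivven_client::MAX_MESSAGE_SIZE` instead of redefining the value would keep the check in step with the library.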