Skip to main content

Client

Struct Client 

Source
pub struct Client { /* private fields */ }
Expand description

Rivven client for connecting to a Rivven server

Implementations§

Source§

impl Client

Source

pub async fn connect(addr: &str) -> Result<Self>

Connect to a Rivven server (plaintext)

Source

pub async fn authenticate( &mut self, username: &str, password: &str, ) -> Result<AuthSession>

Authenticate with simple username/password

This uses a simple plaintext password protocol. For production use over untrusted networks, prefer authenticate_scram() or use TLS.

Source

pub async fn authenticate_scram( &mut self, username: &str, password: &str, ) -> Result<AuthSession>

Authenticate using SCRAM-SHA-256 (secure challenge-response)

SCRAM-SHA-256 (RFC 5802/7677) provides:

  • Password never sent over the wire
  • Mutual authentication (server proves it knows password too)
  • Protection against replay attacks
§Example
let mut client = Client::connect("127.0.0.1:9092").await?;
let session = client.authenticate_scram("alice", "password123").await?;
println!("Session: {} (expires in {}s)", session.session_id, session.expires_in);
Source

pub async fn publish( &mut self, topic: impl Into<String>, value: impl Into<Bytes>, ) -> Result<u64>

Publish a message to a topic

Source

pub async fn publish_with_key( &mut self, topic: impl Into<String>, key: Option<impl Into<Bytes>>, value: impl Into<Bytes>, ) -> Result<u64>

Publish a message with a key to a topic

Source

pub async fn publish_to_partition( &mut self, topic: impl Into<String>, partition: u32, key: Option<impl Into<Bytes>>, value: impl Into<Bytes>, ) -> Result<u64>

Publish a message to a specific partition

Source

pub async fn consume( &mut self, topic: impl Into<String>, partition: u32, offset: u64, max_messages: usize, ) -> Result<Vec<MessageData>>

Consume messages from a topic partition

Uses read_uncommitted isolation level (default). For transactional consumers that should not see aborted transaction messages, use Self::consume_with_isolation with isolation_level = 1 (read_committed).

Source

pub async fn consume_with_isolation( &mut self, topic: impl Into<String>, partition: u32, offset: u64, max_messages: usize, isolation_level: Option<u8>, ) -> Result<Vec<MessageData>>

Consume messages from a topic partition with specified isolation level

§Arguments
  • topic - Topic name
  • partition - Partition number
  • offset - Starting offset
  • max_messages - Maximum messages to return
  • isolation_level - Transaction isolation level:
    • None or Some(0) = read_uncommitted (default): Returns all messages
    • Some(1) = read_committed: Filters out messages from aborted transactions
§Read Committed Isolation

When using isolation_level = Some(1) (read_committed), the consumer will:

  • Not see messages from transactions that were aborted
  • Not see control records (transaction markers)
  • Only see committed transactional messages

This is essential for exactly-once semantics (EOS) consumers.

Source

pub async fn consume_read_committed( &mut self, topic: impl Into<String>, partition: u32, offset: u64, max_messages: usize, ) -> Result<Vec<MessageData>>

Consume messages with read_committed isolation level

This is a convenience method for transactional consumers that should only see committed messages. Messages from aborted transactions are filtered out.

Equivalent to calling Self::consume_with_isolation with isolation_level = Some(1).

Source

pub async fn create_topic( &mut self, name: impl Into<String>, partitions: Option<u32>, ) -> Result<u32>

Create a new topic

Source

pub async fn list_topics(&mut self) -> Result<Vec<String>>

List all topics

Source

pub async fn delete_topic(&mut self, name: impl Into<String>) -> Result<()>

Delete a topic

Source

pub async fn commit_offset( &mut self, consumer_group: impl Into<String>, topic: impl Into<String>, partition: u32, offset: u64, ) -> Result<()>

Commit consumer offset

Source

pub async fn get_offset( &mut self, consumer_group: impl Into<String>, topic: impl Into<String>, partition: u32, ) -> Result<Option<u64>>

Get consumer offset

Source

pub async fn get_offset_bounds( &mut self, topic: impl Into<String>, partition: u32, ) -> Result<(u64, u64)>

Get earliest and latest offsets for a topic partition

Returns (earliest, latest) where:

  • earliest: First available offset (messages before this are deleted/compacted)
  • latest: Next offset to be assigned (one past the last message)
Source

pub async fn get_metadata( &mut self, topic: impl Into<String>, ) -> Result<(String, u32)>

Get topic metadata

Source

pub async fn ping(&mut self) -> Result<()>

Ping the server

Source

pub async fn register_schema( &self, registry_url: &str, subject: &str, schema_type: &str, schema: &str, ) -> Result<u32>

Register a schema with the schema registry (via HTTP REST API)

The schema registry runs as a separate service (rivven-schema) with a Confluent-compatible REST API. This method performs a minimal HTTP/1.1 POST to {registry_url}/subjects/{subject}/versions without external HTTP deps.

§Arguments
  • registry_url - Schema registry base URL (e.g., http://localhost:8081)
  • subject - Subject name (typically {topic}-key or {topic}-value)
  • schema_type - Schema format: "AVRO", "PROTOBUF", or "JSON"
  • schema - The schema definition string
§Returns

The global schema ID on success.

Source

pub async fn list_groups(&mut self) -> Result<Vec<String>>

List all consumer groups

Source

pub async fn describe_group( &mut self, consumer_group: impl Into<String>, ) -> Result<HashMap<String, HashMap<u32, u64>>>

Describe a consumer group (get all committed offsets)

Source

pub async fn delete_group( &mut self, consumer_group: impl Into<String>, ) -> Result<()>

Delete a consumer group

Source

pub async fn get_offset_for_timestamp( &mut self, topic: impl Into<String>, partition: u32, timestamp_ms: i64, ) -> Result<Option<u64>>

Get the first offset with timestamp >= the given timestamp

§Arguments
  • topic - The topic name
  • partition - The partition number
  • timestamp_ms - Timestamp in milliseconds since Unix epoch
§Returns
  • Some(offset) - The first offset with message timestamp >= timestamp_ms
  • None - No messages found with timestamp >= timestamp_ms
Source

pub async fn describe_topic_configs( &mut self, topics: &[&str], ) -> Result<HashMap<String, HashMap<String, String>>>

Describe topic configurations

Returns the current configuration for the specified topics.

§Arguments
  • topics - Topics to describe (empty slice = all topics)
§Example
let mut client = Client::connect("127.0.0.1:9092").await?;
let configs = client.describe_topic_configs(&["orders", "events"]).await?;
for (topic, config) in configs {
    println!("{}: {:?}", topic, config);
}
Source

pub async fn alter_topic_config( &mut self, topic: impl Into<String>, configs: &[(&str, Option<&str>)], ) -> Result<AlterTopicConfigResult>

Alter topic configuration

Modifies configuration for an existing topic. Pass None as value to reset a configuration key to its default.

§Arguments
  • topic - Topic name
  • configs - Configuration changes: (key, value) pairs. Use None to reset to default.
§Example
let mut client = Client::connect("127.0.0.1:9092").await?;
let result = client.alter_topic_config("orders", &[
    ("retention.ms", Some("86400000")),  // 1 day retention
    ("cleanup.policy", Some("compact")), // Enable compaction
]).await?;
println!("Changed {} configs", result.changed_count);
Source

pub async fn create_partitions( &mut self, topic: impl Into<String>, new_partition_count: u32, ) -> Result<u32>

Create additional partitions for an existing topic

Increases the partition count for a topic. The new partition count must be greater than the current count (you cannot reduce partitions).

§Arguments
  • topic - Topic name
  • new_partition_count - New total partition count
§Example
let mut client = Client::connect("127.0.0.1:9092").await?;
// Increase from 3 to 6 partitions
let new_count = client.create_partitions("orders", 6).await?;
println!("Topic now has {} partitions", new_count);
Source

pub async fn delete_records( &mut self, topic: impl Into<String>, partition_offsets: &[(u32, u64)], ) -> Result<Vec<DeleteRecordsResult>>

Delete records before a given offset (log truncation)

Removes all records with offsets less than the specified offset for each partition. This is useful for freeing up disk space or removing old data.

§Arguments
  • topic - Topic name
  • partition_offsets - List of (partition, before_offset) pairs
§Returns

A list of results indicating the new low watermark for each partition.

§Example
let mut client = Client::connect("127.0.0.1:9092").await?;
// Delete records before offset 1000 on partitions 0, 1, 2
let results = client.delete_records("orders", &[
    (0, 1000),
    (1, 1000),
    (2, 1000),
]).await?;
for r in results {
    println!("Partition {}: low watermark now {}", r.partition, r.low_watermark);
}
Source

pub async fn init_producer_id( &mut self, previous_producer_id: Option<u64>, ) -> Result<ProducerState>

Initialize an idempotent producer

Returns a producer ID and epoch that should be used for all subsequent idempotent publish operations. The broker uses these to detect and deduplicate messages in case of retries.

§Arguments
  • previous_producer_id - If reconnecting, pass the previous producer_id to bump the epoch (prevents zombie producers)
§Returns

ProducerState containing the producer_id and producer_epoch

§Example
let mut client = Client::connect("127.0.0.1:9092").await?;
let producer = client.init_producer_id(None).await?;
println!("Producer ID: {}, Epoch: {}", producer.producer_id, producer.producer_epoch);
Source

pub async fn publish_idempotent( &mut self, topic: impl Into<String>, key: Option<impl Into<Bytes>>, value: impl Into<Bytes>, producer: &mut ProducerState, ) -> Result<(u64, u32, bool)>

Publish a message with idempotent semantics

Uses producer_id/epoch/sequence for exactly-once delivery. The broker deduplicates messages based on these values, making retries safe.

§Arguments
  • topic - Topic to publish to
  • key - Optional message key (used for partitioning)
  • value - Message payload
  • producer - Producer state from init_producer_id
§Returns

Tuple of (offset, partition, was_duplicate)

§Example
let mut client = Client::connect("127.0.0.1:9092").await?;
let mut producer = client.init_producer_id(None).await?;

let (offset, partition, duplicate) = client
    .publish_idempotent("orders", None::<Vec<u8>>, b"order-1".to_vec(), &mut producer)
    .await?;

println!("Published to partition {} at offset {}", partition, offset);
if duplicate {
    println!("(This was a retry - message already existed)");
}
Source

pub async fn begin_transaction( &mut self, txn_id: impl Into<String>, producer: &ProducerState, timeout_ms: Option<u64>, ) -> Result<()>

Begin a new transaction

Starts a transaction that can span multiple topics and partitions. All writes within the transaction are atomic - they either all succeed or all fail together.

§Arguments
  • txn_id - Unique transaction identifier (should be stable per producer)
  • producer - Producer state from init_producer_id
  • timeout_ms - Optional transaction timeout (defaults to 60s)
§Example
let mut client = Client::connect("127.0.0.1:9092").await?;
let producer = client.init_producer_id(None).await?;

// Start a transaction
client.begin_transaction("txn-1", &producer, None).await?;
// ... publish messages ...
client.commit_transaction("txn-1", &producer).await?;
Source

pub async fn add_partitions_to_txn( &mut self, txn_id: impl Into<String>, producer: &ProducerState, partitions: &[(&str, u32)], ) -> Result<usize>

Add partitions to an active transaction

Registers partitions that will be written to within the transaction. This must be called before publishing to a new partition.

§Arguments
  • txn_id - Transaction identifier
  • producer - Producer state
  • partitions - List of (topic, partition) pairs to add
Source

pub async fn publish_transactional( &mut self, txn_id: impl Into<String>, topic: impl Into<String>, key: Option<impl Into<Bytes>>, value: impl Into<Bytes>, producer: &mut ProducerState, ) -> Result<(u64, u32, i32)>

Publish a message within a transaction

Like publish_idempotent, but the message is only visible to consumers after the transaction is committed.

§Arguments
  • txn_id - Transaction identifier
  • topic - Topic to publish to
  • key - Optional message key
  • value - Message payload
  • producer - Producer state with sequence tracking
§Returns

Tuple of (offset, partition, sequence) - offset is pending until commit

Source

pub async fn add_offsets_to_txn( &mut self, txn_id: impl Into<String>, producer: &ProducerState, group_id: impl Into<String>, offsets: &[(&str, u32, i64)], ) -> Result<()>

Add consumer offsets to a transaction

For exactly-once consume-transform-produce patterns: commits consumer offsets atomically with the produced messages.

§Arguments
  • txn_id - Transaction identifier
  • producer - Producer state
  • group_id - Consumer group ID
  • offsets - List of (topic, partition, offset) to commit
Source

pub async fn commit_transaction( &mut self, txn_id: impl Into<String>, producer: &ProducerState, ) -> Result<()>

Commit a transaction

Makes all writes in the transaction visible to consumers atomically. If this fails, the transaction should be aborted.

§Arguments
  • txn_id - Transaction identifier
  • producer - Producer state
Source

pub async fn abort_transaction( &mut self, txn_id: impl Into<String>, producer: &ProducerState, ) -> Result<()>

Abort a transaction

Discards all writes in the transaction. Call this if any write fails or if you need to cancel the transaction.

§Arguments
  • txn_id - Transaction identifier
  • producer - Producer state

Auto Trait Implementations§

§

impl !Freeze for Client

§

impl RefUnwindSafe for Client

§

impl Send for Client

§

impl Sync for Client

§

impl Unpin for Client

§

impl UnwindSafe for Client

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

Source§

fn vzip(self) -> V

Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more