pub struct Client { /* private fields */ }Expand description
Rivven client for connecting to a Rivven server
Implementations§
Source§impl Client
impl Client
Sourcepub async fn authenticate(
&mut self,
username: &str,
password: &str,
) -> Result<AuthSession>
pub async fn authenticate( &mut self, username: &str, password: &str, ) -> Result<AuthSession>
Authenticate with simple username/password
This uses a simple plaintext password protocol. For production use over
untrusted networks, prefer authenticate_scram() or use TLS.
Sourcepub async fn authenticate_scram(
&mut self,
username: &str,
password: &str,
) -> Result<AuthSession>
pub async fn authenticate_scram( &mut self, username: &str, password: &str, ) -> Result<AuthSession>
Authenticate using SCRAM-SHA-256 (secure challenge-response)
SCRAM-SHA-256 (RFC 5802/7677) provides:
- Password never sent over the wire
- Mutual authentication (server proves it knows password too)
- Protection against replay attacks
§Example
let mut client = Client::connect("127.0.0.1:9092").await?;
let session = client.authenticate_scram("alice", "password123").await?;
println!("Session: {} (expires in {}s)", session.session_id, session.expires_in);Sourcepub async fn publish(
&mut self,
topic: impl Into<String>,
value: impl Into<Bytes>,
) -> Result<u64>
pub async fn publish( &mut self, topic: impl Into<String>, value: impl Into<Bytes>, ) -> Result<u64>
Publish a message to a topic
Sourcepub async fn publish_with_key(
&mut self,
topic: impl Into<String>,
key: Option<impl Into<Bytes>>,
value: impl Into<Bytes>,
) -> Result<u64>
pub async fn publish_with_key( &mut self, topic: impl Into<String>, key: Option<impl Into<Bytes>>, value: impl Into<Bytes>, ) -> Result<u64>
Publish a message with a key to a topic
Sourcepub async fn publish_to_partition(
&mut self,
topic: impl Into<String>,
partition: u32,
key: Option<impl Into<Bytes>>,
value: impl Into<Bytes>,
) -> Result<u64>
pub async fn publish_to_partition( &mut self, topic: impl Into<String>, partition: u32, key: Option<impl Into<Bytes>>, value: impl Into<Bytes>, ) -> Result<u64>
Publish a message to a specific partition
Sourcepub async fn consume(
&mut self,
topic: impl Into<String>,
partition: u32,
offset: u64,
max_messages: usize,
) -> Result<Vec<MessageData>>
pub async fn consume( &mut self, topic: impl Into<String>, partition: u32, offset: u64, max_messages: usize, ) -> Result<Vec<MessageData>>
Consume messages from a topic partition
Uses read_uncommitted isolation level (default).
For transactional consumers that should not see aborted transaction messages,
use Self::consume_with_isolation with isolation_level = 1 (read_committed).
Sourcepub async fn consume_with_isolation(
&mut self,
topic: impl Into<String>,
partition: u32,
offset: u64,
max_messages: usize,
isolation_level: Option<u8>,
) -> Result<Vec<MessageData>>
pub async fn consume_with_isolation( &mut self, topic: impl Into<String>, partition: u32, offset: u64, max_messages: usize, isolation_level: Option<u8>, ) -> Result<Vec<MessageData>>
Consume messages from a topic partition with specified isolation level
§Arguments
topic- Topic namepartition- Partition numberoffset- Starting offsetmax_messages- Maximum messages to returnisolation_level- Transaction isolation level:NoneorSome(0)= read_uncommitted (default): Returns all messagesSome(1)= read_committed: Filters out messages from aborted transactions
§Read Committed Isolation
When using isolation_level = Some(1) (read_committed), the consumer will:
- Not see messages from transactions that were aborted
- Not see control records (transaction markers)
- Only see committed transactional messages
This is essential for exactly-once semantics (EOS) consumers.
Sourcepub async fn consume_read_committed(
&mut self,
topic: impl Into<String>,
partition: u32,
offset: u64,
max_messages: usize,
) -> Result<Vec<MessageData>>
pub async fn consume_read_committed( &mut self, topic: impl Into<String>, partition: u32, offset: u64, max_messages: usize, ) -> Result<Vec<MessageData>>
Consume messages with read_committed isolation level
This is a convenience method for transactional consumers that should only see committed messages. Messages from aborted transactions are filtered out.
Equivalent to calling Self::consume_with_isolation with isolation_level = Some(1).
Sourcepub async fn create_topic(
&mut self,
name: impl Into<String>,
partitions: Option<u32>,
) -> Result<u32>
pub async fn create_topic( &mut self, name: impl Into<String>, partitions: Option<u32>, ) -> Result<u32>
Create a new topic
Sourcepub async fn list_topics(&mut self) -> Result<Vec<String>>
pub async fn list_topics(&mut self) -> Result<Vec<String>>
List all topics
Sourcepub async fn commit_offset(
&mut self,
consumer_group: impl Into<String>,
topic: impl Into<String>,
partition: u32,
offset: u64,
) -> Result<()>
pub async fn commit_offset( &mut self, consumer_group: impl Into<String>, topic: impl Into<String>, partition: u32, offset: u64, ) -> Result<()>
Commit consumer offset
Sourcepub async fn get_offset(
&mut self,
consumer_group: impl Into<String>,
topic: impl Into<String>,
partition: u32,
) -> Result<Option<u64>>
pub async fn get_offset( &mut self, consumer_group: impl Into<String>, topic: impl Into<String>, partition: u32, ) -> Result<Option<u64>>
Get consumer offset
Sourcepub async fn get_offset_bounds(
&mut self,
topic: impl Into<String>,
partition: u32,
) -> Result<(u64, u64)>
pub async fn get_offset_bounds( &mut self, topic: impl Into<String>, partition: u32, ) -> Result<(u64, u64)>
Get earliest and latest offsets for a topic partition
Returns (earliest, latest) where:
- earliest: First available offset (messages before this are deleted/compacted)
- latest: Next offset to be assigned (one past the last message)
Sourcepub async fn get_metadata(
&mut self,
topic: impl Into<String>,
) -> Result<(String, u32)>
pub async fn get_metadata( &mut self, topic: impl Into<String>, ) -> Result<(String, u32)>
Get topic metadata
Sourcepub async fn register_schema(
&self,
registry_url: &str,
subject: &str,
schema_type: &str,
schema: &str,
) -> Result<u32>
pub async fn register_schema( &self, registry_url: &str, subject: &str, schema_type: &str, schema: &str, ) -> Result<u32>
Register a schema with the schema registry (via HTTP REST API)
The schema registry runs as a separate service (rivven-schema) with a
Confluent-compatible REST API. This method performs a minimal HTTP/1.1 POST
to {registry_url}/subjects/{subject}/versions without external HTTP deps.
§Arguments
registry_url- Schema registry base URL (e.g.,http://localhost:8081)subject- Subject name (typically{topic}-keyor{topic}-value)schema_type- Schema format:"AVRO","PROTOBUF", or"JSON"schema- The schema definition string
§Returns
The global schema ID on success.
Sourcepub async fn list_groups(&mut self) -> Result<Vec<String>>
pub async fn list_groups(&mut self) -> Result<Vec<String>>
List all consumer groups
Sourcepub async fn describe_group(
&mut self,
consumer_group: impl Into<String>,
) -> Result<HashMap<String, HashMap<u32, u64>>>
pub async fn describe_group( &mut self, consumer_group: impl Into<String>, ) -> Result<HashMap<String, HashMap<u32, u64>>>
Describe a consumer group (get all committed offsets)
Sourcepub async fn delete_group(
&mut self,
consumer_group: impl Into<String>,
) -> Result<()>
pub async fn delete_group( &mut self, consumer_group: impl Into<String>, ) -> Result<()>
Delete a consumer group
Sourcepub async fn get_offset_for_timestamp(
&mut self,
topic: impl Into<String>,
partition: u32,
timestamp_ms: i64,
) -> Result<Option<u64>>
pub async fn get_offset_for_timestamp( &mut self, topic: impl Into<String>, partition: u32, timestamp_ms: i64, ) -> Result<Option<u64>>
Get the first offset with timestamp >= the given timestamp
§Arguments
topic- The topic namepartition- The partition numbertimestamp_ms- Timestamp in milliseconds since Unix epoch
§Returns
Some(offset)- The first offset with message timestamp >= timestamp_msNone- No messages found with timestamp >= timestamp_ms
Sourcepub async fn describe_topic_configs(
&mut self,
topics: &[&str],
) -> Result<HashMap<String, HashMap<String, String>>>
pub async fn describe_topic_configs( &mut self, topics: &[&str], ) -> Result<HashMap<String, HashMap<String, String>>>
Describe topic configurations
Returns the current configuration for the specified topics.
§Arguments
topics- Topics to describe (empty slice = all topics)
§Example
let mut client = Client::connect("127.0.0.1:9092").await?;
let configs = client.describe_topic_configs(&["orders", "events"]).await?;
for (topic, config) in configs {
println!("{}: {:?}", topic, config);
}Sourcepub async fn alter_topic_config(
&mut self,
topic: impl Into<String>,
configs: &[(&str, Option<&str>)],
) -> Result<AlterTopicConfigResult>
pub async fn alter_topic_config( &mut self, topic: impl Into<String>, configs: &[(&str, Option<&str>)], ) -> Result<AlterTopicConfigResult>
Alter topic configuration
Modifies configuration for an existing topic. Pass None as value to reset
a configuration key to its default.
§Arguments
topic- Topic nameconfigs- Configuration changes: (key, value) pairs. UseNoneto reset to default.
§Example
let mut client = Client::connect("127.0.0.1:9092").await?;
let result = client.alter_topic_config("orders", &[
("retention.ms", Some("86400000")), // 1 day retention
("cleanup.policy", Some("compact")), // Enable compaction
]).await?;
println!("Changed {} configs", result.changed_count);Sourcepub async fn create_partitions(
&mut self,
topic: impl Into<String>,
new_partition_count: u32,
) -> Result<u32>
pub async fn create_partitions( &mut self, topic: impl Into<String>, new_partition_count: u32, ) -> Result<u32>
Create additional partitions for an existing topic
Increases the partition count for a topic. The new partition count must be greater than the current count (you cannot reduce partitions).
§Arguments
topic- Topic namenew_partition_count- New total partition count
§Example
let mut client = Client::connect("127.0.0.1:9092").await?;
// Increase from 3 to 6 partitions
let new_count = client.create_partitions("orders", 6).await?;
println!("Topic now has {} partitions", new_count);Sourcepub async fn delete_records(
&mut self,
topic: impl Into<String>,
partition_offsets: &[(u32, u64)],
) -> Result<Vec<DeleteRecordsResult>>
pub async fn delete_records( &mut self, topic: impl Into<String>, partition_offsets: &[(u32, u64)], ) -> Result<Vec<DeleteRecordsResult>>
Delete records before a given offset (log truncation)
Removes all records with offsets less than the specified offset for each partition. This is useful for freeing up disk space or removing old data.
§Arguments
topic- Topic namepartition_offsets- List of (partition, before_offset) pairs
§Returns
A list of results indicating the new low watermark for each partition.
§Example
let mut client = Client::connect("127.0.0.1:9092").await?;
// Delete records before offset 1000 on partitions 0, 1, 2
let results = client.delete_records("orders", &[
(0, 1000),
(1, 1000),
(2, 1000),
]).await?;
for r in results {
println!("Partition {}: low watermark now {}", r.partition, r.low_watermark);
}Sourcepub async fn init_producer_id(
&mut self,
previous_producer_id: Option<u64>,
) -> Result<ProducerState>
pub async fn init_producer_id( &mut self, previous_producer_id: Option<u64>, ) -> Result<ProducerState>
Initialize an idempotent producer
Returns a producer ID and epoch that should be used for all subsequent idempotent publish operations. The broker uses these to detect and deduplicate messages in case of retries.
§Arguments
previous_producer_id- If reconnecting, pass the previous producer_id to bump the epoch (prevents zombie producers)
§Returns
ProducerState containing the producer_id and producer_epoch
§Example
let mut client = Client::connect("127.0.0.1:9092").await?;
let producer = client.init_producer_id(None).await?;
println!("Producer ID: {}, Epoch: {}", producer.producer_id, producer.producer_epoch);Sourcepub async fn publish_idempotent(
&mut self,
topic: impl Into<String>,
key: Option<impl Into<Bytes>>,
value: impl Into<Bytes>,
producer: &mut ProducerState,
) -> Result<(u64, u32, bool)>
pub async fn publish_idempotent( &mut self, topic: impl Into<String>, key: Option<impl Into<Bytes>>, value: impl Into<Bytes>, producer: &mut ProducerState, ) -> Result<(u64, u32, bool)>
Publish a message with idempotent semantics
Uses producer_id/epoch/sequence for exactly-once delivery. The broker deduplicates messages based on these values, making retries safe.
§Arguments
topic- Topic to publish tokey- Optional message key (used for partitioning)value- Message payloadproducer- Producer state frominit_producer_id
§Returns
Tuple of (offset, partition, was_duplicate)
§Example
let mut client = Client::connect("127.0.0.1:9092").await?;
let mut producer = client.init_producer_id(None).await?;
let (offset, partition, duplicate) = client
.publish_idempotent("orders", None::<Vec<u8>>, b"order-1".to_vec(), &mut producer)
.await?;
println!("Published to partition {} at offset {}", partition, offset);
if duplicate {
println!("(This was a retry - message already existed)");
}Sourcepub async fn begin_transaction(
&mut self,
txn_id: impl Into<String>,
producer: &ProducerState,
timeout_ms: Option<u64>,
) -> Result<()>
pub async fn begin_transaction( &mut self, txn_id: impl Into<String>, producer: &ProducerState, timeout_ms: Option<u64>, ) -> Result<()>
Begin a new transaction
Starts a transaction that can span multiple topics and partitions. All writes within the transaction are atomic - they either all succeed or all fail together.
§Arguments
txn_id- Unique transaction identifier (should be stable per producer)producer- Producer state frominit_producer_idtimeout_ms- Optional transaction timeout (defaults to 60s)
§Example
let mut client = Client::connect("127.0.0.1:9092").await?;
let producer = client.init_producer_id(None).await?;
// Start a transaction
client.begin_transaction("txn-1", &producer, None).await?;
// ... publish messages ...
client.commit_transaction("txn-1", &producer).await?;Sourcepub async fn add_partitions_to_txn(
&mut self,
txn_id: impl Into<String>,
producer: &ProducerState,
partitions: &[(&str, u32)],
) -> Result<usize>
pub async fn add_partitions_to_txn( &mut self, txn_id: impl Into<String>, producer: &ProducerState, partitions: &[(&str, u32)], ) -> Result<usize>
Add partitions to an active transaction
Registers partitions that will be written to within the transaction. This must be called before publishing to a new partition.
§Arguments
txn_id- Transaction identifierproducer- Producer statepartitions- List of (topic, partition) pairs to add
Sourcepub async fn publish_transactional(
&mut self,
txn_id: impl Into<String>,
topic: impl Into<String>,
key: Option<impl Into<Bytes>>,
value: impl Into<Bytes>,
producer: &mut ProducerState,
) -> Result<(u64, u32, i32)>
pub async fn publish_transactional( &mut self, txn_id: impl Into<String>, topic: impl Into<String>, key: Option<impl Into<Bytes>>, value: impl Into<Bytes>, producer: &mut ProducerState, ) -> Result<(u64, u32, i32)>
Publish a message within a transaction
Like publish_idempotent, but the message is only visible to consumers
after the transaction is committed.
§Arguments
txn_id- Transaction identifiertopic- Topic to publish tokey- Optional message keyvalue- Message payloadproducer- Producer state with sequence tracking
§Returns
Tuple of (offset, partition, sequence) - offset is pending until commit
Sourcepub async fn add_offsets_to_txn(
&mut self,
txn_id: impl Into<String>,
producer: &ProducerState,
group_id: impl Into<String>,
offsets: &[(&str, u32, i64)],
) -> Result<()>
pub async fn add_offsets_to_txn( &mut self, txn_id: impl Into<String>, producer: &ProducerState, group_id: impl Into<String>, offsets: &[(&str, u32, i64)], ) -> Result<()>
Add consumer offsets to a transaction
For exactly-once consume-transform-produce patterns: commits consumer offsets atomically with the produced messages.
§Arguments
txn_id- Transaction identifierproducer- Producer stategroup_id- Consumer group IDoffsets- List of (topic, partition, offset) to commit
Sourcepub async fn commit_transaction(
&mut self,
txn_id: impl Into<String>,
producer: &ProducerState,
) -> Result<()>
pub async fn commit_transaction( &mut self, txn_id: impl Into<String>, producer: &ProducerState, ) -> Result<()>
Commit a transaction
Makes all writes in the transaction visible to consumers atomically. If this fails, the transaction should be aborted.
§Arguments
txn_id- Transaction identifierproducer- Producer state
Sourcepub async fn abort_transaction(
&mut self,
txn_id: impl Into<String>,
producer: &ProducerState,
) -> Result<()>
pub async fn abort_transaction( &mut self, txn_id: impl Into<String>, producer: &ProducerState, ) -> Result<()>
Abort a transaction
Discards all writes in the transaction. Call this if any write fails or if you need to cancel the transaction.
§Arguments
txn_id- Transaction identifierproducer- Producer state