Crate crabka_client_consumer

Subscribe-style consumer client for Apache Kafka in Rust.

Builds on crabka-client-core for transport; adds the classic consumer-group lifecycle (JoinGroup → SyncGroup → Heartbeat → Fetch → OffsetCommit → LeaveGroup) and a built-in heartbeat task.
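
The request order named above can be sketched as a small state machine. This is an illustrative model only: the `Phase` enum and `lifecycle` function are hypothetical and are not types exported by this crate. It also flattens the protocol into a single linear pass, whereas a running consumer would typically repeat Fetch and OffsetCommit many times while heartbeats are sent in the background.

```rust
/// Hypothetical model of the classic group request sequence; not a crate type.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Phase {
    JoinGroup,
    SyncGroup,
    Heartbeat,
    Fetch,
    OffsetCommit,
    LeaveGroup,
}

impl Phase {
    /// Next request in the simplified linear order, or `None` after `LeaveGroup`.
    fn next(self) -> Option<Phase> {
        match self {
            Phase::JoinGroup => Some(Phase::SyncGroup),
            Phase::SyncGroup => Some(Phase::Heartbeat),
            Phase::Heartbeat => Some(Phase::Fetch),
            Phase::Fetch => Some(Phase::OffsetCommit),
            Phase::OffsetCommit => Some(Phase::LeaveGroup),
            Phase::LeaveGroup => None,
        }
    }
}

/// Walk the simplified lifecycle from join to leave.
fn lifecycle() -> Vec<Phase> {
    let mut out = Vec::new();
    let mut current = Some(Phase::JoinGroup);
    while let Some(phase) = current {
        out.push(phase);
        current = phase.next();
    }
    out
}

fn main() {
    for phase in lifecycle() {
        println!("{:?}", phase);
    }
}
```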

§Quick start

use crabka_client_consumer::{AutoOffsetReset, Consumer};
use std::time::Duration;

let mut consumer = Consumer::builder()
    .bootstrap("localhost:9092")
    .group_id("my-group")
    .client_id("my-app")
    .auto_offset_reset(AutoOffsetReset::Earliest)
    .subscribe(["my-topic".to_string()])
    .build()
    .await?;

loop {
    let records = consumer.poll(Duration::from_millis(500)).await?;
    for _r in records {
        // ... handle _r ...
    }
    consumer.commit_sync().await?;
}
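
Committing after the batch has been handled, as the loop above does, generally yields at-least-once delivery: records that were polled but not yet committed are read again after a restart. The sketch below illustrates that behavior with an in-memory stand-in. `MockPartition` and `demo` are hypothetical names invented for this example and do not model this crate's internals or a real broker.

```rust
/// Hypothetical in-memory stand-in for one partition; not a crate type.
struct MockPartition {
    log: Vec<&'static str>,
    committed: usize, // offset a restarted consumer resumes from
    position: usize,  // next offset the running consumer will fetch
}

impl MockPartition {
    fn new(log: Vec<&'static str>) -> Self {
        Self { log, committed: 0, position: 0 }
    }

    /// Return up to `max` records from the current position and advance it.
    fn poll(&mut self, max: usize) -> Vec<&'static str> {
        let end = (self.position + max).min(self.log.len());
        let batch = self.log[self.position..end].to_vec();
        self.position = end;
        batch
    }

    /// Persist the current position as the committed offset.
    fn commit(&mut self) {
        self.committed = self.position;
    }

    /// Simulate a crash and restart: resume from the last committed offset.
    fn restart(&mut self) {
        self.position = self.committed;
    }
}

/// Poll two batches, commit only the first, then restart and poll again.
fn demo() -> (Vec<&'static str>, Vec<&'static str>, Vec<&'static str>) {
    let mut partition = MockPartition::new(vec!["a", "b", "c", "d"]);
    let first = partition.poll(2);
    partition.commit();
    let second = partition.poll(2); // handled, but never committed
    partition.restart();
    let redelivered = partition.poll(2);
    (first, second, redelivered)
}

fn main() {
    let (first, second, redelivered) = demo();
    println!("first={:?} second={:?} redelivered={:?}", first, second, redelivered);
}
```

Because the second batch is delivered twice in this model, handlers in a loop of this shape are usually written to tolerate duplicates.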

§Share-group consumption

use crabka_client_consumer::{ShareAckMode, ShareAckType, ShareConsumer};
use std::time::Duration;

let mut consumer = ShareConsumer::builder()
    .bootstrap("localhost:9092")
    .group_id("share-workers")
    .subscribe(["jobs".to_string()])
    .ack_mode(ShareAckMode::Explicit)
    .build()
    .await?;

let records = consumer.poll(Duration::from_secs(1)).await?;
for record in &records {
    consumer.acknowledge(record, ShareAckType::Accept)?;
}
consumer.commit().await?;
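
For context on what an acknowledgement carries, the sketch below models the three dispositions that KIP-932 defines for a delivered record, together with the wire codes from that KIP (1 for Accept, 2 for Release, 3 for Reject). `AckDisposition` is a hypothetical stand-in written for this example. The source only shows `ShareAckType::Accept`, so the other variant names and the helper methods here are assumptions and may not match this crate's `ShareAckType`.

```rust
/// Hypothetical stand-in for an acknowledgement disposition;
/// not this crate's `ShareAckType`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AckDisposition {
    /// The record was processed successfully.
    Accept,
    /// The record was not processed and may be delivered again.
    Release,
    /// The record cannot be processed and should not be delivered again.
    Reject,
}

impl AckDisposition {
    /// Wire code as listed in KIP-932 (code 0 marks a gap and is not modeled here).
    fn wire_code(self) -> i8 {
        match self {
            AckDisposition::Accept => 1,
            AckDisposition::Release => 2,
            AckDisposition::Reject => 3,
        }
    }

    /// Decode a wire code; returns `None` for codes this sketch does not model.
    fn from_wire(code: i8) -> Option<Self> {
        match code {
            1 => Some(AckDisposition::Accept),
            2 => Some(AckDisposition::Release),
            3 => Some(AckDisposition::Reject),
            _ => None,
        }
    }

    /// Whether the record becomes eligible for another delivery attempt.
    fn may_redeliver(self) -> bool {
        matches!(self, AckDisposition::Release)
    }
}

fn main() {
    for d in [AckDisposition::Accept, AckDisposition::Release, AckDisposition::Reject] {
        println!("{:?}: code={} redeliver={}", d, d.wire_code(), d.may_redeliver());
    }
}
```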

§Capabilities and boundaries

This crate owns consumer-facing semantics: classic group membership, assignment, fetch/poll, offset commit, cooperative shutdown, and KIP-932 share-group consumption. It intentionally does not duplicate admin-client surfaces such as DescribeGroups/ListGroups, and manual partition fetches remain available through the lower-level helpers in crabka-client-core. Transactional consume-process-produce workflows use this crate’s ConsumerGroupMetadata together with crabka-client-producer’s send_offsets_to_transaction support.

§Cargo features

None for now.

§Structs

Consumer
Subscribe-style consumer handle. Construct via Consumer::builder.
ConsumerGroupMetadata
The identity a consumer presents to a transactional producer for KIP-447 offset-commit fencing. Mirrors the JVM’s org.apache.kafka.clients.consumer.ConsumerGroupMetadata.
ConsumerRecord
One record returned by Consumer::poll.
ShareConsumer
A share-group consumer. Construct via ShareConsumer::builder.
ShareConsumerRecord
One record delivered by ShareConsumer::poll.

§Enums

Assignor
AutoOffsetReset
What to do when a partition has no committed offset.
ConsumerError
Errors returned by Consumer.
IsolationLevel
Controls which records are visible to this consumer.
ShareAckMode
How acquired records are acknowledged.
ShareAckType
The disposition of an acknowledged record (KIP-932 wire codes).