crabka-client-consumer 0.3.0

Subscribe-style consumer client for Apache Kafka in Rust

Builds on crabka-client-core for transport; adds the classic consumer-group lifecycle (JoinGroup → SyncGroup → Heartbeat → Fetch → OffsetCommit → LeaveGroup) and a built-in heartbeat task.
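
As a rough illustration of that lifecycle, the sketch below models a group member as a small state machine. It does not use this crate: `MemberState`, `Event`, `step` and `can_fetch` are hypothetical names invented for the example, and the transitions are a simplification of the classic Kafka group protocol, not a description of how the crate is implemented.

```rust
/// Simplified member states for the classic group protocol.
/// Hypothetical names for illustration; not types exported by this crate.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MemberState {
    /// Not in the group yet; the next request is JoinGroup.
    Unjoined,
    /// JoinGroup succeeded; the next request is SyncGroup.
    Joined,
    /// Assignment received; Heartbeat, Fetch and OffsetCommit are allowed.
    Stable,
    /// LeaveGroup sent; the member is done.
    Left,
}

/// Events that move a member between states (also hypothetical).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Event {
    JoinGroupOk,
    SyncGroupOk,
    /// A Heartbeat response reported that the group is rebalancing.
    RebalanceInProgress,
    LeaveGroupSent,
}

/// Returns the next state, or `None` if the event is not valid in `state`.
pub fn step(state: MemberState, event: Event) -> Option<MemberState> {
    use Event::*;
    use MemberState::*;
    match (state, event) {
        (Unjoined, JoinGroupOk) => Some(Joined),
        (Joined, SyncGroupOk) => Some(Stable),
        // A rebalance sends the member back to JoinGroup.
        (Joined, RebalanceInProgress) | (Stable, RebalanceInProgress) => Some(Unjoined),
        (Stable, LeaveGroupSent) => Some(Left),
        _ => None,
    }
}

/// Fetch and OffsetCommit only make sense once the member holds an assignment.
pub fn can_fetch(state: MemberState) -> bool {
    state == MemberState::Stable
}
```

In this model a member only fetches and commits while `Stable`, and a rebalance signalled through a heartbeat response forces it back through JoinGroup and SyncGroup.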

Quick start

use std::time::Duration;
use crabka_client_consumer::{Consumer, AutoOffsetReset};

# async fn run() -> Result<(), Box<dyn std::error::Error>> {
let mut consumer = Consumer::builder()
    .bootstrap("localhost:9092")
    .group_id("my-group")
    .client_id("my-app")
    .auto_offset_reset(AutoOffsetReset::Earliest)
    .subscribe(["my-topic".to_string()])
    .build()
    .await?;

loop {
    let records = consumer.poll(Duration::from_millis(500)).await?;
    for _r in records {
        // ... handle _r ...
    }
    consumer.commit_sync().await?;
}
# }
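
The loop above commits only after a batch has been handled. Assuming `commit_sync()` commits the positions reached by the preceding `poll` (an assumption, not something stated here), that ordering gives at-least-once delivery: a crash between handling and committing causes the same records to be delivered again. The sketch below simulates this with an in-memory stand-in; `FakePartition` and `handle_batch` are hypothetical and do not touch the crate's API.

```rust
/// In-memory stand-in for one partition and its committed offset.
/// Hypothetical type for illustration; it does not use the crate's API.
pub struct FakePartition {
    log: Vec<&'static str>,
    committed: usize,
}

impl FakePartition {
    pub fn new(log: Vec<&'static str>) -> Self {
        Self { log, committed: 0 }
    }

    /// Returns up to `max` records starting at the committed offset,
    /// which is where a restarted consumer would resume.
    pub fn poll(&self, max: usize) -> Vec<&'static str> {
        self.log.iter().skip(self.committed).take(max).copied().collect()
    }

    /// Marks `n` more records as committed.
    pub fn commit(&mut self, n: usize) {
        self.committed = (self.committed + n).min(self.log.len());
    }
}

/// Handles one batch, then commits. If `crash_before_commit` is set, the
/// commit is skipped, as if the process died after handling the records.
pub fn handle_batch(
    partition: &mut FakePartition,
    max: usize,
    crash_before_commit: bool,
    seen: &mut Vec<&'static str>,
) {
    let batch = partition.poll(max);
    seen.extend(batch.iter().copied());
    if !crash_before_commit {
        partition.commit(batch.len());
    }
}
```

Running `handle_batch` once with `crash_before_commit = true` and then again with `false` records the first batch twice in `seen`, which is why handlers in a commit-after-processing loop are usually written to tolerate duplicates.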

Share-group consumption

use std::time::Duration;
use crabka_client_consumer::{ShareAckMode, ShareAckType, ShareConsumer};

# async fn run() -> Result<(), Box<dyn std::error::Error>> {
let mut consumer = ShareConsumer::builder()
    .bootstrap("localhost:9092")
    .group_id("share-workers")
    .subscribe(["jobs".to_string()])
    .ack_mode(ShareAckMode::Explicit)
    .build()
    .await?;

let records = consumer.poll(Duration::from_secs(1)).await?;
for record in &records {
    consumer.acknowledge(record, ShareAckType::Accept)?;
}
consumer.commit().await?;
# Ok(())
# }
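
The example above accepts every record. With explicit acknowledgement, an application typically chooses the acknowledgement from the processing outcome. The sketch below shows one possible mapping using the three acknowledgement kinds that KIP-932 defines (accept, release, reject). `AckKind`, `JobError` and `ack_for` are local, hypothetical names; only `ShareAckType::Accept` appears in this README, so whether the crate exposes the other two variants is an assumption to verify against its API docs.

```rust
/// Acknowledgement kinds defined by KIP-932 for share groups.
/// Local stand-in; the crate's `ShareAckType` may expose a different set.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AckKind {
    /// The record was processed successfully.
    Accept,
    /// The record was not processed; make it available for another delivery.
    Release,
    /// The record cannot be processed; do not deliver it again.
    Reject,
}

/// Hypothetical application-level error classification.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JobError {
    /// Worth retrying, e.g. a downstream timeout.
    Transient,
    /// Retrying cannot help, e.g. a malformed payload.
    Permanent,
}

/// Maps a processing outcome to the acknowledgement to send.
pub fn ack_for(outcome: Result<(), JobError>) -> AckKind {
    match outcome {
        Ok(()) => AckKind::Accept,
        Err(JobError::Transient) => AckKind::Release,
        Err(JobError::Permanent) => AckKind::Reject,
    }
}
```

Keeping this decision in a pure function makes the retry policy easy to unit-test separately from the polling loop.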

Current boundaries

This crate does not currently provide:

  • assign() (manual partition consumption): use crabka-client-core directly.
  • Admin RPCs (DescribeGroups, ListGroups).
  • KIP-848 broker-side migration (not exposed through this client API).
  • Full EOS (exactly-once semantics) transactional consumer guarantees.

Cargo features

None for now.