# Consumer
`KafkaConsumer` is an async consumer for the modern consumer group protocol of
Kafka 4.0+. It supports topic subscriptions, regex subscriptions, manual
assignment, polling, commits, seek, pause/resume, and offset lookup.

Classic consumer group membership is not implemented.
## Subscribe And Poll
```rust,no_run
use kafkit_client::{AutoOffsetReset, KafkaClient};

#[tokio::main]
async fn main() -> kafkit_client::Result<()> {
    let consumer = KafkaClient::new("localhost:9092")
        .topic("orders")
        .consumer("orders-reader")
        // Start from the earliest offset when the group has no committed offset.
        .with_auto_offset_reset(AutoOffsetReset::Earliest)
        .with_instance_id("orders-reader-1")
        .connect()
        .await?;

    let records = consumer.poll().await?;
    for record in records.iter() {
        println!(
            "{}:{}@{}",
            record.topic,
            record.partition,
            record.offset
        );
    }

    // Commit only after the batch has been processed.
    consumer.commit(&records).await?;
    consumer.shutdown().await?;
    Ok(())
}
```
## Subscription Modes
Use the builder for one or more known topics:
```rust,no_run
use kafkit_client::KafkaClient;

#[tokio::main]
async fn main() -> kafkit_client::Result<()> {
    let consumer = KafkaClient::new("localhost:9092")
        .consumer("orders-reader")
        .with_topics(["orders", "payments"])
        .connect()
        .await?;

    consumer.shutdown().await?;
    Ok(())
}
```
Or call subscription methods after connecting:
```rust,no_run
use kafkit_client::{ConsumerConfig, KafkaConsumer};

#[tokio::main]
async fn main() -> kafkit_client::Result<()> {
    let consumer =
        KafkaConsumer::connect(ConsumerConfig::new("localhost:9092", "orders-reader")).await?;

    // Subscribe to an explicit list of topics.
    consumer.subscribe(vec!["orders".to_owned()]).await?;
    // Subscribe to every topic whose name matches a pattern.
    consumer.subscribe_regex("^orders-.*".to_owned()).await?;
    // Drop the current subscription.
    consumer.unsubscribe().await?;

    consumer.shutdown().await?;
    Ok(())
}
```
## Manual Assignment
Manual assignment gives direct control over which partitions are read and
bypasses the group's subscription-based assignment.
```rust,no_run
use kafkit_client::{ConsumerConfig, KafkaConsumer, TopicPartition};

#[tokio::main]
async fn main() -> kafkit_client::Result<()> {
    let consumer =
        KafkaConsumer::connect(ConsumerConfig::new("localhost:9092", "orders-reader")).await?;

    let partition = TopicPartition::new("orders", 0);
    // Take partition 0 of "orders" directly, without a subscription.
    consumer.assign(vec![partition.clone()]).await?;
    // Move the read position to offset 0, then read it back.
    consumer.seek(partition.clone(), 0).await?;
    let position = consumer.position(partition).await?;
    println!("position: {position}");

    consumer.shutdown().await?;
    Ok(())
}
```
## Commits And Offsets
- `commit(&records)` commits offsets derived from a returned `ConsumerRecords`
batch.
- `commit_offsets(...)` commits explicit offsets.
- `committed(...)` fetches committed offsets for partitions.
- `beginning_offsets(...)` and `end_offsets(...)` query log boundaries.
- `offsets_for_times(...)` looks up offsets by timestamp.

Commit records only after processing them, so that a crash before the commit
leads to redelivery rather than skipped records. For read-process-write flows
that require atomic output and offset commits, use a transactional producer and
`send_offsets_to_transaction`.
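
To make "offsets derived from a batch" concrete, here is a minimal,
library-independent sketch of the usual Kafka convention: the committed offset
for a partition is the offset of the next record to read, that is, the highest
processed offset plus one. The `Record` struct and `offsets_to_commit` function
are hypothetical stand-ins for illustration, not part of `kafkit_client`, and
the sketch assumes rather than confirms that `commit(&records)` follows the same
convention.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for a consumed record; not the crate's record type.
#[derive(Debug, Clone)]
struct Record {
    topic: String,
    partition: i32,
    offset: i64,
}

/// For each (topic, partition), returns the offset to commit:
/// the highest processed offset plus one, i.e. the next record to read.
fn offsets_to_commit(processed: &[Record]) -> BTreeMap<(String, i32), i64> {
    let mut next = BTreeMap::new();
    for record in processed {
        let candidate = record.offset + 1;
        let entry = next
            .entry((record.topic.clone(), record.partition))
            .or_insert(candidate);
        // Keep the largest value if records arrive out of order.
        if candidate > *entry {
            *entry = candidate;
        }
    }
    next
}

fn main() {
    let batch = vec![
        Record { topic: "orders".to_owned(), partition: 0, offset: 41 },
        Record { topic: "orders".to_owned(), partition: 0, offset: 42 },
        Record { topic: "orders".to_owned(), partition: 1, offset: 7 },
    ];

    for ((topic, partition), offset) in offsets_to_commit(&batch) {
        println!("{topic}:{partition} -> commit {offset}");
    }
}
```

A map of this shape is the kind of input an explicit-offset commit typically
takes. Check the signature of `commit_offsets(...)` before adapting it.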
## Pause, Resume, And Wakeup
- `pause(...)` temporarily stops fetching from partitions.
- `resume(...)` restarts fetching from paused partitions.
- `wakeup()` interrupts a poll that is waiting for records.
- `shutdown()` stops the background consumer task and closes the client.
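
A common reason to pause a partition is backpressure: stop fetching while
downstream processing is behind and resume once it catches up. The sketch below
models only that decision logic with two thresholds, so that a partition does
not flip between states on every update. `Backpressure`, `Action`, and the
threshold values are hypothetical and not part of `kafkit_client`. The caller
would translate each returned action into a `pause(...)` or `resume(...)` call.

```rust
use std::collections::BTreeSet;

/// What the caller should do for a partition after a backlog update.
#[derive(Debug, PartialEq)]
enum Action {
    Pause(i32),
    Resume(i32),
}

/// Hypothetical helper that tracks which partitions are paused.
struct Backpressure {
    low: usize,
    high: usize,
    paused: BTreeSet<i32>,
}

impl Backpressure {
    /// `low` must be smaller than `high` so the two thresholds never overlap.
    fn new(low: usize, high: usize) -> Self {
        assert!(low < high, "low watermark must be below high watermark");
        Self { low, high, paused: BTreeSet::new() }
    }

    /// Reports the number of unprocessed records for a partition and
    /// returns an action only when the paused state changes.
    fn update(&mut self, partition: i32, backlog: usize) -> Option<Action> {
        if backlog >= self.high && self.paused.insert(partition) {
            Some(Action::Pause(partition))
        } else if backlog <= self.low && self.paused.remove(&partition) {
            Some(Action::Resume(partition))
        } else {
            None
        }
    }
}

fn main() {
    let mut control = Backpressure::new(2, 5);
    for backlog in [1, 5, 6, 3, 2, 0] {
        match control.update(0, backlog) {
            Some(action) => println!("backlog {backlog}: {action:?}"),
            None => println!("backlog {backlog}: no change"),
        }
    }
}
```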
## Modern Groups
This consumer uses Kafka's modern consumer group protocol. Committed offsets
are ordinary Kafka offsets, but membership and heartbeats use the Kafka 4.0+
flows rather than classic `JoinGroup` / `SyncGroup` membership.