# Consumer
`KafkaConsumer` is an async Kafka group consumer for Kafka 4.0+ modern consumer
groups. It supports topic subscriptions, regex subscriptions, manual assignment,
polling, commits, seek, pause/resume, and offset lookup.
Classic consumer group membership is not implemented.
## Subscribe And Poll
```rust,no_run
use kafkit_client::{AutoOffsetReset, KafkaClient};
#[tokio::main]
async fn main() -> kafkit_client::Result<()> {
let consumer = KafkaClient::new("localhost:9092")
.topic("orders")
.consumer("orders-reader")
.with_auto_offset_reset(AutoOffsetReset::Earliest)
.with_instance_id("orders-reader-1")
.connect()
.await?;
let records = consumer.poll().await?;
for record in records.iter() {
println!(
"{}:{}@{}",
record.topic,
record.partition,
record.offset
);
}
consumer.commit(&records).await?;
consumer.shutdown().await?;
Ok(())
}
```
## Subscription Modes
Use the builder for one or more known topics:
```rust,no_run
use kafkit_client::KafkaClient;
#[tokio::main]
async fn main() -> kafkit_client::Result<()> {
let consumer = KafkaClient::new("localhost:9092")
.topics(["orders", "payments"])
.consumer("orders-reader")
.connect()
.await?;
consumer.shutdown().await?;
Ok(())
}
```
Or call subscription methods after connecting:
```rust,no_run
use kafkit_client::{ConsumerConfig, KafkaConsumer};
#[tokio::main]
async fn main() -> kafkit_client::Result<()> {
let consumer =
KafkaConsumer::connect(ConsumerConfig::new("localhost:9092", "orders-reader")).await?;
consumer.subscribe(vec!["orders".to_owned()]).await?;
consumer.subscribe_regex("^orders-.*".to_owned()).await?;
consumer.unsubscribe().await?;
consumer.shutdown().await?;
Ok(())
}
```
## Manual Assignment
Manual assignment gives direct partition control and does not rely on group
subscription assignment.
```rust,no_run
use kafkit_client::{ConsumerConfig, KafkaConsumer, TopicPartition};
#[tokio::main]
async fn main() -> kafkit_client::Result<()> {
let consumer =
KafkaConsumer::connect(ConsumerConfig::new("localhost:9092", "orders-reader")).await?;
let partition = TopicPartition::new("orders", 0);
consumer.assign(vec![partition.clone()]).await?;
consumer.seek(partition.clone(), 0).await?;
let position = consumer.position(partition).await?;
println!("position: {position}");
consumer.shutdown().await?;
Ok(())
}
```
## Commits And Offsets
- `commit(&records)` commits offsets derived from a returned `ConsumerRecords`
batch.
- `commit_offsets(...)` commits explicit offsets.
- `committed(...)` fetches committed offsets for partitions.
- `beginning_offsets(...)` and `end_offsets(...)` query log boundaries.
- `offsets_for_times(...)` looks up offsets by timestamp.
Commit records after processing them. For read-process-write flows that require
atomic output and offset commits, use a transactional producer and
`send_offsets_to_transaction`.
## Pause, Resume, And Wakeup
- `pause(...)` temporarily stops fetching from partitions.
- `resume(...)` restarts fetching from paused partitions.
- `wakeup()` interrupts waiting poll work.
- `shutdown()` stops the background consumer task and closes the client.
## Lifecycle Semantics
`KafkaConsumer::connect(...)` starts a background runtime task. Subscription,
assignment, polling, commit, seek, offset lookup, pause, resume, and shutdown
methods send work to that task and await the result. Dropping the handle without
calling `shutdown()` drops the application side of the control channel; call
`shutdown().await` when the application wants an orderly leave-group and close.
`subscribe(...)`, `subscribe_regex(...)`, and `subscribe_pattern(...)` replace
the current group subscription and trigger a new modern-group heartbeat. The
consumer joins the group lazily; assignment becomes visible after heartbeats and
metadata resolution complete. `unsubscribe().await` clears the group
subscription and assignment. `assign(...)` switches to manual assignment and
does not participate in group partition assignment.
`poll().await` waits for available records using the configured fetch settings.
`poll_for(timeout).await` returns an empty batch when the timeout expires. The
client records delivered offsets for returned records, but it does not commit
them unless the caller uses `commit(...)`, `commit_offsets(...)`, or enables
auto commit in the config. `wakeup()` interrupts an outstanding poll with a
wakeup error and leaves the consumer usable for later polls.
On a rebalance, revoked partitions are removed from the active assignment before
new partitions are made fetchable. If a rebalance listener is configured, it is
called for revoke, assign, and lost events around those transitions. The client
does not automatically commit on revoke unless auto commit is enabled and
delivered offsets are pending.
`shutdown().await` stops accepting new work, finishes pending commits where
possible, sends a leave-group heartbeat when the consumer is an active group
member, and resolves the shutdown future when the runtime task has stopped. If
the runtime has already failed with a terminal error, shutdown reports that
failure.
## Modern Groups
This consumer uses Kafka's modern consumer group protocol. Existing Kafka
offsets remain Kafka offsets, but membership and heartbeat flows are modern
Kafka 4.0+ flows rather than classic `JoinGroup` / `SyncGroup` membership.