---
layout: default
title: Share Consumer
nav_order: 5
description: "Queue-like consumption with KIP-932 share groups"
---
# Share Consumer Guide
This guide covers the share consumer, which provides queue-like consumption semantics via [KIP-932 share groups](https://cwiki.apache.org/confluence/display/KAFKA/KIP-932%3A+Queues+for+Kafka). Share groups are stable as of Apache Kafka 4.0+.
## Overview
Share groups differ from traditional consumer groups in several key ways:
| Feature | Consumer Group | Share Group |
|---|---|---|
| Assignment | Client or server-side | Server-side only |
| Offset tracking | Per-partition committed offsets | Per-record acknowledgements |
| Delivery | Exactly-once (with transactions) | At-least-once |
| Record sharing | One consumer per partition | Multiple consumers per partition |
| Redelivery | Seek / reset offsets | Automatic (release/reject) |
Multiple consumers in the same share group receive **non-overlapping subsets of records** from the same partition — the server handles all assignment and delivery tracking.
## Basic Usage
```rust
use krafka::share_consumer::{ShareConsumer, AcknowledgementMode};
use std::time::Duration;
let consumer = ShareConsumer::builder()
.bootstrap_servers("localhost:9092")
.group_id("my-share-group")
.build()
.await?;
consumer.subscribe(&["events"]).await?;
loop {
let records = consumer.poll(Duration::from_secs(1)).await?;
for record in &records {
process(record);
}
// In Implicit mode (default), records are auto-accepted on next poll()
}
```
## Acknowledgement Modes
### Implicit (Default)
Records fetched by the previous `poll()` are automatically accepted when the next `poll()` is called. This is the simplest mode — no application-level acknowledgement logic is needed. Consecutive offsets for the same partition are coalesced into contiguous ranges to reduce wire overhead.
### Explicit
The application controls acknowledgement per record. **All records from the previous `poll()` must be acknowledged before calling `poll()` again** — otherwise `poll()` returns an error. `acknowledge()` is one-shot per record: acknowledging the same record twice returns an error instead of sending duplicate broker intent. If a later `commit_sync()` or `commit_async()` flush fails, the consumer restores that batch locally and later `poll()` calls keep returning an error until the commit is retried successfully or the local share-consumer state is cleared.
```rust
use krafka::share_consumer::{ShareConsumer, AcknowledgementMode, AcknowledgeType};
let consumer = ShareConsumer::builder()
.bootstrap_servers("localhost:9092")
.group_id("my-share-group")
.acknowledgement_mode(AcknowledgementMode::Explicit)
.build()
.await?;
consumer.subscribe(&["events"]).await?;
let records = consumer.poll(Duration::from_secs(1)).await?;
for record in &records {
match try_process(record) {
Ok(_) => consumer.acknowledge(record, AcknowledgeType::Accept).await?,
Err(_) => consumer.acknowledge(record, AcknowledgeType::Release).await?,
}
}
consumer.commit_sync().await?;
```
### Acknowledge Types
| Type | Value | Meaning |
|---|---|---|
| `Accept` | 1 | Record processed successfully |
| `Release` | 2 | Record released for redelivery to another consumer |
| `Reject` | 3 | Record rejected (moved to dead-letter after max retries) |
## Delivery Count
Each `ConsumerRecord` includes a `delivery_count` field (populated from the server's acquired-records metadata). This tells you how many times the record has been delivered, which is useful for implementing retry limits:
```rust
for record in &records {
if let Some(count) = record.delivery_count {
if count > 5 {
consumer.acknowledge(record, AcknowledgeType::Reject).await?;
continue;
}
}
process(record);
}
```
## Async Commit
`commit_async()` returns a handle that resolves to the final commit outcome. This keeps the send off the caller's immediate path while still surfacing transport, decode, and broker errors explicitly. If any failure occurs, the batch is restored locally for the next commit cycle rather than silently dropped:
```rust
consumer.commit_async().await?;
```
## Streaming API
The share consumer also supports a `Stream`-based API:
```rust
use tokio_stream::StreamExt;
let mut stream = consumer.stream();
while let Some(record) = stream.next().await {
let record = record?;
process(&record);
}
```
## Configuration
| Option | Default | Description |
|---|---|---|
| `bootstrap_servers` | (required) | Comma-separated broker addresses |
| `group_id` | (required) | Share group identifier |
| `client_id` | `"krafka-share-consumer"` | Client identifier |
| `acknowledgement_mode` | `Implicit` | `Implicit` or `Explicit` |
| `fetch_min_bytes` | `1` | Minimum bytes per fetch |
| `fetch_max_bytes` | `52_428_800` | Maximum bytes per fetch |
| `max_poll_records` | `500` | Maximum records per poll |
| `max_records` | `-1` | Server-side max records (`-1` = no limit) |
| `batch_size` | `0` | Server-side batch size hint (`0` = default) |
| `fetch_max_wait_ms` | `500` | Maximum wait time for fetch responses |
| `request_timeout` | `30s` | Request timeout |
| `session_timeout` | `45s` | Session timeout for group membership |
| `heartbeat_interval` | `5s` | Heartbeat interval (must be < session_timeout) |
| `metadata_max_age` | `5min` | Metadata cache TTL |
| `metadata_topic_cache_ttl` | `Some(5min)` | TTL for topic entries in the partial-refresh cache. `None` disables eviction. Use `disable_metadata_topic_cache_ttl()` to opt out. |
| `client_rack` | `None` | Rack ID for rack-aware fetching |
### Metadata Topic Cache TTL
During a partial metadata refresh (where only the subscribed topics are re-fetched rather than the entire cluster), Krafka caches each topic's metadata between refreshes. By default, a topic entry is evicted from this cache after **5 minutes** of not being successfully refreshed — matching Java's `metadata.max.idle.ms` — to prevent unbounded growth when topics are deleted or subscriptions change.
```rust
use krafka::share_consumer::ShareConsumer;
use std::time::Duration;
// Use a custom TTL (e.g. 10 minutes):
let consumer = ShareConsumer::builder()
.bootstrap_servers("localhost:9092")
.group_id("my-share-group")
.metadata_topic_cache_ttl(Duration::from_secs(600))
.build()
.await?;
// Opt out of TTL eviction entirely (topics persist until the cache is flushed):
let consumer = ShareConsumer::builder()
.bootstrap_servers("localhost:9092")
.group_id("my-share-group")
.disable_metadata_topic_cache_ttl()
.build()
.await?;
```
> **Note:** TTL eviction only affects the partial-refresh cache. A full metadata refresh (triggered by `metadata_max_age` expiry or an explicit refresh) always replaces the cache unconditionally.
## Session Management
Share sessions (similar to fetch sessions from KIP-227) track per-broker state with epoch-based sequencing:
- **Epoch 0**: Opens a new session (full fetch)
- **Epoch 1..N**: Incremental fetches
- **Epoch -1**: Closes the session
Sessions are managed automatically. They reset on errors or assignment changes.
## Concurrent Fetching
Each `poll()` issues ShareFetch requests to all assigned brokers **concurrently** by spawning one Tokio task per broker and awaiting the handles directly. Pending acknowledgements are piggybacked on fetch requests to reduce round trips. If a broker fetch fails, records from other brokers are still returned, the session for the failed broker is reset, and the unsent piggyback acknowledgements are restored for the next commit cycle.
## Coordinator Handling
The share consumer discovers its group coordinator via `FindCoordinator` (key type = GROUP). The coordinator is cached and re-discovered automatically when:
- A heartbeat fails
- A `NOT_COORDINATOR` error is received
- `unsubscribe()` or `close()` is called
## Lifecycle
```rust
// Create
let consumer = ShareConsumer::builder()
.bootstrap_servers("localhost:9092")
.group_id("my-group")
.build()
.await?;
// Subscribe
consumer.subscribe(&["topic1", "topic2"]).await?;
// Consume
let records = consumer.poll(Duration::from_secs(1)).await?;
// Unsubscribe (leaves group, generates a new member ID)
consumer.unsubscribe().await;
// Close (idempotent)
consumer.close().await?;
```
### Close Semantics
`close()` is terminal and returns the first cleanup error after local state and connections have still been closed:
1. **Implicit mode**: all pending accept acks are converted to **releases** so acquired records return to the pool for redelivery by other consumers.
2. **Explicit mode**: pending acks (accept/release/reject) are flushed as-is.
3. Sends and validates a leave-group heartbeat.
4. Clears all local state and closes connections.
### Unsubscribe Semantics
`unsubscribe()` attempts a best-effort leave-group heartbeat, logs any leave failure internally, clears all partition state (pending acks, sessions, coordinator), and generates a fresh member ID. The consumer can be resubscribed afterwards.
## Wire Protocol
The share consumer uses four Kafka APIs (all feature-gated behind `unstable-protocol`):
| API | Key | Versions | Purpose |
|---|---|---|---|
| ShareGroupHeartbeat | 76 | v1 | Group membership and assignment |
| ShareGroupDescribe | 77 | v1 | Describe share group state |
| ShareFetch | 78 | v1–v2 | Fetch records with acquisition tracking |
| ShareAcknowledge | 79 | v1–v2 | Acknowledge processed records |
See the [Protocol Reference](protocol.md) for wire format details.