krafka 0.11.0

A pure Rust, async-native Apache Kafka client
---
layout: default
title: Consumer
nav_order: 4
description: "Consumer groups with rebalancing and offset management"
---

# Consumer Guide

This guide covers consumer usage, including consumer groups, offset management, partition assignment, and error handling.

## Overview

The Krafka consumer is an async-native, feature-rich Kafka consumer with:

- Consumer group coordination
- Automatic offset management
- Multiple partition assignment strategies
- Manual offset control
- Seek operations
- Incremental fetch sessions (KIP-227)
- Closest-replica fetching (KIP-392)
- Static group membership (KIP-345)
- KIP-848 consumer group protocol (server-side assignment)
- Interceptor hooks
- Log compaction awareness with [`CompactedTable`](#compactedtable) and [`CompactedTopicConsumer`](#compactedtopicconsumer) for key→value tables
- Per-partition offset lag tracking

## Basic Usage

```rust
use krafka::consumer::Consumer;
use krafka::error::Result;
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<()> {
    let consumer = Consumer::builder()
        .bootstrap_servers("localhost:9092")
        .group_id("my-group")
        .build()
        .await?;

    consumer.subscribe(&["my-topic"]).await?;

    loop {
        let records = consumer.poll(Duration::from_secs(1)).await?;
        for record in records {
            println!("Received: {:?}", record);
        }
    }
}
```

## Authentication

Connect to secured Kafka clusters using SASL or TLS:

```rust
use krafka::consumer::Consumer;

// SASL/SCRAM-SHA-256
let consumer = Consumer::builder()
    .bootstrap_servers("broker:9093")
    .group_id("secure-group")
    .sasl_scram_sha256("username", "password")
    .build()
    .await?;

// AWS MSK IAM
use krafka::auth::AuthConfig;
let auth = AuthConfig::aws_msk_iam("access_key", "secret_key", "us-east-1");
let consumer = Consumer::builder()
    .bootstrap_servers("broker:9094")
    .group_id("msk-group")
    .auth(auth)
    .build()
    .await?;
```

See the [Authentication Guide](authentication.md) for all supported mechanisms.

## Consumer Configuration

### Auto Offset Reset

Control behavior when no committed offset exists:

```rust
use krafka::consumer::{Consumer, AutoOffsetReset};

// Start from the earliest available message
let consumer = Consumer::builder()
    .bootstrap_servers("localhost:9092")
    .group_id("my-group")
    .auto_offset_reset(AutoOffsetReset::Earliest)
    .build()
    .await?;

// Start from the latest message (only new messages)
let consumer = Consumer::builder()
    .bootstrap_servers("localhost:9092")
    .group_id("my-group")
    .auto_offset_reset(AutoOffsetReset::Latest)
    .build()
    .await?;

// Error if no committed offset exists
let consumer = Consumer::builder()
    .bootstrap_servers("localhost:9092")
    .group_id("my-group")
    .auto_offset_reset(AutoOffsetReset::None)
    .build()
    .await?;
// poll() will return an error for partitions without committed offsets
```

> **Note:** After a consumer group rebalance, Krafka automatically fetches previously committed offsets from the group coordinator (OffsetFetch). Partitions without committed offsets use the configured `auto_offset_reset` policy.

> **OffsetOutOfRange Recovery:** If the broker returns `OffsetOutOfRange` during a fetch (e.g., because a partition was truncated or the consumer fell behind log retention), Krafka automatically applies the configured `auto_offset_reset` policy to recover the partition instead of stalling. This works for both group-based and standalone (manually assigned) consumers.

> **Offset Resolution:** When multiple partitions need offset resolution (e.g., after a rebalance or on first poll), Krafka batches `ListOffsets` requests by leader broker. It sends one RPC per leader rather than one per partition, so 50 partitions led by 3 brokers need 3 RPCs instead of 50. Failed offset resolutions use per-partition exponential backoff (100ms base, 30s cap) to prevent retry storms under sustained broker unavailability.
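The per-partition backoff can be modelled as a capped exponential delay. This std-only sketch assumes the delay doubles after each consecutive failure (100ms, 200ms, 400ms, ...) up to the 30s cap and uses no jitter. Krafka's exact growth factor and jitter are not documented here, and `offset_retry_backoff` is a hypothetical name.

```rust
use std::time::Duration;

/// Base delay after the first failed offset resolution (from the note above).
const BASE: Duration = Duration::from_millis(100);
/// Upper bound on the delay (from the note above).
const CAP: Duration = Duration::from_secs(30);

/// Hypothetical helper: delay before retrying a partition whose offset
/// resolution has failed `failures` times in a row.
/// Assumption: the delay doubles per failure and is never jittered.
fn offset_retry_backoff(failures: u32) -> Duration {
    if failures == 0 {
        return Duration::ZERO;
    }
    // 2^(failures - 1); shifts of 32 or more saturate instead of overflowing.
    let factor = 1u32.checked_shl(failures - 1).unwrap_or(u32::MAX);
    // Multiply the base delay and clamp the result to the cap.
    BASE.checked_mul(factor).map_or(CAP, |d| d.min(CAP))
}
```

Capping the delay keeps a partition retrying at a bounded rate (at most every 30s) during a long outage, while the early short delays let it recover quickly from brief leader changes.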

### Offset Commit

Control how offsets are committed. When auto-commit is enabled (the default), Krafka commits offsets automatically:

- during each `poll()` call, once the commit interval has elapsed;
- during `close()`;
- **before partition revocations** during rebalances, so the new partition owner sees up-to-date committed positions.

`close().await` always tears down local state before returning. If the final auto-commit fails only because the member already lost the group during a rebalance, the failure is treated as a best-effort shutdown race and is not returned; any other close-time commit failure is still surfaced.

> **Warning — at-least-once caveat:** Auto-commit commits the offset of the last record *returned* by `poll()`, not the last record *processed* by the application. If the application crashes after `poll()` returns but before processing completes, those records may be skipped on restart. For strict at-least-once guarantees, disable auto-commit and call `commit()` explicitly after processing each batch.

```rust
use krafka::consumer::Consumer;
use std::time::Duration;

// Auto-commit (default)
let consumer = Consumer::builder()
    .bootstrap_servers("localhost:9092")
    .group_id("my-group")
    .enable_auto_commit(true)
    .auto_commit_interval(Duration::from_secs(5))
    .build()
    .await?;

// Manual commit
let consumer = Consumer::builder()
    .bootstrap_servers("localhost:9092")
    .group_id("my-group")
    .enable_auto_commit(false)
    .build()
    .await?;
```
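The interval check that `poll()` performs can be pictured as a small timer. This is a std-only model, not Krafka's code. `AutoCommitTimer` and its methods are hypothetical names, and the sketch assumes the interval is measured from the last successful commit.

```rust
use std::time::{Duration, Instant};

/// Hypothetical model of the auto-commit interval check run inside poll().
struct AutoCommitTimer {
    interval: Duration,
    last_commit: Instant,
}

impl AutoCommitTimer {
    fn new(interval: Duration, now: Instant) -> Self {
        Self { interval, last_commit: now }
    }

    /// True once at least `interval` has passed since the last commit.
    fn is_due(&self, now: Instant) -> bool {
        now.saturating_duration_since(self.last_commit) >= self.interval
    }

    /// Record a successful commit so the next one is due `interval` later.
    fn mark_committed(&mut self, now: Instant) {
        self.last_commit = now;
    }
}
```

Passing `now` explicitly, instead of calling `Instant::now()` inside the methods, keeps the logic deterministic and easy to test.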

### Fetch Configuration

Control message fetching behavior:

```rust
use krafka::consumer::Consumer;
use std::time::Duration;

let consumer = Consumer::builder()
    .bootstrap_servers("localhost:9092")
    .group_id("my-group")
    .fetch_min_bytes(1)                          // Min bytes before returning
    .fetch_max_bytes(52428800)                   // Max bytes per fetch (50MB)
    .max_partition_fetch_bytes(1048576)          // Max bytes per partition (1MB)
    .max_poll_records(500)                       // Max records per poll
    .max_buffered_records(500)                   // Buffer cap for recv()
    .fetch_max_wait(Duration::from_millis(500))  // Max wait time
    .build()
    .await?;
```

### Buffer Cap

When using `recv()`, records from `poll()` that are not immediately returned are buffered internally. The `max_buffered_records` setting controls the maximum number of records held in this buffer. When the buffer reaches the limit, `poll()` skips fetching new data until the buffer drains below the threshold. Auto-commit and rebalance handling still run so the consumer remains healthy in the group.

For single-caller `recv()` usage, the buffer is naturally bounded by `max_poll_records` (one `poll()` batch minus the record returned to the caller). The cap adds a further guard for:
- Mixed `poll()` / `recv()` usage on the same consumer
- Multiple tasks calling `recv()` concurrently

Set to `0` to disable the buffer cap (unlimited). Defaults to `500`.

```rust
use krafka::consumer::Consumer;

let consumer = Consumer::builder()
    .bootstrap_servers("localhost:9092")
    .group_id("my-group")
    .max_buffered_records(1000) // Allow up to 1000 buffered records
    .build()
    .await?;
```
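The fetch gate described above can be modelled as a queue plus a threshold. This std-only sketch is illustrative: `RecordBuffer`, `should_fetch`, and the generic record type `T` are hypothetical. A real consumer would still run auto-commit and rebalance handling while the gate is closed.

```rust
use std::collections::VecDeque;

/// Hypothetical model of the internal buffer that backs recv().
struct RecordBuffer<T> {
    records: VecDeque<T>,
    /// Maximum buffered records; 0 means unlimited.
    max_buffered: usize,
}

impl<T> RecordBuffer<T> {
    fn new(max_buffered: usize) -> Self {
        Self { records: VecDeque::new(), max_buffered }
    }

    /// poll() only issues a new fetch while the buffer is below the cap.
    fn should_fetch(&self) -> bool {
        self.max_buffered == 0 || self.records.len() < self.max_buffered
    }

    /// Append a batch returned by a fetch.
    fn extend(&mut self, batch: impl IntoIterator<Item = T>) {
        self.records.extend(batch);
    }

    /// recv() hands out buffered records one at a time, oldest first.
    fn pop(&mut self) -> Option<T> {
        self.records.pop_front()
    }
}
```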

### Isolation Level

Control visibility of transactional records. When consuming from topics that receive transactional writes, set `isolation_level` to `IsolationLevel::ReadCommitted` to see only committed records:

```rust
use krafka::consumer::{Consumer, IsolationLevel};

let consumer = Consumer::builder()
    .bootstrap_servers("localhost:9092")
    .group_id("my-group")
    .isolation_level(IsolationLevel::ReadCommitted) // Only see committed txn records
    .build()
    .await?;
```

| Level | Description |
|-------|-------------|
| `ReadUncommitted` (default) | See all records, including uncommitted transactional records |
| `ReadCommitted` | Only see committed records; uncommitted transactional records are filtered |

> **Note:** `isolation_level` affects both data fetches and offset resolution (ListOffsets). Krafka passes the isolation level to the broker via ListOffsets (v2+, up to v11).

### Metadata Topic Cache TTL

During a partial metadata refresh (where only the subscribed topics are re-fetched rather than the entire cluster), Krafka caches each topic's metadata between refreshes. By default, a topic entry is evicted from this cache after **5 minutes** of not being successfully refreshed — matching Java's `metadata.max.idle.ms` — to prevent unbounded growth when topics are deleted or subscriptions change.

```rust
use krafka::consumer::Consumer;
use std::time::Duration;

// Use a custom TTL (e.g. 10 minutes):
let consumer = Consumer::builder()
    .bootstrap_servers("localhost:9092")
    .group_id("my-group")
    .metadata_topic_cache_ttl(Duration::from_secs(600))
    .build()
    .await?;

// Opt out of TTL eviction entirely (topics persist until the cache is flushed):
let consumer = Consumer::builder()
    .bootstrap_servers("localhost:9092")
    .group_id("my-group")
    .disable_metadata_topic_cache_ttl()
    .build()
    .await?;
```

> **Note:** TTL eviction only affects the partial-refresh cache. A full metadata refresh (triggered by `metadata_max_age` expiry or an explicit refresh) always replaces the cache unconditionally.
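The partial-refresh cache can be modelled roughly as follows. Each topic records when it was last refreshed successfully, and a sweep drops entries older than the TTL. `TopicCache`, `evict_expired`, and `replace_all` are hypothetical names, the metadata is a placeholder `String`, and a `None` TTL stands in for `disable_metadata_topic_cache_ttl()`.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Hypothetical partial-refresh cache: topic -> (metadata, last successful refresh).
struct TopicCache {
    entries: HashMap<String, (String, Instant)>,
    /// `None` models disabled TTL eviction.
    ttl: Option<Duration>,
}

impl TopicCache {
    fn new(ttl: Option<Duration>) -> Self {
        Self { entries: HashMap::new(), ttl }
    }

    /// Store metadata from a successful refresh, resetting the topic's clock.
    fn refreshed(&mut self, topic: &str, metadata: String, now: Instant) {
        self.entries.insert(topic.to_string(), (metadata, now));
    }

    /// Drop topics not refreshed within the TTL (no-op when TTL is disabled).
    fn evict_expired(&mut self, now: Instant) {
        if let Some(ttl) = self.ttl {
            self.entries
                .retain(|_, (_, last)| now.saturating_duration_since(*last) < ttl);
        }
    }

    /// A full refresh replaces the cache unconditionally.
    fn replace_all(&mut self, fresh: HashMap<String, (String, Instant)>) {
        self.entries = fresh;
    }
}
```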

## Consumer Groups

### How Consumer Groups Work

1. Consumers with the same `group_id` form a consumer group
2. Partitions are distributed among group members
3. Each partition is consumed by exactly one consumer in the group at a time
4. When consumers join or leave, partitions are rebalanced among the current members

```rust
use krafka::consumer::Consumer;

// Multiple consumers in the same group share partitions
let consumer1 = Consumer::builder()
    .bootstrap_servers("localhost:9092")
    .group_id("processing-group")
    .build()
    .await?;

let consumer2 = Consumer::builder()
    .bootstrap_servers("localhost:9092")
    .group_id("processing-group")
    .build()
    .await?;

// Both subscribe to the same topic - partitions are split between them
consumer1.subscribe(&["events"]).await?;
consumer2.subscribe(&["events"]).await?;
```

### Partition Assignment Strategies

Krafka supports multiple assignment strategies. Configure the strategy via the builder:

```rust
use krafka::consumer::{Consumer, PartitionAssignmentStrategy};

// Range assignor (default)
let consumer = Consumer::builder()
    .bootstrap_servers("localhost:9092")
    .group_id("my-group")
    .partition_assignment_strategy(PartitionAssignmentStrategy::Range)
    .build()
    .await?;

// Round-robin for balanced distribution
let consumer = Consumer::builder()
    .bootstrap_servers("localhost:9092")
    .group_id("my-group")
    .partition_assignment_strategy(PartitionAssignmentStrategy::RoundRobin)
    .build()
    .await?;

// Cooperative sticky for minimal partition movement during rebalances
let consumer = Consumer::builder()
    .bootstrap_servers("localhost:9092")
    .group_id("my-group")
    .partition_assignment_strategy(PartitionAssignmentStrategy::CooperativeSticky)
    .build()
    .await?;
```

The underlying assignor implementations are also available directly:

```rust
use krafka::consumer::{RangeAssignor, RoundRobinAssignor, CooperativeStickyAssignor, PartitionAssignor};

// Range assignor (default)
// Assigns each consumer a contiguous range per topic,
// e.g. 6 partitions over 2 consumers: [0,1,2] [3,4,5]
// Best for: Co-partitioned topics
let range = RangeAssignor;
assert_eq!(range.name(), "range");

// Round-robin assignor
// Distributes partitions evenly across all consumers
// Best for: Balanced load across many consumers
let round_robin = RoundRobinAssignor;
assert_eq!(round_robin.name(), "roundrobin");

// Cooperative sticky assignor
// Minimizes partition movement during rebalances (incremental cooperative)
// Best for: Production workloads needing minimal disruption
let cooperative = CooperativeStickyAssignor::new();
assert_eq!(cooperative.name(), "cooperative-sticky");
```
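To make the range and round-robin comments above concrete, here is a std-only sketch of the two classic algorithms for a single topic whose consumers all subscribe to it. It is an illustration, not Krafka's source; `range_assign` and `round_robin_assign` are hypothetical helpers, and consumers are ordered by sorting their IDs.

```rust
use std::collections::BTreeMap;

/// Range: each sorted consumer gets a contiguous block of partitions;
/// the first `n % m` consumers receive one extra partition.
fn range_assign(consumers: &[&str], num_partitions: i32) -> BTreeMap<String, Vec<i32>> {
    if consumers.is_empty() {
        return BTreeMap::new();
    }
    let mut sorted = consumers.to_vec();
    sorted.sort();
    let m = sorted.len() as i32;
    let mut out = BTreeMap::new();
    let mut next = 0;
    for (i, c) in sorted.iter().enumerate() {
        let extra = if (i as i32) < num_partitions % m { 1 } else { 0 };
        let count = num_partitions / m + extra;
        out.insert(c.to_string(), (next..next + count).collect());
        next += count;
    }
    out
}

/// Round-robin: partitions are dealt out to the sorted consumers in turn.
fn round_robin_assign(consumers: &[&str], num_partitions: i32) -> BTreeMap<String, Vec<i32>> {
    if consumers.is_empty() {
        return BTreeMap::new();
    }
    let mut sorted = consumers.to_vec();
    sorted.sort();
    let mut out: BTreeMap<String, Vec<i32>> =
        sorted.iter().map(|c| (c.to_string(), Vec::new())).collect();
    for p in 0..num_partitions {
        let owner = sorted[(p as usize) % sorted.len()];
        out.get_mut(owner).unwrap().push(p);
    }
    out
}
```

With 7 partitions and 2 consumers, range gives `[0,1,2,3]` and `[4,5,6]`, while round-robin gives `[0,2,4,6]` and `[1,3,5]`. Both give each partition exactly one owner.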

#### Cooperative Sticky Assignor

The `CooperativeStickyAssignor` implements the incremental cooperative rebalancing
protocol (KIP-429), minimizing partition movement and avoiding stop-the-world
rebalances when consumers join or leave the group.

**Key features:**
- **Incremental two-phase rebalance**: Only the partitions being moved are revoked
  and cleaned up — unaffected partitions retain their state and do not go through
  a full revoke/reassign cycle.
- **Stickiness**: Partitions stay with their current owner when possible, reducing
  unnecessary movement.
- **Balanced distribution**: Ensures fair partition allocation across consumers.
- **Owned-partition metadata (v1)**: Encodes each member's current assignment in
  JoinGroup metadata so the leader can compute minimal revocations.
- **Proper revocation semantics**: `on_partitions_revoked` is called only for the
  diff (partitions being moved) during normal rebalances (including topic deletion),
  while `on_partitions_lost` is used when ownership may already have been transferred
  (e.g., session timeout, fencing, or graceful shutdown via `close()`).

```rust
use krafka::consumer::{ConsumerBuilder, PartitionAssignmentStrategy};

let consumer = ConsumerBuilder::default()
    .bootstrap_servers("localhost:9092")
    .group_id("my-group")
    .partition_assignment_strategy(PartitionAssignmentStrategy::CooperativeSticky)
    .build()
    .await?;

// During a rebalance, only affected partitions are revoked/released from this consumer.
// Unaffected partitions keep their assignment and continue being consumed.
```

**How it works:**

1. A rebalance is triggered (new member joins, member leaves, etc.).
2. Phase 1: All members join and receive new target assignments.
3. Each member computes which partitions to revoke (old − new).
4. Revoked partitions are released and `on_partitions_revoked` fires.
5. Phase 2: Members rejoin with updated owned-partition metadata.
6. Final assignments are distributed and `on_partitions_assigned` fires
   with **only the newly acquired partitions** (delta vs previous round).
   Committed offsets are fetched only for the newly acquired partitions.

> **Java client parity:** `on_partitions_assigned` follows the same
> delta semantics as the Java `ConsumerRebalanceListener.onPartitionsAssigned`.
> To get the **full** post-rebalance assignment call `consumer.assignment()`
> from inside the callback.
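Steps 3 and 6 reduce to set differences between the previous and new assignment. The sketch below models this with `HashSet`s of `(topic, partition)` pairs. It is a std-only illustration with a hypothetical `assignment_delta` helper, not Krafka's implementation.

```rust
use std::collections::HashSet;

type Tp = (String, i32);

/// Returns (revoked, newly_assigned) for one member across a rebalance.
/// revoked = old - new        -> what on_partitions_revoked would receive
/// newly_assigned = new - old -> what on_partitions_assigned would receive
fn assignment_delta(old: &HashSet<Tp>, new: &HashSet<Tp>) -> (Vec<Tp>, Vec<Tp>) {
    let mut revoked: Vec<Tp> = old.difference(new).cloned().collect();
    let mut added: Vec<Tp> = new.difference(old).cloned().collect();
    // Sort for a deterministic order (HashSet iteration order is arbitrary).
    revoked.sort();
    added.sort();
    (revoked, added)
}
```

Partitions in both sets appear in neither list. That is why they keep their state and never go through a revoke/reassign cycle.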

### Rebalance Listener

Get notified when partition assignments change during rebalances. Register a listener via the builder:

```rust
use krafka::consumer::{Consumer, ConsumerRebalanceListener, TopicPartition};
use std::sync::Arc;

struct MyRebalanceListener;

impl ConsumerRebalanceListener for MyRebalanceListener {
    fn on_partitions_assigned(&self, partitions: &[TopicPartition]) {
        println!("Assigned: {:?}", partitions);
        // Initialize state for new partitions
        // Load any existing checkpoints from external storage
    }

    fn on_partitions_revoked(&self, partitions: &[TopicPartition]) {
        println!("Revoked: {:?}", partitions);
        // Commit offsets synchronously before losing partitions
        // Save any in-memory state to external storage
    }

    fn on_partitions_lost(&self, partitions: &[TopicPartition]) {
        // Called when partitions are lost unexpectedly (e.g., session timeout).
        // Unlike revocation, ownership may already have moved to another
        // consumer, which may have committed offsets of its own
        println!("Lost: {:?}", partitions);
    }
}

// Wire into the consumer via the builder:
let consumer = Consumer::builder()
    .bootstrap_servers("localhost:9092")
    .group_id("my-group")
    .rebalance_listener(Arc::new(MyRebalanceListener))
    .build()
    .await?;

// Use the NoOpRebalanceListener for a no-op implementation (default):
use krafka::consumer::NoOpRebalanceListener;
let _listener = NoOpRebalanceListener;
```

The listener is automatically invoked during `poll()` (before/after rebalance) and `close()` (partitions lost). Callbacks are useful for:
- Committing offsets before partition loss
- Saving processing state to external storage
- Initializing resources when new partitions are assigned
- Proper cleanup during consumer group rebalances

> **Note:** After a rebalance completes, Krafka automatically issues `OffsetFetch` to the group coordinator to retrieve committed offsets for the assigned partitions (with the cooperative sticky assignor, only the newly acquired ones). This ensures seamless resumption from the last committed position.

## Offset Management

### Manual Commit

For precise control over offset commits:

```rust
use krafka::consumer::Consumer;
use std::time::Duration;

let consumer = Consumer::builder()
    .bootstrap_servers("localhost:9092")
    .group_id("my-group")
    .enable_auto_commit(false)
    .build()
    .await?;

consumer.subscribe(&["orders"]).await?;

loop {
    let records = consumer.poll(Duration::from_secs(1)).await?;
    
    for record in &records {
        // Process each record; `process_order` stands in for your own
        // application logic (hypothetical, not part of Krafka)
        process_order(record).await?;
    }
    
    // Commit after processing
    if !records.is_empty() {
        consumer.commit().await?;
    }
}
```

### Async Commit

For non-blocking commits:

```rust
// Commit asynchronously and await the final outcome.
// Snapshot, transport, and broker failures are surfaced on the handle.
// Retriable coordinator failures use the same short retry loop as commit().
// If the assignment or offset snapshot cannot be taken, the handle resolves
// to an error instead of silently skipping the commit cycle.
consumer.commit_async().await?;
```

### Commit with Metadata

Commit specific offsets with application-specific metadata:

```rust
use std::collections::HashMap;
use krafka::consumer::{Consumer, OffsetAndMetadata, TopicPartition};

// Commit specific offsets with metadata
let mut offsets = HashMap::new();
offsets.insert(
    TopicPartition::new("orders", 0),
    OffsetAndMetadata::with_metadata(1500, "checkpoint-abc123"),
);
offsets.insert(
    TopicPartition::new("orders", 1),
    OffsetAndMetadata::new(2000),
);

consumer.commit_with_metadata(offsets).await?;
```

In group mode, only currently assigned partitions are committed. Retriable
coordinator failures use the same short retry loop as `commit()` and
`commit_async()`.

This is useful for:
- Storing application checkpoints
- Recording processing state
- Debugging offset issues (metadata is visible in Kafka tools)
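When building the map for `commit_with_metadata`, the standard Kafka convention is that a committed offset names the *next* record to read: the last processed offset plus one. The std-only helper below assumes Krafka follows that convention. The name `next_offsets_to_commit` is hypothetical, and records are reduced to `(topic, partition, offset)` tuples. Each value would then be wrapped with `OffsetAndMetadata::new` (or `with_metadata`) before committing.

```rust
use std::collections::HashMap;

/// Hypothetical helper: for each (topic, partition), the offset to commit is
/// one past the highest offset processed ("next offset to read" convention).
fn next_offsets_to_commit(processed: &[(&str, i32, i64)]) -> HashMap<(String, i32), i64> {
    let mut out: HashMap<(String, i32), i64> = HashMap::new();
    for &(topic, partition, offset) in processed {
        let next = offset + 1;
        out.entry((topic.to_string(), partition))
            // Keep the highest value if records arrive out of order.
            .and_modify(|o| *o = (*o).max(next))
            .or_insert(next);
    }
    out
}
```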

### Position and Seeking

Query and control consumer position:

```rust
// Get current position
let offset = consumer.position("topic", 0).await;
println!("Current position: {:?}", offset);

// Seek to a specific offset
consumer.seek("topic", 0, 1000).await?;

// Seek multiple partitions atomically (one lock acquisition)
use std::collections::HashMap;
consumer.seek_many(&HashMap::from([
    (("orders".to_string(), 0), 1_000),
    (("orders".to_string(), 1), 2_000),
])).await?;

// Seek to the beginning (earliest available)
consumer.seek_to_beginning("topic", 0).await?;

// Seek to the end (latest, only receive new messages)
consumer.seek_to_end("topic", 0).await?;
```

### Starting from Known Offsets (Exactly-Once Recovery)

Use `initial_offsets` on the builder to set per-partition start positions before
`auto_offset_reset` is applied. This is ideal for recovery pipelines that
checkpoint positions externally:

```rust
use std::collections::HashMap;
use krafka::consumer::Consumer;

let consumer = Consumer::builder()
    .bootstrap_servers("localhost:9092")
    .group_id("my-group")
    .initial_offsets(HashMap::from([
        (("orders".to_string(), 0), 1_234),
        (("orders".to_string(), 1), 5_678),
    ]))
    .build()
    .await?;
```

Initial offsets are applied when a partition is first assigned and has no
committed group offset. They override `auto_offset_reset` for the matching
partitions; unmatched partitions still follow `auto_offset_reset`.
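This precedence rule can be written as a small decision function: a committed offset wins, then `initial_offsets`, then `auto_offset_reset`. This is a std-only sketch. `Reset`, `StartPosition`, and `resolve_start` are hypothetical stand-ins, not Krafka types.

```rust
/// Hypothetical stand-in for the configured auto_offset_reset policy.
#[derive(Clone, Copy, Debug, PartialEq)]
enum Reset {
    Earliest,
    Latest,
    None,
}

#[derive(Debug, PartialEq)]
enum StartPosition {
    Offset(i64),
    Earliest,
    Latest,
    /// Reset::None with nothing else to go on: the consumer reports an error.
    NoOffset,
}

/// Where a newly assigned partition starts reading.
fn resolve_start(committed: Option<i64>, initial: Option<i64>, reset: Reset) -> StartPosition {
    match (committed, initial) {
        // A committed group offset always wins.
        (Some(offset), _) => StartPosition::Offset(offset),
        // Otherwise an explicit initial offset overrides auto_offset_reset.
        (None, Some(offset)) => StartPosition::Offset(offset),
        // Fall back to the reset policy.
        (None, None) => match reset {
            Reset::Earliest => StartPosition::Earliest,
            Reset::Latest => StartPosition::Latest,
            Reset::None => StartPosition::NoOffset,
        },
    }
}
```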

### Pause and Resume

Temporarily pause consumption of specific partitions:

```rust
// Pause specific partitions
consumer.pause("orders", &[0, 1]).await;

// Check which partitions are paused
let paused = consumer.paused_partitions().await;
println!("Paused partitions: {:?}", paused);

// Resume consumption
consumer.resume("orders", &[0, 1]).await;
```

Paused partitions are skipped during `poll()` until resumed. This is useful for:
- Back-pressure handling when downstream is slow
- Prioritizing certain partitions
- Implementing rate limiting

> **Rebalance behavior:** Pause state is preserved for partitions that remain assigned to the same consumer across both eager and cooperative rebalances. Only revoked partitions lose their pause state. `unsubscribe()` and `close()` still clear all pause state.
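This rebalance rule amounts to intersecting the paused set with the new assignment. `poll()` then fetches only partitions that are assigned and not paused. Below is a std-only sketch with hypothetical helper names, using `(topic, partition)` pairs.

```rust
use std::collections::BTreeSet;

type Tp = (String, i32);

/// Keep pause state only for partitions still assigned after a rebalance.
fn retain_paused(paused: &BTreeSet<Tp>, new_assignment: &BTreeSet<Tp>) -> BTreeSet<Tp> {
    paused.intersection(new_assignment).cloned().collect()
}

/// Partitions poll() would fetch from: assigned and not paused.
fn fetchable(assigned: &BTreeSet<Tp>, paused: &BTreeSet<Tp>) -> BTreeSet<Tp> {
    assigned.difference(paused).cloned().collect()
}
```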

## Manual Partition Assignment

For direct partition control (without consumer groups):

> **Note:** Manual assignment and group subscription are mutually exclusive.
> Calling `assign()` on a consumer with a `group_id` configured will return an error.

> **Standalone Recovery:** Standalone consumers have the same `OffsetOutOfRange` recovery as group consumers — the configured `auto_offset_reset` policy is applied automatically to recover stalled partitions.

```rust
use krafka::consumer::Consumer;

let consumer = Consumer::builder()
    .bootstrap_servers("localhost:9092")
    // Note: no group_id for manual assignment
    .auto_offset_reset(krafka::consumer::AutoOffsetReset::Earliest)
    .build()
    .await?;

// Assign specific partitions
consumer.assign("topic", vec![0, 1, 2]).await?;
```

## Subscription Management

### Subscribe to Multiple Topics

`subscribe()` **replaces** the current subscription (it does not append):

```rust
// Subscribe to initial topics
consumer.subscribe(&["orders", "payments"]).await?;

// This REPLACES the subscription — only "shipments" is subscribed now
consumer.subscribe(&["shipments"]).await?;
```

### Check Subscriptions and Assignments

```rust
// Get subscribed topics
let topics = consumer.subscription().await;
println!("Subscribed to: {:?}", topics);

// Get assigned partitions
let assignments = consumer.assignment().await;
println!("Assigned partitions: {:?}", assignments);
```

### Unsubscribe

Calling `unsubscribe()` performs a full cleanup: it revokes partitions (notifying
the rebalance listener), leaves the consumer group, and clears all internal
state (offsets, paused partitions, buffered records). If leaving the group
fails, `unsubscribe()` returns that error, but local state is cleared regardless.

```rust
consumer.unsubscribe().await?;
```

### Pause and Resume

Temporarily pause consumption of specific partitions without disconnecting:

```rust
use std::time::Duration;

// Pause partitions 0 and 1 of "orders" topic
consumer.pause("orders", &[0, 1]).await;

// These partitions will be skipped during poll()
let records = consumer.poll(Duration::from_secs(1)).await?;
// Only records from non-paused partitions are returned

// Check which partitions are paused
let paused = consumer.paused_partitions().await;
println!("Paused partitions: {:?}", paused);

// Resume consumption
consumer.resume("orders", &[0, 1]).await;
```

Use cases for pause/resume:
- **Backpressure handling**: Pause when downstream systems are slow
- **Priority processing**: Pause low-priority partitions during high load
- **Graceful degradation**: Pause non-essential partitions when resources are constrained

## Error Handling

### Handling Poll Errors

```rust
use krafka::consumer::Consumer;
use krafka::error::KrafkaError;
use std::time::Duration;

async fn consume_with_error_handling(consumer: &Consumer) {
    loop {
        match consumer.poll(Duration::from_secs(1)).await {
            Ok(records) => {
                for record in records {
                    process_record(record).await; // application-defined handler (hypothetical)
                }
            }
            Err(KrafkaError::Timeout(_)) => {
                // Normal - no messages available
                continue;
            }
            Err(e) => {
                eprintln!("Error polling: {}", e);
                tokio::time::sleep(Duration::from_secs(1)).await;
            }
        }
    }
}
```

### Streaming with `recv()`

The `recv()` method returns records one at a time, giving a stream-like API.
It buffers the records fetched by each internal `poll()` and hands them out
individually, so no records are dropped when a single fetch returns many.

`recv()` returns `Result<ConsumerRecord, RecvError>` instead of `Result<Option<ConsumerRecord>>`:
- `Ok(record)` — a record was received.
- `Err(RecvError::Closed)` — the consumer was shut down.
- `Err(RecvError::Error(e))` — a broker or network error occurred.

```rust
use krafka::consumer::Consumer;
use krafka::error::Result;
use krafka::RecvError;

async fn consume_stream(consumer: &Consumer) -> Result<()> {
    loop {
        match consumer.recv().await {
            Ok(record) => println!(
                "topic={}, partition={}, offset={}",
                record.topic, record.partition, record.offset
            ),
            Err(RecvError::Closed)   => break,
            Err(RecvError::Error(e)) => return Err(e),
            Err(_) => break,
        }
    }
    Ok(())
}
```

### High-Throughput Batch Receive

`batch_recv(max_records, timeout)` collects up to `max_records` records in one
call and returns an explicit `BatchRecvOutcome`, so a timeout, a closed
consumer, and an empty request are distinguishable from a batch of records:

```rust
use std::time::Duration;
use krafka::consumer::{BatchRecvOutcome, Consumer};
use krafka::error::Result;

async fn process_batches(consumer: &Consumer) -> Result<()> {
    loop {
        match consumer.batch_recv(100, Duration::from_millis(200)).await? {
            BatchRecvOutcome::Records(batch) => {
                for record in batch {
                    println!("offset={}", record.offset);
                }
            }
            BatchRecvOutcome::TimedOut => continue,
            BatchRecvOutcome::Closed => break,
            BatchRecvOutcome::EmptyRequest => continue,
            _ => continue,
        }
    }
    Ok(())
}
```

### Async `Stream` API

The `stream()` method returns a [`futures_core::Stream`](https://docs.rs/futures-core/latest/futures_core/stream/trait.Stream.html)
of `Result<ConsumerRecord>`, so it works with stream combinators such as
`.map()`, `.filter()`, and `.take()` from `tokio-stream`, or `.buffer_unordered()`
from the `futures` crate's `StreamExt`:

```rust
use krafka::consumer::Consumer;
use krafka::error::Result;
use tokio_stream::StreamExt; // requires tokio-stream dependency

async fn consume_with_stream(consumer: &Consumer) -> Result<()> {
    let mut stream = consumer.stream();
    while let Some(result) = stream.next().await {
        let record = result?;
        println!(
            "topic={}, partition={}, offset={}",
            record.topic, record.partition, record.offset
        );
    }
    Ok(())
}
```

The stream terminates when the consumer is closed. Internally it delegates to
`recv()`, so all features (auto-commit, rebalancing, fetch sessions, buffering)
work identically.

### Graceful Shutdown

Always commit final offsets and close the consumer before exiting:

```rust
use krafka::consumer::Consumer;
use krafka::error::KrafkaError;
use std::time::Duration;
use tokio::signal;

let consumer = Consumer::builder()
    .bootstrap_servers("localhost:9092")
    .group_id("my-group")
    .build()
    .await?;

consumer.subscribe(&["topic"]).await?;

tokio::select! {
    _ = signal::ctrl_c() => {
        println!("Shutting down...");
    }
    result = async {
        loop {
            let records = consumer.poll(Duration::from_secs(1)).await?;
            for record in records {
                process_record(record).await; // application-defined handler
            }
        }
        #[allow(unreachable_code)]
        Ok::<(), KrafkaError>(())
    } => {
        // The poll loop only exits on error; report it instead of dropping it.
        if let Err(e) = result {
            eprintln!("poll loop failed: {e}");
        }
    }
}

// Commit final offsets and close
consumer.commit().await?;
consumer.close().await?;
```

## Poll Architecture

### Batch Fetch by Broker

Krafka optimizes the `poll()` operation by batching fetch requests per broker. Instead of sending 
one request per partition (O(n) round trips), it groups partitions by their leader broker and sends 
one request per broker (O(k) round trips, where k = number of unique leaders).

```
  Consumer.poll()
         │
         ▼
  ┌──────────────────────────────┐
  │ Group partitions by leader   │
  │                              │
  │ Broker 1: [p0, p1, p2]       │
  │ Broker 2: [p3, p4]           │
  │ Broker 3: [p5]               │
  └──────────────────────────────┘
         │
         ▼
  ┌──────────────────────────────┐
  │ One FetchRequest per broker  │
  │                              │
  │ Request 1 → Broker 1         │
  │ Request 2 → Broker 2         │
  │ Request 3 → Broker 3         │
  └──────────────────────────────┘
         │
         ▼
    Merge results
```

This cuts the number of fetch requests per `poll()` from one per partition to one per leader broker,
which matters most when consuming topics with many partitions spread across multiple brokers.
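The grouping step in the diagram can be sketched with the standard library alone. This is an illustration, not Krafka's internal code: `TopicPartition` and `group_by_leader` are hypothetical names, and the leader map stands in for cached cluster metadata.

```rust
use std::collections::BTreeMap;

/// A topic-partition pair (hypothetical helper type for this sketch).
pub type TopicPartition = (String, i32);

/// Groups assigned partitions by their leader broker id so that one fetch
/// request can be built per broker. Partitions with no known leader are
/// returned separately (a real client would refresh metadata for them).
pub fn group_by_leader(
    assignment: &[TopicPartition],
    leaders: &BTreeMap<TopicPartition, i32>,
) -> (BTreeMap<i32, Vec<TopicPartition>>, Vec<TopicPartition>) {
    let mut per_broker: BTreeMap<i32, Vec<TopicPartition>> = BTreeMap::new();
    let mut leaderless = Vec::new();
    for tp in assignment {
        match leaders.get(tp) {
            Some(&broker) => per_broker.entry(broker).or_default().push(tp.clone()),
            None => leaderless.push(tp.clone()),
        }
    }
    (per_broker, leaderless)
}
```

With the layout from the diagram (`p0`–`p2` on broker 1, `p3`–`p4` on broker 2), the result has one entry per broker, and each entry becomes a single FetchRequest.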

### Incremental Fetch Sessions (KIP-227)

Krafka implements [KIP-227](https://cwiki.apache.org/confluence/display/KAFKA/KIP-227%3A+Introduce+Incremental+FetchRequests+to+Increase+Partition+Scalability) fetch sessions to minimize fetch request sizes. Instead of sending the full partition list on every `poll()`, the broker tracks per-session state and the client sends only partition changes.

**How it works:**

1. On the first fetch to a broker, Krafka sends a full fetch request (epoch 0) with all partitions
2. The broker establishes a session and returns a `session_id`
3. On subsequent fetches, Krafka computes a diff against the previous session state:
   - **Changed partitions**: Only partitions with new offsets or different `max_bytes`
   - **Forgotten topics**: Partitions removed since the last fetch (e.g., after rebalance)
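4. The broker applies the diff to its session state, fetches from every partition tracked in the session, and returns only the partitions that have new data or changed metadata (such as an updated high watermark)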

```
  First poll()              Subsequent poll()
  (full fetch)              (incremental)
  ┌──────────────┐          ┌────────────────┐
  │ session_id: 0│          │ session_id: 42 │
  │ epoch: 0     │          │ epoch: 1       │
  │ topics:      │          │ topics:        │
  │   p0, p1, p2 │    →     │   p1 (changed) │
  │   p3, p4     │          │ forgotten:     │
  └──────────────┘          │   p4 (removed) │
                            └────────────────┘
```
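The diff computed in step 3 can be sketched as a comparison of two maps. This is a simplified model: real Fetch requests carry more per-partition fields (for example the log start offset and current leader epoch), and `PartitionFetch` and `session_diff` are hypothetical names rather than Krafka APIs.

```rust
use std::collections::BTreeMap;

/// Per-partition fetch state last sent in this session
/// (hypothetical simplified model: fetch offset and max bytes).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct PartitionFetch {
    pub fetch_offset: i64,
    pub max_bytes: i32,
}

pub type TopicPartition = (String, i32);

/// Computes what an incremental fetch must carry: partitions that are new
/// or whose fetch state changed, plus partitions the broker should forget.
pub fn session_diff(
    previous: &BTreeMap<TopicPartition, PartitionFetch>,
    current: &BTreeMap<TopicPartition, PartitionFetch>,
) -> (Vec<TopicPartition>, Vec<TopicPartition>) {
    let changed = current
        .iter()
        .filter(|(tp, state)| previous.get(*tp) != Some(*state))
        .map(|(tp, _)| tp.clone())
        .collect();
    let forgotten = previous
        .keys()
        .filter(|tp| !current.contains_key(*tp))
        .cloned()
        .collect();
    (changed, forgotten)
}
```

Unchanged partitions are simply omitted from the request; the broker keeps fetching them because they remain in its session state.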

**Benefits:**

- **Reduced bandwidth**: With 100 partitions, incremental fetches can be 10-100x smaller
- **Lower broker CPU**: Broker parses smaller requests
- **Automatic fallback**: Falls back to Fetch v4 (full requests) for brokers that don't support v7+

**Error recovery:**

- `FetchSessionIdNotFound` or `InvalidFetchSessionEpoch` errors automatically reset the session
- The next fetch sends a full request to re-establish the session
- All sessions are reset on consumer group rebalance

Fetch sessions are enabled automatically when the broker supports Fetch API v7+. No configuration is needed.

### Closest-Replica Fetching (KIP-392)

Krafka implements [KIP-392](https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica) to allow consumers to fetch from the closest replica rather than always from the partition leader. This is especially useful in multi-datacenter or multi-availability-zone deployments where cross-rack traffic is expensive.

**Configuration:**

Set `client_rack` to the rack or availability zone of the consumer:

```rust
let consumer = Consumer::builder()
    .bootstrap_servers("localhost:9092")
    .group_id("my-group")
    .client_rack("us-east-1a")
    .build()
    .await?;
```

**How it works:**

1. The consumer includes its `rack_id` in Fetch requests (Fetch API v11+)
2. The broker compares the consumer's rack with each partition's replica placement
3. If a replica exists in the same rack, the broker returns it as `preferred_read_replica`
4. On subsequent polls, Krafka routes that partition's fetch to the preferred replica
5. The mapping expires after `metadata_max_age` (default 5 minutes), causing a fresh lookup

**Error fallback:**

- If a non-leader replica returns an error, the preferred replica mapping is cleared
- The next poll falls back to the partition leader
- On rebalance or unsubscribe, all preferred replica mappings are cleared
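The routing rules above (use the preferred replica while its mapping is fresh, fall back to the leader on expiry, replica error, or rebalance) can be modeled as a small lookup table. This is a sketch, not Krafka's implementation: `ReplicaRouter` and its methods are hypothetical, and the maximum age is passed in explicitly instead of being read from `metadata_max_age`.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

pub type TopicPartition = (String, i32);

/// Hypothetical model of preferred-read-replica routing with expiry.
pub struct ReplicaRouter {
    max_age: Duration,
    preferred: HashMap<TopicPartition, (i32, Instant)>, // (replica id, learned at)
}

impl ReplicaRouter {
    pub fn new(max_age: Duration) -> Self {
        Self { max_age, preferred: HashMap::new() }
    }

    /// Records a `preferred_read_replica` returned by the leader.
    pub fn set_preferred(&mut self, tp: TopicPartition, replica: i32, now: Instant) {
        self.preferred.insert(tp, (replica, now));
    }

    /// Chooses the broker to fetch from: the preferred replica while its
    /// mapping is fresh, otherwise the partition leader.
    pub fn target(&mut self, tp: &TopicPartition, leader: i32, now: Instant) -> i32 {
        match self.preferred.get(tp).copied() {
            Some((replica, learned)) if now.duration_since(learned) < self.max_age => replica,
            Some(_) => {
                // Expired: drop the mapping so the next fetch re-learns it.
                self.preferred.remove(tp);
                leader
            }
            None => leader,
        }
    }

    /// A fetch error from a non-leader replica clears its mapping.
    pub fn on_replica_error(&mut self, tp: &TopicPartition) {
        self.preferred.remove(tp);
    }

    /// Rebalance or unsubscribe clears every mapping.
    pub fn clear(&mut self) {
        self.preferred.clear();
    }
}
```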

**Requirements:**

- Broker must support Fetch API v11 (Kafka 2.4+)
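- Brokers must be configured with `broker.rack` and a rack-aware `replica.selector.class` (for example, `org.apache.kafka.common.replica.RackAwareReplicaSelector`); with the default selector, brokers always direct fetches to the leader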
- When `client_rack` is not set, Krafka negotiates up to Fetch v10 (sessions + leader epoch fencing) but does not send a rack ID

## Performance Tips

### High Throughput

```rust
use krafka::consumer::Consumer;
use std::time::Duration;

let consumer = Consumer::builder()
    .bootstrap_servers("localhost:9092")
    .group_id("high-throughput")
    .fetch_max_bytes(104857600)              // 100MB max fetch
    .max_partition_fetch_bytes(10485760)     // 10MB per partition
    .max_poll_records(10000)                 // Many records per poll
    .max_buffered_records(10000)             // Match poll batch size
    .fetch_max_wait(Duration::from_millis(100))
    .build()
    .await?;
```

### Low Latency

```rust
use krafka::consumer::Consumer;
use std::time::Duration;

let consumer = Consumer::builder()
    .bootstrap_servers("localhost:9092")
    .group_id("low-latency")
    .fetch_min_bytes(1)                      // Return immediately when data available
    .fetch_max_wait(Duration::from_millis(10))
    .max_poll_records(1)                     // Process one at a time
    .build()
    .await?;
```

### Memory Efficiency

```rust
use krafka::consumer::Consumer;

let consumer = Consumer::builder()
    .bootstrap_servers("localhost:9092")
    .group_id("memory-efficient")
    .fetch_max_bytes(1048576)                // Limit to 1MB
    .max_partition_fetch_bytes(262144)       // 256KB per partition
    .max_poll_records(100)                   // Limit in-memory records
    .max_buffered_records(200)               // Tight buffer cap
    .build()
    .await?;
```

## Static Group Membership (KIP-345)

Static group membership allows consumers to maintain a persistent identity across restarts,
avoiding unnecessary rebalances. When a consumer with a `group_instance_id` disconnects and
reconnects (within the session timeout), it automatically gets the same partition assignment
without triggering a rebalance for the entire group.

### Enabling Static Membership

```rust
use krafka::consumer::Consumer;
use std::time::Duration;

let consumer = Consumer::builder()
    .bootstrap_servers("localhost:9092")
    .group_id("my-group")
    .group_instance_id("instance-1")  // Stable identity
    .session_timeout(Duration::from_secs(300))  // Longer timeout for restarts
    .build()
    .await?;

consumer.subscribe(&["my-topic"]).await?;
```

### How It Works

| Behavior | Dynamic (default) | Static (with `group_instance_id`) |
|----------|-------------------|-----------------------------------|
| **Disconnect** | Immediate rebalance | No rebalance until session timeout |
| **Reconnect** | New member, rebalance | Same member, no rebalance |
| **Rolling restart** | N rebalances | Zero rebalances |
| **Protocol version** | JoinGroup v0 | JoinGroup v5 |

When `group_instance_id` is set, Krafka automatically:
- Uses JoinGroup v5 and Heartbeat v3 protocol versions
- Includes the instance ID in all group coordinator requests (Join, Sync, Heartbeat, OffsetCommit, Leave)
- Uses LeaveGroup v3 with member identity on graceful shutdown

### Best Practices

- Assign a **unique** `group_instance_id` per consumer instance (e.g., hostname, pod name)
- Increase `session_timeout` to cover restart duration (e.g., 5 minutes for rolling deployments)
- Use with `CooperativeSticky` assignor for minimal partition movement

```rust
use krafka::consumer::{Consumer, PartitionAssignmentStrategy};
use std::time::Duration;

let consumer = Consumer::builder()
    .bootstrap_servers("localhost:9092")
    .group_id("my-group")
    .group_instance_id("pod-abc-123")
    .partition_assignment_strategy(PartitionAssignmentStrategy::CooperativeSticky)
    .session_timeout(Duration::from_secs(300))
    .build()
    .await?;
```

## KIP-848 Consumer Group Protocol

KIP-848 introduces a new consumer group protocol where the server performs
partition assignment instead of the group leader. This eliminates the
JoinGroup/SyncGroup round-trip and replaces it with a single
`ConsumerGroupHeartbeat` API (key 68, v0–v1).

### Enabling KIP-848

Set `GroupProtocol::Consumer` on the builder to use the KIP-848 consumer
protocol. Kafka 3.7 brokers expose it as early access; it is generally
available (production-ready) from Kafka 4.0.

```rust
use krafka::consumer::{Consumer, GroupProtocol};

let consumer = Consumer::builder()
    .bootstrap_servers("localhost:9092")
    .group_id("my-group")
    .group_protocol(GroupProtocol::Consumer)  // KIP-848
    .build()
    .await?;

consumer.subscribe(&["my-topic"]).await?;
```

### How It Works

| Classic Protocol | KIP-848 Consumer Protocol |
|-----------------|--------------------------|
| JoinGroup + SyncGroup + Heartbeat | ConsumerGroupHeartbeat only |
| Client-side assignment (group leader) | Server-side assignment |
| Generation ID | Member epoch |
| `generation_id = -1` (unjoined) | `member_epoch = 0` (join) |
| LeaveGroup request | `member_epoch = -1` (permanent leave) or `-2` (static member temporary leave) |

With the consumer protocol:
1. A member joins by sending a heartbeat with `member_epoch = 0`
2. The coordinator assigns partitions and returns the assignment in the response
3. Members maintain their session by sending periodic heartbeats
4. The heartbeat task updates the local assignment and state when the broker returns new assignments
5. The consumer layer computes an incremental diff to determine revoked vs. newly assigned partitions. `on_partitions_revoked` is fired for the affected revoked partitions, while `on_partitions_assigned` receives the **full post-rebalance assignment** (consistent with the cooperative and eager paths in this crate)
6. To leave, a dynamic member sends `member_epoch = -1` (permanent). A static member (with `group_instance_id`) sends `member_epoch = -2` (temporary leave — the broker retains the assignment for the session-timeout window so the instance can rejoin quickly)
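Steps 5 and 6 can be sketched with plain sets. The names `AssignmentDiff`, `diff_assignment`, and `leave_epoch` are hypothetical and only illustrate the rules described above: revoked partitions are the old assignment minus the new one, `on_partitions_assigned` receives the full new assignment, and the leave epoch depends on whether a `group_instance_id` is set.

```rust
use std::collections::BTreeSet;

pub type TopicPartition = (String, i32);

/// Result of comparing the previous assignment with the new one.
#[derive(Debug, PartialEq)]
pub struct AssignmentDiff {
    pub revoked: BTreeSet<TopicPartition>,         // passed to on_partitions_revoked
    pub newly_assigned: BTreeSet<TopicPartition>,  // informational only
    pub full_assignment: BTreeSet<TopicPartition>, // passed to on_partitions_assigned
}

pub fn diff_assignment(
    old: &BTreeSet<TopicPartition>,
    new: &BTreeSet<TopicPartition>,
) -> AssignmentDiff {
    AssignmentDiff {
        revoked: old.difference(new).cloned().collect(),
        newly_assigned: new.difference(old).cloned().collect(),
        full_assignment: new.clone(),
    }
}

/// Epoch sent when leaving: -2 lets the broker keep a static member's
/// assignment for the session timeout; -1 leaves permanently.
pub fn leave_epoch(group_instance_id: Option<&str>) -> i32 {
    if group_instance_id.is_some() { -2 } else { -1 }
}
```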

### Subscription Changes

If `subscribe()` is called with a different topic list while the consumer is
already active (state `Stable`), the existing heartbeat task is stopped and the
next `poll()` sends a full heartbeat with all fields (including the new topic
list). This mirrors the cooperative-rebalance subscription-change detection.

### Topic UUID Resolution

The ConsumerGroupHeartbeat response uses 16-byte topic UUIDs in assignments.
Krafka resolves these UUIDs to topic names with a two-level lookup order:

1. **Cluster metadata lookup** — first consult `ClusterMetadata::topic_name_for_id`.
   In Metadata v10 and later, brokers can return topic UUID → name mappings
   in metadata responses, and Krafka uses automatic API version negotiation
   to take advantage of that when supported.
2. **Local topic names cache** — if metadata does not contain the mapping,
   fall back to a local UUID → name cache built from previously resolved
   assignments. This cache survives metadata cache flushes and mirrors the
   Java client's `AbstractMembershipManager` behavior once a name has been
   learned.

Successfully resolved names are cached locally. Unresolvable UUIDs still
trigger an automatic metadata refresh.

If topic UUIDs remain unresolved after a metadata refresh during the initial
heartbeat response handling, the client returns a protocol error rather than
silently operating with an empty or partial assignment. Inside the background
heartbeat task, unresolved UUIDs produce a `warn!` log and the assignment is
retained for re-resolution on the next tick. The raw target assignment (with
UUIDs) is always retained so resolution can be retried after future updates or
once a UUID → name mapping becomes available.

The `StaleMemberEpoch` error (113) is handled as a transient condition: the
member epoch is updated from the response and the heartbeat retries on the
next tick without triggering a rebalance.

### Dynamic Heartbeat Interval

The coordinator may adjust the heartbeat interval over time by returning a
different `heartbeat_interval_ms` in the ConsumerGroupHeartbeat response. The
KIP-848 heartbeat task honors these updates: after each successful response,
the current interval is compared with the response value and, if changed, the
timer is reset to the new duration (with a minimum floor of 1,000 ms).
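The compare-and-reset rule can be expressed as a pure function. This sketch assumes the floor is applied before the comparison and that negative values are treated as zero; `next_heartbeat_interval` is a hypothetical name, not a Krafka API.

```rust
use std::time::Duration;

/// Floor applied to coordinator-supplied intervals in this sketch.
const MIN_HEARTBEAT_INTERVAL: Duration = Duration::from_millis(1_000);

/// Returns `Some(new_interval)` when the timer must be reset, or `None` when
/// the coordinator's value (after flooring) matches the current interval.
pub fn next_heartbeat_interval(current: Duration, response_interval_ms: i32) -> Option<Duration> {
    // Negative values are clamped to 0 and then raised to the floor.
    let requested = Duration::from_millis(response_interval_ms.max(0) as u64);
    let effective = requested.max(MIN_HEARTBEAT_INTERVAL);
    (effective != current).then_some(effective)
}
```

In an async heartbeat loop, the `Some` case is where the timer would be rebuilt, for example by creating a new `tokio::time::interval` with the returned duration.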

### Version Notes

- **v0** — Base version; compatible with Kafka 3.7+ (EA) and 4.0+ (GA)
- **v1** — Adds `SubscribedTopicRegex` for regex-based topic subscription (KIP-848) and requires consumer-generated member IDs (KIP-1082); available on Kafka 4.0+

Both v0 and v1 are supported (`CONSUMER_GROUP_HEARTBEAT_MIN = 0`, `CONSUMER_GROUP_HEARTBEAT_MAX = 1`).

### Error Handling

The ConsumerGroupHeartbeat response may return these errors; codes 110 and above were introduced with KIP-848:

| Error Code | Name | Handling |
|-----------|------|----------|
| 14 | `CoordinatorLoadInProgress` | Transient; retry on next heartbeat tick |
| 15 | `CoordinatorNotAvailable` | Clear cached coordinator, trigger rediscovery |
| 16 | `NotCoordinator` | Clear cached coordinator, trigger rediscovery |
| 25 | `UnknownMemberId` | Same fencing recovery as `FencedMemberEpoch` |
| 27 | `RebalanceInProgress` | Signal rebalance; consumer processes assignment diff on next poll |
| 110 | `FencedMemberEpoch` | Fenced; heartbeat task stops, member preserves its `member_id` and rejoins with epoch 0 via a full heartbeat (all top-level fields) |
| 111 | `UnreleasedInstanceId` | Static member instance ID held by another member; same fencing recovery as `FencedMemberEpoch` |
| 112 | `UnsupportedAssignor` | Server-side assignor not recognized |
| 113 | `StaleMemberEpoch` | Update local epoch from response, retry on next heartbeat |
| 128 | `InvalidRegularExpression` | Regex subscription (v1+) is malformed |
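One way to read the table is as a dispatch from the standard Kafka protocol error code to a recovery action. The sketch below is hypothetical (`HeartbeatAction` and `classify` are not Krafka APIs); it routes `UnknownMemberId` (25) into the fencing path as described under Fencing Recovery, and treats codes whose handling the table leaves to the caller (112, 128, and anything unlisted) as errors to surface.

```rust
/// Hypothetical action a heartbeat loop takes for a response error code.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HeartbeatAction {
    Proceed,
    Rebalance,
    RetryNextTick,
    RediscoverCoordinator,
    Rejoin, // fencing recovery: epoch 0, keep member_id
    UpdateEpochAndRetry,
    SurfaceError,
}

pub fn classify(error_code: i16) -> HeartbeatAction {
    match error_code {
        0 => HeartbeatAction::Proceed,
        27 => HeartbeatAction::Rebalance, // REBALANCE_IN_PROGRESS
        14 => HeartbeatAction::RetryNextTick, // COORDINATOR_LOAD_IN_PROGRESS
        15 | 16 => HeartbeatAction::RediscoverCoordinator, // COORDINATOR_NOT_AVAILABLE, NOT_COORDINATOR
        25 | 110 | 111 => HeartbeatAction::Rejoin, // UNKNOWN_MEMBER_ID, FENCED_MEMBER_EPOCH, UNRELEASED_INSTANCE_ID
        113 => HeartbeatAction::UpdateEpochAndRetry, // STALE_MEMBER_EPOCH
        _ => HeartbeatAction::SurfaceError, // e.g. 112 UNSUPPORTED_ASSIGNOR, 128 INVALID_REGULAR_EXPRESSION
    }
}
```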

### Fencing Recovery

When the heartbeat task receives `FencedMemberEpoch`, `UnknownMemberId`, or
`UnreleasedInstanceId`, it:

1. Signals the consumer layer (member invalidated + rebalance needed)
2. Stops the heartbeat task (no more skinny heartbeats with stale state)

On the next `poll()`, the consumer detects the fencing via `needs_rejoin()`:

1. Resets `member_epoch` to 0 and clears assignment/target state
2. **Preserves `member_id`** — per KIP-848, a fenced member must "rejoin with
   the same member id and epoch 0"
3. Sets state to `Unjoined`

The `handle_group_rebalance()` path then calls `ensure_active_membership()`,
which sends a **full heartbeat** (subscription, rebalance timeout, all
top-level fields) and starts a fresh heartbeat task.

### Requirements and Compatibility

- **Minimum broker version**: Kafka 4.0 (GA). Earlier brokers (3.7–3.9) expose
  KIP-848 behind `group.coordinator.new.enable=true` but it is **not production-stable
  before 4.0**. The client selects between classic (`GroupProtocol::Classic`) and
  KIP-848 (`GroupProtocol::Consumer`) at runtime via the `group_protocol` builder
  option — no Cargo feature flag is required.
- **Protocol stability**: `GroupProtocol::Classic` (the default) works with all
  Kafka versions ≥ 0.10. Use `GroupProtocol::Consumer` only when you can guarantee
  all brokers in the cluster run Kafka 4.0+.
- **Required API key**: API key 68 (`ConsumerGroupHeartbeat`), versions 0–1.
- **Known limitations vs classic protocol**:
  - Transactional offset commits (`TxnOffsetCommit`) are not yet implemented on
    the KIP-848 path.
  - Regex-based subscriptions require `ConsumerGroupHeartbeat` v1 (Kafka 4.0+).
  - The server-side assignor name is always the Kafka broker's uniform assignor;
    client-side assignors (`range`, `roundrobin`, `cooperative-sticky`) are
    ignored when `GroupProtocol::Consumer` is active.

### Describing KIP-848 Groups

To inspect a KIP-848 consumer group (state, epochs, member assignments), use
the AdminClient's `describe_consumer_groups()` method which auto-detects the
group type and dispatches to the appropriate API. See the
[Admin Client Guide](admin.md#describing-consumer-groups) for details.

### Limitations

Transactional offset commits (`TxnOffsetCommit`) are not yet implemented for
groups using `GroupProtocol::Consumer`.

## Consumer Interceptors

Interceptors allow you to observe records after they are fetched and monitor offset commits.
See the [Interceptors Guide](interceptors.md) for full details.

```rust
use krafka::interceptor::{ConsumerInterceptor, InterceptorResult};
use krafka::consumer::{Consumer, ConsumerRecord};
use std::sync::Arc;

#[derive(Debug)]
struct MetricsInterceptor;

impl ConsumerInterceptor for MetricsInterceptor {
    fn on_consume(&self, records: &[ConsumerRecord]) -> InterceptorResult {
        println!("Consumed {} records", records.len());
        Ok(())
    }
}

let consumer = Consumer::builder()
    .bootstrap_servers("localhost:9092")
    .group_id("my-group")
    .interceptor(Arc::new(MetricsInterceptor))
    .build()
    .await?;
```

## Log Compaction Awareness

Krafka correctly handles log-compacted topics where records may have been deleted within a batch.
Record offsets are calculated using each record's `offset_delta` rather than sequential indices,
ensuring accurate offset tracking even when records within a batch have been removed by compaction.

This means:
- `consumer.position()` always returns the correct offset, even on compacted topics
- Offset commits are accurate — no risk of re-processing or skipping records
- No special configuration needed; compaction awareness is built-in
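The offset rule can be shown with a compacted batch. This sketch uses a hypothetical `DecodedRecord` type holding only the field that matters here; in the record batch format, each record's absolute offset is the batch's base offset plus its `offset_delta`.

```rust
/// A record decoded from a record batch (simplified for this sketch).
pub struct DecodedRecord {
    pub offset_delta: i32,
}

/// Absolute offsets are `base_offset + offset_delta`, which stays correct
/// when compaction has removed records from the middle of a batch.
pub fn absolute_offsets(base_offset: i64, records: &[DecodedRecord]) -> Vec<i64> {
    records
        .iter()
        .map(|r| base_offset + i64::from(r.offset_delta))
        .collect()
}
```

For a batch at base offset 100 whose surviving records have deltas 0, 3, and 4, this yields 100, 103, and 104; counting by position in the batch would wrongly yield 100, 101, and 102.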

### Tombstone Detection

Records in compacted topics with a key but no value are **tombstones** — deletion markers that eventually cause the key to be removed from the log. Use `ConsumerRecord::is_tombstone()` to detect them:

```rust
use std::time::Duration;

// Assuming `consumer` is an already-configured Consumer instance
let records = consumer.poll(Duration::from_secs(1)).await?;
for record in &records {
    if record.is_tombstone() {
        println!("Key {:?} was deleted", record.key);
    } else {
        println!("Key {:?} = {:?}", record.key, record.value);
    }
}
```

### CompactedTable

`CompactedTable` is a standalone, Kafka-agnostic data structure that maintains an in-memory key→value snapshot from consumer records. It handles tombstones automatically and tracks changes via `TableChange`. Because it is decoupled from the consumer, it composes with **any** consumer setup — group-coordinated, standalone, or manually assigned:

```rust
use krafka::consumer::{Consumer, CompactedTable};
use std::time::Duration;

let consumer = Consumer::builder()
    .bootstrap_servers("localhost:9092")
    .group_id("my-group")
    .build()
    .await?;
consumer.subscribe(&["user-profiles"]).await?;

let mut table = CompactedTable::new();
loop {
    let records = consumer.poll(Duration::from_secs(1)).await?;
    let changes = table.apply(&records);
    for change in &changes {
        if change.is_delete() {
            println!("Deleted: {:?}", change.key);
        } else if change.is_insert() {
            println!("New: {:?} = {:?}", change.key, change.new_value);
        } else {
            println!("Updated: {:?} = {:?}", change.key, change.new_value);
        }
    }
}
```

Key behaviors:
- **Tombstone handling** — keys are removed from the table when a null-valued record arrives
- **Keyless records** — silently skipped (compacted topics require keys)
- **Metrics** — `records_processed()` and `tombstones_processed()` are available for monitoring
- **Read access** — `get()`, `contains_key()`, `keys()`, `values()`, `iter()`, `snapshot()`, `len()`, `is_empty()`
- **Bulk load** — `ingest()` applies records without building a change list (ideal for initial scans)
- **Reset** — `clear()` removes all entries and resets counters (useful during rebalances)
- **Clone** — `table.clone()` produces a full copy including counters; `table.snapshot()` clones only the entries
- **Equality** — two tables are equal (`PartialEq`/`Eq`) when they contain the same entries; processing counters are ignored
- **IntoIterator** — `for (key, value) in &table { ... }` or `for (key, value) in table { ... }` (consuming)

`TableChange` derives `PartialEq` and `Eq`, so changes can be compared directly with `assert_eq!` in tests.

### CompactedTopicConsumer

For the common case of scanning an entire compacted topic from the beginning, `CompactedTopicConsumer` bundles a `Consumer` and `CompactedTable` together with built-in caught-up detection:

```rust
use krafka::consumer::CompactedTopicConsumer;
use std::time::Duration;

let mut ctc = CompactedTopicConsumer::builder()
    .bootstrap_servers("localhost:9092")
    .topic("user-profiles")
    .build()
    .await?;

// Build the initial snapshot (blocks until caught up)
ctc.scan(Duration::from_secs(1)).await?;
assert!(ctc.is_caught_up());

// Read individual keys via the table
if let Some(value) = ctc.table().get(b"user-123") {
    println!("User profile: {:?}", value);
}

// Get the full snapshot
let snapshot = ctc.table().snapshot();
println!("{} keys in table", snapshot.len());

// Tail for live updates
loop {
    let changes = ctc.poll(Duration::from_secs(1)).await?;
    for change in &changes {
        if change.is_delete() {
            println!("Deleted: {:?}", change.key);
        } else if change.is_insert() {
            println!("New: {:?} = {:?}", change.key, change.new_value);
        } else {
            println!("Updated: {:?} = {:?}", change.key, change.new_value);
        }
    }
}
```

Key behaviors:
- **No consumer group** — uses standalone assignment of all partitions
- **Starts from earliest** — `auto_offset_reset` is set to `Earliest` internally
- **Caught-up detection** — `scan()` returns when all partitions reach their high watermarks; `poll()` also updates the flag. Because the high watermark is refreshed on each fetch, `scan()` may block indefinitely on actively written topics — treat it as a best-effort catch-up rather than a bounded snapshot
- **Table access** — `table()` and `table_mut()` give direct access to the underlying `CompactedTable`
- **Consumer access** — `consumer()` and `consumer_mut()` expose the underlying `Consumer` for seek, pause, commit, or metrics; `into_parts()` decomposes the wrapper into `(Consumer, CompactedTable)`
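Caught-up detection can be sketched as a check that every assigned partition's position has reached its last fetched high watermark. This is an assumption-laden model with a hypothetical `is_caught_up` function: partitions with no fetched watermark yet count as not caught up, and an empty assignment is vacuously caught up. As noted above, the watermark moves on actively written topics, so the check can keep failing.

```rust
use std::collections::HashMap;

/// Hypothetical caught-up check keyed by partition number: every position
/// must have reached its most recently fetched high watermark.
pub fn is_caught_up(
    positions: &HashMap<i32, i64>,
    high_watermarks: &HashMap<i32, i64>,
) -> bool {
    positions.iter().all(|(partition, &position)| {
        high_watermarks
            .get(partition)
            .is_some_and(|&hw| position >= hw)
    })
}
```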

For custom consumer setups (e.g., consumer groups, manual offsets), use `CompactedTable` directly.

#### From an Existing Consumer

If you need full control over the consumer configuration (TLS, auth, custom timeouts), build the consumer yourself and pass it in:

```rust
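use krafka::auth::AuthConfig;
use krafka::consumer::{AutoOffsetReset, CompactedTopicConsumer, Consumer};
use std::time::Duration;

let consumer = Consumer::builder()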
    .bootstrap_servers("broker:9093")
    .auto_offset_reset(AutoOffsetReset::Earliest)
    .enable_auto_commit(false)
    .auth(AuthConfig::sasl_scram_sha256("user", "password"))
    .build()
    .await?;
consumer.assign("config-topic", vec![0, 1, 2]).await?;

let mut ctc = CompactedTopicConsumer::from_consumer(consumer, "config-topic");
ctc.scan(Duration::from_secs(1)).await?;
```

#### Authentication

Pass an `AuthConfig` to connect to secured clusters:

```rust
use krafka::auth::AuthConfig;
use krafka::consumer::CompactedTopicConsumer;

let mut ctc = CompactedTopicConsumer::builder()
    .bootstrap_servers("broker:9093")
    .topic("config-topic")
    .auth(AuthConfig::sasl_scram_sha256("user", "password"))
    .build()
    .await?;
```

## Offset Lag Tracking

Krafka tracks consumer lag automatically by caching the high watermark returned in every fetch response. When the broker supports Fetch v5+, the log start offset is also cached. No additional network calls are needed.

Lag values are returned as `u64` (always non-negative, clamped at zero when the position is ahead of the watermark) to match the internal metrics representation.

```rust
// Per-partition lag (returns None if no fetch has completed for this partition)
if let Some(lag) = consumer.current_lag("my-topic", 0).await {
    println!("Partition 0 lag: {} records", lag);
}

// All partition lags at once
let lags = consumer.lag().await;
for ((topic, partition), lag) in &lags {
    println!("{}-{}: {} records behind", topic, partition, lag);
}

// Cached beginning/end offsets (no network call)
if let Some(start) = consumer.cached_beginning_offset("my-topic", 0).await {
    println!("Earliest available offset: {}", start);
}
if let Some(end) = consumer.cached_end_offset("my-topic", 0).await {
    println!("High watermark: {}", end);
}
```

Lag is also exposed via metrics (recomputed after every offset or high-watermark mutation — seek, commit, poll, offset reset, revocation):

| Metric | Description |
|--------|-------------|
| `lag` | Total lag across all assigned partitions |
| `lag_max` | Maximum per-partition lag |

High watermarks and log start offsets are automatically cleared when partitions are revoked or the consumer unsubscribes. Lag metrics are recomputed accordingly.

> **Staleness caveat** — High watermarks are only updated when a fetch
> response is received from the broker. If the consumer is paused, slow, or
> not polling, the cached watermarks (and therefore `current_lag`,
> `compute_aggregate_lag`, and the `lag`/`lag_max` metrics) can become stale
> and undercount the true lag. Treat lag values as eventually consistent
> rather than real-time.

## Next Steps

- [Interceptors Guide](interceptors.md) - Producer and consumer interceptor hooks
- [Producer Guide](producer.md) - Learn about producing messages
- [Configuration Reference](configuration.md) - All consumer options
- [Architecture Overview](architecture.md) - How the consumer works internally