crabka_client_consumer/poll.rs
1//! `Consumer::poll` — issues one `Fetch` covering every assigned partition,
2//! advances next-offsets, and returns the decoded records.
3
4use std::collections::HashMap;
5use std::time::Duration;
6
7use crabka_protocol::owned::fetch_request::{FetchPartition, FetchRequest, FetchTopic};
8use crabka_protocol::owned::list_offsets_request::{
9 ListOffsetsPartition, ListOffsetsRequest, ListOffsetsTopic,
10};
11
12use crate::builder::{AutoOffsetReset, IsolationLevel};
13use crate::consumer::{Consumer, ConsumerRecord};
14use crate::error::ConsumerError;
15
/// Placeholder leader id used when a partition's real leader is not known.
/// Fetches grouped under this id go out over the bootstrap connection
/// (`Client::send`) instead of a per-broker connection (`Client::broker(id)`).
const BOOTSTRAP_LEADER: i32 = -1;

/// Request fields for a single partition that is ready to be fetched, in the
/// order `(partition, fetch_offset, current_leader_epoch, last_fetched_epoch)`.
type FetchSpec = (i32, i64, i32, i32);

/// Fetch work list: outer key is the leader id, inner key is the topic name,
/// and the value holds that topic's partition specs for that leader.
type FetchByLeader = HashMap<i32, HashMap<String, Vec<FetchSpec>>>;
27
28impl Consumer {
29 /// Returns the records from every v2 batch the broker returned per
30 /// assigned partition, or an empty vec on timeout. Under
31 /// `read_committed` isolation, control batches and records belonging to
32 /// aborted transactions are filtered client-side using the response's
33 /// `aborted_transactions` list (the broker returns verbatim bytes).
34 /// Rebalances are handled transparently by the internal coordinator
35 /// task, which mutates the live `assigned` snapshot in place; `poll()`
36 /// simply reads it on each call.
37 #[allow(clippy::too_many_lines)]
38 pub async fn poll(&mut self, timeout: Duration) -> Result<Vec<ConsumerRecord>, ConsumerError> {
39 // 1. Resolve any i64::MAX sentinels (auto.offset.reset=Latest) via
40 // ListOffsets(timestamp=-1).
41 self.resolve_latest_sentinels().await?;
42
43 // KIP-320: refresh leader epochs and proactively validate any position
44 // whose leader epoch advanced, before fetching. Truncated partitions
45 // are reset here (or surfaced for auto.offset.reset=None below).
46 self.refresh_leader_epochs().await?;
47 let truncated = self.validate_positions().await?;
48 if !truncated.is_empty() {
49 self.apply_truncation(&truncated).await?;
50 }
51
52 // 2. Build a FetchRequest covering every assigned partition.
53 let assigned = self.assigned.lock().await.clone();
54 if assigned.is_empty() {
55 tokio::time::sleep(timeout).await;
56 return Ok(Vec::new());
57 }
58
59 // Group the fetchable partitions by their leader id so each FetchRequest
60 // reaches the broker that actually hosts the partition. On a
61 // multi-broker cluster the bootstrap connection is rarely the leader of
62 // every partition, and a Fetch sent to a non-leader returns
63 // NOT_LEADER_OR_FOLLOWER instead of records. The per-partition leader
64 // lives in the `positions` sidecar (populated by `refresh_leader_epochs`
65 // from Metadata, whose `refresh_metadata` also teaches the pool each
66 // broker's address so `Client::broker(id)` can connect).
67 //
68 // A partition whose leader is unknown (or whose advertised address is
69 // unusable) falls back to the bootstrap connection (synthetic id `-1`)
70 // for this round. The `refresh_leader_epochs` pass at the top of every
71 // poll already re-pulls Metadata, so the next poll re-targets it once
72 // the leader is learnable — no extra refresh needed here.
73 //
74 // (partition, fetch_offset, current_leader_epoch, last_fetched_epoch).
75 // Lock order: next_offsets first, then positions (matching the
76 // coordinator's order so poll can never deadlock against a rebalance).
77 // Both guards are dropped before any per-leader Fetch is issued — the
78 // sends are await points and we must never hold a Mutex guard across an
79 // `.await`.
80 let mut by_leader: FetchByLeader = HashMap::new();
81 {
82 let offsets = self.next_offsets.lock().await;
83 let positions = self.positions.lock().await;
84 for (t, p) in &assigned {
85 // Skip partitions still awaiting validation — they must not be
86 // fetched until proven consistent.
87 if positions
88 .get(&(t.clone(), *p))
89 .is_some_and(|x| x.awaiting_validation)
90 {
91 continue;
92 }
93 let next = offsets.get(&(t.clone(), *p)).copied().unwrap_or(0);
94 let pos = positions.get(&(t.clone(), *p)).copied().unwrap_or_default();
95 // Route to the leader when its id is known AND the pool has a
96 // dialable address for it; otherwise fall back to the bootstrap
97 // connection. `knows_broker` is a synchronous registry lookup
98 // (no await), so it's safe to call while the offsets/positions
99 // guards are held. A leader whose advertised address is unusable
100 // (e.g. port 0 from an in-process test broker) is treated as
101 // unknown — the bootstrap broker is the leader in that
102 // single-broker case anyway.
103 let leader = if pos.leader_id >= 0 && self.client.knows_broker(pos.leader_id) {
104 pos.leader_id
105 } else {
106 BOOTSTRAP_LEADER
107 };
108 by_leader
109 .entry(leader)
110 .or_default()
111 .entry(t.clone())
112 .or_default()
113 .push((*p, next, pos.leader_epoch, pos.offset_epoch));
114 }
115 }
116
117 let topic_ids = self.topic_ids.lock().await.clone();
118 let timeout_ms = i32::try_from(timeout.as_millis()).unwrap_or(i32::MAX);
119
120 // Issue one Fetch per leader. All guards are released; we collect every
121 // response before re-locking to process them. Sent sequentially so a
122 // single parked leader can't starve the others' deadlines beyond the
123 // per-request timeout (and to keep the borrow on `self.client` simple).
124 let mut responses = Vec::with_capacity(by_leader.len());
125 for (leader, by_topic) in by_leader {
126 let topics: Vec<FetchTopic> = by_topic
127 .into_iter()
128 .map(|(name, plist)| {
129 let topic_id = topic_ids.get(&name).copied().unwrap_or_default();
130 FetchTopic {
131 topic: name,
132 topic_id,
133 partitions: plist
134 .into_iter()
135 .map(
136 |(p, off, leader_epoch, last_fetched_epoch)| FetchPartition {
137 partition: p,
138 fetch_offset: off,
139 current_leader_epoch: leader_epoch,
140 last_fetched_epoch,
141 partition_max_bytes: 1 << 20,
142 ..Default::default()
143 },
144 )
145 .collect(),
146 ..Default::default()
147 }
148 })
149 .collect();
150 let req = FetchRequest {
151 max_wait_ms: timeout_ms,
152 min_bytes: 1,
153 max_bytes: 50 * 1024 * 1024,
154 isolation_level: self.isolation_level.wire(),
155 topics,
156 ..Default::default()
157 };
158 let resp = if leader == BOOTSTRAP_LEADER {
159 self.client.send(req).await?
160 } else {
161 self.client.broker(leader).send(req).await?
162 };
163 responses.push(resp);
164 }
165
166 // 3. Decode each partition's RecordBatches, advance next-offsets.
167 //
168 // The wire-level `records` field can carry multiple concatenated
169 // RecordBatches; we iterate every v2 batch, emit one ConsumerRecord
170 // per Record, and bump next_offsets to the highest seen offset + 1.
171 // Reverse-map topic_id → name. At Fetch v ≥ 13 the response carries
172 // only `topic_id`; `topic.topic` is empty.
173 let id_to_name: HashMap<_, _> = topic_ids
174 .iter()
175 .map(|(name, id)| (*id, name.clone()))
176 .collect();
177
178 // Re-snapshot the assignment: a cooperative rebalance may have
179 // revoked partitions while this Fetch was in flight. Records for
180 // partitions we no longer own must be dropped — the new owner will
181 // serve them from the offset we committed at revoke time. Snapshot
182 // before locking `next_offsets` to keep the coordinator's
183 // assigned→next_offsets lock order (avoids deadlock).
184 let still_owned: std::collections::HashSet<(String, i32)> =
185 self.assigned.lock().await.iter().cloned().collect();
186
187 let mut out: Vec<ConsumerRecord> = Vec::new();
188 // Set when a NOT_LEADER_OR_FOLLOWER response carried no current_leader
189 // hint: we refresh metadata after the processing loop (we can't `.await`
190 // while the `offsets`/`positions` guards are held) so the next poll
191 // re-targets the new leader.
192 let mut refresh_after_processing = false;
193 let mut offsets = self.next_offsets.lock().await;
194 // Process every per-leader response with the identical per-partition
195 // logic (error-first, offset advance, fetch_floor, read_committed). The
196 // partition key is unique across leaders, so the order responses are
197 // drained in doesn't matter.
198 for topic in responses.iter().flat_map(|resp| &resp.responses) {
199 let topic_name = if topic.topic.is_empty() {
200 id_to_name.get(&topic.topic_id).cloned().unwrap_or_default()
201 } else {
202 topic.topic.clone()
203 };
204 for part in &topic.partitions {
205 // Drop records for partitions revoked while this Fetch was
206 // in flight (cooperative rebalance transparency).
207 if !still_owned.contains(&(topic_name.clone(), part.partition_index)) {
208 continue;
209 }
210
211 let key = (topic_name.clone(), part.partition_index);
212
213 // KIP-320 in-band truncation: leader served no records and told
214 // us where to truncate (diverging_epoch.end_offset >= 0).
215 if part.diverging_epoch.end_offset >= 0 {
216 self.handle_truncation_in_poll(
217 &mut offsets,
218 &key,
219 part.diverging_epoch.end_offset,
220 )?;
221 continue;
222 }
223 // Error-first: inspect the partition error_code before decoding.
224 match part.error_code {
225 0 => {}
226 1 /* OFFSET_OUT_OF_RANGE */ => {
227 // Reset per policy using the response's log_start_offset
228 // (the broker includes it in every OOR partition response).
229 // We must NOT use a hardcoded 0: if retention has moved
230 // log_start forward, re-fetching from 0 re-triggers OOR
231 // forever. Mirrors what the replicator does on OOR.
232 // No RPC needed — log_start_offset is already in `part`.
233 let fetch_offset = offsets.get(&key).copied().unwrap_or(-1);
234 let log_start = part.log_start_offset;
235 let (topic, partition) = (key.0.clone(), key.1);
236 match self.auto_offset_reset {
237 AutoOffsetReset::Earliest => {
238 // Reset to the real log start, not 0.
239 offsets.insert(key.clone(), log_start);
240 }
241 AutoOffsetReset::Latest => {
242 // Plant i64::MAX sentinel; resolved next poll
243 // by resolve_latest_sentinels via ListOffsets.
244 offsets.insert(key.clone(), i64::MAX);
245 }
246 AutoOffsetReset::None => {
247 return Err(ConsumerError::LogTruncation {
248 topic,
249 partition,
250 fetch_offset,
251 safe_offset: log_start,
252 });
253 }
254 }
255 continue;
256 }
257 6 /* NOT_LEADER_OR_FOLLOWER */ => {
258 // A routing miss, NOT a truncation: we sent the Fetch to
259 // a broker that no longer leads this partition (e.g. a
260 // leadership change since the last metadata refresh).
261 // Re-target the leader so the next poll routes correctly;
262 // do NOT set awaiting_validation (nothing diverged).
263 let mut positions = self.positions.lock().await;
264 if part.current_leader.leader_id >= 0 {
265 // The broker handed us the new leader inline (KIP-320
266 // current_leader hint). Adopt it immediately.
267 let p = positions.entry(key.clone()).or_default();
268 p.leader_id = part.current_leader.leader_id;
269 p.leader_epoch = part.current_leader.leader_epoch;
270 } else {
271 // No hint: force a metadata refresh after this loop
272 // so the next poll learns the new leader. Reset the
273 // stale leader id so the bootstrap fallback (and a
274 // re-flag, if metadata advances the epoch) kicks in.
275 if let Some(p) = positions.get_mut(&key) {
276 p.leader_id = -1;
277 }
278 drop(positions);
279 refresh_after_processing = true;
280 }
281 continue;
282 }
283 74 /* FENCED_LEADER_EPOCH */
284 | 75 /* UNKNOWN_LEADER_EPOCH */ => {
285 let mut positions = self.positions.lock().await;
286 if let Some(p) = positions.get_mut(&key) {
287 // Force refresh_leader_epochs to re-flag against
288 // fresher metadata next poll (any real epoch >= 0 > -1).
289 p.leader_epoch = -1;
290 // Only gate on validation when we have a consumed epoch
291 // to validate against. A never-consumed partition
292 // (offset_epoch < 0) has nothing to validate; flagging it
293 // would wedge it — validate_positions skips offset_epoch
294 // < 0, and the fetch builder skips awaiting_validation.
295 if p.offset_epoch >= 0 {
296 p.awaiting_validation = true;
297 }
298 }
299 continue;
300 }
301 other => {
302 return Err(ConsumerError::Server(other));
303 }
304 }
305
306 let Some(payload) = &part.records else {
307 continue;
308 };
309 // Legacy MessageSet payloads are skipped here; the consumer
310 // only handles v2 batches.
311 let Some(batches) = payload.as_v2() else {
312 continue;
313 };
314 // The broker returns whole record batches whose last offset is
315 // >= the requested fetch_offset, even when the batch starts
316 // before it (e.g. after an OFFSET_OUT_OF_RANGE reset or when
317 // a single large batch straddles log_start). Kafka's JVM
318 // client skips any records below the position; we do the same.
319 // Capture the position now — before `next_offset_after` updates
320 // it — so the filter baseline matches the actual fetch offset.
321 let fetch_floor = offsets.get(&key).copied().unwrap_or(0);
322 // read_committed filtering happens entirely client-side: the
323 // broker returns verbatim on-disk bytes (control batches,
324 // aborted records and all) plus an `aborted_transactions`
325 // list. We replay Kafka's algorithm — walk batches in offset
326 // order, tracking which producer_ids have an open aborted
327 // transaction, and drop transactional records from those.
328 let read_committed = self.isolation_level == IsolationLevel::ReadCommitted;
329 // Aborted txns sorted by first_offset; consumed front-to-back
330 // as batch offsets advance past each entry's start.
331 let mut aborted: std::collections::VecDeque<(i64, i64)> = if read_committed {
332 let mut v: Vec<(i64, i64)> = part
333 .aborted_transactions
334 .as_deref()
335 .unwrap_or(&[])
336 .iter()
337 .map(|a| (a.first_offset, a.producer_id))
338 .collect();
339 v.sort_unstable();
340 v.into()
341 } else {
342 std::collections::VecDeque::new()
343 };
344 // producer_ids with a currently-open aborted transaction.
345 let mut aborted_pids: std::collections::HashSet<i64> =
346 std::collections::HashSet::new();
347 for batch in batches {
348 // Move every aborted txn that starts at or before this
349 // batch into the active set.
350 if read_committed {
351 while let Some(&(first_offset, pid)) = aborted.front() {
352 if first_offset <= batch.base_offset {
353 aborted_pids.insert(pid);
354 aborted.pop_front();
355 } else {
356 break;
357 }
358 }
359 }
360 // Control batches (commit/abort markers) carry no user
361 // records. A control batch for a producer ends its aborted
362 // transaction; drop the batch either way.
363 if batch.attributes.is_control_batch() {
364 if read_committed {
365 aborted_pids.remove(&batch.producer_id);
366 }
367 continue;
368 }
369 // Drop transactional records belonging to an aborted txn.
370 if read_committed
371 && batch.attributes.is_transactional()
372 && aborted_pids.contains(&batch.producer_id)
373 {
374 continue;
375 }
376 for r in &batch.records {
377 let offset = batch.base_offset + i64::from(r.offset_delta);
378 // Skip records that precede the fetch floor: the broker
379 // returned a whole batch whose base_offset < our
380 // position (straddle case — see fetch_floor comment).
381 if offset < fetch_floor {
382 continue;
383 }
384 out.push(ConsumerRecord {
385 topic: topic_name.clone(),
386 partition: part.partition_index,
387 offset,
388 leader_epoch: batch.partition_leader_epoch,
389 timestamp: batch.base_timestamp + r.timestamp_delta,
390 key: r.key.clone(),
391 value: r.value.clone(),
392 });
393 }
394 }
395 if let Some(next) = next_offset_after(batches) {
396 offsets.insert(key.clone(), next);
397 // Advance the position's offset_epoch to the highest batch
398 // leader epoch consumed, so the next Fetch sends the correct
399 // last_fetched_epoch (KIP-320). Lock order holds: offsets is
400 // already locked, positions acquired second.
401 if let Some(last_epoch) = batches.iter().map(|b| b.partition_leader_epoch).max()
402 {
403 let mut positions = self.positions.lock().await;
404 positions.entry(key.clone()).or_default().offset_epoch = last_epoch;
405 }
406 }
407 }
408 }
409 // Drop the offsets guard before any `.await`: refreshing metadata is an
410 // RPC, and we must never hold a Mutex guard across an await point.
411 drop(offsets);
412 if refresh_after_processing {
413 // Best-effort: a NOT_LEADER_OR_FOLLOWER without a current_leader
414 // hint means our cached leader is stale; learn the new one so the
415 // next poll routes correctly. A failure is non-fatal — the next
416 // refresh_leader_epochs pass retries.
417 let _ = self.client.refresh_metadata().await;
418 }
419 Ok(out)
420 }
421}
422
423/// The offset to fetch next after consuming `batches`: one past the highest
424/// `base_offset + last_offset_delta` across all decoded batches. `None` when
425/// there are no batches (offset unchanged). Used so the consumer advances past
426/// control/aborted batches that emit no records, instead of re-fetching them.
427fn next_offset_after(batches: &[crabka_protocol::records::RecordBatch]) -> Option<i64> {
428 batches
429 .iter()
430 .map(|b| b.base_offset + i64::from(b.last_offset_delta) + 1)
431 .max()
432}
433
434impl Consumer {
435 /// Replace any `i64::MAX` sentinels in `next_offsets` (planted by
436 /// `auto_offset_reset = Latest` at build time) with the real log-end
437 /// offset from `ListOffsets(timestamp=-1)`.
438 async fn resolve_latest_sentinels(&self) -> Result<(), ConsumerError> {
439 let mut offsets = self.next_offsets.lock().await;
440 let sentinels: Vec<(String, i32)> = offsets
441 .iter()
442 .filter(|(_, v)| **v == i64::MAX)
443 .map(|(k, _)| k.clone())
444 .collect();
445 if sentinels.is_empty() {
446 return Ok(());
447 }
448 let mut by_topic: HashMap<String, Vec<i32>> = HashMap::new();
449 for (t, p) in &sentinels {
450 by_topic.entry(t.clone()).or_default().push(*p);
451 }
452 let topics: Vec<ListOffsetsTopic> = by_topic
453 .into_iter()
454 .map(|(name, partitions)| ListOffsetsTopic {
455 name,
456 partitions: partitions
457 .into_iter()
458 .map(|p| ListOffsetsPartition {
459 partition_index: p,
460 timestamp: -1, // LATEST
461 ..Default::default()
462 })
463 .collect(),
464 ..Default::default()
465 })
466 .collect();
467 let lo = self
468 .client
469 .send(ListOffsetsRequest {
470 replica_id: -1,
471 topics,
472 ..Default::default()
473 })
474 .await?;
475 for t in &lo.topics {
476 for p in &t.partitions {
477 offsets.insert((t.name.clone(), p.partition_index), p.offset);
478 }
479 }
480 Ok(())
481 }
482}
483
484impl Consumer {
485 /// Apply truncations detected by the proactive validate pass to
486 /// `next_offsets`, honoring `auto.offset.reset` (None → error on the first
487 /// truncated partition).
488 async fn apply_truncation(
489 &self,
490 truncated: &HashMap<(String, i32), i64>,
491 ) -> Result<(), ConsumerError> {
492 let mut offsets = self.next_offsets.lock().await;
493 for (key, safe_offset) in truncated {
494 if let AutoOffsetReset::None = self.auto_offset_reset {
495 let fetch_offset = offsets.get(key).copied().unwrap_or(-1);
496 return Err(ConsumerError::LogTruncation {
497 topic: key.0.clone(),
498 partition: key.1,
499 fetch_offset,
500 safe_offset: *safe_offset,
501 });
502 }
503 offsets.insert(key.clone(), *safe_offset);
504 }
505 Ok(())
506 }
507
508 /// In-band `diverging_epoch` handler used inside the poll loop while the
509 /// `next_offsets` guard is already held.
510 fn handle_truncation_in_poll(
511 &self,
512 offsets: &mut HashMap<(String, i32), i64>,
513 key: &(String, i32),
514 safe_offset: i64,
515 ) -> Result<(), ConsumerError> {
516 if let AutoOffsetReset::None = self.auto_offset_reset {
517 let fetch_offset = offsets.get(key).copied().unwrap_or(-1);
518 return Err(ConsumerError::LogTruncation {
519 topic: key.0.clone(),
520 partition: key.1,
521 fetch_offset,
522 safe_offset,
523 });
524 }
525 offsets.insert(key.clone(), safe_offset);
526 Ok(())
527 }
528}
529
#[cfg(test)]
mod offset_advance_tests {
    use assert2::assert;
    use crabka_protocol::records::{RecordBatch, RecordsPayload};

    use super::next_offset_after;

    /// The advance target comes from the batch's declared offset span, not
    /// from how many records survived in it.
    #[test]
    fn advance_target_uses_last_offset_delta_not_record_count() {
        // One batch covering offsets 10..=14 (last_offset_delta = 4) with no
        // records left in it: the next fetch offset must still be 15.
        let payload = RecordsPayload::V2(vec![RecordBatch {
            base_offset: 10,
            last_offset_delta: 4,
            records: Vec::new(),
            ..Default::default()
        }]);
        let advanced = next_offset_after(payload.as_v2().unwrap());
        assert!(advanced == Some(15));
    }

    /// With no batches at all there is nothing to advance past.
    #[test]
    fn advance_target_none_for_empty() {
        let payload = RecordsPayload::V2(Vec::new());
        let advanced = next_offset_after(payload.as_v2().unwrap());
        assert!(advanced == None);
    }
}
555}