Skip to main content

crabka_client_consumer/
poll.rs

//! `Consumer::poll` — groups the assigned partitions by leader, issues one
//! `Fetch` per leader, advances next-offsets, and returns the decoded records.
3
4use std::collections::HashMap;
5use std::time::Duration;
6
7use crabka_protocol::owned::fetch_request::{FetchPartition, FetchRequest, FetchTopic};
8use crabka_protocol::owned::list_offsets_request::{
9    ListOffsetsPartition, ListOffsetsRequest, ListOffsetsTopic,
10};
11
12use crate::builder::{AutoOffsetReset, IsolationLevel};
13use crate::consumer::{Consumer, ConsumerRecord};
14use crate::error::ConsumerError;
15
/// Pseudo broker id meaning "no usable leader known — go through the
/// bootstrap connection". It mirrors `BrokerPool`'s bootstrap slot, so a fetch
/// planned under this id is sent with `Client::send` instead of
/// `Client::broker(id)`.
const BOOTSTRAP_LEADER: i32 = -1;

/// Request fields for one fetchable partition, in wire order:
/// partition index, fetch offset, current leader epoch, last fetched epoch.
type FetchSpec = (i32, i64, i32, i32);

/// Fetch plan keyed by leader id, then by topic name, listing the partitions
/// to request from that leader.
type FetchByLeader = HashMap<i32, HashMap<String, Vec<FetchSpec>>>;
27
28impl Consumer {
29    /// Returns the records from every v2 batch the broker returned per
30    /// assigned partition, or an empty vec on timeout. Under
31    /// `read_committed` isolation, control batches and records belonging to
32    /// aborted transactions are filtered client-side using the response's
33    /// `aborted_transactions` list (the broker returns verbatim bytes).
34    /// Rebalances are handled transparently by the internal coordinator
35    /// task, which mutates the live `assigned` snapshot in place; `poll()`
36    /// simply reads it on each call.
37    #[allow(clippy::too_many_lines)]
38    pub async fn poll(&mut self, timeout: Duration) -> Result<Vec<ConsumerRecord>, ConsumerError> {
39        // 1. Resolve any i64::MAX sentinels (auto.offset.reset=Latest) via
40        //    ListOffsets(timestamp=-1).
41        self.resolve_latest_sentinels().await?;
42
43        // KIP-320: refresh leader epochs and proactively validate any position
44        // whose leader epoch advanced, before fetching. Truncated partitions
45        // are reset here (or surfaced for auto.offset.reset=None below).
46        self.refresh_leader_epochs().await?;
47        let truncated = self.validate_positions().await?;
48        if !truncated.is_empty() {
49            self.apply_truncation(&truncated).await?;
50        }
51
52        // 2. Build a FetchRequest covering every assigned partition.
53        let assigned = self.assigned.lock().await.clone();
54        if assigned.is_empty() {
55            tokio::time::sleep(timeout).await;
56            return Ok(Vec::new());
57        }
58
59        // Group the fetchable partitions by their leader id so each FetchRequest
60        // reaches the broker that actually hosts the partition. On a
61        // multi-broker cluster the bootstrap connection is rarely the leader of
62        // every partition, and a Fetch sent to a non-leader returns
63        // NOT_LEADER_OR_FOLLOWER instead of records. The per-partition leader
64        // lives in the `positions` sidecar (populated by `refresh_leader_epochs`
65        // from Metadata, whose `refresh_metadata` also teaches the pool each
66        // broker's address so `Client::broker(id)` can connect).
67        //
68        // A partition whose leader is unknown (or whose advertised address is
69        // unusable) falls back to the bootstrap connection (synthetic id `-1`)
70        // for this round. The `refresh_leader_epochs` pass at the top of every
71        // poll already re-pulls Metadata, so the next poll re-targets it once
72        // the leader is learnable — no extra refresh needed here.
73        //
74        // (partition, fetch_offset, current_leader_epoch, last_fetched_epoch).
75        // Lock order: next_offsets first, then positions (matching the
76        // coordinator's order so poll can never deadlock against a rebalance).
77        // Both guards are dropped before any per-leader Fetch is issued — the
78        // sends are await points and we must never hold a Mutex guard across an
79        // `.await`.
80        let mut by_leader: FetchByLeader = HashMap::new();
81        {
82            let offsets = self.next_offsets.lock().await;
83            let positions = self.positions.lock().await;
84            for (t, p) in &assigned {
85                // Skip partitions still awaiting validation — they must not be
86                // fetched until proven consistent.
87                if positions
88                    .get(&(t.clone(), *p))
89                    .is_some_and(|x| x.awaiting_validation)
90                {
91                    continue;
92                }
93                let next = offsets.get(&(t.clone(), *p)).copied().unwrap_or(0);
94                let pos = positions.get(&(t.clone(), *p)).copied().unwrap_or_default();
95                // Route to the leader when its id is known AND the pool has a
96                // dialable address for it; otherwise fall back to the bootstrap
97                // connection. `knows_broker` is a synchronous registry lookup
98                // (no await), so it's safe to call while the offsets/positions
99                // guards are held. A leader whose advertised address is unusable
100                // (e.g. port 0 from an in-process test broker) is treated as
101                // unknown — the bootstrap broker is the leader in that
102                // single-broker case anyway.
103                let leader = if pos.leader_id >= 0 && self.client.knows_broker(pos.leader_id) {
104                    pos.leader_id
105                } else {
106                    BOOTSTRAP_LEADER
107                };
108                by_leader
109                    .entry(leader)
110                    .or_default()
111                    .entry(t.clone())
112                    .or_default()
113                    .push((*p, next, pos.leader_epoch, pos.offset_epoch));
114            }
115        }
116
117        let topic_ids = self.topic_ids.lock().await.clone();
118        let timeout_ms = i32::try_from(timeout.as_millis()).unwrap_or(i32::MAX);
119
120        // Issue one Fetch per leader. All guards are released; we collect every
121        // response before re-locking to process them. Sent sequentially so a
122        // single parked leader can't starve the others' deadlines beyond the
123        // per-request timeout (and to keep the borrow on `self.client` simple).
124        let mut responses = Vec::with_capacity(by_leader.len());
125        for (leader, by_topic) in by_leader {
126            let topics: Vec<FetchTopic> = by_topic
127                .into_iter()
128                .map(|(name, plist)| {
129                    let topic_id = topic_ids.get(&name).copied().unwrap_or_default();
130                    FetchTopic {
131                        topic: name,
132                        topic_id,
133                        partitions: plist
134                            .into_iter()
135                            .map(
136                                |(p, off, leader_epoch, last_fetched_epoch)| FetchPartition {
137                                    partition: p,
138                                    fetch_offset: off,
139                                    current_leader_epoch: leader_epoch,
140                                    last_fetched_epoch,
141                                    partition_max_bytes: 1 << 20,
142                                    ..Default::default()
143                                },
144                            )
145                            .collect(),
146                        ..Default::default()
147                    }
148                })
149                .collect();
150            let req = FetchRequest {
151                max_wait_ms: timeout_ms,
152                min_bytes: 1,
153                max_bytes: 50 * 1024 * 1024,
154                isolation_level: self.isolation_level.wire(),
155                topics,
156                ..Default::default()
157            };
158            let resp = if leader == BOOTSTRAP_LEADER {
159                self.client.send(req).await?
160            } else {
161                self.client.broker(leader).send(req).await?
162            };
163            responses.push(resp);
164        }
165
166        // 3. Decode each partition's RecordBatches, advance next-offsets.
167        //
168        // The wire-level `records` field can carry multiple concatenated
169        // RecordBatches; we iterate every v2 batch, emit one ConsumerRecord
170        // per Record, and bump next_offsets to the highest seen offset + 1.
171        // Reverse-map topic_id → name. At Fetch v ≥ 13 the response carries
172        // only `topic_id`; `topic.topic` is empty.
173        let id_to_name: HashMap<_, _> = topic_ids
174            .iter()
175            .map(|(name, id)| (*id, name.clone()))
176            .collect();
177
178        // Re-snapshot the assignment: a cooperative rebalance may have
179        // revoked partitions while this Fetch was in flight. Records for
180        // partitions we no longer own must be dropped — the new owner will
181        // serve them from the offset we committed at revoke time. Snapshot
182        // before locking `next_offsets` to keep the coordinator's
183        // assigned→next_offsets lock order (avoids deadlock).
184        let still_owned: std::collections::HashSet<(String, i32)> =
185            self.assigned.lock().await.iter().cloned().collect();
186
187        let mut out: Vec<ConsumerRecord> = Vec::new();
188        // Set when a NOT_LEADER_OR_FOLLOWER response carried no current_leader
189        // hint: we refresh metadata after the processing loop (we can't `.await`
190        // while the `offsets`/`positions` guards are held) so the next poll
191        // re-targets the new leader.
192        let mut refresh_after_processing = false;
193        let mut offsets = self.next_offsets.lock().await;
194        // Process every per-leader response with the identical per-partition
195        // logic (error-first, offset advance, fetch_floor, read_committed). The
196        // partition key is unique across leaders, so the order responses are
197        // drained in doesn't matter.
198        for topic in responses.iter().flat_map(|resp| &resp.responses) {
199            let topic_name = if topic.topic.is_empty() {
200                id_to_name.get(&topic.topic_id).cloned().unwrap_or_default()
201            } else {
202                topic.topic.clone()
203            };
204            for part in &topic.partitions {
205                // Drop records for partitions revoked while this Fetch was
206                // in flight (cooperative rebalance transparency).
207                if !still_owned.contains(&(topic_name.clone(), part.partition_index)) {
208                    continue;
209                }
210
211                let key = (topic_name.clone(), part.partition_index);
212
213                // KIP-320 in-band truncation: leader served no records and told
214                // us where to truncate (diverging_epoch.end_offset >= 0).
215                if part.diverging_epoch.end_offset >= 0 {
216                    self.handle_truncation_in_poll(
217                        &mut offsets,
218                        &key,
219                        part.diverging_epoch.end_offset,
220                    )?;
221                    continue;
222                }
223                // Error-first: inspect the partition error_code before decoding.
224                match part.error_code {
225                    0 => {}
226                    1 /* OFFSET_OUT_OF_RANGE */ => {
227                        // Reset per policy using the response's log_start_offset
228                        // (the broker includes it in every OOR partition response).
229                        // We must NOT use a hardcoded 0: if retention has moved
230                        // log_start forward, re-fetching from 0 re-triggers OOR
231                        // forever. Mirrors what the replicator does on OOR.
232                        // No RPC needed — log_start_offset is already in `part`.
233                        let fetch_offset = offsets.get(&key).copied().unwrap_or(-1);
234                        let log_start = part.log_start_offset;
235                        let (topic, partition) = (key.0.clone(), key.1);
236                        match self.auto_offset_reset {
237                            AutoOffsetReset::Earliest => {
238                                // Reset to the real log start, not 0.
239                                offsets.insert(key.clone(), log_start);
240                            }
241                            AutoOffsetReset::Latest => {
242                                // Plant i64::MAX sentinel; resolved next poll
243                                // by resolve_latest_sentinels via ListOffsets.
244                                offsets.insert(key.clone(), i64::MAX);
245                            }
246                            AutoOffsetReset::None => {
247                                return Err(ConsumerError::LogTruncation {
248                                    topic,
249                                    partition,
250                                    fetch_offset,
251                                    safe_offset: log_start,
252                                });
253                            }
254                        }
255                        continue;
256                    }
257                    6 /* NOT_LEADER_OR_FOLLOWER */ => {
258                        // A routing miss, NOT a truncation: we sent the Fetch to
259                        // a broker that no longer leads this partition (e.g. a
260                        // leadership change since the last metadata refresh).
261                        // Re-target the leader so the next poll routes correctly;
262                        // do NOT set awaiting_validation (nothing diverged).
263                        let mut positions = self.positions.lock().await;
264                        if part.current_leader.leader_id >= 0 {
265                            // The broker handed us the new leader inline (KIP-320
266                            // current_leader hint). Adopt it immediately.
267                            let p = positions.entry(key.clone()).or_default();
268                            p.leader_id = part.current_leader.leader_id;
269                            p.leader_epoch = part.current_leader.leader_epoch;
270                        } else {
271                            // No hint: force a metadata refresh after this loop
272                            // so the next poll learns the new leader. Reset the
273                            // stale leader id so the bootstrap fallback (and a
274                            // re-flag, if metadata advances the epoch) kicks in.
275                            if let Some(p) = positions.get_mut(&key) {
276                                p.leader_id = -1;
277                            }
278                            drop(positions);
279                            refresh_after_processing = true;
280                        }
281                        continue;
282                    }
283                    74 /* FENCED_LEADER_EPOCH */
284                    | 75 /* UNKNOWN_LEADER_EPOCH */ => {
285                        let mut positions = self.positions.lock().await;
286                        if let Some(p) = positions.get_mut(&key) {
287                            // Force refresh_leader_epochs to re-flag against
288                            // fresher metadata next poll (any real epoch >= 0 > -1).
289                            p.leader_epoch = -1;
290                            // Only gate on validation when we have a consumed epoch
291                            // to validate against. A never-consumed partition
292                            // (offset_epoch < 0) has nothing to validate; flagging it
293                            // would wedge it — validate_positions skips offset_epoch
294                            // < 0, and the fetch builder skips awaiting_validation.
295                            if p.offset_epoch >= 0 {
296                                p.awaiting_validation = true;
297                            }
298                        }
299                        continue;
300                    }
301                    other => {
302                        return Err(ConsumerError::Server(other));
303                    }
304                }
305
306                let Some(payload) = &part.records else {
307                    continue;
308                };
309                // Legacy MessageSet payloads are skipped here; the consumer
310                // only handles v2 batches.
311                let Some(batches) = payload.as_v2() else {
312                    continue;
313                };
314                // The broker returns whole record batches whose last offset is
315                // >= the requested fetch_offset, even when the batch starts
316                // before it (e.g. after an OFFSET_OUT_OF_RANGE reset or when
317                // a single large batch straddles log_start). Kafka's JVM
318                // client skips any records below the position; we do the same.
319                // Capture the position now — before `next_offset_after` updates
320                // it — so the filter baseline matches the actual fetch offset.
321                let fetch_floor = offsets.get(&key).copied().unwrap_or(0);
322                // read_committed filtering happens entirely client-side: the
323                // broker returns verbatim on-disk bytes (control batches,
324                // aborted records and all) plus an `aborted_transactions`
325                // list. We replay Kafka's algorithm — walk batches in offset
326                // order, tracking which producer_ids have an open aborted
327                // transaction, and drop transactional records from those.
328                let read_committed = self.isolation_level == IsolationLevel::ReadCommitted;
329                // Aborted txns sorted by first_offset; consumed front-to-back
330                // as batch offsets advance past each entry's start.
331                let mut aborted: std::collections::VecDeque<(i64, i64)> = if read_committed {
332                    let mut v: Vec<(i64, i64)> = part
333                        .aborted_transactions
334                        .as_deref()
335                        .unwrap_or(&[])
336                        .iter()
337                        .map(|a| (a.first_offset, a.producer_id))
338                        .collect();
339                    v.sort_unstable();
340                    v.into()
341                } else {
342                    std::collections::VecDeque::new()
343                };
344                // producer_ids with a currently-open aborted transaction.
345                let mut aborted_pids: std::collections::HashSet<i64> =
346                    std::collections::HashSet::new();
347                for batch in batches {
348                    // Move every aborted txn that starts at or before this
349                    // batch into the active set.
350                    if read_committed {
351                        while let Some(&(first_offset, pid)) = aborted.front() {
352                            if first_offset <= batch.base_offset {
353                                aborted_pids.insert(pid);
354                                aborted.pop_front();
355                            } else {
356                                break;
357                            }
358                        }
359                    }
360                    // Control batches (commit/abort markers) carry no user
361                    // records. A control batch for a producer ends its aborted
362                    // transaction; drop the batch either way.
363                    if batch.attributes.is_control_batch() {
364                        if read_committed {
365                            aborted_pids.remove(&batch.producer_id);
366                        }
367                        continue;
368                    }
369                    // Drop transactional records belonging to an aborted txn.
370                    if read_committed
371                        && batch.attributes.is_transactional()
372                        && aborted_pids.contains(&batch.producer_id)
373                    {
374                        continue;
375                    }
376                    for r in &batch.records {
377                        let offset = batch.base_offset + i64::from(r.offset_delta);
378                        // Skip records that precede the fetch floor: the broker
379                        // returned a whole batch whose base_offset < our
380                        // position (straddle case — see fetch_floor comment).
381                        if offset < fetch_floor {
382                            continue;
383                        }
384                        out.push(ConsumerRecord {
385                            topic: topic_name.clone(),
386                            partition: part.partition_index,
387                            offset,
388                            leader_epoch: batch.partition_leader_epoch,
389                            timestamp: batch.base_timestamp + r.timestamp_delta,
390                            key: r.key.clone(),
391                            value: r.value.clone(),
392                        });
393                    }
394                }
395                if let Some(next) = next_offset_after(batches) {
396                    offsets.insert(key.clone(), next);
397                    // Advance the position's offset_epoch to the highest batch
398                    // leader epoch consumed, so the next Fetch sends the correct
399                    // last_fetched_epoch (KIP-320). Lock order holds: offsets is
400                    // already locked, positions acquired second.
401                    if let Some(last_epoch) = batches.iter().map(|b| b.partition_leader_epoch).max()
402                    {
403                        let mut positions = self.positions.lock().await;
404                        positions.entry(key.clone()).or_default().offset_epoch = last_epoch;
405                    }
406                }
407            }
408        }
409        // Drop the offsets guard before any `.await`: refreshing metadata is an
410        // RPC, and we must never hold a Mutex guard across an await point.
411        drop(offsets);
412        if refresh_after_processing {
413            // Best-effort: a NOT_LEADER_OR_FOLLOWER without a current_leader
414            // hint means our cached leader is stale; learn the new one so the
415            // next poll routes correctly. A failure is non-fatal — the next
416            // refresh_leader_epochs pass retries.
417            let _ = self.client.refresh_metadata().await;
418        }
419        Ok(out)
420    }
421}
422
423/// The offset to fetch next after consuming `batches`: one past the highest
424/// `base_offset + last_offset_delta` across all decoded batches. `None` when
425/// there are no batches (offset unchanged). Used so the consumer advances past
426/// control/aborted batches that emit no records, instead of re-fetching them.
427fn next_offset_after(batches: &[crabka_protocol::records::RecordBatch]) -> Option<i64> {
428    batches
429        .iter()
430        .map(|b| b.base_offset + i64::from(b.last_offset_delta) + 1)
431        .max()
432}
433
434impl Consumer {
435    /// Replace any `i64::MAX` sentinels in `next_offsets` (planted by
436    /// `auto_offset_reset = Latest` at build time) with the real log-end
437    /// offset from `ListOffsets(timestamp=-1)`.
438    async fn resolve_latest_sentinels(&self) -> Result<(), ConsumerError> {
439        let mut offsets = self.next_offsets.lock().await;
440        let sentinels: Vec<(String, i32)> = offsets
441            .iter()
442            .filter(|(_, v)| **v == i64::MAX)
443            .map(|(k, _)| k.clone())
444            .collect();
445        if sentinels.is_empty() {
446            return Ok(());
447        }
448        let mut by_topic: HashMap<String, Vec<i32>> = HashMap::new();
449        for (t, p) in &sentinels {
450            by_topic.entry(t.clone()).or_default().push(*p);
451        }
452        let topics: Vec<ListOffsetsTopic> = by_topic
453            .into_iter()
454            .map(|(name, partitions)| ListOffsetsTopic {
455                name,
456                partitions: partitions
457                    .into_iter()
458                    .map(|p| ListOffsetsPartition {
459                        partition_index: p,
460                        timestamp: -1, // LATEST
461                        ..Default::default()
462                    })
463                    .collect(),
464                ..Default::default()
465            })
466            .collect();
467        let lo = self
468            .client
469            .send(ListOffsetsRequest {
470                replica_id: -1,
471                topics,
472                ..Default::default()
473            })
474            .await?;
475        for t in &lo.topics {
476            for p in &t.partitions {
477                offsets.insert((t.name.clone(), p.partition_index), p.offset);
478            }
479        }
480        Ok(())
481    }
482}
483
484impl Consumer {
485    /// Apply truncations detected by the proactive validate pass to
486    /// `next_offsets`, honoring `auto.offset.reset` (None → error on the first
487    /// truncated partition).
488    async fn apply_truncation(
489        &self,
490        truncated: &HashMap<(String, i32), i64>,
491    ) -> Result<(), ConsumerError> {
492        let mut offsets = self.next_offsets.lock().await;
493        for (key, safe_offset) in truncated {
494            if let AutoOffsetReset::None = self.auto_offset_reset {
495                let fetch_offset = offsets.get(key).copied().unwrap_or(-1);
496                return Err(ConsumerError::LogTruncation {
497                    topic: key.0.clone(),
498                    partition: key.1,
499                    fetch_offset,
500                    safe_offset: *safe_offset,
501                });
502            }
503            offsets.insert(key.clone(), *safe_offset);
504        }
505        Ok(())
506    }
507
508    /// In-band `diverging_epoch` handler used inside the poll loop while the
509    /// `next_offsets` guard is already held.
510    fn handle_truncation_in_poll(
511        &self,
512        offsets: &mut HashMap<(String, i32), i64>,
513        key: &(String, i32),
514        safe_offset: i64,
515    ) -> Result<(), ConsumerError> {
516        if let AutoOffsetReset::None = self.auto_offset_reset {
517            let fetch_offset = offsets.get(key).copied().unwrap_or(-1);
518            return Err(ConsumerError::LogTruncation {
519                topic: key.0.clone(),
520                partition: key.1,
521                fetch_offset,
522                safe_offset,
523            });
524        }
525        offsets.insert(key.clone(), safe_offset);
526        Ok(())
527    }
528}
529
#[cfg(test)]
mod offset_advance_tests {
    use assert2::assert;
    use crabka_protocol::records::{RecordBatch, RecordsPayload};

    #[test]
    fn advance_target_uses_last_offset_delta_not_record_count() {
        // A batch spanning offsets 10..=14 (last_offset_delta = 4) but carrying
        // zero surviving records must still advance the fetch offset to 15.
        let batch = RecordBatch {
            base_offset: 10,
            last_offset_delta: 4,
            records: vec![],
            ..Default::default()
        };
        let payload = RecordsPayload::V2(vec![batch]);
        let batches = payload.as_v2().unwrap();
        assert!(super::next_offset_after(batches) == Some(15));
    }

    #[test]
    fn advance_target_is_max_across_batches() {
        // The advance target is the furthest end over ALL batches, not the
        // last batch's end: 20..=22 ends at 23 even though 10..=10 comes
        // after it in the slice.
        let batches = [
            RecordBatch {
                base_offset: 20,
                last_offset_delta: 2,
                ..Default::default()
            },
            RecordBatch {
                base_offset: 10,
                last_offset_delta: 0,
                ..Default::default()
            },
        ];
        assert!(super::next_offset_after(&batches) == Some(23));
    }

    #[test]
    fn advance_target_none_for_empty() {
        let payload = RecordsPayload::V2(vec![]);
        assert!(super::next_offset_after(payload.as_v2().unwrap()) == None);
    }
}
555}