1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
//! KIP-320 consumer position validation. Two responsibilities:
//! 1. Refresh per-partition leader id + leader epoch from `Metadata`, flagging
//! a partition `awaiting_validation` when its leader epoch advances.
//! 2. For flagged partitions, issue `OffsetForLeaderEpoch` and decide (via
//! `position::classify`) whether to resume or reset for truncation.
use std::collections::HashMap;
use crate::consumer::Consumer;
use crate::error::ConsumerError;
use crate::position::{ValidationOutcome, classify};
impl Consumer {
/// Refresh leader id / leader epoch for every partition reported by
/// `Metadata`. A partition whose metadata leader epoch is greater than the
/// epoch we last consumed (`offset_epoch`) is flagged `awaiting_validation`.
pub(crate) async fn refresh_leader_epochs(&self) -> Result<(), ConsumerError> {
// `refresh_metadata` (not a bare `send(MetadataRequest)`) so the main
// client's BrokerPool learns each broker's (id → addr) mapping — this
// is what lets `poll`/`validate` route Fetch and OffsetForLeaderEpoch
// to the partition *leader* via `Client::broker(id)` instead of always
// hitting the bootstrap connection.
let md = self.client.refresh_metadata().await?;
let mut positions = self.positions.lock().await;
for t in &md.topics {
let Some(name) = &t.name else { continue };
for p in &t.partitions {
let key = (name.clone(), p.partition_index);
let entry = positions.entry(key).or_default();
entry.leader_id = p.leader_id;
if p.leader_epoch > entry.leader_epoch {
entry.leader_epoch = p.leader_epoch;
if p.leader_epoch > entry.offset_epoch && entry.offset_epoch >= 0 {
entry.awaiting_validation = true;
}
}
}
}
Ok(())
}
/// Validate every `awaiting_validation` partition via `OffsetForLeaderEpoch`.
/// Returns the set of partitions that truncated, mapped to the safe offset
/// the caller must reset `next_offsets` to. Clears the validation flag for
/// partitions confirmed consistent.
pub(crate) async fn validate_positions(
&self,
) -> Result<HashMap<(String, i32), i64>, ConsumerError> {
// Snapshot the work to do under the lock, then issue RPCs unlocked.
// Lock order: next_offsets first, then positions — matching the
// coordinator's established order, so we never deadlock against poll.
// (topic, partition, offset, offset_epoch, leader_epoch, leader_id).
let to_validate: Vec<(String, i32, i64, i32, i32, i32)> = {
let offsets = self.next_offsets.lock().await;
let mut positions = self.positions.lock().await;
// Defensive: clear awaiting_validation for any partition whose
// offset_epoch < 0 (never consumed). There is nothing to validate
// below an offset never consumed, and leaving the flag set would
// wedge the partition — validate_positions skips it but the fetch
// builder also skips it, causing a permanent stall.
for p in positions.values_mut() {
if p.awaiting_validation && p.offset_epoch < 0 {
p.awaiting_validation = false;
}
}
positions
.iter()
.filter(|(_, p)| p.awaiting_validation && p.offset_epoch >= 0)
.filter_map(|((t, part), p)| {
let off = *offsets.get(&(t.clone(), *part))?;
Some((
t.clone(),
*part,
off,
p.offset_epoch,
p.leader_epoch,
p.leader_id,
))
})
.collect()
};
let mut truncated: HashMap<(String, i32), i64> = HashMap::new();
for (topic, partition, offset, offset_epoch, leader_epoch, leader_id) in to_validate {
// RPC issued with no lock held. KIP-320 requires the
// OffsetForLeaderEpoch reach the partition *leader* — it is the
// only replica with the authoritative epoch→end-offset history.
// Route to the leader when its id is known and the pool has a
// dialable address for it (registry populated by
// `refresh_leader_epochs` → `refresh_metadata`); fall back to the
// bootstrap connection otherwise (e.g. single-broker test brokers
// advertising port 0, where bootstrap is the leader anyway).
let answer = if leader_id >= 0 && self.client.knows_broker(leader_id) {
self.client
.offset_for_leader_epoch_on(
leader_id,
&topic,
partition,
leader_epoch,
offset_epoch,
)
.await?
} else {
self.client
.offset_for_leader_epoch(&topic, partition, leader_epoch, offset_epoch)
.await?
};
// Re-check the partition is still assigned + epoch unchanged before
// applying — a rebalance may have moved it.
let mut positions = self.positions.lock().await;
let Some(pos) = positions.get_mut(&(topic.clone(), partition)) else {
continue;
};
if pos.leader_epoch != leader_epoch {
continue; // metadata moved under us; revalidate next poll
}
// CRITICAL: inspect the per-partition error_code BEFORE classify.
// On a non-zero code (FENCED_LEADER_EPOCH=74, UNKNOWN_LEADER_EPOCH=75,
// NOT_LEADER_OR_FOLLOWER=6, UNKNOWN_TOPIC_OR_PARTITION=3, ...) the
// broker returns end_offset = -1. Feeding that into `classify` would
// look like truncation and wrongly reset to 0. Instead leave the
// partition flagged for re-validation (and force a metadata refresh
// next poll) and skip it.
if answer.error_code != 0 {
// Reset leader_epoch so refresh_leader_epochs re-flags + the next
// pass re-issues this RPC against fresher metadata.
pos.leader_epoch = -1;
pos.awaiting_validation = true;
continue;
}
match classify(offset, offset_epoch, answer.leader_epoch, answer.end_offset) {
ValidationOutcome::Valid { leader_epoch: le } => {
pos.offset_epoch = le;
pos.awaiting_validation = false;
}
ValidationOutcome::Truncated { safe_offset } => {
pos.awaiting_validation = false;
truncated.insert((topic, partition), safe_offset);
}
}
}
Ok(truncated)
}
}