1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
//! Per-leader-partition ISR maintenance. Compares each follower's
//! last-fetch time vs `replica_lag_time_max_ms` and proposes
//! `AlterPartition` shrink/expand to the controller leader.
#![allow(dead_code)]
use std::sync::Arc;
use std::time::{Duration, Instant};
use crabka_raft::NodeId;
use tokio_util::sync::CancellationToken;
use tracing::{debug, warn};
use crate::partition::Partition;
use crate::partition_registry::PartitionRegistry;
/// Inputs for the ISR maintenance task driven by [`run`].
pub(crate) struct Config {
/// This node's id; `run` only evaluates partitions whose
/// `current_leader` equals it.
pub node_id: NodeId,
/// Registry of local partitions; `run` snapshots it once per tick.
pub partitions: Arc<PartitionRegistry>,
/// Source of the controller-leader id and of the metadata image used to
/// address and populate `AlterPartition` requests.
pub controller: Arc<dyn crate::metadata_source::MetadataSource>,
/// Lag bound: ISR members whose last fetch is older than this are
/// proposed for removal, and out-of-ISR followers last caught up within
/// it are proposed for re-admission.
pub replica_lag_time_max: Duration,
/// Broker id stamped on outgoing `AlterPartition` requests and embedded
/// in the client id. NOTE(review): presumably the same identity as
/// `node_id`, in the protocol's `i32` form — confirm.
pub broker_id: i32,
/// `run` exits once it observes cancellation of this token.
pub shutdown: CancellationToken,
/// Bumped on each proposed shrink / expand.
pub metrics: crate::metrics::BrokerMetrics,
}
/// Runs the ISR maintenance loop until `cfg.shutdown` is cancelled.
///
/// Once per second, every partition this node currently leads is checked
/// with [`compute_proposal`]. When its ISR should change, the shrink and/or
/// expand counters are bumped and an `AlterPartition` proposal is sent to
/// the controller leader. Failures are logged only. The proposal is
/// recomputed from replica state every tick, so a change that is still
/// needed is proposed again on a later tick.
pub(crate) async fn run(cfg: Config) {
    let mut tick = tokio::time::interval(Duration::from_secs(1));
    // With the default `Burst` behaviour, a pass that overruns the period
    // (e.g. several slow controller round-trips) would be followed by
    // back-to-back catch-up ticks. Those extra passes would mostly
    // re-propose changes the controller has not reflected back yet, so
    // push the schedule out instead.
    tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

    // Reused across ticks to avoid re-allocating the snapshot Vec each second.
    // Holds cheap `Arc<Partition>` clones (no String allocation, no second
    // registry lookup). Cleared and refilled each tick.
    let mut snapshot: Vec<Arc<Partition>> = Vec::new();
    loop {
        tokio::select! {
            _ = tick.tick() => {},
            () = cfg.shutdown.cancelled() => return,
        }
        // Snapshot the partition values as cheap Arc clones in a single
        // iteration, then iterate the owned `Vec` so we never hold a shard
        // guard across a yield point.
        snapshot.clear();
        snapshot.extend(cfg.partitions.arcs());
        for part in snapshot.drain(..) {
            let leader = part
                .current_leader
                .load(std::sync::atomic::Ordering::Acquire);
            if leader != cfg.node_id {
                continue;
            }
            let Some(proposal) = compute_proposal(&part, cfg.replica_lag_time_max).await else {
                continue;
            };
            // Classify using the ISRs captured inside `compute_proposal`'s
            // single lock scope (no second `replica_state` lock, no TOCTOU
            // window). `compute_proposal` already filtered for "actually
            // changed", so at least one counter fires. An ISR is a handful
            // of replicas, so a linear `contains` beats building hash sets.
            if proposal
                .prev_isr
                .iter()
                .any(|n| !proposal.new_isr.contains(n))
            {
                cfg.metrics.isr_shrinks_total.inc();
            }
            if proposal
                .new_isr
                .iter()
                .any(|n| !proposal.prev_isr.contains(n))
            {
                cfg.metrics.isr_expands_total.inc();
            }
            // Race the controller round-trip against shutdown so a slow or
            // unresponsive controller cannot hold up cancellation.
            let result = tokio::select! {
                r = send_alter_partition(
                    &cfg.controller,
                    cfg.broker_id,
                    &part.topic,
                    part.partition_id,
                    proposal.new_isr,
                    proposal.leader_epoch,
                ) => r,
                () = cfg.shutdown.cancelled() => return,
            };
            if let Err(e) = result {
                warn!(topic = %part.topic, partition = part.partition_id, error = %e,
                    "AlterPartition propose failed");
            }
        }
    }
}
/// A computed ISR change proposal. All fields are captured within
/// `compute_proposal`'s single `replica_state` lock scope so the caller
/// can classify shrink/expand and submit the proposal without re-locking
/// (and without a TOCTOU window where the ISR shifts between locks).
struct Proposal {
/// The ISR as observed under the lock, sorted ascending. The caller
/// compares it with `new_isr` to classify the change as a shrink and/or
/// an expand for metrics.
prev_isr: Vec<NodeId>,
/// The proposed new ISR, sorted ascending. Guaranteed `!= prev_isr`
/// (`compute_proposal` returns `None` when nothing changed).
new_isr: Vec<NodeId>,
/// Leader epoch read from the replica state under the same lock; stamped
/// on the `AlterPartition` request.
leader_epoch: i32,
}
/// Decides whether `part`'s ISR should change and, if so, builds the
/// proposal. Returns `Some(Proposal)` if the ISR should change, else `None`.
///
/// * Shrink: an ISR member is dropped when follower stats exist for it and
///   its last fetch is older than `lag_max`. Members with no stats are kept.
/// * Expand: a follower with stats that is outside the current ISR is added
///   when it was last caught up within `lag_max`.
///
/// The current ISR, the new ISR and the leader epoch are all read under a
/// single `replica_state` lock acquisition. Both ISR vectors are sorted
/// ascending before being compared.
async fn compute_proposal(part: &Partition, lag_max: Duration) -> Option<Proposal> {
    let state = part.replica_state.lock().await;
    let now = Instant::now();
    let is_recent = |at: Instant| now.duration_since(at) <= lag_max;

    let mut prev_isr: Vec<NodeId> = state.isr.iter().copied().collect();
    prev_isr.sort_unstable();

    // Shrink pass: keep members that are fresh or have no follower stats.
    let mut new_isr: Vec<NodeId> = prev_isr
        .iter()
        .copied()
        .filter(|id| {
            state
                .per_follower
                .get(id)
                .is_none_or(|stats| is_recent(stats.last_fetch))
        })
        .collect();

    // Expand pass: candidates come from outside the current ISR, so they
    // cannot collide with a kept member. Follower-map keys are distinct, so
    // no candidate is added twice.
    new_isr.extend(
        state
            .per_follower
            .iter()
            .filter(|&(id, stats)| !state.isr.contains(id) && is_recent(stats.last_caught_up))
            .map(|(id, _)| *id),
    );
    new_isr.sort_unstable();

    if new_isr == prev_isr {
        return None;
    }
    Some(Proposal {
        prev_isr,
        new_isr,
        leader_epoch: state.current_leader_epoch,
    })
}
async fn send_alter_partition(
controller: &Arc<dyn crate::metadata_source::MetadataSource>,
broker_id: i32,
topic: &str,
partition: i32,
new_isr: Vec<NodeId>,
leader_epoch: i32,
) -> Result<(), String> {
use crabka_protocol::owned::alter_partition_request::{
AlterPartitionRequest, BrokerState, PartitionData, TopicData,
};
// Look up the controller leader's address via metadata image.
let leader_id = *controller.watch_leader().borrow();
let Some(leader_id) = leader_id else {
return Err("no controller leader".into());
};
let image = controller.current_image();
let Some(broker_rec) = image.broker(leader_id) else {
return Err("controller leader not in image".into());
};
let addr = format!("{}:{}", broker_rec.host, broker_rec.port);
// Look up topic_id from the metadata image and convert to the protocol Uuid type.
let topic_id = {
let raw: [u8; 16] = image
.topic(topic)
.map_or([0u8; 16], |t| *t.topic_id.as_bytes());
crabka_protocol::primitives::uuid::Uuid(raw)
};
// `new_isr` is the v2 field (versions 2 only on the wire).
// `new_isr_with_epochs` is the v3 field; the client negotiates MAX_VERSION
// (= 3), so we must populate both so that whichever version is selected
// carries the correct ISR. The handler side reads `new_isr_with_epochs`
// when `new_isr` is empty (i.e. version 3).
// KIP-903: per-member epochs come from the metadata image; unknown brokers fall back to -1.
let new_isr_i32: Vec<i32> = new_isr
.iter()
.map(|n| i32::try_from(*n).unwrap_or(i32::MAX))
.collect();
let new_isr_with_epochs: Vec<BrokerState> = new_isr_i32
.iter()
.map(|&bid| BrokerState {
broker_id: bid,
broker_epoch: image
.broker_epoch(u64::try_from(bid).unwrap_or(0))
.unwrap_or(-1),
..Default::default()
})
.collect();
let req = AlterPartitionRequest {
broker_id,
// KIP-903: the partition leader stamps its own broker epoch and each
// ISR member's epoch from the metadata image so the controller can
// fence stale replicas. Unknown brokers fall back to -1 (skip-check).
broker_epoch: image
.broker_epoch(u64::try_from(broker_id).unwrap_or(0))
.unwrap_or(-1),
topics: vec![TopicData {
topic_id,
partitions: vec![PartitionData {
partition_index: partition,
leader_epoch,
new_isr: new_isr_i32,
new_isr_with_epochs,
leader_recovery_state: 0,
partition_epoch: 0,
..Default::default()
}],
..Default::default()
}],
..Default::default()
};
let client = crabka_client_core::Client::builder()
.bootstrap(addr)
.client_id(format!("crabka-broker-{broker_id}-isr"))
.build()
.await
.map_err(|e| format!("connect: {e}"))?;
let resp = client.send(req).await.map_err(|e| format!("send: {e}"))?;
// Log the global error code and per-partition error codes so failures
// are visible (previously _resp was discarded, hiding non-zero codes).
let global_err = resp.error_code;
let part_err = resp
.topics
.first()
.and_then(|t| t.partitions.first())
.map_or(0, |p| p.error_code);
if global_err != 0 || part_err != 0 {
warn!(
topic = topic,
partition = partition,
new_isr_len = new_isr.len(),
global_error_code = global_err,
partition_error_code = part_err,
"AlterPartition rejected by controller"
);
return Err(format!(
"AlterPartition rejected: global={global_err} partition={part_err}"
));
}
debug!(
topic = topic,
partition = partition,
new_isr_len = new_isr.len(),
"AlterPartition proposed"
);
Ok(())
}