1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
//! KIP-858: report which log-directory UUID hosts each local replica by
//! sending `AssignReplicasToDirs` (api key 73) to the controller leader.
//!
//! Two entry points are exported:
//!
//! - [`build_request`] — pure grouping builder, unit-testable with no
//! network.
//! - [`send_assignments`] — async sender that mirrors the pattern used by
//! `isr_maintenance::send_alter_partition`.
use std::collections::BTreeMap;
use std::sync::Arc;
use crabka_protocol::owned::assign_replicas_to_dirs_request::{
AssignReplicasToDirsRequest, DirectoryData, PartitionData, TopicData,
};
/// Build the nested `AssignReplicasToDirs` request from a flat list of
/// `(topic_id, partition, dir_uuid)` assignments.
///
/// The wire shape is `directories[]` → `topics[]` → `partitions[]`. Both
/// grouping levels use a `BTreeMap` keyed by the 16 raw UUID bytes, and every
/// partition list is sorted, so the same set of assignments always produces
/// the same request no matter what order it was supplied in. That stability
/// is what the unit tests rely on.
///
/// `broker_epoch` is always `-1` (unknown), matching the convention used by
/// `send_alter_partition`.
pub(crate) fn build_request(
    broker_id: i32,
    assignments: &[(uuid::Uuid, i32, uuid::Uuid)], // (topic_id, partition, dir_uuid)
) -> AssignReplicasToDirsRequest {
    type UuidBytes = [u8; 16];

    // Pass 1: bucket partition indices as dir_uuid → topic_id → [partition].
    let mut grouped: BTreeMap<UuidBytes, BTreeMap<UuidBytes, Vec<i32>>> = BTreeMap::new();
    for &(topic_id, partition, dir_uuid) in assignments {
        let per_topic = grouped.entry(*dir_uuid.as_bytes()).or_default();
        per_topic
            .entry(*topic_id.as_bytes())
            .or_default()
            .push(partition);
    }

    // Pass 2: walk the buckets in key order and emit the wire structs.
    let mut directories: Vec<DirectoryData> = Vec::with_capacity(grouped.len());
    for (dir_bytes, per_topic) in grouped {
        let mut topics: Vec<TopicData> = Vec::with_capacity(per_topic.len());
        for (topic_bytes, mut indices) in per_topic {
            // Input order is arbitrary; sort so the output is canonical.
            indices.sort_unstable();
            let partitions = indices
                .into_iter()
                .map(|partition_index| PartitionData {
                    partition_index,
                    ..Default::default()
                })
                .collect();
            topics.push(TopicData {
                topic_id: crabka_protocol::primitives::uuid::Uuid(topic_bytes),
                partitions,
                ..Default::default()
            });
        }
        directories.push(DirectoryData {
            id: crabka_protocol::primitives::uuid::Uuid(dir_bytes),
            topics,
            ..Default::default()
        });
    }

    AssignReplicasToDirsRequest {
        broker_id,
        broker_epoch: -1,
        directories,
        ..Default::default()
    }
}
/// Deliver an `AssignReplicasToDirs` report to the current controller leader.
///
/// The leader id is read from `controller.watch_leader()`, its host/port are
/// looked up in `controller.current_image()`, and a fresh
/// `crabka_client_core::Client` bootstrapped at that address sends `req`.
/// Only the top-level `error_code` of the response is inspected.
///
/// # Errors
///
/// Returns a descriptive `String` when:
/// - no controller leader is currently known
/// - the leader's broker record is absent from the image
/// - the client cannot be built / connected
/// - sending the request or receiving the response fails
/// - the response carries a non-zero `error_code`
pub(crate) async fn send_assignments(
    controller: &Arc<dyn crate::metadata_source::MetadataSource>,
    client_id: &str,
    req: AssignReplicasToDirsRequest,
) -> Result<(), String> {
    // Locate the controller leader — same pattern as
    // `isr_maintenance::send_alter_partition`. The watched value is copied
    // out in its own statement so the borrow ends before anything else runs.
    let current_leader = *controller.watch_leader().borrow();
    let leader_id = current_leader.ok_or_else(|| "no controller leader".to_owned())?;

    let image = controller.current_image();
    let leader = image
        .broker(leader_id)
        .ok_or_else(|| "controller leader not in image".to_owned())?;
    let addr = format!("{}:{}", leader.host, leader.port);

    let connected = crabka_client_core::Client::builder()
        .bootstrap(addr)
        .client_id(client_id.to_owned())
        .build()
        .await;
    let client = match connected {
        Ok(client) => client,
        Err(e) => return Err(format!("connect: {e}")),
    };

    let resp = match client.send(req).await {
        Ok(resp) => resp,
        Err(e) => return Err(format!("send: {e}")),
    };

    if resp.error_code == 0 {
        return Ok(());
    }
    Err(format!(
        "AssignReplicasToDirs rejected by controller: error_code={}",
        resp.error_code
    ))
}
#[cfg(test)]
mod tests {
    use super::*;
    use assert2::assert;
    use uuid::Uuid;

    /// Raw-byte ids of every directory in `req`, in wire order.
    fn dir_ids(req: &AssignReplicasToDirsRequest) -> Vec<[u8; 16]> {
        req.directories.iter().map(|d| d.id.0).collect()
    }

    /// Raw-byte ids of every topic under `dir`, in wire order.
    fn topic_ids(dir: &DirectoryData) -> Vec<[u8; 16]> {
        dir.topics.iter().map(|t| t.topic_id.0).collect()
    }

    /// Partition indices under `topic`, in wire order (deliberately NOT
    /// re-sorted, so the tests observe the order `build_request` emits).
    fn partition_indices(topic: &TopicData) -> Vec<i32> {
        topic.partitions.iter().map(|p| p.partition_index).collect()
    }

    #[test]
    fn build_request_groups_correctly() {
        // Assignments:
        // (tA, 0, dX) → dir dX, topic tA, partition 0
        // (tA, 1, dX) → dir dX, topic tA, partition 1
        // (tB, 0, dY) → dir dY, topic tB, partition 0
        let ta = Uuid::from_u128(0xAAAA);
        let tb = Uuid::from_u128(0xBBBB);
        let dx = Uuid::from_u128(0xDDDD);
        let dy = Uuid::from_u128(0xEEEE);
        let assignments = [(ta, 0i32, dx), (ta, 1i32, dx), (tb, 0i32, dy)];
        let req = build_request(7, &assignments);

        // broker_id and epoch
        assert!(req.broker_id == 7);
        assert!(req.broker_epoch == -1);

        // Two directories
        assert!(req.directories.len() == 2);

        // Find dir dX and dY by their UUID bytes.
        let dir_x = req
            .directories
            .iter()
            .find(|d| d.id.0 == *dx.as_bytes())
            .expect("dir dX missing");
        let dir_y = req
            .directories
            .iter()
            .find(|d| d.id.0 == *dy.as_bytes())
            .expect("dir dY missing");

        // dX should have exactly one topic (tA) with two partitions [0, 1].
        assert!(dir_x.topics.len() == 1);
        let topic_a = dir_x
            .topics
            .iter()
            .find(|t| t.topic_id.0 == *ta.as_bytes())
            .expect("topic tA in dX missing");
        assert!(partition_indices(topic_a) == vec![0, 1]);

        // dY should have exactly one topic (tB) with one partition [0].
        assert!(dir_y.topics.len() == 1);
        let topic_b = dir_y
            .topics
            .iter()
            .find(|t| t.topic_id.0 == *tb.as_bytes())
            .expect("topic tB in dY missing");
        assert!(topic_b.partitions.len() == 1);
        assert!(topic_b.partitions[0].partition_index == 0);
    }

    #[test]
    fn build_request_output_order_is_canonical() {
        // Feed the assignments in a scrambled order; the request must still
        // come out ordered by directory bytes, then topic bytes, then
        // ascending partition index.
        let ta = Uuid::from_u128(0xAAAA);
        let tb = Uuid::from_u128(0xBBBB);
        let dx = Uuid::from_u128(0xDDDD);
        let dy = Uuid::from_u128(0xEEEE);
        let scrambled = [
            (tb, 2i32, dy),
            (ta, 1i32, dx),
            (tb, 0i32, dx),
            (ta, 0i32, dx),
        ];
        let req = build_request(3, &scrambled);

        assert!(dir_ids(&req) == vec![*dx.as_bytes(), *dy.as_bytes()]);

        let dir_x = &req.directories[0];
        assert!(topic_ids(dir_x) == vec![*ta.as_bytes(), *tb.as_bytes()]);
        assert!(partition_indices(&dir_x.topics[0]) == vec![0, 1]);
        assert!(partition_indices(&dir_x.topics[1]) == vec![0]);

        let dir_y = &req.directories[1];
        assert!(topic_ids(dir_y) == vec![*tb.as_bytes()]);
        assert!(partition_indices(&dir_y.topics[0]) == vec![2]);
    }

    #[test]
    fn build_request_empty_assignments() {
        let req = build_request(1, &[]);
        assert!(req.broker_id == 1);
        assert!(req.broker_epoch == -1);
        assert!(req.directories.is_empty());
    }
}