1use crate::node::NodeId;
4use serde::{Deserialize, Serialize};
5use std::collections::HashSet;
6
7#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
9pub struct PartitionId {
10 pub topic: String,
11 pub partition: u32,
12}
13
14impl PartitionId {
15 pub fn new(topic: impl Into<String>, partition: u32) -> Self {
16 Self {
17 topic: topic.into(),
18 partition,
19 }
20 }
21}
22
23impl std::fmt::Display for PartitionId {
24 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
25 write!(f, "{}/{}", self.topic, self.partition)
26 }
27}
28
29#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
31#[serde(rename_all = "lowercase")]
32pub enum ReplicaState {
33 InSync,
35 CatchingUp,
37 Offline,
39 Adding,
41 Removing,
43}
44
45#[derive(Debug, Clone, Serialize, Deserialize)]
47pub struct ReplicaInfo {
48 pub node_id: NodeId,
50 pub state: ReplicaState,
52 pub log_end_offset: u64,
54 pub high_watermark: u64,
56 pub lag: u64,
58}
59
60impl ReplicaInfo {
61 pub fn new(node_id: NodeId) -> Self {
62 Self {
63 node_id,
64 state: ReplicaState::Adding,
65 log_end_offset: 0,
66 high_watermark: 0,
67 lag: 0,
68 }
69 }
70
71 pub fn is_in_sync(&self) -> bool {
73 matches!(self.state, ReplicaState::InSync)
74 }
75}
76
77#[derive(Debug, Clone, Serialize, Deserialize)]
79pub struct PartitionState {
80 pub id: PartitionId,
82
83 pub leader: Option<NodeId>,
85
86 pub preferred_leader: NodeId,
88
89 pub replicas: Vec<ReplicaInfo>,
91
92 pub isr: HashSet<NodeId>,
94
95 pub leader_epoch: u64,
97
98 pub high_watermark: u64,
100
101 pub log_start_offset: u64,
103
104 pub online: bool,
106
107 pub under_replicated: bool,
109}
110
111impl PartitionState {
112 pub fn new(id: PartitionId, replicas: Vec<NodeId>) -> Self {
114 let preferred_leader = replicas.first().cloned().unwrap_or_default();
115 let replica_infos: Vec<_> = replicas
116 .iter()
117 .map(|n| ReplicaInfo::new(n.clone()))
118 .collect();
119 let isr: HashSet<_> = replicas.into_iter().collect();
120
121 let under_replicated = isr.len() < replica_infos.len();
123
124 Self {
125 id,
126 leader: None,
127 preferred_leader,
128 replicas: replica_infos,
129 isr,
130 leader_epoch: 0,
131 high_watermark: 0,
132 log_start_offset: 0,
133 online: false,
134 under_replicated,
135 }
136 }
137
138 pub fn elect_leader(&mut self) -> Option<&NodeId> {
140 if self.isr.contains(&self.preferred_leader) {
142 self.leader = Some(self.preferred_leader.clone());
143 } else {
144 let mut sorted_isr: Vec<_> = self.isr.iter().collect();
147 sorted_isr.sort();
148 self.leader = sorted_isr.first().map(|n| (*n).clone());
149 }
150
151 if self.leader.is_some() {
152 self.leader_epoch += 1;
153 self.online = true;
154 }
155
156 self.leader.as_ref()
157 }
158
159 pub fn add_to_isr(&mut self, node_id: &NodeId) {
161 self.isr.insert(node_id.clone());
162 self.update_under_replicated();
163
164 if let Some(replica) = self.replicas.iter_mut().find(|r| &r.node_id == node_id) {
166 replica.state = ReplicaState::InSync;
167 }
168 }
169
170 pub fn remove_from_isr(&mut self, node_id: &NodeId) {
172 self.isr.remove(node_id);
173 self.update_under_replicated();
174
175 if let Some(replica) = self.replicas.iter_mut().find(|r| &r.node_id == node_id) {
177 replica.state = ReplicaState::CatchingUp;
178 }
179
180 if self.leader.as_ref() == Some(node_id) {
182 self.leader = None;
183 self.online = false;
184 }
185 }
186
187 pub fn update_replica_offset(&mut self, node_id: &NodeId, log_end_offset: u64) {
189 let leader_leo = self.leader.as_ref().and_then(|leader| {
191 self.replicas
192 .iter()
193 .find(|r| &r.node_id == leader)
194 .map(|r| r.log_end_offset)
195 });
196
197 if let Some(replica) = self.replicas.iter_mut().find(|r| &r.node_id == node_id) {
199 replica.log_end_offset = log_end_offset;
200
201 if let Some(leo) = leader_leo {
203 replica.lag = leo.saturating_sub(log_end_offset);
204 }
205 }
206 }
207
208 pub fn advance_high_watermark(&mut self) {
210 let min_leo = self
212 .replicas
213 .iter()
214 .filter(|r| self.isr.contains(&r.node_id))
215 .map(|r| r.log_end_offset)
216 .min()
217 .unwrap_or(self.high_watermark);
218
219 if min_leo > self.high_watermark {
220 self.high_watermark = min_leo;
221
222 for replica in &mut self.replicas {
224 replica.high_watermark = self.high_watermark;
225 }
226 }
227 }
228
229 fn update_under_replicated(&mut self) {
231 let expected = self.replicas.len();
232 let in_sync = self.isr.len();
233 self.under_replicated = in_sync < expected;
234 }
235
236 pub fn replica_nodes(&self) -> Vec<&NodeId> {
238 self.replicas.iter().map(|r| &r.node_id).collect()
239 }
240
241 pub fn isr_nodes(&self) -> Vec<&NodeId> {
243 let mut nodes: Vec<_> = self.isr.iter().collect();
244 nodes.sort();
245 nodes
246 }
247
248 pub fn is_leader(&self, node_id: &NodeId) -> bool {
250 self.leader.as_ref() == Some(node_id)
251 }
252
253 pub fn is_replica(&self, node_id: &NodeId) -> bool {
255 self.replicas.iter().any(|r| &r.node_id == node_id)
256 }
257
258 pub fn replication_factor(&self) -> usize {
260 self.replicas.len()
261 }
262}
263
264#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
266pub struct TopicConfig {
267 pub name: String,
269
270 pub partitions: u32,
272
273 pub replication_factor: u16,
275
276 pub retention_ms: u64,
278
279 pub segment_bytes: u64,
281
282 pub min_isr: u16,
284
285 pub config: std::collections::HashMap<String, String>,
287}
288
289impl TopicConfig {
290 pub fn new(name: impl Into<String>, partitions: u32, replication_factor: u16) -> Self {
291 Self {
292 name: name.into(),
293 partitions,
294 replication_factor,
295 retention_ms: 7 * 24 * 60 * 60 * 1000, segment_bytes: 1024 * 1024 * 1024, min_isr: 1,
298 config: std::collections::HashMap::new(),
299 }
300 }
301
302 pub fn with_retention_ms(mut self, ms: u64) -> Self {
303 self.retention_ms = ms;
304 self
305 }
306
307 pub fn with_min_isr(mut self, min_isr: u16) -> Self {
308 self.min_isr = min_isr;
309 self
310 }
311}
312
313#[derive(Debug, Clone, Serialize, Deserialize)]
315pub struct TopicState {
316 pub config: TopicConfig,
318
319 pub partitions: Vec<PartitionState>,
321}
322
323impl TopicState {
324 pub fn new(config: TopicConfig, partition_assignments: Vec<Vec<NodeId>>) -> Self {
325 let partitions = partition_assignments
326 .into_iter()
327 .enumerate()
328 .map(|(i, replicas)| {
329 let mut state =
330 PartitionState::new(PartitionId::new(&config.name, i as u32), replicas);
331 state.elect_leader();
333 state
334 })
335 .collect();
336
337 Self { config, partitions }
338 }
339
340 pub fn partition(&self, idx: u32) -> Option<&PartitionState> {
342 self.partitions.get(idx as usize)
343 }
344
345 pub fn partition_mut(&mut self, idx: u32) -> Option<&mut PartitionState> {
347 self.partitions.get_mut(idx as usize)
348 }
349
350 pub fn is_fully_online(&self) -> bool {
352 self.partitions.iter().all(|p| p.online)
353 }
354
355 pub fn is_under_replicated(&self) -> bool {
357 self.partitions.iter().any(|p| p.under_replicated)
358 }
359
360 pub fn offline_partitions(&self) -> Vec<u32> {
362 self.partitions
363 .iter()
364 .enumerate()
365 .filter(|(_, p)| !p.online)
366 .map(|(i, _)| i as u32)
367 .collect()
368 }
369}
370
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an unelected partition `test-topic/0` replicated on `nodes`, in order.
    fn partition_on(nodes: &[&str]) -> PartitionState {
        PartitionState::new(
            PartitionId::new("test-topic", 0),
            nodes.iter().map(|node| node.to_string()).collect(),
        )
    }

    #[test]
    fn test_partition_leader_election() {
        let mut partition = partition_on(&["node-1", "node-2", "node-3"]);

        // Nothing is elected at construction time.
        assert!(partition.leader.is_none());
        assert!(!partition.online);

        // The first assigned replica is the preferred leader.
        let elected = partition.elect_leader().cloned();
        assert_eq!(elected.as_deref(), Some("node-1"));
        assert!(partition.online);
        assert_eq!(partition.leader_epoch, 1);
    }

    #[test]
    fn test_isr_management() {
        let mut partition = partition_on(&["node-1", "node-2", "node-3"]);
        partition.elect_leader();
        let follower = String::from("node-2");

        assert_eq!(partition.isr.len(), 3);
        assert!(!partition.under_replicated);

        partition.remove_from_isr(&follower);
        assert_eq!(partition.isr.len(), 2);
        assert!(partition.under_replicated);

        partition.add_to_isr(&follower);
        assert_eq!(partition.isr.len(), 3);
        assert!(!partition.under_replicated);
    }

    #[test]
    fn test_high_watermark_advancement() {
        let mut partition = partition_on(&["node-1", "node-2"]);
        partition.elect_leader();
        let (leader, follower) = (String::from("node-1"), String::from("node-2"));

        partition.update_replica_offset(&leader, 100);
        partition.update_replica_offset(&follower, 80);
        partition.advance_high_watermark();
        // Bounded by the slowest in-sync replica.
        assert_eq!(partition.high_watermark, 80);

        partition.update_replica_offset(&follower, 100);
        partition.advance_high_watermark();
        assert_eq!(partition.high_watermark, 100);
    }
}