use crate::node::NodeId;
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
/// Identifies a single partition by the name of its topic and its partition
/// number within that topic.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct PartitionId {
    /// Name of the topic this partition belongs to.
    pub topic: String,
    /// Number of the partition within the topic.
    pub partition: u32,
}
impl PartitionId {
pub fn new(topic: impl Into<String>, partition: u32) -> Self {
Self {
topic: topic.into(),
partition,
}
}
}
impl std::fmt::Display for PartitionId {
    /// Renders the identifier as `<topic>/<partition>`, e.g. `orders/3`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { topic, partition } = self;
        f.write_fmt(format_args!("{}/{}", topic, partition))
    }
}
/// State of one replica of a partition, stored in `ReplicaInfo::state`.
///
/// With `rename_all = "lowercase"` serde writes each variant name in all
/// lowercase with no separators (`InSync` becomes `"insync"`, `CatchingUp`
/// becomes `"catchingup"`).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum ReplicaState {
    /// The replica is in sync; `PartitionState::add_to_isr` assigns this state.
    InSync,
    /// The replica is not in sync; `PartitionState::remove_from_isr` assigns
    /// this state.
    CatchingUp,
    /// Not assigned anywhere in this file; presumably the hosting node is
    /// unavailable (TODO confirm against callers).
    Offline,
    /// State given to a replica by `ReplicaInfo::new`.
    Adding,
    /// Not assigned anywhere in this file; presumably the replica is being
    /// removed from the assignment (TODO confirm against callers).
    Removing,
}
/// Per-replica bookkeeping kept inside a `PartitionState`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ReplicaInfo {
    /// Node that hosts this replica.
    pub node_id: NodeId,
    /// Current state of the replica.
    pub state: ReplicaState,
    /// Log end offset most recently recorded for this replica by
    /// `PartitionState::update_replica_offset`.
    pub log_end_offset: u64,
    /// Copy of the partition high watermark, written by
    /// `PartitionState::advance_high_watermark` whenever it advances.
    pub high_watermark: u64,
    /// Leader log end offset minus this replica's log end offset (never
    /// negative), as computed by `PartitionState::update_replica_offset`.
    pub lag: u64,
}
impl ReplicaInfo {
pub fn new(node_id: NodeId) -> Self {
Self {
node_id,
state: ReplicaState::Adding,
log_end_offset: 0,
high_watermark: 0,
lag: 0,
}
}
pub fn is_in_sync(&self) -> bool {
matches!(self.state, ReplicaState::InSync)
}
}
/// Replication metadata for one partition: its replicas, in-sync set,
/// leader and offsets.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PartitionState {
    /// Identifier of the partition described by this state.
    pub id: PartitionId,
    /// Current leader, or `None` when no leader is elected.
    pub leader: Option<NodeId>,
    /// Node that `elect_leader` picks whenever it is part of the ISR; set by
    /// `new` to the first replica of the assignment.
    pub preferred_leader: NodeId,
    /// One entry per assigned replica.
    pub replicas: Vec<ReplicaInfo>,
    /// Ids of the nodes currently in the in-sync replica set.
    pub isr: HashSet<NodeId>,
    /// Counter incremented by `elect_leader` each time it selects a leader.
    pub leader_epoch: u64,
    /// Smallest log end offset among the in-sync replicas, as last computed
    /// by `advance_high_watermark`; that method never lowers it.
    pub high_watermark: u64,
    /// Initialised to zero by `new` and not modified elsewhere in this file;
    /// presumably the first offset still retained in the log (TODO confirm).
    pub log_start_offset: u64,
    /// `true` once a leader has been elected; reset to `false` when the
    /// leader is removed from the ISR.
    pub online: bool,
    /// `true` when fewer nodes are in sync than there are assigned replicas.
    pub under_replicated: bool,
}
impl PartitionState {
pub fn new(id: PartitionId, replicas: Vec<NodeId>) -> Self {
let preferred_leader = replicas.first().cloned().unwrap_or_default();
let replica_infos: Vec<_> = replicas
.iter()
.map(|n| ReplicaInfo::new(n.clone()))
.collect();
let isr: HashSet<_> = replicas.into_iter().collect();
let under_replicated = isr.len() < replica_infos.len();
Self {
id,
leader: None,
preferred_leader,
replicas: replica_infos,
isr,
leader_epoch: 0,
high_watermark: 0,
log_start_offset: 0,
online: false,
under_replicated,
}
}
pub fn elect_leader(&mut self) -> Option<&NodeId> {
if self.isr.contains(&self.preferred_leader) {
self.leader = Some(self.preferred_leader.clone());
} else {
let mut sorted_isr: Vec<_> = self.isr.iter().collect();
sorted_isr.sort();
self.leader = sorted_isr.first().map(|n| (*n).clone());
}
if self.leader.is_some() {
self.leader_epoch += 1;
self.online = true;
}
self.leader.as_ref()
}
pub fn add_to_isr(&mut self, node_id: &NodeId) {
self.isr.insert(node_id.clone());
self.update_under_replicated();
if let Some(replica) = self.replicas.iter_mut().find(|r| &r.node_id == node_id) {
replica.state = ReplicaState::InSync;
}
}
pub fn remove_from_isr(&mut self, node_id: &NodeId) {
self.isr.remove(node_id);
self.update_under_replicated();
if let Some(replica) = self.replicas.iter_mut().find(|r| &r.node_id == node_id) {
replica.state = ReplicaState::CatchingUp;
}
if self.leader.as_ref() == Some(node_id) {
self.leader = None;
self.online = false;
}
}
pub fn update_replica_offset(&mut self, node_id: &NodeId, log_end_offset: u64) {
let leader_leo = self.leader.as_ref().and_then(|leader| {
self.replicas
.iter()
.find(|r| &r.node_id == leader)
.map(|r| r.log_end_offset)
});
if let Some(replica) = self.replicas.iter_mut().find(|r| &r.node_id == node_id) {
replica.log_end_offset = log_end_offset;
if let Some(leo) = leader_leo {
replica.lag = leo.saturating_sub(log_end_offset);
}
}
}
pub fn advance_high_watermark(&mut self) {
let min_leo = self
.replicas
.iter()
.filter(|r| self.isr.contains(&r.node_id))
.map(|r| r.log_end_offset)
.min()
.unwrap_or(self.high_watermark);
if min_leo > self.high_watermark {
self.high_watermark = min_leo;
for replica in &mut self.replicas {
replica.high_watermark = self.high_watermark;
}
}
}
fn update_under_replicated(&mut self) {
let expected = self.replicas.len();
let in_sync = self.isr.len();
self.under_replicated = in_sync < expected;
}
pub fn replica_nodes(&self) -> Vec<&NodeId> {
self.replicas.iter().map(|r| &r.node_id).collect()
}
pub fn isr_nodes(&self) -> Vec<&NodeId> {
let mut nodes: Vec<_> = self.isr.iter().collect();
nodes.sort();
nodes
}
pub fn is_leader(&self, node_id: &NodeId) -> bool {
self.leader.as_ref() == Some(node_id)
}
pub fn is_replica(&self, node_id: &NodeId) -> bool {
self.replicas.iter().any(|r| &r.node_id == node_id)
}
pub fn replication_factor(&self) -> usize {
self.replicas.len()
}
}
/// Settings of a topic.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct TopicConfig {
    /// Topic name.
    pub name: String,
    /// Number of partitions configured for the topic.
    pub partitions: u32,
    /// Number of replicas configured per partition.
    pub replication_factor: u16,
    /// Retention period in milliseconds; `TopicConfig::new` defaults it to
    /// seven days.
    pub retention_ms: u64,
    /// Segment size in bytes; `TopicConfig::new` defaults it to 1 GiB.
    pub segment_bytes: u64,
    /// Minimum in-sync replica count; `TopicConfig::new` defaults it to 1.
    /// NOTE(review): nothing in this file enforces it — confirm where it is
    /// applied.
    pub min_isr: u16,
    /// Additional string key/value settings; empty after `TopicConfig::new`.
    pub config: std::collections::HashMap<String, String>,
}
impl TopicConfig {
pub fn new(name: impl Into<String>, partitions: u32, replication_factor: u16) -> Self {
Self {
name: name.into(),
partitions,
replication_factor,
retention_ms: 7 * 24 * 60 * 60 * 1000, segment_bytes: 1024 * 1024 * 1024, min_isr: 1,
config: std::collections::HashMap::new(),
}
}
pub fn with_retention_ms(mut self, ms: u64) -> Self {
self.retention_ms = ms;
self
}
pub fn with_min_isr(mut self, min_isr: u16) -> Self {
self.min_isr = min_isr;
self
}
}
/// A topic's configuration together with the state of each of its partitions.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct TopicState {
    /// Configuration the topic was created with.
    pub config: TopicConfig,
    /// State of every partition; `TopicState::new` stores them so that the
    /// vector index equals the partition number.
    pub partitions: Vec<PartitionState>,
}
impl TopicState {
    /// Builds the topic state from its configuration and one replica
    /// assignment per partition.
    ///
    /// The position of an assignment in `partition_assignments` becomes the
    /// partition number. A leader election is run for every partition
    /// immediately after it is created.
    pub fn new(config: TopicConfig, partition_assignments: Vec<Vec<NodeId>>) -> Self {
        let mut partitions = Vec::with_capacity(partition_assignments.len());
        for (index, assigned) in partition_assignments.into_iter().enumerate() {
            let id = PartitionId::new(&config.name, index as u32);
            let mut partition = PartitionState::new(id, assigned);
            partition.elect_leader();
            partitions.push(partition);
        }
        Self { config, partitions }
    }

    /// Returns the partition stored at position `idx`, if there is one.
    pub fn partition(&self, idx: u32) -> Option<&PartitionState> {
        let position = idx as usize;
        self.partitions.get(position)
    }

    /// Returns a mutable reference to the partition stored at position
    /// `idx`, if there is one.
    pub fn partition_mut(&mut self, idx: u32) -> Option<&mut PartitionState> {
        let position = idx as usize;
        self.partitions.get_mut(position)
    }

    /// Returns `true` when no partition is offline (also for a topic without
    /// partitions).
    pub fn is_fully_online(&self) -> bool {
        !self.partitions.iter().any(|partition| !partition.online)
    }

    /// Returns `true` when at least one partition is under-replicated.
    pub fn is_under_replicated(&self) -> bool {
        !self
            .partitions
            .iter()
            .all(|partition| !partition.under_replicated)
    }

    /// Returns the positions of all partitions that are not online, in
    /// ascending order.
    pub fn offline_partitions(&self) -> Vec<u32> {
        let mut offline = Vec::new();
        for (index, partition) in self.partitions.iter().enumerate() {
            if !partition.online {
                offline.push(index as u32);
            }
        }
        offline
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds partition 0 of `test-topic` replicated on the given nodes.
    fn partition_on(nodes: &[&str]) -> PartitionState {
        let replicas = nodes.iter().map(|name| name.to_string()).collect();
        PartitionState::new(PartitionId::new("test-topic", 0), replicas)
    }

    /// A fresh partition has no leader; the first election picks the
    /// preferred (first) replica and brings the partition online.
    #[test]
    fn test_partition_leader_election() {
        let mut partition = partition_on(&["node-1", "node-2", "node-3"]);
        assert!(partition.leader.is_none());
        assert!(!partition.online);

        let leader = partition.elect_leader();
        assert!(leader.is_some());
        assert_eq!(leader.unwrap(), "node-1");
        assert!(partition.online);
        assert_eq!(partition.leader_epoch, 1);
    }

    /// Removing a follower from the ISR flags the partition as
    /// under-replicated; adding it back clears the flag.
    #[test]
    fn test_isr_management() {
        let follower = "node-2".to_string();
        let mut partition = partition_on(&["node-1", "node-2", "node-3"]);
        partition.elect_leader();
        assert_eq!(partition.isr.len(), 3);
        assert!(!partition.under_replicated);

        partition.remove_from_isr(&follower);
        assert_eq!(partition.isr.len(), 2);
        assert!(partition.under_replicated);

        partition.add_to_isr(&follower);
        assert_eq!(partition.isr.len(), 3);
        assert!(!partition.under_replicated);
    }

    /// The high watermark follows the slowest in-sync replica.
    #[test]
    fn test_high_watermark_advancement() {
        let leader = "node-1".to_string();
        let follower = "node-2".to_string();
        let mut partition = partition_on(&["node-1", "node-2"]);
        partition.elect_leader();

        partition.update_replica_offset(&leader, 100);
        partition.update_replica_offset(&follower, 80);
        partition.advance_high_watermark();
        assert_eq!(partition.high_watermark, 80);

        partition.update_replica_offset(&follower, 100);
        partition.advance_high_watermark();
        assert_eq!(partition.high_watermark, 100);
    }
}