pub mod follower;
pub mod leader;
#[cfg(test)]
mod tests;
use crate::protocol::{Message, Offset, PartitionId, TopicName};
use crate::Result;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::info;
pub use follower::{FollowerState, FollowerSync};
pub use leader::{LeaderState, ReplicationManager};
pub type BrokerId = u32;
#[derive(Debug, Clone, PartialEq)]
pub enum ReplicationRole {
Leader,
Follower { leader_id: BrokerId },
Offline,
}
#[derive(Debug, Clone)]
pub struct PartitionReplicaInfo {
pub topic: TopicName,
pub partition: PartitionId,
pub role: ReplicationRole,
pub high_watermark: Offset,
pub log_end_offset: Offset,
pub replicas: Vec<BrokerId>,
pub in_sync_replicas: Vec<BrokerId>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ReplicationMessage {
ReplicateRequest {
topic: TopicName,
partition: PartitionId,
leader_epoch: u64,
prev_log_offset: Offset,
entries: Vec<LogEntry>,
leader_commit: Offset,
},
ReplicateResponse {
topic: TopicName,
partition: PartitionId,
success: bool,
last_log_offset: Offset,
follower_id: BrokerId,
},
Heartbeat {
leader_id: BrokerId,
term: u64,
commit_index: Offset,
},
HeartbeatResponse {
follower_id: BrokerId,
term: u64,
success: bool,
},
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogEntry {
pub offset: Offset,
pub term: u64,
pub message: Message,
pub timestamp: u64,
}
#[derive(Debug, Clone)]
pub struct ReplicationConfig {
pub min_isr: u32,
pub replication_factor: u32,
pub heartbeat_interval_ms: u64,
pub replication_timeout_ms: u64,
pub max_batch_size: usize,
}
impl Default for ReplicationConfig {
fn default() -> Self {
Self {
min_isr: 1,
replication_factor: 3,
heartbeat_interval_ms: 1000,
replication_timeout_ms: 5000,
max_batch_size: 1000,
}
}
}
#[derive(Debug)]
pub struct ReplicationCoordinator {
broker_id: BrokerId,
config: ReplicationConfig,
partition_states: Arc<RwLock<HashMap<(TopicName, PartitionId), PartitionReplicaInfo>>>,
leaders: Arc<RwLock<HashMap<(TopicName, PartitionId), Arc<LeaderState>>>>,
followers: Arc<RwLock<HashMap<(TopicName, PartitionId), Arc<FollowerState>>>>,
}
impl ReplicationCoordinator {
pub fn new(broker_id: BrokerId, config: ReplicationConfig) -> Self {
Self {
broker_id,
config,
partition_states: Arc::new(RwLock::new(HashMap::new())),
leaders: Arc::new(RwLock::new(HashMap::new())),
followers: Arc::new(RwLock::new(HashMap::new())),
}
}
pub async fn become_leader(
&self,
topic: &str,
partition: PartitionId,
replicas: Vec<BrokerId>,
) -> Result<()> {
let key = (topic.to_string(), partition);
let replica_info = PartitionReplicaInfo {
topic: topic.to_string(),
partition,
role: ReplicationRole::Leader,
high_watermark: 0,
log_end_offset: 0,
replicas: replicas.clone(),
in_sync_replicas: vec![self.broker_id],
};
{
let mut states = self.partition_states.write().await;
states.insert(key.clone(), replica_info);
}
let leader_state = Arc::new(LeaderState::new(
self.broker_id,
topic.to_string(),
partition,
replicas,
self.config.clone(),
));
{
let mut leaders = self.leaders.write().await;
leaders.insert(key, leader_state);
}
info!("Became leader for {}:{}", topic, partition);
Ok(())
}
pub async fn become_follower(
&self,
topic: &str,
partition: PartitionId,
leader_id: BrokerId,
) -> Result<()> {
let key = (topic.to_string(), partition);
let replica_info = PartitionReplicaInfo {
topic: topic.to_string(),
partition,
role: ReplicationRole::Follower { leader_id },
high_watermark: 0,
log_end_offset: 0,
replicas: vec![leader_id, self.broker_id],
in_sync_replicas: vec![leader_id],
};
{
let mut states = self.partition_states.write().await;
states.insert(key.clone(), replica_info);
}
let follower_state = Arc::new(FollowerState::new(
self.broker_id,
topic.to_string(),
partition,
leader_id,
self.config.clone(),
));
{
let mut followers = self.followers.write().await;
followers.insert(key, follower_state);
}
info!(
"Became follower for {}:{} with leader {}",
topic, partition, leader_id
);
Ok(())
}
pub async fn get_partition_state(
&self,
topic: &str,
partition: PartitionId,
) -> Option<PartitionReplicaInfo> {
let states = self.partition_states.read().await;
states.get(&(topic.to_string(), partition)).cloned()
}
pub async fn is_leader(&self, topic: &str, partition: PartitionId) -> bool {
if let Some(state) = self.get_partition_state(topic, partition).await {
matches!(state.role, ReplicationRole::Leader)
} else {
false
}
}
pub async fn get_leader(&self, topic: &str, partition: PartitionId) -> Option<BrokerId> {
if let Some(state) = self.get_partition_state(topic, partition).await {
match state.role {
ReplicationRole::Leader => Some(self.broker_id),
ReplicationRole::Follower { leader_id } => Some(leader_id),
ReplicationRole::Offline => None,
}
} else {
None
}
}
}