use crate::protocol::{PartitionId, TopicName};
use crate::Result;
use parking_lot::RwLock;
use std::collections::HashMap;
use std::sync::Arc;
use tracing::info;
/// Settings supplied when a topic is created.
#[derive(Debug, Clone)]
pub struct TopicConfig {
    /// How many partitions the topic is split into.
    pub num_partitions: u32,
    /// Requested replication factor for the topic.
    pub replication_factor: u32,
    /// Segment size; the default is 2^30 (1 GiB if the unit is bytes — TODO confirm the unit).
    pub segment_size: u64,
    /// Optional retention period; judging by the name, in milliseconds. `None` by default.
    pub retention_ms: Option<u64>,
}

impl Default for TopicConfig {
    /// Returns the stock configuration: 3 partitions, replication factor 1,
    /// a segment size of 2^30 and no retention period.
    fn default() -> Self {
        // 1024 * 1024 * 1024, written as a shift.
        const DEFAULT_SEGMENT_SIZE: u64 = 1 << 30;

        TopicConfig {
            num_partitions: 3,
            replication_factor: 1,
            segment_size: DEFAULT_SEGMENT_SIZE,
            retention_ms: None,
        }
    }
}
/// Registry of topics, keyed by topic name.
///
/// The topic map lives behind an `Arc<RwLock<..>>`, so every method takes
/// `&self` and acquires the lock internally.
#[derive(Debug)]
pub struct TopicManager {
// Topic name -> metadata for every topic currently registered.
topics: Arc<RwLock<HashMap<TopicName, TopicMetadata>>>,
}
/// Description of a single topic: its name, partition count, configuration
/// and per-partition placement info.
#[derive(Debug, Clone)]
pub struct TopicMetadata {
/// Name the topic is registered under.
pub name: TopicName,
/// Number of partitions; `TopicManager::create_topic` copies this from `config.num_partitions`.
pub num_partitions: u32,
/// Configuration the topic was created with.
pub config: TopicConfig,
/// One entry per partition, built by `TopicManager::create_topic` for ids `0..num_partitions`.
pub partitions: Vec<PartitionInfo>,
}
/// Leadership and replica placement for one partition of a topic.
#[derive(Debug, Clone)]
pub struct PartitionInfo {
/// Identifier of this partition within its topic.
pub id: PartitionId,
// `leader`: ID of the current leader, if any. `replicas`: IDs holding a copy of the partition.
// NOTE(review): the `u32` values are presumably broker/node IDs — confirm against the cluster code.
pub leader: Option<u32>, pub replicas: Vec<u32>,
/// Replicas considered in sync (presumably a subset of `replicas` — TODO confirm).
pub in_sync_replicas: Vec<u32>,
}
impl TopicManager {
    /// Creates a manager with no topics registered.
    pub fn new() -> Self {
        Self {
            topics: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    /// Registers `topic_name` with `config.num_partitions` partitions.
    ///
    /// Every partition is created with ID `0` as its leader, its only replica
    /// and its only in-sync replica. If a topic with this name already exists
    /// the call succeeds and the existing metadata is left untouched, even
    /// when `config` differs from the stored configuration.
    ///
    /// # Errors
    ///
    /// Returns `FluxmqError::Config` when `config.num_partitions` is zero: a
    /// topic without partitions can never accept a message, and partition
    /// selection would otherwise have to divide by zero.
    pub fn create_topic(&self, topic_name: &str, config: TopicConfig) -> Result<()> {
        let num_partitions = config.num_partitions;
        if num_partitions == 0 {
            return Err(crate::FluxmqError::Config(format!(
                "Topic '{}' must have at least one partition",
                topic_name
            )));
        }

        let mut topics = self.topics.write();
        if topics.contains_key(topic_name) {
            return Ok(());
        }

        let partitions: Vec<PartitionInfo> = (0..num_partitions)
            .map(|partition_id| PartitionInfo {
                id: partition_id,
                leader: Some(0),
                replicas: vec![0],
                in_sync_replicas: vec![0],
            })
            .collect();
        let metadata = TopicMetadata {
            name: topic_name.to_string(),
            num_partitions,
            // `config` is owned, so it is moved into the metadata rather than cloned.
            config,
            partitions,
        };
        topics.insert(topic_name.to_string(), metadata);
        info!(
            "Created topic '{}' with {} partitions",
            topic_name, num_partitions
        );
        Ok(())
    }

    /// Returns a copy of the metadata stored for `topic_name`, or `None` if
    /// no such topic is registered.
    pub fn get_topic(&self, topic_name: &str) -> Option<TopicMetadata> {
        self.topics.read().get(topic_name).cloned()
    }

    /// Returns the names of all registered topics, in unspecified order.
    pub fn list_topics(&self) -> Vec<TopicName> {
        self.topics.read().keys().cloned().collect()
    }

    /// Selects the partition for a message with the given `key`.
    ///
    /// With a key, the partition is the key's FNV-1a hash modulo the topic's
    /// partition count, so equal keys always map to the same partition.
    /// Without a key, partition `0` is used.
    ///
    /// Returns `None` when the topic does not exist or has no partitions.
    pub fn get_partition_for_key(
        &self,
        topic_name: &str,
        key: Option<&[u8]>,
    ) -> Option<PartitionId> {
        let topics = self.topics.read();
        let topic = topics.get(topic_name)?;
        // Guard the modulo below (and the keyless fallback) against a
        // zero-partition topic instead of panicking.
        if topic.num_partitions == 0 {
            return None;
        }
        let partition = match key {
            Some(key_bytes) => self.hash_key(key_bytes) % topic.num_partitions,
            None => 0,
        };
        Some(partition)
    }

    /// Maps a monotonically increasing `counter` onto the topic's partitions
    /// (`counter % num_partitions`).
    ///
    /// Returns `None` when the topic does not exist or has no partitions.
    pub fn get_partition_round_robin(&self, topic_name: &str, counter: u64) -> Option<PartitionId> {
        let topics = self.topics.read();
        let topic = topics.get(topic_name)?;
        // `checked_rem` yields `None` for a zero divisor rather than panicking.
        // The remainder is strictly below a `u32` value, so the cast is lossless.
        counter
            .checked_rem(u64::from(topic.num_partitions))
            .map(|slot| slot as PartitionId)
    }

    /// 32-bit FNV-1a hash of `key`: start from the offset basis, then for each
    /// byte XOR it in and multiply by the FNV prime (wrapping).
    fn hash_key(&self, key: &[u8]) -> u32 {
        const FNV_OFFSET_BASIS: u32 = 2_166_136_261;
        const FNV_PRIME: u32 = 16_777_619;

        key.iter().fold(FNV_OFFSET_BASIS, |hash, &byte| {
            (hash ^ u32::from(byte)).wrapping_mul(FNV_PRIME)
        })
    }

    /// Returns the metadata for `topic_name`, creating the topic with
    /// `TopicConfig::default()` first if it is not registered yet.
    ///
    /// # Errors
    ///
    /// Propagates any error from `create_topic`, and returns
    /// `FluxmqError::Config` if the topic cannot be read back after creation
    /// (for example because it was deleted concurrently).
    pub fn ensure_topic_exists(&self, topic_name: &str) -> Result<TopicMetadata> {
        if let Some(topic) = self.get_topic(topic_name) {
            return Ok(topic);
        }
        self.create_topic(topic_name, TopicConfig::default())?;
        self.get_topic(topic_name)
            .ok_or_else(|| crate::FluxmqError::Config("Failed to create topic".to_string()))
    }

    /// Returns the ids of all partitions of `topic_name`; empty when the
    /// topic does not exist.
    pub fn get_partitions(&self, topic_name: &str) -> Vec<PartitionId> {
        let topics = self.topics.read();
        topics
            .get(topic_name)
            .map(|topic| topic.partitions.iter().map(|p| p.id).collect())
            .unwrap_or_default()
    }

    /// Returns `true` when `topic_name` exists and `partition_id` is below its
    /// partition count.
    pub fn partition_exists(&self, topic_name: &str, partition_id: PartitionId) -> bool {
        let topics = self.topics.read();
        topics
            .get(topic_name)
            .map_or(false, |topic| partition_id < topic.num_partitions)
    }

    /// Removes `topic_name` from the registry.
    ///
    /// # Errors
    ///
    /// Returns `FluxmqError::Config` when the topic does not exist.
    pub fn delete_topic(&self, topic_name: &str) -> crate::Result<()> {
        let mut topics = self.topics.write();
        match topics.remove(topic_name) {
            Some(_) => {
                info!("Successfully deleted topic '{}'", topic_name);
                Ok(())
            }
            None => Err(crate::FluxmqError::Config(format!(
                "Topic '{}' does not exist",
                topic_name
            ))),
        }
    }

    /// Returns the partition count of `topic_name`, or `None` if the topic
    /// does not exist. (Despite the name, this is the only statistic exposed.)
    pub fn get_topic_stats(&self, topic_name: &str) -> Option<u32> {
        let topics = self.topics.read();
        topics.get(topic_name).map(|topic| topic.num_partitions)
    }
}
pub enum PartitionStrategy {
KeyHash,
RoundRobin,
Manual(PartitionId),
}
/// Chooses target partitions for messages on top of a shared [`TopicManager`].
///
/// Keeps one round-robin counter per topic so that successive
/// `PartitionStrategy::RoundRobin` assignments rotate through the partitions.
#[derive(Debug)]
pub struct PartitionAssigner {
    /// Topic registry used to look up (and auto-create) topics.
    manager: Arc<TopicManager>,
    /// Per-topic counter driving round-robin assignment.
    round_robin_counters: Arc<RwLock<HashMap<TopicName, u64>>>,
}
impl PartitionAssigner {
    /// Creates an assigner backed by `manager`, with no round-robin state yet.
    pub fn new(manager: Arc<TopicManager>) -> Self {
        Self {
            manager,
            round_robin_counters: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    /// Picks the partition a message for `topic_name` should go to.
    ///
    /// The topic is created with the default configuration if it does not
    /// exist yet. The partition is then chosen according to `strategy`:
    ///
    /// * `KeyHash` — delegated to `TopicManager::get_partition_for_key`.
    /// * `RoundRobin` — the topic's counter is advanced and mapped onto the
    ///   partitions by `TopicManager::get_partition_round_robin`.
    /// * `Manual(id)` — `id` is used as-is after checking that it exists.
    ///
    /// # Errors
    ///
    /// Returns `FluxmqError::Config` when the topic cannot be created, when a
    /// manually requested partition does not exist, or when the manager
    /// reports no partition for the topic (for example because the topic was
    /// deleted concurrently). The last case used to fall back silently to
    /// partition `0`, which could name a partition that does not exist.
    pub fn assign_partition(
        &self,
        topic_name: &str,
        key: Option<&[u8]>,
        strategy: PartitionStrategy,
    ) -> Result<PartitionId> {
        self.manager.ensure_topic_exists(topic_name)?;

        let selected = match strategy {
            PartitionStrategy::KeyHash => self.manager.get_partition_for_key(topic_name, key),
            PartitionStrategy::RoundRobin => {
                // Advance the counter in its own scope so the write lock is
                // released before calling into the manager.
                let ticket = {
                    let mut counters = self.round_robin_counters.write();
                    match counters.get_mut(topic_name) {
                        Some(counter) => {
                            // Wrap instead of overflowing; only the value modulo
                            // the partition count matters.
                            *counter = counter.wrapping_add(1);
                            *counter
                        }
                        None => {
                            // First use of this topic: allocate the key once.
                            counters.insert(topic_name.to_string(), 1);
                            1
                        }
                    }
                };
                self.manager.get_partition_round_robin(topic_name, ticket)
            }
            PartitionStrategy::Manual(partition_id) => {
                if !self.manager.partition_exists(topic_name, partition_id) {
                    return Err(crate::FluxmqError::Config(format!(
                        "Partition {} does not exist for topic {}",
                        partition_id, topic_name
                    )));
                }
                Some(partition_id)
            }
        };

        selected.ok_or_else(|| {
            crate::FluxmqError::Config(format!(
                "No partition available for topic {}",
                topic_name
            ))
        })
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Name of the topic shared by the manager-level tests.
    const TOPIC: &str = "test-topic";

    /// Builds a manager holding only `TOPIC`, created with `partitions`
    /// partitions and otherwise default settings.
    fn manager_with_topic(partitions: u32) -> TopicManager {
        let manager = TopicManager::new();
        let config = TopicConfig {
            num_partitions: partitions,
            ..Default::default()
        };
        manager.create_topic(TOPIC, config).unwrap();
        manager
    }

    /// A created topic reports the requested name and partition count.
    #[test]
    fn test_topic_creation() {
        let manager = manager_with_topic(5);

        let created = manager.get_topic(TOPIC).unwrap();

        assert_eq!(created.name, "test-topic");
        assert_eq!(created.num_partitions, 5);
        assert_eq!(created.partitions.len(), 5);
    }

    /// Key hashing is stable for equal keys and stays within range.
    #[test]
    fn test_partition_assignment_key_hash() {
        let manager = manager_with_topic(3);

        let first = manager.get_partition_for_key(TOPIC, Some(b"key1")).unwrap();
        let repeated = manager.get_partition_for_key(TOPIC, Some(b"key1")).unwrap();
        assert_eq!(first, repeated);

        let other = manager.get_partition_for_key(TOPIC, Some(b"key2")).unwrap();
        assert!(first < 3);
        assert!(other < 3);
    }

    /// The assigner auto-creates topics and rotates round-robin assignments.
    #[test]
    fn test_partition_assigner() {
        let assigner = PartitionAssigner::new(Arc::new(TopicManager::new()));

        let hashed = assigner
            .assign_partition("auto-topic", Some(b"key1"), PartitionStrategy::KeyHash)
            .unwrap();
        assert!(hashed < 3);

        let next_round_robin = || {
            assigner
                .assign_partition("rr-topic", None, PartitionStrategy::RoundRobin)
                .unwrap()
        };
        let first = next_round_robin();
        let second = next_round_robin();
        assert_ne!(first, second);
    }

    /// Hashing the same bytes twice agrees; different bytes differ.
    #[test]
    fn test_hash_consistency() {
        let manager = TopicManager::new();

        let once = manager.hash_key(b"test-key");
        let again = manager.hash_key(b"test-key");
        assert_eq!(once, again);

        let different = manager.hash_key(b"different-key");
        assert_ne!(once, different);
    }
}