mockforge_kafka/
topics.rs

1use std::sync::atomic::{AtomicUsize, Ordering};
2use std::sync::Arc;
3
4use crate::fixtures::KafkaFixture;
5use crate::partitions::Partition;
6
/// Represents a Kafka topic
///
/// Bundles the topic's partitions, its configuration and any attached
/// fixtures, together with the counter used to rotate keyless messages
/// across partitions.
#[derive(Debug)]
pub struct Topic {
    /// Name of the topic.
    pub name: String,
    /// Partitions of the topic. `Topic::new` creates one per id in
    /// `0..config.num_partitions`, and the lookup methods index this vector
    /// by partition id.
    pub partitions: Vec<Partition>,
    /// Configuration the topic was created with.
    pub config: TopicConfig,
    /// Fixtures attached to the topic; `Topic::new` starts with none.
    /// NOTE(review): how fixtures are consumed is not visible in this file.
    pub fixtures: Vec<Arc<KafkaFixture>>,
    /// Counter advanced by `assign_partition` for every message without a key,
    /// so that such messages rotate through the partitions.
    round_robin_counter: AtomicUsize,
}
16
/// Settings attached to a topic.
///
/// Only `num_partitions` is read by the topic logic in this file; the other
/// fields are stored as given.
#[derive(Debug, Clone)]
pub struct TopicConfig {
    /// How many partitions the topic has.
    pub num_partitions: i32,
    /// Replication factor recorded for the topic.
    pub replication_factor: i16,
    /// Retention period, in milliseconds.
    pub retention_ms: i64,
    /// Largest message size, in bytes.
    pub max_message_bytes: i32,
}

impl Default for TopicConfig {
    /// Returns the stock configuration: 3 partitions, a replication factor of
    /// 1, seven days of retention and a 1 MiB message size limit.
    fn default() -> Self {
        const DEFAULT_PARTITIONS: i32 = 3;
        const DEFAULT_REPLICATION: i16 = 1;
        // 7 days expressed in milliseconds (604_800_000).
        const SEVEN_DAYS_MS: i64 = 7 * 24 * 60 * 60 * 1000;
        // 1 MiB expressed in bytes (1_048_576).
        const ONE_MIB: i32 = 1024 * 1024;

        TopicConfig {
            num_partitions: DEFAULT_PARTITIONS,
            replication_factor: DEFAULT_REPLICATION,
            retention_ms: SEVEN_DAYS_MS,
            max_message_bytes: ONE_MIB,
        }
    }
}
35
36impl Topic {
37    /// Create a new topic
38    pub fn new(name: String, config: TopicConfig) -> Self {
39        let partitions = (0..config.num_partitions).map(Partition::new).collect();
40
41        Self {
42            name,
43            partitions,
44            config,
45            fixtures: vec![],
46            round_robin_counter: AtomicUsize::new(0),
47        }
48    }
49
50    /// Assign partition for a message based on key
51    pub fn assign_partition(&mut self, key: Option<&[u8]>) -> i32 {
52        match key {
53            Some(key_bytes) => {
54                // Use murmur hash for partition assignment
55                use std::collections::hash_map::DefaultHasher;
56                use std::hash::{Hash, Hasher};
57                let mut hasher = DefaultHasher::new();
58                key_bytes.hash(&mut hasher);
59                let hash = hasher.finish();
60                (hash % self.config.num_partitions as u64) as i32
61            }
62            None => {
63                // Round-robin for messages without keys
64                let partition = self.round_robin_counter.fetch_add(1, Ordering::Relaxed)
65                    % self.config.num_partitions as usize;
66                partition as i32
67            }
68        }
69    }
70
71    /// Produce a record to the appropriate partition
72    pub async fn produce(
73        &mut self,
74        partition: i32,
75        record: crate::partitions::KafkaMessage,
76    ) -> mockforge_core::Result<i64> {
77        if let Some(partition) = self.partitions.get_mut(partition as usize) {
78            Ok(partition.append(record))
79        } else {
80            Err(mockforge_core::Error::generic(format!(
81                "Partition {} does not exist",
82                partition
83            )))
84        }
85    }
86
87    /// Get partition by ID
88    pub fn get_partition(&self, id: i32) -> Option<&Partition> {
89        self.partitions.get(id as usize)
90    }
91
92    /// Get mutable partition by ID
93    pub fn get_partition_mut(&mut self, id: i32) -> Option<&mut Partition> {
94        self.partitions.get_mut(id as usize)
95    }
96}