mockforge_kafka/
topics.rs

1use std::sync::atomic::{AtomicUsize, Ordering};
2use std::sync::Arc;
3
4use crate::fixtures::KafkaFixture;
5use crate::partitions::Partition;
6
7/// Represents a Kafka topic
8#[derive(Debug)]
9pub struct Topic {
10    pub name: String,
11    pub partitions: Vec<Partition>,
12    pub config: TopicConfig,
13    pub fixtures: Vec<Arc<KafkaFixture>>,
14    round_robin_counter: AtomicUsize,
15}
16
/// Settings attached to a [`Topic`].
///
/// Only `num_partitions` is consulted by the code in this file; the other
/// fields are stored as given.
#[derive(Debug, Clone)]
pub struct TopicConfig {
    /// How many partitions the topic is created with.
    pub num_partitions: i32,
    /// Replication factor recorded for the topic.
    pub replication_factor: i16,
    /// Retention period, in milliseconds.
    pub retention_ms: i64,
    /// Upper bound on message size, in bytes.
    pub max_message_bytes: i32,
}
24
25impl Default for TopicConfig {
26    fn default() -> Self {
27        Self {
28            num_partitions: 3,
29            replication_factor: 1,
30            retention_ms: 604800000,    // 7 days
31            max_message_bytes: 1048576, // 1MB
32        }
33    }
34}
35
36impl Topic {
37    /// Create a new topic
38    pub fn new(name: String, config: TopicConfig) -> Self {
39        let partitions = (0..config.num_partitions).map(Partition::new).collect();
40
41        Self {
42            name,
43            partitions,
44            config,
45            fixtures: vec![],
46            round_robin_counter: AtomicUsize::new(0),
47        }
48    }
49
50    /// Assign partition for a message based on key
51    pub fn assign_partition(&mut self, key: Option<&[u8]>) -> i32 {
52        match key {
53            Some(key_bytes) => {
54                // Use murmur hash for partition assignment
55                use std::collections::hash_map::DefaultHasher;
56                use std::hash::{Hash, Hasher};
57                let mut hasher = DefaultHasher::new();
58                key_bytes.hash(&mut hasher);
59                let hash = hasher.finish();
60                (hash % self.config.num_partitions as u64) as i32
61            }
62            None => {
63                // Round-robin for messages without keys
64                let partition = self.round_robin_counter.fetch_add(1, Ordering::Relaxed)
65                    % self.config.num_partitions as usize;
66                partition as i32
67            }
68        }
69    }
70
71    /// Produce a record to the appropriate partition
72    pub async fn produce(
73        &mut self,
74        partition: i32,
75        record: crate::partitions::KafkaMessage,
76    ) -> mockforge_core::Result<i64> {
77        if let Some(partition) = self.partitions.get_mut(partition as usize) {
78            Ok(partition.append(record))
79        } else {
80            Err(mockforge_core::Error::generic(format!(
81                "Partition {} does not exist",
82                partition
83            )))
84        }
85    }
86
87    /// Get partition by ID
88    pub fn get_partition(&self, id: i32) -> Option<&Partition> {
89        self.partitions.get(id as usize)
90    }
91
92    /// Get mutable partition by ID
93    pub fn get_partition_mut(&mut self, id: i32) -> Option<&mut Partition> {
94        self.partitions.get_mut(id as usize)
95    }
96}
97
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashSet;

    /// Builds a topic named "test" with `count` partitions and default
    /// settings for everything else.
    fn topic_with(count: i32) -> Topic {
        Topic::new(
            "test".to_string(),
            TopicConfig {
                num_partitions: count,
                ..Default::default()
            },
        )
    }

    #[test]
    fn test_topic_config_default() {
        let TopicConfig {
            num_partitions,
            replication_factor,
            retention_ms,
            max_message_bytes,
        } = TopicConfig::default();

        assert_eq!(num_partitions, 3);
        assert_eq!(replication_factor, 1);
        assert_eq!(retention_ms, 604800000);
        assert_eq!(max_message_bytes, 1048576);
    }

    #[test]
    fn test_topic_config_clone() {
        let original = TopicConfig {
            num_partitions: 5,
            replication_factor: 3,
            retention_ms: 86400000,
            max_message_bytes: 2097152,
        };
        let duplicate = original.clone();

        assert_eq!(original.num_partitions, duplicate.num_partitions);
        assert_eq!(original.replication_factor, duplicate.replication_factor);
    }

    #[test]
    fn test_topic_new() {
        let topic = Topic::new("test-topic".to_string(), TopicConfig::default());

        assert_eq!(topic.name, "test-topic");
        assert_eq!(topic.partitions.len(), 3);
        assert!(topic.fixtures.is_empty());
    }

    #[test]
    fn test_topic_new_custom_partitions() {
        let topic = topic_with(10);
        assert_eq!(topic.partitions.len(), 10);
    }

    #[test]
    fn test_topic_assign_partition_with_key() {
        let mut topic = topic_with(5);
        let key: &[u8] = b"user-123";

        // Hashing the same key twice must land on the same partition.
        let first = topic.assign_partition(Some(key));
        let second = topic.assign_partition(Some(key));
        assert_eq!(first, second);

        // The chosen partition must be one that exists.
        assert!((0..5).contains(&first));
    }

    #[test]
    fn test_topic_assign_partition_without_key() {
        let mut topic = topic_with(3);

        // Keyless messages are distributed round-robin.
        let picks: Vec<i32> = (0..4).map(|_| topic.assign_partition(None)).collect();

        // Every pick is a valid partition ID...
        assert!(picks.iter().all(|p| (0..3).contains(p)));

        // ...and the sequence cycles 0, 1, 2 before wrapping back to 0.
        assert_eq!(picks, vec![0, 1, 2, 0]);
    }

    #[test]
    fn test_topic_get_partition() {
        let topic = Topic::new("test".to_string(), TopicConfig::default());

        for id in 0..3 {
            assert!(topic.get_partition(id).is_some());
        }
        assert!(topic.get_partition(3).is_none());
    }

    #[test]
    fn test_topic_get_partition_mut() {
        let mut topic = Topic::new("test".to_string(), TopicConfig::default());

        assert!(topic.get_partition_mut(0).is_some());
        assert!(topic.get_partition_mut(100).is_none());
    }

    #[test]
    fn test_different_keys_may_get_different_partitions() {
        let mut topic = topic_with(10);

        // Individual keys may collide, but a hundred distinct keys should not
        // all end up on a single partition.
        let seen: HashSet<i32> = (0..100)
            .map(|i| format!("key-{}", i))
            .map(|key| topic.assign_partition(Some(key.as_bytes())))
            .collect();

        assert!(seen.len() > 1);
    }

    #[test]
    fn test_topic_debug() {
        let topic = Topic::new("debug-test".to_string(), TopicConfig::default());
        let rendered = format!("{:?}", topic);

        assert!(rendered.contains("Topic"));
        assert!(rendered.contains("debug-test"));
    }

    #[test]
    fn test_topic_config_debug() {
        let rendered = format!("{:?}", TopicConfig::default());

        assert!(rendered.contains("TopicConfig"));
        assert!(rendered.contains("num_partitions"));
    }
}