// mockforge_kafka/topics.rs
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use crate::fixtures::KafkaFixture;
use crate::partitions::Partition;
6
7#[derive(Debug)]
9pub struct Topic {
10 pub name: String,
11 pub partitions: Vec<Partition>,
12 pub config: TopicConfig,
13 pub fixtures: Vec<Arc<KafkaFixture>>,
14 round_robin_counter: AtomicUsize,
15}
16
/// Settings applied to a [`Topic`] when it is created.
#[derive(Debug, Clone)]
pub struct TopicConfig {
    /// Number of partitions to create; also the modulus used when assigning
    /// records to partitions.
    pub num_partitions: i32,
    /// Replication factor for the topic.
    /// NOTE(review): stored only; nothing in this file reads it.
    pub replication_factor: i16,
    /// Retention period; the `_ms` suffix suggests milliseconds.
    /// NOTE(review): stored only; nothing in this file enforces retention.
    pub retention_ms: i64,
    /// Maximum message size; the name suggests bytes.
    /// NOTE(review): stored only; nothing in this file enforces the limit.
    pub max_message_bytes: i32,
}
24
25impl Default for TopicConfig {
26 fn default() -> Self {
27 Self {
28 num_partitions: 3,
29 replication_factor: 1,
30 retention_ms: 604800000, max_message_bytes: 1048576, }
33 }
34}
35
36impl Topic {
37 pub fn new(name: String, config: TopicConfig) -> Self {
39 let partitions = (0..config.num_partitions).map(Partition::new).collect();
40
41 Self {
42 name,
43 partitions,
44 config,
45 fixtures: vec![],
46 round_robin_counter: AtomicUsize::new(0),
47 }
48 }
49
50 pub fn assign_partition(&mut self, key: Option<&[u8]>) -> i32 {
52 match key {
53 Some(key_bytes) => {
54 use std::collections::hash_map::DefaultHasher;
56 use std::hash::{Hash, Hasher};
57 let mut hasher = DefaultHasher::new();
58 key_bytes.hash(&mut hasher);
59 let hash = hasher.finish();
60 (hash % self.config.num_partitions as u64) as i32
61 }
62 None => {
63 let partition = self.round_robin_counter.fetch_add(1, Ordering::Relaxed)
65 % self.config.num_partitions as usize;
66 partition as i32
67 }
68 }
69 }
70
71 pub async fn produce(
73 &mut self,
74 partition: i32,
75 record: crate::partitions::KafkaMessage,
76 ) -> mockforge_core::Result<i64> {
77 if let Some(partition) = self.partitions.get_mut(partition as usize) {
78 Ok(partition.append(record))
79 } else {
80 Err(mockforge_core::Error::generic(format!(
81 "Partition {} does not exist",
82 partition
83 )))
84 }
85 }
86
87 pub fn get_partition(&self, id: i32) -> Option<&Partition> {
89 self.partitions.get(id as usize)
90 }
91
92 pub fn get_partition_mut(&mut self, id: i32) -> Option<&mut Partition> {
94 self.partitions.get_mut(id as usize)
95 }
96}
97
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashSet;

    /// Builds a topic with `count` partitions and default values for everything else.
    fn topic_with_partitions(name: &str, count: i32) -> Topic {
        let config = TopicConfig {
            num_partitions: count,
            ..Default::default()
        };
        Topic::new(name.to_string(), config)
    }

    /// The default configuration exposes the documented stock values.
    #[test]
    fn test_topic_config_default() {
        let config = TopicConfig::default();
        assert_eq!(config.num_partitions, 3);
        assert_eq!(config.replication_factor, 1);
        assert_eq!(config.retention_ms, 604_800_000);
        assert_eq!(config.max_message_bytes, 1_048_576);
    }

    /// Cloning a configuration copies its fields.
    #[test]
    fn test_topic_config_clone() {
        let config = TopicConfig {
            num_partitions: 5,
            replication_factor: 3,
            retention_ms: 86_400_000,
            max_message_bytes: 2_097_152,
        };
        let cloned = config.clone();
        assert_eq!(config.num_partitions, cloned.num_partitions);
        assert_eq!(config.replication_factor, cloned.replication_factor);
    }

    /// A new topic keeps its name, gets one partition per configured count and no fixtures.
    #[test]
    fn test_topic_new() {
        let topic = Topic::new("test-topic".to_string(), TopicConfig::default());

        assert_eq!(topic.name, "test-topic");
        assert_eq!(topic.partitions.len(), 3);
        assert!(topic.fixtures.is_empty());
    }

    /// The partition count follows a non-default configuration.
    #[test]
    fn test_topic_new_custom_partitions() {
        let topic = topic_with_partitions("test", 10);
        assert_eq!(topic.partitions.len(), 10);
    }

    /// The same key always lands on the same, in-range partition.
    #[test]
    fn test_topic_assign_partition_with_key() {
        let mut topic = topic_with_partitions("test", 5);

        let key = b"user-123";
        let partition1 = topic.assign_partition(Some(key));
        let partition2 = topic.assign_partition(Some(key));
        assert_eq!(partition1, partition2);

        assert!((0..5).contains(&partition1));
    }

    /// Keyless records cycle through the partitions in order and wrap around.
    #[test]
    fn test_topic_assign_partition_without_key() {
        let mut topic = topic_with_partitions("test", 3);

        let p1 = topic.assign_partition(None);
        let p2 = topic.assign_partition(None);
        let p3 = topic.assign_partition(None);
        let p4 = topic.assign_partition(None);

        for assigned in [p1, p2, p3, p4] {
            assert!((0..3).contains(&assigned));
        }

        assert_eq!(p1, 0);
        assert_eq!(p2, 1);
        assert_eq!(p3, 2);
        // The fourth assignment wraps back to the first partition.
        assert_eq!(p4, 0);
    }

    /// Lookups succeed for existing partition ids and fail just past the end.
    #[test]
    fn test_topic_get_partition() {
        let topic = Topic::new("test".to_string(), TopicConfig::default());

        assert!(topic.get_partition(0).is_some());
        assert!(topic.get_partition(1).is_some());
        assert!(topic.get_partition(2).is_some());
        assert!(topic.get_partition(3).is_none());
    }

    /// Mutable lookups follow the same bounds as shared lookups.
    #[test]
    fn test_topic_get_partition_mut() {
        let mut topic = Topic::new("test".to_string(), TopicConfig::default());

        assert!(topic.get_partition_mut(0).is_some());
        assert!(topic.get_partition_mut(100).is_none());
    }

    /// Hashing spreads a batch of distinct keys over more than one partition.
    #[test]
    fn test_different_keys_may_get_different_partitions() {
        let mut topic = topic_with_partitions("test", 10);

        let unique_partitions: HashSet<i32> = (0..100)
            .map(|i| {
                let key = format!("key-{}", i);
                topic.assign_partition(Some(key.as_bytes()))
            })
            .collect();

        assert!(unique_partitions.len() > 1);
    }

    /// `Debug` output for a topic names the type and includes the topic name.
    #[test]
    fn test_topic_debug() {
        let topic = Topic::new("debug-test".to_string(), TopicConfig::default());
        let debug = format!("{:?}", topic);
        assert!(debug.contains("Topic"));
        assert!(debug.contains("debug-test"));
    }

    /// `Debug` output for a configuration names the type and its fields.
    #[test]
    fn test_topic_config_debug() {
        let config = TopicConfig::default();
        let debug = format!("{:?}", config);
        assert!(debug.contains("TopicConfig"));
        assert!(debug.contains("num_partitions"));
    }
}