mockforge_kafka/
partitions.rs

1use std::collections::VecDeque;
2
/// Represents a Kafka partition
///
/// An in-memory, append-only queue of [`KafkaMessage`]s together with the
/// two offsets that bound the range of offsets the partition answers for.
#[derive(Debug)]
pub struct Partition {
    /// Numeric identifier of this partition.
    pub id: i32,
    /// Messages held by the partition, oldest at the front; `append` pushes
    /// new messages onto the back.
    pub messages: VecDeque<KafkaMessage>,
    /// Offset that `append` hands out to the next message; it is incremented
    /// by one on every append, so it is always one past the newest offset.
    pub high_watermark: i64,
    /// Offset that corresponds to the front of `messages`: `fetch` subtracts
    /// it from the requested offset to obtain an index, and `has_offset`
    /// uses it as the inclusive lower bound.
    pub log_start_offset: i64,
}
11
/// A single record stored in a [`Partition`].
///
/// Cloneable and serializable via `serde::Serialize`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct KafkaMessage {
    /// Offset carried by the message.
    pub offset: i64,
    /// Timestamp attached to the message.
    /// NOTE(review): presumably epoch-based; the unit (seconds vs. milliseconds)
    /// is not established in this file, so confirm against the producer code.
    pub timestamp: i64,
    /// Optional message key as raw bytes; `None` when the message has no key.
    pub key: Option<Vec<u8>>,
    /// Message payload as raw bytes. `Partition::fetch` counts only these
    /// bytes against its `max_bytes` limit.
    pub value: Vec<u8>,
    /// Headers as `(name, raw value)` pairs, kept in insertion order.
    pub headers: Vec<(String, Vec<u8>)>,
}
20
21impl Partition {
22    /// Create a new partition
23    pub fn new(id: i32) -> Self {
24        Self {
25            id,
26            messages: VecDeque::new(),
27            high_watermark: 0,
28            log_start_offset: 0,
29        }
30    }
31
32    /// Append a message to the partition
33    pub fn append(&mut self, message: KafkaMessage) -> i64 {
34        let offset = self.high_watermark;
35        self.messages.push_back(message);
36        self.high_watermark += 1;
37        offset
38    }
39
40    /// Fetch messages from a given offset
41    pub fn fetch(&self, offset: i64, max_bytes: i32) -> Vec<&KafkaMessage> {
42        let start_idx = (offset - self.log_start_offset) as usize;
43        let mut result = Vec::new();
44        let mut total_bytes = 0;
45
46        for message in self.messages.iter().skip(start_idx) {
47            if total_bytes + message.value.len() as i32 > max_bytes && !result.is_empty() {
48                break;
49            }
50            result.push(message);
51            total_bytes += message.value.len() as i32;
52        }
53
54        result
55    }
56
57    /// Get the latest offset
58    pub fn latest_offset(&self) -> i64 {
59        self.high_watermark - 1
60    }
61
62    /// Check if partition has messages from offset
63    pub fn has_offset(&self, offset: i64) -> bool {
64        offset >= self.log_start_offset && offset < self.high_watermark
65    }
66}
67
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a keyless, headerless message with a fixed timestamp so tests
    /// only have to specify the offset and payload.
    fn create_test_message(offset: i64, value: &[u8]) -> KafkaMessage {
        KafkaMessage {
            offset,
            timestamp: 1234567890,
            key: None,
            value: value.to_vec(),
            headers: vec![],
        }
    }

    /// Builds a partition holding `msg1`, `msg2`, `msg3` at offsets 0, 1, 2.
    fn partition_with_three_messages() -> Partition {
        let mut partition = Partition::new(0);
        partition.append(create_test_message(0, b"msg1"));
        partition.append(create_test_message(1, b"msg2"));
        partition.append(create_test_message(2, b"msg3"));
        partition
    }

    #[test]
    fn test_partition_new() {
        let partition = Partition::new(5);
        assert_eq!(partition.id, 5);
        assert!(partition.messages.is_empty());
        assert_eq!(partition.high_watermark, 0);
        assert_eq!(partition.log_start_offset, 0);
    }

    #[test]
    fn test_partition_append() {
        let mut partition = Partition::new(0);
        let msg = create_test_message(0, b"test message");

        let offset = partition.append(msg);
        assert_eq!(offset, 0);
        assert_eq!(partition.high_watermark, 1);
        assert_eq!(partition.messages.len(), 1);
        assert_eq!(partition.messages[0].value, b"test message".to_vec());
    }

    #[test]
    fn test_partition_append_multiple() {
        let mut partition = Partition::new(0);

        let offset1 = partition.append(create_test_message(0, b"msg1"));
        let offset2 = partition.append(create_test_message(1, b"msg2"));
        let offset3 = partition.append(create_test_message(2, b"msg3"));

        assert_eq!(offset1, 0);
        assert_eq!(offset2, 1);
        assert_eq!(offset3, 2);
        assert_eq!(partition.high_watermark, 3);
        assert_eq!(partition.messages.len(), 3);
    }

    #[test]
    fn test_partition_fetch() {
        let partition = partition_with_three_messages();

        let messages = partition.fetch(0, 1000);
        assert_eq!(messages.len(), 3);

        // Messages come back in append order.
        let values: Vec<Vec<u8>> = messages.iter().map(|m| m.value.clone()).collect();
        assert_eq!(
            values,
            vec![b"msg1".to_vec(), b"msg2".to_vec(), b"msg3".to_vec()]
        );
    }

    #[test]
    fn test_partition_fetch_from_offset() {
        let partition = partition_with_three_messages();

        let messages = partition.fetch(1, 1000);
        assert_eq!(messages.len(), 2);
        assert_eq!(messages[0].offset, 1);
        assert_eq!(messages[0].value, b"msg2".to_vec());
        assert_eq!(messages[1].offset, 2);
        assert_eq!(messages[1].value, b"msg3".to_vec());
    }

    #[test]
    fn test_partition_fetch_out_of_range() {
        let partition = partition_with_three_messages();

        // One past the newest message: nothing to return.
        assert!(partition.fetch(3, 1000).is_empty());
        // Below the log start: nothing to return.
        assert!(partition.fetch(-1, 1000).is_empty());
        // An empty partition never returns anything.
        assert!(Partition::new(0).fetch(0, 1000).is_empty());
    }

    #[test]
    fn test_partition_fetch_with_byte_limit() {
        let mut partition = Partition::new(0);
        partition.append(create_test_message(0, b"short"));
        partition.append(create_test_message(1, b"this is a longer message"));
        partition.append(create_test_message(2, b"another long message here"));

        // "short" is 5 bytes; adding the 24-byte second message would exceed
        // the 10-byte limit, so exactly the first message is returned.
        let messages = partition.fetch(0, 10);
        assert_eq!(messages.len(), 1);
        assert_eq!(messages[0].value, b"short".to_vec());

        // 5 + 24 = 29 bytes fits exactly; the third message does not.
        assert_eq!(partition.fetch(0, 29).len(), 2);
    }

    #[test]
    fn test_partition_fetch_first_message_exceeds_limit() {
        let partition = partition_with_three_messages();

        // The first message is returned even though it is larger than the
        // limit, so a consumer can always make progress.
        let messages = partition.fetch(0, 1);
        assert_eq!(messages.len(), 1);
        assert_eq!(messages[0].value, b"msg1".to_vec());
    }

    #[test]
    fn test_partition_latest_offset() {
        let mut partition = Partition::new(0);
        assert_eq!(partition.latest_offset(), -1); // Empty partition

        partition.append(create_test_message(0, b"msg1"));
        assert_eq!(partition.latest_offset(), 0);

        partition.append(create_test_message(1, b"msg2"));
        assert_eq!(partition.latest_offset(), 1);
    }

    #[test]
    fn test_partition_has_offset() {
        let mut partition = Partition::new(0);
        assert!(!partition.has_offset(0)); // Empty partition

        partition.append(create_test_message(0, b"msg1"));
        partition.append(create_test_message(1, b"msg2"));

        assert!(partition.has_offset(0));
        assert!(partition.has_offset(1));
        assert!(!partition.has_offset(2));
        assert!(!partition.has_offset(-1));
    }

    #[test]
    fn test_kafka_message_clone() {
        let msg = KafkaMessage {
            offset: 10,
            timestamp: 1234567890,
            key: Some(b"key".to_vec()),
            value: b"value".to_vec(),
            headers: vec![("header1".to_string(), b"hvalue".to_vec())],
        };

        let cloned = msg.clone();
        assert_eq!(msg.offset, cloned.offset);
        assert_eq!(msg.timestamp, cloned.timestamp);
        assert_eq!(msg.key, cloned.key);
        assert_eq!(msg.value, cloned.value);
        assert_eq!(msg.headers, cloned.headers);
    }

    #[test]
    fn test_kafka_message_serialize() {
        let msg = KafkaMessage {
            offset: 5,
            timestamp: 1234567890,
            key: Some(b"key".to_vec()),
            value: b"value".to_vec(),
            headers: vec![],
        };

        let json = serde_json::to_string(&msg).unwrap();
        assert!(json.contains("\"offset\":5"));
        assert!(json.contains("\"timestamp\":1234567890"));
    }

    #[test]
    fn test_kafka_message_debug() {
        let msg = create_test_message(0, b"test");
        let debug = format!("{:?}", msg);
        assert!(debug.contains("KafkaMessage"));
    }

    #[test]
    fn test_partition_debug() {
        let partition = Partition::new(3);
        let debug = format!("{:?}", partition);
        assert!(debug.contains("Partition"));
        assert!(debug.contains("3"));
    }
}