mockforge_kafka/
partitions.rs1use std::collections::VecDeque;
2
3#[derive(Debug)]
5pub struct Partition {
6 pub id: i32,
7 pub messages: VecDeque<KafkaMessage>,
8 pub high_watermark: i64,
9 pub log_start_offset: i64,
10}
11
12#[derive(Debug, Clone, serde::Serialize)]
13pub struct KafkaMessage {
14 pub offset: i64,
15 pub timestamp: i64,
16 pub key: Option<Vec<u8>>,
17 pub value: Vec<u8>,
18 pub headers: Vec<(String, Vec<u8>)>,
19}
20
21impl Partition {
22 pub fn new(id: i32) -> Self {
24 Self {
25 id,
26 messages: VecDeque::new(),
27 high_watermark: 0,
28 log_start_offset: 0,
29 }
30 }
31
32 pub fn append(&mut self, message: KafkaMessage) -> i64 {
34 let offset = self.high_watermark;
35 self.messages.push_back(message);
36 self.high_watermark += 1;
37 offset
38 }
39
40 pub fn fetch(&self, offset: i64, max_bytes: i32) -> Vec<&KafkaMessage> {
42 let start_idx = (offset - self.log_start_offset) as usize;
43 let mut result = Vec::new();
44 let mut total_bytes = 0;
45
46 for message in self.messages.iter().skip(start_idx) {
47 if total_bytes + message.value.len() as i32 > max_bytes && !result.is_empty() {
48 break;
49 }
50 result.push(message);
51 total_bytes += message.value.len() as i32;
52 }
53
54 result
55 }
56
57 pub fn latest_offset(&self) -> i64 {
59 self.high_watermark - 1
60 }
61
62 pub fn has_offset(&self, offset: i64) -> bool {
64 offset >= self.log_start_offset && offset < self.high_watermark
65 }
66}
67
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a message with the given offset and payload and fixed filler
    /// values for the remaining fields.
    fn create_test_message(offset: i64, value: &[u8]) -> KafkaMessage {
        KafkaMessage {
            offset,
            timestamp: 1234567890,
            key: None,
            value: value.to_vec(),
            headers: vec![],
        }
    }

    #[test]
    fn test_partition_new() {
        let partition = Partition::new(5);
        assert_eq!(partition.id, 5);
        assert!(partition.messages.is_empty());
        assert_eq!(partition.high_watermark, 0);
        assert_eq!(partition.log_start_offset, 0);
    }

    #[test]
    fn test_partition_append() {
        let mut partition = Partition::new(0);
        let msg = create_test_message(0, b"test message");

        let offset = partition.append(msg);
        assert_eq!(offset, 0);
        assert_eq!(partition.high_watermark, 1);
        assert_eq!(partition.messages.len(), 1);
    }

    #[test]
    fn test_partition_append_multiple() {
        let mut partition = Partition::new(0);

        let offset1 = partition.append(create_test_message(0, b"msg1"));
        let offset2 = partition.append(create_test_message(1, b"msg2"));
        let offset3 = partition.append(create_test_message(2, b"msg3"));

        assert_eq!(offset1, 0);
        assert_eq!(offset2, 1);
        assert_eq!(offset3, 2);
        assert_eq!(partition.high_watermark, 3);
    }

    #[test]
    fn test_partition_fetch() {
        let mut partition = Partition::new(0);
        partition.append(create_test_message(0, b"msg1"));
        partition.append(create_test_message(1, b"msg2"));
        partition.append(create_test_message(2, b"msg3"));

        let messages = partition.fetch(0, 1000);
        assert_eq!(messages.len(), 3);
    }

    #[test]
    fn test_partition_fetch_from_offset() {
        let mut partition = Partition::new(0);
        partition.append(create_test_message(0, b"msg1"));
        partition.append(create_test_message(1, b"msg2"));
        partition.append(create_test_message(2, b"msg3"));

        let messages = partition.fetch(1, 1000);
        assert_eq!(messages.len(), 2);
    }

    #[test]
    fn test_partition_fetch_with_byte_limit() {
        let mut partition = Partition::new(0);
        partition.append(create_test_message(0, b"short"));
        partition.append(create_test_message(1, b"this is a longer message"));
        partition.append(create_test_message(2, b"another long message here"));

        // "short" is 5 bytes and fits; adding the 24-byte second payload
        // would exceed the 10-byte budget, so exactly one message comes back.
        let messages = partition.fetch(0, 10);
        assert_eq!(messages.len(), 1);
        assert_eq!(messages[0].value, b"short".to_vec());
    }

    #[test]
    fn test_partition_fetch_oversized_first_message() {
        let mut partition = Partition::new(0);
        partition.append(create_test_message(0, b"this is a longer message"));
        partition.append(create_test_message(1, b"msg2"));

        // The first message is returned even though it alone exceeds the budget.
        let messages = partition.fetch(0, 1);
        assert_eq!(messages.len(), 1);
        assert_eq!(messages[0].offset, 0);
    }

    #[test]
    fn test_partition_fetch_past_end() {
        let mut partition = Partition::new(0);
        partition.append(create_test_message(0, b"msg1"));

        assert!(partition.fetch(1, 1000).is_empty());
        assert!(partition.fetch(100, 1000).is_empty());
    }

    #[test]
    fn test_partition_latest_offset() {
        let mut partition = Partition::new(0);
        // An empty partition reports -1.
        assert_eq!(partition.latest_offset(), -1);

        partition.append(create_test_message(0, b"msg1"));
        assert_eq!(partition.latest_offset(), 0);

        partition.append(create_test_message(1, b"msg2"));
        assert_eq!(partition.latest_offset(), 1);
    }

    #[test]
    fn test_partition_has_offset() {
        let mut partition = Partition::new(0);
        // Nothing has been appended yet, so no offset is present.
        assert!(!partition.has_offset(0));

        partition.append(create_test_message(0, b"msg1"));
        partition.append(create_test_message(1, b"msg2"));

        assert!(partition.has_offset(0));
        assert!(partition.has_offset(1));
        assert!(!partition.has_offset(2));
        assert!(!partition.has_offset(-1));
    }

    #[test]
    fn test_kafka_message_clone() {
        let msg = KafkaMessage {
            offset: 10,
            timestamp: 1234567890,
            key: Some(b"key".to_vec()),
            value: b"value".to_vec(),
            headers: vec![("header1".to_string(), b"hvalue".to_vec())],
        };

        let cloned = msg.clone();
        assert_eq!(msg.offset, cloned.offset);
        assert_eq!(msg.key, cloned.key);
        assert_eq!(msg.value, cloned.value);
        assert_eq!(msg.headers, cloned.headers);
    }

    #[test]
    fn test_kafka_message_serialize() {
        let msg = KafkaMessage {
            offset: 5,
            timestamp: 1234567890,
            key: Some(b"key".to_vec()),
            value: b"value".to_vec(),
            headers: vec![],
        };

        let json = serde_json::to_string(&msg).unwrap();
        assert!(json.contains("\"offset\":5"));
    }

    #[test]
    fn test_kafka_message_debug() {
        let msg = create_test_message(0, b"test");
        let debug = format!("{:?}", msg);
        assert!(debug.contains("KafkaMessage"));
    }

    #[test]
    fn test_partition_debug() {
        let partition = Partition::new(3);
        let debug = format!("{:?}", partition);
        assert!(debug.contains("Partition"));
        assert!(debug.contains("3"));
    }
}