// mockforge_kafka/partitions.rs
use std::collections::VecDeque;
2
3#[derive(Debug)]
5pub struct Partition {
6 pub id: i32,
7 pub messages: VecDeque<KafkaMessage>,
8 pub high_watermark: i64,
9 pub log_start_offset: i64,
10}
11
12#[derive(Debug, Clone, serde::Serialize)]
13pub struct KafkaMessage {
14 pub offset: i64,
15 pub timestamp: i64,
16 pub key: Option<Vec<u8>>,
17 pub value: Vec<u8>,
18 pub headers: Vec<(String, Vec<u8>)>,
19}
20
21impl Partition {
22 pub fn new(id: i32) -> Self {
24 Self {
25 id,
26 messages: VecDeque::new(),
27 high_watermark: 0,
28 log_start_offset: 0,
29 }
30 }
31
32 pub fn append(&mut self, message: KafkaMessage) -> i64 {
34 let offset = self.high_watermark;
35 self.messages.push_back(message);
36 self.high_watermark += 1;
37 offset
38 }
39
40 pub fn fetch(&self, offset: i64, max_bytes: i32) -> Vec<&KafkaMessage> {
42 let start_idx = (offset - self.log_start_offset) as usize;
43 let mut result = Vec::new();
44 let mut total_bytes = 0;
45
46 for message in self.messages.iter().skip(start_idx) {
47 if total_bytes + message.value.len() as i32 > max_bytes && !result.is_empty() {
48 break;
49 }
50 result.push(message);
51 total_bytes += message.value.len() as i32;
52 }
53
54 result
55 }
56
57 pub fn latest_offset(&self) -> i64 {
59 self.high_watermark - 1
60 }
61
62 pub fn has_offset(&self, offset: i64) -> bool {
64 offset >= self.log_start_offset && offset < self.high_watermark
65 }
66}