mockforge_kafka/
partitions.rs

1use std::collections::VecDeque;
2
3/// Represents a Kafka partition
4#[derive(Debug)]
5pub struct Partition {
6    pub id: i32,
7    pub messages: VecDeque<KafkaMessage>,
8    pub high_watermark: i64,
9    pub log_start_offset: i64,
10}
11
12#[derive(Debug, Clone, serde::Serialize)]
13pub struct KafkaMessage {
14    pub offset: i64,
15    pub timestamp: i64,
16    pub key: Option<Vec<u8>>,
17    pub value: Vec<u8>,
18    pub headers: Vec<(String, Vec<u8>)>,
19}
20
21impl Partition {
22    /// Create a new partition
23    pub fn new(id: i32) -> Self {
24        Self {
25            id,
26            messages: VecDeque::new(),
27            high_watermark: 0,
28            log_start_offset: 0,
29        }
30    }
31
32    /// Append a message to the partition
33    pub fn append(&mut self, message: KafkaMessage) -> i64 {
34        let offset = self.high_watermark;
35        self.messages.push_back(message);
36        self.high_watermark += 1;
37        offset
38    }
39
40    /// Fetch messages from a given offset
41    pub fn fetch(&self, offset: i64, max_bytes: i32) -> Vec<&KafkaMessage> {
42        let start_idx = (offset - self.log_start_offset) as usize;
43        let mut result = Vec::new();
44        let mut total_bytes = 0;
45
46        for message in self.messages.iter().skip(start_idx) {
47            if total_bytes + message.value.len() as i32 > max_bytes && !result.is_empty() {
48                break;
49            }
50            result.push(message);
51            total_bytes += message.value.len() as i32;
52        }
53
54        result
55    }
56
57    /// Get the latest offset
58    pub fn latest_offset(&self) -> i64 {
59        self.high_watermark - 1
60    }
61
62    /// Check if partition has messages from offset
63    pub fn has_offset(&self, offset: i64) -> bool {
64        offset >= self.log_start_offset && offset < self.high_watermark
65    }
66}