kf_protocol_message/
kf_handler.rs

1use kf_protocol_api::PartitionOffset;
2
3use crate::offset::KfListOffsetResponse;
4use crate::offset::ListOffsetPartitionResponse;
5
6impl KfListOffsetResponse {
7
8    pub fn find_partition(self,topic: &str,partition: i32) -> Option<ListOffsetPartitionResponse> {
9
10        for topic_res in self.topics {
11            if topic_res.name == topic {
12                for partition_res in topic_res.partitions {
13                    if partition_res.partition_index == partition {
14                        return Some(partition_res);
15                    }
16                }
17            }
18        }
19
20        None
21
22    }
23
24}
25
26impl PartitionOffset for ListOffsetPartitionResponse {
27
28    fn last_stable_offset(&self) -> i64 {
29        self.offset
30    }
31
32
33    // we don't have start offset yet for kafka
34    fn start_offset(&self) -> i64 {
35        self.offset
36    }
37}