fluvio_spu_schema/server/
fetch_offset.rs1use std::fmt;
6
7use fluvio_protocol::api::Request;
8use fluvio_protocol::{Encoder, Decoder};
9use fluvio_protocol::record::PartitionOffset;
10use fluvio_protocol::record::ReplicaKey;
11
12use fluvio_types::PartitionId;
13
14use crate::COMMON_VERSION;
15use crate::errors::ErrorCode;
16use super::SpuServerApiKey;
17
18#[derive(Decoder, Encoder, Default, Debug)]
24pub struct FetchOffsetsRequest {
25 pub topics: Vec<FetchOffsetTopic>,
27
28 #[fluvio(min_version = 23)]
29 pub consumer_id: Option<String>,
30}
31
32impl Request for FetchOffsetsRequest {
33 const API_KEY: u16 = SpuServerApiKey::FetchOffsets as u16;
34 const DEFAULT_API_VERSION: i16 = COMMON_VERSION;
35 type Response = FetchOffsetsResponse;
36}
37
38impl FetchOffsetsRequest {
39 pub fn new(topic: String, partition: u32, consumer_id: Option<String>) -> Self {
41 Self {
42 topics: vec![FetchOffsetTopic {
43 name: topic,
44 partitions: vec![FetchOffsetPartition {
45 partition_index: partition,
46 }],
47 }],
48 consumer_id,
49 }
50 }
51}
52
53#[derive(Decoder, Encoder, Default, Debug)]
54pub struct FetchOffsetTopic {
55 pub name: String,
57
58 pub partitions: Vec<FetchOffsetPartition>,
60}
61
62#[derive(Decoder, Encoder, Default, Debug)]
63pub struct FetchOffsetPartition {
64 pub partition_index: PartitionId,
66}
67
68#[derive(Encoder, Decoder, Default, Debug)]
73pub struct FetchOffsetsResponse {
74 pub topics: Vec<FetchOffsetTopicResponse>,
76}
77
78impl FetchOffsetsResponse {
79 pub fn find_partition(self, replica: &ReplicaKey) -> Option<FetchOffsetPartitionResponse> {
80 for topic_res in self.topics {
81 if topic_res.name == replica.topic {
82 for partition_res in topic_res.partitions {
83 if partition_res.partition_index == replica.partition {
84 return Some(partition_res);
85 }
86 }
87 }
88 }
89
90 None
91 }
92}
93
94#[derive(Encoder, Decoder, Default, Debug)]
95pub struct FetchOffsetTopicResponse {
96 pub name: String,
98
99 pub partitions: Vec<FetchOffsetPartitionResponse>,
101}
102
103#[derive(Encoder, Decoder, Default, Debug)]
104pub struct FetchOffsetPartitionResponse {
105 pub error_code: ErrorCode,
107
108 pub partition_index: PartitionId,
110
111 pub start_offset: i64,
113
114 pub last_stable_offset: i64,
116}
117
118impl fmt::Display for FetchOffsetPartitionResponse {
119 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
120 write!(
121 f,
122 "error: {:#?}, partition: {}, start: {}, last: {}",
123 self.error_code, self.partition_index, self.start_offset, self.last_stable_offset
124 )
125 }
126}
127
128impl PartitionOffset for FetchOffsetPartitionResponse {
129 fn last_stable_offset(&self) -> i64 {
130 self.last_stable_offset
131 }
132
133 fn start_offset(&self) -> i64 {
134 self.start_offset
135 }
136}