// kf_protocol_message/kf_code_gen/fetch.rs
use std::fmt::Debug;
5use std::marker::PhantomData;
6
7use kf_protocol::Decoder;
8use kf_protocol::Encoder;
9
10use serde::{Deserialize, Serialize};
11
12use kf_protocol_api::ErrorCode;
13use kf_protocol_api::Isolation;
14use kf_protocol_api::Request;
15
16use kf_protocol_derive::Decode;
17use kf_protocol_derive::Encode;
18use kf_protocol_derive::KfDefault;
19
20#[derive(Encode, Decode, Serialize, Deserialize, KfDefault, Debug)]
25pub struct KfFetchRequest<R>
26where
27 R: Encoder + Decoder + Default + Debug,
28{
29 pub replica_id: i32,
31
32 pub max_wait: i32,
34
35 pub min_bytes: i32,
37
38 #[fluvio_kf(min_version = 3, ignorable)]
40 pub max_bytes: i32,
41
42 #[fluvio_kf(min_version = 4)]
49 pub isolation_level: Isolation,
50
51 #[fluvio_kf(min_version = 7)]
53 pub session_id: i32,
54
55 #[fluvio_kf(min_version = 7)]
57 pub epoch: i32,
58
59 pub topics: Vec<FetchableTopic>,
61
62 #[fluvio_kf(min_version = 7)]
64 pub forgotten: Vec<ForgottenTopic>,
65
66 pub data: PhantomData<R>,
67}
68
69#[derive(Encode, Decode, Serialize, Deserialize, KfDefault, Debug)]
70pub struct FetchableTopic {
71 pub name: String,
73
74 pub fetch_partitions: Vec<FetchPartition>,
76}
77
78#[derive(Encode, Decode, Serialize, Deserialize, KfDefault, Debug)]
79pub struct ForgottenTopic {
80 #[fluvio_kf(min_version = 7)]
82 pub name: String,
83
84 #[fluvio_kf(min_version = 7)]
86 pub forgotten_partition_indexes: Vec<i32>,
87}
88
89#[derive(Encode, Decode, Serialize, Deserialize, KfDefault, Debug)]
90pub struct FetchPartition {
91 pub partition_index: i32,
93
94 #[fluvio_kf(min_version = 9, ignorable)]
96 pub current_leader_epoch: i32,
97
98 pub fetch_offset: i64,
100
101 #[fluvio_kf(min_version = 5)]
104 pub log_start_offset: i64,
105
106 pub max_bytes: i32,
109}
110
111#[derive(Encode, Decode, Serialize, Deserialize, KfDefault, Debug)]
116pub struct KfFetchResponse<R>
117where
118 R: Encoder + Decoder + Default + Debug,
119{
120 #[fluvio_kf(min_version = 1, ignorable)]
123 pub throttle_time_ms: i32,
124
125 #[fluvio_kf(min_version = 7)]
127 pub error_code: ErrorCode,
128
129 #[fluvio_kf(min_version = 7)]
131 pub session_id: i32,
132
133 pub topics: Vec<FetchableTopicResponse<R>>
135}
136
137impl <R>KfFetchResponse<R>
138 where R: Encoder + Decoder + Default + Debug {
139
140 pub fn find_partition(self,topic: &str,partition: i32) -> Option<FetchablePartitionResponse<R>> {
141
142 for topic_res in self.topics {
143 if topic_res.name == topic {
144 for partition_res in topic_res.partitions {
145 if partition_res.partition_index == partition {
146 return Some(partition_res);
147 }
148 }
149 }
150 }
151
152 None
153
154 }
155
156
157}
158
159
160#[derive(Encode, Decode, Serialize, Deserialize, KfDefault, Debug)]
161pub struct FetchableTopicResponse<R>
162where
163 R: Encoder + Decoder + Default + Debug,
164{
165 pub name: String,
167
168 pub partitions: Vec<FetchablePartitionResponse<R>>,
170 pub data: PhantomData<R>,
171}
172
173#[derive(Encode, Decode, Serialize, Deserialize, KfDefault, Debug)]
174pub struct FetchablePartitionResponse<R>
175 where
176 R: Encoder + Decoder + Default + Debug,
177{
178 pub partition_index: i32,
180
181 pub error_code: ErrorCode,
183
184 pub high_watermark: i64,
186
187 #[fluvio_kf(min_version = 4, ignorable)]
191 pub last_stable_offset: i64,
192
193 #[fluvio_kf(min_version = 5, ignorable)]
195 pub log_start_offset: i64,
196
197 #[fluvio_kf(min_version = 4)]
199 pub aborted: Option<Vec<AbortedTransaction>>,
200
201 pub records: R,
203}
204
205#[derive(Encode, Decode, Serialize, Deserialize, KfDefault, Debug)]
206pub struct AbortedTransaction {
207 #[fluvio_kf(min_version = 4)]
209 pub producer_id: i64,
210
211 #[fluvio_kf(min_version = 4)]
213 pub first_offset: i64,
214}
215
216impl<R> Request for KfFetchRequest<R>
221 where R: Debug + Decoder + Encoder
222{
223 const API_KEY: u16 = 1;
224
225 const MIN_API_VERSION: i16 = 0;
226 const MAX_API_VERSION: i16 = 10;
227 const DEFAULT_API_VERSION: i16 = 10;
228
229 type Response = KfFetchResponse<R>;
230}