1use byteorder::WriteBytesExt;
16
17use crate::codec::*;
18use crate::IoResult;
19
20#[derive(Debug, Default, Clone)]
50pub struct FetchResponse {
51 pub throttle_time_ms: i32,
54 pub error_code: i16,
56 pub session_id: i32,
58 pub responses: Vec<FetchableTopicResponse>,
60 pub unknown_tagged_fields: Vec<RawTaggedField>,
62}
63
64impl Encodable for FetchResponse {
65 fn write<B: WriteBytesExt>(&self, buf: &mut B, version: i16) -> IoResult<()> {
66 if version >= 1 {
67 Int32.encode(buf, self.throttle_time_ms)?;
68 }
69 if version >= 7 {
70 Int16.encode(buf, self.error_code)?;
71 Int32.encode(buf, self.session_id)?;
72 }
73 NullableArray(Struct(version), version >= 12).encode(buf, self.responses.as_slice())?;
74 if version >= 12 {
75 RawTaggedFieldList.encode(buf, self.unknown_tagged_fields.as_slice())?;
76 }
77 Ok(())
78 }
79
80 fn calculate_size(&self, version: i16) -> usize {
81 let mut res = 0;
82 if version >= 1 {
83 res += Int32::SIZE; }
85 if version >= 7 {
86 res += Int16::SIZE; res += Int32::SIZE; }
89 res +=
90 NullableArray(Struct(version), version >= 12).calculate_size(self.responses.as_slice());
91 if version >= 12 {
92 res += RawTaggedFieldList.calculate_size(self.unknown_tagged_fields.as_slice());
93 }
94 res
95 }
96}
97
98#[derive(Debug, Default, Clone)]
99pub struct FetchableTopicResponse {
100 pub topic: String,
102 pub topic_id: uuid::Uuid,
104 pub partitions: Vec<PartitionData>,
106 pub unknown_tagged_fields: Vec<RawTaggedField>,
108}
109
110impl Encodable for FetchableTopicResponse {
111 fn write<B: WriteBytesExt>(&self, buf: &mut B, version: i16) -> IoResult<()> {
112 if version <= 12 {
113 NullableString(version >= 12).encode(buf, self.topic.as_str())?;
114 }
115 if version >= 13 {
116 Uuid.encode(buf, self.topic_id)?;
117 }
118 NullableArray(Struct(version), version >= 12).encode(buf, self.partitions.as_slice())?;
119 if version >= 12 {
120 RawTaggedFieldList.encode(buf, self.unknown_tagged_fields.as_slice())?;
121 }
122 Ok(())
123 }
124
125 fn calculate_size(&self, version: i16) -> usize {
126 let mut res = 0;
127 if version <= 12 {
128 res += NullableString(version >= 12).calculate_size(self.topic.as_str());
129 }
130 if version >= 13 {
131 res += Uuid::SIZE; }
133 res += NullableArray(Struct(version), version >= 12)
134 .calculate_size(self.partitions.as_slice());
135 if version >= 12 {
136 res += RawTaggedFieldList.calculate_size(self.unknown_tagged_fields.as_slice());
137 }
138 res
139 }
140}
141
142#[derive(Debug, Clone)]
143pub struct PartitionData {
144 pub partition_index: i32,
146 pub error_code: i16,
148 pub high_watermark: i64,
150 pub last_stable_offset: i64,
154 pub log_start_offset: i64,
156 pub diverging_epoch: Option<EpochEndOffset>,
160 pub current_leader: Option<LeaderIdAndEpoch>,
161 pub snapshot_id: Option<SnapshotId>,
164 pub aborted_transactions: Option<Vec<AbortedTransaction>>,
166 pub preferred_read_replica: i32,
168 pub records: Vec<u8>,
170 pub unknown_tagged_fields: Vec<RawTaggedField>,
172}
173
174impl Default for PartitionData {
175 fn default() -> Self {
176 PartitionData {
177 partition_index: 0,
178 error_code: 0,
179 high_watermark: 0,
180 last_stable_offset: -1,
181 log_start_offset: -1,
182 diverging_epoch: None,
183 current_leader: None,
184 snapshot_id: None,
185 aborted_transactions: None,
186 preferred_read_replica: -1,
187 records: Default::default(),
188 unknown_tagged_fields: vec![],
189 }
190 }
191}
192
193impl Encodable for PartitionData {
194 fn write<B: WriteBytesExt>(&self, buf: &mut B, version: i16) -> IoResult<()> {
195 Int32.encode(buf, self.partition_index)?;
196 Int16.encode(buf, self.error_code)?;
197 Int64.encode(buf, self.high_watermark)?;
198 if version >= 4 {
199 Int64.encode(buf, self.last_stable_offset)?;
200 }
201 if version >= 5 {
202 Int64.encode(buf, self.log_start_offset)?;
203 }
204 if version >= 4 {
205 NullableArray(Struct(version), version >= 12)
206 .encode(buf, self.aborted_transactions.as_deref())?;
207 }
208 if version >= 11 {
209 Int32.encode(buf, self.preferred_read_replica)?;
210 }
211 NullableBytes(version >= 12).encode(buf, &self.records)?;
212 if version >= 12 {
213 let mut n = self.diverging_epoch.is_some() as usize;
214 n += self.current_leader.is_some() as usize;
215 n += self.snapshot_id.is_some() as usize;
216 RawTaggedFieldList.encode_with(buf, n, &self.unknown_tagged_fields, |buf| {
217 if let Some(diverging_epoch) = &self.diverging_epoch {
218 RawTaggedFieldWriter.write_field(buf, 0, Struct(version), diverging_epoch)?;
219 }
220 if let Some(current_leader) = &self.current_leader {
221 RawTaggedFieldWriter.write_field(buf, 1, Struct(version), current_leader)?;
222 }
223 if let Some(snapshot_id) = &self.snapshot_id {
224 RawTaggedFieldWriter.write_field(buf, 2, Struct(version), snapshot_id)?;
225 }
226 Ok(())
227 })?;
228 }
229 Ok(())
230 }
231
232 fn calculate_size(&self, version: i16) -> usize {
233 let mut res = 0;
234 res += Int32::SIZE; res += Int16::SIZE; res += Int64::SIZE; if version >= 4 {
238 res += Int64::SIZE; }
240 if version >= 5 {
241 res += Int64::SIZE; }
243 if version >= 4 {
244 res += NullableArray(Struct(version), version >= 12)
245 .calculate_size(self.aborted_transactions.as_deref());
246 }
247 if version >= 11 {
248 res += Int32::SIZE; }
250 res += NullableBytes(version >= 12).calculate_size(&self.records);
251 if version >= 12 {
252 let mut n = 0;
253 let mut bs = 0;
254 if let Some(diverging_epoch) = &self.diverging_epoch {
255 n += 1;
256 bs +=
257 RawTaggedFieldWriter.calculate_field_size(0, Struct(version), diverging_epoch);
258 }
259 if let Some(current_leader) = &self.current_leader {
260 n += 1;
261 bs += RawTaggedFieldWriter.calculate_field_size(0, Struct(version), current_leader);
262 }
263 if let Some(snapshot_id) = &self.snapshot_id {
264 n += 1;
265 bs += RawTaggedFieldWriter.calculate_field_size(0, Struct(version), snapshot_id);
266 }
267 res += RawTaggedFieldList.calculate_size_with(n, bs, &self.unknown_tagged_fields);
268 }
269 res
270 }
271}
272
273#[derive(Debug, Clone)]
274pub struct EpochEndOffset {
275 pub epoch: i32,
276 pub end_offset: i64,
277 pub unknown_tagged_fields: Vec<RawTaggedField>,
279}
280
281impl Default for EpochEndOffset {
282 fn default() -> Self {
283 EpochEndOffset {
284 epoch: -1,
285 end_offset: -1,
286 unknown_tagged_fields: vec![],
287 }
288 }
289}
290
291impl Encodable for EpochEndOffset {
292 fn write<B: WriteBytesExt>(&self, buf: &mut B, version: i16) -> IoResult<()> {
293 if version < 12 {
294 Err(err_encode_message_unsupported(version, "EpochEndOffset"))?
295 }
296 Int32.encode(buf, self.epoch)?;
297 Int64.encode(buf, self.end_offset)?;
298 RawTaggedFieldList.encode(buf, self.unknown_tagged_fields.as_slice())?;
299 Ok(())
300 }
301
302 fn calculate_size(&self, _version: i16) -> usize {
303 let mut res = 0;
304 res += Int32::SIZE; res += Int64::SIZE; res += RawTaggedFieldList.calculate_size(self.unknown_tagged_fields.as_slice());
307 res
308 }
309}
310
311#[derive(Debug, Clone)]
312pub struct LeaderIdAndEpoch {
313 pub leader_id: i32,
315 pub leader_epoch: i32,
317 pub unknown_tagged_fields: Vec<RawTaggedField>,
319}
320
321impl Default for LeaderIdAndEpoch {
322 fn default() -> Self {
323 LeaderIdAndEpoch {
324 leader_id: -1,
325 leader_epoch: -1,
326 unknown_tagged_fields: vec![],
327 }
328 }
329}
330
331impl Encodable for LeaderIdAndEpoch {
332 fn write<B: WriteBytesExt>(&self, buf: &mut B, version: i16) -> IoResult<()> {
333 if version < 12 {
334 Err(err_encode_message_unsupported(version, "LeaderIdAndEpoch"))?
335 }
336 Int32.encode(buf, self.leader_id)?;
337 Int32.encode(buf, self.leader_epoch)?;
338 RawTaggedFieldList.encode(buf, self.unknown_tagged_fields.as_slice())?;
339 Ok(())
340 }
341
342 fn calculate_size(&self, _version: i16) -> usize {
343 let mut res = 0;
344 res += Int32::SIZE; res += Int32::SIZE; res += RawTaggedFieldList.calculate_size(self.unknown_tagged_fields.as_slice());
347 res
348 }
349}
350
351#[derive(Debug, Clone)]
352pub struct SnapshotId {
353 pub end_offset: i64,
354 pub epoch: i32,
355 pub unknown_tagged_fields: Vec<RawTaggedField>,
357}
358
359impl Default for SnapshotId {
360 fn default() -> Self {
361 SnapshotId {
362 end_offset: -1,
363 epoch: -1,
364 unknown_tagged_fields: vec![],
365 }
366 }
367}
368
369impl Encodable for SnapshotId {
370 fn write<B: WriteBytesExt>(&self, buf: &mut B, version: i16) -> IoResult<()> {
371 if version < 12 {
372 Err(err_encode_message_unsupported(version, "SnapshotId"))?
373 }
374 Int64.encode(buf, self.end_offset)?;
375 Int32.encode(buf, self.epoch)?;
376 RawTaggedFieldList.encode(buf, self.unknown_tagged_fields.as_slice())?;
377 Ok(())
378 }
379
380 fn calculate_size(&self, _version: i16) -> usize {
381 let mut res = 0;
382 res += Int64::SIZE; res += Int32::SIZE; res += RawTaggedFieldList.calculate_size(self.unknown_tagged_fields.as_slice());
385 res
386 }
387}
388
389#[derive(Debug, Default, Clone)]
390pub struct AbortedTransaction {
391 pub producer_id: i64,
393 pub first_offset: i64,
395 pub unknown_tagged_fields: Vec<RawTaggedField>,
397}
398
399impl Encodable for AbortedTransaction {
400 fn write<B: WriteBytesExt>(&self, buf: &mut B, version: i16) -> IoResult<()> {
401 if version < 4 {
402 Err(err_encode_message_unsupported(
403 version,
404 "AbortedTransaction",
405 ))?
406 }
407 Int64.encode(buf, self.producer_id)?;
408 Int64.encode(buf, self.first_offset)?;
409 if version >= 12 {
410 RawTaggedFieldList.encode(buf, self.unknown_tagged_fields.as_slice())?;
411 }
412 Ok(())
413 }
414
415 fn calculate_size(&self, version: i16) -> usize {
416 let mut res = 0;
417 res += Int64::SIZE; res += Int64::SIZE; if version >= 12 {
420 res += RawTaggedFieldList.calculate_size(self.unknown_tagged_fields.as_slice());
421 }
422 res
423 }
424}