1use bytes::BufMut;
4
5use crate::primitives::fixed::{get_i8, get_i32, get_i64, put_i8, put_i32, put_i64};
6use crate::primitives::string_bytes::{
7 compact_nullable_string_len, compact_string_len, nullable_string_len,
8 put_compact_nullable_string, put_compact_string, put_nullable_string, put_string, string_len,
9};
10use crate::primitives::string_bytes_borrowed::{get_compact_string_borrowed, get_string_borrowed};
11use crate::tagged_fields::{
12 WriteTaggedFields, encode_to_bytes, read_tagged_fields, tagged_fields_len,
13};
14use crate::{DecodeBorrow, Encode, ProtocolError, UnknownTaggedFields};
15
/// API key of this message; reported in `ProtocolError::UnsupportedVersion`.
pub const API_KEY: i16 = 1;
/// Lowest request version accepted by the top-level encoder and decoder.
pub const MIN_VERSION: i16 = 4;
/// Highest request version accepted by the top-level encoder and decoder.
pub const MAX_VERSION: i16 = 18;
/// First version that uses the flexible encoding (compact lengths plus
/// tagged fields).
pub const FLEXIBLE_MIN: i16 = 12;

/// Returns `true` when `version` is at or above [`FLEXIBLE_MIN`].
#[inline]
fn is_flexible(version: i16) -> bool {
    FLEXIBLE_MIN <= version
}
25
26#[derive(Debug, Clone, PartialEq, Eq)]
27pub struct FetchRequest<'a> {
28 pub replica_id: i32,
29 pub max_wait_ms: i32,
30 pub min_bytes: i32,
31 pub max_bytes: i32,
32 pub isolation_level: i8,
33 pub session_id: i32,
34 pub session_epoch: i32,
35 pub topics: Vec<FetchTopic<'a>>,
36 pub forgotten_topics_data: Vec<ForgottenTopic<'a>>,
37 pub rack_id: &'a str,
38 pub cluster_id: Option<String>,
39 pub replica_state: ReplicaState,
40 pub unknown_tagged_fields: UnknownTaggedFields,
41}
42impl Default for FetchRequest<'_> {
43 fn default() -> Self {
44 Self {
45 replica_id: -1i32,
46 max_wait_ms: 0i32,
47 min_bytes: 0i32,
48 max_bytes: 2_147_483_647i32,
49 isolation_level: 0i8,
50 session_id: 0i32,
51 session_epoch: -1i32,
52 topics: Vec::new(),
53 forgotten_topics_data: Vec::new(),
54 rack_id: "",
55 cluster_id: None,
56 replica_state: Default::default(),
57 unknown_tagged_fields: Default::default(),
58 }
59 }
60}
61impl FetchRequest<'_> {
62 pub fn to_owned(&self) -> crate::owned::fetch_request::FetchRequest {
63 crate::owned::fetch_request::FetchRequest {
64 replica_id: (self.replica_id),
65 max_wait_ms: (self.max_wait_ms),
66 min_bytes: (self.min_bytes),
67 max_bytes: (self.max_bytes),
68 isolation_level: (self.isolation_level),
69 session_id: (self.session_id),
70 session_epoch: (self.session_epoch),
71 topics: (self.topics).iter().map(FetchTopic::to_owned).collect(),
72 forgotten_topics_data: (self.forgotten_topics_data)
73 .iter()
74 .map(ForgottenTopic::to_owned)
75 .collect(),
76 rack_id: (self.rack_id).to_string(),
77 cluster_id: self.cluster_id.clone(),
78 replica_state: (self.replica_state).to_owned(),
79 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
80 }
81 }
82}
83impl Encode for FetchRequest<'_> {
84 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
85 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
86 return Err(ProtocolError::UnsupportedVersion {
87 api_key: API_KEY,
88 version,
89 });
90 }
91 let flex = is_flexible(version);
92 if (0..=14).contains(&version) {
93 put_i32(buf, self.replica_id);
94 }
95 if version >= 0 {
96 put_i32(buf, self.max_wait_ms);
97 }
98 if version >= 0 {
99 put_i32(buf, self.min_bytes);
100 }
101 if version >= 3 {
102 put_i32(buf, self.max_bytes);
103 }
104 if version >= 4 {
105 put_i8(buf, self.isolation_level);
106 }
107 if version >= 7 {
108 put_i32(buf, self.session_id);
109 }
110 if version >= 7 {
111 put_i32(buf, self.session_epoch);
112 }
113 if version >= 0 {
114 {
115 crate::primitives::array::put_array_len(buf, (self.topics).len(), flex);
116 for it in &self.topics {
117 it.encode(buf, version)?;
118 }
119 }
120 }
121 if version >= 7 {
122 {
123 crate::primitives::array::put_array_len(
124 buf,
125 (self.forgotten_topics_data).len(),
126 flex,
127 );
128 for it in &self.forgotten_topics_data {
129 it.encode(buf, version)?;
130 }
131 }
132 }
133 if version >= 11 {
134 if flex {
135 put_compact_string(buf, self.rack_id);
136 } else {
137 put_string(buf, self.rack_id);
138 }
139 }
140 if flex {
141 let mut tagged = WriteTaggedFields::new();
142 if !(self.cluster_id.is_none()) {
143 let payload = encode_to_bytes(
144 if flex {
145 compact_nullable_string_len(self.cluster_id.as_deref())
146 } else {
147 nullable_string_len(self.cluster_id.as_deref())
148 },
149 |b| {
150 if flex {
151 put_compact_nullable_string(b, self.cluster_id.as_deref());
152 } else {
153 put_nullable_string(b, self.cluster_id.as_deref());
154 }
155 Ok(())
156 },
157 );
158 tagged.add(0, payload);
159 }
160 if !(crate::codegen_helpers::is_default(&self.replica_state)) {
161 let payload = encode_to_bytes(self.replica_state.encoded_len(version), |b| {
162 self.replica_state.encode(b, version)?;
163 Ok(())
164 });
165 tagged.add(1, payload);
166 }
167 tagged.write(buf, &self.unknown_tagged_fields);
168 }
169 Ok(())
170 }
171 fn encoded_len(&self, version: i16) -> usize {
172 let flex = is_flexible(version);
173 let mut n: usize = 0;
174 if (0..=14).contains(&version) {
175 n += 4;
176 }
177 if version >= 0 {
178 n += 4;
179 }
180 if version >= 0 {
181 n += 4;
182 }
183 if version >= 3 {
184 n += 4;
185 }
186 if version >= 4 {
187 n += 1;
188 }
189 if version >= 7 {
190 n += 4;
191 }
192 if version >= 7 {
193 n += 4;
194 }
195 if version >= 0 {
196 n += {
197 let prefix =
198 crate::primitives::array::array_len_prefix_len((self.topics).len(), flex);
199 let body: usize = (self.topics).iter().map(|it| it.encoded_len(version)).sum();
200 prefix + body
201 };
202 }
203 if version >= 7 {
204 n += {
205 let prefix = crate::primitives::array::array_len_prefix_len(
206 (self.forgotten_topics_data).len(),
207 flex,
208 );
209 let body: usize = (self.forgotten_topics_data)
210 .iter()
211 .map(|it| it.encoded_len(version))
212 .sum();
213 prefix + body
214 };
215 }
216 if version >= 11 {
217 n += if flex {
218 compact_string_len(self.rack_id)
219 } else {
220 string_len(self.rack_id)
221 };
222 }
223 if flex {
224 let mut known_pairs: Vec<(u32, usize)> = Vec::new();
225 if !(self.cluster_id.is_none()) {
226 known_pairs.push((
227 0,
228 if flex {
229 compact_nullable_string_len(self.cluster_id.as_deref())
230 } else {
231 nullable_string_len(self.cluster_id.as_deref())
232 },
233 ));
234 }
235 if !(crate::codegen_helpers::is_default(&self.replica_state)) {
236 known_pairs.push((1, self.replica_state.encoded_len(version)));
237 }
238 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
239 }
240 n
241 }
242}
243impl<'de> DecodeBorrow<'de> for FetchRequest<'de> {
244 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
245 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
246 return Err(ProtocolError::UnsupportedVersion {
247 api_key: API_KEY,
248 version,
249 });
250 }
251 let flex = is_flexible(version);
252 let mut out = Self::default();
253 if (0..=14).contains(&version) {
254 out.replica_id = get_i32(buf)?;
255 }
256 if version >= 0 {
257 out.max_wait_ms = get_i32(buf)?;
258 }
259 if version >= 0 {
260 out.min_bytes = get_i32(buf)?;
261 }
262 if version >= 3 {
263 out.max_bytes = get_i32(buf)?;
264 }
265 if version >= 4 {
266 out.isolation_level = get_i8(buf)?;
267 }
268 if version >= 7 {
269 out.session_id = get_i32(buf)?;
270 }
271 if version >= 7 {
272 out.session_epoch = get_i32(buf)?;
273 }
274 if version >= 0 {
275 out.topics = {
276 let n = crate::primitives::array::get_array_len(buf, flex)?;
277 let mut v = Vec::with_capacity(n);
278 for _ in 0..n {
279 v.push(FetchTopic::decode_borrow(buf, version)?);
280 }
281 v
282 };
283 }
284 if version >= 7 {
285 out.forgotten_topics_data = {
286 let n = crate::primitives::array::get_array_len(buf, flex)?;
287 let mut v = Vec::with_capacity(n);
288 for _ in 0..n {
289 v.push(ForgottenTopic::decode_borrow(buf, version)?);
290 }
291 v
292 };
293 }
294 if version >= 11 {
295 out.rack_id = if flex {
296 get_compact_string_borrowed(buf)?
297 } else {
298 get_string_borrowed(buf)?
299 };
300 }
301 if flex {
302 let mut tag_cluster_id = None;
303 let mut tag_replica_state = None;
304 out.unknown_tagged_fields = read_tagged_fields(buf, |tag, payload| match tag {
305 0 => {
306 tag_cluster_id = Some({
307 let b: &mut &[u8] = payload;
308 if flex {
309 crate::primitives::string_bytes::get_compact_nullable_string_owned(b)?
310 } else {
311 crate::primitives::string_bytes::get_nullable_string_owned(b)?
312 }
313 });
314 Ok(true)
315 }
316 1 => {
317 tag_replica_state = Some({
318 let b: &mut &[u8] = payload;
319 ReplicaState::decode_borrow(b, version)?
320 });
321 Ok(true)
322 }
323 _ => Ok(false),
324 })?;
325 if let Some(v) = tag_cluster_id {
326 out.cluster_id = v;
327 }
328 if let Some(v) = tag_replica_state {
329 out.replica_state = v;
330 }
331 }
332 Ok(out)
333 }
334}
#[cfg(test)]
impl FetchRequest<'_> {
    /// Test fixture: a request in which every field that exists in `version`
    /// holds a non-default value, while all other fields stay at their
    /// defaults.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        let mut req = Self::default();
        if (0..=14).contains(&version) {
            req.replica_id = 1;
        }
        if version >= 0 {
            req.max_wait_ms = 1;
            req.min_bytes = 1;
            req.topics = vec![FetchTopic::populated(version)];
        }
        if version >= 3 {
            req.max_bytes = 1;
        }
        if version >= 4 {
            req.isolation_level = 1;
        }
        if version >= 7 {
            req.session_id = 1;
            req.session_epoch = 1;
            req.forgotten_topics_data = vec![ForgottenTopic::populated(version)];
        }
        if version >= 11 {
            req.rack_id = "x";
        }
        if version >= 12 {
            req.cluster_id = Some(String::from("x"));
        }
        if version >= 15 {
            req.replica_state = ReplicaState::populated(version);
        }
        req
    }
}
379#[derive(Debug, Clone, PartialEq, Eq)]
380pub struct ReplicaState {
381 pub replica_id: i32,
382 pub replica_epoch: i64,
383 pub unknown_tagged_fields: UnknownTaggedFields,
384}
385impl Default for ReplicaState {
386 fn default() -> Self {
387 Self {
388 replica_id: -1i32,
389 replica_epoch: -1i64,
390 unknown_tagged_fields: Default::default(),
391 }
392 }
393}
394impl ReplicaState {
395 pub fn to_owned(&self) -> crate::owned::fetch_request::ReplicaState {
396 crate::owned::fetch_request::ReplicaState {
397 replica_id: (self.replica_id),
398 replica_epoch: (self.replica_epoch),
399 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
400 }
401 }
402}
403impl Encode for ReplicaState {
404 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
405 let flex = version >= 12;
406 if version >= 15 {
407 put_i32(buf, self.replica_id);
408 }
409 if version >= 15 {
410 put_i64(buf, self.replica_epoch);
411 }
412 if flex {
413 let tagged = WriteTaggedFields::new();
414 tagged.write(buf, &self.unknown_tagged_fields);
415 }
416 Ok(())
417 }
418 fn encoded_len(&self, version: i16) -> usize {
419 let flex = version >= 12;
420 let mut n: usize = 0;
421 if version >= 15 {
422 n += 4;
423 }
424 if version >= 15 {
425 n += 8;
426 }
427 if flex {
428 let known_pairs: Vec<(u32, usize)> = Vec::new();
429 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
430 }
431 n
432 }
433}
434impl<'de> DecodeBorrow<'de> for ReplicaState {
435 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
436 let flex = version >= 12;
437 let mut out = Self::default();
438 if version >= 15 {
439 out.replica_id = get_i32(buf)?;
440 }
441 if version >= 15 {
442 out.replica_epoch = get_i64(buf)?;
443 }
444 if flex {
445 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
446 }
447 Ok(out)
448 }
449}
#[cfg(test)]
impl ReplicaState {
    /// Test fixture: both numeric fields set to `1` when `version >= 15`,
    /// otherwise the default value.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        if version >= 15 {
            Self {
                replica_id: 1,
                replica_epoch: 1,
                ..Self::default()
            }
        } else {
            Self::default()
        }
    }
}
464#[derive(Debug, Clone, PartialEq, Eq, Default)]
465pub struct FetchTopic<'a> {
466 pub topic: &'a str,
467 pub topic_id: crate::primitives::uuid::Uuid,
468 pub partitions: Vec<FetchPartition>,
469 pub unknown_tagged_fields: UnknownTaggedFields,
470}
471impl FetchTopic<'_> {
472 pub fn to_owned(&self) -> crate::owned::fetch_request::FetchTopic {
473 crate::owned::fetch_request::FetchTopic {
474 topic: (self.topic).to_string(),
475 topic_id: (self.topic_id),
476 partitions: (self.partitions)
477 .iter()
478 .map(FetchPartition::to_owned)
479 .collect(),
480 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
481 }
482 }
483}
484impl Encode for FetchTopic<'_> {
485 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
486 let flex = version >= 12;
487 if (0..=12).contains(&version) {
488 if flex {
489 put_compact_string(buf, self.topic);
490 } else {
491 put_string(buf, self.topic);
492 }
493 }
494 if version >= 13 {
495 crate::primitives::uuid::put_uuid(buf, self.topic_id);
496 }
497 if version >= 0 {
498 {
499 crate::primitives::array::put_array_len(buf, (self.partitions).len(), flex);
500 for it in &self.partitions {
501 it.encode(buf, version)?;
502 }
503 }
504 }
505 if flex {
506 let tagged = WriteTaggedFields::new();
507 tagged.write(buf, &self.unknown_tagged_fields);
508 }
509 Ok(())
510 }
511 fn encoded_len(&self, version: i16) -> usize {
512 let flex = version >= 12;
513 let mut n: usize = 0;
514 if (0..=12).contains(&version) {
515 n += if flex {
516 compact_string_len(self.topic)
517 } else {
518 string_len(self.topic)
519 };
520 }
521 if version >= 13 {
522 n += 16;
523 }
524 if version >= 0 {
525 n += {
526 let prefix =
527 crate::primitives::array::array_len_prefix_len((self.partitions).len(), flex);
528 let body: usize = (self.partitions)
529 .iter()
530 .map(|it| it.encoded_len(version))
531 .sum();
532 prefix + body
533 };
534 }
535 if flex {
536 let known_pairs: Vec<(u32, usize)> = Vec::new();
537 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
538 }
539 n
540 }
541}
542impl<'de> DecodeBorrow<'de> for FetchTopic<'de> {
543 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
544 let flex = version >= 12;
545 let mut out = Self::default();
546 if (0..=12).contains(&version) {
547 out.topic = if flex {
548 get_compact_string_borrowed(buf)?
549 } else {
550 get_string_borrowed(buf)?
551 };
552 }
553 if version >= 13 {
554 out.topic_id = crate::primitives::uuid::get_uuid(buf)?;
555 }
556 if version >= 0 {
557 out.partitions = {
558 let n = crate::primitives::array::get_array_len(buf, flex)?;
559 let mut v = Vec::with_capacity(n);
560 for _ in 0..n {
561 v.push(FetchPartition::decode_borrow(buf, version)?);
562 }
563 v
564 };
565 }
566 if flex {
567 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
568 }
569 Ok(out)
570 }
571}
#[cfg(test)]
impl FetchTopic<'_> {
    /// Test fixture: a topic in which every field that exists in `version`
    /// holds a non-default value.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        let mut topic = Self::default();
        if (0..=12).contains(&version) {
            topic.topic = "x";
        }
        if version >= 13 {
            topic.topic_id = crate::primitives::uuid::Uuid([1u8; 16]);
        }
        if version >= 0 {
            topic.partitions = vec![FetchPartition::populated(version)];
        }
        topic
    }
}
589#[derive(Debug, Clone, PartialEq, Eq)]
590pub struct FetchPartition {
591 pub partition: i32,
592 pub current_leader_epoch: i32,
593 pub fetch_offset: i64,
594 pub last_fetched_epoch: i32,
595 pub log_start_offset: i64,
596 pub partition_max_bytes: i32,
597 pub replica_directory_id: crate::primitives::uuid::Uuid,
598 pub high_watermark: i64,
599 pub unknown_tagged_fields: UnknownTaggedFields,
600}
601impl Default for FetchPartition {
602 fn default() -> Self {
603 Self {
604 partition: 0i32,
605 current_leader_epoch: -1i32,
606 fetch_offset: 0i64,
607 last_fetched_epoch: -1i32,
608 log_start_offset: -1i64,
609 partition_max_bytes: 0i32,
610 replica_directory_id: Default::default(),
611 high_watermark: 9_223_372_036_854_775_807i64,
612 unknown_tagged_fields: Default::default(),
613 }
614 }
615}
616impl FetchPartition {
617 pub fn to_owned(&self) -> crate::owned::fetch_request::FetchPartition {
618 crate::owned::fetch_request::FetchPartition {
619 partition: (self.partition),
620 current_leader_epoch: (self.current_leader_epoch),
621 fetch_offset: (self.fetch_offset),
622 last_fetched_epoch: (self.last_fetched_epoch),
623 log_start_offset: (self.log_start_offset),
624 partition_max_bytes: (self.partition_max_bytes),
625 replica_directory_id: (self.replica_directory_id),
626 high_watermark: (self.high_watermark),
627 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
628 }
629 }
630}
631impl Encode for FetchPartition {
632 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
633 let flex = version >= 12;
634 if version >= 0 {
635 put_i32(buf, self.partition);
636 }
637 if version >= 9 {
638 put_i32(buf, self.current_leader_epoch);
639 }
640 if version >= 0 {
641 put_i64(buf, self.fetch_offset);
642 }
643 if version >= 12 {
644 put_i32(buf, self.last_fetched_epoch);
645 }
646 if version >= 5 {
647 put_i64(buf, self.log_start_offset);
648 }
649 if version >= 0 {
650 put_i32(buf, self.partition_max_bytes);
651 }
652 if flex {
653 let mut tagged = WriteTaggedFields::new();
654 if !(crate::codegen_helpers::is_default(&self.replica_directory_id)) {
655 let payload = encode_to_bytes(16, |b| {
656 crate::primitives::uuid::put_uuid(b, self.replica_directory_id);
657 Ok(())
658 });
659 tagged.add(0, payload);
660 }
661 if !(self.high_watermark == 9_223_372_036_854_775_807i64) {
662 let payload = encode_to_bytes(8, |b| {
663 put_i64(b, self.high_watermark);
664 Ok(())
665 });
666 tagged.add(1, payload);
667 }
668 tagged.write(buf, &self.unknown_tagged_fields);
669 }
670 Ok(())
671 }
672 fn encoded_len(&self, version: i16) -> usize {
673 let flex = version >= 12;
674 let mut n: usize = 0;
675 if version >= 0 {
676 n += 4;
677 }
678 if version >= 9 {
679 n += 4;
680 }
681 if version >= 0 {
682 n += 8;
683 }
684 if version >= 12 {
685 n += 4;
686 }
687 if version >= 5 {
688 n += 8;
689 }
690 if version >= 0 {
691 n += 4;
692 }
693 if flex {
694 let mut known_pairs: Vec<(u32, usize)> = Vec::new();
695 if !(crate::codegen_helpers::is_default(&self.replica_directory_id)) {
696 known_pairs.push((0, 16));
697 }
698 if !(self.high_watermark == 9_223_372_036_854_775_807i64) {
699 known_pairs.push((1, 8));
700 }
701 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
702 }
703 n
704 }
705}
706impl<'de> DecodeBorrow<'de> for FetchPartition {
707 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
708 let flex = version >= 12;
709 let mut out = Self::default();
710 if version >= 0 {
711 out.partition = get_i32(buf)?;
712 }
713 if version >= 9 {
714 out.current_leader_epoch = get_i32(buf)?;
715 }
716 if version >= 0 {
717 out.fetch_offset = get_i64(buf)?;
718 }
719 if version >= 12 {
720 out.last_fetched_epoch = get_i32(buf)?;
721 }
722 if version >= 5 {
723 out.log_start_offset = get_i64(buf)?;
724 }
725 if version >= 0 {
726 out.partition_max_bytes = get_i32(buf)?;
727 }
728 if flex {
729 let mut tag_replica_directory_id = None;
730 let mut tag_high_watermark = None;
731 out.unknown_tagged_fields = read_tagged_fields(buf, |tag, payload| match tag {
732 0 => {
733 tag_replica_directory_id = Some({
734 let b: &mut &[u8] = payload;
735 crate::primitives::uuid::get_uuid(b)?
736 });
737 Ok(true)
738 }
739 1 => {
740 tag_high_watermark = Some({
741 let b: &mut &[u8] = payload;
742 get_i64(b)?
743 });
744 Ok(true)
745 }
746 _ => Ok(false),
747 })?;
748 if let Some(v) = tag_replica_directory_id {
749 out.replica_directory_id = v;
750 }
751 if let Some(v) = tag_high_watermark {
752 out.high_watermark = v;
753 }
754 }
755 Ok(out)
756 }
757}
#[cfg(test)]
impl FetchPartition {
    /// Test fixture: a partition in which every field that exists in
    /// `version` holds a non-default value.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        let mut part = Self::default();
        if version >= 0 {
            part.partition = 1;
            part.fetch_offset = 1;
            part.partition_max_bytes = 1;
        }
        if version >= 5 {
            part.log_start_offset = 1;
        }
        if version >= 9 {
            part.current_leader_epoch = 1;
        }
        if version >= 12 {
            part.last_fetched_epoch = 1;
        }
        if version >= 17 {
            part.replica_directory_id = crate::primitives::uuid::Uuid([1u8; 16]);
        }
        if version >= 18 {
            part.high_watermark = 1;
        }
        part
    }
}
790#[derive(Debug, Clone, PartialEq, Eq, Default)]
791pub struct ForgottenTopic<'a> {
792 pub topic: &'a str,
793 pub topic_id: crate::primitives::uuid::Uuid,
794 pub partitions: Vec<i32>,
795 pub unknown_tagged_fields: UnknownTaggedFields,
796}
797impl ForgottenTopic<'_> {
798 pub fn to_owned(&self) -> crate::owned::fetch_request::ForgottenTopic {
799 crate::owned::fetch_request::ForgottenTopic {
800 topic: (self.topic).to_string(),
801 topic_id: (self.topic_id),
802 partitions: (self.partitions).clone(),
803 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
804 }
805 }
806}
807impl Encode for ForgottenTopic<'_> {
808 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
809 let flex = version >= 12;
810 if (7..=12).contains(&version) {
811 if flex {
812 put_compact_string(buf, self.topic);
813 } else {
814 put_string(buf, self.topic);
815 }
816 }
817 if version >= 13 {
818 crate::primitives::uuid::put_uuid(buf, self.topic_id);
819 }
820 if version >= 7 {
821 {
822 crate::primitives::array::put_array_len(buf, (self.partitions).len(), flex);
823 for it in &self.partitions {
824 put_i32(buf, *it);
825 }
826 }
827 }
828 if flex {
829 let tagged = WriteTaggedFields::new();
830 tagged.write(buf, &self.unknown_tagged_fields);
831 }
832 Ok(())
833 }
834 fn encoded_len(&self, version: i16) -> usize {
835 let flex = version >= 12;
836 let mut n: usize = 0;
837 if (7..=12).contains(&version) {
838 n += if flex {
839 compact_string_len(self.topic)
840 } else {
841 string_len(self.topic)
842 };
843 }
844 if version >= 13 {
845 n += 16;
846 }
847 if version >= 7 {
848 n += {
849 let prefix =
850 crate::primitives::array::array_len_prefix_len((self.partitions).len(), flex);
851 let body: usize = (self.partitions).iter().map(|_| 4).sum();
852 prefix + body
853 };
854 }
855 if flex {
856 let known_pairs: Vec<(u32, usize)> = Vec::new();
857 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
858 }
859 n
860 }
861}
862impl<'de> DecodeBorrow<'de> for ForgottenTopic<'de> {
863 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
864 let flex = version >= 12;
865 let mut out = Self::default();
866 if (7..=12).contains(&version) {
867 out.topic = if flex {
868 get_compact_string_borrowed(buf)?
869 } else {
870 get_string_borrowed(buf)?
871 };
872 }
873 if version >= 13 {
874 out.topic_id = crate::primitives::uuid::get_uuid(buf)?;
875 }
876 if version >= 7 {
877 out.partitions = {
878 let n = crate::primitives::array::get_array_len(buf, flex)?;
879 let mut v = Vec::with_capacity(n);
880 for _ in 0..n {
881 v.push(get_i32(buf)?);
882 }
883 v
884 };
885 }
886 if flex {
887 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
888 }
889 Ok(out)
890 }
891}
#[cfg(test)]
impl ForgottenTopic<'_> {
    /// Test fixture: an entry in which every field that exists in `version`
    /// holds a non-default value.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        let mut entry = Self::default();
        if (7..=12).contains(&version) {
            entry.topic = "x";
        }
        if version >= 13 {
            entry.topic_id = crate::primitives::uuid::Uuid([1u8; 16]);
        }
        if version >= 7 {
            entry.partitions = vec![1];
        }
        entry
    }
}