1use bytes::BufMut;
4
5use crate::primitives::fixed::{get_i16, get_i32, get_i64, put_i16, put_i32, put_i64};
6use crate::primitives::string_bytes::{
7 compact_nullable_string_len, compact_string_len, nullable_string_len,
8 put_compact_nullable_string, put_compact_string, put_nullable_string, put_string, string_len,
9};
10use crate::primitives::string_bytes::{
11 put_bytes, put_compact_bytes, put_compact_nullable_bytes, put_nullable_bytes,
12};
13use crate::primitives::string_bytes_borrowed::{
14 get_bytes_borrowed, get_compact_bytes_borrowed, get_compact_nullable_bytes_borrowed,
15 get_nullable_bytes_borrowed,
16};
17use crate::primitives::string_bytes_borrowed::{
18 get_compact_nullable_string_borrowed, get_compact_string_borrowed,
19 get_nullable_string_borrowed, get_string_borrowed,
20};
21use crate::tagged_fields::{WriteTaggedFields, read_tagged_fields, tagged_fields_len};
22use crate::{DecodeBorrow, Encode, ProtocolError, UnknownTaggedFields};
23
/// API key reported in `ProtocolError::UnsupportedVersion` for this message.
pub const API_KEY: i16 = 78;
/// Lowest version accepted by the top-level `encode` / `decode_borrow`.
pub const MIN_VERSION: i16 = 1;
/// Highest version accepted by the top-level `encode` / `decode_borrow`.
pub const MAX_VERSION: i16 = 2;
/// First version for which the flexible encoding (compact lengths plus
/// tagged fields) is selected.
pub const FLEXIBLE_MIN: i16 = 0;
28
29#[inline]
30fn is_flexible(version: i16) -> bool {
31 version >= FLEXIBLE_MIN
32}
33
34#[derive(Debug, Clone, PartialEq, Eq, Default)]
35pub struct ShareFetchResponse<'a> {
36 pub throttle_time_ms: i32,
37 pub error_code: i16,
38 pub error_message: Option<&'a str>,
39 pub acquisition_lock_timeout_ms: i32,
40 pub responses: Vec<ShareFetchableTopicResponse<'a>>,
41 pub node_endpoints: Vec<NodeEndpoint<'a>>,
42 pub unknown_tagged_fields: UnknownTaggedFields,
43}
44impl ShareFetchResponse<'_> {
45 pub fn to_owned(&self) -> crate::owned::share_fetch_response::ShareFetchResponse {
46 crate::owned::share_fetch_response::ShareFetchResponse {
47 throttle_time_ms: (self.throttle_time_ms),
48 error_code: (self.error_code),
49 error_message: (self.error_message).map(std::string::ToString::to_string),
50 acquisition_lock_timeout_ms: (self.acquisition_lock_timeout_ms),
51 responses: (self.responses)
52 .iter()
53 .map(ShareFetchableTopicResponse::to_owned)
54 .collect(),
55 node_endpoints: (self.node_endpoints)
56 .iter()
57 .map(NodeEndpoint::to_owned)
58 .collect(),
59 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
60 }
61 }
62}
63impl Encode for ShareFetchResponse<'_> {
64 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
65 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
66 return Err(ProtocolError::UnsupportedVersion {
67 api_key: API_KEY,
68 version,
69 });
70 }
71 let flex = is_flexible(version);
72 if version >= 0 {
73 put_i32(buf, self.throttle_time_ms);
74 }
75 if version >= 0 {
76 put_i16(buf, self.error_code);
77 }
78 if version >= 0 {
79 if flex {
80 put_compact_nullable_string(buf, self.error_message);
81 } else {
82 put_nullable_string(buf, self.error_message);
83 }
84 }
85 if version >= 1 {
86 put_i32(buf, self.acquisition_lock_timeout_ms);
87 }
88 if version >= 0 {
89 {
90 crate::primitives::array::put_array_len(buf, (self.responses).len(), flex);
91 for it in &self.responses {
92 it.encode(buf, version)?;
93 }
94 }
95 }
96 if version >= 0 {
97 {
98 crate::primitives::array::put_array_len(buf, (self.node_endpoints).len(), flex);
99 for it in &self.node_endpoints {
100 it.encode(buf, version)?;
101 }
102 }
103 }
104 if flex {
105 let tagged = WriteTaggedFields::new();
106 tagged.write(buf, &self.unknown_tagged_fields);
107 }
108 Ok(())
109 }
110 fn encoded_len(&self, version: i16) -> usize {
111 let flex = is_flexible(version);
112 let mut n: usize = 0;
113 if version >= 0 {
114 n += 4;
115 }
116 if version >= 0 {
117 n += 2;
118 }
119 if version >= 0 {
120 n += if flex {
121 compact_nullable_string_len(self.error_message)
122 } else {
123 nullable_string_len(self.error_message)
124 };
125 }
126 if version >= 1 {
127 n += 4;
128 }
129 if version >= 0 {
130 n += {
131 let prefix =
132 crate::primitives::array::array_len_prefix_len((self.responses).len(), flex);
133 let body: usize = (self.responses)
134 .iter()
135 .map(|it| it.encoded_len(version))
136 .sum();
137 prefix + body
138 };
139 }
140 if version >= 0 {
141 n += {
142 let prefix = crate::primitives::array::array_len_prefix_len(
143 (self.node_endpoints).len(),
144 flex,
145 );
146 let body: usize = (self.node_endpoints)
147 .iter()
148 .map(|it| it.encoded_len(version))
149 .sum();
150 prefix + body
151 };
152 }
153 if flex {
154 let known_pairs: Vec<(u32, usize)> = Vec::new();
155 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
156 }
157 n
158 }
159}
160impl<'de> DecodeBorrow<'de> for ShareFetchResponse<'de> {
161 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
162 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
163 return Err(ProtocolError::UnsupportedVersion {
164 api_key: API_KEY,
165 version,
166 });
167 }
168 let flex = is_flexible(version);
169 let mut out = Self::default();
170 if version >= 0 {
171 out.throttle_time_ms = get_i32(buf)?;
172 }
173 if version >= 0 {
174 out.error_code = get_i16(buf)?;
175 }
176 if version >= 0 {
177 out.error_message = if flex {
178 get_compact_nullable_string_borrowed(buf)?
179 } else {
180 get_nullable_string_borrowed(buf)?
181 };
182 }
183 if version >= 1 {
184 out.acquisition_lock_timeout_ms = get_i32(buf)?;
185 }
186 if version >= 0 {
187 out.responses = {
188 let n = crate::primitives::array::get_array_len(buf, flex)?;
189 let mut v = Vec::with_capacity(n);
190 for _ in 0..n {
191 v.push(ShareFetchableTopicResponse::decode_borrow(buf, version)?);
192 }
193 v
194 };
195 }
196 if version >= 0 {
197 out.node_endpoints = {
198 let n = crate::primitives::array::get_array_len(buf, flex)?;
199 let mut v = Vec::with_capacity(n);
200 for _ in 0..n {
201 v.push(NodeEndpoint::decode_borrow(buf, version)?);
202 }
203 v
204 };
205 }
206 if flex {
207 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
208 }
209 Ok(out)
210 }
211}
#[cfg(test)]
impl ShareFetchResponse<'_> {
    /// Test fixture: a response with every field that exists at `version`
    /// set to a non-default value, and one element in each array.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        if version < 0 {
            // No field exists before version 0.
            return Self::default();
        }
        Self {
            throttle_time_ms: 1i32,
            error_code: 1i16,
            error_message: Some("x"),
            acquisition_lock_timeout_ms: if version >= 1 { 1i32 } else { 0i32 },
            responses: vec![ShareFetchableTopicResponse::populated(version)],
            node_endpoints: vec![NodeEndpoint::populated(version)],
            ..Self::default()
        }
    }
}
238#[derive(Debug, Clone, PartialEq, Eq, Default)]
239pub struct ShareFetchableTopicResponse<'a> {
240 pub topic_id: crate::primitives::uuid::Uuid,
241 pub partitions: Vec<PartitionData<'a>>,
242 pub unknown_tagged_fields: UnknownTaggedFields,
243}
244impl ShareFetchableTopicResponse<'_> {
245 pub fn to_owned(&self) -> crate::owned::share_fetch_response::ShareFetchableTopicResponse {
246 crate::owned::share_fetch_response::ShareFetchableTopicResponse {
247 topic_id: (self.topic_id),
248 partitions: (self.partitions)
249 .iter()
250 .map(PartitionData::to_owned)
251 .collect(),
252 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
253 }
254 }
255}
256impl Encode for ShareFetchableTopicResponse<'_> {
257 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
258 let flex = version >= 0;
259 if version >= 0 {
260 crate::primitives::uuid::put_uuid(buf, self.topic_id);
261 }
262 if version >= 0 {
263 {
264 crate::primitives::array::put_array_len(buf, (self.partitions).len(), flex);
265 for it in &self.partitions {
266 it.encode(buf, version)?;
267 }
268 }
269 }
270 if flex {
271 let tagged = WriteTaggedFields::new();
272 tagged.write(buf, &self.unknown_tagged_fields);
273 }
274 Ok(())
275 }
276 fn encoded_len(&self, version: i16) -> usize {
277 let flex = version >= 0;
278 let mut n: usize = 0;
279 if version >= 0 {
280 n += 16;
281 }
282 if version >= 0 {
283 n += {
284 let prefix =
285 crate::primitives::array::array_len_prefix_len((self.partitions).len(), flex);
286 let body: usize = (self.partitions)
287 .iter()
288 .map(|it| it.encoded_len(version))
289 .sum();
290 prefix + body
291 };
292 }
293 if flex {
294 let known_pairs: Vec<(u32, usize)> = Vec::new();
295 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
296 }
297 n
298 }
299}
300impl<'de> DecodeBorrow<'de> for ShareFetchableTopicResponse<'de> {
301 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
302 let flex = version >= 0;
303 let mut out = Self::default();
304 if version >= 0 {
305 out.topic_id = crate::primitives::uuid::get_uuid(buf)?;
306 }
307 if version >= 0 {
308 out.partitions = {
309 let n = crate::primitives::array::get_array_len(buf, flex)?;
310 let mut v = Vec::with_capacity(n);
311 for _ in 0..n {
312 v.push(PartitionData::decode_borrow(buf, version)?);
313 }
314 v
315 };
316 }
317 if flex {
318 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
319 }
320 Ok(out)
321 }
322}
#[cfg(test)]
impl ShareFetchableTopicResponse<'_> {
    /// Test fixture: a topic entry with a non-zero UUID and one populated
    /// partition (default value for a negative `version`).
    #[must_use]
    pub fn populated(version: i16) -> Self {
        if version < 0 {
            return Self::default();
        }
        Self {
            topic_id: crate::primitives::uuid::Uuid([1u8; 16]),
            partitions: vec![PartitionData::populated(version)],
            ..Self::default()
        }
    }
}
337#[derive(Debug, Clone, PartialEq, Eq, Default)]
338pub struct PartitionData<'a> {
339 pub partition_index: i32,
340 pub error_code: i16,
341 pub error_message: Option<&'a str>,
342 pub acknowledge_error_code: i16,
343 pub acknowledge_error_message: Option<&'a str>,
344 pub current_leader: LeaderIdAndEpoch,
345 pub records: Option<crate::records::RecordsPayloadBorrowed<'a>>,
346 pub acquired_records: Vec<AcquiredRecords>,
347 pub unknown_tagged_fields: UnknownTaggedFields,
348}
349impl PartitionData<'_> {
350 pub fn to_owned(&self) -> crate::owned::share_fetch_response::PartitionData {
351 crate::owned::share_fetch_response::PartitionData {
352 partition_index: (self.partition_index),
353 error_code: (self.error_code),
354 error_message: (self.error_message).map(std::string::ToString::to_string),
355 acknowledge_error_code: (self.acknowledge_error_code),
356 acknowledge_error_message: (self.acknowledge_error_message)
357 .map(std::string::ToString::to_string),
358 current_leader: (self.current_leader).to_owned(),
359 records: (self.records)
360 .as_ref()
361 .map(|rb| rb.to_owned().expect("records to_owned")),
362 acquired_records: (self.acquired_records)
363 .iter()
364 .map(AcquiredRecords::to_owned)
365 .collect(),
366 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
367 }
368 }
369}
370impl Encode for PartitionData<'_> {
371 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
372 let flex = version >= 0;
373 if version >= 0 {
374 put_i32(buf, self.partition_index);
375 }
376 if version >= 0 {
377 put_i16(buf, self.error_code);
378 }
379 if version >= 0 {
380 if flex {
381 put_compact_nullable_string(buf, self.error_message);
382 } else {
383 put_nullable_string(buf, self.error_message);
384 }
385 }
386 if version >= 0 {
387 put_i16(buf, self.acknowledge_error_code);
388 }
389 if version >= 0 {
390 if flex {
391 put_compact_nullable_string(buf, self.acknowledge_error_message);
392 } else {
393 put_nullable_string(buf, self.acknowledge_error_message);
394 }
395 }
396 if version >= 0 {
397 self.current_leader.encode(buf, version)?;
398 }
399 if version >= 0 {
400 if version <= 0 {
401 match &self.records {
402 None => {
403 if flex {
404 put_compact_nullable_bytes(buf, None);
405 } else {
406 put_nullable_bytes(buf, None);
407 }
408 }
409 Some(__rb) => {
410 let mut __rb_buf = bytes::BytesMut::new();
411 <crate::records::RecordsPayloadBorrowed as crate::Encode>::encode(
412 __rb,
413 &mut __rb_buf,
414 version,
415 )?;
416 if flex {
417 put_compact_bytes(buf, &__rb_buf);
418 } else {
419 put_bytes(buf, &__rb_buf);
420 }
421 }
422 }
423 } else {
424 match &self.records {
425 None => {
426 let __rb_buf = bytes::BytesMut::new();
427 if flex {
428 put_compact_bytes(buf, &__rb_buf);
429 } else {
430 put_bytes(buf, &__rb_buf);
431 }
432 }
433 Some(__rb) => {
434 let mut __rb_buf = bytes::BytesMut::new();
435 <crate::records::RecordsPayloadBorrowed as crate::Encode>::encode(
436 __rb,
437 &mut __rb_buf,
438 version,
439 )?;
440 if flex {
441 put_compact_bytes(buf, &__rb_buf);
442 } else {
443 put_bytes(buf, &__rb_buf);
444 }
445 }
446 }
447 }
448 }
449 if version >= 0 {
450 {
451 crate::primitives::array::put_array_len(buf, (self.acquired_records).len(), flex);
452 for it in &self.acquired_records {
453 it.encode(buf, version)?;
454 }
455 }
456 }
457 if flex {
458 let tagged = WriteTaggedFields::new();
459 tagged.write(buf, &self.unknown_tagged_fields);
460 }
461 Ok(())
462 }
463 fn encoded_len(&self, version: i16) -> usize {
464 let flex = version >= 0;
465 let mut n: usize = 0;
466 if version >= 0 {
467 n += 4;
468 }
469 if version >= 0 {
470 n += 2;
471 }
472 if version >= 0 {
473 n += if flex {
474 compact_nullable_string_len(self.error_message)
475 } else {
476 nullable_string_len(self.error_message)
477 };
478 }
479 if version >= 0 {
480 n += 2;
481 }
482 if version >= 0 {
483 n += if flex {
484 compact_nullable_string_len(self.acknowledge_error_message)
485 } else {
486 nullable_string_len(self.acknowledge_error_message)
487 };
488 }
489 if version >= 0 {
490 n += self.current_leader.encoded_len(version);
491 }
492 if version >= 0 {
493 n += if version <= 0 {
494 match &self.records {
495 None => {
496 if flex {
497 crate::primitives::varint::uvarint_len(0)
498 } else {
499 4
500 }
501 }
502 Some(__rb) => {
503 let __rb_len =
504 <crate::records::RecordsPayloadBorrowed as crate::Encode>::encoded_len(
505 __rb, version,
506 );
507 if flex {
508 crate::primitives::string_bytes::compact_bytes_len_from_size(__rb_len)
509 } else {
510 4 + __rb_len
511 }
512 }
513 }
514 } else {
515 match &self.records {
516 None => {
517 if flex {
518 crate::primitives::string_bytes::compact_bytes_len_from_size(0)
519 } else {
520 4
521 }
522 }
523 Some(__rb) => {
524 let __rb_len =
525 <crate::records::RecordsPayloadBorrowed as crate::Encode>::encoded_len(
526 __rb, version,
527 );
528 if flex {
529 crate::primitives::string_bytes::compact_bytes_len_from_size(__rb_len)
530 } else {
531 4 + __rb_len
532 }
533 }
534 }
535 };
536 }
537 if version >= 0 {
538 n += {
539 let prefix = crate::primitives::array::array_len_prefix_len(
540 (self.acquired_records).len(),
541 flex,
542 );
543 let body: usize = (self.acquired_records)
544 .iter()
545 .map(|it| it.encoded_len(version))
546 .sum();
547 prefix + body
548 };
549 }
550 if flex {
551 let known_pairs: Vec<(u32, usize)> = Vec::new();
552 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
553 }
554 n
555 }
556}
557impl<'de> DecodeBorrow<'de> for PartitionData<'de> {
558 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
559 let flex = version >= 0;
560 let mut out = Self::default();
561 if version >= 0 {
562 out.partition_index = get_i32(buf)?;
563 }
564 if version >= 0 {
565 out.error_code = get_i16(buf)?;
566 }
567 if version >= 0 {
568 out.error_message = if flex {
569 get_compact_nullable_string_borrowed(buf)?
570 } else {
571 get_nullable_string_borrowed(buf)?
572 };
573 }
574 if version >= 0 {
575 out.acknowledge_error_code = get_i16(buf)?;
576 }
577 if version >= 0 {
578 out.acknowledge_error_message = if flex {
579 get_compact_nullable_string_borrowed(buf)?
580 } else {
581 get_nullable_string_borrowed(buf)?
582 };
583 }
584 if version >= 0 {
585 out.current_leader = LeaderIdAndEpoch::decode_borrow(buf, version)?;
586 }
587 if version >= 0 {
588 out.records = if version <= 0 {
589 {
590 let __rb_opt = if flex {
591 get_compact_nullable_bytes_borrowed(buf)?
592 } else {
593 get_nullable_bytes_borrowed(buf)?
594 };
595 match __rb_opt {
596 None => None,
597 Some(__rb_slice) => {
598 let mut __rb_cur = __rb_slice;
599 Some (< crate :: records :: RecordsPayloadBorrowed as crate :: DecodeBorrow >:: decode_borrow (& mut __rb_cur , version) ?)
600 }
601 }
602 }
603 } else {
604 Some({
605 let __rb_slice = if flex {
606 get_compact_bytes_borrowed(buf)?
607 } else {
608 get_bytes_borrowed(buf)?
609 };
610 let mut __rb_cur = __rb_slice;
611 <crate::records::RecordsPayloadBorrowed as crate::DecodeBorrow>::decode_borrow(
612 &mut __rb_cur,
613 version,
614 )?
615 })
616 };
617 }
618 if version >= 0 {
619 out.acquired_records = {
620 let n = crate::primitives::array::get_array_len(buf, flex)?;
621 let mut v = Vec::with_capacity(n);
622 for _ in 0..n {
623 v.push(AcquiredRecords::decode_borrow(buf, version)?);
624 }
625 v
626 };
627 }
628 if flex {
629 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
630 }
631 Ok(out)
632 }
633}
#[cfg(test)]
impl PartitionData<'_> {
    /// Test fixture: a partition entry with non-default scalar and string
    /// fields and one acquired-records element. `records` is left at its
    /// default (`None`).
    #[must_use]
    pub fn populated(version: i16) -> Self {
        if version < 0 {
            return Self::default();
        }
        Self {
            partition_index: 1i32,
            error_code: 1i16,
            error_message: Some("x"),
            acknowledge_error_code: 1i16,
            acknowledge_error_message: Some("x"),
            current_leader: LeaderIdAndEpoch::populated(version),
            acquired_records: vec![AcquiredRecords::populated(version)],
            ..Self::default()
        }
    }
}
663#[derive(Debug, Clone, PartialEq, Eq, Default)]
664pub struct LeaderIdAndEpoch {
665 pub leader_id: i32,
666 pub leader_epoch: i32,
667 pub unknown_tagged_fields: UnknownTaggedFields,
668}
669impl LeaderIdAndEpoch {
670 pub fn to_owned(&self) -> crate::owned::share_fetch_response::LeaderIdAndEpoch {
671 crate::owned::share_fetch_response::LeaderIdAndEpoch {
672 leader_id: (self.leader_id),
673 leader_epoch: (self.leader_epoch),
674 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
675 }
676 }
677}
678impl Encode for LeaderIdAndEpoch {
679 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
680 let flex = version >= 0;
681 if version >= 0 {
682 put_i32(buf, self.leader_id);
683 }
684 if version >= 0 {
685 put_i32(buf, self.leader_epoch);
686 }
687 if flex {
688 let tagged = WriteTaggedFields::new();
689 tagged.write(buf, &self.unknown_tagged_fields);
690 }
691 Ok(())
692 }
693 fn encoded_len(&self, version: i16) -> usize {
694 let flex = version >= 0;
695 let mut n: usize = 0;
696 if version >= 0 {
697 n += 4;
698 }
699 if version >= 0 {
700 n += 4;
701 }
702 if flex {
703 let known_pairs: Vec<(u32, usize)> = Vec::new();
704 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
705 }
706 n
707 }
708}
709impl<'de> DecodeBorrow<'de> for LeaderIdAndEpoch {
710 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
711 let flex = version >= 0;
712 let mut out = Self::default();
713 if version >= 0 {
714 out.leader_id = get_i32(buf)?;
715 }
716 if version >= 0 {
717 out.leader_epoch = get_i32(buf)?;
718 }
719 if flex {
720 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
721 }
722 Ok(out)
723 }
724}
#[cfg(test)]
impl LeaderIdAndEpoch {
    /// Test fixture: both integer fields set to 1 (default value for a
    /// negative `version`).
    #[must_use]
    pub fn populated(version: i16) -> Self {
        if version < 0 {
            return Self::default();
        }
        Self {
            leader_id: 1i32,
            leader_epoch: 1i32,
            ..Self::default()
        }
    }
}
739#[derive(Debug, Clone, PartialEq, Eq, Default)]
740pub struct AcquiredRecords {
741 pub first_offset: i64,
742 pub last_offset: i64,
743 pub delivery_count: i16,
744 pub unknown_tagged_fields: UnknownTaggedFields,
745}
746impl AcquiredRecords {
747 pub fn to_owned(&self) -> crate::owned::share_fetch_response::AcquiredRecords {
748 crate::owned::share_fetch_response::AcquiredRecords {
749 first_offset: (self.first_offset),
750 last_offset: (self.last_offset),
751 delivery_count: (self.delivery_count),
752 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
753 }
754 }
755}
756impl Encode for AcquiredRecords {
757 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
758 let flex = version >= 0;
759 if version >= 0 {
760 put_i64(buf, self.first_offset);
761 }
762 if version >= 0 {
763 put_i64(buf, self.last_offset);
764 }
765 if version >= 0 {
766 put_i16(buf, self.delivery_count);
767 }
768 if flex {
769 let tagged = WriteTaggedFields::new();
770 tagged.write(buf, &self.unknown_tagged_fields);
771 }
772 Ok(())
773 }
774 fn encoded_len(&self, version: i16) -> usize {
775 let flex = version >= 0;
776 let mut n: usize = 0;
777 if version >= 0 {
778 n += 8;
779 }
780 if version >= 0 {
781 n += 8;
782 }
783 if version >= 0 {
784 n += 2;
785 }
786 if flex {
787 let known_pairs: Vec<(u32, usize)> = Vec::new();
788 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
789 }
790 n
791 }
792}
793impl<'de> DecodeBorrow<'de> for AcquiredRecords {
794 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
795 let flex = version >= 0;
796 let mut out = Self::default();
797 if version >= 0 {
798 out.first_offset = get_i64(buf)?;
799 }
800 if version >= 0 {
801 out.last_offset = get_i64(buf)?;
802 }
803 if version >= 0 {
804 out.delivery_count = get_i16(buf)?;
805 }
806 if flex {
807 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
808 }
809 Ok(out)
810 }
811}
#[cfg(test)]
impl AcquiredRecords {
    /// Test fixture: every numeric field set to 1 (default value for a
    /// negative `version`).
    #[must_use]
    pub fn populated(version: i16) -> Self {
        if version < 0 {
            return Self::default();
        }
        Self {
            first_offset: 1i64,
            last_offset: 1i64,
            delivery_count: 1i16,
            ..Self::default()
        }
    }
}
829#[derive(Debug, Clone, PartialEq, Eq, Default)]
830pub struct NodeEndpoint<'a> {
831 pub node_id: i32,
832 pub host: &'a str,
833 pub port: i32,
834 pub rack: Option<&'a str>,
835 pub unknown_tagged_fields: UnknownTaggedFields,
836}
837impl NodeEndpoint<'_> {
838 pub fn to_owned(&self) -> crate::owned::share_fetch_response::NodeEndpoint {
839 crate::owned::share_fetch_response::NodeEndpoint {
840 node_id: (self.node_id),
841 host: (self.host).to_string(),
842 port: (self.port),
843 rack: (self.rack).map(std::string::ToString::to_string),
844 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
845 }
846 }
847}
848impl Encode for NodeEndpoint<'_> {
849 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
850 let flex = version >= 0;
851 if version >= 0 {
852 put_i32(buf, self.node_id);
853 }
854 if version >= 0 {
855 if flex {
856 put_compact_string(buf, self.host);
857 } else {
858 put_string(buf, self.host);
859 }
860 }
861 if version >= 0 {
862 put_i32(buf, self.port);
863 }
864 if version >= 0 {
865 if flex {
866 put_compact_nullable_string(buf, self.rack);
867 } else {
868 put_nullable_string(buf, self.rack);
869 }
870 }
871 if flex {
872 let tagged = WriteTaggedFields::new();
873 tagged.write(buf, &self.unknown_tagged_fields);
874 }
875 Ok(())
876 }
877 fn encoded_len(&self, version: i16) -> usize {
878 let flex = version >= 0;
879 let mut n: usize = 0;
880 if version >= 0 {
881 n += 4;
882 }
883 if version >= 0 {
884 n += if flex {
885 compact_string_len(self.host)
886 } else {
887 string_len(self.host)
888 };
889 }
890 if version >= 0 {
891 n += 4;
892 }
893 if version >= 0 {
894 n += if flex {
895 compact_nullable_string_len(self.rack)
896 } else {
897 nullable_string_len(self.rack)
898 };
899 }
900 if flex {
901 let known_pairs: Vec<(u32, usize)> = Vec::new();
902 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
903 }
904 n
905 }
906}
907impl<'de> DecodeBorrow<'de> for NodeEndpoint<'de> {
908 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
909 let flex = version >= 0;
910 let mut out = Self::default();
911 if version >= 0 {
912 out.node_id = get_i32(buf)?;
913 }
914 if version >= 0 {
915 out.host = if flex {
916 get_compact_string_borrowed(buf)?
917 } else {
918 get_string_borrowed(buf)?
919 };
920 }
921 if version >= 0 {
922 out.port = get_i32(buf)?;
923 }
924 if version >= 0 {
925 out.rack = if flex {
926 get_compact_nullable_string_borrowed(buf)?
927 } else {
928 get_nullable_string_borrowed(buf)?
929 };
930 }
931 if flex {
932 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
933 }
934 Ok(out)
935 }
936}
#[cfg(test)]
impl NodeEndpoint<'_> {
    /// Test fixture: an endpoint with every field set to a non-default value
    /// (default value for a negative `version`).
    #[must_use]
    pub fn populated(version: i16) -> Self {
        if version < 0 {
            return Self::default();
        }
        Self {
            node_id: 1i32,
            host: "x",
            port: 1i32,
            rack: Some("x"),
            ..Self::default()
        }
    }
}