1use crate::primitives::fixed::{get_i16, get_i32, get_i64, put_i16, put_i32, put_i64};
3use crate::primitives::string_bytes::{
4 compact_nullable_string_len, compact_string_len, nullable_string_len,
5 put_compact_nullable_string, put_compact_string, put_nullable_string, put_string, string_len,
6};
7use crate::primitives::string_bytes::{
8 put_bytes, put_compact_bytes, put_compact_nullable_bytes, put_nullable_bytes,
9};
10use crate::primitives::string_bytes_borrowed::{
11 get_bytes_borrowed, get_compact_bytes_borrowed, get_compact_nullable_bytes_borrowed,
12 get_nullable_bytes_borrowed,
13};
14use crate::primitives::string_bytes_borrowed::{
15 get_compact_nullable_string_borrowed, get_compact_string_borrowed,
16 get_nullable_string_borrowed, get_string_borrowed,
17};
18use crate::tagged_fields::{WriteTaggedFields, read_tagged_fields, tagged_fields_len};
19use crate::{DecodeBorrow, Encode, ProtocolError, UnknownTaggedFields};
20use bytes::BufMut;
21pub const API_KEY: i16 = 78;
22pub const MIN_VERSION: i16 = 1;
23pub const MAX_VERSION: i16 = 2;
24pub const FLEXIBLE_MIN: i16 = 0;
25#[inline]
26fn is_flexible(version: i16) -> bool {
27 version >= FLEXIBLE_MIN
28}
29#[derive(Debug, Clone, PartialEq, Eq, Default)]
30pub struct ShareFetchResponse<'a> {
31 pub throttle_time_ms: i32,
32 pub error_code: i16,
33 pub error_message: Option<&'a str>,
34 pub acquisition_lock_timeout_ms: i32,
35 pub responses: Vec<ShareFetchableTopicResponse<'a>>,
36 pub node_endpoints: Vec<NodeEndpoint<'a>>,
37 pub unknown_tagged_fields: UnknownTaggedFields,
38}
39impl ShareFetchResponse<'_> {
40 pub fn to_owned(&self) -> crate::owned::share_fetch_response::ShareFetchResponse {
41 crate::owned::share_fetch_response::ShareFetchResponse {
42 throttle_time_ms: (self.throttle_time_ms),
43 error_code: (self.error_code),
44 error_message: (self.error_message).map(std::string::ToString::to_string),
45 acquisition_lock_timeout_ms: (self.acquisition_lock_timeout_ms),
46 responses: (self.responses)
47 .iter()
48 .map(ShareFetchableTopicResponse::to_owned)
49 .collect(),
50 node_endpoints: (self.node_endpoints)
51 .iter()
52 .map(NodeEndpoint::to_owned)
53 .collect(),
54 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
55 }
56 }
57}
58impl Encode for ShareFetchResponse<'_> {
59 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
60 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
61 return Err(ProtocolError::UnsupportedVersion {
62 api_key: API_KEY,
63 version,
64 });
65 }
66 let flex = is_flexible(version);
67 if version >= 0 {
68 put_i32(buf, self.throttle_time_ms);
69 }
70 if version >= 0 {
71 put_i16(buf, self.error_code);
72 }
73 if version >= 0 {
74 if flex {
75 put_compact_nullable_string(buf, self.error_message);
76 } else {
77 put_nullable_string(buf, self.error_message);
78 }
79 }
80 if version >= 1 {
81 put_i32(buf, self.acquisition_lock_timeout_ms);
82 }
83 if version >= 0 {
84 {
85 crate::primitives::array::put_array_len(buf, (self.responses).len(), flex);
86 for it in &self.responses {
87 it.encode(buf, version)?;
88 }
89 }
90 }
91 if version >= 0 {
92 {
93 crate::primitives::array::put_array_len(buf, (self.node_endpoints).len(), flex);
94 for it in &self.node_endpoints {
95 it.encode(buf, version)?;
96 }
97 }
98 }
99 if flex {
100 let tagged = WriteTaggedFields::new();
101 tagged.write(buf, &self.unknown_tagged_fields);
102 }
103 Ok(())
104 }
105 fn encoded_len(&self, version: i16) -> usize {
106 let flex = is_flexible(version);
107 let mut n: usize = 0;
108 if version >= 0 {
109 n += 4;
110 }
111 if version >= 0 {
112 n += 2;
113 }
114 if version >= 0 {
115 n += if flex {
116 compact_nullable_string_len(self.error_message)
117 } else {
118 nullable_string_len(self.error_message)
119 };
120 }
121 if version >= 1 {
122 n += 4;
123 }
124 if version >= 0 {
125 n += {
126 let prefix =
127 crate::primitives::array::array_len_prefix_len((self.responses).len(), flex);
128 let body: usize = (self.responses)
129 .iter()
130 .map(|it| it.encoded_len(version))
131 .sum();
132 prefix + body
133 };
134 }
135 if version >= 0 {
136 n += {
137 let prefix = crate::primitives::array::array_len_prefix_len(
138 (self.node_endpoints).len(),
139 flex,
140 );
141 let body: usize = (self.node_endpoints)
142 .iter()
143 .map(|it| it.encoded_len(version))
144 .sum();
145 prefix + body
146 };
147 }
148 if flex {
149 let known_pairs: Vec<(u32, usize)> = Vec::new();
150 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
151 }
152 n
153 }
154}
155impl<'de> DecodeBorrow<'de> for ShareFetchResponse<'de> {
156 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
157 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
158 return Err(ProtocolError::UnsupportedVersion {
159 api_key: API_KEY,
160 version,
161 });
162 }
163 let flex = is_flexible(version);
164 let mut out = Self::default();
165 if version >= 0 {
166 out.throttle_time_ms = get_i32(buf)?;
167 }
168 if version >= 0 {
169 out.error_code = get_i16(buf)?;
170 }
171 if version >= 0 {
172 out.error_message = if flex {
173 get_compact_nullable_string_borrowed(buf)?
174 } else {
175 get_nullable_string_borrowed(buf)?
176 };
177 }
178 if version >= 1 {
179 out.acquisition_lock_timeout_ms = get_i32(buf)?;
180 }
181 if version >= 0 {
182 out.responses = {
183 let n = crate::primitives::array::get_array_len(buf, flex)?;
184 let mut v = Vec::with_capacity(n);
185 for _ in 0..n {
186 v.push(ShareFetchableTopicResponse::decode_borrow(buf, version)?);
187 }
188 v
189 };
190 }
191 if version >= 0 {
192 out.node_endpoints = {
193 let n = crate::primitives::array::get_array_len(buf, flex)?;
194 let mut v = Vec::with_capacity(n);
195 for _ in 0..n {
196 v.push(NodeEndpoint::decode_borrow(buf, version)?);
197 }
198 v
199 };
200 }
201 if flex {
202 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
203 }
204 Ok(out)
205 }
206}
207#[cfg(test)]
208impl ShareFetchResponse<'_> {
209 #[must_use]
210 pub fn populated(version: i16) -> Self {
211 let mut m = Self::default();
212 if version >= 0 {
213 m.throttle_time_ms = 1i32;
214 }
215 if version >= 0 {
216 m.error_code = 1i16;
217 }
218 if version >= 0 {
219 m.error_message = Some("x");
220 }
221 if version >= 1 {
222 m.acquisition_lock_timeout_ms = 1i32;
223 }
224 if version >= 0 {
225 m.responses = vec![ShareFetchableTopicResponse::populated(version)];
226 }
227 if version >= 0 {
228 m.node_endpoints = vec![NodeEndpoint::populated(version)];
229 }
230 m
231 }
232}
233#[derive(Debug, Clone, PartialEq, Eq, Default)]
234pub struct ShareFetchableTopicResponse<'a> {
235 pub topic_id: crate::primitives::uuid::Uuid,
236 pub partitions: Vec<PartitionData<'a>>,
237 pub unknown_tagged_fields: UnknownTaggedFields,
238}
239impl ShareFetchableTopicResponse<'_> {
240 pub fn to_owned(&self) -> crate::owned::share_fetch_response::ShareFetchableTopicResponse {
241 crate::owned::share_fetch_response::ShareFetchableTopicResponse {
242 topic_id: (self.topic_id),
243 partitions: (self.partitions)
244 .iter()
245 .map(PartitionData::to_owned)
246 .collect(),
247 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
248 }
249 }
250}
251impl Encode for ShareFetchableTopicResponse<'_> {
252 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
253 let flex = version >= 0;
254 if version >= 0 {
255 crate::primitives::uuid::put_uuid(buf, self.topic_id);
256 }
257 if version >= 0 {
258 {
259 crate::primitives::array::put_array_len(buf, (self.partitions).len(), flex);
260 for it in &self.partitions {
261 it.encode(buf, version)?;
262 }
263 }
264 }
265 if flex {
266 let tagged = WriteTaggedFields::new();
267 tagged.write(buf, &self.unknown_tagged_fields);
268 }
269 Ok(())
270 }
271 fn encoded_len(&self, version: i16) -> usize {
272 let flex = version >= 0;
273 let mut n: usize = 0;
274 if version >= 0 {
275 n += 16;
276 }
277 if version >= 0 {
278 n += {
279 let prefix =
280 crate::primitives::array::array_len_prefix_len((self.partitions).len(), flex);
281 let body: usize = (self.partitions)
282 .iter()
283 .map(|it| it.encoded_len(version))
284 .sum();
285 prefix + body
286 };
287 }
288 if flex {
289 let known_pairs: Vec<(u32, usize)> = Vec::new();
290 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
291 }
292 n
293 }
294}
295impl<'de> DecodeBorrow<'de> for ShareFetchableTopicResponse<'de> {
296 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
297 let flex = version >= 0;
298 let mut out = Self::default();
299 if version >= 0 {
300 out.topic_id = crate::primitives::uuid::get_uuid(buf)?;
301 }
302 if version >= 0 {
303 out.partitions = {
304 let n = crate::primitives::array::get_array_len(buf, flex)?;
305 let mut v = Vec::with_capacity(n);
306 for _ in 0..n {
307 v.push(PartitionData::decode_borrow(buf, version)?);
308 }
309 v
310 };
311 }
312 if flex {
313 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
314 }
315 Ok(out)
316 }
317}
318#[cfg(test)]
319impl ShareFetchableTopicResponse<'_> {
320 #[must_use]
321 pub fn populated(version: i16) -> Self {
322 let mut m = Self::default();
323 if version >= 0 {
324 m.topic_id = crate::primitives::uuid::Uuid([1u8; 16]);
325 }
326 if version >= 0 {
327 m.partitions = vec![PartitionData::populated(version)];
328 }
329 m
330 }
331}
332#[derive(Debug, Clone, PartialEq, Eq, Default)]
333pub struct PartitionData<'a> {
334 pub partition_index: i32,
335 pub error_code: i16,
336 pub error_message: Option<&'a str>,
337 pub acknowledge_error_code: i16,
338 pub acknowledge_error_message: Option<&'a str>,
339 pub current_leader: LeaderIdAndEpoch,
340 pub records: Option<crate::records::RecordsPayloadBorrowed<'a>>,
341 pub acquired_records: Vec<AcquiredRecords>,
342 pub unknown_tagged_fields: UnknownTaggedFields,
343}
344impl PartitionData<'_> {
345 pub fn to_owned(&self) -> crate::owned::share_fetch_response::PartitionData {
346 crate::owned::share_fetch_response::PartitionData {
347 partition_index: (self.partition_index),
348 error_code: (self.error_code),
349 error_message: (self.error_message).map(std::string::ToString::to_string),
350 acknowledge_error_code: (self.acknowledge_error_code),
351 acknowledge_error_message: (self.acknowledge_error_message)
352 .map(std::string::ToString::to_string),
353 current_leader: (self.current_leader).to_owned(),
354 records: (self.records)
355 .as_ref()
356 .map(|rb| rb.to_owned().expect("records to_owned")),
357 acquired_records: (self.acquired_records)
358 .iter()
359 .map(AcquiredRecords::to_owned)
360 .collect(),
361 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
362 }
363 }
364}
365impl Encode for PartitionData<'_> {
366 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
367 let flex = version >= 0;
368 if version >= 0 {
369 put_i32(buf, self.partition_index);
370 }
371 if version >= 0 {
372 put_i16(buf, self.error_code);
373 }
374 if version >= 0 {
375 if flex {
376 put_compact_nullable_string(buf, self.error_message);
377 } else {
378 put_nullable_string(buf, self.error_message);
379 }
380 }
381 if version >= 0 {
382 put_i16(buf, self.acknowledge_error_code);
383 }
384 if version >= 0 {
385 if flex {
386 put_compact_nullable_string(buf, self.acknowledge_error_message);
387 } else {
388 put_nullable_string(buf, self.acknowledge_error_message);
389 }
390 }
391 if version >= 0 {
392 self.current_leader.encode(buf, version)?;
393 }
394 if version >= 0 {
395 if version <= 0 {
396 match &self.records {
397 None => {
398 if flex {
399 put_compact_nullable_bytes(buf, None);
400 } else {
401 put_nullable_bytes(buf, None);
402 }
403 }
404 Some(__rb) => {
405 let mut __rb_buf = bytes::BytesMut::new();
406 <crate::records::RecordsPayloadBorrowed as crate::Encode>::encode(
407 __rb,
408 &mut __rb_buf,
409 version,
410 )?;
411 if flex {
412 put_compact_bytes(buf, &__rb_buf);
413 } else {
414 put_bytes(buf, &__rb_buf);
415 }
416 }
417 }
418 } else {
419 match &self.records {
420 None => {
421 let __rb_buf = bytes::BytesMut::new();
422 if flex {
423 put_compact_bytes(buf, &__rb_buf);
424 } else {
425 put_bytes(buf, &__rb_buf);
426 }
427 }
428 Some(__rb) => {
429 let mut __rb_buf = bytes::BytesMut::new();
430 <crate::records::RecordsPayloadBorrowed as crate::Encode>::encode(
431 __rb,
432 &mut __rb_buf,
433 version,
434 )?;
435 if flex {
436 put_compact_bytes(buf, &__rb_buf);
437 } else {
438 put_bytes(buf, &__rb_buf);
439 }
440 }
441 }
442 }
443 }
444 if version >= 0 {
445 {
446 crate::primitives::array::put_array_len(buf, (self.acquired_records).len(), flex);
447 for it in &self.acquired_records {
448 it.encode(buf, version)?;
449 }
450 }
451 }
452 if flex {
453 let tagged = WriteTaggedFields::new();
454 tagged.write(buf, &self.unknown_tagged_fields);
455 }
456 Ok(())
457 }
458 fn encoded_len(&self, version: i16) -> usize {
459 let flex = version >= 0;
460 let mut n: usize = 0;
461 if version >= 0 {
462 n += 4;
463 }
464 if version >= 0 {
465 n += 2;
466 }
467 if version >= 0 {
468 n += if flex {
469 compact_nullable_string_len(self.error_message)
470 } else {
471 nullable_string_len(self.error_message)
472 };
473 }
474 if version >= 0 {
475 n += 2;
476 }
477 if version >= 0 {
478 n += if flex {
479 compact_nullable_string_len(self.acknowledge_error_message)
480 } else {
481 nullable_string_len(self.acknowledge_error_message)
482 };
483 }
484 if version >= 0 {
485 n += self.current_leader.encoded_len(version);
486 }
487 if version >= 0 {
488 n += if version <= 0 {
489 match &self.records {
490 None => {
491 if flex {
492 crate::primitives::varint::uvarint_len(0)
493 } else {
494 4
495 }
496 }
497 Some(__rb) => {
498 let __rb_len =
499 <crate::records::RecordsPayloadBorrowed as crate::Encode>::encoded_len(
500 __rb, version,
501 );
502 if flex {
503 crate::primitives::string_bytes::compact_bytes_len_from_size(__rb_len)
504 } else {
505 4 + __rb_len
506 }
507 }
508 }
509 } else {
510 match &self.records {
511 None => {
512 if flex {
513 crate::primitives::string_bytes::compact_bytes_len_from_size(0)
514 } else {
515 4
516 }
517 }
518 Some(__rb) => {
519 let __rb_len =
520 <crate::records::RecordsPayloadBorrowed as crate::Encode>::encoded_len(
521 __rb, version,
522 );
523 if flex {
524 crate::primitives::string_bytes::compact_bytes_len_from_size(__rb_len)
525 } else {
526 4 + __rb_len
527 }
528 }
529 }
530 };
531 }
532 if version >= 0 {
533 n += {
534 let prefix = crate::primitives::array::array_len_prefix_len(
535 (self.acquired_records).len(),
536 flex,
537 );
538 let body: usize = (self.acquired_records)
539 .iter()
540 .map(|it| it.encoded_len(version))
541 .sum();
542 prefix + body
543 };
544 }
545 if flex {
546 let known_pairs: Vec<(u32, usize)> = Vec::new();
547 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
548 }
549 n
550 }
551}
552impl<'de> DecodeBorrow<'de> for PartitionData<'de> {
553 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
554 let flex = version >= 0;
555 let mut out = Self::default();
556 if version >= 0 {
557 out.partition_index = get_i32(buf)?;
558 }
559 if version >= 0 {
560 out.error_code = get_i16(buf)?;
561 }
562 if version >= 0 {
563 out.error_message = if flex {
564 get_compact_nullable_string_borrowed(buf)?
565 } else {
566 get_nullable_string_borrowed(buf)?
567 };
568 }
569 if version >= 0 {
570 out.acknowledge_error_code = get_i16(buf)?;
571 }
572 if version >= 0 {
573 out.acknowledge_error_message = if flex {
574 get_compact_nullable_string_borrowed(buf)?
575 } else {
576 get_nullable_string_borrowed(buf)?
577 };
578 }
579 if version >= 0 {
580 out.current_leader = LeaderIdAndEpoch::decode_borrow(buf, version)?;
581 }
582 if version >= 0 {
583 out.records = if version <= 0 {
584 {
585 let __rb_opt = if flex {
586 get_compact_nullable_bytes_borrowed(buf)?
587 } else {
588 get_nullable_bytes_borrowed(buf)?
589 };
590 match __rb_opt {
591 None => None,
592 Some(__rb_slice) => {
593 let mut __rb_cur = __rb_slice;
594 Some(
595 <crate::records::RecordsPayloadBorrowed as crate::DecodeBorrow>::decode_borrow(
596 &mut __rb_cur,
597 version,
598 )?,
599 )
600 }
601 }
602 }
603 } else {
604 Some({
605 let __rb_slice = if flex {
606 get_compact_bytes_borrowed(buf)?
607 } else {
608 get_bytes_borrowed(buf)?
609 };
610 let mut __rb_cur = __rb_slice;
611 <crate::records::RecordsPayloadBorrowed as crate::DecodeBorrow>::decode_borrow(
612 &mut __rb_cur,
613 version,
614 )?
615 })
616 };
617 }
618 if version >= 0 {
619 out.acquired_records = {
620 let n = crate::primitives::array::get_array_len(buf, flex)?;
621 let mut v = Vec::with_capacity(n);
622 for _ in 0..n {
623 v.push(AcquiredRecords::decode_borrow(buf, version)?);
624 }
625 v
626 };
627 }
628 if flex {
629 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
630 }
631 Ok(out)
632 }
633}
634#[cfg(test)]
635impl PartitionData<'_> {
636 #[must_use]
637 pub fn populated(version: i16) -> Self {
638 let mut m = Self::default();
639 if version >= 0 {
640 m.partition_index = 1i32;
641 }
642 if version >= 0 {
643 m.error_code = 1i16;
644 }
645 if version >= 0 {
646 m.error_message = Some("x");
647 }
648 if version >= 0 {
649 m.acknowledge_error_code = 1i16;
650 }
651 if version >= 0 {
652 m.acknowledge_error_message = Some("x");
653 }
654 if version >= 0 {
655 m.current_leader = LeaderIdAndEpoch::populated(version);
656 }
657 if version >= 0 {
658 m.acquired_records = vec![AcquiredRecords::populated(version)];
659 }
660 m
661 }
662}
663#[derive(Debug, Clone, PartialEq, Eq, Default)]
664pub struct LeaderIdAndEpoch {
665 pub leader_id: i32,
666 pub leader_epoch: i32,
667 pub unknown_tagged_fields: UnknownTaggedFields,
668}
669impl LeaderIdAndEpoch {
670 pub fn to_owned(&self) -> crate::owned::share_fetch_response::LeaderIdAndEpoch {
671 crate::owned::share_fetch_response::LeaderIdAndEpoch {
672 leader_id: (self.leader_id),
673 leader_epoch: (self.leader_epoch),
674 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
675 }
676 }
677}
678impl Encode for LeaderIdAndEpoch {
679 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
680 let flex = version >= 0;
681 if version >= 0 {
682 put_i32(buf, self.leader_id);
683 }
684 if version >= 0 {
685 put_i32(buf, self.leader_epoch);
686 }
687 if flex {
688 let tagged = WriteTaggedFields::new();
689 tagged.write(buf, &self.unknown_tagged_fields);
690 }
691 Ok(())
692 }
693 fn encoded_len(&self, version: i16) -> usize {
694 let flex = version >= 0;
695 let mut n: usize = 0;
696 if version >= 0 {
697 n += 4;
698 }
699 if version >= 0 {
700 n += 4;
701 }
702 if flex {
703 let known_pairs: Vec<(u32, usize)> = Vec::new();
704 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
705 }
706 n
707 }
708}
709impl<'de> DecodeBorrow<'de> for LeaderIdAndEpoch {
710 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
711 let flex = version >= 0;
712 let mut out = Self::default();
713 if version >= 0 {
714 out.leader_id = get_i32(buf)?;
715 }
716 if version >= 0 {
717 out.leader_epoch = get_i32(buf)?;
718 }
719 if flex {
720 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
721 }
722 Ok(out)
723 }
724}
725#[cfg(test)]
726impl LeaderIdAndEpoch {
727 #[must_use]
728 pub fn populated(version: i16) -> Self {
729 let mut m = Self::default();
730 if version >= 0 {
731 m.leader_id = 1i32;
732 }
733 if version >= 0 {
734 m.leader_epoch = 1i32;
735 }
736 m
737 }
738}
739#[derive(Debug, Clone, PartialEq, Eq, Default)]
740pub struct AcquiredRecords {
741 pub first_offset: i64,
742 pub last_offset: i64,
743 pub delivery_count: i16,
744 pub unknown_tagged_fields: UnknownTaggedFields,
745}
746impl AcquiredRecords {
747 pub fn to_owned(&self) -> crate::owned::share_fetch_response::AcquiredRecords {
748 crate::owned::share_fetch_response::AcquiredRecords {
749 first_offset: (self.first_offset),
750 last_offset: (self.last_offset),
751 delivery_count: (self.delivery_count),
752 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
753 }
754 }
755}
756impl Encode for AcquiredRecords {
757 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
758 let flex = version >= 0;
759 if version >= 0 {
760 put_i64(buf, self.first_offset);
761 }
762 if version >= 0 {
763 put_i64(buf, self.last_offset);
764 }
765 if version >= 0 {
766 put_i16(buf, self.delivery_count);
767 }
768 if flex {
769 let tagged = WriteTaggedFields::new();
770 tagged.write(buf, &self.unknown_tagged_fields);
771 }
772 Ok(())
773 }
774 fn encoded_len(&self, version: i16) -> usize {
775 let flex = version >= 0;
776 let mut n: usize = 0;
777 if version >= 0 {
778 n += 8;
779 }
780 if version >= 0 {
781 n += 8;
782 }
783 if version >= 0 {
784 n += 2;
785 }
786 if flex {
787 let known_pairs: Vec<(u32, usize)> = Vec::new();
788 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
789 }
790 n
791 }
792}
793impl<'de> DecodeBorrow<'de> for AcquiredRecords {
794 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
795 let flex = version >= 0;
796 let mut out = Self::default();
797 if version >= 0 {
798 out.first_offset = get_i64(buf)?;
799 }
800 if version >= 0 {
801 out.last_offset = get_i64(buf)?;
802 }
803 if version >= 0 {
804 out.delivery_count = get_i16(buf)?;
805 }
806 if flex {
807 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
808 }
809 Ok(out)
810 }
811}
812#[cfg(test)]
813impl AcquiredRecords {
814 #[must_use]
815 pub fn populated(version: i16) -> Self {
816 let mut m = Self::default();
817 if version >= 0 {
818 m.first_offset = 1i64;
819 }
820 if version >= 0 {
821 m.last_offset = 1i64;
822 }
823 if version >= 0 {
824 m.delivery_count = 1i16;
825 }
826 m
827 }
828}
829#[derive(Debug, Clone, PartialEq, Eq, Default)]
830pub struct NodeEndpoint<'a> {
831 pub node_id: i32,
832 pub host: &'a str,
833 pub port: i32,
834 pub rack: Option<&'a str>,
835 pub unknown_tagged_fields: UnknownTaggedFields,
836}
837impl NodeEndpoint<'_> {
838 pub fn to_owned(&self) -> crate::owned::share_fetch_response::NodeEndpoint {
839 crate::owned::share_fetch_response::NodeEndpoint {
840 node_id: (self.node_id),
841 host: (self.host).to_string(),
842 port: (self.port),
843 rack: (self.rack).map(std::string::ToString::to_string),
844 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
845 }
846 }
847}
848impl Encode for NodeEndpoint<'_> {
849 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
850 let flex = version >= 0;
851 if version >= 0 {
852 put_i32(buf, self.node_id);
853 }
854 if version >= 0 {
855 if flex {
856 put_compact_string(buf, self.host);
857 } else {
858 put_string(buf, self.host);
859 }
860 }
861 if version >= 0 {
862 put_i32(buf, self.port);
863 }
864 if version >= 0 {
865 if flex {
866 put_compact_nullable_string(buf, self.rack);
867 } else {
868 put_nullable_string(buf, self.rack);
869 }
870 }
871 if flex {
872 let tagged = WriteTaggedFields::new();
873 tagged.write(buf, &self.unknown_tagged_fields);
874 }
875 Ok(())
876 }
877 fn encoded_len(&self, version: i16) -> usize {
878 let flex = version >= 0;
879 let mut n: usize = 0;
880 if version >= 0 {
881 n += 4;
882 }
883 if version >= 0 {
884 n += if flex {
885 compact_string_len(self.host)
886 } else {
887 string_len(self.host)
888 };
889 }
890 if version >= 0 {
891 n += 4;
892 }
893 if version >= 0 {
894 n += if flex {
895 compact_nullable_string_len(self.rack)
896 } else {
897 nullable_string_len(self.rack)
898 };
899 }
900 if flex {
901 let known_pairs: Vec<(u32, usize)> = Vec::new();
902 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
903 }
904 n
905 }
906}
907impl<'de> DecodeBorrow<'de> for NodeEndpoint<'de> {
908 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
909 let flex = version >= 0;
910 let mut out = Self::default();
911 if version >= 0 {
912 out.node_id = get_i32(buf)?;
913 }
914 if version >= 0 {
915 out.host = if flex {
916 get_compact_string_borrowed(buf)?
917 } else {
918 get_string_borrowed(buf)?
919 };
920 }
921 if version >= 0 {
922 out.port = get_i32(buf)?;
923 }
924 if version >= 0 {
925 out.rack = if flex {
926 get_compact_nullable_string_borrowed(buf)?
927 } else {
928 get_nullable_string_borrowed(buf)?
929 };
930 }
931 if flex {
932 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
933 }
934 Ok(out)
935 }
936}
937#[cfg(test)]
938impl NodeEndpoint<'_> {
939 #[must_use]
940 pub fn populated(version: i16) -> Self {
941 let mut m = Self::default();
942 if version >= 0 {
943 m.node_id = 1i32;
944 }
945 if version >= 0 {
946 m.host = "x";
947 }
948 if version >= 0 {
949 m.port = 1i32;
950 }
951 if version >= 0 {
952 m.rack = Some("x");
953 }
954 m
955 }
956}