1use bytes::BufMut;
4
5use crate::primitives::fixed::{get_i16, get_i32, put_i16, put_i32};
6use crate::primitives::string_bytes::{
7 compact_nullable_string_len, compact_string_len, nullable_string_len,
8 put_compact_nullable_string, put_compact_string, put_nullable_string, put_string, string_len,
9};
10use crate::primitives::string_bytes_borrowed::{
11 get_compact_nullable_string_borrowed, get_compact_string_borrowed,
12 get_nullable_string_borrowed, get_string_borrowed,
13};
14use crate::tagged_fields::{WriteTaggedFields, read_tagged_fields, tagged_fields_len};
15use crate::{DecodeBorrow, Encode, ProtocolError, UnknownTaggedFields};
16
/// API key of this message type; reported in `ProtocolError::UnsupportedVersion`.
pub const API_KEY: i16 = 79;
/// Lowest message version accepted by the top-level encode/decode paths.
pub const MIN_VERSION: i16 = 1;
/// Highest message version accepted by the top-level encode/decode paths.
pub const MAX_VERSION: i16 = 2;
/// First version that uses the flexible encoding (compact strings/arrays plus
/// tagged fields). It is below `MIN_VERSION`, so every accepted version is flexible.
pub const FLEXIBLE_MIN: i16 = 0;
21
22#[inline]
23fn is_flexible(version: i16) -> bool {
24 version >= FLEXIBLE_MIN
25}
26
/// Borrowed form of the top-level response message (API key 79).
///
/// String fields are `&'a str` slices obtained from the `*_borrowed` readers
/// during decoding rather than owned `String`s.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct ShareAcknowledgeResponse<'a> {
    /// Throttle time in milliseconds (per the field name); a 4-byte `i32` on the wire.
    pub throttle_time_ms: i32,
    /// Top-level error code; a 2-byte `i16` on the wire.
    pub error_code: i16,
    /// Optional top-level error message, encoded as a nullable string.
    pub error_message: Option<&'a str>,
    /// 4-byte `i32` that is only read and written for version 2 and later;
    /// it keeps its default value when decoding an earlier version.
    pub acquisition_lock_timeout_ms: i32,
    /// Per-topic entries, encoded as a length-prefixed array.
    pub responses: Vec<ShareAcknowledgeTopicResponse<'a>>,
    /// Node endpoint entries, encoded as a length-prefixed array.
    pub node_endpoints: Vec<NodeEndpoint<'a>>,
    /// Tagged fields returned by `read_tagged_fields` on decode and written
    /// back on encode for flexible versions.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
37impl ShareAcknowledgeResponse<'_> {
38 pub fn to_owned(&self) -> crate::owned::share_acknowledge_response::ShareAcknowledgeResponse {
39 crate::owned::share_acknowledge_response::ShareAcknowledgeResponse {
40 throttle_time_ms: (self.throttle_time_ms),
41 error_code: (self.error_code),
42 error_message: (self.error_message).map(std::string::ToString::to_string),
43 acquisition_lock_timeout_ms: (self.acquisition_lock_timeout_ms),
44 responses: (self.responses)
45 .iter()
46 .map(ShareAcknowledgeTopicResponse::to_owned)
47 .collect(),
48 node_endpoints: (self.node_endpoints)
49 .iter()
50 .map(NodeEndpoint::to_owned)
51 .collect(),
52 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
53 }
54 }
55}
56impl Encode for ShareAcknowledgeResponse<'_> {
57 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
58 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
59 return Err(ProtocolError::UnsupportedVersion {
60 api_key: API_KEY,
61 version,
62 });
63 }
64 let flex = is_flexible(version);
65 if version >= 0 {
66 put_i32(buf, self.throttle_time_ms);
67 }
68 if version >= 0 {
69 put_i16(buf, self.error_code);
70 }
71 if version >= 0 {
72 if flex {
73 put_compact_nullable_string(buf, self.error_message);
74 } else {
75 put_nullable_string(buf, self.error_message);
76 }
77 }
78 if version >= 2 {
79 put_i32(buf, self.acquisition_lock_timeout_ms);
80 }
81 if version >= 0 {
82 {
83 crate::primitives::array::put_array_len(buf, (self.responses).len(), flex);
84 for it in &self.responses {
85 it.encode(buf, version)?;
86 }
87 }
88 }
89 if version >= 0 {
90 {
91 crate::primitives::array::put_array_len(buf, (self.node_endpoints).len(), flex);
92 for it in &self.node_endpoints {
93 it.encode(buf, version)?;
94 }
95 }
96 }
97 if flex {
98 let tagged = WriteTaggedFields::new();
99 tagged.write(buf, &self.unknown_tagged_fields);
100 }
101 Ok(())
102 }
103 fn encoded_len(&self, version: i16) -> usize {
104 let flex = is_flexible(version);
105 let mut n: usize = 0;
106 if version >= 0 {
107 n += 4;
108 }
109 if version >= 0 {
110 n += 2;
111 }
112 if version >= 0 {
113 n += if flex {
114 compact_nullable_string_len(self.error_message)
115 } else {
116 nullable_string_len(self.error_message)
117 };
118 }
119 if version >= 2 {
120 n += 4;
121 }
122 if version >= 0 {
123 n += {
124 let prefix =
125 crate::primitives::array::array_len_prefix_len((self.responses).len(), flex);
126 let body: usize = (self.responses)
127 .iter()
128 .map(|it| it.encoded_len(version))
129 .sum();
130 prefix + body
131 };
132 }
133 if version >= 0 {
134 n += {
135 let prefix = crate::primitives::array::array_len_prefix_len(
136 (self.node_endpoints).len(),
137 flex,
138 );
139 let body: usize = (self.node_endpoints)
140 .iter()
141 .map(|it| it.encoded_len(version))
142 .sum();
143 prefix + body
144 };
145 }
146 if flex {
147 let known_pairs: Vec<(u32, usize)> = Vec::new();
148 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
149 }
150 n
151 }
152}
153impl<'de> DecodeBorrow<'de> for ShareAcknowledgeResponse<'de> {
154 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
155 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
156 return Err(ProtocolError::UnsupportedVersion {
157 api_key: API_KEY,
158 version,
159 });
160 }
161 let flex = is_flexible(version);
162 let mut out = Self::default();
163 if version >= 0 {
164 out.throttle_time_ms = get_i32(buf)?;
165 }
166 if version >= 0 {
167 out.error_code = get_i16(buf)?;
168 }
169 if version >= 0 {
170 out.error_message = if flex {
171 get_compact_nullable_string_borrowed(buf)?
172 } else {
173 get_nullable_string_borrowed(buf)?
174 };
175 }
176 if version >= 2 {
177 out.acquisition_lock_timeout_ms = get_i32(buf)?;
178 }
179 if version >= 0 {
180 out.responses = {
181 let n = crate::primitives::array::get_array_len(buf, flex)?;
182 let mut v = Vec::with_capacity(n);
183 for _ in 0..n {
184 v.push(ShareAcknowledgeTopicResponse::decode_borrow(buf, version)?);
185 }
186 v
187 };
188 }
189 if version >= 0 {
190 out.node_endpoints = {
191 let n = crate::primitives::array::get_array_len(buf, flex)?;
192 let mut v = Vec::with_capacity(n);
193 for _ in 0..n {
194 v.push(NodeEndpoint::decode_borrow(buf, version)?);
195 }
196 v
197 };
198 }
199 if flex {
200 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
201 }
202 Ok(out)
203 }
204}
#[cfg(test)]
impl ShareAcknowledgeResponse<'_> {
    /// Test fixture: a response in which every field gated in for `version`
    /// holds a non-default value; the remaining fields stay at their defaults.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        let mut sample = if version >= 0 {
            Self {
                throttle_time_ms: 1,
                error_code: 1,
                error_message: Some("x"),
                responses: vec![ShareAcknowledgeTopicResponse::populated(version)],
                node_endpoints: vec![NodeEndpoint::populated(version)],
                ..Self::default()
            }
        } else {
            Self::default()
        };
        // Only present from version 2 onwards.
        if version >= 2 {
            sample.acquisition_lock_timeout_ms = 1;
        }
        sample
    }
}
/// Borrowed per-topic entry of the `responses` array.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct ShareAcknowledgeTopicResponse<'a> {
    /// Topic UUID; occupies 16 bytes on the wire.
    pub topic_id: crate::primitives::uuid::Uuid,
    /// Per-partition entries, encoded as a length-prefixed array.
    pub partitions: Vec<PartitionData<'a>>,
    /// Tagged fields returned by `read_tagged_fields` on decode and written
    /// back on encode.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
237impl ShareAcknowledgeTopicResponse<'_> {
238 pub fn to_owned(
239 &self,
240 ) -> crate::owned::share_acknowledge_response::ShareAcknowledgeTopicResponse {
241 crate::owned::share_acknowledge_response::ShareAcknowledgeTopicResponse {
242 topic_id: (self.topic_id),
243 partitions: (self.partitions)
244 .iter()
245 .map(PartitionData::to_owned)
246 .collect(),
247 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
248 }
249 }
250}
251impl Encode for ShareAcknowledgeTopicResponse<'_> {
252 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
253 let flex = version >= 0;
254 if version >= 0 {
255 crate::primitives::uuid::put_uuid(buf, self.topic_id);
256 }
257 if version >= 0 {
258 {
259 crate::primitives::array::put_array_len(buf, (self.partitions).len(), flex);
260 for it in &self.partitions {
261 it.encode(buf, version)?;
262 }
263 }
264 }
265 if flex {
266 let tagged = WriteTaggedFields::new();
267 tagged.write(buf, &self.unknown_tagged_fields);
268 }
269 Ok(())
270 }
271 fn encoded_len(&self, version: i16) -> usize {
272 let flex = version >= 0;
273 let mut n: usize = 0;
274 if version >= 0 {
275 n += 16;
276 }
277 if version >= 0 {
278 n += {
279 let prefix =
280 crate::primitives::array::array_len_prefix_len((self.partitions).len(), flex);
281 let body: usize = (self.partitions)
282 .iter()
283 .map(|it| it.encoded_len(version))
284 .sum();
285 prefix + body
286 };
287 }
288 if flex {
289 let known_pairs: Vec<(u32, usize)> = Vec::new();
290 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
291 }
292 n
293 }
294}
295impl<'de> DecodeBorrow<'de> for ShareAcknowledgeTopicResponse<'de> {
296 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
297 let flex = version >= 0;
298 let mut out = Self::default();
299 if version >= 0 {
300 out.topic_id = crate::primitives::uuid::get_uuid(buf)?;
301 }
302 if version >= 0 {
303 out.partitions = {
304 let n = crate::primitives::array::get_array_len(buf, flex)?;
305 let mut v = Vec::with_capacity(n);
306 for _ in 0..n {
307 v.push(PartitionData::decode_borrow(buf, version)?);
308 }
309 v
310 };
311 }
312 if flex {
313 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
314 }
315 Ok(out)
316 }
317}
#[cfg(test)]
impl ShareAcknowledgeTopicResponse<'_> {
    /// Test fixture: a topic entry with an all-ones topic id and a single
    /// populated partition; the default value for a negative `version`.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        if version >= 0 {
            Self {
                topic_id: crate::primitives::uuid::Uuid([1u8; 16]),
                partitions: vec![PartitionData::populated(version)],
                ..Self::default()
            }
        } else {
            Self::default()
        }
    }
}
/// Borrowed per-partition entry of a topic's `partitions` array.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct PartitionData<'a> {
    /// Partition index; a 4-byte `i32` on the wire.
    pub partition_index: i32,
    /// Partition-level error code; a 2-byte `i16` on the wire.
    pub error_code: i16,
    /// Optional partition-level error message, encoded as a nullable string.
    pub error_message: Option<&'a str>,
    /// Nested leader id/epoch structure, encoded inline after the message.
    pub current_leader: LeaderIdAndEpoch,
    /// Tagged fields returned by `read_tagged_fields` on decode and written
    /// back on encode.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
340impl PartitionData<'_> {
341 pub fn to_owned(&self) -> crate::owned::share_acknowledge_response::PartitionData {
342 crate::owned::share_acknowledge_response::PartitionData {
343 partition_index: (self.partition_index),
344 error_code: (self.error_code),
345 error_message: (self.error_message).map(std::string::ToString::to_string),
346 current_leader: (self.current_leader).to_owned(),
347 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
348 }
349 }
350}
351impl Encode for PartitionData<'_> {
352 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
353 let flex = version >= 0;
354 if version >= 0 {
355 put_i32(buf, self.partition_index);
356 }
357 if version >= 0 {
358 put_i16(buf, self.error_code);
359 }
360 if version >= 0 {
361 if flex {
362 put_compact_nullable_string(buf, self.error_message);
363 } else {
364 put_nullable_string(buf, self.error_message);
365 }
366 }
367 if version >= 0 {
368 self.current_leader.encode(buf, version)?;
369 }
370 if flex {
371 let tagged = WriteTaggedFields::new();
372 tagged.write(buf, &self.unknown_tagged_fields);
373 }
374 Ok(())
375 }
376 fn encoded_len(&self, version: i16) -> usize {
377 let flex = version >= 0;
378 let mut n: usize = 0;
379 if version >= 0 {
380 n += 4;
381 }
382 if version >= 0 {
383 n += 2;
384 }
385 if version >= 0 {
386 n += if flex {
387 compact_nullable_string_len(self.error_message)
388 } else {
389 nullable_string_len(self.error_message)
390 };
391 }
392 if version >= 0 {
393 n += self.current_leader.encoded_len(version);
394 }
395 if flex {
396 let known_pairs: Vec<(u32, usize)> = Vec::new();
397 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
398 }
399 n
400 }
401}
402impl<'de> DecodeBorrow<'de> for PartitionData<'de> {
403 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
404 let flex = version >= 0;
405 let mut out = Self::default();
406 if version >= 0 {
407 out.partition_index = get_i32(buf)?;
408 }
409 if version >= 0 {
410 out.error_code = get_i16(buf)?;
411 }
412 if version >= 0 {
413 out.error_message = if flex {
414 get_compact_nullable_string_borrowed(buf)?
415 } else {
416 get_nullable_string_borrowed(buf)?
417 };
418 }
419 if version >= 0 {
420 out.current_leader = LeaderIdAndEpoch::decode_borrow(buf, version)?;
421 }
422 if flex {
423 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
424 }
425 Ok(out)
426 }
427}
#[cfg(test)]
impl PartitionData<'_> {
    /// Test fixture: a partition entry with every field set to a non-default
    /// value; the default value for a negative `version`.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        if version >= 0 {
            Self {
                partition_index: 1,
                error_code: 1,
                error_message: Some("x"),
                current_leader: LeaderIdAndEpoch::populated(version),
                ..Self::default()
            }
        } else {
            Self::default()
        }
    }
}
/// Leader id/epoch pair nested inside `PartitionData` as `current_leader`.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct LeaderIdAndEpoch {
    /// Leader id; a 4-byte `i32` on the wire.
    pub leader_id: i32,
    /// Leader epoch; a 4-byte `i32` on the wire.
    pub leader_epoch: i32,
    /// Tagged fields returned by `read_tagged_fields` on decode and written
    /// back on encode.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
454impl LeaderIdAndEpoch {
455 pub fn to_owned(&self) -> crate::owned::share_acknowledge_response::LeaderIdAndEpoch {
456 crate::owned::share_acknowledge_response::LeaderIdAndEpoch {
457 leader_id: (self.leader_id),
458 leader_epoch: (self.leader_epoch),
459 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
460 }
461 }
462}
463impl Encode for LeaderIdAndEpoch {
464 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
465 let flex = version >= 0;
466 if version >= 0 {
467 put_i32(buf, self.leader_id);
468 }
469 if version >= 0 {
470 put_i32(buf, self.leader_epoch);
471 }
472 if flex {
473 let tagged = WriteTaggedFields::new();
474 tagged.write(buf, &self.unknown_tagged_fields);
475 }
476 Ok(())
477 }
478 fn encoded_len(&self, version: i16) -> usize {
479 let flex = version >= 0;
480 let mut n: usize = 0;
481 if version >= 0 {
482 n += 4;
483 }
484 if version >= 0 {
485 n += 4;
486 }
487 if flex {
488 let known_pairs: Vec<(u32, usize)> = Vec::new();
489 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
490 }
491 n
492 }
493}
494impl<'de> DecodeBorrow<'de> for LeaderIdAndEpoch {
495 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
496 let flex = version >= 0;
497 let mut out = Self::default();
498 if version >= 0 {
499 out.leader_id = get_i32(buf)?;
500 }
501 if version >= 0 {
502 out.leader_epoch = get_i32(buf)?;
503 }
504 if flex {
505 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
506 }
507 Ok(out)
508 }
509}
#[cfg(test)]
impl LeaderIdAndEpoch {
    /// Test fixture: leader id and epoch both set to 1; the default value for
    /// a negative `version`.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        if version >= 0 {
            Self {
                leader_id: 1,
                leader_epoch: 1,
                ..Self::default()
            }
        } else {
            Self::default()
        }
    }
}
/// Borrowed entry of the top-level `node_endpoints` array.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct NodeEndpoint<'a> {
    /// Node id; a 4-byte `i32` on the wire.
    pub node_id: i32,
    /// Host name, encoded as a non-nullable string.
    pub host: &'a str,
    /// Port; a 4-byte `i32` on the wire.
    pub port: i32,
    /// Optional rack, encoded as a nullable string.
    pub rack: Option<&'a str>,
    /// Tagged fields returned by `read_tagged_fields` on decode and written
    /// back on encode.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
532impl NodeEndpoint<'_> {
533 pub fn to_owned(&self) -> crate::owned::share_acknowledge_response::NodeEndpoint {
534 crate::owned::share_acknowledge_response::NodeEndpoint {
535 node_id: (self.node_id),
536 host: (self.host).to_string(),
537 port: (self.port),
538 rack: (self.rack).map(std::string::ToString::to_string),
539 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
540 }
541 }
542}
543impl Encode for NodeEndpoint<'_> {
544 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
545 let flex = version >= 0;
546 if version >= 0 {
547 put_i32(buf, self.node_id);
548 }
549 if version >= 0 {
550 if flex {
551 put_compact_string(buf, self.host);
552 } else {
553 put_string(buf, self.host);
554 }
555 }
556 if version >= 0 {
557 put_i32(buf, self.port);
558 }
559 if version >= 0 {
560 if flex {
561 put_compact_nullable_string(buf, self.rack);
562 } else {
563 put_nullable_string(buf, self.rack);
564 }
565 }
566 if flex {
567 let tagged = WriteTaggedFields::new();
568 tagged.write(buf, &self.unknown_tagged_fields);
569 }
570 Ok(())
571 }
572 fn encoded_len(&self, version: i16) -> usize {
573 let flex = version >= 0;
574 let mut n: usize = 0;
575 if version >= 0 {
576 n += 4;
577 }
578 if version >= 0 {
579 n += if flex {
580 compact_string_len(self.host)
581 } else {
582 string_len(self.host)
583 };
584 }
585 if version >= 0 {
586 n += 4;
587 }
588 if version >= 0 {
589 n += if flex {
590 compact_nullable_string_len(self.rack)
591 } else {
592 nullable_string_len(self.rack)
593 };
594 }
595 if flex {
596 let known_pairs: Vec<(u32, usize)> = Vec::new();
597 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
598 }
599 n
600 }
601}
602impl<'de> DecodeBorrow<'de> for NodeEndpoint<'de> {
603 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
604 let flex = version >= 0;
605 let mut out = Self::default();
606 if version >= 0 {
607 out.node_id = get_i32(buf)?;
608 }
609 if version >= 0 {
610 out.host = if flex {
611 get_compact_string_borrowed(buf)?
612 } else {
613 get_string_borrowed(buf)?
614 };
615 }
616 if version >= 0 {
617 out.port = get_i32(buf)?;
618 }
619 if version >= 0 {
620 out.rack = if flex {
621 get_compact_nullable_string_borrowed(buf)?
622 } else {
623 get_nullable_string_borrowed(buf)?
624 };
625 }
626 if flex {
627 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
628 }
629 Ok(out)
630 }
631}
#[cfg(test)]
impl NodeEndpoint<'_> {
    /// Test fixture: an endpoint with every field set to a non-default value;
    /// the default value for a negative `version`.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        if version >= 0 {
            Self {
                node_id: 1,
                host: "x",
                port: 1,
                rack: Some("x"),
                ..Self::default()
            }
        } else {
            Self::default()
        }
    }
}