1use crate::primitives::fixed::{get_i16, get_i32, put_i16, put_i32};
4use crate::primitives::string_bytes::{
5 compact_nullable_string_len, compact_string_len, get_compact_nullable_string_owned,
6 get_compact_string_owned, get_nullable_string_owned, get_string_owned, nullable_string_len,
7 put_compact_nullable_string, put_compact_string, put_nullable_string, put_string, string_len,
8};
9use crate::tagged_fields::{WriteTaggedFields, read_tagged_fields, tagged_fields_len};
10use crate::{Decode, Encode, ProtocolError, UnknownTaggedFields};
11use bytes::{Buf, BufMut};
/// API key of this message; reported in `ProtocolError::UnsupportedVersion`.
pub const API_KEY: i16 = 79;
/// Lowest message version accepted by the top-level `encode`/`decode`.
pub const MIN_VERSION: i16 = 1;
/// Highest message version accepted by the top-level `encode`/`decode`.
pub const MAX_VERSION: i16 = 2;
/// Lowest version for which `is_flexible` reports the flexible encoding.
pub const FLEXIBLE_MIN: i16 = 0;
16#[inline]
17fn is_flexible(version: i16) -> bool {
18 version >= FLEXIBLE_MIN
19}
/// Top-level ShareAcknowledge response message.
///
/// Fields are listed in the order in which `encode` writes them.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct ShareAcknowledgeResponse {
    /// Throttle time in milliseconds, written as a 32-bit integer.
    pub throttle_time_ms: i32,
    /// Top-level error code, written as a 16-bit integer.
    pub error_code: i16,
    /// Optional top-level error message (nullable string on the wire).
    pub error_message: Option<String>,
    /// Acquisition lock timeout in milliseconds; only on the wire for version >= 2.
    pub acquisition_lock_timeout_ms: i32,
    /// Per-topic results.
    pub responses: Vec<ShareAcknowledgeTopicResponse>,
    /// Node endpoint entries.
    pub node_endpoints: Vec<NodeEndpoint>,
    /// Tagged fields returned by `read_tagged_fields` on decode and written back on encode.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
30impl Encode for ShareAcknowledgeResponse {
31 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
32 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
33 return Err(ProtocolError::UnsupportedVersion {
34 api_key: API_KEY,
35 version,
36 });
37 }
38 let flex = is_flexible(version);
39 if version >= 0 {
40 put_i32(buf, self.throttle_time_ms);
41 }
42 if version >= 0 {
43 put_i16(buf, self.error_code);
44 }
45 if version >= 0 {
46 if flex {
47 put_compact_nullable_string(buf, self.error_message.as_deref());
48 } else {
49 put_nullable_string(buf, self.error_message.as_deref());
50 }
51 }
52 if version >= 2 {
53 put_i32(buf, self.acquisition_lock_timeout_ms);
54 }
55 if version >= 0 {
56 {
57 crate::primitives::array::put_array_len(buf, (self.responses).len(), flex);
58 for it in &self.responses {
59 it.encode(buf, version)?;
60 }
61 }
62 }
63 if version >= 0 {
64 {
65 crate::primitives::array::put_array_len(buf, (self.node_endpoints).len(), flex);
66 for it in &self.node_endpoints {
67 it.encode(buf, version)?;
68 }
69 }
70 }
71 if flex {
72 let tagged = WriteTaggedFields::new();
73 tagged.write(buf, &self.unknown_tagged_fields);
74 }
75 Ok(())
76 }
77 fn encoded_len(&self, version: i16) -> usize {
78 let flex = is_flexible(version);
79 let mut n: usize = 0;
80 if version >= 0 {
81 n += 4;
82 }
83 if version >= 0 {
84 n += 2;
85 }
86 if version >= 0 {
87 n += if flex {
88 compact_nullable_string_len(self.error_message.as_deref())
89 } else {
90 nullable_string_len(self.error_message.as_deref())
91 };
92 }
93 if version >= 2 {
94 n += 4;
95 }
96 if version >= 0 {
97 n += {
98 let prefix =
99 crate::primitives::array::array_len_prefix_len((self.responses).len(), flex);
100 let body: usize = (self.responses)
101 .iter()
102 .map(|it| it.encoded_len(version))
103 .sum();
104 prefix + body
105 };
106 }
107 if version >= 0 {
108 n += {
109 let prefix = crate::primitives::array::array_len_prefix_len(
110 (self.node_endpoints).len(),
111 flex,
112 );
113 let body: usize = (self.node_endpoints)
114 .iter()
115 .map(|it| it.encoded_len(version))
116 .sum();
117 prefix + body
118 };
119 }
120 if flex {
121 let known_pairs: Vec<(u32, usize)> = Vec::new();
122 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
123 }
124 n
125 }
126}
127impl Decode<'_> for ShareAcknowledgeResponse {
128 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
129 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
130 return Err(ProtocolError::UnsupportedVersion {
131 api_key: API_KEY,
132 version,
133 });
134 }
135 let flex = is_flexible(version);
136 let mut out = Self::default();
137 if version >= 0 {
138 out.throttle_time_ms = get_i32(buf)?;
139 }
140 if version >= 0 {
141 out.error_code = get_i16(buf)?;
142 }
143 if version >= 0 {
144 out.error_message = if flex {
145 get_compact_nullable_string_owned(buf)?
146 } else {
147 get_nullable_string_owned(buf)?
148 };
149 }
150 if version >= 2 {
151 out.acquisition_lock_timeout_ms = get_i32(buf)?;
152 }
153 if version >= 0 {
154 out.responses = {
155 let n = crate::primitives::array::get_array_len(buf, flex)?;
156 let mut v = Vec::with_capacity(n);
157 for _ in 0..n {
158 v.push(ShareAcknowledgeTopicResponse::decode(buf, version)?);
159 }
160 v
161 };
162 }
163 if version >= 0 {
164 out.node_endpoints = {
165 let n = crate::primitives::array::get_array_len(buf, flex)?;
166 let mut v = Vec::with_capacity(n);
167 for _ in 0..n {
168 v.push(NodeEndpoint::decode(buf, version)?);
169 }
170 v
171 };
172 }
173 if flex {
174 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
175 }
176 Ok(out)
177 }
178}
#[cfg(test)]
impl ShareAcknowledgeResponse {
    /// Builds a test fixture in which every field present in `version` holds a
    /// non-default value; fields not present stay at their defaults.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        let mut sample = Self::default();
        if version >= 0 {
            sample.throttle_time_ms = 1;
            sample.error_code = 1;
            sample.error_message = Some(String::from("x"));
        }
        if version >= 2 {
            sample.acquisition_lock_timeout_ms = 1;
        }
        if version >= 0 {
            sample.responses = vec![ShareAcknowledgeTopicResponse::populated(version)];
            sample.node_endpoints = vec![NodeEndpoint::populated(version)];
        }
        sample
    }
}
/// Per-topic entry of a [`ShareAcknowledgeResponse`].
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct ShareAcknowledgeTopicResponse {
    /// Topic identifier, written as a UUID.
    pub topic_id: crate::primitives::uuid::Uuid,
    /// Per-partition results for this topic.
    pub partitions: Vec<PartitionData>,
    /// Tagged fields returned by `read_tagged_fields` on decode and written back on encode.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
211impl Encode for ShareAcknowledgeTopicResponse {
212 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
213 let flex = version >= 0;
214 if version >= 0 {
215 crate::primitives::uuid::put_uuid(buf, self.topic_id);
216 }
217 if version >= 0 {
218 {
219 crate::primitives::array::put_array_len(buf, (self.partitions).len(), flex);
220 for it in &self.partitions {
221 it.encode(buf, version)?;
222 }
223 }
224 }
225 if flex {
226 let tagged = WriteTaggedFields::new();
227 tagged.write(buf, &self.unknown_tagged_fields);
228 }
229 Ok(())
230 }
231 fn encoded_len(&self, version: i16) -> usize {
232 let flex = version >= 0;
233 let mut n: usize = 0;
234 if version >= 0 {
235 n += 16;
236 }
237 if version >= 0 {
238 n += {
239 let prefix =
240 crate::primitives::array::array_len_prefix_len((self.partitions).len(), flex);
241 let body: usize = (self.partitions)
242 .iter()
243 .map(|it| it.encoded_len(version))
244 .sum();
245 prefix + body
246 };
247 }
248 if flex {
249 let known_pairs: Vec<(u32, usize)> = Vec::new();
250 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
251 }
252 n
253 }
254}
255impl Decode<'_> for ShareAcknowledgeTopicResponse {
256 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
257 let flex = version >= 0;
258 let mut out = Self::default();
259 if version >= 0 {
260 out.topic_id = crate::primitives::uuid::get_uuid(buf)?;
261 }
262 if version >= 0 {
263 out.partitions = {
264 let n = crate::primitives::array::get_array_len(buf, flex)?;
265 let mut v = Vec::with_capacity(n);
266 for _ in 0..n {
267 v.push(PartitionData::decode(buf, version)?);
268 }
269 v
270 };
271 }
272 if flex {
273 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
274 }
275 Ok(out)
276 }
277}
#[cfg(test)]
impl ShareAcknowledgeTopicResponse {
    /// Builds a test fixture with a non-default topic id and one populated partition.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        let mut sample = Self::default();
        if version >= 0 {
            sample.topic_id = crate::primitives::uuid::Uuid([1u8; 16]);
            sample.partitions = vec![PartitionData::populated(version)];
        }
        sample
    }
}
/// Per-partition entry of a [`ShareAcknowledgeTopicResponse`].
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct PartitionData {
    /// Partition index, written as a 32-bit integer.
    pub partition_index: i32,
    /// Partition-level error code, written as a 16-bit integer.
    pub error_code: i16,
    /// Optional partition-level error message (nullable string on the wire).
    pub error_message: Option<String>,
    /// Leader id and epoch, encoded as a nested struct.
    pub current_leader: LeaderIdAndEpoch,
    /// Tagged fields returned by `read_tagged_fields` on decode and written back on encode.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
300impl Encode for PartitionData {
301 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
302 let flex = version >= 0;
303 if version >= 0 {
304 put_i32(buf, self.partition_index);
305 }
306 if version >= 0 {
307 put_i16(buf, self.error_code);
308 }
309 if version >= 0 {
310 if flex {
311 put_compact_nullable_string(buf, self.error_message.as_deref());
312 } else {
313 put_nullable_string(buf, self.error_message.as_deref());
314 }
315 }
316 if version >= 0 {
317 self.current_leader.encode(buf, version)?;
318 }
319 if flex {
320 let tagged = WriteTaggedFields::new();
321 tagged.write(buf, &self.unknown_tagged_fields);
322 }
323 Ok(())
324 }
325 fn encoded_len(&self, version: i16) -> usize {
326 let flex = version >= 0;
327 let mut n: usize = 0;
328 if version >= 0 {
329 n += 4;
330 }
331 if version >= 0 {
332 n += 2;
333 }
334 if version >= 0 {
335 n += if flex {
336 compact_nullable_string_len(self.error_message.as_deref())
337 } else {
338 nullable_string_len(self.error_message.as_deref())
339 };
340 }
341 if version >= 0 {
342 n += self.current_leader.encoded_len(version);
343 }
344 if flex {
345 let known_pairs: Vec<(u32, usize)> = Vec::new();
346 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
347 }
348 n
349 }
350}
351impl Decode<'_> for PartitionData {
352 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
353 let flex = version >= 0;
354 let mut out = Self::default();
355 if version >= 0 {
356 out.partition_index = get_i32(buf)?;
357 }
358 if version >= 0 {
359 out.error_code = get_i16(buf)?;
360 }
361 if version >= 0 {
362 out.error_message = if flex {
363 get_compact_nullable_string_owned(buf)?
364 } else {
365 get_nullable_string_owned(buf)?
366 };
367 }
368 if version >= 0 {
369 out.current_leader = LeaderIdAndEpoch::decode(buf, version)?;
370 }
371 if flex {
372 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
373 }
374 Ok(out)
375 }
376}
#[cfg(test)]
impl PartitionData {
    /// Builds a test fixture in which every field holds a non-default value.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        let mut sample = Self::default();
        if version >= 0 {
            sample.partition_index = 1;
            sample.error_code = 1;
            sample.error_message = Some(String::from("x"));
            sample.current_leader = LeaderIdAndEpoch::populated(version);
        }
        sample
    }
}
/// Leader id and epoch pair nested inside [`PartitionData`].
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct LeaderIdAndEpoch {
    /// Leader id, written as a 32-bit integer.
    pub leader_id: i32,
    /// Leader epoch, written as a 32-bit integer.
    pub leader_epoch: i32,
    /// Tagged fields returned by `read_tagged_fields` on decode and written back on encode.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
403impl Encode for LeaderIdAndEpoch {
404 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
405 let flex = version >= 0;
406 if version >= 0 {
407 put_i32(buf, self.leader_id);
408 }
409 if version >= 0 {
410 put_i32(buf, self.leader_epoch);
411 }
412 if flex {
413 let tagged = WriteTaggedFields::new();
414 tagged.write(buf, &self.unknown_tagged_fields);
415 }
416 Ok(())
417 }
418 fn encoded_len(&self, version: i16) -> usize {
419 let flex = version >= 0;
420 let mut n: usize = 0;
421 if version >= 0 {
422 n += 4;
423 }
424 if version >= 0 {
425 n += 4;
426 }
427 if flex {
428 let known_pairs: Vec<(u32, usize)> = Vec::new();
429 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
430 }
431 n
432 }
433}
434impl Decode<'_> for LeaderIdAndEpoch {
435 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
436 let flex = version >= 0;
437 let mut out = Self::default();
438 if version >= 0 {
439 out.leader_id = get_i32(buf)?;
440 }
441 if version >= 0 {
442 out.leader_epoch = get_i32(buf)?;
443 }
444 if flex {
445 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
446 }
447 Ok(out)
448 }
449}
#[cfg(test)]
impl LeaderIdAndEpoch {
    /// Builds a test fixture with non-default leader id and epoch.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        let mut sample = Self::default();
        if version >= 0 {
            sample.leader_id = 1;
            sample.leader_epoch = 1;
        }
        sample
    }
}
/// Node endpoint entry of a [`ShareAcknowledgeResponse`].
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct NodeEndpoint {
    /// Node id, written as a 32-bit integer.
    pub node_id: i32,
    /// Host, written as a non-nullable string.
    pub host: String,
    /// Port, written as a 32-bit integer.
    pub port: i32,
    /// Optional rack (nullable string on the wire).
    pub rack: Option<String>,
    /// Tagged fields returned by `read_tagged_fields` on decode and written back on encode.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
472impl Encode for NodeEndpoint {
473 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
474 let flex = version >= 0;
475 if version >= 0 {
476 put_i32(buf, self.node_id);
477 }
478 if version >= 0 {
479 if flex {
480 put_compact_string(buf, &self.host);
481 } else {
482 put_string(buf, &self.host);
483 }
484 }
485 if version >= 0 {
486 put_i32(buf, self.port);
487 }
488 if version >= 0 {
489 if flex {
490 put_compact_nullable_string(buf, self.rack.as_deref());
491 } else {
492 put_nullable_string(buf, self.rack.as_deref());
493 }
494 }
495 if flex {
496 let tagged = WriteTaggedFields::new();
497 tagged.write(buf, &self.unknown_tagged_fields);
498 }
499 Ok(())
500 }
501 fn encoded_len(&self, version: i16) -> usize {
502 let flex = version >= 0;
503 let mut n: usize = 0;
504 if version >= 0 {
505 n += 4;
506 }
507 if version >= 0 {
508 n += if flex {
509 compact_string_len(&self.host)
510 } else {
511 string_len(&self.host)
512 };
513 }
514 if version >= 0 {
515 n += 4;
516 }
517 if version >= 0 {
518 n += if flex {
519 compact_nullable_string_len(self.rack.as_deref())
520 } else {
521 nullable_string_len(self.rack.as_deref())
522 };
523 }
524 if flex {
525 let known_pairs: Vec<(u32, usize)> = Vec::new();
526 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
527 }
528 n
529 }
530}
531impl Decode<'_> for NodeEndpoint {
532 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
533 let flex = version >= 0;
534 let mut out = Self::default();
535 if version >= 0 {
536 out.node_id = get_i32(buf)?;
537 }
538 if version >= 0 {
539 out.host = if flex {
540 get_compact_string_owned(buf)?
541 } else {
542 get_string_owned(buf)?
543 };
544 }
545 if version >= 0 {
546 out.port = get_i32(buf)?;
547 }
548 if version >= 0 {
549 out.rack = if flex {
550 get_compact_nullable_string_owned(buf)?
551 } else {
552 get_nullable_string_owned(buf)?
553 };
554 }
555 if flex {
556 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
557 }
558 Ok(out)
559 }
560}
#[cfg(test)]
impl NodeEndpoint {
    /// Builds a test fixture in which every field holds a non-default value.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        let mut sample = Self::default();
        if version >= 0 {
            sample.node_id = 1;
            sample.host = String::from("x");
            sample.port = 1;
            sample.rack = Some(String::from("x"));
        }
        sample
    }
}
581#[must_use]
584#[allow(unused_comparisons)]
585pub fn default_json(version: i16) -> ::serde_json::Value {
586 let mut obj = ::serde_json::Map::new();
587 obj.insert("throttleTimeMs".to_string(), ::serde_json::json!(0));
588 obj.insert("errorCode".to_string(), ::serde_json::json!(0));
589 obj.insert("errorMessage".to_string(), ::serde_json::Value::Null);
590 if version >= 2 {
591 obj.insert(
592 "acquisitionLockTimeoutMs".to_string(),
593 ::serde_json::json!(0),
594 );
595 }
596 obj.insert("responses".to_string(), ::serde_json::Value::Array(vec![]));
597 obj.insert(
598 "nodeEndpoints".to_string(),
599 ::serde_json::Value::Array(vec![]),
600 );
601 ::serde_json::Value::Object(obj)
602}