1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct DescribeTransactionsResponse {
24 pub throttle_time_ms: i32,
28
29 pub transaction_states: Vec<TransactionState>,
33
34 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
36}
37
38impl DescribeTransactionsResponse {
39 pub fn with_throttle_time_ms(mut self, value: i32) -> Self {
45 self.throttle_time_ms = value;
46 self
47 }
48 pub fn with_transaction_states(mut self, value: Vec<TransactionState>) -> Self {
54 self.transaction_states = value;
55 self
56 }
57 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
59 self.unknown_tagged_fields = value;
60 self
61 }
62 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
64 self.unknown_tagged_fields.insert(key, value);
65 self
66 }
67}
68
#[cfg(feature = "broker")]
impl Encodable for DescribeTransactionsResponse {
    /// Writes the response to `buf`: throttle time, the compact array of
    /// transaction states, then the tagged-field section.
    ///
    /// # Errors
    ///
    /// Propagates any failure from the underlying encoders, and fails when the
    /// number of unknown tagged fields does not fit in a `u32`.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        types::Int32.encode(buf, &self.throttle_time_ms)?;
        types::CompactArray(types::Struct { version }).encode(buf, &self.transaction_states)?;

        // The tagged-field section starts with its entry count as an unsigned varint.
        let tagged_count = match self.unknown_tagged_fields.len() {
            len if len > u32::MAX as usize => {
                bail!("Too many tagged fields to encode ({} fields)", len);
            }
            len => len as u32,
        };
        types::UnsignedVarInt.encode(buf, tagged_count)?;

        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Returns the number of bytes `encode` would produce for this value.
    ///
    /// # Errors
    ///
    /// Fails under the same conditions as `encode`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let throttle_size = types::Int32.compute_size(&self.throttle_time_ms)?;
        let states_size =
            types::CompactArray(types::Struct { version }).compute_size(&self.transaction_states)?;

        let tagged_count = match self.unknown_tagged_fields.len() {
            len if len > u32::MAX as usize => {
                bail!("Too many tagged fields to encode ({} fields)", len);
            }
            len => len as u32,
        };
        let count_size = types::UnsignedVarInt.compute_size(tagged_count)?;
        let unknown_size = compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;

        Ok(throttle_size + states_size + count_size + unknown_size)
    }
}
104
#[cfg(feature = "client")]
impl Decodable for DescribeTransactionsResponse {
    /// Reads a response from `buf` in wire order: throttle time, the compact
    /// array of transaction states, then the tagged-field section.
    ///
    /// Every tagged field is kept verbatim in `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Propagates any failure from the underlying decoders or from reading a
    /// tagged field's payload out of `buf`.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        let throttle_time_ms = types::Int32.decode(buf)?;
        let transaction_states = types::CompactArray(types::Struct { version }).decode(buf)?;

        let tagged_count: u32 = types::UnsignedVarInt.decode(buf)?;
        let mut unknown_tagged_fields = BTreeMap::new();
        for _ in 0..tagged_count {
            // Each entry is: tag (varint), payload length (varint), payload bytes.
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload_len: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload = buf.try_get_bytes(payload_len as usize)?;
            // Tags above `i32::MAX` wrap to negative keys under this cast; a repeated
            // tag replaces the earlier payload.
            unknown_tagged_fields.insert(tag as i32, payload);
        }

        Ok(Self {
            throttle_time_ms,
            transaction_states,
            unknown_tagged_fields,
        })
    }
}
125
126impl Default for DescribeTransactionsResponse {
127 fn default() -> Self {
128 Self {
129 throttle_time_ms: 0,
130 transaction_states: Default::default(),
131 unknown_tagged_fields: BTreeMap::new(),
132 }
133 }
134}
135
136impl Message for DescribeTransactionsResponse {
137 const VERSIONS: VersionRange = VersionRange { min: 0, max: 0 };
138 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
139}
140
141#[non_exhaustive]
143#[derive(Debug, Clone, PartialEq)]
144pub struct TopicData {
145 pub topic: super::TopicName,
149
150 pub partitions: Vec<i32>,
154
155 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
157}
158
159impl TopicData {
160 pub fn with_topic(mut self, value: super::TopicName) -> Self {
166 self.topic = value;
167 self
168 }
169 pub fn with_partitions(mut self, value: Vec<i32>) -> Self {
175 self.partitions = value;
176 self
177 }
178 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
180 self.unknown_tagged_fields = value;
181 self
182 }
183 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
185 self.unknown_tagged_fields.insert(key, value);
186 self
187 }
188}
189
#[cfg(feature = "broker")]
impl Encodable for TopicData {
    /// Writes this entry to `buf`: topic name, the compact array of partition
    /// numbers, then the tagged-field section.
    ///
    /// # Errors
    ///
    /// Propagates any failure from the underlying encoders, and fails when the
    /// number of unknown tagged fields does not fit in a `u32`.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        types::CompactString.encode(buf, &self.topic)?;
        types::CompactArray(types::Int32).encode(buf, &self.partitions)?;

        // The tagged-field section starts with its entry count as an unsigned varint.
        let tagged_count = match self.unknown_tagged_fields.len() {
            len if len > u32::MAX as usize => {
                bail!("Too many tagged fields to encode ({} fields)", len);
            }
            len => len as u32,
        };
        types::UnsignedVarInt.encode(buf, tagged_count)?;

        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Returns the number of bytes `encode` would produce for this value.
    ///
    /// # Errors
    ///
    /// Fails under the same conditions as `encode`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let topic_size = types::CompactString.compute_size(&self.topic)?;
        let partitions_size = types::CompactArray(types::Int32).compute_size(&self.partitions)?;

        let tagged_count = match self.unknown_tagged_fields.len() {
            len if len > u32::MAX as usize => {
                bail!("Too many tagged fields to encode ({} fields)", len);
            }
            len => len as u32,
        };
        let count_size = types::UnsignedVarInt.compute_size(tagged_count)?;
        let unknown_size = compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;

        Ok(topic_size + partitions_size + count_size + unknown_size)
    }
}
224
#[cfg(feature = "client")]
impl Decodable for TopicData {
    /// Reads an entry from `buf` in wire order: topic name, the compact array
    /// of partition numbers, then the tagged-field section.
    ///
    /// Every tagged field is kept verbatim in `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Propagates any failure from the underlying decoders or from reading a
    /// tagged field's payload out of `buf`.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        let topic = types::CompactString.decode(buf)?;
        let partitions = types::CompactArray(types::Int32).decode(buf)?;

        let tagged_count: u32 = types::UnsignedVarInt.decode(buf)?;
        let mut unknown_tagged_fields = BTreeMap::new();
        for _ in 0..tagged_count {
            // Each entry is: tag (varint), payload length (varint), payload bytes.
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload_len: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload = buf.try_get_bytes(payload_len as usize)?;
            // Tags above `i32::MAX` wrap to negative keys under this cast; a repeated
            // tag replaces the earlier payload.
            unknown_tagged_fields.insert(tag as i32, payload);
        }

        Ok(Self {
            topic,
            partitions,
            unknown_tagged_fields,
        })
    }
}
245
246impl Default for TopicData {
247 fn default() -> Self {
248 Self {
249 topic: Default::default(),
250 partitions: Default::default(),
251 unknown_tagged_fields: BTreeMap::new(),
252 }
253 }
254}
255
256impl Message for TopicData {
257 const VERSIONS: VersionRange = VersionRange { min: 0, max: 0 };
258 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
259}
260
261#[non_exhaustive]
263#[derive(Debug, Clone, PartialEq)]
264pub struct TransactionState {
265 pub error_code: i16,
269
270 pub transactional_id: super::TransactionalId,
274
275 pub transaction_state: StrBytes,
279
280 pub transaction_timeout_ms: i32,
284
285 pub transaction_start_time_ms: i64,
289
290 pub producer_id: super::ProducerId,
294
295 pub producer_epoch: i16,
299
300 pub topics: Vec<TopicData>,
304
305 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
307}
308
309impl TransactionState {
310 pub fn with_error_code(mut self, value: i16) -> Self {
316 self.error_code = value;
317 self
318 }
319 pub fn with_transactional_id(mut self, value: super::TransactionalId) -> Self {
325 self.transactional_id = value;
326 self
327 }
328 pub fn with_transaction_state(mut self, value: StrBytes) -> Self {
334 self.transaction_state = value;
335 self
336 }
337 pub fn with_transaction_timeout_ms(mut self, value: i32) -> Self {
343 self.transaction_timeout_ms = value;
344 self
345 }
346 pub fn with_transaction_start_time_ms(mut self, value: i64) -> Self {
352 self.transaction_start_time_ms = value;
353 self
354 }
355 pub fn with_producer_id(mut self, value: super::ProducerId) -> Self {
361 self.producer_id = value;
362 self
363 }
364 pub fn with_producer_epoch(mut self, value: i16) -> Self {
370 self.producer_epoch = value;
371 self
372 }
373 pub fn with_topics(mut self, value: Vec<TopicData>) -> Self {
379 self.topics = value;
380 self
381 }
382 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
384 self.unknown_tagged_fields = value;
385 self
386 }
387 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
389 self.unknown_tagged_fields.insert(key, value);
390 self
391 }
392}
393
#[cfg(feature = "broker")]
impl Encodable for TransactionState {
    /// Writes this entry to `buf`, field by field in declaration order,
    /// followed by the tagged-field section.
    ///
    /// # Errors
    ///
    /// Propagates any failure from the underlying encoders, and fails when the
    /// number of unknown tagged fields does not fit in a `u32`.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        types::Int16.encode(buf, &self.error_code)?;
        types::CompactString.encode(buf, &self.transactional_id)?;
        types::CompactString.encode(buf, &self.transaction_state)?;
        types::Int32.encode(buf, &self.transaction_timeout_ms)?;
        types::Int64.encode(buf, &self.transaction_start_time_ms)?;
        types::Int64.encode(buf, &self.producer_id)?;
        types::Int16.encode(buf, &self.producer_epoch)?;
        types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;

        // The tagged-field section starts with its entry count as an unsigned varint.
        let tagged_count = match self.unknown_tagged_fields.len() {
            len if len > u32::MAX as usize => {
                bail!("Too many tagged fields to encode ({} fields)", len);
            }
            len => len as u32,
        };
        types::UnsignedVarInt.encode(buf, tagged_count)?;

        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Returns the number of bytes `encode` would produce for this value.
    ///
    /// # Errors
    ///
    /// Fails under the same conditions as `encode`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let error_code_size = types::Int16.compute_size(&self.error_code)?;
        let transactional_id_size = types::CompactString.compute_size(&self.transactional_id)?;
        let state_size = types::CompactString.compute_size(&self.transaction_state)?;
        let timeout_size = types::Int32.compute_size(&self.transaction_timeout_ms)?;
        let start_time_size = types::Int64.compute_size(&self.transaction_start_time_ms)?;
        let producer_id_size = types::Int64.compute_size(&self.producer_id)?;
        let producer_epoch_size = types::Int16.compute_size(&self.producer_epoch)?;
        let topics_size =
            types::CompactArray(types::Struct { version }).compute_size(&self.topics)?;

        let tagged_count = match self.unknown_tagged_fields.len() {
            len if len > u32::MAX as usize => {
                bail!("Too many tagged fields to encode ({} fields)", len);
            }
            len => len as u32,
        };
        let count_size = types::UnsignedVarInt.compute_size(tagged_count)?;
        let unknown_size = compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;

        Ok(error_code_size
            + transactional_id_size
            + state_size
            + timeout_size
            + start_time_size
            + producer_id_size
            + producer_epoch_size
            + topics_size
            + count_size
            + unknown_size)
    }
}
440
#[cfg(feature = "client")]
impl Decodable for TransactionState {
    /// Reads an entry from `buf`, field by field in declaration order,
    /// followed by the tagged-field section.
    ///
    /// Every tagged field is kept verbatim in `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Propagates any failure from the underlying decoders or from reading a
    /// tagged field's payload out of `buf`.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        let error_code = types::Int16.decode(buf)?;
        let transactional_id = types::CompactString.decode(buf)?;
        let transaction_state = types::CompactString.decode(buf)?;
        let transaction_timeout_ms = types::Int32.decode(buf)?;
        let transaction_start_time_ms = types::Int64.decode(buf)?;
        let producer_id = types::Int64.decode(buf)?;
        let producer_epoch = types::Int16.decode(buf)?;
        let topics = types::CompactArray(types::Struct { version }).decode(buf)?;

        let tagged_count: u32 = types::UnsignedVarInt.decode(buf)?;
        let mut unknown_tagged_fields = BTreeMap::new();
        for _ in 0..tagged_count {
            // Each entry is: tag (varint), payload length (varint), payload bytes.
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload_len: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload = buf.try_get_bytes(payload_len as usize)?;
            // Tags above `i32::MAX` wrap to negative keys under this cast; a repeated
            // tag replaces the earlier payload.
            unknown_tagged_fields.insert(tag as i32, payload);
        }

        Ok(Self {
            error_code,
            transactional_id,
            transaction_state,
            transaction_timeout_ms,
            transaction_start_time_ms,
            producer_id,
            producer_epoch,
            topics,
            unknown_tagged_fields,
        })
    }
}
473
474impl Default for TransactionState {
475 fn default() -> Self {
476 Self {
477 error_code: 0,
478 transactional_id: Default::default(),
479 transaction_state: Default::default(),
480 transaction_timeout_ms: 0,
481 transaction_start_time_ms: 0,
482 producer_id: (0).into(),
483 producer_epoch: 0,
484 topics: Default::default(),
485 unknown_tagged_fields: BTreeMap::new(),
486 }
487 }
488}
489
490impl Message for TransactionState {
491 const VERSIONS: VersionRange = VersionRange { min: 0, max: 0 };
492 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
493}
494
495impl HeaderVersion for DescribeTransactionsResponse {
496 fn header_version(version: i16) -> i16 {
497 1
498 }
499}