1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct DescribeTransactionsResponse {
24 pub throttle_time_ms: i32,
28
29 pub transaction_states: Vec<TransactionState>,
33
34 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
36}
37
38impl DescribeTransactionsResponse {
39 pub fn with_throttle_time_ms(mut self, value: i32) -> Self {
45 self.throttle_time_ms = value;
46 self
47 }
48 pub fn with_transaction_states(mut self, value: Vec<TransactionState>) -> Self {
54 self.transaction_states = value;
55 self
56 }
57 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
59 self.unknown_tagged_fields = value;
60 self
61 }
62 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
64 self.unknown_tagged_fields.insert(key, value);
65 self
66 }
67}
68
69#[cfg(feature = "broker")]
70impl Encodable for DescribeTransactionsResponse {
71 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
72 if version != 0 {
73 bail!("specified version not supported by this message type");
74 }
75 types::Int32.encode(buf, &self.throttle_time_ms)?;
76 types::CompactArray(types::Struct { version }).encode(buf, &self.transaction_states)?;
77 let num_tagged_fields = self.unknown_tagged_fields.len();
78 if num_tagged_fields > std::u32::MAX as usize {
79 bail!(
80 "Too many tagged fields to encode ({} fields)",
81 num_tagged_fields
82 );
83 }
84 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
85
86 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
87 Ok(())
88 }
89 fn compute_size(&self, version: i16) -> Result<usize> {
90 let mut total_size = 0;
91 total_size += types::Int32.compute_size(&self.throttle_time_ms)?;
92 total_size += types::CompactArray(types::Struct { version })
93 .compute_size(&self.transaction_states)?;
94 let num_tagged_fields = self.unknown_tagged_fields.len();
95 if num_tagged_fields > std::u32::MAX as usize {
96 bail!(
97 "Too many tagged fields to encode ({} fields)",
98 num_tagged_fields
99 );
100 }
101 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
102
103 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
104 Ok(total_size)
105 }
106}
107
108#[cfg(feature = "client")]
109impl Decodable for DescribeTransactionsResponse {
110 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
111 if version != 0 {
112 bail!("specified version not supported by this message type");
113 }
114 let throttle_time_ms = types::Int32.decode(buf)?;
115 let transaction_states = types::CompactArray(types::Struct { version }).decode(buf)?;
116 let mut unknown_tagged_fields = BTreeMap::new();
117 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
118 for _ in 0..num_tagged_fields {
119 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
120 let size: u32 = types::UnsignedVarInt.decode(buf)?;
121 let unknown_value = buf.try_get_bytes(size as usize)?;
122 unknown_tagged_fields.insert(tag as i32, unknown_value);
123 }
124 Ok(Self {
125 throttle_time_ms,
126 transaction_states,
127 unknown_tagged_fields,
128 })
129 }
130}
131
132impl Default for DescribeTransactionsResponse {
133 fn default() -> Self {
134 Self {
135 throttle_time_ms: 0,
136 transaction_states: Default::default(),
137 unknown_tagged_fields: BTreeMap::new(),
138 }
139 }
140}
141
142impl Message for DescribeTransactionsResponse {
143 const VERSIONS: VersionRange = VersionRange { min: 0, max: 0 };
144 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
145}
146
147#[non_exhaustive]
149#[derive(Debug, Clone, PartialEq)]
150pub struct TopicData {
151 pub topic: super::TopicName,
155
156 pub partitions: Vec<i32>,
160
161 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
163}
164
165impl TopicData {
166 pub fn with_topic(mut self, value: super::TopicName) -> Self {
172 self.topic = value;
173 self
174 }
175 pub fn with_partitions(mut self, value: Vec<i32>) -> Self {
181 self.partitions = value;
182 self
183 }
184 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
186 self.unknown_tagged_fields = value;
187 self
188 }
189 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
191 self.unknown_tagged_fields.insert(key, value);
192 self
193 }
194}
195
196#[cfg(feature = "broker")]
197impl Encodable for TopicData {
198 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
199 if version != 0 {
200 bail!("specified version not supported by this message type");
201 }
202 types::CompactString.encode(buf, &self.topic)?;
203 types::CompactArray(types::Int32).encode(buf, &self.partitions)?;
204 let num_tagged_fields = self.unknown_tagged_fields.len();
205 if num_tagged_fields > std::u32::MAX as usize {
206 bail!(
207 "Too many tagged fields to encode ({} fields)",
208 num_tagged_fields
209 );
210 }
211 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
212
213 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
214 Ok(())
215 }
216 fn compute_size(&self, version: i16) -> Result<usize> {
217 let mut total_size = 0;
218 total_size += types::CompactString.compute_size(&self.topic)?;
219 total_size += types::CompactArray(types::Int32).compute_size(&self.partitions)?;
220 let num_tagged_fields = self.unknown_tagged_fields.len();
221 if num_tagged_fields > std::u32::MAX as usize {
222 bail!(
223 "Too many tagged fields to encode ({} fields)",
224 num_tagged_fields
225 );
226 }
227 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
228
229 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
230 Ok(total_size)
231 }
232}
233
234#[cfg(feature = "client")]
235impl Decodable for TopicData {
236 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
237 if version != 0 {
238 bail!("specified version not supported by this message type");
239 }
240 let topic = types::CompactString.decode(buf)?;
241 let partitions = types::CompactArray(types::Int32).decode(buf)?;
242 let mut unknown_tagged_fields = BTreeMap::new();
243 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
244 for _ in 0..num_tagged_fields {
245 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
246 let size: u32 = types::UnsignedVarInt.decode(buf)?;
247 let unknown_value = buf.try_get_bytes(size as usize)?;
248 unknown_tagged_fields.insert(tag as i32, unknown_value);
249 }
250 Ok(Self {
251 topic,
252 partitions,
253 unknown_tagged_fields,
254 })
255 }
256}
257
258impl Default for TopicData {
259 fn default() -> Self {
260 Self {
261 topic: Default::default(),
262 partitions: Default::default(),
263 unknown_tagged_fields: BTreeMap::new(),
264 }
265 }
266}
267
268impl Message for TopicData {
269 const VERSIONS: VersionRange = VersionRange { min: 0, max: 0 };
270 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
271}
272
273#[non_exhaustive]
275#[derive(Debug, Clone, PartialEq)]
276pub struct TransactionState {
277 pub error_code: i16,
281
282 pub transactional_id: super::TransactionalId,
286
287 pub transaction_state: StrBytes,
291
292 pub transaction_timeout_ms: i32,
296
297 pub transaction_start_time_ms: i64,
301
302 pub producer_id: super::ProducerId,
306
307 pub producer_epoch: i16,
311
312 pub topics: Vec<TopicData>,
316
317 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
319}
320
321impl TransactionState {
322 pub fn with_error_code(mut self, value: i16) -> Self {
328 self.error_code = value;
329 self
330 }
331 pub fn with_transactional_id(mut self, value: super::TransactionalId) -> Self {
337 self.transactional_id = value;
338 self
339 }
340 pub fn with_transaction_state(mut self, value: StrBytes) -> Self {
346 self.transaction_state = value;
347 self
348 }
349 pub fn with_transaction_timeout_ms(mut self, value: i32) -> Self {
355 self.transaction_timeout_ms = value;
356 self
357 }
358 pub fn with_transaction_start_time_ms(mut self, value: i64) -> Self {
364 self.transaction_start_time_ms = value;
365 self
366 }
367 pub fn with_producer_id(mut self, value: super::ProducerId) -> Self {
373 self.producer_id = value;
374 self
375 }
376 pub fn with_producer_epoch(mut self, value: i16) -> Self {
382 self.producer_epoch = value;
383 self
384 }
385 pub fn with_topics(mut self, value: Vec<TopicData>) -> Self {
391 self.topics = value;
392 self
393 }
394 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
396 self.unknown_tagged_fields = value;
397 self
398 }
399 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
401 self.unknown_tagged_fields.insert(key, value);
402 self
403 }
404}
405
406#[cfg(feature = "broker")]
407impl Encodable for TransactionState {
408 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
409 if version != 0 {
410 bail!("specified version not supported by this message type");
411 }
412 types::Int16.encode(buf, &self.error_code)?;
413 types::CompactString.encode(buf, &self.transactional_id)?;
414 types::CompactString.encode(buf, &self.transaction_state)?;
415 types::Int32.encode(buf, &self.transaction_timeout_ms)?;
416 types::Int64.encode(buf, &self.transaction_start_time_ms)?;
417 types::Int64.encode(buf, &self.producer_id)?;
418 types::Int16.encode(buf, &self.producer_epoch)?;
419 types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;
420 let num_tagged_fields = self.unknown_tagged_fields.len();
421 if num_tagged_fields > std::u32::MAX as usize {
422 bail!(
423 "Too many tagged fields to encode ({} fields)",
424 num_tagged_fields
425 );
426 }
427 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
428
429 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
430 Ok(())
431 }
432 fn compute_size(&self, version: i16) -> Result<usize> {
433 let mut total_size = 0;
434 total_size += types::Int16.compute_size(&self.error_code)?;
435 total_size += types::CompactString.compute_size(&self.transactional_id)?;
436 total_size += types::CompactString.compute_size(&self.transaction_state)?;
437 total_size += types::Int32.compute_size(&self.transaction_timeout_ms)?;
438 total_size += types::Int64.compute_size(&self.transaction_start_time_ms)?;
439 total_size += types::Int64.compute_size(&self.producer_id)?;
440 total_size += types::Int16.compute_size(&self.producer_epoch)?;
441 total_size += types::CompactArray(types::Struct { version }).compute_size(&self.topics)?;
442 let num_tagged_fields = self.unknown_tagged_fields.len();
443 if num_tagged_fields > std::u32::MAX as usize {
444 bail!(
445 "Too many tagged fields to encode ({} fields)",
446 num_tagged_fields
447 );
448 }
449 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
450
451 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
452 Ok(total_size)
453 }
454}
455
456#[cfg(feature = "client")]
457impl Decodable for TransactionState {
458 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
459 if version != 0 {
460 bail!("specified version not supported by this message type");
461 }
462 let error_code = types::Int16.decode(buf)?;
463 let transactional_id = types::CompactString.decode(buf)?;
464 let transaction_state = types::CompactString.decode(buf)?;
465 let transaction_timeout_ms = types::Int32.decode(buf)?;
466 let transaction_start_time_ms = types::Int64.decode(buf)?;
467 let producer_id = types::Int64.decode(buf)?;
468 let producer_epoch = types::Int16.decode(buf)?;
469 let topics = types::CompactArray(types::Struct { version }).decode(buf)?;
470 let mut unknown_tagged_fields = BTreeMap::new();
471 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
472 for _ in 0..num_tagged_fields {
473 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
474 let size: u32 = types::UnsignedVarInt.decode(buf)?;
475 let unknown_value = buf.try_get_bytes(size as usize)?;
476 unknown_tagged_fields.insert(tag as i32, unknown_value);
477 }
478 Ok(Self {
479 error_code,
480 transactional_id,
481 transaction_state,
482 transaction_timeout_ms,
483 transaction_start_time_ms,
484 producer_id,
485 producer_epoch,
486 topics,
487 unknown_tagged_fields,
488 })
489 }
490}
491
492impl Default for TransactionState {
493 fn default() -> Self {
494 Self {
495 error_code: 0,
496 transactional_id: Default::default(),
497 transaction_state: Default::default(),
498 transaction_timeout_ms: 0,
499 transaction_start_time_ms: 0,
500 producer_id: (0).into(),
501 producer_epoch: 0,
502 topics: Default::default(),
503 unknown_tagged_fields: BTreeMap::new(),
504 }
505 }
506}
507
508impl Message for TransactionState {
509 const VERSIONS: VersionRange = VersionRange { min: 0, max: 0 };
510 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
511}
512
513impl HeaderVersion for DescribeTransactionsResponse {
514 fn header_version(version: i16) -> i16 {
515 1
516 }
517}