1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct Cursor {
24 pub topic_name: super::TopicName,
28
29 pub partition_index: i32,
33
34 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
36}
37
38impl Cursor {
39 pub fn with_topic_name(mut self, value: super::TopicName) -> Self {
45 self.topic_name = value;
46 self
47 }
48 pub fn with_partition_index(mut self, value: i32) -> Self {
54 self.partition_index = value;
55 self
56 }
57 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
59 self.unknown_tagged_fields = value;
60 self
61 }
62 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
64 self.unknown_tagged_fields.insert(key, value);
65 self
66 }
67}
68
69#[cfg(feature = "client")]
70impl Encodable for Cursor {
71 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
72 types::CompactString.encode(buf, &self.topic_name)?;
73 types::Int32.encode(buf, &self.partition_index)?;
74 let num_tagged_fields = self.unknown_tagged_fields.len();
75 if num_tagged_fields > std::u32::MAX as usize {
76 bail!(
77 "Too many tagged fields to encode ({} fields)",
78 num_tagged_fields
79 );
80 }
81 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
82
83 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
84 Ok(())
85 }
86 fn compute_size(&self, version: i16) -> Result<usize> {
87 let mut total_size = 0;
88 total_size += types::CompactString.compute_size(&self.topic_name)?;
89 total_size += types::Int32.compute_size(&self.partition_index)?;
90 let num_tagged_fields = self.unknown_tagged_fields.len();
91 if num_tagged_fields > std::u32::MAX as usize {
92 bail!(
93 "Too many tagged fields to encode ({} fields)",
94 num_tagged_fields
95 );
96 }
97 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
98
99 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
100 Ok(total_size)
101 }
102}
103
104#[cfg(feature = "broker")]
105impl Decodable for Cursor {
106 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
107 let topic_name = types::CompactString.decode(buf)?;
108 let partition_index = types::Int32.decode(buf)?;
109 let mut unknown_tagged_fields = BTreeMap::new();
110 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
111 for _ in 0..num_tagged_fields {
112 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
113 let size: u32 = types::UnsignedVarInt.decode(buf)?;
114 let unknown_value = buf.try_get_bytes(size as usize)?;
115 unknown_tagged_fields.insert(tag as i32, unknown_value);
116 }
117 Ok(Self {
118 topic_name,
119 partition_index,
120 unknown_tagged_fields,
121 })
122 }
123}
124
125impl Default for Cursor {
126 fn default() -> Self {
127 Self {
128 topic_name: Default::default(),
129 partition_index: 0,
130 unknown_tagged_fields: BTreeMap::new(),
131 }
132 }
133}
134
135impl Message for Cursor {
136 const VERSIONS: VersionRange = VersionRange { min: 0, max: 0 };
137 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
138}
139
140#[non_exhaustive]
142#[derive(Debug, Clone, PartialEq)]
143pub struct DescribeTopicPartitionsRequest {
144 pub topics: Vec<TopicRequest>,
148
149 pub response_partition_limit: i32,
153
154 pub cursor: Option<Cursor>,
158
159 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
161}
162
163impl DescribeTopicPartitionsRequest {
164 pub fn with_topics(mut self, value: Vec<TopicRequest>) -> Self {
170 self.topics = value;
171 self
172 }
173 pub fn with_response_partition_limit(mut self, value: i32) -> Self {
179 self.response_partition_limit = value;
180 self
181 }
182 pub fn with_cursor(mut self, value: Option<Cursor>) -> Self {
188 self.cursor = value;
189 self
190 }
191 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
193 self.unknown_tagged_fields = value;
194 self
195 }
196 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
198 self.unknown_tagged_fields.insert(key, value);
199 self
200 }
201}
202
203#[cfg(feature = "client")]
204impl Encodable for DescribeTopicPartitionsRequest {
205 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
206 types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;
207 types::Int32.encode(buf, &self.response_partition_limit)?;
208 types::OptionStruct { version }.encode(buf, &self.cursor)?;
209 let num_tagged_fields = self.unknown_tagged_fields.len();
210 if num_tagged_fields > std::u32::MAX as usize {
211 bail!(
212 "Too many tagged fields to encode ({} fields)",
213 num_tagged_fields
214 );
215 }
216 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
217
218 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
219 Ok(())
220 }
221 fn compute_size(&self, version: i16) -> Result<usize> {
222 let mut total_size = 0;
223 total_size += types::CompactArray(types::Struct { version }).compute_size(&self.topics)?;
224 total_size += types::Int32.compute_size(&self.response_partition_limit)?;
225 total_size += types::OptionStruct { version }.compute_size(&self.cursor)?;
226 let num_tagged_fields = self.unknown_tagged_fields.len();
227 if num_tagged_fields > std::u32::MAX as usize {
228 bail!(
229 "Too many tagged fields to encode ({} fields)",
230 num_tagged_fields
231 );
232 }
233 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
234
235 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
236 Ok(total_size)
237 }
238}
239
240#[cfg(feature = "broker")]
241impl Decodable for DescribeTopicPartitionsRequest {
242 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
243 let topics = types::CompactArray(types::Struct { version }).decode(buf)?;
244 let response_partition_limit = types::Int32.decode(buf)?;
245 let cursor = types::OptionStruct { version }.decode(buf)?;
246 let mut unknown_tagged_fields = BTreeMap::new();
247 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
248 for _ in 0..num_tagged_fields {
249 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
250 let size: u32 = types::UnsignedVarInt.decode(buf)?;
251 let unknown_value = buf.try_get_bytes(size as usize)?;
252 unknown_tagged_fields.insert(tag as i32, unknown_value);
253 }
254 Ok(Self {
255 topics,
256 response_partition_limit,
257 cursor,
258 unknown_tagged_fields,
259 })
260 }
261}
262
263impl Default for DescribeTopicPartitionsRequest {
264 fn default() -> Self {
265 Self {
266 topics: Default::default(),
267 response_partition_limit: 2000,
268 cursor: None,
269 unknown_tagged_fields: BTreeMap::new(),
270 }
271 }
272}
273
274impl Message for DescribeTopicPartitionsRequest {
275 const VERSIONS: VersionRange = VersionRange { min: 0, max: 0 };
276 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
277}
278
279#[non_exhaustive]
281#[derive(Debug, Clone, PartialEq)]
282pub struct TopicRequest {
283 pub name: super::TopicName,
287
288 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
290}
291
292impl TopicRequest {
293 pub fn with_name(mut self, value: super::TopicName) -> Self {
299 self.name = value;
300 self
301 }
302 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
304 self.unknown_tagged_fields = value;
305 self
306 }
307 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
309 self.unknown_tagged_fields.insert(key, value);
310 self
311 }
312}
313
314#[cfg(feature = "client")]
315impl Encodable for TopicRequest {
316 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
317 types::CompactString.encode(buf, &self.name)?;
318 let num_tagged_fields = self.unknown_tagged_fields.len();
319 if num_tagged_fields > std::u32::MAX as usize {
320 bail!(
321 "Too many tagged fields to encode ({} fields)",
322 num_tagged_fields
323 );
324 }
325 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
326
327 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
328 Ok(())
329 }
330 fn compute_size(&self, version: i16) -> Result<usize> {
331 let mut total_size = 0;
332 total_size += types::CompactString.compute_size(&self.name)?;
333 let num_tagged_fields = self.unknown_tagged_fields.len();
334 if num_tagged_fields > std::u32::MAX as usize {
335 bail!(
336 "Too many tagged fields to encode ({} fields)",
337 num_tagged_fields
338 );
339 }
340 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
341
342 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
343 Ok(total_size)
344 }
345}
346
347#[cfg(feature = "broker")]
348impl Decodable for TopicRequest {
349 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
350 let name = types::CompactString.decode(buf)?;
351 let mut unknown_tagged_fields = BTreeMap::new();
352 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
353 for _ in 0..num_tagged_fields {
354 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
355 let size: u32 = types::UnsignedVarInt.decode(buf)?;
356 let unknown_value = buf.try_get_bytes(size as usize)?;
357 unknown_tagged_fields.insert(tag as i32, unknown_value);
358 }
359 Ok(Self {
360 name,
361 unknown_tagged_fields,
362 })
363 }
364}
365
366impl Default for TopicRequest {
367 fn default() -> Self {
368 Self {
369 name: Default::default(),
370 unknown_tagged_fields: BTreeMap::new(),
371 }
372 }
373}
374
375impl Message for TopicRequest {
376 const VERSIONS: VersionRange = VersionRange { min: 0, max: 0 };
377 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
378}
379
380impl HeaderVersion for DescribeTopicPartitionsRequest {
381 fn header_version(version: i16) -> i16 {
382 2
383 }
384}