kafka_protocol/messages/
consumer_protocol_subscription.rs1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct ConsumerProtocolSubscription {
24 pub topics: Vec<StrBytes>,
28
29 pub user_data: Option<Bytes>,
33
34 pub owned_partitions: Vec<TopicPartition>,
38
39 pub generation_id: i32,
43
44 pub rack_id: Option<StrBytes>,
48}
49
50impl ConsumerProtocolSubscription {
51 pub fn with_topics(mut self, value: Vec<StrBytes>) -> Self {
57 self.topics = value;
58 self
59 }
60 pub fn with_user_data(mut self, value: Option<Bytes>) -> Self {
66 self.user_data = value;
67 self
68 }
69 pub fn with_owned_partitions(mut self, value: Vec<TopicPartition>) -> Self {
75 self.owned_partitions = value;
76 self
77 }
78 pub fn with_generation_id(mut self, value: i32) -> Self {
84 self.generation_id = value;
85 self
86 }
87 pub fn with_rack_id(mut self, value: Option<StrBytes>) -> Self {
93 self.rack_id = value;
94 self
95 }
96}
97
98impl Encodable for ConsumerProtocolSubscription {
99 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
100 if version < 0 || version > 3 {
101 bail!("specified version not supported by this message type");
102 }
103 types::Array(types::String).encode(buf, &self.topics)?;
104 types::Bytes.encode(buf, &self.user_data)?;
105 if version >= 1 {
106 types::Array(types::Struct { version }).encode(buf, &self.owned_partitions)?;
107 }
108 if version >= 2 {
109 types::Int32.encode(buf, &self.generation_id)?;
110 }
111 if version >= 3 {
112 types::String.encode(buf, &self.rack_id)?;
113 }
114
115 Ok(())
116 }
117 fn compute_size(&self, version: i16) -> Result<usize> {
118 let mut total_size = 0;
119 total_size += types::Array(types::String).compute_size(&self.topics)?;
120 total_size += types::Bytes.compute_size(&self.user_data)?;
121 if version >= 1 {
122 total_size +=
123 types::Array(types::Struct { version }).compute_size(&self.owned_partitions)?;
124 }
125 if version >= 2 {
126 total_size += types::Int32.compute_size(&self.generation_id)?;
127 }
128 if version >= 3 {
129 total_size += types::String.compute_size(&self.rack_id)?;
130 }
131
132 Ok(total_size)
133 }
134}
135
136impl Decodable for ConsumerProtocolSubscription {
137 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
138 if version < 0 || version > 3 {
139 bail!("specified version not supported by this message type");
140 }
141 let topics = types::Array(types::String).decode(buf)?;
142 let user_data = types::Bytes.decode(buf)?;
143 let owned_partitions = if version >= 1 {
144 types::Array(types::Struct { version }).decode(buf)?
145 } else {
146 Default::default()
147 };
148 let generation_id = if version >= 2 {
149 types::Int32.decode(buf)?
150 } else {
151 -1
152 };
153 let rack_id = if version >= 3 {
154 types::String.decode(buf)?
155 } else {
156 None
157 };
158 Ok(Self {
159 topics,
160 user_data,
161 owned_partitions,
162 generation_id,
163 rack_id,
164 })
165 }
166}
167
168impl Default for ConsumerProtocolSubscription {
169 fn default() -> Self {
170 Self {
171 topics: Default::default(),
172 user_data: None,
173 owned_partitions: Default::default(),
174 generation_id: -1,
175 rack_id: None,
176 }
177 }
178}
179
180impl Message for ConsumerProtocolSubscription {
181 const VERSIONS: VersionRange = VersionRange { min: 0, max: 3 };
182 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
183}
184
185#[non_exhaustive]
187#[derive(Debug, Clone, PartialEq)]
188pub struct TopicPartition {
189 pub topic: super::TopicName,
193
194 pub partitions: Vec<i32>,
198}
199
200impl TopicPartition {
201 pub fn with_topic(mut self, value: super::TopicName) -> Self {
207 self.topic = value;
208 self
209 }
210 pub fn with_partitions(mut self, value: Vec<i32>) -> Self {
216 self.partitions = value;
217 self
218 }
219}
220
221impl Encodable for TopicPartition {
222 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
223 if version < 0 || version > 3 {
224 bail!("specified version not supported by this message type");
225 }
226 if version >= 1 {
227 types::String.encode(buf, &self.topic)?;
228 } else {
229 if !self.topic.is_empty() {
230 bail!("A field is set that is not available on the selected protocol version");
231 }
232 }
233 if version >= 1 {
234 types::Array(types::Int32).encode(buf, &self.partitions)?;
235 } else {
236 if !self.partitions.is_empty() {
237 bail!("A field is set that is not available on the selected protocol version");
238 }
239 }
240
241 Ok(())
242 }
243 fn compute_size(&self, version: i16) -> Result<usize> {
244 let mut total_size = 0;
245 if version >= 1 {
246 total_size += types::String.compute_size(&self.topic)?;
247 } else {
248 if !self.topic.is_empty() {
249 bail!("A field is set that is not available on the selected protocol version");
250 }
251 }
252 if version >= 1 {
253 total_size += types::Array(types::Int32).compute_size(&self.partitions)?;
254 } else {
255 if !self.partitions.is_empty() {
256 bail!("A field is set that is not available on the selected protocol version");
257 }
258 }
259
260 Ok(total_size)
261 }
262}
263
264impl Decodable for TopicPartition {
265 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
266 if version < 0 || version > 3 {
267 bail!("specified version not supported by this message type");
268 }
269 let topic = if version >= 1 {
270 types::String.decode(buf)?
271 } else {
272 Default::default()
273 };
274 let partitions = if version >= 1 {
275 types::Array(types::Int32).decode(buf)?
276 } else {
277 Default::default()
278 };
279 Ok(Self { topic, partitions })
280 }
281}
282
283impl Default for TopicPartition {
284 fn default() -> Self {
285 Self {
286 topic: Default::default(),
287 partitions: Default::default(),
288 }
289 }
290}
291
292impl Message for TopicPartition {
293 const VERSIONS: VersionRange = VersionRange { min: 0, max: 3 };
294 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
295}