kafka_protocol/messages/
consumer_protocol_subscription.rs1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct ConsumerProtocolSubscription {
24 pub topics: Vec<StrBytes>,
28
29 pub user_data: Option<Bytes>,
33
34 pub owned_partitions: Vec<TopicPartition>,
38
39 pub generation_id: i32,
43
44 pub rack_id: Option<StrBytes>,
48}
49
50impl ConsumerProtocolSubscription {
51 pub fn with_topics(mut self, value: Vec<StrBytes>) -> Self {
57 self.topics = value;
58 self
59 }
60 pub fn with_user_data(mut self, value: Option<Bytes>) -> Self {
66 self.user_data = value;
67 self
68 }
69 pub fn with_owned_partitions(mut self, value: Vec<TopicPartition>) -> Self {
75 self.owned_partitions = value;
76 self
77 }
78 pub fn with_generation_id(mut self, value: i32) -> Self {
84 self.generation_id = value;
85 self
86 }
87 pub fn with_rack_id(mut self, value: Option<StrBytes>) -> Self {
93 self.rack_id = value;
94 self
95 }
96}
97
98impl Encodable for ConsumerProtocolSubscription {
99 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
100 types::Array(types::String).encode(buf, &self.topics)?;
101 types::Bytes.encode(buf, &self.user_data)?;
102 if version >= 1 {
103 types::Array(types::Struct { version }).encode(buf, &self.owned_partitions)?;
104 }
105 if version >= 2 {
106 types::Int32.encode(buf, &self.generation_id)?;
107 }
108 if version >= 3 {
109 types::String.encode(buf, &self.rack_id)?;
110 }
111
112 Ok(())
113 }
114 fn compute_size(&self, version: i16) -> Result<usize> {
115 let mut total_size = 0;
116 total_size += types::Array(types::String).compute_size(&self.topics)?;
117 total_size += types::Bytes.compute_size(&self.user_data)?;
118 if version >= 1 {
119 total_size +=
120 types::Array(types::Struct { version }).compute_size(&self.owned_partitions)?;
121 }
122 if version >= 2 {
123 total_size += types::Int32.compute_size(&self.generation_id)?;
124 }
125 if version >= 3 {
126 total_size += types::String.compute_size(&self.rack_id)?;
127 }
128
129 Ok(total_size)
130 }
131}
132
133impl Decodable for ConsumerProtocolSubscription {
134 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
135 let topics = types::Array(types::String).decode(buf)?;
136 let user_data = types::Bytes.decode(buf)?;
137 let owned_partitions = if version >= 1 {
138 types::Array(types::Struct { version }).decode(buf)?
139 } else {
140 Default::default()
141 };
142 let generation_id = if version >= 2 {
143 types::Int32.decode(buf)?
144 } else {
145 -1
146 };
147 let rack_id = if version >= 3 {
148 types::String.decode(buf)?
149 } else {
150 None
151 };
152 Ok(Self {
153 topics,
154 user_data,
155 owned_partitions,
156 generation_id,
157 rack_id,
158 })
159 }
160}
161
162impl Default for ConsumerProtocolSubscription {
163 fn default() -> Self {
164 Self {
165 topics: Default::default(),
166 user_data: None,
167 owned_partitions: Default::default(),
168 generation_id: -1,
169 rack_id: None,
170 }
171 }
172}
173
174impl Message for ConsumerProtocolSubscription {
175 const VERSIONS: VersionRange = VersionRange { min: 0, max: 3 };
176 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
177}
178
179#[non_exhaustive]
181#[derive(Debug, Clone, PartialEq)]
182pub struct TopicPartition {
183 pub topic: super::TopicName,
187
188 pub partitions: Vec<i32>,
192}
193
194impl TopicPartition {
195 pub fn with_topic(mut self, value: super::TopicName) -> Self {
201 self.topic = value;
202 self
203 }
204 pub fn with_partitions(mut self, value: Vec<i32>) -> Self {
210 self.partitions = value;
211 self
212 }
213}
214
215impl Encodable for TopicPartition {
216 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
217 if version >= 1 {
218 types::String.encode(buf, &self.topic)?;
219 } else {
220 if !self.topic.is_empty() {
221 bail!("A field is set that is not available on the selected protocol version");
222 }
223 }
224 if version >= 1 {
225 types::Array(types::Int32).encode(buf, &self.partitions)?;
226 } else {
227 if !self.partitions.is_empty() {
228 bail!("A field is set that is not available on the selected protocol version");
229 }
230 }
231
232 Ok(())
233 }
234 fn compute_size(&self, version: i16) -> Result<usize> {
235 let mut total_size = 0;
236 if version >= 1 {
237 total_size += types::String.compute_size(&self.topic)?;
238 } else {
239 if !self.topic.is_empty() {
240 bail!("A field is set that is not available on the selected protocol version");
241 }
242 }
243 if version >= 1 {
244 total_size += types::Array(types::Int32).compute_size(&self.partitions)?;
245 } else {
246 if !self.partitions.is_empty() {
247 bail!("A field is set that is not available on the selected protocol version");
248 }
249 }
250
251 Ok(total_size)
252 }
253}
254
255impl Decodable for TopicPartition {
256 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
257 let topic = if version >= 1 {
258 types::String.decode(buf)?
259 } else {
260 Default::default()
261 };
262 let partitions = if version >= 1 {
263 types::Array(types::Int32).decode(buf)?
264 } else {
265 Default::default()
266 };
267 Ok(Self { topic, partitions })
268 }
269}
270
271impl Default for TopicPartition {
272 fn default() -> Self {
273 Self {
274 topic: Default::default(),
275 partitions: Default::default(),
276 }
277 }
278}
279
280impl Message for TopicPartition {
281 const VERSIONS: VersionRange = VersionRange { min: 0, max: 3 };
282 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
283}