1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct FeatureUpdateKey {
24 pub feature: StrBytes,
28
29 pub max_version_level: i16,
33
34 pub allow_downgrade: bool,
38
39 pub upgrade_type: i8,
43
44 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
46}
47
48impl FeatureUpdateKey {
49 pub fn with_feature(mut self, value: StrBytes) -> Self {
55 self.feature = value;
56 self
57 }
58 pub fn with_max_version_level(mut self, value: i16) -> Self {
64 self.max_version_level = value;
65 self
66 }
67 pub fn with_allow_downgrade(mut self, value: bool) -> Self {
73 self.allow_downgrade = value;
74 self
75 }
76 pub fn with_upgrade_type(mut self, value: i8) -> Self {
82 self.upgrade_type = value;
83 self
84 }
85 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
87 self.unknown_tagged_fields = value;
88 self
89 }
90 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
92 self.unknown_tagged_fields.insert(key, value);
93 self
94 }
95}
96
97#[cfg(feature = "client")]
98impl Encodable for FeatureUpdateKey {
99 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
100 if version < 0 || version > 1 {
101 bail!("specified version not supported by this message type");
102 }
103 types::CompactString.encode(buf, &self.feature)?;
104 types::Int16.encode(buf, &self.max_version_level)?;
105 if version == 0 {
106 types::Boolean.encode(buf, &self.allow_downgrade)?;
107 } else {
108 if self.allow_downgrade {
109 bail!("A field is set that is not available on the selected protocol version");
110 }
111 }
112 if version >= 1 {
113 types::Int8.encode(buf, &self.upgrade_type)?;
114 } else {
115 if self.upgrade_type != 1 {
116 bail!("A field is set that is not available on the selected protocol version");
117 }
118 }
119 let num_tagged_fields = self.unknown_tagged_fields.len();
120 if num_tagged_fields > std::u32::MAX as usize {
121 bail!(
122 "Too many tagged fields to encode ({} fields)",
123 num_tagged_fields
124 );
125 }
126 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
127
128 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
129 Ok(())
130 }
131 fn compute_size(&self, version: i16) -> Result<usize> {
132 let mut total_size = 0;
133 total_size += types::CompactString.compute_size(&self.feature)?;
134 total_size += types::Int16.compute_size(&self.max_version_level)?;
135 if version == 0 {
136 total_size += types::Boolean.compute_size(&self.allow_downgrade)?;
137 } else {
138 if self.allow_downgrade {
139 bail!("A field is set that is not available on the selected protocol version");
140 }
141 }
142 if version >= 1 {
143 total_size += types::Int8.compute_size(&self.upgrade_type)?;
144 } else {
145 if self.upgrade_type != 1 {
146 bail!("A field is set that is not available on the selected protocol version");
147 }
148 }
149 let num_tagged_fields = self.unknown_tagged_fields.len();
150 if num_tagged_fields > std::u32::MAX as usize {
151 bail!(
152 "Too many tagged fields to encode ({} fields)",
153 num_tagged_fields
154 );
155 }
156 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
157
158 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
159 Ok(total_size)
160 }
161}
162
163#[cfg(feature = "broker")]
164impl Decodable for FeatureUpdateKey {
165 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
166 if version < 0 || version > 1 {
167 bail!("specified version not supported by this message type");
168 }
169 let feature = types::CompactString.decode(buf)?;
170 let max_version_level = types::Int16.decode(buf)?;
171 let allow_downgrade = if version == 0 {
172 types::Boolean.decode(buf)?
173 } else {
174 false
175 };
176 let upgrade_type = if version >= 1 {
177 types::Int8.decode(buf)?
178 } else {
179 1
180 };
181 let mut unknown_tagged_fields = BTreeMap::new();
182 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
183 for _ in 0..num_tagged_fields {
184 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
185 let size: u32 = types::UnsignedVarInt.decode(buf)?;
186 let unknown_value = buf.try_get_bytes(size as usize)?;
187 unknown_tagged_fields.insert(tag as i32, unknown_value);
188 }
189 Ok(Self {
190 feature,
191 max_version_level,
192 allow_downgrade,
193 upgrade_type,
194 unknown_tagged_fields,
195 })
196 }
197}
198
199impl Default for FeatureUpdateKey {
200 fn default() -> Self {
201 Self {
202 feature: Default::default(),
203 max_version_level: 0,
204 allow_downgrade: false,
205 upgrade_type: 1,
206 unknown_tagged_fields: BTreeMap::new(),
207 }
208 }
209}
210
211impl Message for FeatureUpdateKey {
212 const VERSIONS: VersionRange = VersionRange { min: 0, max: 1 };
213 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
214}
215
216#[non_exhaustive]
218#[derive(Debug, Clone, PartialEq)]
219pub struct UpdateFeaturesRequest {
220 pub timeout_ms: i32,
224
225 pub feature_updates: Vec<FeatureUpdateKey>,
229
230 pub validate_only: bool,
234
235 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
237}
238
239impl UpdateFeaturesRequest {
240 pub fn with_timeout_ms(mut self, value: i32) -> Self {
246 self.timeout_ms = value;
247 self
248 }
249 pub fn with_feature_updates(mut self, value: Vec<FeatureUpdateKey>) -> Self {
255 self.feature_updates = value;
256 self
257 }
258 pub fn with_validate_only(mut self, value: bool) -> Self {
264 self.validate_only = value;
265 self
266 }
267 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
269 self.unknown_tagged_fields = value;
270 self
271 }
272 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
274 self.unknown_tagged_fields.insert(key, value);
275 self
276 }
277}
278
279#[cfg(feature = "client")]
280impl Encodable for UpdateFeaturesRequest {
281 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
282 if version < 0 || version > 1 {
283 bail!("specified version not supported by this message type");
284 }
285 types::Int32.encode(buf, &self.timeout_ms)?;
286 types::CompactArray(types::Struct { version }).encode(buf, &self.feature_updates)?;
287 if version >= 1 {
288 types::Boolean.encode(buf, &self.validate_only)?;
289 } else {
290 if self.validate_only {
291 bail!("A field is set that is not available on the selected protocol version");
292 }
293 }
294 let num_tagged_fields = self.unknown_tagged_fields.len();
295 if num_tagged_fields > std::u32::MAX as usize {
296 bail!(
297 "Too many tagged fields to encode ({} fields)",
298 num_tagged_fields
299 );
300 }
301 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
302
303 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
304 Ok(())
305 }
306 fn compute_size(&self, version: i16) -> Result<usize> {
307 let mut total_size = 0;
308 total_size += types::Int32.compute_size(&self.timeout_ms)?;
309 total_size +=
310 types::CompactArray(types::Struct { version }).compute_size(&self.feature_updates)?;
311 if version >= 1 {
312 total_size += types::Boolean.compute_size(&self.validate_only)?;
313 } else {
314 if self.validate_only {
315 bail!("A field is set that is not available on the selected protocol version");
316 }
317 }
318 let num_tagged_fields = self.unknown_tagged_fields.len();
319 if num_tagged_fields > std::u32::MAX as usize {
320 bail!(
321 "Too many tagged fields to encode ({} fields)",
322 num_tagged_fields
323 );
324 }
325 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
326
327 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
328 Ok(total_size)
329 }
330}
331
332#[cfg(feature = "broker")]
333impl Decodable for UpdateFeaturesRequest {
334 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
335 if version < 0 || version > 1 {
336 bail!("specified version not supported by this message type");
337 }
338 let timeout_ms = types::Int32.decode(buf)?;
339 let feature_updates = types::CompactArray(types::Struct { version }).decode(buf)?;
340 let validate_only = if version >= 1 {
341 types::Boolean.decode(buf)?
342 } else {
343 false
344 };
345 let mut unknown_tagged_fields = BTreeMap::new();
346 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
347 for _ in 0..num_tagged_fields {
348 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
349 let size: u32 = types::UnsignedVarInt.decode(buf)?;
350 let unknown_value = buf.try_get_bytes(size as usize)?;
351 unknown_tagged_fields.insert(tag as i32, unknown_value);
352 }
353 Ok(Self {
354 timeout_ms,
355 feature_updates,
356 validate_only,
357 unknown_tagged_fields,
358 })
359 }
360}
361
362impl Default for UpdateFeaturesRequest {
363 fn default() -> Self {
364 Self {
365 timeout_ms: 60000,
366 feature_updates: Default::default(),
367 validate_only: false,
368 unknown_tagged_fields: BTreeMap::new(),
369 }
370 }
371}
372
373impl Message for UpdateFeaturesRequest {
374 const VERSIONS: VersionRange = VersionRange { min: 0, max: 1 };
375 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
376}
377
378impl HeaderVersion for UpdateFeaturesRequest {
379 fn header_version(version: i16) -> i16 {
380 2
381 }
382}