1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct FeatureUpdateKey {
24 pub feature: StrBytes,
28
29 pub max_version_level: i16,
33
34 pub allow_downgrade: bool,
38
39 pub upgrade_type: i8,
43
44 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
46}
47
48impl FeatureUpdateKey {
49 pub fn with_feature(mut self, value: StrBytes) -> Self {
55 self.feature = value;
56 self
57 }
58 pub fn with_max_version_level(mut self, value: i16) -> Self {
64 self.max_version_level = value;
65 self
66 }
67 pub fn with_allow_downgrade(mut self, value: bool) -> Self {
73 self.allow_downgrade = value;
74 self
75 }
76 pub fn with_upgrade_type(mut self, value: i8) -> Self {
82 self.upgrade_type = value;
83 self
84 }
85 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
87 self.unknown_tagged_fields = value;
88 self
89 }
90 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
92 self.unknown_tagged_fields.insert(key, value);
93 self
94 }
95}
96
97#[cfg(feature = "client")]
98impl Encodable for FeatureUpdateKey {
99 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
100 types::CompactString.encode(buf, &self.feature)?;
101 types::Int16.encode(buf, &self.max_version_level)?;
102 if version == 0 {
103 types::Boolean.encode(buf, &self.allow_downgrade)?;
104 } else {
105 if self.allow_downgrade {
106 bail!("A field is set that is not available on the selected protocol version");
107 }
108 }
109 if version >= 1 {
110 types::Int8.encode(buf, &self.upgrade_type)?;
111 } else {
112 if self.upgrade_type != 1 {
113 bail!("A field is set that is not available on the selected protocol version");
114 }
115 }
116 let num_tagged_fields = self.unknown_tagged_fields.len();
117 if num_tagged_fields > std::u32::MAX as usize {
118 bail!(
119 "Too many tagged fields to encode ({} fields)",
120 num_tagged_fields
121 );
122 }
123 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
124
125 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
126 Ok(())
127 }
128 fn compute_size(&self, version: i16) -> Result<usize> {
129 let mut total_size = 0;
130 total_size += types::CompactString.compute_size(&self.feature)?;
131 total_size += types::Int16.compute_size(&self.max_version_level)?;
132 if version == 0 {
133 total_size += types::Boolean.compute_size(&self.allow_downgrade)?;
134 } else {
135 if self.allow_downgrade {
136 bail!("A field is set that is not available on the selected protocol version");
137 }
138 }
139 if version >= 1 {
140 total_size += types::Int8.compute_size(&self.upgrade_type)?;
141 } else {
142 if self.upgrade_type != 1 {
143 bail!("A field is set that is not available on the selected protocol version");
144 }
145 }
146 let num_tagged_fields = self.unknown_tagged_fields.len();
147 if num_tagged_fields > std::u32::MAX as usize {
148 bail!(
149 "Too many tagged fields to encode ({} fields)",
150 num_tagged_fields
151 );
152 }
153 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
154
155 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
156 Ok(total_size)
157 }
158}
159
160#[cfg(feature = "broker")]
161impl Decodable for FeatureUpdateKey {
162 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
163 let feature = types::CompactString.decode(buf)?;
164 let max_version_level = types::Int16.decode(buf)?;
165 let allow_downgrade = if version == 0 {
166 types::Boolean.decode(buf)?
167 } else {
168 false
169 };
170 let upgrade_type = if version >= 1 {
171 types::Int8.decode(buf)?
172 } else {
173 1
174 };
175 let mut unknown_tagged_fields = BTreeMap::new();
176 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
177 for _ in 0..num_tagged_fields {
178 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
179 let size: u32 = types::UnsignedVarInt.decode(buf)?;
180 let unknown_value = buf.try_get_bytes(size as usize)?;
181 unknown_tagged_fields.insert(tag as i32, unknown_value);
182 }
183 Ok(Self {
184 feature,
185 max_version_level,
186 allow_downgrade,
187 upgrade_type,
188 unknown_tagged_fields,
189 })
190 }
191}
192
193impl Default for FeatureUpdateKey {
194 fn default() -> Self {
195 Self {
196 feature: Default::default(),
197 max_version_level: 0,
198 allow_downgrade: false,
199 upgrade_type: 1,
200 unknown_tagged_fields: BTreeMap::new(),
201 }
202 }
203}
204
205impl Message for FeatureUpdateKey {
206 const VERSIONS: VersionRange = VersionRange { min: 0, max: 1 };
207 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
208}
209
210#[non_exhaustive]
212#[derive(Debug, Clone, PartialEq)]
213pub struct UpdateFeaturesRequest {
214 pub timeout_ms: i32,
218
219 pub feature_updates: Vec<FeatureUpdateKey>,
223
224 pub validate_only: bool,
228
229 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
231}
232
233impl UpdateFeaturesRequest {
234 pub fn with_timeout_ms(mut self, value: i32) -> Self {
240 self.timeout_ms = value;
241 self
242 }
243 pub fn with_feature_updates(mut self, value: Vec<FeatureUpdateKey>) -> Self {
249 self.feature_updates = value;
250 self
251 }
252 pub fn with_validate_only(mut self, value: bool) -> Self {
258 self.validate_only = value;
259 self
260 }
261 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
263 self.unknown_tagged_fields = value;
264 self
265 }
266 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
268 self.unknown_tagged_fields.insert(key, value);
269 self
270 }
271}
272
273#[cfg(feature = "client")]
274impl Encodable for UpdateFeaturesRequest {
275 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
276 types::Int32.encode(buf, &self.timeout_ms)?;
277 types::CompactArray(types::Struct { version }).encode(buf, &self.feature_updates)?;
278 if version >= 1 {
279 types::Boolean.encode(buf, &self.validate_only)?;
280 } else {
281 if self.validate_only {
282 bail!("A field is set that is not available on the selected protocol version");
283 }
284 }
285 let num_tagged_fields = self.unknown_tagged_fields.len();
286 if num_tagged_fields > std::u32::MAX as usize {
287 bail!(
288 "Too many tagged fields to encode ({} fields)",
289 num_tagged_fields
290 );
291 }
292 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
293
294 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
295 Ok(())
296 }
297 fn compute_size(&self, version: i16) -> Result<usize> {
298 let mut total_size = 0;
299 total_size += types::Int32.compute_size(&self.timeout_ms)?;
300 total_size +=
301 types::CompactArray(types::Struct { version }).compute_size(&self.feature_updates)?;
302 if version >= 1 {
303 total_size += types::Boolean.compute_size(&self.validate_only)?;
304 } else {
305 if self.validate_only {
306 bail!("A field is set that is not available on the selected protocol version");
307 }
308 }
309 let num_tagged_fields = self.unknown_tagged_fields.len();
310 if num_tagged_fields > std::u32::MAX as usize {
311 bail!(
312 "Too many tagged fields to encode ({} fields)",
313 num_tagged_fields
314 );
315 }
316 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
317
318 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
319 Ok(total_size)
320 }
321}
322
323#[cfg(feature = "broker")]
324impl Decodable for UpdateFeaturesRequest {
325 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
326 let timeout_ms = types::Int32.decode(buf)?;
327 let feature_updates = types::CompactArray(types::Struct { version }).decode(buf)?;
328 let validate_only = if version >= 1 {
329 types::Boolean.decode(buf)?
330 } else {
331 false
332 };
333 let mut unknown_tagged_fields = BTreeMap::new();
334 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
335 for _ in 0..num_tagged_fields {
336 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
337 let size: u32 = types::UnsignedVarInt.decode(buf)?;
338 let unknown_value = buf.try_get_bytes(size as usize)?;
339 unknown_tagged_fields.insert(tag as i32, unknown_value);
340 }
341 Ok(Self {
342 timeout_ms,
343 feature_updates,
344 validate_only,
345 unknown_tagged_fields,
346 })
347 }
348}
349
350impl Default for UpdateFeaturesRequest {
351 fn default() -> Self {
352 Self {
353 timeout_ms: 60000,
354 feature_updates: Default::default(),
355 validate_only: false,
356 unknown_tagged_fields: BTreeMap::new(),
357 }
358 }
359}
360
361impl Message for UpdateFeaturesRequest {
362 const VERSIONS: VersionRange = VersionRange { min: 0, max: 1 };
363 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
364}
365
366impl HeaderVersion for UpdateFeaturesRequest {
367 fn header_version(version: i16) -> i16 {
368 2
369 }
370}