kafka_protocol/messages/
find_coordinator_request.rs1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct FindCoordinatorRequest {
24 pub key: StrBytes,
28
29 pub key_type: i8,
33
34 pub coordinator_keys: Vec<StrBytes>,
38
39 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
41}
42
43impl FindCoordinatorRequest {
44 pub fn with_key(mut self, value: StrBytes) -> Self {
50 self.key = value;
51 self
52 }
53 pub fn with_key_type(mut self, value: i8) -> Self {
59 self.key_type = value;
60 self
61 }
62 pub fn with_coordinator_keys(mut self, value: Vec<StrBytes>) -> Self {
68 self.coordinator_keys = value;
69 self
70 }
71 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
73 self.unknown_tagged_fields = value;
74 self
75 }
76 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
78 self.unknown_tagged_fields.insert(key, value);
79 self
80 }
81}
82
83#[cfg(feature = "client")]
84impl Encodable for FindCoordinatorRequest {
85 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
86 if version <= 3 {
87 if version >= 3 {
88 types::CompactString.encode(buf, &self.key)?;
89 } else {
90 types::String.encode(buf, &self.key)?;
91 }
92 } else {
93 if !self.key.is_empty() {
94 bail!("A field is set that is not available on the selected protocol version");
95 }
96 }
97 if version >= 1 {
98 types::Int8.encode(buf, &self.key_type)?;
99 } else {
100 if self.key_type != 0 {
101 bail!("A field is set that is not available on the selected protocol version");
102 }
103 }
104 if version >= 4 {
105 types::CompactArray(types::CompactString).encode(buf, &self.coordinator_keys)?;
106 } else {
107 if !self.coordinator_keys.is_empty() {
108 bail!("A field is set that is not available on the selected protocol version");
109 }
110 }
111 if version >= 3 {
112 let num_tagged_fields = self.unknown_tagged_fields.len();
113 if num_tagged_fields > std::u32::MAX as usize {
114 bail!(
115 "Too many tagged fields to encode ({} fields)",
116 num_tagged_fields
117 );
118 }
119 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
120
121 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
122 }
123 Ok(())
124 }
125 fn compute_size(&self, version: i16) -> Result<usize> {
126 let mut total_size = 0;
127 if version <= 3 {
128 if version >= 3 {
129 total_size += types::CompactString.compute_size(&self.key)?;
130 } else {
131 total_size += types::String.compute_size(&self.key)?;
132 }
133 } else {
134 if !self.key.is_empty() {
135 bail!("A field is set that is not available on the selected protocol version");
136 }
137 }
138 if version >= 1 {
139 total_size += types::Int8.compute_size(&self.key_type)?;
140 } else {
141 if self.key_type != 0 {
142 bail!("A field is set that is not available on the selected protocol version");
143 }
144 }
145 if version >= 4 {
146 total_size +=
147 types::CompactArray(types::CompactString).compute_size(&self.coordinator_keys)?;
148 } else {
149 if !self.coordinator_keys.is_empty() {
150 bail!("A field is set that is not available on the selected protocol version");
151 }
152 }
153 if version >= 3 {
154 let num_tagged_fields = self.unknown_tagged_fields.len();
155 if num_tagged_fields > std::u32::MAX as usize {
156 bail!(
157 "Too many tagged fields to encode ({} fields)",
158 num_tagged_fields
159 );
160 }
161 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
162
163 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
164 }
165 Ok(total_size)
166 }
167}
168
169#[cfg(feature = "broker")]
170impl Decodable for FindCoordinatorRequest {
171 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
172 let key = if version <= 3 {
173 if version >= 3 {
174 types::CompactString.decode(buf)?
175 } else {
176 types::String.decode(buf)?
177 }
178 } else {
179 Default::default()
180 };
181 let key_type = if version >= 1 {
182 types::Int8.decode(buf)?
183 } else {
184 0
185 };
186 let coordinator_keys = if version >= 4 {
187 types::CompactArray(types::CompactString).decode(buf)?
188 } else {
189 Default::default()
190 };
191 let mut unknown_tagged_fields = BTreeMap::new();
192 if version >= 3 {
193 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
194 for _ in 0..num_tagged_fields {
195 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
196 let size: u32 = types::UnsignedVarInt.decode(buf)?;
197 let unknown_value = buf.try_get_bytes(size as usize)?;
198 unknown_tagged_fields.insert(tag as i32, unknown_value);
199 }
200 }
201 Ok(Self {
202 key,
203 key_type,
204 coordinator_keys,
205 unknown_tagged_fields,
206 })
207 }
208}
209
210impl Default for FindCoordinatorRequest {
211 fn default() -> Self {
212 Self {
213 key: Default::default(),
214 key_type: 0,
215 coordinator_keys: Default::default(),
216 unknown_tagged_fields: BTreeMap::new(),
217 }
218 }
219}
220
221impl Message for FindCoordinatorRequest {
222 const VERSIONS: VersionRange = VersionRange { min: 0, max: 5 };
223 const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 0 });
224}
225
226impl HeaderVersion for FindCoordinatorRequest {
227 fn header_version(version: i16) -> i16 {
228 if version >= 3 {
229 2
230 } else {
231 1
232 }
233 }
234}