1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct Coordinator {
24 pub key: StrBytes,
28
29 pub node_id: super::BrokerId,
33
34 pub host: StrBytes,
38
39 pub port: i32,
43
44 pub error_code: i16,
48
49 pub error_message: Option<StrBytes>,
53
54 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
56}
57
58impl Coordinator {
59 pub fn with_key(mut self, value: StrBytes) -> Self {
65 self.key = value;
66 self
67 }
68 pub fn with_node_id(mut self, value: super::BrokerId) -> Self {
74 self.node_id = value;
75 self
76 }
77 pub fn with_host(mut self, value: StrBytes) -> Self {
83 self.host = value;
84 self
85 }
86 pub fn with_port(mut self, value: i32) -> Self {
92 self.port = value;
93 self
94 }
95 pub fn with_error_code(mut self, value: i16) -> Self {
101 self.error_code = value;
102 self
103 }
104 pub fn with_error_message(mut self, value: Option<StrBytes>) -> Self {
110 self.error_message = value;
111 self
112 }
113 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
115 self.unknown_tagged_fields = value;
116 self
117 }
118 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
120 self.unknown_tagged_fields.insert(key, value);
121 self
122 }
123}
124
125#[cfg(feature = "broker")]
126impl Encodable for Coordinator {
127 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
128 if version >= 4 {
129 types::CompactString.encode(buf, &self.key)?;
130 } else {
131 if !self.key.is_empty() {
132 bail!("A field is set that is not available on the selected protocol version");
133 }
134 }
135 if version >= 4 {
136 types::Int32.encode(buf, &self.node_id)?;
137 } else {
138 if self.node_id != 0 {
139 bail!("A field is set that is not available on the selected protocol version");
140 }
141 }
142 if version >= 4 {
143 types::CompactString.encode(buf, &self.host)?;
144 } else {
145 if !self.host.is_empty() {
146 bail!("A field is set that is not available on the selected protocol version");
147 }
148 }
149 if version >= 4 {
150 types::Int32.encode(buf, &self.port)?;
151 } else {
152 if self.port != 0 {
153 bail!("A field is set that is not available on the selected protocol version");
154 }
155 }
156 if version >= 4 {
157 types::Int16.encode(buf, &self.error_code)?;
158 } else {
159 if self.error_code != 0 {
160 bail!("A field is set that is not available on the selected protocol version");
161 }
162 }
163 if version >= 4 {
164 types::CompactString.encode(buf, &self.error_message)?;
165 }
166 if version >= 3 {
167 let num_tagged_fields = self.unknown_tagged_fields.len();
168 if num_tagged_fields > std::u32::MAX as usize {
169 bail!(
170 "Too many tagged fields to encode ({} fields)",
171 num_tagged_fields
172 );
173 }
174 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
175
176 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
177 }
178 Ok(())
179 }
180 fn compute_size(&self, version: i16) -> Result<usize> {
181 let mut total_size = 0;
182 if version >= 4 {
183 total_size += types::CompactString.compute_size(&self.key)?;
184 } else {
185 if !self.key.is_empty() {
186 bail!("A field is set that is not available on the selected protocol version");
187 }
188 }
189 if version >= 4 {
190 total_size += types::Int32.compute_size(&self.node_id)?;
191 } else {
192 if self.node_id != 0 {
193 bail!("A field is set that is not available on the selected protocol version");
194 }
195 }
196 if version >= 4 {
197 total_size += types::CompactString.compute_size(&self.host)?;
198 } else {
199 if !self.host.is_empty() {
200 bail!("A field is set that is not available on the selected protocol version");
201 }
202 }
203 if version >= 4 {
204 total_size += types::Int32.compute_size(&self.port)?;
205 } else {
206 if self.port != 0 {
207 bail!("A field is set that is not available on the selected protocol version");
208 }
209 }
210 if version >= 4 {
211 total_size += types::Int16.compute_size(&self.error_code)?;
212 } else {
213 if self.error_code != 0 {
214 bail!("A field is set that is not available on the selected protocol version");
215 }
216 }
217 if version >= 4 {
218 total_size += types::CompactString.compute_size(&self.error_message)?;
219 }
220 if version >= 3 {
221 let num_tagged_fields = self.unknown_tagged_fields.len();
222 if num_tagged_fields > std::u32::MAX as usize {
223 bail!(
224 "Too many tagged fields to encode ({} fields)",
225 num_tagged_fields
226 );
227 }
228 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
229
230 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
231 }
232 Ok(total_size)
233 }
234}
235
236#[cfg(feature = "client")]
237impl Decodable for Coordinator {
238 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
239 let key = if version >= 4 {
240 types::CompactString.decode(buf)?
241 } else {
242 Default::default()
243 };
244 let node_id = if version >= 4 {
245 types::Int32.decode(buf)?
246 } else {
247 (0).into()
248 };
249 let host = if version >= 4 {
250 types::CompactString.decode(buf)?
251 } else {
252 Default::default()
253 };
254 let port = if version >= 4 {
255 types::Int32.decode(buf)?
256 } else {
257 0
258 };
259 let error_code = if version >= 4 {
260 types::Int16.decode(buf)?
261 } else {
262 0
263 };
264 let error_message = if version >= 4 {
265 types::CompactString.decode(buf)?
266 } else {
267 Some(Default::default())
268 };
269 let mut unknown_tagged_fields = BTreeMap::new();
270 if version >= 3 {
271 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
272 for _ in 0..num_tagged_fields {
273 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
274 let size: u32 = types::UnsignedVarInt.decode(buf)?;
275 let unknown_value = buf.try_get_bytes(size as usize)?;
276 unknown_tagged_fields.insert(tag as i32, unknown_value);
277 }
278 }
279 Ok(Self {
280 key,
281 node_id,
282 host,
283 port,
284 error_code,
285 error_message,
286 unknown_tagged_fields,
287 })
288 }
289}
290
291impl Default for Coordinator {
292 fn default() -> Self {
293 Self {
294 key: Default::default(),
295 node_id: (0).into(),
296 host: Default::default(),
297 port: 0,
298 error_code: 0,
299 error_message: Some(Default::default()),
300 unknown_tagged_fields: BTreeMap::new(),
301 }
302 }
303}
304
305impl Message for Coordinator {
306 const VERSIONS: VersionRange = VersionRange { min: 0, max: 5 };
307 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
308}
309
310#[non_exhaustive]
312#[derive(Debug, Clone, PartialEq)]
313pub struct FindCoordinatorResponse {
314 pub throttle_time_ms: i32,
318
319 pub error_code: i16,
323
324 pub error_message: Option<StrBytes>,
328
329 pub node_id: super::BrokerId,
333
334 pub host: StrBytes,
338
339 pub port: i32,
343
344 pub coordinators: Vec<Coordinator>,
348
349 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
351}
352
353impl FindCoordinatorResponse {
354 pub fn with_throttle_time_ms(mut self, value: i32) -> Self {
360 self.throttle_time_ms = value;
361 self
362 }
363 pub fn with_error_code(mut self, value: i16) -> Self {
369 self.error_code = value;
370 self
371 }
372 pub fn with_error_message(mut self, value: Option<StrBytes>) -> Self {
378 self.error_message = value;
379 self
380 }
381 pub fn with_node_id(mut self, value: super::BrokerId) -> Self {
387 self.node_id = value;
388 self
389 }
390 pub fn with_host(mut self, value: StrBytes) -> Self {
396 self.host = value;
397 self
398 }
399 pub fn with_port(mut self, value: i32) -> Self {
405 self.port = value;
406 self
407 }
408 pub fn with_coordinators(mut self, value: Vec<Coordinator>) -> Self {
414 self.coordinators = value;
415 self
416 }
417 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
419 self.unknown_tagged_fields = value;
420 self
421 }
422 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
424 self.unknown_tagged_fields.insert(key, value);
425 self
426 }
427}
428
429#[cfg(feature = "broker")]
430impl Encodable for FindCoordinatorResponse {
431 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
432 if version >= 1 {
433 types::Int32.encode(buf, &self.throttle_time_ms)?;
434 }
435 if version <= 3 {
436 types::Int16.encode(buf, &self.error_code)?;
437 } else {
438 if self.error_code != 0 {
439 bail!("A field is set that is not available on the selected protocol version");
440 }
441 }
442 if version >= 1 && version <= 3 {
443 if version >= 3 {
444 types::CompactString.encode(buf, &self.error_message)?;
445 } else {
446 types::String.encode(buf, &self.error_message)?;
447 }
448 }
449 if version <= 3 {
450 types::Int32.encode(buf, &self.node_id)?;
451 } else {
452 if self.node_id != 0 {
453 bail!("A field is set that is not available on the selected protocol version");
454 }
455 }
456 if version <= 3 {
457 if version >= 3 {
458 types::CompactString.encode(buf, &self.host)?;
459 } else {
460 types::String.encode(buf, &self.host)?;
461 }
462 } else {
463 if !self.host.is_empty() {
464 bail!("A field is set that is not available on the selected protocol version");
465 }
466 }
467 if version <= 3 {
468 types::Int32.encode(buf, &self.port)?;
469 } else {
470 if self.port != 0 {
471 bail!("A field is set that is not available on the selected protocol version");
472 }
473 }
474 if version >= 4 {
475 types::CompactArray(types::Struct { version }).encode(buf, &self.coordinators)?;
476 } else {
477 if !self.coordinators.is_empty() {
478 bail!("A field is set that is not available on the selected protocol version");
479 }
480 }
481 if version >= 3 {
482 let num_tagged_fields = self.unknown_tagged_fields.len();
483 if num_tagged_fields > std::u32::MAX as usize {
484 bail!(
485 "Too many tagged fields to encode ({} fields)",
486 num_tagged_fields
487 );
488 }
489 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
490
491 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
492 }
493 Ok(())
494 }
495 fn compute_size(&self, version: i16) -> Result<usize> {
496 let mut total_size = 0;
497 if version >= 1 {
498 total_size += types::Int32.compute_size(&self.throttle_time_ms)?;
499 }
500 if version <= 3 {
501 total_size += types::Int16.compute_size(&self.error_code)?;
502 } else {
503 if self.error_code != 0 {
504 bail!("A field is set that is not available on the selected protocol version");
505 }
506 }
507 if version >= 1 && version <= 3 {
508 if version >= 3 {
509 total_size += types::CompactString.compute_size(&self.error_message)?;
510 } else {
511 total_size += types::String.compute_size(&self.error_message)?;
512 }
513 }
514 if version <= 3 {
515 total_size += types::Int32.compute_size(&self.node_id)?;
516 } else {
517 if self.node_id != 0 {
518 bail!("A field is set that is not available on the selected protocol version");
519 }
520 }
521 if version <= 3 {
522 if version >= 3 {
523 total_size += types::CompactString.compute_size(&self.host)?;
524 } else {
525 total_size += types::String.compute_size(&self.host)?;
526 }
527 } else {
528 if !self.host.is_empty() {
529 bail!("A field is set that is not available on the selected protocol version");
530 }
531 }
532 if version <= 3 {
533 total_size += types::Int32.compute_size(&self.port)?;
534 } else {
535 if self.port != 0 {
536 bail!("A field is set that is not available on the selected protocol version");
537 }
538 }
539 if version >= 4 {
540 total_size +=
541 types::CompactArray(types::Struct { version }).compute_size(&self.coordinators)?;
542 } else {
543 if !self.coordinators.is_empty() {
544 bail!("A field is set that is not available on the selected protocol version");
545 }
546 }
547 if version >= 3 {
548 let num_tagged_fields = self.unknown_tagged_fields.len();
549 if num_tagged_fields > std::u32::MAX as usize {
550 bail!(
551 "Too many tagged fields to encode ({} fields)",
552 num_tagged_fields
553 );
554 }
555 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
556
557 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
558 }
559 Ok(total_size)
560 }
561}
562
563#[cfg(feature = "client")]
564impl Decodable for FindCoordinatorResponse {
565 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
566 let throttle_time_ms = if version >= 1 {
567 types::Int32.decode(buf)?
568 } else {
569 0
570 };
571 let error_code = if version <= 3 {
572 types::Int16.decode(buf)?
573 } else {
574 0
575 };
576 let error_message = if version >= 1 && version <= 3 {
577 if version >= 3 {
578 types::CompactString.decode(buf)?
579 } else {
580 types::String.decode(buf)?
581 }
582 } else {
583 Some(Default::default())
584 };
585 let node_id = if version <= 3 {
586 types::Int32.decode(buf)?
587 } else {
588 (0).into()
589 };
590 let host = if version <= 3 {
591 if version >= 3 {
592 types::CompactString.decode(buf)?
593 } else {
594 types::String.decode(buf)?
595 }
596 } else {
597 Default::default()
598 };
599 let port = if version <= 3 {
600 types::Int32.decode(buf)?
601 } else {
602 0
603 };
604 let coordinators = if version >= 4 {
605 types::CompactArray(types::Struct { version }).decode(buf)?
606 } else {
607 Default::default()
608 };
609 let mut unknown_tagged_fields = BTreeMap::new();
610 if version >= 3 {
611 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
612 for _ in 0..num_tagged_fields {
613 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
614 let size: u32 = types::UnsignedVarInt.decode(buf)?;
615 let unknown_value = buf.try_get_bytes(size as usize)?;
616 unknown_tagged_fields.insert(tag as i32, unknown_value);
617 }
618 }
619 Ok(Self {
620 throttle_time_ms,
621 error_code,
622 error_message,
623 node_id,
624 host,
625 port,
626 coordinators,
627 unknown_tagged_fields,
628 })
629 }
630}
631
632impl Default for FindCoordinatorResponse {
633 fn default() -> Self {
634 Self {
635 throttle_time_ms: 0,
636 error_code: 0,
637 error_message: Some(Default::default()),
638 node_id: (0).into(),
639 host: Default::default(),
640 port: 0,
641 coordinators: Default::default(),
642 unknown_tagged_fields: BTreeMap::new(),
643 }
644 }
645}
646
647impl Message for FindCoordinatorResponse {
648 const VERSIONS: VersionRange = VersionRange { min: 0, max: 5 };
649 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
650}
651
652impl HeaderVersion for FindCoordinatorResponse {
653 fn header_version(version: i16) -> i16 {
654 if version >= 3 {
655 1
656 } else {
657 0
658 }
659 }
660}