1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct Coordinator {
24 pub key: StrBytes,
28
29 pub node_id: super::BrokerId,
33
34 pub host: StrBytes,
38
39 pub port: i32,
43
44 pub error_code: i16,
48
49 pub error_message: Option<StrBytes>,
53
54 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
56}
57
58impl Coordinator {
59 pub fn with_key(mut self, value: StrBytes) -> Self {
65 self.key = value;
66 self
67 }
68 pub fn with_node_id(mut self, value: super::BrokerId) -> Self {
74 self.node_id = value;
75 self
76 }
77 pub fn with_host(mut self, value: StrBytes) -> Self {
83 self.host = value;
84 self
85 }
86 pub fn with_port(mut self, value: i32) -> Self {
92 self.port = value;
93 self
94 }
95 pub fn with_error_code(mut self, value: i16) -> Self {
101 self.error_code = value;
102 self
103 }
104 pub fn with_error_message(mut self, value: Option<StrBytes>) -> Self {
110 self.error_message = value;
111 self
112 }
113 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
115 self.unknown_tagged_fields = value;
116 self
117 }
118 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
120 self.unknown_tagged_fields.insert(key, value);
121 self
122 }
123}
124
#[cfg(feature = "broker")]
impl Encodable for Coordinator {
    /// Writes this entry to `buf` using the layout of `version`.
    ///
    /// # Errors
    /// Fails when `version` is outside `0..=6`, when `version < 4` and any of
    /// `key`, `node_id`, `host`, `port` or `error_code` is non-default, when
    /// there are more tagged fields than fit in a `u32`, or when an
    /// underlying field encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=6).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        if version >= 4 {
            // All named fields share the same version gate; emit them in wire order.
            types::CompactString.encode(buf, &self.key)?;
            types::Int32.encode(buf, &self.node_id)?;
            types::CompactString.encode(buf, &self.host)?;
            types::Int32.encode(buf, &self.port)?;
            types::Int16.encode(buf, &self.error_code)?;
            types::CompactString.encode(buf, &self.error_message)?;
        } else if !self.key.is_empty()
            || self.node_id != 0
            || !self.host.is_empty()
            || self.port != 0
            || self.error_code != 0
        {
            // `error_message` is deliberately not part of this check.
            bail!("A field is set that is not available on the selected protocol version");
        }

        if version >= 3 {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }

        Ok(())
    }

    /// Computes how many bytes `encode` would produce for `version`.
    ///
    /// Unlike `encode`, this does not reject versions outside `0..=6`; the
    /// remaining error conditions are the same.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut size = 0;

        if version >= 4 {
            size += types::CompactString.compute_size(&self.key)?;
            size += types::Int32.compute_size(&self.node_id)?;
            size += types::CompactString.compute_size(&self.host)?;
            size += types::Int32.compute_size(&self.port)?;
            size += types::Int16.compute_size(&self.error_code)?;
            size += types::CompactString.compute_size(&self.error_message)?;
        } else if !self.key.is_empty()
            || self.node_id != 0
            || !self.host.is_empty()
            || self.port != 0
            || self.error_code != 0
        {
            bail!("A field is set that is not available on the selected protocol version");
        }

        if version >= 3 {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }

        Ok(size)
    }
}
238
#[cfg(feature = "client")]
impl Decodable for Coordinator {
    /// Reads one entry from `buf` using the layout of `version`.
    ///
    /// For versions below 4 no named field is read and each takes its
    /// default (`error_message` becomes `Some` of an empty string).
    ///
    /// # Errors
    /// Fails when `version` is outside `0..=6` or when any underlying read
    /// from `buf` fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=6).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        // Field initialisers run top to bottom, which is the wire order.
        let mut entry = if version >= 4 {
            Self {
                key: types::CompactString.decode(buf)?,
                node_id: types::Int32.decode(buf)?,
                host: types::CompactString.decode(buf)?,
                port: types::Int32.decode(buf)?,
                error_code: types::Int16.decode(buf)?,
                error_message: types::CompactString.decode(buf)?,
                unknown_tagged_fields: BTreeMap::new(),
            }
        } else {
            Self {
                key: Default::default(),
                node_id: (0).into(),
                host: Default::default(),
                port: 0,
                error_code: 0,
                error_message: Some(Default::default()),
                unknown_tagged_fields: BTreeMap::new(),
            }
        };

        if version >= 3 {
            let field_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..field_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                entry.unknown_tagged_fields.insert(tag as i32, payload);
            }
        }

        Ok(entry)
    }
}
296
297impl Default for Coordinator {
298 fn default() -> Self {
299 Self {
300 key: Default::default(),
301 node_id: (0).into(),
302 host: Default::default(),
303 port: 0,
304 error_code: 0,
305 error_message: Some(Default::default()),
306 unknown_tagged_fields: BTreeMap::new(),
307 }
308 }
309}
310
311impl Message for Coordinator {
312 const VERSIONS: VersionRange = VersionRange { min: 0, max: 6 };
313 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
314}
315
316#[non_exhaustive]
318#[derive(Debug, Clone, PartialEq)]
319pub struct FindCoordinatorResponse {
320 pub throttle_time_ms: i32,
324
325 pub error_code: i16,
329
330 pub error_message: Option<StrBytes>,
334
335 pub node_id: super::BrokerId,
339
340 pub host: StrBytes,
344
345 pub port: i32,
349
350 pub coordinators: Vec<Coordinator>,
354
355 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
357}
358
359impl FindCoordinatorResponse {
360 pub fn with_throttle_time_ms(mut self, value: i32) -> Self {
366 self.throttle_time_ms = value;
367 self
368 }
369 pub fn with_error_code(mut self, value: i16) -> Self {
375 self.error_code = value;
376 self
377 }
378 pub fn with_error_message(mut self, value: Option<StrBytes>) -> Self {
384 self.error_message = value;
385 self
386 }
387 pub fn with_node_id(mut self, value: super::BrokerId) -> Self {
393 self.node_id = value;
394 self
395 }
396 pub fn with_host(mut self, value: StrBytes) -> Self {
402 self.host = value;
403 self
404 }
405 pub fn with_port(mut self, value: i32) -> Self {
411 self.port = value;
412 self
413 }
414 pub fn with_coordinators(mut self, value: Vec<Coordinator>) -> Self {
420 self.coordinators = value;
421 self
422 }
423 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
425 self.unknown_tagged_fields = value;
426 self
427 }
428 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
430 self.unknown_tagged_fields.insert(key, value);
431 self
432 }
433}
434
#[cfg(feature = "broker")]
impl Encodable for FindCoordinatorResponse {
    /// Writes this response to `buf` using the layout of `version`.
    ///
    /// # Errors
    /// Fails when `version` is outside `0..=6`, when a field that the chosen
    /// version does not carry holds a non-default value, when there are more
    /// tagged fields than fit in a `u32`, or when an underlying field encoder
    /// fails. Bytes already written to `buf` before the failure are not
    /// rolled back.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=6).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        if version >= 1 {
            types::Int32.encode(buf, &self.throttle_time_ms)?;
        }

        if version <= 3 {
            // Legacy layout: a single coordinator is carried inline.
            types::Int16.encode(buf, &self.error_code)?;
            if version >= 3 {
                types::CompactString.encode(buf, &self.error_message)?;
            } else if version >= 1 {
                types::String.encode(buf, &self.error_message)?;
            }
            types::Int32.encode(buf, &self.node_id)?;
            if version >= 3 {
                types::CompactString.encode(buf, &self.host)?;
            } else {
                types::String.encode(buf, &self.host)?;
            }
            types::Int32.encode(buf, &self.port)?;
            // Checked last so the order of writes and failures is unchanged.
            if !self.coordinators.is_empty() {
                bail!("A field is set that is not available on the selected protocol version");
            }
        } else {
            // Batched layout: the inline fields must be unset.
            if self.error_code != 0
                || self.node_id != 0
                || !self.host.is_empty()
                || self.port != 0
            {
                bail!("A field is set that is not available on the selected protocol version");
            }
            types::CompactArray(types::Struct { version }).encode(buf, &self.coordinators)?;
        }

        if version >= 3 {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }

        Ok(())
    }

    /// Computes how many bytes `encode` would produce for `version`.
    ///
    /// Unlike `encode`, this does not reject versions outside `0..=6`; the
    /// remaining error conditions are the same.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut size = 0;

        if version >= 1 {
            size += types::Int32.compute_size(&self.throttle_time_ms)?;
        }

        if version <= 3 {
            size += types::Int16.compute_size(&self.error_code)?;
            if version >= 3 {
                size += types::CompactString.compute_size(&self.error_message)?;
            } else if version >= 1 {
                size += types::String.compute_size(&self.error_message)?;
            }
            size += types::Int32.compute_size(&self.node_id)?;
            size += if version >= 3 {
                types::CompactString.compute_size(&self.host)?
            } else {
                types::String.compute_size(&self.host)?
            };
            size += types::Int32.compute_size(&self.port)?;
            if !self.coordinators.is_empty() {
                bail!("A field is set that is not available on the selected protocol version");
            }
        } else {
            if self.error_code != 0
                || self.node_id != 0
                || !self.host.is_empty()
                || self.port != 0
            {
                bail!("A field is set that is not available on the selected protocol version");
            }
            size +=
                types::CompactArray(types::Struct { version }).compute_size(&self.coordinators)?;
        }

        if version >= 3 {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }

        Ok(size)
    }
}
571
#[cfg(feature = "client")]
impl Decodable for FindCoordinatorResponse {
    /// Reads a response from `buf` using the layout of `version`.
    ///
    /// Fields that the chosen version does not carry take their defaults
    /// (`error_message` becomes `Some` of an empty string).
    ///
    /// # Errors
    /// Fails when `version` is outside `0..=6` or when any underlying read
    /// from `buf` fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=6).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        let throttle_time_ms = if version >= 1 {
            types::Int32.decode(buf)?
        } else {
            0
        };

        // Field initialisers run top to bottom, which is the wire order.
        let mut response = if version <= 3 {
            Self {
                throttle_time_ms,
                error_code: types::Int16.decode(buf)?,
                error_message: if version >= 3 {
                    types::CompactString.decode(buf)?
                } else if version >= 1 {
                    types::String.decode(buf)?
                } else {
                    Some(Default::default())
                },
                node_id: types::Int32.decode(buf)?,
                host: if version >= 3 {
                    types::CompactString.decode(buf)?
                } else {
                    types::String.decode(buf)?
                },
                port: types::Int32.decode(buf)?,
                coordinators: Default::default(),
                unknown_tagged_fields: BTreeMap::new(),
            }
        } else {
            Self {
                throttle_time_ms,
                error_code: 0,
                error_message: Some(Default::default()),
                node_id: (0).into(),
                host: Default::default(),
                port: 0,
                coordinators: types::CompactArray(types::Struct { version }).decode(buf)?,
                unknown_tagged_fields: BTreeMap::new(),
            }
        };

        if version >= 3 {
            let field_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..field_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                response.unknown_tagged_fields.insert(tag as i32, payload);
            }
        }

        Ok(response)
    }
}
643
644impl Default for FindCoordinatorResponse {
645 fn default() -> Self {
646 Self {
647 throttle_time_ms: 0,
648 error_code: 0,
649 error_message: Some(Default::default()),
650 node_id: (0).into(),
651 host: Default::default(),
652 port: 0,
653 coordinators: Default::default(),
654 unknown_tagged_fields: BTreeMap::new(),
655 }
656 }
657}
658
659impl Message for FindCoordinatorResponse {
660 const VERSIONS: VersionRange = VersionRange { min: 0, max: 6 };
661 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
662}
663
664impl HeaderVersion for FindCoordinatorResponse {
665 fn header_version(version: i16) -> i16 {
666 if version >= 3 {
667 1
668 } else {
669 0
670 }
671 }
672}