kafka_protocol/messages/
find_coordinator_response.rs

1//! FindCoordinatorResponse
2//!
3//! See the schema for this message [here](https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/FindCoordinatorResponse.json).
4// WARNING: the items of this module are generated and should not be edited directly
5#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15    buf::{ByteBuf, ByteBufMut},
16    compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17    Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20/// Valid versions: 0-5
21#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct Coordinator {
24    /// The coordinator key.
25    ///
26    /// Supported API versions: 4-5
27    pub key: StrBytes,
28
29    /// The node id.
30    ///
31    /// Supported API versions: 4-5
32    pub node_id: super::BrokerId,
33
34    /// The host name.
35    ///
36    /// Supported API versions: 4-5
37    pub host: StrBytes,
38
39    /// The port.
40    ///
41    /// Supported API versions: 4-5
42    pub port: i32,
43
44    /// The error code, or 0 if there was no error.
45    ///
46    /// Supported API versions: 4-5
47    pub error_code: i16,
48
49    /// The error message, or null if there was no error.
50    ///
51    /// Supported API versions: 4-5
52    pub error_message: Option<StrBytes>,
53
54    /// Other tagged fields
55    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
56}
57
58impl Coordinator {
59    /// Sets `key` to the passed value.
60    ///
61    /// The coordinator key.
62    ///
63    /// Supported API versions: 4-5
64    pub fn with_key(mut self, value: StrBytes) -> Self {
65        self.key = value;
66        self
67    }
68    /// Sets `node_id` to the passed value.
69    ///
70    /// The node id.
71    ///
72    /// Supported API versions: 4-5
73    pub fn with_node_id(mut self, value: super::BrokerId) -> Self {
74        self.node_id = value;
75        self
76    }
77    /// Sets `host` to the passed value.
78    ///
79    /// The host name.
80    ///
81    /// Supported API versions: 4-5
82    pub fn with_host(mut self, value: StrBytes) -> Self {
83        self.host = value;
84        self
85    }
86    /// Sets `port` to the passed value.
87    ///
88    /// The port.
89    ///
90    /// Supported API versions: 4-5
91    pub fn with_port(mut self, value: i32) -> Self {
92        self.port = value;
93        self
94    }
95    /// Sets `error_code` to the passed value.
96    ///
97    /// The error code, or 0 if there was no error.
98    ///
99    /// Supported API versions: 4-5
100    pub fn with_error_code(mut self, value: i16) -> Self {
101        self.error_code = value;
102        self
103    }
104    /// Sets `error_message` to the passed value.
105    ///
106    /// The error message, or null if there was no error.
107    ///
108    /// Supported API versions: 4-5
109    pub fn with_error_message(mut self, value: Option<StrBytes>) -> Self {
110        self.error_message = value;
111        self
112    }
113    /// Sets unknown_tagged_fields to the passed value.
114    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
115        self.unknown_tagged_fields = value;
116        self
117    }
118    /// Inserts an entry into unknown_tagged_fields.
119    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
120        self.unknown_tagged_fields.insert(key, value);
121        self
122    }
123}
124
125#[cfg(feature = "broker")]
126impl Encodable for Coordinator {
127    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
128        if version >= 4 {
129            types::CompactString.encode(buf, &self.key)?;
130        } else {
131            if !self.key.is_empty() {
132                bail!("A field is set that is not available on the selected protocol version");
133            }
134        }
135        if version >= 4 {
136            types::Int32.encode(buf, &self.node_id)?;
137        } else {
138            if self.node_id != 0 {
139                bail!("A field is set that is not available on the selected protocol version");
140            }
141        }
142        if version >= 4 {
143            types::CompactString.encode(buf, &self.host)?;
144        } else {
145            if !self.host.is_empty() {
146                bail!("A field is set that is not available on the selected protocol version");
147            }
148        }
149        if version >= 4 {
150            types::Int32.encode(buf, &self.port)?;
151        } else {
152            if self.port != 0 {
153                bail!("A field is set that is not available on the selected protocol version");
154            }
155        }
156        if version >= 4 {
157            types::Int16.encode(buf, &self.error_code)?;
158        } else {
159            if self.error_code != 0 {
160                bail!("A field is set that is not available on the selected protocol version");
161            }
162        }
163        if version >= 4 {
164            types::CompactString.encode(buf, &self.error_message)?;
165        }
166        if version >= 3 {
167            let num_tagged_fields = self.unknown_tagged_fields.len();
168            if num_tagged_fields > std::u32::MAX as usize {
169                bail!(
170                    "Too many tagged fields to encode ({} fields)",
171                    num_tagged_fields
172                );
173            }
174            types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
175
176            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
177        }
178        Ok(())
179    }
180    fn compute_size(&self, version: i16) -> Result<usize> {
181        let mut total_size = 0;
182        if version >= 4 {
183            total_size += types::CompactString.compute_size(&self.key)?;
184        } else {
185            if !self.key.is_empty() {
186                bail!("A field is set that is not available on the selected protocol version");
187            }
188        }
189        if version >= 4 {
190            total_size += types::Int32.compute_size(&self.node_id)?;
191        } else {
192            if self.node_id != 0 {
193                bail!("A field is set that is not available on the selected protocol version");
194            }
195        }
196        if version >= 4 {
197            total_size += types::CompactString.compute_size(&self.host)?;
198        } else {
199            if !self.host.is_empty() {
200                bail!("A field is set that is not available on the selected protocol version");
201            }
202        }
203        if version >= 4 {
204            total_size += types::Int32.compute_size(&self.port)?;
205        } else {
206            if self.port != 0 {
207                bail!("A field is set that is not available on the selected protocol version");
208            }
209        }
210        if version >= 4 {
211            total_size += types::Int16.compute_size(&self.error_code)?;
212        } else {
213            if self.error_code != 0 {
214                bail!("A field is set that is not available on the selected protocol version");
215            }
216        }
217        if version >= 4 {
218            total_size += types::CompactString.compute_size(&self.error_message)?;
219        }
220        if version >= 3 {
221            let num_tagged_fields = self.unknown_tagged_fields.len();
222            if num_tagged_fields > std::u32::MAX as usize {
223                bail!(
224                    "Too many tagged fields to encode ({} fields)",
225                    num_tagged_fields
226                );
227            }
228            total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
229
230            total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
231        }
232        Ok(total_size)
233    }
234}
235
236#[cfg(feature = "client")]
237impl Decodable for Coordinator {
238    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
239        let key = if version >= 4 {
240            types::CompactString.decode(buf)?
241        } else {
242            Default::default()
243        };
244        let node_id = if version >= 4 {
245            types::Int32.decode(buf)?
246        } else {
247            (0).into()
248        };
249        let host = if version >= 4 {
250            types::CompactString.decode(buf)?
251        } else {
252            Default::default()
253        };
254        let port = if version >= 4 {
255            types::Int32.decode(buf)?
256        } else {
257            0
258        };
259        let error_code = if version >= 4 {
260            types::Int16.decode(buf)?
261        } else {
262            0
263        };
264        let error_message = if version >= 4 {
265            types::CompactString.decode(buf)?
266        } else {
267            Some(Default::default())
268        };
269        let mut unknown_tagged_fields = BTreeMap::new();
270        if version >= 3 {
271            let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
272            for _ in 0..num_tagged_fields {
273                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
274                let size: u32 = types::UnsignedVarInt.decode(buf)?;
275                let unknown_value = buf.try_get_bytes(size as usize)?;
276                unknown_tagged_fields.insert(tag as i32, unknown_value);
277            }
278        }
279        Ok(Self {
280            key,
281            node_id,
282            host,
283            port,
284            error_code,
285            error_message,
286            unknown_tagged_fields,
287        })
288    }
289}
290
291impl Default for Coordinator {
292    fn default() -> Self {
293        Self {
294            key: Default::default(),
295            node_id: (0).into(),
296            host: Default::default(),
297            port: 0,
298            error_code: 0,
299            error_message: Some(Default::default()),
300            unknown_tagged_fields: BTreeMap::new(),
301        }
302    }
303}
304
305impl Message for Coordinator {
306    const VERSIONS: VersionRange = VersionRange { min: 0, max: 5 };
307    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
308}
309
310/// Valid versions: 0-5
311#[non_exhaustive]
312#[derive(Debug, Clone, PartialEq)]
313pub struct FindCoordinatorResponse {
314    /// The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota.
315    ///
316    /// Supported API versions: 1-5
317    pub throttle_time_ms: i32,
318
319    /// The error code, or 0 if there was no error.
320    ///
321    /// Supported API versions: 0-3
322    pub error_code: i16,
323
324    /// The error message, or null if there was no error.
325    ///
326    /// Supported API versions: 1-3
327    pub error_message: Option<StrBytes>,
328
329    /// The node id.
330    ///
331    /// Supported API versions: 0-3
332    pub node_id: super::BrokerId,
333
334    /// The host name.
335    ///
336    /// Supported API versions: 0-3
337    pub host: StrBytes,
338
339    /// The port.
340    ///
341    /// Supported API versions: 0-3
342    pub port: i32,
343
344    /// Each coordinator result in the response
345    ///
346    /// Supported API versions: 4-5
347    pub coordinators: Vec<Coordinator>,
348
349    /// Other tagged fields
350    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
351}
352
353impl FindCoordinatorResponse {
354    /// Sets `throttle_time_ms` to the passed value.
355    ///
356    /// The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota.
357    ///
358    /// Supported API versions: 1-5
359    pub fn with_throttle_time_ms(mut self, value: i32) -> Self {
360        self.throttle_time_ms = value;
361        self
362    }
363    /// Sets `error_code` to the passed value.
364    ///
365    /// The error code, or 0 if there was no error.
366    ///
367    /// Supported API versions: 0-3
368    pub fn with_error_code(mut self, value: i16) -> Self {
369        self.error_code = value;
370        self
371    }
372    /// Sets `error_message` to the passed value.
373    ///
374    /// The error message, or null if there was no error.
375    ///
376    /// Supported API versions: 1-3
377    pub fn with_error_message(mut self, value: Option<StrBytes>) -> Self {
378        self.error_message = value;
379        self
380    }
381    /// Sets `node_id` to the passed value.
382    ///
383    /// The node id.
384    ///
385    /// Supported API versions: 0-3
386    pub fn with_node_id(mut self, value: super::BrokerId) -> Self {
387        self.node_id = value;
388        self
389    }
390    /// Sets `host` to the passed value.
391    ///
392    /// The host name.
393    ///
394    /// Supported API versions: 0-3
395    pub fn with_host(mut self, value: StrBytes) -> Self {
396        self.host = value;
397        self
398    }
399    /// Sets `port` to the passed value.
400    ///
401    /// The port.
402    ///
403    /// Supported API versions: 0-3
404    pub fn with_port(mut self, value: i32) -> Self {
405        self.port = value;
406        self
407    }
408    /// Sets `coordinators` to the passed value.
409    ///
410    /// Each coordinator result in the response
411    ///
412    /// Supported API versions: 4-5
413    pub fn with_coordinators(mut self, value: Vec<Coordinator>) -> Self {
414        self.coordinators = value;
415        self
416    }
417    /// Sets unknown_tagged_fields to the passed value.
418    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
419        self.unknown_tagged_fields = value;
420        self
421    }
422    /// Inserts an entry into unknown_tagged_fields.
423    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
424        self.unknown_tagged_fields.insert(key, value);
425        self
426    }
427}
428
429#[cfg(feature = "broker")]
430impl Encodable for FindCoordinatorResponse {
431    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
432        if version >= 1 {
433            types::Int32.encode(buf, &self.throttle_time_ms)?;
434        }
435        if version <= 3 {
436            types::Int16.encode(buf, &self.error_code)?;
437        } else {
438            if self.error_code != 0 {
439                bail!("A field is set that is not available on the selected protocol version");
440            }
441        }
442        if version >= 1 && version <= 3 {
443            if version >= 3 {
444                types::CompactString.encode(buf, &self.error_message)?;
445            } else {
446                types::String.encode(buf, &self.error_message)?;
447            }
448        }
449        if version <= 3 {
450            types::Int32.encode(buf, &self.node_id)?;
451        } else {
452            if self.node_id != 0 {
453                bail!("A field is set that is not available on the selected protocol version");
454            }
455        }
456        if version <= 3 {
457            if version >= 3 {
458                types::CompactString.encode(buf, &self.host)?;
459            } else {
460                types::String.encode(buf, &self.host)?;
461            }
462        } else {
463            if !self.host.is_empty() {
464                bail!("A field is set that is not available on the selected protocol version");
465            }
466        }
467        if version <= 3 {
468            types::Int32.encode(buf, &self.port)?;
469        } else {
470            if self.port != 0 {
471                bail!("A field is set that is not available on the selected protocol version");
472            }
473        }
474        if version >= 4 {
475            types::CompactArray(types::Struct { version }).encode(buf, &self.coordinators)?;
476        } else {
477            if !self.coordinators.is_empty() {
478                bail!("A field is set that is not available on the selected protocol version");
479            }
480        }
481        if version >= 3 {
482            let num_tagged_fields = self.unknown_tagged_fields.len();
483            if num_tagged_fields > std::u32::MAX as usize {
484                bail!(
485                    "Too many tagged fields to encode ({} fields)",
486                    num_tagged_fields
487                );
488            }
489            types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
490
491            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
492        }
493        Ok(())
494    }
495    fn compute_size(&self, version: i16) -> Result<usize> {
496        let mut total_size = 0;
497        if version >= 1 {
498            total_size += types::Int32.compute_size(&self.throttle_time_ms)?;
499        }
500        if version <= 3 {
501            total_size += types::Int16.compute_size(&self.error_code)?;
502        } else {
503            if self.error_code != 0 {
504                bail!("A field is set that is not available on the selected protocol version");
505            }
506        }
507        if version >= 1 && version <= 3 {
508            if version >= 3 {
509                total_size += types::CompactString.compute_size(&self.error_message)?;
510            } else {
511                total_size += types::String.compute_size(&self.error_message)?;
512            }
513        }
514        if version <= 3 {
515            total_size += types::Int32.compute_size(&self.node_id)?;
516        } else {
517            if self.node_id != 0 {
518                bail!("A field is set that is not available on the selected protocol version");
519            }
520        }
521        if version <= 3 {
522            if version >= 3 {
523                total_size += types::CompactString.compute_size(&self.host)?;
524            } else {
525                total_size += types::String.compute_size(&self.host)?;
526            }
527        } else {
528            if !self.host.is_empty() {
529                bail!("A field is set that is not available on the selected protocol version");
530            }
531        }
532        if version <= 3 {
533            total_size += types::Int32.compute_size(&self.port)?;
534        } else {
535            if self.port != 0 {
536                bail!("A field is set that is not available on the selected protocol version");
537            }
538        }
539        if version >= 4 {
540            total_size +=
541                types::CompactArray(types::Struct { version }).compute_size(&self.coordinators)?;
542        } else {
543            if !self.coordinators.is_empty() {
544                bail!("A field is set that is not available on the selected protocol version");
545            }
546        }
547        if version >= 3 {
548            let num_tagged_fields = self.unknown_tagged_fields.len();
549            if num_tagged_fields > std::u32::MAX as usize {
550                bail!(
551                    "Too many tagged fields to encode ({} fields)",
552                    num_tagged_fields
553                );
554            }
555            total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
556
557            total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
558        }
559        Ok(total_size)
560    }
561}
562
563#[cfg(feature = "client")]
564impl Decodable for FindCoordinatorResponse {
565    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
566        let throttle_time_ms = if version >= 1 {
567            types::Int32.decode(buf)?
568        } else {
569            0
570        };
571        let error_code = if version <= 3 {
572            types::Int16.decode(buf)?
573        } else {
574            0
575        };
576        let error_message = if version >= 1 && version <= 3 {
577            if version >= 3 {
578                types::CompactString.decode(buf)?
579            } else {
580                types::String.decode(buf)?
581            }
582        } else {
583            Some(Default::default())
584        };
585        let node_id = if version <= 3 {
586            types::Int32.decode(buf)?
587        } else {
588            (0).into()
589        };
590        let host = if version <= 3 {
591            if version >= 3 {
592                types::CompactString.decode(buf)?
593            } else {
594                types::String.decode(buf)?
595            }
596        } else {
597            Default::default()
598        };
599        let port = if version <= 3 {
600            types::Int32.decode(buf)?
601        } else {
602            0
603        };
604        let coordinators = if version >= 4 {
605            types::CompactArray(types::Struct { version }).decode(buf)?
606        } else {
607            Default::default()
608        };
609        let mut unknown_tagged_fields = BTreeMap::new();
610        if version >= 3 {
611            let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
612            for _ in 0..num_tagged_fields {
613                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
614                let size: u32 = types::UnsignedVarInt.decode(buf)?;
615                let unknown_value = buf.try_get_bytes(size as usize)?;
616                unknown_tagged_fields.insert(tag as i32, unknown_value);
617            }
618        }
619        Ok(Self {
620            throttle_time_ms,
621            error_code,
622            error_message,
623            node_id,
624            host,
625            port,
626            coordinators,
627            unknown_tagged_fields,
628        })
629    }
630}
631
632impl Default for FindCoordinatorResponse {
633    fn default() -> Self {
634        Self {
635            throttle_time_ms: 0,
636            error_code: 0,
637            error_message: Some(Default::default()),
638            node_id: (0).into(),
639            host: Default::default(),
640            port: 0,
641            coordinators: Default::default(),
642            unknown_tagged_fields: BTreeMap::new(),
643        }
644    }
645}
646
647impl Message for FindCoordinatorResponse {
648    const VERSIONS: VersionRange = VersionRange { min: 0, max: 5 };
649    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
650}
651
652impl HeaderVersion for FindCoordinatorResponse {
653    fn header_version(version: i16) -> i16 {
654        if version >= 3 {
655            1
656        } else {
657            0
658        }
659    }
660}