kafka_api/schemata/
find_coordinator_response.rs

1// Copyright 2024 tison <wander4096@gmail.com>
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use byteorder::WriteBytesExt;
16
17use crate::codec::*;
18use crate::IoResult;
19
20// Version 1 adds throttle time and error messages.
21//
22// Starting in version 2, on quota violation, brokers send out responses before throttling.
23//
24// Version 3 is the first flexible version.
25//
26// Version 4 adds support for batching via Coordinators (KIP-699)
27
28#[derive(Debug, Default, Clone)]
29pub struct FindCoordinatorResponse {
30    /// The duration in milliseconds for which the request was throttled due to a quota violation,
31    /// or zero if the request did not violate any quota.
32    pub throttle_time_ms: i32,
33    /// The error code, or 0 if there was no error.
34    pub error_code: i16,
35    /// The error message, or null if there was no error.
36    pub error_message: Option<String>,
37    /// The node id.
38    pub node_id: i32,
39    /// The host name.
40    pub host: String,
41    /// The port.
42    pub port: i32,
43    /// Each coordinator result in the response
44    pub coordinators: Vec<Coordinator>,
45    /// Unknown tagged fields.
46    pub unknown_tagged_fields: Vec<RawTaggedField>,
47}
48
49impl Encodable for FindCoordinatorResponse {
50    fn write<B: WriteBytesExt>(&self, buf: &mut B, version: i16) -> IoResult<()> {
51        if version >= 1 {
52            Int32.encode(buf, self.throttle_time_ms)?;
53        }
54        if version <= 3 {
55            Int16.encode(buf, self.error_code)?;
56        }
57        if (1..=3).contains(&version) {
58            NullableString(version >= 3).encode(buf, self.error_message.as_deref())?;
59        }
60        if version <= 3 {
61            Int32.encode(buf, self.node_id)?;
62        }
63        if version <= 3 {
64            NullableString(version >= 3).encode(buf, self.host.as_str())?;
65        }
66        if version <= 3 {
67            Int32.encode(buf, self.port)?;
68        }
69        if version >= 4 {
70            NullableArray(Struct(version), true).encode(buf, self.coordinators.as_slice())?;
71        }
72        if version >= 3 {
73            RawTaggedFieldList.encode(buf, &self.unknown_tagged_fields)?;
74        }
75        Ok(())
76    }
77
78    fn calculate_size(&self, version: i16) -> usize {
79        let mut res = 0;
80        if version >= 1 {
81            res += Int32::SIZE; // self.throttle_time_ms
82        }
83        if version <= 3 {
84            res += Int16::SIZE; // self.error_code
85        }
86        if (1..=3).contains(&version) {
87            res += NullableString(version >= 3).calculate_size(self.error_message.as_deref());
88        }
89        if version <= 3 {
90            res += Int32::SIZE; // self.node_id
91        }
92        if version <= 3 {
93            res += NullableString(version >= 3).calculate_size(self.host.as_str());
94        }
95        if version <= 3 {
96            res += Int32::SIZE; // self.port
97        }
98        if version >= 4 {
99            res +=
100                NullableArray(Struct(version), true).calculate_size(self.coordinators.as_slice());
101        }
102        if version >= 3 {
103            res += RawTaggedFieldList.calculate_size(&self.unknown_tagged_fields);
104        }
105        res
106    }
107}
108
109#[derive(Debug, Default, Clone)]
110pub struct Coordinator {
111    /// The coordinator key.
112    pub key: String,
113    /// The node id.
114    pub node_id: i32,
115    /// The host name.
116    pub host: String,
117    /// The port.
118    pub port: i32,
119    /// The error code, or 0 if there was no error.
120    pub error_code: i16,
121    /// The error message, or null if there was no error.
122    pub error_message: Option<String>,
123    /// Unknown tagged fields.
124    pub unknown_tagged_fields: Vec<RawTaggedField>,
125}
126
127impl Encodable for Coordinator {
128    fn write<B: WriteBytesExt>(&self, buf: &mut B, version: i16) -> IoResult<()> {
129        if version > 4 {
130            Err(err_encode_message_unsupported(version, "Coordinator"))?
131        }
132        NullableString(true).encode(buf, self.key.as_str())?;
133        Int32.encode(buf, self.node_id)?;
134        NullableString(true).encode(buf, self.host.as_str())?;
135        Int32.encode(buf, self.port)?;
136        Int16.encode(buf, self.error_code)?;
137        NullableString(true).encode(buf, self.error_message.as_deref())?;
138        RawTaggedFieldList.encode(buf, &self.unknown_tagged_fields)?;
139        Ok(())
140    }
141
142    fn calculate_size(&self, _version: i16) -> usize {
143        let mut res = 0;
144        res += NullableString(true).calculate_size(self.key.as_str());
145        res += Int32::SIZE; // self.node_id
146        res += NullableString(true).calculate_size(self.host.as_str());
147        res += Int32::SIZE; // self.port
148        res += Int16::SIZE; // self.error_code
149        res += NullableString(true).calculate_size(self.error_message.as_deref());
150        res += RawTaggedFieldList.calculate_size(&self.unknown_tagged_fields);
151        res
152    }
153}