kafka_api/schemata/
apikey.rs

1// Copyright 2024 tison <wander4096@gmail.com>
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt;
16use std::fmt::Display;
17use std::io;
18
19use crate::codec::err_codec_message;
20
/// Descriptor of a single Kafka API message type: its numeric key plus version metadata.
///
/// All fields are private; values are read through the accessor methods on this type.
/// The derived ordering compares fields in declaration order, so `api_key` is compared
/// first. Equality covers every field.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct ApiMessageType {
    /// Numeric key identifying the API.
    api_key: i16,
    /// Lowest version of this API that is supported.
    lowest_supported_version: i16,
    /// Highest version of this API that is supported.
    highest_supported_version: i16,
    /// Flag describing the latest version as unstable.
    /// NOTE(review): nothing in this file reads the flag, so its exact semantics should be
    /// confirmed against the callers.
    latest_version_unstable: bool,
}
28
29impl ApiMessageType {
30    const fn new(
31        api_key: i16,
32        lowest_supported_version: i16,
33        highest_supported_version: i16,
34        latest_version_unstable: bool,
35    ) -> Self {
36        Self {
37            api_key,
38            lowest_supported_version,
39            highest_supported_version,
40            latest_version_unstable,
41        }
42    }
43
44    pub fn api_key(&self) -> i16 {
45        self.api_key
46    }
47
48    pub fn lowest_supported_version(&self) -> i16 {
49        self.lowest_supported_version
50    }
51
52    pub fn highest_supported_version(&self) -> i16 {
53        self.highest_supported_version
54    }
55
56    pub fn latest_version_unstable(&self) -> bool {
57        self.latest_version_unstable
58    }
59}
60
61impl ApiMessageType {
62    pub const PRODUCE: Self = ApiMessageType::new(0, 0, 9, false);
63    pub const FETCH: Self = ApiMessageType::new(1, 0, 15, false);
64    pub const METADATA: Self = ApiMessageType::new(3, 0, 12, false);
65    pub const OFFSET_FETCH: Self = ApiMessageType::new(9, 0, 8, false);
66    pub const FIND_COORDINATOR: Self = ApiMessageType::new(10, 0, 4, false);
67    pub const JOIN_GROUP: Self = ApiMessageType::new(11, 0, 9, false);
68    pub const HEARTBEAT: Self = ApiMessageType::new(12, 0, 4, false);
69    pub const SYNC_GROUP: Self = ApiMessageType::new(14, 0, 5, false);
70    pub const API_VERSIONS: Self = ApiMessageType::new(18, 0, 3, false);
71    pub const CREATE_TOPICS: Self = ApiMessageType::new(19, 0, 7, false);
72    pub const INIT_PRODUCER_ID: Self = ApiMessageType::new(22, 0, 4, false);
73}
74
75impl TryFrom<i16> for ApiMessageType {
76    type Error = io::Error;
77
78    fn try_from(api_key: i16) -> Result<Self, Self::Error> {
79        match api_key {
80            0 => Ok(ApiMessageType::PRODUCE),
81            1 => Ok(ApiMessageType::FETCH),
82            3 => Ok(ApiMessageType::METADATA),
83            9 => Ok(ApiMessageType::OFFSET_FETCH),
84            10 => Ok(ApiMessageType::FIND_COORDINATOR),
85            11 => Ok(ApiMessageType::JOIN_GROUP),
86            12 => Ok(ApiMessageType::HEARTBEAT),
87            14 => Ok(ApiMessageType::SYNC_GROUP),
88            18 => Ok(ApiMessageType::API_VERSIONS),
89            19 => Ok(ApiMessageType::CREATE_TOPICS),
90            22 => Ok(ApiMessageType::INIT_PRODUCER_ID),
91            _ => Err(err_codec_message(format!("unknown api key {api_key}"))),
92        }
93    }
94}
95
96impl Display for ApiMessageType {
97    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
98        write!(
99            f,
100            "{}",
101            match *self {
102                ApiMessageType::PRODUCE => "Produce",
103                ApiMessageType::FETCH => "Fetch",
104                ApiMessageType::METADATA => "Metadata",
105                ApiMessageType::OFFSET_FETCH => "OffsetFetch",
106                ApiMessageType::FIND_COORDINATOR => "FindCoordinator",
107                ApiMessageType::JOIN_GROUP => "JoinGroup",
108                ApiMessageType::HEARTBEAT => "Heartbeat",
109                ApiMessageType::SYNC_GROUP => "SyncGroup",
110                ApiMessageType::API_VERSIONS => "ApiVersions",
111                ApiMessageType::CREATE_TOPICS => "CreateTopics",
112                ApiMessageType::INIT_PRODUCER_ID => "InitProducerId",
113                api_key => unreachable!("unknown api key {:?}", api_key),
114            }
115        )
116    }
117}
118
119impl ApiMessageType {
120    pub fn request_header_version(&self, api_version: i16) -> i16 {
121        // the current different is whether the request is flexible
122        fn resolve_request_header_version(flexible: bool) -> i16 {
123            if flexible {
124                2
125            } else {
126                1
127            }
128        }
129
130        match *self {
131            ApiMessageType::PRODUCE => resolve_request_header_version(api_version >= 9),
132            ApiMessageType::FETCH => resolve_request_header_version(api_version >= 12),
133            ApiMessageType::METADATA => resolve_request_header_version(api_version >= 9),
134            ApiMessageType::OFFSET_FETCH => resolve_request_header_version(api_version >= 6),
135            ApiMessageType::FIND_COORDINATOR => resolve_request_header_version(api_version >= 3),
136            ApiMessageType::JOIN_GROUP => resolve_request_header_version(api_version >= 6),
137            ApiMessageType::HEARTBEAT => resolve_request_header_version(api_version >= 4),
138            ApiMessageType::SYNC_GROUP => resolve_request_header_version(api_version >= 4),
139            ApiMessageType::API_VERSIONS => resolve_request_header_version(api_version >= 3),
140            ApiMessageType::CREATE_TOPICS => resolve_request_header_version(api_version >= 5),
141            ApiMessageType::INIT_PRODUCER_ID => resolve_request_header_version(api_version >= 2),
142            _ => unreachable!("unknown api type {}", self.api_key),
143        }
144    }
145
146    pub fn response_header_version(&self, api_version: i16) -> i16 {
147        // the current different is whether the response is flexible
148        fn resolve_response_header_version(flexible: bool) -> i16 {
149            if flexible {
150                1
151            } else {
152                0
153            }
154        }
155
156        match *self {
157            ApiMessageType::PRODUCE => resolve_response_header_version(api_version >= 9),
158            ApiMessageType::FETCH => resolve_response_header_version(api_version >= 12),
159            ApiMessageType::METADATA => resolve_response_header_version(api_version >= 9),
160            ApiMessageType::OFFSET_FETCH => resolve_response_header_version(api_version >= 6),
161            ApiMessageType::FIND_COORDINATOR => resolve_response_header_version(api_version >= 3),
162            ApiMessageType::JOIN_GROUP => resolve_response_header_version(api_version >= 6),
163            ApiMessageType::HEARTBEAT => resolve_response_header_version(api_version >= 4),
164            ApiMessageType::SYNC_GROUP => resolve_response_header_version(api_version >= 4),
165            ApiMessageType::API_VERSIONS => {
166                // ApiVersionsResponse always includes a v0 header.
167                // @see KIP-511 https://cwiki.apache.org/confluence/display/KAFKA/KIP-511%3A+Collect+and+Expose+Client%27s+Name+and+Version+in+the+Brokers
168                0
169            }
170            ApiMessageType::CREATE_TOPICS => resolve_response_header_version(api_version >= 5),
171            ApiMessageType::INIT_PRODUCER_ID => resolve_response_header_version(api_version >= 2),
172            _ => unreachable!("unknown api type {}", self.api_key),
173        }
174    }
175}