1use std::fmt;
16use std::fmt::Display;
17use std::io;
18
19use crate::codec::err_codec_message;
20
/// Static description of one API message type: its numeric key plus the
/// range of protocol versions that are supported for it.
///
/// Values are plain `Copy` data. The derived ordering compares the fields in
/// declaration order, so `api_key` is the most significant component.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct ApiMessageType {
    /// Numeric key identifying the API.
    api_key: i16,
    /// Lowest supported version of this API.
    lowest_supported_version: i16,
    /// Highest supported version of this API.
    highest_supported_version: i16,
    /// Flag stored alongside the version range and exposed through the
    /// accessor of the same name; it is not interpreted in this file.
    latest_version_unstable: bool,
}
28
29impl ApiMessageType {
30 const fn new(
31 api_key: i16,
32 lowest_supported_version: i16,
33 highest_supported_version: i16,
34 latest_version_unstable: bool,
35 ) -> Self {
36 Self {
37 api_key,
38 lowest_supported_version,
39 highest_supported_version,
40 latest_version_unstable,
41 }
42 }
43
44 pub fn api_key(&self) -> i16 {
45 self.api_key
46 }
47
48 pub fn lowest_supported_version(&self) -> i16 {
49 self.lowest_supported_version
50 }
51
52 pub fn highest_supported_version(&self) -> i16 {
53 self.highest_supported_version
54 }
55
56 pub fn latest_version_unstable(&self) -> bool {
57 self.latest_version_unstable
58 }
59}
60
61impl ApiMessageType {
62 pub const PRODUCE: Self = ApiMessageType::new(0, 0, 9, false);
63 pub const FETCH: Self = ApiMessageType::new(1, 0, 15, false);
64 pub const METADATA: Self = ApiMessageType::new(3, 0, 12, false);
65 pub const OFFSET_FETCH: Self = ApiMessageType::new(9, 0, 8, false);
66 pub const FIND_COORDINATOR: Self = ApiMessageType::new(10, 0, 4, false);
67 pub const JOIN_GROUP: Self = ApiMessageType::new(11, 0, 9, false);
68 pub const HEARTBEAT: Self = ApiMessageType::new(12, 0, 4, false);
69 pub const SYNC_GROUP: Self = ApiMessageType::new(14, 0, 5, false);
70 pub const API_VERSIONS: Self = ApiMessageType::new(18, 0, 3, false);
71 pub const CREATE_TOPICS: Self = ApiMessageType::new(19, 0, 7, false);
72 pub const INIT_PRODUCER_ID: Self = ApiMessageType::new(22, 0, 4, false);
73}
74
75impl TryFrom<i16> for ApiMessageType {
76 type Error = io::Error;
77
78 fn try_from(api_key: i16) -> Result<Self, Self::Error> {
79 match api_key {
80 0 => Ok(ApiMessageType::PRODUCE),
81 1 => Ok(ApiMessageType::FETCH),
82 3 => Ok(ApiMessageType::METADATA),
83 9 => Ok(ApiMessageType::OFFSET_FETCH),
84 10 => Ok(ApiMessageType::FIND_COORDINATOR),
85 11 => Ok(ApiMessageType::JOIN_GROUP),
86 12 => Ok(ApiMessageType::HEARTBEAT),
87 14 => Ok(ApiMessageType::SYNC_GROUP),
88 18 => Ok(ApiMessageType::API_VERSIONS),
89 19 => Ok(ApiMessageType::CREATE_TOPICS),
90 22 => Ok(ApiMessageType::INIT_PRODUCER_ID),
91 _ => Err(err_codec_message(format!("unknown api key {api_key}"))),
92 }
93 }
94}
95
96impl Display for ApiMessageType {
97 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
98 write!(
99 f,
100 "{}",
101 match *self {
102 ApiMessageType::PRODUCE => "Produce",
103 ApiMessageType::FETCH => "Fetch",
104 ApiMessageType::METADATA => "Metadata",
105 ApiMessageType::OFFSET_FETCH => "OffsetFetch",
106 ApiMessageType::FIND_COORDINATOR => "FindCoordinator",
107 ApiMessageType::JOIN_GROUP => "JoinGroup",
108 ApiMessageType::HEARTBEAT => "Heartbeat",
109 ApiMessageType::SYNC_GROUP => "SyncGroup",
110 ApiMessageType::API_VERSIONS => "ApiVersions",
111 ApiMessageType::CREATE_TOPICS => "CreateTopics",
112 ApiMessageType::INIT_PRODUCER_ID => "InitProducerId",
113 api_key => unreachable!("unknown api key {:?}", api_key),
114 }
115 )
116 }
117}
118
119impl ApiMessageType {
120 pub fn request_header_version(&self, api_version: i16) -> i16 {
121 fn resolve_request_header_version(flexible: bool) -> i16 {
123 if flexible {
124 2
125 } else {
126 1
127 }
128 }
129
130 match *self {
131 ApiMessageType::PRODUCE => resolve_request_header_version(api_version >= 9),
132 ApiMessageType::FETCH => resolve_request_header_version(api_version >= 12),
133 ApiMessageType::METADATA => resolve_request_header_version(api_version >= 9),
134 ApiMessageType::OFFSET_FETCH => resolve_request_header_version(api_version >= 6),
135 ApiMessageType::FIND_COORDINATOR => resolve_request_header_version(api_version >= 3),
136 ApiMessageType::JOIN_GROUP => resolve_request_header_version(api_version >= 6),
137 ApiMessageType::HEARTBEAT => resolve_request_header_version(api_version >= 4),
138 ApiMessageType::SYNC_GROUP => resolve_request_header_version(api_version >= 4),
139 ApiMessageType::API_VERSIONS => resolve_request_header_version(api_version >= 3),
140 ApiMessageType::CREATE_TOPICS => resolve_request_header_version(api_version >= 5),
141 ApiMessageType::INIT_PRODUCER_ID => resolve_request_header_version(api_version >= 2),
142 _ => unreachable!("unknown api type {}", self.api_key),
143 }
144 }
145
146 pub fn response_header_version(&self, api_version: i16) -> i16 {
147 fn resolve_response_header_version(flexible: bool) -> i16 {
149 if flexible {
150 1
151 } else {
152 0
153 }
154 }
155
156 match *self {
157 ApiMessageType::PRODUCE => resolve_response_header_version(api_version >= 9),
158 ApiMessageType::FETCH => resolve_response_header_version(api_version >= 12),
159 ApiMessageType::METADATA => resolve_response_header_version(api_version >= 9),
160 ApiMessageType::OFFSET_FETCH => resolve_response_header_version(api_version >= 6),
161 ApiMessageType::FIND_COORDINATOR => resolve_response_header_version(api_version >= 3),
162 ApiMessageType::JOIN_GROUP => resolve_response_header_version(api_version >= 6),
163 ApiMessageType::HEARTBEAT => resolve_response_header_version(api_version >= 4),
164 ApiMessageType::SYNC_GROUP => resolve_response_header_version(api_version >= 4),
165 ApiMessageType::API_VERSIONS => {
166 0
169 }
170 ApiMessageType::CREATE_TOPICS => resolve_response_header_version(api_version >= 5),
171 ApiMessageType::INIT_PRODUCER_ID => resolve_response_header_version(api_version >= 2),
172 _ => unreachable!("unknown api type {}", self.api_key),
173 }
174 }
175}