use std::io;
use crate::codec::err_codec_message;
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd)]
pub struct ApiMessageType {
pub api_key: i16,
pub lowest_supported_version: i16,
pub highest_supported_version: i16,
pub latest_version_unstable: bool,
}
impl ApiMessageType {
const fn new(
api_key: i16,
lowest_supported_version: i16,
highest_supported_version: i16,
latest_version_unstable: bool,
) -> Self {
Self {
api_key,
lowest_supported_version,
highest_supported_version,
latest_version_unstable,
}
}
}
impl ApiMessageType {
pub const PRODUCE: Self = ApiMessageType::new(0, 0, 9, false);
pub const FETCH: Self = ApiMessageType::new(1, 0, 15, false);
pub const METADATA: Self = ApiMessageType::new(3, 0, 12, false);
pub const OFFSET_FETCH: Self = ApiMessageType::new(9, 0, 8, false);
pub const FIND_COORDINATOR: Self = ApiMessageType::new(10, 0, 4, false);
pub const JOIN_GROUP: Self = ApiMessageType::new(11, 0, 9, false);
pub const HEARTBEAT: Self = ApiMessageType::new(12, 0, 4, false);
pub const SYNC_GROUP: Self = ApiMessageType::new(14, 0, 5, false);
pub const API_VERSIONS: Self = ApiMessageType::new(18, 0, 3, false);
pub const CREATE_TOPICS: Self = ApiMessageType::new(19, 0, 7, false);
pub const INIT_PRODUCER_ID: Self = ApiMessageType::new(22, 0, 4, false);
}
impl TryFrom<i16> for ApiMessageType {
type Error = io::Error;
fn try_from(api_key: i16) -> Result<Self, Self::Error> {
match api_key {
0 => Ok(ApiMessageType::PRODUCE),
1 => Ok(ApiMessageType::FETCH),
3 => Ok(ApiMessageType::METADATA),
9 => Ok(ApiMessageType::OFFSET_FETCH),
10 => Ok(ApiMessageType::FIND_COORDINATOR),
11 => Ok(ApiMessageType::JOIN_GROUP),
12 => Ok(ApiMessageType::HEARTBEAT),
14 => Ok(ApiMessageType::SYNC_GROUP),
18 => Ok(ApiMessageType::API_VERSIONS),
19 => Ok(ApiMessageType::CREATE_TOPICS),
22 => Ok(ApiMessageType::INIT_PRODUCER_ID),
_ => Err(err_codec_message(format!("unknown api key {api_key}"))),
}
}
}
impl ApiMessageType {
pub fn request_header_version(&self, api_version: i16) -> i16 {
fn resolve_request_header_version(flexible: bool) -> i16 {
if flexible {
2
} else {
1
}
}
match *self {
ApiMessageType::PRODUCE => resolve_request_header_version(api_version >= 9),
ApiMessageType::FETCH => resolve_request_header_version(api_version >= 12),
ApiMessageType::METADATA => resolve_request_header_version(api_version >= 9),
ApiMessageType::OFFSET_FETCH => resolve_request_header_version(api_version >= 6),
ApiMessageType::FIND_COORDINATOR => resolve_request_header_version(api_version >= 3),
ApiMessageType::JOIN_GROUP => resolve_request_header_version(api_version >= 6),
ApiMessageType::HEARTBEAT => resolve_request_header_version(api_version >= 4),
ApiMessageType::SYNC_GROUP => resolve_request_header_version(api_version >= 4),
ApiMessageType::API_VERSIONS => resolve_request_header_version(api_version >= 3),
ApiMessageType::CREATE_TOPICS => resolve_request_header_version(api_version >= 5),
ApiMessageType::INIT_PRODUCER_ID => resolve_request_header_version(api_version >= 2),
_ => unreachable!("unknown api type {}", self.api_key),
}
}
pub fn response_header_version(&self, api_version: i16) -> i16 {
fn resolve_response_header_version(flexible: bool) -> i16 {
if flexible {
1
} else {
0
}
}
match *self {
ApiMessageType::PRODUCE => resolve_response_header_version(api_version >= 9),
ApiMessageType::FETCH => resolve_response_header_version(api_version >= 12),
ApiMessageType::METADATA => resolve_response_header_version(api_version >= 9),
ApiMessageType::OFFSET_FETCH => resolve_response_header_version(api_version >= 6),
ApiMessageType::FIND_COORDINATOR => resolve_response_header_version(api_version >= 3),
ApiMessageType::JOIN_GROUP => resolve_response_header_version(api_version >= 6),
ApiMessageType::HEARTBEAT => resolve_response_header_version(api_version >= 4),
ApiMessageType::SYNC_GROUP => resolve_response_header_version(api_version >= 4),
ApiMessageType::API_VERSIONS => {
0
}
ApiMessageType::CREATE_TOPICS => resolve_response_header_version(api_version >= 5),
ApiMessageType::INIT_PRODUCER_ID => resolve_response_header_version(api_version >= 2),
_ => unreachable!("unknown api type {}", self.api_key),
}
}
}