1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
// Copyright 2024 tison <wander4096@gmail.com>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::io;

use crate::codec::err_codec_message;

#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd)]
pub struct ApiMessageType {
    pub api_key: i16,
    pub lowest_supported_version: i16,
    pub highest_supported_version: i16,
    pub latest_version_unstable: bool,
}

impl ApiMessageType {
    const fn new(
        api_key: i16,
        lowest_supported_version: i16,
        highest_supported_version: i16,
        latest_version_unstable: bool,
    ) -> Self {
        Self {
            api_key,
            lowest_supported_version,
            highest_supported_version,
            latest_version_unstable,
        }
    }
}

impl ApiMessageType {
    pub const PRODUCE: Self = ApiMessageType::new(0, 0, 9, false);
    pub const FETCH: Self = ApiMessageType::new(1, 0, 15, false);
    pub const METADATA: Self = ApiMessageType::new(3, 0, 12, false);
    pub const OFFSET_FETCH: Self = ApiMessageType::new(9, 0, 8, false);
    pub const FIND_COORDINATOR: Self = ApiMessageType::new(10, 0, 4, false);
    pub const JOIN_GROUP: Self = ApiMessageType::new(11, 0, 9, false);
    pub const HEARTBEAT: Self = ApiMessageType::new(12, 0, 4, false);
    pub const SYNC_GROUP: Self = ApiMessageType::new(14, 0, 5, false);
    pub const API_VERSIONS: Self = ApiMessageType::new(18, 0, 3, false);
    pub const CREATE_TOPICS: Self = ApiMessageType::new(19, 0, 7, false);
    pub const INIT_PRODUCER_ID: Self = ApiMessageType::new(22, 0, 4, false);
}

impl TryFrom<i16> for ApiMessageType {
    type Error = io::Error;

    fn try_from(api_key: i16) -> Result<Self, Self::Error> {
        match api_key {
            0 => Ok(ApiMessageType::PRODUCE),
            1 => Ok(ApiMessageType::FETCH),
            3 => Ok(ApiMessageType::METADATA),
            9 => Ok(ApiMessageType::OFFSET_FETCH),
            10 => Ok(ApiMessageType::FIND_COORDINATOR),
            11 => Ok(ApiMessageType::JOIN_GROUP),
            12 => Ok(ApiMessageType::HEARTBEAT),
            14 => Ok(ApiMessageType::SYNC_GROUP),
            18 => Ok(ApiMessageType::API_VERSIONS),
            19 => Ok(ApiMessageType::CREATE_TOPICS),
            22 => Ok(ApiMessageType::INIT_PRODUCER_ID),
            _ => Err(err_codec_message(format!("unknown api key {api_key}"))),
        }
    }
}

impl ApiMessageType {
    pub fn request_header_version(&self, api_version: i16) -> i16 {
        // the current different is whether the request is flexible
        fn resolve_request_header_version(flexible: bool) -> i16 {
            if flexible {
                2
            } else {
                1
            }
        }

        match *self {
            ApiMessageType::PRODUCE => resolve_request_header_version(api_version >= 9),
            ApiMessageType::FETCH => resolve_request_header_version(api_version >= 12),
            ApiMessageType::METADATA => resolve_request_header_version(api_version >= 9),
            ApiMessageType::OFFSET_FETCH => resolve_request_header_version(api_version >= 6),
            ApiMessageType::FIND_COORDINATOR => resolve_request_header_version(api_version >= 3),
            ApiMessageType::JOIN_GROUP => resolve_request_header_version(api_version >= 6),
            ApiMessageType::HEARTBEAT => resolve_request_header_version(api_version >= 4),
            ApiMessageType::SYNC_GROUP => resolve_request_header_version(api_version >= 4),
            ApiMessageType::API_VERSIONS => resolve_request_header_version(api_version >= 3),
            ApiMessageType::CREATE_TOPICS => resolve_request_header_version(api_version >= 5),
            ApiMessageType::INIT_PRODUCER_ID => resolve_request_header_version(api_version >= 2),
            _ => unreachable!("unknown api type {}", self.api_key),
        }
    }

    pub fn response_header_version(&self, api_version: i16) -> i16 {
        // the current different is whether the response is flexible
        fn resolve_response_header_version(flexible: bool) -> i16 {
            if flexible {
                1
            } else {
                0
            }
        }

        match *self {
            ApiMessageType::PRODUCE => resolve_response_header_version(api_version >= 9),
            ApiMessageType::FETCH => resolve_response_header_version(api_version >= 12),
            ApiMessageType::METADATA => resolve_response_header_version(api_version >= 9),
            ApiMessageType::OFFSET_FETCH => resolve_response_header_version(api_version >= 6),
            ApiMessageType::FIND_COORDINATOR => resolve_response_header_version(api_version >= 3),
            ApiMessageType::JOIN_GROUP => resolve_response_header_version(api_version >= 6),
            ApiMessageType::HEARTBEAT => resolve_response_header_version(api_version >= 4),
            ApiMessageType::SYNC_GROUP => resolve_response_header_version(api_version >= 4),
            ApiMessageType::API_VERSIONS => {
                // ApiVersionsResponse always includes a v0 header.
                // @see KIP-511 https://cwiki.apache.org/confluence/display/KAFKA/KIP-511%3A+Collect+and+Expose+Client%27s+Name+and+Version+in+the+Brokers
                0
            }
            ApiMessageType::CREATE_TOPICS => resolve_response_header_version(api_version >= 5),
            ApiMessageType::INIT_PRODUCER_ID => resolve_response_header_version(api_version >= 2),
            _ => unreachable!("unknown api type {}", self.api_key),
        }
    }
}