kafka_api/schemata/
mod.rs

1// Copyright 2024 tison <wander4096@gmail.com>
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::io::Cursor;
16
17use byteorder::BigEndian;
18use byteorder::ReadBytesExt;
19use byteorder::WriteBytesExt;
20
21use crate::codec::Decodable;
22use crate::codec::Encodable;
23use crate::codec::Encoder;
24use crate::codec::Int32;
25use crate::schemata::apikey::ApiMessageType;
26use crate::schemata::request_header::RequestHeader;
27use crate::schemata::response_header::ResponseHeader;
28use crate::IoResult;
29
30pub mod apikey;
31#[allow(dead_code)]
32pub mod error;
33
34pub mod api_versions_request;
35pub mod api_versions_response;
36pub mod create_topic_request;
37pub mod create_topic_response;
38pub mod fetch_request;
39pub mod fetch_response;
40pub mod find_coordinator_request;
41pub mod find_coordinator_response;
42pub mod heartbeat_request;
43pub mod heartbeat_response;
44pub mod init_producer_id_request;
45pub mod init_producer_id_response;
46pub mod join_group_request;
47pub mod join_group_response;
48pub mod metadata_request;
49pub mod metadata_response;
50pub mod offset_fetch_request;
51pub mod offset_fetch_response;
52pub mod produce_request;
53pub mod produce_response;
54pub mod request_header;
55pub mod response_header;
56pub mod sync_group_request;
57pub mod sync_group_response;
58
59#[derive(Debug)]
60pub enum Request {
61    ApiVersionsRequest(api_versions_request::ApiVersionsRequest),
62    CreateTopicRequest(create_topic_request::CreateTopicsRequest),
63    FetchRequest(fetch_request::FetchRequest),
64    FindCoordinatorRequest(find_coordinator_request::FindCoordinatorRequest),
65    HeartbeatRequest(heartbeat_request::HeartbeatRequest),
66    InitProducerIdRequest(init_producer_id_request::InitProducerIdRequest),
67    JoinGroupRequest(join_group_request::JoinGroupRequest),
68    MetadataRequest(metadata_request::MetadataRequest),
69    OffsetFetchRequest(offset_fetch_request::OffsetFetchRequest),
70    ProduceRequest(produce_request::ProduceRequest),
71    SyncGroupRequest(sync_group_request::SyncGroupRequest),
72}
73
74impl Request {
75    pub fn decode<T: AsRef<[u8]>>(buf: &mut Cursor<T>) -> IoResult<(RequestHeader, Request)> {
76        let header_version = {
77            let pos = buf.position();
78            let api_key = buf.read_i16::<BigEndian>()?;
79            let api_version = buf.read_i16::<BigEndian>()?;
80            buf.set_position(pos);
81            ApiMessageType::try_from(api_key)?.request_header_version(api_version)
82        };
83
84        let header = RequestHeader::read(buf, header_version)?;
85        let api_type = ApiMessageType::try_from(header.request_api_key)?;
86        let api_version = header.request_api_version;
87
88        let request = match api_type {
89            ApiMessageType::API_VERSIONS => {
90                api_versions_request::ApiVersionsRequest::read(buf, api_version)
91                    .map(Request::ApiVersionsRequest)
92            }
93            ApiMessageType::CREATE_TOPICS => {
94                create_topic_request::CreateTopicsRequest::read(buf, api_version)
95                    .map(Request::CreateTopicRequest)
96            }
97            ApiMessageType::FETCH => {
98                fetch_request::FetchRequest::read(buf, api_version).map(Request::FetchRequest)
99            }
100            ApiMessageType::FIND_COORDINATOR => {
101                find_coordinator_request::FindCoordinatorRequest::read(buf, api_version)
102                    .map(Request::FindCoordinatorRequest)
103            }
104            ApiMessageType::INIT_PRODUCER_ID => {
105                init_producer_id_request::InitProducerIdRequest::read(buf, api_version)
106                    .map(Request::InitProducerIdRequest)
107            }
108            ApiMessageType::JOIN_GROUP => {
109                join_group_request::JoinGroupRequest::read(buf, api_version)
110                    .map(Request::JoinGroupRequest)
111            }
112            ApiMessageType::HEARTBEAT => {
113                heartbeat_request::HeartbeatRequest::read(buf, api_version)
114                    .map(Request::HeartbeatRequest)
115            }
116            ApiMessageType::METADATA => metadata_request::MetadataRequest::read(buf, api_version)
117                .map(Request::MetadataRequest),
118            ApiMessageType::OFFSET_FETCH => {
119                offset_fetch_request::OffsetFetchRequest::read(buf, api_version)
120                    .map(Request::OffsetFetchRequest)
121            }
122            ApiMessageType::PRODUCE => {
123                produce_request::ProduceRequest::read(buf, api_version).map(Request::ProduceRequest)
124            }
125            ApiMessageType::SYNC_GROUP => {
126                sync_group_request::SyncGroupRequest::read(buf, api_version)
127                    .map(Request::SyncGroupRequest)
128            }
129            api_type => unreachable!("unknown api type {}", api_type),
130        }?;
131
132        Ok((header, request))
133    }
134}
135
136#[derive(Debug)]
137pub enum Response {
138    ApiVersionsResponse(api_versions_response::ApiVersionsResponse),
139    CreateTopicsResponse(create_topic_response::CreateTopicsResponse),
140    FindCoordinatorResponse(find_coordinator_response::FindCoordinatorResponse),
141    FetchResponse(fetch_response::FetchResponse),
142    HeartbeatResponse(heartbeat_response::HeartbeatResponse),
143    InitProducerIdResponse(init_producer_id_response::InitProducerIdResponse),
144    JoinGroupResponse(join_group_response::JoinGroupResponse),
145    MetadataResponse(metadata_response::MetadataResponse),
146    OffsetFetchResponse(offset_fetch_response::OffsetFetchResponse),
147    ProduceResponse(produce_response::ProduceResponse),
148    SyncGroupResponse(sync_group_response::SyncGroupResponse),
149}
150
151impl Response {
152    pub fn encode<B: WriteBytesExt>(&self, header: RequestHeader, buf: &mut B) -> IoResult<()> {
153        let api_type = ApiMessageType::try_from(header.request_api_key)?;
154        let api_version = header.request_api_version;
155        let correlation_id = header.correlation_id;
156
157        let response_header_version = api_type.response_header_version(api_version);
158        let response_header = ResponseHeader {
159            correlation_id,
160            unknown_tagged_fields: vec![],
161        };
162
163        // 1. total size
164        let size = self.calculate_size(api_version)
165            + response_header.calculate_size(response_header_version);
166        Int32.encode(buf, size as i32)?;
167
168        // 2. response header
169        response_header.write(buf, response_header_version)?;
170
171        // 3. response body
172        self.do_encode(buf, api_version)
173    }
174
175    fn calculate_size(&self, version: i16) -> usize {
176        match self {
177            Response::ApiVersionsResponse(resp) => resp.calculate_size(version),
178            Response::CreateTopicsResponse(resp) => resp.calculate_size(version),
179            Response::FindCoordinatorResponse(resp) => resp.calculate_size(version),
180            Response::FetchResponse(resp) => resp.calculate_size(version),
181            Response::HeartbeatResponse(resp) => resp.calculate_size(version),
182            Response::InitProducerIdResponse(resp) => resp.calculate_size(version),
183            Response::JoinGroupResponse(resp) => resp.calculate_size(version),
184            Response::MetadataResponse(resp) => resp.calculate_size(version),
185            Response::OffsetFetchResponse(resp) => resp.calculate_size(version),
186            Response::ProduceResponse(resp) => resp.calculate_size(version),
187            Response::SyncGroupResponse(resp) => resp.calculate_size(version),
188        }
189    }
190
191    fn do_encode<B: WriteBytesExt>(&self, buf: &mut B, version: i16) -> IoResult<()> {
192        match self {
193            Response::ApiVersionsResponse(resp) => resp.write(buf, version),
194            Response::CreateTopicsResponse(resp) => resp.write(buf, version),
195            Response::FindCoordinatorResponse(resp) => resp.write(buf, version),
196            Response::FetchResponse(resp) => resp.write(buf, version),
197            Response::HeartbeatResponse(resp) => resp.write(buf, version),
198            Response::InitProducerIdResponse(resp) => resp.write(buf, version),
199            Response::JoinGroupResponse(resp) => resp.write(buf, version),
200            Response::MetadataResponse(resp) => resp.write(buf, version),
201            Response::OffsetFetchResponse(resp) => resp.write(buf, version),
202            Response::ProduceResponse(resp) => resp.write(buf, version),
203            Response::SyncGroupResponse(resp) => resp.write(buf, version),
204        }
205    }
206}