kafka_api/schemata/
mod.rs1use std::io::Cursor;
16
17use byteorder::BigEndian;
18use byteorder::ReadBytesExt;
19use byteorder::WriteBytesExt;
20
21use crate::codec::Decodable;
22use crate::codec::Encodable;
23use crate::codec::Encoder;
24use crate::codec::Int32;
25use crate::schemata::apikey::ApiMessageType;
26use crate::schemata::request_header::RequestHeader;
27use crate::schemata::response_header::ResponseHeader;
28use crate::IoResult;
29
30pub mod apikey;
31#[allow(dead_code)]
32pub mod error;
33
34pub mod api_versions_request;
35pub mod api_versions_response;
36pub mod create_topic_request;
37pub mod create_topic_response;
38pub mod fetch_request;
39pub mod fetch_response;
40pub mod find_coordinator_request;
41pub mod find_coordinator_response;
42pub mod heartbeat_request;
43pub mod heartbeat_response;
44pub mod init_producer_id_request;
45pub mod init_producer_id_response;
46pub mod join_group_request;
47pub mod join_group_response;
48pub mod metadata_request;
49pub mod metadata_response;
50pub mod offset_fetch_request;
51pub mod offset_fetch_response;
52pub mod produce_request;
53pub mod produce_response;
54pub mod request_header;
55pub mod response_header;
56pub mod sync_group_request;
57pub mod sync_group_response;
58
59#[derive(Debug)]
60pub enum Request {
61 ApiVersionsRequest(api_versions_request::ApiVersionsRequest),
62 CreateTopicRequest(create_topic_request::CreateTopicsRequest),
63 FetchRequest(fetch_request::FetchRequest),
64 FindCoordinatorRequest(find_coordinator_request::FindCoordinatorRequest),
65 HeartbeatRequest(heartbeat_request::HeartbeatRequest),
66 InitProducerIdRequest(init_producer_id_request::InitProducerIdRequest),
67 JoinGroupRequest(join_group_request::JoinGroupRequest),
68 MetadataRequest(metadata_request::MetadataRequest),
69 OffsetFetchRequest(offset_fetch_request::OffsetFetchRequest),
70 ProduceRequest(produce_request::ProduceRequest),
71 SyncGroupRequest(sync_group_request::SyncGroupRequest),
72}
73
74impl Request {
75 pub fn decode<T: AsRef<[u8]>>(buf: &mut Cursor<T>) -> IoResult<(RequestHeader, Request)> {
76 let header_version = {
77 let pos = buf.position();
78 let api_key = buf.read_i16::<BigEndian>()?;
79 let api_version = buf.read_i16::<BigEndian>()?;
80 buf.set_position(pos);
81 ApiMessageType::try_from(api_key)?.request_header_version(api_version)
82 };
83
84 let header = RequestHeader::read(buf, header_version)?;
85 let api_type = ApiMessageType::try_from(header.request_api_key)?;
86 let api_version = header.request_api_version;
87
88 let request = match api_type {
89 ApiMessageType::API_VERSIONS => {
90 api_versions_request::ApiVersionsRequest::read(buf, api_version)
91 .map(Request::ApiVersionsRequest)
92 }
93 ApiMessageType::CREATE_TOPICS => {
94 create_topic_request::CreateTopicsRequest::read(buf, api_version)
95 .map(Request::CreateTopicRequest)
96 }
97 ApiMessageType::FETCH => {
98 fetch_request::FetchRequest::read(buf, api_version).map(Request::FetchRequest)
99 }
100 ApiMessageType::FIND_COORDINATOR => {
101 find_coordinator_request::FindCoordinatorRequest::read(buf, api_version)
102 .map(Request::FindCoordinatorRequest)
103 }
104 ApiMessageType::INIT_PRODUCER_ID => {
105 init_producer_id_request::InitProducerIdRequest::read(buf, api_version)
106 .map(Request::InitProducerIdRequest)
107 }
108 ApiMessageType::JOIN_GROUP => {
109 join_group_request::JoinGroupRequest::read(buf, api_version)
110 .map(Request::JoinGroupRequest)
111 }
112 ApiMessageType::HEARTBEAT => {
113 heartbeat_request::HeartbeatRequest::read(buf, api_version)
114 .map(Request::HeartbeatRequest)
115 }
116 ApiMessageType::METADATA => metadata_request::MetadataRequest::read(buf, api_version)
117 .map(Request::MetadataRequest),
118 ApiMessageType::OFFSET_FETCH => {
119 offset_fetch_request::OffsetFetchRequest::read(buf, api_version)
120 .map(Request::OffsetFetchRequest)
121 }
122 ApiMessageType::PRODUCE => {
123 produce_request::ProduceRequest::read(buf, api_version).map(Request::ProduceRequest)
124 }
125 ApiMessageType::SYNC_GROUP => {
126 sync_group_request::SyncGroupRequest::read(buf, api_version)
127 .map(Request::SyncGroupRequest)
128 }
129 api_type => unreachable!("unknown api type {}", api_type),
130 }?;
131
132 Ok((header, request))
133 }
134}
135
136#[derive(Debug)]
137pub enum Response {
138 ApiVersionsResponse(api_versions_response::ApiVersionsResponse),
139 CreateTopicsResponse(create_topic_response::CreateTopicsResponse),
140 FindCoordinatorResponse(find_coordinator_response::FindCoordinatorResponse),
141 FetchResponse(fetch_response::FetchResponse),
142 HeartbeatResponse(heartbeat_response::HeartbeatResponse),
143 InitProducerIdResponse(init_producer_id_response::InitProducerIdResponse),
144 JoinGroupResponse(join_group_response::JoinGroupResponse),
145 MetadataResponse(metadata_response::MetadataResponse),
146 OffsetFetchResponse(offset_fetch_response::OffsetFetchResponse),
147 ProduceResponse(produce_response::ProduceResponse),
148 SyncGroupResponse(sync_group_response::SyncGroupResponse),
149}
150
151impl Response {
152 pub fn encode<B: WriteBytesExt>(&self, header: RequestHeader, buf: &mut B) -> IoResult<()> {
153 let api_type = ApiMessageType::try_from(header.request_api_key)?;
154 let api_version = header.request_api_version;
155 let correlation_id = header.correlation_id;
156
157 let response_header_version = api_type.response_header_version(api_version);
158 let response_header = ResponseHeader {
159 correlation_id,
160 unknown_tagged_fields: vec![],
161 };
162
163 let size = self.calculate_size(api_version)
165 + response_header.calculate_size(response_header_version);
166 Int32.encode(buf, size as i32)?;
167
168 response_header.write(buf, response_header_version)?;
170
171 self.do_encode(buf, api_version)
173 }
174
175 fn calculate_size(&self, version: i16) -> usize {
176 match self {
177 Response::ApiVersionsResponse(resp) => resp.calculate_size(version),
178 Response::CreateTopicsResponse(resp) => resp.calculate_size(version),
179 Response::FindCoordinatorResponse(resp) => resp.calculate_size(version),
180 Response::FetchResponse(resp) => resp.calculate_size(version),
181 Response::HeartbeatResponse(resp) => resp.calculate_size(version),
182 Response::InitProducerIdResponse(resp) => resp.calculate_size(version),
183 Response::JoinGroupResponse(resp) => resp.calculate_size(version),
184 Response::MetadataResponse(resp) => resp.calculate_size(version),
185 Response::OffsetFetchResponse(resp) => resp.calculate_size(version),
186 Response::ProduceResponse(resp) => resp.calculate_size(version),
187 Response::SyncGroupResponse(resp) => resp.calculate_size(version),
188 }
189 }
190
191 fn do_encode<B: WriteBytesExt>(&self, buf: &mut B, version: i16) -> IoResult<()> {
192 match self {
193 Response::ApiVersionsResponse(resp) => resp.write(buf, version),
194 Response::CreateTopicsResponse(resp) => resp.write(buf, version),
195 Response::FindCoordinatorResponse(resp) => resp.write(buf, version),
196 Response::FetchResponse(resp) => resp.write(buf, version),
197 Response::HeartbeatResponse(resp) => resp.write(buf, version),
198 Response::InitProducerIdResponse(resp) => resp.write(buf, version),
199 Response::JoinGroupResponse(resp) => resp.write(buf, version),
200 Response::MetadataResponse(resp) => resp.write(buf, version),
201 Response::OffsetFetchResponse(resp) => resp.write(buf, version),
202 Response::ProduceResponse(resp) => resp.write(buf, version),
203 Response::SyncGroupResponse(resp) => resp.write(buf, version),
204 }
205 }
206}