1use std::collections::HashMap;
7
8#[derive(Debug)]
10pub struct KafkaProtocolHandler {
11 api_versions: HashMap<i16, ApiVersion>,
12}
13
14impl KafkaProtocolHandler {
15 pub fn new() -> Self {
17 let mut api_versions = HashMap::new();
18 api_versions.insert(
20 0,
21 ApiVersion {
22 min_version: 0,
23 max_version: 12,
24 },
25 ); api_versions.insert(
27 1,
28 ApiVersion {
29 min_version: 0,
30 max_version: 16,
31 },
32 ); api_versions.insert(
34 3,
35 ApiVersion {
36 min_version: 0,
37 max_version: 12,
38 },
39 ); api_versions.insert(
41 9,
42 ApiVersion {
43 min_version: 0,
44 max_version: 5,
45 },
46 ); api_versions.insert(
48 15,
49 ApiVersion {
50 min_version: 0,
51 max_version: 9,
52 },
53 ); api_versions.insert(
55 16,
56 ApiVersion {
57 min_version: 0,
58 max_version: 9,
59 },
60 ); api_versions.insert(
62 18,
63 ApiVersion {
64 min_version: 0,
65 max_version: 4,
66 },
67 ); api_versions.insert(
69 19,
70 ApiVersion {
71 min_version: 0,
72 max_version: 7,
73 },
74 ); api_versions.insert(
76 20,
77 ApiVersion {
78 min_version: 0,
79 max_version: 6,
80 },
81 ); api_versions.insert(
83 32,
84 ApiVersion {
85 min_version: 0,
86 max_version: 4,
87 },
88 ); api_versions.insert(
90 49,
91 ApiVersion {
92 min_version: 0,
93 max_version: 4,
94 },
95 ); Self { api_versions }
98 }
99}
100
101impl Default for KafkaProtocolHandler {
102 fn default() -> Self {
103 Self::new()
104 }
105}
106
107impl KafkaProtocolHandler {
108 pub fn parse_request(&self, data: &[u8]) -> Result<KafkaRequest> {
110 if data.len() < 12 {
112 return Err(anyhow::anyhow!("Message too short for header"));
113 }
114
115 let api_key = ((data[4] as i16) << 8) | (data[5] as i16);
117
118 let api_version = ((data[6] as i16) << 8) | (data[7] as i16);
120
121 let correlation_id = ((data[8] as i32) << 24)
123 | ((data[9] as i32) << 16)
124 | ((data[10] as i32) << 8)
125 | (data[11] as i32);
126
127 if data.len() < 14 {
129 return Err(anyhow::anyhow!("Message too short for client ID length"));
130 }
131 let client_id_len = ((data[12] as i16) << 8) | (data[13] as i16);
132
133 let client_id_start = 14;
135 let client_id_end = client_id_start + (client_id_len as usize);
136 if data.len() < client_id_end {
137 return Err(anyhow::anyhow!("Message too short for client ID"));
138 }
139 let client_id = if client_id_len > 0 {
140 String::from_utf8(data[client_id_start..client_id_end].to_vec())
141 .map_err(|e| anyhow::anyhow!("Invalid client ID encoding: {}", e))?
142 } else {
143 String::new()
144 };
145
146 let request_type = match api_key {
147 0 => KafkaRequestType::Produce,
148 1 => KafkaRequestType::Fetch,
149 3 => KafkaRequestType::Metadata,
150 9 => KafkaRequestType::ListGroups,
151 15 => KafkaRequestType::DescribeGroups,
152 18 => KafkaRequestType::ApiVersions,
153 19 => KafkaRequestType::CreateTopics,
154 20 => KafkaRequestType::DeleteTopics,
155 32 => KafkaRequestType::DescribeConfigs,
156 _ => KafkaRequestType::ApiVersions, };
158
159 Ok(KafkaRequest {
160 api_key,
161 api_version,
162 correlation_id,
163 client_id,
164 request_type,
165 })
166 }
167
168 pub fn serialize_response(
170 &self,
171 response: &KafkaResponse,
172 correlation_id: i32,
173 ) -> Result<Vec<u8>> {
174 match response {
176 KafkaResponse::ApiVersions => {
177 let mut data = Vec::new();
179 data.extend_from_slice(&correlation_id.to_be_bytes());
181 data.extend_from_slice(&0i16.to_be_bytes());
183 data.extend_from_slice(&0i32.to_be_bytes());
185 Ok(data)
186 }
187 KafkaResponse::CreateTopics => {
188 let mut data = Vec::new();
190 data.extend_from_slice(&correlation_id.to_be_bytes());
191 data.extend_from_slice(&0i16.to_be_bytes()); data.extend_from_slice(&1i32.to_be_bytes()); let topic_name = b"default-topic";
195 data.extend_from_slice(&(topic_name.len() as i16).to_be_bytes());
196 data.extend_from_slice(topic_name);
197 data.extend_from_slice(&0i16.to_be_bytes()); Ok(data)
199 }
200 _ => {
201 let mut data = Vec::new();
203 data.extend_from_slice(&correlation_id.to_be_bytes());
204 data.extend_from_slice(&0i16.to_be_bytes()); Ok(data)
206 }
207 }
208 }
209
210 pub fn is_api_version_supported(&self, api_key: i16, version: i16) -> bool {
212 if let Some(api_version) = self.api_versions.get(&api_key) {
213 version >= api_version.min_version && version <= api_version.max_version
214 } else {
215 false
216 }
217 }
218}
219
220#[derive(Debug)]
222pub struct KafkaRequest {
223 pub api_key: i16,
224 pub api_version: i16,
225 pub correlation_id: i32,
226 pub client_id: String,
227 pub request_type: KafkaRequestType,
228}
229
/// Kinds of Kafka request the handler distinguishes.
///
/// The enum is a plain tag with no payload, so it is `Copy` and comparable
/// with `==`, which lets callers write
/// `request.request_type == KafkaRequestType::Fetch`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum KafkaRequestType {
    Metadata,
    Produce,
    Fetch,
    ListGroups,
    DescribeGroups,
    ApiVersions,
    CreateTopics,
    DeleteTopics,
    DescribeConfigs,
}
243
/// Kinds of Kafka response the handler can encode.
///
/// The enum is a plain tag with no payload, so it is `Copy` and comparable
/// with `==`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum KafkaResponse {
    Metadata,
    Produce,
    Fetch,
    ListGroups,
    DescribeGroups,
    ApiVersions,
    CreateTopics,
    DeleteTopics,
    DescribeConfigs,
}
257
/// Inclusive range of protocol versions accepted for one API key.
#[derive(Debug)]
struct ApiVersion {
    /// Lowest accepted version (inclusive).
    min_version: i16,
    /// Highest accepted version (inclusive).
    max_version: i16,
}
263
264type Result<T> = std::result::Result<T, anyhow::Error>;