mockforge_kafka/
protocol.rs

1//! Kafka protocol handling
2//!
3//! This module contains the low-level Kafka protocol implementation,
4//! including request/response parsing and wire protocol handling.
5
6use std::collections::HashMap;
7
8/// Kafka protocol handler
9#[derive(Debug)]
10pub struct KafkaProtocolHandler {
11    api_versions: HashMap<i16, ApiVersion>,
12}
13
14impl KafkaProtocolHandler {
15    /// Create a new protocol handler
16    pub fn new() -> Self {
17        let mut api_versions = HashMap::new();
18        // Add supported API versions
19        api_versions.insert(
20            0,
21            ApiVersion {
22                min_version: 0,
23                max_version: 12,
24            },
25        ); // Produce
26        api_versions.insert(
27            1,
28            ApiVersion {
29                min_version: 0,
30                max_version: 16,
31            },
32        ); // Fetch
33        api_versions.insert(
34            3,
35            ApiVersion {
36                min_version: 0,
37                max_version: 12,
38            },
39        ); // Metadata
40        api_versions.insert(
41            9,
42            ApiVersion {
43                min_version: 0,
44                max_version: 5,
45            },
46        ); // ListGroups
47        api_versions.insert(
48            15,
49            ApiVersion {
50                min_version: 0,
51                max_version: 9,
52            },
53        ); // DescribeGroups
54        api_versions.insert(
55            16,
56            ApiVersion {
57                min_version: 0,
58                max_version: 9,
59            },
60        ); // DescribeGroups (alternative)
61        api_versions.insert(
62            18,
63            ApiVersion {
64                min_version: 0,
65                max_version: 4,
66            },
67        ); // ApiVersions
68        api_versions.insert(
69            19,
70            ApiVersion {
71                min_version: 0,
72                max_version: 7,
73            },
74        ); // CreateTopics
75        api_versions.insert(
76            20,
77            ApiVersion {
78                min_version: 0,
79                max_version: 6,
80            },
81        ); // DeleteTopics
82        api_versions.insert(
83            32,
84            ApiVersion {
85                min_version: 0,
86                max_version: 4,
87            },
88        ); // DescribeConfigs
89        api_versions.insert(
90            49,
91            ApiVersion {
92                min_version: 0,
93                max_version: 4,
94            },
95        ); // DescribeConfigs (alternative)
96
97        Self { api_versions }
98    }
99}
100
101impl Default for KafkaProtocolHandler {
102    fn default() -> Self {
103        Self::new()
104    }
105}
106
107impl KafkaProtocolHandler {
108    /// Parse a Kafka request from bytes
109    pub fn parse_request(&self, data: &[u8]) -> Result<KafkaRequest> {
110        // Parse Kafka protocol header
111        if data.len() < 12 {
112            return Err(anyhow::anyhow!("Message too short for header"));
113        }
114
115        // Extract API key from bytes 4-5 (big-endian i16)
116        let api_key = ((data[4] as i16) << 8) | (data[5] as i16);
117
118        // Extract API version from bytes 6-7 (big-endian i16)
119        let api_version = ((data[6] as i16) << 8) | (data[7] as i16);
120
121        // Extract correlation ID from bytes 8-11 (big-endian i32)
122        let correlation_id = ((data[8] as i32) << 24)
123            | ((data[9] as i32) << 16)
124            | ((data[10] as i32) << 8)
125            | (data[11] as i32);
126
127        // Parse client ID length from bytes 12-13 (big-endian i16)
128        if data.len() < 14 {
129            return Err(anyhow::anyhow!("Message too short for client ID length"));
130        }
131        let client_id_len = ((data[12] as i16) << 8) | (data[13] as i16);
132
133        // Parse client ID
134        let client_id_start = 14;
135        let client_id_end = client_id_start + (client_id_len as usize);
136        if data.len() < client_id_end {
137            return Err(anyhow::anyhow!("Message too short for client ID"));
138        }
139        let client_id = if client_id_len > 0 {
140            String::from_utf8(data[client_id_start..client_id_end].to_vec())
141                .map_err(|e| anyhow::anyhow!("Invalid client ID encoding: {}", e))?
142        } else {
143            String::new()
144        };
145
146        let request_type = match api_key {
147            0 => KafkaRequestType::Produce,
148            1 => KafkaRequestType::Fetch,
149            3 => KafkaRequestType::Metadata,
150            9 => KafkaRequestType::ListGroups,
151            15 => KafkaRequestType::DescribeGroups,
152            18 => KafkaRequestType::ApiVersions,
153            19 => KafkaRequestType::CreateTopics,
154            20 => KafkaRequestType::DeleteTopics,
155            32 => KafkaRequestType::DescribeConfigs,
156            _ => KafkaRequestType::ApiVersions, // Default to ApiVersions for unsupported APIs
157        };
158
159        Ok(KafkaRequest {
160            api_key,
161            api_version,
162            correlation_id,
163            client_id,
164            request_type,
165        })
166    }
167
168    /// Serialize a Kafka response to bytes
169    pub fn serialize_response(
170        &self,
171        response: &KafkaResponse,
172        correlation_id: i32,
173    ) -> Result<Vec<u8>> {
174        // Basic response serialization - full protocol serialization not yet implemented
175        match response {
176            KafkaResponse::ApiVersions => {
177                // Minimal ApiVersions response
178                let mut data = Vec::new();
179                // Correlation ID
180                data.extend_from_slice(&correlation_id.to_be_bytes());
181                // Error code (0 = success)
182                data.extend_from_slice(&0i16.to_be_bytes());
183                // Empty API keys array for now
184                data.extend_from_slice(&0i32.to_be_bytes());
185                Ok(data)
186            }
187            KafkaResponse::CreateTopics => {
188                // Minimal CreateTopics response
189                let mut data = Vec::new();
190                data.extend_from_slice(&correlation_id.to_be_bytes());
191                data.extend_from_slice(&0i16.to_be_bytes()); // Error code
192                data.extend_from_slice(&1i32.to_be_bytes()); // Number of topics
193                                                             // Topic name (length + bytes)
194                let topic_name = b"default-topic";
195                data.extend_from_slice(&(topic_name.len() as i16).to_be_bytes());
196                data.extend_from_slice(topic_name);
197                data.extend_from_slice(&0i16.to_be_bytes()); // Error code for topic
198                Ok(data)
199            }
200            _ => {
201                // Minimal response for other types
202                let mut data = Vec::new();
203                data.extend_from_slice(&correlation_id.to_be_bytes());
204                data.extend_from_slice(&0i16.to_be_bytes()); // Error code
205                Ok(data)
206            }
207        }
208    }
209
210    /// Check if API version is supported
211    pub fn is_api_version_supported(&self, api_key: i16, version: i16) -> bool {
212        if let Some(api_version) = self.api_versions.get(&api_key) {
213            version >= api_version.min_version && version <= api_version.max_version
214        } else {
215            false
216        }
217    }
218}
219
220/// Represents a parsed Kafka request with header information
221#[derive(Debug)]
222pub struct KafkaRequest {
223    pub api_key: i16,
224    pub api_version: i16,
225    pub correlation_id: i32,
226    pub client_id: String,
227    pub request_type: KafkaRequestType,
228}
229
230/// Kafka request types
231#[derive(Debug)]
232pub enum KafkaRequestType {
233    Metadata,
234    Produce,
235    Fetch,
236    ListGroups,
237    DescribeGroups,
238    ApiVersions,
239    CreateTopics,
240    DeleteTopics,
241    DescribeConfigs,
242}
243
244/// Represents a Kafka response
245#[derive(Debug)]
246pub enum KafkaResponse {
247    Metadata,
248    Produce,
249    Fetch,
250    ListGroups,
251    DescribeGroups,
252    ApiVersions,
253    CreateTopics,
254    DeleteTopics,
255    DescribeConfigs,
256}
257
258#[derive(Debug)]
259struct ApiVersion {
260    min_version: i16,
261    max_version: i16,
262}
263
264type Result<T> = std::result::Result<T, anyhow::Error>;