mockforge_kafka/
protocol.rs

1//! Kafka protocol handling
2//!
3//! This module contains the low-level Kafka protocol implementation,
4//! including request/response parsing and wire protocol handling.
5
6use std::collections::HashMap;
7
8/// Kafka protocol handler
9#[derive(Debug)]
10pub struct KafkaProtocolHandler {
11    api_versions: HashMap<i16, ApiVersion>,
12}
13
14impl KafkaProtocolHandler {
15    /// Create a new protocol handler
16    pub fn new() -> Self {
17        let mut api_versions = HashMap::new();
18        // Add supported API versions
19        api_versions.insert(
20            0,
21            ApiVersion {
22                min_version: 0,
23                max_version: 12,
24            },
25        ); // Produce
26        api_versions.insert(
27            1,
28            ApiVersion {
29                min_version: 0,
30                max_version: 16,
31            },
32        ); // Fetch
33        api_versions.insert(
34            3,
35            ApiVersion {
36                min_version: 0,
37                max_version: 12,
38            },
39        ); // Metadata
40        api_versions.insert(
41            9,
42            ApiVersion {
43                min_version: 0,
44                max_version: 5,
45            },
46        ); // ListGroups
47        api_versions.insert(
48            15,
49            ApiVersion {
50                min_version: 0,
51                max_version: 9,
52            },
53        ); // DescribeGroups
54        api_versions.insert(
55            16,
56            ApiVersion {
57                min_version: 0,
58                max_version: 9,
59            },
60        ); // DescribeGroups (alternative)
61        api_versions.insert(
62            18,
63            ApiVersion {
64                min_version: 0,
65                max_version: 4,
66            },
67        ); // ApiVersions
68        api_versions.insert(
69            19,
70            ApiVersion {
71                min_version: 0,
72                max_version: 7,
73            },
74        ); // CreateTopics
75        api_versions.insert(
76            20,
77            ApiVersion {
78                min_version: 0,
79                max_version: 6,
80            },
81        ); // DeleteTopics
82        api_versions.insert(
83            32,
84            ApiVersion {
85                min_version: 0,
86                max_version: 4,
87            },
88        ); // DescribeConfigs
89        api_versions.insert(
90            49,
91            ApiVersion {
92                min_version: 0,
93                max_version: 4,
94            },
95        ); // DescribeConfigs (alternative)
96
97        Self { api_versions }
98    }
99}
100
101impl Default for KafkaProtocolHandler {
102    fn default() -> Self {
103        Self::new()
104    }
105}
106
107impl KafkaProtocolHandler {
108    /// Parse a Kafka request from bytes
109    pub fn parse_request(&self, data: &[u8]) -> Result<KafkaRequest> {
110        // Parse Kafka protocol header
111        if data.len() < 12 {
112            return Err(anyhow::anyhow!("Message too short for header"));
113        }
114
115        // Extract API key from bytes 4-5 (big-endian i16)
116        let api_key = ((data[4] as i16) << 8) | (data[5] as i16);
117
118        // Extract API version from bytes 6-7 (big-endian i16)
119        let api_version = ((data[6] as i16) << 8) | (data[7] as i16);
120
121        // Extract correlation ID from bytes 8-11 (big-endian i32)
122        let correlation_id = ((data[8] as i32) << 24)
123            | ((data[9] as i32) << 16)
124            | ((data[10] as i32) << 8)
125            | (data[11] as i32);
126
127        // Parse client ID length from bytes 12-13 (big-endian i16)
128        if data.len() < 14 {
129            return Err(anyhow::anyhow!("Message too short for client ID length"));
130        }
131        let client_id_len = ((data[12] as i16) << 8) | (data[13] as i16);
132
133        // Parse client ID
134        let client_id_start = 14;
135        let client_id_end = client_id_start + (client_id_len as usize);
136        if data.len() < client_id_end {
137            return Err(anyhow::anyhow!("Message too short for client ID"));
138        }
139        let client_id = if client_id_len > 0 {
140            String::from_utf8(data[client_id_start..client_id_end].to_vec())
141                .map_err(|e| anyhow::anyhow!("Invalid client ID encoding: {}", e))?
142        } else {
143            String::new()
144        };
145
146        let request_type = match api_key {
147            0 => KafkaRequestType::Produce,
148            1 => KafkaRequestType::Fetch,
149            3 => KafkaRequestType::Metadata,
150            9 => KafkaRequestType::ListGroups,
151            15 => KafkaRequestType::DescribeGroups,
152            18 => KafkaRequestType::ApiVersions,
153            19 => KafkaRequestType::CreateTopics,
154            20 => KafkaRequestType::DeleteTopics,
155            32 => KafkaRequestType::DescribeConfigs,
156            _ => KafkaRequestType::ApiVersions, // Default to ApiVersions for unsupported APIs
157        };
158
159        Ok(KafkaRequest {
160            api_key,
161            api_version,
162            correlation_id,
163            client_id,
164            request_type,
165        })
166    }
167
168    /// Serialize a Kafka response to bytes
169    pub fn serialize_response(
170        &self,
171        response: &KafkaResponse,
172        correlation_id: i32,
173    ) -> Result<Vec<u8>> {
174        // Basic response serialization - full protocol serialization not yet implemented
175        match response {
176            KafkaResponse::ApiVersions => {
177                // Minimal ApiVersions response
178                let mut data = Vec::new();
179                // Correlation ID
180                data.extend_from_slice(&correlation_id.to_be_bytes());
181                // Error code (0 = success)
182                data.extend_from_slice(&0i16.to_be_bytes());
183                // Empty API keys array for now
184                data.extend_from_slice(&0i32.to_be_bytes());
185                Ok(data)
186            }
187            KafkaResponse::CreateTopics => {
188                // Minimal CreateTopics response
189                let mut data = Vec::new();
190                data.extend_from_slice(&correlation_id.to_be_bytes());
191                data.extend_from_slice(&0i16.to_be_bytes()); // Error code
192                data.extend_from_slice(&1i32.to_be_bytes()); // Number of topics
193                                                             // Topic name (length + bytes)
194                let topic_name = b"default-topic";
195                data.extend_from_slice(&(topic_name.len() as i16).to_be_bytes());
196                data.extend_from_slice(topic_name);
197                data.extend_from_slice(&0i16.to_be_bytes()); // Error code for topic
198                Ok(data)
199            }
200            _ => {
201                // Minimal response for other types
202                let mut data = Vec::new();
203                data.extend_from_slice(&correlation_id.to_be_bytes());
204                data.extend_from_slice(&0i16.to_be_bytes()); // Error code
205                Ok(data)
206            }
207        }
208    }
209
210    /// Check if API version is supported
211    pub fn is_api_version_supported(&self, api_key: i16, version: i16) -> bool {
212        if let Some(api_version) = self.api_versions.get(&api_key) {
213            version >= api_version.min_version && version <= api_version.max_version
214        } else {
215            false
216        }
217    }
218}
219
220/// Represents a parsed Kafka request with header information
221#[derive(Debug)]
222pub struct KafkaRequest {
223    pub api_key: i16,
224    pub api_version: i16,
225    pub correlation_id: i32,
226    pub client_id: String,
227    pub request_type: KafkaRequestType,
228}
229
230/// Kafka request types
231#[derive(Debug)]
232pub enum KafkaRequestType {
233    Metadata,
234    Produce,
235    Fetch,
236    ListGroups,
237    DescribeGroups,
238    ApiVersions,
239    CreateTopics,
240    DeleteTopics,
241    DescribeConfigs,
242}
243
244/// Represents a Kafka response
245#[derive(Debug)]
246pub enum KafkaResponse {
247    Metadata,
248    Produce,
249    Fetch,
250    ListGroups,
251    DescribeGroups,
252    ApiVersions,
253    CreateTopics,
254    DeleteTopics,
255    DescribeConfigs,
256}
257
258#[derive(Debug)]
259struct ApiVersion {
260    min_version: i16,
261    max_version: i16,
262}
263
264type Result<T> = std::result::Result<T, anyhow::Error>;
265
266#[cfg(test)]
267mod tests {
268    use super::*;
269
270    // ==================== KafkaProtocolHandler Tests ====================
271
272    #[test]
273    fn test_protocol_handler_new() {
274        let handler = KafkaProtocolHandler::new();
275        assert!(handler.api_versions.len() > 0);
276        assert!(handler.api_versions.contains_key(&0)); // Produce
277        assert!(handler.api_versions.contains_key(&1)); // Fetch
278        assert!(handler.api_versions.contains_key(&18)); // ApiVersions
279    }
280
281    #[test]
282    fn test_protocol_handler_default() {
283        let handler = KafkaProtocolHandler::default();
284        assert!(handler.api_versions.len() > 0);
285    }
286
287    #[test]
288    fn test_is_api_version_supported_produce() {
289        let handler = KafkaProtocolHandler::new();
290        // Produce API (key 0) supports versions 0-12
291        assert!(handler.is_api_version_supported(0, 0));
292        assert!(handler.is_api_version_supported(0, 12));
293        assert!(!handler.is_api_version_supported(0, 13));
294        assert!(!handler.is_api_version_supported(0, -1));
295    }
296
297    #[test]
298    fn test_is_api_version_supported_fetch() {
299        let handler = KafkaProtocolHandler::new();
300        // Fetch API (key 1) supports versions 0-16
301        assert!(handler.is_api_version_supported(1, 0));
302        assert!(handler.is_api_version_supported(1, 16));
303        assert!(!handler.is_api_version_supported(1, 17));
304    }
305
306    #[test]
307    fn test_is_api_version_supported_metadata() {
308        let handler = KafkaProtocolHandler::new();
309        // Metadata API (key 3) supports versions 0-12
310        assert!(handler.is_api_version_supported(3, 0));
311        assert!(handler.is_api_version_supported(3, 12));
312        assert!(!handler.is_api_version_supported(3, 13));
313    }
314
315    #[test]
316    fn test_is_api_version_supported_api_versions() {
317        let handler = KafkaProtocolHandler::new();
318        // ApiVersions API (key 18) supports versions 0-4
319        assert!(handler.is_api_version_supported(18, 0));
320        assert!(handler.is_api_version_supported(18, 4));
321        assert!(!handler.is_api_version_supported(18, 5));
322    }
323
324    #[test]
325    fn test_is_api_version_unsupported_api_key() {
326        let handler = KafkaProtocolHandler::new();
327        // API key 999 doesn't exist
328        assert!(!handler.is_api_version_supported(999, 0));
329        assert!(!handler.is_api_version_supported(-1, 0));
330    }
331
332    // ==================== parse_request Tests ====================
333
334    #[test]
335    fn test_parse_request_too_short() {
336        let handler = KafkaProtocolHandler::new();
337        let data = vec![0u8; 5]; // Too short for header
338        let result = handler.parse_request(&data);
339        assert!(result.is_err());
340    }
341
342    #[test]
343    fn test_parse_request_minimal_header() {
344        let handler = KafkaProtocolHandler::new();
345        // Create minimal valid request
346        let mut data = vec![0u8; 14];
347        // API key (2 bytes): 18 (ApiVersions)
348        data[4] = 0;
349        data[5] = 18;
350        // API version (2 bytes): 0
351        data[6] = 0;
352        data[7] = 0;
353        // Correlation ID (4 bytes): 1
354        data[8] = 0;
355        data[9] = 0;
356        data[10] = 0;
357        data[11] = 1;
358        // Client ID length (2 bytes): 0 (empty string)
359        data[12] = 0;
360        data[13] = 0;
361
362        let result = handler.parse_request(&data);
363        assert!(result.is_ok());
364        let request = result.unwrap();
365        assert_eq!(request.api_key, 18);
366        assert_eq!(request.api_version, 0);
367        assert_eq!(request.correlation_id, 1);
368        assert_eq!(request.client_id, "");
369    }
370
371    #[test]
372    fn test_parse_request_with_client_id() {
373        let handler = KafkaProtocolHandler::new();
374        let client_id = b"test-client";
375        let client_id_len = client_id.len() as i16;
376
377        let mut data = vec![0u8; 14 + client_id.len()];
378        // API key: 0 (Produce)
379        data[4] = 0;
380        data[5] = 0;
381        // API version: 7
382        data[6] = 0;
383        data[7] = 7;
384        // Correlation ID: 42
385        data[8] = 0;
386        data[9] = 0;
387        data[10] = 0;
388        data[11] = 42;
389        // Client ID length
390        data[12] = (client_id_len >> 8) as u8;
391        data[13] = (client_id_len & 0xFF) as u8;
392        // Client ID
393        data[14..].copy_from_slice(client_id);
394
395        let result = handler.parse_request(&data);
396        assert!(result.is_ok());
397        let request = result.unwrap();
398        assert_eq!(request.api_key, 0);
399        assert_eq!(request.api_version, 7);
400        assert_eq!(request.correlation_id, 42);
401        assert_eq!(request.client_id, "test-client");
402        assert!(matches!(request.request_type, KafkaRequestType::Produce));
403    }
404
405    #[test]
406    fn test_parse_request_produce() {
407        let handler = KafkaProtocolHandler::new();
408        let mut data = vec![0u8; 14];
409        data[4] = 0; // API key: 0 (Produce)
410        data[5] = 0;
411        data[12] = 0; // Client ID length: 0
412        data[13] = 0;
413
414        let result = handler.parse_request(&data).unwrap();
415        assert!(matches!(result.request_type, KafkaRequestType::Produce));
416    }
417
418    #[test]
419    fn test_parse_request_fetch() {
420        let handler = KafkaProtocolHandler::new();
421        let mut data = vec![0u8; 14];
422        data[4] = 0; // API key: 1 (Fetch)
423        data[5] = 1;
424        data[12] = 0;
425        data[13] = 0;
426
427        let result = handler.parse_request(&data).unwrap();
428        assert!(matches!(result.request_type, KafkaRequestType::Fetch));
429    }
430
431    #[test]
432    fn test_parse_request_metadata() {
433        let handler = KafkaProtocolHandler::new();
434        let mut data = vec![0u8; 14];
435        data[4] = 0; // API key: 3 (Metadata)
436        data[5] = 3;
437        data[12] = 0;
438        data[13] = 0;
439
440        let result = handler.parse_request(&data).unwrap();
441        assert!(matches!(result.request_type, KafkaRequestType::Metadata));
442    }
443
444    #[test]
445    fn test_parse_request_list_groups() {
446        let handler = KafkaProtocolHandler::new();
447        let mut data = vec![0u8; 14];
448        data[4] = 0; // API key: 9 (ListGroups)
449        data[5] = 9;
450        data[12] = 0;
451        data[13] = 0;
452
453        let result = handler.parse_request(&data).unwrap();
454        assert!(matches!(result.request_type, KafkaRequestType::ListGroups));
455    }
456
457    #[test]
458    fn test_parse_request_describe_groups() {
459        let handler = KafkaProtocolHandler::new();
460        let mut data = vec![0u8; 14];
461        data[4] = 0; // API key: 15 (DescribeGroups)
462        data[5] = 15;
463        data[12] = 0;
464        data[13] = 0;
465
466        let result = handler.parse_request(&data).unwrap();
467        assert!(matches!(result.request_type, KafkaRequestType::DescribeGroups));
468    }
469
470    #[test]
471    fn test_parse_request_api_versions() {
472        let handler = KafkaProtocolHandler::new();
473        let mut data = vec![0u8; 14];
474        data[4] = 0; // API key: 18 (ApiVersions)
475        data[5] = 18;
476        data[12] = 0;
477        data[13] = 0;
478
479        let result = handler.parse_request(&data).unwrap();
480        assert!(matches!(result.request_type, KafkaRequestType::ApiVersions));
481    }
482
483    #[test]
484    fn test_parse_request_create_topics() {
485        let handler = KafkaProtocolHandler::new();
486        let mut data = vec![0u8; 14];
487        data[4] = 0; // API key: 19 (CreateTopics)
488        data[5] = 19;
489        data[12] = 0;
490        data[13] = 0;
491
492        let result = handler.parse_request(&data).unwrap();
493        assert!(matches!(result.request_type, KafkaRequestType::CreateTopics));
494    }
495
496    #[test]
497    fn test_parse_request_delete_topics() {
498        let handler = KafkaProtocolHandler::new();
499        let mut data = vec![0u8; 14];
500        data[4] = 0; // API key: 20 (DeleteTopics)
501        data[5] = 20;
502        data[12] = 0;
503        data[13] = 0;
504
505        let result = handler.parse_request(&data).unwrap();
506        assert!(matches!(result.request_type, KafkaRequestType::DeleteTopics));
507    }
508
509    #[test]
510    fn test_parse_request_describe_configs() {
511        let handler = KafkaProtocolHandler::new();
512        let mut data = vec![0u8; 14];
513        data[4] = 0; // API key: 32 (DescribeConfigs)
514        data[5] = 32;
515        data[12] = 0;
516        data[13] = 0;
517
518        let result = handler.parse_request(&data).unwrap();
519        assert!(matches!(result.request_type, KafkaRequestType::DescribeConfigs));
520    }
521
522    #[test]
523    fn test_parse_request_unsupported_api_defaults_to_api_versions() {
524        let handler = KafkaProtocolHandler::new();
525        let mut data = vec![0u8; 14];
526        data[4] = 0; // API key: 99 (unsupported)
527        data[5] = 99;
528        data[12] = 0;
529        data[13] = 0;
530
531        let result = handler.parse_request(&data).unwrap();
532        // Unsupported APIs default to ApiVersions
533        assert!(matches!(result.request_type, KafkaRequestType::ApiVersions));
534    }
535
536    #[test]
537    fn test_parse_request_invalid_client_id_length() {
538        let handler = KafkaProtocolHandler::new();
539        let mut data = vec![0u8; 14];
540        data[4] = 0;
541        data[5] = 18;
542        // Client ID length: 100 (but not enough data)
543        data[12] = 0;
544        data[13] = 100;
545
546        let result = handler.parse_request(&data);
547        assert!(result.is_err());
548    }
549
550    #[test]
551    fn test_parse_request_missing_client_id_length() {
552        let handler = KafkaProtocolHandler::new();
553        let data = vec![0u8; 12]; // Too short to contain client ID length
554
555        let result = handler.parse_request(&data);
556        assert!(result.is_err());
557    }
558
559    #[test]
560    fn test_parse_request_max_values() {
561        let handler = KafkaProtocolHandler::new();
562        let mut data = vec![0u8; 14];
563        // Max API key value
564        data[4] = 0x7F;
565        data[5] = 0xFF;
566        // Max API version
567        data[6] = 0x7F;
568        data[7] = 0xFF;
569        // Max correlation ID
570        data[8] = 0x7F;
571        data[9] = 0xFF;
572        data[10] = 0xFF;
573        data[11] = 0xFF;
574        // Empty client ID
575        data[12] = 0;
576        data[13] = 0;
577
578        let result = handler.parse_request(&data);
579        assert!(result.is_ok());
580        let request = result.unwrap();
581        assert_eq!(request.api_key, 0x7FFF);
582        assert_eq!(request.api_version, 0x7FFF);
583        assert_eq!(request.correlation_id, 0x7FFFFFFF);
584    }
585
586    // ==================== serialize_response Tests ====================
587
588    #[test]
589    fn test_serialize_response_api_versions() {
590        let handler = KafkaProtocolHandler::new();
591        let response = KafkaResponse::ApiVersions;
592        let correlation_id = 12345;
593
594        let result = handler.serialize_response(&response, correlation_id);
595        assert!(result.is_ok());
596
597        let data = result.unwrap();
598        assert!(data.len() > 0);
599
600        // Check correlation ID (first 4 bytes)
601        let corr_id = i32::from_be_bytes([data[0], data[1], data[2], data[3]]);
602        assert_eq!(corr_id, correlation_id);
603
604        // Check error code (next 2 bytes)
605        let error_code = i16::from_be_bytes([data[4], data[5]]);
606        assert_eq!(error_code, 0); // No error
607    }
608
609    #[test]
610    fn test_serialize_response_create_topics() {
611        let handler = KafkaProtocolHandler::new();
612        let response = KafkaResponse::CreateTopics;
613        let correlation_id = 999;
614
615        let result = handler.serialize_response(&response, correlation_id);
616        assert!(result.is_ok());
617
618        let data = result.unwrap();
619        assert!(data.len() > 0);
620
621        // Check correlation ID
622        let corr_id = i32::from_be_bytes([data[0], data[1], data[2], data[3]]);
623        assert_eq!(corr_id, correlation_id);
624    }
625
626    #[test]
627    fn test_serialize_response_metadata() {
628        let handler = KafkaProtocolHandler::new();
629        let response = KafkaResponse::Metadata;
630        let correlation_id = 1;
631
632        let result = handler.serialize_response(&response, correlation_id);
633        assert!(result.is_ok());
634
635        let data = result.unwrap();
636        assert!(data.len() >= 6); // correlation_id (4) + error_code (2)
637    }
638
639    #[test]
640    fn test_serialize_response_produce() {
641        let handler = KafkaProtocolHandler::new();
642        let response = KafkaResponse::Produce;
643        let correlation_id = 42;
644
645        let result = handler.serialize_response(&response, correlation_id);
646        assert!(result.is_ok());
647    }
648
649    #[test]
650    fn test_serialize_response_fetch() {
651        let handler = KafkaProtocolHandler::new();
652        let response = KafkaResponse::Fetch;
653        let correlation_id = 100;
654
655        let result = handler.serialize_response(&response, correlation_id);
656        assert!(result.is_ok());
657    }
658
659    #[test]
660    fn test_serialize_response_list_groups() {
661        let handler = KafkaProtocolHandler::new();
662        let response = KafkaResponse::ListGroups;
663        let correlation_id = 200;
664
665        let result = handler.serialize_response(&response, correlation_id);
666        assert!(result.is_ok());
667    }
668
669    #[test]
670    fn test_serialize_response_describe_groups() {
671        let handler = KafkaProtocolHandler::new();
672        let response = KafkaResponse::DescribeGroups;
673        let correlation_id = 300;
674
675        let result = handler.serialize_response(&response, correlation_id);
676        assert!(result.is_ok());
677    }
678
679    #[test]
680    fn test_serialize_response_delete_topics() {
681        let handler = KafkaProtocolHandler::new();
682        let response = KafkaResponse::DeleteTopics;
683        let correlation_id = 400;
684
685        let result = handler.serialize_response(&response, correlation_id);
686        assert!(result.is_ok());
687    }
688
689    #[test]
690    fn test_serialize_response_describe_configs() {
691        let handler = KafkaProtocolHandler::new();
692        let response = KafkaResponse::DescribeConfigs;
693        let correlation_id = 500;
694
695        let result = handler.serialize_response(&response, correlation_id);
696        assert!(result.is_ok());
697    }
698
699    #[test]
700    fn test_serialize_response_negative_correlation_id() {
701        let handler = KafkaProtocolHandler::new();
702        let response = KafkaResponse::ApiVersions;
703        let correlation_id = -1;
704
705        let result = handler.serialize_response(&response, correlation_id);
706        assert!(result.is_ok());
707
708        let data = result.unwrap();
709        let corr_id = i32::from_be_bytes([data[0], data[1], data[2], data[3]]);
710        assert_eq!(corr_id, -1);
711    }
712
713    #[test]
714    fn test_serialize_response_zero_correlation_id() {
715        let handler = KafkaProtocolHandler::new();
716        let response = KafkaResponse::ApiVersions;
717        let correlation_id = 0;
718
719        let result = handler.serialize_response(&response, correlation_id);
720        assert!(result.is_ok());
721
722        let data = result.unwrap();
723        let corr_id = i32::from_be_bytes([data[0], data[1], data[2], data[3]]);
724        assert_eq!(corr_id, 0);
725    }
726
727    // ==================== KafkaRequest Debug Tests ====================
728
729    #[test]
730    fn test_kafka_request_debug() {
731        let request = KafkaRequest {
732            api_key: 0,
733            api_version: 7,
734            correlation_id: 123,
735            client_id: "test".to_string(),
736            request_type: KafkaRequestType::Produce,
737        };
738
739        let debug_str = format!("{:?}", request);
740        assert!(debug_str.contains("KafkaRequest"));
741        assert!(debug_str.contains("api_key"));
742    }
743
744    #[test]
745    fn test_kafka_request_type_debug() {
746        let metadata = KafkaRequestType::Metadata;
747        let debug_str = format!("{:?}", metadata);
748        assert!(debug_str.contains("Metadata"));
749    }
750
751    #[test]
752    fn test_kafka_response_debug() {
753        let response = KafkaResponse::Produce;
754        let debug_str = format!("{:?}", response);
755        assert!(debug_str.contains("Produce"));
756    }
757
758    // ==================== API Version Ranges Tests ====================
759
760    #[test]
761    fn test_api_version_ranges_complete() {
762        let handler = KafkaProtocolHandler::new();
763
764        // Test all configured API versions
765        let api_configs = vec![
766            (0, 0, 12), // Produce
767            (1, 0, 16), // Fetch
768            (3, 0, 12), // Metadata
769            (9, 0, 5),  // ListGroups
770            (15, 0, 9), // DescribeGroups
771            (16, 0, 9), // DescribeGroups (alternative)
772            (18, 0, 4), // ApiVersions
773            (19, 0, 7), // CreateTopics
774            (20, 0, 6), // DeleteTopics
775            (32, 0, 4), // DescribeConfigs
776            (49, 0, 4), // DescribeConfigs (alternative)
777        ];
778
779        for (api_key, min_ver, max_ver) in api_configs {
780            assert!(handler.is_api_version_supported(api_key, min_ver));
781            assert!(handler.is_api_version_supported(api_key, max_ver));
782            assert!(!handler.is_api_version_supported(api_key, max_ver + 1));
783            if min_ver > 0 {
784                assert!(!handler.is_api_version_supported(api_key, min_ver - 1));
785            }
786        }
787    }
788
789    // ==================== Edge Case Tests ====================
790
791    #[test]
792    fn test_parse_request_large_client_id() {
793        let handler = KafkaProtocolHandler::new();
794        let client_id = "a".repeat(1000); // Large client ID
795        let client_id_len = client_id.len() as i16;
796
797        let mut data = vec![0u8; 14 + client_id.len()];
798        data[4] = 0;
799        data[5] = 18;
800        data[12] = (client_id_len >> 8) as u8;
801        data[13] = (client_id_len & 0xFF) as u8;
802        data[14..].copy_from_slice(client_id.as_bytes());
803
804        let result = handler.parse_request(&data);
805        assert!(result.is_ok());
806        assert_eq!(result.unwrap().client_id, client_id);
807    }
808
809    #[test]
810    fn test_parse_request_invalid_utf8_client_id() {
811        let handler = KafkaProtocolHandler::new();
812        let mut data = vec![0u8; 17];
813        data[4] = 0;
814        data[5] = 18;
815        data[12] = 0;
816        data[13] = 3; // 3 bytes client ID
817                      // Invalid UTF-8 sequence
818        data[14] = 0xFF;
819        data[15] = 0xFF;
820        data[16] = 0xFF;
821
822        let result = handler.parse_request(&data);
823        assert!(result.is_err());
824    }
825
826    #[test]
827    fn test_api_version_struct() {
828        let api_version = ApiVersion {
829            min_version: 0,
830            max_version: 10,
831        };
832
833        assert_eq!(api_version.min_version, 0);
834        assert_eq!(api_version.max_version, 10);
835    }
836}